OPC UA客户端实现(续)
// Tail of the client initialization method; its header lies outside this excerpt.
    // Build the client configuration.
    // NOTE(review): credentials are hard-coded here; presumably they should come from
    // external configuration or a secret store — confirm before production use.
    OpcUaClientConfig config = OpcUaClientConfig.builder()
            .setEndpoint(endpoint)
            .setIdentityProvider(new UsernameProvider("mes_user", "password123"))
            .setRequestTimeout(5000)
            .build();

    // Establish the connection (blocks until the connect future completes).
    this.client = OpcUaClient.create(config);
    this.client.connect().get();

    // Initialize the subscription.
    setupSubscription();
}

/**
 * Creates the subscription, registers the data-change listener and subscribes
 * the predefined nodes.
 *
 * @throws Exception if creating the subscription or a monitored item fails
 */
private void setupSubscription() throws Exception {
    // Create the subscription with a 500 ms publishing interval.
    Subscription subscription = Subscription.create(client)
            .setPublishingInterval(500.0)
            .build()
            .get();

    // Register the data-change listener.
    subscription.addDataChangeListener((items, values) -> {
        for (int i = 0; i < items.size(); i++) {
            String nodeId = items.get(i).getNodeId().toParseableString();
            log.info("OPC UA数据变化: Node={}, Value={}", nodeId, values.get(i).getValue());
            try {
                // Trigger MES event processing.
                eventProcessor.handleOPCEvent(nodeId, values.get(i));
            } catch (RuntimeException e) {
                // A failing handler must not drop the remaining notifications of this batch.
                log.error("OPC UA事件处理失败: Node={}", nodeId, e);
            }
        }
    });

    // Subscribe the predefined nodes (device status nodes as an example).
    subscribeNode(subscription, "ns=2;s=Device1/Status");
    subscribeNode(subscription, "ns=2;s=Device1/Temperature");
}

/**
 * Parses the node id, caches it in {@code subscribedNodes} and adds a monitored
 * item for its Value attribute to the given subscription.
 *
 * @param subscription subscription that receives the monitored item
 * @param nodeIdStr    node id in parseable string form, e.g. {@code ns=2;s=Device1/Status}
 * @throws Exception if the monitored item cannot be created
 */
private void subscribeNode(Subscription subscription, String nodeIdStr) throws Exception {
    NodeId nodeId = NodeId.parse(nodeIdStr);
    subscribedNodes.put(nodeIdStr, nodeId);

    // Monitoring parameters. Note the 1000 ms sampling interval is longer than the
    // 500 ms publishing interval configured on the subscription.
    MonitoringParameters parameters = new MonitoringParameters(
            MonitoringMode.Reporting,
            1000.0, // sampling interval
            null,   // filter
            10,     // queue size
            true    // discard oldest
    );

    // Add the monitored item to the subscription.
    subscription.addMonitoredItem(
            MonitoredItemCreateRequest.builder()
                    .setNodeId(nodeId)
                    .setAttributeId(AttributeId.Value)
                    .setMonitoringParameters(parameters)
                    .build()
    ).get();
}

/**
 * Reads the current value of a node (synchronous call).
 *
 * @param nodeIdStr node id in parseable string form
 * @return the value returned by the server
 * @throws Exception if the read fails
 */
public DataValue readNodeValue(String nodeIdStr) throws Exception {
    NodeId nodeId = resolveNodeId(nodeIdStr);
    return client.readValue(0, TimestampsToReturn.Both, nodeId).get();
}

/**
 * Writes a value to a node after converting it to the requested type.
 *
 * @param nodeIdStr node id in parseable string form
 * @param value     raw value to write
 * @param type      target type passed to {@code convertType}
 * @return the status code returned by the write call
 * @throws Exception if conversion or the write fails
 */
public StatusCode writeNodeValue(String nodeIdStr, Object value, Class<?> type) throws Exception {
    NodeId nodeId = resolveNodeId(nodeIdStr);
    Variant variant = new Variant(convertType(value, type));
    DataValue dataValue = new DataValue(variant);
    return client.writeValue(nodeId, dataValue).get();
}

/** Returns the cached NodeId for an already subscribed node, otherwise parses the string. */
private NodeId resolveNodeId(String nodeIdStr) throws Exception {
    NodeId cached = subscribedNodes.get(nodeIdStr);
    return cached != null ? cached : NodeId.parse(nodeIdStr);
}

/**
 * Disconnects the client on bean destruction, waiting a bounded time so that
 * failures are logged instead of being silently dropped.
 */
@PreDestroy
public void shutdown() {
    if (client == null) {
        return;
    }
    try {
        client.disconnect().get(5, java.util.concurrent.TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.warn("OPC UA断开连接被中断", e);
    } catch (Exception e) {
        log.warn("OPC UA断开连接异常", e);
    }
}
3. MQTT协议实现(IoT设备通信)
依赖配置:
<!-- Eclipse Paho MQTT v3 client library -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
MQTT服务实现:
@Service @Slf4j public class MESMqttService implements MqttCallback { private IMqttClient client; private final MqttConfig config; public MESMqttService(MqttConfig config) { this.config = config; initialize(); } private void initialize() { try { // 客户端ID使用应用实例ID+随机后缀 String clientId = config.getClientPrefix() + "-" + UUID.randomUUID().toString().substring(0,8); this.client = new MqttClient(config.getBrokerUrl(), clientId); // 配置连接选项 MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(false); options.setConnectionTimeout(10); options.setKeepAliveInterval(60); if (config.getUsername() != null) { options.setUserName(config.getUsername()); options.setPassword(config.getPassword().toCharArray()); } // 设置SSL(如果需要) if (config.isSslEnabled()) { options.setSocketFactory(SSLUtils.getSslSocketFactory()); } // 连接并设置回调 client.connect(options); client.setCallback(this); // 订阅标准主题 subscribeTopics(); } catch (MqttException e) { log.error("MQTT初始化失败", e); } } private void subscribeTopics() throws MqttException { // 设备状态主题(QoS 1) client.subscribe("mes/+/status", 1); // 报警主题(QoS 2) client.subscribe("mes/+/alarm", 2); // 生产数据主题 client.subscribe("prod/+/data", 1); } @Override public void messageArrived(String topic, MqttMessage message) { try { String payload = new String(message.getPayload(), StandardCharsets.UTF_8); log.debug("收到MQTT消息: Topic={}, QoS={}, Payload={}", topic, message.getQos(), payload); // 根据主题路由处理 if (topic.contains("status")) { handleStatusMessage(topic, payload); } else if (topic.contains("alarm")) { handleAlarmMessage(topic, payload); } else if (topic.contains("prod/data")) { handleProductionData(topic, payload); } } catch (Exception e) { log.error("MQTT消息处理异常", e); } } /** * 发布设备控制命令 */ public void publishCommand(String deviceId, String command, int qos) { String topic = "mes/" + deviceId + "/command"; MqttMessage message = new MqttMessage(command.getBytes()); message.setQos(qos); 
message.setRetained(false); try { client.publish(topic, message); } catch (MqttException e) { log.error("MQTT发布失败: Topic={}", topic, e); } } @PreDestroy public void disconnect() { try { if (client != null && client.isConnected()) { client.disconnect(); } } catch (MqttException e) { log.warn("MQTT断开连接异常", e); } } // 其他接口方法实现... }4. 协议网关设计(统一接入层)
@RestController @RequestMapping("/api/gateway") public class ProtocolGatewayController { @Autowired private MESRestClient restClient; @Autowired private OPCUAMesClient opcuaClient; @Autowired private MESMqttService mqttService; /** * 统一数据上报接口 */ @PostMapping("/report") public ResponseEntity<?> reportData(@RequestBody UnifiedReportRequest request) { switch (request.getProtocolType()) { case "REST": return restClient.sendWorkOrder(request.getTargetUrl(), request.getPayload(), Map.class); case "OPCUA": opcuaClient.writeNodeValue( request.getNodeId(), request.getPayload().get("value"), request.getValueType()); return ResponseEntity.ok().build(); case "MQTT": mqttService.publishCommand( request.getDeviceId(), request.getPayload().toString(), request.getQos()); return ResponseEntity.accepted().build(); default: throw new IllegalArgumentException("不支持的协议类型"); } } /** * 设备状态查询(多协议聚合) */ @GetMapping("/device/{id}/status") public DeviceStatus getDeviceStatus(@PathVariable String id) { DeviceStatus status = new DeviceStatus(); // 从OPCUA获取实时数据 try { DataValue value = opcuaClient.readNodeValue("ns=2;s=" + id + "/Status"); status.setOpcuStatus(value.getValue().toString()); } catch (Exception e) { log.warn("OPCUA查询失败", e); } // 从MQTT获取最后上报状态 status.setMqttStatus(mqttService.getLastStatus(id)); return status; } }5. 异常处理与监控
@ControllerAdvice public class MESExceptionHandler { @ExceptionHandler(OPCUaException.class) public ResponseEntity<ErrorResponse> handleOPCException(OPCUaException ex) { ErrorResponse response = new ErrorResponse( "OPC_UA_ERROR", ex.getMessage(), System.currentTimeMillis()); return ResponseEntity .status(HttpStatus.INTERNAL_SERVER_ERROR) .body(response); } @ExceptionHandler(MqttException.class) public ResponseEntity<ErrorResponse> handleMqttException(MqttException ex) { ErrorResponse response = new ErrorResponse( "MQTT_COMM_ERROR", "MQTT通信异常: " + ex.getReasonCode(), System.currentTimeMillis()); return ResponseEntity .status(HttpStatus.SERVICE_UNAVAILABLE) .body(response); } } // 健康检查端点 @RestController @RequestMapping("/health") public class HealthController { @GetMapping public Map<String, Object> healthCheck() { return Map.of( "status", "UP", "components", Map.of( "opcua", opcuaClient.isConnected(), "mqtt", mqttService.isConnected(), "rest", restClient.isAvailable() ), "timestamp", Instant.now() ); } }6. 性能优化建议
连接池配置(REST):
/**
 * RestTemplate backed by a pooled Apache HttpClient.
 * Pool: 100 connections in total, 20 per route, idle connections evicted after 30 s.
 * Timeouts are set explicitly so a stalled peer cannot hold pooled connections forever;
 * the values are starting points to tune per deployment.
 */
@Bean
public RestTemplate restTemplate() {
    PoolingHttpClientConnectionManager connectionManager =
            new PoolingHttpClientConnectionManager();
    connectionManager.setMaxTotal(100);
    connectionManager.setDefaultMaxPerRoute(20);

    CloseableHttpClient httpClient = HttpClients.custom()
            .setConnectionManager(connectionManager)
            .evictIdleConnections(30, TimeUnit.SECONDS)
            .build();

    HttpComponentsClientHttpRequestFactory requestFactory =
            new HttpComponentsClientHttpRequestFactory(httpClient);
    requestFactory.setConnectTimeout(5_000);           // TCP connect, ms
    requestFactory.setConnectionRequestTimeout(5_000); // wait for a pooled connection, ms
    requestFactory.setReadTimeout(30_000);             // socket read, ms

    return new RestTemplate(requestFactory);
}
OPC UA订阅优化:
// 在订阅配置中添加死区处理(减少不必要的数据传输) parameters.setFilter(new DataChangeFilter( DataChangeTrigger.StatusOrValue, new DeadbandFilter(DeadbandType.Absolute, 0.5) // 值变化超过0.5才上报 ));MQTT消息批处理:
// 使用队列批量处理消息 @Bean public Queue<MqttMessage> mqttBatchQueue() { return new LinkedBlockingQueue<>(1000); } @Scheduled(fixedRate = 1000) public void processMessageBatch() { List<MqttMessage> batch = new ArrayList<>(100); mqttBatchQueue.drainTo(batch, 100); if (!batch.isEmpty()) { dataProcessor.bulkProcess(batch); } }7. 安全实现方案
OPC UA证书管理:
// 自定义证书验证逻辑 client.setCertificateValidator(new CertificateValidator() { @Override public void validate(Certificate certificate) throws UaException { // 检查证书指纹 String thumbprint = certificate.getThumbprint(); if (!trustedCertificates.contains(thumbprint)) { throw new UaException(StatusCodes.Bad_SecurityChecksFailed, "证书未授权: " + thumbprint); } } });MQTT TLS配置:
public class SSLUtils { public static SSLSocketFactory getSslSocketFactory() { try { // 加载信任的CA证书 CertificateFactory cf = CertificateFactory.getInstance("X.509"); Certificate caCert = cf.generateCertificate( new FileInputStream("path/to/ca.crt")); // 创建KeyStore KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); keyStore.load(null, null); keyStore.setCertificateEntry("ca", caCert); // 创建TrustManager TrustManagerFactory tmf = TrustManagerFactory .getInstance(TrustManagerFactory.getDefaultAlgorithm()); tmf.init(keyStore); // 创建SSL上下文 SSLContext context = SSLContext.getInstance("TLSv1.2"); context.init(null, tmf.getTrustManagers(), null); return context.getSocketFactory(); } catch (Exception e) { throw new RuntimeException("SSL配置失败", e); } } }接口权限控制:
/**
 * POST /workorder endpoint taking a {@code WorkOrder} request body.
 * Access is limited to the MES_OPERATOR and MES_ADMIN roles.
 * The body is a placeholder in this excerpt.
 */
@PreAuthorize("hasAnyRole('MES_OPERATOR', 'MES_ADMIN')")
@PostMapping("/workorder")
public ResponseEntity<?> createWorkOrder(@RequestBody WorkOrder order) {
    // Implementation logic
}
8. 部署架构建议
graph TD
    A[ERP系统] -->|REST API| B[MES集成服务]
    C[PLC设备] -->|OPC UA| B
    D[IoT设备] -->|MQTT| B
    B -->|Kafka| E[实时数仓]
    B -->|JDBC| F[关系数据库]
    B -->|REST| G[可视化大屏]
容器化配置示例:
FROM openjdk:17-jdk-slim
WORKDIR /app
COPY target/mes-integration.jar .
EXPOSE 8080 4840 1883
# Firewall rules are not Dockerfile instructions and would break the image build.
# Apply them on the host instead:
#   OPC UA firewall rule: iptables -A INPUT -p tcp --dport 4840 -j ACCEPT
#   MQTT firewall rule:   iptables -A INPUT -p tcp --dport 1883 -j ACCEPT
CMD ["java", "-jar", "mes-integration.jar"]
以上实现方案涵盖了MES系统对接的主要协议和技术要点,可根据实际需求进行裁剪和扩展。建议在实施时:
- 针对具体MES系统版本调整接口规范
- 根据设备规模调整连接池和线程池配置
- 实施完善的日志监控和告警机制
- 进行充分的协议兼容性测试