当前位置: 首页 > news >正文

Java实现Modbus读写数据

背景

由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题:

  • 关闭连接NioEventLoop没有释放导致oom
  • 设计思想是一个设备一个连接,而不是一个网关一个连接
  • 连接断开后客户端无从感知
    前两个问题解决方案参考上篇文章,最后一个问题虽然可以通过isConnect()方法获取到状态,但是连接断开后这个状态并没有更新,只能代码实现失败重连。
    所以为了解决以上问题,我打算重新封装一个Modbus组件。

步骤

代码如下所示,目前只分享modbus-core相关的代码。

  • modbus-core:实现设备读写指令的下发以及应答。
  • modbus-app:实现通用的可灵活配置的modbus设备接入层,通过更新配置信息即可快速引入新设备,无需手写代码重启应用。

在这里插入图片描述
为了快速实现modbus组件封装,这里引入了Vertx框架(基于事件+异步)官网链接,而不是原生的Netty框架。

引入架包

<!-- 目前我这里引入最新的版本(4.4.4) -->
<dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>${vertx.version}</version></dependency>

工具类

ByteUtil

package com.bho.modbus.utils;import java.nio.ByteBuffer;public class ByteUtil {/*** 字节数组转字符串* @param bytes* @return*/public static  String bytesToHexString(byte[] bytes) {StringBuffer sb = new StringBuffer(bytes.length);String sTemp;for (int i = 0; i < bytes.length; i++) {sTemp = Integer.toHexString(0xFF & bytes[i]);if (sTemp.length() < 2) {sb.append(0);}sb.append(sTemp.toUpperCase());}return sb.toString();}/*** int整型转字节数组* @param data* @param offset* @param len* @return*/public static byte[] intToBytes(int data, int offset, int len) {ByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(data);byte[] bytes = buffer.array();if (len - offset == 4) {return bytes;}byte[] dest = new byte[len];System.arraycopy(bytes, offset, dest, 0, len);return dest;}/*** 字节数组转int整型* @param bytes* @param offset* @param len* @return*/public static int bytesToInt(byte[] bytes, int offset, int len) {ByteBuffer buffer = ByteBuffer.allocate(4);for (int i = len; i < 4; i ++) {buffer.put((byte) 0x00);}for (int i = offset; i < offset + len; i++) {buffer.put(bytes[i]);}buffer.flip();return buffer.getInt();}}

Crc16

package com.bho.modbus.utils;public class Crc16 {/*** 获取CRC16校验码* @param arr_buff* @return*/public static byte[] getCrc16(byte[] arr_buff) {int len = arr_buff.length;// 预置 1 个 16 位的寄存器为十六进制FFFF, 称此寄存器为 CRC寄存器。int crc = 0xFFFF;int i, j;for (i = 0; i < len; i++) {// 把第一个 8 位二进制数据 与 16 位的 CRC寄存器的低 8 位相异或, 把结果放于 CRC寄存器crc = ((crc & 0xFF00) | (crc & 0x00FF) ^ (arr_buff[i] & 0xFF));for (j = 0; j < 8; j++) {// 把 CRC 寄存器的内容右移一位( 朝低位)用 0 填补最高位, 并检查右移后的移出位if ((crc & 0x0001) > 0) {// 如果移出位为 1, CRC寄存器与多项式A001进行异或crc = crc >> 1;crc = crc ^ 0xA001;} else// 如果移出位为 0,再次右移一位crc = crc >> 1;}}return intToBytes(crc);}private static byte[] intToBytes(int value) {byte[] src = new byte[2];src[1] = (byte) ((value >> 8) & 0xFF);src[0] = (byte) (value & 0xFF);return src;}}

实体类

ModbusMode

目前只实现了以下两种通信方式,可根据自己需求加入其它通信方式。

package com.bho.modbus.model;import com.bho.modbus.utils.ByteUtil;
import com.bho.modbus.utils.Crc16;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.log4j.Log4j2;import java.nio.ByteOrder;@Log4j2
public enum ModbusMode {/*** 【事务ID(2) + 协议标识(2) + 数据长度(2)】 + 从机地址(1) + 功能码(1) + 数据区(N)*/TCP,/*** 从机地址(1) + 功能码(1) + 数据区(N) + 【校验码(2)】**/RTU,;public ByteToMessageDecoder getDecoder() {if (this == ModbusMode.TCP) {return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 4,2, 0, 6, true);}if (this == ModbusMode.RTU){return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 2,1, 2, 0, true);}return null;}public byte[] readData(byte[] bytes) {int len = bytes.length;if (this == ModbusMode.RTU) {byte[] tempArr = new byte[len - 2];System.arraycopy(bytes, 0, tempArr, 0, tempArr.length);byte[] crc16 = Crc16.getCrc16(tempArr);if (crc16[0] != bytes[len -2] || crc16[1] != bytes[len - 1]) {log.error("Modbus receive illegal data:{}", ByteUtil.bytesToHexString(bytes));return null;}if (log.isDebugEnabled()) {log.debug("read data:{}", ByteUtil.bytesToHexString(tempArr));}return tempArr;}if (this == ModbusMode.TCP) {if (log.isDebugEnabled()) {log.debug("read data:{}", ByteUtil.bytesToHexString(bytes));}return bytes;}return null;}public byte[] writeData(byte[] bytes) {if (log.isDebugEnabled()) {log.debug("write data:{}",ByteUtil.bytesToHexString(bytes));}int len = bytes.length;if (this == ModbusMode.RTU) {byte[] crc16 = Crc16.getCrc16(bytes);byte[] tempArr = new byte[len + 2];System.arraycopy(bytes, 0, tempArr, 0, len);tempArr[len] = crc16[0];tempArr[len + 1] = crc16[1];return tempArr;}if (this == ModbusMode.TCP) {byte[] tempArr = new byte[len + 6];tempArr[1] = 0x01;byte[] lenBytes = ByteUtil.intToBytes(len, 2, 2);tempArr[4] = lenBytes[0];tempArr[5] = lenBytes[1];System.arraycopy(bytes, 0, tempArr, 6, len);return tempArr;}return null;}}

ModbusFunc

功能码

package com.bho.modbus.model;/*** Modbus常见功能码*/
public enum ModbusFunc {/*** 错误代码* 01:非法的功能码* 02:非法的寄存器地址* 03:非法的数据值* 04:从机故障*//*** 请求:*      功能代码:1字节 0x01*      起始地址:2字节 0x0000-0xffff*      线圈数量:2字节 0x0001-0x07d0(2000)** 正确响应:*      功能代码:1字节 0x01*        字节数:1字节 N(读线圈个数/8,余数不为0则加1)*      线圈状态:N字节** 错误响应:*      功能代码:1字节 0x81*      错误代码:1字节 0x01-0x04*/READ_COILS((byte)0x01),//读连续线圈状态READ_DISCRETE_COILS((byte)0x02),//读离散线圈状态 同上/*** 请求:*       功能代码:1字节 0x03*       起始地址:2字节 0x0000-0xffff*     寄存器数量:2字节 0x0001-0x007d(125)** 正确响应:*       功能代码:1字节 0x03*         字节数:1字节 2N(N为寄存器数量)*      寄存器数量:2N字节** 错误响应:*      功能代码:1字节 0x83*      错误代码:1字节 0x01-0x04*/READ_HOLDING_REGISTERS((byte)0x03),//读保持寄存器值READ_INPUT_REGISTERS((byte)0x04),//读输入寄存器值 同上/*** 请求:*      功能代码:1字节 0x05*      起始地址:2字节 0x0000-0xffff*      线圈状态:2字节 0x0000/0xff00** 正确响应:*      功能代码:1字节 0x05*      起始地址:2字节 0x0000-0xffff*      线圈状态:2字节 0x0000/0xff00** 错误响应:*      功能代码:1字节 0x85*      错误代码:1字节 0x01-0x04*/WRITE_SINGLE_COILS((byte)0x05),//写单个线圈/*** 请求:*      功能代码:1字节 0x06*      起始地址:2字节 0x0000-0xffff*      寄存器值:2字节 0x0000-0xffff** 正确响应:*      功能代码:1字节 0x06*      起始地址:2字节 0x0000-0xffff*      寄存器值:2字节 0x0000-0xffff** 错误响应:*      功能代码:1字节 0x86*      错误代码:1字节 0x01-0x04*/WRITE_SINGLE_HOLDING_REGISTERS((byte)0x06),//写单个保持寄存器/*** 请求:*      功能代码:1字节 0x10*      起始地址:2字节 0x0000-0xffff* 写入寄存器个数:2字节 0x0001-0x007b(123)*    写入字节数:1字节 2N(N为寄存器个数)*     寄存器值:2N字节 0x0000-0xffff** 正确响应:*      功能代码:1字节 0x10*      起始地址:2字节 0x0000-0xffff* 写入寄存器个数:2字节 0x0001-0x007b(123)** 错误响应:*      功能代码:1字节 0x90*      错误代码:1字节 0x01-0x04*/WRITE_MULTI_HOLDING_REGISTERS((byte)0x10),//写多个保持寄存器/*** 请求:*      功能代码:1字节 0x0F*      起始地址:2字节 0x0000-0xffff*   写入线圈个数:2字节 0x0001-0x07b0(1968)*    写入字节数:1字节 N(N为线圈个数/8,余数不为0则加1)*     线圈状态:N字节** 正确响应:*      功能代码:1字节 0x0F*      起始地址:2字节 0x0000-0xffff*   写入线圈个数:2字节 0x0001-0x07b0(1968)** 错误响应:*      功能代码:1字节 0x8F*      错误代码:1字节 0x01-0x04*/WRITE_MULTI_COILS((byte)0x0F),//写多个线圈;private byte func;ModbusFunc(byte func) {this.func = func;}public byte getFunc() {return func;}
}

ModbusParamConfig

下发指令参数配置信息

package com.bho.modbus.model;import lombok.Data;@Data
public class ModbusParamConfig {private RegisterType registerType;//寄存器类型private int registerAddress;//寄存器地址private String name;//指标名称private DataType dataType;//指标数据类型private int numberSplit;//(除)倍数public enum RegisterType {COIL,HOLDING_REGISTER,INPUT_REGISTER;}public enum DataType {BOOL,FLOAT,INT;}}

SendCmdTask

下发指令任务

package com.bho.modbus.model;import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.Data;import java.util.List;@Data
public class SendCmdTask {private List<ModbusParamConfig> paramConfigs;//参数列表private JSONObject reqParam;//请求参数 写数据必填private Boolean isWrite;//是否是写数据private Integer slaveId;//从机IDprivate Integer reqTimeout;//请求超时时间(秒)private Promise<JSONObject> promise;private Long timerId;public SendCmdTask(Vertx vertx, List<ModbusParamConfig> paramConfigs, JSONObject reqParam, Boolean isWrite, Integer slaveId, Integer reqTimeout) {this.paramConfigs = paramConfigs;this.reqParam = reqParam;this.isWrite = isWrite;this.slaveId = slaveId;this.reqTimeout = Math.max(reqTimeout, 5);Promise<JSONObject> promise = Promise.promise();this.promise = promise;this.timerId = vertx.setTimer(reqTimeout * 1000, hh -> promise.tryFail("Request timeout"));}
}

核心类

package com.bho.modbus.core;import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.SendCmdTask;
import com.bho.modbus.model.ModbusFunc;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.model.ModbusParamConfig;import com.bho.modbus.utils.ByteUtil;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import lombok.extern.log4j.Log4j2;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;@Log4j2
public class ModbusConnection {private String ip;//从机IPprivate Integer port;//从机端口private AtomicBoolean isAlive;//从机是否在线private ModbusMode mode;//通讯模式private NetSocket netSocket;//客户端连接private boolean isInitiativeClose;//是否是主动关闭连接private Long failRetryTimerId;//失败重试定时器IDprivate Integer failRetryIntervalSecond;//连接断开后重连间隔时间private Integer reqTimeoutSecond = 1;//请求超时时间private Long queueTimerId;//队列定时器private ConcurrentLinkedQueue<SendCmdTask> writeQueue;//写队列 优先写private ConcurrentLinkedQueue<SendCmdTask> readQueue;//读队列private Map<String, Promise<byte[]>> promiseMap;private Vertx vertx;public ModbusConnection(Vertx vertx, String ip, Integer port, Integer failRetryIntervalSecond, ModbusMode mode) {this.vertx = vertx;this.ip = ip;this.port = port;this.failRetryIntervalSecond = failRetryIntervalSecond;this.mode = mode;this.isAlive = new AtomicBoolean(false);this.writeQueue = new ConcurrentLinkedQueue<>();this.readQueue = new ConcurrentLinkedQueue<>();this.promiseMap = new ConcurrentHashMap<>();consumerTaskQueue(true);}/*** 建立连接* @return*/public Future<Boolean> connect(){NetClient netClient = vertx.createNetClient();return vertx.executeBlocking(b -> {netClient.connect(port, ip).onSuccess(socket -> {log.info("Modbus connect success, ip:{}, port:{}", ip, port);netSocket  = socket;isAlive.set(true);b.tryComplete(true);NetSocketImpl netSocketImpl = (NetSocketImpl) socket;netSocketImpl.channelHandlerContext().pipeline().addFirst(mode.getDecoder());socket.handler(buf -> {byte[] bytes = mode.readData(buf.getBytes());if (bytes == null) {return;}int slaveId = ByteUtil.bytesToInt(bytes, 0, 1);int funcNo = ByteUtil.bytesToInt(bytes, 1, 1);int errFuncNo = funcNo - 128;String key = String.format("%s_%s", slaveId, funcNo);String errKey = String.format("%s_%s", slaveId, errFuncNo);if (promiseMap.containsKey(key)) {Promise<byte[]> promise = promiseMap.get(key);byte[] content = new byte[bytes.length - 2];System.arraycopy(bytes, 2, content, 0, content.length);promise.tryComplete(content);} else if (promiseMap.containsKey(errKey)) {Promise<byte[]> promise = promiseMap.get(errKey);int data = ByteUtil.bytesToInt(bytes, 2, 1);switch (data) {case 1:promise.tryFail("Illegal function code");break;case 2:promise.tryFail("Illegal register address");break;case 3:promise.tryFail("Illegal data value");break;case 4:promise.tryFail("Slave fault");break;}}});socket.closeHandler(h -> {if (!isInitiativeClose) {log.error("Modbus connect close, ip:{}, port:{}", ip, port);failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, hh -> connect());} else {log.info("Modbus connect close, ip:{}, port:{}", ip, port);}});}).onFailure(err -> {log.error("Modbus connect fail, ip:{}, port:{}, msg:{}", ip, port, err.getMessage());isAlive.set(false);b.fail(err.getMessage());failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, h -> connect());});});}/*** 是否在线* @return*/public boolean isActive() {return isAlive.get();}/*** 断开连接*/public void close() {isInitiativeClose = true;if (failRetryTimerId != null) {vertx.cancelTimer(failRetryTimerId);}if (queueTimerId != null) {vertx.cancelTimer(queueTimerId);}if (netSocket != null) {netSocket.close();}}/*** 下发读写任务(串行 优先写任务)* 若并行可直接调用executeTask执行任务,无需排队等候一个个消费任务* @param task 读写任务* @return*/public Promise<JSONObject> offerTask(SendCmdTask task) {if (task.getIsWrite()) {writeQueue.offer(task);} else {readQueue.offer(task);}return task.getPromise();}/*** 消费任务队列 500毫秒轮询一次 优先消费写任务* @param delayFlag*/private void consumerTaskQueue(boolean delayFlag){if(delayFlag){queueTimerId = vertx.setTimer(500,id->{consumerTaskQueue(false);});return;}if(writeQueue.isEmpty() && readQueue.isEmpty()){consumerTaskQueue(true);return;}if(!writeQueue.isEmpty()){SendCmdTask sendCmdTask = writeQueue.poll();sendCmdTask.getPromise().future().onComplete(h->{consumerTaskQueue(false);});executeTask(sendCmdTask);return;}if(!readQueue.isEmpty()){SendCmdTask sendCmdTask = readQueue.poll();sendCmdTask.getPromise().future().onComplete(h->{consumerTaskQueue(false);});executeTask(sendCmdTask);}}private Future<Void> executeTask(SendCmdTask sendCmdTask){vertx.cancelTimer(sendCmdTask.getTimerId());Future<JSONObject> future;if (sendCmdTask.getIsWrite()) {future = executeWrite(sendCmdTask.getReqParam(), sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());} else {future = executeQuery(sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());}return future.onSuccess(res -> sendCmdTask.getPromise().tryComplete(res)).onFailure(err -> sendCmdTask.getPromise().tryFail(err)).map(o -> null);}/*** 写数据* @param reqParam 下发参数* @param paramConfigs 参数配置列表* @param slaveId 从机ID* @return*/private Future<JSONObject> executeWrite(JSONObject reqParam, List<ModbusParamConfig> paramConfigs, Integer slaveId) {if (!isActive()) {return Future.failedFuture("Gateway offline");}boolean isMerge = isMergeSendCmd(paramConfigs);if (isMerge) {int registerAddress = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();Promise<byte[]> promise = Promise.promise();List<String> keyList = paramConfigs.stream().map(ModbusParamConfig::getName).collect(Collectors.toList());return vertx.executeBlocking(h -> {Buffer buffer = getWriteCmd(registerAddress, slaveId, reqParam, keyList, registerType, promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {h.complete(reqParam);}).onFailure(err -> {log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());h.tryFail(err.getMessage());});});}List<Future<Object>> futures = new ArrayList<>();Future blockingFuture = Future.succeededFuture();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();Promise<byte[]> promise = Promise.promise();blockingFuture = blockingFuture.compose(suc -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig),err -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig));futures.add(blockingFuture);}return commonReplyResult(futures, paramConfigs);}private Future<Object> singleExecuteWrite(int slaveId, JSONObject reqParam, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {return vertx.executeBlocking(h -> {Buffer buffer = getWriteCmd(paramConfig.getRegisterAddress(), slaveId, reqParam, Arrays.asList(paramConfig.getName()), registerType, promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {h.tryComplete(reqParam.get(paramConfig.getName()));}).onFailure(err -> {log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",ip, port, slaveId, paramConfig.getName(), err.getMessage());h.tryFail(err.getMessage());});});}/*** 读数据* @param paramConfigs 参数配置列表* @param slaveId 从机ID* @return*/private Future<JSONObject> executeQuery(List<ModbusParamConfig> paramConfigs, Integer slaveId) {if (!isActive()) {return Future.failedFuture("Gateway offline");}boolean isMerge = isMergeSendCmd(paramConfigs);if (isMerge) {int registerAddress = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();int num = paramConfigs.size();Promise<byte[]> promise = Promise.promise();Buffer buffer = getQueryCmd(registerAddress, num, slaveId, registerType, promise);return vertx.executeBlocking(h -> {netSocket.write(buffer);promise.future().onSuccess(buf -> {JSONObject jsonObject = new JSONObject();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);switch (registerType) {case COIL:Integer pow = Double.valueOf(Math.pow(2, i % 8)).intValue();jsonObject.put(paramConfig.getName(), (pow & buf[i / 8 + 1]) == pow);break;case INPUT_REGISTER:case HOLDING_REGISTER:jsonObject.put(paramConfig.getName(), getValue(ByteUtil.bytesToInt(buf, i * 2 + 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));break;}}h.complete(jsonObject);}).onFailure(err -> {log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());h.tryFail(err.getMessage());});});}List<Future<Object>> futures = new ArrayList<>();Future blockingFuture = Future.succeededFuture();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();Promise<byte[]> promise = Promise.promise();blockingFuture = blockingFuture.compose(suc -> singleExecuteQuery(slaveId, promise, registerType, paramConfig),err -> singleExecuteQuery(slaveId, promise, registerType, paramConfig));futures.add(blockingFuture);}return commonReplyResult(futures, paramConfigs);}private Future<Object> singleExecuteQuery(int slaveId, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {return vertx.executeBlocking(h -> {Buffer buffer = getQueryCmd(paramConfig.getRegisterAddress(), 1, slaveId, paramConfig.getRegisterType(), promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {switch (registerType) {case COIL:h.complete(Integer.valueOf(buf[1]) == 1);break;case INPUT_REGISTER:case HOLDING_REGISTER:h.complete(getValue(ByteUtil.bytesToInt(buf, 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));break;}}).onFailure(err -> {log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",ip, port, slaveId, paramConfig.getName(), err.getMessage());h.tryFail(err.getMessage());});});}/*** 如果所有参数寄存器类型一致并且地址连续 则合并成一条命令下发* @param paramConfigs* @return 是否可以合并下发命令*/private boolean isMergeSendCmd(List<ModbusParamConfig> paramConfigs) {if (paramConfigs.size() == 1) {return false;}int lastPos = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();for (int i = 1; i < paramConfigs.size(); i++) {int curPos = paramConfigs.get(i).getRegisterAddress();if (curPos - lastPos != 1) {return false;}ModbusParamConfig.RegisterType curRegisterType = paramConfigs.get(i).getRegisterType();if (registerType != curRegisterType) {return false;}lastPos = curPos;}return true;}/*** 获取查询数据命令* @param startPos 查询地址* @param num 查询数量* @param slaveId 从机ID* @param registerType 寄存器类型* @param promise* @return*/private Buffer getQueryCmd(int startPos, int num, int slaveId, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {byte[] bytes = new byte[6];bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];switch (registerType) {case COIL:bytes[1] = ModbusFunc.READ_COILS.getFunc();break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.READ_HOLDING_REGISTERS.getFunc();break;case INPUT_REGISTER:bytes[1] = ModbusFunc.READ_INPUT_REGISTERS.getFunc();break;}Integer func = ByteUtil.bytesToInt(bytes, 1, 1);String key = String.format("%s_%s", slaveId, func);byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);bytes[2] = startPosBytes[2];bytes[3] = startPosBytes[3];byte[] numBytes = ByteUtil.intToBytes(num, 0, 4);bytes[4] = numBytes[2];bytes[5] = numBytes[3];Buffer buffer = new BufferImpl();buffer.appendBytes(mode.writeData(bytes));promiseMap.put(key, promise);long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));promise.future().onComplete(res -> {promiseMap.remove(key);vertx.cancelTimer(timeId);});return buffer;}/*** 获取写数据命令* @param startPos 查询地址* @param slaveId 从机ID* @param reqParam 写参数* @param keys  参数列表* @param registerType 寄存器类型* @param promise* @return*/private Buffer getWriteCmd(int startPos, int slaveId, JSONObject reqParam,List<String> keys, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {int len = keys.size() == 1 ? 6 : (registerType == ModbusParamConfig.RegisterType.HOLDING_REGISTER ?7 + keys.size() * 2 : 7 + Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue());byte[] bytes = new byte[len];bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);bytes[2] = startPosBytes[2];bytes[3] = startPosBytes[3];if (keys.size() == 1) {switch (registerType) {case COIL:bytes[1] = ModbusFunc.WRITE_SINGLE_COILS.getFunc();boolean value = reqParam.getBoolean(keys.get(0));if (value) {bytes[4] = (byte) 0xFF;} else {bytes[4] = 0x00;}bytes[5] = 0x00;break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.WRITE_SINGLE_HOLDING_REGISTERS.getFunc();byte[] dataArr = ByteUtil.intToBytes(reqParam.getInteger(keys.get(0)), 2, 2);bytes[4] = dataArr[0];bytes[5] = dataArr[1];break;}} else {byte[] dataNum = ByteUtil.intToBytes(keys.size(), 2, 2);bytes[4] = dataNum[0];bytes[5] = dataNum[1];switch (registerType) {case COIL:bytes[1] = ModbusFunc.WRITE_MULTI_COILS.getFunc();int dataSize = Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue();bytes[6] = ByteUtil.intToBytes(dataSize, 3, 1)[0];for (int i = 0; i < dataSize; i += 2) {int sum = 0;int startIndex = i * 8;int endIndex = (i + 2) * 8;endIndex = endIndex > keys.size() ? keys.size() : endIndex;for (int j = startIndex; j < endIndex; j++) {sum += Double.valueOf(Math.pow(2, j)).intValue() * (reqParam.getBoolean(keys.get(j)) ? 1 : 0);}byte[] sumArr = ByteUtil.intToBytes(sum, 2, 2);if (i + 8 < keys.size()) {bytes[i + 7] = sumArr[0];bytes[i + 8] = sumArr[1];} else {bytes[i + 7] = sumArr[1];}}break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.WRITE_MULTI_HOLDING_REGISTERS.getFunc();bytes[6] = ByteUtil.intToBytes(keys.size() * 2, 3, 1)[0];for (int i = 0; i < keys.size(); i++) {String paramKey = keys.get(i);Integer value = reqParam.getInteger(paramKey);byte[] dataArr = ByteUtil.intToBytes(value, 2, 2);bytes[i * 2 + 7] = dataArr[0];bytes[i * 2 + 8] = dataArr[1];}break;}}Integer func = ByteUtil.bytesToInt(bytes, 1, 1);String key = String.format("%s_%s", slaveId, func);Buffer buffer = new BufferImpl();buffer.appendBytes(mode.writeData(bytes));promiseMap.put(key, promise);long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));promise.future().onComplete(res -> {promiseMap.remove(key);vertx.cancelTimer(timeId);});return buffer;}private Future<JSONObject> commonReplyResult(List<Future<Object >> futures, List<ModbusParamConfig> paramConfigs) {return vertx.executeBlocking(b -> {Future.join(futures).onComplete(h -> {JSONObject okJson = new JSONObject();JSONObject errJson = new JSONObject();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);Future<Object> objectFuture = futures.get(i);if (objectFuture.succeeded()) {okJson.put(paramConfig.getName(), objectFuture.result());} else {errJson.put(paramConfig.getName(), objectFuture.cause().getMessage());}}if (okJson.size() > 0) {b.tryComplete(okJson);} else {b.tryFail(errJson.getString(paramConfigs.get(0).getName()));}});});}private Object getValue(int value, int numberSplit, ModbusParamConfig.DataType dataType) {if (numberSplit == 1) {return value;}Float temp = value * 1f / numberSplit;switch (dataType) {case INT :return Math.round(temp);case FLOAT:return temp;}return temp;}}

测试

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.core.ModbusConnection;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.model.SendCmdTask;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;import java.util.List;@Log4j2
public class TestModbus {public static final String READ_DATA = "[" +"        {" +"          \"name\": \"a\"," +"          \"registerType\": \"HOLDING_REGISTER\"," +"          \"registerAddress\": 504," +"          \"dataType\": \"FLOAT\"," +"          \"numberSplit\": 10" +"        }," +"        {" +"          \"name\": \"b\"," +"          \"registerType\": \"HOLDING_REGISTER\"," +"          \"registerAddress\": 505," +"          \"dataType\": \"FLOAT\"," +"          \"numberSplit\": 10" +"        }," +"        {" +"          \"name\": \"c\"," +"          \"registerType\": \"HOLDING_REGISTER\"," +"          \"registerAddress\": 506," +"          \"dataType\": \"FLOAT\"," +"          \"numberSplit\": 10" +"        }," +"        {" +"          \"name\": \"d\"," +"          \"registerType\": \"HOLDING_REGISTER\"," +"          \"registerAddress\": 507," +"          \"dataType\": \"INT\"," +"          \"numberSplit\": 1" +"        }," +"        {" +"          \"name\": \"e\"," +"          \"registerType\": \"HOLDING_REGISTER\"," +"          \"registerAddress\": 508," +"          \"dataType\": \"INT\"," +"          \"numberSplit\": 1" +"        }]";private static final String WRITE_DATA = "[" +"        {" +"          \"name\": \"do0\"," +"          \"registerType\": \"COIL\"," +"          \"registerAddress\": 20," +"          \"dataType\": \"BOOL\"," +"          \"numberSplit\": 1" +"        }" +"        ,{" +"          \"name\": \"do1\"," +"          \"registerType\": \"COIL\"," +"          \"registerAddress\": 21," +"          \"dataType\": \"BOOL\"," +"          \"numberSplit\": 1" +"        }" +"]";public static void main(String[] args) {testReadData();
//        testWriteData();;}private static void testWriteData() {Vertx vertx = Vertx.vertx();ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);Future<Boolean> connectFuture = connection.connect();JSONObject reqParam = new JSONObject();reqParam.put("do0", false);reqParam.put("do1", false);List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(WRITE_DATA, ModbusParamConfig.class);connectFuture.onComplete(con -> {if (connectFuture.succeeded()) {SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 21, 10);Promise<JSONObject> promise = connection.offerTask(task);promise.future().onSuccess(suc -> {log.info("read:"+suc);}).onFailure(err -> System.err.println(err.getMessage()));SendCmdTask task2 = new SendCmdTask(vertx, modbusParamConfigs, reqParam, true, 21, 10);Promise<JSONObject> promise2 = connection.offerTask(task2);promise2.future().onSuccess(suc -> {log.info("write:"+suc);}).onFailure(err -> System.err.println(err.getMessage()));} else {System.err.println("gateway offline");}});}private static void testReadData() {Vertx vertx = Vertx.vertx();ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);Future<Boolean> connectFuture = connection.connect();List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(READ_DATA, ModbusParamConfig.class);connectFuture.onComplete(con -> {if (connection.isActive()) {SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 2, 10);Promise<JSONObject> promise = connection.offerTask(task);promise.future().onSuccess(suc -> {log.info(suc);}).onFailure(err -> System.err.println(err.getMessage()));} else {System.err.println("gateway offline");}});}
}

运行结果如下:
其实这两个读写示例如果是一个网关可以共用一个Modbus连接。
在这里插入图片描述
在这里插入图片描述

modbus-app配置参数在这里插入图片描述

格式如下:

{"readable": {"devType01": {"ReportData": [{"name" : "xxx","registerType" : "COIL","registerAddress" : 1,"dataType" : "BOOL","numberSplit" : 1}]},"devType02": {"ReportData": [{"name" : "a","registerType" : "HOLDING_REGISTER","registerAddress" : 1,"dataType" : "INT","numberSplit" : 1},{"name" : "b","registerType" : "HOLDING_REGISTER","registerAddress" : 2,"dataType" : "INT","numberSplit" : 10},{"name": "c","registerType": "","dataType": "FLOAT","mbScript": "(a*10000+b)/10"}]}},"writable": {"devType01": {"Control": [{"name": "operation","registerType": "COIL","registerAddress": 21,"dataType": "BOOL","numberSplit": 1}]}},"readDataPeriods": [{"period" : 60,"deviceTypes": ["devType01"]},{"period" : 600,"deviceTypes": ["devType02","devType03"]}]
}

具体怎么实现这边就不过多讲解了…

结束

不保证代码正确,我这边只是大概实现了一下,仅供参考。若有问题,请批评指出,我会虚心接受并积极修复问题。

相关文章:

Java实现Modbus读写数据

背景 由于当时项目周期赶&#xff0c;引入了一个PLC4X组件&#xff0c;上手快。接下来就是使用这个组件遇到的一些问题&#xff1a; 关闭连接NioEventLoop没有释放导致oom设计思想是一个设备一个连接&#xff0c;而不是一个网关一个连接连接断开后客户端无从感知 前两个问题解…...

C++11新特性⑤ | 仿函数与lambda表达式

目录 1、引言 2、仿函数 3、lambda表达式 3.1、lambda表达式的一般形式 3.2、返回类型说明 3.3、捕获列表的规则 3.4、可以捕获哪些变量 3.5、lambda表达式给编程带来的便利 VC常用功能开发汇总&#xff08;专栏文章列表&#xff0c;欢迎订阅&#xff0c;持续更新...&a…...

解决websocket不定时出现1005错误

后台抛出异常如下&#xff1a; Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005 Caused by: java.lang.IllegalArgume…...

文章内容生成随机图像,并将这些图像上链

一、需求背景 在当前的互联网时代,信息越来越快速地传播,一篇好的文章不仅需要有吸引人的文字内容,还需要有精美的配图。但是,对于某些只有文字,而没有图片的文章,我们可以使用程序去生成随机的图片来作为文章的配图。 本文将详细介绍如何使用Java语言实现文章内容生成…...

l8-d9 UDP通信实现

一、函数接口扩展与UDP通信实现流程 1.write/read到send/recv 函数原型&#xff1a; ssize_t send(int sockfd, const void *buf, size_t len, int flags); ssize_t recv(int sockfd, void *buf, size_t len, int flags); 前三个参数同read/write一样&#xff1b; ssize_t rea…...

MongoDB复杂聚合查询与java中MongoTemplate的api对应

MongoDB聚合json脚本 db.getCollection("202303_refund").aggregate([{"$match": {"courseType": "常规班课","teacherRefundReasonCheck": true,"teacherId": {"$in": [7544]},"createTime"…...

WireShark抓包工具的安装

1.下载安装包 在官网或者电脑应用商城都可以下载 2.安装 打开安装包&#xff0c;点击next 点击next 选择UI界面&#xff0c;两种都装上 根据习惯选择 选择安装位置点击安装 开始安装安装成功...

审计智能合约的成本是多少?如何审计智能合约?

审计智能合约的成本是多少&#xff1f;如何审计智能合约&#xff1f; 智能合约安全审计在去中心化金融 (DeFi) 生态系统中非常普遍。如果您投资了一个区块链项目&#xff0c;您的决定可能部分基于智能合约代码审查的结果。 虽然大多数人都了解审计对网络安全的重要性&#xff…...

9.7 校招 内推 面经

绿泡*泡&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 1、校招 | Momenta 2024校招火热进行中&#xff01;新增招聘岗位&#xff08;内推&#xff09; 校招 | Momenta 2024校招火热进行中&#xff01;新增招聘岗位&#xff08;内推&#xff09; 2、…...

【网络编程】IO多路复用

IO多路复用是一种高效的I/O处理方式&#xff0c;它允许单个进程能够同时监视多个文件描述符&#xff08;sockets、文件等&#xff09;&#xff0c;并在其中任何一个文件描述符准备好进行I/O操作时进行处理。它的核心在于使用少量的线程或进程来管理多个I/O操作&#xff0c;以提…...

MySQL与postgreSQL数据库的区别

MySQL 是一个流行的开源关系型数据库管理系统&#xff0c;具有以下优势&#xff1a; 开源和免费&#xff1a;MySQL 是一个开源软件&#xff0c;允许用户免费下载、使用和修改。它的免费版本&#xff08;Community Edition&#xff09;提供了广泛的功能&#xff0c;适用于大多数…...

单片机电子元器件-按键

电子元器件 按键上有 四个引脚 1 2 、 3 4 按下之后 导通 1 3 、 2 4 初始导通 通常按键开关为机械弹性开关&#xff0c;开关在闭合不会马上稳定的接通&#xff0c;会有一连串的抖动 抖动时间的长短有机械特性来决定的&#xff0c;一般为5ms 到10 ms 。 消抖的分类 硬件消…...

Nacos docker实现nacos高可用集群项目

目录 Nacos是什么&#xff1f; Nacos在公司里的运用是什么&#xff1f; 使用docker构建nacos容器高可用集群 实验规划图&#xff1a;​编辑 1、拉取nacos镜像 2、创建docker网桥&#xff08;实现集群内的机器的互联互通&#xff08;所有的nacos和mysql&#xff09;&#x…...

基于Dubbo实现服务的远程调用

目录 前言 RPC思想 为什么使用Dubbo Dubbo技术框架 ​编辑 调用关系流程 基础实现 A.提供统一业务Api B.编辑服务提供者Product B.a 添加依赖 B.b 添加Dubbo 配置(基于yaml配置文件) B.c 编写并暴露服务 C.编辑服务消费者 C.a 添加依赖 C.b 添加Dubbo配置 C.c 引用…...

Redis事务的理解

介绍 Redis通过MULTI、EXEC、WATCH等命令来实现事务功能。 事务提供了一种将多个命令请求打包&#xff0c;然后一次性、按照顺序地执行多个命令的机制&#xff0c;并且在事务执行期间&#xff0c;服务器不会因为其他客户端请求而中断事务的执行功能&#xff0c;他会将事务中的…...

PostgreSQL安装异常,服务无法启动导致创建服务器超时

win上安装pg后无法创建服务器&#xff0c;提示创建超时&#xff0c;发现服务列表里面pg15服务 并没有启动&#xff0c;启动服务器发现服务不了&#xff0c;截图忘记截了&#xff0c;复现不了&#xff0c;解决方法是 换个身份&#xff0c;然后继续启动&#xff0c;然后就可以在…...

汽车电子系统网络安全解决方案

声明 本文是学习GB-T 38628-2020 信息安全技术 汽车电子系统网络安全指南. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 汽车电子系统网络安全范围 本标准给出了汽车电子系统网络安全活动框架&#xff0c;以及在此框架下的汽车电子系统网络安全活动…...

切片机制和MR工作机制

切片机制 默认的切片大小和块大小一致&#xff0c;切片的个数决定了MapTask的个数。 数据倾斜问题&#xff1a;如果某个切片的大小太小&#xff0c;会浪费了MapTask申请的CPU资源。 如果剩余数据长度大于128*1.1, 就切片成2份&#xff0c;否则就不进行切分了。 InputFormat基…...

【postgresql 基础入门】基础架构和命名空间层次,查看数据库对象再也不迷路

postgresql 基础架构 ​专栏内容&#xff1a; postgresql内核源码分析手写数据库toadb并发编程 ​开源贡献&#xff1a; toadb开源库 个人主页&#xff1a;我的主页 管理社区&#xff1a;开源数据库 座右铭&#xff1a;天行健&#xff0c;君子以自强不息&#xff1b;地势坤&…...

是的,决定放弃算法去机器学习了

可是梦想啊&#xff01;~她永存心间&#xff01;&#xff01;&#xff01; 我啊~本是执着于这些算法的怪咖&#xff0c;梦想是icpc&#xff0c;ccpc~ 可是啊~ 在以后的科研和工作中&#xff0c;这些算法很多都是用不到的&#xff0c;学习算法更重要的目的是锻炼编程能力和分析…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

Go 语言接口详解

Go 语言接口详解 核心概念 接口定义 在 Go 语言中&#xff0c;接口是一种抽象类型&#xff0c;它定义了一组方法的集合&#xff1a; // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的&#xff1a; // 矩形结构体…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

django blank 与 null的区别

1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是&#xff0c;要注意以下几点&#xff1a; Django的表单验证与null无关&#xff1a;null参数控制的是数据库层面字段是否可以为NULL&#xff0c;而blank参数控制的是Django表单验证时字…...