Java实现Modbus读写数据
背景
由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题:
- 关闭连接NioEventLoop没有释放导致oom
- 设计思想是一个设备一个连接,而不是一个网关一个连接
- 连接断开后客户端无从感知
前两个问题解决方案参考上篇文章,最后一个问题虽然可以通过isConnect()方法获取到状态,但是连接断开后这个状态并没有更新,只能代码实现失败重连。
所以为了解决以上问题,我打算重新封装一个Modbus组件。
步骤
代码如下所示,目前只分享modbus-core相关的代码。
- modbus-core:实现设备读写指令的下发以及应答。
- modbus-app:实现通用的可灵活配置的modbus设备接入层,通过更新配置信息即可快速引入新设备,无需手写代码重启应用。
为了快速实现modbus组件封装,这里引入了Vertx框架(基于事件+异步)官网链接,而不是原生的Netty框架。
引入架包
<!-- 目前我这里引入最新的版本(4.4.4) -->
<dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>${vertx.version}</version></dependency>
工具类
ByteUtil
package com.bho.modbus.utils;import java.nio.ByteBuffer;public class ByteUtil {/*** 字节数组转字符串* @param bytes* @return*/public static String bytesToHexString(byte[] bytes) {StringBuffer sb = new StringBuffer(bytes.length);String sTemp;for (int i = 0; i < bytes.length; i++) {sTemp = Integer.toHexString(0xFF & bytes[i]);if (sTemp.length() < 2) {sb.append(0);}sb.append(sTemp.toUpperCase());}return sb.toString();}/*** int整型转字节数组* @param data* @param offset* @param len* @return*/public static byte[] intToBytes(int data, int offset, int len) {ByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(data);byte[] bytes = buffer.array();if (len - offset == 4) {return bytes;}byte[] dest = new byte[len];System.arraycopy(bytes, offset, dest, 0, len);return dest;}/*** 字节数组转int整型* @param bytes* @param offset* @param len* @return*/public static int bytesToInt(byte[] bytes, int offset, int len) {ByteBuffer buffer = ByteBuffer.allocate(4);for (int i = len; i < 4; i ++) {buffer.put((byte) 0x00);}for (int i = offset; i < offset + len; i++) {buffer.put(bytes[i]);}buffer.flip();return buffer.getInt();}}
Crc16
package com.bho.modbus.utils;public class Crc16 {/*** 获取CRC16校验码* @param arr_buff* @return*/public static byte[] getCrc16(byte[] arr_buff) {int len = arr_buff.length;// 预置 1 个 16 位的寄存器为十六进制FFFF, 称此寄存器为 CRC寄存器。int crc = 0xFFFF;int i, j;for (i = 0; i < len; i++) {// 把第一个 8 位二进制数据 与 16 位的 CRC寄存器的低 8 位相异或, 把结果放于 CRC寄存器crc = ((crc & 0xFF00) | (crc & 0x00FF) ^ (arr_buff[i] & 0xFF));for (j = 0; j < 8; j++) {// 把 CRC 寄存器的内容右移一位( 朝低位)用 0 填补最高位, 并检查右移后的移出位if ((crc & 0x0001) > 0) {// 如果移出位为 1, CRC寄存器与多项式A001进行异或crc = crc >> 1;crc = crc ^ 0xA001;} else// 如果移出位为 0,再次右移一位crc = crc >> 1;}}return intToBytes(crc);}private static byte[] intToBytes(int value) {byte[] src = new byte[2];src[1] = (byte) ((value >> 8) & 0xFF);src[0] = (byte) (value & 0xFF);return src;}}
实体类
ModbusMode
目前只实现了以下两种通信方式,可根据自己需求加入其它通信方式。
package com.bho.modbus.model;import com.bho.modbus.utils.ByteUtil;
import com.bho.modbus.utils.Crc16;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.log4j.Log4j2;import java.nio.ByteOrder;@Log4j2
public enum ModbusMode {/*** 【事务ID(2) + 协议标识(2) + 数据长度(2)】 + 从机地址(1) + 功能码(1) + 数据区(N)*/TCP,/*** 从机地址(1) + 功能码(1) + 数据区(N) + 【校验码(2)】**/RTU,;public ByteToMessageDecoder getDecoder() {if (this == ModbusMode.TCP) {return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 4,2, 0, 6, true);}if (this == ModbusMode.RTU){return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 2,1, 2, 0, true);}return null;}public byte[] readData(byte[] bytes) {int len = bytes.length;if (this == ModbusMode.RTU) {byte[] tempArr = new byte[len - 2];System.arraycopy(bytes, 0, tempArr, 0, tempArr.length);byte[] crc16 = Crc16.getCrc16(tempArr);if (crc16[0] != bytes[len -2] || crc16[1] != bytes[len - 1]) {log.error("Modbus receive illegal data:{}", ByteUtil.bytesToHexString(bytes));return null;}if (log.isDebugEnabled()) {log.debug("read data:{}", ByteUtil.bytesToHexString(tempArr));}return tempArr;}if (this == ModbusMode.TCP) {if (log.isDebugEnabled()) {log.debug("read data:{}", ByteUtil.bytesToHexString(bytes));}return bytes;}return null;}public byte[] writeData(byte[] bytes) {if (log.isDebugEnabled()) {log.debug("write data:{}",ByteUtil.bytesToHexString(bytes));}int len = bytes.length;if (this == ModbusMode.RTU) {byte[] crc16 = Crc16.getCrc16(bytes);byte[] tempArr = new byte[len + 2];System.arraycopy(bytes, 0, tempArr, 0, len);tempArr[len] = crc16[0];tempArr[len + 1] = crc16[1];return tempArr;}if (this == ModbusMode.TCP) {byte[] tempArr = new byte[len + 6];tempArr[1] = 0x01;byte[] lenBytes = ByteUtil.intToBytes(len, 2, 2);tempArr[4] = lenBytes[0];tempArr[5] = lenBytes[1];System.arraycopy(bytes, 0, tempArr, 6, len);return tempArr;}return null;}}
ModbusFunc
功能码
package com.bho.modbus.model;/*** Modbus常见功能码*/
public enum ModbusFunc {/*** 错误代码* 01:非法的功能码* 02:非法的寄存器地址* 03:非法的数据值* 04:从机故障*//*** 请求:* 功能代码:1字节 0x01* 起始地址:2字节 0x0000-0xffff* 线圈数量:2字节 0x0001-0x07d0(2000)** 正确响应:* 功能代码:1字节 0x01* 字节数:1字节 N(读线圈个数/8,余数不为0则加1)* 线圈状态:N字节** 错误响应:* 功能代码:1字节 0x81* 错误代码:1字节 0x01-0x04*/READ_COILS((byte)0x01),//读连续线圈状态READ_DISCRETE_COILS((byte)0x02),//读离散线圈状态 同上/*** 请求:* 功能代码:1字节 0x03* 起始地址:2字节 0x0000-0xffff* 寄存器数量:2字节 0x0001-0x007d(125)** 正确响应:* 功能代码:1字节 0x03* 字节数:1字节 2N(N为寄存器数量)* 寄存器数量:2N字节** 错误响应:* 功能代码:1字节 0x83* 错误代码:1字节 0x01-0x04*/READ_HOLDING_REGISTERS((byte)0x03),//读保持寄存器值READ_INPUT_REGISTERS((byte)0x04),//读输入寄存器值 同上/*** 请求:* 功能代码:1字节 0x05* 起始地址:2字节 0x0000-0xffff* 线圈状态:2字节 0x0000/0xff00** 正确响应:* 功能代码:1字节 0x05* 起始地址:2字节 0x0000-0xffff* 线圈状态:2字节 0x0000/0xff00** 错误响应:* 功能代码:1字节 0x85* 错误代码:1字节 0x01-0x04*/WRITE_SINGLE_COILS((byte)0x05),//写单个线圈/*** 请求:* 功能代码:1字节 0x06* 起始地址:2字节 0x0000-0xffff* 寄存器值:2字节 0x0000-0xffff** 正确响应:* 功能代码:1字节 0x06* 起始地址:2字节 0x0000-0xffff* 寄存器值:2字节 0x0000-0xffff** 错误响应:* 功能代码:1字节 0x86* 错误代码:1字节 0x01-0x04*/WRITE_SINGLE_HOLDING_REGISTERS((byte)0x06),//写单个保持寄存器/*** 请求:* 功能代码:1字节 0x10* 起始地址:2字节 0x0000-0xffff* 写入寄存器个数:2字节 0x0001-0x007b(123)* 写入字节数:1字节 2N(N为寄存器个数)* 寄存器值:2N字节 0x0000-0xffff** 正确响应:* 功能代码:1字节 0x10* 起始地址:2字节 0x0000-0xffff* 写入寄存器个数:2字节 0x0001-0x007b(123)** 错误响应:* 功能代码:1字节 0x90* 错误代码:1字节 0x01-0x04*/WRITE_MULTI_HOLDING_REGISTERS((byte)0x10),//写多个保持寄存器/*** 请求:* 功能代码:1字节 0x0F* 起始地址:2字节 0x0000-0xffff* 写入线圈个数:2字节 0x0001-0x07b0(1968)* 写入字节数:1字节 N(N为线圈个数/8,余数不为0则加1)* 线圈状态:N字节** 正确响应:* 功能代码:1字节 0x0F* 起始地址:2字节 0x0000-0xffff* 写入线圈个数:2字节 0x0001-0x07b0(1968)** 错误响应:* 功能代码:1字节 0x8F* 错误代码:1字节 0x01-0x04*/WRITE_MULTI_COILS((byte)0x0F),//写多个线圈;private byte func;ModbusFunc(byte func) {this.func = func;}public byte getFunc() {return func;}
}
ModbusParamConfig
下发指令参数配置信息
package com.bho.modbus.model;import lombok.Data;@Data
public class ModbusParamConfig {private RegisterType registerType;//寄存器类型private int registerAddress;//寄存器地址private String name;//指标名称private DataType dataType;//指标数据类型private int numberSplit;//(除)倍数public enum RegisterType {COIL,HOLDING_REGISTER,INPUT_REGISTER;}public enum DataType {BOOL,FLOAT,INT;}}
SendCmdTask
下发指令任务
package com.bho.modbus.model;import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.Data;import java.util.List;@Data
public class SendCmdTask {private List<ModbusParamConfig> paramConfigs;//参数列表private JSONObject reqParam;//请求参数 写数据必填private Boolean isWrite;//是否是写数据private Integer slaveId;//从机IDprivate Integer reqTimeout;//请求超时时间(秒)private Promise<JSONObject> promise;private Long timerId;public SendCmdTask(Vertx vertx, List<ModbusParamConfig> paramConfigs, JSONObject reqParam, Boolean isWrite, Integer slaveId, Integer reqTimeout) {this.paramConfigs = paramConfigs;this.reqParam = reqParam;this.isWrite = isWrite;this.slaveId = slaveId;this.reqTimeout = Math.max(reqTimeout, 5);Promise<JSONObject> promise = Promise.promise();this.promise = promise;this.timerId = vertx.setTimer(reqTimeout * 1000, hh -> promise.tryFail("Request timeout"));}
}
核心类
package com.bho.modbus.core;import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.SendCmdTask;
import com.bho.modbus.model.ModbusFunc;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.model.ModbusParamConfig;import com.bho.modbus.utils.ByteUtil;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import lombok.extern.log4j.Log4j2;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;@Log4j2
public class ModbusConnection {private String ip;//从机IPprivate Integer port;//从机端口private AtomicBoolean isAlive;//从机是否在线private ModbusMode mode;//通讯模式private NetSocket netSocket;//客户端连接private boolean isInitiativeClose;//是否是主动关闭连接private Long failRetryTimerId;//失败重试定时器IDprivate Integer failRetryIntervalSecond;//连接断开后重连间隔时间private Integer reqTimeoutSecond = 1;//请求超时时间private Long queueTimerId;//队列定时器private ConcurrentLinkedQueue<SendCmdTask> writeQueue;//写队列 优先写private ConcurrentLinkedQueue<SendCmdTask> readQueue;//读队列private Map<String, Promise<byte[]>> promiseMap;private Vertx vertx;public ModbusConnection(Vertx vertx, String ip, Integer port, Integer failRetryIntervalSecond, ModbusMode mode) {this.vertx = vertx;this.ip = ip;this.port = port;this.failRetryIntervalSecond = failRetryIntervalSecond;this.mode = mode;this.isAlive = new AtomicBoolean(false);this.writeQueue = new ConcurrentLinkedQueue<>();this.readQueue = new ConcurrentLinkedQueue<>();this.promiseMap = new ConcurrentHashMap<>();consumerTaskQueue(true);}/*** 建立连接* @return*/public Future<Boolean> connect(){NetClient netClient = vertx.createNetClient();return vertx.executeBlocking(b -> {netClient.connect(port, ip).onSuccess(socket -> {log.info("Modbus connect success, ip:{}, port:{}", ip, port);netSocket = socket;isAlive.set(true);b.tryComplete(true);NetSocketImpl netSocketImpl = (NetSocketImpl) socket;netSocketImpl.channelHandlerContext().pipeline().addFirst(mode.getDecoder());socket.handler(buf -> {byte[] bytes = mode.readData(buf.getBytes());if (bytes == null) {return;}int slaveId = ByteUtil.bytesToInt(bytes, 0, 1);int funcNo = ByteUtil.bytesToInt(bytes, 1, 1);int errFuncNo = funcNo - 128;String key = String.format("%s_%s", slaveId, funcNo);String errKey = String.format("%s_%s", slaveId, errFuncNo);if (promiseMap.containsKey(key)) {Promise<byte[]> promise = promiseMap.get(key);byte[] content = new byte[bytes.length - 2];System.arraycopy(bytes, 2, content, 0, content.length);promise.tryComplete(content);} else if (promiseMap.containsKey(errKey)) {Promise<byte[]> promise = promiseMap.get(errKey);int data = ByteUtil.bytesToInt(bytes, 2, 1);switch (data) {case 1:promise.tryFail("Illegal function code");break;case 2:promise.tryFail("Illegal register address");break;case 3:promise.tryFail("Illegal data value");break;case 4:promise.tryFail("Slave fault");break;}}});socket.closeHandler(h -> {if (!isInitiativeClose) {log.error("Modbus connect close, ip:{}, port:{}", ip, port);failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, hh -> connect());} else {log.info("Modbus connect close, ip:{}, port:{}", ip, port);}});}).onFailure(err -> {log.error("Modbus connect fail, ip:{}, port:{}, msg:{}", ip, port, err.getMessage());isAlive.set(false);b.fail(err.getMessage());failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, h -> connect());});});}/*** 是否在线* @return*/public boolean isActive() {return isAlive.get();}/*** 断开连接*/public void close() {isInitiativeClose = true;if (failRetryTimerId != null) {vertx.cancelTimer(failRetryTimerId);}if (queueTimerId != null) {vertx.cancelTimer(queueTimerId);}if (netSocket != null) {netSocket.close();}}/*** 下发读写任务(串行 优先写任务)* 若并行可直接调用executeTask执行任务,无需排队等候一个个消费任务* @param task 读写任务* @return*/public Promise<JSONObject> offerTask(SendCmdTask task) {if (task.getIsWrite()) {writeQueue.offer(task);} else {readQueue.offer(task);}return task.getPromise();}/*** 消费任务队列 500毫秒轮询一次 优先消费写任务* @param delayFlag*/private void consumerTaskQueue(boolean delayFlag){if(delayFlag){queueTimerId = vertx.setTimer(500,id->{consumerTaskQueue(false);});return;}if(writeQueue.isEmpty() && readQueue.isEmpty()){consumerTaskQueue(true);return;}if(!writeQueue.isEmpty()){SendCmdTask sendCmdTask = writeQueue.poll();sendCmdTask.getPromise().future().onComplete(h->{consumerTaskQueue(false);});executeTask(sendCmdTask);return;}if(!readQueue.isEmpty()){SendCmdTask sendCmdTask = readQueue.poll();sendCmdTask.getPromise().future().onComplete(h->{consumerTaskQueue(false);});executeTask(sendCmdTask);}}private Future<Void> executeTask(SendCmdTask sendCmdTask){vertx.cancelTimer(sendCmdTask.getTimerId());Future<JSONObject> future;if (sendCmdTask.getIsWrite()) {future = executeWrite(sendCmdTask.getReqParam(), sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());} else {future = executeQuery(sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());}return future.onSuccess(res -> sendCmdTask.getPromise().tryComplete(res)).onFailure(err -> sendCmdTask.getPromise().tryFail(err)).map(o -> null);}/*** 写数据* @param reqParam 下发参数* @param paramConfigs 参数配置列表* @param slaveId 从机ID* @return*/private Future<JSONObject> executeWrite(JSONObject reqParam, List<ModbusParamConfig> paramConfigs, Integer slaveId) {if (!isActive()) {return Future.failedFuture("Gateway offline");}boolean isMerge = isMergeSendCmd(paramConfigs);if (isMerge) {int registerAddress = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();Promise<byte[]> promise = Promise.promise();List<String> keyList = paramConfigs.stream().map(ModbusParamConfig::getName).collect(Collectors.toList());return vertx.executeBlocking(h -> {Buffer buffer = getWriteCmd(registerAddress, slaveId, reqParam, keyList, registerType, promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {h.complete(reqParam);}).onFailure(err -> {log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());h.tryFail(err.getMessage());});});}List<Future<Object>> futures = new ArrayList<>();Future blockingFuture = Future.succeededFuture();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();Promise<byte[]> promise = Promise.promise();blockingFuture = blockingFuture.compose(suc -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig),err -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig));futures.add(blockingFuture);}return commonReplyResult(futures, paramConfigs);}private Future<Object> singleExecuteWrite(int slaveId, JSONObject reqParam, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {return vertx.executeBlocking(h -> {Buffer buffer = getWriteCmd(paramConfig.getRegisterAddress(), slaveId, reqParam, Arrays.asList(paramConfig.getName()), registerType, promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {h.tryComplete(reqParam.get(paramConfig.getName()));}).onFailure(err -> {log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",ip, port, slaveId, paramConfig.getName(), err.getMessage());h.tryFail(err.getMessage());});});}/*** 读数据* @param paramConfigs 参数配置列表* @param slaveId 从机ID* @return*/private Future<JSONObject> executeQuery(List<ModbusParamConfig> paramConfigs, Integer slaveId) {if (!isActive()) {return Future.failedFuture("Gateway offline");}boolean isMerge = isMergeSendCmd(paramConfigs);if (isMerge) {int registerAddress = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();int num = paramConfigs.size();Promise<byte[]> promise = Promise.promise();Buffer buffer = getQueryCmd(registerAddress, num, slaveId, registerType, promise);return vertx.executeBlocking(h -> {netSocket.write(buffer);promise.future().onSuccess(buf -> {JSONObject jsonObject = new JSONObject();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);switch (registerType) {case COIL:Integer pow = Double.valueOf(Math.pow(2, i % 8)).intValue();jsonObject.put(paramConfig.getName(), (pow & buf[i / 8 + 1]) == pow);break;case INPUT_REGISTER:case HOLDING_REGISTER:jsonObject.put(paramConfig.getName(), getValue(ByteUtil.bytesToInt(buf, i * 2 + 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));break;}}h.complete(jsonObject);}).onFailure(err -> {log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());h.tryFail(err.getMessage());});});}List<Future<Object>> futures = new ArrayList<>();Future blockingFuture = Future.succeededFuture();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();Promise<byte[]> promise = Promise.promise();blockingFuture = blockingFuture.compose(suc -> singleExecuteQuery(slaveId, promise, registerType, paramConfig),err -> singleExecuteQuery(slaveId, promise, registerType, paramConfig));futures.add(blockingFuture);}return commonReplyResult(futures, paramConfigs);}private Future<Object> singleExecuteQuery(int slaveId, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {return vertx.executeBlocking(h -> {Buffer buffer = getQueryCmd(paramConfig.getRegisterAddress(), 1, slaveId, paramConfig.getRegisterType(), promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {switch (registerType) {case COIL:h.complete(Integer.valueOf(buf[1]) == 1);break;case INPUT_REGISTER:case HOLDING_REGISTER:h.complete(getValue(ByteUtil.bytesToInt(buf, 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));break;}}).onFailure(err -> {log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",ip, port, slaveId, paramConfig.getName(), err.getMessage());h.tryFail(err.getMessage());});});}/*** 如果所有参数寄存器类型一致并且地址连续 则合并成一条命令下发* @param paramConfigs* @return 是否可以合并下发命令*/private boolean isMergeSendCmd(List<ModbusParamConfig> paramConfigs) {if (paramConfigs.size() == 1) {return false;}int lastPos = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();for (int i = 1; i < paramConfigs.size(); i++) {int curPos = paramConfigs.get(i).getRegisterAddress();if (curPos - lastPos != 1) {return false;}ModbusParamConfig.RegisterType curRegisterType = paramConfigs.get(i).getRegisterType();if (registerType != curRegisterType) {return false;}lastPos = curPos;}return true;}/*** 获取查询数据命令* @param startPos 查询地址* @param num 查询数量* @param slaveId 从机ID* @param registerType 寄存器类型* @param promise* @return*/private Buffer getQueryCmd(int startPos, int num, int slaveId, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {byte[] bytes = new byte[6];bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];switch (registerType) {case COIL:bytes[1] = ModbusFunc.READ_COILS.getFunc();break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.READ_HOLDING_REGISTERS.getFunc();break;case INPUT_REGISTER:bytes[1] = ModbusFunc.READ_INPUT_REGISTERS.getFunc();break;}Integer func = ByteUtil.bytesToInt(bytes, 1, 1);String key = String.format("%s_%s", slaveId, func);byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);bytes[2] = startPosBytes[2];bytes[3] = startPosBytes[3];byte[] numBytes = ByteUtil.intToBytes(num, 0, 4);bytes[4] = numBytes[2];bytes[5] = numBytes[3];Buffer buffer = new BufferImpl();buffer.appendBytes(mode.writeData(bytes));promiseMap.put(key, promise);long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));promise.future().onComplete(res -> {promiseMap.remove(key);vertx.cancelTimer(timeId);});return buffer;}/*** 获取写数据命令* @param startPos 查询地址* @param slaveId 从机ID* @param reqParam 写参数* @param keys 参数列表* @param registerType 寄存器类型* @param promise* @return*/private Buffer getWriteCmd(int startPos, int slaveId, JSONObject reqParam,List<String> keys, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {int len = keys.size() == 1 ? 6 : (registerType == ModbusParamConfig.RegisterType.HOLDING_REGISTER ?7 + keys.size() * 2 : 7 + Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue());byte[] bytes = new byte[len];bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);bytes[2] = startPosBytes[2];bytes[3] = startPosBytes[3];if (keys.size() == 1) {switch (registerType) {case COIL:bytes[1] = ModbusFunc.WRITE_SINGLE_COILS.getFunc();boolean value = reqParam.getBoolean(keys.get(0));if (value) {bytes[4] = (byte) 0xFF;} else {bytes[4] = 0x00;}bytes[5] = 0x00;break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.WRITE_SINGLE_HOLDING_REGISTERS.getFunc();byte[] dataArr = ByteUtil.intToBytes(reqParam.getInteger(keys.get(0)), 2, 2);bytes[4] = dataArr[0];bytes[5] = dataArr[1];break;}} else {byte[] dataNum = ByteUtil.intToBytes(keys.size(), 2, 2);bytes[4] = dataNum[0];bytes[5] = dataNum[1];switch (registerType) {case COIL:bytes[1] = ModbusFunc.WRITE_MULTI_COILS.getFunc();int dataSize = Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue();bytes[6] = ByteUtil.intToBytes(dataSize, 3, 1)[0];for (int i = 0; i < dataSize; i += 2) {int sum = 0;int startIndex = i * 8;int endIndex = (i + 2) * 8;endIndex = endIndex > keys.size() ? keys.size() : endIndex;for (int j = startIndex; j < endIndex; j++) {sum += Double.valueOf(Math.pow(2, j)).intValue() * (reqParam.getBoolean(keys.get(j)) ? 1 : 0);}byte[] sumArr = ByteUtil.intToBytes(sum, 2, 2);if (i + 8 < keys.size()) {bytes[i + 7] = sumArr[0];bytes[i + 8] = sumArr[1];} else {bytes[i + 7] = sumArr[1];}}break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.WRITE_MULTI_HOLDING_REGISTERS.getFunc();bytes[6] = ByteUtil.intToBytes(keys.size() * 2, 3, 1)[0];for (int i = 0; i < keys.size(); i++) {String paramKey = keys.get(i);Integer value = reqParam.getInteger(paramKey);byte[] dataArr = ByteUtil.intToBytes(value, 2, 2);bytes[i * 2 + 7] = dataArr[0];bytes[i * 2 + 8] = dataArr[1];}break;}}Integer func = ByteUtil.bytesToInt(bytes, 1, 1);String key = String.format("%s_%s", slaveId, func);Buffer buffer = new BufferImpl();buffer.appendBytes(mode.writeData(bytes));promiseMap.put(key, promise);long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));promise.future().onComplete(res -> {promiseMap.remove(key);vertx.cancelTimer(timeId);});return buffer;}private Future<JSONObject> commonReplyResult(List<Future<Object >> futures, List<ModbusParamConfig> paramConfigs) {return vertx.executeBlocking(b -> {Future.join(futures).onComplete(h -> {JSONObject okJson = new JSONObject();JSONObject errJson = new JSONObject();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);Future<Object> objectFuture = futures.get(i);if (objectFuture.succeeded()) {okJson.put(paramConfig.getName(), objectFuture.result());} else {errJson.put(paramConfig.getName(), objectFuture.cause().getMessage());}}if (okJson.size() > 0) {b.tryComplete(okJson);} else {b.tryFail(errJson.getString(paramConfigs.get(0).getName()));}});});}private Object getValue(int value, int numberSplit, ModbusParamConfig.DataType dataType) {if (numberSplit == 1) {return value;}Float temp = value * 1f / numberSplit;switch (dataType) {case INT :return Math.round(temp);case FLOAT:return temp;}return temp;}}
测试
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.core.ModbusConnection;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.model.SendCmdTask;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;import java.util.List;@Log4j2
public class TestModbus {public static final String READ_DATA = "[" +" {" +" \"name\": \"a\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 504," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"b\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 505," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"c\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 506," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"d\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 507," +" \"dataType\": \"INT\"," +" \"numberSplit\": 1" +" }," +" {" +" \"name\": \"e\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 508," +" \"dataType\": \"INT\"," +" \"numberSplit\": 1" +" }]";private static final String WRITE_DATA = "[" +" {" +" \"name\": \"do0\"," +" \"registerType\": \"COIL\"," +" \"registerAddress\": 20," +" \"dataType\": \"BOOL\"," +" \"numberSplit\": 1" +" }" +" ,{" +" \"name\": \"do1\"," +" \"registerType\": \"COIL\"," +" \"registerAddress\": 21," +" \"dataType\": \"BOOL\"," +" \"numberSplit\": 1" +" }" +"]";public static void main(String[] args) {testReadData();
// testWriteData();;}private static void testWriteData() {Vertx vertx = Vertx.vertx();ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);Future<Boolean> connectFuture = connection.connect();JSONObject reqParam = new JSONObject();reqParam.put("do0", false);reqParam.put("do1", false);List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(WRITE_DATA, ModbusParamConfig.class);connectFuture.onComplete(con -> {if (connectFuture.succeeded()) {SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 21, 10);Promise<JSONObject> promise = connection.offerTask(task);promise.future().onSuccess(suc -> {log.info("read:"+suc);}).onFailure(err -> System.err.println(err.getMessage()));SendCmdTask task2 = new SendCmdTask(vertx, modbusParamConfigs, reqParam, true, 21, 10);Promise<JSONObject> promise2 = connection.offerTask(task2);promise2.future().onSuccess(suc -> {log.info("write:"+suc);}).onFailure(err -> System.err.println(err.getMessage()));} else {System.err.println("gateway offline");}});}private static void testReadData() {Vertx vertx = Vertx.vertx();ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);Future<Boolean> connectFuture = connection.connect();List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(READ_DATA, ModbusParamConfig.class);connectFuture.onComplete(con -> {if (connection.isActive()) {SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 2, 10);Promise<JSONObject> promise = connection.offerTask(task);promise.future().onSuccess(suc -> {log.info(suc);}).onFailure(err -> System.err.println(err.getMessage()));} else {System.err.println("gateway offline");}});}
}
运行结果如下:
其实这两个读写示例如果是一个网关可以共用一个Modbus连接。
modbus-app配置参数
格式如下:
{"readable": {"devType01": {"ReportData": [{"name" : "xxx","registerType" : "COIL","registerAddress" : 1,"dataType" : "BOOL","numberSplit" : 1}]},"devType02": {"ReportData": [{"name" : "a","registerType" : "HOLDING_REGISTER","registerAddress" : 1,"dataType" : "INT","numberSplit" : 1},{"name" : "b","registerType" : "HOLDING_REGISTER","registerAddress" : 2,"dataType" : "INT","numberSplit" : 10},{"name": "c","registerType": "","dataType": "FLOAT","mbScript": "(a*10000+b)/10"}]}},"writable": {"devType01": {"Control": [{"name": "operation","registerType": "COIL","registerAddress": 21,"dataType": "BOOL","numberSplit": 1}]}},"readDataPeriods": [{"period" : 60,"deviceTypes": ["devType01"]},{"period" : 600,"deviceTypes": ["devType02","devType03"]}]
}
具体怎么实现这边就不过多讲解了…
结束
不保证代码正确,我这边只是大概实现了一下,仅供参考。若有问题,请批评指出,我会虚心接受并积极修复问题。
相关文章:

Java实现Modbus读写数据
背景 由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题: 关闭连接NioEventLoop没有释放导致oom设计思想是一个设备一个连接,而不是一个网关一个连接连接断开后客户端无从感知 前两个问题解…...

C++11新特性⑤ | 仿函数与lambda表达式
目录 1、引言 2、仿函数 3、lambda表达式 3.1、lambda表达式的一般形式 3.2、返回类型说明 3.3、捕获列表的规则 3.4、可以捕获哪些变量 3.5、lambda表达式给编程带来的便利 VC常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...&a…...
解决websocket不定时出现1005错误
后台抛出异常如下: Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005 Caused by: java.lang.IllegalArgume…...
文章内容生成随机图像,并将这些图像上链
一、需求背景 在当前的互联网时代,信息越来越快速地传播,一篇好的文章不仅需要有吸引人的文字内容,还需要有精美的配图。但是,对于某些只有文字,而没有图片的文章,我们可以使用程序去生成随机的图片来作为文章的配图。 本文将详细介绍如何使用Java语言实现文章内容生成…...

l8-d9 UDP通信实现
一、函数接口扩展与UDP通信实现流程 1.write/read到send/recv 函数原型: ssize_t send(int sockfd, const void *buf, size_t len, int flags); ssize_t recv(int sockfd, void *buf, size_t len, int flags); 前三个参数同read/write一样; ssize_t rea…...
MongoDB复杂聚合查询与java中MongoTemplate的api对应
MongoDB聚合json脚本 db.getCollection("202303_refund").aggregate([{"$match": {"courseType": "常规班课","teacherRefundReasonCheck": true,"teacherId": {"$in": [7544]},"createTime"…...

WireShark抓包工具的安装
1.下载安装包 在官网或者电脑应用商城都可以下载 2.安装 打开安装包,点击next 点击next 选择UI界面,两种都装上 根据习惯选择 选择安装位置点击安装 开始安装安装成功...
审计智能合约的成本是多少?如何审计智能合约?
审计智能合约的成本是多少?如何审计智能合约? 智能合约安全审计在去中心化金融 (DeFi) 生态系统中非常普遍。如果您投资了一个区块链项目,您的决定可能部分基于智能合约代码审查的结果。 虽然大多数人都了解审计对网络安全的重要性ÿ…...
9.7 校招 内推 面经
绿泡*泡: neituijunsir 交流裙 ,内推/实习/校招汇总表格 1、校招 | Momenta 2024校招火热进行中!新增招聘岗位(内推) 校招 | Momenta 2024校招火热进行中!新增招聘岗位(内推) 2、…...

【网络编程】IO多路复用
IO多路复用是一种高效的I/O处理方式,它允许单个进程能够同时监视多个文件描述符(sockets、文件等),并在其中任何一个文件描述符准备好进行I/O操作时进行处理。它的核心在于使用少量的线程或进程来管理多个I/O操作,以提…...
MySQL与postgreSQL数据库的区别
MySQL 是一个流行的开源关系型数据库管理系统,具有以下优势: 开源和免费:MySQL 是一个开源软件,允许用户免费下载、使用和修改。它的免费版本(Community Edition)提供了广泛的功能,适用于大多数…...

单片机电子元器件-按键
电子元器件 按键上有 四个引脚 1 2 、 3 4 按下之后 导通 1 3 、 2 4 初始导通 通常按键开关为机械弹性开关,开关在闭合不会马上稳定的接通,会有一连串的抖动 抖动时间的长短有机械特性来决定的,一般为5ms 到10 ms 。 消抖的分类 硬件消…...

Nacos docker实现nacos高可用集群项目
目录 Nacos是什么? Nacos在公司里的运用是什么? 使用docker构建nacos容器高可用集群 实验规划图:编辑 1、拉取nacos镜像 2、创建docker网桥(实现集群内的机器的互联互通(所有的nacos和mysql)&#x…...

基于Dubbo实现服务的远程调用
目录 前言 RPC思想 为什么使用Dubbo Dubbo技术框架 编辑 调用关系流程 基础实现 A.提供统一业务Api B.编辑服务提供者Product B.a 添加依赖 B.b 添加Dubbo 配置(基于yaml配置文件) B.c 编写并暴露服务 C.编辑服务消费者 C.a 添加依赖 C.b 添加Dubbo配置 C.c 引用…...

Redis事务的理解
介绍 Redis通过MULTI、EXEC、WATCH等命令来实现事务功能。 事务提供了一种将多个命令请求打包,然后一次性、按照顺序地执行多个命令的机制,并且在事务执行期间,服务器不会因为其他客户端请求而中断事务的执行功能,他会将事务中的…...

PostgreSQL安装异常,服务无法启动导致创建服务器超时
win上安装pg后无法创建服务器,提示创建超时,发现服务列表里面pg15服务 并没有启动,启动服务器发现服务不了,截图忘记截了,复现不了,解决方法是 换个身份,然后继续启动,然后就可以在…...

汽车电子系统网络安全解决方案
声明 本文是学习GB-T 38628-2020 信息安全技术 汽车电子系统网络安全指南. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 汽车电子系统网络安全范围 本标准给出了汽车电子系统网络安全活动框架,以及在此框架下的汽车电子系统网络安全活动…...

切片机制和MR工作机制
切片机制 默认的切片大小和块大小一致,切片的个数决定了MapTask的个数。 数据倾斜问题:如果某个切片的大小太小,会浪费了MapTask申请的CPU资源。 如果剩余数据长度大于128*1.1, 就切片成2份,否则就不进行切分了。 InputFormat基…...
【postgresql 基础入门】基础架构和命名空间层次,查看数据库对象再也不迷路
postgresql 基础架构 专栏内容: postgresql内核源码分析手写数据库toadb并发编程 开源贡献: toadb开源库 个人主页:我的主页 管理社区:开源数据库 座右铭:天行健,君子以自强不息;地势坤&…...
是的,决定放弃算法去机器学习了
可是梦想啊!~她永存心间!!! 我啊~本是执着于这些算法的怪咖,梦想是icpc,ccpc~ 可是啊~ 在以后的科研和工作中,这些算法很多都是用不到的,学习算法更重要的目的是锻炼编程能力和分析…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...

ZYNQ学习记录FPGA(一)ZYNQ简介
一、知识准备 1.一些术语,缩写和概念: 1)ZYNQ全称:ZYNQ7000 All Pgrammable SoC 2)SoC:system on chips(片上系统),对比集成电路的SoB(system on board) 3)ARM:处理器…...
《Offer来了:Java面试核心知识点精讲》大纲
文章目录 一、《Offer来了:Java面试核心知识点精讲》的典型大纲框架Java基础并发编程JVM原理数据库与缓存分布式架构系统设计二、《Offer来了:Java面试核心知识点精讲(原理篇)》技术文章大纲核心主题:Java基础原理与面试高频考点Java虚拟机(JVM)原理Java并发编程原理Jav…...

ArcGIS Pro+ArcGIS给你的地图加上北回归线!
今天来看ArcGIS Pro和ArcGIS中如何给制作的中国地图或者其他大范围地图加上北回归线。 我们将在ArcGIS Pro和ArcGIS中一同介绍。 1 ArcGIS Pro中设置北回归线 1、在ArcGIS Pro中初步设置好经纬格网等,设置经线、纬线都以10间隔显示。 2、需要插入背会归线…...

【技巧】dify前端源代码修改第一弹-增加tab页
回到目录 【技巧】dify前端源代码修改第一弹-增加tab页 尝试修改dify的前端源代码,在知识库增加一个tab页"HELLO WORLD",完成后的效果如下 [gif01] 1. 前端代码进入调试模式 参考 【部署】win10的wsl环境下启动dify的web前端服务 启动调试…...

多模态学习路线(2)——DL基础系列
目录 前言 一、归一化 1. Layer Normalization (LN) 2. Batch Normalization (BN) 3. Instance Normalization (IN) 4. Group Normalization (GN) 5. Root Mean Square Normalization(RMSNorm) 二、激活函数 1. Sigmoid激活函数(二分类&…...