spring boot+netty 搭建MQTT broken
一、项目结构

二、安装依赖
<!-- netty包 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!-- 常用JSON工具包 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.80</version></dependency><!--mqtt服务端--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
三、配置文件
server:port: 8880
netty:mqtt:port: 1883
# mqtt账号username: admin
#mqtt密码password: 123456
# 日记配置
logging:level:# 开启debug日记打印com.hyx: debug
四、新建具体类
1、新建BootNettyMqttMsgBack类
package com.example.springnettymqtt.MQTTServer.callback;import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import io.netty.channel.ChannelId;
import io.netty.handler.codec.mqtt.*;import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Component
@RequiredArgsConstructor
public class BootNettyMqttMsgBack {private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);private final MQTTServerProperties MQTTserverProperties;/*** 确认连接请求* @param channel* @param mqttMessage*/public void connack (Channel channel, MqttMessage mqttMessage) {MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();// 构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建CONNACK消息体MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());log.debug("设备上线,channelId:{}", channel.id());MQTTdeviceAdd(channel);channel.writeAndFlush(connAck);}public void disconnack (Channel channel, MqttMessage mqttMessage) {MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();// 构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建CONNACK消息体MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());channel.writeAndFlush(connAck);log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);}/*** 根据qos发布确认* @param channel* @param mqttMessage*/public void puback (Channel channel, MqttMessage mqttMessage) {MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();MqttQoS qos = mqttFixedHeaderInfo.qosLevel();//注意: readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置 读指针byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];mqttPublishMessage.payload().readBytes(headBytes);String data = new String(headBytes);System.out.println("publish data-->"+data);//重置读取的指针mqttPublishMessage.payload().resetReaderIndex();switch (qos) {case AT_MOST_ONCE: // 至多一次//推送到订阅的客户端subscribSend(mqttMessage);break;case AT_LEAST_ONCE: // 至少一次// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建PUBACK消息体MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);log.info("back--"+pubAck.toString());channel.writeAndFlush(pubAck);//推送到订阅的客户端subscribSend(mqttMessage);break;case EXACTLY_ONCE: // 刚好一次// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);//服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,// 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。//接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。int mqttMessageId=mqttPublishMessage.variableHeader().packetId();if(!mqttMessageIdMap.containsKey(mqttMessageId)){//不存在此消息,将此消息暂存mqttMessageIdMap.put(mqttMessageId, mqttMessage);log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");}else{//重复发送消息,直接返回log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());return;}channel.writeAndFlush(mqttMessageBack);break;default:break;}}/*** 发布完成 qos2* @param channel* @param mqttMessage*/public void pubcomp (Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);//log.info("back--"+mqttMessageBack.toString());channel.writeAndFlush(mqttMessageBack);}/*** 订阅确认* @param channel* @param mqttMessage*/public void suback(Channel channel, MqttMessage mqttMessage) {MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();// 构建返回报文, 可变报头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());//log.info(topics.toString());List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());for (int i = 0; i < topics.size(); i++) {grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());}// 构建返回报文 有效负载MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);// 构建返回报文 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());// 构建返回报文 订阅确认MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);channel.writeAndFlush(subAck);}/*** 取消订阅确认* @param channel* @param mqttMessage*/public void unsuback(Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();// 构建返回报文 可变报头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());// 构建返回报文 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);// 构建返回报文 取消订阅确认MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);channel.writeAndFlush(unSubAck);}/*** 心跳响应* @param channel* @param mqttMessage*/public void pingresp (Channel channel, MqttMessage mqttMessage) {// 心跳响应报文 11010000 00000000 固定报文MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);channel.writeAndFlush(mqttMessageBack);}/*** 订阅推送*/public void subscribSend(MqttMessage mqttMessage){MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;Object obj=mqttMessage.variableHeader();MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;String topicName=variableHeader.topicName();int packetId=variableHeader.packetId();//固定消息头 注意此处的消息类型PUBLISH mqtt协议MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_LEAST_ONCE,false,0);//可变消息头MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader(topicName,packetId);//推送消息体MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, mqttPublishMessage.content());log.info("推送地址————》"+topicName);if(subscribeMap.containsKey(topicName)){List<ChannelId> channelList=subscribeMap.get(topicName);for (int i = 0; i < channelList.size(); i++) {//订阅次此topic的Mqtt客户端搜到此消息,Channel channelSub=MQTTdeviceChannelGroup.find(channelList.get(i));//writeAndFlush会将ByteBuf的引用释放refCnt会减去1,使用retain加1mqttPublishMessageResult.retain();channelSub.writeAndFlush(mqttPublishMessageResult);}mqttPublishMessageResult.release();}}/*** 用户鉴权*/public boolean authentication(MqttConnectPayload payload){String username=MQTTserverProperties.getUsername();String password=MQTTserverProperties.getPassword();//无账号或者无密码通过if(stringEmptyCheck(password)||stringEmptyCheck(username)){return true;}else {//消息中账号密码为空if(payload.passwordInBytes()==null||payload.userName()==null){return false;}String passwordAuthen=new String(payload.passwordInBytes());String usernameAuthen=payload.userName();if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){return true;}else {return false;}}}//判断字符字符为空private boolean stringEmptyCheck(String str){if(str==null||"".equals(str)){return true;}else {return false;}}
}
2、新建MqttChannelInit类
package com.example.springnettymqtt.MQTTServer.channel;import com.example.springnettymqtt.MQTTServer.handler.MQTTMessageHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component
@RequiredArgsConstructor
public class MqttChannelInit extends ChannelInitializer<SocketChannel> {@Autowiredprivate MQTTMessageHandler MQTTmessageHandler;@Overrideprotected void initChannel(SocketChannel channel) {channel.pipeline()// 心跳时间.addLast("idle", new IdleStateHandler(600, 600, 1200, TimeUnit.SECONDS)).addLast("encoder", MqttEncoder.INSTANCE).addLast("decoder", new MqttDecoder()).addLast(MQTTmessageHandler);}
}
3、新建MQTTDeviceManerger类
package com.example.springnettymqtt.MQTTServer.channel;import lombok.extern.slf4j.Slf4j;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.Map;import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Slf4j
public class MQTTDeviceManerger {/*** 设备接入*/public static void MQTTdeviceAdd(Channel channel) {if(!MQTTdeviceChannelGroup.contains(channel)) {MQTTdeviceChannelGroup.add(channel);}}/*** 设备移除和和订阅的topic*/public static void MQTTdeviceRemove(Channel channel) {if(MQTTdeviceChannelGroup.contains(channel)) {MQTTdeviceChannelGroup.remove(channel);MQTTremoveDeviceChannelId(channel.id());//移除topic中的这个设备的chanelidfor (Map.Entry<String, List<ChannelId>> listEntry : subscribeMap.entrySet()) {try {if (listEntry.getValue().contains(channel.id())) {listEntry.getValue().remove(channel.id());log.info(channel.id() + "下线,topic: " + listEntry.getKey() + "中移除此id");}} catch (Exception e) {e.printStackTrace();}}}}public static void MQTTremoveDeviceChannelId(ChannelId channelId) {MQTTdeviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));}
}
4、新建配置类
package com.example.springnettymqtt.MQTTServer.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Configuration
@ConfigurationProperties(prefix = MQTTServerProperties.MQTTPREFIX)
@Data
public class MQTTServerProperties {public static final String MQTTPREFIX = "netty.mqtt";/*** 服务器端口*/private Integer port;/*** mqtt服务器用户名*/private String username;/*** mqtt服务器密码*/private String password;
}
5、新建MQTTMessageHandler类
package com.example.springnettymqtt.MQTTServer.handler;import com.example.springnettymqtt.MQTTServer.callback.BootNettyMqttMsgBack;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {@Autowiredprivate BootNettyMqttMsgBack bootNettyMqttMsgBack;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (null != msg) {MqttMessage mqttMessage = (MqttMessage) msg;log.info("info--"+mqttMessage.toString());MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();Channel channel = ctx.channel();if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){//用户鉴权(配置文件中配置账号和密码,如果没有默认)log.warn("正在尝试鉴权");boolean authentag= bootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());if(!authentag){return;}// 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接if(MQTTdeviceChannelGroup.contains(channel)){//移除次设备channel和topicbootNettyMqttMsgBack.disconnack(channel,mqttMessage);}// to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息bootNettyMqttMsgBack.connack(channel, mqttMessage);}//对于没有鉴权的设备,请求不处理if(!MQTTdeviceChannelGroup.contains(channel)){log.warn(channel.id()+"无鉴权操作");return;}switch (mqttFixedHeader.messageType()){case PUBLISH: // 客户端发布消息// PUBACK报文是对QoS 1等级的PUBLISH报文的响应bootNettyMqttMsgBack.puback(channel, mqttMessage);break;// PUBREL Qos2级别消息,客户端返回case PUBREL:// PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应//服务端收到pubrel之后,正式将消息投递给上层应用层。MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();if(mqttMessageIdMap.containsKey(VariableHeader.messageId())) {log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());bootNettyMqttMsgBack.subscribSend(mqttMessageIdMap.get(VariableHeader.messageId()));bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);mqttMessageIdMap.remove(VariableHeader.messageId());}else {//后续多次收到REL消息,制作comp响应bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);}break;case SUBSCRIBE: // 客户端订阅主题// 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。// 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。// SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端// to dobootNettyMqttMsgBack.suback(channel, mqttMessage);MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);if(!channelIds.contains(channel.id())) {channelIds.add(channel.id());}else {log.warn(channel.id()+"重复订阅");}subscribeMap.put(topicname, channelIds);}else {List<ChannelId> channelIds=new ArrayList<>();channelIds.add(channel.id());subscribeMap.put(topicname,channelIds);}log.info(channel.id()+"订阅地址————》"+topicname);}break;case UNSUBSCRIBE: // 客户端取消订阅// 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题// to dobootNettyMqttMsgBack.unsuback(channel, mqttMessage);Object Unsubscribe=mqttMessage.payload();MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;int len=unsubscribePayload.topics().size();for (int i = 0; i < len; i++) {String topicname=unsubscribePayload.topics().get(i);boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);channelIds.remove(channel.id());subscribeMap.put(topicname,channelIds);}else {log.error("不存在订阅地址——>"+topicname);}log.info(channel.id()+"取消订阅地址————》"+topicname);}break;case PINGREQ: // 客户端发起心跳// 客户端发送PINGREQ报文给服务端的// 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着// 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开bootNettyMqttMsgBack.pingresp(channel, mqttMessage);break;case DISCONNECT: // 客户端主动断开连接log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);// DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0// to dobreak;default:break;}}else {return;}}/*** 从客户端收到新的数据、读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws IOException {}/*** 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);}/*** 客户端与服务端 断连时执行 channelInactive方法之后执行*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.warn(ctx.channel().id()+"连接断开");MQTTdeviceRemove(ctx.channel());super.channelUnregistered(ctx);}/*** 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {Channel channel = ctx.channel();log.warn(channel.id()+"连接异常断开。。。。。。。");MQTTdeviceRemove(ctx.channel());super.exceptionCaught(ctx, cause);if(channel.isActive()){ctx.close();}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.debug("\n");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}/*** 服务端 当读超时时 会调用这个方法*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {super.userEventTriggered(ctx, evt);ctx.close();}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {super.channelWritabilityChanged(ctx);}
}
6、新建接口IMQTTServer及其实现类MQTTServer
package com.example.springnettymqtt.MQTTServer.server;import javax.annotation.PreDestroy;public interface IMQTTServer {/*** 主启动程序,初始化参数** @throws Exception 初始化异常*/void start() throws Exception;/*** 优雅的结束服务器** @throws InterruptedException 提前中断异常*/@PreDestroyvoid destroy() throws InterruptedException;
}
package com.example.springnettymqtt.MQTTServer.server.impl;import com.example.springnettymqtt.MQTTServer.channel.MqttChannelInit;
import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import com.example.springnettymqtt.MQTTServer.server.IMQTTServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Component
@Slf4j
@RequiredArgsConstructor
public class MQTTServer implements IMQTTServer {private final MqttChannelInit mqttChannelInit;private final MQTTServerProperties MQTTserverProperties;//保存接入的MQTT设备channelpublic static ChannelGroup MQTTdeviceChannelGroup;//保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据public static Map<String, List<ChannelId>> subscribeMap =new ConcurrentHashMap();//保存设备名称和通道编号,向设备发送消息可以通过名称找到通道public static ConcurrentHashMap<String, ChannelId> MQTTdeviceMap = new ConcurrentHashMap<>();//存放Qos2消息等级的消息ID,这里可使用redis之类的工具做缓存,为了简化配置,使用map暂存public static ConcurrentHashMap<Integer, MqttMessage> mqttMessageIdMap=new ConcurrentHashMap();private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;@Overridepublic void start() {log.info("初始化 Mqttserver ...");bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();MQTTdeviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);this.MqttServer();}/*** 初始化*/private void MqttServer() {try {new ServerBootstrap().group(bossGroup, workerGroup).channel( NioServerSocketChannel.class ).localAddress(new InetSocketAddress(MQTTserverProperties.getPort()))// 配置 编码器、解码器、业务处理.childHandler(mqttChannelInit)// tcp缓冲区.option(ChannelOption.SO_BACKLOG, 128)// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true.childOption(ChannelOption.TCP_NODELAY, true)// 保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true)// 绑定端口,开始接收进来的连接.bind().sync();log.info("MQTT服务启动成功!开始监听端口:{}", MQTTserverProperties.getPort());} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}/*** 销毁*/@PreDestroy@Overridepublic void destroy() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}
7、新建启动类
package com.example.springnettymqtt.startServer;import com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class StartSrver {@Autowiredprivate MQTTServer mqttServer;@PostConstructpublic void startNetty(){new Thread(() -> {try {mqttServer.start();} catch (Exception e) {e.printStackTrace();}}).start();}
}相关文章:
spring boot+netty 搭建MQTT broken
一、项目结构 二、安装依赖 <!-- netty包 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!-- 常用JSON工具包 --><…...
从零开始搭建React+TypeScript+webpack开发环境-使用iconfont构建图标库
创建iconfont项目 进入iconfont官网,完成注册流程,即可创建项目。 无法访问iconfont可尝试将电脑dns改为阿里云镜像223.5.5.5和223.6.6.6 添加图标 在图标库里选择图标,加入购物车 将图标添加到之前创建的项目中 生成代码 将代码配置到项目…...
微服务之初始微服务
文章目录 一、服务架构演变1.单体架构2.分布式架构 二、认识微服务三、总结四、微服务技术对比五、SpringCloud注意 一、服务架构演变 1.单体架构 单体架构:将业务的所有功能集中在一个项目中开发,打成一个包部署。 优点: 架构简单部署成本…...
大口径智能水表支持最高水流量是多少?
随着科技的不断发展,我国城市化进程的加快,水资源管理日益受到重视。作为一种先进的用水计量设备,大口径智能水表凭借其高精度、低误差、远程抄表等优点,在市场上备受青睐。那么接下来,小编就来为大家详细的介绍一下大…...
在Spring Boot中使用MyBatis访问数据库
MyBatis,这个对各位使用Java开发的开发者来说还是蛮重要的,我相信诸位在企业开发项目的时候,大多数采用的是Mybatis。使用MyBatis帮助我们解决各种问题,实际上这篇文章,基本上默认为可以跳过的一篇,但是为了…...
懒羊羊闲话2
前言: 笔者谈不上是某个领域的高手,也不是大厂的某个神秘高手,一直游离于小型公司,写下这篇文章献给那些无法接触到好的学习环境,苦恼自己原地踏步的coder。 1、如何快速熟悉某个行业 作为一个编码多年的程序员&#…...
多路转接(上)——select
目录 一、select接口 1.认识select系统调用 2.对各个参数的认识 二、编写select服务器 1.两个工具类 2.网络套接字封装 3.服务器类编写 4.源文件编写 5.运行 一、select接口 1.认识select系统调用 int select(int nfds, fd_set readfds, fd_set writefds, fd_set ex…...
基于SSM的图书管理借阅系统设计与实现
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…...
Python的内存优化
在Python中,内存管理和优化是一个复杂的话题,因为它涉及到Python解释器的内部机制,特别是Python的垃圾收集和内存分配策略。Python通过自动垃圾收集机制管理内存,主要包括引用计数和标记-清除算法。 Python内存管理机制ÿ…...
蓝桥杯-回文日期[Java]
目录: 学习目标: 学习内容: 学习时间: 题目: 题目描述: 输入描述: 输出描述: 输入输出样例: 示例 1: 运行限制: 题解: 思路: 学习目标: 刷蓝桥杯题库日记 学习内容: 编号498题目回文日期难度…...
acwing算法基础之搜索与图论--树与图的遍历
目录 1 基础知识2 模板3 工程化 1 基础知识 树和图的存储:邻接矩阵、邻接表。 树和图的遍历:dfs、bfs。 2 模板 树是一种特殊的图(即,无环连通图),与图的存储方式相同。 对于无向图中的边ab,…...
前端uniapp请求真是案例(带源码)
目录 案例一案例二最后 案例一 <template><view class"box"><!-- <view class"title-back" click"backPrivious"><</view> --><!-- <view class"title-back" click"backPrivious"…...
MySQL -- mysql connect
MySQL – mysql connect 文章目录 MySQL -- mysql connect一、Connector/C 使用1.环境安装2.尝试链接mysql client 二、MySQL接口1.初始化2.链接数据库3.下发mysql命令4.获取执行结果5.关闭mysql链接6.在C语言中连接MySQL 三、MySQL图形化界面推荐 使用C接口库来进行连接 一、…...
如何用AI帮你下载安卓源码
以Android 11源码下载流程图如下所示: 1. 安装Git和Repo工具 2. 创建一个工作目录 3. 初始化仓库并下载源码 4. 切换到指定的分支 5. 编译源码 具体步骤如下: 安装Git和Repo工具:在Linux或Mac上,可以通过终端运行以下命令安装Gi…...
第三章:人工智能深度学习教程-基础神经网络(第三节-Tensorflow 中的多层感知器学习)
在本文中,我们将了解多层感知器的概念及其使用 TensorFlow 库在 Python 中的实现。 多层感知器 多层感知也称为MLP。它是完全连接的密集层,可将任何输入维度转换为所需的维度。多层感知是具有多个层的神经网络。为了创建神经网络,我们将神…...
Python的版本如何查询?
要查询Python的版本,可以使用以下方法之一: 1.在命令行中使用python --version命令。这会显示安装在计算机上的Python解释器的版本号。 # Author : 小红牛 # 微信公众号:wdPython2.在Python脚本中使用import sys语句,然后打印sy…...
Git的高效使用 git的基础 高级用法
Git的高效使用 git的基础 高级用法 前言 什么是Git 在日常的软件开发过程中,软件版本的管理都离不开使用Git,Git是一个开源的分布式版本控制系统,可以有效、高速地处理从很小到非常大的项目版本管理。 也是Linus Torvalds为了帮助管理Linu…...
关于主表和子表数据的保存
业务需求: 投注站信息保存在表A里,投注站下的设备信息保存在表B里, 一个投注站会有多个设备,要在一个表单里进行投注站和设备信息的填写,保存,回填,修改。 思路: 1)将…...
如何在后台执行 SwiftData 操作
文章目录 前言Core Data 私有队列上下文SwiftData 并发支持使用 ModelActor合并上下文更改的问题通过标识符访问模型总结 前言 SwiftData 是一个用于处理数据操作的框架,特别是在 Swift 语言中进行并发操作。本文介绍了如何在后台执行 SwiftData 操作以及与 Core D…...
TCP和UPD协议
一)应用层协议简介:根据需求明确要传输的信息,明确要传输的数据格式; 应用层协议:这个协议,实际上是和程序员打交道最多的协议了 1)其它四层都是操作系统,驱动,硬件实现好了的,咱们是不需要管 2)应用层:当我…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
Python Einops库:深度学习中的张量操作革命
Einops(爱因斯坦操作库)就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库,用类似自然语言的表达式替代了晦涩的API调用,彻底改变了深度学习工程…...
API网关Kong的鉴权与限流:高并发场景下的核心实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中,API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关,Kong凭借其插件化架构…...
Python实现简单音频数据压缩与解压算法
Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中,压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言,提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...
jdbc查询mysql数据库时,出现id顺序错误的情况
我在repository中的查询语句如下所示,即传入一个List<intager>的数据,返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致,会导致返回的id是从小到大排列的,但我不希望这样。 Query("SELECT NEW com…...
