spring boot+netty 搭建MQTT broken
一、项目结构
二、安装依赖
<!-- netty包 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!-- 常用JSON工具包 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.80</version></dependency><!--mqtt服务端--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
三、配置文件
server:port: 8880
netty:mqtt:port: 1883
# mqtt账号username: admin
#mqtt密码password: 123456
# 日记配置
logging:level:# 开启debug日记打印com.hyx: debug
四、新建具体类
1、新建BootNettyMqttMsgBack类
package com.example.springnettymqtt.MQTTServer.callback;import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import io.netty.channel.ChannelId;
import io.netty.handler.codec.mqtt.*;import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Component
@RequiredArgsConstructor
public class BootNettyMqttMsgBack {private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);private final MQTTServerProperties MQTTserverProperties;/*** 确认连接请求* @param channel* @param mqttMessage*/public void connack (Channel channel, MqttMessage mqttMessage) {MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();// 构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建CONNACK消息体MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());log.debug("设备上线,channelId:{}", channel.id());MQTTdeviceAdd(channel);channel.writeAndFlush(connAck);}public void disconnack (Channel channel, MqttMessage mqttMessage) {MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();// 构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建CONNACK消息体MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());channel.writeAndFlush(connAck);log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);}/*** 根据qos发布确认* @param channel* @param mqttMessage*/public void puback (Channel channel, MqttMessage mqttMessage) {MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();MqttQoS qos = mqttFixedHeaderInfo.qosLevel();//注意: readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置 读指针byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];mqttPublishMessage.payload().readBytes(headBytes);String data = new String(headBytes);System.out.println("publish data-->"+data);//重置读取的指针mqttPublishMessage.payload().resetReaderIndex();switch (qos) {case AT_MOST_ONCE: // 至多一次//推送到订阅的客户端subscribSend(mqttMessage);break;case AT_LEAST_ONCE: // 至少一次// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建PUBACK消息体MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);log.info("back--"+pubAck.toString());channel.writeAndFlush(pubAck);//推送到订阅的客户端subscribSend(mqttMessage);break;case EXACTLY_ONCE: // 刚好一次// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);//服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,// 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。//接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。int mqttMessageId=mqttPublishMessage.variableHeader().packetId();if(!mqttMessageIdMap.containsKey(mqttMessageId)){//不存在此消息,将此消息暂存mqttMessageIdMap.put(mqttMessageId, mqttMessage);log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");}else{//重复发送消息,直接返回log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());return;}channel.writeAndFlush(mqttMessageBack);break;default:break;}}/*** 发布完成 qos2* @param channel* @param mqttMessage*/public void pubcomp (Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);//log.info("back--"+mqttMessageBack.toString());channel.writeAndFlush(mqttMessageBack);}/*** 订阅确认* @param channel* @param mqttMessage*/public void suback(Channel channel, MqttMessage mqttMessage) {MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();// 构建返回报文, 可变报头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());//log.info(topics.toString());List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());for (int i = 0; i < topics.size(); i++) {grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());}// 构建返回报文 有效负载MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);// 构建返回报文 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());// 构建返回报文 订阅确认MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);channel.writeAndFlush(subAck);}/*** 取消订阅确认* @param channel* @param mqttMessage*/public void unsuback(Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();// 构建返回报文 可变报头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());// 构建返回报文 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);// 构建返回报文 取消订阅确认MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);channel.writeAndFlush(unSubAck);}/*** 心跳响应* @param channel* @param mqttMessage*/public void pingresp (Channel channel, MqttMessage mqttMessage) {// 心跳响应报文 11010000 00000000 固定报文MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);channel.writeAndFlush(mqttMessageBack);}/*** 订阅推送*/public void subscribSend(MqttMessage mqttMessage){MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;Object obj=mqttMessage.variableHeader();MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;String topicName=variableHeader.topicName();int packetId=variableHeader.packetId();//固定消息头 注意此处的消息类型PUBLISH mqtt协议MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_LEAST_ONCE,false,0);//可变消息头MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader(topicName,packetId);//推送消息体MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, mqttPublishMessage.content());log.info("推送地址————》"+topicName);if(subscribeMap.containsKey(topicName)){List<ChannelId> channelList=subscribeMap.get(topicName);for (int i = 0; i < channelList.size(); i++) {//订阅次此topic的Mqtt客户端搜到此消息,Channel channelSub=MQTTdeviceChannelGroup.find(channelList.get(i));//writeAndFlush会将ByteBuf的引用释放refCnt会减去1,使用retain加1mqttPublishMessageResult.retain();channelSub.writeAndFlush(mqttPublishMessageResult);}mqttPublishMessageResult.release();}}/*** 用户鉴权*/public boolean authentication(MqttConnectPayload payload){String username=MQTTserverProperties.getUsername();String password=MQTTserverProperties.getPassword();//无账号或者无密码通过if(stringEmptyCheck(password)||stringEmptyCheck(username)){return true;}else {//消息中账号密码为空if(payload.passwordInBytes()==null||payload.userName()==null){return false;}String passwordAuthen=new String(payload.passwordInBytes());String usernameAuthen=payload.userName();if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){return true;}else {return false;}}}//判断字符字符为空private boolean stringEmptyCheck(String str){if(str==null||"".equals(str)){return true;}else {return false;}}
}
2、新建MqttChannelInit类
package com.example.springnettymqtt.MQTTServer.channel;import com.example.springnettymqtt.MQTTServer.handler.MQTTMessageHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component
@RequiredArgsConstructor
public class MqttChannelInit extends ChannelInitializer<SocketChannel> {@Autowiredprivate MQTTMessageHandler MQTTmessageHandler;@Overrideprotected void initChannel(SocketChannel channel) {channel.pipeline()// 心跳时间.addLast("idle", new IdleStateHandler(600, 600, 1200, TimeUnit.SECONDS)).addLast("encoder", MqttEncoder.INSTANCE).addLast("decoder", new MqttDecoder()).addLast(MQTTmessageHandler);}
}
3、新建MQTTDeviceManerger类
package com.example.springnettymqtt.MQTTServer.channel;import lombok.extern.slf4j.Slf4j;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.Map;import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Slf4j
public class MQTTDeviceManerger {/*** 设备接入*/public static void MQTTdeviceAdd(Channel channel) {if(!MQTTdeviceChannelGroup.contains(channel)) {MQTTdeviceChannelGroup.add(channel);}}/*** 设备移除和和订阅的topic*/public static void MQTTdeviceRemove(Channel channel) {if(MQTTdeviceChannelGroup.contains(channel)) {MQTTdeviceChannelGroup.remove(channel);MQTTremoveDeviceChannelId(channel.id());//移除topic中的这个设备的chanelidfor (Map.Entry<String, List<ChannelId>> listEntry : subscribeMap.entrySet()) {try {if (listEntry.getValue().contains(channel.id())) {listEntry.getValue().remove(channel.id());log.info(channel.id() + "下线,topic: " + listEntry.getKey() + "中移除此id");}} catch (Exception e) {e.printStackTrace();}}}}public static void MQTTremoveDeviceChannelId(ChannelId channelId) {MQTTdeviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));}
}
4、新建配置类
package com.example.springnettymqtt.MQTTServer.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Configuration
@ConfigurationProperties(prefix = MQTTServerProperties.MQTTPREFIX)
@Data
public class MQTTServerProperties {public static final String MQTTPREFIX = "netty.mqtt";/*** 服务器端口*/private Integer port;/*** mqtt服务器用户名*/private String username;/*** mqtt服务器密码*/private String password;
}
5、新建MQTTMessageHandler类
package com.example.springnettymqtt.MQTTServer.handler;import com.example.springnettymqtt.MQTTServer.callback.BootNettyMqttMsgBack;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {@Autowiredprivate BootNettyMqttMsgBack bootNettyMqttMsgBack;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (null != msg) {MqttMessage mqttMessage = (MqttMessage) msg;log.info("info--"+mqttMessage.toString());MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();Channel channel = ctx.channel();if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){//用户鉴权(配置文件中配置账号和密码,如果没有默认)log.warn("正在尝试鉴权");boolean authentag= bootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());if(!authentag){return;}// 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接if(MQTTdeviceChannelGroup.contains(channel)){//移除次设备channel和topicbootNettyMqttMsgBack.disconnack(channel,mqttMessage);}// to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息bootNettyMqttMsgBack.connack(channel, mqttMessage);}//对于没有鉴权的设备,请求不处理if(!MQTTdeviceChannelGroup.contains(channel)){log.warn(channel.id()+"无鉴权操作");return;}switch (mqttFixedHeader.messageType()){case PUBLISH: // 客户端发布消息// PUBACK报文是对QoS 1等级的PUBLISH报文的响应bootNettyMqttMsgBack.puback(channel, mqttMessage);break;// PUBREL Qos2级别消息,客户端返回case PUBREL:// PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应//服务端收到pubrel之后,正式将消息投递给上层应用层。MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();if(mqttMessageIdMap.containsKey(VariableHeader.messageId())) {log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());bootNettyMqttMsgBack.subscribSend(mqttMessageIdMap.get(VariableHeader.messageId()));bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);mqttMessageIdMap.remove(VariableHeader.messageId());}else {//后续多次收到REL消息,制作comp响应bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);}break;case SUBSCRIBE: // 客户端订阅主题// 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。// 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。// SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端// to dobootNettyMqttMsgBack.suback(channel, mqttMessage);MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);if(!channelIds.contains(channel.id())) {channelIds.add(channel.id());}else {log.warn(channel.id()+"重复订阅");}subscribeMap.put(topicname, channelIds);}else {List<ChannelId> channelIds=new ArrayList<>();channelIds.add(channel.id());subscribeMap.put(topicname,channelIds);}log.info(channel.id()+"订阅地址————》"+topicname);}break;case UNSUBSCRIBE: // 客户端取消订阅// 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题// to dobootNettyMqttMsgBack.unsuback(channel, mqttMessage);Object Unsubscribe=mqttMessage.payload();MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;int len=unsubscribePayload.topics().size();for (int i = 0; i < len; i++) {String topicname=unsubscribePayload.topics().get(i);boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);channelIds.remove(channel.id());subscribeMap.put(topicname,channelIds);}else {log.error("不存在订阅地址——>"+topicname);}log.info(channel.id()+"取消订阅地址————》"+topicname);}break;case PINGREQ: // 客户端发起心跳// 客户端发送PINGREQ报文给服务端的// 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着// 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开bootNettyMqttMsgBack.pingresp(channel, mqttMessage);break;case DISCONNECT: // 客户端主动断开连接log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);// DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0// to dobreak;default:break;}}else {return;}}/*** 从客户端收到新的数据、读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws IOException {}/*** 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);}/*** 客户端与服务端 断连时执行 channelInactive方法之后执行*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.warn(ctx.channel().id()+"连接断开");MQTTdeviceRemove(ctx.channel());super.channelUnregistered(ctx);}/*** 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {Channel channel = ctx.channel();log.warn(channel.id()+"连接异常断开。。。。。。。");MQTTdeviceRemove(ctx.channel());super.exceptionCaught(ctx, cause);if(channel.isActive()){ctx.close();}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.debug("\n");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}/*** 服务端 当读超时时 会调用这个方法*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {super.userEventTriggered(ctx, evt);ctx.close();}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {super.channelWritabilityChanged(ctx);}
}
6、新建接口IMQTTServer及其实现类MQTTServer
package com.example.springnettymqtt.MQTTServer.server;import javax.annotation.PreDestroy;public interface IMQTTServer {/*** 主启动程序,初始化参数** @throws Exception 初始化异常*/void start() throws Exception;/*** 优雅的结束服务器** @throws InterruptedException 提前中断异常*/@PreDestroyvoid destroy() throws InterruptedException;
}
package com.example.springnettymqtt.MQTTServer.server.impl;import com.example.springnettymqtt.MQTTServer.channel.MqttChannelInit;
import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import com.example.springnettymqtt.MQTTServer.server.IMQTTServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Component
@Slf4j
@RequiredArgsConstructor
public class MQTTServer implements IMQTTServer {private final MqttChannelInit mqttChannelInit;private final MQTTServerProperties MQTTserverProperties;//保存接入的MQTT设备channelpublic static ChannelGroup MQTTdeviceChannelGroup;//保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据public static Map<String, List<ChannelId>> subscribeMap =new ConcurrentHashMap();//保存设备名称和通道编号,向设备发送消息可以通过名称找到通道public static ConcurrentHashMap<String, ChannelId> MQTTdeviceMap = new ConcurrentHashMap<>();//存放Qos2消息等级的消息ID,这里可使用redis之类的工具做缓存,为了简化配置,使用map暂存public static ConcurrentHashMap<Integer, MqttMessage> mqttMessageIdMap=new ConcurrentHashMap();private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;@Overridepublic void start() {log.info("初始化 Mqttserver ...");bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();MQTTdeviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);this.MqttServer();}/*** 初始化*/private void MqttServer() {try {new ServerBootstrap().group(bossGroup, workerGroup).channel( NioServerSocketChannel.class ).localAddress(new InetSocketAddress(MQTTserverProperties.getPort()))// 配置 编码器、解码器、业务处理.childHandler(mqttChannelInit)// tcp缓冲区.option(ChannelOption.SO_BACKLOG, 128)// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true.childOption(ChannelOption.TCP_NODELAY, true)// 保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true)// 绑定端口,开始接收进来的连接.bind().sync();log.info("MQTT服务启动成功!开始监听端口:{}", MQTTserverProperties.getPort());} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}/*** 销毁*/@PreDestroy@Overridepublic void destroy() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}
7、新建启动类
package com.example.springnettymqtt.startServer;import com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class StartSrver {@Autowiredprivate MQTTServer mqttServer;@PostConstructpublic void startNetty(){new Thread(() -> {try {mqttServer.start();} catch (Exception e) {e.printStackTrace();}}).start();}
}
相关文章:

spring boot+netty 搭建MQTT broken
一、项目结构 二、安装依赖 <!-- netty包 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!-- 常用JSON工具包 --><…...

从零开始搭建React+TypeScript+webpack开发环境-使用iconfont构建图标库
创建iconfont项目 进入iconfont官网,完成注册流程,即可创建项目。 无法访问iconfont可尝试将电脑dns改为阿里云镜像223.5.5.5和223.6.6.6 添加图标 在图标库里选择图标,加入购物车 将图标添加到之前创建的项目中 生成代码 将代码配置到项目…...

微服务之初始微服务
文章目录 一、服务架构演变1.单体架构2.分布式架构 二、认识微服务三、总结四、微服务技术对比五、SpringCloud注意 一、服务架构演变 1.单体架构 单体架构:将业务的所有功能集中在一个项目中开发,打成一个包部署。 优点: 架构简单部署成本…...

大口径智能水表支持最高水流量是多少?
随着科技的不断发展,我国城市化进程的加快,水资源管理日益受到重视。作为一种先进的用水计量设备,大口径智能水表凭借其高精度、低误差、远程抄表等优点,在市场上备受青睐。那么接下来,小编就来为大家详细的介绍一下大…...

在Spring Boot中使用MyBatis访问数据库
MyBatis,这个对各位使用Java开发的开发者来说还是蛮重要的,我相信诸位在企业开发项目的时候,大多数采用的是Mybatis。使用MyBatis帮助我们解决各种问题,实际上这篇文章,基本上默认为可以跳过的一篇,但是为了…...
懒羊羊闲话2
前言: 笔者谈不上是某个领域的高手,也不是大厂的某个神秘高手,一直游离于小型公司,写下这篇文章献给那些无法接触到好的学习环境,苦恼自己原地踏步的coder。 1、如何快速熟悉某个行业 作为一个编码多年的程序员&#…...

多路转接(上)——select
目录 一、select接口 1.认识select系统调用 2.对各个参数的认识 二、编写select服务器 1.两个工具类 2.网络套接字封装 3.服务器类编写 4.源文件编写 5.运行 一、select接口 1.认识select系统调用 int select(int nfds, fd_set readfds, fd_set writefds, fd_set ex…...

基于SSM的图书管理借阅系统设计与实现
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…...
Python的内存优化
在Python中,内存管理和优化是一个复杂的话题,因为它涉及到Python解释器的内部机制,特别是Python的垃圾收集和内存分配策略。Python通过自动垃圾收集机制管理内存,主要包括引用计数和标记-清除算法。 Python内存管理机制ÿ…...
蓝桥杯-回文日期[Java]
目录: 学习目标: 学习内容: 学习时间: 题目: 题目描述: 输入描述: 输出描述: 输入输出样例: 示例 1: 运行限制: 题解: 思路: 学习目标: 刷蓝桥杯题库日记 学习内容: 编号498题目回文日期难度…...
acwing算法基础之搜索与图论--树与图的遍历
目录 1 基础知识2 模板3 工程化 1 基础知识 树和图的存储:邻接矩阵、邻接表。 树和图的遍历:dfs、bfs。 2 模板 树是一种特殊的图(即,无环连通图),与图的存储方式相同。 对于无向图中的边ab,…...
前端uniapp请求真是案例(带源码)
目录 案例一案例二最后 案例一 <template><view class"box"><!-- <view class"title-back" click"backPrivious"><</view> --><!-- <view class"title-back" click"backPrivious"…...

MySQL -- mysql connect
MySQL – mysql connect 文章目录 MySQL -- mysql connect一、Connector/C 使用1.环境安装2.尝试链接mysql client 二、MySQL接口1.初始化2.链接数据库3.下发mysql命令4.获取执行结果5.关闭mysql链接6.在C语言中连接MySQL 三、MySQL图形化界面推荐 使用C接口库来进行连接 一、…...
如何用AI帮你下载安卓源码
以Android 11源码下载流程图如下所示: 1. 安装Git和Repo工具 2. 创建一个工作目录 3. 初始化仓库并下载源码 4. 切换到指定的分支 5. 编译源码 具体步骤如下: 安装Git和Repo工具:在Linux或Mac上,可以通过终端运行以下命令安装Gi…...

第三章:人工智能深度学习教程-基础神经网络(第三节-Tensorflow 中的多层感知器学习)
在本文中,我们将了解多层感知器的概念及其使用 TensorFlow 库在 Python 中的实现。 多层感知器 多层感知也称为MLP。它是完全连接的密集层,可将任何输入维度转换为所需的维度。多层感知是具有多个层的神经网络。为了创建神经网络,我们将神…...

Python的版本如何查询?
要查询Python的版本,可以使用以下方法之一: 1.在命令行中使用python --version命令。这会显示安装在计算机上的Python解释器的版本号。 # Author : 小红牛 # 微信公众号:wdPython2.在Python脚本中使用import sys语句,然后打印sy…...

Git的高效使用 git的基础 高级用法
Git的高效使用 git的基础 高级用法 前言 什么是Git 在日常的软件开发过程中,软件版本的管理都离不开使用Git,Git是一个开源的分布式版本控制系统,可以有效、高速地处理从很小到非常大的项目版本管理。 也是Linus Torvalds为了帮助管理Linu…...
关于主表和子表数据的保存
业务需求: 投注站信息保存在表A里,投注站下的设备信息保存在表B里, 一个投注站会有多个设备,要在一个表单里进行投注站和设备信息的填写,保存,回填,修改。 思路: 1)将…...

如何在后台执行 SwiftData 操作
文章目录 前言Core Data 私有队列上下文SwiftData 并发支持使用 ModelActor合并上下文更改的问题通过标识符访问模型总结 前言 SwiftData 是一个用于处理数据操作的框架,特别是在 Swift 语言中进行并发操作。本文介绍了如何在后台执行 SwiftData 操作以及与 Core D…...

TCP和UPD协议
一)应用层协议简介:根据需求明确要传输的信息,明确要传输的数据格式; 应用层协议:这个协议,实际上是和程序员打交道最多的协议了 1)其它四层都是操作系统,驱动,硬件实现好了的,咱们是不需要管 2)应用层:当我…...

记录一个用了很久的git提交到github和gitee比较方便的方法
在当前git init后,在隐藏的git文件夹中找到config文件 [user]name thels [remote "github"]url your github repository urlfetch refs/heads/*:refs/remotes/origin/* [remote "gitee"]url your gitee repository urlfetch refs/heads/*:…...

手机号在网状态查询接口如何用PHP实现调用?
一、什么是手机号在网状态查询接口 通过精准探测手机号的状态,帮助平台减少此类问题的发生,提供更个性化的服务或进行地域性营销 二、应用场景 1. 金融风控 通过运营商在网态查询接口,金融机构可以核验贷款申请人的手机状态,拦…...
Fullstack 面试复习笔记:HTML / CSS 基础梳理
Fullstack 面试复习笔记:HTML / CSS 基础梳理 之前的笔记: Fullstack 面试复习笔记:操作系统 / 网络 / HTTP / 设计模式梳理Fullstack 面试复习笔记:Java 基础语法 / 核心特性体系化总结Fullstack 面试复习笔记:项目…...

Flutter知识点汇总
Flutter架构解析 1. Flutter 是什么?它与其他移动开发框架有什么不同? Flutter 是 Google 开发的开源移动应用开发框架,可用于快速构建高性能、高保真的移动应用(iOS 和 Android),也支持 Web、桌面和嵌入式设备。。它与其他移动开发框架(如 React Native、Xamarin、原…...

RockyLinux9.6搭建k8s集群
博主介绍:✌全网粉丝5W,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验…...

Windows账户管理,修改密码,创建帐户...(无需密码)
前言 我们使用wWindows操作系统时,账户是非常重要的概念 它不仅能够帮助我们区分文档主题权限等等 嗯还有最重要的解锁电脑的作用! 但想要管理他,不仅需要原本的密码,而且设置中的管理项也非常的不全。 Windows有一款netplwi…...

gitlab CI/CD本地部署配置
背景: 代码管理平台切换为公司本地服务器的gitlab server。为了保证commit的代码至少编译ok,也为了以后能拓展test cases,现在先搭建本地gitlab server的CI/CD基本的编译job pipeline。 配置步骤: 先安装gitlab-runner: curl -L "ht…...

python打卡day46@浙大疏锦行
知识点回顾: 不同CNN层的特征图:不同通道的特征图什么是注意力:注意力家族,类似于动物园,都是不同的模块,好不好试了才知道。通道注意力:模型的定义和插入的位置通道注意力后的特征图和热力图 内…...
Go语言爬虫系列教程5:HTML解析技术以及第三方库选择
Go语言爬虫系列教程5:HTML解析技术以及第三方库选择 在上一章中,我们使用正则表达式提取网页内容,但这种方法有局限性。对于复杂的HTML结构,我们需要使用专门的HTML解析库。在这一章中,我们将介绍HTML解析技术以及如何…...

基于LLaMA-Factory和Easy Dataset的Qwen3微调实战:从数据准备到LoRA微调推理评估的全流程指南
随着开源大模型如 LLaMA、Qwen 和 Baichuan 的广泛应用,其基于通用数据的训练方式在特定下游任务和垂直领域中的表现仍存在提升空间,因此衍生出针对具体场景的微调训练需求。这些训练涵盖预训练(PT)、指令微调(SFT&…...