当前位置: 首页 > news >正文

spring boot+netty 搭建MQTT broken

一、项目结构
在这里插入图片描述
二、安装依赖

<!--      netty包  --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!--   常用JSON工具包 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.80</version></dependency><!--mqtt服务端--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

三、配置文件

server:port: 8880
netty:mqtt:port: 1883
#    mqtt账号username: admin
#mqtt密码password: 123456
# 日记配置
logging:level:# 开启debug日记打印com.hyx: debug

四、新建具体类
1、新建BootNettyMqttMsgBack类

package com.example.springnettymqtt.MQTTServer.callback;import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import io.netty.channel.ChannelId;
import io.netty.handler.codec.mqtt.*;import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Component
@RequiredArgsConstructor
public class BootNettyMqttMsgBack {private static final Logger log =  LoggerFactory.getLogger(BootNettyMqttMsgBack.class);private final MQTTServerProperties MQTTserverProperties;/*** 	确认连接请求* @param channel* @param mqttMessage*/public void connack (Channel channel, MqttMessage mqttMessage) {MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();//	构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());//	构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);//	构建CONNACK消息体MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());log.debug("设备上线,channelId:{}", channel.id());MQTTdeviceAdd(channel);channel.writeAndFlush(connAck);}public void disconnack (Channel channel, MqttMessage mqttMessage) {MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();//	构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());//	构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);//	构建CONNACK消息体MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());channel.writeAndFlush(connAck);log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);}/*** 	根据qos发布确认* @param channel* @param mqttMessage*/public  void puback (Channel channel, MqttMessage mqttMessage) {MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();MqttQoS qos =  mqttFixedHeaderInfo.qosLevel();//注意:	readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置	读指针byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];mqttPublishMessage.payload().readBytes(headBytes);String data = new String(headBytes);System.out.println("publish data-->"+data);//重置读取的指针mqttPublishMessage.payload().resetReaderIndex();switch (qos) {case AT_MOST_ONCE: 		//	至多一次//推送到订阅的客户端subscribSend(mqttMessage);break;case AT_LEAST_ONCE:		//	至少一次//	构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());//	构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);//	构建PUBACK消息体MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);log.info("back--"+pubAck.toString());channel.writeAndFlush(pubAck);//推送到订阅的客户端subscribSend(mqttMessage);break;case EXACTLY_ONCE:		//	刚好一次//	构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);//	构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);//服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,// 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。//接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。int mqttMessageId=mqttPublishMessage.variableHeader().packetId();if(!mqttMessageIdMap.containsKey(mqttMessageId)){//不存在此消息,将此消息暂存mqttMessageIdMap.put(mqttMessageId, mqttMessage);log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");}else{//重复发送消息,直接返回log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());return;}channel.writeAndFlush(mqttMessageBack);break;default:break;}}/*** 	发布完成 qos2* @param channel* @param mqttMessage*/public  void pubcomp (Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();//	构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);//	构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);//log.info("back--"+mqttMessageBack.toString());channel.writeAndFlush(mqttMessageBack);}/*** 	订阅确认* @param channel* @param mqttMessage*/public  void suback(Channel channel, MqttMessage mqttMessage) {MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();//	构建返回报文, 可变报头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());//log.info(topics.toString());List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());for (int i = 0; i < topics.size(); i++) {grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());}//	构建返回报文	有效负载MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);//	构建返回报文	固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());//	构建返回报文	订阅确认MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);channel.writeAndFlush(subAck);}/*** 	取消订阅确认* @param channel* @param mqttMessage*/public  void unsuback(Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();//	构建返回报文	可变报头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());//	构建返回报文	固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);//	构建返回报文	取消订阅确认MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);channel.writeAndFlush(unSubAck);}/*** 	心跳响应* @param channel* @param mqttMessage*/public  void pingresp (Channel channel, MqttMessage mqttMessage) {//	心跳响应报文	11010000 00000000  固定报文MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);channel.writeAndFlush(mqttMessageBack);}/*** 订阅推送*/public  void subscribSend(MqttMessage mqttMessage){MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;Object obj=mqttMessage.variableHeader();MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;String topicName=variableHeader.topicName();int packetId=variableHeader.packetId();//固定消息头 注意此处的消息类型PUBLISH mqtt协议MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_LEAST_ONCE,false,0);//可变消息头MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader(topicName,packetId);//推送消息体MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, mqttPublishMessage.content());log.info("推送地址————》"+topicName);if(subscribeMap.containsKey(topicName)){List<ChannelId> channelList=subscribeMap.get(topicName);for (int i = 0; i < channelList.size(); i++) {//订阅次此topic的Mqtt客户端搜到此消息,Channel channelSub=MQTTdeviceChannelGroup.find(channelList.get(i));//writeAndFlush会将ByteBuf的引用释放refCnt会减去1,使用retain加1mqttPublishMessageResult.retain();channelSub.writeAndFlush(mqttPublishMessageResult);}mqttPublishMessageResult.release();}}/*** 用户鉴权*/public boolean authentication(MqttConnectPayload payload){String username=MQTTserverProperties.getUsername();String password=MQTTserverProperties.getPassword();//无账号或者无密码通过if(stringEmptyCheck(password)||stringEmptyCheck(username)){return true;}else {//消息中账号密码为空if(payload.passwordInBytes()==null||payload.userName()==null){return false;}String passwordAuthen=new String(payload.passwordInBytes());String usernameAuthen=payload.userName();if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){return true;}else {return false;}}}//判断字符字符为空private boolean stringEmptyCheck(String str){if(str==null||"".equals(str)){return true;}else {return false;}}
}

2、新建MqttChannelInit类

package com.example.springnettymqtt.MQTTServer.channel;import com.example.springnettymqtt.MQTTServer.handler.MQTTMessageHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component
@RequiredArgsConstructor
public class MqttChannelInit extends ChannelInitializer<SocketChannel> {@Autowiredprivate MQTTMessageHandler MQTTmessageHandler;@Overrideprotected void initChannel(SocketChannel channel) {channel.pipeline()// 心跳时间.addLast("idle", new IdleStateHandler(600, 600, 1200, TimeUnit.SECONDS)).addLast("encoder", MqttEncoder.INSTANCE).addLast("decoder", new MqttDecoder()).addLast(MQTTmessageHandler);}
}

3、新建MQTTDeviceManerger类

package com.example.springnettymqtt.MQTTServer.channel;import lombok.extern.slf4j.Slf4j;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.Map;import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Slf4j
public class MQTTDeviceManerger {/*** 设备接入*/public static void MQTTdeviceAdd(Channel channel) {if(!MQTTdeviceChannelGroup.contains(channel)) {MQTTdeviceChannelGroup.add(channel);}}/*** 设备移除和和订阅的topic*/public static void MQTTdeviceRemove(Channel channel) {if(MQTTdeviceChannelGroup.contains(channel)) {MQTTdeviceChannelGroup.remove(channel);MQTTremoveDeviceChannelId(channel.id());//移除topic中的这个设备的chanelidfor (Map.Entry<String, List<ChannelId>> listEntry : subscribeMap.entrySet()) {try {if (listEntry.getValue().contains(channel.id())) {listEntry.getValue().remove(channel.id());log.info(channel.id() + "下线,topic:  " + listEntry.getKey() + "中移除此id");}} catch (Exception e) {e.printStackTrace();}}}}public static void MQTTremoveDeviceChannelId(ChannelId channelId) {MQTTdeviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));}
}

4、新建配置类

package com.example.springnettymqtt.MQTTServer.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Configuration
@ConfigurationProperties(prefix = MQTTServerProperties.MQTTPREFIX)
@Data
public class MQTTServerProperties {public static final String MQTTPREFIX = "netty.mqtt";/*** 服务器端口*/private Integer port;/*** mqtt服务器用户名*/private String username;/*** mqtt服务器密码*/private String password;
}

5、新建MQTTMessageHandler类

package com.example.springnettymqtt.MQTTServer.handler;import com.example.springnettymqtt.MQTTServer.callback.BootNettyMqttMsgBack;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {@Autowiredprivate BootNettyMqttMsgBack bootNettyMqttMsgBack;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (null != msg) {MqttMessage mqttMessage = (MqttMessage) msg;log.info("info--"+mqttMessage.toString());MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();Channel channel = ctx.channel();if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){//用户鉴权(配置文件中配置账号和密码,如果没有默认)log.warn("正在尝试鉴权");boolean authentag=  bootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());if(!authentag){return;}//	在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接if(MQTTdeviceChannelGroup.contains(channel)){//移除次设备channel和topicbootNettyMqttMsgBack.disconnack(channel,mqttMessage);}//	to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息bootNettyMqttMsgBack.connack(channel, mqttMessage);}//对于没有鉴权的设备,请求不处理if(!MQTTdeviceChannelGroup.contains(channel)){log.warn(channel.id()+"无鉴权操作");return;}switch (mqttFixedHeader.messageType()){case PUBLISH:		//	客户端发布消息//	PUBACK报文是对QoS 1等级的PUBLISH报文的响应bootNettyMqttMsgBack.puback(channel, mqttMessage);break;// PUBREL	Qos2级别消息,客户端返回case PUBREL://	PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应//服务端收到pubrel之后,正式将消息投递给上层应用层。MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();if(mqttMessageIdMap.containsKey(VariableHeader.messageId())) {log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());bootNettyMqttMsgBack.subscribSend(mqttMessageIdMap.get(VariableHeader.messageId()));bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);mqttMessageIdMap.remove(VariableHeader.messageId());}else {//后续多次收到REL消息,制作comp响应bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);}break;case SUBSCRIBE:		//	客户端订阅主题//	客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。//	为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。//	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端// 	to dobootNettyMqttMsgBack.suback(channel, mqttMessage);MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);if(!channelIds.contains(channel.id())) {channelIds.add(channel.id());}else {log.warn(channel.id()+"重复订阅");}subscribeMap.put(topicname, channelIds);}else {List<ChannelId> channelIds=new ArrayList<>();channelIds.add(channel.id());subscribeMap.put(topicname,channelIds);}log.info(channel.id()+"订阅地址————》"+topicname);}break;case UNSUBSCRIBE:	//	客户端取消订阅//	客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题//	to dobootNettyMqttMsgBack.unsuback(channel, mqttMessage);Object Unsubscribe=mqttMessage.payload();MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;int len=unsubscribePayload.topics().size();for (int i = 0; i < len; i++) {String topicname=unsubscribePayload.topics().get(i);boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);channelIds.remove(channel.id());subscribeMap.put(topicname,channelIds);}else {log.error("不存在订阅地址——>"+topicname);}log.info(channel.id()+"取消订阅地址————》"+topicname);}break;case PINGREQ:		//	客户端发起心跳//	客户端发送PINGREQ报文给服务端的//	在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着//	请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开bootNettyMqttMsgBack.pingresp(channel, mqttMessage);break;case DISCONNECT:	//	客户端主动断开连接log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);//	DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0//	to dobreak;default:break;}}else {return;}}/*** 	从客户端收到新的数据、读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws IOException {}/*** 	客户端与服务端第一次建立连接时执行 在channelActive方法之前执行*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);}/*** 	客户端与服务端 断连时执行 channelInactive方法之后执行*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.warn(ctx.channel().id()+"连接断开");MQTTdeviceRemove(ctx.channel());super.channelUnregistered(ctx);}/*** 	当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {Channel channel = ctx.channel();log.warn(channel.id()+"连接异常断开。。。。。。。");MQTTdeviceRemove(ctx.channel());super.exceptionCaught(ctx, cause);if(channel.isActive()){ctx.close();}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.debug("\n");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}/*** 	服务端 当读超时时 会调用这个方法*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {super.userEventTriggered(ctx, evt);ctx.close();}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {super.channelWritabilityChanged(ctx);}
}

6、新建接口IMQTTServer及其实现类MQTTServer

package com.example.springnettymqtt.MQTTServer.server;import javax.annotation.PreDestroy;public interface IMQTTServer {/*** 主启动程序,初始化参数** @throws Exception 初始化异常*/void start() throws Exception;/*** 优雅的结束服务器** @throws InterruptedException 提前中断异常*/@PreDestroyvoid destroy() throws InterruptedException;
}
package com.example.springnettymqtt.MQTTServer.server.impl;import com.example.springnettymqtt.MQTTServer.channel.MqttChannelInit;
import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import com.example.springnettymqtt.MQTTServer.server.IMQTTServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Component
@Slf4j
@RequiredArgsConstructor
public class MQTTServer implements IMQTTServer {private final MqttChannelInit mqttChannelInit;private final MQTTServerProperties MQTTserverProperties;//保存接入的MQTT设备channelpublic static ChannelGroup MQTTdeviceChannelGroup;//保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据public static Map<String, List<ChannelId>> subscribeMap =new ConcurrentHashMap();//保存设备名称和通道编号,向设备发送消息可以通过名称找到通道public static ConcurrentHashMap<String, ChannelId> MQTTdeviceMap = new ConcurrentHashMap<>();//存放Qos2消息等级的消息ID,这里可使用redis之类的工具做缓存,为了简化配置,使用map暂存public static ConcurrentHashMap<Integer, MqttMessage> mqttMessageIdMap=new ConcurrentHashMap();private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;@Overridepublic void start() {log.info("初始化 Mqttserver ...");bossGroup = new NioEventLoopGroup();workerGroup =  new NioEventLoopGroup();MQTTdeviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);this.MqttServer();}/*** 初始化*/private void MqttServer() {try {new ServerBootstrap().group(bossGroup, workerGroup).channel( NioServerSocketChannel.class ).localAddress(new InetSocketAddress(MQTTserverProperties.getPort()))// 配置 编码器、解码器、业务处理.childHandler(mqttChannelInit)// tcp缓冲区.option(ChannelOption.SO_BACKLOG, 128)// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true.childOption(ChannelOption.TCP_NODELAY, true)// 保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true)// 绑定端口,开始接收进来的连接.bind().sync();log.info("MQTT服务启动成功!开始监听端口:{}", MQTTserverProperties.getPort());} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}/*** 销毁*/@PreDestroy@Overridepublic void destroy() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}

7、新建启动类

package com.example.springnettymqtt.startServer;import com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class StartSrver {@Autowiredprivate MQTTServer mqttServer;@PostConstructpublic void startNetty(){new Thread(() -> {try {mqttServer.start();} catch (Exception e) {e.printStackTrace();}}).start();}
}

相关文章:

spring boot+netty 搭建MQTT broken

一、项目结构 二、安装依赖 <!-- netty包 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!-- 常用JSON工具包 --><…...

从零开始搭建React+TypeScript+webpack开发环境-使用iconfont构建图标库

创建iconfont项目 进入iconfont官网&#xff0c;完成注册流程&#xff0c;即可创建项目。 无法访问iconfont可尝试将电脑dns改为阿里云镜像223.5.5.5和223.6.6.6 添加图标 在图标库里选择图标&#xff0c;加入购物车 将图标添加到之前创建的项目中 生成代码 将代码配置到项目…...

微服务之初始微服务

文章目录 一、服务架构演变1.单体架构2.分布式架构 二、认识微服务三、总结四、微服务技术对比五、SpringCloud注意 一、服务架构演变 1.单体架构 单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署。 优点&#xff1a; 架构简单部署成本…...

大口径智能水表支持最高水流量是多少?

随着科技的不断发展&#xff0c;我国城市化进程的加快&#xff0c;水资源管理日益受到重视。作为一种先进的用水计量设备&#xff0c;大口径智能水表凭借其高精度、低误差、远程抄表等优点&#xff0c;在市场上备受青睐。那么接下来&#xff0c;小编就来为大家详细的介绍一下大…...

在Spring Boot中使用MyBatis访问数据库

MyBatis&#xff0c;这个对各位使用Java开发的开发者来说还是蛮重要的&#xff0c;我相信诸位在企业开发项目的时候&#xff0c;大多数采用的是Mybatis。使用MyBatis帮助我们解决各种问题&#xff0c;实际上这篇文章&#xff0c;基本上默认为可以跳过的一篇&#xff0c;但是为了…...

懒羊羊闲话2

前言&#xff1a; 笔者谈不上是某个领域的高手&#xff0c;也不是大厂的某个神秘高手&#xff0c;一直游离于小型公司&#xff0c;写下这篇文章献给那些无法接触到好的学习环境&#xff0c;苦恼自己原地踏步的coder。 1、如何快速熟悉某个行业 作为一个编码多年的程序员&#…...

多路转接(上)——select

目录 一、select接口 1.认识select系统调用 2.对各个参数的认识 二、编写select服务器 1.两个工具类 2.网络套接字封装 3.服务器类编写 4.源文件编写 5.运行 一、select接口 1.认识select系统调用 int select(int nfds, fd_set readfds, fd_set writefds, fd_set ex…...

基于SSM的图书管理借阅系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…...

Python的内存优化

在Python中&#xff0c;内存管理和优化是一个复杂的话题&#xff0c;因为它涉及到Python解释器的内部机制&#xff0c;特别是Python的垃圾收集和内存分配策略。Python通过自动垃圾收集机制管理内存&#xff0c;主要包括引用计数和标记-清除算法。 Python内存管理机制&#xff…...

蓝桥杯-回文日期[Java]

目录: 学习目标&#xff1a; 学习内容&#xff1a; 学习时间&#xff1a; 题目&#xff1a; 题目描述: 输入描述: 输出描述: 输入输出样例: 示例 1: 运行限制: 题解: 思路: 学习目标&#xff1a; 刷蓝桥杯题库日记 学习内容&#xff1a; 编号498题目回文日期难度…...

acwing算法基础之搜索与图论--树与图的遍历

目录 1 基础知识2 模板3 工程化 1 基础知识 树和图的存储&#xff1a;邻接矩阵、邻接表。 树和图的遍历&#xff1a;dfs、bfs。 2 模板 树是一种特殊的图&#xff08;即&#xff0c;无环连通图&#xff09;&#xff0c;与图的存储方式相同。 对于无向图中的边ab&#xff0c;…...

前端uniapp请求真是案例(带源码)

目录 案例一案例二最后 案例一 <template><view class"box"><!-- <view class"title-back" click"backPrivious"><</view> --><!-- <view class"title-back" click"backPrivious"…...

MySQL -- mysql connect

MySQL – mysql connect 文章目录 MySQL -- mysql connect一、Connector/C 使用1.环境安装2.尝试链接mysql client 二、MySQL接口1.初始化2.链接数据库3.下发mysql命令4.获取执行结果5.关闭mysql链接6.在C语言中连接MySQL 三、MySQL图形化界面推荐 使用C接口库来进行连接 一、…...

如何用AI帮你下载安卓源码

以Android 11源码下载流程图如下所示&#xff1a; 1. 安装Git和Repo工具 2. 创建一个工作目录 3. 初始化仓库并下载源码 4. 切换到指定的分支 5. 编译源码 具体步骤如下&#xff1a; 安装Git和Repo工具&#xff1a;在Linux或Mac上&#xff0c;可以通过终端运行以下命令安装Gi…...

第三章:人工智能深度学习教程-基础神经网络(第三节-Tensorflow 中的多层感知器学习)

在本文中&#xff0c;我们将了解多层感知器的概念及其使用 TensorFlow 库在 Python 中的实现。 多层感知器 多层感知也称为MLP。它是完全连接的密集层&#xff0c;可将任何输入维度转换为所需的维度。多层感知是具有多个层的神经网络。为了创建神经网络&#xff0c;我们将神…...

Python的版本如何查询?

要查询Python的版本&#xff0c;可以使用以下方法之一&#xff1a; 1.在命令行中使用python --version命令。这会显示安装在计算机上的Python解释器的版本号。 # Author : 小红牛 # 微信公众号&#xff1a;wdPython2.在Python脚本中使用import sys语句&#xff0c;然后打印sy…...

Git的高效使用 git的基础 高级用法

Git的高效使用 git的基础 高级用法 前言 什么是Git 在日常的软件开发过程中&#xff0c;软件版本的管理都离不开使用Git&#xff0c;Git是一个开源的分布式版本控制系统&#xff0c;可以有效、高速地处理从很小到非常大的项目版本管理。 也是Linus Torvalds为了帮助管理Linu…...

关于主表和子表数据的保存

业务需求&#xff1a; 投注站信息保存在表A里&#xff0c;投注站下的设备信息保存在表B里&#xff0c; 一个投注站会有多个设备&#xff0c;要在一个表单里进行投注站和设备信息的填写&#xff0c;保存&#xff0c;回填&#xff0c;修改。 思路&#xff1a; 1&#xff09;将…...

如何在后台执行 SwiftData 操作

文章目录 前言Core Data 私有队列上下文SwiftData 并发支持使用 ModelActor合并上下文更改的问题通过标识符访问模型总结 前言 SwiftData 是一个用于处理数据操作的框架&#xff0c;特别是在 Swift 语言中进行并发操作。本文介绍了如何在后台执行 SwiftData 操作以及与 Core D…...

TCP和UPD协议

一)应用层协议简介:根据需求明确要传输的信息&#xff0c;明确要传输的数据格式&#xff1b; 应用层协议:这个协议&#xff0c;实际上是和程序员打交道最多的协议了 1)其它四层都是操作系统&#xff0c;驱动&#xff0c;硬件实现好了的&#xff0c;咱们是不需要管 2)应用层:当我…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

Spark 之 入门讲解详细版(1)

1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室&#xff08;Algorithms, Machines, and People Lab&#xff09;开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目&#xff0c;8个月后成为Apache顶级项目&#xff0c;速度之快足见过人之处&…...

React hook之useRef

React useRef 详解 useRef 是 React 提供的一个 Hook&#xff0c;用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途&#xff0c;下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...