Netty学习example示例
文章目录
- simple
- Server端
- NettyServer
- NettyServerHandler
- Client端
- NettyClient
- NettyClientHandler
- tcp(粘包和拆包)
- Server端
- NettyTcpServer
- NettyTcpServerHandler
- Client端
- NettyTcpClient
- NettyTcpClientHandler
- protocol
- codec
- CustomMessageDecoder
- CustomMessageEncoder
- server端
- ProtocolServer
- ProtocolServerHandler
- client端
- ProtocolClient
- ProtocolClientHandler
- http
- Server端
- HttpServer
- HttpServerHandler
- Client端
- HttpClient
- HttpClientHandler
- ws
- Server端
- WsServer
- WsServerHandler
- Client端
- WsClient
- WebSocketClientHandler
- protobuf
- Server端
- NettyServer
- NettyServerHandler
- Student.proto
- Client端
- NettyClient
- NettyClientHandler
simple
Server端
NettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;@Slf4j
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();log.info("server stop");} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
NettyServerHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端接收到客户端数据:{}", msg);ctx.writeAndFlush("服务端收到客户端的数据: " + msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【NettyServerHandler->userEventTriggered】: {}", evt);}public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught异常:", cause);ctx.close();}public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded:{}", ctx.channel().remoteAddress());}public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved:{}", ctx.channel().remoteAddress());}public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("channelRegistered:{}", ctx.channel().remoteAddress());}public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("channelUnregistered:{}", ctx.channel().remoteAddress());}public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("客户端连接:{}", ctx.channel().remoteAddress());}public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("客户端断开连接:{}", ctx.channel().remoteAddress());}}
Client端
NettyClient
@Slf4j
public class NettyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);Channel channel = connectFuture.sync().channel();System.out.println("客户端连接成功");Scanner sc = new Scanner(System.in);while (true) {System.out.println("请输入内容: ");String line = sc.nextLine();if (line == null || line.isEmpty()) {continue;} else if ("exit".equals(line)) {channel.close();break;}channel.writeAndFlush(line);}channel.closeFuture().sync();System.out.println("客户端关闭");} catch (Exception e) {log.error("客户端发生异常: ", e);}}}
NettyClientHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info( "【NettyClientHandler->channelRead】: {}", msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info( "异常: {}", cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelActive】: {}", ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelInactive】: {}", ctx);}
}
tcp(粘包和拆包)
Server端
NettyTcpServer
@Slf4j
public class NettyTcpServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyTcpServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind("127.0.0.1", 9090);Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
NettyTcpServerHandler
@Slf4j
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes, StandardCharsets.UTF_8);log.info("服务端接收到的数据字节长度为:{}, 内容为: {}", bytes.length, content);ByteBuf buf = Unpooled.copiedBuffer(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(buf);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常: {}", cause.getMessage());ctx.close();}
}
Client端
NettyTcpClient
@Slf4j
public class NettyTcpClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyTcpClientHandler());}});ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));Channel channel = connectFuture.sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("客户端发生异常: ", e);} finally {group.shutdownGracefully();}}}
NettyTcpClientHandler
@Slf4j
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;log.info("客户端接收到数据:{}", byteBuf.toString(StandardCharsets.UTF_8));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {/*粘包:1. 这里连续发送10次byteBuf,发现服务端有可能1次就全部接收了,也有可能3次接受了,也有可能4次接收了,这是不确定的,这也就意味着基于底层NIO的tcp的数据传输 是基于流式传输的,会出现粘包的问题。2. 因此服务端必须 自行处理粘包问题,区分消息边界3. 这里测试的时候,可以多启动几个客户端来观察4. 这里示例的粘包示例与上面simple的区别在于:这里是在短时间内连续发送*//*for (int i = 0; i < 10; i++) {ByteBuf byteBuf = Unpooled.copiedBuffer(("hello, server " + i).getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(byteBuf);}*//*拆包:1. 这里1次发送了1个10000字节长的数据,而服务端分多次收到,有可能是2次,有可能是1次, 这是不确定的,2. 假设真实数据包就有这么长,那么服务端可能需要分多次才能接收到完整的数据包,3. 同时,我们发现总的数据长度服务端都接收到了,这说明底层NIO的tcp的数据传输 是可靠的4. 1条比较长的消息,服务端分多次才能收到,所以服务端需要解决拆包的问题,将多次接收到的消息转为1条完整的消息5. 这里示例的拆包示例与上面simple的区别在于:这里1次发送的消息数据很长*/StringBuilder sb = new StringBuilder();for (int i = 0; i < 1000; i++) {sb.append("Netty拆包示例|");}ctx.writeAndFlush(Unpooled.copiedBuffer(sb.toString().getBytes(StandardCharsets.UTF_8)));log.info("客户端发送数据长度:{}", sb.toString().length());/* 拆包 与 粘包 的核心问题就是 tcp是流式传输的,tcp可以保证数据可靠传输,但需要对方在接收时需要能区分出消息边界,从而获取1条完整的消息 */}}
protocol
codec
使用自定义协议,编解码器,识别消息边界,处理粘包和拆包问题
CustomMessageDecoder
public class CustomMessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int len = in.readInt();if (in.readableBytes() < len) {in.resetReaderIndex();return;}byte[] bytes = new byte[len];in.readBytes(bytes);out.add(CustomMessage.builder().len(len).content(bytes).build());}
}
CustomMessageEncoder
public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {out.writeInt(msg.getLen());out.writeBytes(msg.getContent());}
}
server端
ProtocolServer
@Slf4j
public class ProtocolServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new CustomMessageDecoder());pipeline.addLast(new CustomMessageEncoder());pipeline.addLast(new ProtocolServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 9090));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}log.info("server stop");}}
ProtocolServerHandler
@Slf4j
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 这里直接转, 如果不能转的话, 就说明前面的解码有问题CustomMessage customMessage = (CustomMessage) msg;log.info("服务端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));// 将消息回过去(需要加上对应的编码器)ctx.writeAndFlush(customMessage);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("ProtocolServerHandler异常: {}", cause.getMessage());ctx.close();}
}
client端
ProtocolClient
@Slf4j
public class ProtocolClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new CustomMessageEncoder());pipeline.addLast(new CustomMessageDecoder());pipeline.addLast(new ProtocolClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("localhost", 9090);Channel channel = connectFuture.sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.info("client error", e);} finally {group.shutdownGracefully();}}}
ProtocolClientHandler
@Slf4j
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里直接转, 如果不能转的话, 就说明前面的解码有问题CustomMessage customMessage = (CustomMessage) msg;log.info("客户端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));}@Overridepublic void channelActive(ChannelHandlerContext ctx) {for (int i = 1; i <= 20; i++) {byte[] bytes = ("hello, server " + i).getBytes(StandardCharsets.UTF_8);CustomMessage message = CustomMessage.builder().content(bytes).len(bytes.length).build();ctx.writeAndFlush(message);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("【ProtocolClientHandler->exceptionCaught】: {}", cause.getMessage());}
}
http
Server端
HttpServer
@Slf4j
public class HttpServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler("【服务端主】")).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("loggingHandler", new LoggingHandler("【服务端从】"));pipeline.addLast("httpServerCodec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));pipeline.addLast("httpServerHandler", new HttpServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080));channelFuture.sync();log.info("http服务器启动成功, 您可以访问: http://localhost:8080/test");channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("服务端发生异常: ", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
HttpServerHandler
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {log.info("【HttpServerHandler->处理】:{}", msg);if (msg instanceof FullHttpRequest) {FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;String uri = fullHttpRequest.uri();log.info("【uri】:{}", uri);HttpMethod method = fullHttpRequest.method();log.info("【method】:{}", method);// 响应回去byte[] bytes = ("服务器收到时间" + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8);DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,Unpooled.copiedBuffer(bytes));fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, bytes.length);fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");ChannelPromise promise = ctx.newPromise();promise.addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {log.info("操作完成");log.info("isDone: {}", future.isDone());log.info("isSuccess: {}", future.isSuccess());log.info("isCancelled: {}", future.isCancelled());log.info("hasException: {}", future.cause() != null, future.cause());}});ctx.writeAndFlush(fullHttpResponse, promise);log.info("刚刚写完");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.error("【HttpServerHandler->userEventTriggered】:{}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("【HttpServerHandler->exceptionCaught】", cause);}public void channelRegistered(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelRegistered】");}public void channelUnregistered(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelUnregistered】");}public void handlerAdded(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->handlerAdded】");}public void handlerRemoved(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->handlerRemoved】");}public void channelActive(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelActive】");}public void channelInactive(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelInactive】");}}
Client端
HttpClient
@Slf4j
public class HttpClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("loggingHandler", new LoggingHandler(LogLevel.DEBUG));pipeline.addLast("httpClientCodec", new HttpClientCodec());pipeline.addLast("", new HttpObjectAggregator(10 * 1024));pipeline.addLast("httpClientHandler", new HttpClientHandler());}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);channelFuture.sync();Channel channel = channelFuture.channel();sendGetRequest(channel);// 等待通道关闭channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("客户端发生异常: ", e);} finally {// 遇到问题, 调用此方法后客户端没有正常关闭, 将netty版本4.1.20.FINAL切换到4.1.76.FINAL即可group.shutdownGracefully();log.info("关闭group-finally");}log.info("客户端执行完毕");}private static void sendGetRequest(Channel channel) throws URISyntaxException {String url = "http://localhost:8080/test"; // 测试URLURI uri = new URI(url);String host = uri.getHost();String path = uri.getRawPath() + (uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery());// 构建HTTP请求FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,HttpMethod.GET,path,Unpooled.EMPTY_BUFFER);request.headers().set(HttpHeaderNames.HOST, host).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE).set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);// 发送请求ChannelFuture channelFuture = channel.writeAndFlush(request);log.info("Request sent: " + request);}}
HttpClientHandler
@Slf4j
public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) {// 处理响应log.info("处理响应, 响应头: {}", response.headers().toString());log.info("处理响应, 响应体: {}", response.content().toString(CharsetUtil.UTF_8));// 关闭连接ctx.channel().close();log.info("关闭连接");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info( "异常: {}", cause.getMessage());ctx.close();}
}
ws
Server端
WsServer
@Slf4j
public class WsServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("httpServerCodec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));WebSocketServerProtocolConfig config = WebSocketServerProtocolConfig.newBuilder().websocketPath("/ws").checkStartsWith(true).build();pipeline.addLast("wsProtocolHandler", new WebSocketServerProtocolHandler(config));pipeline.addLast("wsServerHandler", new WsServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 9090);channelFuture.sync();log.info("ws服务启动成功");channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.error("服务端发生异常: ", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}log.info("ws服务关闭");}}
WsServerHandler
@Slf4j
public class WsServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private static Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();private static AttributeKey<String> ATTRIBUTE_KEY_TOKEN = AttributeKey.valueOf("token");private static AttributeKey<Boolean> ATTRIBUTE_KEY_REPEAT = AttributeKey.valueOf("repeat");@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) throws Exception {log.info("【WsServerHandler->处理】:{}", webSocketFrame);if (webSocketFrame instanceof TextWebSocketFrame) {TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;log.info("【textWebSocketFrame.text()】:{}", textWebSocketFrame.text());sendAll(ctx.channel(), textWebSocketFrame.text());}}private void sendAll(Channel channel, String text) {CHANNELS.forEach((token, ch) -> {if (channel != ch) {ch.writeAndFlush(new TextWebSocketFrame(text));}});}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【WsServerHandler->userEventTriggered】: {}", evt);if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String requestUri = handshakeComplete.requestUri();String subprotocol = handshakeComplete.selectedSubprotocol();log.info("【requestUri】:{}", requestUri);log.info("【subprotocol】:{}", subprotocol);handleAuth(requestUri, ctx);}}private void handleAuth(String requestUri, ChannelHandlerContext ctx) {try {Map<String, String> queryParams = getQueryParams(requestUri);String token = queryParams.get("token");log.info("【token】:{}", token);if (token == null) {ctx.close();log.info("token为空, 关闭channel");} else {ctx.channel().attr(ATTRIBUTE_KEY_TOKEN).set(token);Channel oldChannel = CHANNELS.put(token, ctx.channel());if (oldChannel != null) {oldChannel.attr(ATTRIBUTE_KEY_REPEAT).set(true);oldChannel.close();} else {sendAll(ctx.channel(), "欢迎" + token + "进入聊天室");}}} catch (Exception e) {ctx.close();}}private static Map<String, String> getQueryParams(String requestUri) throws URISyntaxException {URI uri = new URI(requestUri);String query = uri.getQuery();Map<String, String> queryParams = new HashMap<>();if (query != null) {String[] params = query.split("&");for (String param : params) {String[] keyValue = param.split("=");String key = keyValue[0];String value = keyValue.length > 1 ? keyValue[1] : "";queryParams.put(key, value);}}return queryParams;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("【WsServerHandler->exceptionCaught】", cause);}public void handlerAdded(ChannelHandlerContext ctx) {log.info("【WsServerHandler->handlerAdded】");}public void handlerRemoved(ChannelHandlerContext ctx) {log.info("【WsServerHandler->handlerRemoved】");}public void channelRegistered(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelRegistered】");}public void channelUnregistered(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelUnregistered】");}public void channelActive(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelActive】");}public void channelInactive(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelInactive】");Channel channel = ctx.channel();Boolean isRepeat = channel.attr(ATTRIBUTE_KEY_REPEAT).get() != null&& channel.attr(ATTRIBUTE_KEY_REPEAT).get();if (!isRepeat) {CHANNELS.computeIfPresent(ctx.attr(ATTRIBUTE_KEY_TOKEN).get(), (key, ch) -> {CHANNELS.remove(channel.attr(ATTRIBUTE_KEY_TOKEN));sendAll(channel, channel.attr(ATTRIBUTE_KEY_TOKEN).get() + "离开聊天室");return null;});}}}
Client端
WsClient
@Slf4j
public class WsClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {CountDownLatch connectLatch = new CountDownLatch(1);Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpClientCodec());pipeline.addLast(new HttpObjectAggregator(10 * 1024));WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder().handleCloseFrames(false).build();WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:9090/ws/1?token=abc"),WebSocketVersion.V13,null,true,new DefaultHttpHeaders());pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker, config));pipeline.addLast(new WebSocketClientHandler(connectLatch));}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);Channel channel = channelFuture.channel();channelFuture.addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {System.err.println("Connection failed: " + future.cause());connectLatch.countDown(); // 确保不会死等}});// 等待连接完成(带超时)if (!connectLatch.await(10, TimeUnit.SECONDS)) {throw new RuntimeException("Connection timed out");}Scanner sc = new Scanner(System.in);while (true) {System.out.print("请输入:");String line = sc.nextLine();if (StringUtil.isNullOrEmpty(line)) {continue;}if ("exit".equals(line)) {channel.close();break;} else {// 发送消息WebSocketFrame frame = new TextWebSocketFrame(line);channelFuture.channel().writeAndFlush(frame);}}channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("客户端发生异常: ", e);} finally {group.shutdownGracefully();}}}
WebSocketClientHandler
@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private CountDownLatch connectLatch;public WebSocketClientHandler(CountDownLatch connectLatch) {this.connectLatch = connectLatch;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {// 处理接收到的WebSocket帧if (frame instanceof TextWebSocketFrame) {String text = ((TextWebSocketFrame) frame).text();System.out.println("Received: " + text);} else if (frame instanceof PingWebSocketFrame) {// 响应Ping帧ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));System.out.println("Responded to ping");} else if (frame instanceof CloseWebSocketFrame) {System.out.println("Received close frame");ctx.close();} else if (frame instanceof BinaryWebSocketFrame) {System.out.println("Received binary data: " + frame.content().readableBytes() + " bytes");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 处理握手完成事件if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {System.out.println("WebSocket handshake complete event");// 握手完成后可以发送初始消息connectLatch.countDown();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.println("WebSocket error: ");cause.printStackTrace();ctx.close();}}
protobuf
Server端
NettyServer
@Slf4j
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ProtobufEncoder());pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));pipeline.addLast(new NettyServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();log.info("server stop");} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
NettyServerHandler
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端接收到客户端数据:{}", msg);if (msg instanceof StudentPOJO.Student) {StudentPOJO.Student student = (StudentPOJO.Student) msg;log.info( "客户端发送的数据:{}, {}, {}", student, student.getId(), student.getName());}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【NettyServerHandler->userEventTriggered】: {}", evt);}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught异常:", cause);ctx.close();}public void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded:{}", ctx.channel().remoteAddress());}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved:{}", ctx.channel().remoteAddress());}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info("channelRegistered:{}", ctx.channel().remoteAddress());}public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info("channelUnregistered:{}", ctx.channel().remoteAddress());}public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("channelActive:{}", ctx.channel().remoteAddress());}public void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive:{}", ctx.channel().remoteAddress());}}
Student.proto
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值string name = 2;
}
// 执行命令 protoc.exe --java_out=生成路径 Student.proto路径
Client端
NettyClient
@Slf4j
public class NettyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ProtobufEncoder());pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));pipeline.addLast(new NettyClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);Channel channel = connectFuture.sync().channel();log.info("客户端连接成功");channel.closeFuture().sync();log.info("客户端关闭");} catch (Exception e) {log.error("客户端发生异常: ", e);}}}
NettyClientHandler
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info( "【NettyClientHandler->channelRead】: {}", msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info( "异常: {}", cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelActive】: {}", ctx);StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1).setName("张三san").build();ctx.writeAndFlush(student);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelInactive】: {}", ctx);}
}
相关文章:

Netty学习example示例
文章目录 simpleServer端NettyServerNettyServerHandler Client端NettyClientNettyClientHandler tcp(粘包和拆包)Server端NettyTcpServerNettyTcpServerHandler Client端NettyTcpClientNettyTcpClientHandler protocolcodecCustomMessageDecoderCustomM…...
几种常用的Agent的Prompt格式
一、基础框架范式(Google推荐标准) 1. 角色与职能定义 <Role_Definition> 你是“项目专家”(Project Pro),作为家居园艺零售商的首席AI助手,专注于家装改造领域。你的核心使命: 1. 协助…...
数据库运维管理系统在AI方向的实践
引言 关系型数据库(如MySQL、PostgreSQL、SQL Server、Oracle等)作为核心数据存储平台,承载着关键业务系统的运行。数据库的运维管理(DBA)工作变得愈发复杂和重要,涉及性能监控、故障诊断、容量规划、安全审计、自动化运维等多个方面。传统的数据库运维依赖人工经验,效…...

[RoarCTF 2019]Easy Calc
查看源代码 <!--Ive set up WAF to ensure security.--> <script>$(#calc).submit(function(){$.ajax({url:"calc.php?num"encodeURIComponent($("#content").val()),type:GET,success:function(data){$("#result").html(<div …...

[Windows]在Win上安装bash和zsh - 一个脚本搞定
目录 前言安装步骤配置要求下载安装脚本启动程序 前言 Windows是一个很流行的系统, 但是在Windows上安装bash和zsh一直是一个让人头疼的问题. 本蛙特意打包了一个程序, 用于一站式解决这一类的问题. 安装步骤 配置要求 系统: Windows软件: Powershell 5.1或以上 下载安装…...
ubuntu系统上运行jar程序输出时间时区不对
springboot项目打包jar文件在ubuntu系统上运行,发现在系统和日志里面,显示和打印的当前时间时区都是UTC0,通过timedatectl命令设置系统时区为Asia/Shanghai,命令date -R发现系统已经修改成功,但是发现springboot仍然输…...
React 播客专栏 Vol.18|React 第二阶段复习 · 样式与 Hooks 全面整合
视频版 🎙 欢迎回到《前端达人 React播客书单》第 18 期。 今天,我们将对第二阶段的内容进行系统复盘,重点是两个关键词:样式 与 Hooks。 样式,决定组件“长什么样”Hooks,决定组件“怎么动起来” 我们不但…...
从认识AI开始-----解密LSTM:RNN的进化之路
前言 我在上一篇文章中介绍了 RNN,它是一个隐变量模型,主要通过隐藏状态连接时间序列,实现了序列信息的记忆与建模。然而,RNN在实践中面临严重的“梯度消失”与“长期依赖建模困难”问题: 难以捕捉相隔很远的时间步之…...

leetcode0513. 找树左下角的值-meidum
1 题目:找树左下角的值 官方标定难度:中 给定一个二叉树的 根节点 root,请找出该二叉树的 最底层 最左边 节点的值。 假设二叉树中至少有一个节点。 示例 1: 输入: root [2,1,3] 输出: 1 示例 2: 输入: [1,2,3,4,null,5,6,null,null,7]…...

命令行式本地与服务器互传文件
文章目录 1. 背景2. 传输方式2.1 SCP 协议传输2.2 SFTP 协议传输 3. 注意 命令行式本地与服务器互传文件 1. 背景 多设备协同工作中,因操作系统的不同,我们经常需要将另外一个系统中的文件传输到本地PC进行浏览、编译。多设备文件互传,在嵌入…...
MPTCP 聚合吞吐
只破不立假把式,前面连续喷 MPTCP 是个错误,今天说说如何克服。 到底谁在阻碍 MPTCP 聚合吞吐一定要搞清楚,是算法硬伤,是数据不足。前文说过,将一个窗口内的数据多路径 spray 有损吞吐,想要聚合吞吐&…...
JavaScript性能优化实战技术文章大纲
代码层面优化 避免全局变量污染,使用let和const替代var,减少作用域链查找开销。 // 反例:全局变量 var globalVar 低效;// 正例:局部变量 function optimized() {const localVar 高效; }减少DOM操作,合并多次操作或…...

LabelImg: 开源图像标注工具指南
LabelImg: 开源图像标注工具指南 1. 简介 LabelImg 是一个图形化的图像标注工具,使用 Python 和 Qt 开发。它是目标检测任务中最常用的标注工具之一,支持 PASCAL VOC 和 YOLO 格式的标注输出。该工具开源、免费,并且跨平台支持 Windows、Lin…...

计算机网络 TCP篇常见面试题总结
目录 TCP 的三次握手与四次挥手详解 1. 三次握手(Three-Way Handshake) 2. 四次挥手(Four-Way Handshake) TCP 为什么可靠? 1. 序列号与确认应答(ACK) 2. 超时重传(Retransmis…...

树欲静而风不止,子欲养而亲不待
2025年6月2日,13~26℃,一般 待办: 物理2 、物理 学生重修 职称材料的最后检查 教学技能大赛PPT 遇见:使用通义创作了一副照片,很好看!都有想用来创作自己的头像了! 提示词如下: A b…...

Kotlin中的::操作符详解
Kotlin提供了::操作符,用于创建对类或对象的成员(函数、属性)的引用。这种机制叫做成员引用(Member Reference)。这是Kotlin高阶函数和函数式编程的重要组成部分。 简化函数传递 在Java中,我们这样传方法: list.forEach(item -> System.…...
【Linux】(1)—进程概念-③Linux进程概念与PCB
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、Linux进程概念与PCB 前言 提示:以下是本篇文章正文内容,下面案例可供参考 什么是进程? 进程可以理解为"正在执行的…...
神经网络中的梯度消失与梯度爆炸
在深层次的神经网络中很容易出现梯度消失与梯度爆炸的问题。这篇博客就详细介绍一下为什么会产生梯度消失与梯度爆炸的问题,以及如何解决。 首先梯度是什么 类比快递员送包裹: 神经网络训练时,需要根据预测错误(损失函数&#…...

深入详解编译与链接:翻译环境和运行环境,翻译环境:预编译+编译+汇编+链接,运行环境
目录 一、翻译环境和运行环境 二、翻译环境:预编译编译汇编链接 (一)预处理(预编译) (二)编译 1、词法分析 2、语法分析 3、语义分析 (三)汇编 (四&…...
系统架构设计师案例分析----经典架构风格特点
这次的考试太大意了,很多知识点有印象,但不能完整的描述出来。今年11月的考试,要认真备考,从现在开始,把案例分析和论文内容整理出来,一是方便记忆,二是和各位考一起分享。欢迎各位拍砖。 这段…...
基于大模型的急性乳腺炎全病程风险预测与综合治疗方案研究
目录 一、引言 1.1 研究背景与意义 1.2 研究目的与创新点 1.3 研究方法与技术路线 二、急性乳腺炎概述 三、大模型技术原理与应用现状 3.1 大模型基本原理 3.2 在医疗领域的应用案例 3.3 选择大模型用于急性乳腺炎预测的依据 四、大模型预测急性乳腺炎各阶段风险 4.…...
HTML实战:爱心图的实现
设计思路 使用纯CSS创建多种风格的爱心 添加平滑的动画效果 实现交互式爱心生成器 响应式设计适应不同设备 优雅的UI布局和色彩方案 <!DOCTYPE html> <html lang"zh-CN"> <head> <meta charset"UTF-8"> <meta nam…...

定时任务:springboot集成xxl-job-core(二)
定时任务实现方式: 存在的问题: xxl-job的原理: 可以根据服务器的个数进行动态分片,每台服务器分到的处理数据是不一样的。 1. 多台机器动态注册 多台机器同时配置了调度器xxl-job-admin之后,执行器那里会有多个注…...

DeviceNET转EtherCAT网关:医院药房自动化的智能升级神经中枢
在现代医院药房自动化系统中,高效、精准、可靠的设备通信是保障患者用药安全与效率的核心。当面临既有支持DeviceNET协议的传感器、执行器(如药盒状态传感器、机械臂限位开关)需接入先进EtherCAT高速实时网络时,JH-DVN-ECT疆鸿智能…...

一:UML类图
一、类的设计 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 学习设计模式的第一步是看懂UML类图,类图能直观的表达类、对象之间的关系,这将有助于后续对代码的编写。 类图在软件设计及应用框架前期设计中是不可缺少的一部分,它的主要成分包括:类名、…...
数据库三范式的理解
最近在学习数据库知识,发现 “数据库三范式” 这个概念特别重要,今天就来和大家分享一下我的理解,欢迎各位指正 一、数据库三范式是什么? 数据库三范式是为了让数据库结构更合理、减少数据冗余、提高数据完整性的设计规则。 第一范式&…...

Java 中 MySQL 索引深度解析:面试核心知识点与实战
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Java 中 MySQL 索引深度解析:面试…...
DeepSeek 部署中的常见问题及解决方案
技术文章大纲:DeepSeek 部署中的常见问题及解决方案 部署环境配置问题 硬件兼容性问题(如GPU驱动版本不匹配) 操作系统及依赖库版本冲突(CUDA/cuDNN版本) Python虚拟环境配置错误 模型加载与初始化失败 预训练模型…...
Nvidia Intern 笔试回忆
Nvidia intern compute arch 的笔试回忆,感觉强度拉满,两个半小时6道编程题,难度堪比ACM,需要自己写好输入输出(ACM好歹有个签到题 ),图论的题比较多,跟大厂面试题不是同一level...…...
鸿蒙OS基于UniApp的WebRTC视频会议系统实践:从0到1的HarmonyOS适配之路#三方框架 #Uniapp
基于UniApp的WebRTC视频会议系统实践:从0到1的HarmonyOS适配之路 引言 在移动互联网时代,实时音视频通讯已成为各类应用的标配功能。本文将结合我在某大型企业协同办公项目中的实战经验,详细讲解如何使用UniApp框架开发一个支持鸿蒙系统的W…...