当前位置: 首页 > news >正文

Netty 入门学习

前言

学习Spark源码绕不开通信,Spark通信是基于Netty实现的,所以先简单学习总结一下Netty。

Spark 通信历史

最开始: Akka
Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题
Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全废弃Akka,全部使用Netty

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。
Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型

Netty

Server 主要代码:

// 创建ServerBootstrap实例,服务器启动对象
ServerBootstrap bootstrap = new ServerBootstrap();ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 等待服务器关闭
channelFuture.channel().closeFuture().sync();

主要是启动 ServerBootstrap、绑定端口、等待关闭。

Client 主要代码:

// 创建Bootstrap实例,客户端启动对象
Bootstrap bootstrap = new Bootstrap();
// 连接服务端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();

Server 添加 Handler

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ServerHandler());}
});
bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}
});

这里的 ServerHandler 和 ClientHandler 都是自己实现的类,处理具体的逻辑。
如channelActive 建立连接时发消息给服务器,channelRead 读取数据时调用,处理读取数据的逻辑。给服务器或者客户端发消息可以用 writeAndFlush 方法。

完整代码

地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo

NettyServer

package com.dkl.java.demo;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {try {bind();} catch (InterruptedException e) {throw new RuntimeException(e);}}public static void bind() throws InterruptedException {// 创建boss线程组,用于接收连接EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 创建worker线程组,用于处理连接上的I/O操作,含有子线程NioEventGroup个数为CPU核数大小的2倍EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建ServerBootstrap实例,服务器启动对象ServerBootstrap bootstrap = new ServerBootstrap();// 使用链式编程配置参数// 将boss线程组和worker线程组暂存到ServerBootstrapbootstrap.group(bossGroup, workerGroup);// 设置服务端Channel类型为NioServerSocketChannel作为通道实现bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 添加ServerHandler到ChannelPipeline,对workerGroup的SocketChannel(客户端)设置处理器socketChannel.pipeline().addLast(new ServerHandler());}});// 设置启动参数,初始化服务器连接队列大小。服务端处理客户端连接请求是顺序处理,一个时间内只能处理一个客户端请求// 当有多个客户端同时来请求时,未处理的请求先放入队列中bootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 绑定端口并启动服务器,bind方法是异步的,sync方法是等待异步操作执行完成,返回ChannelFuture异步对象ChannelFuture channelFuture = bootstrap.bind(8888).sync();// 等待服务器关闭channelFuture.channel().closeFuture().sync();} finally {// 优雅地关闭boss线程组bossGroup.shutdownGracefully();// 优雅地关闭worker线程组workerGroup.shutdownGracefully();}}
}

ServerHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用** @param ctx* @throws Exception*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelRegistered");}/*** 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调* 用** @param ctx* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelUnregistered");}/*** 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelActive");}/*** 当 Channel 离开活动状态并且不再连接它的远程节点时被调用** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelInactive");}/*** 当从 Channel 读取数据时被调用** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("执行 channelRead");// 处理接收到的数据ByteBuf byteBuf = (ByteBuf) msg;try {// 将接收到的字节数据转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("Server端收到客户消息: " + message);// 发送响应消息给客户端ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,我收到你的消息啦~", CharsetUtil.UTF_8));} finally {// 释放ByteBuf资源ReferenceCountUtil.release(byteBuf);}}/*** 当 Channel 上的一个读操作完成时被调用,对通道的读取完成的事件或通知。当读取完成可通知发送方或其他的相关方,告诉他们接受方读取完成** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelReadComplete");}/*** 当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被* 调用** @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {System.out.println("执行 userEventTriggered");}/*** 当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable()方法* * 来检测 Channel 的可写性。与可写性相关的阈值可以通过* * Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法来* * 设置** @param ctx* @throws Exception*/@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelWritabilityChanged");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("执行 exceptionCaught");// 异常处理cause.printStackTrace();ctx.close();}
}

NettyClient

package com.dkl.java.demo;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) {start();}public static void start() {// 创建EventLoopGroup,用于处理客户端的I/O操作EventLoopGroup groupThread = new NioEventLoopGroup();try {// 创建Bootstrap实例,客户端启动对象Bootstrap bootstrap = new Bootstrap();bootstrap.group(groupThread);// 设置服务端Channel类型为NioSocketChannel作为通道实现bootstrap.channel(NioSocketChannel.class);// 设置客户端处理bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}});// 绑定端口ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 优雅地关闭线程groupThread.shutdownGracefully();}}
}

ClientHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 连接建立时的处理,发送请求消息给服务器ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!我是客户端,测试通道连接", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 处理接收到的数据ByteBuf byteBuf = (ByteBuf) msg;try {// 将接收到的字节数据转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("受到服务端响应的消息: " + message);// TODO: 对数据进行业务处理} finally {// 释放ByteBuf资源ReferenceCountUtil.release(byteBuf);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 异常处理cause.printStackTrace();ctx.close();}
}

运行截图


handler 执行顺序

Server 端

连接时:执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete断开连接时:执行 channelReadComplete
(强制中断 Client 连接
执行 exceptionCaught
执行 userEventTriggered (exceptionCaught 中 ctx.close()) 触发
)
执行 channelInactive
执行 channelUnregisteredchannelReadComplete 中 ctx.close(); 触发:
执行 channelInactive
执行 channelUnregistered

Client 端

执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete

Spark 对应位置

  • Spark版本:3.2.3
  • Server: org.apache.spark.network.server.TransportServer.init
  • Client: org.apache.spark.network.client.TransportClientFactory.createClient


相关文章:

Netty 入门学习

前言 学习Spark源码绕不开通信&#xff0c;Spark通信是基于Netty实现的&#xff0c;所以先简单学习总结一下Netty。 Spark 通信历史 最开始: Akka Spark 1.3&#xff1a; 开始引入Netty&#xff0c;为了解决大块数据&#xff08;如Shuffle&#xff09;的传输问题 Spark 1.6&…...

Magentic-One、AutoGen、LangGraph、CrewAI 或 OpenAI Swarm:哪种多 AI 代理框架最好?

目录 一、说明 二、 AutoGen-自动生成&#xff08;微软&#xff09; 2.1 特征 2.2 局限性 三、 CrewAI 3.1 特征 3.2 限制&#xff1a; 四、LangGraph 4.1 特征&#xff1a; 4.2 限制&#xff1a; 五、OpenAI Swarm 5.1 特征 5.2 限制 六、Magentic-One 6.1 特征 6.2 限制 七、…...

openstack下如何生成centos9 centos10 和Ubuntu24 镜像

如何生成一个centos 10和centos 9 的镜像1. 下载 对应的版本 wget https://cloud.centos.org/centos/10-stream/x86_64/images/CentOS-Stream-GenericCloud-x86_64-10-latest.x86_64.qcow2 wget https://cloud.centos.org/centos/9-stream/x86_64/images/CentOS-Stream-Gener…...

Kivy App开发之UX控件Slider滑块

在app中可能会调节如音量,亮度等,可以使用Slider来实现,该控件调用方便,兼容性好,滑动平稳。在一些参数设置中,也可以用来调整数值。 支持水平和垂直方向,可以设置默认值,最小及最大值。 使用方法,需用引入Slider类,通过Slider类生成一个滑块并设置相关的样式后,再…...

CSS——22.静态伪类(伪类是选择不同元素状态)

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>静态伪类</title> </head><body><a href"#">我爱学习</a></body> </html>单击链接前的样式 左键单击&#xff08;且…...

python学opencv|读取图像(三十)使用cv2.getAffineTransform()函数倾斜拉伸图像

【1】引言 前序已经学习了如何平移和旋转缩放图像&#xff0c;相关文章链接为&#xff1a; python学opencv|读取图像&#xff08;二十七&#xff09;使用cv2.warpAffine&#xff08;&#xff09;函数平移图像-CSDN博客 python学opencv|读取图像&#xff08;二十八&#xff0…...

Unity3D中基于ILRuntime的组件化开发详解

前言 在Unity3D开发中&#xff0c;组件化开发是一种高效且灵活的软件架构方式。通过将游戏功能拆分为独立的、可重用的组件&#xff0c;开发者可以更容易地管理、扩展和维护代码。而ILRuntime作为一款基于C#的热更新框架&#xff0c;为Unity3D开发者提供了一种高效的热更新和组…...

ELK的搭建

ELK elk&#xff1a;elasticsearch logstatsh kibana统一日志收集系统 elasticsearch&#xff1a;分布式的全文索引引擎点非关系型数据库,存储所有的日志信息&#xff0c;主和从&#xff0c;最少需要2台 logstatsh&#xff1a;动态的从各种指定的数据源&#xff0c;获取数据…...

国产信创实践(国能磐石服务器操作系统CEOS +东方通TongHttpServer)

替换介绍&#xff1a; 国能磐石服务器操作系统CEOS 对标 Linux 服务器操作系统&#xff08;Ubuntu, CentOS&#xff09; 东方通TongHttpServer 对标 Nginx 负载均衡Web服务器 第一步&#xff1a; 服务器安装CEOS映像文件&#xff0c;可直接安装&#xff0c;本文采用使用VMware …...

C#里使用libxl读取EXCEL文件里的图片并保存出来

有时候需要读取EXCEL里的图片文件, 因为很多用户喜欢使用图片保存在EXCEL里,比如用户保存一些现场整改的图片。 如果需要把这些图片抽取出来,再保存到系统里,就需要读取这些图片数据,生成合适的文件再保存。 在libxl里也提供了这样的方法, 如下: var picType = boo…...

【开源免费】基于SpringBoot+Vue.JS企业级工位管理系统(JAVA毕业设计)

本文项目编号 T 127 &#xff0c;文末自助获取源码 \color{red}{T127&#xff0c;文末自助获取源码} T127&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...

美国大学的计算机科学专业排名

美国的计算机科学专业在全球范围内享有盛誉&#xff0c;许多大学在该领域具有卓越的教学和研究实力。以下是根据最新的排名和信息整理的美国计算机科学专业顶尖大学列表&#xff1a; 2025年 U.S. News 美国本科计算机科学专业排名&#xff1a; 斯坦福大学&#xff08;Stanfor…...

机器学习实战——决策树:从原理到应用的深度解析

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​​​ ​​​ ​​ 决策树&#xff08;Decision Tree&#xff09;是一种简单而直观的分类与回归模型&#xff0c;在机器学习中广泛应用。它的…...

开源生成式物理引擎Genesis,可模拟世界万物

这是生成大模型时代 —— 它们能生成文本、图像、音频、视频、3D 对象…… 而如果将所有这些组合到一起&#xff0c;我们可能会得到一个世界&#xff01; 现在&#xff0c;不管是 LeCun 正在探索的世界模型&#xff0c;还是李飞飞想要攻克的空间智能&#xff0c;又或是其他研究…...

kubernetes第七天

1.影响pod调度的因素 nodeName 节点名 resources 资源限制 hostNetwork 宿主机网络 污点 污点容忍 Pod亲和性 Pod反亲和性 节点亲和性 2.污点 通常是作用于worker节点上&#xff0c;其可以影响pod的调度 语法&#xff1a;key[value]:effect effect:[ɪˈfek…...

RK3588上CPU和GPU算力以及opencv resize的性能对比测试

RK3588上CPU和GPU算力以及opencv resize的性能对比测试 一.背景二.小结三.相关链接四.操作步骤1.环境搭建A.安装依赖B.设置GPU为高性能模式C.获取GPU信息D.获取CPU信息 2.调用OpenCL SDK获取GPU信息3.使用OpenCL API计算矩阵乘4.使用clpeak测试GPU的性能5.使用OpenBLAS测试CPU的…...

基于Centos 7系统的安全加固方案

创作不易&#xff0c;麻烦点个免费的赞和关注吧&#xff01; 声明&#xff01; 免责声明&#xff1a;本教程作者及相关参与人员对于任何直接或间接使用本教程内容而导致的任何形式的损失或损害&#xff0c;包括但不限于数据丢失、系统损坏、个人隐私泄露或经济损失等&#xf…...

IT行业的发展趋势

一、引言 IT&#xff08;信息技术&#xff09;行业自诞生以来&#xff0c;就以惊人的速度发展&#xff0c;不断改变着我们的生活、工作和社会结构。如今&#xff0c;随着技术的持续创新、市场需求的演变以及全球经济格局的变化&#xff0c;IT行业正迈向新的发展阶段&#xff0…...

《探秘开源多模态神经网络模型:AI 新时代的万能钥匙》

《探秘开源多模态神经网络模型&#xff1a;AI 新时代的万能钥匙》 一、多模态模型的崛起之路&#xff08;一&#xff09;从单一到多元&#xff1a;模态的融合演进&#xff08;二&#xff09;关键技术突破&#xff1a;解锁多模态潜能 二、开源多模态模型深度剖析&#xff08;一&…...

ROS核心概念解析:从Node到Master,再到roslaunch的全面指南

Node 在ROS中&#xff0c;最小的进程单元就是节点&#xff08;node&#xff09;。一个软件包里可以有多个可执行文件&#xff0c;可执行文件在运行之后就成了一个进程(process)&#xff0c;这个进程在ROS中就叫做节点。 从程序角度来说&#xff0c;node就是一个可执行文件&…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

【位运算】消失的两个数字(hard)

消失的两个数字&#xff08;hard&#xff09; 题⽬描述&#xff1a;解法&#xff08;位运算&#xff09;&#xff1a;Java 算法代码&#xff1a;更简便代码 题⽬链接&#xff1a;⾯试题 17.19. 消失的两个数字 题⽬描述&#xff1a; 给定⼀个数组&#xff0c;包含从 1 到 N 所有…...

linux arm系统烧录

1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 &#xff08;忘了有没有这步了 估计有&#xff09; 刷机程序 和 镜像 就不提供了。要刷的时…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

力扣热题100 k个一组反转链表题解

题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...

Golang——6、指针和结构体

指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...