Netty 入门学习
前言
学习Spark源码绕不开通信,Spark通信是基于Netty实现的,所以先简单学习总结一下Netty。
Spark 通信历史
最开始: Akka
Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题
Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全废弃Akka,全部使用Netty
Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。
Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型
Netty
Server 主要代码:
// 创建ServerBootstrap实例,服务器启动对象
ServerBootstrap bootstrap = new ServerBootstrap();ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 等待服务器关闭
channelFuture.channel().closeFuture().sync();
主要是启动 ServerBootstrap、绑定端口、等待关闭。
Client 主要代码:
// 创建Bootstrap实例,客户端启动对象
Bootstrap bootstrap = new Bootstrap();
// 连接服务端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();
Server 添加 Handler
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ServerHandler());}
});
bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}
});
这里的 ServerHandler 和 ClientHandler 都是自己实现的类,处理具体的逻辑。
如channelActive 建立连接时发消息给服务器,channelRead 读取数据时调用,处理读取数据的逻辑。给服务器或者客户端发消息可以用 writeAndFlush 方法。
完整代码
地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo
NettyServer
package com.dkl.java.demo;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) {try {bind();} catch (InterruptedException e) {throw new RuntimeException(e);}}public static void bind() throws InterruptedException {// 创建boss线程组,用于接收连接EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 创建worker线程组,用于处理连接上的I/O操作,含有子线程NioEventGroup个数为CPU核数大小的2倍EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建ServerBootstrap实例,服务器启动对象ServerBootstrap bootstrap = new ServerBootstrap();// 使用链式编程配置参数// 将boss线程组和worker线程组暂存到ServerBootstrapbootstrap.group(bossGroup, workerGroup);// 设置服务端Channel类型为NioServerSocketChannel作为通道实现bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 添加ServerHandler到ChannelPipeline,对workerGroup的SocketChannel(客户端)设置处理器socketChannel.pipeline().addLast(new ServerHandler());}});// 设置启动参数,初始化服务器连接队列大小。服务端处理客户端连接请求是顺序处理,一个时间内只能处理一个客户端请求// 当有多个客户端同时来请求时,未处理的请求先放入队列中bootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 绑定端口并启动服务器,bind方法是异步的,sync方法是等待异步操作执行完成,返回ChannelFuture异步对象ChannelFuture channelFuture = bootstrap.bind(8888).sync();// 等待服务器关闭channelFuture.channel().closeFuture().sync();} finally {// 优雅地关闭boss线程组bossGroup.shutdownGracefully();// 优雅地关闭worker线程组workerGroup.shutdownGracefully();}}
}
ServerHandler
package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ServerHandler extends ChannelInboundHandlerAdapter {/*** 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用** @param ctx* @throws Exception*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelRegistered");}/*** 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调* 用** @param ctx* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelUnregistered");}/*** 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelActive");}/*** 当 Channel 离开活动状态并且不再连接它的远程节点时被调用** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelInactive");}/*** 当从 Channel 读取数据时被调用** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("执行 channelRead");// 处理接收到的数据ByteBuf byteBuf = (ByteBuf) msg;try {// 将接收到的字节数据转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("Server端收到客户消息: " + message);// 发送响应消息给客户端ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,我收到你的消息啦~", CharsetUtil.UTF_8));} finally {// 释放ByteBuf资源ReferenceCountUtil.release(byteBuf);}}/*** 当 Channel 上的一个读操作完成时被调用,对通道的读取完成的事件或通知。当读取完成可通知发送方或其他的相关方,告诉他们接受方读取完成** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelReadComplete");}/*** 当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被* 调用** @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {System.out.println("执行 userEventTriggered");}/*** 当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable()方法* * 来检测 Channel 的可写性。与可写性相关的阈值可以通过* * Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法来* * 设置** @param ctx* @throws Exception*/@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {System.out.println("执行 channelWritabilityChanged");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("执行 exceptionCaught");// 异常处理cause.printStackTrace();ctx.close();}
}
NettyClient
package com.dkl.java.demo;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) {start();}public static void start() {// 创建EventLoopGroup,用于处理客户端的I/O操作EventLoopGroup groupThread = new NioEventLoopGroup();try {// 创建Bootstrap实例,客户端启动对象Bootstrap bootstrap = new Bootstrap();bootstrap.group(groupThread);// 设置服务端Channel类型为NioSocketChannel作为通道实现bootstrap.channel(NioSocketChannel.class);// 设置客户端处理bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ClientHandler());}});// 绑定端口ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 优雅地关闭线程groupThread.shutdownGracefully();}}
}
ClientHandler
package com.dkl.java.demo;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 连接建立时的处理,发送请求消息给服务器ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!我是客户端,测试通道连接", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 处理接收到的数据ByteBuf byteBuf = (ByteBuf) msg;try {// 将接收到的字节数据转换为字符串String message = byteBuf.toString(CharsetUtil.UTF_8);// 打印接收到的消息System.out.println("受到服务端响应的消息: " + message);// TODO: 对数据进行业务处理} finally {// 释放ByteBuf资源ReferenceCountUtil.release(byteBuf);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 异常处理cause.printStackTrace();ctx.close();}
}
运行截图
handler 执行顺序
Server 端
连接时:执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete断开连接时:执行 channelReadComplete
(强制中断 Client 连接
执行 exceptionCaught
执行 userEventTriggered (exceptionCaught 中 ctx.close()) 触发
)
执行 channelInactive
执行 channelUnregisteredchannelReadComplete 中 ctx.close(); 触发:
执行 channelInactive
执行 channelUnregistered
Client 端
执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete
Spark 对应位置
- Spark版本:3.2.3
- Server: org.apache.spark.network.server.TransportServer.init
- Client: org.apache.spark.network.client.TransportClientFactory.createClient
相关文章:

Netty 入门学习
前言 学习Spark源码绕不开通信,Spark通信是基于Netty实现的,所以先简单学习总结一下Netty。 Spark 通信历史 最开始: Akka Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题 Spark 1.6&…...

Magentic-One、AutoGen、LangGraph、CrewAI 或 OpenAI Swarm:哪种多 AI 代理框架最好?
目录 一、说明 二、 AutoGen-自动生成(微软) 2.1 特征 2.2 局限性 三、 CrewAI 3.1 特征 3.2 限制: 四、LangGraph 4.1 特征: 4.2 限制: 五、OpenAI Swarm 5.1 特征 5.2 限制 六、Magentic-One 6.1 特征 6.2 限制 七、…...
openstack下如何生成centos9 centos10 和Ubuntu24 镜像
如何生成一个centos 10和centos 9 的镜像1. 下载 对应的版本 wget https://cloud.centos.org/centos/10-stream/x86_64/images/CentOS-Stream-GenericCloud-x86_64-10-latest.x86_64.qcow2 wget https://cloud.centos.org/centos/9-stream/x86_64/images/CentOS-Stream-Gener…...
Kivy App开发之UX控件Slider滑块
在app中可能会调节如音量,亮度等,可以使用Slider来实现,该控件调用方便,兼容性好,滑动平稳。在一些参数设置中,也可以用来调整数值。 支持水平和垂直方向,可以设置默认值,最小及最大值。 使用方法,需用引入Slider类,通过Slider类生成一个滑块并设置相关的样式后,再…...

CSS——22.静态伪类(伪类是选择不同元素状态)
<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>静态伪类</title> </head><body><a href"#">我爱学习</a></body> </html>单击链接前的样式 左键单击(且…...

python学opencv|读取图像(三十)使用cv2.getAffineTransform()函数倾斜拉伸图像
【1】引言 前序已经学习了如何平移和旋转缩放图像,相关文章链接为: python学opencv|读取图像(二十七)使用cv2.warpAffine()函数平移图像-CSDN博客 python学opencv|读取图像(二十八࿰…...
Unity3D中基于ILRuntime的组件化开发详解
前言 在Unity3D开发中,组件化开发是一种高效且灵活的软件架构方式。通过将游戏功能拆分为独立的、可重用的组件,开发者可以更容易地管理、扩展和维护代码。而ILRuntime作为一款基于C#的热更新框架,为Unity3D开发者提供了一种高效的热更新和组…...

ELK的搭建
ELK elk:elasticsearch logstatsh kibana统一日志收集系统 elasticsearch:分布式的全文索引引擎点非关系型数据库,存储所有的日志信息,主和从,最少需要2台 logstatsh:动态的从各种指定的数据源,获取数据…...

国产信创实践(国能磐石服务器操作系统CEOS +东方通TongHttpServer)
替换介绍: 国能磐石服务器操作系统CEOS 对标 Linux 服务器操作系统(Ubuntu, CentOS) 东方通TongHttpServer 对标 Nginx 负载均衡Web服务器 第一步: 服务器安装CEOS映像文件,可直接安装,本文采用使用VMware …...

C#里使用libxl读取EXCEL文件里的图片并保存出来
有时候需要读取EXCEL里的图片文件, 因为很多用户喜欢使用图片保存在EXCEL里,比如用户保存一些现场整改的图片。 如果需要把这些图片抽取出来,再保存到系统里,就需要读取这些图片数据,生成合适的文件再保存。 在libxl里也提供了这样的方法, 如下: var picType = boo…...

【开源免费】基于SpringBoot+Vue.JS企业级工位管理系统(JAVA毕业设计)
本文项目编号 T 127 ,文末自助获取源码 \color{red}{T127,文末自助获取源码} T127,文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...
美国大学的计算机科学专业排名
美国的计算机科学专业在全球范围内享有盛誉,许多大学在该领域具有卓越的教学和研究实力。以下是根据最新的排名和信息整理的美国计算机科学专业顶尖大学列表: 2025年 U.S. News 美国本科计算机科学专业排名: 斯坦福大学(Stanfor…...

机器学习实战——决策树:从原理到应用的深度解析
✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ 决策树(Decision Tree)是一种简单而直观的分类与回归模型,在机器学习中广泛应用。它的…...

开源生成式物理引擎Genesis,可模拟世界万物
这是生成大模型时代 —— 它们能生成文本、图像、音频、视频、3D 对象…… 而如果将所有这些组合到一起,我们可能会得到一个世界! 现在,不管是 LeCun 正在探索的世界模型,还是李飞飞想要攻克的空间智能,又或是其他研究…...

kubernetes第七天
1.影响pod调度的因素 nodeName 节点名 resources 资源限制 hostNetwork 宿主机网络 污点 污点容忍 Pod亲和性 Pod反亲和性 节点亲和性 2.污点 通常是作用于worker节点上,其可以影响pod的调度 语法:key[value]:effect effect:[ɪˈfek…...
RK3588上CPU和GPU算力以及opencv resize的性能对比测试
RK3588上CPU和GPU算力以及opencv resize的性能对比测试 一.背景二.小结三.相关链接四.操作步骤1.环境搭建A.安装依赖B.设置GPU为高性能模式C.获取GPU信息D.获取CPU信息 2.调用OpenCL SDK获取GPU信息3.使用OpenCL API计算矩阵乘4.使用clpeak测试GPU的性能5.使用OpenBLAS测试CPU的…...

基于Centos 7系统的安全加固方案
创作不易,麻烦点个免费的赞和关注吧! 声明! 免责声明:本教程作者及相关参与人员对于任何直接或间接使用本教程内容而导致的任何形式的损失或损害,包括但不限于数据丢失、系统损坏、个人隐私泄露或经济损失等…...
IT行业的发展趋势
一、引言 IT(信息技术)行业自诞生以来,就以惊人的速度发展,不断改变着我们的生活、工作和社会结构。如今,随着技术的持续创新、市场需求的演变以及全球经济格局的变化,IT行业正迈向新的发展阶段࿰…...
《探秘开源多模态神经网络模型:AI 新时代的万能钥匙》
《探秘开源多模态神经网络模型:AI 新时代的万能钥匙》 一、多模态模型的崛起之路(一)从单一到多元:模态的融合演进(二)关键技术突破:解锁多模态潜能 二、开源多模态模型深度剖析(一&…...

ROS核心概念解析:从Node到Master,再到roslaunch的全面指南
Node 在ROS中,最小的进程单元就是节点(node)。一个软件包里可以有多个可执行文件,可执行文件在运行之后就成了一个进程(process),这个进程在ROS中就叫做节点。 从程序角度来说,node就是一个可执行文件&…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...

微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...

tauri项目,如何在rust端读取电脑环境变量
如果想在前端通过调用来获取环境变量的值,可以通过标准的依赖: std::env::var(name).ok() 想在前端通过调用来获取,可以写一个command函数: #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...
SpringAI实战:ChatModel智能对话全解
一、引言:Spring AI 与 Chat Model 的核心价值 🚀 在 Java 生态中集成大模型能力,Spring AI 提供了高效的解决方案 🤖。其中 Chat Model 作为核心交互组件,通过标准化接口简化了与大语言模型(LLM࿰…...