当前位置: 首页 > news >正文

使用 Netty 实现 RPC 通信框架

使用 Netty 实现 RPC 通信框架

远程过程调用(RPC,Remote Procedure Call) 是分布式系统中非常重要的通信机制。它允许客户端调用远程服务器上的方法,就像调用本地方法一样。RPC 的核心在于屏蔽底层通信细节,使开发者关注业务逻辑。

Netty 作为一个高性能的网络通信框架,非常适合实现 RPC 框架。本篇文章将介绍如何使用 Netty 实现一个简单的 RPC 通信框架。


1. RPC 通信框架基本原理

1.1 核心组成

RPC 框架的核心模块通常包括:

  1. 服务注册与发现
    • 将服务接口及其实现类的地址注册到中心(如注册中心或简单的服务端映射)。
  2. 序列化与反序列化
    • 将方法调用、参数等序列化成字节流,传输到远程服务器,服务器再反序列化进行处理。
  3. 网络通信
    • 使用 Netty 实现客户端和服务端之间的数据传输。
  4. 动态代理
    • 使用动态代理拦截客户端对接口的调用,将调用信息发送到服务端并返回结果。

1.2 RPC 调用流程

  1. 客户端
    • 客户端调用代理对象的方法。
    • 代理对象将方法、参数打包成 RPC 请求,发送到服务器。
  2. 服务器
    • 服务器解析 RPC 请求,定位到具体的方法和参数。
    • 调用本地方法,获取结果后返回给客户端。
  3. 客户端
    • 接收服务器的响应,将结果返回给调用者。

2. Netty 实现 RPC 通信框架

2.1 项目结构设计

src/main/java/
├── common/         // 通用模块
│   ├── RpcRequest.java       // RPC 请求封装
│   ├── RpcResponse.java      // RPC 响应封装
│   ├── Serializer.java       // 序列化接口
│   ├── JsonSerializer.java   // JSON 序列化实现
├── server/         // 服务端模块
│   ├── RpcServer.java         // RPC 服务端
│   ├── ServiceRegistry.java   // 服务注册表
├── client/         // 客户端模块
│   ├── RpcClient.java         // RPC 客户端
│   ├── RpcProxy.java          // 客户端动态代理

2.2 核心代码实现

2.2.1 通用模块

(1) RPC 请求与响应类

RpcRequestRpcResponse 用于封装客户端发送的请求和服务器的响应。

public class RpcRequest {private String methodName; // 方法名private String className;  // 类名private Object[] parameters; // 参数private Class<?>[] paramTypes; // 参数类型// Getters and setters
}public class RpcResponse {private Object result; // 方法调用结果private String error;  // 错误信息(如果有)// Getters and setters
}

(2) 序列化接口

为确保传输的数据可以跨网络传递,定义序列化与反序列化的接口。

public interface Serializer {byte[] serialize(Object obj);   // 序列化<T> T deserialize(byte[] bytes, Class<T> clazz); // 反序列化
}

(3) JSON 序列化实现

使用 Jackson 实现简单的 JSON 序列化。

import com.fasterxml.jackson.databind.ObjectMapper;public class JsonSerializer implements Serializer {private static final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic byte[] serialize(Object obj) {try {return objectMapper.writeValueAsBytes(obj);} catch (Exception e) {throw new RuntimeException("Serialization failed", e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {try {return objectMapper.readValue(bytes, clazz);} catch (Exception e) {throw new RuntimeException("Deserialization failed", e);}}
}

2.2.2 服务端模块

(1) 服务注册表

ServiceRegistry 用于存储服务接口与实现类的映射。

import java.util.HashMap;
import java.util.Map;public class ServiceRegistry {private final Map<String, Object> services = new HashMap<>();public void register(String className, Object serviceImpl) {services.put(className, serviceImpl);}public Object getService(String className) {return services.get(className);}
}

(2) RPC 服务端

服务端接收 RPC 请求并调用对应的服务实现。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;public class RpcServer {private final int port;private final ServiceRegistry serviceRegistry;public RpcServer(int port, ServiceRegistry serviceRegistry) {this.port = port;this.serviceRegistry = serviceRegistry;}public void start() throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));pipeline.addLast(new LengthFieldPrepender(4));pipeline.addLast(new RpcServerHandler(serviceRegistry));}});ChannelFuture future = bootstrap.bind(port).sync();System.out.println("RPC Server started on port " + port);future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {private final ServiceRegistry serviceRegistry;public RpcServerHandler(ServiceRegistry serviceRegistry) {this.serviceRegistry = serviceRegistry;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {Object service = serviceRegistry.getService(request.getClassName());if (service == null) {ctx.writeAndFlush(new RpcResponse(null, "Service not found"));return;}// 调用服务实现Object result = service.getClass().getMethod(request.getMethodName(), request.getParamTypes()).invoke(service, request.getParameters());ctx.writeAndFlush(new RpcResponse(result, null));}
}

2.2.3 客户端模块

(1) RPC 客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class RpcClient {private final String host;private final int port;public RpcClient(String host, int port) {this.host = host;this.port = port;}public RpcResponse send(RpcRequest request) throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();RpcResponse response = new RpcResponse();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new RpcClientHandler(response));}});Channel channel = bootstrap.connect(host, port).sync().channel();channel.writeAndFlush(request).sync();channel.closeFuture().sync();} finally {group.shutdownGracefully();}return response;}
}class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {private final RpcResponse response;public RpcClientHandler(RpcResponse response) {this.response = response;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {response.setResult(msg.getResult());response.setError(msg.getError());}
}

(2) 动态代理

import java.lang.reflect.Proxy;public class RpcProxy {private final RpcClient client;public RpcProxy(RpcClient client) {this.client = client;}@SuppressWarnings("unchecked")public <T> T create(Class<T> serviceClass) {return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> {RpcRequest request = new RpcRequest();request.setClassName(serviceClass.getName());request.setMethodName(method.getName());request.setParameters(args);request.setParamTypes(method.getParameterTypes());RpcResponse response = client.send(request);if (response.getError() != null) {throw new RuntimeException(response.getError());}return response.getResult();});}
}

2.3 测试示例

  1. 定义服务接口和实现:

    public interface HelloService {String sayHello(String name);
    }public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String name) {return "Hello, " + name;}
    }
    
  2. 服务端注册服务并启动:

    ServiceRegistry registry = new ServiceRegistry();
    registry.register(HelloService.class.getName(), new HelloServiceImpl());RpcServer server = new RpcServer(8080, registry);
    server.start();
    
  3. 客户端调用服务:

    RpcClient client = new RpcClient("localhost", 8080);
    RpcProxy proxy = new RpcProxy(client);HelloService service = proxy.create(HelloService.class);
    String result = service.sayHello("Netty");
    System.out.println(result); // 输出: Hello, Netty
    

3. 总结

通过上述代码,我们实现了一个简单的基于 Netty 的 RPC 通信框架,涵盖了服务注册、序列化、网络通信和动态代理等核心模块。

关键点回顾

  1. 服务端:通过 ServiceRegistry 注册服务,并使用 Netty 接收和处理 RPC 请求。
  2. 客户端:通过动态代理封装 RPC 调用,简化客户端使用。
  3. 序列化:使用 JSON 进行数据的序列化和反序列化。

该框架可以作为一个简单的模板,在实际应用中可扩展为支持注册中心(如 Zookeeper)、负载均衡、异步调用等高级功能的完整 RPC 框架。

相关文章:

使用 Netty 实现 RPC 通信框架

使用 Netty 实现 RPC 通信框架 远程过程调用&#xff08;RPC&#xff0c;Remote Procedure Call&#xff09; 是分布式系统中非常重要的通信机制。它允许客户端调用远程服务器上的方法&#xff0c;就像调用本地方法一样。RPC 的核心在于屏蔽底层通信细节&#xff0c;使开发者关…...

【机器学习06--贝叶斯分类器】

文章目录 基础理解01 贝叶斯决策论02 极大似然估计03 朴素贝叶斯分类器04 半朴素贝叶斯分类器05 贝叶斯网06 EM算法 补充修正1. 贝叶斯定理与分类的基本概念2. 贝叶斯决策论3. 极大似然估计4. 朴素贝叶斯分类器5. 半朴素贝叶斯分类器6. 贝叶斯网7. EM算法 面试常考 基础理解 本…...

创建vue3项目步骤以及安装第三方插件步骤【保姆级教程】

&#x1f399;座右铭&#xff1a;得之坦然&#xff0c;失之淡然。 &#x1f48e;擅长领域&#xff1a;前端 是的&#xff0c;我需要您的&#xff1a; &#x1f9e1;点赞❤️关注&#x1f499;收藏&#x1f49b; 是我持续下去的动力&#xff01; 目录 一. 简单汇总一下创建…...

[146 LRU缓存](https://leetcode.cn/problems/lru-cache/)

分析 维护一个双向链表保存缓存中的元素。 如果元素超过容量阈值&#xff0c;则删除最久未使用的元素。为了实现这个功能&#xff0c;将get(), put()方法获取的元素添加到链表首部。 为了在O(1)时间复杂度执行get()方法&#xff0c;再新建一个映射表&#xff0c;缓存key与链表…...

【Java Nio Netty】基于TCP的简单Netty自定义协议实现(万字,全篇例子)

基于TCP的简单Netty自定义协议实现&#xff08;万字&#xff0c;全篇例子&#xff09; 前言 有一阵子没写博客了&#xff0c;最近在学习Netty写一个实时聊天软件&#xff0c;一个高性能异步事件驱动的网络应用框架&#xff0c;我们常用的SpringBoot一般基于Http协议&#xff0…...

【JavaWeb后端学习笔记】Redis常用命令以及Java客户端操作Redis

redis 1、redis安装与启动服务2、redis数据类型3、redis常用命令3.1 字符串String3.2 哈希Hash3.3 列表List3.4 集合Set&#xff08;无序&#xff09;3.5 有序集合zset3.6 通用命令 4、使用Java操作Redis4.1 环境准备4.2 Java操作字符串String4.3 Java操作哈希Hash4.4 Java操作…...

pdb调试器详解

文章目录 1. 启动 pdb 调试器1.1 在代码中插入断点1.2 使用命令行直接调试脚本 2. 常用调试命令2.1 基本命令2.2 高级命令2.3 断点操作 3. 调试过程示例4. 调试技巧4.1 条件断点4.2 自动启用调试4.2.1 运行程序时指定 -m pdb4.2.2在代码中启用 pdb.post_mortem4.2.3 使用 sys.e…...

项目15:简易扫雷--- 《跟着小王学Python·新手》

项目15&#xff1a;简易扫雷 — 《跟着小王学Python新手》 《跟着小王学Python》 是一套精心设计的Python学习教程&#xff0c;适合各个层次的学习者。本教程从基础语法入手&#xff0c;逐步深入到高级应用&#xff0c;以实例驱动的方式&#xff0c;帮助学习者逐步掌握Python的…...

Flink CDC实时同步mysql数据

官方参考资料&#xff1a; https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/ Apache Flink 的 Change Data Capture (CDC) 是一种用于捕获数据库变化&#xff08;如插入、更新和删除操作&#xff09;的技术。Flink CDC…...

题解 - 自然数无序拆分

题目描述 美羊羊给喜羊羊和沸羊羊出了一道难题&#xff0c;说谁能先做出来&#xff0c;我就奖励给他我自己做的一样礼物。沸羊羊这下可乐了&#xff0c;于是马上答应立刻做出来&#xff0c;喜羊羊见状&#xff0c;当然也不甘示弱&#xff0c;向沸羊羊发起了挑战。 可是这道题目…...

dfs_bool_void 两种写法感悟

dfs 的两种写法 在看之前实现图的遍历 dfs 和拓扑排序 dfs 实现的代码的时候的感悟 图的遍历 dfs 和拓扑排序 dfs 的区别 0 → 1 ↓ ↓ 2 → 3图的邻接表表示&#xff1a; adjList[0] {1, 2}; adjList[1] {3}; adjList[2] {3}; adjList[3] {};正常的 DFS 遍历&#x…...

MySQL 主从复制与 Binlog 深度解析

目录 1. Binlog的工作原理与配置2. 主从复制的设置与故障排除3. 数据一致性与同步延迟的处理 小结 MySQL的binlog&#xff08;二进制日志&#xff09;和主从复制是实现数据备份、容灾、负载均衡以及数据同步的重要机制。在高可用性架构和分布式数据库设计中&#xff0c;binlog同…...

大连理工大学《2024年845自动控制原理真题》 (完整版)

本文内容&#xff0c;全部选自自动化考研联盟的&#xff1a;《大连理工大学845自控考研资料》的真题篇。后续会持续更新更多学校&#xff0c;更多年份的真题&#xff0c;记得关注哦 目录 2024年真题 Part1&#xff1a;2024年完整版真题 2024年真题...

Java性能调优 - 多线程性能调优

锁优化 Synchronized 在JDK1.6中引入了分级锁机制来优化Synchronized。当一个线程获取锁时 首先对象锁将成为一个偏向锁&#xff0c;这样做是为了优化同一线程重复获取锁&#xff0c;导致的用户态与内核态的切换问题&#xff1b;其次如果有多个线程竞争锁资源&#xff0c;锁…...

行为树详解(4)——节点参数配置化

【分析】 行为树是否足够灵活强大依赖于足够丰富的各类条件节点和动作节点&#xff0c;在实现这些节点时&#xff0c;不可避免的&#xff0c;节点本身需要有一些参数供配置。 这些参数可以分为静态的固定值的参数以及动态读取设置的参数。 静态参数直接设置为Public即可&…...

计算机网络中的三大交换技术详解与实现

目录 计算机网络中的三大交换技术详解与实现1. 计算机网络中的交换技术概述1.1 交换技术的意义1.2 三大交换技术简介 2. 电路交换技术2.1 理论介绍2.2 Python实现及代码详解2.3 案例分析 3. 分组交换技术3.1 理论介绍3.2 Python实现及代码详解3.3 案例分析 4. 报文交换技术4.1 …...

《杨辉三角》

题目描述 给出 n(1≤n≤20)n(1≤n≤20)&#xff0c;输出杨辉三角的前 nn 行。 如果你不知道什么是杨辉三角&#xff0c;可以观察样例找找规律。 输入格式 无 输出格式 无 输入输出样例 输入 #1复制 6 输出 #1复制 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 5 1 C语言…...

ARM学习(35)单元测试框架以及MinGW GCC覆盖率报告

单元测试框架以及MinGW GCC覆盖率报告 1、单元测试与覆盖率简介 随着代码越写越多,越来越需要注意自测的重要性,基本可以提前解决90%的问题,所以就来介绍一下单元测试,单元测试是否测试充分,需要进行评价,覆盖率就是单元测试是否充分的评估工具。 例如跑过单元测试后,…...

边缘计算+人工智能:让设备更聪明的秘密

引言&#xff1a;日常生活中的“智能”设备 你是否发现&#xff0c;身边的设备正变得越来越“聪明”&#xff1f; 早上醒来时&#xff0c;智能音箱已经根据你的日程播放舒缓音乐&#xff1b;走进厨房&#xff0c;智能冰箱提醒你今天的食材库存&#xff1b;而在城市道路上&…...

neo4j知识图谱AOPC的安装方法

AOPC下载链接&#xff1a;aopc全版本github下载 APOC&#xff0c;全称为Awesome Procedures On Cypher&#xff0c;是Neo4j图数据库的一个非常强大和流行的扩展库。它极大地丰富了Cypher查询语言的功能&#xff0c;提供了超过450个过程&#xff08;procedures&#xff09;和函数…...

Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?

Golang 面试经典题&#xff1a;map 的 key 可以是什么类型&#xff1f;哪些不可以&#xff1f; 在 Golang 的面试中&#xff0c;map 类型的使用是一个常见的考点&#xff0c;其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...

Spring Boot 实现流式响应(兼容 2.7.x)

在实际开发中&#xff0c;我们可能会遇到一些流式数据处理的场景&#xff0c;比如接收来自上游接口的 Server-Sent Events&#xff08;SSE&#xff09; 或 流式 JSON 内容&#xff0c;并将其原样中转给前端页面或客户端。这种情况下&#xff0c;传统的 RestTemplate 缓存机制会…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定&#xff0c;这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中&#xff0c;积分电荷法最为常用&#xff0c;其原理是通过测量在电容器上积累的热释电电荷&#xff0c;从而确定热释电系数…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

Unity中的transform.up

2025年6月8日&#xff0c;周日下午 在Unity中&#xff0c;transform.up是Transform组件的一个属性&#xff0c;表示游戏对象在世界空间中的“上”方向&#xff08;Y轴正方向&#xff09;&#xff0c;且会随对象旋转动态变化。以下是关键点解析&#xff1a; 基本定义 transfor…...

多元隐函数 偏导公式

我们来推导隐函数 z z ( x , y ) z z(x, y) zz(x,y) 的偏导公式&#xff0c;给定一个隐函数关系&#xff1a; F ( x , y , z ( x , y ) ) 0 F(x, y, z(x, y)) 0 F(x,y,z(x,y))0 &#x1f9e0; 目标&#xff1a; 求 ∂ z ∂ x \frac{\partial z}{\partial x} ∂x∂z​、 …...