当前位置: 首页 > article >正文

Netty基础—6.Netty实现RPC服务三

大纲

1.RPC的相关概念

2.RPC服务调用端动态代理实现

3.Netty客户端之RPC远程调用过程分析

4.RPC网络通信中的编码解码器

5.Netty服务端之RPC服务提供端的处理

6.RPC服务调用端实现超时功能

5.Netty服务端之RPC服务提供端的处理

(1)RPC服务提供端NettyServer

(2)基于反射调用请求对象的目标方法

(1)RPC服务提供端NettyRpcServer

public class ServiceConfig {private String serviceName;//调用方的服务名称private Class serviceInterfaceClass;//服务接口类型private Class serviceClass;...
}public class NettyRpcServer {private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);private static final int DEFAULT_PORT = 8998;private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();private int port;public NettyRpcServer(int port) {this.port = port;}public void start() {logger.info("Netty RPC Server Starting...");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcDecoder(RpcRequest.class)).addLast(new RpcEncoder(RpcResponse.class)).addLast(new NettyRpcServerHandler(serviceConfigs));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);//到这一步为止,server启动了而且监听指定的端口号了ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty RPC Server started successfully, listened[" + port + "]");//进入一个阻塞的状态,同步一直等待到你的server端要关闭掉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.error("Netty RPC Server failed to start, listened[" + port + "]");} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}//可以代理多个服务public void addServiceConfig(ServiceConfig serviceConfig) {this.serviceConfigs.add(serviceConfig);}public static void main(String[] args) {ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);nettyRpcServer.addServiceConfig(serviceConfig);nettyRpcServer.start();}
}

(2)基于反射调用请求对象的目标方法

//RpcRequest类需要修改字段调整为如下所示
public class RpcRequest implements Serializable {private String requestId;private String className;private String methodName;private Class[] parameterTypes;//参数类型private Object[] args;//参数值private String invokerApplicationName;//调用方的服务名称private String invokerIp;//调用方的IP地址...
}public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {for (ServiceConfig serviceConfig : serviceConfigs) {String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();serviceConfigMap.put(serviceInterfaceClass, serviceConfig);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest)msg;logger.info("Netty RPC Server receives the request: " + rpcRequest);RpcResponse rpcResponse = new RpcResponse();rpcResponse.setRequestId(rpcRequest.getRequestId());try {//此时我们要实现什么呢?//我们需要根据RpcRequest指定的class,获取到这个class//然后通过反射构建这个class对象实例//接着通过反射获取到这个RpcRequest指定方法和入参类型的method//最后通过反射调用,传入方法,拿到返回值//根据接口名字拿到接口实现类的名字后再获取类ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());Class clazz = serviceConfig.getServiceClass();Object instance = clazz.newInstance();Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(instance, rpcRequest.getArgs());//把rpc调用结果封装到响应里去rpcResponse.setResult(result);rpcResponse.setSuccess(RpcResponse.SUCCESS);} catch(Exception e) {logger.error("Netty RPC Server failed to response the request.", e);rpcResponse.setSuccess(RpcResponse.FAILURE);rpcResponse.setException(e);}ctx.write(rpcResponse);ctx.flush();logger.info("send RPC response to client: " + rpcResponse);}
}

6.RPC服务调用端实现超时功能

public class ReferenceConfig {private static final long DEFAULT_TIMEOUT = 5000;private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";private static final int DEFAULT_SERVICE_PORT = 8998;private Class serviceInterfaceClass;private String serviceHost;private int servicePort;private long timeout;...
}public class NettyRpcClient {private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);private ReferenceConfig referenceConfig;private ChannelFuture channelFuture;private NettyRpcClientHandler nettyRpcClientHandler;public NettyRpcClient(ReferenceConfig referenceConfig) {this.referenceConfig = referenceConfig;this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());}public void connect() {logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)//长时间没有通信就发送一个检测包.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcEncoder(RpcRequest.class)).addLast(new RpcDecoder(RpcResponse.class)).addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout())).addLast(nettyRpcClientHandler);}});       try {if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();logger.info("successfully connected.");}} catch(Exception e) {throw new RuntimeException(e);}}public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {//标记一下请求发起的时间NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());channelFuture.channel().writeAndFlush(rpcRequest).sync();RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());logger.info("receives response from netty rpc server.");if (rpcResponse.isSuccess()) {return rpcResponse;}throw rpcResponse.getException();}
}public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);private long timeout;public NettyRpcReadTimeoutHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse)msg;long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());long now = new Date().getTime();if (now - requestTime >= timeout) {rpcResponse.setTimeout(true);logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);}//移除发起请求时间的标记NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());ctx.fireChannelRead(rpcResponse);}
}public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();private long timeout;public NettyRpcClientHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse) msg;if (rpcResponse.getTimeout()) {logger.error("Netty RPC client receives the response timeout: " + rpcResponse);} else {rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);logger.info("Netty RPC client receives the response: " + rpcResponse);}}public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {long waitStartTime = new Date().getTime();while (rpcResponses.get(requestId) == null) {try {long now = new Date().getTime();if (now - waitStartTime >= timeout) {break;}Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);} catch (InterruptedException e) {logger.error("wait for response interrupted", e);}}RpcResponse rpcResponse = rpcResponses.get(requestId);if (rpcResponse == null) {logger.error("Get RPC response timeout.");throw new NettyRpcReadTimeoutException("Get RPC response timeout.");} else {rpcResponses.remove(requestId);}return rpcResponse;}
}

相关文章:

Netty基础—6.Netty实现RPC服务三

大纲 1.RPC的相关概念 2.RPC服务调用端动态代理实现 3.Netty客户端之RPC远程调用过程分析 4.RPC网络通信中的编码解码器 5.Netty服务端之RPC服务提供端的处理 6.RPC服务调用端实现超时功能 5.Netty服务端之RPC服务提供端的处理 (1)RPC服务提供端NettyServer (2)基于反射…...

用vue3显示websocket的状态

在上次vue3项目上增加一个标签&#xff0c;显示当前的连接状态&#xff0c;两个按钮:重新连接 和 断开连接 修改App.vue <template><header><title>ws状态测试</title></header><main><WsStatus /></main> </template>…...

python拉取大视频导入deepseek大模型解决方案

使用Python拉取大视频并导入大模型&#xff0c;需要综合考虑数据获取、存储、处理和资源管理&#xff0c;确保高效稳定地处理大视频数据&#xff0c;同时充分利用大模型的性能&#xff0c;以下是分步方案及代码示例&#xff1a; --- 1. 分块下载大视频&#xff08;避免内存溢出…...

为什么需要使用十堰高防服务器?

十堰高防服务器的核心价值与应用必要性 一、‌应对复杂攻击的防御能力‌ ‌T级DDoS攻击防护‌ 十堰高防服务器搭载 ‌T级清洗中心‌&#xff0c;支持智能流量调度与分层处理&#xff0c;可抵御 ‌800Gbps-1.2Tbps‌ 的大规模混合攻击&#xff08;如SYN Flood、UDP反射&#xff…...

[特殊字符] 深度实战:Android 13 系统定制之 Recovery 模式瘦身指南

&#x1f31f; 核心需求 在 Android 13 商显设备开发中&#xff0c;需精简 Recovery 模式的菜单选项&#xff08;如Reboot to bootloader/Enter rescue&#xff09;&#xff0c;但直接修改g_menu_actions后在User 版本出现黑屏卡死问题&#xff0c;需综合方案解决。 &#x1f5…...

向量数据库技术系列四-FAISS介绍

一、前言 FAISS&#xff08;Facebook AI Similarity Search&#xff09;是由Facebook AI Research开发的一个开源库&#xff0c;主要用于高效地进行大规模相似性搜索和聚类操作。主要功能如下&#xff1a; 向量索引与搜索&#xff1a;FAISS提供了多种索引和搜索向量的方法&…...

人工智能中的线性代数基础详解

‌ 线性代数是人工智能领域的重要数学基础之一,是人工智能技术的底层数学支柱,它为数据表示、模型构建和算法优化提供了核心工具。其核心概念与算法应用贯穿数据表示、模型训练及优化全过程。更多内容可看我文章:人工智能数学基础详解与拓展-CSDN博客 一、基本介绍 …...

格雷码.

格雷码 - OI Wiki 格雷码_百度百科 简介 格雷码&#xff08;Gray Code&#xff09;&#xff0c;又称为二进制格雷码或循环二进制码&#xff0c;是一种二进制编码方式。它得名于贝尔实验室的工程师弗兰克格雷&#xff08;Frank Gray&#xff09;&#xff0c;他于1940年代提出…...

【毕业论文格式】word分页符后的标题段前间距消失

文章目录 【问题描述】 分页符之后的段落开头&#xff0c;明明设置了标题有段前段后间距&#xff0c;但是没有显示间距&#xff1a; 【解决办法】 选中标题&#xff0c;选择边框 3. 选择段前间距&#xff0c;1~31磅的一个数 结果...

kubernetes对于一个nginx服务的增删改查

1、创建 Nginx 服务 1.1、创建 Deployment Deployment 用于管理 Pod 副本和更新策略。 方式一&#xff1a;命令式创建 kubectl create deployment nginx-deployment --imagenginx:latest --replicas3 --port80--replicas3&#xff1a;指定副本数为 3 --port80&#xff1a;容…...

PackageManagerService

首语 PackageManagerService(以下简称PMS)是Android最核心的系统服务之一&#xff0c;它是应用程序包管理服务&#xff0c;管理手机上所有的应用程序&#xff0c;包括应用程序的安装、卸载、更新、应用信息的查询、应用程序的禁用和启用等。 职责 在Android系统启动过程中扫…...

【蓝桥杯每日一题】3.16

&#x1f3dd;️专栏&#xff1a; 【蓝桥杯备篇】 &#x1f305;主页&#xff1a; f狐o狸x 目录 3.9 高精度算法 一、高精度加法 题目链接&#xff1a; 题目描述&#xff1a; 解题思路&#xff1a; 解题代码&#xff1a; 二、高精度减法 题目链接&#xff1a; 题目描述&…...

2.7 滑动窗口专题:串联所有单词的子串

LeetCode 30. 串联所有单词的子串算法对比分析 1. 题目链接 LeetCode 30. 串联所有单词的子串 2. 题目描述 给定一个字符串 s 和一个字符串数组 words&#xff0c;words 中所有单词长度相同。要求找到 s 中所有起始索引&#xff0c;使得从该位置开始的连续子串包含 words 中所…...

电脑实用小工具--VMware常用功能简介

一、创建、编辑虚拟机 1.1 创建新的虚拟机 详见文章新创建虚拟机流程 1.2 编辑虚拟机 创建完成后&#xff0c;点击编辑虚拟机设置&#xff0c;可对虚拟机内存、处理器、硬盘等各再次进行编辑设置。 二、虚拟机开关机 2.1 打开虚拟机 虚拟机创建成功后&#xff0c;点击…...

为训练大模型而努力-分享2W多张卡通头像的图片

最近我一直在研究AI大模型相关的内容&#xff0c;想着从现在开始慢慢收集各种各样的图片&#xff0c;万一以后需要训练大模型的时候可以用到&#xff0c;或者自己以后也许会需要。于是决定慢慢收集这些图片&#xff0c;为未来的学习和训练大模型做一些铺垫&#xff0c;哈哈。 …...

从零开始学习机器人---如何高效学习机械原理

如何高效学习机械原理 1. 理解课程的核心概念2. 结合图形和模型学习3. 掌握公式和计算方法4. 理论与实践相结合5. 总结和复习6. 保持好奇心和探索精神 总结 机械原理是一门理论性和实践性都很强的课程&#xff0c;涉及到机械系统的运动、动力传递、机构设计等内容。快速学习机械…...

JVM 垃圾回收器的选择

一&#xff1a;jvm性能指标吞吐量以及用户停顿时间解释。 二&#xff1a;垃圾回收器的选择。 三&#xff1a;垃圾回收器在jvm中的配置。 四&#xff1a;jvm中常用的gc算法。 一&#xff1a;jvm性能指标吞吐量以及用户停顿时间解释。 在 JVM 调优和垃圾回收器选择中&#xff0…...

使用GPTQ量化Llama-3-8B大模型

使用GPTQ量化8B生成式语言模型 服务器配置&#xff1a;4*3090 描述&#xff1a;使用四张3090&#xff0c;分别进行单卡量化&#xff0c;多卡量化。并使用SGLang部署量化后的模型&#xff0c;使用GPTQ量化 原来的模型精度为FP16&#xff0c;量化为4bit 首先下载gptqmodel量化…...

2025-03-16 学习记录--C/C++-PTA 习题4-2 求幂级数展开的部分和

合抱之木&#xff0c;生于毫末&#xff1b;九层之台&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。&#x1f4aa;&#x1f3fb; 一、题目描述 ⭐️ 习题4-2 求幂级数展开的部分和 已知函数e^x可以展开为幂级数1xx^2/2!x^3/3!⋯x^k/k!⋯。现给定一个实数x&a…...

【C#】Http请求设置接收不安全的证书

在进行HTTP请求时&#xff0c;出现以下报错&#xff0c;可设置接收不安全证书跳过证书验证&#xff0c;建议仅测试环境设置&#xff0c;生产环境可能会造成系统漏洞 /// <summary> /// HttpGet请求方法 /// </summary> /// <param name"requestUrl"&…...

从PDF文件中提取数据

笔记 import pdfplumber # 打开PDF文件 with pdfplumber.open(数学公式.pdf) as pdf:for i in pdf.pages: # 遍历页print(i.extract_text()) # extract_text()方法提取内容print(f---------第{i.page_number}页结束---------)...

【k8s001】K8s架构浅析

Kubernetes 架构浅析 #mermaid-svg-irCZnQUuietSX3Ro {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-irCZnQUuietSX3Ro .error-icon{fill:#552222;}#mermaid-svg-irCZnQUuietSX3Ro .error-text{fill:#552222;stroke…...

NPU、边缘计算与算力都是什么啊?

考虑到灵活性和经济性&#xff0c;公司购置一台边缘计算机&#xff0c;正在尝试将PCGPU的计算机视觉项目转到边缘计算机NPU上。本文简单整理了三个概念&#xff0c;并试图将其做个概要的说明。 一、算力&#xff1a;数字世界的“基础能源” 1.1 算力是什么 **算力&#xff08…...

AP AR

混淆矩阵 真实值正例真实值负例预测值正例TPFP预测值负例FNTN &#xff08;根据阈值预测&#xff09; P精确度计算&#xff1a;TP/(TPFP) R召回率计算&#xff1a;TP/(TPFN) AP 综合考虑P R 根据不同的阈值计算出不同的PR组合&#xff0c; 画出PR曲线&#xff0c;计算曲线…...

Leetcode-1278.Palindrome Partitioning III [C++][Java]

目录 一、题目描述 二、解题思路 【C】 【Java】 Leetcode-1278.Palindrome Partitioning IIIhttps://leetcode.com/problems/palindrome-partitioning-iii/description/1278. 分割回文串 III - 力扣&#xff08;LeetCode&#xff09;1278. 分割回文串 III - 给你一个由小写…...

Java集合 - ArrayList

ArrayList 是 Java 集合框架中最常用的动态数组实现类&#xff0c;位于 java.util 包中。它基于数组实现&#xff0c;支持动态扩容和随机访问。 1. 特点 动态数组&#xff1a;ArrayList 的底层是一个数组&#xff0c;可以根据需要动态扩展容量。 有序&#xff1a;元素按照插入…...

C++特性——智能指针

为什么需要智能指针 对于定义的局部变量&#xff0c;当作用域结束之后&#xff0c;就会自动回收&#xff0c;这没有什么问题。 当时用new delete的时候&#xff0c;就是动态分配对象的时候&#xff0c;如果new了一个变量&#xff0c;但却没有delete&#xff0c;这会造成内存泄…...

ctf web入门知识合集

文章目录 01做题思路02信息泄露及利用robots.txt.git文件泄露dirsearch ctfshow做题记录信息搜集web1web2web3web4web5web6web7web8SVN泄露与 Git泄露的区别web9web10 php的基础概念php的基础语法1. PHP 基本语法结构2. PHP 变量3.输出数据4.数组5.超全局变量6.文件操作 php的命…...

DeepSeek:技术教育领域的AI变革者——从理论到实践的全面解析

一、技术教育为何需要DeepSeek&#xff1f; 在数字化转型的浪潮下&#xff0c;技术教育面临着知识更新快、实践门槛高、个性化需求强三大核心挑战。传统的教学模式难以满足开发者快速掌握前沿技术、构建复杂系统能力的需求。DeepSeek作为国产开源大模型的代表&#xff0c;凭借…...

MySQL-存储过程和自定义函数

存储过程 存储过程&#xff0c;一组预编译的 SQL 语句和流程控制语句&#xff0c;被命名并存储在数据库中。存储过程可以用来封装复杂的数据库操作逻辑&#xff0c;并在需要时进行调用。 使用存储过程 创建存储过程 create procedure 存储过程名() begin存储过程的逻辑代码&…...