Netty基础—6.Netty实现RPC服务三
大纲
1.RPC的相关概念
2.RPC服务调用端动态代理实现
3.Netty客户端之RPC远程调用过程分析
4.RPC网络通信中的编码解码器
5.Netty服务端之RPC服务提供端的处理
6.RPC服务调用端实现超时功能
5.Netty服务端之RPC服务提供端的处理
(1)RPC服务提供端NettyServer
(2)基于反射调用请求对象的目标方法
(1)RPC服务提供端NettyRpcServer
public class ServiceConfig {private String serviceName;//调用方的服务名称private Class serviceInterfaceClass;//服务接口类型private Class serviceClass;...
}public class NettyRpcServer {private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);private static final int DEFAULT_PORT = 8998;private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();private int port;public NettyRpcServer(int port) {this.port = port;}public void start() {logger.info("Netty RPC Server Starting...");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcDecoder(RpcRequest.class)).addLast(new RpcEncoder(RpcResponse.class)).addLast(new NettyRpcServerHandler(serviceConfigs));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);//到这一步为止,server启动了而且监听指定的端口号了ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty RPC Server started successfully, listened[" + port + "]");//进入一个阻塞的状态,同步一直等待到你的server端要关闭掉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.error("Netty RPC Server failed to start, listened[" + port + "]");} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}//可以代理多个服务public void addServiceConfig(ServiceConfig serviceConfig) {this.serviceConfigs.add(serviceConfig);}public static void main(String[] args) {ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);nettyRpcServer.addServiceConfig(serviceConfig);nettyRpcServer.start();}
}
(2)基于反射调用请求对象的目标方法
//RpcRequest类需要修改字段调整为如下所示
public class RpcRequest implements Serializable {private String requestId;private String className;private String methodName;private Class[] parameterTypes;//参数类型private Object[] args;//参数值private String invokerApplicationName;//调用方的服务名称private String invokerIp;//调用方的IP地址...
}public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {for (ServiceConfig serviceConfig : serviceConfigs) {String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();serviceConfigMap.put(serviceInterfaceClass, serviceConfig);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest)msg;logger.info("Netty RPC Server receives the request: " + rpcRequest);RpcResponse rpcResponse = new RpcResponse();rpcResponse.setRequestId(rpcRequest.getRequestId());try {//此时我们要实现什么呢?//我们需要根据RpcRequest指定的class,获取到这个class//然后通过反射构建这个class对象实例//接着通过反射获取到这个RpcRequest指定方法和入参类型的method//最后通过反射调用,传入方法,拿到返回值//根据接口名字拿到接口实现类的名字后再获取类ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());Class clazz = serviceConfig.getServiceClass();Object instance = clazz.newInstance();Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(instance, rpcRequest.getArgs());//把rpc调用结果封装到响应里去rpcResponse.setResult(result);rpcResponse.setSuccess(RpcResponse.SUCCESS);} catch(Exception e) {logger.error("Netty RPC Server failed to response the request.", e);rpcResponse.setSuccess(RpcResponse.FAILURE);rpcResponse.setException(e);}ctx.write(rpcResponse);ctx.flush();logger.info("send RPC response to client: " + rpcResponse);}
}
6.RPC服务调用端实现超时功能
public class ReferenceConfig {private static final long DEFAULT_TIMEOUT = 5000;private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";private static final int DEFAULT_SERVICE_PORT = 8998;private Class serviceInterfaceClass;private String serviceHost;private int servicePort;private long timeout;...
}public class NettyRpcClient {private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);private ReferenceConfig referenceConfig;private ChannelFuture channelFuture;private NettyRpcClientHandler nettyRpcClientHandler;public NettyRpcClient(ReferenceConfig referenceConfig) {this.referenceConfig = referenceConfig;this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());}public void connect() {logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)//长时间没有通信就发送一个检测包.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcEncoder(RpcRequest.class)).addLast(new RpcDecoder(RpcResponse.class)).addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout())).addLast(nettyRpcClientHandler);}}); try {if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();logger.info("successfully connected.");}} catch(Exception e) {throw new RuntimeException(e);}}public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {//标记一下请求发起的时间NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());channelFuture.channel().writeAndFlush(rpcRequest).sync();RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());logger.info("receives response from netty rpc server.");if (rpcResponse.isSuccess()) {return rpcResponse;}throw rpcResponse.getException();}
}public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);private long timeout;public NettyRpcReadTimeoutHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse)msg;long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());long now = new Date().getTime();if (now - requestTime >= timeout) {rpcResponse.setTimeout(true);logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);}//移除发起请求时间的标记NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());ctx.fireChannelRead(rpcResponse);}
}public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();private long timeout;public NettyRpcClientHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse) msg;if (rpcResponse.getTimeout()) {logger.error("Netty RPC client receives the response timeout: " + rpcResponse);} else {rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);logger.info("Netty RPC client receives the response: " + rpcResponse);}}public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {long waitStartTime = new Date().getTime();while (rpcResponses.get(requestId) == null) {try {long now = new Date().getTime();if (now - waitStartTime >= timeout) {break;}Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);} catch (InterruptedException e) {logger.error("wait for response interrupted", e);}}RpcResponse rpcResponse = rpcResponses.get(requestId);if (rpcResponse == null) {logger.error("Get RPC response timeout.");throw new NettyRpcReadTimeoutException("Get RPC response timeout.");} else {rpcResponses.remove(requestId);}return rpcResponse;}
}
相关文章:
Netty基础—6.Netty实现RPC服务三
大纲 1.RPC的相关概念 2.RPC服务调用端动态代理实现 3.Netty客户端之RPC远程调用过程分析 4.RPC网络通信中的编码解码器 5.Netty服务端之RPC服务提供端的处理 6.RPC服务调用端实现超时功能 5.Netty服务端之RPC服务提供端的处理 (1)RPC服务提供端NettyServer (2)基于反射…...
用vue3显示websocket的状态
在上次vue3项目上增加一个标签,显示当前的连接状态,两个按钮:重新连接 和 断开连接 修改App.vue <template><header><title>ws状态测试</title></header><main><WsStatus /></main> </template>…...
python拉取大视频导入deepseek大模型解决方案
使用Python拉取大视频并导入大模型,需要综合考虑数据获取、存储、处理和资源管理,确保高效稳定地处理大视频数据,同时充分利用大模型的性能,以下是分步方案及代码示例: --- 1. 分块下载大视频(避免内存溢出…...
为什么需要使用十堰高防服务器?
十堰高防服务器的核心价值与应用必要性 一、应对复杂攻击的防御能力 T级DDoS攻击防护 十堰高防服务器搭载 T级清洗中心,支持智能流量调度与分层处理,可抵御 800Gbps-1.2Tbps 的大规模混合攻击(如SYN Flood、UDP反射ÿ…...
[特殊字符] 深度实战:Android 13 系统定制之 Recovery 模式瘦身指南
🌟 核心需求 在 Android 13 商显设备开发中,需精简 Recovery 模式的菜单选项(如Reboot to bootloader/Enter rescue),但直接修改g_menu_actions后在User 版本出现黑屏卡死问题,需综合方案解决。 ǵ…...
向量数据库技术系列四-FAISS介绍
一、前言 FAISS(Facebook AI Similarity Search)是由Facebook AI Research开发的一个开源库,主要用于高效地进行大规模相似性搜索和聚类操作。主要功能如下: 向量索引与搜索:FAISS提供了多种索引和搜索向量的方法&…...
人工智能中的线性代数基础详解
线性代数是人工智能领域的重要数学基础之一,是人工智能技术的底层数学支柱,它为数据表示、模型构建和算法优化提供了核心工具。其核心概念与算法应用贯穿数据表示、模型训练及优化全过程。更多内容可看我文章:人工智能数学基础详解与拓展-CSDN博客 一、基本介绍 …...
格雷码.
格雷码 - OI Wiki 格雷码_百度百科 简介 格雷码(Gray Code),又称为二进制格雷码或循环二进制码,是一种二进制编码方式。它得名于贝尔实验室的工程师弗兰克格雷(Frank Gray),他于1940年代提出…...
【毕业论文格式】word分页符后的标题段前间距消失
文章目录 【问题描述】 分页符之后的段落开头,明明设置了标题有段前段后间距,但是没有显示间距: 【解决办法】 选中标题,选择边框 3. 选择段前间距,1~31磅的一个数 结果...
kubernetes对于一个nginx服务的增删改查
1、创建 Nginx 服务 1.1、创建 Deployment Deployment 用于管理 Pod 副本和更新策略。 方式一:命令式创建 kubectl create deployment nginx-deployment --imagenginx:latest --replicas3 --port80--replicas3:指定副本数为 3 --port80:容…...
PackageManagerService
首语 PackageManagerService(以下简称PMS)是Android最核心的系统服务之一,它是应用程序包管理服务,管理手机上所有的应用程序,包括应用程序的安装、卸载、更新、应用信息的查询、应用程序的禁用和启用等。 职责 在Android系统启动过程中扫…...
【蓝桥杯每日一题】3.16
🏝️专栏: 【蓝桥杯备篇】 🌅主页: f狐o狸x 目录 3.9 高精度算法 一、高精度加法 题目链接: 题目描述: 解题思路: 解题代码: 二、高精度减法 题目链接: 题目描述&…...
2.7 滑动窗口专题:串联所有单词的子串
LeetCode 30. 串联所有单词的子串算法对比分析 1. 题目链接 LeetCode 30. 串联所有单词的子串 2. 题目描述 给定一个字符串 s 和一个字符串数组 words,words 中所有单词长度相同。要求找到 s 中所有起始索引,使得从该位置开始的连续子串包含 words 中所…...
电脑实用小工具--VMware常用功能简介
一、创建、编辑虚拟机 1.1 创建新的虚拟机 详见文章新创建虚拟机流程 1.2 编辑虚拟机 创建完成后,点击编辑虚拟机设置,可对虚拟机内存、处理器、硬盘等各再次进行编辑设置。 二、虚拟机开关机 2.1 打开虚拟机 虚拟机创建成功后,点击…...
为训练大模型而努力-分享2W多张卡通头像的图片
最近我一直在研究AI大模型相关的内容,想着从现在开始慢慢收集各种各样的图片,万一以后需要训练大模型的时候可以用到,或者自己以后也许会需要。于是决定慢慢收集这些图片,为未来的学习和训练大模型做一些铺垫,哈哈。 …...
从零开始学习机器人---如何高效学习机械原理
如何高效学习机械原理 1. 理解课程的核心概念2. 结合图形和模型学习3. 掌握公式和计算方法4. 理论与实践相结合5. 总结和复习6. 保持好奇心和探索精神 总结 机械原理是一门理论性和实践性都很强的课程,涉及到机械系统的运动、动力传递、机构设计等内容。快速学习机械…...
JVM 垃圾回收器的选择
一:jvm性能指标吞吐量以及用户停顿时间解释。 二:垃圾回收器的选择。 三:垃圾回收器在jvm中的配置。 四:jvm中常用的gc算法。 一:jvm性能指标吞吐量以及用户停顿时间解释。 在 JVM 调优和垃圾回收器选择中࿰…...
使用GPTQ量化Llama-3-8B大模型
使用GPTQ量化8B生成式语言模型 服务器配置:4*3090 描述:使用四张3090,分别进行单卡量化,多卡量化。并使用SGLang部署量化后的模型,使用GPTQ量化 原来的模型精度为FP16,量化为4bit 首先下载gptqmodel量化…...
2025-03-16 学习记录--C/C++-PTA 习题4-2 求幂级数展开的部分和
合抱之木,生于毫末;九层之台,起于累土;千里之行,始于足下。💪🏻 一、题目描述 ⭐️ 习题4-2 求幂级数展开的部分和 已知函数e^x可以展开为幂级数1xx^2/2!x^3/3!⋯x^k/k!⋯。现给定一个实数x&a…...
【C#】Http请求设置接收不安全的证书
在进行HTTP请求时,出现以下报错,可设置接收不安全证书跳过证书验证,建议仅测试环境设置,生产环境可能会造成系统漏洞 /// <summary> /// HttpGet请求方法 /// </summary> /// <param name"requestUrl"&…...
从PDF文件中提取数据
笔记 import pdfplumber # 打开PDF文件 with pdfplumber.open(数学公式.pdf) as pdf:for i in pdf.pages: # 遍历页print(i.extract_text()) # extract_text()方法提取内容print(f---------第{i.page_number}页结束---------)...
【k8s001】K8s架构浅析
Kubernetes 架构浅析 #mermaid-svg-irCZnQUuietSX3Ro {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-irCZnQUuietSX3Ro .error-icon{fill:#552222;}#mermaid-svg-irCZnQUuietSX3Ro .error-text{fill:#552222;stroke…...
NPU、边缘计算与算力都是什么啊?
考虑到灵活性和经济性,公司购置一台边缘计算机,正在尝试将PCGPU的计算机视觉项目转到边缘计算机NPU上。本文简单整理了三个概念,并试图将其做个概要的说明。 一、算力:数字世界的“基础能源” 1.1 算力是什么 **算力(…...
AP AR
混淆矩阵 真实值正例真实值负例预测值正例TPFP预测值负例FNTN (根据阈值预测) P精确度计算:TP/(TPFP) R召回率计算:TP/(TPFN) AP 综合考虑P R 根据不同的阈值计算出不同的PR组合, 画出PR曲线,计算曲线…...
Leetcode-1278.Palindrome Partitioning III [C++][Java]
目录 一、题目描述 二、解题思路 【C】 【Java】 Leetcode-1278.Palindrome Partitioning IIIhttps://leetcode.com/problems/palindrome-partitioning-iii/description/1278. 分割回文串 III - 力扣(LeetCode)1278. 分割回文串 III - 给你一个由小写…...
Java集合 - ArrayList
ArrayList 是 Java 集合框架中最常用的动态数组实现类,位于 java.util 包中。它基于数组实现,支持动态扩容和随机访问。 1. 特点 动态数组:ArrayList 的底层是一个数组,可以根据需要动态扩展容量。 有序:元素按照插入…...
C++特性——智能指针
为什么需要智能指针 对于定义的局部变量,当作用域结束之后,就会自动回收,这没有什么问题。 当时用new delete的时候,就是动态分配对象的时候,如果new了一个变量,但却没有delete,这会造成内存泄…...
ctf web入门知识合集
文章目录 01做题思路02信息泄露及利用robots.txt.git文件泄露dirsearch ctfshow做题记录信息搜集web1web2web3web4web5web6web7web8SVN泄露与 Git泄露的区别web9web10 php的基础概念php的基础语法1. PHP 基本语法结构2. PHP 变量3.输出数据4.数组5.超全局变量6.文件操作 php的命…...
DeepSeek:技术教育领域的AI变革者——从理论到实践的全面解析
一、技术教育为何需要DeepSeek? 在数字化转型的浪潮下,技术教育面临着知识更新快、实践门槛高、个性化需求强三大核心挑战。传统的教学模式难以满足开发者快速掌握前沿技术、构建复杂系统能力的需求。DeepSeek作为国产开源大模型的代表,凭借…...
MySQL-存储过程和自定义函数
存储过程 存储过程,一组预编译的 SQL 语句和流程控制语句,被命名并存储在数据库中。存储过程可以用来封装复杂的数据库操作逻辑,并在需要时进行调用。 使用存储过程 创建存储过程 create procedure 存储过程名() begin存储过程的逻辑代码&…...
