Linkis-RPC的设计思想
我的技术网站 java-broke.site,有大厂完整面经,工作技术,架构师成长之路,等经验分享
Linkis-RPC的设计目标是提供一种灵活、可扩展的微服务间通信机制,支持以下功能:
- 异步请求与响应:支持请求方发送异步请求,接收方处理完后再响应给请求方。
- 广播请求:支持将请求广播给所有目标服务实例。
- 自定义拦截器:允许用户定义拦截器来实现特定功能,如缓存、重试等。
- 服务选择:基于Eureka实现的服务发现,支持通过服务名和实例信息选择特定服务实例。
- 解耦通信机制:请求方和接收方的代码实现解耦,便于独立开发和维护。
Linkis-RPC的架构概述
Linkis-RPC主要由以下几个核心组件构成:
- Sender:请求发送器,用于发送请求到目标服务。
- Receiver:请求接收器,负责接收和处理请求。
- Interceptor:拦截器链,用于在请求发送前进行处理,如广播、重试、缓存等。
- Decoder/Encoder:用于请求和响应的序列化和反序列化。
- Eureka:服务注册与发现中心。
Linkis-RPC的代码结构
Linkis-RPC的源码结构清晰,主要代码模块包括:
- Sender接口和实现:负责发送请求并处理响应。
- Receiver接口和实现:负责接收请求并执行业务逻辑。
- Interceptor接口和实现:拦截器实现类,用于增强请求功能。
- RPCReceiveRestful:内嵌的HTTP服务,用于接收和解码请求。
- 请求和响应编码器/解码器:实现请求和响应的序列化和反序列化。
接下来,我们将逐步分析每个模块的代码实现。
Sender的实现
Sender接口
Sender
接口提供了多种发送请求的方法,包括同步和异步请求。
public abstract class Sender {/*** 同步请求方法,等待接收方返回响应。** @param message 请求消息对象* @return 响应对象*/public abstract Object ask(Object message);/*** 带超时设置的同步请求方法。** @param message 请求消息对象* @param timeout 超时时间* @return 响应对象*/public abstract Object ask(Object message, Duration timeout);/*** 仅发送请求,不关心响应。** @param message 请求消息对象*/public abstract void send(Object message);/*** 异步请求方法,在稍后通过其他线程发送请求。** @param message 请求消息对象*/public abstract void deliver(Object message);
}
Sender的实现类
Sender
的实现类主要负责构建请求并通过Feign客户端发送请求。
public class DefaultSender extends Sender {private final FeignClient feignClient;private final String serviceName;public DefaultSender(String serviceName, FeignClient feignClient) {this.serviceName = serviceName;this.feignClient = feignClient;}@Overridepublic Object ask(Object message) {return feignClient.post(message);}@Overridepublic Object ask(Object message, Duration timeout) {// 设置请求超时逻辑return feignClient.postWithTimeout(message, timeout);}@Overridepublic void send(Object message) {feignClient.send(message);}@Overridepublic void deliver(Object message) {// 异步发送逻辑CompletableFuture.runAsync(() -> feignClient.post(message));}
}
Receiver的实现
Receiver接口
Receiver
接口定义了接收请求和响应的基本方法。
public interface Receiver {/*** 处理异步请求的方法。** @param message 请求消息对象* @param sender 请求发送方的Sender实例*/void receive(Object message, Sender sender);/*** 处理同步请求的方法,返回响应对象。** @param message 请求消息对象* @param sender 请求发送方的Sender实例* @return 响应对象*/Object receiveAndReply(Object message, Sender sender);/*** 带超时设置的同步请求处理方法。** @param message 请求消息对象* @param duration 超时时间* @param sender 请求发送方的Sender实例* @return 响应对象*/Object receiveAndReply(Object message, Duration duration, Sender sender);
}
Receiver的实现类
public class DefaultReceiver implements Receiver {@Overridepublic void receive(Object message, Sender sender) {// 异步处理逻辑System.out.println("Received async message: " + message);// 根据业务需求决定是否需要响应发送方}@Overridepublic Object receiveAndReply(Object message, Sender sender) {// 处理请求并返回响应System.out.println("Received sync message: " + message);return processRequest(message);}@Overridepublic Object receiveAndReply(Object message, Duration duration, Sender sender) {// 处理请求并返回响应,考虑超时System.out.println("Received sync message with timeout: " + message);return processRequestWithTimeout(message, duration);}private Object processRequest(Object message) {// 业务逻辑处理return "Processed: " + message;}private Object processRequestWithTimeout(Object message, Duration duration) {// 业务逻辑处理,支持超时try {Thread.sleep(duration.toMillis());} catch (InterruptedException e) {Thread.currentThread().interrupt();return "Processing interrupted";}return "Processed with timeout: " + message;}
}
Interceptor的实现
拦截器接口
Interceptor
接口定义了在请求发送前后的处理逻辑。
public interface RPCInterceptor {/*** 请求发送前的处理逻辑。** @param message 请求消息对象*/void preHandle(Object message);/*** 请求发送后的处理逻辑。** @param message 请求消息对象* @param response 响应对象*/void postHandle(Object message, Object response);
}
广播拦截器实现
广播拦截器用于将请求广播给所有服务实例。
@Componentpublic class BroadcastRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {if (message instanceof BroadcastMessage) {// 广播逻辑System.out.println("Broadcasting message: " + message);}}@Overridepublic void postHandle(Object message, Object response) {// 后处理逻辑}
}
重试拦截器实现
重试拦截器用于在请求失败时进行重试。
@Componentpublic class RetryableRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {// 重试逻辑处理}@Overridepublic void postHandle(Object message, Object response) {if (response instanceof Throwable) {System.out.println("Request failed, retrying...");// 重试逻辑}}
}
缓存拦截器实现
缓存拦截器用于对不频繁变动的响应进行缓存。
@Componentpublic class CacheableRPCInterceptor implements RPCInterceptor {private final Map<Object, Object> cache = new ConcurrentHashMap<>();@Overridepublic void preHandle(Object message) {if (message instanceof CacheableMessage) {// 检查缓存Object cachedResponse = cache.get(message);if (cachedResponse != null) {System.out.println("Cache hit: " + message);// 返回缓存响应}}}@Overridepublic void postHandle(Object message, Object response) {if (message instanceof CacheableMessage) {// 缓存响应cache.put(message, response);}}
}
自定义拦截器实现
用户可以根据需要实现自定义拦截器,实现特定功能。
@Componentpublic class CustomRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {// 自定义处理逻辑}@Overridepublic void postHandle(Object message, Object response) {// 自定义处理逻辑}}
请求和响应编码器/解码器
请求编码器
将请求对象序列化为JSON字符串。
public class RPCEncoder {public String encode(Object message) {// 使用Jackson或Gson进行序列化return new ObjectMapper().writeValueAsString(message);}
}
请求解码器
将JSON字符串反序列化为请求对象。
public class RPCDecoder {public <T> T decode(String json, Class<T> clazz) {// 使用Jackson或Gson进行反序列化return new ObjectMapper().readValue(json, clazz);}
}
RPCReceiveRestful的实现
RPCReceiveRestful
是一个内嵌的HTTP服务,用于接收请求并调用解码器进行解码。
@RestController@RequestMapping("/rpc")
public class RPCReceiveRestful {private final RPCDecoder decoder;private final ReceiverManager receiverManager;public RPCReceiveRestful(RPCDecoder decoder, ReceiverManager receiverManager) {this.decoder = decoder;this.receiverManager = receiverManager;}@PostMapping("/receive")public ResponseEntity<Object> receiveRequest(@RequestBody String requestJson) {try {// 解码请求RPCRequest request = decoder.decode(requestJson, RPCRequest.class);// 获取对应的ReceiverReceiver receiver = receiverManager.getReceiver(request.getServiceName());// 调用Receiver处理请求Object response = receiver.receiveAndReply(request.getMessage(), request.getSender());return ResponseEntity.ok(response);} catch (Exception e) {// 请求解码或处理失败return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Request processing failed");}}
}
示例代码:使用Linkis-RPC进行通信
创建Sender
public class RpcClient {private final Sender sender;public RpcClient(Sender sender) {this.sender = sender;}public void sendMessage(String message) {// 异步发送消息sender.deliver(message);}public Object requestResponse(String message) {// 同步请求响应return sender.ask(message);}
}
创建Receiver
public class RpcServer implements Receiver {@Overridepublic void receive(Object message, Sender sender) {// 处理异步请求System.out.println("Server received async message: " + message);}@Overridepublic Object receiveAndReply(Object message, Sender sender) {// 处理同步请求并返回响应System.out.println("Server received sync message: " + message);return "Server response to: " + message;}@Overridepublic Object receiveAndReply(Object message, Duration duration, Sender sender) {// 处理带超时的同步请求并返回响应System.out.println("Server received sync message with timeout: " + message);return "Server response with timeout to: " + message;}
}
结论
Linkis-RPC提供了一种灵活且强大的微服务间通信机制,解决了传统RPC框架在复杂场景中的不足。通过自定义拦截器、异步请求处理和广播机制,Linkis-RPC能够满足现代微服务架构的通信需求。本文详细分析了Linkis-RPC的设计思想、代码结构,并提供了完整的代码示例,希望能为开发者提供有价值的参考。
如需进一步的讨论和问题解答,欢迎留言,共同探讨Linkis-RPC的更多应用场景和实现细节。
参考链接:公共模块 - RPC 模块 - 《Apache Linkis v1.3.0 中文文档》 - 书栈网 · BookStack
相关文章:
Linkis-RPC的设计思想
我的技术网站 java-broke.site,有大厂完整面经,工作技术,架构师成长之路,等经验分享 Linkis-RPC的设计目标是提供一种灵活、可扩展的微服务间通信机制,支持以下功能: 异步请求与响应:支持请求方…...

31 - memmove()函数
文章目录 1 函数原型2 参数3 返回值4 示例 1 函数原型 memmove():移动内存块,函数原型如下: void * memmove ( void * destination, const void * source, size_t num );cstring库描述如下: Move block of memory 1. Copies th…...

【深度学习】创建和训练Transformer神经网络模型,将葡萄牙语翻译成英语
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言1. 安装2. 数据处理2.1 下载数据集2.2 设置标记器2.3 使用tf.data设置数据管道 3. 测试数据集4. 定义组件4.1 嵌入和位置编码层4.2 添加并规范化4.3 基础注意力…...
[Qt][多元素控件]详细讲解
目录 0.前言1.List Widget2.Table Widget3.Tree Widget 0.前言 Qt中提供的多元素控件有: 列表: QListWidgetQListView 表格: QTableWidgetQTableView 树形: QTreeWidgetQTreeView Widget和View之间的区别,以QTableWi…...
/var/log/里面的文件具体是什么?linux的登录文件
1,什么是登录文件? linux系统官方对登录文件的定义解释我就不说了,我个人理解登录文件其实就是记录系统活动信息的几个文件,登录文件其实就是系统的日志文件。 比如linux系统默认是不会安装nginx的,nginx的日志为/var…...

JVM知识总结(双亲委派机制)
文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 双亲委派机制 双亲委派类加载过程 当App尝试加载一个类时&#x…...
YOLOv2:更快更准的目标检测
目录 前言 2.1 简介 2.2 网络结构 2.3 改进方法 2.4 性能表现 前言 自从 You Only Look Once (YOLO) 系列算法问世以来,就以其独特的设计和高效的性能在目标检测领域占据了重要地位。YOLOv1 开创了单阶段检测的新纪元,通过将整个检测过程简化为一个端到端…...
硬件工程师笔面试真题汇总
目录 1、电阻 1)上拉电阻的作用 2)PTC热敏电阻作为电源电路保险丝的工作原理 2、电容 1)电容的特性 2) 电容的特性曲线 3) 1uf的电容通常来滤除什么频率的信号 3、电感 4、二极管 1)二极管特性 2)二极管伏安…...
【vue+marked】marked
一、使用marked 第一步:下载marked和代码块高亮highlight.js npm i markednpm i highlight.jsnpm i markdown-loadernpm i github-markdown-css 第二步:注册并使用 main.js import hljs from "highlight.js"; import "github-markdow…...

无人机之热成像篇
一、定义 无人机热成像技术是指将热成像相机安装在无人机云台上,通过无人机的高空飞行能力和云台的稳定性,结合红外热成像技术对目标区域进行非接触式的温度测量和图像采集。该技术利用物体发出的红外辐射来生成图像,通过测量物体表面温度分布…...

浅谈C/C++指针和引用在Linux和Windows不同环境下的编码风格
目录 0. 前言 1. 代码块、函数体上的 { } 的规范 2. 指针和引用中的 * 和 & 符号的位置 1. Linux 环境下编码风格(gcc) 2. Windows 环境下编码风格(Visual Studio) 3. 简单总结 0. 前言 C/C因为高度的自由性,并没有对一些常见的编码风格进行限制&#…...

【C#】一个项目移动了位置,或者换到其他电脑上,编译报错 Files 的值“IGEF,解决方法
文章目录 1 问题分析2 本文解决方法 一个项目可以正常运行编译的项目,所有路径均为相对路径。 移动了位置,或者换到其他电脑上,编译报错 Files 的值“IGEF, 1 问题分析 这个错误信息表明在处理文件时,Files 的值出…...
代码随想录算法训练营第五十八天|拓扑排序精讲 、dijkstra(朴素版)精讲
拓扑排序 117. 软件构建 from collections import deque, defaultdictdef topological_sort(n, edges):inDegree [0] * n # inDegree 记录每个文件的入度umap defaultdict(list) # 记录文件依赖关系# 构建图和入度表for s, t in edges:inDegree[t] 1umap[s].append(t)# 初…...

【ARM】ULINK Pro如何和SWD接口进行连接调试
【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 解决ULINK Pro和JTAR接口进行连接问题。 2、 问题场景 因为ULINK Pro本身自带的接口是Cortex-M ETM Interface 20-pin Connector。所以无法和JTAR接口直接进行连接。 图2-1 3、软硬件环境 1)、软件版…...
react框架安全设计
react框架安全设计 1、易受攻击的React版本 React库在过去有一些严重性很高的漏洞,因此最好保持稳定版中的最新版本。 2、数据绑定 使用默认的{}进行数据绑定,React会自动对值进行转义以防止XSS攻击。但注意这种保护只在渲染textContent时候有用,渲染 HTML attributes的…...

Kafka生产调优实践。Kafka消息安全性、消息丢失、消息积压、保证消息顺序性
文章目录 搭建Kafka监控平台合理规划Kafka部署环境合理优化Kafka集群配置优化Kafka客户端使用方式合理保证消息安全消费者防止消息重复消费 生产环境常见问题分析消息零丢失方案消息积压如何处理如何保证消息顺序 搭建Kafka监控平台 官网地址 下载efak-web-3.0.2-bin.tar.gz安…...

DDColor部署安装,在服务器Ubuntu22.04系统——点动科技
DDColor图片上色项目的部署安装,在服务器Ubuntu22.04系统——点动科技 一、ubuntu22.04基本环境配置1.1 更换清华Ubuntu镜像源1.2 更新包列表:2. 安装英伟达显卡驱动2.1 使用wget在命令行下载驱动包2.2 更新软件列表和安装必要软件、依赖2.2 卸载原有驱动…...
使用 SSL/TLS 加密保障 RocketMQ 的安全传输
引言 在现代分布式系统中,数据传输的安全性至关重要。Apache RocketMQ作为一款高性能、高吞吐量的消息中间件,在许多关键应用场景中被广泛使用。为了确保消息传输的安全性,SSL/TLS 加密提供了一种可靠的解决方案。本文将详细介绍如何在 Rock…...
uni-app开发
参考帖 uniapp官方文档 组件库 项目中肯定需要使用第三方组件库,因为现有的这些不够方便我们去使用 uview: 演示 | uView 2.0 - 全面兼容 nvue 的 uni-app 生态框架 - uni-app UI 框架 ThorUI: 介绍 | ThorUI文档 创建uni-app项目 有HBuilder…...
2024社招面经_存储DB广告架构方向
总结 第一次社招,主要是三四月份面的,offer的有高德、拼多多、腾讯、美团、快手、携程,后面面的比较累了,因为美团定级和涨幅都还行就去了美团,没再继续面别的。 因为时间比较久了,只在这里贴一下当时有记…...

wordpress后台更新后 前端没变化的解决方法
使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

高频面试之3Zookeeper
高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个?3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制(过半机制࿰…...

【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...

JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...

Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...