Linkis-RPC的设计思想
我的技术网站 java-broke.site,有大厂完整面经,工作技术,架构师成长之路,等经验分享
Linkis-RPC的设计目标是提供一种灵活、可扩展的微服务间通信机制,支持以下功能:
- 异步请求与响应:支持请求方发送异步请求,接收方处理完后再响应给请求方。
- 广播请求:支持将请求广播给所有目标服务实例。
- 自定义拦截器:允许用户定义拦截器来实现特定功能,如缓存、重试等。
- 服务选择:基于Eureka实现的服务发现,支持通过服务名和实例信息选择特定服务实例。
- 解耦通信机制:请求方和接收方的代码实现解耦,便于独立开发和维护。
Linkis-RPC的架构概述
Linkis-RPC主要由以下几个核心组件构成:
- Sender:请求发送器,用于发送请求到目标服务。
- Receiver:请求接收器,负责接收和处理请求。
- Interceptor:拦截器链,用于在请求发送前进行处理,如广播、重试、缓存等。
- Decoder/Encoder:用于请求和响应的序列化和反序列化。
- Eureka:服务注册与发现中心。
Linkis-RPC的代码结构
Linkis-RPC的源码结构清晰,主要代码模块包括:
- Sender接口和实现:负责发送请求并处理响应。
- Receiver接口和实现:负责接收请求并执行业务逻辑。
- Interceptor接口和实现:拦截器实现类,用于增强请求功能。
- RPCReceiveRestful:内嵌的HTTP服务,用于接收和解码请求。
- 请求和响应编码器/解码器:实现请求和响应的序列化和反序列化。
接下来,我们将逐步分析每个模块的代码实现。
Sender的实现
Sender接口
Sender接口提供了多种发送请求的方法,包括同步和异步请求。
public abstract class Sender {/*** 同步请求方法,等待接收方返回响应。** @param message 请求消息对象* @return 响应对象*/public abstract Object ask(Object message);/*** 带超时设置的同步请求方法。** @param message 请求消息对象* @param timeout 超时时间* @return 响应对象*/public abstract Object ask(Object message, Duration timeout);/*** 仅发送请求,不关心响应。** @param message 请求消息对象*/public abstract void send(Object message);/*** 异步请求方法,在稍后通过其他线程发送请求。** @param message 请求消息对象*/public abstract void deliver(Object message);
}
Sender的实现类
Sender的实现类主要负责构建请求并通过Feign客户端发送请求。
public class DefaultSender extends Sender {private final FeignClient feignClient;private final String serviceName;public DefaultSender(String serviceName, FeignClient feignClient) {this.serviceName = serviceName;this.feignClient = feignClient;}@Overridepublic Object ask(Object message) {return feignClient.post(message);}@Overridepublic Object ask(Object message, Duration timeout) {// 设置请求超时逻辑return feignClient.postWithTimeout(message, timeout);}@Overridepublic void send(Object message) {feignClient.send(message);}@Overridepublic void deliver(Object message) {// 异步发送逻辑CompletableFuture.runAsync(() -> feignClient.post(message));}
}
Receiver的实现
Receiver接口
Receiver接口定义了接收请求和响应的基本方法。
public interface Receiver {/*** 处理异步请求的方法。** @param message 请求消息对象* @param sender 请求发送方的Sender实例*/void receive(Object message, Sender sender);/*** 处理同步请求的方法,返回响应对象。** @param message 请求消息对象* @param sender 请求发送方的Sender实例* @return 响应对象*/Object receiveAndReply(Object message, Sender sender);/*** 带超时设置的同步请求处理方法。** @param message 请求消息对象* @param duration 超时时间* @param sender 请求发送方的Sender实例* @return 响应对象*/Object receiveAndReply(Object message, Duration duration, Sender sender);
}
Receiver的实现类
public class DefaultReceiver implements Receiver {@Overridepublic void receive(Object message, Sender sender) {// 异步处理逻辑System.out.println("Received async message: " + message);// 根据业务需求决定是否需要响应发送方}@Overridepublic Object receiveAndReply(Object message, Sender sender) {// 处理请求并返回响应System.out.println("Received sync message: " + message);return processRequest(message);}@Overridepublic Object receiveAndReply(Object message, Duration duration, Sender sender) {// 处理请求并返回响应,考虑超时System.out.println("Received sync message with timeout: " + message);return processRequestWithTimeout(message, duration);}private Object processRequest(Object message) {// 业务逻辑处理return "Processed: " + message;}private Object processRequestWithTimeout(Object message, Duration duration) {// 业务逻辑处理,支持超时try {Thread.sleep(duration.toMillis());} catch (InterruptedException e) {Thread.currentThread().interrupt();return "Processing interrupted";}return "Processed with timeout: " + message;}
}
Interceptor的实现
拦截器接口
Interceptor接口定义了在请求发送前后的处理逻辑。
public interface RPCInterceptor {/*** 请求发送前的处理逻辑。** @param message 请求消息对象*/void preHandle(Object message);/*** 请求发送后的处理逻辑。** @param message 请求消息对象* @param response 响应对象*/void postHandle(Object message, Object response);
}
广播拦截器实现
广播拦截器用于将请求广播给所有服务实例。
@Componentpublic class BroadcastRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {if (message instanceof BroadcastMessage) {// 广播逻辑System.out.println("Broadcasting message: " + message);}}@Overridepublic void postHandle(Object message, Object response) {// 后处理逻辑}
}
重试拦截器实现
重试拦截器用于在请求失败时进行重试。
@Componentpublic class RetryableRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {// 重试逻辑处理}@Overridepublic void postHandle(Object message, Object response) {if (response instanceof Throwable) {System.out.println("Request failed, retrying...");// 重试逻辑}}
}
缓存拦截器实现
缓存拦截器用于对不频繁变动的响应进行缓存。
@Componentpublic class CacheableRPCInterceptor implements RPCInterceptor {private final Map<Object, Object> cache = new ConcurrentHashMap<>();@Overridepublic void preHandle(Object message) {if (message instanceof CacheableMessage) {// 检查缓存Object cachedResponse = cache.get(message);if (cachedResponse != null) {System.out.println("Cache hit: " + message);// 返回缓存响应}}}@Overridepublic void postHandle(Object message, Object response) {if (message instanceof CacheableMessage) {// 缓存响应cache.put(message, response);}}
}
自定义拦截器实现
用户可以根据需要实现自定义拦截器,实现特定功能。
@Componentpublic class CustomRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {// 自定义处理逻辑}@Overridepublic void postHandle(Object message, Object response) {// 自定义处理逻辑}}
请求和响应编码器/解码器
请求编码器
将请求对象序列化为JSON字符串。
public class RPCEncoder {public String encode(Object message) {// 使用Jackson或Gson进行序列化return new ObjectMapper().writeValueAsString(message);}
}
请求解码器
将JSON字符串反序列化为请求对象。
public class RPCDecoder {public <T> T decode(String json, Class<T> clazz) {// 使用Jackson或Gson进行反序列化return new ObjectMapper().readValue(json, clazz);}
}
RPCReceiveRestful的实现
RPCReceiveRestful是一个内嵌的HTTP服务,用于接收请求并调用解码器进行解码。
@RestController@RequestMapping("/rpc")
public class RPCReceiveRestful {private final RPCDecoder decoder;private final ReceiverManager receiverManager;public RPCReceiveRestful(RPCDecoder decoder, ReceiverManager receiverManager) {this.decoder = decoder;this.receiverManager = receiverManager;}@PostMapping("/receive")public ResponseEntity<Object> receiveRequest(@RequestBody String requestJson) {try {// 解码请求RPCRequest request = decoder.decode(requestJson, RPCRequest.class);// 获取对应的ReceiverReceiver receiver = receiverManager.getReceiver(request.getServiceName());// 调用Receiver处理请求Object response = receiver.receiveAndReply(request.getMessage(), request.getSender());return ResponseEntity.ok(response);} catch (Exception e) {// 请求解码或处理失败return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Request processing failed");}}
}
示例代码:使用Linkis-RPC进行通信
创建Sender
public class RpcClient {private final Sender sender;public RpcClient(Sender sender) {this.sender = sender;}public void sendMessage(String message) {// 异步发送消息sender.deliver(message);}public Object requestResponse(String message) {// 同步请求响应return sender.ask(message);}
}
创建Receiver
public class RpcServer implements Receiver {@Overridepublic void receive(Object message, Sender sender) {// 处理异步请求System.out.println("Server received async message: " + message);}@Overridepublic Object receiveAndReply(Object message, Sender sender) {// 处理同步请求并返回响应System.out.println("Server received sync message: " + message);return "Server response to: " + message;}@Overridepublic Object receiveAndReply(Object message, Duration duration, Sender sender) {// 处理带超时的同步请求并返回响应System.out.println("Server received sync message with timeout: " + message);return "Server response with timeout to: " + message;}
}
结论
Linkis-RPC提供了一种灵活且强大的微服务间通信机制,解决了传统RPC框架在复杂场景中的不足。通过自定义拦截器、异步请求处理和广播机制,Linkis-RPC能够满足现代微服务架构的通信需求。本文详细分析了Linkis-RPC的设计思想、代码结构,并提供了完整的代码示例,希望能为开发者提供有价值的参考。
如需进一步的讨论和问题解答,欢迎留言,共同探讨Linkis-RPC的更多应用场景和实现细节。
参考链接:公共模块 - RPC 模块 - 《Apache Linkis v1.3.0 中文文档》 - 书栈网 · BookStack
相关文章:
Linkis-RPC的设计思想
我的技术网站 java-broke.site,有大厂完整面经,工作技术,架构师成长之路,等经验分享 Linkis-RPC的设计目标是提供一种灵活、可扩展的微服务间通信机制,支持以下功能: 异步请求与响应:支持请求方…...
31 - memmove()函数
文章目录 1 函数原型2 参数3 返回值4 示例 1 函数原型 memmove():移动内存块,函数原型如下: void * memmove ( void * destination, const void * source, size_t num );cstring库描述如下: Move block of memory 1. Copies th…...
【深度学习】创建和训练Transformer神经网络模型,将葡萄牙语翻译成英语
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言1. 安装2. 数据处理2.1 下载数据集2.2 设置标记器2.3 使用tf.data设置数据管道 3. 测试数据集4. 定义组件4.1 嵌入和位置编码层4.2 添加并规范化4.3 基础注意力…...
[Qt][多元素控件]详细讲解
目录 0.前言1.List Widget2.Table Widget3.Tree Widget 0.前言 Qt中提供的多元素控件有: 列表: QListWidgetQListView 表格: QTableWidgetQTableView 树形: QTreeWidgetQTreeView Widget和View之间的区别,以QTableWi…...
/var/log/里面的文件具体是什么?linux的登录文件
1,什么是登录文件? linux系统官方对登录文件的定义解释我就不说了,我个人理解登录文件其实就是记录系统活动信息的几个文件,登录文件其实就是系统的日志文件。 比如linux系统默认是不会安装nginx的,nginx的日志为/var…...
JVM知识总结(双亲委派机制)
文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 双亲委派机制 双亲委派类加载过程 当App尝试加载一个类时&#x…...
YOLOv2:更快更准的目标检测
目录 前言 2.1 简介 2.2 网络结构 2.3 改进方法 2.4 性能表现 前言 自从 You Only Look Once (YOLO) 系列算法问世以来,就以其独特的设计和高效的性能在目标检测领域占据了重要地位。YOLOv1 开创了单阶段检测的新纪元,通过将整个检测过程简化为一个端到端…...
硬件工程师笔面试真题汇总
目录 1、电阻 1)上拉电阻的作用 2)PTC热敏电阻作为电源电路保险丝的工作原理 2、电容 1)电容的特性 2) 电容的特性曲线 3) 1uf的电容通常来滤除什么频率的信号 3、电感 4、二极管 1)二极管特性 2)二极管伏安…...
【vue+marked】marked
一、使用marked 第一步:下载marked和代码块高亮highlight.js npm i markednpm i highlight.jsnpm i markdown-loadernpm i github-markdown-css 第二步:注册并使用 main.js import hljs from "highlight.js"; import "github-markdow…...
无人机之热成像篇
一、定义 无人机热成像技术是指将热成像相机安装在无人机云台上,通过无人机的高空飞行能力和云台的稳定性,结合红外热成像技术对目标区域进行非接触式的温度测量和图像采集。该技术利用物体发出的红外辐射来生成图像,通过测量物体表面温度分布…...
浅谈C/C++指针和引用在Linux和Windows不同环境下的编码风格
目录 0. 前言 1. 代码块、函数体上的 { } 的规范 2. 指针和引用中的 * 和 & 符号的位置 1. Linux 环境下编码风格(gcc) 2. Windows 环境下编码风格(Visual Studio) 3. 简单总结 0. 前言 C/C因为高度的自由性,并没有对一些常见的编码风格进行限制&#…...
【C#】一个项目移动了位置,或者换到其他电脑上,编译报错 Files 的值“IGEF,解决方法
文章目录 1 问题分析2 本文解决方法 一个项目可以正常运行编译的项目,所有路径均为相对路径。 移动了位置,或者换到其他电脑上,编译报错 Files 的值“IGEF, 1 问题分析 这个错误信息表明在处理文件时,Files 的值出…...
代码随想录算法训练营第五十八天|拓扑排序精讲 、dijkstra(朴素版)精讲
拓扑排序 117. 软件构建 from collections import deque, defaultdictdef topological_sort(n, edges):inDegree [0] * n # inDegree 记录每个文件的入度umap defaultdict(list) # 记录文件依赖关系# 构建图和入度表for s, t in edges:inDegree[t] 1umap[s].append(t)# 初…...
【ARM】ULINK Pro如何和SWD接口进行连接调试
【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 解决ULINK Pro和JTAR接口进行连接问题。 2、 问题场景 因为ULINK Pro本身自带的接口是Cortex-M ETM Interface 20-pin Connector。所以无法和JTAR接口直接进行连接。 图2-1 3、软硬件环境 1)、软件版…...
react框架安全设计
react框架安全设计 1、易受攻击的React版本 React库在过去有一些严重性很高的漏洞,因此最好保持稳定版中的最新版本。 2、数据绑定 使用默认的{}进行数据绑定,React会自动对值进行转义以防止XSS攻击。但注意这种保护只在渲染textContent时候有用,渲染 HTML attributes的…...
Kafka生产调优实践。Kafka消息安全性、消息丢失、消息积压、保证消息顺序性
文章目录 搭建Kafka监控平台合理规划Kafka部署环境合理优化Kafka集群配置优化Kafka客户端使用方式合理保证消息安全消费者防止消息重复消费 生产环境常见问题分析消息零丢失方案消息积压如何处理如何保证消息顺序 搭建Kafka监控平台 官网地址 下载efak-web-3.0.2-bin.tar.gz安…...
DDColor部署安装,在服务器Ubuntu22.04系统——点动科技
DDColor图片上色项目的部署安装,在服务器Ubuntu22.04系统——点动科技 一、ubuntu22.04基本环境配置1.1 更换清华Ubuntu镜像源1.2 更新包列表:2. 安装英伟达显卡驱动2.1 使用wget在命令行下载驱动包2.2 更新软件列表和安装必要软件、依赖2.2 卸载原有驱动…...
使用 SSL/TLS 加密保障 RocketMQ 的安全传输
引言 在现代分布式系统中,数据传输的安全性至关重要。Apache RocketMQ作为一款高性能、高吞吐量的消息中间件,在许多关键应用场景中被广泛使用。为了确保消息传输的安全性,SSL/TLS 加密提供了一种可靠的解决方案。本文将详细介绍如何在 Rock…...
uni-app开发
参考帖 uniapp官方文档 组件库 项目中肯定需要使用第三方组件库,因为现有的这些不够方便我们去使用 uview: 演示 | uView 2.0 - 全面兼容 nvue 的 uni-app 生态框架 - uni-app UI 框架 ThorUI: 介绍 | ThorUI文档 创建uni-app项目 有HBuilder…...
2024社招面经_存储DB广告架构方向
总结 第一次社招,主要是三四月份面的,offer的有高德、拼多多、腾讯、美团、快手、携程,后面面的比较累了,因为美团定级和涨幅都还行就去了美团,没再继续面别的。 因为时间比较久了,只在这里贴一下当时有记…...
Cesium1.95中高性能加载1500个点
一、基本方式: 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...
centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...
《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...
Python实现简单音频数据压缩与解压算法
Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中,压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言,提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...
人工智能 - 在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型
在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型。这些平台各有侧重,适用场景差异显著。下面我将从核心功能定位、典型应用场景、真实体验痛点、选型决策关键点进行拆解,并提供具体场景下的推荐方案。 一、核心功能定位速览 平台核心定位技术栈亮…...
CSS3相关知识点
CSS3相关知识点 CSS3私有前缀私有前缀私有前缀存在的意义常见浏览器的私有前缀 CSS3基本语法CSS3 新增长度单位CSS3 新增颜色设置方式CSS3 新增选择器CSS3 新增盒模型相关属性box-sizing 怪异盒模型resize调整盒子大小box-shadow 盒子阴影opacity 不透明度 CSS3 新增背景属性ba…...
