Linkis-RPC的设计思想
我的技术网站 java-broke.site,有大厂完整面经,工作技术,架构师成长之路,等经验分享
Linkis-RPC的设计目标是提供一种灵活、可扩展的微服务间通信机制,支持以下功能:
- 异步请求与响应:支持请求方发送异步请求,接收方处理完后再响应给请求方。
- 广播请求:支持将请求广播给所有目标服务实例。
- 自定义拦截器:允许用户定义拦截器来实现特定功能,如缓存、重试等。
- 服务选择:基于Eureka实现的服务发现,支持通过服务名和实例信息选择特定服务实例。
- 解耦通信机制:请求方和接收方的代码实现解耦,便于独立开发和维护。
Linkis-RPC的架构概述
Linkis-RPC主要由以下几个核心组件构成:
- Sender:请求发送器,用于发送请求到目标服务。
- Receiver:请求接收器,负责接收和处理请求。
- Interceptor:拦截器链,用于在请求发送前进行处理,如广播、重试、缓存等。
- Decoder/Encoder:用于请求和响应的序列化和反序列化。
- Eureka:服务注册与发现中心。
Linkis-RPC的代码结构
Linkis-RPC的源码结构清晰,主要代码模块包括:
- Sender接口和实现:负责发送请求并处理响应。
- Receiver接口和实现:负责接收请求并执行业务逻辑。
- Interceptor接口和实现:拦截器实现类,用于增强请求功能。
- RPCReceiveRestful:内嵌的HTTP服务,用于接收和解码请求。
- 请求和响应编码器/解码器:实现请求和响应的序列化和反序列化。
接下来,我们将逐步分析每个模块的代码实现。
Sender的实现
Sender接口
Sender接口提供了多种发送请求的方法,包括同步和异步请求。
public abstract class Sender {/*** 同步请求方法,等待接收方返回响应。** @param message 请求消息对象* @return 响应对象*/public abstract Object ask(Object message);/*** 带超时设置的同步请求方法。** @param message 请求消息对象* @param timeout 超时时间* @return 响应对象*/public abstract Object ask(Object message, Duration timeout);/*** 仅发送请求,不关心响应。** @param message 请求消息对象*/public abstract void send(Object message);/*** 异步请求方法,在稍后通过其他线程发送请求。** @param message 请求消息对象*/public abstract void deliver(Object message);
}
Sender的实现类
Sender的实现类主要负责构建请求并通过Feign客户端发送请求。
public class DefaultSender extends Sender {private final FeignClient feignClient;private final String serviceName;public DefaultSender(String serviceName, FeignClient feignClient) {this.serviceName = serviceName;this.feignClient = feignClient;}@Overridepublic Object ask(Object message) {return feignClient.post(message);}@Overridepublic Object ask(Object message, Duration timeout) {// 设置请求超时逻辑return feignClient.postWithTimeout(message, timeout);}@Overridepublic void send(Object message) {feignClient.send(message);}@Overridepublic void deliver(Object message) {// 异步发送逻辑CompletableFuture.runAsync(() -> feignClient.post(message));}
}
Receiver的实现
Receiver接口
Receiver接口定义了接收请求和响应的基本方法。
public interface Receiver {/*** 处理异步请求的方法。** @param message 请求消息对象* @param sender 请求发送方的Sender实例*/void receive(Object message, Sender sender);/*** 处理同步请求的方法,返回响应对象。** @param message 请求消息对象* @param sender 请求发送方的Sender实例* @return 响应对象*/Object receiveAndReply(Object message, Sender sender);/*** 带超时设置的同步请求处理方法。** @param message 请求消息对象* @param duration 超时时间* @param sender 请求发送方的Sender实例* @return 响应对象*/Object receiveAndReply(Object message, Duration duration, Sender sender);
}
Receiver的实现类
public class DefaultReceiver implements Receiver {@Overridepublic void receive(Object message, Sender sender) {// 异步处理逻辑System.out.println("Received async message: " + message);// 根据业务需求决定是否需要响应发送方}@Overridepublic Object receiveAndReply(Object message, Sender sender) {// 处理请求并返回响应System.out.println("Received sync message: " + message);return processRequest(message);}@Overridepublic Object receiveAndReply(Object message, Duration duration, Sender sender) {// 处理请求并返回响应,考虑超时System.out.println("Received sync message with timeout: " + message);return processRequestWithTimeout(message, duration);}private Object processRequest(Object message) {// 业务逻辑处理return "Processed: " + message;}private Object processRequestWithTimeout(Object message, Duration duration) {// 业务逻辑处理,支持超时try {Thread.sleep(duration.toMillis());} catch (InterruptedException e) {Thread.currentThread().interrupt();return "Processing interrupted";}return "Processed with timeout: " + message;}
}
Interceptor的实现
拦截器接口
Interceptor接口定义了在请求发送前后的处理逻辑。
public interface RPCInterceptor {/*** 请求发送前的处理逻辑。** @param message 请求消息对象*/void preHandle(Object message);/*** 请求发送后的处理逻辑。** @param message 请求消息对象* @param response 响应对象*/void postHandle(Object message, Object response);
}
广播拦截器实现
广播拦截器用于将请求广播给所有服务实例。
@Componentpublic class BroadcastRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {if (message instanceof BroadcastMessage) {// 广播逻辑System.out.println("Broadcasting message: " + message);}}@Overridepublic void postHandle(Object message, Object response) {// 后处理逻辑}
}
重试拦截器实现
重试拦截器用于在请求失败时进行重试。
@Componentpublic class RetryableRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {// 重试逻辑处理}@Overridepublic void postHandle(Object message, Object response) {if (response instanceof Throwable) {System.out.println("Request failed, retrying...");// 重试逻辑}}
}
缓存拦截器实现
缓存拦截器用于对不频繁变动的响应进行缓存。
@Componentpublic class CacheableRPCInterceptor implements RPCInterceptor {private final Map<Object, Object> cache = new ConcurrentHashMap<>();@Overridepublic void preHandle(Object message) {if (message instanceof CacheableMessage) {// 检查缓存Object cachedResponse = cache.get(message);if (cachedResponse != null) {System.out.println("Cache hit: " + message);// 返回缓存响应}}}@Overridepublic void postHandle(Object message, Object response) {if (message instanceof CacheableMessage) {// 缓存响应cache.put(message, response);}}
}
自定义拦截器实现
用户可以根据需要实现自定义拦截器,实现特定功能。
@Componentpublic class CustomRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {// 自定义处理逻辑}@Overridepublic void postHandle(Object message, Object response) {// 自定义处理逻辑}}
请求和响应编码器/解码器
请求编码器
将请求对象序列化为JSON字符串。
public class RPCEncoder {public String encode(Object message) {// 使用Jackson或Gson进行序列化return new ObjectMapper().writeValueAsString(message);}
}
请求解码器
将JSON字符串反序列化为请求对象。
public class RPCDecoder {public <T> T decode(String json, Class<T> clazz) {// 使用Jackson或Gson进行反序列化return new ObjectMapper().readValue(json, clazz);}
}
RPCReceiveRestful的实现
RPCReceiveRestful是一个内嵌的HTTP服务,用于接收请求并调用解码器进行解码。
@RestController@RequestMapping("/rpc")
public class RPCReceiveRestful {private final RPCDecoder decoder;private final ReceiverManager receiverManager;public RPCReceiveRestful(RPCDecoder decoder, ReceiverManager receiverManager) {this.decoder = decoder;this.receiverManager = receiverManager;}@PostMapping("/receive")public ResponseEntity<Object> receiveRequest(@RequestBody String requestJson) {try {// 解码请求RPCRequest request = decoder.decode(requestJson, RPCRequest.class);// 获取对应的ReceiverReceiver receiver = receiverManager.getReceiver(request.getServiceName());// 调用Receiver处理请求Object response = receiver.receiveAndReply(request.getMessage(), request.getSender());return ResponseEntity.ok(response);} catch (Exception e) {// 请求解码或处理失败return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Request processing failed");}}
}
示例代码:使用Linkis-RPC进行通信
创建Sender
public class RpcClient {private final Sender sender;public RpcClient(Sender sender) {this.sender = sender;}public void sendMessage(String message) {// 异步发送消息sender.deliver(message);}public Object requestResponse(String message) {// 同步请求响应return sender.ask(message);}
}
创建Receiver
public class RpcServer implements Receiver {@Overridepublic void receive(Object message, Sender sender) {// 处理异步请求System.out.println("Server received async message: " + message);}@Overridepublic Object receiveAndReply(Object message, Sender sender) {// 处理同步请求并返回响应System.out.println("Server received sync message: " + message);return "Server response to: " + message;}@Overridepublic Object receiveAndReply(Object message, Duration duration, Sender sender) {// 处理带超时的同步请求并返回响应System.out.println("Server received sync message with timeout: " + message);return "Server response with timeout to: " + message;}
}
结论
Linkis-RPC提供了一种灵活且强大的微服务间通信机制,解决了传统RPC框架在复杂场景中的不足。通过自定义拦截器、异步请求处理和广播机制,Linkis-RPC能够满足现代微服务架构的通信需求。本文详细分析了Linkis-RPC的设计思想、代码结构,并提供了完整的代码示例,希望能为开发者提供有价值的参考。
如需进一步的讨论和问题解答,欢迎留言,共同探讨Linkis-RPC的更多应用场景和实现细节。
参考链接:公共模块 - RPC 模块 - 《Apache Linkis v1.3.0 中文文档》 - 书栈网 · BookStack
相关文章:
Linkis-RPC的设计思想
我的技术网站 java-broke.site,有大厂完整面经,工作技术,架构师成长之路,等经验分享 Linkis-RPC的设计目标是提供一种灵活、可扩展的微服务间通信机制,支持以下功能: 异步请求与响应:支持请求方…...
31 - memmove()函数
文章目录 1 函数原型2 参数3 返回值4 示例 1 函数原型 memmove():移动内存块,函数原型如下: void * memmove ( void * destination, const void * source, size_t num );cstring库描述如下: Move block of memory 1. Copies th…...
【深度学习】创建和训练Transformer神经网络模型,将葡萄牙语翻译成英语
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言1. 安装2. 数据处理2.1 下载数据集2.2 设置标记器2.3 使用tf.data设置数据管道 3. 测试数据集4. 定义组件4.1 嵌入和位置编码层4.2 添加并规范化4.3 基础注意力…...
[Qt][多元素控件]详细讲解
目录 0.前言1.List Widget2.Table Widget3.Tree Widget 0.前言 Qt中提供的多元素控件有: 列表: QListWidgetQListView 表格: QTableWidgetQTableView 树形: QTreeWidgetQTreeView Widget和View之间的区别,以QTableWi…...
/var/log/里面的文件具体是什么?linux的登录文件
1,什么是登录文件? linux系统官方对登录文件的定义解释我就不说了,我个人理解登录文件其实就是记录系统活动信息的几个文件,登录文件其实就是系统的日志文件。 比如linux系统默认是不会安装nginx的,nginx的日志为/var…...
JVM知识总结(双亲委派机制)
文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 双亲委派机制 双亲委派类加载过程 当App尝试加载一个类时&#x…...
YOLOv2:更快更准的目标检测
目录 前言 2.1 简介 2.2 网络结构 2.3 改进方法 2.4 性能表现 前言 自从 You Only Look Once (YOLO) 系列算法问世以来,就以其独特的设计和高效的性能在目标检测领域占据了重要地位。YOLOv1 开创了单阶段检测的新纪元,通过将整个检测过程简化为一个端到端…...
硬件工程师笔面试真题汇总
目录 1、电阻 1)上拉电阻的作用 2)PTC热敏电阻作为电源电路保险丝的工作原理 2、电容 1)电容的特性 2) 电容的特性曲线 3) 1uf的电容通常来滤除什么频率的信号 3、电感 4、二极管 1)二极管特性 2)二极管伏安…...
【vue+marked】marked
一、使用marked 第一步:下载marked和代码块高亮highlight.js npm i markednpm i highlight.jsnpm i markdown-loadernpm i github-markdown-css 第二步:注册并使用 main.js import hljs from "highlight.js"; import "github-markdow…...
无人机之热成像篇
一、定义 无人机热成像技术是指将热成像相机安装在无人机云台上,通过无人机的高空飞行能力和云台的稳定性,结合红外热成像技术对目标区域进行非接触式的温度测量和图像采集。该技术利用物体发出的红外辐射来生成图像,通过测量物体表面温度分布…...
浅谈C/C++指针和引用在Linux和Windows不同环境下的编码风格
目录 0. 前言 1. 代码块、函数体上的 { } 的规范 2. 指针和引用中的 * 和 & 符号的位置 1. Linux 环境下编码风格(gcc) 2. Windows 环境下编码风格(Visual Studio) 3. 简单总结 0. 前言 C/C因为高度的自由性,并没有对一些常见的编码风格进行限制&#…...
【C#】一个项目移动了位置,或者换到其他电脑上,编译报错 Files 的值“IGEF,解决方法
文章目录 1 问题分析2 本文解决方法 一个项目可以正常运行编译的项目,所有路径均为相对路径。 移动了位置,或者换到其他电脑上,编译报错 Files 的值“IGEF, 1 问题分析 这个错误信息表明在处理文件时,Files 的值出…...
代码随想录算法训练营第五十八天|拓扑排序精讲 、dijkstra(朴素版)精讲
拓扑排序 117. 软件构建 from collections import deque, defaultdictdef topological_sort(n, edges):inDegree [0] * n # inDegree 记录每个文件的入度umap defaultdict(list) # 记录文件依赖关系# 构建图和入度表for s, t in edges:inDegree[t] 1umap[s].append(t)# 初…...
【ARM】ULINK Pro如何和SWD接口进行连接调试
【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 解决ULINK Pro和JTAR接口进行连接问题。 2、 问题场景 因为ULINK Pro本身自带的接口是Cortex-M ETM Interface 20-pin Connector。所以无法和JTAR接口直接进行连接。 图2-1 3、软硬件环境 1)、软件版…...
react框架安全设计
react框架安全设计 1、易受攻击的React版本 React库在过去有一些严重性很高的漏洞,因此最好保持稳定版中的最新版本。 2、数据绑定 使用默认的{}进行数据绑定,React会自动对值进行转义以防止XSS攻击。但注意这种保护只在渲染textContent时候有用,渲染 HTML attributes的…...
Kafka生产调优实践。Kafka消息安全性、消息丢失、消息积压、保证消息顺序性
文章目录 搭建Kafka监控平台合理规划Kafka部署环境合理优化Kafka集群配置优化Kafka客户端使用方式合理保证消息安全消费者防止消息重复消费 生产环境常见问题分析消息零丢失方案消息积压如何处理如何保证消息顺序 搭建Kafka监控平台 官网地址 下载efak-web-3.0.2-bin.tar.gz安…...
DDColor部署安装,在服务器Ubuntu22.04系统——点动科技
DDColor图片上色项目的部署安装,在服务器Ubuntu22.04系统——点动科技 一、ubuntu22.04基本环境配置1.1 更换清华Ubuntu镜像源1.2 更新包列表:2. 安装英伟达显卡驱动2.1 使用wget在命令行下载驱动包2.2 更新软件列表和安装必要软件、依赖2.2 卸载原有驱动…...
使用 SSL/TLS 加密保障 RocketMQ 的安全传输
引言 在现代分布式系统中,数据传输的安全性至关重要。Apache RocketMQ作为一款高性能、高吞吐量的消息中间件,在许多关键应用场景中被广泛使用。为了确保消息传输的安全性,SSL/TLS 加密提供了一种可靠的解决方案。本文将详细介绍如何在 Rock…...
uni-app开发
参考帖 uniapp官方文档 组件库 项目中肯定需要使用第三方组件库,因为现有的这些不够方便我们去使用 uview: 演示 | uView 2.0 - 全面兼容 nvue 的 uni-app 生态框架 - uni-app UI 框架 ThorUI: 介绍 | ThorUI文档 创建uni-app项目 有HBuilder…...
2024社招面经_存储DB广告架构方向
总结 第一次社招,主要是三四月份面的,offer的有高德、拼多多、腾讯、美团、快手、携程,后面面的比较累了,因为美团定级和涨幅都还行就去了美团,没再继续面别的。 因为时间比较久了,只在这里贴一下当时有记…...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
【Go语言基础【13】】函数、闭包、方法
文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数(函数作为参数、返回值) 三、匿名函数与闭包1. 匿名函数(Lambda函…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
LLMs 系列实操科普(1)
写在前面: 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容,原视频时长 ~130 分钟,以实操演示主流的一些 LLMs 的使用,由于涉及到实操,实际上并不适合以文字整理,但还是决定尽量整理一份笔…...
uniapp 开发ios, xcode 提交app store connect 和 testflight内测
uniapp 中配置 配置manifest 文档:manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号:4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...
