自定义Dubbo RPC通信协议
前言
Dubbo 协议层的核心SPI接口是org.apache.dubbo.rpc.Protocol,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。
设计思路
Google 提供了 Java 的 Grpc 实现,所以我们站在巨人的肩膀上即可,就不用重复造轮子了。
首先,我们要实现 Protocol 接口,服务暴露时开启我们的 GrpcServer,绑定本地端口,用于后续处理连接和请求。
服务端如何处理grpc请求呢???
方案一,是把暴露的所有服务 Invoker 都封装成grpc的 Service,全部统一让 GrpcServer 处理,但是这么做太麻烦了。方案二,是提供一个 DispatcherService,统一处理客户端发来的grpc请求,再根据参数查找要调用的服务,执行本地调用返回结果。本文采用方案二。
客户端引用服务时,我们创建 GrpcInvoker 对象,和服务端建立连接并生成 DispatcherService 的本地存根 Stub 对象,发起 RPC 调用时只需把 RpcInvocation 转换成 Protobuf 消息发出去即可。
实现GrpcProtocol
项目结构
首先,我们新建一个dubbo-extension-protocol-grpc模块,引入必要的依赖。
<dependencies><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-rpc-api</artifactId><version>${dubbo.version}</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-all</artifactId><version>1.56.1</version></dependency>
</dependencies>
项目结构:
main
--java
----dubbo.extension.rpc.grpc
------message
--------RequestData.java
--------ResponseData.java
------Codec.java
------DispatcherService.java
------DispatcherServiceGrpc.java
------GrpcExporter.java
------GrpcInvoker.java
------GrpcProtocol.java
------GrpcProtocolServer.java
--resources
----META-INF/dubbo
------org.apache.dubbo.rpc.Protocol
服务&消息定义
然后是定义grpc的 Service 和消息格式
DispatcherService.proto 请求分发服务的定义
syntax = "proto3";option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc";
option java_outer_classname = "DispatcherServiceProto";
option objc_class_prefix = "HLW";import "RequestData.proto";
import "ResponseData.proto";service DispatcherService {rpc dispatch (RequestData) returns (ResponseData) {}
}
RequestData.proto 请求消息的定义,主要是对 Invocation 的描述
syntax = "proto3";option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "RequestDataProto";
option objc_class_prefix = "HLW";message RequestData {string targetServiceUniqueName = 1;string methodName = 2;string serviceName = 3;repeated bytes parameterTypes = 4;string parameterTypesDesc = 5;repeated bytes arguments = 6;bytes attachments = 7;
}
ResponseData.proto 响应消息的定义,主要是对 AppResponse 的描述
syntax = "proto3";option java_multiple_files = true;
option java_package = "dubbo.extension.rpc.grpc.message";
option java_outer_classname = "ResponseataProto";
option objc_class_prefix = "HLW";message ResponseData {int32 status = 1;string errorMessage = 2;bytes result = 3;bytes attachments = 4;
}
使用protobuf-maven-plugin插件把 proto 文件生成对应的 Java 类。
协议实现
新建 GrpcProtocol 类,继承 AbstractProtocol,实现 Protocol 协议细节。
核心是:服务暴露时开启 Grpc 服务,引用服务时生成对应的 Invoker。
public class GrpcProtocol extends AbstractProtocol {@Overrideprotected <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException {return new GrpcInvoker<>(type, url);}@Overridepublic int getDefaultPort() {return 18080;}@Overridepublic <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {GrpcExporter<T> exporter = new GrpcExporter<>(invoker);exporterMap.put(invoker.getInterface().getName(), exporter);openServer(invoker.getUrl());return exporter;}private void openServer(URL url) {String key = serviceKey(url);ProtocolServer protocolServer = serverMap.get(key);if (protocolServer == null) {synchronized (serverMap) {protocolServer = serverMap.get(key);if (protocolServer == null) {serverMap.put(key, createServer(url));}}}}private ProtocolServer createServer(URL url) {return new GrpcProtocolServer(url, exporterMap);}
}
新建 GrpcProtocolServer 类实现 ProtocolServer 接口,核心是启动 GrpcServer,并添加 DispatcherService 处理请求。
public class GrpcProtocolServer implements ProtocolServer {private final Server server;public GrpcProtocolServer(URL url, Map<String, Exporter<?>> exporterMap) {server = ServerBuilder.forPort(url.getPort()).addService(new DispatcherService(exporterMap)).build();try {server.start();} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic String getAddress() {return null;}@Overridepublic void setAddress(String address) {}@Overridepublic void close() {server.shutdown();}
}
新建 DispatcherService 类实现 Grpc Service,用来处理客户端的grpc请求。核心是把 RequestData 解码成 RpcInvocation,再查找本地 Invoker 调用并返回结果。
public class DispatcherService extends DispatcherServiceGrpc.DispatcherServiceImplBase {private final Map<String, Exporter<?>> exporterMap;public DispatcherService(Map<String, Exporter<?>> exporterMap) {this.exporterMap = exporterMap;}@Overridepublic void dispatch(RequestData request, StreamObserver<ResponseData> responseObserver) {RpcInvocation invocation = Codec.decodeInvocation(request);ResponseData responseData;try {Invoker<?> invoker = exporterMap.get(invocation.getServiceName()).getInvoker();Object returnValue = invoker.invoke(invocation).get().getValue();responseData = Codec.encodeResponse(returnValue, null);} catch (Exception e) {responseData = Codec.encodeResponse(null, e);}responseObserver.onNext(responseData);responseObserver.onCompleted();}
}
新建 GrpcInvoker 类实现 Invoker 接口,服务引用时会创建它,目的是发起 RPC 调用时通过 Stub 发一个请求到 DispatcherService,实现grpc协议的 RPC 调用。
public class GrpcInvoker<T> extends AbstractInvoker<T> {private static final Map<String, DispatcherServiceGrpc.DispatcherServiceFutureStub> STUB_MAP = new ConcurrentHashMap<>();public GrpcInvoker(Class<T> type, URL url) {super(type, url);}private DispatcherServiceGrpc.DispatcherServiceFutureStub getStub() {String key = getUrl().getAddress();DispatcherServiceGrpc.DispatcherServiceFutureStub stub = STUB_MAP.get(key);if (stub == null) {synchronized (STUB_MAP) {stub = STUB_MAP.get(key);if (stub == null) {STUB_MAP.put(key, stub = createClient(getUrl()));}}}return stub;}private DispatcherServiceGrpc.DispatcherServiceFutureStub createClient(URL url) {ManagedChannel channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort()).usePlaintext().build();return DispatcherServiceGrpc.newFutureStub(channel);}@Overrideprotected Result doInvoke(Invocation invocation) throws Throwable {RequestData requestData = Codec.encodeInvocation((RpcInvocation) invocation);ResponseData responseData = getStub().dispatch(requestData).get();return Codec.decodeResponse(responseData, invocation);}
}
最后是编解码器 Codec,它的作用是对 RequestData、ResponseData 对象的编解码。对于请求来说,要编解码的是 RpcInvocation;对于响应来说,要编解码的是返回值和异常信息。
方法实参是 Object[] 类型,附带参数是 Map 类型,本身不能直接通过 Protobuf 传输,我们会先利用 Serialization 序列化成字节数组后再传输。
public class Codec {private static final Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getDefaultExtension();public static RequestData encodeInvocation(RpcInvocation invocation) {RequestData.Builder builder = RequestData.newBuilder().setTargetServiceUniqueName(invocation.getTargetServiceUniqueName()).setMethodName(invocation.getMethodName()).setServiceName(invocation.getServiceName());for (Class<?> parameterType : invocation.getParameterTypes()) {builder.addParameterTypes(serialize(parameterType));}builder.setParameterTypesDesc(invocation.getParameterTypesDesc());for (Object argument : invocation.getArguments()) {builder.addArguments(serialize(argument));}builder.setAttachments(serialize(invocation.getAttachments()));return builder.build();}public static RpcInvocation decodeInvocation(RequestData requestData) {RpcInvocation invocation = new RpcInvocation();invocation.setTargetServiceUniqueName(requestData.getTargetServiceUniqueName());invocation.setMethodName(requestData.getMethodName());invocation.setServiceName(requestData.getServiceName());List<ByteString> parameterTypesList = requestData.getParameterTypesList();Class<?>[] parameterTypes = new Class[parameterTypesList.size()];for (int i = 0; i < parameterTypesList.size(); i++) {parameterTypes[i] = (Class<?>) deserialize(parameterTypesList.get(i));}invocation.setParameterTypes(parameterTypes);invocation.setParameterTypesDesc(requestData.getParameterTypesDesc());List<ByteString> argumentsList = requestData.getArgumentsList();Object[] arguments = new Object[argumentsList.size()];for (int i = 0; i < argumentsList.size(); i++) {arguments[i] = deserialize(argumentsList.get(i));}invocation.setArguments(arguments);invocation.setAttachments((Map<String, String>) deserialize(requestData.getAttachments()));return invocation;}public static Result decodeResponse(ResponseData responseData, Invocation invocation) {AppResponse appResponse = new AppResponse();if (responseData.getStatus() == 200) {appResponse.setValue(deserialize(responseData.getResult()));appResponse.setAttachments((Map<String, String>) deserialize(responseData.getAttachments()));} else {appResponse.setException(new RuntimeException(responseData.getErrorMessage()));}return new AsyncRpcResult(CompletableFuture.completedFuture(appResponse), invocation);}private static Object deserialize(ByteString byteString) {try {InputStream inputStream = new ByteArrayInputStream(byteString.toByteArray());ObjectInput objectInput = serialization.deserialize(null, inputStream);return objectInput.readObject();} catch (Exception e) {throw new RuntimeException(e);}}private static ByteString serialize(Object obj) {try {ByteArrayOutputStream outputStream = new ByteArrayOutputStream();ObjectOutput output = serialization.serialize(null, outputStream);output.writeObject(obj);output.flushBuffer();return ByteString.copyFrom(outputStream.toByteArray());} catch (Exception e) {throw new RuntimeException(e);}}public static ResponseData encodeResponse(Object returnValue, Throwable throwable) {ResponseData.Builder builder = ResponseData.newBuilder();if (throwable == null) {builder.setStatus(200);builder.setResult(serialize(returnValue));builder.setAttachments(serialize(new HashMap<>()));//先忽略} else {builder.setStatus(500);builder.setErrorMessage(throwable.getMessage());}return builder.build();}
}
实现完毕,最后是让 Dubbo 可以加载到我们自定义的 GrpcProtocol,可以通过 SPI 的方式。新建META-INF/dubbo/org.apache.dubbo.rpc.Protocol文件,内容:
grpc=dubbo.extension.rpc.grpc.GrpcProtocol
服务提供方使用自定义协议:
ProtocolConfig protocolConfig = new ProtocolConfig("grpc", 10880);
消费方使用自定义协议:
ReferenceConfig#setUrl("grpc://127.0.0.1:10880");
尾巴
Protocol 层关心的是如何暴露服务和引用服务,以及如何让双方使用某个具体的协议来通信,以完成 RPC 调用。如果你觉得官方提供的 dubbo 协议无法满足你的业务,就可以通过扩展 Protocol 接口来实现你自己的私有协议。
相关文章:
自定义Dubbo RPC通信协议
前言 Dubbo 协议层的核心SPI接口是org.apache.dubbo.rpc.Protocol,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。 设计思路 Google 提供了 Java 的 Grpc…...
VB6.0报错:操作符AddressOf使用无效
VB调试,尝试调用DLL中的方法并带有回调函数,报错提示: 操作符AddressOf使用无效 代码: Private Sub btnScan_Click()... WCHBLEStartScanBLEDevices AddressOf callBackEnd Sub This function is called from the dll Public Fu…...
SpringCloud Aliba-Sentinel【中篇】-从入门到学废【5】
目录 1.流控规则 2. 熔断规则 3.热点规则 1.流控规则 1.资源名:唯一名称,默认请求路径 2.针对来源: Sentinel可以针对调用者进行限流,填写微服务名,默认default (不区分来源) 3.阈值类型/单机阈值: QPS(每秒钟的请求数量&…...
四、基础篇 vue条件渲染
v-if v-if 指令用于条件性地渲染一块内容。这块内容只会在指令的表达式返回 truthy 值的时候被渲染。 <template><div class"content"><div v-if"show">show渲染了</div></div> </template><script> export de…...
广东金牌电缆:法大大电子合同助力业务风险管控
广东金牌电缆集团股份有限公司(以下简称“广东金牌电缆”)成立于2013年,现为广东省电线电缆重点生产企业、广东省守合同重信用单位、国家专精特新小巨人企业、国家高新技术企业,拥有自主商标“夺冠”,“夺冠”商标被评…...
机器学习周刊第五期:一个离谱的数据可视化Python库、可交互式动画学概率统计、机器学习最全文档、快速部署机器学习应用的开源项目、Redis 之父的最新文章
date: 2024/01/08 这个网站用可视化的方式讲解概率和统计基础知识,很多内容还是可交互的,非常生动形象。 大家好,欢迎收看第五期机器学习周刊 本期介绍7个内容,涉及Python、概率统计、机器学习、大模型等,目录如下: 一个离谱的Python库看见概率,看见统计2024机器学习最…...
vue和react的hooks
一、什么是hooks 直译“钩子”,在程序中代表,系统运行在某一时期时,会调用注册在该时机的回调函数。例如浏览器提供的onload或addEventListener能注册在浏览器各种时机调用的方法。 二、react中的hooks 一系列以“use”作为开发的方法&…...
2024.1.19
今天狠狠地复习了一下C语言,不复习不知道,一复习吓一跳昂,这感觉好多都忘却了,这并非一件好事,所以说还好复习了,不然考试就有点问题了,但是还好写一下这些代码就马上想起来了,所以说…...
上位机编程:CP56Time2a格式精讲
Cp56Time2a介绍: Cp56Time2a是西门子PLC(可编程逻辑控制器)中用于时间数据传输的一种特殊格式,主要用于PCS7和基于TCP/IP的S7通信过程中。这种时间格式主要为了确保在不同的系统和设备之间进行精确的时间同步。 Cp56Time2a格式&a…...
Webpack5入门到原理12:处理 Html 资源
1. 下载包 npm i html-webpack-plugin -D 2. 配置 webpack.config.js const path require("path"); const ESLintWebpackPlugin require("eslint-webpack-plugin"); const HtmlWebpackPlugin require("html-webpack-plugin");module.expo…...
Vue3-Axios二次封装与Api接口统一管理
一、安装axios npm i axios 二、创建utils工具文件夹 创建request.ts文件 import axios from axios //引入element-plus消息提示 import { ElMessage } from element-plus //引入用户相关的仓库 import useUserStore from /store/modules/user //使用axios对象create方法,创建…...
RHCE: 主从DNS服务器配置 (实现正反向解析)
主服务器配置: 准备工作: #关闭防火墙 [root192 ~]# systemctl stop firewalld#关闭selinux [root192 ~]# setenforce 0#查看selinux状态 [root192 ~]# getenforce Permissive#安装bind包 [root192 ~]# yum install bind -y#查询软件包下的文件 /etc/named.conf #主配置文…...
Git学习笔记(第6章):GitHub操作(远程库操作)
目录 6.1 远程库操作 6.1.1 创建远程库 6.1.2 命名远程库 6.1.3 本地库推送到远程库(push) 6.1.4 远程库拉取到本地库(pull) 6.1.5 远程库克隆到本地库(clone) 6.2 团队内协作 6.3 跨团队协作 6.4 SSH免密登录 6.1 远程库操作 命令 作用 git remote -v 查看所有远程…...
【主题广范|见刊快】2024年海洋工程与测绘遥感国际学术会议(ICOESRS 2024)
【主题广范|见刊快】2024年海洋工程与测绘遥感国际学术会议(ICOESRS 2024) 2024 International Conference Ocean Engineering and Surveying Remote Sensing(ICOESRS 2024) 一、【会议简介】 随着人类对海洋的认识和开发不断深入,海洋工程和测绘遥感技术的研究和应…...
解决el-radio-group只触发一次的问题
1.需求是点击合并后,出来二次确认框。现在的问题是点击完出现二次确认框后,再次点击不出来二次确认框了 2.一开始代码是这样写的 <el-radio-group v-model"unfold" size"mini" changechangeMerge><el-radio-button :labe…...
openssl3.2 - 官方demo学习 - pkey - EVP_PKEY_RSA_keygen.c
文章目录 openssl3.2 - 官方demo学习 - pkey - EVP_PKEY_RSA_keygen.c概述笔记END openssl3.2 - 官方demo学习 - pkey - EVP_PKEY_RSA_keygen.c 概述 官方指出 : RSA key 如果小于2048位, 就属于弱key 官方demo中, 给出的默认key长度为4096位 从名字生成上下文 初始化上下文…...
密码搜|Facebook 8组问答,搞定Pixel与广告之间的关系!
Q1:Pixel(像素/代码)是什么? A:Pixel有多种称呼:Pixel、像素、代码。它只是一种分析工具,可帮助广告主了解用户在网站上采取的操作,继而衡量广告成效。 设置Facebook Pixel像素代码…...
Apache StringUtils:Java字符串处理工具类
简介 在我们的代码中经常需要对字符串判空,截取字符串、转换大小写、分隔字符串、比较字符串、去掉多余空格、拼接字符串、使用正则表达式等等。如果只用 String 类提供的那些方法,我们需要手写大量的额外代码,不然容易出现各种异常。现在有…...
设计模式 代理模式(静态代理 动态代理) 与 Spring Aop源码分析 具体是如何创建Aop代理的
代理模式 代理模式是一种结构型设计模式,它通过创建一个代理对象来控制对真实对象的访问。这种模式可以用于提供额外的功能操作,或者扩展目标对象的功能。 在代理模式中,代理对象与真实对象实现相同的接口,以便在任何地方都可以使…...
【EI会议征稿通知】第七届先进电子材料、计算机与软件工程国际学术会议(AEMCSE 2024)
第七届先进电子材料、计算机与软件工程国际学术会议(AEMCSE 2024) 2024 7th International Conference on Advanced Electronic Materials, Computers and Software Engineering 第七届先进电子材料、计算机与软件工程国际学术会议(AEMCSE 2024)将于2024年5月10-1…...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...
vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...
搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...
【Linux系统】Linux环境变量:系统配置的隐形指挥官
。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量:setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...
给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...
