【gRPC】Java高性能远程调用之gRPC详解
gRPC详解
- 一、什么是gRPC?
- 二、用proto生成代码
- 2.1、前期准备
- 2.2、protobuf插件安装
- 三、简单 RPC
- 3.1、开发gRPC服务端
- 3.2、开发gRPC客户端
- 3.3、验证gRPC服务
- 四、服务器端流式 RPC
- 4.1、开发一个gRPC服务,类型是服务端流
- 4.2、开发一个客户端,调用前面发布的gRPC服务
- 4.3、验证
- 五、客户端流式 RPC
- 5.1、在proto文件中定义客户端流类型的gRPC接口
- 5.2、开发服务端应用
- 5.3、开发客户端应用
- 5.4、验证
- 六、双向流式 RPC
- 6.1、在proto文件中定义双端流类型的gRPC接口
- 6.2、开发服务端应用
- 6.3、开发客户端应用
- 6.4、验证
一、什么是gRPC?
gRPC是一款由Google开发的高性能、开源的 RPC(远程过程调用)框架。它基于 HTTP/2 协议,并使用 Protocol Buffers(Protobuf)作为默认的序列化工具。gRPC 的主要作用包括:
-
高性能:基于 HTTP/2,支持多路复用、流式传输和二进制编码,性能优于传统的 HTTP/1.1。
-
跨语言支持:支持多种编程语言(如 Java、Go、Python、C++ 等),适合微服务架构中的多语言环境。
-
强类型接口:通过 Protobuf 定义服务接口,生成强类型的客户端和服务端代码,减少错误。
-
流式通信:支持单向流、双向流等复杂的通信模式。
-
适用于微服务:适合低延迟、高吞吐量的场景,如微服务之间的通信。
贴一张油管老哥基于GraphQL、REST、gRPC的测试结果,在应对较大的RPS时,gRPC需要更少的CPU和内存使用率,并且网络带宽消耗也是最小的(花费最小),同时还有着很低的时延
,10万的rps响应能在20ms内。
基于以上优点,这篇文章详细说说gRPC怎么使用。
详细代码地址:github,有用的话,麻烦给个star
二、用proto生成代码
2.1、前期准备
环境与依赖
依赖 | 版本 |
---|---|
jdk | 1.8 |
maven | 3.9.9 |
springboot | 2.4.2 |
grpc | 1.26.0 |
protobuf | 3.19.4 |
grpc-client-spring-boot-starter | 2.13.0.RELEASE |
grpc-server-spring-boot-starter | 2.13.0.RELEASE |
项目结构
创建一个父项目grpc-test
,包含三个字项目,grpc-client
、grpc-server
和grpc-api
2.2、protobuf插件安装
- IDEA安装
Protobuf
插件
grpc-test
服务的pom文件中引入maven依赖,parent可替换成自己的springboot版本
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><groupId>com.csdn.dev</groupId><artifactId>csdn-dev-boot</artifactId><version>${revision}</version></parent><modelVersion>4.0.0</modelVersion><packaging>pom</packaging><artifactId>grpc-test</artifactId><modules><module>grpc-server</module><module>grpc-client</module><module>grpc-api</module></modules><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><java.version>1.8</java.version><protobuf.version>3.19.4</protobuf.version><grpc.version>1.26.0</grpc.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>${protobuf.version}</version></dependency><!-- grpc server和spring-boot集成框架 --><dependency><groupId>net.devh</groupId><artifactId>grpc-server-spring-boot-starter</artifactId><version>2.13.0.RELEASE</version></dependency><!-- grpc client和spring-boot集成框架 --><dependency><groupId>net.devh</groupId><artifactId>grpc-client-spring-boot-starter</artifactId><version>2.13.0.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.0.RELEASE</version></dependency></dependencies>
</project>
- maven中
protobuf plugin
<build><!-- os系统信息插件, protobuf-maven-plugin需要获取系统信息下载相应的protobuf程序 --><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.6.2</version></extension></extensions><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact><!-- proto文件目录 --><protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot><!-- 生成的Java文件目录 --><outputDirectory>${project.basedir}/src/main/java/</outputDirectory><clearOutputDirectory>false</clearOutputDirectory><!--<outputDirectory>${project.build.directory}/generated-sources/protobuf</outputDirectory>--></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins>
</build>
在maven中的build中的plugins中添加上面插件。
protoSourceRoot
指定*.proto定义的消息文件路径。
outputDirectory
指定输出的java文件地址。默认是输出到target中。
clearOutputDirectory
是否清空输出文件,默认为是,如果是,outputDirectory下的项目工程会被清空。
执行maven,出现下图中protobuf表示插件安装成功
- 在
grpc-api
模块的src/main/proto
目录下新增名为helloworld.proto
的文件,这里面定义了一个gRPC服务,里面含有一个接口,并且还有这个接口的入参和返回结果的定义:
syntax = "proto3";option java_multiple_files = true;
// 生成java代码的package
option java_package = "com.demo.grpc.protocol";
option java_outer_classname = "HelloWorld";// 入参的数据结构
message HelloRequest {string name = 1;
}// 返回结果的数据结构
message HelloResponse {string message = 1;
}// gRPC服务
service HelloWorldService {// 接口定义rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
- proto文件已经做好,接下来要根据这个文件来生成java代码,双击插件中的
compile
和compile-custom
,分别编译bean
和service
。也可以执行mvn clean compile
生成文件。
- 生成的文件
三、简单 RPC
gRPC服务的开发和调用,实现的效果如下图:
3.1、开发gRPC服务端
- 首先要开发的是gRPC服务端,回顾前文中
helloworld.proto
中定义的服务和接口,如下所示,名为HelloWorldService的服务对外提供名为SayHello接口,这就是咱们接下来的任务,创建一个springboot应用,该应用以gRPC的方式提供SayHello接口给其他应用远程调用:
// gRPC服务
service HelloWorldService {// 接口定义rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
- 基于springboot框架开发一个普通的gRPC服务端应用,一共需要五个步骤,如下图所示,接下来我们按照下图序号的顺序来开发:
- 导入maven依赖,引入grpc-api依赖
<dependency><groupId>com.csdn.dev</groupId><artifactId>grpc-api</artifactId>
</dependency>
- 这是个springboot应用,配置文件内容如下:
spring:application:name: grpc-server
grpc:server:port: 9252
- 新建拦截类
LogGrpcInterceptor.java
,每当gRPC请求到来后该类会先执行,这里是将方法名字在日志中打印出来,您可以对请求响应做更详细的处理:
package com.demo.grpc.interceptor;import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class LogGrpcInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,ServerCallHandler<ReqT, RespT> serverCallHandler) {log.debug(serverCall.getMethodDescriptor().getFullMethodName());return serverCallHandler.startCall(serverCall, metadata);}
}
- 为了让
LogGrpcInterceptor
可以在gRPC请求到来时被执行,需要做相应的配置,如下所示,在普通的bean的配置中添加注解即可:
package com.demo.grpc.config;import com.demo.grpc.interceptor.LogGrpcInterceptor;
import io.grpc.ServerInterceptor;
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor;
import org.springframework.context.annotation.Configuration;@Configuration(proxyBeanMethods = false)
public class GlobalInterceptorConfiguration {@GrpcGlobalServerInterceptorServerInterceptor logServerInterceptor(){return new LogGrpcInterceptor();}
}
- 应用启动类很简单
package com.demo.grpc;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class GrpcServerApplication {public static void main(String[] args) {SpringApplication.run(GrpcServerApplication.class, args);}
}
- 接下来是最重要的service类,gRPC服务在此处对外暴露出去,完整代码如下,有几处要注意的地方稍后提到:
package com.demo.grpc.service;import com.demo.grpc.protocol.HelloRequest;
import com.demo.grpc.protocol.HelloResponse;
import com.demo.grpc.protocol.HelloWorldServiceGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;import java.util.Date;@GrpcService
public class GrpcServerService extends HelloWorldServiceGrpc.HelloWorldServiceImplBase {@Overridepublic void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {HelloResponse response = HelloResponse.newBuilder().setMessage("Hello " + request.getName() + ", " + new Date()).build();responseObserver.onNext(response);responseObserver.onCompleted();}
}
- 上述GrpcServerService.java中有几处需要注意:
- 是使用
@GrpcService
注解,再继承HelloWorldServiceImplBase
,这样就可以借助grpc-server-spring-boot-starter
库将sayHello
暴露为gRPC服务; HelloWorldServiceImplBase
是前文中根据proto自动生成的java代码,在grpc-api
模块中;sayHello
方法中处理完毕业务逻辑后,调用onNext
方法填入返回内容;- 调用
onCompleted
方法表示本次gRPC服务完成;
至此,gRPC服务端编码就完成了,咱们接着开始客户端开发;
3.2、开发gRPC客户端
- 同理引入
grpc-api
依赖
<dependency><groupId>com.csdn.dev</groupId><artifactId>grpc-api</artifactId>
</dependency>
- 应用配置文件
grpc-client/src/main/resources/application.yml
,注意address的值就是gRPC服务端的信息,我这里grpc-server
和grpc-client
在同一台电脑上运行,请您根据自己情况来设置:
server:port: 8082
spring:application:name: grpc-clientgrpc:client:# gRPC配置的名字,GrpcClient注解会用到grpc-server:# gRPC服务端地址address: 'static://127.0.0.1:9252'enableKeepAlive: truekeepAliveWithoutCalls: truenegotiationType: plaintext
- 接下来要创建下图展示的类,按序号顺序创建:
- 首先是拦截类
LogGrpcInterceptor
,与服务端的拦截类差不多,不过实现的接口不同:
package com.demo.grpc.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class LogGrpcInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel channel) {log.debug(method.getFullMethodName());return channel.newCall(method, callOptions);}
}
- 为了让拦截类能够正常工作,即发起gRPC请求的时候被执行,需要新增一个配置类:
package com.demo.grpc.config;import com.demo.grpc.interceptor.LogGrpcInterceptor;
import io.grpc.ClientInterceptor;
import net.devh.boot.grpc.client.interceptor.GrpcGlobalClientInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;@Order(Ordered.LOWEST_PRECEDENCE)
@Configuration(proxyBeanMethods = false)
public class GlobalClientInterceptorConfiguration {@GrpcGlobalClientInterceptorClientInterceptor logGrpcInterceptor() {return new LogGrpcInterceptor();}
}
- 启动类:
package com.demo.grpc;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class GrpcClientApplication {public static void main(String[] args) {SpringApplication.run(GrpcClientApplication.class, args);}
}
- 接下来是最重要的服务类
GrpcClientService
,有几处要注意的地方稍后会提到:
package com.demo.grpc.service;import com.demo.grpc.protocol.HelloRequest;
import com.demo.grpc.protocol.HelloResponse;
import com.demo.grpc.protocol.HelloWorldServiceGrpc;
import io.grpc.StatusRuntimeException;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;@Service
public class GrpcClientService {@GrpcClient("grpc-server")private HelloWorldServiceGrpc.HelloWorldServiceBlockingStub helloWorldServiceBlockingStub;public String sendMessage(final String name) {try {final HelloResponse response = this.helloWorldServiceBlockingStub.sayHello(HelloRequest.newBuilder().setName(name).build());return response.getMessage();} catch (StatusRuntimeException e) {return "FAILED with " + e.getStatus().getCode().name();}}
}
- 上述GrpcClientService类有几处要注意的地方:
- 用
@Service
将GrpcClientService
注册为spring的普通bean实例; - 用
@GrpcClient
修饰HelloWorldServiceBlockingStub
,这样就可以通过grpc-client-spring-boot-starter
库发起gRPC调用,被调用的服务端信息来自名为grpc-server
的配置; HelloWorldServiceBlockingStub
来自前文中根据helloworld.proto
生成的java代码;helloWorldServiceBlockingStub.sayHello
方法会远程调用grpc-server
应用的gRPC服务;
- 为了验证gRPC服务调用能否成功,再新增个web接口,接口内部会调用GrpcClientService.sendMessage,这样咱们通过浏览器就能验证gRPC服务是否调用成功了:
package com.demo.grpc.controller;import com.demo.grpc.service.GrpcClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class GrpcClientController {@Resourceprivate GrpcClientService grpcClientService;@GetMapping("/test1")public String printMessage(@RequestParam(defaultValue = "Hanson") String name) {return grpcClientService.sendMessage(name);}
}
- 编码完成,接下来将两个服务都启动,验证gRPC服务是否正常;
3.3、验证gRPC服务
grpc-server
和grpc-client
都是普通的springboot应用,可以在IDEA中启动,点击下图红框位置,在弹出菜单中选择Run 'LocalServerApplication’
即可启动grpc-server
:
grpc-server
启动后,控制台会提示gRPC server已启动,正在监听9252端口,如下图:
grpc-client
后,在浏览器输入http://localhost:8082/test1?name=Huang
,可以看到响应的内容正是来自grpc-server
的GrpcServerService.java
:
- 从web端到gRPC服务端的关键节点信息如下图:
- 可以看到
grpc-server
的拦截日志:
- 还有
grpc-client
的拦截日志:
四、服务器端流式 RPC
客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息;
4.1、开发一个gRPC服务,类型是服务端流
- 在
src/main/proto
目录下新增文件order.proto
,里面定一个了一个gRPC方法ListOrders
及其入参和返回对象,内容如下,要注意的是返回值要用关键字stream
修饰,表示该接口类型是服务端流:
syntax = "proto3";option java_multiple_files = true;
// 生成java代码的package
option java_package = "com.demo.grpc.order";
// 类名
option java_outer_classname = "OrderProto";// 买家ID
message Buyer {int32 buyerId = 1;
}// 返回结果的数据结构
message Order {// 订单IDint32 orderId = 1;// 商品IDint32 productId = 2;// 交易时间int64 orderTime = 3;// 买家备注string buyerRemark = 4;
}// gRPC服务,这是个在线商城的订单查询服务
service OrderQuery {// 服务端流式:订单列表接口,入参是买家信息,返回订单列表(用stream修饰返回值)rpc ListOrders (Buyer) returns (stream Order) {}
}
- 双击插件中的
compile
和compile-custom
,即可根据proto生成java代码:
- 新生成的java代码如下图红框:
- 接下来是最关键的gRPC服务,代码如下,可见
responseObserver.onNext
方法被多次调用,用以向客户端持续输出数据,最后通过responseObserver.onCompleted
结束输出:
package com.demo.grpc.service;import java.util.ArrayList;
import java.util.List;import com.demo.grpc.order.Buyer;
import com.demo.grpc.order.Order;
import com.demo.grpc.order.OrderQueryGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;@GrpcService
public class GrpcOrderServerService extends OrderQueryGrpc.OrderQueryImplBase {/*** 造一批数据* @return*/private static List<Order> mockOrders(){List<Order> list = new ArrayList<>();Order.Builder builder = Order.newBuilder();for (int i = 0; i < 10; i++) {list.add(builder.setOrderId(i).setProductId(100+i).setOrderTime(System.currentTimeMillis()/1000).setBuyerRemark(("Hanson-" + i)).build());}return list;}@Overridepublic void listOrders(Buyer request, StreamObserver<Order> responseObserver) {// 持续输出到clientfor (Order order : mockOrders()) {responseObserver.onNext(order);}// 结束输出responseObserver.onCompleted();}
}
- 至此,服务端开发完成,咱们再开发一个springboot应用作为客户端,看看如何远程调用
listOrders
接口,得到responseObserver.onNext
方法输出的数据;
4.2、开发一个客户端,调用前面发布的gRPC服务
- 客户端模块的基本功能是提供一个web接口,其内部会调用服务端的listOrders接口,将得到的数据返回给前端,如下图:
- 服务端的
listOrders
接口返回的Order
对象里面有很多gRPC相关的内容,不适合作为web接口的返回值,因此定义一个OrderDTO
类作为web接口返回值:
package com.demo.grpc.entity;import lombok.AllArgsConstructor;
import lombok.Data;@Data
@AllArgsConstructor
public class OrderDTO {private int orderId;private int productId;private String orderTime;private String buyerRemark;
}
- 重点来了,
GrpcOrderClientService.java
,里面展示了如何远程调用gRPC服务的listOrders
接口,可见对于服务端流类型的接口,客户端这边通过stub
调用会得到Iterator
类型的返回值,接下来要做的就是遍历Iterator
:
package com.demo.grpc.service;import com.demo.grpc.entity.OrderDTO;
import com.demo.grpc.order.Buyer;
import com.demo.grpc.order.Order;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;@Slf4j
@Service
public class GrpcOrderClientService {@GrpcClient("grpc-server")private com.demo.grpc.order.OrderQueryGrpc.OrderQueryBlockingStub orderQueryBlockingStub;public List<OrderDTO> listOrders(final String name) {// gRPC的请求参数Buyer buyer = Buyer.newBuilder().setBuyerId(101).build();// gRPC的响应Iterator<Order> orderIterator;// 当前方法的返回值List<OrderDTO> orders = new ArrayList<>();// 通过stub发起远程gRPC请求try {orderIterator = orderQueryBlockingStub.listOrders(buyer);} catch (final StatusRuntimeException e) {log.error("error grpc invoke", e);return new ArrayList<>();}DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");log.info("start put order to list");while (orderIterator.hasNext()) {Order order = orderIterator.next();orders.add(new OrderDTO(order.getOrderId(),order.getProductId(),// 使用DateTimeFormatter将时间戳转为字符串dtf.format(LocalDateTime.ofEpochSecond(order.getOrderTime(), 0, ZoneOffset.of("+8"))),order.getBuyerRemark()));log.info("");}log.info("end put order to list");return orders;}
}
- 最后做一个controller类,对外提供一个web接口,里面会调用
GrpcOrderClientService
的方法:
package com.demo.grpc.controller;import com.demo.grpc.entity.OrderVO;
import com.demo.grpc.service.GrpcClientService;
import com.demo.grpc.service.GrpcOrderClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;@RestController
public class GrpcClientController {@Resourceprivate GrpcOrderClientService grpcOrderClientService;@GetMapping("/test2")public List<OrderVO> printMessage2(@RequestParam(defaultValue = "Hanson") String name) {return grpcOrderClientService.listOrders(name);}
}
- 至此,编码完成,开始验证
4.3、验证
- 启动
grpc-server
,启动成功后会监听9252端口:
- 启动
grpc-client
,再在浏览器访问:http://localhost:8082/test2?name=Huang ,得到结果如下,可见成功地获取了gRPC的远程数据:
五、客户端流式 RPC
客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应;
5.1、在proto文件中定义客户端流类型的gRPC接口
- 首先要做的就是定义gRPC接口,打开
order.proto
,在里面新增方法和相关的数据结构,需要重点关注的是AddToCart
方法的入参ProductOrder
前面添加了stream
修饰,代表该方法是客户端流类型:
// 提交购物车时的产品信息
message ProductOrder {// 商品IDint32 productId = 1;// 商品数量int32 number = 2;
}// 提交购物车返回结果的数据结构
message AddCartResponse {// 返回码int32 code = 1;// 描述信息string message = 2;
}// gRPC服务,这是个在线商城的购物车服务
service CartService {// 客户端流式:添加多个商品到购物车rpc AddToCart (stream ProductOrder) returns (AddCartResponse) {}
}
- 双击插件中的
compile
和compile-custom
,即可根据proto生成java代码:
- 新生成的java代码如下图红框:
5.2、开发服务端应用
- 重点是提供grpc服务的
GrpcServerService.java,咱们要做的就是给上层框架返回一个匿名类,至于里面的
onNext、
onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量
totalCount`,这样就可以记录总数了:
package com.demo.grpc.service;import com.demo.grpc.order.AddCartResponse;
import com.demo.grpc.order.CartServiceGrpc;
import com.demo.grpc.order.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;@Slf4j
@GrpcService
public class GrpcCartServerService extends CartServiceGrpc.CartServiceImplBase {@Overridepublic StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartResponse> responseObserver) {// 返回匿名类,给上层框架使用return new StreamObserver<ProductOrder>() {// 记录处理产品的总量private int totalCount = 0;@Overridepublic void onNext(ProductOrder value) {log.info("正在处理商品[{}],数量为[{}]",value.getProductId(),value.getNumber());// 增加总量totalCount += value.getNumber();}@Overridepublic void onError(Throwable t) {log.error("添加购物车异常", t);}@Overridepublic void onCompleted() {log.info("添加购物车完成,共计[{}]件商品", totalCount);responseObserver.onNext(AddCartResponse.newBuilder().setCode(10000).setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount)).build());responseObserver.onCompleted();}};}
}
5.3、开发客户端应用
- 正常情况下我们都是用
StreamObserver
处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver
中取出业务数据,于是定一个新接口,继承自StreamObserver
,新增getExtra
方法可以返回String
对象,详细的用法稍后会看到:
package com.demo.grpc.service;import io.grpc.stub.StreamObserver;public interface ExtendResponseObserver<T> extends StreamObserver<T> {String getExtra();
}
- 重头戏来了,看看如何远程调用客户端流类型的gRPC接口,前面小结提到的2、3、4点都会涉及到,代码中已经添加详细注释:
package com.demo.grpc.service;import com.demo.grpc.order.AddCartResponse;
import com.demo.grpc.order.CartServiceGrpc;
import com.demo.grpc.order.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;@Service
@Slf4j
public class GrpcCartClientService {@GrpcClient("grpc-server")private CartServiceGrpc.CartServiceStub cartServiceStub;public String addToCart(int count) {CountDownLatch countDownLatch = new CountDownLatch(1);// responseObserver的onNext和onCompleted会在另一个线程中被执行,// ExtendResponseObserver继承自StreamObserverExtendResponseObserver<AddCartResponse> responseObserver = new ExtendResponseObserver<AddCartResponse>() {String extraStr;@Overridepublic String getExtra() {return extraStr;}private int code;private String message;@Overridepublic void onNext(AddCartResponse value) {log.info("on next");code = value.getCode();message = value.getMessage();}@Overridepublic void onError(Throwable t) {log.error("gRPC request error", t);extraStr = "gRPC error, " + t.getMessage();countDownLatch.countDown();}@Overridepublic void onCompleted() {log.info("on complete");extraStr = String.format("返回码[%d],返回信息:%s" , code, message);countDownLatch.countDown();}};// 远程调用,此时数据还没有给到服务端StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);for(int i=0; i<count; i++) {// 发送一笔数据到服务端requestObserver.onNext(build(101 + i, 1 + i));}// 客户端告诉服务端:数据已经发完了requestObserver.onCompleted();try {// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,// 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,// await的超时时间设置为2秒countDownLatch.await(2, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error("countDownLatch await error", e);}log.info("service finish");// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得return responseObserver.getExtra();}/*** 创建ProductOrder对象* @param productId* @param num* @return*/private static ProductOrder build(int productId, int num) {return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();}
}
- 最后做个web接口,可以通过web请求验证远程调用:
package com.demo.grpc.controller;import com.demo.grpc.entity.OrderVO;
import com.demo.grpc.service.GrpcCartClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;@RestController
public class GrpcClientController {@Resourceprivate GrpcCartClientService grpcCartClientService;@GetMapping("/test3")public String printMessage(@RequestParam(defaultValue = "1") int count) {return grpcCartClientService.addToCart(count);}
}
- 编码完成,开始验证
5.4、验证
-
启动client和server
-
浏览器输入http://localhost:8082/test3?count=100,响应如下,可见远程调用gRPC服务成功:
- 下面是服务端日志,可见逐一处理了客户端的每一笔数据:
- 下面是客户端日志,可见由于
CountDownLatch
的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted
在另一个线程被执行完后,才会继续执行:
六、双向流式 RPC
双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;
其实就是结合了4,5两章节内容,快速过一下
6.1、在proto文件中定义双端流类型的gRPC接口
- 首先要做的就是定义gRPC接口,打开
order.proto
,在里面新增方法和相关的数据结构,需要重点关注的是BatchDeduct
方法的入参ProductOrder
和返回值DeductResponse
都添加了stream
修饰(ProductOrder是上一章定义的),代表该方法是双向流类型:
// 扣减库存返回结果的数据结构
message DeductResponse {// 返回码int32 code = 1;// 描述信息string message = 2;
}// gRPC服务,这是个在线商城的库存服务
service StockService {// 双向流式:批量扣减库存rpc BatchDeduct (stream ProductOrder) returns (stream DeductResponse) {}
}
其他相同地方不赘述
6.2、开发服务端应用
- 重点是提供grpc服务的
GrpcStockServerService.java
,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext
、onCompleted
方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount
,这样就可以记录总数了,由于请求参数是流,因此匿名类的onNext
会被多次调用,并且由于返回值是流,因此onNext
中调用了responseObserver.onNext
方法来响应流中的每个请求,这样客户端就不断收到服务端的响应数据(即客户端的onNext
方法会被多次调用):
package com.demo.grpc.service;import com.demo.grpc.order.DeductResponse;
import com.demo.grpc.order.ProductOrder;
import com.demo.grpc.order.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;@GrpcService
@Slf4j
public class GrpcStockServerService extends StockServiceGrpc.StockServiceImplBase {@Overridepublic StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductResponse> responseObserver) {// 返回匿名类,给上层框架使用return new StreamObserver<ProductOrder>() {private int totalCount = 0;@Overridepublic void onNext(ProductOrder value) {log.info("正在处理商品[{}],数量为[{}]",value.getProductId(),value.getNumber());// 增加总量totalCount += value.getNumber();int code;String message;// 假设单数的都有库存不足的问题if (0 == value.getNumber() % 2) {code = 10000;message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber());} else {code = 10001;message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber());}responseObserver.onNext(DeductResponse.newBuilder().setCode(code).setMessage(message).build());}@Overridepublic void onError(Throwable t) {log.error("批量减扣库存异常", t);}@Overridepublic void onCompleted() {log.info("批量减扣库存完成,共计[{}]件商品", totalCount);responseObserver.onCompleted();}};}
}
6.3、开发客户端应用
- 看看如何远程调用双向流类型的gRPC接口,代码中已经添加详细注释:
package com.demo.grpc.service;import com.demo.grpc.order.DeductResponse;
import com.demo.grpc.order.ProductOrder;
import com.demo.grpc.order.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** @author hanson.huang* @version V1.0* @ClassName GrpcStockClientService* @date 2025/3/5 22:15**/
@Service
@Slf4j
public class GrpcStockClientService {@GrpcClient("grpc-server")private StockServiceGrpc.StockServiceStub stockServiceStub;/*** 批量减库存* @param count* @return*/public String batchDeduct(int count) {CountDownLatch countDownLatch = new CountDownLatch(1);// responseObserver的onNext和onCompleted会在另一个线程中被执行,// ExtendResponseObserver继承自StreamObserverExtendResponseObserver<DeductResponse> responseObserver = new ExtendResponseObserver<DeductResponse>() {// 用stringBuilder保存所有来自服务端的响应private StringBuilder stringBuilder = new StringBuilder();@Overridepublic String getExtra() {return stringBuilder.toString();}/*** 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应,* 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容* @param value*/@Overridepublic void onNext(DeductResponse value) {log.info("batch deduct on next");// 放入匿名类的成员变量中stringBuilder.append(String.format("返回码[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));}@Overridepublic void onError(Throwable t) {log.error("batch deduct gRPC request error", t);stringBuilder.append("batch deduct gRPC error, " + t.getMessage());countDownLatch.countDown();}/*** 服务端确认响应完成后,这里的onCompleted方法会被调用*/@Overridepublic void onCompleted() {log.info("batch deduct on complete");// 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,// 会继续往下执行countDownLatch.countDown();}};// 远程调用,此时数据还没有给到服务端StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);for(int i=0; i<count; i++) {// 每次执行onNext都会发送一笔数据到服务端,// 服务端的onNext方法都会被执行一次requestObserver.onNext(build(101 + i, 1 + i));}// 客户端告诉服务端:数据已经发完了requestObserver.onCompleted();try {// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,// 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,// await的超时时间设置为2秒countDownLatch.await(2, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error("countDownLatch await error", e);}log.info("service finish");// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得return responseObserver.getExtra();}/*** 创建ProductOrder对象* @param productId* @param num* @return*/private static ProductOrder build(int productId, int num) {return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();}
}
- 最后做个web接口,可以通过web请求验证远程调用:
package com.demo.grpc.controller;import com.demo.grpc.service.GrpcStockClientService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;@RestController
public class GrpcClientController {@Resourceprivate GrpcStockClientService grpcStockClientService;@GetMapping("/test4")public String printMessage4(@RequestParam(defaultValue = "1") int count) {return grpcStockClientService.batchDeduct(count);}
}
6.4、验证
- 这里要改:浏览器输入http://localhost:8082/test3?count=10,响应如下,可见远程调用gRPC服务成功,流式响应的每一笔返回都被客户端收到:
- 服务端
- 客服端
四种类型的gRPC服务及其客户端开发就完成了
创作不易,不妨点赞、收藏、关注支持一下,各位的支持就是我创作的最大动力❤️
相关文章:

【gRPC】Java高性能远程调用之gRPC详解
gRPC详解 一、什么是gRPC?二、用proto生成代码2.1、前期准备2.2、protobuf插件安装 三、简单 RPC3.1、开发gRPC服务端3.2、开发gRPC客户端3.3、验证gRPC服务 四、服务器端流式 RPC4.1、开发一个gRPC服务,类型是服务端流4.2、开发一个客户端,调…...

数据结构知识学习小结
一、动态内存分配基本步骤 1、内存分配简单示例: 个人对于示例的理解: 定义一个整型的指针变量p(着重认为它是一个“变量”我觉得可能会更好理解),这个变量用来存地址的,而不是“值”,malloc函…...

分布式锁—2.Redisson的可重入锁一
大纲 1.Redisson可重入锁RedissonLock概述 2.可重入锁源码之创建RedissonClient实例 3.可重入锁源码之lua脚本加锁逻辑 4.可重入锁源码之WatchDog维持加锁逻辑 5.可重入锁源码之可重入加锁逻辑 6.可重入锁源码之锁的互斥阻塞逻辑 7.可重入锁源码之释放锁逻辑 8.可重入锁…...

计算机毕业设计SpringBoot+Vue.js球队训练信息管理系统(源码+文档+PPT+讲解)
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...

FFmpeg入门:最简单的音视频播放器
FFmpeg入门:最简单的音视频播放器 前两章,我们已经了解了分别如何构建一个简单和音频播放器和视频播放器。 FFmpeg入门:最简单的音频播放器 FFmpeg入门:最简单的视频播放器 本章我们将结合上述两章的知识,看看如何融…...

java 查找两个集合的交集部分数据
利用了Java 8的Stream API,代码简洁且效率高 import java.util.stream.Collectors; import java.util.List; import java.util.HashSet; import java.util.Set;public class ListIntersection {public static List<Long> findIntersection(List<Long> …...

【系统架构设计师】以数据为中心的体系结构风格
目录 1. 说明2. 仓库体系结构风格3. 黑板体系结构风格 1. 说明 1.以数据为中心的体系结构风格主要包括仓库体系结构风格和黑板体系结构风格。 2. 仓库体系结构风格 1.仓库(Repository)是存储和维护数据的中心场所。2.在仓库风格中,有两种不…...

通过HTML有序列表(ol/li)实现自动递增编号的完整解决方案
以下是通过HTML有序列表(ol/li)实现自动递增编号的完整解决方案: <!DOCTYPE html> <html> <head> <style> /* 基础样式 */ ol {margin: 1em 0;padding-left: 2em; }/* 方案1:默认数字编号 */ ol.default {list-style-type: dec…...

【Python 数据结构 4.单向链表】
目录 一、单向链表的基本概念 1.单向链表的概念 2.单向链表的元素插入 元素插入的步骤 3.单向链表的元素删除 元素删除的步骤 4.单向链表的元素查找 元素查找的步骤 5.单向链表的元素索引 元素索引的步骤 6.单向链表的元素修改 元素修改的步骤 二、Python中的单向链表 编辑 三…...

基于 vLLM 部署 LSTM 时序预测模型的“下饭”(智能告警预测与根因分析部署)指南
Alright,各位看官老爷们,准备好迎接史上最爆笑、最通俗易懂的 “基于 vLLM 部署 LSTM 时序预测模型的智能告警预测与根因分析部署指南” 吗? 保证让你笑出猪叫,看完直接变身技术大咖!🚀😂 咱们今天的主题,就像是要打造一个“智能运维小管家”! 这个小管家,不仅能提…...

Java多线程与高并发专题——ConcurrentHashMap 在 Java7 和 8 有何不同?
引入 上一篇我们提到HashMap 是线程不安全的,并推荐使用线程安全同时性能比较好的 ConcurrentHashMap。 而在 Java 8 中,对于 ConcurrentHashMap 这个常用的工具类进行了很大的升级,对比之前 Java 7 版本在诸多方面都进行了调整和变化。不过…...

NL2SQL-基于Dify+阿里通义千问大模型,实现自然语音自动生产SQL语句
本文基于Dify阿里通义千问大模型,实现自然语音自动生产SQL语句功能,话不多说直接上效果图 我们可以试着问他几个问题 查询每个部门的员工数量SELECT d.dept_name, COUNT(e.emp_no) AS employee_count FROM employees e JOIN dept_emp de ON e.emp_no d…...

LeetCode 1328.破坏回文串:贪心
【LetMeFly】1328.破坏回文串:贪心 力扣题目链接:https://leetcode.cn/problems/break-a-palindrome/ 给你一个由小写英文字母组成的回文字符串 palindrome ,请你将其中 一个 字符用任意小写英文字母替换,使得结果字符串的 字典…...

计算机视觉|ViT详解:打破视觉与语言界限
一、ViT 的诞生背景 在计算机视觉领域的发展中,卷积神经网络(CNN)一直占据重要地位。自 2012 年 AlexNet 在 ImageNet 大赛中取得优异成绩后,CNN 在图像分类任务中显示出强大能力。随后,VGG、ResNet 等深度网络架构不…...

//定义一个方法,把int数组中的数据按照指定的格式拼接成一个字符串返回,调用该方法,并在控制台输出结果
import java.util.Scanner; public class cha{ public static void main(String[] args){//定义一个方法,把int数组中的数据按照指定的格式拼接成一个字符串返回,调用该方法,并在控制台输出结果//eg: 数组为:int[] arr…...

Python快捷手册
Python快捷手册 后续会陆续更新Python对应的依赖或者工具使用方法 文章目录 Python快捷手册[toc]1-依赖1-词云小工具2-图片添加文字3-BeautifulSoup网络爬虫4-Tkinter界面绘制5-PDF转Word 2-开发1-多线程和队列 3-运维1-Requirement依赖2-波尔实验室3-Anaconda3使用教程4-CentO…...

QT5 GPU使用
一、问题1 1、现象 2、原因分析 出现上图错误,无法创建EGL表面,错误=0x300b。申请不上native window有可能是缺少libqeglfs-mali-integration.so 这个库 3、解决方法 需要将其adb push 到小机端的/usr/lib/qt5/plugins/egldeviceintegrat…...

如何在Spring Boot中读取JAR包内resources目录下文件
精心整理了最新的面试资料和简历模板,有需要的可以自行获取 点击前往百度网盘获取 点击前往夸克网盘获取 以下是如何在Spring Boot中读取JAR包内resources目录下文件的教程,分为多种方法及详细说明: 方法1:使用 ClassPathResour…...

《张一鸣,创业心路与算法思维》
张一鸣,多年如一日的阅读习惯。 爱读人物传记,称教科书式人类知识最浓缩的书,也爱看心理学,创业以及商业管理类的书。 冯仑,王石,联想,杰克韦尔奇,思科。 《乔布斯传》《埃隆马斯…...

SSE 和 WebSocket 的对比
SSE 和 WebSocket 的对比 在现代Web开发中,实时通信是提升用户体验的重要手段。Server-Sent Events(SSE)和WebSocket是两种实现服务器与客户端之间实时数据传输的技术,但它们在功能、适用场景以及实现方式上有所不同。 1. 基本概…...

es如何进行refresh?
在 Elasticsearch 中,refresh 操作的作用是让最近写入的数据可以被搜索到。以下为你介绍几种常见的执行 refresh 操作的方式: 1. 使用 RESTful API 手动刷新 你可以通过向 Elasticsearch 发送 HTTP 请求来手动触发 refresh 操作。可以针对单个索引、多个索引或者所有索引进…...

Kubespray部署企业级高可用K8S指南
目录 前言1 K8S集群节点准备1.1 主机列表1.2 kubespray节点python3及pip3准备1.2.1. 更新系统1.2.2. 安装依赖1.2.3. 下载Python 3.12源码1.2.4. 解压源码包1.2.5. 编译和安装Python1.2.6. 验证安装1.2.7. 设置Python 3.12为默认版本(可选)1.2.8. 安装pi…...

【实战篇】【深度解析DeepSeek:从机器学习到深度学习的全场景落地指南】
一、机器学习模型:DeepSeek的降维打击 1.1 监督学习与无监督学习的"左右互搏" 监督学习就像学霸刷题——给标注数据(参考答案)训练模型。DeepSeek在信贷风控场景中,用逻辑回归模型分析百万级用户数据,通过特征工程挖掘出"凌晨3点频繁申请贷款"这类魔…...

优选算法的智慧之光:滑动窗口专题(二)
专栏:算法的魔法世界 个人主页:手握风云 目录 一、例题讲解 1.1. 最大连续1的个数 III 1.2. 找到字符串中所有字母异位词 1.3. 串联所有单词的子串 1.4. 最小覆盖子串 一、例题讲解 1.1. 最大连续1的个数 III 题目要求是二进制数组&am…...

Kylin麒麟操作系统服务部署 | NFS服务部署
以下所使用的环境为: 虚拟化软件:VMware Workstation 17 Pro 麒麟系统版本:Kylin-Server-V10-SP3-2403-Release-20240426-x86_64 一、 NFS服务概述 NFS(Network File System),即网络文件系统。是一种使用于…...

7.1.2 计算机网络的分类
文章目录 分布范围交换方式 分布范围 计算机网络按照分布范围可分为局域网、广域网、城域网。局域网的范围在10m~1km,例如校园网,网速高,主要用于共享网络资源,拓扑结构简单,约束少。广域网的范围在100km,例…...

Spring Cloud Alibaba 实战:轻松实现 Nacos 服务发现与动态配置管理
1. Nacos 介绍 1.1 什么是 Nacos? Nacos(Naming and Configuration Service)是阿里巴巴开源的一个服务注册中心和配置管理中心。它支持动态服务发现、配置管理和服务治理,适用于微服务架构,尤其是基于 Spring Cloud …...

【数据结构】LRUCache|并查集
目录 一、LRUCache 1.概念 2.实现:哈希表双向链表 3.JDK中类似LRUCahe的数据结构LinkedHashMap 🔥4.OJ练习 二、并查集 1. 并查集原理 2.并查集代码实现 3.并查集OJ 一、LRUCache 1.概念 最近最少使用的,一直Cache替换算法 LRU是Least Recent…...

智能合约中权限管理不当
权限管理不当 : 权限管理不当是智能合约中常见的安全问题之一,尤其是在管理员或特定账户被过度赋予权限的情况下。如果合约中的关键功能,如转移资产、修改合约状态或升级合约逻辑,可以被未经授权的实体随意操作,这将构…...

MariaDB Galera 原理及用例说明
一、底层原理 MariaDB Galera 集群是一种基于同步多主架构的高可用数据库解决方案,适合需要高并发、低延迟和数据强一致性的场景。以下是部署和配置 MariaDB Galera 集群的简明步骤: 1. 环境准备 节点要求:至少 3 个节点(奇数节点…...