当前位置: 首页 > news >正文

记录一次gRpc流式操作(jedis版)

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

service方法定义
service MQDataService{
rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto);
rpc receiveFacebookAndroidMsg(empty)returns (stream google.protobuf.StringValue);
}

服务端写法

@Overridepublic void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}", "0");RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) {MQListener mqListener=new MQListener(responseObserver);try {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}","0");RedissonFactory.getRedis().subscribe(mqListener,key);} catch (Exception e) {}finally {responseObserver.onCompleted();}}// 消息监听响应
public class MQListener extends JedisPubSub {public MQListener(StreamObserver<StringValue> responseObserver){_responseObserver=responseObserver;}private StreamObserver<StringValue> _responseObserver;// 取得订阅的消息后的处理public void onMessage(String channel, String message) {if(!StringUtil.isNullOrEmpty(message)){StringValue.Builder builder = StringValue.newBuilder();builder.setValue(message);_responseObserver.onNext(builder.build());}}// 初始化订阅时候的处理public void onSubscribe(String channel, int subscribedChannels) {...}// 取消订阅时候的处理public void onUnsubscribe(String channel, int subscribedChannels) {...}// 初始化按表达式的方式订阅时候的处理public void onPSubscribe(String pattern, int subscribedChannels) {...}// 取消按表达式的方式订阅时候的处理public void onPUnsubscribe(String pattern, int subscribedChannels) {...}// 取得按表达式的方式订阅的消息后的处理public void onPMessage(String pattern, String channel, String message) {...}
}

客户端写法

public static void receiveFacebookAndroidMsg() {try {log.info("facebook android msg");// 接收消息StreamObserver<StringValue> responseObserver = new StreamObserver<StringValue>() {@Overridepublic void onNext(StringValue msgProto) {try {log.info("facebook android msg 接收到消息: {}", msgProto.getValue());JSONObject jsonObject = JSONObject.parseObject(msgProto.getValue());...} catch (Exception e) {log.error("facebook ios msg 消费失败{}", e.getMessage());// 发给mq重新消费...}}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());log.info("facebook android Error occurred: {}", throwable.getMessage());}@Overridepublic void onCompleted() {System.out.println("Stream completed.");log.info("facebook android Stream completed.");}};log.info("接收fb android msg 开始");ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);log.info("接收fb android msg 成功");} catch (Exception e) {log.info("出错了");}}

源码下载

相关文章:

记录一次gRpc流式操作(jedis版)

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息) gRpc协议类定义 service方法定义 service MQDataService{ rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto); rpc receiveFacebookAndroidMsg(empty)returns (stream g…...

20241001国庆学习

n60f/p 这个n是指旋转磁场的速度。 极数表示旋转转子的永磁体极数&#xff0c;具有一对N极&#xff0f;S极的电机称为双极电机。 极数可以是2、4、6、8等。 &#xff08;从电机控制的角度来看&#xff0c;当极数增加一倍时&#xff0c;转速将减半&#xff0c;当极数增加四倍时…...

基于SSM的农产品仓库管理系统【附源码】

基于SSM的农产品仓库管理系统&#xff08;源码L文说明文档&#xff09; 目录 4 系统设计 4.1 系统概要设计 4.2 系统功能结构设计 4.3 数据库设计 4.3.1 数据库E-R图设计 4.3.2 数据库表结构设计 5 系统实现 5.1 管理员功能介绍 5.1.1 用户管…...

fmt:C++ 格式化库

fmt 是一个现代化、快速且安全的 C 格式化库&#xff0c;专注于高效地格式化文本。它提供了类似 Python 的 format 功能&#xff0c;但具有更高的性能和类型安全特性。fmt 库在处理字符串格式化、日志输出以及构建用户友好的输出时尤为强大。自从 C20 标准引入 std::format 后&…...

RabbitMQ MQ的可靠性及消费者的可靠性

1.MQ可靠性&#xff1a; 如何保证消息的可靠性&#xff1a; (1).通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘&#xff0c;MQ重起消息依然存在。 (2).3.6.0版本开始&#xff0c;RabbitMQ引入了惰性队列模式&#xff0c;这种模式下&am…...

使用 Nexus 代理 Docker Hub 的配置指南

在本篇文章中&#xff0c;我们将详细介绍如何配置 Nexus 以代理 Docker Hub&#xff0c;从而实现更高效的镜像管理。以下步骤涵盖了从 Nexus 的安装到 Docker 客户端的配置。 1. 配置 Nexus 1.1 登录 Nexus 打开浏览器&#xff0c;访问 Nexus 的 URL&#xff08;例如 http:/…...

笔记整理—linux进程部分(4)进程状态与守护进程

进程的几种重要状态&#xff0c;就绪态&#xff1b;运行态&#xff1b;僵尸态&#xff1b;等待态&#xff08;浅度睡眠、深度睡眠&#xff09;&#xff1b;停止态。 就单核CPU而言&#xff0c;在同一时间只能运行一个进程&#xff0c;但实际上要运行的进程不止一个&#xff0c;…...

# VirtualBox中安装的CentOS 6.5网络设置为NAT模式时,怎么使用SecureCRT连接CentOS6.5系统?

VirtualBox中安装的CentOS 6.5网络设置为NAT模式时&#xff0c;怎么使用SecureCRT连接CentOS6.5系统&#xff1f; 一、查询 【VirtualBox Host-Only Network】虚拟网卡的网络配置 IP。 1、按键盘上WIN R 组合键&#xff0c;打开【运行】&#xff0c;输入【 ncpa.cpl 】&…...

7-1.Android SQLite 之 SQLiteDatabase 简单编码模板(SQLiteDatabase 使用、SQL 语句编写)

一、SQLiteDatabase SQLite 是一种轻量级的数据库引擎&#xff0c;它非常适合在移动设备&#xff08;例如&#xff0c;Android&#xff09;上使用 SQLiteDatabase 允许应用程序与 SQLite 数据库进行交互&#xff0c;它提供了增删改查等一系列方法 二、SQLiteDatabase 简单编码…...

灰度图像重心(质心)求取算法

1、图像的重心坐标计算 假设我们有一个二维图像,其中 (x, y) 表示图像中每个像素的坐标。I(x, y) 表示图像在 (x, y) 处的亮度(或像素值),通常是灰度值。 图像的重心坐标 (X, Y) 可以通过以下公式计算: X = Σ [x * I(x, y)] / Σ I(x, y) Y = Σ [y * I(x, y)] / Σ I(…...

k8s 1.28.2 集群部署 ingress 1.11.1 包含 admission-webhook

文章目录 [toc]证书创建部署 ingress-controlleringress 验证创建测试 nginx pod创建错误的 ingress 配置创建正确的 ingress 配置 ingress 官方 yaml 文件&#xff1a;deploy.yaml基于官方 yaml 文件做了一些修改 官方的 svc 是 ClusterIP 和 LoadBalancer&#xff0c;我这边把…...

pom web 自动化测试框架分享

这是初版的 pom web 测试框架&#xff0c;目录如下同时部分代码也放在下面&#xff0c;详细代码可前往 github 查看&#xff0c;欢迎大家给出宝贵意见。 |--base | base_page.py&#xff08;封装方法&#xff09; | |--config | allure_config.py&#xff08;测试报告配…...

一些以前使用的linux及shell命令,gnuplot脚本

tar tar -cvzf xxx.tar.gz * -c&#xff0c;--create 创建新的tar文件 -v&#xff0c;--verbose 列出每一步处理涉及的文件的信息&#xff0c;只用一个“v”时&#xff0c;仅列出文件名 使用两个“v”时&#xff0c;列出权限、所有者、大小、时间、文件名等信息 -z&#xff0c…...

Django一分钟:DRF模型序列化器处理关联关系的示例与注意事项

DRF的ModelSerializer序列化器与Django的Model模型紧密映射&#xff0c;本文将通过简单的示例介绍几种处理关联关系的方法。 1. 创建模型和初始数据 创建模型 from django.db import modelsclass Product(models.Model):product_name models.CharField(max_length255)quant…...

Python爬虫selenium框架基本使用

一、安装导入 使用包管理器安装 pip3 install selenium 二、WebDriver工具 要使用这个工具我们需要保证安装了一个浏览器的驱动器。 Python的WebDriver是一个用于自动化Web浏览器操作的工具&#xff0c;它属于Selenium的一部分&#xff0c;特别是Selenium 2.0及以后版本中…...

sql 时间交集

任务&#xff08;取时间交集&#xff09; 前端输入开始时间和结束时间&#xff0c;通过sql筛选出活动开始时间和活动结束时间再开时时间和结束时间有交集的活动 想法&#xff1a; 前后一段时间内遇到了类似取交集的&#xff0c;从网上找到了两种写法&#xff0c;再结合GPT等…...

【深度学习】05-Rnn循环神经网络-01- 自然语言处理概述/词嵌入层/循环网络/文本生成案例精讲

循环神经网络&#xff08;RNN&#xff09;主要用于自然语言处理的。 循环神经网络&#xff08;RNN&#xff09;、卷积神经网络&#xff08;CNN&#xff09;和全连接神经网络&#xff08;FCN&#xff09;是三种常见的神经网络类型&#xff0c;各自擅长处理不同类型的数据。下面…...

基于JAVA+SpringBoot+Vue的电商平台的设计与实现

基于JAVASpringBootVue的电商平台的设计与实现 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接&#x1f345…...

CSS盒模型-怪异盒模型笔记-思维导图-案例等

文章目录 一、盒模型&#xff08;重点&#xff09;二、怪异盒模型三、块级元素和行内元素区别汇总四、块级元素和行内元素的转换(显示方式)||元素的显示和隐藏五、思维导图六、笔记资料 一、盒模型&#xff08;重点&#xff09; 所有HTML元素可以看作盒子。 CSS盒模型本质上是…...

thinkphp6开发的通用网站系统源码

thinkphp6开发的通用网站系统源码。 基于ThinkPHP6框架开发的通用后台权限管理系统&#xff0c;底层采用国内最流行的ThinkPHP6框架&#xff0c; 支持内容管理、文章管理、用户管理、权限管理、角色管理等功能。 代码下载百度网盘...

游戏界面开发与UI框架:零基础上手卡牌游戏界面开发与性能调优

游戏界面开发与UI框架&#xff1a;零基础上手卡牌游戏界面开发与性能调优 【免费下载链接】UiCard Generic UI for card games like Hearthstone, Magic Arena and Slay the Spire... 项目地址: https://gitcode.com/gh_mirrors/ui/UiCard 问题诊断&#xff1a;卡牌UI开…...

工厂里EtherCAT从站模块坏了别慌!手把手教你用Startup list和CoE-online快速换新(附配置顺序避坑指南)

工厂EtherCAT从站模块更换实战指南&#xff1a;Startup list与CoE-online的高效应用 当生产线上的EtherCAT从站模块突然罢工&#xff0c;设备维护工程师往往面临两难选择&#xff1a;是临时在线修改参数快速恢复生产&#xff0c;还是彻底解决"即插即用"的配置难题&am…...

第4章 编码规范-4.1 命名规范

在Python中&#xff0c;变量、常量、模块、包、函数、类、对象、属性、方法和异常类都具有一定的命名规范。但是&#xff0c;这些命名规范都是通用性规范&#xff0c;而不是强制性规范&#xff0c;所以具体的命名规范还需要以开发项目的要求为主。&#xff08;1&#xff09;变量…...

RCLAMP0542T.TCT‌静电保护TVS 二极管阵列 SEMTECH 电子元器件IC 芯片

RCLAMP0542T.TCT‌ 是由 ‌SEMTECH‌ 公司推出的一款超低电容、双通道ESD&#xff08;静电放电&#xff09;保护 TVS 二极管阵列&#xff0c;具备0.45pF 超低电容、5A 浪涌承受能力和超小型 SLP1610P4T 封装&#xff0c;专为高速数据接口设计&#xff0c;广泛应用于通信设备、消…...

Qwen3.5-4B-Claude-Opus高性能推理教程:Q4_K_M量化下GPU吞吐量实测分析

Qwen3.5-4B-Claude-Opus高性能推理教程&#xff1a;Q4_K_M量化下GPU吞吐量实测分析 1. 模型概述 Qwen3.5-4B-Claude-4.6-Opus-Reasoning-Distilled-GGUF是基于Qwen3.5-4B架构的推理蒸馏模型&#xff0c;特别强化了结构化分析、分步骤回答以及代码与逻辑类问题的处理能力。该版…...

Vivado中OSD核报错全攻略:从IP_flow 19-167到BD 41-1030的解决方案

Vivado中OSD核报错全攻略&#xff1a;从IP_flow 19-167到BD 41-1030的解决方案 在FPGA开发过程中&#xff0c;Xilinx Vivado工具链的OSD&#xff08;On-Screen Display&#xff09;核是一个常用的视频处理IP&#xff0c;但开发者常会遇到各种报错问题。本文将深入解析从IP_flo…...

Linux驱动开发实战:从设备树到内核调试全解析

Linux驱动工程师实战经验分享&#xff1a;从入门到进阶的技术要点解析1. 设备树系统的深入理解1.1 设备树的基本概念在Linux驱动开发初期&#xff0c;大多数工程师都是从最简单的模块开发开始。典型的入门流程包括&#xff1a;#include <linux/module.h> #include <li…...

计算机毕业设计:基于Django与LSTM的大众点评评价预测系统 Django框架 LSTM Hadoop Spark Hive 可视化 大数据 食品 食物(建议收藏)✅

博主介绍&#xff1a;✌全网粉丝10W&#xff0c;前互联网大厂软件研发、集结硕博英豪成立软件开发工作室&#xff0c;专注于计算机相关专业项目实战6年之久&#xff0c;累计开发项目作品上万套。凭借丰富的经验与专业实力&#xff0c;已帮助成千上万的学生顺利毕业&#xff0c;…...

别再为IP冲突头疼!YOLOv5+海康威视摄像头组网与实时检测的完整避坑指南

工业视觉组网实战&#xff1a;YOLOv5与海康威视摄像头的智能协同方案 在智能制造与安防监控领域&#xff0c;将AI算法与专业摄像设备结合已成为技术标配。但当工程师真正着手部署时&#xff0c;往往会陷入网络配置的泥潭——IP冲突导致设备失联、RTSP流媒体断断续续、多网卡环…...

基于三相两电平逆变器的VSG并网系统:电压电流双闭环控制的仿真研究

VSG并网&#xff0c;基于三相两电平逆变器的虚拟同步机并网&#xff0c;电压电流双闭环控制 1.VSG 2.电压电流双闭环 3..提供相关参考文献 支持simulink2022以下版本&#xff0c;联系跟我说什么版本&#xff0c;我给转成你版本&#xff08;默认发2016b&#xff09;。最近在研究…...