当前位置: 首页 > news >正文

记录一次gRpc流式操作(jedis版)

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

service方法定义
service MQDataService{
rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto);
rpc receiveFacebookAndroidMsg(empty)returns (stream google.protobuf.StringValue);
}

服务端写法

@Overridepublic void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}", "0");RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) {MQListener mqListener=new MQListener(responseObserver);try {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}","0");RedissonFactory.getRedis().subscribe(mqListener,key);} catch (Exception e) {}finally {responseObserver.onCompleted();}}// 消息监听响应
public class MQListener extends JedisPubSub {public MQListener(StreamObserver<StringValue> responseObserver){_responseObserver=responseObserver;}private StreamObserver<StringValue> _responseObserver;// 取得订阅的消息后的处理public void onMessage(String channel, String message) {if(!StringUtil.isNullOrEmpty(message)){StringValue.Builder builder = StringValue.newBuilder();builder.setValue(message);_responseObserver.onNext(builder.build());}}// 初始化订阅时候的处理public void onSubscribe(String channel, int subscribedChannels) {...}// 取消订阅时候的处理public void onUnsubscribe(String channel, int subscribedChannels) {...}// 初始化按表达式的方式订阅时候的处理public void onPSubscribe(String pattern, int subscribedChannels) {...}// 取消按表达式的方式订阅时候的处理public void onPUnsubscribe(String pattern, int subscribedChannels) {...}// 取得按表达式的方式订阅的消息后的处理public void onPMessage(String pattern, String channel, String message) {...}
}

客户端写法

public static void receiveFacebookAndroidMsg() {try {log.info("facebook android msg");// 接收消息StreamObserver<StringValue> responseObserver = new StreamObserver<StringValue>() {@Overridepublic void onNext(StringValue msgProto) {try {log.info("facebook android msg 接收到消息: {}", msgProto.getValue());JSONObject jsonObject = JSONObject.parseObject(msgProto.getValue());...} catch (Exception e) {log.error("facebook ios msg 消费失败{}", e.getMessage());// 发给mq重新消费...}}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());log.info("facebook android Error occurred: {}", throwable.getMessage());}@Overridepublic void onCompleted() {System.out.println("Stream completed.");log.info("facebook android Stream completed.");}};log.info("接收fb android msg 开始");ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);log.info("接收fb android msg 成功");} catch (Exception e) {log.info("出错了");}}

源码下载

相关文章:

记录一次gRpc流式操作(jedis版)

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息) gRpc协议类定义 service方法定义 service MQDataService{ rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto); rpc receiveFacebookAndroidMsg(empty)returns (stream g…...

20241001国庆学习

n60f/p 这个n是指旋转磁场的速度。 极数表示旋转转子的永磁体极数&#xff0c;具有一对N极&#xff0f;S极的电机称为双极电机。 极数可以是2、4、6、8等。 &#xff08;从电机控制的角度来看&#xff0c;当极数增加一倍时&#xff0c;转速将减半&#xff0c;当极数增加四倍时…...

基于SSM的农产品仓库管理系统【附源码】

基于SSM的农产品仓库管理系统&#xff08;源码L文说明文档&#xff09; 目录 4 系统设计 4.1 系统概要设计 4.2 系统功能结构设计 4.3 数据库设计 4.3.1 数据库E-R图设计 4.3.2 数据库表结构设计 5 系统实现 5.1 管理员功能介绍 5.1.1 用户管…...

fmt:C++ 格式化库

fmt 是一个现代化、快速且安全的 C 格式化库&#xff0c;专注于高效地格式化文本。它提供了类似 Python 的 format 功能&#xff0c;但具有更高的性能和类型安全特性。fmt 库在处理字符串格式化、日志输出以及构建用户友好的输出时尤为强大。自从 C20 标准引入 std::format 后&…...

RabbitMQ MQ的可靠性及消费者的可靠性

1.MQ可靠性&#xff1a; 如何保证消息的可靠性&#xff1a; (1).通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘&#xff0c;MQ重起消息依然存在。 (2).3.6.0版本开始&#xff0c;RabbitMQ引入了惰性队列模式&#xff0c;这种模式下&am…...

使用 Nexus 代理 Docker Hub 的配置指南

在本篇文章中&#xff0c;我们将详细介绍如何配置 Nexus 以代理 Docker Hub&#xff0c;从而实现更高效的镜像管理。以下步骤涵盖了从 Nexus 的安装到 Docker 客户端的配置。 1. 配置 Nexus 1.1 登录 Nexus 打开浏览器&#xff0c;访问 Nexus 的 URL&#xff08;例如 http:/…...

笔记整理—linux进程部分(4)进程状态与守护进程

进程的几种重要状态&#xff0c;就绪态&#xff1b;运行态&#xff1b;僵尸态&#xff1b;等待态&#xff08;浅度睡眠、深度睡眠&#xff09;&#xff1b;停止态。 就单核CPU而言&#xff0c;在同一时间只能运行一个进程&#xff0c;但实际上要运行的进程不止一个&#xff0c;…...

# VirtualBox中安装的CentOS 6.5网络设置为NAT模式时,怎么使用SecureCRT连接CentOS6.5系统?

VirtualBox中安装的CentOS 6.5网络设置为NAT模式时&#xff0c;怎么使用SecureCRT连接CentOS6.5系统&#xff1f; 一、查询 【VirtualBox Host-Only Network】虚拟网卡的网络配置 IP。 1、按键盘上WIN R 组合键&#xff0c;打开【运行】&#xff0c;输入【 ncpa.cpl 】&…...

7-1.Android SQLite 之 SQLiteDatabase 简单编码模板(SQLiteDatabase 使用、SQL 语句编写)

一、SQLiteDatabase SQLite 是一种轻量级的数据库引擎&#xff0c;它非常适合在移动设备&#xff08;例如&#xff0c;Android&#xff09;上使用 SQLiteDatabase 允许应用程序与 SQLite 数据库进行交互&#xff0c;它提供了增删改查等一系列方法 二、SQLiteDatabase 简单编码…...

灰度图像重心(质心)求取算法

1、图像的重心坐标计算 假设我们有一个二维图像,其中 (x, y) 表示图像中每个像素的坐标。I(x, y) 表示图像在 (x, y) 处的亮度(或像素值),通常是灰度值。 图像的重心坐标 (X, Y) 可以通过以下公式计算: X = Σ [x * I(x, y)] / Σ I(x, y) Y = Σ [y * I(x, y)] / Σ I(…...

k8s 1.28.2 集群部署 ingress 1.11.1 包含 admission-webhook

文章目录 [toc]证书创建部署 ingress-controlleringress 验证创建测试 nginx pod创建错误的 ingress 配置创建正确的 ingress 配置 ingress 官方 yaml 文件&#xff1a;deploy.yaml基于官方 yaml 文件做了一些修改 官方的 svc 是 ClusterIP 和 LoadBalancer&#xff0c;我这边把…...

pom web 自动化测试框架分享

这是初版的 pom web 测试框架&#xff0c;目录如下同时部分代码也放在下面&#xff0c;详细代码可前往 github 查看&#xff0c;欢迎大家给出宝贵意见。 |--base | base_page.py&#xff08;封装方法&#xff09; | |--config | allure_config.py&#xff08;测试报告配…...

一些以前使用的linux及shell命令,gnuplot脚本

tar tar -cvzf xxx.tar.gz * -c&#xff0c;--create 创建新的tar文件 -v&#xff0c;--verbose 列出每一步处理涉及的文件的信息&#xff0c;只用一个“v”时&#xff0c;仅列出文件名 使用两个“v”时&#xff0c;列出权限、所有者、大小、时间、文件名等信息 -z&#xff0c…...

Django一分钟:DRF模型序列化器处理关联关系的示例与注意事项

DRF的ModelSerializer序列化器与Django的Model模型紧密映射&#xff0c;本文将通过简单的示例介绍几种处理关联关系的方法。 1. 创建模型和初始数据 创建模型 from django.db import modelsclass Product(models.Model):product_name models.CharField(max_length255)quant…...

Python爬虫selenium框架基本使用

一、安装导入 使用包管理器安装 pip3 install selenium 二、WebDriver工具 要使用这个工具我们需要保证安装了一个浏览器的驱动器。 Python的WebDriver是一个用于自动化Web浏览器操作的工具&#xff0c;它属于Selenium的一部分&#xff0c;特别是Selenium 2.0及以后版本中…...

sql 时间交集

任务&#xff08;取时间交集&#xff09; 前端输入开始时间和结束时间&#xff0c;通过sql筛选出活动开始时间和活动结束时间再开时时间和结束时间有交集的活动 想法&#xff1a; 前后一段时间内遇到了类似取交集的&#xff0c;从网上找到了两种写法&#xff0c;再结合GPT等…...

【深度学习】05-Rnn循环神经网络-01- 自然语言处理概述/词嵌入层/循环网络/文本生成案例精讲

循环神经网络&#xff08;RNN&#xff09;主要用于自然语言处理的。 循环神经网络&#xff08;RNN&#xff09;、卷积神经网络&#xff08;CNN&#xff09;和全连接神经网络&#xff08;FCN&#xff09;是三种常见的神经网络类型&#xff0c;各自擅长处理不同类型的数据。下面…...

基于JAVA+SpringBoot+Vue的电商平台的设计与实现

基于JAVASpringBootVue的电商平台的设计与实现 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接&#x1f345…...

CSS盒模型-怪异盒模型笔记-思维导图-案例等

文章目录 一、盒模型&#xff08;重点&#xff09;二、怪异盒模型三、块级元素和行内元素区别汇总四、块级元素和行内元素的转换(显示方式)||元素的显示和隐藏五、思维导图六、笔记资料 一、盒模型&#xff08;重点&#xff09; 所有HTML元素可以看作盒子。 CSS盒模型本质上是…...

thinkphp6开发的通用网站系统源码

thinkphp6开发的通用网站系统源码。 基于ThinkPHP6框架开发的通用后台权限管理系统&#xff0c;底层采用国内最流行的ThinkPHP6框架&#xff0c; 支持内容管理、文章管理、用户管理、权限管理、角色管理等功能。 代码下载百度网盘...

遍历 Map 类型集合的方法汇总

1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

Swagger和OpenApi的前世今生

Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章&#xff0c;二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑&#xff1a; &#x1f504; 一、起源与初创期&#xff1a;Swagger的诞生&#xff08;2010-2014&#xff09; 核心…...

OPENCV形态学基础之二腐蚀

一.腐蚀的原理 (图1) 数学表达式&#xff1a;dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一&#xff0c;腐蚀跟膨胀属于反向操作&#xff0c;膨胀是把图像图像变大&#xff0c;而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...

服务器--宝塔命令

一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行&#xff01; sudo su - 1. CentOS 系统&#xff1a; yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill

视觉语言模型&#xff08;Vision-Language Models, VLMs&#xff09;&#xff0c;为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展&#xff0c;机器人仍难以胜任复杂的长时程任务&#xff08;如家具装配&#xff09;&#xff0c;主要受限于人…...

C语言中提供的第三方库之哈希表实现

一. 简介 前面一篇文章简单学习了C语言中第三方库&#xff08;uthash库&#xff09;提供对哈希表的操作&#xff0c;文章如下&#xff1a; C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...

论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving

地址&#xff1a;LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂&#xff0c;正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...

redis和redission的区别

Redis 和 Redisson 是两个密切相关但又本质不同的技术&#xff0c;它们扮演着完全不同的角色&#xff1a; Redis: 内存数据库/数据结构存储 本质&#xff1a; 它是一个开源的、高性能的、基于内存的 键值存储数据库。它也可以将数据持久化到磁盘。 核心功能&#xff1a; 提供丰…...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机&#xff0c;交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息&#xff0c;系统版本&#xff1a;Ubuntu22.04.5 LTS&#xff1b;内核版本…...