记录一次gRpc流式操作(jedis版)
使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)
gRpc协议类定义
service方法定义
service MQDataService{
rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto);
rpc receiveFacebookAndroidMsg(empty)returns (stream google.protobuf.StringValue);
}
服务端写法
@Overridepublic void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}", "0");RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) {MQListener mqListener=new MQListener(responseObserver);try {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}","0");RedissonFactory.getRedis().subscribe(mqListener,key);} catch (Exception e) {}finally {responseObserver.onCompleted();}}// 消息监听响应
public class MQListener extends JedisPubSub {public MQListener(StreamObserver<StringValue> responseObserver){_responseObserver=responseObserver;}private StreamObserver<StringValue> _responseObserver;// 取得订阅的消息后的处理public void onMessage(String channel, String message) {if(!StringUtil.isNullOrEmpty(message)){StringValue.Builder builder = StringValue.newBuilder();builder.setValue(message);_responseObserver.onNext(builder.build());}}// 初始化订阅时候的处理public void onSubscribe(String channel, int subscribedChannels) {...}// 取消订阅时候的处理public void onUnsubscribe(String channel, int subscribedChannels) {...}// 初始化按表达式的方式订阅时候的处理public void onPSubscribe(String pattern, int subscribedChannels) {...}// 取消按表达式的方式订阅时候的处理public void onPUnsubscribe(String pattern, int subscribedChannels) {...}// 取得按表达式的方式订阅的消息后的处理public void onPMessage(String pattern, String channel, String message) {...}
}
客户端写法
public static void receiveFacebookAndroidMsg() {try {log.info("facebook android msg");// 接收消息StreamObserver<StringValue> responseObserver = new StreamObserver<StringValue>() {@Overridepublic void onNext(StringValue msgProto) {try {log.info("facebook android msg 接收到消息: {}", msgProto.getValue());JSONObject jsonObject = JSONObject.parseObject(msgProto.getValue());...} catch (Exception e) {log.error("facebook ios msg 消费失败{}", e.getMessage());// 发给mq重新消费...}}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());log.info("facebook android Error occurred: {}", throwable.getMessage());}@Overridepublic void onCompleted() {System.out.println("Stream completed.");log.info("facebook android Stream completed.");}};log.info("接收fb android msg 开始");ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);log.info("接收fb android msg 成功");} catch (Exception e) {log.info("出错了");}}
源码下载
相关文章:
记录一次gRpc流式操作(jedis版)
使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息) gRpc协议类定义 service方法定义 service MQDataService{ rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto); rpc receiveFacebookAndroidMsg(empty)returns (stream g…...

20241001国庆学习
n60f/p 这个n是指旋转磁场的速度。 极数表示旋转转子的永磁体极数,具有一对N极/S极的电机称为双极电机。 极数可以是2、4、6、8等。 (从电机控制的角度来看,当极数增加一倍时,转速将减半,当极数增加四倍时…...

基于SSM的农产品仓库管理系统【附源码】
基于SSM的农产品仓库管理系统(源码L文说明文档) 目录 4 系统设计 4.1 系统概要设计 4.2 系统功能结构设计 4.3 数据库设计 4.3.1 数据库E-R图设计 4.3.2 数据库表结构设计 5 系统实现 5.1 管理员功能介绍 5.1.1 用户管…...

fmt:C++ 格式化库
fmt 是一个现代化、快速且安全的 C 格式化库,专注于高效地格式化文本。它提供了类似 Python 的 format 功能,但具有更高的性能和类型安全特性。fmt 库在处理字符串格式化、日志输出以及构建用户友好的输出时尤为强大。自从 C20 标准引入 std::format 后&…...
RabbitMQ MQ的可靠性及消费者的可靠性
1.MQ可靠性: 如何保证消息的可靠性: (1).通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重起消息依然存在。 (2).3.6.0版本开始,RabbitMQ引入了惰性队列模式,这种模式下&am…...
使用 Nexus 代理 Docker Hub 的配置指南
在本篇文章中,我们将详细介绍如何配置 Nexus 以代理 Docker Hub,从而实现更高效的镜像管理。以下步骤涵盖了从 Nexus 的安装到 Docker 客户端的配置。 1. 配置 Nexus 1.1 登录 Nexus 打开浏览器,访问 Nexus 的 URL(例如 http:/…...

笔记整理—linux进程部分(4)进程状态与守护进程
进程的几种重要状态,就绪态;运行态;僵尸态;等待态(浅度睡眠、深度睡眠);停止态。 就单核CPU而言,在同一时间只能运行一个进程,但实际上要运行的进程不止一个,…...

# VirtualBox中安装的CentOS 6.5网络设置为NAT模式时,怎么使用SecureCRT连接CentOS6.5系统?
VirtualBox中安装的CentOS 6.5网络设置为NAT模式时,怎么使用SecureCRT连接CentOS6.5系统? 一、查询 【VirtualBox Host-Only Network】虚拟网卡的网络配置 IP。 1、按键盘上WIN R 组合键,打开【运行】,输入【 ncpa.cpl 】&…...
7-1.Android SQLite 之 SQLiteDatabase 简单编码模板(SQLiteDatabase 使用、SQL 语句编写)
一、SQLiteDatabase SQLite 是一种轻量级的数据库引擎,它非常适合在移动设备(例如,Android)上使用 SQLiteDatabase 允许应用程序与 SQLite 数据库进行交互,它提供了增删改查等一系列方法 二、SQLiteDatabase 简单编码…...
灰度图像重心(质心)求取算法
1、图像的重心坐标计算 假设我们有一个二维图像,其中 (x, y) 表示图像中每个像素的坐标。I(x, y) 表示图像在 (x, y) 处的亮度(或像素值),通常是灰度值。 图像的重心坐标 (X, Y) 可以通过以下公式计算: X = Σ [x * I(x, y)] / Σ I(x, y) Y = Σ [y * I(x, y)] / Σ I(…...
k8s 1.28.2 集群部署 ingress 1.11.1 包含 admission-webhook
文章目录 [toc]证书创建部署 ingress-controlleringress 验证创建测试 nginx pod创建错误的 ingress 配置创建正确的 ingress 配置 ingress 官方 yaml 文件:deploy.yaml基于官方 yaml 文件做了一些修改 官方的 svc 是 ClusterIP 和 LoadBalancer,我这边把…...
pom web 自动化测试框架分享
这是初版的 pom web 测试框架,目录如下同时部分代码也放在下面,详细代码可前往 github 查看,欢迎大家给出宝贵意见。 |--base | base_page.py(封装方法) | |--config | allure_config.py(测试报告配…...
一些以前使用的linux及shell命令,gnuplot脚本
tar tar -cvzf xxx.tar.gz * -c,--create 创建新的tar文件 -v,--verbose 列出每一步处理涉及的文件的信息,只用一个“v”时,仅列出文件名 使用两个“v”时,列出权限、所有者、大小、时间、文件名等信息 -z,…...
Django一分钟:DRF模型序列化器处理关联关系的示例与注意事项
DRF的ModelSerializer序列化器与Django的Model模型紧密映射,本文将通过简单的示例介绍几种处理关联关系的方法。 1. 创建模型和初始数据 创建模型 from django.db import modelsclass Product(models.Model):product_name models.CharField(max_length255)quant…...
Python爬虫selenium框架基本使用
一、安装导入 使用包管理器安装 pip3 install selenium 二、WebDriver工具 要使用这个工具我们需要保证安装了一个浏览器的驱动器。 Python的WebDriver是一个用于自动化Web浏览器操作的工具,它属于Selenium的一部分,特别是Selenium 2.0及以后版本中…...
sql 时间交集
任务(取时间交集) 前端输入开始时间和结束时间,通过sql筛选出活动开始时间和活动结束时间再开时时间和结束时间有交集的活动 想法: 前后一段时间内遇到了类似取交集的,从网上找到了两种写法,再结合GPT等…...

【深度学习】05-Rnn循环神经网络-01- 自然语言处理概述/词嵌入层/循环网络/文本生成案例精讲
循环神经网络(RNN)主要用于自然语言处理的。 循环神经网络(RNN)、卷积神经网络(CNN)和全连接神经网络(FCN)是三种常见的神经网络类型,各自擅长处理不同类型的数据。下面…...

基于JAVA+SpringBoot+Vue的电商平台的设计与实现
基于JAVASpringBootVue的电商平台的设计与实现 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末附源码下载链接🍅…...

CSS盒模型-怪异盒模型笔记-思维导图-案例等
文章目录 一、盒模型(重点)二、怪异盒模型三、块级元素和行内元素区别汇总四、块级元素和行内元素的转换(显示方式)||元素的显示和隐藏五、思维导图六、笔记资料 一、盒模型(重点) 所有HTML元素可以看作盒子。 CSS盒模型本质上是…...

thinkphp6开发的通用网站系统源码
thinkphp6开发的通用网站系统源码。 基于ThinkPHP6框架开发的通用后台权限管理系统,底层采用国内最流行的ThinkPHP6框架, 支持内容管理、文章管理、用户管理、权限管理、角色管理等功能。 代码下载百度网盘...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...

Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...

UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...
Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务
通过akshare库,获取股票数据,并生成TabPFN这个模型 可以识别、处理的格式,写一个完整的预处理示例,并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务,进行预测并输…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...