当前位置: 首页 > news >正文

即时通讯增加kafka渠道

此次给im服务增加kafka渠道,刚好最近有对SpringCloudStream进行了解,刚好用来练练手

增加kafka渠道

  • pom.xml

引入stream相关依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId>
</dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

由于涉及到SpringCloud,可以交由spring-cloud-dependencies统一管理

<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.7.18</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.8</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
  • application.yml

在yml中对stream相关项进行配置

server:port: 18080
cus:ws:exclude-receiver-info-flag: truereceiver-excludes-himself-flag: true#更改渠道为streamcommunication-type: streamcloud:function:#允许stream访问的beandefinition: listenerstream:kafka:binder:#kafka链接信息brokers: ${KAFKA_BROKERS:127.0.0.1:9092}#允许自动创建topicauto-create-topics: truebindings:#消费者bean-in-indexlistener-in-0:#主题destination: TEST_TOPIC
  • RedisSendExecutor

kafka生产者

package com.example.im.infra.executor.send.stream;import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;/*** @author PC* 消息队列执行*/
@Component
public class StreamSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private final StreamBridge streamBridge;@Autowiredpublic StreamSendExecutor(StreamBridge streamBridge) {this.streamBridge = streamBridge;}@Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.STREAM;}@Overridepublic void sendToUser(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.USER);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}@Overridepublic void sendToAll(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.ALL);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}
}
  • StreamMessageListener

kafka消费者

package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.nio.charset.StandardCharsets;
import java.util.function.Function;/*** @author PC* 消息队列监听*/
@Component
public class StreamMessageListener {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}@Beanpublic Function<Flux<Message<byte[]>>, Mono<Void>> listener() {return messageInfoFlux -> messageInfoFlux.map(message -> {String messageJson = new String(message.getPayload(), StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}).then();}
}

测试

test2向test1发送消息,成功接收

直接在消息队列中发送消息,test1也接收到了消息

参考资料

[1].SpringCloudStream中文文档

[2].im项目

相关文章:

即时通讯增加kafka渠道

此次给im服务增加kafka渠道&#xff0c;刚好最近有对SpringCloudStream进行了解&#xff0c;刚好用来练练手 增加kafka渠道 pom.xml 引入stream相关依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-strea…...

建造者模式和工厂模式的区别

工厂模式和建造者模式都是创建型设计模式&#xff0c;它们的主要作用都是为了简化对象的创建过程&#xff0c;但是它们在设计意图和实现细节上有着显著的区别。 总结区别&#xff1a; 关注点不同&#xff1a; 工厂模式关注的是对象的创建。建造者模式关注的是对象的构造过程…...

GEE数据集——ERA5-陆地每日汇总--ECMWF气候再分析数据集

目录 简介 数据集说明 Dataset Availability Dataset Provider Collection Snippet 空间信息 Resolution Bands Table 变量 代码 代码链接 结果 引用 许可 网址推荐 0代码在线构建地图应用 机器学习 简介 注&#xff08;2024-04-19&#xff09;&#xff1a; …...

Spring Boot 中的 @RequestMapping 和 Spring 中的 @RequestMapping 有什么区别?

在Spring框架中&#xff0c;RequestMapping注解用于映射Web请求到处理器&#xff08;Controller&#xff09;的方法上。在Spring Boot中&#xff0c;这个注解的使用方式和目的与Spring框架中是完全相同的。RequestMapping注解可以用于类或方法上&#xff0c;以声明请求的映射。…...

PROFINET开发或EtherNet/IP开发嵌入式归一板有用于工业称重秤

这是真实案例。然而&#xff0c;客户选择不展示其品牌名称。 Anybus嵌入式解决方案帮助工业称重设备制造商连接到任何工业网络。多网络连接使称重设备能够轻松访问不同的控制系统&#xff0c;从而加快上市时间。 我们最终找到了HMSNetworks的Anybus解决方案。他们的成熟技术和专…...

【Kafka】Kafka源码解析之producer过程解读

从本篇开始 打算用三篇文章 分别介绍下Producer生产消费&#xff0c;Consumer消费消息 以及Spring是如何集成Kafka 三部分&#xff0c;致于对于Broker的源码解析&#xff0c;因为是scala语言写的&#xff0c;暂时不打算进行学习分享。 总体介绍 clients : 保存的是Kafka客户端…...

深度学习笔记20_数据增强

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 一、我的环境 1.语言环境&#xff1a;Python 3.9 2.编译器&#xff1a;Pycharm 3.深度学习环境&#xff1a;TensorFlow 2.10.0 二、GPU设置…...

模板变量与php变量对比做判断

${item.create_name}如何与php变量对比 在PHP中&#xff0c;您可以通过将字符串内嵌到双引号中来将模板变量 ${item.create_name} 与PHP变量进行对比。如果您有一个PHP变量 $phpVariable 并且想要检查它是否与 ${item.create_name} 相同&#xff0c;您可以使用 str_replace 函…...

C语言 | Leetcode C语言题解之第485题最大连续1的个数

题目&#xff1a; 题解&#xff1a; int findMaxConsecutiveOnes(int* nums, int numsSize) {int maxCount 0, count 0;for (int i 0; i < numsSize; i) {if (nums[i] 1) {count;} else {maxCount fmax(maxCount, count);count 0;}}maxCount fmax(maxCount, count);…...

C语言复习概要(六)

公主请阅 1. 深入理解数组与指针在C语言中的应用1.1 数组名的理解 2. 使用指针访问数组3. 一维数组传参的本质4. 冒泡排序的实现5. 二级指针6. 指针数组7. 指针数组模拟二维数组8.总结 1. 深入理解数组与指针在C语言中的应用 数组与指针是C语言的核心概念之一&#xff0c;理解…...

PyQt 入门教程(2)搭建开发环境

文章目录 一、搭建开发环境1、安装PyQt5与pyqt5-tools2、配置QtDesigner3、配置Pyuic4、配置Pyrcc 一、搭建开发环境 1、安装PyQt5与pyqt5-tools PyQt5&#xff1a; PyQt的开发库。Pyqt5-tools&#xff1a; 它是一个包含多种工具的工具包&#xff0c;旨在帮助开发者更方便地使…...

Flink Kubernetes Operator

Flink Kubernetes Operator是一个用于在Kubernetes集群上管理Apache Flink应用的工具。 一、基本概念 Flink Kubernetes Operator允许用户通过Kubernetes的原生工具&#xff08;如kubectl&#xff09;来管理Flink应用程序及其生命周期。它简化了Flink应用在Kubernetes集群上的…...

【最新华为OD机试E卷-支持在线评测】字符统计及重排(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…...

springboot使用GDAL获取tif文件的缩略图并转为base64

springboot使用GDAL获取tif文件的缩略图并转为base64 首先需要安装gdal&#xff1a;https://blog.csdn.net/qq_61950936/article/details/142880279?spm1001.2014.3001.5501 然后是配置pom.xml文件&#xff1a; <!--处理缩略图的--><dependency><groupId>o…...

Pytorch——pip下载安装pytorch慢的解决办法

一、找到需要下载的pytorch链接 运行&#xff1a;pip install torch1.11.0cu113 torchvision0.12.0cu113 torchaudio0.11.0 --extra-index-url https://download.pytorch.org/whl/cu113。然后得到&#xff1a; 我这里为&#xff1a;https://download.pytorch.org/whl/cu113/t…...

uniapp微信小程序调用百度OCR

uniapp编写微信小程序调用百度OCR 公司有一个识别行驶证需求&#xff0c;调用百度ocr识别 使用了image-tools这个插件&#xff0c;因为百度ocr接口用图片的base64 这里只是简单演示&#xff0c;accesstoken获取接口还是要放在服务器端&#xff0c;不然就暴露了自己的百度项目k…...

Vue3+TS项目---实用的复杂类型定义总结

namespace 概念 在TypeScript中&#xff0c;namespace是一种用于组织代码得结构&#xff0c;主要用于将相关得功能&#xff08;例如类、接口、函数等&#xff09;组合在一起。它可以帮助避免命名冲突&#xff0c;尤其是在大项目中。 用法 1.定义命名空间 使用namespace关键…...

尚硅谷rabbitmq2024 工作模式路由篇 第11节 答疑

String exchangeName "test_direct"; /! 创建交换机 人图全 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIREcT, b: true, b1: false, b2: false, map: null); /1 创建队列 String queue1Name "test_direct_queue1"; String queue2Name &q…...

HTTP vs WebSocket

本文将对比介绍HTTP 和 WebSocket &#xff01; 相关文章&#xff1a; 1.HTTP 详解 2.WebSocket 详解 一、HTTP&#xff1a;请求/响应的主流协议 HTTP&#xff08;超文本传输协议&#xff09;是用于发送和接收网页数据的标准协议。它最早于1991年由Tim Berners-Lee提出来&…...

R语言医学数据分析实践-数据读写

【图书推荐】《R语言医学数据分析实践》-CSDN博客 《R语言医学数据分析实践 李丹 宋立桓 蔡伟祺 清华大学出版社9787302673484》【摘要 书评 试读】- 京东图书 (jd.com) R语言编程_夏天又到了的博客-CSDN博客 R编程环境的搭建-CSDN博客 在分析公共卫生数据时&#xff0c;数…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

React Native 导航系统实战(React Navigation)

导航系统实战&#xff08;React Navigation&#xff09; React Navigation 是 React Native 应用中最常用的导航库之一&#xff0c;它提供了多种导航模式&#xff0c;如堆栈导航&#xff08;Stack Navigator&#xff09;、标签导航&#xff08;Tab Navigator&#xff09;和抽屉…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。

1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj&#xff0c;再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...

从面试角度回答Android中ContentProvider启动原理

Android中ContentProvider原理的面试角度解析&#xff0c;分为​​已启动​​和​​未启动​​两种场景&#xff1a; 一、ContentProvider已启动的情况 1. ​​核心流程​​ ​​触发条件​​&#xff1a;当其他组件&#xff08;如Activity、Service&#xff09;通过ContentR…...

系统掌握PyTorch:图解张量、Autograd、DataLoader、nn.Module与实战模型

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文通过代码驱动的方式&#xff0c;系统讲解PyTorch核心概念和实战技巧&#xff0c;涵盖张量操作、自动微分、数据加载、模型构建和训练全流程&#…...