即时通讯增加kafka渠道
此次给im服务增加kafka渠道,刚好最近有对SpringCloudStream进行了解,刚好用来练练手
增加kafka渠道
- pom.xml
引入stream相关依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId>
</dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
由于涉及到SpringCloud,可以交由spring-cloud-dependencies统一管理
<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.7.18</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.8</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
- application.yml
在yml中对stream相关项进行配置
server:port: 18080
cus:ws:exclude-receiver-info-flag: truereceiver-excludes-himself-flag: true#更改渠道为streamcommunication-type: streamcloud:function:#允许stream访问的beandefinition: listenerstream:kafka:binder:#kafka链接信息brokers: ${KAFKA_BROKERS:127.0.0.1:9092}#允许自动创建topicauto-create-topics: truebindings:#消费者bean-in-indexlistener-in-0:#主题destination: TEST_TOPIC
- RedisSendExecutor
kafka生产者
package com.example.im.infra.executor.send.stream;import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;/*** @author PC* 消息队列执行*/
@Component
public class StreamSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private final StreamBridge streamBridge;@Autowiredpublic StreamSendExecutor(StreamBridge streamBridge) {this.streamBridge = streamBridge;}@Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.STREAM;}@Overridepublic void sendToUser(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.USER);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}@Overridepublic void sendToAll(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.ALL);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}
}
- StreamMessageListener
kafka消费者
package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.nio.charset.StandardCharsets;
import java.util.function.Function;/*** @author PC* 消息队列监听*/
@Component
public class StreamMessageListener {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}@Beanpublic Function<Flux<Message<byte[]>>, Mono<Void>> listener() {return messageInfoFlux -> messageInfoFlux.map(message -> {String messageJson = new String(message.getPayload(), StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}).then();}
}
测试
test2向test1发送消息,成功接收


直接在消息队列中发送消息,test1也接收到了消息


参考资料
[1].SpringCloudStream中文文档
[2].im项目
相关文章:
即时通讯增加kafka渠道
此次给im服务增加kafka渠道,刚好最近有对SpringCloudStream进行了解,刚好用来练练手 增加kafka渠道 pom.xml 引入stream相关依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-strea…...
建造者模式和工厂模式的区别
工厂模式和建造者模式都是创建型设计模式,它们的主要作用都是为了简化对象的创建过程,但是它们在设计意图和实现细节上有着显著的区别。 总结区别: 关注点不同: 工厂模式关注的是对象的创建。建造者模式关注的是对象的构造过程…...
GEE数据集——ERA5-陆地每日汇总--ECMWF气候再分析数据集
目录 简介 数据集说明 Dataset Availability Dataset Provider Collection Snippet 空间信息 Resolution Bands Table 变量 代码 代码链接 结果 引用 许可 网址推荐 0代码在线构建地图应用 机器学习 简介 注(2024-04-19): …...
Spring Boot 中的 @RequestMapping 和 Spring 中的 @RequestMapping 有什么区别?
在Spring框架中,RequestMapping注解用于映射Web请求到处理器(Controller)的方法上。在Spring Boot中,这个注解的使用方式和目的与Spring框架中是完全相同的。RequestMapping注解可以用于类或方法上,以声明请求的映射。…...
PROFINET开发或EtherNet/IP开发嵌入式归一板有用于工业称重秤
这是真实案例。然而,客户选择不展示其品牌名称。 Anybus嵌入式解决方案帮助工业称重设备制造商连接到任何工业网络。多网络连接使称重设备能够轻松访问不同的控制系统,从而加快上市时间。 我们最终找到了HMSNetworks的Anybus解决方案。他们的成熟技术和专…...
【Kafka】Kafka源码解析之producer过程解读
从本篇开始 打算用三篇文章 分别介绍下Producer生产消费,Consumer消费消息 以及Spring是如何集成Kafka 三部分,致于对于Broker的源码解析,因为是scala语言写的,暂时不打算进行学习分享。 总体介绍 clients : 保存的是Kafka客户端…...
深度学习笔记20_数据增强
🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 | 接辅导、项目定制 一、我的环境 1.语言环境:Python 3.9 2.编译器:Pycharm 3.深度学习环境:TensorFlow 2.10.0 二、GPU设置…...
模板变量与php变量对比做判断
${item.create_name}如何与php变量对比 在PHP中,您可以通过将字符串内嵌到双引号中来将模板变量 ${item.create_name} 与PHP变量进行对比。如果您有一个PHP变量 $phpVariable 并且想要检查它是否与 ${item.create_name} 相同,您可以使用 str_replace 函…...
C语言 | Leetcode C语言题解之第485题最大连续1的个数
题目: 题解: int findMaxConsecutiveOnes(int* nums, int numsSize) {int maxCount 0, count 0;for (int i 0; i < numsSize; i) {if (nums[i] 1) {count;} else {maxCount fmax(maxCount, count);count 0;}}maxCount fmax(maxCount, count);…...
C语言复习概要(六)
公主请阅 1. 深入理解数组与指针在C语言中的应用1.1 数组名的理解 2. 使用指针访问数组3. 一维数组传参的本质4. 冒泡排序的实现5. 二级指针6. 指针数组7. 指针数组模拟二维数组8.总结 1. 深入理解数组与指针在C语言中的应用 数组与指针是C语言的核心概念之一,理解…...
PyQt 入门教程(2)搭建开发环境
文章目录 一、搭建开发环境1、安装PyQt5与pyqt5-tools2、配置QtDesigner3、配置Pyuic4、配置Pyrcc 一、搭建开发环境 1、安装PyQt5与pyqt5-tools PyQt5: PyQt的开发库。Pyqt5-tools: 它是一个包含多种工具的工具包,旨在帮助开发者更方便地使…...
Flink Kubernetes Operator
Flink Kubernetes Operator是一个用于在Kubernetes集群上管理Apache Flink应用的工具。 一、基本概念 Flink Kubernetes Operator允许用户通过Kubernetes的原生工具(如kubectl)来管理Flink应用程序及其生命周期。它简化了Flink应用在Kubernetes集群上的…...
【最新华为OD机试E卷-支持在线评测】字符统计及重排(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)
🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…...
springboot使用GDAL获取tif文件的缩略图并转为base64
springboot使用GDAL获取tif文件的缩略图并转为base64 首先需要安装gdal:https://blog.csdn.net/qq_61950936/article/details/142880279?spm1001.2014.3001.5501 然后是配置pom.xml文件: <!--处理缩略图的--><dependency><groupId>o…...
Pytorch——pip下载安装pytorch慢的解决办法
一、找到需要下载的pytorch链接 运行:pip install torch1.11.0cu113 torchvision0.12.0cu113 torchaudio0.11.0 --extra-index-url https://download.pytorch.org/whl/cu113。然后得到: 我这里为:https://download.pytorch.org/whl/cu113/t…...
uniapp微信小程序调用百度OCR
uniapp编写微信小程序调用百度OCR 公司有一个识别行驶证需求,调用百度ocr识别 使用了image-tools这个插件,因为百度ocr接口用图片的base64 这里只是简单演示,accesstoken获取接口还是要放在服务器端,不然就暴露了自己的百度项目k…...
Vue3+TS项目---实用的复杂类型定义总结
namespace 概念 在TypeScript中,namespace是一种用于组织代码得结构,主要用于将相关得功能(例如类、接口、函数等)组合在一起。它可以帮助避免命名冲突,尤其是在大项目中。 用法 1.定义命名空间 使用namespace关键…...
尚硅谷rabbitmq2024 工作模式路由篇 第11节 答疑
String exchangeName "test_direct"; /! 创建交换机 人图全 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIREcT, b: true, b1: false, b2: false, map: null); /1 创建队列 String queue1Name "test_direct_queue1"; String queue2Name &q…...
HTTP vs WebSocket
本文将对比介绍HTTP 和 WebSocket ! 相关文章: 1.HTTP 详解 2.WebSocket 详解 一、HTTP:请求/响应的主流协议 HTTP(超文本传输协议)是用于发送和接收网页数据的标准协议。它最早于1991年由Tim Berners-Lee提出来&…...
R语言医学数据分析实践-数据读写
【图书推荐】《R语言医学数据分析实践》-CSDN博客 《R语言医学数据分析实践 李丹 宋立桓 蔡伟祺 清华大学出版社9787302673484》【摘要 书评 试读】- 京东图书 (jd.com) R语言编程_夏天又到了的博客-CSDN博客 R编程环境的搭建-CSDN博客 在分析公共卫生数据时,数…...
【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...
智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...
