当前位置: 首页 > news >正文

即时通讯增加kafka渠道

此次给im服务增加kafka渠道,刚好最近有对SpringCloudStream进行了解,刚好用来练练手

增加kafka渠道

  • pom.xml

引入stream相关依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId>
</dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

由于涉及到SpringCloud,可以交由spring-cloud-dependencies统一管理

<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.7.18</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.8</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
  • application.yml

在yml中对stream相关项进行配置

server:port: 18080
cus:ws:exclude-receiver-info-flag: truereceiver-excludes-himself-flag: true#更改渠道为streamcommunication-type: streamcloud:function:#允许stream访问的beandefinition: listenerstream:kafka:binder:#kafka链接信息brokers: ${KAFKA_BROKERS:127.0.0.1:9092}#允许自动创建topicauto-create-topics: truebindings:#消费者bean-in-indexlistener-in-0:#主题destination: TEST_TOPIC
  • RedisSendExecutor

kafka生产者

package com.example.im.infra.executor.send.stream;import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;/*** @author PC* 消息队列执行*/
@Component
public class StreamSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private final StreamBridge streamBridge;@Autowiredpublic StreamSendExecutor(StreamBridge streamBridge) {this.streamBridge = streamBridge;}@Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.STREAM;}@Overridepublic void sendToUser(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.USER);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}@Overridepublic void sendToAll(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.ALL);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}
}
  • StreamMessageListener

kafka消费者

package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.nio.charset.StandardCharsets;
import java.util.function.Function;/*** @author PC* 消息队列监听*/
@Component
public class StreamMessageListener {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}@Beanpublic Function<Flux<Message<byte[]>>, Mono<Void>> listener() {return messageInfoFlux -> messageInfoFlux.map(message -> {String messageJson = new String(message.getPayload(), StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}).then();}
}

测试

test2向test1发送消息,成功接收

直接在消息队列中发送消息,test1也接收到了消息

参考资料

[1].SpringCloudStream中文文档

[2].im项目

相关文章:

即时通讯增加kafka渠道

此次给im服务增加kafka渠道&#xff0c;刚好最近有对SpringCloudStream进行了解&#xff0c;刚好用来练练手 增加kafka渠道 pom.xml 引入stream相关依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-strea…...

建造者模式和工厂模式的区别

工厂模式和建造者模式都是创建型设计模式&#xff0c;它们的主要作用都是为了简化对象的创建过程&#xff0c;但是它们在设计意图和实现细节上有着显著的区别。 总结区别&#xff1a; 关注点不同&#xff1a; 工厂模式关注的是对象的创建。建造者模式关注的是对象的构造过程…...

GEE数据集——ERA5-陆地每日汇总--ECMWF气候再分析数据集

目录 简介 数据集说明 Dataset Availability Dataset Provider Collection Snippet 空间信息 Resolution Bands Table 变量 代码 代码链接 结果 引用 许可 网址推荐 0代码在线构建地图应用 机器学习 简介 注&#xff08;2024-04-19&#xff09;&#xff1a; …...

Spring Boot 中的 @RequestMapping 和 Spring 中的 @RequestMapping 有什么区别?

在Spring框架中&#xff0c;RequestMapping注解用于映射Web请求到处理器&#xff08;Controller&#xff09;的方法上。在Spring Boot中&#xff0c;这个注解的使用方式和目的与Spring框架中是完全相同的。RequestMapping注解可以用于类或方法上&#xff0c;以声明请求的映射。…...

PROFINET开发或EtherNet/IP开发嵌入式归一板有用于工业称重秤

这是真实案例。然而&#xff0c;客户选择不展示其品牌名称。 Anybus嵌入式解决方案帮助工业称重设备制造商连接到任何工业网络。多网络连接使称重设备能够轻松访问不同的控制系统&#xff0c;从而加快上市时间。 我们最终找到了HMSNetworks的Anybus解决方案。他们的成熟技术和专…...

【Kafka】Kafka源码解析之producer过程解读

从本篇开始 打算用三篇文章 分别介绍下Producer生产消费&#xff0c;Consumer消费消息 以及Spring是如何集成Kafka 三部分&#xff0c;致于对于Broker的源码解析&#xff0c;因为是scala语言写的&#xff0c;暂时不打算进行学习分享。 总体介绍 clients : 保存的是Kafka客户端…...

深度学习笔记20_数据增强

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 一、我的环境 1.语言环境&#xff1a;Python 3.9 2.编译器&#xff1a;Pycharm 3.深度学习环境&#xff1a;TensorFlow 2.10.0 二、GPU设置…...

模板变量与php变量对比做判断

${item.create_name}如何与php变量对比 在PHP中&#xff0c;您可以通过将字符串内嵌到双引号中来将模板变量 ${item.create_name} 与PHP变量进行对比。如果您有一个PHP变量 $phpVariable 并且想要检查它是否与 ${item.create_name} 相同&#xff0c;您可以使用 str_replace 函…...

C语言 | Leetcode C语言题解之第485题最大连续1的个数

题目&#xff1a; 题解&#xff1a; int findMaxConsecutiveOnes(int* nums, int numsSize) {int maxCount 0, count 0;for (int i 0; i < numsSize; i) {if (nums[i] 1) {count;} else {maxCount fmax(maxCount, count);count 0;}}maxCount fmax(maxCount, count);…...

C语言复习概要(六)

公主请阅 1. 深入理解数组与指针在C语言中的应用1.1 数组名的理解 2. 使用指针访问数组3. 一维数组传参的本质4. 冒泡排序的实现5. 二级指针6. 指针数组7. 指针数组模拟二维数组8.总结 1. 深入理解数组与指针在C语言中的应用 数组与指针是C语言的核心概念之一&#xff0c;理解…...

PyQt 入门教程(2)搭建开发环境

文章目录 一、搭建开发环境1、安装PyQt5与pyqt5-tools2、配置QtDesigner3、配置Pyuic4、配置Pyrcc 一、搭建开发环境 1、安装PyQt5与pyqt5-tools PyQt5&#xff1a; PyQt的开发库。Pyqt5-tools&#xff1a; 它是一个包含多种工具的工具包&#xff0c;旨在帮助开发者更方便地使…...

Flink Kubernetes Operator

Flink Kubernetes Operator是一个用于在Kubernetes集群上管理Apache Flink应用的工具。 一、基本概念 Flink Kubernetes Operator允许用户通过Kubernetes的原生工具&#xff08;如kubectl&#xff09;来管理Flink应用程序及其生命周期。它简化了Flink应用在Kubernetes集群上的…...

【最新华为OD机试E卷-支持在线评测】字符统计及重排(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…...

springboot使用GDAL获取tif文件的缩略图并转为base64

springboot使用GDAL获取tif文件的缩略图并转为base64 首先需要安装gdal&#xff1a;https://blog.csdn.net/qq_61950936/article/details/142880279?spm1001.2014.3001.5501 然后是配置pom.xml文件&#xff1a; <!--处理缩略图的--><dependency><groupId>o…...

Pytorch——pip下载安装pytorch慢的解决办法

一、找到需要下载的pytorch链接 运行&#xff1a;pip install torch1.11.0cu113 torchvision0.12.0cu113 torchaudio0.11.0 --extra-index-url https://download.pytorch.org/whl/cu113。然后得到&#xff1a; 我这里为&#xff1a;https://download.pytorch.org/whl/cu113/t…...

uniapp微信小程序调用百度OCR

uniapp编写微信小程序调用百度OCR 公司有一个识别行驶证需求&#xff0c;调用百度ocr识别 使用了image-tools这个插件&#xff0c;因为百度ocr接口用图片的base64 这里只是简单演示&#xff0c;accesstoken获取接口还是要放在服务器端&#xff0c;不然就暴露了自己的百度项目k…...

Vue3+TS项目---实用的复杂类型定义总结

namespace 概念 在TypeScript中&#xff0c;namespace是一种用于组织代码得结构&#xff0c;主要用于将相关得功能&#xff08;例如类、接口、函数等&#xff09;组合在一起。它可以帮助避免命名冲突&#xff0c;尤其是在大项目中。 用法 1.定义命名空间 使用namespace关键…...

尚硅谷rabbitmq2024 工作模式路由篇 第11节 答疑

String exchangeName "test_direct"; /! 创建交换机 人图全 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIREcT, b: true, b1: false, b2: false, map: null); /1 创建队列 String queue1Name "test_direct_queue1"; String queue2Name &q…...

HTTP vs WebSocket

本文将对比介绍HTTP 和 WebSocket &#xff01; 相关文章&#xff1a; 1.HTTP 详解 2.WebSocket 详解 一、HTTP&#xff1a;请求/响应的主流协议 HTTP&#xff08;超文本传输协议&#xff09;是用于发送和接收网页数据的标准协议。它最早于1991年由Tim Berners-Lee提出来&…...

R语言医学数据分析实践-数据读写

【图书推荐】《R语言医学数据分析实践》-CSDN博客 《R语言医学数据分析实践 李丹 宋立桓 蔡伟祺 清华大学出版社9787302673484》【摘要 书评 试读】- 京东图书 (jd.com) R语言编程_夏天又到了的博客-CSDN博客 R编程环境的搭建-CSDN博客 在分析公共卫生数据时&#xff0c;数…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

【JavaWeb】Docker项目部署

引言 之前学习了Linux操作系统的常见命令&#xff0c;在Linux上安装软件&#xff0c;以及如何在Linux上部署一个单体项目&#xff0c;大多数同学都会有相同的感受&#xff0c;那就是麻烦。 核心体现在三点&#xff1a; 命令太多了&#xff0c;记不住 软件安装包名字复杂&…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

Springboot社区养老保险系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;社区养老保险系统小程序被用户普遍使用&#xff0c;为方…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...

给网站添加live2d看板娘

给网站添加live2d看板娘 参考文献&#xff1a; stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下&#xff0c;文章也主…...