当前位置: 首页 > news >正文

即时通讯增加kafka渠道

此次给im服务增加kafka渠道,刚好最近有对SpringCloudStream进行了解,刚好用来练练手

增加kafka渠道

  • pom.xml

引入stream相关依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId>
</dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

由于涉及到SpringCloud,可以交由spring-cloud-dependencies统一管理

<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.7.18</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.8</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
  • application.yml

在yml中对stream相关项进行配置

server:port: 18080
cus:ws:exclude-receiver-info-flag: truereceiver-excludes-himself-flag: true#更改渠道为streamcommunication-type: streamcloud:function:#允许stream访问的beandefinition: listenerstream:kafka:binder:#kafka链接信息brokers: ${KAFKA_BROKERS:127.0.0.1:9092}#允许自动创建topicauto-create-topics: truebindings:#消费者bean-in-indexlistener-in-0:#主题destination: TEST_TOPIC
  • RedisSendExecutor

kafka生产者

package com.example.im.infra.executor.send.stream;import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;/*** @author PC* 消息队列执行*/
@Component
public class StreamSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private final StreamBridge streamBridge;@Autowiredpublic StreamSendExecutor(StreamBridge streamBridge) {this.streamBridge = streamBridge;}@Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.STREAM;}@Overridepublic void sendToUser(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.USER);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}@Overridepublic void sendToAll(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.ALL);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}
}
  • StreamMessageListener

kafka消费者

package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.nio.charset.StandardCharsets;
import java.util.function.Function;/*** @author PC* 消息队列监听*/
@Component
public class StreamMessageListener {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}@Beanpublic Function<Flux<Message<byte[]>>, Mono<Void>> listener() {return messageInfoFlux -> messageInfoFlux.map(message -> {String messageJson = new String(message.getPayload(), StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}).then();}
}

测试

test2向test1发送消息,成功接收

直接在消息队列中发送消息,test1也接收到了消息

参考资料

[1].SpringCloudStream中文文档

[2].im项目

相关文章:

即时通讯增加kafka渠道

此次给im服务增加kafka渠道&#xff0c;刚好最近有对SpringCloudStream进行了解&#xff0c;刚好用来练练手 增加kafka渠道 pom.xml 引入stream相关依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-strea…...

建造者模式和工厂模式的区别

工厂模式和建造者模式都是创建型设计模式&#xff0c;它们的主要作用都是为了简化对象的创建过程&#xff0c;但是它们在设计意图和实现细节上有着显著的区别。 总结区别&#xff1a; 关注点不同&#xff1a; 工厂模式关注的是对象的创建。建造者模式关注的是对象的构造过程…...

GEE数据集——ERA5-陆地每日汇总--ECMWF气候再分析数据集

目录 简介 数据集说明 Dataset Availability Dataset Provider Collection Snippet 空间信息 Resolution Bands Table 变量 代码 代码链接 结果 引用 许可 网址推荐 0代码在线构建地图应用 机器学习 简介 注&#xff08;2024-04-19&#xff09;&#xff1a; …...

Spring Boot 中的 @RequestMapping 和 Spring 中的 @RequestMapping 有什么区别?

在Spring框架中&#xff0c;RequestMapping注解用于映射Web请求到处理器&#xff08;Controller&#xff09;的方法上。在Spring Boot中&#xff0c;这个注解的使用方式和目的与Spring框架中是完全相同的。RequestMapping注解可以用于类或方法上&#xff0c;以声明请求的映射。…...

PROFINET开发或EtherNet/IP开发嵌入式归一板有用于工业称重秤

这是真实案例。然而&#xff0c;客户选择不展示其品牌名称。 Anybus嵌入式解决方案帮助工业称重设备制造商连接到任何工业网络。多网络连接使称重设备能够轻松访问不同的控制系统&#xff0c;从而加快上市时间。 我们最终找到了HMSNetworks的Anybus解决方案。他们的成熟技术和专…...

【Kafka】Kafka源码解析之producer过程解读

从本篇开始 打算用三篇文章 分别介绍下Producer生产消费&#xff0c;Consumer消费消息 以及Spring是如何集成Kafka 三部分&#xff0c;致于对于Broker的源码解析&#xff0c;因为是scala语言写的&#xff0c;暂时不打算进行学习分享。 总体介绍 clients : 保存的是Kafka客户端…...

深度学习笔记20_数据增强

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 一、我的环境 1.语言环境&#xff1a;Python 3.9 2.编译器&#xff1a;Pycharm 3.深度学习环境&#xff1a;TensorFlow 2.10.0 二、GPU设置…...

模板变量与php变量对比做判断

${item.create_name}如何与php变量对比 在PHP中&#xff0c;您可以通过将字符串内嵌到双引号中来将模板变量 ${item.create_name} 与PHP变量进行对比。如果您有一个PHP变量 $phpVariable 并且想要检查它是否与 ${item.create_name} 相同&#xff0c;您可以使用 str_replace 函…...

C语言 | Leetcode C语言题解之第485题最大连续1的个数

题目&#xff1a; 题解&#xff1a; int findMaxConsecutiveOnes(int* nums, int numsSize) {int maxCount 0, count 0;for (int i 0; i < numsSize; i) {if (nums[i] 1) {count;} else {maxCount fmax(maxCount, count);count 0;}}maxCount fmax(maxCount, count);…...

C语言复习概要(六)

公主请阅 1. 深入理解数组与指针在C语言中的应用1.1 数组名的理解 2. 使用指针访问数组3. 一维数组传参的本质4. 冒泡排序的实现5. 二级指针6. 指针数组7. 指针数组模拟二维数组8.总结 1. 深入理解数组与指针在C语言中的应用 数组与指针是C语言的核心概念之一&#xff0c;理解…...

PyQt 入门教程(2)搭建开发环境

文章目录 一、搭建开发环境1、安装PyQt5与pyqt5-tools2、配置QtDesigner3、配置Pyuic4、配置Pyrcc 一、搭建开发环境 1、安装PyQt5与pyqt5-tools PyQt5&#xff1a; PyQt的开发库。Pyqt5-tools&#xff1a; 它是一个包含多种工具的工具包&#xff0c;旨在帮助开发者更方便地使…...

Flink Kubernetes Operator

Flink Kubernetes Operator是一个用于在Kubernetes集群上管理Apache Flink应用的工具。 一、基本概念 Flink Kubernetes Operator允许用户通过Kubernetes的原生工具&#xff08;如kubectl&#xff09;来管理Flink应用程序及其生命周期。它简化了Flink应用在Kubernetes集群上的…...

【最新华为OD机试E卷-支持在线评测】字符统计及重排(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…...

springboot使用GDAL获取tif文件的缩略图并转为base64

springboot使用GDAL获取tif文件的缩略图并转为base64 首先需要安装gdal&#xff1a;https://blog.csdn.net/qq_61950936/article/details/142880279?spm1001.2014.3001.5501 然后是配置pom.xml文件&#xff1a; <!--处理缩略图的--><dependency><groupId>o…...

Pytorch——pip下载安装pytorch慢的解决办法

一、找到需要下载的pytorch链接 运行&#xff1a;pip install torch1.11.0cu113 torchvision0.12.0cu113 torchaudio0.11.0 --extra-index-url https://download.pytorch.org/whl/cu113。然后得到&#xff1a; 我这里为&#xff1a;https://download.pytorch.org/whl/cu113/t…...

uniapp微信小程序调用百度OCR

uniapp编写微信小程序调用百度OCR 公司有一个识别行驶证需求&#xff0c;调用百度ocr识别 使用了image-tools这个插件&#xff0c;因为百度ocr接口用图片的base64 这里只是简单演示&#xff0c;accesstoken获取接口还是要放在服务器端&#xff0c;不然就暴露了自己的百度项目k…...

Vue3+TS项目---实用的复杂类型定义总结

namespace 概念 在TypeScript中&#xff0c;namespace是一种用于组织代码得结构&#xff0c;主要用于将相关得功能&#xff08;例如类、接口、函数等&#xff09;组合在一起。它可以帮助避免命名冲突&#xff0c;尤其是在大项目中。 用法 1.定义命名空间 使用namespace关键…...

尚硅谷rabbitmq2024 工作模式路由篇 第11节 答疑

String exchangeName "test_direct"; /! 创建交换机 人图全 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIREcT, b: true, b1: false, b2: false, map: null); /1 创建队列 String queue1Name "test_direct_queue1"; String queue2Name &q…...

HTTP vs WebSocket

本文将对比介绍HTTP 和 WebSocket &#xff01; 相关文章&#xff1a; 1.HTTP 详解 2.WebSocket 详解 一、HTTP&#xff1a;请求/响应的主流协议 HTTP&#xff08;超文本传输协议&#xff09;是用于发送和接收网页数据的标准协议。它最早于1991年由Tim Berners-Lee提出来&…...

R语言医学数据分析实践-数据读写

【图书推荐】《R语言医学数据分析实践》-CSDN博客 《R语言医学数据分析实践 李丹 宋立桓 蔡伟祺 清华大学出版社9787302673484》【摘要 书评 试读】- 京东图书 (jd.com) R语言编程_夏天又到了的博客-CSDN博客 R编程环境的搭建-CSDN博客 在分析公共卫生数据时&#xff0c;数…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

测试微信模版消息推送

进入“开发接口管理”--“公众平台测试账号”&#xff0c;无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息&#xff1a; 关注测试号&#xff1a;扫二维码关注测试号。 发送模版消息&#xff1a; import requests da…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

Map相关知识

数据结构 二叉树 二叉树&#xff0c;顾名思义&#xff0c;每个节点最多有两个“叉”&#xff0c;也就是两个子节点&#xff0c;分别是左子 节点和右子节点。不过&#xff0c;二叉树并不要求每个节点都有两个子节点&#xff0c;有的节点只 有左子节点&#xff0c;有的节点只有…...