当前位置: 首页 > news >正文

牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {return id;}public void setId(int id) {this.id = id;}public int getFromId() {return fromId;}public void setFromId(int fromId) {this.fromId = fromId;}public int getToId() {return toId;}public void setToId(int toId) {this.toId = toId;}public String getConversationId() {return conversationId;}public void setConversationId(String conversationId) {this.conversationId = conversationId;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public int getStatus() {return status;}public void setStatus(int status) {this.status = status;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}@Overridepublic String toString() {return "Message{" +"id=" + id +", fromId=" + fromId +", toId=" + toId +", conversationId='" + conversationId + '\'' +", content='" + content + '\'' +", status=" + status +", createTime=" + createTime +'}';}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

Event.java

定义一个事件实体 以方便在消息的发送与处理

import java.util.HashMap;
import java.util.Map;//用于事件驱动的kafka消息队列开发
public class Event {private String topic;//事件触发的人private int userId;//事件发生在哪个实体private int entityType;private int entityId;//实体作者private int entityUserId;//存储额外数据private Map<String,Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key,Object value) {this.data.put(key,value);return this;}}

EventProducer.java

定义事件的生产者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

在特定的地方触发消息产生

CommentController

 //触发评论事件Event event=new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType() == ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);

LikeController

 //触发点赞事件if(likeStatus ==1){Event event =new Event().setTopic(TOPIC_LIKE).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityUserId).setData("postId",postId);eventProducer.fireEvent(event);}

FollowController

 //触发关注事件Event event = new Event().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);eventProducer.fireEvent(event);

相关文章:

牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合 Message.java import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {retur…...

计算机网络——第一章时延部分深入学习、相关习题及详细解析

目录 时延相关 习题1 习题1-改 习题2 时延相关 之前我们学习过&#xff0c;时延由发送时延、传播时延和处理时延三部分构成。 发送时延的计算公式为“分组长度除以发送速率”&#xff0c; 发送速率应该从网卡速率、信道带宽、以及对端的接口速率中取最小。 传播时延的计…...

CSS3媒体查询与页面自适应

2017年9月&#xff0c;W3C发布媒体查询(Media Query Level 4)候选推荐标准规范&#xff0c;它扩展了已经发布的媒体查询的功能。该规范用于CSS的media规则&#xff0c;可以为文档设定特定条件的样式&#xff0c;也可以用于HTML、JavaScript等语言。 1、媒体查询基础 媒体查询…...

UG\NX二次开发 超长的对象属性值,怎么设置

文章作者:里海 来源网站:里海NX二次开发3000例专栏 感谢粉丝订阅 感谢 Dr. Lin 订阅本专栏,非常感谢。 简介 使用UF_ATTR_assign设置对象属性,如果属性值超过UF_ATTR_MAX_STRING_LEN则会报错。 #define UF_ATTR_MAX_STRING_LEN 132 怎么办呢?下面这种方法可以解决: 效果 …...

流媒体服务实现H5实时预览视频

目录 背景方案业务实践细节注意 待办 背景 客户aws服务磁盘存储告急&#xff0c;最高可扩容16T。排查如下&#xff1a;主要是视频文件存在大量复制使用的情况。例如发布节目时复制、预览时复制&#xff0c;这样上传一份视频后最大会有四份拷贝&#xff08;预览、普通发布、互动…...

C++适配器

文章目录 引言栈和队列 priority_queue仿函数迭代器区间 引言 栈的特性是先进后出&#xff0c;队列的特性是先进先出&#xff0c;然而双向队列同时具有栈和队列的特性&#xff0c;所以我们可以通过双向队列来适配出栈和队列。 先看库里面 栈和队列 stack和queue模板参数里面都…...

基于openresty waf二次开发多次匹配到的ip再做拉黑

我们想在openresty waf的基础上做二次开发&#xff0c;比如再精确一些。比如我们先匹配到了select的url我们先打分10分&#xff0c;匹配到cc 1000/s我们再给这个ip打10分…直到100分我们就拉黑这个ip。 [openresty waf][1] #cat reids_w.lua require lib local redis require…...

新一代构建工具Vite-xyphf

一、什么vite? vite:是一款思维比较前卫而且先进的构建工具,他解决了一些webpack解决不了的问题——在开发环境下可以实现按需编译&#xff0c;加快了开发速度。而在生产环境下&#xff0c;它使用Rollup进行打包&#xff0c;提供更好的tree-shaking、代码压缩和性能优化&…...

Flink源码解析三之执行计划⽣成

JobManager Leader 选举 首先flink会依据配置获取RecoveryMode,RecoveryMode一共两两种:STANDALONE和ZOOKEEPER。 如果用户配置的是STANDALONE,会直接去配置中获取JobManager的地址如果用户配置的是ZOOKEEPER,flink会首先尝试连接zookeeper,利用zookeeper的leadder选举服务发现…...

Flutter 常见错误记录总结

1、当 flutter pub get 指令报如下错误时&#xff1a; pub get failed command: "/Users/***/developer/flutter/bin/cache/dart-sdk/bin/dart __deprecated_pub --color --directory . get --example" pub env: { "FLUTTER_ROOT": "/Users/***/dev…...

[ASP]校无忧在线报名系统 v2.1

校无忧在线报名系统为了满足各地不同的报名人员的需求&#xff0c;为提供更为高效、方便、快捷的报名条件&#xff0c;同时也为减轻管理人员的工作难度&#xff1b;更为协调报名人员与管理人员的关系&#xff0c;快速提高了报名人员与管理人员的工作效率应运而生。系统适用于政…...

【Hydro】部分基流分割方法及程序代码说明

目录 说明一、数字滤波法单参数数字滤波Lyne-Hollick滤波法Chapman滤波法Chapman-Maxwell滤波法Boughton-Chapman滤波法 双参数滤波法Eckhardt滤波法 二、其他基流分割方法基流指数&#xff08;BFI&#xff09;法时间步长&#xff08;HYSEP&#xff09;法PART法加里宁-阿里巴扬…...

C#Regex正则表达式(Regular Expression)

在C#中&#xff0c;Regex是正则表达式&#xff08;Regular Expression&#xff09;的缩写&#xff0c;它是一种强大的文本匹配和处理工具。正则表达式是一种用于描述模式的字符串&#xff0c;它可以用来在文本中查找、替换和提取满足特定模式的内容。 在C#中&#xff0c;你可以…...

Wi-Fi还可以做什么?柯南解释IOT应用

大会报告&#xff1a;无线人工智能技术正在改变世界 Wi-Fi还可以做什么&#xff1f;随着带宽的提升&#xff0c;无线终端可以识别出更多的多径&#xff0c;每条多径都可以视作一个虚拟传感器&#xff0c;以感知周边环境。基于此&#xff0c;越来越多的无线感知产品应运而生。20…...

centos部署java程序

后台启动java程序 nohup java -jar -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath/data/app1/logs/ /data/app1.jar --spring.config.location/data/app1/config/application.properties,/data/app1/config/application-dev.properties > /data/app1/logs 2>&1…...

Sqoop导入到Hive,Hive使用 HA

Sqoop写入Hive卡在连接Hive的JDBC上不执行 Sqoop访问 启用 HA模式的Hive 找到Hive的安装根目录&#xff1a;$HIVE_HOME/conf 创建一个新的配置文件&#xff1a;beeline-hs2-connection.xml <?xml version"1.0"?> <?xml-stylesheet type"text/xsl…...

[笔记] %的含义

取模 不赘述。 引导符 重点说一下在printf("%d", n);中的意思。 这里的意思是&#xff1a;将""外对应位置的结果返回给引导符所在的位置&#xff0c; %后面跟着的是结果对应的数据类型&#xff0c; 只有数据类型匹配才能正确输出结果。...

FRI及相关SNARKs的Fiat-Shamir安全

1. 引言 本文主要参考&#xff1a; Alexander R. Block 2023年论文 Fiat-Shamir Security of FRI and Related SNARKsAlbert Garreta 2023年9月在ZK Summit 10上分享 ZK10: Fiat-Shamir security of FRI and related SNARKs - Albert Garreta (Nethermind) 评估参数用的Sage…...

TensorFlow案例学习:使用 YAMNet 进行迁移学习,对音频进行识别

前言 上一篇文章 TensorFlow案例学习&#xff1a;简单的音频识别 我们简单学习了音频识别。这次我们继续学习如何使用成熟的语音分类模型来进行迁移学习 官方教程&#xff1a; 使用 YAMNet 进行迁移学习&#xff0c;用于环境声音分类 模型下载地址&#xff08;需要科学上网&…...

MySQL CHAR 和 VARCHAR 的区别

文章目录 1.区别1.1 存储方式不同1.2 最大长度不同1.3 尾随空格处理方式不同1.4 读写效率不同 2.小结参考文献 在 MySQL 中&#xff0c;CHAR 和 VARCHAR 是两种不同的文本数据类型&#xff0c;CHAR 和 VARCHAR 类型声明时需要指定一个长度&#xff0c;该长度指示您希望存储的最…...

无法与IP建立连接,未能下载VSCode服务器

如题&#xff0c;在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈&#xff0c;发现是VSCode版本自动更新惹的祸&#xff01;&#xff01;&#xff01; 在VSCode的帮助->关于这里发现前几天VSCode自动更新了&#xff0c;我的版本号变成了1.100.3 才导致了远程连接出…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

css的定位(position)详解:相对定位 绝对定位 固定定位

在 CSS 中&#xff0c;元素的定位通过 position 属性控制&#xff0c;共有 5 种定位模式&#xff1a;static&#xff08;静态定位&#xff09;、relative&#xff08;相对定位&#xff09;、absolute&#xff08;绝对定位&#xff09;、fixed&#xff08;固定定位&#xff09;和…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

MySQL 8.0 事务全面讲解

以下是一个结合两次回答的 MySQL 8.0 事务全面讲解&#xff0c;涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容&#xff0c;并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念&#xff08;ACID&#xff09; 事务是…...