当前位置: 首页 > news >正文

牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {return id;}public void setId(int id) {this.id = id;}public int getFromId() {return fromId;}public void setFromId(int fromId) {this.fromId = fromId;}public int getToId() {return toId;}public void setToId(int toId) {this.toId = toId;}public String getConversationId() {return conversationId;}public void setConversationId(String conversationId) {this.conversationId = conversationId;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public int getStatus() {return status;}public void setStatus(int status) {this.status = status;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}@Overridepublic String toString() {return "Message{" +"id=" + id +", fromId=" + fromId +", toId=" + toId +", conversationId='" + conversationId + '\'' +", content='" + content + '\'' +", status=" + status +", createTime=" + createTime +'}';}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

Event.java

定义一个事件实体 以方便在消息的发送与处理

import java.util.HashMap;
import java.util.Map;//用于事件驱动的kafka消息队列开发
public class Event {private String topic;//事件触发的人private int userId;//事件发生在哪个实体private int entityType;private int entityId;//实体作者private int entityUserId;//存储额外数据private Map<String,Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key,Object value) {this.data.put(key,value);return this;}}

EventProducer.java

定义事件的生产者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

在特定的地方触发消息产生

CommentController

 //触发评论事件Event event=new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType() == ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);

LikeController

 //触发点赞事件if(likeStatus ==1){Event event =new Event().setTopic(TOPIC_LIKE).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityUserId).setData("postId",postId);eventProducer.fireEvent(event);}

FollowController

 //触发关注事件Event event = new Event().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);eventProducer.fireEvent(event);

相关文章:

牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合 Message.java import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {retur…...

计算机网络——第一章时延部分深入学习、相关习题及详细解析

目录 时延相关 习题1 习题1-改 习题2 时延相关 之前我们学习过&#xff0c;时延由发送时延、传播时延和处理时延三部分构成。 发送时延的计算公式为“分组长度除以发送速率”&#xff0c; 发送速率应该从网卡速率、信道带宽、以及对端的接口速率中取最小。 传播时延的计…...

CSS3媒体查询与页面自适应

2017年9月&#xff0c;W3C发布媒体查询(Media Query Level 4)候选推荐标准规范&#xff0c;它扩展了已经发布的媒体查询的功能。该规范用于CSS的media规则&#xff0c;可以为文档设定特定条件的样式&#xff0c;也可以用于HTML、JavaScript等语言。 1、媒体查询基础 媒体查询…...

UG\NX二次开发 超长的对象属性值,怎么设置

文章作者:里海 来源网站:里海NX二次开发3000例专栏 感谢粉丝订阅 感谢 Dr. Lin 订阅本专栏,非常感谢。 简介 使用UF_ATTR_assign设置对象属性,如果属性值超过UF_ATTR_MAX_STRING_LEN则会报错。 #define UF_ATTR_MAX_STRING_LEN 132 怎么办呢?下面这种方法可以解决: 效果 …...

流媒体服务实现H5实时预览视频

目录 背景方案业务实践细节注意 待办 背景 客户aws服务磁盘存储告急&#xff0c;最高可扩容16T。排查如下&#xff1a;主要是视频文件存在大量复制使用的情况。例如发布节目时复制、预览时复制&#xff0c;这样上传一份视频后最大会有四份拷贝&#xff08;预览、普通发布、互动…...

C++适配器

文章目录 引言栈和队列 priority_queue仿函数迭代器区间 引言 栈的特性是先进后出&#xff0c;队列的特性是先进先出&#xff0c;然而双向队列同时具有栈和队列的特性&#xff0c;所以我们可以通过双向队列来适配出栈和队列。 先看库里面 栈和队列 stack和queue模板参数里面都…...

基于openresty waf二次开发多次匹配到的ip再做拉黑

我们想在openresty waf的基础上做二次开发&#xff0c;比如再精确一些。比如我们先匹配到了select的url我们先打分10分&#xff0c;匹配到cc 1000/s我们再给这个ip打10分…直到100分我们就拉黑这个ip。 [openresty waf][1] #cat reids_w.lua require lib local redis require…...

新一代构建工具Vite-xyphf

一、什么vite? vite:是一款思维比较前卫而且先进的构建工具,他解决了一些webpack解决不了的问题——在开发环境下可以实现按需编译&#xff0c;加快了开发速度。而在生产环境下&#xff0c;它使用Rollup进行打包&#xff0c;提供更好的tree-shaking、代码压缩和性能优化&…...

Flink源码解析三之执行计划⽣成

JobManager Leader 选举 首先flink会依据配置获取RecoveryMode,RecoveryMode一共两两种:STANDALONE和ZOOKEEPER。 如果用户配置的是STANDALONE,会直接去配置中获取JobManager的地址如果用户配置的是ZOOKEEPER,flink会首先尝试连接zookeeper,利用zookeeper的leadder选举服务发现…...

Flutter 常见错误记录总结

1、当 flutter pub get 指令报如下错误时&#xff1a; pub get failed command: "/Users/***/developer/flutter/bin/cache/dart-sdk/bin/dart __deprecated_pub --color --directory . get --example" pub env: { "FLUTTER_ROOT": "/Users/***/dev…...

[ASP]校无忧在线报名系统 v2.1

校无忧在线报名系统为了满足各地不同的报名人员的需求&#xff0c;为提供更为高效、方便、快捷的报名条件&#xff0c;同时也为减轻管理人员的工作难度&#xff1b;更为协调报名人员与管理人员的关系&#xff0c;快速提高了报名人员与管理人员的工作效率应运而生。系统适用于政…...

【Hydro】部分基流分割方法及程序代码说明

目录 说明一、数字滤波法单参数数字滤波Lyne-Hollick滤波法Chapman滤波法Chapman-Maxwell滤波法Boughton-Chapman滤波法 双参数滤波法Eckhardt滤波法 二、其他基流分割方法基流指数&#xff08;BFI&#xff09;法时间步长&#xff08;HYSEP&#xff09;法PART法加里宁-阿里巴扬…...

C#Regex正则表达式(Regular Expression)

在C#中&#xff0c;Regex是正则表达式&#xff08;Regular Expression&#xff09;的缩写&#xff0c;它是一种强大的文本匹配和处理工具。正则表达式是一种用于描述模式的字符串&#xff0c;它可以用来在文本中查找、替换和提取满足特定模式的内容。 在C#中&#xff0c;你可以…...

Wi-Fi还可以做什么?柯南解释IOT应用

大会报告&#xff1a;无线人工智能技术正在改变世界 Wi-Fi还可以做什么&#xff1f;随着带宽的提升&#xff0c;无线终端可以识别出更多的多径&#xff0c;每条多径都可以视作一个虚拟传感器&#xff0c;以感知周边环境。基于此&#xff0c;越来越多的无线感知产品应运而生。20…...

centos部署java程序

后台启动java程序 nohup java -jar -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath/data/app1/logs/ /data/app1.jar --spring.config.location/data/app1/config/application.properties,/data/app1/config/application-dev.properties > /data/app1/logs 2>&1…...

Sqoop导入到Hive,Hive使用 HA

Sqoop写入Hive卡在连接Hive的JDBC上不执行 Sqoop访问 启用 HA模式的Hive 找到Hive的安装根目录&#xff1a;$HIVE_HOME/conf 创建一个新的配置文件&#xff1a;beeline-hs2-connection.xml <?xml version"1.0"?> <?xml-stylesheet type"text/xsl…...

[笔记] %的含义

取模 不赘述。 引导符 重点说一下在printf("%d", n);中的意思。 这里的意思是&#xff1a;将""外对应位置的结果返回给引导符所在的位置&#xff0c; %后面跟着的是结果对应的数据类型&#xff0c; 只有数据类型匹配才能正确输出结果。...

FRI及相关SNARKs的Fiat-Shamir安全

1. 引言 本文主要参考&#xff1a; Alexander R. Block 2023年论文 Fiat-Shamir Security of FRI and Related SNARKsAlbert Garreta 2023年9月在ZK Summit 10上分享 ZK10: Fiat-Shamir security of FRI and related SNARKs - Albert Garreta (Nethermind) 评估参数用的Sage…...

TensorFlow案例学习:使用 YAMNet 进行迁移学习,对音频进行识别

前言 上一篇文章 TensorFlow案例学习&#xff1a;简单的音频识别 我们简单学习了音频识别。这次我们继续学习如何使用成熟的语音分类模型来进行迁移学习 官方教程&#xff1a; 使用 YAMNet 进行迁移学习&#xff0c;用于环境声音分类 模型下载地址&#xff08;需要科学上网&…...

MySQL CHAR 和 VARCHAR 的区别

文章目录 1.区别1.1 存储方式不同1.2 最大长度不同1.3 尾随空格处理方式不同1.4 读写效率不同 2.小结参考文献 在 MySQL 中&#xff0c;CHAR 和 VARCHAR 是两种不同的文本数据类型&#xff0c;CHAR 和 VARCHAR 类型声明时需要指定一个长度&#xff0c;该长度指示您希望存储的最…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

Razor编程中@Html的方法使用大全

文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...

PostgreSQL——环境搭建

一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在&#xff0…...