当前位置: 首页 > news >正文

牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {return id;}public void setId(int id) {this.id = id;}public int getFromId() {return fromId;}public void setFromId(int fromId) {this.fromId = fromId;}public int getToId() {return toId;}public void setToId(int toId) {this.toId = toId;}public String getConversationId() {return conversationId;}public void setConversationId(String conversationId) {this.conversationId = conversationId;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public int getStatus() {return status;}public void setStatus(int status) {this.status = status;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}@Overridepublic String toString() {return "Message{" +"id=" + id +", fromId=" + fromId +", toId=" + toId +", conversationId='" + conversationId + '\'' +", content='" + content + '\'' +", status=" + status +", createTime=" + createTime +'}';}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

Event.java

定义一个事件实体 以方便在消息的发送与处理

import java.util.HashMap;
import java.util.Map;//用于事件驱动的kafka消息队列开发
public class Event {private String topic;//事件触发的人private int userId;//事件发生在哪个实体private int entityType;private int entityId;//实体作者private int entityUserId;//存储额外数据private Map<String,Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key,Object value) {this.data.put(key,value);return this;}}

EventProducer.java

定义事件的生产者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

在特定的地方触发消息产生

CommentController

 //触发评论事件Event event=new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType() == ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);

LikeController

 //触发点赞事件if(likeStatus ==1){Event event =new Event().setTopic(TOPIC_LIKE).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityUserId).setData("postId",postId);eventProducer.fireEvent(event);}

FollowController

 //触发关注事件Event event = new Event().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);eventProducer.fireEvent(event);

相关文章:

牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合 Message.java import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {retur…...

计算机网络——第一章时延部分深入学习、相关习题及详细解析

目录 时延相关 习题1 习题1-改 习题2 时延相关 之前我们学习过&#xff0c;时延由发送时延、传播时延和处理时延三部分构成。 发送时延的计算公式为“分组长度除以发送速率”&#xff0c; 发送速率应该从网卡速率、信道带宽、以及对端的接口速率中取最小。 传播时延的计…...

CSS3媒体查询与页面自适应

2017年9月&#xff0c;W3C发布媒体查询(Media Query Level 4)候选推荐标准规范&#xff0c;它扩展了已经发布的媒体查询的功能。该规范用于CSS的media规则&#xff0c;可以为文档设定特定条件的样式&#xff0c;也可以用于HTML、JavaScript等语言。 1、媒体查询基础 媒体查询…...

UG\NX二次开发 超长的对象属性值,怎么设置

文章作者:里海 来源网站:里海NX二次开发3000例专栏 感谢粉丝订阅 感谢 Dr. Lin 订阅本专栏,非常感谢。 简介 使用UF_ATTR_assign设置对象属性,如果属性值超过UF_ATTR_MAX_STRING_LEN则会报错。 #define UF_ATTR_MAX_STRING_LEN 132 怎么办呢?下面这种方法可以解决: 效果 …...

流媒体服务实现H5实时预览视频

目录 背景方案业务实践细节注意 待办 背景 客户aws服务磁盘存储告急&#xff0c;最高可扩容16T。排查如下&#xff1a;主要是视频文件存在大量复制使用的情况。例如发布节目时复制、预览时复制&#xff0c;这样上传一份视频后最大会有四份拷贝&#xff08;预览、普通发布、互动…...

C++适配器

文章目录 引言栈和队列 priority_queue仿函数迭代器区间 引言 栈的特性是先进后出&#xff0c;队列的特性是先进先出&#xff0c;然而双向队列同时具有栈和队列的特性&#xff0c;所以我们可以通过双向队列来适配出栈和队列。 先看库里面 栈和队列 stack和queue模板参数里面都…...

基于openresty waf二次开发多次匹配到的ip再做拉黑

我们想在openresty waf的基础上做二次开发&#xff0c;比如再精确一些。比如我们先匹配到了select的url我们先打分10分&#xff0c;匹配到cc 1000/s我们再给这个ip打10分…直到100分我们就拉黑这个ip。 [openresty waf][1] #cat reids_w.lua require lib local redis require…...

新一代构建工具Vite-xyphf

一、什么vite? vite:是一款思维比较前卫而且先进的构建工具,他解决了一些webpack解决不了的问题——在开发环境下可以实现按需编译&#xff0c;加快了开发速度。而在生产环境下&#xff0c;它使用Rollup进行打包&#xff0c;提供更好的tree-shaking、代码压缩和性能优化&…...

Flink源码解析三之执行计划⽣成

JobManager Leader 选举 首先flink会依据配置获取RecoveryMode,RecoveryMode一共两两种:STANDALONE和ZOOKEEPER。 如果用户配置的是STANDALONE,会直接去配置中获取JobManager的地址如果用户配置的是ZOOKEEPER,flink会首先尝试连接zookeeper,利用zookeeper的leadder选举服务发现…...

Flutter 常见错误记录总结

1、当 flutter pub get 指令报如下错误时&#xff1a; pub get failed command: "/Users/***/developer/flutter/bin/cache/dart-sdk/bin/dart __deprecated_pub --color --directory . get --example" pub env: { "FLUTTER_ROOT": "/Users/***/dev…...

[ASP]校无忧在线报名系统 v2.1

校无忧在线报名系统为了满足各地不同的报名人员的需求&#xff0c;为提供更为高效、方便、快捷的报名条件&#xff0c;同时也为减轻管理人员的工作难度&#xff1b;更为协调报名人员与管理人员的关系&#xff0c;快速提高了报名人员与管理人员的工作效率应运而生。系统适用于政…...

【Hydro】部分基流分割方法及程序代码说明

目录 说明一、数字滤波法单参数数字滤波Lyne-Hollick滤波法Chapman滤波法Chapman-Maxwell滤波法Boughton-Chapman滤波法 双参数滤波法Eckhardt滤波法 二、其他基流分割方法基流指数&#xff08;BFI&#xff09;法时间步长&#xff08;HYSEP&#xff09;法PART法加里宁-阿里巴扬…...

C#Regex正则表达式(Regular Expression)

在C#中&#xff0c;Regex是正则表达式&#xff08;Regular Expression&#xff09;的缩写&#xff0c;它是一种强大的文本匹配和处理工具。正则表达式是一种用于描述模式的字符串&#xff0c;它可以用来在文本中查找、替换和提取满足特定模式的内容。 在C#中&#xff0c;你可以…...

Wi-Fi还可以做什么?柯南解释IOT应用

大会报告&#xff1a;无线人工智能技术正在改变世界 Wi-Fi还可以做什么&#xff1f;随着带宽的提升&#xff0c;无线终端可以识别出更多的多径&#xff0c;每条多径都可以视作一个虚拟传感器&#xff0c;以感知周边环境。基于此&#xff0c;越来越多的无线感知产品应运而生。20…...

centos部署java程序

后台启动java程序 nohup java -jar -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath/data/app1/logs/ /data/app1.jar --spring.config.location/data/app1/config/application.properties,/data/app1/config/application-dev.properties > /data/app1/logs 2>&1…...

Sqoop导入到Hive,Hive使用 HA

Sqoop写入Hive卡在连接Hive的JDBC上不执行 Sqoop访问 启用 HA模式的Hive 找到Hive的安装根目录&#xff1a;$HIVE_HOME/conf 创建一个新的配置文件&#xff1a;beeline-hs2-connection.xml <?xml version"1.0"?> <?xml-stylesheet type"text/xsl…...

[笔记] %的含义

取模 不赘述。 引导符 重点说一下在printf("%d", n);中的意思。 这里的意思是&#xff1a;将""外对应位置的结果返回给引导符所在的位置&#xff0c; %后面跟着的是结果对应的数据类型&#xff0c; 只有数据类型匹配才能正确输出结果。...

FRI及相关SNARKs的Fiat-Shamir安全

1. 引言 本文主要参考&#xff1a; Alexander R. Block 2023年论文 Fiat-Shamir Security of FRI and Related SNARKsAlbert Garreta 2023年9月在ZK Summit 10上分享 ZK10: Fiat-Shamir security of FRI and related SNARKs - Albert Garreta (Nethermind) 评估参数用的Sage…...

TensorFlow案例学习:使用 YAMNet 进行迁移学习,对音频进行识别

前言 上一篇文章 TensorFlow案例学习&#xff1a;简单的音频识别 我们简单学习了音频识别。这次我们继续学习如何使用成熟的语音分类模型来进行迁移学习 官方教程&#xff1a; 使用 YAMNet 进行迁移学习&#xff0c;用于环境声音分类 模型下载地址&#xff08;需要科学上网&…...

MySQL CHAR 和 VARCHAR 的区别

文章目录 1.区别1.1 存储方式不同1.2 最大长度不同1.3 尾随空格处理方式不同1.4 读写效率不同 2.小结参考文献 在 MySQL 中&#xff0c;CHAR 和 VARCHAR 是两种不同的文本数据类型&#xff0c;CHAR 和 VARCHAR 类型声明时需要指定一个长度&#xff0c;该长度指示您希望存储的最…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

ArcGIS Pro制作水平横向图例+多级标注

今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作&#xff1a;ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等&#xff08;ArcGIS出图图例8大技巧&#xff09;&#xff0c;那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

Springboot社区养老保险系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;社区养老保险系统小程序被用户普遍使用&#xff0c;为方…...

基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解

JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用&#xff0c;结合SQLite数据库实现联系人管理功能&#xff0c;并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能&#xff0c;同时可以最小化到系统…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...