JavaWeb_LeadNews_Day6-Kafka
JavaWeb_LeadNews_Day6-Kafka
- Kafka
- 概述
- 安装配置
- kafka入门
- kafka高可用方案
- kafka详解
- 生产者同步异步发送消息
- 生产者参数配置
- 消费者同步异步提交偏移量
- SpringBoot集成kafka
- 自媒体文章上下架
- 实现思路
- 具体实现
- 来源
- Gitee
Kafka
概述
- 对比

- 选择

- 介绍
- producer: 发布消息的对象称之为主题生产者 (Kafka topic producer)
- topic: Kafka将消息分门别类,每一类的消息称之为一个主题 (Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者 (consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群,集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅个或多个主题 (topic),并从Broker拉数据,从而消费这些已发布的消息
安装配置
- 安装zookeeper
// 下载zookeeper镜像 docker pull zookeeper:3.4.14 // 创建容器 docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14 - 安装kafka
// 下载kafka镜像 docker pull wurstmeister/kafka:2.12-2.3.1 // 创建容器 docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.174.133 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.174.133:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.174.133:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1// 解释 --net=host,直接使用容器宿主机的网络命名空间,即没有独立的网络环境。它使用宿主机的ip和端口(云主机会不好使)
kafka入门
- 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId> </dependency> - Producer
public class ProducerQuickStart {public static void main(String[] args) {// 1. kafka链接配置信息Properties prop = new Properties();// 1.1 kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建kafka生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(prop);// 3. 发送信息// 参数列表: topic, key, valueProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key1", "Hello Kafka!");producer.send(record);// 4. 关闭消息通道// 必须关闭, 否则消息发送bucgproducer.close();} } - Consumer
public class ConsumerQuickStart {public static void main(String[] args) {// 1. kafka的配置信息Properties prop = new Properties();// 1.1 链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 1.3 设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 2. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 3. 订阅主题consumer.subscribe(Collections.singleton("topic-first"));// 4. 拉取信息while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());}}} } - 总结
- 同一组只有一个消费者能够接收到消息, 如果需要所有消费者都能接收到消息, 需要消费者在不同的组
kafka高可用方案
-
集群

-
备份

kafka定义了两类副本:- 领导者副本
- 追随者副本
数据在领导者副本存储后, 会同步到追随者副本

同步方式
leader失效后, 选择leader的原则- 优先从ISR中选取, 因为ISR的数据和leader是同步的.
- ISR中的follower都不行了, 就从其他的follower中选取.
- 当所有的follower都失效了, 第一种是等待ISR中的follower活过来, 数据可靠, 但等待时间不确定, 第二种是等待任意follower活过来, 最快速度恢复可用性, 但数据不一定完整.
kafka详解
生产者同步异步发送消息
// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.offset());// 异步发送
producer.send(record, new Callback(){@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null) {System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});
生产者参数配置
- 消息确认
确认机制 说明 acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 acks=1(默认值) 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 acks=all 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 prop.put(ProducerConfig.ACKS_CONFIG, "all"); - 消息重传
设置消息重传次数, 默认每次重试之间等待100msprop.put(ProducerConfig.RETRIES_CONFIG, 10); - 消息压缩
默认情况, 消息发送不会压缩
使用压缩可以降低网络传输开销和存储开销, 而这往往是向kafka发送消息的瓶颈所在压缩算法 说明 snappy 占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果看重性能和网络带宽,建议采用 lz4 占用较少的 CPU,压缩和解压缩速度较快,压缩比也很客观 gzip 占用较多的CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
消费者同步异步提交偏移量
// 同步提交偏移量
consumer.commitSync();// 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback(){@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录错误的提交偏移量"+map+", 异常信息为"+e);}}
});// 同步异步提交
try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());System.out.println(record.partition());System.out.println(record.offset());}// 异步提交偏移量consumer.commitAsync();}
} catch (Exception e) {e.printStackTrace();System.out.println("记录错误的信息:"+e);
}finally {// 同步consumer.commitSync();
}
SpringBoot集成kafka
- 依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency> <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId> </dependency> - 配置
server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer - Producer
@RestController public class HelloController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("itcast-topic", "黑马程序员");return "ok";} } - Consumer
@Component public class HelloListener {@KafkaListener(topics = "itcast-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}} } - 传递对象
// Producer User user = new User(); user.setName("tom"); user.setAge(18); kafkaTemplate.send("itcast-topic", JSON.toJSONString(user));// Consumer System.out.println(JSON.parseObject(message, User.class));
自媒体文章上下架
实现思路

具体实现
- Producer
public ResponseResult downOrUp(WmNewsDto dto) {// 1. 检验参数// 1.0 检查文章dto是否为空if(dto == null){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, "不可缺少");}// 1.1 检查文章上架参数是否合法if(dto.getEnable() != 0 && dto.getEnabl!= 1){// 默认上架dto.setEnable((short) 1);}// 2. 查询文章WmNews news = getById(dto.getId());if(news == null){return ResponseResult.errorRe(AppHttpCodeEnum.DATA_NOT_EXIST, 存在");}// 3. 查询文章状态if(news.getStatus() != WmNews.StaPUBLISHED.getCode()){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, 章不是发布状态, 不能上下架");}// 4. 上下架news.setEnable(dto.getEnable());updateById(new// 5. 发送消息, 通知article修改文章的配置if(news.getArticleId() != null){HashMap<String, Object> map = HashMap<>();map.put("articleId", news.getArtic());map.put("enable", news.getEnable());kafkaTemplate.(WmNewsMessageConstaWM_NEWS_UP_OR_DOWN_TOPIC, JtoJSONString(map));return ResponseResult.okRe(AppHttpCodeEnum.SUCCESS); } - Consumer
// Listener
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message)
{if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);}
}// Service
public void updateByMap(Map map) {// 0 下架, 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}// 修改文章update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId, map.get("articleId")).set(ApArticleConfig::getIsDown, isDown));
}
来源
黑马程序员. 黑马头条
Gitee
https://gitee.com/yu-ba-ba-ba/leadnews
相关文章:
JavaWeb_LeadNews_Day6-Kafka
JavaWeb_LeadNews_Day6-Kafka Kafka概述安装配置kafka入门kafka高可用方案kafka详解生产者同步异步发送消息生产者参数配置消费者同步异步提交偏移量 SpringBoot集成kafka 自媒体文章上下架实现思路具体实现 来源Gitee Kafka 概述 对比 选择 介绍 producer: 发布消息的对象称…...
ATTCK覆盖度97.1%!360终端安全管理系统获赛可达认证
近日,国际知名第三方网络安全检测服务机构——赛可达实验室(SKD Labs)发布最新测试报告,360终端安全管理系统以ATT&CK V12框架攻击技术覆盖面377个、覆盖度97.1%,勒索病毒、挖矿病毒检出率100%,误报率0…...
透视俄乌网络战之一:数据擦除软件
数据擦除破坏 1. WhisperGate2. HermeticWiper3. IsaacWiper4. WhisperKill5. CaddyWiper6. DoubleZero7. AcidRain8. RURansom 数据是政府、社会和企业组织运行的关键要素。数据擦除软件可以在不留任何痕迹的情况下擦除数据并阻止操作系统恢复摧,达到摧毁或目标系统…...
微服务中间件--Nacos
Nacos 1. Nacos入门a.服务注册到Nacosb.Nacos服务分级存储模型c.NacosRule负载均衡d.服务实例的权重设置e.环境隔离 - namespacef.Nacos和Eureka的对比 2. Nacos配置管理a.统一配置管理b.配置热更新c.多环境配置共享 1. Nacos入门 Nacos是阿里巴巴的产品,现在是Spr…...
驱动开发点亮led灯
头文件 #ifndef __HEAD_H__ #define __HEAD_H__#define PHY_LED_MODER 0X50006000 #define PHY_LED_ODR 0X50006014 #define PHY_LED_RCC 0X50000A28 #define PHY_LED_FMODER 0X50007000 #define PHY_LED_FODR 0X50007014#endif驱动代码 #include <linux/init.h> #incl…...
回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图)
回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图) 目录 回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图…...
数学建模之“TOPSIS数学模型”原理和代码详解
一、简介 TOPSIS(Technique for Order Preference by Similarity to Ideal Solution)是一种多准则决策分析方法,用于解决多个候选方案之间的排序和选择问题。它基于一种数学模型,通过比较每个候选方案与理想解和负理想解之间的相…...
threejs使用gui改变相机的参数
调节相机远近角度 定义相机的配置: const cameraConfg reactive({ fov: 45 }) gui中加入调节fov的方法 const gui new dat.GUI();const cameraFolder gui.addFolder("相机属性设置");cameraFolder.add(cameraConfg, "fov", 0, 100).name(…...
计算机竞赛 图像识别-人脸识别与疲劳检测 - python opencv
文章目录 0 前言1 课题背景2 Dlib人脸识别2.1 简介2.2 Dlib优点2.3 相关代码2.4 人脸数据库2.5 人脸录入加识别效果 3 疲劳检测算法3.1 眼睛检测算法3.3 点头检测算法 4 PyQt54.1 简介4.2相关界面代码 5 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是…...
PHP8的字符串操作3-PHP8知识详解
今天继续分享字符串的操作,前面说到了字符串的去除空格和特殊字符,获取字符串的长度,截取字符串、检索字符串。 今天继续分享字符串的其他操作。如:替换字符串、分割和合成字符串。 5、替换字符串 替换字符串就是对指定字符串中…...
Unity VR:XR Interaction Toolkit 输入系统(Input System):获取手柄的输入
文章目录 📕教程说明📕Input System 和 XR Input Subsystem(推荐 Input System)📕Input Action Asset⭐Actions Maps⭐Actions⭐Action Properties🔍Action Type (Value, Button, Pass through) ⭐Binding …...
智慧工地一体化云平台源码:监管端、工地端、危大工程、智慧大屏、物联网、塔机、吊钩、升降机
智慧工地解决方案依托计算机技术、物联网、云计算、大数据、人工智能、VR&AR等技术相结合,为工程项目管理提供先进技术手段,构建工地现场智能监控和控制体系,弥补传统方法在监管中的缺陷,最终实现项目对人、机、料、法、环的全…...
C# 表达式体方法 C#算阶乘
//表达式体方法private int Add(int a, int b) > a b;[Fact]public void Test(){var result1 Factorial(1);//1var result2 Factorial(2);//2var result3 Factorial(3);//6var result4 Factorial(4);//24var result5 Factorial(5);//120var result6 Add(100, 200);//…...
互联网发展历程:保护与隔离,防火墙的安全壁垒
互联网的快速发展,不仅带来了便利和连接,也引发了越来越多的安全威胁。在数字时代,保护数据和网络安全变得尤为重要。然而,在早期的网络中,安全问题常常让人担忧。 安全问题的困扰:网络威胁日益增加 随着互…...
基于IMX6ULLmini的linux裸机开发系列七:中断处理流程
中断上下文 cpu通过内核寄存器来运行指令并进行数据的读写处理的,它在进入中断前一个时刻的具体值,称为中断上下文 中断上下文是指CPU在进入中断之前保存的寄存器状态和其他相关信息。当CPU接收到中断请求时,它会保存当前正在执行的指令的状…...
Postman软件基本用法:浏览器复制请求信息并导入到软件从而测试、发送请求
本文介绍在浏览器中,获取网页中的某一个请求信息,并将其导入到Postman软件,并进行API请求测试的方法。 Postman是一款流行的API开发和测试工具,它提供了一个用户友好的界面,用于创建、测试、调试和文档化API。本文就介…...
react go实现用户历史登录列表页面
refer: http://ip-api.com/ 1.首先需要创建一个保存用户历史的登录的表,然后连接go 2.在用户登录的时候,获取用户的IP IP位置,在后端直接处理数据即可(不需要在前端传递数据) (1)增加路由&am…...
如何做好服务性能测试
一、什么是性能测试 新功能上线或切换底层数据库或扩容调优,根据实际业务场景的需要,做必要的性能压测,收集性能数据,作为上线的基准报告。 性能测试一般分一下几个阶段: 1. 性能测试 并发量小(jmeter 并…...
速通蓝桥杯嵌入式省一教程:(五)用按键和屏幕实现嵌入式交互系统
一个完整的嵌入式系统,包括任务执行部分和人机交互部分。在前四节中,我们已经讲解了LED、LCD和按键,用这三者就能够实现一个人机交互系统,也即搭建整个嵌入式系统的框架。在后续,只要将各个功能加入到这个交互系统中&a…...
虚拟拍摄,如何用stable diffusion制作自己的形象照?
最近收到了某活动的嘉宾邀请,我将分享: 主题:生成式人工智能的创新实践 简要描述:从品牌营销、智能体、数字内容创作、下一代社区范式等方面,分享LLM与图像等生成式模型的落地应用与实践经验。 领域/研究方向ÿ…...
业务系统对接大模型的基础方案:架构设计与关键步骤
业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...
idea大量爆红问题解决
问题描述 在学习和工作中,idea是程序员不可缺少的一个工具,但是突然在有些时候就会出现大量爆红的问题,发现无法跳转,无论是关机重启或者是替换root都无法解决 就是如上所展示的问题,但是程序依然可以启动。 问题解决…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
