JavaWeb_LeadNews_Day6-Kafka
JavaWeb_LeadNews_Day6-Kafka
- Kafka
- 概述
- 安装配置
- kafka入门
- kafka高可用方案
- kafka详解
- 生产者同步异步发送消息
- 生产者参数配置
- 消费者同步异步提交偏移量
- SpringBoot集成kafka
- 自媒体文章上下架
- 实现思路
- 具体实现
- 来源
- Gitee
Kafka
概述
- 对比
- 选择
- 介绍
- producer: 发布消息的对象称之为主题生产者 (Kafka topic producer)
- topic: Kafka将消息分门别类,每一类的消息称之为一个主题 (Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者 (consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群,集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅个或多个主题 (topic),并从Broker拉数据,从而消费这些已发布的消息
安装配置
- 安装zookeeper
// 下载zookeeper镜像 docker pull zookeeper:3.4.14 // 创建容器 docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
- 安装kafka
// 下载kafka镜像 docker pull wurstmeister/kafka:2.12-2.3.1 // 创建容器 docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.174.133 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.174.133:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.174.133:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1// 解释 --net=host,直接使用容器宿主机的网络命名空间,即没有独立的网络环境。它使用宿主机的ip和端口(云主机会不好使)
kafka入门
- 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId> </dependency>
- Producer
public class ProducerQuickStart {public static void main(String[] args) {// 1. kafka链接配置信息Properties prop = new Properties();// 1.1 kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建kafka生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(prop);// 3. 发送信息// 参数列表: topic, key, valueProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key1", "Hello Kafka!");producer.send(record);// 4. 关闭消息通道// 必须关闭, 否则消息发送bucgproducer.close();} }
- Consumer
public class ConsumerQuickStart {public static void main(String[] args) {// 1. kafka的配置信息Properties prop = new Properties();// 1.1 链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 1.3 设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 2. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 3. 订阅主题consumer.subscribe(Collections.singleton("topic-first"));// 4. 拉取信息while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());}}} }
- 总结
- 同一组只有一个消费者能够接收到消息, 如果需要所有消费者都能接收到消息, 需要消费者在不同的组
kafka高可用方案
-
集群
-
备份
kafka定义了两类副本:- 领导者副本
- 追随者副本
数据在领导者副本存储后, 会同步到追随者副本
同步方式
leader失效后, 选择leader的原则- 优先从ISR中选取, 因为ISR的数据和leader是同步的.
- ISR中的follower都不行了, 就从其他的follower中选取.
- 当所有的follower都失效了, 第一种是等待ISR中的follower活过来, 数据可靠, 但等待时间不确定, 第二种是等待任意follower活过来, 最快速度恢复可用性, 但数据不一定完整.
kafka详解
生产者同步异步发送消息
// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.offset());// 异步发送
producer.send(record, new Callback(){@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null) {System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});
生产者参数配置
- 消息确认
确认机制 说明 acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 acks=1(默认值) 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 acks=all 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 prop.put(ProducerConfig.ACKS_CONFIG, "all");
- 消息重传
设置消息重传次数, 默认每次重试之间等待100msprop.put(ProducerConfig.RETRIES_CONFIG, 10);
- 消息压缩
默认情况, 消息发送不会压缩
使用压缩可以降低网络传输开销和存储开销, 而这往往是向kafka发送消息的瓶颈所在压缩算法 说明 snappy 占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果看重性能和网络带宽,建议采用 lz4 占用较少的 CPU,压缩和解压缩速度较快,压缩比也很客观 gzip 占用较多的CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
消费者同步异步提交偏移量
// 同步提交偏移量
consumer.commitSync();// 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback(){@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录错误的提交偏移量"+map+", 异常信息为"+e);}}
});// 同步异步提交
try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());System.out.println(record.partition());System.out.println(record.offset());}// 异步提交偏移量consumer.commitAsync();}
} catch (Exception e) {e.printStackTrace();System.out.println("记录错误的信息:"+e);
}finally {// 同步consumer.commitSync();
}
SpringBoot集成kafka
- 依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency> <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId> </dependency>
- 配置
server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- Producer
@RestController public class HelloController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("itcast-topic", "黑马程序员");return "ok";} }
- Consumer
@Component public class HelloListener {@KafkaListener(topics = "itcast-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}} }
- 传递对象
// Producer User user = new User(); user.setName("tom"); user.setAge(18); kafkaTemplate.send("itcast-topic", JSON.toJSONString(user));// Consumer System.out.println(JSON.parseObject(message, User.class));
自媒体文章上下架
实现思路
具体实现
- Producer
public ResponseResult downOrUp(WmNewsDto dto) {// 1. 检验参数// 1.0 检查文章dto是否为空if(dto == null){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, "不可缺少");}// 1.1 检查文章上架参数是否合法if(dto.getEnable() != 0 && dto.getEnabl!= 1){// 默认上架dto.setEnable((short) 1);}// 2. 查询文章WmNews news = getById(dto.getId());if(news == null){return ResponseResult.errorRe(AppHttpCodeEnum.DATA_NOT_EXIST, 存在");}// 3. 查询文章状态if(news.getStatus() != WmNews.StaPUBLISHED.getCode()){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, 章不是发布状态, 不能上下架");}// 4. 上下架news.setEnable(dto.getEnable());updateById(new// 5. 发送消息, 通知article修改文章的配置if(news.getArticleId() != null){HashMap<String, Object> map = HashMap<>();map.put("articleId", news.getArtic());map.put("enable", news.getEnable());kafkaTemplate.(WmNewsMessageConstaWM_NEWS_UP_OR_DOWN_TOPIC, JtoJSONString(map));return ResponseResult.okRe(AppHttpCodeEnum.SUCCESS); }
- Consumer
// Listener
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message)
{if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);}
}// Service
public void updateByMap(Map map) {// 0 下架, 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}// 修改文章update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId, map.get("articleId")).set(ApArticleConfig::getIsDown, isDown));
}
来源
黑马程序员. 黑马头条
Gitee
https://gitee.com/yu-ba-ba-ba/leadnews
相关文章:

JavaWeb_LeadNews_Day6-Kafka
JavaWeb_LeadNews_Day6-Kafka Kafka概述安装配置kafka入门kafka高可用方案kafka详解生产者同步异步发送消息生产者参数配置消费者同步异步提交偏移量 SpringBoot集成kafka 自媒体文章上下架实现思路具体实现 来源Gitee Kafka 概述 对比 选择 介绍 producer: 发布消息的对象称…...

ATTCK覆盖度97.1%!360终端安全管理系统获赛可达认证
近日,国际知名第三方网络安全检测服务机构——赛可达实验室(SKD Labs)发布最新测试报告,360终端安全管理系统以ATT&CK V12框架攻击技术覆盖面377个、覆盖度97.1%,勒索病毒、挖矿病毒检出率100%,误报率0…...

透视俄乌网络战之一:数据擦除软件
数据擦除破坏 1. WhisperGate2. HermeticWiper3. IsaacWiper4. WhisperKill5. CaddyWiper6. DoubleZero7. AcidRain8. RURansom 数据是政府、社会和企业组织运行的关键要素。数据擦除软件可以在不留任何痕迹的情况下擦除数据并阻止操作系统恢复摧,达到摧毁或目标系统…...

微服务中间件--Nacos
Nacos 1. Nacos入门a.服务注册到Nacosb.Nacos服务分级存储模型c.NacosRule负载均衡d.服务实例的权重设置e.环境隔离 - namespacef.Nacos和Eureka的对比 2. Nacos配置管理a.统一配置管理b.配置热更新c.多环境配置共享 1. Nacos入门 Nacos是阿里巴巴的产品,现在是Spr…...
驱动开发点亮led灯
头文件 #ifndef __HEAD_H__ #define __HEAD_H__#define PHY_LED_MODER 0X50006000 #define PHY_LED_ODR 0X50006014 #define PHY_LED_RCC 0X50000A28 #define PHY_LED_FMODER 0X50007000 #define PHY_LED_FODR 0X50007014#endif驱动代码 #include <linux/init.h> #incl…...

回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图)
回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图) 目录 回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图…...

数学建模之“TOPSIS数学模型”原理和代码详解
一、简介 TOPSIS(Technique for Order Preference by Similarity to Ideal Solution)是一种多准则决策分析方法,用于解决多个候选方案之间的排序和选择问题。它基于一种数学模型,通过比较每个候选方案与理想解和负理想解之间的相…...

threejs使用gui改变相机的参数
调节相机远近角度 定义相机的配置: const cameraConfg reactive({ fov: 45 }) gui中加入调节fov的方法 const gui new dat.GUI();const cameraFolder gui.addFolder("相机属性设置");cameraFolder.add(cameraConfg, "fov", 0, 100).name(…...

计算机竞赛 图像识别-人脸识别与疲劳检测 - python opencv
文章目录 0 前言1 课题背景2 Dlib人脸识别2.1 简介2.2 Dlib优点2.3 相关代码2.4 人脸数据库2.5 人脸录入加识别效果 3 疲劳检测算法3.1 眼睛检测算法3.3 点头检测算法 4 PyQt54.1 简介4.2相关界面代码 5 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是…...

PHP8的字符串操作3-PHP8知识详解
今天继续分享字符串的操作,前面说到了字符串的去除空格和特殊字符,获取字符串的长度,截取字符串、检索字符串。 今天继续分享字符串的其他操作。如:替换字符串、分割和合成字符串。 5、替换字符串 替换字符串就是对指定字符串中…...

Unity VR:XR Interaction Toolkit 输入系统(Input System):获取手柄的输入
文章目录 📕教程说明📕Input System 和 XR Input Subsystem(推荐 Input System)📕Input Action Asset⭐Actions Maps⭐Actions⭐Action Properties🔍Action Type (Value, Button, Pass through) ⭐Binding …...

智慧工地一体化云平台源码:监管端、工地端、危大工程、智慧大屏、物联网、塔机、吊钩、升降机
智慧工地解决方案依托计算机技术、物联网、云计算、大数据、人工智能、VR&AR等技术相结合,为工程项目管理提供先进技术手段,构建工地现场智能监控和控制体系,弥补传统方法在监管中的缺陷,最终实现项目对人、机、料、法、环的全…...
C# 表达式体方法 C#算阶乘
//表达式体方法private int Add(int a, int b) > a b;[Fact]public void Test(){var result1 Factorial(1);//1var result2 Factorial(2);//2var result3 Factorial(3);//6var result4 Factorial(4);//24var result5 Factorial(5);//120var result6 Add(100, 200);//…...

互联网发展历程:保护与隔离,防火墙的安全壁垒
互联网的快速发展,不仅带来了便利和连接,也引发了越来越多的安全威胁。在数字时代,保护数据和网络安全变得尤为重要。然而,在早期的网络中,安全问题常常让人担忧。 安全问题的困扰:网络威胁日益增加 随着互…...

基于IMX6ULLmini的linux裸机开发系列七:中断处理流程
中断上下文 cpu通过内核寄存器来运行指令并进行数据的读写处理的,它在进入中断前一个时刻的具体值,称为中断上下文 中断上下文是指CPU在进入中断之前保存的寄存器状态和其他相关信息。当CPU接收到中断请求时,它会保存当前正在执行的指令的状…...

Postman软件基本用法:浏览器复制请求信息并导入到软件从而测试、发送请求
本文介绍在浏览器中,获取网页中的某一个请求信息,并将其导入到Postman软件,并进行API请求测试的方法。 Postman是一款流行的API开发和测试工具,它提供了一个用户友好的界面,用于创建、测试、调试和文档化API。本文就介…...
react go实现用户历史登录列表页面
refer: http://ip-api.com/ 1.首先需要创建一个保存用户历史的登录的表,然后连接go 2.在用户登录的时候,获取用户的IP IP位置,在后端直接处理数据即可(不需要在前端传递数据) (1)增加路由&am…...

如何做好服务性能测试
一、什么是性能测试 新功能上线或切换底层数据库或扩容调优,根据实际业务场景的需要,做必要的性能压测,收集性能数据,作为上线的基准报告。 性能测试一般分一下几个阶段: 1. 性能测试 并发量小(jmeter 并…...

速通蓝桥杯嵌入式省一教程:(五)用按键和屏幕实现嵌入式交互系统
一个完整的嵌入式系统,包括任务执行部分和人机交互部分。在前四节中,我们已经讲解了LED、LCD和按键,用这三者就能够实现一个人机交互系统,也即搭建整个嵌入式系统的框架。在后续,只要将各个功能加入到这个交互系统中&a…...

虚拟拍摄,如何用stable diffusion制作自己的形象照?
最近收到了某活动的嘉宾邀请,我将分享: 主题:生成式人工智能的创新实践 简要描述:从品牌营销、智能体、数字内容创作、下一代社区范式等方面,分享LLM与图像等生成式模型的落地应用与实践经验。 领域/研究方向ÿ…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...

STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...

零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...

算法笔记2
1.字符串拼接最好用StringBuilder,不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...
站群服务器的应用场景都有哪些?
站群服务器主要是为了多个网站的托管和管理所设计的,可以通过集中管理和高效资源的分配,来支持多个独立的网站同时运行,让每一个网站都可以分配到独立的IP地址,避免出现IP关联的风险,用户还可以通过控制面板进行管理功…...

Elastic 获得 AWS 教育 ISV 合作伙伴资质,进一步增强教育解决方案产品组合
作者:来自 Elastic Udayasimha Theepireddy (Uday), Brian Bergholm, Marianna Jonsdottir 通过搜索 AI 和云创新推动教育领域的数字化转型。 我们非常高兴地宣布,Elastic 已获得 AWS 教育 ISV 合作伙伴资质。这一重要认证表明,Elastic 作为 …...
如何通过git命令查看项目连接的仓库地址?
要通过 Git 命令查看项目连接的仓库地址,您可以使用以下几种方法: 1. 查看所有远程仓库地址 使用 git remote -v 命令,它会显示项目中配置的所有远程仓库及其对应的 URL: git remote -v输出示例: origin https://…...

高抗扰度汽车光耦合器的特性
晶台光电推出的125℃光耦合器系列产品(包括KL357NU、KL3H7U和KL817U),专为高温环境下的汽车应用设计,具备以下核心优势和技术特点: 一、技术特性分析 高温稳定性 采用先进的LED技术和优化的IC设计,确保在…...
【2D与3D SLAM中的扫描匹配算法全面解析】
引言 扫描匹配(Scan Matching)是同步定位与地图构建(SLAM)系统中的核心组件,它通过对齐连续的传感器观测数据来估计机器人的运动。本文将深入探讨2D和3D SLAM中的各种扫描匹配算法,包括数学原理、实现细节以及实际应用中的性能对比,特别关注…...