JavaWeb_LeadNews_Day6-Kafka
JavaWeb_LeadNews_Day6-Kafka
- Kafka
- 概述
- 安装配置
- kafka入门
- kafka高可用方案
- kafka详解
- 生产者同步异步发送消息
- 生产者参数配置
- 消费者同步异步提交偏移量
- SpringBoot集成kafka
- 自媒体文章上下架
- 实现思路
- 具体实现
- 来源
- Gitee
Kafka
概述
- 对比

- 选择

- 介绍
- producer: 发布消息的对象称之为主题生产者 (Kafka topic producer)
- topic: Kafka将消息分门别类,每一类的消息称之为一个主题 (Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者 (consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群,集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅个或多个主题 (topic),并从Broker拉数据,从而消费这些已发布的消息
安装配置
- 安装zookeeper
// 下载zookeeper镜像 docker pull zookeeper:3.4.14 // 创建容器 docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14 - 安装kafka
// 下载kafka镜像 docker pull wurstmeister/kafka:2.12-2.3.1 // 创建容器 docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.174.133 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.174.133:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.174.133:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1// 解释 --net=host,直接使用容器宿主机的网络命名空间,即没有独立的网络环境。它使用宿主机的ip和端口(云主机会不好使)
kafka入门
- 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId> </dependency> - Producer
public class ProducerQuickStart {public static void main(String[] args) {// 1. kafka链接配置信息Properties prop = new Properties();// 1.1 kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建kafka生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(prop);// 3. 发送信息// 参数列表: topic, key, valueProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key1", "Hello Kafka!");producer.send(record);// 4. 关闭消息通道// 必须关闭, 否则消息发送bucgproducer.close();} } - Consumer
public class ConsumerQuickStart {public static void main(String[] args) {// 1. kafka的配置信息Properties prop = new Properties();// 1.1 链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 1.3 设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 2. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 3. 订阅主题consumer.subscribe(Collections.singleton("topic-first"));// 4. 拉取信息while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());}}} } - 总结
- 同一组只有一个消费者能够接收到消息, 如果需要所有消费者都能接收到消息, 需要消费者在不同的组
kafka高可用方案
-
集群

-
备份

kafka定义了两类副本:- 领导者副本
- 追随者副本
数据在领导者副本存储后, 会同步到追随者副本

同步方式
leader失效后, 选择leader的原则- 优先从ISR中选取, 因为ISR的数据和leader是同步的.
- ISR中的follower都不行了, 就从其他的follower中选取.
- 当所有的follower都失效了, 第一种是等待ISR中的follower活过来, 数据可靠, 但等待时间不确定, 第二种是等待任意follower活过来, 最快速度恢复可用性, 但数据不一定完整.
kafka详解
生产者同步异步发送消息
// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.offset());// 异步发送
producer.send(record, new Callback(){@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null) {System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});
生产者参数配置
- 消息确认
确认机制 说明 acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 acks=1(默认值) 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 acks=all 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 prop.put(ProducerConfig.ACKS_CONFIG, "all"); - 消息重传
设置消息重传次数, 默认每次重试之间等待100msprop.put(ProducerConfig.RETRIES_CONFIG, 10); - 消息压缩
默认情况, 消息发送不会压缩
使用压缩可以降低网络传输开销和存储开销, 而这往往是向kafka发送消息的瓶颈所在压缩算法 说明 snappy 占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果看重性能和网络带宽,建议采用 lz4 占用较少的 CPU,压缩和解压缩速度较快,压缩比也很客观 gzip 占用较多的CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
消费者同步异步提交偏移量
// 同步提交偏移量
consumer.commitSync();// 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback(){@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录错误的提交偏移量"+map+", 异常信息为"+e);}}
});// 同步异步提交
try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());System.out.println(record.partition());System.out.println(record.offset());}// 异步提交偏移量consumer.commitAsync();}
} catch (Exception e) {e.printStackTrace();System.out.println("记录错误的信息:"+e);
}finally {// 同步consumer.commitSync();
}
SpringBoot集成kafka
- 依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency> <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId> </dependency> - 配置
server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer - Producer
@RestController public class HelloController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("itcast-topic", "黑马程序员");return "ok";} } - Consumer
@Component public class HelloListener {@KafkaListener(topics = "itcast-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}} } - 传递对象
// Producer User user = new User(); user.setName("tom"); user.setAge(18); kafkaTemplate.send("itcast-topic", JSON.toJSONString(user));// Consumer System.out.println(JSON.parseObject(message, User.class));
自媒体文章上下架
实现思路

具体实现
- Producer
public ResponseResult downOrUp(WmNewsDto dto) {// 1. 检验参数// 1.0 检查文章dto是否为空if(dto == null){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, "不可缺少");}// 1.1 检查文章上架参数是否合法if(dto.getEnable() != 0 && dto.getEnabl!= 1){// 默认上架dto.setEnable((short) 1);}// 2. 查询文章WmNews news = getById(dto.getId());if(news == null){return ResponseResult.errorRe(AppHttpCodeEnum.DATA_NOT_EXIST, 存在");}// 3. 查询文章状态if(news.getStatus() != WmNews.StaPUBLISHED.getCode()){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, 章不是发布状态, 不能上下架");}// 4. 上下架news.setEnable(dto.getEnable());updateById(new// 5. 发送消息, 通知article修改文章的配置if(news.getArticleId() != null){HashMap<String, Object> map = HashMap<>();map.put("articleId", news.getArtic());map.put("enable", news.getEnable());kafkaTemplate.(WmNewsMessageConstaWM_NEWS_UP_OR_DOWN_TOPIC, JtoJSONString(map));return ResponseResult.okRe(AppHttpCodeEnum.SUCCESS); } - Consumer
// Listener
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message)
{if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);}
}// Service
public void updateByMap(Map map) {// 0 下架, 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}// 修改文章update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId, map.get("articleId")).set(ApArticleConfig::getIsDown, isDown));
}
来源
黑马程序员. 黑马头条
Gitee
https://gitee.com/yu-ba-ba-ba/leadnews
相关文章:
JavaWeb_LeadNews_Day6-Kafka
JavaWeb_LeadNews_Day6-Kafka Kafka概述安装配置kafka入门kafka高可用方案kafka详解生产者同步异步发送消息生产者参数配置消费者同步异步提交偏移量 SpringBoot集成kafka 自媒体文章上下架实现思路具体实现 来源Gitee Kafka 概述 对比 选择 介绍 producer: 发布消息的对象称…...
ATTCK覆盖度97.1%!360终端安全管理系统获赛可达认证
近日,国际知名第三方网络安全检测服务机构——赛可达实验室(SKD Labs)发布最新测试报告,360终端安全管理系统以ATT&CK V12框架攻击技术覆盖面377个、覆盖度97.1%,勒索病毒、挖矿病毒检出率100%,误报率0…...
透视俄乌网络战之一:数据擦除软件
数据擦除破坏 1. WhisperGate2. HermeticWiper3. IsaacWiper4. WhisperKill5. CaddyWiper6. DoubleZero7. AcidRain8. RURansom 数据是政府、社会和企业组织运行的关键要素。数据擦除软件可以在不留任何痕迹的情况下擦除数据并阻止操作系统恢复摧,达到摧毁或目标系统…...
微服务中间件--Nacos
Nacos 1. Nacos入门a.服务注册到Nacosb.Nacos服务分级存储模型c.NacosRule负载均衡d.服务实例的权重设置e.环境隔离 - namespacef.Nacos和Eureka的对比 2. Nacos配置管理a.统一配置管理b.配置热更新c.多环境配置共享 1. Nacos入门 Nacos是阿里巴巴的产品,现在是Spr…...
驱动开发点亮led灯
头文件 #ifndef __HEAD_H__ #define __HEAD_H__#define PHY_LED_MODER 0X50006000 #define PHY_LED_ODR 0X50006014 #define PHY_LED_RCC 0X50000A28 #define PHY_LED_FMODER 0X50007000 #define PHY_LED_FODR 0X50007014#endif驱动代码 #include <linux/init.h> #incl…...
回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图)
回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图) 目录 回归预测 | MATLAB实现IPSO-SVM改进粒子群优化算法优化支持向量机多输入单输出回归预测(多指标,多图…...
数学建模之“TOPSIS数学模型”原理和代码详解
一、简介 TOPSIS(Technique for Order Preference by Similarity to Ideal Solution)是一种多准则决策分析方法,用于解决多个候选方案之间的排序和选择问题。它基于一种数学模型,通过比较每个候选方案与理想解和负理想解之间的相…...
threejs使用gui改变相机的参数
调节相机远近角度 定义相机的配置: const cameraConfg reactive({ fov: 45 }) gui中加入调节fov的方法 const gui new dat.GUI();const cameraFolder gui.addFolder("相机属性设置");cameraFolder.add(cameraConfg, "fov", 0, 100).name(…...
计算机竞赛 图像识别-人脸识别与疲劳检测 - python opencv
文章目录 0 前言1 课题背景2 Dlib人脸识别2.1 简介2.2 Dlib优点2.3 相关代码2.4 人脸数据库2.5 人脸录入加识别效果 3 疲劳检测算法3.1 眼睛检测算法3.3 点头检测算法 4 PyQt54.1 简介4.2相关界面代码 5 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是…...
PHP8的字符串操作3-PHP8知识详解
今天继续分享字符串的操作,前面说到了字符串的去除空格和特殊字符,获取字符串的长度,截取字符串、检索字符串。 今天继续分享字符串的其他操作。如:替换字符串、分割和合成字符串。 5、替换字符串 替换字符串就是对指定字符串中…...
Unity VR:XR Interaction Toolkit 输入系统(Input System):获取手柄的输入
文章目录 📕教程说明📕Input System 和 XR Input Subsystem(推荐 Input System)📕Input Action Asset⭐Actions Maps⭐Actions⭐Action Properties🔍Action Type (Value, Button, Pass through) ⭐Binding …...
智慧工地一体化云平台源码:监管端、工地端、危大工程、智慧大屏、物联网、塔机、吊钩、升降机
智慧工地解决方案依托计算机技术、物联网、云计算、大数据、人工智能、VR&AR等技术相结合,为工程项目管理提供先进技术手段,构建工地现场智能监控和控制体系,弥补传统方法在监管中的缺陷,最终实现项目对人、机、料、法、环的全…...
C# 表达式体方法 C#算阶乘
//表达式体方法private int Add(int a, int b) > a b;[Fact]public void Test(){var result1 Factorial(1);//1var result2 Factorial(2);//2var result3 Factorial(3);//6var result4 Factorial(4);//24var result5 Factorial(5);//120var result6 Add(100, 200);//…...
互联网发展历程:保护与隔离,防火墙的安全壁垒
互联网的快速发展,不仅带来了便利和连接,也引发了越来越多的安全威胁。在数字时代,保护数据和网络安全变得尤为重要。然而,在早期的网络中,安全问题常常让人担忧。 安全问题的困扰:网络威胁日益增加 随着互…...
基于IMX6ULLmini的linux裸机开发系列七:中断处理流程
中断上下文 cpu通过内核寄存器来运行指令并进行数据的读写处理的,它在进入中断前一个时刻的具体值,称为中断上下文 中断上下文是指CPU在进入中断之前保存的寄存器状态和其他相关信息。当CPU接收到中断请求时,它会保存当前正在执行的指令的状…...
Postman软件基本用法:浏览器复制请求信息并导入到软件从而测试、发送请求
本文介绍在浏览器中,获取网页中的某一个请求信息,并将其导入到Postman软件,并进行API请求测试的方法。 Postman是一款流行的API开发和测试工具,它提供了一个用户友好的界面,用于创建、测试、调试和文档化API。本文就介…...
react go实现用户历史登录列表页面
refer: http://ip-api.com/ 1.首先需要创建一个保存用户历史的登录的表,然后连接go 2.在用户登录的时候,获取用户的IP IP位置,在后端直接处理数据即可(不需要在前端传递数据) (1)增加路由&am…...
如何做好服务性能测试
一、什么是性能测试 新功能上线或切换底层数据库或扩容调优,根据实际业务场景的需要,做必要的性能压测,收集性能数据,作为上线的基准报告。 性能测试一般分一下几个阶段: 1. 性能测试 并发量小(jmeter 并…...
速通蓝桥杯嵌入式省一教程:(五)用按键和屏幕实现嵌入式交互系统
一个完整的嵌入式系统,包括任务执行部分和人机交互部分。在前四节中,我们已经讲解了LED、LCD和按键,用这三者就能够实现一个人机交互系统,也即搭建整个嵌入式系统的框架。在后续,只要将各个功能加入到这个交互系统中&a…...
虚拟拍摄,如何用stable diffusion制作自己的形象照?
最近收到了某活动的嘉宾邀请,我将分享: 主题:生成式人工智能的创新实践 简要描述:从品牌营销、智能体、数字内容创作、下一代社区范式等方面,分享LLM与图像等生成式模型的落地应用与实践经验。 领域/研究方向ÿ…...
css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...
学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...
Web中间件--tomcat学习
Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机,它可以执行Java字节码。Java虚拟机是Java平台的一部分,Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...
Oracle11g安装包
Oracle 11g安装包 适用于windows系统,64位 下载路径 oracle 11g 安装包...
MFE(微前端) Module Federation:Webpack.config.js文件中每个属性的含义解释
以Module Federation 插件详为例,Webpack.config.js它可能的配置和含义如下: 前言 Module Federation 的Webpack.config.js核心配置包括: name filename(定义应用标识) remotes(引用远程模块࿰…...
C# winform教程(二)----checkbox
一、作用 提供一个用户选择或者不选的状态,这是一个可以多选的控件。 二、属性 其实功能大差不差,除了特殊的几个外,与button基本相同,所有说几个独有的 checkbox属性 名称内容含义appearance控件外观可以变成按钮形状checkali…...
