当前位置: 首页 > news >正文

kafka微服务学习

消息中间件对比:
1、吞吐、可靠性、性能
在这里插入图片描述

Kafka安装

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

  • Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

kafka入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,多个消费者都可以接收到消息

(1)创建kafka-demo项目,导入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

(2)生产者发送消息

package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送的消息ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");//3.发送消息producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功producer.close();}}

(3)消费者接收消息

package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.订阅主题consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

kafka高可用设计

1、设计集群模式:

Kafka的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个Broker 组成。当一个机器宕机了,另外一个机器就会替补山
在这里插入图片描述

2、备份机制:

Kafka定义了两类副本

  1. 领导者副本(Leader Replica)
  2. 追随者副本 (Follower Replica)
    追随者副本分为两类:
    1、一种是ISR副本,同步保存
    2、普通的副本,异步保存
    出现主节点宕机,会先选ISR副本中的一个成为新的主节点,保证数据一致性,没有ISR节点,再从普通节点中挑选
    针对全部节点宕机的情况,有两种策略:
    1、等待第一个ISR副本,保证了数据的尽可能一致
    2、等待一个复活的追随者,无论是ISR还是普通,提高系统的高可用性。

kafka生产者详解

1发送类型

  • 同步发送

    使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
  • 异步发送

    调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

//异步消息发送
producer.send(kvProducerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});

2参数详解

  • ack

代码的配置方式:

//ack配置  消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");

参数的选择说明

确认机制说明
acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
  • retries

生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

代码中配置方式:

//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
  • 消息压缩

默认情况下, 消息发送时不会被压缩。

代码中配置方式:

//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
压缩算法说明
snappy占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用
lz4占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观
gzip占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

kafka消费者

消息的有序性

方法:一个topic分区能保证自己的数据是按照先后消费的,但是不能保证跨分区消息处理的先后顺序。我么只能使用一个分区,在单分区种,消息可以保证严格顺序消费

提交和偏移量

在这里插入图片描述
自动提交:
当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll0方法接收的最大偏移量提交上去,这样只是记录了规定时间内的最大偏移量,其实与数据提交的偏移量存在偏差,因此可能会出现数据的重复提交或者丢失
手动提交
当enableauto.commit被设置为false可以有以下三种提交方式

  • 提交当前偏移量(同步提交)
  • 异步提交
  • 同步和异步组合提交

同步提交:commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println("记录提交失败的异常:"+e);}}
}

异步提交:手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。消息没有重试机制

while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);}}});
}

同步和异步组合提交

异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖

举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

try {while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();}
}catch (Exception e){+e.printStackTrace();System.out.println("记录错误信息:"+e);
}finally {try {consumer.commitSync();}finally {consumer.close();}
}

springboot整合kafka

1、在父类中的pop文件中导入依赖包

```xml
<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2、在需要用到kafka的微服务的naco中分别配置生产者和消费者配置

spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
@GetMapping("/hello")
public String hello(){User user = new User();user.setUsername("xiaowang");user.setAge(18);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "ok";
}
  • 接收消息
package com.heima.kafka.listener;import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;@Component
public class HelloListener {@KafkaListener(topics = "user-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user = JSON.parseObject(message, User.class);System.out.println(user);}}
}

相关文章:

kafka微服务学习

消息中间件对比&#xff1a; 1、吞吐、可靠性、性能 Kafka安装 Kafka对于zookeeper是强依赖&#xff0c;保存kafka相关的节点数据&#xff0c;所以安装Kafka之前必须先安装zookeeper Docker安装zookeeper 下载镜像&#xff1a; docker pull zookeeper:3.4.14创建容器 do…...

5G网络切片,到底是什么?

网络切片&#xff0c;是5G引入的一个全新概念。 一看到切片&#xff0c;首先想到的&#xff0c;必然是把一个完整的东西切成薄片。于是&#xff0c;切面包或者切西瓜这样的画面&#xff0c;映入脑海。 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 然而…...

linux安装nodejs

写在前面 因为工作需要&#xff0c;需要使用到nodejs&#xff0c;所以这里简单记录下学习过程。 1&#xff1a;安装 wget https://nodejs.org/dist/v14.17.4/node-v14.17.4-linux-x64.tar.xz tar xf node-v14.17.4-linux-x64.tar.xz mkdir /usr/local/lib/node // 这一步骤根…...

第1天:Python基础语法(一)

** 1、Python简介 ** Python是一种高级、通用的编程语言&#xff0c;由Guido van Rossum于1989年创造。它被设计为易于阅读和理解&#xff0c;具有简洁而清晰的语法&#xff0c;使得初学者和专业开发人员都能够轻松上手。 Python拥有丰富的标准库&#xff0c;提供了广泛的功…...

ppt聚光灯效果

1.放入三张图片内容或其他 2.全选复制成图片 3.设置黑色矩形&#xff0c;透明度30% 4.粘贴复制后的图片&#xff0c;制定图层 5.插入椭圆&#xff0c;先选中矩形&#xff0c;再选中椭圆&#xff0c;点击绘图工具&#xff0c;选择相交即可&#xff08;关键&#xff09;...

图文解析 Nacos 配置中心的实现

目录 一、什么是 Nacos 二、配置中心的架构 三、Nacos 使用示例 &#xff08;一&#xff09;官方代码示例 &#xff08;二&#xff09;Properties 解读 &#xff08;三&#xff09;配置项的层级设计 &#xff08;四&#xff09;获取配置 &#xff08;五&#xff09;注册…...

P1918 保龄球

Portal. 记录每一个瓶子数对应的位置即可。 注意到值域很大&#xff08; a i ≤ 1 0 9 a_i\leq 10^9 ai​≤109&#xff09;&#xff0c;要用 map 存储。 #include <bits/stdc.h> using namespace std;map<int,int> p;int main() {int n;cin>>n;for(int i…...

SAP-PP-报错:工作中心 7333_JQ 工厂 7331 对任务清单类型 N 不存在

创建工艺路线时报错&#xff1a;工作中心 7333_JQ 工厂 7331 对任务清单类型 N 不存在&#xff0c; 这是因为在创建工作中心时未维护控制键值导致的...

MySQL -- 用户管理

MySQL – 用户管理 文章目录 MySQL -- 用户管理一、用户1.用户信息2.创建用户3.删除用户4.远端登录MySQL5.修改用户密码6.数据库的权限 一、用户 1.用户信息 MySQL中的用户&#xff0c;都存储在系统数据库mysql的user表中&#xff1a; host&#xff1a; 表示这个用户可以从…...

IOS浏览器不支持对element ui table的宽度设置百分比

IOS浏览器不支持对element ui table的宽度设置百分比 IOS浏览器会把百分号识别成px&#xff0c;所以我们可以根据屏幕宽度将百分比转换成px getColumnWidth(data) {const screenWidth window.innerWidth;const desiredPercentage data;const widthInPixels (screenWidth *…...

Vue+OpenLayers 创建地图并显示鼠标所在经纬度

1、效果 2、创建地图 本文用的是高德地图 页面 <div class"map" id"map"></div><div id"mouse-position" class"position_coordinate"></div>初始化地图 var gaodeLayer new TileLayer({title: "高德地…...

01-编码-H264编码原理

1.整体概念 编码的含义就是压缩,将摄像头采集的YUV或RGB数据压缩成H264。 压缩的过程就是去除信息冗余的过程,一般视频有如下的冗余信息。 (1)空间冗余:在同一个画面中,相邻的像素点之间的变化很小,因而可以用一个特定大小的矩阵来描述相邻的这些像素。 (2)时间冗余:…...

RxJava/RxAndroid的操作符使用(二)

文章目录 一、创建操作1、基本创建2、快速创建2.1 empty2.2 never2.3 error2.4 from2.5 just 3、定时与延时创建操作3.1 defer3.2 timer3.3 interval3.4 intervalRange3.5 range3.6 repeat 二、过滤操作1、skip/skipLast2、debounce3、distinct——去重4、elementAt——获取指定…...

【C语法学习】20 - 文件访问顺序

文章目录 0 前言1 文件位置指示符2 rewind()函数2.1 函数原型2.2 参数2.3 返回值2.4 使用说明 3 ftell()函数3.1 函数原型3.2 参数3.3 返回值 4 fseek()函数4.1 函数原型4.2 参数4.3 返回值 5 示例5.1 示例15.2 示例2 0 前言 C语言文件访问分为顺序文件访问和随机文件访问。 …...

Etcd 常用命令与备份恢复

1. etcd简介 官方网站&#xff1a;etcd.io 官方文档&#xff1a;etcd.io/docs/v3.5/op-guide/maintenance 官方硬件推荐&#xff1a;etcd.io/docs/v3.5/op-guide/hardware github地址&#xff1a;github.com/etcd-io/etcd etcd是CoreOS团队于2013年6月发起的开源项目&#xf…...

获取任意时间段内周、季度、半年的二级联动

#需求是获取两个时间内 年周 、年季度、年半年的二级联动# 找了半天也找不到什么有用的信息 就自己简单写了一个 思路是先获取年的列表再去嵌套查询 根据前端VUE提供的格式嵌套 public function getDate(){$leixing Request::param(leixing);$larr array(1,2,3,4);if(empty(…...

前端面试系列之工程化篇

如果对前端八股文感兴趣&#xff0c;可以留意公重号&#xff1a;码农补给站&#xff0c;总有你要的干货。 前端工程化 Webpack 概念 本质上&#xff0c;webpack 是一个用于现代 JavaScript 应用程序的静态模块打包工具。当 webpack 处理应用程序时&#xff0c;它会在内部从一个…...

京东按关键词搜索商品列表接口:竞品分析,商品管理,营销策略制定

京东搜索商品列表接口是京东开放平台提供的一种API接口&#xff0c;通过调用该接口&#xff0c;开发者可以获取京东平台上商品的列表数据&#xff0c;包括商品的标题、价格、库存、月销量、总销量、详情描述、图片等信息。 接口的主要作用包括&#xff1a; 市场调研&#xff…...

Microsoft Dynamics 365 CE 扩展定制 - 9. Dynamics 365扩展

在本章中,我们将介绍以下内容: Dynamics 365应用程序Dynamics 365通用数据服务构建Dynamics 365 PowerApp使用Flow在CDS和Dynamics 365之间移动数据从AppSource安装解决方案使用数据导出服务解决方案进行数据复制从CRM数据构建Power BI仪表板简介 多年来,Dynamics CRM已从一…...

多篇论文介绍-Wiou

论文地址 目录 https://arxiv.org/pdf/2301.10051.pdf 01 CIEFRNet&#xff1a;面向高速公路的抛洒物检测算法 02改进 YOLOv5 的 PDC 钻头复合片缺损识别 03 基于SimAM注意力机制的DCN-YOLOv5水下目标检测 04 基于改进YOLOv7-tiny 算法的输电线路螺栓缺销检测 05 基于改…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留&#xff0c;CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制&#xff08;CCA-Attention&#xff09;&#xff0c;…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)

上一章用到了V2 的概念&#xff0c;其实 Fiori当中还有 V4&#xff0c;咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务)&#xff0c;代理中间件&#xff08;ui5-middleware-simpleproxy&#xff09;-CSDN博客…...

安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)

船舶制造装配管理现状&#xff1a;装配工作依赖人工经验&#xff0c;装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书&#xff0c;但在实际执行中&#xff0c;工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额&#xff0c;返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...