Kafka实战(Scala操作)
Kafka基础讲解部分
Kafka基础讲解部分
Kafka实战(Scala操作)
1、引入依赖
版本:
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spark.scala.version>2.12</spark.scala.version>
<spark.version>3.1.2</spark.version>
<spark.kafka.version>3.5.1</spark.kafka.version>
<kafka.version>2.8.0</kafka.version>
具体依赖:
<!-- spark-core -->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${spark.scala.version}</artifactId><version>${spark.version}</version>
</dependency><!-- spark-sql -->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${spark.scala.version}</artifactId><version>${spark.version}</version>
</dependency><!-- kafka -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version>
</dependency>
2、创建生产者(Producer)
一:生产者相关配置讲解:
-
BATCH_SIZE_CONFIG = "batch.size":批处理数量,消息为batch.size大小,生产者才会发送消息 -
LINGER_MS_CONFIG = "linger.ms":延迟时间,如果消息大小迟迟不为batch.size大小,则可以在指定的时间linger.ms后发送 -
RETRIES_CONFIG= "retry.count":重试次数,消息发送失败时,生产者可以再重试retry.count次数 -
ACKS_CONFIG= "acks":ack机制,生产者需要等待aks个副本成功写入消息后,才认为消息发送成功 -
acks一共有三个选项
- acks=0:生产者不需要等待来自服务器的任何确认。一旦消息被发送出去,生产者就认为消息已经成功发送。
- acks=1(默认值):生产者需要等待分区中的leader副本成功写入消息并返回确认后,才认为消息发送成功。
- acks=all 或 acks=-1:生产者需要等待ISR(In-Sync Replicas,同步副本集)中的所有副本都成功写入消息后,才认为消息发送成功。
-
KEY_SERIALIZER_CLASS_CONFIG:键序列化 -
VALUE_SERIALIZER_CLASS_CONFIG:值序列化
二:ProducerRecord讲解:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {this(topic, partition, timestamp, key, value, null);
}
基本讲解:Topic: 指定消息要发送到的目标主题(topic)的名称。Partition: 可选的分区号。如果未指定分区号,则Kafka会根据键(如果有的话)或者使用默认的分区策略来决定消息被发送到哪个分区。Timestamp: 可选的时间戳。指定消息的时间戳,通常用来记录消息的产生时间Key: 消息的键(optional)。在Kafka中,消息可以有一个可选的键,用于分区(partition)消息。键通常是一个字符串或者字节数组。Value: 消息的实际内容,可以是任何序列化的数据。
异步发送的普通生产者
在异步发送模式下,生产者调用send()方法发送消息后,不会立即等待服务器的响应,而是继续执行后续操作。
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}import java.util.Properties
import scala.util.Random// kafka生产者:将数据导入kafka中
object KafkaProducer {def main(args: Array[String]): Unit = {// 生产者相关配置val producerConf = new Properties()// 设置连接kafka(集群)配置【必配】producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single:9092")// 批处理数量(每10条处理一次)producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")// 延迟时间:如果消息大小迟迟不为batch.size大小,则可以在指定的时间(50ms)后发送producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")// 消息发送失败时,生产者可以重试的次数producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")// ack机制:生产者需要等待1个副本成功写入消息后,才认为消息发送成功producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")// 键值序列化producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer")producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")// 构造生产者producerval producer:KafkaProducer[Int,String] = new KafkaProducer(producerConf)// 发送消息val topic = "test02" // 指定主题(需存在)val rand = new Random()for(i<- 1 to 100000){// 封装待发送的消息val record: ProducerRecord[Int, String] = new ProducerRecord[Int, String](topic, 0, System.currentTimeMillis(), i, "test-"+i)// 向生产者发送记录producer.send(record)Thread.sleep(5+rand.nextInt(20))}// 强制将所有待发送的消息立即发送到 Kafka 集群,而不需要等待缓冲区填满或达到任何延迟阈值producer.flush()// 关闭并释放资源producer.close()}
}
异步发送的带回调函数生产者
在异步发送模式下,生产者调用send()方法发送消息后,不会立即等待服务器的响应,而是继续执行后续操作,带回调函数生产者可以在控制台中看到发送成功或失败信息。
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}import java.util.Properties
import scala.util.Randomimport java.util.Properties// kafka生产者:将数据导入kafka中
object KafkaProducer {def main(args: Array[String]): Unit = {// 生产者相关配置val producerConf = new Properties()// 设置连接kafka(集群)配置【必配】producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single:9092")// 批处理数量(每10条处理一次)producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")// 延迟时间:如果消息大小迟迟不为batch.size大小,则可以在指定的时间(50ms)后发送producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")// 消息发送失败时,生产者可以重试的次数producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")// ack机制:生产者在发送消息后需要等待来自服务器的确认级别(服务器给出一次确认,才算写入成功)producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")// 键值序列化producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer")producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")// 构造生产者producerval producer:KafkaProducer[Int,String] = new KafkaProducer(producerConf)// 发送消息val topic = "test02" // 指定主题(需存在)val rand = new Random()for(i<- 1 to 100000){// 封装待发送的消息val record: ProducerRecord[Int, String] = new ProducerRecord[Int, String](topic, 0, System.currentTimeMillis(), i, "test-"+i)// 用生产者发送记录producer.send(record,new Callback {override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {if (exception == null){// 发送信息成功println("发送信息成功,topic:"+metadata.topic()+",partition:"+metadata.partition()+",offset:"+metadata.offset())}else{// 发送信息失败exception.printStackTrace()}}})Thread.sleep(5+rand.nextInt(20))}// 强制将所有待发送的消息立即发送到 Kafka 集群,而不需要等待缓冲区填满或达到任何延迟阈值producer.flush()// 关闭并释放资源producer.close()}
}
3、创建消费者(Consumer)
一:生产者相关配置讲解:
GROUP_ID_CONFIG:消费者组的标识符,有还几个消费者共用这个组进行消费同一个主题消息GROUP_INSTANCE_ID_CONFIG:消费者组中消费者实例的标识符,当前消费者自己在消费者组中的唯一标识MAX_POLL_RECORDS_CONFIG="records.count":控制每次从 Kafka 主题中拉取的最大记录数为records.countMAX_POLL_INTERVAL_MS_CONFIG:控制两次连续拉取消息之间的最大时间间隔,若消费者在此时间间隔内没有发送心跳,则会被认为死亡HEARTBEAT_INTERVAL_MS_CONFIG="time":控制消费者发送心跳的频率,每time毫秒发一次心跳ENABLE_AUTO_COMMIT_CONFIG:消费者自动提交偏移量(offset)给 KafkaAUTO_COMMIT_INTERVAL_MS_CONFIG="time":控制自动提交偏移量的频率,即time毫秒提交一次偏移量PARTITION_ASSIGNMENT_STRATEGY_CONFIG="org.apache.kafka.clients.consumer.RoundRobinAssignor":轮询分区分配策略:将所有可用分区依次分配给消费者,以平衡负载AUTO_OFFSET_RESET_DOC="latest":偏移量策略:从分区的最新偏移量开始消费
二:消费者策略(读取数据方式)
Kafka为消费者提供了三种类型的订阅消费模式:subscribe(订阅模式)、SubscribePattern(正则订阅模式)、assign(指定模式)。
-
subscribe(订阅模式):
// 订阅单个主题(test02主题) consumer.subscribe(Collections.singletonList("test02")); -
SubscribePattern(正则订阅模式):
// 使用正则表达式订阅主题 consumer.subscribe(Pattern.compile("topic-.*")); -
assign(指定模式):
// 手动分配特定的分区给消费者(test02主题) TopicPartition partition0 = new TopicPartition("test02", 0); consumer.assign(Collections.singletonList(partition0));
package cha01import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters.iterableAsScalaIterableConverter// 消费者:用SparkStreaming来消费kafka中数据
object KafkaConsumer {def main(args: Array[String]): Unit = {// 消费者相关配置val props = new Properties()// 设置连接kafka(集群)配置【必配】props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"single:9092")// 消费者组的标识符props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"zhou-240801")// 消费者组中消费者实例的标识符(独享的)props.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,"zhou")// 控制每次从 Kafka 主题中拉取的最大记录数props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"1000")// 控制两次连续拉取消息之间的最大时间间隔,若消费者在此时间间隔内没有发送心跳,则会被认为死亡props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"9000") // 9000毫秒(9秒)// 控制消费者发送心跳的频率,每1秒发一次心跳props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"1000") // 心跳:1000毫秒(1秒)// 消费者组的最小会话超时时间props.setProperty("group.min.session.timeout.ms","15000") // 15000毫秒(15秒)// 消费者组的最大会话超时时间props.setProperty("group.max.session.timeout.ms","50000") // 50000毫秒(50秒)// 消费者的会话超时时间设置props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"20000") // 20000毫秒(20秒)// 消费者自动提交偏移量(offset)给 Kafkaprops.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true")// 控制自动提交偏移量的频率,即多久提交一次偏移量props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"500") //500毫秒// 轮询分区分配策略:将所有可用分区依次分配给消费者,以平衡负载props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor")// 偏移量策略:从分区的最新偏移量开始消费props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_DOC,"latest")// 键值反序列化props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer") // 反序列化props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer") // 反序列化// 创建消费者val consumer = new KafkaConsumer[Int,String](props)// 订阅者模式val topic = "test02" // 主题consumer.subscribe(Collections.singletonList(topic))// 持续监听状态中while (true) {// 获取消息,每次获取消息的间隔为100msval records: ConsumerRecords[Int, String] = consumer.poll(Duration.ofMillis(100))for (record <- records.asScala) {println(s"topic:${record.topic()},key:${record.key()}, value:${record.value()}")}}}
}
相关文章:
Kafka实战(Scala操作)
Kafka基础讲解部分 Kafka基础讲解部分 Kafka实战(Scala操作) 1、引入依赖 版本: <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.report…...
Android Framework 之WMS详解
1.WMS说的就是 WindowManagerService:负责为Activity对应的窗口分配Surface,管理Surface的显示顺序以及位置尺寸,控制窗口动画 。 它是Android系统中为各个客户端即每个app来提供这样的服务的一个类。 在Android系统中在systemServer 进程和各…...
opencv-图像仿射变换
仿射变换设计图像位置角度的变化,是深度学习预处理中常用的功能。仿射变换就是对图像的平移缩放旋转翻转操作的组合 如下图,对图中点1,2,3与图二中三个点一一映射,仍然形成三角形,但形状已经发生改变,通过这两组三点求…...
算法的基本概念
一、算法的基本概念思维导图 二、什么是算法: 1.我们知道数据结构就是将我门现实的世界中的问题数据化,存入计算机中,并实现对数据结构的一些基本操作。 2.算法就是如何处理这些存入计算机中的信息,以求高效的解决实际问题。 3…...
124. Go Template应用实例:用代码生成代码
文章目录 生成器模式生成器代码生成 本文用生成器模式作为例子,来演示如何用代码生成代码。 生成器模式 熟悉 Java 开发的同学都知道,lombok 有一个著名的注解 Builder ,只要加在类上面,就可以自动生成 Builder 模式的代码。如下…...
【AI实践】阿里云方言文本转语音TTS
最近要做一些普通话和方言demo 找一个免费工具 免费在线文字转语音工具 | edge-tts 在线体验 (bingal.com) 还有一些方言在阿里云上找了下,基于官方demo改了一下 阿里云语音合成接口说明_智能语音交互(ISI)-阿里云帮助中心 (aliyun.com) 如何下载安装、使用语音…...
java 之 各类日期格式转换
一、前言 大家在开发过程中必不可少得和日期打交道,对接别的系统时,时间日期格式不一致,每次都要转换! 从 Java1 到 Java8 将近 20 年,再加上 Java8 的普及时间、各种历史 API 兼容过渡时间。我们很多时候需要在旧时间 API 与新时…...
Nvidia黄仁勋对话Meta扎克伯格:AI和下一代计算平台的未来 | SIGGRAPH 2024对谈回顾
在今年的SIGGRAPH图形大会上,Nvidia创始人兼CEO黄仁勋与Meta创始人马克扎克伯格进行了一场长达60分钟的对谈。这场对话不仅讨论了AI的未来发展和Meta的开源哲学,还发布了不少新产品,并深入探讨了下一代计算平台的可能性。 引言 人工智能的发…...
【JAVA设计模式】适配器模式——类适配器模式详解与案例分析
前言 在软件设计中,适配器模式(Adapter Pattern)是一种结构型设计模式,旨在使不兼容的接口能够协同工作。它通过引入一个适配器类,帮助两个接口之间进行适配,使得它们能够互相操作。本文将详细介绍适配器模…...
【Vue】全局组件和局部组件
一、全局组件 定义: 全局组件是在整个Vue应用中都可以使用的组件。它们被注册在Vue的根实例上,因此可以在任何子组件的模板中被引用,而无需在每个组件中重复注册。 注册方式: 全局组件通过Vue.component方法进行注册。这个方法接…...
react引入高德地图并初始化卫星地图
react引入高德地图并初始化卫星地图 1.安装依赖 yarn add react-amap amap/amap-jsapi-loader2.初始化地图 import AMapLoader from "amap/amap-jsapi-loader"; import { FC, useEffect, useRef, useState } from "react";const HomeRight () > {con…...
2024最简七步完成 将本地项目提交到github仓库方法
2024最简七步完成 将本地项目提交到github仓库方法 文章目录 2024最简七步完成 将本地项目提交到github仓库方法一、前言二、具体步骤1、github仓库创建2、将远程仓库拉取并合并(1)初始化本地仓库(2)本地仓库与Github仓库关联&…...
前端WebSocket入门,看这篇就够啦!!
在HTML5 的早期开发过程中,由于意识到现有的 HTTP 协议在实时通信方面的不足,开发者开始探索能够在 Web 环境下实现双向实时通信的新的通信协议,提出了 WebSocket 协议的概念。 一、什么是 WebSocket? WebSocket 是一种在单个 T…...
漏洞复现-F6-11泛微-E-Cology-SQL
本文来自无问社区,更多漏洞信息可前往查看http://www.wwlib.cn/index.php/artread/artid/15575.html 0x01 产品简介 泛微协同管理应用平台e-cology是一套企业级大型协同管理平台 0x02 漏洞概述 该漏洞是由于泛微e-cology未对用户的输入进行有效的过滤࿰…...
Turbo Boost 禁用
最近在做OAI NR的时候关闭CPU 睿频的时候出了一些问题,这里我把我找到的资料记录一下: 禁用 Turbo Boost 的过程可能会因不同的 BIOS/UEFI 和操作系统设置而有所不同。以下是一些可能的原因及解决方法: 可能的原因 BIOS/UEFI 设置问题: 你的…...
假期BUUCTF小练习3
文章目录 [极客大挑战 2019]BuyFlag[BJDCTF2020]Easy MD5[HCTF 2018]admin第一种方法 直接登录第二种方法 flack session伪造第三种方法Unicode欺骗 [MRCTF2020]你传你🐎呢[护网杯 2018]easy_tornadoSSTI注入 [ZJCTF 2019]NiZhuanSiWei [极客大挑战 2019]BuyFlag 一…...
【ubuntu系统】在虚拟机内安装Ubuntu
Ubuntu系统装机 描述新装机后的常规配置, 虚拟机使用vbox terminal 打不开 CTRL ALT F3 进入命令行模式(需要返回桌面时CTRL ALT F1)root用户登入cd /etc/default vi locale LANG“en_US” 改成 LANG“en_US.UTF-8”保存修改后&…...
Python初学者必须掌握的基础知识点
Python初学者必须掌握的基础知识点包括数据类型与变量、控制结构(条件语句和循环语句)、基本数据结构(列表、元组、字典、集合)、函数与模块、以及字符串处理等。以下是对这些基础知识点及其对应代码的详细介绍: 1. …...
ESP32是什么?
ESP32是一款由乐鑫信息科技(Espressif Systems)推出的高度集成的低功耗系统级芯片(SoC),它结合了双核处理器、无线通信、低功耗特性和丰富的外设,特别适用于各种物联网(IoT)应用。以…...
jemalloc分析内存
分析内存泄漏过程中, 由于tcmalloc不能长时间开启heap profile(会不停涨内存,导致内存爆掉).尝试换jemalloc. 交叉编译: git clone https://github.com/jemalloc/jemalloc.git./autogen.sh./configure --hostaarch64-…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
WEB3全栈开发——面试专业技能点P7前端与链上集成
一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染(SSR)与静态网站生成(SSG) 框架,由 Vercel 开发。它简化了构建生产级 React 应用的过程,并内置了很多特性: ✅ 文件系…...
