Kafka命令行的使用/Spark-Streaming核心编程(二)
Kafka命令行的使用
创建topic
kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic test1 --partitions 3 --replication-factor 3
分区数量,副本数量,都是必须的。
数据的形式:
主题名称-分区编号。
在Kafka的数据目录下查看。
设定副本数量,不能大于broker的数量。
2.2查看所有的topic
kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
2.3查看某个topic的详细信息
kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic test1
ISR: In-Sync Replicas 可以提供服务的副本。
AR = ISR + OSR
2.4删除topic
kafka-topics.sh --delete --zookeeper node01:2181,node02:2181,node03:2181 --topic test1
2.5生产数据
kafka-console-producer.sh:
指定broker
指定topic
写数据的命令:
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test1
Spark-Streaming核心编程(二)
- 需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
- 导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
- 编写代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
object DirectAPI {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// 定义 Kafka 相关参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "kafka",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer]
)
// 通过读取 Kafka 数据,创建 DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara)
)
// 提取出数据中的 value 部分
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
// wordCount 计算逻辑
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
ssc.start()
ssc.awaitTermination()
}
}
- 开启Kafka集群
- 开启Kafka生产者,产生数据
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka
- 运行程序,接收Kafka生产的数据并进行相应处理
8)查看消费进度
kafka-consumer-groups.sh --describe --bootstrap-server node01:9092,node02:9092,node03:9092 --group kafka
相关文章:

Kafka命令行的使用/Spark-Streaming核心编程(二)
Kafka命令行的使用 创建topic kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic test1 --partitions 3 --replication-factor 3 分区数量,副本数量,都是必须的。 数据的形式: 主题名称-分区编号。 在…...

2020-06-23 暑期学习日更计划(机器学习入门之路(资源汇总)+概率论)
机器学习入门 前言 说实话,机器学习想学好真心不易,很多时候都感觉自己学得云里雾里。以前一段时间自己为了完成毕业设计,在机器学习的理论部分并没有深究,仅仅通过TensorFlow框架力求快速实现模型。现在来看,很多时候…...

SQL 时间转换的CONVERT()函数应用说明
目录 1.常用查询使用的几个 2.其他总结 1.常用查询使用的几个 SELECT CONVERT(VARCHAR, GETDATE(), 112) SELECT CONVERT(VARCHAR, GETDATE(), 113)SELECT CONVERT(VARCHAR, GETDATE()-1, 112) SELECT CONVERT(VARCHAR, GETDATE()-1, 113) 2.其他总结 SELECT CONVERT(VARCHA…...
高企复审奖补!2025年合肥市高新技术企业重新认定奖励补贴政策及申报条件
一、合肥市高新技术企业重新认定奖励补贴政策 (一)高新区高新技术企业重新认定复审补贴奖励 重新认定为国家高新技术企业的给予5万元一次性奖励。 (二)经开区高新技术企业重新认定复审补贴奖励 对重新认定的企业,给…...
手机端本地服务与后端微服务的技术差异
以下是手机内部本地服务与后端微服务架构及通信协议的对比分析,结合两者的核心设计差异与技术实现特点展开: 一、架构设计对比 维度手机端本地服务后端微服务核心目标资源效率、离线优先、动态更新高并发处理、分布式事务、服务治理服务拆分粒度按功能…...

SystemWeaver详解:从入门到精通的深度实战指南
SystemWeaver详解:从入门到精通的深度实战指南 文章目录 SystemWeaver详解:从入门到精通的深度实战指南一、SystemWeaver环境搭建与基础配置1.1 多平台安装全流程 二、新手必学的十大核心操作2.1 项目创建全流程2.2 建模工具箱深度解析 三、需求工程与系…...
高光谱相机在工业检测中的应用:LED屏检、PCB板缺陷检测
随着工业检测精度要求的不断提升,传统机器视觉技术逐渐暴露出对非可见光物质特性识别不足、复杂缺陷检出率低等局限性。高光谱相机凭借其独特的光谱分析能力,为工业检测提供了革命性的解决方案。以下结合中达瑞和VIX系列推扫式高光谱相机的技术特点与实际…...

windows中kafka4.0集群搭建
参考文献 Apache Kafka windows启动kafka4.0(不再需要zookeeper)_kafka压缩包-CSDN博客 Kafka 4.0 KRaft集群部署_kafka4.0集群部署-CSDN博客 正文 注意jdk需要17版本以上的 修改D:\software\kafka_2.13-4.0.0\node1\config\server.properties配置文…...
Oracle Linux8 安装 MySQL 8.4.3,搭建一主一从
文章目录 安装依赖获取安装包解压准备相关目录设置配置文件启动数据库连接数据库socket 文件优化同样方法准备 3307 数据库实例设置配置文件启动 3307 实例数据库连接并查看 3307 数据库实例基于 bin log 搭建主从模式 安装依赖 yum install -y numactl libaio ncurses-compat…...

【JavaWeb后端开发04】java操作数据库(JDBC + Mybatis+ yml格式)详解
文章目录 1. 前言2. JDBC2.1 介绍2.2 入门程序2.2.1 DataGrip2.2.2 在IDEA执行sql语句 2.3 查询数据案例2.3.1 需求2.3.2 准备工作2.3.3 AI代码实现2.3.4 代码剖析2.3.4.1 ResultSet2.3.4.2 预编译SQL2.3.4.2.1 SQL注入2.3.4.2.2 SQL注入解决2.3.4.2.3 性能更高 2.4 增删改数据…...

postman 删除注销账号
一、删除账号 1.右上角找到 头像,view profile https://123456-6586950.postman.co/settings/me/account 二、找回账号 1.查看日志所在位置 三、postman更新后只剩下history 在 Postman 中,如果你发现更新后只剩下 History(历史记录&…...

Java发展史及版本详细说明
Java发展史及版本详细说明 1. Java 1.0(1996年1月23日) 核心功能: 首个正式版本,支持面向对象编程、垃圾回收、网络编程。包含基础类库(java.lang、java.io、java.awt)。支持Applet(浏览器嵌入…...

React 5 种组件提取思路与实践
在开发时,经常遇到一些高度重复但略有差异的 UI 模式,此时我们当然会把组件提取出去,但是组件提取的方式有很多,怎么根据不同场景选取合适的方式呢?尤其时在复杂的业务场景中,组件提取的思路影响着着代码的可维护性、可读性以及扩展性。本文将以一个[详情]组件为例,探讨…...

[java八股文][Java基础面试篇]I/O
Java怎么实现网络IO高并发编程? 可以用 Java NIO ,是一种同步非阻塞的I/O模型,也是I/O多路复用的基础。 传统的BIO里面socket.read(),如果TCP RecvBuffer里没有数据,函数会一直阻塞,直到收到数据…...

数据结构-冒泡排序(Python)
目录 冒泡排序算法思想 冒泡排序算法步骤 冒泡排序代码实现 冒泡排序算法分析 冒泡排序算法思想 冒泡排序(Bubble Sort)基本思想: 经过多次迭代,通过相邻元素之间的比较与交换,使值较小的元素逐步从后面移到前面…...
Java单例模式详解:实现线程安全的全局访问点
精心整理了最新的面试资料和简历模板,有需要的可以自行获取 点击前往百度网盘获取 点击前往夸克网盘获取 一、什么是单例模式? 单例模式(Singleton Pattern)是一种创建型设计模式,它保证一个类仅有一个实例ÿ…...
React-组件和props
1、类组件 import React from react; class ClassApp extends React.Component {constructor(props) {super(props);this.state{};}render() {return (<div><h1>这是一个类组件</h1><p>接收父组件传过来的值:{this.props.name}</p>&…...
Java面试:从Spring Boot到微服务的全面考核
Java面试:从Spring Boot到微服务的全面考核 场景设定: 在一家互联网大厂的面试室内,严肃的面试官正准备开始对前来面试的赵大宝进行技术考核。赵大宝是一位自称在Java开发方面经验丰富的求职者,不过却是个搞笑的水货程序员。 第…...

深入理解React高阶组件(HOC):原理、实现与应用实践
组件复用的艺术 在React应用开发中,随着项目规模的增长,组件逻辑的复用变得越来越重要。传统的组件复用方式如组件组合和props传递在某些复杂场景下显得力不从心。高阶组件(Higher-Order Component,简称HOC)作为React中…...

Neo4j社区版在win下安装教程(非docker环境)
要在 Windows 10 上安装 Neo4j 社区版数据库并且不使用 Docker Desktop,你可以按照以下步骤操作: 1. 安装 Java Development Kit (JDK) Neo4j 需要 Java 运行环境。推荐安装 JDK 17 或 JDK 11(请根据你下载的 Neo4j 版本查看具体的兼容性要…...
【AI 加持下的 Python 编程实战 2_10】DIY 拓展:从扫雷小游戏开发再探问题分解与 AI 代码调试能力(中)
文章目录 DIY 实战:从扫雷小游戏开发再探问题分解能力3 问题分解实战(自顶向下)3.2 页面渲染逻辑3.3 事件绑定逻辑 4 代码实现(自底向上)4.1 页面渲染部分4.2 事件绑定部分 写在前面 本篇将利用《Learn AI-assisted Py…...
使用PHP对接印度尼西亚股票市场
在本篇文章中,我们将介绍如何使用PHP语言与StockTV API接口对接,获取并处理印度尼西亚(Indonesia)的股票市场数据。我们将以查询IPO信息和查看涨跌排行榜为例,展示具体的操作流程。 准备工作 首先,确保您…...

如何在 Odoo 18 中配置自动化动作
如何在 Odoo 18 中配置自动化动作 Odoo是一款多功能的业务管理平台,旨在帮助各种规模的企业更高效地处理日常运营。凭借其涵盖销售、库存、客户关系管理(CRM)、会计和人力资源等领域的多样化模块,Odoo 简化了业务流程,…...

node.js 实战——(Http 知识点学习)
HTTP 又称为超文本传输协议 是一种基于TCP/IP的应用层通信协议;这个协议详细规定了 浏览器 和万维网 服务器 之间互相通信的规则。协议中主要规定了两个方面的内容: 客户端:用来向服务器发送数据,可以被称之为请求报文服务端&am…...

新市场环境下新能源汽车电流传感技术发展前瞻
新能源革命重构产业格局 在全球碳中和战略驱动下,新能源汽车产业正经历结构性变革。国际清洁交通委员会(ICCT)最新报告显示,2023年全球新能源汽车渗透率突破18%,中国市场以42%的市占率持续领跑。这种产业变革正沿着&q…...
系统重装——联想sharkbay主板电脑
上周给一台老电脑重装系统系统,型号是lenovo sharkbay主板的电脑,趁着最近固态便宜,入手了两块长城的固态,装上以后插上启动U盘,死活进不去boot系统。提示 bootmgr 缺失,上网查了许久,终于解决了…...
CentOS 7.9升级OpenSSH到9.9p2
初始版本 ssh -V OpenSSH_7.4p1, OpenSSL 1.0.2k-fips 26 Jan 2017 1.安装编译依赖 yum install -y gcc perl make zlib-devel pam-devel openssl-devel wget 2.升级OpenSSL到1.1.1版本 2.1 备份当前OpenSSL配置 sudo cp -r /usr/bin/openssl /usr/bin/openssl.bak sudo …...

fastjson使用parseObject转换成JSONObject出现将字符特殊字符解析解决
现象:将字符串的${TARGET_VALUE}转换成NULL字符串了问题代码: import com.alibaba.fastjson.JSON;JSONObject config JSON.parseObject(o.toString()); 解决方法: 1.更换fastjson版本 import com.alibaba.fastjson2.JSON;或者使用其他JS…...
37、aiomysql实操习题
练习题1:慢查询优化 题目描述 将以下低效查询优化为索引查询: # 原始低效查询 await cursor.execute("SELECT * FROM orders WHERE YEAR(created_at)2023")参考答案 # 优化后查询(使用索引范围扫描) await cursor.e…...
Rust 2025:内存安全革命与异步编程新纪元
Rust 2025 Edition通过区域内存管理、泛型关联类型和零成本异步框架三大革新,重新定义系统级编程语言的能力边界。本次升级不仅将内存安全验证效率提升80%,更通过异步执行器架构优化实现微秒级任务切换。本文从编译器原理、运行时机制、编程范式转型三个…...