Kafka 快速上手:安装部署与 HelloWorld 实践(二)
四、Kafka 的 HelloWorld 实践
完成 Kafka 的安装部署后,我们就可以进行一些简单的操作来体验 Kafka 的功能了。下面通过一个 HelloWorld 示例,展示如何在 Kafka 中创建主题、发送消息和消费消息。
(一)创建主题(Topic)
在 Kafka 中,主题(Topic)是消息的逻辑分类,生产者将消息发送到特定的主题,消费者则从主题中订阅并消费消息 。可以将主题看作是一个消息的容器,不同的主题可以用来区分不同类型的消息。例如,在一个电商系统中,可以创建 “order - topic” 主题来存储订单相关的消息,“user - behavior - topic” 主题来存储用户行为相关的消息等。
使用 Kafka 提供的命令行工具kafka - topics.sh来创建主题。以下是创建一个名为 “test - topic” 的主题的命令:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
- --bootstrap - server:指定 Kafka broker 的地址和端口,这里是 “localhost:9092”,表示本地运行的 Kafka broker。
- --replication - factor:指定主题的副本因子,即每个分区的副本数量。这里设置为 1,表示每个分区只有一个副本。在生产环境中,通常会设置为大于 1 的值,以提高数据的可靠性和容错性 。例如设置为 3,意味着每个分区会有 3 个副本分布在不同的 broker 上,当其中一个 broker 出现故障时,其他副本可以继续提供服务。
- --partitions:指定主题的分区数。这里设置为 1,表示该主题只有一个分区。分区是主题的物理划分,通过增加分区数可以提高 Kafka 的并行处理能力和吞吐量。例如,在处理大量消息时,可以将分区数设置为多个,每个分区可以由不同的消费者进行并行消费。
- --topic:指定要创建的主题名称,这里是 “test - topic”。
执行上述命令后,如果看到 “Created topic test - topic” 的提示信息,说明主题创建成功。
(二)发送消息
接下来,我们使用 Kafka 的命令行工具kafka - console - producer.sh向刚刚创建的 “test - topic” 主题发送消息。该工具允许我们在命令行中输入消息,并将其发送到指定的主题。
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
执行上述命令后,命令行将等待你输入消息。输入一条消息,例如 “Hello, Kafka!”,然后按下回车键,这条消息就会被发送到 “test - topic” 主题中。你可以继续输入更多的消息,每输入一条消息并回车,该消息就会被发送到主题。例如,再输入 “这是我的第一条 Kafka 消息”,同样会被发送到 “test - topic” 主题。
(三)消费消息
发送消息后,我们可以使用kafka - console - consumer.sh工具从 “test - topic” 主题中消费消息。该工具会从主题中拉取消息并在命令行中显示出来。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
- --from - beginning:表示从主题的最早消息开始消费。如果不加上这个参数,默认只会消费从当前时刻起新产生的消息,之前发送的消息将不会被消费到 。例如,如果你先发送了 5 条消息,然后在消费时没有使用 “--from - beginning” 参数,那么只能消费到之后新发送的消息,之前的 5 条消息就会被错过。
执行上述命令后,你将看到之前发送到 “test - topic” 主题的所有消息依次显示在命令行中,按照发送的顺序排列。例如,会先显示 “Hello, Kafka!”,再显示 “这是我的第一条 Kafka 消息”。这样,我们就完成了一个简单的 Kafka HelloWorld 实践,通过创建主题、发送消息和消费消息,初步体验了 Kafka 的消息发布和订阅功能 。
五、常见问题及解决方法
在 Kafka 的安装部署和 HelloWorld 实践过程中,可能会遇到一些问题,下面为你列举一些常见问题及对应的解决方法:
5.1 端口冲突
问题描述:启动 Kafka 或 ZooKeeper 时,提示端口已被占用,例如启动 Kafka 时出现 “Address already in use: 9092” 错误,说明 9092 端口已被其他程序占用 ;启动 ZooKeeper 时出现 “Address already in use: 2181” 错误,说明 2181 端口被占用。
解决方法:
- 使用命令查看占用端口的进程。在 Linux 系统中,可以使用netstat -tuln | grep 端口号命令,例如查看 9092 端口的占用情况:netstat -tuln | grep 9092 ,该命令会列出所有使用 9092 端口的进程信息,其中 “LISTEN” 状态表示该端口正在被监听。也可以使用lsof -i :端口号命令,如lsof -i :9092 ,它会更详细地显示占用端口的进程相关信息,包括进程 ID(PID)、进程名称等。
- 根据查找到的进程 ID,使用kill -9 进程ID命令终止占用端口的进程。例如,如果查找到 9092 端口被进程 ID 为 1234 的进程占用,那么可以执行kill -9 1234命令来终止该进程。
- 如果不想终止占用端口的进程,也可以修改 Kafka 或 ZooKeeper 的配置文件,将其监听端口修改为其他未被占用的端口。在server.properties文件中,修改listeners参数的值,如listeners=PLAINTEXT://localhost:9093 ,将 Kafka 的监听端口改为 9093;在zookeeper.properties文件中,修改clientPort参数的值,如clientPort=2182 ,将 ZooKeeper 的监听端口改为 2182。修改完成后,重新启动 Kafka 和 ZooKeeper。
5.2 JDK 环境配置错误
问题描述:启动 Kafka 或执行相关命令时,提示找不到 Java 命令或 Java 环境配置错误,例如出现 “kafka-run-class.sh: line 271: /usr/local/java/jdk/bin/java: No such file or directory” 错误。
解决方法:
- 检查 JDK 是否正确安装。可以在命令行中输入java -version命令,如果显示 Java 的版本信息,说明 JDK 已安装;如果提示 “command not found”,则说明 JDK 未正确安装或环境变量未配置。
- 确认 JDK 环境变量的配置是否正确。在 Linux 系统中,需要检查/etc/profile文件或用户主目录下的.bashrc文件中关于 JDK 环境变量的配置。例如,在/etc/profile文件中,JDK 环境变量的配置通常如下:
export JAVA_HOME=/usr/local/java/jdk
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
其中,JAVA_HOME变量的值应指向 JDK 的安装目录。修改完配置文件后,需要执行source /etc/profile(或source ~/.bashrc)命令使配置生效。
3. 如果 JDK 安装目录发生了变化,需要相应地修改环境变量中的JAVA_HOME值。
4. 检查系统的PATH环境变量中是否包含 JDK 的bin目录。可以使用echo $PATH命令查看PATH变量的值,确保其中包含 JDK 的bin目录路径,如/usr/local/java/jdk/bin 。如果没有包含,可以按照上述步骤在配置文件中添加。
5.3 Kafka 配置文件错误
问题描述:启动 Kafka 时,出现各种与配置相关的错误,例如 “Invalid value for configuration log.dirs: /data/kafka-logs does not exist”,提示日志目录不存在;或者 “Unknown server configuration property 'xxx'”,提示配置文件中存在未知的配置属性。
解决方法:
- 对于日志目录不存在的问题,需要根据错误提示,检查server.properties文件中log.dirs参数指定的目录是否真实存在。如果不存在,需要手动创建该目录,并赋予适当的权限。例如,如果log.dirs=/data/kafka-logs ,可以使用以下命令创建目录并设置权限:
sudo mkdir -p /data/kafka-logs
sudo chown -R $USER:$USER /data/kafka-logs
- 对于未知配置属性的问题,仔细检查server.properties文件,确认是否存在拼写错误或过时的配置项。Kafka 的配置文件中可能会有一些注释掉的示例配置项,在修改配置时要注意不要误启用了错误的配置。同时,可以参考 Kafka 的官方文档,查看正确的配置属性及其用法。如果是因为 Kafka 版本更新导致某些配置项发生变化,需要根据新版本的要求调整配置文件。
- 如果在配置zookeeper.connect参数时出现连接错误,例如 “Connection refused”,需要检查 ZooKeeper 的地址和端口是否正确,以及 ZooKeeper 服务是否已经正常启动。确保zookeeper.connect参数的值与 ZooKeeper 的实际配置一致,如zookeeper.connect=localhost:2181 。如果 ZooKeeper 部署在远程服务器上,还需要检查网络连接是否正常。
5.4 消费者无法消费消息
问题描述:使用kafka - console - consumer.sh消费消息时,无法接收到生产者发送的消息,或者提示 “Failed to find any more messages” 等错误。
解决方法:
- 检查消费者命令的参数是否正确。确保--bootstrap - server参数指定的 Kafka broker 地址和端口正确,--topic参数指定的主题名称与生产者发送消息的主题一致。例如,如果 Kafka broker 的地址是 “localhost:9092”,主题是 “test - topic”,那么消费者命令应该是bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning 。
- 确认消费者是否从正确的位置开始消费消息。如果没有使用--from - beginning参数,消费者默认只会消费从当前时刻起新产生的消息。如果之前已经发送了消息,而你希望消费这些历史消息,就需要加上--from - beginning参数。
- 检查生产者是否成功发送了消息。可以查看生产者的日志输出,确认消息是否被正确发送到 Kafka。如果生产者发送消息时出现错误,如网络问题、权限问题等,也会导致消费者无法消费到消息。
- 检查 Kafka 的主题配置和分区情况。确保主题的分区数和副本因子配置合理,没有出现分区丢失或损坏的情况。可以使用kafka - topics.sh命令查看主题的详细信息,如bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic ,该命令会显示主题的分区、副本等信息。如果发现分区或副本存在问题,需要根据具体情况进行修复或重新创建主题。
六、总结与展望
通过本文的学习,我们从 Kafka 的基本概念和优势入手,深入了解了它在大数据和分布式系统领域的重要地位。随后,我们详细地完成了 Kafka 在 Linux 系统下的安装部署过程,包括安装前的准备工作、解压安装包、配置 ZooKeeper 和 Kafka 以及启动 Kafka 服务。在 HelloWorld 实践环节,我们学会了如何使用 Kafka 的命令行工具创建主题、发送消息和消费消息,初步体验了 Kafka 的消息发布和订阅功能 。同时,针对安装部署和实践过程中可能遇到的常见问题,我们也提供了相应的解决方法,帮助大家顺利解决问题。
然而,Kafka 的强大之处远不止于此。在掌握了基本的安装部署和使用方法后,你可以进一步探索 Kafka 的高级特性,如分区策略、副本机制、事务处理、消息压缩等,这些特性能够帮助你更好地应对复杂的业务场景和高并发、高可靠性的需求 。例如,在处理海量数据时,可以通过合理调整分区策略和副本机制,提高数据处理的效率和可靠性;在涉及多个操作的事务场景中,事务处理特性能够保证数据的一致性。
此外,Kafka 在实际项目中常与其他大数据工具和框架结合使用,如 Spark Streaming、Flink、Hadoop 等。通过学习这些组合使用的案例和实践,你可以构建出更加完善和高效的大数据处理和分析系统 。例如,将 Kafka 作为数据源,为 Spark Streaming 或 Flink 提供实时数据流,进行实时的数据处理和分析;将处理后的数据存储到 Hadoop 的 HDFS 中,进行进一步的存储和分析。
希望大家能够通过本文的学习,对 Kafka 产生浓厚的兴趣,并在后续的学习和实践中不断深入探索,将 Kafka 应用到实际项目中,提升自己的技术能力和解决问题的能力 。如果你在学习过程中有任何疑问或心得,欢迎在评论区留言交流,让我们一起共同进步!
相关文章:
Kafka 快速上手:安装部署与 HelloWorld 实践(二)
四、Kafka 的 HelloWorld 实践 完成 Kafka 的安装部署后,我们就可以进行一些简单的操作来体验 Kafka 的功能了。下面通过一个 HelloWorld 示例,展示如何在 Kafka 中创建主题、发送消息和消费消息。 (一)创建主题(Top…...

opencv学习笔记2:卷积、均值滤波、中值滤波
目录 一、卷积概念 1.定义 2.数学原理 3.实例计算 (1) 输入与卷积核 (2)计算输出 g(2,2) 4.作用 二、针对图像噪声的滤波技术——均值滤波 1.均值滤波概念 (1)均值滤波作用 (2&#…...

在 Android Studio 中使用 GitLab 添加图片到 README.md
1. 将图片文件添加到项目中 在项目根目录下创建一个 images 或 assets 文件夹 将你的图片文件(如 screenshot.png)复制到这个文件夹中 2. 跟提交项目一样,提交图片到 GitLab 在 Android Studio 的 Git 工具窗口中: 右键点击图片…...

HarmonyOS:如何在启动框架中初始化HMRouter
应用启动时通常需要执行一系列初始化启动任务,如果将启动任务都放在应用主模块(即entry类型的Module)的UIAbility组件的onCreate生命周期中,那么只能在主线程中依次执行,不但影响应用的启动速度,而且当启动…...
Ubuntu下有关UDP网络通信的指令
1、查看防火墙状态: sudo ufw status # Ubuntu 2、 检查系统全局广播设置 # 查看是否忽略广播包(0表示接收,1表示忽略) sysctl net.ipv4.icmp_echo_ignore_broadcasts# 查看是否允许广播转发(1表示允许)…...
JavaWeb预习(jdbc)
基础 1.驱动程序接口Driver 每种数据库都提供了数据库驱动程序,并且都提供了一个实现java.sql.Driver接口的类,称为Driver 对于MySql,其Driver类为com.mysql.jdbc.Driver,加载该类的语句为: Class.forName("c…...

Web3 借贷与清算机制全解析:链上金融的运行逻辑
Web3 借贷与清算机制全解析:链上金融的运行逻辑 超额抵押借款 例如,借款人用ETH为抵押借入DAI;借款人的ETH的价值一定是要超过DAI的价值;借款人可以任意自由的使用自己借出的DAI 稳定币 第一步:借款人需要去提供一定…...

【Vue3】(三)vue3中的pinia状态管理、组件通信
目录 一、vue3的pinia 二、【props】传参 三、【自定义事件】传参 四、【mitt】传参 五、【v-model】传参(平常基本不写) 六、【$attrs】传参 七、【$refs和$parent】传参 八、provide和inject 一、vue3的pinia 1、什么是pinia? pinia …...
ingress-nginx 开启 Prometheus 监控 + Grafana 查看指标
环境已经部署了 ingress-nginx(DaemonSet 方式),并且 Prometheus Grafana 也已经运行。但之前 /metrics 端点没有暴露 Nginx 核心指标(如 nginx_ingress_controller_requests_total),经过调整后现在可以正…...
SQL进阶之旅 Day 21:临时表与内存表应用
【SQL进阶之旅 Day 21】临时表与内存表应用 文章简述 在SQL开发过程中,面对复杂查询、数据预处理和性能优化时,临时表和内存表是不可或缺的工具。本文深入讲解了临时表(Temporary Table)和内存表(Memory Table&#x…...

Jenkins自动化部署Maven项目
Jenkins自动化部署Maven项目 一、环境准备(Prerequisites) SpringBoot项目 确保项目根目录有标准Maven结构(pom.xml)且包含Dockerfile: # Dockerfile 示例 FROM openjdk:11-jre-slim VOLUME /tmp ARG JAR_FILE=target/*.jar COPY ${JAR_FILE} app.jar ENTRYPOINT ["j…...

LeetCode 高频 SQL 50 题(基础版)之 【高级字符串函数 / 正则表达式 / 子句】· 上
题目:1667. 修复表中的名字 题解: select user_id, concat(upper(left(name,1)),lower(right(name,length(name)-1))) name from Users order by user_id题目:1527. 患某种疾病的患者 题解: select * from Patients where con…...

Python 中 Django 中间件:原理、方法与实战应用
在 Python 的 Web 开发领域,Django 框架凭借其高效、便捷和功能丰富的特点备受开发者青睐。而 Django 中间件作为 Django 框架的重要组成部分,犹如 Web 应用的 “交通枢纽”,能够在请求与响应的处理流程中,实现对请求和响应的拦截…...

深入浅出玩转物联网时间同步:基于BC260Y的NTP实验与嵌入式仿真教学革命
在万物互联的时代,精准的时间戳是物联网系统的神经节拍器,而NTP协议正是维持这一节律的核心技术。 一、时间同步:物联网世界的隐形基石 在智慧城市、工业4.0等场景中,分散的设备需要毫秒级的时间协同。网络时间协议(N…...
数学建模期末速成 主成分分析的基本步骤
设有 n n n个研究对象, m m m个指标变量 x 1 , x 2 , ⋯ , x m x_1,x_2,\cdots,x_m x1,x2,⋯,xm,第 i i i个对象关于第 j j j个指标取值为 a i j a_{ij} aij,构造数据矩阵 A ( a i j ) n m A\left(\begin{array}{c}a_{ij}\end{array}\right)_{…...
视频音频去掉开头结尾 视频去掉前n秒后n秒 电视剧去掉开头歌曲
视频音频去掉开头结尾 视频去掉前n秒后n秒 视频音频去掉开头结尾 视频去掉前n秒后n秒 电视剧去掉开头歌曲 如果你有一些视频或者音频,你想去掉开头或结尾的几秒钟,那么你可以尝试一下这个工具,首先,我们来看一下,我们以…...

【在线五子棋对战】二、websocket 服务器搭建
文章目录 Ⅰ. WebSocket1、简介2、特点3、原理解析4、报文格式 Ⅱ. WebSocketpp1、认识2、常用接口3、websocketpp库搭建服务器搭建流程主体框架填充回调函数细节 4、编写 makefile 文件5、websocket客户端 Ⅰ. WebSocket 1、简介 WebSocket 是从 HTML5 开始支持的一种网页端…...

C++课设:从零开始打造影院订票系统
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、项目背景与需求分析二、系统架构设计…...

【计算机网络】数据链路层-滑动窗口协议
数据链路层滑动窗口协议 1. 三种协议对比表 特性停止-等待协议GBN协议SR协议窗口大小发送 1,接收 1发送 W (1<W≤2ⁿ-1),接收 1发送 C,接收 R确认方式单个确认累积确认选择性确认重传策略超时重传回退N帧重传选择性重传接收缓冲区…...

在linux系统上,如何安装Elasticsearch?
1.问题描述 当尝试连接时报错,报错内容为: elastic_transport.ConnectionError: Connection error caused by: ConnectionError(Connection error caused by: NewConnectionError(<urllib3.connection.HTTPConnection object at 0x7fd808b179d0>:…...

wpf Behaviors库实现支持多选操作进行后台绑定数据的ListView
<ListView ItemsSource"{Binding SchemeItems}" SelectionMode"Extended" VerticalAlignment"Stretch" HorizontalAlignment"Stretch"><ListView.ContextMenu><ContextMenu><MenuItem Header"删除" …...
【HarmonyOS 5】拍摄美化开发实践介绍以及详细案例
以下是 HarmonyOS 5 拍摄美化功能的简洁介绍,整合核心能力与技术亮点: 一、AI 影像创新 AI 魔法移图 系统级图像分层技术实现人物/物体自由拖拽、缩放与复制,突破传统构图限制。自动分离主体与背景,一键生成错位创意照&…...

《Vuejs设计与实现》第 8 章(挂载与更新)
目录 8.1 挂载子节点与属性 8.2 HTML Attributes 与 DOM Properties 8.3 设置元素属性的正确方式 8.4 处理 class 属性 8.5 卸载操作 8.6 区分 vnode 类型 8.7 事件处理优化 8.8 事件冒泡与更新时机问题 8.9 子节点的更新 8.10 文本节点和注释节点 8.11 片段…...

Ubuntu20.04中 Redis 的安装和配置
Ubuntu20.04 中 Redis 的安装和配置 Ubuntu 安装 MySQL 及其配置 1. Redis 的安装 更新系统包列表并安装 Redis : # 更新包管理工具 sudo apt update# -y:自动确认所有提示(非交互式安装) sudo apt install -y redis-server测…...
从游戏到自动驾驶:互联网时代强化学习如何让机器学会自主决策?
一、为什么机器需要“试错学习”?——强化学习的核心秘密 你有没有玩过《超级马里奥》?当你操控马里奥躲避乌龟、跳过悬崖时,其实就在用一种“试错”的方法学习最优路径。强化学习(Reinforcement Learning, RL)就是让…...

实验四:图像灰度处理
实验四 图像处理实验报告 目录 实验目的实验内容 原理描述Verilog HDL设计源代码Testbench仿真代码及仿真结果XDC文件配置下板测试 实验体会实验照片 实验目的 在实验三的基础上,将图片显示在显示器上,并进行灰度处理。 实验内容 原理描述 1. 图片的…...
asp.net mvc如何简化控制器逻辑
在ASP.NET MVC中,可以通过以下方法简化控制器逻辑: ASP.NET——MVC编程_aspnet mvc-CSDN博客 .NET/ASP.NET MVC Controller 控制器(IController控制器的创建过程) https://cloud.tencent.com/developer/article/1015115 【转载…...

解析“与此站点的连接不安全”警告:成因与应对策略
一、技术本质:SSL/TLS协议的信任链断裂 现代浏览器通过SSL/TLS协议建立加密通信,其核心在于证书颁发机构(CA)构建的信任链。当用户访问网站时,浏览器会验证服务器证书的有效性,包括: 证书链完…...
PyCharm和VS Code哪个更适合初学者
对于 Python 初学者来说,选择 VS Code 还是 PyCharm 取决于你的具体需求和使用场景。以下是两者的详细对比和推荐建议: VS Code 优点: 轻量级:启动速度快,占用资源少,适合在低端设备上运行。高度可定制&am…...

⚡️ Linux Docker 基本命令参数详解
🐳 Linux Docker 基本命令参数详解 📘 1. Docker 简介 Docker 是一个开源的容器化平台,它通过将应用及其依赖打包到一个轻量级、可移植的容器中,从而实现跨平台运行。Docker 采用 C/S 架构,服务端称为 Docker Daemon&a…...