2.部署kafka:9092
官方文档:http://kafka.apache.org/documentation.html
(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群)
Kafka3台集群搭建环境:
操作系统: centos7
防火墙:全关
3台zookeeper集群内的机器,1台logstash
软件版本: zookeeper-3.4.12.tar.gz
软件版本kafka_2.12-2.1.0.tgz
安装软件
(3台zookeeper集群的机器)
# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/
# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka
创建数据目录(3台)
# mkdir /data/kafka-logs
修改第一台配置文件
(注意不同颜色标记的部分)
# egrep -v "^$|^#" /usr/local/kafka/config/server.properties
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://192.168.148.141:9092 #监听套接字
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数
#如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中
num.partitions=1 #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量,默认1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数
replica.fetch.max.bytes=5242880 #取消息的最大字节数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有,删除
zookeeper.connect=192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
修改另外两台配置文件
#scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/
broker.id=2
listeners=PLAINTEXT://192.168.148.142:9092
# scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/
broker.id=3
listeners=PLAINTEXT://192.168.148.143:9092
启动kafka(3台)
[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
查看启动情况(3台)
[root@host1 ~]# jps
10754 QuorumPeerMain
11911 Kafka
12287 Jps
创建topic来验证
[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien
出现Created topic "cien"验证成功运行
在一台服务器上创建一个发布者
[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien
> hello kafka
> ni hao ya
>
在另一台服务器上创建一个订阅者
[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning
...
hello kafka
ni hao ya
如果都能接收到,说明kafka部署成功!
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.23:2181 --list #查看所有topic
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.23:2181 --topic qianfeng #查看指定topic的详细信息
Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:
Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.10.23:2181 --topic qianfeng #删除topic
Topic qianfeng is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
配置elfk集群订阅和zookeeper和kafka

配置第一台logstash生产消息输出到kafka
yum -y install wget
wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm
yum localinstall jdk-8u341-linux-x64.rpm -y
java -version
1.安装logstash
tar xf logstash-6.4.1.tar.gz -C /usr/local
ln -s /usr/local/logstash-6.4.1 /usr/local/logstash
2.修改配置文件
cd /usr/local/logstash/config/
vim logstash.yml
http.host: "0.0.0.0"
3.编写配置文件
不要过滤, logstash会将message内容写入到队列中
# cd /usr/local/logstash/config/
# vim logstash-kafka.conf
input {file {type => "sys-log"path => "/var/log/messages"start_position => beginning}
}
output {kafka {bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092" #输出到kafka集群topic_id => "sys-log-messages" #主题名称compression_type => "snappy" #压缩类型codec => "json"}
}
启动logstash
# /usr/local/logstash/bin/logstash -f logstash-kafka.conf
在kafka上查看主题,发现已经有了sys-log-messages,说明写入成功了
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list
__consumer_offsets
qianfeng
sys-log-messages
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages
Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:
Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
配置第二台logstash,订阅kafka日志,输出到es集群
# cat kafka-es.conf
input {kafka {bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092" topics => "sys-log-messages" #kafka主题名称codec => "json"auto_offset_reset => "earliest"}
}output {elasticsearch {hosts => ["192.168.148.131:9200","192.168.148.132:9200"]index => "kafka-%{type}-%{+YYYY.MM.dd}"}
}
相关文章:
2.部署kafka:9092
官方文档:http://kafka.apache.org/documentation.html (虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群) Kafka3台集群搭建环境: 操作系统: centos7 防火墙:全关 3台zookeeper集群内的机器,1台logstash 软件版本: …...
学习路之PHP --TP6异步执行功能 (无需安装任何框架)
学习路之PHP --异步执行功能 (无需安装任何框架) 简介一、工具类二、调用三、异步任务的操作四、效果: 简介 执行异步任务是一种很常见的需求,如批量发邮箱,短信等等执行耗时任务时,需要程序异步执行&…...
Uniapp 小程序复制、粘贴功能实现
在开发 Uniapp 小程序的过程中,复制和粘贴功能是非常实用且常见的交互需求。今天,我就来和大家详细分享如何在 Uniapp 中实现这两个功能。 复制功能:uni.setClipboardData方法 goResult() {uni.setClipboardData({data: this.copyContent, /…...
seacmsv9注入管理员账号密码+orderby+limit
一、seacmsv9 SQL注入漏洞 查看源码 <?php session_start(); require_once("include/common.php"); //前置跳转start $cs$_SERVER["REQUEST_URI"]; if($GLOBALS[cfg_mskin]3 AND $GLOBALS[isMobile]1){header("location:$cfg_mhost$cs");}…...
多通道数据采集和信号生成的模块化仪器如何重构飞机电子可靠性测试体系?
飞机的核心电子系统包括发电与配电系统,飞机内部所有设备和系统之间的内部数据通信系统,以及用于外部通信的射频设备。其他所有航空电子元件都依赖这些关键总线进行电力传输或数据通信。在本文中,我们将了解模块化仪器(无论是PCIe…...
天润融通分析DeepSeek如何一键完成从PR接入,到真正的业务接入
DeepSeek出圈之后,市场上很快掀起了一波DeepSeek接入潮。 在客户服务领域,许多企业见识到DeepSeek的超强能力后,也迅速接入DeepSeek并获得了不错的效果。 比如在客户接待服务场景,有企业将DeepSeek应用到智能问答助手࿰…...
免费PDF工具
Smallpdf.com - A Free Solution to all your PDF Problems Smallpdf - the platform that makes it super easy to convert and edit all your PDF files. Solving all your PDF problems in one place - and yes, free. https://smallpdf.com/#rappSmallpdf.com-解决您所有PD…...
PyTorch 源码学习:GPU 内存管理之它山之石——TensorFlow BFC 算法
TensorFlow 和 PyTorch 都是常用的深度学习框架,各自有一套独特但又相似的 GPU 内存管理机制(BFC 算法)。它山之石可以攻玉。了解 TensorFlow 的 BFC 算法有助于学习 PyTorch 管理 GPU 内存的精妙之处。本文重点关注 TensorFlow BFC 算法的核…...
【学写LibreCAD】1 LibreCAD主程序
一、源码 头文件: #ifndef MAIN_H #define MAIN_H#include<QStringList>#define STR(x) #x #define XSTR(x) STR(x)/*** brief handleArgs* param argc cli argument counter from main()* param argv cli arguments from main()* param argClean a list…...
Android Studio超级详细讲解下载、安装配置教程(建议收藏)
博主介绍:✌专注于前后端、机器学习、人工智能应用领域开发的优质创作者、秉着互联网精神开源贡献精神,答疑解惑、坚持优质作品共享。本人是掘金/腾讯云/阿里云等平台优质作者、擅长前后端项目开发和毕业项目实战,深受全网粉丝喜爱与支持✌有…...
CDN与群联云防护的技术差异在哪?
CDN(内容分发网络)与群联云防护是两种常用于提升网站性能和安全的解决方案,但两者的核心目标和技术实现存在显著差异。本文将从防御机制、技术架构、适用场景和代码实现等方面详细对比两者的区别,并提供可直接运行的代码示例。 一…...
故障诊断 | Matlab实现基于DBO-BP-Bagging多特征分类预测/故障诊断
故障诊断 | Matlab实现基于DBO-BP-Bagging多特征分类预测/故障诊断 目录 故障诊断 | Matlab实现基于DBO-BP-Bagging多特征分类预测/故障诊断分类效果基本介绍模型描述DBO-BP-Bagging蜣螂算法优化多特征分类预测一、引言1.1、研究背景和意义1.2、研究现状1.3、研究目的与方法 二…...
Linux-SaltStack配置
文章目录 SaltStack配置 🏡作者主页:点击! 🤖Linux专栏:点击! ⏰️创作时间:2025年02月24日20点51分 SaltStack配置 SaltStack 中既支持SSH协议也支持我们的一个客户端 #获取公钥(…...
内网渗透测试-Vulnerable Docker靶场
靶场来源: Vulnerable Docker: 1 ~ VulnHub 描述:Down By The Docker 有没有想过在容器中玩 docker 错误配置、权限提升等? 下载此 VM,拿出您的渗透测试帽并开始使用 我们有 2 种模式: - HARD:这需要您将 d…...
云计算如何解决延迟问题?
在云计算中,延迟(latency)指的是从请求发出到收到响应之间的时间间隔。延迟过高可能会严重影响用户体验,特别是在需要实时响应的应用中,如在线游戏、视频流、金融交易等。云计算服务如何解决延迟问题,通常依…...
飞书webhook监控业务系统端口
钉钉告警没有额度了,替代方案使用企业微信或者是飞书,以下脚本是飞书为例 监控ping也就是活动主机 #!/bin/bash # IP Ping 监控脚本 date$(date "%Y-%m-%d %H:%M:%S") # 根据实际情况修改飞书 Webhook 地址 webhook"https://open.feish…...
电脑键盘知识
1、键盘四大功能区 1. 功能区 2. 主要信息输入区 3. 编辑区 4. 数字键盘区 笔记本电脑键盘的功能区,使用前需先按Fn键 1.1、功能区 ESC:退出 F1:显示帮助信息 F2:重命名 F4:重复上一步操作 F5:刷新网页 …...
Oracle23版本 创建用户 报 00959和65096错误解决办法
00959错误解决办法,用户名必须已 c##或者C##开头 65096错误解决办法,创建用户名时去掉DEFAULT TABLESPACE smallrainTablespace这个属性 附上oracle 23版本创建表空间和用户语句; sqlplus sys as sysdba CREATE TABLESPACE smallrainOrac…...
SAP-ABAP:使用ST05(SQL Trace)追踪结构字段来源的步骤
ST05 是 SAP 提供的 SQL 跟踪工具,可以记录程序运行期间所有数据库操作(如 SELECT、UPDATE、INSERT)。通过分析跟踪结果,可以精准定位程序中结构字段对应的数据库表。 步骤1:激活ST05跟踪 事务码 ST05 → 点击 Activa…...
《深度学习实战》第3集:循环神经网络(RNN)与序列建模
第3集:循环神经网络(RNN)与序列建模 引言 在深度学习领域,处理序列数据(如文本、语音、时间序列等)是一个重要的研究方向。传统的全连接网络和卷积神经网络(CNN)难以直接捕捉序列中…...
AI头像生成器快速入门:3步搞定赛博朋克/古风/动漫头像创意文案
AI头像生成器快速入门:3步搞定赛博朋克/古风/动漫头像创意文案 1. 为什么你需要AI头像生成器 在社交媒体时代,一个独特的头像能让你在人群中脱颖而出。但设计一个完美的头像并不容易,特别是当你想要特定风格时——无论是未来感十足的赛博朋…...
结合JavaScript前端实现实时文本相似度对比工具
结合JavaScript前端实现实时文本相似度对比工具 1. 引言 你有没有遇到过这样的场景?写文章时,总感觉某两段话意思差不多,但又说不清到底有多像;翻译一段文字后,想对比一下自己的版本和参考译文,看看意思有…...
Linux环境下CTC语音唤醒模型的一键部署教程
Linux环境下CTC语音唤醒模型的一键部署教程 1. 引言 语音唤醒技术现在越来越普及了,就像我们平时对智能音箱说"小爱同学"或者"天猫精灵"一样,设备听到特定关键词就会激活响应。今天要介绍的是一个专门为移动端设计的CTC语音唤醒模…...
TypeScript的模板字面量类型:实现类型安全的SQL查询
TypeScript的模板字面量类型:实现类型安全的SQL查询 在现代前端开发中,TypeScript因其强大的类型系统而备受青睐。而模板字面量类型作为TypeScript 4.1引入的新特性,进一步扩展了类型系统的能力,使得开发者能够实现更精细的类型约…...
Zynq UltraScale+ PL中断深度解析:从硬件连接到软件响应的完整链路
Zynq UltraScale PL中断深度解析:从硬件连接到软件响应的完整链路 在异构计算架构中,中断机制如同神经系统般连接着可编程逻辑(PL)与处理系统(PS)。当工程师需要实现微秒级实时响应或构建高可靠性系统时&am…...
CopyOnWriteArrayList 实现原理
什么是CopyOnWriteArrayList?CopyOnWriteArrayList 是 Java 并发包 (java.util.concurrent) 中一个非常独特且重要的线程安全集合。与 Collections.synchronizedList 不同,CopyOnWriteArrayList 不依赖外部同步,而是通过内部机制实现并发控制…...
XLR8Servo_vhdl:FPGA硬件加速的高精度伺服控制方案
1. XLR8Servo_vhdl 加速伺服控制库深度解析1.1 项目定位与工程痛点XLR8Servo_vhdl 是专为 Alorium Technology XLR8 开发板设计的硬件加速型伺服电机控制库,其核心目标是彻底解决传统 Arduino Servo 库在实时性、资源占用和功能冲突三大维度上的固有缺陷。该库并非软…...
K8s入门到实战
一,简介 1,k8s概述 容器管理适用于集群部署,自动化部署 k8s利于应用扩展 k8s目标实施让容器化应用更加简洁和高效 2,k8s特性 自动装箱:基于容器对应用运行环境的资源配值要求自动部署应用 自我修复:当…...
分享 种 .NET 桌面应用程序自动更新解决方案檀
一、Actor 模型:不是并发技巧,而是领域单元 Actor 模型的本质是: Actor 是独立运行的实体 Actor 之间只通过消息交互 Actor 内部状态不可被外部直接访问 Actor 自行决定如何处理收到的消息 Actor 模型真正解决的是: 如何在不共享状…...
@所有管理者:5分钟让“龙虾”进化为“视觉智能管家”!
一见视觉Skill入驻ClawHub!无需复杂配置与高额成本,即可打造专属“数字店长/数字厂长”,让管理更安心。 现开启内测,首批体验官将优先享有专属体验权益! 巡检靠跑、反馈靠等、复盘靠猜? 连锁门店与工厂车…...
