2.部署kafka:9092
官方文档:http://kafka.apache.org/documentation.html
(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群)
Kafka3台集群搭建环境:
操作系统: centos7
防火墙:全关
3台zookeeper集群内的机器,1台logstash
软件版本: zookeeper-3.4.12.tar.gz
软件版本kafka_2.12-2.1.0.tgz
安装软件
(3台zookeeper集群的机器)
# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/
# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka
创建数据目录(3台)
# mkdir /data/kafka-logs
修改第一台配置文件
(注意不同颜色标记的部分)
# egrep -v "^$|^#" /usr/local/kafka/config/server.properties
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://192.168.148.141:9092 #监听套接字
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数
#如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中
num.partitions=1 #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量,默认1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数
replica.fetch.max.bytes=5242880 #取消息的最大字节数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有,删除
zookeeper.connect=192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
修改另外两台配置文件
#scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/
broker.id=2
listeners=PLAINTEXT://192.168.148.142:9092
# scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/
broker.id=3
listeners=PLAINTEXT://192.168.148.143:9092
启动kafka(3台)
[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
查看启动情况(3台)
[root@host1 ~]# jps
10754 QuorumPeerMain
11911 Kafka
12287 Jps
创建topic来验证
[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien
出现Created topic "cien"验证成功运行
在一台服务器上创建一个发布者
[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien
> hello kafka
> ni hao ya
>
在另一台服务器上创建一个订阅者
[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning
...
hello kafka
ni hao ya
如果都能接收到,说明kafka部署成功!
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.23:2181 --list #查看所有topic
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.23:2181 --topic qianfeng #查看指定topic的详细信息
Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:
Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.10.23:2181 --topic qianfeng #删除topic
Topic qianfeng is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
配置elfk集群订阅和zookeeper和kafka

配置第一台logstash生产消息输出到kafka
yum -y install wget
wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm
yum localinstall jdk-8u341-linux-x64.rpm -y
java -version
1.安装logstash
tar xf logstash-6.4.1.tar.gz -C /usr/local
ln -s /usr/local/logstash-6.4.1 /usr/local/logstash
2.修改配置文件
cd /usr/local/logstash/config/
vim logstash.yml
http.host: "0.0.0.0"
3.编写配置文件
不要过滤, logstash会将message内容写入到队列中
# cd /usr/local/logstash/config/
# vim logstash-kafka.conf
input {file {type => "sys-log"path => "/var/log/messages"start_position => beginning}
}
output {kafka {bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092" #输出到kafka集群topic_id => "sys-log-messages" #主题名称compression_type => "snappy" #压缩类型codec => "json"}
}
启动logstash
# /usr/local/logstash/bin/logstash -f logstash-kafka.conf
在kafka上查看主题,发现已经有了sys-log-messages,说明写入成功了
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list
__consumer_offsets
qianfeng
sys-log-messages
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages
Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:
Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
配置第二台logstash,订阅kafka日志,输出到es集群
# cat kafka-es.conf
input {kafka {bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092" topics => "sys-log-messages" #kafka主题名称codec => "json"auto_offset_reset => "earliest"}
}output {elasticsearch {hosts => ["192.168.148.131:9200","192.168.148.132:9200"]index => "kafka-%{type}-%{+YYYY.MM.dd}"}
}
相关文章:
2.部署kafka:9092
官方文档:http://kafka.apache.org/documentation.html (虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群) Kafka3台集群搭建环境: 操作系统: centos7 防火墙:全关 3台zookeeper集群内的机器,1台logstash 软件版本: …...
学习路之PHP --TP6异步执行功能 (无需安装任何框架)
学习路之PHP --异步执行功能 (无需安装任何框架) 简介一、工具类二、调用三、异步任务的操作四、效果: 简介 执行异步任务是一种很常见的需求,如批量发邮箱,短信等等执行耗时任务时,需要程序异步执行&…...
Uniapp 小程序复制、粘贴功能实现
在开发 Uniapp 小程序的过程中,复制和粘贴功能是非常实用且常见的交互需求。今天,我就来和大家详细分享如何在 Uniapp 中实现这两个功能。 复制功能:uni.setClipboardData方法 goResult() {uni.setClipboardData({data: this.copyContent, /…...
seacmsv9注入管理员账号密码+orderby+limit
一、seacmsv9 SQL注入漏洞 查看源码 <?php session_start(); require_once("include/common.php"); //前置跳转start $cs$_SERVER["REQUEST_URI"]; if($GLOBALS[cfg_mskin]3 AND $GLOBALS[isMobile]1){header("location:$cfg_mhost$cs");}…...
多通道数据采集和信号生成的模块化仪器如何重构飞机电子可靠性测试体系?
飞机的核心电子系统包括发电与配电系统,飞机内部所有设备和系统之间的内部数据通信系统,以及用于外部通信的射频设备。其他所有航空电子元件都依赖这些关键总线进行电力传输或数据通信。在本文中,我们将了解模块化仪器(无论是PCIe…...
天润融通分析DeepSeek如何一键完成从PR接入,到真正的业务接入
DeepSeek出圈之后,市场上很快掀起了一波DeepSeek接入潮。 在客户服务领域,许多企业见识到DeepSeek的超强能力后,也迅速接入DeepSeek并获得了不错的效果。 比如在客户接待服务场景,有企业将DeepSeek应用到智能问答助手࿰…...
免费PDF工具
Smallpdf.com - A Free Solution to all your PDF Problems Smallpdf - the platform that makes it super easy to convert and edit all your PDF files. Solving all your PDF problems in one place - and yes, free. https://smallpdf.com/#rappSmallpdf.com-解决您所有PD…...
PyTorch 源码学习:GPU 内存管理之它山之石——TensorFlow BFC 算法
TensorFlow 和 PyTorch 都是常用的深度学习框架,各自有一套独特但又相似的 GPU 内存管理机制(BFC 算法)。它山之石可以攻玉。了解 TensorFlow 的 BFC 算法有助于学习 PyTorch 管理 GPU 内存的精妙之处。本文重点关注 TensorFlow BFC 算法的核…...
【学写LibreCAD】1 LibreCAD主程序
一、源码 头文件: #ifndef MAIN_H #define MAIN_H#include<QStringList>#define STR(x) #x #define XSTR(x) STR(x)/*** brief handleArgs* param argc cli argument counter from main()* param argv cli arguments from main()* param argClean a list…...
Android Studio超级详细讲解下载、安装配置教程(建议收藏)
博主介绍:✌专注于前后端、机器学习、人工智能应用领域开发的优质创作者、秉着互联网精神开源贡献精神,答疑解惑、坚持优质作品共享。本人是掘金/腾讯云/阿里云等平台优质作者、擅长前后端项目开发和毕业项目实战,深受全网粉丝喜爱与支持✌有…...
CDN与群联云防护的技术差异在哪?
CDN(内容分发网络)与群联云防护是两种常用于提升网站性能和安全的解决方案,但两者的核心目标和技术实现存在显著差异。本文将从防御机制、技术架构、适用场景和代码实现等方面详细对比两者的区别,并提供可直接运行的代码示例。 一…...
故障诊断 | Matlab实现基于DBO-BP-Bagging多特征分类预测/故障诊断
故障诊断 | Matlab实现基于DBO-BP-Bagging多特征分类预测/故障诊断 目录 故障诊断 | Matlab实现基于DBO-BP-Bagging多特征分类预测/故障诊断分类效果基本介绍模型描述DBO-BP-Bagging蜣螂算法优化多特征分类预测一、引言1.1、研究背景和意义1.2、研究现状1.3、研究目的与方法 二…...
Linux-SaltStack配置
文章目录 SaltStack配置 🏡作者主页:点击! 🤖Linux专栏:点击! ⏰️创作时间:2025年02月24日20点51分 SaltStack配置 SaltStack 中既支持SSH协议也支持我们的一个客户端 #获取公钥(…...
内网渗透测试-Vulnerable Docker靶场
靶场来源: Vulnerable Docker: 1 ~ VulnHub 描述:Down By The Docker 有没有想过在容器中玩 docker 错误配置、权限提升等? 下载此 VM,拿出您的渗透测试帽并开始使用 我们有 2 种模式: - HARD:这需要您将 d…...
云计算如何解决延迟问题?
在云计算中,延迟(latency)指的是从请求发出到收到响应之间的时间间隔。延迟过高可能会严重影响用户体验,特别是在需要实时响应的应用中,如在线游戏、视频流、金融交易等。云计算服务如何解决延迟问题,通常依…...
飞书webhook监控业务系统端口
钉钉告警没有额度了,替代方案使用企业微信或者是飞书,以下脚本是飞书为例 监控ping也就是活动主机 #!/bin/bash # IP Ping 监控脚本 date$(date "%Y-%m-%d %H:%M:%S") # 根据实际情况修改飞书 Webhook 地址 webhook"https://open.feish…...
电脑键盘知识
1、键盘四大功能区 1. 功能区 2. 主要信息输入区 3. 编辑区 4. 数字键盘区 笔记本电脑键盘的功能区,使用前需先按Fn键 1.1、功能区 ESC:退出 F1:显示帮助信息 F2:重命名 F4:重复上一步操作 F5:刷新网页 …...
Oracle23版本 创建用户 报 00959和65096错误解决办法
00959错误解决办法,用户名必须已 c##或者C##开头 65096错误解决办法,创建用户名时去掉DEFAULT TABLESPACE smallrainTablespace这个属性 附上oracle 23版本创建表空间和用户语句; sqlplus sys as sysdba CREATE TABLESPACE smallrainOrac…...
SAP-ABAP:使用ST05(SQL Trace)追踪结构字段来源的步骤
ST05 是 SAP 提供的 SQL 跟踪工具,可以记录程序运行期间所有数据库操作(如 SELECT、UPDATE、INSERT)。通过分析跟踪结果,可以精准定位程序中结构字段对应的数据库表。 步骤1:激活ST05跟踪 事务码 ST05 → 点击 Activa…...
《深度学习实战》第3集:循环神经网络(RNN)与序列建模
第3集:循环神经网络(RNN)与序列建模 引言 在深度学习领域,处理序列数据(如文本、语音、时间序列等)是一个重要的研究方向。传统的全连接网络和卷积神经网络(CNN)难以直接捕捉序列中…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
LLM基础1_语言模型如何处理文本
基于GitHub项目:https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken:OpenAI开发的专业"分词器" torch:Facebook开发的强力计算引擎,相当于超级计算器 理解词嵌入:给词语画"…...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
