Kafka简单实践
使用 Apache Kafka 和 Swoole 的 PHP 实践案例
一、引言
Apache Kafka 是一个开源的分布式流处理平台,能够处理大量的实时数据流。由于其高吞吐量、可扩展性和持久性,Kafka 成为构建微服务架构和大数据处理的重要工具。Swoole 是一个高性能的异步网络通信框架,允许 PHP 以异步方式进行高并发的处理。结合这两者,我们可以构建一个高效的消息传递系统。本文将介绍 Kafka 的基本概念,并通过一个使用 PHP 和 Swoole 的实际案例来演示如何使用 Kafka 进行消息处理。
二、Kafka 的基本概念
2.1 什么是 Kafka
Kafka 是一个分布式的流处理平台,设计用来处理实时数据流。其核心组件如下:
- 主题(Topic):Kafka 中的数据流分类,消费者可以通过订阅主题来接收消息。
- 生产者(Producer):向主题发布消息的客户端。
- 消费者(Consumer):从主题读取消息的客户端。
- 消费者组(Consumer Group):多个消费者可以组成一个消费者组,共享读取同一主题的消息。
- 代理(Broker):Kafka 集群中的服务器,负责存储消息和处理请求。
2.2 Kafka 的特点
- 高吞吐量:Kafka 能够每秒处理数百万条消息,适合大规模数据处理。
- 持久性:所有消息都被持久化到磁盘,可以通过设置保留策略来管理。
- 可扩展性:Kafka 可以横向扩展,增加更多代理以提高处理能力。
- 容错性:Kafka 具有内置的故障转移能力,保证消息传递的可靠性。
三、Swoole 的基本概念
3.1 什么是 Swoole
Swoole 是一个高性能的 PHP 扩展,提供了异步、协程和多线程等功能,使 PHP 能够处理高并发请求。它可以用于构建高性能的 Web 服务器、API 服务器及微服务。
3.2 Swoole 的特点
- 高性能:能够处理数万并发连接,适合高并发应用。
- 异步非阻塞:支持异步 IO,能够提升应用的响应速度。
- 协程支持:提供协程机制,使得异步编程更加简单直观。
四、使用 Kafka 和 Swoole 的 PHP 实践案例
4.1 环境准备
在本示例中,我们将创建一个 Kafka 生产者和消费者,并使用 Swoole 来处理高并发请求。
1. 安装 Kafka
确保在你的环境中已经安装并配置好 Kafka 和 ZooKeeper。可以参考 Kafka 官方文档进行安装。
2. 安装 Swoole
在你的 PHP 环境中安装 Swoole 扩展。可以使用 PECL 进行安装:
pecl install swoole
3. 安装 php-rdkafka
同样需要安装 php-rdkafka
扩展,以便与 Kafka 进行交互:
sudo apt-get install librdkafka-dev
pecl install rdkafka
在 php.ini
文件中添加以下行启用扩展:
extension=rdkafka.so
重启你的 Web 服务器。
4.2 创建 Kafka 生产者和消费者
4.2.1 生产者示例
<?php
// Producer.php
use RdKafka\Producer;
use RdKafka\Topic;require 'vendor/autoload.php';$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // 设置 Kafka 代理地址$producer = new Producer($conf);
$topic = 'test_topic'; // 主题名称// Swoole HTTP 服务器
$http = new Swoole\Http\Server("127.0.0.1", 9501);$http->on("request", function ($request, $response) use ($producer, $topic) {$message = isset($request->post['message']) ? $request->post['message'] : "Hello Kafka!";$producer->newTopic($topic)->produce(RD_KAFKA_PARTITION_UA, 0, $message); // 发送消息$producer->flush(10000);$response->header("Content-Type", "text/plain");$response->end("Message sent: " . $message);
});// 启动服务器
$http->start();
?>
4.2.2 消费者示例
<?php
// Consumer.php
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;require 'vendor/autoload.php';$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // 设置 Kafka 代理地址
$conf->set('group.id', 'test_group'); // 设置消费者组$consumer = new Consumer($conf);
$consumer->addBrokers("localhost:9092");$topic = $consumer->newTopic("test_topic"); // 创建或获取主题
$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // 从结束位置开始消费// Swoole 协程
Co\run(function () use ($topic) {while (true) {$message = $topic->consume(0, 1000); // 消费消息,超时为1000msif ($message->err) {if ($message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {continue; // 超时,继续循环} else {echo "Error: " . $message->errstr() . "\n"; // 输出错误信息break; // 出现错误,退出循环}}echo "Received message: " . $message->payload . "\n"; // 输出消息内容}
});
?>
4.3 启动示例
- 启动 ZooKeeper 和 Kafka 代理:
# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka 代理
bin/kafka-server-start.sh config/server.properties
- 在另一个终端中,运行消费者脚本:
php Consumer.php
- 在另一个终端中,运行生产者脚本:
php Producer.php
- 使用 HTTP 客户端(如 Postman 或 curl)向生产者发送 POST 请求:
curl -X POST http://127.0.0.1:9501 -d "message=Hello from Swoole!"
消费者将在终端中接收到消息。
五、总结
通过结合 Apache Kafka 和 Swoole,我们能够构建一个高效的消息传递系统。Kafka 提供了可靠的消息队列,而 Swoole 则为 PHP 提供了高并发处理能力。本文中的示例展示了如何使用这两者创建简单的生产者和消费者。随着项目需求的增加,我们可以进一步扩展该系统,例如进行消息处理、增加错误处理逻辑、实现数据持久化等。
Kafka 和 Swoole 的组合使得开发实时数据处理和高性能应用变得更加容易,是现代应用架构中不可或缺的一部分。
相关文章:

Kafka简单实践
使用 Apache Kafka 和 Swoole 的 PHP 实践案例 一、引言 Apache Kafka 是一个开源的分布式流处理平台,能够处理大量的实时数据流。由于其高吞吐量、可扩展性和持久性,Kafka 成为构建微服务架构和大数据处理的重要工具。Swoole 是一个高性能的异步网络通…...

JS
文章目录 项目地址一、JS1.1 if语句1.2 for循环1.2 三元表达式1.3 switch1.4 数组的push方法1.5 fuction1.5.1 arguments1.6 匿名函数1.7 预解析1.8 js对象1.8.1创建一个类1.8.2 遍历对象1.9 js的内置对象1.9.1 随机整数二、DOM2.1 获取元素2.2 事件基础2.2.1 事件三要素2.2.2 …...

【原创】java+ssm+mysql商品库存管理系统(进销存)设计与实现
个人主页:程序猿小小杨 个人简介:从事开发多年,Java、Php、Python、前端开发均有涉猎 博客内容:Java项目实战、项目演示、技术分享 文末有作者名片,希望和大家一起共同进步,你只管努力,剩下的交…...

three.js 杂记
欧拉角旋转变换 x,y,z 弧度单位 THREE.MathUtils.DEG2RAD 度数转弧度 new THREE.Euler( - 90 * THREE.MathUtils.DEG2RAD, 0, 0 ) radius:半径 setFromSphericalCoords ( radius : Float, phi : Float, theta : Float ) : this 从球坐标中的radius、phi和theta设置该向量…...

基于Hadoop、hive的数仓搭建实践
文章目录 架构图Hadoop搭建Hive 搭建MySQL搭建官网文档下载配置配置hive环境变量配置日志文件配置hive-site 复制mysql 驱动包删除日志包初始化元数据启动metastore服务使用hive CLI启动hiveServer2访问hiveserver2客户端连接beeline shell连接 Dbeaver连接经验 基于HDFS Hive…...

新的恶意软件活动通过游戏应用程序瞄准 Windows 用户
一种新的恶意软件 Winos4.0 被积极用于网络攻击活动。FortiGuard实验室发现,这种先进的恶意框架是从臭名昭著的 Gh0strat 演变而来的,配备了模块化组件,可在受感染的设备上进行一系列恶意活动。 这些攻击已在游戏相关应用程序中发现…...

【Hutool系列】反射工具-ReflectUtil
前言 反射是 Java 中一种强大的机制,可以在运行时动态地获取类的信息并操作类的属性和方法。在 Java 中,通过反射可以获取和设置类的字段、调用类的方法、创建类的实例等。Java的反射机制,可以让语言变得更加灵活,对对象的操作也更…...
【操作系统专业课】第二次作业
第1题(进程同步与互斥) 使用二值信号量实现 n 个进程之间的互斥。 1. 定义一个二值信号量 mutex= 1。 二值信号量:二值信号量只有两种取值,0 (资源已被占用)和 1(资源可用)。 2. 进程进入临界区前的操作:每个进程在进入临界区之前,都需要执行 P(mutex) 操作。 P 操作…...

Scala的迭代器
1.对比foreach 它的优点在于: (1) 内存效率高。迭代器采用延迟计算的方式,它不会将整个集合加载到内存中,而是在每次调用next方法时才计算并返回下一个元素。 (2) 统一的遍历方法。迭代器为不同类型的集合(如列表、集合、映射等…...

(RK3566驱动开发 - 1).pinctrl和gpio子系统
一.设备树 pinctrl部分可以参考 rockchip 官方的绑定文档 :kernel/Documentation/devicetree/bindings/pinctrl PIN_BANK:引脚所属的组 - 本次例程使用的是 GPIO3_A1 这个引脚,所以所属的组为 3; PIN_BANK_IDX:引脚的…...

css三角制作(二十课)
代码: <style>/* 边框原理 */.box1 {width: 0;height: 0;border-top: 100px solid pink;border-bottom: 100px solid blue;border-left: 100px solid yellow;border-right: 100px solid greenyellow;}/* 三角制作 */.box2 {width: 0;height: 0;border: 100px …...

C++_priority_queue(优先级队列)
✨✨ 欢迎大家来到小伞的大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C学习 小伞的主页:xiaosan_blog 1. priority_queue的介绍和使用 priority_queue文档介绍 优先级队列的实现的关键…...

微信小程序——01开发前的准备和开发工具
文章目录 一、开发前的准备1注册小程序账号2安装开发者工具 二、开发者工具的使用1创建项目2 工具的使用3目录结构4各个页面之间的关系5 权限管理6提交审核和发布 一、开发前的准备 开发前需要进行以下准备: 1 注册小程序账号2激活邮箱3 信息登记4 登录小程序管理后…...

MySQL 的主从复制数据同步
一、什么是 MySQL 的主从复制 MySQL 的主从复制(Master-Slave Replication)是一种将数据从一个主数据库服务器(主库)复制到一个或多个从数据库服务器(从库)的技术。主库负责所有的数据写操作,从…...

python——面向对象
一、面向对象编程 1.1 面向过程与面向对象 面向过程和面向对象都是一种编程方式,只不过再设计上有区别。 1.1.1 面向过程pop: 举例:孩子上学 1. 妈妈起床 2. 妈妈洗漱 3. 妈妈做饭 4. 妈妈把孩子叫起来 5. 孩子起床 6. 孩子洗漱 7. 孩子吃…...

Microsoft 365 Exchange如何设置可信发件IP白名单
1、 进入到 Microsoft 365 admin center 管理中心 ,点击 管理中心 下的 安全 在弹出的新页面中,依次点击 策略和规则 – 威胁策略 – 反垃圾邮件 再单击 连接筛选器策略(默认) – 编辑连接筛选器策略 2、在 IP 允许列表 中添加可信邮件 IP 段࿰…...

LM27313典型电路之升压电路
下图为升压芯片LM27313典型电路图: 从图中可以看出:系统电压VSYS3.7伏,通过C26与C27两个滤波电容后,到达升压芯片的VIN输入脚pin5。 其中电源芯片的电压输出由下式子决定: VOUT1.23*(1R17/R21) 其中VOUT是图中的V5D…...

嵌入式面试八股文(七)·#ifndef#define#endif的作用、以及内存分区(全局区、堆区、栈区、代码区)
目录 1. 头文件中的#ifndef / #define / #endif的作用是什么? 2. 内存分区:全局区、堆区、栈区、代码区简单描述? 2.1 代码区(Text Segment): 2.2 全局区(Data Segment)&…...

【弱监督视频异常检测】2024-ESWA-基于扩散的弱监督视频异常检测常态预训练
2024-ESWA-Diffusion-based normality pre-training for weakly supervised video anomaly detection 基于扩散的弱监督视频异常检测常态预训练摘要1. 引言2. 相关工作3. 方法论3.1. 使用扩散自动编码器进行常态学习3.2. 全局-局部特征编码器3.2.1 局部块3.2.2 全局块3.2.3 协同…...

Android 13 实现屏幕熄屏一段时候后关闭 Wi-Fi 和清空多任务列表
明白了,您这个补丁的功能是当设备屏幕关闭一段时间后,自动关闭 Wi-Fi 连接并清空多任务菜单。以下是更新后的博客内容,包含了对功能的详细解释和代码实现: 修改 PowerManagerService.java 以实现屏幕灭屏后关闭 Wi-Fi 和清空多任务菜单功能 在本篇博客中,我们将介绍一个针…...

Elasticsearch磁盘占用大于95%时将所有索引置为只读
在一个稳定运行的功能中,突然收到报错。经查明,是在向 Elasticsearch 中插入文档时出现了错误: AuthorizationException: AuthorizationException(403, ucluster_block_exception, ublocked by: [FORBIDDEN/12/index read-only / allow delete (api)];) 网上也有其他人报出类…...

删除 git config 保存的密码
要从 Git 中删除保存的密码,你可以根据你之前使用的保存方法来操作。以下是一些常见的方法来删除 Git 中保存的密码: 删除 credential.helper 中的密码 如果你之前使用 store 或 cache 作为 credential.helper,你可以执行以下步骤来删除保存…...

Springboot环境搭建详解
springboot学习视频记录: 笔记: a:Springboot maven常见依赖、配置文件笔记-CSDN博客 b:Springboot环境搭建详解-CSDN博客 day01 6:springboot的parent和starter依赖- a 7:启动类的位置配置- b 8&am…...

SpringCloud框架学习(第三部分:Resilience4j 与 Micrometer)
目录 九、CircuitBreaker断路器 1.前言(Hystrix) 2.服务雪崩 3.Circuit Breaker 4. Resilience4j 5.案例实战 (1)熔断(服务熔断 服务降级) Ⅰ. 按照 COUNT_BASED(计数的滑动窗口…...

Scala的Map集合(不可变)
package gxy//类型:不可变,可变 //操作:添加元素,删除元素,查询元素,移除元素,遍历 object map {def main(args: Array[String]): Unit {//不可变mapval map1 Map("鄂" -> "…...

深入剖析:Spring MVC与Struts的较量
标题:深入剖析:Spring MVC与Struts的较量 引言 在Java Web开发领域,Spring MVC和Struts是两个非常流行的框架。它们各自拥有不同的特点,适用于不同的应用场景。本文将深入探讨Spring MVC和Struts的区别,从底层机制、…...

4.Mybatis中,在Mapper的SQL映射文件中,使用<choose><when>无法识别参数的情况
正确结果 <?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace"com.itheima.mapper.Bra…...

antd proFromSelect 懒加载+模糊查询
antd proFromSelect 懒加载模糊查询 场景 查询用户的时候数量特别大,有10w条数据,不可能直接全部查询用来展示 所以本文章将讲解如何使用懒加载模糊查询,解决数量过大的问题 后端代码就不用展示了,很简单的分页查询,主…...

Spring Boot 牛刀小试 org.springframework.boot:spring-boot-maven-plugin:找不到类错误
今天看了下书翻了下Spring Boot的用法,下载idea后, 反复出现org.springframework.boot:spring-boot-maven-plugin:找不到类错误,后来看了下调试窗口,发现是连不上maven的网站443错误,解决思路很简单,把ide连…...

qt中ctrl+鼠标左键无法进入
现象:qt中ctrl鼠标左键无法跳转部分函数,例如能跳到textEdit->toPlainText().,但无法跳转到toUtf8();但编译没有问题 排查1:我发现是交叉编译链的问题,使用linux自带就可以进,用ATK-I.MX6U就部分不能进…...