当前位置: 首页 > news >正文

Kafka简单实践

使用 Apache Kafka 和 Swoole 的 PHP 实践案例

一、引言

Apache Kafka 是一个开源的分布式流处理平台,能够处理大量的实时数据流。由于其高吞吐量、可扩展性和持久性,Kafka 成为构建微服务架构和大数据处理的重要工具。Swoole 是一个高性能的异步网络通信框架,允许 PHP 以异步方式进行高并发的处理。结合这两者,我们可以构建一个高效的消息传递系统。本文将介绍 Kafka 的基本概念,并通过一个使用 PHP 和 Swoole 的实际案例来演示如何使用 Kafka 进行消息处理。

二、Kafka 的基本概念

2.1 什么是 Kafka

Kafka 是一个分布式的流处理平台,设计用来处理实时数据流。其核心组件如下:

  • 主题(Topic):Kafka 中的数据流分类,消费者可以通过订阅主题来接收消息。
  • 生产者(Producer):向主题发布消息的客户端。
  • 消费者(Consumer):从主题读取消息的客户端。
  • 消费者组(Consumer Group):多个消费者可以组成一个消费者组,共享读取同一主题的消息。
  • 代理(Broker):Kafka 集群中的服务器,负责存储消息和处理请求。

2.2 Kafka 的特点

  • 高吞吐量:Kafka 能够每秒处理数百万条消息,适合大规模数据处理。
  • 持久性:所有消息都被持久化到磁盘,可以通过设置保留策略来管理。
  • 可扩展性:Kafka 可以横向扩展,增加更多代理以提高处理能力。
  • 容错性:Kafka 具有内置的故障转移能力,保证消息传递的可靠性。

三、Swoole 的基本概念

3.1 什么是 Swoole

Swoole 是一个高性能的 PHP 扩展,提供了异步、协程和多线程等功能,使 PHP 能够处理高并发请求。它可以用于构建高性能的 Web 服务器、API 服务器及微服务。

3.2 Swoole 的特点

  • 高性能:能够处理数万并发连接,适合高并发应用。
  • 异步非阻塞:支持异步 IO,能够提升应用的响应速度。
  • 协程支持:提供协程机制,使得异步编程更加简单直观。

四、使用 Kafka 和 Swoole 的 PHP 实践案例

4.1 环境准备

在本示例中,我们将创建一个 Kafka 生产者和消费者,并使用 Swoole 来处理高并发请求。

1. 安装 Kafka

确保在你的环境中已经安装并配置好 Kafka 和 ZooKeeper。可以参考 Kafka 官方文档进行安装。

2. 安装 Swoole

在你的 PHP 环境中安装 Swoole 扩展。可以使用 PECL 进行安装:

pecl install swoole
3. 安装 php-rdkafka

同样需要安装 php-rdkafka 扩展,以便与 Kafka 进行交互:

sudo apt-get install librdkafka-dev
pecl install rdkafka

php.ini 文件中添加以下行启用扩展:

extension=rdkafka.so

重启你的 Web 服务器。

4.2 创建 Kafka 生产者和消费者

4.2.1 生产者示例
<?php
// Producer.php
use RdKafka\Producer;
use RdKafka\Topic;require 'vendor/autoload.php';$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // 设置 Kafka 代理地址$producer = new Producer($conf);
$topic = 'test_topic'; // 主题名称// Swoole HTTP 服务器
$http = new Swoole\Http\Server("127.0.0.1", 9501);$http->on("request", function ($request, $response) use ($producer, $topic) {$message = isset($request->post['message']) ? $request->post['message'] : "Hello Kafka!";$producer->newTopic($topic)->produce(RD_KAFKA_PARTITION_UA, 0, $message); // 发送消息$producer->flush(10000);$response->header("Content-Type", "text/plain");$response->end("Message sent: " . $message);
});// 启动服务器
$http->start();
?>
4.2.2 消费者示例
<?php
// Consumer.php
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;require 'vendor/autoload.php';$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // 设置 Kafka 代理地址
$conf->set('group.id', 'test_group'); // 设置消费者组$consumer = new Consumer($conf);
$consumer->addBrokers("localhost:9092");$topic = $consumer->newTopic("test_topic"); // 创建或获取主题
$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // 从结束位置开始消费// Swoole 协程
Co\run(function () use ($topic) {while (true) {$message = $topic->consume(0, 1000); // 消费消息,超时为1000msif ($message->err) {if ($message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {continue; // 超时,继续循环} else {echo "Error: " . $message->errstr() . "\n"; // 输出错误信息break; // 出现错误,退出循环}}echo "Received message: " . $message->payload . "\n"; // 输出消息内容}
});
?>

4.3 启动示例

  1. 启动 ZooKeeper 和 Kafka 代理:
# 启动 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka 代理
bin/kafka-server-start.sh config/server.properties
  1. 在另一个终端中,运行消费者脚本:
php Consumer.php
  1. 在另一个终端中,运行生产者脚本:
php Producer.php
  1. 使用 HTTP 客户端(如 Postman 或 curl)向生产者发送 POST 请求:
curl -X POST http://127.0.0.1:9501 -d "message=Hello from Swoole!"

消费者将在终端中接收到消息。

五、总结

通过结合 Apache Kafka 和 Swoole,我们能够构建一个高效的消息传递系统。Kafka 提供了可靠的消息队列,而 Swoole 则为 PHP 提供了高并发处理能力。本文中的示例展示了如何使用这两者创建简单的生产者和消费者。随着项目需求的增加,我们可以进一步扩展该系统,例如进行消息处理、增加错误处理逻辑、实现数据持久化等。

Kafka 和 Swoole 的组合使得开发实时数据处理和高性能应用变得更加容易,是现代应用架构中不可或缺的一部分。

相关文章:

Kafka简单实践

使用 Apache Kafka 和 Swoole 的 PHP 实践案例 一、引言 Apache Kafka 是一个开源的分布式流处理平台&#xff0c;能够处理大量的实时数据流。由于其高吞吐量、可扩展性和持久性&#xff0c;Kafka 成为构建微服务架构和大数据处理的重要工具。Swoole 是一个高性能的异步网络通…...

JS

文章目录 项目地址一、JS1.1 if语句1.2 for循环1.2 三元表达式1.3 switch1.4 数组的push方法1.5 fuction1.5.1 arguments1.6 匿名函数1.7 预解析1.8 js对象1.8.1创建一个类1.8.2 遍历对象1.9 js的内置对象1.9.1 随机整数二、DOM2.1 获取元素2.2 事件基础2.2.1 事件三要素2.2.2 …...

【原创】java+ssm+mysql商品库存管理系统(进销存)设计与实现

个人主页&#xff1a;程序猿小小杨 个人简介&#xff1a;从事开发多年&#xff0c;Java、Php、Python、前端开发均有涉猎 博客内容&#xff1a;Java项目实战、项目演示、技术分享 文末有作者名片&#xff0c;希望和大家一起共同进步&#xff0c;你只管努力&#xff0c;剩下的交…...

three.js 杂记

欧拉角旋转变换 x,y,z 弧度单位 THREE.MathUtils.DEG2RAD 度数转弧度 new THREE.Euler( - 90 * THREE.MathUtils.DEG2RAD, 0, 0 ) radius:半径 setFromSphericalCoords ( radius : Float, phi : Float, theta : Float ) : this 从球坐标中的radius、phi和theta设置该向量…...

基于Hadoop、hive的数仓搭建实践

文章目录 架构图Hadoop搭建Hive 搭建MySQL搭建官网文档下载配置配置hive环境变量配置日志文件配置hive-site 复制mysql 驱动包删除日志包初始化元数据启动metastore服务使用hive CLI启动hiveServer2访问hiveserver2客户端连接beeline shell连接 Dbeaver连接经验 基于HDFS Hive…...

新的恶意软件活动通过游戏应用程序瞄准 Windows 用户

一种新的恶意软件 Winos4.0 被积极用于网络攻击活动。FortiGuard实验室发现&#xff0c;这种先进的恶意框架是从臭名昭著的 Gh0strat 演变而来的&#xff0c;配备了模块化组件&#xff0c;可在受感染的设备上进行一系列恶意活动。 这些攻击已在游戏相关应用程序中发现&#xf…...

【Hutool系列】反射工具-ReflectUtil

前言 反射是 Java 中一种强大的机制&#xff0c;可以在运行时动态地获取类的信息并操作类的属性和方法。在 Java 中&#xff0c;通过反射可以获取和设置类的字段、调用类的方法、创建类的实例等。Java的反射机制&#xff0c;可以让语言变得更加灵活&#xff0c;对对象的操作也更…...

【操作系统专业课】第二次作业

第1题(进程同步与互斥) 使用二值信号量实现 n 个进程之间的互斥。 1. 定义一个二值信号量 mutex= 1。 二值信号量:二值信号量只有两种取值,0 (资源已被占用)和 1(资源可用)。 2. 进程进入临界区前的操作:每个进程在进入临界区之前,都需要执行 P(mutex) 操作。 P 操作…...

Scala的迭代器

1.对比foreach 它的优点在于&#xff1a; (1) 内存效率高。迭代器采用延迟计算的方式&#xff0c;它不会将整个集合加载到内存中&#xff0c;而是在每次调用next方法时才计算并返回下一个元素。 (2) 统一的遍历方法。迭代器为不同类型的集合&#xff08;如列表、集合、映射等…...

(RK3566驱动开发 - 1).pinctrl和gpio子系统

一.设备树 pinctrl部分可以参考 rockchip 官方的绑定文档 &#xff1a;kernel/Documentation/devicetree/bindings/pinctrl PIN_BANK&#xff1a;引脚所属的组 - 本次例程使用的是 GPIO3_A1 这个引脚&#xff0c;所以所属的组为 3&#xff1b; PIN_BANK_IDX&#xff1a;引脚的…...

css三角制作(二十课)

代码&#xff1a; <style>/* 边框原理 */.box1 {width: 0;height: 0;border-top: 100px solid pink;border-bottom: 100px solid blue;border-left: 100px solid yellow;border-right: 100px solid greenyellow;}/* 三角制作 */.box2 {width: 0;height: 0;border: 100px …...

C++_priority_queue(优先级队列)

✨✨ 欢迎大家来到小伞的大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C学习 小伞的主页&#xff1a;xiaosan_blog 1. priority_queue的介绍和使用 priority_queue文档介绍 优先级队列的实现的关键…...

微信小程序——01开发前的准备和开发工具

文章目录 一、开发前的准备1注册小程序账号2安装开发者工具 二、开发者工具的使用1创建项目2 工具的使用3目录结构4各个页面之间的关系5 权限管理6提交审核和发布 一、开发前的准备 开发前需要进行以下准备&#xff1a; 1 注册小程序账号2激活邮箱3 信息登记4 登录小程序管理后…...

MySQL 的主从复制数据同步

一、什么是 MySQL 的主从复制 MySQL 的主从复制&#xff08;Master-Slave Replication&#xff09;是一种将数据从一个主数据库服务器&#xff08;主库&#xff09;复制到一个或多个从数据库服务器&#xff08;从库&#xff09;的技术。主库负责所有的数据写操作&#xff0c;从…...

python——面向对象

一、面向对象编程 1.1 面向过程与面向对象 面向过程和面向对象都是一种编程方式&#xff0c;只不过再设计上有区别。 1.1.1 面向过程pop&#xff1a; 举例&#xff1a;孩子上学 1. 妈妈起床 2. 妈妈洗漱 3. 妈妈做饭 4. 妈妈把孩子叫起来 5. 孩子起床 6. 孩子洗漱 7. 孩子吃…...

Microsoft 365 Exchange如何设置可信发件IP白名单

1、 进入到 Microsoft 365 admin center 管理中心 &#xff0c;点击 管理中心 下的 安全 在弹出的新页面中&#xff0c;依次点击 策略和规则 – 威胁策略 – 反垃圾邮件 再单击 连接筛选器策略(默认) – 编辑连接筛选器策略 2、在 IP 允许列表 中添加可信邮件 IP 段&#xff0…...

LM27313典型电路之升压电路

下图为升压芯片LM27313典型电路图&#xff1a; 从图中可以看出&#xff1a;系统电压VSYS3.7伏&#xff0c;通过C26与C27两个滤波电容后&#xff0c;到达升压芯片的VIN输入脚pin5。 其中电源芯片的电压输出由下式子决定&#xff1a; VOUT1.23*(1R17/R21) 其中VOUT是图中的V5D…...

嵌入式面试八股文(七)·#ifndef#define#endif的作用、以及内存分区(全局区、堆区、栈区、代码区)

目录 1. 头文件中的#ifndef / #define / #endif的作用是什么&#xff1f; 2. 内存分区&#xff1a;全局区、堆区、栈区、代码区简单描述&#xff1f; 2.1 代码区&#xff08;Text Segment&#xff09;&#xff1a; 2.2 全局区&#xff08;Data Segment&#xff09;&…...

【弱监督视频异常检测】2024-ESWA-基于扩散的弱监督视频异常检测常态预训练

2024-ESWA-Diffusion-based normality pre-training for weakly supervised video anomaly detection 基于扩散的弱监督视频异常检测常态预训练摘要1. 引言2. 相关工作3. 方法论3.1. 使用扩散自动编码器进行常态学习3.2. 全局-局部特征编码器3.2.1 局部块3.2.2 全局块3.2.3 协同…...

Android 13 实现屏幕熄屏一段时候后关闭 Wi-Fi 和清空多任务列表

明白了,您这个补丁的功能是当设备屏幕关闭一段时间后,自动关闭 Wi-Fi 连接并清空多任务菜单。以下是更新后的博客内容,包含了对功能的详细解释和代码实现: 修改 PowerManagerService.java 以实现屏幕灭屏后关闭 Wi-Fi 和清空多任务菜单功能 在本篇博客中,我们将介绍一个针…...

手把手图解:用Wireshark抓个包,带你‘看见’一次IMS注册和SIP会话的全过程

手把手图解&#xff1a;用Wireshark抓个包&#xff0c;带你‘看见’一次IMS注册和SIP会话的全过程 通信工程师的日常工作中&#xff0c;最令人着迷的莫过于将抽象的网络协议转化为可视化的数据流。当终端设备向IMS核心网发起注册并建立语音会话时&#xff0c;背后究竟发生了什么…...

如何彻底解决Mac设备滚动方向冲突:Scroll Reverser终极配置指南

如何彻底解决Mac设备滚动方向冲突&#xff1a;Scroll Reverser终极配置指南 【免费下载链接】Scroll-Reverser Per-device scrolling prefs on macOS. 项目地址: https://gitcode.com/gh_mirrors/sc/Scroll-Reverser 你是不是经常在Mac上同时使用触控板和鼠标&#xff0…...

猫抓Cat-Catch:浏览器资源嗅探神器,轻松下载网页视频和流媒体资源

猫抓Cat-Catch&#xff1a;浏览器资源嗅探神器&#xff0c;轻松下载网页视频和流媒体资源 【免费下载链接】cat-catch 猫抓 浏览器资源嗅探扩展 / cat-catch Browser Resource Sniffing Extension 项目地址: https://gitcode.com/GitHub_Trending/ca/cat-catch 你是否曾…...

汽车供应链客户定位方法拆解:复杂B2B能力如何被客户看懂

从B2B表达方法看&#xff0c;汽车供应链客户定位可以理解为一个“客户判断结构化”的问题。企业不是简单输出自我介绍&#xff0c;而是要把技术能力、项目经验、质量体系、协同机制与证据材料&#xff0c;转化为客户不同角色都能使用的判断信息。很多汽车供应商在做客户定位时&…...

魔百盒CM311-1s刷机后体验:安卓9.0固件到底香不香?附5621DS无线实测

魔百盒CM311-1s刷机实战&#xff1a;安卓9.0系统深度评测与无线性能揭秘 当手中的魔百盒CM311-1s遇上安卓9.0系统&#xff0c;这场硬件与软件的碰撞会擦出怎样的火花&#xff1f;作为一款搭载S905L3B芯片的电视盒子&#xff0c;其原生系统往往受限于运营商定制化限制&#xff0…...

从Quill光标到用户头像:手把手教你为Yjs协同编辑器添加完整的在线用户列表(附状态同步技巧)

从Quill光标到用户头像&#xff1a;构建企业级协同编辑器的完整用户感知系统 在数字化办公场景中&#xff0c;协同编辑器的用户体验往往决定了团队协作效率的上限。当多个用户同时编辑同一份文档时&#xff0c;简单的光标显示已无法满足现代团队对协作透明度的需求。本文将深入…...

保姆级教程:在群晖DSM 7.2上为虚幻引擎5项目配置Perforce Helix Core(附TypeMap避坑清单)

群晖DSM 7.2上为虚幻引擎5配置Perforce Helix Core全指南 对于独立游戏开发者和小型工作室来说&#xff0c;版本控制系统是项目管理的基石。Perforce Helix Core以其卓越的大文件处理能力&#xff0c;成为虚幻引擎项目版本控制的首选方案。本文将手把手指导你在群晖NAS上搭建Pe…...

别再死记硬背公式了!用‘推磨小矮人’和‘磁极跳舞’理解PMSM的电角度与机械角度

用“推磨小矮人”和“磁极跳舞”轻松掌握PMSM角度转换 电机控制领域的初学者常被永磁同步电机&#xff08;PMSM&#xff09;中电角度与机械角度的关系困扰。传统教材中“电角度极对数机械角度”的公式虽然简洁&#xff0c;却缺乏直观的物理图像支撑。本文将用两个生活化的比喻…...

别再只会if-else了!用STM32状态机实现按键短按、长按、双击(附完整代码)

STM32状态机实战&#xff1a;从零设计支持短按、长按、双击的按键驱动库 在嵌入式开发中&#xff0c;按键处理看似简单&#xff0c;却是最能体现开发者设计功力的场景之一。传统的中断加延时消抖方式虽然能快速实现功能&#xff0c;但随着需求复杂化&#xff08;比如需要区分短…...

智能手表核心升级:三星OLED与4nm处理器如何重塑用户体验

1. 项目概述&#xff1a;一次旗舰智能手表核心元件的深度迭代最近看到一条关于谷歌Pixel Watch 2的消息&#xff0c;核心信息点很明确&#xff1a;屏幕将由三星供应OLED面板&#xff0c;同时处理器将升级到4纳米制程。这看起来只是两个硬件参数的简单罗列&#xff0c;但对于我们…...