php实现kafka
kafka类:
<?phpclass b2c_kafka
{public $broker_list;public $topic;public $group_id;protected $producer = null;protected $consumer = null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* @param object app*/public function __construct(){$this->broker_list = 'kafka';$this->topic = 'local-cn';$this->group_id = ' kafka-map';$this->producer = null;$this->consumer = null;$this->receive_wait_time = 10;$this->receive_wait_num = 100;}/*** 获取生产者*/public function Producer(){$conf = new \RdKafka\Conf();// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成// 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功// all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成$conf->set('acks', '0');//If you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.idempotence', 'true');$producer = new \RdKafka\Producer($conf);$this->producer = $producer;return $this;}/*** 发送消息** @param string|array $msg* @param string $topic* @return void*/public function SendMsg($msg = '', $topic = ''){if (empty($topic)) {$topic = $this->topic;}$producer = $this->producer;$topic = $producer->newTopic($topic);if (!is_array($msg)) {$msg = [$msg];}foreach ($msg as $value) {$topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);$producer->poll(0);}for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) {$result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {break;}}if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Kafka消息发送失败,错误代码:' . $result);}}/*** 获取消费者** @param string $group_id* @return void*/public function Consumer($group_id = ''){$conf = new \RdKafka\Conf();// Set a rebalance callback to log partition assignments (optional)$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);}});// Configure the group.id. All consumer with the same group.id will consume// different partitions.if (empty($group_id)) {$group_id = $this->group_id;}// 设置相同的group,防止一次消息被多次消费。// 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的$conf->set('group.id', $group_id);// Initial list of Kafka brokers// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning//earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费$conf->set('auto.offset.reset', 'earliest');// 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量// 自动提交分区消费的位置,手动可确保消息被消费$conf->set('enable.auto.commit', true);$conf->set('auto.commit.interval.ms', 1000);$consumer = new \RdKafka\KafkaConsumer($conf);$this->consumer = $consumer;return $this;}/*** 接收消息** @param string $topic* @param array $callback* @return void*/public function ReceiveMsg($topic = '', array $callback = []){$consumer = $this->consumer;if (empty($topic)) {$topic = $this->topic;}if (!is_array($topic)) {$topic = [$topic];}// Subscribe to topic 'test'$consumer->subscribe($topic);echo "Waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";$i = 0;$msg_list = [];while (true) {$i++;if ($i > $this->receive_wait_time) {$i = 0;if (empty($msg_list)) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$msg_list = [];}// 阻塞一秒钟$message = $consumer->consume(1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:$msg_list[] = $message->payload;if (count($msg_list) < $this->receive_wait_num) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$i = 0;$msg_list = [];break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:// echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:// echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}}}
调用:
$data['member_id'] = '121221';
$data['ip'] = $this->getIp();
$data['timestamp'] = $this->getMicrotime();$kafkaObj = new kafka();
$kafkaObj->Producer()->sendMsg(json_encode($data, 320));
相关文章:
php实现kafka
kafka类: <?phpclass b2c_kafka {public $broker_list;public $topic;public $group_id;protected $producer null;protected $consumer null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* param object app*/public function …...
YOLOv10改进系列,YOLOv10损失函数更换为Powerful-IoU(2024年最新IOU),助力高效涨点
改进前训练结果: 改进后的结果: 摘要 边界框回归(BBR)是目标检测中的核心任务之一,BBR损失函数显著影响其性能。然而,观察到现有基于IoU的损失函数存在不合理的惩罚因子,导致回归过程中锚框扩展,并显著减缓收敛速度。为了解决这个问题,深入分析了锚框扩展的原因。针…...
工具知识 | Linux 常用命令参考手册
目录 文件 查看文件内容 headtailcatnlmore 创建 touchmkdirmktemp 删除 rmrmdir 查找文件 findlocate lspwdwcchattrpastestatgrepsedcdcpmvopensourcetreelnfilesortuniqsplitvim 系统管理 nohupwatchpingwhichshutdownrebootuptimecrontabatunameifconfigwhereischmodlsofc…...
mysql 常用知识点总结
MySQL 是一种广泛使用的关系型数据库管理系统(RDBMS),它基于结构化查询语言(SQL)。了解 MySQL 的语法对数据库管理和操作非常重要。以下是 MySQL 语法的详细完整解释,涵盖基本概念、创建表、查询、修改数据…...
conda常用指令
1、查看conda版本 conda --version 2、更新conda conda update conda 3、查看conda环境信息 conda info 4、查看已有虚拟环境 conda info --envs conda info -e conda env list 5、创建新虚拟环境 conda create --name myenv python3.8 6、激活环境和退出环境 conda…...
前后端分离项目--下载功能
文章目录 不使用代理服务器blobblob构造函数通过FormData对象的getBlob方法创建Blob对象将Blob对象转换成UR 使用代理服务器 前后端分离项目中下载与其他接口的使用不同,一般下载不走node,不通过代理服务器,而是直接在前台发送请求࿰…...
PMP--一模--解题--81-90
文章目录 4.整合管理81、 [单选] 一位先前不活跃的干系人参与程度突然增加,这种意外的参与导致了一些变更请求。项目经理应该做什么? 4.整合管理82、 [单选] 公司的新产品系列将在两个月内发布,95%的项目任务均已完成。但是,管理层…...
计算机网络 --- 【2】计算机网络的组成、功能
目录 一、计算机网络的组成 1.1 从组成部分看 1.2 从工作方式看 1.3 从逻辑功能看 1.4 总结 二、计算机网络的功能 2.1 数据通信 2.2 资源共享编辑 2.3 分布式处理 2.4 提高可靠性 2.5 负载均衡 一、计算机网络的组成 1.1 从组成部分看 我们举例分析计算机网络从…...
『功能项目』切换职业技能面板【49】
我们打开上一篇48切换职业面板的项目, 本章要做的事情是制作第二职业法师技能面板、第三职业面板并且完成切换 双击打开Canvas进入预制体空间 复制三个技能栏面板 重命名 设置第一技能栏 设置第二职业技能栏 设置第三职业技能栏 修改脚本:ChangeProfess…...
寻找排名好的自闭症学校?这些关键因素不可忽视
郑州市如果有一家如星贝育园这样的自闭症公办学校,那无疑将为当地的自闭症儿童及其家庭带来巨大的福音。星贝育园所展现出的专业性、承诺的康复效果保障、以及为特殊儿童提供的全方位支持,都体现了其对自闭症儿童教育康复事业的深刻理解和高度责任感。 …...
Git常用命令(记录)
提交代码 git status 查看状态git add .或者git add xx选择提交全部或者某文件git commit -m “提交信息”git push 创建新分支提交到新的分支 git checkout -b [branch-name] 创建并切换到新分支git add [file-name] 将要上传的文件添加到暂存区git commit -m “commit mes…...
STM32+ESP8266 WiFi连接机智云平台APP远程控制教程
本文档将介绍如何用STM32ESP8266 WiFi模块从零开始连接上机智云,并通过APP进行远程控制。 机智云官网:机智云|智能物联网操作系统 (gizwits.com) 准备:STM32、ESP8266、手机、可上网的WiFi。 1.创建设备 1.1 注册登陆 请自行注册账号并登陆…...
学懂C++(六十):C++ 11、C++ 14、C++ 17、C++ 20新特性大总结(万字详解大全)
一、引言 随着计算机科学与技术的飞速发展,编程语言也在不断进化以满足日益增长的需求。C是一门集高性能和灵活性于一身的编程语言,自1983年诞生以来不断演进,逐渐成为了众多领域的主流编程语言。为了进一步提升开发效率和代码质量࿰…...
杭电1008电梯
提供两种做法,第一种不知道为啥不ac。。。 #include<iostream> using namespace std; //不清楚为什么报错了 int a[10000],x[10000]; int main(){int n;while(cin>>n,n!0){for(int i0;i<n;i){cin>>a[i];if(i0) x[i](65)*a[i]-5;else {if(a[i-…...
【Python小知识 - 2】:在VSCode中切换Python解释器版本
文章目录 在VSCode中切换Python解释器版本 在VSCode中切换Python解释器版本 在VSCode中按下快捷键CtrlShiftP,出现命令框。 输入以下命令: Python: Select Interpreter输入命令回车后即出现不同的Python解释器选项,选择想要切换的Python解释器…...
ubuntu meson安装
安装 sudo python3 -m pip install meson sudo python3 -m pip install ninja问题 libdrm ERROR: Dependency “libdrm” not found, tried pkgconfig and cmake Unable to correct problems, you have held broken packages. sudo apt-get update --fix-missing sudo apt in…...
记者协会评审系统-需求分析
记者协会评审系统-需求分析 文章目录 记者协会评审系统-需求分析一、参与角色二、业务流程2.1 作品推荐收集2.2 初步审核2.3 评审功能(初评)2.4&5 定评和审定三、功能清单2.1 基础功能2.2 业务功能2.3.1.单位投稿功能2.3.2.推荐作品分类管理2.3.3. 稿件初审2.3.4.评审功能…...
python 检索与该查询最相似的句子 使用库hflayers和sentence_transformers来实现遇到的问题
此篇文章总结遇到三个问题(3. OSError: We couldn’t connect to ‘https://huggingface.co’ to load this file, couldn’t find it in the cached files and it looks like sentence-transformers/all-mpnet-base-v2 is not the path to a directory containing a file nam…...
计算机毕业设计 在线新闻聚合平台的设计与实现 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试
🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点…...
【机器学习随笔】概率论与实际问题的对应
主要从直观看待问题的角度,对概率分布进行分类。 一、时间维度 1、一个事件两次发生的时间间隔或者说,单位时间内发生了两次的概率,用指数分布 2、多个事件发生的时间间隔,用Gamma分布。 3、单位时间了发生了k次的概率,…...
OpenClaw效率对比:Qwen2.5-VL-7B与传统OCR工具在文档处理中的表现
OpenClaw效率对比:Qwen2.5-VL-7B与传统OCR工具在文档处理中的表现 1. 测试背景与动机 最近在整理公司历史项目文档时,遇到了一个棘手的问题:大量扫描版PDF和图片格式的技术文档需要数字化处理。这些文档包含代码片段、手写注释和复杂表格&a…...
Three.js模型加载太慢?试试这个gltf-pipeline压缩技巧,亲测有效!
Three.js模型加载优化实战:gltf-pipeline压缩技巧详解 在Web 3D开发中,Three.js无疑是构建沉浸式体验的首选工具之一。然而,随着3D模型复杂度的提升,文件体积膨胀导致的加载延迟成为开发者面临的普遍挑战。想象一下,用…...
STM32驱动X-NUCLEO-IHM02A1实现工业级步进电机控制
1. X-NUCLEO-IHM02A1 驱动开发深度解析:面向工业级步进电机控制的 STM32 底层实现 X-NUCLEO-IHM02A1 是意法半导体(STMicroelectronics)推出的高性能双通道步进电机驱动扩展板,专为 STM32 Nucleo 开发平台设计。该板基于 STSPIN22…...
大学生食品安全科普网页——web网页期末大作业
(文件先保存到自己网盘,谨防文件丢失!!) 源码获取地址 链接: https://pan.baidu.com/s/1r6C8_J31D01e1uG3FJi27w?pwdzxxh提取码: zxxhhtml科普网页源码 ✅ 网页一共6个页面 ✅ 网页使用html css js完成 布局简单 ✅…...
实战指南:基于快马平台生成企业级cc switch管理系统,助力游戏项目开发
今天想和大家分享一个在游戏开发中特别实用的技术——CC Switch系统。这个系统在商业游戏项目中经常被用来做调试和功能开关控制,最近我在InsCode(快马)平台上快速实现了一个完整的企业级解决方案,整个过程特别顺畅。 先说说什么是CC Switch。简单理解就…...
2026 年1月 13 日-KB5074109(OS内部版本 26200.7623 和 26100.7623)
🔥个人主页:杨利杰YJlio❄️个人专栏:《Sysinternals实战教程》《Windows PowerShell 实战》《WINDOWS教程》《IOS教程》《微信助手》《锤子助手》 《Python》 《Kali Linux》《那些年未解决的Windows疑难杂症》🌟 让复杂的事情更…...
Python 中的正则表达式:从基础到高级应用
Python 中的正则表达式:从基础到高级应用 1. 背景介绍 正则表达式(Regular Expression,简称 regex 或 regexp)是一种用于匹配字符串中字符组合的模式。在 Python 中,正则表达式是处理文本的强大工具,它可以…...
失业期PHP程序员极致利用时间的庖丁解
"失业期 PHP 程序员极致利用时间”,常被误解为“疯狂投简历”或“没日没夜地刷 LeetCode”。 但本质上,这是一场**“认知重构”与“资产增值”的特种战役**。 失业不是“空窗期”,而是上帝强行塞给你的**“全脱产战略转型期”**。 在在职…...
云原生环境中的边缘计算应用
云原生环境中的边缘计算应用 引言:边缘计算的崛起 哥们,别整那些花里胡哨的!作为一个前端开发兼摇滚鼓手,我最烦的就是延迟。在云原生时代,边缘计算让我们离用户更近,减少延迟。今天,我就给你们…...
2026最权威的五大降重复率网站横评
Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 需从词汇、句式跟逻辑这三方面来着手,以求降低AI生成内容的可识别性。于词汇方面…...
