php实现kafka
kafka类:
<?phpclass b2c_kafka
{public $broker_list;public $topic;public $group_id;protected $producer = null;protected $consumer = null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* @param object app*/public function __construct(){$this->broker_list = 'kafka';$this->topic = 'local-cn';$this->group_id = ' kafka-map';$this->producer = null;$this->consumer = null;$this->receive_wait_time = 10;$this->receive_wait_num = 100;}/*** 获取生产者*/public function Producer(){$conf = new \RdKafka\Conf();// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成// 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功// all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成$conf->set('acks', '0');//If you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.idempotence', 'true');$producer = new \RdKafka\Producer($conf);$this->producer = $producer;return $this;}/*** 发送消息** @param string|array $msg* @param string $topic* @return void*/public function SendMsg($msg = '', $topic = ''){if (empty($topic)) {$topic = $this->topic;}$producer = $this->producer;$topic = $producer->newTopic($topic);if (!is_array($msg)) {$msg = [$msg];}foreach ($msg as $value) {$topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);$producer->poll(0);}for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) {$result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {break;}}if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Kafka消息发送失败,错误代码:' . $result);}}/*** 获取消费者** @param string $group_id* @return void*/public function Consumer($group_id = ''){$conf = new \RdKafka\Conf();// Set a rebalance callback to log partition assignments (optional)$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);}});// Configure the group.id. All consumer with the same group.id will consume// different partitions.if (empty($group_id)) {$group_id = $this->group_id;}// 设置相同的group,防止一次消息被多次消费。// 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的$conf->set('group.id', $group_id);// Initial list of Kafka brokers// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning//earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费$conf->set('auto.offset.reset', 'earliest');// 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量// 自动提交分区消费的位置,手动可确保消息被消费$conf->set('enable.auto.commit', true);$conf->set('auto.commit.interval.ms', 1000);$consumer = new \RdKafka\KafkaConsumer($conf);$this->consumer = $consumer;return $this;}/*** 接收消息** @param string $topic* @param array $callback* @return void*/public function ReceiveMsg($topic = '', array $callback = []){$consumer = $this->consumer;if (empty($topic)) {$topic = $this->topic;}if (!is_array($topic)) {$topic = [$topic];}// Subscribe to topic 'test'$consumer->subscribe($topic);echo "Waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";$i = 0;$msg_list = [];while (true) {$i++;if ($i > $this->receive_wait_time) {$i = 0;if (empty($msg_list)) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$msg_list = [];}// 阻塞一秒钟$message = $consumer->consume(1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:$msg_list[] = $message->payload;if (count($msg_list) < $this->receive_wait_num) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$i = 0;$msg_list = [];break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:// echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:// echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}}}
调用:
$data['member_id'] = '121221';
$data['ip'] = $this->getIp();
$data['timestamp'] = $this->getMicrotime();$kafkaObj = new kafka();
$kafkaObj->Producer()->sendMsg(json_encode($data, 320));
相关文章:
php实现kafka
kafka类: <?phpclass b2c_kafka {public $broker_list;public $topic;public $group_id;protected $producer null;protected $consumer null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* param object app*/public function …...
YOLOv10改进系列,YOLOv10损失函数更换为Powerful-IoU(2024年最新IOU),助力高效涨点
改进前训练结果: 改进后的结果: 摘要 边界框回归(BBR)是目标检测中的核心任务之一,BBR损失函数显著影响其性能。然而,观察到现有基于IoU的损失函数存在不合理的惩罚因子,导致回归过程中锚框扩展,并显著减缓收敛速度。为了解决这个问题,深入分析了锚框扩展的原因。针…...
工具知识 | Linux 常用命令参考手册
目录 文件 查看文件内容 headtailcatnlmore 创建 touchmkdirmktemp 删除 rmrmdir 查找文件 findlocate lspwdwcchattrpastestatgrepsedcdcpmvopensourcetreelnfilesortuniqsplitvim 系统管理 nohupwatchpingwhichshutdownrebootuptimecrontabatunameifconfigwhereischmodlsofc…...
mysql 常用知识点总结
MySQL 是一种广泛使用的关系型数据库管理系统(RDBMS),它基于结构化查询语言(SQL)。了解 MySQL 的语法对数据库管理和操作非常重要。以下是 MySQL 语法的详细完整解释,涵盖基本概念、创建表、查询、修改数据…...
conda常用指令
1、查看conda版本 conda --version 2、更新conda conda update conda 3、查看conda环境信息 conda info 4、查看已有虚拟环境 conda info --envs conda info -e conda env list 5、创建新虚拟环境 conda create --name myenv python3.8 6、激活环境和退出环境 conda…...
前后端分离项目--下载功能
文章目录 不使用代理服务器blobblob构造函数通过FormData对象的getBlob方法创建Blob对象将Blob对象转换成UR 使用代理服务器 前后端分离项目中下载与其他接口的使用不同,一般下载不走node,不通过代理服务器,而是直接在前台发送请求࿰…...
PMP--一模--解题--81-90
文章目录 4.整合管理81、 [单选] 一位先前不活跃的干系人参与程度突然增加,这种意外的参与导致了一些变更请求。项目经理应该做什么? 4.整合管理82、 [单选] 公司的新产品系列将在两个月内发布,95%的项目任务均已完成。但是,管理层…...
计算机网络 --- 【2】计算机网络的组成、功能
目录 一、计算机网络的组成 1.1 从组成部分看 1.2 从工作方式看 1.3 从逻辑功能看 1.4 总结 二、计算机网络的功能 2.1 数据通信 2.2 资源共享编辑 2.3 分布式处理 2.4 提高可靠性 2.5 负载均衡 一、计算机网络的组成 1.1 从组成部分看 我们举例分析计算机网络从…...
『功能项目』切换职业技能面板【49】
我们打开上一篇48切换职业面板的项目, 本章要做的事情是制作第二职业法师技能面板、第三职业面板并且完成切换 双击打开Canvas进入预制体空间 复制三个技能栏面板 重命名 设置第一技能栏 设置第二职业技能栏 设置第三职业技能栏 修改脚本:ChangeProfess…...
寻找排名好的自闭症学校?这些关键因素不可忽视
郑州市如果有一家如星贝育园这样的自闭症公办学校,那无疑将为当地的自闭症儿童及其家庭带来巨大的福音。星贝育园所展现出的专业性、承诺的康复效果保障、以及为特殊儿童提供的全方位支持,都体现了其对自闭症儿童教育康复事业的深刻理解和高度责任感。 …...
Git常用命令(记录)
提交代码 git status 查看状态git add .或者git add xx选择提交全部或者某文件git commit -m “提交信息”git push 创建新分支提交到新的分支 git checkout -b [branch-name] 创建并切换到新分支git add [file-name] 将要上传的文件添加到暂存区git commit -m “commit mes…...
STM32+ESP8266 WiFi连接机智云平台APP远程控制教程
本文档将介绍如何用STM32ESP8266 WiFi模块从零开始连接上机智云,并通过APP进行远程控制。 机智云官网:机智云|智能物联网操作系统 (gizwits.com) 准备:STM32、ESP8266、手机、可上网的WiFi。 1.创建设备 1.1 注册登陆 请自行注册账号并登陆…...
学懂C++(六十):C++ 11、C++ 14、C++ 17、C++ 20新特性大总结(万字详解大全)
一、引言 随着计算机科学与技术的飞速发展,编程语言也在不断进化以满足日益增长的需求。C是一门集高性能和灵活性于一身的编程语言,自1983年诞生以来不断演进,逐渐成为了众多领域的主流编程语言。为了进一步提升开发效率和代码质量࿰…...
杭电1008电梯
提供两种做法,第一种不知道为啥不ac。。。 #include<iostream> using namespace std; //不清楚为什么报错了 int a[10000],x[10000]; int main(){int n;while(cin>>n,n!0){for(int i0;i<n;i){cin>>a[i];if(i0) x[i](65)*a[i]-5;else {if(a[i-…...
【Python小知识 - 2】:在VSCode中切换Python解释器版本
文章目录 在VSCode中切换Python解释器版本 在VSCode中切换Python解释器版本 在VSCode中按下快捷键CtrlShiftP,出现命令框。 输入以下命令: Python: Select Interpreter输入命令回车后即出现不同的Python解释器选项,选择想要切换的Python解释器…...
ubuntu meson安装
安装 sudo python3 -m pip install meson sudo python3 -m pip install ninja问题 libdrm ERROR: Dependency “libdrm” not found, tried pkgconfig and cmake Unable to correct problems, you have held broken packages. sudo apt-get update --fix-missing sudo apt in…...
记者协会评审系统-需求分析
记者协会评审系统-需求分析 文章目录 记者协会评审系统-需求分析一、参与角色二、业务流程2.1 作品推荐收集2.2 初步审核2.3 评审功能(初评)2.4&5 定评和审定三、功能清单2.1 基础功能2.2 业务功能2.3.1.单位投稿功能2.3.2.推荐作品分类管理2.3.3. 稿件初审2.3.4.评审功能…...
python 检索与该查询最相似的句子 使用库hflayers和sentence_transformers来实现遇到的问题
此篇文章总结遇到三个问题(3. OSError: We couldn’t connect to ‘https://huggingface.co’ to load this file, couldn’t find it in the cached files and it looks like sentence-transformers/all-mpnet-base-v2 is not the path to a directory containing a file nam…...
计算机毕业设计 在线新闻聚合平台的设计与实现 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试
🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点…...
【机器学习随笔】概率论与实际问题的对应
主要从直观看待问题的角度,对概率分布进行分类。 一、时间维度 1、一个事件两次发生的时间间隔或者说,单位时间内发生了两次的概率,用指数分布 2、多个事件发生的时间间隔,用Gamma分布。 3、单位时间了发生了k次的概率,…...
从零实现富文本编辑器#5-编辑器选区模型的状态结构表达
先前我们总结了浏览器选区模型的交互策略,并且实现了基本的选区操作,还调研了自绘选区的实现。那么相对的,我们还需要设计编辑器的选区表达,也可以称为模型选区。编辑器中应用变更时的操作范围,就是以模型选区为基准来…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...
Python 实现 Web 静态服务器(HTTP 协议)
目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1)下载安装包2)配置环境变量3)安装镜像4)node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1)使用 http-server2)详解 …...
MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用
文章目录 一、背景知识:什么是 B-Tree 和 BTree? B-Tree(平衡多路查找树) BTree(B-Tree 的变种) 二、结构对比:一张图看懂 三、为什么 MySQL InnoDB 选择 BTree? 1. 范围查询更快 2…...
Python实现简单音频数据压缩与解压算法
Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中,压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言,提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...
aardio 自动识别验证码输入
技术尝试 上周在发学习日志时有网友提议“在网页上识别验证码”,于是尝试整合图像识别与网页自动化技术,完成了这套模拟登录流程。核心思路是:截图验证码→OCR识别→自动填充表单→提交并验证结果。 代码在这里 import soImage; import we…...
