当前位置: 首页 > news >正文

php实现kafka

kafka类:

<?phpclass b2c_kafka
{public $broker_list;public $topic;public $group_id;protected $producer = null;protected $consumer = null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* @param object app*/public function __construct(){$this->broker_list = 'kafka';$this->topic = 'local-cn';$this->group_id = ' kafka-map';$this->producer = null;$this->consumer = null;$this->receive_wait_time = 10;$this->receive_wait_num = 100;}/*** 获取生产者*/public function Producer(){$conf = new \RdKafka\Conf();// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成// 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功// all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成$conf->set('acks', '0');//If you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.idempotence', 'true');$producer = new \RdKafka\Producer($conf);$this->producer = $producer;return $this;}/*** 发送消息** @param string|array $msg* @param string $topic* @return void*/public function SendMsg($msg = '', $topic = ''){if (empty($topic)) {$topic = $this->topic;}$producer = $this->producer;$topic = $producer->newTopic($topic);if (!is_array($msg)) {$msg = [$msg];}foreach ($msg as $value) {$topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);$producer->poll(0);}for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) {$result = $producer->flush(10000);            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {break;}}if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Kafka消息发送失败,错误代码:' . $result);}}/*** 获取消费者** @param string $group_id* @return void*/public function Consumer($group_id = ''){$conf = new \RdKafka\Conf();// Set a rebalance callback to log partition assignments (optional)$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);}});// Configure the group.id. All consumer with the same group.id will consume// different partitions.if (empty($group_id)) {$group_id = $this->group_id;}// 设置相同的group,防止一次消息被多次消费。// 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的$conf->set('group.id', $group_id);// Initial list of Kafka brokers// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning//earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费$conf->set('auto.offset.reset', 'earliest');// 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量// 自动提交分区消费的位置,手动可确保消息被消费$conf->set('enable.auto.commit', true);$conf->set('auto.commit.interval.ms', 1000);$consumer = new \RdKafka\KafkaConsumer($conf);$this->consumer = $consumer;return $this;}/*** 接收消息** @param string $topic* @param array $callback* @return void*/public function ReceiveMsg($topic = '', array $callback = []){$consumer = $this->consumer;if (empty($topic)) {$topic = $this->topic;}if (!is_array($topic)) {$topic = [$topic];}// Subscribe to topic 'test'$consumer->subscribe($topic);echo "Waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";$i = 0;$msg_list = [];while (true) {$i++;if ($i > $this->receive_wait_time) {$i = 0;if (empty($msg_list)) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$msg_list = [];}// 阻塞一秒钟$message = $consumer->consume(1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:$msg_list[] = $message->payload;if (count($msg_list) < $this->receive_wait_num) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$i = 0;$msg_list = [];break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:// echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:// echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}}}

调用:

$data['member_id']     = '121221';
$data['ip']            = $this->getIp();
$data['timestamp']     = $this->getMicrotime();$kafkaObj = new kafka();
$kafkaObj->Producer()->sendMsg(json_encode($data, 320));

相关文章:

php实现kafka

kafka类&#xff1a; <?phpclass b2c_kafka {public $broker_list;public $topic;public $group_id;protected $producer null;protected $consumer null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* param object app*/public function …...

YOLOv10改进系列,YOLOv10损失函数更换为Powerful-IoU(2024年最新IOU),助力高效涨点

改进前训练结果: 改进后的结果: 摘要 边界框回归(BBR)是目标检测中的核心任务之一,BBR损失函数显著影响其性能。然而,观察到现有基于IoU的损失函数存在不合理的惩罚因子,导致回归过程中锚框扩展,并显著减缓收敛速度。为了解决这个问题,深入分析了锚框扩展的原因。针…...

工具知识 | Linux 常用命令参考手册

目录 文件 查看文件内容 headtailcatnlmore 创建 touchmkdirmktemp 删除 rmrmdir 查找文件 findlocate lspwdwcchattrpastestatgrepsedcdcpmvopensourcetreelnfilesortuniqsplitvim 系统管理 nohupwatchpingwhichshutdownrebootuptimecrontabatunameifconfigwhereischmodlsofc…...

mysql 常用知识点总结

MySQL 是一种广泛使用的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;它基于结构化查询语言&#xff08;SQL&#xff09;。了解 MySQL 的语法对数据库管理和操作非常重要。以下是 MySQL 语法的详细完整解释&#xff0c;涵盖基本概念、创建表、查询、修改数据…...

conda常用指令

1、查看conda版本 conda --version 2、更新conda conda update conda 3、查看conda环境信息 conda info 4、查看已有虚拟环境 conda info --envs conda info -e conda env list 5、创建新虚拟环境 conda create --name myenv python3.8 6、激活环境和退出环境 conda…...

前后端分离项目--下载功能

文章目录 不使用代理服务器blobblob构造函数通过FormData对象的getBlob方法创建Blob对象将Blob对象转换成UR 使用代理服务器 前后端分离项目中下载与其他接口的使用不同&#xff0c;一般下载不走node&#xff0c;不通过代理服务器&#xff0c;而是直接在前台发送请求&#xff0…...

PMP--一模--解题--81-90

文章目录 4.整合管理81、 [单选] 一位先前不活跃的干系人参与程度突然增加&#xff0c;这种意外的参与导致了一些变更请求。项目经理应该做什么&#xff1f; 4.整合管理82、 [单选] 公司的新产品系列将在两个月内发布&#xff0c;95%的项目任务均已完成。但是&#xff0c;管理层…...

计算机网络 --- 【2】计算机网络的组成、功能

目录 一、计算机网络的组成 1.1 从组成部分看 1.2 从工作方式看 1.3 从逻辑功能看 1.4 总结 二、计算机网络的功能 2.1 数据通信 2.2 资源共享​编辑 2.3 分布式处理 2.4 提高可靠性 2.5 负载均衡 一、计算机网络的组成 1.1 从组成部分看 我们举例分析计算机网络从…...

『功能项目』切换职业技能面板【49】

我们打开上一篇48切换职业面板的项目&#xff0c; 本章要做的事情是制作第二职业法师技能面板、第三职业面板并且完成切换 双击打开Canvas进入预制体空间 复制三个技能栏面板 重命名 设置第一技能栏 设置第二职业技能栏 设置第三职业技能栏 修改脚本&#xff1a;ChangeProfess…...

寻找排名好的自闭症学校?这些关键因素不可忽视

郑州市如果有一家如星贝育园这样的自闭症公办学校&#xff0c;那无疑将为当地的自闭症儿童及其家庭带来巨大的福音。星贝育园所展现出的专业性、承诺的康复效果保障、以及为特殊儿童提供的全方位支持&#xff0c;都体现了其对自闭症儿童教育康复事业的深刻理解和高度责任感。 …...

Git常用命令(记录)

提交代码 git status 查看状态git add .或者git add xx选择提交全部或者某文件git commit -m “提交信息”git push 创建新分支提交到新的分支 git checkout -b [branch-name] 创建并切换到新分支git add [file-name] 将要上传的文件添加到暂存区git commit -m “commit mes…...

STM32+ESP8266 WiFi连接机智云平台APP远程控制教程

本文档将介绍如何用STM32ESP8266 WiFi模块从零开始连接上机智云&#xff0c;并通过APP进行远程控制。 机智云官网&#xff1a;机智云|智能物联网操作系统 (gizwits.com) 准备&#xff1a;STM32、ESP8266、手机、可上网的WiFi。 1.创建设备 1.1 注册登陆 请自行注册账号并登陆…...

学懂C++(六十):C++ 11、C++ 14、C++ 17、C++ 20新特性大总结(万字详解大全)

一、引言 随着计算机科学与技术的飞速发展&#xff0c;编程语言也在不断进化以满足日益增长的需求。C是一门集高性能和灵活性于一身的编程语言&#xff0c;自1983年诞生以来不断演进&#xff0c;逐渐成为了众多领域的主流编程语言。为了进一步提升开发效率和代码质量&#xff0…...

杭电1008电梯

提供两种做法&#xff0c;第一种不知道为啥不ac。。。 #include<iostream> using namespace std; //不清楚为什么报错了 int a[10000],x[10000]; int main(){int n;while(cin>>n,n!0){for(int i0;i<n;i){cin>>a[i];if(i0) x[i](65)*a[i]-5;else {if(a[i-…...

【Python小知识 - 2】:在VSCode中切换Python解释器版本

文章目录 在VSCode中切换Python解释器版本 在VSCode中切换Python解释器版本 在VSCode中按下快捷键CtrlShiftP&#xff0c;出现命令框。 输入以下命令&#xff1a; Python: Select Interpreter输入命令回车后即出现不同的Python解释器选项&#xff0c;选择想要切换的Python解释器…...

ubuntu meson安装

安装 sudo python3 -m pip install meson sudo python3 -m pip install ninja问题 libdrm ERROR: Dependency “libdrm” not found, tried pkgconfig and cmake Unable to correct problems, you have held broken packages. sudo apt-get update --fix-missing sudo apt in…...

记者协会评审系统-需求分析

记者协会评审系统-需求分析 文章目录 记者协会评审系统-需求分析一、参与角色二、业务流程2.1 作品推荐收集2.2 初步审核2.3 评审功能(初评)2.4&5 定评和审定三、功能清单2.1 基础功能2.2 业务功能2.3.1.单位投稿功能2.3.2.推荐作品分类管理2.3.3. 稿件初审2.3.4.评审功能…...

python 检索与该查询最相似的句子 使用库hflayers和sentence_transformers来实现遇到的问题

此篇文章总结遇到三个问题(3. OSError: We couldn’t connect to ‘https://huggingface.co’ to load this file, couldn’t find it in the cached files and it looks like sentence-transformers/all-mpnet-base-v2 is not the path to a directory containing a file nam…...

计算机毕业设计 在线新闻聚合平台的设计与实现 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…...

【机器学习随笔】概率论与实际问题的对应

主要从直观看待问题的角度&#xff0c;对概率分布进行分类。 一、时间维度 1、一个事件两次发生的时间间隔或者说&#xff0c;单位时间内发生了两次的概率&#xff0c;用指数分布 2、多个事件发生的时间间隔&#xff0c;用Gamma分布。 3、单位时间了发生了k次的概率&#xff0c…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

pam_env.so模块配置解析

在PAM&#xff08;Pluggable Authentication Modules&#xff09;配置中&#xff0c; /etc/pam.d/su 文件相关配置含义如下&#xff1a; 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块&#xff0c;负责验证用户身份&am…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...