当前位置: 首页 > news >正文

php实现kafka

kafka类:

<?phpclass b2c_kafka
{public $broker_list;public $topic;public $group_id;protected $producer = null;protected $consumer = null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* @param object app*/public function __construct(){$this->broker_list = 'kafka';$this->topic = 'local-cn';$this->group_id = ' kafka-map';$this->producer = null;$this->consumer = null;$this->receive_wait_time = 10;$this->receive_wait_num = 100;}/*** 获取生产者*/public function Producer(){$conf = new \RdKafka\Conf();// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成// 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功// all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成$conf->set('acks', '0');//If you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.idempotence', 'true');$producer = new \RdKafka\Producer($conf);$this->producer = $producer;return $this;}/*** 发送消息** @param string|array $msg* @param string $topic* @return void*/public function SendMsg($msg = '', $topic = ''){if (empty($topic)) {$topic = $this->topic;}$producer = $this->producer;$topic = $producer->newTopic($topic);if (!is_array($msg)) {$msg = [$msg];}foreach ($msg as $value) {$topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);$producer->poll(0);}for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) {$result = $producer->flush(10000);            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {break;}}if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Kafka消息发送失败,错误代码:' . $result);}}/*** 获取消费者** @param string $group_id* @return void*/public function Consumer($group_id = ''){$conf = new \RdKafka\Conf();// Set a rebalance callback to log partition assignments (optional)$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);}});// Configure the group.id. All consumer with the same group.id will consume// different partitions.if (empty($group_id)) {$group_id = $this->group_id;}// 设置相同的group,防止一次消息被多次消费。// 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的$conf->set('group.id', $group_id);// Initial list of Kafka brokers// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning//earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费$conf->set('auto.offset.reset', 'earliest');// 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量// 自动提交分区消费的位置,手动可确保消息被消费$conf->set('enable.auto.commit', true);$conf->set('auto.commit.interval.ms', 1000);$consumer = new \RdKafka\KafkaConsumer($conf);$this->consumer = $consumer;return $this;}/*** 接收消息** @param string $topic* @param array $callback* @return void*/public function ReceiveMsg($topic = '', array $callback = []){$consumer = $this->consumer;if (empty($topic)) {$topic = $this->topic;}if (!is_array($topic)) {$topic = [$topic];}// Subscribe to topic 'test'$consumer->subscribe($topic);echo "Waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";$i = 0;$msg_list = [];while (true) {$i++;if ($i > $this->receive_wait_time) {$i = 0;if (empty($msg_list)) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$msg_list = [];}// 阻塞一秒钟$message = $consumer->consume(1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:$msg_list[] = $message->payload;if (count($msg_list) < $this->receive_wait_num) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$i = 0;$msg_list = [];break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:// echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:// echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}}}

调用:

$data['member_id']     = '121221';
$data['ip']            = $this->getIp();
$data['timestamp']     = $this->getMicrotime();$kafkaObj = new kafka();
$kafkaObj->Producer()->sendMsg(json_encode($data, 320));

相关文章:

php实现kafka

kafka类&#xff1a; <?phpclass b2c_kafka {public $broker_list;public $topic;public $group_id;protected $producer null;protected $consumer null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* param object app*/public function …...

YOLOv10改进系列,YOLOv10损失函数更换为Powerful-IoU(2024年最新IOU),助力高效涨点

改进前训练结果: 改进后的结果: 摘要 边界框回归(BBR)是目标检测中的核心任务之一,BBR损失函数显著影响其性能。然而,观察到现有基于IoU的损失函数存在不合理的惩罚因子,导致回归过程中锚框扩展,并显著减缓收敛速度。为了解决这个问题,深入分析了锚框扩展的原因。针…...

工具知识 | Linux 常用命令参考手册

目录 文件 查看文件内容 headtailcatnlmore 创建 touchmkdirmktemp 删除 rmrmdir 查找文件 findlocate lspwdwcchattrpastestatgrepsedcdcpmvopensourcetreelnfilesortuniqsplitvim 系统管理 nohupwatchpingwhichshutdownrebootuptimecrontabatunameifconfigwhereischmodlsofc…...

mysql 常用知识点总结

MySQL 是一种广泛使用的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;它基于结构化查询语言&#xff08;SQL&#xff09;。了解 MySQL 的语法对数据库管理和操作非常重要。以下是 MySQL 语法的详细完整解释&#xff0c;涵盖基本概念、创建表、查询、修改数据…...

conda常用指令

1、查看conda版本 conda --version 2、更新conda conda update conda 3、查看conda环境信息 conda info 4、查看已有虚拟环境 conda info --envs conda info -e conda env list 5、创建新虚拟环境 conda create --name myenv python3.8 6、激活环境和退出环境 conda…...

前后端分离项目--下载功能

文章目录 不使用代理服务器blobblob构造函数通过FormData对象的getBlob方法创建Blob对象将Blob对象转换成UR 使用代理服务器 前后端分离项目中下载与其他接口的使用不同&#xff0c;一般下载不走node&#xff0c;不通过代理服务器&#xff0c;而是直接在前台发送请求&#xff0…...

PMP--一模--解题--81-90

文章目录 4.整合管理81、 [单选] 一位先前不活跃的干系人参与程度突然增加&#xff0c;这种意外的参与导致了一些变更请求。项目经理应该做什么&#xff1f; 4.整合管理82、 [单选] 公司的新产品系列将在两个月内发布&#xff0c;95%的项目任务均已完成。但是&#xff0c;管理层…...

计算机网络 --- 【2】计算机网络的组成、功能

目录 一、计算机网络的组成 1.1 从组成部分看 1.2 从工作方式看 1.3 从逻辑功能看 1.4 总结 二、计算机网络的功能 2.1 数据通信 2.2 资源共享​编辑 2.3 分布式处理 2.4 提高可靠性 2.5 负载均衡 一、计算机网络的组成 1.1 从组成部分看 我们举例分析计算机网络从…...

『功能项目』切换职业技能面板【49】

我们打开上一篇48切换职业面板的项目&#xff0c; 本章要做的事情是制作第二职业法师技能面板、第三职业面板并且完成切换 双击打开Canvas进入预制体空间 复制三个技能栏面板 重命名 设置第一技能栏 设置第二职业技能栏 设置第三职业技能栏 修改脚本&#xff1a;ChangeProfess…...

寻找排名好的自闭症学校?这些关键因素不可忽视

郑州市如果有一家如星贝育园这样的自闭症公办学校&#xff0c;那无疑将为当地的自闭症儿童及其家庭带来巨大的福音。星贝育园所展现出的专业性、承诺的康复效果保障、以及为特殊儿童提供的全方位支持&#xff0c;都体现了其对自闭症儿童教育康复事业的深刻理解和高度责任感。 …...

Git常用命令(记录)

提交代码 git status 查看状态git add .或者git add xx选择提交全部或者某文件git commit -m “提交信息”git push 创建新分支提交到新的分支 git checkout -b [branch-name] 创建并切换到新分支git add [file-name] 将要上传的文件添加到暂存区git commit -m “commit mes…...

STM32+ESP8266 WiFi连接机智云平台APP远程控制教程

本文档将介绍如何用STM32ESP8266 WiFi模块从零开始连接上机智云&#xff0c;并通过APP进行远程控制。 机智云官网&#xff1a;机智云|智能物联网操作系统 (gizwits.com) 准备&#xff1a;STM32、ESP8266、手机、可上网的WiFi。 1.创建设备 1.1 注册登陆 请自行注册账号并登陆…...

学懂C++(六十):C++ 11、C++ 14、C++ 17、C++ 20新特性大总结(万字详解大全)

一、引言 随着计算机科学与技术的飞速发展&#xff0c;编程语言也在不断进化以满足日益增长的需求。C是一门集高性能和灵活性于一身的编程语言&#xff0c;自1983年诞生以来不断演进&#xff0c;逐渐成为了众多领域的主流编程语言。为了进一步提升开发效率和代码质量&#xff0…...

杭电1008电梯

提供两种做法&#xff0c;第一种不知道为啥不ac。。。 #include<iostream> using namespace std; //不清楚为什么报错了 int a[10000],x[10000]; int main(){int n;while(cin>>n,n!0){for(int i0;i<n;i){cin>>a[i];if(i0) x[i](65)*a[i]-5;else {if(a[i-…...

【Python小知识 - 2】:在VSCode中切换Python解释器版本

文章目录 在VSCode中切换Python解释器版本 在VSCode中切换Python解释器版本 在VSCode中按下快捷键CtrlShiftP&#xff0c;出现命令框。 输入以下命令&#xff1a; Python: Select Interpreter输入命令回车后即出现不同的Python解释器选项&#xff0c;选择想要切换的Python解释器…...

ubuntu meson安装

安装 sudo python3 -m pip install meson sudo python3 -m pip install ninja问题 libdrm ERROR: Dependency “libdrm” not found, tried pkgconfig and cmake Unable to correct problems, you have held broken packages. sudo apt-get update --fix-missing sudo apt in…...

记者协会评审系统-需求分析

记者协会评审系统-需求分析 文章目录 记者协会评审系统-需求分析一、参与角色二、业务流程2.1 作品推荐收集2.2 初步审核2.3 评审功能(初评)2.4&5 定评和审定三、功能清单2.1 基础功能2.2 业务功能2.3.1.单位投稿功能2.3.2.推荐作品分类管理2.3.3. 稿件初审2.3.4.评审功能…...

python 检索与该查询最相似的句子 使用库hflayers和sentence_transformers来实现遇到的问题

此篇文章总结遇到三个问题(3. OSError: We couldn’t connect to ‘https://huggingface.co’ to load this file, couldn’t find it in the cached files and it looks like sentence-transformers/all-mpnet-base-v2 is not the path to a directory containing a file nam…...

计算机毕业设计 在线新闻聚合平台的设计与实现 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…...

【机器学习随笔】概率论与实际问题的对应

主要从直观看待问题的角度&#xff0c;对概率分布进行分类。 一、时间维度 1、一个事件两次发生的时间间隔或者说&#xff0c;单位时间内发生了两次的概率&#xff0c;用指数分布 2、多个事件发生的时间间隔&#xff0c;用Gamma分布。 3、单位时间了发生了k次的概率&#xff0c…...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0&#xff1a;开发环境同步测试 cookie 至 localhost&#xff0c;便于本地请求服务携带 cookie 参考地址&#xff1a;https://juejin.cn/post/7139354571712757767 里面有源码下载下来&#xff0c;加在到扩展即可使用FeHelp…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计

随着大语言模型&#xff08;LLM&#xff09;参数规模的增长&#xff0c;推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长&#xff0c;而KV缓存的内存消耗可能高达数十GB&#xff08;例如Llama2-7B处理100K token时需50GB内存&a…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...

省略号和可变参数模板

本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全&#xff0c;让Comfyui导出的图像不包含工作流信息&#xff0c;导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo&#xff08;推荐&#xff09;​​ 在 save_images 方法中&#xff0c;​​删除或注释掉所有与 metadata …...