当前位置: 首页 > news >正文

php实现kafka

kafka类:

<?phpclass b2c_kafka
{public $broker_list;public $topic;public $group_id;protected $producer = null;protected $consumer = null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* @param object app*/public function __construct(){$this->broker_list = 'kafka';$this->topic = 'local-cn';$this->group_id = ' kafka-map';$this->producer = null;$this->consumer = null;$this->receive_wait_time = 10;$this->receive_wait_num = 100;}/*** 获取生产者*/public function Producer(){$conf = new \RdKafka\Conf();// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成// 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功// all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成$conf->set('acks', '0');//If you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.idempotence', 'true');$producer = new \RdKafka\Producer($conf);$this->producer = $producer;return $this;}/*** 发送消息** @param string|array $msg* @param string $topic* @return void*/public function SendMsg($msg = '', $topic = ''){if (empty($topic)) {$topic = $this->topic;}$producer = $this->producer;$topic = $producer->newTopic($topic);if (!is_array($msg)) {$msg = [$msg];}foreach ($msg as $value) {$topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);$producer->poll(0);}for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) {$result = $producer->flush(10000);            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {break;}}if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Kafka消息发送失败,错误代码:' . $result);}}/*** 获取消费者** @param string $group_id* @return void*/public function Consumer($group_id = ''){$conf = new \RdKafka\Conf();// Set a rebalance callback to log partition assignments (optional)$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);}});// Configure the group.id. All consumer with the same group.id will consume// different partitions.if (empty($group_id)) {$group_id = $this->group_id;}// 设置相同的group,防止一次消息被多次消费。// 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的$conf->set('group.id', $group_id);// Initial list of Kafka brokers// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning//earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费$conf->set('auto.offset.reset', 'earliest');// 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量// 自动提交分区消费的位置,手动可确保消息被消费$conf->set('enable.auto.commit', true);$conf->set('auto.commit.interval.ms', 1000);$consumer = new \RdKafka\KafkaConsumer($conf);$this->consumer = $consumer;return $this;}/*** 接收消息** @param string $topic* @param array $callback* @return void*/public function ReceiveMsg($topic = '', array $callback = []){$consumer = $this->consumer;if (empty($topic)) {$topic = $this->topic;}if (!is_array($topic)) {$topic = [$topic];}// Subscribe to topic 'test'$consumer->subscribe($topic);echo "Waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";$i = 0;$msg_list = [];while (true) {$i++;if ($i > $this->receive_wait_time) {$i = 0;if (empty($msg_list)) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$msg_list = [];}// 阻塞一秒钟$message = $consumer->consume(1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:$msg_list[] = $message->payload;if (count($msg_list) < $this->receive_wait_num) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$i = 0;$msg_list = [];break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:// echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:// echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}}}

调用:

$data['member_id']     = '121221';
$data['ip']            = $this->getIp();
$data['timestamp']     = $this->getMicrotime();$kafkaObj = new kafka();
$kafkaObj->Producer()->sendMsg(json_encode($data, 320));

相关文章:

php实现kafka

kafka类&#xff1a; <?phpclass b2c_kafka {public $broker_list;public $topic;public $group_id;protected $producer null;protected $consumer null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* param object app*/public function …...

YOLOv10改进系列,YOLOv10损失函数更换为Powerful-IoU(2024年最新IOU),助力高效涨点

改进前训练结果: 改进后的结果: 摘要 边界框回归(BBR)是目标检测中的核心任务之一,BBR损失函数显著影响其性能。然而,观察到现有基于IoU的损失函数存在不合理的惩罚因子,导致回归过程中锚框扩展,并显著减缓收敛速度。为了解决这个问题,深入分析了锚框扩展的原因。针…...

工具知识 | Linux 常用命令参考手册

目录 文件 查看文件内容 headtailcatnlmore 创建 touchmkdirmktemp 删除 rmrmdir 查找文件 findlocate lspwdwcchattrpastestatgrepsedcdcpmvopensourcetreelnfilesortuniqsplitvim 系统管理 nohupwatchpingwhichshutdownrebootuptimecrontabatunameifconfigwhereischmodlsofc…...

mysql 常用知识点总结

MySQL 是一种广泛使用的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;它基于结构化查询语言&#xff08;SQL&#xff09;。了解 MySQL 的语法对数据库管理和操作非常重要。以下是 MySQL 语法的详细完整解释&#xff0c;涵盖基本概念、创建表、查询、修改数据…...

conda常用指令

1、查看conda版本 conda --version 2、更新conda conda update conda 3、查看conda环境信息 conda info 4、查看已有虚拟环境 conda info --envs conda info -e conda env list 5、创建新虚拟环境 conda create --name myenv python3.8 6、激活环境和退出环境 conda…...

前后端分离项目--下载功能

文章目录 不使用代理服务器blobblob构造函数通过FormData对象的getBlob方法创建Blob对象将Blob对象转换成UR 使用代理服务器 前后端分离项目中下载与其他接口的使用不同&#xff0c;一般下载不走node&#xff0c;不通过代理服务器&#xff0c;而是直接在前台发送请求&#xff0…...

PMP--一模--解题--81-90

文章目录 4.整合管理81、 [单选] 一位先前不活跃的干系人参与程度突然增加&#xff0c;这种意外的参与导致了一些变更请求。项目经理应该做什么&#xff1f; 4.整合管理82、 [单选] 公司的新产品系列将在两个月内发布&#xff0c;95%的项目任务均已完成。但是&#xff0c;管理层…...

计算机网络 --- 【2】计算机网络的组成、功能

目录 一、计算机网络的组成 1.1 从组成部分看 1.2 从工作方式看 1.3 从逻辑功能看 1.4 总结 二、计算机网络的功能 2.1 数据通信 2.2 资源共享​编辑 2.3 分布式处理 2.4 提高可靠性 2.5 负载均衡 一、计算机网络的组成 1.1 从组成部分看 我们举例分析计算机网络从…...

『功能项目』切换职业技能面板【49】

我们打开上一篇48切换职业面板的项目&#xff0c; 本章要做的事情是制作第二职业法师技能面板、第三职业面板并且完成切换 双击打开Canvas进入预制体空间 复制三个技能栏面板 重命名 设置第一技能栏 设置第二职业技能栏 设置第三职业技能栏 修改脚本&#xff1a;ChangeProfess…...

寻找排名好的自闭症学校?这些关键因素不可忽视

郑州市如果有一家如星贝育园这样的自闭症公办学校&#xff0c;那无疑将为当地的自闭症儿童及其家庭带来巨大的福音。星贝育园所展现出的专业性、承诺的康复效果保障、以及为特殊儿童提供的全方位支持&#xff0c;都体现了其对自闭症儿童教育康复事业的深刻理解和高度责任感。 …...

Git常用命令(记录)

提交代码 git status 查看状态git add .或者git add xx选择提交全部或者某文件git commit -m “提交信息”git push 创建新分支提交到新的分支 git checkout -b [branch-name] 创建并切换到新分支git add [file-name] 将要上传的文件添加到暂存区git commit -m “commit mes…...

STM32+ESP8266 WiFi连接机智云平台APP远程控制教程

本文档将介绍如何用STM32ESP8266 WiFi模块从零开始连接上机智云&#xff0c;并通过APP进行远程控制。 机智云官网&#xff1a;机智云|智能物联网操作系统 (gizwits.com) 准备&#xff1a;STM32、ESP8266、手机、可上网的WiFi。 1.创建设备 1.1 注册登陆 请自行注册账号并登陆…...

学懂C++(六十):C++ 11、C++ 14、C++ 17、C++ 20新特性大总结(万字详解大全)

一、引言 随着计算机科学与技术的飞速发展&#xff0c;编程语言也在不断进化以满足日益增长的需求。C是一门集高性能和灵活性于一身的编程语言&#xff0c;自1983年诞生以来不断演进&#xff0c;逐渐成为了众多领域的主流编程语言。为了进一步提升开发效率和代码质量&#xff0…...

杭电1008电梯

提供两种做法&#xff0c;第一种不知道为啥不ac。。。 #include<iostream> using namespace std; //不清楚为什么报错了 int a[10000],x[10000]; int main(){int n;while(cin>>n,n!0){for(int i0;i<n;i){cin>>a[i];if(i0) x[i](65)*a[i]-5;else {if(a[i-…...

【Python小知识 - 2】:在VSCode中切换Python解释器版本

文章目录 在VSCode中切换Python解释器版本 在VSCode中切换Python解释器版本 在VSCode中按下快捷键CtrlShiftP&#xff0c;出现命令框。 输入以下命令&#xff1a; Python: Select Interpreter输入命令回车后即出现不同的Python解释器选项&#xff0c;选择想要切换的Python解释器…...

ubuntu meson安装

安装 sudo python3 -m pip install meson sudo python3 -m pip install ninja问题 libdrm ERROR: Dependency “libdrm” not found, tried pkgconfig and cmake Unable to correct problems, you have held broken packages. sudo apt-get update --fix-missing sudo apt in…...

记者协会评审系统-需求分析

记者协会评审系统-需求分析 文章目录 记者协会评审系统-需求分析一、参与角色二、业务流程2.1 作品推荐收集2.2 初步审核2.3 评审功能(初评)2.4&5 定评和审定三、功能清单2.1 基础功能2.2 业务功能2.3.1.单位投稿功能2.3.2.推荐作品分类管理2.3.3. 稿件初审2.3.4.评审功能…...

python 检索与该查询最相似的句子 使用库hflayers和sentence_transformers来实现遇到的问题

此篇文章总结遇到三个问题(3. OSError: We couldn’t connect to ‘https://huggingface.co’ to load this file, couldn’t find it in the cached files and it looks like sentence-transformers/all-mpnet-base-v2 is not the path to a directory containing a file nam…...

计算机毕业设计 在线新闻聚合平台的设计与实现 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…...

【机器学习随笔】概率论与实际问题的对应

主要从直观看待问题的角度&#xff0c;对概率分布进行分类。 一、时间维度 1、一个事件两次发生的时间间隔或者说&#xff0c;单位时间内发生了两次的概率&#xff0c;用指数分布 2、多个事件发生的时间间隔&#xff0c;用Gamma分布。 3、单位时间了发生了k次的概率&#xff0c…...

基于langchain的简单RAG的实现

闲来无事&#xff0c;想研究一下RAG的实现流程&#xff0c;看网上用langchain的比较多&#xff0c;我自己在下面也跑了跑&#xff0c;代码很简单&#xff0c;以次博客记录一下&#xff0c;方便回顾 langchain LangChain 是一个基于大型语言模型&#xff08;LLM&#xff09;开发…...

ROS2,工作空间中新建了一个python脚本,需要之后作为节点运行。告诉我步骤?

提问 ROS2&#xff0c;工作空间中新建了一个python脚本&#xff0c;需要之后运行。告诉我步骤&#xff1f; 大概要包括而不限于&#xff1a;chmod给可执行权限、setup.py中entry point的配置&#xff0c;如果在launch文件中要使用&#xff0c;还涉及到launch.py文件的配置。最…...

ubuntu中使用docker

上一篇我已经下载了一个ubuntu:20.04的镜像&#xff1b; 1. 查看所有镜像 sudo docker images 2. 基于本地存在的ubuntu:20.04镜像创建一个容器&#xff0c;容器的名为cppubuntu-1。创建的时候就会启动容器。 sudo docker run -itd --name cppubuntu-1 ubuntu:20.04 结果出…...

WebRTC通话原理与入门难度实战指南

波煮的实习公司主要是音视频业务&#xff0c;所以最近在补习WebRTC的相关内容&#xff0c;会不定期给大家分享学习心得和笔记。 文章目录 WebRTC通话原理进行媒体协商&#xff1a;彼此要了解对方支持的媒体格式网络协商&#xff1a;彼此要了解对方的网络情况&#xff0c;这样才…...

Linux中su与sudo命令的区别:权限管理的关键差异解析

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storms…...

el-tabs 切换时数据不更新的问题

最近业务需求&#xff0c;需要在页面中使用tabs&#xff0c;使用过程中出现tabs切换&#xff0c;数据不更新的问题&#xff0c;以下是思路和解决办法。 Vue 会追踪你在模板中绑定的数据&#xff0c;并在数据发生变化时重新渲染相应的部分。但在使用 el-tabs 时&#xff0c;有时…...

OpenCV 图像色彩空间转换与抠图

一、知识点: 1、色彩空间转换函数 (1)、void cvtColor( InputArray src, OutputArray dst, int code, int dstCn 0, AlgorithmHint hint cv::ALGO_HINT_DEFAULT ); (2)、将图像从一种颜色空间转换为另一种。 (3)、参数说明: src: 输入图像&#xff0c;即要进行颜…...

【Visual Studio 2022】卸载安装,ASP.NET

Visual Studio 2022 彻底卸载教程 手动清理残留文件夹 删除C:\Program Files\Microsoft Visual Studio 是旧版本 Visual Studio 的残留安装目录 文件夹名对应的 Visual Studio 版本Microsoft Visual Studio 9.0Visual Studio 2008Microsoft Visual Studio 10.0Visual Studio…...

【云安全】以Aliyun为例聊云厂商服务常见利用手段

目录 OSS-bucket_policy_readable OSS-object_public_access OSS-bucket_object_traversal OSS-Special Bucket Policy OSS-unrestricted_file_upload OSS-object_acl_writable ECS-SSRF 云攻防场景下对云厂商服务的利用大同小异&#xff0c;下面以阿里云为例 其他如腾…...

Qt客户端技巧 -- 窗口美化 -- 圆角窗口

不解析&#xff0c;直接给代码例子 利用窗口重绘事件处理函数paintEvent main.cpp #include <QtCore/qglobal.h> #if QT_VERSION > 0x050000 #include <QtWidgets/QApplication> #else #include <QtGui/QApplication> #endif#include "roundedwin…...