php实现kafka
kafka类:
<?phpclass b2c_kafka
{public $broker_list;public $topic;public $group_id;protected $producer = null;protected $consumer = null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* @param object app*/public function __construct(){$this->broker_list = 'kafka';$this->topic = 'local-cn';$this->group_id = ' kafka-map';$this->producer = null;$this->consumer = null;$this->receive_wait_time = 10;$this->receive_wait_num = 100;}/*** 获取生产者*/public function Producer(){$conf = new \RdKafka\Conf();// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成// 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功// all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成$conf->set('acks', '0');//If you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.idempotence', 'true');$producer = new \RdKafka\Producer($conf);$this->producer = $producer;return $this;}/*** 发送消息** @param string|array $msg* @param string $topic* @return void*/public function SendMsg($msg = '', $topic = ''){if (empty($topic)) {$topic = $this->topic;}$producer = $this->producer;$topic = $producer->newTopic($topic);if (!is_array($msg)) {$msg = [$msg];}foreach ($msg as $value) {$topic->produce(RD_KAFKA_PARTITION_UA, 0, $value);$producer->poll(0);}for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) {$result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {break;}}if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {throw new \RuntimeException('Kafka消息发送失败,错误代码:' . $result);}}/*** 获取消费者** @param string $group_id* @return void*/public function Consumer($group_id = ''){$conf = new \RdKafka\Conf();// Set a rebalance callback to log partition assignments (optional)$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:echo "Assign: ";var_dump($partitions);$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:echo "Revoke: ";var_dump($partitions);$kafka->assign(NULL);break;default:throw new \Exception($err);}});// Configure the group.id. All consumer with the same group.id will consume// different partitions.if (empty($group_id)) {$group_id = $this->group_id;}// 设置相同的group,防止一次消息被多次消费。// 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的$conf->set('group.id', $group_id);// Initial list of Kafka brokers// $conf->set('bootstrap.servers', $this->broker_list);$conf->set('metadata.broker.list', $this->broker_list);// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning//earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费$conf->set('auto.offset.reset', 'earliest');// 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量// 自动提交分区消费的位置,手动可确保消息被消费$conf->set('enable.auto.commit', true);$conf->set('auto.commit.interval.ms', 1000);$consumer = new \RdKafka\KafkaConsumer($conf);$this->consumer = $consumer;return $this;}/*** 接收消息** @param string $topic* @param array $callback* @return void*/public function ReceiveMsg($topic = '', array $callback = []){$consumer = $this->consumer;if (empty($topic)) {$topic = $this->topic;}if (!is_array($topic)) {$topic = [$topic];}// Subscribe to topic 'test'$consumer->subscribe($topic);echo "Waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";$i = 0;$msg_list = [];while (true) {$i++;if ($i > $this->receive_wait_time) {$i = 0;if (empty($msg_list)) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$msg_list = [];}// 阻塞一秒钟$message = $consumer->consume(1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:$msg_list[] = $message->payload;if (count($msg_list) < $this->receive_wait_num) {continue;}if (!empty($callback)) {call_user_func_array($callback, [$msg_list]);}$i = 0;$msg_list = [];break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:// echo "No more messages; will wait for more\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:// echo "Timed out\n";break;default:throw new \Exception($message->errstr(), $message->err);break;}}}}
调用:
$data['member_id'] = '121221';
$data['ip'] = $this->getIp();
$data['timestamp'] = $this->getMicrotime();$kafkaObj = new kafka();
$kafkaObj->Producer()->sendMsg(json_encode($data, 320));
相关文章:
php实现kafka
kafka类: <?phpclass b2c_kafka {public $broker_list;public $topic;public $group_id;protected $producer null;protected $consumer null;protected $receive_wait_time;protected $receive_wait_num;/*** 构造方法* param object app*/public function …...
YOLOv10改进系列,YOLOv10损失函数更换为Powerful-IoU(2024年最新IOU),助力高效涨点
改进前训练结果: 改进后的结果: 摘要 边界框回归(BBR)是目标检测中的核心任务之一,BBR损失函数显著影响其性能。然而,观察到现有基于IoU的损失函数存在不合理的惩罚因子,导致回归过程中锚框扩展,并显著减缓收敛速度。为了解决这个问题,深入分析了锚框扩展的原因。针…...
工具知识 | Linux 常用命令参考手册
目录 文件 查看文件内容 headtailcatnlmore 创建 touchmkdirmktemp 删除 rmrmdir 查找文件 findlocate lspwdwcchattrpastestatgrepsedcdcpmvopensourcetreelnfilesortuniqsplitvim 系统管理 nohupwatchpingwhichshutdownrebootuptimecrontabatunameifconfigwhereischmodlsofc…...
mysql 常用知识点总结
MySQL 是一种广泛使用的关系型数据库管理系统(RDBMS),它基于结构化查询语言(SQL)。了解 MySQL 的语法对数据库管理和操作非常重要。以下是 MySQL 语法的详细完整解释,涵盖基本概念、创建表、查询、修改数据…...
conda常用指令
1、查看conda版本 conda --version 2、更新conda conda update conda 3、查看conda环境信息 conda info 4、查看已有虚拟环境 conda info --envs conda info -e conda env list 5、创建新虚拟环境 conda create --name myenv python3.8 6、激活环境和退出环境 conda…...
前后端分离项目--下载功能
文章目录 不使用代理服务器blobblob构造函数通过FormData对象的getBlob方法创建Blob对象将Blob对象转换成UR 使用代理服务器 前后端分离项目中下载与其他接口的使用不同,一般下载不走node,不通过代理服务器,而是直接在前台发送请求࿰…...
PMP--一模--解题--81-90
文章目录 4.整合管理81、 [单选] 一位先前不活跃的干系人参与程度突然增加,这种意外的参与导致了一些变更请求。项目经理应该做什么? 4.整合管理82、 [单选] 公司的新产品系列将在两个月内发布,95%的项目任务均已完成。但是,管理层…...
计算机网络 --- 【2】计算机网络的组成、功能
目录 一、计算机网络的组成 1.1 从组成部分看 1.2 从工作方式看 1.3 从逻辑功能看 1.4 总结 二、计算机网络的功能 2.1 数据通信 2.2 资源共享编辑 2.3 分布式处理 2.4 提高可靠性 2.5 负载均衡 一、计算机网络的组成 1.1 从组成部分看 我们举例分析计算机网络从…...
『功能项目』切换职业技能面板【49】
我们打开上一篇48切换职业面板的项目, 本章要做的事情是制作第二职业法师技能面板、第三职业面板并且完成切换 双击打开Canvas进入预制体空间 复制三个技能栏面板 重命名 设置第一技能栏 设置第二职业技能栏 设置第三职业技能栏 修改脚本:ChangeProfess…...
寻找排名好的自闭症学校?这些关键因素不可忽视
郑州市如果有一家如星贝育园这样的自闭症公办学校,那无疑将为当地的自闭症儿童及其家庭带来巨大的福音。星贝育园所展现出的专业性、承诺的康复效果保障、以及为特殊儿童提供的全方位支持,都体现了其对自闭症儿童教育康复事业的深刻理解和高度责任感。 …...
Git常用命令(记录)
提交代码 git status 查看状态git add .或者git add xx选择提交全部或者某文件git commit -m “提交信息”git push 创建新分支提交到新的分支 git checkout -b [branch-name] 创建并切换到新分支git add [file-name] 将要上传的文件添加到暂存区git commit -m “commit mes…...
STM32+ESP8266 WiFi连接机智云平台APP远程控制教程
本文档将介绍如何用STM32ESP8266 WiFi模块从零开始连接上机智云,并通过APP进行远程控制。 机智云官网:机智云|智能物联网操作系统 (gizwits.com) 准备:STM32、ESP8266、手机、可上网的WiFi。 1.创建设备 1.1 注册登陆 请自行注册账号并登陆…...
学懂C++(六十):C++ 11、C++ 14、C++ 17、C++ 20新特性大总结(万字详解大全)
一、引言 随着计算机科学与技术的飞速发展,编程语言也在不断进化以满足日益增长的需求。C是一门集高性能和灵活性于一身的编程语言,自1983年诞生以来不断演进,逐渐成为了众多领域的主流编程语言。为了进一步提升开发效率和代码质量࿰…...
杭电1008电梯
提供两种做法,第一种不知道为啥不ac。。。 #include<iostream> using namespace std; //不清楚为什么报错了 int a[10000],x[10000]; int main(){int n;while(cin>>n,n!0){for(int i0;i<n;i){cin>>a[i];if(i0) x[i](65)*a[i]-5;else {if(a[i-…...
【Python小知识 - 2】:在VSCode中切换Python解释器版本
文章目录 在VSCode中切换Python解释器版本 在VSCode中切换Python解释器版本 在VSCode中按下快捷键CtrlShiftP,出现命令框。 输入以下命令: Python: Select Interpreter输入命令回车后即出现不同的Python解释器选项,选择想要切换的Python解释器…...
ubuntu meson安装
安装 sudo python3 -m pip install meson sudo python3 -m pip install ninja问题 libdrm ERROR: Dependency “libdrm” not found, tried pkgconfig and cmake Unable to correct problems, you have held broken packages. sudo apt-get update --fix-missing sudo apt in…...
记者协会评审系统-需求分析
记者协会评审系统-需求分析 文章目录 记者协会评审系统-需求分析一、参与角色二、业务流程2.1 作品推荐收集2.2 初步审核2.3 评审功能(初评)2.4&5 定评和审定三、功能清单2.1 基础功能2.2 业务功能2.3.1.单位投稿功能2.3.2.推荐作品分类管理2.3.3. 稿件初审2.3.4.评审功能…...
python 检索与该查询最相似的句子 使用库hflayers和sentence_transformers来实现遇到的问题
此篇文章总结遇到三个问题(3. OSError: We couldn’t connect to ‘https://huggingface.co’ to load this file, couldn’t find it in the cached files and it looks like sentence-transformers/all-mpnet-base-v2 is not the path to a directory containing a file nam…...
计算机毕业设计 在线新闻聚合平台的设计与实现 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试
🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点…...
【机器学习随笔】概率论与实际问题的对应
主要从直观看待问题的角度,对概率分布进行分类。 一、时间维度 1、一个事件两次发生的时间间隔或者说,单位时间内发生了两次的概率,用指数分布 2、多个事件发生的时间间隔,用Gamma分布。 3、单位时间了发生了k次的概率,…...
利用最小二乘法找圆心和半径
#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列,以便知晓哪些列包含有价值的数据,…...
Yolov8 目标检测蒸馏学习记录
yolov8系列模型蒸馏基本流程,代码下载:这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中,**知识蒸馏(Knowledge Distillation)**被广泛应用,作为提升模型…...
嵌入式学习笔记DAY33(网络编程——TCP)
一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
