当前位置: 首页 > news >正文

BeansTalkd 做消息队列服务

无意间看到这个仓库讲php关于 BeanStalkd 的扩展,然后就去了解了一下beanstalkd,用它可以用来做队列服务。

话不多说,安装一下试试。

首先 sudo apt search beanstalk 搜索一下发现


Sorting... Done
Full Text Search... Done
awscli/focal-updates,focal-updates 1.18.69-1ubuntu0.20.04.1 all
  Universal Command Line Environment for AWS

beanstalkd/focal,now 1.11-1 amd64 [installed]
  simple, in-memory, workqueue service

python-celery-common/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (common files)

python-celery-doc/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (Documentation)

python3-celery/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (Python3 version)

ruby-beaneater/focal,focal 1.0.0-1 all
  simple beanstalkd client for Ruby
 

好了,找到这个beanstalkd了

然后安装

 sudo apt-get install beanstalkd 

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Suggested packages:
  doc-base
The following NEW packages will be installed:
  beanstalkd
0 upgraded, 1 newly installed, 0 to remove and 90 not upgraded.
Need to get 43.9 kB of archives.
After this operation, 125 kB of additional disk space will be used.
Get:1 http://cn.archive.ubuntu.com/ubuntu focal/universe amd64 beanstalkd amd64 1.11-1 [43.9 kB]
Get:1 http://cn.archive.ubuntu.com/ubuntu focal/universe amd64 beanstalkd amd64 1.11-1 [43.9 kB]
Fetched 31.3 kB in 4s (7,047 B/s)     
Selecting previously unselected package beanstalkd.
(Reading database ... 256731 files and directories currently installed.)
Preparing to unpack .../beanstalkd_1.11-1_amd64.deb ...
Unpacking beanstalkd (1.11-1) ...
Setting up beanstalkd (1.11-1) ...
Created symlink /etc/systemd/system/multi-user.target.wants/beanstalkd.service → /lib/systemd/system/beanstalkd.service.
beanstalkd.socket is a disabled or a static unit, not starting it.
Processing triggers for man-db (2.9.1-1) ...
Processing triggers for systemd (245.4-4ubuntu3.11) ...
 

安装成功,这个东西会监听 11300 端口
然后使用这个工具来看看


监控工具:

wget https://github.com/src-d/beanstool/releases/download/v0.2.0/beanstool_v0.2.0_linux_amd64.tar.gz

获取文件后解压

tar -xvzf beanstool_v0.2.0_linux_amd64.tar.gz

然后拷贝到 /usr/local/bin/

sudo cp beanstool_v0.2.0_linux_amd64/beanstool /usr/local/bin/

这样就直接用 beanstool 了

查看当前状态

beanstool stats

结果

+---------+----------+----------+----------+----------+----------+----------+----------+
| Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+---------+----------+----------+----------+----------+----------+----------+----------+
| default | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
+---------+----------+----------+----------+----------+----------+----------+----------+

然后使用composer的vendor包

composer require pda/pheanstalk
安装完毕后创建一个input.php文件做生产者


<?php
require __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('127.0.0.1');

// Queue a Job
$pheanstalk
  ->useTube('testtube')
  ->put("job payload goes here\n");

$pheanstalk
    ->useTube('testtube')
    ->put(
        json_encode(['test' => 'data']),  // encode data in payload
        Pheanstalk::DEFAULT_PRIORITY,     // default priority
        30, // delay by 30s
        60  // beanstalk will retry job after 60s
     );


再创建一个output.php文件做消费者


<?php
require __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('127.0.0.1');

// we want jobs from 'testtube' only.
$pheanstalk->watch('testtube');

// this hangs until a Job is produced.
$job = $pheanstalk->reserve();

try {
    $jobPayload = $job->getData();
    // do work.

    sleep(2);
    // If it's going to take a long time, periodically
    // tell beanstalk we're alive to stop it rescheduling the job.
    $pheanstalk->touch($job);
    sleep(2);

    // eventually we're done, delete job.
    $pheanstalk->delete($job);
}
catch(\Exception $e) {
    // handle exception.
    // and let some other worker retry.
    $pheanstalk->release($job); 
}
 

然后执行一下  php input.php

查看 状态

 
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 1        | 1        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
 
我们看到有一个在ready状态,一个在delayed状态,这是由于第二次的put采用了延时30s,然后过一段时间后再看

 
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 2        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
复制代码
已经有2个在ready状态了。

此时我们用消费者执行一下 php output.php 

与此同时迅速看状态

复制代码
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 1        | 1        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
再次执行
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 1        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
复制代码
因为我们有sleep(2),所以要尽量快点操作这个状态监控的命令,可以看到有一个拿出来放入了reserved,然后就消失了(实际上这是后面的代码delete导致的,因为已经消费完毕)

再次执行 php output.php 

 
$ beanstool stats:
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 0        | 1        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+

$ beanstool stats:
+---------+----------+----------+----------+----------+----------+----------+----------+
| Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+---------+----------+----------+----------+----------+----------+----------+----------+
| default | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
+---------+----------+----------+----------+----------+----------+----------+----------+
 
同样也是迅速观测这个状态,发现消费1个,然后删除1个,现在队列空了,这说明确实是符合我们的期望的。

然后回到文章开头提到的扩展,这个扩展就是帮我们实现了composer装的那个pheanstalk包。


这个扩展如何安装呢?

步骤如下:

克隆项目

git clone https://gitee.com/qzfzz/php-beanstalk.git
进行编译 

phpize

然后

./configure

之后

make && make install

sudo make install

然后修改 php.ini

sudo gedit /etc/php/7.4/cli/php.ini /etc/php/7.4/apache2/php.ini

加上这句

extension=beanstalk

重启apache2

sudo /etc/init.d/apache2 restart
之后就可以使用这个扩展了。

这个扩展它封装为函数了,可以看到他有个例子文件

简单的找了几个例子

复制代码
<?php

$config = [
  'host' => '127.0.0.1',
  'port' => 11300
];
$beanstalk_obj = beanstalk_connect($config['host'], $config['port']);
$last_job_id = beanstalk_put($beanstalk_obj, "message detail");
beanstalk_delete($beanstalk_obj, $last_job_id);
$last_job_id = beanstalk_putInTube($beanstalk_obj, 'tubea', "message detail");
复制代码
可以看到使用 connect 连接, put 塞入新的job消息, putInTube 来塞入指定管道的tubea,delete来删除等等,具体可以看看源代码学习一下,我对比了一下这两种方式实现效率。

第一种采用composer包(我还特意去掉了加载文件所需要的时间)

复制代码
<?phprequire __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$start = microtime( true );
$pheanstalk = Pheanstalk::create('127.0.0.1');

$pheanstalk
  ->useTube('testtube')
  ->put(date("Y-m-d H:i:s") . "job payload goes here\n");

$end = microtime(true);

echo ($end - $start) * 1000 . " ms";
复制代码
执行需要2.59ms

第二种直接用扩展函数


<?php
$start = microtime(true);

$config = [
  'host' => '127.0.0.1',
  'port' => 11300
];

$beanstalk_object = beanstalk_connect($config['host'], $config['port']);
$last_job_id = beanstalk_putInTube($beanstalk_object, 'testtube', date("Y-m-d H:i:s") . "job payload goes here\n");

$end = microtime(true);

echo ($end - $start) * 1000 . " ms";
复制代码
执行需要0.34ms

不得不说,扩展就是扩展,就是快的多啊!

我另外测试了一下投递极限

循环投递10000次消息,大概在500ms左右

复制代码
$start = microtime( true );
for ($i=0; $i < 10000; $i++) { 
  $pheanstalk
  ->useTube('testtube')
  ->put(date("Y-m-d H:i:s") . "job payload goes here\n");
}
$end = microtime( true );
echo ($end - $start) * 1000 . " ms";


 
也就意味着1秒钟只能投递20000条消息,这比rabbitmq的投递慢多了(参见这篇文章)

但是它执行1次投递消息的时候却比这个rabbitmq的代码执行的快,原因是我测试了一下这个rabbitmq的连接上就耗费了9ms

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

更别提还有这两步了

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
这样来看,想迅速投递一条短消息,或者建立小信息量的job用beanstalk是很不错的,如果有大量消息集中投递,那使用 rabbitmq 是很不错的。

另外这个beanstalk投递可以延时,非常适合有些时候需要在当前时间一段时间后执行某个任务,往后弄个类似于一次性的定时器的功能,这个东西值得尝试。

而且如果使用while死循环将 output.php 脚本一直挂着跑,它就能一直消费消息了,这样就等于有个后端进程一直能帮我们做消费者处理堆积的任务了,特殊场景可以考虑一下这个方案。

Windows 安装 Beanstalkd

下载并安装 cygwin

1. 添加镜像地址 - http://mirrors.aliyun.com
2. 下载 `gcc`、`gcc-core`、`make`、`automake`, 在 Devel 分支下

下载 beanstalkd-win 包

1. 解压并进入beanstalkd-win目录
2. 打开CMD窗口,运行 ./beanstalkd.exe -l 127.0.0.1 -p 11300
 

相关文章:

BeansTalkd 做消息队列服务

无意间看到这个仓库讲php关于 BeanStalkd 的扩展&#xff0c;然后就去了解了一下beanstalkd&#xff0c;用它可以用来做队列服务。 话不多说&#xff0c;安装一下试试。 首先 sudo apt search beanstalk 搜索一下发现 Sorting... Done Full Text Search... Done awscli/focal…...

csv文件添加文件内容和读取

append content to file import numpy as np acc_listnp.array([0.97,0.92,0.93,0.89]) # 注意这个地方添加文件不需要特别声明是什么文件 file open("result.csv", "a") print("{:.2f}, {:.2f}".format(acc_list.mean(), acc_list.std()), f…...

关于禅道的安装配置以及项目管理、团队协同工作

目录 一、禅道是什么&#xff1f; 二、特点和功能 三、安装禅道 3.1 下载官网 3.2 版本考虑 3.3 禅道使用手册参考 3.4 Windows端安装禅道 四、启动禅道 4.1 访问禅道 四、禅道部分功能的使用 4.1 添加项目集 4.2 启动/关闭项目 4.3 项目计划仪表盘/阶段目标/研发…...

使用Wireshark提取流量中图片方法

0.前言 记得一次CTF当中有一题是给了一个pcapng格式的流量包&#xff0c;flag好像在某个响应中的图片里。比较简单&#xff0c;后来也遇到过类似的情况&#xff0c;所以总结和记录一下使用Wireshark提取图片的方法。 提取的前提是HTTP协议&#xff0c;至于HTTPS的协议需要导入服…...

C#,简单修改Visual Studio 2022设置以支持C#最新版本的编译器,尊享编程之趣

1 PLS README & CHAPTER 5 用一个超简单的例子说明各版本 C# 的差异。 使用新版本&#xff08;比如C#.11&#xff09;&#xff0c;当然有一定的好处。我们在写程序的时候一般这样&#xff1a; Visual Studio 2022 默认只能这样写&#xff1a; string imageFile Path.C…...

小程序Tab栏与页面滚动联动

小程序tab栏切换与页面滚动联动 tab栏与页面滚动联动点击tab栏页面跳到指定位置滚动页面时切换tab栏 tab栏与页面滚动联动 在进行小程序开发时&#xff0c;需要实现点击tab栏页面滚动到某一指定位置&#xff0c;并且滚动页面时&#xff0c;小程序的tab栏进行切换。 在一开始&a…...

Java,数据结构与集合源码,关于List接口的实现类(ArrayList、Vector、LinkedList)的源码剖析

目录 ArrayList ArrayList的特点&#xff1a; ArrayList源码解析&#xff1a; Vector Vector的特点&#xff1a; Vector源码解析&#xff1a; LinkedList LinkedList的特点&#xff1a; LinkedList的源码剖析&#xff1a; 使用说明&#xff1a; ArrayList ArrayList的…...

算法基础(python版本)

第二章 算法设计思想 一、搜索排序 1.排序算法 https://visualgo.net/zh/sorting (1)冒泡排序 # 思路&#xff1a; # (1)比较相邻元素&#xff0c;如果第一个比第二个大&#xff0c;则交换他们 # (2)第一轮下来&#xff0c;可以保证最后一个数一定是最大的&#xff1b;第二…...

使用Arrays.Sort并定制Comparator排序解决合并区间

合并区间-力扣算法题56题 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例 1&#xff1a; 输入&am…...

【机器学习】039_合理初始化

一、稳定训练 目标&#xff1a;使梯度值在更合理的范围内 常见方法如下&#xff1a; 将乘法变为加法 ResNet&#xff1a;当层数较多时&#xff0c;会加入一些加法进去 LSTM&#xff1a;如果时序序列较长时&#xff0c;把一些对时序的乘法做加法 归一化 梯度归一化&…...

使用Arrays.asList与不使用的区别

在写算法的时候&#xff0c;遇到了有的题解使用的是Arrays.asList&#xff0c;也有的是直接新建一个List集合将元素加进去的。 看了一下算法的时间&#xff0c;两者居然相差了9秒。 算法原地址&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长…...

基于可变形卷积和注意力机制的带钢表面缺陷快速检测网络DCAM-Net(论文阅读笔记)

原论文链接->DCAM-Net: A Rapid Detection Network for Strip Steel Surface Defects Based on Deformable Convolution and Attention Mechanism | IEEE Journals & Magazine | IEEE Xplore DCAM-Net: A Rapid Detection Network for Strip Steel Surface Defects Base…...

el-table 对循环产生的空白列赋默认值

1. el-table 空白列赋值 对el-table中未传数据存在空白的列赋默认值0。使用el-table 提供的插槽 slot-scope&#xff1a;{{ row || ‘0’ }} 原数据&#xff1a; <el-table-column label"集镇" :propcity ><template slot-scope"{row}">{{…...

新一代网络监控技术——Telemetry

一、Telemetry的背景 传统的网络设备监控方式有SNMP、CLI、Syslog、NetStream、sFlow&#xff0c;其中SNMP为主流的监控数据方式。而随着网络系统规模的扩大&#xff0c;网络设备数量的增多&#xff0c;网络结构的复杂&#xff0c;相应监控要求也不断提升&#xff0c;如今这些…...

java斗牛,咋金花

无聊时间&#xff0c;打发下游戏 简单说下思路 目录 1.创建牌对象 2.创建52张牌&#xff0c;不包含大小王 3.洗牌 4.发牌 1.创建牌对象 2.创建52张牌&#xff0c;不包含大小王 3.洗牌 4.发牌 /*** 扑克牌*/ public class Poker {/*** 花色*/private String cardSuits…...

深信服技术认证“SCSA-S”划重点:信息收集

为帮助大家更加系统化地学习网络安全知识&#xff0c;以及更高效地通过深信服安全服务认证工程师考核&#xff0c;深信服特别推出“SCSA-S认证备考秘笈”共十期内容&#xff0c;“考试重点”内容框架&#xff0c;帮助大家快速get重点知识~ 划重点来啦 深信服安全服务认证工程师…...

代码逻辑修复与其他爬虫ip库的应用

在一个项目中&#xff0c;由于需要设置 http_proxy 来爬虫IP访问网络&#xff0c;但在使用 requests 库下载文件时遇到了问题。具体表现为在执行 Python 脚本时&#xff0c;程序会阻塞并最终超时&#xff0c;无法正常完成文件下载。 解决方案 针对这个问题&#xff0c;我们可以…...

字符串结尾空格比较相关参数BLANK_PAD_MODE(DM8:达梦数据库)

DM8:达梦数据库 字符串结尾空格比较相关参数BLANK_PAD_MODE 环境介绍1 BLANK_PAD_MODE01.1 初始化数据库1.2 创建测试表 T0 2 BLANK_PAD_MODE12.1 初始化数据库2.2 创建测试表 T1 3 BLANK_PAD_MODE只对字段varchar类型生效3.1 BLANK_PAD_MODE 对char 类型对比无效3.2 在两个数据…...

微型计算机原理MOOC题

一、8254 1.掉坑了&#xff0c;AL传到端口不意味着一定传到的是低位&#xff0c;要看控制字D5和D4&#xff0c;10是只写高位&#xff0c;所以是0A00.。。 2. 3. 4.待解决&#xff1a;...

TensorFlow实战教程(十八)-Keras搭建卷积神经网络及CNN原理详解

从本专栏开始,作者正式研究Python深度学习、神经网络及人工智能相关知识。前一篇文章详细讲解了Keras实现分类学习,以MNIST数字图片为例进行讲解。本篇文章详细讲解了卷积神经网络CNN原理,并通过Keras编写CNN实现了MNIST分类学习案例。基础性文章,希望对您有所帮助! 一…...

Docker与Testcontainers构建本地AI测试环境实践

1. 项目概述"Local AI with Dockers Testcontainers"这个组合乍看有些矛盾——AI模型通常需要GPU资源&#xff0c;而Testcontainers作为轻量级测试工具似乎更适合微服务场景。但实际这正是现代AI工程化的一个巧妙实践&#xff1a;用容器化技术解决AI开发中最头疼的环…...

AgentFlocks:构建去中心化多智能体协作系统的开源框架实践

1. 项目概述&#xff1a;从“羊群”到“智能体集群”的范式跃迁最近在开源社区里&#xff0c;一个名为AgentFlocks/flocks的项目引起了我的注意。这个名字很有意思&#xff0c;“flocks”直译是“羊群”或“鸟群”&#xff0c;而“Agent”则指向了当下最热的智能体。这不禁让我…...

LeetCode 二分图判定题解

LeetCode 二分图判定题解 题目描述 二分图是一种特殊的图&#xff0c;它的顶点可以被分为两个不相交的集合&#xff0c;使得图中的每条边都连接不同集合中的顶点。 示例&#xff1a; 对于以下图&#xff1a;A -- B| |C -- D这是一个二分图&#xff0c;因为可以将顶点分为两个…...

ARM架构调试系统核心:MDSCR_EL1寄存器详解与实践

1. ARM架构调试系统概述在嵌入式系统和低层软件开发中&#xff0c;调试功能的重要性不言而喻。ARM架构提供了一套完整的调试基础设施&#xff0c;其中MDSCR_EL1&#xff08;Monitor Debug System Control Register&#xff09;是调试系统的核心控制枢纽。这个64位寄存器位于EL1…...

10年运维总监深度拆解:成本优化与资源管理,如何在“稳”与“省”之间找到最佳平衡点?

一句话核心价值&#xff1a;本文帮你建立一套可量化、可落地的“稳中有省”运维决策框架&#xff0c;让你在2026年IT预算持续承压的背景下&#xff0c;既能守住系统生命线&#xff0c;又能把每一分钱花在刀刃上。一、你在追求“省”的时候&#xff0c;到底在冒多大的“不稳”风…...

基于Hermes Agent与Railway的自主AI智能体一键部署实战

1. 项目概述&#xff1a;一键部署你的智能AI助手 最近在折腾AI智能体&#xff0c;发现了一个挺有意思的项目&#xff1a;Hermes Agent。简单来说&#xff0c;这是一个能自我进化的自主AI智能体&#xff0c;最吸引我的是它原生支持Telegram&#xff0c;这意味着你可以直接在Tel…...

LLM智能体开发中的数据标准化实践与ADP协议解析

1. 项目背景与核心价值在大型语言模型&#xff08;LLM&#xff09;智能体开发领域&#xff0c;微调数据集的质量和标准化程度直接决定了智能体的最终表现。当前行业面临的核心痛点在于&#xff1a;不同研究团队和企业在构建智能体时&#xff0c;往往使用各自私有格式的数据集&a…...

使用Hugging Face Spaces构建交互式图像数据集可视化工具

1. 项目概述在计算机视觉领域&#xff0c;数据可视化是理解数据集特征的关键第一步。Hugging Face Spaces&#xff08;简称HF Space&#xff09;提供了一个绝佳的平台&#xff0c;让开发者能够快速构建和分享交互式的机器学习应用。这个项目将带你从零开始&#xff0c;创建一个…...

【20年嵌入式老兵亲授】:C语言裸机编程在工业边缘节点中规避内存泄漏与时序抖动的7个硬核技巧

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;裸机环境下的C语言编程本质与工业边缘节点特殊约束 在工业边缘计算场景中&#xff0c;裸机&#xff08;Bare-metal&#xff09;C编程并非仅是“不带操作系统的C”&#xff0c;而是对硬件时序、内存拓扑…...

AI智能体浏览器自动化实战:绕过反爬虫与验证码的终极方案

1. 项目概述&#xff1a;为AI智能体赋予“真实浏览器之手”如果你正在使用Claude Code、Cursor、OpenClaw这类AI编程助手&#xff0c;并且尝试过让它们帮你自动完成一些网页操作——比如抓取商品价格、监控新闻动态、或者自动填写表单——那你大概率经历过这样的挫败&#xff1…...