当前位置: 首页 > news >正文

BeansTalkd 做消息队列服务

无意间看到这个仓库讲php关于 BeanStalkd 的扩展,然后就去了解了一下beanstalkd,用它可以用来做队列服务。

话不多说,安装一下试试。

首先 sudo apt search beanstalk 搜索一下发现


Sorting... Done
Full Text Search... Done
awscli/focal-updates,focal-updates 1.18.69-1ubuntu0.20.04.1 all
  Universal Command Line Environment for AWS

beanstalkd/focal,now 1.11-1 amd64 [installed]
  simple, in-memory, workqueue service

python-celery-common/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (common files)

python-celery-doc/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (Documentation)

python3-celery/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (Python3 version)

ruby-beaneater/focal,focal 1.0.0-1 all
  simple beanstalkd client for Ruby
 

好了,找到这个beanstalkd了

然后安装

 sudo apt-get install beanstalkd 

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Suggested packages:
  doc-base
The following NEW packages will be installed:
  beanstalkd
0 upgraded, 1 newly installed, 0 to remove and 90 not upgraded.
Need to get 43.9 kB of archives.
After this operation, 125 kB of additional disk space will be used.
Get:1 http://cn.archive.ubuntu.com/ubuntu focal/universe amd64 beanstalkd amd64 1.11-1 [43.9 kB]
Get:1 http://cn.archive.ubuntu.com/ubuntu focal/universe amd64 beanstalkd amd64 1.11-1 [43.9 kB]
Fetched 31.3 kB in 4s (7,047 B/s)     
Selecting previously unselected package beanstalkd.
(Reading database ... 256731 files and directories currently installed.)
Preparing to unpack .../beanstalkd_1.11-1_amd64.deb ...
Unpacking beanstalkd (1.11-1) ...
Setting up beanstalkd (1.11-1) ...
Created symlink /etc/systemd/system/multi-user.target.wants/beanstalkd.service → /lib/systemd/system/beanstalkd.service.
beanstalkd.socket is a disabled or a static unit, not starting it.
Processing triggers for man-db (2.9.1-1) ...
Processing triggers for systemd (245.4-4ubuntu3.11) ...
 

安装成功,这个东西会监听 11300 端口
然后使用这个工具来看看


监控工具:

wget https://github.com/src-d/beanstool/releases/download/v0.2.0/beanstool_v0.2.0_linux_amd64.tar.gz

获取文件后解压

tar -xvzf beanstool_v0.2.0_linux_amd64.tar.gz

然后拷贝到 /usr/local/bin/

sudo cp beanstool_v0.2.0_linux_amd64/beanstool /usr/local/bin/

这样就直接用 beanstool 了

查看当前状态

beanstool stats

结果

+---------+----------+----------+----------+----------+----------+----------+----------+
| Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+---------+----------+----------+----------+----------+----------+----------+----------+
| default | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
+---------+----------+----------+----------+----------+----------+----------+----------+

然后使用composer的vendor包

composer require pda/pheanstalk
安装完毕后创建一个input.php文件做生产者


<?php
require __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('127.0.0.1');

// Queue a Job
$pheanstalk
  ->useTube('testtube')
  ->put("job payload goes here\n");

$pheanstalk
    ->useTube('testtube')
    ->put(
        json_encode(['test' => 'data']),  // encode data in payload
        Pheanstalk::DEFAULT_PRIORITY,     // default priority
        30, // delay by 30s
        60  // beanstalk will retry job after 60s
     );


再创建一个output.php文件做消费者


<?php
require __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('127.0.0.1');

// we want jobs from 'testtube' only.
$pheanstalk->watch('testtube');

// this hangs until a Job is produced.
$job = $pheanstalk->reserve();

try {
    $jobPayload = $job->getData();
    // do work.

    sleep(2);
    // If it's going to take a long time, periodically
    // tell beanstalk we're alive to stop it rescheduling the job.
    $pheanstalk->touch($job);
    sleep(2);

    // eventually we're done, delete job.
    $pheanstalk->delete($job);
}
catch(\Exception $e) {
    // handle exception.
    // and let some other worker retry.
    $pheanstalk->release($job); 
}
 

然后执行一下  php input.php

查看 状态

 
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 1        | 1        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
 
我们看到有一个在ready状态,一个在delayed状态,这是由于第二次的put采用了延时30s,然后过一段时间后再看

 
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 2        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
复制代码
已经有2个在ready状态了。

此时我们用消费者执行一下 php output.php 

与此同时迅速看状态

复制代码
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 1        | 1        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
再次执行
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 1        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
复制代码
因为我们有sleep(2),所以要尽量快点操作这个状态监控的命令,可以看到有一个拿出来放入了reserved,然后就消失了(实际上这是后面的代码delete导致的,因为已经消费完毕)

再次执行 php output.php 

 
$ beanstool stats:
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 0        | 1        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+

$ beanstool stats:
+---------+----------+----------+----------+----------+----------+----------+----------+
| Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+---------+----------+----------+----------+----------+----------+----------+----------+
| default | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
+---------+----------+----------+----------+----------+----------+----------+----------+
 
同样也是迅速观测这个状态,发现消费1个,然后删除1个,现在队列空了,这说明确实是符合我们的期望的。

然后回到文章开头提到的扩展,这个扩展就是帮我们实现了composer装的那个pheanstalk包。


这个扩展如何安装呢?

步骤如下:

克隆项目

git clone https://gitee.com/qzfzz/php-beanstalk.git
进行编译 

phpize

然后

./configure

之后

make && make install

sudo make install

然后修改 php.ini

sudo gedit /etc/php/7.4/cli/php.ini /etc/php/7.4/apache2/php.ini

加上这句

extension=beanstalk

重启apache2

sudo /etc/init.d/apache2 restart
之后就可以使用这个扩展了。

这个扩展它封装为函数了,可以看到他有个例子文件

简单的找了几个例子

复制代码
<?php

$config = [
  'host' => '127.0.0.1',
  'port' => 11300
];
$beanstalk_obj = beanstalk_connect($config['host'], $config['port']);
$last_job_id = beanstalk_put($beanstalk_obj, "message detail");
beanstalk_delete($beanstalk_obj, $last_job_id);
$last_job_id = beanstalk_putInTube($beanstalk_obj, 'tubea', "message detail");
复制代码
可以看到使用 connect 连接, put 塞入新的job消息, putInTube 来塞入指定管道的tubea,delete来删除等等,具体可以看看源代码学习一下,我对比了一下这两种方式实现效率。

第一种采用composer包(我还特意去掉了加载文件所需要的时间)

复制代码
<?phprequire __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$start = microtime( true );
$pheanstalk = Pheanstalk::create('127.0.0.1');

$pheanstalk
  ->useTube('testtube')
  ->put(date("Y-m-d H:i:s") . "job payload goes here\n");

$end = microtime(true);

echo ($end - $start) * 1000 . " ms";
复制代码
执行需要2.59ms

第二种直接用扩展函数


<?php
$start = microtime(true);

$config = [
  'host' => '127.0.0.1',
  'port' => 11300
];

$beanstalk_object = beanstalk_connect($config['host'], $config['port']);
$last_job_id = beanstalk_putInTube($beanstalk_object, 'testtube', date("Y-m-d H:i:s") . "job payload goes here\n");

$end = microtime(true);

echo ($end - $start) * 1000 . " ms";
复制代码
执行需要0.34ms

不得不说,扩展就是扩展,就是快的多啊!

我另外测试了一下投递极限

循环投递10000次消息,大概在500ms左右

复制代码
$start = microtime( true );
for ($i=0; $i < 10000; $i++) { 
  $pheanstalk
  ->useTube('testtube')
  ->put(date("Y-m-d H:i:s") . "job payload goes here\n");
}
$end = microtime( true );
echo ($end - $start) * 1000 . " ms";


 
也就意味着1秒钟只能投递20000条消息,这比rabbitmq的投递慢多了(参见这篇文章)

但是它执行1次投递消息的时候却比这个rabbitmq的代码执行的快,原因是我测试了一下这个rabbitmq的连接上就耗费了9ms

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

更别提还有这两步了

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
这样来看,想迅速投递一条短消息,或者建立小信息量的job用beanstalk是很不错的,如果有大量消息集中投递,那使用 rabbitmq 是很不错的。

另外这个beanstalk投递可以延时,非常适合有些时候需要在当前时间一段时间后执行某个任务,往后弄个类似于一次性的定时器的功能,这个东西值得尝试。

而且如果使用while死循环将 output.php 脚本一直挂着跑,它就能一直消费消息了,这样就等于有个后端进程一直能帮我们做消费者处理堆积的任务了,特殊场景可以考虑一下这个方案。

Windows 安装 Beanstalkd

下载并安装 cygwin

1. 添加镜像地址 - http://mirrors.aliyun.com
2. 下载 `gcc`、`gcc-core`、`make`、`automake`, 在 Devel 分支下

下载 beanstalkd-win 包

1. 解压并进入beanstalkd-win目录
2. 打开CMD窗口,运行 ./beanstalkd.exe -l 127.0.0.1 -p 11300
 

相关文章:

BeansTalkd 做消息队列服务

无意间看到这个仓库讲php关于 BeanStalkd 的扩展&#xff0c;然后就去了解了一下beanstalkd&#xff0c;用它可以用来做队列服务。 话不多说&#xff0c;安装一下试试。 首先 sudo apt search beanstalk 搜索一下发现 Sorting... Done Full Text Search... Done awscli/focal…...

csv文件添加文件内容和读取

append content to file import numpy as np acc_listnp.array([0.97,0.92,0.93,0.89]) # 注意这个地方添加文件不需要特别声明是什么文件 file open("result.csv", "a") print("{:.2f}, {:.2f}".format(acc_list.mean(), acc_list.std()), f…...

关于禅道的安装配置以及项目管理、团队协同工作

目录 一、禅道是什么&#xff1f; 二、特点和功能 三、安装禅道 3.1 下载官网 3.2 版本考虑 3.3 禅道使用手册参考 3.4 Windows端安装禅道 四、启动禅道 4.1 访问禅道 四、禅道部分功能的使用 4.1 添加项目集 4.2 启动/关闭项目 4.3 项目计划仪表盘/阶段目标/研发…...

使用Wireshark提取流量中图片方法

0.前言 记得一次CTF当中有一题是给了一个pcapng格式的流量包&#xff0c;flag好像在某个响应中的图片里。比较简单&#xff0c;后来也遇到过类似的情况&#xff0c;所以总结和记录一下使用Wireshark提取图片的方法。 提取的前提是HTTP协议&#xff0c;至于HTTPS的协议需要导入服…...

C#,简单修改Visual Studio 2022设置以支持C#最新版本的编译器,尊享编程之趣

1 PLS README & CHAPTER 5 用一个超简单的例子说明各版本 C# 的差异。 使用新版本&#xff08;比如C#.11&#xff09;&#xff0c;当然有一定的好处。我们在写程序的时候一般这样&#xff1a; Visual Studio 2022 默认只能这样写&#xff1a; string imageFile Path.C…...

小程序Tab栏与页面滚动联动

小程序tab栏切换与页面滚动联动 tab栏与页面滚动联动点击tab栏页面跳到指定位置滚动页面时切换tab栏 tab栏与页面滚动联动 在进行小程序开发时&#xff0c;需要实现点击tab栏页面滚动到某一指定位置&#xff0c;并且滚动页面时&#xff0c;小程序的tab栏进行切换。 在一开始&a…...

Java,数据结构与集合源码,关于List接口的实现类(ArrayList、Vector、LinkedList)的源码剖析

目录 ArrayList ArrayList的特点&#xff1a; ArrayList源码解析&#xff1a; Vector Vector的特点&#xff1a; Vector源码解析&#xff1a; LinkedList LinkedList的特点&#xff1a; LinkedList的源码剖析&#xff1a; 使用说明&#xff1a; ArrayList ArrayList的…...

算法基础(python版本)

第二章 算法设计思想 一、搜索排序 1.排序算法 https://visualgo.net/zh/sorting (1)冒泡排序 # 思路&#xff1a; # (1)比较相邻元素&#xff0c;如果第一个比第二个大&#xff0c;则交换他们 # (2)第一轮下来&#xff0c;可以保证最后一个数一定是最大的&#xff1b;第二…...

使用Arrays.Sort并定制Comparator排序解决合并区间

合并区间-力扣算法题56题 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例 1&#xff1a; 输入&am…...

【机器学习】039_合理初始化

一、稳定训练 目标&#xff1a;使梯度值在更合理的范围内 常见方法如下&#xff1a; 将乘法变为加法 ResNet&#xff1a;当层数较多时&#xff0c;会加入一些加法进去 LSTM&#xff1a;如果时序序列较长时&#xff0c;把一些对时序的乘法做加法 归一化 梯度归一化&…...

使用Arrays.asList与不使用的区别

在写算法的时候&#xff0c;遇到了有的题解使用的是Arrays.asList&#xff0c;也有的是直接新建一个List集合将元素加进去的。 看了一下算法的时间&#xff0c;两者居然相差了9秒。 算法原地址&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长…...

基于可变形卷积和注意力机制的带钢表面缺陷快速检测网络DCAM-Net(论文阅读笔记)

原论文链接->DCAM-Net: A Rapid Detection Network for Strip Steel Surface Defects Based on Deformable Convolution and Attention Mechanism | IEEE Journals & Magazine | IEEE Xplore DCAM-Net: A Rapid Detection Network for Strip Steel Surface Defects Base…...

el-table 对循环产生的空白列赋默认值

1. el-table 空白列赋值 对el-table中未传数据存在空白的列赋默认值0。使用el-table 提供的插槽 slot-scope&#xff1a;{{ row || ‘0’ }} 原数据&#xff1a; <el-table-column label"集镇" :propcity ><template slot-scope"{row}">{{…...

新一代网络监控技术——Telemetry

一、Telemetry的背景 传统的网络设备监控方式有SNMP、CLI、Syslog、NetStream、sFlow&#xff0c;其中SNMP为主流的监控数据方式。而随着网络系统规模的扩大&#xff0c;网络设备数量的增多&#xff0c;网络结构的复杂&#xff0c;相应监控要求也不断提升&#xff0c;如今这些…...

java斗牛,咋金花

无聊时间&#xff0c;打发下游戏 简单说下思路 目录 1.创建牌对象 2.创建52张牌&#xff0c;不包含大小王 3.洗牌 4.发牌 1.创建牌对象 2.创建52张牌&#xff0c;不包含大小王 3.洗牌 4.发牌 /*** 扑克牌*/ public class Poker {/*** 花色*/private String cardSuits…...

深信服技术认证“SCSA-S”划重点:信息收集

为帮助大家更加系统化地学习网络安全知识&#xff0c;以及更高效地通过深信服安全服务认证工程师考核&#xff0c;深信服特别推出“SCSA-S认证备考秘笈”共十期内容&#xff0c;“考试重点”内容框架&#xff0c;帮助大家快速get重点知识~ 划重点来啦 深信服安全服务认证工程师…...

代码逻辑修复与其他爬虫ip库的应用

在一个项目中&#xff0c;由于需要设置 http_proxy 来爬虫IP访问网络&#xff0c;但在使用 requests 库下载文件时遇到了问题。具体表现为在执行 Python 脚本时&#xff0c;程序会阻塞并最终超时&#xff0c;无法正常完成文件下载。 解决方案 针对这个问题&#xff0c;我们可以…...

字符串结尾空格比较相关参数BLANK_PAD_MODE(DM8:达梦数据库)

DM8:达梦数据库 字符串结尾空格比较相关参数BLANK_PAD_MODE 环境介绍1 BLANK_PAD_MODE01.1 初始化数据库1.2 创建测试表 T0 2 BLANK_PAD_MODE12.1 初始化数据库2.2 创建测试表 T1 3 BLANK_PAD_MODE只对字段varchar类型生效3.1 BLANK_PAD_MODE 对char 类型对比无效3.2 在两个数据…...

微型计算机原理MOOC题

一、8254 1.掉坑了&#xff0c;AL传到端口不意味着一定传到的是低位&#xff0c;要看控制字D5和D4&#xff0c;10是只写高位&#xff0c;所以是0A00.。。 2. 3. 4.待解决&#xff1a;...

TensorFlow实战教程(十八)-Keras搭建卷积神经网络及CNN原理详解

从本专栏开始,作者正式研究Python深度学习、神经网络及人工智能相关知识。前一篇文章详细讲解了Keras实现分类学习,以MNIST数字图片为例进行讲解。本篇文章详细讲解了卷积神经网络CNN原理,并通过Keras编写CNN实现了MNIST分类学习案例。基础性文章,希望对您有所帮助! 一…...

测试微信模版消息推送

进入“开发接口管理”--“公众平台测试账号”&#xff0c;无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息&#xff1a; 关注测试号&#xff1a;扫二维码关注测试号。 发送模版消息&#xff1a; import requests da…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

高防服务器能够抵御哪些网络攻击呢?

高防服务器作为一种有着高度防御能力的服务器&#xff0c;可以帮助网站应对分布式拒绝服务攻击&#xff0c;有效识别和清理一些恶意的网络流量&#xff0c;为用户提供安全且稳定的网络环境&#xff0c;那么&#xff0c;高防服务器一般都可以抵御哪些网络攻击呢&#xff1f;下面…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

Windows安装Miniconda

一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...

第7篇:中间件全链路监控与 SQL 性能分析实践

7.1 章节导读 在构建数据库中间件的过程中&#xff0c;可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中&#xff0c;必须做到&#xff1a; &#x1f50d; 追踪每一条 SQL 的生命周期&#xff08;从入口到数据库执行&#xff09;&#…...

Vue3 PC端 UI组件库我更推荐Naive UI

一、Vue3生态现状与UI库选择的重要性 随着Vue3的稳定发布和Composition API的广泛采用&#xff0c;前端开发者面临着UI组件库的重新选择。一个好的UI库不仅能提升开发效率&#xff0c;还能确保项目的长期可维护性。本文将对比三大主流Vue3 UI库&#xff08;Naive UI、Element …...

ArcPy扩展模块的使用(3)

管理工程项目 arcpy.mp模块允许用户管理布局、地图、报表、文件夹连接、视图等工程项目。例如&#xff0c;可以更新、修复或替换图层数据源&#xff0c;修改图层的符号系统&#xff0c;甚至自动在线执行共享要托管在组织中的工程项。 以下代码展示了如何更新图层的数据源&…...

PydanticAI快速入门示例

参考链接&#xff1a;https://ai.pydantic.dev/#why-use-pydanticai 示例代码 from pydantic_ai import Agent from pydantic_ai.models.openai import OpenAIModel from pydantic_ai.providers.openai import OpenAIProvider# 配置使用阿里云通义千问模型 model OpenAIMode…...