workbunny/webman-rabbitmq
🐇 A PHP implementation of RabbitMQ Client for webman plugin. 🐇
-
什么时候使用消息队列?
当你需要对系统进行解耦、削峰、异步的时候;如发送短信验证码、秒杀活动、资产的异步分账清算等。
-
RabbitMQ和Redis的区别?
Redis中的Stream的特性同样适用于消息队列,并且也包含了比较完善的ACK机制,但在一些点上与RabbitMQ存在不同:
- Redis Stream没有完善的后台管理;RabbitMQ拥有较为完善的后台管理及Api;
- Redis的持久化策略取舍:默认的RDB策略极端情况下存在丢失数据,AOF策略则需要牺牲一些性能;RabbitMQ持久化方案更多,可对消息持久化也可对队列持久化;
- RabbitMQ拥有更多的插件可以提供更完善的协议支持及功能支持;
-
什么时候使用Redis?什么时候使用RabbitMQ?
当你的队列使用比较单一或者比较轻量的时候,请选用 Redis Stream;当你需要一个比较完整的消息队列体系,包括需要利用交换机来绑定不同队列做一些比较复杂的消息任务的时候,请选择RabbitMQ;
当然,如果你的队列使用也比较单一,但你需要用到一些管理后台相关系统化的功能的时候,又不想花费太多时间去开发的时候,也可以使用RabbitMQ;因为RabbitMQ提供了一整套后台管理的体系及 HTTP API 供开发者兼容到自己的管理后台中,不需要再消耗多余的时间去开发功能;
注:这里的 轻量 指的是 无须将应用中的队列服务独立化,该队列服务是该应用独享的
- 此文档为3.0
- 2.0 LTS 2.0文档
RabbitMQ的webman客户端插件;
- 支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;
- 支持延迟队列(rabbitMQ须安装插件);
- 异步无阻塞消费、异步无阻塞生产、同步阻塞生产;
┌───────────┐
| Builder A | ──┐
└───────────┘ | | ┌───────────┐
| | | Channel 1 |
| | └───────────┘
┌───────────┐ └─> ┌──────────────────┐ | ┌───────────┐
| Builder A | ────> | Connections Pool | ── connection ──> | | Channel 2 |
└───────────┘ ┌─> └──────────────────┘ min ... MAX | └───────────┘
| <static> <context> | ┌───────────┐
| | | Channel 3 |
┌───────────┐ | | └───────────┘
| Builder C | ──┘ ...
└───────────┘ channel-max
Builder:队列抽象结构BuilderConfig: 队列配置结构
Connection:抽象的连接对象- 连接池为静态,不会因为
Builder的释放而释放 - 可使用
ConnectionsManagement进行管理,也可使用Builder的静态方法进行管理 min_connections: 最小连接数max_connections: 最大连接数idel_timeout: 空闲回收时间 [s]wait_timeout: 等待连接超时时间 [s]
- 连接池为静态,不会因为
Channel:抽象的通道对象Consumer消费者与Producer生产者相互独立,但共用一个Channel池reuse:是否复用Channelwait_min:当Channel达到channel-max被限制创建时,最小的随机等待时间 [ms]wait_max:当Channel达到channel-max被限制创建时,最大的随机等待时间 [ms]
- php >= 8.1
- webman-framework >= 2.0
- rabbitmq-server >= 3.10
composer require workbunny/webman-rabbitmq
<?php declare(strict_types=1);
return [
'enable' => true,
// 日志 LoggerInterface | LoggerInterface::class
'logger' => null,
// 连接 ConnectionInterface | ConnectionInterface::class
'connection' => \Workbunny\WebmanRabbitMQ\Connections\Connection::class
];<?php declare(strict_types=1);
return [
'connections' => [
'rabbitmq' => [
'host' => 'rabbitmq',
'vhost' => '/',
'port' => 5672,
'username' => 'guest',
'password' => 'guest',
'mechanism' => 'AMQPLAIN',
'timeout' => 10,
// 重启间隔
'restart_interval' => 0,
// 心跳间隔
'heartbeat' => 50,
'lazy_connect' => false,
// 消费者
'consumer' => [
'reuse' => true, // 复用
'wait_min' => 10, // 最小间隔
'wait_max' => 90, // 最大间隔
],
// 生产者
'producer' => [
'reuse' => true,
'wait_min' => 10,
'wait_max' => 90,
],
// 连接池
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'idle_timeout' => 60,
'wait_timeout' => 10
],
// 心跳回调 callable
'heartbeat_callback' => null,
]
]
];- 构建:
php webman workbunny:rabbitmq-builder -h - 移除/关闭:
php webman workbunny:rabbitmq-remove -h - 列表:
php webman workbunny:rabbitmq-list -h
延迟队列需要为 rabbitMQ 安装 rabbitmq_delayed_message_exchange 插件
- 进入 rabbitMQ 的 plugins 目录下执行命令下载插件(以rabbitMQ 3.10.2举例):
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez
- 执行安装命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange - 生产
注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常
publish(new TestBuilder(), 'abc', headers: [ 'x-delay' => 10000, # 延迟10秒 ]); # return bool
注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQPublishException 异常
-
快捷发送
use function Workbunny\WebmanRabbitMQ\publish; use process\workbunny\rabbitmq\TestBuilder; publish(new TestBuilder(), 'abc'); # return bool
-
原生发送,需要手动指定
exchange等use function Workbunny\WebmanRabbitMQ\publish; use process\workbunny\rabbitmq\TestBuilder; new TestBuilder()->connection()->getProducer(true)->publish();
- 创建自定义
Connection需继承实现ConnectionInterface; config/plugin/workbunny/webman-rabbitmq/app.php修改connection_class配置项为自定义Connection类;
- 创建自定义
Builder需继承实现AbstractBuilder;onWorkerStart消费进程启动时会触发,一般用于实现基础消费逻辑;onWorkerStop消费进程结束时会触发,一般用于回收资源;onWorkerReload消费进程重载,一般可置空;classContent用于配合命令行自动生成BuilderClass;
- 框架
bootstrap中加载注册代码- 使用
AbstractBuilder::registerMode()注册模式,便可使用Command命令行自定义--mode={your mode}
- 使用
插件默认使用workerman/rabbitmq的CorountieClient,自定义Client在插件使用范围内需配合自定义Connection使用
通常在需要直接调用原生RabbitMQ-Client并拓展其功能时才需要用到自定义开发
- 创建自定义
Client需继承实现AbstractClient;
