侧边栏壁纸
博主头像
ToDream博主等级

行动起来,活在当下

  • 累计撰写 13 篇文章
  • 累计创建 5 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

PHP 使用 RabbitMQ

X
X
2023-11-27 / 0 评论 / 1 点赞 / 43 阅读 / 15941 字

1. 为什么要使用 RabbitMQ 🤔

异步通信:允许系统中的不同部分通过消息传递进行通信,而无需实时直接交互。这种异步通信可以提高系统的可伸缩性和灵活性。

解耦系统组件:消息代理系统可以在系统内部或跨系统之间充当中介,从而实现系统组件的解耦。这意味着每个组件可以独立地工作,而无需直接了解其他组件的实现细节。

消息队列:RabbitMQ等消息代理系统利用消息队列的方式来存储和传递消息。这可以帮助处理系统中的大量请求或数据,确保消息在需要时得到处理,而不会因为某个组件繁忙而丢失。

异步任务处理:允许将耗时的任务从主应用程序中分离出来并异步处理。通过将这些任务发送到队列中,可以提高应用程序的性能和响应性。

消息持久化:消息代理系统通常支持消息持久化,即使在代理或消费者宕机后,消息也不会丢失。这有助于确保数据的安全性和可靠性。

负载均衡:使用消息队列可以实现负载均衡,多个消费者可以同时从队列中获取消息并处理,提高系统的吞吐量和效率。

处理失败和重试机制:消息代理系统通常具备处理失败消息和重试机制的能力,可以有效处理由于错误而导致的消息处理失败的情况。

2. 关于使用

简单封装
<?php

namespace App\Logic;

use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMqClient
{
    private AMQPStreamConnection $conn;
    private AbstractChannel|AMQPChannel $chan;
    private string $queueName;

    public function __construct()
    {
        $this->conn = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin');
        $this->chan = $this->conn->channel();
    }

    public function setQueue(string $queueName): self
    {
        $this->chan->queue_declare($this->queueName = $queueName, auto_delete: false);
        return $this;
    }

    /**
     * 发送
     *
     * @param mixed $data 需要发送的数据
     * @return void
     */
    public function send(mixed $data): void
    {
        $this->chan->basic_publish(new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE)), '', $this->queueName);
    }

    /**
     * 接收消息
     *
     * @param callable $callback
     * @return void
     */
    public function recv(callable $callback): void
    {
        $this->chan->basic_consume($this->queueName, '', false, false, false, false, $callback);
        while (count($this->chan->callbacks)) {
            $this->chan->wait();
        }
    }

    /**
     * 获取队列名称
     *
     * @return string
     */
    public function getQueueName(): string
    {
        return $this->queueName;
    }

    /**
     * 获取channel
     *
     * @return AbstractChannel|AMQPChannel
     */
    public function getChan(): AbstractChannel|AMQPChannel
    {
        return $this->chan;
    }

    /**
     * 关闭连接
     *
     * @return void
     * @throws null
     */
    public function close (): void
    {
        $this->conn->close();
        $this->chan->close();
    }
}

发送端
<?php

namespace App\Console\Commands\RabbitMQ;

use App\Logic\RabbitMqClient;
use Illuminate\Console\Command;

class RabbitMqSend extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbit-mq:send';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Execute the console command.
     */
    public function handle(): void
    {
        $mq = new RabbitMqClient();

        $mq->setQueue('test');
        $mq->send(['a' => 1]);
    }
}

接收端
<?php

namespace App\Console\Commands\RabbitMQ;

use App\Logic\RabbitMqClient;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMqRecv extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbit-mq:recv';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Execute the console command.
     */
    public function handle(): void
    {
        $mq = new RabbitMqClient();

        $mq->setQueue('test');

        while (true) {
            $mq->recv(function (AMQPMessage $data) use ($mq) {
                $this->queueHandler($mq, json_decode($data->getBody()), true);
              	// ack 确认消息已处理 避免进入重新发布 (没有ack的消息会在当前连接客户端断开重启后重新接收到此消息)
                $mq->getChan()->basic_ack($data->getDeliveryTag());
            });
        }
    }

    private function queueHandler(RabbitMqClient $mq, mixed $data): void
    {
        // 输出 decode 后接收到的消息
        dump($data);
    }

}

3. 函数讲解

基础依赖
composer require php-amqplib/php-amqplib
使用示例
use App\Logic\RabbitMqClient;

// 创建 RabbitMqClient 实例
$rabbitMqClient = new RabbitMqClient();

// 设置队列名称
$rabbitMqClient->setQueue('your_queue_name');

// 发送消息
$rabbitMqClient->send(['message' => 'Hello, RabbitMQ!']);

// 接收消息
$rabbitMqClient->recv(function ($msg) {
    echo "Received: ", $msg->body, "\n";
});

// 关闭连接
$rabbitMqClient->close();

方法

__construct()

初始化 RabbitMqClient 类并建立与 RabbitMQ 代理的连接。

setQueue(string $queueName): self

设置要发送/接收消息的队列名称。

send(mixed $data): void

向队列发送消息。

recv(callable $callback): void

从队列接收消息。使用回调函数处理接收到的消息。

getQueueName(): string

获取当前队列的名称。

getChan(): AbstractChannel|AMQPChannel

获取当前的 AMQP 通道。

close(): void

关闭与 RabbitMQ 的连接和通道。

注意事项
  • 请确保在使用 send()recv() 方法之后调用 close() 方法以关闭连接,避免资源泄露。
1

评论区