侧边栏壁纸
博主头像
ToDream博主等级

行动起来,活在当下

  • 累计撰写 13 篇文章
  • 累计创建 5 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Kafka (PHP Client)

X
X
2023-12-14 / 0 评论 / 0 点赞 / 52 阅读 / 18825 字

讲解基础环境

mac os php 8.3 php-rdkafka laravel10

安装扩展

方式 1

安装依赖 librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka # git clone 后的文件夹
make && make install

安装 php 扩展

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
/opt/homebrew/Cellar/php/8.3.0/bin/phpize # 找到需要安装扩展的php安装目录下的 phpize
./configure --with-php-config=/opt/homebrew/Cellar/php/8.3.0/bin/php-config # 找到需要安装扩展的php安装目录下的 php-config
make && make install

php.ini 加入配置

extension = rdkafka.so

方式 2

安装依赖

brew install librdkafka # mac os
apt install librdkafka-dev # ubuntu
yum install librdkafka-devel # centos

安装 php 扩展

pecl install rdkafka

如果提示需要安装 librdkafka 的话在执行 pecl 的时候输入电脑上的安装目录示例 /opt/homebrew/Cellar/librdkafka/2.3.0

安装 Kafka 环境 (docker-compose)

保存到 kafka.yml

version: '3'

name: kafka-group

services:
  zookeeper-test:
    image: zookeeper
    ports:
      - "2181:2181"
    # volumes:
    #   - zookeeper_vol:/data
    #   - zookeeper_vol:/datalog
    #   - zookeeper_vol:/logs
    container_name: zookeeper-test

  kafka-test:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: "localhost"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-test:2181"
      KAFKA_LOG_DIRS: "/kafka/logs"
    # volumes:
    #   - kafka_vol:/kafka
    depends_on:
      - zookeeper-test
    container_name: kafka-test

  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
      - "9010:9000"
    links:
      - kafka-test
    external_links:
      - zookeeper-test
    depends_on:
      - zookeeper-test
    environment:
      ZK_HOSTS: zookeeper-test:2181
      TZ: CST-8

创建命令

docker-compose -p kafka -f kafka.yml up --no-start

封装工具

位置 ./Kafka/KafkaClient.php

<?php

namespace App\Logic\Kafka;

use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\Producer;
use Exception;

class KafkaClient
{
    private Producer|Consumer $rk;

    /** @var string[] $brokerList 需要连接的 kafka 节点地址 */
    private array $brokerList = [
        '127.0.0.1:9092'
    ];

    public const ProducerMode = 'producer';
    public const ConsumerMode = 'consumer';

    public const TOPIC_TEST = 'test';

    /**
     * @param string $mode 模式
     * @param bool $debug 是否开启调试模式
     * @throws null
     */
    public function __construct(string $mode, bool $debug = false)
    {
        $conf = new Conf();

        if ($debug) {
            $conf->set('log_level', (string) LOG_DEBUG);
            $conf->set('debug', 'all');
        } else {
            $conf->set('log_level', (string) LOG_ERR);
        }

        $this->rk = match ($mode) {
            self::ProducerMode => new Producer($conf),
            self::ConsumerMode => new Consumer($conf),
            default => throw new Exception('Not find mode'),
        };

        $this->rk->addBrokers(implode(',', $this->brokerList));
    }

    /**
     * @param string $topicName 主题名称
     * @return KafkaTopic
     */
    public function newTopic(string $topicName): KafkaTopic
    {
        return new KafkaTopic($this->rk, $topicName);
    }
}

位置 ./Kafka/KafkaTopic.php

<?php

namespace App\Logic\Kafka;

use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use RdKafka\Message;
use RdKafka\Producer;
use RdKafka\ProducerTopic;

class KafkaTopic
{
    private Producer|Consumer $rk;
    private ProducerTopic|ConsumerTopic $rkTopic;

    public function __construct(Producer|Consumer $rk, string $name)
    {
        $this->rk = $rk;
        $this->rkTopic = $this->rk->newTopic($name);
    }

    public function send(mixed $data): void
    {
        // 发送消息
        $this->rkTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($data, JSON_UNESCAPED_UNICODE));
        // 刷新发送队列
        $this->rk->flush(1000);
    }

    public function recv(callable $callback): void
    {
        // 订阅消息 consumeStart 参数 2 下方有讲解
        $this->rkTopic->consumeStart(0, -1);
        // 循环消费消息
        while (true) {
            $payload = $this->rkTopic->consume(0, 1000);
            if (empty($payload)) continue;
            /** @var Message $payload */
            if ($payload->err ?? false) {
                echo "Error: {$payload->errstr()}\n";
            } else {
                $callback(json_decode($payload->payload, true));
                // 保存下一个偏移量
                $this->rkTopic->offsetStore($payload->partition, $payload->offset+1);
            }
        }
    }
}

使用演示代码

生产者
<?php

namespace App\Console\Commands\Kafka;

use App\Logic\Kafka\KafkaClient;
use Illuminate\Console\Command;
use Illuminate\Support\Str;

class KafkaSend extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'app:kafka-send';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Execute the console command.
     */
    public function handle(): void
    {
        $kafka = new KafkaClient(KafkaClient::ProducerMode);
        $kafka->newTopic(KafkaClient::TOPIC_TEST)->send(['aaa' => Str::random(10)]);
    }
}
消费者
<?php

namespace App\Console\Commands\Kafka;

use App\Logic\Kafka\KafkaClient;
use Illuminate\Console\Command;

class KafkaRecv extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'app:kafka-recv';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $kafka = new KafkaClient(KafkaClient::ConsumerMode);
        $kafka->newTopic(KafkaClient::TOPIC_TEST)->recv(function (mixed $data) {
            dump($data);
        });
    }
}
consumeStart 参数 2 的参数选择 ⬇️
偏移量选项作用使用时机
RD_KAFKA_OFFSET_BEGINNING从最早的消息开始消费当你想要从 Kafka 主题的最早消息开始消费时。
RD_KAFKA_OFFSET_END从当前末尾开始消费当你只需要消费最新发布的消息,不需要处理历史消息时。
RD_KAFKA_OFFSET_STORED从存储的偏移量处开始消费当你希望消费者从上次提交的偏移量处开始消费,而不是从最早或最新的消息开始。
指定特定的偏移量值从指定的偏移量处开始消费当你想要从特定的偏移量值处开始消费,比如说重新消费某个特定的消息。
0

评论区