讲解基础环境
mac os
php 8.3
php-rdkafka
laravel10
安装扩展
方式 1
安装依赖 librdkafka
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka # git clone 后的文件夹
./configure && make && sudo make install # 需要先执行 ./configure 生成 Makefile.config
安装 php 扩展
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
/opt/homebrew/Cellar/php/8.3.0/bin/phpize # 找到需要安装扩展的php安装目录下的 phpize
./configure --with-php-config=/opt/homebrew/Cellar/php/8.3.0/bin/php-config # 找到需要安装扩展的php安装目录下的 php-config
make && make install
在 php.ini 中加入配置
extension = rdkafka.so
方式 2
安装依赖
brew install librdkafka # mac os
apt install librdkafka-dev # ubuntu
yum install librdkafka-devel # centos
安装 php 扩展
pecl install rdkafka
如果执行 pecl 时提示需要输入 librdkafka 的安装路径,请填写本机上 librdkafka 的安装目录,例如 /opt/homebrew/Cellar/librdkafka/2.3.0
安装 Kafka 环境 (docker-compose)
保存到 kafka.yml
# Local development stack: one ZooKeeper node, one Kafka broker and the
# kafka-manager web UI. Indentation is significant in YAML; every service
# must be nested under `services:` exactly as shown.
version: '3'
name: kafka-group
services:
  # ZooKeeper instance the broker registers with (see KAFKA_ZOOKEEPER_CONNECT).
  zookeeper-test:
    image: zookeeper
    ports:
      - "2181:2181"
    # volumes:
    #   - zookeeper_vol:/data
    #   - zookeeper_vol:/datalog
    #   - zookeeper_vol:/logs
    container_name: zookeeper-test

  # Kafka broker, published on host port 9092.
  kafka-test:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      # Host name the broker advertises to clients; "localhost" matches
      # clients running on the Docker host itself.
      KAFKA_ADVERTISED_HOST_NAME: "localhost"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-test:2181"
      KAFKA_LOG_DIRS: "/kafka/logs"
    # volumes:
    #   - kafka_vol:/kafka
    depends_on:
      - zookeeper-test
    container_name: kafka-test

  # Web UI; container port 9000 is published on host port 9010.
  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    restart: always
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
      - "9010:9000"
    links:
      - kafka-test
    external_links:
      - zookeeper-test
    depends_on:
      - zookeeper-test
    environment:
      ZK_HOSTS: zookeeper-test:2181
      TZ: CST-8
创建命令
docker-compose -p kafka -f kafka.yml up --no-start
封装工具
位置 ./app/Logic/Kafka/KafkaClient.php(对应命名空间 App\Logic\Kafka)
<?php
namespace App\Logic\Kafka;
use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\Producer;
use Exception;
/**
 * Small factory around php-rdkafka: builds either a Producer or a low-level
 * Consumer handle, registers the broker list on it, and hands out
 * KafkaTopic wrappers bound to that handle.
 */
class KafkaClient
{
    public const ProducerMode = 'producer';
    public const ConsumerMode = 'consumer';
    public const TOPIC_TEST = 'test';

    /** Underlying rdkafka handle; its concrete type depends on the mode. */
    private Producer|Consumer $rk;

    /** @var string[] $brokerList Kafka broker addresses ("host:port") to connect to */
    private array $brokerList = [
        '127.0.0.1:9092',
    ];

    /**
     * @param string   $mode       Either self::ProducerMode or self::ConsumerMode
     * @param bool     $debug      When true, sets log_level to LOG_DEBUG and debug to "all";
     *                             otherwise log_level is LOG_ERR
     * @param string[] $brokerList Optional broker addresses; when empty the built-in
     *                             default list is used
     * @throws Exception When $mode is unknown or no broker could be registered
     */
    public function __construct(string $mode, bool $debug = false, array $brokerList = [])
    {
        if ($brokerList !== []) {
            $this->brokerList = $brokerList;
        }

        // match arms are evaluated lazily, so an unknown mode fails fast
        // before any rdkafka object is created.
        $this->rk = match ($mode) {
            self::ProducerMode => new Producer(self::buildConf($debug)),
            self::ConsumerMode => new Consumer(self::buildConf($debug)),
            default => throw new Exception('Not find mode'),
        };

        // addBrokers() returns the number of brokers it accepted.
        if ($this->rk->addBrokers(implode(',', $this->brokerList)) === 0) {
            throw new Exception('No valid Kafka broker address was provided');
        }
    }

    /**
     * Wrap a topic of the underlying handle.
     *
     * @param string $topicName Topic name
     * @return KafkaTopic
     */
    public function newTopic(string $topicName): KafkaTopic
    {
        return new KafkaTopic($this->rk, $topicName);
    }

    /** Build the rdkafka configuration for the requested verbosity. */
    private static function buildConf(bool $debug): Conf
    {
        $conf = new Conf();
        if ($debug) {
            $conf->set('log_level', (string) LOG_DEBUG);
            $conf->set('debug', 'all');
        } else {
            $conf->set('log_level', (string) LOG_ERR);
        }

        return $conf;
    }
}
位置 ./app/Logic/Kafka/KafkaTopic.php(对应命名空间 App\Logic\Kafka)
<?php
namespace App\Logic\Kafka;
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use RdKafka\Message;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
/**
 * Wrapper around one rdkafka topic handle that JSON-encodes outgoing
 * messages (send) and JSON-decodes incoming ones (recv).
 *
 * The topic handle is created from whatever client it is given, so send()
 * only works with a Producer and recv() only with a Consumer.
 */
class KafkaTopic
{
    /** Partition that recv() reads from. */
    private const CONSUME_PARTITION = 0;

    /** Timeout in milliseconds for a single consume() or flush() call. */
    private const POLL_TIMEOUT_MS = 1000;

    /** How many times send() retries flush() before giving up. */
    private const FLUSH_ATTEMPTS = 3;

    private Producer|Consumer $rk;
    private ProducerTopic|ConsumerTopic $rkTopic;

    /**
     * @param Producer|Consumer $rk   Client handle the topic is created from
     * @param string            $name Topic name
     */
    public function __construct(Producer|Consumer $rk, string $name)
    {
        $this->rk = $rk;
        $this->rkTopic = $this->rk->newTopic($name);
    }

    /**
     * JSON-encode $data, produce it to an automatically chosen partition and
     * wait until the producer queue has been flushed.
     *
     * @param mixed $data Any JSON-encodable value
     * @throws \JsonException    When $data cannot be encoded
     * @throws \LogicException   When the topic was not created from a Producer
     * @throws \RuntimeException When the queue could not be flushed
     */
    public function send(mixed $data): void
    {
        // Encode first: without JSON_THROW_ON_ERROR a failure returns false
        // and an empty message would be produced silently.
        $payload = json_encode($data, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR);

        if (!$this->rk instanceof Producer || !$this->rkTopic instanceof ProducerTopic) {
            throw new \LogicException('send() requires a client created in producer mode');
        }

        $this->rkTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);

        // flush() reports a timeout through its return value; ignoring it
        // would hide messages that never left the local queue.
        $result = RD_KAFKA_RESP_ERR_NO_ERROR;
        for ($attempt = 0; $attempt < self::FLUSH_ATTEMPTS; $attempt++) {
            $result = $this->rk->flush(self::POLL_TIMEOUT_MS);
            if ($result === RD_KAFKA_RESP_ERR_NO_ERROR) {
                return;
            }
        }

        throw new \RuntimeException(
            sprintf('Unable to flush Kafka producer queue: %s', rd_kafka_err2str($result))
        );
    }

    /**
     * Consume partition 0 forever, starting at the current end of the log,
     * and pass every decoded message to $callback.
     *
     * This method never returns normally.
     *
     * @param callable $callback Receives the JSON-decoded payload (arrays for objects)
     * @throws \LogicException When the topic was not created from a Consumer
     */
    public function recv(callable $callback): void
    {
        if (!$this->rkTopic instanceof ConsumerTopic) {
            throw new \LogicException('recv() requires a client created in consumer mode');
        }

        // RD_KAFKA_OFFSET_END: only messages published from now on.
        $this->rkTopic->consumeStart(self::CONSUME_PARTITION, RD_KAFKA_OFFSET_END);

        while (true) {
            $message = $this->rkTopic->consume(self::CONSUME_PARTITION, self::POLL_TIMEOUT_MS);
            if ($message === null) {
                // Nothing arrived within the timeout.
                continue;
            }

            if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
                $callback($message->payload === null ? null : json_decode($message->payload, true));
                // Pass the offset of the message just handled: librdkafka's
                // offset store records "offset + 1" itself, so adding 1 here
                // would skip a message on resume.
                // NOTE(review): stored offsets are presumably only persisted
                // when the consumer config carries a group.id - confirm.
                $this->rkTopic->offsetStore($message->partition, $message->offset);
                continue;
            }

            // End-of-partition and timeout events are not failures.
            if (
                $message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF
                || $message->err === RD_KAFKA_RESP_ERR__TIMED_OUT
            ) {
                continue;
            }

            echo "Error: {$message->errstr()}\n";
        }
    }
}
使用演示代码
生产者
<?php
namespace App\Console\Commands\Kafka;
use App\Logic\Kafka\KafkaClient;
use Illuminate\Console\Command;
use Illuminate\Support\Str;
/**
 * Artisan command that publishes one demo message to the test topic.
 */
class KafkaSend extends Command
{
    /**
     * Name used to invoke the command from the CLI.
     *
     * @var string
     */
    protected $signature = 'app:kafka-send';

    /**
     * Text shown for this command in the Artisan command list.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a producer-mode client and send a single payload whose "aaa"
     * key holds a random 10-character string.
     */
    public function handle(): void
    {
        $client = new KafkaClient(KafkaClient::ProducerMode);
        $topic = $client->newTopic(KafkaClient::TOPIC_TEST);

        $message = ['aaa' => Str::random(10)];
        $topic->send($message);
    }
}
消费者
<?php
namespace App\Console\Commands\Kafka;
use App\Logic\Kafka\KafkaClient;
use Illuminate\Console\Command;
/**
 * Artisan command that consumes the test topic and dumps every message.
 */
class KafkaRecv extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'app:kafka-recv';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a consumer-mode client and dump each decoded message received
     * on the test topic. The call to recv() loops indefinitely, so this
     * command runs until the process is terminated.
     */
    public function handle(): void
    {
        $kafka = new KafkaClient(KafkaClient::ConsumerMode);
        // The closure does not use $this, so it is declared static.
        $kafka->newTopic(KafkaClient::TOPIC_TEST)->recv(static function (mixed $data): void {
            dump($data);
        });
    }
}
consumeStart 第 2 个参数(起始偏移量)的可选值 ⬇️
偏移量选项 | 作用 | 使用时机 |
---|---|---|
RD_KAFKA_OFFSET_BEGINNING | 从最早的消息开始消费 | 当你想要从 Kafka 主题的最早消息开始消费时。 |
RD_KAFKA_OFFSET_END | 从当前末尾开始消费 | 当你只需要消费最新发布的消息,不需要处理历史消息时。 |
RD_KAFKA_OFFSET_STORED | 从存储的偏移量处开始消费 | 当你希望消费者从上次提交的偏移量处开始消费,而不是从最早或最新的消息开始。 |
指定特定的偏移量值 | 从指定的偏移量处开始消费 | 当你想要从特定的偏移量值处开始消费,比如说重新消费某个特定的消息。 |
评论区