swoft+kafka实现消息队列处理消息发送

时间:2021-02-08作者:klpeng分类:IT综合浏览:389评论:0

kafka 使用的是腾讯云的ckafka
php 使用kafka 使用的是 rdkafka作为消费端
rdkafka 安装 (docker):

apt-get update
apt install librdkafka-dev
pecl install rdkafka
docker-php-ext-enable rdkafka

生产端:

/**
 * Class ParamLogic
 * @package App\Model\Logic
 */
class KafkaLogic
{
    public static function addMsg($param)
    {
        $rk = new \RdKafka\Producer();
//        $rk->setLogLevel(LOG_DEBUG); // 设置日志级别
        $rk->addBrokers(config('config.kafka.metadata_broker_list')); // 添加经纪人,就是ip地址

        $topic = $rk->newTopic(config('config.kafka.topic')[0]); // 新建主题
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($param)); // 生成并发送单个消息
        // Forget messages that are not fully sent yet
        $rk->poll(0);
        $timeout_ms = 1000*10;
        $result = $rk->flush($timeout_ms);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            return false;
        }
        return true;
    }
}

消费端:swoft 开了一个进程池

配置:

 'processPool' => [
        'class' => \Swoft\Process\ProcessPool::class,
        'workerNum' => 3
    ],

使用:

/**
 * Class WorkerProcess
 *
 * @since 2.0
 *
 * @Process(workerId={0,1,2})
 */
class MessagesWorkerProcess implements ProcessInterface
{
    /**
     * @param Pool $pool
     * @param int  $workerId
     */
    public function run(Pool $pool, int $workerId): void
    {
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', config('config.kafka.group_id'));
        $conf->set('metadata.broker.list', config('config.kafka.metadata_broker_list'));
        $conf->set('auto.offset.reset', 'smallest');

        $consumer = new \RdKafka\KafkaConsumer($conf);
        $consumer->subscribe(config('config.kafka.topic'));
        while (true) {
            // 1000是一秒
            $time = 120*1000;
            $message = $consumer->consume($time);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $data = $message->payload;
//                    print_r($data);
                    $res = MessageLogic::handle($data); //具体处理方法
                    if($res){
                        //消费掉
                        $consumer->commit($message);
                    }
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
//                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
//                    echo "Timed out\n";
                    break;
                default:
//                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }
}

打赏
文章版权声明:除非注明,否则均为彭超的博客原创文章,转载或复制请以超链接形式并注明出处。
相关推荐

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

猜你喜欢