微服务之消息中间件(rabbitMQ)

常用MQ中间件产品 ActiveMQ RabbitMQ  Kafka 基于AMQP协议

1.Producer生成消息并发送给MQ  (同步,异步)

2.MQ接收消息并将消息数据持久化到消息存储

3.MQ向Producer返回消息的接收结果 (返回值,异常)

4.Consumer监听并消费MQ中的消息

5.Consumer获取到消息后执行业务处理

6.Consumer对已成功消费的消息想MQ进行ACK确认(确认后的消息将从MQ中删除)

安装服务端:

1.docker安装rabbitMQ

docker run -d --hostname my-rabbit  --name rabbit -15672:15672 -p 5672:5672  rabbitmq

2.安装 rabbitmq-c,C 与 RabbitMQ 通信需要依赖 rabbitmq-c 库(librabbitmq),具体请看

https://github.com/alanxz/rabbitmq-c

git clone git://github.com/alanxz/rabbitmq-c.git

cd rabbitmq-c

cmake -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq

cmake --build . --target install

注意路径指定,下载amqp扩展时需要指定路径,同时需要修拷贝安装路径下的lib64为lib,否则php扩展载入的时候找不到扩展

3.接下来就是安装 php amqp 的扩展了:

你可以源码编译安装

wget http://pecl.php.net/get/amqp-1.8.0.tgz

tar zxvf amqp-1.8.0.tgz

cd amqp-1.8.0

/usr/local/php/bin/phpize

./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq

make && make install

必须指定好 rabbitmq-c文件的安装路径才行

最后将'extension=amqp.so'加入 php.ini

生产者:

<?php
   try{
        $config =[
            'host' => '127.0.0.1',
            'vhost' => '/',
            'port' => 5672,
            'login' => 'guest',
            'password' => 'guest'
        ];
        $exName = 'tradeExchange';
        $routeKey='/trade';
        //实例化对象
        $mq = new AMQPConnection($config);
        //连接对象
        $mq->connect();
        //建立通道
        $ch = new AMQPChannel($mq);
        //建立交换机
        $ex = new AMQPExchange($ch);
        $ex->setName($exName);//通道名称
        $ex->setType(AMQP_EX_TYPE_DIRECT);//声明消息发送的模式
        $ex->declareExchange();//声明通道,创建通道
        //发布消息到交换机中,并且绑定好路由关系
        var_dump($ex->publish('我是一条任务消息',$routeKey));
   }catch (Exception $e){
         var_dump($e);
   
   }

消费者:


<?php
   try{
       $config =[
           'host' => '127.0.0.1',
           'vhost' => '/',
           'port' => 5672,
           'login' => 'guest',
           'password' => 'guest'
       ];
       $exName = 'tradeExchange';
       $routeKey='/trade';
       $queueName='trade';
       //实例化对象
       $mq = new AMQPConnection($config);
       //连接对象
       $mq->connect();
       //建立通道
       $ch = new AMQPChannel($mq);
       //建立交换机
       $ex = new AMQPExchange($ch);
       $ex->setName($exName);//通道名称
   
       //声明队列,并绑定交换机跟路由key
       $queue =new AMQPQueue($ch);//连接路由
       $queue->setName($queueName);
       $queue->declareQueue();
       //绑定监听
       $queue->bind($exName,$routeKey);
       //获取队列
       $queue->consume(function ($envelope,$queue){
           $data=$envelope->getBody();
           var_dump($data);
           //消费完成回应ack
           $queue->ack($envelope->getDeliveryTag());
       });//阻塞监听
   }catch (Exception $e){
       var_dump($e);
   
   }



MyAnswer博客
请先登录后发表评论
  • 最新评论
  • 总共0条评论