Redis+lua去重消息队列,生产,消费

架构图

MyAnswer博客

为什么使用Reids+lua脚步

优点:
  减少网络开销:不使用Lua的代码向Redis发送多次请求,而脚步只需要一次即可,减少网络传输;
  原子操作: Redis将整个脚步作为一个原子执行,无需担心并发,也就是无需事物;
  复用:脚本会永远保存Redis中,其他客户端可继续使用
缺点:
  代码中存在阻塞

Redis集群中使用 Hash Tag

原理:当一个key包含{}的时候,不会对整个key做hash,而是对{}包含的字符串做hash
Hash tag可以让不同的key拥有相同的hash值,从而分配在同一个曹里
这样针对不同key的批量操作(mget/mset),以及事物,Lua脚本等都可以支持;

实例代码:

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2019/1/9
 * Time: 23:37
 */

class TestUnique
{
    public  $redis;
    public function __construct()
    {
       $this->redis = new RedisCluster(null,['45.40.207.143:6391','45.40.207.143:6392','45.40.207.143:6393']);
    }

    /**
     *  set集合不含重复value
     *  list集合可以重复
     *  先使用set集合对 keys[2]去重
     *  判断是否重复,不重复则存入到list集合中,重复则不存入
     */
    const SET = '
      local re = redis.call("SADD",KEYS[1],KEYS[2]) --set集合,重复的value值
      if re ==1 then --添加成功
         return redis.call("LPUSH",KEYS[2],ARGV[1])  --list集合
      else
          return 0  --已经存在 
      end
    ';


    /**
     * $KYE
     * 先删除list集合中的$KEY
     * 在删除set集合中KEYS[1]的值$KEY
     */
    const PUT='
     local v =  redis.call("RPOP",KEYS[2]) --删除list集合中的key
     if not v then --删除的失败,或者删除的key不存在
       return 0
     end
       --删除成功,再删除set集合中的元素
       return redis.call("SREM",KEYS[1],KEYS[2])
    ';


    /**
     * 存放
     */
    public function set(){
     return $this->redis->eval(self::SET,['{product_1_2000}:set','{product_1_2000}:2',"product"],2);
    }

    /**
     * 删除
     */
    public function put(){
      return $this->redis->eval(self::PUT,['{product_1_2000}:set','{product_1_2000}:2'],2);
    }
}
$unique =new TestUnique();
var_dump($unique->set());
var_dump($unique->put());

下面是我们封装实例  TestUnique类

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2019/1/9
 * Time: 23:37
 */

class TestUnique
{
    public  $redis;
    public function __construct()
    {
       $this->redis = new RedisCluster(null,['45.40.207.143:6391','45.40.207.143:6392','45.40.207.143:6393']);
    }

    /**
     *  set集合不含重复value
     *  list集合可以重复
     *  先使用set集合对 keys[2]去重
     *  判断是否重复,不重复则存入到list集合中,重复则不存入
     */
    const SET = '
      local re = redis.call("SADD",KEYS[1],KEYS[2]) --set集合,重复的value值
      if re ==1 then --添加成功
         return redis.call("LPUSH",KEYS[2],ARGV[1])  --list集合
      else
          return 0  --已经存在 
      end
    ';


    /**
     * $KYE
     * 先删除list集合中的$KEY
     * 在删除set集合中KEYS[1]的值$KEY
     */
    const PUT='
     local v =  redis.call("RPOP",KEYS[2]) --删除list集合中的key
     if not v then --删除的失败,或者删除的key不存在
       return 0
     end
       --删除成功,再删除set集合中的元素
       redis.call("SREM",KEYS[1],KEYS[2])
       return v
    ';


    /**
     * 存放
     */
    public function set($setName,$qunqueName,$body){
     return $this->redis->eval(self::SET,[$setName,$qunqueName,$body],2);
    }

    /**
     * 删除
     */
    public function put($setName,$qunqueName){
      return $this->redis->eval(self::PUT,[$setName,$qunqueName],2);
    }
}

生产类

<?php
/**
 * 生产者,当缓存不存在,就请求到源服务器,产生队列
 */
require 'TestUnique.php';

try {

    $setName  = '{test}:set'; //集合的key
    $quniqueName = '{test}:2';//队列的key
    $infoKey="{test}:infoKey"; //获取商品详情的key
    $TestUnique = new TestUnique();
    //判断读取的数据是否在集合中
    $re = $TestUnique->redis->sIsMember($setName,$quniqueName);
    //如果存在就等待读取,如果读取到数据马上结束循环,
    if ($re !==false){
     RESULT:
      $i=0;
      $re ='';
      while ($i<5){
          echo $i.PHP_EOL;
          $info = $TestUnique->redis->get($infoKey);
          if ($info){
              $re = $info;
              break;
          }
          $i++;
          sleep(2);
      }
      if (!$re){
          throw new Exception('数据在队列中,等待更新');
      }
      var_dump($info.PHP_EOL);
    }else {
    //不存在,将数据添加到集和队列中
    $body =['info'=>'shangp','id'=>1111];
    echo $TestUnique->set($setName,$quniqueName,json_encode($body))?'添加队列成功':'添加失败'.PHP_EOL;
    //当第一个用户push完数据后,跳转到等待结果的代码
    goto RESULT;
    }
}catch(Exception $e){
    var_dump($e);

}

消费类

<?php
/**
 * 消费者,取出队列中的数据进行消费
 */
require 'TestUnique.php';



try{
    $setName ='{test}:set'; //集合的key
    $TestUnique = new TestUnique();
    while (true){
        echo '正在处理'.PHP_EOL;
         $re = $TestUnique->redis->sMembers($setName);
         if (!empty($re)){
             /**
              * 循环消费队列
              */
             foreach ($re as $v){
                   $res = json_decode($TestUnique->put($setName,$v),true);
                   //根据类型分类,查询数据库mysql,再写入到redis中
                   switch ($res['info']){
                       case 'shangp':
                         echo '消费队列'.PHP_EOL;
                         //数据从mysql中获取
                         $TestUnique->redis->set('{test}:infoKey',$res['id']);
                       break;
                   }
             }
         }
        sleep(2);
    }

}catch (Exception $e){
    var_dump($e->getMessage());
}

test.sh脚本,检查消费进程是否执行,每分钟检测一次,不存在就执行,存在就重复执行  > /dev/null 将输入的信息丢弃到黑洞  &后台运行

#!/bin/bash
num=`ps aux|grep testconsumber\.php|grep -v grep|wc -l`
if [ $num == 0 ]; then
    /usr/local/php/bin/php /docker/php/testconsumber.php > /dev/null &
fi

chmod a+x test.sh添加执行权限

MyAnswer博客

查看进程状态

MyAnswer博客

查看进程数量

MyAnswer博客

MyAnswer博客
请先登录后发表评论
  • 最新评论
  • 总共0条评论