项目作者: poolbang

项目描述 :
基于swoft的redis队列
高级语言: PHP
项目地址: git://github.com/poolbang/queue.git
创建时间: 2018-08-29T00:00:57Z
项目社区:https://github.com/poolbang/queue

开源协议:

下载


Introduction

基于swoft2.0的redis延时队列
详细内容参考有赞的延迟队列

欢迎使用queue 2.0

queue V2.0版本的,配置更简单了,使用更简洁。
去除msg扩展,直接用json代替

安装 composer require poolbang/queue

添加config/queue.php文件,然后内容为:

  1. reutrn [
  2. 'contrast' => 10, //每次对比的元素数量, 默认: 10
  3. 'interval' => 2, //空数据时等待时长, 默认: 1
  4. 'log' => false, //是否写入日志, 默认: true
  5. ];

开启DelayQueue进程,在app/bean.php中的process添加下面内容:

  1. 'process' => [
  2. 'queue' => bean(\Queue\Process\DelayQueueProcess::class),
  3. ],
job任务的类完成时执行的逻辑
  1. namespace App\Models\Logic;
  2. use Queue\JobHandler;
  3. use Swoft\Bean\Annotation\Mapping\Bean;
  4. /**
  5. * @Bean(scope=Bean::PROTOTYPE)
  6. * Class QueueLogic
  7. * @package App\Models\Logic
  8. */
  9. class QueueLogic extends JobHandler
  10. {
  11. protected function perform()
  12. {
  13. echo 'JobId: ' . $this->id . PHP_EOL;
  14. var_dump($this->args);
  15. }
  16. }

用法

  1. use Queue\DelayQueue;
  2. /**
  3. * 添加
  4. *
  5. * @param string $topic 一组相同类型Job的集合(队列)。
  6. * @param string $jobName job任务的类名,是延迟队列里的基本单元。与具体的Topic关联在一起。
  7. * @param integer $delay job任务延迟时间 传入相对于当前时间的延迟时间即可 例如延迟10分钟执行 传入 10*60
  8. * @param integer $ttr job任务超时时间,保证job至少被消费一次,如果时间内未删除Job方法,则会再次投入ready队列中
  9. * @param array $args 执行Job任务时传递的可选参数。
  10. * @param string $jobId 任务id可传入或默认生成
  11. */
  12. DelayQueue::enqueue('test',QueueLogic::class,5,10,['order_id'=>uniqid('queue_')]);
  13. //获取
  14. DelayQueue::get($jobId);
  15. //删除
  16. DelayQueue::remove($jobId);

[future]

用redis的发布订阅来处理消息

由于现在swoft的发布订阅出现问题,暂时不能用发布订阅来处理消息