项目作者: egret135

项目描述 :
基于php实现kafka和redis的队列
高级语言: PHP
项目地址: git://github.com/egret135/queue.git
创建时间: 2020-04-08T01:09:23Z
项目社区:https://github.com/egret135/queue

开源协议:

下载


queue

PHP的一个队列包,目前已有kafka和redis两种队列驱动,实现方式是用swoole的进程池 + symfony的console + enqueue的队列包

kafka队列使用需要先安装 rdkafka 拓展

redis队列的使用需要有一个redis的操作包,这里指定了predis

后续可能会继续拓展新的队列驱动,目前只存在这两种

版本要求

  1. "php": "^7.1.3",
  2. "ext-swoole": "^2.0 || ^3.0 || ^4.0",

安装教程

建议在composer.json里加上

  1. {
  2. "require": {
  3. xxxxxx,
  4. "egret/queue": "^1.0"
  5. },
  6. "config": {
  7. "bin-dir": "bin"
  8. }
  9. }

```shell script
composer update egret/queue

  1. 当然也可以直接composer require,但是这样可执行文件就不会放到跟composer.json同级的bin目录下,比较难找
  2. ```shell script
  3. composer require egret/queue

使用指引

创建一个简单的redis队列

  1. <?php
  2. namespace Egret\Queue\Test;
  3. use Egret\Queue\AbstractQueueCommand;
  4. use Monolog\Handler\RotatingFileHandler;
  5. use Monolog\Logger;
  6. use Egret\Queue\DingTalk;
  7. // 继承这个抽象类
  8. class TestRedisQueue extends AbstractQueueCommand
  9. {
  10. protected function configure()
  11. {
  12. // 调用父类的configure方法,必须要有
  13. parent::configure();
  14. // 设置队列名称,必须要有,设置队列描述(非必须)
  15. $this->setName('redis')->setDescription('测试redis队列');
  16. // 设置队列驱动,不设置默认redis,目前只支持redis/kafka
  17. $this->driver = 'redis';
  18. // 设置redis连接的配置,这里测试时用,相关的配置信息往下看
  19. $this->config = [
  20. 'host' => '127.0.0.1',
  21. 'port' => 6379,
  22. 'password' => '123456',
  23. 'database' => 10,
  24. 'predis_options' => [
  25. // 指定缓存的前缀
  26. 'prefix' => 'queue:test:'
  27. ]
  28. ];
  29. // 设置pid文件的存放路径,可以不设置,不设置不能用命令停止队列,这里配置的是我测试的路径,大家按自己的情况设置值
  30. $this->pidPath = CONSOLE_PATH . '/test/runtime';
  31. }
  32. // 设置Monolog的日志,设置之后会写入任务的相关日志,可不设置
  33. protected function getLogger()
  34. {
  35. $logger = new Logger('queue');
  36. $rotating = new RotatingFileHandler(CONSOLE_PATH . '/test/runtime/info.log', 30);
  37. $logger->pushHandler($rotating);
  38. return $logger;
  39. }
  40. // 配置钉钉机器人,任务异常时会报警到钉钉上,可不设置
  41. protected function getDingTalk()
  42. {
  43. $config = [
  44. 'token' => 'xxxx',
  45. 'secret' => 'xxxx',
  46. 'sign' => true
  47. ];
  48. return new DingTalk($config['token'], $config['sign'], $config['secret']);
  49. }
  50. }

启动队列命令

```shell script
./queue 队列别名 start topic名称(redis的键/kafka的topic)

Options:
-w, —workerNum=WORKERNUM 进程数量 [default: 1]
-r, —maxRetry=MAXRETRY 任务失败最大重试次数 [default: 3]
-t, —timeout=TIMEOUT 单次等待超时时间,单位毫秒 [default: 30000]
-c, —ding_trace_num=DING_TRACE_NUM 队列异常钉钉通知展示调试信息数,一般可以不用修改 [default: 5]
-m, —ding_notice_mobile=DING_NOTICE_MOBILE 队列异常钉钉通知@人,要用手机号,多个用,隔开,格式是13400000000,13500000000: [default: “”]
-d, —daemon 以守护进程运行

启动上面的队列,指定topic为test

./queue redis start test

  1. 查看队列
  2. ```shell script
  3. ./queue redis status
  4. Queue: redis
  5. PID file: /queue-redis.pid, PID: 0
  6. +-----------+-------+-------+------+--------+------------------------------+
  7. | USER | PID | RSS | STAT | START | COMMAND |
  8. +-----------+-------+-------+------+--------+------------------------------+
  9. | zbangtang | 11993 | 6720 | S+ | 3:28PM | php ./queue redis start test |
  10. | zbangtang | 11984 | 16056 | S+ | 3:28PM | php ./queue redis start test |
  11. +-----------+-------+-------+------+--------+------------------------------+

停止队列,需要有PID文件才可以

```shell script
./queue redis stop

  1. 上面就完成了一个消费者队列的创建
  2. 下面展示一下生产者,先创建一个工作类,所有工作类都必须继承AbstractJob
  3. ```php
  4. <?php
  5. namespace Egret\Queue\Test;
  6. use Egret\Queue\AbstractJob;
  7. class TestJob extends AbstractJob
  8. {
  9. protected $name;
  10. protected $flag;
  11. public function __construct($name, $flag = true)
  12. {
  13. $this->name = $name;
  14. $this->flag = $flag;
  15. }
  16. public function execute()
  17. {
  18. echo sprintf('queue %s is run' . PHP_EOL, $this->name);
  19. return $this->flag;
  20. }
  21. }

Redis生产者代码

  1. $job = new TestJob($this->getName(), false);
  2. (new RedisQueue($redisConf, $topic))->produce($job);

Kafka生产者代码

  1. $job = new TestJob($this->getName(), false);
  2. $kafkaConf = [
  3. 'global' => [
  4. 'group.id' => 'test-group',
  5. 'metadata.broker.list' => '127.0.0.1:9092',
  6. 'enable.auto.commit' => 'false',
  7. ],
  8. 'topic' => [
  9. 'auto.offset.reset' => 'latest',
  10. ],
  11. ];
  12. (new KafkaQueue($kafkaConf, $topic))->produce($job);

到这里就完成了队列的生产者和消费者的创建了,还差一个步骤就是把消费者加入到queue命令行中,用命令便可以启动和管理

此处采用文件加载的方式,使用的时候可以在composer.json同级的目录,或者同级的app或者application亦或者src目录下创建console.php,内容示例如下:

  1. <?php
  2. use Egret\Queue\Test\TestConsumer;
  3. use Egret\Queue\Test\TestCustomErrorQueue;
  4. use Egret\Queue\Test\TestDingTalkQueue;
  5. use Egret\Queue\Test\TestKafkaQueue;
  6. use Egret\Queue\Test\TestLoggerQueue;
  7. use Egret\Queue\Test\TestRedisQueue;
  8. return [
  9. TestDingTalkQueue::class,
  10. TestKafkaQueue::class,
  11. TestLoggerQueue::class,
  12. TestRedisQueue::class,
  13. TestConsumer::class,
  14. TestCustomErrorQueue::class,
  15. ];

必须把console.php存放到指定的几个地方之一,不然读取不到,文件存放位置示例:

```shell script
— 项目demo
  — app文件夹
    — 可在app文件夹下创建console.php
  — application文件夹
    — 可在application文件夹下创建console.php
  — src文件夹
    — 可在src文件夹下创建console.php
  — vendor
  — 可在这一层目录下创建console.php
  — composer.json
  — composer.lock

  1. 创建完后执行./queue便可以看到相关的队列了
  2. ```shell script
  3. ./queue
  4. ________
  5. \_____ \ __ __ ____ __ __ ____
  6. / / \ \| | \_/ __ \| | \_/ __ \
  7. / \_/. \ | /\ ___/| | /\ ___/
  8. \_____\ \_/____/ \___ >____/ \___ >
  9. \__> \/ \/
  10. v1.0.0
  11. Usage:
  12. command [options] [arguments]
  13. Options:
  14. -h, --help Display this help message
  15. -q, --quiet Do not output any message
  16. -V, --version Display this application version
  17. --ansi Force ANSI output
  18. --no-ansi Disable ANSI output
  19. -n, --no-interaction Do not ask any interactive question
  20. -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
  21. Available commands:
  22. consumer 队列生产者
  23. dingtalk 测试钉钉报警
  24. error 设置自定义的错误处理
  25. help Displays help for a command
  26. kafka 测试kafka队列
  27. list Lists commands
  28. logger 测试monolog日志
  29. redis 测试redis队列