项目作者: mcmathja

项目描述 :
Efficient and reliable background processing for Go
高级语言: Go
项目地址: git://github.com/mcmathja/curlyq.git
创建时间: 2020-01-23T21:19:21Z
项目社区:https://github.com/mcmathja/curlyq

开源协议:MIT License

下载


CurlyQ

GoDoc
Build Status
GoCover
Go Report Card
License

CurlyQ provides a simple, easy-to-use interface for performing background processing in Go. It supports scheduled jobs, job deduplication, and configurable concurrent execution out of the box.

Quickstart

  1. package main
  2. import (
  3. "context"
  4. "log"
  5. cq "github.com/mcmathja/curlyq"
  6. )
  7. func main() {
  8. // Create a new producer
  9. producer := cq.NewProducer(&cq.ProducerOpts{
  10. Address: "localhost:6379",
  11. Queue: "testq",
  12. })
  13. // Use the producer to push a job to the queue
  14. producer.Perform(cq.Job{
  15. Data: []byte("Some data!"),
  16. })
  17. // Create a new consumer
  18. consumer := cq.NewConsumer(&cq.ConsumerOpts{
  19. Address: "localhost:6379",
  20. Queue: "testq",
  21. })
  22. // Consume jobs from the queue with the consumer
  23. consumer.Consume(func(ctx context.Context, job cq.Job) error {
  24. log.Println(string(job.Data))
  25. return nil
  26. })
  27. }

The Basics

CurlyQ exposes three key types: Jobs, Producers, and Consumers.

Jobs

A Job wraps your data. In most cases, that’s all you’ll ever need to know about it:

  1. job := cq.Job{
  2. Data: []byte("Some data."),
  3. }

Every Job also exposes an ID field that uniquely identifies it among all jobs in the queue, and an Attempt field representing how many times it has been attempted so far.

Producers

A Producer pushes jobs on to the queue. Create one by providing it with the address of your Redis instance and a queue name:

  1. producer := cq.NewProducer(&cq.ProducerOpts{
  2. Address: "my.redis.addr:6379",
  3. Queue: "queue_name",
  4. })

You can also provide an existing go-redis instance if you would like to configure the queue to run on a more advanced Redis configuration or set up your own retry and timeout logic for network calls:

  1. import "github.com/go-redis/redis/v7"
  2. client := redis.NewClient(&redis.Options{
  3. Password: "p@55vvoRd",
  4. DB: 3,
  5. MaxRetries: 2,
  6. })
  7. producer := cq.NewProducer(&cq.ProducerOpts{
  8. Client: client,
  9. Queue: "queue_name",
  10. })

Running producer.Perform(job) will add a job to the queue to be run asynchronously. You can also schedule a job to be enqueued at a particular time by running producer.PerformAt(time, job), or after a certain wait period by running producer.PerformAfter(duration, job). All of the Perform methods return the ID assigned to the job and an error if one occurred.

You can deduplicate jobs by pre-assigning them IDs:

  1. job := cq.Job{
  2. ID: "todays_job",
  3. }
  4. // Enqueue the job
  5. producer.PerformAfter(10 * time.Second, job)
  6. // Does nothing, because a job with the same ID is already on the queue
  7. producer.Perform(job)

Once a job has been acknowledged, its ID becomes available for reuse.

See the documentation for ProducerOpts for more details about available configuration options.

Consumers

A Consumer pulls jobs off the queue and executes them using a provided handler function. Create one with the same basic options as a Producer:

  1. consumer := cq.NewConsumer(&cq.ConsumerOpts{
  2. Queue: "queue_name",
  3. // With an address:
  4. Address: "my.redis.addr:6379",
  5. // With a preconfigured go-redis client:
  6. Client: redisClient,
  7. })

You start a consumer by running its Consume method with a handler function:

  1. consumer.Consume(func(ctx context.Context, job cq.Job) error {
  2. log.Println("Job %s has been processed!")
  3. return nil
  4. })

If the provided handler function returns nil, the job is considered to have been processed successfully and is removed from the queue. If the handler returns an error or panics, the job is considered to have failed and will be retried or killed based on how many times it has been attempted.

Consume will continue to process jobs until your application receives an interrupt signal or the consumer encounters a fatal error. Fatal errors only occur when the consumer is unable to communicate with Redis for an essential operation, such as updating the status of a job in flight.

See the documentation for ConsumerOpts for more details about available configuration options.