An efficient Go task queue package, facilitating the seamless orchestration and execution of tasks. Alternative to machinery and Celery.
Below is a high-level overview of how Paota works:
The process or component responsible for generating tasks and putting them onto the task queue. This could be any part of your system that needs to perform background or asynchronous work.
Paota uses a task queue as a central mechanism for managing and distributing tasks. The task queue holds the tasks until they are picked up by consumers for processing. The queue serves as a buffer, decoupling the production of tasks from their consumption.
The broker is a service or component responsible for managing the task queue. It receives tasks from the publisher and makes them available for consumption by consumers. The broker ensures that tasks are delivered reliably and efficiently to the consumers.
The consumer is a process or component that pulls tasks from the task queue and executes them. Paota allows for multiple consumers to run concurrently, enabling parallel processing of tasks. Consumers can be distributed across multiple machines for horizontal scaling.
Each consumer runs a worker pool, which is a group of worker processes that execute tasks concurrently. The worker pool allows for efficient utilization of resources by processing multiple tasks simultaneously. The number of workers in a pool can be adjusted based on the available resources and workload.
Tasks are processed by the worker pool concurrently. Each task represents a unit of work that needs to be performed asynchronously. The results of the task execution can be used to update the state of the system or trigger additional actions.
Paota may use storage to persist task-related information, ensuring durability and fault tolerance. This can include storing task state, metadata, and other relevant information. The choice of storage can vary, and Paota supports different storage backends.
Paota is designed to provide high availability and support horizontal scaling. By distributing tasks across multiple consumers and worker pools, the system can handle increased workloads and provide fault tolerance. Additionally, the use of multiple brokers and storage solutions contributes to the overall resilience of the system.
In summary, Paota facilitates the asynchronous processing of tasks in a distributed environment, allowing for efficient utilization of resources, high availability, and horizontal scaling. It is a versatile tool for building scalable and responsive systems that can handle background and asynchronous workloads.
- Middleware for task
- Logging format
- Middleware Support for WorkerPool (requestId in log, authentication, logging)
- MongoBackendStorage
- Pre and Post Hook at Task Level
- Workflow support
- Chaining of Task support
- Coverage Check integration with sonarqube
- API for task management (create/delete/update/get/list)
- Release first version 1.0.0.
- Integrate third-party API for additional functionality.
- Conduct a security audit.
- UI/UX for better engagement.
- Multi Queue
- Ratelimit based consuming
- Consume over Webhook
- Webhook for task events
- CI/CD integration to provide the docker image over dockerhub
- Standard tasks based on ongoing challenges across industry
- SDK for PHP/NodeJS
- APM hook for newrelic
- Custom Job Signature
- Cloud based serverless task execution using api
- Initial project setup.
- AMQP Connection Pool
- Publish Task
- Logger Interface
- WorkerPool Supported
- Consumer with concurrency added
- Consumer task processor based on defined task added
- CircleCI Integration with generating mock and running unit test
- Unit test and code coverage
- Retry for task for amqp broker
- SAST check integration with sonarcloud and codeql
- Schedule Task
- Go (version 1.21 or higher)
- RabbitMQ (installation guide: https://www.rabbitmq.com/download.html)
The Config
struct holds all configuration options for Paota. It includes the following parameters:
- Broker: The message broker to be used. Currently, only "amqp" is supported.
- Store: The type of storage to be used (optional).
- TaskQueueName: The name of the task queue. Default value is "paota_tasks".
- StoreQueueName: The name of the storage queue (optional).
- AMQP: Configuration for the AMQP (RabbitMQ) connection. See AMQP Configuration for details.
Here's an example of how you can set up the main configuration using environment variables:
export BROKER=amqp
export STORE=mongodb
export QUEUE_NAME=paota_tasks
export STORE_QUEUE_NAME=your_store_queue_name
Before using the paota
package, it's essential to understand the AMQP (Advanced Message Queuing Protocol) configuration used by the package. The configuration is encapsulated in the AMQPConfig
struct, which includes the following parameters:
- Url: The connection URL for RabbitMQ. It is a required field.
- Exchange: The name of the RabbitMQ exchange to be used for task queuing. Default value is "paota_task_exchange".
- ExchangeType: The type of the RabbitMQ exchange. Default value is "direct".
- QueueDeclareArgs: Arguments used when declaring a queue. It is of type
QueueDeclareArgs
, which is a map of string to interface{}. - QueueBindingArgs: Arguments used when binding to the exchange. It is of type
QueueBindingArgs
, which is a map of string to interface{}. - BindingKey: The routing key used for binding to the exchange. Default value is "paota_task_binding_key".
- PrefetchCount: The number of messages to prefetch from the RabbitMQ server. It is a required field.
- AutoDelete: A boolean flag indicating whether the queue should be automatically deleted when there are no consumers. Default is false.
- DelayedQueue: The name of the delayed queue for handling delayed tasks. Default value is "paota_task_delayed_queue".
- ConnectionPoolSize: The size of the connection pool for RabbitMQ connections. Default value is 5.
- HeartBeatInterval: The interval (in seconds) at which heartbeats are sent to RabbitMQ. Default value is 10.
Here's an example of how you can configure the AMQPConfig
using environment variables:
export URL=amqp://guest:guest@localhost:55005/
export EXCHANGE=paota_task_exchange
export EXCHANGE_TYPE=direct
export BINDING_KEY=paota_task_binding_key
export PREFETCH_COUNT=5
export AUTO_DELETE=false
export DELAYED_QUEUE=paota_task_delayed_queue
export CONNECTION_POOL_SIZE=5
export HEARTBEAT_INTERVAL=10
For MongoDB integration, the MongoDBConfig
struct is used:
type MongoDBConfig struct {
Client *mongo.Client
Database string
}
The Signature
struct represents a single task invocation and has the following fields:
UUID
: Unique identifier for the task.Name
: Name of the task.Args
: List of arguments for the task, where eachArg
has the following structure:Name
: Name of the argument.Value
: Value of the argument.
RoutingKey
: Directs a task to the correct queue. If left empty, it defaults to the queue's binding key for direct exchanges, or the queue name for other exchange types.Priority
: Priority of the task.RetryCount
: Defines how many times a failed task will be retried (default is 0). Each retry will be spaced out, with the delay increasing after each failure.RetryTimeout
: Sets the wait time before retrying a task. By default, it increases the wait time after each failed attempt using the Fibonacci sequence.IgnoreWhenTaskNotRegistered
: Flag to indicate whether to ignore the task when not registered.
- User-Defined Tasks:
- function_name(arg *task.Signature) error
In order to enqueue jobs, you'll need to make a WorkerPool. publish jobs to broker.
example/producer/main.go
In order to process jobs, you'll need to make a WorkerPool. Add jobs to the pool, and start the pool.
example/consumer/main.go
mockery --inpackage --all
All these benchmarks are done in a notebook with these configuration:
Processor: 1.8 GHz Dual-Core Intel Core i5
Memory: 8 GB 1600 MHz DDR3
The jobs are almost no-op jobs: they simply return nil. Rabbitmq , Consumer and Publisher running on same server
We have acheived benchmarking for 50 publisher publishing request and 1 consumer worker consuming the request at speed of 7000 request per second (concurrency=10 and PrefetchCount=100). If you want to achieve more throughput concurrency can be increased to any extent.
This performance test was conducted to evaluate the throughput and processing capabilities of the system using Paota to consume data from RabbitMQ and insert it into MongoDB.
System Specs: MacBook with 16GB RAM
Local Hosted Services: RabbitMQ, MongoDB
Throughput Rate: 12k messages per second.
Total Records Processed: 10 Lakh data records.
Thank you for flying Paota!