Skip to content

Gocelery is a task queue implementation for Go modules used to asynchronously execute work outside the HTTP request-response cycle. Celery is an implementation of the task queue concept.

License

Notifications You must be signed in to change notification settings

gopy-art/gocelery

Repository files navigation

GitHub go.mod Go version of a Go module GitHub go.mod Go version of a Go module GitHub go.mod Go version of a Go module GitHub go.mod Go version of a Go module

gocelery

Gocelery is a task queue implementation for Go modules used to asynchronously execute work outside the HTTP request-response cycle. Celery is an implementation of the task queue concept.

How it works?

this package has two side for does its work!
1 - controller
2 - worker

controller : it implements functions which work with broker and backend , and the responsebilty of them are to declare and insert data to the broker!

worker : it implements functions which work with broker and backend, and the responsebilty of them are to read from broker and set result to the backend!

Note : we have two concept in this package, broker and backend. broker is a system that we can share and read our data from that. backend is a system that we can set our results from package in that, for example the result of our workers!

Distributed Systems you can use

  1. RabbitMQ = RabbitMQ is an open source message agent software that implements the Advanced Message Queuing protocol.
    The files that impelement rabbitMQ celery in this package are :

    • amqp_backend.go
    • amqp_broker.go
    • amqp.go
  2. Redis = Redis is a very high-performance extensible key-value database management system written in C ANSI. It is part of the NoSQL movement and aims to provide the highest possible performance.
    The files that impelement redis celery in this package are :

    • redis_backend.go
    • redis_broker.go

How to use it?

if you want to use this package you can follow this steps :

  • clone this module
  • make gocelery folder in your project
  • copy and paste all of the files to the gocelery folder
  • run go mod tidy command in your terminal now enjoy!

Implement controller code example

you can write controller for redis and rabbitMQ, and as you read at the top, the controller will set and declare the messages and data to the broker and backend.
code example with RabbitMQ :

	// declare the url of the rabbitMQ
	rabbit_url := fmt.Sprintf("amqp://%s:%s@%s/", username, password, url)

	// make the backend and broker with rabbitMQ
	CeleryBackend := gocelery.NewAMQPCeleryBackend(rabbit_url)
	CeleryBroker := gocelery.NewAMQPCeleryBroker(rabbit_url, "test", true)

	// initialize celery client
	client, err := gocelery.NewCeleryClient(
		CeleryBroker,
		CeleryBackend,
		1, // number of worker for the client
	)
	if err != nil {
		log.Println("error in client , ", err)
	}
	////// Error handling

	// declare the task name for the message
	taskName := fmt.Sprintf("worker.%s", "test")

	for v := range 3 {
		_ = v
		// prepare the message for set to the queue
		msg := make(map[string]interface{})
		msg["a"] = rand.Intn(10)
		msg["b"] = rand.Intn(10)

		// send the message to the rabbitMQ
		_, err = client.DelayKwargs(taskName, msg)
		if err != nil {
			panic(err)
		}
		time.Sleep(1 * time.Second)
	}

	client.WaitForStopWorker()

code example with Redis :

    // create redis connection pool
	redisPool := &redis.Pool{
		MaxIdle:     3,                 // maximum number of idle connections in the pool
		MaxActive:   0,                 // maximum number of connections allocated by the pool at a given time
		IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration
		Dial: func() (redis.Conn, error) {
			c, err := redis.DialURL("redis://")
			if err != nil {
				return nil, err
			}
			return c, err
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}

	// initialize celery client
	client, err := gocelery.NewCeleryClient(
		gocelery.NewRedisBroker(redisPool),
		&gocelery.RedisCeleryBackend{Pool: redisPool},
		1, // number of workers
	)
	if err != nil {
		log.Println("error in client , ", err)
	}
	////// Error handling

	// declare the task name for the message
	taskName := fmt.Sprintf("worker.%s", "test")

	for v := range 10 {
		_ = v
		// prepare the message for set to the queue
		msg := make(map[string]interface{})
		msg["a"] = rand.Intn(10)
		msg["b"] = rand.Intn(10)

		// send the message to the rabbitMQ
		_, err = client.DelayKwargs(taskName, msg)
		if err != nil {
			panic(err)
		}
		time.Sleep(1 * time.Second)
	}

	client.WaitForStopWorker()

Implement worker code example

you can write worker for redis and rabbitMQ too, and as you know worker will read from redis or rabbitMQ and set the result to the redis or rabbitMQ.
code example with RabbitMQ :

	// exampleAddTask is integer addition task
	// with named arguments
	type ExampleAddTask struct {
		TaskID string
		a      int
		b      int
	}

	// this function is for reading the argument that has been passed to the message from 'Kwargs'
	func (a *ExampleAddTask) ParseKwargs(kwargs map[string]interface{}) error {
		kwargA, ok := kwargs["a"]
		if !ok {
			return fmt.Errorf("undefined kwarg a")
		}
		kwargAFloat, ok := kwargA.(float64)
		if !ok {
			return fmt.Errorf("malformed kwarg a")
		}
		a.a = int(kwargAFloat)
		kwargB, ok := kwargs["b"]
		if !ok {
			return fmt.Errorf("undefined kwarg b")
		}
		kwargBFloat, ok := kwargB.(float64)
		if !ok {
			return fmt.Errorf("malformed kwarg b")
		}
		a.b = int(kwargBFloat)
		return nil
	}

	func (a *ExampleAddTask) ParseId(id string) error {
		a.TaskID = id
		return nil
	}

	// The main function that will be execute
	func (a *ExampleAddTask) RunTask() (interface{}, error) {
		result := a.a + a.b
		fmt.Printf("Task with uuid %v has result %v \n", a.TaskID, result)
		return result, nil
	}

	// declare the url of the rabbitMQ
	rabbit_url := fmt.Sprintf("amqp://%s:%s@%s/", "guest", "guest", "localhost:5672")

	// make the backend and broker with rabbitMQ
	CeleryBackend := gocelery.NewAMQPCeleryBackend(rabbit_url)
	CeleryBroker := gocelery.NewAMQPCeleryBroker(rabbit_url, "test", true)

	// initialize celery client
	worker := gocelery.NewCeleryWorker(
		CeleryBroker,
		CeleryBackend,
		3, // number of worker for the client
	)

	ch := make(chan int)
	// register task
	worker.Register("worker.test", &ExampleAddTask{})

	// start workers (non-blocking call)
	worker.StartWorker()

	// wait for client request
	<-ch

	// stop workers gracefully (blocking call)
	worker.StopWorker()

code example with Redis :

	// exampleAddTask is integer addition task
	// with named arguments
	type ExampleAddTask struct {
		TaskID string
		a      int
		b      int
	}

	// this function is for reading the argument that has been passed to the message from 'Kwargs'
	func (a *ExampleAddTask) ParseKwargs(kwargs map[string]interface{}) error {
		kwargA, ok := kwargs["a"]
		if !ok {
			return fmt.Errorf("undefined kwarg a")
		}
		kwargAFloat, ok := kwargA.(float64)
		if !ok {
			return fmt.Errorf("malformed kwarg a")
		}
		a.a = int(kwargAFloat)
		kwargB, ok := kwargs["b"]
		if !ok {
			return fmt.Errorf("undefined kwarg b")
		}
		kwargBFloat, ok := kwargB.(float64)
		if !ok {
			return fmt.Errorf("malformed kwarg b")
		}
		a.b = int(kwargBFloat)
		return nil
	}

	func (a *ExampleAddTask) ParseId(id string) error {
		a.TaskID = id
		return nil
	}

	// The main function that will be execute
	func (a *ExampleAddTask) RunTask() (interface{}, error) {
		result := a.a + a.b
		fmt.Printf("Task with uuid %v has result %v \n", a.TaskID, result)
		return result, nil
	}

	// create redis connection pool
	redisPool := &redis.Pool{
		MaxIdle:     3,                 // maximum number of idle connections in the pool
		MaxActive:   0,                 // maximum number of connections allocated by the pool at a given time
		IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration
		Dial: func() (redis.Conn, error) {
			c, err := redis.DialURL("redis://")
			if err != nil {
				return nil, err
			}
			return c, err
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}

	// initialize celery client
	worker := gocelery.NewCeleryWorker(
		gocelery.NewRedisBroker(redisPool),
		&gocelery.RedisCeleryBackend{Pool: redisPool},
		6, // number of workers
	)

	////// Error handling

	ch := make(chan int)
	// register task
	worker.Register("worker.test", &ExampleAddTask{})

	// start workers (non-blocking call)
	worker.StartWorker()

	// wait for client request
	<-ch

	// stop workers gracefully (blocking call)
	worker.StopWorker()

NOTE : you can also use both distributed systems (redis, rabbitMQ) at the same time. for example you can use rabbitMQ for broker and redis for backend!

Structure of messages in broker

the message type in broker is json and its in this structure :

	// TaskMessage is celery-compatible message
	type TaskMessage struct {
		ID      string                 `json:"id"`
		Task    string                 `json:"task"`
		Args    []interface{}          `json:"args"`
		Kwargs  map[string]interface{} `json:"kwargs"`
		Retries int                    `json:"retries"`
		ETA     *string                `json:"eta"`
		Expires *time.Time             `json:"expires"`
	}

Note : Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

I hope you can enjoy using this package!

About

Gocelery is a task queue implementation for Go modules used to asynchronously execute work outside the HTTP request-response cycle. Celery is an implementation of the task queue concept.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages