TaskRunner is an asynchronous task queue with a Redis Streams backend.

soroosh-tanzadeh/taskrunner

TaskRunner

TaskRunner is a robust and efficient Go library designed to leverage the power of Redis Streams for distributed task execution.

Key Features

  • Asynchronous Task Processing: TaskRunner schedules and executes tasks asynchronously, so your application stays responsive and performant under heavy load.
  • Scalable and Distributed: Designed to scale horizontally, this library can expand its capacity by adding more workers without impacting the existing infrastructure. It supports seamless distribution of tasks across a cluster of servers, optimizing resource usage and balancing the load.
  • Fault Tolerance: Includes features for automatic task retries and error handling, ensuring that system failures do not lead to task loss or inconsistencies.
  • Simple API: Offers a straightforward and intuitive API that makes it easy to integrate and use within your existing Go applications.
  • Real-time Monitoring and Logging: Integrates monitoring capabilities to track task status and performance metrics in real-time, alongside comprehensive logging for debugging and audit trails.
  • Task Scheduler: The Task Scheduler allows the scheduling and execution of delayed tasks using Redis Sorted Sets (ZSET) and Redis Streams. Tasks are scheduled, enqueued, and executed by workers in a distributed manner, ensuring scalability and reliability.

Task Queue

Simple Example

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/soroosh-tanzadeh/taskrunner/redisstream"
	"github.com/soroosh-tanzadeh/taskrunner/runner"
	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		DB:       5,
		PoolSize: 100,
	})
	wg := sync.WaitGroup{}

	queue := redisstream.NewRedisStreamMessageQueue(rdb, "example_tasks", "default", time.Second*30, true)
	taskRunner := runner.NewTaskRunner(runner.TaskRunnerConfig{
		BatchSize:         10,
		ConsumerGroup:     "example",
		ConsumersPrefix:   "default",
		NumWorkers:        10,
		ReplicationFactor: 1,
		LongQueueHook: func(s runner.Stats) {
			fmt.Printf("%v \n", s)
		},
		LongQueueThreshold: time.Second * 30,
	}, rdb, queue)

	taskRunner.RegisterTask(&runner.Task{
		Name:     "exampletask",
		MaxRetry: 10,
		Action: func(ctx context.Context, payload any) error {
			fmt.Printf("Hello from example task %s\n", payload)
			return nil
		},
		Unique: false,
	})

	wg.Add(1)
	go func() {
		defer wg.Done()
		taskRunner.Start(context.Background())
	}()

	// Dispatch 100 tasks, using the loop index as the payload.
	for i := 0; i < 100; i++ {
		taskRunner.Dispatch(context.Background(), "exampletask", strconv.Itoa(i))
	}

	// Block until the runner goroutine exits.
	wg.Wait()
}

TaskRunnerConfig Parameters

Below is a detailed description of each parameter in the TaskRunnerConfig struct.

| Parameter | Type | Description |
| --- | --- | --- |
| BatchSize | int | The number of tasks to be processed in a single batch by the consumer. |
| ConsumerGroup | string | The name of the consumer group used in Redis Streams to differentiate between different sets of consumers. |
| ConsumersPrefix | string | A prefix used for naming consumers within the group, aiding in identification and management. |
| NumWorkers | int | The number of consumer workers that will concurrently process tasks from the queue. |
| ReplicationFactor | int | The number of pod replicas configured. Let T_avg be the average execution time of a task, Q_len the length of the queue, and W_num the number of workers. The total execution time for the queue is estimated as (T_avg * Q_len) / (W_num * ReplicationFactor). |
| FailedTaskHandler | FailedTaskHandler | Called when a task has failed and cannot be retried any more. |
| LongQueueHook | LongQueueHook | A hook function (callback) that is triggered when the task queue exceeds the configured threshold. |
| LongQueueThreshold | time.Duration | The threshold, expressed as a duration, that triggers the LongQueueHook when exceeded. |

This configuration is critical for efficiently managing and operating asynchronous task processing systems, especially in environments where task distribution and fault tolerance are key.

Recommendation: For long-running tasks, use a small BatchSize together with a large NumWorkers value. This distributes tasks more evenly across workers, reducing bottlenecks and improving overall efficiency.

Task Parameters

Below is a table detailing the fields of the Task struct:

| Field | Type | Description |
| --- | --- | --- |
| Name | string | The name of the task. |
| MaxRetry | int | The maximum number of times the task is retried if it fails initially. |
| ReservationTimeout | time.Duration | The duration after which a worker must send a heartbeat if the task is still running. This prevents the task from being reassigned or reclaimed by another worker. |
| Action | TaskAction | The action associated with the task. |
| Unique | bool | If set to true, the task will not be dispatched if an identical task (by name and UniqueKey) is already in the queue and has not been completed. |
| UniqueKey | UniqueKeyFunc | Specifies a function to generate a unique key for the task. If Unique is true, tasks with the same unique key must not be in the queue simultaneously. |
| UniqueFor | int64 | The time in seconds that must elapse since a similarly unique task was last enqueued before a new task with the same name and unique key can be dispatched. |

Task Scheduler

The Task Scheduler allows the scheduling and execution of delayed tasks using Redis Sorted Sets (ZSET) and Redis Streams.

Note: The scheduler cycle is 5 seconds.

How it works

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/soroosh-tanzadeh/taskrunner/redisstream"
	"github.com/soroosh-tanzadeh/taskrunner/runner"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		DB:       5,
		PoolSize: 100,
		Password: "123456",
	})
	wg := sync.WaitGroup{}

	queue := redisstream.NewRedisStreamMessageQueue(rdb, "example_tasks", "default", time.Second*30, true, false)
	taskRunner := runner.NewTaskRunner(runner.TaskRunnerConfig{
		BatchSize:         10,
		ConsumerGroup:     "example",
		ConsumersPrefix:   "default",
		NumWorkers:        10,
		ReplicationFactor: 1,
		LongQueueHook: func(s runner.Stats) {
			fmt.Printf("%v \n", s)
		},
		LongQueueThreshold: time.Second * 30,
	}, rdb, queue)

	taskRunner.RegisterTask(&runner.Task{
		Name:     "exampletask",
		MaxRetry: 10,
		Action: func(ctx context.Context, payload any) error {
			fmt.Printf("Hello from example task: %s\n", payload)
			return nil
		},
		Unique: false,
	})

	ctx, cancel := context.WithCancel(context.Background())

	wg.Add(1)
	go func() {
		defer wg.Done()
		taskRunner.Start(ctx)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		taskRunner.StartDelayedSchedule(ctx, 1000)
	}()

	taskRunner.DispatchDelayed(context.Background(), "exampletask", "I'm delayed 1", time.Second*5)
	taskRunner.DispatchDelayed(context.Background(), "exampletask", "I'm delayed 2", time.Second*10)

	// Give both delayed tasks time to run before shutting down.
	time.Sleep(15 * time.Second)

	cancel()

	wg.Wait()
}