Added asynq and a task client to the container to faciliate task queues.

This commit is contained in:
mikestefanello 2022-02-02 21:24:52 -05:00
parent a82ed9c6d0
commit c43f62a570
12 changed files with 392 additions and 34 deletions

138
README.md
View File

@ -3,8 +3,10 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/mikestefanello/pagoda)](https://goreportcard.com/report/github.com/mikestefanello/pagoda)
[![Test](https://github.com/mikestefanello/pagoda/actions/workflows/test.yml/badge.svg)](https://github.com/mikestefanello/pagoda/actions/workflows/test.yml)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Go Reference](https://pkg.go.dev/badge/github.com/mikestefanello/pagoda.svg)](https://pkg.go.dev/github.com/mikestefanello/pagoda)
[![GoT](https://img.shields.io/badge/Made%20with-Go-1f425f.svg)](https://go.dev)
<p align="center"><img alt="Logo" src="https://user-images.githubusercontent.com/552328/147838644-0efac538-a97e-4a46-86a0-41e3abdf9f20.png" height="200px"/></p>
## Table of Contents
@ -80,6 +82,11 @@
* [Get data](#get-data)
* [Flush data](#flush-data)
* [Flush tags](#flush-tags)
* [Tasks](#tasks)
* [Queues](#queues)
* [Scheduled tasks](#scheduled-tasks)
* [Workers](#workers)
* [Monitoring](#monitoring)
* [Static files](#static-files)
* [Cache control headers](#cache-control-headers)
* [Cache-buster](#cache-buster)
@ -180,6 +187,7 @@ The container is located at `services/container.go` and is meant to house all of
- Authentication
- Mail
- Template renderer
- Tasks
A new container can be created and initialized via `services.NewContainer()`. It can be later shutdown via `Shutdown()`.
@ -216,20 +224,21 @@ A helper function (`config.SwitchEnvironment`) is available to make switching th
```go
func TestMain(m *testing.M) {
// Set the environment to test
config.SwitchEnvironment(config.EnvTest)
// Set the environment to test
config.SwitchEnvironment(config.EnvTest)
// Start a new container
c = services.NewContainer()
defer func() {
if err := c.Shutdown(); err != nil {
c.Web.Logger.Fatal(err)
}
}()
// Start a new container
c = services.NewContainer()
// Run tests
exitVal := m.Run()
os.Exit(exitVal)
// Run tests
exitVal := m.Run()
// Shutdown the container
if err := c.Shutdown(); err != nil {
panic(err)
}
os.Exit(exitVal)
}
```
@ -980,6 +989,110 @@ err := c.Cache.
Execute(ctx)
```
## Tasks
Tasks are operations to be executed in the background, either in a queue, at a specfic time, after a given amount of time, or according to a periodic interval (like _cron_). Some examples of tasks could be long-running operations, bulk processing, cleanup, notifications, and so on.
Since we're already using [Redis](https://redis.io) as a _cache_, it's available to act as a message broker as well and handle the processing of queued tasks. [Asynq](https://github.com/hibiken/asynq) is the library chosen to interface with Redis and handle queueing tasks and processing them asynchronously with workers.
To make things even easier, a custom client (`TaskClient`) is provided as a _Service_ on the `Container` which exposes a simple interface with [asynq](https://github.com/hibiken/asynq).
For more detailed information about [asynq](https://github.com/hibiken/asynq) and it's usage, review the [wiki](https://github.com/hibiken/asynq/wiki).
### Queues
All tasks must be placed in to queues in order to be executed by [workers](#workers). You are not required to specify a queue when creating a task, as it will be placed in the default queue if one is not provided. [Asynq](https://github.com/hibiken/asynq) supports multiple queues which allows for functionality such as [prioritization](https://github.com/hibiken/asynq/wiki/Queue-Priority).
Creating a queued task is easy and at the minimum only requires the name of the task:
```go
err := c.Tasks.
New("my_task").
Save()
```
This will add a task to the _default_ queue with a task _name_ of `my_task`. The name is used to route the task to the correct [worker](#workers).
#### Options
Tasks can be created and queued with various chained options:
```go
err := c.Tasks.
New("my_task").
Payload(taskData).
Queue("critical").
MaxRetries(5).
Timeout(30 * time.Second).
Wait(5 * time.Second).
Retain(2 * time.Hour).
Save()
```
In this example, this task will be:
- Assigned a task name of `my_task`
- The task worker will be sent `taskData` as the payload
- Put in to the `critical` queue
- Be retried up to 5 times in the event of a failure
- Timeout after 30 seconds of execution
- Wait 5 seconds before execution starts
- Retain the task data in Redis for 2 hours after execution completes
### Scheduled tasks
Tasks can be scheduled to execute at a single point in the future or at a periodic interval. These tasks can also use the options highlighted in the previous section.
**To execute a task once at a specific time:**
```go
err := c.Tasks.
New("my_task").
At(time.Date(2022, time.November, 10, 23, 0, 0, 0, time.UTC)).
Save()
```
**To execute a periodic task using a cron schedule:**
```go
err := c.Tasks.
New("my_task").
Periodic("*/10 * * * *")
Save()
```
**To execute a periodic task using a simple syntax:**
```go
err := c.Tasks.
New("my_task").
Periodic("@every 10m")
Save()
```
#### Scheduler
A service needs to run in order to add periodic tasks to the queue at the specified intervals. When the application is started, this _scheduler_ service will also be started. In `main.go`, this is done with the following code:
```go
go func() {
if err := c.Tasks.StartScheduler(); err != nil {
c.Web.Logger.Fatalf("scheduler shutdown: %v", err)
}
}()
```
In the event of an application restart, periodic tasks must be re-registered with the _scheduler_ in order to continue being queued for execution.
### Workers
Workers are what executes the queued tasks. No workers are included so you will have to implement your own for each task you need to support. You have the option of listening to and executing tasks within this application, or creating a separate application to faciliate this.
The [asynq quickstarter](https://github.com/hibiken/asynq#quickstart) provides a clear example of how to go about implementing this by leveraging `asynq.NewServer` to listen for queued tasks and `asynq.NewServeMux` to route tasks to their workers much like an HTTP router does.
### Monitoring
[Asynq](https://github.com/hibiken/asynq) comes with two options to monitor your queues: 1) [Command-line tool](https://github.com/hibiken/asynq#command-line-tool) and 2) [Web UI](https://github.com/hibiken/asynqmon)
## Static files
Static files are currently configured in the router (`routes/router.go`) to be served from the `static` directory. If you wish to change the directory, alter the constant `config.StaticDir`. The URL prefix for static files is `/files` which is controlled via the `config.StaticPrefix` constant.
@ -1084,6 +1197,7 @@ Future work includes but is not limited to:
Thank you to all of the following amazing projects for making this possible.
- [alpinejs](https://github.com/alpinejs/alpine)
- [asynq](https://github.com/hibiken/asynq)
- [bulma](https://github.com/jgthms/bulma)
- [docker](https://www.docker.com/)
- [echo](https://github.com/labstack/echo)

View File

@ -30,14 +30,15 @@ func TestMain(m *testing.M) {
// Create a new container
c = services.NewContainer()
defer func() {
if err := c.Shutdown(); err != nil {
c.Web.Logger.Fatal(err)
}
}()
// Run tests
exitVal := m.Run()
// Shutdown the container
if err := c.Shutdown(); err != nil {
panic(err)
}
os.Exit(exitVal)
}

6
go.mod
View File

@ -43,6 +43,7 @@ require (
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/hashicorp/hcl/v2 v2.10.0 // indirect
github.com/hibiken/asynq v0.21.0 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
@ -69,6 +70,7 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cobra v1.3.0 // indirect
@ -81,9 +83,9 @@ require (
golang.org/x/mod v0.5.1 // indirect
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
golang.org/x/tools v0.1.9-0.20211216111533-8d383106f7e7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect

11
go.sum
View File

@ -246,6 +246,7 @@ github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl
github.com/go-playground/validator/v10 v10.9.0 h1:NgTtmN58D0m8+UuxtYmGztBJB7VnPgjj221I1QHci2A=
github.com/go-playground/validator/v10 v10.9.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos=
github.com/go-redis/redis/v8 v8.9.0/go.mod h1:ik7vb7+gm8Izylxu6kf6wG26/t2VljgCfSQ1DM4O1uU=
github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M=
github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
@ -343,6 +344,7 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@ -410,6 +412,8 @@ github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4=
github.com/hibiken/asynq v0.21.0 h1:uH9XogJhjq/S39E0/DEPWLZQ6hHJ73UiblZTe4RzHwA=
github.com/hibiken/asynq v0.21.0/go.mod h1:tyc63ojaW8SJ5SBm8mvI4DDONsguP5HE85EEl4Qr5Ig=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw=
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
@ -707,6 +711,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@ -833,6 +839,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
@ -1082,6 +1089,8 @@ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d h1:FjkYO/PPp4Wi0EAUOVLxePm7q
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 h1:XDXtA5hveEEV8JB2l7nhMTp3t3cHp9ZpwcdjqyEWLlo=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -1101,6 +1110,8 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 h1:Hir2P/De0WpUhtrKGGjvSb2YxUgyZ7EFOSLIcSSpiwE=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@ -51,6 +51,13 @@ func main() {
}
}()
// Start the scheduler service to queue periodic tasks
go func() {
if err := c.Tasks.StartScheduler(); err != nil {
c.Web.Logger.Fatalf("scheduler shutdown: %v", err)
}
}()
// Wait for interrupt signal to gracefully shutdown the server with a timeout of 10 seconds.
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)

View File

@ -21,11 +21,6 @@ func TestMain(m *testing.M) {
// Create a new container
c = services.NewContainer()
defer func() {
if err := c.Shutdown(); err != nil {
c.Web.Logger.Fatal(err)
}
}()
// Create a user
var err error
@ -35,5 +30,11 @@ func TestMain(m *testing.M) {
// Run tests
exitVal := m.Run()
// Shutdown the container
if err = c.Shutdown(); err != nil {
panic(err)
}
os.Exit(exitVal)
}

View File

@ -28,11 +28,6 @@ func TestMain(m *testing.M) {
// Start a new container
c = services.NewContainer()
defer func() {
if err := c.Shutdown(); err != nil {
c.Web.Logger.Fatal(err)
}
}()
// Start a test HTTP server
BuildRouter(c)
@ -40,7 +35,13 @@ func TestMain(m *testing.M) {
// Run tests
exitVal := m.Run()
// Shutdown the container and test server
if err := c.Shutdown(); err != nil {
panic(err)
}
srv.Close()
os.Exit(exitVal)
}

View File

@ -46,6 +46,9 @@ type Container struct {
// TemplateRenderer stores a service to easily render and cache templates
TemplateRenderer *TemplateRenderer
// Tasks stores the task client
Tasks *TaskClient
}
// NewContainer creates and initializes a new Container
@ -60,6 +63,7 @@ func NewContainer() *Container {
c.initAuth()
c.initTemplateRenderer()
c.initMail()
c.initTasks()
return c
}
@ -74,6 +78,9 @@ func (c *Container) Shutdown() error {
if err := c.Database.Close(); err != nil {
return err
}
if err := c.Tasks.Close(); err != nil {
return err
}
return nil
}
@ -182,3 +189,8 @@ func (c *Container) initMail() {
panic(fmt.Sprintf("failed to create mail client: %v", err))
}
}
// initTasks initializes the task client
func (c *Container) initTasks() {
c.Tasks = NewTaskClient(c.Config.Cache)
}

View File

@ -16,4 +16,5 @@ func TestNewContainer(t *testing.T) {
assert.NotNil(t, c.Mail)
assert.NotNil(t, c.Auth)
assert.NotNil(t, c.TemplateRenderer)
assert.NotNil(t, c.Tasks)
}

View File

@ -23,11 +23,6 @@ func TestMain(m *testing.M) {
// Create a new container
c = NewContainer()
defer func() {
if err := c.Shutdown(); err != nil {
c.Web.Logger.Fatal(err)
}
}()
// Create a web context
ctx, _ = tests.NewContext(c.Web, "/")
@ -41,5 +36,11 @@ func TestMain(m *testing.M) {
// Run tests
exitVal := m.Run()
// Shutdown the container
if err = c.Shutdown(); err != nil {
panic(err)
}
os.Exit(exitVal)
}

172
services/tasks.go Normal file
View File

@ -0,0 +1,172 @@
package services
import (
"encoding/json"
"fmt"
"time"
"github.com/hibiken/asynq"
"github.com/mikestefanello/pagoda/config"
)
type (
// TaskClient is that client that allows you to queue or schedule task execution
TaskClient struct {
// client stores the asynq client
client *asynq.Client
// scheduler stores the asynq scheduler
scheduler *asynq.Scheduler
}
// task handles task creation operations
task struct {
client *TaskClient
name string
payload interface{}
periodic *string
queue *string
maxRetries *int
timeout *time.Duration
deadline *time.Time
at *time.Time
wait *time.Duration
retain *time.Duration
}
)
// NewTaskClient creates a new task client
func NewTaskClient(cfg config.CacheConfig) *TaskClient {
conn := asynq.RedisClientOpt{
Addr: fmt.Sprintf("%s:%d", cfg.Hostname, cfg.Port),
Password: cfg.Password,
}
return &TaskClient{
client: asynq.NewClient(conn),
scheduler: asynq.NewScheduler(conn, nil),
}
}
// Close closes the connection to the task service
func (t *TaskClient) Close() error {
return t.client.Close()
}
// StartScheduler starts the scheduler service which adds scheduled tasks to the queue
// This must be running in order to queue tasks set for periodic execution
func (t *TaskClient) StartScheduler() error {
return t.scheduler.Run()
}
// New starts a task creation operation
func (t *TaskClient) New(name string) *task {
return &task{
client: t,
name: name,
}
}
// Payload sets the task payload data which will be sent to the task handler
func (t *task) Payload(payload interface{}) *task {
t.payload = payload
return t
}
// Periodic sets the task to execute periodically according to a given interval
// The interval can be either in cron form ("*/5 * * * *") or "@every 30s"
func (t *task) Periodic(interval string) *task {
t.periodic = &interval
return t
}
// Queue specifies the name of the queue to add the task to
// The default queue will be used if this is not set
func (t *task) Queue(queue string) *task {
t.queue = &queue
return t
}
// Timeout sets the task timeout, meaning the task must execute within a given duration
func (t *task) Timeout(timeout time.Duration) *task {
t.timeout = &timeout
return t
}
// Deadline sets the task execution deadline to a specific date and time
func (t *task) Deadline(deadline time.Time) *task {
t.deadline = &deadline
return t
}
// At sets the exact date and time the task should be executed
func (t *task) At(processAt time.Time) *task {
t.at = &processAt
return t
}
// Wait instructs the task to wait a given duration before it is executed
func (t *task) Wait(duration time.Duration) *task {
t.wait = &duration
return t
}
// Retain instructs the task service to retain the task data for a given duration after execution is complete
func (t *task) Retain(duration time.Duration) *task {
t.retain = &duration
return t
}
// MaxRetries sets the maximum amount of times to retry executing the task in the event of a failure
func (t *task) MaxRetries(retries int) *task {
t.maxRetries = &retries
return t
}
// Save saves the task so it can be executed
func (t *task) Save() error {
var err error
// Build the payload
var payload []byte
if t.payload != nil {
if payload, err = json.Marshal(t.payload); err != nil {
return err
}
}
// Build the task options
opts := make([]asynq.Option, 0)
if t.queue != nil {
opts = append(opts, asynq.Queue(*t.queue))
}
if t.maxRetries != nil {
opts = append(opts, asynq.MaxRetry(*t.maxRetries))
}
if t.timeout != nil {
opts = append(opts, asynq.Timeout(*t.timeout))
}
if t.deadline != nil {
opts = append(opts, asynq.Deadline(*t.deadline))
}
if t.wait != nil {
opts = append(opts, asynq.ProcessIn(*t.wait))
}
if t.retain != nil {
opts = append(opts, asynq.Retention(*t.retain))
}
if t.at != nil {
opts = append(opts, asynq.ProcessAt(*t.at))
}
// Build the task
task := asynq.NewTask(t.name, payload, opts...)
// Schedule, if needed
if t.periodic != nil {
_, err = t.client.scheduler.Register(*t.periodic, task)
} else {
_, err = t.client.client.Enqueue(task)
}
return err
}

35
services/tasks_test.go Normal file
View File

@ -0,0 +1,35 @@
package services
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestTaskClient_New(t *testing.T) {
now := time.Now()
tk := c.Tasks.
New("task1").
Payload("payload").
Queue("queue").
Periodic("@every 5s").
MaxRetries(5).
Timeout(5 * time.Second).
Deadline(now).
At(now).
Wait(6 * time.Second).
Retain(7 * time.Second)
assert.Equal(t, "task1", tk.name)
assert.Equal(t, "payload", tk.payload.(string))
assert.Equal(t, "queue", *tk.queue)
assert.Equal(t, "@every 5s", *tk.periodic)
assert.Equal(t, 5, *tk.maxRetries)
assert.Equal(t, 5*time.Second, *tk.timeout)
assert.Equal(t, now, *tk.deadline)
assert.Equal(t, now, *tk.at)
assert.Equal(t, 6*time.Second, *tk.wait)
assert.Equal(t, 7*time.Second, *tk.retain)
assert.NoError(t, tk.Save())
}