Switched to backlite for task queues

Ported from
f54d9f8b37
Tony Grosinger 2024-07-27 11:52:05 -07:00
parent e3e37a6db8
commit 48dd3433a7
11 changed files with 91 additions and 362 deletions


@ -80,7 +80,7 @@
* [Flush tags](#flush-tags)
* [Tasks](#tasks)
* [Queues](#queues)
* [Runner](#runner)
* [Dispatcher](#dispatcher)
* [Cron](#cron)
* [Static files](#static-files)
* [Cache control headers](#cache-control-headers)
@ -966,77 +966,27 @@ As shown in the previous examples, cache tags were provided because they can be
Tasks are queued operations to be executed in the background, either immediately, at a specific time, or after a given amount of time has passed. Some examples of tasks could be long-running operations, bulk processing, cleanup, notifications, etc.
Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Goqite](https://github.com/maragudk/goqite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously.
Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Backlite](https://github.com/mikestefanello/backlite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously.
To make things even easier, a custom client (`TaskClient`) is provided as a _Service_ on the `Container` which exposes a simple interface with [goqite](https://github.com/maragudk/goqite) that supports type-safe tasks and queues.
To make things easy, the _Backlite_ client (`TaskClient`) is provided as a _Service_ on the `Container` which allows you to register queues and add tasks.
### Queues
A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`).
A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`). Also refer to the [Backlite](https://github.com/mikestefanello/backlite) documentation for reference and examples.
A queue starts by declaring a `Task` _type_, which is the object that gets placed in to a queue and eventually passed to a queue subscriber (a callback function to process the task). A `Task` must implement the `Name()` method which returns a unique name for the task. For example:
See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue processor callbacks have access to all of your app's dependencies.
### Dispatcher
The _task dispatcher_ manages the worker pool used for executing tasks in the background. It monitors incoming and scheduled tasks and sends them to the pool for execution by the queue's processor callback. No tasks are executed unless the dispatcher has been started. In `cmd/web/main.go`, the _task dispatcher_ is automatically started when the app starts via:
```go
type MyTask struct {
Text string
Num int
}
func (t MyTask) Name() string {
return "my_task"
}
c.Tasks.Start(ctx)
```
Then, create the queue for `MyTask` tasks:
The app [configuration](#configuration) contains values to configure the client and dispatcher, including how many goroutines to use, when to release stuck tasks back into the queue, and how often to clean up retained tasks in the database.
```go
q := services.NewQueue[MyTask](func(ctx context.Context, task MyTask) error {
// This is where you process the task
fmt.Printf("Processed %s task!\n", task.Text)
return nil
})
```
And finally, register the queue with the `TaskClient`:
```go
c.Tasks.Register(q)
```
See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue subscriber callbacks have access to all of your app's dependencies.
Now you can easily add a task to the queue using the `TaskClient`:
```go
task := MyTask{Text: "Hello world!", Num: 10}
err := c.Tasks.
New(task).
Save()
```
#### Options
Tasks can be created and queued with various chained options:
```go
err := c.Tasks.
New(task).
Wait(30 * time.Second). // Wait 30 seconds before passing the task to the subscriber
At(time.Date(...)). // Wait until a given date before passing the task to the subscriber
Tx(tx). // Include the queueing of this task in a database transaction
Save()
```
### Runner
The _task runner_ is what manages periodically polling the database for available queued tasks to process and passing them to the queue's subscriber callback. This must be started in order for this to happen. In `cmd/web/main.go`, the _task runner_ is started by using the `TaskClient`:
```go
go c.Tasks.StartRunner(ctx)
```
The app [configuration](#configuration) contains values to configure the runner including how often to poll the database for tasks, the maximum amount of retries for a given task, and the amount of tasks that can be processed concurrently.
When the app is shut down, the dispatcher is given 10 seconds to wait for any in-progress tasks to finish execution. This can be changed in `cmd/web/main.go`.
## Cron
@ -1172,12 +1122,12 @@ Future work includes but is not limited to:
Thank you to all of the following amazing projects for making this possible.
- [alpinejs](https://github.com/alpinejs/alpine)
- [backlite](https://github.com/mikestefanello/backlite)
- [bulma](https://github.com/jgthms/bulma)
- [echo](https://github.com/labstack/echo)
- [golang-migrate](https://github.com/golang-migrate/migrate)
- [go](https://go.dev/)
- [go-sqlite3](https://github.com/mattn/go-sqlite3)
- [goqite](https://github.com/maragudk/goqite)
- [goquery](https://github.com/PuerkitoBio/goquery)
- [htmx](https://github.com/bigskysoftware/htmx)
- [jwt](https://github.com/golang-jwt/jwt)


@ -9,6 +9,7 @@ import (
"net/http"
"os"
"os/signal"
"sync"
"time"
"git.grosinger.net/tgrosinger/saasitone/pkg/handlers"
@ -60,18 +61,31 @@ func main() {
tasks.Register(c)
// Start the task runner to execute queued tasks
ctx, cancel := context.WithCancel(context.Background())
go c.Tasks.StartRunner(ctx)
c.Tasks.Start(context.Background())
// Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds.
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
signal.Notify(quit, os.Kill)
<-quit
cancel()
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
// Shutdown both the task runner and web server
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := c.Web.Shutdown(ctx); err != nil {
log.Fatal(err)
}
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
c.Tasks.Stop(ctx)
}()
go func() {
defer wg.Done()
if err := c.Web.Shutdown(ctx); err != nil {
log.Fatal(err)
}
}()
wg.Wait()
}


@ -31,9 +31,9 @@ storage:
migrationsDir: db/migrations
tasks:
pollInterval: "1s"
maxRetries: 10
goroutines: 1
releaseAfter: "15m"
cleanupInterval: "1h"
mail:
hostname: "localhost"


@ -111,9 +111,9 @@ type (
// TasksConfig stores the tasks configuration
TasksConfig struct {
PollInterval time.Duration
MaxRetries int
Goroutines int
Goroutines int
ReleaseAfter time.Duration
CleanupInterval time.Duration
}
// MailConfig stores the mail configuration
@ -152,6 +152,7 @@ func GetConfig() (Config, error) {
}
usedConfigFilePath := viper.GetViper().ConfigFileUsed()
configFileDir := filepath.Dir(usedConfigFilePath)
if !filepath.IsAbs(c.Storage.DatabaseFile) {
c.Storage.DatabaseFile = filepath.Join(configFileDir, c.Storage.DatabaseFile)

go.mod

@ -1,8 +1,8 @@
module git.grosinger.net/tgrosinger/saasitone
go 1.22
go 1.22.4
toolchain go1.22.1
toolchain go1.22.5
require (
entgo.io/ent v0.13.1
@ -54,6 +54,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mikestefanello/backlite v0.1.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect

go.sum

@ -116,6 +116,8 @@ github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/maypok86/otter v1.2.1 h1:xyvMW+t0vE1sKt/++GTkznLitEl7D/msqXkAbLwiC1M=
github.com/maypok86/otter v1.2.1/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4=
github.com/mikestefanello/backlite v0.1.0 h1:bIiZJXPZB8V5PXWvDmkTepY015w3gJdeRrP3QrEV4Ls=
github.com/mikestefanello/backlite v0.1.0/go.mod h1:/vj8LPZWG/xqK/3uHaqOtu5JRLDEWqeyJKWTAlADTV0=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=


@ -7,6 +7,7 @@ import (
"github.com/a-h/templ"
"github.com/go-playground/validator/v10"
"github.com/labstack/echo/v4"
"github.com/mikestefanello/backlite"
"git.grosinger.net/tgrosinger/saasitone/pkg/form"
"git.grosinger.net/tgrosinger/saasitone/pkg/msg"
@ -25,7 +26,7 @@ const (
type (
Task struct {
tasks *services.TaskClient
tasks *backlite.Client
*services.TemplateRenderer
}
)
@ -75,9 +76,10 @@ func (h *Task) Submit(ctx echo.Context) error {
}
// Insert the task
err = h.tasks.New(tasks.ExampleTask{
Message: input.Message,
}).
err = h.tasks.
Add(tasks.ExampleTask{
Message: input.Message,
}).
Wait(time.Duration(input.Delay) * time.Second).
Save()
if err != nil {


@ -6,9 +6,11 @@ import (
"github.com/labstack/echo/v4"
_ "github.com/mattn/go-sqlite3"
"github.com/mikestefanello/backlite"
"git.grosinger.net/tgrosinger/saasitone/config"
"git.grosinger.net/tgrosinger/saasitone/pkg/funcmap"
"git.grosinger.net/tgrosinger/saasitone/pkg/log"
)
// Container contains all services used by the application and provides an easy way to handle dependency
@ -39,7 +41,7 @@ type Container struct {
TemplateRenderer *TemplateRenderer
// Tasks stores the task client
Tasks *TaskClient
Tasks *backlite.Client
}
// NewContainer creates and initializes a new Container
@ -139,10 +141,21 @@ func (c *Container) initMail() {
// initTasks initializes the task client
func (c *Container) initTasks() {
var err error
// You could use a separate database for tasks if you'd like, but using one
// makes transaction support easier
c.Tasks, err = NewTaskClient(c.Config.Tasks, c.DB.DB())
c.Tasks, err = backlite.NewClient(backlite.ClientConfig{
DB: c.DB.DB(),
Logger: log.Default(),
NumWorkers: c.Config.Tasks.Goroutines,
ReleaseAfter: c.Config.Tasks.ReleaseAfter,
CleanupInterval: c.Config.Tasks.CleanupInterval,
})
if err != nil {
panic(fmt.Sprintf("failed to create task client: %v", err))
}
if err = c.Tasks.Install(); err != nil {
panic(fmt.Sprintf("failed to install task schema: %v", err))
}
}


@ -1,205 +0,0 @@
package services
import (
"bytes"
"context"
"database/sql"
"encoding/gob"
"strings"
"sync"
"time"
"github.com/maragudk/goqite"
"github.com/maragudk/goqite/jobs"
"git.grosinger.net/tgrosinger/saasitone/config"
"git.grosinger.net/tgrosinger/saasitone/pkg/log"
)
type (
// TaskClient is the client that allows you to queue or schedule task execution.
// Under the hood we create only a single queue using goqite for all tasks because we do not want more than one
// runner to process the tasks. The TaskClient wrapper provides abstractions for separate, type-safe queues.
TaskClient struct {
queue *goqite.Queue
runner *jobs.Runner
buffers sync.Pool
}
// Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber.
// See pkg/tasks for an example of how this can be used with a queue.
Task interface {
Name() string
}
// TaskSaveOp handles task save operations
TaskSaveOp struct {
client *TaskClient
task Task
tx *sql.Tx
at *time.Time
wait *time.Duration
}
// Queue is a queue that a Task can be pushed to for execution.
// While this can be implemented directly, it's recommended to use NewQueue() which uses generics in
// order to provide type-safe queues and queue subscriber callbacks for task execution.
Queue interface {
// Name returns the name of the task this queue processes
Name() string
// Receive receives the Task payload to be processed
Receive(ctx context.Context, payload []byte) error
}
// queue provides a type-safe implementation of Queue
queue[T Task] struct {
name string
subscriber QueueSubscriber[T]
}
// QueueSubscriber is a generic subscriber callback for a given queue to process Tasks
QueueSubscriber[T Task] func(context.Context, T) error
)
// NewTaskClient creates a new task client
func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) {
// Install the schema
if err := goqite.Setup(context.Background(), db); err != nil {
// An error is returned if we already ran this and there's no better way to check.
// You can and probably should handle this via migrations
if !strings.Contains(err.Error(), "already exists") {
return nil, err
}
}
t := &TaskClient{
queue: goqite.New(goqite.NewOpts{
DB: db,
Name: "tasks",
MaxReceive: cfg.MaxRetries,
}),
buffers: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
},
}
t.runner = jobs.NewRunner(jobs.NewRunnerOpts{
Limit: cfg.Goroutines,
Log: log.Default(),
PollInterval: cfg.PollInterval,
Queue: t.queue,
})
return t, nil
}
// StartRunner starts the scheduler service which adds scheduled tasks to the queue.
// This must be running in order to execute queued tasks.
// To stop the runner, cancel the context.
// This is a blocking call.
func (t *TaskClient) StartRunner(ctx context.Context) {
t.runner.Start(ctx)
}
// Register registers a queue so tasks can be added to it and processed
func (t *TaskClient) Register(queue Queue) {
t.runner.Register(queue.Name(), queue.Receive)
}
// New starts a task save operation
func (t *TaskClient) New(task Task) *TaskSaveOp {
return &TaskSaveOp{
client: t,
task: task,
}
}
// At sets the exact date and time the task should be executed
func (t *TaskSaveOp) At(processAt time.Time) *TaskSaveOp {
t.Wait(time.Until(processAt))
return t
}
// Wait instructs the task to wait a given duration before it is executed
func (t *TaskSaveOp) Wait(duration time.Duration) *TaskSaveOp {
t.wait = &duration
return t
}
// Tx will include the task as part of a given database transaction
func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp {
t.tx = tx
return t
}
// Save saves the task, so it can be queued for execution
func (t *TaskSaveOp) Save() error {
type message struct {
Name string
Message []byte
}
// Encode the task
taskBuf := t.client.buffers.Get().(*bytes.Buffer)
if err := gob.NewEncoder(taskBuf).Encode(t.task); err != nil {
return err
}
// Wrap and encode the message
// This is needed as a workaround because goqite doesn't support delays using the jobs package,
// so we format the message the way it expects but use the queue to supply the delay
msgBuf := t.client.buffers.Get().(*bytes.Buffer)
wrapper := message{Name: t.task.Name(), Message: taskBuf.Bytes()}
if err := gob.NewEncoder(msgBuf).Encode(wrapper); err != nil {
return err
}
msg := goqite.Message{
Body: msgBuf.Bytes(),
}
if t.wait != nil {
msg.Delay = *t.wait
}
// Put the buffers back in the pool for re-use
taskBuf.Reset()
msgBuf.Reset()
t.client.buffers.Put(taskBuf)
t.client.buffers.Put(msgBuf)
if t.tx == nil {
return t.client.queue.Send(context.Background(), msg)
} else {
return t.client.queue.SendTx(context.Background(), t.tx, msg)
}
}
// NewQueue queues a new type-safe Queue of a given Task type
func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue {
var task T
q := &queue[T]{
name: task.Name(),
subscriber: subscriber,
}
return q
}
func (q *queue[T]) Name() string {
return q.name
}
func (q *queue[T]) Receive(ctx context.Context, payload []byte) error {
var obj T
err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&obj)
if err != nil {
return err
}
return q.subscriber(ctx, obj)
}


@ -1,69 +0,0 @@
package services
import (
"context"
"database/sql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)
type testTask struct {
Val int
}
func (t testTask) Name() string {
return "test_task"
}
func TestTaskClient_New(t *testing.T) {
var subCalled bool
queue := NewQueue[testTask](func(ctx context.Context, task testTask) error {
subCalled = true
assert.Equal(t, 123, task.Val)
return nil
})
c.Tasks.Register(queue)
task := testTask{Val: 123}
tx := &sql.Tx{}
op := c.Tasks.
New(task).
Wait(5 * time.Second).
Tx(tx)
// Check that the task op was built correctly
assert.Equal(t, task, op.task)
assert.Equal(t, tx, op.tx)
assert.Equal(t, 5*time.Second, *op.wait)
// Remove the transaction and delay so we can process the task immediately
op.tx, op.wait = nil, nil
err := op.Save()
require.NoError(t, err)
// Start the runner
ctx, cancel := context.WithCancel(context.Background())
go c.Tasks.StartRunner(ctx)
defer cancel()
// Check for up to 5 seconds if the task executed
start := time.Now()
waitLoop:
for {
switch {
case subCalled:
break waitLoop
case time.Since(start) > (5 * time.Second):
break waitLoop
default:
time.Sleep(10 * time.Millisecond)
}
}
assert.True(t, subCalled)
}


@ -2,29 +2,49 @@ package tasks
import (
"context"
"time"
"github.com/mikestefanello/backlite"
"git.grosinger.net/tgrosinger/saasitone/pkg/log"
"git.grosinger.net/tgrosinger/saasitone/pkg/services"
)
// ExampleTask is an example implementation of services.Task
// ExampleTask is an example implementation of backlite.Task
// This represents the task that can be queued for execution via the task client and should contain everything
// that your queue subscriber needs to process the task.
// that your queue processor needs to process the task.
type ExampleTask struct {
Message string
}
// Name satisfies the services.Task interface by providing a unique name for this Task type
// Config satisfies the backlite.Task interface by providing configuration for the queue that these items will be
func (t ExampleTask) Name() string {
// placed into for execution.
return "example_task"
}
func (t ExampleTask) Config() backlite.QueueConfig {
return backlite.QueueConfig{
Name: "ExampleTask",
MaxAttempts: 3,
Timeout: 5 * time.Second,
Backoff: 10 * time.Second,
Retention: &backlite.Retention{
Duration: 24 * time.Hour,
OnlyFailed: false,
Data: &backlite.RetainData{
OnlyFailed: false,
},
},
}
}
// NewExampleTaskQueue provides a Queue that can process ExampleTask tasks
// The service container is provided so the subscriber can have access to the app dependencies.
// All queues must be registered in the Register() function.
// Whenever an ExampleTask is added to the task client, it will be queued and eventually sent here for execution.
func NewExampleTaskQueue(c *services.Container) services.Queue {
return services.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error {
func NewExampleTaskQueue(c *services.Container) backlite.Queue {
return backlite.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error {
log.Default().Info("Example task received",
"message", task.Message,
)