diff --git a/README.md b/README.md index 7b34eee..6489928 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ * [Flush tags](#flush-tags) * [Tasks](#tasks) * [Queues](#queues) - * [Runner](#runner) + * [Dispatcher](#dispatcher) * [Cron](#cron) * [Static files](#static-files) * [Cache control headers](#cache-control-headers) @@ -966,77 +966,27 @@ As shown in the previous examples, cache tags were provided because they can be Tasks are queued operations to be executed in the background, either immediately, at a specfic time, or after a given amount of time has passed. Some examples of tasks could be long-running operations, bulk processing, cleanup, notifications, etc. -Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Goqite](https://github.com/maragudk/goqite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously. +Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Backlite](https://github.com/mikestefanello/backlite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously. -To make things even easier, a custom client (`TaskClient`) is provided as a _Service_ on the `Container` which exposes a simple interface with [goqite](https://github.com/maragudk/goqite) that supports type-safe tasks and queues. +To make things easy, the _Backlite_ client (`TaskClient`) is provided as a _Service_ on the `Container` which allows you to register queues and add tasks. ### Queues -A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`). +A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`). Also refer to the [Backlite](https://github.com/mikestefanello/backlite) documentation for reference and examples. -A queue starts by declaring a `Task` _type_, which is the object that gets placed in to a queue and eventually passed to a queue subscriber (a callback function to process the task). A `Task` must implement the `Name()` method which returns a unique name for the task. For example: +See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue processor callbacks have access to all of your app's dependencies. + +### Dispatcher + +The _task dispatcher_ is what manages the worker pool used for executing tasks in the background. It monitors incoming and scheduled tasks and handles sending them to the pool for execution by the queue's processor callback. This must be started in order for this to happen. In `cmd/web/main.go`, the _task dispatcher_ is automatically started when the app starts via: ```go -type MyTask struct { - Text string - Num int -} - -func (t MyTask) Name() string { - return "my_task" -} +c.Tasks.Start(ctx) ``` -Then, create the queue for `MyTask` tasks: +The app [configuration](#configuration) contains values to configure the client and dispatcher including how many goroutines to use, when to release stuck tasks back into the queue, and how often to cleanup retained tasks in the database. -```go -q := services.NewQueue[MyTask](func(ctx context.Context, task MyTask) error { - // This is where you process the task - fmt.Println("Processed %s task!", task.Text) - return nil -}) -``` - -And finally, register the queue with the `TaskClient`: - -```go -c.Tasks.Register(q) -``` - -See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue subscriber callbacks have access to all of your app's dependencies. - -Now you can easily add a task to the queue using the `TaskClient`: - -```go -task := MyTask{Text: "Hello world!", Num: 10} - -err := c.Tasks. - New(task). - Save() -``` - -#### Options - -Tasks can be created and queued with various chained options: - -```go -err := c.Tasks. - New(task). - Wait(30 * time.Second). // Wait 30 seconds before passing the task to the subscriber - At(time.Date(...)). // Wait until a given date before passing the task to the subscriber - Tx(tx). // Include the queueing of this task in a database transaction - Save() -``` - -### Runner - -The _task runner_ is what manages periodically polling the database for available queued tasks to process and passing them to the queue's subscriber callback. This must be started in order for this to happen. In `cmd/web/main.go`, the _task runner_ is started by using the `TaskClient`: - -```go -go c.Tasks.StartRunner(ctx) -``` - -The app [configuration](#configuration) contains values to configure the runner including how often to poll the database for tasks, the maximum amount of retries for a given task, and the amount of tasks that can be processed concurrently. +When the app is shutdown, the dispatcher is given 10 seconds to wait for any in-progress tasks to finish execution. This can be changed in `cmd/web/main.go`. ## Cron @@ -1172,12 +1122,12 @@ Future work includes but is not limited to: Thank you to all of the following amazing projects for making this possible. - [alpinejs](https://github.com/alpinejs/alpine) +- [backlite](https://github.com/mikestefanello/backlite) - [bulma](https://github.com/jgthms/bulma) - [echo](https://github.com/labstack/echo) - [golang-migrate](https://github.com/golang-migrate/migrate) - [go](https://go.dev/) - [go-sqlite3](https://github.com/mattn/go-sqlite3) -- [goqite](https://github.com/maragudk/goqite) - [goquery](https://github.com/PuerkitoBio/goquery) - [htmx](https://github.com/bigskysoftware/htmx) - [jwt](https://github.com/golang-jwt/jwt) diff --git a/cmd/web/main.go b/cmd/web/main.go index 8ab120f..691b490 100644 --- a/cmd/web/main.go +++ b/cmd/web/main.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "os/signal" + "sync" "time" "git.grosinger.net/tgrosinger/saasitone/pkg/handlers" @@ -60,18 +61,31 @@ func main() { tasks.Register(c) // Start the task runner to execute queued tasks - ctx, cancel := context.WithCancel(context.Background()) - go c.Tasks.StartRunner(ctx) + c.Tasks.Start(context.Background()) // Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds. quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt) signal.Notify(quit, os.Kill) <-quit - cancel() - ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + // Shutdown both the task runner and web server + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if err := c.Web.Shutdown(ctx); err != nil { - log.Fatal(err) - } + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + c.Tasks.Stop(ctx) + }() + + go func() { + defer wg.Done() + if err := c.Web.Shutdown(ctx); err != nil { + log.Fatal(err) + } + }() + + wg.Wait() } diff --git a/config.yaml b/config.yaml index 1892d07..4a86765 100644 --- a/config.yaml +++ b/config.yaml @@ -31,9 +31,9 @@ storage: migrationsDir: db/migrations tasks: - pollInterval: "1s" - maxRetries: 10 goroutines: 1 + releaseAfter: "15m" + cleanupInterval: "1h" mail: hostname: "localhost" diff --git a/config/config.go b/config/config.go index 3b7a6c2..120c5b3 100644 --- a/config/config.go +++ b/config/config.go @@ -111,9 +111,9 @@ type ( // TasksConfig stores the tasks configuration TasksConfig struct { - PollInterval time.Duration - MaxRetries int - Goroutines int + Goroutines int + ReleaseAfter time.Duration + CleanupInterval time.Duration } // MailConfig stores the mail configuration @@ -152,6 +152,7 @@ func GetConfig() (Config, error) { } usedConfigFilePath := viper.GetViper().ConfigFileUsed() + configFileDir := filepath.Dir(usedConfigFilePath) if !filepath.IsAbs(c.Storage.DatabaseFile) { c.Storage.DatabaseFile = filepath.Join(configFileDir, c.Storage.DatabaseFile) diff --git a/go.mod b/go.mod index ecf92a9..1ece0fd 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module git.grosinger.net/tgrosinger/saasitone -go 1.22 +go 1.22.4 -toolchain go1.22.1 +toolchain go1.22.5 require ( entgo.io/ent v0.13.1 @@ -54,6 +54,7 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect + github.com/mikestefanello/backlite v0.1.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect diff --git a/go.sum b/go.sum index f85c2d4..3e54b9e 100644 --- a/go.sum +++ b/go.sum @@ -116,6 +116,8 @@ github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/maypok86/otter v1.2.1 h1:xyvMW+t0vE1sKt/++GTkznLitEl7D/msqXkAbLwiC1M= github.com/maypok86/otter v1.2.1/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4= +github.com/mikestefanello/backlite v0.1.0 h1:bIiZJXPZB8V5PXWvDmkTepY015w3gJdeRrP3QrEV4Ls= +github.com/mikestefanello/backlite v0.1.0/go.mod h1:/vj8LPZWG/xqK/3uHaqOtu5JRLDEWqeyJKWTAlADTV0= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= diff --git a/pkg/handlers/task.go b/pkg/handlers/task.go index 9a4ed2a..d1fba5a 100644 --- a/pkg/handlers/task.go +++ b/pkg/handlers/task.go @@ -7,6 +7,7 @@ import ( "github.com/a-h/templ" "github.com/go-playground/validator/v10" "github.com/labstack/echo/v4" + "github.com/mikestefanello/backlite" "git.grosinger.net/tgrosinger/saasitone/pkg/form" "git.grosinger.net/tgrosinger/saasitone/pkg/msg" @@ -25,7 +26,7 @@ const ( type ( Task struct { - tasks *services.TaskClient + tasks *backlite.Client *services.TemplateRenderer } ) @@ -75,9 +76,10 @@ func (h *Task) Submit(ctx echo.Context) error { } // Insert the task - err = h.tasks.New(tasks.ExampleTask{ - Message: input.Message, - }). + err = h.tasks. + Add(tasks.ExampleTask{ + Message: input.Message, + }). Wait(time.Duration(input.Delay) * time.Second). Save() if err != nil { diff --git a/pkg/services/container.go b/pkg/services/container.go index baf08d7..8118706 100644 --- a/pkg/services/container.go +++ b/pkg/services/container.go @@ -6,9 +6,11 @@ import ( "github.com/labstack/echo/v4" _ "github.com/mattn/go-sqlite3" + "github.com/mikestefanello/backlite" "git.grosinger.net/tgrosinger/saasitone/config" "git.grosinger.net/tgrosinger/saasitone/pkg/funcmap" + "git.grosinger.net/tgrosinger/saasitone/pkg/log" ) // Container contains all services used by the application and provides an easy way to handle dependency @@ -39,7 +41,7 @@ type Container struct { TemplateRenderer *TemplateRenderer // Tasks stores the task client - Tasks *TaskClient + Tasks *backlite.Client } // NewContainer creates and initializes a new Container @@ -139,10 +141,21 @@ func (c *Container) initMail() { // initTasks initializes the task client func (c *Container) initTasks() { var err error + // You could use a separate database for tasks, if you'd like. but using one // makes transaction support easier - c.Tasks, err = NewTaskClient(c.Config.Tasks, c.DB.DB()) + c.Tasks, err = backlite.NewClient(backlite.ClientConfig{ + DB: c.DB.DB(), + Logger: log.Default(), + NumWorkers: c.Config.Tasks.Goroutines, + ReleaseAfter: c.Config.Tasks.ReleaseAfter, + CleanupInterval: c.Config.Tasks.CleanupInterval, + }) if err != nil { panic(fmt.Sprintf("failed to create task client: %v", err)) } + + if err = c.Tasks.Install(); err != nil { + panic(fmt.Sprintf("failed to install task schema: %v", err)) + } } diff --git a/pkg/services/tasks.go b/pkg/services/tasks.go deleted file mode 100644 index 547e282..0000000 --- a/pkg/services/tasks.go +++ /dev/null @@ -1,205 +0,0 @@ -package services - -import ( - "bytes" - "context" - "database/sql" - "encoding/gob" - "strings" - "sync" - "time" - - "github.com/maragudk/goqite" - "github.com/maragudk/goqite/jobs" - - "git.grosinger.net/tgrosinger/saasitone/config" - "git.grosinger.net/tgrosinger/saasitone/pkg/log" -) - -type ( - // TaskClient is that client that allows you to queue or schedule task execution. - // Under the hood we create only a single queue using goqite for all tasks because we do not want more than one - // runner to process the tasks. The TaskClient wrapper provides abstractions for separate, type-safe queues. - TaskClient struct { - queue *goqite.Queue - runner *jobs.Runner - buffers sync.Pool - } - - // Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber. - // See pkg/tasks for an example of how this can be used with a queue. - Task interface { - Name() string - } - - // TaskSaveOp handles task save operations - TaskSaveOp struct { - client *TaskClient - task Task - tx *sql.Tx - at *time.Time - wait *time.Duration - } - - // Queue is a queue that a Task can be pushed to for execution. - // While this can be implemented directly, it's recommended to use NewQueue() which uses generics in - // order to provide type-safe queues and queue subscriber callbacks for task execution. - Queue interface { - // Name returns the name of the task this queue processes - Name() string - - // Receive receives the Task payload to be processed - Receive(ctx context.Context, payload []byte) error - } - - // queue provides a type-safe implementation of Queue - queue[T Task] struct { - name string - subscriber QueueSubscriber[T] - } - - // QueueSubscriber is a generic subscriber callback for a given queue to process Tasks - QueueSubscriber[T Task] func(context.Context, T) error -) - -// NewTaskClient creates a new task client -func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) { - // Install the schema - if err := goqite.Setup(context.Background(), db); err != nil { - // An error is returned if we already ran this and there's no better way to check. - // You can and probably should handle this via migrations - if !strings.Contains(err.Error(), "already exists") { - return nil, err - } - } - - t := &TaskClient{ - queue: goqite.New(goqite.NewOpts{ - DB: db, - Name: "tasks", - MaxReceive: cfg.MaxRetries, - }), - buffers: sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(nil) - }, - }, - } - - t.runner = jobs.NewRunner(jobs.NewRunnerOpts{ - Limit: cfg.Goroutines, - Log: log.Default(), - PollInterval: cfg.PollInterval, - Queue: t.queue, - }) - - return t, nil -} - -// StartRunner starts the scheduler service which adds scheduled tasks to the queue. -// This must be running in order to execute queued tasked. -// To stop the runner, cancel the context. -// This is a blocking call. -func (t *TaskClient) StartRunner(ctx context.Context) { - t.runner.Start(ctx) -} - -// Register registers a queue so tasks can be added to it and processed -func (t *TaskClient) Register(queue Queue) { - t.runner.Register(queue.Name(), queue.Receive) -} - -// New starts a task save operation -func (t *TaskClient) New(task Task) *TaskSaveOp { - return &TaskSaveOp{ - client: t, - task: task, - } -} - -// At sets the exact date and time the task should be executed -func (t *TaskSaveOp) At(processAt time.Time) *TaskSaveOp { - t.Wait(time.Until(processAt)) - return t -} - -// Wait instructs the task to wait a given duration before it is executed -func (t *TaskSaveOp) Wait(duration time.Duration) *TaskSaveOp { - t.wait = &duration - return t -} - -// Tx will include the task as part of a given database transaction -func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp { - t.tx = tx - return t -} - -// Save saves the task, so it can be queued for execution -func (t *TaskSaveOp) Save() error { - type message struct { - Name string - Message []byte - } - - // Encode the task - taskBuf := t.client.buffers.Get().(*bytes.Buffer) - if err := gob.NewEncoder(taskBuf).Encode(t.task); err != nil { - return err - } - - // Wrap and encode the message - // This is needed as a workaround because goqite doesn't support delays using the jobs package, - // so we format the message the way it expects but use the queue to supply the delay - msgBuf := t.client.buffers.Get().(*bytes.Buffer) - wrapper := message{Name: t.task.Name(), Message: taskBuf.Bytes()} - if err := gob.NewEncoder(msgBuf).Encode(wrapper); err != nil { - return err - } - - msg := goqite.Message{ - Body: msgBuf.Bytes(), - } - - if t.wait != nil { - msg.Delay = *t.wait - } - - // Put the buffers back in the pool for re-use - taskBuf.Reset() - msgBuf.Reset() - t.client.buffers.Put(taskBuf) - t.client.buffers.Put(msgBuf) - - if t.tx == nil { - return t.client.queue.Send(context.Background(), msg) - } else { - return t.client.queue.SendTx(context.Background(), t.tx, msg) - } -} - -// NewQueue queues a new type-safe Queue of a given Task type -func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue { - var task T - - q := &queue[T]{ - name: task.Name(), - subscriber: subscriber, - } - - return q -} - -func (q *queue[T]) Name() string { - return q.name -} - -func (q *queue[T]) Receive(ctx context.Context, payload []byte) error { - var obj T - err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&obj) - if err != nil { - return err - } - - return q.subscriber(ctx, obj) -} diff --git a/pkg/services/tasks_test.go b/pkg/services/tasks_test.go deleted file mode 100644 index a34385c..0000000 --- a/pkg/services/tasks_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package services - -import ( - "context" - "database/sql" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "testing" - "time" -) - -type testTask struct { - Val int -} - -func (t testTask) Name() string { - return "test_task" -} - -func TestTaskClient_New(t *testing.T) { - var subCalled bool - - queue := NewQueue[testTask](func(ctx context.Context, task testTask) error { - subCalled = true - assert.Equal(t, 123, task.Val) - return nil - }) - c.Tasks.Register(queue) - - task := testTask{Val: 123} - - tx := &sql.Tx{} - - op := c.Tasks. - New(task). - Wait(5 * time.Second). - Tx(tx) - - // Check that the task op was built correctly - assert.Equal(t, task, op.task) - assert.Equal(t, tx, op.tx) - assert.Equal(t, 5*time.Second, *op.wait) - - // Remove the transaction and delay so we can process the task immediately - op.tx, op.wait = nil, nil - err := op.Save() - require.NoError(t, err) - - // Start the runner - ctx, cancel := context.WithCancel(context.Background()) - go c.Tasks.StartRunner(ctx) - defer cancel() - - // Check for up to 5 seconds if the task executed - start := time.Now() -waitLoop: - for { - switch { - case subCalled: - break waitLoop - case time.Since(start) > (5 * time.Second): - break waitLoop - default: - time.Sleep(10 * time.Millisecond) - } - } - - assert.True(t, subCalled) -} diff --git a/pkg/tasks/example.go b/pkg/tasks/example.go index 0a6019b..f3ac549 100644 --- a/pkg/tasks/example.go +++ b/pkg/tasks/example.go @@ -2,29 +2,49 @@ package tasks import ( "context" + "time" + + "github.com/mikestefanello/backlite" "git.grosinger.net/tgrosinger/saasitone/pkg/log" "git.grosinger.net/tgrosinger/saasitone/pkg/services" ) -// ExampleTask is an example implementation of services.Task +// ExampleTask is an example implementation of backlite.Task // This represents the task that can be queued for execution via the task client and should contain everything -// that your queue subscriber needs to process the task. +// that your queue processor needs to process the task. type ExampleTask struct { Message string } -// Name satisfies the services.Task interface by proviing a unique name for this Task type +// Config satisfies the backlite.Task interface by providing configuration for the queue that these items will be func (t ExampleTask) Name() string { + // placed into for execution. return "example_task" } +func (t ExampleTask) Config() backlite.QueueConfig { + return backlite.QueueConfig{ + Name: "ExampleTask", + MaxAttempts: 3, + Timeout: 5 * time.Second, + Backoff: 10 * time.Second, + Retention: &backlite.Retention{ + Duration: 24 * time.Hour, + OnlyFailed: false, + Data: &backlite.RetainData{ + OnlyFailed: false, + }, + }, + } +} + // NewExampleTaskQueue provides a Queue that can process ExampleTask tasks // The service container is provided so the subscriber can have access to the app dependencies. // All queues must be registered in the Register() function. // Whenever an ExampleTask is added to the task client, it will be queued and eventually sent here for execution. -func NewExampleTaskQueue(c *services.Container) services.Queue { - return services.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error { +func NewExampleTaskQueue(c *services.Container) backlite.Queue { + return backlite.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error { log.Default().Info("Example task received", "message", task.Message, )