Added task worker service and example task processor.

This commit is contained in:
mikestefanello 2022-02-08 08:58:22 -05:00
parent 726556e973
commit 83cdbc4395
6 changed files with 105 additions and 13 deletions

View File

@ -54,4 +54,10 @@ run:
# Run all tests
.PHONY: test
test:
go test -p 1 ./...
go test -p 1 ./...
# Run the worker
.PHONY: worker
worker:
clear
go run worker/worker.go

View File

@ -85,7 +85,7 @@
* [Tasks](#tasks)
* [Queues](#queues)
* [Scheduled tasks](#scheduled-tasks)
* [Workers](#workers)
* [Worker](#worker)
* [Monitoring](#monitoring)
* [Static files](#static-files)
* [Cache control headers](#cache-control-headers)
@ -1004,7 +1004,7 @@ For more detailed information about [asynq](https://github.com/hibiken/asynq) an
### Queues
All tasks must be placed in to queues in order to be executed by [workers](#workers). You are not required to specify a queue when creating a task, as it will be placed in the default queue if one is not provided. [Asynq](https://github.com/hibiken/asynq) supports multiple queues which allows for functionality such as [prioritization](https://github.com/hibiken/asynq/wiki/Queue-Priority).
All tasks must be placed in to queues in order to be executed by the [worker](#worker). You are not required to specify a queue when creating a task, as it will be placed in the default queue if one is not provided. [Asynq](https://github.com/hibiken/asynq) supports multiple queues which allows for functionality such as [prioritization](https://github.com/hibiken/asynq/wiki/Queue-Priority).
Creating a queued task is easy and at the minimum only requires the name of the task:
@ -1014,7 +1014,7 @@ err := c.Tasks.
Save()
```
This will add a task to the _default_ queue with a task _name_ of `my_task`. The name is used to route the task to the correct [worker](#workers).
This will add a task to the _default_ queue with a task _type_ of `my_task`. The type is used to route the task to the correct [worker](#worker).
#### Options
@ -1033,7 +1033,7 @@ err := c.Tasks.
```
In this example, this task will be:
- Assigned a task name of `my_task`
- Assigned a task type of `my_task`
- The task worker will be sent `taskData` as the payload
- Put in to the `critical` queue
- Be retried up to 5 times in the event of a failure
@ -1086,11 +1086,30 @@ go func() {
In the event of an application restart, periodic tasks must be re-registered with the _scheduler_ in order to continue being queued for execution.
### Workers
### Worker
Workers are what executes the queued tasks. No workers are included so you will have to implement your own for each task you need to support. You have the option of listening to and executing tasks within this application, or creating a separate application to faciliate this.
The worker is a service that executes the queued tasks using task processors. Included is a basic implementation of a separate worker service that will listen for and execute tasks being added to the queues. If you prefer to move the worker so it runs alongside the web server, you can do that, though it's recommended to keep these processes separate for performance and scalability reasons.
The [asynq quickstarter](https://github.com/hibiken/asynq#quickstart) provides a clear example of how to go about implementing this by leveraging `asynq.NewServer` to listen for queued tasks and `asynq.NewServeMux` to route tasks to their workers much like an HTTP router does.
The underlying functionality of the worker service is provided by [asynq](https://github.com/hibiken/asynq), so it's highly recommended that you review the documentation for that project first.
#### Starting the worker
A make target was added to allow you to start the worker service easily. From the root of the repository, execute `make worker`.
#### Understanding the service
The worker service is located in [worker/worker.go](/worker/worker.go) and starts with the creation of a new `*asynq.Server` provided by `asynq.NewServer()`. There are various configuration options available, so be sure to review them all.
Prior to starting the service, we need to route tasks according to their _type_ to their handlers which will process the tasks. This is done by using `async.ServeMux` much like you would use an HTTP router:
```go
mux := asynq.NewServeMux()
mux.Handle(tasks.TypeExample, new(tasks.ExampleProcessor))
```
In this example, all tasks of _type_ `tasks.TypeExample` will be routed to `ExampleProcessor` which is a struct that implements `ProcessTask()`. See the included [basic example](/worker/tasks/example.go).
Finally, the service is started with `async.Server.Run(mux)`.
### Monitoring

View File

@ -22,7 +22,7 @@ type (
// task handles task creation operations
task struct {
client *TaskClient
name string
typ string
payload interface{}
periodic *string
queue *string
@ -67,10 +67,10 @@ func (t *TaskClient) StartScheduler() error {
}
// New starts a task creation operation
func (t *TaskClient) New(name string) *task {
func (t *TaskClient) New(typ string) *task {
return &task{
client: t,
name: name,
typ: typ,
}
}
@ -167,7 +167,7 @@ func (t *task) Save() error {
}
// Build the task
task := asynq.NewTask(t.name, payload, opts...)
task := asynq.NewTask(t.typ, payload, opts...)
// Schedule, if needed
if t.periodic != nil {

View File

@ -21,7 +21,7 @@ func TestTaskClient_New(t *testing.T) {
Wait(6 * time.Second).
Retain(7 * time.Second)
assert.Equal(t, "task1", tk.name)
assert.Equal(t, "task1", tk.typ)
assert.Equal(t, "payload", tk.payload.(string))
assert.Equal(t, "queue", *tk.queue)
assert.Equal(t, "@every 5s", *tk.periodic)

22
worker/tasks/example.go Normal file
View File

@ -0,0 +1,22 @@
package tasks
import (
"context"
"log"
"github.com/hibiken/asynq"
)
// TypeExample is the type for the example task.
// This is what is passed in to TaskClient.New() when creating a new task
const TypeExample = "example_task"
// ExampleProcessor processes example tasks
type ExampleProcessor struct {
}
// ProcessTask handles the processing of the task
func (p *ExampleProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
log.Printf("executing task: %s", t.Type())
return nil
}

45
worker/worker.go Normal file
View File

@ -0,0 +1,45 @@
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
"github.com/mikestefanello/pagoda/config"
"github.com/mikestefanello/pagoda/worker/tasks"
)
func main() {
// Load the configuration
cfg, err := config.GetConfig()
if err != nil {
panic(fmt.Sprintf("failed to load config: %v", err))
}
// Build the worker server
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: fmt.Sprintf("%s:%d", cfg.Cache.Hostname, cfg.Cache.Port),
DB: cfg.Cache.Database,
Password: cfg.Cache.Password,
},
asynq.Config{
// See asynq.Config for all available options and explanation
Concurrency: 10,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
},
)
// Map task types to the handlers
mux := asynq.NewServeMux()
mux.Handle(tasks.TypeExample, new(tasks.ExampleProcessor))
// Start the worker server
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run worker server: %v", err)
}
}