micro/sync/task/broker/broker.go
2019-05-31 00:43:23 +01:00

220 lines
4.1 KiB
Go

// Package broker provides a distributed task manager built on the micro broker
package broker
import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/sync/task"
)
type brokerKey struct{}
// Task is a broker task
type Task struct {
// a micro broker
Broker broker.Broker
// Options
Options task.Options
mtx sync.RWMutex
status string
}
func returnError(err error, ch chan error) {
select {
case ch <- err:
default:
}
}
func (t *Task) Run(c task.Command) error {
// connect
t.Broker.Connect()
// unique id for this runner
id := uuid.New().String()
// topic of the command
topic := fmt.Sprintf("task.%s", c.Name)
// global error
errCh := make(chan error, t.Options.Pool)
// subscribe for distributed work
workFn := func(p broker.Publication) error {
msg := p.Message()
// get command name
command := msg.Header["Command"]
// check the command is what we expect
if command != c.Name {
returnError(errors.New("received unknown command: "+command), errCh)
return nil
}
// new task created
switch msg.Header["Status"] {
case "start":
// artificially delay start of processing
time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
// execute the function
err := c.Func()
status := "done"
errors := ""
if err != nil {
status = "error"
errors = err.Error()
}
// create response
msg := &broker.Message{
Header: map[string]string{
"Command": c.Name,
"Error": errors,
"Id": id,
"Status": status,
"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
},
// Body is nil, may be used in future
}
// publish end of task
if err := t.Broker.Publish(topic, msg); err != nil {
returnError(err, errCh)
}
}
return nil
}
// subscribe for the pool size
for i := 0; i < t.Options.Pool; i++ {
// subscribe to work
subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i)))
if err != nil {
return err
}
// unsubscribe on completion
defer subWork.Unsubscribe()
}
// subscribe to all status messages
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Publication) error {
msg := p.Message()
// get command name
command := msg.Header["Command"]
// check the command is what we expect
if command != c.Name {
return nil
}
// check task status
switch msg.Header["Status"] {
// task is complete
case "done":
errCh <- nil
// someone failed
case "error":
returnError(errors.New(msg.Header["Error"]), errCh)
}
return nil
})
if err != nil {
return err
}
defer subStatus.Unsubscribe()
// a new task
msg := &broker.Message{
Header: map[string]string{
"Command": c.Name,
"Id": id,
"Status": "start",
"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
},
}
// artificially delay the start of the task
time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
// publish the task
if err := t.Broker.Publish(topic, msg); err != nil {
return err
}
var gerrors []string
// wait for all responses
for i := 0; i < t.Options.Pool; i++ {
// check errors
err := <-errCh
// append to errors
if err != nil {
gerrors = append(gerrors, err.Error())
}
}
// return the errors
if len(gerrors) > 0 {
return errors.New("errors: " + strings.Join(gerrors, "\n"))
}
return nil
}
func (t *Task) Status() string {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.status
}
// Broker sets the micro broker
func WithBroker(b broker.Broker) task.Option {
return func(o *task.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, brokerKey{}, b)
}
}
// NewTask returns a new broker task
func NewTask(opts ...task.Option) task.Task {
options := task.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
if options.Pool == 0 {
options.Pool = 1
}
b, ok := options.Context.Value(brokerKey{}).(broker.Broker)
if !ok {
b = broker.DefaultBroker
}
return &Task{
Broker: b,
Options: options,
}
}