// Package broker provides a distributed task manager built on the micro broker package broker import ( "context" "errors" "fmt" "math/rand" "strings" "sync" "time" "github.com/google/uuid" "github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/sync/task" ) type brokerKey struct{} // Task is a broker task type Task struct { // a micro broker Broker broker.Broker // Options Options task.Options mtx sync.RWMutex status string } func returnError(err error, ch chan error) { select { case ch <- err: default: } } func (t *Task) Run(c task.Command) error { // connect t.Broker.Connect() // unique id for this runner id := uuid.New().String() // topic of the command topic := fmt.Sprintf("task.%s", c.Name) // global error errCh := make(chan error, t.Options.Pool) // subscribe for distributed work workFn := func(p broker.Event) error { msg := p.Message() // get command name command := msg.Header["Command"] // check the command is what we expect if command != c.Name { returnError(errors.New("received unknown command: "+command), errCh) return nil } // new task created switch msg.Header["Status"] { case "start": // artificially delay start of processing time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100))) // execute the function err := c.Func() status := "done" errors := "" if err != nil { status = "error" errors = err.Error() } // create response msg := &broker.Message{ Header: map[string]string{ "Command": c.Name, "Error": errors, "Id": id, "Status": status, "Timestamp": fmt.Sprintf("%d", time.Now().Unix()), }, // Body is nil, may be used in future } // publish end of task if err := t.Broker.Publish(topic, msg); err != nil { returnError(err, errCh) } } return nil } // subscribe for the pool size for i := 0; i < t.Options.Pool; i++ { err := func() error { // subscribe to work subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i))) if err != nil { return err } // unsubscribe on completion defer subWork.Unsubscribe() return nil }() if err != nil { return err } } // subscribe to all status messages subStatus, err := t.Broker.Subscribe(topic, func(p broker.Event) error { msg := p.Message() // get command name command := msg.Header["Command"] // check the command is what we expect if command != c.Name { return nil } // check task status switch msg.Header["Status"] { // task is complete case "done": errCh <- nil // someone failed case "error": returnError(errors.New(msg.Header["Error"]), errCh) } return nil }) if err != nil { return err } defer subStatus.Unsubscribe() // a new task msg := &broker.Message{ Header: map[string]string{ "Command": c.Name, "Id": id, "Status": "start", "Timestamp": fmt.Sprintf("%d", time.Now().Unix()), }, } // artificially delay the start of the task time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100))) // publish the task if err := t.Broker.Publish(topic, msg); err != nil { return err } var gerrors []string // wait for all responses for i := 0; i < t.Options.Pool; i++ { // check errors err := <-errCh // append to errors if err != nil { gerrors = append(gerrors, err.Error()) } } // return the errors if len(gerrors) > 0 { return errors.New("errors: " + strings.Join(gerrors, "\n")) } return nil } func (t *Task) Status() string { t.mtx.RLock() defer t.mtx.RUnlock() return t.status } // Broker sets the micro broker func WithBroker(b broker.Broker) task.Option { return func(o *task.Options) { if o.Context == nil { o.Context = context.Background() } o.Context = context.WithValue(o.Context, brokerKey{}, b) } } // NewTask returns a new broker task func NewTask(opts ...task.Option) task.Task { options := task.Options{ Context: context.Background(), } for _, o := range opts { o(&options) } if options.Pool == 0 { options.Pool = 1 } b, ok := options.Context.Value(brokerKey{}).(broker.Broker) if !ok { b = broker.DefaultBroker } return &Task{ Broker: b, Options: options, } }