220 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			220 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package broker provides a distributed task manager built on the micro broker
 | 
						|
package broker
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"math/rand"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/google/uuid"
 | 
						|
	"github.com/micro/go-micro/broker"
 | 
						|
	"github.com/micro/go-micro/sync/task"
 | 
						|
)
 | 
						|
 | 
						|
type brokerKey struct{}
 | 
						|
 | 
						|
// Task is a broker task
 | 
						|
type Task struct {
 | 
						|
	// a micro broker
 | 
						|
	Broker broker.Broker
 | 
						|
	// Options
 | 
						|
	Options task.Options
 | 
						|
 | 
						|
	mtx    sync.RWMutex
 | 
						|
	status string
 | 
						|
}
 | 
						|
 | 
						|
func returnError(err error, ch chan error) {
 | 
						|
	select {
 | 
						|
	case ch <- err:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *Task) Run(c task.Command) error {
 | 
						|
	// connect
 | 
						|
	t.Broker.Connect()
 | 
						|
	// unique id for this runner
 | 
						|
	id := uuid.New().String()
 | 
						|
	// topic of the command
 | 
						|
	topic := fmt.Sprintf("task.%s", c.Name)
 | 
						|
 | 
						|
	// global error
 | 
						|
	errCh := make(chan error, t.Options.Pool)
 | 
						|
 | 
						|
	// subscribe for distributed work
 | 
						|
	workFn := func(p broker.Publication) error {
 | 
						|
		msg := p.Message()
 | 
						|
 | 
						|
		// get command name
 | 
						|
		command := msg.Header["Command"]
 | 
						|
 | 
						|
		// check the command is what we expect
 | 
						|
		if command != c.Name {
 | 
						|
			returnError(errors.New("received unknown command: "+command), errCh)
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
 | 
						|
		// new task created
 | 
						|
		switch msg.Header["Status"] {
 | 
						|
		case "start":
 | 
						|
			// artificially delay start of processing
 | 
						|
			time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
 | 
						|
 | 
						|
			// execute the function
 | 
						|
			err := c.Func()
 | 
						|
 | 
						|
			status := "done"
 | 
						|
			errors := ""
 | 
						|
 | 
						|
			if err != nil {
 | 
						|
				status = "error"
 | 
						|
				errors = err.Error()
 | 
						|
			}
 | 
						|
 | 
						|
			// create response
 | 
						|
			msg := &broker.Message{
 | 
						|
				Header: map[string]string{
 | 
						|
					"Command":   c.Name,
 | 
						|
					"Error":     errors,
 | 
						|
					"Id":        id,
 | 
						|
					"Status":    status,
 | 
						|
					"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
 | 
						|
				},
 | 
						|
				// Body is nil, may be used in future
 | 
						|
			}
 | 
						|
 | 
						|
			// publish end of task
 | 
						|
			if err := t.Broker.Publish(topic, msg); err != nil {
 | 
						|
				returnError(err, errCh)
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// subscribe for the pool size
 | 
						|
	for i := 0; i < t.Options.Pool; i++ {
 | 
						|
		// subscribe to work
 | 
						|
		subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i)))
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		// unsubscribe on completion
 | 
						|
		defer subWork.Unsubscribe()
 | 
						|
	}
 | 
						|
 | 
						|
	// subscribe to all status messages
 | 
						|
	subStatus, err := t.Broker.Subscribe(topic, func(p broker.Publication) error {
 | 
						|
		msg := p.Message()
 | 
						|
 | 
						|
		// get command name
 | 
						|
		command := msg.Header["Command"]
 | 
						|
 | 
						|
		// check the command is what we expect
 | 
						|
		if command != c.Name {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
 | 
						|
		// check task status
 | 
						|
		switch msg.Header["Status"] {
 | 
						|
		// task is complete
 | 
						|
		case "done":
 | 
						|
			errCh <- nil
 | 
						|
		// someone failed
 | 
						|
		case "error":
 | 
						|
			returnError(errors.New(msg.Header["Error"]), errCh)
 | 
						|
		}
 | 
						|
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer subStatus.Unsubscribe()
 | 
						|
 | 
						|
	// a new task
 | 
						|
	msg := &broker.Message{
 | 
						|
		Header: map[string]string{
 | 
						|
			"Command":   c.Name,
 | 
						|
			"Id":        id,
 | 
						|
			"Status":    "start",
 | 
						|
			"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	// artificially delay the start of the task
 | 
						|
	time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
 | 
						|
 | 
						|
	// publish the task
 | 
						|
	if err := t.Broker.Publish(topic, msg); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	var gerrors []string
 | 
						|
 | 
						|
	// wait for all responses
 | 
						|
	for i := 0; i < t.Options.Pool; i++ {
 | 
						|
		// check errors
 | 
						|
		err := <-errCh
 | 
						|
 | 
						|
		// append to errors
 | 
						|
		if err != nil {
 | 
						|
			gerrors = append(gerrors, err.Error())
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// return the errors
 | 
						|
	if len(gerrors) > 0 {
 | 
						|
		return errors.New("errors: " + strings.Join(gerrors, "\n"))
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *Task) Status() string {
 | 
						|
	t.mtx.RLock()
 | 
						|
	defer t.mtx.RUnlock()
 | 
						|
	return t.status
 | 
						|
}
 | 
						|
 | 
						|
// Broker sets the micro broker
 | 
						|
func WithBroker(b broker.Broker) task.Option {
 | 
						|
	return func(o *task.Options) {
 | 
						|
		if o.Context == nil {
 | 
						|
			o.Context = context.Background()
 | 
						|
		}
 | 
						|
		o.Context = context.WithValue(o.Context, brokerKey{}, b)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// NewTask returns a new broker task
 | 
						|
func NewTask(opts ...task.Option) task.Task {
 | 
						|
	options := task.Options{
 | 
						|
		Context: context.Background(),
 | 
						|
	}
 | 
						|
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	if options.Pool == 0 {
 | 
						|
		options.Pool = 1
 | 
						|
	}
 | 
						|
 | 
						|
	b, ok := options.Context.Value(brokerKey{}).(broker.Broker)
 | 
						|
	if !ok {
 | 
						|
		b = broker.DefaultBroker
 | 
						|
	}
 | 
						|
 | 
						|
	return &Task{
 | 
						|
		Broker:  b,
 | 
						|
		Options: options,
 | 
						|
	}
 | 
						|
}
 |