Add sync => go-sync

This commit is contained in:
Asim Aslam
2019-05-31 00:43:23 +01:00
parent 4035ab5c7b
commit 95d134b57e
28 changed files with 2192 additions and 0 deletions

219
sync/task/broker/broker.go Normal file
View File

@@ -0,0 +1,219 @@
// Package broker provides a distributed task manager built on the micro broker
package broker
import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/sync/task"
)
type brokerKey struct{}
// Task is a broker task
type Task struct {
// a micro broker
Broker broker.Broker
// Options
Options task.Options
mtx sync.RWMutex
status string
}
func returnError(err error, ch chan error) {
select {
case ch <- err:
default:
}
}
func (t *Task) Run(c task.Command) error {
// connect
t.Broker.Connect()
// unique id for this runner
id := uuid.New().String()
// topic of the command
topic := fmt.Sprintf("task.%s", c.Name)
// global error
errCh := make(chan error, t.Options.Pool)
// subscribe for distributed work
workFn := func(p broker.Publication) error {
msg := p.Message()
// get command name
command := msg.Header["Command"]
// check the command is what we expect
if command != c.Name {
returnError(errors.New("received unknown command: "+command), errCh)
return nil
}
// new task created
switch msg.Header["Status"] {
case "start":
// artificially delay start of processing
time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
// execute the function
err := c.Func()
status := "done"
errors := ""
if err != nil {
status = "error"
errors = err.Error()
}
// create response
msg := &broker.Message{
Header: map[string]string{
"Command": c.Name,
"Error": errors,
"Id": id,
"Status": status,
"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
},
// Body is nil, may be used in future
}
// publish end of task
if err := t.Broker.Publish(topic, msg); err != nil {
returnError(err, errCh)
}
}
return nil
}
// subscribe for the pool size
for i := 0; i < t.Options.Pool; i++ {
// subscribe to work
subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i)))
if err != nil {
return err
}
// unsubscribe on completion
defer subWork.Unsubscribe()
}
// subscribe to all status messages
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Publication) error {
msg := p.Message()
// get command name
command := msg.Header["Command"]
// check the command is what we expect
if command != c.Name {
return nil
}
// check task status
switch msg.Header["Status"] {
// task is complete
case "done":
errCh <- nil
// someone failed
case "error":
returnError(errors.New(msg.Header["Error"]), errCh)
}
return nil
})
if err != nil {
return err
}
defer subStatus.Unsubscribe()
// a new task
msg := &broker.Message{
Header: map[string]string{
"Command": c.Name,
"Id": id,
"Status": "start",
"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
},
}
// artificially delay the start of the task
time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
// publish the task
if err := t.Broker.Publish(topic, msg); err != nil {
return err
}
var gerrors []string
// wait for all responses
for i := 0; i < t.Options.Pool; i++ {
// check errors
err := <-errCh
// append to errors
if err != nil {
gerrors = append(gerrors, err.Error())
}
}
// return the errors
if len(gerrors) > 0 {
return errors.New("errors: " + strings.Join(gerrors, "\n"))
}
return nil
}
func (t *Task) Status() string {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.status
}
// Broker sets the micro broker
func WithBroker(b broker.Broker) task.Option {
return func(o *task.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, brokerKey{}, b)
}
}
// NewTask returns a new broker task
func NewTask(opts ...task.Option) task.Task {
options := task.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
if options.Pool == 0 {
options.Pool = 1
}
b, ok := options.Context.Value(brokerKey{}).(broker.Broker)
if !ok {
b = broker.DefaultBroker
}
return &Task{
Broker: b,
Options: options,
}
}

59
sync/task/local/local.go Normal file
View File

@@ -0,0 +1,59 @@
// Package local provides a local task runner
package local
import (
"fmt"
"sync"
"github.com/micro/go-micro/sync/task"
)
type localTask struct {
opts task.Options
mtx sync.RWMutex
status string
}
func (l *localTask) Run(t task.Command) error {
ch := make(chan error, l.opts.Pool)
for i := 0; i < l.opts.Pool; i++ {
go func() {
ch <- t.Execute()
}()
}
var err error
for i := 0; i < l.opts.Pool; i++ {
er := <-ch
if err != nil {
err = er
l.mtx.Lock()
l.status = fmt.Sprintf("command [%s] status: %s", t.Name, err.Error())
l.mtx.Unlock()
}
}
close(ch)
return err
}
func (l *localTask) Status() string {
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.status
}
func NewTask(opts ...task.Option) task.Task {
var options task.Options
for _, o := range opts {
o(&options)
}
if options.Pool == 0 {
options.Pool = 1
}
return &localTask{
opts: options,
}
}

83
sync/task/task.go Normal file
View File

@@ -0,0 +1,83 @@
// Package task provides an interface for distributed jobs
package task
import (
"context"
"fmt"
"time"
)
// Task represents a distributed task
type Task interface {
// Run runs a command immediately until completion
Run(Command) error
// Status provides status of last execution
Status() string
}
// Command to be executed
type Command struct {
Name string
Func func() error
}
// Schedule represents a time or interval at which a task should run
type Schedule struct {
// When to start the schedule. Zero time means immediately
Time time.Time
// Non zero interval dictates an ongoing schedule
Interval time.Duration
}
type Options struct {
// Pool size for workers
Pool int
// Alternative options
Context context.Context
}
type Option func(o *Options)
func (c Command) Execute() error {
return c.Func()
}
func (c Command) String() string {
return c.Name
}
func (s Schedule) Run() <-chan time.Time {
d := s.Time.Sub(time.Now())
ch := make(chan time.Time, 1)
go func() {
// wait for start time
<-time.After(d)
// zero interval
if s.Interval == time.Duration(0) {
ch <- time.Now()
close(ch)
return
}
// start ticker
for t := range time.Tick(s.Interval) {
ch <- t
}
}()
return ch
}
func (s Schedule) String() string {
return fmt.Sprintf("%d-%d", s.Time.Unix(), s.Interval)
}
// WithPool sets the pool size for concurrent work
func WithPool(i int) Option {
return func(o *Options) {
o.Pool = i
}
}