Completely replace sync implementation

This commit is contained in:
Asim Aslam 2020-04-11 10:37:54 +01:00
parent 6d553cb6fe
commit 39470c1b11
21 changed files with 435 additions and 1441 deletions

View File

@ -1,99 +0,0 @@
package sync
import (
"fmt"
"math"
"time"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/sync/leader/etcd"
"github.com/micro/go-micro/v2/sync/task"
"github.com/micro/go-micro/v2/sync/task/local"
)
type syncCron struct {
opts Options
}
func backoff(attempts int) time.Duration {
if attempts == 0 {
return time.Duration(0)
}
return time.Duration(math.Pow(10, float64(attempts))) * time.Millisecond
}
func (c *syncCron) Schedule(s task.Schedule, t task.Command) error {
id := fmt.Sprintf("%s-%s", s.String(), t.String())
go func() {
// run the scheduler
tc := s.Run()
var i int
for {
// leader election
e, err := c.opts.Leader.Elect(id)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[cron] leader election error: %v", err)
}
time.Sleep(backoff(i))
i++
continue
}
i = 0
r := e.Revoked()
// execute the task
Tick:
for {
select {
// schedule tick
case _, ok := <-tc:
// ticked once
if !ok {
break Tick
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("[cron] executing command %s", t.Name)
}
if err := c.opts.Task.Run(t); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[cron] error executing command %s: %v", t.Name, err)
}
}
// leader revoked
case <-r:
break Tick
}
}
// resign
e.Resign()
}
}()
return nil
}
func NewCron(opts ...Option) Cron {
var options Options
for _, o := range opts {
o(&options)
}
if options.Leader == nil {
options.Leader = etcd.NewLeader()
}
if options.Task == nil {
options.Task = local.NewTask()
}
return &syncCron{
opts: options,
}
}

179
sync/etcd/etcd.go Normal file
View File

@ -0,0 +1,179 @@
// Package etcd is an etcd implementation of lock
package etcd
import (
"context"
"errors"
"log"
"path"
"strings"
gosync "sync"
client "github.com/coreos/etcd/clientv3"
cc "github.com/coreos/etcd/clientv3/concurrency"
"github.com/micro/go-micro/v2/sync"
)
type etcdSync struct {
options sync.Options
path string
client *client.Client
mtx gosync.Mutex
locks map[string]*etcdLock
}
type etcdLock struct {
s *cc.Session
m *cc.Mutex
}
type etcdLeader struct {
opts sync.LeaderOptions
s *cc.Session
e *cc.Election
id string
}
func (e *etcdSync) Leader(id string, opts ...sync.LeaderOption) (sync.Leader, error) {
var options sync.LeaderOptions
for _, o := range opts {
o(&options)
}
// make path
path := path.Join(e.path, strings.Replace(e.options.Prefix+id, "/", "-", -1))
s, err := cc.NewSession(e.client)
if err != nil {
return nil, err
}
l := cc.NewElection(s, path)
if err := l.Campaign(context.TODO(), id); err != nil {
return nil, err
}
return &etcdLeader{
opts: options,
e: l,
id: id,
}, nil
}
func (e *etcdLeader) Status() chan bool {
ch := make(chan bool, 1)
ech := e.e.Observe(context.Background())
go func() {
for r := range ech {
if string(r.Kvs[0].Value) != e.id {
ch <- true
close(ch)
return
}
}
}()
return ch
}
func (e *etcdLeader) Resign() error {
return e.e.Resign(context.Background())
}
func (e *etcdSync) Init(opts ...sync.Option) error {
for _, o := range opts {
o(&e.options)
}
return nil
}
func (e *etcdSync) Options() sync.Options {
return e.options
}
func (e *etcdSync) Lock(id string, opts ...sync.LockOption) error {
var options sync.LockOptions
for _, o := range opts {
o(&options)
}
// make path
path := path.Join(e.path, strings.Replace(e.options.Prefix+id, "/", "-", -1))
var sopts []cc.SessionOption
if options.TTL > 0 {
sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds())))
}
s, err := cc.NewSession(e.client, sopts...)
if err != nil {
return err
}
m := cc.NewMutex(s, path)
if err := m.Lock(context.TODO()); err != nil {
return err
}
e.mtx.Lock()
e.locks[id] = &etcdLock{
s: s,
m: m,
}
e.mtx.Unlock()
return nil
}
func (e *etcdSync) Unlock(id string) error {
e.mtx.Lock()
defer e.mtx.Unlock()
v, ok := e.locks[id]
if !ok {
return errors.New("lock not found")
}
err := v.m.Unlock(context.Background())
delete(e.locks, id)
return err
}
func (e *etcdSync) String() string {
return "etcd"
}
func NewSync(opts ...sync.Option) sync.Sync {
var options sync.Options
for _, o := range opts {
o(&options)
}
var endpoints []string
for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &etcdSync{
path: "/micro/sync",
client: c,
options: options,
locks: make(map[string]*etcdLock),
}
}

View File

@ -1,136 +0,0 @@
package etcd
import (
"context"
"log"
"path"
"strings"
client "github.com/coreos/etcd/clientv3"
cc "github.com/coreos/etcd/clientv3/concurrency"
"github.com/micro/go-micro/v2/sync/leader"
)
type etcdLeader struct {
opts leader.Options
path string
client *client.Client
}
type etcdElected struct {
s *cc.Session
e *cc.Election
id string
}
func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) {
var options leader.ElectOptions
for _, o := range opts {
o(&options)
}
// make path
path := path.Join(e.path, strings.Replace(id, "/", "-", -1))
s, err := cc.NewSession(e.client)
if err != nil {
return nil, err
}
l := cc.NewElection(s, path)
if err := l.Campaign(context.TODO(), id); err != nil {
return nil, err
}
return &etcdElected{
e: l,
id: id,
}, nil
}
func (e *etcdLeader) Follow() chan string {
ch := make(chan string)
s, err := cc.NewSession(e.client)
if err != nil {
return ch
}
l := cc.NewElection(s, e.path)
ech := l.Observe(context.Background())
go func() {
for r := range ech {
ch <- string(r.Kvs[0].Value)
}
}()
return ch
}
func (e *etcdLeader) String() string {
return "etcd"
}
func (e *etcdElected) Reelect() error {
return e.e.Campaign(context.TODO(), e.id)
}
func (e *etcdElected) Revoked() chan bool {
ch := make(chan bool, 1)
ech := e.e.Observe(context.Background())
go func() {
for r := range ech {
if string(r.Kvs[0].Value) != e.id {
ch <- true
close(ch)
return
}
}
}()
return ch
}
func (e *etcdElected) Resign() error {
return e.e.Resign(context.Background())
}
func (e *etcdElected) Id() string {
return e.id
}
func NewLeader(opts ...leader.Option) leader.Leader {
var options leader.Options
for _, o := range opts {
o(&options)
}
var endpoints []string
for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &etcdLeader{
path: "/micro/leader",
client: c,
opts: options,
}
}

View File

@ -1,25 +0,0 @@
// Package leader provides leader election
package leader
// Leader provides leadership election
type Leader interface {
// elect leader
Elect(id string, opts ...ElectOption) (Elected, error)
// follow the leader
Follow() chan string
}
type Elected interface {
// id of leader
Id() string
// seek re-election
Reelect() error
// resign leadership
Resign() error
// observe leadership revocation
Revoked() chan bool
}
type Option func(o *Options)
type ElectOption func(o *ElectOptions)

View File

@ -1,22 +0,0 @@
package leader
type Options struct {
Nodes []string
Group string
}
type ElectOptions struct{}
// Nodes sets the addresses of the underlying systems
func Nodes(a ...string) Option {
return func(o *Options) {
o.Nodes = a
}
}
// Group sets the group name for coordinating leadership
func Group(g string) Option {
return func(o *Options) {
o.Group = g
}
}

View File

@ -1,113 +0,0 @@
// Package etcd is an etcd implementation of lock
package etcd
import (
"context"
"errors"
"log"
"path"
"strings"
"sync"
client "github.com/coreos/etcd/clientv3"
cc "github.com/coreos/etcd/clientv3/concurrency"
"github.com/micro/go-micro/v2/sync/lock"
)
type etcdLock struct {
opts lock.Options
path string
client *client.Client
sync.Mutex
locks map[string]*elock
}
type elock struct {
s *cc.Session
m *cc.Mutex
}
func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error {
var options lock.AcquireOptions
for _, o := range opts {
o(&options)
}
// make path
path := path.Join(e.path, strings.Replace(e.opts.Prefix+id, "/", "-", -1))
var sopts []cc.SessionOption
if options.TTL > 0 {
sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds())))
}
s, err := cc.NewSession(e.client, sopts...)
if err != nil {
return err
}
m := cc.NewMutex(s, path)
if err := m.Lock(context.TODO()); err != nil {
return err
}
e.Lock()
e.locks[id] = &elock{
s: s,
m: m,
}
e.Unlock()
return nil
}
func (e *etcdLock) Release(id string) error {
e.Lock()
defer e.Unlock()
v, ok := e.locks[id]
if !ok {
return errors.New("lock not found")
}
err := v.m.Unlock(context.Background())
delete(e.locks, id)
return err
}
func (e *etcdLock) String() string {
return "etcd"
}
func NewLock(opts ...lock.Option) lock.Lock {
var options lock.Options
for _, o := range opts {
o(&options)
}
var endpoints []string
for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &etcdLock{
path: "/micro/lock",
client: c,
opts: options,
locks: make(map[string]*elock),
}
}

View File

@ -1,135 +0,0 @@
// Package http adds a http lock implementation
package http
import (
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"net/http"
"net/url"
"path/filepath"
"strings"
"github.com/micro/go-micro/v2/sync/lock"
)
var (
DefaultPath = "/sync/lock"
DefaultAddress = "localhost:8080"
)
type httpLock struct {
opts lock.Options
}
func (h *httpLock) url(do, id string) (string, error) {
sum := crc32.ChecksumIEEE([]byte(id))
node := h.opts.Nodes[sum%uint32(len(h.opts.Nodes))]
// parse the host:port or whatever
uri, err := url.Parse(node)
if err != nil {
return "", err
}
if len(uri.Scheme) == 0 {
uri.Scheme = "http"
}
// set path
// build path
path := filepath.Join(DefaultPath, do, h.opts.Prefix, id)
uri.Path = path
// return url
return uri.String(), nil
}
func (h *httpLock) Acquire(id string, opts ...lock.AcquireOption) error {
var options lock.AcquireOptions
for _, o := range opts {
o(&options)
}
uri, err := h.url("acquire", id)
if err != nil {
return err
}
ttl := fmt.Sprintf("%d", int64(options.TTL.Seconds()))
wait := fmt.Sprintf("%d", int64(options.Wait.Seconds()))
rsp, err := http.PostForm(uri, url.Values{
"id": {id},
"ttl": {ttl},
"wait": {wait},
})
if err != nil {
return err
}
defer rsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return err
}
// success
if rsp.StatusCode == 200 {
return nil
}
// return error
return errors.New(string(b))
}
func (h *httpLock) Release(id string) error {
uri, err := h.url("release", id)
if err != nil {
return err
}
vals := url.Values{
"id": {id},
}
req, err := http.NewRequest("DELETE", uri, strings.NewReader(vals.Encode()))
if err != nil {
return err
}
rsp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return err
}
// success
if rsp.StatusCode == 200 {
return nil
}
// return error
return errors.New(string(b))
}
func NewLock(opts ...lock.Option) lock.Lock {
var options lock.Options
for _, o := range opts {
o(&options)
}
if len(options.Nodes) == 0 {
options.Nodes = []string{DefaultAddress}
}
return &httpLock{
opts: options,
}
}

View File

@ -1,45 +0,0 @@
// Package server implements the sync http server
package server
import (
"net/http"
"github.com/micro/go-micro/v2/sync/lock"
lkhttp "github.com/micro/go-micro/v2/sync/lock/http"
)
func Handler(lk lock.Lock) http.Handler {
mux := http.NewServeMux()
mux.HandleFunc(lkhttp.DefaultPath, func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
id := r.Form.Get("id")
if len(id) == 0 {
return
}
switch r.Method {
case "POST":
err := lk.Acquire(id)
if err != nil {
http.Error(w, err.Error(), 500)
}
case "DELETE":
err := lk.Release(id)
if err != nil {
http.Error(w, err.Error(), 500)
}
}
})
return mux
}
func Server(lk lock.Lock) *http.Server {
server := &http.Server{
Addr: lkhttp.DefaultAddress,
Handler: Handler(lk),
}
return server
}

View File

@ -1,32 +0,0 @@
// Package lock provides distributed locking
package lock
import (
"errors"
"time"
)
var (
ErrLockTimeout = errors.New("lock timeout")
)
// Lock is a distributed locking interface
type Lock interface {
// Acquire a lock with given id
Acquire(id string, opts ...AcquireOption) error
// Release the lock with given id
Release(id string) error
}
type Options struct {
Nodes []string
Prefix string
}
type AcquireOptions struct {
TTL time.Duration
Wait time.Duration
}
type Option func(o *Options)
type AcquireOption func(o *AcquireOptions)

View File

@ -1,142 +0,0 @@
// Package memory provides a sync.Mutex implementation of the lock for local use
package memory
import (
"sync"
"time"
lock "github.com/micro/go-micro/v2/sync/lock"
)
type memoryLock struct {
sync.RWMutex
locks map[string]*mlock
}
type mlock struct {
id string
time time.Time
ttl time.Duration
release chan bool
}
func (m *memoryLock) Acquire(id string, opts ...lock.AcquireOption) error {
// lock our access
m.Lock()
var options lock.AcquireOptions
for _, o := range opts {
o(&options)
}
lk, ok := m.locks[id]
if !ok {
m.locks[id] = &mlock{
id: id,
time: time.Now(),
ttl: options.TTL,
release: make(chan bool),
}
// unlock
m.Unlock()
return nil
}
m.Unlock()
// set wait time
var wait <-chan time.Time
var ttl <-chan time.Time
// decide if we should wait
if options.Wait > time.Duration(0) {
wait = time.After(options.Wait)
}
// check the ttl of the lock
if lk.ttl > time.Duration(0) {
// time lived for the lock
live := time.Since(lk.time)
// set a timer for the leftover ttl
if live > lk.ttl {
// release the lock if it expired
_ = m.Release(id)
} else {
ttl = time.After(live)
}
}
lockLoop:
for {
// wait for the lock to be released
select {
case <-lk.release:
m.Lock()
// someone locked before us
lk, ok = m.locks[id]
if ok {
m.Unlock()
continue
}
// got chance to lock
m.locks[id] = &mlock{
id: id,
time: time.Now(),
ttl: options.TTL,
release: make(chan bool),
}
m.Unlock()
break lockLoop
case <-ttl:
// ttl exceeded
_ = m.Release(id)
// TODO: check the ttl again above
ttl = nil
// try acquire
continue
case <-wait:
return lock.ErrLockTimeout
}
}
return nil
}
func (m *memoryLock) Release(id string) error {
m.Lock()
defer m.Unlock()
lk, ok := m.locks[id]
// no lock exists
if !ok {
return nil
}
// delete the lock
delete(m.locks, id)
select {
case <-lk.release:
return nil
default:
close(lk.release)
}
return nil
}
func NewLock(opts ...lock.Option) lock.Lock {
var options lock.Options
for _, o := range opts {
o(&options)
}
return &memoryLock{
locks: make(map[string]*mlock),
}
}

View File

@ -1,33 +0,0 @@
package lock
import (
"time"
)
// Nodes sets the addresses the underlying lock implementation
func Nodes(a ...string) Option {
return func(o *Options) {
o.Nodes = a
}
}
// Prefix sets a prefix to any lock ids used
func Prefix(p string) Option {
return func(o *Options) {
o.Prefix = p
}
}
// TTL sets the lock ttl
func TTL(t time.Duration) AcquireOption {
return func(o *AcquireOptions) {
o.TTL = t
}
}
// Wait sets the wait time
func Wait(t time.Duration) AcquireOption {
return func(o *AcquireOptions) {
o.Wait = t
}
}

View File

@ -1,156 +0,0 @@
package sync
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"sort"
"github.com/micro/go-micro/v2/store"
)
type syncMap struct {
opts Options
}
func ekey(k interface{}) string {
b, _ := json.Marshal(k)
return base64.StdEncoding.EncodeToString(b)
}
func (m *syncMap) Read(key, val interface{}) error {
if key == nil {
return fmt.Errorf("key is nil")
}
kstr := ekey(key)
// lock
if err := m.opts.Lock.Acquire(kstr); err != nil {
return err
}
defer m.opts.Lock.Release(kstr)
// get key
kval, err := m.opts.Store.Read(kstr)
if err != nil {
return err
}
if len(kval) == 0 {
return store.ErrNotFound
}
// decode value
return json.Unmarshal(kval[0].Value, val)
}
func (m *syncMap) Write(key, val interface{}) error {
if key == nil {
return fmt.Errorf("key is nil")
}
kstr := ekey(key)
// lock
if err := m.opts.Lock.Acquire(kstr); err != nil {
return err
}
defer m.opts.Lock.Release(kstr)
// encode value
b, err := json.Marshal(val)
if err != nil {
return err
}
// set key
return m.opts.Store.Write(&store.Record{
Key: kstr,
Value: b,
})
}
func (m *syncMap) Delete(key interface{}) error {
if key == nil {
return fmt.Errorf("key is nil")
}
kstr := ekey(key)
// lock
if err := m.opts.Lock.Acquire(kstr); err != nil {
return err
}
defer m.opts.Lock.Release(kstr)
return m.opts.Store.Delete(kstr)
}
func (m *syncMap) Iterate(fn func(key, val interface{}) error) error {
keyvals, err := m.opts.Store.Read("", store.ReadPrefix())
if err != nil {
return err
}
sort.Slice(keyvals, func(i, j int) bool {
return keyvals[i].Key < keyvals[j].Key
})
for _, keyval := range keyvals {
// lock
if err := m.opts.Lock.Acquire(keyval.Key); err != nil {
return err
}
// unlock
defer m.opts.Lock.Release(keyval.Key)
// unmarshal value
var val interface{}
if len(keyval.Value) > 0 && keyval.Value[0] == '{' {
if err := json.Unmarshal(keyval.Value, &val); err != nil {
return err
}
} else {
val = keyval.Value
}
// exec func
if err := fn(keyval.Key, val); err != nil {
return err
}
// save val
b, err := json.Marshal(val)
if err != nil {
return err
}
// no save
if i := bytes.Compare(keyval.Value, b); i == 0 {
return nil
}
// set key
if err := m.opts.Store.Write(&store.Record{
Key: keyval.Key,
Value: b,
}); err != nil {
return err
}
}
return nil
}
func NewMap(opts ...Option) Map {
var options Options
for _, o := range opts {
o(&options)
}
return &syncMap{
opts: options,
}
}

202
sync/memory/memory.go Normal file
View File

@ -0,0 +1,202 @@
// Package memory provides a sync.Mutex implementation of the lock for local use
package memory
import (
gosync "sync"
"time"
"github.com/micro/go-micro/v2/sync"
)
type memorySync struct {
options sync.Options
mtx gosync.RWMutex
locks map[string]*memoryLock
}
type memoryLock struct {
id string
time time.Time
ttl time.Duration
release chan bool
}
type memoryLeader struct {
opts sync.LeaderOptions
id string
resign func(id string) error
status chan bool
}
func (m *memoryLeader) Resign() error {
return m.resign(m.id)
}
func (m *memoryLeader) Status() chan bool {
return m.status
}
func (m *memorySync) Leader(id string, opts ...sync.LeaderOption) (sync.Leader, error) {
var once gosync.Once
var options sync.LeaderOptions
for _, o := range opts {
o(&options)
}
// acquire a lock for the id
if err := m.Lock(id); err != nil {
return nil, err
}
// return the leader
return &memoryLeader{
opts: options,
id: id,
resign: func(id string) error {
once.Do(func() {
m.Unlock(id)
})
return nil
},
// TODO: signal when Unlock is called
status: make(chan bool, 1),
}, nil
}
func (m *memorySync) Init(opts ...sync.Option) error {
for _, o := range opts {
o(&m.options)
}
return nil
}
func (m *memorySync) Options() sync.Options {
return m.options
}
func (m *memorySync) Lock(id string, opts ...sync.LockOption) error {
// lock our access
m.mtx.Lock()
var options sync.LockOptions
for _, o := range opts {
o(&options)
}
lk, ok := m.locks[id]
if !ok {
m.locks[id] = &memoryLock{
id: id,
time: time.Now(),
ttl: options.TTL,
release: make(chan bool),
}
// unlock
m.mtx.Unlock()
return nil
}
m.mtx.Unlock()
// set wait time
var wait <-chan time.Time
var ttl <-chan time.Time
// decide if we should wait
if options.Wait > time.Duration(0) {
wait = time.After(options.Wait)
}
// check the ttl of the lock
if lk.ttl > time.Duration(0) {
// time lived for the lock
live := time.Since(lk.time)
// set a timer for the leftover ttl
if live > lk.ttl {
// release the lock if it expired
_ = m.Unlock(id)
} else {
ttl = time.After(live)
}
}
lockLoop:
for {
// wait for the lock to be released
select {
case <-lk.release:
m.mtx.Lock()
// someone locked before us
lk, ok = m.locks[id]
if ok {
m.mtx.Unlock()
continue
}
// got chance to lock
m.locks[id] = &memoryLock{
id: id,
time: time.Now(),
ttl: options.TTL,
release: make(chan bool),
}
m.mtx.Unlock()
break lockLoop
case <-ttl:
// ttl exceeded
_ = m.Unlock(id)
// TODO: check the ttl again above
ttl = nil
// try acquire
continue
case <-wait:
return sync.ErrLockTimeout
}
}
return nil
}
func (m *memorySync) Unlock(id string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
lk, ok := m.locks[id]
// no lock exists
if !ok {
return nil
}
// delete the lock
delete(m.locks, id)
select {
case <-lk.release:
return nil
default:
close(lk.release)
}
return nil
}
func (m *memorySync) String() string {
return "memory"
}
func NewSync(opts ...sync.Option) sync.Sync {
var options sync.Options
for _, o := range opts {
o(&options)
}
return &memorySync{
options: options,
locks: make(map[string]*memoryLock),
}
}

View File

@ -1,36 +1,33 @@
package sync package sync
import ( import (
"github.com/micro/go-micro/v2/store" "time"
"github.com/micro/go-micro/v2/sync/leader"
"github.com/micro/go-micro/v2/sync/lock"
"github.com/micro/go-micro/v2/sync/time"
) )
// WithLeader sets the leader election implementation opton // Nodes sets the addresses to use
func WithLeader(l leader.Leader) Option { func Nodes(a ...string) Option {
return func(o *Options) { return func(o *Options) {
o.Leader = l o.Nodes = a
} }
} }
// WithLock sets the locking implementation option // Prefix sets a prefix to any lock ids used
func WithLock(l lock.Lock) Option { func Prefix(p string) Option {
return func(o *Options) { return func(o *Options) {
o.Lock = l o.Prefix = p
} }
} }
// WithStore sets the store implementation option // LockTTL sets the lock ttl
func WithStore(s store.Store) Option { func LockTTL(t time.Duration) LockOption {
return func(o *Options) { return func(o *LockOptions) {
o.Store = s o.TTL = t
} }
} }
// WithTime sets the time implementation option // LockWait sets the wait time
func WithTime(t time.Time) Option { func LockWait(t time.Duration) LockOption {
return func(o *Options) { return func(o *LockOptions) {
o.Time = t o.Wait = t
} }
} }

View File

@ -2,40 +2,52 @@
package sync package sync
import ( import (
"github.com/micro/go-micro/v2/store" "errors"
"github.com/micro/go-micro/v2/sync/leader" "time"
"github.com/micro/go-micro/v2/sync/lock"
"github.com/micro/go-micro/v2/sync/task"
"github.com/micro/go-micro/v2/sync/time"
) )
// Map provides synchronized access to key-value storage. var (
// It uses the store interface and lock interface to ErrLockTimeout = errors.New("lock timeout")
// provide a consistent storage mechanism. )
type Map interface {
// Read value with given key // Sync is an interface for synchronization
Read(key, val interface{}) error type Sync interface {
// Write value with given key // Initialise options
Write(key, val interface{}) error Init(...Option) error
// Delete value with given key // Return the options
Delete(key interface{}) error Options() Options
// Iterate over all key/vals. Value changes are saved // Elect a leader
Iterate(func(key, val interface{}) error) error Leader(id string, opts ...LeaderOption) (Leader, error)
// Lock acquires a lock
Lock(id string, opts ...LockOption) error
// Unlock releases a lock
Unlock(id string) error
// Sync implementation
String() string
} }
// Cron is a distributed scheduler using leader election // Leader provides leadership election
// and distributed task runners. It uses the leader and type Leader interface {
// task interfaces. // resign leadership
type Cron interface { Resign() error
Schedule(task.Schedule, task.Command) error // status returns when leadership is lost
Status() chan bool
} }
type Options struct { type Options struct {
Leader leader.Leader Nodes []string
Lock lock.Lock Prefix string
Store store.Store
Task task.Task
Time time.Time
} }
type Option func(o *Options) type Option func(o *Options)
type LeaderOptions struct {}
type LeaderOption func(o *LeaderOptions)
type LockOptions struct {
TTL time.Duration
Wait time.Duration
}
type LockOption func(o *LockOptions)

View File

@ -1,227 +0,0 @@
// Package broker provides a distributed task manager built on the micro broker
package broker
import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/sync/task"
)
type brokerKey struct{}
// Task is a broker task
type Task struct {
// a micro broker
Broker broker.Broker
// Options
Options task.Options
mtx sync.RWMutex
status string
}
func returnError(err error, ch chan error) {
select {
case ch <- err:
default:
}
}
func (t *Task) Run(c task.Command) error {
// connect
t.Broker.Connect()
// unique id for this runner
id := uuid.New().String()
// topic of the command
topic := fmt.Sprintf("task.%s", c.Name)
// global error
errCh := make(chan error, t.Options.Pool)
// subscribe for distributed work
workFn := func(p broker.Event) error {
msg := p.Message()
// get command name
command := msg.Header["Command"]
// check the command is what we expect
if command != c.Name {
returnError(errors.New("received unknown command: "+command), errCh)
return nil
}
// new task created
switch msg.Header["Status"] {
case "start":
// artificially delay start of processing
time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
// execute the function
err := c.Func()
status := "done"
errors := ""
if err != nil {
status = "error"
errors = err.Error()
}
// create response
msg := &broker.Message{
Header: map[string]string{
"Command": c.Name,
"Error": errors,
"Id": id,
"Status": status,
"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
},
// Body is nil, may be used in future
}
// publish end of task
if err := t.Broker.Publish(topic, msg); err != nil {
returnError(err, errCh)
}
}
return nil
}
// subscribe for the pool size
for i := 0; i < t.Options.Pool; i++ {
err := func() error {
// subscribe to work
subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i)))
if err != nil {
return err
}
// unsubscribe on completion
defer subWork.Unsubscribe()
return nil
}()
if err != nil {
return err
}
}
// subscribe to all status messages
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Event) error {
msg := p.Message()
// get command name
command := msg.Header["Command"]
// check the command is what we expect
if command != c.Name {
return nil
}
// check task status
switch msg.Header["Status"] {
// task is complete
case "done":
errCh <- nil
// someone failed
case "error":
returnError(errors.New(msg.Header["Error"]), errCh)
}
return nil
})
if err != nil {
return err
}
defer subStatus.Unsubscribe()
// a new task
msg := &broker.Message{
Header: map[string]string{
"Command": c.Name,
"Id": id,
"Status": "start",
"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
},
}
// artificially delay the start of the task
time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
// publish the task
if err := t.Broker.Publish(topic, msg); err != nil {
return err
}
var gerrors []string
// wait for all responses
for i := 0; i < t.Options.Pool; i++ {
// check errors
err := <-errCh
// append to errors
if err != nil {
gerrors = append(gerrors, err.Error())
}
}
// return the errors
if len(gerrors) > 0 {
return errors.New("errors: " + strings.Join(gerrors, "\n"))
}
return nil
}
func (t *Task) Status() string {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.status
}
// Broker sets the micro broker
func WithBroker(b broker.Broker) task.Option {
return func(o *task.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, brokerKey{}, b)
}
}
// NewTask returns a new broker task
func NewTask(opts ...task.Option) task.Task {
options := task.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
if options.Pool == 0 {
options.Pool = 1
}
b, ok := options.Context.Value(brokerKey{}).(broker.Broker)
if !ok {
b = broker.DefaultBroker
}
return &Task{
Broker: b,
Options: options,
}
}

View File

@ -1,59 +0,0 @@
// Package local provides a local task runner
package local
import (
"fmt"
"sync"
"github.com/micro/go-micro/v2/sync/task"
)
type localTask struct {
opts task.Options
mtx sync.RWMutex
status string
}
func (l *localTask) Run(t task.Command) error {
ch := make(chan error, l.opts.Pool)
for i := 0; i < l.opts.Pool; i++ {
go func() {
ch <- t.Execute()
}()
}
var err error
for i := 0; i < l.opts.Pool; i++ {
er := <-ch
if err != nil {
err = er
l.mtx.Lock()
l.status = fmt.Sprintf("command [%s] status: %s", t.Name, err.Error())
l.mtx.Unlock()
}
}
close(ch)
return err
}
func (l *localTask) Status() string {
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.status
}
func NewTask(opts ...task.Option) task.Task {
var options task.Options
for _, o := range opts {
o(&options)
}
if options.Pool == 0 {
options.Pool = 1
}
return &localTask{
opts: options,
}
}

View File

@ -1,85 +0,0 @@
// Package task provides an interface for distributed jobs
package task
import (
"context"
"fmt"
"time"
)
// Task represents a distributed task
type Task interface {
// Run runs a command immediately until completion
Run(Command) error
// Status provides status of last execution
Status() string
}
// Command to be executed
type Command struct {
Name string
Func func() error
}
// Schedule represents a time or interval at which a task should run
type Schedule struct {
// When to start the schedule. Zero time means immediately
Time time.Time
// Non zero interval dictates an ongoing schedule
Interval time.Duration
}
type Options struct {
// Pool size for workers
Pool int
// Alternative options
Context context.Context
}
type Option func(o *Options)
func (c Command) Execute() error {
return c.Func()
}
func (c Command) String() string {
return c.Name
}
func (s Schedule) Run() <-chan time.Time {
d := s.Time.Sub(time.Now())
ch := make(chan time.Time, 1)
go func() {
// wait for start time
<-time.After(d)
// zero interval
if s.Interval == time.Duration(0) {
ch <- time.Now()
close(ch)
return
}
// start ticker
ticker := time.NewTicker(s.Interval)
defer ticker.Stop()
for t := range ticker.C {
ch <- t
}
}()
return ch
}
func (s Schedule) String() string {
return fmt.Sprintf("%d-%d", s.Time.Unix(), s.Interval)
}
// WithPool sets the pool size for concurrent work
func WithPool(i int) Option {
return func(o *Options) {
o.Pool = i
}
}

View File

@ -1,18 +0,0 @@
// Package local provides a local clock
package local
import (
gotime "time"
"github.com/micro/go-micro/v2/sync/time"
)
type Time struct{}
func (t *Time) Now() (gotime.Time, error) {
return gotime.Now(), nil
}
func NewTime(opts ...time.Option) time.Time {
return new(Time)
}

View File

@ -1,51 +0,0 @@
// Package ntp provides ntp synchronized time
package ntp
import (
"context"
gotime "time"
"github.com/beevik/ntp"
"github.com/micro/go-micro/v2/sync/time"
)
type ntpTime struct {
server string
}
type ntpServerKey struct{}
func (n *ntpTime) Now() (gotime.Time, error) {
return ntp.Time(n.server)
}
// NewTime returns ntp time
func NewTime(opts ...time.Option) time.Time {
options := time.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
server := "time.google.com"
if k, ok := options.Context.Value(ntpServerKey{}).(string); ok {
server = k
}
return &ntpTime{
server: server,
}
}
// WithServer sets the ntp server
func WithServer(s string) time.Option {
return func(o *time.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, ntpServerKey{}, s)
}
}

View File

@ -1,18 +0,0 @@
// Package time provides clock synchronization
package time
import (
"context"
"time"
)
// Time returns synchronized time
type Time interface {
Now() (time.Time, error)
}
type Options struct {
Context context.Context
}
type Option func(o *Options)