From 39470c1b1113a1f2313fb3907a49903881362c86 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 11 Apr 2020 10:37:54 +0100 Subject: [PATCH] Completely replace sync implementation --- sync/cron.go | 99 -------------- sync/etcd/etcd.go | 179 +++++++++++++++++++++++++ sync/leader/etcd/etcd.go | 136 ------------------- sync/leader/leader.go | 25 ---- sync/leader/options.go | 22 ---- sync/lock/etcd/etcd.go | 113 ---------------- sync/lock/http/http.go | 135 ------------------- sync/lock/http/server/server.go | 45 ------- sync/lock/lock.go | 32 ----- sync/lock/memory/memory.go | 142 -------------------- sync/lock/options.go | 33 ----- sync/map.go | 156 ---------------------- sync/memory/memory.go | 202 ++++++++++++++++++++++++++++ sync/options.go | 33 +++-- sync/sync.go | 66 ++++++---- sync/task/broker/broker.go | 227 -------------------------------- sync/task/local/local.go | 59 --------- sync/task/task.go | 85 ------------ sync/time/local/local.go | 18 --- sync/time/ntp/ntp.go | 51 ------- sync/time/time.go | 18 --- 21 files changed, 435 insertions(+), 1441 deletions(-) delete mode 100644 sync/cron.go create mode 100644 sync/etcd/etcd.go delete mode 100644 sync/leader/etcd/etcd.go delete mode 100644 sync/leader/leader.go delete mode 100644 sync/leader/options.go delete mode 100644 sync/lock/etcd/etcd.go delete mode 100644 sync/lock/http/http.go delete mode 100644 sync/lock/http/server/server.go delete mode 100644 sync/lock/lock.go delete mode 100644 sync/lock/memory/memory.go delete mode 100644 sync/lock/options.go delete mode 100644 sync/map.go create mode 100644 sync/memory/memory.go delete mode 100644 sync/task/broker/broker.go delete mode 100644 sync/task/local/local.go delete mode 100644 sync/task/task.go delete mode 100644 sync/time/local/local.go delete mode 100644 sync/time/ntp/ntp.go delete mode 100644 sync/time/time.go diff --git a/sync/cron.go b/sync/cron.go deleted file mode 100644 index fddb7e49..00000000 --- a/sync/cron.go +++ /dev/null @@ -1,99 +0,0 @@ -package sync - -import ( - "fmt" - "math" - "time" - - "github.com/micro/go-micro/v2/logger" - "github.com/micro/go-micro/v2/sync/leader/etcd" - "github.com/micro/go-micro/v2/sync/task" - "github.com/micro/go-micro/v2/sync/task/local" -) - -type syncCron struct { - opts Options -} - -func backoff(attempts int) time.Duration { - if attempts == 0 { - return time.Duration(0) - } - return time.Duration(math.Pow(10, float64(attempts))) * time.Millisecond -} - -func (c *syncCron) Schedule(s task.Schedule, t task.Command) error { - id := fmt.Sprintf("%s-%s", s.String(), t.String()) - - go func() { - // run the scheduler - tc := s.Run() - - var i int - - for { - // leader election - e, err := c.opts.Leader.Elect(id) - if err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("[cron] leader election error: %v", err) - } - time.Sleep(backoff(i)) - i++ - continue - } - - i = 0 - r := e.Revoked() - - // execute the task - Tick: - for { - select { - // schedule tick - case _, ok := <-tc: - // ticked once - if !ok { - break Tick - } - - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("[cron] executing command %s", t.Name) - } - if err := c.opts.Task.Run(t); err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("[cron] error executing command %s: %v", t.Name, err) - } - } - // leader revoked - case <-r: - break Tick - } - } - - // resign - e.Resign() - } - }() - - return nil -} - -func NewCron(opts ...Option) Cron { - var options Options - for _, o := range opts { - o(&options) - } - - if options.Leader == nil { - options.Leader = etcd.NewLeader() - } - - if options.Task == nil { - options.Task = local.NewTask() - } - - return &syncCron{ - opts: options, - } -} diff --git a/sync/etcd/etcd.go b/sync/etcd/etcd.go new file mode 100644 index 00000000..7d493942 --- /dev/null +++ b/sync/etcd/etcd.go @@ -0,0 +1,179 @@ +// Package etcd is an etcd implementation of lock +package etcd + +import ( + "context" + "errors" + "log" + "path" + "strings" + gosync "sync" + + client "github.com/coreos/etcd/clientv3" + cc "github.com/coreos/etcd/clientv3/concurrency" + "github.com/micro/go-micro/v2/sync" +) + +type etcdSync struct { + options sync.Options + path string + client *client.Client + + mtx gosync.Mutex + locks map[string]*etcdLock +} + +type etcdLock struct { + s *cc.Session + m *cc.Mutex +} + +type etcdLeader struct { + opts sync.LeaderOptions + s *cc.Session + e *cc.Election + id string +} + +func (e *etcdSync) Leader(id string, opts ...sync.LeaderOption) (sync.Leader, error) { + var options sync.LeaderOptions + for _, o := range opts { + o(&options) + } + + // make path + path := path.Join(e.path, strings.Replace(e.options.Prefix+id, "/", "-", -1)) + + s, err := cc.NewSession(e.client) + if err != nil { + return nil, err + } + + l := cc.NewElection(s, path) + + if err := l.Campaign(context.TODO(), id); err != nil { + return nil, err + } + + return &etcdLeader{ + opts: options, + e: l, + id: id, + }, nil +} + +func (e *etcdLeader) Status() chan bool { + ch := make(chan bool, 1) + ech := e.e.Observe(context.Background()) + + go func() { + for r := range ech { + if string(r.Kvs[0].Value) != e.id { + ch <- true + close(ch) + return + } + } + }() + + return ch +} + +func (e *etcdLeader) Resign() error { + return e.e.Resign(context.Background()) +} + +func (e *etcdSync) Init(opts ...sync.Option) error { + for _, o := range opts { + o(&e.options) + } + return nil +} + +func (e *etcdSync) Options() sync.Options { + return e.options +} + +func (e *etcdSync) Lock(id string, opts ...sync.LockOption) error { + var options sync.LockOptions + for _, o := range opts { + o(&options) + } + + // make path + path := path.Join(e.path, strings.Replace(e.options.Prefix+id, "/", "-", -1)) + + var sopts []cc.SessionOption + if options.TTL > 0 { + sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds()))) + } + + s, err := cc.NewSession(e.client, sopts...) + if err != nil { + return err + } + + m := cc.NewMutex(s, path) + + if err := m.Lock(context.TODO()); err != nil { + return err + } + + e.mtx.Lock() + e.locks[id] = &etcdLock{ + s: s, + m: m, + } + e.mtx.Unlock() + return nil +} + +func (e *etcdSync) Unlock(id string) error { + e.mtx.Lock() + defer e.mtx.Unlock() + v, ok := e.locks[id] + if !ok { + return errors.New("lock not found") + } + err := v.m.Unlock(context.Background()) + delete(e.locks, id) + return err +} + +func (e *etcdSync) String() string { + return "etcd" +} + +func NewSync(opts ...sync.Option) sync.Sync { + var options sync.Options + for _, o := range opts { + o(&options) + } + + var endpoints []string + + for _, addr := range options.Nodes { + if len(addr) > 0 { + endpoints = append(endpoints, addr) + } + } + + if len(endpoints) == 0 { + endpoints = []string{"http://127.0.0.1:2379"} + } + + // TODO: parse addresses + c, err := client.New(client.Config{ + Endpoints: endpoints, + }) + if err != nil { + log.Fatal(err) + } + + return &etcdSync{ + path: "/micro/sync", + client: c, + options: options, + locks: make(map[string]*etcdLock), + } +} diff --git a/sync/leader/etcd/etcd.go b/sync/leader/etcd/etcd.go deleted file mode 100644 index 8e23370b..00000000 --- a/sync/leader/etcd/etcd.go +++ /dev/null @@ -1,136 +0,0 @@ -package etcd - -import ( - "context" - "log" - "path" - "strings" - - client "github.com/coreos/etcd/clientv3" - cc "github.com/coreos/etcd/clientv3/concurrency" - "github.com/micro/go-micro/v2/sync/leader" -) - -type etcdLeader struct { - opts leader.Options - path string - client *client.Client -} - -type etcdElected struct { - s *cc.Session - e *cc.Election - id string -} - -func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) { - var options leader.ElectOptions - for _, o := range opts { - o(&options) - } - - // make path - path := path.Join(e.path, strings.Replace(id, "/", "-", -1)) - - s, err := cc.NewSession(e.client) - if err != nil { - return nil, err - } - - l := cc.NewElection(s, path) - - if err := l.Campaign(context.TODO(), id); err != nil { - return nil, err - } - - return &etcdElected{ - e: l, - id: id, - }, nil -} - -func (e *etcdLeader) Follow() chan string { - ch := make(chan string) - - s, err := cc.NewSession(e.client) - if err != nil { - return ch - } - - l := cc.NewElection(s, e.path) - ech := l.Observe(context.Background()) - - go func() { - for r := range ech { - ch <- string(r.Kvs[0].Value) - } - }() - - return ch -} - -func (e *etcdLeader) String() string { - return "etcd" -} - -func (e *etcdElected) Reelect() error { - return e.e.Campaign(context.TODO(), e.id) -} - -func (e *etcdElected) Revoked() chan bool { - ch := make(chan bool, 1) - ech := e.e.Observe(context.Background()) - - go func() { - for r := range ech { - if string(r.Kvs[0].Value) != e.id { - ch <- true - close(ch) - return - } - } - }() - - return ch -} - -func (e *etcdElected) Resign() error { - return e.e.Resign(context.Background()) -} - -func (e *etcdElected) Id() string { - return e.id -} - -func NewLeader(opts ...leader.Option) leader.Leader { - var options leader.Options - for _, o := range opts { - o(&options) - } - - var endpoints []string - - for _, addr := range options.Nodes { - if len(addr) > 0 { - endpoints = append(endpoints, addr) - } - } - - if len(endpoints) == 0 { - endpoints = []string{"http://127.0.0.1:2379"} - } - - // TODO: parse addresses - c, err := client.New(client.Config{ - Endpoints: endpoints, - }) - if err != nil { - log.Fatal(err) - } - - return &etcdLeader{ - path: "/micro/leader", - client: c, - opts: options, - } -} diff --git a/sync/leader/leader.go b/sync/leader/leader.go deleted file mode 100644 index a4ac53f3..00000000 --- a/sync/leader/leader.go +++ /dev/null @@ -1,25 +0,0 @@ -// Package leader provides leader election -package leader - -// Leader provides leadership election -type Leader interface { - // elect leader - Elect(id string, opts ...ElectOption) (Elected, error) - // follow the leader - Follow() chan string -} - -type Elected interface { - // id of leader - Id() string - // seek re-election - Reelect() error - // resign leadership - Resign() error - // observe leadership revocation - Revoked() chan bool -} - -type Option func(o *Options) - -type ElectOption func(o *ElectOptions) diff --git a/sync/leader/options.go b/sync/leader/options.go deleted file mode 100644 index 05e92e91..00000000 --- a/sync/leader/options.go +++ /dev/null @@ -1,22 +0,0 @@ -package leader - -type Options struct { - Nodes []string - Group string -} - -type ElectOptions struct{} - -// Nodes sets the addresses of the underlying systems -func Nodes(a ...string) Option { - return func(o *Options) { - o.Nodes = a - } -} - -// Group sets the group name for coordinating leadership -func Group(g string) Option { - return func(o *Options) { - o.Group = g - } -} diff --git a/sync/lock/etcd/etcd.go b/sync/lock/etcd/etcd.go deleted file mode 100644 index 1d2a7e70..00000000 --- a/sync/lock/etcd/etcd.go +++ /dev/null @@ -1,113 +0,0 @@ -// Package etcd is an etcd implementation of lock -package etcd - -import ( - "context" - "errors" - "log" - "path" - "strings" - "sync" - - client "github.com/coreos/etcd/clientv3" - cc "github.com/coreos/etcd/clientv3/concurrency" - "github.com/micro/go-micro/v2/sync/lock" -) - -type etcdLock struct { - opts lock.Options - path string - client *client.Client - - sync.Mutex - locks map[string]*elock -} - -type elock struct { - s *cc.Session - m *cc.Mutex -} - -func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error { - var options lock.AcquireOptions - for _, o := range opts { - o(&options) - } - - // make path - path := path.Join(e.path, strings.Replace(e.opts.Prefix+id, "/", "-", -1)) - - var sopts []cc.SessionOption - if options.TTL > 0 { - sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds()))) - } - - s, err := cc.NewSession(e.client, sopts...) - if err != nil { - return err - } - - m := cc.NewMutex(s, path) - - if err := m.Lock(context.TODO()); err != nil { - return err - } - - e.Lock() - e.locks[id] = &elock{ - s: s, - m: m, - } - e.Unlock() - return nil -} - -func (e *etcdLock) Release(id string) error { - e.Lock() - defer e.Unlock() - v, ok := e.locks[id] - if !ok { - return errors.New("lock not found") - } - err := v.m.Unlock(context.Background()) - delete(e.locks, id) - return err -} - -func (e *etcdLock) String() string { - return "etcd" -} - -func NewLock(opts ...lock.Option) lock.Lock { - var options lock.Options - for _, o := range opts { - o(&options) - } - - var endpoints []string - - for _, addr := range options.Nodes { - if len(addr) > 0 { - endpoints = append(endpoints, addr) - } - } - - if len(endpoints) == 0 { - endpoints = []string{"http://127.0.0.1:2379"} - } - - // TODO: parse addresses - c, err := client.New(client.Config{ - Endpoints: endpoints, - }) - if err != nil { - log.Fatal(err) - } - - return &etcdLock{ - path: "/micro/lock", - client: c, - opts: options, - locks: make(map[string]*elock), - } -} diff --git a/sync/lock/http/http.go b/sync/lock/http/http.go deleted file mode 100644 index 3118c189..00000000 --- a/sync/lock/http/http.go +++ /dev/null @@ -1,135 +0,0 @@ -// Package http adds a http lock implementation -package http - -import ( - "errors" - "fmt" - "hash/crc32" - "io/ioutil" - "net/http" - "net/url" - "path/filepath" - "strings" - - "github.com/micro/go-micro/v2/sync/lock" -) - -var ( - DefaultPath = "/sync/lock" - DefaultAddress = "localhost:8080" -) - -type httpLock struct { - opts lock.Options -} - -func (h *httpLock) url(do, id string) (string, error) { - sum := crc32.ChecksumIEEE([]byte(id)) - node := h.opts.Nodes[sum%uint32(len(h.opts.Nodes))] - - // parse the host:port or whatever - uri, err := url.Parse(node) - if err != nil { - return "", err - } - - if len(uri.Scheme) == 0 { - uri.Scheme = "http" - } - - // set path - // build path - path := filepath.Join(DefaultPath, do, h.opts.Prefix, id) - uri.Path = path - - // return url - return uri.String(), nil -} - -func (h *httpLock) Acquire(id string, opts ...lock.AcquireOption) error { - var options lock.AcquireOptions - for _, o := range opts { - o(&options) - } - - uri, err := h.url("acquire", id) - if err != nil { - return err - } - - ttl := fmt.Sprintf("%d", int64(options.TTL.Seconds())) - wait := fmt.Sprintf("%d", int64(options.Wait.Seconds())) - - rsp, err := http.PostForm(uri, url.Values{ - "id": {id}, - "ttl": {ttl}, - "wait": {wait}, - }) - if err != nil { - return err - } - defer rsp.Body.Close() - - b, err := ioutil.ReadAll(rsp.Body) - if err != nil { - return err - } - - // success - if rsp.StatusCode == 200 { - return nil - } - - // return error - return errors.New(string(b)) -} - -func (h *httpLock) Release(id string) error { - uri, err := h.url("release", id) - if err != nil { - return err - } - - vals := url.Values{ - "id": {id}, - } - - req, err := http.NewRequest("DELETE", uri, strings.NewReader(vals.Encode())) - if err != nil { - return err - } - - rsp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer rsp.Body.Close() - - b, err := ioutil.ReadAll(rsp.Body) - if err != nil { - return err - } - - // success - if rsp.StatusCode == 200 { - return nil - } - - // return error - return errors.New(string(b)) -} - -func NewLock(opts ...lock.Option) lock.Lock { - var options lock.Options - for _, o := range opts { - o(&options) - } - - if len(options.Nodes) == 0 { - options.Nodes = []string{DefaultAddress} - } - - return &httpLock{ - opts: options, - } -} diff --git a/sync/lock/http/server/server.go b/sync/lock/http/server/server.go deleted file mode 100644 index df38fd69..00000000 --- a/sync/lock/http/server/server.go +++ /dev/null @@ -1,45 +0,0 @@ -// Package server implements the sync http server -package server - -import ( - "net/http" - - "github.com/micro/go-micro/v2/sync/lock" - lkhttp "github.com/micro/go-micro/v2/sync/lock/http" -) - -func Handler(lk lock.Lock) http.Handler { - mux := http.NewServeMux() - - mux.HandleFunc(lkhttp.DefaultPath, func(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - - id := r.Form.Get("id") - if len(id) == 0 { - return - } - - switch r.Method { - case "POST": - err := lk.Acquire(id) - if err != nil { - http.Error(w, err.Error(), 500) - } - case "DELETE": - err := lk.Release(id) - if err != nil { - http.Error(w, err.Error(), 500) - } - } - }) - - return mux -} - -func Server(lk lock.Lock) *http.Server { - server := &http.Server{ - Addr: lkhttp.DefaultAddress, - Handler: Handler(lk), - } - return server -} diff --git a/sync/lock/lock.go b/sync/lock/lock.go deleted file mode 100644 index c21a9125..00000000 --- a/sync/lock/lock.go +++ /dev/null @@ -1,32 +0,0 @@ -// Package lock provides distributed locking -package lock - -import ( - "errors" - "time" -) - -var ( - ErrLockTimeout = errors.New("lock timeout") -) - -// Lock is a distributed locking interface -type Lock interface { - // Acquire a lock with given id - Acquire(id string, opts ...AcquireOption) error - // Release the lock with given id - Release(id string) error -} - -type Options struct { - Nodes []string - Prefix string -} - -type AcquireOptions struct { - TTL time.Duration - Wait time.Duration -} - -type Option func(o *Options) -type AcquireOption func(o *AcquireOptions) diff --git a/sync/lock/memory/memory.go b/sync/lock/memory/memory.go deleted file mode 100644 index 99ad1532..00000000 --- a/sync/lock/memory/memory.go +++ /dev/null @@ -1,142 +0,0 @@ -// Package memory provides a sync.Mutex implementation of the lock for local use -package memory - -import ( - "sync" - "time" - - lock "github.com/micro/go-micro/v2/sync/lock" -) - -type memoryLock struct { - sync.RWMutex - locks map[string]*mlock -} - -type mlock struct { - id string - time time.Time - ttl time.Duration - release chan bool -} - -func (m *memoryLock) Acquire(id string, opts ...lock.AcquireOption) error { - // lock our access - m.Lock() - - var options lock.AcquireOptions - for _, o := range opts { - o(&options) - } - - lk, ok := m.locks[id] - if !ok { - m.locks[id] = &mlock{ - id: id, - time: time.Now(), - ttl: options.TTL, - release: make(chan bool), - } - // unlock - m.Unlock() - return nil - } - - m.Unlock() - - // set wait time - var wait <-chan time.Time - var ttl <-chan time.Time - - // decide if we should wait - if options.Wait > time.Duration(0) { - wait = time.After(options.Wait) - } - - // check the ttl of the lock - if lk.ttl > time.Duration(0) { - // time lived for the lock - live := time.Since(lk.time) - - // set a timer for the leftover ttl - if live > lk.ttl { - // release the lock if it expired - _ = m.Release(id) - } else { - ttl = time.After(live) - } - } - -lockLoop: - for { - // wait for the lock to be released - select { - case <-lk.release: - m.Lock() - - // someone locked before us - lk, ok = m.locks[id] - if ok { - m.Unlock() - continue - } - - // got chance to lock - m.locks[id] = &mlock{ - id: id, - time: time.Now(), - ttl: options.TTL, - release: make(chan bool), - } - - m.Unlock() - - break lockLoop - case <-ttl: - // ttl exceeded - _ = m.Release(id) - // TODO: check the ttl again above - ttl = nil - // try acquire - continue - case <-wait: - return lock.ErrLockTimeout - } - } - - return nil -} - -func (m *memoryLock) Release(id string) error { - m.Lock() - defer m.Unlock() - - lk, ok := m.locks[id] - // no lock exists - if !ok { - return nil - } - - // delete the lock - delete(m.locks, id) - - select { - case <-lk.release: - return nil - default: - close(lk.release) - } - - return nil -} - -func NewLock(opts ...lock.Option) lock.Lock { - var options lock.Options - for _, o := range opts { - o(&options) - } - - return &memoryLock{ - locks: make(map[string]*mlock), - } -} diff --git a/sync/lock/options.go b/sync/lock/options.go deleted file mode 100644 index 4804ff9c..00000000 --- a/sync/lock/options.go +++ /dev/null @@ -1,33 +0,0 @@ -package lock - -import ( - "time" -) - -// Nodes sets the addresses the underlying lock implementation -func Nodes(a ...string) Option { - return func(o *Options) { - o.Nodes = a - } -} - -// Prefix sets a prefix to any lock ids used -func Prefix(p string) Option { - return func(o *Options) { - o.Prefix = p - } -} - -// TTL sets the lock ttl -func TTL(t time.Duration) AcquireOption { - return func(o *AcquireOptions) { - o.TTL = t - } -} - -// Wait sets the wait time -func Wait(t time.Duration) AcquireOption { - return func(o *AcquireOptions) { - o.Wait = t - } -} diff --git a/sync/map.go b/sync/map.go deleted file mode 100644 index 2196a4e0..00000000 --- a/sync/map.go +++ /dev/null @@ -1,156 +0,0 @@ -package sync - -import ( - "bytes" - "encoding/base64" - "encoding/json" - "fmt" - "sort" - - "github.com/micro/go-micro/v2/store" -) - -type syncMap struct { - opts Options -} - -func ekey(k interface{}) string { - b, _ := json.Marshal(k) - return base64.StdEncoding.EncodeToString(b) -} - -func (m *syncMap) Read(key, val interface{}) error { - if key == nil { - return fmt.Errorf("key is nil") - } - - kstr := ekey(key) - - // lock - if err := m.opts.Lock.Acquire(kstr); err != nil { - return err - } - defer m.opts.Lock.Release(kstr) - - // get key - kval, err := m.opts.Store.Read(kstr) - if err != nil { - return err - } - - if len(kval) == 0 { - return store.ErrNotFound - } - - // decode value - return json.Unmarshal(kval[0].Value, val) -} - -func (m *syncMap) Write(key, val interface{}) error { - if key == nil { - return fmt.Errorf("key is nil") - } - - kstr := ekey(key) - - // lock - if err := m.opts.Lock.Acquire(kstr); err != nil { - return err - } - defer m.opts.Lock.Release(kstr) - - // encode value - b, err := json.Marshal(val) - if err != nil { - return err - } - - // set key - return m.opts.Store.Write(&store.Record{ - Key: kstr, - Value: b, - }) -} - -func (m *syncMap) Delete(key interface{}) error { - if key == nil { - return fmt.Errorf("key is nil") - } - - kstr := ekey(key) - - // lock - if err := m.opts.Lock.Acquire(kstr); err != nil { - return err - } - defer m.opts.Lock.Release(kstr) - return m.opts.Store.Delete(kstr) -} - -func (m *syncMap) Iterate(fn func(key, val interface{}) error) error { - keyvals, err := m.opts.Store.Read("", store.ReadPrefix()) - if err != nil { - return err - } - - sort.Slice(keyvals, func(i, j int) bool { - return keyvals[i].Key < keyvals[j].Key - }) - - for _, keyval := range keyvals { - // lock - if err := m.opts.Lock.Acquire(keyval.Key); err != nil { - return err - } - // unlock - defer m.opts.Lock.Release(keyval.Key) - - // unmarshal value - var val interface{} - - if len(keyval.Value) > 0 && keyval.Value[0] == '{' { - if err := json.Unmarshal(keyval.Value, &val); err != nil { - return err - } - } else { - val = keyval.Value - } - - // exec func - if err := fn(keyval.Key, val); err != nil { - return err - } - - // save val - b, err := json.Marshal(val) - if err != nil { - return err - } - - // no save - if i := bytes.Compare(keyval.Value, b); i == 0 { - return nil - } - - // set key - if err := m.opts.Store.Write(&store.Record{ - Key: keyval.Key, - Value: b, - }); err != nil { - return err - } - } - - return nil -} - -func NewMap(opts ...Option) Map { - var options Options - for _, o := range opts { - o(&options) - } - - return &syncMap{ - opts: options, - } -} diff --git a/sync/memory/memory.go b/sync/memory/memory.go new file mode 100644 index 00000000..f3a92180 --- /dev/null +++ b/sync/memory/memory.go @@ -0,0 +1,202 @@ +// Package memory provides a sync.Mutex implementation of the lock for local use +package memory + +import ( + gosync "sync" + "time" + + "github.com/micro/go-micro/v2/sync" +) + +type memorySync struct { + options sync.Options + + mtx gosync.RWMutex + locks map[string]*memoryLock +} + +type memoryLock struct { + id string + time time.Time + ttl time.Duration + release chan bool +} + +type memoryLeader struct { + opts sync.LeaderOptions + id string + resign func(id string) error + status chan bool +} + +func (m *memoryLeader) Resign() error { + return m.resign(m.id) +} + +func (m *memoryLeader) Status() chan bool { + return m.status +} + +func (m *memorySync) Leader(id string, opts ...sync.LeaderOption) (sync.Leader, error) { + var once gosync.Once + var options sync.LeaderOptions + for _, o := range opts { + o(&options) + } + + // acquire a lock for the id + if err := m.Lock(id); err != nil { + return nil, err + } + + // return the leader + return &memoryLeader{ + opts: options, + id: id, + resign: func(id string) error { + once.Do(func() { + m.Unlock(id) + }) + return nil + }, + // TODO: signal when Unlock is called + status: make(chan bool, 1), + }, nil +} + +func (m *memorySync) Init(opts ...sync.Option) error { + for _, o := range opts { + o(&m.options) + } + return nil +} + +func (m *memorySync) Options() sync.Options { + return m.options +} + +func (m *memorySync) Lock(id string, opts ...sync.LockOption) error { + // lock our access + m.mtx.Lock() + + var options sync.LockOptions + for _, o := range opts { + o(&options) + } + + lk, ok := m.locks[id] + if !ok { + m.locks[id] = &memoryLock{ + id: id, + time: time.Now(), + ttl: options.TTL, + release: make(chan bool), + } + // unlock + m.mtx.Unlock() + return nil + } + + m.mtx.Unlock() + + // set wait time + var wait <-chan time.Time + var ttl <-chan time.Time + + // decide if we should wait + if options.Wait > time.Duration(0) { + wait = time.After(options.Wait) + } + + // check the ttl of the lock + if lk.ttl > time.Duration(0) { + // time lived for the lock + live := time.Since(lk.time) + + // set a timer for the leftover ttl + if live > lk.ttl { + // release the lock if it expired + _ = m.Unlock(id) + } else { + ttl = time.After(live) + } + } + +lockLoop: + for { + // wait for the lock to be released + select { + case <-lk.release: + m.mtx.Lock() + + // someone locked before us + lk, ok = m.locks[id] + if ok { + m.mtx.Unlock() + continue + } + + // got chance to lock + m.locks[id] = &memoryLock{ + id: id, + time: time.Now(), + ttl: options.TTL, + release: make(chan bool), + } + + m.mtx.Unlock() + + break lockLoop + case <-ttl: + // ttl exceeded + _ = m.Unlock(id) + // TODO: check the ttl again above + ttl = nil + // try acquire + continue + case <-wait: + return sync.ErrLockTimeout + } + } + + return nil +} + +func (m *memorySync) Unlock(id string) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + lk, ok := m.locks[id] + // no lock exists + if !ok { + return nil + } + + // delete the lock + delete(m.locks, id) + + select { + case <-lk.release: + return nil + default: + close(lk.release) + } + + return nil +} + +func (m *memorySync) String() string { + return "memory" +} + +func NewSync(opts ...sync.Option) sync.Sync { + var options sync.Options + for _, o := range opts { + o(&options) + } + + return &memorySync{ + options: options, + locks: make(map[string]*memoryLock), + } +} diff --git a/sync/options.go b/sync/options.go index a179479d..6c546356 100644 --- a/sync/options.go +++ b/sync/options.go @@ -1,36 +1,33 @@ package sync import ( - "github.com/micro/go-micro/v2/store" - "github.com/micro/go-micro/v2/sync/leader" - "github.com/micro/go-micro/v2/sync/lock" - "github.com/micro/go-micro/v2/sync/time" + "time" ) -// WithLeader sets the leader election implementation opton -func WithLeader(l leader.Leader) Option { +// Nodes sets the addresses to use +func Nodes(a ...string) Option { return func(o *Options) { - o.Leader = l + o.Nodes = a } } -// WithLock sets the locking implementation option -func WithLock(l lock.Lock) Option { +// Prefix sets a prefix to any lock ids used +func Prefix(p string) Option { return func(o *Options) { - o.Lock = l + o.Prefix = p } } -// WithStore sets the store implementation option -func WithStore(s store.Store) Option { - return func(o *Options) { - o.Store = s +// LockTTL sets the lock ttl +func LockTTL(t time.Duration) LockOption { + return func(o *LockOptions) { + o.TTL = t } } -// WithTime sets the time implementation option -func WithTime(t time.Time) Option { - return func(o *Options) { - o.Time = t +// LockWait sets the wait time +func LockWait(t time.Duration) LockOption { + return func(o *LockOptions) { + o.Wait = t } } diff --git a/sync/sync.go b/sync/sync.go index 69d0c410..94d62043 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -2,40 +2,52 @@ package sync import ( - "github.com/micro/go-micro/v2/store" - "github.com/micro/go-micro/v2/sync/leader" - "github.com/micro/go-micro/v2/sync/lock" - "github.com/micro/go-micro/v2/sync/task" - "github.com/micro/go-micro/v2/sync/time" + "errors" + "time" ) -// Map provides synchronized access to key-value storage. -// It uses the store interface and lock interface to -// provide a consistent storage mechanism. -type Map interface { - // Read value with given key - Read(key, val interface{}) error - // Write value with given key - Write(key, val interface{}) error - // Delete value with given key - Delete(key interface{}) error - // Iterate over all key/vals. Value changes are saved - Iterate(func(key, val interface{}) error) error +var ( + ErrLockTimeout = errors.New("lock timeout") +) + +// Sync is an interface for synchronization +type Sync interface { + // Initialise options + Init(...Option) error + // Return the options + Options() Options + // Elect a leader + Leader(id string, opts ...LeaderOption) (Leader, error) + // Lock acquires a lock + Lock(id string, opts ...LockOption) error + // Unlock releases a lock + Unlock(id string) error + // Sync implementation + String() string } -// Cron is a distributed scheduler using leader election -// and distributed task runners. It uses the leader and -// task interfaces. -type Cron interface { - Schedule(task.Schedule, task.Command) error +// Leader provides leadership election +type Leader interface { + // resign leadership + Resign() error + // status returns when leadership is lost + Status() chan bool } type Options struct { - Leader leader.Leader - Lock lock.Lock - Store store.Store - Task task.Task - Time time.Time + Nodes []string + Prefix string } type Option func(o *Options) + +type LeaderOptions struct {} + +type LeaderOption func(o *LeaderOptions) + +type LockOptions struct { + TTL time.Duration + Wait time.Duration +} + +type LockOption func(o *LockOptions) diff --git a/sync/task/broker/broker.go b/sync/task/broker/broker.go deleted file mode 100644 index feedb0ec..00000000 --- a/sync/task/broker/broker.go +++ /dev/null @@ -1,227 +0,0 @@ -// Package broker provides a distributed task manager built on the micro broker -package broker - -import ( - "context" - "errors" - "fmt" - "math/rand" - "strings" - "sync" - "time" - - "github.com/google/uuid" - "github.com/micro/go-micro/v2/broker" - "github.com/micro/go-micro/v2/sync/task" -) - -type brokerKey struct{} - -// Task is a broker task -type Task struct { - // a micro broker - Broker broker.Broker - // Options - Options task.Options - - mtx sync.RWMutex - status string -} - -func returnError(err error, ch chan error) { - select { - case ch <- err: - default: - } -} - -func (t *Task) Run(c task.Command) error { - // connect - t.Broker.Connect() - // unique id for this runner - id := uuid.New().String() - // topic of the command - topic := fmt.Sprintf("task.%s", c.Name) - - // global error - errCh := make(chan error, t.Options.Pool) - - // subscribe for distributed work - workFn := func(p broker.Event) error { - msg := p.Message() - - // get command name - command := msg.Header["Command"] - - // check the command is what we expect - if command != c.Name { - returnError(errors.New("received unknown command: "+command), errCh) - return nil - } - - // new task created - switch msg.Header["Status"] { - case "start": - // artificially delay start of processing - time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100))) - - // execute the function - err := c.Func() - - status := "done" - errors := "" - - if err != nil { - status = "error" - errors = err.Error() - } - - // create response - msg := &broker.Message{ - Header: map[string]string{ - "Command": c.Name, - "Error": errors, - "Id": id, - "Status": status, - "Timestamp": fmt.Sprintf("%d", time.Now().Unix()), - }, - // Body is nil, may be used in future - } - - // publish end of task - if err := t.Broker.Publish(topic, msg); err != nil { - returnError(err, errCh) - } - } - - return nil - } - - // subscribe for the pool size - for i := 0; i < t.Options.Pool; i++ { - err := func() error { - // subscribe to work - subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i))) - if err != nil { - return err - } - - // unsubscribe on completion - defer subWork.Unsubscribe() - - return nil - }() - - if err != nil { - return err - } - } - - // subscribe to all status messages - subStatus, err := t.Broker.Subscribe(topic, func(p broker.Event) error { - msg := p.Message() - - // get command name - command := msg.Header["Command"] - - // check the command is what we expect - if command != c.Name { - return nil - } - - // check task status - switch msg.Header["Status"] { - // task is complete - case "done": - errCh <- nil - // someone failed - case "error": - returnError(errors.New(msg.Header["Error"]), errCh) - } - - return nil - }) - if err != nil { - return err - } - defer subStatus.Unsubscribe() - - // a new task - msg := &broker.Message{ - Header: map[string]string{ - "Command": c.Name, - "Id": id, - "Status": "start", - "Timestamp": fmt.Sprintf("%d", time.Now().Unix()), - }, - } - - // artificially delay the start of the task - time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100))) - - // publish the task - if err := t.Broker.Publish(topic, msg); err != nil { - return err - } - - var gerrors []string - - // wait for all responses - for i := 0; i < t.Options.Pool; i++ { - // check errors - err := <-errCh - - // append to errors - if err != nil { - gerrors = append(gerrors, err.Error()) - } - } - - // return the errors - if len(gerrors) > 0 { - return errors.New("errors: " + strings.Join(gerrors, "\n")) - } - - return nil -} - -func (t *Task) Status() string { - t.mtx.RLock() - defer t.mtx.RUnlock() - return t.status -} - -// Broker sets the micro broker -func WithBroker(b broker.Broker) task.Option { - return func(o *task.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, brokerKey{}, b) - } -} - -// NewTask returns a new broker task -func NewTask(opts ...task.Option) task.Task { - options := task.Options{ - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - if options.Pool == 0 { - options.Pool = 1 - } - - b, ok := options.Context.Value(brokerKey{}).(broker.Broker) - if !ok { - b = broker.DefaultBroker - } - - return &Task{ - Broker: b, - Options: options, - } -} diff --git a/sync/task/local/local.go b/sync/task/local/local.go deleted file mode 100644 index 5ce2631d..00000000 --- a/sync/task/local/local.go +++ /dev/null @@ -1,59 +0,0 @@ -// Package local provides a local task runner -package local - -import ( - "fmt" - "sync" - - "github.com/micro/go-micro/v2/sync/task" -) - -type localTask struct { - opts task.Options - mtx sync.RWMutex - status string -} - -func (l *localTask) Run(t task.Command) error { - ch := make(chan error, l.opts.Pool) - - for i := 0; i < l.opts.Pool; i++ { - go func() { - ch <- t.Execute() - }() - } - - var err error - - for i := 0; i < l.opts.Pool; i++ { - er := <-ch - if err != nil { - err = er - l.mtx.Lock() - l.status = fmt.Sprintf("command [%s] status: %s", t.Name, err.Error()) - l.mtx.Unlock() - } - } - - close(ch) - return err -} - -func (l *localTask) Status() string { - l.mtx.RLock() - defer l.mtx.RUnlock() - return l.status -} - -func NewTask(opts ...task.Option) task.Task { - var options task.Options - for _, o := range opts { - o(&options) - } - if options.Pool == 0 { - options.Pool = 1 - } - return &localTask{ - opts: options, - } -} diff --git a/sync/task/task.go b/sync/task/task.go deleted file mode 100644 index c9b9b5d6..00000000 --- a/sync/task/task.go +++ /dev/null @@ -1,85 +0,0 @@ -// Package task provides an interface for distributed jobs -package task - -import ( - "context" - "fmt" - "time" -) - -// Task represents a distributed task -type Task interface { - // Run runs a command immediately until completion - Run(Command) error - // Status provides status of last execution - Status() string -} - -// Command to be executed -type Command struct { - Name string - Func func() error -} - -// Schedule represents a time or interval at which a task should run -type Schedule struct { - // When to start the schedule. Zero time means immediately - Time time.Time - // Non zero interval dictates an ongoing schedule - Interval time.Duration -} - -type Options struct { - // Pool size for workers - Pool int - // Alternative options - Context context.Context -} - -type Option func(o *Options) - -func (c Command) Execute() error { - return c.Func() -} - -func (c Command) String() string { - return c.Name -} - -func (s Schedule) Run() <-chan time.Time { - d := s.Time.Sub(time.Now()) - - ch := make(chan time.Time, 1) - - go func() { - // wait for start time - <-time.After(d) - - // zero interval - if s.Interval == time.Duration(0) { - ch <- time.Now() - close(ch) - return - } - - // start ticker - ticker := time.NewTicker(s.Interval) - defer ticker.Stop() - for t := range ticker.C { - ch <- t - } - }() - - return ch -} - -func (s Schedule) String() string { - return fmt.Sprintf("%d-%d", s.Time.Unix(), s.Interval) -} - -// WithPool sets the pool size for concurrent work -func WithPool(i int) Option { - return func(o *Options) { - o.Pool = i - } -} diff --git a/sync/time/local/local.go b/sync/time/local/local.go deleted file mode 100644 index 6be36de7..00000000 --- a/sync/time/local/local.go +++ /dev/null @@ -1,18 +0,0 @@ -// Package local provides a local clock -package local - -import ( - gotime "time" - - "github.com/micro/go-micro/v2/sync/time" -) - -type Time struct{} - -func (t *Time) Now() (gotime.Time, error) { - return gotime.Now(), nil -} - -func NewTime(opts ...time.Option) time.Time { - return new(Time) -} diff --git a/sync/time/ntp/ntp.go b/sync/time/ntp/ntp.go deleted file mode 100644 index 91cf5862..00000000 --- a/sync/time/ntp/ntp.go +++ /dev/null @@ -1,51 +0,0 @@ -// Package ntp provides ntp synchronized time -package ntp - -import ( - "context" - gotime "time" - - "github.com/beevik/ntp" - "github.com/micro/go-micro/v2/sync/time" -) - -type ntpTime struct { - server string -} - -type ntpServerKey struct{} - -func (n *ntpTime) Now() (gotime.Time, error) { - return ntp.Time(n.server) -} - -// NewTime returns ntp time -func NewTime(opts ...time.Option) time.Time { - options := time.Options{ - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - server := "time.google.com" - - if k, ok := options.Context.Value(ntpServerKey{}).(string); ok { - server = k - } - - return &ntpTime{ - server: server, - } -} - -// WithServer sets the ntp server -func WithServer(s string) time.Option { - return func(o *time.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, ntpServerKey{}, s) - } -} diff --git a/sync/time/time.go b/sync/time/time.go deleted file mode 100644 index 7c11a751..00000000 --- a/sync/time/time.go +++ /dev/null @@ -1,18 +0,0 @@ -// Package time provides clock synchronization -package time - -import ( - "context" - "time" -) - -// Time returns synchronized time -type Time interface { - Now() (time.Time, error) -} - -type Options struct { - Context context.Context -} - -type Option func(o *Options)