diff --git a/sync/README.md b/sync/README.md new file mode 100644 index 00000000..94349374 --- /dev/null +++ b/sync/README.md @@ -0,0 +1,141 @@ +# Sync + +Sync is a synchronization library for distributed systems. + +## Overview + +Distributed systems by their very nature are decoupled and independent. In most cases they must honour 2 out of 3 letters of the CAP theorem +e.g Availability and Partitional tolerance but sacrificing consistency. In the case of microservices we often offload this concern to +an external database or eventing system. Go Sync provides a framework for synchronization which can be used in the application by the developer. + +## Getting Started + +- [Data](#data) - simple distributed data storage +- [Leader](#leader) - leadership election for group coordination +- [Lock](#lock) - distributed locking for exclusive resource access +- [Task](#task) - distributed job execution +- [Time](#time) - provides synchronized time + +## Lock + +The Lock interface provides distributed locking. Multiple instances attempting to lock the same id will block until available. + +```go +import "github.com/micro/go-micro/sync/lock/consul" + +lock := consul.NewLock() + +// acquire lock +err := lock.Acquire("id") +// handle err + +// release lock +err = lock.Release("id") +// handle err +``` + +## Leader + +Leader provides leadership election. Useful where one node needs to coordinate some action. + +```go +import ( + "github.com/micro/go-micro/sync/leader" + "github.com/micro/go-micro/sync/leader/consul" +) + +l := consul.NewLeader( + leader.Group("name"), +) + +// elect leader +e, err := l.Elect("id") +// handle err + + +// operate while leader +revoked := e.Revoked() + +for { + select { + case <-revoked: + // re-elect + e.Elect("id") + default: + // leader operation + } +} + +// resign leadership +e.Resign() +``` + +## Data + +Data provides a simple interface for distributed data storage. + +```go +import ( + "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/sync/data/consul" +) + +keyval := consul.NewData() + +err := keyval.Write(&data.Record{ + Key: "foo", + Value: []byte(`bar`), +}) +// handle err + +v, err := keyval.Read("foo") +// handle err + +err = keyval.Delete("foo") +``` + +## Task + +Task provides distributed job execution. It's a simple way to distribute work across a coordinated pool of workers. + +```go +import ( + "github.com/micro/go-micro/sync/task" + "github.com/micro/go-micro/sync/task/local" +) + +t := local.NewTask( + task.WithPool(10), +) + +err := t.Run(task.Command{ + Name: "atask", + Func: func() error { + // exec some work + return nil + }, +}) + +if err != nil { + // do something +} +``` + +## Time + +Time provides synchronized time. Local machines may have clock skew and time cannot be guaranteed to be the same everywhere. +Synchronized Time allows you to decide how time is defined for your applications. + +```go +import ( + "github.com/micro/go-micro/sync/time/ntp" +) + + +t := ntp.NewTime() +time, err := t.Now() +``` + +## TODO + +- Event package - strongly consistent event stream e.g kafka diff --git a/sync/cron.go b/sync/cron.go new file mode 100644 index 00000000..52ef20d5 --- /dev/null +++ b/sync/cron.go @@ -0,0 +1,93 @@ +package sync + +import ( + "fmt" + "math" + "time" + + "github.com/micro/go-log" + "github.com/micro/go-micro/sync/leader/consul" + "github.com/micro/go-micro/sync/task" + "github.com/micro/go-micro/sync/task/local" +) + +type syncCron struct { + opts Options +} + +func backoff(attempts int) time.Duration { + if attempts == 0 { + return time.Duration(0) + } + return time.Duration(math.Pow(10, float64(attempts))) * time.Millisecond +} + +func (c *syncCron) Schedule(s task.Schedule, t task.Command) error { + id := fmt.Sprintf("%s-%s", s.String(), t.String()) + + go func() { + // run the scheduler + tc := s.Run() + + var i int + + for { + // leader election + e, err := c.opts.Leader.Elect(id) + if err != nil { + log.Logf("[cron] leader election error: %v", err) + time.Sleep(backoff(i)) + i++ + continue + } + + i = 0 + r := e.Revoked() + + // execute the task + Tick: + for { + select { + // schedule tick + case _, ok := <-tc: + // ticked once + if !ok { + break Tick + } + + log.Logf("[cron] executing command %s", t.Name) + if err := c.opts.Task.Run(t); err != nil { + log.Logf("[cron] error executing command %s: %v", t.Name, err) + } + // leader revoked + case <-r: + break Tick + } + } + + // resign + e.Resign() + } + }() + + return nil +} + +func NewCron(opts ...Option) Cron { + var options Options + for _, o := range opts { + o(&options) + } + + if options.Leader == nil { + options.Leader = consul.NewLeader() + } + + if options.Task == nil { + options.Task = local.NewTask() + } + + return &syncCron{ + opts: options, + } +} diff --git a/sync/data/consul/consul.go b/sync/data/consul/consul.go new file mode 100644 index 00000000..9d2c7a93 --- /dev/null +++ b/sync/data/consul/consul.go @@ -0,0 +1,93 @@ +// Package consul is a consul implementation of kv +package consul + +import ( + "fmt" + "net" + + "github.com/hashicorp/consul/api" + "github.com/micro/go-micro/sync/data" +) + +type ckv struct { + client *api.Client +} + +func (c *ckv) Read(key string) (*data.Record, error) { + keyval, _, err := c.client.KV().Get(key, nil) + if err != nil { + return nil, err + } + + if keyval == nil { + return nil, data.ErrNotFound + } + + return &data.Record{ + Key: keyval.Key, + Value: keyval.Value, + }, nil +} + +func (c *ckv) Delete(key string) error { + _, err := c.client.KV().Delete(key, nil) + return err +} + +func (c *ckv) Write(record *data.Record) error { + _, err := c.client.KV().Put(&api.KVPair{ + Key: record.Key, + Value: record.Value, + }, nil) + return err +} + +func (c *ckv) Dump() ([]*data.Record, error) { + keyval, _, err := c.client.KV().List("/", nil) + if err != nil { + return nil, err + } + if keyval == nil { + return nil, data.ErrNotFound + } + var vals []*data.Record + for _, keyv := range keyval { + vals = append(vals, &data.Record{ + Key: keyv.Key, + Value: keyv.Value, + }) + } + return vals, nil +} + +func (c *ckv) String() string { + return "consul" +} + +func NewData(opts ...data.Option) data.Data { + var options data.Options + for _, o := range opts { + o(&options) + } + + config := api.DefaultConfig() + + // set host + // config.Host something + // check if there are any addrs + if len(options.Nodes) > 0 { + addr, port, err := net.SplitHostPort(options.Nodes[0]) + if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { + port = "8500" + config.Address = fmt.Sprintf("%s:%s", options.Nodes[0], port) + } else if err == nil { + config.Address = fmt.Sprintf("%s:%s", addr, port) + } + } + + client, _ := api.NewClient(config) + + return &ckv{ + client: client, + } +} diff --git a/sync/data/data.go b/sync/data/data.go new file mode 100644 index 00000000..20e4a4c8 --- /dev/null +++ b/sync/data/data.go @@ -0,0 +1,32 @@ +// Package data is an interface for key-value storage. +package data + +import ( + "errors" + "time" +) + +var ( + ErrNotFound = errors.New("not found") +) + +// Data is a data storage interface +type Data interface { + // Dump the known records + Dump() ([]*Record, error) + // Read a record with key + Read(key string) (*Record, error) + // Write a record + Write(r *Record) error + // Delete a record with key + Delete(key string) error +} + +// Record represents a data record +type Record struct { + Key string + Value []byte + Expiration time.Duration +} + +type Option func(o *Options) diff --git a/sync/data/etcd/etcd.go b/sync/data/etcd/etcd.go new file mode 100644 index 00000000..fa0db751 --- /dev/null +++ b/sync/data/etcd/etcd.go @@ -0,0 +1,93 @@ +// Package etcd is an etcd v3 implementation of kv +package etcd + +import ( + "context" + "log" + + "github.com/micro/go-micro/sync/data" + client "go.etcd.io/etcd/clientv3" +) + +type ekv struct { + kv client.KV +} + +func (e *ekv) Read(key string) (*data.Record, error) { + keyval, err := e.kv.Get(context.Background(), key) + if err != nil { + return nil, err + } + + if keyval == nil || len(keyval.Kvs) == 0 { + return nil, data.ErrNotFound + } + + return &data.Record{ + Key: string(keyval.Kvs[0].Key), + Value: keyval.Kvs[0].Value, + }, nil +} + +func (e *ekv) Delete(key string) error { + _, err := e.kv.Delete(context.Background(), key) + return err +} + +func (e *ekv) Write(record *data.Record) error { + _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) + return err +} + +func (e *ekv) Dump() ([]*data.Record, error) { + keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix()) + if err != nil { + return nil, err + } + var vals []*data.Record + if keyval == nil || len(keyval.Kvs) == 0 { + return vals, nil + } + for _, keyv := range keyval.Kvs { + vals = append(vals, &data.Record{ + Key: string(keyv.Key), + Value: keyv.Value, + }) + } + return vals, nil +} + +func (e *ekv) String() string { + return "etcd" +} + +func NewData(opts ...data.Option) data.Data { + var options data.Options + for _, o := range opts { + o(&options) + } + + var endpoints []string + + for _, addr := range options.Nodes { + if len(addr) > 0 { + endpoints = append(endpoints, addr) + } + } + + if len(endpoints) == 0 { + endpoints = []string{"http://127.0.0.1:2379"} + } + + // TODO: parse addresses + c, err := client.New(client.Config{ + Endpoints: endpoints, + }) + if err != nil { + log.Fatal(err) + } + + return &ekv{ + kv: client.NewKV(c), + } +} diff --git a/sync/data/memcached/memcached.go b/sync/data/memcached/memcached.go new file mode 100644 index 00000000..69cdd19b --- /dev/null +++ b/sync/data/memcached/memcached.go @@ -0,0 +1,178 @@ +package memcached + +import ( + "bufio" + "bytes" + "fmt" + "io" + "net" + "strings" + "time" + + mc "github.com/bradfitz/gomemcache/memcache" + "github.com/micro/go-micro/sync/data" +) + +type mkv struct { + Server *mc.ServerList + Client *mc.Client +} + +func (m *mkv) Read(key string) (*data.Record, error) { + keyval, err := m.Client.Get(key) + if err != nil && err == mc.ErrCacheMiss { + return nil, data.ErrNotFound + } else if err != nil { + return nil, err + } + + if keyval == nil { + return nil, data.ErrNotFound + } + + return &data.Record{ + Key: keyval.Key, + Value: keyval.Value, + Expiration: time.Second * time.Duration(keyval.Expiration), + }, nil +} + +func (m *mkv) Delete(key string) error { + return m.Client.Delete(key) +} + +func (m *mkv) Write(record *data.Record) error { + return m.Client.Set(&mc.Item{ + Key: record.Key, + Value: record.Value, + Expiration: int32(record.Expiration.Seconds()), + }) +} + +func (m *mkv) Dump() ([]*data.Record, error) { + // stats + // cachedump + // get keys + + var keys []string + + //data := make(map[string]string) + if err := m.Server.Each(func(c net.Addr) error { + cc, err := net.Dial("tcp", c.String()) + if err != nil { + return err + } + defer cc.Close() + + b := bufio.NewReadWriter(bufio.NewReader(cc), bufio.NewWriter(cc)) + + // get records + if _, err := fmt.Fprintf(b, "stats records\r\n"); err != nil { + return err + } + + b.Flush() + + v, err := b.ReadSlice('\n') + if err != nil { + return err + } + + parts := bytes.Split(v, []byte("\n")) + if len(parts) < 1 { + return nil + } + vals := strings.Split(string(parts[0]), ":") + records := vals[1] + + // drain + for { + buf, err := b.ReadSlice('\n') + if err == io.EOF { + break + } + if err != nil { + return err + } + if strings.HasPrefix(string(buf), "END") { + break + } + } + + b.Writer.Reset(cc) + b.Reader.Reset(cc) + + if _, err := fmt.Fprintf(b, "lru_crawler metadump %s\r\n", records); err != nil { + return err + } + b.Flush() + + for { + v, err := b.ReadString('\n') + if err == io.EOF { + break + } + if err != nil { + return err + } + if strings.HasPrefix(v, "END") { + break + } + key := strings.Split(v, " ")[0] + keys = append(keys, strings.TrimPrefix(key, "key=")) + } + + return nil + }); err != nil { + return nil, err + } + + var vals []*data.Record + + // concurrent op + ch := make(chan *data.Record, len(keys)) + + for _, k := range keys { + go func(key string) { + i, _ := m.Read(key) + ch <- i + }(k) + } + + for i := 0; i < len(keys); i++ { + record := <-ch + + if record == nil { + continue + } + + vals = append(vals, record) + } + + close(ch) + + return vals, nil +} + +func (m *mkv) String() string { + return "memcached" +} + +func NewData(opts ...data.Option) data.Data { + var options data.Options + for _, o := range opts { + o(&options) + } + + if len(options.Nodes) == 0 { + options.Nodes = []string{"127.0.0.1:11211"} + } + + ss := new(mc.ServerList) + ss.SetServers(options.Nodes...) + + return &mkv{ + Server: ss, + Client: mc.New(options.Nodes...), + } +} diff --git a/sync/data/options.go b/sync/data/options.go new file mode 100644 index 00000000..1d85aca3 --- /dev/null +++ b/sync/data/options.go @@ -0,0 +1,19 @@ +package data + +type Options struct { + Nodes []string + Prefix string +} + +func Nodes(a ...string) Option { + return func(o *Options) { + o.Nodes = a + } +} + +// Prefix sets a prefix to any lock ids used +func Prefix(p string) Option { + return func(o *Options) { + o.Prefix = p + } +} diff --git a/sync/data/redis/redis.go b/sync/data/redis/redis.go new file mode 100644 index 00000000..05f05529 --- /dev/null +++ b/sync/data/redis/redis.go @@ -0,0 +1,82 @@ +package redis + +import ( + "github.com/micro/go-micro/sync/data" + redis "gopkg.in/redis.v3" +) + +type rkv struct { + Client *redis.Client +} + +func (r *rkv) Read(key string) (*data.Record, error) { + val, err := r.Client.Get(key).Bytes() + + if err != nil && err == redis.Nil { + return nil, data.ErrNotFound + } else if err != nil { + return nil, err + } + + if val == nil { + return nil, data.ErrNotFound + } + + d, err := r.Client.TTL(key).Result() + if err != nil { + return nil, err + } + + return &data.Record{ + Key: key, + Value: val, + Expiration: d, + }, nil +} + +func (r *rkv) Delete(key string) error { + return r.Client.Del(key).Err() +} + +func (r *rkv) Write(record *data.Record) error { + return r.Client.Set(record.Key, record.Value, record.Expiration).Err() +} + +func (r *rkv) Dump() ([]*data.Record, error) { + keys, err := r.Client.Keys("*").Result() + if err != nil { + return nil, err + } + var vals []*data.Record + for _, k := range keys { + i, err := r.Read(k) + if err != nil { + return nil, err + } + vals = append(vals, i) + } + return vals, nil +} + +func (r *rkv) String() string { + return "redis" +} + +func NewData(opts ...data.Option) data.Data { + var options data.Options + for _, o := range opts { + o(&options) + } + + if len(options.Nodes) == 0 { + options.Nodes = []string{"127.0.0.1:6379"} + } + + return &rkv{ + Client: redis.NewClient(&redis.Options{ + Addr: options.Nodes[0], + Password: "", // no password set + DB: 0, // use default DB + }), + } +} diff --git a/sync/db.go b/sync/db.go new file mode 100644 index 00000000..a16af598 --- /dev/null +++ b/sync/db.go @@ -0,0 +1,157 @@ +package sync + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + + "github.com/micro/go-micro/sync/data" + ckv "github.com/micro/go-micro/sync/data/consul" + lock "github.com/micro/go-micro/sync/lock/consul" +) + +type syncDB struct { + opts Options +} + +func ekey(k interface{}) string { + b, _ := json.Marshal(k) + return base64.StdEncoding.EncodeToString(b) +} + +func (m *syncDB) Read(key, val interface{}) error { + if key == nil { + return fmt.Errorf("key is nil") + } + + kstr := ekey(key) + + // lock + if err := m.opts.Lock.Acquire(kstr); err != nil { + return err + } + defer m.opts.Lock.Release(kstr) + + // get key + kval, err := m.opts.Data.Read(kstr) + if err != nil { + return err + } + + // decode value + return json.Unmarshal(kval.Value, val) +} + +func (m *syncDB) Write(key, val interface{}) error { + if key == nil { + return fmt.Errorf("key is nil") + } + + kstr := ekey(key) + + // lock + if err := m.opts.Lock.Acquire(kstr); err != nil { + return err + } + defer m.opts.Lock.Release(kstr) + + // encode value + b, err := json.Marshal(val) + if err != nil { + return err + } + + // set key + return m.opts.Data.Write(&data.Record{ + Key: kstr, + Value: b, + }) +} + +func (m *syncDB) Delete(key interface{}) error { + if key == nil { + return fmt.Errorf("key is nil") + } + + kstr := ekey(key) + + // lock + if err := m.opts.Lock.Acquire(kstr); err != nil { + return err + } + defer m.opts.Lock.Release(kstr) + return m.opts.Data.Delete(kstr) +} + +func (m *syncDB) Iterate(fn func(key, val interface{}) error) error { + keyvals, err := m.opts.Data.Dump() + if err != nil { + return err + } + + for _, keyval := range keyvals { + // lock + if err := m.opts.Lock.Acquire(keyval.Key); err != nil { + return err + } + // unlock + defer m.opts.Lock.Release(keyval.Key) + + // unmarshal value + var val interface{} + + if len(keyval.Value) > 0 && keyval.Value[0] == '{' { + if err := json.Unmarshal(keyval.Value, &val); err != nil { + return err + } + } else { + val = keyval.Value + } + + // exec func + if err := fn(keyval.Key, val); err != nil { + return err + } + + // save val + b, err := json.Marshal(val) + if err != nil { + return err + } + + // no save + if i := bytes.Compare(keyval.Value, b); i == 0 { + return nil + } + + // set key + if err := m.opts.Data.Write(&data.Record{ + Key: keyval.Key, + Value: b, + }); err != nil { + return err + } + } + + return nil +} + +func NewDB(opts ...Option) DB { + var options Options + for _, o := range opts { + o(&options) + } + + if options.Lock == nil { + options.Lock = lock.NewLock() + } + + if options.Data == nil { + options.Data = ckv.NewData() + } + + return &syncDB{ + opts: options, + } +} diff --git a/sync/event/event.go b/sync/event/event.go new file mode 100644 index 00000000..bd85d16c --- /dev/null +++ b/sync/event/event.go @@ -0,0 +1,27 @@ +// Package event provides a distributed log interface +package event + +// Event provides a distributed log interface +type Event interface { + // Log retrieves the log with an id/name + Log(id string) (Log, error) +} + +// Log is an individual event log +type Log interface { + // Close the log handle + Close() error + // Log ID + Id() string + // Read will read the next record + Read() (*Record, error) + // Go to an offset + Seek(offset int64) error + // Write an event to the log + Write(*Record) error +} + +type Record struct { + Metadata map[string]interface{} + Data []byte +} diff --git a/sync/leader/consul/consul.go b/sync/leader/consul/consul.go new file mode 100644 index 00000000..b5b344c6 --- /dev/null +++ b/sync/leader/consul/consul.go @@ -0,0 +1,158 @@ +package consul + +import ( + "fmt" + "log" + "net" + "os" + "path" + "sync" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" + "github.com/micro/go-micro/sync/leader" +) + +type consulLeader struct { + opts leader.Options + c *api.Client +} + +type consulElected struct { + c *api.Client + l *api.Lock + id string + key string + opts leader.ElectOptions + + mtx sync.RWMutex + rv <-chan struct{} +} + +func (c *consulLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) { + var options leader.ElectOptions + for _, o := range opts { + o(&options) + } + + key := path.Join("micro/leader", c.opts.Group) + + lc, err := c.c.LockOpts(&api.LockOptions{ + Key: key, + Value: []byte(id), + }) + if err != nil { + return nil, err + } + + rv, err := lc.Lock(nil) + if err != nil { + return nil, err + } + + return &consulElected{ + c: c.c, + key: key, + rv: rv, + id: id, + l: lc, + opts: options, + }, nil +} + +func (c *consulLeader) Follow() chan string { + ch := make(chan string, 1) + + key := path.Join("/micro/leader", c.opts.Group) + + p, err := watch.Parse(map[string]interface{}{ + "type": "key", + "key": key, + }) + if err != nil { + return ch + } + p.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(*api.KVPair) + if !ok || v == nil { + return // ignore + } + ch <- string(v.Value) + } + + go p.RunWithClientAndLogger(c.c, log.New(os.Stdout, "consul: ", log.Lshortfile)) + return ch +} + +func (c *consulLeader) String() string { + return "consul" +} + +func (c *consulElected) Id() string { + return c.id +} + +func (c *consulElected) Reelect() error { + rv, err := c.l.Lock(nil) + if err != nil { + return err + } + + c.mtx.Lock() + c.rv = rv + c.mtx.Unlock() + return nil +} + +func (c *consulElected) Revoked() chan bool { + ch := make(chan bool, 1) + c.mtx.RLock() + rv := c.rv + c.mtx.RUnlock() + + go func() { + <-rv + ch <- true + close(ch) + }() + + return ch +} + +func (c *consulElected) Resign() error { + return c.l.Unlock() +} + +func NewLeader(opts ...leader.Option) leader.Leader { + options := leader.Options{ + Group: "default", + } + for _, o := range opts { + o(&options) + } + + config := api.DefaultConfig() + + // set host + // config.Host something + // check if there are any addrs + if len(options.Nodes) > 0 { + addr, port, err := net.SplitHostPort(options.Nodes[0]) + if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { + port = "8500" + config.Address = fmt.Sprintf("%s:%s", addr, port) + } else if err == nil { + config.Address = fmt.Sprintf("%s:%s", addr, port) + } + } + + client, _ := api.NewClient(config) + + return &consulLeader{ + opts: options, + c: client, + } +} diff --git a/sync/leader/etcd/etcd.go b/sync/leader/etcd/etcd.go new file mode 100644 index 00000000..603c698f --- /dev/null +++ b/sync/leader/etcd/etcd.go @@ -0,0 +1,145 @@ +package etcd + +import ( + "context" + "log" + "path" + "strings" + + "github.com/micro/go-micro/sync/leader" + client "go.etcd.io/etcd/clientv3" + cc "go.etcd.io/etcd/clientv3/concurrency" +) + +type etcdLeader struct { + opts leader.Options + path string + client *client.Client +} + +type etcdElected struct { + s *cc.Session + e *cc.Election + id string +} + +func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) { + var options leader.ElectOptions + for _, o := range opts { + o(&options) + } + + // make path + path := path.Join(e.path, strings.Replace(id, "/", "-", -1)) + + s, err := cc.NewSession(e.client) + if err != nil { + return nil, err + } + + l := cc.NewElection(s, path) + + ctx, _ := context.WithCancel(context.Background()) + + if err := l.Campaign(ctx, id); err != nil { + return nil, err + } + + return &etcdElected{ + e: l, + id: id, + }, nil +} + +func (e *etcdLeader) Follow() chan string { + ch := make(chan string) + + s, err := cc.NewSession(e.client) + if err != nil { + return ch + } + + l := cc.NewElection(s, e.path) + ech := l.Observe(context.Background()) + + go func() { + for { + select { + case r, ok := <-ech: + if !ok { + return + } + ch <- string(r.Kvs[0].Value) + } + } + }() + + return ch +} + +func (e *etcdLeader) String() string { + return "etcd" +} + +func (e *etcdElected) Reelect() error { + ctx, _ := context.WithCancel(context.Background()) + return e.e.Campaign(ctx, e.id) +} + +func (e *etcdElected) Revoked() chan bool { + ch := make(chan bool, 1) + ech := e.e.Observe(context.Background()) + + go func() { + for r := range ech { + if string(r.Kvs[0].Value) != e.id { + ch <- true + close(ch) + return + } + } + }() + + return ch +} + +func (e *etcdElected) Resign() error { + return e.e.Resign(context.Background()) +} + +func (e *etcdElected) Id() string { + return e.id +} + +func NewLeader(opts ...leader.Option) leader.Leader { + var options leader.Options + for _, o := range opts { + o(&options) + } + + var endpoints []string + + for _, addr := range options.Nodes { + if len(addr) > 0 { + endpoints = append(endpoints, addr) + } + } + + if len(endpoints) == 0 { + endpoints = []string{"http://127.0.0.1:2379"} + } + + // TODO: parse addresses + c, err := client.New(client.Config{ + Endpoints: endpoints, + }) + if err != nil { + log.Fatal(err) + } + + return &etcdLeader{ + path: "/micro/leader", + client: c, + opts: options, + } +} diff --git a/sync/leader/leader.go b/sync/leader/leader.go new file mode 100644 index 00000000..a4ac53f3 --- /dev/null +++ b/sync/leader/leader.go @@ -0,0 +1,25 @@ +// Package leader provides leader election +package leader + +// Leader provides leadership election +type Leader interface { + // elect leader + Elect(id string, opts ...ElectOption) (Elected, error) + // follow the leader + Follow() chan string +} + +type Elected interface { + // id of leader + Id() string + // seek re-election + Reelect() error + // resign leadership + Resign() error + // observe leadership revocation + Revoked() chan bool +} + +type Option func(o *Options) + +type ElectOption func(o *ElectOptions) diff --git a/sync/leader/options.go b/sync/leader/options.go new file mode 100644 index 00000000..05e92e91 --- /dev/null +++ b/sync/leader/options.go @@ -0,0 +1,22 @@ +package leader + +type Options struct { + Nodes []string + Group string +} + +type ElectOptions struct{} + +// Nodes sets the addresses of the underlying systems +func Nodes(a ...string) Option { + return func(o *Options) { + o.Nodes = a + } +} + +// Group sets the group name for coordinating leadership +func Group(g string) Option { + return func(o *Options) { + o.Group = g + } +} diff --git a/sync/lock/consul/consul.go b/sync/lock/consul/consul.go new file mode 100644 index 00000000..ef0f2dd4 --- /dev/null +++ b/sync/lock/consul/consul.go @@ -0,0 +1,104 @@ +// Package consul is a consul implemenation of lock +package consul + +import ( + "errors" + "fmt" + "net" + "sync" + "time" + + "github.com/hashicorp/consul/api" + lock "github.com/micro/go-micro/sync/lock" +) + +type consulLock struct { + sync.Mutex + + locks map[string]*api.Lock + opts lock.Options + c *api.Client +} + +func (c *consulLock) Acquire(id string, opts ...lock.AcquireOption) error { + var options lock.AcquireOptions + for _, o := range opts { + o(&options) + } + + if options.Wait <= time.Duration(0) { + options.Wait = api.DefaultLockWaitTime + } + + ttl := fmt.Sprintf("%v", options.TTL) + if options.TTL <= time.Duration(0) { + ttl = api.DefaultLockSessionTTL + } + + l, err := c.c.LockOpts(&api.LockOptions{ + Key: c.opts.Prefix + id, + LockWaitTime: options.Wait, + SessionTTL: ttl, + }) + + if err != nil { + return err + } + + _, err = l.Lock(nil) + if err != nil { + return err + } + + c.Lock() + c.locks[id] = l + c.Unlock() + + return nil +} + +func (c *consulLock) Release(id string) error { + c.Lock() + defer c.Unlock() + l, ok := c.locks[id] + if !ok { + return errors.New("lock not found") + } + err := l.Unlock() + delete(c.locks, id) + return err +} + +func (c *consulLock) String() string { + return "consul" +} + +func NewLock(opts ...lock.Option) lock.Lock { + var options lock.Options + for _, o := range opts { + o(&options) + } + + config := api.DefaultConfig() + + // set host + // config.Host something + // check if there are any addrs + if len(options.Nodes) > 0 { + addr, port, err := net.SplitHostPort(options.Nodes[0]) + if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { + port = "8500" + config.Address = fmt.Sprintf("%s:%s", options.Nodes[0], port) + } else if err == nil { + config.Address = fmt.Sprintf("%s:%s", addr, port) + } + } + + client, _ := api.NewClient(config) + + return &consulLock{ + locks: make(map[string]*api.Lock), + opts: options, + c: client, + } +} diff --git a/sync/lock/etcd/etcd.go b/sync/lock/etcd/etcd.go new file mode 100644 index 00000000..d506e7b5 --- /dev/null +++ b/sync/lock/etcd/etcd.go @@ -0,0 +1,115 @@ +// Package etcd is an etcd implementation of lock +package etcd + +import ( + "context" + "errors" + "log" + "path" + "strings" + "sync" + + "github.com/micro/go-micro/sync/lock" + client "go.etcd.io/etcd/clientv3" + cc "go.etcd.io/etcd/clientv3/concurrency" +) + +type etcdLock struct { + opts lock.Options + path string + client *client.Client + + sync.Mutex + locks map[string]*elock +} + +type elock struct { + s *cc.Session + m *cc.Mutex +} + +func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error { + var options lock.AcquireOptions + for _, o := range opts { + o(&options) + } + + // make path + path := path.Join(e.path, strings.Replace(e.opts.Prefix+id, "/", "-", -1)) + + var sopts []cc.SessionOption + if options.TTL > 0 { + sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds()))) + } + + s, err := cc.NewSession(e.client, sopts...) + if err != nil { + return err + } + + m := cc.NewMutex(s, path) + + ctx, _ := context.WithCancel(context.Background()) + + if err := m.Lock(ctx); err != nil { + return err + } + + e.Lock() + e.locks[id] = &elock{ + s: s, + m: m, + } + e.Unlock() + return nil +} + +func (e *etcdLock) Release(id string) error { + e.Lock() + defer e.Unlock() + v, ok := e.locks[id] + if !ok { + return errors.New("lock not found") + } + err := v.m.Unlock(context.Background()) + delete(e.locks, id) + return err +} + +func (e *etcdLock) String() string { + return "etcd" +} + +func NewLock(opts ...lock.Option) lock.Lock { + var options lock.Options + for _, o := range opts { + o(&options) + } + + var endpoints []string + + for _, addr := range options.Nodes { + if len(addr) > 0 { + endpoints = append(endpoints, addr) + } + } + + if len(endpoints) == 0 { + endpoints = []string{"http://127.0.0.1:2379"} + } + + // TODO: parse addresses + c, err := client.New(client.Config{ + Endpoints: endpoints, + }) + if err != nil { + log.Fatal(err) + } + + return &etcdLock{ + path: "/micro/lock", + client: c, + opts: options, + locks: make(map[string]*elock), + } +} diff --git a/sync/lock/lock.go b/sync/lock/lock.go new file mode 100644 index 00000000..8be6629f --- /dev/null +++ b/sync/lock/lock.go @@ -0,0 +1,27 @@ +// Package lock provides distributed locking +package lock + +import ( + "time" +) + +// Lock is a distributed locking interface +type Lock interface { + // Acquire a lock with given id + Acquire(id string, opts ...AcquireOption) error + // Release the lock with given id + Release(id string) error +} + +type Options struct { + Nodes []string + Prefix string +} + +type AcquireOptions struct { + TTL time.Duration + Wait time.Duration +} + +type Option func(o *Options) +type AcquireOption func(o *AcquireOptions) diff --git a/sync/lock/options.go b/sync/lock/options.go new file mode 100644 index 00000000..4804ff9c --- /dev/null +++ b/sync/lock/options.go @@ -0,0 +1,33 @@ +package lock + +import ( + "time" +) + +// Nodes sets the addresses the underlying lock implementation +func Nodes(a ...string) Option { + return func(o *Options) { + o.Nodes = a + } +} + +// Prefix sets a prefix to any lock ids used +func Prefix(p string) Option { + return func(o *Options) { + o.Prefix = p + } +} + +// TTL sets the lock ttl +func TTL(t time.Duration) AcquireOption { + return func(o *AcquireOptions) { + o.TTL = t + } +} + +// Wait sets the wait time +func Wait(t time.Duration) AcquireOption { + return func(o *AcquireOptions) { + o.Wait = t + } +} diff --git a/sync/lock/redis/pool.go b/sync/lock/redis/pool.go new file mode 100644 index 00000000..bc80a0b9 --- /dev/null +++ b/sync/lock/redis/pool.go @@ -0,0 +1,29 @@ +package redis + +import ( + "sync" + + "github.com/gomodule/redigo/redis" +) + +type pool struct { + sync.Mutex + i int + addrs []string +} + +func (p *pool) Get() redis.Conn { + for i := 0; i < 3; i++ { + p.Lock() + addr := p.addrs[p.i%len(p.addrs)] + p.i++ + p.Unlock() + + c, err := redis.Dial("tcp", addr) + if err != nil { + continue + } + return c + } + return nil +} diff --git a/sync/lock/redis/redis.go b/sync/lock/redis/redis.go new file mode 100644 index 00000000..2ea4a676 --- /dev/null +++ b/sync/lock/redis/redis.go @@ -0,0 +1,94 @@ +// Package redis is a redis implemenation of lock +package redis + +import ( + "errors" + "sync" + "time" + + "github.com/go-redsync/redsync" + "github.com/micro/go-micro/sync/lock" +) + +type redisLock struct { + sync.Mutex + + locks map[string]*redsync.Mutex + opts lock.Options + c *redsync.Redsync +} + +func (r *redisLock) Acquire(id string, opts ...lock.AcquireOption) error { + var options lock.AcquireOptions + for _, o := range opts { + o(&options) + } + + var ropts []redsync.Option + + if options.Wait > time.Duration(0) { + ropts = append(ropts, redsync.SetRetryDelay(options.Wait)) + ropts = append(ropts, redsync.SetTries(1)) + } + + if options.TTL > time.Duration(0) { + ropts = append(ropts, redsync.SetExpiry(options.TTL)) + } + + m := r.c.NewMutex(r.opts.Prefix+id, ropts...) + err := m.Lock() + if err != nil { + return err + } + + r.Lock() + r.locks[id] = m + r.Unlock() + + return nil +} + +func (r *redisLock) Release(id string) error { + r.Lock() + defer r.Unlock() + m, ok := r.locks[id] + if !ok { + return errors.New("lock not found") + } + + unlocked := m.Unlock() + delete(r.locks, id) + + if !unlocked { + return errors.New("lock not unlocked") + } + + return nil +} + +func (r *redisLock) String() string { + return "redis" +} + +func NewLock(opts ...lock.Option) lock.Lock { + var options lock.Options + for _, o := range opts { + o(&options) + } + + nodes := options.Nodes + + if len(nodes) == 0 { + nodes = []string{"127.0.0.1:6379"} + } + + rpool := redsync.New([]redsync.Pool{&pool{ + addrs: nodes, + }}) + + return &redisLock{ + locks: make(map[string]*redsync.Mutex), + opts: options, + c: rpool, + } +} diff --git a/sync/options.go b/sync/options.go new file mode 100644 index 00000000..8b46acf0 --- /dev/null +++ b/sync/options.go @@ -0,0 +1,36 @@ +package sync + +import ( + "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/sync/leader" + "github.com/micro/go-micro/sync/lock" + "github.com/micro/go-micro/sync/time" +) + +// WithLeader sets the leader election implementation opton +func WithLeader(l leader.Leader) Option { + return func(o *Options) { + o.Leader = l + } +} + +// WithLock sets the locking implementation option +func WithLock(l lock.Lock) Option { + return func(o *Options) { + o.Lock = l + } +} + +// WithData sets the data implementation option +func WithData(s data.Data) Option { + return func(o *Options) { + o.Data = s + } +} + +// WithTime sets the time implementation option +func WithTime(t time.Time) Option { + return func(o *Options) { + o.Time = t + } +} diff --git a/sync/sync.go b/sync/sync.go new file mode 100644 index 00000000..7cb63c51 --- /dev/null +++ b/sync/sync.go @@ -0,0 +1,41 @@ +// Package sync is a distributed synchronization framework +package sync + +import ( + "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/sync/leader" + "github.com/micro/go-micro/sync/lock" + "github.com/micro/go-micro/sync/task" + "github.com/micro/go-micro/sync/time" +) + +// DB provides synchronized access to key-value storage. +// It uses the data interface and lock interface to +// provide a consistent storage mechanism. +type DB interface { + // Read value with given key + Read(key, val interface{}) error + // Write value with given key + Write(key, val interface{}) error + // Delete value with given key + Delete(key interface{}) error + // Iterate over all key/vals. Value changes are saved + Iterate(func(key, val interface{}) error) error +} + +// Cron is a distributed scheduler using leader election +// and distributed task runners. It uses the leader and +// task interfaces. +type Cron interface { + Schedule(task.Schedule, task.Command) error +} + +type Options struct { + Leader leader.Leader + Lock lock.Lock + Data data.Data + Task task.Task + Time time.Time +} + +type Option func(o *Options) diff --git a/sync/task/broker/broker.go b/sync/task/broker/broker.go new file mode 100644 index 00000000..af0ee673 --- /dev/null +++ b/sync/task/broker/broker.go @@ -0,0 +1,219 @@ +// Package broker provides a distributed task manager built on the micro broker +package broker + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/sync/task" +) + +type brokerKey struct{} + +// Task is a broker task +type Task struct { + // a micro broker + Broker broker.Broker + // Options + Options task.Options + + mtx sync.RWMutex + status string +} + +func returnError(err error, ch chan error) { + select { + case ch <- err: + default: + } +} + +func (t *Task) Run(c task.Command) error { + // connect + t.Broker.Connect() + // unique id for this runner + id := uuid.New().String() + // topic of the command + topic := fmt.Sprintf("task.%s", c.Name) + + // global error + errCh := make(chan error, t.Options.Pool) + + // subscribe for distributed work + workFn := func(p broker.Publication) error { + msg := p.Message() + + // get command name + command := msg.Header["Command"] + + // check the command is what we expect + if command != c.Name { + returnError(errors.New("received unknown command: "+command), errCh) + return nil + } + + // new task created + switch msg.Header["Status"] { + case "start": + // artificially delay start of processing + time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100))) + + // execute the function + err := c.Func() + + status := "done" + errors := "" + + if err != nil { + status = "error" + errors = err.Error() + } + + // create response + msg := &broker.Message{ + Header: map[string]string{ + "Command": c.Name, + "Error": errors, + "Id": id, + "Status": status, + "Timestamp": fmt.Sprintf("%d", time.Now().Unix()), + }, + // Body is nil, may be used in future + } + + // publish end of task + if err := t.Broker.Publish(topic, msg); err != nil { + returnError(err, errCh) + } + } + + return nil + } + + // subscribe for the pool size + for i := 0; i < t.Options.Pool; i++ { + // subscribe to work + subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i))) + if err != nil { + return err + } + + // unsubscribe on completion + defer subWork.Unsubscribe() + } + + // subscribe to all status messages + subStatus, err := t.Broker.Subscribe(topic, func(p broker.Publication) error { + msg := p.Message() + + // get command name + command := msg.Header["Command"] + + // check the command is what we expect + if command != c.Name { + return nil + } + + // check task status + switch msg.Header["Status"] { + // task is complete + case "done": + errCh <- nil + // someone failed + case "error": + returnError(errors.New(msg.Header["Error"]), errCh) + } + + return nil + }) + if err != nil { + return err + } + defer subStatus.Unsubscribe() + + // a new task + msg := &broker.Message{ + Header: map[string]string{ + "Command": c.Name, + "Id": id, + "Status": "start", + "Timestamp": fmt.Sprintf("%d", time.Now().Unix()), + }, + } + + // artificially delay the start of the task + time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100))) + + // publish the task + if err := t.Broker.Publish(topic, msg); err != nil { + return err + } + + var gerrors []string + + // wait for all responses + for i := 0; i < t.Options.Pool; i++ { + // check errors + err := <-errCh + + // append to errors + if err != nil { + gerrors = append(gerrors, err.Error()) + } + } + + // return the errors + if len(gerrors) > 0 { + return errors.New("errors: " + strings.Join(gerrors, "\n")) + } + + return nil +} + +func (t *Task) Status() string { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.status +} + +// Broker sets the micro broker +func WithBroker(b broker.Broker) task.Option { + return func(o *task.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, brokerKey{}, b) + } +} + +// NewTask returns a new broker task +func NewTask(opts ...task.Option) task.Task { + options := task.Options{ + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + if options.Pool == 0 { + options.Pool = 1 + } + + b, ok := options.Context.Value(brokerKey{}).(broker.Broker) + if !ok { + b = broker.DefaultBroker + } + + return &Task{ + Broker: b, + Options: options, + } +} diff --git a/sync/task/local/local.go b/sync/task/local/local.go new file mode 100644 index 00000000..94aa7707 --- /dev/null +++ b/sync/task/local/local.go @@ -0,0 +1,59 @@ +// Package local provides a local task runner +package local + +import ( + "fmt" + "sync" + + "github.com/micro/go-micro/sync/task" +) + +type localTask struct { + opts task.Options + mtx sync.RWMutex + status string +} + +func (l *localTask) Run(t task.Command) error { + ch := make(chan error, l.opts.Pool) + + for i := 0; i < l.opts.Pool; i++ { + go func() { + ch <- t.Execute() + }() + } + + var err error + + for i := 0; i < l.opts.Pool; i++ { + er := <-ch + if err != nil { + err = er + l.mtx.Lock() + l.status = fmt.Sprintf("command [%s] status: %s", t.Name, err.Error()) + l.mtx.Unlock() + } + } + + close(ch) + return err +} + +func (l *localTask) Status() string { + l.mtx.RLock() + defer l.mtx.RUnlock() + return l.status +} + +func NewTask(opts ...task.Option) task.Task { + var options task.Options + for _, o := range opts { + o(&options) + } + if options.Pool == 0 { + options.Pool = 1 + } + return &localTask{ + opts: options, + } +} diff --git a/sync/task/task.go b/sync/task/task.go new file mode 100644 index 00000000..031355ca --- /dev/null +++ b/sync/task/task.go @@ -0,0 +1,83 @@ +// Package task provides an interface for distributed jobs +package task + +import ( + "context" + "fmt" + "time" +) + +// Task represents a distributed task +type Task interface { + // Run runs a command immediately until completion + Run(Command) error + // Status provides status of last execution + Status() string +} + +// Command to be executed +type Command struct { + Name string + Func func() error +} + +// Schedule represents a time or interval at which a task should run +type Schedule struct { + // When to start the schedule. Zero time means immediately + Time time.Time + // Non zero interval dictates an ongoing schedule + Interval time.Duration +} + +type Options struct { + // Pool size for workers + Pool int + // Alternative options + Context context.Context +} + +type Option func(o *Options) + +func (c Command) Execute() error { + return c.Func() +} + +func (c Command) String() string { + return c.Name +} + +func (s Schedule) Run() <-chan time.Time { + d := s.Time.Sub(time.Now()) + + ch := make(chan time.Time, 1) + + go func() { + // wait for start time + <-time.After(d) + + // zero interval + if s.Interval == time.Duration(0) { + ch <- time.Now() + close(ch) + return + } + + // start ticker + for t := range time.Tick(s.Interval) { + ch <- t + } + }() + + return ch +} + +func (s Schedule) String() string { + return fmt.Sprintf("%d-%d", s.Time.Unix(), s.Interval) +} + +// WithPool sets the pool size for concurrent work +func WithPool(i int) Option { + return func(o *Options) { + o.Pool = i + } +} diff --git a/sync/time/local/local.go b/sync/time/local/local.go new file mode 100644 index 00000000..a3d899c1 --- /dev/null +++ b/sync/time/local/local.go @@ -0,0 +1,18 @@ +// Package local provides a local clock +package local + +import ( + gotime "time" + + "github.com/micro/go-micro/sync/time" +) + +type Time struct{} + +func (t *Time) Now() (gotime.Time, error) { + return gotime.Now(), nil +} + +func NewTime(opts ...time.Option) time.Time { + return new(Time) +} diff --git a/sync/time/ntp/ntp.go b/sync/time/ntp/ntp.go new file mode 100644 index 00000000..2de6a852 --- /dev/null +++ b/sync/time/ntp/ntp.go @@ -0,0 +1,51 @@ +// Package ntp provides ntp synchronized time +package ntp + +import ( + "context" + gotime "time" + + "github.com/beevik/ntp" + "github.com/micro/go-micro/sync/time" +) + +type ntpTime struct { + server string +} + +type ntpServerKey struct{} + +func (n *ntpTime) Now() (gotime.Time, error) { + return ntp.Time(n.server) +} + +// NewTime returns ntp time +func NewTime(opts ...time.Option) time.Time { + options := time.Options{ + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + server := "time.google.com" + + if k, ok := options.Context.Value(ntpServerKey{}).(string); ok { + server = k + } + + return &ntpTime{ + server: server, + } +} + +// WithServer sets the ntp server +func WithServer(s string) time.Option { + return func(o *time.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, ntpServerKey{}, s) + } +} diff --git a/sync/time/time.go b/sync/time/time.go new file mode 100644 index 00000000..7c11a751 --- /dev/null +++ b/sync/time/time.go @@ -0,0 +1,18 @@ +// Package time provides clock synchronization +package time + +import ( + "context" + "time" +) + +// Time returns synchronized time +type Time interface { + Now() (time.Time, error) +} + +type Options struct { + Context context.Context +} + +type Option func(o *Options)