From b5ca40a91ad89baba40bc7870fae90900367a0a8 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 3 Oct 2019 09:46:20 +0100 Subject: [PATCH] Move out consul sync/lock and store. Move data/store to store --- data/data.go | 2 - data/store/consul/consul.go | 96 ---------------- store/etcd/etcd.go | 91 ++++++++++++++++ {data/store => store}/memory/memory.go | 2 +- {data/store => store}/memory/memory_test.go | 2 +- {data/store => store}/options.go | 0 {data/store => store}/store.go | 0 sync/lock/consul/consul.go | 104 ------------------ sync/lock/etcd/etcd.go | 115 ++++++++++++++++++++ sync/map.go | 6 +- sync/options.go | 2 +- sync/sync.go | 2 +- 12 files changed, 213 insertions(+), 209 deletions(-) delete mode 100644 data/data.go delete mode 100644 data/store/consul/consul.go create mode 100644 store/etcd/etcd.go rename {data/store => store}/memory/memory.go (97%) rename {data/store => store}/memory/memory_test.go (93%) rename {data/store => store}/options.go (100%) rename {data/store => store}/store.go (100%) delete mode 100644 sync/lock/consul/consul.go create mode 100644 sync/lock/etcd/etcd.go diff --git a/data/data.go b/data/data.go deleted file mode 100644 index eb6157d2..00000000 --- a/data/data.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package data is an interface for data access -package data diff --git a/data/store/consul/consul.go b/data/store/consul/consul.go deleted file mode 100644 index 174d043c..00000000 --- a/data/store/consul/consul.go +++ /dev/null @@ -1,96 +0,0 @@ -// Package consul is a consul implementation of kv -package consul - -import ( - "fmt" - "net" - - "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/config/options" - "github.com/micro/go-micro/data/store" -) - -type ckv struct { - options.Options - client *api.Client -} - -func (c *ckv) Read(key string) (*store.Record, error) { - keyval, _, err := c.client.KV().Get(key, nil) - if err != nil { - return nil, err - } - - if keyval == nil { - return nil, store.ErrNotFound - } - - return &store.Record{ - Key: keyval.Key, - Value: keyval.Value, - }, nil -} - -func (c *ckv) Delete(key string) error { - _, err := c.client.KV().Delete(key, nil) - return err -} - -func (c *ckv) Write(record *store.Record) error { - _, err := c.client.KV().Put(&api.KVPair{ - Key: record.Key, - Value: record.Value, - }, nil) - return err -} - -func (c *ckv) Dump() ([]*store.Record, error) { - keyval, _, err := c.client.KV().List("/", nil) - if err != nil { - return nil, err - } - if keyval == nil { - return nil, store.ErrNotFound - } - var vals []*store.Record - for _, keyv := range keyval { - vals = append(vals, &store.Record{ - Key: keyv.Key, - Value: keyv.Value, - }) - } - return vals, nil -} - -func (c *ckv) String() string { - return "consul" -} - -func NewStore(opts ...options.Option) store.Store { - options := options.NewOptions(opts...) - config := api.DefaultConfig() - - var nodes []string - - if n, ok := options.Values().Get("store.nodes"); ok { - nodes = n.([]string) - } - - // set host - if len(nodes) > 0 { - addr, port, err := net.SplitHostPort(nodes[0]) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - port = "8500" - config.Address = fmt.Sprintf("%s:%s", nodes[0], port) - } else if err == nil { - config.Address = fmt.Sprintf("%s:%s", addr, port) - } - } - - client, _ := api.NewClient(config) - - return &ckv{ - Options: options, - client: client, - } -} diff --git a/store/etcd/etcd.go b/store/etcd/etcd.go new file mode 100644 index 00000000..32994f26 --- /dev/null +++ b/store/etcd/etcd.go @@ -0,0 +1,91 @@ +// Package etcd is an etcd v3 implementation of kv +package etcd + +import ( + "context" + "log" + + client "github.com/coreos/etcd/clientv3" + "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/store" +) + +type ekv struct { + options.Options + kv client.KV +} + +func (e *ekv) Read(key string) (*store.Record, error) { + keyval, err := e.kv.Get(context.Background(), key) + if err != nil { + return nil, err + } + + if keyval == nil || len(keyval.Kvs) == 0 { + return nil, store.ErrNotFound + } + + return &store.Record{ + Key: string(keyval.Kvs[0].Key), + Value: keyval.Kvs[0].Value, + }, nil +} + +func (e *ekv) Delete(key string) error { + _, err := e.kv.Delete(context.Background(), key) + return err +} + +func (e *ekv) Write(record *store.Record) error { + _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) + return err +} + +func (e *ekv) Dump() ([]*store.Record, error) { + keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix()) + if err != nil { + return nil, err + } + var vals []*store.Record + if keyval == nil || len(keyval.Kvs) == 0 { + return vals, nil + } + for _, keyv := range keyval.Kvs { + vals = append(vals, &store.Record{ + Key: string(keyv.Key), + Value: keyv.Value, + }) + } + return vals, nil +} + +func (e *ekv) String() string { + return "etcd" +} + +func NewStore(opts ...options.Option) store.Store { + options := options.NewOptions(opts...) + + var endpoints []string + + if e, ok := options.Values().Get("store.nodes"); ok { + endpoints = e.([]string) + } + + if len(endpoints) == 0 { + endpoints = []string{"http://127.0.0.1:2379"} + } + + // TODO: parse addresses + c, err := client.New(client.Config{ + Endpoints: endpoints, + }) + if err != nil { + log.Fatal(err) + } + + return &ekv{ + Options: options, + kv: client.NewKV(c), + } +} diff --git a/data/store/memory/memory.go b/store/memory/memory.go similarity index 97% rename from data/store/memory/memory.go rename to store/memory/memory.go index bb7892e8..c8e7dd53 100644 --- a/data/store/memory/memory.go +++ b/store/memory/memory.go @@ -6,7 +6,7 @@ import ( "time" "github.com/micro/go-micro/config/options" - "github.com/micro/go-micro/data/store" + "github.com/micro/go-micro/store" ) type memoryStore struct { diff --git a/data/store/memory/memory_test.go b/store/memory/memory_test.go similarity index 93% rename from data/store/memory/memory_test.go rename to store/memory/memory_test.go index 44483869..639d18db 100644 --- a/data/store/memory/memory_test.go +++ b/store/memory/memory_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/micro/go-micro/data/store" + "github.com/micro/go-micro/store" ) func TestReadRecordExpire(t *testing.T) { diff --git a/data/store/options.go b/store/options.go similarity index 100% rename from data/store/options.go rename to store/options.go diff --git a/data/store/store.go b/store/store.go similarity index 100% rename from data/store/store.go rename to store/store.go diff --git a/sync/lock/consul/consul.go b/sync/lock/consul/consul.go deleted file mode 100644 index ef0f2dd4..00000000 --- a/sync/lock/consul/consul.go +++ /dev/null @@ -1,104 +0,0 @@ -// Package consul is a consul implemenation of lock -package consul - -import ( - "errors" - "fmt" - "net" - "sync" - "time" - - "github.com/hashicorp/consul/api" - lock "github.com/micro/go-micro/sync/lock" -) - -type consulLock struct { - sync.Mutex - - locks map[string]*api.Lock - opts lock.Options - c *api.Client -} - -func (c *consulLock) Acquire(id string, opts ...lock.AcquireOption) error { - var options lock.AcquireOptions - for _, o := range opts { - o(&options) - } - - if options.Wait <= time.Duration(0) { - options.Wait = api.DefaultLockWaitTime - } - - ttl := fmt.Sprintf("%v", options.TTL) - if options.TTL <= time.Duration(0) { - ttl = api.DefaultLockSessionTTL - } - - l, err := c.c.LockOpts(&api.LockOptions{ - Key: c.opts.Prefix + id, - LockWaitTime: options.Wait, - SessionTTL: ttl, - }) - - if err != nil { - return err - } - - _, err = l.Lock(nil) - if err != nil { - return err - } - - c.Lock() - c.locks[id] = l - c.Unlock() - - return nil -} - -func (c *consulLock) Release(id string) error { - c.Lock() - defer c.Unlock() - l, ok := c.locks[id] - if !ok { - return errors.New("lock not found") - } - err := l.Unlock() - delete(c.locks, id) - return err -} - -func (c *consulLock) String() string { - return "consul" -} - -func NewLock(opts ...lock.Option) lock.Lock { - var options lock.Options - for _, o := range opts { - o(&options) - } - - config := api.DefaultConfig() - - // set host - // config.Host something - // check if there are any addrs - if len(options.Nodes) > 0 { - addr, port, err := net.SplitHostPort(options.Nodes[0]) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - port = "8500" - config.Address = fmt.Sprintf("%s:%s", options.Nodes[0], port) - } else if err == nil { - config.Address = fmt.Sprintf("%s:%s", addr, port) - } - } - - client, _ := api.NewClient(config) - - return &consulLock{ - locks: make(map[string]*api.Lock), - opts: options, - c: client, - } -} diff --git a/sync/lock/etcd/etcd.go b/sync/lock/etcd/etcd.go new file mode 100644 index 00000000..0b8ba823 --- /dev/null +++ b/sync/lock/etcd/etcd.go @@ -0,0 +1,115 @@ +// Package etcd is an etcd implementation of lock +package etcd + +import ( + "context" + "errors" + "log" + "path" + "strings" + "sync" + + client "github.com/coreos/etcd/clientv3" + cc "github.com/coreos/etcd/clientv3/concurrency" + "github.com/micro/go-micro/sync/lock" +) + +type etcdLock struct { + opts lock.Options + path string + client *client.Client + + sync.Mutex + locks map[string]*elock +} + +type elock struct { + s *cc.Session + m *cc.Mutex +} + +func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error { + var options lock.AcquireOptions + for _, o := range opts { + o(&options) + } + + // make path + path := path.Join(e.path, strings.Replace(e.opts.Prefix+id, "/", "-", -1)) + + var sopts []cc.SessionOption + if options.TTL > 0 { + sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds()))) + } + + s, err := cc.NewSession(e.client, sopts...) + if err != nil { + return err + } + + m := cc.NewMutex(s, path) + + ctx, _ := context.WithCancel(context.Background()) + + if err := m.Lock(ctx); err != nil { + return err + } + + e.Lock() + e.locks[id] = &elock{ + s: s, + m: m, + } + e.Unlock() + return nil +} + +func (e *etcdLock) Release(id string) error { + e.Lock() + defer e.Unlock() + v, ok := e.locks[id] + if !ok { + return errors.New("lock not found") + } + err := v.m.Unlock(context.Background()) + delete(e.locks, id) + return err +} + +func (e *etcdLock) String() string { + return "etcd" +} + +func NewLock(opts ...lock.Option) lock.Lock { + var options lock.Options + for _, o := range opts { + o(&options) + } + + var endpoints []string + + for _, addr := range options.Nodes { + if len(addr) > 0 { + endpoints = append(endpoints, addr) + } + } + + if len(endpoints) == 0 { + endpoints = []string{"http://127.0.0.1:2379"} + } + + // TODO: parse addresses + c, err := client.New(client.Config{ + Endpoints: endpoints, + }) + if err != nil { + log.Fatal(err) + } + + return &etcdLock{ + path: "/micro/lock", + client: c, + opts: options, + locks: make(map[string]*elock), + } +} diff --git a/sync/map.go b/sync/map.go index 5f7865e9..5913267b 100644 --- a/sync/map.go +++ b/sync/map.go @@ -6,9 +6,9 @@ import ( "encoding/json" "fmt" - "github.com/micro/go-micro/data/store" - ckv "github.com/micro/go-micro/data/store/consul" - lock "github.com/micro/go-micro/sync/lock/consul" + "github.com/micro/go-micro/store" + ckv "github.com/micro/go-micro/store/etcd" + lock "github.com/micro/go-micro/sync/lock/etcd" ) type syncMap struct { diff --git a/sync/options.go b/sync/options.go index 65f20e28..0e5cf3d0 100644 --- a/sync/options.go +++ b/sync/options.go @@ -1,7 +1,7 @@ package sync import ( - "github.com/micro/go-micro/data/store" + "github.com/micro/go-micro/store" "github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/time" diff --git a/sync/sync.go b/sync/sync.go index 27a6104a..7b080b1d 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -2,7 +2,7 @@ package sync import ( - "github.com/micro/go-micro/data/store" + "github.com/micro/go-micro/store" "github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/task"