diff --git a/server/rpc_server.go b/server/rpc_server.go index b1db38be..6caca4e3 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -40,7 +40,7 @@ func newRpcServer(opts ...Option) Server { return &rpcServer{ opts: options, - router: DefaultRouter, + router: router, handlers: make(map[string]Handler), subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error), diff --git a/sync/data/consul/consul.go b/store/consul/consul.go similarity index 59% rename from sync/data/consul/consul.go rename to store/consul/consul.go index 9d2c7a93..9fa1634f 100644 --- a/sync/data/consul/consul.go +++ b/store/consul/consul.go @@ -6,24 +6,26 @@ import ( "net" "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/options" + "github.com/micro/go-micro/store" ) type ckv struct { + options.Options client *api.Client } -func (c *ckv) Read(key string) (*data.Record, error) { +func (c *ckv) Read(key string) (*store.Record, error) { keyval, _, err := c.client.KV().Get(key, nil) if err != nil { return nil, err } if keyval == nil { - return nil, data.ErrNotFound + return nil, store.ErrNotFound } - return &data.Record{ + return &store.Record{ Key: keyval.Key, Value: keyval.Value, }, nil @@ -34,7 +36,7 @@ func (c *ckv) Delete(key string) error { return err } -func (c *ckv) Write(record *data.Record) error { +func (c *ckv) Write(record *store.Record) error { _, err := c.client.KV().Put(&api.KVPair{ Key: record.Key, Value: record.Value, @@ -42,17 +44,17 @@ func (c *ckv) Write(record *data.Record) error { return err } -func (c *ckv) Dump() ([]*data.Record, error) { +func (c *ckv) Dump() ([]*store.Record, error) { keyval, _, err := c.client.KV().List("/", nil) if err != nil { return nil, err } if keyval == nil { - return nil, data.ErrNotFound + return nil, store.ErrNotFound } - var vals []*data.Record + var vals []*store.Record for _, keyv := range keyval { - vals = append(vals, &data.Record{ + vals = append(vals, &store.Record{ Key: keyv.Key, Value: keyv.Value, }) @@ -64,22 +66,22 @@ func (c *ckv) String() string { return "consul" } -func NewData(opts ...data.Option) data.Data { - var options data.Options - for _, o := range opts { - o(&options) - } - +func NewStore(opts ...options.Option) store.Store { + options := options.NewOptions(opts...) config := api.DefaultConfig() + var nodes []string + + if n, ok := options.Values().Get("store.nodes"); ok { + nodes = n.([]string) + } + // set host - // config.Host something - // check if there are any addrs - if len(options.Nodes) > 0 { - addr, port, err := net.SplitHostPort(options.Nodes[0]) + if len(nodes) > 0 { + addr, port, err := net.SplitHostPort(nodes[0]) if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { port = "8500" - config.Address = fmt.Sprintf("%s:%s", options.Nodes[0], port) + config.Address = fmt.Sprintf("%s:%s", nodes[0], port) } else if err == nil { config.Address = fmt.Sprintf("%s:%s", addr, port) } @@ -88,6 +90,7 @@ func NewData(opts ...data.Option) data.Data { client, _ := api.NewClient(config) return &ckv{ - client: client, + Options: options, + client: client, } } diff --git a/store/memory/memory.go b/store/memory/memory.go new file mode 100644 index 00000000..4c1d6c8a --- /dev/null +++ b/store/memory/memory.go @@ -0,0 +1,97 @@ +// Package memory is a in-memory store store +package memory + +import ( + "sync" + "time" + + "github.com/micro/go-micro/options" + "github.com/micro/go-micro/store" +) + +type memoryStore struct { + options.Options + + sync.RWMutex + values map[string]*memoryRecord +} + +type memoryRecord struct { + r *store.Record + c time.Time +} + +func (m *memoryStore) Dump() ([]*store.Record, error) { + m.RLock() + defer m.RUnlock() + + var values []*store.Record + + for _, v := range m.values { + // get expiry + d := v.r.Expiry + t := time.Since(v.c) + + // expired + if d > time.Duration(0) && t > d { + continue + } + values = append(values, v.r) + } + + return values, nil +} + +func (m *memoryStore) Read(key string) (*store.Record, error) { + m.RLock() + defer m.RUnlock() + + v, ok := m.values[key] + if !ok { + return nil, store.ErrNotFound + } + + // get expiry + d := v.r.Expiry + t := time.Since(v.c) + + // expired + if d > time.Duration(0) && t > d { + return nil, store.ErrNotFound + } + + return v.r, nil +} + +func (m *memoryStore) Write(r *store.Record) error { + m.Lock() + defer m.Unlock() + + // set the record + m.values[r.Key] = &memoryRecord{ + r: r, + c: time.Now(), + } + + return nil +} + +func (m *memoryStore) Delete(key string) error { + m.Lock() + defer m.Unlock() + + // delete the value + delete(m.values, key) + + return nil +} + +// NewStore returns a new store.Store +func NewStore(opts ...options.Option) store.Store { + options := options.NewOptions(opts...) + + return &memoryStore{ + Options: options, + values: make(map[string]*memoryRecord), + } +} diff --git a/store/options.go b/store/options.go new file mode 100644 index 00000000..2f63e582 --- /dev/null +++ b/store/options.go @@ -0,0 +1,15 @@ +package store + +import ( + "github.com/micro/go-micro/options" +) + +// Set the nodes used to back the store +func Nodes(a ...string) options.Option { + return options.WithValue("store.nodes", a) +} + +// Prefix sets a prefix to any key ids used +func Prefix(p string) options.Option { + return options.WithValue("store.prefix", p) +} diff --git a/sync/data/data.go b/store/store.go similarity index 57% rename from sync/data/data.go rename to store/store.go index 20e4a4c8..e8c17eae 100644 --- a/sync/data/data.go +++ b/store/store.go @@ -1,17 +1,21 @@ -// Package data is an interface for key-value storage. -package data +// Package store is an interface for distribute data storage. +package store import ( "errors" "time" + + "github.com/micro/go-micro/options" ) var ( ErrNotFound = errors.New("not found") ) -// Data is a data storage interface -type Data interface { +// Store is a data storage interface +type Store interface { + // embed options + options.Options // Dump the known records Dump() ([]*Record, error) // Read a record with key @@ -24,9 +28,7 @@ type Data interface { // Record represents a data record type Record struct { - Key string - Value []byte - Expiration time.Duration + Key string + Value []byte + Expiry time.Duration } - -type Option func(o *Options) diff --git a/sync/README.md b/sync/README.md index 94349374..d69c2060 100644 --- a/sync/README.md +++ b/sync/README.md @@ -10,7 +10,6 @@ an external database or eventing system. Go Sync provides a framework for synchr ## Getting Started -- [Data](#data) - simple distributed data storage - [Leader](#leader) - leadership election for group coordination - [Lock](#lock) - distributed locking for exclusive resource access - [Task](#task) - distributed job execution @@ -70,30 +69,6 @@ for { e.Resign() ``` -## Data - -Data provides a simple interface for distributed data storage. - -```go -import ( - "github.com/micro/go-micro/sync/data" - "github.com/micro/go-micro/sync/data/consul" -) - -keyval := consul.NewData() - -err := keyval.Write(&data.Record{ - Key: "foo", - Value: []byte(`bar`), -}) -// handle err - -v, err := keyval.Read("foo") -// handle err - -err = keyval.Delete("foo") -``` - ## Task Task provides distributed job execution. It's a simple way to distribute work across a coordinated pool of workers. diff --git a/sync/data/options.go b/sync/data/options.go deleted file mode 100644 index 1d85aca3..00000000 --- a/sync/data/options.go +++ /dev/null @@ -1,19 +0,0 @@ -package data - -type Options struct { - Nodes []string - Prefix string -} - -func Nodes(a ...string) Option { - return func(o *Options) { - o.Nodes = a - } -} - -// Prefix sets a prefix to any lock ids used -func Prefix(p string) Option { - return func(o *Options) { - o.Prefix = p - } -} diff --git a/sync/db.go b/sync/map.go similarity index 75% rename from sync/db.go rename to sync/map.go index a16af598..2053c57d 100644 --- a/sync/db.go +++ b/sync/map.go @@ -6,12 +6,12 @@ import ( "encoding/json" "fmt" - "github.com/micro/go-micro/sync/data" - ckv "github.com/micro/go-micro/sync/data/consul" + "github.com/micro/go-micro/store" + ckv "github.com/micro/go-micro/store/consul" lock "github.com/micro/go-micro/sync/lock/consul" ) -type syncDB struct { +type syncMap struct { opts Options } @@ -20,7 +20,7 @@ func ekey(k interface{}) string { return base64.StdEncoding.EncodeToString(b) } -func (m *syncDB) Read(key, val interface{}) error { +func (m *syncMap) Read(key, val interface{}) error { if key == nil { return fmt.Errorf("key is nil") } @@ -34,7 +34,7 @@ func (m *syncDB) Read(key, val interface{}) error { defer m.opts.Lock.Release(kstr) // get key - kval, err := m.opts.Data.Read(kstr) + kval, err := m.opts.Store.Read(kstr) if err != nil { return err } @@ -43,7 +43,7 @@ func (m *syncDB) Read(key, val interface{}) error { return json.Unmarshal(kval.Value, val) } -func (m *syncDB) Write(key, val interface{}) error { +func (m *syncMap) Write(key, val interface{}) error { if key == nil { return fmt.Errorf("key is nil") } @@ -63,13 +63,13 @@ func (m *syncDB) Write(key, val interface{}) error { } // set key - return m.opts.Data.Write(&data.Record{ + return m.opts.Store.Write(&store.Record{ Key: kstr, Value: b, }) } -func (m *syncDB) Delete(key interface{}) error { +func (m *syncMap) Delete(key interface{}) error { if key == nil { return fmt.Errorf("key is nil") } @@ -81,11 +81,11 @@ func (m *syncDB) Delete(key interface{}) error { return err } defer m.opts.Lock.Release(kstr) - return m.opts.Data.Delete(kstr) + return m.opts.Store.Delete(kstr) } -func (m *syncDB) Iterate(fn func(key, val interface{}) error) error { - keyvals, err := m.opts.Data.Dump() +func (m *syncMap) Iterate(fn func(key, val interface{}) error) error { + keyvals, err := m.opts.Store.Dump() if err != nil { return err } @@ -126,7 +126,7 @@ func (m *syncDB) Iterate(fn func(key, val interface{}) error) error { } // set key - if err := m.opts.Data.Write(&data.Record{ + if err := m.opts.Store.Write(&store.Record{ Key: keyval.Key, Value: b, }); err != nil { @@ -137,7 +137,7 @@ func (m *syncDB) Iterate(fn func(key, val interface{}) error) error { return nil } -func NewDB(opts ...Option) DB { +func NewMap(opts ...Option) Map { var options Options for _, o := range opts { o(&options) @@ -147,11 +147,11 @@ func NewDB(opts ...Option) DB { options.Lock = lock.NewLock() } - if options.Data == nil { - options.Data = ckv.NewData() + if options.Store == nil { + options.Store = ckv.NewStore() } - return &syncDB{ + return &syncMap{ opts: options, } } diff --git a/sync/options.go b/sync/options.go index 8b46acf0..0e5cf3d0 100644 --- a/sync/options.go +++ b/sync/options.go @@ -1,7 +1,7 @@ package sync import ( - "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/store" "github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/time" @@ -21,10 +21,10 @@ func WithLock(l lock.Lock) Option { } } -// WithData sets the data implementation option -func WithData(s data.Data) Option { +// WithStore sets the store implementation option +func WithStore(s store.Store) Option { return func(o *Options) { - o.Data = s + o.Store = s } } diff --git a/sync/sync.go b/sync/sync.go index 7cb63c51..7b080b1d 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -2,17 +2,17 @@ package sync import ( - "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/store" "github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/task" "github.com/micro/go-micro/sync/time" ) -// DB provides synchronized access to key-value storage. -// It uses the data interface and lock interface to +// Map provides synchronized access to key-value storage. +// It uses the store interface and lock interface to // provide a consistent storage mechanism. -type DB interface { +type Map interface { // Read value with given key Read(key, val interface{}) error // Write value with given key @@ -33,7 +33,7 @@ type Cron interface { type Options struct { Leader leader.Leader Lock lock.Lock - Data data.Data + Store store.Store Task task.Task Time time.Time }