micro/sync/map.go

167 lines
2.7 KiB
Go
Raw Permalink Normal View History

2019-05-31 02:43:23 +03:00
package sync
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"sort"
2019-05-31 02:43:23 +03:00
"github.com/micro/go-micro/v2/store"
ckv "github.com/micro/go-micro/v2/store/etcd"
lock "github.com/micro/go-micro/v2/sync/lock/etcd"
2019-05-31 02:43:23 +03:00
)
2019-06-11 20:21:33 +03:00
type syncMap struct {
2019-05-31 02:43:23 +03:00
opts Options
}
// ekey turns an arbitrary key into the string used for locking and storage:
// the key is JSON-encoded and the result is base64 (standard alphabet,
// padded) encoded.
//
// A JSON encoding failure is deliberately not reported; in that case the
// encoded bytes are nil and the returned string is empty.
func ekey(k interface{}) string {
	raw, _ := json.Marshal(k)

	enc := base64.StdEncoding
	out := make([]byte, enc.EncodedLen(len(raw)))
	enc.Encode(out, raw)

	return string(out)
}
2019-06-11 20:21:33 +03:00
func (m *syncMap) Read(key, val interface{}) error {
2019-05-31 02:43:23 +03:00
if key == nil {
return fmt.Errorf("key is nil")
}
kstr := ekey(key)
// lock
if err := m.opts.Lock.Acquire(kstr); err != nil {
return err
}
defer m.opts.Lock.Release(kstr)
// get key
2019-06-12 09:50:04 +03:00
kval, err := m.opts.Store.Read(kstr)
2019-05-31 02:43:23 +03:00
if err != nil {
return err
}
2019-10-11 16:08:50 +03:00
if len(kval) == 0 {
return store.ErrNotFound
}
2019-05-31 02:43:23 +03:00
// decode value
2019-10-11 16:08:50 +03:00
return json.Unmarshal(kval[0].Value, val)
2019-05-31 02:43:23 +03:00
}
2019-06-11 20:21:33 +03:00
func (m *syncMap) Write(key, val interface{}) error {
2019-05-31 02:43:23 +03:00
if key == nil {
return fmt.Errorf("key is nil")
}
kstr := ekey(key)
// lock
if err := m.opts.Lock.Acquire(kstr); err != nil {
return err
}
defer m.opts.Lock.Release(kstr)
// encode value
b, err := json.Marshal(val)
if err != nil {
return err
}
// set key
2019-06-12 09:50:04 +03:00
return m.opts.Store.Write(&store.Record{
2019-05-31 02:43:23 +03:00
Key: kstr,
Value: b,
})
}
2019-06-11 20:21:33 +03:00
func (m *syncMap) Delete(key interface{}) error {
2019-05-31 02:43:23 +03:00
if key == nil {
return fmt.Errorf("key is nil")
}
kstr := ekey(key)
// lock
if err := m.opts.Lock.Acquire(kstr); err != nil {
return err
}
defer m.opts.Lock.Release(kstr)
2019-06-12 09:50:04 +03:00
return m.opts.Store.Delete(kstr)
2019-05-31 02:43:23 +03:00
}
2019-06-11 20:21:33 +03:00
func (m *syncMap) Iterate(fn func(key, val interface{}) error) error {
keyvals, err := m.opts.Store.Read("", store.ReadPrefix())
2019-05-31 02:43:23 +03:00
if err != nil {
return err
}
sort.Slice(keyvals, func(i, j int) bool {
return keyvals[i].Key < keyvals[j].Key
})
2019-05-31 02:43:23 +03:00
for _, keyval := range keyvals {
// lock
if err := m.opts.Lock.Acquire(keyval.Key); err != nil {
return err
}
// unlock
defer m.opts.Lock.Release(keyval.Key)
// unmarshal value
var val interface{}
if len(keyval.Value) > 0 && keyval.Value[0] == '{' {
if err := json.Unmarshal(keyval.Value, &val); err != nil {
return err
}
} else {
val = keyval.Value
}
// exec func
if err := fn(keyval.Key, val); err != nil {
return err
}
// save val
b, err := json.Marshal(val)
if err != nil {
return err
}
// no save
if i := bytes.Compare(keyval.Value, b); i == 0 {
return nil
}
// set key
2019-06-12 09:50:04 +03:00
if err := m.opts.Store.Write(&store.Record{
2019-05-31 02:43:23 +03:00
Key: keyval.Key,
Value: b,
}); err != nil {
return err
}
}
return nil
}
2019-06-11 20:21:33 +03:00
func NewMap(opts ...Option) Map {
2019-05-31 02:43:23 +03:00
var options Options
for _, o := range opts {
o(&options)
}
if options.Lock == nil {
options.Lock = lock.NewLock()
}
2019-06-12 09:50:04 +03:00
if options.Store == nil {
options.Store = ckv.NewStore()
2019-05-31 02:43:23 +03:00
}
2019-06-11 20:21:33 +03:00
return &syncMap{
2019-05-31 02:43:23 +03:00
opts: options,
}
}