Move out consul sync/lock and store. Move data/store to store

This commit is contained in:
Asim Aslam 2019-10-03 09:46:20 +01:00
parent b81bb07afc
commit b5ca40a91a
12 changed files with 213 additions and 209 deletions

View File

@ -1,2 +0,0 @@
// Package data is an interface for data access
package data

View File

@ -1,96 +0,0 @@
// Package consul is a consul implementation of kv
package consul
import (
"fmt"
"net"
"github.com/hashicorp/consul/api"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/data/store"
)
type ckv struct {
options.Options
client *api.Client
}
func (c *ckv) Read(key string) (*store.Record, error) {
keyval, _, err := c.client.KV().Get(key, nil)
if err != nil {
return nil, err
}
if keyval == nil {
return nil, store.ErrNotFound
}
return &store.Record{
Key: keyval.Key,
Value: keyval.Value,
}, nil
}
func (c *ckv) Delete(key string) error {
_, err := c.client.KV().Delete(key, nil)
return err
}
func (c *ckv) Write(record *store.Record) error {
_, err := c.client.KV().Put(&api.KVPair{
Key: record.Key,
Value: record.Value,
}, nil)
return err
}
func (c *ckv) Dump() ([]*store.Record, error) {
keyval, _, err := c.client.KV().List("/", nil)
if err != nil {
return nil, err
}
if keyval == nil {
return nil, store.ErrNotFound
}
var vals []*store.Record
for _, keyv := range keyval {
vals = append(vals, &store.Record{
Key: keyv.Key,
Value: keyv.Value,
})
}
return vals, nil
}
func (c *ckv) String() string {
return "consul"
}
func NewStore(opts ...options.Option) store.Store {
options := options.NewOptions(opts...)
config := api.DefaultConfig()
var nodes []string
if n, ok := options.Values().Get("store.nodes"); ok {
nodes = n.([]string)
}
// set host
if len(nodes) > 0 {
addr, port, err := net.SplitHostPort(nodes[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
config.Address = fmt.Sprintf("%s:%s", nodes[0], port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}
client, _ := api.NewClient(config)
return &ckv{
Options: options,
client: client,
}
}

91
store/etcd/etcd.go Normal file
View File

@ -0,0 +1,91 @@
// Package etcd is an etcd v3 implementation of kv
package etcd
import (
"context"
"log"
client "github.com/coreos/etcd/clientv3"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
)
type ekv struct {
options.Options
kv client.KV
}
func (e *ekv) Read(key string) (*store.Record, error) {
keyval, err := e.kv.Get(context.Background(), key)
if err != nil {
return nil, err
}
if keyval == nil || len(keyval.Kvs) == 0 {
return nil, store.ErrNotFound
}
return &store.Record{
Key: string(keyval.Kvs[0].Key),
Value: keyval.Kvs[0].Value,
}, nil
}
func (e *ekv) Delete(key string) error {
_, err := e.kv.Delete(context.Background(), key)
return err
}
func (e *ekv) Write(record *store.Record) error {
_, err := e.kv.Put(context.Background(), record.Key, string(record.Value))
return err
}
func (e *ekv) Dump() ([]*store.Record, error) {
keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix())
if err != nil {
return nil, err
}
var vals []*store.Record
if keyval == nil || len(keyval.Kvs) == 0 {
return vals, nil
}
for _, keyv := range keyval.Kvs {
vals = append(vals, &store.Record{
Key: string(keyv.Key),
Value: keyv.Value,
})
}
return vals, nil
}
func (e *ekv) String() string {
return "etcd"
}
func NewStore(opts ...options.Option) store.Store {
options := options.NewOptions(opts...)
var endpoints []string
if e, ok := options.Values().Get("store.nodes"); ok {
endpoints = e.([]string)
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &ekv{
Options: options,
kv: client.NewKV(c),
}
}

View File

@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/data/store" "github.com/micro/go-micro/store"
) )
type memoryStore struct { type memoryStore struct {

View File

@ -4,7 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/micro/go-micro/data/store" "github.com/micro/go-micro/store"
) )
func TestReadRecordExpire(t *testing.T) { func TestReadRecordExpire(t *testing.T) {

View File

@ -1,104 +0,0 @@
// Package consul is a consul implemenation of lock
package consul
import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/hashicorp/consul/api"
lock "github.com/micro/go-micro/sync/lock"
)
type consulLock struct {
sync.Mutex
locks map[string]*api.Lock
opts lock.Options
c *api.Client
}
func (c *consulLock) Acquire(id string, opts ...lock.AcquireOption) error {
var options lock.AcquireOptions
for _, o := range opts {
o(&options)
}
if options.Wait <= time.Duration(0) {
options.Wait = api.DefaultLockWaitTime
}
ttl := fmt.Sprintf("%v", options.TTL)
if options.TTL <= time.Duration(0) {
ttl = api.DefaultLockSessionTTL
}
l, err := c.c.LockOpts(&api.LockOptions{
Key: c.opts.Prefix + id,
LockWaitTime: options.Wait,
SessionTTL: ttl,
})
if err != nil {
return err
}
_, err = l.Lock(nil)
if err != nil {
return err
}
c.Lock()
c.locks[id] = l
c.Unlock()
return nil
}
func (c *consulLock) Release(id string) error {
c.Lock()
defer c.Unlock()
l, ok := c.locks[id]
if !ok {
return errors.New("lock not found")
}
err := l.Unlock()
delete(c.locks, id)
return err
}
func (c *consulLock) String() string {
return "consul"
}
func NewLock(opts ...lock.Option) lock.Lock {
var options lock.Options
for _, o := range opts {
o(&options)
}
config := api.DefaultConfig()
// set host
// config.Host something
// check if there are any addrs
if len(options.Nodes) > 0 {
addr, port, err := net.SplitHostPort(options.Nodes[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
config.Address = fmt.Sprintf("%s:%s", options.Nodes[0], port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}
client, _ := api.NewClient(config)
return &consulLock{
locks: make(map[string]*api.Lock),
opts: options,
c: client,
}
}

115
sync/lock/etcd/etcd.go Normal file
View File

@ -0,0 +1,115 @@
// Package etcd is an etcd implementation of lock
package etcd
import (
"context"
"errors"
"log"
"path"
"strings"
"sync"
client "github.com/coreos/etcd/clientv3"
cc "github.com/coreos/etcd/clientv3/concurrency"
"github.com/micro/go-micro/sync/lock"
)
type etcdLock struct {
opts lock.Options
path string
client *client.Client
sync.Mutex
locks map[string]*elock
}
type elock struct {
s *cc.Session
m *cc.Mutex
}
func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error {
var options lock.AcquireOptions
for _, o := range opts {
o(&options)
}
// make path
path := path.Join(e.path, strings.Replace(e.opts.Prefix+id, "/", "-", -1))
var sopts []cc.SessionOption
if options.TTL > 0 {
sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds())))
}
s, err := cc.NewSession(e.client, sopts...)
if err != nil {
return err
}
m := cc.NewMutex(s, path)
ctx, _ := context.WithCancel(context.Background())
if err := m.Lock(ctx); err != nil {
return err
}
e.Lock()
e.locks[id] = &elock{
s: s,
m: m,
}
e.Unlock()
return nil
}
func (e *etcdLock) Release(id string) error {
e.Lock()
defer e.Unlock()
v, ok := e.locks[id]
if !ok {
return errors.New("lock not found")
}
err := v.m.Unlock(context.Background())
delete(e.locks, id)
return err
}
func (e *etcdLock) String() string {
return "etcd"
}
func NewLock(opts ...lock.Option) lock.Lock {
var options lock.Options
for _, o := range opts {
o(&options)
}
var endpoints []string
for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &etcdLock{
path: "/micro/lock",
client: c,
opts: options,
locks: make(map[string]*elock),
}
}

View File

@ -6,9 +6,9 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/micro/go-micro/data/store" "github.com/micro/go-micro/store"
ckv "github.com/micro/go-micro/data/store/consul" ckv "github.com/micro/go-micro/store/etcd"
lock "github.com/micro/go-micro/sync/lock/consul" lock "github.com/micro/go-micro/sync/lock/etcd"
) )
type syncMap struct { type syncMap struct {

View File

@ -1,7 +1,7 @@
package sync package sync
import ( import (
"github.com/micro/go-micro/data/store" "github.com/micro/go-micro/store"
"github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/leader"
"github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/lock"
"github.com/micro/go-micro/sync/time" "github.com/micro/go-micro/sync/time"

View File

@ -2,7 +2,7 @@
package sync package sync
import ( import (
"github.com/micro/go-micro/data/store" "github.com/micro/go-micro/store"
"github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/leader"
"github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/lock"
"github.com/micro/go-micro/sync/task" "github.com/micro/go-micro/sync/task"