Merge pull request #813 from micro/store
Move out consul sync/lock and store. Move data/store to store
This commit is contained in:
		| @@ -1,2 +0,0 @@ | |||||||
| // Package data is an interface for data access |  | ||||||
| package data |  | ||||||
| @@ -1,96 +0,0 @@ | |||||||
| // Package consul is a consul implementation of kv |  | ||||||
| package consul |  | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"fmt" |  | ||||||
| 	"net" |  | ||||||
|  |  | ||||||
| 	"github.com/hashicorp/consul/api" |  | ||||||
| 	"github.com/micro/go-micro/config/options" |  | ||||||
| 	"github.com/micro/go-micro/data/store" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| type ckv struct { |  | ||||||
| 	options.Options |  | ||||||
| 	client *api.Client |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *ckv) Read(key string) (*store.Record, error) { |  | ||||||
| 	keyval, _, err := c.client.KV().Get(key, nil) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if keyval == nil { |  | ||||||
| 		return nil, store.ErrNotFound |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return &store.Record{ |  | ||||||
| 		Key:   keyval.Key, |  | ||||||
| 		Value: keyval.Value, |  | ||||||
| 	}, nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *ckv) Delete(key string) error { |  | ||||||
| 	_, err := c.client.KV().Delete(key, nil) |  | ||||||
| 	return err |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *ckv) Write(record *store.Record) error { |  | ||||||
| 	_, err := c.client.KV().Put(&api.KVPair{ |  | ||||||
| 		Key:   record.Key, |  | ||||||
| 		Value: record.Value, |  | ||||||
| 	}, nil) |  | ||||||
| 	return err |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *ckv) Dump() ([]*store.Record, error) { |  | ||||||
| 	keyval, _, err := c.client.KV().List("/", nil) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	if keyval == nil { |  | ||||||
| 		return nil, store.ErrNotFound |  | ||||||
| 	} |  | ||||||
| 	var vals []*store.Record |  | ||||||
| 	for _, keyv := range keyval { |  | ||||||
| 		vals = append(vals, &store.Record{ |  | ||||||
| 			Key:   keyv.Key, |  | ||||||
| 			Value: keyv.Value, |  | ||||||
| 		}) |  | ||||||
| 	} |  | ||||||
| 	return vals, nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *ckv) String() string { |  | ||||||
| 	return "consul" |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func NewStore(opts ...options.Option) store.Store { |  | ||||||
| 	options := options.NewOptions(opts...) |  | ||||||
| 	config := api.DefaultConfig() |  | ||||||
|  |  | ||||||
| 	var nodes []string |  | ||||||
|  |  | ||||||
| 	if n, ok := options.Values().Get("store.nodes"); ok { |  | ||||||
| 		nodes = n.([]string) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// set host |  | ||||||
| 	if len(nodes) > 0 { |  | ||||||
| 		addr, port, err := net.SplitHostPort(nodes[0]) |  | ||||||
| 		if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { |  | ||||||
| 			port = "8500" |  | ||||||
| 			config.Address = fmt.Sprintf("%s:%s", nodes[0], port) |  | ||||||
| 		} else if err == nil { |  | ||||||
| 			config.Address = fmt.Sprintf("%s:%s", addr, port) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	client, _ := api.NewClient(config) |  | ||||||
|  |  | ||||||
| 	return &ckv{ |  | ||||||
| 		Options: options, |  | ||||||
| 		client:  client, |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
							
								
								
									
										91
									
								
								store/etcd/etcd.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										91
									
								
								store/etcd/etcd.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,91 @@ | |||||||
|  | // Package etcd is an etcd v3 implementation of kv | ||||||
|  | package etcd | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"log" | ||||||
|  |  | ||||||
|  | 	client "github.com/coreos/etcd/clientv3" | ||||||
|  | 	"github.com/micro/go-micro/config/options" | ||||||
|  | 	"github.com/micro/go-micro/store" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type ekv struct { | ||||||
|  | 	options.Options | ||||||
|  | 	kv client.KV | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (e *ekv) Read(key string) (*store.Record, error) { | ||||||
|  | 	keyval, err := e.kv.Get(context.Background(), key) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if keyval == nil || len(keyval.Kvs) == 0 { | ||||||
|  | 		return nil, store.ErrNotFound | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &store.Record{ | ||||||
|  | 		Key:   string(keyval.Kvs[0].Key), | ||||||
|  | 		Value: keyval.Kvs[0].Value, | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (e *ekv) Delete(key string) error { | ||||||
|  | 	_, err := e.kv.Delete(context.Background(), key) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (e *ekv) Write(record *store.Record) error { | ||||||
|  | 	_, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (e *ekv) Dump() ([]*store.Record, error) { | ||||||
|  | 	keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix()) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	var vals []*store.Record | ||||||
|  | 	if keyval == nil || len(keyval.Kvs) == 0 { | ||||||
|  | 		return vals, nil | ||||||
|  | 	} | ||||||
|  | 	for _, keyv := range keyval.Kvs { | ||||||
|  | 		vals = append(vals, &store.Record{ | ||||||
|  | 			Key:   string(keyv.Key), | ||||||
|  | 			Value: keyv.Value, | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | 	return vals, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (e *ekv) String() string { | ||||||
|  | 	return "etcd" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewStore(opts ...options.Option) store.Store { | ||||||
|  | 	options := options.NewOptions(opts...) | ||||||
|  |  | ||||||
|  | 	var endpoints []string | ||||||
|  |  | ||||||
|  | 	if e, ok := options.Values().Get("store.nodes"); ok { | ||||||
|  | 		endpoints = e.([]string) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(endpoints) == 0 { | ||||||
|  | 		endpoints = []string{"http://127.0.0.1:2379"} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// TODO: parse addresses | ||||||
|  | 	c, err := client.New(client.Config{ | ||||||
|  | 		Endpoints: endpoints, | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Fatal(err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &ekv{ | ||||||
|  | 		Options: options, | ||||||
|  | 		kv:      client.NewKV(c), | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -6,7 +6,7 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/micro/go-micro/config/options" | 	"github.com/micro/go-micro/config/options" | ||||||
| 	"github.com/micro/go-micro/data/store" | 	"github.com/micro/go-micro/store" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type memoryStore struct { | type memoryStore struct { | ||||||
| @@ -4,7 +4,7 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/micro/go-micro/data/store" | 	"github.com/micro/go-micro/store" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func TestReadRecordExpire(t *testing.T) { | func TestReadRecordExpire(t *testing.T) { | ||||||
| @@ -1,104 +0,0 @@ | |||||||
| // Package consul is a consul implemenation of lock |  | ||||||
| package consul |  | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"errors" |  | ||||||
| 	"fmt" |  | ||||||
| 	"net" |  | ||||||
| 	"sync" |  | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"github.com/hashicorp/consul/api" |  | ||||||
| 	lock "github.com/micro/go-micro/sync/lock" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| type consulLock struct { |  | ||||||
| 	sync.Mutex |  | ||||||
|  |  | ||||||
| 	locks map[string]*api.Lock |  | ||||||
| 	opts  lock.Options |  | ||||||
| 	c     *api.Client |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *consulLock) Acquire(id string, opts ...lock.AcquireOption) error { |  | ||||||
| 	var options lock.AcquireOptions |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&options) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if options.Wait <= time.Duration(0) { |  | ||||||
| 		options.Wait = api.DefaultLockWaitTime |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	ttl := fmt.Sprintf("%v", options.TTL) |  | ||||||
| 	if options.TTL <= time.Duration(0) { |  | ||||||
| 		ttl = api.DefaultLockSessionTTL |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	l, err := c.c.LockOpts(&api.LockOptions{ |  | ||||||
| 		Key:          c.opts.Prefix + id, |  | ||||||
| 		LockWaitTime: options.Wait, |  | ||||||
| 		SessionTTL:   ttl, |  | ||||||
| 	}) |  | ||||||
|  |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	_, err = l.Lock(nil) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	c.Lock() |  | ||||||
| 	c.locks[id] = l |  | ||||||
| 	c.Unlock() |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *consulLock) Release(id string) error { |  | ||||||
| 	c.Lock() |  | ||||||
| 	defer c.Unlock() |  | ||||||
| 	l, ok := c.locks[id] |  | ||||||
| 	if !ok { |  | ||||||
| 		return errors.New("lock not found") |  | ||||||
| 	} |  | ||||||
| 	err := l.Unlock() |  | ||||||
| 	delete(c.locks, id) |  | ||||||
| 	return err |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *consulLock) String() string { |  | ||||||
| 	return "consul" |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func NewLock(opts ...lock.Option) lock.Lock { |  | ||||||
| 	var options lock.Options |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&options) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	config := api.DefaultConfig() |  | ||||||
|  |  | ||||||
| 	// set host |  | ||||||
| 	// config.Host something |  | ||||||
| 	// check if there are any addrs |  | ||||||
| 	if len(options.Nodes) > 0 { |  | ||||||
| 		addr, port, err := net.SplitHostPort(options.Nodes[0]) |  | ||||||
| 		if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { |  | ||||||
| 			port = "8500" |  | ||||||
| 			config.Address = fmt.Sprintf("%s:%s", options.Nodes[0], port) |  | ||||||
| 		} else if err == nil { |  | ||||||
| 			config.Address = fmt.Sprintf("%s:%s", addr, port) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	client, _ := api.NewClient(config) |  | ||||||
|  |  | ||||||
| 	return &consulLock{ |  | ||||||
| 		locks: make(map[string]*api.Lock), |  | ||||||
| 		opts:  options, |  | ||||||
| 		c:     client, |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
							
								
								
									
										115
									
								
								sync/lock/etcd/etcd.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										115
									
								
								sync/lock/etcd/etcd.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,115 @@ | |||||||
|  | // Package etcd is an etcd implementation of lock | ||||||
|  | package etcd | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"log" | ||||||
|  | 	"path" | ||||||
|  | 	"strings" | ||||||
|  | 	"sync" | ||||||
|  |  | ||||||
|  | 	client "github.com/coreos/etcd/clientv3" | ||||||
|  | 	cc "github.com/coreos/etcd/clientv3/concurrency" | ||||||
|  | 	"github.com/micro/go-micro/sync/lock" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type etcdLock struct { | ||||||
|  | 	opts   lock.Options | ||||||
|  | 	path   string | ||||||
|  | 	client *client.Client | ||||||
|  |  | ||||||
|  | 	sync.Mutex | ||||||
|  | 	locks map[string]*elock | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type elock struct { | ||||||
|  | 	s *cc.Session | ||||||
|  | 	m *cc.Mutex | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error { | ||||||
|  | 	var options lock.AcquireOptions | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// make path | ||||||
|  | 	path := path.Join(e.path, strings.Replace(e.opts.Prefix+id, "/", "-", -1)) | ||||||
|  |  | ||||||
|  | 	var sopts []cc.SessionOption | ||||||
|  | 	if options.TTL > 0 { | ||||||
|  | 		sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds()))) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	s, err := cc.NewSession(e.client, sopts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	m := cc.NewMutex(s, path) | ||||||
|  |  | ||||||
|  | 	ctx, _ := context.WithCancel(context.Background()) | ||||||
|  |  | ||||||
|  | 	if err := m.Lock(ctx); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	e.Lock() | ||||||
|  | 	e.locks[id] = &elock{ | ||||||
|  | 		s: s, | ||||||
|  | 		m: m, | ||||||
|  | 	} | ||||||
|  | 	e.Unlock() | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (e *etcdLock) Release(id string) error { | ||||||
|  | 	e.Lock() | ||||||
|  | 	defer e.Unlock() | ||||||
|  | 	v, ok := e.locks[id] | ||||||
|  | 	if !ok { | ||||||
|  | 		return errors.New("lock not found") | ||||||
|  | 	} | ||||||
|  | 	err := v.m.Unlock(context.Background()) | ||||||
|  | 	delete(e.locks, id) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (e *etcdLock) String() string { | ||||||
|  | 	return "etcd" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewLock(opts ...lock.Option) lock.Lock { | ||||||
|  | 	var options lock.Options | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var endpoints []string | ||||||
|  |  | ||||||
|  | 	for _, addr := range options.Nodes { | ||||||
|  | 		if len(addr) > 0 { | ||||||
|  | 			endpoints = append(endpoints, addr) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(endpoints) == 0 { | ||||||
|  | 		endpoints = []string{"http://127.0.0.1:2379"} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// TODO: parse addresses | ||||||
|  | 	c, err := client.New(client.Config{ | ||||||
|  | 		Endpoints: endpoints, | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Fatal(err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &etcdLock{ | ||||||
|  | 		path:   "/micro/lock", | ||||||
|  | 		client: c, | ||||||
|  | 		opts:   options, | ||||||
|  | 		locks:  make(map[string]*elock), | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -6,9 +6,9 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/data/store" | 	"github.com/micro/go-micro/store" | ||||||
| 	ckv "github.com/micro/go-micro/data/store/consul" | 	ckv "github.com/micro/go-micro/store/etcd" | ||||||
| 	lock "github.com/micro/go-micro/sync/lock/consul" | 	lock "github.com/micro/go-micro/sync/lock/etcd" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type syncMap struct { | type syncMap struct { | ||||||
|   | |||||||
| @@ -1,7 +1,7 @@ | |||||||
| package sync | package sync | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"github.com/micro/go-micro/data/store" | 	"github.com/micro/go-micro/store" | ||||||
| 	"github.com/micro/go-micro/sync/leader" | 	"github.com/micro/go-micro/sync/leader" | ||||||
| 	"github.com/micro/go-micro/sync/lock" | 	"github.com/micro/go-micro/sync/lock" | ||||||
| 	"github.com/micro/go-micro/sync/time" | 	"github.com/micro/go-micro/sync/time" | ||||||
|   | |||||||
| @@ -2,7 +2,7 @@ | |||||||
| package sync | package sync | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"github.com/micro/go-micro/data/store" | 	"github.com/micro/go-micro/store" | ||||||
| 	"github.com/micro/go-micro/sync/leader" | 	"github.com/micro/go-micro/sync/leader" | ||||||
| 	"github.com/micro/go-micro/sync/lock" | 	"github.com/micro/go-micro/sync/lock" | ||||||
| 	"github.com/micro/go-micro/sync/task" | 	"github.com/micro/go-micro/sync/task" | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user