Move sync deps, change uuid to google and update go.mod

This commit is contained in:
Asim Aslam
2019-06-07 13:53:42 +01:00
parent 9e23855c37
commit a2fbf19341
14 changed files with 9 additions and 765 deletions

View File

@@ -1,93 +0,0 @@
// Package etcd is an etcd v3 implementation of kv
package etcd
import (
"context"
"log"
"github.com/micro/go-micro/sync/data"
client "go.etcd.io/etcd/clientv3"
)
type ekv struct {
kv client.KV
}
func (e *ekv) Read(key string) (*data.Record, error) {
keyval, err := e.kv.Get(context.Background(), key)
if err != nil {
return nil, err
}
if keyval == nil || len(keyval.Kvs) == 0 {
return nil, data.ErrNotFound
}
return &data.Record{
Key: string(keyval.Kvs[0].Key),
Value: keyval.Kvs[0].Value,
}, nil
}
func (e *ekv) Delete(key string) error {
_, err := e.kv.Delete(context.Background(), key)
return err
}
func (e *ekv) Write(record *data.Record) error {
_, err := e.kv.Put(context.Background(), record.Key, string(record.Value))
return err
}
func (e *ekv) Dump() ([]*data.Record, error) {
keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix())
if err != nil {
return nil, err
}
var vals []*data.Record
if keyval == nil || len(keyval.Kvs) == 0 {
return vals, nil
}
for _, keyv := range keyval.Kvs {
vals = append(vals, &data.Record{
Key: string(keyv.Key),
Value: keyv.Value,
})
}
return vals, nil
}
func (e *ekv) String() string {
return "etcd"
}
func NewData(opts ...data.Option) data.Data {
var options data.Options
for _, o := range opts {
o(&options)
}
var endpoints []string
for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &ekv{
kv: client.NewKV(c),
}
}

View File

@@ -1,178 +0,0 @@
package memcached
import (
"bufio"
"bytes"
"fmt"
"io"
"net"
"strings"
"time"
mc "github.com/bradfitz/gomemcache/memcache"
"github.com/micro/go-micro/sync/data"
)
type mkv struct {
Server *mc.ServerList
Client *mc.Client
}
func (m *mkv) Read(key string) (*data.Record, error) {
keyval, err := m.Client.Get(key)
if err != nil && err == mc.ErrCacheMiss {
return nil, data.ErrNotFound
} else if err != nil {
return nil, err
}
if keyval == nil {
return nil, data.ErrNotFound
}
return &data.Record{
Key: keyval.Key,
Value: keyval.Value,
Expiration: time.Second * time.Duration(keyval.Expiration),
}, nil
}
func (m *mkv) Delete(key string) error {
return m.Client.Delete(key)
}
func (m *mkv) Write(record *data.Record) error {
return m.Client.Set(&mc.Item{
Key: record.Key,
Value: record.Value,
Expiration: int32(record.Expiration.Seconds()),
})
}
func (m *mkv) Dump() ([]*data.Record, error) {
// stats
// cachedump
// get keys
var keys []string
//data := make(map[string]string)
if err := m.Server.Each(func(c net.Addr) error {
cc, err := net.Dial("tcp", c.String())
if err != nil {
return err
}
defer cc.Close()
b := bufio.NewReadWriter(bufio.NewReader(cc), bufio.NewWriter(cc))
// get records
if _, err := fmt.Fprintf(b, "stats records\r\n"); err != nil {
return err
}
b.Flush()
v, err := b.ReadSlice('\n')
if err != nil {
return err
}
parts := bytes.Split(v, []byte("\n"))
if len(parts) < 1 {
return nil
}
vals := strings.Split(string(parts[0]), ":")
records := vals[1]
// drain
for {
buf, err := b.ReadSlice('\n')
if err == io.EOF {
break
}
if err != nil {
return err
}
if strings.HasPrefix(string(buf), "END") {
break
}
}
b.Writer.Reset(cc)
b.Reader.Reset(cc)
if _, err := fmt.Fprintf(b, "lru_crawler metadump %s\r\n", records); err != nil {
return err
}
b.Flush()
for {
v, err := b.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
return err
}
if strings.HasPrefix(v, "END") {
break
}
key := strings.Split(v, " ")[0]
keys = append(keys, strings.TrimPrefix(key, "key="))
}
return nil
}); err != nil {
return nil, err
}
var vals []*data.Record
// concurrent op
ch := make(chan *data.Record, len(keys))
for _, k := range keys {
go func(key string) {
i, _ := m.Read(key)
ch <- i
}(k)
}
for i := 0; i < len(keys); i++ {
record := <-ch
if record == nil {
continue
}
vals = append(vals, record)
}
close(ch)
return vals, nil
}
func (m *mkv) String() string {
return "memcached"
}
func NewData(opts ...data.Option) data.Data {
var options data.Options
for _, o := range opts {
o(&options)
}
if len(options.Nodes) == 0 {
options.Nodes = []string{"127.0.0.1:11211"}
}
ss := new(mc.ServerList)
ss.SetServers(options.Nodes...)
return &mkv{
Server: ss,
Client: mc.New(options.Nodes...),
}
}

View File

@@ -1,82 +0,0 @@
package redis
import (
"github.com/micro/go-micro/sync/data"
redis "gopkg.in/redis.v3"
)
type rkv struct {
Client *redis.Client
}
func (r *rkv) Read(key string) (*data.Record, error) {
val, err := r.Client.Get(key).Bytes()
if err != nil && err == redis.Nil {
return nil, data.ErrNotFound
} else if err != nil {
return nil, err
}
if val == nil {
return nil, data.ErrNotFound
}
d, err := r.Client.TTL(key).Result()
if err != nil {
return nil, err
}
return &data.Record{
Key: key,
Value: val,
Expiration: d,
}, nil
}
func (r *rkv) Delete(key string) error {
return r.Client.Del(key).Err()
}
func (r *rkv) Write(record *data.Record) error {
return r.Client.Set(record.Key, record.Value, record.Expiration).Err()
}
func (r *rkv) Dump() ([]*data.Record, error) {
keys, err := r.Client.Keys("*").Result()
if err != nil {
return nil, err
}
var vals []*data.Record
for _, k := range keys {
i, err := r.Read(k)
if err != nil {
return nil, err
}
vals = append(vals, i)
}
return vals, nil
}
func (r *rkv) String() string {
return "redis"
}
func NewData(opts ...data.Option) data.Data {
var options data.Options
for _, o := range opts {
o(&options)
}
if len(options.Nodes) == 0 {
options.Nodes = []string{"127.0.0.1:6379"}
}
return &rkv{
Client: redis.NewClient(&redis.Options{
Addr: options.Nodes[0],
Password: "", // no password set
DB: 0, // use default DB
}),
}
}

View File

@@ -1,145 +0,0 @@
package etcd
import (
"context"
"log"
"path"
"strings"
client "github.com/coreos/etcd/clientv3"
cc "github.com/coreos/etcd/clientv3/concurrency"
"github.com/micro/go-micro/sync/leader"
)
type etcdLeader struct {
opts leader.Options
path string
client *client.Client
}
type etcdElected struct {
s *cc.Session
e *cc.Election
id string
}
func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) {
var options leader.ElectOptions
for _, o := range opts {
o(&options)
}
// make path
path := path.Join(e.path, strings.Replace(id, "/", "-", -1))
s, err := cc.NewSession(e.client)
if err != nil {
return nil, err
}
l := cc.NewElection(s, path)
ctx, _ := context.WithCancel(context.Background())
if err := l.Campaign(ctx, id); err != nil {
return nil, err
}
return &etcdElected{
e: l,
id: id,
}, nil
}
func (e *etcdLeader) Follow() chan string {
ch := make(chan string)
s, err := cc.NewSession(e.client)
if err != nil {
return ch
}
l := cc.NewElection(s, e.path)
ech := l.Observe(context.Background())
go func() {
for {
select {
case r, ok := <-ech:
if !ok {
return
}
ch <- string(r.Kvs[0].Value)
}
}
}()
return ch
}
func (e *etcdLeader) String() string {
return "etcd"
}
func (e *etcdElected) Reelect() error {
ctx, _ := context.WithCancel(context.Background())
return e.e.Campaign(ctx, e.id)
}
func (e *etcdElected) Revoked() chan bool {
ch := make(chan bool, 1)
ech := e.e.Observe(context.Background())
go func() {
for r := range ech {
if string(r.Kvs[0].Value) != e.id {
ch <- true
close(ch)
return
}
}
}()
return ch
}
func (e *etcdElected) Resign() error {
return e.e.Resign(context.Background())
}
func (e *etcdElected) Id() string {
return e.id
}
func NewLeader(opts ...leader.Option) leader.Leader {
var options leader.Options
for _, o := range opts {
o(&options)
}
var endpoints []string
for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &etcdLeader{
path: "/micro/leader",
client: c,
opts: options,
}
}

View File

@@ -1,115 +0,0 @@
// Package etcd is an etcd implementation of lock
package etcd
import (
"context"
"errors"
"log"
"path"
"strings"
"sync"
client "github.com/coreos/etcd/clientv3"
cc "github.com/coreos/etcd/clientv3/concurrency"
"github.com/micro/go-micro/sync/lock"
)
type etcdLock struct {
opts lock.Options
path string
client *client.Client
sync.Mutex
locks map[string]*elock
}
type elock struct {
s *cc.Session
m *cc.Mutex
}
func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error {
var options lock.AcquireOptions
for _, o := range opts {
o(&options)
}
// make path
path := path.Join(e.path, strings.Replace(e.opts.Prefix+id, "/", "-", -1))
var sopts []cc.SessionOption
if options.TTL > 0 {
sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds())))
}
s, err := cc.NewSession(e.client, sopts...)
if err != nil {
return err
}
m := cc.NewMutex(s, path)
ctx, _ := context.WithCancel(context.Background())
if err := m.Lock(ctx); err != nil {
return err
}
e.Lock()
e.locks[id] = &elock{
s: s,
m: m,
}
e.Unlock()
return nil
}
func (e *etcdLock) Release(id string) error {
e.Lock()
defer e.Unlock()
v, ok := e.locks[id]
if !ok {
return errors.New("lock not found")
}
err := v.m.Unlock(context.Background())
delete(e.locks, id)
return err
}
func (e *etcdLock) String() string {
return "etcd"
}
func NewLock(opts ...lock.Option) lock.Lock {
var options lock.Options
for _, o := range opts {
o(&options)
}
var endpoints []string
for _, addr := range options.Nodes {
if len(addr) > 0 {
endpoints = append(endpoints, addr)
}
}
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
// TODO: parse addresses
c, err := client.New(client.Config{
Endpoints: endpoints,
})
if err != nil {
log.Fatal(err)
}
return &etcdLock{
path: "/micro/lock",
client: c,
opts: options,
locks: make(map[string]*elock),
}
}

View File

@@ -1,29 +0,0 @@
package redis
import (
"sync"
"github.com/gomodule/redigo/redis"
)
type pool struct {
sync.Mutex
i int
addrs []string
}
func (p *pool) Get() redis.Conn {
for i := 0; i < 3; i++ {
p.Lock()
addr := p.addrs[p.i%len(p.addrs)]
p.i++
p.Unlock()
c, err := redis.Dial("tcp", addr)
if err != nil {
continue
}
return c
}
return nil
}

View File

@@ -1,94 +0,0 @@
// Package redis is a redis implemenation of lock
package redis
import (
"errors"
"sync"
"time"
"github.com/go-redsync/redsync"
"github.com/micro/go-micro/sync/lock"
)
type redisLock struct {
sync.Mutex
locks map[string]*redsync.Mutex
opts lock.Options
c *redsync.Redsync
}
func (r *redisLock) Acquire(id string, opts ...lock.AcquireOption) error {
var options lock.AcquireOptions
for _, o := range opts {
o(&options)
}
var ropts []redsync.Option
if options.Wait > time.Duration(0) {
ropts = append(ropts, redsync.SetRetryDelay(options.Wait))
ropts = append(ropts, redsync.SetTries(1))
}
if options.TTL > time.Duration(0) {
ropts = append(ropts, redsync.SetExpiry(options.TTL))
}
m := r.c.NewMutex(r.opts.Prefix+id, ropts...)
err := m.Lock()
if err != nil {
return err
}
r.Lock()
r.locks[id] = m
r.Unlock()
return nil
}
func (r *redisLock) Release(id string) error {
r.Lock()
defer r.Unlock()
m, ok := r.locks[id]
if !ok {
return errors.New("lock not found")
}
unlocked := m.Unlock()
delete(r.locks, id)
if !unlocked {
return errors.New("lock not unlocked")
}
return nil
}
func (r *redisLock) String() string {
return "redis"
}
func NewLock(opts ...lock.Option) lock.Lock {
var options lock.Options
for _, o := range opts {
o(&options)
}
nodes := options.Nodes
if len(nodes) == 0 {
nodes = []string{"127.0.0.1:6379"}
}
rpool := redsync.New([]redsync.Pool{&pool{
addrs: nodes,
}})
return &redisLock{
locks: make(map[string]*redsync.Mutex),
opts: options,
c: rpool,
}
}