From a2fbf193418cd6659e3d8abadab7b4efbf9ac786 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 7 Jun 2019 13:53:42 +0100 Subject: [PATCH] Move sync deps, change uuid to google and update go.mod --- api/handler/api/api.go | 2 +- api/handler/api/util.go | 2 +- api/handler/cloudevents/event.go | 4 +- api/handler/event/event.go | 4 +- config/source/memory/memory.go | 4 +- go.mod | 8 +- go.sum | 14 --- sync/data/etcd/etcd.go | 93 ---------------- sync/data/memcached/memcached.go | 178 ------------------------------- sync/data/redis/redis.go | 82 -------------- sync/leader/etcd/etcd.go | 145 ------------------------- sync/lock/etcd/etcd.go | 115 -------------------- sync/lock/redis/pool.go | 29 ----- sync/lock/redis/redis.go | 94 ---------------- 14 files changed, 9 insertions(+), 765 deletions(-) delete mode 100644 sync/data/etcd/etcd.go delete mode 100644 sync/data/memcached/memcached.go delete mode 100644 sync/data/redis/redis.go delete mode 100644 sync/leader/etcd/etcd.go delete mode 100644 sync/lock/etcd/etcd.go delete mode 100644 sync/lock/redis/pool.go delete mode 100644 sync/lock/redis/redis.go diff --git a/api/handler/api/api.go b/api/handler/api/api.go index 8b56fb4a..20fa421d 100644 --- a/api/handler/api/api.go +++ b/api/handler/api/api.go @@ -6,11 +6,11 @@ import ( goapi "github.com/micro/go-micro/api" "github.com/micro/go-micro/api/handler" + api "github.com/micro/go-micro/api/proto" "github.com/micro/go-micro/client" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/selector" "github.com/micro/go-micro/util/ctx" - api "github.com/micro/go-micro/api/proto" ) type apiHandler struct { diff --git a/api/handler/api/util.go b/api/handler/api/util.go index fe4d02d3..123ad3f8 100644 --- a/api/handler/api/util.go +++ b/api/handler/api/util.go @@ -8,9 +8,9 @@ import ( "net/http" "strings" + api "github.com/micro/go-micro/api/proto" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/selector" - api "github.com/micro/go-micro/api/proto" ) func requestToProto(r *http.Request) (*api.Request, error) { diff --git a/api/handler/cloudevents/event.go b/api/handler/cloudevents/event.go index 90b4e8bb..6c0c6cf9 100644 --- a/api/handler/cloudevents/event.go +++ b/api/handler/cloudevents/event.go @@ -31,7 +31,7 @@ import ( "time" "unicode" - "github.com/pborman/uuid" + "github.com/google/uuid" "gopkg.in/go-playground/validator.v9" ) @@ -66,7 +66,7 @@ func New(eventType string, mimeType string, payload interface{}) *Event { EventType: eventType, CloudEventsVersion: CloudEventsVersion, Source: "https://micro.mu", - EventID: uuid.NewUUID().String(), + EventID: uuid.New().String(), EventTime: &now, ContentType: mimeType, Data: payload, diff --git a/api/handler/event/event.go b/api/handler/event/event.go index 066043b4..6fb45ae8 100644 --- a/api/handler/event/event.go +++ b/api/handler/event/event.go @@ -10,10 +10,10 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/micro/go-micro/api/handler" proto "github.com/micro/go-micro/api/proto" "github.com/micro/go-micro/util/ctx" - "github.com/pborman/uuid" ) type event struct { @@ -73,7 +73,7 @@ func (e *event) ServeHTTP(w http.ResponseWriter, r *http.Request) { ev := &proto.Event{ Name: action, // TODO: dedupe event - Id: fmt.Sprintf("%s-%s-%s", topic, action, uuid.NewUUID().String()), + Id: fmt.Sprintf("%s-%s-%s", topic, action, uuid.New().String()), Header: make(map[string]*proto.Pair), Timestamp: time.Now().Unix(), } diff --git a/config/source/memory/memory.go b/config/source/memory/memory.go index 50a509fd..607c7b4a 100644 --- a/config/source/memory/memory.go +++ b/config/source/memory/memory.go @@ -5,8 +5,8 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/micro/go-micro/config/source" - "github.com/pborman/uuid" ) type memory struct { @@ -29,7 +29,7 @@ func (s *memory) Read() (*source.ChangeSet, error) { func (s *memory) Watch() (source.Watcher, error) { w := &watcher{ - Id: uuid.NewUUID().String(), + Id: uuid.New().String(), Updates: make(chan *source.ChangeSet, 100), Source: s, } diff --git a/go.mod b/go.mod index d28b0980..b2cb5e95 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,8 @@ require ( github.com/BurntSushi/toml v0.3.1 github.com/beevik/ntp v0.2.0 github.com/bitly/go-simplejson v0.5.0 - github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668 github.com/bwmarrin/discordgo v0.19.0 - github.com/coreos/etcd v3.3.13+incompatible + github.com/coreos/etcd v3.3.13+incompatible // indirect github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c github.com/fsnotify/fsnotify v1.4.7 github.com/fsouza/go-dockerclient v1.4.1 @@ -16,9 +15,7 @@ require ( github.com/go-log/log v0.1.0 github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect - github.com/go-redsync/redsync v1.2.0 github.com/golang/protobuf v1.3.1 - github.com/gomodule/redigo v2.0.0+incompatible github.com/google/uuid v1.1.1 github.com/gorilla/handlers v1.4.0 github.com/gorilla/websocket v1.4.0 @@ -37,16 +34,13 @@ require ( github.com/modern-go/reflect2 v1.0.1 // indirect github.com/nats-io/nats.go v1.8.1 github.com/nlopes/slack v0.5.0 - github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.8.1 github.com/technoweenie/multipartstreamer v1.0.1 // indirect go.etcd.io/etcd v3.3.13+incompatible golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 golang.org/x/net v0.0.0-20190606173856-1492cefac77f google.golang.org/grpc v1.21.1 - gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a // indirect gopkg.in/go-playground/validator.v9 v9.29.0 - gopkg.in/redis.v3 v3.6.4 gopkg.in/src-d/go-git.v4 v4.11.0 gopkg.in/telegram-bot-api.v4 v4.6.4 ) diff --git a/go.sum b/go.sum index a21651f2..1e4d5b40 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NR github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= -github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668 h1:U/lr3Dgy4WK+hNk4tyD+nuGjpVLPEHuJSFXMw11/HPA= -github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= github.com/bwmarrin/discordgo v0.19.0 h1:kMED/DB0NR1QhRcalb85w0Cu3Ep2OrGAqZH1R5awQiY= github.com/bwmarrin/discordgo v0.19.0/go.mod h1:O9S4p+ofTFwB02em7jkpkV8M3R0/PUVOwN61zSZ0r4Q= github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= @@ -52,8 +50,6 @@ github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotf github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= -github.com/go-redsync/redsync v1.2.0 h1:a4y3xKQUOA5092Grjps3F5vaRbjA9uoUB59RVwOMttA= -github.com/go-redsync/redsync v1.2.0/go.mod h1:QClK/s99KRhfKdpxLTMsI5mSu43iLp0NfOneLPie+78= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -63,13 +59,10 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= -github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/handlers v1.4.0 h1:XulKRWSQK5uChr4pEgSE4Tc/OcmnU9GJuSwdog/tZsA= @@ -177,8 +170,6 @@ github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zM github.com/opencontainers/runc v0.1.1 h1:GlxAyO6x8rfZYN9Tt0Kti5a/cP41iuiO2yYT0IJGY8Y= github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= -github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= -github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-buffruneio v0.2.0 h1:U4t4R6YkofJ5xHm3dJzuRpPZ0mr5MMCoAWooScCR7aA= github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -199,7 +190,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQDeX7m2XsSOlQEnM= github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog= github.com/xanzy/ssh-agent v0.2.0 h1:Adglfbi5p9Z0BmK2oKU9nTG+zKfniSfnaMYB+ULd+Ro= @@ -248,15 +238,11 @@ google.golang.org/genproto v0.0.0-20180831171423-11092d34479b h1:lohp5blsw53GBXt google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= -gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a h1:stTHdEoWg1pQ8riaP5ROrjS6zy6wewH/Q2iwnLCQUXY= -gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/go-playground/validator.v9 v9.29.0 h1:5ofssLNYgAA/inWn6rTZ4juWpRJUwEnXc1LG2IeXwgQ= gopkg.in/go-playground/validator.v9 v9.29.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= -gopkg.in/redis.v3 v3.6.4 h1:u7XgPH1rWwsdZnR+azldXC6x9qDU2luydOIeU/l52fE= -gopkg.in/redis.v3 v3.6.4/go.mod h1:6XeGv/CrsUFDU9aVbUdNykN7k1zVmoeg83KC9RbQfiU= gopkg.in/src-d/go-billy.v4 v4.2.1 h1:omN5CrMrMcQ+4I8bJ0wEhOBPanIRWzFC953IiXKdYzo= gopkg.in/src-d/go-billy.v4 v4.2.1/go.mod h1:tm33zBoOwxjYHZIE+OV8bxTWFMJLrconzFMd38aARFk= gopkg.in/src-d/go-git-fixtures.v3 v3.1.1/go.mod h1:dLBcvytrw/TYZsNTWCnkNF2DSIlzWYqTe3rJR56Ac7g= diff --git a/sync/data/etcd/etcd.go b/sync/data/etcd/etcd.go deleted file mode 100644 index fa0db751..00000000 --- a/sync/data/etcd/etcd.go +++ /dev/null @@ -1,93 +0,0 @@ -// Package etcd is an etcd v3 implementation of kv -package etcd - -import ( - "context" - "log" - - "github.com/micro/go-micro/sync/data" - client "go.etcd.io/etcd/clientv3" -) - -type ekv struct { - kv client.KV -} - -func (e *ekv) Read(key string) (*data.Record, error) { - keyval, err := e.kv.Get(context.Background(), key) - if err != nil { - return nil, err - } - - if keyval == nil || len(keyval.Kvs) == 0 { - return nil, data.ErrNotFound - } - - return &data.Record{ - Key: string(keyval.Kvs[0].Key), - Value: keyval.Kvs[0].Value, - }, nil -} - -func (e *ekv) Delete(key string) error { - _, err := e.kv.Delete(context.Background(), key) - return err -} - -func (e *ekv) Write(record *data.Record) error { - _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) - return err -} - -func (e *ekv) Dump() ([]*data.Record, error) { - keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix()) - if err != nil { - return nil, err - } - var vals []*data.Record - if keyval == nil || len(keyval.Kvs) == 0 { - return vals, nil - } - for _, keyv := range keyval.Kvs { - vals = append(vals, &data.Record{ - Key: string(keyv.Key), - Value: keyv.Value, - }) - } - return vals, nil -} - -func (e *ekv) String() string { - return "etcd" -} - -func NewData(opts ...data.Option) data.Data { - var options data.Options - for _, o := range opts { - o(&options) - } - - var endpoints []string - - for _, addr := range options.Nodes { - if len(addr) > 0 { - endpoints = append(endpoints, addr) - } - } - - if len(endpoints) == 0 { - endpoints = []string{"http://127.0.0.1:2379"} - } - - // TODO: parse addresses - c, err := client.New(client.Config{ - Endpoints: endpoints, - }) - if err != nil { - log.Fatal(err) - } - - return &ekv{ - kv: client.NewKV(c), - } -} diff --git a/sync/data/memcached/memcached.go b/sync/data/memcached/memcached.go deleted file mode 100644 index 69cdd19b..00000000 --- a/sync/data/memcached/memcached.go +++ /dev/null @@ -1,178 +0,0 @@ -package memcached - -import ( - "bufio" - "bytes" - "fmt" - "io" - "net" - "strings" - "time" - - mc "github.com/bradfitz/gomemcache/memcache" - "github.com/micro/go-micro/sync/data" -) - -type mkv struct { - Server *mc.ServerList - Client *mc.Client -} - -func (m *mkv) Read(key string) (*data.Record, error) { - keyval, err := m.Client.Get(key) - if err != nil && err == mc.ErrCacheMiss { - return nil, data.ErrNotFound - } else if err != nil { - return nil, err - } - - if keyval == nil { - return nil, data.ErrNotFound - } - - return &data.Record{ - Key: keyval.Key, - Value: keyval.Value, - Expiration: time.Second * time.Duration(keyval.Expiration), - }, nil -} - -func (m *mkv) Delete(key string) error { - return m.Client.Delete(key) -} - -func (m *mkv) Write(record *data.Record) error { - return m.Client.Set(&mc.Item{ - Key: record.Key, - Value: record.Value, - Expiration: int32(record.Expiration.Seconds()), - }) -} - -func (m *mkv) Dump() ([]*data.Record, error) { - // stats - // cachedump - // get keys - - var keys []string - - //data := make(map[string]string) - if err := m.Server.Each(func(c net.Addr) error { - cc, err := net.Dial("tcp", c.String()) - if err != nil { - return err - } - defer cc.Close() - - b := bufio.NewReadWriter(bufio.NewReader(cc), bufio.NewWriter(cc)) - - // get records - if _, err := fmt.Fprintf(b, "stats records\r\n"); err != nil { - return err - } - - b.Flush() - - v, err := b.ReadSlice('\n') - if err != nil { - return err - } - - parts := bytes.Split(v, []byte("\n")) - if len(parts) < 1 { - return nil - } - vals := strings.Split(string(parts[0]), ":") - records := vals[1] - - // drain - for { - buf, err := b.ReadSlice('\n') - if err == io.EOF { - break - } - if err != nil { - return err - } - if strings.HasPrefix(string(buf), "END") { - break - } - } - - b.Writer.Reset(cc) - b.Reader.Reset(cc) - - if _, err := fmt.Fprintf(b, "lru_crawler metadump %s\r\n", records); err != nil { - return err - } - b.Flush() - - for { - v, err := b.ReadString('\n') - if err == io.EOF { - break - } - if err != nil { - return err - } - if strings.HasPrefix(v, "END") { - break - } - key := strings.Split(v, " ")[0] - keys = append(keys, strings.TrimPrefix(key, "key=")) - } - - return nil - }); err != nil { - return nil, err - } - - var vals []*data.Record - - // concurrent op - ch := make(chan *data.Record, len(keys)) - - for _, k := range keys { - go func(key string) { - i, _ := m.Read(key) - ch <- i - }(k) - } - - for i := 0; i < len(keys); i++ { - record := <-ch - - if record == nil { - continue - } - - vals = append(vals, record) - } - - close(ch) - - return vals, nil -} - -func (m *mkv) String() string { - return "memcached" -} - -func NewData(opts ...data.Option) data.Data { - var options data.Options - for _, o := range opts { - o(&options) - } - - if len(options.Nodes) == 0 { - options.Nodes = []string{"127.0.0.1:11211"} - } - - ss := new(mc.ServerList) - ss.SetServers(options.Nodes...) - - return &mkv{ - Server: ss, - Client: mc.New(options.Nodes...), - } -} diff --git a/sync/data/redis/redis.go b/sync/data/redis/redis.go deleted file mode 100644 index 05f05529..00000000 --- a/sync/data/redis/redis.go +++ /dev/null @@ -1,82 +0,0 @@ -package redis - -import ( - "github.com/micro/go-micro/sync/data" - redis "gopkg.in/redis.v3" -) - -type rkv struct { - Client *redis.Client -} - -func (r *rkv) Read(key string) (*data.Record, error) { - val, err := r.Client.Get(key).Bytes() - - if err != nil && err == redis.Nil { - return nil, data.ErrNotFound - } else if err != nil { - return nil, err - } - - if val == nil { - return nil, data.ErrNotFound - } - - d, err := r.Client.TTL(key).Result() - if err != nil { - return nil, err - } - - return &data.Record{ - Key: key, - Value: val, - Expiration: d, - }, nil -} - -func (r *rkv) Delete(key string) error { - return r.Client.Del(key).Err() -} - -func (r *rkv) Write(record *data.Record) error { - return r.Client.Set(record.Key, record.Value, record.Expiration).Err() -} - -func (r *rkv) Dump() ([]*data.Record, error) { - keys, err := r.Client.Keys("*").Result() - if err != nil { - return nil, err - } - var vals []*data.Record - for _, k := range keys { - i, err := r.Read(k) - if err != nil { - return nil, err - } - vals = append(vals, i) - } - return vals, nil -} - -func (r *rkv) String() string { - return "redis" -} - -func NewData(opts ...data.Option) data.Data { - var options data.Options - for _, o := range opts { - o(&options) - } - - if len(options.Nodes) == 0 { - options.Nodes = []string{"127.0.0.1:6379"} - } - - return &rkv{ - Client: redis.NewClient(&redis.Options{ - Addr: options.Nodes[0], - Password: "", // no password set - DB: 0, // use default DB - }), - } -} diff --git a/sync/leader/etcd/etcd.go b/sync/leader/etcd/etcd.go deleted file mode 100644 index 6179bf0b..00000000 --- a/sync/leader/etcd/etcd.go +++ /dev/null @@ -1,145 +0,0 @@ -package etcd - -import ( - "context" - "log" - "path" - "strings" - - client "github.com/coreos/etcd/clientv3" - cc "github.com/coreos/etcd/clientv3/concurrency" - "github.com/micro/go-micro/sync/leader" -) - -type etcdLeader struct { - opts leader.Options - path string - client *client.Client -} - -type etcdElected struct { - s *cc.Session - e *cc.Election - id string -} - -func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) { - var options leader.ElectOptions - for _, o := range opts { - o(&options) - } - - // make path - path := path.Join(e.path, strings.Replace(id, "/", "-", -1)) - - s, err := cc.NewSession(e.client) - if err != nil { - return nil, err - } - - l := cc.NewElection(s, path) - - ctx, _ := context.WithCancel(context.Background()) - - if err := l.Campaign(ctx, id); err != nil { - return nil, err - } - - return &etcdElected{ - e: l, - id: id, - }, nil -} - -func (e *etcdLeader) Follow() chan string { - ch := make(chan string) - - s, err := cc.NewSession(e.client) - if err != nil { - return ch - } - - l := cc.NewElection(s, e.path) - ech := l.Observe(context.Background()) - - go func() { - for { - select { - case r, ok := <-ech: - if !ok { - return - } - ch <- string(r.Kvs[0].Value) - } - } - }() - - return ch -} - -func (e *etcdLeader) String() string { - return "etcd" -} - -func (e *etcdElected) Reelect() error { - ctx, _ := context.WithCancel(context.Background()) - return e.e.Campaign(ctx, e.id) -} - -func (e *etcdElected) Revoked() chan bool { - ch := make(chan bool, 1) - ech := e.e.Observe(context.Background()) - - go func() { - for r := range ech { - if string(r.Kvs[0].Value) != e.id { - ch <- true - close(ch) - return - } - } - }() - - return ch -} - -func (e *etcdElected) Resign() error { - return e.e.Resign(context.Background()) -} - -func (e *etcdElected) Id() string { - return e.id -} - -func NewLeader(opts ...leader.Option) leader.Leader { - var options leader.Options - for _, o := range opts { - o(&options) - } - - var endpoints []string - - for _, addr := range options.Nodes { - if len(addr) > 0 { - endpoints = append(endpoints, addr) - } - } - - if len(endpoints) == 0 { - endpoints = []string{"http://127.0.0.1:2379"} - } - - // TODO: parse addresses - c, err := client.New(client.Config{ - Endpoints: endpoints, - }) - if err != nil { - log.Fatal(err) - } - - return &etcdLeader{ - path: "/micro/leader", - client: c, - opts: options, - } -} diff --git a/sync/lock/etcd/etcd.go b/sync/lock/etcd/etcd.go deleted file mode 100644 index 0b8ba823..00000000 --- a/sync/lock/etcd/etcd.go +++ /dev/null @@ -1,115 +0,0 @@ -// Package etcd is an etcd implementation of lock -package etcd - -import ( - "context" - "errors" - "log" - "path" - "strings" - "sync" - - client "github.com/coreos/etcd/clientv3" - cc "github.com/coreos/etcd/clientv3/concurrency" - "github.com/micro/go-micro/sync/lock" -) - -type etcdLock struct { - opts lock.Options - path string - client *client.Client - - sync.Mutex - locks map[string]*elock -} - -type elock struct { - s *cc.Session - m *cc.Mutex -} - -func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error { - var options lock.AcquireOptions - for _, o := range opts { - o(&options) - } - - // make path - path := path.Join(e.path, strings.Replace(e.opts.Prefix+id, "/", "-", -1)) - - var sopts []cc.SessionOption - if options.TTL > 0 { - sopts = append(sopts, cc.WithTTL(int(options.TTL.Seconds()))) - } - - s, err := cc.NewSession(e.client, sopts...) - if err != nil { - return err - } - - m := cc.NewMutex(s, path) - - ctx, _ := context.WithCancel(context.Background()) - - if err := m.Lock(ctx); err != nil { - return err - } - - e.Lock() - e.locks[id] = &elock{ - s: s, - m: m, - } - e.Unlock() - return nil -} - -func (e *etcdLock) Release(id string) error { - e.Lock() - defer e.Unlock() - v, ok := e.locks[id] - if !ok { - return errors.New("lock not found") - } - err := v.m.Unlock(context.Background()) - delete(e.locks, id) - return err -} - -func (e *etcdLock) String() string { - return "etcd" -} - -func NewLock(opts ...lock.Option) lock.Lock { - var options lock.Options - for _, o := range opts { - o(&options) - } - - var endpoints []string - - for _, addr := range options.Nodes { - if len(addr) > 0 { - endpoints = append(endpoints, addr) - } - } - - if len(endpoints) == 0 { - endpoints = []string{"http://127.0.0.1:2379"} - } - - // TODO: parse addresses - c, err := client.New(client.Config{ - Endpoints: endpoints, - }) - if err != nil { - log.Fatal(err) - } - - return &etcdLock{ - path: "/micro/lock", - client: c, - opts: options, - locks: make(map[string]*elock), - } -} diff --git a/sync/lock/redis/pool.go b/sync/lock/redis/pool.go deleted file mode 100644 index bc80a0b9..00000000 --- a/sync/lock/redis/pool.go +++ /dev/null @@ -1,29 +0,0 @@ -package redis - -import ( - "sync" - - "github.com/gomodule/redigo/redis" -) - -type pool struct { - sync.Mutex - i int - addrs []string -} - -func (p *pool) Get() redis.Conn { - for i := 0; i < 3; i++ { - p.Lock() - addr := p.addrs[p.i%len(p.addrs)] - p.i++ - p.Unlock() - - c, err := redis.Dial("tcp", addr) - if err != nil { - continue - } - return c - } - return nil -} diff --git a/sync/lock/redis/redis.go b/sync/lock/redis/redis.go deleted file mode 100644 index 2ea4a676..00000000 --- a/sync/lock/redis/redis.go +++ /dev/null @@ -1,94 +0,0 @@ -// Package redis is a redis implemenation of lock -package redis - -import ( - "errors" - "sync" - "time" - - "github.com/go-redsync/redsync" - "github.com/micro/go-micro/sync/lock" -) - -type redisLock struct { - sync.Mutex - - locks map[string]*redsync.Mutex - opts lock.Options - c *redsync.Redsync -} - -func (r *redisLock) Acquire(id string, opts ...lock.AcquireOption) error { - var options lock.AcquireOptions - for _, o := range opts { - o(&options) - } - - var ropts []redsync.Option - - if options.Wait > time.Duration(0) { - ropts = append(ropts, redsync.SetRetryDelay(options.Wait)) - ropts = append(ropts, redsync.SetTries(1)) - } - - if options.TTL > time.Duration(0) { - ropts = append(ropts, redsync.SetExpiry(options.TTL)) - } - - m := r.c.NewMutex(r.opts.Prefix+id, ropts...) - err := m.Lock() - if err != nil { - return err - } - - r.Lock() - r.locks[id] = m - r.Unlock() - - return nil -} - -func (r *redisLock) Release(id string) error { - r.Lock() - defer r.Unlock() - m, ok := r.locks[id] - if !ok { - return errors.New("lock not found") - } - - unlocked := m.Unlock() - delete(r.locks, id) - - if !unlocked { - return errors.New("lock not unlocked") - } - - return nil -} - -func (r *redisLock) String() string { - return "redis" -} - -func NewLock(opts ...lock.Option) lock.Lock { - var options lock.Options - for _, o := range opts { - o(&options) - } - - nodes := options.Nodes - - if len(nodes) == 0 { - nodes = []string{"127.0.0.1:6379"} - } - - rpool := redsync.New([]redsync.Pool{&pool{ - addrs: nodes, - }}) - - return &redisLock{ - locks: make(map[string]*redsync.Mutex), - opts: options, - c: rpool, - } -}