diff --git a/store/etcd/config.go b/store/etcd/config.go deleted file mode 100644 index eb79cea1..00000000 --- a/store/etcd/config.go +++ /dev/null @@ -1,178 +0,0 @@ -package etcd - -import ( - "context" - cryptotls "crypto/tls" - "time" - - "github.com/coreos/etcd/clientv3" - "github.com/micro/go-micro/v2/store" - "google.golang.org/grpc" -) - -// Implement all the options from https://pkg.go.dev/github.com/coreos/etcd/clientv3?tab=doc#Config -// Need to use non basic types in context.WithValue -type autoSyncInterval string -type dialTimeout string -type dialKeepAliveTime string -type dialKeepAliveTimeout string -type maxCallSendMsgSize string -type maxCallRecvMsgSize string -type tls string -type username string -type password string -type rejectOldCluster string -type dialOptions string -type clientContext string -type permitWithoutStream string - -// AutoSyncInterval is the interval to update endpoints with its latest members. -// 0 disables auto-sync. By default auto-sync is disabled. -func AutoSyncInterval(d time.Duration) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, autoSyncInterval(""), d) - } -} - -// DialTimeout is the timeout for failing to establish a connection. -func DialTimeout(d time.Duration) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, dialTimeout(""), d) - } -} - -// DialKeepAliveTime is the time after which client pings the server to see if -// transport is alive. -func DialKeepAliveTime(d time.Duration) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, dialKeepAliveTime(""), d) - } -} - -// DialKeepAliveTimeout is the time that the client waits for a response for the -// keep-alive probe. If the response is not received in this time, the connection is closed. -func DialKeepAliveTimeout(d time.Duration) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, dialKeepAliveTimeout(""), d) - } -} - -// MaxCallSendMsgSize is the client-side request send limit in bytes. -// If 0, it defaults to 2.0 MiB (2 * 1024 * 1024). -// Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit. -// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes"). -func MaxCallSendMsgSize(size int) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, maxCallSendMsgSize(""), size) - } -} - -// MaxCallRecvMsgSize is the client-side response receive limit. -// If 0, it defaults to "math.MaxInt32", because range response can -// easily exceed request send limits. -// Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit. -// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes"). -func MaxCallRecvMsgSize(size int) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, maxCallRecvMsgSize(""), size) - } -} - -// TLS holds the client secure credentials, if any. -func TLS(conf *cryptotls.Config) store.Option { - return func(o *store.Options) { - t := conf.Clone() - o.Context = context.WithValue(o.Context, tls(""), t) - } -} - -// Username is a user name for authentication. -func Username(u string) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, username(""), u) - } -} - -// Password is a password for authentication. -func Password(p string) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, password(""), p) - } -} - -// RejectOldCluster when set will refuse to create a client against an outdated cluster. -func RejectOldCluster(b bool) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, rejectOldCluster(""), b) - } -} - -// DialOptions is a list of dial options for the grpc client (e.g., for interceptors). -// For example, pass "grpc.WithBlock()" to block until the underlying connection is up. -// Without this, Dial returns immediately and connecting the server happens in background. -func DialOptions(opts []grpc.DialOption) store.Option { - return func(o *store.Options) { - if len(opts) > 0 { - ops := make([]grpc.DialOption, len(opts)) - copy(ops, opts) - o.Context = context.WithValue(o.Context, dialOptions(""), ops) - } - } -} - -// ClientContext is the default etcd3 client context; it can be used to cancel grpc -// dial out andother operations that do not have an explicit context. -func ClientContext(ctx context.Context) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, clientContext(""), ctx) - } -} - -// PermitWithoutStream when set will allow client to send keepalive pings to server without any active streams(RPCs). -func PermitWithoutStream(b bool) store.Option { - return func(o *store.Options) { - o.Context = context.WithValue(o.Context, permitWithoutStream(""), b) - } -} - -func (e *etcdStore) applyConfig(cfg *clientv3.Config) { - if v := e.options.Context.Value(autoSyncInterval("")); v != nil { - cfg.AutoSyncInterval = v.(time.Duration) - } - if v := e.options.Context.Value(dialTimeout("")); v != nil { - cfg.DialTimeout = v.(time.Duration) - } - if v := e.options.Context.Value(dialKeepAliveTime("")); v != nil { - cfg.DialKeepAliveTime = v.(time.Duration) - } - if v := e.options.Context.Value(dialKeepAliveTimeout("")); v != nil { - cfg.DialKeepAliveTimeout = v.(time.Duration) - } - if v := e.options.Context.Value(maxCallSendMsgSize("")); v != nil { - cfg.MaxCallSendMsgSize = v.(int) - } - if v := e.options.Context.Value(maxCallRecvMsgSize("")); v != nil { - cfg.MaxCallRecvMsgSize = v.(int) - } - if v := e.options.Context.Value(tls("")); v != nil { - cfg.TLS = v.(*cryptotls.Config) - } - if v := e.options.Context.Value(username("")); v != nil { - cfg.Username = v.(string) - } - if v := e.options.Context.Value(password("")); v != nil { - cfg.Username = v.(string) - } - if v := e.options.Context.Value(rejectOldCluster("")); v != nil { - cfg.RejectOldCluster = v.(bool) - } - if v := e.options.Context.Value(dialOptions("")); v != nil { - cfg.DialOptions = v.([]grpc.DialOption) - } - if v := e.options.Context.Value(clientContext("")); v != nil { - cfg.Context = v.(context.Context) - } - if v := e.options.Context.Value(permitWithoutStream("")); v != nil { - cfg.PermitWithoutStream = v.(bool) - } -} diff --git a/store/etcd/etcd.go b/store/etcd/etcd.go deleted file mode 100644 index d4d9cf97..00000000 --- a/store/etcd/etcd.go +++ /dev/null @@ -1,272 +0,0 @@ -// Package etcd implements a go-micro/v2/store with etcd -package etcd - -import ( - "bytes" - "context" - "encoding/gob" - "math" - "strings" - "time" - - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/namespace" - "github.com/micro/go-micro/v2/store" - "github.com/pkg/errors" -) - -type etcdStore struct { - options store.Options - - client *clientv3.Client - config clientv3.Config -} - -// NewStore returns a new etcd store -func NewStore(opts ...store.Option) store.Store { - e := &etcdStore{} - for _, o := range opts { - o(&e.options) - } - e.init() - return e -} - -func (e *etcdStore) Close() error { - return e.client.Close() -} - -func (e *etcdStore) Init(opts ...store.Option) error { - for _, o := range opts { - o(&e.options) - } - return e.init() -} - -func (e *etcdStore) init() error { - // ensure context is non-nil - e.options.Context = context.Background() - // set up config - e.config = clientv3.Config{} - e.applyConfig(&e.config) - if len(e.options.Nodes) == 0 { - e.config.Endpoints = []string{"http://127.0.0.1:2379"} - } else { - e.config.Endpoints = make([]string, len(e.options.Nodes)) - copy(e.config.Endpoints, e.options.Nodes) - } - if e.client != nil { - e.client.Close() - } - client, err := clientv3.New(e.config) - if err != nil { - return err - } - e.client = client - ns := "" - if len(e.options.Table) > 0 { - ns = e.options.Table - } - if len(e.options.Database) > 0 { - ns = e.options.Database + "/" + ns - } - if len(ns) > 0 { - e.client.KV = namespace.NewKV(e.client.KV, ns) - e.client.Watcher = namespace.NewWatcher(e.client.Watcher, ns) - e.client.Lease = namespace.NewLease(e.client.Lease, ns) - } - - return nil -} - -func (e *etcdStore) Options() store.Options { - return e.options -} - -func (e *etcdStore) String() string { - return "etcd" -} - -func (e *etcdStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { - readOpts := store.ReadOptions{} - for _, o := range opts { - o(&readOpts) - } - if readOpts.Suffix { - return e.readSuffix(key, readOpts) - } - - var etcdOpts []clientv3.OpOption - if readOpts.Prefix { - etcdOpts = append(etcdOpts, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - } - resp, err := e.client.KV.Get(context.Background(), key, etcdOpts...) - if err != nil { - return nil, err - } - if resp.Count == 0 && !(readOpts.Prefix || readOpts.Suffix) { - return nil, store.ErrNotFound - } - var records []*store.Record - for _, kv := range resp.Kvs { - ir := internalRecord{} - if err := gob.NewDecoder(bytes.NewReader(kv.Value)).Decode(&ir); err != nil { - return records, errors.Wrapf(err, "couldn't decode %s into internalRecord", err.Error()) - } - r := store.Record{ - Key: ir.Key, - Value: ir.Value, - } - if !ir.ExpiresAt.IsZero() { - r.Expiry = time.Until(ir.ExpiresAt) - } - records = append(records, &r) - } - if readOpts.Limit > 0 || readOpts.Offset > 0 { - return records[readOpts.Offset:min(readOpts.Limit, uint(len(records)))], nil - } - return records, nil -} - -func (e *etcdStore) readSuffix(key string, readOpts store.ReadOptions) ([]*store.Record, error) { - opts := []store.ListOption{store.ListSuffix(key)} - if readOpts.Prefix { - opts = append(opts, store.ListPrefix(key)) - } - keys, err := e.List(opts...) - if err != nil { - return nil, errors.Wrapf(err, "Couldn't list with suffix %s", key) - } - var records []*store.Record - for _, k := range keys { - resp, err := e.client.KV.Get(context.Background(), k) - if err != nil { - return nil, errors.Wrapf(err, "Couldn't get key %s", k) - } - ir := internalRecord{} - if err := gob.NewDecoder(bytes.NewReader(resp.Kvs[0].Value)).Decode(&ir); err != nil { - return records, errors.Wrapf(err, "couldn't decode %s into internalRecord", err.Error()) - } - r := store.Record{ - Key: ir.Key, - Value: ir.Value, - } - if !ir.ExpiresAt.IsZero() { - r.Expiry = time.Until(ir.ExpiresAt) - } - records = append(records, &r) - - } - if readOpts.Limit > 0 || readOpts.Offset > 0 { - return records[readOpts.Offset:min(readOpts.Limit, uint(len(records)))], nil - } - return records, nil -} - -func (e *etcdStore) Write(r *store.Record, opts ...store.WriteOption) error { - options := store.WriteOptions{} - for _, o := range opts { - o(&options) - } - - if len(opts) > 0 { - // Copy the record before applying options, or the incoming record will be mutated - newRecord := store.Record{} - newRecord.Key = r.Key - newRecord.Value = make([]byte, len(r.Value)) - copy(newRecord.Value, r.Value) - newRecord.Expiry = r.Expiry - - if !options.Expiry.IsZero() { - newRecord.Expiry = time.Until(options.Expiry) - } - if options.TTL != 0 { - newRecord.Expiry = options.TTL - } - return e.write(&newRecord) - } - return e.write(r) -} - -func (e *etcdStore) write(r *store.Record) error { - var putOpts []clientv3.OpOption - ir := &internalRecord{} - ir.Key = r.Key - ir.Value = make([]byte, len(r.Value)) - copy(ir.Value, r.Value) - if r.Expiry != 0 { - ir.ExpiresAt = time.Now().Add(r.Expiry) - var leasexpiry int64 - if r.Expiry.Seconds() < 5.0 { - // minimum etcd lease is 5 seconds - leasexpiry = 5 - } else { - leasexpiry = int64(math.Ceil(r.Expiry.Seconds())) - } - lr, err := e.client.Lease.Grant(context.Background(), leasexpiry) - if err != nil { - return errors.Wrapf(err, "couldn't grant an etcd lease for %s", r.Key) - } - putOpts = append(putOpts, clientv3.WithLease(lr.ID)) - } - b := &bytes.Buffer{} - if err := gob.NewEncoder(b).Encode(ir); err != nil { - return errors.Wrapf(err, "couldn't encode %s", r.Key) - } - _, err := e.client.KV.Put(context.Background(), ir.Key, string(b.Bytes()), putOpts...) - return errors.Wrapf(err, "couldn't put key %s in to etcd", err) -} - -func (e *etcdStore) Delete(key string, opts ...store.DeleteOption) error { - options := store.DeleteOptions{} - for _, o := range opts { - o(&options) - } - _, err := e.client.KV.Delete(context.Background(), key) - return errors.Wrapf(err, "couldn't delete key %s", key) -} - -func (e *etcdStore) List(opts ...store.ListOption) ([]string, error) { - options := store.ListOptions{} - for _, o := range opts { - o(&options) - } - searchPrefix := "" - if len(options.Prefix) > 0 { - searchPrefix = options.Prefix - } - resp, err := e.client.KV.Get(context.Background(), searchPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, errors.Wrap(err, "couldn't list, etcd get failed") - } - if len(options.Suffix) == 0 { - keys := make([]string, resp.Count) - for i, kv := range resp.Kvs { - keys[i] = string(kv.Key) - } - return keys, nil - } - keys := []string{} - for _, kv := range resp.Kvs { - if strings.HasSuffix(string(kv.Key), options.Suffix) { - keys = append(keys, string(kv.Key)) - } - } - if options.Limit > 0 || options.Offset > 0 { - return keys[options.Offset:min(options.Limit, uint(len(keys)))], nil - } - return keys, nil -} - -type internalRecord struct { - Key string - Value []byte - ExpiresAt time.Time -} - -func min(i, j uint) uint { - if i < j { - return i - } - return j -} diff --git a/store/etcd/etcd_test.go b/store/etcd/etcd_test.go deleted file mode 100644 index fade2fce..00000000 --- a/store/etcd/etcd_test.go +++ /dev/null @@ -1,225 +0,0 @@ -package etcd - -import ( - "fmt" - "testing" - "time" - - "github.com/kr/pretty" - "github.com/micro/go-micro/v2/store" -) - -func TestEtcd(t *testing.T) { - e := NewStore() - if err := e.Init(); err != nil { - t.Fatal(err) - } - //basictest(e, t) -} - -func basictest(s store.Store, t *testing.T) { - t.Logf("Testing store %s, with options %# v\n", s.String(), pretty.Formatter(s.Options())) - // Read and Write an expiring Record - if err := s.Write(&store.Record{ - Key: "Hello", - Value: []byte("World"), - Expiry: time.Second * 5, - }); err != nil { - t.Fatal(err) - } - if r, err := s.Read("Hello"); err != nil { - t.Fatal(err) - } else { - if len(r) != 1 { - t.Fatal("Read returned multiple records") - } - if r[0].Key != "Hello" { - t.Fatalf("Expected %s, got %s", "Hello", r[0].Key) - } - if string(r[0].Value) != "World" { - t.Fatalf("Expected %s, got %s", "World", r[0].Value) - } - } - time.Sleep(time.Second * 6) - if records, err := s.Read("Hello"); err != store.ErrNotFound { - t.Fatalf("Expected %# v, got %# v\nResults were %# v", store.ErrNotFound, err, pretty.Formatter(records)) - } - - // Write 3 records with various expiry and get with prefix - records := []*store.Record{ - &store.Record{ - Key: "foo", - Value: []byte("foofoo"), - }, - &store.Record{ - Key: "foobar", - Value: []byte("foobarfoobar"), - Expiry: time.Second * 5, - }, - &store.Record{ - Key: "foobarbaz", - Value: []byte("foobarbazfoobarbaz"), - Expiry: 2 * time.Second * 5, - }, - } - for _, r := range records { - if err := s.Write(r); err != nil { - t.Fatalf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err) - } - } - if results, err := s.Read("foo", store.ReadPrefix()); err != nil { - t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err) - } else { - if len(results) != 3 { - t.Fatalf("Expected 3 items, got %d", len(results)) - } - } - time.Sleep(time.Second * 6) - if results, err := s.Read("foo", store.ReadPrefix()); err != nil { - t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err) - } else { - if len(results) != 2 { - t.Fatalf("Expected 2 items, got %d", len(results)) - } - } - time.Sleep(time.Second * 5) - if results, err := s.Read("foo", store.ReadPrefix()); err != nil { - t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err) - } else { - if len(results) != 1 { - t.Fatalf("Expected 1 item, got %d", len(results)) - } - } - if err := s.Delete("foo", func(d *store.DeleteOptions) {}); err != nil { - t.Fatalf("Delete failed (%v)", err) - } - if results, err := s.Read("foo", store.ReadPrefix()); err != nil { - t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err) - } else { - if len(results) != 0 { - t.Fatalf("Expected 0 items, got %d (%# v)", len(results), pretty.Formatter(results)) - } - } - - // Write 3 records with various expiry and get with Suffix - records = []*store.Record{ - &store.Record{ - Key: "foo", - Value: []byte("foofoo"), - }, - &store.Record{ - Key: "barfoo", - Value: []byte("barfoobarfoo"), - Expiry: time.Second * 5, - }, - &store.Record{ - Key: "bazbarfoo", - Value: []byte("bazbarfoobazbarfoo"), - Expiry: 2 * time.Second * 5, - }, - } - for _, r := range records { - if err := s.Write(r); err != nil { - t.Fatalf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err) - } - } - if results, err := s.Read("foo", store.ReadSuffix()); err != nil { - t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err) - } else { - if len(results) != 3 { - t.Fatalf("Expected 3 items, got %d", len(results)) - } - } - time.Sleep(time.Second * 6) - if results, err := s.Read("foo", store.ReadSuffix()); err != nil { - t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err) - } else { - if len(results) != 2 { - t.Fatalf("Expected 2 items, got %d", len(results)) - } - t.Logf("Prefix test: %v\n", pretty.Formatter(results)) - } - time.Sleep(time.Second * 5) - if results, err := s.Read("foo", store.ReadSuffix()); err != nil { - t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err) - } else { - if len(results) != 1 { - t.Fatalf("Expected 1 item, got %d", len(results)) - } - t.Logf("Prefix test: %# v\n", pretty.Formatter(results)) - } - if err := s.Delete("foo"); err != nil { - t.Fatalf("Delete failed (%v)", err) - } - if results, err := s.Read("foo", store.ReadSuffix()); err != nil { - t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err) - } else { - if len(results) != 0 { - t.Fatalf("Expected 0 items, got %d (%# v)", len(results), pretty.Formatter(results)) - } - } - - // Test Prefix, Suffix and WriteOptions - if err := s.Write(&store.Record{ - Key: "foofoobarbar", - Value: []byte("something"), - }, store.WriteTTL(time.Millisecond*100)); err != nil { - t.Fatal(err) - } - if err := s.Write(&store.Record{ - Key: "foofoo", - Value: []byte("something"), - }, store.WriteExpiry(time.Now().Add(time.Millisecond*100))); err != nil { - t.Fatal(err) - } - if err := s.Write(&store.Record{ - Key: "barbar", - Value: []byte("something"), - // TTL has higher precedence than expiry - }, store.WriteExpiry(time.Now().Add(time.Hour)), store.WriteTTL(time.Millisecond*100)); err != nil { - t.Fatal(err) - } - if results, err := s.Read("foo", store.ReadPrefix(), store.ReadSuffix()); err != nil { - t.Fatal(err) - } else { - if len(results) != 1 { - t.Fatalf("Expected 1 results, got %d: %# v", len(results), pretty.Formatter(results)) - } - } - time.Sleep(time.Second * 6) - if results, err := s.List(); err != nil { - t.Fatalf("List failed: %s", err) - } else { - if len(results) != 0 { - t.Fatal("Expiry options were not effective") - } - } - - s.Init() - for i := 0; i < 10; i++ { - s.Write(&store.Record{ - Key: fmt.Sprintf("a%d", i), - Value: []byte{}, - }) - } - if results, err := s.Read("a", store.ReadLimit(5), store.ReadPrefix()); err != nil { - t.Fatal(err) - } else { - if len(results) != 5 { - t.Fatal("Expected 5 results, got ", len(results)) - } - if results[0].Key != "a0" { - t.Fatalf("Expected a0, got %s", results[0].Key) - } - if results[4].Key != "a4" { - t.Fatalf("Expected a4, got %s", results[4].Key) - } - } - if results, err := s.Read("a", store.ReadLimit(30), store.ReadOffset(5), store.ReadPrefix()); err != nil { - t.Fatal(err) - } else { - if len(results) != 5 { - t.Fatal("Expected 5 results, got ", len(results)) - } - } -}