remove etcd store
This commit is contained in:
parent
e5268dd0a6
commit
57853b2849
@ -1,178 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
cryptotls "crypto/tls"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Implement all the options from https://pkg.go.dev/github.com/coreos/etcd/clientv3?tab=doc#Config
|
||||
// Need to use non basic types in context.WithValue
|
||||
type autoSyncInterval string
|
||||
type dialTimeout string
|
||||
type dialKeepAliveTime string
|
||||
type dialKeepAliveTimeout string
|
||||
type maxCallSendMsgSize string
|
||||
type maxCallRecvMsgSize string
|
||||
type tls string
|
||||
type username string
|
||||
type password string
|
||||
type rejectOldCluster string
|
||||
type dialOptions string
|
||||
type clientContext string
|
||||
type permitWithoutStream string
|
||||
|
||||
// AutoSyncInterval is the interval to update endpoints with its latest members.
|
||||
// 0 disables auto-sync. By default auto-sync is disabled.
|
||||
func AutoSyncInterval(d time.Duration) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, autoSyncInterval(""), d)
|
||||
}
|
||||
}
|
||||
|
||||
// DialTimeout is the timeout for failing to establish a connection.
|
||||
func DialTimeout(d time.Duration) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, dialTimeout(""), d)
|
||||
}
|
||||
}
|
||||
|
||||
// DialKeepAliveTime is the time after which client pings the server to see if
|
||||
// transport is alive.
|
||||
func DialKeepAliveTime(d time.Duration) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, dialKeepAliveTime(""), d)
|
||||
}
|
||||
}
|
||||
|
||||
// DialKeepAliveTimeout is the time that the client waits for a response for the
|
||||
// keep-alive probe. If the response is not received in this time, the connection is closed.
|
||||
func DialKeepAliveTimeout(d time.Duration) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, dialKeepAliveTimeout(""), d)
|
||||
}
|
||||
}
|
||||
|
||||
// MaxCallSendMsgSize is the client-side request send limit in bytes.
|
||||
// If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).
|
||||
// Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit.
|
||||
// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
|
||||
func MaxCallSendMsgSize(size int) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, maxCallSendMsgSize(""), size)
|
||||
}
|
||||
}
|
||||
|
||||
// MaxCallRecvMsgSize is the client-side response receive limit.
|
||||
// If 0, it defaults to "math.MaxInt32", because range response can
|
||||
// easily exceed request send limits.
|
||||
// Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
|
||||
// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
|
||||
func MaxCallRecvMsgSize(size int) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, maxCallRecvMsgSize(""), size)
|
||||
}
|
||||
}
|
||||
|
||||
// TLS holds the client secure credentials, if any.
|
||||
func TLS(conf *cryptotls.Config) store.Option {
|
||||
return func(o *store.Options) {
|
||||
t := conf.Clone()
|
||||
o.Context = context.WithValue(o.Context, tls(""), t)
|
||||
}
|
||||
}
|
||||
|
||||
// Username is a user name for authentication.
|
||||
func Username(u string) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, username(""), u)
|
||||
}
|
||||
}
|
||||
|
||||
// Password is a password for authentication.
|
||||
func Password(p string) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, password(""), p)
|
||||
}
|
||||
}
|
||||
|
||||
// RejectOldCluster when set will refuse to create a client against an outdated cluster.
|
||||
func RejectOldCluster(b bool) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, rejectOldCluster(""), b)
|
||||
}
|
||||
}
|
||||
|
||||
// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
|
||||
// For example, pass "grpc.WithBlock()" to block until the underlying connection is up.
|
||||
// Without this, Dial returns immediately and connecting the server happens in background.
|
||||
func DialOptions(opts []grpc.DialOption) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if len(opts) > 0 {
|
||||
ops := make([]grpc.DialOption, len(opts))
|
||||
copy(ops, opts)
|
||||
o.Context = context.WithValue(o.Context, dialOptions(""), ops)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ClientContext is the default etcd3 client context; it can be used to cancel grpc
|
||||
// dial out andother operations that do not have an explicit context.
|
||||
func ClientContext(ctx context.Context) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, clientContext(""), ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// PermitWithoutStream when set will allow client to send keepalive pings to server without any active streams(RPCs).
|
||||
func PermitWithoutStream(b bool) store.Option {
|
||||
return func(o *store.Options) {
|
||||
o.Context = context.WithValue(o.Context, permitWithoutStream(""), b)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *etcdStore) applyConfig(cfg *clientv3.Config) {
|
||||
if v := e.options.Context.Value(autoSyncInterval("")); v != nil {
|
||||
cfg.AutoSyncInterval = v.(time.Duration)
|
||||
}
|
||||
if v := e.options.Context.Value(dialTimeout("")); v != nil {
|
||||
cfg.DialTimeout = v.(time.Duration)
|
||||
}
|
||||
if v := e.options.Context.Value(dialKeepAliveTime("")); v != nil {
|
||||
cfg.DialKeepAliveTime = v.(time.Duration)
|
||||
}
|
||||
if v := e.options.Context.Value(dialKeepAliveTimeout("")); v != nil {
|
||||
cfg.DialKeepAliveTimeout = v.(time.Duration)
|
||||
}
|
||||
if v := e.options.Context.Value(maxCallSendMsgSize("")); v != nil {
|
||||
cfg.MaxCallSendMsgSize = v.(int)
|
||||
}
|
||||
if v := e.options.Context.Value(maxCallRecvMsgSize("")); v != nil {
|
||||
cfg.MaxCallRecvMsgSize = v.(int)
|
||||
}
|
||||
if v := e.options.Context.Value(tls("")); v != nil {
|
||||
cfg.TLS = v.(*cryptotls.Config)
|
||||
}
|
||||
if v := e.options.Context.Value(username("")); v != nil {
|
||||
cfg.Username = v.(string)
|
||||
}
|
||||
if v := e.options.Context.Value(password("")); v != nil {
|
||||
cfg.Username = v.(string)
|
||||
}
|
||||
if v := e.options.Context.Value(rejectOldCluster("")); v != nil {
|
||||
cfg.RejectOldCluster = v.(bool)
|
||||
}
|
||||
if v := e.options.Context.Value(dialOptions("")); v != nil {
|
||||
cfg.DialOptions = v.([]grpc.DialOption)
|
||||
}
|
||||
if v := e.options.Context.Value(clientContext("")); v != nil {
|
||||
cfg.Context = v.(context.Context)
|
||||
}
|
||||
if v := e.options.Context.Value(permitWithoutStream("")); v != nil {
|
||||
cfg.PermitWithoutStream = v.(bool)
|
||||
}
|
||||
}
|
@ -1,272 +0,0 @@
|
||||
// Package etcd implements a go-micro/v2/store with etcd
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3/namespace"
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type etcdStore struct {
|
||||
options store.Options
|
||||
|
||||
client *clientv3.Client
|
||||
config clientv3.Config
|
||||
}
|
||||
|
||||
// NewStore returns a new etcd store
|
||||
func NewStore(opts ...store.Option) store.Store {
|
||||
e := &etcdStore{}
|
||||
for _, o := range opts {
|
||||
o(&e.options)
|
||||
}
|
||||
e.init()
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *etcdStore) Close() error {
|
||||
return e.client.Close()
|
||||
}
|
||||
|
||||
func (e *etcdStore) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&e.options)
|
||||
}
|
||||
return e.init()
|
||||
}
|
||||
|
||||
func (e *etcdStore) init() error {
|
||||
// ensure context is non-nil
|
||||
e.options.Context = context.Background()
|
||||
// set up config
|
||||
e.config = clientv3.Config{}
|
||||
e.applyConfig(&e.config)
|
||||
if len(e.options.Nodes) == 0 {
|
||||
e.config.Endpoints = []string{"http://127.0.0.1:2379"}
|
||||
} else {
|
||||
e.config.Endpoints = make([]string, len(e.options.Nodes))
|
||||
copy(e.config.Endpoints, e.options.Nodes)
|
||||
}
|
||||
if e.client != nil {
|
||||
e.client.Close()
|
||||
}
|
||||
client, err := clientv3.New(e.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.client = client
|
||||
ns := ""
|
||||
if len(e.options.Table) > 0 {
|
||||
ns = e.options.Table
|
||||
}
|
||||
if len(e.options.Database) > 0 {
|
||||
ns = e.options.Database + "/" + ns
|
||||
}
|
||||
if len(ns) > 0 {
|
||||
e.client.KV = namespace.NewKV(e.client.KV, ns)
|
||||
e.client.Watcher = namespace.NewWatcher(e.client.Watcher, ns)
|
||||
e.client.Lease = namespace.NewLease(e.client.Lease, ns)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdStore) Options() store.Options {
|
||||
return e.options
|
||||
}
|
||||
|
||||
func (e *etcdStore) String() string {
|
||||
return "etcd"
|
||||
}
|
||||
|
||||
func (e *etcdStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
||||
readOpts := store.ReadOptions{}
|
||||
for _, o := range opts {
|
||||
o(&readOpts)
|
||||
}
|
||||
if readOpts.Suffix {
|
||||
return e.readSuffix(key, readOpts)
|
||||
}
|
||||
|
||||
var etcdOpts []clientv3.OpOption
|
||||
if readOpts.Prefix {
|
||||
etcdOpts = append(etcdOpts, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
}
|
||||
resp, err := e.client.KV.Get(context.Background(), key, etcdOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.Count == 0 && !(readOpts.Prefix || readOpts.Suffix) {
|
||||
return nil, store.ErrNotFound
|
||||
}
|
||||
var records []*store.Record
|
||||
for _, kv := range resp.Kvs {
|
||||
ir := internalRecord{}
|
||||
if err := gob.NewDecoder(bytes.NewReader(kv.Value)).Decode(&ir); err != nil {
|
||||
return records, errors.Wrapf(err, "couldn't decode %s into internalRecord", err.Error())
|
||||
}
|
||||
r := store.Record{
|
||||
Key: ir.Key,
|
||||
Value: ir.Value,
|
||||
}
|
||||
if !ir.ExpiresAt.IsZero() {
|
||||
r.Expiry = time.Until(ir.ExpiresAt)
|
||||
}
|
||||
records = append(records, &r)
|
||||
}
|
||||
if readOpts.Limit > 0 || readOpts.Offset > 0 {
|
||||
return records[readOpts.Offset:min(readOpts.Limit, uint(len(records)))], nil
|
||||
}
|
||||
return records, nil
|
||||
}
|
||||
|
||||
func (e *etcdStore) readSuffix(key string, readOpts store.ReadOptions) ([]*store.Record, error) {
|
||||
opts := []store.ListOption{store.ListSuffix(key)}
|
||||
if readOpts.Prefix {
|
||||
opts = append(opts, store.ListPrefix(key))
|
||||
}
|
||||
keys, err := e.List(opts...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "Couldn't list with suffix %s", key)
|
||||
}
|
||||
var records []*store.Record
|
||||
for _, k := range keys {
|
||||
resp, err := e.client.KV.Get(context.Background(), k)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "Couldn't get key %s", k)
|
||||
}
|
||||
ir := internalRecord{}
|
||||
if err := gob.NewDecoder(bytes.NewReader(resp.Kvs[0].Value)).Decode(&ir); err != nil {
|
||||
return records, errors.Wrapf(err, "couldn't decode %s into internalRecord", err.Error())
|
||||
}
|
||||
r := store.Record{
|
||||
Key: ir.Key,
|
||||
Value: ir.Value,
|
||||
}
|
||||
if !ir.ExpiresAt.IsZero() {
|
||||
r.Expiry = time.Until(ir.ExpiresAt)
|
||||
}
|
||||
records = append(records, &r)
|
||||
|
||||
}
|
||||
if readOpts.Limit > 0 || readOpts.Offset > 0 {
|
||||
return records[readOpts.Offset:min(readOpts.Limit, uint(len(records)))], nil
|
||||
}
|
||||
return records, nil
|
||||
}
|
||||
|
||||
func (e *etcdStore) Write(r *store.Record, opts ...store.WriteOption) error {
|
||||
options := store.WriteOptions{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
if len(opts) > 0 {
|
||||
// Copy the record before applying options, or the incoming record will be mutated
|
||||
newRecord := store.Record{}
|
||||
newRecord.Key = r.Key
|
||||
newRecord.Value = make([]byte, len(r.Value))
|
||||
copy(newRecord.Value, r.Value)
|
||||
newRecord.Expiry = r.Expiry
|
||||
|
||||
if !options.Expiry.IsZero() {
|
||||
newRecord.Expiry = time.Until(options.Expiry)
|
||||
}
|
||||
if options.TTL != 0 {
|
||||
newRecord.Expiry = options.TTL
|
||||
}
|
||||
return e.write(&newRecord)
|
||||
}
|
||||
return e.write(r)
|
||||
}
|
||||
|
||||
func (e *etcdStore) write(r *store.Record) error {
|
||||
var putOpts []clientv3.OpOption
|
||||
ir := &internalRecord{}
|
||||
ir.Key = r.Key
|
||||
ir.Value = make([]byte, len(r.Value))
|
||||
copy(ir.Value, r.Value)
|
||||
if r.Expiry != 0 {
|
||||
ir.ExpiresAt = time.Now().Add(r.Expiry)
|
||||
var leasexpiry int64
|
||||
if r.Expiry.Seconds() < 5.0 {
|
||||
// minimum etcd lease is 5 seconds
|
||||
leasexpiry = 5
|
||||
} else {
|
||||
leasexpiry = int64(math.Ceil(r.Expiry.Seconds()))
|
||||
}
|
||||
lr, err := e.client.Lease.Grant(context.Background(), leasexpiry)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "couldn't grant an etcd lease for %s", r.Key)
|
||||
}
|
||||
putOpts = append(putOpts, clientv3.WithLease(lr.ID))
|
||||
}
|
||||
b := &bytes.Buffer{}
|
||||
if err := gob.NewEncoder(b).Encode(ir); err != nil {
|
||||
return errors.Wrapf(err, "couldn't encode %s", r.Key)
|
||||
}
|
||||
_, err := e.client.KV.Put(context.Background(), ir.Key, string(b.Bytes()), putOpts...)
|
||||
return errors.Wrapf(err, "couldn't put key %s in to etcd", err)
|
||||
}
|
||||
|
||||
func (e *etcdStore) Delete(key string, opts ...store.DeleteOption) error {
|
||||
options := store.DeleteOptions{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
_, err := e.client.KV.Delete(context.Background(), key)
|
||||
return errors.Wrapf(err, "couldn't delete key %s", key)
|
||||
}
|
||||
|
||||
func (e *etcdStore) List(opts ...store.ListOption) ([]string, error) {
|
||||
options := store.ListOptions{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
searchPrefix := ""
|
||||
if len(options.Prefix) > 0 {
|
||||
searchPrefix = options.Prefix
|
||||
}
|
||||
resp, err := e.client.KV.Get(context.Background(), searchPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "couldn't list, etcd get failed")
|
||||
}
|
||||
if len(options.Suffix) == 0 {
|
||||
keys := make([]string, resp.Count)
|
||||
for i, kv := range resp.Kvs {
|
||||
keys[i] = string(kv.Key)
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
keys := []string{}
|
||||
for _, kv := range resp.Kvs {
|
||||
if strings.HasSuffix(string(kv.Key), options.Suffix) {
|
||||
keys = append(keys, string(kv.Key))
|
||||
}
|
||||
}
|
||||
if options.Limit > 0 || options.Offset > 0 {
|
||||
return keys[options.Offset:min(options.Limit, uint(len(keys)))], nil
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
type internalRecord struct {
|
||||
Key string
|
||||
Value []byte
|
||||
ExpiresAt time.Time
|
||||
}
|
||||
|
||||
func min(i, j uint) uint {
|
||||
if i < j {
|
||||
return i
|
||||
}
|
||||
return j
|
||||
}
|
@ -1,225 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kr/pretty"
|
||||
"github.com/micro/go-micro/v2/store"
|
||||
)
|
||||
|
||||
func TestEtcd(t *testing.T) {
|
||||
e := NewStore()
|
||||
if err := e.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
//basictest(e, t)
|
||||
}
|
||||
|
||||
func basictest(s store.Store, t *testing.T) {
|
||||
t.Logf("Testing store %s, with options %# v\n", s.String(), pretty.Formatter(s.Options()))
|
||||
// Read and Write an expiring Record
|
||||
if err := s.Write(&store.Record{
|
||||
Key: "Hello",
|
||||
Value: []byte("World"),
|
||||
Expiry: time.Second * 5,
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if r, err := s.Read("Hello"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if len(r) != 1 {
|
||||
t.Fatal("Read returned multiple records")
|
||||
}
|
||||
if r[0].Key != "Hello" {
|
||||
t.Fatalf("Expected %s, got %s", "Hello", r[0].Key)
|
||||
}
|
||||
if string(r[0].Value) != "World" {
|
||||
t.Fatalf("Expected %s, got %s", "World", r[0].Value)
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second * 6)
|
||||
if records, err := s.Read("Hello"); err != store.ErrNotFound {
|
||||
t.Fatalf("Expected %# v, got %# v\nResults were %# v", store.ErrNotFound, err, pretty.Formatter(records))
|
||||
}
|
||||
|
||||
// Write 3 records with various expiry and get with prefix
|
||||
records := []*store.Record{
|
||||
&store.Record{
|
||||
Key: "foo",
|
||||
Value: []byte("foofoo"),
|
||||
},
|
||||
&store.Record{
|
||||
Key: "foobar",
|
||||
Value: []byte("foobarfoobar"),
|
||||
Expiry: time.Second * 5,
|
||||
},
|
||||
&store.Record{
|
||||
Key: "foobarbaz",
|
||||
Value: []byte("foobarbazfoobarbaz"),
|
||||
Expiry: 2 * time.Second * 5,
|
||||
},
|
||||
}
|
||||
for _, r := range records {
|
||||
if err := s.Write(r); err != nil {
|
||||
t.Fatalf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err)
|
||||
}
|
||||
}
|
||||
if results, err := s.Read("foo", store.ReadPrefix()); err != nil {
|
||||
t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err)
|
||||
} else {
|
||||
if len(results) != 3 {
|
||||
t.Fatalf("Expected 3 items, got %d", len(results))
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second * 6)
|
||||
if results, err := s.Read("foo", store.ReadPrefix()); err != nil {
|
||||
t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err)
|
||||
} else {
|
||||
if len(results) != 2 {
|
||||
t.Fatalf("Expected 2 items, got %d", len(results))
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
if results, err := s.Read("foo", store.ReadPrefix()); err != nil {
|
||||
t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err)
|
||||
} else {
|
||||
if len(results) != 1 {
|
||||
t.Fatalf("Expected 1 item, got %d", len(results))
|
||||
}
|
||||
}
|
||||
if err := s.Delete("foo", func(d *store.DeleteOptions) {}); err != nil {
|
||||
t.Fatalf("Delete failed (%v)", err)
|
||||
}
|
||||
if results, err := s.Read("foo", store.ReadPrefix()); err != nil {
|
||||
t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err)
|
||||
} else {
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("Expected 0 items, got %d (%# v)", len(results), pretty.Formatter(results))
|
||||
}
|
||||
}
|
||||
|
||||
// Write 3 records with various expiry and get with Suffix
|
||||
records = []*store.Record{
|
||||
&store.Record{
|
||||
Key: "foo",
|
||||
Value: []byte("foofoo"),
|
||||
},
|
||||
&store.Record{
|
||||
Key: "barfoo",
|
||||
Value: []byte("barfoobarfoo"),
|
||||
Expiry: time.Second * 5,
|
||||
},
|
||||
&store.Record{
|
||||
Key: "bazbarfoo",
|
||||
Value: []byte("bazbarfoobazbarfoo"),
|
||||
Expiry: 2 * time.Second * 5,
|
||||
},
|
||||
}
|
||||
for _, r := range records {
|
||||
if err := s.Write(r); err != nil {
|
||||
t.Fatalf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err)
|
||||
}
|
||||
}
|
||||
if results, err := s.Read("foo", store.ReadSuffix()); err != nil {
|
||||
t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err)
|
||||
} else {
|
||||
if len(results) != 3 {
|
||||
t.Fatalf("Expected 3 items, got %d", len(results))
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second * 6)
|
||||
if results, err := s.Read("foo", store.ReadSuffix()); err != nil {
|
||||
t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err)
|
||||
} else {
|
||||
if len(results) != 2 {
|
||||
t.Fatalf("Expected 2 items, got %d", len(results))
|
||||
}
|
||||
t.Logf("Prefix test: %v\n", pretty.Formatter(results))
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
if results, err := s.Read("foo", store.ReadSuffix()); err != nil {
|
||||
t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err)
|
||||
} else {
|
||||
if len(results) != 1 {
|
||||
t.Fatalf("Expected 1 item, got %d", len(results))
|
||||
}
|
||||
t.Logf("Prefix test: %# v\n", pretty.Formatter(results))
|
||||
}
|
||||
if err := s.Delete("foo"); err != nil {
|
||||
t.Fatalf("Delete failed (%v)", err)
|
||||
}
|
||||
if results, err := s.Read("foo", store.ReadSuffix()); err != nil {
|
||||
t.Fatalf("Couldn't read all \"foo\" keys, got %# v (%s)", pretty.Formatter(results), err)
|
||||
} else {
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("Expected 0 items, got %d (%# v)", len(results), pretty.Formatter(results))
|
||||
}
|
||||
}
|
||||
|
||||
// Test Prefix, Suffix and WriteOptions
|
||||
if err := s.Write(&store.Record{
|
||||
Key: "foofoobarbar",
|
||||
Value: []byte("something"),
|
||||
}, store.WriteTTL(time.Millisecond*100)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.Write(&store.Record{
|
||||
Key: "foofoo",
|
||||
Value: []byte("something"),
|
||||
}, store.WriteExpiry(time.Now().Add(time.Millisecond*100))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.Write(&store.Record{
|
||||
Key: "barbar",
|
||||
Value: []byte("something"),
|
||||
// TTL has higher precedence than expiry
|
||||
}, store.WriteExpiry(time.Now().Add(time.Hour)), store.WriteTTL(time.Millisecond*100)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if results, err := s.Read("foo", store.ReadPrefix(), store.ReadSuffix()); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if len(results) != 1 {
|
||||
t.Fatalf("Expected 1 results, got %d: %# v", len(results), pretty.Formatter(results))
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second * 6)
|
||||
if results, err := s.List(); err != nil {
|
||||
t.Fatalf("List failed: %s", err)
|
||||
} else {
|
||||
if len(results) != 0 {
|
||||
t.Fatal("Expiry options were not effective")
|
||||
}
|
||||
}
|
||||
|
||||
s.Init()
|
||||
for i := 0; i < 10; i++ {
|
||||
s.Write(&store.Record{
|
||||
Key: fmt.Sprintf("a%d", i),
|
||||
Value: []byte{},
|
||||
})
|
||||
}
|
||||
if results, err := s.Read("a", store.ReadLimit(5), store.ReadPrefix()); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if len(results) != 5 {
|
||||
t.Fatal("Expected 5 results, got ", len(results))
|
||||
}
|
||||
if results[0].Key != "a0" {
|
||||
t.Fatalf("Expected a0, got %s", results[0].Key)
|
||||
}
|
||||
if results[4].Key != "a4" {
|
||||
t.Fatalf("Expected a4, got %s", results[4].Key)
|
||||
}
|
||||
}
|
||||
if results, err := s.Read("a", store.ReadLimit(30), store.ReadOffset(5), store.ReadPrefix()); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if len(results) != 5 {
|
||||
t.Fatal("Expected 5 results, got ", len(results))
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user