From a96f6adf07f334b1ff61bbb8f1c1e547bef1e996 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 11 Oct 2019 14:08:50 +0100 Subject: [PATCH] store handler implementation --- store/etcd/etcd.go | 59 ++- store/memory/memory.go | 58 +-- store/memory/memory_test.go | 2 +- store/service/handler/handler.go | 89 +++++ store/service/proto/store.micro.go | 207 ++++++++++ store/service/proto/store.pb.go | 618 +++++++++++++++++++++++++++++ store/service/proto/store.proto | 48 +++ store/store.go | 6 +- sync/map.go | 6 +- 9 files changed, 1048 insertions(+), 45 deletions(-) create mode 100644 store/service/handler/handler.go create mode 100644 store/service/proto/store.micro.go create mode 100644 store/service/proto/store.pb.go create mode 100644 store/service/proto/store.proto diff --git a/store/etcd/etcd.go b/store/etcd/etcd.go index 7d866d63..d0094fb4 100644 --- a/store/etcd/etcd.go +++ b/store/etcd/etcd.go @@ -5,6 +5,7 @@ import ( "context" "log" + "github.com/coreos/etcd/mvcc/mvccpb" client "github.com/coreos/etcd/clientv3" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/store" @@ -15,30 +16,56 @@ type ekv struct { kv client.KV } -func (e *ekv) Read(key string) (*store.Record, error) { - keyval, err := e.kv.Get(context.Background(), key) - if err != nil { - return nil, err +func (e *ekv) Read(keys ...string) ([]*store.Record, error) { + var values []*mvccpb.KeyValue + + for _, key := range keys { + keyval, err := e.kv.Get(context.Background(), key) + if err != nil { + return nil, err + } + + if keyval == nil || len(keyval.Kvs) == 0 { + return nil, store.ErrNotFound + } + + values = append(values, keyval.Kvs...) } - if keyval == nil || len(keyval.Kvs) == 0 { - return nil, store.ErrNotFound + var records []*store.Record + + for _, kv := range values { + records = append(records, &store.Record{ + Key: string(kv.Key), + Value: kv.Value, + // TODO: implement expiry + }) } - return &store.Record{ - Key: string(keyval.Kvs[0].Key), - Value: keyval.Kvs[0].Value, - }, nil + return records, nil } -func (e *ekv) Delete(key string) error { - _, err := e.kv.Delete(context.Background(), key) - return err +func (e *ekv) Delete(keys ...string) error { + var gerr error + for _, key := range keys { + _, err := e.kv.Delete(context.Background(), key) + if err != nil { + gerr = err + } + } + return gerr } -func (e *ekv) Write(record *store.Record) error { - _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) - return err +func (e *ekv) Write(records ...*store.Record) error { + var gerr error + for _, record := range records { + // TODO create lease to expire keys + _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) + if err != nil { + gerr = err + } + } + return gerr } func (e *ekv) Sync() ([]*store.Record, error) { diff --git a/store/memory/memory.go b/store/memory/memory.go index a7d5a83c..3eeab31d 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -48,51 +48,61 @@ func (m *memoryStore) Sync() ([]*store.Record, error) { return values, nil } -func (m *memoryStore) Read(key string) (*store.Record, error) { +func (m *memoryStore) Read(keys ...string) ([]*store.Record, error) { m.RLock() defer m.RUnlock() - v, ok := m.values[key] - if !ok { - return nil, store.ErrNotFound - } + var records []*store.Record - // get expiry - d := v.r.Expiry - t := time.Since(v.c) - - // expired - if d > time.Duration(0) { - if t > d { + for _, key := range keys { + v, ok := m.values[key] + if !ok { return nil, store.ErrNotFound } - // update expiry - v.r.Expiry -= t - v.c = time.Now() + + // get expiry + d := v.r.Expiry + t := time.Since(v.c) + + // expired + if d > time.Duration(0) { + if t > d { + return nil, store.ErrNotFound + } + // update expiry + v.r.Expiry -= t + v.c = time.Now() + } + + records = append(records, v.r) } - return v.r, nil + return records, nil } -func (m *memoryStore) Write(r *store.Record) error { +func (m *memoryStore) Write(records ...*store.Record) error { m.Lock() defer m.Unlock() - // set the record - m.values[r.Key] = &memoryRecord{ - r: r, - c: time.Now(), + for _, r := range records { + // set the record + m.values[r.Key] = &memoryRecord{ + r: r, + c: time.Now(), + } } return nil } -func (m *memoryStore) Delete(key string) error { +func (m *memoryStore) Delete(keys ...string) error { m.Lock() defer m.Unlock() - // delete the value - delete(m.values, key) + for _, key := range keys { + // delete the value + delete(m.values, key) + } return nil } diff --git a/store/memory/memory_test.go b/store/memory/memory_test.go index 639d18db..de889856 100644 --- a/store/memory/memory_test.go +++ b/store/memory/memory_test.go @@ -25,7 +25,7 @@ func TestReadRecordExpire(t *testing.T) { if err != nil { t.Fatal(err) } - if rrec.Expiry >= expire { + if rrec[0].Expiry >= expire { t.Fatal("expiry of read record is not changed") } diff --git a/store/service/handler/handler.go b/store/service/handler/handler.go new file mode 100644 index 00000000..51af02b6 --- /dev/null +++ b/store/service/handler/handler.go @@ -0,0 +1,89 @@ +package handler + +import ( + "context" + "io" + "time" + + "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/store" + pb "github.com/micro/go-micro/store/service/proto" +) + +type Store struct { + Store store.Store +} + +func (s *Store) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error { + vals, err := s.Store.Read(req.Keys...) + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + for _, val := range vals { + rsp.Records = append(rsp.Records, &pb.Record{ + Key: val.Key, + Value: val.Value, + Expiry: int64(val.Expiry.Seconds()), + }) + } + return nil +} + +func (s *Store) Write(ctx context.Context, req *pb.WriteRequest, rsp *pb.WriteResponse) error { + var records []*store.Record + + for _, record := range req.Records { + records = append(records, &store.Record{ + Key: record.Key, + Value: record.Value, + Expiry: time.Duration(record.Expiry) * time.Second, + }) + } + + err := s.Store.Write(records...) + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + return nil +} + +func (s *Store) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error { + err := s.Store.Delete(req.Keys...) + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + return nil +} + +func (s *Store) Sync(ctx context.Context, req *pb.SyncRequest, stream pb.Store_SyncStream) error { + var vals []*store.Record + var err error + + if len(req.Key) > 0 { + vals, err = s.Store.Read(req.Key) + } else { + vals, err = s.Store.Sync() + } + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + rsp := new(pb.SyncResponse) + + // TODO: batch sync + for _, val := range vals { + rsp.Records = append(rsp.Records, &pb.Record{ + Key: val.Key, + Value: val.Value, + Expiry: int64(val.Expiry.Seconds()), + }) + } + + err = stream.Send(rsp) + if err == io.EOF { + return nil + } + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + return nil +} diff --git a/store/service/proto/store.micro.go b/store/service/proto/store.micro.go new file mode 100644 index 00000000..ce4885ec --- /dev/null +++ b/store/service/proto/store.micro.go @@ -0,0 +1,207 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: micro/go-micro/store/service/proto/store.proto + +package go_micro_store + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Store service + +type StoreService interface { + Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) + Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error) + Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) + Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error) +} + +type storeService struct { + c client.Client + name string +} + +func NewStoreService(name string, c client.Client) StoreService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.store" + } + return &storeService{ + c: c, + name: name, + } +} + +func (c *storeService) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) { + req := c.c.NewRequest(c.name, "Store.Read", in) + out := new(ReadResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeService) Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error) { + req := c.c.NewRequest(c.name, "Store.Write", in) + out := new(WriteResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeService) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) { + req := c.c.NewRequest(c.name, "Store.Delete", in) + out := new(DeleteResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeService) Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error) { + req := c.c.NewRequest(c.name, "Store.Sync", &SyncRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return &storeServiceSync{stream}, nil +} + +type Store_SyncService interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*SyncResponse, error) +} + +type storeServiceSync struct { + stream client.Stream +} + +func (x *storeServiceSync) Close() error { + return x.stream.Close() +} + +func (x *storeServiceSync) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *storeServiceSync) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *storeServiceSync) Recv() (*SyncResponse, error) { + m := new(SyncResponse) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Store service + +type StoreHandler interface { + Read(context.Context, *ReadRequest, *ReadResponse) error + Write(context.Context, *WriteRequest, *WriteResponse) error + Delete(context.Context, *DeleteRequest, *DeleteResponse) error + Sync(context.Context, *SyncRequest, Store_SyncStream) error +} + +func RegisterStoreHandler(s server.Server, hdlr StoreHandler, opts ...server.HandlerOption) error { + type store interface { + Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error + Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error + Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error + Sync(ctx context.Context, stream server.Stream) error + } + type Store struct { + store + } + h := &storeHandler{hdlr} + return s.Handle(s.NewHandler(&Store{h}, opts...)) +} + +type storeHandler struct { + StoreHandler +} + +func (h *storeHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error { + return h.StoreHandler.Read(ctx, in, out) +} + +func (h *storeHandler) Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error { + return h.StoreHandler.Write(ctx, in, out) +} + +func (h *storeHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error { + return h.StoreHandler.Delete(ctx, in, out) +} + +func (h *storeHandler) Sync(ctx context.Context, stream server.Stream) error { + m := new(SyncRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.StoreHandler.Sync(ctx, m, &storeSyncStream{stream}) +} + +type Store_SyncStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*SyncResponse) error +} + +type storeSyncStream struct { + stream server.Stream +} + +func (x *storeSyncStream) Close() error { + return x.stream.Close() +} + +func (x *storeSyncStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *storeSyncStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *storeSyncStream) Send(m *SyncResponse) error { + return x.stream.Send(m) +} diff --git a/store/service/proto/store.pb.go b/store/service/proto/store.pb.go new file mode 100644 index 00000000..ce166cb7 --- /dev/null +++ b/store/service/proto/store.pb.go @@ -0,0 +1,618 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: micro/go-micro/store/service/proto/store.proto + +package go_micro_store + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Record struct { + // key of the record + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + // value in the record + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + // timestamp in unix seconds + Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Record) Reset() { *m = Record{} } +func (m *Record) String() string { return proto.CompactTextString(m) } +func (*Record) ProtoMessage() {} +func (*Record) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{0} +} + +func (m *Record) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Record.Unmarshal(m, b) +} +func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Record.Marshal(b, m, deterministic) +} +func (m *Record) XXX_Merge(src proto.Message) { + xxx_messageInfo_Record.Merge(m, src) +} +func (m *Record) XXX_Size() int { + return xxx_messageInfo_Record.Size(m) +} +func (m *Record) XXX_DiscardUnknown() { + xxx_messageInfo_Record.DiscardUnknown(m) +} + +var xxx_messageInfo_Record proto.InternalMessageInfo + +func (m *Record) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *Record) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *Record) GetExpiry() int64 { + if m != nil { + return m.Expiry + } + return 0 +} + +type ReadRequest struct { + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReadRequest) Reset() { *m = ReadRequest{} } +func (m *ReadRequest) String() string { return proto.CompactTextString(m) } +func (*ReadRequest) ProtoMessage() {} +func (*ReadRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{1} +} + +func (m *ReadRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReadRequest.Unmarshal(m, b) +} +func (m *ReadRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReadRequest.Marshal(b, m, deterministic) +} +func (m *ReadRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadRequest.Merge(m, src) +} +func (m *ReadRequest) XXX_Size() int { + return xxx_messageInfo_ReadRequest.Size(m) +} +func (m *ReadRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReadRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadRequest proto.InternalMessageInfo + +func (m *ReadRequest) GetKeys() []string { + if m != nil { + return m.Keys + } + return nil +} + +type ReadResponse struct { + Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReadResponse) Reset() { *m = ReadResponse{} } +func (m *ReadResponse) String() string { return proto.CompactTextString(m) } +func (*ReadResponse) ProtoMessage() {} +func (*ReadResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{2} +} + +func (m *ReadResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReadResponse.Unmarshal(m, b) +} +func (m *ReadResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReadResponse.Marshal(b, m, deterministic) +} +func (m *ReadResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadResponse.Merge(m, src) +} +func (m *ReadResponse) XXX_Size() int { + return xxx_messageInfo_ReadResponse.Size(m) +} +func (m *ReadResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ReadResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadResponse proto.InternalMessageInfo + +func (m *ReadResponse) GetRecords() []*Record { + if m != nil { + return m.Records + } + return nil +} + +type WriteRequest struct { + Records []*Record `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{3} +} + +func (m *WriteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_WriteRequest.Unmarshal(m, b) +} +func (m *WriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_WriteRequest.Marshal(b, m, deterministic) +} +func (m *WriteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WriteRequest.Merge(m, src) +} +func (m *WriteRequest) XXX_Size() int { + return xxx_messageInfo_WriteRequest.Size(m) +} +func (m *WriteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WriteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_WriteRequest proto.InternalMessageInfo + +func (m *WriteRequest) GetRecords() []*Record { + if m != nil { + return m.Records + } + return nil +} + +type WriteResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WriteResponse) Reset() { *m = WriteResponse{} } +func (m *WriteResponse) String() string { return proto.CompactTextString(m) } +func (*WriteResponse) ProtoMessage() {} +func (*WriteResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{4} +} + +func (m *WriteResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_WriteResponse.Unmarshal(m, b) +} +func (m *WriteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_WriteResponse.Marshal(b, m, deterministic) +} +func (m *WriteResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_WriteResponse.Merge(m, src) +} +func (m *WriteResponse) XXX_Size() int { + return xxx_messageInfo_WriteResponse.Size(m) +} +func (m *WriteResponse) XXX_DiscardUnknown() { + xxx_messageInfo_WriteResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_WriteResponse proto.InternalMessageInfo + +type DeleteRequest struct { + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } +func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } +func (*DeleteRequest) ProtoMessage() {} +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{5} +} + +func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DeleteRequest.Unmarshal(m, b) +} +func (m *DeleteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DeleteRequest.Marshal(b, m, deterministic) +} +func (m *DeleteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteRequest.Merge(m, src) +} +func (m *DeleteRequest) XXX_Size() int { + return xxx_messageInfo_DeleteRequest.Size(m) +} +func (m *DeleteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo + +func (m *DeleteRequest) GetKeys() []string { + if m != nil { + return m.Keys + } + return nil +} + +type DeleteResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } +func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } +func (*DeleteResponse) ProtoMessage() {} +func (*DeleteResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{6} +} + +func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DeleteResponse.Unmarshal(m, b) +} +func (m *DeleteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DeleteResponse.Marshal(b, m, deterministic) +} +func (m *DeleteResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteResponse.Merge(m, src) +} +func (m *DeleteResponse) XXX_Size() int { + return xxx_messageInfo_DeleteResponse.Size(m) +} +func (m *DeleteResponse) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo + +type SyncRequest struct { + // optional key + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SyncRequest) Reset() { *m = SyncRequest{} } +func (m *SyncRequest) String() string { return proto.CompactTextString(m) } +func (*SyncRequest) ProtoMessage() {} +func (*SyncRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{7} +} + +func (m *SyncRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SyncRequest.Unmarshal(m, b) +} +func (m *SyncRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SyncRequest.Marshal(b, m, deterministic) +} +func (m *SyncRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncRequest.Merge(m, src) +} +func (m *SyncRequest) XXX_Size() int { + return xxx_messageInfo_SyncRequest.Size(m) +} +func (m *SyncRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SyncRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SyncRequest proto.InternalMessageInfo + +func (m *SyncRequest) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +type SyncResponse struct { + Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SyncResponse) Reset() { *m = SyncResponse{} } +func (m *SyncResponse) String() string { return proto.CompactTextString(m) } +func (*SyncResponse) ProtoMessage() {} +func (*SyncResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{8} +} + +func (m *SyncResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SyncResponse.Unmarshal(m, b) +} +func (m *SyncResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SyncResponse.Marshal(b, m, deterministic) +} +func (m *SyncResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncResponse.Merge(m, src) +} +func (m *SyncResponse) XXX_Size() int { + return xxx_messageInfo_SyncResponse.Size(m) +} +func (m *SyncResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SyncResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SyncResponse proto.InternalMessageInfo + +func (m *SyncResponse) GetRecords() []*Record { + if m != nil { + return m.Records + } + return nil +} + +func init() { + proto.RegisterType((*Record)(nil), "go.micro.store.Record") + proto.RegisterType((*ReadRequest)(nil), "go.micro.store.ReadRequest") + proto.RegisterType((*ReadResponse)(nil), "go.micro.store.ReadResponse") + proto.RegisterType((*WriteRequest)(nil), "go.micro.store.WriteRequest") + proto.RegisterType((*WriteResponse)(nil), "go.micro.store.WriteResponse") + proto.RegisterType((*DeleteRequest)(nil), "go.micro.store.DeleteRequest") + proto.RegisterType((*DeleteResponse)(nil), "go.micro.store.DeleteResponse") + proto.RegisterType((*SyncRequest)(nil), "go.micro.store.SyncRequest") + proto.RegisterType((*SyncResponse)(nil), "go.micro.store.SyncResponse") +} + +func init() { + proto.RegisterFile("micro/go-micro/store/service/proto/store.proto", fileDescriptor_f84ccc98e143ed3e) +} + +var fileDescriptor_f84ccc98e143ed3e = []byte{ + // 333 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcb, 0x6e, 0xf2, 0x30, + 0x10, 0x85, 0x09, 0x81, 0xfc, 0x62, 0xb8, 0xfc, 0x68, 0x54, 0xa1, 0x88, 0xde, 0xd2, 0x74, 0x93, + 0x4d, 0x03, 0xa2, 0x2f, 0x50, 0xa9, 0x17, 0xb5, 0x5b, 0xb3, 0xe8, 0x9a, 0x86, 0x11, 0x8a, 0xa0, + 0x98, 0x3a, 0x01, 0x35, 0x2f, 0xd4, 0xe7, 0xac, 0x6c, 0x27, 0x69, 0x90, 0x41, 0xaa, 0xba, 0x1b, + 0x7b, 0xce, 0x1c, 0x9f, 0xf9, 0x64, 0x08, 0xdf, 0xe3, 0x48, 0xf0, 0xd1, 0x82, 0xdf, 0xe8, 0x22, + 0x49, 0xb9, 0xa0, 0x51, 0x42, 0x62, 0x17, 0x47, 0x34, 0xda, 0x08, 0x9e, 0xe6, 0x77, 0xa1, 0xaa, + 0xb1, 0xb7, 0xe0, 0x7a, 0x24, 0x54, 0xb7, 0xfe, 0x33, 0x38, 0x8c, 0x22, 0x2e, 0xe6, 0xd8, 0x07, + 0x7b, 0x49, 0x99, 0x6b, 0x79, 0x56, 0xd0, 0x62, 0xb2, 0xc4, 0x13, 0x68, 0xee, 0x66, 0xab, 0x2d, + 0xb9, 0x75, 0xcf, 0x0a, 0x3a, 0x4c, 0x1f, 0x70, 0x00, 0x0e, 0x7d, 0x6e, 0x62, 0x91, 0xb9, 0xb6, + 0x67, 0x05, 0x36, 0xcb, 0x4f, 0xfe, 0x15, 0xb4, 0x19, 0xcd, 0xe6, 0x8c, 0x3e, 0xb6, 0x94, 0xa4, + 0x88, 0xd0, 0x58, 0x52, 0x96, 0xb8, 0x96, 0x67, 0x07, 0x2d, 0xa6, 0x6a, 0xff, 0x0e, 0x3a, 0x5a, + 0x92, 0x6c, 0xf8, 0x3a, 0x21, 0x1c, 0xc3, 0x3f, 0xa1, 0x1e, 0xd7, 0xb2, 0xf6, 0x64, 0x10, 0xee, + 0xc7, 0x0b, 0x75, 0x36, 0x56, 0xc8, 0xa4, 0xc3, 0xab, 0x88, 0x53, 0x2a, 0x5e, 0xa9, 0x38, 0xd4, + 0x7f, 0xe7, 0xf0, 0x1f, 0xba, 0xb9, 0x83, 0x0e, 0xe1, 0x5f, 0x43, 0xf7, 0x81, 0x56, 0xf4, 0xe3, + 0x79, 0x28, 0x79, 0x1f, 0x7a, 0x85, 0x28, 0x1f, 0xbb, 0x84, 0xf6, 0x34, 0x5b, 0x47, 0xc5, 0x90, + 0x41, 0x4f, 0x46, 0xd5, 0x82, 0xbf, 0x2e, 0x3b, 0xf9, 0xaa, 0x43, 0x73, 0x2a, 0x3b, 0x78, 0x0f, + 0x0d, 0x09, 0x0e, 0x4f, 0xcd, 0x91, 0x92, 0xf8, 0xf0, 0xec, 0x70, 0x33, 0xcf, 0x5b, 0xc3, 0x27, + 0x68, 0xaa, 0xcd, 0xd1, 0x10, 0x56, 0x91, 0x0e, 0xcf, 0x8f, 0x74, 0x4b, 0x9f, 0x17, 0x70, 0x34, + 0x0b, 0x34, 0xa4, 0x7b, 0x20, 0x87, 0x17, 0xc7, 0xda, 0xa5, 0xd5, 0x23, 0x34, 0x24, 0x23, 0x73, + 0xaf, 0x0a, 0x5a, 0x73, 0xaf, 0x2a, 0x56, 0xbf, 0x36, 0xb6, 0xde, 0x1c, 0xf5, 0xb7, 0x6f, 0xbf, + 0x03, 0x00, 0x00, 0xff, 0xff, 0x30, 0xc8, 0x99, 0x52, 0x0d, 0x03, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// StoreClient is the client API for Store service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type StoreClient interface { + Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) + Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) + Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) + Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error) +} + +type storeClient struct { + cc *grpc.ClientConn +} + +func NewStoreClient(cc *grpc.ClientConn) StoreClient { + return &storeClient{cc} +} + +func (c *storeClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) { + out := new(ReadResponse) + err := c.cc.Invoke(ctx, "/go.micro.store.Store/Read", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) { + out := new(WriteResponse) + err := c.cc.Invoke(ctx, "/go.micro.store.Store/Write", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) { + out := new(DeleteResponse) + err := c.cc.Invoke(ctx, "/go.micro.store.Store/Delete", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeClient) Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error) { + stream, err := c.cc.NewStream(ctx, &_Store_serviceDesc.Streams[0], "/go.micro.store.Store/Sync", opts...) + if err != nil { + return nil, err + } + x := &storeSyncClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Store_SyncClient interface { + Recv() (*SyncResponse, error) + grpc.ClientStream +} + +type storeSyncClient struct { + grpc.ClientStream +} + +func (x *storeSyncClient) Recv() (*SyncResponse, error) { + m := new(SyncResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// StoreServer is the server API for Store service. +type StoreServer interface { + Read(context.Context, *ReadRequest) (*ReadResponse, error) + Write(context.Context, *WriteRequest) (*WriteResponse, error) + Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) + Sync(*SyncRequest, Store_SyncServer) error +} + +func RegisterStoreServer(s *grpc.Server, srv StoreServer) { + s.RegisterService(&_Store_serviceDesc, srv) +} + +func _Store_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReadRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreServer).Read(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.store.Store/Read", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreServer).Read(ctx, req.(*ReadRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Store_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WriteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreServer).Write(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.store.Store/Write", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreServer).Write(ctx, req.(*WriteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Store_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DeleteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.store.Store/Delete", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreServer).Delete(ctx, req.(*DeleteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Store_Sync_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SyncRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(StoreServer).Sync(m, &storeSyncServer{stream}) +} + +type Store_SyncServer interface { + Send(*SyncResponse) error + grpc.ServerStream +} + +type storeSyncServer struct { + grpc.ServerStream +} + +func (x *storeSyncServer) Send(m *SyncResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _Store_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.store.Store", + HandlerType: (*StoreServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Read", + Handler: _Store_Read_Handler, + }, + { + MethodName: "Write", + Handler: _Store_Write_Handler, + }, + { + MethodName: "Delete", + Handler: _Store_Delete_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Sync", + Handler: _Store_Sync_Handler, + ServerStreams: true, + }, + }, + Metadata: "micro/go-micro/store/service/proto/store.proto", +} diff --git a/store/service/proto/store.proto b/store/service/proto/store.proto new file mode 100644 index 00000000..a5abe4aa --- /dev/null +++ b/store/service/proto/store.proto @@ -0,0 +1,48 @@ +syntax = "proto3"; + +package go.micro.store; + +service Store { + rpc Read(ReadRequest) returns (ReadResponse) {}; + rpc Write(WriteRequest) returns (WriteResponse) {}; + rpc Delete(DeleteRequest) returns (DeleteResponse) {}; + rpc Sync(SyncRequest) returns (stream SyncResponse) {}; +} + +message Record { + // key of the record + string key = 1; + // value in the record + bytes value = 2; + // timestamp in unix seconds + int64 expiry = 3; +} + +message ReadRequest { + repeated string keys = 1; +} + +message ReadResponse { + repeated Record records = 1; +} + +message WriteRequest { + repeated Record records = 2; +} + +message WriteResponse {} + +message DeleteRequest { + repeated string keys = 1; +} + +message DeleteResponse {} + +message SyncRequest { + // optional key + string key = 1; +} + +message SyncResponse { + repeated Record records = 1; +} diff --git a/store/store.go b/store/store.go index c6442947..e3d8efac 100644 --- a/store/store.go +++ b/store/store.go @@ -19,11 +19,11 @@ type Store interface { // Sync all the known records Sync() ([]*Record, error) // Read a record with key - Read(key string) (*Record, error) + Read(keys ...string) ([]*Record, error) // Write a record - Write(r *Record) error + Write(recs ...*Record) error // Delete a record with key - Delete(key string) error + Delete(keys ...string) error } // Record represents a data record diff --git a/sync/map.go b/sync/map.go index a87bd988..a9611ff0 100644 --- a/sync/map.go +++ b/sync/map.go @@ -39,8 +39,12 @@ func (m *syncMap) Read(key, val interface{}) error { return err } + if len(kval) == 0 { + return store.ErrNotFound + } + // decode value - return json.Unmarshal(kval.Value, val) + return json.Unmarshal(kval[0].Value, val) } func (m *syncMap) Write(key, val interface{}) error {