Change store.Sync to store.List

This commit is contained in:
Asim Aslam 2019-10-23 22:05:39 +01:00
parent ecac392dbe
commit 82f94c7861
10 changed files with 216 additions and 220 deletions

View File

@ -66,9 +66,9 @@ func New(opts ...options.Option) (store.Store, error) {
}, nil }, nil
} }
// In the cloudflare workers KV implemention, Sync() doesn't guarantee // In the cloudflare workers KV implemention, List() doesn't guarantee
// anything as the workers API is eventually consistent. // anything as the workers API is eventually consistent.
func (w *workersKV) Sync() ([]*store.Record, error) { func (w *workersKV) List() ([]*store.Record, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()

View File

@ -31,11 +31,11 @@ func TestCloudflare(t *testing.T) {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
records, err := wkv.Sync() records, err := wkv.List()
if err != nil { if err != nil {
t.Fatalf("Sync: %s\n", err.Error()) t.Fatalf("List: %s\n", err.Error())
} else { } else {
t.Log("Synced " + strconv.Itoa(len(records)) + " records") t.Log("Listed " + strconv.Itoa(len(records)) + " records")
} }
err = wkv.Write( err = wkv.Write(

View File

@ -68,7 +68,7 @@ func (e *ekv) Write(records ...*store.Record) error {
return gerr return gerr
} }
func (e *ekv) Sync() ([]*store.Record, error) { func (e *ekv) List() ([]*store.Record, error) {
keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix()) keyval, err := e.kv.Get(context.Background(), "/", client.WithPrefix())
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -21,7 +21,7 @@ type memoryRecord struct {
c time.Time c time.Time
} }
func (m *memoryStore) Sync() ([]*store.Record, error) { func (m *memoryStore) List() ([]*store.Record, error) {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()

View File

@ -55,19 +55,19 @@ func (s *Store) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.Delet
return nil return nil
} }
func (s *Store) Sync(ctx context.Context, req *pb.SyncRequest, stream pb.Store_SyncStream) error { func (s *Store) List(ctx context.Context, req *pb.ListRequest, stream pb.Store_ListStream) error {
var vals []*store.Record var vals []*store.Record
var err error var err error
if len(req.Key) > 0 { if len(req.Key) > 0 {
vals, err = s.Store.Read(req.Key) vals, err = s.Store.Read(req.Key)
} else { } else {
vals, err = s.Store.Sync() vals, err = s.Store.List()
} }
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.store", err.Error()) return errors.InternalServerError("go.micro.store", err.Error())
} }
rsp := new(pb.SyncResponse) rsp := new(pb.ListResponse)
// TODO: batch sync // TODO: batch sync
for _, val := range vals { for _, val := range vals {

View File

@ -34,10 +34,10 @@ var _ server.Option
// Client API for Store service // Client API for Store service
type StoreService interface { type StoreService interface {
List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (Store_ListService, error)
Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error)
Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error) Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error)
Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error)
} }
type storeService struct { type storeService struct {
@ -58,6 +58,50 @@ func NewStoreService(name string, c client.Client) StoreService {
} }
} }
func (c *storeService) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (Store_ListService, error) {
req := c.c.NewRequest(c.name, "Store.List", &ListRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
if err := stream.Send(in); err != nil {
return nil, err
}
return &storeServiceList{stream}, nil
}
type Store_ListService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*ListResponse, error)
}
type storeServiceList struct {
stream client.Stream
}
func (x *storeServiceList) Close() error {
return x.stream.Close()
}
func (x *storeServiceList) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *storeServiceList) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *storeServiceList) Recv() (*ListResponse, error) {
m := new(ListResponse)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
func (c *storeService) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) { func (c *storeService) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) {
req := c.c.NewRequest(c.name, "Store.Read", in) req := c.c.NewRequest(c.name, "Store.Read", in)
out := new(ReadResponse) out := new(ReadResponse)
@ -88,65 +132,21 @@ func (c *storeService) Delete(ctx context.Context, in *DeleteRequest, opts ...cl
return out, nil return out, nil
} }
func (c *storeService) Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error) {
req := c.c.NewRequest(c.name, "Store.Sync", &SyncRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
if err := stream.Send(in); err != nil {
return nil, err
}
return &storeServiceSync{stream}, nil
}
type Store_SyncService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*SyncResponse, error)
}
type storeServiceSync struct {
stream client.Stream
}
func (x *storeServiceSync) Close() error {
return x.stream.Close()
}
func (x *storeServiceSync) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *storeServiceSync) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *storeServiceSync) Recv() (*SyncResponse, error) {
m := new(SyncResponse)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
// Server API for Store service // Server API for Store service
type StoreHandler interface { type StoreHandler interface {
List(context.Context, *ListRequest, Store_ListStream) error
Read(context.Context, *ReadRequest, *ReadResponse) error Read(context.Context, *ReadRequest, *ReadResponse) error
Write(context.Context, *WriteRequest, *WriteResponse) error Write(context.Context, *WriteRequest, *WriteResponse) error
Delete(context.Context, *DeleteRequest, *DeleteResponse) error Delete(context.Context, *DeleteRequest, *DeleteResponse) error
Sync(context.Context, *SyncRequest, Store_SyncStream) error
} }
func RegisterStoreHandler(s server.Server, hdlr StoreHandler, opts ...server.HandlerOption) error { func RegisterStoreHandler(s server.Server, hdlr StoreHandler, opts ...server.HandlerOption) error {
type store interface { type store interface {
List(ctx context.Context, stream server.Stream) error
Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error
Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error
Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error
Sync(ctx context.Context, stream server.Stream) error
} }
type Store struct { type Store struct {
store store
@ -159,6 +159,41 @@ type storeHandler struct {
StoreHandler StoreHandler
} }
func (h *storeHandler) List(ctx context.Context, stream server.Stream) error {
m := new(ListRequest)
if err := stream.Recv(m); err != nil {
return err
}
return h.StoreHandler.List(ctx, m, &storeListStream{stream})
}
type Store_ListStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*ListResponse) error
}
type storeListStream struct {
stream server.Stream
}
func (x *storeListStream) Close() error {
return x.stream.Close()
}
func (x *storeListStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *storeListStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *storeListStream) Send(m *ListResponse) error {
return x.stream.Send(m)
}
func (h *storeHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error { func (h *storeHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error {
return h.StoreHandler.Read(ctx, in, out) return h.StoreHandler.Read(ctx, in, out)
} }
@ -170,38 +205,3 @@ func (h *storeHandler) Write(ctx context.Context, in *WriteRequest, out *WriteRe
func (h *storeHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error { func (h *storeHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error {
return h.StoreHandler.Delete(ctx, in, out) return h.StoreHandler.Delete(ctx, in, out)
} }
func (h *storeHandler) Sync(ctx context.Context, stream server.Stream) error {
m := new(SyncRequest)
if err := stream.Recv(m); err != nil {
return err
}
return h.StoreHandler.Sync(ctx, m, &storeSyncStream{stream})
}
type Store_SyncStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*SyncResponse) error
}
type storeSyncStream struct {
stream server.Stream
}
func (x *storeSyncStream) Close() error {
return x.stream.Close()
}
func (x *storeSyncStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *storeSyncStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *storeSyncStream) Send(m *SyncResponse) error {
return x.stream.Send(m)
}

View File

@ -298,7 +298,7 @@ func (m *DeleteResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo
type SyncRequest struct { type ListRequest struct {
// optional key // optional key
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -306,71 +306,71 @@ type SyncRequest struct {
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *SyncRequest) Reset() { *m = SyncRequest{} } func (m *ListRequest) Reset() { *m = ListRequest{} }
func (m *SyncRequest) String() string { return proto.CompactTextString(m) } func (m *ListRequest) String() string { return proto.CompactTextString(m) }
func (*SyncRequest) ProtoMessage() {} func (*ListRequest) ProtoMessage() {}
func (*SyncRequest) Descriptor() ([]byte, []int) { func (*ListRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{7} return fileDescriptor_f84ccc98e143ed3e, []int{7}
} }
func (m *SyncRequest) XXX_Unmarshal(b []byte) error { func (m *ListRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SyncRequest.Unmarshal(m, b) return xxx_messageInfo_ListRequest.Unmarshal(m, b)
} }
func (m *SyncRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SyncRequest.Marshal(b, m, deterministic) return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic)
} }
func (m *SyncRequest) XXX_Merge(src proto.Message) { func (m *ListRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncRequest.Merge(m, src) xxx_messageInfo_ListRequest.Merge(m, src)
} }
func (m *SyncRequest) XXX_Size() int { func (m *ListRequest) XXX_Size() int {
return xxx_messageInfo_SyncRequest.Size(m) return xxx_messageInfo_ListRequest.Size(m)
} }
func (m *SyncRequest) XXX_DiscardUnknown() { func (m *ListRequest) XXX_DiscardUnknown() {
xxx_messageInfo_SyncRequest.DiscardUnknown(m) xxx_messageInfo_ListRequest.DiscardUnknown(m)
} }
var xxx_messageInfo_SyncRequest proto.InternalMessageInfo var xxx_messageInfo_ListRequest proto.InternalMessageInfo
func (m *SyncRequest) GetKey() string { func (m *ListRequest) GetKey() string {
if m != nil { if m != nil {
return m.Key return m.Key
} }
return "" return ""
} }
type SyncResponse struct { type ListResponse struct {
Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"` Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *SyncResponse) Reset() { *m = SyncResponse{} } func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *SyncResponse) String() string { return proto.CompactTextString(m) } func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*SyncResponse) ProtoMessage() {} func (*ListResponse) ProtoMessage() {}
func (*SyncResponse) Descriptor() ([]byte, []int) { func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{8} return fileDescriptor_f84ccc98e143ed3e, []int{8}
} }
func (m *SyncResponse) XXX_Unmarshal(b []byte) error { func (m *ListResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SyncResponse.Unmarshal(m, b) return xxx_messageInfo_ListResponse.Unmarshal(m, b)
} }
func (m *SyncResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *ListResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SyncResponse.Marshal(b, m, deterministic) return xxx_messageInfo_ListResponse.Marshal(b, m, deterministic)
} }
func (m *SyncResponse) XXX_Merge(src proto.Message) { func (m *ListResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncResponse.Merge(m, src) xxx_messageInfo_ListResponse.Merge(m, src)
} }
func (m *SyncResponse) XXX_Size() int { func (m *ListResponse) XXX_Size() int {
return xxx_messageInfo_SyncResponse.Size(m) return xxx_messageInfo_ListResponse.Size(m)
} }
func (m *SyncResponse) XXX_DiscardUnknown() { func (m *ListResponse) XXX_DiscardUnknown() {
xxx_messageInfo_SyncResponse.DiscardUnknown(m) xxx_messageInfo_ListResponse.DiscardUnknown(m)
} }
var xxx_messageInfo_SyncResponse proto.InternalMessageInfo var xxx_messageInfo_ListResponse proto.InternalMessageInfo
func (m *SyncResponse) GetRecords() []*Record { func (m *ListResponse) GetRecords() []*Record {
if m != nil { if m != nil {
return m.Records return m.Records
} }
@ -385,8 +385,8 @@ func init() {
proto.RegisterType((*WriteResponse)(nil), "go.micro.store.WriteResponse") proto.RegisterType((*WriteResponse)(nil), "go.micro.store.WriteResponse")
proto.RegisterType((*DeleteRequest)(nil), "go.micro.store.DeleteRequest") proto.RegisterType((*DeleteRequest)(nil), "go.micro.store.DeleteRequest")
proto.RegisterType((*DeleteResponse)(nil), "go.micro.store.DeleteResponse") proto.RegisterType((*DeleteResponse)(nil), "go.micro.store.DeleteResponse")
proto.RegisterType((*SyncRequest)(nil), "go.micro.store.SyncRequest") proto.RegisterType((*ListRequest)(nil), "go.micro.store.ListRequest")
proto.RegisterType((*SyncResponse)(nil), "go.micro.store.SyncResponse") proto.RegisterType((*ListResponse)(nil), "go.micro.store.ListResponse")
} }
func init() { func init() {
@ -395,27 +395,27 @@ func init() {
var fileDescriptor_f84ccc98e143ed3e = []byte{ var fileDescriptor_f84ccc98e143ed3e = []byte{
// 333 bytes of a gzipped FileDescriptorProto // 333 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcb, 0x6e, 0xf2, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0x4d, 0x4f, 0xc2, 0x40,
0x10, 0x85, 0x09, 0x81, 0xfc, 0x62, 0xb8, 0xfc, 0x68, 0x54, 0xa1, 0x88, 0xde, 0xd2, 0x74, 0x93, 0x10, 0x86, 0x59, 0x0a, 0x35, 0x0c, 0x1f, 0x92, 0x89, 0x21, 0x0d, 0x7e, 0xd5, 0x7a, 0xe9, 0xc5,
0x4d, 0x03, 0xa2, 0x2f, 0x50, 0xa9, 0x17, 0xb5, 0x5b, 0xb3, 0xe8, 0x9a, 0x86, 0x11, 0x8a, 0xa0, 0x42, 0xf0, 0x0f, 0x98, 0xf8, 0x11, 0x4d, 0x3c, 0xad, 0x07, 0xcf, 0x08, 0x13, 0xd2, 0x80, 0x2e,
0x98, 0x3a, 0x01, 0x35, 0x2f, 0xd4, 0xe7, 0xac, 0x6c, 0x27, 0x69, 0x90, 0x41, 0xaa, 0xba, 0x1b, 0xee, 0x16, 0x62, 0xff, 0x90, 0xbf, 0xd3, 0xec, 0x6e, 0xab, 0xc5, 0x42, 0x62, 0xbc, 0xcd, 0xee,
0x7b, 0xce, 0x1c, 0x9f, 0xf9, 0x64, 0x08, 0xdf, 0xe3, 0x48, 0xf0, 0xd1, 0x82, 0xdf, 0xe8, 0x22, 0xbc, 0xf3, 0xec, 0xdb, 0x79, 0x0b, 0xd1, 0x6b, 0x3c, 0x91, 0x62, 0x30, 0x13, 0x17, 0xb6, 0x50,
0x49, 0xb9, 0xa0, 0x51, 0x42, 0x62, 0x17, 0x47, 0x34, 0xda, 0x08, 0x9e, 0xe6, 0x77, 0xa1, 0xaa, 0x89, 0x90, 0x34, 0x50, 0x24, 0xd7, 0xf1, 0x84, 0x06, 0x4b, 0x29, 0x92, 0xec, 0x2e, 0x32, 0x35,
0xb1, 0xb7, 0xe0, 0x7a, 0x24, 0x54, 0xb7, 0xfe, 0x33, 0x38, 0x8c, 0x22, 0x2e, 0xe6, 0xd8, 0x07, 0x76, 0x66, 0xc2, 0x8e, 0x44, 0xe6, 0x36, 0xb8, 0x07, 0x97, 0xd3, 0x44, 0xc8, 0x29, 0x76, 0xc1,
0x7b, 0x49, 0x99, 0x6b, 0x79, 0x56, 0xd0, 0x62, 0xb2, 0xc4, 0x13, 0x68, 0xee, 0x66, 0xab, 0x2d, 0x99, 0x53, 0xea, 0x31, 0x9f, 0x85, 0x0d, 0xae, 0x4b, 0x3c, 0x80, 0xfa, 0x7a, 0xbc, 0x58, 0x91,
0xb9, 0x75, 0xcf, 0x0a, 0x3a, 0x4c, 0x1f, 0x70, 0x00, 0x0e, 0x7d, 0x6e, 0x62, 0x91, 0xb9, 0xb6, 0x57, 0xf5, 0x59, 0xd8, 0xe2, 0xf6, 0x80, 0x3d, 0x70, 0xe9, 0x63, 0x19, 0xcb, 0xd4, 0x73, 0x7c,
0x67, 0x05, 0x36, 0xcb, 0x4f, 0xfe, 0x15, 0xb4, 0x19, 0xcd, 0xe6, 0x8c, 0x3e, 0xb6, 0x94, 0xa4, 0x16, 0x3a, 0x3c, 0x3b, 0x05, 0x67, 0xd0, 0xe4, 0x34, 0x9e, 0x72, 0x7a, 0x5f, 0x91, 0x4a, 0x10,
0x88, 0xd0, 0x58, 0x52, 0x96, 0xb8, 0x96, 0x67, 0x07, 0x2d, 0xa6, 0x6a, 0xff, 0x0e, 0x3a, 0x5a, 0xa1, 0x36, 0xa7, 0x54, 0x79, 0xcc, 0x77, 0xc2, 0x06, 0x37, 0x75, 0x70, 0x05, 0x2d, 0x2b, 0x51,
0x92, 0x6c, 0xf8, 0x3a, 0x21, 0x1c, 0xc3, 0x3f, 0xa1, 0x1e, 0xd7, 0xb2, 0xf6, 0x64, 0x10, 0xee, 0x4b, 0xf1, 0xa6, 0x08, 0x87, 0xb0, 0x27, 0xcd, 0xe3, 0x56, 0xd6, 0x1c, 0xf5, 0xa2, 0x4d, 0x7b,
0xc7, 0x0b, 0x75, 0x36, 0x56, 0xc8, 0xa4, 0xc3, 0xab, 0x88, 0x53, 0x2a, 0x5e, 0xa9, 0x38, 0xd4, 0x91, 0xf5, 0xc6, 0x73, 0x99, 0x26, 0x3c, 0xcb, 0x38, 0xa1, 0xfc, 0x95, 0x02, 0xa1, 0xfa, 0x37,
0x7f, 0xe7, 0xf0, 0x1f, 0xba, 0xb9, 0x83, 0x0e, 0xe1, 0x5f, 0x43, 0xf7, 0x81, 0x56, 0xf4, 0xe3, 0xc2, 0x3e, 0xb4, 0x33, 0x82, 0x35, 0x11, 0x9c, 0x43, 0xfb, 0x86, 0x16, 0xf4, 0xc3, 0xdc, 0xe6,
0x79, 0x28, 0x79, 0x1f, 0x7a, 0x85, 0x28, 0x1f, 0xbb, 0x84, 0xf6, 0x34, 0x5b, 0x47, 0xc5, 0x90, 0xbc, 0x0b, 0x9d, 0x5c, 0x94, 0x8d, 0x9d, 0x42, 0xf3, 0x31, 0x56, 0x49, 0x3e, 0x54, 0xda, 0x9e,
0x41, 0x4f, 0x46, 0xd5, 0x82, 0xbf, 0x2e, 0x3b, 0xf9, 0xaa, 0x43, 0x73, 0x2a, 0x3b, 0x78, 0x0f, 0xb6, 0x6a, 0x05, 0xff, 0xfd, 0xd8, 0xd1, 0x67, 0x15, 0xea, 0x4f, 0xba, 0x83, 0xb7, 0x50, 0xd3,
0x0d, 0x09, 0x0e, 0x4f, 0xcd, 0x91, 0x92, 0xf8, 0xf0, 0xec, 0x70, 0x33, 0xcf, 0x5b, 0xc3, 0x27, 0x2c, 0x3c, 0xfc, 0x3d, 0x52, 0xb0, 0xd0, 0x3f, 0xda, 0xde, 0xcc, 0xfc, 0x56, 0x86, 0x0c, 0xaf,
0x68, 0xaa, 0xcd, 0xd1, 0x10, 0x56, 0x91, 0x0e, 0xcf, 0x8f, 0x74, 0x4b, 0x9f, 0x17, 0x70, 0x34, 0xa1, 0xa6, 0xf7, 0x5f, 0xc6, 0x14, 0x82, 0x2b, 0x63, 0x8a, 0x91, 0x05, 0x15, 0xbc, 0x83, 0xba,
0x0b, 0x34, 0xa4, 0x7b, 0x20, 0x87, 0x17, 0xc7, 0xda, 0xa5, 0xd5, 0x23, 0x34, 0x24, 0x23, 0x73, 0x59, 0x20, 0x96, 0x84, 0xc5, 0x64, 0xfa, 0xc7, 0x3b, 0xba, 0xdf, 0x9c, 0x07, 0x70, 0xed, 0x4a,
0xaf, 0x0a, 0x5a, 0x73, 0xaf, 0x2a, 0x56, 0xbf, 0x36, 0xb6, 0xde, 0x1c, 0xf5, 0xb7, 0x6f, 0xbf, 0xb1, 0x24, 0xdd, 0xc8, 0xa3, 0x7f, 0xb2, 0xab, 0x9d, 0xa3, 0x5e, 0x5c, 0xf3, 0x6f, 0x5f, 0x7e,
0x03, 0x00, 0x00, 0xff, 0xff, 0x30, 0xc8, 0x99, 0x52, 0x0d, 0x03, 0x00, 0x00, 0x05, 0x00, 0x00, 0xff, 0xff, 0x30, 0x48, 0x25, 0x2d, 0x0d, 0x03, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -430,10 +430,10 @@ const _ = grpc.SupportPackageIsVersion4
// //
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StoreClient interface { type StoreClient interface {
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (Store_ListClient, error)
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error)
} }
type storeClient struct { type storeClient struct {
@ -444,6 +444,38 @@ func NewStoreClient(cc *grpc.ClientConn) StoreClient {
return &storeClient{cc} return &storeClient{cc}
} }
func (c *storeClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (Store_ListClient, error) {
stream, err := c.cc.NewStream(ctx, &_Store_serviceDesc.Streams[0], "/go.micro.store.Store/List", opts...)
if err != nil {
return nil, err
}
x := &storeListClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Store_ListClient interface {
Recv() (*ListResponse, error)
grpc.ClientStream
}
type storeListClient struct {
grpc.ClientStream
}
func (x *storeListClient) Recv() (*ListResponse, error) {
m := new(ListResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *storeClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) { func (c *storeClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) {
out := new(ReadResponse) out := new(ReadResponse)
err := c.cc.Invoke(ctx, "/go.micro.store.Store/Read", in, out, opts...) err := c.cc.Invoke(ctx, "/go.micro.store.Store/Read", in, out, opts...)
@ -471,50 +503,39 @@ func (c *storeClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grp
return out, nil return out, nil
} }
func (c *storeClient) Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error) {
stream, err := c.cc.NewStream(ctx, &_Store_serviceDesc.Streams[0], "/go.micro.store.Store/Sync", opts...)
if err != nil {
return nil, err
}
x := &storeSyncClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Store_SyncClient interface {
Recv() (*SyncResponse, error)
grpc.ClientStream
}
type storeSyncClient struct {
grpc.ClientStream
}
func (x *storeSyncClient) Recv() (*SyncResponse, error) {
m := new(SyncResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// StoreServer is the server API for Store service. // StoreServer is the server API for Store service.
type StoreServer interface { type StoreServer interface {
List(*ListRequest, Store_ListServer) error
Read(context.Context, *ReadRequest) (*ReadResponse, error) Read(context.Context, *ReadRequest) (*ReadResponse, error)
Write(context.Context, *WriteRequest) (*WriteResponse, error) Write(context.Context, *WriteRequest) (*WriteResponse, error)
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
Sync(*SyncRequest, Store_SyncServer) error
} }
func RegisterStoreServer(s *grpc.Server, srv StoreServer) { func RegisterStoreServer(s *grpc.Server, srv StoreServer) {
s.RegisterService(&_Store_serviceDesc, srv) s.RegisterService(&_Store_serviceDesc, srv)
} }
func _Store_List_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(ListRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StoreServer).List(m, &storeListServer{stream})
}
type Store_ListServer interface {
Send(*ListResponse) error
grpc.ServerStream
}
type storeListServer struct {
grpc.ServerStream
}
func (x *storeListServer) Send(m *ListResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Store_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Store_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReadRequest) in := new(ReadRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -569,27 +590,6 @@ func _Store_Delete_Handler(srv interface{}, ctx context.Context, dec func(interf
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Store_Sync_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SyncRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StoreServer).Sync(m, &storeSyncServer{stream})
}
type Store_SyncServer interface {
Send(*SyncResponse) error
grpc.ServerStream
}
type storeSyncServer struct {
grpc.ServerStream
}
func (x *storeSyncServer) Send(m *SyncResponse) error {
return x.ServerStream.SendMsg(m)
}
var _Store_serviceDesc = grpc.ServiceDesc{ var _Store_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.store.Store", ServiceName: "go.micro.store.Store",
HandlerType: (*StoreServer)(nil), HandlerType: (*StoreServer)(nil),
@ -609,8 +609,8 @@ var _Store_serviceDesc = grpc.ServiceDesc{
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {
StreamName: "Sync", StreamName: "List",
Handler: _Store_Sync_Handler, Handler: _Store_List_Handler,
ServerStreams: true, ServerStreams: true,
}, },
}, },

View File

@ -3,10 +3,10 @@ syntax = "proto3";
package go.micro.store; package go.micro.store;
service Store { service Store {
rpc List(ListRequest) returns (stream ListResponse) {};
rpc Read(ReadRequest) returns (ReadResponse) {}; rpc Read(ReadRequest) returns (ReadResponse) {};
rpc Write(WriteRequest) returns (WriteResponse) {}; rpc Write(WriteRequest) returns (WriteResponse) {};
rpc Delete(DeleteRequest) returns (DeleteResponse) {}; rpc Delete(DeleteRequest) returns (DeleteResponse) {};
rpc Sync(SyncRequest) returns (stream SyncResponse) {};
} }
message Record { message Record {
@ -38,11 +38,11 @@ message DeleteRequest {
message DeleteResponse {} message DeleteResponse {}
message SyncRequest { message ListRequest {
// optional key // optional key
string key = 1; string key = 1;
} }
message SyncResponse { message ListResponse {
repeated Record records = 1; repeated Record records = 1;
} }

View File

@ -23,8 +23,8 @@ type serviceStore struct {
} }
// Sync all the known records // Sync all the known records
func (s *serviceStore) Sync() ([]*store.Record, error) { func (s *serviceStore) List() ([]*store.Record, error) {
stream, err := s.Client.Sync(context.Background(), &pb.SyncRequest{}, client.WithAddress(s.Nodes...)) stream, err := s.Client.List(context.Background(), &pb.ListRequest{}, client.WithAddress(s.Nodes...))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -4,8 +4,6 @@ package store
import ( import (
"errors" "errors"
"time" "time"
"github.com/micro/go-micro/config/options"
) )
var ( var (
@ -14,16 +12,14 @@ var (
// Store is a data storage interface // Store is a data storage interface
type Store interface { type Store interface {
// embed options // List all the known records
options.Options List() ([]*Record, error)
// Sync all the known records
Sync() ([]*Record, error)
// Read a record with key // Read a record with key
Read(keys ...string) ([]*Record, error) Read(key ...string) ([]*Record, error)
// Write a record // Write a record
Write(recs ...*Record) error Write(rec ...*Record) error
// Delete a record with key // Delete a record with key
Delete(keys ...string) error Delete(key ...string) error
} }
// Record represents a data record // Record represents a data record