diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 439be459..fd5bc853 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -110,12 +110,21 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R var grr error - cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), - grpc.WithTimeout(opts.DialTimeout), g.secure(), + grpcDialOptions := []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), + grpc.WithTimeout(opts.DialTimeout), + g.secure(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), - )) + ), + } + + if opts := g.getGrpcDialOptions(); opts != nil { + grpcDialOptions = append(grpcDialOptions, opts...) + } + + cc, err := g.pool.getConn(address, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -127,7 +136,11 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R ch := make(chan error, 1) go func() { - err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpc.CallContentSubtype(cf.Name())) + grpcCallOptions := []grpc.CallOption{grpc.CallContentSubtype(cf.Name())} + if opts := g.getGrpcCallOptions(); opts != nil { + grpcCallOptions = append(grpcCallOptions, opts...) + } + err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpcCallOptions...) ch <- microError(err) }() @@ -175,7 +188,16 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client wc := wrapCodec{cf} - cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure()) + grpcDialOptions := []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), + g.secure(), + } + + if opts := g.getGrpcDialOptions(); opts != nil { + grpcDialOptions = append(grpcDialOptions, opts...) + } + + cc, err := grpc.DialContext(dialCtx, address, grpcDialOptions...) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -186,7 +208,11 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client ServerStreams: true, } - st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint())) + grpcCallOptions := []grpc.CallOption{} + if opts := g.getGrpcCallOptions(); opts != nil { + grpcCallOptions = append(grpcCallOptions, opts...) + } + st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } @@ -514,6 +540,46 @@ func (g *grpcClient) String() string { return "grpc" } +func (g *grpcClient) getGrpcDialOptions() []grpc.DialOption { + if g.opts.CallOptions.Context == nil { + return nil + } + + v := g.opts.CallOptions.Context.Value(grpcDialOptions{}) + + if v == nil { + return nil + } + + opts, ok := v.([]grpc.DialOption) + + if !ok { + return nil + } + + return opts +} + +func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption { + if g.opts.CallOptions.Context == nil { + return nil + } + + v := g.opts.CallOptions.Context.Value(grpcCallOptions{}) + + if v == nil { + return nil + } + + opts, ok := v.([]grpc.CallOption) + + if !ok { + return nil + } + + return opts +} + func newClient(opts ...client.Option) client.Client { options := client.Options{ Codecs: make(map[string]codec.NewCodec), diff --git a/client/grpc/options.go b/client/grpc/options.go index c702ade3..e7f2fceb 100644 --- a/client/grpc/options.go +++ b/client/grpc/options.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "github.com/micro/go-micro/client" + "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) @@ -23,6 +24,8 @@ type codecsKey struct{} type tlsAuth struct{} type maxRecvMsgSizeKey struct{} type maxSendMsgSizeKey struct{} +type grpcDialOptions struct{} +type grpcCallOptions struct{} // gRPC Codec to be used to encode/decode requests for a given content type func Codec(contentType string, c encoding.Codec) client.Option { @@ -72,3 +75,27 @@ func MaxSendMsgSize(s int) client.Option { o.Context = context.WithValue(o.Context, maxSendMsgSizeKey{}, s) } } + +// +// DialOptions to be used to configure gRPC dial options +// +func DialOptions(opts ...grpc.DialOption) client.CallOption { + return func(o *client.CallOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, grpcDialOptions{}, opts) + } +} + +// +// CallOptions to be used to configure gRPC call options +// +func CallOptions(opts ...grpc.CallOption) client.CallOption { + return func(o *client.CallOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) + } +} diff --git a/network/default.go b/network/default.go index 04385dcc..58d74573 100644 --- a/network/default.go +++ b/network/default.go @@ -470,7 +470,7 @@ func (n *network) prunePeerRoutes(peer *node) error { // lookup all routes routable via gw q = []router.QueryOption{ - router.QueryGateway(peer.id), + router.QueryGateway(peer.address), } if err := n.pruneRoutes(q...); err != nil { return err @@ -497,6 +497,25 @@ func (n *network) prune() { log.Debugf("Network failed pruning peer %s routes: %v", id, err) } } + // get a list of all routes + routes, err := n.options.Router.Table().List() + if err != nil { + log.Debugf("Network failed listing routes: %v", err) + continue + } + // collect all the router IDs in the routing table + routers := make(map[string]bool) + for _, route := range routes { + if _, ok := routers[route.Router]; !ok { + routers[route.Router] = true + // if the router is NOT in our peer graph, delete all routes originated by it + if peerNode := n.node.GetPeerNode(route.Router); peerNode == nil { + if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil { + log.Debugf("Network failed deleting routes by %s: %v", route.Router, err) + } + } + } + } } } } diff --git a/network/handler/handler.go b/network/handler/handler.go index 218d931a..62162e64 100644 --- a/network/handler/handler.go +++ b/network/handler/handler.go @@ -27,16 +27,14 @@ func flatten(n network.Node, visited map[string]bool) []network.Node { visited = make(map[string]bool) } - // check if already visited - if visited[n.Id()] == true { - return nil - } - // create new list of nodes var nodes []network.Node - // append the current node - nodes = append(nodes, n) + // check if already visited + if visited[n.Id()] == false { + // append the current node + nodes = append(nodes, n) + } // set to visited visited[n.Id()] = true diff --git a/registry/cache/cache.go b/registry/cache/cache.go index a17960e5..8acd79a3 100644 --- a/registry/cache/cache.go +++ b/registry/cache/cache.go @@ -144,7 +144,7 @@ func (c *cache) get(service string) ([]*registry.Service, error) { } // reset the status - if c.getStatus(); err != nil { + if err := c.getStatus(); err != nil { c.setStatus(nil) } diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go index 9f01de8d..ed4fec1c 100644 --- a/registry/etcd/etcd.go +++ b/registry/etcd/etcd.go @@ -8,6 +8,7 @@ import ( "errors" "net" "path" + "sort" "strings" "sync" "time" @@ -20,7 +21,7 @@ import ( ) var ( - prefix = "/micro/registry" + prefix = "/micro/registry/" ) type etcdRegistry struct { @@ -148,7 +149,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op defer cancel() // look for the existing key - rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id)) + rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable()) if err != nil { return err } @@ -310,7 +311,7 @@ func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() - rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) + rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } @@ -344,6 +345,7 @@ func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { for _, service := range serviceMap { services = append(services, service) } + return services, nil } @@ -354,7 +356,7 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() - rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) + rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } @@ -381,6 +383,9 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { services = append(services, service) } + // sort the services + sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name }) + return services, nil } diff --git a/router/default.go b/router/default.go index a25990a2..48cdb230 100644 --- a/router/default.go +++ b/router/default.go @@ -319,23 +319,13 @@ func (r *router) advertiseTable() error { for { select { case <-ticker.C: - // list routing table routes to announce - routes, err := r.table.List() + // do full table flush + events, err := r.flushRouteEvents(Update) if err != nil { - return fmt.Errorf("failed listing routes: %s", err) - } - // collect all the added routes before we attempt to add default gateway - events := make([]*Event, len(routes)) - for i, route := range routes { - event := &Event{ - Type: Update, - Timestamp: time.Now(), - Route: route, - } - events[i] = event + return fmt.Errorf("failed flushing routes: %s", err) } - // advertise all routes as Update events to subscribers + // advertise routes to subscribers if len(events) > 0 { log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id) r.advertWg.Add(1) @@ -692,15 +682,58 @@ func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) { return nil, fmt.Errorf("failed listing routes: %s", err) } + if r.options.Advertise == AdvertiseAll { + // build a list of events to advertise + events := make([]*Event, len(routes)) + for i, route := range routes { + event := &Event{ + Type: evType, + Timestamp: time.Now(), + Route: route, + } + events[i] = event + } + return events, nil + } + + // routeMap stores optimal routes per service + bestRoutes := make(map[string]Route) + + // go through all routes found in the routing table and collapse them to optimal routes + for _, route := range routes { + routeKey := route.Service + "@" + route.Network + optimal, ok := bestRoutes[routeKey] + if !ok { + bestRoutes[routeKey] = route + continue + } + // if the current optimal route metric is higher than routing table route, replace it + if optimal.Metric > route.Metric { + bestRoutes[routeKey] = route + continue + } + // if the metrics are the same, prefer advertising your own route + if optimal.Metric == route.Metric { + if route.Router == r.options.Id { + bestRoutes[routeKey] = route + continue + } + } + } + + log.Debugf("Router advertising %d best routes out of %d", len(bestRoutes), len(routes)) + // build a list of events to advertise - events := make([]*Event, len(routes)) - for i, route := range routes { + events := make([]*Event, len(bestRoutes)) + i := 0 + for _, route := range bestRoutes { event := &Event{ Type: evType, Timestamp: time.Now(), Route: route, } events[i] = event + i++ } return events, nil diff --git a/router/options.go b/router/options.go index 13119e1b..31d82ea8 100644 --- a/router/options.go +++ b/router/options.go @@ -18,6 +18,8 @@ type Options struct { Network string // Registry is the local registry Registry registry.Registry + // Advertise is the advertising strategy + Advertise Strategy // Client for calling router Client client.Client } @@ -64,12 +66,20 @@ func Registry(r registry.Registry) Option { } } +// Strategy sets route advertising strategy +func Advertise(a Strategy) Option { + return func(o *Options) { + o.Advertise = a + } +} + // DefaultOptions returns router default options func DefaultOptions() Options { return Options{ - Id: uuid.New().String(), - Address: DefaultAddress, - Network: DefaultNetwork, - Registry: registry.DefaultRegistry, + Id: uuid.New().String(), + Address: DefaultAddress, + Network: DefaultNetwork, + Registry: registry.DefaultRegistry, + Advertise: AdvertiseBest, } } diff --git a/router/router.go b/router/router.go index 756a5a42..cd300e7e 100644 --- a/router/router.go +++ b/router/router.go @@ -139,6 +139,28 @@ type Advert struct { Events []*Event } +// Strategy is route advertisement strategy +type Strategy int + +const ( + // AdvertiseAll advertises all routes to the network + AdvertiseAll Strategy = iota + // AdvertiseBest advertises optimal routes to the network + AdvertiseBest +) + +// String returns human readable Strategy +func (s Strategy) String() string { + switch s { + case AdvertiseAll: + return "all" + case AdvertiseBest: + return "best" + default: + return "unknown" + } +} + // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...) diff --git a/store/etcd/etcd.go b/store/etcd/etcd.go index 7d866d63..bd3697c7 100644 --- a/store/etcd/etcd.go +++ b/store/etcd/etcd.go @@ -6,6 +6,7 @@ import ( "log" client "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/store" ) @@ -15,30 +16,56 @@ type ekv struct { kv client.KV } -func (e *ekv) Read(key string) (*store.Record, error) { - keyval, err := e.kv.Get(context.Background(), key) - if err != nil { - return nil, err +func (e *ekv) Read(keys ...string) ([]*store.Record, error) { + var values []*mvccpb.KeyValue + + for _, key := range keys { + keyval, err := e.kv.Get(context.Background(), key) + if err != nil { + return nil, err + } + + if keyval == nil || len(keyval.Kvs) == 0 { + return nil, store.ErrNotFound + } + + values = append(values, keyval.Kvs...) } - if keyval == nil || len(keyval.Kvs) == 0 { - return nil, store.ErrNotFound + var records []*store.Record + + for _, kv := range values { + records = append(records, &store.Record{ + Key: string(kv.Key), + Value: kv.Value, + // TODO: implement expiry + }) } - return &store.Record{ - Key: string(keyval.Kvs[0].Key), - Value: keyval.Kvs[0].Value, - }, nil + return records, nil } -func (e *ekv) Delete(key string) error { - _, err := e.kv.Delete(context.Background(), key) - return err +func (e *ekv) Delete(keys ...string) error { + var gerr error + for _, key := range keys { + _, err := e.kv.Delete(context.Background(), key) + if err != nil { + gerr = err + } + } + return gerr } -func (e *ekv) Write(record *store.Record) error { - _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) - return err +func (e *ekv) Write(records ...*store.Record) error { + var gerr error + for _, record := range records { + // TODO create lease to expire keys + _, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) + if err != nil { + gerr = err + } + } + return gerr } func (e *ekv) Sync() ([]*store.Record, error) { diff --git a/store/memory/memory.go b/store/memory/memory.go index a7d5a83c..3eeab31d 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -48,51 +48,61 @@ func (m *memoryStore) Sync() ([]*store.Record, error) { return values, nil } -func (m *memoryStore) Read(key string) (*store.Record, error) { +func (m *memoryStore) Read(keys ...string) ([]*store.Record, error) { m.RLock() defer m.RUnlock() - v, ok := m.values[key] - if !ok { - return nil, store.ErrNotFound - } + var records []*store.Record - // get expiry - d := v.r.Expiry - t := time.Since(v.c) - - // expired - if d > time.Duration(0) { - if t > d { + for _, key := range keys { + v, ok := m.values[key] + if !ok { return nil, store.ErrNotFound } - // update expiry - v.r.Expiry -= t - v.c = time.Now() + + // get expiry + d := v.r.Expiry + t := time.Since(v.c) + + // expired + if d > time.Duration(0) { + if t > d { + return nil, store.ErrNotFound + } + // update expiry + v.r.Expiry -= t + v.c = time.Now() + } + + records = append(records, v.r) } - return v.r, nil + return records, nil } -func (m *memoryStore) Write(r *store.Record) error { +func (m *memoryStore) Write(records ...*store.Record) error { m.Lock() defer m.Unlock() - // set the record - m.values[r.Key] = &memoryRecord{ - r: r, - c: time.Now(), + for _, r := range records { + // set the record + m.values[r.Key] = &memoryRecord{ + r: r, + c: time.Now(), + } } return nil } -func (m *memoryStore) Delete(key string) error { +func (m *memoryStore) Delete(keys ...string) error { m.Lock() defer m.Unlock() - // delete the value - delete(m.values, key) + for _, key := range keys { + // delete the value + delete(m.values, key) + } return nil } diff --git a/store/memory/memory_test.go b/store/memory/memory_test.go index 639d18db..de889856 100644 --- a/store/memory/memory_test.go +++ b/store/memory/memory_test.go @@ -25,7 +25,7 @@ func TestReadRecordExpire(t *testing.T) { if err != nil { t.Fatal(err) } - if rrec.Expiry >= expire { + if rrec[0].Expiry >= expire { t.Fatal("expiry of read record is not changed") } diff --git a/store/service/handler/handler.go b/store/service/handler/handler.go new file mode 100644 index 00000000..51af02b6 --- /dev/null +++ b/store/service/handler/handler.go @@ -0,0 +1,89 @@ +package handler + +import ( + "context" + "io" + "time" + + "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/store" + pb "github.com/micro/go-micro/store/service/proto" +) + +type Store struct { + Store store.Store +} + +func (s *Store) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error { + vals, err := s.Store.Read(req.Keys...) + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + for _, val := range vals { + rsp.Records = append(rsp.Records, &pb.Record{ + Key: val.Key, + Value: val.Value, + Expiry: int64(val.Expiry.Seconds()), + }) + } + return nil +} + +func (s *Store) Write(ctx context.Context, req *pb.WriteRequest, rsp *pb.WriteResponse) error { + var records []*store.Record + + for _, record := range req.Records { + records = append(records, &store.Record{ + Key: record.Key, + Value: record.Value, + Expiry: time.Duration(record.Expiry) * time.Second, + }) + } + + err := s.Store.Write(records...) + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + return nil +} + +func (s *Store) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error { + err := s.Store.Delete(req.Keys...) + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + return nil +} + +func (s *Store) Sync(ctx context.Context, req *pb.SyncRequest, stream pb.Store_SyncStream) error { + var vals []*store.Record + var err error + + if len(req.Key) > 0 { + vals, err = s.Store.Read(req.Key) + } else { + vals, err = s.Store.Sync() + } + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + rsp := new(pb.SyncResponse) + + // TODO: batch sync + for _, val := range vals { + rsp.Records = append(rsp.Records, &pb.Record{ + Key: val.Key, + Value: val.Value, + Expiry: int64(val.Expiry.Seconds()), + }) + } + + err = stream.Send(rsp) + if err == io.EOF { + return nil + } + if err != nil { + return errors.InternalServerError("go.micro.store", err.Error()) + } + return nil +} diff --git a/store/service/proto/store.micro.go b/store/service/proto/store.micro.go new file mode 100644 index 00000000..ce4885ec --- /dev/null +++ b/store/service/proto/store.micro.go @@ -0,0 +1,207 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: micro/go-micro/store/service/proto/store.proto + +package go_micro_store + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Store service + +type StoreService interface { + Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) + Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error) + Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) + Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error) +} + +type storeService struct { + c client.Client + name string +} + +func NewStoreService(name string, c client.Client) StoreService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.store" + } + return &storeService{ + c: c, + name: name, + } +} + +func (c *storeService) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) { + req := c.c.NewRequest(c.name, "Store.Read", in) + out := new(ReadResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeService) Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error) { + req := c.c.NewRequest(c.name, "Store.Write", in) + out := new(WriteResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeService) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) { + req := c.c.NewRequest(c.name, "Store.Delete", in) + out := new(DeleteResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeService) Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error) { + req := c.c.NewRequest(c.name, "Store.Sync", &SyncRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return &storeServiceSync{stream}, nil +} + +type Store_SyncService interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*SyncResponse, error) +} + +type storeServiceSync struct { + stream client.Stream +} + +func (x *storeServiceSync) Close() error { + return x.stream.Close() +} + +func (x *storeServiceSync) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *storeServiceSync) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *storeServiceSync) Recv() (*SyncResponse, error) { + m := new(SyncResponse) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Store service + +type StoreHandler interface { + Read(context.Context, *ReadRequest, *ReadResponse) error + Write(context.Context, *WriteRequest, *WriteResponse) error + Delete(context.Context, *DeleteRequest, *DeleteResponse) error + Sync(context.Context, *SyncRequest, Store_SyncStream) error +} + +func RegisterStoreHandler(s server.Server, hdlr StoreHandler, opts ...server.HandlerOption) error { + type store interface { + Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error + Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error + Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error + Sync(ctx context.Context, stream server.Stream) error + } + type Store struct { + store + } + h := &storeHandler{hdlr} + return s.Handle(s.NewHandler(&Store{h}, opts...)) +} + +type storeHandler struct { + StoreHandler +} + +func (h *storeHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error { + return h.StoreHandler.Read(ctx, in, out) +} + +func (h *storeHandler) Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error { + return h.StoreHandler.Write(ctx, in, out) +} + +func (h *storeHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error { + return h.StoreHandler.Delete(ctx, in, out) +} + +func (h *storeHandler) Sync(ctx context.Context, stream server.Stream) error { + m := new(SyncRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.StoreHandler.Sync(ctx, m, &storeSyncStream{stream}) +} + +type Store_SyncStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*SyncResponse) error +} + +type storeSyncStream struct { + stream server.Stream +} + +func (x *storeSyncStream) Close() error { + return x.stream.Close() +} + +func (x *storeSyncStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *storeSyncStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *storeSyncStream) Send(m *SyncResponse) error { + return x.stream.Send(m) +} diff --git a/store/service/proto/store.pb.go b/store/service/proto/store.pb.go new file mode 100644 index 00000000..ce166cb7 --- /dev/null +++ b/store/service/proto/store.pb.go @@ -0,0 +1,618 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: micro/go-micro/store/service/proto/store.proto + +package go_micro_store + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Record struct { + // key of the record + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + // value in the record + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + // timestamp in unix seconds + Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Record) Reset() { *m = Record{} } +func (m *Record) String() string { return proto.CompactTextString(m) } +func (*Record) ProtoMessage() {} +func (*Record) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{0} +} + +func (m *Record) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Record.Unmarshal(m, b) +} +func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Record.Marshal(b, m, deterministic) +} +func (m *Record) XXX_Merge(src proto.Message) { + xxx_messageInfo_Record.Merge(m, src) +} +func (m *Record) XXX_Size() int { + return xxx_messageInfo_Record.Size(m) +} +func (m *Record) XXX_DiscardUnknown() { + xxx_messageInfo_Record.DiscardUnknown(m) +} + +var xxx_messageInfo_Record proto.InternalMessageInfo + +func (m *Record) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *Record) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *Record) GetExpiry() int64 { + if m != nil { + return m.Expiry + } + return 0 +} + +type ReadRequest struct { + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReadRequest) Reset() { *m = ReadRequest{} } +func (m *ReadRequest) String() string { return proto.CompactTextString(m) } +func (*ReadRequest) ProtoMessage() {} +func (*ReadRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{1} +} + +func (m *ReadRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReadRequest.Unmarshal(m, b) +} +func (m *ReadRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReadRequest.Marshal(b, m, deterministic) +} +func (m *ReadRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadRequest.Merge(m, src) +} +func (m *ReadRequest) XXX_Size() int { + return xxx_messageInfo_ReadRequest.Size(m) +} +func (m *ReadRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReadRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadRequest proto.InternalMessageInfo + +func (m *ReadRequest) GetKeys() []string { + if m != nil { + return m.Keys + } + return nil +} + +type ReadResponse struct { + Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReadResponse) Reset() { *m = ReadResponse{} } +func (m *ReadResponse) String() string { return proto.CompactTextString(m) } +func (*ReadResponse) ProtoMessage() {} +func (*ReadResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{2} +} + +func (m *ReadResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReadResponse.Unmarshal(m, b) +} +func (m *ReadResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReadResponse.Marshal(b, m, deterministic) +} +func (m *ReadResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadResponse.Merge(m, src) +} +func (m *ReadResponse) XXX_Size() int { + return xxx_messageInfo_ReadResponse.Size(m) +} +func (m *ReadResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ReadResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadResponse proto.InternalMessageInfo + +func (m *ReadResponse) GetRecords() []*Record { + if m != nil { + return m.Records + } + return nil +} + +type WriteRequest struct { + Records []*Record `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{3} +} + +func (m *WriteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_WriteRequest.Unmarshal(m, b) +} +func (m *WriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_WriteRequest.Marshal(b, m, deterministic) +} +func (m *WriteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WriteRequest.Merge(m, src) +} +func (m *WriteRequest) XXX_Size() int { + return xxx_messageInfo_WriteRequest.Size(m) +} +func (m *WriteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WriteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_WriteRequest proto.InternalMessageInfo + +func (m *WriteRequest) GetRecords() []*Record { + if m != nil { + return m.Records + } + return nil +} + +type WriteResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WriteResponse) Reset() { *m = WriteResponse{} } +func (m *WriteResponse) String() string { return proto.CompactTextString(m) } +func (*WriteResponse) ProtoMessage() {} +func (*WriteResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{4} +} + +func (m *WriteResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_WriteResponse.Unmarshal(m, b) +} +func (m *WriteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_WriteResponse.Marshal(b, m, deterministic) +} +func (m *WriteResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_WriteResponse.Merge(m, src) +} +func (m *WriteResponse) XXX_Size() int { + return xxx_messageInfo_WriteResponse.Size(m) +} +func (m *WriteResponse) XXX_DiscardUnknown() { + xxx_messageInfo_WriteResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_WriteResponse proto.InternalMessageInfo + +type DeleteRequest struct { + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } +func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } +func (*DeleteRequest) ProtoMessage() {} +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{5} +} + +func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DeleteRequest.Unmarshal(m, b) +} +func (m *DeleteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DeleteRequest.Marshal(b, m, deterministic) +} +func (m *DeleteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteRequest.Merge(m, src) +} +func (m *DeleteRequest) XXX_Size() int { + return xxx_messageInfo_DeleteRequest.Size(m) +} +func (m *DeleteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo + +func (m *DeleteRequest) GetKeys() []string { + if m != nil { + return m.Keys + } + return nil +} + +type DeleteResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } +func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } +func (*DeleteResponse) ProtoMessage() {} +func (*DeleteResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{6} +} + +func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DeleteResponse.Unmarshal(m, b) +} +func (m *DeleteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DeleteResponse.Marshal(b, m, deterministic) +} +func (m *DeleteResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteResponse.Merge(m, src) +} +func (m *DeleteResponse) XXX_Size() int { + return xxx_messageInfo_DeleteResponse.Size(m) +} +func (m *DeleteResponse) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo + +type SyncRequest struct { + // optional key + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SyncRequest) Reset() { *m = SyncRequest{} } +func (m *SyncRequest) String() string { return proto.CompactTextString(m) } +func (*SyncRequest) ProtoMessage() {} +func (*SyncRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{7} +} + +func (m *SyncRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SyncRequest.Unmarshal(m, b) +} +func (m *SyncRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SyncRequest.Marshal(b, m, deterministic) +} +func (m *SyncRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncRequest.Merge(m, src) +} +func (m *SyncRequest) XXX_Size() int { + return xxx_messageInfo_SyncRequest.Size(m) +} +func (m *SyncRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SyncRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SyncRequest proto.InternalMessageInfo + +func (m *SyncRequest) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +type SyncResponse struct { + Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SyncResponse) Reset() { *m = SyncResponse{} } +func (m *SyncResponse) String() string { return proto.CompactTextString(m) } +func (*SyncResponse) ProtoMessage() {} +func (*SyncResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f84ccc98e143ed3e, []int{8} +} + +func (m *SyncResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SyncResponse.Unmarshal(m, b) +} +func (m *SyncResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SyncResponse.Marshal(b, m, deterministic) +} +func (m *SyncResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncResponse.Merge(m, src) +} +func (m *SyncResponse) XXX_Size() int { + return xxx_messageInfo_SyncResponse.Size(m) +} +func (m *SyncResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SyncResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SyncResponse proto.InternalMessageInfo + +func (m *SyncResponse) GetRecords() []*Record { + if m != nil { + return m.Records + } + return nil +} + +func init() { + proto.RegisterType((*Record)(nil), "go.micro.store.Record") + proto.RegisterType((*ReadRequest)(nil), "go.micro.store.ReadRequest") + proto.RegisterType((*ReadResponse)(nil), "go.micro.store.ReadResponse") + proto.RegisterType((*WriteRequest)(nil), "go.micro.store.WriteRequest") + proto.RegisterType((*WriteResponse)(nil), "go.micro.store.WriteResponse") + proto.RegisterType((*DeleteRequest)(nil), "go.micro.store.DeleteRequest") + proto.RegisterType((*DeleteResponse)(nil), "go.micro.store.DeleteResponse") + proto.RegisterType((*SyncRequest)(nil), "go.micro.store.SyncRequest") + proto.RegisterType((*SyncResponse)(nil), "go.micro.store.SyncResponse") +} + +func init() { + proto.RegisterFile("micro/go-micro/store/service/proto/store.proto", fileDescriptor_f84ccc98e143ed3e) +} + +var fileDescriptor_f84ccc98e143ed3e = []byte{ + // 333 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcb, 0x6e, 0xf2, 0x30, + 0x10, 0x85, 0x09, 0x81, 0xfc, 0x62, 0xb8, 0xfc, 0x68, 0x54, 0xa1, 0x88, 0xde, 0xd2, 0x74, 0x93, + 0x4d, 0x03, 0xa2, 0x2f, 0x50, 0xa9, 0x17, 0xb5, 0x5b, 0xb3, 0xe8, 0x9a, 0x86, 0x11, 0x8a, 0xa0, + 0x98, 0x3a, 0x01, 0x35, 0x2f, 0xd4, 0xe7, 0xac, 0x6c, 0x27, 0x69, 0x90, 0x41, 0xaa, 0xba, 0x1b, + 0x7b, 0xce, 0x1c, 0x9f, 0xf9, 0x64, 0x08, 0xdf, 0xe3, 0x48, 0xf0, 0xd1, 0x82, 0xdf, 0xe8, 0x22, + 0x49, 0xb9, 0xa0, 0x51, 0x42, 0x62, 0x17, 0x47, 0x34, 0xda, 0x08, 0x9e, 0xe6, 0x77, 0xa1, 0xaa, + 0xb1, 0xb7, 0xe0, 0x7a, 0x24, 0x54, 0xb7, 0xfe, 0x33, 0x38, 0x8c, 0x22, 0x2e, 0xe6, 0xd8, 0x07, + 0x7b, 0x49, 0x99, 0x6b, 0x79, 0x56, 0xd0, 0x62, 0xb2, 0xc4, 0x13, 0x68, 0xee, 0x66, 0xab, 0x2d, + 0xb9, 0x75, 0xcf, 0x0a, 0x3a, 0x4c, 0x1f, 0x70, 0x00, 0x0e, 0x7d, 0x6e, 0x62, 0x91, 0xb9, 0xb6, + 0x67, 0x05, 0x36, 0xcb, 0x4f, 0xfe, 0x15, 0xb4, 0x19, 0xcd, 0xe6, 0x8c, 0x3e, 0xb6, 0x94, 0xa4, + 0x88, 0xd0, 0x58, 0x52, 0x96, 0xb8, 0x96, 0x67, 0x07, 0x2d, 0xa6, 0x6a, 0xff, 0x0e, 0x3a, 0x5a, + 0x92, 0x6c, 0xf8, 0x3a, 0x21, 0x1c, 0xc3, 0x3f, 0xa1, 0x1e, 0xd7, 0xb2, 0xf6, 0x64, 0x10, 0xee, + 0xc7, 0x0b, 0x75, 0x36, 0x56, 0xc8, 0xa4, 0xc3, 0xab, 0x88, 0x53, 0x2a, 0x5e, 0xa9, 0x38, 0xd4, + 0x7f, 0xe7, 0xf0, 0x1f, 0xba, 0xb9, 0x83, 0x0e, 0xe1, 0x5f, 0x43, 0xf7, 0x81, 0x56, 0xf4, 0xe3, + 0x79, 0x28, 0x79, 0x1f, 0x7a, 0x85, 0x28, 0x1f, 0xbb, 0x84, 0xf6, 0x34, 0x5b, 0x47, 0xc5, 0x90, + 0x41, 0x4f, 0x46, 0xd5, 0x82, 0xbf, 0x2e, 0x3b, 0xf9, 0xaa, 0x43, 0x73, 0x2a, 0x3b, 0x78, 0x0f, + 0x0d, 0x09, 0x0e, 0x4f, 0xcd, 0x91, 0x92, 0xf8, 0xf0, 0xec, 0x70, 0x33, 0xcf, 0x5b, 0xc3, 0x27, + 0x68, 0xaa, 0xcd, 0xd1, 0x10, 0x56, 0x91, 0x0e, 0xcf, 0x8f, 0x74, 0x4b, 0x9f, 0x17, 0x70, 0x34, + 0x0b, 0x34, 0xa4, 0x7b, 0x20, 0x87, 0x17, 0xc7, 0xda, 0xa5, 0xd5, 0x23, 0x34, 0x24, 0x23, 0x73, + 0xaf, 0x0a, 0x5a, 0x73, 0xaf, 0x2a, 0x56, 0xbf, 0x36, 0xb6, 0xde, 0x1c, 0xf5, 0xb7, 0x6f, 0xbf, + 0x03, 0x00, 0x00, 0xff, 0xff, 0x30, 0xc8, 0x99, 0x52, 0x0d, 0x03, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// StoreClient is the client API for Store service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type StoreClient interface { + Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) + Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) + Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) + Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error) +} + +type storeClient struct { + cc *grpc.ClientConn +} + +func NewStoreClient(cc *grpc.ClientConn) StoreClient { + return &storeClient{cc} +} + +func (c *storeClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) { + out := new(ReadResponse) + err := c.cc.Invoke(ctx, "/go.micro.store.Store/Read", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) { + out := new(WriteResponse) + err := c.cc.Invoke(ctx, "/go.micro.store.Store/Write", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) { + out := new(DeleteResponse) + err := c.cc.Invoke(ctx, "/go.micro.store.Store/Delete", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *storeClient) Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error) { + stream, err := c.cc.NewStream(ctx, &_Store_serviceDesc.Streams[0], "/go.micro.store.Store/Sync", opts...) + if err != nil { + return nil, err + } + x := &storeSyncClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Store_SyncClient interface { + Recv() (*SyncResponse, error) + grpc.ClientStream +} + +type storeSyncClient struct { + grpc.ClientStream +} + +func (x *storeSyncClient) Recv() (*SyncResponse, error) { + m := new(SyncResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// StoreServer is the server API for Store service. +type StoreServer interface { + Read(context.Context, *ReadRequest) (*ReadResponse, error) + Write(context.Context, *WriteRequest) (*WriteResponse, error) + Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) + Sync(*SyncRequest, Store_SyncServer) error +} + +func RegisterStoreServer(s *grpc.Server, srv StoreServer) { + s.RegisterService(&_Store_serviceDesc, srv) +} + +func _Store_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReadRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreServer).Read(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.store.Store/Read", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreServer).Read(ctx, req.(*ReadRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Store_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WriteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreServer).Write(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.store.Store/Write", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreServer).Write(ctx, req.(*WriteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Store_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DeleteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StoreServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.store.Store/Delete", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StoreServer).Delete(ctx, req.(*DeleteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Store_Sync_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SyncRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(StoreServer).Sync(m, &storeSyncServer{stream}) +} + +type Store_SyncServer interface { + Send(*SyncResponse) error + grpc.ServerStream +} + +type storeSyncServer struct { + grpc.ServerStream +} + +func (x *storeSyncServer) Send(m *SyncResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _Store_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.store.Store", + HandlerType: (*StoreServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Read", + Handler: _Store_Read_Handler, + }, + { + MethodName: "Write", + Handler: _Store_Write_Handler, + }, + { + MethodName: "Delete", + Handler: _Store_Delete_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Sync", + Handler: _Store_Sync_Handler, + ServerStreams: true, + }, + }, + Metadata: "micro/go-micro/store/service/proto/store.proto", +} diff --git a/store/service/proto/store.proto b/store/service/proto/store.proto new file mode 100644 index 00000000..a5abe4aa --- /dev/null +++ b/store/service/proto/store.proto @@ -0,0 +1,48 @@ +syntax = "proto3"; + +package go.micro.store; + +service Store { + rpc Read(ReadRequest) returns (ReadResponse) {}; + rpc Write(WriteRequest) returns (WriteResponse) {}; + rpc Delete(DeleteRequest) returns (DeleteResponse) {}; + rpc Sync(SyncRequest) returns (stream SyncResponse) {}; +} + +message Record { + // key of the record + string key = 1; + // value in the record + bytes value = 2; + // timestamp in unix seconds + int64 expiry = 3; +} + +message ReadRequest { + repeated string keys = 1; +} + +message ReadResponse { + repeated Record records = 1; +} + +message WriteRequest { + repeated Record records = 2; +} + +message WriteResponse {} + +message DeleteRequest { + repeated string keys = 1; +} + +message DeleteResponse {} + +message SyncRequest { + // optional key + string key = 1; +} + +message SyncResponse { + repeated Record records = 1; +} diff --git a/store/service/service.go b/store/service/service.go new file mode 100644 index 00000000..d7cc3805 --- /dev/null +++ b/store/service/service.go @@ -0,0 +1,119 @@ +// Package service implements the store service interface +package service + +import ( + "context" + "io" + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/store" + pb "github.com/micro/go-micro/store/service/proto" +) + +type serviceStore struct { + options.Options + + // Addresses of the nodes + Nodes []string + + // store service client + Client pb.StoreService +} + +// Sync all the known records +func (s *serviceStore) Sync() ([]*store.Record, error) { + stream, err := s.Client.Sync(context.Background(), &pb.SyncRequest{}, client.WithAddress(s.Nodes...)) + if err != nil { + return nil, err + } + defer stream.Close() + + var records []*store.Record + + for { + rsp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return records, err + } + for _, record := range rsp.Records { + records = append(records, &store.Record{ + Key: record.Key, + Value: record.Value, + Expiry: time.Duration(record.Expiry) * time.Second, + }) + } + } + + return records, nil +} + +// Read a record with key +func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) { + rsp, err := s.Client.Read(context.Background(), &pb.ReadRequest{ + Keys: keys, + }, client.WithAddress(s.Nodes...)) + if err != nil { + return nil, err + } + var records []*store.Record + for _, val := range rsp.Records { + records = append(records, &store.Record{ + Key: val.Key, + Value: val.Value, + Expiry: time.Duration(val.Expiry) * time.Second, + }) + } + return records, nil +} + +// Write a record +func (s *serviceStore) Write(recs ...*store.Record) error { + var records []*pb.Record + + for _, record := range recs { + records = append(records, &pb.Record{ + Key: record.Key, + Value: record.Value, + Expiry: int64(record.Expiry.Seconds()), + }) + } + + _, err := s.Client.Write(context.Background(), &pb.WriteRequest{ + Records: records, + }, client.WithAddress(s.Nodes...)) + + return err +} + +// Delete a record with key +func (s *serviceStore) Delete(keys ...string) error { + _, err := s.Client.Delete(context.Background(), &pb.DeleteRequest{ + Keys: keys, + }, client.WithAddress(s.Nodes...)) + return err +} + +// NewStore returns a new store service implementation +func NewStore(opts ...options.Option) store.Store { + options := options.NewOptions(opts...) + + var nodes []string + + n, ok := options.Values().Get("store.nodes") + if ok { + nodes = n.([]string) + } + + service := &serviceStore{ + Options: options, + Nodes: nodes, + Client: pb.NewStoreService("go.micro.store", client.DefaultClient), + } + + return service +} diff --git a/store/store.go b/store/store.go index c6442947..e3d8efac 100644 --- a/store/store.go +++ b/store/store.go @@ -19,11 +19,11 @@ type Store interface { // Sync all the known records Sync() ([]*Record, error) // Read a record with key - Read(key string) (*Record, error) + Read(keys ...string) ([]*Record, error) // Write a record - Write(r *Record) error + Write(recs ...*Record) error // Delete a record with key - Delete(key string) error + Delete(keys ...string) error } // Record represents a data record diff --git a/sync/map.go b/sync/map.go index a87bd988..a9611ff0 100644 --- a/sync/map.go +++ b/sync/map.go @@ -39,8 +39,12 @@ func (m *syncMap) Read(key, val interface{}) error { return err } + if len(kval) == 0 { + return store.ErrNotFound + } + // decode value - return json.Unmarshal(kval.Value, val) + return json.Unmarshal(kval[0].Value, val) } func (m *syncMap) Write(key, val interface{}) error { diff --git a/tunnel/default.go b/tunnel/default.go index 6107021e..c65829b1 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -919,14 +919,12 @@ func (t *tun) Close() error { default: close(t.closed) t.connected = false - - // send a close message - // we don't close the link - // just the tunnel - return t.close() } - return nil + // send a close message + // we don't close the link + // just the tunnel + return t.close() } // Dial an address diff --git a/tunnel/link.go b/tunnel/link.go index f2395727..181121e7 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -98,7 +98,6 @@ func (l *link) Close() error { return nil default: close(l.closed) - return nil } return nil diff --git a/tunnel/listener.go b/tunnel/listener.go index ee394519..f154b2a6 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -181,5 +181,4 @@ func (t *tunListener) Accept() (Session, error) { } return c, nil } - return nil, nil } diff --git a/tunnel/session.go b/tunnel/session.go index 5d93fc32..6757f150 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -237,8 +237,6 @@ func (s *session) Send(m *transport.Message) error { case <-s.closed: return io.EOF } - - return nil } // Recv is used to receive a message