From b076ef906aca82d47489d87109718d6f6cdfacbe Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 9 Sep 2019 08:57:57 -0700 Subject: [PATCH 1/3] Add service registry --- registry/proto/registry.micro.go | 224 ++++++++ registry/proto/registry.pb.go | 848 +++++++++++++++++++++++++++++++ registry/proto/registry.proto | 73 +++ registry/service/service.go | 155 ++++++ registry/service/util.go | 133 +++++ registry/service/watcher.go | 49 ++ 6 files changed, 1482 insertions(+) create mode 100644 registry/proto/registry.micro.go create mode 100644 registry/proto/registry.pb.go create mode 100644 registry/proto/registry.proto create mode 100644 registry/service/service.go create mode 100644 registry/service/util.go create mode 100644 registry/service/watcher.go diff --git a/registry/proto/registry.micro.go b/registry/proto/registry.micro.go new file mode 100644 index 00000000..3b78fdbf --- /dev/null +++ b/registry/proto/registry.micro.go @@ -0,0 +1,224 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: micro/go-micro/registry/proto/registry.proto + +package go_micro_registry + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Registry service + +type RegistryService interface { + GetService(ctx context.Context, in *GetRequest, opts ...client.CallOption) (*GetResponse, error) + Register(ctx context.Context, in *Service, opts ...client.CallOption) (*EmptyResponse, error) + Deregister(ctx context.Context, in *Service, opts ...client.CallOption) (*EmptyResponse, error) + ListServices(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) + Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Registry_WatchService, error) +} + +type registryService struct { + c client.Client + name string +} + +func NewRegistryService(name string, c client.Client) RegistryService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.registry" + } + return ®istryService{ + c: c, + name: name, + } +} + +func (c *registryService) GetService(ctx context.Context, in *GetRequest, opts ...client.CallOption) (*GetResponse, error) { + req := c.c.NewRequest(c.name, "Registry.GetService", in) + out := new(GetResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *registryService) Register(ctx context.Context, in *Service, opts ...client.CallOption) (*EmptyResponse, error) { + req := c.c.NewRequest(c.name, "Registry.Register", in) + out := new(EmptyResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *registryService) Deregister(ctx context.Context, in *Service, opts ...client.CallOption) (*EmptyResponse, error) { + req := c.c.NewRequest(c.name, "Registry.Deregister", in) + out := new(EmptyResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *registryService) ListServices(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) { + req := c.c.NewRequest(c.name, "Registry.ListServices", in) + out := new(ListResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *registryService) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Registry_WatchService, error) { + req := c.c.NewRequest(c.name, "Registry.Watch", &WatchRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return ®istryServiceWatch{stream}, nil +} + +type Registry_WatchService interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*Result, error) +} + +type registryServiceWatch struct { + stream client.Stream +} + +func (x *registryServiceWatch) Close() error { + return x.stream.Close() +} + +func (x *registryServiceWatch) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *registryServiceWatch) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *registryServiceWatch) Recv() (*Result, error) { + m := new(Result) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Registry service + +type RegistryHandler interface { + GetService(context.Context, *GetRequest, *GetResponse) error + Register(context.Context, *Service, *EmptyResponse) error + Deregister(context.Context, *Service, *EmptyResponse) error + ListServices(context.Context, *ListRequest, *ListResponse) error + Watch(context.Context, *WatchRequest, Registry_WatchStream) error +} + +func RegisterRegistryHandler(s server.Server, hdlr RegistryHandler, opts ...server.HandlerOption) error { + type registry interface { + GetService(ctx context.Context, in *GetRequest, out *GetResponse) error + Register(ctx context.Context, in *Service, out *EmptyResponse) error + Deregister(ctx context.Context, in *Service, out *EmptyResponse) error + ListServices(ctx context.Context, in *ListRequest, out *ListResponse) error + Watch(ctx context.Context, stream server.Stream) error + } + type Registry struct { + registry + } + h := ®istryHandler{hdlr} + return s.Handle(s.NewHandler(&Registry{h}, opts...)) +} + +type registryHandler struct { + RegistryHandler +} + +func (h *registryHandler) GetService(ctx context.Context, in *GetRequest, out *GetResponse) error { + return h.RegistryHandler.GetService(ctx, in, out) +} + +func (h *registryHandler) Register(ctx context.Context, in *Service, out *EmptyResponse) error { + return h.RegistryHandler.Register(ctx, in, out) +} + +func (h *registryHandler) Deregister(ctx context.Context, in *Service, out *EmptyResponse) error { + return h.RegistryHandler.Deregister(ctx, in, out) +} + +func (h *registryHandler) ListServices(ctx context.Context, in *ListRequest, out *ListResponse) error { + return h.RegistryHandler.ListServices(ctx, in, out) +} + +func (h *registryHandler) Watch(ctx context.Context, stream server.Stream) error { + m := new(WatchRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.RegistryHandler.Watch(ctx, m, ®istryWatchStream{stream}) +} + +type Registry_WatchStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Result) error +} + +type registryWatchStream struct { + stream server.Stream +} + +func (x *registryWatchStream) Close() error { + return x.stream.Close() +} + +func (x *registryWatchStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *registryWatchStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *registryWatchStream) Send(m *Result) error { + return x.stream.Send(m) +} diff --git a/registry/proto/registry.pb.go b/registry/proto/registry.pb.go new file mode 100644 index 00000000..b05602ed --- /dev/null +++ b/registry/proto/registry.pb.go @@ -0,0 +1,848 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: micro/go-micro/registry/proto/registry.proto + +package go_micro_registry + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Service represents a go-micro service +type Service struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Version string `protobuf:"bytes,2,opt,name=version,proto3" json:"version,omitempty"` + Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Endpoints []*Endpoint `protobuf:"bytes,4,rep,name=endpoints,proto3" json:"endpoints,omitempty"` + Nodes []*Node `protobuf:"bytes,5,rep,name=nodes,proto3" json:"nodes,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Service) Reset() { *m = Service{} } +func (m *Service) String() string { return proto.CompactTextString(m) } +func (*Service) ProtoMessage() {} +func (*Service) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{0} +} + +func (m *Service) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Service.Unmarshal(m, b) +} +func (m *Service) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Service.Marshal(b, m, deterministic) +} +func (m *Service) XXX_Merge(src proto.Message) { + xxx_messageInfo_Service.Merge(m, src) +} +func (m *Service) XXX_Size() int { + return xxx_messageInfo_Service.Size(m) +} +func (m *Service) XXX_DiscardUnknown() { + xxx_messageInfo_Service.DiscardUnknown(m) +} + +var xxx_messageInfo_Service proto.InternalMessageInfo + +func (m *Service) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Service) GetVersion() string { + if m != nil { + return m.Version + } + return "" +} + +func (m *Service) GetMetadata() map[string]string { + if m != nil { + return m.Metadata + } + return nil +} + +func (m *Service) GetEndpoints() []*Endpoint { + if m != nil { + return m.Endpoints + } + return nil +} + +func (m *Service) GetNodes() []*Node { + if m != nil { + return m.Nodes + } + return nil +} + +// Node represents the node the service is on +type Node struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` + Port int64 `protobuf:"varint,3,opt,name=port,proto3" json:"port,omitempty"` + Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Node) Reset() { *m = Node{} } +func (m *Node) String() string { return proto.CompactTextString(m) } +func (*Node) ProtoMessage() {} +func (*Node) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{1} +} + +func (m *Node) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Node.Unmarshal(m, b) +} +func (m *Node) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Node.Marshal(b, m, deterministic) +} +func (m *Node) XXX_Merge(src proto.Message) { + xxx_messageInfo_Node.Merge(m, src) +} +func (m *Node) XXX_Size() int { + return xxx_messageInfo_Node.Size(m) +} +func (m *Node) XXX_DiscardUnknown() { + xxx_messageInfo_Node.DiscardUnknown(m) +} + +var xxx_messageInfo_Node proto.InternalMessageInfo + +func (m *Node) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *Node) GetAddress() string { + if m != nil { + return m.Address + } + return "" +} + +func (m *Node) GetPort() int64 { + if m != nil { + return m.Port + } + return 0 +} + +func (m *Node) GetMetadata() map[string]string { + if m != nil { + return m.Metadata + } + return nil +} + +// Endpoint is a endpoint provided by a service +type Endpoint struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Request *Value `protobuf:"bytes,2,opt,name=request,proto3" json:"request,omitempty"` + Response *Value `protobuf:"bytes,3,opt,name=response,proto3" json:"response,omitempty"` + Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Endpoint) Reset() { *m = Endpoint{} } +func (m *Endpoint) String() string { return proto.CompactTextString(m) } +func (*Endpoint) ProtoMessage() {} +func (*Endpoint) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{2} +} + +func (m *Endpoint) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Endpoint.Unmarshal(m, b) +} +func (m *Endpoint) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Endpoint.Marshal(b, m, deterministic) +} +func (m *Endpoint) XXX_Merge(src proto.Message) { + xxx_messageInfo_Endpoint.Merge(m, src) +} +func (m *Endpoint) XXX_Size() int { + return xxx_messageInfo_Endpoint.Size(m) +} +func (m *Endpoint) XXX_DiscardUnknown() { + xxx_messageInfo_Endpoint.DiscardUnknown(m) +} + +var xxx_messageInfo_Endpoint proto.InternalMessageInfo + +func (m *Endpoint) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Endpoint) GetRequest() *Value { + if m != nil { + return m.Request + } + return nil +} + +func (m *Endpoint) GetResponse() *Value { + if m != nil { + return m.Response + } + return nil +} + +func (m *Endpoint) GetMetadata() map[string]string { + if m != nil { + return m.Metadata + } + return nil +} + +// Value is an opaque value for a request or response +type Value struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Values []*Value `protobuf:"bytes,3,rep,name=values,proto3" json:"values,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Value) Reset() { *m = Value{} } +func (m *Value) String() string { return proto.CompactTextString(m) } +func (*Value) ProtoMessage() {} +func (*Value) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{3} +} + +func (m *Value) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Value.Unmarshal(m, b) +} +func (m *Value) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Value.Marshal(b, m, deterministic) +} +func (m *Value) XXX_Merge(src proto.Message) { + xxx_messageInfo_Value.Merge(m, src) +} +func (m *Value) XXX_Size() int { + return xxx_messageInfo_Value.Size(m) +} +func (m *Value) XXX_DiscardUnknown() { + xxx_messageInfo_Value.DiscardUnknown(m) +} + +var xxx_messageInfo_Value proto.InternalMessageInfo + +func (m *Value) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Value) GetType() string { + if m != nil { + return m.Type + } + return "" +} + +func (m *Value) GetValues() []*Value { + if m != nil { + return m.Values + } + return nil +} + +// Result is returns by the watcher +type Result struct { + Action string `protobuf:"bytes,1,opt,name=action,proto3" json:"action,omitempty"` + Service *Service `protobuf:"bytes,2,opt,name=service,proto3" json:"service,omitempty"` + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Result) Reset() { *m = Result{} } +func (m *Result) String() string { return proto.CompactTextString(m) } +func (*Result) ProtoMessage() {} +func (*Result) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{4} +} + +func (m *Result) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Result.Unmarshal(m, b) +} +func (m *Result) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Result.Marshal(b, m, deterministic) +} +func (m *Result) XXX_Merge(src proto.Message) { + xxx_messageInfo_Result.Merge(m, src) +} +func (m *Result) XXX_Size() int { + return xxx_messageInfo_Result.Size(m) +} +func (m *Result) XXX_DiscardUnknown() { + xxx_messageInfo_Result.DiscardUnknown(m) +} + +var xxx_messageInfo_Result proto.InternalMessageInfo + +func (m *Result) GetAction() string { + if m != nil { + return m.Action + } + return "" +} + +func (m *Result) GetService() *Service { + if m != nil { + return m.Service + } + return nil +} + +func (m *Result) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +type EmptyResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *EmptyResponse) Reset() { *m = EmptyResponse{} } +func (m *EmptyResponse) String() string { return proto.CompactTextString(m) } +func (*EmptyResponse) ProtoMessage() {} +func (*EmptyResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{5} +} + +func (m *EmptyResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_EmptyResponse.Unmarshal(m, b) +} +func (m *EmptyResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_EmptyResponse.Marshal(b, m, deterministic) +} +func (m *EmptyResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_EmptyResponse.Merge(m, src) +} +func (m *EmptyResponse) XXX_Size() int { + return xxx_messageInfo_EmptyResponse.Size(m) +} +func (m *EmptyResponse) XXX_DiscardUnknown() { + xxx_messageInfo_EmptyResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_EmptyResponse proto.InternalMessageInfo + +type GetRequest struct { + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetRequest) Reset() { *m = GetRequest{} } +func (m *GetRequest) String() string { return proto.CompactTextString(m) } +func (*GetRequest) ProtoMessage() {} +func (*GetRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{6} +} + +func (m *GetRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetRequest.Unmarshal(m, b) +} +func (m *GetRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetRequest.Marshal(b, m, deterministic) +} +func (m *GetRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetRequest.Merge(m, src) +} +func (m *GetRequest) XXX_Size() int { + return xxx_messageInfo_GetRequest.Size(m) +} +func (m *GetRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetRequest proto.InternalMessageInfo + +func (m *GetRequest) GetService() string { + if m != nil { + return m.Service + } + return "" +} + +type GetResponse struct { + Services []*Service `protobuf:"bytes,1,rep,name=services,proto3" json:"services,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetResponse) Reset() { *m = GetResponse{} } +func (m *GetResponse) String() string { return proto.CompactTextString(m) } +func (*GetResponse) ProtoMessage() {} +func (*GetResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{7} +} + +func (m *GetResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetResponse.Unmarshal(m, b) +} +func (m *GetResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetResponse.Marshal(b, m, deterministic) +} +func (m *GetResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetResponse.Merge(m, src) +} +func (m *GetResponse) XXX_Size() int { + return xxx_messageInfo_GetResponse.Size(m) +} +func (m *GetResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetResponse proto.InternalMessageInfo + +func (m *GetResponse) GetServices() []*Service { + if m != nil { + return m.Services + } + return nil +} + +type ListRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListRequest) Reset() { *m = ListRequest{} } +func (m *ListRequest) String() string { return proto.CompactTextString(m) } +func (*ListRequest) ProtoMessage() {} +func (*ListRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{8} +} + +func (m *ListRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListRequest.Unmarshal(m, b) +} +func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic) +} +func (m *ListRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListRequest.Merge(m, src) +} +func (m *ListRequest) XXX_Size() int { + return xxx_messageInfo_ListRequest.Size(m) +} +func (m *ListRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ListRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ListRequest proto.InternalMessageInfo + +type ListResponse struct { + Services []*Service `protobuf:"bytes,1,rep,name=services,proto3" json:"services,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ListResponse) Reset() { *m = ListResponse{} } +func (m *ListResponse) String() string { return proto.CompactTextString(m) } +func (*ListResponse) ProtoMessage() {} +func (*ListResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{9} +} + +func (m *ListResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ListResponse.Unmarshal(m, b) +} +func (m *ListResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ListResponse.Marshal(b, m, deterministic) +} +func (m *ListResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ListResponse.Merge(m, src) +} +func (m *ListResponse) XXX_Size() int { + return xxx_messageInfo_ListResponse.Size(m) +} +func (m *ListResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ListResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ListResponse proto.InternalMessageInfo + +func (m *ListResponse) GetServices() []*Service { + if m != nil { + return m.Services + } + return nil +} + +type WatchRequest struct { + // service is optional + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *WatchRequest) Reset() { *m = WatchRequest{} } +func (m *WatchRequest) String() string { return proto.CompactTextString(m) } +func (*WatchRequest) ProtoMessage() {} +func (*WatchRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f287a6b809166ad2, []int{10} +} + +func (m *WatchRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_WatchRequest.Unmarshal(m, b) +} +func (m *WatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_WatchRequest.Marshal(b, m, deterministic) +} +func (m *WatchRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WatchRequest.Merge(m, src) +} +func (m *WatchRequest) XXX_Size() int { + return xxx_messageInfo_WatchRequest.Size(m) +} +func (m *WatchRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WatchRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_WatchRequest proto.InternalMessageInfo + +func (m *WatchRequest) GetService() string { + if m != nil { + return m.Service + } + return "" +} + +func init() { + proto.RegisterType((*Service)(nil), "go.micro.registry.Service") + proto.RegisterMapType((map[string]string)(nil), "go.micro.registry.Service.MetadataEntry") + proto.RegisterType((*Node)(nil), "go.micro.registry.Node") + proto.RegisterMapType((map[string]string)(nil), "go.micro.registry.Node.MetadataEntry") + proto.RegisterType((*Endpoint)(nil), "go.micro.registry.Endpoint") + proto.RegisterMapType((map[string]string)(nil), "go.micro.registry.Endpoint.MetadataEntry") + proto.RegisterType((*Value)(nil), "go.micro.registry.Value") + proto.RegisterType((*Result)(nil), "go.micro.registry.Result") + proto.RegisterType((*EmptyResponse)(nil), "go.micro.registry.EmptyResponse") + proto.RegisterType((*GetRequest)(nil), "go.micro.registry.GetRequest") + proto.RegisterType((*GetResponse)(nil), "go.micro.registry.GetResponse") + proto.RegisterType((*ListRequest)(nil), "go.micro.registry.ListRequest") + proto.RegisterType((*ListResponse)(nil), "go.micro.registry.ListResponse") + proto.RegisterType((*WatchRequest)(nil), "go.micro.registry.WatchRequest") +} + +func init() { + proto.RegisterFile("micro/go-micro/registry/proto/registry.proto", fileDescriptor_f287a6b809166ad2) +} + +var fileDescriptor_f287a6b809166ad2 = []byte{ + // 577 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0x6d, 0x8b, 0xd3, 0x4c, + 0x14, 0x6d, 0x92, 0xbe, 0xde, 0x6e, 0x9f, 0x47, 0x2f, 0xa2, 0x31, 0xbe, 0x95, 0x80, 0x52, 0xc1, + 0xcd, 0x2e, 0x75, 0x11, 0x5f, 0x3e, 0x09, 0x5b, 0x17, 0x64, 0x57, 0x70, 0x04, 0xfd, 0x1c, 0x9b, + 0x4b, 0x0d, 0x6e, 0x5e, 0x9c, 0x99, 0x16, 0xfa, 0x1f, 0x04, 0xff, 0x84, 0x3f, 0xc5, 0x1f, 0x26, + 0x99, 0xcc, 0x34, 0x5d, 0x36, 0xa9, 0x1f, 0x56, 0xbf, 0xcd, 0xcd, 0x9c, 0x73, 0xe6, 0x9e, 0x73, + 0x67, 0x5a, 0x78, 0x92, 0xc4, 0x73, 0x9e, 0x1d, 0x2c, 0xb2, 0xfd, 0x72, 0xc1, 0x69, 0x11, 0x0b, + 0xc9, 0xd7, 0x07, 0x39, 0xcf, 0x64, 0x55, 0x06, 0xaa, 0xc4, 0xeb, 0x8b, 0x2c, 0x50, 0xb8, 0xc0, + 0x6c, 0xf8, 0x3f, 0x6d, 0xe8, 0x7d, 0x20, 0xbe, 0x8a, 0xe7, 0x84, 0x08, 0xed, 0x34, 0x4c, 0xc8, + 0xb5, 0xc6, 0xd6, 0x64, 0xc0, 0xd4, 0x1a, 0x5d, 0xe8, 0xad, 0x88, 0x8b, 0x38, 0x4b, 0x5d, 0x5b, + 0x7d, 0x36, 0x25, 0x1e, 0x43, 0x3f, 0x21, 0x19, 0x46, 0xa1, 0x0c, 0x5d, 0x67, 0xec, 0x4c, 0x86, + 0xd3, 0x49, 0x70, 0x49, 0x3f, 0xd0, 0xda, 0xc1, 0x99, 0x86, 0xce, 0x52, 0xc9, 0xd7, 0x6c, 0xc3, + 0xc4, 0x17, 0x30, 0xa0, 0x34, 0xca, 0xb3, 0x38, 0x95, 0xc2, 0x6d, 0x2b, 0x99, 0x3b, 0x35, 0x32, + 0x33, 0x8d, 0x61, 0x15, 0x1a, 0xf7, 0xa1, 0x93, 0x66, 0x11, 0x09, 0xb7, 0xa3, 0x68, 0xb7, 0x6a, + 0x68, 0xef, 0xb2, 0x88, 0x58, 0x89, 0xf2, 0x5e, 0xc1, 0xe8, 0x42, 0x13, 0x78, 0x0d, 0x9c, 0xaf, + 0xb4, 0xd6, 0x6e, 0x8b, 0x25, 0xde, 0x80, 0xce, 0x2a, 0x3c, 0x5f, 0x92, 0xb6, 0x5a, 0x16, 0x2f, + 0xed, 0xe7, 0x96, 0xff, 0xcb, 0x82, 0x76, 0x21, 0x86, 0xff, 0x81, 0x1d, 0x47, 0x9a, 0x63, 0xc7, + 0x51, 0x91, 0x4f, 0x18, 0x45, 0x9c, 0x84, 0x30, 0xf9, 0xe8, 0xb2, 0x48, 0x33, 0xcf, 0xb8, 0x74, + 0x9d, 0xb1, 0x35, 0x71, 0x98, 0x5a, 0xe3, 0xeb, 0xad, 0xcc, 0x4a, 0xb3, 0x0f, 0x1b, 0xba, 0x6e, + 0x0a, 0xec, 0x6a, 0x36, 0xbe, 0xdb, 0xd0, 0x37, 0x51, 0xd6, 0x8e, 0x7b, 0x0a, 0x3d, 0x4e, 0xdf, + 0x96, 0x24, 0xa4, 0x22, 0x0f, 0xa7, 0x6e, 0x4d, 0x7f, 0x1f, 0x0b, 0x3d, 0x66, 0x80, 0x78, 0x04, + 0x7d, 0x4e, 0x22, 0xcf, 0x52, 0x41, 0xca, 0xec, 0x2e, 0xd2, 0x06, 0x89, 0xb3, 0x4b, 0x51, 0x3c, + 0xde, 0x31, 0xf7, 0x7f, 0x13, 0x47, 0x08, 0x1d, 0xd5, 0x56, 0x6d, 0x14, 0x08, 0x6d, 0xb9, 0xce, + 0x0d, 0x4b, 0xad, 0xf1, 0x10, 0xba, 0x8a, 0x2d, 0xf4, 0x8d, 0x6f, 0x36, 0xaa, 0x71, 0xbe, 0x84, + 0x2e, 0x23, 0xb1, 0x3c, 0x97, 0x78, 0x13, 0xba, 0xe1, 0x5c, 0x16, 0x0f, 0xa9, 0x3c, 0x45, 0x57, + 0x78, 0x04, 0x3d, 0x51, 0x3e, 0x12, 0x1d, 0xb9, 0xd7, 0xfc, 0x8c, 0x98, 0x81, 0xe2, 0x5d, 0x18, + 0xc8, 0x38, 0x21, 0x21, 0xc3, 0x24, 0xd7, 0x57, 0xac, 0xfa, 0xe0, 0xff, 0x0f, 0xa3, 0x59, 0x92, + 0xcb, 0x35, 0xd3, 0x69, 0xfb, 0x8f, 0x00, 0x4e, 0x48, 0x32, 0x3d, 0x31, 0xb7, 0x3a, 0xb2, 0xec, + 0xc5, 0x94, 0xfe, 0x0c, 0x86, 0x0a, 0xa7, 0x87, 0xf4, 0x0c, 0xfa, 0x7a, 0x47, 0xb8, 0x96, 0x72, + 0xbc, 0xab, 0xb9, 0x0d, 0xd6, 0x1f, 0xc1, 0xf0, 0x34, 0x16, 0xe6, 0x3c, 0xff, 0x0d, 0xec, 0x95, + 0xe5, 0x15, 0x65, 0x27, 0xb0, 0xf7, 0x29, 0x94, 0xf3, 0x2f, 0x7f, 0xf4, 0x31, 0xfd, 0xe1, 0x40, + 0x9f, 0x69, 0x21, 0x3c, 0x53, 0xe6, 0xcd, 0xaf, 0xdc, 0xbd, 0x9a, 0xa3, 0xaa, 0x6c, 0xbc, 0xfb, + 0x4d, 0xdb, 0x3a, 0xc9, 0x16, 0xbe, 0x35, 0xd2, 0xc4, 0x71, 0x47, 0xdf, 0xde, 0xb8, 0xee, 0x3e, + 0x5f, 0x98, 0x4a, 0x0b, 0x4f, 0x01, 0x8e, 0x89, 0xff, 0x2d, 0xb5, 0xf7, 0x65, 0xce, 0x9a, 0x22, + 0xb0, 0xce, 0xcb, 0xd6, 0x5c, 0xbc, 0x07, 0x8d, 0xfb, 0x1b, 0xc9, 0x13, 0xe8, 0xa8, 0xc8, 0xb1, + 0x0e, 0xbb, 0x3d, 0x0c, 0xef, 0x76, 0x0d, 0xa0, 0xbc, 0xfa, 0x7e, 0xeb, 0xd0, 0xfa, 0xdc, 0x55, + 0x7f, 0x41, 0x4f, 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0x6b, 0x0b, 0x12, 0xd6, 0xb2, 0x06, 0x00, + 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// RegistryClient is the client API for Registry service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type RegistryClient interface { + GetService(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) + Register(ctx context.Context, in *Service, opts ...grpc.CallOption) (*EmptyResponse, error) + Deregister(ctx context.Context, in *Service, opts ...grpc.CallOption) (*EmptyResponse, error) + ListServices(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) + Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Registry_WatchClient, error) +} + +type registryClient struct { + cc *grpc.ClientConn +} + +func NewRegistryClient(cc *grpc.ClientConn) RegistryClient { + return ®istryClient{cc} +} + +func (c *registryClient) GetService(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) { + out := new(GetResponse) + err := c.cc.Invoke(ctx, "/go.micro.registry.Registry/GetService", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *registryClient) Register(ctx context.Context, in *Service, opts ...grpc.CallOption) (*EmptyResponse, error) { + out := new(EmptyResponse) + err := c.cc.Invoke(ctx, "/go.micro.registry.Registry/Register", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *registryClient) Deregister(ctx context.Context, in *Service, opts ...grpc.CallOption) (*EmptyResponse, error) { + out := new(EmptyResponse) + err := c.cc.Invoke(ctx, "/go.micro.registry.Registry/Deregister", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *registryClient) ListServices(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) { + out := new(ListResponse) + err := c.cc.Invoke(ctx, "/go.micro.registry.Registry/ListServices", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *registryClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Registry_WatchClient, error) { + stream, err := c.cc.NewStream(ctx, &_Registry_serviceDesc.Streams[0], "/go.micro.registry.Registry/Watch", opts...) + if err != nil { + return nil, err + } + x := ®istryWatchClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Registry_WatchClient interface { + Recv() (*Result, error) + grpc.ClientStream +} + +type registryWatchClient struct { + grpc.ClientStream +} + +func (x *registryWatchClient) Recv() (*Result, error) { + m := new(Result) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// RegistryServer is the server API for Registry service. +type RegistryServer interface { + GetService(context.Context, *GetRequest) (*GetResponse, error) + Register(context.Context, *Service) (*EmptyResponse, error) + Deregister(context.Context, *Service) (*EmptyResponse, error) + ListServices(context.Context, *ListRequest) (*ListResponse, error) + Watch(*WatchRequest, Registry_WatchServer) error +} + +func RegisterRegistryServer(s *grpc.Server, srv RegistryServer) { + s.RegisterService(&_Registry_serviceDesc, srv) +} + +func _Registry_GetService_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RegistryServer).GetService(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.registry.Registry/GetService", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RegistryServer).GetService(ctx, req.(*GetRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Registry_Register_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Service) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RegistryServer).Register(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.registry.Registry/Register", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RegistryServer).Register(ctx, req.(*Service)) + } + return interceptor(ctx, in, info, handler) +} + +func _Registry_Deregister_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Service) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RegistryServer).Deregister(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.registry.Registry/Deregister", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RegistryServer).Deregister(ctx, req.(*Service)) + } + return interceptor(ctx, in, info, handler) +} + +func _Registry_ListServices_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RegistryServer).ListServices(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.registry.Registry/ListServices", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RegistryServer).ListServices(ctx, req.(*ListRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Registry_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(WatchRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(RegistryServer).Watch(m, ®istryWatchServer{stream}) +} + +type Registry_WatchServer interface { + Send(*Result) error + grpc.ServerStream +} + +type registryWatchServer struct { + grpc.ServerStream +} + +func (x *registryWatchServer) Send(m *Result) error { + return x.ServerStream.SendMsg(m) +} + +var _Registry_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.registry.Registry", + HandlerType: (*RegistryServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetService", + Handler: _Registry_GetService_Handler, + }, + { + MethodName: "Register", + Handler: _Registry_Register_Handler, + }, + { + MethodName: "Deregister", + Handler: _Registry_Deregister_Handler, + }, + { + MethodName: "ListServices", + Handler: _Registry_ListServices_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Watch", + Handler: _Registry_Watch_Handler, + ServerStreams: true, + }, + }, + Metadata: "micro/go-micro/registry/proto/registry.proto", +} diff --git a/registry/proto/registry.proto b/registry/proto/registry.proto new file mode 100644 index 00000000..8d7cc854 --- /dev/null +++ b/registry/proto/registry.proto @@ -0,0 +1,73 @@ +syntax = "proto3"; + +package go.micro.registry; + +service Registry { + rpc GetService(GetRequest) returns (GetResponse) {}; + rpc Register(Service) returns (EmptyResponse) {}; + rpc Deregister(Service) returns (EmptyResponse) {}; + rpc ListServices(ListRequest) returns (ListResponse) {}; + rpc Watch(WatchRequest) returns (stream Result) {}; +} + +// Service represents a go-micro service +message Service { + string name = 1; + string version = 2; + map metadata = 3; + repeated Endpoint endpoints = 4; + repeated Node nodes = 5; +} + +// Node represents the node the service is on +message Node { + string id = 1; + string address = 2; + int64 port = 3; + map metadata = 4; +} + +// Endpoint is a endpoint provided by a service +message Endpoint { + string name = 1; + Value request = 2; + Value response = 3; + map metadata = 4; +} + +// Value is an opaque value for a request or response +message Value { + string name = 1; + string type = 2; + repeated Value values = 3; +} + +// Result is returns by the watcher +message Result { + string action = 1; // create, update, delete + Service service = 2; + int64 timestamp = 3; // unix timestamp +} + +message EmptyResponse {} + +message GetRequest { + string service = 1; +} + +message GetResponse { + repeated Service services = 1; +} + +message ListRequest { + // TODO: filtering +} + +message ListResponse { + repeated Service services = 1; +} + +message WatchRequest { + // service is optional + string service = 1; +} diff --git a/registry/service/service.go b/registry/service/service.go new file mode 100644 index 00000000..796a5283 --- /dev/null +++ b/registry/service/service.go @@ -0,0 +1,155 @@ +// Package service uses the registry service +package service + +import ( + "context" + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/registry" + pb "github.com/micro/go-micro/registry/proto" +) + +var ( + // The default service name + DefaultService = "go.micro.service" +) + +type serviceRegistry struct { + opts registry.Options + // name of the registry + name string + // address + address []string + // client to call registry + client pb.RegistryService +} + +func (s *serviceRegistry) callOpts() []client.CallOption { + var opts []client.CallOption + + // set registry address + if len(s.address) > 0 { + opts = append(opts, client.WithAddress(s.address...)) + } + + // set timeout + if s.opts.Timeout > time.Duration(0) { + opts = append(opts, client.WithRequestTimeout(s.opts.Timeout)) + } + + return opts +} + +func (s *serviceRegistry) Init(opts ...registry.Option) error { + for _, o := range opts { + o(&s.opts) + } + return nil +} + +func (s *serviceRegistry) Options() registry.Options { + return s.opts +} + +func (s *serviceRegistry) Register(srv *registry.Service, opts ...registry.RegisterOption) error { + var options registry.RegisterOptions + for _, o := range opts { + o(&options) + } + + // register the service + _, err := s.client.Register(context.TODO(), toProto(srv), s.callOpts()...) + if err != nil { + return err + } + + return nil +} + +func (s *serviceRegistry) Deregister(srv *registry.Service) error { + // deregister the service + _, err := s.client.Deregister(context.TODO(), toProto(srv), s.callOpts()...) + if err != nil { + return err + } + return nil +} + +func (s *serviceRegistry) GetService(name string) ([]*registry.Service, error) { + rsp, err := s.client.GetService(context.TODO(), &pb.GetRequest{ + Service: name, + }, s.callOpts()...) + + if err != nil { + return nil, err + } + + var services []*registry.Service + for _, service := range rsp.Services { + services = append(services, toService(service)) + } + return services, nil +} + +func (s *serviceRegistry) ListServices() ([]*registry.Service, error) { + rsp, err := s.client.ListServices(context.TODO(), &pb.ListRequest{}, s.callOpts()...) + if err != nil { + return nil, err + } + + var services []*registry.Service + for _, service := range rsp.Services { + services = append(services, toService(service)) + } + + return services, nil +} + +func (s *serviceRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { + var options registry.WatchOptions + for _, o := range opts { + o(&options) + } + + stream, err := s.client.Watch(context.TODO(), &pb.WatchRequest{ + Service: options.Service, + }, s.callOpts()...) + + if err != nil { + return nil, err + } + + return newWatcher(stream), nil +} + +func (s *serviceRegistry) String() string { + return s.name +} + +// NewRegistry returns a new registry service client +func NewRegistry(opts ...registry.Option) registry.Registry { + var options registry.Options + for _, o := range opts { + o(&options) + } + + // use mdns to find the service registry + mReg := registry.NewRegistry() + + // create new client with mdns + cli := client.NewClient( + client.Registry(mReg), + ) + + // service name + // TODO: accept option + name := DefaultService + + return &serviceRegistry{ + opts: options, + name: name, + address: options.Addrs, + client: pb.NewRegistryService(name, cli), + } +} diff --git a/registry/service/util.go b/registry/service/util.go new file mode 100644 index 00000000..3aa7bd40 --- /dev/null +++ b/registry/service/util.go @@ -0,0 +1,133 @@ +package service + +import ( + "github.com/micro/go-micro/registry" + pb "github.com/micro/go-micro/registry/proto" +) + +func values(v []*registry.Value) []*pb.Value { + if len(v) == 0 { + return []*pb.Value{} + } + + var vs []*pb.Value + for _, vi := range v { + vs = append(vs, &pb.Value{ + Name: vi.Name, + Type: vi.Type, + Values: values(vi.Values), + }) + } + return vs +} + +func toValues(v []*pb.Value) []*registry.Value { + if len(v) == 0 { + return []*registry.Value{} + } + + var vs []*registry.Value + for _, vi := range v { + vs = append(vs, ®istry.Value{ + Name: vi.Name, + Type: vi.Type, + Values: toValues(vi.Values), + }) + } + return vs +} + +func toProto(s *registry.Service) *pb.Service { + var endpoints []*pb.Endpoint + for _, ep := range s.Endpoints { + var request, response *pb.Value + + if ep.Request != nil { + request = &pb.Value{ + Name: ep.Request.Name, + Type: ep.Request.Type, + Values: values(ep.Request.Values), + } + } + + if ep.Response != nil { + response = &pb.Value{ + Name: ep.Response.Name, + Type: ep.Response.Type, + Values: values(ep.Response.Values), + } + } + + endpoints = append(endpoints, &pb.Endpoint{ + Name: ep.Name, + Request: request, + Response: response, + Metadata: ep.Metadata, + }) + } + + var nodes []*pb.Node + + for _, node := range s.Nodes { + nodes = append(nodes, &pb.Node{ + Id: node.Id, + Address: node.Address, + Metadata: node.Metadata, + }) + } + + return &pb.Service{ + Name: s.Name, + Version: s.Version, + Metadata: s.Metadata, + Endpoints: endpoints, + Nodes: nodes, + } +} + +func toService(s *pb.Service) *registry.Service { + var endpoints []*registry.Endpoint + for _, ep := range s.Endpoints { + var request, response *registry.Value + + if ep.Request != nil { + request = ®istry.Value{ + Name: ep.Request.Name, + Type: ep.Request.Type, + Values: toValues(ep.Request.Values), + } + } + + if ep.Response != nil { + response = ®istry.Value{ + Name: ep.Response.Name, + Type: ep.Response.Type, + Values: toValues(ep.Response.Values), + } + } + + endpoints = append(endpoints, ®istry.Endpoint{ + Name: ep.Name, + Request: request, + Response: response, + Metadata: ep.Metadata, + }) + } + + var nodes []*registry.Node + for _, node := range s.Nodes { + nodes = append(nodes, ®istry.Node{ + Id: node.Id, + Address: node.Address, + Metadata: node.Metadata, + }) + } + + return ®istry.Service{ + Name: s.Name, + Version: s.Version, + Metadata: s.Metadata, + Endpoints: endpoints, + Nodes: nodes, + } +} diff --git a/registry/service/watcher.go b/registry/service/watcher.go new file mode 100644 index 00000000..c5ede205 --- /dev/null +++ b/registry/service/watcher.go @@ -0,0 +1,49 @@ +package service + +import ( + "github.com/micro/go-micro/registry" + pb "github.com/micro/go-micro/registry/proto" +) + +type serviceWatcher struct { + stream pb.Registry_WatchService + closed chan bool +} + +func (s *serviceWatcher) Next() (*registry.Result, error) { + for { + // check if closed + select { + case <-s.closed: + return nil, registry.ErrWatcherStopped + default: + } + + r, err := s.stream.Recv() + if err != nil { + return nil, err + } + + return ®istry.Result{ + Action: r.Action, + Service: toService(r.Service), + }, nil + } +} + +func (s *serviceWatcher) Stop() { + select { + case <-s.closed: + return + default: + close(s.closed) + s.stream.Close() + } +} + +func newWatcher(stream pb.Registry_WatchService) registry.Watcher { + return &serviceWatcher{ + stream: stream, + closed: make(chan bool), + } +} From 1f44d7a4a1870743561dac1b4fed9a8498efcf74 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 9 Sep 2019 09:20:17 -0700 Subject: [PATCH 2/3] Add registry handler --- registry/handler/handler.go | 70 +++++++++++++++++++++++++++++++++++++ registry/service/service.go | 8 ++--- registry/service/util.go | 4 +-- registry/service/watcher.go | 2 +- 4 files changed, 77 insertions(+), 7 deletions(-) create mode 100644 registry/handler/handler.go diff --git a/registry/handler/handler.go b/registry/handler/handler.go new file mode 100644 index 00000000..62604e41 --- /dev/null +++ b/registry/handler/handler.go @@ -0,0 +1,70 @@ +package handler + +import ( + "context" + + "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/registry" + pb "github.com/micro/go-micro/registry/proto" + "github.com/micro/go-micro/registry/service" +) + +type Registry struct { + // internal registry + Registry registry.Registry +} + +func (r *Registry) GetService(ctx context.Context, req pb.GetRequest, rsp pb.GetResponse) error { + services, err := r.Registry.GetService(req.Service) + for _, srv := range services { + rsp.Services = append(rsp.Services, service.ToProto(srv)) + } + return nil +} + +func (r *Registry) Register(ctx context.Context, req pb.Service, rsp pb.EmptyResponse) error { + err := r.Registry.Register(service.ToService(req.Service)) + if err != nil { + return errors.InternalServerError("go.micro.registry", err.Error()) + } + return nil +} + +func (r *Registry) Deregister(ctx context.Context, req pb.Service, rsp pb.EmptyResponse) error { + err := r.Registry.Deregister(service.ToService(req.Service)) + if err != nil { + return errors.InternalServerError("go.micro.registry", err.Error()) + } + return nil +} + +func (r *Registry) ListServices(ctx context.Context, req pb.ListRequest, rsp pb.ListResponse) error { + services, err := r.Registry.ListServices() + for _, srv := range services { + rsp.Services = append(rsp.Services, service.ToProto(srv)) + } + return nil +} + +func (r *Registry) Watch(ctx context.Context, req pb.WatchRequest, rsp pb.Registry_WatchStream) error { + watcher, err := r.Registry.Watcher(registry.WatchOption(req.Service)) + if err != nil { + return errors.InternalServerError("go.micro.registry", err.Error()) + } + + for { + next, err := watcher.Next() + if err != nil { + return errors.InternalServerError("go.micro.registry", err.Error()) + } + err = rsp.Send(&pb.Result{ + Action: next.Action, + Service: service.ToProto(next.Service), + }) + if err != nil { + return errors.InternalServerError("go.micro.registry", err.Error()) + } + } + + return nil +} diff --git a/registry/service/service.go b/registry/service/service.go index 796a5283..2473c32f 100644 --- a/registry/service/service.go +++ b/registry/service/service.go @@ -59,7 +59,7 @@ func (s *serviceRegistry) Register(srv *registry.Service, opts ...registry.Regis } // register the service - _, err := s.client.Register(context.TODO(), toProto(srv), s.callOpts()...) + _, err := s.client.Register(context.TODO(), ToProto(srv), s.callOpts()...) if err != nil { return err } @@ -69,7 +69,7 @@ func (s *serviceRegistry) Register(srv *registry.Service, opts ...registry.Regis func (s *serviceRegistry) Deregister(srv *registry.Service) error { // deregister the service - _, err := s.client.Deregister(context.TODO(), toProto(srv), s.callOpts()...) + _, err := s.client.Deregister(context.TODO(), ToProto(srv), s.callOpts()...) if err != nil { return err } @@ -87,7 +87,7 @@ func (s *serviceRegistry) GetService(name string) ([]*registry.Service, error) { var services []*registry.Service for _, service := range rsp.Services { - services = append(services, toService(service)) + services = append(services, ToService(service)) } return services, nil } @@ -100,7 +100,7 @@ func (s *serviceRegistry) ListServices() ([]*registry.Service, error) { var services []*registry.Service for _, service := range rsp.Services { - services = append(services, toService(service)) + services = append(services, ToService(service)) } return services, nil diff --git a/registry/service/util.go b/registry/service/util.go index 3aa7bd40..bbde4095 100644 --- a/registry/service/util.go +++ b/registry/service/util.go @@ -37,7 +37,7 @@ func toValues(v []*pb.Value) []*registry.Value { return vs } -func toProto(s *registry.Service) *pb.Service { +func ToProto(s *registry.Service) *pb.Service { var endpoints []*pb.Endpoint for _, ep := range s.Endpoints { var request, response *pb.Value @@ -85,7 +85,7 @@ func toProto(s *registry.Service) *pb.Service { } } -func toService(s *pb.Service) *registry.Service { +func ToService(s *pb.Service) *registry.Service { var endpoints []*registry.Endpoint for _, ep := range s.Endpoints { var request, response *registry.Value diff --git a/registry/service/watcher.go b/registry/service/watcher.go index c5ede205..08f2144d 100644 --- a/registry/service/watcher.go +++ b/registry/service/watcher.go @@ -26,7 +26,7 @@ func (s *serviceWatcher) Next() (*registry.Result, error) { return ®istry.Result{ Action: r.Action, - Service: toService(r.Service), + Service: ToService(r.Service), }, nil } } From 2c16c7e62f05f04633f859d55d9f305b17114fbe Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 9 Sep 2019 09:25:47 -0700 Subject: [PATCH 3/3] Fix build breaks --- registry/handler/handler.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/registry/handler/handler.go b/registry/handler/handler.go index 62604e41..f86148cd 100644 --- a/registry/handler/handler.go +++ b/registry/handler/handler.go @@ -14,40 +14,46 @@ type Registry struct { Registry registry.Registry } -func (r *Registry) GetService(ctx context.Context, req pb.GetRequest, rsp pb.GetResponse) error { +func (r *Registry) GetService(ctx context.Context, req *pb.GetRequest, rsp *pb.GetResponse) error { services, err := r.Registry.GetService(req.Service) + if err != nil { + return errors.InternalServerError("go.micro.registry", err.Error()) + } for _, srv := range services { rsp.Services = append(rsp.Services, service.ToProto(srv)) } return nil } -func (r *Registry) Register(ctx context.Context, req pb.Service, rsp pb.EmptyResponse) error { - err := r.Registry.Register(service.ToService(req.Service)) +func (r *Registry) Register(ctx context.Context, req *pb.Service, rsp *pb.EmptyResponse) error { + err := r.Registry.Register(service.ToService(req)) if err != nil { return errors.InternalServerError("go.micro.registry", err.Error()) } return nil } -func (r *Registry) Deregister(ctx context.Context, req pb.Service, rsp pb.EmptyResponse) error { - err := r.Registry.Deregister(service.ToService(req.Service)) +func (r *Registry) Deregister(ctx context.Context, req *pb.Service, rsp *pb.EmptyResponse) error { + err := r.Registry.Deregister(service.ToService(req)) if err != nil { return errors.InternalServerError("go.micro.registry", err.Error()) } return nil } -func (r *Registry) ListServices(ctx context.Context, req pb.ListRequest, rsp pb.ListResponse) error { +func (r *Registry) ListServices(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error { services, err := r.Registry.ListServices() + if err != nil { + return errors.InternalServerError("go.micro.registry", err.Error()) + } for _, srv := range services { rsp.Services = append(rsp.Services, service.ToProto(srv)) } return nil } -func (r *Registry) Watch(ctx context.Context, req pb.WatchRequest, rsp pb.Registry_WatchStream) error { - watcher, err := r.Registry.Watcher(registry.WatchOption(req.Service)) +func (r *Registry) Watch(ctx context.Context, req *pb.WatchRequest, rsp pb.Registry_WatchStream) error { + watcher, err := r.Registry.Watch(registry.WatchService(req.Service)) if err != nil { return errors.InternalServerError("go.micro.registry", err.Error()) }