diff --git a/network/router/default.go b/network/router/default.go index b1170796..245a7b31 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/micro/go-micro/registry" ) @@ -43,15 +44,17 @@ var ( type router struct { sync.RWMutex // embed the table - *table - opts Options - status Status - exit chan struct{} - errChan chan error - eventChan chan *Event - advertChan chan *Advert - advertWg *sync.WaitGroup - wg *sync.WaitGroup + table *table + opts Options + status Status + exit chan struct{} + errChan chan error + eventChan chan *Event + advertWg *sync.WaitGroup + wg *sync.WaitGroup + + // advert subscribers + subscribers map[string]chan *Advert } // newRouter creates new router and returns it @@ -65,11 +68,12 @@ func newRouter(opts ...Option) Router { } r := &router{ - table: newTable(), - opts: options, - status: Status{Code: Stopped, Error: nil}, - advertWg: &sync.WaitGroup{}, - wg: &sync.WaitGroup{}, + table: newTable(), + opts: options, + status: Status{Code: Stopped, Error: nil}, + advertWg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, + subscribers: make(map[string]chan *Advert), } go r.run() @@ -90,19 +94,23 @@ func (r *router) Options() Options { return r.opts } +func (r *router) Table() Table { + return r.table +} + // manageRoute applies action on a given route func (r *router) manageRoute(route Route, action string) error { switch action { case "create": - if err := r.Create(route); err != nil && err != ErrDuplicateRoute { + if err := r.table.Create(route); err != nil && err != ErrDuplicateRoute { return fmt.Errorf("failed adding route for service %s: %s", route.Service, err) } case "update": - if err := r.Update(route); err != nil && err != ErrDuplicateRoute { + if err := r.table.Update(route); err != nil && err != ErrDuplicateRoute { return fmt.Errorf("failed updating route for service %s: %s", route.Service, err) } case "delete": - if err := r.Delete(route); err != nil && err != ErrRouteNotFound { + if err := r.table.Delete(route); err != nil && err != ErrRouteNotFound { return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err) } default: @@ -244,11 +252,23 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) { Events: events, } - select { - case r.advertChan <- a: - case <-r.exit: - return + r.RLock() + for _, sub := range r.subscribers { + // check the exit chan first + select { + case <-r.exit: + r.RUnlock() + return + default: + } + + // now send the message + select { + case sub <- a: + default: + } } + r.RUnlock() } // advertiseTable advertises the whole routing table to the network @@ -260,7 +280,7 @@ func (r *router) advertiseTable() error { select { case <-ticker.C: // list routing table routes to announce - routes, err := r.List() + routes, err := r.table.List() if err != nil { return fmt.Errorf("failed listing routes: %s", err) } @@ -410,8 +430,6 @@ func (r *router) advertiseEvents() error { case <-r.exit: // first wait for the advertiser to finish r.advertWg.Wait() - // close the advert channel - close(r.advertChan) return nil } } @@ -434,9 +452,6 @@ func (r *router) watchErrors() { // drain the advertise channel only if advertising if r.status.Code == Advertising { - // drain the advertise channel - for range r.advertChan { - } // drain the event channel for range r.eventChan { } @@ -474,7 +489,7 @@ func (r *router) run() { Network: "*", Metric: DefaultLocalMetric, } - if err := r.Create(route); err != nil { + if err := r.table.Create(route); err != nil { r.status = Status{Code: Error, Error: fmt.Errorf("failed adding default gateway route: %s", err)} return } @@ -525,10 +540,12 @@ func (r *router) Advertise() (<-chan *Advert, error) { switch r.status.Code { case Advertising: - return r.advertChan, nil + advertChan := make(chan *Advert) + r.subscribers[uuid.New().String()] = advertChan + return advertChan, nil case Running: // list routing table routes to announce - routes, err := r.List() + routes, err := r.table.List() if err != nil { return nil, fmt.Errorf("failed listing routes: %s", err) } @@ -543,8 +560,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { events[i] = event } - // create advertise and event channels - r.advertChan = make(chan *Advert) + // create event channels r.eventChan = make(chan *Event) // advertise your presence @@ -573,7 +589,11 @@ func (r *router) Advertise() (<-chan *Advert, error) { // mark router as Running and set its Error to nil r.status = Status{Code: Advertising, Error: nil} - return r.advertChan, nil + // create advert channel + advertChan := make(chan *Advert) + r.subscribers[uuid.New().String()] = advertChan + + return advertChan, nil case Stopped: return nil, fmt.Errorf("not running") } @@ -604,6 +624,14 @@ func (r *router) Process(a *Advert) error { return nil } +func (r *router) Lookup(q Query) ([]Route, error) { + return r.table.Query(q) +} + +func (r *router) Watch(opts ...WatchOption) (Watcher, error) { + return r.table.Watch(opts...) +} + // Status returns router status func (r *router) Status() Status { r.RLock() @@ -625,14 +653,20 @@ func (r *router) Stop() error { // drain the advertise channel only if advertising if r.status.Code == Advertising { - // drain the advertise channel - for range r.advertChan { - } // drain the event channel for range r.eventChan { } } + // close advert subscribers + for id, sub := range r.subscribers { + // close the channel + close(sub) + + // delete the subscriber + delete(r.subscribers, id) + } + // mark the router as Stopped and set its Error to nil r.status = Status{Code: Stopped, Error: nil} } diff --git a/network/router/options.go b/network/router/options.go index 003e0b33..13119e1b 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -2,6 +2,7 @@ package router import ( "github.com/google/uuid" + "github.com/micro/go-micro/client" "github.com/micro/go-micro/registry" ) @@ -17,6 +18,8 @@ type Options struct { Network string // Registry is the local registry Registry registry.Registry + // Client for calling router + Client client.Client } // Id sets Router Id @@ -33,6 +36,13 @@ func Address(a string) Option { } } +// Client to call router service +func Client(c client.Client) Option { + return func(o *Options) { + o.Client = c + } +} + // Gateway sets network gateway func Gateway(g string) Option { return func(o *Options) { diff --git a/network/router/proto/router.micro.go b/network/router/proto/router.micro.go index 96386187..ee5bbd51 100644 --- a/network/router/proto/router.micro.go +++ b/network/router/proto/router.micro.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-micro. DO NOT EDIT. -// source: router.proto +// source: go-micro/network/router/proto/router.proto package go_micro_router @@ -34,14 +34,11 @@ var _ server.Option // Client API for Router service type RouterService interface { - List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) Advertise(ctx context.Context, in *AdvertiseRequest, opts ...client.CallOption) (Router_AdvertiseService, error) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) - Create(ctx context.Context, in *Route, opts ...client.CallOption) (*CreateResponse, error) - Delete(ctx context.Context, in *Route, opts ...client.CallOption) (*DeleteResponse, error) - Update(ctx context.Context, in *Route, opts ...client.CallOption) (*UpdateResponse, error) + Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) } type routerService struct { @@ -62,16 +59,6 @@ func NewRouterService(name string, c client.Client) RouterService { } } -func (c *routerService) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) { - req := c.c.NewRequest(c.name, "Router.List", in) - out := new(ListResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *routerService) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) { req := c.c.NewRequest(c.name, "Router.Lookup", in) out := new(LookupResponse) @@ -180,29 +167,9 @@ func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client. return out, nil } -func (c *routerService) Create(ctx context.Context, in *Route, opts ...client.CallOption) (*CreateResponse, error) { - req := c.c.NewRequest(c.name, "Router.Create", in) - out := new(CreateResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *routerService) Delete(ctx context.Context, in *Route, opts ...client.CallOption) (*DeleteResponse, error) { - req := c.c.NewRequest(c.name, "Router.Delete", in) - out := new(DeleteResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *routerService) Update(ctx context.Context, in *Route, opts ...client.CallOption) (*UpdateResponse, error) { - req := c.c.NewRequest(c.name, "Router.Update", in) - out := new(UpdateResponse) +func (c *routerService) Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) { + req := c.c.NewRequest(c.name, "Router.Status", in) + out := new(StatusResponse) err := c.c.Call(ctx, req, out, opts...) if err != nil { return nil, err @@ -213,26 +180,20 @@ func (c *routerService) Update(ctx context.Context, in *Route, opts ...client.Ca // Server API for Router service type RouterHandler interface { - List(context.Context, *ListRequest, *ListResponse) error Lookup(context.Context, *LookupRequest, *LookupResponse) error Watch(context.Context, *WatchRequest, Router_WatchStream) error Advertise(context.Context, *AdvertiseRequest, Router_AdvertiseStream) error Process(context.Context, *Advert, *ProcessResponse) error - Create(context.Context, *Route, *CreateResponse) error - Delete(context.Context, *Route, *DeleteResponse) error - Update(context.Context, *Route, *UpdateResponse) error + Status(context.Context, *Request, *StatusResponse) error } func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error { type router interface { - List(ctx context.Context, in *ListRequest, out *ListResponse) error Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error Watch(ctx context.Context, stream server.Stream) error Advertise(ctx context.Context, stream server.Stream) error Process(ctx context.Context, in *Advert, out *ProcessResponse) error - Create(ctx context.Context, in *Route, out *CreateResponse) error - Delete(ctx context.Context, in *Route, out *DeleteResponse) error - Update(ctx context.Context, in *Route, out *UpdateResponse) error + Status(ctx context.Context, in *Request, out *StatusResponse) error } type Router struct { router @@ -245,10 +206,6 @@ type routerHandler struct { RouterHandler } -func (h *routerHandler) List(ctx context.Context, in *ListRequest, out *ListResponse) error { - return h.RouterHandler.List(ctx, in, out) -} - func (h *routerHandler) Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error { return h.RouterHandler.Lookup(ctx, in, out) } @@ -327,14 +284,133 @@ func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessRes return h.RouterHandler.Process(ctx, in, out) } -func (h *routerHandler) Create(ctx context.Context, in *Route, out *CreateResponse) error { - return h.RouterHandler.Create(ctx, in, out) +func (h *routerHandler) Status(ctx context.Context, in *Request, out *StatusResponse) error { + return h.RouterHandler.Status(ctx, in, out) } -func (h *routerHandler) Delete(ctx context.Context, in *Route, out *DeleteResponse) error { - return h.RouterHandler.Delete(ctx, in, out) +// Client API for Table service + +type TableService interface { + Create(ctx context.Context, in *Route, opts ...client.CallOption) (*CreateResponse, error) + Delete(ctx context.Context, in *Route, opts ...client.CallOption) (*DeleteResponse, error) + Update(ctx context.Context, in *Route, opts ...client.CallOption) (*UpdateResponse, error) + Query(ctx context.Context, in *QueryRequest, opts ...client.CallOption) (*QueryResponse, error) + List(ctx context.Context, in *Request, opts ...client.CallOption) (*ListResponse, error) } -func (h *routerHandler) Update(ctx context.Context, in *Route, out *UpdateResponse) error { - return h.RouterHandler.Update(ctx, in, out) +type tableService struct { + c client.Client + name string +} + +func NewTableService(name string, c client.Client) TableService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.router" + } + return &tableService{ + c: c, + name: name, + } +} + +func (c *tableService) Create(ctx context.Context, in *Route, opts ...client.CallOption) (*CreateResponse, error) { + req := c.c.NewRequest(c.name, "Table.Create", in) + out := new(CreateResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tableService) Delete(ctx context.Context, in *Route, opts ...client.CallOption) (*DeleteResponse, error) { + req := c.c.NewRequest(c.name, "Table.Delete", in) + out := new(DeleteResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tableService) Update(ctx context.Context, in *Route, opts ...client.CallOption) (*UpdateResponse, error) { + req := c.c.NewRequest(c.name, "Table.Update", in) + out := new(UpdateResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tableService) Query(ctx context.Context, in *QueryRequest, opts ...client.CallOption) (*QueryResponse, error) { + req := c.c.NewRequest(c.name, "Table.Query", in) + out := new(QueryResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tableService) List(ctx context.Context, in *Request, opts ...client.CallOption) (*ListResponse, error) { + req := c.c.NewRequest(c.name, "Table.List", in) + out := new(ListResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Table service + +type TableHandler interface { + Create(context.Context, *Route, *CreateResponse) error + Delete(context.Context, *Route, *DeleteResponse) error + Update(context.Context, *Route, *UpdateResponse) error + Query(context.Context, *QueryRequest, *QueryResponse) error + List(context.Context, *Request, *ListResponse) error +} + +func RegisterTableHandler(s server.Server, hdlr TableHandler, opts ...server.HandlerOption) error { + type table interface { + Create(ctx context.Context, in *Route, out *CreateResponse) error + Delete(ctx context.Context, in *Route, out *DeleteResponse) error + Update(ctx context.Context, in *Route, out *UpdateResponse) error + Query(ctx context.Context, in *QueryRequest, out *QueryResponse) error + List(ctx context.Context, in *Request, out *ListResponse) error + } + type Table struct { + table + } + h := &tableHandler{hdlr} + return s.Handle(s.NewHandler(&Table{h}, opts...)) +} + +type tableHandler struct { + TableHandler +} + +func (h *tableHandler) Create(ctx context.Context, in *Route, out *CreateResponse) error { + return h.TableHandler.Create(ctx, in, out) +} + +func (h *tableHandler) Delete(ctx context.Context, in *Route, out *DeleteResponse) error { + return h.TableHandler.Delete(ctx, in, out) +} + +func (h *tableHandler) Update(ctx context.Context, in *Route, out *UpdateResponse) error { + return h.TableHandler.Update(ctx, in, out) +} + +func (h *tableHandler) Query(ctx context.Context, in *QueryRequest, out *QueryResponse) error { + return h.TableHandler.Query(ctx, in, out) +} + +func (h *tableHandler) List(ctx context.Context, in *Request, out *ListResponse) error { + return h.TableHandler.List(ctx, in, out) } diff --git a/network/router/proto/router.pb.go b/network/router/proto/router.pb.go index 1582776d..6b90c0c8 100644 --- a/network/router/proto/router.pb.go +++ b/network/router/proto/router.pb.go @@ -1,11 +1,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: router.proto +// source: go-micro/network/router/proto/router.proto package go_micro_router import ( + context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" math "math" ) @@ -43,7 +45,7 @@ func (x AdvertType) String() string { } func (AdvertType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{0} + return fileDescriptor_fc08514fc6dadd29, []int{0} } // EventType defines the type of event @@ -72,40 +74,40 @@ func (x EventType) String() string { } func (EventType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{1} + return fileDescriptor_fc08514fc6dadd29, []int{1} } -// ListRequest is made to List routes -type ListRequest struct { +// Empty request +type Request struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *ListRequest) Reset() { *m = ListRequest{} } -func (m *ListRequest) String() string { return proto.CompactTextString(m) } -func (*ListRequest) ProtoMessage() {} -func (*ListRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{0} +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} +func (*Request) Descriptor() ([]byte, []int) { + return fileDescriptor_fc08514fc6dadd29, []int{0} } -func (m *ListRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_ListRequest.Unmarshal(m, b) +func (m *Request) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Request.Unmarshal(m, b) } -func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic) +func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Request.Marshal(b, m, deterministic) } -func (m *ListRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_ListRequest.Merge(m, src) +func (m *Request) XXX_Merge(src proto.Message) { + xxx_messageInfo_Request.Merge(m, src) } -func (m *ListRequest) XXX_Size() int { - return xxx_messageInfo_ListRequest.Size(m) +func (m *Request) XXX_Size() int { + return xxx_messageInfo_Request.Size(m) } -func (m *ListRequest) XXX_DiscardUnknown() { - xxx_messageInfo_ListRequest.DiscardUnknown(m) +func (m *Request) XXX_DiscardUnknown() { + xxx_messageInfo_Request.DiscardUnknown(m) } -var xxx_messageInfo_ListRequest proto.InternalMessageInfo +var xxx_messageInfo_Request proto.InternalMessageInfo // ListResponse is returned by List type ListResponse struct { @@ -119,7 +121,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} } func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (*ListResponse) ProtoMessage() {} func (*ListResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{1} + return fileDescriptor_fc08514fc6dadd29, []int{1} } func (m *ListResponse) XXX_Unmarshal(b []byte) error { @@ -159,7 +161,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} } func (m *LookupRequest) String() string { return proto.CompactTextString(m) } func (*LookupRequest) ProtoMessage() {} func (*LookupRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{2} + return fileDescriptor_fc08514fc6dadd29, []int{2} } func (m *LookupRequest) XXX_Unmarshal(b []byte) error { @@ -199,7 +201,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} } func (m *LookupResponse) String() string { return proto.CompactTextString(m) } func (*LookupResponse) ProtoMessage() {} func (*LookupResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{3} + return fileDescriptor_fc08514fc6dadd29, []int{3} } func (m *LookupResponse) XXX_Unmarshal(b []byte) error { @@ -227,6 +229,84 @@ func (m *LookupResponse) GetRoutes() []*Route { return nil } +type QueryRequest struct { + Query *Query `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *QueryRequest) Reset() { *m = QueryRequest{} } +func (m *QueryRequest) String() string { return proto.CompactTextString(m) } +func (*QueryRequest) ProtoMessage() {} +func (*QueryRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_fc08514fc6dadd29, []int{4} +} + +func (m *QueryRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_QueryRequest.Unmarshal(m, b) +} +func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_QueryRequest.Marshal(b, m, deterministic) +} +func (m *QueryRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryRequest.Merge(m, src) +} +func (m *QueryRequest) XXX_Size() int { + return xxx_messageInfo_QueryRequest.Size(m) +} +func (m *QueryRequest) XXX_DiscardUnknown() { + xxx_messageInfo_QueryRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryRequest proto.InternalMessageInfo + +func (m *QueryRequest) GetQuery() *Query { + if m != nil { + return m.Query + } + return nil +} + +type QueryResponse struct { + Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *QueryResponse) Reset() { *m = QueryResponse{} } +func (m *QueryResponse) String() string { return proto.CompactTextString(m) } +func (*QueryResponse) ProtoMessage() {} +func (*QueryResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_fc08514fc6dadd29, []int{5} +} + +func (m *QueryResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_QueryResponse.Unmarshal(m, b) +} +func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_QueryResponse.Marshal(b, m, deterministic) +} +func (m *QueryResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryResponse.Merge(m, src) +} +func (m *QueryResponse) XXX_Size() int { + return xxx_messageInfo_QueryResponse.Size(m) +} +func (m *QueryResponse) XXX_DiscardUnknown() { + xxx_messageInfo_QueryResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryResponse proto.InternalMessageInfo + +func (m *QueryResponse) GetRoutes() []*Route { + if m != nil { + return m.Routes + } + return nil +} + // WatchRequest is made to Watch Router type WatchRequest struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -238,7 +318,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} } func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{4} + return fileDescriptor_fc08514fc6dadd29, []int{6} } func (m *WatchRequest) XXX_Unmarshal(b []byte) error { @@ -270,7 +350,7 @@ func (m *AdvertiseRequest) Reset() { *m = AdvertiseRequest{} } func (m *AdvertiseRequest) String() string { return proto.CompactTextString(m) } func (*AdvertiseRequest) ProtoMessage() {} func (*AdvertiseRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{5} + return fileDescriptor_fc08514fc6dadd29, []int{7} } func (m *AdvertiseRequest) XXX_Unmarshal(b []byte) error { @@ -312,7 +392,7 @@ func (m *Advert) Reset() { *m = Advert{} } func (m *Advert) String() string { return proto.CompactTextString(m) } func (*Advert) ProtoMessage() {} func (*Advert) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{6} + return fileDescriptor_fc08514fc6dadd29, []int{8} } func (m *Advert) XXX_Unmarshal(b []byte) error { @@ -379,7 +459,7 @@ func (m *ProcessResponse) Reset() { *m = ProcessResponse{} } func (m *ProcessResponse) String() string { return proto.CompactTextString(m) } func (*ProcessResponse) ProtoMessage() {} func (*ProcessResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{7} + return fileDescriptor_fc08514fc6dadd29, []int{9} } func (m *ProcessResponse) XXX_Unmarshal(b []byte) error { @@ -411,7 +491,7 @@ func (m *CreateResponse) Reset() { *m = CreateResponse{} } func (m *CreateResponse) String() string { return proto.CompactTextString(m) } func (*CreateResponse) ProtoMessage() {} func (*CreateResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{8} + return fileDescriptor_fc08514fc6dadd29, []int{10} } func (m *CreateResponse) XXX_Unmarshal(b []byte) error { @@ -443,7 +523,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{9} + return fileDescriptor_fc08514fc6dadd29, []int{11} } func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { @@ -475,7 +555,7 @@ func (m *UpdateResponse) Reset() { *m = UpdateResponse{} } func (m *UpdateResponse) String() string { return proto.CompactTextString(m) } func (*UpdateResponse) ProtoMessage() {} func (*UpdateResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{10} + return fileDescriptor_fc08514fc6dadd29, []int{12} } func (m *UpdateResponse) XXX_Unmarshal(b []byte) error { @@ -513,7 +593,7 @@ func (m *Event) Reset() { *m = Event{} } func (m *Event) String() string { return proto.CompactTextString(m) } func (*Event) ProtoMessage() {} func (*Event) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{11} + return fileDescriptor_fc08514fc6dadd29, []int{13} } func (m *Event) XXX_Unmarshal(b []byte) error { @@ -572,7 +652,7 @@ func (m *Query) Reset() { *m = Query{} } func (m *Query) String() string { return proto.CompactTextString(m) } func (*Query) ProtoMessage() {} func (*Query) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{12} + return fileDescriptor_fc08514fc6dadd29, []int{14} } func (m *Query) XXX_Unmarshal(b []byte) error { @@ -637,7 +717,7 @@ func (m *Route) Reset() { *m = Route{} } func (m *Route) String() string { return proto.CompactTextString(m) } func (*Route) ProtoMessage() {} func (*Route) Descriptor() ([]byte, []int) { - return fileDescriptor_367072455c71aedc, []int{13} + return fileDescriptor_fc08514fc6dadd29, []int{15} } func (m *Route) XXX_Unmarshal(b []byte) error { @@ -700,13 +780,101 @@ func (m *Route) GetMetric() int64 { return 0 } +type Status struct { + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Status) Reset() { *m = Status{} } +func (m *Status) String() string { return proto.CompactTextString(m) } +func (*Status) ProtoMessage() {} +func (*Status) Descriptor() ([]byte, []int) { + return fileDescriptor_fc08514fc6dadd29, []int{16} +} + +func (m *Status) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Status.Unmarshal(m, b) +} +func (m *Status) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Status.Marshal(b, m, deterministic) +} +func (m *Status) XXX_Merge(src proto.Message) { + xxx_messageInfo_Status.Merge(m, src) +} +func (m *Status) XXX_Size() int { + return xxx_messageInfo_Status.Size(m) +} +func (m *Status) XXX_DiscardUnknown() { + xxx_messageInfo_Status.DiscardUnknown(m) +} + +var xxx_messageInfo_Status proto.InternalMessageInfo + +func (m *Status) GetCode() string { + if m != nil { + return m.Code + } + return "" +} + +func (m *Status) GetError() string { + if m != nil { + return m.Error + } + return "" +} + +type StatusResponse struct { + Status *Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusResponse) Reset() { *m = StatusResponse{} } +func (m *StatusResponse) String() string { return proto.CompactTextString(m) } +func (*StatusResponse) ProtoMessage() {} +func (*StatusResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_fc08514fc6dadd29, []int{17} +} + +func (m *StatusResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatusResponse.Unmarshal(m, b) +} +func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic) +} +func (m *StatusResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatusResponse.Merge(m, src) +} +func (m *StatusResponse) XXX_Size() int { + return xxx_messageInfo_StatusResponse.Size(m) +} +func (m *StatusResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StatusResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StatusResponse proto.InternalMessageInfo + +func (m *StatusResponse) GetStatus() *Status { + if m != nil { + return m.Status + } + return nil +} + func init() { proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value) proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value) - proto.RegisterType((*ListRequest)(nil), "go.micro.router.ListRequest") + proto.RegisterType((*Request)(nil), "go.micro.router.Request") proto.RegisterType((*ListResponse)(nil), "go.micro.router.ListResponse") proto.RegisterType((*LookupRequest)(nil), "go.micro.router.LookupRequest") proto.RegisterType((*LookupResponse)(nil), "go.micro.router.LookupResponse") + proto.RegisterType((*QueryRequest)(nil), "go.micro.router.QueryRequest") + proto.RegisterType((*QueryResponse)(nil), "go.micro.router.QueryResponse") proto.RegisterType((*WatchRequest)(nil), "go.micro.router.WatchRequest") proto.RegisterType((*AdvertiseRequest)(nil), "go.micro.router.AdvertiseRequest") proto.RegisterType((*Advert)(nil), "go.micro.router.Advert") @@ -717,47 +885,513 @@ func init() { proto.RegisterType((*Event)(nil), "go.micro.router.Event") proto.RegisterType((*Query)(nil), "go.micro.router.Query") proto.RegisterType((*Route)(nil), "go.micro.router.Route") + proto.RegisterType((*Status)(nil), "go.micro.router.Status") + proto.RegisterType((*StatusResponse)(nil), "go.micro.router.StatusResponse") } -func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) } - -var fileDescriptor_367072455c71aedc = []byte{ - // 591 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xc1, 0x6e, 0xd3, 0x40, - 0x10, 0xf5, 0x26, 0xb6, 0x2b, 0x4f, 0x53, 0xd7, 0xcc, 0xa1, 0x58, 0xa6, 0x40, 0xf0, 0xa9, 0xaa, - 0x2a, 0x17, 0x85, 0x33, 0x88, 0x52, 0xca, 0xa5, 0x3d, 0x80, 0x45, 0xc5, 0xd9, 0xd8, 0xa3, 0x62, - 0xb5, 0xb1, 0xdd, 0xdd, 0x4d, 0xab, 0x9c, 0xf9, 0x0c, 0xbe, 0x80, 0xff, 0xe0, 0xc3, 0x90, 0x77, - 0xed, 0xd6, 0x75, 0x62, 0xa4, 0x72, 0xca, 0xce, 0xcc, 0x9b, 0x37, 0x3b, 0x33, 0xfb, 0x62, 0x98, - 0xf0, 0x72, 0x21, 0x89, 0x47, 0x15, 0x2f, 0x65, 0x89, 0xdb, 0x17, 0x65, 0x34, 0xcf, 0x53, 0x5e, - 0x46, 0xda, 0x1d, 0x6e, 0xc1, 0xe6, 0x59, 0x2e, 0x64, 0x4c, 0xd7, 0x0b, 0x12, 0x32, 0x7c, 0x07, - 0x13, 0x6d, 0x8a, 0xaa, 0x2c, 0x04, 0x61, 0x04, 0xb6, 0x02, 0x0a, 0x9f, 0x4d, 0xc7, 0x7b, 0x9b, - 0xb3, 0x9d, 0xa8, 0x47, 0x10, 0xc5, 0xf5, 0x4f, 0xdc, 0xa0, 0xc2, 0xb7, 0xb0, 0x75, 0x56, 0x96, - 0x97, 0x8b, 0xaa, 0x21, 0xc4, 0x03, 0xb0, 0xae, 0x17, 0xc4, 0x97, 0x3e, 0x9b, 0xb2, 0xb5, 0xf9, - 0x5f, 0xea, 0x68, 0xac, 0x41, 0xe1, 0x7b, 0x70, 0xdb, 0xf4, 0xff, 0xbc, 0x80, 0x0b, 0x93, 0x6f, - 0x89, 0x4c, 0x7f, 0xb4, 0x0d, 0x21, 0x78, 0x47, 0xd9, 0x0d, 0x71, 0x99, 0x0b, 0x6a, 0x7d, 0xbf, - 0x19, 0xd8, 0xda, 0x89, 0x2e, 0x8c, 0xf2, 0x4c, 0xdd, 0xcd, 0x89, 0x47, 0x79, 0x86, 0x87, 0x60, - 0xca, 0x65, 0x45, 0xfe, 0x68, 0xca, 0xf6, 0xdc, 0xd9, 0xb3, 0x95, 0x62, 0x3a, 0xed, 0xeb, 0xb2, - 0xa2, 0x58, 0x01, 0x71, 0x17, 0x1c, 0x99, 0xcf, 0x49, 0xc8, 0x64, 0x5e, 0xf9, 0xe3, 0x29, 0xdb, - 0x1b, 0xc7, 0xf7, 0x0e, 0xf4, 0x60, 0x2c, 0xe5, 0x95, 0x6f, 0x2a, 0x7f, 0x7d, 0xac, 0xfb, 0xa1, - 0x1b, 0x2a, 0xa4, 0xf0, 0xad, 0x81, 0x7e, 0x4e, 0xea, 0x70, 0xdc, 0xa0, 0xc2, 0x27, 0xb0, 0xfd, - 0x99, 0x97, 0x29, 0x09, 0xd1, 0x8e, 0x24, 0xf4, 0xc0, 0x3d, 0xe6, 0x94, 0x48, 0xea, 0x7a, 0x3e, - 0xd2, 0x15, 0x3d, 0xf4, 0x9c, 0x57, 0x59, 0x17, 0xf3, 0x93, 0x81, 0xa5, 0xa8, 0x31, 0x6a, 0x7a, - 0x64, 0xaa, 0xc7, 0x60, 0xfd, 0x05, 0x86, 0x5a, 0x1c, 0xf5, 0x5b, 0x3c, 0x00, 0x4b, 0xe5, 0xa9, - 0xe6, 0x87, 0xf7, 0xa3, 0x41, 0xe1, 0x39, 0x58, 0x6a, 0xe1, 0xe8, 0xc3, 0x86, 0x20, 0x7e, 0x93, - 0xa7, 0xd4, 0x4c, 0xbf, 0x35, 0xeb, 0xc8, 0x45, 0x22, 0xe9, 0x36, 0x59, 0xaa, 0x62, 0x4e, 0xdc, - 0x9a, 0x75, 0xa4, 0x20, 0x79, 0x5b, 0xf2, 0x4b, 0x55, 0xcc, 0x89, 0x5b, 0x33, 0xfc, 0xc5, 0xc0, - 0x52, 0x75, 0xfe, 0xcd, 0x9b, 0x64, 0x19, 0x27, 0x21, 0x5a, 0xde, 0xc6, 0xec, 0x56, 0x1c, 0x0f, - 0x56, 0x34, 0x1f, 0x54, 0x44, 0x04, 0xf3, 0x2a, 0x2f, 0x2e, 0x7d, 0x4b, 0xb9, 0xd5, 0x19, 0x77, - 0xc0, 0x9e, 0x93, 0xe4, 0x79, 0xea, 0xdb, 0x6a, 0x4a, 0x8d, 0xb5, 0x3f, 0x03, 0xb8, 0x7f, 0x37, - 0x88, 0xe0, 0x6a, 0xeb, 0xa8, 0x28, 0xca, 0x45, 0x91, 0x92, 0x67, 0xa0, 0x07, 0x13, 0xed, 0xd3, - 0x4b, 0xf3, 0xd8, 0xfe, 0x21, 0x38, 0x77, 0x7b, 0x40, 0x00, 0x5b, 0x6f, 0xdc, 0x33, 0xea, 0xb3, - 0xde, 0xb5, 0xc7, 0xea, 0x73, 0x93, 0x30, 0x9a, 0xfd, 0x31, 0xc1, 0x56, 0x23, 0xe0, 0x78, 0x02, - 0x66, 0x2d, 0x62, 0xdc, 0x5d, 0xd9, 0x45, 0x47, 0xea, 0xc1, 0xf3, 0x81, 0x68, 0xf3, 0x5e, 0x0c, - 0x3c, 0x05, 0x5b, 0x8b, 0x11, 0x5f, 0xac, 0x42, 0xbb, 0x22, 0x0f, 0x5e, 0x0e, 0xc6, 0xef, 0xc8, - 0x3e, 0x80, 0xa5, 0x74, 0x89, 0xab, 0x65, 0xbb, 0x7a, 0x0d, 0x06, 0xf4, 0x10, 0x1a, 0xaf, 0x19, - 0x9e, 0x82, 0x73, 0xa7, 0x65, 0x7c, 0x35, 0xa0, 0xcd, 0x7b, 0x9d, 0x07, 0x4f, 0x07, 0x20, 0x8a, - 0xec, 0x13, 0x6c, 0x34, 0xc2, 0xc2, 0x21, 0x5c, 0x30, 0x5d, 0x09, 0xf4, 0xb5, 0x68, 0xe0, 0x71, - 0xbb, 0x1b, 0x1c, 0x78, 0xfa, 0x6b, 0xa6, 0xd3, 0x93, 0xaf, 0x22, 0xd1, 0x4b, 0x7d, 0x04, 0x49, - 0x4f, 0xf1, 0x8a, 0x44, 0xbf, 0x86, 0x47, 0x90, 0xf4, 0xfe, 0x24, 0x8c, 0xef, 0xb6, 0xfa, 0x4e, - 0xbc, 0xf9, 0x1b, 0x00, 0x00, 0xff, 0xff, 0xbb, 0x08, 0x6d, 0x39, 0x37, 0x06, 0x00, 0x00, +func init() { + proto.RegisterFile("go-micro/network/router/proto/router.proto", fileDescriptor_fc08514fc6dadd29) +} + +var fileDescriptor_fc08514fc6dadd29 = []byte{ + // 702 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcd, 0x4e, 0xdb, 0x40, + 0x10, 0xb6, 0x93, 0xd8, 0xc8, 0xd3, 0x60, 0xdc, 0x51, 0x05, 0x56, 0x5a, 0x68, 0xea, 0x13, 0x42, + 0xd4, 0xa9, 0xd2, 0x6b, 0xff, 0x52, 0x4a, 0x55, 0x09, 0x0e, 0xad, 0x0b, 0xea, 0xd9, 0xd8, 0x2b, + 0x6a, 0x91, 0x78, 0xcd, 0xee, 0x06, 0x94, 0x73, 0x1f, 0xa3, 0x4f, 0xd0, 0xe7, 0xea, 0x33, 0xf4, + 0x5e, 0x79, 0x77, 0x1d, 0x92, 0x18, 0x23, 0xc1, 0xc9, 0x3b, 0x7f, 0xdf, 0xec, 0xcc, 0xce, 0x37, + 0x86, 0xbd, 0x73, 0xfa, 0x72, 0x92, 0x25, 0x8c, 0x0e, 0x72, 0x22, 0xae, 0x29, 0xbb, 0x18, 0x30, + 0x3a, 0x15, 0x84, 0x0d, 0x0a, 0x46, 0x05, 0xd5, 0x42, 0x28, 0x05, 0xdc, 0x38, 0xa7, 0xa1, 0xf4, + 0x0d, 0x95, 0x3a, 0x70, 0x60, 0x2d, 0x22, 0x97, 0x53, 0xc2, 0x45, 0xf0, 0x0e, 0xba, 0xc7, 0x19, + 0x17, 0x11, 0xe1, 0x05, 0xcd, 0x39, 0xc1, 0x10, 0x6c, 0xe9, 0xc4, 0x7d, 0xb3, 0xdf, 0xde, 0x7d, + 0x34, 0xdc, 0x0c, 0x57, 0x82, 0xc3, 0xa8, 0xfc, 0x44, 0xda, 0x2b, 0x78, 0x0b, 0xeb, 0xc7, 0x94, + 0x5e, 0x4c, 0x0b, 0x0d, 0x88, 0xfb, 0x60, 0x5d, 0x4e, 0x09, 0x9b, 0xf9, 0x66, 0xdf, 0xbc, 0x35, + 0xfe, 0x5b, 0x69, 0x8d, 0x94, 0x53, 0xf0, 0x01, 0xdc, 0x2a, 0xfc, 0x81, 0x17, 0x78, 0x03, 0x5d, + 0x85, 0xf8, 0xa0, 0xfc, 0xef, 0x61, 0x5d, 0x47, 0x3f, 0x30, 0xbd, 0x0b, 0xdd, 0x1f, 0xb1, 0x48, + 0x7e, 0x56, 0xfd, 0x44, 0xf0, 0x46, 0xe9, 0x15, 0x61, 0x22, 0xe3, 0xa4, 0xd2, 0xfd, 0x31, 0xc1, + 0x56, 0x4a, 0x74, 0xa1, 0x95, 0xa5, 0xf2, 0x6a, 0x4e, 0xd4, 0xca, 0x52, 0x1c, 0x40, 0x47, 0xcc, + 0x0a, 0xe2, 0xb7, 0xfa, 0xe6, 0xae, 0x3b, 0x7c, 0x5a, 0x4b, 0xa6, 0xc2, 0x4e, 0x66, 0x05, 0x89, + 0xa4, 0x23, 0x3e, 0x03, 0x47, 0x64, 0x13, 0xc2, 0x45, 0x3c, 0x29, 0xfc, 0x76, 0xdf, 0xdc, 0x6d, + 0x47, 0x37, 0x0a, 0xf4, 0xa0, 0x2d, 0xc4, 0xd8, 0xef, 0x48, 0x7d, 0x79, 0x2c, 0xeb, 0x21, 0x57, + 0x24, 0x17, 0xdc, 0xb7, 0x1a, 0xea, 0x39, 0x2c, 0xcd, 0x91, 0xf6, 0x0a, 0x1e, 0xc3, 0xc6, 0x57, + 0x46, 0x13, 0xc2, 0x79, 0xd5, 0x92, 0xc0, 0x03, 0xf7, 0x80, 0x91, 0x58, 0x90, 0x45, 0xcd, 0x27, + 0x32, 0x26, 0xcb, 0x9a, 0xd3, 0x22, 0x5d, 0xf4, 0xf9, 0x65, 0x82, 0x25, 0xa1, 0x31, 0xd4, 0x35, + 0x9a, 0xb2, 0xc6, 0xde, 0xed, 0x17, 0x68, 0x2a, 0xb1, 0xb5, 0x5a, 0xe2, 0x3e, 0x58, 0x32, 0x4e, + 0x16, 0xdf, 0xfc, 0x3e, 0xca, 0x29, 0x38, 0x05, 0x4b, 0xbe, 0x2f, 0xfa, 0xb0, 0xc6, 0x09, 0xbb, + 0xca, 0x12, 0xa2, 0xbb, 0x5f, 0x89, 0xa5, 0xe5, 0x3c, 0x16, 0xe4, 0x3a, 0x9e, 0xc9, 0x64, 0x4e, + 0x54, 0x89, 0xa5, 0x45, 0x93, 0x4b, 0x26, 0x73, 0xa2, 0x4a, 0x0c, 0x7e, 0x9b, 0x60, 0xc9, 0x3c, + 0x77, 0xe3, 0xc6, 0x69, 0xca, 0x08, 0xe7, 0x15, 0xae, 0x16, 0x17, 0x33, 0xb6, 0x1b, 0x33, 0x76, + 0x96, 0x32, 0x22, 0x42, 0x67, 0x9c, 0xe5, 0x17, 0xbe, 0x25, 0xd5, 0xf2, 0x8c, 0x9b, 0x60, 0x4f, + 0x88, 0x60, 0x59, 0xe2, 0xdb, 0xb2, 0x4b, 0x5a, 0x0a, 0x86, 0x60, 0x7f, 0x17, 0xb1, 0x98, 0xf2, + 0x32, 0x2a, 0xa1, 0x69, 0x75, 0x35, 0x79, 0xc6, 0x27, 0x60, 0x11, 0xc6, 0x28, 0xd3, 0xb7, 0x52, + 0x42, 0x30, 0x02, 0x57, 0xc5, 0xcc, 0x99, 0x30, 0x00, 0x9b, 0x4b, 0x8d, 0x66, 0xd2, 0x56, 0xad, + 0xd3, 0x3a, 0x40, 0xbb, 0xed, 0x0d, 0x01, 0x6e, 0xc6, 0x15, 0x11, 0x5c, 0x25, 0x8d, 0xf2, 0x9c, + 0x4e, 0xf3, 0x84, 0x78, 0x06, 0x7a, 0xd0, 0x55, 0x3a, 0x35, 0x2b, 0x9e, 0xb9, 0x37, 0x00, 0x67, + 0xfe, 0xfc, 0x08, 0x60, 0xab, 0x41, 0xf3, 0x8c, 0xf2, 0xac, 0x46, 0xcc, 0x33, 0xcb, 0xb3, 0x0e, + 0x68, 0x0d, 0xff, 0xb5, 0xc0, 0x96, 0x9d, 0x67, 0x78, 0x04, 0xb6, 0xda, 0x1d, 0xb8, 0x53, 0xbb, + 0xda, 0xd2, 0x4e, 0xea, 0x3d, 0x6f, 0xb4, 0xeb, 0x61, 0x35, 0xf0, 0x23, 0x58, 0x92, 0xc7, 0xb8, + 0x5d, 0xf3, 0x5d, 0xe4, 0x77, 0xaf, 0x81, 0x3f, 0x81, 0xf1, 0xca, 0xc4, 0x23, 0x70, 0xe6, 0xdc, + 0xc7, 0x17, 0x0d, 0x5c, 0xbe, 0xd9, 0x0b, 0xbd, 0xad, 0x06, 0x17, 0x09, 0xf6, 0x19, 0xd6, 0x34, + 0x11, 0xb1, 0xc9, 0xaf, 0xd7, 0xaf, 0x19, 0x56, 0xb9, 0x6b, 0xe0, 0xe1, 0x7c, 0x18, 0xfc, 0x3a, + 0x55, 0x1a, 0xfb, 0xb3, 0x3c, 0x0b, 0x81, 0x31, 0xfc, 0xdb, 0x02, 0xeb, 0x24, 0x3e, 0x1b, 0x13, + 0x3c, 0xa8, 0x5e, 0x09, 0x1b, 0xb8, 0x77, 0x0b, 0xdc, 0xca, 0xfe, 0x30, 0x4a, 0x10, 0xf5, 0xbc, + 0xf7, 0x00, 0x59, 0x59, 0x39, 0x12, 0x44, 0xcd, 0xc5, 0x3d, 0x40, 0x56, 0xb6, 0x94, 0x81, 0x5f, + 0xaa, 0x0d, 0xb1, 0xdd, 0xf0, 0xa7, 0xd0, 0x3d, 0xda, 0x69, 0x32, 0xcf, 0x91, 0x46, 0xd0, 0x29, + 0x7f, 0xa5, 0x77, 0xf4, 0xb9, 0x9e, 0x62, 0xf1, 0xdf, 0x1b, 0x18, 0x67, 0xb6, 0xfc, 0x61, 0xbf, + 0xfe, 0x1f, 0x00, 0x00, 0xff, 0xff, 0xa6, 0xfc, 0x65, 0xca, 0xde, 0x07, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// RouterClient is the client API for Router service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type RouterClient interface { + Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) + Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Router_WatchClient, error) + Advertise(ctx context.Context, in *AdvertiseRequest, opts ...grpc.CallOption) (Router_AdvertiseClient, error) + Process(ctx context.Context, in *Advert, opts ...grpc.CallOption) (*ProcessResponse, error) + Status(ctx context.Context, in *Request, opts ...grpc.CallOption) (*StatusResponse, error) +} + +type routerClient struct { + cc *grpc.ClientConn +} + +func NewRouterClient(cc *grpc.ClientConn) RouterClient { + return &routerClient{cc} +} + +func (c *routerClient) Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) { + out := new(LookupResponse) + err := c.cc.Invoke(ctx, "/go.micro.router.Router/Lookup", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *routerClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Router_WatchClient, error) { + stream, err := c.cc.NewStream(ctx, &_Router_serviceDesc.Streams[0], "/go.micro.router.Router/Watch", opts...) + if err != nil { + return nil, err + } + x := &routerWatchClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Router_WatchClient interface { + Recv() (*Event, error) + grpc.ClientStream +} + +type routerWatchClient struct { + grpc.ClientStream +} + +func (x *routerWatchClient) Recv() (*Event, error) { + m := new(Event) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *routerClient) Advertise(ctx context.Context, in *AdvertiseRequest, opts ...grpc.CallOption) (Router_AdvertiseClient, error) { + stream, err := c.cc.NewStream(ctx, &_Router_serviceDesc.Streams[1], "/go.micro.router.Router/Advertise", opts...) + if err != nil { + return nil, err + } + x := &routerAdvertiseClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Router_AdvertiseClient interface { + Recv() (*Advert, error) + grpc.ClientStream +} + +type routerAdvertiseClient struct { + grpc.ClientStream +} + +func (x *routerAdvertiseClient) Recv() (*Advert, error) { + m := new(Advert) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *routerClient) Process(ctx context.Context, in *Advert, opts ...grpc.CallOption) (*ProcessResponse, error) { + out := new(ProcessResponse) + err := c.cc.Invoke(ctx, "/go.micro.router.Router/Process", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *routerClient) Status(ctx context.Context, in *Request, opts ...grpc.CallOption) (*StatusResponse, error) { + out := new(StatusResponse) + err := c.cc.Invoke(ctx, "/go.micro.router.Router/Status", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RouterServer is the server API for Router service. +type RouterServer interface { + Lookup(context.Context, *LookupRequest) (*LookupResponse, error) + Watch(*WatchRequest, Router_WatchServer) error + Advertise(*AdvertiseRequest, Router_AdvertiseServer) error + Process(context.Context, *Advert) (*ProcessResponse, error) + Status(context.Context, *Request) (*StatusResponse, error) +} + +func RegisterRouterServer(s *grpc.Server, srv RouterServer) { + s.RegisterService(&_Router_serviceDesc, srv) +} + +func _Router_Lookup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LookupRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RouterServer).Lookup(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.router.Router/Lookup", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RouterServer).Lookup(ctx, req.(*LookupRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Router_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(WatchRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(RouterServer).Watch(m, &routerWatchServer{stream}) +} + +type Router_WatchServer interface { + Send(*Event) error + grpc.ServerStream +} + +type routerWatchServer struct { + grpc.ServerStream +} + +func (x *routerWatchServer) Send(m *Event) error { + return x.ServerStream.SendMsg(m) +} + +func _Router_Advertise_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(AdvertiseRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(RouterServer).Advertise(m, &routerAdvertiseServer{stream}) +} + +type Router_AdvertiseServer interface { + Send(*Advert) error + grpc.ServerStream +} + +type routerAdvertiseServer struct { + grpc.ServerStream +} + +func (x *routerAdvertiseServer) Send(m *Advert) error { + return x.ServerStream.SendMsg(m) +} + +func _Router_Process_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Advert) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RouterServer).Process(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.router.Router/Process", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RouterServer).Process(ctx, req.(*Advert)) + } + return interceptor(ctx, in, info, handler) +} + +func _Router_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RouterServer).Status(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.router.Router/Status", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RouterServer).Status(ctx, req.(*Request)) + } + return interceptor(ctx, in, info, handler) +} + +var _Router_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.router.Router", + HandlerType: (*RouterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Lookup", + Handler: _Router_Lookup_Handler, + }, + { + MethodName: "Process", + Handler: _Router_Process_Handler, + }, + { + MethodName: "Status", + Handler: _Router_Status_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Watch", + Handler: _Router_Watch_Handler, + ServerStreams: true, + }, + { + StreamName: "Advertise", + Handler: _Router_Advertise_Handler, + ServerStreams: true, + }, + }, + Metadata: "go-micro/network/router/proto/router.proto", +} + +// TableClient is the client API for Table service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type TableClient interface { + Create(ctx context.Context, in *Route, opts ...grpc.CallOption) (*CreateResponse, error) + Delete(ctx context.Context, in *Route, opts ...grpc.CallOption) (*DeleteResponse, error) + Update(ctx context.Context, in *Route, opts ...grpc.CallOption) (*UpdateResponse, error) + Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) + List(ctx context.Context, in *Request, opts ...grpc.CallOption) (*ListResponse, error) +} + +type tableClient struct { + cc *grpc.ClientConn +} + +func NewTableClient(cc *grpc.ClientConn) TableClient { + return &tableClient{cc} +} + +func (c *tableClient) Create(ctx context.Context, in *Route, opts ...grpc.CallOption) (*CreateResponse, error) { + out := new(CreateResponse) + err := c.cc.Invoke(ctx, "/go.micro.router.Table/Create", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tableClient) Delete(ctx context.Context, in *Route, opts ...grpc.CallOption) (*DeleteResponse, error) { + out := new(DeleteResponse) + err := c.cc.Invoke(ctx, "/go.micro.router.Table/Delete", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tableClient) Update(ctx context.Context, in *Route, opts ...grpc.CallOption) (*UpdateResponse, error) { + out := new(UpdateResponse) + err := c.cc.Invoke(ctx, "/go.micro.router.Table/Update", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tableClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) { + out := new(QueryResponse) + err := c.cc.Invoke(ctx, "/go.micro.router.Table/Query", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tableClient) List(ctx context.Context, in *Request, opts ...grpc.CallOption) (*ListResponse, error) { + out := new(ListResponse) + err := c.cc.Invoke(ctx, "/go.micro.router.Table/List", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TableServer is the server API for Table service. +type TableServer interface { + Create(context.Context, *Route) (*CreateResponse, error) + Delete(context.Context, *Route) (*DeleteResponse, error) + Update(context.Context, *Route) (*UpdateResponse, error) + Query(context.Context, *QueryRequest) (*QueryResponse, error) + List(context.Context, *Request) (*ListResponse, error) +} + +func RegisterTableServer(s *grpc.Server, srv TableServer) { + s.RegisterService(&_Table_serviceDesc, srv) +} + +func _Table_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Route) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TableServer).Create(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.router.Table/Create", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TableServer).Create(ctx, req.(*Route)) + } + return interceptor(ctx, in, info, handler) +} + +func _Table_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Route) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TableServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.router.Table/Delete", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TableServer).Delete(ctx, req.(*Route)) + } + return interceptor(ctx, in, info, handler) +} + +func _Table_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Route) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TableServer).Update(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.router.Table/Update", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TableServer).Update(ctx, req.(*Route)) + } + return interceptor(ctx, in, info, handler) +} + +func _Table_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(QueryRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TableServer).Query(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.router.Table/Query", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TableServer).Query(ctx, req.(*QueryRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Table_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TableServer).List(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.router.Table/List", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TableServer).List(ctx, req.(*Request)) + } + return interceptor(ctx, in, info, handler) +} + +var _Table_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.router.Table", + HandlerType: (*TableServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Create", + Handler: _Table_Create_Handler, + }, + { + MethodName: "Delete", + Handler: _Table_Delete_Handler, + }, + { + MethodName: "Update", + Handler: _Table_Update_Handler, + }, + { + MethodName: "Query", + Handler: _Table_Query_Handler, + }, + { + MethodName: "List", + Handler: _Table_List_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "go-micro/network/router/proto/router.proto", } diff --git a/network/router/proto/router.proto b/network/router/proto/router.proto index 921e3e7e..15ef2cd2 100644 --- a/network/router/proto/router.proto +++ b/network/router/proto/router.proto @@ -4,18 +4,23 @@ package go.micro.router; // Router service is used by the proxy to lookup routes service Router { - rpc List(ListRequest) returns (ListResponse) {}; rpc Lookup(LookupRequest) returns (LookupResponse) {}; rpc Watch(WatchRequest) returns (stream Event) {}; - rpc Advertise(AdvertiseRequest) returns (stream Advert) {}; + rpc Advertise(Request) returns (stream Advert) {}; rpc Process(Advert) returns (ProcessResponse) {}; - rpc Create(Route) returns (CreateResponse) {}; - rpc Delete(Route) returns (DeleteResponse) {}; - rpc Update(Route) returns (UpdateResponse) {}; + rpc Status(Request) returns (StatusResponse) {}; } -// ListRequest is made to List routes -message ListRequest {} +service Table { + rpc Create(Route) returns (CreateResponse) {}; + rpc Delete(Route) returns (DeleteResponse) {}; + rpc Update(Route) returns (UpdateResponse) {}; + rpc List(Request) returns (ListResponse) {}; + rpc Query(QueryRequest) returns (QueryResponse) {}; +} + +// Empty request +message Request {} // ListResponse is returned by List message ListResponse { @@ -32,12 +37,17 @@ message LookupResponse { repeated Route routes = 1; } +message QueryRequest{ + Query query = 1; +} + +message QueryResponse { + repeated Route routes = 1; +} + // WatchRequest is made to Watch Router message WatchRequest {} -// AdvertiseRequest request a stream of Adverts -message AdvertiseRequest {} - // AdvertType defines the type of advert enum AdvertType { AdvertAnnounce = 0; @@ -112,3 +122,12 @@ message Route { // the metric / score of this route int64 metric = 6; } + +message Status { + string code = 1; + string error = 2; +} + +message StatusResponse { + Status status = 1; +} diff --git a/network/router/router.go b/network/router/router.go index 30ea857a..af6f8bc3 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -11,6 +11,8 @@ type Router interface { Init(...Option) error // Options returns the router options Options() Options + // The routing table + Table() Table // Advertise advertises routes to the network Advertise() (<-chan *Advert, error) // Process processes incoming adverts @@ -27,6 +29,19 @@ type Router interface { String() string } +type Table interface { + // Create new route in the routing table + Create(Route) error + // Delete deletes existing route from the routing table + Delete(Route) error + // Update updates route in the routing table + Update(Route) error + // List returns the list of all routes in the table + List() ([]Route, error) + // Query queries routes in the routing table + Query(Query) ([]Route, error) +} + // Option used by the router type Option func(*Options) diff --git a/network/router/service/service.go b/network/router/service/service.go index 09806cd3..21d025d1 100644 --- a/network/router/service/service.go +++ b/network/router/service/service.go @@ -2,12 +2,12 @@ package service import ( "context" + "errors" "fmt" "io" "sync" "time" - "github.com/google/uuid" "github.com/micro/go-micro/client" "github.com/micro/go-micro/network/router" pb "github.com/micro/go-micro/network/router/proto" @@ -16,13 +16,13 @@ import ( type svc struct { sync.RWMutex opts router.Options + callOpts []client.CallOption router pb.RouterService - status router.Status - watchers map[string]*svcWatcher - exit chan struct{} + table *table + status *router.Status + exit chan bool errChan chan error advertChan chan *router.Advert - wg *sync.WaitGroup } // NewRouter creates new service router and returns it @@ -36,18 +36,27 @@ func NewRouter(opts ...router.Option) router.Router { } // NOTE: might need some client opts here - client := client.DefaultClient + cli := client.DefaultClient + + // set options client + if options.Client != nil { + cli = options.Client + } // NOTE: should we have Client/Service option in router.Options? s := &svc{ - opts: options, - router: pb.NewRouterService(router.DefaultName, client), - status: router.Status{Code: router.Stopped, Error: nil}, - watchers: make(map[string]*svcWatcher), - wg: &sync.WaitGroup{}, + opts: options, + router: pb.NewRouterService(router.DefaultName, cli), } - go s.run() + // set the router address to call + if len(options.Address) > 0 { + s.callOpts = []client.CallOption{ + client.WithAddress(options.Address), + } + } + // set the table + s.table = &table{pb.NewTableService(router.DefaultName, cli), s.callOpts} return s } @@ -65,128 +74,12 @@ func (s *svc) Options() router.Options { return s.opts } -// watchRouter watches router and send events to all registered watchers -func (s *svc) watchRouter(stream pb.Router_WatchService) error { - s.wg.Add(1) +func (s *svc) Table() router.Table { + return s.table +} + +func (s *svc) advertiseEvents(advertChan chan *router.Advert, stream pb.Router_AdvertiseService) error { go func() { - defer s.wg.Done() - <-s.exit - stream.Close() - }() - - var watchErr error - - for { - resp, err := stream.Recv() - if err != nil { - if err != io.EOF { - watchErr = err - } - break - } - - route := router.Route{ - Service: resp.Route.Service, - Address: resp.Route.Address, - Gateway: resp.Route.Gateway, - Network: resp.Route.Network, - Link: resp.Route.Link, - Metric: int(resp.Route.Metric), - } - - event := &router.Event{ - Type: router.EventType(resp.Type), - Timestamp: time.Unix(0, resp.Timestamp), - Route: route, - } - - // TODO: might make this non-blocking - s.RLock() - for _, w := range s.watchers { - select { - case w.resChan <- event: - case <-w.done: - } - } - s.RUnlock() - } - - return watchErr -} - -// watchErrors watches router errors and takes appropriate actions -func (s *svc) watchErrors() { - var err error - - select { - case <-s.exit: - case err = <-s.errChan: - } - - s.Lock() - defer s.Unlock() - if s.status.Code != router.Stopped { - // notify all goroutines to finish - close(s.exit) - if s.status.Code == router.Advertising { - // drain the advertise channel - for range s.advertChan { - } - } - s.status = router.Status{Code: router.Stopped, Error: nil} - } - - if err != nil { - s.status = router.Status{Code: router.Error, Error: err} - } -} - -// Run runs the router. -func (s *svc) run() { - s.Lock() - defer s.Unlock() - - switch s.status.Code { - case router.Stopped, router.Error: - stream, err := s.router.Watch(context.Background(), &pb.WatchRequest{}) - if err != nil { - s.status = router.Status{Code: router.Error, Error: fmt.Errorf("failed getting event stream: %s", err)} - return - } - - // create error and exit channels - s.errChan = make(chan error, 1) - s.exit = make(chan struct{}) - - s.wg.Add(1) - go func() { - defer s.wg.Done() - select { - case s.errChan <- s.watchRouter(stream): - case <-s.exit: - } - }() - - // watch for errors and cleanup - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.watchErrors() - }() - - // mark router as Running and set its Error to nil - s.status = router.Status{Code: router.Running, Error: nil} - - return - } - - return -} - -func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error { - s.wg.Add(1) - go func() { - defer s.wg.Done() <-s.exit stream.Close() }() @@ -229,15 +122,15 @@ func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error { } select { - case s.advertChan <- advert: + case advertChan <- advert: case <-s.exit: - close(s.advertChan) + close(advertChan) return nil } } // close the channel on exit - close(s.advertChan) + close(advertChan) return advErr } @@ -247,33 +140,29 @@ func (s *svc) Advertise() (<-chan *router.Advert, error) { s.Lock() defer s.Unlock() - switch s.status.Code { - case router.Advertising: - return s.advertChan, nil - case router.Running: - stream, err := s.router.Advertise(context.Background(), &pb.AdvertiseRequest{}) + // get the status + status := s.Status() + + switch status.Code { + case router.Running, router.Advertising: + stream, err := s.router.Advertise(context.Background(), &pb.AdvertiseRequest{}, s.callOpts...) if err != nil { return nil, fmt.Errorf("failed getting advert stream: %s", err) } - // create advertise and event channels - s.advertChan = make(chan *router.Advert) - - s.wg.Add(1) - go func() { - defer s.wg.Done() - select { - case s.errChan <- s.advertiseEvents(stream): - case <-s.exit: - } - }() - - // mark router as Running and set its Error to nil - s.status = router.Status{Code: router.Advertising, Error: nil} - - return s.advertChan, nil + advertChan := make(chan *router.Advert) + go s.advertiseEvents(advertChan, stream) + return advertChan, nil case router.Stopped: - return nil, fmt.Errorf("not running") + // check if our router is stopped + select { + case <-s.exit: + s.exit = make(chan bool) + // call advertise again + return s.Advertise() + default: + return nil, fmt.Errorf("not running") + } } return nil, fmt.Errorf("error: %s", s.status.Error) @@ -306,87 +195,75 @@ func (s *svc) Process(advert *router.Advert) error { Events: events, } - if _, err := s.router.Process(context.Background(), advertReq); err != nil { + if _, err := s.router.Process(context.Background(), advertReq, s.callOpts...); err != nil { return err } return nil } -// Create new route in the routing table -func (s *svc) Create(r router.Route) error { - route := &pb.Route{ - Service: r.Service, - Address: r.Address, - Gateway: r.Gateway, - Network: r.Network, - Link: r.Link, - Metric: int64(r.Metric), +// Status returns router status +func (s *svc) Status() router.Status { + s.Lock() + defer s.Unlock() + + // check if its stopped + select { + case <-s.exit: + return router.Status{ + Code: router.Stopped, + Error: nil, + } + default: + // don't block } - if _, err := s.router.Create(context.Background(), route); err != nil { - return err - } - - return nil -} - -// Delete deletes existing route from the routing table -func (s *svc) Delete(r router.Route) error { - route := &pb.Route{ - Service: r.Service, - Address: r.Address, - Gateway: r.Gateway, - Network: r.Network, - Link: r.Link, - Metric: int64(r.Metric), - } - - if _, err := s.router.Delete(context.Background(), route); err != nil { - return err - } - - return nil -} - -// Update updates route in the routing table -func (s *svc) Update(r router.Route) error { - route := &pb.Route{ - Service: r.Service, - Address: r.Address, - Gateway: r.Gateway, - Network: r.Network, - Link: r.Link, - Metric: int64(r.Metric), - } - - if _, err := s.router.Update(context.Background(), route); err != nil { - return err - } - - return nil -} - -// List returns the list of all routes in the table -func (s *svc) List() ([]router.Route, error) { - resp, err := s.router.List(context.Background(), &pb.ListRequest{}) + // check the remote router + rsp, err := s.router.Status(context.Background(), &pb.Request{}, s.callOpts...) if err != nil { - return nil, err - } - - routes := make([]router.Route, len(resp.Routes)) - for i, route := range resp.Routes { - routes[i] = router.Route{ - Service: route.Service, - Address: route.Address, - Gateway: route.Gateway, - Network: route.Network, - Link: route.Link, - Metric: int(route.Metric), + return router.Status{ + Code: router.Error, + Error: err, } } - return routes, nil + code := router.Running + var serr error + + switch rsp.Status.Code { + case "running": + code = router.Running + case "advertising": + code = router.Advertising + case "stopped": + code = router.Stopped + case "error": + code = router.Error + } + + if len(rsp.Status.Error) > 0 { + serr = errors.New(rsp.Status.Error) + } + + return router.Status{ + Code: code, + Error: serr, + } +} + +// Remote router cannot be stopped +func (s *svc) Stop() error { + s.Lock() + defer s.Unlock() + + select { + case <-s.exit: + return nil + default: + close(s.exit) + } + + return nil } // Lookup looks up routes in the routing table and returns them @@ -398,7 +275,7 @@ func (s *svc) Lookup(q router.Query) ([]router.Route, error) { Gateway: q.Options().Gateway, Network: q.Options().Network, }, - }) + }, s.callOpts...) // errored out if err != nil { @@ -422,69 +299,15 @@ func (s *svc) Lookup(q router.Query) ([]router.Route, error) { // Watch returns a watcher which allows to track updates to the routing table func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) { - wopts := router.WatchOptions{ - Service: "*", + rsp, err := s.router.Watch(context.Background(), &pb.WatchRequest{}, s.callOpts...) + if err != nil { + return nil, err } - + var options router.WatchOptions for _, o := range opts { - o(&wopts) + o(&options) } - - w := &svcWatcher{ - opts: wopts, - resChan: make(chan *router.Event, 10), - done: make(chan struct{}), - } - - s.Lock() - s.watchers[uuid.New().String()] = w - s.Unlock() - - // when the router stops, stop the watcher and exit - s.wg.Add(1) - go func() { - defer s.wg.Done() - <-s.exit - w.Stop() - }() - - return w, nil -} - -// Status returns router status -func (s *svc) Status() router.Status { - s.RLock() - defer s.RUnlock() - - // make a copy of the status - status := s.status - - return status -} - -// Stop stops the router -func (s *svc) Stop() error { - s.Lock() - // only close the channel if the router is running and/or advertising - if s.status.Code == router.Running || s.status.Code == router.Advertising { - // notify all goroutines to finish - close(s.exit) - - // drain the advertise channel only if advertising - if s.status.Code == router.Advertising { - for range s.advertChan { - } - } - - // mark the router as Stopped and set its Error to nil - s.status = router.Status{Code: router.Stopped, Error: nil} - } - s.Unlock() - - // wait for all goroutines to finish - s.wg.Wait() - - return nil + return newWatcher(rsp, options) } // Returns the router implementation diff --git a/network/router/service/table.go b/network/router/service/table.go new file mode 100644 index 00000000..2843a8f6 --- /dev/null +++ b/network/router/service/table.go @@ -0,0 +1,121 @@ +package service + +import ( + "context" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/network/router" + pb "github.com/micro/go-micro/network/router/proto" +) + +type table struct { + table pb.TableService + callOpts []client.CallOption +} + +// Create new route in the routing table +func (t *table) Create(r router.Route) error { + route := &pb.Route{ + Service: r.Service, + Address: r.Address, + Gateway: r.Gateway, + Network: r.Network, + Link: r.Link, + Metric: int64(r.Metric), + } + + if _, err := t.table.Create(context.Background(), route, t.callOpts...); err != nil { + return err + } + + return nil +} + +// Delete deletes existing route from the routing table +func (t *table) Delete(r router.Route) error { + route := &pb.Route{ + Service: r.Service, + Address: r.Address, + Gateway: r.Gateway, + Network: r.Network, + Link: r.Link, + Metric: int64(r.Metric), + } + + if _, err := t.table.Delete(context.Background(), route, t.callOpts...); err != nil { + return err + } + + return nil +} + +// Update updates route in the routing table +func (t *table) Update(r router.Route) error { + route := &pb.Route{ + Service: r.Service, + Address: r.Address, + Gateway: r.Gateway, + Network: r.Network, + Link: r.Link, + Metric: int64(r.Metric), + } + + if _, err := t.table.Update(context.Background(), route, t.callOpts...); err != nil { + return err + } + + return nil +} + +// List returns the list of all routes in the table +func (t *table) List() ([]router.Route, error) { + resp, err := t.table.List(context.Background(), &pb.Request{}, t.callOpts...) + if err != nil { + return nil, err + } + + routes := make([]router.Route, len(resp.Routes)) + for i, route := range resp.Routes { + routes[i] = router.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Link: route.Link, + Metric: int(route.Metric), + } + } + + return routes, nil +} + +// Lookup looks up routes in the routing table and returns them +func (t *table) Query(q router.Query) ([]router.Route, error) { + // call the router + resp, err := t.table.Query(context.Background(), &pb.QueryRequest{ + Query: &pb.Query{ + Service: q.Options().Service, + Gateway: q.Options().Gateway, + Network: q.Options().Network, + }, + }, t.callOpts...) + + // errored out + if err != nil { + return nil, err + } + + routes := make([]router.Route, len(resp.Routes)) + for i, route := range resp.Routes { + routes[i] = router.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Link: route.Link, + Metric: int(route.Metric), + } + } + + return routes, nil +} diff --git a/network/router/service/watcher.go b/network/router/service/watcher.go index 9ed37dab..53bb7648 100644 --- a/network/router/service/watcher.go +++ b/network/router/service/watcher.go @@ -1,20 +1,88 @@ package service import ( + "io" "sync" + "time" "github.com/micro/go-micro/network/router" + pb "github.com/micro/go-micro/network/router/proto" ) -type svcWatcher struct { +type watcher struct { sync.RWMutex opts router.WatchOptions resChan chan *router.Event done chan struct{} } +func newWatcher(rsp pb.Router_WatchService, opts router.WatchOptions) (*watcher, error) { + w := &watcher{ + opts: opts, + resChan: make(chan *router.Event), + done: make(chan struct{}), + } + + go func() { + for { + select { + case <-w.done: + return + default: + if err := w.watch(rsp); err != nil { + w.Stop() + return + } + } + } + }() + + return w, nil +} + +// watchRouter watches router and send events to all registered watchers +func (w *watcher) watch(stream pb.Router_WatchService) error { + defer stream.Close() + + var watchErr error + + for { + resp, err := stream.Recv() + if err != nil { + if err != io.EOF { + watchErr = err + } + break + } + + route := router.Route{ + Service: resp.Route.Service, + Address: resp.Route.Address, + Gateway: resp.Route.Gateway, + Network: resp.Route.Network, + Link: resp.Route.Link, + Metric: int(resp.Route.Metric), + } + + event := &router.Event{ + Type: router.EventType(resp.Type), + Timestamp: time.Unix(0, resp.Timestamp), + Route: route, + } + + for { + select { + case w.resChan <- event: + case <-w.done: + } + } + } + + return watchErr +} + // Next is a blocking call that returns watch result -func (w *svcWatcher) Next() (*router.Event, error) { +func (w *watcher) Next() (*router.Event, error) { for { select { case res := <-w.resChan: @@ -31,12 +99,12 @@ func (w *svcWatcher) Next() (*router.Event, error) { } // Chan returns event channel -func (w *svcWatcher) Chan() (<-chan *router.Event, error) { +func (w *watcher) Chan() (<-chan *router.Event, error) { return w.resChan, nil } // Stop stops watcher -func (w *svcWatcher) Stop() { +func (w *watcher) Stop() { w.Lock() defer w.Unlock() diff --git a/network/router/table.go b/network/router/table.go index 931dcdca..905944b7 100644 --- a/network/router/table.go +++ b/network/router/table.go @@ -128,7 +128,7 @@ func findRoutes(routes map[uint64]Route, network, router string) []Route { } // Lookup queries routing table and returns all routes that match the lookup query -func (t *table) Lookup(q Query) ([]Route, error) { +func (t *table) Query(q Query) ([]Route, error) { t.RLock() defer t.RUnlock() diff --git a/network/router/table_test.go b/network/router/table_test.go index dc35a5f1..16c73f2f 100644 --- a/network/router/table_test.go +++ b/network/router/table_test.go @@ -103,7 +103,7 @@ func TestList(t *testing.T) { } } -func TestLookup(t *testing.T) { +func TestQuery(t *testing.T) { table, route := testSetup() svc := []string{"svc1", "svc2", "svc3"} @@ -122,7 +122,7 @@ func TestLookup(t *testing.T) { // return all routes query := NewQuery() - routes, err := table.Lookup(query) + routes, err := table.Query(query) if err != nil { t.Errorf("error looking up routes: %s", err) } @@ -130,7 +130,7 @@ func TestLookup(t *testing.T) { // query particular net query = NewQuery(QueryNetwork("net1")) - routes, err = table.Lookup(query) + routes, err = table.Query(query) if err != nil { t.Errorf("error looking up routes: %s", err) } @@ -143,7 +143,7 @@ func TestLookup(t *testing.T) { gateway := "gw1" query = NewQuery(QueryGateway(gateway)) - routes, err = table.Lookup(query) + routes, err = table.Query(query) if err != nil { t.Errorf("error looking up routes: %s", err) } @@ -163,7 +163,7 @@ func TestLookup(t *testing.T) { QueryNetwork(network), ) - routes, err = table.Lookup(query) + routes, err = table.Query(query) if err != nil { t.Errorf("error looking up routes: %s", err) } @@ -183,7 +183,7 @@ func TestLookup(t *testing.T) { // bullshit route query query = NewQuery(QueryService("foobar")) - routes, err = table.Lookup(query) + routes, err = table.Query(query) if err != ErrRouteNotFound { t.Errorf("error looking up routes. Expected: %s, found: %s", ErrRouteNotFound, err) }