Merge pull request #576 from milosgajdos83/router-rpc

Added List and Watch rpc calls.
This commit is contained in:
Asim Aslam 2019-07-10 17:55:23 +01:00 committed by GitHub
commit 8b7ac8a3f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 380 additions and 63 deletions

View File

@ -95,7 +95,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
} }
switch action { switch action {
case "insert", "create": case "insert", "create":
if err := r.opts.Table.Add(route); err != nil && err != table.ErrDuplicateRoute { if err := r.opts.Table.Create(route); err != nil && err != table.ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", service.Name, err) return fmt.Errorf("failed adding route for service %s: %s", service.Name, err)
} }
case "delete": case "delete":
@ -227,7 +227,7 @@ func isFlapping(curr, prev *table.Event) bool {
return true return true
} }
if curr.Type == table.Insert && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Insert { if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create {
return true return true
} }
@ -389,7 +389,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
events := make([]*table.Event, len(routes)) events := make([]*table.Event, len(routes))
for i, route := range routes { for i, route := range routes {
event := &table.Event{ event := &table.Event{
Type: table.Insert, Type: table.Create,
Timestamp: time.Now(), Timestamp: time.Now(),
Route: route, Route: route,
} }
@ -406,7 +406,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
Network: "*", Network: "*",
Metric: table.DefaultLocalMetric, Metric: table.DefaultLocalMetric,
} }
if err := r.opts.Table.Add(route); err != nil { if err := r.opts.Table.Create(route); err != nil {
return nil, fmt.Errorf("failed adding default gateway route: %s", err) return nil, fmt.Errorf("failed adding default gateway route: %s", err)
} }
} }

View File

@ -34,7 +34,9 @@ var _ server.Option
// Client API for Router service // Client API for Router service
type RouterService interface { type RouterService interface {
Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error)
Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error)
List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error)
} }
type routerService struct { type routerService struct {
@ -55,6 +57,50 @@ func NewRouterService(name string, c client.Client) RouterService {
} }
} }
func (c *routerService) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) {
req := c.c.NewRequest(c.name, "Router.Watch", &WatchRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
if err := stream.Send(in); err != nil {
return nil, err
}
return &routerServiceWatch{stream}, nil
}
type Router_WatchService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*TableEvent, error)
}
type routerServiceWatch struct {
stream client.Stream
}
func (x *routerServiceWatch) Close() error {
return x.stream.Close()
}
func (x *routerServiceWatch) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *routerServiceWatch) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *routerServiceWatch) Recv() (*TableEvent, error) {
m := new(TableEvent)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
func (c *routerService) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) { func (c *routerService) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) {
req := c.c.NewRequest(c.name, "Router.Lookup", in) req := c.c.NewRequest(c.name, "Router.Lookup", in)
out := new(LookupResponse) out := new(LookupResponse)
@ -65,15 +111,29 @@ func (c *routerService) Lookup(ctx context.Context, in *LookupRequest, opts ...c
return out, nil return out, nil
} }
func (c *routerService) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) {
req := c.c.NewRequest(c.name, "Router.List", in)
out := new(ListResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Router service // Server API for Router service
type RouterHandler interface { type RouterHandler interface {
Watch(context.Context, *WatchRequest, Router_WatchStream) error
Lookup(context.Context, *LookupRequest, *LookupResponse) error Lookup(context.Context, *LookupRequest, *LookupResponse) error
List(context.Context, *ListRequest, *ListResponse) error
} }
func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error { func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error {
type router interface { type router interface {
Watch(ctx context.Context, stream server.Stream) error
Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error
List(ctx context.Context, in *ListRequest, out *ListResponse) error
} }
type Router struct { type Router struct {
router router
@ -86,6 +146,45 @@ type routerHandler struct {
RouterHandler RouterHandler
} }
func (h *routerHandler) Watch(ctx context.Context, stream server.Stream) error {
m := new(WatchRequest)
if err := stream.Recv(m); err != nil {
return err
}
return h.RouterHandler.Watch(ctx, m, &routerWatchStream{stream})
}
type Router_WatchStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*TableEvent) error
}
type routerWatchStream struct {
stream server.Stream
}
func (x *routerWatchStream) Close() error {
return x.stream.Close()
}
func (x *routerWatchStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *routerWatchStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *routerWatchStream) Send(m *TableEvent) error {
return x.stream.Send(m)
}
func (h *routerHandler) Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error { func (h *routerHandler) Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error {
return h.RouterHandler.Lookup(ctx, in, out) return h.RouterHandler.Lookup(ctx, in, out)
} }
func (h *routerHandler) List(ctx context.Context, in *ListRequest, out *ListResponse) error {
return h.RouterHandler.List(ctx, in, out)
}

View File

@ -20,6 +20,35 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// EventType is TableEvent type
type EventType int32
const (
EventType_Insert EventType = 0
EventType_Delete EventType = 1
EventType_Update EventType = 2
)
var EventType_name = map[int32]string{
0: "Insert",
1: "Delete",
2: "Update",
}
var EventType_value = map[string]int32{
"Insert": 0,
"Delete": 1,
"Update": 2,
}
func (x EventType) String() string {
return proto.EnumName(EventType_name, int32(x))
}
func (EventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{0}
}
// LookupRequest is made to Lookup // LookupRequest is made to Lookup
type LookupRequest struct { type LookupRequest struct {
Query *Query `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` Query *Query `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
@ -60,7 +89,7 @@ func (m *LookupRequest) GetQuery() *Query {
return nil return nil
} }
// LookupResponse is returns by Lookup // LookupResponse is returned by Lookup
type LookupResponse struct { type LookupResponse struct {
Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"` Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -100,6 +129,169 @@ func (m *LookupResponse) GetRoutes() []*Route {
return nil return nil
} }
// WatchRequest is made to Watch Router
type WatchRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{2}
}
func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WatchRequest.Unmarshal(m, b)
}
func (m *WatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_WatchRequest.Marshal(b, m, deterministic)
}
func (m *WatchRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_WatchRequest.Merge(m, src)
}
func (m *WatchRequest) XXX_Size() int {
return xxx_messageInfo_WatchRequest.Size(m)
}
func (m *WatchRequest) XXX_DiscardUnknown() {
xxx_messageInfo_WatchRequest.DiscardUnknown(m)
}
var xxx_messageInfo_WatchRequest proto.InternalMessageInfo
// TableEvent is streamed by WatchRequest
type TableEvent struct {
// time of table event
Type EventType `protobuf:"varint,1,opt,name=type,proto3,enum=EventType" json:"type,omitempty"`
// unix timestamp of event
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// service route
Route *Route `protobuf:"bytes,3,opt,name=route,proto3" json:"route,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TableEvent) Reset() { *m = TableEvent{} }
func (m *TableEvent) String() string { return proto.CompactTextString(m) }
func (*TableEvent) ProtoMessage() {}
func (*TableEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{3}
}
func (m *TableEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TableEvent.Unmarshal(m, b)
}
func (m *TableEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_TableEvent.Marshal(b, m, deterministic)
}
func (m *TableEvent) XXX_Merge(src proto.Message) {
xxx_messageInfo_TableEvent.Merge(m, src)
}
func (m *TableEvent) XXX_Size() int {
return xxx_messageInfo_TableEvent.Size(m)
}
func (m *TableEvent) XXX_DiscardUnknown() {
xxx_messageInfo_TableEvent.DiscardUnknown(m)
}
var xxx_messageInfo_TableEvent proto.InternalMessageInfo
func (m *TableEvent) GetType() EventType {
if m != nil {
return m.Type
}
return EventType_Insert
}
func (m *TableEvent) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *TableEvent) GetRoute() *Route {
if m != nil {
return m.Route
}
return nil
}
// ListRequest is made to List routes
type ListRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListRequest) Reset() { *m = ListRequest{} }
func (m *ListRequest) String() string { return proto.CompactTextString(m) }
func (*ListRequest) ProtoMessage() {}
func (*ListRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{4}
}
func (m *ListRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListRequest.Unmarshal(m, b)
}
func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic)
}
func (m *ListRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListRequest.Merge(m, src)
}
func (m *ListRequest) XXX_Size() int {
return xxx_messageInfo_ListRequest.Size(m)
}
func (m *ListRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ListRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ListRequest proto.InternalMessageInfo
// ListResponse is returned by List
type ListResponse struct {
Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{5}
}
func (m *ListResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListResponse.Unmarshal(m, b)
}
func (m *ListResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListResponse.Marshal(b, m, deterministic)
}
func (m *ListResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListResponse.Merge(m, src)
}
func (m *ListResponse) XXX_Size() int {
return xxx_messageInfo_ListResponse.Size(m)
}
func (m *ListResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ListResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ListResponse proto.InternalMessageInfo
func (m *ListResponse) GetRoutes() []*Route {
if m != nil {
return m.Routes
}
return nil
}
// Query is passed in a LookupRequest // Query is passed in a LookupRequest
type Query struct { type Query struct {
// service to lookup // service to lookup
@ -113,7 +305,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) } func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {} func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{2} return fileDescriptor_367072455c71aedc, []int{6}
} }
func (m *Query) XXX_Unmarshal(b []byte) error { func (m *Query) XXX_Unmarshal(b []byte) error {
@ -164,7 +356,7 @@ func (m *Route) Reset() { *m = Route{} }
func (m *Route) String() string { return proto.CompactTextString(m) } func (m *Route) String() string { return proto.CompactTextString(m) }
func (*Route) ProtoMessage() {} func (*Route) ProtoMessage() {}
func (*Route) Descriptor() ([]byte, []int) { func (*Route) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{3} return fileDescriptor_367072455c71aedc, []int{7}
} }
func (m *Route) XXX_Unmarshal(b []byte) error { func (m *Route) XXX_Unmarshal(b []byte) error {
@ -228,8 +420,13 @@ func (m *Route) GetMetric() int64 {
} }
func init() { func init() {
proto.RegisterEnum("EventType", EventType_name, EventType_value)
proto.RegisterType((*LookupRequest)(nil), "LookupRequest") proto.RegisterType((*LookupRequest)(nil), "LookupRequest")
proto.RegisterType((*LookupResponse)(nil), "LookupResponse") proto.RegisterType((*LookupResponse)(nil), "LookupResponse")
proto.RegisterType((*WatchRequest)(nil), "WatchRequest")
proto.RegisterType((*TableEvent)(nil), "TableEvent")
proto.RegisterType((*ListRequest)(nil), "ListRequest")
proto.RegisterType((*ListResponse)(nil), "ListResponse")
proto.RegisterType((*Query)(nil), "Query") proto.RegisterType((*Query)(nil), "Query")
proto.RegisterType((*Route)(nil), "Route") proto.RegisterType((*Route)(nil), "Route")
} }
@ -237,20 +434,30 @@ func init() {
func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) } func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) }
var fileDescriptor_367072455c71aedc = []byte{ var fileDescriptor_367072455c71aedc = []byte{
// 238 bytes of a gzipped FileDescriptorProto // 390 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xc1, 0x4a, 0xc4, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xc1, 0x8e, 0xd3, 0x30,
0x10, 0x86, 0x8d, 0xdd, 0x46, 0x1c, 0x75, 0x85, 0x1c, 0x24, 0x88, 0x48, 0xcd, 0x69, 0x41, 0x2c, 0x10, 0x8d, 0xb7, 0x89, 0x51, 0xa6, 0x49, 0xa8, 0x7c, 0x40, 0x56, 0xb5, 0x5a, 0x15, 0x4b, 0x88,
0xb2, 0xe2, 0x5b, 0x78, 0x31, 0x6f, 0x50, 0x77, 0x07, 0x29, 0xd5, 0xa6, 0x3b, 0x49, 0x5c, 0xf6, 0x15, 0x08, 0xb3, 0x2a, 0xbf, 0x00, 0x07, 0xa4, 0xbd, 0x60, 0x2d, 0xe2, 0x9c, 0x6d, 0x47, 0x6c,
0x59, 0x7c, 0x59, 0xc9, 0x24, 0x7b, 0xe8, 0xc1, 0x5b, 0xbf, 0xf9, 0x66, 0x7e, 0x9a, 0x1f, 0x2e, 0xd4, 0x36, 0xc9, 0xda, 0xce, 0xae, 0xf2, 0x01, 0x7c, 0x05, 0x3f, 0x8b, 0x3c, 0x49, 0x68, 0x7b,
0xc9, 0xc5, 0x80, 0xd4, 0x4e, 0xe4, 0x82, 0x33, 0x4f, 0x70, 0xf5, 0xe6, 0xdc, 0x10, 0x27, 0x8b, 0x40, 0xe2, 0x36, 0xef, 0x8d, 0xed, 0x79, 0x9e, 0xf7, 0x20, 0xb3, 0x4d, 0xe7, 0xd1, 0xea, 0xd6,
0xbb, 0x88, 0x3e, 0xa8, 0x3b, 0xa8, 0x77, 0x11, 0xe9, 0xa0, 0x45, 0x23, 0x56, 0x17, 0x6b, 0xd9, 0x36, 0xbe, 0x51, 0x1f, 0x20, 0xbf, 0x6d, 0x9a, 0x5d, 0xd7, 0x1a, 0x7c, 0xec, 0xd0, 0x79, 0x71,
0xbe, 0x27, 0xb2, 0x79, 0x68, 0x9e, 0x61, 0x79, 0x5c, 0xf7, 0x93, 0x1b, 0x3d, 0xaa, 0x7b, 0x90, 0x09, 0xc9, 0x63, 0x87, 0xb6, 0x97, 0x6c, 0xc5, 0xae, 0xe7, 0x6b, 0xae, 0xbf, 0x05, 0x64, 0x06,
0x1c, 0xe8, 0xb5, 0x68, 0x2a, 0x3e, 0xb0, 0x09, 0x6d, 0x99, 0x9a, 0x07, 0xa8, 0x39, 0x41, 0x69, 0x52, 0xdd, 0x40, 0x31, 0x1d, 0x77, 0x6d, 0x53, 0x3b, 0x14, 0x57, 0xc0, 0xe9, 0x41, 0x27, 0xd9,
0x38, 0xf3, 0x48, 0x3f, 0xfd, 0x06, 0x39, 0xfa, 0xdc, 0x1e, 0xd1, 0xfc, 0x0a, 0xa8, 0xf9, 0xe8, 0x6a, 0x46, 0x17, 0x4c, 0x80, 0x66, 0x64, 0x55, 0x01, 0xd9, 0x8f, 0xd2, 0x6f, 0x1e, 0xc6, 0xf7,
0xff, 0x9d, 0x64, 0xba, 0xed, 0x96, 0xd0, 0x7b, 0x7d, 0x9a, 0x4d, 0xc1, 0x64, 0x3e, 0xbb, 0x80, 0xd5, 0x03, 0xc0, 0x5d, 0x79, 0xbf, 0xc7, 0x2f, 0x4f, 0x58, 0x7b, 0x71, 0x05, 0xb1, 0xef, 0x5b,
0xfb, 0xee, 0xa0, 0xab, 0x6c, 0x0a, 0x26, 0x33, 0x62, 0xd8, 0x3b, 0x1a, 0xf4, 0x22, 0x9b, 0x82, 0xa4, 0x61, 0xc5, 0x1a, 0x34, 0xb1, 0x77, 0x7d, 0x8b, 0x86, 0x78, 0x71, 0x09, 0xa9, 0xaf, 0x0e,
0x4a, 0xc1, 0xe2, 0xab, 0x1f, 0x07, 0x5d, 0xf3, 0x98, 0xbf, 0xd5, 0x0d, 0xc8, 0x6f, 0x0c, 0xd4, 0xe8, 0x7c, 0x79, 0x68, 0xe5, 0xc5, 0x8a, 0x5d, 0xcf, 0xcc, 0x91, 0x08, 0x5a, 0x69, 0x8a, 0x9c,
0x6f, 0xb4, 0x6c, 0xc4, 0xaa, 0xb2, 0x85, 0xd6, 0xaf, 0x20, 0xf9, 0xe7, 0x48, 0x3d, 0x82, 0xcc, 0x8d, 0x5a, 0x87, 0xd1, 0x03, 0xa9, 0x72, 0x98, 0xdf, 0x56, 0xce, 0x4f, 0x83, 0x35, 0x64, 0x03,
0x8f, 0x57, 0xcb, 0x76, 0x56, 0xda, 0xed, 0x75, 0x3b, 0x6f, 0xc5, 0x9c, 0x7c, 0x48, 0xee, 0xf7, 0xfc, 0x4f, 0xe1, 0xaf, 0x21, 0xa1, 0xaf, 0x0b, 0x09, 0x2f, 0x1c, 0xda, 0xa7, 0x6a, 0x33, 0xc8,
0xe5, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x99, 0xfb, 0x2d, 0x6f, 0x01, 0x00, 0x00, 0x4c, 0xcd, 0x04, 0xd5, 0x6f, 0x06, 0x09, 0x5d, 0xfa, 0xf7, 0x99, 0xd0, 0x29, 0xb7, 0x5b, 0x8b,
0xce, 0x91, 0xfe, 0xd4, 0x4c, 0x30, 0x74, 0x7e, 0x96, 0x1e, 0x9f, 0xcb, 0x9e, 0xf4, 0xa7, 0x66,
0x82, 0xa1, 0x53, 0xa3, 0x7f, 0x6e, 0xec, 0x4e, 0xc6, 0x43, 0x67, 0x84, 0x42, 0x40, 0xbc, 0xaf,
0xea, 0x9d, 0x4c, 0x88, 0xa6, 0x5a, 0xbc, 0x02, 0x7e, 0x40, 0x6f, 0xab, 0x8d, 0xe4, 0xb4, 0xa0,
0x11, 0xbd, 0xfb, 0x08, 0xe9, 0xdf, 0x75, 0x0a, 0x00, 0xfe, 0xb5, 0x76, 0x68, 0xfd, 0x22, 0x0a,
0xf5, 0x67, 0xdc, 0xa3, 0xc7, 0x05, 0x0b, 0xf5, 0xf7, 0x76, 0x5b, 0x7a, 0x5c, 0x5c, 0xac, 0x7f,
0x31, 0xe0, 0xf4, 0x1d, 0x2b, 0xde, 0x42, 0x42, 0xae, 0x89, 0x5c, 0x9f, 0xba, 0xb7, 0x9c, 0xeb,
0xa3, 0x79, 0x2a, 0xba, 0x61, 0xe2, 0x3d, 0xf0, 0x21, 0x10, 0xa2, 0xd0, 0x67, 0x41, 0x5a, 0xbe,
0xd4, 0xe7, 0x49, 0x51, 0x91, 0x78, 0x03, 0x71, 0xb0, 0x40, 0x64, 0xfa, 0xc4, 0x98, 0x65, 0xae,
0x4f, 0x7d, 0x51, 0xd1, 0x3d, 0xa7, 0x68, 0x7e, 0xfa, 0x13, 0x00, 0x00, 0xff, 0xff, 0x7e, 0xbb,
0x6d, 0x5c, 0xaa, 0x02, 0x00, 0x00,
} }

View File

@ -2,7 +2,9 @@ syntax = "proto3";
// Router service is used by the proxy to lookup routes // Router service is used by the proxy to lookup routes
service Router { service Router {
rpc Watch(WatchRequest) returns (stream TableEvent) {};
rpc Lookup(LookupRequest) returns (LookupResponse) {}; rpc Lookup(LookupRequest) returns (LookupResponse) {};
rpc List(ListRequest) returns (ListResponse) {};
} }
// LookupRequest is made to Lookup // LookupRequest is made to Lookup
@ -10,11 +12,39 @@ message LookupRequest {
Query query = 1; Query query = 1;
} }
// LookupResponse is returns by Lookup // LookupResponse is returned by Lookup
message LookupResponse { message LookupResponse {
repeated Route routes = 1; repeated Route routes = 1;
} }
// WatchRequest is made to Watch Router
message WatchRequest {}
// EventType is TableEvent type
enum EventType {
Insert = 0;
Delete = 1;
Update = 2;
}
// TableEvent is streamed by WatchRequest
message TableEvent {
// time of table event
EventType type = 1;
// unix timestamp of event
int64 timestamp = 2;
// service route
Route route = 3;
}
// ListRequest is made to List routes
message ListRequest {}
// ListResponse is returned by List
message ListResponse {
repeated Route routes = 1;
}
// Query is passed in a LookupRequest // Query is passed in a LookupRequest
message Query { message Query {
// service to lookup // service to lookup

View File

@ -2,6 +2,7 @@ package table
import ( import (
"sync" "sync"
"time"
"github.com/google/uuid" "github.com/google/uuid"
) )
@ -51,8 +52,8 @@ func (t *table) Options() TableOptions {
return t.opts return t.opts
} }
// Add adds a route to the routing table // Create creates new route in the routing table
func (t *table) Add(r Route) error { func (t *table) Create(r Route) error {
service := r.Service service := r.Service
sum := r.Hash() sum := r.Hash()
@ -63,14 +64,14 @@ func (t *table) Add(r Route) error {
if _, ok := t.m[service]; !ok { if _, ok := t.m[service]; !ok {
t.m[service] = make(map[uint64]Route) t.m[service] = make(map[uint64]Route)
t.m[service][sum] = r t.m[service][sum] = r
go t.sendEvent(&Event{Type: Insert, Route: r}) go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil return nil
} }
// add new route to the table for the route destination // add new route to the table for the route destination
if _, ok := t.m[service][sum]; !ok { if _, ok := t.m[service][sum]; !ok {
t.m[service][sum] = r t.m[service][sum] = r
go t.sendEvent(&Event{Type: Insert, Route: r}) go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil return nil
} }
@ -90,7 +91,7 @@ func (t *table) Delete(r Route) error {
} }
delete(t.m[service], sum) delete(t.m[service], sum)
go t.sendEvent(&Event{Type: Delete, Route: r}) go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r})
return nil return nil
} }
@ -111,7 +112,7 @@ func (t *table) Update(r Route) error {
// if the route has been found update it // if the route has been found update it
if _, ok := t.m[service][sum]; ok { if _, ok := t.m[service][sum]; ok {
t.m[service][sum] = r t.m[service][sum] = r
go t.sendEvent(&Event{Type: Update, Route: r}) go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
return nil return nil
} }

View File

@ -16,11 +16,11 @@ func testSetup() (Table, Route) {
return table, route return table, route
} }
func TestAdd(t *testing.T) { func TestCreate(t *testing.T) {
table, route := testSetup() table, route := testSetup()
testTableSize := table.Size() testTableSize := table.Size()
if err := table.Add(route); err != nil { if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err) t.Errorf("error adding route: %s", err)
} }
testTableSize += 1 testTableSize += 1
@ -28,7 +28,7 @@ func TestAdd(t *testing.T) {
// adds new route for the original destination // adds new route for the original destination
route.Gateway = "dest.gw2" route.Gateway = "dest.gw2"
if err := table.Add(route); err != nil { if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err) t.Errorf("error adding route: %s", err)
} }
testTableSize += 1 testTableSize += 1
@ -38,7 +38,7 @@ func TestAdd(t *testing.T) {
} }
// adding the same route under Insert policy must error // adding the same route under Insert policy must error
if err := table.Add(route); err != ErrDuplicateRoute { if err := table.Create(route); err != ErrDuplicateRoute {
t.Errorf("error adding route. Expected error: %s, found: %s", ErrDuplicateRoute, err) t.Errorf("error adding route. Expected error: %s, found: %s", ErrDuplicateRoute, err)
} }
} }
@ -47,7 +47,7 @@ func TestDelete(t *testing.T) {
table, route := testSetup() table, route := testSetup()
testTableSize := table.Size() testTableSize := table.Size()
if err := table.Add(route); err != nil { if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err) t.Errorf("error adding route: %s", err)
} }
testTableSize += 1 testTableSize += 1
@ -77,7 +77,7 @@ func TestUpdate(t *testing.T) {
table, route := testSetup() table, route := testSetup()
testTableSize := table.Size() testTableSize := table.Size()
if err := table.Add(route); err != nil { if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err) t.Errorf("error adding route: %s", err)
} }
testTableSize += 1 testTableSize += 1
@ -113,7 +113,7 @@ func TestList(t *testing.T) {
for i := 0; i < len(svc); i++ { for i := 0; i < len(svc); i++ {
route.Service = svc[i] route.Service = svc[i]
if err := table.Add(route); err != nil { if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err) t.Errorf("error adding route: %s", err)
} }
} }
@ -143,7 +143,7 @@ func TestLookup(t *testing.T) {
route.Service = svc[i] route.Service = svc[i]
route.Network = net[i] route.Network = net[i]
route.Gateway = gw[i] route.Gateway = gw[i]
if err := table.Add(route); err != nil { if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err) t.Errorf("error adding route: %s", err)
} }
} }

View File

@ -13,8 +13,8 @@ var (
// Table defines routing table interface // Table defines routing table interface
type Table interface { type Table interface {
// Add adds new route to the routing table // Create new route in the routing table
Add(Route) error Create(Route) error
// Delete deletes existing route from the routing table // Delete deletes existing route from the routing table
Delete(Route) error Delete(Route) error
// Update updates route in the routing table // Update updates route in the routing table

View File

@ -2,7 +2,6 @@ package table
import ( import (
"errors" "errors"
"fmt"
"time" "time"
) )
@ -15,28 +14,14 @@ var (
type EventType int type EventType int
const ( const (
// Insert is emitted when a new route has been inserted // Create is emitted when a new route has been created
Insert EventType = iota Create EventType = iota
// Delete is emitted when an existing route has been deleted // Delete is emitted when an existing route has been deleted
Delete Delete
// Update is emitted when an existing route has been updated // Update is emitted when an existing route has been updated
Update Update
) )
// String returns string representation of the event
func (et EventType) String() string {
switch et {
case Insert:
return "INSERT"
case Delete:
return "DELETE"
case Update:
return "UPDATE"
default:
return "UNKNOWN"
}
}
// Event is returned by a call to Next on the watcher. // Event is returned by a call to Next on the watcher.
type Event struct { type Event struct {
// Type defines type of event // Type defines type of event
@ -47,11 +32,6 @@ type Event struct {
Route Route Route Route
} }
// String prints human readable Event
func (e Event) String() string {
return fmt.Sprintf("[EVENT] time: %s type: %s", e.Timestamp, e.Type)
}
// WatchOption is used to define what routes to watch in the table // WatchOption is used to define what routes to watch in the table
type WatchOption func(*WatchOptions) type WatchOption func(*WatchOptions)
@ -88,7 +68,7 @@ type tableWatcher struct {
// Next returns the next noticed action taken on table // Next returns the next noticed action taken on table
// TODO: this needs to be thought through properly; // TODO: this needs to be thought through properly;
// right now we only allow to watch destination // right now we only allow to watch service
func (w *tableWatcher) Next() (*Event, error) { func (w *tableWatcher) Next() (*Event, error) {
for { for {
select { select {