Merge branch 'master' of ssh://github.com/micro/go-micro

This commit is contained in:
Asim Aslam 2019-07-10 18:24:12 +01:00
commit 196e76e350
8 changed files with 380 additions and 63 deletions

View File

@ -95,7 +95,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
}
switch action {
case "insert", "create":
if err := r.opts.Table.Add(route); err != nil && err != table.ErrDuplicateRoute {
if err := r.opts.Table.Create(route); err != nil && err != table.ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", service.Name, err)
}
case "delete":
@ -227,7 +227,7 @@ func isFlapping(curr, prev *table.Event) bool {
return true
}
if curr.Type == table.Insert && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Insert {
if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create {
return true
}
@ -389,7 +389,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
events := make([]*table.Event, len(routes))
for i, route := range routes {
event := &table.Event{
Type: table.Insert,
Type: table.Create,
Timestamp: time.Now(),
Route: route,
}
@ -406,7 +406,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
Network: "*",
Metric: table.DefaultLocalMetric,
}
if err := r.opts.Table.Add(route); err != nil {
if err := r.opts.Table.Create(route); err != nil {
return nil, fmt.Errorf("failed adding default gateway route: %s", err)
}
}

View File

@ -34,7 +34,9 @@ var _ server.Option
// Client API for Router service
type RouterService interface {
Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error)
Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error)
List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error)
}
type routerService struct {
@ -55,6 +57,50 @@ func NewRouterService(name string, c client.Client) RouterService {
}
}
func (c *routerService) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) {
req := c.c.NewRequest(c.name, "Router.Watch", &WatchRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
if err := stream.Send(in); err != nil {
return nil, err
}
return &routerServiceWatch{stream}, nil
}
type Router_WatchService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*TableEvent, error)
}
type routerServiceWatch struct {
stream client.Stream
}
func (x *routerServiceWatch) Close() error {
return x.stream.Close()
}
func (x *routerServiceWatch) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *routerServiceWatch) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *routerServiceWatch) Recv() (*TableEvent, error) {
m := new(TableEvent)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
func (c *routerService) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) {
req := c.c.NewRequest(c.name, "Router.Lookup", in)
out := new(LookupResponse)
@ -65,15 +111,29 @@ func (c *routerService) Lookup(ctx context.Context, in *LookupRequest, opts ...c
return out, nil
}
func (c *routerService) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) {
req := c.c.NewRequest(c.name, "Router.List", in)
out := new(ListResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Router service
type RouterHandler interface {
Watch(context.Context, *WatchRequest, Router_WatchStream) error
Lookup(context.Context, *LookupRequest, *LookupResponse) error
List(context.Context, *ListRequest, *ListResponse) error
}
func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error {
type router interface {
Watch(ctx context.Context, stream server.Stream) error
Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error
List(ctx context.Context, in *ListRequest, out *ListResponse) error
}
type Router struct {
router
@ -86,6 +146,45 @@ type routerHandler struct {
RouterHandler
}
func (h *routerHandler) Watch(ctx context.Context, stream server.Stream) error {
m := new(WatchRequest)
if err := stream.Recv(m); err != nil {
return err
}
return h.RouterHandler.Watch(ctx, m, &routerWatchStream{stream})
}
type Router_WatchStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*TableEvent) error
}
type routerWatchStream struct {
stream server.Stream
}
func (x *routerWatchStream) Close() error {
return x.stream.Close()
}
func (x *routerWatchStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *routerWatchStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *routerWatchStream) Send(m *TableEvent) error {
return x.stream.Send(m)
}
func (h *routerHandler) Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error {
return h.RouterHandler.Lookup(ctx, in, out)
}
func (h *routerHandler) List(ctx context.Context, in *ListRequest, out *ListResponse) error {
return h.RouterHandler.List(ctx, in, out)
}

View File

@ -20,6 +20,35 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// EventType is TableEvent type
type EventType int32
const (
EventType_Insert EventType = 0
EventType_Delete EventType = 1
EventType_Update EventType = 2
)
var EventType_name = map[int32]string{
0: "Insert",
1: "Delete",
2: "Update",
}
var EventType_value = map[string]int32{
"Insert": 0,
"Delete": 1,
"Update": 2,
}
func (x EventType) String() string {
return proto.EnumName(EventType_name, int32(x))
}
func (EventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{0}
}
// LookupRequest is made to Lookup
type LookupRequest struct {
Query *Query `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
@ -60,7 +89,7 @@ func (m *LookupRequest) GetQuery() *Query {
return nil
}
// LookupResponse is returns by Lookup
// LookupResponse is returned by Lookup
type LookupResponse struct {
Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -100,6 +129,169 @@ func (m *LookupResponse) GetRoutes() []*Route {
return nil
}
// WatchRequest is made to Watch Router
type WatchRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{2}
}
func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WatchRequest.Unmarshal(m, b)
}
func (m *WatchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_WatchRequest.Marshal(b, m, deterministic)
}
func (m *WatchRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_WatchRequest.Merge(m, src)
}
func (m *WatchRequest) XXX_Size() int {
return xxx_messageInfo_WatchRequest.Size(m)
}
func (m *WatchRequest) XXX_DiscardUnknown() {
xxx_messageInfo_WatchRequest.DiscardUnknown(m)
}
var xxx_messageInfo_WatchRequest proto.InternalMessageInfo
// TableEvent is streamed by WatchRequest
type TableEvent struct {
// time of table event
Type EventType `protobuf:"varint,1,opt,name=type,proto3,enum=EventType" json:"type,omitempty"`
// unix timestamp of event
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// service route
Route *Route `protobuf:"bytes,3,opt,name=route,proto3" json:"route,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TableEvent) Reset() { *m = TableEvent{} }
func (m *TableEvent) String() string { return proto.CompactTextString(m) }
func (*TableEvent) ProtoMessage() {}
func (*TableEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{3}
}
func (m *TableEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TableEvent.Unmarshal(m, b)
}
func (m *TableEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_TableEvent.Marshal(b, m, deterministic)
}
func (m *TableEvent) XXX_Merge(src proto.Message) {
xxx_messageInfo_TableEvent.Merge(m, src)
}
func (m *TableEvent) XXX_Size() int {
return xxx_messageInfo_TableEvent.Size(m)
}
func (m *TableEvent) XXX_DiscardUnknown() {
xxx_messageInfo_TableEvent.DiscardUnknown(m)
}
var xxx_messageInfo_TableEvent proto.InternalMessageInfo
func (m *TableEvent) GetType() EventType {
if m != nil {
return m.Type
}
return EventType_Insert
}
func (m *TableEvent) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *TableEvent) GetRoute() *Route {
if m != nil {
return m.Route
}
return nil
}
// ListRequest is made to List routes
type ListRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListRequest) Reset() { *m = ListRequest{} }
func (m *ListRequest) String() string { return proto.CompactTextString(m) }
func (*ListRequest) ProtoMessage() {}
func (*ListRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{4}
}
func (m *ListRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListRequest.Unmarshal(m, b)
}
func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic)
}
func (m *ListRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListRequest.Merge(m, src)
}
func (m *ListRequest) XXX_Size() int {
return xxx_messageInfo_ListRequest.Size(m)
}
func (m *ListRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ListRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ListRequest proto.InternalMessageInfo
// ListResponse is returned by List
type ListResponse struct {
Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{5}
}
func (m *ListResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListResponse.Unmarshal(m, b)
}
func (m *ListResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListResponse.Marshal(b, m, deterministic)
}
func (m *ListResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListResponse.Merge(m, src)
}
func (m *ListResponse) XXX_Size() int {
return xxx_messageInfo_ListResponse.Size(m)
}
func (m *ListResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ListResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ListResponse proto.InternalMessageInfo
func (m *ListResponse) GetRoutes() []*Route {
if m != nil {
return m.Routes
}
return nil
}
// Query is passed in a LookupRequest
type Query struct {
// service to lookup
@ -113,7 +305,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{2}
return fileDescriptor_367072455c71aedc, []int{6}
}
func (m *Query) XXX_Unmarshal(b []byte) error {
@ -164,7 +356,7 @@ func (m *Route) Reset() { *m = Route{} }
func (m *Route) String() string { return proto.CompactTextString(m) }
func (*Route) ProtoMessage() {}
func (*Route) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{3}
return fileDescriptor_367072455c71aedc, []int{7}
}
func (m *Route) XXX_Unmarshal(b []byte) error {
@ -228,8 +420,13 @@ func (m *Route) GetMetric() int64 {
}
func init() {
proto.RegisterEnum("EventType", EventType_name, EventType_value)
proto.RegisterType((*LookupRequest)(nil), "LookupRequest")
proto.RegisterType((*LookupResponse)(nil), "LookupResponse")
proto.RegisterType((*WatchRequest)(nil), "WatchRequest")
proto.RegisterType((*TableEvent)(nil), "TableEvent")
proto.RegisterType((*ListRequest)(nil), "ListRequest")
proto.RegisterType((*ListResponse)(nil), "ListResponse")
proto.RegisterType((*Query)(nil), "Query")
proto.RegisterType((*Route)(nil), "Route")
}
@ -237,20 +434,30 @@ func init() {
func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) }
var fileDescriptor_367072455c71aedc = []byte{
// 238 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xc1, 0x4a, 0xc4, 0x30,
0x10, 0x86, 0x8d, 0xdd, 0x46, 0x1c, 0x75, 0x85, 0x1c, 0x24, 0x88, 0x48, 0xcd, 0x69, 0x41, 0x2c,
0xb2, 0xe2, 0x5b, 0x78, 0x31, 0x6f, 0x50, 0x77, 0x07, 0x29, 0xd5, 0xa6, 0x3b, 0x49, 0x5c, 0xf6,
0x59, 0x7c, 0x59, 0xc9, 0x24, 0x7b, 0xe8, 0xc1, 0x5b, 0xbf, 0xf9, 0x66, 0x7e, 0x9a, 0x1f, 0x2e,
0xc9, 0xc5, 0x80, 0xd4, 0x4e, 0xe4, 0x82, 0x33, 0x4f, 0x70, 0xf5, 0xe6, 0xdc, 0x10, 0x27, 0x8b,
0xbb, 0x88, 0x3e, 0xa8, 0x3b, 0xa8, 0x77, 0x11, 0xe9, 0xa0, 0x45, 0x23, 0x56, 0x17, 0x6b, 0xd9,
0xbe, 0x27, 0xb2, 0x79, 0x68, 0x9e, 0x61, 0x79, 0x5c, 0xf7, 0x93, 0x1b, 0x3d, 0xaa, 0x7b, 0x90,
0x1c, 0xe8, 0xb5, 0x68, 0x2a, 0x3e, 0xb0, 0x09, 0x6d, 0x99, 0x9a, 0x07, 0xa8, 0x39, 0x41, 0x69,
0x38, 0xf3, 0x48, 0x3f, 0xfd, 0x06, 0x39, 0xfa, 0xdc, 0x1e, 0xd1, 0xfc, 0x0a, 0xa8, 0xf9, 0xe8,
0xff, 0x9d, 0x64, 0xba, 0xed, 0x96, 0xd0, 0x7b, 0x7d, 0x9a, 0x4d, 0xc1, 0x64, 0x3e, 0xbb, 0x80,
0xfb, 0xee, 0xa0, 0xab, 0x6c, 0x0a, 0x26, 0x33, 0x62, 0xd8, 0x3b, 0x1a, 0xf4, 0x22, 0x9b, 0x82,
0x4a, 0xc1, 0xe2, 0xab, 0x1f, 0x07, 0x5d, 0xf3, 0x98, 0xbf, 0xd5, 0x0d, 0xc8, 0x6f, 0x0c, 0xd4,
0x6f, 0xb4, 0x6c, 0xc4, 0xaa, 0xb2, 0x85, 0xd6, 0xaf, 0x20, 0xf9, 0xe7, 0x48, 0x3d, 0x82, 0xcc,
0x8f, 0x57, 0xcb, 0x76, 0x56, 0xda, 0xed, 0x75, 0x3b, 0x6f, 0xc5, 0x9c, 0x7c, 0x48, 0xee, 0xf7,
0xe5, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x99, 0xfb, 0x2d, 0x6f, 0x01, 0x00, 0x00,
// 390 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xc1, 0x8e, 0xd3, 0x30,
0x10, 0x8d, 0xb7, 0x89, 0x51, 0xa6, 0x49, 0xa8, 0x7c, 0x40, 0x56, 0xb5, 0x5a, 0x15, 0x4b, 0x88,
0x15, 0x08, 0xb3, 0x2a, 0xbf, 0x00, 0x07, 0xa4, 0xbd, 0x60, 0x2d, 0xe2, 0x9c, 0x6d, 0x47, 0x6c,
0xd4, 0x36, 0xc9, 0xda, 0xce, 0xae, 0xf2, 0x01, 0x7c, 0x05, 0x3f, 0x8b, 0x3c, 0x49, 0x68, 0x7b,
0x40, 0xe2, 0x36, 0xef, 0x8d, 0xed, 0x79, 0x9e, 0xf7, 0x20, 0xb3, 0x4d, 0xe7, 0xd1, 0xea, 0xd6,
0x36, 0xbe, 0x51, 0x1f, 0x20, 0xbf, 0x6d, 0x9a, 0x5d, 0xd7, 0x1a, 0x7c, 0xec, 0xd0, 0x79, 0x71,
0x09, 0xc9, 0x63, 0x87, 0xb6, 0x97, 0x6c, 0xc5, 0xae, 0xe7, 0x6b, 0xae, 0xbf, 0x05, 0x64, 0x06,
0x52, 0xdd, 0x40, 0x31, 0x1d, 0x77, 0x6d, 0x53, 0x3b, 0x14, 0x57, 0xc0, 0xe9, 0x41, 0x27, 0xd9,
0x6a, 0x46, 0x17, 0x4c, 0x80, 0x66, 0x64, 0x55, 0x01, 0xd9, 0x8f, 0xd2, 0x6f, 0x1e, 0xc6, 0xf7,
0xd5, 0x03, 0xc0, 0x5d, 0x79, 0xbf, 0xc7, 0x2f, 0x4f, 0x58, 0x7b, 0x71, 0x05, 0xb1, 0xef, 0x5b,
0xa4, 0x61, 0xc5, 0x1a, 0x34, 0xb1, 0x77, 0x7d, 0x8b, 0x86, 0x78, 0x71, 0x09, 0xa9, 0xaf, 0x0e,
0xe8, 0x7c, 0x79, 0x68, 0xe5, 0xc5, 0x8a, 0x5d, 0xcf, 0xcc, 0x91, 0x08, 0x5a, 0x69, 0x8a, 0x9c,
0x8d, 0x5a, 0x87, 0xd1, 0x03, 0xa9, 0x72, 0x98, 0xdf, 0x56, 0xce, 0x4f, 0x83, 0x35, 0x64, 0x03,
0xfc, 0x4f, 0xe1, 0xaf, 0x21, 0xa1, 0xaf, 0x0b, 0x09, 0x2f, 0x1c, 0xda, 0xa7, 0x6a, 0x33, 0xc8,
0x4c, 0xcd, 0x04, 0xd5, 0x6f, 0x06, 0x09, 0x5d, 0xfa, 0xf7, 0x99, 0xd0, 0x29, 0xb7, 0x5b, 0x8b,
0xce, 0x91, 0xfe, 0xd4, 0x4c, 0x30, 0x74, 0x7e, 0x96, 0x1e, 0x9f, 0xcb, 0x9e, 0xf4, 0xa7, 0x66,
0x82, 0xa1, 0x53, 0xa3, 0x7f, 0x6e, 0xec, 0x4e, 0xc6, 0x43, 0x67, 0x84, 0x42, 0x40, 0xbc, 0xaf,
0xea, 0x9d, 0x4c, 0x88, 0xa6, 0x5a, 0xbc, 0x02, 0x7e, 0x40, 0x6f, 0xab, 0x8d, 0xe4, 0xb4, 0xa0,
0x11, 0xbd, 0xfb, 0x08, 0xe9, 0xdf, 0x75, 0x0a, 0x00, 0xfe, 0xb5, 0x76, 0x68, 0xfd, 0x22, 0x0a,
0xf5, 0x67, 0xdc, 0xa3, 0xc7, 0x05, 0x0b, 0xf5, 0xf7, 0x76, 0x5b, 0x7a, 0x5c, 0x5c, 0xac, 0x7f,
0x31, 0xe0, 0xf4, 0x1d, 0x2b, 0xde, 0x42, 0x42, 0xae, 0x89, 0x5c, 0x9f, 0xba, 0xb7, 0x9c, 0xeb,
0xa3, 0x79, 0x2a, 0xba, 0x61, 0xe2, 0x3d, 0xf0, 0x21, 0x10, 0xa2, 0xd0, 0x67, 0x41, 0x5a, 0xbe,
0xd4, 0xe7, 0x49, 0x51, 0x91, 0x78, 0x03, 0x71, 0xb0, 0x40, 0x64, 0xfa, 0xc4, 0x98, 0x65, 0xae,
0x4f, 0x7d, 0x51, 0xd1, 0x3d, 0xa7, 0x68, 0x7e, 0xfa, 0x13, 0x00, 0x00, 0xff, 0xff, 0x7e, 0xbb,
0x6d, 0x5c, 0xaa, 0x02, 0x00, 0x00,
}

View File

@ -2,7 +2,9 @@ syntax = "proto3";
// Router service is used by the proxy to lookup routes
service Router {
rpc Watch(WatchRequest) returns (stream TableEvent) {};
rpc Lookup(LookupRequest) returns (LookupResponse) {};
rpc List(ListRequest) returns (ListResponse) {};
}
// LookupRequest is made to Lookup
@ -10,11 +12,39 @@ message LookupRequest {
Query query = 1;
}
// LookupResponse is returns by Lookup
// LookupResponse is returned by Lookup
message LookupResponse {
repeated Route routes = 1;
}
// WatchRequest is made to Watch Router
message WatchRequest {}
// EventType is TableEvent type
enum EventType {
Insert = 0;
Delete = 1;
Update = 2;
}
// TableEvent is streamed by WatchRequest
message TableEvent {
// time of table event
EventType type = 1;
// unix timestamp of event
int64 timestamp = 2;
// service route
Route route = 3;
}
// ListRequest is made to List routes
message ListRequest {}
// ListResponse is returned by List
message ListResponse {
repeated Route routes = 1;
}
// Query is passed in a LookupRequest
message Query {
// service to lookup

View File

@ -2,6 +2,7 @@ package table
import (
"sync"
"time"
"github.com/google/uuid"
)
@ -51,8 +52,8 @@ func (t *table) Options() TableOptions {
return t.opts
}
// Add adds a route to the routing table
func (t *table) Add(r Route) error {
// Create creates new route in the routing table
func (t *table) Create(r Route) error {
service := r.Service
sum := r.Hash()
@ -63,14 +64,14 @@ func (t *table) Add(r Route) error {
if _, ok := t.m[service]; !ok {
t.m[service] = make(map[uint64]Route)
t.m[service][sum] = r
go t.sendEvent(&Event{Type: Insert, Route: r})
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil
}
// add new route to the table for the route destination
if _, ok := t.m[service][sum]; !ok {
t.m[service][sum] = r
go t.sendEvent(&Event{Type: Insert, Route: r})
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil
}
@ -90,7 +91,7 @@ func (t *table) Delete(r Route) error {
}
delete(t.m[service], sum)
go t.sendEvent(&Event{Type: Delete, Route: r})
go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r})
return nil
}
@ -111,7 +112,7 @@ func (t *table) Update(r Route) error {
// if the route has been found update it
if _, ok := t.m[service][sum]; ok {
t.m[service][sum] = r
go t.sendEvent(&Event{Type: Update, Route: r})
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
return nil
}

View File

@ -16,11 +16,11 @@ func testSetup() (Table, Route) {
return table, route
}
func TestAdd(t *testing.T) {
func TestCreate(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
@ -28,7 +28,7 @@ func TestAdd(t *testing.T) {
// adds new route for the original destination
route.Gateway = "dest.gw2"
if err := table.Add(route); err != nil {
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
@ -38,7 +38,7 @@ func TestAdd(t *testing.T) {
}
// adding the same route under Insert policy must error
if err := table.Add(route); err != ErrDuplicateRoute {
if err := table.Create(route); err != ErrDuplicateRoute {
t.Errorf("error adding route. Expected error: %s, found: %s", ErrDuplicateRoute, err)
}
}
@ -47,7 +47,7 @@ func TestDelete(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
@ -77,7 +77,7 @@ func TestUpdate(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
@ -113,7 +113,7 @@ func TestList(t *testing.T) {
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
if err := table.Add(route); err != nil {
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
}
@ -143,7 +143,7 @@ func TestLookup(t *testing.T) {
route.Service = svc[i]
route.Network = net[i]
route.Gateway = gw[i]
if err := table.Add(route); err != nil {
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
}

View File

@ -13,8 +13,8 @@ var (
// Table defines routing table interface
type Table interface {
// Add adds new route to the routing table
Add(Route) error
// Create new route in the routing table
Create(Route) error
// Delete deletes existing route from the routing table
Delete(Route) error
// Update updates route in the routing table

View File

@ -2,7 +2,6 @@ package table
import (
"errors"
"fmt"
"time"
)
@ -15,28 +14,14 @@ var (
type EventType int
const (
// Insert is emitted when a new route has been inserted
Insert EventType = iota
// Create is emitted when a new route has been created
Create EventType = iota
// Delete is emitted when an existing route has been deleted
Delete
// Update is emitted when an existing route has been updated
Update
)
// String returns string representation of the event
func (et EventType) String() string {
switch et {
case Insert:
return "INSERT"
case Delete:
return "DELETE"
case Update:
return "UPDATE"
default:
return "UNKNOWN"
}
}
// Event is returned by a call to Next on the watcher.
type Event struct {
// Type defines type of event
@ -47,11 +32,6 @@ type Event struct {
Route Route
}
// String prints human readable Event
func (e Event) String() string {
return fmt.Sprintf("[EVENT] time: %s type: %s", e.Timestamp, e.Type)
}
// WatchOption is used to define what routes to watch in the table
type WatchOption func(*WatchOptions)
@ -88,7 +68,7 @@ type tableWatcher struct {
// Next returns the next noticed action taken on table
// TODO: this needs to be thought through properly;
// right now we only allow to watch destination
// right now we only allow to watch service
func (w *tableWatcher) Next() (*Event, error) {
for {
select {