Merge pull request #2 from micro/master

merge
This commit is contained in:
Shu xian 2020-01-23 22:17:54 +08:00 committed by GitHub
commit bd6a5af2b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 385 additions and 667 deletions

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// source: proto/service.proto // source: go-micro/config/source/service/proto/service.proto
package service package service
@ -35,7 +35,7 @@ func (m *ChangeSet) Reset() { *m = ChangeSet{} }
func (m *ChangeSet) String() string { return proto.CompactTextString(m) } func (m *ChangeSet) String() string { return proto.CompactTextString(m) }
func (*ChangeSet) ProtoMessage() {} func (*ChangeSet) ProtoMessage() {}
func (*ChangeSet) Descriptor() ([]byte, []int) { func (*ChangeSet) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{0} return fileDescriptor_05971a9aaecb0484, []int{0}
} }
func (m *ChangeSet) XXX_Unmarshal(b []byte) error { func (m *ChangeSet) XXX_Unmarshal(b []byte) error {
@ -104,7 +104,7 @@ func (m *Change) Reset() { *m = Change{} }
func (m *Change) String() string { return proto.CompactTextString(m) } func (m *Change) String() string { return proto.CompactTextString(m) }
func (*Change) ProtoMessage() {} func (*Change) ProtoMessage() {}
func (*Change) Descriptor() ([]byte, []int) { func (*Change) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{1} return fileDescriptor_05971a9aaecb0484, []int{1}
} }
func (m *Change) XXX_Unmarshal(b []byte) error { func (m *Change) XXX_Unmarshal(b []byte) error {
@ -157,7 +157,7 @@ func (m *CreateRequest) Reset() { *m = CreateRequest{} }
func (m *CreateRequest) String() string { return proto.CompactTextString(m) } func (m *CreateRequest) String() string { return proto.CompactTextString(m) }
func (*CreateRequest) ProtoMessage() {} func (*CreateRequest) ProtoMessage() {}
func (*CreateRequest) Descriptor() ([]byte, []int) { func (*CreateRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{2} return fileDescriptor_05971a9aaecb0484, []int{2}
} }
func (m *CreateRequest) XXX_Unmarshal(b []byte) error { func (m *CreateRequest) XXX_Unmarshal(b []byte) error {
@ -195,7 +195,7 @@ func (m *CreateResponse) Reset() { *m = CreateResponse{} }
func (m *CreateResponse) String() string { return proto.CompactTextString(m) } func (m *CreateResponse) String() string { return proto.CompactTextString(m) }
func (*CreateResponse) ProtoMessage() {} func (*CreateResponse) ProtoMessage() {}
func (*CreateResponse) Descriptor() ([]byte, []int) { func (*CreateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{3} return fileDescriptor_05971a9aaecb0484, []int{3}
} }
func (m *CreateResponse) XXX_Unmarshal(b []byte) error { func (m *CreateResponse) XXX_Unmarshal(b []byte) error {
@ -227,7 +227,7 @@ func (m *UpdateRequest) Reset() { *m = UpdateRequest{} }
func (m *UpdateRequest) String() string { return proto.CompactTextString(m) } func (m *UpdateRequest) String() string { return proto.CompactTextString(m) }
func (*UpdateRequest) ProtoMessage() {} func (*UpdateRequest) ProtoMessage() {}
func (*UpdateRequest) Descriptor() ([]byte, []int) { func (*UpdateRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{4} return fileDescriptor_05971a9aaecb0484, []int{4}
} }
func (m *UpdateRequest) XXX_Unmarshal(b []byte) error { func (m *UpdateRequest) XXX_Unmarshal(b []byte) error {
@ -265,7 +265,7 @@ func (m *UpdateResponse) Reset() { *m = UpdateResponse{} }
func (m *UpdateResponse) String() string { return proto.CompactTextString(m) } func (m *UpdateResponse) String() string { return proto.CompactTextString(m) }
func (*UpdateResponse) ProtoMessage() {} func (*UpdateResponse) ProtoMessage() {}
func (*UpdateResponse) Descriptor() ([]byte, []int) { func (*UpdateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{5} return fileDescriptor_05971a9aaecb0484, []int{5}
} }
func (m *UpdateResponse) XXX_Unmarshal(b []byte) error { func (m *UpdateResponse) XXX_Unmarshal(b []byte) error {
@ -297,7 +297,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} }
func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } func (m *DeleteRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteRequest) ProtoMessage() {} func (*DeleteRequest) ProtoMessage() {}
func (*DeleteRequest) Descriptor() ([]byte, []int) { func (*DeleteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{6} return fileDescriptor_05971a9aaecb0484, []int{6}
} }
func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { func (m *DeleteRequest) XXX_Unmarshal(b []byte) error {
@ -335,7 +335,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} }
func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) ProtoMessage() {}
func (*DeleteResponse) Descriptor() ([]byte, []int) { func (*DeleteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{7} return fileDescriptor_05971a9aaecb0484, []int{7}
} }
func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { func (m *DeleteResponse) XXX_Unmarshal(b []byte) error {
@ -366,7 +366,7 @@ func (m *ListRequest) Reset() { *m = ListRequest{} }
func (m *ListRequest) String() string { return proto.CompactTextString(m) } func (m *ListRequest) String() string { return proto.CompactTextString(m) }
func (*ListRequest) ProtoMessage() {} func (*ListRequest) ProtoMessage() {}
func (*ListRequest) Descriptor() ([]byte, []int) { func (*ListRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{8} return fileDescriptor_05971a9aaecb0484, []int{8}
} }
func (m *ListRequest) XXX_Unmarshal(b []byte) error { func (m *ListRequest) XXX_Unmarshal(b []byte) error {
@ -398,7 +398,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {} func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) { func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{9} return fileDescriptor_05971a9aaecb0484, []int{9}
} }
func (m *ListResponse) XXX_Unmarshal(b []byte) error { func (m *ListResponse) XXX_Unmarshal(b []byte) error {
@ -438,7 +438,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{10} return fileDescriptor_05971a9aaecb0484, []int{10}
} }
func (m *ReadRequest) XXX_Unmarshal(b []byte) error { func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
@ -484,7 +484,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) { func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{11} return fileDescriptor_05971a9aaecb0484, []int{11}
} }
func (m *ReadResponse) XXX_Unmarshal(b []byte) error { func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
@ -524,7 +524,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) { func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{12} return fileDescriptor_05971a9aaecb0484, []int{12}
} }
func (m *WatchRequest) XXX_Unmarshal(b []byte) error { func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
@ -571,7 +571,7 @@ func (m *WatchResponse) Reset() { *m = WatchResponse{} }
func (m *WatchResponse) String() string { return proto.CompactTextString(m) } func (m *WatchResponse) String() string { return proto.CompactTextString(m) }
func (*WatchResponse) ProtoMessage() {} func (*WatchResponse) ProtoMessage() {}
func (*WatchResponse) Descriptor() ([]byte, []int) { func (*WatchResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_c33392ef2c1961ba, []int{13} return fileDescriptor_05971a9aaecb0484, []int{13}
} }
func (m *WatchResponse) XXX_Unmarshal(b []byte) error { func (m *WatchResponse) XXX_Unmarshal(b []byte) error {
@ -623,35 +623,38 @@ func init() {
proto.RegisterType((*WatchResponse)(nil), "WatchResponse") proto.RegisterType((*WatchResponse)(nil), "WatchResponse")
} }
func init() { proto.RegisterFile("proto/service.proto", fileDescriptor_c33392ef2c1961ba) } func init() {
proto.RegisterFile("go-micro/config/source/service/proto/service.proto", fileDescriptor_05971a9aaecb0484)
var fileDescriptor_c33392ef2c1961ba = []byte{ }
// 427 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0xcf, 0x6a, 0xdb, 0x40, var fileDescriptor_05971a9aaecb0484 = []byte{
0x10, 0xc6, 0x2d, 0xdb, 0x51, 0xaa, 0xb1, 0x24, 0x87, 0x29, 0x14, 0x21, 0x0a, 0x35, 0x0b, 0x05, // 443 bytes of a gzipped FileDescriptorProto
0xd3, 0xc2, 0x26, 0x38, 0x7d, 0x83, 0xf4, 0xd6, 0x9e, 0x36, 0x94, 0xf6, 0xba, 0x95, 0xa7, 0xb5, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0xcf, 0x6a, 0xdb, 0x40,
0x49, 0x14, 0xa9, 0xda, 0x55, 0xa0, 0x8f, 0xd0, 0x3e, 0x75, 0xd9, 0x3f, 0x8a, 0x24, 0x17, 0x4c, 0x10, 0xc6, 0x2d, 0xdb, 0x51, 0xab, 0xb1, 0x24, 0x87, 0x39, 0x14, 0x21, 0x0a, 0x35, 0x82, 0x82,
0x7c, 0xdb, 0x99, 0xd9, 0xf9, 0xe6, 0xd3, 0xfc, 0x56, 0xf0, 0xb2, 0x6e, 0x2a, 0x5d, 0x5d, 0x2a, 0x69, 0xe9, 0x2a, 0x38, 0x7d, 0x03, 0xf7, 0xd6, 0x9e, 0x54, 0x4a, 0x7b, 0xdd, 0xca, 0x13, 0x5b,
0x6a, 0x1e, 0xf7, 0x05, 0x71, 0x1b, 0xb1, 0x3f, 0x01, 0x44, 0x37, 0x3b, 0xf9, 0xf0, 0x93, 0x6e, 0x24, 0xf2, 0xaa, 0xda, 0x75, 0xa0, 0x8f, 0x90, 0xb7, 0x2e, 0xfb, 0x47, 0xb1, 0xe4, 0x82, 0x89,
0x49, 0x23, 0xc2, 0x7c, 0x2b, 0xb5, 0xcc, 0x82, 0x55, 0xb0, 0x8e, 0x85, 0x3d, 0x63, 0x0e, 0x2f, 0x6f, 0x3b, 0x33, 0x3b, 0xdf, 0x7c, 0x9a, 0x9f, 0x16, 0x56, 0x5b, 0xf1, 0xa9, 0xae, 0xca, 0x56,
0x8a, 0x1d, 0x15, 0x77, 0xaa, 0x2d, 0xb3, 0xe9, 0x2a, 0x58, 0x47, 0xe2, 0x29, 0xc6, 0x57, 0x10, 0xe4, 0xa5, 0xd8, 0xdf, 0x55, 0xdb, 0x5c, 0x8a, 0x43, 0x5b, 0x52, 0x2e, 0xa9, 0x7d, 0xac, 0x4a,
0xfe, 0xa8, 0x9a, 0x52, 0xea, 0x6c, 0x66, 0x2b, 0x3e, 0x32, 0x79, 0x55, 0xb5, 0x4d, 0x41, 0xd9, 0xca, 0x9b, 0x56, 0x28, 0xd1, 0x45, 0xcc, 0x44, 0xd9, 0x93, 0x07, 0xc1, 0x7a, 0xc7, 0xf7, 0x5b,
0xdc, 0xe5, 0x5d, 0x84, 0xaf, 0x21, 0xd2, 0xfb, 0x92, 0x94, 0x96, 0x65, 0x9d, 0x9d, 0xad, 0x82, 0xfa, 0x4e, 0x0a, 0x11, 0xa6, 0x1b, 0xae, 0x78, 0xe2, 0x2d, 0xbc, 0x65, 0x58, 0x98, 0x33, 0xa6,
0xf5, 0x4c, 0xf4, 0x09, 0xf6, 0x0d, 0x42, 0x67, 0x05, 0x2f, 0x60, 0x76, 0x47, 0xbf, 0xad, 0x8d, 0xf0, 0xba, 0xdc, 0x51, 0x79, 0x2f, 0x0f, 0x75, 0x32, 0x5e, 0x78, 0xcb, 0xa0, 0x78, 0x8e, 0xf1,
0x48, 0x98, 0xa3, 0x71, 0x56, 0x4b, 0xbd, 0xf3, 0x0e, 0xec, 0x19, 0xd7, 0x10, 0x15, 0x9d, 0x75, 0x0d, 0xf8, 0x77, 0xa2, 0xad, 0xb9, 0x4a, 0x26, 0xa6, 0xe2, 0x22, 0x9d, 0xb7, 0xb3, 0x93, 0xa9,
0x6b, 0x60, 0xb1, 0x01, 0xfe, 0xf4, 0x31, 0xa2, 0x2f, 0xb2, 0x2b, 0x48, 0x6e, 0x1a, 0x92, 0x9a, 0xcd, 0xdb, 0x08, 0xdf, 0x42, 0xa0, 0xaa, 0x9a, 0xa4, 0xe2, 0x75, 0x93, 0x5c, 0x2d, 0xbc, 0xe5,
0x04, 0xfd, 0x6a, 0x49, 0x69, 0x7c, 0x03, 0xa1, 0xab, 0xda, 0x19, 0x8b, 0xcd, 0xb9, 0xef, 0x13, 0xa4, 0x38, 0x26, 0xb2, 0x5f, 0xe0, 0x5b, 0x2b, 0x78, 0x0d, 0x93, 0x7b, 0xfa, 0x6b, 0x6c, 0x04,
0x3e, 0xcd, 0x2e, 0x20, 0xed, 0x3a, 0x54, 0x5d, 0x3d, 0x28, 0x32, 0x1a, 0x5f, 0xea, 0xed, 0x89, 0x85, 0x3e, 0x6a, 0x67, 0x0d, 0x57, 0x3b, 0xe7, 0xc0, 0x9c, 0x71, 0x09, 0x41, 0xd9, 0x59, 0x37,
0x1a, 0x5d, 0x47, 0xaf, 0xf1, 0x91, 0xee, 0xe9, 0x34, 0x8d, 0xae, 0xc3, 0x6b, 0x24, 0xb0, 0xf8, 0x06, 0x66, 0x2b, 0x60, 0xcf, 0x1f, 0x53, 0x1c, 0x8b, 0xd9, 0x0d, 0x44, 0xeb, 0x96, 0xb8, 0xa2,
0xbc, 0x57, 0xda, 0x2b, 0xb0, 0x4b, 0x88, 0x5d, 0xe8, 0xca, 0x46, 0xf1, 0x51, 0xde, 0xb7, 0xa4, 0x82, 0xfe, 0x1c, 0x48, 0x2a, 0x7c, 0x07, 0xbe, 0xad, 0x9a, 0x19, 0xb3, 0xd5, 0x2b, 0xd7, 0x57,
0xb2, 0x60, 0x35, 0x1b, 0x29, 0xba, 0x34, 0xbb, 0x86, 0x85, 0x20, 0xb9, 0xed, 0x1c, 0x3c, 0x6b, 0xb8, 0x74, 0x76, 0x0d, 0x71, 0xd7, 0x21, 0x1b, 0xb1, 0x97, 0xa4, 0x35, 0x7e, 0x34, 0x9b, 0x0b,
0xd5, 0x66, 0x8a, 0x6b, 0xea, 0xa7, 0x1c, 0xf7, 0xfd, 0x01, 0xe2, 0xaf, 0x52, 0x17, 0xbb, 0xd3, 0x35, 0xba, 0x8e, 0xa3, 0xc6, 0x17, 0x7a, 0xa0, 0xcb, 0x34, 0xba, 0x0e, 0xa7, 0x11, 0xc1, 0xec,
0xc6, 0x7c, 0x82, 0xc4, 0x77, 0xf9, 0x39, 0xff, 0xb7, 0x8d, 0xa0, 0x4f, 0x8f, 0x40, 0xdf, 0xfc, 0x5b, 0x25, 0x95, 0x53, 0xc8, 0x72, 0x08, 0x6d, 0x68, 0xcb, 0x5a, 0xf1, 0x91, 0x3f, 0x1c, 0x48,
0x9d, 0xc2, 0xf9, 0xad, 0x7b, 0xec, 0xf8, 0x1e, 0x42, 0x87, 0x13, 0x53, 0x3e, 0x7a, 0x09, 0xf9, 0x26, 0xde, 0x62, 0x32, 0x50, 0xb4, 0xe9, 0xec, 0x16, 0x66, 0x05, 0xf1, 0x4d, 0xe7, 0xe0, 0x45,
0x92, 0x1f, 0x70, 0x9e, 0x98, 0xcb, 0x8e, 0x1b, 0xa6, 0x7c, 0x84, 0x3c, 0x5f, 0xf2, 0x03, 0xa0, 0xab, 0xd6, 0x53, 0x6c, 0xd3, 0x71, 0xca, 0x79, 0xdf, 0x9f, 0x21, 0xfc, 0xc9, 0x55, 0xb9, 0xbb,
0xf6, 0xb2, 0x03, 0x84, 0x29, 0x1f, 0xb1, 0xcd, 0x97, 0xfc, 0x80, 0xdc, 0x04, 0xdf, 0xc2, 0xdc, 0x6c, 0xcc, 0x57, 0x88, 0x5c, 0x97, 0x9b, 0xf3, 0x7f, 0xdb, 0x00, 0xfa, 0xf8, 0x0c, 0xf4, 0xd5,
0xc0, 0xc2, 0x98, 0x0f, 0x10, 0xe6, 0x09, 0x1f, 0x12, 0x74, 0xd7, 0xcc, 0xb6, 0x31, 0xe6, 0x03, 0xd3, 0x18, 0xfc, 0xb5, 0x79, 0x08, 0xf8, 0x11, 0x7c, 0x4b, 0x13, 0x63, 0x36, 0xf8, 0x11, 0xd2,
0x52, 0x79, 0xc2, 0x87, 0x08, 0xd8, 0x04, 0xdf, 0xc1, 0x99, 0xdd, 0x16, 0x26, 0x7c, 0xb8, 0xeb, 0x39, 0x3b, 0xc1, 0x3c, 0xd2, 0x97, 0x2d, 0x36, 0x8c, 0xd9, 0x80, 0x78, 0x3a, 0x67, 0x27, 0x3c,
0x3c, 0xe5, 0xa3, 0x25, 0xb2, 0xc9, 0x55, 0xf0, 0x3d, 0xb4, 0xbf, 0xfb, 0xf5, 0xbf, 0x00, 0x00, 0xcd, 0x65, 0xcb, 0x07, 0x63, 0x36, 0x40, 0x9b, 0xce, 0xd9, 0x09, 0xb8, 0x11, 0xbe, 0x87, 0xa9,
0x00, 0xff, 0xff, 0xc3, 0x45, 0xac, 0x57, 0x05, 0x04, 0x00, 0x00, 0x66, 0x85, 0x21, 0xeb, 0x11, 0x4c, 0x23, 0xd6, 0x07, 0x68, 0xaf, 0xe9, 0x65, 0x63, 0xc8, 0x7a,
0xa0, 0xd2, 0x88, 0xf5, 0x09, 0x64, 0x23, 0xfc, 0x00, 0x57, 0x66, 0x59, 0x18, 0xb1, 0xfe, 0xaa,
0xd3, 0x98, 0x0d, 0x76, 0x98, 0x8d, 0x6e, 0xbc, 0xdf, 0xbe, 0x79, 0xed, 0xb7, 0xff, 0x02, 0x00,
0x00, 0xff, 0xff, 0xea, 0xa0, 0x1e, 0x8e, 0x23, 0x04, 0x00, 0x00,
} }

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-micro. DO NOT EDIT. // Code generated by protoc-gen-micro. DO NOT EDIT.
// source: proto/service.proto // source: go-micro/config/source/service/proto/service.proto
package service package service
@ -31,37 +31,37 @@ var _ context.Context
var _ client.Option var _ client.Option
var _ server.Option var _ server.Option
// Client API for Service service // Client API for Config service
type Service interface { type ConfigService interface {
Create(ctx context.Context, in *CreateRequest, opts ...client.CallOption) (*CreateResponse, error) Create(ctx context.Context, in *CreateRequest, opts ...client.CallOption) (*CreateResponse, error)
Update(ctx context.Context, in *UpdateRequest, opts ...client.CallOption) (*UpdateResponse, error) Update(ctx context.Context, in *UpdateRequest, opts ...client.CallOption) (*UpdateResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error)
List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error)
Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error)
Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Service_WatchService, error) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Config_WatchService, error)
} }
type service struct { type configService struct {
c client.Client c client.Client
name string name string
} }
func NewService(name string, c client.Client) Service { func NewConfigService(name string, c client.Client) ConfigService {
if c == nil { if c == nil {
c = client.NewClient() c = client.NewClient()
} }
if len(name) == 0 { if len(name) == 0 {
name = "service" name = "config"
} }
return &service{ return &configService{
c: c, c: c,
name: name, name: name,
} }
} }
func (c *service) Create(ctx context.Context, in *CreateRequest, opts ...client.CallOption) (*CreateResponse, error) { func (c *configService) Create(ctx context.Context, in *CreateRequest, opts ...client.CallOption) (*CreateResponse, error) {
req := c.c.NewRequest(c.name, "Service.Create", in) req := c.c.NewRequest(c.name, "Config.Create", in)
out := new(CreateResponse) out := new(CreateResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -70,8 +70,8 @@ func (c *service) Create(ctx context.Context, in *CreateRequest, opts ...client.
return out, nil return out, nil
} }
func (c *service) Update(ctx context.Context, in *UpdateRequest, opts ...client.CallOption) (*UpdateResponse, error) { func (c *configService) Update(ctx context.Context, in *UpdateRequest, opts ...client.CallOption) (*UpdateResponse, error) {
req := c.c.NewRequest(c.name, "Service.Update", in) req := c.c.NewRequest(c.name, "Config.Update", in)
out := new(UpdateResponse) out := new(UpdateResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -80,8 +80,8 @@ func (c *service) Update(ctx context.Context, in *UpdateRequest, opts ...client.
return out, nil return out, nil
} }
func (c *service) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) { func (c *configService) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) {
req := c.c.NewRequest(c.name, "Service.Delete", in) req := c.c.NewRequest(c.name, "Config.Delete", in)
out := new(DeleteResponse) out := new(DeleteResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -90,8 +90,8 @@ func (c *service) Delete(ctx context.Context, in *DeleteRequest, opts ...client.
return out, nil return out, nil
} }
func (c *service) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) { func (c *configService) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) {
req := c.c.NewRequest(c.name, "Service.List", in) req := c.c.NewRequest(c.name, "Config.List", in)
out := new(ListResponse) out := new(ListResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -100,8 +100,8 @@ func (c *service) List(ctx context.Context, in *ListRequest, opts ...client.Call
return out, nil return out, nil
} }
func (c *service) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) { func (c *configService) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) {
req := c.c.NewRequest(c.name, "Service.Read", in) req := c.c.NewRequest(c.name, "Config.Read", in)
out := new(ReadResponse) out := new(ReadResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -110,8 +110,8 @@ func (c *service) Read(ctx context.Context, in *ReadRequest, opts ...client.Call
return out, nil return out, nil
} }
func (c *service) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Service_WatchService, error) { func (c *configService) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Config_WatchService, error) {
req := c.c.NewRequest(c.name, "Service.Watch", &WatchRequest{}) req := c.c.NewRequest(c.name, "Config.Watch", &WatchRequest{})
stream, err := c.c.Stream(ctx, req, opts...) stream, err := c.c.Stream(ctx, req, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -119,33 +119,33 @@ func (c *service) Watch(ctx context.Context, in *WatchRequest, opts ...client.Ca
if err := stream.Send(in); err != nil { if err := stream.Send(in); err != nil {
return nil, err return nil, err
} }
return &serviceWatch{stream}, nil return &configServiceWatch{stream}, nil
} }
type Service_WatchService interface { type Config_WatchService interface {
SendMsg(interface{}) error SendMsg(interface{}) error
RecvMsg(interface{}) error RecvMsg(interface{}) error
Close() error Close() error
Recv() (*WatchResponse, error) Recv() (*WatchResponse, error)
} }
type serviceWatch struct { type configServiceWatch struct {
stream client.Stream stream client.Stream
} }
func (x *serviceWatch) Close() error { func (x *configServiceWatch) Close() error {
return x.stream.Close() return x.stream.Close()
} }
func (x *serviceWatch) SendMsg(m interface{}) error { func (x *configServiceWatch) SendMsg(m interface{}) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *serviceWatch) RecvMsg(m interface{}) error { func (x *configServiceWatch) RecvMsg(m interface{}) error {
return x.stream.Recv(m) return x.stream.Recv(m)
} }
func (x *serviceWatch) Recv() (*WatchResponse, error) { func (x *configServiceWatch) Recv() (*WatchResponse, error) {
m := new(WatchResponse) m := new(WatchResponse)
err := x.stream.Recv(m) err := x.stream.Recv(m)
if err != nil { if err != nil {
@ -154,19 +154,19 @@ func (x *serviceWatch) Recv() (*WatchResponse, error) {
return m, nil return m, nil
} }
// Server API for Service service // Server API for Config service
type ServiceHandler interface { type ConfigHandler interface {
Create(context.Context, *CreateRequest, *CreateResponse) error Create(context.Context, *CreateRequest, *CreateResponse) error
Update(context.Context, *UpdateRequest, *UpdateResponse) error Update(context.Context, *UpdateRequest, *UpdateResponse) error
Delete(context.Context, *DeleteRequest, *DeleteResponse) error Delete(context.Context, *DeleteRequest, *DeleteResponse) error
List(context.Context, *ListRequest, *ListResponse) error List(context.Context, *ListRequest, *ListResponse) error
Read(context.Context, *ReadRequest, *ReadResponse) error Read(context.Context, *ReadRequest, *ReadResponse) error
Watch(context.Context, *WatchRequest, Service_WatchStream) error Watch(context.Context, *WatchRequest, Config_WatchStream) error
} }
func RegisterServiceHandler(s server.Server, hdlr ServiceHandler, opts ...server.HandlerOption) error { func RegisterConfigHandler(s server.Server, hdlr ConfigHandler, opts ...server.HandlerOption) error {
type service interface { type config interface {
Create(ctx context.Context, in *CreateRequest, out *CreateResponse) error Create(ctx context.Context, in *CreateRequest, out *CreateResponse) error
Update(ctx context.Context, in *UpdateRequest, out *UpdateResponse) error Update(ctx context.Context, in *UpdateRequest, out *UpdateResponse) error
Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error
@ -174,68 +174,68 @@ func RegisterServiceHandler(s server.Server, hdlr ServiceHandler, opts ...server
Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error
Watch(ctx context.Context, stream server.Stream) error Watch(ctx context.Context, stream server.Stream) error
} }
type Service struct { type Config struct {
service config
} }
h := &serviceHandler{hdlr} h := &configHandler{hdlr}
return s.Handle(s.NewHandler(&Service{h}, opts...)) return s.Handle(s.NewHandler(&Config{h}, opts...))
} }
type serviceHandler struct { type configHandler struct {
ServiceHandler ConfigHandler
} }
func (h *serviceHandler) Create(ctx context.Context, in *CreateRequest, out *CreateResponse) error { func (h *configHandler) Create(ctx context.Context, in *CreateRequest, out *CreateResponse) error {
return h.ServiceHandler.Create(ctx, in, out) return h.ConfigHandler.Create(ctx, in, out)
} }
func (h *serviceHandler) Update(ctx context.Context, in *UpdateRequest, out *UpdateResponse) error { func (h *configHandler) Update(ctx context.Context, in *UpdateRequest, out *UpdateResponse) error {
return h.ServiceHandler.Update(ctx, in, out) return h.ConfigHandler.Update(ctx, in, out)
} }
func (h *serviceHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error { func (h *configHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error {
return h.ServiceHandler.Delete(ctx, in, out) return h.ConfigHandler.Delete(ctx, in, out)
} }
func (h *serviceHandler) List(ctx context.Context, in *ListRequest, out *ListResponse) error { func (h *configHandler) List(ctx context.Context, in *ListRequest, out *ListResponse) error {
return h.ServiceHandler.List(ctx, in, out) return h.ConfigHandler.List(ctx, in, out)
} }
func (h *serviceHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error { func (h *configHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error {
return h.ServiceHandler.Read(ctx, in, out) return h.ConfigHandler.Read(ctx, in, out)
} }
func (h *serviceHandler) Watch(ctx context.Context, stream server.Stream) error { func (h *configHandler) Watch(ctx context.Context, stream server.Stream) error {
m := new(WatchRequest) m := new(WatchRequest)
if err := stream.Recv(m); err != nil { if err := stream.Recv(m); err != nil {
return err return err
} }
return h.ServiceHandler.Watch(ctx, m, &serviceWatchStream{stream}) return h.ConfigHandler.Watch(ctx, m, &configWatchStream{stream})
} }
type Service_WatchStream interface { type Config_WatchStream interface {
SendMsg(interface{}) error SendMsg(interface{}) error
RecvMsg(interface{}) error RecvMsg(interface{}) error
Close() error Close() error
Send(*WatchResponse) error Send(*WatchResponse) error
} }
type serviceWatchStream struct { type configWatchStream struct {
stream server.Stream stream server.Stream
} }
func (x *serviceWatchStream) Close() error { func (x *configWatchStream) Close() error {
return x.stream.Close() return x.stream.Close()
} }
func (x *serviceWatchStream) SendMsg(m interface{}) error { func (x *configWatchStream) SendMsg(m interface{}) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *serviceWatchStream) RecvMsg(m interface{}) error { func (x *configWatchStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m) return x.stream.Recv(m)
} }
func (x *serviceWatchStream) Send(m *WatchResponse) error { func (x *configWatchStream) Send(m *WatchResponse) error {
return x.stream.Send(m) return x.stream.Send(m)
} }

View File

@ -1,12 +1,12 @@
syntax = "proto3"; syntax = "proto3";
service Service { service Config {
rpc Create (CreateRequest) returns (CreateResponse) {} rpc Create (CreateRequest) returns (CreateResponse) {}
rpc Update (UpdateRequest) returns (UpdateResponse) {} rpc Update (UpdateRequest) returns (UpdateResponse) {}
rpc Delete (DeleteRequest) returns (DeleteResponse) {} rpc Delete (DeleteRequest) returns (DeleteResponse) {}
rpc List (ListRequest) returns (ListResponse) {} rpc List (ListRequest) returns (ListResponse) {}
rpc Read (ReadRequest) returns (ReadResponse) {} rpc Read (ReadRequest) returns (ReadResponse) {}
rpc Watch (WatchRequest) returns (stream WatchResponse) {} rpc Watch (WatchRequest) returns (stream WatchResponse) {}
} }
message ChangeSet { message ChangeSet {
@ -64,4 +64,4 @@ message WatchRequest {
message WatchResponse { message WatchResponse {
string key = 1; string key = 1;
ChangeSet changeSet = 2; ChangeSet changeSet = 2;
} }

View File

@ -21,7 +21,7 @@ type service struct {
key string key string
path string path string
opts source.Options opts source.Options
client proto.Service client proto.ConfigService
} }
func (m *service) Read() (set *source.ChangeSet, err error) { func (m *service) Read() (set *source.ChangeSet, err error) {
@ -83,7 +83,7 @@ func NewSource(opts ...source.Option) source.Source {
opts: options, opts: options,
key: key, key: key,
path: path, path: path,
client: proto.NewService(addr, DefaultClient), client: proto.NewConfigService(addr, DefaultClient),
} }
return s return s

View File

@ -6,10 +6,10 @@ import (
) )
type watcher struct { type watcher struct {
stream proto.Service_WatchService stream proto.Config_WatchService
} }
func newWatcher(stream proto.Service_WatchService) (source.Watcher, error) { func newWatcher(stream proto.Config_WatchService) (source.Watcher, error) {
return &watcher{stream: stream}, nil return &watcher{stream: stream}, nil
} }

View File

@ -382,8 +382,23 @@ func (n *network) initNodes(startup bool) {
return return
} }
// strip self
var init []string
// our current address
advertised := n.server.Options().Advertise
for _, node := range nodes {
// skip self
if node == advertised {
continue
}
// add the node
init = append(init, node)
}
// initialize the tunnel // initialize the tunnel
log.Tracef("Network initialising nodes %+v\n", nodes) log.Tracef("Network initialising nodes %+v\n", init)
n.tunnel.Init( n.tunnel.Init(
tunnel.Nodes(nodes...), tunnel.Nodes(nodes...),
@ -640,6 +655,11 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
var events []*router.Event var events []*router.Event
for _, event := range pbRtrAdvert.Events { for _, event := range pbRtrAdvert.Events {
// for backwards compatibility reasons
if event == nil || event.Route == nil {
continue
}
// we know the advertising node is not the origin of the route // we know the advertising node is not the origin of the route
if pbRtrAdvert.Id != event.Route.Router { if pbRtrAdvert.Id != event.Route.Router {
// if the origin router is not the advertising node peer // if the origin router is not the advertising node peer

View File

@ -203,10 +203,6 @@ func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) {
// lookup the routes in the router // lookup the routes in the router
results, err := p.Router.Lookup(router.QueryService(service)) results, err := p.Router.Lookup(router.QueryService(service))
if err != nil { if err != nil {
// check the status of the router
if status := p.Router.Status(); status.Code == router.Error {
return nil, status.Error
}
// otherwise return the error // otherwise return the error
return nil, err return nil, err
} }

View File

@ -1,6 +1,7 @@
package router package router
import ( import (
"errors"
"fmt" "fmt"
"math" "math"
"sort" "sort"
@ -16,8 +17,6 @@ import (
var ( var (
// AdvertiseEventsTick is time interval in which the router advertises route updates // AdvertiseEventsTick is time interval in which the router advertises route updates
AdvertiseEventsTick = 10 * time.Second AdvertiseEventsTick = 10 * time.Second
// AdvertiseTableTick is time interval in which router advertises all routes found in routing table
AdvertiseTableTick = 2 * time.Minute
// DefaultAdvertTTL is default advertisement TTL // DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = 2 * time.Minute DefaultAdvertTTL = 2 * time.Minute
// AdvertSuppress is advert suppression threshold // AdvertSuppress is advert suppression threshold
@ -37,14 +36,12 @@ var (
// router implements default router // router implements default router
type router struct { type router struct {
sync.RWMutex sync.RWMutex
options Options
status Status running bool
table *table table *table
exit chan struct{} options Options
errChan chan error exit chan bool
eventChan chan *Event eventChan chan *Event
advertWg *sync.WaitGroup
wg *sync.WaitGroup
// advert subscribers // advert subscribers
sub sync.RWMutex sub sync.RWMutex
@ -61,15 +58,9 @@ func newRouter(opts ...Option) Router {
o(&options) o(&options)
} }
// set initial status to Stopped
status := Status{Code: Stopped, Error: nil}
return &router{ return &router{
options: options, options: options,
status: status,
table: newTable(), table: newTable(),
advertWg: &sync.WaitGroup{},
wg: &sync.WaitGroup{},
subscribers: make(map[string]chan *Advert), subscribers: make(map[string]chan *Advert),
} }
} }
@ -125,7 +116,7 @@ func (r *router) manageRoute(route Route, action string) error {
// manageServiceRoutes applies action to all routes of the service. // manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error. // It returns error of the action fails with error.
func (r *router) manageServiceRoutes(service *registry.Service, action string) error { func (r *router) manageRoutes(service *registry.Service, action string) error {
// action is the routing table action // action is the routing table action
action = strings.ToLower(action) action = strings.ToLower(action)
@ -166,7 +157,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
} }
// manage the routes for all returned services // manage the routes for all returned services
for _, srv := range srvs { for _, srv := range srvs {
if err := r.manageServiceRoutes(srv, action); err != nil { if err := r.manageRoutes(srv, action); err != nil {
return err return err
} }
} }
@ -181,42 +172,35 @@ func (r *router) watchRegistry(w registry.Watcher) error {
exit := make(chan bool) exit := make(chan bool)
defer func() { defer func() {
// close the exit channel when the go routine finishes
close(exit) close(exit)
}() }()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() { go func() {
defer w.Stop() defer w.Stop()
defer r.wg.Done()
select { select {
case <-r.exit:
return
case <-exit: case <-exit:
return return
case <-r.exit:
return
} }
}() }()
var watchErr error
for { for {
res, err := w.Next() res, err := w.Next()
if err != nil { if err != nil {
if err != registry.ErrWatcherStopped { if err != registry.ErrWatcherStopped {
watchErr = err return err
} }
break break
} }
if err := r.manageServiceRoutes(res.Service, res.Action); err != nil { if err := r.manageRoutes(res.Service, res.Action); err != nil {
return err return err
} }
} }
return watchErr return nil
} }
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
@ -225,16 +209,13 @@ func (r *router) watchTable(w Watcher) error {
exit := make(chan bool) exit := make(chan bool)
defer func() { defer func() {
// close the exit channel when the go routine finishes
close(exit) close(exit)
}() }()
// wait in the background for the router to stop // wait in the background for the router to stop
// when the router stops, stop the watcher and exit // when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() { go func() {
defer w.Stop() defer w.Stop()
defer r.wg.Done()
select { select {
case <-r.exit: case <-r.exit:
@ -244,13 +225,11 @@ func (r *router) watchTable(w Watcher) error {
} }
}() }()
var watchErr error
for { for {
event, err := w.Next() event, err := w.Next()
if err != nil { if err != nil {
if err != ErrWatcherStopped { if err != ErrWatcherStopped {
watchErr = err return err
} }
break break
} }
@ -260,13 +239,11 @@ func (r *router) watchTable(w Watcher) error {
close(r.eventChan) close(r.eventChan)
return nil return nil
case r.eventChan <- event: case r.eventChan <- event:
// process event
} }
} }
// close event channel on error return nil
close(r.eventChan)
return watchErr
} }
// publishAdvert publishes router advert to advert channel // publishAdvert publishes router advert to advert channel
@ -292,36 +269,6 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) {
r.sub.RUnlock() r.sub.RUnlock()
} }
// advertiseTable advertises the whole routing table to the network
func (r *router) advertiseTable() error {
// create table advertisement ticker
ticker := time.NewTicker(AdvertiseTableTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// do full table flush
events, err := r.flushRouteEvents(Update)
if err != nil {
return fmt.Errorf("failed flushing routes: %s", err)
}
// advertise routes to subscribers
if len(events) > 0 {
log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id)
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
}
case <-r.exit:
return nil
}
}
}
// advert contains a route event to be advertised // advert contains a route event to be advertised
type advert struct { type advert struct {
// event received from routing table // event received from routing table
@ -392,17 +339,39 @@ func (r *router) advertiseEvents() error {
adverts := make(adverts) adverts := make(adverts)
// routing table watcher // routing table watcher
tableWatcher, err := r.Watch() w, err := r.Watch()
if err != nil { if err != nil {
return fmt.Errorf("failed creating routing table watcher: %v", err) return err
} }
defer w.Stop()
r.wg.Add(1)
go func() { go func() {
defer r.wg.Done() var err error
select {
case r.errChan <- r.watchTable(tableWatcher): for {
case <-r.exit: select {
case <-r.exit:
return
default:
if w == nil {
// routing table watcher
w, err = r.Watch()
if err != nil {
log.Logf("Error creating watcher: %v", err)
time.Sleep(time.Second)
continue
}
}
if err := r.watchTable(w); err != nil {
log.Logf("Error watching table: %v", err)
time.Sleep(time.Second)
}
// reset
w.Stop()
w = nil
}
} }
}() }()
@ -446,11 +415,7 @@ func (r *router) advertiseEvents() error {
// advertise events to subscribers // advertise events to subscribers
if len(events) > 0 { if len(events) > 0 {
log.Debugf("Router publishing %d events", len(events)) log.Debugf("Router publishing %d events", len(events))
r.advertWg.Add(1) go r.publishAdvert(RouteUpdate, events)
go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
} }
case e := <-r.eventChan: case e := <-r.eventChan:
// if event is nil, continue // if event is nil, continue
@ -502,65 +467,19 @@ func (r *router) advertiseEvents() error {
a.penalty += Penalty a.penalty += Penalty
log.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty) log.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty)
case <-r.exit: case <-r.exit:
// first wait for the advertiser to finish w.Stop()
r.advertWg.Wait()
return nil return nil
} }
} }
} }
// close closes exit channels // drain all the events, only called on Stop
func (r *router) close() { func (r *router) drain() {
log.Debugf("Router closing remaining channels") for {
// drain the advertise channel only if advertising select {
if r.status.Code == Advertising { case <-r.eventChan:
// drain the event channel default:
for range r.eventChan { return
}
// close advert subscribers
for id, sub := range r.subscribers {
select {
case <-sub:
default:
}
// close the channel
close(sub)
// delete the subscriber
r.sub.Lock()
delete(r.subscribers, id)
r.sub.Unlock()
}
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
// watchErrors watches router errors and takes appropriate actions
func (r *router) watchErrors() {
var err error
select {
case <-r.exit:
return
case err = <-r.errChan:
}
r.Lock()
defer r.Unlock()
// if the router is not stopped, stop it
if r.status.Code != Stopped {
// notify all goroutines to finish
close(r.exit)
// close all the channels
r.close()
// set the status error
if err != nil {
r.status.Error = err
} }
} }
} }
@ -570,16 +489,13 @@ func (r *router) Start() error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
// only start if we're stopped if r.running {
if r.status.Code != Stopped {
return nil return nil
} }
// add all local service routes into the routing table // add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil { if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil {
e := fmt.Errorf("failed adding registry routes: %s", err) return fmt.Errorf("failed adding registry routes: %s", err)
r.status = Status{Code: Error, Error: e}
return e
} }
// add default gateway into routing table // add default gateway into routing table
@ -595,42 +511,49 @@ func (r *router) Start() error {
Metric: DefaultLocalMetric, Metric: DefaultLocalMetric,
} }
if err := r.table.Create(route); err != nil { if err := r.table.Create(route); err != nil {
e := fmt.Errorf("failed adding default gateway route: %s", err) return fmt.Errorf("failed adding default gateway route: %s", err)
r.status = Status{Code: Error, Error: e}
return e
} }
} }
// create error and exit channels // create error and exit channels
r.errChan = make(chan error, 1) r.exit = make(chan bool)
r.exit = make(chan struct{})
// registry watcher // registry watcher
regWatcher, err := r.options.Registry.Watch() w, err := r.options.Registry.Watch()
if err != nil { if err != nil {
e := fmt.Errorf("failed creating registry watcher: %v", err) return fmt.Errorf("failed creating registry watcher: %v", err)
r.status = Status{Code: Error, Error: e}
return e
} }
r.wg.Add(1)
go func() { go func() {
defer r.wg.Done() var err error
select {
case r.errChan <- r.watchRegistry(regWatcher): for {
case <-r.exit: select {
case <-r.exit:
w.Stop()
return
default:
if w == nil {
w, err = r.options.Registry.Watch()
if err != nil {
log.Logf("failed creating registry watcher: %v", err)
time.Sleep(time.Second)
continue
}
}
if err := r.watchRegistry(w); err != nil {
log.Logf("Error watching the registry: %v", err)
time.Sleep(time.Second)
}
w.Stop()
w = nil
}
} }
}() }()
// watch for errors and cleanup r.running = true
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.watchErrors()
}()
// mark router as Running
r.status = Status{Code: Running, Error: nil}
return nil return nil
} }
@ -642,61 +565,46 @@ func (r *router) Advertise() (<-chan *Advert, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
switch r.status.Code { if !r.running {
case Advertising: return nil, errors.New("not running")
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil
case Running:
// list all the routes and pack them into even slice to advertise
events, err := r.flushRouteEvents(Create)
if err != nil {
return nil, fmt.Errorf("failed to flush routes: %s", err)
}
// create event channels
r.eventChan = make(chan *Event)
// create advert channel
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
// advertise your presence
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(Announce, events)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.advertiseEvents():
case <-r.exit:
}
}()
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
// advertise the whole routing table
select {
case r.errChan <- r.advertiseTable():
case <-r.exit:
}
}()
// mark router as Running and set its Error to nil
r.status = Status{Code: Advertising, Error: nil}
log.Debugf("Router starting to advertise")
return advertChan, nil
case Stopped:
return nil, fmt.Errorf("not running")
} }
return nil, fmt.Errorf("error: %s", r.status.Error) // already advertising
if r.eventChan != nil {
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil
}
// list all the routes and pack them into even slice to advertise
events, err := r.flushRouteEvents(Create)
if err != nil {
return nil, fmt.Errorf("failed to flush routes: %s", err)
}
// create event channels
r.eventChan = make(chan *Event)
// create advert channel
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
// advertise your presence
go r.publishAdvert(Announce, events)
go func() {
select {
case <-r.exit:
return
default:
if err := r.advertiseEvents(); err != nil {
log.Logf("Error adveritising events: %v", err)
}
}
}()
return advertChan, nil
} }
// Process updates the routing table using the advertised values // Process updates the routing table using the advertised values
@ -774,48 +682,39 @@ func (r *router) Watch(opts ...WatchOption) (Watcher, error) {
return r.table.Watch(opts...) return r.table.Watch(opts...)
} }
// Status returns router status
func (r *router) Status() Status {
r.RLock()
defer r.RUnlock()
// make a copy of the status
status := r.status
return status
}
// Stop stops the router // Stop stops the router
func (r *router) Stop() error { func (r *router) Stop() error {
r.Lock() r.Lock()
defer r.Unlock()
log.Debugf("Router shutting down") select {
case <-r.exit:
switch r.status.Code { return nil
case Stopped, Error: default:
r.Unlock()
return r.status.Error
case Running, Advertising:
// notify all goroutines to finish
close(r.exit) close(r.exit)
// close all the channels // extract the events
// NOTE: close marks the router status as Stopped r.drain()
r.close()
// close advert subscribers
for id, sub := range r.subscribers {
// close the channel
close(sub)
// delete the subscriber
r.sub.Lock()
delete(r.subscribers, id)
r.sub.Unlock()
}
} }
r.Unlock()
log.Tracef("Router waiting for all goroutines to finish") // remove event chan
r.eventChan = nil
// wait for all goroutines to finish
r.wg.Wait()
log.Debugf("Router successfully stopped")
return nil return nil
} }
// String prints debugging information about router // String prints debugging information about router
func (r *router) String() string { func (r *router) String() string {
return "memory" return "registry"
} }

View File

@ -38,7 +38,6 @@ func TestRouterAdvertise(t *testing.T) {
// lower the advertise interval // lower the advertise interval
AdvertiseEventsTick = 500 * time.Millisecond AdvertiseEventsTick = 500 * time.Millisecond
AdvertiseTableTick = 1 * time.Second
if err := r.Start(); err != nil { if err := r.Start(); err != nil {
t.Errorf("failed to start router: %v", err) t.Errorf("failed to start router: %v", err)

View File

@ -34,8 +34,6 @@ type Router interface {
Watch(opts ...WatchOption) (Watcher, error) Watch(opts ...WatchOption) (Watcher, error)
// Start starts the router // Start starts the router
Start() error Start() error
// Status returns router status
Status() Status
// Stop stops the router // Stop stops the router
Stop() error Stop() error
// Returns the router implementation // Returns the router implementation
@ -73,34 +71,6 @@ const (
Error Error
) )
func (s StatusCode) String() string {
switch s {
case Running:
return "running"
case Advertising:
return "advertising"
case Stopped:
return "stopped"
case Error:
return "error"
default:
return "unknown"
}
}
// Status is router status
type Status struct {
// Code defines router status
Code StatusCode
// Error contains error description
Error error
}
// String returns human readable status
func (s Status) String() string {
return s.Code.String()
}
// AdvertType is route advertisement type // AdvertType is route advertisement type
type AdvertType int type AdvertType int

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// source: router.proto // source: micro/go-micro/router/service/proto/router.proto
package go_micro_router package go_micro_router
@ -43,7 +43,7 @@ func (x AdvertType) String() string {
} }
func (AdvertType) EnumDescriptor() ([]byte, []int) { func (AdvertType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{0} return fileDescriptor_c2b04f200fb3e806, []int{0}
} }
// EventType defines the type of event // EventType defines the type of event
@ -72,7 +72,7 @@ func (x EventType) String() string {
} }
func (EventType) EnumDescriptor() ([]byte, []int) { func (EventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{1} return fileDescriptor_c2b04f200fb3e806, []int{1}
} }
// Empty request // Empty request
@ -86,7 +86,7 @@ func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) } func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {} func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) { func (*Request) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{0} return fileDescriptor_c2b04f200fb3e806, []int{0}
} }
func (m *Request) XXX_Unmarshal(b []byte) error { func (m *Request) XXX_Unmarshal(b []byte) error {
@ -118,7 +118,7 @@ func (m *Response) Reset() { *m = Response{} }
func (m *Response) String() string { return proto.CompactTextString(m) } func (m *Response) String() string { return proto.CompactTextString(m) }
func (*Response) ProtoMessage() {} func (*Response) ProtoMessage() {}
func (*Response) Descriptor() ([]byte, []int) { func (*Response) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{1} return fileDescriptor_c2b04f200fb3e806, []int{1}
} }
func (m *Response) XXX_Unmarshal(b []byte) error { func (m *Response) XXX_Unmarshal(b []byte) error {
@ -151,7 +151,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {} func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) { func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{2} return fileDescriptor_c2b04f200fb3e806, []int{2}
} }
func (m *ListResponse) XXX_Unmarshal(b []byte) error { func (m *ListResponse) XXX_Unmarshal(b []byte) error {
@ -191,7 +191,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} }
func (m *LookupRequest) String() string { return proto.CompactTextString(m) } func (m *LookupRequest) String() string { return proto.CompactTextString(m) }
func (*LookupRequest) ProtoMessage() {} func (*LookupRequest) ProtoMessage() {}
func (*LookupRequest) Descriptor() ([]byte, []int) { func (*LookupRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{3} return fileDescriptor_c2b04f200fb3e806, []int{3}
} }
func (m *LookupRequest) XXX_Unmarshal(b []byte) error { func (m *LookupRequest) XXX_Unmarshal(b []byte) error {
@ -231,7 +231,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} }
func (m *LookupResponse) String() string { return proto.CompactTextString(m) } func (m *LookupResponse) String() string { return proto.CompactTextString(m) }
func (*LookupResponse) ProtoMessage() {} func (*LookupResponse) ProtoMessage() {}
func (*LookupResponse) Descriptor() ([]byte, []int) { func (*LookupResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{4} return fileDescriptor_c2b04f200fb3e806, []int{4}
} }
func (m *LookupResponse) XXX_Unmarshal(b []byte) error { func (m *LookupResponse) XXX_Unmarshal(b []byte) error {
@ -271,7 +271,7 @@ func (m *QueryRequest) Reset() { *m = QueryRequest{} }
func (m *QueryRequest) String() string { return proto.CompactTextString(m) } func (m *QueryRequest) String() string { return proto.CompactTextString(m) }
func (*QueryRequest) ProtoMessage() {} func (*QueryRequest) ProtoMessage() {}
func (*QueryRequest) Descriptor() ([]byte, []int) { func (*QueryRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{5} return fileDescriptor_c2b04f200fb3e806, []int{5}
} }
func (m *QueryRequest) XXX_Unmarshal(b []byte) error { func (m *QueryRequest) XXX_Unmarshal(b []byte) error {
@ -311,7 +311,7 @@ func (m *QueryResponse) Reset() { *m = QueryResponse{} }
func (m *QueryResponse) String() string { return proto.CompactTextString(m) } func (m *QueryResponse) String() string { return proto.CompactTextString(m) }
func (*QueryResponse) ProtoMessage() {} func (*QueryResponse) ProtoMessage() {}
func (*QueryResponse) Descriptor() ([]byte, []int) { func (*QueryResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{6} return fileDescriptor_c2b04f200fb3e806, []int{6}
} }
func (m *QueryResponse) XXX_Unmarshal(b []byte) error { func (m *QueryResponse) XXX_Unmarshal(b []byte) error {
@ -350,7 +350,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) { func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{7} return fileDescriptor_c2b04f200fb3e806, []int{7}
} }
func (m *WatchRequest) XXX_Unmarshal(b []byte) error { func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
@ -392,7 +392,7 @@ func (m *Advert) Reset() { *m = Advert{} }
func (m *Advert) String() string { return proto.CompactTextString(m) } func (m *Advert) String() string { return proto.CompactTextString(m) }
func (*Advert) ProtoMessage() {} func (*Advert) ProtoMessage() {}
func (*Advert) Descriptor() ([]byte, []int) { func (*Advert) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{8} return fileDescriptor_c2b04f200fb3e806, []int{8}
} }
func (m *Advert) XXX_Unmarshal(b []byte) error { func (m *Advert) XXX_Unmarshal(b []byte) error {
@ -459,7 +459,7 @@ func (m *ProcessResponse) Reset() { *m = ProcessResponse{} }
func (m *ProcessResponse) String() string { return proto.CompactTextString(m) } func (m *ProcessResponse) String() string { return proto.CompactTextString(m) }
func (*ProcessResponse) ProtoMessage() {} func (*ProcessResponse) ProtoMessage() {}
func (*ProcessResponse) Descriptor() ([]byte, []int) { func (*ProcessResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{9} return fileDescriptor_c2b04f200fb3e806, []int{9}
} }
func (m *ProcessResponse) XXX_Unmarshal(b []byte) error { func (m *ProcessResponse) XXX_Unmarshal(b []byte) error {
@ -491,7 +491,7 @@ func (m *CreateResponse) Reset() { *m = CreateResponse{} }
func (m *CreateResponse) String() string { return proto.CompactTextString(m) } func (m *CreateResponse) String() string { return proto.CompactTextString(m) }
func (*CreateResponse) ProtoMessage() {} func (*CreateResponse) ProtoMessage() {}
func (*CreateResponse) Descriptor() ([]byte, []int) { func (*CreateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{10} return fileDescriptor_c2b04f200fb3e806, []int{10}
} }
func (m *CreateResponse) XXX_Unmarshal(b []byte) error { func (m *CreateResponse) XXX_Unmarshal(b []byte) error {
@ -523,7 +523,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} }
func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) ProtoMessage() {}
func (*DeleteResponse) Descriptor() ([]byte, []int) { func (*DeleteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{11} return fileDescriptor_c2b04f200fb3e806, []int{11}
} }
func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { func (m *DeleteResponse) XXX_Unmarshal(b []byte) error {
@ -555,7 +555,7 @@ func (m *UpdateResponse) Reset() { *m = UpdateResponse{} }
func (m *UpdateResponse) String() string { return proto.CompactTextString(m) } func (m *UpdateResponse) String() string { return proto.CompactTextString(m) }
func (*UpdateResponse) ProtoMessage() {} func (*UpdateResponse) ProtoMessage() {}
func (*UpdateResponse) Descriptor() ([]byte, []int) { func (*UpdateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{12} return fileDescriptor_c2b04f200fb3e806, []int{12}
} }
func (m *UpdateResponse) XXX_Unmarshal(b []byte) error { func (m *UpdateResponse) XXX_Unmarshal(b []byte) error {
@ -578,12 +578,14 @@ var xxx_messageInfo_UpdateResponse proto.InternalMessageInfo
// Event is routing table event // Event is routing table event
type Event struct { type Event struct {
// the unique event id
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// type of event // type of event
Type EventType `protobuf:"varint,1,opt,name=type,proto3,enum=go.micro.router.EventType" json:"type,omitempty"` Type EventType `protobuf:"varint,2,opt,name=type,proto3,enum=go.micro.router.EventType" json:"type,omitempty"`
// unix timestamp of event // unix timestamp of event
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// service route // service route
Route *Route `protobuf:"bytes,3,opt,name=route,proto3" json:"route,omitempty"` Route *Route `protobuf:"bytes,4,opt,name=route,proto3" json:"route,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -593,7 +595,7 @@ func (m *Event) Reset() { *m = Event{} }
func (m *Event) String() string { return proto.CompactTextString(m) } func (m *Event) String() string { return proto.CompactTextString(m) }
func (*Event) ProtoMessage() {} func (*Event) ProtoMessage() {}
func (*Event) Descriptor() ([]byte, []int) { func (*Event) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{13} return fileDescriptor_c2b04f200fb3e806, []int{13}
} }
func (m *Event) XXX_Unmarshal(b []byte) error { func (m *Event) XXX_Unmarshal(b []byte) error {
@ -614,6 +616,13 @@ func (m *Event) XXX_DiscardUnknown() {
var xxx_messageInfo_Event proto.InternalMessageInfo var xxx_messageInfo_Event proto.InternalMessageInfo
func (m *Event) GetId() string {
if m != nil {
return m.Id
}
return ""
}
func (m *Event) GetType() EventType { func (m *Event) GetType() EventType {
if m != nil { if m != nil {
return m.Type return m.Type
@ -652,7 +661,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) } func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {} func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{14} return fileDescriptor_c2b04f200fb3e806, []int{14}
} }
func (m *Query) XXX_Unmarshal(b []byte) error { func (m *Query) XXX_Unmarshal(b []byte) error {
@ -719,7 +728,7 @@ func (m *Route) Reset() { *m = Route{} }
func (m *Route) String() string { return proto.CompactTextString(m) } func (m *Route) String() string { return proto.CompactTextString(m) }
func (*Route) ProtoMessage() {} func (*Route) ProtoMessage() {}
func (*Route) Descriptor() ([]byte, []int) { func (*Route) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{15} return fileDescriptor_c2b04f200fb3e806, []int{15}
} }
func (m *Route) XXX_Unmarshal(b []byte) error { func (m *Route) XXX_Unmarshal(b []byte) error {
@ -789,92 +798,6 @@ func (m *Route) GetMetric() int64 {
return 0 return 0
} }
type Status struct {
Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"`
Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Status) Reset() { *m = Status{} }
func (m *Status) String() string { return proto.CompactTextString(m) }
func (*Status) ProtoMessage() {}
func (*Status) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{16}
}
func (m *Status) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Status.Unmarshal(m, b)
}
func (m *Status) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Status.Marshal(b, m, deterministic)
}
func (m *Status) XXX_Merge(src proto.Message) {
xxx_messageInfo_Status.Merge(m, src)
}
func (m *Status) XXX_Size() int {
return xxx_messageInfo_Status.Size(m)
}
func (m *Status) XXX_DiscardUnknown() {
xxx_messageInfo_Status.DiscardUnknown(m)
}
var xxx_messageInfo_Status proto.InternalMessageInfo
func (m *Status) GetCode() string {
if m != nil {
return m.Code
}
return ""
}
func (m *Status) GetError() string {
if m != nil {
return m.Error
}
return ""
}
type StatusResponse struct {
Status *Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StatusResponse) Reset() { *m = StatusResponse{} }
func (m *StatusResponse) String() string { return proto.CompactTextString(m) }
func (*StatusResponse) ProtoMessage() {}
func (*StatusResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{17}
}
func (m *StatusResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StatusResponse.Unmarshal(m, b)
}
func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic)
}
func (m *StatusResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_StatusResponse.Merge(m, src)
}
func (m *StatusResponse) XXX_Size() int {
return xxx_messageInfo_StatusResponse.Size(m)
}
func (m *StatusResponse) XXX_DiscardUnknown() {
xxx_messageInfo_StatusResponse.DiscardUnknown(m)
}
var xxx_messageInfo_StatusResponse proto.InternalMessageInfo
func (m *StatusResponse) GetStatus() *Status {
if m != nil {
return m.Status
}
return nil
}
func init() { func init() {
proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value) proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value)
proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value) proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value)
@ -894,56 +817,53 @@ func init() {
proto.RegisterType((*Event)(nil), "go.micro.router.Event") proto.RegisterType((*Event)(nil), "go.micro.router.Event")
proto.RegisterType((*Query)(nil), "go.micro.router.Query") proto.RegisterType((*Query)(nil), "go.micro.router.Query")
proto.RegisterType((*Route)(nil), "go.micro.router.Route") proto.RegisterType((*Route)(nil), "go.micro.router.Route")
proto.RegisterType((*Status)(nil), "go.micro.router.Status")
proto.RegisterType((*StatusResponse)(nil), "go.micro.router.StatusResponse")
} }
func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) } func init() {
proto.RegisterFile("micro/go-micro/router/service/proto/router.proto", fileDescriptor_c2b04f200fb3e806)
var fileDescriptor_367072455c71aedc = []byte{ }
// 693 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4f, 0x4f, 0xdb, 0x4a, var fileDescriptor_c2b04f200fb3e806 = []byte{
0x10, 0xb7, 0x93, 0xd8, 0x79, 0x99, 0x17, 0x8c, 0xdf, 0xe8, 0x09, 0xac, 0xb4, 0x40, 0xe4, 0x13, // 645 bytes of a gzipped FileDescriptorProto
0x42, 0xc8, 0x54, 0xe9, 0xb5, 0xff, 0x02, 0xa5, 0xaa, 0x54, 0x0e, 0xad, 0x0b, 0xea, 0xd9, 0xd8, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcd, 0x4e, 0xdb, 0x40,
0x23, 0x6a, 0x91, 0xd8, 0x66, 0x77, 0x03, 0xca, 0xb9, 0x9f, 0xa6, 0xe7, 0x7e, 0xa4, 0x5e, 0xfb, 0x10, 0xb6, 0x93, 0xd8, 0xa9, 0xa7, 0x21, 0xb8, 0x73, 0xa0, 0x56, 0x5a, 0x68, 0xe4, 0x13, 0x42,
0x21, 0x2a, 0xef, 0xae, 0x43, 0x88, 0x31, 0x12, 0x9c, 0xbc, 0xf3, 0xef, 0x37, 0xb3, 0x3b, 0xbf, 0xd4, 0x41, 0xe9, 0xb5, 0x7f, 0x40, 0x5b, 0x55, 0x2a, 0x87, 0xd6, 0x02, 0xf5, 0x6c, 0x92, 0x11,
0x19, 0x43, 0x9f, 0xe5, 0x33, 0x41, 0x2c, 0x28, 0x58, 0x2e, 0x72, 0x5c, 0xbf, 0xc8, 0x83, 0x69, 0xb5, 0x48, 0x6c, 0xb3, 0xbb, 0x01, 0xe5, 0x39, 0xfa, 0x0c, 0x3d, 0xf4, 0xdc, 0x47, 0xea, 0x8b,
0x1a, 0xb3, 0x3c, 0x50, 0x6a, 0xbf, 0x07, 0xdd, 0x90, 0xae, 0x66, 0xc4, 0x85, 0x0f, 0xf0, 0x4f, 0x54, 0xfb, 0x63, 0x48, 0x62, 0x8c, 0x0a, 0x27, 0xef, 0xfc, 0x7d, 0x33, 0x3b, 0x33, 0x9f, 0x17,
0x48, 0xbc, 0xc8, 0x33, 0x4e, 0xfe, 0x1b, 0xe8, 0x9f, 0xa4, 0x5c, 0x54, 0x32, 0x06, 0x60, 0xcb, 0xf6, 0xa6, 0xe9, 0x88, 0xe5, 0x83, 0xb3, 0xfc, 0xa5, 0x3e, 0xb0, 0x7c, 0x26, 0x88, 0x0d, 0x38,
0x00, 0xee, 0x99, 0xc3, 0xf6, 0xee, 0xbf, 0xa3, 0x8d, 0x60, 0x05, 0x28, 0x08, 0xcb, 0x4f, 0xa8, 0xb1, 0xcb, 0x74, 0x44, 0x83, 0x82, 0xe5, 0xa2, 0x54, 0x46, 0x4a, 0xc0, 0xf5, 0xb3, 0x3c, 0x52,
0xbd, 0xfc, 0xd7, 0xb0, 0x76, 0x92, 0xe7, 0x97, 0xb3, 0x42, 0x83, 0xe3, 0x3e, 0x58, 0x57, 0x33, 0xbe, 0x91, 0x56, 0x87, 0x1e, 0xb4, 0x63, 0xba, 0x98, 0x11, 0x17, 0x21, 0xc0, 0xa3, 0x98, 0x78,
0x62, 0x73, 0xcf, 0x1c, 0x9a, 0xf7, 0xc6, 0x7f, 0x29, 0xad, 0xa1, 0x72, 0xf2, 0xdf, 0x81, 0x53, 0x91, 0x67, 0x9c, 0xc2, 0xb7, 0xd0, 0x39, 0x4a, 0xb9, 0x28, 0x65, 0x8c, 0xc0, 0x55, 0x01, 0x3c,
0x85, 0x3f, 0xb1, 0x80, 0x57, 0xd0, 0x57, 0x88, 0x4f, 0xca, 0xff, 0x16, 0xd6, 0x74, 0xf4, 0x13, 0xb0, 0xfb, 0xcd, 0xed, 0xc7, 0xc3, 0x8d, 0x68, 0x05, 0x28, 0x8a, 0xe5, 0x27, 0x36, 0x5e, 0xe1,
0xd3, 0x3b, 0xd0, 0xff, 0x16, 0x89, 0xf8, 0x7b, 0xf5, 0xb6, 0x3f, 0x4d, 0xb0, 0xc7, 0xc9, 0x35, 0x1b, 0x58, 0x3b, 0xca, 0xf3, 0xf3, 0x59, 0x61, 0xc0, 0x71, 0x17, 0x9c, 0x8b, 0x19, 0xb1, 0x79,
0x31, 0x81, 0x0e, 0xb4, 0xd2, 0x44, 0x96, 0xd1, 0x0b, 0x5b, 0x69, 0x82, 0x07, 0xd0, 0x11, 0xf3, 0x60, 0xf7, 0xed, 0x5b, 0xe3, 0xbf, 0x49, 0x6b, 0xac, 0x9d, 0xc2, 0xf7, 0xd0, 0x2d, 0xc3, 0x1f,
0x82, 0xbc, 0xd6, 0xd0, 0xdc, 0x75, 0x46, 0xcf, 0x6a, 0xc0, 0x2a, 0xec, 0x74, 0x5e, 0x50, 0x28, 0x58, 0xc0, 0x6b, 0xe8, 0x68, 0xc4, 0x07, 0xe5, 0x7f, 0x07, 0x6b, 0x26, 0xfa, 0x81, 0xe9, 0xbb,
0x1d, 0xf1, 0x39, 0xf4, 0x44, 0x3a, 0x25, 0x2e, 0xa2, 0x69, 0xe1, 0xb5, 0x87, 0xe6, 0x6e, 0x3b, 0xd0, 0xf9, 0x9e, 0x88, 0xd1, 0x8f, 0xb2, 0xb7, 0xbf, 0x6d, 0x70, 0xf7, 0xc7, 0x97, 0xc4, 0x04,
0xbc, 0x55, 0xa0, 0x0b, 0x6d, 0x21, 0x26, 0x5e, 0x47, 0xea, 0xcb, 0x63, 0x59, 0x3b, 0x5d, 0x53, 0x76, 0xa1, 0x91, 0x8e, 0x55, 0x19, 0x5e, 0xdc, 0x48, 0xc7, 0x38, 0x80, 0x96, 0x98, 0x17, 0x14,
0x26, 0xb8, 0x67, 0x35, 0xd4, 0x7e, 0x5c, 0x9a, 0x43, 0xed, 0xe5, 0xff, 0x07, 0xeb, 0x9f, 0x59, 0x34, 0xfa, 0xf6, 0x76, 0x77, 0xf8, 0xac, 0x02, 0xac, 0xc3, 0x8e, 0xe7, 0x05, 0xc5, 0xca, 0x11,
0x1e, 0x13, 0xe7, 0x0b, 0x3a, 0xb8, 0xe0, 0x1c, 0x31, 0x8a, 0x04, 0x2d, 0x6b, 0xde, 0xd3, 0x84, 0x9f, 0x83, 0x27, 0xd2, 0x29, 0x71, 0x91, 0x4c, 0x8b, 0xa0, 0xd9, 0xb7, 0xb7, 0x9b, 0xf1, 0x8d,
0xee, 0x6a, 0xce, 0x8a, 0x64, 0xd9, 0xe7, 0x87, 0x09, 0x96, 0x84, 0xc6, 0x40, 0xdf, 0xd1, 0x94, 0x02, 0x7d, 0x68, 0x0a, 0x31, 0x09, 0x5a, 0x4a, 0x2f, 0x8f, 0xb2, 0x76, 0xba, 0xa4, 0x4c, 0xf0,
0x77, 0x1c, 0xdc, 0x5f, 0x40, 0xd3, 0x15, 0x5b, 0xab, 0x57, 0xdc, 0x07, 0x4b, 0xc6, 0xc9, 0xcb, 0xc0, 0xa9, 0xa9, 0xfd, 0xa3, 0x34, 0xc7, 0xc6, 0x2b, 0x7c, 0x02, 0xeb, 0x5f, 0x59, 0x3e, 0x22,
0x37, 0xf7, 0x42, 0x39, 0xf9, 0x67, 0x60, 0xc9, 0x5e, 0xa2, 0x07, 0x5d, 0x4e, 0xec, 0x3a, 0x8d, 0xce, 0xaf, 0xd7, 0xc1, 0x87, 0xee, 0x21, 0xa3, 0x44, 0xd0, 0xa2, 0xe6, 0x03, 0x4d, 0x68, 0x59,
0x49, 0xbf, 0x7e, 0x25, 0x96, 0x96, 0x8b, 0x48, 0xd0, 0x4d, 0x34, 0x97, 0xc9, 0x7a, 0x61, 0x25, 0x73, 0x52, 0x8c, 0x17, 0x7d, 0x7e, 0xda, 0xe0, 0x28, 0xe8, 0xca, 0x9d, 0xa3, 0xa5, 0x3b, 0xf7,
0x96, 0x96, 0x8c, 0xc4, 0x4d, 0xce, 0x2e, 0x65, 0xb2, 0x5e, 0x58, 0x89, 0xfe, 0x2f, 0x13, 0x2c, 0x6e, 0x2f, 0xe8, 0xbf, 0xaf, 0xbc, 0x0b, 0x8e, 0x8a, 0x53, 0x97, 0xae, 0x9f, 0x8d, 0x76, 0x0a,
0x99, 0xe7, 0x61, 0xdc, 0x28, 0x49, 0x18, 0x71, 0x5e, 0xe1, 0x6a, 0x71, 0x39, 0x63, 0xbb, 0x31, 0x4f, 0xc0, 0x51, 0xb3, 0xc5, 0x00, 0xda, 0x86, 0x29, 0xa6, 0xb2, 0x52, 0x94, 0x96, 0xb3, 0x44,
0x63, 0xe7, 0x4e, 0x46, 0xdc, 0xd0, 0x1c, 0x64, 0x9e, 0x25, 0x0d, 0x5a, 0x42, 0x84, 0xce, 0x24, 0xd0, 0x55, 0x32, 0x57, 0x15, 0x7a, 0x71, 0x29, 0x4a, 0x4b, 0x46, 0xe2, 0x2a, 0x67, 0xe7, 0xaa,
0xcd, 0x2e, 0x3d, 0x5b, 0x6a, 0xe5, 0xb9, 0xf4, 0x9d, 0x92, 0x60, 0x69, 0xec, 0x75, 0xe5, 0xeb, 0x0c, 0x2f, 0x2e, 0xc5, 0xf0, 0x8f, 0x0d, 0x8e, 0xca, 0x73, 0x37, 0x6e, 0x32, 0x1e, 0x33, 0xe2,
0x69, 0xc9, 0x1f, 0x81, 0xfd, 0x55, 0x44, 0x62, 0xc6, 0xcb, 0xa8, 0x38, 0x4f, 0xaa, 0x92, 0xe5, 0xbc, 0xc4, 0x35, 0xe2, 0x62, 0xc6, 0x66, 0x6d, 0xc6, 0xd6, 0x52, 0x46, 0xdc, 0x30, 0x3b, 0xc9,
0x19, 0xff, 0x07, 0x8b, 0x18, 0xcb, 0x99, 0xae, 0x56, 0x09, 0xfe, 0x18, 0x1c, 0x15, 0xb3, 0x98, 0x02, 0x47, 0x19, 0x8c, 0x84, 0x08, 0xad, 0x49, 0x9a, 0x9d, 0x07, 0xae, 0xd2, 0xaa, 0xb3, 0xf4,
0x86, 0x03, 0xb0, 0xb9, 0xd4, 0xe8, 0x69, 0xda, 0xac, 0x75, 0x40, 0x07, 0x68, 0xb7, 0xbd, 0x11, 0x9d, 0x92, 0x60, 0xe9, 0x28, 0x68, 0xab, 0xee, 0x19, 0x69, 0x67, 0x08, 0x70, 0xb3, 0x5f, 0x88,
0xc0, 0x2d, 0x8d, 0x11, 0xc1, 0x51, 0xd2, 0x38, 0xcb, 0xf2, 0x59, 0x16, 0x93, 0x6b, 0xa0, 0x0b, 0xd0, 0xd5, 0xd2, 0x7e, 0x96, 0xe5, 0xb3, 0x6c, 0x44, 0xbe, 0x85, 0x3e, 0x74, 0xb4, 0x4e, 0x0f,
0x7d, 0xa5, 0x53, 0x1c, 0x72, 0xcd, 0xbd, 0x03, 0xe8, 0x2d, 0x68, 0x81, 0x00, 0xb6, 0x22, 0xa0, 0xd7, 0xb7, 0x77, 0x06, 0xe0, 0x5d, 0xcf, 0x07, 0x01, 0x5c, 0xbd, 0x19, 0xbe, 0x25, 0xcf, 0x7a,
0x6b, 0x94, 0x67, 0x45, 0x3d, 0xd7, 0x2c, 0xcf, 0x3a, 0xa0, 0x35, 0xfa, 0xd3, 0x02, 0x3b, 0x54, 0x27, 0x7c, 0x5b, 0x9e, 0x4d, 0x40, 0x63, 0xf8, 0xab, 0x01, 0x6e, 0xac, 0x6b, 0xfb, 0x02, 0xae,
0x4f, 0xf2, 0x09, 0x6c, 0xb5, 0x3f, 0x70, 0xbb, 0x56, 0xda, 0x9d, 0xbd, 0x34, 0xd8, 0x69, 0xb4, 0x26, 0x36, 0x6e, 0x55, 0xa6, 0xb4, 0xf4, 0xc3, 0xe8, 0xbd, 0xa8, 0xb5, 0x9b, 0xed, 0xb2, 0xf0,
0x6b, 0x12, 0x1b, 0x78, 0x08, 0x96, 0x9c, 0x65, 0xdc, 0xaa, 0xf9, 0x2e, 0xcf, 0xf8, 0xa0, 0x61, 0x00, 0x1c, 0x45, 0x32, 0xdc, 0xac, 0xf8, 0x2e, 0x92, 0xaf, 0x57, 0xb3, 0xf0, 0xa1, 0xb5, 0x67,
0xae, 0x7c, 0xe3, 0x85, 0x89, 0x87, 0xd0, 0x53, 0xd7, 0x4b, 0x39, 0xa1, 0x57, 0x27, 0xac, 0x86, 0xe3, 0x01, 0x78, 0xfa, 0x7a, 0x29, 0x27, 0x0c, 0xaa, 0x9b, 0x63, 0x20, 0x9e, 0xd6, 0xd0, 0x52,
0xd8, 0x6c, 0x98, 0x7e, 0x89, 0xf1, 0x01, 0xba, 0x7a, 0x2e, 0xb1, 0xc9, 0x6f, 0x30, 0xac, 0x19, 0x61, 0x7c, 0x82, 0xb6, 0x21, 0x0c, 0xd6, 0xf9, 0xf5, 0xfa, 0x15, 0xc3, 0x2a, 0xc7, 0xac, 0xe1,
0x56, 0x47, 0xd9, 0xc0, 0xe3, 0x05, 0x07, 0x9a, 0x0b, 0xd9, 0x69, 0xea, 0xe8, 0x02, 0x66, 0xf4, 0xdf, 0x06, 0x38, 0xc7, 0xc9, 0xe9, 0x84, 0xf0, 0xb0, 0xec, 0x2a, 0xd6, 0x2c, 0xf3, 0x2d, 0xed,
0xbb, 0x05, 0xd6, 0x69, 0x74, 0x3e, 0x21, 0x3c, 0xaa, 0x9a, 0x83, 0x0d, 0xa3, 0x78, 0x0f, 0xdc, 0x59, 0x21, 0xa8, 0x25, 0x41, 0xf4, 0x38, 0xee, 0x01, 0xb2, 0xc2, 0x69, 0x05, 0xa2, 0xe7, 0x78,
0xca, 0x3a, 0x31, 0x4a, 0x10, 0xd5, 0xd5, 0x47, 0x80, 0xac, 0x6c, 0x20, 0x09, 0xa2, 0xe8, 0xf0, 0x0f, 0x90, 0x95, 0xdf, 0x80, 0x85, 0xfb, 0xd0, 0x92, 0xaf, 0xc9, 0x1d, 0xfd, 0xad, 0x4e, 0x70,
0x08, 0x90, 0x95, 0xa5, 0x65, 0xe0, 0x18, 0x3a, 0xe5, 0xbf, 0xef, 0x81, 0xd7, 0xa9, 0x13, 0x61, 0xf1, 0xf9, 0x09, 0x2d, 0xfc, 0x5c, 0xb2, 0x76, 0xb3, 0xe6, 0xcf, 0x6d, 0x80, 0xb6, 0xea, 0xcc,
0xf9, 0x67, 0xe9, 0x1b, 0xf8, 0xb1, 0xda, 0x39, 0x5b, 0x0d, 0xff, 0x19, 0x0d, 0xb4, 0xdd, 0x64, 0x25, 0xd2, 0xa9, 0xab, 0x5e, 0xc2, 0x57, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x5a, 0x08, 0x7e,
0xae, 0x90, 0xce, 0x6d, 0xf9, 0xdf, 0x7e, 0xf9, 0x37, 0x00, 0x00, 0xff, 0xff, 0x86, 0x75, 0x28, 0xf4, 0x3d, 0x07, 0x00, 0x00,
0x0b, 0xc7, 0x07, 0x00, 0x00,
} }

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-micro. DO NOT EDIT. // Code generated by protoc-gen-micro. DO NOT EDIT.
// source: router.proto // source: micro/go-micro/router/service/proto/router.proto
package go_micro_router package go_micro_router
@ -38,7 +38,6 @@ type RouterService interface {
Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error)
Advertise(ctx context.Context, in *Request, opts ...client.CallOption) (Router_AdvertiseService, error) Advertise(ctx context.Context, in *Request, opts ...client.CallOption) (Router_AdvertiseService, error)
Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error) Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error)
Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error)
} }
type routerService struct { type routerService struct {
@ -167,16 +166,6 @@ func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client.
return out, nil return out, nil
} }
func (c *routerService) Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) {
req := c.c.NewRequest(c.name, "Router.Status", in)
out := new(StatusResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Router service // Server API for Router service
type RouterHandler interface { type RouterHandler interface {
@ -184,7 +173,6 @@ type RouterHandler interface {
Watch(context.Context, *WatchRequest, Router_WatchStream) error Watch(context.Context, *WatchRequest, Router_WatchStream) error
Advertise(context.Context, *Request, Router_AdvertiseStream) error Advertise(context.Context, *Request, Router_AdvertiseStream) error
Process(context.Context, *Advert, *ProcessResponse) error Process(context.Context, *Advert, *ProcessResponse) error
Status(context.Context, *Request, *StatusResponse) error
} }
func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error { func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error {
@ -193,7 +181,6 @@ func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.H
Watch(ctx context.Context, stream server.Stream) error Watch(ctx context.Context, stream server.Stream) error
Advertise(ctx context.Context, stream server.Stream) error Advertise(ctx context.Context, stream server.Stream) error
Process(ctx context.Context, in *Advert, out *ProcessResponse) error Process(ctx context.Context, in *Advert, out *ProcessResponse) error
Status(ctx context.Context, in *Request, out *StatusResponse) error
} }
type Router struct { type Router struct {
router router
@ -284,10 +271,6 @@ func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessRes
return h.RouterHandler.Process(ctx, in, out) return h.RouterHandler.Process(ctx, in, out)
} }
func (h *routerHandler) Status(ctx context.Context, in *Request, out *StatusResponse) error {
return h.RouterHandler.Status(ctx, in, out)
}
// Client API for Table service // Client API for Table service
type TableService interface { type TableService interface {

View File

@ -8,7 +8,6 @@ service Router {
rpc Watch(WatchRequest) returns (stream Event) {}; rpc Watch(WatchRequest) returns (stream Event) {};
rpc Advertise(Request) returns (stream Advert) {}; rpc Advertise(Request) returns (stream Advert) {};
rpc Process(Advert) returns (ProcessResponse) {}; rpc Process(Advert) returns (ProcessResponse) {};
rpc Status(Request) returns (StatusResponse) {};
} }
service Table { service Table {
@ -94,12 +93,14 @@ enum EventType {
// Event is routing table event // Event is routing table event
message Event { message Event {
// the unique event id
string id = 1;
// type of event // type of event
EventType type = 1; EventType type = 2;
// unix timestamp of event // unix timestamp of event
int64 timestamp = 2; int64 timestamp = 3;
// service route // service route
Route route = 3; Route route = 4;
} }
// Query is passed in a LookupRequest // Query is passed in a LookupRequest
@ -129,12 +130,3 @@ message Route {
// the metric / score of this route // the metric / score of this route
int64 metric = 7; int64 metric = 7;
} }
message Status {
string code = 1;
string error = 2;
}
message StatusResponse {
Status status = 1;
}

View File

@ -2,7 +2,6 @@ package service
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"sync" "sync"
@ -19,7 +18,6 @@ type svc struct {
callOpts []client.CallOption callOpts []client.CallOption
router pb.RouterService router pb.RouterService
table *table table *table
status *router.Status
exit chan bool exit chan bool
errChan chan error errChan chan error
advertChan chan *router.Advert advertChan chan *router.Advert
@ -43,16 +41,9 @@ func NewRouter(opts ...router.Option) router.Router {
cli = options.Client cli = options.Client
} }
// set the status to Stopped
status := &router.Status{
Code: router.Stopped,
Error: nil,
}
// NOTE: should we have Client/Service option in router.Options? // NOTE: should we have Client/Service option in router.Options?
s := &svc{ s := &svc{
opts: options, opts: options,
status: status,
router: pb.NewRouterService(router.DefaultName, cli), router: pb.NewRouterService(router.DefaultName, cli),
} }
@ -98,12 +89,6 @@ func (s *svc) Table() router.Table {
func (s *svc) Start() error { func (s *svc) Start() error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
s.status = &router.Status{
Code: router.Running,
Error: nil,
}
return nil return nil
} }
@ -136,6 +121,7 @@ func (s *svc) advertiseEvents(advertChan chan *router.Advert, stream pb.Router_A
} }
events[i] = &router.Event{ events[i] = &router.Event{
Id: event.Id,
Type: router.EventType(event.Type), Type: router.EventType(event.Type),
Timestamp: time.Unix(0, event.Timestamp), Timestamp: time.Unix(0, event.Timestamp),
Route: route, Route: route,
@ -169,21 +155,16 @@ func (s *svc) Advertise() (<-chan *router.Advert, error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
switch s.status.Code { stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...)
case router.Running, router.Advertising: if err != nil {
stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...) return nil, fmt.Errorf("failed getting advert stream: %s", err)
if err != nil {
return nil, fmt.Errorf("failed getting advert stream: %s", err)
}
// create advertise and event channels
advertChan := make(chan *router.Advert)
go s.advertiseEvents(advertChan, stream)
return advertChan, nil
case router.Stopped:
return nil, fmt.Errorf("not running")
} }
return nil, fmt.Errorf("error: %s", s.status.Error) // create advertise and event channels
advertChan := make(chan *router.Advert)
go s.advertiseEvents(advertChan, stream)
return advertChan, nil
} }
// Process processes incoming adverts // Process processes incoming adverts
@ -199,6 +180,7 @@ func (s *svc) Process(advert *router.Advert) error {
Metric: event.Route.Metric, Metric: event.Route.Metric,
} }
e := &pb.Event{ e := &pb.Event{
Id: event.Id,
Type: pb.EventType(event.Type), Type: pb.EventType(event.Type),
Timestamp: event.Timestamp.UnixNano(), Timestamp: event.Timestamp.UnixNano(),
Route: route, Route: route,
@ -220,55 +202,6 @@ func (s *svc) Process(advert *router.Advert) error {
return nil return nil
} }
// Status returns router status
func (s *svc) Status() router.Status {
s.Lock()
defer s.Unlock()
// check if its stopped
select {
case <-s.exit:
return router.Status{
Code: router.Stopped,
Error: nil,
}
default:
// don't block
}
// check the remote router
rsp, err := s.router.Status(context.Background(), &pb.Request{}, s.callOpts...)
if err != nil {
return router.Status{
Code: router.Error,
Error: err,
}
}
code := router.Running
var serr error
switch rsp.Status.Code {
case "running":
code = router.Running
case "advertising":
code = router.Advertising
case "stopped":
code = router.Stopped
case "error":
code = router.Error
}
if len(rsp.Status.Error) > 0 {
serr = errors.New(rsp.Status.Error)
}
return router.Status{
Code: code,
Error: serr,
}
}
// Remote router cannot be stopped // Remote router cannot be stopped
func (s *svc) Stop() error { func (s *svc) Stop() error {
s.Lock() s.Lock()

View File

@ -65,6 +65,7 @@ func (w *watcher) watch(stream pb.Router_WatchService) error {
} }
event := &router.Event{ event := &router.Event{
Id: resp.Id,
Type: router.EventType(resp.Type), Type: router.EventType(resp.Type),
Timestamp: time.Unix(0, resp.Timestamp), Timestamp: time.Unix(0, resp.Timestamp),
Route: route, Route: route,

View File

@ -38,6 +38,10 @@ func (t *table) sendEvent(e *Event) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
if len(e.Id) == 0 {
e.Id = uuid.New().String()
}
for _, w := range t.watchers { for _, w := range t.watchers {
select { select {
case w.resChan <- e: case w.resChan <- e:

View File

@ -39,6 +39,8 @@ func (t EventType) String() string {
// Event is returned by a call to Next on the watcher. // Event is returned by a call to Next on the watcher.
type Event struct { type Event struct {
// Unique id of the event
Id string
// Type defines type of event // Type defines type of event
Type EventType Type EventType
// Timestamp is event timestamp // Timestamp is event timestamp

View File

@ -7,7 +7,7 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"github.com/micro/go-micro/runtime/process" "github.com/micro/go-micro/runtime/local/process"
) )
func (p *Process) Exec(exe *process.Executable) error { func (p *Process) Exec(exe *process.Executable) error {

View File

@ -25,10 +25,6 @@ import (
var ( var (
lastStreamResponseError = errors.New("EOS") lastStreamResponseError = errors.New("EOS")
// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
invalidRequest = struct{}{}
// Precompute the reflect type for error. Can't use error directly // Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying. // because Typeof takes an empty interface value. This is annoying.

View File

@ -129,7 +129,7 @@ type Option func(*Options)
var ( var (
DefaultAddress = ":0" DefaultAddress = ":0"
DefaultName = "go.micro.server" DefaultName = "go.micro.server"
DefaultVersion = time.Now().Format("2006.01.02.15.04") DefaultVersion = "latest"
DefaultId = uuid.New().String() DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer() DefaultServer Server = newRpcServer()
DefaultRouter = newRpcRouter() DefaultRouter = newRpcRouter()