From 3adce58eb2d572cd6cd9d5f21969cd3601046b04 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 6 Aug 2019 17:53:14 +0100 Subject: [PATCH 1/2] Add monitor/debug packages --- debug/handler/debug.go | 41 +++ debug/proto/debug.micro.go | 108 +++++++ debug/proto/debug.pb.go | 336 ++++++++++++++++++++++ {server/debug => debug}/proto/debug.proto | 12 +- function_test.go | 2 +- monitor/default.go | 213 ++++++++++++++ monitor/default_test.go | 19 ++ monitor/monitor.go | 39 +++ monitor/options.go | 25 ++ server/debug.go | 9 - server/debug/debug.go | 55 ---- server/debug/proto/debug.pb.go | 100 ------- server/grpc/debug.go | 10 - server/grpc/grpc.go | 1 - server/grpc/options.go | 5 - server/options.go | 15 - server/rpc_server.go | 1 - service.go | 9 + service/options.go | 220 ++++++++++++++ service/service.go | 14 + service_test.go | 2 +- 21 files changed, 1030 insertions(+), 206 deletions(-) create mode 100644 debug/handler/debug.go create mode 100644 debug/proto/debug.micro.go create mode 100644 debug/proto/debug.pb.go rename {server/debug => debug}/proto/debug.proto (59%) create mode 100644 monitor/default.go create mode 100644 monitor/default_test.go create mode 100644 monitor/monitor.go create mode 100644 monitor/options.go delete mode 100644 server/debug.go delete mode 100644 server/debug/debug.go delete mode 100644 server/debug/proto/debug.pb.go delete mode 100644 server/grpc/debug.go create mode 100644 service/options.go diff --git a/debug/handler/debug.go b/debug/handler/debug.go new file mode 100644 index 00000000..973a6fc4 --- /dev/null +++ b/debug/handler/debug.go @@ -0,0 +1,41 @@ +package handler + +import ( + "context" + "runtime" + "time" + + proto "github.com/micro/go-micro/debug/proto" +) + +type Debug struct { + proto.DebugHandler + started int64 +} + +var ( + DefaultHandler = newDebug() +) + +func newDebug() *Debug { + return &Debug{ + started: time.Now().Unix(), + } +} + +func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error { + rsp.Status = "ok" + return nil +} + +func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error { + var mstat runtime.MemStats + runtime.ReadMemStats(&mstat) + + rsp.Started = uint64(d.started) + rsp.Uptime = uint64(time.Now().Unix() - d.started) + rsp.Memory = mstat.Alloc + rsp.Gc = mstat.PauseTotalNs + rsp.Threads = uint64(runtime.NumGoroutine()) + return nil +} diff --git a/debug/proto/debug.micro.go b/debug/proto/debug.micro.go new file mode 100644 index 00000000..baa06cee --- /dev/null +++ b/debug/proto/debug.micro.go @@ -0,0 +1,108 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: micro/go-micro/debug/proto/debug.proto + +package debug + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Debug service + +type DebugService interface { + Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error) + Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error) +} + +type debugService struct { + c client.Client + name string +} + +func NewDebugService(name string, c client.Client) DebugService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "debug" + } + return &debugService{ + c: c, + name: name, + } +} + +func (c *debugService) Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error) { + req := c.c.NewRequest(c.name, "Debug.Health", in) + out := new(HealthResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *debugService) Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error) { + req := c.c.NewRequest(c.name, "Debug.Stats", in) + out := new(StatsResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Debug service + +type DebugHandler interface { + Health(context.Context, *HealthRequest, *HealthResponse) error + Stats(context.Context, *StatsRequest, *StatsResponse) error +} + +func RegisterDebugHandler(s server.Server, hdlr DebugHandler, opts ...server.HandlerOption) error { + type debug interface { + Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error + Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error + } + type Debug struct { + debug + } + h := &debugHandler{hdlr} + return s.Handle(s.NewHandler(&Debug{h}, opts...)) +} + +type debugHandler struct { + DebugHandler +} + +func (h *debugHandler) Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error { + return h.DebugHandler.Health(ctx, in, out) +} + +func (h *debugHandler) Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error { + return h.DebugHandler.Stats(ctx, in, out) +} diff --git a/debug/proto/debug.pb.go b/debug/proto/debug.pb.go new file mode 100644 index 00000000..5ba4f8c9 --- /dev/null +++ b/debug/proto/debug.pb.go @@ -0,0 +1,336 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: micro/go-micro/debug/proto/debug.proto + +package debug + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type HealthRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HealthRequest) Reset() { *m = HealthRequest{} } +func (m *HealthRequest) String() string { return proto.CompactTextString(m) } +func (*HealthRequest) ProtoMessage() {} +func (*HealthRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f25415e61bccfa1f, []int{0} +} + +func (m *HealthRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HealthRequest.Unmarshal(m, b) +} +func (m *HealthRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HealthRequest.Marshal(b, m, deterministic) +} +func (m *HealthRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_HealthRequest.Merge(m, src) +} +func (m *HealthRequest) XXX_Size() int { + return xxx_messageInfo_HealthRequest.Size(m) +} +func (m *HealthRequest) XXX_DiscardUnknown() { + xxx_messageInfo_HealthRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_HealthRequest proto.InternalMessageInfo + +type HealthResponse struct { + // default: ok + Status string `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HealthResponse) Reset() { *m = HealthResponse{} } +func (m *HealthResponse) String() string { return proto.CompactTextString(m) } +func (*HealthResponse) ProtoMessage() {} +func (*HealthResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f25415e61bccfa1f, []int{1} +} + +func (m *HealthResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HealthResponse.Unmarshal(m, b) +} +func (m *HealthResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HealthResponse.Marshal(b, m, deterministic) +} +func (m *HealthResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_HealthResponse.Merge(m, src) +} +func (m *HealthResponse) XXX_Size() int { + return xxx_messageInfo_HealthResponse.Size(m) +} +func (m *HealthResponse) XXX_DiscardUnknown() { + xxx_messageInfo_HealthResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_HealthResponse proto.InternalMessageInfo + +func (m *HealthResponse) GetStatus() string { + if m != nil { + return m.Status + } + return "" +} + +type StatsRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatsRequest) Reset() { *m = StatsRequest{} } +func (m *StatsRequest) String() string { return proto.CompactTextString(m) } +func (*StatsRequest) ProtoMessage() {} +func (*StatsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f25415e61bccfa1f, []int{2} +} + +func (m *StatsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatsRequest.Unmarshal(m, b) +} +func (m *StatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatsRequest.Marshal(b, m, deterministic) +} +func (m *StatsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatsRequest.Merge(m, src) +} +func (m *StatsRequest) XXX_Size() int { + return xxx_messageInfo_StatsRequest.Size(m) +} +func (m *StatsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StatsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StatsRequest proto.InternalMessageInfo + +type StatsResponse struct { + // unix timestamp + Started uint64 `protobuf:"varint,1,opt,name=started,proto3" json:"started,omitempty"` + // in seconds + Uptime uint64 `protobuf:"varint,2,opt,name=uptime,proto3" json:"uptime,omitempty"` + // in bytes + Memory uint64 `protobuf:"varint,3,opt,name=memory,proto3" json:"memory,omitempty"` + // num threads + Threads uint64 `protobuf:"varint,4,opt,name=threads,proto3" json:"threads,omitempty"` + // total gc in nanoseconds + Gc uint64 `protobuf:"varint,5,opt,name=gc,proto3" json:"gc,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatsResponse) Reset() { *m = StatsResponse{} } +func (m *StatsResponse) String() string { return proto.CompactTextString(m) } +func (*StatsResponse) ProtoMessage() {} +func (*StatsResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f25415e61bccfa1f, []int{3} +} + +func (m *StatsResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatsResponse.Unmarshal(m, b) +} +func (m *StatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatsResponse.Marshal(b, m, deterministic) +} +func (m *StatsResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatsResponse.Merge(m, src) +} +func (m *StatsResponse) XXX_Size() int { + return xxx_messageInfo_StatsResponse.Size(m) +} +func (m *StatsResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StatsResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StatsResponse proto.InternalMessageInfo + +func (m *StatsResponse) GetStarted() uint64 { + if m != nil { + return m.Started + } + return 0 +} + +func (m *StatsResponse) GetUptime() uint64 { + if m != nil { + return m.Uptime + } + return 0 +} + +func (m *StatsResponse) GetMemory() uint64 { + if m != nil { + return m.Memory + } + return 0 +} + +func (m *StatsResponse) GetThreads() uint64 { + if m != nil { + return m.Threads + } + return 0 +} + +func (m *StatsResponse) GetGc() uint64 { + if m != nil { + return m.Gc + } + return 0 +} + +func init() { + proto.RegisterType((*HealthRequest)(nil), "HealthRequest") + proto.RegisterType((*HealthResponse)(nil), "HealthResponse") + proto.RegisterType((*StatsRequest)(nil), "StatsRequest") + proto.RegisterType((*StatsResponse)(nil), "StatsResponse") +} + +func init() { + proto.RegisterFile("micro/go-micro/debug/proto/debug.proto", fileDescriptor_f25415e61bccfa1f) +} + +var fileDescriptor_f25415e61bccfa1f = []byte{ + // 230 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x41, 0x4b, 0xc4, 0x30, + 0x10, 0x85, 0xb7, 0x75, 0x5b, 0x71, 0xb0, 0x59, 0xc8, 0x41, 0xc2, 0x9e, 0x24, 0x07, 0x29, 0x88, + 0x59, 0xd0, 0xbf, 0xe0, 0xc1, 0x73, 0xbd, 0x0b, 0xd9, 0x76, 0xe8, 0x16, 0xac, 0xa9, 0x99, 0xe9, + 0xc1, 0xb3, 0x7f, 0x5c, 0x9a, 0xa4, 0x60, 0x6f, 0xef, 0xbd, 0xf0, 0x1e, 0xf9, 0x06, 0x1e, 0xc6, + 0xa1, 0xf5, 0xee, 0xd4, 0xbb, 0xa7, 0x28, 0x3a, 0x3c, 0xcf, 0xfd, 0x69, 0xf2, 0x8e, 0x93, 0x36, + 0x41, 0xeb, 0x03, 0x54, 0x6f, 0x68, 0x3f, 0xf9, 0xd2, 0xe0, 0xf7, 0x8c, 0xc4, 0xba, 0x06, 0xb1, + 0x06, 0x34, 0xb9, 0x2f, 0x42, 0x79, 0x07, 0x25, 0xb1, 0xe5, 0x99, 0x54, 0x76, 0x9f, 0xd5, 0x37, + 0x4d, 0x72, 0x5a, 0xc0, 0xed, 0x3b, 0x5b, 0xa6, 0xb5, 0xf9, 0x9b, 0x41, 0x95, 0x82, 0xd4, 0x54, + 0x70, 0x4d, 0x6c, 0x3d, 0x63, 0x17, 0xaa, 0xfb, 0x66, 0xb5, 0xcb, 0xe6, 0x3c, 0xf1, 0x30, 0xa2, + 0xca, 0xc3, 0x43, 0x72, 0x4b, 0x3e, 0xe2, 0xe8, 0xfc, 0x8f, 0xba, 0x8a, 0x79, 0x74, 0xcb, 0x12, + 0x5f, 0x3c, 0xda, 0x8e, 0xd4, 0x3e, 0x2e, 0x25, 0x2b, 0x05, 0xe4, 0x7d, 0xab, 0x8a, 0x10, 0xe6, + 0x7d, 0xfb, 0xfc, 0x01, 0xc5, 0xeb, 0xc2, 0x27, 0x1f, 0xa1, 0x8c, 0x20, 0x52, 0x98, 0x0d, 0xe2, + 0xf1, 0x60, 0xb6, 0x84, 0x7a, 0x27, 0x6b, 0x28, 0xc2, 0xd7, 0x65, 0x65, 0xfe, 0x33, 0x1d, 0x85, + 0xd9, 0x10, 0xe9, 0xdd, 0xb9, 0x0c, 0x77, 0x7b, 0xf9, 0x0b, 0x00, 0x00, 0xff, 0xff, 0xfe, 0xb9, + 0x5f, 0xf7, 0x61, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// DebugClient is the client API for Debug service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type DebugClient interface { + Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error) + Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error) +} + +type debugClient struct { + cc *grpc.ClientConn +} + +func NewDebugClient(cc *grpc.ClientConn) DebugClient { + return &debugClient{cc} +} + +func (c *debugClient) Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error) { + out := new(HealthResponse) + err := c.cc.Invoke(ctx, "/Debug/Health", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *debugClient) Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error) { + out := new(StatsResponse) + err := c.cc.Invoke(ctx, "/Debug/Stats", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DebugServer is the server API for Debug service. +type DebugServer interface { + Health(context.Context, *HealthRequest) (*HealthResponse, error) + Stats(context.Context, *StatsRequest) (*StatsResponse, error) +} + +func RegisterDebugServer(s *grpc.Server, srv DebugServer) { + s.RegisterService(&_Debug_serviceDesc, srv) +} + +func _Debug_Health_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HealthRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DebugServer).Health(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/Debug/Health", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DebugServer).Health(ctx, req.(*HealthRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Debug_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DebugServer).Stats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/Debug/Stats", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DebugServer).Stats(ctx, req.(*StatsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Debug_serviceDesc = grpc.ServiceDesc{ + ServiceName: "Debug", + HandlerType: (*DebugServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Health", + Handler: _Debug_Health_Handler, + }, + { + MethodName: "Stats", + Handler: _Debug_Stats_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "micro/go-micro/debug/proto/debug.proto", +} diff --git a/server/debug/proto/debug.proto b/debug/proto/debug.proto similarity index 59% rename from server/debug/proto/debug.proto rename to debug/proto/debug.proto index 8d96192e..b642cd41 100644 --- a/server/debug/proto/debug.proto +++ b/debug/proto/debug.proto @@ -1,13 +1,9 @@ syntax = "proto3"; -// This is commented out due to import cycles. -// But its what we expect the RPC service to -// return. -// -// service Debug { -// rpc Health(HealthRequest) returns (HealthResponse) {} -// rpc Stats(StatsRequest) returns (StatsResponse) {} -// } +service Debug { + rpc Health(HealthRequest) returns (HealthResponse) {} + rpc Stats(StatsRequest) returns (StatsResponse) {} +} message HealthRequest { } diff --git a/function_test.go b/function_test.go index dd85590f..a7d1b18d 100644 --- a/function_test.go +++ b/function_test.go @@ -5,8 +5,8 @@ import ( "sync" "testing" + proto "github.com/micro/go-micro/debug/proto" "github.com/micro/go-micro/registry/memory" - proto "github.com/micro/go-micro/server/debug/proto" ) func TestFunction(t *testing.T) { diff --git a/monitor/default.go b/monitor/default.go new file mode 100644 index 00000000..307ec36e --- /dev/null +++ b/monitor/default.go @@ -0,0 +1,213 @@ +package monitor + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/micro/go-micro/client" + pb "github.com/micro/go-micro/debug/proto" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/registry/cache" +) + +type monitor struct { + options Options + + exit chan bool + registry cache.Cache + client client.Client + + sync.RWMutex + services map[string]*Status +} + +// check provides binary running/failed status. +// In the event Debug.Health cannot be called on a service we reap the node. +func (m *monitor) check(service string) (*Status, error) { + services, err := m.registry.GetService(service) + if err != nil { + return nil, err + } + + // create debug client + debug := pb.NewDebugService(service, m.client) + + var status *Status + var gerr error + + // iterate through multiple versions of a service + for _, service := range services { + for _, node := range service.Nodes { + rsp, err := debug.Health( + context.Background(), + // empty health request + &pb.HealthRequest{}, + // call this specific node + client.WithAddress(node.Address), + // retry in the event of failure + client.WithRetries(3), + ) + if err != nil { + // reap the dead node + m.registry.Deregister(®istry.Service{ + Name: service.Name, + Version: service.Version, + Nodes: []*registry.Node{node}, + }) + + // save the error + gerr = err + continue + } + + // expecting ok response status + if rsp.Status != "ok" { + gerr = errors.New(rsp.Status) + continue + } + + // no error set status + status = &Status{ + Code: StatusRunning, + Info: "running", + } + } + } + + // if we got the success case return it + if status != nil { + return status, nil + } + + // if gerr is not nil return it + if gerr != nil { + return &Status{ + Code: StatusFailed, + Info: "not running", + Error: gerr.Error(), + }, nil + } + + // otherwise unknown status + return &Status{ + Code: StatusUnknown, + Info: "unknown status", + }, nil +} + +func (m *monitor) Status(service string) (Status, error) { + m.RLock() + defer m.RUnlock() + if status, ok := m.services[service]; ok { + return *status, nil + } + return Status{}, ErrNotWatching +} + +func (m *monitor) Watch(service string) error { + m.Lock() + defer m.Unlock() + + // check if we're watching + if _, ok := m.services[service]; ok { + return nil + } + + // get the status + status, err := m.check(service) + if err != nil { + return err + } + + // set the status + m.services[service] = status + return nil +} + +func (m *monitor) Stop() error { + m.Lock() + defer m.RUnlock() + + select { + case <-m.exit: + return nil + default: + close(m.exit) + for s, _ := range m.services { + delete(m.services, s) + } + m.registry.Stop() + return nil + } + + return nil +} + +func (m *monitor) run() { + // check the status every tick + t := time.NewTicker(time.Minute) + defer t.Stop() + + check := make(chan string) + + for { + select { + case <-m.exit: + return + case service := <-check: + // check the status + status, err := m.check(service) + if err != nil { + status = &Status{ + Code: StatusUnknown, + Info: "unknown status", + } + } + + // save the status + m.Lock() + m.services[service] = status + m.Unlock() + case <-t.C: + // create a list of services + var services []string + m.RLock() + for service, _ := range m.services { + services = append(services, service) + } + m.RUnlock() + + // check the status of all watched services + for _, service := range services { + select { + case <-m.exit: + return + case check <- service: + } + } + } + } +} + +func newMonitor(opts ...Option) Monitor { + options := Options{ + Client: client.DefaultClient, + Registry: registry.DefaultRegistry, + } + + for _, o := range opts { + o(&options) + } + + m := &monitor{ + options: options, + client: options.Client, + registry: cache.New(options.Registry), + services: make(map[string]*Status), + } + + go m.run() + return m +} diff --git a/monitor/default_test.go b/monitor/default_test.go new file mode 100644 index 00000000..6910cdac --- /dev/null +++ b/monitor/default_test.go @@ -0,0 +1,19 @@ +package monitor + +import ( + "testing" +) + +func TestMonitor(t *testing.T) { + // create new monitor + m := NewMonitor() + + services := []string{"foo", "bar", "baz"} + + for _, service := range services { + _, err := m.Status(service) + if err != nil { + t.Fatal("expected status error for unknown service") + } + } +} diff --git a/monitor/monitor.go b/monitor/monitor.go new file mode 100644 index 00000000..41fd17cd --- /dev/null +++ b/monitor/monitor.go @@ -0,0 +1,39 @@ +// Package monitor monitors service health +package monitor + +import ( + "errors" +) + +const ( + StatusUnknown StatusCode = iota + StatusRunning + StatusFailed +) + +type StatusCode int + +// Monitor monitors a service and reaps dead instances +type Monitor interface { + // Status of the service + Status(service string) (Status, error) + // Watch starts watching the service + Watch(service string) error + // Stop monitoring + Stop() error +} + +type Status struct { + Code StatusCode + Info string + Error string +} + +var ( + ErrNotWatching = errors.New("not watching") +) + +// NewMonitor returns a new monitor +func NewMonitor(opts ...Option) Monitor { + return newMonitor(opts...) +} diff --git a/monitor/options.go b/monitor/options.go new file mode 100644 index 00000000..445d39d9 --- /dev/null +++ b/monitor/options.go @@ -0,0 +1,25 @@ +package monitor + +import ( + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/registry" +) + +type Options struct { + Client client.Client + Registry registry.Registry +} + +type Option func(*Options) + +func Client(c client.Client) Option { + return func(o *Options) { + o.Client = c + } +} + +func Registry(r registry.Registry) Option { + return func(o *Options) { + o.Registry = r + } +} diff --git a/server/debug.go b/server/debug.go deleted file mode 100644 index 72cc88a7..00000000 --- a/server/debug.go +++ /dev/null @@ -1,9 +0,0 @@ -package server - -import ( - "github.com/micro/go-micro/server/debug" -) - -func registerDebugHandler(s Server) { - s.Handle(s.NewHandler(&debug.Debug{s.Options().DebugHandler}, InternalHandler(true))) -} diff --git a/server/debug/debug.go b/server/debug/debug.go deleted file mode 100644 index dc2fa8c7..00000000 --- a/server/debug/debug.go +++ /dev/null @@ -1,55 +0,0 @@ -package debug - -import ( - "context" - "runtime" - "time" - - proto "github.com/micro/go-micro/server/debug/proto" -) - -// The debug handler represents an internal server handler -// used to determine health, status and env info about -// a service node. It's akin to Google's /statusz, /healthz, -// and /varz -type Handler interface { - Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error - Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error -} - -// Our own internal handler -type debug struct { - started int64 -} - -// We use this to wrap any debug handlers so we preserve the signature Debug.{Method} -type Debug struct { - Handler -} - -var ( - DefaultHandler Handler = newDebug() -) - -func newDebug() *debug { - return &debug{ - started: time.Now().Unix(), - } -} - -func (d *debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error { - rsp.Status = "ok" - return nil -} - -func (d *debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error { - var mstat runtime.MemStats - runtime.ReadMemStats(&mstat) - - rsp.Started = uint64(d.started) - rsp.Uptime = uint64(time.Now().Unix() - d.started) - rsp.Memory = mstat.Alloc - rsp.Gc = mstat.PauseTotalNs - rsp.Threads = uint64(runtime.NumGoroutine()) - return nil -} diff --git a/server/debug/proto/debug.pb.go b/server/debug/proto/debug.pb.go deleted file mode 100644 index 7cfa7d5e..00000000 --- a/server/debug/proto/debug.pb.go +++ /dev/null @@ -1,100 +0,0 @@ -// Code generated by protoc-gen-go. -// source: github.com/micro/go-micro/server/debug/proto/debug.proto -// DO NOT EDIT! - -/* -Package debug is a generated protocol buffer package. - -It is generated from these files: - github.com/micro/go-micro/server/debug/proto/debug.proto - -It has these top-level messages: - HealthRequest - HealthResponse - StatsRequest - StatsResponse -*/ -package debug - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -type HealthRequest struct { -} - -func (m *HealthRequest) Reset() { *m = HealthRequest{} } -func (m *HealthRequest) String() string { return proto.CompactTextString(m) } -func (*HealthRequest) ProtoMessage() {} -func (*HealthRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -type HealthResponse struct { - // default: ok - Status string `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"` -} - -func (m *HealthResponse) Reset() { *m = HealthResponse{} } -func (m *HealthResponse) String() string { return proto.CompactTextString(m) } -func (*HealthResponse) ProtoMessage() {} -func (*HealthResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -type StatsRequest struct { -} - -func (m *StatsRequest) Reset() { *m = StatsRequest{} } -func (m *StatsRequest) String() string { return proto.CompactTextString(m) } -func (*StatsRequest) ProtoMessage() {} -func (*StatsRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } - -type StatsResponse struct { - // unix timestamp - Started uint64 `protobuf:"varint,1,opt,name=started" json:"started,omitempty"` - // in seconds - Uptime uint64 `protobuf:"varint,2,opt,name=uptime" json:"uptime,omitempty"` - // in bytes - Memory uint64 `protobuf:"varint,3,opt,name=memory" json:"memory,omitempty"` - // num threads - Threads uint64 `protobuf:"varint,4,opt,name=threads" json:"threads,omitempty"` - // in nanoseconds - Gc uint64 `protobuf:"varint,5,opt,name=gc" json:"gc,omitempty"` -} - -func (m *StatsResponse) Reset() { *m = StatsResponse{} } -func (m *StatsResponse) String() string { return proto.CompactTextString(m) } -func (*StatsResponse) ProtoMessage() {} -func (*StatsResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } - -func init() { - proto.RegisterType((*HealthRequest)(nil), "HealthRequest") - proto.RegisterType((*HealthResponse)(nil), "HealthResponse") - proto.RegisterType((*StatsRequest)(nil), "StatsRequest") - proto.RegisterType((*StatsResponse)(nil), "StatsResponse") -} - -var fileDescriptor0 = []byte{ - // 201 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x34, 0x8f, 0xbd, 0x6e, 0x83, 0x30, - 0x14, 0x85, 0x05, 0xa5, 0x54, 0xbd, 0x2a, 0x54, 0x62, 0xa8, 0x3c, 0x56, 0x4c, 0x2c, 0xc5, 0x43, - 0x97, 0x3e, 0x42, 0x67, 0xf2, 0x04, 0xfc, 0x5c, 0x19, 0xa4, 0x38, 0x26, 0xbe, 0xd7, 0x91, 0x32, - 0xe7, 0xc5, 0x03, 0xb6, 0xd9, 0xce, 0xf7, 0xd9, 0xe7, 0x48, 0x17, 0xfe, 0xd4, 0xc2, 0xb3, 0x1b, - 0xda, 0xd1, 0x68, 0xa9, 0x97, 0xd1, 0x1a, 0xa9, 0xcc, 0x4f, 0x08, 0x84, 0xf6, 0x86, 0x56, 0x4e, - 0x38, 0x38, 0x25, 0x57, 0x6b, 0xd8, 0x84, 0xdc, 0xfa, 0x5c, 0x7f, 0x42, 0xf1, 0x8f, 0xfd, 0x99, - 0xe7, 0x0e, 0xaf, 0x0e, 0x89, 0xeb, 0x06, 0xca, 0x43, 0xd0, 0x6a, 0x2e, 0x84, 0xd5, 0x17, 0xe4, - 0xc4, 0x3d, 0x3b, 0x12, 0xc9, 0x77, 0xd2, 0xbc, 0x77, 0x91, 0xea, 0x12, 0x3e, 0x4e, 0x5b, 0xa2, - 0xa3, 0xf9, 0x48, 0xa0, 0x88, 0x22, 0x36, 0x05, 0xbc, 0x6d, 0x7f, 0x2d, 0xe3, 0xe4, 0xab, 0x59, - 0x77, 0xe0, 0xbe, 0xe9, 0x56, 0x5e, 0x34, 0x8a, 0xd4, 0x3f, 0x44, 0xda, 0xbd, 0x46, 0x6d, 0xec, - 0x5d, 0xbc, 0x04, 0x1f, 0x68, 0x5f, 0xe2, 0xd9, 0x62, 0x3f, 0x91, 0xc8, 0xc2, 0x52, 0xc4, 0xaa, - 0x84, 0x54, 0x8d, 0xe2, 0xd5, 0xcb, 0x2d, 0x0d, 0xb9, 0xbf, 0xeb, 0xf7, 0x19, 0x00, 0x00, 0xff, - 0xff, 0xc6, 0x75, 0x51, 0x35, 0x13, 0x01, 0x00, 0x00, -} diff --git a/server/grpc/debug.go b/server/grpc/debug.go deleted file mode 100644 index 782b8b1e..00000000 --- a/server/grpc/debug.go +++ /dev/null @@ -1,10 +0,0 @@ -package grpc - -import ( - "github.com/micro/go-micro/server" - "github.com/micro/go-micro/server/debug" -) - -func registerDebugHandler(s server.Server) { - s.Handle(s.NewHandler(&debug.Debug{s.Options().DebugHandler}, server.InternalHandler(true))) -} diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index de283043..2aefc6f1 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -700,7 +700,6 @@ func (g *grpcServer) Deregister() error { } func (g *grpcServer) Start() error { - registerDebugHandler(g) config := g.opts // micro: config.Transport.Listen(config.Address) diff --git a/server/grpc/options.go b/server/grpc/options.go index 4a3bef1d..c57c022e 100644 --- a/server/grpc/options.go +++ b/server/grpc/options.go @@ -8,7 +8,6 @@ import ( "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/server" - "github.com/micro/go-micro/server/debug" "github.com/micro/go-micro/transport" "google.golang.org/grpc" "google.golang.org/grpc/encoding" @@ -89,10 +88,6 @@ func newOptions(opt ...server.Option) server.Options { opts.Transport = transport.DefaultTransport } - if opts.DebugHandler == nil { - opts.DebugHandler = debug.DefaultHandler - } - if len(opts.Address) == 0 { opts.Address = server.DefaultAddress } diff --git a/server/options.go b/server/options.go index 0d0ee51f..c75ed4d1 100644 --- a/server/options.go +++ b/server/options.go @@ -8,7 +8,6 @@ import ( "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/server/debug" "github.com/micro/go-micro/transport" ) @@ -36,9 +35,6 @@ type Options struct { // The router for requests Router Router - // Debug Handler which can be set by a user - DebugHandler debug.Handler - // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -66,10 +62,6 @@ func newOptions(opt ...Option) Options { opts.Transport = transport.DefaultTransport } - if opts.DebugHandler == nil { - opts.DebugHandler = debug.DefaultHandler - } - if opts.RegisterCheck == nil { opts.RegisterCheck = DefaultRegisterCheck } @@ -156,13 +148,6 @@ func Transport(t transport.Transport) Option { } } -// DebugHandler for this server -func DebugHandler(d debug.Handler) Option { - return func(o *Options) { - o.DebugHandler = d - } -} - // Metadata associated with the server func Metadata(md map[string]string) Option { return func(o *Options) { diff --git a/server/rpc_server.go b/server/rpc_server.go index daa28faf..b68e6425 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -485,7 +485,6 @@ func (s *rpcServer) Deregister() error { } func (s *rpcServer) Start() error { - registerDebugHandler(s) config := s.Options() // start listening on the transport diff --git a/service.go b/service.go index 01c82e9c..e612a4b3 100644 --- a/service.go +++ b/service.go @@ -8,6 +8,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/config/cmd" + "github.com/micro/go-micro/debug/handler" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/server" ) @@ -113,6 +114,14 @@ func (s *service) Stop() error { } func (s *service) Run() error { + // register the debug handler + s.opts.Server.Handle( + s.opts.Server.NewHandler( + handler.DefaultHandler, + server.InternalHandler(true), + ), + ) + if err := s.Start(); err != nil { return err } diff --git a/service/options.go b/service/options.go new file mode 100644 index 00000000..9a077629 --- /dev/null +++ b/service/options.go @@ -0,0 +1,220 @@ +package service + +import ( + "context" + "time" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/server" + "github.com/micro/go-micro/transport" +) + +type Options struct { + Broker broker.Broker + Client client.Client + Server server.Server + Registry registry.Registry + Transport transport.Transport + + // Before and After funcs + BeforeStart []func() error + BeforeStop []func() error + AfterStart []func() error + AfterStop []func() error + + // Other options for implementations of the interface + // can be stored in a context + Context context.Context +} + +type Option func(*Options) + +func newOptions(opts ...Option) Options { + opt := Options{ + Broker: broker.DefaultBroker, + Client: client.DefaultClient, + Server: server.DefaultServer, + Registry: registry.DefaultRegistry, + Transport: transport.DefaultTransport, + Context: context.Background(), + } + + for _, o := range opts { + o(&opt) + } + + return opt +} + +func Broker(b broker.Broker) Option { + return func(o *Options) { + o.Broker = b + // Update Client and Server + o.Client.Init(client.Broker(b)) + o.Server.Init(server.Broker(b)) + } +} + +func Client(c client.Client) Option { + return func(o *Options) { + o.Client = c + } +} + +// Context specifies a context for the service. +// Can be used to signal shutdown of the service. +// Can be used for extra option values. +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + +func Server(s server.Server) Option { + return func(o *Options) { + o.Server = s + } +} + +// Registry sets the registry for the service +// and the underlying components +func Registry(r registry.Registry) Option { + return func(o *Options) { + o.Registry = r + // Update Client and Server + o.Client.Init(client.Registry(r)) + o.Server.Init(server.Registry(r)) + // Update Broker + o.Broker.Init(broker.Registry(r)) + } +} + +// Transport sets the transport for the service +// and the underlying components +func Transport(t transport.Transport) Option { + return func(o *Options) { + o.Transport = t + // Update Client and Server + o.Client.Init(client.Transport(t)) + o.Server.Init(server.Transport(t)) + } +} + +// Convenience options + +// Address sets the address of the server +func Address(addr string) Option { + return func(o *Options) { + o.Server.Init(server.Address(addr)) + } +} + +// Name of the service +func Name(n string) Option { + return func(o *Options) { + o.Server.Init(server.Name(n)) + } +} + +// Version of the service +func Version(v string) Option { + return func(o *Options) { + o.Server.Init(server.Version(v)) + } +} + +// Metadata associated with the service +func Metadata(md map[string]string) Option { + return func(o *Options) { + o.Server.Init(server.Metadata(md)) + } +} + +// RegisterTTL specifies the TTL to use when registering the service +func RegisterTTL(t time.Duration) Option { + return func(o *Options) { + o.Server.Init(server.RegisterTTL(t)) + } +} + +// RegisterInterval specifies the interval on which to re-register +func RegisterInterval(t time.Duration) Option { + return func(o *Options) { + o.Server.Init(server.RegisterInterval(t)) + } +} + +// WrapClient is a convenience method for wrapping a Client with +// some middleware component. A list of wrappers can be provided. +// Wrappers are applied in reverse order so the last is executed first. +func WrapClient(w ...client.Wrapper) Option { + return func(o *Options) { + // apply in reverse + for i := len(w); i > 0; i-- { + o.Client = w[i-1](o.Client) + } + } +} + +// WrapCall is a convenience method for wrapping a Client CallFunc +func WrapCall(w ...client.CallWrapper) Option { + return func(o *Options) { + o.Client.Init(client.WrapCall(w...)) + } +} + +// WrapHandler adds a handler Wrapper to a list of options passed into the server +func WrapHandler(w ...server.HandlerWrapper) Option { + return func(o *Options) { + var wrappers []server.Option + + for _, wrap := range w { + wrappers = append(wrappers, server.WrapHandler(wrap)) + } + + // Init once + o.Server.Init(wrappers...) + } +} + +// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server +func WrapSubscriber(w ...server.SubscriberWrapper) Option { + return func(o *Options) { + var wrappers []server.Option + + for _, wrap := range w { + wrappers = append(wrappers, server.WrapSubscriber(wrap)) + } + + // Init once + o.Server.Init(wrappers...) + } +} + +// Before and Afters + +func BeforeStart(fn func() error) Option { + return func(o *Options) { + o.BeforeStart = append(o.BeforeStart, fn) + } +} + +func BeforeStop(fn func() error) Option { + return func(o *Options) { + o.BeforeStop = append(o.BeforeStop, fn) + } +} + +func AfterStart(fn func() error) Option { + return func(o *Options) { + o.AfterStart = append(o.AfterStart, fn) + } +} + +func AfterStop(fn func() error) Option { + return func(o *Options) { + o.AfterStop = append(o.AfterStop, fn) + } +} diff --git a/service/service.go b/service/service.go index 4dc92031..5b9d3027 100644 --- a/service/service.go +++ b/service/service.go @@ -1,2 +1,16 @@ // Package service encapsulates the client, server and other interfaces to provide a complete micro service. package service + +import ( + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/server" +) + +type Service interface { + Init(...Option) + Options() Options + Client() client.Client + Server() server.Server + Run() error + String() string +} diff --git a/service_test.go b/service_test.go index 8d4c4316..82195aed 100644 --- a/service_test.go +++ b/service_test.go @@ -8,8 +8,8 @@ import ( glog "github.com/go-log/log" "github.com/micro/go-micro/client" + proto "github.com/micro/go-micro/debug/proto" "github.com/micro/go-micro/registry/memory" - proto "github.com/micro/go-micro/server/debug/proto" "github.com/micro/go-micro/util/log" ) From 91f2af91de1c55bb2e68640fea2d5d0abef11aff Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 6 Aug 2019 18:05:05 +0100 Subject: [PATCH 2/2] Fix bugs in monitor --- monitor/default.go | 3 ++- monitor/default_test.go | 14 +++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/monitor/default.go b/monitor/default.go index 307ec36e..f534ad53 100644 --- a/monitor/default.go +++ b/monitor/default.go @@ -128,7 +128,7 @@ func (m *monitor) Watch(service string) error { func (m *monitor) Stop() error { m.Lock() - defer m.RUnlock() + defer m.Unlock() select { case <-m.exit: @@ -203,6 +203,7 @@ func newMonitor(opts ...Option) Monitor { m := &monitor{ options: options, + exit: make(chan bool), client: options.Client, registry: cache.New(options.Registry), services: make(map[string]*Status), diff --git a/monitor/default_test.go b/monitor/default_test.go index 6910cdac..da8ff1f3 100644 --- a/monitor/default_test.go +++ b/monitor/default_test.go @@ -12,8 +12,20 @@ func TestMonitor(t *testing.T) { for _, service := range services { _, err := m.Status(service) - if err != nil { + if err == nil { t.Fatal("expected status error for unknown service") } + + if err := m.Watch(service); err == nil { + t.Fatal("expected watch error for unknown service") + } + + // TODO: + // 1. start a service + // 2. watch service + // 3. get service status } + + // stop monitor + m.Stop() }