Merge pull request #650 from micro/monitor

Add monitor/debug/service packages
This commit is contained in:
Asim Aslam 2019-08-06 18:09:57 +01:00 committed by GitHub
commit bc751c55fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1043 additions and 206 deletions

41
debug/handler/debug.go Normal file
View File

@ -0,0 +1,41 @@
package handler
import (
"context"
"runtime"
"time"
proto "github.com/micro/go-micro/debug/proto"
)
type Debug struct {
proto.DebugHandler
started int64
}
var (
DefaultHandler = newDebug()
)
func newDebug() *Debug {
return &Debug{
started: time.Now().Unix(),
}
}
func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error {
rsp.Status = "ok"
return nil
}
func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error {
var mstat runtime.MemStats
runtime.ReadMemStats(&mstat)
rsp.Started = uint64(d.started)
rsp.Uptime = uint64(time.Now().Unix() - d.started)
rsp.Memory = mstat.Alloc
rsp.Gc = mstat.PauseTotalNs
rsp.Threads = uint64(runtime.NumGoroutine())
return nil
}

108
debug/proto/debug.micro.go Normal file
View File

@ -0,0 +1,108 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: micro/go-micro/debug/proto/debug.proto
package debug
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import (
context "context"
client "github.com/micro/go-micro/client"
server "github.com/micro/go-micro/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ client.Option
var _ server.Option
// Client API for Debug service
type DebugService interface {
Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error)
Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error)
}
type debugService struct {
c client.Client
name string
}
func NewDebugService(name string, c client.Client) DebugService {
if c == nil {
c = client.NewClient()
}
if len(name) == 0 {
name = "debug"
}
return &debugService{
c: c,
name: name,
}
}
func (c *debugService) Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error) {
req := c.c.NewRequest(c.name, "Debug.Health", in)
out := new(HealthResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *debugService) Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error) {
req := c.c.NewRequest(c.name, "Debug.Stats", in)
out := new(StatsResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Debug service
type DebugHandler interface {
Health(context.Context, *HealthRequest, *HealthResponse) error
Stats(context.Context, *StatsRequest, *StatsResponse) error
}
func RegisterDebugHandler(s server.Server, hdlr DebugHandler, opts ...server.HandlerOption) error {
type debug interface {
Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error
Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error
}
type Debug struct {
debug
}
h := &debugHandler{hdlr}
return s.Handle(s.NewHandler(&Debug{h}, opts...))
}
type debugHandler struct {
DebugHandler
}
func (h *debugHandler) Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error {
return h.DebugHandler.Health(ctx, in, out)
}
func (h *debugHandler) Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error {
return h.DebugHandler.Stats(ctx, in, out)
}

336
debug/proto/debug.pb.go Normal file
View File

@ -0,0 +1,336 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: micro/go-micro/debug/proto/debug.proto
package debug
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type HealthRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *HealthRequest) Reset() { *m = HealthRequest{} }
func (m *HealthRequest) String() string { return proto.CompactTextString(m) }
func (*HealthRequest) ProtoMessage() {}
func (*HealthRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f25415e61bccfa1f, []int{0}
}
func (m *HealthRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HealthRequest.Unmarshal(m, b)
}
func (m *HealthRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HealthRequest.Marshal(b, m, deterministic)
}
func (m *HealthRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_HealthRequest.Merge(m, src)
}
func (m *HealthRequest) XXX_Size() int {
return xxx_messageInfo_HealthRequest.Size(m)
}
func (m *HealthRequest) XXX_DiscardUnknown() {
xxx_messageInfo_HealthRequest.DiscardUnknown(m)
}
var xxx_messageInfo_HealthRequest proto.InternalMessageInfo
type HealthResponse struct {
// default: ok
Status string `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *HealthResponse) Reset() { *m = HealthResponse{} }
func (m *HealthResponse) String() string { return proto.CompactTextString(m) }
func (*HealthResponse) ProtoMessage() {}
func (*HealthResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f25415e61bccfa1f, []int{1}
}
func (m *HealthResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HealthResponse.Unmarshal(m, b)
}
func (m *HealthResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HealthResponse.Marshal(b, m, deterministic)
}
func (m *HealthResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_HealthResponse.Merge(m, src)
}
func (m *HealthResponse) XXX_Size() int {
return xxx_messageInfo_HealthResponse.Size(m)
}
func (m *HealthResponse) XXX_DiscardUnknown() {
xxx_messageInfo_HealthResponse.DiscardUnknown(m)
}
var xxx_messageInfo_HealthResponse proto.InternalMessageInfo
func (m *HealthResponse) GetStatus() string {
if m != nil {
return m.Status
}
return ""
}
type StatsRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StatsRequest) Reset() { *m = StatsRequest{} }
func (m *StatsRequest) String() string { return proto.CompactTextString(m) }
func (*StatsRequest) ProtoMessage() {}
func (*StatsRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f25415e61bccfa1f, []int{2}
}
func (m *StatsRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StatsRequest.Unmarshal(m, b)
}
func (m *StatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_StatsRequest.Marshal(b, m, deterministic)
}
func (m *StatsRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_StatsRequest.Merge(m, src)
}
func (m *StatsRequest) XXX_Size() int {
return xxx_messageInfo_StatsRequest.Size(m)
}
func (m *StatsRequest) XXX_DiscardUnknown() {
xxx_messageInfo_StatsRequest.DiscardUnknown(m)
}
var xxx_messageInfo_StatsRequest proto.InternalMessageInfo
type StatsResponse struct {
// unix timestamp
Started uint64 `protobuf:"varint,1,opt,name=started,proto3" json:"started,omitempty"`
// in seconds
Uptime uint64 `protobuf:"varint,2,opt,name=uptime,proto3" json:"uptime,omitempty"`
// in bytes
Memory uint64 `protobuf:"varint,3,opt,name=memory,proto3" json:"memory,omitempty"`
// num threads
Threads uint64 `protobuf:"varint,4,opt,name=threads,proto3" json:"threads,omitempty"`
// total gc in nanoseconds
Gc uint64 `protobuf:"varint,5,opt,name=gc,proto3" json:"gc,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StatsResponse) Reset() { *m = StatsResponse{} }
func (m *StatsResponse) String() string { return proto.CompactTextString(m) }
func (*StatsResponse) ProtoMessage() {}
func (*StatsResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f25415e61bccfa1f, []int{3}
}
func (m *StatsResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StatsResponse.Unmarshal(m, b)
}
func (m *StatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_StatsResponse.Marshal(b, m, deterministic)
}
func (m *StatsResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_StatsResponse.Merge(m, src)
}
func (m *StatsResponse) XXX_Size() int {
return xxx_messageInfo_StatsResponse.Size(m)
}
func (m *StatsResponse) XXX_DiscardUnknown() {
xxx_messageInfo_StatsResponse.DiscardUnknown(m)
}
var xxx_messageInfo_StatsResponse proto.InternalMessageInfo
func (m *StatsResponse) GetStarted() uint64 {
if m != nil {
return m.Started
}
return 0
}
func (m *StatsResponse) GetUptime() uint64 {
if m != nil {
return m.Uptime
}
return 0
}
func (m *StatsResponse) GetMemory() uint64 {
if m != nil {
return m.Memory
}
return 0
}
func (m *StatsResponse) GetThreads() uint64 {
if m != nil {
return m.Threads
}
return 0
}
func (m *StatsResponse) GetGc() uint64 {
if m != nil {
return m.Gc
}
return 0
}
func init() {
proto.RegisterType((*HealthRequest)(nil), "HealthRequest")
proto.RegisterType((*HealthResponse)(nil), "HealthResponse")
proto.RegisterType((*StatsRequest)(nil), "StatsRequest")
proto.RegisterType((*StatsResponse)(nil), "StatsResponse")
}
func init() {
proto.RegisterFile("micro/go-micro/debug/proto/debug.proto", fileDescriptor_f25415e61bccfa1f)
}
var fileDescriptor_f25415e61bccfa1f = []byte{
// 230 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x41, 0x4b, 0xc4, 0x30,
0x10, 0x85, 0xb7, 0x75, 0x5b, 0x71, 0xb0, 0x59, 0xc8, 0x41, 0xc2, 0x9e, 0x24, 0x07, 0x29, 0x88,
0x59, 0xd0, 0xbf, 0xe0, 0xc1, 0x73, 0xbd, 0x0b, 0xd9, 0x76, 0xe8, 0x16, 0xac, 0xa9, 0x99, 0xe9,
0xc1, 0xb3, 0x7f, 0x5c, 0x9a, 0xa4, 0x60, 0x6f, 0xef, 0xbd, 0xf0, 0x1e, 0xf9, 0x06, 0x1e, 0xc6,
0xa1, 0xf5, 0xee, 0xd4, 0xbb, 0xa7, 0x28, 0x3a, 0x3c, 0xcf, 0xfd, 0x69, 0xf2, 0x8e, 0x93, 0x36,
0x41, 0xeb, 0x03, 0x54, 0x6f, 0x68, 0x3f, 0xf9, 0xd2, 0xe0, 0xf7, 0x8c, 0xc4, 0xba, 0x06, 0xb1,
0x06, 0x34, 0xb9, 0x2f, 0x42, 0x79, 0x07, 0x25, 0xb1, 0xe5, 0x99, 0x54, 0x76, 0x9f, 0xd5, 0x37,
0x4d, 0x72, 0x5a, 0xc0, 0xed, 0x3b, 0x5b, 0xa6, 0xb5, 0xf9, 0x9b, 0x41, 0x95, 0x82, 0xd4, 0x54,
0x70, 0x4d, 0x6c, 0x3d, 0x63, 0x17, 0xaa, 0xfb, 0x66, 0xb5, 0xcb, 0xe6, 0x3c, 0xf1, 0x30, 0xa2,
0xca, 0xc3, 0x43, 0x72, 0x4b, 0x3e, 0xe2, 0xe8, 0xfc, 0x8f, 0xba, 0x8a, 0x79, 0x74, 0xcb, 0x12,
0x5f, 0x3c, 0xda, 0x8e, 0xd4, 0x3e, 0x2e, 0x25, 0x2b, 0x05, 0xe4, 0x7d, 0xab, 0x8a, 0x10, 0xe6,
0x7d, 0xfb, 0xfc, 0x01, 0xc5, 0xeb, 0xc2, 0x27, 0x1f, 0xa1, 0x8c, 0x20, 0x52, 0x98, 0x0d, 0xe2,
0xf1, 0x60, 0xb6, 0x84, 0x7a, 0x27, 0x6b, 0x28, 0xc2, 0xd7, 0x65, 0x65, 0xfe, 0x33, 0x1d, 0x85,
0xd9, 0x10, 0xe9, 0xdd, 0xb9, 0x0c, 0x77, 0x7b, 0xf9, 0x0b, 0x00, 0x00, 0xff, 0xff, 0xfe, 0xb9,
0x5f, 0xf7, 0x61, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// DebugClient is the client API for Debug service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DebugClient interface {
Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error)
Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error)
}
type debugClient struct {
cc *grpc.ClientConn
}
func NewDebugClient(cc *grpc.ClientConn) DebugClient {
return &debugClient{cc}
}
func (c *debugClient) Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error) {
out := new(HealthResponse)
err := c.cc.Invoke(ctx, "/Debug/Health", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *debugClient) Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error) {
out := new(StatsResponse)
err := c.cc.Invoke(ctx, "/Debug/Stats", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// DebugServer is the server API for Debug service.
type DebugServer interface {
Health(context.Context, *HealthRequest) (*HealthResponse, error)
Stats(context.Context, *StatsRequest) (*StatsResponse, error)
}
func RegisterDebugServer(s *grpc.Server, srv DebugServer) {
s.RegisterService(&_Debug_serviceDesc, srv)
}
func _Debug_Health_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DebugServer).Health(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Debug/Health",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DebugServer).Health(ctx, req.(*HealthRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Debug_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StatsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DebugServer).Stats(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Debug/Stats",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DebugServer).Stats(ctx, req.(*StatsRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Debug_serviceDesc = grpc.ServiceDesc{
ServiceName: "Debug",
HandlerType: (*DebugServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Health",
Handler: _Debug_Health_Handler,
},
{
MethodName: "Stats",
Handler: _Debug_Stats_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "micro/go-micro/debug/proto/debug.proto",
}

View File

@ -1,13 +1,9 @@
syntax = "proto3"; syntax = "proto3";
// This is commented out due to import cycles. service Debug {
// But its what we expect the RPC service to rpc Health(HealthRequest) returns (HealthResponse) {}
// return. rpc Stats(StatsRequest) returns (StatsResponse) {}
// }
// service Debug {
// rpc Health(HealthRequest) returns (HealthResponse) {}
// rpc Stats(StatsRequest) returns (StatsResponse) {}
// }
message HealthRequest { message HealthRequest {
} }

View File

@ -5,8 +5,8 @@ import (
"sync" "sync"
"testing" "testing"
proto "github.com/micro/go-micro/debug/proto"
"github.com/micro/go-micro/registry/memory" "github.com/micro/go-micro/registry/memory"
proto "github.com/micro/go-micro/server/debug/proto"
) )
func TestFunction(t *testing.T) { func TestFunction(t *testing.T) {

214
monitor/default.go Normal file
View File

@ -0,0 +1,214 @@
package monitor
import (
"context"
"errors"
"sync"
"time"
"github.com/micro/go-micro/client"
pb "github.com/micro/go-micro/debug/proto"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/cache"
)
type monitor struct {
options Options
exit chan bool
registry cache.Cache
client client.Client
sync.RWMutex
services map[string]*Status
}
// check provides binary running/failed status.
// In the event Debug.Health cannot be called on a service we reap the node.
func (m *monitor) check(service string) (*Status, error) {
services, err := m.registry.GetService(service)
if err != nil {
return nil, err
}
// create debug client
debug := pb.NewDebugService(service, m.client)
var status *Status
var gerr error
// iterate through multiple versions of a service
for _, service := range services {
for _, node := range service.Nodes {
rsp, err := debug.Health(
context.Background(),
// empty health request
&pb.HealthRequest{},
// call this specific node
client.WithAddress(node.Address),
// retry in the event of failure
client.WithRetries(3),
)
if err != nil {
// reap the dead node
m.registry.Deregister(&registry.Service{
Name: service.Name,
Version: service.Version,
Nodes: []*registry.Node{node},
})
// save the error
gerr = err
continue
}
// expecting ok response status
if rsp.Status != "ok" {
gerr = errors.New(rsp.Status)
continue
}
// no error set status
status = &Status{
Code: StatusRunning,
Info: "running",
}
}
}
// if we got the success case return it
if status != nil {
return status, nil
}
// if gerr is not nil return it
if gerr != nil {
return &Status{
Code: StatusFailed,
Info: "not running",
Error: gerr.Error(),
}, nil
}
// otherwise unknown status
return &Status{
Code: StatusUnknown,
Info: "unknown status",
}, nil
}
func (m *monitor) Status(service string) (Status, error) {
m.RLock()
defer m.RUnlock()
if status, ok := m.services[service]; ok {
return *status, nil
}
return Status{}, ErrNotWatching
}
func (m *monitor) Watch(service string) error {
m.Lock()
defer m.Unlock()
// check if we're watching
if _, ok := m.services[service]; ok {
return nil
}
// get the status
status, err := m.check(service)
if err != nil {
return err
}
// set the status
m.services[service] = status
return nil
}
func (m *monitor) Stop() error {
m.Lock()
defer m.Unlock()
select {
case <-m.exit:
return nil
default:
close(m.exit)
for s, _ := range m.services {
delete(m.services, s)
}
m.registry.Stop()
return nil
}
return nil
}
func (m *monitor) run() {
// check the status every tick
t := time.NewTicker(time.Minute)
defer t.Stop()
check := make(chan string)
for {
select {
case <-m.exit:
return
case service := <-check:
// check the status
status, err := m.check(service)
if err != nil {
status = &Status{
Code: StatusUnknown,
Info: "unknown status",
}
}
// save the status
m.Lock()
m.services[service] = status
m.Unlock()
case <-t.C:
// create a list of services
var services []string
m.RLock()
for service, _ := range m.services {
services = append(services, service)
}
m.RUnlock()
// check the status of all watched services
for _, service := range services {
select {
case <-m.exit:
return
case check <- service:
}
}
}
}
}
func newMonitor(opts ...Option) Monitor {
options := Options{
Client: client.DefaultClient,
Registry: registry.DefaultRegistry,
}
for _, o := range opts {
o(&options)
}
m := &monitor{
options: options,
exit: make(chan bool),
client: options.Client,
registry: cache.New(options.Registry),
services: make(map[string]*Status),
}
go m.run()
return m
}

31
monitor/default_test.go Normal file
View File

@ -0,0 +1,31 @@
package monitor
import (
"testing"
)
func TestMonitor(t *testing.T) {
// create new monitor
m := NewMonitor()
services := []string{"foo", "bar", "baz"}
for _, service := range services {
_, err := m.Status(service)
if err == nil {
t.Fatal("expected status error for unknown service")
}
if err := m.Watch(service); err == nil {
t.Fatal("expected watch error for unknown service")
}
// TODO:
// 1. start a service
// 2. watch service
// 3. get service status
}
// stop monitor
m.Stop()
}

39
monitor/monitor.go Normal file
View File

@ -0,0 +1,39 @@
// Package monitor monitors service health
package monitor
import (
"errors"
)
const (
StatusUnknown StatusCode = iota
StatusRunning
StatusFailed
)
type StatusCode int
// Monitor monitors a service and reaps dead instances
type Monitor interface {
// Status of the service
Status(service string) (Status, error)
// Watch starts watching the service
Watch(service string) error
// Stop monitoring
Stop() error
}
type Status struct {
Code StatusCode
Info string
Error string
}
var (
ErrNotWatching = errors.New("not watching")
)
// NewMonitor returns a new monitor
func NewMonitor(opts ...Option) Monitor {
return newMonitor(opts...)
}

25
monitor/options.go Normal file
View File

@ -0,0 +1,25 @@
package monitor
import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
)
type Options struct {
Client client.Client
Registry registry.Registry
}
type Option func(*Options)
func Client(c client.Client) Option {
return func(o *Options) {
o.Client = c
}
}
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
}

View File

@ -1,9 +0,0 @@
package server
import (
"github.com/micro/go-micro/server/debug"
)
func registerDebugHandler(s Server) {
s.Handle(s.NewHandler(&debug.Debug{s.Options().DebugHandler}, InternalHandler(true)))
}

View File

@ -1,55 +0,0 @@
package debug
import (
"context"
"runtime"
"time"
proto "github.com/micro/go-micro/server/debug/proto"
)
// The debug handler represents an internal server handler
// used to determine health, status and env info about
// a service node. It's akin to Google's /statusz, /healthz,
// and /varz
type Handler interface {
Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error
Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error
}
// Our own internal handler
type debug struct {
started int64
}
// We use this to wrap any debug handlers so we preserve the signature Debug.{Method}
type Debug struct {
Handler
}
var (
DefaultHandler Handler = newDebug()
)
func newDebug() *debug {
return &debug{
started: time.Now().Unix(),
}
}
func (d *debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error {
rsp.Status = "ok"
return nil
}
func (d *debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error {
var mstat runtime.MemStats
runtime.ReadMemStats(&mstat)
rsp.Started = uint64(d.started)
rsp.Uptime = uint64(time.Now().Unix() - d.started)
rsp.Memory = mstat.Alloc
rsp.Gc = mstat.PauseTotalNs
rsp.Threads = uint64(runtime.NumGoroutine())
return nil
}

View File

@ -1,100 +0,0 @@
// Code generated by protoc-gen-go.
// source: github.com/micro/go-micro/server/debug/proto/debug.proto
// DO NOT EDIT!
/*
Package debug is a generated protocol buffer package.
It is generated from these files:
github.com/micro/go-micro/server/debug/proto/debug.proto
It has these top-level messages:
HealthRequest
HealthResponse
StatsRequest
StatsResponse
*/
package debug
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type HealthRequest struct {
}
func (m *HealthRequest) Reset() { *m = HealthRequest{} }
func (m *HealthRequest) String() string { return proto.CompactTextString(m) }
func (*HealthRequest) ProtoMessage() {}
func (*HealthRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type HealthResponse struct {
// default: ok
Status string `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
}
func (m *HealthResponse) Reset() { *m = HealthResponse{} }
func (m *HealthResponse) String() string { return proto.CompactTextString(m) }
func (*HealthResponse) ProtoMessage() {}
func (*HealthResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type StatsRequest struct {
}
func (m *StatsRequest) Reset() { *m = StatsRequest{} }
func (m *StatsRequest) String() string { return proto.CompactTextString(m) }
func (*StatsRequest) ProtoMessage() {}
func (*StatsRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type StatsResponse struct {
// unix timestamp
Started uint64 `protobuf:"varint,1,opt,name=started" json:"started,omitempty"`
// in seconds
Uptime uint64 `protobuf:"varint,2,opt,name=uptime" json:"uptime,omitempty"`
// in bytes
Memory uint64 `protobuf:"varint,3,opt,name=memory" json:"memory,omitempty"`
// num threads
Threads uint64 `protobuf:"varint,4,opt,name=threads" json:"threads,omitempty"`
// in nanoseconds
Gc uint64 `protobuf:"varint,5,opt,name=gc" json:"gc,omitempty"`
}
func (m *StatsResponse) Reset() { *m = StatsResponse{} }
func (m *StatsResponse) String() string { return proto.CompactTextString(m) }
func (*StatsResponse) ProtoMessage() {}
func (*StatsResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func init() {
proto.RegisterType((*HealthRequest)(nil), "HealthRequest")
proto.RegisterType((*HealthResponse)(nil), "HealthResponse")
proto.RegisterType((*StatsRequest)(nil), "StatsRequest")
proto.RegisterType((*StatsResponse)(nil), "StatsResponse")
}
var fileDescriptor0 = []byte{
// 201 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x34, 0x8f, 0xbd, 0x6e, 0x83, 0x30,
0x14, 0x85, 0x05, 0xa5, 0x54, 0xbd, 0x2a, 0x54, 0x62, 0xa8, 0x3c, 0x56, 0x4c, 0x2c, 0xc5, 0x43,
0x97, 0x3e, 0x42, 0x67, 0xf2, 0x04, 0xfc, 0x5c, 0x19, 0xa4, 0x38, 0x26, 0xbe, 0xd7, 0x91, 0x32,
0xe7, 0xc5, 0x03, 0xb6, 0xd9, 0xce, 0xf7, 0xd9, 0xe7, 0x48, 0x17, 0xfe, 0xd4, 0xc2, 0xb3, 0x1b,
0xda, 0xd1, 0x68, 0xa9, 0x97, 0xd1, 0x1a, 0xa9, 0xcc, 0x4f, 0x08, 0x84, 0xf6, 0x86, 0x56, 0x4e,
0x38, 0x38, 0x25, 0x57, 0x6b, 0xd8, 0x84, 0xdc, 0xfa, 0x5c, 0x7f, 0x42, 0xf1, 0x8f, 0xfd, 0x99,
0xe7, 0x0e, 0xaf, 0x0e, 0x89, 0xeb, 0x06, 0xca, 0x43, 0xd0, 0x6a, 0x2e, 0x84, 0xd5, 0x17, 0xe4,
0xc4, 0x3d, 0x3b, 0x12, 0xc9, 0x77, 0xd2, 0xbc, 0x77, 0x91, 0xea, 0x12, 0x3e, 0x4e, 0x5b, 0xa2,
0xa3, 0xf9, 0x48, 0xa0, 0x88, 0x22, 0x36, 0x05, 0xbc, 0x6d, 0x7f, 0x2d, 0xe3, 0xe4, 0xab, 0x59,
0x77, 0xe0, 0xbe, 0xe9, 0x56, 0x5e, 0x34, 0x8a, 0xd4, 0x3f, 0x44, 0xda, 0xbd, 0x46, 0x6d, 0xec,
0x5d, 0xbc, 0x04, 0x1f, 0x68, 0x5f, 0xe2, 0xd9, 0x62, 0x3f, 0x91, 0xc8, 0xc2, 0x52, 0xc4, 0xaa,
0x84, 0x54, 0x8d, 0xe2, 0xd5, 0xcb, 0x2d, 0x0d, 0xb9, 0xbf, 0xeb, 0xf7, 0x19, 0x00, 0x00, 0xff,
0xff, 0xc6, 0x75, 0x51, 0x35, 0x13, 0x01, 0x00, 0x00,
}

View File

@ -1,10 +0,0 @@
package grpc
import (
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/server/debug"
)
func registerDebugHandler(s server.Server) {
s.Handle(s.NewHandler(&debug.Debug{s.Options().DebugHandler}, server.InternalHandler(true)))
}

View File

@ -700,7 +700,6 @@ func (g *grpcServer) Deregister() error {
} }
func (g *grpcServer) Start() error { func (g *grpcServer) Start() error {
registerDebugHandler(g)
config := g.opts config := g.opts
// micro: config.Transport.Listen(config.Address) // micro: config.Transport.Listen(config.Address)

View File

@ -8,7 +8,6 @@ import (
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/go-micro/server/debug"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
@ -89,10 +88,6 @@ func newOptions(opt ...server.Option) server.Options {
opts.Transport = transport.DefaultTransport opts.Transport = transport.DefaultTransport
} }
if opts.DebugHandler == nil {
opts.DebugHandler = debug.DefaultHandler
}
if len(opts.Address) == 0 { if len(opts.Address) == 0 {
opts.Address = server.DefaultAddress opts.Address = server.DefaultAddress
} }

View File

@ -8,7 +8,6 @@ import (
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server/debug"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
@ -36,9 +35,6 @@ type Options struct {
// The router for requests // The router for requests
Router Router Router Router
// Debug Handler which can be set by a user
DebugHandler debug.Handler
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
Context context.Context Context context.Context
@ -66,10 +62,6 @@ func newOptions(opt ...Option) Options {
opts.Transport = transport.DefaultTransport opts.Transport = transport.DefaultTransport
} }
if opts.DebugHandler == nil {
opts.DebugHandler = debug.DefaultHandler
}
if opts.RegisterCheck == nil { if opts.RegisterCheck == nil {
opts.RegisterCheck = DefaultRegisterCheck opts.RegisterCheck = DefaultRegisterCheck
} }
@ -156,13 +148,6 @@ func Transport(t transport.Transport) Option {
} }
} }
// DebugHandler for this server
func DebugHandler(d debug.Handler) Option {
return func(o *Options) {
o.DebugHandler = d
}
}
// Metadata associated with the server // Metadata associated with the server
func Metadata(md map[string]string) Option { func Metadata(md map[string]string) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -485,7 +485,6 @@ func (s *rpcServer) Deregister() error {
} }
func (s *rpcServer) Start() error { func (s *rpcServer) Start() error {
registerDebugHandler(s)
config := s.Options() config := s.Options()
// start listening on the transport // start listening on the transport

View File

@ -8,6 +8,7 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/cmd" "github.com/micro/go-micro/config/cmd"
"github.com/micro/go-micro/debug/handler"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
) )
@ -113,6 +114,14 @@ func (s *service) Stop() error {
} }
func (s *service) Run() error { func (s *service) Run() error {
// register the debug handler
s.opts.Server.Handle(
s.opts.Server.NewHandler(
handler.DefaultHandler,
server.InternalHandler(true),
),
)
if err := s.Start(); err != nil { if err := s.Start(); err != nil {
return err return err
} }

220
service/options.go Normal file
View File

@ -0,0 +1,220 @@
package service
import (
"context"
"time"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
)
type Options struct {
Broker broker.Broker
Client client.Client
Server server.Server
Registry registry.Registry
Transport transport.Transport
// Before and After funcs
BeforeStart []func() error
BeforeStop []func() error
AfterStart []func() error
AfterStop []func() error
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
type Option func(*Options)
func newOptions(opts ...Option) Options {
opt := Options{
Broker: broker.DefaultBroker,
Client: client.DefaultClient,
Server: server.DefaultServer,
Registry: registry.DefaultRegistry,
Transport: transport.DefaultTransport,
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
return opt
}
func Broker(b broker.Broker) Option {
return func(o *Options) {
o.Broker = b
// Update Client and Server
o.Client.Init(client.Broker(b))
o.Server.Init(server.Broker(b))
}
}
func Client(c client.Client) Option {
return func(o *Options) {
o.Client = c
}
}
// Context specifies a context for the service.
// Can be used to signal shutdown of the service.
// Can be used for extra option values.
func Context(ctx context.Context) Option {
return func(o *Options) {
o.Context = ctx
}
}
func Server(s server.Server) Option {
return func(o *Options) {
o.Server = s
}
}
// Registry sets the registry for the service
// and the underlying components
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
// Update Client and Server
o.Client.Init(client.Registry(r))
o.Server.Init(server.Registry(r))
// Update Broker
o.Broker.Init(broker.Registry(r))
}
}
// Transport sets the transport for the service
// and the underlying components
func Transport(t transport.Transport) Option {
return func(o *Options) {
o.Transport = t
// Update Client and Server
o.Client.Init(client.Transport(t))
o.Server.Init(server.Transport(t))
}
}
// Convenience options
// Address sets the address of the server
func Address(addr string) Option {
return func(o *Options) {
o.Server.Init(server.Address(addr))
}
}
// Name of the service
func Name(n string) Option {
return func(o *Options) {
o.Server.Init(server.Name(n))
}
}
// Version of the service
func Version(v string) Option {
return func(o *Options) {
o.Server.Init(server.Version(v))
}
}
// Metadata associated with the service
func Metadata(md map[string]string) Option {
return func(o *Options) {
o.Server.Init(server.Metadata(md))
}
}
// RegisterTTL specifies the TTL to use when registering the service
func RegisterTTL(t time.Duration) Option {
return func(o *Options) {
o.Server.Init(server.RegisterTTL(t))
}
}
// RegisterInterval specifies the interval on which to re-register
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.Server.Init(server.RegisterInterval(t))
}
}
// WrapClient is a convenience method for wrapping a Client with
// some middleware component. A list of wrappers can be provided.
// Wrappers are applied in reverse order so the last is executed first.
func WrapClient(w ...client.Wrapper) Option {
return func(o *Options) {
// apply in reverse
for i := len(w); i > 0; i-- {
o.Client = w[i-1](o.Client)
}
}
}
// WrapCall is a convenience method for wrapping a Client CallFunc
func WrapCall(w ...client.CallWrapper) Option {
return func(o *Options) {
o.Client.Init(client.WrapCall(w...))
}
}
// WrapHandler adds a handler Wrapper to a list of options passed into the server
func WrapHandler(w ...server.HandlerWrapper) Option {
return func(o *Options) {
var wrappers []server.Option
for _, wrap := range w {
wrappers = append(wrappers, server.WrapHandler(wrap))
}
// Init once
o.Server.Init(wrappers...)
}
}
// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
func WrapSubscriber(w ...server.SubscriberWrapper) Option {
return func(o *Options) {
var wrappers []server.Option
for _, wrap := range w {
wrappers = append(wrappers, server.WrapSubscriber(wrap))
}
// Init once
o.Server.Init(wrappers...)
}
}
// Before and Afters
func BeforeStart(fn func() error) Option {
return func(o *Options) {
o.BeforeStart = append(o.BeforeStart, fn)
}
}
func BeforeStop(fn func() error) Option {
return func(o *Options) {
o.BeforeStop = append(o.BeforeStop, fn)
}
}
func AfterStart(fn func() error) Option {
return func(o *Options) {
o.AfterStart = append(o.AfterStart, fn)
}
}
func AfterStop(fn func() error) Option {
return func(o *Options) {
o.AfterStop = append(o.AfterStop, fn)
}
}

View File

@ -1,2 +1,16 @@
// Package service encapsulates the client, server and other interfaces to provide a complete micro service. // Package service encapsulates the client, server and other interfaces to provide a complete micro service.
package service package service
import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/server"
)
type Service interface {
Init(...Option)
Options() Options
Client() client.Client
Server() server.Server
Run() error
String() string
}

View File

@ -8,8 +8,8 @@ import (
glog "github.com/go-log/log" glog "github.com/go-log/log"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
proto "github.com/micro/go-micro/debug/proto"
"github.com/micro/go-micro/registry/memory" "github.com/micro/go-micro/registry/memory"
proto "github.com/micro/go-micro/server/debug/proto"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
) )