diff --git a/client/options.go b/client/options.go index 0cdcff95..26c116e3 100644 --- a/client/options.go +++ b/client/options.go @@ -12,6 +12,7 @@ import ( regRouter "github.com/micro/go-micro/v3/router/registry" "github.com/micro/go-micro/v3/selector" "github.com/micro/go-micro/v3/transport" + thttp "github.com/micro/go-micro/v3/transport/http" ) type Options struct { @@ -120,7 +121,7 @@ func NewOptions(options ...Option) Options { Broker: http.NewBroker(), Router: regRouter.NewRouter(), Selector: selector.DefaultSelector, - Transport: transport.DefaultTransport, + Transport: thttp.NewTransport(), } for _, o := range options { diff --git a/debug/log/log.go b/debug/log/log.go index bf90a999..dabdb899 100644 --- a/debug/log/log.go +++ b/debug/log/log.go @@ -9,9 +9,7 @@ import ( var ( // Default buffer size if any - DefaultSize = 1024 - // DefaultLog logger - DefaultLog = NewLog() + DefaultSize = 256 // Default formatter DefaultFormat = TextFormat ) diff --git a/debug/log/memory/memory.go b/debug/log/memory/memory.go index a6025169..5fba4282 100644 --- a/debug/log/memory/memory.go +++ b/debug/log/memory/memory.go @@ -8,11 +8,6 @@ import ( "github.com/micro/go-micro/v3/util/ring" ) -var ( - // DefaultSize of the logger buffer - DefaultSize = 1024 -) - // memoryLog is default micro log type memoryLog struct { *ring.Buffer diff --git a/debug/log/os.go b/debug/log/os.go deleted file mode 100644 index 02b7243b..00000000 --- a/debug/log/os.go +++ /dev/null @@ -1,81 +0,0 @@ -package log - -import ( - "sync" - - "github.com/google/uuid" - "github.com/micro/go-micro/v3/util/ring" -) - -// Should stream from OS -type osLog struct { - format FormatFunc - once sync.Once - - sync.RWMutex - buffer *ring.Buffer - subs map[string]*osStream -} - -type osStream struct { - stream chan Record -} - -// Read reads log entries from the logger -func (o *osLog) Read(...ReadOption) ([]Record, error) { - var records []Record - - // read the last 100 records - for _, v := range o.buffer.Get(100) { - records = append(records, v.Value.(Record)) - } - - return records, nil -} - -// Write writes records to log -func (o *osLog) Write(r Record) error { - o.buffer.Put(r) - return nil -} - -// Stream log records -func (o *osLog) Stream() (Stream, error) { - o.Lock() - defer o.Unlock() - - // create stream - st := &osStream{ - stream: make(chan Record, 128), - } - - // save stream - o.subs[uuid.New().String()] = st - - return st, nil -} - -func (o *osStream) Chan() <-chan Record { - return o.stream -} - -func (o *osStream) Stop() error { - return nil -} - -func NewLog(opts ...Option) Log { - options := Options{ - Format: DefaultFormat, - } - for _, o := range opts { - o(&options) - } - - l := &osLog{ - format: options.Format, - buffer: ring.New(1024), - subs: make(map[string]*osStream), - } - - return l -} diff --git a/debug/service/client.go b/debug/service/client.go deleted file mode 100644 index 29ce5569..00000000 --- a/debug/service/client.go +++ /dev/null @@ -1,94 +0,0 @@ -// Package service provides the service log -package service - -import ( - "context" - "time" - - "github.com/micro/go-micro/v3/client/grpc" - "github.com/micro/go-micro/v3/debug/log" - pb "github.com/micro/go-micro/v3/debug/service/proto" -) - -// Debug provides debug service client -type debugClient struct { - Client pb.DebugService -} - -func (d *debugClient) Trace() ([]*pb.Span, error) { - rsp, err := d.Client.Trace(context.Background(), &pb.TraceRequest{}) - if err != nil { - return nil, err - } - return rsp.Spans, nil -} - -// Logs queries the services logs and returns a channel to read the logs from -func (d *debugClient) Log(since time.Time, count int, stream bool) (log.Stream, error) { - req := &pb.LogRequest{} - if !since.IsZero() { - req.Since = since.Unix() - } - - if count > 0 { - req.Count = int64(count) - } - - // set whether to stream - req.Stream = stream - - // get the log stream - serverStream, err := d.Client.Log(context.Background(), req) - if err != nil { - return nil, err - } - - lg := &logStream{ - stream: make(chan log.Record), - stop: make(chan bool), - } - - // go stream logs - go d.streamLogs(lg, serverStream) - - return lg, nil -} - -func (d *debugClient) streamLogs(lg *logStream, stream pb.Debug_LogService) { - defer stream.Close() - defer lg.Stop() - - for { - resp, err := stream.Recv() - if err != nil { - break - } - - metadata := make(map[string]string) - for k, v := range resp.Metadata { - metadata[k] = v - } - - record := log.Record{ - Timestamp: time.Unix(resp.Timestamp, 0), - Message: resp.Message, - Metadata: metadata, - } - - select { - case <-lg.stop: - return - case lg.stream <- record: - } - } -} - -// NewClient provides a debug client -func NewClient(name string) *debugClient { - // create default client - cli := grpc.NewClient() - - return &debugClient{ - Client: pb.NewDebugService(name, cli), - } -} diff --git a/debug/service/handler/debug.go b/debug/service/handler/debug.go deleted file mode 100644 index de92993f..00000000 --- a/debug/service/handler/debug.go +++ /dev/null @@ -1,176 +0,0 @@ -// Package handler implements service debug handler embedded in go-micro services -package handler - -import ( - "context" - "time" - - "github.com/micro/go-micro/v3/client" - "github.com/micro/go-micro/v3/debug/log" - proto "github.com/micro/go-micro/v3/debug/service/proto" - "github.com/micro/go-micro/v3/debug/stats" - "github.com/micro/go-micro/v3/debug/trace" - "github.com/micro/go-micro/v3/server" -) - -// NewHandler returns an instance of the Debug Handler -func NewHandler(c client.Client) *Debug { - return &Debug{ - log: log.DefaultLog, - stats: stats.DefaultStats, - trace: trace.DefaultTracer, - cache: c.Options().Cache, - } -} - -type Debug struct { - // must honour the debug handler - proto.DebugHandler - // the logger for retrieving logs - log log.Log - // the stats collector - stats stats.Stats - // the tracer - trace trace.Tracer - // the cache - cache *client.Cache -} - -func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error { - rsp.Status = "ok" - return nil -} - -func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error { - stats, err := d.stats.Read() - if err != nil { - return err - } - - if len(stats) == 0 { - return nil - } - - // write the response values - rsp.Timestamp = uint64(stats[0].Timestamp) - rsp.Started = uint64(stats[0].Started) - rsp.Uptime = uint64(stats[0].Uptime) - rsp.Memory = stats[0].Memory - rsp.Gc = stats[0].GC - rsp.Threads = stats[0].Threads - rsp.Requests = stats[0].Requests - rsp.Errors = stats[0].Errors - - return nil -} - -func (d *Debug) Trace(ctx context.Context, req *proto.TraceRequest, rsp *proto.TraceResponse) error { - traces, err := d.trace.Read(trace.ReadTrace(req.Id)) - if err != nil { - return err - } - - for _, t := range traces { - var typ proto.SpanType - switch t.Type { - case trace.SpanTypeRequestInbound: - typ = proto.SpanType_INBOUND - case trace.SpanTypeRequestOutbound: - typ = proto.SpanType_OUTBOUND - } - rsp.Spans = append(rsp.Spans, &proto.Span{ - Trace: t.Trace, - Id: t.Id, - Parent: t.Parent, - Name: t.Name, - Started: uint64(t.Started.UnixNano()), - Duration: uint64(t.Duration.Nanoseconds()), - Type: typ, - Metadata: t.Metadata, - }) - } - - return nil -} - -func (d *Debug) Log(ctx context.Context, stream server.Stream) error { - req := new(proto.LogRequest) - if err := stream.Recv(req); err != nil { - return err - } - - var options []log.ReadOption - - since := time.Unix(req.Since, 0) - if !since.IsZero() { - options = append(options, log.Since(since)) - } - - count := int(req.Count) - if count > 0 { - options = append(options, log.Count(count)) - } - - if req.Stream { - // TODO: we need to figure out how to close the log stream - // It seems like when a client disconnects, - // the connection stays open until some timeout expires - // or something like that; that means the map of streams - // might end up leaking memory if not cleaned up properly - lgStream, err := d.log.Stream() - if err != nil { - return err - } - defer lgStream.Stop() - - for record := range lgStream.Chan() { - // copy metadata - metadata := make(map[string]string) - for k, v := range record.Metadata { - metadata[k] = v - } - // send record - if err := stream.Send(&proto.Record{ - Timestamp: record.Timestamp.Unix(), - Message: record.Message.(string), - Metadata: metadata, - }); err != nil { - return err - } - } - - // done streaming, return - return nil - } - - // get the log records - records, err := d.log.Read(options...) - if err != nil { - return err - } - - // send all the logs downstream - for _, record := range records { - // copy metadata - metadata := make(map[string]string) - for k, v := range record.Metadata { - metadata[k] = v - } - // send record - if err := stream.Send(&proto.Record{ - Timestamp: record.Timestamp.Unix(), - Message: record.Message.(string), - Metadata: metadata, - }); err != nil { - return err - } - } - - return nil -} - -// Cache returns all the key value pairs in the client cache -func (d *Debug) Cache(ctx context.Context, req *proto.CacheRequest, rsp *proto.CacheResponse) error { - rsp.Values = d.cache.List() - return nil -} diff --git a/debug/service/proto/debug.pb.go b/debug/service/proto/debug.pb.go deleted file mode 100644 index 859d44cd..00000000 --- a/debug/service/proto/debug.pb.go +++ /dev/null @@ -1,970 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: debug/service/proto/debug.proto - -package debug - -import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type SpanType int32 - -const ( - SpanType_INBOUND SpanType = 0 - SpanType_OUTBOUND SpanType = 1 -) - -var SpanType_name = map[int32]string{ - 0: "INBOUND", - 1: "OUTBOUND", -} - -var SpanType_value = map[string]int32{ - "INBOUND": 0, - "OUTBOUND": 1, -} - -func (x SpanType) String() string { - return proto.EnumName(SpanType_name, int32(x)) -} - -func (SpanType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{0} -} - -type HealthRequest struct { - // optional service name - Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *HealthRequest) Reset() { *m = HealthRequest{} } -func (m *HealthRequest) String() string { return proto.CompactTextString(m) } -func (*HealthRequest) ProtoMessage() {} -func (*HealthRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{0} -} - -func (m *HealthRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_HealthRequest.Unmarshal(m, b) -} -func (m *HealthRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_HealthRequest.Marshal(b, m, deterministic) -} -func (m *HealthRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_HealthRequest.Merge(m, src) -} -func (m *HealthRequest) XXX_Size() int { - return xxx_messageInfo_HealthRequest.Size(m) -} -func (m *HealthRequest) XXX_DiscardUnknown() { - xxx_messageInfo_HealthRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_HealthRequest proto.InternalMessageInfo - -func (m *HealthRequest) GetService() string { - if m != nil { - return m.Service - } - return "" -} - -type HealthResponse struct { - // default: ok - Status string `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *HealthResponse) Reset() { *m = HealthResponse{} } -func (m *HealthResponse) String() string { return proto.CompactTextString(m) } -func (*HealthResponse) ProtoMessage() {} -func (*HealthResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{1} -} - -func (m *HealthResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_HealthResponse.Unmarshal(m, b) -} -func (m *HealthResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_HealthResponse.Marshal(b, m, deterministic) -} -func (m *HealthResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_HealthResponse.Merge(m, src) -} -func (m *HealthResponse) XXX_Size() int { - return xxx_messageInfo_HealthResponse.Size(m) -} -func (m *HealthResponse) XXX_DiscardUnknown() { - xxx_messageInfo_HealthResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_HealthResponse proto.InternalMessageInfo - -func (m *HealthResponse) GetStatus() string { - if m != nil { - return m.Status - } - return "" -} - -type StatsRequest struct { - // optional service name - Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *StatsRequest) Reset() { *m = StatsRequest{} } -func (m *StatsRequest) String() string { return proto.CompactTextString(m) } -func (*StatsRequest) ProtoMessage() {} -func (*StatsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{2} -} - -func (m *StatsRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_StatsRequest.Unmarshal(m, b) -} -func (m *StatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_StatsRequest.Marshal(b, m, deterministic) -} -func (m *StatsRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_StatsRequest.Merge(m, src) -} -func (m *StatsRequest) XXX_Size() int { - return xxx_messageInfo_StatsRequest.Size(m) -} -func (m *StatsRequest) XXX_DiscardUnknown() { - xxx_messageInfo_StatsRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_StatsRequest proto.InternalMessageInfo - -func (m *StatsRequest) GetService() string { - if m != nil { - return m.Service - } - return "" -} - -type StatsResponse struct { - // timestamp of recording - Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - // unix timestamp - Started uint64 `protobuf:"varint,2,opt,name=started,proto3" json:"started,omitempty"` - // in seconds - Uptime uint64 `protobuf:"varint,3,opt,name=uptime,proto3" json:"uptime,omitempty"` - // in bytes - Memory uint64 `protobuf:"varint,4,opt,name=memory,proto3" json:"memory,omitempty"` - // num threads - Threads uint64 `protobuf:"varint,5,opt,name=threads,proto3" json:"threads,omitempty"` - // total gc in nanoseconds - Gc uint64 `protobuf:"varint,6,opt,name=gc,proto3" json:"gc,omitempty"` - // total number of requests - Requests uint64 `protobuf:"varint,7,opt,name=requests,proto3" json:"requests,omitempty"` - // total number of errors - Errors uint64 `protobuf:"varint,8,opt,name=errors,proto3" json:"errors,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *StatsResponse) Reset() { *m = StatsResponse{} } -func (m *StatsResponse) String() string { return proto.CompactTextString(m) } -func (*StatsResponse) ProtoMessage() {} -func (*StatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{3} -} - -func (m *StatsResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_StatsResponse.Unmarshal(m, b) -} -func (m *StatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_StatsResponse.Marshal(b, m, deterministic) -} -func (m *StatsResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_StatsResponse.Merge(m, src) -} -func (m *StatsResponse) XXX_Size() int { - return xxx_messageInfo_StatsResponse.Size(m) -} -func (m *StatsResponse) XXX_DiscardUnknown() { - xxx_messageInfo_StatsResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_StatsResponse proto.InternalMessageInfo - -func (m *StatsResponse) GetTimestamp() uint64 { - if m != nil { - return m.Timestamp - } - return 0 -} - -func (m *StatsResponse) GetStarted() uint64 { - if m != nil { - return m.Started - } - return 0 -} - -func (m *StatsResponse) GetUptime() uint64 { - if m != nil { - return m.Uptime - } - return 0 -} - -func (m *StatsResponse) GetMemory() uint64 { - if m != nil { - return m.Memory - } - return 0 -} - -func (m *StatsResponse) GetThreads() uint64 { - if m != nil { - return m.Threads - } - return 0 -} - -func (m *StatsResponse) GetGc() uint64 { - if m != nil { - return m.Gc - } - return 0 -} - -func (m *StatsResponse) GetRequests() uint64 { - if m != nil { - return m.Requests - } - return 0 -} - -func (m *StatsResponse) GetErrors() uint64 { - if m != nil { - return m.Errors - } - return 0 -} - -// LogRequest requests service logs -type LogRequest struct { - // service to request logs for - Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` - // stream records continuously - Stream bool `protobuf:"varint,2,opt,name=stream,proto3" json:"stream,omitempty"` - // count of records to request - Count int64 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` - // relative time in seconds - // before the current time - // from which to show logs - Since int64 `protobuf:"varint,4,opt,name=since,proto3" json:"since,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *LogRequest) Reset() { *m = LogRequest{} } -func (m *LogRequest) String() string { return proto.CompactTextString(m) } -func (*LogRequest) ProtoMessage() {} -func (*LogRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{4} -} - -func (m *LogRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_LogRequest.Unmarshal(m, b) -} -func (m *LogRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_LogRequest.Marshal(b, m, deterministic) -} -func (m *LogRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_LogRequest.Merge(m, src) -} -func (m *LogRequest) XXX_Size() int { - return xxx_messageInfo_LogRequest.Size(m) -} -func (m *LogRequest) XXX_DiscardUnknown() { - xxx_messageInfo_LogRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_LogRequest proto.InternalMessageInfo - -func (m *LogRequest) GetService() string { - if m != nil { - return m.Service - } - return "" -} - -func (m *LogRequest) GetStream() bool { - if m != nil { - return m.Stream - } - return false -} - -func (m *LogRequest) GetCount() int64 { - if m != nil { - return m.Count - } - return 0 -} - -func (m *LogRequest) GetSince() int64 { - if m != nil { - return m.Since - } - return 0 -} - -// Record is service log record -type Record struct { - // timestamp of log record - Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - // record metadata - Metadata map[string]string `protobuf:"bytes,2,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - // message - Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Record) Reset() { *m = Record{} } -func (m *Record) String() string { return proto.CompactTextString(m) } -func (*Record) ProtoMessage() {} -func (*Record) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{5} -} - -func (m *Record) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Record.Unmarshal(m, b) -} -func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Record.Marshal(b, m, deterministic) -} -func (m *Record) XXX_Merge(src proto.Message) { - xxx_messageInfo_Record.Merge(m, src) -} -func (m *Record) XXX_Size() int { - return xxx_messageInfo_Record.Size(m) -} -func (m *Record) XXX_DiscardUnknown() { - xxx_messageInfo_Record.DiscardUnknown(m) -} - -var xxx_messageInfo_Record proto.InternalMessageInfo - -func (m *Record) GetTimestamp() int64 { - if m != nil { - return m.Timestamp - } - return 0 -} - -func (m *Record) GetMetadata() map[string]string { - if m != nil { - return m.Metadata - } - return nil -} - -func (m *Record) GetMessage() string { - if m != nil { - return m.Message - } - return "" -} - -type TraceRequest struct { - // trace id to retrieve - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *TraceRequest) Reset() { *m = TraceRequest{} } -func (m *TraceRequest) String() string { return proto.CompactTextString(m) } -func (*TraceRequest) ProtoMessage() {} -func (*TraceRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{6} -} - -func (m *TraceRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TraceRequest.Unmarshal(m, b) -} -func (m *TraceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TraceRequest.Marshal(b, m, deterministic) -} -func (m *TraceRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_TraceRequest.Merge(m, src) -} -func (m *TraceRequest) XXX_Size() int { - return xxx_messageInfo_TraceRequest.Size(m) -} -func (m *TraceRequest) XXX_DiscardUnknown() { - xxx_messageInfo_TraceRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_TraceRequest proto.InternalMessageInfo - -func (m *TraceRequest) GetId() string { - if m != nil { - return m.Id - } - return "" -} - -type TraceResponse struct { - Spans []*Span `protobuf:"bytes,1,rep,name=spans,proto3" json:"spans,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *TraceResponse) Reset() { *m = TraceResponse{} } -func (m *TraceResponse) String() string { return proto.CompactTextString(m) } -func (*TraceResponse) ProtoMessage() {} -func (*TraceResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{7} -} - -func (m *TraceResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TraceResponse.Unmarshal(m, b) -} -func (m *TraceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TraceResponse.Marshal(b, m, deterministic) -} -func (m *TraceResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_TraceResponse.Merge(m, src) -} -func (m *TraceResponse) XXX_Size() int { - return xxx_messageInfo_TraceResponse.Size(m) -} -func (m *TraceResponse) XXX_DiscardUnknown() { - xxx_messageInfo_TraceResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_TraceResponse proto.InternalMessageInfo - -func (m *TraceResponse) GetSpans() []*Span { - if m != nil { - return m.Spans - } - return nil -} - -type Span struct { - // the trace id - Trace string `protobuf:"bytes,1,opt,name=trace,proto3" json:"trace,omitempty"` - // id of the span - Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` - // parent span - Parent string `protobuf:"bytes,3,opt,name=parent,proto3" json:"parent,omitempty"` - // name of the resource - Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"` - // time of start in nanoseconds - Started uint64 `protobuf:"varint,5,opt,name=started,proto3" json:"started,omitempty"` - // duration of the execution in nanoseconds - Duration uint64 `protobuf:"varint,6,opt,name=duration,proto3" json:"duration,omitempty"` - // associated metadata - Metadata map[string]string `protobuf:"bytes,7,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Type SpanType `protobuf:"varint,8,opt,name=type,proto3,enum=SpanType" json:"type,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Span) Reset() { *m = Span{} } -func (m *Span) String() string { return proto.CompactTextString(m) } -func (*Span) ProtoMessage() {} -func (*Span) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{8} -} - -func (m *Span) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Span.Unmarshal(m, b) -} -func (m *Span) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Span.Marshal(b, m, deterministic) -} -func (m *Span) XXX_Merge(src proto.Message) { - xxx_messageInfo_Span.Merge(m, src) -} -func (m *Span) XXX_Size() int { - return xxx_messageInfo_Span.Size(m) -} -func (m *Span) XXX_DiscardUnknown() { - xxx_messageInfo_Span.DiscardUnknown(m) -} - -var xxx_messageInfo_Span proto.InternalMessageInfo - -func (m *Span) GetTrace() string { - if m != nil { - return m.Trace - } - return "" -} - -func (m *Span) GetId() string { - if m != nil { - return m.Id - } - return "" -} - -func (m *Span) GetParent() string { - if m != nil { - return m.Parent - } - return "" -} - -func (m *Span) GetName() string { - if m != nil { - return m.Name - } - return "" -} - -func (m *Span) GetStarted() uint64 { - if m != nil { - return m.Started - } - return 0 -} - -func (m *Span) GetDuration() uint64 { - if m != nil { - return m.Duration - } - return 0 -} - -func (m *Span) GetMetadata() map[string]string { - if m != nil { - return m.Metadata - } - return nil -} - -func (m *Span) GetType() SpanType { - if m != nil { - return m.Type - } - return SpanType_INBOUND -} - -type CacheRequest struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *CacheRequest) Reset() { *m = CacheRequest{} } -func (m *CacheRequest) String() string { return proto.CompactTextString(m) } -func (*CacheRequest) ProtoMessage() {} -func (*CacheRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{9} -} - -func (m *CacheRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_CacheRequest.Unmarshal(m, b) -} -func (m *CacheRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_CacheRequest.Marshal(b, m, deterministic) -} -func (m *CacheRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_CacheRequest.Merge(m, src) -} -func (m *CacheRequest) XXX_Size() int { - return xxx_messageInfo_CacheRequest.Size(m) -} -func (m *CacheRequest) XXX_DiscardUnknown() { - xxx_messageInfo_CacheRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_CacheRequest proto.InternalMessageInfo - -type CacheResponse struct { - Values map[string]string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *CacheResponse) Reset() { *m = CacheResponse{} } -func (m *CacheResponse) String() string { return proto.CompactTextString(m) } -func (*CacheResponse) ProtoMessage() {} -func (*CacheResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_df91f41a5db378e6, []int{10} -} - -func (m *CacheResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_CacheResponse.Unmarshal(m, b) -} -func (m *CacheResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_CacheResponse.Marshal(b, m, deterministic) -} -func (m *CacheResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_CacheResponse.Merge(m, src) -} -func (m *CacheResponse) XXX_Size() int { - return xxx_messageInfo_CacheResponse.Size(m) -} -func (m *CacheResponse) XXX_DiscardUnknown() { - xxx_messageInfo_CacheResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_CacheResponse proto.InternalMessageInfo - -func (m *CacheResponse) GetValues() map[string]string { - if m != nil { - return m.Values - } - return nil -} - -func init() { - proto.RegisterEnum("SpanType", SpanType_name, SpanType_value) - proto.RegisterType((*HealthRequest)(nil), "HealthRequest") - proto.RegisterType((*HealthResponse)(nil), "HealthResponse") - proto.RegisterType((*StatsRequest)(nil), "StatsRequest") - proto.RegisterType((*StatsResponse)(nil), "StatsResponse") - proto.RegisterType((*LogRequest)(nil), "LogRequest") - proto.RegisterType((*Record)(nil), "Record") - proto.RegisterMapType((map[string]string)(nil), "Record.MetadataEntry") - proto.RegisterType((*TraceRequest)(nil), "TraceRequest") - proto.RegisterType((*TraceResponse)(nil), "TraceResponse") - proto.RegisterType((*Span)(nil), "Span") - proto.RegisterMapType((map[string]string)(nil), "Span.MetadataEntry") - proto.RegisterType((*CacheRequest)(nil), "CacheRequest") - proto.RegisterType((*CacheResponse)(nil), "CacheResponse") - proto.RegisterMapType((map[string]string)(nil), "CacheResponse.ValuesEntry") -} - -func init() { proto.RegisterFile("debug/service/proto/debug.proto", fileDescriptor_df91f41a5db378e6) } - -var fileDescriptor_df91f41a5db378e6 = []byte{ - // 646 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xdb, 0x6e, 0xd3, 0x4a, - 0x14, 0x8d, 0xed, 0x38, 0xb1, 0x77, 0x62, 0x9f, 0x6a, 0xce, 0x45, 0x96, 0x0f, 0xd0, 0xca, 0x12, - 0x52, 0xb8, 0x68, 0x02, 0xe1, 0x85, 0xcb, 0x1b, 0x14, 0x09, 0xa4, 0xd2, 0x4a, 0xd3, 0x96, 0xf7, - 0xa9, 0x3d, 0x4a, 0x03, 0xf5, 0x85, 0x99, 0x71, 0xa5, 0xbc, 0xf0, 0x23, 0xfc, 0x04, 0xff, 0x82, - 0xf8, 0x1f, 0x34, 0x17, 0xb7, 0xb6, 0x10, 0xaa, 0x10, 0x6f, 0x5e, 0x6b, 0xaf, 0xd9, 0xd9, 0x7b, - 0x69, 0x65, 0xc3, 0x6e, 0xc1, 0xce, 0xda, 0xf5, 0x52, 0x30, 0x7e, 0xb9, 0xc9, 0xd9, 0xb2, 0xe1, - 0xb5, 0xac, 0x97, 0x9a, 0xc3, 0xfa, 0x3b, 0xbb, 0x07, 0xd1, 0x1b, 0x46, 0x2f, 0xe4, 0x39, 0x61, - 0x9f, 0x5a, 0x26, 0x24, 0x4a, 0x60, 0x6a, 0xd5, 0x89, 0xb3, 0xe7, 0x2c, 0x42, 0xd2, 0xc1, 0x6c, - 0x01, 0x71, 0x27, 0x15, 0x4d, 0x5d, 0x09, 0x86, 0xfe, 0x83, 0x89, 0x90, 0x54, 0xb6, 0xc2, 0x4a, - 0x2d, 0xca, 0x16, 0x30, 0x3f, 0x96, 0x54, 0x8a, 0x9b, 0x7b, 0x7e, 0x77, 0x20, 0xb2, 0x52, 0xdb, - 0xf3, 0x16, 0x84, 0x72, 0x53, 0x32, 0x21, 0x69, 0xd9, 0x68, 0xf5, 0x98, 0x5c, 0x13, 0xba, 0x93, - 0xa4, 0x5c, 0xb2, 0x22, 0x71, 0x75, 0xad, 0x83, 0x6a, 0x96, 0xb6, 0x51, 0xc2, 0xc4, 0xd3, 0x05, - 0x8b, 0x14, 0x5f, 0xb2, 0xb2, 0xe6, 0xdb, 0x64, 0x6c, 0x78, 0x83, 0x54, 0x27, 0x79, 0xce, 0x19, - 0x2d, 0x44, 0xe2, 0x9b, 0x4e, 0x16, 0xa2, 0x18, 0xdc, 0x75, 0x9e, 0x4c, 0x34, 0xe9, 0xae, 0x73, - 0x94, 0x42, 0xc0, 0xcd, 0x22, 0x22, 0x99, 0x6a, 0xf6, 0x0a, 0xab, 0xee, 0x8c, 0xf3, 0x9a, 0x8b, - 0x24, 0x30, 0xdd, 0x0d, 0xca, 0x3e, 0x00, 0x1c, 0xd4, 0xeb, 0x1b, 0xf7, 0x37, 0x0e, 0x72, 0x46, - 0x4b, 0xbd, 0x4e, 0x40, 0x2c, 0x42, 0xff, 0x80, 0x9f, 0xd7, 0x6d, 0x25, 0xf5, 0x32, 0x1e, 0x31, - 0x40, 0xb1, 0x62, 0x53, 0xe5, 0x4c, 0xaf, 0xe2, 0x11, 0x03, 0xb2, 0xaf, 0x0e, 0x4c, 0x08, 0xcb, - 0x6b, 0x5e, 0xfc, 0x6c, 0x9e, 0xd7, 0x37, 0xef, 0x31, 0x04, 0x25, 0x93, 0xb4, 0xa0, 0x92, 0x26, - 0xee, 0x9e, 0xb7, 0x98, 0xad, 0xfe, 0xc5, 0xe6, 0x21, 0x7e, 0x67, 0xf9, 0xd7, 0x95, 0xe4, 0x5b, - 0x72, 0x25, 0x53, 0x93, 0x97, 0x4c, 0x08, 0xba, 0x36, 0xb6, 0x86, 0xa4, 0x83, 0xe9, 0x0b, 0x88, - 0x06, 0x8f, 0xd0, 0x0e, 0x78, 0x1f, 0xd9, 0xd6, 0x2e, 0xa8, 0x3e, 0xd5, 0xb8, 0x97, 0xf4, 0xa2, - 0x65, 0x7a, 0xb7, 0x90, 0x18, 0xf0, 0xdc, 0x7d, 0xea, 0x64, 0x77, 0x60, 0x7e, 0xc2, 0x69, 0xce, - 0x3a, 0x83, 0x62, 0x70, 0x37, 0x85, 0x7d, 0xea, 0x6e, 0x8a, 0xec, 0x21, 0x44, 0xb6, 0x6e, 0x53, - 0xf1, 0x3f, 0xf8, 0xa2, 0xa1, 0x95, 0x0a, 0x9a, 0x9a, 0xdb, 0xc7, 0xc7, 0x0d, 0xad, 0x88, 0xe1, - 0xb2, 0x2f, 0x2e, 0x8c, 0x15, 0x56, 0x3f, 0x28, 0xd5, 0x33, 0xdb, 0xc9, 0x00, 0xdb, 0xdc, 0xed, - 0x9a, 0x2b, 0xcf, 0x1b, 0xca, 0x99, 0x35, 0x37, 0x24, 0x16, 0x21, 0x04, 0xe3, 0x8a, 0x96, 0xc6, - 0xdc, 0x90, 0xe8, 0xef, 0x7e, 0xde, 0xfc, 0x61, 0xde, 0x52, 0x08, 0x8a, 0x96, 0x53, 0xb9, 0xa9, - 0x2b, 0x9b, 0x95, 0x2b, 0x8c, 0x96, 0x3d, 0xa3, 0xa7, 0x7a, 0xe0, 0xbf, 0xf5, 0xc0, 0xbf, 0xb4, - 0xf9, 0x36, 0x8c, 0xe5, 0xb6, 0x61, 0x3a, 0x44, 0xf1, 0x2a, 0xd4, 0xe2, 0x93, 0x6d, 0xc3, 0x88, - 0xa6, 0xff, 0xcc, 0xeb, 0x18, 0xe6, 0xaf, 0x68, 0x7e, 0xde, 0x79, 0x9d, 0x7d, 0x86, 0xc8, 0x62, - 0xeb, 0xed, 0x0a, 0x26, 0x5a, 0xdd, 0x99, 0x9b, 0xe2, 0x41, 0x1d, 0xbf, 0xd7, 0x45, 0x33, 0xb2, - 0x55, 0xa6, 0xcf, 0x60, 0xd6, 0xa3, 0x7f, 0x67, 0x9e, 0xfb, 0x77, 0x21, 0xe8, 0xd6, 0x43, 0x33, - 0x98, 0xbe, 0x3d, 0x7c, 0x79, 0x74, 0x7a, 0xb8, 0xbf, 0x33, 0x42, 0x73, 0x08, 0x8e, 0x4e, 0x4f, - 0x0c, 0x72, 0x56, 0xdf, 0x1c, 0xf0, 0xf7, 0xd5, 0xa1, 0x42, 0xbb, 0xe0, 0x1d, 0xd4, 0x6b, 0x34, - 0xc3, 0xd7, 0xff, 0xa8, 0x74, 0x6a, 0x83, 0x9b, 0x8d, 0x1e, 0x39, 0xe8, 0x01, 0x4c, 0xcc, 0x61, - 0x42, 0x31, 0x1e, 0x1c, 0xb3, 0xf4, 0x2f, 0x3c, 0xbc, 0x58, 0xd9, 0x08, 0x2d, 0xc0, 0xd7, 0x07, - 0x07, 0x45, 0xb8, 0x7f, 0xa3, 0xd2, 0x18, 0x0f, 0xee, 0x90, 0x51, 0xea, 0x10, 0xa2, 0x08, 0xf7, - 0xc3, 0x9a, 0xc6, 0x78, 0x90, 0x4d, 0xa3, 0xd4, 0x96, 0xa1, 0x08, 0xf7, 0xad, 0x4e, 0xe3, 0xa1, - 0x93, 0xd9, 0xe8, 0x6c, 0xa2, 0xaf, 0xee, 0x93, 0x1f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x22, 0x65, - 0x99, 0x10, 0x98, 0x05, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// DebugClient is the client API for Debug service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type DebugClient interface { - Log(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (Debug_LogClient, error) - Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error) - Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error) - Trace(ctx context.Context, in *TraceRequest, opts ...grpc.CallOption) (*TraceResponse, error) - Cache(ctx context.Context, in *CacheRequest, opts ...grpc.CallOption) (*CacheResponse, error) -} - -type debugClient struct { - cc *grpc.ClientConn -} - -func NewDebugClient(cc *grpc.ClientConn) DebugClient { - return &debugClient{cc} -} - -func (c *debugClient) Log(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (Debug_LogClient, error) { - stream, err := c.cc.NewStream(ctx, &_Debug_serviceDesc.Streams[0], "/Debug/Log", opts...) - if err != nil { - return nil, err - } - x := &debugLogClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Debug_LogClient interface { - Recv() (*Record, error) - grpc.ClientStream -} - -type debugLogClient struct { - grpc.ClientStream -} - -func (x *debugLogClient) Recv() (*Record, error) { - m := new(Record) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *debugClient) Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error) { - out := new(HealthResponse) - err := c.cc.Invoke(ctx, "/Debug/Health", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *debugClient) Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error) { - out := new(StatsResponse) - err := c.cc.Invoke(ctx, "/Debug/Stats", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *debugClient) Trace(ctx context.Context, in *TraceRequest, opts ...grpc.CallOption) (*TraceResponse, error) { - out := new(TraceResponse) - err := c.cc.Invoke(ctx, "/Debug/Trace", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *debugClient) Cache(ctx context.Context, in *CacheRequest, opts ...grpc.CallOption) (*CacheResponse, error) { - out := new(CacheResponse) - err := c.cc.Invoke(ctx, "/Debug/Cache", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// DebugServer is the server API for Debug service. -type DebugServer interface { - Log(*LogRequest, Debug_LogServer) error - Health(context.Context, *HealthRequest) (*HealthResponse, error) - Stats(context.Context, *StatsRequest) (*StatsResponse, error) - Trace(context.Context, *TraceRequest) (*TraceResponse, error) - Cache(context.Context, *CacheRequest) (*CacheResponse, error) -} - -// UnimplementedDebugServer can be embedded to have forward compatible implementations. -type UnimplementedDebugServer struct { -} - -func (*UnimplementedDebugServer) Log(req *LogRequest, srv Debug_LogServer) error { - return status.Errorf(codes.Unimplemented, "method Log not implemented") -} -func (*UnimplementedDebugServer) Health(ctx context.Context, req *HealthRequest) (*HealthResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Health not implemented") -} -func (*UnimplementedDebugServer) Stats(ctx context.Context, req *StatsRequest) (*StatsResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Stats not implemented") -} -func (*UnimplementedDebugServer) Trace(ctx context.Context, req *TraceRequest) (*TraceResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Trace not implemented") -} -func (*UnimplementedDebugServer) Cache(ctx context.Context, req *CacheRequest) (*CacheResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Cache not implemented") -} - -func RegisterDebugServer(s *grpc.Server, srv DebugServer) { - s.RegisterService(&_Debug_serviceDesc, srv) -} - -func _Debug_Log_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(LogRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(DebugServer).Log(m, &debugLogServer{stream}) -} - -type Debug_LogServer interface { - Send(*Record) error - grpc.ServerStream -} - -type debugLogServer struct { - grpc.ServerStream -} - -func (x *debugLogServer) Send(m *Record) error { - return x.ServerStream.SendMsg(m) -} - -func _Debug_Health_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(HealthRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DebugServer).Health(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/Debug/Health", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DebugServer).Health(ctx, req.(*HealthRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Debug_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StatsRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DebugServer).Stats(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/Debug/Stats", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DebugServer).Stats(ctx, req.(*StatsRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Debug_Trace_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TraceRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DebugServer).Trace(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/Debug/Trace", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DebugServer).Trace(ctx, req.(*TraceRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Debug_Cache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(CacheRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DebugServer).Cache(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/Debug/Cache", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DebugServer).Cache(ctx, req.(*CacheRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Debug_serviceDesc = grpc.ServiceDesc{ - ServiceName: "Debug", - HandlerType: (*DebugServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Health", - Handler: _Debug_Health_Handler, - }, - { - MethodName: "Stats", - Handler: _Debug_Stats_Handler, - }, - { - MethodName: "Trace", - Handler: _Debug_Trace_Handler, - }, - { - MethodName: "Cache", - Handler: _Debug_Cache_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "Log", - Handler: _Debug_Log_Handler, - ServerStreams: true, - }, - }, - Metadata: "debug/service/proto/debug.proto", -} diff --git a/debug/service/proto/debug.pb.micro.go b/debug/service/proto/debug.pb.micro.go deleted file mode 100644 index cfece62d..00000000 --- a/debug/service/proto/debug.pb.micro.go +++ /dev/null @@ -1,236 +0,0 @@ -// Code generated by protoc-gen-micro. DO NOT EDIT. -// source: debug/service/proto/debug.proto - -package debug - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - math "math" -) - -import ( - context "context" - api "github.com/micro/go-micro/v3/api" - client "github.com/micro/go-micro/v3/client" - server "github.com/micro/go-micro/v3/server" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -// Reference imports to suppress errors if they are not otherwise used. -var _ api.Endpoint -var _ context.Context -var _ client.Option -var _ server.Option - -// Api Endpoints for Debug service - -func NewDebugEndpoints() []*api.Endpoint { - return []*api.Endpoint{} -} - -// Client API for Debug service - -type DebugService interface { - Log(ctx context.Context, in *LogRequest, opts ...client.CallOption) (Debug_LogService, error) - Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error) - Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error) - Trace(ctx context.Context, in *TraceRequest, opts ...client.CallOption) (*TraceResponse, error) - Cache(ctx context.Context, in *CacheRequest, opts ...client.CallOption) (*CacheResponse, error) -} - -type debugService struct { - c client.Client - name string -} - -func NewDebugService(name string, c client.Client) DebugService { - return &debugService{ - c: c, - name: name, - } -} - -func (c *debugService) Log(ctx context.Context, in *LogRequest, opts ...client.CallOption) (Debug_LogService, error) { - req := c.c.NewRequest(c.name, "Debug.Log", &LogRequest{}) - stream, err := c.c.Stream(ctx, req, opts...) - if err != nil { - return nil, err - } - if err := stream.Send(in); err != nil { - return nil, err - } - return &debugServiceLog{stream}, nil -} - -type Debug_LogService interface { - Context() context.Context - SendMsg(interface{}) error - RecvMsg(interface{}) error - Close() error - Recv() (*Record, error) -} - -type debugServiceLog struct { - stream client.Stream -} - -func (x *debugServiceLog) Close() error { - return x.stream.Close() -} - -func (x *debugServiceLog) Context() context.Context { - return x.stream.Context() -} - -func (x *debugServiceLog) SendMsg(m interface{}) error { - return x.stream.Send(m) -} - -func (x *debugServiceLog) RecvMsg(m interface{}) error { - return x.stream.Recv(m) -} - -func (x *debugServiceLog) Recv() (*Record, error) { - m := new(Record) - err := x.stream.Recv(m) - if err != nil { - return nil, err - } - return m, nil -} - -func (c *debugService) Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error) { - req := c.c.NewRequest(c.name, "Debug.Health", in) - out := new(HealthResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *debugService) Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error) { - req := c.c.NewRequest(c.name, "Debug.Stats", in) - out := new(StatsResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *debugService) Trace(ctx context.Context, in *TraceRequest, opts ...client.CallOption) (*TraceResponse, error) { - req := c.c.NewRequest(c.name, "Debug.Trace", in) - out := new(TraceResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *debugService) Cache(ctx context.Context, in *CacheRequest, opts ...client.CallOption) (*CacheResponse, error) { - req := c.c.NewRequest(c.name, "Debug.Cache", in) - out := new(CacheResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// Server API for Debug service - -type DebugHandler interface { - Log(context.Context, *LogRequest, Debug_LogStream) error - Health(context.Context, *HealthRequest, *HealthResponse) error - Stats(context.Context, *StatsRequest, *StatsResponse) error - Trace(context.Context, *TraceRequest, *TraceResponse) error - Cache(context.Context, *CacheRequest, *CacheResponse) error -} - -func RegisterDebugHandler(s server.Server, hdlr DebugHandler, opts ...server.HandlerOption) error { - type debug interface { - Log(ctx context.Context, stream server.Stream) error - Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error - Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error - Trace(ctx context.Context, in *TraceRequest, out *TraceResponse) error - Cache(ctx context.Context, in *CacheRequest, out *CacheResponse) error - } - type Debug struct { - debug - } - h := &debugHandler{hdlr} - return s.Handle(s.NewHandler(&Debug{h}, opts...)) -} - -type debugHandler struct { - DebugHandler -} - -func (h *debugHandler) Log(ctx context.Context, stream server.Stream) error { - m := new(LogRequest) - if err := stream.Recv(m); err != nil { - return err - } - return h.DebugHandler.Log(ctx, m, &debugLogStream{stream}) -} - -type Debug_LogStream interface { - Context() context.Context - SendMsg(interface{}) error - RecvMsg(interface{}) error - Close() error - Send(*Record) error -} - -type debugLogStream struct { - stream server.Stream -} - -func (x *debugLogStream) Close() error { - return x.stream.Close() -} - -func (x *debugLogStream) Context() context.Context { - return x.stream.Context() -} - -func (x *debugLogStream) SendMsg(m interface{}) error { - return x.stream.Send(m) -} - -func (x *debugLogStream) RecvMsg(m interface{}) error { - return x.stream.Recv(m) -} - -func (x *debugLogStream) Send(m *Record) error { - return x.stream.Send(m) -} - -func (h *debugHandler) Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error { - return h.DebugHandler.Health(ctx, in, out) -} - -func (h *debugHandler) Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error { - return h.DebugHandler.Stats(ctx, in, out) -} - -func (h *debugHandler) Trace(ctx context.Context, in *TraceRequest, out *TraceResponse) error { - return h.DebugHandler.Trace(ctx, in, out) -} - -func (h *debugHandler) Cache(ctx context.Context, in *CacheRequest, out *CacheResponse) error { - return h.DebugHandler.Cache(ctx, in, out) -} diff --git a/debug/service/proto/debug.proto b/debug/service/proto/debug.proto deleted file mode 100644 index 6646cb65..00000000 --- a/debug/service/proto/debug.proto +++ /dev/null @@ -1,106 +0,0 @@ -syntax = "proto3"; - -service Debug { - rpc Log(LogRequest) returns (stream Record) {}; - rpc Health(HealthRequest) returns (HealthResponse) {}; - rpc Stats(StatsRequest) returns (StatsResponse) {}; - rpc Trace(TraceRequest) returns (TraceResponse) {}; - rpc Cache(CacheRequest) returns (CacheResponse) {}; -} - -message HealthRequest { - // optional service name - string service = 1; -} - -message HealthResponse { - // default: ok - string status = 1; -} - -message StatsRequest { - // optional service name - string service = 1; -} - -message StatsResponse { - // timestamp of recording - uint64 timestamp = 1; - // unix timestamp - uint64 started = 2; - // in seconds - uint64 uptime = 3; - // in bytes - uint64 memory = 4; - // num threads - uint64 threads = 5; - // total gc in nanoseconds - uint64 gc = 6; - // total number of requests - uint64 requests = 7; - // total number of errors - uint64 errors = 8; -} - -// LogRequest requests service logs -message LogRequest { - // service to request logs for - string service = 1; - // stream records continuously - bool stream = 2; - // count of records to request - int64 count = 3; - // relative time in seconds - // before the current time - // from which to show logs - int64 since = 4; -} - -// Record is service log record -message Record { - // timestamp of log record - int64 timestamp = 1; - // record metadata - map metadata = 2; - // message - string message = 3; -} - -message TraceRequest { - // trace id to retrieve - string id = 1; -} - -message TraceResponse { - repeated Span spans = 1; -} - - -enum SpanType { - INBOUND = 0; - OUTBOUND = 1; -} - -message Span { - // the trace id - string trace = 1; - // id of the span - string id = 2; - // parent span - string parent = 3; - // name of the resource - string name = 4; - // time of start in nanoseconds - uint64 started = 5; - // duration of the execution in nanoseconds - uint64 duration = 6; - // associated metadata - map metadata = 7; - SpanType type = 8; -} - -message CacheRequest {} - -message CacheResponse { - map values = 1; -} \ No newline at end of file diff --git a/debug/service/service.go b/debug/service/service.go deleted file mode 100644 index 3aee13e5..00000000 --- a/debug/service/service.go +++ /dev/null @@ -1,64 +0,0 @@ -package service - -import ( - "time" - - "github.com/micro/go-micro/v3/debug" - "github.com/micro/go-micro/v3/debug/log" -) - -type serviceLog struct { - Client *debugClient -} - -// Read reads log entries from the logger -func (s *serviceLog) Read(opts ...log.ReadOption) ([]log.Record, error) { - var options log.ReadOptions - for _, o := range opts { - o(&options) - } - - stream, err := s.Client.Log(options.Since, options.Count, false) - if err != nil { - return nil, err - } - defer stream.Stop() - - // stream the records until nothing is left - var records []log.Record - - for record := range stream.Chan() { - records = append(records, record) - } - - return records, nil -} - -// There is no write support -func (s *serviceLog) Write(r log.Record) error { - return nil -} - -// Stream log records -func (s *serviceLog) Stream() (log.Stream, error) { - return s.Client.Log(time.Time{}, 0, true) -} - -// NewLog returns a new log interface -func NewLog(opts ...log.Option) log.Log { - var options log.Options - for _, o := range opts { - o(&options) - } - - name := options.Name - - // set the default name - if len(name) == 0 { - name = debug.DefaultName - } - - return &serviceLog{ - Client: NewClient(name), - } -} diff --git a/debug/service/stream.go b/debug/service/stream.go deleted file mode 100644 index cb801329..00000000 --- a/debug/service/stream.go +++ /dev/null @@ -1,25 +0,0 @@ -package service - -import ( - "github.com/micro/go-micro/v3/debug/log" -) - -type logStream struct { - stream chan log.Record - stop chan bool -} - -func (l *logStream) Chan() <-chan log.Record { - return l.stream -} - -func (l *logStream) Stop() error { - select { - case <-l.stop: - return nil - default: - close(l.stream) - close(l.stop) - } - return nil -} diff --git a/debug/stats/default.go b/debug/stats/memory/memory.go similarity index 63% rename from debug/stats/default.go rename to debug/stats/memory/memory.go index 25d8957a..c539ccd0 100644 --- a/debug/stats/default.go +++ b/debug/stats/memory/memory.go @@ -5,10 +5,11 @@ import ( "sync" "time" + "github.com/micro/go-micro/v3/debug/stats" "github.com/micro/go-micro/v3/util/ring" ) -type stats struct { +type memoryStats struct { // used to store past stats buffer *ring.Buffer @@ -18,7 +19,7 @@ type stats struct { errors uint64 } -func (s *stats) snapshot() *Stat { +func (s *memoryStats) snapshot() *stats.Stat { s.RLock() defer s.RUnlock() @@ -27,7 +28,7 @@ func (s *stats) snapshot() *Stat { now := time.Now().Unix() - return &Stat{ + return &stats.Stat{ Timestamp: now, Started: s.started, Uptime: now - s.started, @@ -39,32 +40,31 @@ func (s *stats) snapshot() *Stat { } } -func (s *stats) Read() ([]*Stat, error) { - // TODO adjustable size and optional read values - buf := s.buffer.Get(60) - var stats []*Stat +func (s *memoryStats) Read() ([]*stats.Stat, error) { + buf := s.buffer.Get(s.buffer.Size()) + var buffer []*stats.Stat // get a value from the buffer if it exists for _, b := range buf { - stat, ok := b.Value.(*Stat) + stat, ok := b.Value.(*stats.Stat) if !ok { continue } - stats = append(stats, stat) + buffer = append(buffer, stat) } // get a snapshot - stats = append(stats, s.snapshot()) + buffer = append(buffer, s.snapshot()) - return stats, nil + return buffer, nil } -func (s *stats) Write(stat *Stat) error { +func (s *memoryStats) Write(stat *stats.Stat) error { s.buffer.Put(stat) return nil } -func (s *stats) Record(err error) error { +func (s *memoryStats) Record(err error) error { s.Lock() defer s.Unlock() @@ -81,9 +81,9 @@ func (s *stats) Record(err error) error { // NewStats returns a new in memory stats buffer // TODO add options -func NewStats() Stats { - return &stats{ +func NewStats() stats.Stats { + return &memoryStats{ started: time.Now().Unix(), - buffer: ring.New(60), + buffer: ring.New(1), } } diff --git a/debug/stats/stats.go b/debug/stats/stats.go index 6cbf4b16..7c714c56 100644 --- a/debug/stats/stats.go +++ b/debug/stats/stats.go @@ -30,7 +30,3 @@ type Stat struct { // Total errors Errors uint64 } - -var ( - DefaultStats = NewStats() -) diff --git a/go.mod b/go.mod index dae152fa..d4b8e798 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/bitly/go-simplejson v0.5.0 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b - github.com/bwmarrin/discordgo v0.20.2 github.com/caddyserver/certmagic v0.10.6 github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.18+incompatible @@ -21,13 +20,11 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 github.com/evanphx/json-patch/v5 v5.0.0 - github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c github.com/fsnotify/fsnotify v1.4.7 github.com/fsouza/go-dockerclient v1.6.0 github.com/ghodss/yaml v1.0.0 github.com/go-acme/lego/v3 v3.4.0 github.com/go-git/go-git/v5 v5.1.0 - github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible // indirect github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee github.com/gobwas/pool v0.2.0 // indirect github.com/gobwas/ws v1.0.3 @@ -52,13 +49,11 @@ require ( github.com/mitchellh/hashstructure v1.0.0 github.com/nats-io/nats-server/v2 v2.1.6 // indirect github.com/nats-io/nats.go v1.9.2 - github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/soheilhy/cmux v0.1.4 // indirect github.com/stretchr/testify v1.4.0 - github.com/technoweenie/multipartstreamer v1.0.1 // indirect github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect @@ -71,6 +66,5 @@ require ( google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1 google.golang.org/grpc v1.26.0 google.golang.org/protobuf v1.22.0 - gopkg.in/telegram-bot-api.v4 v4.6.4 sigs.k8s.io/yaml v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index 60cf6405..2ca78362 100644 --- a/go.sum +++ b/go.sum @@ -64,8 +64,6 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0= github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= -github.com/bwmarrin/discordgo v0.20.2 h1:nA7jiTtqUA9lT93WL2jPjUp8ZTEInRujBdx1C9gkr20= -github.com/bwmarrin/discordgo v0.20.2/go.mod h1:O9S4p+ofTFwB02em7jkpkV8M3R0/PUVOwN61zSZ0r4Q= github.com/caddyserver/certmagic v0.10.6 h1:sCya6FmfaN74oZE46kqfaFOVoROD/mF36rTQfjN7TZc= github.com/caddyserver/certmagic v0.10.6/go.mod h1:Y8jcUBctgk/IhpAzlHKfimZNyXCkfGgRTC0orl8gROQ= github.com/cenkalti/backoff/v4 v4.0.0 h1:6VeaLF9aI+MAUQ95106HwWzYZgJJpZ4stumjj6RFYAU= @@ -134,8 +132,6 @@ github.com/exoscale/egoscale v0.18.1/go.mod h1:Z7OOdzzTOz1Q1PjQXumlz9Wn/CddH0zSY github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= -github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c h1:pBgVXWDXju1m8W4lnEeIqTHPOzhTUO81a7yknM/xQR4= -github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c/go.mod h1:pFdJbAhRf7rh6YYMUdIQGyzne6zYL1tCUW8QV2B3UfY= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/go-dockerclient v1.6.0 h1:f7j+AX94143JL1H3TiqSMkM4EcLDI0De1qD4GGn3Hig= @@ -162,8 +158,6 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible h1:2cauKuaELYAEARXRkq2LrJ0yDDv1rW7+wrTEdVL3uaU= -github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible/go.mod h1:qf9acutJ8cwBUhm1bqgz6Bei9/C/c93FPDljKWwsOgM= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= @@ -222,8 +216,6 @@ github.com/gorilla/handlers v1.4.2 h1:0QniY0USkHQ1RGCLfKxeNHK9bkDHGRYGNDFBCS+YAR github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= -github.com/gorilla/websocket v1.2.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg= @@ -331,8 +323,6 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 h1:Pr5gZa2VcmktVwq0lyC39MsN5tz356vC/pQHKvq+QBo= -github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249/go.mod h1:JzQ9m3PMAqcpeCam7UaHSuBuupz7CmpjehYMayT6YOk= github.com/nrdcg/auroradns v1.0.0/go.mod h1:6JPXKzIRzZzMqtTDgueIhTi6rFf1QvYE/HzqidhOhjw= github.com/nrdcg/dnspod-go v0.4.0/go.mod h1:vZSoFSFeQVm2gWLMkyX61LZ8HI3BaqtHZWgPTGKr6KQ= github.com/nrdcg/goinwx v0.6.1/go.mod h1:XPiut7enlbEdntAqalBIqcYcTEVhpv/dKWgDCX2SwKQ= @@ -416,8 +406,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= -github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQDeX7m2XsSOlQEnM= -github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog= github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ75iPqWZc0HeJWFYNCvKsfpQwFpRNTA= github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0= github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY= @@ -457,7 +445,6 @@ go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= golang.org/x/crypto v0.0.0-20180621125126-a49355c7e3f8/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -647,8 +634,6 @@ gopkg.in/resty.v1 v1.9.1/go.mod h1:vo52Hzryw9PnPHcJfPsBiFW62XhNx5OczbV9y+IMpgc= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= -gopkg.in/telegram-bot-api.v4 v4.6.4 h1:hpHWhzn4jTCsAJZZ2loNKfy2QWyPDRJVl3aTFXeMW8g= -gopkg.in/telegram-bot-api.v4 v4.6.4/go.mod h1:5DpGO5dbumb40px+dXcwCpcjmeHNYLpk0bp3XRNvWDM= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= diff --git a/logger/default.go b/logger/default.go index 8f1149f2..6c96b5db 100644 --- a/logger/default.go +++ b/logger/default.go @@ -113,8 +113,6 @@ func (l *defaultLogger) Log(level Level, v ...interface{}) { metadata += fmt.Sprintf(" %s=%v", k, fields[k]) } - dlog.DefaultLog.Write(rec) - t := rec.Timestamp.Format("2006-01-02 15:04:05") fmt.Printf("%s %s %v\n", t, metadata, rec.Message) } @@ -154,8 +152,6 @@ func (l *defaultLogger) Logf(level Level, format string, v ...interface{}) { metadata += fmt.Sprintf(" %s=%v", k, fields[k]) } - dlog.DefaultLog.Write(rec) - t := rec.Timestamp.Format("2006-01-02 15:04:05") fmt.Printf("%s %s %v\n", t, metadata, rec.Message) } diff --git a/server/grpc/options.go b/server/grpc/options.go index 34eb2e1b..d5bd36d5 100644 --- a/server/grpc/options.go +++ b/server/grpc/options.go @@ -9,7 +9,6 @@ import ( "github.com/micro/go-micro/v3/codec" "github.com/micro/go-micro/v3/registry/mdns" "github.com/micro/go-micro/v3/server" - "github.com/micro/go-micro/v3/transport" "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) @@ -70,7 +69,6 @@ func newOptions(opt ...server.Option) server.Options { Metadata: map[string]string{}, Broker: http.NewBroker(), Registry: mdns.NewRegistry(), - Transport: transport.DefaultTransport, Address: server.DefaultAddress, Name: server.DefaultName, Id: server.DefaultId, diff --git a/server/options.go b/server/options.go index b4c5b636..59611363 100644 --- a/server/options.go +++ b/server/options.go @@ -14,6 +14,7 @@ import ( "github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/registry/mdns" "github.com/micro/go-micro/v3/transport" + thttp "github.com/micro/go-micro/v3/transport/http" ) type Options struct { @@ -72,7 +73,7 @@ func newOptions(opt ...Option) Options { } if opts.Transport == nil { - opts.Transport = transport.DefaultTransport + opts.Transport = thttp.NewTransport() } if opts.RegisterCheck == nil { @@ -228,7 +229,7 @@ func TLSConfig(t *tls.Config) Option { // set the default transport if one is not // already set. Required for Init call below. if o.Transport == nil { - o.Transport = transport.DefaultTransport + o.Transport = thttp.NewTransport() } // set the transport tls diff --git a/transport/http/http.go b/transport/http/http.go index 03c73fac..63f3ff58 100644 --- a/transport/http/http.go +++ b/transport/http/http.go @@ -1,11 +1,602 @@ -// Package http returns a http2 transport using net/http package http import ( + "bufio" + "bytes" + "crypto/tls" + "errors" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "sync" + "time" + "github.com/micro/go-micro/v3/transport" + maddr "github.com/micro/go-micro/v3/util/addr" + "github.com/micro/go-micro/v3/util/buf" + mnet "github.com/micro/go-micro/v3/util/net" + mls "github.com/micro/go-micro/v3/util/tls" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" ) -// NewTransport returns a new http transport using net/http and supporting http2 -func NewTransport(opts ...transport.Option) transport.Transport { - return transport.NewTransport(opts...) +type httpTransport struct { + opts transport.Options +} + +type httpTransportClient struct { + ht *httpTransport + addr string + conn net.Conn + dialOpts transport.DialOptions + once sync.Once + + sync.RWMutex + + // request must be stored for response processing + r chan *http.Request + bl []*http.Request + buff *bufio.Reader + + // local/remote ip + local string + remote string +} + +type httpTransportSocket struct { + ht *httpTransport + w http.ResponseWriter + r *http.Request + rw *bufio.ReadWriter + + mtx sync.RWMutex + + // the hijacked when using http 1 + conn net.Conn + // for the first request + ch chan *http.Request + + // h2 things + buf *bufio.Reader + // indicate if socket is closed + closed chan bool + + // local/remote ip + local string + remote string +} + +type httpTransportListener struct { + ht *httpTransport + listener net.Listener +} + +func (h *httpTransportClient) Local() string { + return h.local +} + +func (h *httpTransportClient) Remote() string { + return h.remote +} + +func (h *httpTransportClient) Send(m *transport.Message) error { + header := make(http.Header) + + for k, v := range m.Header { + header.Set(k, v) + } + + b := buf.New(bytes.NewBuffer(m.Body)) + defer b.Close() + + req := &http.Request{ + Method: "POST", + URL: &url.URL{ + Scheme: "http", + Host: h.addr, + }, + Header: header, + Body: b, + ContentLength: int64(b.Len()), + Host: h.addr, + } + + h.Lock() + h.bl = append(h.bl, req) + select { + case h.r <- h.bl[0]: + h.bl = h.bl[1:] + default: + } + h.Unlock() + + // set timeout if its greater than 0 + if h.ht.opts.Timeout > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) + } + + return req.Write(h.conn) +} + +func (h *httpTransportClient) Recv(m *transport.Message) error { + if m == nil { + return errors.New("message passed in is nil") + } + + var r *http.Request + if !h.dialOpts.Stream { + rc, ok := <-h.r + if !ok { + return io.EOF + } + r = rc + } + + // set timeout if its greater than 0 + if h.ht.opts.Timeout > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) + } + + rsp, err := http.ReadResponse(h.buff, r) + if err != nil { + return err + } + defer rsp.Body.Close() + + b, err := ioutil.ReadAll(rsp.Body) + if err != nil { + return err + } + + if rsp.StatusCode != 200 { + return errors.New(rsp.Status + ": " + string(b)) + } + + m.Body = b + + if m.Header == nil { + m.Header = make(map[string]string, len(rsp.Header)) + } + + for k, v := range rsp.Header { + if len(v) > 0 { + m.Header[k] = v[0] + } else { + m.Header[k] = "" + } + } + + return nil +} + +func (h *httpTransportClient) Close() error { + h.once.Do(func() { + h.Lock() + h.buff.Reset(nil) + h.Unlock() + close(h.r) + }) + return h.conn.Close() +} + +func (h *httpTransportSocket) Local() string { + return h.local +} + +func (h *httpTransportSocket) Remote() string { + return h.remote +} + +func (h *httpTransportSocket) Recv(m *transport.Message) error { + if m == nil { + return errors.New("message passed in is nil") + } + if m.Header == nil { + m.Header = make(map[string]string, len(h.r.Header)) + } + + // process http 1 + if h.r.ProtoMajor == 1 { + // set timeout if its greater than 0 + if h.ht.opts.Timeout > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) + } + + var r *http.Request + + select { + // get first request + case r = <-h.ch: + // read next request + default: + rr, err := http.ReadRequest(h.rw.Reader) + if err != nil { + return err + } + r = rr + } + + // read body + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return err + } + + // set body + r.Body.Close() + m.Body = b + + // set headers + for k, v := range r.Header { + if len(v) > 0 { + m.Header[k] = v[0] + } else { + m.Header[k] = "" + } + } + + // return early early + return nil + } + + // only process if the socket is open + select { + case <-h.closed: + return io.EOF + default: + // no op + } + + // processing http2 request + // read streaming body + + // set max buffer size + // TODO: adjustable buffer size + buf := make([]byte, 4*1024*1024) + + // read the request body + n, err := h.buf.Read(buf) + // not an eof error + if err != nil { + return err + } + + // check if we have data + if n > 0 { + m.Body = buf[:n] + } + + // set headers + for k, v := range h.r.Header { + if len(v) > 0 { + m.Header[k] = v[0] + } else { + m.Header[k] = "" + } + } + + // set path + m.Header[":path"] = h.r.URL.Path + + return nil +} + +func (h *httpTransportSocket) Send(m *transport.Message) error { + if h.r.ProtoMajor == 1 { + // make copy of header + hdr := make(http.Header) + for k, v := range h.r.Header { + hdr[k] = v + } + + rsp := &http.Response{ + Header: hdr, + Body: ioutil.NopCloser(bytes.NewReader(m.Body)), + Status: "200 OK", + StatusCode: 200, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + ContentLength: int64(len(m.Body)), + } + + for k, v := range m.Header { + rsp.Header.Set(k, v) + } + + // set timeout if its greater than 0 + if h.ht.opts.Timeout > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) + } + + return rsp.Write(h.conn) + } + + // only process if the socket is open + select { + case <-h.closed: + return io.EOF + default: + // no op + } + + // we need to lock to protect the write + h.mtx.RLock() + defer h.mtx.RUnlock() + + // set headers + for k, v := range m.Header { + h.w.Header().Set(k, v) + } + + // write request + _, err := h.w.Write(m.Body) + + // flush the trailers + h.w.(http.Flusher).Flush() + + return err +} + +func (h *httpTransportSocket) error(m *transport.Message) error { + if h.r.ProtoMajor == 1 { + rsp := &http.Response{ + Header: make(http.Header), + Body: ioutil.NopCloser(bytes.NewReader(m.Body)), + Status: "500 Internal Server Error", + StatusCode: 500, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + ContentLength: int64(len(m.Body)), + } + + for k, v := range m.Header { + rsp.Header.Set(k, v) + } + + return rsp.Write(h.conn) + } + + return nil +} + +func (h *httpTransportSocket) Close() error { + h.mtx.Lock() + defer h.mtx.Unlock() + select { + case <-h.closed: + return nil + default: + // close the channel + close(h.closed) + + // close the buffer + h.r.Body.Close() + + // close the connection + if h.r.ProtoMajor == 1 { + return h.conn.Close() + } + } + + return nil +} + +func (h *httpTransportListener) Addr() string { + return h.listener.Addr().String() +} + +func (h *httpTransportListener) Close() error { + return h.listener.Close() +} + +func (h *httpTransportListener) Accept(fn func(transport.Socket)) error { + // create handler mux + mux := http.NewServeMux() + + // register our transport handler + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + var buf *bufio.ReadWriter + var con net.Conn + + // read a regular request + if r.ProtoMajor == 1 { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + r.Body = ioutil.NopCloser(bytes.NewReader(b)) + // hijack the conn + hj, ok := w.(http.Hijacker) + if !ok { + // we're screwed + http.Error(w, "cannot serve conn", http.StatusInternalServerError) + return + } + + conn, bufrw, err := hj.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer conn.Close() + buf = bufrw + con = conn + } + + // buffered reader + bufr := bufio.NewReader(r.Body) + + // save the request + ch := make(chan *http.Request, 1) + ch <- r + + // create a new transport socket + sock := &httpTransportSocket{ + ht: h.ht, + w: w, + r: r, + rw: buf, + buf: bufr, + ch: ch, + conn: con, + local: h.Addr(), + remote: r.RemoteAddr, + closed: make(chan bool), + } + + // execute the socket + fn(sock) + }) + + // get optional handlers + if h.ht.opts.Context != nil { + handlers, ok := h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler) + if ok { + for pattern, handler := range handlers { + mux.Handle(pattern, handler) + } + } + } + + // default http2 server + srv := &http.Server{ + Handler: mux, + } + + // insecure connection use h2c + if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) { + srv.Handler = h2c.NewHandler(mux, &http2.Server{}) + } + + // begin serving + return srv.Serve(h.listener) +} + +func (h *httpTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { + dopts := transport.DialOptions{ + Timeout: transport.DefaultDialTimeout, + } + + for _, opt := range opts { + opt(&dopts) + } + + var conn net.Conn + var err error + + // TODO: support dial option here rather than using internal config + if h.opts.Secure || h.opts.TLSConfig != nil { + config := h.opts.TLSConfig + if config == nil { + config = &tls.Config{ + InsecureSkipVerify: true, + } + } + config.NextProtos = []string{"http/1.1"} + conn, err = newConn(func(addr string) (net.Conn, error) { + return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config) + })(addr) + } else { + conn, err = newConn(func(addr string) (net.Conn, error) { + return net.DialTimeout("tcp", addr, dopts.Timeout) + })(addr) + } + + if err != nil { + return nil, err + } + + return &httpTransportClient{ + ht: h, + addr: addr, + conn: conn, + buff: bufio.NewReader(conn), + dialOpts: dopts, + r: make(chan *http.Request, 1), + local: conn.LocalAddr().String(), + remote: conn.RemoteAddr().String(), + }, nil +} + +func (h *httpTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) { + var options transport.ListenOptions + for _, o := range opts { + o(&options) + } + + var l net.Listener + var err error + + // TODO: support use of listen options + if h.opts.Secure || h.opts.TLSConfig != nil { + config := h.opts.TLSConfig + + fn := func(addr string) (net.Listener, error) { + if config == nil { + hosts := []string{addr} + + // check if its a valid host:port + if host, _, err := net.SplitHostPort(addr); err == nil { + if len(host) == 0 { + hosts = maddr.IPs() + } else { + hosts = []string{host} + } + } + + // generate a certificate + cert, err := mls.Certificate(hosts...) + if err != nil { + return nil, err + } + config = &tls.Config{Certificates: []tls.Certificate{cert}} + } + return tls.Listen("tcp", addr, config) + } + + l, err = mnet.Listen(addr, fn) + } else { + fn := func(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) + } + + l, err = mnet.Listen(addr, fn) + } + + if err != nil { + return nil, err + } + + return &httpTransportListener{ + ht: h, + listener: l, + }, nil +} + +func (h *httpTransport) Init(opts ...transport.Option) error { + for _, o := range opts { + o(&h.opts) + } + return nil +} + +func (h *httpTransport) Options() transport.Options { + return h.opts +} + +func (h *httpTransport) String() string { + return "http" +} + +func NewTransport(opts ...transport.Option) transport.Transport { + var options transport.Options + for _, o := range opts { + o(&options) + } + return &httpTransport{opts: options} } diff --git a/transport/http_proxy.go b/transport/http/http_proxy.go similarity index 99% rename from transport/http_proxy.go rename to transport/http/http_proxy.go index 567dd739..328091b5 100644 --- a/transport/http_proxy.go +++ b/transport/http/http_proxy.go @@ -1,4 +1,4 @@ -package transport +package http import ( "bufio" diff --git a/transport/http_transport_test.go b/transport/http/http_transport_test.go similarity index 87% rename from transport/http_transport_test.go rename to transport/http/http_transport_test.go index cbbc8658..4f26e674 100644 --- a/transport/http_transport_test.go +++ b/transport/http/http_transport_test.go @@ -1,13 +1,15 @@ -package transport +package http import ( "io" "net" "testing" "time" + + "github.com/micro/go-micro/v3/transport" ) -func expectedPort(t *testing.T, expected string, lsn Listener) { +func expectedPort(t *testing.T, expected string, lsn transport.Listener) { _, port, err := net.SplitHostPort(lsn.Addr()) if err != nil { t.Errorf("Expected address to be `%s`, got error: %v", expected, err) @@ -53,11 +55,11 @@ func TestHTTPTransportCommunication(t *testing.T) { } defer l.Close() - fn := func(sock Socket) { + fn := func(sock transport.Socket) { defer sock.Close() for { - var m Message + var m transport.Message if err := sock.Recv(&m); err != nil { return } @@ -86,7 +88,7 @@ func TestHTTPTransportCommunication(t *testing.T) { } defer c.Close() - m := Message{ + m := transport.Message{ Header: map[string]string{ "Content-Type": "application/json", }, @@ -97,7 +99,7 @@ func TestHTTPTransportCommunication(t *testing.T) { t.Errorf("Unexpected send err: %v", err) } - var rm Message + var rm transport.Message if err := c.Recv(&rm); err != nil { t.Errorf("Unexpected recv err: %v", err) @@ -119,11 +121,11 @@ func TestHTTPTransportError(t *testing.T) { } defer l.Close() - fn := func(sock Socket) { + fn := func(sock transport.Socket) { defer sock.Close() for { - var m Message + var m transport.Message if err := sock.Recv(&m); err != nil { if err == io.EOF { return @@ -131,7 +133,7 @@ func TestHTTPTransportError(t *testing.T) { t.Fatal(err) } - sock.(*httpTransportSocket).error(&Message{ + sock.(*httpTransportSocket).error(&transport.Message{ Body: []byte(`an error occurred`), }) } @@ -155,7 +157,7 @@ func TestHTTPTransportError(t *testing.T) { } defer c.Close() - m := Message{ + m := transport.Message{ Header: map[string]string{ "Content-Type": "application/json", }, @@ -166,7 +168,7 @@ func TestHTTPTransportError(t *testing.T) { t.Errorf("Unexpected send err: %v", err) } - var rm Message + var rm transport.Message err = c.Recv(&rm) if err == nil { @@ -181,7 +183,7 @@ func TestHTTPTransportError(t *testing.T) { } func TestHTTPTransportTimeout(t *testing.T) { - tr := NewTransport(Timeout(time.Millisecond * 100)) + tr := NewTransport(transport.Timeout(time.Millisecond * 100)) l, err := tr.Listen("127.0.0.1:0") if err != nil { @@ -191,7 +193,7 @@ func TestHTTPTransportTimeout(t *testing.T) { done := make(chan bool) - fn := func(sock Socket) { + fn := func(sock transport.Socket) { defer func() { sock.Close() close(done) @@ -207,7 +209,7 @@ func TestHTTPTransportTimeout(t *testing.T) { }() for { - var m Message + var m transport.Message if err := sock.Recv(&m); err != nil { return @@ -231,7 +233,7 @@ func TestHTTPTransportTimeout(t *testing.T) { } defer c.Close() - m := Message{ + m := transport.Message{ Header: map[string]string{ "Content-Type": "application/json", }, diff --git a/transport/http_transport.go b/transport/http_transport.go deleted file mode 100644 index 16dc3b34..00000000 --- a/transport/http_transport.go +++ /dev/null @@ -1,601 +0,0 @@ -package transport - -import ( - "bufio" - "bytes" - "crypto/tls" - "errors" - "io" - "io/ioutil" - "net" - "net/http" - "net/url" - "sync" - "time" - - maddr "github.com/micro/go-micro/v3/util/addr" - "github.com/micro/go-micro/v3/util/buf" - mnet "github.com/micro/go-micro/v3/util/net" - mls "github.com/micro/go-micro/v3/util/tls" - "golang.org/x/net/http2" - "golang.org/x/net/http2/h2c" -) - -type httpTransport struct { - opts Options -} - -type httpTransportClient struct { - ht *httpTransport - addr string - conn net.Conn - dialOpts DialOptions - once sync.Once - - sync.RWMutex - - // request must be stored for response processing - r chan *http.Request - bl []*http.Request - buff *bufio.Reader - - // local/remote ip - local string - remote string -} - -type httpTransportSocket struct { - ht *httpTransport - w http.ResponseWriter - r *http.Request - rw *bufio.ReadWriter - - mtx sync.RWMutex - - // the hijacked when using http 1 - conn net.Conn - // for the first request - ch chan *http.Request - - // h2 things - buf *bufio.Reader - // indicate if socket is closed - closed chan bool - - // local/remote ip - local string - remote string -} - -type httpTransportListener struct { - ht *httpTransport - listener net.Listener -} - -func (h *httpTransportClient) Local() string { - return h.local -} - -func (h *httpTransportClient) Remote() string { - return h.remote -} - -func (h *httpTransportClient) Send(m *Message) error { - header := make(http.Header) - - for k, v := range m.Header { - header.Set(k, v) - } - - b := buf.New(bytes.NewBuffer(m.Body)) - defer b.Close() - - req := &http.Request{ - Method: "POST", - URL: &url.URL{ - Scheme: "http", - Host: h.addr, - }, - Header: header, - Body: b, - ContentLength: int64(b.Len()), - Host: h.addr, - } - - h.Lock() - h.bl = append(h.bl, req) - select { - case h.r <- h.bl[0]: - h.bl = h.bl[1:] - default: - } - h.Unlock() - - // set timeout if its greater than 0 - if h.ht.opts.Timeout > time.Duration(0) { - h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) - } - - return req.Write(h.conn) -} - -func (h *httpTransportClient) Recv(m *Message) error { - if m == nil { - return errors.New("message passed in is nil") - } - - var r *http.Request - if !h.dialOpts.Stream { - rc, ok := <-h.r - if !ok { - return io.EOF - } - r = rc - } - - // set timeout if its greater than 0 - if h.ht.opts.Timeout > time.Duration(0) { - h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) - } - - rsp, err := http.ReadResponse(h.buff, r) - if err != nil { - return err - } - defer rsp.Body.Close() - - b, err := ioutil.ReadAll(rsp.Body) - if err != nil { - return err - } - - if rsp.StatusCode != 200 { - return errors.New(rsp.Status + ": " + string(b)) - } - - m.Body = b - - if m.Header == nil { - m.Header = make(map[string]string, len(rsp.Header)) - } - - for k, v := range rsp.Header { - if len(v) > 0 { - m.Header[k] = v[0] - } else { - m.Header[k] = "" - } - } - - return nil -} - -func (h *httpTransportClient) Close() error { - h.once.Do(func() { - h.Lock() - h.buff.Reset(nil) - h.Unlock() - close(h.r) - }) - return h.conn.Close() -} - -func (h *httpTransportSocket) Local() string { - return h.local -} - -func (h *httpTransportSocket) Remote() string { - return h.remote -} - -func (h *httpTransportSocket) Recv(m *Message) error { - if m == nil { - return errors.New("message passed in is nil") - } - if m.Header == nil { - m.Header = make(map[string]string, len(h.r.Header)) - } - - // process http 1 - if h.r.ProtoMajor == 1 { - // set timeout if its greater than 0 - if h.ht.opts.Timeout > time.Duration(0) { - h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) - } - - var r *http.Request - - select { - // get first request - case r = <-h.ch: - // read next request - default: - rr, err := http.ReadRequest(h.rw.Reader) - if err != nil { - return err - } - r = rr - } - - // read body - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - - // set body - r.Body.Close() - m.Body = b - - // set headers - for k, v := range r.Header { - if len(v) > 0 { - m.Header[k] = v[0] - } else { - m.Header[k] = "" - } - } - - // return early early - return nil - } - - // only process if the socket is open - select { - case <-h.closed: - return io.EOF - default: - // no op - } - - // processing http2 request - // read streaming body - - // set max buffer size - // TODO: adjustable buffer size - buf := make([]byte, 4*1024*1024) - - // read the request body - n, err := h.buf.Read(buf) - // not an eof error - if err != nil { - return err - } - - // check if we have data - if n > 0 { - m.Body = buf[:n] - } - - // set headers - for k, v := range h.r.Header { - if len(v) > 0 { - m.Header[k] = v[0] - } else { - m.Header[k] = "" - } - } - - // set path - m.Header[":path"] = h.r.URL.Path - - return nil -} - -func (h *httpTransportSocket) Send(m *Message) error { - if h.r.ProtoMajor == 1 { - // make copy of header - hdr := make(http.Header) - for k, v := range h.r.Header { - hdr[k] = v - } - - rsp := &http.Response{ - Header: hdr, - Body: ioutil.NopCloser(bytes.NewReader(m.Body)), - Status: "200 OK", - StatusCode: 200, - Proto: "HTTP/1.1", - ProtoMajor: 1, - ProtoMinor: 1, - ContentLength: int64(len(m.Body)), - } - - for k, v := range m.Header { - rsp.Header.Set(k, v) - } - - // set timeout if its greater than 0 - if h.ht.opts.Timeout > time.Duration(0) { - h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) - } - - return rsp.Write(h.conn) - } - - // only process if the socket is open - select { - case <-h.closed: - return io.EOF - default: - // no op - } - - // we need to lock to protect the write - h.mtx.RLock() - defer h.mtx.RUnlock() - - // set headers - for k, v := range m.Header { - h.w.Header().Set(k, v) - } - - // write request - _, err := h.w.Write(m.Body) - - // flush the trailers - h.w.(http.Flusher).Flush() - - return err -} - -func (h *httpTransportSocket) error(m *Message) error { - if h.r.ProtoMajor == 1 { - rsp := &http.Response{ - Header: make(http.Header), - Body: ioutil.NopCloser(bytes.NewReader(m.Body)), - Status: "500 Internal Server Error", - StatusCode: 500, - Proto: "HTTP/1.1", - ProtoMajor: 1, - ProtoMinor: 1, - ContentLength: int64(len(m.Body)), - } - - for k, v := range m.Header { - rsp.Header.Set(k, v) - } - - return rsp.Write(h.conn) - } - - return nil -} - -func (h *httpTransportSocket) Close() error { - h.mtx.Lock() - defer h.mtx.Unlock() - select { - case <-h.closed: - return nil - default: - // close the channel - close(h.closed) - - // close the buffer - h.r.Body.Close() - - // close the connection - if h.r.ProtoMajor == 1 { - return h.conn.Close() - } - } - - return nil -} - -func (h *httpTransportListener) Addr() string { - return h.listener.Addr().String() -} - -func (h *httpTransportListener) Close() error { - return h.listener.Close() -} - -func (h *httpTransportListener) Accept(fn func(Socket)) error { - // create handler mux - mux := http.NewServeMux() - - // register our transport handler - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - var buf *bufio.ReadWriter - var con net.Conn - - // read a regular request - if r.ProtoMajor == 1 { - b, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - r.Body = ioutil.NopCloser(bytes.NewReader(b)) - // hijack the conn - hj, ok := w.(http.Hijacker) - if !ok { - // we're screwed - http.Error(w, "cannot serve conn", http.StatusInternalServerError) - return - } - - conn, bufrw, err := hj.Hijack() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer conn.Close() - buf = bufrw - con = conn - } - - // buffered reader - bufr := bufio.NewReader(r.Body) - - // save the request - ch := make(chan *http.Request, 1) - ch <- r - - // create a new transport socket - sock := &httpTransportSocket{ - ht: h.ht, - w: w, - r: r, - rw: buf, - buf: bufr, - ch: ch, - conn: con, - local: h.Addr(), - remote: r.RemoteAddr, - closed: make(chan bool), - } - - // execute the socket - fn(sock) - }) - - // get optional handlers - if h.ht.opts.Context != nil { - handlers, ok := h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler) - if ok { - for pattern, handler := range handlers { - mux.Handle(pattern, handler) - } - } - } - - // default http2 server - srv := &http.Server{ - Handler: mux, - } - - // insecure connection use h2c - if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) { - srv.Handler = h2c.NewHandler(mux, &http2.Server{}) - } - - // begin serving - return srv.Serve(h.listener) -} - -func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) { - dopts := DialOptions{ - Timeout: DefaultDialTimeout, - } - - for _, opt := range opts { - opt(&dopts) - } - - var conn net.Conn - var err error - - // TODO: support dial option here rather than using internal config - if h.opts.Secure || h.opts.TLSConfig != nil { - config := h.opts.TLSConfig - if config == nil { - config = &tls.Config{ - InsecureSkipVerify: true, - } - } - config.NextProtos = []string{"http/1.1"} - conn, err = newConn(func(addr string) (net.Conn, error) { - return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config) - })(addr) - } else { - conn, err = newConn(func(addr string) (net.Conn, error) { - return net.DialTimeout("tcp", addr, dopts.Timeout) - })(addr) - } - - if err != nil { - return nil, err - } - - return &httpTransportClient{ - ht: h, - addr: addr, - conn: conn, - buff: bufio.NewReader(conn), - dialOpts: dopts, - r: make(chan *http.Request, 1), - local: conn.LocalAddr().String(), - remote: conn.RemoteAddr().String(), - }, nil -} - -func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, error) { - var options ListenOptions - for _, o := range opts { - o(&options) - } - - var l net.Listener - var err error - - // TODO: support use of listen options - if h.opts.Secure || h.opts.TLSConfig != nil { - config := h.opts.TLSConfig - - fn := func(addr string) (net.Listener, error) { - if config == nil { - hosts := []string{addr} - - // check if its a valid host:port - if host, _, err := net.SplitHostPort(addr); err == nil { - if len(host) == 0 { - hosts = maddr.IPs() - } else { - hosts = []string{host} - } - } - - // generate a certificate - cert, err := mls.Certificate(hosts...) - if err != nil { - return nil, err - } - config = &tls.Config{Certificates: []tls.Certificate{cert}} - } - return tls.Listen("tcp", addr, config) - } - - l, err = mnet.Listen(addr, fn) - } else { - fn := func(addr string) (net.Listener, error) { - return net.Listen("tcp", addr) - } - - l, err = mnet.Listen(addr, fn) - } - - if err != nil { - return nil, err - } - - return &httpTransportListener{ - ht: h, - listener: l, - }, nil -} - -func (h *httpTransport) Init(opts ...Option) error { - for _, o := range opts { - o(&h.opts) - } - return nil -} - -func (h *httpTransport) Options() Options { - return h.opts -} - -func (h *httpTransport) String() string { - return "http" -} - -func newHTTPTransport(opts ...Option) *httpTransport { - var options Options - for _, o := range opts { - o(&options) - } - return &httpTransport{opts: options} -} diff --git a/transport/transport.go b/transport/transport.go index 84f05a2b..312757e0 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -46,11 +46,5 @@ type DialOption func(*DialOptions) type ListenOption func(*ListenOptions) var ( - DefaultTransport Transport = newHTTPTransport() - DefaultDialTimeout = time.Second * 5 ) - -func NewTransport(opts ...Option) Transport { - return newHTTPTransport(opts...) -} diff --git a/util/mux/mux.go b/util/mux/mux.go deleted file mode 100644 index 2f3ab968..00000000 --- a/util/mux/mux.go +++ /dev/null @@ -1,68 +0,0 @@ -// Package mux provides proxy muxing -package mux - -import ( - "context" - "sync" - - "github.com/micro/go-micro/v3/client/grpc" - "github.com/micro/go-micro/v3/debug/service/handler" - "github.com/micro/go-micro/v3/proxy" - "github.com/micro/go-micro/v3/server" - "github.com/micro/go-micro/v3/server/mucp" -) - -// Server is a proxy muxer that incudes the use of the DefaultHandler -type Server struct { - // name of service - Name string - // Proxy handler - Proxy proxy.Proxy - // The default handler - Handler Handler -} - -type Handler interface { - proxy.Proxy - NewHandler(interface{}, ...server.HandlerOption) server.Handler - Handle(server.Handler) error -} - -var ( - once sync.Once -) - -func (s *Server) ProcessMessage(ctx context.Context, msg server.Message) error { - if msg.Topic() == s.Name { - return s.Handler.ProcessMessage(ctx, msg) - } - return s.Proxy.ProcessMessage(ctx, msg) -} - -func (s *Server) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { - if req.Service() == s.Name { - return s.Handler.ServeRequest(ctx, req, rsp) - } - return s.Proxy.ServeRequest(ctx, req, rsp) -} - -func New(name string, p proxy.Proxy) *Server { - r := mucp.DefaultRouter - - // only register this once - once.Do(func() { - r.Handle( - // inject the debug handler - r.NewHandler( - handler.NewHandler(grpc.NewClient()), - server.InternalHandler(true), - ), - ) - }) - - return &Server{ - Name: name, - Proxy: p, - Handler: r, - } -}