cleanup debug and transport (#1920)

This commit is contained in:
Asim Aslam 2020-08-10 15:58:39 +01:00 committed by GitHub
parent 593b543230
commit 13f495587e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 635 additions and 2505 deletions

View File

@ -12,6 +12,7 @@ import (
regRouter "github.com/micro/go-micro/v3/router/registry"
"github.com/micro/go-micro/v3/selector"
"github.com/micro/go-micro/v3/transport"
thttp "github.com/micro/go-micro/v3/transport/http"
)
type Options struct {
@ -120,7 +121,7 @@ func NewOptions(options ...Option) Options {
Broker: http.NewBroker(),
Router: regRouter.NewRouter(),
Selector: selector.DefaultSelector,
Transport: transport.DefaultTransport,
Transport: thttp.NewTransport(),
}
for _, o := range options {

View File

@ -9,9 +9,7 @@ import (
var (
// Default buffer size if any
DefaultSize = 1024
// DefaultLog logger
DefaultLog = NewLog()
DefaultSize = 256
// Default formatter
DefaultFormat = TextFormat
)

View File

@ -8,11 +8,6 @@ import (
"github.com/micro/go-micro/v3/util/ring"
)
var (
// DefaultSize of the logger buffer
DefaultSize = 1024
)
// memoryLog is default micro log
type memoryLog struct {
*ring.Buffer

View File

@ -1,81 +0,0 @@
package log
import (
"sync"
"github.com/google/uuid"
"github.com/micro/go-micro/v3/util/ring"
)
// Should stream from OS
type osLog struct {
format FormatFunc
once sync.Once
sync.RWMutex
buffer *ring.Buffer
subs map[string]*osStream
}
type osStream struct {
stream chan Record
}
// Read reads log entries from the logger
func (o *osLog) Read(...ReadOption) ([]Record, error) {
var records []Record
// read the last 100 records
for _, v := range o.buffer.Get(100) {
records = append(records, v.Value.(Record))
}
return records, nil
}
// Write writes records to log
func (o *osLog) Write(r Record) error {
o.buffer.Put(r)
return nil
}
// Stream log records
func (o *osLog) Stream() (Stream, error) {
o.Lock()
defer o.Unlock()
// create stream
st := &osStream{
stream: make(chan Record, 128),
}
// save stream
o.subs[uuid.New().String()] = st
return st, nil
}
func (o *osStream) Chan() <-chan Record {
return o.stream
}
func (o *osStream) Stop() error {
return nil
}
func NewLog(opts ...Option) Log {
options := Options{
Format: DefaultFormat,
}
for _, o := range opts {
o(&options)
}
l := &osLog{
format: options.Format,
buffer: ring.New(1024),
subs: make(map[string]*osStream),
}
return l
}

View File

@ -1,94 +0,0 @@
// Package service provides the service log
package service
import (
"context"
"time"
"github.com/micro/go-micro/v3/client/grpc"
"github.com/micro/go-micro/v3/debug/log"
pb "github.com/micro/go-micro/v3/debug/service/proto"
)
// Debug provides debug service client
type debugClient struct {
Client pb.DebugService
}
func (d *debugClient) Trace() ([]*pb.Span, error) {
rsp, err := d.Client.Trace(context.Background(), &pb.TraceRequest{})
if err != nil {
return nil, err
}
return rsp.Spans, nil
}
// Logs queries the services logs and returns a channel to read the logs from
func (d *debugClient) Log(since time.Time, count int, stream bool) (log.Stream, error) {
req := &pb.LogRequest{}
if !since.IsZero() {
req.Since = since.Unix()
}
if count > 0 {
req.Count = int64(count)
}
// set whether to stream
req.Stream = stream
// get the log stream
serverStream, err := d.Client.Log(context.Background(), req)
if err != nil {
return nil, err
}
lg := &logStream{
stream: make(chan log.Record),
stop: make(chan bool),
}
// go stream logs
go d.streamLogs(lg, serverStream)
return lg, nil
}
func (d *debugClient) streamLogs(lg *logStream, stream pb.Debug_LogService) {
defer stream.Close()
defer lg.Stop()
for {
resp, err := stream.Recv()
if err != nil {
break
}
metadata := make(map[string]string)
for k, v := range resp.Metadata {
metadata[k] = v
}
record := log.Record{
Timestamp: time.Unix(resp.Timestamp, 0),
Message: resp.Message,
Metadata: metadata,
}
select {
case <-lg.stop:
return
case lg.stream <- record:
}
}
}
// NewClient provides a debug client
func NewClient(name string) *debugClient {
// create default client
cli := grpc.NewClient()
return &debugClient{
Client: pb.NewDebugService(name, cli),
}
}

View File

@ -1,176 +0,0 @@
// Package handler implements service debug handler embedded in go-micro services
package handler
import (
"context"
"time"
"github.com/micro/go-micro/v3/client"
"github.com/micro/go-micro/v3/debug/log"
proto "github.com/micro/go-micro/v3/debug/service/proto"
"github.com/micro/go-micro/v3/debug/stats"
"github.com/micro/go-micro/v3/debug/trace"
"github.com/micro/go-micro/v3/server"
)
// NewHandler returns an instance of the Debug Handler
func NewHandler(c client.Client) *Debug {
return &Debug{
log: log.DefaultLog,
stats: stats.DefaultStats,
trace: trace.DefaultTracer,
cache: c.Options().Cache,
}
}
type Debug struct {
// must honour the debug handler
proto.DebugHandler
// the logger for retrieving logs
log log.Log
// the stats collector
stats stats.Stats
// the tracer
trace trace.Tracer
// the cache
cache *client.Cache
}
func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error {
rsp.Status = "ok"
return nil
}
func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error {
stats, err := d.stats.Read()
if err != nil {
return err
}
if len(stats) == 0 {
return nil
}
// write the response values
rsp.Timestamp = uint64(stats[0].Timestamp)
rsp.Started = uint64(stats[0].Started)
rsp.Uptime = uint64(stats[0].Uptime)
rsp.Memory = stats[0].Memory
rsp.Gc = stats[0].GC
rsp.Threads = stats[0].Threads
rsp.Requests = stats[0].Requests
rsp.Errors = stats[0].Errors
return nil
}
func (d *Debug) Trace(ctx context.Context, req *proto.TraceRequest, rsp *proto.TraceResponse) error {
traces, err := d.trace.Read(trace.ReadTrace(req.Id))
if err != nil {
return err
}
for _, t := range traces {
var typ proto.SpanType
switch t.Type {
case trace.SpanTypeRequestInbound:
typ = proto.SpanType_INBOUND
case trace.SpanTypeRequestOutbound:
typ = proto.SpanType_OUTBOUND
}
rsp.Spans = append(rsp.Spans, &proto.Span{
Trace: t.Trace,
Id: t.Id,
Parent: t.Parent,
Name: t.Name,
Started: uint64(t.Started.UnixNano()),
Duration: uint64(t.Duration.Nanoseconds()),
Type: typ,
Metadata: t.Metadata,
})
}
return nil
}
func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
req := new(proto.LogRequest)
if err := stream.Recv(req); err != nil {
return err
}
var options []log.ReadOption
since := time.Unix(req.Since, 0)
if !since.IsZero() {
options = append(options, log.Since(since))
}
count := int(req.Count)
if count > 0 {
options = append(options, log.Count(count))
}
if req.Stream {
// TODO: we need to figure out how to close the log stream
// It seems like when a client disconnects,
// the connection stays open until some timeout expires
// or something like that; that means the map of streams
// might end up leaking memory if not cleaned up properly
lgStream, err := d.log.Stream()
if err != nil {
return err
}
defer lgStream.Stop()
for record := range lgStream.Chan() {
// copy metadata
metadata := make(map[string]string)
for k, v := range record.Metadata {
metadata[k] = v
}
// send record
if err := stream.Send(&proto.Record{
Timestamp: record.Timestamp.Unix(),
Message: record.Message.(string),
Metadata: metadata,
}); err != nil {
return err
}
}
// done streaming, return
return nil
}
// get the log records
records, err := d.log.Read(options...)
if err != nil {
return err
}
// send all the logs downstream
for _, record := range records {
// copy metadata
metadata := make(map[string]string)
for k, v := range record.Metadata {
metadata[k] = v
}
// send record
if err := stream.Send(&proto.Record{
Timestamp: record.Timestamp.Unix(),
Message: record.Message.(string),
Metadata: metadata,
}); err != nil {
return err
}
}
return nil
}
// Cache returns all the key value pairs in the client cache
func (d *Debug) Cache(ctx context.Context, req *proto.CacheRequest, rsp *proto.CacheResponse) error {
rsp.Values = d.cache.List()
return nil
}

View File

@ -1,970 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: debug/service/proto/debug.proto
package debug
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type SpanType int32
const (
SpanType_INBOUND SpanType = 0
SpanType_OUTBOUND SpanType = 1
)
var SpanType_name = map[int32]string{
0: "INBOUND",
1: "OUTBOUND",
}
var SpanType_value = map[string]int32{
"INBOUND": 0,
"OUTBOUND": 1,
}
func (x SpanType) String() string {
return proto.EnumName(SpanType_name, int32(x))
}
func (SpanType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{0}
}
type HealthRequest struct {
// optional service name
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *HealthRequest) Reset() { *m = HealthRequest{} }
func (m *HealthRequest) String() string { return proto.CompactTextString(m) }
func (*HealthRequest) ProtoMessage() {}
func (*HealthRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{0}
}
func (m *HealthRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HealthRequest.Unmarshal(m, b)
}
func (m *HealthRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HealthRequest.Marshal(b, m, deterministic)
}
func (m *HealthRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_HealthRequest.Merge(m, src)
}
func (m *HealthRequest) XXX_Size() int {
return xxx_messageInfo_HealthRequest.Size(m)
}
func (m *HealthRequest) XXX_DiscardUnknown() {
xxx_messageInfo_HealthRequest.DiscardUnknown(m)
}
var xxx_messageInfo_HealthRequest proto.InternalMessageInfo
func (m *HealthRequest) GetService() string {
if m != nil {
return m.Service
}
return ""
}
type HealthResponse struct {
// default: ok
Status string `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *HealthResponse) Reset() { *m = HealthResponse{} }
func (m *HealthResponse) String() string { return proto.CompactTextString(m) }
func (*HealthResponse) ProtoMessage() {}
func (*HealthResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{1}
}
func (m *HealthResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HealthResponse.Unmarshal(m, b)
}
func (m *HealthResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HealthResponse.Marshal(b, m, deterministic)
}
func (m *HealthResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_HealthResponse.Merge(m, src)
}
func (m *HealthResponse) XXX_Size() int {
return xxx_messageInfo_HealthResponse.Size(m)
}
func (m *HealthResponse) XXX_DiscardUnknown() {
xxx_messageInfo_HealthResponse.DiscardUnknown(m)
}
var xxx_messageInfo_HealthResponse proto.InternalMessageInfo
func (m *HealthResponse) GetStatus() string {
if m != nil {
return m.Status
}
return ""
}
type StatsRequest struct {
// optional service name
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StatsRequest) Reset() { *m = StatsRequest{} }
func (m *StatsRequest) String() string { return proto.CompactTextString(m) }
func (*StatsRequest) ProtoMessage() {}
func (*StatsRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{2}
}
func (m *StatsRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StatsRequest.Unmarshal(m, b)
}
func (m *StatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_StatsRequest.Marshal(b, m, deterministic)
}
func (m *StatsRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_StatsRequest.Merge(m, src)
}
func (m *StatsRequest) XXX_Size() int {
return xxx_messageInfo_StatsRequest.Size(m)
}
func (m *StatsRequest) XXX_DiscardUnknown() {
xxx_messageInfo_StatsRequest.DiscardUnknown(m)
}
var xxx_messageInfo_StatsRequest proto.InternalMessageInfo
func (m *StatsRequest) GetService() string {
if m != nil {
return m.Service
}
return ""
}
type StatsResponse struct {
// timestamp of recording
Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// unix timestamp
Started uint64 `protobuf:"varint,2,opt,name=started,proto3" json:"started,omitempty"`
// in seconds
Uptime uint64 `protobuf:"varint,3,opt,name=uptime,proto3" json:"uptime,omitempty"`
// in bytes
Memory uint64 `protobuf:"varint,4,opt,name=memory,proto3" json:"memory,omitempty"`
// num threads
Threads uint64 `protobuf:"varint,5,opt,name=threads,proto3" json:"threads,omitempty"`
// total gc in nanoseconds
Gc uint64 `protobuf:"varint,6,opt,name=gc,proto3" json:"gc,omitempty"`
// total number of requests
Requests uint64 `protobuf:"varint,7,opt,name=requests,proto3" json:"requests,omitempty"`
// total number of errors
Errors uint64 `protobuf:"varint,8,opt,name=errors,proto3" json:"errors,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StatsResponse) Reset() { *m = StatsResponse{} }
func (m *StatsResponse) String() string { return proto.CompactTextString(m) }
func (*StatsResponse) ProtoMessage() {}
func (*StatsResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{3}
}
func (m *StatsResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StatsResponse.Unmarshal(m, b)
}
func (m *StatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_StatsResponse.Marshal(b, m, deterministic)
}
func (m *StatsResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_StatsResponse.Merge(m, src)
}
func (m *StatsResponse) XXX_Size() int {
return xxx_messageInfo_StatsResponse.Size(m)
}
func (m *StatsResponse) XXX_DiscardUnknown() {
xxx_messageInfo_StatsResponse.DiscardUnknown(m)
}
var xxx_messageInfo_StatsResponse proto.InternalMessageInfo
func (m *StatsResponse) GetTimestamp() uint64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *StatsResponse) GetStarted() uint64 {
if m != nil {
return m.Started
}
return 0
}
func (m *StatsResponse) GetUptime() uint64 {
if m != nil {
return m.Uptime
}
return 0
}
func (m *StatsResponse) GetMemory() uint64 {
if m != nil {
return m.Memory
}
return 0
}
func (m *StatsResponse) GetThreads() uint64 {
if m != nil {
return m.Threads
}
return 0
}
func (m *StatsResponse) GetGc() uint64 {
if m != nil {
return m.Gc
}
return 0
}
func (m *StatsResponse) GetRequests() uint64 {
if m != nil {
return m.Requests
}
return 0
}
func (m *StatsResponse) GetErrors() uint64 {
if m != nil {
return m.Errors
}
return 0
}
// LogRequest requests service logs
type LogRequest struct {
// service to request logs for
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
// stream records continuously
Stream bool `protobuf:"varint,2,opt,name=stream,proto3" json:"stream,omitempty"`
// count of records to request
Count int64 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"`
// relative time in seconds
// before the current time
// from which to show logs
Since int64 `protobuf:"varint,4,opt,name=since,proto3" json:"since,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LogRequest) Reset() { *m = LogRequest{} }
func (m *LogRequest) String() string { return proto.CompactTextString(m) }
func (*LogRequest) ProtoMessage() {}
func (*LogRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{4}
}
func (m *LogRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LogRequest.Unmarshal(m, b)
}
func (m *LogRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LogRequest.Marshal(b, m, deterministic)
}
func (m *LogRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_LogRequest.Merge(m, src)
}
func (m *LogRequest) XXX_Size() int {
return xxx_messageInfo_LogRequest.Size(m)
}
func (m *LogRequest) XXX_DiscardUnknown() {
xxx_messageInfo_LogRequest.DiscardUnknown(m)
}
var xxx_messageInfo_LogRequest proto.InternalMessageInfo
func (m *LogRequest) GetService() string {
if m != nil {
return m.Service
}
return ""
}
func (m *LogRequest) GetStream() bool {
if m != nil {
return m.Stream
}
return false
}
func (m *LogRequest) GetCount() int64 {
if m != nil {
return m.Count
}
return 0
}
func (m *LogRequest) GetSince() int64 {
if m != nil {
return m.Since
}
return 0
}
// Record is service log record
type Record struct {
// timestamp of log record
Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// record metadata
Metadata map[string]string `protobuf:"bytes,2,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// message
Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Record) Reset() { *m = Record{} }
func (m *Record) String() string { return proto.CompactTextString(m) }
func (*Record) ProtoMessage() {}
func (*Record) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{5}
}
func (m *Record) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Record.Unmarshal(m, b)
}
func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Record.Marshal(b, m, deterministic)
}
func (m *Record) XXX_Merge(src proto.Message) {
xxx_messageInfo_Record.Merge(m, src)
}
func (m *Record) XXX_Size() int {
return xxx_messageInfo_Record.Size(m)
}
func (m *Record) XXX_DiscardUnknown() {
xxx_messageInfo_Record.DiscardUnknown(m)
}
var xxx_messageInfo_Record proto.InternalMessageInfo
func (m *Record) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *Record) GetMetadata() map[string]string {
if m != nil {
return m.Metadata
}
return nil
}
func (m *Record) GetMessage() string {
if m != nil {
return m.Message
}
return ""
}
type TraceRequest struct {
// trace id to retrieve
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TraceRequest) Reset() { *m = TraceRequest{} }
func (m *TraceRequest) String() string { return proto.CompactTextString(m) }
func (*TraceRequest) ProtoMessage() {}
func (*TraceRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{6}
}
func (m *TraceRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TraceRequest.Unmarshal(m, b)
}
func (m *TraceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_TraceRequest.Marshal(b, m, deterministic)
}
func (m *TraceRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_TraceRequest.Merge(m, src)
}
func (m *TraceRequest) XXX_Size() int {
return xxx_messageInfo_TraceRequest.Size(m)
}
func (m *TraceRequest) XXX_DiscardUnknown() {
xxx_messageInfo_TraceRequest.DiscardUnknown(m)
}
var xxx_messageInfo_TraceRequest proto.InternalMessageInfo
func (m *TraceRequest) GetId() string {
if m != nil {
return m.Id
}
return ""
}
type TraceResponse struct {
Spans []*Span `protobuf:"bytes,1,rep,name=spans,proto3" json:"spans,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TraceResponse) Reset() { *m = TraceResponse{} }
func (m *TraceResponse) String() string { return proto.CompactTextString(m) }
func (*TraceResponse) ProtoMessage() {}
func (*TraceResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{7}
}
func (m *TraceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TraceResponse.Unmarshal(m, b)
}
func (m *TraceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_TraceResponse.Marshal(b, m, deterministic)
}
func (m *TraceResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_TraceResponse.Merge(m, src)
}
func (m *TraceResponse) XXX_Size() int {
return xxx_messageInfo_TraceResponse.Size(m)
}
func (m *TraceResponse) XXX_DiscardUnknown() {
xxx_messageInfo_TraceResponse.DiscardUnknown(m)
}
var xxx_messageInfo_TraceResponse proto.InternalMessageInfo
func (m *TraceResponse) GetSpans() []*Span {
if m != nil {
return m.Spans
}
return nil
}
type Span struct {
// the trace id
Trace string `protobuf:"bytes,1,opt,name=trace,proto3" json:"trace,omitempty"`
// id of the span
Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
// parent span
Parent string `protobuf:"bytes,3,opt,name=parent,proto3" json:"parent,omitempty"`
// name of the resource
Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"`
// time of start in nanoseconds
Started uint64 `protobuf:"varint,5,opt,name=started,proto3" json:"started,omitempty"`
// duration of the execution in nanoseconds
Duration uint64 `protobuf:"varint,6,opt,name=duration,proto3" json:"duration,omitempty"`
// associated metadata
Metadata map[string]string `protobuf:"bytes,7,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Type SpanType `protobuf:"varint,8,opt,name=type,proto3,enum=SpanType" json:"type,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Span) Reset() { *m = Span{} }
func (m *Span) String() string { return proto.CompactTextString(m) }
func (*Span) ProtoMessage() {}
func (*Span) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{8}
}
func (m *Span) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Span.Unmarshal(m, b)
}
func (m *Span) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Span.Marshal(b, m, deterministic)
}
func (m *Span) XXX_Merge(src proto.Message) {
xxx_messageInfo_Span.Merge(m, src)
}
func (m *Span) XXX_Size() int {
return xxx_messageInfo_Span.Size(m)
}
func (m *Span) XXX_DiscardUnknown() {
xxx_messageInfo_Span.DiscardUnknown(m)
}
var xxx_messageInfo_Span proto.InternalMessageInfo
func (m *Span) GetTrace() string {
if m != nil {
return m.Trace
}
return ""
}
func (m *Span) GetId() string {
if m != nil {
return m.Id
}
return ""
}
func (m *Span) GetParent() string {
if m != nil {
return m.Parent
}
return ""
}
func (m *Span) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *Span) GetStarted() uint64 {
if m != nil {
return m.Started
}
return 0
}
func (m *Span) GetDuration() uint64 {
if m != nil {
return m.Duration
}
return 0
}
func (m *Span) GetMetadata() map[string]string {
if m != nil {
return m.Metadata
}
return nil
}
func (m *Span) GetType() SpanType {
if m != nil {
return m.Type
}
return SpanType_INBOUND
}
type CacheRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CacheRequest) Reset() { *m = CacheRequest{} }
func (m *CacheRequest) String() string { return proto.CompactTextString(m) }
func (*CacheRequest) ProtoMessage() {}
func (*CacheRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{9}
}
func (m *CacheRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CacheRequest.Unmarshal(m, b)
}
func (m *CacheRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_CacheRequest.Marshal(b, m, deterministic)
}
func (m *CacheRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_CacheRequest.Merge(m, src)
}
func (m *CacheRequest) XXX_Size() int {
return xxx_messageInfo_CacheRequest.Size(m)
}
func (m *CacheRequest) XXX_DiscardUnknown() {
xxx_messageInfo_CacheRequest.DiscardUnknown(m)
}
var xxx_messageInfo_CacheRequest proto.InternalMessageInfo
type CacheResponse struct {
Values map[string]string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CacheResponse) Reset() { *m = CacheResponse{} }
func (m *CacheResponse) String() string { return proto.CompactTextString(m) }
func (*CacheResponse) ProtoMessage() {}
func (*CacheResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_df91f41a5db378e6, []int{10}
}
func (m *CacheResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CacheResponse.Unmarshal(m, b)
}
func (m *CacheResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_CacheResponse.Marshal(b, m, deterministic)
}
func (m *CacheResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_CacheResponse.Merge(m, src)
}
func (m *CacheResponse) XXX_Size() int {
return xxx_messageInfo_CacheResponse.Size(m)
}
func (m *CacheResponse) XXX_DiscardUnknown() {
xxx_messageInfo_CacheResponse.DiscardUnknown(m)
}
var xxx_messageInfo_CacheResponse proto.InternalMessageInfo
func (m *CacheResponse) GetValues() map[string]string {
if m != nil {
return m.Values
}
return nil
}
func init() {
proto.RegisterEnum("SpanType", SpanType_name, SpanType_value)
proto.RegisterType((*HealthRequest)(nil), "HealthRequest")
proto.RegisterType((*HealthResponse)(nil), "HealthResponse")
proto.RegisterType((*StatsRequest)(nil), "StatsRequest")
proto.RegisterType((*StatsResponse)(nil), "StatsResponse")
proto.RegisterType((*LogRequest)(nil), "LogRequest")
proto.RegisterType((*Record)(nil), "Record")
proto.RegisterMapType((map[string]string)(nil), "Record.MetadataEntry")
proto.RegisterType((*TraceRequest)(nil), "TraceRequest")
proto.RegisterType((*TraceResponse)(nil), "TraceResponse")
proto.RegisterType((*Span)(nil), "Span")
proto.RegisterMapType((map[string]string)(nil), "Span.MetadataEntry")
proto.RegisterType((*CacheRequest)(nil), "CacheRequest")
proto.RegisterType((*CacheResponse)(nil), "CacheResponse")
proto.RegisterMapType((map[string]string)(nil), "CacheResponse.ValuesEntry")
}
func init() { proto.RegisterFile("debug/service/proto/debug.proto", fileDescriptor_df91f41a5db378e6) }
var fileDescriptor_df91f41a5db378e6 = []byte{
// 646 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xdb, 0x6e, 0xd3, 0x4a,
0x14, 0x8d, 0xed, 0x38, 0xb1, 0x77, 0x62, 0x9f, 0x6a, 0xce, 0x45, 0x96, 0x0f, 0xd0, 0xca, 0x12,
0x52, 0xb8, 0x68, 0x02, 0xe1, 0x85, 0xcb, 0x1b, 0x14, 0x09, 0xa4, 0xd2, 0x4a, 0xd3, 0x96, 0xf7,
0xa9, 0x3d, 0x4a, 0x03, 0xf5, 0x85, 0x99, 0x71, 0xa5, 0xbc, 0xf0, 0x23, 0xfc, 0x04, 0xff, 0x82,
0xf8, 0x1f, 0x34, 0x17, 0xb7, 0xb6, 0x10, 0xaa, 0x10, 0x6f, 0x5e, 0x6b, 0xaf, 0xd9, 0xd9, 0x7b,
0x69, 0x65, 0xc3, 0x6e, 0xc1, 0xce, 0xda, 0xf5, 0x52, 0x30, 0x7e, 0xb9, 0xc9, 0xd9, 0xb2, 0xe1,
0xb5, 0xac, 0x97, 0x9a, 0xc3, 0xfa, 0x3b, 0xbb, 0x07, 0xd1, 0x1b, 0x46, 0x2f, 0xe4, 0x39, 0x61,
0x9f, 0x5a, 0x26, 0x24, 0x4a, 0x60, 0x6a, 0xd5, 0x89, 0xb3, 0xe7, 0x2c, 0x42, 0xd2, 0xc1, 0x6c,
0x01, 0x71, 0x27, 0x15, 0x4d, 0x5d, 0x09, 0x86, 0xfe, 0x83, 0x89, 0x90, 0x54, 0xb6, 0xc2, 0x4a,
0x2d, 0xca, 0x16, 0x30, 0x3f, 0x96, 0x54, 0x8a, 0x9b, 0x7b, 0x7e, 0x77, 0x20, 0xb2, 0x52, 0xdb,
0xf3, 0x16, 0x84, 0x72, 0x53, 0x32, 0x21, 0x69, 0xd9, 0x68, 0xf5, 0x98, 0x5c, 0x13, 0xba, 0x93,
0xa4, 0x5c, 0xb2, 0x22, 0x71, 0x75, 0xad, 0x83, 0x6a, 0x96, 0xb6, 0x51, 0xc2, 0xc4, 0xd3, 0x05,
0x8b, 0x14, 0x5f, 0xb2, 0xb2, 0xe6, 0xdb, 0x64, 0x6c, 0x78, 0x83, 0x54, 0x27, 0x79, 0xce, 0x19,
0x2d, 0x44, 0xe2, 0x9b, 0x4e, 0x16, 0xa2, 0x18, 0xdc, 0x75, 0x9e, 0x4c, 0x34, 0xe9, 0xae, 0x73,
0x94, 0x42, 0xc0, 0xcd, 0x22, 0x22, 0x99, 0x6a, 0xf6, 0x0a, 0xab, 0xee, 0x8c, 0xf3, 0x9a, 0x8b,
0x24, 0x30, 0xdd, 0x0d, 0xca, 0x3e, 0x00, 0x1c, 0xd4, 0xeb, 0x1b, 0xf7, 0x37, 0x0e, 0x72, 0x46,
0x4b, 0xbd, 0x4e, 0x40, 0x2c, 0x42, 0xff, 0x80, 0x9f, 0xd7, 0x6d, 0x25, 0xf5, 0x32, 0x1e, 0x31,
0x40, 0xb1, 0x62, 0x53, 0xe5, 0x4c, 0xaf, 0xe2, 0x11, 0x03, 0xb2, 0xaf, 0x0e, 0x4c, 0x08, 0xcb,
0x6b, 0x5e, 0xfc, 0x6c, 0x9e, 0xd7, 0x37, 0xef, 0x31, 0x04, 0x25, 0x93, 0xb4, 0xa0, 0x92, 0x26,
0xee, 0x9e, 0xb7, 0x98, 0xad, 0xfe, 0xc5, 0xe6, 0x21, 0x7e, 0x67, 0xf9, 0xd7, 0x95, 0xe4, 0x5b,
0x72, 0x25, 0x53, 0x93, 0x97, 0x4c, 0x08, 0xba, 0x36, 0xb6, 0x86, 0xa4, 0x83, 0xe9, 0x0b, 0x88,
0x06, 0x8f, 0xd0, 0x0e, 0x78, 0x1f, 0xd9, 0xd6, 0x2e, 0xa8, 0x3e, 0xd5, 0xb8, 0x97, 0xf4, 0xa2,
0x65, 0x7a, 0xb7, 0x90, 0x18, 0xf0, 0xdc, 0x7d, 0xea, 0x64, 0x77, 0x60, 0x7e, 0xc2, 0x69, 0xce,
0x3a, 0x83, 0x62, 0x70, 0x37, 0x85, 0x7d, 0xea, 0x6e, 0x8a, 0xec, 0x21, 0x44, 0xb6, 0x6e, 0x53,
0xf1, 0x3f, 0xf8, 0xa2, 0xa1, 0x95, 0x0a, 0x9a, 0x9a, 0xdb, 0xc7, 0xc7, 0x0d, 0xad, 0x88, 0xe1,
0xb2, 0x2f, 0x2e, 0x8c, 0x15, 0x56, 0x3f, 0x28, 0xd5, 0x33, 0xdb, 0xc9, 0x00, 0xdb, 0xdc, 0xed,
0x9a, 0x2b, 0xcf, 0x1b, 0xca, 0x99, 0x35, 0x37, 0x24, 0x16, 0x21, 0x04, 0xe3, 0x8a, 0x96, 0xc6,
0xdc, 0x90, 0xe8, 0xef, 0x7e, 0xde, 0xfc, 0x61, 0xde, 0x52, 0x08, 0x8a, 0x96, 0x53, 0xb9, 0xa9,
0x2b, 0x9b, 0x95, 0x2b, 0x8c, 0x96, 0x3d, 0xa3, 0xa7, 0x7a, 0xe0, 0xbf, 0xf5, 0xc0, 0xbf, 0xb4,
0xf9, 0x36, 0x8c, 0xe5, 0xb6, 0x61, 0x3a, 0x44, 0xf1, 0x2a, 0xd4, 0xe2, 0x93, 0x6d, 0xc3, 0x88,
0xa6, 0xff, 0xcc, 0xeb, 0x18, 0xe6, 0xaf, 0x68, 0x7e, 0xde, 0x79, 0x9d, 0x7d, 0x86, 0xc8, 0x62,
0xeb, 0xed, 0x0a, 0x26, 0x5a, 0xdd, 0x99, 0x9b, 0xe2, 0x41, 0x1d, 0xbf, 0xd7, 0x45, 0x33, 0xb2,
0x55, 0xa6, 0xcf, 0x60, 0xd6, 0xa3, 0x7f, 0x67, 0x9e, 0xfb, 0x77, 0x21, 0xe8, 0xd6, 0x43, 0x33,
0x98, 0xbe, 0x3d, 0x7c, 0x79, 0x74, 0x7a, 0xb8, 0xbf, 0x33, 0x42, 0x73, 0x08, 0x8e, 0x4e, 0x4f,
0x0c, 0x72, 0x56, 0xdf, 0x1c, 0xf0, 0xf7, 0xd5, 0xa1, 0x42, 0xbb, 0xe0, 0x1d, 0xd4, 0x6b, 0x34,
0xc3, 0xd7, 0xff, 0xa8, 0x74, 0x6a, 0x83, 0x9b, 0x8d, 0x1e, 0x39, 0xe8, 0x01, 0x4c, 0xcc, 0x61,
0x42, 0x31, 0x1e, 0x1c, 0xb3, 0xf4, 0x2f, 0x3c, 0xbc, 0x58, 0xd9, 0x08, 0x2d, 0xc0, 0xd7, 0x07,
0x07, 0x45, 0xb8, 0x7f, 0xa3, 0xd2, 0x18, 0x0f, 0xee, 0x90, 0x51, 0xea, 0x10, 0xa2, 0x08, 0xf7,
0xc3, 0x9a, 0xc6, 0x78, 0x90, 0x4d, 0xa3, 0xd4, 0x96, 0xa1, 0x08, 0xf7, 0xad, 0x4e, 0xe3, 0xa1,
0x93, 0xd9, 0xe8, 0x6c, 0xa2, 0xaf, 0xee, 0x93, 0x1f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x22, 0x65,
0x99, 0x10, 0x98, 0x05, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// DebugClient is the client API for Debug service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DebugClient interface {
Log(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (Debug_LogClient, error)
Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error)
Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error)
Trace(ctx context.Context, in *TraceRequest, opts ...grpc.CallOption) (*TraceResponse, error)
Cache(ctx context.Context, in *CacheRequest, opts ...grpc.CallOption) (*CacheResponse, error)
}
type debugClient struct {
cc *grpc.ClientConn
}
func NewDebugClient(cc *grpc.ClientConn) DebugClient {
return &debugClient{cc}
}
func (c *debugClient) Log(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (Debug_LogClient, error) {
stream, err := c.cc.NewStream(ctx, &_Debug_serviceDesc.Streams[0], "/Debug/Log", opts...)
if err != nil {
return nil, err
}
x := &debugLogClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Debug_LogClient interface {
Recv() (*Record, error)
grpc.ClientStream
}
type debugLogClient struct {
grpc.ClientStream
}
func (x *debugLogClient) Recv() (*Record, error) {
m := new(Record)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *debugClient) Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error) {
out := new(HealthResponse)
err := c.cc.Invoke(ctx, "/Debug/Health", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *debugClient) Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error) {
out := new(StatsResponse)
err := c.cc.Invoke(ctx, "/Debug/Stats", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *debugClient) Trace(ctx context.Context, in *TraceRequest, opts ...grpc.CallOption) (*TraceResponse, error) {
out := new(TraceResponse)
err := c.cc.Invoke(ctx, "/Debug/Trace", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *debugClient) Cache(ctx context.Context, in *CacheRequest, opts ...grpc.CallOption) (*CacheResponse, error) {
out := new(CacheResponse)
err := c.cc.Invoke(ctx, "/Debug/Cache", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// DebugServer is the server API for Debug service.
type DebugServer interface {
Log(*LogRequest, Debug_LogServer) error
Health(context.Context, *HealthRequest) (*HealthResponse, error)
Stats(context.Context, *StatsRequest) (*StatsResponse, error)
Trace(context.Context, *TraceRequest) (*TraceResponse, error)
Cache(context.Context, *CacheRequest) (*CacheResponse, error)
}
// UnimplementedDebugServer can be embedded to have forward compatible implementations.
type UnimplementedDebugServer struct {
}
func (*UnimplementedDebugServer) Log(req *LogRequest, srv Debug_LogServer) error {
return status.Errorf(codes.Unimplemented, "method Log not implemented")
}
func (*UnimplementedDebugServer) Health(ctx context.Context, req *HealthRequest) (*HealthResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Health not implemented")
}
func (*UnimplementedDebugServer) Stats(ctx context.Context, req *StatsRequest) (*StatsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Stats not implemented")
}
func (*UnimplementedDebugServer) Trace(ctx context.Context, req *TraceRequest) (*TraceResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Trace not implemented")
}
func (*UnimplementedDebugServer) Cache(ctx context.Context, req *CacheRequest) (*CacheResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Cache not implemented")
}
func RegisterDebugServer(s *grpc.Server, srv DebugServer) {
s.RegisterService(&_Debug_serviceDesc, srv)
}
func _Debug_Log_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(LogRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DebugServer).Log(m, &debugLogServer{stream})
}
type Debug_LogServer interface {
Send(*Record) error
grpc.ServerStream
}
type debugLogServer struct {
grpc.ServerStream
}
func (x *debugLogServer) Send(m *Record) error {
return x.ServerStream.SendMsg(m)
}
func _Debug_Health_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DebugServer).Health(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Debug/Health",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DebugServer).Health(ctx, req.(*HealthRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Debug_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StatsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DebugServer).Stats(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Debug/Stats",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DebugServer).Stats(ctx, req.(*StatsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Debug_Trace_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TraceRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DebugServer).Trace(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Debug/Trace",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DebugServer).Trace(ctx, req.(*TraceRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Debug_Cache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CacheRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DebugServer).Cache(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Debug/Cache",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DebugServer).Cache(ctx, req.(*CacheRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Debug_serviceDesc = grpc.ServiceDesc{
ServiceName: "Debug",
HandlerType: (*DebugServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Health",
Handler: _Debug_Health_Handler,
},
{
MethodName: "Stats",
Handler: _Debug_Stats_Handler,
},
{
MethodName: "Trace",
Handler: _Debug_Trace_Handler,
},
{
MethodName: "Cache",
Handler: _Debug_Cache_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Log",
Handler: _Debug_Log_Handler,
ServerStreams: true,
},
},
Metadata: "debug/service/proto/debug.proto",
}

View File

@ -1,236 +0,0 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: debug/service/proto/debug.proto
package debug
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import (
context "context"
api "github.com/micro/go-micro/v3/api"
client "github.com/micro/go-micro/v3/client"
server "github.com/micro/go-micro/v3/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ api.Endpoint
var _ context.Context
var _ client.Option
var _ server.Option
// Api Endpoints for Debug service
func NewDebugEndpoints() []*api.Endpoint {
return []*api.Endpoint{}
}
// Client API for Debug service
type DebugService interface {
Log(ctx context.Context, in *LogRequest, opts ...client.CallOption) (Debug_LogService, error)
Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error)
Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error)
Trace(ctx context.Context, in *TraceRequest, opts ...client.CallOption) (*TraceResponse, error)
Cache(ctx context.Context, in *CacheRequest, opts ...client.CallOption) (*CacheResponse, error)
}
type debugService struct {
c client.Client
name string
}
func NewDebugService(name string, c client.Client) DebugService {
return &debugService{
c: c,
name: name,
}
}
func (c *debugService) Log(ctx context.Context, in *LogRequest, opts ...client.CallOption) (Debug_LogService, error) {
req := c.c.NewRequest(c.name, "Debug.Log", &LogRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
if err := stream.Send(in); err != nil {
return nil, err
}
return &debugServiceLog{stream}, nil
}
type Debug_LogService interface {
Context() context.Context
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*Record, error)
}
type debugServiceLog struct {
stream client.Stream
}
func (x *debugServiceLog) Close() error {
return x.stream.Close()
}
func (x *debugServiceLog) Context() context.Context {
return x.stream.Context()
}
func (x *debugServiceLog) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *debugServiceLog) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *debugServiceLog) Recv() (*Record, error) {
m := new(Record)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
func (c *debugService) Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error) {
req := c.c.NewRequest(c.name, "Debug.Health", in)
out := new(HealthResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *debugService) Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error) {
req := c.c.NewRequest(c.name, "Debug.Stats", in)
out := new(StatsResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *debugService) Trace(ctx context.Context, in *TraceRequest, opts ...client.CallOption) (*TraceResponse, error) {
req := c.c.NewRequest(c.name, "Debug.Trace", in)
out := new(TraceResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *debugService) Cache(ctx context.Context, in *CacheRequest, opts ...client.CallOption) (*CacheResponse, error) {
req := c.c.NewRequest(c.name, "Debug.Cache", in)
out := new(CacheResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Debug service
type DebugHandler interface {
Log(context.Context, *LogRequest, Debug_LogStream) error
Health(context.Context, *HealthRequest, *HealthResponse) error
Stats(context.Context, *StatsRequest, *StatsResponse) error
Trace(context.Context, *TraceRequest, *TraceResponse) error
Cache(context.Context, *CacheRequest, *CacheResponse) error
}
func RegisterDebugHandler(s server.Server, hdlr DebugHandler, opts ...server.HandlerOption) error {
type debug interface {
Log(ctx context.Context, stream server.Stream) error
Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error
Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error
Trace(ctx context.Context, in *TraceRequest, out *TraceResponse) error
Cache(ctx context.Context, in *CacheRequest, out *CacheResponse) error
}
type Debug struct {
debug
}
h := &debugHandler{hdlr}
return s.Handle(s.NewHandler(&Debug{h}, opts...))
}
type debugHandler struct {
DebugHandler
}
func (h *debugHandler) Log(ctx context.Context, stream server.Stream) error {
m := new(LogRequest)
if err := stream.Recv(m); err != nil {
return err
}
return h.DebugHandler.Log(ctx, m, &debugLogStream{stream})
}
type Debug_LogStream interface {
Context() context.Context
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Record) error
}
type debugLogStream struct {
stream server.Stream
}
func (x *debugLogStream) Close() error {
return x.stream.Close()
}
func (x *debugLogStream) Context() context.Context {
return x.stream.Context()
}
func (x *debugLogStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *debugLogStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *debugLogStream) Send(m *Record) error {
return x.stream.Send(m)
}
func (h *debugHandler) Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error {
return h.DebugHandler.Health(ctx, in, out)
}
func (h *debugHandler) Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error {
return h.DebugHandler.Stats(ctx, in, out)
}
func (h *debugHandler) Trace(ctx context.Context, in *TraceRequest, out *TraceResponse) error {
return h.DebugHandler.Trace(ctx, in, out)
}
func (h *debugHandler) Cache(ctx context.Context, in *CacheRequest, out *CacheResponse) error {
return h.DebugHandler.Cache(ctx, in, out)
}

View File

@ -1,106 +0,0 @@
syntax = "proto3";
service Debug {
rpc Log(LogRequest) returns (stream Record) {};
rpc Health(HealthRequest) returns (HealthResponse) {};
rpc Stats(StatsRequest) returns (StatsResponse) {};
rpc Trace(TraceRequest) returns (TraceResponse) {};
rpc Cache(CacheRequest) returns (CacheResponse) {};
}
message HealthRequest {
// optional service name
string service = 1;
}
message HealthResponse {
// default: ok
string status = 1;
}
message StatsRequest {
// optional service name
string service = 1;
}
message StatsResponse {
// timestamp of recording
uint64 timestamp = 1;
// unix timestamp
uint64 started = 2;
// in seconds
uint64 uptime = 3;
// in bytes
uint64 memory = 4;
// num threads
uint64 threads = 5;
// total gc in nanoseconds
uint64 gc = 6;
// total number of requests
uint64 requests = 7;
// total number of errors
uint64 errors = 8;
}
// LogRequest requests service logs
message LogRequest {
// service to request logs for
string service = 1;
// stream records continuously
bool stream = 2;
// count of records to request
int64 count = 3;
// relative time in seconds
// before the current time
// from which to show logs
int64 since = 4;
}
// Record is service log record
message Record {
// timestamp of log record
int64 timestamp = 1;
// record metadata
map<string,string> metadata = 2;
// message
string message = 3;
}
message TraceRequest {
// trace id to retrieve
string id = 1;
}
message TraceResponse {
repeated Span spans = 1;
}
enum SpanType {
INBOUND = 0;
OUTBOUND = 1;
}
message Span {
// the trace id
string trace = 1;
// id of the span
string id = 2;
// parent span
string parent = 3;
// name of the resource
string name = 4;
// time of start in nanoseconds
uint64 started = 5;
// duration of the execution in nanoseconds
uint64 duration = 6;
// associated metadata
map<string,string> metadata = 7;
SpanType type = 8;
}
message CacheRequest {}
message CacheResponse {
map<string, string> values = 1;
}

View File

@ -1,64 +0,0 @@
package service
import (
"time"
"github.com/micro/go-micro/v3/debug"
"github.com/micro/go-micro/v3/debug/log"
)
type serviceLog struct {
Client *debugClient
}
// Read reads log entries from the logger
func (s *serviceLog) Read(opts ...log.ReadOption) ([]log.Record, error) {
var options log.ReadOptions
for _, o := range opts {
o(&options)
}
stream, err := s.Client.Log(options.Since, options.Count, false)
if err != nil {
return nil, err
}
defer stream.Stop()
// stream the records until nothing is left
var records []log.Record
for record := range stream.Chan() {
records = append(records, record)
}
return records, nil
}
// There is no write support
func (s *serviceLog) Write(r log.Record) error {
return nil
}
// Stream log records
func (s *serviceLog) Stream() (log.Stream, error) {
return s.Client.Log(time.Time{}, 0, true)
}
// NewLog returns a new log interface
func NewLog(opts ...log.Option) log.Log {
var options log.Options
for _, o := range opts {
o(&options)
}
name := options.Name
// set the default name
if len(name) == 0 {
name = debug.DefaultName
}
return &serviceLog{
Client: NewClient(name),
}
}

View File

@ -1,25 +0,0 @@
package service
import (
"github.com/micro/go-micro/v3/debug/log"
)
type logStream struct {
stream chan log.Record
stop chan bool
}
func (l *logStream) Chan() <-chan log.Record {
return l.stream
}
func (l *logStream) Stop() error {
select {
case <-l.stop:
return nil
default:
close(l.stream)
close(l.stop)
}
return nil
}

View File

@ -5,10 +5,11 @@ import (
"sync"
"time"
"github.com/micro/go-micro/v3/debug/stats"
"github.com/micro/go-micro/v3/util/ring"
)
type stats struct {
type memoryStats struct {
// used to store past stats
buffer *ring.Buffer
@ -18,7 +19,7 @@ type stats struct {
errors uint64
}
func (s *stats) snapshot() *Stat {
func (s *memoryStats) snapshot() *stats.Stat {
s.RLock()
defer s.RUnlock()
@ -27,7 +28,7 @@ func (s *stats) snapshot() *Stat {
now := time.Now().Unix()
return &Stat{
return &stats.Stat{
Timestamp: now,
Started: s.started,
Uptime: now - s.started,
@ -39,32 +40,31 @@ func (s *stats) snapshot() *Stat {
}
}
func (s *stats) Read() ([]*Stat, error) {
// TODO adjustable size and optional read values
buf := s.buffer.Get(60)
var stats []*Stat
func (s *memoryStats) Read() ([]*stats.Stat, error) {
buf := s.buffer.Get(s.buffer.Size())
var buffer []*stats.Stat
// get a value from the buffer if it exists
for _, b := range buf {
stat, ok := b.Value.(*Stat)
stat, ok := b.Value.(*stats.Stat)
if !ok {
continue
}
stats = append(stats, stat)
buffer = append(buffer, stat)
}
// get a snapshot
stats = append(stats, s.snapshot())
buffer = append(buffer, s.snapshot())
return stats, nil
return buffer, nil
}
func (s *stats) Write(stat *Stat) error {
func (s *memoryStats) Write(stat *stats.Stat) error {
s.buffer.Put(stat)
return nil
}
func (s *stats) Record(err error) error {
func (s *memoryStats) Record(err error) error {
s.Lock()
defer s.Unlock()
@ -81,9 +81,9 @@ func (s *stats) Record(err error) error {
// NewStats returns a new in memory stats buffer
// TODO add options
func NewStats() Stats {
return &stats{
func NewStats() stats.Stats {
return &memoryStats{
started: time.Now().Unix(),
buffer: ring.New(60),
buffer: ring.New(1),
}
}

View File

@ -30,7 +30,3 @@ type Stat struct {
// Total errors
Errors uint64
}
var (
DefaultStats = NewStats()
)

6
go.mod
View File

@ -9,7 +9,6 @@ require (
github.com/bitly/go-simplejson v0.5.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/bwmarrin/discordgo v0.20.2
github.com/caddyserver/certmagic v0.10.6
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.18+incompatible
@ -21,13 +20,11 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1
github.com/evanphx/json-patch/v5 v5.0.0
github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c
github.com/fsnotify/fsnotify v1.4.7
github.com/fsouza/go-dockerclient v1.6.0
github.com/ghodss/yaml v1.0.0
github.com/go-acme/lego/v3 v3.4.0
github.com/go-git/go-git/v5 v5.1.0
github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible // indirect
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee
github.com/gobwas/pool v0.2.0 // indirect
github.com/gobwas/ws v1.0.3
@ -52,13 +49,11 @@ require (
github.com/mitchellh/hashstructure v1.0.0
github.com/nats-io/nats-server/v2 v2.1.6 // indirect
github.com/nats-io/nats.go v1.9.2
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.4.0
github.com/technoweenie/multipartstreamer v1.0.1 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
@ -71,6 +66,5 @@ require (
google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1
google.golang.org/grpc v1.26.0
google.golang.org/protobuf v1.22.0
gopkg.in/telegram-bot-api.v4 v4.6.4
sigs.k8s.io/yaml v1.1.0 // indirect
)

15
go.sum
View File

@ -64,8 +64,6 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/bwmarrin/discordgo v0.20.2 h1:nA7jiTtqUA9lT93WL2jPjUp8ZTEInRujBdx1C9gkr20=
github.com/bwmarrin/discordgo v0.20.2/go.mod h1:O9S4p+ofTFwB02em7jkpkV8M3R0/PUVOwN61zSZ0r4Q=
github.com/caddyserver/certmagic v0.10.6 h1:sCya6FmfaN74oZE46kqfaFOVoROD/mF36rTQfjN7TZc=
github.com/caddyserver/certmagic v0.10.6/go.mod h1:Y8jcUBctgk/IhpAzlHKfimZNyXCkfGgRTC0orl8gROQ=
github.com/cenkalti/backoff/v4 v4.0.0 h1:6VeaLF9aI+MAUQ95106HwWzYZgJJpZ4stumjj6RFYAU=
@ -134,8 +132,6 @@ github.com/exoscale/egoscale v0.18.1/go.mod h1:Z7OOdzzTOz1Q1PjQXumlz9Wn/CddH0zSY
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c h1:pBgVXWDXju1m8W4lnEeIqTHPOzhTUO81a7yknM/xQR4=
github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c/go.mod h1:pFdJbAhRf7rh6YYMUdIQGyzne6zYL1tCUW8QV2B3UfY=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsouza/go-dockerclient v1.6.0 h1:f7j+AX94143JL1H3TiqSMkM4EcLDI0De1qD4GGn3Hig=
@ -162,8 +158,6 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible h1:2cauKuaELYAEARXRkq2LrJ0yDDv1rW7+wrTEdVL3uaU=
github.com/go-telegram-bot-api/telegram-bot-api v4.6.4+incompatible/go.mod h1:qf9acutJ8cwBUhm1bqgz6Bei9/C/c93FPDljKWwsOgM=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
@ -222,8 +216,6 @@ github.com/gorilla/handlers v1.4.2 h1:0QniY0USkHQ1RGCLfKxeNHK9bkDHGRYGNDFBCS+YAR
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.2.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg=
@ -331,8 +323,6 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 h1:Pr5gZa2VcmktVwq0lyC39MsN5tz356vC/pQHKvq+QBo=
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249/go.mod h1:JzQ9m3PMAqcpeCam7UaHSuBuupz7CmpjehYMayT6YOk=
github.com/nrdcg/auroradns v1.0.0/go.mod h1:6JPXKzIRzZzMqtTDgueIhTi6rFf1QvYE/HzqidhOhjw=
github.com/nrdcg/dnspod-go v0.4.0/go.mod h1:vZSoFSFeQVm2gWLMkyX61LZ8HI3BaqtHZWgPTGKr6KQ=
github.com/nrdcg/goinwx v0.6.1/go.mod h1:XPiut7enlbEdntAqalBIqcYcTEVhpv/dKWgDCX2SwKQ=
@ -416,8 +406,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQDeX7m2XsSOlQEnM=
github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog=
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ75iPqWZc0HeJWFYNCvKsfpQwFpRNTA=
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0=
github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY=
@ -457,7 +445,6 @@ go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
golang.org/x/crypto v0.0.0-20180621125126-a49355c7e3f8/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -647,8 +634,6 @@ gopkg.in/resty.v1 v1.9.1/go.mod h1:vo52Hzryw9PnPHcJfPsBiFW62XhNx5OczbV9y+IMpgc=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4=
gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/telegram-bot-api.v4 v4.6.4 h1:hpHWhzn4jTCsAJZZ2loNKfy2QWyPDRJVl3aTFXeMW8g=
gopkg.in/telegram-bot-api.v4 v4.6.4/go.mod h1:5DpGO5dbumb40px+dXcwCpcjmeHNYLpk0bp3XRNvWDM=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=

View File

@ -113,8 +113,6 @@ func (l *defaultLogger) Log(level Level, v ...interface{}) {
metadata += fmt.Sprintf(" %s=%v", k, fields[k])
}
dlog.DefaultLog.Write(rec)
t := rec.Timestamp.Format("2006-01-02 15:04:05")
fmt.Printf("%s %s %v\n", t, metadata, rec.Message)
}
@ -154,8 +152,6 @@ func (l *defaultLogger) Logf(level Level, format string, v ...interface{}) {
metadata += fmt.Sprintf(" %s=%v", k, fields[k])
}
dlog.DefaultLog.Write(rec)
t := rec.Timestamp.Format("2006-01-02 15:04:05")
fmt.Printf("%s %s %v\n", t, metadata, rec.Message)
}

View File

@ -9,7 +9,6 @@ import (
"github.com/micro/go-micro/v3/codec"
"github.com/micro/go-micro/v3/registry/mdns"
"github.com/micro/go-micro/v3/server"
"github.com/micro/go-micro/v3/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
)
@ -70,7 +69,6 @@ func newOptions(opt ...server.Option) server.Options {
Metadata: map[string]string{},
Broker: http.NewBroker(),
Registry: mdns.NewRegistry(),
Transport: transport.DefaultTransport,
Address: server.DefaultAddress,
Name: server.DefaultName,
Id: server.DefaultId,

View File

@ -14,6 +14,7 @@ import (
"github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/registry/mdns"
"github.com/micro/go-micro/v3/transport"
thttp "github.com/micro/go-micro/v3/transport/http"
)
type Options struct {
@ -72,7 +73,7 @@ func newOptions(opt ...Option) Options {
}
if opts.Transport == nil {
opts.Transport = transport.DefaultTransport
opts.Transport = thttp.NewTransport()
}
if opts.RegisterCheck == nil {
@ -228,7 +229,7 @@ func TLSConfig(t *tls.Config) Option {
// set the default transport if one is not
// already set. Required for Init call below.
if o.Transport == nil {
o.Transport = transport.DefaultTransport
o.Transport = thttp.NewTransport()
}
// set the transport tls

View File

@ -1,11 +1,602 @@
// Package http returns a http2 transport using net/http
package http
import (
"bufio"
"bytes"
"crypto/tls"
"errors"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/micro/go-micro/v3/transport"
maddr "github.com/micro/go-micro/v3/util/addr"
"github.com/micro/go-micro/v3/util/buf"
mnet "github.com/micro/go-micro/v3/util/net"
mls "github.com/micro/go-micro/v3/util/tls"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
// NewTransport returns a new http transport using net/http and supporting http2
func NewTransport(opts ...transport.Option) transport.Transport {
return transport.NewTransport(opts...)
type httpTransport struct {
opts transport.Options
}
type httpTransportClient struct {
ht *httpTransport
addr string
conn net.Conn
dialOpts transport.DialOptions
once sync.Once
sync.RWMutex
// request must be stored for response processing
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
// local/remote ip
local string
remote string
}
type httpTransportSocket struct {
ht *httpTransport
w http.ResponseWriter
r *http.Request
rw *bufio.ReadWriter
mtx sync.RWMutex
// the hijacked when using http 1
conn net.Conn
// for the first request
ch chan *http.Request
// h2 things
buf *bufio.Reader
// indicate if socket is closed
closed chan bool
// local/remote ip
local string
remote string
}
type httpTransportListener struct {
ht *httpTransport
listener net.Listener
}
func (h *httpTransportClient) Local() string {
return h.local
}
func (h *httpTransportClient) Remote() string {
return h.remote
}
func (h *httpTransportClient) Send(m *transport.Message) error {
header := make(http.Header)
for k, v := range m.Header {
header.Set(k, v)
}
b := buf.New(bytes.NewBuffer(m.Body))
defer b.Close()
req := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: h.addr,
},
Header: header,
Body: b,
ContentLength: int64(b.Len()),
Host: h.addr,
}
h.Lock()
h.bl = append(h.bl, req)
select {
case h.r <- h.bl[0]:
h.bl = h.bl[1:]
default:
}
h.Unlock()
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return req.Write(h.conn)
}
func (h *httpTransportClient) Recv(m *transport.Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
var r *http.Request
if !h.dialOpts.Stream {
rc, ok := <-h.r
if !ok {
return io.EOF
}
r = rc
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
rsp, err := http.ReadResponse(h.buff, r)
if err != nil {
return err
}
defer rsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return err
}
if rsp.StatusCode != 200 {
return errors.New(rsp.Status + ": " + string(b))
}
m.Body = b
if m.Header == nil {
m.Header = make(map[string]string, len(rsp.Header))
}
for k, v := range rsp.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
return nil
}
func (h *httpTransportClient) Close() error {
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.Unlock()
close(h.r)
})
return h.conn.Close()
}
func (h *httpTransportSocket) Local() string {
return h.local
}
func (h *httpTransportSocket) Remote() string {
return h.remote
}
func (h *httpTransportSocket) Recv(m *transport.Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
if m.Header == nil {
m.Header = make(map[string]string, len(h.r.Header))
}
// process http 1
if h.r.ProtoMajor == 1 {
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
var r *http.Request
select {
// get first request
case r = <-h.ch:
// read next request
default:
rr, err := http.ReadRequest(h.rw.Reader)
if err != nil {
return err
}
r = rr
}
// read body
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
// set body
r.Body.Close()
m.Body = b
// set headers
for k, v := range r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// return early early
return nil
}
// only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// processing http2 request
// read streaming body
// set max buffer size
// TODO: adjustable buffer size
buf := make([]byte, 4*1024*1024)
// read the request body
n, err := h.buf.Read(buf)
// not an eof error
if err != nil {
return err
}
// check if we have data
if n > 0 {
m.Body = buf[:n]
}
// set headers
for k, v := range h.r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// set path
m.Header[":path"] = h.r.URL.Path
return nil
}
func (h *httpTransportSocket) Send(m *transport.Message) error {
if h.r.ProtoMajor == 1 {
// make copy of header
hdr := make(http.Header)
for k, v := range h.r.Header {
hdr[k] = v
}
rsp := &http.Response{
Header: hdr,
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
}
// only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// we need to lock to protect the write
h.mtx.RLock()
defer h.mtx.RUnlock()
// set headers
for k, v := range m.Header {
h.w.Header().Set(k, v)
}
// write request
_, err := h.w.Write(m.Body)
// flush the trailers
h.w.(http.Flusher).Flush()
return err
}
func (h *httpTransportSocket) error(m *transport.Message) error {
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: make(http.Header),
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
return rsp.Write(h.conn)
}
return nil
}
func (h *httpTransportSocket) Close() error {
h.mtx.Lock()
defer h.mtx.Unlock()
select {
case <-h.closed:
return nil
default:
// close the channel
close(h.closed)
// close the buffer
h.r.Body.Close()
// close the connection
if h.r.ProtoMajor == 1 {
return h.conn.Close()
}
}
return nil
}
func (h *httpTransportListener) Addr() string {
return h.listener.Addr().String()
}
func (h *httpTransportListener) Close() error {
return h.listener.Close()
}
func (h *httpTransportListener) Accept(fn func(transport.Socket)) error {
// create handler mux
mux := http.NewServeMux()
// register our transport handler
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf *bufio.ReadWriter
var con net.Conn
// read a regular request
if r.ProtoMajor == 1 {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
r.Body = ioutil.NopCloser(bytes.NewReader(b))
// hijack the conn
hj, ok := w.(http.Hijacker)
if !ok {
// we're screwed
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
return
}
conn, bufrw, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
buf = bufrw
con = conn
}
// buffered reader
bufr := bufio.NewReader(r.Body)
// save the request
ch := make(chan *http.Request, 1)
ch <- r
// create a new transport socket
sock := &httpTransportSocket{
ht: h.ht,
w: w,
r: r,
rw: buf,
buf: bufr,
ch: ch,
conn: con,
local: h.Addr(),
remote: r.RemoteAddr,
closed: make(chan bool),
}
// execute the socket
fn(sock)
})
// get optional handlers
if h.ht.opts.Context != nil {
handlers, ok := h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
mux.Handle(pattern, handler)
}
}
}
// default http2 server
srv := &http.Server{
Handler: mux,
}
// insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = h2c.NewHandler(mux, &http2.Server{})
}
// begin serving
return srv.Serve(h.listener)
}
func (h *httpTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
dopts := transport.DialOptions{
Timeout: transport.DefaultDialTimeout,
}
for _, opt := range opts {
opt(&dopts)
}
var conn net.Conn
var err error
// TODO: support dial option here rather than using internal config
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
config.NextProtos = []string{"http/1.1"}
conn, err = newConn(func(addr string) (net.Conn, error) {
return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
})(addr)
} else {
conn, err = newConn(func(addr string) (net.Conn, error) {
return net.DialTimeout("tcp", addr, dopts.Timeout)
})(addr)
}
if err != nil {
return nil, err
}
return &httpTransportClient{
ht: h,
addr: addr,
conn: conn,
buff: bufio.NewReader(conn),
dialOpts: dopts,
r: make(chan *http.Request, 1),
local: conn.LocalAddr().String(),
remote: conn.RemoteAddr().String(),
}, nil
}
func (h *httpTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
var options transport.ListenOptions
for _, o := range opts {
o(&options)
}
var l net.Listener
var err error
// TODO: support use of listen options
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(addr, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(addr, fn)
}
if err != nil {
return nil, err
}
return &httpTransportListener{
ht: h,
listener: l,
}, nil
}
func (h *httpTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&h.opts)
}
return nil
}
func (h *httpTransport) Options() transport.Options {
return h.opts
}
func (h *httpTransport) String() string {
return "http"
}
func NewTransport(opts ...transport.Option) transport.Transport {
var options transport.Options
for _, o := range opts {
o(&options)
}
return &httpTransport{opts: options}
}

View File

@ -1,4 +1,4 @@
package transport
package http
import (
"bufio"

View File

@ -1,13 +1,15 @@
package transport
package http
import (
"io"
"net"
"testing"
"time"
"github.com/micro/go-micro/v3/transport"
)
func expectedPort(t *testing.T, expected string, lsn Listener) {
func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
_, port, err := net.SplitHostPort(lsn.Addr())
if err != nil {
t.Errorf("Expected address to be `%s`, got error: %v", expected, err)
@ -53,11 +55,11 @@ func TestHTTPTransportCommunication(t *testing.T) {
}
defer l.Close()
fn := func(sock Socket) {
fn := func(sock transport.Socket) {
defer sock.Close()
for {
var m Message
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
}
@ -86,7 +88,7 @@ func TestHTTPTransportCommunication(t *testing.T) {
}
defer c.Close()
m := Message{
m := transport.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
@ -97,7 +99,7 @@ func TestHTTPTransportCommunication(t *testing.T) {
t.Errorf("Unexpected send err: %v", err)
}
var rm Message
var rm transport.Message
if err := c.Recv(&rm); err != nil {
t.Errorf("Unexpected recv err: %v", err)
@ -119,11 +121,11 @@ func TestHTTPTransportError(t *testing.T) {
}
defer l.Close()
fn := func(sock Socket) {
fn := func(sock transport.Socket) {
defer sock.Close()
for {
var m Message
var m transport.Message
if err := sock.Recv(&m); err != nil {
if err == io.EOF {
return
@ -131,7 +133,7 @@ func TestHTTPTransportError(t *testing.T) {
t.Fatal(err)
}
sock.(*httpTransportSocket).error(&Message{
sock.(*httpTransportSocket).error(&transport.Message{
Body: []byte(`an error occurred`),
})
}
@ -155,7 +157,7 @@ func TestHTTPTransportError(t *testing.T) {
}
defer c.Close()
m := Message{
m := transport.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
@ -166,7 +168,7 @@ func TestHTTPTransportError(t *testing.T) {
t.Errorf("Unexpected send err: %v", err)
}
var rm Message
var rm transport.Message
err = c.Recv(&rm)
if err == nil {
@ -181,7 +183,7 @@ func TestHTTPTransportError(t *testing.T) {
}
func TestHTTPTransportTimeout(t *testing.T) {
tr := NewTransport(Timeout(time.Millisecond * 100))
tr := NewTransport(transport.Timeout(time.Millisecond * 100))
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
@ -191,7 +193,7 @@ func TestHTTPTransportTimeout(t *testing.T) {
done := make(chan bool)
fn := func(sock Socket) {
fn := func(sock transport.Socket) {
defer func() {
sock.Close()
close(done)
@ -207,7 +209,7 @@ func TestHTTPTransportTimeout(t *testing.T) {
}()
for {
var m Message
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
@ -231,7 +233,7 @@ func TestHTTPTransportTimeout(t *testing.T) {
}
defer c.Close()
m := Message{
m := transport.Message{
Header: map[string]string{
"Content-Type": "application/json",
},

View File

@ -1,601 +0,0 @@
package transport
import (
"bufio"
"bytes"
"crypto/tls"
"errors"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
maddr "github.com/micro/go-micro/v3/util/addr"
"github.com/micro/go-micro/v3/util/buf"
mnet "github.com/micro/go-micro/v3/util/net"
mls "github.com/micro/go-micro/v3/util/tls"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
type httpTransport struct {
opts Options
}
type httpTransportClient struct {
ht *httpTransport
addr string
conn net.Conn
dialOpts DialOptions
once sync.Once
sync.RWMutex
// request must be stored for response processing
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
// local/remote ip
local string
remote string
}
type httpTransportSocket struct {
ht *httpTransport
w http.ResponseWriter
r *http.Request
rw *bufio.ReadWriter
mtx sync.RWMutex
// the hijacked when using http 1
conn net.Conn
// for the first request
ch chan *http.Request
// h2 things
buf *bufio.Reader
// indicate if socket is closed
closed chan bool
// local/remote ip
local string
remote string
}
type httpTransportListener struct {
ht *httpTransport
listener net.Listener
}
func (h *httpTransportClient) Local() string {
return h.local
}
func (h *httpTransportClient) Remote() string {
return h.remote
}
func (h *httpTransportClient) Send(m *Message) error {
header := make(http.Header)
for k, v := range m.Header {
header.Set(k, v)
}
b := buf.New(bytes.NewBuffer(m.Body))
defer b.Close()
req := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: h.addr,
},
Header: header,
Body: b,
ContentLength: int64(b.Len()),
Host: h.addr,
}
h.Lock()
h.bl = append(h.bl, req)
select {
case h.r <- h.bl[0]:
h.bl = h.bl[1:]
default:
}
h.Unlock()
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return req.Write(h.conn)
}
func (h *httpTransportClient) Recv(m *Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
var r *http.Request
if !h.dialOpts.Stream {
rc, ok := <-h.r
if !ok {
return io.EOF
}
r = rc
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
rsp, err := http.ReadResponse(h.buff, r)
if err != nil {
return err
}
defer rsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return err
}
if rsp.StatusCode != 200 {
return errors.New(rsp.Status + ": " + string(b))
}
m.Body = b
if m.Header == nil {
m.Header = make(map[string]string, len(rsp.Header))
}
for k, v := range rsp.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
return nil
}
func (h *httpTransportClient) Close() error {
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.Unlock()
close(h.r)
})
return h.conn.Close()
}
func (h *httpTransportSocket) Local() string {
return h.local
}
func (h *httpTransportSocket) Remote() string {
return h.remote
}
func (h *httpTransportSocket) Recv(m *Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
if m.Header == nil {
m.Header = make(map[string]string, len(h.r.Header))
}
// process http 1
if h.r.ProtoMajor == 1 {
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
var r *http.Request
select {
// get first request
case r = <-h.ch:
// read next request
default:
rr, err := http.ReadRequest(h.rw.Reader)
if err != nil {
return err
}
r = rr
}
// read body
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
// set body
r.Body.Close()
m.Body = b
// set headers
for k, v := range r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// return early early
return nil
}
// only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// processing http2 request
// read streaming body
// set max buffer size
// TODO: adjustable buffer size
buf := make([]byte, 4*1024*1024)
// read the request body
n, err := h.buf.Read(buf)
// not an eof error
if err != nil {
return err
}
// check if we have data
if n > 0 {
m.Body = buf[:n]
}
// set headers
for k, v := range h.r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// set path
m.Header[":path"] = h.r.URL.Path
return nil
}
func (h *httpTransportSocket) Send(m *Message) error {
if h.r.ProtoMajor == 1 {
// make copy of header
hdr := make(http.Header)
for k, v := range h.r.Header {
hdr[k] = v
}
rsp := &http.Response{
Header: hdr,
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
}
// only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// we need to lock to protect the write
h.mtx.RLock()
defer h.mtx.RUnlock()
// set headers
for k, v := range m.Header {
h.w.Header().Set(k, v)
}
// write request
_, err := h.w.Write(m.Body)
// flush the trailers
h.w.(http.Flusher).Flush()
return err
}
func (h *httpTransportSocket) error(m *Message) error {
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: make(http.Header),
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
return rsp.Write(h.conn)
}
return nil
}
func (h *httpTransportSocket) Close() error {
h.mtx.Lock()
defer h.mtx.Unlock()
select {
case <-h.closed:
return nil
default:
// close the channel
close(h.closed)
// close the buffer
h.r.Body.Close()
// close the connection
if h.r.ProtoMajor == 1 {
return h.conn.Close()
}
}
return nil
}
func (h *httpTransportListener) Addr() string {
return h.listener.Addr().String()
}
func (h *httpTransportListener) Close() error {
return h.listener.Close()
}
func (h *httpTransportListener) Accept(fn func(Socket)) error {
// create handler mux
mux := http.NewServeMux()
// register our transport handler
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf *bufio.ReadWriter
var con net.Conn
// read a regular request
if r.ProtoMajor == 1 {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
r.Body = ioutil.NopCloser(bytes.NewReader(b))
// hijack the conn
hj, ok := w.(http.Hijacker)
if !ok {
// we're screwed
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
return
}
conn, bufrw, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
buf = bufrw
con = conn
}
// buffered reader
bufr := bufio.NewReader(r.Body)
// save the request
ch := make(chan *http.Request, 1)
ch <- r
// create a new transport socket
sock := &httpTransportSocket{
ht: h.ht,
w: w,
r: r,
rw: buf,
buf: bufr,
ch: ch,
conn: con,
local: h.Addr(),
remote: r.RemoteAddr,
closed: make(chan bool),
}
// execute the socket
fn(sock)
})
// get optional handlers
if h.ht.opts.Context != nil {
handlers, ok := h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
mux.Handle(pattern, handler)
}
}
}
// default http2 server
srv := &http.Server{
Handler: mux,
}
// insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = h2c.NewHandler(mux, &http2.Server{})
}
// begin serving
return srv.Serve(h.listener)
}
func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
dopts := DialOptions{
Timeout: DefaultDialTimeout,
}
for _, opt := range opts {
opt(&dopts)
}
var conn net.Conn
var err error
// TODO: support dial option here rather than using internal config
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
config.NextProtos = []string{"http/1.1"}
conn, err = newConn(func(addr string) (net.Conn, error) {
return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
})(addr)
} else {
conn, err = newConn(func(addr string) (net.Conn, error) {
return net.DialTimeout("tcp", addr, dopts.Timeout)
})(addr)
}
if err != nil {
return nil, err
}
return &httpTransportClient{
ht: h,
addr: addr,
conn: conn,
buff: bufio.NewReader(conn),
dialOpts: dopts,
r: make(chan *http.Request, 1),
local: conn.LocalAddr().String(),
remote: conn.RemoteAddr().String(),
}, nil
}
func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, error) {
var options ListenOptions
for _, o := range opts {
o(&options)
}
var l net.Listener
var err error
// TODO: support use of listen options
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(addr, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(addr, fn)
}
if err != nil {
return nil, err
}
return &httpTransportListener{
ht: h,
listener: l,
}, nil
}
func (h *httpTransport) Init(opts ...Option) error {
for _, o := range opts {
o(&h.opts)
}
return nil
}
func (h *httpTransport) Options() Options {
return h.opts
}
func (h *httpTransport) String() string {
return "http"
}
func newHTTPTransport(opts ...Option) *httpTransport {
var options Options
for _, o := range opts {
o(&options)
}
return &httpTransport{opts: options}
}

View File

@ -46,11 +46,5 @@ type DialOption func(*DialOptions)
type ListenOption func(*ListenOptions)
var (
DefaultTransport Transport = newHTTPTransport()
DefaultDialTimeout = time.Second * 5
)
func NewTransport(opts ...Option) Transport {
return newHTTPTransport(opts...)
}

View File

@ -1,68 +0,0 @@
// Package mux provides proxy muxing
package mux
import (
"context"
"sync"
"github.com/micro/go-micro/v3/client/grpc"
"github.com/micro/go-micro/v3/debug/service/handler"
"github.com/micro/go-micro/v3/proxy"
"github.com/micro/go-micro/v3/server"
"github.com/micro/go-micro/v3/server/mucp"
)
// Server is a proxy muxer that incudes the use of the DefaultHandler
type Server struct {
// name of service
Name string
// Proxy handler
Proxy proxy.Proxy
// The default handler
Handler Handler
}
type Handler interface {
proxy.Proxy
NewHandler(interface{}, ...server.HandlerOption) server.Handler
Handle(server.Handler) error
}
var (
once sync.Once
)
func (s *Server) ProcessMessage(ctx context.Context, msg server.Message) error {
if msg.Topic() == s.Name {
return s.Handler.ProcessMessage(ctx, msg)
}
return s.Proxy.ProcessMessage(ctx, msg)
}
func (s *Server) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
if req.Service() == s.Name {
return s.Handler.ServeRequest(ctx, req, rsp)
}
return s.Proxy.ServeRequest(ctx, req, rsp)
}
func New(name string, p proxy.Proxy) *Server {
r := mucp.DefaultRouter
// only register this once
once.Do(func() {
r.Handle(
// inject the debug handler
r.NewHandler(
handler.NewHandler(grpc.NewClient()),
server.InternalHandler(true),
),
)
})
return &Server{
Name: name,
Proxy: p,
Handler: r,
}
}