minimize allocations in logger and tunnel code (#1323)

* logs alloc

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* fix allocs

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* fix allocs

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* tunnel allocs

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* try to fix tunnel

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* cache cipher for send

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-03-11 20:55:39 +03:00
parent b0164dca94
commit 05b18c03dc
3 changed files with 72 additions and 28 deletions

52
grpc.go
View File

@ -17,7 +17,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/errors"
log "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/logger"
meta "github.com/micro/go-micro/v2/metadata" meta "github.com/micro/go-micro/v2/metadata"
"github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/server" "github.com/micro/go-micro/v2/server"
@ -358,8 +358,10 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) { fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Error("panic recovered: ", r) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Error(string(debug.Stack())) logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
}
err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r) err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r)
} }
}() }()
@ -658,7 +660,9 @@ func (g *grpcServer) Register() error {
g.Unlock() g.Unlock()
if !registered { if !registered {
log.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
} }
// create registry options // create registry options
@ -693,7 +697,9 @@ func (g *grpcServer) Register() error {
opts = append(opts, broker.DisableAutoAck()) opts = append(opts, broker.DisableAutoAck())
} }
log.Infof("Subscribing to topic: %s", sb.Topic()) if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil { if err != nil {
return err return err
@ -745,7 +751,9 @@ func (g *grpcServer) Deregister() error {
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }
log.Infof("Deregistering node: %s", node.Id) if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Deregistering node: %s", node.Id)
}
if err := config.Registry.Deregister(service); err != nil { if err := config.Registry.Deregister(service); err != nil {
return err return err
} }
@ -761,7 +769,9 @@ func (g *grpcServer) Deregister() error {
for sb, subs := range g.subscribers { for sb, subs := range g.subscribers {
for _, sub := range subs { for _, sub := range subs {
log.Infof("Unsubscribing from topic: %s", sub.Topic()) if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Unsubscribing from topic: %s", sub.Topic())
}
sub.Unsubscribe() sub.Unsubscribe()
} }
g.subscribers[sb] = nil g.subscribers[sb] = nil
@ -807,7 +817,9 @@ func (g *grpcServer) Start() error {
} }
} }
log.Infof("Server [grpc] Listening on %s", ts.Addr().String()) if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Server [grpc] Listening on %s", ts.Addr().String())
}
g.Lock() g.Lock()
g.opts.Address = ts.Addr().String() g.opts.Address = ts.Addr().String()
g.Unlock() g.Unlock()
@ -819,18 +831,24 @@ func (g *grpcServer) Start() error {
return err return err
} }
log.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}
} }
// announce self to the world // announce self to the world
if err := g.Register(); err != nil { if err := g.Register(); err != nil {
log.Errorf("Server register error: ", err) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server register error: ", err)
}
} }
// micro: go ts.Accept(s.accept) // micro: go ts.Accept(s.accept)
go func() { go func() {
if err := g.srv.Serve(ts); err != nil { if err := g.srv.Serve(ts); err != nil {
log.Errorf("gRPC Server start error: ", err) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("gRPC Server start error: ", err)
}
} }
}() }()
@ -852,7 +870,9 @@ func (g *grpcServer) Start() error {
// register self on interval // register self on interval
case <-t.C: case <-t.C:
if err := g.Register(); err != nil { if err := g.Register(); err != nil {
log.Error("Server register error: ", err) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("Server register error: ", err)
}
} }
// wait for exit // wait for exit
case ch = <-g.exit: case ch = <-g.exit:
@ -862,7 +882,9 @@ func (g *grpcServer) Start() error {
// deregister self // deregister self
if err := g.Deregister(); err != nil { if err := g.Deregister(); err != nil {
log.Error("Server deregister error: ", err) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("Server deregister error: ", err)
}
} }
// wait for waitgroup // wait for waitgroup
@ -887,7 +909,9 @@ func (g *grpcServer) Start() error {
// close transport // close transport
ch <- nil ch <- nil
log.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker // disconnect broker
config.Broker.Disconnect() config.Broker.Disconnect()
}() }()

View File

@ -14,7 +14,7 @@ import (
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
log "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/server" "github.com/micro/go-micro/v2/server"
) )
@ -86,7 +86,9 @@ func prepareEndpoint(method reflect.Method) *methodType {
replyType = mtype.In(3) replyType = mtype.In(3)
contextType = mtype.In(1) contextType = mtype.In(1)
default: default:
log.Error("method", mname, "of", mtype, "has wrong number of ins:", mtype.NumIn()) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("method", mname, "of", mtype, "has wrong number of ins:", mtype.NumIn())
}
return nil return nil
} }
@ -94,7 +96,9 @@ func prepareEndpoint(method reflect.Method) *methodType {
// check stream type // check stream type
streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() streamType := reflect.TypeOf((*server.Stream)(nil)).Elem()
if !argType.Implements(streamType) { if !argType.Implements(streamType) {
log.Error(mname, "argument does not implement Streamer interface:", argType) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(mname, "argument does not implement Streamer interface:", argType)
}
return nil return nil
} }
} else { } else {
@ -102,30 +106,40 @@ func prepareEndpoint(method reflect.Method) *methodType {
// First arg need not be a pointer. // First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) { if !isExportedOrBuiltinType(argType) {
log.Error(mname, "argument type not exported:", argType) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(mname, "argument type not exported:", argType)
}
return nil return nil
} }
if replyType.Kind() != reflect.Ptr { if replyType.Kind() != reflect.Ptr {
log.Error("method", mname, "reply type not a pointer:", replyType) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("method", mname, "reply type not a pointer:", replyType)
}
return nil return nil
} }
// Reply type must be exported. // Reply type must be exported.
if !isExportedOrBuiltinType(replyType) { if !isExportedOrBuiltinType(replyType) {
log.Error("method", mname, "reply type not exported:", replyType) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("method", mname, "reply type not exported:", replyType)
}
return nil return nil
} }
} }
// Endpoint() needs one out. // Endpoint() needs one out.
if mtype.NumOut() != 1 { if mtype.NumOut() != 1 {
log.Error("method", mname, "has wrong number of outs:", mtype.NumOut()) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("method", mname, "has wrong number of outs:", mtype.NumOut())
}
return nil return nil
} }
// The return type of the method must be error. // The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError { if returnType := mtype.Out(0); returnType != typeOfError {
log.Error("method", mname, "returns", returnType.String(), "not error") if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("method", mname, "returns", returnType.String(), "not error")
}
return nil return nil
} }
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
@ -142,11 +156,13 @@ func (server *rServer) register(rcvr interface{}) error {
s.rcvr = reflect.ValueOf(rcvr) s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name() sname := reflect.Indirect(s.rcvr).Type().Name()
if sname == "" { if sname == "" {
log.Fatal("rpc: no service name for type", s.typ.String()) logger.Fatal("rpc: no service name for type", s.typ.String())
} }
if !isExported(sname) { if !isExported(sname) {
s := "rpc Register: type " + sname + " is not exported" s := "rpc Register: type " + sname + " is not exported"
log.Error(s) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(s)
}
return errors.New(s) return errors.New(s)
} }
if _, present := server.serviceMap[sname]; present { if _, present := server.serviceMap[sname]; present {
@ -165,7 +181,9 @@ func (server *rServer) register(rcvr interface{}) error {
if len(s.method) == 0 { if len(s.method) == 0 {
s := "rpc Register: type " + sname + " has no exported methods of suitable type" s := "rpc Register: type " + sname + " has no exported methods of suitable type"
log.Error(s) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(s)
}
return errors.New(s) return errors.New(s)
} }
server.serviceMap[s.name] = s server.serviceMap[s.name] = s

View File

@ -9,7 +9,7 @@ import (
"github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/errors"
log "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/metadata"
"github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/server" "github.com/micro/go-micro/v2/server"
@ -171,8 +171,10 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Error("panic recovered: ", r) if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Error(string(debug.Stack())) logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
}
err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r) err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r)
} }
}() }()