minimize allocations (#1472)

* server: minimize allocations on re-register

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* server: stop old instance before Init()

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* client/grpc: fix allocations in protobuf marshal

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* codec/json: fix allocations in protobuf marshal

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* remove stop from init

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* codec/grpc: expose MaxMessageSize

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* codec: use buffer pool

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* metadata: minimize reallocations

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* util/wrapper: use metadata helper

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* registry/cache: move logs to debug level

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* server: move logs to debug level

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* server: cache service only when Advertise is ip addr

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* server: use metadata.Copy

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-04-08 12:50:19 +03:00
parent f874f4cc97
commit f349c7723f

79
grpc.go
View File

@ -59,6 +59,9 @@ type grpcServer struct {
started bool started bool
// used for first registration // used for first registration
registered bool registered bool
// registry service instance
rsvc *registry.Service
} }
func init() { func init() {
@ -102,6 +105,9 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
} }
func (g *grpcServer) configure(opts ...server.Option) { func (g *grpcServer) configure(opts ...server.Option) {
g.Lock()
defer g.Unlock()
// Don't reprocess where there's no config // Don't reprocess where there's no config
if len(opts) == 0 && g.srv != nil { if len(opts) == 0 && g.srv != nil {
return return
@ -127,6 +133,7 @@ func (g *grpcServer) configure(opts ...server.Option) {
gopts = append(gopts, opts...) gopts = append(gopts, opts...)
} }
g.rsvc = nil
g.srv = grpc.NewServer(gopts...) g.srv = grpc.NewServer(gopts...)
} }
@ -559,11 +566,24 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
} }
func (g *grpcServer) Register() error { func (g *grpcServer) Register() error {
g.RLock()
rsvc := g.rsvc
config := g.opts
g.RUnlock()
// if service already filled, reuse it and return early
if rsvc != nil {
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
if err := config.Registry.Register(rsvc, rOpts...); err != nil {
return err
}
return nil
}
var err error var err error
var advt, host, port string var advt, host, port string
var cacheService bool
// parse address for host, port
config := g.opts
// check the advertise address first // check the advertise address first
// if it exists then use it, otherwise // if it exists then use it, otherwise
@ -584,16 +604,17 @@ func (g *grpcServer) Register() error {
host = advt host = advt
} }
if ip := net.ParseIP(host); ip != nil {
cacheService = true
}
addr, err := addr.Extract(host) addr, err := addr.Extract(host)
if err != nil { if err != nil {
return err return err
} }
// make copy of metadata // make copy of metadata
md := make(meta.Metadata) md := meta.Copy(config.Metadata)
for k, v := range config.Metadata {
md[k] = v
}
// register service // register service
node := &registry.Node{ node := &registry.Node{
@ -646,13 +667,13 @@ func (g *grpcServer) Register() error {
Endpoints: endpoints, Endpoints: endpoints,
} }
g.Lock() g.RLock()
registered := g.registered registered := g.registered
g.Unlock() g.RUnlock()
if !registered { if !registered {
if logger.V(logger.InfoLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) logger.Debugf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
} }
} }
@ -671,6 +692,9 @@ func (g *grpcServer) Register() error {
g.Lock() g.Lock()
defer g.Unlock() defer g.Unlock()
if cacheService {
g.rsvc = service
}
g.registered = true g.registered = true
for sb := range g.subscribers { for sb := range g.subscribers {
@ -688,8 +712,8 @@ func (g *grpcServer) Register() error {
opts = append(opts, broker.DisableAutoAck()) opts = append(opts, broker.DisableAutoAck())
} }
if logger.V(logger.InfoLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Infof("Subscribing to topic: %s", sb.Topic()) logger.Debug("Subscribing to topic: %s", sb.Topic())
} }
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil { if err != nil {
@ -705,7 +729,9 @@ func (g *grpcServer) Deregister() error {
var err error var err error
var advt, host, port string var advt, host, port string
g.RLock()
config := g.opts config := g.opts
g.RUnlock()
// check the advertise address first // check the advertise address first
// if it exists then use it, otherwise // if it exists then use it, otherwise
@ -742,14 +768,15 @@ func (g *grpcServer) Deregister() error {
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }
if logger.V(logger.InfoLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Infof("Deregistering node: %s", node.Id) logger.Debugf("Deregistering node: %s", node.Id)
} }
if err := config.Registry.Deregister(service); err != nil { if err := config.Registry.Deregister(service); err != nil {
return err return err
} }
g.Lock() g.Lock()
g.rsvc = nil
if !g.registered { if !g.registered {
g.Unlock() g.Unlock()
@ -760,8 +787,8 @@ func (g *grpcServer) Deregister() error {
for sb, subs := range g.subscribers { for sb, subs := range g.subscribers {
for _, sub := range subs { for _, sub := range subs {
if logger.V(logger.InfoLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Infof("Unsubscribing from topic: %s", sub.Topic()) logger.Debugf("Unsubscribing from topic: %s", sub.Topic())
} }
sub.Unsubscribe() sub.Unsubscribe()
} }
@ -819,11 +846,14 @@ func (g *grpcServer) Start() error {
if len(g.subscribers) > 0 { if len(g.subscribers) > 0 {
// connect to the broker // connect to the broker
if err := config.Broker.Connect(); err != nil { if err := config.Broker.Connect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
}
return err return err
} }
if logger.V(logger.InfoLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) logger.Debugf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
} }
} }
@ -900,11 +930,15 @@ func (g *grpcServer) Start() error {
// close transport // close transport
ch <- nil ch <- nil
if logger.V(logger.InfoLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) logger.Debugf("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
} }
// disconnect broker // disconnect broker
config.Broker.Disconnect() if err := config.Broker.Disconnect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
}
}
}() }()
// mark the server as started // mark the server as started
@ -930,6 +964,7 @@ func (g *grpcServer) Stop() error {
select { select {
case err = <-ch: case err = <-ch:
g.Lock() g.Lock()
g.rsvc = nil
g.started = false g.started = false
g.Unlock() g.Unlock()
} }