minimize allocations (#1472)

* server: minimize allocations on re-register

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* server: stop old instance before Init()

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* client/grpc: fix allocations in protobuf marshal

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* codec/json: fix allocations in protobuf marshal

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* remove stop from init

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* codec/grpc: expose MaxMessageSize

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* codec: use buffer pool

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* metadata: minimize reallocations

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* util/wrapper: use metadata helper

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* registry/cache: move logs to debug level

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* server: move logs to debug level

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* server: cache service only when Advertise is ip addr

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* server: use metadata.Copy

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-04-08 12:50:19 +03:00
committed by GitHub
parent 98fc3dfbad
commit 1fbc056dd4
10 changed files with 220 additions and 83 deletions

View File

@@ -59,6 +59,9 @@ type grpcServer struct {
started bool
// used for first registration
registered bool
// registry service instance
rsvc *registry.Service
}
func init() {
@@ -102,6 +105,9 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
}
func (g *grpcServer) configure(opts ...server.Option) {
g.Lock()
defer g.Unlock()
// Don't reprocess where there's no config
if len(opts) == 0 && g.srv != nil {
return
@@ -127,6 +133,7 @@ func (g *grpcServer) configure(opts ...server.Option) {
gopts = append(gopts, opts...)
}
g.rsvc = nil
g.srv = grpc.NewServer(gopts...)
}
@@ -559,11 +566,24 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
}
func (g *grpcServer) Register() error {
g.RLock()
rsvc := g.rsvc
config := g.opts
g.RUnlock()
// if service already filled, reuse it and return early
if rsvc != nil {
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
if err := config.Registry.Register(rsvc, rOpts...); err != nil {
return err
}
return nil
}
var err error
var advt, host, port string
// parse address for host, port
config := g.opts
var cacheService bool
// check the advertise address first
// if it exists then use it, otherwise
@@ -584,16 +604,17 @@ func (g *grpcServer) Register() error {
host = advt
}
if ip := net.ParseIP(host); ip != nil {
cacheService = true
}
addr, err := addr.Extract(host)
if err != nil {
return err
}
// make copy of metadata
md := make(meta.Metadata)
for k, v := range config.Metadata {
md[k] = v
}
md := meta.Copy(config.Metadata)
// register service
node := &registry.Node{
@@ -646,13 +667,13 @@ func (g *grpcServer) Register() error {
Endpoints: endpoints,
}
g.Lock()
g.RLock()
registered := g.registered
g.Unlock()
g.RUnlock()
if !registered {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
}
@@ -671,6 +692,9 @@ func (g *grpcServer) Register() error {
g.Lock()
defer g.Unlock()
if cacheService {
g.rsvc = service
}
g.registered = true
for sb := range g.subscribers {
@@ -688,8 +712,8 @@ func (g *grpcServer) Register() error {
opts = append(opts, broker.DisableAutoAck())
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Subscribing to topic: %s", sb.Topic())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil {
@@ -705,7 +729,9 @@ func (g *grpcServer) Deregister() error {
var err error
var advt, host, port string
g.RLock()
config := g.opts
g.RUnlock()
// check the advertise address first
// if it exists then use it, otherwise
@@ -742,14 +768,15 @@ func (g *grpcServer) Deregister() error {
Nodes: []*registry.Node{node},
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Deregistering node: %s", node.Id)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Deregistering node: %s", node.Id)
}
if err := config.Registry.Deregister(service); err != nil {
return err
}
g.Lock()
g.rsvc = nil
if !g.registered {
g.Unlock()
@@ -760,8 +787,8 @@ func (g *grpcServer) Deregister() error {
for sb, subs := range g.subscribers {
for _, sub := range subs {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Unsubscribing from topic: %s", sub.Topic())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Unsubscribing from topic: %s", sub.Topic())
}
sub.Unsubscribe()
}
@@ -819,11 +846,14 @@ func (g *grpcServer) Start() error {
if len(g.subscribers) > 0 {
// connect to the broker
if err := config.Broker.Connect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
}
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}
}
@@ -900,11 +930,15 @@ func (g *grpcServer) Start() error {
// close transport
ch <- nil
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker
config.Broker.Disconnect()
if err := config.Broker.Disconnect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
}
}
}()
// mark the server as started
@@ -930,6 +964,7 @@ func (g *grpcServer) Stop() error {
select {
case err = <-ch:
g.Lock()
g.rsvc = nil
g.started = false
g.Unlock()
}

View File

@@ -40,6 +40,8 @@ type rpcServer struct {
subscriber broker.Subscriber
// graceful exit
wg *sync.WaitGroup
rsvc *registry.Service
}
func newRpcServer(opts ...Option) Server {
@@ -459,10 +461,11 @@ func (s *rpcServer) Options() Options {
func (s *rpcServer) Init(opts ...Option) error {
s.Lock()
defer s.Unlock()
for _, opt := range opts {
opt(&s.opts)
}
// update router if its the default
if s.opts.Router == nil {
r := newRpcRouter()
@@ -472,7 +475,8 @@ func (s *rpcServer) Init(opts ...Option) error {
s.router = r
}
s.Unlock()
s.rsvc = nil
return nil
}
@@ -510,11 +514,24 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
}
func (s *rpcServer) Register() error {
s.RLock()
rsvc := s.rsvc
config := s.Options()
s.RUnlock()
if rsvc != nil {
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
if err := config.Registry.Register(rsvc, rOpts...); err != nil {
return err
}
return nil
}
var err error
var advt, host, port string
// parse address for host, port
config := s.Options()
var cacheService bool
// check the advertise address first
// if it exists then use it, otherwise
@@ -535,16 +552,17 @@ func (s *rpcServer) Register() error {
host = advt
}
if ip := net.ParseIP(host); ip != nil {
cacheService = true
}
addr, err := addr.Extract(host)
if err != nil {
return err
}
// make copy of metadata
md := make(metadata.Metadata)
for k, v := range config.Metadata {
md[k] = v
}
md := metadata.Copy(config.Metadata)
// mq-rpc(eg. nats) doesn't need the port. its addr is queue name.
if port != "" {
@@ -612,7 +630,9 @@ func (s *rpcServer) Register() error {
s.RUnlock()
if !registered {
log.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
log.Debugf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
}
// create registry options
@@ -630,6 +650,9 @@ func (s *rpcServer) Register() error {
s.Lock()
defer s.Unlock()
if cacheService {
s.rsvc = service
}
s.registered = true
// set what we're advertising
s.opts.Advertise = addr
@@ -665,8 +688,9 @@ func (s *rpcServer) Register() error {
if err != nil {
return err
}
log.Infof("Subscribing to topic: %s", sub.Topic())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
log.Debugf("Subscribing to topic: %s", sub.Topic())
}
s.subscribers[sb] = []broker.Subscriber{sub}
}
@@ -677,7 +701,9 @@ func (s *rpcServer) Deregister() error {
var err error
var advt, host, port string
s.RLock()
config := s.Options()
s.RUnlock()
// check the advertise address first
// if it exists then use it, otherwise
@@ -719,12 +745,15 @@ func (s *rpcServer) Deregister() error {
Nodes: []*registry.Node{node},
}
log.Infof("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
log.Debugf("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
}
if err := config.Registry.Deregister(service); err != nil {
return err
}
s.Lock()
s.rsvc = nil
if !s.registered {
s.Unlock()
@@ -741,7 +770,9 @@ func (s *rpcServer) Deregister() error {
for sb, subs := range s.subscribers {
for _, sub := range subs {
log.Infof("Unsubscribing %s from topic: %s", node.Id, sub.Topic())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
log.Debugf("Unsubscribing %s from topic: %s", node.Id, sub.Topic())
}
sub.Unsubscribe()
}
s.subscribers[sb] = nil
@@ -767,7 +798,9 @@ func (s *rpcServer) Start() error {
return err
}
log.Infof("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
log.Debugf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
}
// swap address
s.Lock()
@@ -775,22 +808,31 @@ func (s *rpcServer) Start() error {
s.opts.Address = ts.Addr()
s.Unlock()
bname := config.Broker.String()
// connect to the broker
if err := config.Broker.Connect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Broker [%s] connect error: %v", bname, err)
}
return err
}
bname := config.Broker.String()
log.Infof("Broker [%s] Connected to %s", bname, config.Broker.Address())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
log.Debugf("Broker [%s] Connected to %s", bname, config.Broker.Address())
}
// use RegisterCheck func before register
if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
} else {
// announce self to the world
if err = s.Register(); err != nil {
log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
}
@@ -811,7 +853,9 @@ func (s *rpcServer) Start() error {
// check the error and backoff
default:
if err != nil {
log.Errorf("Accept error: %v", err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Accept error: %v", err)
}
time.Sleep(time.Second)
continue
}
@@ -844,17 +888,25 @@ func (s *rpcServer) Start() error {
s.RUnlock()
rerr := s.opts.RegisterCheck(s.opts.Context)
if rerr != nil && registered {
log.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
}
// deregister self in case of error
if err := s.Deregister(); err != nil {
log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
}
} else if rerr != nil && !registered {
log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
continue
}
if err := s.Register(); err != nil {
log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
// wait for exit
case ch = <-s.exit:
@@ -870,7 +922,9 @@ func (s *rpcServer) Start() error {
if registered {
// deregister self
if err := s.Deregister(); err != nil {
log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
}
}
@@ -886,9 +940,15 @@ func (s *rpcServer) Start() error {
// close transport listener
ch <- ts.Close()
log.Infof("Broker [%s] Disconnected from %s", bname, config.Broker.Address())
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
log.Debugf("Broker [%s] Disconnected from %s", bname, config.Broker.Address())
}
// disconnect the broker
config.Broker.Disconnect()
if err := config.Broker.Disconnect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Broker [%s] Disconnect error: %v", bname, err)
}
}
// swap back address
s.Lock()