codec rewrite

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-11-26 01:17:21 +03:00
parent 5098a36a6b
commit 36040a5765
9 changed files with 306 additions and 318 deletions

111
grpc.go
View File

@@ -16,6 +16,7 @@ import (
oldproto "github.com/golang/protobuf/proto"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/logger"
meta "github.com/unistack-org/micro/v3/metadata"
@@ -33,12 +34,6 @@ import (
"google.golang.org/protobuf/proto"
)
var (
// DefaultMaxMsgSize define maximum message size that server can send
// or receive. Default value is 4MB.
DefaultMaxMsgSize = 1024 * 1024 * 4
)
const (
defaultContentType = "application/grpc"
)
@@ -67,13 +62,7 @@ type grpcServer struct {
// registry service instance
rsvc *registry.Service
codecs map[string]encoding.Codec
}
func init() {
encoding.RegisterCodec(wrapCodec{jsonCodec{}})
encoding.RegisterCodec(wrapCodec{protoCodec{}})
encoding.RegisterCodec(wrapCodec{bytesCodec{}})
codecs map[string]codec.Codec
}
func newGRPCServer(opts ...server.Option) server.Server {
@@ -122,22 +111,14 @@ func (g *grpcServer) configure(opts ...server.Option) error {
g.wg = g.opts.Wait
g.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs))
for k, v := range defaultGRPCCodecs {
g.codecs[k] = v
}
var codecs map[string]encoding.Codec
if g.opts.Context != nil {
if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil {
codecs = v
if codecs, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
for k, v := range codecs {
g.opts.Codecs[k] = &wrapGrpcCodec{v}
}
}
}
for k, v := range codecs {
g.codecs[k] = v
}
maxMsgSize := g.getMaxMsgSize()
gopts := []grpc.ServerOption{
@@ -177,11 +158,11 @@ func (g *grpcServer) configure(opts ...server.Option) error {
func (g *grpcServer) getMaxMsgSize() int {
if g.opts.Context == nil {
return DefaultMaxMsgSize
return codec.DefaultMaxMsgSize
}
s, ok := g.opts.Context.Value(maxMsgSizeKey{}).(int)
if !ok {
return DefaultMaxMsgSize
return codec.DefaultMaxMsgSize
}
return s
}
@@ -236,7 +217,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("grpc handler got error: %s", err)
config.Logger.Errorf("grpc handler got error: %s", err)
}
}
}()
@@ -304,30 +285,30 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
// process via router
if g.opts.Router != nil {
cc, err := g.newGRPCCodec(ct)
cf, err := g.newCodec(ct)
if err != nil {
return errors.InternalServerError(g.opts.Name, err.Error())
}
codec := &grpcCodec{
ServerStream: stream,
method: fmt.Sprintf("%s.%s", serviceName, methodName),
endpoint: fmt.Sprintf("%s.%s", serviceName, methodName),
target: g.opts.Name,
c: cc,
}
// create a client.Request
request := &rpcRequest{
rw: &wrapStream{stream},
service: serviceFromMethod(fullMethod),
contentType: ct,
method: fmt.Sprintf("%s.%s", serviceName, methodName),
codec: codec,
endpoint: fmt.Sprintf("%s.%s", serviceName, methodName),
target: g.opts.Name,
codec: cf,
stream: true,
}
response := &rpcResponse{
header: make(map[string]string),
codec: codec,
method: fmt.Sprintf("%s.%s", serviceName, methodName),
endpoint: fmt.Sprintf("%s.%s", serviceName, methodName),
target: g.opts.Name,
rw: &wrapStream{stream},
header: make(map[string]string),
codec: cf,
}
// create a wrapped function
@@ -424,11 +405,11 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
function := mtype.method.Func
var returnValues []reflect.Value
cc, err := g.newGRPCCodec(ct)
cf, err := g.newCodec(ct)
if err != nil {
return errors.InternalServerError(g.opts.Name, err.Error())
}
b, err := cc.Marshal(argv.Interface())
b, err := cf.Marshal(argv.Interface())
if err != nil {
return err
}
@@ -628,15 +609,15 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m
return status.New(statusCode, statusDesc).Err()
}
func (g *grpcServer) newGRPCCodec(contentType string) (encoding.Codec, error) {
func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
g.RLock()
defer g.RUnlock()
if c, ok := g.codecs[contentType]; ok {
if c, ok := g.opts.Codecs[ct]; ok {
return c, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
return nil, codec.ErrUnknownContentType
}
func (g *grpcServer) Options() server.Options {
@@ -753,7 +734,7 @@ func (g *grpcServer) Register() error {
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id)
config.Logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id)
}
}
@@ -785,7 +766,7 @@ func (g *grpcServer) Register() error {
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Subscribing to topic: %s", sb.Topic())
config.Logger.Infof("Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
@@ -813,7 +794,7 @@ func (g *grpcServer) Deregister() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Deregistering node: %s", service.Nodes[0].Id)
config.Logger.Infof("Deregistering node: %s", service.Nodes[0].Id)
}
if err := server.DefaultDeregisterFunc(service, config); err != nil {
@@ -837,11 +818,11 @@ func (g *grpcServer) Deregister() error {
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Unsubscribing from topic: %s", s.Topic())
config.Logger.Infof("Unsubscribing from topic: %s", s.Topic())
}
if err := s.Unsubscribe(g.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Unsubscribing from topic: %s err: %v", s.Topic(), err)
config.Logger.Errorf("Unsubscribing from topic: %s err: %v", s.Topic(), err)
}
}
}(sub)
@@ -864,6 +845,10 @@ func (g *grpcServer) Start() error {
config := g.Options()
for _, k := range config.Codecs {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
// micro: config.Transport.Listen(config.Address)
var ts net.Listener
@@ -891,7 +876,7 @@ func (g *grpcServer) Start() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Server [grpc] Listening on %s", ts.Addr().String())
config.Logger.Infof("Server [grpc] Listening on %s", ts.Addr().String())
}
g.Lock()
g.opts.Address = ts.Addr().String()
@@ -905,26 +890,26 @@ func (g *grpcServer) Start() error {
// connect to the broker
if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Broker [%s] connect error: %v", config.Broker.String(), err)
config.Logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
config.Logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}
}
// use RegisterCheck func before register
if err := g.opts.RegisterCheck(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, err)
config.Logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
} else {
// announce self to the world
if err := g.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server register error: %v", err)
config.Logger.Errorf("Server register error: %v", err)
}
}
}
@@ -933,11 +918,11 @@ func (g *grpcServer) Start() error {
go func() {
if err := g.srv.Serve(ts); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("gRPC Server start error: %v", err)
config.Logger.Errorf("gRPC Server start error: %v", err)
}
if err := g.Stop(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("gRPC Server stop error: %v", err)
config.Logger.Errorf("gRPC Server stop error: %v", err)
}
}
}
@@ -966,23 +951,23 @@ func (g *grpcServer) Start() error {
rerr := g.opts.RegisterCheck(g.opts.Context)
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
config.Logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
}
// deregister self in case of error
if err := g.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err)
config.Logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
}
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
config.Logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
}
continue
}
if err := g.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err)
config.Logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
// wait for exit
@@ -994,7 +979,7 @@ func (g *grpcServer) Start() error {
// deregister self
if err := g.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server deregister error: ", err)
config.Logger.Errorf("Server deregister error: ", err)
}
}
@@ -1021,12 +1006,12 @@ func (g *grpcServer) Start() error {
ch <- nil
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
config.Logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker
if err := config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Broker [%s] disconnect error: %v", config.Broker.String(), err)
config.Logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
}
}
}()