diff --git a/grpc.go b/grpc.go index 5f71922..65c3296 100644 --- a/grpc.go +++ b/grpc.go @@ -35,7 +35,7 @@ import ( ) const ( - defaultContentType = "application/grpc+proto" + DefaultContentType = "application/grpc+proto" ) /* @@ -249,14 +249,33 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err md.Set(k, strings.Join(v, ", ")) } + var td string // timeout for server deadline - to, ok := md.Get("timeout") - if ok { + if v, ok := md.Get("timeout"); ok { md.Del("timeout") + td = v + } + if v, ok := md.Get("Grpc-Timeout"); ok { + md.Del("Grpc-Timeout") + td = v[:len(v)-1] + switch v[:] { + case "S": + td += "s" + case "M": + td += "m" + case "H": + td += "h" + case "m": + td += "ms" + case "u": + td += "us" + case "n": + td += "ns" + } } // get content type - ct := defaultContentType + ct := DefaultContentType if ctype, ok := md.Get("content-type"); ok { ct = ctype @@ -275,8 +294,8 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err } // set the timeout if we have it - if len(to) > 0 { - if n, err := strconv.ParseUint(to, 10, 64); err == nil { + if len(td) > 0 { + if n, err := strconv.ParseUint(td, 10, 64); err == nil { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Duration(n)) defer cancel() @@ -341,6 +360,7 @@ func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStrea } // Unmarshal request + // TODO: avoid Marshal call later by recv to frame and reuse it data if err := stream.RecvMsg(argv.Interface()); err != nil { return err } diff --git a/subscriber.go b/subscriber.go index c0b11a9..59a0b53 100644 --- a/subscriber.go +++ b/subscriber.go @@ -122,8 +122,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke ct := msg.Header["Content-Type"] if len(ct) == 0 { - msg.Header["Content-Type"] = defaultContentType - ct = defaultContentType + msg.Header["Content-Type"] = DefaultContentType + ct = DefaultContentType } cf, err := g.newCodec(ct) if err != nil {