Merge pull request #98 from unistack-org/grpc-fixes

support Grpc-Timeout header, export default content type
This commit is contained in:
Василий Толстов 2022-03-21 13:06:19 +03:00 committed by GitHub
commit cdee03a96e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 8 deletions

32
grpc.go
View File

@ -35,7 +35,7 @@ import (
) )
const ( const (
defaultContentType = "application/grpc+proto" DefaultContentType = "application/grpc+proto"
) )
/* /*
@ -249,14 +249,33 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
md.Set(k, strings.Join(v, ", ")) md.Set(k, strings.Join(v, ", "))
} }
var td string
// timeout for server deadline // timeout for server deadline
to, ok := md.Get("timeout") if v, ok := md.Get("timeout"); ok {
if ok {
md.Del("timeout") md.Del("timeout")
td = v
}
if v, ok := md.Get("Grpc-Timeout"); ok {
md.Del("Grpc-Timeout")
td = v[:len(v)-1]
switch v[:] {
case "S":
td += "s"
case "M":
td += "m"
case "H":
td += "h"
case "m":
td += "ms"
case "u":
td += "us"
case "n":
td += "ns"
}
} }
// get content type // get content type
ct := defaultContentType ct := DefaultContentType
if ctype, ok := md.Get("content-type"); ok { if ctype, ok := md.Get("content-type"); ok {
ct = ctype ct = ctype
@ -275,8 +294,8 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
} }
// set the timeout if we have it // set the timeout if we have it
if len(to) > 0 { if len(td) > 0 {
if n, err := strconv.ParseUint(to, 10, 64); err == nil { if n, err := strconv.ParseUint(td, 10, 64); err == nil {
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(n)) ctx, cancel = context.WithTimeout(ctx, time.Duration(n))
defer cancel() defer cancel()
@ -341,6 +360,7 @@ func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStrea
} }
// Unmarshal request // Unmarshal request
// TODO: avoid Marshal call later by recv to frame and reuse it data
if err := stream.RecvMsg(argv.Interface()); err != nil { if err := stream.RecvMsg(argv.Interface()); err != nil {
return err return err
} }

View File

@ -122,8 +122,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
if len(ct) == 0 { if len(ct) == 0 {
msg.Header["Content-Type"] = defaultContentType msg.Header["Content-Type"] = DefaultContentType
ct = defaultContentType ct = DefaultContentType
} }
cf, err := g.newCodec(ct) cf, err := g.newCodec(ct)
if err != nil { if err != nil {