Force grpc client/server to use grpc codec for broker
This commit is contained in:
parent
d9a699ae6f
commit
c44fd63301
@ -18,7 +18,6 @@ import (
|
|||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
"github.com/micro/go-micro/transport"
|
"github.com/micro/go-micro/transport"
|
||||||
|
|
||||||
"github.com/micro/go-micro/util/buf"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
@ -491,14 +490,13 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
|
|||||||
}
|
}
|
||||||
md["Content-Type"] = p.ContentType()
|
md["Content-Type"] = p.ContentType()
|
||||||
|
|
||||||
cf, err := g.newCodec(p.ContentType())
|
cf, err := g.newGRPCCodec(p.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
b := buf.New(nil)
|
b, err := cf.Marshal(p.Payload())
|
||||||
|
if err != nil {
|
||||||
if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil {
|
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -508,7 +506,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
|
|||||||
|
|
||||||
return g.opts.Broker.Publish(p.Topic(), &broker.Message{
|
return g.opts.Broker.Publish(p.Topic(), &broker.Message{
|
||||||
Header: md,
|
Header: md,
|
||||||
Body: b.Bytes(),
|
Body: b,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,18 +1,15 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/micro/go-micro/broker"
|
"github.com/micro/go-micro/broker"
|
||||||
"github.com/micro/go-micro/codec"
|
|
||||||
"github.com/micro/go-micro/metadata"
|
"github.com/micro/go-micro/metadata"
|
||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
"github.com/micro/go-micro/server"
|
"github.com/micro/go-micro/server"
|
||||||
"github.com/micro/go-micro/util/buf"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -175,7 +172,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
|
|||||||
msg.Header["Content-Type"] = defaultContentType
|
msg.Header["Content-Type"] = defaultContentType
|
||||||
ct = defaultContentType
|
ct = defaultContentType
|
||||||
}
|
}
|
||||||
cf, err := g.newCodec(ct)
|
cf, err := g.newGRPCCodec(ct)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -205,15 +202,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
|
|||||||
req = req.Elem()
|
req = req.Elem()
|
||||||
}
|
}
|
||||||
|
|
||||||
b := buf.New(bytes.NewBuffer(msg.Body))
|
if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil {
|
||||||
co := cf(b)
|
|
||||||
defer co.Close()
|
|
||||||
|
|
||||||
if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := co.ReadBody(req.Interface()); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user