update hooks calling and fix errors create #137
66
codec_test.go
Normal file
66
codec_test.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"go.unistack.org/micro/v3/codec"
|
||||||
|
gmetadata "google.golang.org/grpc/metadata"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockStream struct {
|
||||||
|
msg any
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockStream) Header() (gmetadata.MD, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockStream) Trailer() gmetadata.MD {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockStream) CloseSend() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockStream) Context() context.Context {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockStream) SendMsg(msg any) error {
|
||||||
|
m.msg = msg
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockStream) RecvMsg(msg any) error {
|
||||||
|
|
||||||
|
c := msg.(*codec.Frame)
|
||||||
|
c.Data = m.msg.(*codec.Frame).Data
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_ReadWrap(t *testing.T) {
|
||||||
|
|
||||||
|
wp := wrapStream{
|
||||||
|
&mockStream{},
|
||||||
|
}
|
||||||
|
|
||||||
|
write, err := wp.Write([]byte("test_data"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if write != 9 {
|
||||||
|
t.Error("uncorrected number wrote bytes")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := make([]byte, write)
|
||||||
|
read, err := wp.Read(b)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if read != 9 || string(b) != "test_data" {
|
||||||
|
t.Error("uncorrected number wrote bytes or data")
|
||||||
|
}
|
||||||
|
}
|
24
grpc.go
24
grpc.go
@ -103,7 +103,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
|
|
||||||
cf, err := g.newCodec(req.ContentType())
|
cf, err := g.newCodec(req.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
maxRecvMsgSize := g.maxRecvMsgSizeValue()
|
maxRecvMsgSize := g.maxRecvMsgSizeValue()
|
||||||
@ -147,7 +147,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
|
|
||||||
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
|
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
return errors.InternalServerError("go.micro.client", "Error sending request: %v", err)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
// defer execution of release
|
// defer execution of release
|
||||||
@ -217,7 +217,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
|||||||
|
|
||||||
cf, err := g.newCodec(req.ContentType())
|
cf, err := g.newCodec(req.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var dialCtx context.Context
|
var dialCtx context.Context
|
||||||
@ -258,7 +258,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
|||||||
|
|
||||||
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
|
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
return errors.InternalServerError("go.micro.client", "Error sending request: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
desc := &grpc.StreamDesc{
|
desc := &grpc.StreamDesc{
|
||||||
@ -291,7 +291,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
|||||||
// release the connection
|
// release the connection
|
||||||
g.pool.Put(cc, err)
|
g.pool.Put(cc, err)
|
||||||
// now return the error
|
// now return the error
|
||||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
|
return errors.InternalServerError("go.micro.client", "Error creating stream: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// set request codec
|
// set request codec
|
||||||
@ -421,7 +421,7 @@ func (g *grpcClient) Init(opts ...client.Option) error {
|
|||||||
g.funcPublish = g.fnPublish
|
g.funcPublish = g.fnPublish
|
||||||
g.funcBatchPublish = g.fnBatchPublish
|
g.funcBatchPublish = g.fnBatchPublish
|
||||||
|
|
||||||
g.opts.Hooks.EachNext(func(hook options.Hook) {
|
g.opts.Hooks.EachPrev(func(hook options.Hook) {
|
||||||
switch h := hook.(type) {
|
switch h := hook.(type) {
|
||||||
case client.HookCall:
|
case client.HookCall:
|
||||||
g.funcCall = h(g.funcCall)
|
g.funcCall = h(g.funcCall)
|
||||||
@ -533,7 +533,7 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
|
|||||||
// call backoff first. Someone may want an initial start delay
|
// call backoff first. Someone may want an initial start delay
|
||||||
t, err := callOpts.Backoff(ctx, req, i)
|
t, err := callOpts.Backoff(ctx, req, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// only sleep if greater than 0
|
// only sleep if greater than 0
|
||||||
@ -548,7 +548,7 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
|
|||||||
// TODO apply any filtering here
|
// TODO apply any filtering here
|
||||||
routes, err = g.opts.Lookup(ctx, req, callOpts)
|
routes, err = g.opts.Lookup(ctx, req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// balance the list of nodes
|
// balance the list of nodes
|
||||||
@ -675,7 +675,7 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|||||||
// call backoff first. Someone may want an initial start delay
|
// call backoff first. Someone may want an initial start delay
|
||||||
t, err := callOpts.Backoff(ctx, req, i)
|
t, err := callOpts.Backoff(ctx, req, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
return nil, errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// only sleep if greater than 0
|
// only sleep if greater than 0
|
||||||
@ -690,7 +690,7 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|||||||
// TODO apply any filtering here
|
// TODO apply any filtering here
|
||||||
routes, err = g.opts.Lookup(ctx, req, callOpts)
|
routes, err = g.opts.Lookup(ctx, req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
return nil, errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// balance the list of nodes
|
// balance the list of nodes
|
||||||
@ -822,12 +822,12 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
// use codec for payload
|
// use codec for payload
|
||||||
cf, err := g.newCodec(p.ContentType())
|
cf, err := g.newCodec(p.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
}
|
}
|
||||||
// set the body
|
// set the body
|
||||||
b, err := cf.Marshal(p.Payload())
|
b, err := cf.Marshal(p.Payload())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
}
|
}
|
||||||
body = b
|
body = b
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
}
|
}
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
|
|
||||||
// create new conn)
|
// nolint (TODO need fix) create new conn)
|
||||||
cc, err := grpc.DialContext(ctx, addr, opts...)
|
cc, err := grpc.DialContext(ctx, addr, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user