diff --git a/codec_test.go b/codec_test.go new file mode 100644 index 0000000..54b9e90 --- /dev/null +++ b/codec_test.go @@ -0,0 +1,66 @@ +package grpc + +import ( + "context" + "go.unistack.org/micro/v3/codec" + gmetadata "google.golang.org/grpc/metadata" + "testing" +) + +type mockStream struct { + msg any +} + +func (m mockStream) Header() (gmetadata.MD, error) { + return nil, nil +} + +func (m mockStream) Trailer() gmetadata.MD { + return nil +} + +func (m mockStream) CloseSend() error { + return nil +} + +func (m mockStream) Context() context.Context { + return nil +} + +func (m *mockStream) SendMsg(msg any) error { + m.msg = msg + return nil +} + +func (m *mockStream) RecvMsg(msg any) error { + + c := msg.(*codec.Frame) + c.Data = m.msg.(*codec.Frame).Data + + return nil +} + +func Test_ReadWrap(t *testing.T) { + + wp := wrapStream{ + &mockStream{}, + } + + write, err := wp.Write([]byte("test_data")) + if err != nil { + t.Fatal(err) + } + if write != 9 { + t.Error("uncorrected number wrote bytes") + } + + b := make([]byte, write) + read, err := wp.Read(b) + if err != nil { + t.Fatal(err) + } + + if read != 9 || string(b) != "test_data" { + t.Error("uncorrected number wrote bytes or data") + } +} diff --git a/grpc.go b/grpc.go index b1fd925..4caee03 100644 --- a/grpc.go +++ b/grpc.go @@ -103,7 +103,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, cf, err := g.newCodec(req.ContentType()) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%+v", err) } maxRecvMsgSize := g.maxRecvMsgSizeValue() @@ -147,7 +147,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) if err != nil { - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) + return errors.InternalServerError("go.micro.client", "Error sending request: %v", err) } defer func() { // defer execution of release @@ -217,7 +217,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request cf, err := g.newCodec(req.ContentType()) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%+v", err) } var dialCtx context.Context @@ -258,7 +258,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) if err != nil { - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) + return errors.InternalServerError("go.micro.client", "Error sending request: %v", err) } desc := &grpc.StreamDesc{ @@ -291,7 +291,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request // release the connection g.pool.Put(cc, err) // now return the error - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) + return errors.InternalServerError("go.micro.client", "Error creating stream: %v", err) } // set request codec @@ -421,7 +421,7 @@ func (g *grpcClient) Init(opts ...client.Option) error { g.funcPublish = g.fnPublish g.funcBatchPublish = g.fnBatchPublish - g.opts.Hooks.EachNext(func(hook options.Hook) { + g.opts.Hooks.EachPrev(func(hook options.Hook) { switch h := hook.(type) { case client.HookCall: g.funcCall = h(g.funcCall) @@ -533,7 +533,7 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%+v", err) } // only sleep if greater than 0 @@ -548,7 +548,7 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa // TODO apply any filtering here routes, err = g.opts.Lookup(ctx, req, callOpts) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%+v", err) } // balance the list of nodes @@ -675,7 +675,7 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + return nil, errors.InternalServerError("go.micro.client", "%+v", err) } // only sleep if greater than 0 @@ -690,7 +690,7 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c // TODO apply any filtering here routes, err = g.opts.Lookup(ctx, req, callOpts) if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + return nil, errors.InternalServerError("go.micro.client", "%+v", err) } // balance the list of nodes @@ -822,12 +822,12 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c // use codec for payload cf, err := g.newCodec(p.ContentType()) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%+v", err) } // set the body b, err := cf.Marshal(p.Payload()) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%+v", err) } body = b } diff --git a/grpc_pool.go b/grpc_pool.go index f602710..026e222 100644 --- a/grpc_pool.go +++ b/grpc_pool.go @@ -130,7 +130,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption } p.Unlock() - // create new conn) + // nolint (TODO need fix) create new conn) cc, err := grpc.DialContext(ctx, addr, opts...) if err != nil { return nil, err