Merge pull request 'update hooks calling and fix errors create' (#137) from devstigneev/micro-client-grpc:v3 into v3

Reviewed-on: #137
This commit is contained in:
Василий Толстов 2024-12-19 23:49:49 +03:00
commit 53c94249bd
3 changed files with 79 additions and 13 deletions

66
codec_test.go Normal file
View File

@ -0,0 +1,66 @@
package grpc
import (
"context"
"go.unistack.org/micro/v3/codec"
gmetadata "google.golang.org/grpc/metadata"
"testing"
)
type mockStream struct {
msg any
}
func (m mockStream) Header() (gmetadata.MD, error) {
return nil, nil
}
func (m mockStream) Trailer() gmetadata.MD {
return nil
}
func (m mockStream) CloseSend() error {
return nil
}
func (m mockStream) Context() context.Context {
return nil
}
func (m *mockStream) SendMsg(msg any) error {
m.msg = msg
return nil
}
func (m *mockStream) RecvMsg(msg any) error {
c := msg.(*codec.Frame)
c.Data = m.msg.(*codec.Frame).Data
return nil
}
func Test_ReadWrap(t *testing.T) {
wp := wrapStream{
&mockStream{},
}
write, err := wp.Write([]byte("test_data"))
if err != nil {
t.Fatal(err)
}
if write != 9 {
t.Error("uncorrected number wrote bytes")
}
b := make([]byte, write)
read, err := wp.Read(b)
if err != nil {
t.Fatal(err)
}
if read != 9 || string(b) != "test_data" {
t.Error("uncorrected number wrote bytes or data")
}
}

24
grpc.go
View File

@ -103,7 +103,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
cf, err := g.newCodec(req.ContentType()) cf, err := g.newCodec(req.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%+v", err)
} }
maxRecvMsgSize := g.maxRecvMsgSizeValue() maxRecvMsgSize := g.maxRecvMsgSizeValue()
@ -147,7 +147,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", "Error sending request: %v", err)
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
@ -217,7 +217,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
cf, err := g.newCodec(req.ContentType()) cf, err := g.newCodec(req.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%+v", err)
} }
var dialCtx context.Context var dialCtx context.Context
@ -258,7 +258,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", "Error sending request: %v", err)
} }
desc := &grpc.StreamDesc{ desc := &grpc.StreamDesc{
@ -291,7 +291,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// release the connection // release the connection
g.pool.Put(cc, err) g.pool.Put(cc, err)
// now return the error // now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return errors.InternalServerError("go.micro.client", "Error creating stream: %v", err)
} }
// set request codec // set request codec
@ -421,7 +421,7 @@ func (g *grpcClient) Init(opts ...client.Option) error {
g.funcPublish = g.fnPublish g.funcPublish = g.fnPublish
g.funcBatchPublish = g.fnBatchPublish g.funcBatchPublish = g.fnBatchPublish
g.opts.Hooks.EachNext(func(hook options.Hook) { g.opts.Hooks.EachPrev(func(hook options.Hook) {
switch h := hook.(type) { switch h := hook.(type) {
case client.HookCall: case client.HookCall:
g.funcCall = h(g.funcCall) g.funcCall = h(g.funcCall)
@ -533,7 +533,7 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i) t, err := callOpts.Backoff(ctx, req, i)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%+v", err)
} }
// only sleep if greater than 0 // only sleep if greater than 0
@ -548,7 +548,7 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
// TODO apply any filtering here // TODO apply any filtering here
routes, err = g.opts.Lookup(ctx, req, callOpts) routes, err = g.opts.Lookup(ctx, req, callOpts)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%+v", err)
} }
// balance the list of nodes // balance the list of nodes
@ -675,7 +675,7 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i) t, err := callOpts.Backoff(ctx, req, i)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", "%+v", err)
} }
// only sleep if greater than 0 // only sleep if greater than 0
@ -690,7 +690,7 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
// TODO apply any filtering here // TODO apply any filtering here
routes, err = g.opts.Lookup(ctx, req, callOpts) routes, err = g.opts.Lookup(ctx, req, callOpts)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", "%+v", err)
} }
// balance the list of nodes // balance the list of nodes
@ -822,12 +822,12 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
// use codec for payload // use codec for payload
cf, err := g.newCodec(p.ContentType()) cf, err := g.newCodec(p.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%+v", err)
} }
// set the body // set the body
b, err := cf.Marshal(p.Payload()) b, err := cf.Marshal(p.Payload())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%+v", err)
} }
body = b body = b
} }

View File

@ -130,7 +130,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
} }
p.Unlock() p.Unlock()
// create new conn) // nolint (TODO need fix) create new conn)
cc, err := grpc.DialContext(ctx, addr, opts...) cc, err := grpc.DialContext(ctx, addr, opts...)
if err != nil { if err != nil {
return nil, err return nil, err