From ab7312706317f368c407ca70502cea7b548cc2a3 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 17 Mar 2020 14:27:20 +0300 Subject: [PATCH] grpc client/server fixes (#1355) * grpc client/server fixes Signed-off-by: Vasiliy Tolstov --- client/grpc/error.go | 27 +++++++++++++++------------ client/grpc/grpc.go | 2 +- errors/errors.go | 19 +++++++++++++++++++ errors/errors_test.go | 16 ++++++++++++++++ server/grpc/grpc.go | 19 +++++-------------- server/grpc/util.go | 11 ----------- store/service/service.go | 10 +++++++--- 7 files changed, 63 insertions(+), 41 deletions(-) diff --git a/client/grpc/error.go b/client/grpc/error.go index b625d1d8..1cd36842 100644 --- a/client/grpc/error.go +++ b/client/grpc/error.go @@ -17,18 +17,21 @@ func microError(err error) error { } // grpc error - if s, ok := status.FromError(err); ok { - details := s.Details() - if len(details) == 0 { - if e := errors.Parse(s.Message()); e.Code > 0 { - return e // actually a micro error - } - return errors.InternalServerError("go.micro.client", s.Message()) - } - // return first error from details - return details[0].(error) + s, ok := status.FromError(err) + if !ok { + return err } - // do nothing - return err + // return first error from details + if details := s.Details(); len(details) > 0 { + return microError(details[0].(error)) + } + + // try to decode micro *errors.Error + if e := errors.Parse(s.Message()); e.Code > 0 { + return e // actually a micro error + } + + // fallback + return errors.InternalServerError("go.micro.client", s.Message()) } diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 1f92b7c4..432ef539 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -465,11 +465,11 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface // make the call err = gcall(ctx, node, req, rsp, callOpts) + g.opts.Selector.Mark(service, node, err) if verr, ok := err.(*errors.Error); ok { return verr } - g.opts.Selector.Mark(service, node, err) return err } diff --git a/errors/errors.go b/errors/errors.go index df6e6c9f..6c6598aa 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -115,3 +115,22 @@ func InternalServerError(id, format string, a ...interface{}) error { Status: http.StatusText(500), } } + +func Equal(err1 error, err2 error) bool { + verr1, ok1 := err1.(*Error) + verr2, ok2 := err2.(*Error) + + if ok1 != ok2 { + return false + } + + if !ok1 { + return err1 == err2 + } + + if verr1.Code != verr2.Code { + return false + } + + return true +} diff --git a/errors/errors_test.go b/errors/errors_test.go index 757d275d..662dd8d0 100644 --- a/errors/errors_test.go +++ b/errors/errors_test.go @@ -1,10 +1,26 @@ package errors import ( + er "errors" "net/http" "testing" ) +func TestEqual(t *testing.T) { + err1 := NotFound("myid1", "msg1") + err2 := NotFound("myid2", "msg2") + + if !Equal(err1, err2) { + t.Fatal("errors must be equal") + } + + err3 := er.New("my test err") + if Equal(err1, err3) { + t.Fatal("errors must be not equal") + } + +} + func TestErrors(t *testing.T) { testData := []*Error{ { diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 4015780d..af71aa5e 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -279,6 +279,9 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { // serve the actual request using the request router if err := r.ServeRequest(ctx, request, response); err != nil { + if _, ok := status.FromError(err); ok { + return err + } return status.Errorf(codes.Internal, err.Error()) } @@ -379,7 +382,6 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, for i := len(g.opts.HdlrWrappers); i > 0; i-- { fn = g.opts.HdlrWrappers[i-1](fn) } - statusCode := codes.OK statusDesc := "" // execute the handler @@ -402,24 +404,19 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, if err != nil { return err } - case *rpcError: - // rpcError handling may be we have ability to attach it to details? - statusCode = verr.code - statusDesc = verr.desc - errStatus = status.New(statusCode, statusDesc) default: // default case user pass own error type that not proto based statusCode = convertCode(verr) statusDesc = verr.Error() errStatus = status.New(statusCode, statusDesc) } + return errStatus.Err() } if err := stream.SendMsg(replyv.Interface()); err != nil { return err } - return status.New(statusCode, statusDesc).Err() } } @@ -459,8 +456,7 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m statusCode := codes.OK statusDesc := "" - appErr := fn(ctx, r, ss) - if appErr != nil { + if appErr := fn(ctx, r, ss); appErr != nil { var err error var errStatus *status.Status switch verr := appErr.(type) { @@ -480,11 +476,6 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m if err != nil { return err } - case *rpcError: - // rpcError handling may be we have ability to attach it to details? - statusCode = verr.code - statusDesc = verr.desc - errStatus = status.New(statusCode, statusDesc) default: // default case user pass own error type that not proto based statusCode = convertCode(verr) diff --git a/server/grpc/util.go b/server/grpc/util.go index 05835488..dfb467ab 100644 --- a/server/grpc/util.go +++ b/server/grpc/util.go @@ -2,7 +2,6 @@ package grpc import ( "context" - "fmt" "io" "os" "sync" @@ -10,16 +9,6 @@ import ( "google.golang.org/grpc/codes" ) -// rpcError defines the status from an RPC. -type rpcError struct { - code codes.Code - desc string -} - -func (e *rpcError) Error() string { - return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc) -} - // convertCode converts a standard Go error into its canonical code. Note that // this is only used to translate the error returned by the server applications. func convertCode(err error) codes.Code { diff --git a/store/service/service.go b/store/service/service.go index 90443744..b15340ff 100644 --- a/store/service/service.go +++ b/store/service/service.go @@ -62,7 +62,7 @@ func (s *serviceStore) Context() context.Context { // Sync all the known records func (s *serviceStore) List(opts ...store.ListOption) ([]string, error) { stream, err := s.Client.List(s.Context(), &pb.ListRequest{}, client.WithAddress(s.Nodes...)) - if verr, ok := err.(*errors.Error); ok && verr.Code == 404 { + if err != nil && errors.Equal(err, errors.NotFound("", "")) { return nil, store.ErrNotFound } else if err != nil { return nil, err @@ -101,7 +101,7 @@ func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Reco Prefix: options.Prefix, }, }, client.WithAddress(s.Nodes...)) - if verr, ok := err.(*errors.Error); ok && verr.Code == 404 { + if err != nil && errors.Equal(err, errors.NotFound("", "")) { return nil, store.ErrNotFound } else if err != nil { return nil, err @@ -129,6 +129,9 @@ func (s *serviceStore) Write(record *store.Record, opts ...store.WriteOption) er Expiry: int64(record.Expiry.Seconds()), }, }, client.WithAddress(s.Nodes...)) + if err != nil && errors.Equal(err, errors.NotFound("", "")) { + return store.ErrNotFound + } return err } @@ -138,9 +141,10 @@ func (s *serviceStore) Delete(key string, opts ...store.DeleteOption) error { _, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{ Key: key, }, client.WithAddress(s.Nodes...)) - if verr, ok := err.(*errors.Error); ok && verr.Code == 404 { + if err != nil && errors.Equal(err, errors.NotFound("", "")) { return store.ErrNotFound } + return err }