grpc client/server fixes (#1355)
* grpc client/server fixes Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
03031a694d
commit
ab73127063
@ -17,18 +17,21 @@ func microError(err error) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// grpc error
|
// grpc error
|
||||||
if s, ok := status.FromError(err); ok {
|
s, ok := status.FromError(err)
|
||||||
details := s.Details()
|
if !ok {
|
||||||
if len(details) == 0 {
|
return err
|
||||||
if e := errors.Parse(s.Message()); e.Code > 0 {
|
|
||||||
return e // actually a micro error
|
|
||||||
}
|
|
||||||
return errors.InternalServerError("go.micro.client", s.Message())
|
|
||||||
}
|
|
||||||
// return first error from details
|
|
||||||
return details[0].(error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do nothing
|
// return first error from details
|
||||||
return err
|
if details := s.Details(); len(details) > 0 {
|
||||||
|
return microError(details[0].(error))
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to decode micro *errors.Error
|
||||||
|
if e := errors.Parse(s.Message()); e.Code > 0 {
|
||||||
|
return e // actually a micro error
|
||||||
|
}
|
||||||
|
|
||||||
|
// fallback
|
||||||
|
return errors.InternalServerError("go.micro.client", s.Message())
|
||||||
}
|
}
|
||||||
|
@ -465,11 +465,11 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
|
|
||||||
// make the call
|
// make the call
|
||||||
err = gcall(ctx, node, req, rsp, callOpts)
|
err = gcall(ctx, node, req, rsp, callOpts)
|
||||||
|
g.opts.Selector.Mark(service, node, err)
|
||||||
if verr, ok := err.(*errors.Error); ok {
|
if verr, ok := err.(*errors.Error); ok {
|
||||||
return verr
|
return verr
|
||||||
}
|
}
|
||||||
|
|
||||||
g.opts.Selector.Mark(service, node, err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,3 +115,22 @@ func InternalServerError(id, format string, a ...interface{}) error {
|
|||||||
Status: http.StatusText(500),
|
Status: http.StatusText(500),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Equal(err1 error, err2 error) bool {
|
||||||
|
verr1, ok1 := err1.(*Error)
|
||||||
|
verr2, ok2 := err2.(*Error)
|
||||||
|
|
||||||
|
if ok1 != ok2 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok1 {
|
||||||
|
return err1 == err2
|
||||||
|
}
|
||||||
|
|
||||||
|
if verr1.Code != verr2.Code {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
@ -1,10 +1,26 @@
|
|||||||
package errors
|
package errors
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
er "errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestEqual(t *testing.T) {
|
||||||
|
err1 := NotFound("myid1", "msg1")
|
||||||
|
err2 := NotFound("myid2", "msg2")
|
||||||
|
|
||||||
|
if !Equal(err1, err2) {
|
||||||
|
t.Fatal("errors must be equal")
|
||||||
|
}
|
||||||
|
|
||||||
|
err3 := er.New("my test err")
|
||||||
|
if Equal(err1, err3) {
|
||||||
|
t.Fatal("errors must be not equal")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func TestErrors(t *testing.T) {
|
func TestErrors(t *testing.T) {
|
||||||
testData := []*Error{
|
testData := []*Error{
|
||||||
{
|
{
|
||||||
|
@ -279,6 +279,9 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
|
|||||||
|
|
||||||
// serve the actual request using the request router
|
// serve the actual request using the request router
|
||||||
if err := r.ServeRequest(ctx, request, response); err != nil {
|
if err := r.ServeRequest(ctx, request, response); err != nil {
|
||||||
|
if _, ok := status.FromError(err); ok {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return status.Errorf(codes.Internal, err.Error())
|
return status.Errorf(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -379,7 +382,6 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
|
|||||||
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
|
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
|
||||||
fn = g.opts.HdlrWrappers[i-1](fn)
|
fn = g.opts.HdlrWrappers[i-1](fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
statusCode := codes.OK
|
statusCode := codes.OK
|
||||||
statusDesc := ""
|
statusDesc := ""
|
||||||
// execute the handler
|
// execute the handler
|
||||||
@ -402,24 +404,19 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case *rpcError:
|
|
||||||
// rpcError handling may be we have ability to attach it to details?
|
|
||||||
statusCode = verr.code
|
|
||||||
statusDesc = verr.desc
|
|
||||||
errStatus = status.New(statusCode, statusDesc)
|
|
||||||
default:
|
default:
|
||||||
// default case user pass own error type that not proto based
|
// default case user pass own error type that not proto based
|
||||||
statusCode = convertCode(verr)
|
statusCode = convertCode(verr)
|
||||||
statusDesc = verr.Error()
|
statusDesc = verr.Error()
|
||||||
errStatus = status.New(statusCode, statusDesc)
|
errStatus = status.New(statusCode, statusDesc)
|
||||||
}
|
}
|
||||||
|
|
||||||
return errStatus.Err()
|
return errStatus.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := stream.SendMsg(replyv.Interface()); err != nil {
|
if err := stream.SendMsg(replyv.Interface()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return status.New(statusCode, statusDesc).Err()
|
return status.New(statusCode, statusDesc).Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -459,8 +456,7 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m
|
|||||||
statusCode := codes.OK
|
statusCode := codes.OK
|
||||||
statusDesc := ""
|
statusDesc := ""
|
||||||
|
|
||||||
appErr := fn(ctx, r, ss)
|
if appErr := fn(ctx, r, ss); appErr != nil {
|
||||||
if appErr != nil {
|
|
||||||
var err error
|
var err error
|
||||||
var errStatus *status.Status
|
var errStatus *status.Status
|
||||||
switch verr := appErr.(type) {
|
switch verr := appErr.(type) {
|
||||||
@ -480,11 +476,6 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case *rpcError:
|
|
||||||
// rpcError handling may be we have ability to attach it to details?
|
|
||||||
statusCode = verr.code
|
|
||||||
statusDesc = verr.desc
|
|
||||||
errStatus = status.New(statusCode, statusDesc)
|
|
||||||
default:
|
default:
|
||||||
// default case user pass own error type that not proto based
|
// default case user pass own error type that not proto based
|
||||||
statusCode = convertCode(verr)
|
statusCode = convertCode(verr)
|
||||||
|
@ -2,7 +2,6 @@ package grpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
@ -10,16 +9,6 @@ import (
|
|||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
)
|
)
|
||||||
|
|
||||||
// rpcError defines the status from an RPC.
|
|
||||||
type rpcError struct {
|
|
||||||
code codes.Code
|
|
||||||
desc string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *rpcError) Error() string {
|
|
||||||
return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
|
|
||||||
}
|
|
||||||
|
|
||||||
// convertCode converts a standard Go error into its canonical code. Note that
|
// convertCode converts a standard Go error into its canonical code. Note that
|
||||||
// this is only used to translate the error returned by the server applications.
|
// this is only used to translate the error returned by the server applications.
|
||||||
func convertCode(err error) codes.Code {
|
func convertCode(err error) codes.Code {
|
||||||
|
@ -62,7 +62,7 @@ func (s *serviceStore) Context() context.Context {
|
|||||||
// Sync all the known records
|
// Sync all the known records
|
||||||
func (s *serviceStore) List(opts ...store.ListOption) ([]string, error) {
|
func (s *serviceStore) List(opts ...store.ListOption) ([]string, error) {
|
||||||
stream, err := s.Client.List(s.Context(), &pb.ListRequest{}, client.WithAddress(s.Nodes...))
|
stream, err := s.Client.List(s.Context(), &pb.ListRequest{}, client.WithAddress(s.Nodes...))
|
||||||
if verr, ok := err.(*errors.Error); ok && verr.Code == 404 {
|
if err != nil && errors.Equal(err, errors.NotFound("", "")) {
|
||||||
return nil, store.ErrNotFound
|
return nil, store.ErrNotFound
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -101,7 +101,7 @@ func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Reco
|
|||||||
Prefix: options.Prefix,
|
Prefix: options.Prefix,
|
||||||
},
|
},
|
||||||
}, client.WithAddress(s.Nodes...))
|
}, client.WithAddress(s.Nodes...))
|
||||||
if verr, ok := err.(*errors.Error); ok && verr.Code == 404 {
|
if err != nil && errors.Equal(err, errors.NotFound("", "")) {
|
||||||
return nil, store.ErrNotFound
|
return nil, store.ErrNotFound
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -129,6 +129,9 @@ func (s *serviceStore) Write(record *store.Record, opts ...store.WriteOption) er
|
|||||||
Expiry: int64(record.Expiry.Seconds()),
|
Expiry: int64(record.Expiry.Seconds()),
|
||||||
},
|
},
|
||||||
}, client.WithAddress(s.Nodes...))
|
}, client.WithAddress(s.Nodes...))
|
||||||
|
if err != nil && errors.Equal(err, errors.NotFound("", "")) {
|
||||||
|
return store.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -138,9 +141,10 @@ func (s *serviceStore) Delete(key string, opts ...store.DeleteOption) error {
|
|||||||
_, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{
|
_, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{
|
||||||
Key: key,
|
Key: key,
|
||||||
}, client.WithAddress(s.Nodes...))
|
}, client.WithAddress(s.Nodes...))
|
||||||
if verr, ok := err.(*errors.Error); ok && verr.Code == 404 {
|
if err != nil && errors.Equal(err, errors.NotFound("", "")) {
|
||||||
return store.ErrNotFound
|
return store.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user