diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 2dd65ec6..1613506e 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -8,6 +8,7 @@ import ( "fmt" "sync" "time" + "os" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" @@ -48,6 +49,18 @@ func (g *grpcClient) secure() grpc.DialOption { } func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { + service := request.Service() + + // get proxy + if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { + service = prx + } + + // get proxy address + if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 { + opts.Address = prx + } + // return remote address if len(opts.Address) > 0 { return func() (*registry.Node, error) { @@ -58,7 +71,7 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele } // get next nodes from the selector - next, err := g.opts.Selector.Select(request.Service(), opts.SelectOptions...) + next, err := g.opts.Selector.Select(service, opts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { @@ -164,7 +177,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client dialCtx, cancel = context.WithCancel(ctx) } defer cancel() - cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)), g.secure()) + cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure()) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -175,14 +188,21 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client ServerStreams: true, } - st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body()), grpc.CallContentSubtype(cf.String())) + st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body())) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } + rsp := &response{ + conn: cc, + stream: st, + codec: cf, + } + return &grpcStream{ context: ctx, request: req, + response: rsp, stream: st, conn: cc, }, nil diff --git a/client/grpc/request.go b/client/grpc/request.go index 28409c44..cbb8fc53 100644 --- a/client/grpc/request.go +++ b/client/grpc/request.go @@ -15,6 +15,7 @@ type grpcRequest struct { contentType string request interface{} opts client.RequestOptions + codec codec.Codec } func methodToGRPC(method string, request interface{}) string { @@ -80,7 +81,7 @@ func (g *grpcRequest) Endpoint() string { } func (g *grpcRequest) Codec() codec.Writer { - return nil + return g.codec } func (g *grpcRequest) Body() interface{} { diff --git a/client/grpc/response.go b/client/grpc/response.go new file mode 100644 index 00000000..23c452f7 --- /dev/null +++ b/client/grpc/response.go @@ -0,0 +1,37 @@ +package grpc + +import ( + "strings" + + "github.com/micro/go-micro/codec" + "google.golang.org/grpc" +) + +type response struct { + conn *grpc.ClientConn + stream grpc.ClientStream + codec grpc.Codec +} + +// Read the response +func (r *response) Codec() codec.Reader { + return nil +} + +// read the header +func (r *response) Header() map[string]string { + md, err := r.stream.Header() + if err != nil { + return map[string]string{} + } + hdr := make(map[string]string) + for k, v := range md { + hdr[k] = strings.Join(v, ",") + } + return hdr +} + +// Read the undecoded response +func (r *response) Read() ([]byte, error) { + return nil, nil +} diff --git a/client/grpc/stream.go b/client/grpc/stream.go index bc00dbf8..d9aae958 100644 --- a/client/grpc/stream.go +++ b/client/grpc/stream.go @@ -14,8 +14,9 @@ type grpcStream struct { sync.RWMutex err error conn *grpc.ClientConn - request client.Request stream grpc.ClientStream + request client.Request + response client.Response context context.Context } @@ -28,7 +29,7 @@ func (g *grpcStream) Request() client.Request { } func (g *grpcStream) Response() client.Response { - return nil + return g.response } func (g *grpcStream) Send(msg interface{}) error { diff --git a/client/rpc/rpc.go b/client/mucp/mucp.go similarity index 78% rename from client/rpc/rpc.go rename to client/mucp/mucp.go index 70371c05..67b43dbd 100644 --- a/client/rpc/rpc.go +++ b/client/mucp/mucp.go @@ -1,5 +1,5 @@ -// Package rpc provides an rpc client -package rpc +// Package mucp provides an mucp client +package mucp import ( "github.com/micro/go-micro/client" diff --git a/client/rpc_client.go b/client/rpc_client.go index 1b95e82d..d22e7e42 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -567,5 +567,5 @@ func (r *rpcClient) NewRequest(service, method string, request interface{}, reqO } func (r *rpcClient) String() string { - return "rpc" + return "mucp" } diff --git a/cmd/cmd.go b/cmd/cmd.go index 25057466..310aa431 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -11,7 +11,11 @@ import ( "github.com/micro/cli" "github.com/micro/go-micro/client" + cgrpc "github.com/micro/go-micro/client/grpc" + cmucp "github.com/micro/go-micro/client/mucp" "github.com/micro/go-micro/server" + smucp "github.com/micro/go-micro/server/mucp" + sgrpc "github.com/micro/go-micro/server/grpc" "github.com/micro/go-micro/util/log" // brokers @@ -177,6 +181,8 @@ var ( DefaultClients = map[string]func(...client.Option) client.Client{ "rpc": client.NewClient, + "mucp": cmucp.NewClient, + "grpc": cgrpc.NewClient, } DefaultRegistries = map[string]func(...registry.Option) registry.Registry{ @@ -195,6 +201,8 @@ var ( DefaultServers = map[string]func(...server.Option) server.Server{ "rpc": server.NewServer, + "mucp": smucp.NewServer, + "grpc": sgrpc.NewServer, } DefaultTransports = map[string]func(...transport.Option) transport.Transport{ diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 74a2745b..bde2e4c6 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -81,6 +81,14 @@ func newGRPCServer(opts ...server.Option) server.Server { return srv } +type grpcRouter struct { + h func(context.Context, server.Request, interface{}) error +} + +func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { + return r.h(ctx, req, rsp) +} + func (g *grpcServer) configure(opts ...server.Option) { // Don't reprocess where there's no config if len(opts) == 0 && g.srv != nil { @@ -167,19 +175,6 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { return status.New(codes.InvalidArgument, err.Error()).Err() } - g.rpc.mu.Lock() - service := g.rpc.serviceMap[serviceName] - g.rpc.mu.Unlock() - - if service == nil { - return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err() - } - - mtype := service.method[methodName] - if mtype == nil { - return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err() - } - // get grpc metadata gmd, ok := metadata.FromIncomingContext(stream.Context()) if !ok { @@ -214,6 +209,51 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { } } + // process via router + if g.opts.Router != nil { + // create a client.Request + request := &rpcRequest{ + service: g.opts.Name, + contentType: ct, + method: fmt.Sprintf("%s.%s", serviceName, methodName), + } + + response := &rpcResponse{} + + // create a wrapped function + handler := func(ctx context.Context, req server.Request, rsp interface{}) error { + return g.opts.Router.ServeRequest(ctx, req, rsp.(server.Response)) + } + + // execute the wrapper for it + for i := len(g.opts.HdlrWrappers); i > 0; i-- { + handler = g.opts.HdlrWrappers[i-1](handler) + } + + r := grpcRouter{handler} + + // serve the actual request using the request router + if err := r.ServeRequest(ctx, request, response); err != nil { + return status.Errorf(codes.Internal, err.Error()) + } + + return nil + } + + // process the standard request flow + g.rpc.mu.Lock() + service := g.rpc.serviceMap[serviceName] + g.rpc.mu.Unlock() + + if service == nil { + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err() + } + + mtype := service.method[methodName] + if mtype == nil { + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err() + } + // process unary if !mtype.stream { return g.processRequest(stream, service, mtype, ct, ctx) diff --git a/server/grpc/response.go b/server/grpc/response.go new file mode 100644 index 00000000..451b1f4e --- /dev/null +++ b/server/grpc/response.go @@ -0,0 +1,35 @@ +package grpc + +import ( + "net/http" + + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/transport" +) + +type rpcResponse struct { + header map[string]string + socket transport.Socket + codec codec.Codec +} + +func (r *rpcResponse) Codec() codec.Writer { + return r.codec +} + +func (r *rpcResponse) WriteHeader(hdr map[string]string) { + for k, v := range hdr { + r.header[k] = v + } +} + +func (r *rpcResponse) Write(b []byte) error { + if _, ok := r.header["Content-Type"]; !ok { + r.header["Content-Type"] = http.DetectContentType(b) + } + + return r.socket.Send(&transport.Message{ + Header: r.header, + Body: b, + }) +} diff --git a/server/rpc/rpc.go b/server/mucp/mucp.go similarity index 77% rename from server/rpc/rpc.go rename to server/mucp/mucp.go index 38278ffb..c4133dc0 100644 --- a/server/rpc/rpc.go +++ b/server/mucp/mucp.go @@ -1,5 +1,5 @@ -// Package rpc provides an rpc server -package rpc +// Package mucp provides an mucp server +package mucp import ( "github.com/micro/go-micro/server" diff --git a/server/rpc_server.go b/server/rpc_server.go index 22fca2d9..b1db38be 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -611,5 +611,5 @@ func (s *rpcServer) Stop() error { } func (s *rpcServer) String() string { - return "rpc" + return "mucp" } diff --git a/service/mucp/mucp.go b/service/mucp/mucp.go new file mode 100644 index 00000000..0275b846 --- /dev/null +++ b/service/mucp/mucp.go @@ -0,0 +1,21 @@ +// Package mucp initialises a mucp service +package mucp + +import ( + // TODO: change to go-micro/service + "github.com/micro/go-micro" + "github.com/micro/go-micro/client/mucp" + "github.com/micro/go-micro/server/mucp" +) + +// NewService returns a new mucp service +func NewService(opts ...micro.Option) micro.Service { + options := []micro.Option{ + micro.Client(mucp.NewClient()), + micro.Server(mucp.NewServer()), + } + + options = append(options, opts...) + + return micro.NewService(opts...) +}