check in this cruft
This commit is contained in:
parent
95b8147fa1
commit
ed4bce3285
@ -8,6 +8,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"os"
|
||||||
|
|
||||||
"github.com/micro/go-micro/broker"
|
"github.com/micro/go-micro/broker"
|
||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
||||||
@ -48,6 +49,18 @@ func (g *grpcClient) secure() grpc.DialOption {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
|
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
|
||||||
|
service := request.Service()
|
||||||
|
|
||||||
|
// get proxy
|
||||||
|
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
|
||||||
|
service = prx
|
||||||
|
}
|
||||||
|
|
||||||
|
// get proxy address
|
||||||
|
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
|
||||||
|
opts.Address = prx
|
||||||
|
}
|
||||||
|
|
||||||
// return remote address
|
// return remote address
|
||||||
if len(opts.Address) > 0 {
|
if len(opts.Address) > 0 {
|
||||||
return func() (*registry.Node, error) {
|
return func() (*registry.Node, error) {
|
||||||
@ -58,7 +71,7 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele
|
|||||||
}
|
}
|
||||||
|
|
||||||
// get next nodes from the selector
|
// get next nodes from the selector
|
||||||
next, err := g.opts.Selector.Select(request.Service(), opts.SelectOptions...)
|
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
|
||||||
if err != nil && err == selector.ErrNotFound {
|
if err != nil && err == selector.ErrNotFound {
|
||||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
return nil, errors.NotFound("go.micro.client", err.Error())
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -164,7 +177,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
|||||||
dialCtx, cancel = context.WithCancel(ctx)
|
dialCtx, cancel = context.WithCancel(ctx)
|
||||||
}
|
}
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)), g.secure())
|
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||||
}
|
}
|
||||||
@ -175,14 +188,21 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
|||||||
ServerStreams: true,
|
ServerStreams: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body()), grpc.CallContentSubtype(cf.String()))
|
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
|
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rsp := &response{
|
||||||
|
conn: cc,
|
||||||
|
stream: st,
|
||||||
|
codec: cf,
|
||||||
|
}
|
||||||
|
|
||||||
return &grpcStream{
|
return &grpcStream{
|
||||||
context: ctx,
|
context: ctx,
|
||||||
request: req,
|
request: req,
|
||||||
|
response: rsp,
|
||||||
stream: st,
|
stream: st,
|
||||||
conn: cc,
|
conn: cc,
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -15,6 +15,7 @@ type grpcRequest struct {
|
|||||||
contentType string
|
contentType string
|
||||||
request interface{}
|
request interface{}
|
||||||
opts client.RequestOptions
|
opts client.RequestOptions
|
||||||
|
codec codec.Codec
|
||||||
}
|
}
|
||||||
|
|
||||||
func methodToGRPC(method string, request interface{}) string {
|
func methodToGRPC(method string, request interface{}) string {
|
||||||
@ -80,7 +81,7 @@ func (g *grpcRequest) Endpoint() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcRequest) Codec() codec.Writer {
|
func (g *grpcRequest) Codec() codec.Writer {
|
||||||
return nil
|
return g.codec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcRequest) Body() interface{} {
|
func (g *grpcRequest) Body() interface{} {
|
||||||
|
37
client/grpc/response.go
Normal file
37
client/grpc/response.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/codec"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type response struct {
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
stream grpc.ClientStream
|
||||||
|
codec grpc.Codec
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the response
|
||||||
|
func (r *response) Codec() codec.Reader {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// read the header
|
||||||
|
func (r *response) Header() map[string]string {
|
||||||
|
md, err := r.stream.Header()
|
||||||
|
if err != nil {
|
||||||
|
return map[string]string{}
|
||||||
|
}
|
||||||
|
hdr := make(map[string]string)
|
||||||
|
for k, v := range md {
|
||||||
|
hdr[k] = strings.Join(v, ",")
|
||||||
|
}
|
||||||
|
return hdr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the undecoded response
|
||||||
|
func (r *response) Read() ([]byte, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
@ -14,8 +14,9 @@ type grpcStream struct {
|
|||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
err error
|
err error
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
request client.Request
|
|
||||||
stream grpc.ClientStream
|
stream grpc.ClientStream
|
||||||
|
request client.Request
|
||||||
|
response client.Response
|
||||||
context context.Context
|
context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,7 +29,7 @@ func (g *grpcStream) Request() client.Request {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Response() client.Response {
|
func (g *grpcStream) Response() client.Response {
|
||||||
return nil
|
return g.response
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Send(msg interface{}) error {
|
func (g *grpcStream) Send(msg interface{}) error {
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
// Package rpc provides an rpc client
|
// Package mucp provides an mucp client
|
||||||
package rpc
|
package mucp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
@ -567,5 +567,5 @@ func (r *rpcClient) NewRequest(service, method string, request interface{}, reqO
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcClient) String() string {
|
func (r *rpcClient) String() string {
|
||||||
return "rpc"
|
return "mucp"
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,11 @@ import (
|
|||||||
|
|
||||||
"github.com/micro/cli"
|
"github.com/micro/cli"
|
||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
||||||
|
cgrpc "github.com/micro/go-micro/client/grpc"
|
||||||
|
cmucp "github.com/micro/go-micro/client/mucp"
|
||||||
"github.com/micro/go-micro/server"
|
"github.com/micro/go-micro/server"
|
||||||
|
smucp "github.com/micro/go-micro/server/mucp"
|
||||||
|
sgrpc "github.com/micro/go-micro/server/grpc"
|
||||||
"github.com/micro/go-micro/util/log"
|
"github.com/micro/go-micro/util/log"
|
||||||
|
|
||||||
// brokers
|
// brokers
|
||||||
@ -177,6 +181,8 @@ var (
|
|||||||
|
|
||||||
DefaultClients = map[string]func(...client.Option) client.Client{
|
DefaultClients = map[string]func(...client.Option) client.Client{
|
||||||
"rpc": client.NewClient,
|
"rpc": client.NewClient,
|
||||||
|
"mucp": cmucp.NewClient,
|
||||||
|
"grpc": cgrpc.NewClient,
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
|
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
|
||||||
@ -195,6 +201,8 @@ var (
|
|||||||
|
|
||||||
DefaultServers = map[string]func(...server.Option) server.Server{
|
DefaultServers = map[string]func(...server.Option) server.Server{
|
||||||
"rpc": server.NewServer,
|
"rpc": server.NewServer,
|
||||||
|
"mucp": smucp.NewServer,
|
||||||
|
"grpc": sgrpc.NewServer,
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultTransports = map[string]func(...transport.Option) transport.Transport{
|
DefaultTransports = map[string]func(...transport.Option) transport.Transport{
|
||||||
|
@ -81,6 +81,14 @@ func newGRPCServer(opts ...server.Option) server.Server {
|
|||||||
return srv
|
return srv
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type grpcRouter struct {
|
||||||
|
h func(context.Context, server.Request, interface{}) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
|
||||||
|
return r.h(ctx, req, rsp)
|
||||||
|
}
|
||||||
|
|
||||||
func (g *grpcServer) configure(opts ...server.Option) {
|
func (g *grpcServer) configure(opts ...server.Option) {
|
||||||
// Don't reprocess where there's no config
|
// Don't reprocess where there's no config
|
||||||
if len(opts) == 0 && g.srv != nil {
|
if len(opts) == 0 && g.srv != nil {
|
||||||
@ -167,19 +175,6 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
|
|||||||
return status.New(codes.InvalidArgument, err.Error()).Err()
|
return status.New(codes.InvalidArgument, err.Error()).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
g.rpc.mu.Lock()
|
|
||||||
service := g.rpc.serviceMap[serviceName]
|
|
||||||
g.rpc.mu.Unlock()
|
|
||||||
|
|
||||||
if service == nil {
|
|
||||||
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
mtype := service.method[methodName]
|
|
||||||
if mtype == nil {
|
|
||||||
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
// get grpc metadata
|
// get grpc metadata
|
||||||
gmd, ok := metadata.FromIncomingContext(stream.Context())
|
gmd, ok := metadata.FromIncomingContext(stream.Context())
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -214,6 +209,51 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// process via router
|
||||||
|
if g.opts.Router != nil {
|
||||||
|
// create a client.Request
|
||||||
|
request := &rpcRequest{
|
||||||
|
service: g.opts.Name,
|
||||||
|
contentType: ct,
|
||||||
|
method: fmt.Sprintf("%s.%s", serviceName, methodName),
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &rpcResponse{}
|
||||||
|
|
||||||
|
// create a wrapped function
|
||||||
|
handler := func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||||
|
return g.opts.Router.ServeRequest(ctx, req, rsp.(server.Response))
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute the wrapper for it
|
||||||
|
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
|
||||||
|
handler = g.opts.HdlrWrappers[i-1](handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := grpcRouter{handler}
|
||||||
|
|
||||||
|
// serve the actual request using the request router
|
||||||
|
if err := r.ServeRequest(ctx, request, response); err != nil {
|
||||||
|
return status.Errorf(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// process the standard request flow
|
||||||
|
g.rpc.mu.Lock()
|
||||||
|
service := g.rpc.serviceMap[serviceName]
|
||||||
|
g.rpc.mu.Unlock()
|
||||||
|
|
||||||
|
if service == nil {
|
||||||
|
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
mtype := service.method[methodName]
|
||||||
|
if mtype == nil {
|
||||||
|
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
|
||||||
|
}
|
||||||
|
|
||||||
// process unary
|
// process unary
|
||||||
if !mtype.stream {
|
if !mtype.stream {
|
||||||
return g.processRequest(stream, service, mtype, ct, ctx)
|
return g.processRequest(stream, service, mtype, ct, ctx)
|
||||||
|
35
server/grpc/response.go
Normal file
35
server/grpc/response.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/codec"
|
||||||
|
"github.com/micro/go-micro/transport"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rpcResponse struct {
|
||||||
|
header map[string]string
|
||||||
|
socket transport.Socket
|
||||||
|
codec codec.Codec
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcResponse) Codec() codec.Writer {
|
||||||
|
return r.codec
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcResponse) WriteHeader(hdr map[string]string) {
|
||||||
|
for k, v := range hdr {
|
||||||
|
r.header[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcResponse) Write(b []byte) error {
|
||||||
|
if _, ok := r.header["Content-Type"]; !ok {
|
||||||
|
r.header["Content-Type"] = http.DetectContentType(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.socket.Send(&transport.Message{
|
||||||
|
Header: r.header,
|
||||||
|
Body: b,
|
||||||
|
})
|
||||||
|
}
|
@ -1,5 +1,5 @@
|
|||||||
// Package rpc provides an rpc server
|
// Package mucp provides an mucp server
|
||||||
package rpc
|
package mucp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/micro/go-micro/server"
|
"github.com/micro/go-micro/server"
|
@ -611,5 +611,5 @@ func (s *rpcServer) Stop() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcServer) String() string {
|
func (s *rpcServer) String() string {
|
||||||
return "rpc"
|
return "mucp"
|
||||||
}
|
}
|
||||||
|
21
service/mucp/mucp.go
Normal file
21
service/mucp/mucp.go
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
// Package mucp initialises a mucp service
|
||||||
|
package mucp
|
||||||
|
|
||||||
|
import (
|
||||||
|
// TODO: change to go-micro/service
|
||||||
|
"github.com/micro/go-micro"
|
||||||
|
"github.com/micro/go-micro/client/mucp"
|
||||||
|
"github.com/micro/go-micro/server/mucp"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewService returns a new mucp service
|
||||||
|
func NewService(opts ...micro.Option) micro.Service {
|
||||||
|
options := []micro.Option{
|
||||||
|
micro.Client(mucp.NewClient()),
|
||||||
|
micro.Server(mucp.NewServer()),
|
||||||
|
}
|
||||||
|
|
||||||
|
options = append(options, opts...)
|
||||||
|
|
||||||
|
return micro.NewService(opts...)
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user