Merge pull request #513 from micro/crufting

Crufting
This commit is contained in:
Asim Aslam 2019-06-12 13:03:17 +01:00 committed by GitHub
commit 2b18b11ab1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 208 additions and 44 deletions

View File

@ -6,6 +6,7 @@ import (
"context"
"crypto/tls"
"fmt"
"os"
"sync"
"time"
@ -48,6 +49,18 @@ func (g *grpcClient) secure() grpc.DialOption {
}
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
service := request.Service()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
service = prx
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = prx
}
// return remote address
if len(opts.Address) > 0 {
return func() (*registry.Node, error) {
@ -58,7 +71,7 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele
}
// get next nodes from the selector
next, err := g.opts.Selector.Select(request.Service(), opts.SelectOptions...)
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
@ -99,7 +112,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
var grr error
cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)),
cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)),
grpc.WithTimeout(opts.DialTimeout), g.secure(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
@ -116,7 +129,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
ch := make(chan error, 1)
go func() {
err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.CallContentSubtype(cf.String()))
err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.ForceCodec(cf))
ch <- microError(err)
}()
@ -164,7 +177,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
dialCtx, cancel = context.WithCancel(ctx)
}
defer cancel()
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)), g.secure())
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
@ -175,16 +188,23 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
ServerStreams: true,
}
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body()), grpc.CallContentSubtype(cf.String()))
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body()))
if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
}
rsp := &response{
conn: cc,
stream: st,
codec: cf,
}
return &grpcStream{
context: ctx,
request: req,
stream: st,
conn: cc,
context: ctx,
request: req,
response: rsp,
stream: st,
conn: cc,
}, nil
}
@ -210,7 +230,7 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
return v.(int)
}
func (g *grpcClient) newGRPCCodec(contentType string) (grpc.Codec, error) {
func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) {
codecs := make(map[string]encoding.Codec)
if g.opts.Context != nil {
if v := g.opts.Context.Value(codecsKey{}); v != nil {

View File

@ -15,6 +15,7 @@ type grpcRequest struct {
contentType string
request interface{}
opts client.RequestOptions
codec codec.Codec
}
func methodToGRPC(method string, request interface{}) string {
@ -80,7 +81,7 @@ func (g *grpcRequest) Endpoint() string {
}
func (g *grpcRequest) Codec() codec.Writer {
return nil
return g.codec
}
func (g *grpcRequest) Body() interface{} {

38
client/grpc/response.go Normal file
View File

@ -0,0 +1,38 @@
package grpc
import (
"strings"
"github.com/micro/go-micro/codec"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
)
type response struct {
conn *grpc.ClientConn
stream grpc.ClientStream
codec encoding.Codec
}
// Read the response
func (r *response) Codec() codec.Reader {
return nil
}
// read the header
func (r *response) Header() map[string]string {
md, err := r.stream.Header()
if err != nil {
return map[string]string{}
}
hdr := make(map[string]string)
for k, v := range md {
hdr[k] = strings.Join(v, ",")
}
return hdr
}
// Read the undecoded response
func (r *response) Read() ([]byte, error) {
return nil, nil
}

View File

@ -12,11 +12,12 @@ import (
// Implements the streamer interface
type grpcStream struct {
sync.RWMutex
err error
conn *grpc.ClientConn
request client.Request
stream grpc.ClientStream
context context.Context
err error
conn *grpc.ClientConn
stream grpc.ClientStream
request client.Request
response client.Response
context context.Context
}
func (g *grpcStream) Context() context.Context {
@ -28,7 +29,7 @@ func (g *grpcStream) Request() client.Request {
}
func (g *grpcStream) Response() client.Response {
return nil
return g.response
}
func (g *grpcStream) Send(msg interface{}) error {

View File

@ -1,5 +1,5 @@
// Package rpc provides an rpc client
package rpc
// Package mucp provides an mucp client
package mucp
import (
"github.com/micro/go-micro/client"

View File

@ -567,5 +567,5 @@ func (r *rpcClient) NewRequest(service, method string, request interface{}, reqO
}
func (r *rpcClient) String() string {
return "rpc"
return "mucp"
}

View File

@ -11,7 +11,11 @@ import (
"github.com/micro/cli"
"github.com/micro/go-micro/client"
cgrpc "github.com/micro/go-micro/client/grpc"
cmucp "github.com/micro/go-micro/client/mucp"
"github.com/micro/go-micro/server"
smucp "github.com/micro/go-micro/server/mucp"
sgrpc "github.com/micro/go-micro/server/grpc"
"github.com/micro/go-micro/util/log"
// brokers
@ -177,6 +181,8 @@ var (
DefaultClients = map[string]func(...client.Option) client.Client{
"rpc": client.NewClient,
"mucp": cmucp.NewClient,
"grpc": cgrpc.NewClient,
}
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
@ -195,6 +201,8 @@ var (
DefaultServers = map[string]func(...server.Option) server.Server{
"rpc": server.NewServer,
"mucp": smucp.NewServer,
"grpc": sgrpc.NewServer,
}
DefaultTransports = map[string]func(...transport.Option) transport.Transport{

View File

@ -10,7 +10,7 @@ import (
"github.com/micro/go-micro/client/grpc"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/options"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server"
)

View File

@ -11,7 +11,7 @@ import (
"path"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/options"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server"
)

View File

@ -9,7 +9,7 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/options"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server"
)

View File

@ -5,7 +5,7 @@ import (
"context"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/options"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/server"
)

View File

@ -81,6 +81,14 @@ func newGRPCServer(opts ...server.Option) server.Server {
return srv
}
type grpcRouter struct {
h func(context.Context, server.Request, interface{}) error
}
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
return r.h(ctx, req, rsp)
}
func (g *grpcServer) configure(opts ...server.Option) {
// Don't reprocess where there's no config
if len(opts) == 0 && g.srv != nil {
@ -167,19 +175,6 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
return status.New(codes.InvalidArgument, err.Error()).Err()
}
g.rpc.mu.Lock()
service := g.rpc.serviceMap[serviceName]
g.rpc.mu.Unlock()
if service == nil {
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
}
mtype := service.method[methodName]
if mtype == nil {
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
}
// get grpc metadata
gmd, ok := metadata.FromIncomingContext(stream.Context())
if !ok {
@ -214,6 +209,51 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
}
}
// process via router
if g.opts.Router != nil {
// create a client.Request
request := &rpcRequest{
service: g.opts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", serviceName, methodName),
}
response := &rpcResponse{}
// create a wrapped function
handler := func(ctx context.Context, req server.Request, rsp interface{}) error {
return g.opts.Router.ServeRequest(ctx, req, rsp.(server.Response))
}
// execute the wrapper for it
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
handler = g.opts.HdlrWrappers[i-1](handler)
}
r := grpcRouter{handler}
// serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil {
return status.Errorf(codes.Internal, err.Error())
}
return nil
}
// process the standard request flow
g.rpc.mu.Lock()
service := g.rpc.serviceMap[serviceName]
g.rpc.mu.Unlock()
if service == nil {
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
}
mtype := service.method[methodName]
if mtype == nil {
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
}
// process unary
if !mtype.stream {
return g.processRequest(stream, service, mtype, ct, ctx)

35
server/grpc/response.go Normal file
View File

@ -0,0 +1,35 @@
package grpc
import (
"net/http"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
)
type rpcResponse struct {
header map[string]string
socket transport.Socket
codec codec.Codec
}
func (r *rpcResponse) Codec() codec.Writer {
return r.codec
}
func (r *rpcResponse) WriteHeader(hdr map[string]string) {
for k, v := range hdr {
r.header[k] = v
}
}
func (r *rpcResponse) Write(b []byte) error {
if _, ok := r.header["Content-Type"]; !ok {
r.header["Content-Type"] = http.DetectContentType(b)
}
return r.socket.Send(&transport.Message{
Header: r.header,
Body: b,
})
}

View File

@ -1,5 +1,5 @@
// Package rpc provides an rpc server
package rpc
// Package mucp provides an mucp server
package mucp
import (
"github.com/micro/go-micro/server"

View File

@ -611,5 +611,5 @@ func (s *rpcServer) Stop() error {
}
func (s *rpcServer) String() string {
return "rpc"
return "mucp"
}

21
service/mucp/mucp.go Normal file
View File

@ -0,0 +1,21 @@
// Package mucp initialises a mucp service
package mucp
import (
// TODO: change to go-micro/service
"github.com/micro/go-micro"
cmucp "github.com/micro/go-micro/client/mucp"
smucp "github.com/micro/go-micro/server/mucp"
)
// NewService returns a new mucp service
func NewService(opts ...micro.Option) micro.Service {
options := []micro.Option{
micro.Client(cmucp.NewClient()),
micro.Server(smucp.NewServer()),
}
options = append(options, opts...)
return micro.NewService(opts...)
}

View File

@ -6,7 +6,7 @@ import (
"net"
"github.com/hashicorp/consul/api"
"github.com/micro/go-micro/options"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
)

View File

@ -5,7 +5,7 @@ import (
"sync"
"time"
"github.com/micro/go-micro/options"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
)

View File

@ -1,7 +1,7 @@
package store
import (
"github.com/micro/go-micro/options"
"github.com/micro/go-micro/config/options"
)
// Set the nodes used to back the store

View File

@ -5,7 +5,7 @@ import (
"errors"
"time"
"github.com/micro/go-micro/options"
"github.com/micro/go-micro/config/options"
)
var (