check in this cruft

This commit is contained in:
Asim Aslam 2019-06-08 19:40:44 +01:00 committed by Vasiliy Tolstov
parent 96d68eeace
commit 502ac83bfe
4 changed files with 65 additions and 6 deletions

26
grpc.go
View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"time" "time"
"os"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
@ -48,6 +49,18 @@ func (g *grpcClient) secure() grpc.DialOption {
} }
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
service := request.Service()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
service = prx
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = prx
}
// return remote address // return remote address
if len(opts.Address) > 0 { if len(opts.Address) > 0 {
return func() (*registry.Node, error) { return func() (*registry.Node, error) {
@ -58,7 +71,7 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele
} }
// get next nodes from the selector // get next nodes from the selector
next, err := g.opts.Selector.Select(request.Service(), opts.SelectOptions...) next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error()) return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil { } else if err != nil {
@ -164,7 +177,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
dialCtx, cancel = context.WithCancel(ctx) dialCtx, cancel = context.WithCancel(ctx)
} }
defer cancel() defer cancel()
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)), g.secure()) cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure())
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -175,14 +188,21 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
ServerStreams: true, ServerStreams: true,
} }
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body()), grpc.CallContentSubtype(cf.String())) st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body()))
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
rsp := &response{
conn: cc,
stream: st,
codec: cf,
}
return &grpcStream{ return &grpcStream{
context: ctx, context: ctx,
request: req, request: req,
response: rsp,
stream: st, stream: st,
conn: cc, conn: cc,
}, nil }, nil

View File

@ -15,6 +15,7 @@ type grpcRequest struct {
contentType string contentType string
request interface{} request interface{}
opts client.RequestOptions opts client.RequestOptions
codec codec.Codec
} }
func methodToGRPC(method string, request interface{}) string { func methodToGRPC(method string, request interface{}) string {
@ -80,7 +81,7 @@ func (g *grpcRequest) Endpoint() string {
} }
func (g *grpcRequest) Codec() codec.Writer { func (g *grpcRequest) Codec() codec.Writer {
return nil return g.codec
} }
func (g *grpcRequest) Body() interface{} { func (g *grpcRequest) Body() interface{} {

37
response.go Normal file
View File

@ -0,0 +1,37 @@
package grpc
import (
"strings"
"github.com/micro/go-micro/codec"
"google.golang.org/grpc"
)
type response struct {
conn *grpc.ClientConn
stream grpc.ClientStream
codec grpc.Codec
}
// Read the response
func (r *response) Codec() codec.Reader {
return nil
}
// read the header
func (r *response) Header() map[string]string {
md, err := r.stream.Header()
if err != nil {
return map[string]string{}
}
hdr := make(map[string]string)
for k, v := range md {
hdr[k] = strings.Join(v, ",")
}
return hdr
}
// Read the undecoded response
func (r *response) Read() ([]byte, error) {
return nil, nil
}

View File

@ -14,8 +14,9 @@ type grpcStream struct {
sync.RWMutex sync.RWMutex
err error err error
conn *grpc.ClientConn conn *grpc.ClientConn
request client.Request
stream grpc.ClientStream stream grpc.ClientStream
request client.Request
response client.Response
context context.Context context context.Context
} }
@ -28,7 +29,7 @@ func (g *grpcStream) Request() client.Request {
} }
func (g *grpcStream) Response() client.Response { func (g *grpcStream) Response() client.Response {
return nil return g.response
} }
func (g *grpcStream) Send(msg interface{}) error { func (g *grpcStream) Send(msg interface{}) error {