Add working grpc proxy config

This commit is contained in:
Asim Aslam 2019-06-18 18:51:52 +01:00
parent f65694670e
commit d3a6297b17
16 changed files with 785 additions and 86 deletions

View File

@ -10,8 +10,8 @@ import (
"github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/codec/protorpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding"
) )
type jsonCodec struct{} type jsonCodec struct{}
@ -154,27 +154,19 @@ func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
} }
func (g *grpcCodec) ReadBody(v interface{}) error { func (g *grpcCodec) ReadBody(v interface{}) error {
frame := &bytes.Frame{} if f, ok := v.(*bytes.Frame); ok {
if err := g.s.RecvMsg(frame); err != nil { return g.s.RecvMsg(f)
return err
} }
return g.c.Unmarshal(frame.Data, v) return g.s.RecvMsg(v)
} }
func (g *grpcCodec) Write(m *codec.Message, v interface{}) error { func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
// if we don't have a body // if we don't have a body
if len(m.Body) == 0 { if v != nil {
b, err := g.c.Marshal(v) return g.s.SendMsg(v)
if err != nil {
return err
} }
m.Body = b
}
// create an encoded frame
frame := &bytes.Frame{m.Body}
// write the body using the framing codec // write the body using the framing codec
return g.s.SendMsg(frame) return g.s.SendMsg(&bytes.Frame{m.Body})
} }
func (g *grpcCodec) Close() error { func (g *grpcCodec) Close() error {
@ -184,4 +176,3 @@ func (g *grpcCodec) Close() error {
func (g *grpcCodec) String() string { func (g *grpcCodec) String() string {
return g.c.Name() return g.c.Name()
} }

View File

@ -32,8 +32,9 @@ type grpcClient struct {
} }
func init() { func init() {
encoding.RegisterCodec(jsonCodec{}) encoding.RegisterCodec(wrapCodec{jsonCodec{}})
encoding.RegisterCodec(bytesCodec{}) encoding.RegisterCodec(wrapCodec{jsonCodec{}})
encoding.RegisterCodec(wrapCodec{bytesCodec{}})
} }
// secure returns the dial option for whether its a secure or insecure connection // secure returns the dial option for whether its a secure or insecure connection
@ -129,7 +130,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.ForceCodec(cf)) err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpc.ForceCodec(cf))
ch <- microError(err) ch <- microError(err)
}() }()
@ -191,23 +192,26 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
ServerStreams: true, ServerStreams: true,
} }
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body())) st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()))
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
// set request codec codec := &grpcCodec{
if r, ok := req.(*grpcRequest); ok {
r.codec = &grpcCodec{
s: st, s: st,
c: wc, c: wc,
} }
// set request codec
if r, ok := req.(*grpcRequest); ok {
r.codec = codec
} }
rsp := &response{ rsp := &response{
conn: cc, conn: cc,
stream: st, stream: st,
codec: cf, codec: cf,
gcodec: codec,
} }
return &grpcStream{ return &grpcStream{

View File

@ -2,7 +2,6 @@ package grpc
import ( import (
"fmt" "fmt"
"reflect"
"strings" "strings"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
@ -18,30 +17,21 @@ type grpcRequest struct {
codec codec.Codec codec codec.Codec
} }
func methodToGRPC(method string, request interface{}) string { // service Struct.Method /service.Struct/Method
func methodToGRPC(service, method string) string {
// no method or already grpc method // no method or already grpc method
if len(method) == 0 || method[0] == '/' { if len(method) == 0 || method[0] == '/' {
return method return method
} }
// can't operate on nil request
t := reflect.TypeOf(request)
if t == nil {
return method
}
// dereference
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
// get package name
pParts := strings.Split(t.PkgPath(), "/")
pkg := pParts[len(pParts)-1]
// assume method is Foo.Bar // assume method is Foo.Bar
mParts := strings.Split(method, ".") mParts := strings.Split(method, ".")
if len(mParts) != 2 { if len(mParts) != 2 {
return method return method
} }
// return /pkg.Foo/Bar // return /pkg.Foo/Bar
return fmt.Sprintf("/%s.%s/%s", pkg, mParts[0], mParts[1]) return fmt.Sprintf("/%s.%s/%s", service, mParts[0], mParts[1])
} }
func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request {

View File

@ -4,6 +4,7 @@ import (
"strings" "strings"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )
@ -12,11 +13,12 @@ type response struct {
conn *grpc.ClientConn conn *grpc.ClientConn
stream grpc.ClientStream stream grpc.ClientStream
codec encoding.Codec codec encoding.Codec
gcodec codec.Codec
} }
// Read the response // Read the response
func (r *response) Codec() codec.Reader { func (r *response) Codec() codec.Reader {
return nil return r.gcodec
} }
// read the header // read the header
@ -34,5 +36,9 @@ func (r *response) Header() map[string]string {
// Read the undecoded response // Read the undecoded response
func (r *response) Read() ([]byte, error) { func (r *response) Read() ([]byte, error) {
return nil, nil f := &bytes.Frame{}
if err := r.gcodec.ReadBody(f); err != nil {
return nil, err
}
return f.Data, nil
} }

View File

@ -0,0 +1,203 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: micro/go-micro/client/proto/client.proto
package go_micro_client
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import (
context "context"
client "github.com/micro/go-micro/client"
server "github.com/micro/go-micro/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ client.Option
var _ server.Option
// Client API for Micro service
type MicroService interface {
// Call allows a single request to be made
Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error)
// Stream is a bidirectional stream
Stream(ctx context.Context, opts ...client.CallOption) (Micro_StreamService, error)
// Publish publishes a message and returns an empty Message
Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error)
}
type microService struct {
c client.Client
name string
}
func NewMicroService(name string, c client.Client) MicroService {
if c == nil {
c = client.NewClient()
}
if len(name) == 0 {
name = "go.micro.client"
}
return &microService{
c: c,
name: name,
}
}
func (c *microService) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) {
req := c.c.NewRequest(c.name, "Micro.Call", in)
out := new(Response)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *microService) Stream(ctx context.Context, opts ...client.CallOption) (Micro_StreamService, error) {
req := c.c.NewRequest(c.name, "Micro.Stream", &Request{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
return &microServiceStream{stream}, nil
}
type Micro_StreamService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Request) error
Recv() (*Response, error)
}
type microServiceStream struct {
stream client.Stream
}
func (x *microServiceStream) Close() error {
return x.stream.Close()
}
func (x *microServiceStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *microServiceStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *microServiceStream) Send(m *Request) error {
return x.stream.Send(m)
}
func (x *microServiceStream) Recv() (*Response, error) {
m := new(Response)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
func (c *microService) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) {
req := c.c.NewRequest(c.name, "Micro.Publish", in)
out := new(Message)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Micro service
type MicroHandler interface {
// Call allows a single request to be made
Call(context.Context, *Request, *Response) error
// Stream is a bidirectional stream
Stream(context.Context, Micro_StreamStream) error
// Publish publishes a message and returns an empty Message
Publish(context.Context, *Message, *Message) error
}
func RegisterMicroHandler(s server.Server, hdlr MicroHandler, opts ...server.HandlerOption) error {
type micro interface {
Call(ctx context.Context, in *Request, out *Response) error
Stream(ctx context.Context, stream server.Stream) error
Publish(ctx context.Context, in *Message, out *Message) error
}
type Micro struct {
micro
}
h := &microHandler{hdlr}
return s.Handle(s.NewHandler(&Micro{h}, opts...))
}
type microHandler struct {
MicroHandler
}
func (h *microHandler) Call(ctx context.Context, in *Request, out *Response) error {
return h.MicroHandler.Call(ctx, in, out)
}
func (h *microHandler) Stream(ctx context.Context, stream server.Stream) error {
return h.MicroHandler.Stream(ctx, &microStreamStream{stream})
}
type Micro_StreamStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Response) error
Recv() (*Request, error)
}
type microStreamStream struct {
stream server.Stream
}
func (x *microStreamStream) Close() error {
return x.stream.Close()
}
func (x *microStreamStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *microStreamStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *microStreamStream) Send(m *Response) error {
return x.stream.Send(m)
}
func (x *microStreamStream) Recv() (*Request, error) {
m := new(Request)
if err := x.stream.Recv(m); err != nil {
return nil, err
}
return m, nil
}
func (h *microHandler) Publish(ctx context.Context, in *Message, out *Message) error {
return h.MicroHandler.Publish(ctx, in, out)
}

388
client/proto/client.pb.go Normal file
View File

@ -0,0 +1,388 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: micro/go-micro/client/proto/client.proto
package go_micro_client
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Request struct {
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
ContentType string `protobuf:"bytes,3,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"`
Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) {
return fileDescriptor_7d733ae29171347b, []int{0}
}
func (m *Request) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Request.Unmarshal(m, b)
}
func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Request.Marshal(b, m, deterministic)
}
func (m *Request) XXX_Merge(src proto.Message) {
xxx_messageInfo_Request.Merge(m, src)
}
func (m *Request) XXX_Size() int {
return xxx_messageInfo_Request.Size(m)
}
func (m *Request) XXX_DiscardUnknown() {
xxx_messageInfo_Request.DiscardUnknown(m)
}
var xxx_messageInfo_Request proto.InternalMessageInfo
func (m *Request) GetService() string {
if m != nil {
return m.Service
}
return ""
}
func (m *Request) GetEndpoint() string {
if m != nil {
return m.Endpoint
}
return ""
}
func (m *Request) GetContentType() string {
if m != nil {
return m.ContentType
}
return ""
}
func (m *Request) GetBody() []byte {
if m != nil {
return m.Body
}
return nil
}
type Response struct {
Body []byte `protobuf:"bytes,1,opt,name=body,proto3" json:"body,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Response) Reset() { *m = Response{} }
func (m *Response) String() string { return proto.CompactTextString(m) }
func (*Response) ProtoMessage() {}
func (*Response) Descriptor() ([]byte, []int) {
return fileDescriptor_7d733ae29171347b, []int{1}
}
func (m *Response) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Response.Unmarshal(m, b)
}
func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Response.Marshal(b, m, deterministic)
}
func (m *Response) XXX_Merge(src proto.Message) {
xxx_messageInfo_Response.Merge(m, src)
}
func (m *Response) XXX_Size() int {
return xxx_messageInfo_Response.Size(m)
}
func (m *Response) XXX_DiscardUnknown() {
xxx_messageInfo_Response.DiscardUnknown(m)
}
var xxx_messageInfo_Response proto.InternalMessageInfo
func (m *Response) GetBody() []byte {
if m != nil {
return m.Body
}
return nil
}
type Message struct {
Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
ContentType string `protobuf:"bytes,2,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"`
Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_7d733ae29171347b, []int{2}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Message.Unmarshal(m, b)
}
func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Message.Marshal(b, m, deterministic)
}
func (m *Message) XXX_Merge(src proto.Message) {
xxx_messageInfo_Message.Merge(m, src)
}
func (m *Message) XXX_Size() int {
return xxx_messageInfo_Message.Size(m)
}
func (m *Message) XXX_DiscardUnknown() {
xxx_messageInfo_Message.DiscardUnknown(m)
}
var xxx_messageInfo_Message proto.InternalMessageInfo
func (m *Message) GetTopic() string {
if m != nil {
return m.Topic
}
return ""
}
func (m *Message) GetContentType() string {
if m != nil {
return m.ContentType
}
return ""
}
func (m *Message) GetBody() []byte {
if m != nil {
return m.Body
}
return nil
}
func init() {
proto.RegisterType((*Request)(nil), "go.micro.client.Request")
proto.RegisterType((*Response)(nil), "go.micro.client.Response")
proto.RegisterType((*Message)(nil), "go.micro.client.Message")
}
func init() {
proto.RegisterFile("micro/go-micro/client/proto/client.proto", fileDescriptor_7d733ae29171347b)
}
var fileDescriptor_7d733ae29171347b = []byte{
// 270 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0x3f, 0x4f, 0xc3, 0x30,
0x10, 0xc5, 0xeb, 0xfe, 0x4b, 0x39, 0x2a, 0x21, 0x9d, 0x18, 0x4c, 0x06, 0x54, 0x32, 0x65, 0xc1,
0x45, 0x30, 0x23, 0x86, 0xce, 0x95, 0x50, 0x40, 0xac, 0x28, 0x71, 0x4f, 0xc1, 0x52, 0x6a, 0x9b,
0xd8, 0xad, 0x94, 0xef, 0xc8, 0x87, 0x42, 0x38, 0x29, 0x45, 0xd0, 0x2e, 0x6c, 0xf7, 0xee, 0x67,
0xbd, 0x3b, 0xbf, 0x83, 0x74, 0xad, 0x64, 0x6d, 0xe6, 0xa5, 0xb9, 0x6e, 0x0b, 0x59, 0x29, 0xd2,
0x7e, 0x6e, 0x6b, 0xe3, 0x77, 0x42, 0x04, 0x81, 0x67, 0xa5, 0x11, 0xe1, 0x8d, 0x68, 0xdb, 0xc9,
0x16, 0xa2, 0x8c, 0xde, 0x37, 0xe4, 0x3c, 0x72, 0x88, 0x1c, 0xd5, 0x5b, 0x25, 0x89, 0xb3, 0x19,
0x4b, 0x4f, 0xb2, 0x9d, 0xc4, 0x18, 0x26, 0xa4, 0x57, 0xd6, 0x28, 0xed, 0x79, 0x3f, 0xa0, 0x6f,
0x8d, 0x57, 0x30, 0x95, 0x46, 0x7b, 0xd2, 0xfe, 0xd5, 0x37, 0x96, 0xf8, 0x20, 0xf0, 0xd3, 0xae,
0xf7, 0xdc, 0x58, 0x42, 0x84, 0x61, 0x61, 0x56, 0x0d, 0x1f, 0xce, 0x58, 0x3a, 0xcd, 0x42, 0x9d,
0x5c, 0xc2, 0x24, 0x23, 0x67, 0x8d, 0x76, 0x7b, 0xce, 0x7e, 0xf0, 0x17, 0x88, 0x96, 0xe4, 0x5c,
0x5e, 0x12, 0x9e, 0xc3, 0xc8, 0x1b, 0xab, 0x64, 0xb7, 0x55, 0x2b, 0xfe, 0xcc, 0xed, 0x1f, 0x9f,
0x3b, 0xd8, 0xfb, 0xde, 0x7e, 0x30, 0x18, 0x2d, 0xbf, 0x02, 0xc0, 0x7b, 0x18, 0x2e, 0xf2, 0xaa,
0x42, 0x2e, 0x7e, 0x65, 0x22, 0xba, 0x40, 0xe2, 0x8b, 0x03, 0xa4, 0x5d, 0x39, 0xe9, 0xe1, 0x02,
0xc6, 0x4f, 0xbe, 0xa6, 0x7c, 0xfd, 0x4f, 0x83, 0x94, 0xdd, 0x30, 0x7c, 0x80, 0xe8, 0x71, 0x53,
0x54, 0xca, 0xbd, 0x1d, 0x70, 0xe9, 0xfe, 0x1f, 0x1f, 0x25, 0x49, 0xaf, 0x18, 0x87, 0xb3, 0xde,
0x7d, 0x06, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x63, 0x94, 0x1a, 0x02, 0x02, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// MicroClient is the client API for Micro service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type MicroClient interface {
// Call allows a single request to be made
Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
// Stream is a bidirectional stream
Stream(ctx context.Context, opts ...grpc.CallOption) (Micro_StreamClient, error)
// Publish publishes a message and returns an empty Message
Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error)
}
type microClient struct {
cc *grpc.ClientConn
}
func NewMicroClient(cc *grpc.ClientConn) MicroClient {
return &microClient{cc}
}
func (c *microClient) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
out := new(Response)
err := c.cc.Invoke(ctx, "/go.micro.client.Micro/Call", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *microClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Micro_StreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Micro_serviceDesc.Streams[0], "/go.micro.client.Micro/Stream", opts...)
if err != nil {
return nil, err
}
x := &microStreamClient{stream}
return x, nil
}
type Micro_StreamClient interface {
Send(*Request) error
Recv() (*Response, error)
grpc.ClientStream
}
type microStreamClient struct {
grpc.ClientStream
}
func (x *microStreamClient) Send(m *Request) error {
return x.ClientStream.SendMsg(m)
}
func (x *microStreamClient) Recv() (*Response, error) {
m := new(Response)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *microClient) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) {
out := new(Message)
err := c.cc.Invoke(ctx, "/go.micro.client.Micro/Publish", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// MicroServer is the server API for Micro service.
type MicroServer interface {
// Call allows a single request to be made
Call(context.Context, *Request) (*Response, error)
// Stream is a bidirectional stream
Stream(Micro_StreamServer) error
// Publish publishes a message and returns an empty Message
Publish(context.Context, *Message) (*Message, error)
}
func RegisterMicroServer(s *grpc.Server, srv MicroServer) {
s.RegisterService(&_Micro_serviceDesc, srv)
}
func _Micro_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Request)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MicroServer).Call(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.client.Micro/Call",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MicroServer).Call(ctx, req.(*Request))
}
return interceptor(ctx, in, info, handler)
}
func _Micro_Stream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(MicroServer).Stream(&microStreamServer{stream})
}
type Micro_StreamServer interface {
Send(*Response) error
Recv() (*Request, error)
grpc.ServerStream
}
type microStreamServer struct {
grpc.ServerStream
}
func (x *microStreamServer) Send(m *Response) error {
return x.ServerStream.SendMsg(m)
}
func (x *microStreamServer) Recv() (*Request, error) {
m := new(Request)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _Micro_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Message)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MicroServer).Publish(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.client.Micro/Publish",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MicroServer).Publish(ctx, req.(*Message))
}
return interceptor(ctx, in, info, handler)
}
var _Micro_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.client.Micro",
HandlerType: (*MicroServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Call",
Handler: _Micro_Call_Handler,
},
{
MethodName: "Publish",
Handler: _Micro_Publish_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Stream",
Handler: _Micro_Stream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "micro/go-micro/client/proto/client.proto",
}

30
client/proto/client.proto Normal file
View File

@ -0,0 +1,30 @@
syntax = "proto3";
package go.micro.client;
// Micro is the micro client interface
service Micro {
// Call allows a single request to be made
rpc Call(Request) returns (Response) {};
// Stream is a bidirectional stream
rpc Stream(stream Request) returns (stream Response) {};
// Publish publishes a message and returns an empty Message
rpc Publish(Message) returns (Message) {};
}
message Request {
string service = 1;
string endpoint = 2;
string content_type = 3;
bytes body = 4;
}
message Response {
bytes body = 1;
}
message Message {
string topic = 1;
string content_type = 2;
bytes body = 3;
}

View File

@ -14,8 +14,8 @@ import (
cgrpc "github.com/micro/go-micro/client/grpc" cgrpc "github.com/micro/go-micro/client/grpc"
cmucp "github.com/micro/go-micro/client/mucp" cmucp "github.com/micro/go-micro/client/mucp"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
smucp "github.com/micro/go-micro/server/mucp"
sgrpc "github.com/micro/go-micro/server/grpc" sgrpc "github.com/micro/go-micro/server/grpc"
smucp "github.com/micro/go-micro/server/mucp"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
// brokers // brokers

View File

@ -9,7 +9,6 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/grpc" "github.com/micro/go-micro/client/grpc"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
@ -86,14 +85,8 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
} }
} }
// read initial request
body, err := req.Read()
if err != nil {
return err
}
// create new request with raw bytes body // create new request with raw bytes body
creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType())) creq := p.Client.NewRequest(service, endpoint, nil, client.WithContentType(req.ContentType()))
// create new stream // create new stream
stream, err := p.Client.Stream(ctx, creq, opts...) stream, err := p.Client.Stream(ctx, creq, opts...)

View File

@ -10,8 +10,8 @@ import (
"net/url" "net/url"
"path" "path"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
) )

View File

@ -39,6 +39,7 @@ func readLoop(r server.Request, s client.Stream) error {
if err == io.EOF { if err == io.EOF {
return nil return nil
} }
if err != nil { if err != nil {
return err return err
} }
@ -50,6 +51,7 @@ func readLoop(r server.Request, s client.Stream) error {
Header: hdr, Header: hdr,
Body: body, Body: body,
} }
// write the raw request // write the raw request
err = req.Codec().Write(msg, nil) err = req.Codec().Write(msg, nil)
if err == io.EOF { if err == io.EOF {

View File

@ -3,19 +3,22 @@ package grpc
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/codec/protorpc"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
"google.golang.org/grpc/metadata"
) )
type jsonCodec struct{} type jsonCodec struct{}
type bytesCodec struct{} type bytesCodec struct{}
type protoCodec struct{} type protoCodec struct{}
type wrapCodec struct { encoding.Codec } type wrapCodec struct{ encoding.Codec }
var ( var (
defaultGRPCCodecs = map[string]encoding.Codec{ defaultGRPCCodecs = map[string]encoding.Codec{
@ -103,3 +106,61 @@ func (bytesCodec) Unmarshal(data []byte, v interface{}) error {
func (bytesCodec) Name() string { func (bytesCodec) Name() string {
return "bytes" return "bytes"
} }
type grpcCodec struct {
// headers
id string
target string
method string
endpoint string
s grpc.ServerStream
c encoding.Codec
}
func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
md, _ := metadata.FromIncomingContext(g.s.Context())
if m == nil {
m = new(codec.Message)
}
if m.Header == nil {
m.Header = make(map[string]string)
}
for k, v := range md {
m.Header[k] = strings.Join(v, ",")
}
m.Id = g.id
m.Target = g.target
m.Method = g.method
m.Endpoint = g.endpoint
return nil
}
func (g *grpcCodec) ReadBody(v interface{}) error {
// caller has requested a frame
if f, ok := v.(*bytes.Frame); ok {
return g.s.RecvMsg(f)
}
return g.s.RecvMsg(v)
}
func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
// if we don't have a body
if v != nil {
b, err := g.c.Marshal(v)
if err != nil {
return err
}
m.Body = b
}
// write the body using the framing codec
return g.s.SendMsg(&bytes.Frame{m.Body})
}
func (g *grpcCodec) Close() error {
return nil
}
func (g *grpcCodec) String() string {
return g.c.Name()
}

View File

@ -56,6 +56,7 @@ type grpcServer struct {
} }
func init() { func init() {
encoding.RegisterCodec(wrapCodec{protoCodec{}})
encoding.RegisterCodec(wrapCodec{jsonCodec{}}) encoding.RegisterCodec(wrapCodec{jsonCodec{}})
encoding.RegisterCodec(wrapCodec{bytesCodec{}}) encoding.RegisterCodec(wrapCodec{bytesCodec{}})
} }
@ -211,14 +212,30 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
// process via router // process via router
if g.opts.Router != nil { if g.opts.Router != nil {
// create a client.Request cc, err := g.newGRPCCodec(ct)
request := &rpcRequest{ if err != nil {
service: g.opts.Name, return errors.InternalServerError("go.micro.server", err.Error())
contentType: ct, }
codec := &grpcCodec{
method: fmt.Sprintf("%s.%s", serviceName, methodName), method: fmt.Sprintf("%s.%s", serviceName, methodName),
endpoint: fmt.Sprintf("%s.%s", serviceName, methodName),
target: g.opts.Name,
s: stream,
c: cc,
} }
response := &rpcResponse{} // create a client.Request
request := &rpcRequest{
service: mgrpc.ServiceFromMethod(fullMethod),
contentType: ct,
method: fmt.Sprintf("%s.%s", serviceName, methodName),
codec: codec,
}
response := &rpcResponse{
header: make(map[string]string),
codec: codec,
}
// create a wrapped function // create a wrapped function
handler := func(ctx context.Context, req server.Request, rsp interface{}) error { handler := func(ctx context.Context, req server.Request, rsp interface{}) error {

View File

@ -2,6 +2,7 @@ package grpc
import ( import (
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
) )
type rpcRequest struct { type rpcRequest struct {
@ -46,7 +47,11 @@ func (r *rpcRequest) Header() map[string]string {
} }
func (r *rpcRequest) Read() ([]byte, error) { func (r *rpcRequest) Read() ([]byte, error) {
return r.body, nil f := &bytes.Frame{}
if err := r.codec.ReadBody(f); err != nil {
return nil, err
}
return f.Data, nil
} }
func (r *rpcRequest) Stream() bool { func (r *rpcRequest) Stream() bool {

View File

@ -1,15 +1,11 @@
package grpc package grpc
import ( import (
"net/http"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
) )
type rpcResponse struct { type rpcResponse struct {
header map[string]string header map[string]string
socket transport.Socket
codec codec.Codec codec codec.Codec
} }
@ -24,12 +20,8 @@ func (r *rpcResponse) WriteHeader(hdr map[string]string) {
} }
func (r *rpcResponse) Write(b []byte) error { func (r *rpcResponse) Write(b []byte) error {
if _, ok := r.header["Content-Type"]; !ok { return r.codec.Write(&codec.Message{
r.header["Content-Type"] = http.DetectContentType(b)
}
return r.socket.Send(&transport.Message{
Header: r.header, Header: r.header,
Body: b, Body: b,
}) }, nil)
} }

View File

@ -38,3 +38,20 @@ func ServiceMethod(m string) (string, string, error) {
return parts[0], parts[1], nil return parts[0], parts[1], nil
} }
// ServiceFromMethod returns the service
// /service.Foo/Bar => service
func ServiceFromMethod(m string) string {
if len(m) == 0 {
return m
}
if m[0] != '/' {
return m
}
parts := strings.Split(m, "/")
if len(parts) < 3 {
return m
}
parts = strings.Split(parts[1], ".")
return strings.Join(parts[:len(parts)-1], ".")
}