diff --git a/client/grpc/codec.go b/client/grpc/codec.go index 4994db70..62f30384 100644 --- a/client/grpc/codec.go +++ b/client/grpc/codec.go @@ -2,12 +2,15 @@ package grpc import ( "fmt" + "strings" "github.com/golang/protobuf/proto" "github.com/json-iterator/go" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/protorpc" + "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) @@ -52,7 +55,28 @@ func (w wrapCodec) String() string { return w.Codec.Name() } +func (w wrapCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*bytes.Frame) + if ok { + return b.Data, nil + } + return w.Codec.Marshal(v) +} + +func (w wrapCodec) Unmarshal(data []byte, v interface{}) error { + b, ok := v.(*bytes.Frame) + if ok { + b.Data = data + return nil + } + return w.Codec.Unmarshal(data, v) +} + func (protoCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*bytes.Frame) + if ok { + return b.Data, nil + } return proto.Marshal(v.(proto.Message)) } @@ -96,3 +120,59 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error { func (jsonCodec) Name() string { return "json" } + +type grpcCodec struct { + // headers + id string + target string + method string + endpoint string + + s grpc.ClientStream + c encoding.Codec +} + +func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + md, err := g.s.Header() + if err != nil { + return err + } + if m == nil { + m = new(codec.Message) + } + if m.Header == nil { + m.Header = make(map[string]string) + } + for k, v := range md { + m.Header[k] = strings.Join(v, ",") + } + m.Id = g.id + m.Target = g.target + m.Method = g.method + m.Endpoint = g.endpoint + return nil +} + +func (g *grpcCodec) ReadBody(v interface{}) error { + if f, ok := v.(*bytes.Frame); ok { + return g.s.RecvMsg(f) + } + return g.s.RecvMsg(v) +} + +func (g *grpcCodec) Write(m *codec.Message, v interface{}) error { + // if we don't have a body + if v != nil { + return g.s.SendMsg(v) + } + // write the body using the framing codec + return g.s.SendMsg(&bytes.Frame{m.Body}) +} + +func (g *grpcCodec) Close() error { + return g.s.CloseSend() +} + +func (g *grpcCodec) String() string { + return g.c.Name() +} diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 555fed0e..ac1dc82e 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -32,8 +32,9 @@ type grpcClient struct { } func init() { - encoding.RegisterCodec(jsonCodec{}) - encoding.RegisterCodec(bytesCodec{}) + encoding.RegisterCodec(wrapCodec{jsonCodec{}}) + encoding.RegisterCodec(wrapCodec{jsonCodec{}}) + encoding.RegisterCodec(wrapCodec{bytesCodec{}}) } // secure returns the dial option for whether its a secure or insecure connection @@ -129,7 +130,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R ch := make(chan error, 1) go func() { - err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.ForceCodec(cf)) + err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpc.ForceCodec(cf)) ch <- microError(err) }() @@ -177,7 +178,10 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client dialCtx, cancel = context.WithCancel(ctx) } defer cancel() - cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure()) + + wc := wrapCodec{cf} + + cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure()) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -188,15 +192,26 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client ServerStreams: true, } - st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body())) + st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint())) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } + codec := &grpcCodec{ + s: st, + c: wc, + } + + // set request codec + if r, ok := req.(*grpcRequest); ok { + r.codec = codec + } + rsp := &response{ conn: cc, stream: st, codec: cf, + gcodec: codec, } return &grpcStream{ diff --git a/client/grpc/grpc_test.go b/client/grpc/grpc_test.go index 574058cb..074d8931 100644 --- a/client/grpc/grpc_test.go +++ b/client/grpc/grpc_test.go @@ -45,7 +45,7 @@ func TestGRPCClient(t *testing.T) { // register service r.Register(®istry.Service{ - Name: "test", + Name: "helloworld", Version: "test", Nodes: []*registry.Node{ ®istry.Node{ @@ -73,7 +73,7 @@ func TestGRPCClient(t *testing.T) { } for _, method := range testMethods { - req := c.NewRequest("test", method, &pb.HelloRequest{ + req := c.NewRequest("helloworld", method, &pb.HelloRequest{ Name: "John", }) diff --git a/client/grpc/request.go b/client/grpc/request.go index cb14a330..066577ce 100644 --- a/client/grpc/request.go +++ b/client/grpc/request.go @@ -2,7 +2,6 @@ package grpc import ( "fmt" - "reflect" "strings" "github.com/micro/go-micro/client" @@ -18,30 +17,25 @@ type grpcRequest struct { codec codec.Codec } -func methodToGRPC(method string, request interface{}) string { +// service Struct.Method /service.Struct/Method +func methodToGRPC(service, method string) string { // no method or already grpc method if len(method) == 0 || method[0] == '/' { return method } - // can't operate on nil request - t := reflect.TypeOf(request) - if t == nil { - return method - } - // dereference - if t.Kind() == reflect.Ptr { - t = t.Elem() - } - // get package name - pParts := strings.Split(t.PkgPath(), "/") - pkg := pParts[len(pParts)-1] + // assume method is Foo.Bar mParts := strings.Split(method, ".") if len(mParts) != 2 { return method } + + if len(service) == 0 { + return fmt.Sprintf("/%s/%s", mParts[0], mParts[1]) + } + // return /pkg.Foo/Bar - return fmt.Sprintf("/%s.%s/%s", pkg, mParts[0], mParts[1]) + return fmt.Sprintf("/%s.%s/%s", service, mParts[0], mParts[1]) } func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { diff --git a/client/grpc/request_test.go b/client/grpc/request_test.go index eab3a3a1..c73d675b 100644 --- a/client/grpc/request_test.go +++ b/client/grpc/request_test.go @@ -2,45 +2,38 @@ package grpc import ( "testing" - - pb "google.golang.org/grpc/examples/helloworld/helloworld" ) func TestMethodToGRPC(t *testing.T) { testData := []struct { + service string method string expect string - request interface{} }{ { + "helloworld", "Greeter.SayHello", "/helloworld.Greeter/SayHello", - new(pb.HelloRequest), }, { + "helloworld", "/helloworld.Greeter/SayHello", "/helloworld.Greeter/SayHello", - new(pb.HelloRequest), }, { + "", + "/helloworld.Greeter/SayHello", + "/helloworld.Greeter/SayHello", + }, + { + "", "Greeter.SayHello", - "/helloworld.Greeter/SayHello", - pb.HelloRequest{}, - }, - { - "/helloworld.Greeter/SayHello", - "/helloworld.Greeter/SayHello", - pb.HelloRequest{}, - }, - { - "Greeter.SayHello", - "Greeter.SayHello", - nil, + "/Greeter/SayHello", }, } for _, d := range testData { - method := methodToGRPC(d.method, d.request) + method := methodToGRPC(d.service, d.method) if method != d.expect { t.Fatalf("expected %s got %s", d.expect, method) } diff --git a/client/grpc/response.go b/client/grpc/response.go index 7ef72241..c870fad9 100644 --- a/client/grpc/response.go +++ b/client/grpc/response.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/bytes" "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) @@ -12,11 +13,12 @@ type response struct { conn *grpc.ClientConn stream grpc.ClientStream codec encoding.Codec + gcodec codec.Codec } // Read the response func (r *response) Codec() codec.Reader { - return nil + return r.gcodec } // read the header @@ -34,5 +36,9 @@ func (r *response) Header() map[string]string { // Read the undecoded response func (r *response) Read() ([]byte, error) { - return nil, nil + f := &bytes.Frame{} + if err := r.gcodec.ReadBody(f); err != nil { + return nil, err + } + return f.Data, nil } diff --git a/client/proto/client.micro.go b/client/proto/client.micro.go new file mode 100644 index 00000000..d8c9ad88 --- /dev/null +++ b/client/proto/client.micro.go @@ -0,0 +1,203 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: micro/go-micro/client/proto/client.proto + +package go_micro_client + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Micro service + +type MicroService interface { + // Call allows a single request to be made + Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) + // Stream is a bidirectional stream + Stream(ctx context.Context, opts ...client.CallOption) (Micro_StreamService, error) + // Publish publishes a message and returns an empty Message + Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) +} + +type microService struct { + c client.Client + name string +} + +func NewMicroService(name string, c client.Client) MicroService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.client" + } + return µService{ + c: c, + name: name, + } +} + +func (c *microService) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) { + req := c.c.NewRequest(c.name, "Micro.Call", in) + out := new(Response) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *microService) Stream(ctx context.Context, opts ...client.CallOption) (Micro_StreamService, error) { + req := c.c.NewRequest(c.name, "Micro.Stream", &Request{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + return µServiceStream{stream}, nil +} + +type Micro_StreamService interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Request) error + Recv() (*Response, error) +} + +type microServiceStream struct { + stream client.Stream +} + +func (x *microServiceStream) Close() error { + return x.stream.Close() +} + +func (x *microServiceStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *microServiceStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *microServiceStream) Send(m *Request) error { + return x.stream.Send(m) +} + +func (x *microServiceStream) Recv() (*Response, error) { + m := new(Response) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +func (c *microService) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) { + req := c.c.NewRequest(c.name, "Micro.Publish", in) + out := new(Message) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Micro service + +type MicroHandler interface { + // Call allows a single request to be made + Call(context.Context, *Request, *Response) error + // Stream is a bidirectional stream + Stream(context.Context, Micro_StreamStream) error + // Publish publishes a message and returns an empty Message + Publish(context.Context, *Message, *Message) error +} + +func RegisterMicroHandler(s server.Server, hdlr MicroHandler, opts ...server.HandlerOption) error { + type micro interface { + Call(ctx context.Context, in *Request, out *Response) error + Stream(ctx context.Context, stream server.Stream) error + Publish(ctx context.Context, in *Message, out *Message) error + } + type Micro struct { + micro + } + h := µHandler{hdlr} + return s.Handle(s.NewHandler(&Micro{h}, opts...)) +} + +type microHandler struct { + MicroHandler +} + +func (h *microHandler) Call(ctx context.Context, in *Request, out *Response) error { + return h.MicroHandler.Call(ctx, in, out) +} + +func (h *microHandler) Stream(ctx context.Context, stream server.Stream) error { + return h.MicroHandler.Stream(ctx, µStreamStream{stream}) +} + +type Micro_StreamStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Response) error + Recv() (*Request, error) +} + +type microStreamStream struct { + stream server.Stream +} + +func (x *microStreamStream) Close() error { + return x.stream.Close() +} + +func (x *microStreamStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *microStreamStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *microStreamStream) Send(m *Response) error { + return x.stream.Send(m) +} + +func (x *microStreamStream) Recv() (*Request, error) { + m := new(Request) + if err := x.stream.Recv(m); err != nil { + return nil, err + } + return m, nil +} + +func (h *microHandler) Publish(ctx context.Context, in *Message, out *Message) error { + return h.MicroHandler.Publish(ctx, in, out) +} diff --git a/client/proto/client.pb.go b/client/proto/client.pb.go new file mode 100644 index 00000000..2052f077 --- /dev/null +++ b/client/proto/client.pb.go @@ -0,0 +1,388 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: micro/go-micro/client/proto/client.proto + +package go_micro_client + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Request struct { + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"` + ContentType string `protobuf:"bytes,3,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"` + Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} +func (*Request) Descriptor() ([]byte, []int) { + return fileDescriptor_7d733ae29171347b, []int{0} +} + +func (m *Request) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Request.Unmarshal(m, b) +} +func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Request.Marshal(b, m, deterministic) +} +func (m *Request) XXX_Merge(src proto.Message) { + xxx_messageInfo_Request.Merge(m, src) +} +func (m *Request) XXX_Size() int { + return xxx_messageInfo_Request.Size(m) +} +func (m *Request) XXX_DiscardUnknown() { + xxx_messageInfo_Request.DiscardUnknown(m) +} + +var xxx_messageInfo_Request proto.InternalMessageInfo + +func (m *Request) GetService() string { + if m != nil { + return m.Service + } + return "" +} + +func (m *Request) GetEndpoint() string { + if m != nil { + return m.Endpoint + } + return "" +} + +func (m *Request) GetContentType() string { + if m != nil { + return m.ContentType + } + return "" +} + +func (m *Request) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +type Response struct { + Body []byte `protobuf:"bytes,1,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { + return fileDescriptor_7d733ae29171347b, []int{1} +} + +func (m *Response) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Response.Unmarshal(m, b) +} +func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Response.Marshal(b, m, deterministic) +} +func (m *Response) XXX_Merge(src proto.Message) { + xxx_messageInfo_Response.Merge(m, src) +} +func (m *Response) XXX_Size() int { + return xxx_messageInfo_Response.Size(m) +} +func (m *Response) XXX_DiscardUnknown() { + xxx_messageInfo_Response.DiscardUnknown(m) +} + +var xxx_messageInfo_Response proto.InternalMessageInfo + +func (m *Response) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +type Message struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + ContentType string `protobuf:"bytes,2,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"` + Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_7d733ae29171347b, []int{2} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetTopic() string { + if m != nil { + return m.Topic + } + return "" +} + +func (m *Message) GetContentType() string { + if m != nil { + return m.ContentType + } + return "" +} + +func (m *Message) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +func init() { + proto.RegisterType((*Request)(nil), "go.micro.client.Request") + proto.RegisterType((*Response)(nil), "go.micro.client.Response") + proto.RegisterType((*Message)(nil), "go.micro.client.Message") +} + +func init() { + proto.RegisterFile("micro/go-micro/client/proto/client.proto", fileDescriptor_7d733ae29171347b) +} + +var fileDescriptor_7d733ae29171347b = []byte{ + // 270 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0x3f, 0x4f, 0xc3, 0x30, + 0x10, 0xc5, 0xeb, 0xfe, 0x4b, 0x39, 0x2a, 0x21, 0x9d, 0x18, 0x4c, 0x06, 0x54, 0x32, 0x65, 0xc1, + 0x45, 0x30, 0x23, 0x86, 0xce, 0x95, 0x50, 0x40, 0xac, 0x28, 0x71, 0x4f, 0xc1, 0x52, 0x6a, 0x9b, + 0xd8, 0xad, 0x94, 0xef, 0xc8, 0x87, 0x42, 0x38, 0x29, 0x45, 0xd0, 0x2e, 0x6c, 0xf7, 0xee, 0x67, + 0xbd, 0x3b, 0xbf, 0x83, 0x74, 0xad, 0x64, 0x6d, 0xe6, 0xa5, 0xb9, 0x6e, 0x0b, 0x59, 0x29, 0xd2, + 0x7e, 0x6e, 0x6b, 0xe3, 0x77, 0x42, 0x04, 0x81, 0x67, 0xa5, 0x11, 0xe1, 0x8d, 0x68, 0xdb, 0xc9, + 0x16, 0xa2, 0x8c, 0xde, 0x37, 0xe4, 0x3c, 0x72, 0x88, 0x1c, 0xd5, 0x5b, 0x25, 0x89, 0xb3, 0x19, + 0x4b, 0x4f, 0xb2, 0x9d, 0xc4, 0x18, 0x26, 0xa4, 0x57, 0xd6, 0x28, 0xed, 0x79, 0x3f, 0xa0, 0x6f, + 0x8d, 0x57, 0x30, 0x95, 0x46, 0x7b, 0xd2, 0xfe, 0xd5, 0x37, 0x96, 0xf8, 0x20, 0xf0, 0xd3, 0xae, + 0xf7, 0xdc, 0x58, 0x42, 0x84, 0x61, 0x61, 0x56, 0x0d, 0x1f, 0xce, 0x58, 0x3a, 0xcd, 0x42, 0x9d, + 0x5c, 0xc2, 0x24, 0x23, 0x67, 0x8d, 0x76, 0x7b, 0xce, 0x7e, 0xf0, 0x17, 0x88, 0x96, 0xe4, 0x5c, + 0x5e, 0x12, 0x9e, 0xc3, 0xc8, 0x1b, 0xab, 0x64, 0xb7, 0x55, 0x2b, 0xfe, 0xcc, 0xed, 0x1f, 0x9f, + 0x3b, 0xd8, 0xfb, 0xde, 0x7e, 0x30, 0x18, 0x2d, 0xbf, 0x02, 0xc0, 0x7b, 0x18, 0x2e, 0xf2, 0xaa, + 0x42, 0x2e, 0x7e, 0x65, 0x22, 0xba, 0x40, 0xe2, 0x8b, 0x03, 0xa4, 0x5d, 0x39, 0xe9, 0xe1, 0x02, + 0xc6, 0x4f, 0xbe, 0xa6, 0x7c, 0xfd, 0x4f, 0x83, 0x94, 0xdd, 0x30, 0x7c, 0x80, 0xe8, 0x71, 0x53, + 0x54, 0xca, 0xbd, 0x1d, 0x70, 0xe9, 0xfe, 0x1f, 0x1f, 0x25, 0x49, 0xaf, 0x18, 0x87, 0xb3, 0xde, + 0x7d, 0x06, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x63, 0x94, 0x1a, 0x02, 0x02, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// MicroClient is the client API for Micro service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type MicroClient interface { + // Call allows a single request to be made + Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) + // Stream is a bidirectional stream + Stream(ctx context.Context, opts ...grpc.CallOption) (Micro_StreamClient, error) + // Publish publishes a message and returns an empty Message + Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) +} + +type microClient struct { + cc *grpc.ClientConn +} + +func NewMicroClient(cc *grpc.ClientConn) MicroClient { + return µClient{cc} +} + +func (c *microClient) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/go.micro.client.Micro/Call", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *microClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Micro_StreamClient, error) { + stream, err := c.cc.NewStream(ctx, &_Micro_serviceDesc.Streams[0], "/go.micro.client.Micro/Stream", opts...) + if err != nil { + return nil, err + } + x := µStreamClient{stream} + return x, nil +} + +type Micro_StreamClient interface { + Send(*Request) error + Recv() (*Response, error) + grpc.ClientStream +} + +type microStreamClient struct { + grpc.ClientStream +} + +func (x *microStreamClient) Send(m *Request) error { + return x.ClientStream.SendMsg(m) +} + +func (x *microStreamClient) Recv() (*Response, error) { + m := new(Response) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *microClient) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) { + out := new(Message) + err := c.cc.Invoke(ctx, "/go.micro.client.Micro/Publish", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// MicroServer is the server API for Micro service. +type MicroServer interface { + // Call allows a single request to be made + Call(context.Context, *Request) (*Response, error) + // Stream is a bidirectional stream + Stream(Micro_StreamServer) error + // Publish publishes a message and returns an empty Message + Publish(context.Context, *Message) (*Message, error) +} + +func RegisterMicroServer(s *grpc.Server, srv MicroServer) { + s.RegisterService(&_Micro_serviceDesc, srv) +} + +func _Micro_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MicroServer).Call(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.client.Micro/Call", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MicroServer).Call(ctx, req.(*Request)) + } + return interceptor(ctx, in, info, handler) +} + +func _Micro_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MicroServer).Stream(µStreamServer{stream}) +} + +type Micro_StreamServer interface { + Send(*Response) error + Recv() (*Request, error) + grpc.ServerStream +} + +type microStreamServer struct { + grpc.ServerStream +} + +func (x *microStreamServer) Send(m *Response) error { + return x.ServerStream.SendMsg(m) +} + +func (x *microStreamServer) Recv() (*Request, error) { + m := new(Request) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Micro_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Message) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MicroServer).Publish(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.client.Micro/Publish", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MicroServer).Publish(ctx, req.(*Message)) + } + return interceptor(ctx, in, info, handler) +} + +var _Micro_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.client.Micro", + HandlerType: (*MicroServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Call", + Handler: _Micro_Call_Handler, + }, + { + MethodName: "Publish", + Handler: _Micro_Publish_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Stream", + Handler: _Micro_Stream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "micro/go-micro/client/proto/client.proto", +} diff --git a/client/proto/client.proto b/client/proto/client.proto new file mode 100644 index 00000000..b855fa22 --- /dev/null +++ b/client/proto/client.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package go.micro.client; + +// Micro is the micro client interface +service Micro { + // Call allows a single request to be made + rpc Call(Request) returns (Response) {}; + // Stream is a bidirectional stream + rpc Stream(stream Request) returns (stream Response) {}; + // Publish publishes a message and returns an empty Message + rpc Publish(Message) returns (Message) {}; +} + +message Request { + string service = 1; + string endpoint = 2; + string content_type = 3; + bytes body = 4; +} + +message Response { + bytes body = 1; +} + +message Message { + string topic = 1; + string content_type = 2; + bytes body = 3; +} diff --git a/proxy/grpc/grpc.go b/proxy/grpc/grpc.go index 89f71f8b..afb2453c 100644 --- a/proxy/grpc/grpc.go +++ b/proxy/grpc/grpc.go @@ -9,7 +9,6 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/client/grpc" "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/proxy" "github.com/micro/go-micro/server" @@ -86,14 +85,8 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server } } - // read initial request - body, err := req.Read() - if err != nil { - return err - } - // create new request with raw bytes body - creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType())) + creq := p.Client.NewRequest(service, endpoint, nil, client.WithContentType(req.ContentType())) // create new stream stream, err := p.Client.Stream(ctx, creq, opts...) diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index ea239d45..cca97c90 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -39,6 +39,7 @@ func readLoop(r server.Request, s client.Stream) error { if err == io.EOF { return nil } + if err != nil { return err } @@ -50,6 +51,7 @@ func readLoop(r server.Request, s client.Stream) error { Header: hdr, Body: body, } + // write the raw request err = req.Codec().Write(msg, nil) if err == io.EOF { diff --git a/server/grpc/codec.go b/server/grpc/codec.go index 50a96ef4..cfa97f92 100644 --- a/server/grpc/codec.go +++ b/server/grpc/codec.go @@ -3,17 +3,22 @@ package grpc import ( "encoding/json" "fmt" + "strings" "github.com/golang/protobuf/proto" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/protorpc" + "google.golang.org/grpc" "google.golang.org/grpc/encoding" + "google.golang.org/grpc/metadata" ) type jsonCodec struct{} type bytesCodec struct{} type protoCodec struct{} +type wrapCodec struct{ encoding.Codec } var ( defaultGRPCCodecs = map[string]encoding.Codec{ @@ -36,6 +41,27 @@ var ( } ) +func (w wrapCodec) String() string { + return w.Codec.Name() +} + +func (w wrapCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*bytes.Frame) + if ok { + return b.Data, nil + } + return w.Codec.Marshal(v) +} + +func (w wrapCodec) Unmarshal(data []byte, v interface{}) error { + b, ok := v.(*bytes.Frame) + if ok { + b.Data = data + return nil + } + return w.Codec.Unmarshal(data, v) +} + func (protoCodec) Marshal(v interface{}) ([]byte, error) { return proto.Marshal(v.(proto.Message)) } @@ -80,3 +106,61 @@ func (bytesCodec) Unmarshal(data []byte, v interface{}) error { func (bytesCodec) Name() string { return "bytes" } + +type grpcCodec struct { + // headers + id string + target string + method string + endpoint string + + s grpc.ServerStream + c encoding.Codec +} + +func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + md, _ := metadata.FromIncomingContext(g.s.Context()) + if m == nil { + m = new(codec.Message) + } + if m.Header == nil { + m.Header = make(map[string]string) + } + for k, v := range md { + m.Header[k] = strings.Join(v, ",") + } + m.Id = g.id + m.Target = g.target + m.Method = g.method + m.Endpoint = g.endpoint + return nil +} + +func (g *grpcCodec) ReadBody(v interface{}) error { + // caller has requested a frame + if f, ok := v.(*bytes.Frame); ok { + return g.s.RecvMsg(f) + } + return g.s.RecvMsg(v) +} + +func (g *grpcCodec) Write(m *codec.Message, v interface{}) error { + // if we don't have a body + if v != nil { + b, err := g.c.Marshal(v) + if err != nil { + return err + } + m.Body = b + } + // write the body using the framing codec + return g.s.SendMsg(&bytes.Frame{m.Body}) +} + +func (g *grpcCodec) Close() error { + return nil +} + +func (g *grpcCodec) String() string { + return g.c.Name() +} diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index bde2e4c6..2f6e1d40 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -56,8 +56,9 @@ type grpcServer struct { } func init() { - encoding.RegisterCodec(jsonCodec{}) - encoding.RegisterCodec(bytesCodec{}) + encoding.RegisterCodec(wrapCodec{protoCodec{}}) + encoding.RegisterCodec(wrapCodec{jsonCodec{}}) + encoding.RegisterCodec(wrapCodec{bytesCodec{}}) } func newGRPCServer(opts ...server.Option) server.Server { @@ -211,14 +212,30 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { // process via router if g.opts.Router != nil { - // create a client.Request - request := &rpcRequest{ - service: g.opts.Name, - contentType: ct, - method: fmt.Sprintf("%s.%s", serviceName, methodName), + cc, err := g.newGRPCCodec(ct) + if err != nil { + return errors.InternalServerError("go.micro.server", err.Error()) + } + codec := &grpcCodec{ + method: fmt.Sprintf("%s.%s", serviceName, methodName), + endpoint: fmt.Sprintf("%s.%s", serviceName, methodName), + target: g.opts.Name, + s: stream, + c: cc, } - response := &rpcResponse{} + // create a client.Request + request := &rpcRequest{ + service: mgrpc.ServiceFromMethod(fullMethod), + contentType: ct, + method: fmt.Sprintf("%s.%s", serviceName, methodName), + codec: codec, + } + + response := &rpcResponse{ + header: make(map[string]string), + codec: codec, + } // create a wrapped function handler := func(ctx context.Context, req server.Request, rsp interface{}) error { diff --git a/server/grpc/request.go b/server/grpc/request.go index 951c1a19..617b9a7d 100644 --- a/server/grpc/request.go +++ b/server/grpc/request.go @@ -2,6 +2,7 @@ package grpc import ( "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/bytes" ) type rpcRequest struct { @@ -46,7 +47,11 @@ func (r *rpcRequest) Header() map[string]string { } func (r *rpcRequest) Read() ([]byte, error) { - return r.body, nil + f := &bytes.Frame{} + if err := r.codec.ReadBody(f); err != nil { + return nil, err + } + return f.Data, nil } func (r *rpcRequest) Stream() bool { diff --git a/server/grpc/response.go b/server/grpc/response.go index 451b1f4e..f13ad89c 100644 --- a/server/grpc/response.go +++ b/server/grpc/response.go @@ -1,15 +1,11 @@ package grpc import ( - "net/http" - "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/transport" ) type rpcResponse struct { header map[string]string - socket transport.Socket codec codec.Codec } @@ -24,12 +20,8 @@ func (r *rpcResponse) WriteHeader(hdr map[string]string) { } func (r *rpcResponse) Write(b []byte) error { - if _, ok := r.header["Content-Type"]; !ok { - r.header["Content-Type"] = http.DetectContentType(b) - } - - return r.socket.Send(&transport.Message{ + return r.codec.Write(&codec.Message{ Header: r.header, Body: b, - }) + }, nil) } diff --git a/util/grpc/grpc.go b/util/grpc/grpc.go index b06a0673..59c5a969 100644 --- a/util/grpc/grpc.go +++ b/util/grpc/grpc.go @@ -38,3 +38,20 @@ func ServiceMethod(m string) (string, string, error) { return parts[0], parts[1], nil } + +// ServiceFromMethod returns the service +// /service.Foo/Bar => service +func ServiceFromMethod(m string) string { + if len(m) == 0 { + return m + } + if m[0] != '/' { + return m + } + parts := strings.Split(m, "/") + if len(parts) < 3 { + return m + } + parts = strings.Split(parts[1], ".") + return strings.Join(parts[:len(parts)-1], ".") +}