Merge pull request #695 from micro/proxy-link

Support multiple clients in the proxy as Links
This commit is contained in:
Asim Aslam 2019-08-23 14:48:49 +01:00 committed by GitHub
commit 1a32e3a11d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 255 additions and 123 deletions

View File

@ -31,37 +31,37 @@ var _ context.Context
var _ client.Option var _ client.Option
var _ server.Option var _ server.Option
// Client API for Micro service // Client API for Client service
type MicroService interface { type ClientService interface {
// Call allows a single request to be made // Call allows a single request to be made
Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error)
// Stream is a bidirectional stream // Stream is a bidirectional stream
Stream(ctx context.Context, opts ...client.CallOption) (Micro_StreamService, error) Stream(ctx context.Context, opts ...client.CallOption) (Client_StreamService, error)
// Publish publishes a message and returns an empty Message // Publish publishes a message and returns an empty Message
Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error)
} }
type microService struct { type clientService struct {
c client.Client c client.Client
name string name string
} }
func NewMicroService(name string, c client.Client) MicroService { func NewClientService(name string, c client.Client) ClientService {
if c == nil { if c == nil {
c = client.NewClient() c = client.NewClient()
} }
if len(name) == 0 { if len(name) == 0 {
name = "go.micro.client" name = "go.micro.client"
} }
return &microService{ return &clientService{
c: c, c: c,
name: name, name: name,
} }
} }
func (c *microService) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) { func (c *clientService) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) {
req := c.c.NewRequest(c.name, "Micro.Call", in) req := c.c.NewRequest(c.name, "Client.Call", in)
out := new(Response) out := new(Response)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -70,16 +70,16 @@ func (c *microService) Call(ctx context.Context, in *Request, opts ...client.Cal
return out, nil return out, nil
} }
func (c *microService) Stream(ctx context.Context, opts ...client.CallOption) (Micro_StreamService, error) { func (c *clientService) Stream(ctx context.Context, opts ...client.CallOption) (Client_StreamService, error) {
req := c.c.NewRequest(c.name, "Micro.Stream", &Request{}) req := c.c.NewRequest(c.name, "Client.Stream", &Request{})
stream, err := c.c.Stream(ctx, req, opts...) stream, err := c.c.Stream(ctx, req, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &microServiceStream{stream}, nil return &clientServiceStream{stream}, nil
} }
type Micro_StreamService interface { type Client_StreamService interface {
SendMsg(interface{}) error SendMsg(interface{}) error
RecvMsg(interface{}) error RecvMsg(interface{}) error
Close() error Close() error
@ -87,27 +87,27 @@ type Micro_StreamService interface {
Recv() (*Response, error) Recv() (*Response, error)
} }
type microServiceStream struct { type clientServiceStream struct {
stream client.Stream stream client.Stream
} }
func (x *microServiceStream) Close() error { func (x *clientServiceStream) Close() error {
return x.stream.Close() return x.stream.Close()
} }
func (x *microServiceStream) SendMsg(m interface{}) error { func (x *clientServiceStream) SendMsg(m interface{}) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *microServiceStream) RecvMsg(m interface{}) error { func (x *clientServiceStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m) return x.stream.Recv(m)
} }
func (x *microServiceStream) Send(m *Request) error { func (x *clientServiceStream) Send(m *Request) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *microServiceStream) Recv() (*Response, error) { func (x *clientServiceStream) Recv() (*Response, error) {
m := new(Response) m := new(Response)
err := x.stream.Recv(m) err := x.stream.Recv(m)
if err != nil { if err != nil {
@ -116,8 +116,8 @@ func (x *microServiceStream) Recv() (*Response, error) {
return m, nil return m, nil
} }
func (c *microService) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) { func (c *clientService) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) {
req := c.c.NewRequest(c.name, "Micro.Publish", in) req := c.c.NewRequest(c.name, "Client.Publish", in)
out := new(Message) out := new(Message)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -126,43 +126,43 @@ func (c *microService) Publish(ctx context.Context, in *Message, opts ...client.
return out, nil return out, nil
} }
// Server API for Micro service // Server API for Client service
type MicroHandler interface { type ClientHandler interface {
// Call allows a single request to be made // Call allows a single request to be made
Call(context.Context, *Request, *Response) error Call(context.Context, *Request, *Response) error
// Stream is a bidirectional stream // Stream is a bidirectional stream
Stream(context.Context, Micro_StreamStream) error Stream(context.Context, Client_StreamStream) error
// Publish publishes a message and returns an empty Message // Publish publishes a message and returns an empty Message
Publish(context.Context, *Message, *Message) error Publish(context.Context, *Message, *Message) error
} }
func RegisterMicroHandler(s server.Server, hdlr MicroHandler, opts ...server.HandlerOption) error { func RegisterClientHandler(s server.Server, hdlr ClientHandler, opts ...server.HandlerOption) error {
type micro interface { type client interface {
Call(ctx context.Context, in *Request, out *Response) error Call(ctx context.Context, in *Request, out *Response) error
Stream(ctx context.Context, stream server.Stream) error Stream(ctx context.Context, stream server.Stream) error
Publish(ctx context.Context, in *Message, out *Message) error Publish(ctx context.Context, in *Message, out *Message) error
} }
type Micro struct { type Client struct {
micro client
} }
h := &microHandler{hdlr} h := &clientHandler{hdlr}
return s.Handle(s.NewHandler(&Micro{h}, opts...)) return s.Handle(s.NewHandler(&Client{h}, opts...))
} }
type microHandler struct { type clientHandler struct {
MicroHandler ClientHandler
} }
func (h *microHandler) Call(ctx context.Context, in *Request, out *Response) error { func (h *clientHandler) Call(ctx context.Context, in *Request, out *Response) error {
return h.MicroHandler.Call(ctx, in, out) return h.ClientHandler.Call(ctx, in, out)
} }
func (h *microHandler) Stream(ctx context.Context, stream server.Stream) error { func (h *clientHandler) Stream(ctx context.Context, stream server.Stream) error {
return h.MicroHandler.Stream(ctx, &microStreamStream{stream}) return h.ClientHandler.Stream(ctx, &clientStreamStream{stream})
} }
type Micro_StreamStream interface { type Client_StreamStream interface {
SendMsg(interface{}) error SendMsg(interface{}) error
RecvMsg(interface{}) error RecvMsg(interface{}) error
Close() error Close() error
@ -170,27 +170,27 @@ type Micro_StreamStream interface {
Recv() (*Request, error) Recv() (*Request, error)
} }
type microStreamStream struct { type clientStreamStream struct {
stream server.Stream stream server.Stream
} }
func (x *microStreamStream) Close() error { func (x *clientStreamStream) Close() error {
return x.stream.Close() return x.stream.Close()
} }
func (x *microStreamStream) SendMsg(m interface{}) error { func (x *clientStreamStream) SendMsg(m interface{}) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *microStreamStream) RecvMsg(m interface{}) error { func (x *clientStreamStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m) return x.stream.Recv(m)
} }
func (x *microStreamStream) Send(m *Response) error { func (x *clientStreamStream) Send(m *Response) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *microStreamStream) Recv() (*Request, error) { func (x *clientStreamStream) Recv() (*Request, error) {
m := new(Request) m := new(Request)
if err := x.stream.Recv(m); err != nil { if err := x.stream.Recv(m); err != nil {
return nil, err return nil, err
@ -198,6 +198,6 @@ func (x *microStreamStream) Recv() (*Request, error) {
return m, nil return m, nil
} }
func (h *microHandler) Publish(ctx context.Context, in *Message, out *Message) error { func (h *clientHandler) Publish(ctx context.Context, in *Message, out *Message) error {
return h.MicroHandler.Publish(ctx, in, out) return h.ClientHandler.Publish(ctx, in, out)
} }

View File

@ -191,23 +191,23 @@ func init() {
var fileDescriptor_7d733ae29171347b = []byte{ var fileDescriptor_7d733ae29171347b = []byte{
// 270 bytes of a gzipped FileDescriptorProto // 270 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0x3f, 0x4f, 0xc3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0x41, 0x4b, 0xc3, 0x40,
0x10, 0xc5, 0xeb, 0xfe, 0x4b, 0x39, 0x2a, 0x21, 0x9d, 0x18, 0x4c, 0x06, 0x54, 0x32, 0x65, 0xc1, 0x10, 0x85, 0xbb, 0x6d, 0x4c, 0xea, 0x58, 0x10, 0x06, 0x0f, 0x6b, 0x0e, 0x52, 0x73, 0xca, 0xc5,
0x45, 0x30, 0x23, 0x86, 0xce, 0x95, 0x50, 0x40, 0xac, 0x28, 0x71, 0x4f, 0xc1, 0x52, 0x6a, 0x9b, 0x54, 0xf4, 0x2c, 0x1e, 0x72, 0x16, 0x24, 0x8a, 0x57, 0x49, 0xb6, 0x43, 0x5c, 0x48, 0x77, 0xd7,
0xd8, 0xad, 0x94, 0xef, 0xc8, 0x87, 0x42, 0x38, 0x29, 0x45, 0xd0, 0x2e, 0x6c, 0xf7, 0xee, 0x67, 0xec, 0xb6, 0x90, 0x1f, 0xe9, 0x7f, 0x12, 0x36, 0xa9, 0x15, 0x6d, 0x2f, 0xbd, 0xcd, 0x9b, 0x6f,
0xbd, 0x3b, 0xbf, 0x83, 0x74, 0xad, 0x64, 0x6d, 0xe6, 0xa5, 0xb9, 0x6e, 0x0b, 0x59, 0x29, 0xd2, 0x79, 0x33, 0xfb, 0x06, 0xd2, 0x95, 0x14, 0xad, 0x5e, 0xd4, 0xfa, 0xa6, 0x2f, 0x44, 0x23, 0x49,
0x7e, 0x6e, 0x6b, 0xe3, 0x77, 0x42, 0x04, 0x81, 0x67, 0xa5, 0x11, 0xe1, 0x8d, 0x68, 0xdb, 0xc9, 0xb9, 0x85, 0x69, 0xb5, 0xdb, 0x8a, 0xcc, 0x0b, 0x3c, 0xaf, 0x75, 0xe6, 0xdf, 0x64, 0x7d, 0x3b,
0x16, 0xa2, 0x8c, 0xde, 0x37, 0xe4, 0x3c, 0x72, 0x88, 0x1c, 0xd5, 0x5b, 0x25, 0x89, 0xb3, 0x19, 0xd9, 0x40, 0x54, 0xd0, 0xe7, 0x9a, 0xac, 0x43, 0x0e, 0x91, 0xa5, 0x76, 0x23, 0x05, 0x71, 0x36,
0x4b, 0x4f, 0xb2, 0x9d, 0xc4, 0x18, 0x26, 0xa4, 0x57, 0xd6, 0x28, 0xed, 0x79, 0x3f, 0xa0, 0x6f, 0x67, 0xe9, 0x69, 0xb1, 0x95, 0x18, 0xc3, 0x94, 0xd4, 0xd2, 0x68, 0xa9, 0x1c, 0x1f, 0x7b, 0xf4,
0x8d, 0x57, 0x30, 0x95, 0x46, 0x7b, 0xd2, 0xfe, 0xd5, 0x37, 0x96, 0xf8, 0x20, 0xf0, 0xd3, 0xae, 0xa3, 0xf1, 0x1a, 0x66, 0x42, 0x2b, 0x47, 0xca, 0xbd, 0xbb, 0xce, 0x10, 0x9f, 0x78, 0x7e, 0x36,
0xf7, 0xdc, 0x58, 0x42, 0x84, 0x61, 0x61, 0x56, 0x0d, 0x1f, 0xce, 0x58, 0x3a, 0xcd, 0x42, 0x9d, 0xf4, 0x5e, 0x3b, 0x43, 0x88, 0x10, 0x54, 0x7a, 0xd9, 0xf1, 0x60, 0xce, 0xd2, 0x59, 0xe1, 0xeb,
0x5c, 0xc2, 0x24, 0x23, 0x67, 0x8d, 0x76, 0x7b, 0xce, 0x7e, 0xf0, 0x17, 0x88, 0x96, 0xe4, 0x5c, 0xe4, 0x0a, 0xa6, 0x05, 0x59, 0xa3, 0x95, 0xdd, 0x71, 0xf6, 0x8b, 0xbf, 0x41, 0xf4, 0x44, 0xd6,
0x5e, 0x12, 0x9e, 0xc3, 0xc8, 0x1b, 0xab, 0x64, 0xb7, 0x55, 0x2b, 0xfe, 0xcc, 0xed, 0x1f, 0x9f, 0x96, 0x35, 0xe1, 0x05, 0x9c, 0x38, 0x6d, 0xa4, 0x18, 0xb6, 0xea, 0xc5, 0xbf, 0xb9, 0xe3, 0xc3,
0x3b, 0xd8, 0xfb, 0xde, 0x7e, 0x30, 0x18, 0x2d, 0xbf, 0x02, 0xc0, 0x7b, 0x18, 0x2e, 0xf2, 0xaa, 0x73, 0x27, 0x3b, 0xdf, 0xbb, 0x2f, 0x06, 0x61, 0xee, 0xbf, 0x8e, 0x0f, 0x10, 0xe4, 0x65, 0xd3,
0x42, 0x2e, 0x7e, 0x65, 0x22, 0xba, 0x40, 0xe2, 0x8b, 0x03, 0xa4, 0x5d, 0x39, 0xe9, 0xe1, 0x02, 0x20, 0xcf, 0xfe, 0x84, 0x92, 0x0d, 0x89, 0xc4, 0x97, 0x7b, 0x48, 0xbf, 0x73, 0x32, 0xc2, 0x1c,
0xc6, 0x4f, 0xbe, 0xa6, 0x7c, 0xfd, 0x4f, 0x83, 0x94, 0xdd, 0x30, 0x7c, 0x80, 0xe8, 0x71, 0x53, 0xc2, 0x17, 0xd7, 0x52, 0xb9, 0x3a, 0xd2, 0x20, 0x65, 0xb7, 0x0c, 0x1f, 0x21, 0x7a, 0x5e, 0x57,
0x54, 0xca, 0xbd, 0x1d, 0x70, 0xe9, 0xfe, 0x1f, 0x1f, 0x25, 0x49, 0xaf, 0x18, 0x87, 0xb3, 0xde, 0x8d, 0xb4, 0x1f, 0x7b, 0x5c, 0x86, 0x00, 0xe2, 0x83, 0x24, 0x19, 0x55, 0xa1, 0xbf, 0xeb, 0xfd,
0x7d, 0x06, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x63, 0x94, 0x1a, 0x02, 0x02, 0x00, 0x00, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x76, 0x1f, 0x51, 0x03, 0x02, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -218,59 +218,59 @@ var _ grpc.ClientConn
// is compatible with the grpc package it is being compiled against. // is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4 const _ = grpc.SupportPackageIsVersion4
// MicroClient is the client API for Micro service. // ClientClient is the client API for Client service.
// //
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type MicroClient interface { type ClientClient interface {
// Call allows a single request to be made // Call allows a single request to be made
Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
// Stream is a bidirectional stream // Stream is a bidirectional stream
Stream(ctx context.Context, opts ...grpc.CallOption) (Micro_StreamClient, error) Stream(ctx context.Context, opts ...grpc.CallOption) (Client_StreamClient, error)
// Publish publishes a message and returns an empty Message // Publish publishes a message and returns an empty Message
Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error)
} }
type microClient struct { type clientClient struct {
cc *grpc.ClientConn cc *grpc.ClientConn
} }
func NewMicroClient(cc *grpc.ClientConn) MicroClient { func NewClientClient(cc *grpc.ClientConn) ClientClient {
return &microClient{cc} return &clientClient{cc}
} }
func (c *microClient) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) { func (c *clientClient) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
out := new(Response) out := new(Response)
err := c.cc.Invoke(ctx, "/go.micro.client.Micro/Call", in, out, opts...) err := c.cc.Invoke(ctx, "/go.micro.client.Client/Call", in, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return out, nil return out, nil
} }
func (c *microClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Micro_StreamClient, error) { func (c *clientClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Client_StreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Micro_serviceDesc.Streams[0], "/go.micro.client.Micro/Stream", opts...) stream, err := c.cc.NewStream(ctx, &_Client_serviceDesc.Streams[0], "/go.micro.client.Client/Stream", opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
x := &microStreamClient{stream} x := &clientStreamClient{stream}
return x, nil return x, nil
} }
type Micro_StreamClient interface { type Client_StreamClient interface {
Send(*Request) error Send(*Request) error
Recv() (*Response, error) Recv() (*Response, error)
grpc.ClientStream grpc.ClientStream
} }
type microStreamClient struct { type clientStreamClient struct {
grpc.ClientStream grpc.ClientStream
} }
func (x *microStreamClient) Send(m *Request) error { func (x *clientStreamClient) Send(m *Request) error {
return x.ClientStream.SendMsg(m) return x.ClientStream.SendMsg(m)
} }
func (x *microStreamClient) Recv() (*Response, error) { func (x *clientStreamClient) Recv() (*Response, error) {
m := new(Response) m := new(Response)
if err := x.ClientStream.RecvMsg(m); err != nil { if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err return nil, err
@ -278,66 +278,66 @@ func (x *microStreamClient) Recv() (*Response, error) {
return m, nil return m, nil
} }
func (c *microClient) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) { func (c *clientClient) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) {
out := new(Message) out := new(Message)
err := c.cc.Invoke(ctx, "/go.micro.client.Micro/Publish", in, out, opts...) err := c.cc.Invoke(ctx, "/go.micro.client.Client/Publish", in, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return out, nil return out, nil
} }
// MicroServer is the server API for Micro service. // ClientServer is the server API for Client service.
type MicroServer interface { type ClientServer interface {
// Call allows a single request to be made // Call allows a single request to be made
Call(context.Context, *Request) (*Response, error) Call(context.Context, *Request) (*Response, error)
// Stream is a bidirectional stream // Stream is a bidirectional stream
Stream(Micro_StreamServer) error Stream(Client_StreamServer) error
// Publish publishes a message and returns an empty Message // Publish publishes a message and returns an empty Message
Publish(context.Context, *Message) (*Message, error) Publish(context.Context, *Message) (*Message, error)
} }
func RegisterMicroServer(s *grpc.Server, srv MicroServer) { func RegisterClientServer(s *grpc.Server, srv ClientServer) {
s.RegisterService(&_Micro_serviceDesc, srv) s.RegisterService(&_Client_serviceDesc, srv)
} }
func _Micro_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Client_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Request) in := new(Request)
if err := dec(in); err != nil { if err := dec(in); err != nil {
return nil, err return nil, err
} }
if interceptor == nil { if interceptor == nil {
return srv.(MicroServer).Call(ctx, in) return srv.(ClientServer).Call(ctx, in)
} }
info := &grpc.UnaryServerInfo{ info := &grpc.UnaryServerInfo{
Server: srv, Server: srv,
FullMethod: "/go.micro.client.Micro/Call", FullMethod: "/go.micro.client.Client/Call",
} }
handler := func(ctx context.Context, req interface{}) (interface{}, error) { handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MicroServer).Call(ctx, req.(*Request)) return srv.(ClientServer).Call(ctx, req.(*Request))
} }
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Micro_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { func _Client_Stream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(MicroServer).Stream(&microStreamServer{stream}) return srv.(ClientServer).Stream(&clientStreamServer{stream})
} }
type Micro_StreamServer interface { type Client_StreamServer interface {
Send(*Response) error Send(*Response) error
Recv() (*Request, error) Recv() (*Request, error)
grpc.ServerStream grpc.ServerStream
} }
type microStreamServer struct { type clientStreamServer struct {
grpc.ServerStream grpc.ServerStream
} }
func (x *microStreamServer) Send(m *Response) error { func (x *clientStreamServer) Send(m *Response) error {
return x.ServerStream.SendMsg(m) return x.ServerStream.SendMsg(m)
} }
func (x *microStreamServer) Recv() (*Request, error) { func (x *clientStreamServer) Recv() (*Request, error) {
m := new(Request) m := new(Request)
if err := x.ServerStream.RecvMsg(m); err != nil { if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err return nil, err
@ -345,41 +345,41 @@ func (x *microStreamServer) Recv() (*Request, error) {
return m, nil return m, nil
} }
func _Micro_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Client_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Message) in := new(Message)
if err := dec(in); err != nil { if err := dec(in); err != nil {
return nil, err return nil, err
} }
if interceptor == nil { if interceptor == nil {
return srv.(MicroServer).Publish(ctx, in) return srv.(ClientServer).Publish(ctx, in)
} }
info := &grpc.UnaryServerInfo{ info := &grpc.UnaryServerInfo{
Server: srv, Server: srv,
FullMethod: "/go.micro.client.Micro/Publish", FullMethod: "/go.micro.client.Client/Publish",
} }
handler := func(ctx context.Context, req interface{}) (interface{}, error) { handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MicroServer).Publish(ctx, req.(*Message)) return srv.(ClientServer).Publish(ctx, req.(*Message))
} }
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
var _Micro_serviceDesc = grpc.ServiceDesc{ var _Client_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.client.Micro", ServiceName: "go.micro.client.Client",
HandlerType: (*MicroServer)(nil), HandlerType: (*ClientServer)(nil),
Methods: []grpc.MethodDesc{ Methods: []grpc.MethodDesc{
{ {
MethodName: "Call", MethodName: "Call",
Handler: _Micro_Call_Handler, Handler: _Client_Call_Handler,
}, },
{ {
MethodName: "Publish", MethodName: "Publish",
Handler: _Micro_Publish_Handler, Handler: _Client_Publish_Handler,
}, },
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {
StreamName: "Stream", StreamName: "Stream",
Handler: _Micro_Stream_Handler, Handler: _Client_Stream_Handler,
ServerStreams: true, ServerStreams: true,
ClientStreams: true, ClientStreams: true,
}, },

View File

@ -2,8 +2,8 @@ syntax = "proto3";
package go.micro.client; package go.micro.client;
// Micro is the micro client interface // Client is the micro client interface
service Micro { service Client {
// Call allows a single request to be made // Call allows a single request to be made
rpc Call(Request) returns (Response) {}; rpc Call(Request) returns (Response) {};
// Stream is a bidirectional stream // Stream is a bidirectional stream

View File

@ -10,6 +10,7 @@ import (
"github.com/micro/go-micro/client/grpc" "github.com/micro/go-micro/client/grpc"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
) )
@ -61,6 +62,10 @@ func readLoop(r server.Request, s client.Stream) error {
} }
} }
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy.grpc", "SendRequest is unsupported")
}
// ServeRequest honours the server.Proxy interface // ServeRequest honours the server.Proxy interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// set default client // set default client

View File

@ -10,6 +10,7 @@ import (
"net/url" "net/url"
"path" "path"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy"
@ -44,6 +45,10 @@ func getEndpoint(hdr map[string]string) string {
return "" return ""
} }
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy.http", "SendRequest is unsupported")
}
// ServeRequest honours the server.Router interface // ServeRequest honours the server.Router interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
if p.Endpoint == "" { if p.Endpoint == "" {

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"sort"
"strings" "strings"
"sync" "sync"
@ -12,6 +13,7 @@ import (
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/router" "github.com/micro/go-micro/router"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
@ -26,9 +28,12 @@ type Proxy struct {
// Endpoint specifies the fixed service endpoint to call. // Endpoint specifies the fixed service endpoint to call.
Endpoint string Endpoint string
// The client to use for outbound requests // The client to use for outbound requests in the local network
Client client.Client Client client.Client
// Links are used for outbound requests not in the local network
Links map[string]client.Client
// The router for routes // The router for routes
Router router.Router Router router.Router
@ -76,7 +81,7 @@ func readLoop(r server.Request, s client.Stream) error {
} }
// toNodes returns a list of node addresses from given routes // toNodes returns a list of node addresses from given routes
func toNodes(routes map[uint64]router.Route) []string { func toNodes(routes []router.Route) []string {
var nodes []string var nodes []string
for _, node := range routes { for _, node := range routes {
address := node.Address address := node.Address
@ -88,15 +93,37 @@ func toNodes(routes map[uint64]router.Route) []string {
return nodes return nodes
} }
func (p *Proxy) getRoute(service string) ([]string, error) { func (p *Proxy) getLink(r router.Route) (client.Client, error) {
if r.Link == "local" || len(p.Links) == 0 {
return p.Client, nil
}
l, ok := p.Links[r.Link]
if !ok {
return nil, errors.InternalServerError("go.micro.proxy", "link not found")
}
return l, nil
}
func (p *Proxy) getRoute(service string) ([]router.Route, error) {
toSlice := func(r map[uint64]router.Route) []router.Route {
var routes []router.Route
for _, v := range r {
routes = append(routes, v)
}
// sort the routes in order of metric
sort.Slice(routes, func(i, j int) bool { return routes[i].Metric < routes[j].Metric })
return routes
}
// lookup the route cache first // lookup the route cache first
p.Lock() p.Lock()
routes, ok := p.Routes[service] routes, ok := p.Routes[service]
if ok { if ok {
p.Unlock() p.Unlock()
return toNodes(routes), nil return toSlice(routes), nil
} }
p.Routes[service] = make(map[uint64]router.Route)
p.Unlock() p.Unlock()
// lookup the routes in the router // lookup the routes in the router
@ -113,12 +140,16 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
// update the proxy cache // update the proxy cache
p.Lock() p.Lock()
for _, route := range results { for _, route := range results {
// create if does not exist
if _, ok := p.Routes[service]; !ok {
p.Routes[service] = make(map[uint64]router.Route)
}
p.Routes[service][route.Hash()] = route p.Routes[service][route.Hash()] = route
} }
routes = p.Routes[service] routes = p.Routes[service]
p.Unlock() p.Unlock()
return toNodes(routes), nil return toSlice(routes), nil
} }
// manageRouteCache applies action on a given route to Proxy route cache // manageRouteCache applies action on a given route to Proxy route cache
@ -171,12 +202,27 @@ func (p *Proxy) watchRoutes() {
} }
} }
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy", "SendRequest is unsupported")
}
// ServeRequest honours the server.Router interface // ServeRequest honours the server.Router interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// service name // determine if its local routing
service := req.Service() var local bool
endpoint := req.Endpoint() // address to call
var addresses []string var addresses []string
// routes
var routes []router.Route
// service name to call
service := req.Service()
// endpoint to call
endpoint := req.Endpoint()
// are we network routing or local routing
if len(p.Links) == 0 {
local = true
}
// call a specific backend endpoint either by name or address // call a specific backend endpoint either by name or address
if len(p.Endpoint) > 0 { if len(p.Endpoint) > 0 {
@ -190,7 +236,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
return err return err
} }
// set the address // set the address
addresses = addr routes = addr
// set the name // set the name
service = p.Endpoint service = p.Endpoint
} }
@ -201,16 +247,66 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
if err != nil { if err != nil {
return err return err
} }
addresses = addr routes = addr
} }
// if the address is already set just serve it
// TODO: figure it out if we should know to pick a link
if len(addresses) > 0 {
// serve the normal way
return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, client.WithAddress(addresses...))
}
// there's no links e.g we're local routing then just serve it with addresses
if local {
var opts []client.CallOption var opts []client.CallOption
// set address if available // set address if available via routes or specific endpoint
if len(addresses) > 0 { if len(routes) > 0 {
addresses := toNodes(routes)
opts = append(opts, client.WithAddress(addresses...)) opts = append(opts, client.WithAddress(addresses...))
} }
// serve the normal way
return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...)
}
var gerr error
// we're routing globally with multiple links
// so we need to pick a link per route
for _, route := range routes {
// pick the link or error out
link, err := p.getLink(route)
if err != nil {
// ok let's try again
gerr = err
continue
}
// set the address to call
addresses := toNodes([]router.Route{route})
// do the request with the link
gerr = p.serveRequest(ctx, link, service, endpoint, req, rsp, client.WithAddress(addresses...))
// return on no error since we succeeded
if gerr == nil {
return nil
}
// return where the context deadline was exceeded
if gerr == context.Canceled || gerr == context.DeadlineExceeded {
return err
}
// otherwise attempt to do it all over again
}
// if we got here something went really badly wrong
return gerr
}
func (p *Proxy) serveRequest(ctx context.Context, link client.Client, service, endpoint string, req server.Request, rsp server.Response, opts ...client.CallOption) error {
// read initial request // read initial request
body, err := req.Read() body, err := req.Read()
if err != nil { if err != nil {
@ -218,14 +314,14 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
} }
// create new request with raw bytes body // create new request with raw bytes body
creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType())) creq := link.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType()))
// not a stream so make a client.Call request // not a stream so make a client.Call request
if !req.Stream() { if !req.Stream() {
crsp := new(bytes.Frame) crsp := new(bytes.Frame)
// make a call to the backend // make a call to the backend
if err := p.Client.Call(ctx, creq, crsp, opts...); err != nil { if err := link.Call(ctx, creq, crsp, opts...); err != nil {
return err return err
} }
@ -238,7 +334,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
} }
// create new stream // create new stream
stream, err := p.Client.Stream(ctx, creq, opts...) stream, err := link.Stream(ctx, creq, opts...)
if err != nil { if err != nil {
return err return err
} }
@ -300,6 +396,7 @@ func NewSingleHostProxy(endpoint string) *Proxy {
// NewProxy returns a new proxy which will route based on mucp headers // NewProxy returns a new proxy which will route based on mucp headers
func NewProxy(opts ...options.Option) proxy.Proxy { func NewProxy(opts ...options.Option) proxy.Proxy {
p := new(Proxy) p := new(Proxy)
p.Links = map[string]client.Client{}
p.Options = options.NewOptions(opts...) p.Options = options.NewOptions(opts...)
p.Options.Init(options.WithString("mucp")) p.Options.Init(options.WithString("mucp"))
@ -320,6 +417,12 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
p.Client = client.DefaultClient p.Client = client.DefaultClient
} }
// get client
links, ok := p.Options.Values().Get("proxy.links")
if ok {
p.Links = links.(map[string]client.Client)
}
// get router // get router
r, ok := p.Options.Values().Get("proxy.router") r, ok := p.Options.Values().Get("proxy.router")
if ok { if ok {

View File

@ -13,6 +13,8 @@ import (
// Proxy can be used as a proxy server for go-micro services // Proxy can be used as a proxy server for go-micro services
type Proxy interface { type Proxy interface {
options.Options options.Options
// SendRequest honours the client.Router interface
SendRequest(context.Context, client.Request, client.Response) error
// ServeRequest honours the server.Router interface // ServeRequest honours the server.Router interface
ServeRequest(context.Context, server.Request, server.Response) error ServeRequest(context.Context, server.Request, server.Response) error
} }
@ -35,3 +37,20 @@ func WithClient(c client.Client) options.Option {
func WithRouter(r router.Router) options.Option { func WithRouter(r router.Router) options.Option {
return options.WithValue("proxy.router", r) return options.WithValue("proxy.router", r)
} }
// WithLink sets a link for outbound requests
func WithLink(name string, c client.Client) options.Option {
return func(o *options.Values) error {
var links map[string]client.Client
v, ok := o.Get("proxy.links")
if ok {
links = v.(map[string]client.Client)
} else {
links = map[string]client.Client{}
}
links[name] = c
// save the links
o.Set("proxy.links", links)
return nil
}
}