From 9d514f0e607c54718b78e2d1ec27edfbe156f72a Mon Sep 17 00:00:00 2001 From: Asim Date: Thu, 21 May 2015 19:24:57 +0100 Subject: [PATCH] further transport rework --- client/client.go | 26 +++++--- client/rpc_client.go | 33 +++++++--- examples/service_client.go | 3 + server/rpc_server.go | 32 ++++++---- server/server.go | 25 ++++++-- transport/http_transport.go | 18 +++--- transport/nats_transport.go | 46 ++++++++------ transport/rabbitmq_channel.go | 5 +- transport/rabbitmq_connection.go | 40 ++++++------ transport/rabbitmq_transport.go | 106 ++++++++++++++++--------------- transport/transport.go | 12 ++-- 11 files changed, 209 insertions(+), 137 deletions(-) diff --git a/client/client.go b/client/client.go index bb405325..8d145a30 100644 --- a/client/client.go +++ b/client/client.go @@ -1,33 +1,43 @@ package client +import ( + "github.com/myodc/go-micro/transport" +) + type Client interface { NewRequest(string, string, interface{}) Request NewProtoRequest(string, string, interface{}) Request NewJsonRequest(string, string, interface{}) Request - Call(interface{}, interface{}) error - CallRemote(string, string, interface{}, interface{}) error + Call(Request, interface{}) error + CallRemote(string, string, Request, interface{}) error } +type options struct { + transport transport.Transport +} + +type Options func(*options) + var ( - client = NewRpcClient() + DefaultClient Client = NewRpcClient() ) func Call(request Request, response interface{}) error { - return client.Call(request, response) + return DefaultClient.Call(request, response) } func CallRemote(address, path string, request Request, response interface{}) error { - return client.CallRemote(address, path, request, response) + return DefaultClient.CallRemote(address, path, request, response) } func NewRequest(service, method string, request interface{}) Request { - return client.NewRequest(service, method, request) + return DefaultClient.NewRequest(service, method, request) } func NewProtoRequest(service, method string, request interface{}) Request { - return client.NewProtoRequest(service, method, request) + return DefaultClient.NewProtoRequest(service, method, request) } func NewJsonRequest(service, method string, request interface{}) Request { - return client.NewJsonRequest(service, method, request) + return DefaultClient.NewJsonRequest(service, method, request) } diff --git a/client/rpc_client.go b/client/rpc_client.go index 524b5049..9d65ebde 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -22,7 +22,7 @@ type headerRoundTripper struct { } type RpcClient struct { - transport transport.Transport + opts options } func init() { @@ -86,7 +86,7 @@ func (r *RpcClient) call(address, path string, request Request, response interfa msg.Header["Content-Type"] = request.ContentType() - c, err := r.transport.NewClient(request.Service(), address) + c, err := r.opts.transport.NewClient(address) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -146,24 +146,39 @@ func (r *RpcClient) Call(request Request, response interface{}) error { n := rand.Int() % len(service.Nodes()) node := service.Nodes()[n] - address := fmt.Sprintf("%s:%d", node.Address(), node.Port()) - return r.call(address, "/_rpc", request, response) + + address := node.Address() + if node.Port() > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port()) + } + + return r.call(address, "", request, response) } -func (r *RpcClient) NewRequest(service, method string, request interface{}) *RpcRequest { +func (r *RpcClient) NewRequest(service, method string, request interface{}) Request { return r.NewProtoRequest(service, method, request) } -func (r *RpcClient) NewProtoRequest(service, method string, request interface{}) *RpcRequest { +func (r *RpcClient) NewProtoRequest(service, method string, request interface{}) Request { return newRpcRequest(service, method, request, "application/octet-stream") } -func (r *RpcClient) NewJsonRequest(service, method string, request interface{}) *RpcRequest { +func (r *RpcClient) NewJsonRequest(service, method string, request interface{}) Request { return newRpcRequest(service, method, request, "application/json") } -func NewRpcClient() *RpcClient { +func NewRpcClient(opt ...Options) *RpcClient { + var opts options + + for _, o := range opt { + o(&opts) + } + + if opts.transport == nil { + opts.transport = transport.DefaultTransport + } + return &RpcClient{ - transport: transport.DefaultTransport, + opts: opts, } } diff --git a/examples/service_client.go b/examples/service_client.go index a31c5e33..5821bb7b 100644 --- a/examples/service_client.go +++ b/examples/service_client.go @@ -4,10 +4,13 @@ import ( "fmt" "github.com/myodc/go-micro/client" + "github.com/myodc/go-micro/cmd" example "github.com/myodc/go-micro/template/proto/example" ) func main() { + cmd.Init() + client.DefaultClient = client.NewRpcClient() // Create new request to service go.micro.service.go-template, method Example.Call req := client.NewRequest("go.micro.service.template", "Example.Call", &example.Request{ Name: "John", diff --git a/server/rpc_server.go b/server/rpc_server.go index 9e65b9c2..7e57b0ca 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -14,11 +14,11 @@ import ( ) type RpcServer struct { - mtx sync.RWMutex - address string - transport transport.Transport - rpc *rpc.Server - exit chan chan error + mtx sync.RWMutex + address string + opts options + rpc *rpc.Server + exit chan chan error } var ( @@ -96,7 +96,7 @@ func (s *RpcServer) Register(r Receiver) error { func (s *RpcServer) Start() error { registerHealthChecker(http.DefaultServeMux) - ts, err := s.transport.NewServer(Name, s.address) + ts, err := s.opts.transport.NewServer(s.address) if err != nil { return err } @@ -123,11 +123,21 @@ func (s *RpcServer) Stop() error { return <-ch } -func NewRpcServer(address string) *RpcServer { +func NewRpcServer(address string, opt ...Options) *RpcServer { + var opts options + + for _, o := range opt { + o(&opts) + } + + if opts.transport == nil { + opts.transport = transport.DefaultTransport + } + return &RpcServer{ - address: address, - transport: transport.DefaultTransport, - rpc: rpc.NewServer(), - exit: make(chan chan error), + opts: opts, + address: address, + rpc: rpc.NewServer(), + exit: make(chan chan error), } } diff --git a/server/server.go b/server/server.go index dc1463f0..fd973ea1 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,7 @@ import ( "code.google.com/p/go-uuid/uuid" log "github.com/golang/glog" "github.com/myodc/go-micro/registry" + "github.com/myodc/go-micro/transport" ) type Server interface { @@ -22,6 +23,12 @@ type Server interface { Stop() error } +type options struct { + transport transport.Transport +} + +type Options func(*options) + var ( Address string Name string @@ -65,16 +72,26 @@ func Run() error { } // parse address for host, port + var host string + var port int parts := strings.Split(DefaultServer.Address(), ":") - host := strings.Join(parts[:len(parts)-1], ":") - port, _ := strconv.Atoi(parts[len(parts)-1]) + if len(parts) > 1 { + host = strings.Join(parts[:len(parts)-1], ":") + port, _ = strconv.Atoi(parts[len(parts)-1]) + } else { + host = parts[0] + } // register service node := registry.NewNode(Id, host, port) service := registry.NewService(Name, node) - log.Infof("Registering %s", node.Id()) - registry.Register(service) + log.Infof("Registering node: %s", node.Id()) + + err := registry.Register(service) + if err != nil { + log.Fatal("Failed to register: %v", err) + } ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) diff --git a/transport/http_transport.go b/transport/http_transport.go index abd0a82b..c63d854a 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -17,8 +17,8 @@ type HttpTransport struct { } type HttpTransportClient struct { - client *http.Client - target string + ht *HttpTransport + addr string } type HttpTransportSocket struct { @@ -52,16 +52,16 @@ func (h *HttpTransportClient) Send(m *Message) (*Message, error) { Method: "POST", URL: &url.URL{ Scheme: "http", - Host: h.target, + Host: h.addr, // Path: path, }, Header: header, Body: buf, ContentLength: int64(reqB.Len()), - Host: h.target, + Host: h.addr, } - rsp, err := h.client.Do(hreq) + rsp, err := h.ht.client.Do(hreq) if err != nil { return nil, err } @@ -144,14 +144,14 @@ func (h *HttpTransportServer) Serve(fn func(Socket)) error { return srv.Serve(h.listener) } -func (h *HttpTransport) NewClient(name, addr string) (Client, error) { +func (h *HttpTransport) NewClient(addr string) (Client, error) { return &HttpTransportClient{ - client: h.client, - target: addr, + ht: h, + addr: addr, }, nil } -func (h *HttpTransport) NewServer(name, addr string) (Server, error) { +func (h *HttpTransport) NewServer(addr string) (Server, error) { l, err := net.Listen("tcp", addr) if err != nil { return nil, err diff --git a/transport/nats_transport.go b/transport/nats_transport.go index 49a896b7..fe0fb188 100644 --- a/transport/nats_transport.go +++ b/transport/nats_transport.go @@ -9,11 +9,13 @@ import ( "github.com/apcera/nats" ) -type NatsTransport struct{} +type NatsTransport struct { + addrs []string +} type NatsTransportClient struct { - conn *nats.Conn - target string + conn *nats.Conn + addr string } type NatsTransportSocket struct { @@ -24,7 +26,7 @@ type NatsTransportSocket struct { type NatsTransportServer struct { conn *nats.Conn - name string + addr string exit chan bool } @@ -34,7 +36,7 @@ func (n *NatsTransportClient) Send(m *Message) (*Message, error) { return nil, err } - rsp, err := n.conn.Request(n.target, b, time.Second*10) + rsp, err := n.conn.Request(n.addr, b, time.Second*10) if err != nil { return nil, err } @@ -72,7 +74,7 @@ func (n *NatsTransportSocket) Write(b []byte) error { } func (n *NatsTransportServer) Addr() string { - return "127.0.0.1:4222" + return n.addr } func (n *NatsTransportServer) Close() error { @@ -82,7 +84,7 @@ func (n *NatsTransportServer) Close() error { } func (n *NatsTransportServer) Serve(fn func(Socket)) error { - s, err := n.conn.QueueSubscribe(n.name, "queue:"+n.name, func(m *nats.Msg) { + s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) { buf := bytes.NewBuffer(nil) hdr := make(map[string]string) @@ -113,39 +115,45 @@ func (n *NatsTransportServer) Serve(fn func(Socket)) error { return s.Unsubscribe() } -func (n *NatsTransport) NewClient(name, addr string) (Client, error) { - if !strings.HasPrefix(addr, "nats://") { - addr = nats.DefaultURL +func (n *NatsTransport) NewClient(addr string) (Client, error) { + cAddr := nats.DefaultURL + + if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") { + cAddr = n.addrs[0] } - c, err := nats.Connect(addr) + c, err := nats.Connect(cAddr) if err != nil { return nil, err } return &NatsTransportClient{ - conn: c, - target: name, + conn: c, + addr: addr, }, nil } -func (n *NatsTransport) NewServer(name, addr string) (Server, error) { - if !strings.HasPrefix(addr, "nats://") { - addr = nats.DefaultURL +func (n *NatsTransport) NewServer(addr string) (Server, error) { + cAddr := nats.DefaultURL + + if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") { + cAddr = n.addrs[0] } - c, err := nats.Connect(addr) + c, err := nats.Connect(cAddr) if err != nil { return nil, err } return &NatsTransportServer{ - name: name, + addr: nats.NewInbox(), conn: c, exit: make(chan bool, 1), }, nil } func NewNatsTransport(addrs []string) *NatsTransport { - return &NatsTransport{} + return &NatsTransport{ + addrs: addrs, + } } diff --git a/transport/rabbitmq_channel.go b/transport/rabbitmq_channel.go index fcd9fdb7..0c215cd9 100644 --- a/transport/rabbitmq_channel.go +++ b/transport/rabbitmq_channel.go @@ -2,7 +2,6 @@ package transport // // All credit to Mondo -// https://github.com/mondough/typhon // import ( @@ -50,11 +49,11 @@ func (r *RabbitChannel) Close() error { return r.channel.Close() } -func (r *RabbitChannel) Publish(exchange, routingKey string, message amqp.Publishing) error { +func (r *RabbitChannel) Publish(exchange, key string, message amqp.Publishing) error { if r.channel == nil { return errors.New("Channel is nil") } - return r.channel.Publish(exchange, routingKey, false, false, message) + return r.channel.Publish(exchange, key, false, false, message) } func (r *RabbitChannel) DeclareExchange(exchange string) error { diff --git a/transport/rabbitmq_connection.go b/transport/rabbitmq_connection.go index 0e8424d3..a4353b01 100644 --- a/transport/rabbitmq_connection.go +++ b/transport/rabbitmq_connection.go @@ -2,10 +2,10 @@ package transport // // All credit to Mondo -// https://github.com/mondough/typhon // import ( + "strings" "sync" "time" @@ -27,9 +27,9 @@ type RabbitConnection struct { connected bool - mtx sync.Mutex - closeChan chan struct{} - closed bool + mtx sync.Mutex + close chan bool + closed bool } func (r *RabbitConnection) Init() chan bool { @@ -53,7 +53,7 @@ func (r *RabbitConnection) Connect(connected chan bool) { case <-notifyClose: // Spin around and reconnect r.connected = false - case <-r.closeChan: + case <-r.close: // Shut down connection if err := r.Connection.Close(); err != nil { } @@ -75,7 +75,7 @@ func (r *RabbitConnection) Close() { return } - close(r.closeChan) + close(r.close) r.closed = true } @@ -97,23 +97,23 @@ func (r *RabbitConnection) tryToConnect() error { return nil } -func (r *RabbitConnection) Consume(serverName string) (<-chan amqp.Delivery, error) { +func (r *RabbitConnection) Consume(queue string) (<-chan amqp.Delivery, error) { consumerChannel, err := NewRabbitChannel(r.Connection) if err != nil { return nil, err } - err = consumerChannel.DeclareQueue(serverName) + err = consumerChannel.DeclareQueue(queue) if err != nil { return nil, err } - deliveries, err := consumerChannel.ConsumeQueue(serverName) + deliveries, err := consumerChannel.ConsumeQueue(queue) if err != nil { return nil, err } - err = consumerChannel.BindQueue(serverName, r.exchange) + err = consumerChannel.BindQueue(queue, r.exchange) if err != nil { return nil, err } @@ -121,12 +121,16 @@ func (r *RabbitConnection) Consume(serverName string) (<-chan amqp.Delivery, err return deliveries, nil } -func (r *RabbitConnection) Publish(exchange, routingKey string, msg amqp.Publishing) error { - return r.ExchangeChannel.Publish(exchange, routingKey, msg) +func (r *RabbitConnection) Publish(exchange, key string, msg amqp.Publishing) error { + return r.ExchangeChannel.Publish(exchange, key, msg) } -func NewRabbitConnection(exchange, url string) *RabbitConnection { - if len(url) == 0 { +func NewRabbitConnection(exchange string, urls []string) *RabbitConnection { + var url string + + if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") { + url = urls[0] + } else { url = DefaultRabbitURL } @@ -135,9 +139,9 @@ func NewRabbitConnection(exchange, url string) *RabbitConnection { } return &RabbitConnection{ - exchange: DefaultExchange, - url: DefaultRabbitURL, - notify: make(chan bool, 1), - closeChan: make(chan struct{}), + exchange: exchange, + url: url, + notify: make(chan bool, 1), + close: make(chan bool), } } diff --git a/transport/rabbitmq_transport.go b/transport/rabbitmq_transport.go index 78a85e1c..c33392cb 100644 --- a/transport/rabbitmq_transport.go +++ b/transport/rabbitmq_transport.go @@ -12,13 +12,14 @@ import ( ) type RabbitMQTransport struct { - conn *RabbitConnection + conn *RabbitConnection + addrs []string } type RabbitMQTransportClient struct { once sync.Once - conn *RabbitConnection - target string + rt *RabbitMQTransport + addr string replyTo string sync.Mutex @@ -33,27 +34,27 @@ type RabbitMQTransportSocket struct { type RabbitMQTransportServer struct { conn *RabbitConnection - name string + addr string } -func (h *RabbitMQTransportClient) init() { - <-h.conn.Init() - if err := h.conn.Channel.DeclareReplyQueue(h.replyTo); err != nil { +func (r *RabbitMQTransportClient) init() { + <-r.rt.conn.Init() + if err := r.rt.conn.Channel.DeclareReplyQueue(r.replyTo); err != nil { return } - deliveries, err := h.conn.Channel.ConsumeQueue(h.replyTo) + deliveries, err := r.rt.conn.Channel.ConsumeQueue(r.replyTo) if err != nil { return } go func() { for delivery := range deliveries { - go h.handle(delivery) + go r.handle(delivery) } }() } -func (h *RabbitMQTransportClient) handle(delivery amqp.Delivery) { - ch := h.getReq(delivery.CorrelationId) +func (r *RabbitMQTransportClient) handle(delivery amqp.Delivery) { + ch := r.getReq(delivery.CorrelationId) if ch == nil { return } @@ -63,28 +64,28 @@ func (h *RabbitMQTransportClient) handle(delivery amqp.Delivery) { } } -func (h *RabbitMQTransportClient) putReq(id string) chan amqp.Delivery { - h.Lock() +func (r *RabbitMQTransportClient) putReq(id string) chan amqp.Delivery { + r.Lock() ch := make(chan amqp.Delivery, 1) - h.inflight[id] = ch - h.Unlock() + r.inflight[id] = ch + r.Unlock() return ch } -func (h *RabbitMQTransportClient) getReq(id string) chan amqp.Delivery { - h.Lock() - defer h.Unlock() - if ch, ok := h.inflight[id]; ok { - delete(h.inflight, id) +func (r *RabbitMQTransportClient) getReq(id string) chan amqp.Delivery { + r.Lock() + defer r.Unlock() + if ch, ok := r.inflight[id]; ok { + delete(r.inflight, id) return ch } return nil } -func (h *RabbitMQTransportClient) Send(m *Message) (*Message, error) { - h.once.Do(h.init) +func (r *RabbitMQTransportClient) Send(m *Message) (*Message, error) { + r.once.Do(r.init) - if !h.conn.IsConnected() { + if !r.rt.conn.IsConnected() { return nil, errors.New("Not connected to AMQP") } @@ -93,7 +94,7 @@ func (h *RabbitMQTransportClient) Send(m *Message) (*Message, error) { return nil, err } - replyChan := h.putReq(id.String()) + replyChan := r.putReq(id.String()) headers := amqp.Table{} @@ -105,12 +106,12 @@ func (h *RabbitMQTransportClient) Send(m *Message) (*Message, error) { CorrelationId: id.String(), Timestamp: time.Now().UTC(), Body: m.Body, - ReplyTo: h.replyTo, + ReplyTo: r.replyTo, Headers: headers, } - if err := h.conn.Publish("micro", h.target, message); err != nil { - h.getReq(id.String()) + if err := r.rt.conn.Publish("micro", r.addr, message); err != nil { + r.getReq(id.String()) return nil, err } @@ -131,44 +132,43 @@ func (h *RabbitMQTransportClient) Send(m *Message) (*Message, error) { } } -func (h *RabbitMQTransportClient) Close() error { - h.conn.Close() +func (r *RabbitMQTransportClient) Close() error { return nil } -func (h *RabbitMQTransportSocket) Recv() (*Message, error) { +func (r *RabbitMQTransportSocket) Recv() (*Message, error) { m := &Message{ Header: make(map[string]string), - Body: h.d.Body, + Body: r.d.Body, } - for k, v := range h.d.Headers { + for k, v := range r.d.Headers { m.Header[k] = fmt.Sprintf("%v", v) } return m, nil } -func (h *RabbitMQTransportSocket) WriteHeader(k string, v string) { - h.hdr[k] = v +func (r *RabbitMQTransportSocket) WriteHeader(k string, v string) { + r.hdr[k] = v } -func (h *RabbitMQTransportSocket) Write(b []byte) error { - _, err := h.buf.Write(b) +func (r *RabbitMQTransportSocket) Write(b []byte) error { + _, err := r.buf.Write(b) return err } -func (h *RabbitMQTransportServer) Addr() string { - return h.conn.Connection.LocalAddr().String() +func (r *RabbitMQTransportServer) Addr() string { + return r.addr } -func (h *RabbitMQTransportServer) Close() error { - h.conn.Close() +func (r *RabbitMQTransportServer) Close() error { + r.conn.Close() return nil } -func (h *RabbitMQTransportServer) Serve(fn func(Socket)) error { - deliveries, err := h.conn.Consume(h.name) +func (r *RabbitMQTransportServer) Serve(fn func(Socket)) error { + deliveries, err := r.conn.Consume(r.addr) if err != nil { return err } @@ -190,7 +190,7 @@ func (h *RabbitMQTransportServer) Serve(fn func(Socket)) error { Headers: headers, } - h.conn.Publish("", d.ReplyTo, msg) + r.conn.Publish("", d.ReplyTo, msg) buf.Reset() } @@ -201,32 +201,38 @@ func (h *RabbitMQTransportServer) Serve(fn func(Socket)) error { return nil } -func (h *RabbitMQTransport) NewClient(name, addr string) (Client, error) { +func (r *RabbitMQTransport) NewClient(addr string) (Client, error) { id, err := uuid.NewV4() if err != nil { return nil, err } return &RabbitMQTransportClient{ - conn: h.conn, - target: name, + rt: r, + addr: addr, inflight: make(map[string]chan amqp.Delivery), replyTo: fmt.Sprintf("replyTo-%s", id.String()), }, nil } -func (h *RabbitMQTransport) NewServer(name, addr string) (Server, error) { - conn := NewRabbitConnection("", "") +func (r *RabbitMQTransport) NewServer(addr string) (Server, error) { + id, err := uuid.NewV4() + if err != nil { + return nil, err + } + + conn := NewRabbitConnection("", r.addrs) <-conn.Init() return &RabbitMQTransportServer{ - name: name, + addr: id.String(), conn: conn, }, nil } func NewRabbitMQTransport(addrs []string) *RabbitMQTransport { return &RabbitMQTransport{ - conn: NewRabbitConnection("", ""), + conn: NewRabbitConnection("", addrs), + addrs: addrs, } } diff --git a/transport/transport.go b/transport/transport.go index 6c349aee..2c4de967 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -23,18 +23,18 @@ type Server interface { } type Transport interface { - NewClient(name, addr string) (Client, error) - NewServer(name, addr string) (Server, error) + NewClient(addr string) (Client, error) + NewServer(addr string) (Server, error) } var ( DefaultTransport Transport = NewHttpTransport([]string{}) ) -func NewClient(name, addr string) (Client, error) { - return DefaultTransport.NewClient(name, addr) +func NewClient(addr string) (Client, error) { + return DefaultTransport.NewClient(addr) } -func NewServer(name, addr string) (Server, error) { - return DefaultTransport.NewServer(name, addr) +func NewServer(addr string) (Server, error) { + return DefaultTransport.NewServer(addr) }