use send/recv semantics

This commit is contained in:
Asim 2015-05-21 21:08:19 +01:00
parent 4f909d0be4
commit c9df1cf7d2
8 changed files with 166 additions and 168 deletions

View File

@ -86,7 +86,7 @@ func (r *RpcClient) call(address, path string, request Request, response interfa
msg.Header["Content-Type"] = request.ContentType() msg.Header["Content-Type"] = request.ContentType()
c, err := r.opts.transport.NewClient(address) c, err := r.opts.transport.Dial(address)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }

View File

@ -26,10 +26,10 @@ var (
RpcPath = "/_rpc" RpcPath = "/_rpc"
) )
func (s *RpcServer) serve(sock transport.Socket) { func (s *RpcServer) accept(sock transport.Socket) {
// serveCtx := getServerContext(req) // serveCtx := getServerContext(req)
msg, err := sock.Recv() var msg transport.Message
if err != nil { if err := sock.Recv(&msg); err != nil {
return return
} }
@ -55,13 +55,16 @@ func (s *RpcServer) serve(sock transport.Socket) {
} }
//ctx := newContext(&ctx{}, serveCtx) //ctx := newContext(&ctx{}, serveCtx)
err = s.rpc.ServeRequestWithContext(context.Background(), cc) if err := s.rpc.ServeRequestWithContext(context.Background(), cc); err != nil {
if err != nil {
return return
} }
sock.WriteHeader("Content-Type", msg.Header["Content-Type"]) sock.Send(&transport.Message{
sock.Write(rsp.Bytes()) Header: map[string]string{
"Content-Type": msg.Header["Content-Type"],
},
Body: rsp.Bytes(),
})
} }
func (s *RpcServer) Address() string { func (s *RpcServer) Address() string {
@ -96,7 +99,7 @@ func (s *RpcServer) Register(r Receiver) error {
func (s *RpcServer) Start() error { func (s *RpcServer) Start() error {
registerHealthChecker(http.DefaultServeMux) registerHealthChecker(http.DefaultServeMux)
ts, err := s.opts.transport.NewServer(s.address) ts, err := s.opts.transport.Listen(s.address)
if err != nil { if err != nil {
return err return err
} }
@ -107,7 +110,7 @@ func (s *RpcServer) Start() error {
s.address = ts.Addr() s.address = ts.Addr()
s.mtx.RUnlock() s.mtx.RUnlock()
go ts.Serve(s.serve) go ts.Accept(s.accept)
go func() { go func() {
ch := <-s.exit ch := <-s.exit

View File

@ -2,6 +2,7 @@ package transport
import ( import (
"bytes" "bytes"
"errors"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
@ -26,7 +27,7 @@ type HttpTransportSocket struct {
w http.ResponseWriter w http.ResponseWriter
} }
type HttpTransportServer struct { type HttpTransportListener struct {
listener net.Listener listener net.Listener
} }
@ -92,46 +93,55 @@ func (h *HttpTransportClient) Close() error {
return nil return nil
} }
func (h *HttpTransportSocket) Recv() (*Message, error) { func (h *HttpTransportSocket) Recv(m *Message) error {
b, err := ioutil.ReadAll(h.r.Body) if m == nil {
if err != nil { return errors.New("message passed in is nil")
return nil, err
} }
m := &Message{ b, err := ioutil.ReadAll(h.r.Body)
if err != nil {
return err
}
mr := &Message{
Header: make(map[string]string), Header: make(map[string]string),
Body: b, Body: b,
} }
for k, v := range h.r.Header { for k, v := range h.r.Header {
if len(v) > 0 { if len(v) > 0 {
m.Header[k] = v[0] mr.Header[k] = v[0]
} else { } else {
m.Header[k] = "" mr.Header[k] = ""
} }
} }
return m, nil *m = *mr
return nil
} }
func (h *HttpTransportSocket) WriteHeader(k string, v string) { func (h *HttpTransportSocket) Send(m *Message) error {
h.w.Header().Set(k, v) for k, v := range m.Header {
} h.w.Header().Set(k, v)
}
func (h *HttpTransportSocket) Write(b []byte) error { _, err := h.w.Write(m.Body)
_, err := h.w.Write(b)
return err return err
} }
func (h *HttpTransportServer) Addr() string { func (h *HttpTransportSocket) Close() error {
return nil
}
func (h *HttpTransportListener) Addr() string {
return h.listener.Addr().String() return h.listener.Addr().String()
} }
func (h *HttpTransportServer) Close() error { func (h *HttpTransportListener) Close() error {
return h.listener.Close() return h.listener.Close()
} }
func (h *HttpTransportServer) Serve(fn func(Socket)) error { func (h *HttpTransportListener) Accept(fn func(Socket)) error {
srv := &http.Server{ srv := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fn(&HttpTransportSocket{ fn(&HttpTransportSocket{
@ -144,20 +154,20 @@ func (h *HttpTransportServer) Serve(fn func(Socket)) error {
return srv.Serve(h.listener) return srv.Serve(h.listener)
} }
func (h *HttpTransport) NewClient(addr string) (Client, error) { func (h *HttpTransport) Dial(addr string) (Client, error) {
return &HttpTransportClient{ return &HttpTransportClient{
ht: h, ht: h,
addr: addr, addr: addr,
}, nil }, nil
} }
func (h *HttpTransport) NewServer(addr string) (Server, error) { func (h *HttpTransport) Listen(addr string) (Listener, error) {
l, err := net.Listen("tcp", addr) l, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &HttpTransportServer{ return &HttpTransportListener{
listener: l, listener: l,
}, nil }, nil
} }

View File

@ -1,8 +1,8 @@
package transport package transport
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors"
"strings" "strings"
"time" "time"
@ -19,12 +19,11 @@ type NatsTransportClient struct {
} }
type NatsTransportSocket struct { type NatsTransportSocket struct {
m *nats.Msg conn *nats.Conn
hdr map[string]string m *nats.Msg
buf *bytes.Buffer
} }
type NatsTransportServer struct { type NatsTransportListener struct {
conn *nats.Conn conn *nats.Conn
addr string addr string
exit chan bool exit chan bool
@ -54,58 +53,45 @@ func (n *NatsTransportClient) Close() error {
return nil return nil
} }
func (n *NatsTransportSocket) Recv() (*Message, error) { func (n *NatsTransportSocket) Recv(m *Message) error {
var m *Message if m == nil {
return errors.New("message passed in is nil")
if err := json.Unmarshal(n.m.Data, &m); err != nil {
return nil, err
} }
return m, nil if err := json.Unmarshal(n.m.Data, &m); err != nil {
return err
}
return nil
} }
func (n *NatsTransportSocket) WriteHeader(k string, v string) { func (n *NatsTransportSocket) Send(m *Message) error {
n.hdr[k] = v b, err := json.Marshal(m)
if err != nil {
return err
}
return n.conn.Publish(n.m.Reply, b)
} }
func (n *NatsTransportSocket) Write(b []byte) error { func (n *NatsTransportSocket) Close() error {
_, err := n.buf.Write(b) return nil
return err
} }
func (n *NatsTransportServer) Addr() string { func (n *NatsTransportListener) Addr() string {
return n.addr return n.addr
} }
func (n *NatsTransportServer) Close() error { func (n *NatsTransportListener) Close() error {
n.exit <- true n.exit <- true
n.conn.Close() n.conn.Close()
return nil return nil
} }
func (n *NatsTransportServer) Serve(fn func(Socket)) error { func (n *NatsTransportListener) Accept(fn func(Socket)) error {
s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) { s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) {
buf := bytes.NewBuffer(nil)
hdr := make(map[string]string)
fn(&NatsTransportSocket{ fn(&NatsTransportSocket{
m: m, conn: n.conn,
hdr: hdr, m: m,
buf: buf,
}) })
mrsp := &Message{
Header: hdr,
Body: buf.Bytes(),
}
b, err := json.Marshal(mrsp)
if err != nil {
return
}
n.conn.Publish(m.Reply, b)
buf.Reset()
}) })
if err != nil { if err != nil {
return err return err
@ -115,7 +101,7 @@ func (n *NatsTransportServer) Serve(fn func(Socket)) error {
return s.Unsubscribe() return s.Unsubscribe()
} }
func (n *NatsTransport) NewClient(addr string) (Client, error) { func (n *NatsTransport) Dial(addr string) (Client, error) {
cAddr := nats.DefaultURL cAddr := nats.DefaultURL
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") { if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
@ -133,7 +119,7 @@ func (n *NatsTransport) NewClient(addr string) (Client, error) {
}, nil }, nil
} }
func (n *NatsTransport) NewServer(addr string) (Server, error) { func (n *NatsTransport) Listen(addr string) (Listener, error) {
cAddr := nats.DefaultURL cAddr := nats.DefaultURL
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") { if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
@ -145,7 +131,7 @@ func (n *NatsTransport) NewServer(addr string) (Server, error) {
return nil, err return nil, err
} }
return &NatsTransportServer{ return &NatsTransportListener{
addr: nats.NewInbox(), addr: nats.NewInbox(),
conn: c, conn: c,
exit: make(chan bool, 1), exit: make(chan bool, 1),

View File

@ -11,18 +11,18 @@ import (
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
type RabbitChannel struct { type rabbitMQChannel struct {
uuid string uuid string
connection *amqp.Connection connection *amqp.Connection
channel *amqp.Channel channel *amqp.Channel
} }
func NewRabbitChannel(conn *amqp.Connection) (*RabbitChannel, error) { func newRabbitChannel(conn *amqp.Connection) (*rabbitMQChannel, error) {
id, err := uuid.NewV4() id, err := uuid.NewV4()
if err != nil { if err != nil {
return nil, err return nil, err
} }
rabbitCh := &RabbitChannel{ rabbitCh := &rabbitMQChannel{
uuid: id.String(), uuid: id.String(),
connection: conn, connection: conn,
} }
@ -33,7 +33,7 @@ func NewRabbitChannel(conn *amqp.Connection) (*RabbitChannel, error) {
} }
func (r *RabbitChannel) Connect() error { func (r *rabbitMQChannel) Connect() error {
var err error var err error
r.channel, err = r.connection.Channel() r.channel, err = r.connection.Channel()
if err != nil { if err != nil {
@ -42,21 +42,21 @@ func (r *RabbitChannel) Connect() error {
return nil return nil
} }
func (r *RabbitChannel) Close() error { func (r *rabbitMQChannel) Close() error {
if r.channel == nil { if r.channel == nil {
return errors.New("Channel is nil") return errors.New("Channel is nil")
} }
return r.channel.Close() return r.channel.Close()
} }
func (r *RabbitChannel) Publish(exchange, key string, message amqp.Publishing) error { func (r *rabbitMQChannel) Publish(exchange, key string, message amqp.Publishing) error {
if r.channel == nil { if r.channel == nil {
return errors.New("Channel is nil") return errors.New("Channel is nil")
} }
return r.channel.Publish(exchange, key, false, false, message) return r.channel.Publish(exchange, key, false, false, message)
} }
func (r *RabbitChannel) DeclareExchange(exchange string) error { func (r *rabbitMQChannel) DeclareExchange(exchange string) error {
return r.channel.ExchangeDeclare( return r.channel.ExchangeDeclare(
exchange, // name exchange, // name
"topic", // kind "topic", // kind
@ -68,7 +68,7 @@ func (r *RabbitChannel) DeclareExchange(exchange string) error {
) )
} }
func (r *RabbitChannel) DeclareQueue(queue string) error { func (r *rabbitMQChannel) DeclareQueue(queue string) error {
_, err := r.channel.QueueDeclare( _, err := r.channel.QueueDeclare(
queue, // name queue, // name
false, // durable false, // durable
@ -80,7 +80,7 @@ func (r *RabbitChannel) DeclareQueue(queue string) error {
return err return err
} }
func (r *RabbitChannel) DeclareDurableQueue(queue string) error { func (r *rabbitMQChannel) DeclareDurableQueue(queue string) error {
_, err := r.channel.QueueDeclare( _, err := r.channel.QueueDeclare(
queue, // name queue, // name
true, // durable true, // durable
@ -92,7 +92,7 @@ func (r *RabbitChannel) DeclareDurableQueue(queue string) error {
return err return err
} }
func (r *RabbitChannel) DeclareReplyQueue(queue string) error { func (r *rabbitMQChannel) DeclareReplyQueue(queue string) error {
_, err := r.channel.QueueDeclare( _, err := r.channel.QueueDeclare(
queue, // name queue, // name
false, // durable false, // durable
@ -104,7 +104,7 @@ func (r *RabbitChannel) DeclareReplyQueue(queue string) error {
return err return err
} }
func (r *RabbitChannel) ConsumeQueue(queue string) (<-chan amqp.Delivery, error) { func (r *rabbitMQChannel) ConsumeQueue(queue string) (<-chan amqp.Delivery, error) {
return r.channel.Consume( return r.channel.Consume(
queue, // queue queue, // queue
r.uuid, // consumer r.uuid, // consumer
@ -116,7 +116,7 @@ func (r *RabbitChannel) ConsumeQueue(queue string) (<-chan amqp.Delivery, error)
) )
} }
func (r *RabbitChannel) BindQueue(queue, exchange string) error { func (r *rabbitMQChannel) BindQueue(queue, exchange string) error {
return r.channel.QueueBind( return r.channel.QueueBind(
queue, // name queue, // name
queue, // key queue, // key

View File

@ -17,10 +17,10 @@ var (
DefaultRabbitURL = "amqp://guest:guest@127.0.0.1:5672" DefaultRabbitURL = "amqp://guest:guest@127.0.0.1:5672"
) )
type RabbitConnection struct { type rabbitMQConn struct {
Connection *amqp.Connection Connection *amqp.Connection
Channel *RabbitChannel Channel *rabbitMQChannel
ExchangeChannel *RabbitChannel ExchangeChannel *rabbitMQChannel
notify chan bool notify chan bool
exchange string exchange string
url string url string
@ -32,12 +32,33 @@ type RabbitConnection struct {
closed bool closed bool
} }
func (r *RabbitConnection) Init() chan bool { func newRabbitMQConn(exchange string, urls []string) *rabbitMQConn {
var url string
if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") {
url = urls[0]
} else {
url = DefaultRabbitURL
}
if len(exchange) == 0 {
exchange = DefaultExchange
}
return &rabbitMQConn{
exchange: exchange,
url: url,
notify: make(chan bool, 1),
close: make(chan bool),
}
}
func (r *rabbitMQConn) Init() chan bool {
go r.Connect(r.notify) go r.Connect(r.notify)
return r.notify return r.notify
} }
func (r *RabbitConnection) Connect(connected chan bool) { func (r *rabbitMQConn) Connect(connected chan bool) {
for { for {
if err := r.tryToConnect(); err != nil { if err := r.tryToConnect(); err != nil {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@ -63,11 +84,11 @@ func (r *RabbitConnection) Connect(connected chan bool) {
} }
} }
func (r *RabbitConnection) IsConnected() bool { func (r *rabbitMQConn) IsConnected() bool {
return r.connected return r.connected
} }
func (r *RabbitConnection) Close() { func (r *rabbitMQConn) Close() {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
@ -79,26 +100,26 @@ func (r *RabbitConnection) Close() {
r.closed = true r.closed = true
} }
func (r *RabbitConnection) tryToConnect() error { func (r *rabbitMQConn) tryToConnect() error {
var err error var err error
r.Connection, err = amqp.Dial(r.url) r.Connection, err = amqp.Dial(r.url)
if err != nil { if err != nil {
return err return err
} }
r.Channel, err = NewRabbitChannel(r.Connection) r.Channel, err = newRabbitChannel(r.Connection)
if err != nil { if err != nil {
return err return err
} }
r.Channel.DeclareExchange(r.exchange) r.Channel.DeclareExchange(r.exchange)
r.ExchangeChannel, err = NewRabbitChannel(r.Connection) r.ExchangeChannel, err = newRabbitChannel(r.Connection)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
func (r *RabbitConnection) Consume(queue string) (<-chan amqp.Delivery, error) { func (r *rabbitMQConn) Consume(queue string) (<-chan amqp.Delivery, error) {
consumerChannel, err := NewRabbitChannel(r.Connection) consumerChannel, err := newRabbitChannel(r.Connection)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -121,27 +142,6 @@ func (r *RabbitConnection) Consume(queue string) (<-chan amqp.Delivery, error) {
return deliveries, nil return deliveries, nil
} }
func (r *RabbitConnection) Publish(exchange, key string, msg amqp.Publishing) error { func (r *rabbitMQConn) Publish(exchange, key string, msg amqp.Publishing) error {
return r.ExchangeChannel.Publish(exchange, key, msg) return r.ExchangeChannel.Publish(exchange, key, msg)
} }
func NewRabbitConnection(exchange string, urls []string) *RabbitConnection {
var url string
if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") {
url = urls[0]
} else {
url = DefaultRabbitURL
}
if len(exchange) == 0 {
exchange = DefaultExchange
}
return &RabbitConnection{
exchange: exchange,
url: url,
notify: make(chan bool, 1),
close: make(chan bool),
}
}

View File

@ -1,7 +1,6 @@
package transport package transport
import ( import (
"bytes"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -12,7 +11,7 @@ import (
) )
type RabbitMQTransport struct { type RabbitMQTransport struct {
conn *RabbitConnection conn *rabbitMQConn
addrs []string addrs []string
} }
@ -27,13 +26,12 @@ type RabbitMQTransportClient struct {
} }
type RabbitMQTransportSocket struct { type RabbitMQTransportSocket struct {
d *amqp.Delivery conn *rabbitMQConn
hdr amqp.Table d *amqp.Delivery
buf *bytes.Buffer
} }
type RabbitMQTransportServer struct { type RabbitMQTransportListener struct {
conn *RabbitConnection conn *rabbitMQConn
addr string addr string
} }
@ -136,62 +134,63 @@ func (r *RabbitMQTransportClient) Close() error {
return nil return nil
} }
func (r *RabbitMQTransportSocket) Recv() (*Message, error) { func (r *RabbitMQTransportSocket) Recv(m *Message) error {
m := &Message{ if m == nil {
return errors.New("message passed in is nil")
}
mr := &Message{
Header: make(map[string]string), Header: make(map[string]string),
Body: r.d.Body, Body: r.d.Body,
} }
for k, v := range r.d.Headers { for k, v := range r.d.Headers {
m.Header[k] = fmt.Sprintf("%v", v) mr.Header[k] = fmt.Sprintf("%v", v)
} }
return m, nil *m = *mr
return nil
} }
func (r *RabbitMQTransportSocket) WriteHeader(k string, v string) { func (r *RabbitMQTransportSocket) Send(m *Message) error {
r.hdr[k] = v msg := amqp.Publishing{
CorrelationId: r.d.CorrelationId,
Timestamp: time.Now().UTC(),
Body: m.Body,
Headers: amqp.Table{},
}
for k, v := range m.Header {
msg.Headers[k] = v
}
return r.conn.Publish("", r.d.ReplyTo, msg)
} }
func (r *RabbitMQTransportSocket) Write(b []byte) error { func (r *RabbitMQTransportSocket) Close() error {
_, err := r.buf.Write(b) return nil
return err
} }
func (r *RabbitMQTransportServer) Addr() string { func (r *RabbitMQTransportListener) Addr() string {
return r.addr return r.addr
} }
func (r *RabbitMQTransportServer) Close() error { func (r *RabbitMQTransportListener) Close() error {
r.conn.Close() r.conn.Close()
return nil return nil
} }
func (r *RabbitMQTransportServer) Serve(fn func(Socket)) error { func (r *RabbitMQTransportListener) Accept(fn func(Socket)) error {
deliveries, err := r.conn.Consume(r.addr) deliveries, err := r.conn.Consume(r.addr)
if err != nil { if err != nil {
return err return err
} }
handler := func(d amqp.Delivery) { handler := func(d amqp.Delivery) {
buf := bytes.NewBuffer(nil)
headers := amqp.Table{}
fn(&RabbitMQTransportSocket{ fn(&RabbitMQTransportSocket{
d: &d, d: &d,
hdr: headers, conn: r.conn,
buf: buf,
}) })
msg := amqp.Publishing{
CorrelationId: d.CorrelationId,
Timestamp: time.Now().UTC(),
Body: buf.Bytes(),
Headers: headers,
}
r.conn.Publish("", d.ReplyTo, msg)
buf.Reset()
} }
for d := range deliveries { for d := range deliveries {
@ -201,7 +200,7 @@ func (r *RabbitMQTransportServer) Serve(fn func(Socket)) error {
return nil return nil
} }
func (r *RabbitMQTransport) NewClient(addr string) (Client, error) { func (r *RabbitMQTransport) Dial(addr string) (Client, error) {
id, err := uuid.NewV4() id, err := uuid.NewV4()
if err != nil { if err != nil {
return nil, err return nil, err
@ -215,16 +214,16 @@ func (r *RabbitMQTransport) NewClient(addr string) (Client, error) {
}, nil }, nil
} }
func (r *RabbitMQTransport) NewServer(addr string) (Server, error) { func (r *RabbitMQTransport) Listen(addr string) (Listener, error) {
id, err := uuid.NewV4() id, err := uuid.NewV4()
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn := NewRabbitConnection("", r.addrs) conn := newRabbitMQConn("", r.addrs)
<-conn.Init() <-conn.Init()
return &RabbitMQTransportServer{ return &RabbitMQTransportListener{
addr: id.String(), addr: id.String(),
conn: conn, conn: conn,
}, nil }, nil
@ -232,7 +231,7 @@ func (r *RabbitMQTransport) NewServer(addr string) (Server, error) {
func NewRabbitMQTransport(addrs []string) *RabbitMQTransport { func NewRabbitMQTransport(addrs []string) *RabbitMQTransport {
return &RabbitMQTransport{ return &RabbitMQTransport{
conn: NewRabbitConnection("", addrs), conn: newRabbitMQConn("", addrs),
addrs: addrs, addrs: addrs,
} }
} }

View File

@ -6,9 +6,9 @@ type Message struct {
} }
type Socket interface { type Socket interface {
Recv() (*Message, error) Recv(*Message) error
WriteHeader(string, string) Send(*Message) error
Write([]byte) error Close() error
} }
type Client interface { type Client interface {
@ -16,25 +16,25 @@ type Client interface {
Close() error Close() error
} }
type Server interface { type Listener interface {
Addr() string Addr() string
Close() error Close() error
Serve(func(Socket)) error Accept(func(Socket)) error
} }
type Transport interface { type Transport interface {
NewClient(addr string) (Client, error) Dial(addr string) (Client, error)
NewServer(addr string) (Server, error) Listen(addr string) (Listener, error)
} }
var ( var (
DefaultTransport Transport = NewHttpTransport([]string{}) DefaultTransport Transport = NewHttpTransport([]string{})
) )
func NewClient(addr string) (Client, error) { func Dial(addr string) (Client, error) {
return DefaultTransport.NewClient(addr) return DefaultTransport.Dial(addr)
} }
func NewServer(addr string) (Server, error) { func Listen(addr string) (Listener, error) {
return DefaultTransport.NewServer(addr) return DefaultTransport.Listen(addr)
} }