add support for streaming requests. cleanup watcher initilisation

This commit is contained in:
Asim
2015-06-01 18:55:27 +01:00
parent fa2c27b64f
commit 09c784d294
25 changed files with 729 additions and 384 deletions

View File

@@ -15,18 +15,21 @@ import (
type rmqtport struct {
conn *rabbitMQConn
addrs []string
}
type rmqtportClient struct {
once sync.Once
rt *rmqtport
addr string
replyTo string
sync.Mutex
inflight map[string]chan amqp.Delivery
}
type rmqtportClient struct {
rt *rmqtport
addr string
corId string
reply chan amqp.Delivery
}
type rmqtportSocket struct {
conn *rabbitMQConn
d *amqp.Delivery
@@ -37,86 +40,34 @@ type rmqtportListener struct {
addr string
}
func (r *rmqtportClient) init() {
<-r.rt.conn.Init()
if err := r.rt.conn.Channel.DeclareReplyQueue(r.replyTo); err != nil {
return
}
deliveries, err := r.rt.conn.Channel.ConsumeQueue(r.replyTo)
if err != nil {
return
}
go func() {
for delivery := range deliveries {
go r.handle(delivery)
}
}()
}
func (r *rmqtportClient) handle(delivery amqp.Delivery) {
ch := r.getReq(delivery.CorrelationId)
if ch == nil {
return
}
select {
case ch <- delivery:
default:
}
}
func (r *rmqtportClient) putReq(id string) chan amqp.Delivery {
r.Lock()
ch := make(chan amqp.Delivery, 1)
r.inflight[id] = ch
r.Unlock()
return ch
}
func (r *rmqtportClient) getReq(id string) chan amqp.Delivery {
r.Lock()
defer r.Unlock()
if ch, ok := r.inflight[id]; ok {
delete(r.inflight, id)
return ch
}
return nil
}
func (r *rmqtportClient) Send(m *transport.Message) (*transport.Message, error) {
r.once.Do(r.init)
func (r *rmqtportClient) Send(m *transport.Message) error {
if !r.rt.conn.IsConnected() {
return nil, errors.New("Not connected to AMQP")
return errors.New("Not connected to AMQP")
}
id, err := uuid.NewV4()
if err != nil {
return nil, err
}
replyChan := r.putReq(id.String())
headers := amqp.Table{}
for k, v := range m.Header {
headers[k] = v
}
message := amqp.Publishing{
CorrelationId: id.String(),
CorrelationId: r.corId,
Timestamp: time.Now().UTC(),
Body: m.Body,
ReplyTo: r.replyTo,
ReplyTo: r.rt.replyTo,
Headers: headers,
}
if err := r.rt.conn.Publish("micro", r.addr, message); err != nil {
r.getReq(id.String())
return nil, err
return err
}
return nil
}
func (r *rmqtportClient) Recv(m *transport.Message) error {
select {
case d := <-replyChan:
case d := <-r.reply:
mr := &transport.Message{
Header: make(map[string]string),
Body: d.Body,
@@ -126,13 +77,15 @@ func (r *rmqtportClient) Send(m *transport.Message) (*transport.Message, error)
mr.Header[k] = fmt.Sprintf("%v", v)
}
return mr, nil
*m = *mr
return nil
case <-time.After(time.Second * 10):
return nil, errors.New("timed out")
return errors.New("timed out")
}
}
func (r *rmqtportClient) Close() error {
r.rt.popReq(r.corId)
return nil
}
@@ -202,17 +155,68 @@ func (r *rmqtportListener) Accept(fn func(transport.Socket)) error {
return nil
}
func (r *rmqtport) Dial(addr string) (transport.Client, error) {
func (r *rmqtport) putReq(id string) chan amqp.Delivery {
r.Lock()
ch := make(chan amqp.Delivery, 1)
r.inflight[id] = ch
r.Unlock()
return ch
}
func (r *rmqtport) getReq(id string) chan amqp.Delivery {
r.Lock()
defer r.Unlock()
if ch, ok := r.inflight[id]; ok {
return ch
}
return nil
}
func (r *rmqtport) popReq(id string) {
r.Lock()
defer r.Unlock()
if _, ok := r.inflight[id]; ok {
delete(r.inflight, id)
}
}
func (r *rmqtport) init() {
<-r.conn.Init()
if err := r.conn.Channel.DeclareReplyQueue(r.replyTo); err != nil {
return
}
deliveries, err := r.conn.Channel.ConsumeQueue(r.replyTo)
if err != nil {
return
}
go func() {
for delivery := range deliveries {
go r.handle(delivery)
}
}()
}
func (r *rmqtport) handle(delivery amqp.Delivery) {
ch := r.getReq(delivery.CorrelationId)
if ch == nil {
return
}
ch <- delivery
}
func (r *rmqtport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
id, err := uuid.NewV4()
if err != nil {
return nil, err
}
r.once.Do(r.init)
return &rmqtportClient{
rt: r,
addr: addr,
inflight: make(map[string]chan amqp.Delivery),
replyTo: fmt.Sprintf("replyTo-%s", id.String()),
rt: r,
addr: addr,
corId: id.String(),
reply: r.putReq(id.String()),
}, nil
}
@@ -232,8 +236,12 @@ func (r *rmqtport) Listen(addr string) (transport.Listener, error) {
}
func NewTransport(addrs []string, opt ...transport.Option) transport.Transport {
id, _ := uuid.NewV4()
return &rmqtport{
conn: newRabbitMQConn("", addrs),
addrs: addrs,
conn: newRabbitMQConn("", addrs),
addrs: addrs,
replyTo: id.String(),
inflight: make(map[string]chan amqp.Delivery),
}
}