rearrange where we account for errors and data sent

This commit is contained in:
Asim Aslam 2019-10-23 16:15:39 +01:00
parent 842fc01568
commit fe180148a1

View File

@ -160,53 +160,11 @@ func (l *link) expiry() {
} }
func (l *link) send(m *transport.Message) error { func (l *link) send(m *transport.Message) error {
dataSent := len(m.Body) if m.Header == nil {
m.Header = make(map[string]string)
// set header length
for k, v := range m.Header {
dataSent += (len(k) + len(v))
} }
// get time now
now := time.Now()
// send the message // send the message
err := l.Socket.Send(m) return l.Socket.Send(m)
l.Lock()
defer l.Unlock()
// there's an error increment the counter and bail
if err != nil {
l.errCount++
return err
}
// reset the counter
l.errCount = 0
// calculate based on data
if dataSent > 0 {
// measure time taken
delta := time.Since(now)
// bit sent
bits := dataSent * 1024
// rate of send in bits per nanosecond
rate := float64(bits) / float64(delta.Nanoseconds())
// default the rate if its zero
if l.rate == 0 {
// rate per second
l.rate = rate * 1e9
} else {
// set new rate per second
l.rate = 0.8*l.rate + 0.2*(rate*1e9)
}
}
return err
} }
// recv a message on the link // recv a message on the link
@ -214,9 +172,6 @@ func (l *link) recv(m *transport.Message) error {
if m.Header == nil { if m.Header == nil {
m.Header = make(map[string]string) m.Header = make(map[string]string)
} }
// TODO measure the receive latency if possible
// receive the transport message // receive the transport message
return l.Socket.Recv(m) return l.Socket.Recv(m)
} }
@ -269,6 +224,10 @@ func (l *link) Send(m *transport.Message) error {
status: make(chan error, 1), status: make(chan error, 1),
} }
// get time now
now := time.Now()
// queue the message
select { select {
case <-l.closed: case <-l.closed:
return io.EOF return io.EOF
@ -276,15 +235,57 @@ func (l *link) Send(m *transport.Message) error {
// in the send queue // in the send queue
} }
// error to use
var err error
// wait for response // wait for response
select { select {
case <-l.closed: case <-l.closed:
return io.EOF return io.EOF
case err := <-p.status: case err = <-p.status:
}
l.Lock()
defer l.Unlock()
// there's an error increment the counter and bail
if err != nil {
l.errCount++
return err return err
} }
// never reached // reset the counter
l.errCount = 0
// calculate the data sent
dataSent := len(m.Body)
// set header length
for k, v := range m.Header {
dataSent += (len(k) + len(v))
}
// calculate based on data
if dataSent > 0 {
// measure time taken
delta := time.Since(now)
// bit sent
bits := dataSent * 1024
// rate of send in bits per nanosecond
rate := float64(bits) / float64(delta.Nanoseconds())
// default the rate if its zero
if l.rate == 0 {
// rate per second
l.rate = rate * 1e9
} else {
// set new rate per second
l.rate = 0.8*l.rate + 0.2*(rate*1e9)
}
}
return nil return nil
} }