We need the right sequence number for the stream

This commit is contained in:
Asim 2016-01-21 01:03:27 +00:00
parent 8724e68ae4
commit 93ea171b31
2 changed files with 6 additions and 4 deletions

View File

@ -30,18 +30,16 @@ func (r *rpcStream) Send(msg interface{}) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
seq := r.seq
r.seq++
resp := response{ resp := response{
ServiceMethod: r.request.Method(), ServiceMethod: r.request.Method(),
Seq: seq, Seq: r.seq,
} }
err := r.codec.WriteResponse(&resp, msg, false) err := r.codec.WriteResponse(&resp, msg, false)
if err != nil { if err != nil {
log.Println("rpc: writing response:", err) log.Println("rpc: writing response:", err)
} }
return err return err
} }
@ -57,6 +55,9 @@ func (r *rpcStream) Recv(msg interface{}) error {
return err return err
} }
// we need to stay upto date with sequence numbers
r.seq = req.Seq
if err := r.codec.ReadRequestBody(msg); err != nil { if err := r.codec.ReadRequestBody(msg); err != nil {
return err return err
} }

View File

@ -277,6 +277,7 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
context: ctx, context: ctx,
codec: codec, codec: codec,
request: r, request: r,
seq: req.Seq,
} }
// Invoke the method, providing a new value for the reply. // Invoke the method, providing a new value for the reply.