Extract processIncomingStream func to make requestStream cleaner

This commit is contained in:
Yuriy Taraday 2018-05-06 00:56:21 +04:00
parent f6ceaff755
commit 7442c709b1

62
rpc.go
View File

@ -298,6 +298,27 @@ func (l *Libvirt) request(proc uint32, program uint32, payload []byte) (response
return l.requestStream(proc, program, payload, nil, nil) return l.requestStream(proc, program, payload, nil, nil)
} }
func (l *Libvirt) processIncomingStream(c chan response, inStream io.Writer) (response, error) {
for {
resp, err := l.getResponse(c)
if err != nil {
return resp, err
}
// StatusOK here means end of stream
if resp.Status == StatusOK {
return resp, nil
}
// StatusError is handled in getResponse, so this is StatusContinue
// StatusContinue is valid here only for stream packets
if inStream != nil {
_, err = inStream.Write(resp.Payload)
if err != nil {
return response{}, err
}
}
}
}
func (l *Libvirt) requestStream(proc uint32, program uint32, payload []byte, outStream io.Reader, inStream io.Writer) (response, error) { func (l *Libvirt) requestStream(proc uint32, program uint32, payload []byte, outStream io.Reader, inStream io.Writer) (response, error) {
serial := l.serial() serial := l.serial()
c := make(chan response) c := make(chan response)
@ -315,49 +336,26 @@ func (l *Libvirt) requestStream(proc uint32, program uint32, payload []byte, out
return resp, err return resp, err
} }
var abortOutStream chan bool
var outStreamErr chan error
if outStream != nil { if outStream != nil {
abortOutStream = make(chan bool) abortOutStream := make(chan bool)
outStreamErr = make(chan error) outStreamErr := make(chan error)
go func() { go func() {
outStreamErr <- l.sendStream(serial, proc, program, outStream, abortOutStream) outStreamErr <- l.sendStream(serial, proc, program, outStream, abortOutStream)
}() }()
}
// Even without incoming stream server sends confirmation once all data is received // Even without incoming stream server sends confirmation once all data is received
if outStream != nil || inStream != nil { resp, err = l.processIncomingStream(c, inStream)
for { if err != nil {
resp, err = l.getResponse(c) abortOutStream <- true
if err != nil { return resp, err
if outStream != nil {
abortOutStream <- true
}
return resp, err
}
// Continue is valid here only for stream packets
if resp.Status == StatusContinue {
if inStream != nil {
_, err = inStream.Write(resp.Payload)
if err != nil {
if outStream != nil {
abortOutStream <- true
}
return response{}, err
}
}
} else {
// StatusError is handled in getResponse, so this is StatusOK, end of stream
break
}
} }
}
if outStream != nil {
err = <-outStreamErr err = <-outStreamErr
if err != nil { if err != nil {
return response{}, err return response{}, err
} }
} else if inStream != nil {
return l.processIncomingStream(c, inStream)
} }
return resp, nil return resp, nil