Merge pull request #58 from YorikSar/refactor_request

Move fetching response from channel to request() method
This commit is contained in:
Geoff Hickey 2018-02-23 14:51:59 -05:00 committed by GitHub
commit 101d836616
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 802 additions and 3180 deletions

View File

@ -140,17 +140,11 @@ func (l *Libvirt) {{.Name}}({{range $ix, $arg := .Args}}{{if $ix}}, {{end}}{{.Na
return return
} }
{{end}} {{end}}
var resp <-chan response {{if .RetStruct}} var r response{{end}}
resp, err = l.request({{.Num}}, constants.Program, &buf) {{if .RetStruct}}r{{else}}_{{end}}, err = l.request({{.Num}}, constants.Program, &buf)
if err != nil { if err != nil {
return return
} }
r := <-resp
if r.Status != StatusOK {
err = decodeError(r.Payload)
return
}
{{if .RetStruct}} {{if .RetStruct}}
// Return value unmarshaling // Return value unmarshaling
rdr := bytes.NewReader(r.Payload) rdr := bytes.NewReader(r.Payload)

File diff suppressed because it is too large Load Diff

View File

@ -152,19 +152,12 @@ func (l *Libvirt) Events(dom string) (<-chan DomainEvent, error) {
return nil, err return nil, err
} }
resp, err := l.request(constants.QEMUConnectDomainMonitorEventRegister, constants.ProgramQEMU, &buf) res, err := l.request(constants.QEMUConnectDomainMonitorEventRegister, constants.ProgramQEMU, &buf)
if err != nil { if err != nil {
return nil, err
}
res := <-resp
if res.Status != StatusOK {
err = decodeError(res.Payload)
if err == ErrUnsupported { if err == ErrUnsupported {
return nil, ErrEventsNotSupported return nil, ErrEventsNotSupported
} }
return nil, err
return nil, decodeError(res.Payload)
} }
dec := xdr.NewDecoder(bytes.NewReader(res.Payload)) dec := xdr.NewDecoder(bytes.NewReader(res.Payload))
@ -250,17 +243,11 @@ func (l *Libvirt) Run(dom string, cmd []byte) ([]byte, error) {
return nil, err return nil, err
} }
resp, err := l.request(constants.QEMUDomainMonitor, constants.ProgramQEMU, &buf) res, err := l.request(constants.QEMUDomainMonitor, constants.ProgramQEMU, &buf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
res := <-resp
// check for libvirt errors
if res.Status != StatusOK {
return nil, decodeError(res.Payload)
}
// check for QEMU process errors // check for QEMU process errors
if err = getQEMUError(res); err != nil { if err = getQEMUError(res); err != nil {
return nil, err return nil, err

53
rpc.go
View File

@ -131,41 +131,22 @@ func (l *Libvirt) connect() error {
// libvirt requires that we call auth-list prior to connecting, // libvirt requires that we call auth-list prior to connecting,
// event when no authentication is used. // event when no authentication is used.
resp, err := l.request(constants.ProcAuthList, constants.Program, &buf) _, err = l.request(constants.ProcAuthList, constants.Program, &buf)
if err != nil { if err != nil {
return err return err
} }
r := <-resp _, err = l.request(constants.ProcConnectOpen, constants.Program, &buf)
if r.Status != StatusOK {
return decodeError(r.Payload)
}
resp, err = l.request(constants.ProcConnectOpen, constants.Program, &buf)
if err != nil { if err != nil {
return err return err
} }
r = <-resp
if r.Status != StatusOK {
return decodeError(r.Payload)
}
return nil return nil
} }
func (l *Libvirt) disconnect() error { func (l *Libvirt) disconnect() error {
resp, err := l.request(constants.ProcConnectClose, constants.Program, nil) _, err := l.request(constants.ProcConnectClose, constants.Program, nil)
if err != nil { return err
return err
}
r := <-resp
if r.Status != StatusOK {
return decodeError(r.Payload)
}
return nil
} }
// listen processes incoming data and routes // listen processes incoming data and routes
@ -281,16 +262,11 @@ func (l *Libvirt) removeStream(id uint32) error {
return err return err
} }
resp, err := l.request(constants.QEMUConnectDomainMonitorEventDeregister, constants.ProgramQEMU, &buf) _, err = l.request(constants.QEMUConnectDomainMonitorEventDeregister, constants.ProgramQEMU, &buf)
if err != nil { if err != nil {
return err return err
} }
res := <-resp
if res.Status != StatusOK {
return decodeError(res.Payload)
}
l.em.Lock() l.em.Lock()
delete(l.events, id) delete(l.events, id)
l.em.Unlock() l.em.Unlock()
@ -314,9 +290,9 @@ func (l *Libvirt) deregister(id uint32) {
} }
// request performs a libvirt RPC request. // request performs a libvirt RPC request.
// The returned channel is used by the caller to receive the asynchronous // returns response returned by server.
// call response. The channel is closed once a response has been sent. // if response is not OK, decodes error from it and returns it.
func (l *Libvirt) request(proc uint32, program uint32, payload *bytes.Buffer) (<-chan response, error) { func (l *Libvirt) request(proc uint32, program uint32, payload *bytes.Buffer) (response, error) {
serial := l.serial() serial := l.serial()
c := make(chan response) c := make(chan response)
@ -344,22 +320,27 @@ func (l *Libvirt) request(proc uint32, program uint32, payload *bytes.Buffer) (<
defer l.mu.Unlock() defer l.mu.Unlock()
err := binary.Write(l.w, binary.BigEndian, p) err := binary.Write(l.w, binary.BigEndian, p)
if err != nil { if err != nil {
return nil, err return response{}, err
} }
// write payload // write payload
if payload != nil { if payload != nil {
err = binary.Write(l.w, binary.BigEndian, payload.Bytes()) err = binary.Write(l.w, binary.BigEndian, payload.Bytes())
if err != nil { if err != nil {
return nil, err return response{}, err
} }
} }
if err := l.w.Flush(); err != nil { if err := l.w.Flush(); err != nil {
return nil, err return response{}, err
} }
return c, nil resp := <-c
if resp.Status != StatusOK {
return resp, decodeError(resp.Payload)
}
return resp, nil
} }
// encode XDR encodes the provided data. // encode XDR encodes the provided data.