Add input and output streams handling to rpc layer
This commit is contained in:
		
							
								
								
									
										82
									
								
								rpc.go
									
									
									
									
									
								
							
							
						
						
									
										82
									
								
								rpc.go
									
									
									
									
									
								
							| @@ -303,6 +303,10 @@ Loop: | ||||
| // returns response returned by server. | ||||
| // if response is not OK, decodes error from it and returns it. | ||||
| func (l *Libvirt) request(proc uint32, program uint32, payload []byte) (response, error) { | ||||
| 	return l.requestStream(proc, program, payload, nil, nil) | ||||
| } | ||||
|  | ||||
| func (l *Libvirt) requestStream(proc uint32, program uint32, payload []byte, outStream io.Reader, inStream io.Writer) (response, error) { | ||||
| 	serial := l.serial() | ||||
| 	c := make(chan response) | ||||
|  | ||||
| @@ -319,6 +323,84 @@ func (l *Libvirt) request(proc uint32, program uint32, payload []byte) (response | ||||
| 		return resp, err | ||||
| 	} | ||||
|  | ||||
| 	var abortOutStream chan bool | ||||
| 	var outStreamErr chan error | ||||
| 	if outStream != nil { | ||||
| 		abortOutStream = make(chan bool) | ||||
| 		outStreamErr = make(chan error) | ||||
| 		go func() { | ||||
| 			var err error | ||||
| 			var n int | ||||
| 			buf := make([]byte, 1<<22-24) | ||||
| 			for { | ||||
| 				select { | ||||
| 				case <-abortOutStream: | ||||
| 					err = l.sendPacket(serial, proc, program, nil, Stream, StatusError) | ||||
| 					break | ||||
| 				default: | ||||
| 				} | ||||
| 				n, err = outStream.Read(buf) | ||||
| 				if err != nil { | ||||
| 					if err == io.EOF { | ||||
| 						err = l.sendPacket(serial, proc, program, nil, Stream, StatusOK) | ||||
| 						break | ||||
| 					} else { | ||||
| 						// keep original error | ||||
| 						err := l.sendPacket(serial, proc, program, nil, Stream, StatusError) | ||||
| 						if err != nil { | ||||
| 							outStreamErr <- err | ||||
| 							return | ||||
| 						} else { | ||||
| 							break | ||||
| 						} | ||||
| 					} | ||||
| 				} | ||||
| 				if n > 0 { | ||||
| 					err = l.sendPacket(serial, proc, program, buf[:n], Stream, StatusContinue) | ||||
| 					if err != nil { | ||||
| 						break | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 			outStreamErr <- err | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| 	// Even without incoming stream server sends confirmation once all data is received | ||||
| 	if outStream != nil || inStream != nil { | ||||
| 		for { | ||||
| 			resp, err = l.getResponse(c) | ||||
| 			if err != nil { | ||||
| 				if outStream != nil { | ||||
| 					abortOutStream <- true | ||||
| 				} | ||||
| 				return resp, err | ||||
| 			} | ||||
| 			// Continue is valid here only for stream packets | ||||
| 			if resp.Status == StatusContinue { | ||||
| 				if inStream != nil { | ||||
| 					_, err = inStream.Write(resp.Payload) | ||||
| 					if err != nil { | ||||
| 						if outStream != nil { | ||||
| 							abortOutStream <- true | ||||
| 						} | ||||
| 						return response{}, err | ||||
| 					} | ||||
| 				} | ||||
| 			} else { | ||||
| 				// StatusError is handled in getResponse, so this is StatusOK, end of stream | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if outStream != nil { | ||||
| 		err = <-outStreamErr | ||||
| 		if err != nil { | ||||
| 			return response{}, nil | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return resp, nil | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user