Simplified k8s API Body watcher code and test. (#923)
This commit is contained in:
parent
1ffa289d39
commit
0b1e6d7eaf
@ -4,7 +4,6 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// bodyWatcher scans the body of a request for chunks
|
// bodyWatcher scans the body of a request for chunks
|
||||||
@ -34,16 +33,6 @@ func (wr *bodyWatcher) Stop() {
|
|||||||
func (wr *bodyWatcher) stream() {
|
func (wr *bodyWatcher) stream() {
|
||||||
reader := bufio.NewReader(wr.res.Body)
|
reader := bufio.NewReader(wr.res.Body)
|
||||||
|
|
||||||
// ignore first few messages from stream,
|
|
||||||
// as they are usually old.
|
|
||||||
ignore := true
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-time.After(time.Second)
|
|
||||||
ignore = false
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
// stop the watcher
|
// stop the watcher
|
||||||
defer wr.Stop()
|
defer wr.Stop()
|
||||||
|
|
||||||
@ -54,11 +43,6 @@ func (wr *bodyWatcher) stream() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ignore for the first second
|
|
||||||
if ignore {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// send the event
|
// send the event
|
||||||
var event Event
|
var event Event
|
||||||
if err := json.Unmarshal(b, &event); err != nil {
|
if err := json.Unmarshal(b, &event); err != nil {
|
||||||
@ -66,11 +50,9 @@ func (wr *bodyWatcher) stream() {
|
|||||||
}
|
}
|
||||||
wr.results <- event
|
wr.results <- event
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBodyWatcher creates a k8s body watcher for
|
// NewBodyWatcher creates a k8s body watcher for a given http request
|
||||||
// a given http request
|
|
||||||
func NewBodyWatcher(req *http.Request, client *http.Client) (Watch, error) {
|
func NewBodyWatcher(req *http.Request, client *http.Client) (Watch, error) {
|
||||||
stop := make(chan struct{})
|
stop := make(chan struct{})
|
||||||
req.Cancel = stop
|
req.Cancel = stop
|
||||||
|
@ -37,17 +37,15 @@ func TestBodyWatcher(t *testing.T) {
|
|||||||
|
|
||||||
req, err := http.NewRequest("GET", ts.URL, nil)
|
req, err := http.NewRequest("GET", ts.URL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("did not expect NewRequest to return err: %v", err)
|
t.Fatalf("failed to create new request: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// setup body watcher
|
// setup body watcher
|
||||||
w, err := NewBodyWatcher(req, http.DefaultClient)
|
w, err := NewBodyWatcher(req, http.DefaultClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("did not expect NewBodyWatcher to return %v", err)
|
t.Fatalf("failed to create new BodyWatcher %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
<-time.After(time.Second)
|
|
||||||
|
|
||||||
// send action strings in, and expect result back
|
// send action strings in, and expect result back
|
||||||
ch <- actions[0]
|
ch <- actions[0]
|
||||||
if r := <-w.ResultChan(); r.Type != "create" {
|
if r := <-w.ResultChan(); r.Type != "create" {
|
||||||
|
Loading…
Reference in New Issue
Block a user