Milos Gajdos 6f7702a093 [WIP] K8s update and runtime package changes (#895)
* First commit: outline of K8s runtime package

* Added poller. Added auto-updater into default runtime

* Added build and updated Poller interface

* Added comments and NewRuntime that accepts Options

* DefaultPoller; Runtime options

* First commit to add Kubernetes cruft

* Add comments

* Add micro- prefix to K8s runtime service names

* Get rid of import cycles. Move K8s runtime into main runtime package

* Major refactoring: Poller replaced by Notifier

POller has been replaced by Notifier which returns a channel of events
that can be consumed and acted upon.

* Added runtime configuration options

* K8s runtime is now Kubernetes runtime in dedicated pkg. Naming kung-fu.

* Fix typo in command.

* Fixed typo

* Dont Delete service when runtime stops.

runtime.Stop stops services; no need to double-stop

* Track runtime services

* Parse Unix timestamps properly

* Added deployments into K8s client. Debug logging
2019-11-02 13:25:10 +00:00

72 lines
1.6 KiB
Go

package watch
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
)
var actions = []string{
`{"type": "create", "object":{"foo": "bar"}}`,
`{"type": "delete", INVALID}`,
`{"type": "update", "object":{"foo": {"foo": "bar"}}}`,
`{"type": "delete", "object":null}`,
}
func TestBodyWatcher(t *testing.T) {
// set up server with handler to flush strings from ch.
ch := make(chan string)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
t.Fatal("expected ResponseWriter to be a flusher")
}
fmt.Fprintf(w, "\n")
flusher.Flush()
for v := range ch {
fmt.Fprintf(w, "%s\n", v)
flusher.Flush()
time.Sleep(10 * time.Millisecond)
}
}))
defer ts.Close()
req, err := http.NewRequest("GET", ts.URL, nil)
if err != nil {
t.Fatalf("did not expect NewRequest to return err: %v", err)
}
// setup body watcher
w, err := NewBodyWatcher(req, http.DefaultClient)
if err != nil {
t.Fatalf("did not expect NewBodyWatcher to return %v", err)
}
<-time.After(time.Second)
// send action strings in, and expect result back
ch <- actions[0]
if r := <-w.ResultChan(); r.Type != "create" {
t.Fatalf("expected result to be create")
}
ch <- actions[1] // should be ignored as its invalid json
ch <- actions[2]
if r := <-w.ResultChan(); r.Type != "update" {
t.Fatalf("expected result to be update")
}
ch <- actions[3]
if r := <-w.ResultChan(); r.Type != "delete" {
t.Fatalf("expected result to be delete")
}
// stop should clean up all channels.
w.Stop()
close(ch)
}