* First commit: outline of K8s runtime package * Added poller. Added auto-updater into default runtime * Added build and updated Poller interface * Added comments and NewRuntime that accepts Options * DefaultPoller; Runtime options * First commit to add Kubernetes cruft * Add comments * Add micro- prefix to K8s runtime service names * Get rid of import cycles. Move K8s runtime into main runtime package * Major refactoring: Poller replaced by Notifier POller has been replaced by Notifier which returns a channel of events that can be consumed and acted upon. * Added runtime configuration options * K8s runtime is now Kubernetes runtime in dedicated pkg. Naming kung-fu. * Fix typo in command. * Fixed typo * Dont Delete service when runtime stops. runtime.Stop stops services; no need to double-stop * Track runtime services * Parse Unix timestamps properly * Added deployments into K8s client. Debug logging
		
			
				
	
	
		
			93 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			93 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package watch
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"encoding/json"
 | |
| 	"net/http"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| // bodyWatcher scans the body of a request for chunks
 | |
| type bodyWatcher struct {
 | |
| 	results chan Event
 | |
| 	stop    chan struct{}
 | |
| 	res     *http.Response
 | |
| 	req     *http.Request
 | |
| }
 | |
| 
 | |
| // Changes returns the results channel
 | |
| func (wr *bodyWatcher) ResultChan() <-chan Event {
 | |
| 	return wr.results
 | |
| }
 | |
| 
 | |
| // Stop cancels the request
 | |
| func (wr *bodyWatcher) Stop() {
 | |
| 	select {
 | |
| 	case <-wr.stop:
 | |
| 		return
 | |
| 	default:
 | |
| 		close(wr.stop)
 | |
| 		close(wr.results)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (wr *bodyWatcher) stream() {
 | |
| 	reader := bufio.NewReader(wr.res.Body)
 | |
| 
 | |
| 	// ignore first few messages from stream,
 | |
| 	// as they are usually old.
 | |
| 	ignore := true
 | |
| 
 | |
| 	go func() {
 | |
| 		<-time.After(time.Second)
 | |
| 		ignore = false
 | |
| 	}()
 | |
| 
 | |
| 	go func() {
 | |
| 		// stop the watcher
 | |
| 		defer wr.Stop()
 | |
| 
 | |
| 		for {
 | |
| 			// read a line
 | |
| 			b, err := reader.ReadBytes('\n')
 | |
| 			if err != nil {
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			// ignore for the first second
 | |
| 			if ignore {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			// send the event
 | |
| 			var event Event
 | |
| 			if err := json.Unmarshal(b, &event); err != nil {
 | |
| 				continue
 | |
| 			}
 | |
| 			wr.results <- event
 | |
| 		}
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // NewBodyWatcher creates a k8s body watcher for
 | |
| // a given http request
 | |
| func NewBodyWatcher(req *http.Request, client *http.Client) (Watch, error) {
 | |
| 	stop := make(chan struct{})
 | |
| 	req.Cancel = stop
 | |
| 
 | |
| 	res, err := client.Do(req)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	wr := &bodyWatcher{
 | |
| 		results: make(chan Event),
 | |
| 		stop:    stop,
 | |
| 		req:     req,
 | |
| 		res:     res,
 | |
| 	}
 | |
| 
 | |
| 	go wr.stream()
 | |
| 	return wr, nil
 | |
| }
 |