114 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			114 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package etcd
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	cetcd "github.com/coreos/etcd/clientv3"
 | |
| 	"github.com/micro/go-micro/v3/config/source"
 | |
| )
 | |
| 
 | |
| type watcher struct {
 | |
| 	opts        source.Options
 | |
| 	name        string
 | |
| 	stripPrefix string
 | |
| 
 | |
| 	sync.RWMutex
 | |
| 	cs *source.ChangeSet
 | |
| 
 | |
| 	ch   chan *source.ChangeSet
 | |
| 	exit chan bool
 | |
| }
 | |
| 
 | |
| func newWatcher(key, strip string, wc cetcd.Watcher, cs *source.ChangeSet, opts source.Options) (source.Watcher, error) {
 | |
| 	w := &watcher{
 | |
| 		opts:        opts,
 | |
| 		name:        "etcd",
 | |
| 		stripPrefix: strip,
 | |
| 		cs:          cs,
 | |
| 		ch:          make(chan *source.ChangeSet),
 | |
| 		exit:        make(chan bool),
 | |
| 	}
 | |
| 
 | |
| 	ch := wc.Watch(context.Background(), key, cetcd.WithPrefix())
 | |
| 
 | |
| 	go w.run(wc, ch)
 | |
| 
 | |
| 	return w, nil
 | |
| }
 | |
| 
 | |
| func (w *watcher) handle(evs []*cetcd.Event) {
 | |
| 	w.RLock()
 | |
| 	data := w.cs.Data
 | |
| 	w.RUnlock()
 | |
| 
 | |
| 	var vals map[string]interface{}
 | |
| 
 | |
| 	// unpackage existing changeset
 | |
| 	if err := w.opts.Encoder.Decode(data, &vals); err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// update base changeset
 | |
| 	d := makeEvMap(w.opts.Encoder, vals, evs, w.stripPrefix)
 | |
| 
 | |
| 	// pack the changeset
 | |
| 	b, err := w.opts.Encoder.Encode(d)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// create new changeset
 | |
| 	cs := &source.ChangeSet{
 | |
| 		Timestamp: time.Now(),
 | |
| 		Source:    w.name,
 | |
| 		Data:      b,
 | |
| 		Format:    w.opts.Encoder.String(),
 | |
| 	}
 | |
| 	cs.Checksum = cs.Sum()
 | |
| 
 | |
| 	// set base change set
 | |
| 	w.Lock()
 | |
| 	w.cs = cs
 | |
| 	w.Unlock()
 | |
| 
 | |
| 	// send update
 | |
| 	w.ch <- cs
 | |
| }
 | |
| 
 | |
| func (w *watcher) run(wc cetcd.Watcher, ch cetcd.WatchChan) {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case rsp, ok := <-ch:
 | |
| 			if !ok {
 | |
| 				return
 | |
| 			}
 | |
| 			w.handle(rsp.Events)
 | |
| 		case <-w.exit:
 | |
| 			wc.Close()
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (w *watcher) Next() (*source.ChangeSet, error) {
 | |
| 	select {
 | |
| 	case cs := <-w.ch:
 | |
| 		return cs, nil
 | |
| 	case <-w.exit:
 | |
| 		return nil, errors.New("watcher stopped")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (w *watcher) Stop() error {
 | |
| 	select {
 | |
| 	case <-w.exit:
 | |
| 		return nil
 | |
| 	default:
 | |
| 		close(w.exit)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |