micro/config/source/etcd/watcher.go

114 lines
1.8 KiB
Go
Raw Normal View History

package etcd
import (
"context"
"errors"
"sync"
"time"
cetcd "github.com/coreos/etcd/clientv3"
"github.com/micro/go-micro/v3/config/source"
)
type watcher struct {
opts source.Options
name string
stripPrefix string
sync.RWMutex
cs *source.ChangeSet
ch chan *source.ChangeSet
exit chan bool
}
func newWatcher(key, strip string, wc cetcd.Watcher, cs *source.ChangeSet, opts source.Options) (source.Watcher, error) {
w := &watcher{
opts: opts,
name: "etcd",
stripPrefix: strip,
cs: cs,
ch: make(chan *source.ChangeSet),
exit: make(chan bool),
}
ch := wc.Watch(context.Background(), key, cetcd.WithPrefix())
go w.run(wc, ch)
return w, nil
}
func (w *watcher) handle(evs []*cetcd.Event) {
w.RLock()
data := w.cs.Data
w.RUnlock()
var vals map[string]interface{}
// unpackage existing changeset
if err := w.opts.Encoder.Decode(data, &vals); err != nil {
return
}
// update base changeset
d := makeEvMap(w.opts.Encoder, vals, evs, w.stripPrefix)
// pack the changeset
b, err := w.opts.Encoder.Encode(d)
if err != nil {
return
}
// create new changeset
cs := &source.ChangeSet{
Timestamp: time.Now(),
Source: w.name,
Data: b,
Format: w.opts.Encoder.String(),
}
cs.Checksum = cs.Sum()
// set base change set
w.Lock()
w.cs = cs
w.Unlock()
// send update
w.ch <- cs
}
func (w *watcher) run(wc cetcd.Watcher, ch cetcd.WatchChan) {
for {
select {
case rsp, ok := <-ch:
if !ok {
return
}
w.handle(rsp.Events)
case <-w.exit:
wc.Close()
return
}
}
}
func (w *watcher) Next() (*source.ChangeSet, error) {
select {
case cs := <-w.ch:
return cs, nil
case <-w.exit:
return nil, errors.New("watcher stopped")
}
}
func (w *watcher) Stop() error {
select {
case <-w.exit:
return nil
default:
close(w.exit)
}
return nil
}