Add watcher.... OH YEAAA

This commit is contained in:
Asim 2016-05-01 19:31:03 +01:00
parent 59f1a9a07b
commit e14f9a0380
2 changed files with 94 additions and 1 deletions

View File

@ -19,6 +19,7 @@ import (
) )
type mdnsTxt struct { type mdnsTxt struct {
Service string
Version string Version string
Endpoints []*registry.Endpoint Endpoints []*registry.Endpoint
Metadata map[string]string Metadata map[string]string
@ -110,6 +111,7 @@ func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.Regi
} }
txt, err := encode(&mdnsTxt{ txt, err := encode(&mdnsTxt{
Service: service.Name,
Version: service.Version, Version: service.Version,
Endpoints: service.Endpoints, Endpoints: service.Endpoints,
Metadata: node.Metadata, Metadata: node.Metadata,
@ -207,6 +209,10 @@ func (m *mdnsRegistry) GetService(service string) ([]*registry.Service, error) {
continue continue
} }
if e.TTL == 0 {
continue
}
txt, err := decode(e.InfoFields) txt, err := decode(e.InfoFields)
if err != nil { if err != nil {
continue continue
@ -265,6 +271,10 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) {
for { for {
select { select {
case e := <-entryCh: case e := <-entryCh:
if e.TTL == 0 {
continue
}
name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".") name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".")
if !serviceMap[name] { if !serviceMap[name] {
serviceMap[name] = true serviceMap[name] = true
@ -284,7 +294,18 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) {
} }
func (m *mdnsRegistry) Watch() (registry.Watcher, error) { func (m *mdnsRegistry) Watch() (registry.Watcher, error) {
return nil, nil md := &mdnsWatcher{
ch: make(chan *mdns.ServiceEntry, 32),
exit: make(chan struct{}),
}
go func() {
if err := mdns.Listen(md.ch, md.exit); err != nil {
md.Stop()
}
}()
return md, nil
} }
func (m *mdnsRegistry) String() string { func (m *mdnsRegistry) String() string {

72
registry/mdns/watcher.go Normal file
View File

@ -0,0 +1,72 @@
package mdns
import (
"errors"
"strings"
"github.com/micro/go-micro/registry"
"github.com/micro/mdns"
)
type mdnsWatcher struct {
ch chan *mdns.ServiceEntry
exit chan struct{}
}
func (m *mdnsWatcher) Next() (*registry.Result, error) {
for {
select {
case e := <-m.ch:
txt, err := decode(e.InfoFields)
if err != nil {
continue
}
if len(txt.Service) == 0 || len(txt.Version) == 0 {
continue
}
var action string
if e.TTL == 0 {
action = "delete"
} else {
action = "create"
}
service := &registry.Service{
Name: txt.Service,
Version: txt.Version,
Endpoints: txt.Endpoints,
}
// TODO: don't hardcode .local.
if !strings.HasSuffix(e.Name, "."+service.Name+".local.") {
continue
}
service.Nodes = append(service.Nodes, &registry.Node{
Id: strings.TrimSuffix(e.Name, "."+service.Name+".local."),
Address: e.AddrV4.String(),
Port: e.Port,
Metadata: txt.Metadata,
})
return &registry.Result{
Action: action,
Service: service,
}, nil
case <-m.exit:
return nil, errors.New("watcher stopped")
}
}
}
func (m *mdnsWatcher) Stop() {
select {
case <-m.exit:
return
default:
close(m.exit)
}
}