strip down mdns watcher

This commit is contained in:
Asim Aslam 2020-04-12 11:01:09 +01:00
parent 4e539361fa
commit cf67d460b7
4 changed files with 221 additions and 236 deletions

View File

@ -54,6 +54,17 @@ type mdnsRegistry struct {
listener chan *mdns.ServiceEntry
}
type mdnsWatcher struct {
id string
wo WatchOptions
ch chan *mdns.ServiceEntry
exit chan struct{}
// the mdns domain
domain string
// the registry
registry *mdnsRegistry
}
func encode(txt *mdnsTxt) ([]string, error) {
b, err := json.Marshal(txt)
if err != nil {
@ -534,6 +545,74 @@ func (m *mdnsRegistry) String() string {
return "mdns"
}
func (m *mdnsWatcher) Next() (*Result, error) {
for {
select {
case e := <-m.ch:
txt, err := decode(e.InfoFields)
if err != nil {
continue
}
if len(txt.Service) == 0 || len(txt.Version) == 0 {
continue
}
// Filter watch options
// wo.Service: Only keep services we care about
if len(m.wo.Service) > 0 && txt.Service != m.wo.Service {
continue
}
var action string
if e.TTL == 0 {
action = "delete"
} else {
action = "create"
}
service := &Service{
Name: txt.Service,
Version: txt.Version,
Endpoints: txt.Endpoints,
}
// skip anything without the domain we care about
suffix := fmt.Sprintf(".%s.%s.", service.Name, m.domain)
if !strings.HasSuffix(e.Name, suffix) {
continue
}
service.Nodes = append(service.Nodes, &Node{
Id: strings.TrimSuffix(e.Name, suffix),
Address: fmt.Sprintf("%s:%d", e.AddrV4.String(), e.Port),
Metadata: txt.Metadata,
})
return &Result{
Action: action,
Service: service,
}, nil
case <-m.exit:
return nil, ErrWatcherStopped
}
}
}
func (m *mdnsWatcher) Stop() {
select {
case <-m.exit:
return
default:
close(m.exit)
// remove self from the registry
m.registry.mtx.Lock()
delete(m.registry.watchers, m.id)
m.registry.mtx.Unlock()
}
}
// NewRegistry returns a new default registry which is mdns
func NewRegistry(opts ...Option) Registry {
return newRegistry(opts...)

View File

@ -197,3 +197,145 @@ func TestEncoding(t *testing.T) {
}
}
func TestWatcher(t *testing.T) {
if travis := os.Getenv("TRAVIS"); travis == "true" {
t.Skip()
}
testData := []*Service{
{
Name: "test1",
Version: "1.0.1",
Nodes: []*Node{
{
Id: "test1-1",
Address: "10.0.0.1:10001",
Metadata: map[string]string{
"foo": "bar",
},
},
},
},
{
Name: "test2",
Version: "1.0.2",
Nodes: []*Node{
{
Id: "test2-1",
Address: "10.0.0.2:10002",
Metadata: map[string]string{
"foo2": "bar2",
},
},
},
},
{
Name: "test3",
Version: "1.0.3",
Nodes: []*Node{
{
Id: "test3-1",
Address: "10.0.0.3:10003",
Metadata: map[string]string{
"foo3": "bar3",
},
},
},
},
}
testFn := func(service, s *Service) {
if s == nil {
t.Fatalf("Expected one result for %s got nil", service.Name)
}
if s.Name != service.Name {
t.Fatalf("Expected name %s got %s", service.Name, s.Name)
}
if s.Version != service.Version {
t.Fatalf("Expected version %s got %s", service.Version, s.Version)
}
if len(s.Nodes) != 1 {
t.Fatalf("Expected 1 node, got %d", len(s.Nodes))
}
node := s.Nodes[0]
if node.Id != service.Nodes[0].Id {
t.Fatalf("Expected node id %s got %s", service.Nodes[0].Id, node.Id)
}
if node.Address != service.Nodes[0].Address {
t.Fatalf("Expected node address %s got %s", service.Nodes[0].Address, node.Address)
}
}
travis := os.Getenv("TRAVIS")
var opts []Option
if travis == "true" {
opts = append(opts, Timeout(time.Millisecond*100))
}
// new registry
r := NewRegistry(opts...)
w, err := r.Watch()
if err != nil {
t.Fatal(err)
}
defer w.Stop()
for _, service := range testData {
// register service
if err := r.Register(service); err != nil {
t.Fatal(err)
}
for {
res, err := w.Next()
if err != nil {
t.Fatal(err)
}
if res.Service.Name != service.Name {
continue
}
if res.Action != "create" {
t.Fatalf("Expected create event got %s for %s", res.Action, res.Service.Name)
}
testFn(service, res.Service)
break
}
// deregister
if err := r.Deregister(service); err != nil {
t.Fatal(err)
}
for {
res, err := w.Next()
if err != nil {
t.Fatal(err)
}
if res.Service.Name != service.Name {
continue
}
if res.Action != "delete" {
continue
}
testFn(service, res.Service)
break
}
}
}

View File

@ -1,87 +0,0 @@
package registry
import (
"fmt"
"strings"
"github.com/micro/go-micro/v2/util/mdns"
)
type mdnsWatcher struct {
id string
wo WatchOptions
ch chan *mdns.ServiceEntry
exit chan struct{}
// the mdns domain
domain string
// the registry
registry *mdnsRegistry
}
func (m *mdnsWatcher) Next() (*Result, error) {
for {
select {
case e := <-m.ch:
txt, err := decode(e.InfoFields)
if err != nil {
continue
}
if len(txt.Service) == 0 || len(txt.Version) == 0 {
continue
}
// Filter watch options
// wo.Service: Only keep services we care about
if len(m.wo.Service) > 0 && txt.Service != m.wo.Service {
continue
}
var action string
if e.TTL == 0 {
action = "delete"
} else {
action = "create"
}
service := &Service{
Name: txt.Service,
Version: txt.Version,
Endpoints: txt.Endpoints,
}
// skip anything without the domain we care about
suffix := fmt.Sprintf(".%s.%s.", service.Name, m.domain)
if !strings.HasSuffix(e.Name, suffix) {
continue
}
service.Nodes = append(service.Nodes, &Node{
Id: strings.TrimSuffix(e.Name, suffix),
Address: fmt.Sprintf("%s:%d", e.AddrV4.String(), e.Port),
Metadata: txt.Metadata,
})
return &Result{
Action: action,
Service: service,
}, nil
case <-m.exit:
return nil, ErrWatcherStopped
}
}
}
func (m *mdnsWatcher) Stop() {
select {
case <-m.exit:
return
default:
close(m.exit)
// remove self from the registry
m.registry.mtx.Lock()
delete(m.registry.watchers, m.id)
m.registry.mtx.Unlock()
}
}

View File

@ -1,149 +0,0 @@
package registry
import (
"os"
"testing"
"time"
)
func TestWatcher(t *testing.T) {
if travis := os.Getenv("TRAVIS"); travis == "true" {
t.Skip()
}
testData := []*Service{
{
Name: "test1",
Version: "1.0.1",
Nodes: []*Node{
{
Id: "test1-1",
Address: "10.0.0.1:10001",
Metadata: map[string]string{
"foo": "bar",
},
},
},
},
{
Name: "test2",
Version: "1.0.2",
Nodes: []*Node{
{
Id: "test2-1",
Address: "10.0.0.2:10002",
Metadata: map[string]string{
"foo2": "bar2",
},
},
},
},
{
Name: "test3",
Version: "1.0.3",
Nodes: []*Node{
{
Id: "test3-1",
Address: "10.0.0.3:10003",
Metadata: map[string]string{
"foo3": "bar3",
},
},
},
},
}
testFn := func(service, s *Service) {
if s == nil {
t.Fatalf("Expected one result for %s got nil", service.Name)
}
if s.Name != service.Name {
t.Fatalf("Expected name %s got %s", service.Name, s.Name)
}
if s.Version != service.Version {
t.Fatalf("Expected version %s got %s", service.Version, s.Version)
}
if len(s.Nodes) != 1 {
t.Fatalf("Expected 1 node, got %d", len(s.Nodes))
}
node := s.Nodes[0]
if node.Id != service.Nodes[0].Id {
t.Fatalf("Expected node id %s got %s", service.Nodes[0].Id, node.Id)
}
if node.Address != service.Nodes[0].Address {
t.Fatalf("Expected node address %s got %s", service.Nodes[0].Address, node.Address)
}
}
travis := os.Getenv("TRAVIS")
var opts []Option
if travis == "true" {
opts = append(opts, Timeout(time.Millisecond*100))
}
// new registry
r := NewRegistry(opts...)
w, err := r.Watch()
if err != nil {
t.Fatal(err)
}
defer w.Stop()
for _, service := range testData {
// register service
if err := r.Register(service); err != nil {
t.Fatal(err)
}
for {
res, err := w.Next()
if err != nil {
t.Fatal(err)
}
if res.Service.Name != service.Name {
continue
}
if res.Action != "create" {
t.Fatalf("Expected create event got %s for %s", res.Action, res.Service.Name)
}
testFn(service, res.Service)
break
}
// deregister
if err := r.Deregister(service); err != nil {
t.Fatal(err)
}
for {
res, err := w.Next()
if err != nil {
t.Fatal(err)
}
if res.Service.Name != service.Name {
continue
}
if res.Action != "delete" {
continue
}
testFn(service, res.Service)
break
}
}
}