diff --git a/cmd/cmd.go b/cmd/cmd.go index dcd6220f..5c3ce99f 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -20,6 +20,7 @@ import ( // registries "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/consul" + "github.com/micro/go-micro/registry/mdns" // selectors "github.com/micro/go-micro/selector" @@ -139,6 +140,7 @@ var ( DefaultRegistries = map[string]func(...registry.Option) registry.Registry{ "consul": consul.NewRegistry, + "mdns": mdns.NewRegistry, } DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ diff --git a/registry/mdns/encoding.go b/registry/mdns/encoding.go new file mode 100644 index 00000000..d7323daf --- /dev/null +++ b/registry/mdns/encoding.go @@ -0,0 +1,73 @@ +package mdns + +import ( + "bytes" + "compress/zlib" + "encoding/hex" + "encoding/json" + "io/ioutil" + "strings" +) + +func encode(txt *mdnsTxt) ([]string, error) { + b, err := json.Marshal(txt) + if err != nil { + return nil, err + } + + var buf bytes.Buffer + defer buf.Reset() + + w := zlib.NewWriter(&buf) + if _, err := w.Write(b); err != nil { + return nil, err + } + w.Close() + + encoded := hex.EncodeToString(buf.Bytes()) + + // individual txt limit + if len(encoded) <= 255 { + return []string{encoded}, nil + } + + // split encoded string + var record []string + + for len(encoded) > 255 { + record = append(record, encoded[:255]) + encoded = encoded[255:] + } + + record = append(record, encoded) + + return record, nil +} + +func decode(record []string) (*mdnsTxt, error) { + encoded := strings.Join(record, "") + + hr, err := hex.DecodeString(encoded) + if err != nil { + return nil, err + } + + br := bytes.NewReader(hr) + zr, err := zlib.NewReader(br) + if err != nil { + return nil, err + } + + rbuf, err := ioutil.ReadAll(zr) + if err != nil { + return nil, err + } + + var txt *mdnsTxt + + if err := json.Unmarshal(rbuf, &txt); err != nil { + return nil, err + } + + return txt, nil +} diff --git a/registry/mdns/encoding_test.go b/registry/mdns/encoding_test.go new file mode 100644 index 00000000..baea1c59 --- /dev/null +++ b/registry/mdns/encoding_test.go @@ -0,0 +1,67 @@ +package mdns + +import ( + "testing" + + "github.com/micro/go-micro/registry" +) + +func TestEncoding(t *testing.T) { + testData := []*mdnsTxt{ + &mdnsTxt{ + Version: "1.0.0", + Metadata: map[string]string{ + "foo": "bar", + }, + Endpoints: []*registry.Endpoint{ + ®istry.Endpoint{ + Name: "endpoint1", + Request: ®istry.Value{ + Name: "request", + Type: "request", + }, + Response: ®istry.Value{ + Name: "response", + Type: "response", + }, + Metadata: map[string]string{ + "foo1": "bar1", + }, + }, + }, + }, + } + + for _, d := range testData { + encoded, err := encode(d) + if err != nil { + t.Fatal(err) + } + + for _, txt := range encoded { + if len(txt) > 255 { + t.Fatalf("One of parts for txt is %d characters", len(txt)) + } + } + + decoded, err := decode(encoded) + if err != nil { + t.Fatal(err) + } + + if decoded.Version != d.Version { + t.Fatalf("Expected version %s got %s", d.Version, decoded.Version) + } + + if len(decoded.Endpoints) != len(d.Endpoints) { + t.Fatalf("Expected %d endpoints, got %d", len(d.Endpoints), len(decoded.Endpoints)) + } + + for k, v := range d.Metadata { + if val := decoded.Metadata[k]; val != v { + t.Fatalf("Expected %s=%s got %s=%s", k, v, k, val) + } + } + } + +} diff --git a/registry/mdns/mdns.go b/registry/mdns/mdns.go new file mode 100644 index 00000000..eeb8a97e --- /dev/null +++ b/registry/mdns/mdns.go @@ -0,0 +1,317 @@ +package mdns + +/* + MDNS is a multicast dns registry for service discovery + This creates a zero dependency system which is great + where multicast dns is available. This usually depends + on the ability to leverage udp and multicast/broadcast. +*/ + +import ( + "net" + "strings" + "sync" + "time" + + "github.com/micro/go-micro/registry" + "github.com/micro/mdns" + hash "github.com/mitchellh/hashstructure" +) + +type mdnsTxt struct { + Service string + Version string + Endpoints []*registry.Endpoint + Metadata map[string]string +} + +type mdnsEntry struct { + hash uint64 + id string + node *mdns.Server +} + +type mdnsRegistry struct { + opts registry.Options + + sync.Mutex + services map[string][]*mdnsEntry +} + +func newRegistry(opts ...registry.Option) registry.Registry { + options := registry.Options{ + Timeout: time.Millisecond * 100, + } + + return &mdnsRegistry{ + opts: options, + services: make(map[string][]*mdnsEntry), + } +} + +func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error { + m.Lock() + defer m.Unlock() + + entries, ok := m.services[service.Name] + // first entry, create wildcard used for list queries + if !ok { + s, err := mdns.NewMDNSService( + service.Name, + "_services", + "", + "", + 9999, + []net.IP{net.ParseIP("0.0.0.0")}, + nil, + ) + if err != nil { + return err + } + + srv, err := mdns.NewServer(&mdns.Config{Zone: &mdns.DNSSDService{s}}) + if err != nil { + return err + } + + // append the wildcard entry + entries = append(entries, &mdnsEntry{id: "*", node: srv}) + } + + var gerr error + + for _, node := range service.Nodes { + // create hash of service; uint64 + h, err := hash.Hash(node, nil) + if err != nil { + gerr = err + continue + } + + var seen bool + var e *mdnsEntry + + for _, entry := range entries { + if node.Id == entry.id { + seen = true + e = entry + break + } + } + + // already registered, continue + if seen && e.hash == h { + continue + // hash doesn't match, shutdown + } else if seen { + e.node.Shutdown() + // doesn't exist + } else { + e = &mdnsEntry{hash: h} + } + + txt, err := encode(&mdnsTxt{ + Service: service.Name, + Version: service.Version, + Endpoints: service.Endpoints, + Metadata: node.Metadata, + }) + + if err != nil { + gerr = err + continue + } + + // we got here, new node + s, err := mdns.NewMDNSService( + node.Id, + service.Name, + "", + "", + node.Port, + []net.IP{net.ParseIP(node.Address)}, + txt, + ) + if err != nil { + gerr = err + continue + } + + srv, err := mdns.NewServer(&mdns.Config{Zone: s}) + if err != nil { + gerr = err + continue + } + + e.id = node.Id + e.node = srv + entries = append(entries, e) + } + + // save + m.services[service.Name] = entries + + return gerr +} + +func (m *mdnsRegistry) Deregister(service *registry.Service) error { + m.Lock() + defer m.Unlock() + + var newEntries []*mdnsEntry + + // loop existing entries, check if any match, shutdown those that do + for _, entry := range m.services[service.Name] { + var remove bool + + for _, node := range service.Nodes { + if node.Id == entry.id { + entry.node.Shutdown() + remove = true + break + } + } + + // keep it? + if !remove { + newEntries = append(newEntries, entry) + } + } + + // last entry is the wildcard for list queries. Remove it. + if len(newEntries) == 1 && newEntries[0].id == "*" { + newEntries[0].node.Shutdown() + delete(m.services, service.Name) + } else { + m.services[service.Name] = newEntries + } + + return nil +} + +func (m *mdnsRegistry) GetService(service string) ([]*registry.Service, error) { + p := mdns.DefaultParams(service) + p.Timeout = m.opts.Timeout + entryCh := make(chan *mdns.ServiceEntry, 10) + p.Entries = entryCh + + exit := make(chan bool) + defer close(exit) + + serviceMap := make(map[string]*registry.Service) + + go func() { + for { + select { + case e := <-entryCh: + // list record so skip + if p.Service == "_services" { + continue + } + + if e.TTL == 0 { + continue + } + + txt, err := decode(e.InfoFields) + if err != nil { + continue + } + + s, ok := serviceMap[txt.Version] + if !ok { + s = ®istry.Service{ + Name: service, + Version: txt.Version, + Endpoints: txt.Endpoints, + } + } + + s.Nodes = append(s.Nodes, ®istry.Node{ + Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."), + Address: e.AddrV4.String(), + Port: e.Port, + Metadata: txt.Metadata, + }) + + serviceMap[txt.Version] = s + case <-exit: + return + } + } + }() + + if err := mdns.Query(p); err != nil { + return nil, err + } + + // create list and return + var services []*registry.Service + + for _, service := range serviceMap { + services = append(services, service) + } + + return services, nil +} + +func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) { + p := mdns.DefaultParams("_services") + p.Timeout = m.opts.Timeout + entryCh := make(chan *mdns.ServiceEntry, 10) + p.Entries = entryCh + + exit := make(chan bool) + defer close(exit) + + serviceMap := make(map[string]bool) + var services []*registry.Service + + go func() { + for { + select { + case e := <-entryCh: + if e.TTL == 0 { + continue + } + + name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".") + if !serviceMap[name] { + serviceMap[name] = true + services = append(services, ®istry.Service{Name: name}) + } + case <-exit: + return + } + } + }() + + if err := mdns.Query(p); err != nil { + return nil, err + } + + return services, nil +} + +func (m *mdnsRegistry) Watch() (registry.Watcher, error) { + md := &mdnsWatcher{ + ch: make(chan *mdns.ServiceEntry, 32), + exit: make(chan struct{}), + } + + go func() { + if err := mdns.Listen(md.ch, md.exit); err != nil { + md.Stop() + } + }() + + return md, nil +} + +func (m *mdnsRegistry) String() string { + return "mdns" +} + +func NewRegistry(opts ...registry.Option) registry.Registry { + return newRegistry(opts...) +} diff --git a/registry/mdns/watcher.go b/registry/mdns/watcher.go new file mode 100644 index 00000000..379b40f6 --- /dev/null +++ b/registry/mdns/watcher.go @@ -0,0 +1,72 @@ +package mdns + +import ( + "errors" + "strings" + + "github.com/micro/go-micro/registry" + "github.com/micro/mdns" +) + +type mdnsWatcher struct { + ch chan *mdns.ServiceEntry + exit chan struct{} +} + +func (m *mdnsWatcher) Next() (*registry.Result, error) { + for { + select { + case e := <-m.ch: + txt, err := decode(e.InfoFields) + if err != nil { + continue + } + + if len(txt.Service) == 0 || len(txt.Version) == 0 { + continue + } + + var action string + + if e.TTL == 0 { + action = "delete" + } else { + action = "create" + } + + service := ®istry.Service{ + Name: txt.Service, + Version: txt.Version, + Endpoints: txt.Endpoints, + } + + // TODO: don't hardcode .local. + if !strings.HasSuffix(e.Name, "."+service.Name+".local.") { + continue + } + + service.Nodes = append(service.Nodes, ®istry.Node{ + Id: strings.TrimSuffix(e.Name, "."+service.Name+".local."), + Address: e.AddrV4.String(), + Port: e.Port, + Metadata: txt.Metadata, + }) + + return ®istry.Result{ + Action: action, + Service: service, + }, nil + case <-m.exit: + return nil, errors.New("watcher stopped") + } + } +} + +func (m *mdnsWatcher) Stop() { + select { + case <-m.exit: + return + default: + close(m.exit) + } +}