diff --git a/encoding.go b/encoding.go deleted file mode 100644 index d7323da..0000000 --- a/encoding.go +++ /dev/null @@ -1,73 +0,0 @@ -package mdns - -import ( - "bytes" - "compress/zlib" - "encoding/hex" - "encoding/json" - "io/ioutil" - "strings" -) - -func encode(txt *mdnsTxt) ([]string, error) { - b, err := json.Marshal(txt) - if err != nil { - return nil, err - } - - var buf bytes.Buffer - defer buf.Reset() - - w := zlib.NewWriter(&buf) - if _, err := w.Write(b); err != nil { - return nil, err - } - w.Close() - - encoded := hex.EncodeToString(buf.Bytes()) - - // individual txt limit - if len(encoded) <= 255 { - return []string{encoded}, nil - } - - // split encoded string - var record []string - - for len(encoded) > 255 { - record = append(record, encoded[:255]) - encoded = encoded[255:] - } - - record = append(record, encoded) - - return record, nil -} - -func decode(record []string) (*mdnsTxt, error) { - encoded := strings.Join(record, "") - - hr, err := hex.DecodeString(encoded) - if err != nil { - return nil, err - } - - br := bytes.NewReader(hr) - zr, err := zlib.NewReader(br) - if err != nil { - return nil, err - } - - rbuf, err := ioutil.ReadAll(zr) - if err != nil { - return nil, err - } - - var txt *mdnsTxt - - if err := json.Unmarshal(rbuf, &txt); err != nil { - return nil, err - } - - return txt, nil -} diff --git a/encoding_test.go b/encoding_test.go deleted file mode 100644 index baea1c5..0000000 --- a/encoding_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package mdns - -import ( - "testing" - - "github.com/micro/go-micro/registry" -) - -func TestEncoding(t *testing.T) { - testData := []*mdnsTxt{ - &mdnsTxt{ - Version: "1.0.0", - Metadata: map[string]string{ - "foo": "bar", - }, - Endpoints: []*registry.Endpoint{ - ®istry.Endpoint{ - Name: "endpoint1", - Request: ®istry.Value{ - Name: "request", - Type: "request", - }, - Response: ®istry.Value{ - Name: "response", - Type: "response", - }, - Metadata: map[string]string{ - "foo1": "bar1", - }, - }, - }, - }, - } - - for _, d := range testData { - encoded, err := encode(d) - if err != nil { - t.Fatal(err) - } - - for _, txt := range encoded { - if len(txt) > 255 { - t.Fatalf("One of parts for txt is %d characters", len(txt)) - } - } - - decoded, err := decode(encoded) - if err != nil { - t.Fatal(err) - } - - if decoded.Version != d.Version { - t.Fatalf("Expected version %s got %s", d.Version, decoded.Version) - } - - if len(decoded.Endpoints) != len(d.Endpoints) { - t.Fatalf("Expected %d endpoints, got %d", len(d.Endpoints), len(decoded.Endpoints)) - } - - for k, v := range d.Metadata { - if val := decoded.Metadata[k]; val != v { - t.Fatalf("Expected %s=%s got %s=%s", k, v, k, val) - } - } - } - -} diff --git a/mdns.go b/mdns.go index 920df3f..6739c69 100644 --- a/mdns.go +++ b/mdns.go @@ -1,339 +1,11 @@ -// Package mdns is a multicast dns registry +// Package mdns provides a multicast dns registry package mdns -/* - MDNS is a multicast dns registry for service discovery - This creates a zero dependency system which is great - where multicast dns is available. This usually depends - on the ability to leverage udp and multicast/broadcast. -*/ - import ( - "net" - "strings" - "sync" - "time" - "github.com/micro/go-micro/registry" - "github.com/micro/mdns" - hash "github.com/mitchellh/hashstructure" ) -type mdnsTxt struct { - Service string - Version string - Endpoints []*registry.Endpoint - Metadata map[string]string -} - -type mdnsEntry struct { - hash uint64 - id string - node *mdns.Server -} - -type mdnsRegistry struct { - opts registry.Options - - sync.Mutex - services map[string][]*mdnsEntry -} - -func newRegistry(opts ...registry.Option) registry.Registry { - options := registry.Options{ - Timeout: time.Millisecond * 100, - } - - return &mdnsRegistry{ - opts: options, - services: make(map[string][]*mdnsEntry), - } -} - -func (m *mdnsRegistry) Init(opts ...registry.Option) error { - for _, o := range opts { - o(&m.opts) - } - return nil -} - -func (m *mdnsRegistry) Options() registry.Options { - return m.opts -} - -func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error { - m.Lock() - defer m.Unlock() - - entries, ok := m.services[service.Name] - // first entry, create wildcard used for list queries - if !ok { - s, err := mdns.NewMDNSService( - service.Name, - "_services", - "", - "", - 9999, - []net.IP{net.ParseIP("0.0.0.0")}, - nil, - ) - if err != nil { - return err - } - - srv, err := mdns.NewServer(&mdns.Config{Zone: &mdns.DNSSDService{s}}) - if err != nil { - return err - } - - // append the wildcard entry - entries = append(entries, &mdnsEntry{id: "*", node: srv}) - } - - var gerr error - - for _, node := range service.Nodes { - // create hash of service; uint64 - h, err := hash.Hash(node, nil) - if err != nil { - gerr = err - continue - } - - var seen bool - var e *mdnsEntry - - for _, entry := range entries { - if node.Id == entry.id { - seen = true - e = entry - break - } - } - - // already registered, continue - if seen && e.hash == h { - continue - // hash doesn't match, shutdown - } else if seen { - e.node.Shutdown() - // doesn't exist - } else { - e = &mdnsEntry{hash: h} - } - - txt, err := encode(&mdnsTxt{ - Service: service.Name, - Version: service.Version, - Endpoints: service.Endpoints, - Metadata: node.Metadata, - }) - - if err != nil { - gerr = err - continue - } - - // we got here, new node - s, err := mdns.NewMDNSService( - node.Id, - service.Name, - "", - "", - node.Port, - []net.IP{net.ParseIP(node.Address)}, - txt, - ) - if err != nil { - gerr = err - continue - } - - srv, err := mdns.NewServer(&mdns.Config{Zone: s}) - if err != nil { - gerr = err - continue - } - - e.id = node.Id - e.node = srv - entries = append(entries, e) - } - - // save - m.services[service.Name] = entries - - return gerr -} - -func (m *mdnsRegistry) Deregister(service *registry.Service) error { - m.Lock() - defer m.Unlock() - - var newEntries []*mdnsEntry - - // loop existing entries, check if any match, shutdown those that do - for _, entry := range m.services[service.Name] { - var remove bool - - for _, node := range service.Nodes { - if node.Id == entry.id { - entry.node.Shutdown() - remove = true - break - } - } - - // keep it? - if !remove { - newEntries = append(newEntries, entry) - } - } - - // last entry is the wildcard for list queries. Remove it. - if len(newEntries) == 1 && newEntries[0].id == "*" { - newEntries[0].node.Shutdown() - delete(m.services, service.Name) - } else { - m.services[service.Name] = newEntries - } - - return nil -} - -func (m *mdnsRegistry) GetService(service string) ([]*registry.Service, error) { - p := mdns.DefaultParams(service) - p.Timeout = m.opts.Timeout - entryCh := make(chan *mdns.ServiceEntry, 10) - p.Entries = entryCh - - exit := make(chan bool) - defer close(exit) - - serviceMap := make(map[string]*registry.Service) - - go func() { - for { - select { - case e := <-entryCh: - // list record so skip - if p.Service == "_services" { - continue - } - - if e.TTL == 0 { - continue - } - - txt, err := decode(e.InfoFields) - if err != nil { - continue - } - - if txt.Service != service { - continue - } - - s, ok := serviceMap[txt.Version] - if !ok { - s = ®istry.Service{ - Name: txt.Service, - Version: txt.Version, - Endpoints: txt.Endpoints, - } - } - - s.Nodes = append(s.Nodes, ®istry.Node{ - Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."), - Address: e.AddrV4.String(), - Port: e.Port, - Metadata: txt.Metadata, - }) - - serviceMap[txt.Version] = s - case <-exit: - return - } - } - }() - - if err := mdns.Query(p); err != nil { - return nil, err - } - - // create list and return - var services []*registry.Service - - for _, service := range serviceMap { - services = append(services, service) - } - - return services, nil -} - -func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) { - p := mdns.DefaultParams("_services") - p.Timeout = m.opts.Timeout - entryCh := make(chan *mdns.ServiceEntry, 10) - p.Entries = entryCh - - exit := make(chan bool) - defer close(exit) - - serviceMap := make(map[string]bool) - var services []*registry.Service - - go func() { - for { - select { - case e := <-entryCh: - if e.TTL == 0 { - continue - } - - name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".") - if !serviceMap[name] { - serviceMap[name] = true - services = append(services, ®istry.Service{Name: name}) - } - case <-exit: - return - } - } - }() - - if err := mdns.Query(p); err != nil { - return nil, err - } - - return services, nil -} - -func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { - var wo registry.WatchOptions - for _, o := range opts { - o(&wo) - } - - md := &mdnsWatcher{ - wo: wo, - ch: make(chan *mdns.ServiceEntry, 32), - exit: make(chan struct{}), - } - - go func() { - if err := mdns.Listen(md.ch, md.exit); err != nil { - md.Stop() - } - }() - - return md, nil -} - -func (m *mdnsRegistry) String() string { - return "mdns" -} - +// NewRegistry returns a new mdns registry func NewRegistry(opts ...registry.Option) registry.Registry { - return newRegistry(opts...) + return registry.NewRegistry(opts...) } diff --git a/mdns_test.go b/mdns_test.go deleted file mode 100644 index bdf557f..0000000 --- a/mdns_test.go +++ /dev/null @@ -1,134 +0,0 @@ -package mdns - -import ( - "testing" - "time" - - "github.com/micro/go-micro/registry" -) - -func TestMDNS(t *testing.T) { - testData := []*registry.Service{ - ®istry.Service{ - Name: "test1", - Version: "1.0.1", - Nodes: []*registry.Node{ - ®istry.Node{ - Id: "test1-1", - Address: "10.0.0.1", - Port: 10001, - Metadata: map[string]string{ - "foo": "bar", - }, - }, - }, - }, - ®istry.Service{ - Name: "test2", - Version: "1.0.2", - Nodes: []*registry.Node{ - ®istry.Node{ - Id: "test2-1", - Address: "10.0.0.2", - Port: 10002, - Metadata: map[string]string{ - "foo2": "bar2", - }, - }, - }, - }, - ®istry.Service{ - Name: "test3", - Version: "1.0.3", - Nodes: []*registry.Node{ - ®istry.Node{ - Id: "test3-1", - Address: "10.0.0.3", - Port: 10003, - Metadata: map[string]string{ - "foo3": "bar3", - }, - }, - }, - }, - } - - // new registry - r := NewRegistry() - - for _, service := range testData { - // register service - if err := r.Register(service); err != nil { - t.Fatal(err) - } - - // get registered service - s, err := r.GetService(service.Name) - if err != nil { - t.Fatal(err) - } - - if len(s) != 1 { - t.Fatalf("Expected one result for %s got %d", service.Name, len(s)) - - } - - if s[0].Name != service.Name { - t.Fatalf("Expected name %s got %s", service.Name, s[0].Name) - } - - if s[0].Version != service.Version { - t.Fatalf("Expected version %s got %s", service.Version, s[0].Version) - } - - if len(s[0].Nodes) != 1 { - t.Fatalf("Expected 1 node, got %d", len(s[0].Nodes)) - } - - node := s[0].Nodes[0] - - if node.Id != service.Nodes[0].Id { - t.Fatalf("Expected node id %s got %s", service.Nodes[0].Id, node.Id) - } - - if node.Address != service.Nodes[0].Address { - t.Fatalf("Expected node address %s got %s", service.Nodes[0].Address, node.Address) - } - - if node.Port != service.Nodes[0].Port { - t.Fatalf("Expected node port %d got %d", service.Nodes[0].Port, node.Port) - } - } - - services, err := r.ListServices() - if err != nil { - t.Fatal(err) - } - - for _, service := range testData { - var seen bool - for _, s := range services { - if s.Name == service.Name { - seen = true - break - } - } - if !seen { - t.Fatalf("Expected service %s got nothing", service.Name) - } - - // deregister - if err := r.Deregister(service); err != nil { - t.Fatal(err) - } - - time.Sleep(time.Millisecond * 5) - - // check its gone - s, _ := r.GetService(service.Name) - if len(s) > 0 { - t.Fatalf("Expected nothing got %+v", s[0]) - } - } - -} diff --git a/watcher.go b/watcher.go deleted file mode 100644 index 952816d..0000000 --- a/watcher.go +++ /dev/null @@ -1,79 +0,0 @@ -package mdns - -import ( - "errors" - "strings" - - "github.com/micro/go-micro/registry" - "github.com/micro/mdns" -) - -type mdnsWatcher struct { - wo registry.WatchOptions - ch chan *mdns.ServiceEntry - exit chan struct{} -} - -func (m *mdnsWatcher) Next() (*registry.Result, error) { - for { - select { - case e := <-m.ch: - txt, err := decode(e.InfoFields) - if err != nil { - continue - } - - if len(txt.Service) == 0 || len(txt.Version) == 0 { - continue - } - - // Filter watch options - // wo.Service: Only keep services we care about - if len(m.wo.Service) > 0 && txt.Service != m.wo.Service { - continue - } - - var action string - - if e.TTL == 0 { - action = "delete" - } else { - action = "create" - } - - service := ®istry.Service{ - Name: txt.Service, - Version: txt.Version, - Endpoints: txt.Endpoints, - } - - // TODO: don't hardcode .local. - if !strings.HasSuffix(e.Name, "."+service.Name+".local.") { - continue - } - - service.Nodes = append(service.Nodes, ®istry.Node{ - Id: strings.TrimSuffix(e.Name, "."+service.Name+".local."), - Address: e.AddrV4.String(), - Port: e.Port, - Metadata: txt.Metadata, - }) - - return ®istry.Result{ - Action: action, - Service: service, - }, nil - case <-m.exit: - return nil, errors.New("watcher stopped") - } - } -} - -func (m *mdnsWatcher) Stop() { - select { - case <-m.exit: - return - default: - close(m.exit) - } -} diff --git a/watcher_test.go b/watcher_test.go deleted file mode 100644 index f9ff62c..0000000 --- a/watcher_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package mdns - -import ( - "testing" - - "github.com/micro/go-micro/registry" -) - -func TestWatcher(t *testing.T) { - testData := []*registry.Service{ - ®istry.Service{ - Name: "test1", - Version: "1.0.1", - Nodes: []*registry.Node{ - ®istry.Node{ - Id: "test1-1", - Address: "10.0.0.1", - Port: 10001, - Metadata: map[string]string{ - "foo": "bar", - }, - }, - }, - }, - ®istry.Service{ - Name: "test2", - Version: "1.0.2", - Nodes: []*registry.Node{ - ®istry.Node{ - Id: "test2-1", - Address: "10.0.0.2", - Port: 10002, - Metadata: map[string]string{ - "foo2": "bar2", - }, - }, - }, - }, - ®istry.Service{ - Name: "test3", - Version: "1.0.3", - Nodes: []*registry.Node{ - ®istry.Node{ - Id: "test3-1", - Address: "10.0.0.3", - Port: 10003, - Metadata: map[string]string{ - "foo3": "bar3", - }, - }, - }, - }, - } - - testFn := func(service, s *registry.Service) { - if s == nil { - t.Fatalf("Expected one result for %s got nil", service.Name) - - } - - if s.Name != service.Name { - t.Fatalf("Expected name %s got %s", service.Name, s.Name) - } - - if s.Version != service.Version { - t.Fatalf("Expected version %s got %s", service.Version, s.Version) - } - - if len(s.Nodes) != 1 { - t.Fatalf("Expected 1 node, got %d", len(s.Nodes)) - } - - node := s.Nodes[0] - - if node.Id != service.Nodes[0].Id { - t.Fatalf("Expected node id %s got %s", service.Nodes[0].Id, node.Id) - } - - if node.Address != service.Nodes[0].Address { - t.Fatalf("Expected node address %s got %s", service.Nodes[0].Address, node.Address) - } - - if node.Port != service.Nodes[0].Port { - t.Fatalf("Expected node port %d got %d", service.Nodes[0].Port, node.Port) - } - } - - // new registry - r := NewRegistry() - - w, err := r.Watch() - if err != nil { - t.Fatal(err) - } - defer w.Stop() - - for _, service := range testData { - // register service - if err := r.Register(service); err != nil { - t.Fatal(err) - } - - for { - res, err := w.Next() - if err != nil { - t.Fatal(err) - } - - if res.Service.Name != service.Name { - continue - } - - if res.Action != "create" { - t.Fatalf("Expected create event got %s for %s", res.Action, res.Service.Name) - } - - testFn(service, res.Service) - break - } - - // deregister - if err := r.Deregister(service); err != nil { - t.Fatal(err) - } - - for { - res, err := w.Next() - if err != nil { - t.Fatal(err) - } - - if res.Service.Name != service.Name { - continue - } - - if res.Action != "delete" { - continue - } - - testFn(service, res.Service) - break - } - } -}