From 6c108d95b2ffd90e2356137919299d3401318b1f Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 27 Apr 2016 18:21:05 +0100 Subject: [PATCH 1/6] First attempt at mdns --- cmd/cmd.go | 2 + registry/mdns/encoding.go | 124 ++++++++++++++ registry/mdns/encoding_test.go | 147 +++++++++++++++++ registry/mdns/mdns.go | 284 +++++++++++++++++++++++++++++++++ 4 files changed, 557 insertions(+) create mode 100644 registry/mdns/encoding.go create mode 100644 registry/mdns/encoding_test.go create mode 100644 registry/mdns/mdns.go diff --git a/cmd/cmd.go b/cmd/cmd.go index dcd6220f..cfb60bfd 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -20,6 +20,7 @@ import ( // registries "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/consul" + "github.com/micro/go-micro/registry/mdns" // selectors "github.com/micro/go-micro/selector" @@ -139,6 +140,7 @@ var ( DefaultRegistries = map[string]func(...registry.Option) registry.Registry{ "consul": consul.NewRegistry, + "mdns": mdns.NewRegistry, } DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ diff --git a/registry/mdns/encoding.go b/registry/mdns/encoding.go new file mode 100644 index 00000000..bb8feffe --- /dev/null +++ b/registry/mdns/encoding.go @@ -0,0 +1,124 @@ +package mdns + +import ( + "bytes" + "compress/zlib" + "encoding/hex" + "encoding/json" + "io/ioutil" + + "github.com/micro/go-micro/registry" +) + +func encode(buf []byte) string { + var b bytes.Buffer + defer b.Reset() + + w := zlib.NewWriter(&b) + if _, err := w.Write(buf); err != nil { + return "" + } + w.Close() + + return hex.EncodeToString(b.Bytes()) +} + +func decode(d string) []byte { + hr, err := hex.DecodeString(d) + if err != nil { + return nil + } + + br := bytes.NewReader(hr) + zr, err := zlib.NewReader(br) + if err != nil { + return nil + } + + rbuf, err := ioutil.ReadAll(zr) + if err != nil { + return nil + } + + return rbuf +} + +func encodeEndpoints(en []*registry.Endpoint) []string { + var tags []string + for _, e := range en { + if b, err := json.Marshal(e); err == nil { + tags = append(tags, "e-"+encode(b)) + } + } + return tags +} + +func decodeEndpoints(tags []string) []*registry.Endpoint { + var en []*registry.Endpoint + + for _, tag := range tags { + if len(tag) == 0 || tag[0] != 'e' || tag[1] != '-' { + continue + } + + buf := decode(tag[2:]) + + var e *registry.Endpoint + if err := json.Unmarshal(buf, &e); err == nil { + en = append(en, e) + } + } + return en +} + +func encodeMetadata(md map[string]string) []string { + var tags []string + for k, v := range md { + if b, err := json.Marshal(map[string]string{ + k: v, + }); err == nil { + // new encoding + tags = append(tags, "t-"+encode(b)) + } + } + return tags +} + +func decodeMetadata(tags []string) map[string]string { + md := make(map[string]string) + + for _, tag := range tags { + if len(tag) == 0 || tag[0] != 't' || tag[1] != '-' { + continue + } + + buf := decode(tag[2:]) + + var kv map[string]string + + // Now unmarshal + if err := json.Unmarshal(buf, &kv); err == nil { + for k, v := range kv { + md[k] = v + } + } + } + return md +} + +func encodeVersion(v string) []string { + return []string{ + // new encoding, + "v-" + encode([]byte(v)), + } +} + +func decodeVersion(tags []string) (string, bool) { + for _, tag := range tags { + if len(tag) < 2 || tag[0] != 'v' || tag[1] != '-' { + continue + } + return string(decode(tag[2:])), true + } + return "", false +} diff --git a/registry/mdns/encoding_test.go b/registry/mdns/encoding_test.go new file mode 100644 index 00000000..27f0f670 --- /dev/null +++ b/registry/mdns/encoding_test.go @@ -0,0 +1,147 @@ +package mdns + +import ( + "encoding/json" + "testing" + + "github.com/micro/go-micro/registry" +) + +func TestEncodingEndpoints(t *testing.T) { + eps := []*registry.Endpoint{ + ®istry.Endpoint{ + Name: "endpoint1", + Request: ®istry.Value{ + Name: "request", + Type: "request", + }, + Response: ®istry.Value{ + Name: "response", + Type: "response", + }, + Metadata: map[string]string{ + "foo1": "bar1", + }, + }, + ®istry.Endpoint{ + Name: "endpoint2", + Request: ®istry.Value{ + Name: "request", + Type: "request", + }, + Response: ®istry.Value{ + Name: "response", + Type: "response", + }, + Metadata: map[string]string{ + "foo2": "bar2", + }, + }, + ®istry.Endpoint{ + Name: "endpoint3", + Request: ®istry.Value{ + Name: "request", + Type: "request", + }, + Response: ®istry.Value{ + Name: "response", + Type: "response", + }, + Metadata: map[string]string{ + "foo3": "bar3", + }, + }, + } + + testEp := func(ep *registry.Endpoint, enc string) { + // encode endpoint + e := encodeEndpoints([]*registry.Endpoint{ep}) + + // check there are two tags; old and new + if len(e) != 1 { + t.Fatalf("Expected 1 encoded tag, got %v", e) + } + + // check old encoding + var seen bool + + for _, en := range e { + if en == enc { + seen = true + break + } + } + + if !seen { + t.Fatalf("Expected %s but not found", enc) + } + + // decode + d := decodeEndpoints([]string{enc}) + if len(d) == 0 { + t.Fatalf("Expected %v got %v", ep, d) + } + + // check name + if d[0].Name != ep.Name { + t.Fatalf("Expected ep %s got %s", ep.Name, d[0].Name) + } + + // check all the metadata exists + for k, v := range ep.Metadata { + if gv := d[0].Metadata[k]; gv != v { + t.Fatalf("Expected key %s val %s got val %s", k, v, gv) + } + } + } + + for _, ep := range eps { + // JSON encoded + jencoded, err := json.Marshal(ep) + if err != nil { + t.Fatal(err) + } + + // HEX encoded + hencoded := encode(jencoded) + // endpoint tag + hepTag := "e-" + hencoded + testEp(ep, hepTag) + } +} + +func TestEncodingVersion(t *testing.T) { + testData := []struct { + decoded string + encoded string + }{ + {"1.0.0", "v-789c32d433d03300040000ffff02ce00ee"}, + {"latest", "v-789cca492c492d2e01040000ffff08cc028e"}, + } + + for _, data := range testData { + e := encodeVersion(data.decoded) + + if e[0] != data.encoded { + t.Fatalf("Expected %s got %s", data.encoded, e) + } + + d, ok := decodeVersion(e) + if !ok { + t.Fatalf("Unexpected %t for %s", ok, data.encoded) + } + + if d != data.decoded { + t.Fatalf("Expected %s got %s", data.decoded, d) + } + + d, ok = decodeVersion([]string{data.encoded}) + if !ok { + t.Fatalf("Unexpected %t for %s", ok, data.encoded) + } + + if d != data.decoded { + t.Fatalf("Expected %s got %s", data.decoded, d) + } + } +} diff --git a/registry/mdns/mdns.go b/registry/mdns/mdns.go new file mode 100644 index 00000000..cb1e933b --- /dev/null +++ b/registry/mdns/mdns.go @@ -0,0 +1,284 @@ +package mdns + +/* + MDNS is a multicast dns registry for service discovery + This creates a zero dependency system which is great + where multicast dns is available. This usually depends + on the ability to leverage udp and multicast/broadcast. +*/ + +import ( + "net" + "strings" + "sync" + "time" + + "github.com/hashicorp/mdns" + "github.com/micro/go-micro/registry" + hash "github.com/mitchellh/hashstructure" +) + +type mdnsEntry struct { + hash uint64 + id string + node *mdns.Server +} + +type mdnsRegistry struct { + opts registry.Options + + sync.Mutex + services map[string][]*mdnsEntry +} + +func newRegistry(opts ...registry.Option) registry.Registry { + options := registry.Options{ + Timeout: time.Millisecond * 100, + } + + return &mdnsRegistry{ + opts: options, + services: make(map[string][]*mdnsEntry), + } +} + +func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error { + m.Lock() + defer m.Unlock() + + entries, ok := m.services[service.Name] + // first entry, create wildcard used for list queries + if !ok { + s, err := mdns.NewMDNSService( + service.Name, + "_services", + "", + "", + 9999, + []net.IP{net.ParseIP("0.0.0.0")}, + nil, + ) + if err != nil { + return err + } + + srv, err := mdns.NewServer(&mdns.Config{Zone: s}) + if err != nil { + return err + } + + // append the wildcard entry + entries = append(entries, &mdnsEntry{id: "*", node: srv}) + } + + var gerr error + + for _, node := range service.Nodes { + // create hash of service; uint64 + h, err := hash.Hash(node, nil) + if err != nil { + gerr = err + continue + } + + var seen bool + var e *mdnsEntry + + for _, entry := range entries { + if node.Id == entry.id { + seen = true + e = entry + break + } + } + + // already registered, continue + if seen && e.hash == h { + continue + // hash doesn't match, shutdown + } else if seen { + e.node.Shutdown() + // doesn't exist + } else { + e = &mdnsEntry{hash: h} + } + + var txt []string + txt = append(txt, encodeVersion(service.Version)...) + txt = append(txt, encodeMetadata(node.Metadata)...) + // txt = append(txt, encodeEndpoints(service.Endpoints)...) + + // we got here, new node + s, err := mdns.NewMDNSService( + node.Id, + service.Name, + "", + "", + node.Port, + []net.IP{net.ParseIP(node.Address)}, + txt, + ) + if err != nil { + gerr = err + continue + } + + srv, err := mdns.NewServer(&mdns.Config{Zone: s}) + if err != nil { + gerr = err + continue + } + + e.id = node.Id + e.node = srv + entries = append(entries, e) + } + + // save + m.services[service.Name] = entries + + return gerr +} + +func (m *mdnsRegistry) Deregister(service *registry.Service) error { + m.Lock() + defer m.Unlock() + + var newEntries []*mdnsEntry + + // loop existing entries, check if any match, shutdown those that do + for _, entry := range m.services[service.Name] { + var remove bool + + for _, node := range service.Nodes { + if node.Id == entry.id { + entry.node.Shutdown() + remove = true + break + } + } + + // keep it? + if !remove { + newEntries = append(newEntries, entry) + } + } + + // last entry is the wildcard for list queries. Remove it. + if len(newEntries) == 1 && newEntries[0].id == "*" { + newEntries[0].node.Shutdown() + delete(m.services, service.Name) + } else { + m.services[service.Name] = newEntries + } + + return nil +} + +func (m *mdnsRegistry) GetService(service string) ([]*registry.Service, error) { + p := mdns.DefaultParams(service) + p.Timeout = m.opts.Timeout + entryCh := make(chan *mdns.ServiceEntry, 10) + p.Entries = entryCh + + exit := make(chan bool) + defer close(exit) + + serviceMap := make(map[string]*registry.Service) + + go func() { + for { + select { + case e := <-entryCh: + // list record so skip + if p.Service == "_services" { + continue + } + + version, exists := decodeVersion(e.InfoFields) + if !exists { + continue + } + + s, ok := serviceMap[version] + if !ok { + s = ®istry.Service{ + Name: service, + Version: version, + // Endpoints: decodeEndpoints(e.InfoFields), + } + } + + s.Nodes = append(s.Nodes, ®istry.Node{ + Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."), + Address: e.AddrV4.String(), + Port: e.Port, + Metadata: decodeMetadata(e.InfoFields), + }) + + serviceMap[version] = s + case <-exit: + return + } + } + }() + + if err := mdns.Query(p); err != nil { + return nil, err + } + + // create list and return + var services []*registry.Service + + for _, service := range serviceMap { + services = append(services, service) + } + + return services, nil +} + +func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) { + p := mdns.DefaultParams("_services") + p.Timeout = m.opts.Timeout + entryCh := make(chan *mdns.ServiceEntry, 10) + p.Entries = entryCh + + exit := make(chan bool) + defer close(exit) + + serviceMap := make(map[string]bool) + var services []*registry.Service + + go func() { + for { + select { + case e := <-entryCh: + name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".") + if !serviceMap[name] { + serviceMap[name] = true + services = append(services, ®istry.Service{Name: name}) + } + case <-exit: + return + } + } + }() + + if err := mdns.Query(p); err != nil { + return nil, err + } + + return services, nil +} + +func (m *mdnsRegistry) Watch() (registry.Watcher, error) { + return nil, nil +} + +func (m *mdnsRegistry) String() string { + return "mdns" +} + +func NewRegistry(opts ...registry.Option) registry.Registry { + return newRegistry(opts...) +} From c26f989bbb2877479b892fd45422950100016274 Mon Sep 17 00:00:00 2001 From: Asim Date: Thu, 28 Apr 2016 18:36:59 +0100 Subject: [PATCH 2/6] Fix encoding so we split across txt records --- registry/mdns/encoding.go | 135 ++++++++----------------- registry/mdns/encoding_test.go | 174 +++++++++------------------------ registry/mdns/mdns.go | 36 ++++--- 3 files changed, 113 insertions(+), 232 deletions(-) diff --git a/registry/mdns/encoding.go b/registry/mdns/encoding.go index bb8feffe..d7323daf 100644 --- a/registry/mdns/encoding.go +++ b/registry/mdns/encoding.go @@ -6,119 +6,68 @@ import ( "encoding/hex" "encoding/json" "io/ioutil" - - "github.com/micro/go-micro/registry" + "strings" ) -func encode(buf []byte) string { - var b bytes.Buffer - defer b.Reset() +func encode(txt *mdnsTxt) ([]string, error) { + b, err := json.Marshal(txt) + if err != nil { + return nil, err + } - w := zlib.NewWriter(&b) - if _, err := w.Write(buf); err != nil { - return "" + var buf bytes.Buffer + defer buf.Reset() + + w := zlib.NewWriter(&buf) + if _, err := w.Write(b); err != nil { + return nil, err } w.Close() - return hex.EncodeToString(b.Bytes()) + encoded := hex.EncodeToString(buf.Bytes()) + + // individual txt limit + if len(encoded) <= 255 { + return []string{encoded}, nil + } + + // split encoded string + var record []string + + for len(encoded) > 255 { + record = append(record, encoded[:255]) + encoded = encoded[255:] + } + + record = append(record, encoded) + + return record, nil } -func decode(d string) []byte { - hr, err := hex.DecodeString(d) +func decode(record []string) (*mdnsTxt, error) { + encoded := strings.Join(record, "") + + hr, err := hex.DecodeString(encoded) if err != nil { - return nil + return nil, err } br := bytes.NewReader(hr) zr, err := zlib.NewReader(br) if err != nil { - return nil + return nil, err } rbuf, err := ioutil.ReadAll(zr) if err != nil { - return nil + return nil, err } - return rbuf -} + var txt *mdnsTxt -func encodeEndpoints(en []*registry.Endpoint) []string { - var tags []string - for _, e := range en { - if b, err := json.Marshal(e); err == nil { - tags = append(tags, "e-"+encode(b)) - } + if err := json.Unmarshal(rbuf, &txt); err != nil { + return nil, err } - return tags -} - -func decodeEndpoints(tags []string) []*registry.Endpoint { - var en []*registry.Endpoint - - for _, tag := range tags { - if len(tag) == 0 || tag[0] != 'e' || tag[1] != '-' { - continue - } - - buf := decode(tag[2:]) - - var e *registry.Endpoint - if err := json.Unmarshal(buf, &e); err == nil { - en = append(en, e) - } - } - return en -} - -func encodeMetadata(md map[string]string) []string { - var tags []string - for k, v := range md { - if b, err := json.Marshal(map[string]string{ - k: v, - }); err == nil { - // new encoding - tags = append(tags, "t-"+encode(b)) - } - } - return tags -} - -func decodeMetadata(tags []string) map[string]string { - md := make(map[string]string) - - for _, tag := range tags { - if len(tag) == 0 || tag[0] != 't' || tag[1] != '-' { - continue - } - - buf := decode(tag[2:]) - - var kv map[string]string - - // Now unmarshal - if err := json.Unmarshal(buf, &kv); err == nil { - for k, v := range kv { - md[k] = v - } - } - } - return md -} - -func encodeVersion(v string) []string { - return []string{ - // new encoding, - "v-" + encode([]byte(v)), - } -} - -func decodeVersion(tags []string) (string, bool) { - for _, tag := range tags { - if len(tag) < 2 || tag[0] != 'v' || tag[1] != '-' { - continue - } - return string(decode(tag[2:])), true - } - return "", false + + return txt, nil } diff --git a/registry/mdns/encoding_test.go b/registry/mdns/encoding_test.go index 27f0f670..baea1c59 100644 --- a/registry/mdns/encoding_test.go +++ b/registry/mdns/encoding_test.go @@ -1,147 +1,67 @@ package mdns import ( - "encoding/json" "testing" "github.com/micro/go-micro/registry" ) -func TestEncodingEndpoints(t *testing.T) { - eps := []*registry.Endpoint{ - ®istry.Endpoint{ - Name: "endpoint1", - Request: ®istry.Value{ - Name: "request", - Type: "request", - }, - Response: ®istry.Value{ - Name: "response", - Type: "response", - }, +func TestEncoding(t *testing.T) { + testData := []*mdnsTxt{ + &mdnsTxt{ + Version: "1.0.0", Metadata: map[string]string{ - "foo1": "bar1", + "foo": "bar", }, - }, - ®istry.Endpoint{ - Name: "endpoint2", - Request: ®istry.Value{ - Name: "request", - Type: "request", - }, - Response: ®istry.Value{ - Name: "response", - Type: "response", - }, - Metadata: map[string]string{ - "foo2": "bar2", - }, - }, - ®istry.Endpoint{ - Name: "endpoint3", - Request: ®istry.Value{ - Name: "request", - Type: "request", - }, - Response: ®istry.Value{ - Name: "response", - Type: "response", - }, - Metadata: map[string]string{ - "foo3": "bar3", + Endpoints: []*registry.Endpoint{ + ®istry.Endpoint{ + Name: "endpoint1", + Request: ®istry.Value{ + Name: "request", + Type: "request", + }, + Response: ®istry.Value{ + Name: "response", + Type: "response", + }, + Metadata: map[string]string{ + "foo1": "bar1", + }, + }, }, }, } - testEp := func(ep *registry.Endpoint, enc string) { - // encode endpoint - e := encodeEndpoints([]*registry.Endpoint{ep}) - - // check there are two tags; old and new - if len(e) != 1 { - t.Fatalf("Expected 1 encoded tag, got %v", e) - } - - // check old encoding - var seen bool - - for _, en := range e { - if en == enc { - seen = true - break - } - } - - if !seen { - t.Fatalf("Expected %s but not found", enc) - } - - // decode - d := decodeEndpoints([]string{enc}) - if len(d) == 0 { - t.Fatalf("Expected %v got %v", ep, d) - } - - // check name - if d[0].Name != ep.Name { - t.Fatalf("Expected ep %s got %s", ep.Name, d[0].Name) - } - - // check all the metadata exists - for k, v := range ep.Metadata { - if gv := d[0].Metadata[k]; gv != v { - t.Fatalf("Expected key %s val %s got val %s", k, v, gv) - } - } - } - - for _, ep := range eps { - // JSON encoded - jencoded, err := json.Marshal(ep) + for _, d := range testData { + encoded, err := encode(d) if err != nil { t.Fatal(err) } - // HEX encoded - hencoded := encode(jencoded) - // endpoint tag - hepTag := "e-" + hencoded - testEp(ep, hepTag) - } -} - -func TestEncodingVersion(t *testing.T) { - testData := []struct { - decoded string - encoded string - }{ - {"1.0.0", "v-789c32d433d03300040000ffff02ce00ee"}, - {"latest", "v-789cca492c492d2e01040000ffff08cc028e"}, - } - - for _, data := range testData { - e := encodeVersion(data.decoded) - - if e[0] != data.encoded { - t.Fatalf("Expected %s got %s", data.encoded, e) - } - - d, ok := decodeVersion(e) - if !ok { - t.Fatalf("Unexpected %t for %s", ok, data.encoded) - } - - if d != data.decoded { - t.Fatalf("Expected %s got %s", data.decoded, d) - } - - d, ok = decodeVersion([]string{data.encoded}) - if !ok { - t.Fatalf("Unexpected %t for %s", ok, data.encoded) - } - - if d != data.decoded { - t.Fatalf("Expected %s got %s", data.decoded, d) - } + for _, txt := range encoded { + if len(txt) > 255 { + t.Fatalf("One of parts for txt is %d characters", len(txt)) + } + } + + decoded, err := decode(encoded) + if err != nil { + t.Fatal(err) + } + + if decoded.Version != d.Version { + t.Fatalf("Expected version %s got %s", d.Version, decoded.Version) + } + + if len(decoded.Endpoints) != len(d.Endpoints) { + t.Fatalf("Expected %d endpoints, got %d", len(d.Endpoints), len(decoded.Endpoints)) + } + + for k, v := range d.Metadata { + if val := decoded.Metadata[k]; val != v { + t.Fatalf("Expected %s=%s got %s=%s", k, v, k, val) + } + } } + } diff --git a/registry/mdns/mdns.go b/registry/mdns/mdns.go index cb1e933b..b7d056e9 100644 --- a/registry/mdns/mdns.go +++ b/registry/mdns/mdns.go @@ -18,6 +18,12 @@ import ( hash "github.com/mitchellh/hashstructure" ) +type mdnsTxt struct { + Version string + Endpoints []*registry.Endpoint + Metadata map[string]string +} + type mdnsEntry struct { hash uint64 id string @@ -103,10 +109,16 @@ func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.Regi e = &mdnsEntry{hash: h} } - var txt []string - txt = append(txt, encodeVersion(service.Version)...) - txt = append(txt, encodeMetadata(node.Metadata)...) - // txt = append(txt, encodeEndpoints(service.Endpoints)...) + txt, err := encode(&mdnsTxt{ + Version: service.Version, + Endpoints: service.Endpoints, + Metadata: node.Metadata, + }) + + if err != nil { + gerr = err + continue + } // we got here, new node s, err := mdns.NewMDNSService( @@ -195,17 +207,17 @@ func (m *mdnsRegistry) GetService(service string) ([]*registry.Service, error) { continue } - version, exists := decodeVersion(e.InfoFields) - if !exists { + txt, err := decode(e.InfoFields) + if err != nil { continue } - s, ok := serviceMap[version] + s, ok := serviceMap[txt.Version] if !ok { s = ®istry.Service{ - Name: service, - Version: version, - // Endpoints: decodeEndpoints(e.InfoFields), + Name: service, + Version: txt.Version, + Endpoints: txt.Endpoints, } } @@ -213,10 +225,10 @@ func (m *mdnsRegistry) GetService(service string) ([]*registry.Service, error) { Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."), Address: e.AddrV4.String(), Port: e.Port, - Metadata: decodeMetadata(e.InfoFields), + Metadata: txt.Metadata, }) - serviceMap[version] = s + serviceMap[txt.Version] = s case <-exit: return } From ae8c9482027c4e9c1b87a8006cb5efd74dc6ac94 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 30 Apr 2016 00:15:00 +0100 Subject: [PATCH 3/6] Use our fork of mdns with all the updates --- registry/mdns/mdns.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/mdns/mdns.go b/registry/mdns/mdns.go index b7d056e9..92bcfee5 100644 --- a/registry/mdns/mdns.go +++ b/registry/mdns/mdns.go @@ -13,7 +13,7 @@ import ( "sync" "time" - "github.com/hashicorp/mdns" + "github.com/micro/mdns" "github.com/micro/go-micro/registry" hash "github.com/mitchellh/hashstructure" ) From 5b94e4672dd5b275ccd6ab6ba86b806e72baf4db Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 30 Apr 2016 00:20:05 +0100 Subject: [PATCH 4/6] Failed to gofmt --- cmd/cmd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index cfb60bfd..5c3ce99f 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -140,7 +140,7 @@ var ( DefaultRegistries = map[string]func(...registry.Option) registry.Registry{ "consul": consul.NewRegistry, - "mdns": mdns.NewRegistry, + "mdns": mdns.NewRegistry, } DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ From 59f1a9a07b833c57b4ed6ee0d3b643eb552e3dfd Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 30 Apr 2016 00:22:31 +0100 Subject: [PATCH 5/6] Use mdns-sd service --- registry/mdns/mdns.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/registry/mdns/mdns.go b/registry/mdns/mdns.go index 92bcfee5..711007c0 100644 --- a/registry/mdns/mdns.go +++ b/registry/mdns/mdns.go @@ -13,8 +13,8 @@ import ( "sync" "time" - "github.com/micro/mdns" "github.com/micro/go-micro/registry" + "github.com/micro/mdns" hash "github.com/mitchellh/hashstructure" ) @@ -68,7 +68,7 @@ func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.Regi return err } - srv, err := mdns.NewServer(&mdns.Config{Zone: s}) + srv, err := mdns.NewServer(&mdns.Config{Zone: &mdns.DNSSDService{s}}) if err != nil { return err } From e14f9a0380e5e3e4e1fc06f7361f73cfc2f6662b Mon Sep 17 00:00:00 2001 From: Asim Date: Sun, 1 May 2016 19:31:03 +0100 Subject: [PATCH 6/6] Add watcher.... OH YEAAA --- registry/mdns/mdns.go | 23 ++++++++++++- registry/mdns/watcher.go | 72 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 registry/mdns/watcher.go diff --git a/registry/mdns/mdns.go b/registry/mdns/mdns.go index 711007c0..eeb8a97e 100644 --- a/registry/mdns/mdns.go +++ b/registry/mdns/mdns.go @@ -19,6 +19,7 @@ import ( ) type mdnsTxt struct { + Service string Version string Endpoints []*registry.Endpoint Metadata map[string]string @@ -110,6 +111,7 @@ func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.Regi } txt, err := encode(&mdnsTxt{ + Service: service.Name, Version: service.Version, Endpoints: service.Endpoints, Metadata: node.Metadata, @@ -207,6 +209,10 @@ func (m *mdnsRegistry) GetService(service string) ([]*registry.Service, error) { continue } + if e.TTL == 0 { + continue + } + txt, err := decode(e.InfoFields) if err != nil { continue @@ -265,6 +271,10 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) { for { select { case e := <-entryCh: + if e.TTL == 0 { + continue + } + name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".") if !serviceMap[name] { serviceMap[name] = true @@ -284,7 +294,18 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) { } func (m *mdnsRegistry) Watch() (registry.Watcher, error) { - return nil, nil + md := &mdnsWatcher{ + ch: make(chan *mdns.ServiceEntry, 32), + exit: make(chan struct{}), + } + + go func() { + if err := mdns.Listen(md.ch, md.exit); err != nil { + md.Stop() + } + }() + + return md, nil } func (m *mdnsRegistry) String() string { diff --git a/registry/mdns/watcher.go b/registry/mdns/watcher.go new file mode 100644 index 00000000..379b40f6 --- /dev/null +++ b/registry/mdns/watcher.go @@ -0,0 +1,72 @@ +package mdns + +import ( + "errors" + "strings" + + "github.com/micro/go-micro/registry" + "github.com/micro/mdns" +) + +type mdnsWatcher struct { + ch chan *mdns.ServiceEntry + exit chan struct{} +} + +func (m *mdnsWatcher) Next() (*registry.Result, error) { + for { + select { + case e := <-m.ch: + txt, err := decode(e.InfoFields) + if err != nil { + continue + } + + if len(txt.Service) == 0 || len(txt.Version) == 0 { + continue + } + + var action string + + if e.TTL == 0 { + action = "delete" + } else { + action = "create" + } + + service := ®istry.Service{ + Name: txt.Service, + Version: txt.Version, + Endpoints: txt.Endpoints, + } + + // TODO: don't hardcode .local. + if !strings.HasSuffix(e.Name, "."+service.Name+".local.") { + continue + } + + service.Nodes = append(service.Nodes, ®istry.Node{ + Id: strings.TrimSuffix(e.Name, "."+service.Name+".local."), + Address: e.AddrV4.String(), + Port: e.Port, + Metadata: txt.Metadata, + }) + + return ®istry.Result{ + Action: action, + Service: service, + }, nil + case <-m.exit: + return nil, errors.New("watcher stopped") + } + } +} + +func (m *mdnsWatcher) Stop() { + select { + case <-m.exit: + return + default: + close(m.exit) + } +}