From d8d3ee07f5726ca613c92a1e9322bffedb5c121b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 27 Jul 2020 13:22:00 +0100 Subject: [PATCH] v3 refactor (#1868) * Move to v3 Co-authored-by: Ben Toogood --- mdns.go | 768 ++++++++++++++++++++++++++++++++++++++++++++++++++- mdns_test.go | 342 +++++++++++++++++++++++ options.go | 18 ++ 3 files changed, 1117 insertions(+), 11 deletions(-) create mode 100644 mdns_test.go create mode 100644 options.go diff --git a/mdns.go b/mdns.go index 498b242..33e91be 100644 --- a/mdns.go +++ b/mdns.go @@ -1,23 +1,769 @@ -// Package mdns provides a multicast dns registry package mdns import ( + "bytes" + "compress/zlib" "context" + "encoding/hex" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "strconv" + "strings" + "sync" + "time" - "github.com/micro/go-micro/v2/registry" + "github.com/google/uuid" + "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/registry" + "github.com/micro/go-micro/v3/util/mdns" ) -// NewRegistry returns a new mdns registry -func NewRegistry(opts ...registry.Option) registry.Registry { - return registry.NewRegistry(opts...) +const ( + // every service is written to the global domain so * domain queries work, e.g. + // calling mdns.List(registry.ListDomain("*")) will list the services across all + // domains + globalDomain = "global" +) + +type mdnsTxt struct { + Service string + Version string + Endpoints []*registry.Endpoint + Metadata map[string]string } -// Domain sets the mdnsDomain -func Domain(d string) registry.Option { - return func(o *registry.Options) { - if o.Context == nil { - o.Context = context.Background() +type mdnsEntry struct { + id string + node *mdns.Server +} + +// services are a key/value map, with the service name as a key and the value being a +// slice of mdns entries, representing the nodes with a single _services entry to be +// used for listing +type services map[string][]*mdnsEntry + +// mdsRegistry is a multicast dns registry +type mdnsRegistry struct { + opts registry.Options + + // the top level domains, these can be overriden using options + defaultDomain string + globalDomain string + + sync.Mutex + domains map[string]services + + mtx sync.RWMutex + + // watchers + watchers map[string]*mdnsWatcher + + // listener + listener chan *mdns.ServiceEntry +} + +type mdnsWatcher struct { + id string + wo registry.WatchOptions + ch chan *mdns.ServiceEntry + exit chan struct{} + // the mdns domain + domain string + // the registry + registry *mdnsRegistry +} + +func encode(txt *mdnsTxt) ([]string, error) { + b, err := json.Marshal(txt) + if err != nil { + return nil, err + } + + var buf bytes.Buffer + defer buf.Reset() + + w := zlib.NewWriter(&buf) + defer func() { + if closeErr := w.Close(); closeErr != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("[mdns] registry close encoding writer err: %v", closeErr) + } } - o.Context = context.WithValue(o.Context, "mdns.domain", d) + }() + if _, err := w.Write(b); err != nil { + return nil, err + } + + if err = w.Close(); err != nil { + return nil, err + } + + encoded := hex.EncodeToString(buf.Bytes()) + // individual txt limit + if len(encoded) <= 255 { + return []string{encoded}, nil + } + + // split encoded string + var record []string + + for len(encoded) > 255 { + record = append(record, encoded[:255]) + encoded = encoded[255:] + } + + record = append(record, encoded) + + return record, nil +} + +func decode(record []string) (*mdnsTxt, error) { + encoded := strings.Join(record, "") + + hr, err := hex.DecodeString(encoded) + if err != nil { + return nil, err + } + + br := bytes.NewReader(hr) + zr, err := zlib.NewReader(br) + if err != nil { + return nil, err + } + defer zr.Close() + + rbuf, err := ioutil.ReadAll(zr) + if err != nil { + return nil, err + } + + var txt *mdnsTxt + + if err := json.Unmarshal(rbuf, &txt); err != nil { + return nil, err + } + + return txt, nil +} + +func newRegistry(opts ...registry.Option) registry.Registry { + options := registry.Options{ + Context: context.Background(), + Timeout: time.Millisecond * 100, + } + + for _, o := range opts { + o(&options) + } + + // set the domain + defaultDomain := registry.DefaultDomain + if d, ok := options.Context.Value("mdns.domain").(string); ok { + defaultDomain = d + } + + return &mdnsRegistry{ + defaultDomain: defaultDomain, + globalDomain: globalDomain, + opts: options, + domains: make(map[string]services), + watchers: make(map[string]*mdnsWatcher), } } + +func (m *mdnsRegistry) Init(opts ...registry.Option) error { + for _, o := range opts { + o(&m.opts) + } + return nil +} + +func (m *mdnsRegistry) Options() registry.Options { + return m.opts +} + +// createServiceMDNSEntry will create a new wildcard mdns entry for the service in the +// given domain. This wildcard mdns entry is used when listing services. +func createServiceMDNSEntry(name, domain string) (*mdnsEntry, error) { + ip := net.ParseIP("0.0.0.0") + + s, err := mdns.NewMDNSService(name, "_services", domain+".", "", 9999, []net.IP{ip}, nil) + if err != nil { + return nil, err + } + + srv, err := mdns.NewServer(&mdns.Config{Zone: &mdns.DNSSDService{MDNSService: s}, LocalhostChecking: true}) + if err != nil { + return nil, err + } + + return &mdnsEntry{id: "*", node: srv}, nil +} + +func (m *mdnsRegistry) getMdnsEntries(domain, serviceName string) ([]*mdnsEntry, error) { + entries, ok := m.domains[domain][serviceName] + if ok { + return entries, nil + } + + // create the wildcard entry used for list queries in this domain + entry, err := createServiceMDNSEntry(serviceName, domain) + if err != nil { + return nil, err + } + + return []*mdnsEntry{entry}, nil +} + +func registerService(service *registry.Service, entries []*mdnsEntry, options registry.RegisterOptions) ([]*mdnsEntry, error) { + var lastError error + for _, node := range service.Nodes { + var seen bool + + for _, entry := range entries { + if node.Id == entry.id { + seen = true + break + } + } + + // this node has already been registered, continue + if seen { + continue + } + + txt, err := encode(&mdnsTxt{ + Service: service.Name, + Version: service.Version, + Endpoints: service.Endpoints, + Metadata: node.Metadata, + }) + + if err != nil { + lastError = err + continue + } + + host, pt, err := net.SplitHostPort(node.Address) + if err != nil { + lastError = err + continue + } + port, _ := strconv.Atoi(pt) + + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host) + } + // we got here, new node + s, err := mdns.NewMDNSService( + node.Id, + service.Name, + options.Domain+".", + "", + port, + []net.IP{net.ParseIP(host)}, + txt, + ) + if err != nil { + lastError = err + continue + } + + srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true}) + if err != nil { + lastError = err + continue + } + + entries = append(entries, &mdnsEntry{id: node.Id, node: srv}) + } + + return entries, lastError +} + +func createGlobalDomainService(service *registry.Service, options registry.RegisterOptions) *registry.Service { + srv := *service + srv.Nodes = nil + + for _, n := range service.Nodes { + node := n + + // set the original domain in node metadata + if node.Metadata == nil { + node.Metadata = map[string]string{"domain": options.Domain} + } else { + node.Metadata["domain"] = options.Domain + } + + srv.Nodes = append(srv.Nodes, node) + } + + return &srv +} + +func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error { + m.Lock() + + // parse the options + var options registry.RegisterOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = m.defaultDomain + } + + // create the domain in the memory store if it doesn't yet exist + if _, ok := m.domains[options.Domain]; !ok { + m.domains[options.Domain] = make(services) + } + + entries, err := m.getMdnsEntries(options.Domain, service.Name) + if err != nil { + m.Unlock() + return err + } + + entries, gerr := registerService(service, entries, options) + + // save the mdns entry + m.domains[options.Domain][service.Name] = entries + m.Unlock() + + // register in the global Domain so it can be queried as one + if options.Domain != m.globalDomain { + srv := createGlobalDomainService(service, options) + if err := m.Register(srv, append(opts, registry.RegisterDomain(m.globalDomain))...); err != nil { + gerr = err + } + } + + return gerr +} + +func (m *mdnsRegistry) Deregister(service *registry.Service, opts ...registry.DeregisterOption) error { + // parse the options + var options registry.DeregisterOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = m.defaultDomain + } + + // register in the global Domain + var err error + if options.Domain != m.globalDomain { + defer func() { + err = m.Deregister(service, append(opts, registry.DeregisterDomain(m.globalDomain))...) + }() + } + + // we want to unlock before we call deregister on the global domain, so it's important this unlock + // is applied after the defer m.Deregister is called above + m.Lock() + defer m.Unlock() + + // the service wasn't registered, we can safely exist + if _, ok := m.domains[options.Domain]; !ok { + return err + } + + // loop existing entries, check if any match, shutdown those that do + var newEntries []*mdnsEntry + for _, entry := range m.domains[options.Domain][service.Name] { + var remove bool + + for _, node := range service.Nodes { + if node.Id == entry.id { + entry.node.Shutdown() + remove = true + break + } + } + + // keep it? + if !remove { + newEntries = append(newEntries, entry) + } + } + + // we have no new entries, we can exit + if len(newEntries) == 0 { + return nil + } + + // we have more than one entry remaining, we can exit + if len(newEntries) > 1 { + m.domains[options.Domain][service.Name] = newEntries + return err + } + + // our remaining entry is not a wildcard, we can exit + if len(newEntries) == 1 && newEntries[0].id != "*" { + m.domains[options.Domain][service.Name] = newEntries + return err + } + + // last entry is the wildcard for list queries. Remove it. + newEntries[0].node.Shutdown() + delete(m.domains[options.Domain], service.Name) + + // check to see if we can delete the domain entry + if len(m.domains[options.Domain]) == 0 { + delete(m.domains, options.Domain) + } + + return err +} + +func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) { + // parse the options + var options registry.GetOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = m.defaultDomain + } + if options.Domain == registry.WildcardDomain { + options.Domain = m.globalDomain + } + + serviceMap := make(map[string]*registry.Service) + entries := make(chan *mdns.ServiceEntry, 10) + done := make(chan bool) + + p := mdns.DefaultParams(service) + // set context with timeout + var cancel context.CancelFunc + p.Context, cancel = context.WithTimeout(context.Background(), m.opts.Timeout) + defer cancel() + // set entries channel + p.Entries = entries + // set the domain + p.Domain = options.Domain + + go func() { + for { + select { + case e := <-entries: + // list record so skip + if e.Name == "_services" { + continue + } + if e.TTL == 0 { + continue + } + + txt, err := decode(e.InfoFields) + if err != nil { + continue + } + + if txt.Service != service { + continue + } + + s, ok := serviceMap[txt.Version] + if !ok { + s = ®istry.Service{ + Name: txt.Service, + Version: txt.Version, + Endpoints: txt.Endpoints, + } + } + addr := "" + // prefer ipv4 addrs + if len(e.AddrV4) > 0 { + addr = e.AddrV4.String() + // else use ipv6 + } else if len(e.AddrV6) > 0 { + addr = "[" + e.AddrV6.String() + "]" + } else { + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("[mdns]: invalid endpoint received: %v", e) + } + continue + } + s.Nodes = append(s.Nodes, ®istry.Node{ + Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."), + Address: fmt.Sprintf("%s:%d", addr, e.Port), + Metadata: txt.Metadata, + }) + + serviceMap[txt.Version] = s + case <-p.Context.Done(): + close(done) + return + } + } + }() + + // execute the query + if err := mdns.Query(p); err != nil { + return nil, err + } + + // wait for completion + <-done + + // create list and return + services := make([]*registry.Service, 0, len(serviceMap)) + + for _, service := range serviceMap { + services = append(services, service) + } + + return services, nil +} + +func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) { + // parse the options + var options registry.ListOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = m.defaultDomain + } + if options.Domain == registry.WildcardDomain { + options.Domain = m.globalDomain + } + + serviceMap := make(map[string]bool) + entries := make(chan *mdns.ServiceEntry, 10) + done := make(chan bool) + + p := mdns.DefaultParams("_services") + // set context with timeout + var cancel context.CancelFunc + p.Context, cancel = context.WithTimeout(context.Background(), m.opts.Timeout) + defer cancel() + // set entries channel + p.Entries = entries + // set domain + p.Domain = options.Domain + + var services []*registry.Service + + go func() { + for { + select { + case e := <-entries: + if e.TTL == 0 { + continue + } + if !strings.HasSuffix(e.Name, p.Domain+".") { + continue + } + name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".") + if !serviceMap[name] { + serviceMap[name] = true + services = append(services, ®istry.Service{Name: name}) + } + case <-p.Context.Done(): + close(done) + return + } + } + }() + + // execute query + if err := mdns.Query(p); err != nil { + return nil, err + } + + // wait till done + <-done + + return services, nil +} + +func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { + var wo registry.WatchOptions + for _, o := range opts { + o(&wo) + } + if len(wo.Domain) == 0 { + wo.Domain = m.defaultDomain + } + if wo.Domain == registry.WildcardDomain { + wo.Domain = m.globalDomain + } + + md := &mdnsWatcher{ + id: uuid.New().String(), + wo: wo, + ch: make(chan *mdns.ServiceEntry, 32), + exit: make(chan struct{}), + domain: wo.Domain, + registry: m, + } + + m.mtx.Lock() + defer m.mtx.Unlock() + + // save the watcher + m.watchers[md.id] = md + + // check of the listener exists + if m.listener != nil { + return md, nil + } + + // start the listener + go func() { + // go to infinity + for { + m.mtx.Lock() + + // just return if there are no watchers + if len(m.watchers) == 0 { + m.listener = nil + m.mtx.Unlock() + return + } + + // check existing listener + if m.listener != nil { + m.mtx.Unlock() + return + } + + // reset the listener + exit := make(chan struct{}) + ch := make(chan *mdns.ServiceEntry, 32) + m.listener = ch + + m.mtx.Unlock() + + // send messages to the watchers + go func() { + send := func(w *mdnsWatcher, e *mdns.ServiceEntry) { + select { + case w.ch <- e: + default: + } + } + + for { + select { + case <-exit: + return + case e, ok := <-ch: + if !ok { + return + } + m.mtx.RLock() + // send service entry to all watchers + for _, w := range m.watchers { + send(w, e) + } + m.mtx.RUnlock() + } + } + + }() + + // start listening, blocking call + mdns.Listen(ch, exit) + + // mdns.Listen has unblocked + // kill the saved listener + m.mtx.Lock() + m.listener = nil + close(ch) + m.mtx.Unlock() + } + }() + + return md, nil +} + +func (m *mdnsRegistry) String() string { + return "mdns" +} + +func (m *mdnsWatcher) Next() (*registry.Result, error) { + for { + select { + case e := <-m.ch: + txt, err := decode(e.InfoFields) + if err != nil { + continue + } + + if len(txt.Service) == 0 || len(txt.Version) == 0 { + continue + } + + // Filter watch options + // wo.Service: Only keep services we care about + if len(m.wo.Service) > 0 && txt.Service != m.wo.Service { + continue + } + var action string + if e.TTL == 0 { + action = "delete" + } else { + action = "create" + } + + service := ®istry.Service{ + Name: txt.Service, + Version: txt.Version, + Endpoints: txt.Endpoints, + Metadata: txt.Metadata, + } + + // skip anything without the domain we care about + suffix := fmt.Sprintf(".%s.%s.", service.Name, m.domain) + if !strings.HasSuffix(e.Name, suffix) { + continue + } + + var addr string + if len(e.AddrV4) > 0 { + addr = e.AddrV4.String() + } else if len(e.AddrV6) > 0 { + addr = "[" + e.AddrV6.String() + "]" + } else { + addr = e.Addr.String() + } + + service.Nodes = append(service.Nodes, ®istry.Node{ + Id: strings.TrimSuffix(e.Name, suffix), + Address: fmt.Sprintf("%s:%d", addr, e.Port), + Metadata: txt.Metadata, + }) + + return ®istry.Result{ + Action: action, + Service: service, + }, nil + case <-m.exit: + return nil, registry.ErrWatcherStopped + } + } +} + +func (m *mdnsWatcher) Stop() { + select { + case <-m.exit: + return + default: + close(m.exit) + // remove self from the registry + m.registry.mtx.Lock() + delete(m.registry.watchers, m.id) + m.registry.mtx.Unlock() + } +} + +// NewRegistry returns a new default registry which is mdns +func NewRegistry(opts ...registry.Option) registry.Registry { + return newRegistry(opts...) +} diff --git a/mdns_test.go b/mdns_test.go new file mode 100644 index 0000000..7e3d4c1 --- /dev/null +++ b/mdns_test.go @@ -0,0 +1,342 @@ +package mdns + +import ( + "os" + "testing" + "time" + + "github.com/micro/go-micro/v3/registry" +) + +func TestMDNS(t *testing.T) { + // skip test in travis because of sendto: operation not permitted error + if travis := os.Getenv("TRAVIS"); travis == "true" { + t.Skip() + } + + testData := []*registry.Service{ + { + Name: "test1", + Version: "1.0.1", + Nodes: []*registry.Node{ + { + Id: "test1-1", + Address: "10.0.0.1:10001", + Metadata: map[string]string{ + "foo": "bar", + }, + }, + }, + }, + { + Name: "test2", + Version: "1.0.2", + Nodes: []*registry.Node{ + { + Id: "test2-1", + Address: "10.0.0.2:10002", + Metadata: map[string]string{ + "foo2": "bar2", + }, + }, + }, + }, + { + Name: "test3", + Version: "1.0.3", + Nodes: []*registry.Node{ + { + Id: "test3-1", + Address: "10.0.0.3:10003", + Metadata: map[string]string{ + "foo3": "bar3", + }, + }, + }, + }, + } + + travis := os.Getenv("TRAVIS") + + var opts []registry.Option + + if travis == "true" { + opts = append(opts, registry.Timeout(time.Millisecond*100)) + } + + // new registry + r := NewRegistry(opts...) + + for _, service := range testData { + // register service + if err := r.Register(service); err != nil { + t.Fatal(err) + } + + // get registered service + s, err := r.GetService(service.Name) + if err != nil { + t.Fatal(err) + } + + if len(s) != 1 { + t.Fatalf("Expected one result for %s got %d", service.Name, len(s)) + } + + if s[0].Name != service.Name { + t.Fatalf("Expected name %s got %s", service.Name, s[0].Name) + } + + if s[0].Version != service.Version { + t.Fatalf("Expected version %s got %s", service.Version, s[0].Version) + } + + if len(s[0].Nodes) != 1 { + t.Fatalf("Expected 1 node, got %d", len(s[0].Nodes)) + } + + node := s[0].Nodes[0] + + if node.Id != service.Nodes[0].Id { + t.Fatalf("Expected node id %s got %s", service.Nodes[0].Id, node.Id) + } + + if node.Address != service.Nodes[0].Address { + t.Fatalf("Expected node address %s got %s", service.Nodes[0].Address, node.Address) + } + } + + services, err := r.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, service := range testData { + var seen bool + for _, s := range services { + if s.Name == service.Name { + seen = true + break + } + } + if !seen { + t.Fatalf("Expected service %s got nothing", service.Name) + } + + // deregister + if err := r.Deregister(service); err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 5) + + // check its gone + s, _ := r.GetService(service.Name) + if len(s) > 0 { + t.Fatalf("Expected nothing got %+v", s[0]) + } + } + +} + +func TestEncoding(t *testing.T) { + testData := []*mdnsTxt{ + { + Version: "1.0.0", + Metadata: map[string]string{ + "foo": "bar", + }, + Endpoints: []*registry.Endpoint{ + { + Name: "endpoint1", + Request: ®istry.Value{ + Name: "request", + Type: "request", + }, + Response: ®istry.Value{ + Name: "response", + Type: "response", + }, + Metadata: map[string]string{ + "foo1": "bar1", + }, + }, + }, + }, + } + + for _, d := range testData { + encoded, err := encode(d) + if err != nil { + t.Fatal(err) + } + + for _, txt := range encoded { + if len(txt) > 255 { + t.Fatalf("One of parts for txt is %d characters", len(txt)) + } + } + + decoded, err := decode(encoded) + if err != nil { + t.Fatal(err) + } + + if decoded.Version != d.Version { + t.Fatalf("Expected version %s got %s", d.Version, decoded.Version) + } + + if len(decoded.Endpoints) != len(d.Endpoints) { + t.Fatalf("Expected %d endpoints, got %d", len(d.Endpoints), len(decoded.Endpoints)) + } + + for k, v := range d.Metadata { + if val := decoded.Metadata[k]; val != v { + t.Fatalf("Expected %s=%s got %s=%s", k, v, k, val) + } + } + } + +} + +func TestWatcher(t *testing.T) { + if travis := os.Getenv("TRAVIS"); travis == "true" { + t.Skip() + } + + testData := []*registry.Service{ + { + Name: "test1", + Version: "1.0.1", + Nodes: []*registry.Node{ + { + Id: "test1-1", + Address: "10.0.0.1:10001", + Metadata: map[string]string{ + "foo": "bar", + }, + }, + }, + }, + { + Name: "test2", + Version: "1.0.2", + Nodes: []*registry.Node{ + { + Id: "test2-1", + Address: "10.0.0.2:10002", + Metadata: map[string]string{ + "foo2": "bar2", + }, + }, + }, + }, + { + Name: "test3", + Version: "1.0.3", + Nodes: []*registry.Node{ + { + Id: "test3-1", + Address: "10.0.0.3:10003", + Metadata: map[string]string{ + "foo3": "bar3", + }, + }, + }, + }, + } + + testFn := func(service, s *registry.Service) { + if s == nil { + t.Fatalf("Expected one result for %s got nil", service.Name) + + } + + if s.Name != service.Name { + t.Fatalf("Expected name %s got %s", service.Name, s.Name) + } + + if s.Version != service.Version { + t.Fatalf("Expected version %s got %s", service.Version, s.Version) + } + + if len(s.Nodes) != 1 { + t.Fatalf("Expected 1 node, got %d", len(s.Nodes)) + } + + node := s.Nodes[0] + + if node.Id != service.Nodes[0].Id { + t.Fatalf("Expected node id %s got %s", service.Nodes[0].Id, node.Id) + } + + if node.Address != service.Nodes[0].Address { + t.Fatalf("Expected node address %s got %s", service.Nodes[0].Address, node.Address) + } + } + + travis := os.Getenv("TRAVIS") + + var opts []registry.Option + + if travis == "true" { + opts = append(opts, registry.Timeout(time.Millisecond*100)) + } + + // new registry + r := NewRegistry(opts...) + + w, err := r.Watch() + if err != nil { + t.Fatal(err) + } + defer w.Stop() + + for _, service := range testData { + // register service + if err := r.Register(service); err != nil { + t.Fatal(err) + } + + for { + res, err := w.Next() + if err != nil { + t.Fatal(err) + } + + if res.Service.Name != service.Name { + continue + } + + if res.Action != "create" { + t.Fatalf("Expected create event got %s for %s", res.Action, res.Service.Name) + } + + testFn(service, res.Service) + break + } + + // deregister + if err := r.Deregister(service); err != nil { + t.Fatal(err) + } + + for { + res, err := w.Next() + if err != nil { + t.Fatal(err) + } + + if res.Service.Name != service.Name { + continue + } + + if res.Action != "delete" { + continue + } + + testFn(service, res.Service) + break + } + } +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..0de2d3b --- /dev/null +++ b/options.go @@ -0,0 +1,18 @@ +// Package mdns provides a multicast dns registry +package mdns + +import ( + "context" + + "github.com/micro/go-micro/v3/registry" +) + +// Domain sets the mdnsDomain +func Domain(d string) registry.Option { + return func(o *registry.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, "mdns.domain", d) + } +}