184
mdns.go
184
mdns.go
@@ -15,9 +15,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/v3/logger"
|
||||
"github.com/micro/go-micro/v3/registry"
|
||||
"github.com/micro/go-micro/v3/util/mdns"
|
||||
util "github.com/unistack-org/micro-register-mdns/v3/util"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/register"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -30,13 +30,13 @@ const (
|
||||
type mdnsTxt struct {
|
||||
Service string
|
||||
Version string
|
||||
Endpoints []*registry.Endpoint
|
||||
Endpoints []*register.Endpoint
|
||||
Metadata map[string]string
|
||||
}
|
||||
|
||||
type mdnsEntry struct {
|
||||
id string
|
||||
node *mdns.Server
|
||||
node *util.Server
|
||||
}
|
||||
|
||||
// services are a key/value map, with the service name as a key and the value being a
|
||||
@@ -45,8 +45,8 @@ type mdnsEntry struct {
|
||||
type services map[string][]*mdnsEntry
|
||||
|
||||
// mdsRegistry is a multicast dns registry
|
||||
type mdnsRegistry struct {
|
||||
opts registry.Options
|
||||
type mdnsRegister struct {
|
||||
opts register.Options
|
||||
|
||||
// the top level domains, these can be overriden using options
|
||||
defaultDomain string
|
||||
@@ -61,18 +61,18 @@ type mdnsRegistry struct {
|
||||
watchers map[string]*mdnsWatcher
|
||||
|
||||
// listener
|
||||
listener chan *mdns.ServiceEntry
|
||||
listener chan *util.ServiceEntry
|
||||
}
|
||||
|
||||
type mdnsWatcher struct {
|
||||
id string
|
||||
wo registry.WatchOptions
|
||||
ch chan *mdns.ServiceEntry
|
||||
wo register.WatchOptions
|
||||
ch chan *util.ServiceEntry
|
||||
exit chan struct{}
|
||||
// the mdns domain
|
||||
domain string
|
||||
// the registry
|
||||
registry *mdnsRegistry
|
||||
registry *mdnsRegister
|
||||
}
|
||||
|
||||
func encode(txt *mdnsTxt) ([]string, error) {
|
||||
@@ -87,8 +87,8 @@ func encode(txt *mdnsTxt) ([]string, error) {
|
||||
w := zlib.NewWriter(&buf)
|
||||
defer func() {
|
||||
if closeErr := w.Close(); closeErr != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("[mdns] registry close encoding writer err: %v", closeErr)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf(context.TODO(), "[mdns] registry close encoding writer err: %v", closeErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -148,8 +148,8 @@ func decode(record []string) (*mdnsTxt, error) {
|
||||
return txt, nil
|
||||
}
|
||||
|
||||
func newRegistry(opts ...registry.Option) registry.Registry {
|
||||
options := registry.Options{
|
||||
func newRegister(opts ...register.Option) register.Register {
|
||||
options := register.Options{
|
||||
Context: context.Background(),
|
||||
Timeout: time.Millisecond * 100,
|
||||
}
|
||||
@@ -159,12 +159,12 @@ func newRegistry(opts ...registry.Option) registry.Registry {
|
||||
}
|
||||
|
||||
// set the domain
|
||||
defaultDomain := registry.DefaultDomain
|
||||
defaultDomain := register.DefaultDomain
|
||||
if d, ok := options.Context.Value("mdns.domain").(string); ok {
|
||||
defaultDomain = d
|
||||
}
|
||||
|
||||
return &mdnsRegistry{
|
||||
return &mdnsRegister{
|
||||
defaultDomain: defaultDomain,
|
||||
globalDomain: globalDomain,
|
||||
opts: options,
|
||||
@@ -173,28 +173,38 @@ func newRegistry(opts ...registry.Option) registry.Registry {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Init(opts ...registry.Option) error {
|
||||
func (m *mdnsRegister) Init(opts ...register.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Options() registry.Options {
|
||||
func (m *mdnsRegister) Options() register.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *mdnsRegister) Connect(ctx context.Context) error {
|
||||
// TODO: real connect
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegister) Disconnect(ctx context.Context) error {
|
||||
// TODO: real disconnect
|
||||
return nil
|
||||
}
|
||||
|
||||
// createServiceMDNSEntry will create a new wildcard mdns entry for the service in the
|
||||
// given domain. This wildcard mdns entry is used when listing services.
|
||||
func createServiceMDNSEntry(name, domain string) (*mdnsEntry, error) {
|
||||
ip := net.ParseIP("0.0.0.0")
|
||||
|
||||
s, err := mdns.NewMDNSService(name, "_services", domain+".", "", 9999, []net.IP{ip}, nil)
|
||||
s, err := util.NewMDNSService(name, "_services", domain+".", "", 9999, []net.IP{ip}, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
srv, err := mdns.NewServer(&mdns.Config{Zone: &mdns.DNSSDService{MDNSService: s}, LocalhostChecking: true})
|
||||
srv, err := util.NewServer(&util.Config{Zone: &util.DNSSDService{MDNSService: s} /*, LocalhostChecking: true*/})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -202,7 +212,7 @@ func createServiceMDNSEntry(name, domain string) (*mdnsEntry, error) {
|
||||
return &mdnsEntry{id: "*", node: srv}, nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) createMDNSEntries(domain, serviceName string) ([]*mdnsEntry, error) {
|
||||
func (m *mdnsRegister) createMDNSEntries(domain, serviceName string) ([]*mdnsEntry, error) {
|
||||
// if it already exists don't reegister it again
|
||||
entries, ok := m.domains[domain][serviceName]
|
||||
if ok {
|
||||
@@ -218,7 +228,7 @@ func (m *mdnsRegistry) createMDNSEntries(domain, serviceName string) ([]*mdnsEnt
|
||||
return []*mdnsEntry{entry}, nil
|
||||
}
|
||||
|
||||
func registerService(service *registry.Service, entries []*mdnsEntry, options registry.RegisterOptions) ([]*mdnsEntry, error) {
|
||||
func registerService(service *register.Service, entries []*mdnsEntry, options register.RegisterOptions) ([]*mdnsEntry, error) {
|
||||
var lastError error
|
||||
for _, node := range service.Nodes {
|
||||
var seen bool
|
||||
@@ -254,11 +264,11 @@ func registerService(service *registry.Service, entries []*mdnsEntry, options re
|
||||
}
|
||||
port, _ := strconv.Atoi(pt)
|
||||
|
||||
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||
logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
|
||||
if logger.V(logger.DebugLevel) {
|
||||
logger.Debugf(context.TODO(), "[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
|
||||
}
|
||||
// we got here, new node
|
||||
s, err := mdns.NewMDNSService(
|
||||
s, err := util.NewMDNSService(
|
||||
node.Id,
|
||||
service.Name,
|
||||
options.Domain+".",
|
||||
@@ -272,7 +282,7 @@ func registerService(service *registry.Service, entries []*mdnsEntry, options re
|
||||
continue
|
||||
}
|
||||
|
||||
srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true})
|
||||
srv, err := util.NewServer(&util.Config{Zone: s /*, LocalhostChecking: true*/})
|
||||
if err != nil {
|
||||
lastError = err
|
||||
continue
|
||||
@@ -284,7 +294,7 @@ func registerService(service *registry.Service, entries []*mdnsEntry, options re
|
||||
return entries, lastError
|
||||
}
|
||||
|
||||
func createGlobalDomainService(service *registry.Service, options registry.RegisterOptions) *registry.Service {
|
||||
func createGlobalDomainService(service *register.Service, options register.RegisterOptions) *register.Service {
|
||||
srv := *service
|
||||
srv.Nodes = nil
|
||||
|
||||
@@ -304,11 +314,11 @@ func createGlobalDomainService(service *registry.Service, options registry.Regis
|
||||
return &srv
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
|
||||
func (m *mdnsRegister) Register(ctx context.Context, service *register.Service, opts ...register.RegisterOption) error {
|
||||
m.Lock()
|
||||
|
||||
// parse the options
|
||||
var options registry.RegisterOptions
|
||||
var options register.RegisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
@@ -336,7 +346,7 @@ func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.Regi
|
||||
// register in the global Domain so it can be queried as one
|
||||
if options.Domain != m.globalDomain {
|
||||
srv := createGlobalDomainService(service, options)
|
||||
if err := m.Register(srv, append(opts, registry.RegisterDomain(m.globalDomain))...); err != nil {
|
||||
if err := m.Register(ctx, srv, append(opts, register.RegisterDomain(m.globalDomain))...); err != nil {
|
||||
gerr = err
|
||||
}
|
||||
}
|
||||
@@ -344,9 +354,9 @@ func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.Regi
|
||||
return gerr
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Deregister(service *registry.Service, opts ...registry.DeregisterOption) error {
|
||||
func (m *mdnsRegister) Deregister(ctx context.Context, service *register.Service, opts ...register.DeregisterOption) error {
|
||||
// parse the options
|
||||
var options registry.DeregisterOptions
|
||||
var options register.DeregisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
@@ -358,7 +368,7 @@ func (m *mdnsRegistry) Deregister(service *registry.Service, opts ...registry.De
|
||||
var err error
|
||||
if options.Domain != m.globalDomain {
|
||||
defer func() {
|
||||
err = m.Deregister(service, append(opts, registry.DeregisterDomain(m.globalDomain))...)
|
||||
err = m.Deregister(ctx, service, append(opts, register.DeregisterDomain(m.globalDomain))...)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -420,28 +430,28 @@ func (m *mdnsRegistry) Deregister(service *registry.Service, opts ...registry.De
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
||||
func (m *mdnsRegister) LookupService(ctx context.Context, service string, opts ...register.LookupOption) ([]*register.Service, error) {
|
||||
// parse the options
|
||||
var options registry.GetOptions
|
||||
var options register.LookupOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
if len(options.Domain) == 0 {
|
||||
options.Domain = m.defaultDomain
|
||||
}
|
||||
if options.Domain == registry.WildcardDomain {
|
||||
if options.Domain == register.WildcardDomain {
|
||||
options.Domain = m.globalDomain
|
||||
}
|
||||
|
||||
serviceMap := make(map[string]*registry.Service)
|
||||
entries := make(chan *mdns.ServiceEntry, 10)
|
||||
serviceMap := make(map[string]*register.Service)
|
||||
entries := make(chan *util.ServiceEntry, 10)
|
||||
done := make(chan bool)
|
||||
|
||||
p := mdns.DefaultParams(service)
|
||||
p := util.DefaultParams(service)
|
||||
// set context with timeout
|
||||
var cancel context.CancelFunc
|
||||
p.Context, cancel = context.WithTimeout(context.Background(), m.opts.Timeout)
|
||||
defer cancel()
|
||||
//var cancel context.CancelFunc
|
||||
//p.Context, cancel = context.WithTimeout(context.Background(), m.opts.Timeout)
|
||||
//defer cancel()
|
||||
// set entries channel
|
||||
p.Entries = entries
|
||||
// set the domain
|
||||
@@ -470,7 +480,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([
|
||||
|
||||
s, ok := serviceMap[txt.Version]
|
||||
if !ok {
|
||||
s = ®istry.Service{
|
||||
s = ®ister.Service{
|
||||
Name: txt.Service,
|
||||
Version: txt.Version,
|
||||
Endpoints: txt.Endpoints,
|
||||
@@ -484,27 +494,27 @@ func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([
|
||||
} else if len(e.AddrV6) > 0 {
|
||||
addr = "[" + e.AddrV6.String() + "]"
|
||||
} else {
|
||||
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
|
||||
logger.Infof("[mdns]: invalid endpoint received: %v", e)
|
||||
if logger.V(logger.InfoLevel) {
|
||||
logger.Infof(context.TODO(), "[mdns]: invalid endpoint received: %v", e)
|
||||
}
|
||||
continue
|
||||
}
|
||||
s.Nodes = append(s.Nodes, ®istry.Node{
|
||||
s.Nodes = append(s.Nodes, ®ister.Node{
|
||||
Id: strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+"."),
|
||||
Address: fmt.Sprintf("%s:%d", addr, e.Port),
|
||||
Metadata: txt.Metadata,
|
||||
})
|
||||
|
||||
serviceMap[txt.Version] = s
|
||||
case <-p.Context.Done():
|
||||
close(done)
|
||||
return
|
||||
//case <-p.Context.Done():
|
||||
// close(done)
|
||||
// return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// execute the query
|
||||
if err := mdns.Query(p); err != nil {
|
||||
if err := util.Query(ctx, p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -512,7 +522,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([
|
||||
<-done
|
||||
|
||||
// create list and return
|
||||
services := make([]*registry.Service, 0, len(serviceMap))
|
||||
services := make([]*register.Service, 0, len(serviceMap))
|
||||
|
||||
for _, service := range serviceMap {
|
||||
services = append(services, service)
|
||||
@@ -521,34 +531,34 @@ func (m *mdnsRegistry) GetService(service string, opts ...registry.GetOption) ([
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
|
||||
func (m *mdnsRegister) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) {
|
||||
// parse the options
|
||||
var options registry.ListOptions
|
||||
var options register.ListOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
if len(options.Domain) == 0 {
|
||||
options.Domain = m.defaultDomain
|
||||
}
|
||||
if options.Domain == registry.WildcardDomain {
|
||||
if options.Domain == register.WildcardDomain {
|
||||
options.Domain = m.globalDomain
|
||||
}
|
||||
|
||||
serviceMap := make(map[string]bool)
|
||||
entries := make(chan *mdns.ServiceEntry, 10)
|
||||
entries := make(chan *util.ServiceEntry, 10)
|
||||
done := make(chan bool)
|
||||
|
||||
p := mdns.DefaultParams("_services")
|
||||
p := util.DefaultParams("_services")
|
||||
// set context with timeout
|
||||
var cancel context.CancelFunc
|
||||
p.Context, cancel = context.WithTimeout(context.Background(), m.opts.Timeout)
|
||||
defer cancel()
|
||||
//var cancel context.CancelFunc
|
||||
//p.Context, cancel = context.WithTimeout(context.Background(), m.opts.Timeout)
|
||||
//defer cancel()
|
||||
// set entries channel
|
||||
p.Entries = entries
|
||||
// set domain
|
||||
p.Domain = options.Domain
|
||||
|
||||
var services []*registry.Service
|
||||
var services []*register.Service
|
||||
|
||||
go func() {
|
||||
for {
|
||||
@@ -563,17 +573,17 @@ func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se
|
||||
name := strings.TrimSuffix(e.Name, "."+p.Service+"."+p.Domain+".")
|
||||
if !serviceMap[name] {
|
||||
serviceMap[name] = true
|
||||
services = append(services, ®istry.Service{Name: name})
|
||||
services = append(services, ®ister.Service{Name: name})
|
||||
}
|
||||
case <-p.Context.Done():
|
||||
close(done)
|
||||
return
|
||||
//case <-p.Context.Done():
|
||||
// close(done)
|
||||
//return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// execute query
|
||||
if err := mdns.Query(p); err != nil {
|
||||
if err := util.Query(ctx, p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -583,22 +593,22 @@ func (m *mdnsRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wo registry.WatchOptions
|
||||
func (m *mdnsRegister) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) {
|
||||
var wo register.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
if len(wo.Domain) == 0 {
|
||||
wo.Domain = m.defaultDomain
|
||||
}
|
||||
if wo.Domain == registry.WildcardDomain {
|
||||
if wo.Domain == register.WildcardDomain {
|
||||
wo.Domain = m.globalDomain
|
||||
}
|
||||
|
||||
md := &mdnsWatcher{
|
||||
id: uuid.New().String(),
|
||||
wo: wo,
|
||||
ch: make(chan *mdns.ServiceEntry, 32),
|
||||
ch: make(chan *util.ServiceEntry, 32),
|
||||
exit: make(chan struct{}),
|
||||
domain: wo.Domain,
|
||||
registry: m,
|
||||
@@ -636,14 +646,14 @@ func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, er
|
||||
|
||||
// reset the listener
|
||||
exit := make(chan struct{})
|
||||
ch := make(chan *mdns.ServiceEntry, 32)
|
||||
ch := make(chan *util.ServiceEntry, 32)
|
||||
m.listener = ch
|
||||
|
||||
m.mtx.Unlock()
|
||||
|
||||
// send messages to the watchers
|
||||
go func() {
|
||||
send := func(w *mdnsWatcher, e *mdns.ServiceEntry) {
|
||||
send := func(w *mdnsWatcher, e *util.ServiceEntry) {
|
||||
select {
|
||||
case w.ch <- e:
|
||||
default:
|
||||
@@ -670,9 +680,9 @@ func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, er
|
||||
}()
|
||||
|
||||
// start listening, blocking call
|
||||
mdns.Listen(ch, exit)
|
||||
util.Listen(ch, exit)
|
||||
|
||||
// mdns.Listen has unblocked
|
||||
// util.Listen has unblocked
|
||||
// kill the saved listener
|
||||
m.mtx.Lock()
|
||||
m.listener = nil
|
||||
@@ -684,11 +694,11 @@ func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, er
|
||||
return md, nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) String() string {
|
||||
func (m *mdnsRegister) String() string {
|
||||
return "mdns"
|
||||
}
|
||||
|
||||
func (m *mdnsWatcher) Next() (*registry.Result, error) {
|
||||
func (m *mdnsWatcher) Next() (*register.Result, error) {
|
||||
for {
|
||||
select {
|
||||
case e := <-m.ch:
|
||||
@@ -713,7 +723,7 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
|
||||
action = "create"
|
||||
}
|
||||
|
||||
service := ®istry.Service{
|
||||
service := ®ister.Service{
|
||||
Name: txt.Service,
|
||||
Version: txt.Version,
|
||||
Endpoints: txt.Endpoints,
|
||||
@@ -731,22 +741,22 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
|
||||
addr = e.AddrV4.String()
|
||||
} else if len(e.AddrV6) > 0 {
|
||||
addr = "[" + e.AddrV6.String() + "]"
|
||||
} else {
|
||||
addr = e.Addr.String()
|
||||
// } else {
|
||||
// addr = e.Addr.String()
|
||||
}
|
||||
|
||||
service.Nodes = append(service.Nodes, ®istry.Node{
|
||||
service.Nodes = append(service.Nodes, ®ister.Node{
|
||||
Id: strings.TrimSuffix(e.Name, suffix),
|
||||
Address: fmt.Sprintf("%s:%d", addr, e.Port),
|
||||
Metadata: txt.Metadata,
|
||||
})
|
||||
|
||||
return ®istry.Result{
|
||||
return ®ister.Result{
|
||||
Action: action,
|
||||
Service: service,
|
||||
}, nil
|
||||
case <-m.exit:
|
||||
return nil, registry.ErrWatcherStopped
|
||||
return nil, register.ErrWatcherStopped
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -764,7 +774,11 @@ func (m *mdnsWatcher) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
// NewRegistry returns a new default registry which is mdns
|
||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
return newRegistry(opts...)
|
||||
func (m *mdnsRegister) Name() string {
|
||||
return m.opts.Name
|
||||
}
|
||||
|
||||
// NewRegistry returns a new default registry which is mdns
|
||||
func NewRegister(opts ...register.Option) register.Register {
|
||||
return newRegister(opts...)
|
||||
}
|
||||
|
Reference in New Issue
Block a user