Merge pull request #598 from unistack-org/ipv6fix

bunch of other ipv6 fixes
This commit is contained in:
Asim Aslam 2019-07-18 07:24:47 -07:00 committed by GitHub
commit 8f2585724c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 56 additions and 40 deletions

View File

@ -627,12 +627,6 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
return nil, err return nil, err
} }
// ipv6 addr
if addr == "::" {
// ipv6 addr
addr = fmt.Sprintf("[%s]", addr)
}
// create unique id // create unique id
id := h.id + "." + uuid.New().String() id := h.id + "." + uuid.New().String()
@ -645,7 +639,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
// register service // register service
node := &registry.Node{ node := &registry.Node{
Id: id, Id: id,
Address: fmt.Sprintf("%s:%s", addr, port), Address: mnet.HostPort(addr, port),
Metadata: map[string]string{ Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure), "secure": fmt.Sprintf("%t", secure),
}, },

View File

@ -3,15 +3,20 @@ package memory
import ( import (
"errors" "errors"
"math/rand"
"sync" "sync"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
maddr "github.com/micro/go-micro/util/addr"
mnet "github.com/micro/go-micro/util/net"
) )
type memoryBroker struct { type memoryBroker struct {
opts broker.Options opts broker.Options
addr string
sync.RWMutex sync.RWMutex
connected bool connected bool
Subscribers map[string][]*memorySubscriber Subscribers map[string][]*memorySubscriber
@ -35,7 +40,7 @@ func (m *memoryBroker) Options() broker.Options {
} }
func (m *memoryBroker) Address() string { func (m *memoryBroker) Address() string {
return "" return m.addr
} }
func (m *memoryBroker) Connect() error { func (m *memoryBroker) Connect() error {
@ -46,6 +51,15 @@ func (m *memoryBroker) Connect() error {
return nil return nil
} }
addr, err := maddr.Extract("::")
if err != nil {
return err
}
i := rand.Intn(20000)
// set addr with port
addr = mnet.HostPort(addr, 10000+i)
m.addr = addr
m.connected = true m.connected = true
return nil return nil
@ -171,6 +185,7 @@ func (m *memorySubscriber) Unsubscribe() error {
func NewBroker(opts ...broker.Option) broker.Broker { func NewBroker(opts ...broker.Option) broker.Broker {
var options broker.Options var options broker.Options
rand.Seed(time.Now().UnixNano())
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/micro/go-micro/util/addr" "github.com/micro/go-micro/util/addr"
mgrpc "github.com/micro/go-micro/util/grpc" mgrpc "github.com/micro/go-micro/util/grpc"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
mnet "github.com/micro/go-micro/util/net"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -519,15 +520,12 @@ func (g *grpcServer) Register() error {
advt = config.Address advt = config.Address
} }
if idx := strings.Count(advt, ":"); idx > 1 { if cnt := strings.Count(advt, ":"); cnt >= 1 {
// ipv6 address in format [host]:port or ipv4 host:port // ipv6 address in format [host]:port or ipv4 host:port
host, port, err = net.SplitHostPort(advt) host, port, err = net.SplitHostPort(advt)
if err != nil { if err != nil {
return err return err
} }
if host == "::" {
host = fmt.Sprintf("[%s]", host)
}
} else { } else {
host = advt host = advt
} }
@ -540,7 +538,7 @@ func (g *grpcServer) Register() error {
// register service // register service
node := &registry.Node{ node := &registry.Node{
Id: config.Name + "-" + config.Id, Id: config.Name + "-" + config.Id,
Address: fmt.Sprintf("%s:%s", addr, port), Address: mnet.HostPort(addr, port),
Metadata: config.Metadata, Metadata: config.Metadata,
} }
@ -649,15 +647,12 @@ func (g *grpcServer) Deregister() error {
advt = config.Address advt = config.Address
} }
if idx := strings.Count(advt, ":"); idx > 1 { if cnt := strings.Count(advt, ":"); cnt >= 1 {
// ipv6 address in format [host]:port or ipv4 host:port // ipv6 address in format [host]:port or ipv4 host:port
host, port, err = net.SplitHostPort(advt) host, port, err = net.SplitHostPort(advt)
if err != nil { if err != nil {
return err return err
} }
if host == "::" {
host = fmt.Sprintf("[%s]", host)
}
} else { } else {
host = advt host = advt
} }
@ -669,7 +664,7 @@ func (g *grpcServer) Deregister() error {
node := &registry.Node{ node := &registry.Node{
Id: config.Name + "-" + config.Id, Id: config.Name + "-" + config.Id,
Address: fmt.Sprintf("%s:%s", addr, port), Address: mnet.HostPort(addr, port),
} }
service := &registry.Service{ service := &registry.Service{

View File

@ -18,6 +18,7 @@ import (
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"github.com/micro/go-micro/util/addr" "github.com/micro/go-micro/util/addr"
log "github.com/micro/go-micro/util/log" log "github.com/micro/go-micro/util/log"
mnet "github.com/micro/go-micro/util/net"
) )
type rpcServer struct { type rpcServer struct {
@ -293,15 +294,12 @@ func (s *rpcServer) Register() error {
advt = config.Address advt = config.Address
} }
if idx := strings.Count(advt, ":"); idx > 1 { if cnt := strings.Count(advt, ":"); cnt >= 1 {
// ipv6 address in format [host]:port or ipv4 host:port // ipv6 address in format [host]:port or ipv4 host:port
host, port, err = net.SplitHostPort(advt) host, port, err = net.SplitHostPort(advt)
if err != nil { if err != nil {
return err return err
} }
if host == "::" {
host = fmt.Sprintf("[%s]", host)
}
} else { } else {
host = advt host = advt
} }
@ -320,7 +318,7 @@ func (s *rpcServer) Register() error {
// register service // register service
node := &registry.Node{ node := &registry.Node{
Id: config.Name + "-" + config.Id, Id: config.Name + "-" + config.Id,
Address: fmt.Sprintf("%s:%s", addr, port), Address: mnet.HostPort(addr, port),
Metadata: md, Metadata: md,
} }
@ -434,15 +432,12 @@ func (s *rpcServer) Deregister() error {
advt = config.Address advt = config.Address
} }
if idx := strings.Count(advt, ":"); idx > 1 { if cnt := strings.Count(advt, ":"); cnt >= 1 {
// ipv6 address in format [host]:port or ipv4 host:port // ipv6 address in format [host]:port or ipv4 host:port
host, port, err = net.SplitHostPort(advt) host, port, err = net.SplitHostPort(advt)
if err != nil { if err != nil {
return err return err
} }
if host == "::" {
host = fmt.Sprintf("[%s]", host)
}
} else { } else {
host = advt host = advt
} }
@ -454,7 +449,7 @@ func (s *rpcServer) Deregister() error {
node := &registry.Node{ node := &registry.Node{
Id: config.Name + "-" + config.Id, Id: config.Name + "-" + config.Id,
Address: fmt.Sprintf("%s:%s", addr, port), Address: mnet.HostPort(addr, port),
} }
service := &registry.Service{ service := &registry.Service{

View File

@ -5,11 +5,13 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"strings" "net"
"sync" "sync"
"time" "time"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
maddr "github.com/micro/go-micro/util/addr"
mnet "github.com/micro/go-micro/util/net"
) )
type memorySocket struct { type memorySocket struct {
@ -170,15 +172,25 @@ func (m *memoryTransport) Listen(addr string, opts ...transport.ListenOption) (t
o(&options) o(&options)
} }
parts := strings.Split(addr, ":") host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
addr, err = maddr.Extract(host)
if err != nil {
return nil, err
}
// if zero port then randomly assign one // if zero port then randomly assign one
if len(parts) > 1 && parts[len(parts)-1] == "0" { if len(port) > 0 && port == "0" {
i := rand.Intn(20000) i := rand.Intn(20000)
// set addr with port port = fmt.Sprintf("%d", 10000+i)
addr = fmt.Sprintf("%s:%d", parts[:len(parts)-1], 10000+i)
} }
// set addr with port
addr = mnet.HostPort(addr, port)
if _, ok := m.listeners[addr]; ok { if _, ok := m.listeners[addr]; ok {
return nil, errors.New("already listening on " + addr) return nil, errors.New("already listening on " + addr)
} }

View File

@ -10,7 +10,7 @@ func TestMemoryTransport(t *testing.T) {
tr := NewTransport() tr := NewTransport()
// bind / listen // bind / listen
l, err := tr.Listen("localhost:8080") l, err := tr.Listen("127.0.0.1:8080")
if err != nil { if err != nil {
t.Fatalf("Unexpected error listening %v", err) t.Fatalf("Unexpected error listening %v", err)
} }
@ -37,7 +37,7 @@ func TestMemoryTransport(t *testing.T) {
}() }()
// dial // dial
c, err := tr.Dial("localhost:8080") c, err := tr.Dial("127.0.0.1:8080")
if err != nil { if err != nil {
t.Fatalf("Unexpected error dialing %v", err) t.Fatalf("Unexpected error dialing %v", err)
} }

View File

@ -8,6 +8,15 @@ import (
"strings" "strings"
) )
// HostPort format addr and port suitable for dial
func HostPort(addr string, port interface{}) string {
host := addr
if strings.Count(addr, ":") > 0 {
host = fmt.Sprintf("[%s]", addr)
}
return fmt.Sprintf("%s:%v", host, port)
}
// Listen takes addr:portmin-portmax and binds to the first available port // Listen takes addr:portmin-portmax and binds to the first available port
// Example: Listen("localhost:5000-6000", fn) // Example: Listen("localhost:5000-6000", fn)
func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, error) { func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, error) {
@ -22,10 +31,6 @@ func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, e
return nil, err return nil, err
} }
if host == "::" {
host = fmt.Sprintf("[%s]", host)
}
// try to extract port range // try to extract port range
prange := strings.Split(ports, "-") prange := strings.Split(ports, "-")
@ -51,7 +56,7 @@ func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, e
// range the ports // range the ports
for port := min; port <= max; port++ { for port := min; port <= max; port++ {
// try bind to host:port // try bind to host:port
ln, err := fn(fmt.Sprintf("%s:%d", host, port)) ln, err := fn(HostPort(host, port))
if err == nil { if err == nil {
return ln, nil return ln, nil
} }