commit
3a8edd705c
@ -13,8 +13,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -614,18 +612,27 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||||
|
var err error
|
||||||
|
var host, port string
|
||||||
options := NewSubscribeOptions(opts...)
|
options := NewSubscribeOptions(opts...)
|
||||||
|
|
||||||
// parse address for host, port
|
// parse address for host, port
|
||||||
parts := strings.Split(h.Address(), ":")
|
host, port, err = net.SplitHostPort(h.Address())
|
||||||
host := strings.Join(parts[:len(parts)-1], ":")
|
if err != nil {
|
||||||
port, _ := strconv.Atoi(parts[len(parts)-1])
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
addr, err := maddr.Extract(host)
|
addr, err := maddr.Extract(host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ipv6 addr
|
||||||
|
if addr == "::" {
|
||||||
|
// ipv6 addr
|
||||||
|
addr = fmt.Sprintf("[%s]", addr)
|
||||||
|
}
|
||||||
|
|
||||||
// create unique id
|
// create unique id
|
||||||
id := h.id + "." + uuid.New().String()
|
id := h.id + "." + uuid.New().String()
|
||||||
|
|
||||||
@ -638,7 +645,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
|||||||
// register service
|
// register service
|
||||||
node := ®istry.Node{
|
node := ®istry.Node{
|
||||||
Id: id,
|
Id: id,
|
||||||
Address: fmt.Sprintf("%s:%d", addr, port),
|
Address: fmt.Sprintf("%s:%s", addr, port),
|
||||||
Metadata: map[string]string{
|
Metadata: map[string]string{
|
||||||
"secure": fmt.Sprintf("%t", secure),
|
"secure": fmt.Sprintf("%t", secure),
|
||||||
},
|
},
|
||||||
|
@ -2,10 +2,7 @@ package grpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
||||||
@ -37,10 +34,6 @@ func TestGRPCClient(t *testing.T) {
|
|||||||
go s.Serve(l)
|
go s.Serve(l)
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
parts := strings.Split(l.Addr().String(), ":")
|
|
||||||
port, _ := strconv.Atoi(parts[len(parts)-1])
|
|
||||||
addr := strings.Join(parts[:len(parts)-1], ":")
|
|
||||||
|
|
||||||
// create mock registry
|
// create mock registry
|
||||||
r := memory.NewRegistry()
|
r := memory.NewRegistry()
|
||||||
|
|
||||||
@ -51,7 +44,7 @@ func TestGRPCClient(t *testing.T) {
|
|||||||
Nodes: []*registry.Node{
|
Nodes: []*registry.Node{
|
||||||
®istry.Node{
|
®istry.Node{
|
||||||
Id: "test-1",
|
Id: "test-1",
|
||||||
Address: fmt.Sprintf("%s:%d", addr, port),
|
Address: l.Addr().String(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
@ -504,10 +504,11 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Register() error {
|
func (g *grpcServer) Register() error {
|
||||||
|
var err error
|
||||||
|
var advt, host, port string
|
||||||
|
|
||||||
// parse address for host, port
|
// parse address for host, port
|
||||||
config := g.opts
|
config := g.opts
|
||||||
var advt, host string
|
|
||||||
var port int
|
|
||||||
|
|
||||||
// check the advertise address first
|
// check the advertise address first
|
||||||
// if it exists then use it, otherwise
|
// if it exists then use it, otherwise
|
||||||
@ -518,12 +519,17 @@ func (g *grpcServer) Register() error {
|
|||||||
advt = config.Address
|
advt = config.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
parts := strings.Split(advt, ":")
|
if idx := strings.Count(advt, ":"); idx > 1 {
|
||||||
if len(parts) > 1 {
|
// ipv6 address in format [host]:port or ipv4 host:port
|
||||||
host = strings.Join(parts[:len(parts)-1], ":")
|
host, port, err = net.SplitHostPort(advt)
|
||||||
port, _ = strconv.Atoi(parts[len(parts)-1])
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if host == "::" {
|
||||||
|
host = fmt.Sprintf("[%s]", host)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
host = parts[0]
|
host = advt
|
||||||
}
|
}
|
||||||
|
|
||||||
addr, err := addr.Extract(host)
|
addr, err := addr.Extract(host)
|
||||||
@ -534,7 +540,7 @@ func (g *grpcServer) Register() error {
|
|||||||
// register service
|
// register service
|
||||||
node := ®istry.Node{
|
node := ®istry.Node{
|
||||||
Id: config.Name + "-" + config.Id,
|
Id: config.Name + "-" + config.Id,
|
||||||
Address: fmt.Sprintf("%s:%d", addr, port),
|
Address: fmt.Sprintf("%s:%s", addr, port),
|
||||||
Metadata: config.Metadata,
|
Metadata: config.Metadata,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -629,9 +635,10 @@ func (g *grpcServer) Register() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Deregister() error {
|
func (g *grpcServer) Deregister() error {
|
||||||
|
var err error
|
||||||
|
var advt, host, port string
|
||||||
|
|
||||||
config := g.opts
|
config := g.opts
|
||||||
var advt, host string
|
|
||||||
var port int
|
|
||||||
|
|
||||||
// check the advertise address first
|
// check the advertise address first
|
||||||
// if it exists then use it, otherwise
|
// if it exists then use it, otherwise
|
||||||
@ -642,12 +649,17 @@ func (g *grpcServer) Deregister() error {
|
|||||||
advt = config.Address
|
advt = config.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
parts := strings.Split(advt, ":")
|
if idx := strings.Count(advt, ":"); idx > 1 {
|
||||||
if len(parts) > 1 {
|
// ipv6 address in format [host]:port or ipv4 host:port
|
||||||
host = strings.Join(parts[:len(parts)-1], ":")
|
host, port, err = net.SplitHostPort(advt)
|
||||||
port, _ = strconv.Atoi(parts[len(parts)-1])
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if host == "::" {
|
||||||
|
host = fmt.Sprintf("[%s]", host)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
host = parts[0]
|
host = advt
|
||||||
}
|
}
|
||||||
|
|
||||||
addr, err := addr.Extract(host)
|
addr, err := addr.Extract(host)
|
||||||
@ -657,7 +669,7 @@ func (g *grpcServer) Deregister() error {
|
|||||||
|
|
||||||
node := ®istry.Node{
|
node := ®istry.Node{
|
||||||
Id: config.Name + "-" + config.Id,
|
Id: config.Name + "-" + config.Id,
|
||||||
Address: fmt.Sprintf("%s:%d", addr, port),
|
Address: fmt.Sprintf("%s:%s", addr, port),
|
||||||
}
|
}
|
||||||
|
|
||||||
service := ®istry.Service{
|
service := ®istry.Service{
|
||||||
|
@ -3,6 +3,7 @@ package server
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -277,10 +278,11 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcServer) Register() error {
|
func (s *rpcServer) Register() error {
|
||||||
|
var err error
|
||||||
|
var advt, host, port string
|
||||||
|
|
||||||
// parse address for host, port
|
// parse address for host, port
|
||||||
config := s.Options()
|
config := s.Options()
|
||||||
var advt, host string
|
|
||||||
var port int
|
|
||||||
|
|
||||||
// check the advertise address first
|
// check the advertise address first
|
||||||
// if it exists then use it, otherwise
|
// if it exists then use it, otherwise
|
||||||
@ -291,12 +293,17 @@ func (s *rpcServer) Register() error {
|
|||||||
advt = config.Address
|
advt = config.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
parts := strings.Split(advt, ":")
|
if idx := strings.Count(advt, ":"); idx > 1 {
|
||||||
if len(parts) > 1 {
|
// ipv6 address in format [host]:port or ipv4 host:port
|
||||||
host = strings.Join(parts[:len(parts)-1], ":")
|
host, port, err = net.SplitHostPort(advt)
|
||||||
port, _ = strconv.Atoi(parts[len(parts)-1])
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if host == "::" {
|
||||||
|
host = fmt.Sprintf("[%s]", host)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
host = parts[0]
|
host = advt
|
||||||
}
|
}
|
||||||
|
|
||||||
addr, err := addr.Extract(host)
|
addr, err := addr.Extract(host)
|
||||||
@ -313,7 +320,7 @@ func (s *rpcServer) Register() error {
|
|||||||
// register service
|
// register service
|
||||||
node := ®istry.Node{
|
node := ®istry.Node{
|
||||||
Id: config.Name + "-" + config.Id,
|
Id: config.Name + "-" + config.Id,
|
||||||
Address: fmt.Sprintf("%s:%d", addr, port),
|
Address: fmt.Sprintf("%s:%s", addr, port),
|
||||||
Metadata: md,
|
Metadata: md,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -413,9 +420,10 @@ func (s *rpcServer) Register() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcServer) Deregister() error {
|
func (s *rpcServer) Deregister() error {
|
||||||
|
var err error
|
||||||
|
var advt, host, port string
|
||||||
|
|
||||||
config := s.Options()
|
config := s.Options()
|
||||||
var advt, host string
|
|
||||||
var port int
|
|
||||||
|
|
||||||
// check the advertise address first
|
// check the advertise address first
|
||||||
// if it exists then use it, otherwise
|
// if it exists then use it, otherwise
|
||||||
@ -426,12 +434,17 @@ func (s *rpcServer) Deregister() error {
|
|||||||
advt = config.Address
|
advt = config.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
parts := strings.Split(advt, ":")
|
if idx := strings.Count(advt, ":"); idx > 1 {
|
||||||
if len(parts) > 1 {
|
// ipv6 address in format [host]:port or ipv4 host:port
|
||||||
host = strings.Join(parts[:len(parts)-1], ":")
|
host, port, err = net.SplitHostPort(advt)
|
||||||
port, _ = strconv.Atoi(parts[len(parts)-1])
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if host == "::" {
|
||||||
|
host = fmt.Sprintf("[%s]", host)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
host = parts[0]
|
host = advt
|
||||||
}
|
}
|
||||||
|
|
||||||
addr, err := addr.Extract(host)
|
addr, err := addr.Extract(host)
|
||||||
@ -441,7 +454,7 @@ func (s *rpcServer) Deregister() error {
|
|||||||
|
|
||||||
node := ®istry.Node{
|
node := ®istry.Node{
|
||||||
Id: config.Name + "-" + config.Id,
|
Id: config.Name + "-" + config.Id,
|
||||||
Address: fmt.Sprintf("%s:%d", addr, port),
|
Address: fmt.Sprintf("%s:%s", addr, port),
|
||||||
}
|
}
|
||||||
|
|
||||||
service := ®istry.Service{
|
service := ®istry.Service{
|
||||||
|
@ -1,15 +1,17 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strings"
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/micro/go-micro/transport"
|
"github.com/micro/go-micro/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
|
func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
|
||||||
parts := strings.Split(lsn.Addr(), ":")
|
_, port, err := net.SplitHostPort(lsn.Addr())
|
||||||
port := parts[len(parts)-1]
|
if err != nil {
|
||||||
|
t.Errorf("Expected address to be `%s`, got error: %v", expected, err)
|
||||||
|
}
|
||||||
|
|
||||||
if port != expected {
|
if port != expected {
|
||||||
lsn.Close()
|
lsn.Close()
|
||||||
|
@ -2,14 +2,16 @@ package transport
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func expectedPort(t *testing.T, expected string, lsn Listener) {
|
func expectedPort(t *testing.T, expected string, lsn Listener) {
|
||||||
parts := strings.Split(lsn.Addr(), ":")
|
_, port, err := net.SplitHostPort(lsn.Addr())
|
||||||
port := parts[len(parts)-1]
|
if err != nil {
|
||||||
|
t.Errorf("Expected address to be `%s`, got error: %v", expected, err)
|
||||||
|
}
|
||||||
|
|
||||||
if port != expected {
|
if port != expected {
|
||||||
lsn.Close()
|
lsn.Close()
|
||||||
|
@ -30,7 +30,7 @@ func isPrivateIP(ipAddr string) bool {
|
|||||||
// Extract returns a real ip
|
// Extract returns a real ip
|
||||||
func Extract(addr string) (string, error) {
|
func Extract(addr string) (string, error) {
|
||||||
// if addr specified then its returned
|
// if addr specified then its returned
|
||||||
if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]") {
|
if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]" && addr != "::") {
|
||||||
return addr, nil
|
return addr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,10 +113,13 @@ func IPs() []string {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dont skip ipv6 addrs
|
||||||
|
/*
|
||||||
ip = ip.To4()
|
ip = ip.To4()
|
||||||
if ip == nil {
|
if ip == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
ipAddrs = append(ipAddrs, ip.String())
|
ipAddrs = append(ipAddrs, ip.String())
|
||||||
}
|
}
|
||||||
|
@ -11,39 +11,43 @@ import (
|
|||||||
// Listen takes addr:portmin-portmax and binds to the first available port
|
// Listen takes addr:portmin-portmax and binds to the first available port
|
||||||
// Example: Listen("localhost:5000-6000", fn)
|
// Example: Listen("localhost:5000-6000", fn)
|
||||||
func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, error) {
|
func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, error) {
|
||||||
// host:port || host:min-max
|
|
||||||
parts := strings.Split(addr, ":")
|
|
||||||
|
|
||||||
//
|
if strings.Count(addr, ":") == 1 && strings.Count(addr, "-") == 0 {
|
||||||
if len(parts) < 2 {
|
|
||||||
return fn(addr)
|
return fn(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// host:port || host:min-max
|
||||||
|
host, ports, err := net.SplitHostPort(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if host == "::" {
|
||||||
|
host = fmt.Sprintf("[%s]", host)
|
||||||
|
}
|
||||||
|
|
||||||
// try to extract port range
|
// try to extract port range
|
||||||
ports := strings.Split(parts[len(parts)-1], "-")
|
prange := strings.Split(ports, "-")
|
||||||
|
|
||||||
// single port
|
// single port
|
||||||
if len(ports) < 2 {
|
if len(prange) < 2 {
|
||||||
return fn(addr)
|
return fn(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// we have a port range
|
// we have a port range
|
||||||
|
|
||||||
// extract min port
|
// extract min port
|
||||||
min, err := strconv.Atoi(ports[0])
|
min, err := strconv.Atoi(prange[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.New("unable to extract port range")
|
return nil, errors.New("unable to extract port range")
|
||||||
}
|
}
|
||||||
|
|
||||||
// extract max port
|
// extract max port
|
||||||
max, err := strconv.Atoi(ports[1])
|
max, err := strconv.Atoi(prange[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.New("unable to extract port range")
|
return nil, errors.New("unable to extract port range")
|
||||||
}
|
}
|
||||||
|
|
||||||
// set host
|
|
||||||
host := parts[:len(parts)-1]
|
|
||||||
|
|
||||||
// range the ports
|
// range the ports
|
||||||
for port := min; port <= max; port++ {
|
for port := min; port <= max; port++ {
|
||||||
// try bind to host:port
|
// try bind to host:port
|
||||||
|
Loading…
Reference in New Issue
Block a user