Merge branch 'master' into grpc
This commit is contained in:
		| @@ -295,7 +295,7 @@ func (g *grpcClient) Options() client.Options { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { | func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { | ||||||
| 	return newGRPCPublication(topic, msg, "application/octet-stream") | 	return newGRPCPublication(topic, msg, g.opts.ContentType, opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { | func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { | ||||||
|   | |||||||
| @@ -1,46 +1,46 @@ | |||||||
| // Package network is an interface for defining a network overlay | // Package network is a package for defining a network overlay | ||||||
| package network | package network | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"github.com/micro/go-micro/config/options" | 	"github.com/micro/go-micro/config/options" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // Network is an interface defining networks or graphs | ||||||
| type Network interface { | type Network interface { | ||||||
| 	options.Options | 	options.Options | ||||||
| 	// Id of this network | 	// Id of this node | ||||||
| 	Id() string | 	Id() uint64 | ||||||
| 	// Connect to the network with id | 	// Connect to a node | ||||||
| 	Connect(id string) error | 	Connect(id uint64) (Link, error) | ||||||
| 	// Close the network connection | 	// Close the network connection | ||||||
| 	Close() error | 	Close() error | ||||||
| 	// Accept messages | 	// Accept messages on the network | ||||||
| 	Accept() (*Message, error) | 	Accept() (*Message, error) | ||||||
| 	// Send a message | 	// Send a message to the network | ||||||
| 	Send(*Message) error | 	Send(*Message) error | ||||||
| 	// Advertise a service on this network | 	// Retrieve list of connections | ||||||
| 	Advertise(service string) error | 	Links() ([]Link, error) | ||||||
| 	// Retrieve list of nodes for a service |  | ||||||
| 	Nodes(service string) ([]Node, error) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // Node represents a network node | // Node represents a network node | ||||||
| type Node interface { | type Node interface { | ||||||
| 	// Id of the node | 	// Node is a network. Network is a node. | ||||||
| 	Id() string | 	Network | ||||||
| 	// The network for this node |  | ||||||
| 	Network() Network |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // Message is a message sent over the network | // Link is a connection to another node | ||||||
| type Message struct { | type Link interface { | ||||||
| 	// Headers are the routing headers | 	// remote node | ||||||
| 	// e.g Micro-Service, Micro-Endpoint, Micro-Network | 	Node | ||||||
| 	// see https://github.com/micro/development/blob/master/protocol.md | 	// length of link which dictates speed | ||||||
| 	Header map[string]string | 	Length() int | ||||||
| 	// Body is the encaspulated payload | 	// weight of link which dictates curvature | ||||||
| 	Body []byte | 	Weight() int | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Message is the base type for opaque data | ||||||
|  | type Message []byte | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	// TODO: set default network | 	// TODO: set default network | ||||||
| 	DefaultNetwork Network | 	DefaultNetwork Network | ||||||
|   | |||||||
| @@ -1,203 +0,0 @@ | |||||||
| // Package transport implements the network as a transport interface |  | ||||||
| package transport |  | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/network" |  | ||||||
| 	"github.com/micro/go-micro/transport" |  | ||||||
| 	"github.com/micro/go-micro/util/backoff" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| type networkKey struct{} |  | ||||||
|  |  | ||||||
| // Transport is a network transport |  | ||||||
| type Transport struct { |  | ||||||
| 	Network network.Network |  | ||||||
| 	options transport.Options |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Socket is a transport socket |  | ||||||
| type Socket struct { |  | ||||||
| 	// The service |  | ||||||
| 	Service string |  | ||||||
|  |  | ||||||
| 	// Send via Network.Send(Message) |  | ||||||
| 	Network network.Network |  | ||||||
|  |  | ||||||
| 	// Remote/Local |  | ||||||
| 	remote, local string |  | ||||||
|  |  | ||||||
| 	// the first message if its a listener |  | ||||||
| 	message *network.Message |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Listener is a transport listener |  | ||||||
| type Listener struct { |  | ||||||
| 	// The local service |  | ||||||
| 	Service string |  | ||||||
|  |  | ||||||
| 	// The network |  | ||||||
| 	Network network.Network |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *Socket) Local() string { |  | ||||||
| 	return s.local |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *Socket) Remote() string { |  | ||||||
| 	return s.remote |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *Socket) Close() error { |  | ||||||
| 	// TODO: should it close the network? |  | ||||||
| 	return s.Network.Close() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (t *Transport) Init(opts ...transport.Option) error { |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&t.options) |  | ||||||
| 	} |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (t *Transport) Options() transport.Options { |  | ||||||
| 	return t.options |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (t *Transport) Dial(service string, opts ...transport.DialOption) (transport.Client, error) { |  | ||||||
| 	// TODO: establish pseudo socket? |  | ||||||
| 	return &Socket{ |  | ||||||
| 		Service: service, |  | ||||||
| 		Network: t.Network, |  | ||||||
| 		remote:  service, |  | ||||||
| 		// TODO: local |  | ||||||
| 		local: "local", |  | ||||||
| 	}, nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (t *Transport) Listen(service string, opts ...transport.ListenOption) (transport.Listener, error) { |  | ||||||
| 	// TODO specify connect id |  | ||||||
| 	if err := t.Network.Connect("micro.mu"); err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// advertise the service |  | ||||||
| 	if err := t.Network.Advertise(service); err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return &Listener{ |  | ||||||
| 		Service: service, |  | ||||||
| 		Network: t.Network, |  | ||||||
| 	}, nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (t *Transport) String() string { |  | ||||||
| 	return "network" |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *Socket) Send(msg *transport.Message) error { |  | ||||||
| 	// TODO: set routing headers? |  | ||||||
| 	return s.Network.Send(&network.Message{ |  | ||||||
| 		Header: msg.Header, |  | ||||||
| 		Body:   msg.Body, |  | ||||||
| 	}) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *Socket) Recv(msg *transport.Message) error { |  | ||||||
| 	if msg == nil { |  | ||||||
| 		msg = new(transport.Message) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// return first message |  | ||||||
| 	if s.message != nil { |  | ||||||
| 		msg.Header = s.message.Header |  | ||||||
| 		msg.Body = s.message.Body |  | ||||||
| 		s.message = nil |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	m, err := s.Network.Accept() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	msg.Header = m.Header |  | ||||||
| 	msg.Body = m.Body |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (l *Listener) Addr() string { |  | ||||||
| 	return l.Service |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (l *Listener) Close() error { |  | ||||||
| 	return l.Network.Close() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (l *Listener) Accept(fn func(transport.Socket)) error { |  | ||||||
| 	var i int |  | ||||||
|  |  | ||||||
| 	for { |  | ||||||
| 		msg, err := l.Network.Accept() |  | ||||||
| 		if err != nil { |  | ||||||
| 			// increment error counter |  | ||||||
| 			i++ |  | ||||||
|  |  | ||||||
| 			// break if lots of error |  | ||||||
| 			if i > 3 { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			// otherwise continue |  | ||||||
| 			time.Sleep(backoff.Do(i)) |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// reset |  | ||||||
| 		i = 0 |  | ||||||
|  |  | ||||||
| 		// execute in go routine |  | ||||||
| 		go fn(&Socket{ |  | ||||||
| 			Service: l.Service, |  | ||||||
| 			Network: l.Network, |  | ||||||
| 			local:   l.Service, |  | ||||||
| 			// TODO: remote |  | ||||||
| 			remote:  "remote", |  | ||||||
| 			message: msg, |  | ||||||
| 		}) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // NewTransport returns a new network transport. It assumes the network is already connected |  | ||||||
| func NewTransport(opts ...transport.Option) transport.Transport { |  | ||||||
| 	options := transport.Options{ |  | ||||||
| 		Context: context.Background(), |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&options) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	net, ok := options.Context.Value(networkKey{}).(network.Network) |  | ||||||
| 	if !ok { |  | ||||||
| 		net = network.DefaultNetwork |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return &Transport{ |  | ||||||
| 		options: options, |  | ||||||
| 		Network: net, |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // WithNetwork passes in the network |  | ||||||
| func WithNetwork(n network.Network) transport.Option { |  | ||||||
| 	return func(o *transport.Options) { |  | ||||||
| 		if o.Context == nil { |  | ||||||
| 			o.Context = context.Background() |  | ||||||
| 		} |  | ||||||
| 		o.Context = context.WithValue(o.Context, networkKey{}, n) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| @@ -734,6 +734,12 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register | |||||||
| 		notify: nil, | 		notify: nil, | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
|  | 	// send update to local watchers | ||||||
|  | 	g.updates <- &update{ | ||||||
|  | 		Update:  up, | ||||||
|  | 		Service: s, | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// wait | 	// wait | ||||||
| 	<-time.After(g.interval * 2) | 	<-time.After(g.interval * 2) | ||||||
|  |  | ||||||
| @@ -770,6 +776,12 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error { | |||||||
| 		notify: nil, | 		notify: nil, | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
|  | 	// send update to local watchers | ||||||
|  | 	g.updates <- &update{ | ||||||
|  | 		Update:  up, | ||||||
|  | 		Service: s, | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// wait | 	// wait | ||||||
| 	<-time.After(g.interval * 2) | 	<-time.After(g.interval * 2) | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user