Compare commits

..

24 Commits

Author SHA1 Message Date
Asim Aslam
461df8d464 Merge pull request #364 from micro/inbox
Add inbox feature to http broker
2019-01-03 11:27:46 +00:00
Asim Aslam
7c2cbe2ad2 better error handling 2019-01-03 11:23:06 +00:00
Asim Aslam
abbeb6d068 add inbox feature to http broker 2019-01-02 19:27:46 +00:00
Asim Aslam
ce36d0156d Merge pull request #362 from micro/codec
Make json/protobuf/grpc codecs
2019-01-02 18:01:34 +00:00
Asim Aslam
29ef3676b2 Merge pull request #363 from micro/proxy
Add support for http proxy
2019-01-02 15:28:57 +00:00
Asim Aslam
2761b8e0f5 Add support for http proxy 2019-01-02 15:24:17 +00:00
Asim Aslam
ed580204a8 Add grpc codec 2019-01-02 12:55:06 +00:00
Asim Aslam
7cf94162b8 remove fmt comment 2019-01-02 12:50:25 +00:00
Asim Aslam
e2623d8ef5 Make json/protobuf codecs 2018-12-31 22:01:16 +00:00
Asim Aslam
b3b4bc6059 remove Plus 2018-12-31 20:51:22 +00:00
Asim Aslam
386ced576a Process header/body in one call 2018-12-31 17:53:16 +00:00
Asim Aslam
dcf7a56f9b rename codec 2018-12-31 17:28:19 +00:00
Asim Aslam
460fb3e70c update package comments 2018-12-29 16:18:05 +00:00
Asim Aslam
5cae330732 Update selector race, rename cache selector 2018-12-29 15:44:51 +00:00
Asim Aslam
ff982b5fd1 add method 2018-12-28 21:27:08 +00:00
Asim Aslam
28324412a4 Add X-Micro-Target header 2018-12-26 14:46:15 +00:00
Asim Aslam
5f2ce6fac4 nitpick readme 2018-12-26 12:03:08 +00:00
Asim Aslam
8b54a850f7 run gossip updater first 2018-12-19 19:04:44 +00:00
Asim Aslam
fae8c5eb4c fix context 2018-12-19 09:27:53 +00:00
Asim Aslam
3bc6556d36 Merge pull request #353 from unistack-org/gossip
implement some gossip options
2018-12-19 09:27:10 +00:00
5bcdf189de implement some gossip options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2018-12-19 12:25:16 +03:00
Asim Aslam
f2efc685d3 Merge pull request #352 from micro/rwmutex
move to using rwmutex for selector
2018-12-18 18:10:19 +00:00
Asim Aslam
67d10e5f39 simplify get code 2018-12-18 18:06:34 +00:00
Asim Aslam
770c16a66d move to using rwmutex for selector 2018-12-18 16:51:42 +00:00
23 changed files with 1053 additions and 577 deletions

View File

@@ -28,7 +28,7 @@ across the services and retry a different node if there's a problem.
- **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client
and server handle this by default. This includes proto-rpc and json-rpc by default.
and server handle this by default. This includes protobuf and json by default.
- **Sync Streaming** - RPC based request/response with support for bidirectional streaming. We provide an abstraction for synchronous
communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed. The default

View File

@@ -45,6 +45,10 @@ type httpBroker struct {
subscribers map[string][]*httpSubscriber
running bool
exit chan chan error
// offline message inbox
mtx sync.RWMutex
inbox map[string][][]byte
}
type httpSubscriber struct {
@@ -133,6 +137,7 @@ func newHttpBroker(opts ...Option) Broker {
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
// specify the message handler
@@ -175,6 +180,49 @@ func (h *httpSubscriber) Unsubscribe() error {
return h.hb.unsubscribe(h)
}
func (h *httpBroker) saveMessage(topic string, msg []byte) {
h.mtx.Lock()
defer h.mtx.Unlock()
// get messages
c := h.inbox[topic]
// save message
c = append(c, msg)
// max length 64
if len(c) > 64 {
c = c[:64]
}
// save inbox
h.inbox[topic] = c
}
func (h *httpBroker) getMessage(topic string, num int) [][]byte {
h.mtx.Lock()
defer h.mtx.Unlock()
// get messages
c, ok := h.inbox[topic]
if !ok {
return nil
}
// more message than requests
if len(c) >= num {
msg := c[:num]
h.inbox[topic] = c[num:]
return msg
}
// reset inbox
h.inbox[topic] = nil
// return all messages
return c
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
@@ -454,14 +502,7 @@ func (h *httpBroker) Options() Options {
}
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
@@ -473,12 +514,26 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
m.Header[":topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
pub := func(node *registry.Node, b []byte) {
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
// ignore error
return nil
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
@@ -491,34 +546,71 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err == nil {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
for _, service := range s {
// only process if we have nodes
if len(service.Nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
for _, node := range service.Nodes {
// publish async
go pub(node, b)
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
// only process if we have nodes
if len(service.Nodes) == 0 {
continue
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async
go pub(node, b)
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range service.Nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
}

View File

@@ -96,7 +96,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
context: ctx,
request: req,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
codec: newRpcCodec(msg, c, cf),
seq: seq,
}
defer stream.Close()
@@ -177,7 +177,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
context: ctx,
request: req,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
codec: newRpcCodec(msg, c, cf),
}
ch := make(chan error, 1)

View File

@@ -5,7 +5,9 @@ import (
errs "errors"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/transport"
@@ -28,7 +30,7 @@ var (
errShutdown = errs.New("connection is shut down")
)
type rpcPlusCodec struct {
type rpcCodec struct {
client transport.Client
codec codec.Codec
@@ -43,9 +45,7 @@ type readWriteCloser struct {
type clientCodec interface {
WriteRequest(*request, interface{}) error
ReadResponseHeader(*response) error
ReadResponseBody(interface{}) error
ReadResponse(*response, interface{}) error
Close() error
}
@@ -67,9 +67,9 @@ var (
defaultContentType = "application/octet-stream"
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/protobuf": proto.NewCodec,
"application/json": json.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
@@ -89,12 +89,12 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcPlusCodec {
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcCodec {
rwc := &readWriteCloser{
wbuf: bytes.NewBuffer(nil),
rbuf: bytes.NewBuffer(nil),
}
r := &rpcPlusCodec{
r := &rpcCodec{
buf: rwc,
client: client,
codec: c(rwc),
@@ -103,14 +103,18 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne
return r
}
func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
func (c *rpcCodec) WriteRequest(req *request, body interface{}) error {
c.buf.wbuf.Reset()
m := &codec.Message{
Id: req.Seq,
Target: req.Service,
Method: req.ServiceMethod,
Type: codec.Request,
Header: map[string]string{},
Header: map[string]string{
"X-Micro-Service": req.Service,
"X-Micro-Method": req.ServiceMethod,
},
}
if err := c.codec.Write(m, body); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
@@ -125,32 +129,37 @@ func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
return nil
}
func (c *rpcPlusCodec) ReadResponseHeader(r *response) error {
func (c *rpcCodec) ReadResponse(r *response, b interface{}) error {
var m transport.Message
if err := c.client.Recv(&m); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body)
var me codec.Message
// set headers
me.Header = m.Header
// read header
err := c.codec.ReadHeader(&me, codec.Response)
r.ServiceMethod = me.Method
r.Seq = me.Id
r.Error = me.Error
if err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
return nil
}
func (c *rpcPlusCodec) ReadResponseBody(b interface{}) error {
// read body
if err := c.codec.ReadBody(b); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
return nil
}
func (c *rpcPlusCodec) Close() error {
func (c *rpcCodec) Close() error {
c.buf.Close()
c.codec.Close()
if err := c.client.Close(); err != nil {

View File

@@ -68,7 +68,8 @@ func (r *rpcStream) Recv(msg interface{}) error {
}
var resp response
if err := r.codec.ReadResponseHeader(&resp); err != nil {
if err := r.codec.ReadResponse(&resp, msg); err != nil {
if err == io.EOF && !r.isClosed() {
r.err = io.ErrUnexpectedEOF
return io.ErrUnexpectedEOF
@@ -87,13 +88,6 @@ func (r *rpcStream) Recv(msg interface{}) error {
} else {
r.err = io.EOF
}
if err := r.codec.ReadResponseBody(nil); err != nil {
r.err = err
}
default:
if err := r.codec.ReadResponseBody(msg); err != nil {
r.err = err
}
}
return r.err

View File

@@ -26,7 +26,6 @@ import (
// selectors
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/selector/cache"
// transports
"github.com/micro/go-micro/transport"
@@ -149,7 +148,6 @@ var (
Name: "selector",
EnvVar: "MICRO_SELECTOR",
Usage: "Selector used to pick nodes for querying",
Value: "cache",
},
cli.StringFlag{
Name: "transport",
@@ -179,7 +177,7 @@ var (
DefaultSelectors = map[string]func(...selector.Option) selector.Selector{
"default": selector.NewSelector,
"cache": cache.NewSelector,
"cache": selector.NewSelector,
}
DefaultServers = map[string]func(...server.Option) server.Server{

119
codec/grpc/grpc.go Normal file
View File

@@ -0,0 +1,119 @@
// Package grpc provides a grpc codec
package grpc
import (
"encoding/json"
"errors"
"io"
"strings"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
ContentType string
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
if ct := m.Header["Content-Type"]; len(ct) > 0 {
c.ContentType = ct
}
if ct := m.Header["content-type"]; len(ct) > 0 {
c.ContentType = ct
}
// service method
path := m.Header[":path"]
if len(path) == 0 || path[0] != '/' {
m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"]
} else {
// [ , a.package.Foo, Bar]
parts := strings.Split(path, "/")
if len(parts) != 3 {
return errors.New("Unknown request path")
}
service := strings.Split(parts[1], ".")
m.Method = strings.Join([]string{service[len(service)-1], parts[2]}, ".")
m.Target = strings.Join(service[:len(service)-1], ".")
}
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
// no body
if b == nil {
return nil
}
_, buf, err := decode(c.Conn)
if err != nil {
return err
}
switch c.ContentType {
case "application/grpc+json":
return json.Unmarshal(buf, b)
case "application/grpc+proto", "application/grpc":
return proto.Unmarshal(buf, b.(proto.Message))
}
return errors.New("Unsupported Content-Type")
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
var buf []byte
var err error
if ct := m.Header["Content-Type"]; len(ct) > 0 {
c.ContentType = ct
}
if ct := m.Header["content-type"]; len(ct) > 0 {
c.ContentType = ct
}
m.Header["Trailer"] = "grpc-status, grpc-message"
switch c.ContentType {
case "application/grpc+json":
buf, err = json.Marshal(b)
case "application/grpc+proto", "application/grpc":
pb, ok := b.(proto.Message)
if ok {
buf, err = proto.Marshal(pb)
}
default:
err = errors.New("Unsupported Content-Type")
}
if err != nil {
m.Header["grpc-status"] = "8"
m.Header["grpc-message"] = err.Error()
return err
}
m.Header["grpc-status"] = "0"
m.Header["grpc-message"] = ""
return encode(0, buf, c.Conn)
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "grpc"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
ContentType: "application/grpc",
}
}

70
codec/grpc/util.go Normal file
View File

@@ -0,0 +1,70 @@
package grpc
import (
"encoding/binary"
"fmt"
"io"
)
var (
maxMessageSize = 1024 * 1024 * 4
maxInt = int(^uint(0) >> 1)
)
func decode(r io.Reader) (uint8, []byte, error) {
header := make([]byte, 5)
// read the header
if _, err := r.Read(header[:]); err != nil {
return uint8(0), nil, err
}
// get encoding format e.g compressed
cf := uint8(header[0])
// get message length
length := binary.BigEndian.Uint32(header[1:])
// no encoding format
if length == 0 {
return cf, nil, nil
}
//
if int64(length) > int64(maxInt) {
return cf, nil, fmt.Errorf("grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
}
if int(length) > maxMessageSize {
return cf, nil, fmt.Errorf("grpc: received message larger than max (%d vs. %d)", length, maxMessageSize)
}
msg := make([]byte, int(length))
if _, err := r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return cf, nil, err
}
return cf, msg, nil
}
func encode(cf uint8, buf []byte, w io.Writer) error {
header := make([]byte, 5)
// set compression
header[0] = byte(cf)
// write length as header
binary.BigEndian.PutUint32(header[1:], uint32(len(buf)))
// read the header
if _, err := w.Write(header[:]); err != nil {
return err
}
// write the buffer
_, err := w.Write(buf)
return err
}

43
codec/json/json.go Normal file
View File

@@ -0,0 +1,43 @@
// Package json provides a json codec
package json
import (
"encoding/json"
"io"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
Encoder *json.Encoder
Decoder *json.Decoder
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
return c.Decoder.Decode(b)
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
return c.Encoder.Encode(b)
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "json"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
Decoder: json.NewDecoder(c),
Encoder: json.NewEncoder(c),
}
}

49
codec/proto/proto.go Normal file
View File

@@ -0,0 +1,49 @@
// Package proto provides a proto codec
package proto
import (
"io"
"io/ioutil"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
buf, err := ioutil.ReadAll(c.Conn)
if err != nil {
return err
}
return proto.Unmarshal(buf, b.(proto.Message))
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
buf, err := proto.Marshal(b.(proto.Message))
if err != nil {
return err
}
_, err = c.Conn.Write(buf)
return err
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "proto"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
}
}

View File

@@ -2,9 +2,12 @@
package gossip
import (
"context"
"encoding/json"
"io/ioutil"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -12,7 +15,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/hashicorp/memberlist"
"github.com/micro/go-log"
log "github.com/micro/go-log"
"github.com/micro/go-micro/registry"
pb "github.com/micro/go-micro/registry/gossip/proto"
"github.com/mitchellh/hashstructure"
@@ -56,7 +59,7 @@ type update struct {
var (
// You should change this if using secure
DefaultSecret = []byte("gossip")
DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes
ExpiryTick = time.Second * 5
)
@@ -104,14 +107,40 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
RetransmitMult: 3,
}
// machine hostname
hostname, _ := os.Hostname()
// create a new default config
c := memberlist.DefaultLocalConfig()
// set bind to random port
c.BindPort = 0
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
c = optConfig
}
if hostport, ok := g.options.Context.Value(contextAddress{}).(string); ok {
host, port, err := net.SplitHostPort(hostport)
if err == nil {
pn, err := strconv.Atoi(port)
if err == nil {
c.BindPort = pn
}
c.BindAddr = host
}
} else {
// set bind to random port
c.BindPort = 0
}
if hostport, ok := g.options.Context.Value(contextAdvertise{}).(string); ok {
host, port, err := net.SplitHostPort(hostport)
if err == nil {
pn, err := strconv.Atoi(port)
if err == nil {
c.AdvertisePort = pn
}
c.AdvertiseAddr = host
}
}
// machine hostname
hostname, _ := os.Hostname()
// set the name
c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-")
@@ -135,8 +164,6 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.SecretKey = k
}
// TODO: set advertise addr to advertise behind nat
// create the memberlist
m, err := memberlist.Create(c)
if err != nil {
@@ -544,20 +571,22 @@ func (g *gossipRegistry) String() string {
func NewRegistry(opts ...registry.Option) registry.Registry {
gossip := &gossipRegistry{
options: registry.Options{},
options: registry.Options{
Context: context.Background(),
},
updates: make(chan *update, 100),
services: make(map[string][]*registry.Service),
watchers: make(map[string]chan *registry.Result),
}
// configure the gossiper
if err := configure(gossip, opts...); err != nil {
log.Fatal("Error configuring registry: %v", err)
}
// run the updater
go gossip.run()
// configure the gossiper
if err := configure(gossip, opts...); err != nil {
log.Fatalf("Error configuring registry: %v", err)
}
// wait for setup
<-time.After(gossip.interval * 2)

View File

@@ -3,6 +3,7 @@ package gossip
import (
"context"
"github.com/hashicorp/memberlist"
"github.com/micro/go-micro/registry"
)
@@ -15,3 +16,30 @@ func Secret(k []byte) registry.Option {
o.Context = context.WithValue(o.Context, contextSecretKey{}, k)
}
}
type contextAddress struct{}
// Address to bind to - host:port
func Address(a string) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextAddress{}, a)
}
}
type contextConfig struct{}
// Config allow to inject a *memberlist.Config struct for configuring gossip
func Config(c *memberlist.Config) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextConfig{}, c)
}
}
type contextAdvertise struct{}
// The address to advertise for other gossip members - host:port
func Advertise(a string) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextAdvertise{}, a)
}
}

View File

@@ -38,20 +38,35 @@ func cp(current []*registry.Service) []*registry.Service {
}
func addNodes(old, neu []*registry.Node) []*registry.Node {
var nodes []*registry.Node
// add all new nodes
for _, n := range neu {
var seen bool
for i, o := range old {
node := *n
nodes = append(nodes, &node)
}
// look at old nodes
for _, o := range old {
var exists bool
// check against new nodes
for _, n := range nodes {
// ids match then skip
if o.Id == n.Id {
seen = true
old[i] = n
exists = true
break
}
}
if !seen {
old = append(old, n)
// keep old node
if !exists {
node := *o
nodes = append(nodes, &node)
}
}
return old
return nodes
}
func addServices(old, neu []*registry.Service) []*registry.Service {
@@ -91,19 +106,27 @@ func delNodes(old, del []*registry.Node) []*registry.Node {
func delServices(old, del []*registry.Service) []*registry.Service {
var services []*registry.Service
for i, o := range old {
for _, o := range old {
srv := new(registry.Service)
*srv = *o
var rem bool
for _, s := range del {
if o.Version == s.Version {
old[i].Nodes = delNodes(o.Nodes, s.Nodes)
if len(old[i].Nodes) == 0 {
if srv.Version == s.Version {
srv.Nodes = delNodes(srv.Nodes, s.Nodes)
if len(srv.Nodes) == 0 {
rem = true
}
}
}
if !rem {
services = append(services, o)
services = append(services, srv)
}
}
return services
}

View File

@@ -1,424 +0,0 @@
// Package cache is a caching selector. It uses the registry watcher.
package cache
import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type cacheSelector struct {
so selector.Options
ttl time.Duration
// registry cache
sync.Mutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
}
var (
DefaultTTL = time.Minute
)
func (c *cacheSelector) quit() bool {
select {
case <-c.exit:
return true
default:
return false
}
}
// cp copies a service. Because we're caching handing back pointers would
// create a race condition, so we do this instead
// its fast enough
func (c *cacheSelector) cp(current []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range current {
// copy service
s := new(registry.Service)
*s = *service
// copy nodes
var nodes []*registry.Node
for _, node := range service.Nodes {
n := new(registry.Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*registry.Endpoint
for _, ep := range service.Endpoints {
e := new(registry.Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func (c *cacheSelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock()
defer c.Unlock()
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
c.watched[service] = true
}
// get does the actual request for a service
// it also caches it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.set(service, c.cp(services))
return services, nil
}
// check the cache first
services, ok := c.cache[service]
// cache miss or no services
if !ok || len(services) == 0 {
return get(service)
}
// got cache but lets check ttl
ttl, kk := c.ttls[service]
// within ttl so return cache
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
}
// expired entry so get service
services, err := get(service)
// no error then return error
if err == nil {
return services, nil
}
// not found error then return
if err == registry.ErrNotFound {
return nil, selector.ErrNotFound
}
// other error
// return expired cache as last resort
return c.cp(services), nil
}
func (c *cacheSelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *cacheSelector) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
}
c.Lock()
defer c.Unlock()
services, ok := c.cache[res.Service.Name]
if !ok {
// we're not going to cache anything
// unless there was already a lookup
return
}
if len(res.Service.Nodes) == 0 {
switch res.Action {
case "delete":
c.del(res.Service.Name)
}
return
}
// existing service found
var service *registry.Service
var index int
for i, s := range services {
if s.Version == res.Service.Version {
service = s
index = i
}
}
switch res.Action {
case "create", "update":
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
}
// append old nodes to new service
for _, cur := range service.Nodes {
var seen bool
for _, node := range res.Service.Nodes {
if cur.Id == node.Id {
seen = true
break
}
}
if !seen {
res.Service.Nodes = append(res.Service.Nodes, cur)
}
}
services[index] = res.Service
c.set(res.Service.Name, services)
case "delete":
if service == nil {
return
}
var nodes []*registry.Node
// filter cur nodes to remove the dead one
for _, cur := range service.Nodes {
var seen bool
for _, del := range res.Service.Nodes {
if del.Id == cur.Id {
seen = true
break
}
}
if !seen {
nodes = append(nodes, cur)
}
}
// still got nodes, save and return
if len(nodes) > 0 {
service.Nodes = nodes
services[index] = service
c.set(service.Name, services)
return
}
// zero nodes left
// only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
}
}
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *cacheSelector) run(name string) {
for {
// exit early if already dead
if c.quit() {
return
}
// create new watcher
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
log.Log(err)
time.Sleep(time.Second)
continue
}
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
log.Log(err)
continue
}
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *cacheSelector) watch(w registry.Watcher) error {
defer w.Stop()
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
}
// stop the watcher
w.Stop()
}()
for {
res, err := w.Next()
if err != nil {
return err
}
c.update(res)
}
}
func (c *cacheSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&c.so)
}
// reload the watcher
go func() {
select {
case <-c.exit:
return
default:
c.reload <- true
}
}()
return nil
}
func (c *cacheSelector) Options() selector.Options {
return c.so
}
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
sopts := selector.SelectOptions{
Strategy: c.so.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// get the service
// try the cache first
// if that fails go directly to the registry
services, err := c.get(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
}
func (c *cacheSelector) Reset(service string) {
}
// Close stops the watcher and destroys the cache
func (c *cacheSelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
func (c *cacheSelector) String() string {
return "cache"
}
func NewSelector(opts ...selector.Option) selector.Selector {
sopts := selector.Options{
Strategy: selector.Random,
}
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
ttl := DefaultTTL
if sopts.Context != nil {
if t, ok := sopts.Context.Value(ttlKey{}).(time.Duration); ok {
ttl = t
}
}
return &cacheSelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
}

View File

@@ -1,29 +0,0 @@
package cache
import (
"testing"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestCacheSelector(t *testing.T) {
counts := map[string]int{}
cache := NewSelector(selector.Registry(mock.NewRegistry()))
next, err := cache.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling cache select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Cache Counts %v", counts)
}

View File

@@ -1,27 +1,341 @@
package selector
import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry"
)
type defaultSelector struct {
so Options
type registrySelector struct {
so Options
ttl time.Duration
// registry cache
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
}
func (r *defaultSelector) Init(opts ...Option) error {
for _, o := range opts {
o(&r.so)
var (
DefaultTTL = time.Minute
)
func (c *registrySelector) quit() bool {
select {
case <-c.exit:
return true
default:
return false
}
}
// cp copies a service. Because we're caching handing back pointers would
// create a race condition, so we do this instead
// its fast enough
func (c *registrySelector) cp(current []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range current {
// copy service
s := new(registry.Service)
*s = *service
// copy nodes
var nodes []*registry.Node
for _, node := range service.Nodes {
n := new(registry.Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*registry.Endpoint
for _, ep := range service.Endpoints {
e := new(registry.Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func (c *registrySelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *registrySelector) get(service string) ([]*registry.Service, error) {
// read lock
c.RLock()
// check the cache first
services, ok := c.cache[service]
// get cache ttl
ttl, kk := c.ttls[service]
// got services && within ttl so return cache
if ok && kk && time.Since(ttl) < c.ttl {
// make a copy
cp := c.cp(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()
return services, nil
}
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
}
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
func (c *registrySelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *registrySelector) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
}
c.Lock()
defer c.Unlock()
services, ok := c.cache[res.Service.Name]
if !ok {
// we're not going to cache anything
// unless there was already a lookup
return
}
if len(res.Service.Nodes) == 0 {
switch res.Action {
case "delete":
c.del(res.Service.Name)
}
return
}
// existing service found
var service *registry.Service
var index int
for i, s := range services {
if s.Version == res.Service.Version {
service = s
index = i
}
}
switch res.Action {
case "create", "update":
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
}
// append old nodes to new service
for _, cur := range service.Nodes {
var seen bool
for _, node := range res.Service.Nodes {
if cur.Id == node.Id {
seen = true
break
}
}
if !seen {
res.Service.Nodes = append(res.Service.Nodes, cur)
}
}
services[index] = res.Service
c.set(res.Service.Name, services)
case "delete":
if service == nil {
return
}
var nodes []*registry.Node
// filter cur nodes to remove the dead one
for _, cur := range service.Nodes {
var seen bool
for _, del := range res.Service.Nodes {
if del.Id == cur.Id {
seen = true
break
}
}
if !seen {
nodes = append(nodes, cur)
}
}
// still got nodes, save and return
if len(nodes) > 0 {
service.Nodes = nodes
services[index] = service
c.set(service.Name, services)
return
}
// zero nodes left
// only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
}
}
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *registrySelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()
// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()
for {
// exit early if already dead
if c.quit() {
return
}
// create new watcher
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
log.Log(err)
time.Sleep(time.Second)
continue
}
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
log.Log(err)
continue
}
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *registrySelector) watch(w registry.Watcher) error {
defer w.Stop()
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
}
// stop the watcher
w.Stop()
}()
for {
res, err := w.Next()
if err != nil {
return err
}
c.update(res)
}
}
func (c *registrySelector) Init(opts ...Option) error {
for _, o := range opts {
o(&c.so)
}
// reload the watcher
go func() {
select {
case <-c.exit:
return
default:
c.reload <- true
}
}()
return nil
}
func (r *defaultSelector) Options() Options {
return r.so
func (c *registrySelector) Options() Options {
return c.so
}
func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) {
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{
Strategy: r.so.Strategy,
Strategy: c.so.Strategy,
}
for _, opt := range opts {
@@ -29,7 +343,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
}
// get the service
services, err := r.so.Registry.GetService(service)
// try the cache first
// if that fails go directly to the registry
services, err := c.get(service)
if err != nil {
return nil, err
}
@@ -47,21 +363,33 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
return sopts.Strategy(services), nil
}
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
func (c *registrySelector) Mark(service string, node *registry.Node, err error) {
}
func (r *defaultSelector) Reset(service string) {
func (c *registrySelector) Reset(service string) {
}
func (r *defaultSelector) Close() error {
// Close stops the watcher and destroys the cache
func (c *registrySelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
func (r *defaultSelector) String() string {
return "default"
func (c *registrySelector) String() string {
return "registry"
}
func newDefaultSelector(opts ...Option) Selector {
func NewSelector(opts ...Option) Selector {
sopts := Options{
Strategy: Random,
}
@@ -74,7 +402,21 @@ func newDefaultSelector(opts ...Option) Selector {
sopts.Registry = registry.DefaultRegistry
}
return &defaultSelector{
so: sopts,
ttl := DefaultTTL
if sopts.Context != nil {
if t, ok := sopts.Context.Value("selector_ttl").(time.Duration); ok {
ttl = t
}
}
return &registrySelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
}

View File

@@ -6,14 +6,14 @@ import (
"github.com/micro/go-micro/registry/mock"
)
func TestDefaultSelector(t *testing.T) {
func TestRegistrySelector(t *testing.T) {
counts := map[string]int{}
rs := newDefaultSelector(Registry(mock.NewRegistry()))
cache := NewSelector(Registry(mock.NewRegistry()))
next, err := rs.Select("foo")
next, err := cache.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling default select: %v", err)
t.Errorf("Unexpected error calling cache select: %v", err)
}
for i := 0; i < 100; i++ {
@@ -24,5 +24,5 @@ func TestDefaultSelector(t *testing.T) {
counts[node.Id]++
}
t.Logf("Default Counts %v", counts)
t.Logf("Selector Counts %v", counts)
}

View File

@@ -1,4 +1,4 @@
package cache
package registry
import (
"context"
@@ -7,14 +7,12 @@ import (
"github.com/micro/go-micro/selector"
)
type ttlKey struct{}
// Set the cache ttl
// Set the registry cache ttl
func TTL(t time.Duration) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, ttlKey{}, t)
o.Context = context.WithValue(o.Context, "selector_ttl", t)
}
}

View File

@@ -0,0 +1,11 @@
// Package registry uses the go-micro registry for selection
package registry
import (
"github.com/micro/go-micro/selector"
)
// NewSelector returns a new registry selector
func NewSelector(opts ...selector.Option) selector.Selector {
return selector.NewSelector(opts...)
}

View File

@@ -1,4 +1,4 @@
// Package selector is a way to load balance service nodes
// Package selector is a way to pick a list of service nodes
package selector
import (
@@ -35,12 +35,8 @@ type Filter func([]*registry.Service) []*registry.Service
type Strategy func([]*registry.Service) Next
var (
DefaultSelector = newDefaultSelector()
DefaultSelector = NewSelector()
ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available")
)
func NewSelector(opts ...Option) Selector {
return newDefaultSelector(opts...)
}

View File

@@ -4,7 +4,10 @@ import (
"bytes"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/grpc"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/transport"
"github.com/pkg/errors"
@@ -25,9 +28,12 @@ type readWriteCloser struct {
var (
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/grpc": grpc.NewCodec,
"application/grpc+json": grpc.NewCodec,
"application/grpc+proto": grpc.NewCodec,
"application/json": json.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/protobuf": proto.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
@@ -77,6 +83,10 @@ func (c *rpcCodec) ReadRequestHeader(r *request, first bool) error {
m.Header = tm.Header
}
// set some internal things
m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"]
err := c.codec.ReadHeader(&m, codec.Request)
r.ServiceMethod = m.Method
r.Seq = m.Id

109
transport/http_proxy.go Normal file
View File

@@ -0,0 +1,109 @@
package transport
import (
"bufio"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
)
const (
proxyAuthHeader = "Proxy-Authorization"
)
func getURL(addr string) (*url.URL, error) {
r := &http.Request{
URL: &url.URL{
Scheme: "https",
Host: addr,
},
}
return http.ProxyFromEnvironment(r)
}
type pbuffer struct {
net.Conn
r io.Reader
}
func (p *pbuffer) Read(b []byte) (int, error) {
return p.r.Read(b)
}
func proxyDial(conn net.Conn, addr string, proxyURL *url.URL) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
}
}()
r := &http.Request{
Method: http.MethodConnect,
URL: &url.URL{Host: addr},
Header: map[string][]string{"User-Agent": {"micro/latest"}},
}
if user := proxyURL.User; user != nil {
u := user.Username()
p, _ := user.Password()
auth := []byte(u + ":" + p)
basicAuth := base64.StdEncoding.EncodeToString(auth)
r.Header.Add(proxyAuthHeader, "Basic "+basicAuth)
}
if err := r.Write(conn); err != nil {
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
}
br := bufio.NewReader(conn)
rsp, err := http.ReadResponse(br, r)
if err != nil {
return nil, fmt.Errorf("reading server HTTP response: %v", err)
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
dump, err := httputil.DumpResponse(rsp, true)
if err != nil {
return nil, fmt.Errorf("failed to do connect handshake, status code: %s", rsp.Status)
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}
return &pbuffer{Conn: conn, r: br}, nil
}
// Creates a new connection
func newConn(dial func(string) (net.Conn, error)) func(string) (net.Conn, error) {
return func(addr string) (net.Conn, error) {
// get the proxy url
proxyURL, err := getURL(addr)
if err != nil {
return nil, err
}
// set to addr
callAddr := addr
// got proxy
if proxyURL != nil {
callAddr = proxyURL.Host
}
// dial the addr
c, err := dial(callAddr)
if err != nil {
return nil, err
}
// do proxy connect if we have proxy url
if proxyURL != nil {
c, err = proxyDial(c, addr, proxyURL)
}
return c, err
}
}

View File

@@ -1,7 +1,6 @@
package transport
import (
//"fmt"
"bufio"
"bytes"
"crypto/tls"
@@ -246,6 +245,9 @@ func (h *httpTransportSocket) Recv(m *Message) error {
}
}
// set path
m.Header[":path"] = h.r.URL.Path
// return early early
return nil
}
@@ -277,6 +279,9 @@ func (h *httpTransportSocket) Recv(m *Message) error {
}
}
// set path
m.Header[":path"] = h.r.URL.Path
return nil
}
@@ -452,9 +457,13 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
InsecureSkipVerify: true,
}
}
conn, err = tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
conn, err = newConn(func(addr string) (net.Conn, error) {
return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
})(addr)
} else {
conn, err = net.DialTimeout("tcp", addr, dopts.Timeout)
conn, err = newConn(func(addr string) (net.Conn, error) {
return net.DialTimeout("tcp", addr, dopts.Timeout)
})(addr)
}
if err != nil {