Compare commits

..

66 Commits

Author SHA1 Message Date
Asim Aslam
c17d0fcc0f grpc request 2019-01-13 19:54:07 +00:00
Asim Aslam
e1bc240a14 Respond with error type 2019-01-13 12:15:35 +00:00
Asim Aslam
01f6683035 Add router option 2019-01-13 12:15:13 +00:00
Asim Aslam
bfd341a269 Execute wrappers before router 2019-01-11 15:49:54 +00:00
Asim Aslam
9897c630ae remove request/response 2019-01-11 14:04:37 +00:00
Asim Aslam
36788487a7 set headers as appropriate 2019-01-11 13:44:47 +00:00
Asim Aslam
3043841cf5 Don't process nil 2019-01-10 22:14:32 +00:00
Asim Aslam
04103fe048 Merge pull request #379 from micro/endpoint
Rename method to endpoint
2019-01-10 22:12:28 +00:00
Asim Aslam
9adebfcf1e rename method to endpoint 2019-01-10 21:25:31 +00:00
Asim Aslam
f853f88bcd gofmt 2019-01-10 20:35:20 +00:00
Asim Aslam
40ff5b749b Set topic header 2019-01-10 20:35:10 +00:00
Asim Aslam
59d82b0abe Add response 2019-01-10 11:43:36 +00:00
Asim Aslam
648da5494f Change a few things 2019-01-10 11:39:39 +00:00
Asim Aslam
bb31480f1a downgrade code generated stuff 2019-01-10 10:57:04 +00:00
Asim Aslam
c086c33bb3 remove codecs 2019-01-10 09:42:02 +00:00
Asim Aslam
6e0e4a684c Further crufting 2019-01-09 19:28:13 +00:00
Asim Aslam
873fc6d663 rewriting a lot 2019-01-09 19:11:47 +00:00
Asim Aslam
1561ccbc14 remove clientCodec 2019-01-09 17:33:28 +00:00
Asim Aslam
d004c9624b Add router modifications 2019-01-09 16:20:57 +00:00
Asim Aslam
ee380c6b7a reorder 2019-01-09 09:06:30 +00:00
Asim Aslam
7a1f735825 remove server codec 2019-01-09 09:02:30 +00:00
Asim Aslam
69119cc622 Merge pull request #376 from jiyeyuran/patch-3
add locker
2019-01-09 08:42:08 +00:00
xinfei.wu
eec1726f1d add package comment 2019-01-09 16:31:23 +08:00
xinfei.wu
453ce2fcbe add locker 2019-01-09 14:24:12 +08:00
Asim Aslam
d5df31eeb8 Merge pull request #375 from micro/codec
further codec changes
2019-01-08 21:04:22 +00:00
Asim Aslam
f46828be33 Add Router interface 2019-01-08 20:32:47 +00:00
Asim Aslam
4cb41721f1 further codec changes 2019-01-08 15:38:25 +00:00
Asim Aslam
216dbb771a rename requestHeader 2019-01-07 18:25:31 +00:00
Asim Aslam
c9963cb870 rename 2019-01-07 18:20:47 +00:00
Asim Aslam
e8b431c5ff rename codec interface 2019-01-07 18:17:13 +00:00
Asim Aslam
9544058af3 Merge pull request #372 from micro/codec
Switch default codec and add default codec for server
2019-01-07 17:54:28 +00:00
Asim Aslam
c717af21ac Some router changes 2019-01-07 17:17:06 +00:00
Asim Aslam
46ece968d4 rename service to router 2019-01-07 14:44:40 +00:00
Asim Aslam
fcc730931c Merge pull request #371 from micro/dns
Add dns selector
2019-01-07 13:56:24 +00:00
Asim Aslam
d519180806 Merge branch 'master' into dns 2019-01-07 13:52:37 +00:00
Asim Aslam
78af321790 Merge pull request #367 from micro/static
Add static selector
2019-01-07 13:51:47 +00:00
Asim Aslam
d179c971af Switch default codec and add default codec for server 2019-01-07 13:48:38 +00:00
Asim Aslam
d6a5ff432c add net.LookupHost for dns 2019-01-07 09:34:07 +00:00
Asim Aslam
5aeb28dfee Add error header 2019-01-07 09:11:36 +00:00
Asim Aslam
f9da55e8a9 Add dns selector 2019-01-07 07:41:26 +00:00
Asim Aslam
4692af4393 Add static selector 2019-01-06 21:12:02 +00:00
Asim Aslam
4adc31e62d add bytes codec, still unused 2019-01-04 14:07:16 +00:00
Asim Aslam
461df8d464 Merge pull request #364 from micro/inbox
Add inbox feature to http broker
2019-01-03 11:27:46 +00:00
Asim Aslam
7c2cbe2ad2 better error handling 2019-01-03 11:23:06 +00:00
Asim Aslam
abbeb6d068 add inbox feature to http broker 2019-01-02 19:27:46 +00:00
Asim Aslam
ce36d0156d Merge pull request #362 from micro/codec
Make json/protobuf/grpc codecs
2019-01-02 18:01:34 +00:00
Asim Aslam
29ef3676b2 Merge pull request #363 from micro/proxy
Add support for http proxy
2019-01-02 15:28:57 +00:00
Asim Aslam
2761b8e0f5 Add support for http proxy 2019-01-02 15:24:17 +00:00
Asim Aslam
ed580204a8 Add grpc codec 2019-01-02 12:55:06 +00:00
Asim Aslam
7cf94162b8 remove fmt comment 2019-01-02 12:50:25 +00:00
Asim Aslam
e2623d8ef5 Make json/protobuf codecs 2018-12-31 22:01:16 +00:00
Asim Aslam
b3b4bc6059 remove Plus 2018-12-31 20:51:22 +00:00
Asim Aslam
386ced576a Process header/body in one call 2018-12-31 17:53:16 +00:00
Asim Aslam
dcf7a56f9b rename codec 2018-12-31 17:28:19 +00:00
Asim Aslam
460fb3e70c update package comments 2018-12-29 16:18:05 +00:00
Asim Aslam
5cae330732 Update selector race, rename cache selector 2018-12-29 15:44:51 +00:00
Asim Aslam
ff982b5fd1 add method 2018-12-28 21:27:08 +00:00
Asim Aslam
28324412a4 Add X-Micro-Target header 2018-12-26 14:46:15 +00:00
Asim Aslam
5f2ce6fac4 nitpick readme 2018-12-26 12:03:08 +00:00
Asim Aslam
8b54a850f7 run gossip updater first 2018-12-19 19:04:44 +00:00
Asim Aslam
fae8c5eb4c fix context 2018-12-19 09:27:53 +00:00
Asim Aslam
3bc6556d36 Merge pull request #353 from unistack-org/gossip
implement some gossip options
2018-12-19 09:27:10 +00:00
5bcdf189de implement some gossip options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2018-12-19 12:25:16 +03:00
Asim Aslam
f2efc685d3 Merge pull request #352 from micro/rwmutex
move to using rwmutex for selector
2018-12-18 18:10:19 +00:00
Asim Aslam
67d10e5f39 simplify get code 2018-12-18 18:06:34 +00:00
Asim Aslam
770c16a66d move to using rwmutex for selector 2018-12-18 16:51:42 +00:00
61 changed files with 2178 additions and 1105 deletions

View File

@@ -28,7 +28,7 @@ across the services and retry a different node if there's a problem.
- **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client
and server handle this by default. This includes proto-rpc and json-rpc by default.
and server handle this by default. This includes protobuf and json by default.
- **Sync Streaming** - RPC based request/response with support for bidirectional streaming. We provide an abstraction for synchronous
communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed. The default

View File

@@ -1,10 +0,0 @@
package codec
// Codec is used for encoding where the broker doesn't natively support
// headers in the message type. In this case the entire message is
// encoded as the payload
type Codec interface {
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
String() string
}

View File

@@ -1,25 +0,0 @@
package json
import (
"encoding/json"
"github.com/micro/go-micro/broker/codec"
)
type jsonCodec struct{}
func (j jsonCodec) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (j jsonCodec) Unmarshal(d []byte, v interface{}) error {
return json.Unmarshal(d, v)
}
func (j jsonCodec) String() string {
return "json"
}
func NewCodec() codec.Codec {
return jsonCodec{}
}

View File

@@ -1,35 +0,0 @@
package noop
import (
"errors"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/broker/codec"
)
type noopCodec struct{}
func (n noopCodec) Marshal(v interface{}) ([]byte, error) {
msg, ok := v.(*broker.Message)
if !ok {
return nil, errors.New("invalid message")
}
return msg.Body, nil
}
func (n noopCodec) Unmarshal(d []byte, v interface{}) error {
msg, ok := v.(*broker.Message)
if !ok {
return errors.New("invalid message")
}
msg.Body = d
return nil
}
func (n noopCodec) String() string {
return "noop"
}
func NewCodec() codec.Codec {
return noopCodec{}
}

View File

@@ -20,7 +20,7 @@ import (
"github.com/google/uuid"
"github.com/micro/go-log"
"github.com/micro/go-micro/broker/codec/json"
"github.com/micro/go-micro/codec/json"
merr "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-rcache"
@@ -45,6 +45,10 @@ type httpBroker struct {
subscribers map[string][]*httpSubscriber
running bool
exit chan chan error
// offline message inbox
mtx sync.RWMutex
inbox map[string][][]byte
}
type httpSubscriber struct {
@@ -104,7 +108,7 @@ func newTransport(config *tls.Config) *http.Transport {
func newHttpBroker(opts ...Option) Broker {
options := Options{
Codec: json.NewCodec(),
Codec: json.Marshaler{},
Context: context.TODO(),
}
@@ -133,6 +137,7 @@ func newHttpBroker(opts ...Option) Broker {
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
// specify the message handler
@@ -175,6 +180,49 @@ func (h *httpSubscriber) Unsubscribe() error {
return h.hb.unsubscribe(h)
}
func (h *httpBroker) saveMessage(topic string, msg []byte) {
h.mtx.Lock()
defer h.mtx.Unlock()
// get messages
c := h.inbox[topic]
// save message
c = append(c, msg)
// max length 64
if len(c) > 64 {
c = c[:64]
}
// save inbox
h.inbox[topic] = c
}
func (h *httpBroker) getMessage(topic string, num int) [][]byte {
h.mtx.Lock()
defer h.mtx.Unlock()
// get messages
c, ok := h.inbox[topic]
if !ok {
return nil
}
// more message than requests
if len(c) >= num {
msg := c[:num]
h.inbox[topic] = c[num:]
return msg
}
// reset inbox
h.inbox[topic] = nil
// return all messages
return c
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
@@ -454,14 +502,7 @@ func (h *httpBroker) Options() Options {
}
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
@@ -473,12 +514,26 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
m.Header[":topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
pub := func(node *registry.Node, b []byte) {
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
// ignore error
return nil
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
@@ -491,34 +546,71 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err == nil {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
for _, service := range s {
// only process if we have nodes
if len(service.Nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
for _, node := range service.Nodes {
// publish async
go pub(node, b)
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
// only process if we have nodes
if len(service.Nodes) == 0 {
continue
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async
go pub(node, b)
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range service.Nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
}

View File

@@ -4,14 +4,14 @@ import (
"context"
"crypto/tls"
"github.com/micro/go-micro/broker/codec"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry"
)
type Options struct {
Addrs []string
Secure bool
Codec codec.Codec
Codec codec.Marshaler
TLSConfig *tls.Config
// Other options for implementations of the interface
// can be stored in a context
@@ -71,7 +71,7 @@ func Addrs(addrs ...string) Option {
// Codec sets the codec used for encoding/decoding used where
// a broker does not support headers
func Codec(c codec.Codec) Option {
func Codec(c codec.Marshaler) Option {
return func(o *Options) {
o.Codec = c
}

View File

@@ -4,6 +4,8 @@ package client
import (
"context"
"time"
"github.com/micro/go-micro/codec"
)
// Client is the interface used to make requests to services.
@@ -13,13 +15,18 @@ type Client interface {
Init(...Option) error
Options() Options
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
String() string
}
// Router manages request routing
type Router interface {
SendRequest(context.Context, Request) (Response, error)
}
// Message is the interface for publishing asynchronously
type Message interface {
Topic() string
@@ -29,14 +36,30 @@ type Message interface {
// Request is the interface for a synchronous request used by Call or Stream
type Request interface {
// The service to call
Service() string
Method() string
// The endpoint to call
Endpoint() string
// The content type
ContentType() string
Request() interface{}
// The unencoded request body
Body() interface{}
// Write to the encoded request writer. This is nil before a call is made
Codec() codec.Writer
// indicates whether the request will be a streaming one rather than unary
Stream() bool
}
// Response is the response received from a service
type Response interface {
// Read the response
Codec() codec.Reader
// read the header
Header() map[string]string
// Read the undecoded response
Read() ([]byte, error)
}
// Stream is the inteface for a bidirectional synchronous stream
type Stream interface {
Context() context.Context
@@ -102,8 +125,8 @@ func NewClient(opt ...Option) Client {
// Creates a new request using the default client. Content Type will
// be set to the default within options and use the appropriate codec
func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewRequest(service, method, request, reqOpts...)
func NewRequest(service, endpoint string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewRequest(service, endpoint, request, reqOpts...)
}
// Creates a streaming connection with a service and returns responses on the

View File

@@ -16,7 +16,7 @@ var (
)
type MockResponse struct {
Method string
Endpoint string
Response interface{}
Error error
}
@@ -54,8 +54,8 @@ func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.Me
return m.Client.NewMessage(topic, msg, opts...)
}
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewRequest(service, method, req, reqOpts...)
func (m *MockClient) NewRequest(service, endpoint string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewRequest(service, endpoint, req, reqOpts...)
}
func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
@@ -68,7 +68,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
}
for _, r := range response {
if r.Method != req.Method() {
if r.Endpoint != req.Endpoint() {
continue
}
@@ -91,7 +91,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
return nil
}
return fmt.Errorf("rpc: can't find service %s", req.Method())
return fmt.Errorf("rpc: can't find service %s", req.Endpoint())
}
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {

View File

@@ -13,17 +13,17 @@ func TestClient(t *testing.T) {
}
response := []MockResponse{
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
{Method: "Foo.Func", Response: func() string { return "string" }},
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
{Endpoint: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
{Endpoint: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
{Endpoint: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
{Endpoint: "Foo.Func", Response: func() string { return "string" }},
{Endpoint: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
}
c := NewClient(Response("go.mock", response))
for _, r := range response {
req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
req := c.NewRequest("go.mock", r.Endpoint, map[string]interface{}{"foo": "bar"})
var rsp interface{}
err := c.Call(context.TODO(), req, &rsp)

View File

@@ -22,6 +22,9 @@ type Options struct {
Selector selector.Selector
Transport transport.Transport
// Router sets the router
Router Router
// Connection Pool
PoolSize int
PoolTTL time.Duration
@@ -99,7 +102,7 @@ func newOptions(options ...Option) Options {
}
if len(opts.ContentType) == 0 {
opts.ContentType = defaultContentType
opts.ContentType = DefaultContentType
}
if opts.Broker == nil {
@@ -306,3 +309,10 @@ func StreamingRequest() RequestOption {
o.Stream = true
}
}
// WithRouter sets the client router
func WithRouter(r Router) Option {
return func(o *Options) {
o.Router = r
}
}

View File

@@ -9,6 +9,7 @@ import (
"sync/atomic"
"github.com/google/uuid"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors"
@@ -49,7 +50,7 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
if c, ok := r.opts.Codecs[contentType]; ok {
return c, nil
}
if cf, ok := defaultCodecs[contentType]; ok {
if cf, ok := DefaultCodecs[contentType]; ok {
return cf, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
@@ -96,8 +97,8 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
context: ctx,
request: req,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
seq: seq,
codec: newRpcCodec(msg, c, cf),
id: fmt.Sprintf("%v", seq),
}
defer stream.Close()
@@ -111,7 +112,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
}()
// send request
if err := stream.Send(req.Request()); err != nil {
if err := stream.Send(req.Body()); err != nil {
ch <- err
return
}
@@ -177,13 +178,13 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
context: ctx,
request: req,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
codec: newRpcCodec(msg, c, cf),
}
ch := make(chan error, 1)
go func() {
ch <- stream.Send(req.Request())
ch <- stream.Send(req.Body())
}()
var grr error
@@ -444,7 +445,11 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
if !ok {
md = make(map[string]string)
}
id := uuid.New().String()
md["Content-Type"] = msg.ContentType()
md["X-Micro-Topic"] = msg.Topic()
md["X-Micro-Id"] = id
// encode message body
cf, err := r.newCodec(msg.ContentType())
@@ -452,7 +457,14 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
return errors.InternalServerError("go.micro.client", err.Error())
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil {
if err := cf(b).Write(&codec.Message{
Target: msg.Topic(),
Type: codec.Publication,
Header: map[string]string{
"X-Micro-Id": id,
"X-Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
r.once.Do(func() {

View File

@@ -14,7 +14,7 @@ import (
func TestCallAddress(t *testing.T) {
var called bool
service := "test.service"
method := "Test.Method"
endpoint := "Test.Endpoint"
address := "10.1.10.1:8080"
wrap := func(cf CallFunc) CallFunc {
@@ -25,8 +25,8 @@ func TestCallAddress(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", service, req.Service())
}
if req.Method() != method {
return fmt.Errorf("expected service: %s got %s", method, req.Method())
if req.Endpoint() != endpoint {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
}
if addr != address {
@@ -45,7 +45,7 @@ func TestCallAddress(t *testing.T) {
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, method, nil)
req := c.NewRequest(service, endpoint, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
@@ -60,7 +60,7 @@ func TestCallAddress(t *testing.T) {
func TestCallRetry(t *testing.T) {
service := "test.service"
method := "Test.Method"
endpoint := "Test.Endpoint"
address := "10.1.10.1:8080"
var called int
@@ -84,7 +84,7 @@ func TestCallRetry(t *testing.T) {
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, method, nil)
req := c.NewRequest(service, endpoint, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
@@ -101,7 +101,7 @@ func TestCallWrapper(t *testing.T) {
var called bool
id := "test.1"
service := "test.service"
method := "Test.Method"
endpoint := "Test.Endpoint"
host := "10.1.10.1"
port := 8080
address := "10.1.10.1:8080"
@@ -114,8 +114,8 @@ func TestCallWrapper(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", service, req.Service())
}
if req.Method() != method {
return fmt.Errorf("expected service: %s got %s", method, req.Method())
if req.Endpoint() != endpoint {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
}
if addr != address {
@@ -146,7 +146,7 @@ func TestCallWrapper(t *testing.T) {
},
})
req := c.NewRequest(service, method, nil)
req := c.NewRequest(service, endpoint, nil)
if err := c.Call(context.Background(), req, nil); err != nil {
t.Fatal("call wrapper error", err)
}

View File

@@ -5,7 +5,11 @@ import (
errs "errors"
"github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/codec/grpc"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/transport"
@@ -28,7 +32,7 @@ var (
errShutdown = errs.New("connection is shut down")
)
type rpcPlusCodec struct {
type rpcCodec struct {
client transport.Client
codec codec.Codec
@@ -41,37 +45,18 @@ type readWriteCloser struct {
rbuf *bytes.Buffer
}
type clientCodec interface {
WriteRequest(*request, interface{}) error
ReadResponseHeader(*response) error
ReadResponseBody(interface{}) error
Close() error
}
type request struct {
Service string
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *request // for free list in Server
}
type response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *response // for free list in Server
}
var (
defaultContentType = "application/octet-stream"
DefaultContentType = "application/protobuf"
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
DefaultCodecs = map[string]codec.NewCodec{
"application/grpc": grpc.NewCodec,
"application/grpc+json": grpc.NewCodec,
"application/grpc+proto": grpc.NewCodec,
"application/protobuf": proto.NewCodec,
"application/json": json.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
"application/octet-stream": raw.NewCodec,
}
)
@@ -89,12 +74,12 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcPlusCodec {
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{
wbuf: bytes.NewBuffer(nil),
rbuf: bytes.NewBuffer(nil),
}
r := &rpcPlusCodec{
r := &rpcCodec{
buf: rwc,
client: client,
codec: c(rwc),
@@ -103,54 +88,93 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne
return r
}
func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
func (c *rpcCodec) Write(wm *codec.Message, body interface{}) error {
c.buf.wbuf.Reset()
m := &codec.Message{
Id: req.Seq,
Target: req.Service,
Method: req.ServiceMethod,
Type: codec.Request,
Header: map[string]string{},
Id: wm.Id,
Target: wm.Target,
Endpoint: wm.Endpoint,
Type: codec.Request,
Header: map[string]string{
"X-Micro-Id": wm.Id,
"X-Micro-Service": wm.Target,
"X-Micro-Endpoint": wm.Endpoint,
},
}
if err := c.codec.Write(m, body); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
c.req.Body = c.buf.wbuf.Bytes()
// set body
if len(wm.Body) > 0 {
c.req.Body = wm.Body
} else {
c.req.Body = c.buf.wbuf.Bytes()
}
// set header
for k, v := range m.Header {
c.req.Header[k] = v
}
// send the request
if err := c.client.Send(c.req); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
return nil
}
func (c *rpcPlusCodec) ReadResponseHeader(r *response) error {
func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
var m transport.Message
if err := c.client.Recv(&m); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body)
var me codec.Message
err := c.codec.ReadHeader(&me, codec.Response)
r.ServiceMethod = me.Method
r.Seq = me.Id
r.Error = me.Error
// set headers
me.Header = m.Header
// read header
err := c.codec.ReadHeader(&me, r)
wm.Endpoint = me.Endpoint
wm.Id = me.Id
wm.Error = me.Error
// check error in header
if len(me.Error) == 0 {
wm.Error = me.Header["X-Micro-Error"]
}
// check method in header
if len(me.Endpoint) == 0 {
wm.Endpoint = me.Header["X-Micro-Endpoint"]
}
if len(me.Id) == 0 {
wm.Id = me.Header["X-Micro-Id"]
}
// return header error
if err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
return nil
}
func (c *rpcPlusCodec) ReadResponseBody(b interface{}) error {
func (c *rpcCodec) ReadBody(b interface{}) error {
// read body
if err := c.codec.ReadBody(b); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
return nil
}
func (c *rpcPlusCodec) Close() error {
func (c *rpcCodec) Close() error {
c.buf.Close()
c.codec.Close()
if err := c.client.Close(); err != nil {
@@ -158,3 +182,7 @@ func (c *rpcPlusCodec) Close() error {
}
return nil
}
func (c *rpcCodec) String() string {
return "rpc"
}

View File

@@ -1,14 +1,19 @@
package client
import (
"github.com/micro/go-micro/codec"
)
type rpcRequest struct {
service string
method string
endpoint string
contentType string
request interface{}
codec codec.Codec
body interface{}
opts RequestOptions
}
func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
func newRequest(service, endpoint string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts RequestOptions
for _, o := range reqOpts {
@@ -22,8 +27,8 @@ func newRequest(service, method string, request interface{}, contentType string,
return &rpcRequest{
service: service,
method: method,
request: request,
endpoint: endpoint,
body: request,
contentType: contentType,
opts: opts,
}
@@ -37,12 +42,16 @@ func (r *rpcRequest) Service() string {
return r.service
}
func (r *rpcRequest) Method() string {
return r.method
func (r *rpcRequest) Endpoint() string {
return r.endpoint
}
func (r *rpcRequest) Request() interface{} {
return r.request
func (r *rpcRequest) Body() interface{} {
return r.body
}
func (r *rpcRequest) Codec() codec.Writer {
return r.codec
}
func (r *rpcRequest) Stream() bool {

View File

@@ -5,19 +5,19 @@ import (
)
func TestRequestOptions(t *testing.T) {
r := newRequest("service", "method", nil, "application/json")
r := newRequest("service", "endpoint", nil, "application/json")
if r.Service() != "service" {
t.Fatalf("expected 'service' got %s", r.Service())
}
if r.Method() != "method" {
t.Fatalf("expected 'method' got %s", r.Method())
if r.Endpoint() != "endpoint" {
t.Fatalf("expected 'endpoint' got %s", r.Endpoint())
}
if r.ContentType() != "application/json" {
t.Fatalf("expected 'method' got %s", r.ContentType())
t.Fatalf("expected 'endpoint' got %s", r.ContentType())
}
r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf"))
r2 := newRequest("service", "endpoint", nil, "application/json", WithContentType("application/protobuf"))
if r2.ContentType() != "application/protobuf" {
t.Fatalf("expected 'method' got %s", r2.ContentType())
t.Fatalf("expected 'endpoint' got %s", r2.ContentType())
}
}

35
client/rpc_response.go Normal file
View File

@@ -0,0 +1,35 @@
package client
import (
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
)
type rpcResponse struct {
header map[string]string
body []byte
socket transport.Socket
codec codec.Codec
}
func (r *rpcResponse) Codec() codec.Writer {
return r.codec
}
func (r *rpcResponse) Header() map[string]string {
return r.header
}
func (r *rpcResponse) Read() ([]byte, error) {
var msg transport.Message
if err := r.socket.Recv(&msg); err != nil {
return nil, err
}
// set internals
r.header = msg.Header
r.body = msg.Body
return msg.Body, nil
}

View File

@@ -4,16 +4,18 @@ import (
"context"
"io"
"sync"
"github.com/micro/go-micro/codec"
)
// Implements the streamer interface
type rpcStream struct {
sync.RWMutex
seq uint64
id string
closed chan bool
err error
request Request
codec clientCodec
codec codec.Codec
context context.Context
}
@@ -43,18 +45,18 @@ func (r *rpcStream) Send(msg interface{}) error {
return errShutdown
}
seq := r.seq
req := request{
Service: r.request.Service(),
Seq: seq,
ServiceMethod: r.request.Method(),
req := codec.Message{
Id: r.id,
Target: r.request.Service(),
Endpoint: r.request.Endpoint(),
Type: codec.Request,
}
if err := r.codec.WriteRequest(&req, msg); err != nil {
if err := r.codec.Write(&req, msg); err != nil {
r.err = err
return err
}
return nil
}
@@ -67,8 +69,9 @@ func (r *rpcStream) Recv(msg interface{}) error {
return errShutdown
}
var resp response
if err := r.codec.ReadResponseHeader(&resp); err != nil {
var resp codec.Message
if err := r.codec.ReadHeader(&resp, codec.Response); err != nil {
if err == io.EOF && !r.isClosed() {
r.err = io.ErrUnexpectedEOF
return io.ErrUnexpectedEOF
@@ -87,11 +90,11 @@ func (r *rpcStream) Recv(msg interface{}) error {
} else {
r.err = io.EOF
}
if err := r.codec.ReadResponseBody(nil); err != nil {
if err := r.codec.ReadBody(nil); err != nil {
r.err = err
}
default:
if err := r.codec.ReadResponseBody(msg); err != nil {
if err := r.codec.ReadBody(msg); err != nil {
r.err = err
}
}

View File

@@ -26,7 +26,8 @@ import (
// selectors
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/selector/cache"
"github.com/micro/go-micro/selector/dns"
"github.com/micro/go-micro/selector/static"
// transports
"github.com/micro/go-micro/transport"
@@ -149,7 +150,6 @@ var (
Name: "selector",
EnvVar: "MICRO_SELECTOR",
Usage: "Selector used to pick nodes for querying",
Value: "cache",
},
cli.StringFlag{
Name: "transport",
@@ -179,7 +179,9 @@ var (
DefaultSelectors = map[string]func(...selector.Option) selector.Selector{
"default": selector.NewSelector,
"cache": cache.NewSelector,
"dns": dns.NewSelector,
"cache": selector.NewSelector,
"static": static.NewSelector,
}
DefaultServers = map[string]func(...server.Option) server.Server{

58
codec/bytes/bytes.go Normal file
View File

@@ -0,0 +1,58 @@
// Package bytes provides a bytes codec which does not encode or decode anything
package bytes
import (
"fmt"
"io"
"io/ioutil"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
v, ok := b.(*[]byte)
if !ok {
return fmt.Errorf("failed to read body: %v is not type of *[]byte", b)
}
// read bytes
buf, err := ioutil.ReadAll(c.Conn)
if err != nil {
return err
}
// set bytes
*v = buf
return nil
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
v, ok := b.(*[]byte)
if !ok {
return fmt.Errorf("failed to write: %v is not type of *[]byte", b)
}
_, err := c.Conn.Write(*v)
return err
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "bytes"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
}
}

38
codec/bytes/marshaler.go Normal file
View File

@@ -0,0 +1,38 @@
package bytes
import (
"errors"
)
type Marshaler struct{}
type Message struct {
Header map[string]string
Body []byte
}
func (n Marshaler) Marshal(v interface{}) ([]byte, error) {
switch v.(type) {
case []byte:
return v.([]byte), nil
case *Message:
return v.(*Message).Body, nil
}
return nil, errors.New("invalid message")
}
func (n Marshaler) Unmarshal(d []byte, v interface{}) error {
switch v.(type) {
case *[]byte:
ve := v.(*[]byte)
*ve = d
case *Message:
ve := v.(*Message)
ve.Body = d
}
return errors.New("invalid message")
}
func (n Marshaler) String() string {
return "bytes"
}

View File

@@ -23,10 +23,26 @@ type NewCodec func(io.ReadWriteCloser) Codec
// connection. ReadBody may be called with a nil argument to force the
// body to be read and discarded.
type Codec interface {
Reader
Writer
Close() error
String() string
}
type Reader interface {
ReadHeader(*Message, MessageType) error
ReadBody(interface{}) error
}
type Writer interface {
Write(*Message, interface{}) error
Close() error
}
// Marshaler is a simple encoding interface used for the broker/transport
// where headers are not supported by the underlying implementation.
type Marshaler interface {
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
String() string
}
@@ -34,10 +50,13 @@ type Codec interface {
// the communication, likely followed by the body.
// In the case of an error, body may be nil.
type Message struct {
Id uint64
Type MessageType
Target string
Method string
Error string
Id string
Type MessageType
Target string
Endpoint string
Error string
// The values read from the socket
Header map[string]string
Body []byte
}

132
codec/grpc/grpc.go Normal file
View File

@@ -0,0 +1,132 @@
// Package grpc provides a grpc codec
package grpc
import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
ContentType string
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
if ct := m.Header["Content-Type"]; len(ct) > 0 {
c.ContentType = ct
}
if ct := m.Header["content-type"]; len(ct) > 0 {
c.ContentType = ct
}
// service method
path := m.Header[":path"]
if len(path) == 0 || path[0] != '/' {
m.Target = m.Header["X-Micro-Service"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
} else {
// [ , a.package.Foo, Bar]
parts := strings.Split(path, "/")
if len(parts) != 3 {
return errors.New("Unknown request path")
}
service := strings.Split(parts[1], ".")
m.Endpoint = strings.Join([]string{service[len(service)-1], parts[2]}, ".")
m.Target = strings.Join(service[:len(service)-1], ".")
}
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
// no body
if b == nil {
return nil
}
_, buf, err := decode(c.Conn)
if err != nil {
return err
}
switch c.ContentType {
case "application/grpc+json":
return json.Unmarshal(buf, b)
case "application/grpc+proto", "application/grpc":
return proto.Unmarshal(buf, b.(proto.Message))
}
return errors.New("Unsupported Content-Type")
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
var buf []byte
var err error
if ct := m.Header["Content-Type"]; len(ct) > 0 {
c.ContentType = ct
}
if ct := m.Header["content-type"]; len(ct) > 0 {
c.ContentType = ct
}
switch m.Type {
case codec.Request:
parts := strings.Split(m.Endpoint, ".")
m.Header[":method"] = "POST"
m.Header[":path"] = fmt.Sprintf("/%s.%s/%s", m.Target, parts[0], parts[1])
m.Header[":proto"] = "HTTP/2.0"
m.Header["te"] = "trailers"
m.Header["user-agent"] = "grpc-go/1.0.0"
m.Header[":authority"] = m.Target
m.Header["content-type"] = c.ContentType
case codec.Response:
m.Header["Trailer"] = "grpc-status, grpc-message"
m.Header["grpc-status"] = "0"
m.Header["grpc-message"] = ""
}
// marshal content
switch c.ContentType {
case "application/grpc+json":
buf, err = json.Marshal(b)
case "application/grpc+proto", "application/grpc":
pb, ok := b.(proto.Message)
if ok {
buf, err = proto.Marshal(pb)
}
default:
err = errors.New("Unsupported Content-Type")
}
// check error
if err != nil {
m.Header["grpc-status"] = "8"
m.Header["grpc-message"] = err.Error()
return err
}
return encode(0, buf, c.Conn)
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "grpc"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
ContentType: "application/grpc",
}
}

70
codec/grpc/util.go Normal file
View File

@@ -0,0 +1,70 @@
package grpc
import (
"encoding/binary"
"fmt"
"io"
)
var (
maxMessageSize = 1024 * 1024 * 4
maxInt = int(^uint(0) >> 1)
)
func decode(r io.Reader) (uint8, []byte, error) {
header := make([]byte, 5)
// read the header
if _, err := r.Read(header[:]); err != nil {
return uint8(0), nil, err
}
// get encoding format e.g compressed
cf := uint8(header[0])
// get message length
length := binary.BigEndian.Uint32(header[1:])
// no encoding format
if length == 0 {
return cf, nil, nil
}
//
if int64(length) > int64(maxInt) {
return cf, nil, fmt.Errorf("grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
}
if int(length) > maxMessageSize {
return cf, nil, fmt.Errorf("grpc: received message larger than max (%d vs. %d)", length, maxMessageSize)
}
msg := make([]byte, int(length))
if _, err := r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return cf, nil, err
}
return cf, msg, nil
}
func encode(cf uint8, buf []byte, w io.Writer) error {
header := make([]byte, 5)
// set compression
header[0] = byte(cf)
// write length as header
binary.BigEndian.PutUint32(header[1:], uint32(len(buf)))
// read the header
if _, err := w.Write(header[:]); err != nil {
return err
}
// write the buffer
_, err := w.Write(buf)
return err
}

49
codec/json/json.go Normal file
View File

@@ -0,0 +1,49 @@
// Package json provides a json codec
package json
import (
"encoding/json"
"io"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
Encoder *json.Encoder
Decoder *json.Decoder
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
if b == nil {
return nil
}
return c.Decoder.Decode(b)
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
if b == nil {
return nil
}
return c.Encoder.Encode(b)
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "json"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
Decoder: json.NewDecoder(c),
Encoder: json.NewEncoder(c),
}
}

19
codec/json/marshaler.go Normal file
View File

@@ -0,0 +1,19 @@
package json
import (
"encoding/json"
)
type Marshaler struct{}
func (j Marshaler) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (j Marshaler) Unmarshal(d []byte, v interface{}) error {
return json.Unmarshal(d, v)
}
func (j Marshaler) String() string {
return "json"
}

View File

@@ -19,17 +19,17 @@ type clientCodec struct {
resp clientResponse
sync.Mutex
pending map[uint64]string
pending map[interface{}]string
}
type clientRequest struct {
Method string `json:"method"`
Params [1]interface{} `json:"params"`
ID uint64 `json:"id"`
ID interface{} `json:"id"`
}
type clientResponse struct {
ID uint64 `json:"id"`
ID interface{} `json:"id"`
Result *json.RawMessage `json:"result"`
Error interface{} `json:"error"`
}
@@ -39,15 +39,15 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec {
dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn),
c: conn,
pending: make(map[uint64]string),
pending: make(map[interface{}]string),
}
}
func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
c.Lock()
c.pending[m.Id] = m.Method
c.pending[m.Id] = m.Endpoint
c.Unlock()
c.req.Method = m.Method
c.req.Method = m.Endpoint
c.req.Params[0] = b
c.req.ID = m.Id
return c.enc.Encode(&c.req)
@@ -66,12 +66,12 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error {
}
c.Lock()
m.Method = c.pending[c.resp.ID]
m.Endpoint = c.pending[c.resp.ID]
delete(c.pending, c.resp.ID)
c.Unlock()
m.Error = ""
m.Id = c.resp.ID
m.Id = fmt.Sprintf("%v", c.resp.ID)
if c.resp.Error != nil {
x, ok := c.resp.Error.(string)
if !ok {

View File

@@ -31,7 +31,7 @@ func (j *jsonCodec) Write(m *codec.Message, b interface{}) error {
switch m.Type {
case codec.Request:
return j.c.Write(m, b)
case codec.Response:
case codec.Response, codec.Error:
return j.s.Write(m, b)
case codec.Publication:
data, err := json.Marshal(b)

View File

@@ -2,9 +2,8 @@ package jsonrpc
import (
"encoding/json"
"errors"
"fmt"
"io"
"sync"
"github.com/micro/go-micro/codec"
)
@@ -17,30 +16,25 @@ type serverCodec struct {
// temporary work space
req serverRequest
resp serverResponse
sync.Mutex
seq uint64
pending map[uint64]*json.RawMessage
}
type serverRequest struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params"`
ID *json.RawMessage `json:"id"`
ID interface{} `json:"id"`
}
type serverResponse struct {
ID *json.RawMessage `json:"id"`
Result interface{} `json:"result"`
Error interface{} `json:"error"`
ID interface{} `json:"id"`
Result interface{} `json:"result"`
Error interface{} `json:"error"`
}
func newServerCodec(conn io.ReadWriteCloser) *serverCodec {
return &serverCodec{
dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn),
c: conn,
pending: make(map[uint64]*json.RawMessage),
dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn),
c: conn,
}
}
@@ -50,7 +44,7 @@ func (r *serverRequest) reset() {
*r.Params = (*r.Params)[0:0]
}
if r.ID != nil {
*r.ID = (*r.ID)[0:0]
r.ID = nil
}
}
@@ -59,15 +53,9 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error {
if err := c.dec.Decode(&c.req); err != nil {
return err
}
m.Method = c.req.Method
c.Lock()
c.seq++
c.pending[c.seq] = c.req.ID
m.Endpoint = c.req.Method
m.Id = fmt.Sprintf("%v", c.req.ID)
c.req.ID = nil
m.Id = c.seq
c.Unlock()
return nil
}
@@ -84,19 +72,7 @@ var null = json.RawMessage([]byte("null"))
func (c *serverCodec) Write(m *codec.Message, x interface{}) error {
var resp serverResponse
c.Lock()
b, ok := c.pending[m.Id]
if !ok {
c.Unlock()
return errors.New("invalid sequence number in response")
}
c.Unlock()
if b == nil {
// Invalid request so no id. Use JSON null.
b = &null
}
resp.ID = b
resp.ID = m.Id
resp.Result = x
if m.Error == "" {
resp.Error = nil

19
codec/proto/marshaler.go Normal file
View File

@@ -0,0 +1,19 @@
package proto
import (
"github.com/golang/protobuf/proto"
)
type Marshaler struct{}
func (Marshaler) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}
func (Marshaler) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}
func (Marshaler) Name() string {
return "proto"
}

56
codec/proto/proto.go Normal file
View File

@@ -0,0 +1,56 @@
// Package proto provides a proto codec
package proto
import (
"io"
"io/ioutil"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
buf, err := ioutil.ReadAll(c.Conn)
if err != nil {
return err
}
if b == nil {
return nil
}
return proto.Unmarshal(buf, b.(proto.Message))
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
p, ok := b.(proto.Message)
if !ok {
return nil
}
buf, err := proto.Marshal(p)
if err != nil {
return err
}
_, err = c.Conn.Write(buf)
return err
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "proto"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
}
}

View File

@@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"io"
"strconv"
"sync"
"github.com/golang/protobuf/proto"
@@ -31,13 +32,22 @@ func (c *protoCodec) String() string {
return "proto-rpc"
}
func id(id string) *uint64 {
p, err := strconv.ParseInt(id, 10, 64)
if err != nil {
p = 0
}
i := uint64(p)
return &i
}
func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
switch m.Type {
case codec.Request:
c.Lock()
defer c.Unlock()
// This is protobuf, of course we copy it.
pbr := &Request{ServiceMethod: &m.Method, Seq: &m.Id}
pbr := &Request{ServiceMethod: &m.Endpoint, Seq: id(m.Id)}
data, err := proto.Marshal(pbr)
if err != nil {
return err
@@ -60,10 +70,10 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err
}
}
case codec.Response:
case codec.Response, codec.Error:
c.Lock()
defer c.Unlock()
rtmp := &Response{ServiceMethod: &m.Method, Seq: &m.Id, Error: &m.Error}
rtmp := &Response{ServiceMethod: &m.Endpoint, Seq: id(m.Id), Error: &m.Error}
data, err := proto.Marshal(rtmp)
if err != nil {
return err
@@ -116,8 +126,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
if err != nil {
return err
}
m.Method = rtmp.GetServiceMethod()
m.Id = rtmp.GetSeq()
m.Endpoint = rtmp.GetServiceMethod()
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
case codec.Response:
data, err := ReadNetString(c.rwc)
if err != nil {
@@ -128,8 +138,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
if err != nil {
return err
}
m.Method = rtmp.GetServiceMethod()
m.Id = rtmp.GetSeq()
m.Endpoint = rtmp.GetServiceMethod()
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
m.Error = rtmp.GetError()
case codec.Publication:
_, err := io.Copy(c.buf, c.rwc)

View File

@@ -2,9 +2,12 @@
package gossip
import (
"context"
"encoding/json"
"io/ioutil"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -12,7 +15,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/hashicorp/memberlist"
"github.com/micro/go-log"
log "github.com/micro/go-log"
"github.com/micro/go-micro/registry"
pb "github.com/micro/go-micro/registry/gossip/proto"
"github.com/mitchellh/hashstructure"
@@ -56,7 +59,7 @@ type update struct {
var (
// You should change this if using secure
DefaultSecret = []byte("gossip")
DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes
ExpiryTick = time.Second * 5
)
@@ -104,14 +107,40 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
RetransmitMult: 3,
}
// machine hostname
hostname, _ := os.Hostname()
// create a new default config
c := memberlist.DefaultLocalConfig()
// set bind to random port
c.BindPort = 0
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
c = optConfig
}
if hostport, ok := g.options.Context.Value(contextAddress{}).(string); ok {
host, port, err := net.SplitHostPort(hostport)
if err == nil {
pn, err := strconv.Atoi(port)
if err == nil {
c.BindPort = pn
}
c.BindAddr = host
}
} else {
// set bind to random port
c.BindPort = 0
}
if hostport, ok := g.options.Context.Value(contextAdvertise{}).(string); ok {
host, port, err := net.SplitHostPort(hostport)
if err == nil {
pn, err := strconv.Atoi(port)
if err == nil {
c.AdvertisePort = pn
}
c.AdvertiseAddr = host
}
}
// machine hostname
hostname, _ := os.Hostname()
// set the name
c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-")
@@ -135,8 +164,6 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.SecretKey = k
}
// TODO: set advertise addr to advertise behind nat
// create the memberlist
m, err := memberlist.Create(c)
if err != nil {
@@ -544,20 +571,22 @@ func (g *gossipRegistry) String() string {
func NewRegistry(opts ...registry.Option) registry.Registry {
gossip := &gossipRegistry{
options: registry.Options{},
options: registry.Options{
Context: context.Background(),
},
updates: make(chan *update, 100),
services: make(map[string][]*registry.Service),
watchers: make(map[string]chan *registry.Result),
}
// configure the gossiper
if err := configure(gossip, opts...); err != nil {
log.Fatal("Error configuring registry: %v", err)
}
// run the updater
go gossip.run()
// configure the gossiper
if err := configure(gossip, opts...); err != nil {
log.Fatalf("Error configuring registry: %v", err)
}
// wait for setup
<-time.After(gossip.interval * 2)

View File

@@ -3,6 +3,7 @@ package gossip
import (
"context"
"github.com/hashicorp/memberlist"
"github.com/micro/go-micro/registry"
)
@@ -15,3 +16,30 @@ func Secret(k []byte) registry.Option {
o.Context = context.WithValue(o.Context, contextSecretKey{}, k)
}
}
type contextAddress struct{}
// Address to bind to - host:port
func Address(a string) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextAddress{}, a)
}
}
type contextConfig struct{}
// Config allow to inject a *memberlist.Config struct for configuring gossip
func Config(c *memberlist.Config) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextConfig{}, c)
}
}
type contextAdvertise struct{}
// The address to advertise for other gossip members - host:port
func Advertise(a string) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextAdvertise{}, a)
}
}

View File

@@ -3,11 +3,9 @@
package gossip
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
@@ -18,7 +16,7 @@ var _ = math.Inf
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// Update is the message broadcast
type Update struct {
@@ -45,17 +43,16 @@ func (m *Update) Reset() { *m = Update{} }
func (m *Update) String() string { return proto.CompactTextString(m) }
func (*Update) ProtoMessage() {}
func (*Update) Descriptor() ([]byte, []int) {
return fileDescriptor_18cba623e76e57f3, []int{0}
return fileDescriptor_gossip_fd1eb378131a5d12, []int{0}
}
func (m *Update) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Update.Unmarshal(m, b)
}
func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Update.Marshal(b, m, deterministic)
}
func (m *Update) XXX_Merge(src proto.Message) {
xxx_messageInfo_Update.Merge(m, src)
func (dst *Update) XXX_Merge(src proto.Message) {
xxx_messageInfo_Update.Merge(dst, src)
}
func (m *Update) XXX_Size() int {
return xxx_messageInfo_Update.Size(m)
@@ -121,10 +118,10 @@ func init() {
}
func init() {
proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_18cba623e76e57f3)
proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_gossip_fd1eb378131a5d12)
}
var fileDescriptor_18cba623e76e57f3 = []byte{
var fileDescriptor_gossip_fd1eb378131a5d12 = []byte{
// 251 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xcf, 0x4a, 0xc4, 0x30,
0x10, 0x87, 0x69, 0xb6, 0x9b, 0xb5, 0xe3, 0x1f, 0x64, 0x10, 0x09, 0xb2, 0x87, 0xe2, 0xa9, 0x17,

View File

@@ -38,20 +38,35 @@ func cp(current []*registry.Service) []*registry.Service {
}
func addNodes(old, neu []*registry.Node) []*registry.Node {
var nodes []*registry.Node
// add all new nodes
for _, n := range neu {
var seen bool
for i, o := range old {
node := *n
nodes = append(nodes, &node)
}
// look at old nodes
for _, o := range old {
var exists bool
// check against new nodes
for _, n := range nodes {
// ids match then skip
if o.Id == n.Id {
seen = true
old[i] = n
exists = true
break
}
}
if !seen {
old = append(old, n)
// keep old node
if !exists {
node := *o
nodes = append(nodes, &node)
}
}
return old
return nodes
}
func addServices(old, neu []*registry.Service) []*registry.Service {
@@ -91,19 +106,27 @@ func delNodes(old, del []*registry.Node) []*registry.Node {
func delServices(old, del []*registry.Service) []*registry.Service {
var services []*registry.Service
for i, o := range old {
for _, o := range old {
srv := new(registry.Service)
*srv = *o
var rem bool
for _, s := range del {
if o.Version == s.Version {
old[i].Nodes = delNodes(o.Nodes, s.Nodes)
if len(old[i].Nodes) == 0 {
if srv.Version == s.Version {
srv.Nodes = delNodes(srv.Nodes, s.Nodes)
if len(srv.Nodes) == 0 {
rem = true
}
}
}
if !rem {
services = append(services, o)
services = append(services, srv)
}
}
return services
}

View File

@@ -2,10 +2,13 @@
package mock
import (
"sync"
"github.com/micro/go-micro/registry"
)
type mockRegistry struct {
sync.RWMutex
Services map[string][]*registry.Service
}
@@ -55,11 +58,17 @@ var (
)
func (m *mockRegistry) init() {
m.Lock()
defer m.Unlock()
// add some mock data
m.Services = mockData
}
func (m *mockRegistry) GetService(service string) ([]*registry.Service, error) {
m.Lock()
defer m.Unlock()
s, ok := m.Services[service]
if !ok || len(s) == 0 {
return nil, registry.ErrNotFound
@@ -69,6 +78,9 @@ func (m *mockRegistry) GetService(service string) ([]*registry.Service, error) {
}
func (m *mockRegistry) ListServices() ([]*registry.Service, error) {
m.Lock()
defer m.Unlock()
var services []*registry.Service
for _, service := range m.Services {
services = append(services, service...)
@@ -77,12 +89,18 @@ func (m *mockRegistry) ListServices() ([]*registry.Service, error) {
}
func (m *mockRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
m.Lock()
defer m.Unlock()
services := addServices(m.Services[s.Name], []*registry.Service{s})
m.Services[s.Name] = services
return nil
}
func (m *mockRegistry) Deregister(s *registry.Service) error {
m.Lock()
defer m.Unlock()
services := delServices(m.Services[s.Name], []*registry.Service{s})
m.Services[s.Name] = services
return nil

View File

@@ -1,424 +0,0 @@
// Package cache is a caching selector. It uses the registry watcher.
package cache
import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type cacheSelector struct {
so selector.Options
ttl time.Duration
// registry cache
sync.Mutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
}
var (
DefaultTTL = time.Minute
)
func (c *cacheSelector) quit() bool {
select {
case <-c.exit:
return true
default:
return false
}
}
// cp copies a service. Because we're caching handing back pointers would
// create a race condition, so we do this instead
// its fast enough
func (c *cacheSelector) cp(current []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range current {
// copy service
s := new(registry.Service)
*s = *service
// copy nodes
var nodes []*registry.Node
for _, node := range service.Nodes {
n := new(registry.Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*registry.Endpoint
for _, ep := range service.Endpoints {
e := new(registry.Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func (c *cacheSelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock()
defer c.Unlock()
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
c.watched[service] = true
}
// get does the actual request for a service
// it also caches it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.set(service, c.cp(services))
return services, nil
}
// check the cache first
services, ok := c.cache[service]
// cache miss or no services
if !ok || len(services) == 0 {
return get(service)
}
// got cache but lets check ttl
ttl, kk := c.ttls[service]
// within ttl so return cache
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
}
// expired entry so get service
services, err := get(service)
// no error then return error
if err == nil {
return services, nil
}
// not found error then return
if err == registry.ErrNotFound {
return nil, selector.ErrNotFound
}
// other error
// return expired cache as last resort
return c.cp(services), nil
}
func (c *cacheSelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *cacheSelector) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
}
c.Lock()
defer c.Unlock()
services, ok := c.cache[res.Service.Name]
if !ok {
// we're not going to cache anything
// unless there was already a lookup
return
}
if len(res.Service.Nodes) == 0 {
switch res.Action {
case "delete":
c.del(res.Service.Name)
}
return
}
// existing service found
var service *registry.Service
var index int
for i, s := range services {
if s.Version == res.Service.Version {
service = s
index = i
}
}
switch res.Action {
case "create", "update":
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
}
// append old nodes to new service
for _, cur := range service.Nodes {
var seen bool
for _, node := range res.Service.Nodes {
if cur.Id == node.Id {
seen = true
break
}
}
if !seen {
res.Service.Nodes = append(res.Service.Nodes, cur)
}
}
services[index] = res.Service
c.set(res.Service.Name, services)
case "delete":
if service == nil {
return
}
var nodes []*registry.Node
// filter cur nodes to remove the dead one
for _, cur := range service.Nodes {
var seen bool
for _, del := range res.Service.Nodes {
if del.Id == cur.Id {
seen = true
break
}
}
if !seen {
nodes = append(nodes, cur)
}
}
// still got nodes, save and return
if len(nodes) > 0 {
service.Nodes = nodes
services[index] = service
c.set(service.Name, services)
return
}
// zero nodes left
// only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
}
}
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *cacheSelector) run(name string) {
for {
// exit early if already dead
if c.quit() {
return
}
// create new watcher
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
log.Log(err)
time.Sleep(time.Second)
continue
}
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
log.Log(err)
continue
}
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *cacheSelector) watch(w registry.Watcher) error {
defer w.Stop()
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
}
// stop the watcher
w.Stop()
}()
for {
res, err := w.Next()
if err != nil {
return err
}
c.update(res)
}
}
func (c *cacheSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&c.so)
}
// reload the watcher
go func() {
select {
case <-c.exit:
return
default:
c.reload <- true
}
}()
return nil
}
func (c *cacheSelector) Options() selector.Options {
return c.so
}
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
sopts := selector.SelectOptions{
Strategy: c.so.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// get the service
// try the cache first
// if that fails go directly to the registry
services, err := c.get(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
}
func (c *cacheSelector) Reset(service string) {
}
// Close stops the watcher and destroys the cache
func (c *cacheSelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
func (c *cacheSelector) String() string {
return "cache"
}
func NewSelector(opts ...selector.Option) selector.Selector {
sopts := selector.Options{
Strategy: selector.Random,
}
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
ttl := DefaultTTL
if sopts.Context != nil {
if t, ok := sopts.Context.Value(ttlKey{}).(time.Duration); ok {
ttl = t
}
}
return &cacheSelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
}

View File

@@ -1,29 +0,0 @@
package cache
import (
"testing"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
)
func TestCacheSelector(t *testing.T) {
counts := map[string]int{}
cache := NewSelector(selector.Registry(mock.NewRegistry()))
next, err := cache.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling cache select: %v", err)
}
for i := 0; i < 100; i++ {
node, err := next()
if err != nil {
t.Errorf("Expected node err, got err: %v", err)
}
counts[node.Id]++
}
t.Logf("Cache Counts %v", counts)
}

View File

@@ -1,27 +1,341 @@
package selector
import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry"
)
type defaultSelector struct {
so Options
type registrySelector struct {
so Options
ttl time.Duration
// registry cache
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
}
func (r *defaultSelector) Init(opts ...Option) error {
for _, o := range opts {
o(&r.so)
var (
DefaultTTL = time.Minute
)
func (c *registrySelector) quit() bool {
select {
case <-c.exit:
return true
default:
return false
}
}
// cp copies a service. Because we're caching handing back pointers would
// create a race condition, so we do this instead
// its fast enough
func (c *registrySelector) cp(current []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range current {
// copy service
s := new(registry.Service)
*s = *service
// copy nodes
var nodes []*registry.Node
for _, node := range service.Nodes {
n := new(registry.Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*registry.Endpoint
for _, ep := range service.Endpoints {
e := new(registry.Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func (c *registrySelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *registrySelector) get(service string) ([]*registry.Service, error) {
// read lock
c.RLock()
// check the cache first
services, ok := c.cache[service]
// get cache ttl
ttl, kk := c.ttls[service]
// got services && within ttl so return cache
if ok && kk && time.Since(ttl) < c.ttl {
// make a copy
cp := c.cp(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()
return services, nil
}
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
}
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
func (c *registrySelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *registrySelector) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
}
c.Lock()
defer c.Unlock()
services, ok := c.cache[res.Service.Name]
if !ok {
// we're not going to cache anything
// unless there was already a lookup
return
}
if len(res.Service.Nodes) == 0 {
switch res.Action {
case "delete":
c.del(res.Service.Name)
}
return
}
// existing service found
var service *registry.Service
var index int
for i, s := range services {
if s.Version == res.Service.Version {
service = s
index = i
}
}
switch res.Action {
case "create", "update":
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
}
// append old nodes to new service
for _, cur := range service.Nodes {
var seen bool
for _, node := range res.Service.Nodes {
if cur.Id == node.Id {
seen = true
break
}
}
if !seen {
res.Service.Nodes = append(res.Service.Nodes, cur)
}
}
services[index] = res.Service
c.set(res.Service.Name, services)
case "delete":
if service == nil {
return
}
var nodes []*registry.Node
// filter cur nodes to remove the dead one
for _, cur := range service.Nodes {
var seen bool
for _, del := range res.Service.Nodes {
if del.Id == cur.Id {
seen = true
break
}
}
if !seen {
nodes = append(nodes, cur)
}
}
// still got nodes, save and return
if len(nodes) > 0 {
service.Nodes = nodes
services[index] = service
c.set(service.Name, services)
return
}
// zero nodes left
// only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
}
}
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *registrySelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()
// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()
for {
// exit early if already dead
if c.quit() {
return
}
// create new watcher
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
log.Log(err)
time.Sleep(time.Second)
continue
}
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
log.Log(err)
continue
}
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *registrySelector) watch(w registry.Watcher) error {
defer w.Stop()
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
}
// stop the watcher
w.Stop()
}()
for {
res, err := w.Next()
if err != nil {
return err
}
c.update(res)
}
}
func (c *registrySelector) Init(opts ...Option) error {
for _, o := range opts {
o(&c.so)
}
// reload the watcher
go func() {
select {
case <-c.exit:
return
default:
c.reload <- true
}
}()
return nil
}
func (r *defaultSelector) Options() Options {
return r.so
func (c *registrySelector) Options() Options {
return c.so
}
func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) {
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{
Strategy: r.so.Strategy,
Strategy: c.so.Strategy,
}
for _, opt := range opts {
@@ -29,7 +343,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
}
// get the service
services, err := r.so.Registry.GetService(service)
// try the cache first
// if that fails go directly to the registry
services, err := c.get(service)
if err != nil {
return nil, err
}
@@ -47,21 +363,33 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
return sopts.Strategy(services), nil
}
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
func (c *registrySelector) Mark(service string, node *registry.Node, err error) {
}
func (r *defaultSelector) Reset(service string) {
func (c *registrySelector) Reset(service string) {
}
func (r *defaultSelector) Close() error {
// Close stops the watcher and destroys the cache
func (c *registrySelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
func (r *defaultSelector) String() string {
return "default"
func (c *registrySelector) String() string {
return "registry"
}
func newDefaultSelector(opts ...Option) Selector {
func NewSelector(opts ...Option) Selector {
sopts := Options{
Strategy: Random,
}
@@ -74,7 +402,21 @@ func newDefaultSelector(opts ...Option) Selector {
sopts.Registry = registry.DefaultRegistry
}
return &defaultSelector{
so: sopts,
ttl := DefaultTTL
if sopts.Context != nil {
if t, ok := sopts.Context.Value("selector_ttl").(time.Duration); ok {
ttl = t
}
}
return &registrySelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
}

View File

@@ -6,14 +6,14 @@ import (
"github.com/micro/go-micro/registry/mock"
)
func TestDefaultSelector(t *testing.T) {
func TestRegistrySelector(t *testing.T) {
counts := map[string]int{}
rs := newDefaultSelector(Registry(mock.NewRegistry()))
cache := NewSelector(Registry(mock.NewRegistry()))
next, err := rs.Select("foo")
next, err := cache.Select("foo")
if err != nil {
t.Errorf("Unexpected error calling default select: %v", err)
t.Errorf("Unexpected error calling cache select: %v", err)
}
for i := 0; i < 100; i++ {
@@ -24,5 +24,5 @@ func TestDefaultSelector(t *testing.T) {
counts[node.Id]++
}
t.Logf("Default Counts %v", counts)
t.Logf("Selector Counts %v", counts)
}

128
selector/dns/dns.go Normal file
View File

@@ -0,0 +1,128 @@
// Package dns provides a dns SRV selector
package dns
import (
"net"
"strconv"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
type dnsSelector struct {
options selector.Options
domain string
}
var (
DefaultDomain = "local"
)
func (d *dnsSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&d.options)
}
return nil
}
func (d *dnsSelector) Options() selector.Options {
return d.options
}
func (d *dnsSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var srv []*net.SRV
// check if its host:port
host, port, err := net.SplitHostPort(service)
// not host:port
if err != nil {
// lookup the SRV record
_, srvs, err := net.LookupSRV(service, "tcp", d.domain)
if err != nil {
return nil, err
}
// set SRV records
srv = srvs
// got host:port
} else {
p, _ := strconv.Atoi(port)
// lookup the A record
ips, err := net.LookupHost(host)
if err != nil {
return nil, err
}
// create SRV records
for _, ip := range ips {
srv = append(srv, &net.SRV{
Target: ip,
Port: uint16(p),
})
}
}
var nodes []*registry.Node
for _, node := range srv {
nodes = append(nodes, &registry.Node{
Id: node.Target,
Address: node.Target,
Port: int(node.Port),
})
}
services := []*registry.Service{
&registry.Service{
Name: service,
Nodes: nodes,
},
}
sopts := selector.SelectOptions{
Strategy: d.options.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, selector.ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
func (d *dnsSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (d *dnsSelector) Reset(service string) {
return
}
func (d *dnsSelector) Close() error {
return nil
}
func (d *dnsSelector) String() string {
return "dns"
}
func NewSelector(opts ...selector.Option) selector.Selector {
options := selector.Options{
Strategy: selector.Random,
}
for _, o := range opts {
o(&options)
}
return &dnsSelector{options: options, domain: DefaultDomain}
}

View File

@@ -1,4 +1,4 @@
package cache
package registry
import (
"context"
@@ -7,14 +7,12 @@ import (
"github.com/micro/go-micro/selector"
)
type ttlKey struct{}
// Set the cache ttl
// Set the registry cache ttl
func TTL(t time.Duration) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, ttlKey{}, t)
o.Context = context.WithValue(o.Context, "selector_ttl", t)
}
}

View File

@@ -0,0 +1,11 @@
// Package registry uses the go-micro registry for selection
package registry
import (
"github.com/micro/go-micro/selector"
)
// NewSelector returns a new registry selector
func NewSelector(opts ...selector.Option) selector.Selector {
return selector.NewSelector(opts...)
}

View File

@@ -1,4 +1,4 @@
// Package selector is a way to load balance service nodes
// Package selector is a way to pick a list of service nodes
package selector
import (
@@ -35,12 +35,8 @@ type Filter func([]*registry.Service) []*registry.Service
type Strategy func([]*registry.Service) Next
var (
DefaultSelector = newDefaultSelector()
DefaultSelector = NewSelector()
ErrNotFound = errors.New("not found")
ErrNoneAvailable = errors.New("none available")
)
func NewSelector(opts ...Option) Selector {
return newDefaultSelector(opts...)
}

71
selector/static/static.go Normal file
View File

@@ -0,0 +1,71 @@
// Package static provides a static resolver which returns the name/ip passed in without any change
package static
import (
"net"
"strconv"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
)
// staticSelector is a static selector
type staticSelector struct {
opts selector.Options
}
func (s *staticSelector) Init(opts ...selector.Option) error {
for _, o := range opts {
o(&s.opts)
}
return nil
}
func (s *staticSelector) Options() selector.Options {
return s.opts
}
func (s *staticSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
var port int
addr, pt, err := net.SplitHostPort(service)
if err != nil {
addr = service
port = 0
} else {
port, _ = strconv.Atoi(pt)
}
return func() (*registry.Node, error) {
return &registry.Node{
Id: service,
Address: addr,
Port: port,
}, nil
}, nil
}
func (s *staticSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (s *staticSelector) Reset(service string) {
return
}
func (s *staticSelector) Close() error {
return nil
}
func (s *staticSelector) String() string {
return "static"
}
func NewSelector(opts ...selector.Option) selector.Selector {
var options selector.Options
for _, o := range opts {
o(&options)
}
return &staticSelector{
opts: options,
}
}

View File

@@ -25,8 +25,12 @@ type Options struct {
HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper
// The register expiry time
RegisterTTL time.Duration
// The router for requests
Router Router
// Debug Handler which can be set by a user
DebugHandler debug.DebugHandler
@@ -164,6 +168,13 @@ func RegisterTTL(t time.Duration) Option {
}
}
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {
o.Router = r
}
}
// Wait tells the server to wait for requests to finish before exiting
func Wait(b bool) Option {
return func(o *Options) {

View File

@@ -4,7 +4,11 @@ import (
"bytes"
"github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/codec/grpc"
"github.com/micro/go-micro/codec/json"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/transport"
"github.com/pkg/errors"
@@ -13,6 +17,7 @@ import (
type rpcCodec struct {
socket transport.Socket
codec codec.Codec
first bool
req *transport.Message
buf *readWriteCloser
@@ -24,12 +29,17 @@ type readWriteCloser struct {
}
var (
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
DefaultContentType = "application/protobuf"
DefaultCodecs = map[string]codec.NewCodec{
"application/grpc": grpc.NewCodec,
"application/grpc+json": grpc.NewCodec,
"application/grpc+proto": grpc.NewCodec,
"application/json": json.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/protobuf": proto.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
"application/octet-stream": raw.NewCodec,
}
)
@@ -47,12 +57,13 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) serverCodec {
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{
rbuf: bytes.NewBuffer(req.Body),
wbuf: bytes.NewBuffer(nil),
}
r := &rpcCodec{
first: true,
buf: rwc,
codec: c(rwc),
req: req,
@@ -61,53 +72,120 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
return r
}
func (c *rpcCodec) ReadRequestHeader(r *request, first bool) error {
m := codec.Message{Header: c.req.Header}
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// the initieal message
m := codec.Message{
Header: c.req.Header,
Body: c.req.Body,
}
if !first {
// if its a follow on request read it
if !c.first {
var tm transport.Message
// read off the socket
if err := c.socket.Recv(&tm); err != nil {
return err
}
// reset the read buffer
c.buf.rbuf.Reset()
// write the body to the buffer
if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
return err
}
// set the message header
m.Header = tm.Header
// set the message body
m.Body = tm.Body
}
// no longer first read
c.first = false
// set some internal things
m.Target = m.Header["X-Micro-Service"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Id = m.Header["X-Micro-Id"]
// read header via codec
err := c.codec.ReadHeader(&m, codec.Request)
r.ServiceMethod = m.Method
r.Seq = m.Id
// set the method/id
r.Endpoint = m.Endpoint
r.Id = m.Id
return err
}
func (c *rpcCodec) ReadRequestBody(b interface{}) error {
func (c *rpcCodec) ReadBody(b interface{}) error {
return c.codec.ReadBody(b)
}
func (c *rpcCodec) WriteResponse(r *response, body interface{}, last bool) error {
func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
c.buf.wbuf.Reset()
// create a new message
m := &codec.Message{
Method: r.ServiceMethod,
Id: r.Seq,
Error: r.Error,
Type: codec.Response,
Header: map[string]string{},
Endpoint: r.Endpoint,
Id: r.Id,
Error: r.Error,
Type: r.Type,
Header: map[string]string{},
}
if err := c.codec.Write(m, body); err != nil {
// set request id
if len(r.Id) > 0 {
m.Header["X-Micro-Id"] = r.Id
}
// set target
if len(r.Target) > 0 {
m.Header["X-Micro-Service"] = r.Target
}
// set request endpoint
if len(r.Endpoint) > 0 {
m.Header["X-Micro-Endpoint"] = r.Endpoint
}
if len(r.Error) > 0 {
m.Header["X-Micro-Error"] = r.Error
}
// the body being sent
var body []byte
// if we have encoded data just send it
if len(r.Body) > 0 {
body = r.Body
// write to the body
} else if err := c.codec.Write(m, b); err != nil {
c.buf.wbuf.Reset()
// write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error
// no body to write
if err := c.codec.Write(m, nil); err != nil {
return err
}
// write the body
} else {
// set the body
body = c.buf.wbuf.Bytes()
}
m.Header["Content-Type"] = c.req.Header["Content-Type"]
// Set content type if theres content
if len(body) > 0 {
m.Header["Content-Type"] = c.req.Header["Content-Type"]
}
// send on the socket
return c.socket.Send(&transport.Message{
Header: m.Header,
Body: c.buf.wbuf.Bytes(),
Body: body,
})
}
@@ -116,3 +194,7 @@ func (c *rpcCodec) Close() error {
c.codec.Close()
return c.socket.Close()
}
func (c *rpcCodec) String() string {
return "rpc"
}

View File

@@ -47,15 +47,14 @@ func TestCodecWriteError(t *testing.T) {
socket: socket,
}
err := c.WriteResponse(&response{
ServiceMethod: "Service.Method",
Seq: 0,
Error: "",
next: nil,
}, "body", false)
err := c.Write(&codec.Message{
Endpoint: "Service.Endpoint",
Id: "0",
Error: "",
}, "body")
if err != nil {
t.Fatalf(`Expected WriteResponse to fail; got "%+v" instead`, err)
t.Fatalf(`Expected Write to fail; got "%+v" instead`, err)
}
const expectedError = "Unable to encode body: simulating a codec write failure"

View File

@@ -1,10 +1,18 @@
package server
import (
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
)
type rpcRequest struct {
service string
method string
endpoint string
contentType string
request interface{}
socket transport.Socket
codec codec.Codec
header map[string]string
body []byte
stream bool
}
@@ -14,6 +22,10 @@ type rpcMessage struct {
payload interface{}
}
func (r *rpcRequest) Codec() codec.Reader {
return r.codec
}
func (r *rpcRequest) ContentType() string {
return r.contentType
}
@@ -22,12 +34,30 @@ func (r *rpcRequest) Service() string {
return r.service
}
func (r *rpcRequest) Method() string {
return r.method
func (r *rpcRequest) Endpoint() string {
return r.endpoint
}
func (r *rpcRequest) Request() interface{} {
return r.request
func (r *rpcRequest) Header() map[string]string {
return r.header
}
func (r *rpcRequest) Read() ([]byte, error) {
// got a body
if r.body != nil {
b := r.body
r.body = nil
return b, nil
}
var msg transport.Message
err := r.socket.Recv(&msg)
if err != nil {
return nil, err
}
r.header = msg.Header
return msg.Body, nil
}
func (r *rpcRequest) Stream() bool {

35
server/rpc_response.go Normal file
View File

@@ -0,0 +1,35 @@
package server
import (
"net/http"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
)
type rpcResponse struct {
header map[string]string
socket transport.Socket
codec codec.Codec
}
func (r *rpcResponse) Codec() codec.Writer {
return r.codec
}
func (r *rpcResponse) WriteHeader(hdr map[string]string) {
for k, v := range hdr {
r.header[k] = v
}
}
func (r *rpcResponse) Write(b []byte) error {
if _, ok := r.header["Content-Type"]; !ok {
r.header["Content-Type"] = http.DetectContentType(b)
}
return r.socket.Send(&transport.Message{
Header: r.header,
Body: b,
})
}

View File

@@ -17,6 +17,7 @@ import (
"unicode/utf8"
"github.com/micro/go-log"
"github.com/micro/go-micro/codec"
)
var (
@@ -48,20 +49,17 @@ type service struct {
}
type request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *request // for free list in Server
msg *codec.Message
next *request // for free list in Server
}
type response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *response // for free list in Server
msg *codec.Message
next *response // for free list in Server
}
// server represents an RPC Server.
type server struct {
// router represents an RPC router.
type router struct {
name string
mu sync.Mutex // protects the serviceMap
serviceMap map[string]*service
@@ -72,6 +70,12 @@ type server struct {
hdlrWrappers []HandlerWrapper
}
func newRpcRouter() *router {
return &router{
serviceMap: make(map[string]*service),
}
}
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
@@ -158,78 +162,40 @@ func prepareMethod(method reflect.Method) *methodType {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
}
func (server *server) register(rcvr interface{}) error {
server.mu.Lock()
defer server.mu.Unlock()
if server.serviceMap == nil {
server.serviceMap = make(map[string]*service)
}
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if sname == "" {
log.Fatal("rpc: no service name for type", s.typ.String())
}
if !isExported(sname) {
s := "rpc Register: type " + sname + " is not exported"
log.Log(s)
return errors.New(s)
}
if _, present := server.serviceMap[sname]; present {
return errors.New("rpc: service already defined: " + sname)
}
s.name = sname
s.method = make(map[string]*methodType)
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) {
msg := new(codec.Message)
msg.Type = codec.Response
resp := router.getResponse()
resp.msg = msg
// Install the methods
for m := 0; m < s.typ.NumMethod(); m++ {
method := s.typ.Method(m)
if mt := prepareMethod(method); mt != nil {
s.method[method.Name] = mt
}
}
if len(s.method) == 0 {
s := "rpc Register: type " + sname + " has no exported methods of suitable type"
log.Log(s)
return errors.New(s)
}
server.serviceMap[s.name] = s
return nil
}
func (server *server) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
resp.msg.Endpoint = req.msg.Endpoint
if errmsg != "" {
resp.Error = errmsg
resp.msg.Error = errmsg
reply = invalidRequest
}
resp.Seq = req.Seq
resp.msg.Id = req.msg.Id
sending.Lock()
err = codec.WriteResponse(resp, reply, last)
err = cc.Write(resp.msg, reply)
sending.Unlock()
server.freeResponse(resp)
router.freeResponse(resp)
return err
}
func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec serverCodec, ct string) {
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) {
function := mtype.method.Func
var returnValues []reflect.Value
r := &rpcRequest{
service: server.name,
contentType: ct,
method: req.ServiceMethod,
service: req.msg.Target,
contentType: req.msg.Header["Content-Type"],
endpoint: req.msg.Endpoint,
body: req.msg.Body,
}
if !mtype.stream {
r.request = argv.Interface()
fn := func(ctx context.Context, req Request, rsp interface{}) error {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rsp)})
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
// The return value for the method is an error.
if err := returnValues[0].Interface(); err != nil {
@@ -239,21 +205,17 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
return nil
}
for i := len(server.hdlrWrappers); i > 0; i-- {
fn = server.hdlrWrappers[i-1](fn)
}
errmsg := ""
err := fn(ctx, r, replyv.Interface())
if err != nil {
errmsg = err.Error()
}
err = server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true)
err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true)
if err != nil {
log.Log("rpc call: unable to send response: ", err)
}
server.freeRequest(req)
router.freeRequest(req)
return
}
@@ -264,9 +226,9 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
stream := &rpcStream{
context: ctx,
codec: codec,
codec: cc.(codec.Codec),
request: r,
seq: req.Seq,
id: req.msg.Id,
}
// Invoke the method, providing a new value for the reply.
@@ -284,10 +246,6 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
}
}
for i := len(server.hdlrWrappers); i > 0; i-- {
fn = server.hdlrWrappers[i-1](fn)
}
// client.Stream request
r.stream = true
@@ -299,8 +257,8 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
// this is the last packet, we don't do anything with
// the error here (well sendStreamResponse will log it
// already)
server.sendResponse(sending, req, nil, codec, errmsg, true)
server.freeRequest(req)
router.sendResponse(sending, req, nil, cc, errmsg, true)
router.freeRequest(req)
}
func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
@@ -310,77 +268,61 @@ func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
return reflect.Zero(m.ContextType)
}
func (server *server) serveRequest(ctx context.Context, codec serverCodec, ct string) error {
sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if !keepReading {
return err
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error(), true)
server.freeRequest(req)
}
return err
}
service.call(ctx, server, sending, mtype, req, argv, replyv, codec, ct)
return nil
}
func (server *server) getRequest() *request {
server.reqLock.Lock()
req := server.freeReq
func (router *router) getRequest() *request {
router.reqLock.Lock()
req := router.freeReq
if req == nil {
req = new(request)
} else {
server.freeReq = req.next
router.freeReq = req.next
*req = request{}
}
server.reqLock.Unlock()
router.reqLock.Unlock()
return req
}
func (server *server) freeRequest(req *request) {
server.reqLock.Lock()
req.next = server.freeReq
server.freeReq = req
server.reqLock.Unlock()
func (router *router) freeRequest(req *request) {
router.reqLock.Lock()
req.next = router.freeReq
router.freeReq = req
router.reqLock.Unlock()
}
func (server *server) getResponse() *response {
server.respLock.Lock()
resp := server.freeResp
func (router *router) getResponse() *response {
router.respLock.Lock()
resp := router.freeResp
if resp == nil {
resp = new(response)
} else {
server.freeResp = resp.next
router.freeResp = resp.next
*resp = response{}
}
server.respLock.Unlock()
router.respLock.Unlock()
return resp
}
func (server *server) freeResponse(resp *response) {
server.respLock.Lock()
resp.next = server.freeResp
server.freeResp = resp
server.respLock.Unlock()
func (router *router) freeResponse(resp *response) {
router.respLock.Lock()
resp.next = router.freeResp
router.freeResp = resp
router.respLock.Unlock()
}
func (server *server) readRequest(codec serverCodec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
func (router *router) readRequest(r Request) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) {
cc := r.Codec()
service, mtype, req, keepReading, err = router.readHeader(cc)
if err != nil {
if !keepReading {
return
}
// discard body
codec.ReadRequestBody(nil)
cc.ReadBody(nil)
return
}
// is it a streaming request? then we don't read the body
if mtype.stream {
codec.ReadRequestBody(nil)
cc.ReadBody(nil)
return
}
@@ -393,7 +335,7 @@ func (server *server) readRequest(codec serverCodec) (service *service, mtype *m
argIsValue = true
}
// argv guaranteed to be a pointer now.
if err = codec.ReadRequestBody(argv.Interface()); err != nil {
if err = cc.ReadBody(argv.Interface()); err != nil {
return
}
if argIsValue {
@@ -406,16 +348,20 @@ func (server *server) readRequest(codec serverCodec) (service *service, mtype *m
return
}
func (server *server) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
func (router *router) readHeader(cc codec.Reader) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
err = codec.ReadRequestHeader(req, true)
msg := new(codec.Message)
msg.Type = codec.Request
req = router.getRequest()
req.msg = msg
err = cc.ReadHeader(msg, msg.Type)
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
err = errors.New("rpc: server cannot decode request: " + err.Error())
err = errors.New("rpc: router cannot decode request: " + err.Error())
return
}
@@ -423,30 +369,89 @@ func (server *server) readRequestHeader(codec serverCodec) (service *service, mt
// we can still recover and move on to the next request.
keepReading = true
serviceMethod := strings.Split(req.ServiceMethod, ".")
serviceMethod := strings.Split(req.msg.Endpoint, ".")
if len(serviceMethod) != 2 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
err = errors.New("rpc: service/endpoint request ill-formed: " + req.msg.Endpoint)
return
}
// Look up the request.
server.mu.Lock()
service = server.serviceMap[serviceMethod[0]]
server.mu.Unlock()
router.mu.Lock()
service = router.serviceMap[serviceMethod[0]]
router.mu.Unlock()
if service == nil {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
err = errors.New("rpc: can't find service " + serviceMethod[0])
return
}
mtype = service.method[serviceMethod[1]]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
err = errors.New("rpc: can't find method " + serviceMethod[1])
}
return
}
type serverCodec interface {
ReadRequestHeader(*request, bool) error
ReadRequestBody(interface{}) error
WriteResponse(*response, interface{}, bool) error
Close() error
func (router *router) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRpcHandler(h, opts...)
}
func (router *router) Handle(h Handler) error {
router.mu.Lock()
defer router.mu.Unlock()
if router.serviceMap == nil {
router.serviceMap = make(map[string]*service)
}
if len(h.Name()) == 0 {
return errors.New("rpc.Handle: handler has no name")
}
if !isExported(h.Name()) {
return errors.New("rpc.Handle: type " + h.Name() + " is not exported")
}
rcvr := h.Handler()
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
// check name
if _, present := router.serviceMap[h.Name()]; present {
return errors.New("rpc.Handle: service already defined: " + h.Name())
}
s.name = h.Name()
s.method = make(map[string]*methodType)
// Install the methods
for m := 0; m < s.typ.NumMethod(); m++ {
method := s.typ.Method(m)
if mt := prepareMethod(method); mt != nil {
s.method[method.Name] = mt
}
}
// Check there are methods
if len(s.method) == 0 {
return errors.New("rpc Register: type " + s.name + " has no exported methods of suitable type")
}
// save handler
router.serviceMap[s.name] = s
return nil
}
func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) error {
sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := router.readRequest(r)
if err != nil {
if !keepReading {
return err
}
// send a response if we actually managed to read a header.
if req != nil {
router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true)
router.freeRequest(req)
}
return err
}
service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
return nil
}

View File

@@ -21,8 +21,8 @@ import (
)
type rpcServer struct {
rpc *server
exit chan chan error
router *router
exit chan chan error
sync.RWMutex
opts Options
@@ -37,19 +37,16 @@ type rpcServer struct {
func newRpcServer(opts ...Option) Server {
options := newOptions(opts...)
return &rpcServer{
opts: options,
rpc: &server{
name: options.Name,
serviceMap: make(map[string]*service),
hdlrWrappers: options.HdlrWrappers,
},
opts: options,
router: newRpcRouter(),
handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
}
}
func (s *rpcServer) accept(sock transport.Socket) {
// ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) {
defer func() {
// close socket
sock.Close()
@@ -74,8 +71,34 @@ func (s *rpcServer) accept(sock transport.Socket) {
// we use this Content-Type header to identify the codec needed
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
// strip our headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
// set local/remote ips
hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote()
// create new context
ctx := metadata.NewContext(context.Background(), hdr)
// set the timeout if we have it
if len(to) > 0 {
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
ctx, _ = context.WithTimeout(ctx, time.Duration(n))
}
}
// no content type
if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType
}
// TODO: needs better error handling
cf, err := s.newCodec(ct)
if err != nil {
sock.Send(&transport.Message{
Header: map[string]string{
@@ -87,35 +110,58 @@ func (s *rpcServer) accept(sock transport.Socket) {
return
}
codec := newRpcCodec(&msg, sock, cf)
rcodec := newRpcCodec(&msg, sock, cf)
// strip our headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
delete(hdr, "Timeout")
// set local/remote ips
hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote()
ctx := metadata.NewContext(context.Background(), hdr)
// set the timeout if we have it
if len(to) > 0 {
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
ctx, _ = context.WithTimeout(ctx, time.Duration(n))
}
// internal request
request := &rpcRequest{
service: msg.Header["X-Micro-Service"],
endpoint: msg.Header["X-Micro-Endpoint"],
contentType: ct,
codec: rcodec,
header: msg.Header,
body: msg.Body,
socket: sock,
stream: true,
}
// TODO: needs better error handling
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
// internal response
response := &rpcResponse{
header: make(map[string]string),
socket: sock,
codec: rcodec,
}
// set router
r := s.opts.Router
// if nil use default router
if s.opts.Router == nil {
r = s.router
}
// create a wrapped function
handler := func(ctx context.Context, req Request, rsp interface{}) error {
return r.ServeRequest(ctx, req, rsp.(Response))
}
for i := len(s.opts.HdlrWrappers); i > 0; i-- {
handler = s.opts.HdlrWrappers[i-1](handler)
}
// TODO: handle error better
if err := handler(ctx, request, response); err != nil {
// write an error response
rcodec.Write(&codec.Message{
Header: msg.Header,
Error: err.Error(),
Type: codec.Error,
}, nil)
s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err)
return
}
// done
s.wg.Done()
}
}
@@ -124,7 +170,7 @@ func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) {
if cf, ok := s.opts.Codecs[contentType]; ok {
return cf, nil
}
if cf, ok := defaultCodecs[contentType]; ok {
if cf, ok := DefaultCodecs[contentType]; ok {
return cf, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
@@ -142,25 +188,25 @@ func (s *rpcServer) Init(opts ...Option) error {
for _, opt := range opts {
opt(&s.opts)
}
// update internal server
s.rpc = &server{
name: s.opts.Name,
serviceMap: s.rpc.serviceMap,
hdlrWrappers: s.opts.HdlrWrappers,
}
// update router
r := newRpcRouter()
r.serviceMap = s.router.serviceMap
s.router = r
s.Unlock()
return nil
}
func (s *rpcServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRpcHandler(h, opts...)
return s.router.NewHandler(h, opts...)
}
func (s *rpcServer) Handle(h Handler) error {
s.Lock()
defer s.Unlock()
if err := s.rpc.register(h.Handler()); err != nil {
if err := s.router.Handle(h); err != nil {
return err
}
@@ -401,7 +447,7 @@ func (s *rpcServer) Start() error {
go func() {
for {
err := ts.Accept(s.accept)
err := ts.Accept(s.ServeConn)
// check if we're supposed to exit
select {

View File

@@ -3,16 +3,18 @@ package server
import (
"context"
"sync"
"github.com/micro/go-micro/codec"
)
// Implements the Streamer interface
type rpcStream struct {
sync.RWMutex
seq uint64
id string
closed bool
err error
request Request
codec serverCodec
codec codec.Codec
context context.Context
}
@@ -28,29 +30,31 @@ func (r *rpcStream) Send(msg interface{}) error {
r.Lock()
defer r.Unlock()
resp := response{
ServiceMethod: r.request.Method(),
Seq: r.seq,
resp := codec.Message{
Endpoint: r.request.Endpoint(),
Id: r.id,
Type: codec.Response,
}
return r.codec.WriteResponse(&resp, msg, false)
return r.codec.Write(&resp, msg)
}
func (r *rpcStream) Recv(msg interface{}) error {
r.Lock()
defer r.Unlock()
req := request{}
req := new(codec.Message)
req.Type = codec.Request
if err := r.codec.ReadRequestHeader(&req, false); err != nil {
if err := r.codec.ReadHeader(req, req.Type); err != nil {
// discard body
r.codec.ReadRequestBody(nil)
r.codec.ReadBody(nil)
return err
}
// we need to stay up to date with sequence numbers
r.seq = req.Seq
return r.codec.ReadRequestBody(msg)
r.id = req.Id
return r.codec.ReadBody(msg)
}
func (r *rpcStream) Error() error {

View File

@@ -9,6 +9,7 @@ import (
"github.com/google/uuid"
"github.com/micro/go-log"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry"
)
@@ -27,6 +28,12 @@ type Server interface {
String() string
}
// Router handle serving messages
type Router interface {
// ServeRequest processes a request to completion
ServeRequest(context.Context, Request, Response) error
}
// Message is an async message interface
type Message interface {
Topic() string
@@ -36,14 +43,32 @@ type Message interface {
// Request is a synchronous request interface
type Request interface {
// Service name requested
Service() string
Method() string
// Endpoint name requested
Endpoint() string
// Content type provided
ContentType() string
Request() interface{}
// indicates whether the request will be streamed
// Header of the request
Header() map[string]string
// Read the undecoded request body
Read() ([]byte, error)
// The encoded message stream
Codec() codec.Reader
// Indicates whether its a stream
Stream() bool
}
// Response is the response writer for unencoded messages
type Response interface {
// Encoded writer
Codec() codec.Writer
// Write the header
WriteHeader(map[string]string)
// write a response directly to the client
Write([]byte) error
}
// Stream represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error().
@@ -58,7 +83,7 @@ type Stream interface {
}
// Handler interface represents a request handler. It's generated
// by passing any type of public concrete object with methods into server.NewHandler.
// by passing any type of public concrete object with endpoints into server.NewHandler.
// Most will pass in a struct.
//
// Example:
@@ -77,7 +102,7 @@ type Handler interface {
}
// Subscriber interface represents a subscription to a given topic using
// a specific subscriber function or object with methods.
// a specific subscriber function or object with endpoints.
type Subscriber interface {
Topic() string
Subscriber() interface{}
@@ -97,6 +122,7 @@ var (
DefaultVersion = "1.0.0"
DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer()
DefaultRouter = newRpcRouter()
)
// DefaultOptions returns config options for the default service
@@ -125,7 +151,7 @@ func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscr
// NewHandler creates a new handler interface using the default server
// Handlers are required to be a public object with public
// methods. Call to a service method such as Foo.Bar expects
// endpoints. Call to a service endpoint such as Foo.Bar expects
// the type:
//
// type Foo struct {}

View File

@@ -164,17 +164,29 @@ func validateSubscriber(sub Subscriber) error {
func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Publication) error {
msg := p.Message()
// get codec
ct := msg.Header["Content-Type"]
// default content type
if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType
}
// get codec
cf, err := s.newCodec(ct)
if err != nil {
return err
}
// copy headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
// create context
ctx := metadata.NewContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))

View File

@@ -1,10 +0,0 @@
package codec
// Codec is used for encoding where the transport doesn't natively support
// headers in the message type. In this case the entire message is
// encoded as the payload
type Codec interface {
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
String() string
}

View File

@@ -1,25 +0,0 @@
package json
import (
"encoding/json"
"github.com/micro/go-micro/transport/codec"
)
type jsonCodec struct{}
func (j jsonCodec) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (j jsonCodec) Unmarshal(d []byte, v interface{}) error {
return json.Unmarshal(d, v)
}
func (j jsonCodec) String() string {
return "json"
}
func NewCodec() codec.Codec {
return jsonCodec{}
}

View File

@@ -1,35 +0,0 @@
package noop
import (
"errors"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/transport/codec"
)
type noopCodec struct{}
func (n noopCodec) Marshal(v interface{}) ([]byte, error) {
msg, ok := v.(*transport.Message)
if !ok {
return nil, errors.New("invalid message")
}
return msg.Body, nil
}
func (n noopCodec) Unmarshal(d []byte, v interface{}) error {
msg, ok := v.(*transport.Message)
if !ok {
return errors.New("invalid message")
}
msg.Body = d
return nil
}
func (n noopCodec) String() string {
return "noop"
}
func NewCodec() codec.Codec {
return noopCodec{}
}

109
transport/http_proxy.go Normal file
View File

@@ -0,0 +1,109 @@
package transport
import (
"bufio"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
)
const (
proxyAuthHeader = "Proxy-Authorization"
)
func getURL(addr string) (*url.URL, error) {
r := &http.Request{
URL: &url.URL{
Scheme: "https",
Host: addr,
},
}
return http.ProxyFromEnvironment(r)
}
type pbuffer struct {
net.Conn
r io.Reader
}
func (p *pbuffer) Read(b []byte) (int, error) {
return p.r.Read(b)
}
func proxyDial(conn net.Conn, addr string, proxyURL *url.URL) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
}
}()
r := &http.Request{
Method: http.MethodConnect,
URL: &url.URL{Host: addr},
Header: map[string][]string{"User-Agent": {"micro/latest"}},
}
if user := proxyURL.User; user != nil {
u := user.Username()
p, _ := user.Password()
auth := []byte(u + ":" + p)
basicAuth := base64.StdEncoding.EncodeToString(auth)
r.Header.Add(proxyAuthHeader, "Basic "+basicAuth)
}
if err := r.Write(conn); err != nil {
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
}
br := bufio.NewReader(conn)
rsp, err := http.ReadResponse(br, r)
if err != nil {
return nil, fmt.Errorf("reading server HTTP response: %v", err)
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
dump, err := httputil.DumpResponse(rsp, true)
if err != nil {
return nil, fmt.Errorf("failed to do connect handshake, status code: %s", rsp.Status)
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}
return &pbuffer{Conn: conn, r: br}, nil
}
// Creates a new connection
func newConn(dial func(string) (net.Conn, error)) func(string) (net.Conn, error) {
return func(addr string) (net.Conn, error) {
// get the proxy url
proxyURL, err := getURL(addr)
if err != nil {
return nil, err
}
// set to addr
callAddr := addr
// got proxy
if proxyURL != nil {
callAddr = proxyURL.Host
}
// dial the addr
c, err := dial(callAddr)
if err != nil {
return nil, err
}
// do proxy connect if we have proxy url
if proxyURL != nil {
c, err = proxyDial(c, addr, proxyURL)
}
return c, err
}
}

View File

@@ -1,7 +1,6 @@
package transport
import (
//"fmt"
"bufio"
"bytes"
"crypto/tls"
@@ -277,6 +276,9 @@ func (h *httpTransportSocket) Recv(m *Message) error {
}
}
// set path
m.Header[":path"] = h.r.URL.Path
return nil
}
@@ -452,9 +454,13 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
InsecureSkipVerify: true,
}
}
conn, err = tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
conn, err = newConn(func(addr string) (net.Conn, error) {
return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
})(addr)
} else {
conn, err = net.DialTimeout("tcp", addr, dopts.Timeout)
conn, err = newConn(func(addr string) (net.Conn, error) {
return net.DialTimeout("tcp", addr, dopts.Timeout)
})(addr)
}
if err != nil {

View File

@@ -5,12 +5,12 @@ import (
"crypto/tls"
"time"
"github.com/micro/go-micro/transport/codec"
"github.com/micro/go-micro/codec"
)
type Options struct {
Addrs []string
Codec codec.Codec
Codec codec.Marshaler
Secure bool
TLSConfig *tls.Config
// Timeout sets the timeout for Send/Recv
@@ -50,7 +50,7 @@ func Addrs(addrs ...string) Option {
// Codec sets the codec used for encoding where the transport
// does not support message headers
func Codec(c codec.Codec) Option {
func Codec(c codec.Marshaler) Option {
return func(o *Options) {
o.Codec = c
}