diff --git a/client/buffer.go b/client/buffer.go index 72c3c529..9eee2b3d 100644 --- a/client/buffer.go +++ b/client/buffer.go @@ -1,13 +1,14 @@ package client import ( - "io" + "bytes" ) type buffer struct { - io.ReadWriter + *bytes.Buffer } func (b *buffer) Close() error { + b.Buffer.Reset() return nil } diff --git a/client/codec.go b/client/codec.go deleted file mode 100644 index b9044fec..00000000 --- a/client/codec.go +++ /dev/null @@ -1,75 +0,0 @@ -package client - -import ( - "io" - "net/rpc" - - "github.com/youtube/vitess/go/rpcplus" - "github.com/youtube/vitess/go/rpcplus/jsonrpc" - "github.com/youtube/vitess/go/rpcplus/pbrpc" -) - -var ( - defaultContentType = "application/octet-stream" - - defaultCodecs = map[string]codecFunc{ - "application/json": jsonrpc.NewClientCodec, - "application/json-rpc": jsonrpc.NewClientCodec, - "application/protobuf": pbrpc.NewClientCodec, - "application/proto-rpc": pbrpc.NewClientCodec, - "application/octet-stream": pbrpc.NewClientCodec, - } -) - -type CodecFunc func(io.ReadWriteCloser) rpc.ClientCodec - -// only for internal use -type codecFunc func(io.ReadWriteCloser) rpcplus.ClientCodec - -// wraps an net/rpc ClientCodec to provide an rpcplus.ClientCodec -// temporary until we strip out use of rpcplus -type rpcCodecWrap struct { - r rpc.ClientCodec -} - -func (cw *rpcCodecWrap) WriteRequest(r *rpcplus.Request, b interface{}) error { - rc := &rpc.Request{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, - } - err := cw.r.WriteRequest(rc, b) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - return err -} - -func (cw *rpcCodecWrap) ReadResponseHeader(r *rpcplus.Response) error { - rc := &rpc.Response{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, - Error: r.Error, - } - err := cw.r.ReadResponseHeader(rc) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - r.Error = r.Error - return err -} - -func (cw *rpcCodecWrap) ReadResponseBody(b interface{}) error { - return cw.r.ReadResponseBody(b) -} - -func (cw *rpcCodecWrap) Close() error { - return cw.r.Close() -} - -// wraps a CodecFunc to provide an internal codecFunc -// temporary until we strip rpcplus out -func codecWrap(cf CodecFunc) codecFunc { - return func(rwc io.ReadWriteCloser) rpcplus.ClientCodec { - return &rpcCodecWrap{ - r: cf(rwc), - } - } -} diff --git a/client/options.go b/client/options.go index 4d14a804..e8be919f 100644 --- a/client/options.go +++ b/client/options.go @@ -2,14 +2,15 @@ package client import ( "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" ) type options struct { contentType string - codecs map[string]CodecFunc broker broker.Broker + codecs map[string]codec.NewCodec registry registry.Registry transport transport.Transport wrappers []Wrapper @@ -23,9 +24,9 @@ func Broker(b broker.Broker) Option { } // Codec to be used to encode/decode requests for a given content type -func Codec(contentType string, cf CodecFunc) Option { +func Codec(contentType string, c codec.NewCodec) Option { return func(o *options) { - o.codecs[contentType] = cf + o.codecs[contentType] = c } } diff --git a/client/rpc_client.go b/client/rpc_client.go index b2184162..6fa72d8f 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -1,19 +1,18 @@ package client import ( - "encoding/json" + "bytes" "fmt" "sync" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" - - "github.com/golang/protobuf/proto" "golang.org/x/net/context" ) @@ -26,7 +25,7 @@ func newRpcClient(opt ...Option) Client { var once sync.Once opts := options{ - codecs: make(map[string]CodecFunc), + codecs: make(map[string]codec.NewCodec), } for _, o := range opt { @@ -60,9 +59,9 @@ func newRpcClient(opt ...Option) Client { return c } -func (r *rpcClient) codecFunc(contentType string) (codecFunc, error) { - if cf, ok := r.opts.codecs[contentType]; ok { - return codecWrap(cf), nil +func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { + if c, ok := r.opts.codecs[contentType]; ok { + return c, nil } if cf, ok := defaultCodecs[contentType]; ok { return cf, nil @@ -84,7 +83,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r msg.Header["Content-Type"] = request.ContentType() - cf, err := r.codecFunc(request.ContentType()) + cf, err := r.newCodec(request.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } @@ -117,7 +116,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request, msg.Header["Content-Type"] = request.ContentType() - cf, err := r.codecFunc(request.ContentType()) + cf, err := r.newCodec(request.ContentType()) if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } @@ -192,30 +191,21 @@ func (r *rpcClient) Publish(ctx context.Context, p Publication) error { md["Content-Type"] = p.ContentType() // encode message body - var body []byte - - switch p.ContentType() { - case "application/octet-stream": - b, err := proto.Marshal(p.Message().(proto.Message)) - if err != nil { - return err - } - body = b - case "application/json": - b, err := json.Marshal(p.Message()) - if err != nil { - return err - } - body = b + cf, err := r.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + b := &buffer{bytes.NewBuffer(nil)} + if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) } - r.once.Do(func() { r.opts.broker.Connect() }) return r.opts.broker.Publish(p.Topic(), &broker.Message{ Header: md, - Body: body, + Body: b.Bytes(), }) } diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 95c0b332..867ba100 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -3,13 +3,16 @@ package client import ( "bytes" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" ) type rpcPlusCodec struct { client transport.Client - codec rpc.ClientCodec + codec codec.Codec req *transport.Message buf *readWriteCloser @@ -20,6 +23,18 @@ type readWriteCloser struct { rbuf *bytes.Buffer } +var ( + defaultContentType = "application/octet-stream" + + defaultCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } +) + func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { return rwc.rbuf.Read(p) } @@ -34,7 +49,7 @@ func (rwc *readWriteCloser) Close() error { return nil } -func newRpcPlusCodec(req *transport.Message, client transport.Client, cf codecFunc) *rpcPlusCodec { +func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcPlusCodec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil), @@ -42,14 +57,19 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, cf codecFu r := &rpcPlusCodec{ buf: rwc, client: client, - codec: cf(rwc), + codec: c(rwc), req: req, } return r } func (c *rpcPlusCodec) WriteRequest(req *rpc.Request, body interface{}) error { - if err := c.codec.WriteRequest(req, body); err != nil { + m := &codec.Message{ + Id: req.Seq, + Method: req.ServiceMethod, + Type: codec.Request, + } + if err := c.codec.Write(m, body); err != nil { return err } c.req.Body = c.buf.wbuf.Bytes() @@ -63,14 +83,20 @@ func (c *rpcPlusCodec) ReadResponseHeader(r *rpc.Response) error { } c.buf.rbuf.Reset() c.buf.rbuf.Write(m.Body) - return c.codec.ReadResponseHeader(r) + var me codec.Message + err := c.codec.ReadHeader(&me, codec.Response) + r.ServiceMethod = me.Method + r.Seq = me.Id + r.Error = me.Error + return err } -func (c *rpcPlusCodec) ReadResponseBody(r interface{}) error { - return c.codec.ReadResponseBody(r) +func (c *rpcPlusCodec) ReadResponseBody(b interface{}) error { + return c.codec.ReadBody(b) } func (c *rpcPlusCodec) Close() error { c.buf.Close() + c.codec.Close() return c.client.Close() } diff --git a/codec/codec.go b/codec/codec.go new file mode 100644 index 00000000..bb67429f --- /dev/null +++ b/codec/codec.go @@ -0,0 +1,47 @@ +package codec + +import ( + "io" +) + +const ( + Error MessageType = iota + Request + Response + Publication +) + +type MessageType int + +// Takes in a connection/buffer and returns a new Codec +type NewCodec func(io.ReadWriteCloser) Codec + +// Codec encodes/decodes various types of +// messages used within go-micro +type Codec interface { + Encoder + Decoder + Close() error + String() string +} + +type Encoder interface { + Write(*Message, interface{}) error +} + +type Decoder interface { + ReadHeader(*Message, MessageType) error + ReadBody(interface{}) error +} + +// Message represents detailed information about +// the communication, likely followed by the body. +// In the case of an error, body may be nil. +type Message struct { + Id uint64 + Type MessageType + Target string + Method string + Error string + Headers map[string]string +} diff --git a/codec/jsonrpc/client.go b/codec/jsonrpc/client.go new file mode 100644 index 00000000..7b079e05 --- /dev/null +++ b/codec/jsonrpc/client.go @@ -0,0 +1,97 @@ +package jsonrpc + +import ( + "encoding/json" + "fmt" + "io" + "sync" + + "github.com/micro/go-micro/codec" +) + +type clientCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req clientRequest + resp clientResponse + + sync.Mutex + pending map[uint64]string +} + +type clientRequest struct { + Method string `json:"method"` + Params [1]interface{} `json:"params"` + ID uint64 `json:"id"` +} + +type clientResponse struct { + ID uint64 `json:"id"` + Result *json.RawMessage `json:"result"` + Error interface{} `json:"error"` +} + +func newClientCodec(conn io.ReadWriteCloser) *clientCodec { + return &clientCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]string), + } +} + +func (c *clientCodec) Write(m *codec.Message, b interface{}) error { + c.Lock() + c.pending[m.Id] = m.Method + c.Unlock() + c.req.Method = m.Method + c.req.Params[0] = b + c.req.ID = m.Id + return c.enc.Encode(&c.req) +} + +func (r *clientResponse) reset() { + r.ID = 0 + r.Result = nil + r.Error = nil +} + +func (c *clientCodec) ReadHeader(m *codec.Message) error { + c.resp.reset() + if err := c.dec.Decode(&c.resp); err != nil { + return err + } + + c.Lock() + m.Method = c.pending[c.resp.ID] + delete(c.pending, c.resp.ID) + c.Unlock() + + m.Error = "" + m.Id = c.resp.ID + if c.resp.Error != nil { + x, ok := c.resp.Error.(string) + if !ok { + return fmt.Errorf("invalid error %v", c.resp.Error) + } + if x == "" { + x = "unspecified error" + } + m.Error = x + } + return nil +} + +func (c *clientCodec) ReadBody(x interface{}) error { + if x == nil { + return nil + } + return json.Unmarshal(*c.resp.Result, x) +} + +func (c *clientCodec) Close() error { + return c.c.Close() +} diff --git a/codec/jsonrpc/json.go b/codec/jsonrpc/json.go new file mode 100644 index 00000000..b843dfd4 --- /dev/null +++ b/codec/jsonrpc/json.go @@ -0,0 +1,88 @@ +package jsonrpc + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/micro/go-micro/codec" +) + +type jsonCodec struct { + buf *bytes.Buffer + mt codec.MessageType + rwc io.ReadWriteCloser + c *clientCodec + s *serverCodec +} + +func (j *jsonCodec) Close() error { + j.buf.Reset() + return j.rwc.Close() +} + +func (j *jsonCodec) String() string { + return "json-rpc" +} + +func (j *jsonCodec) Write(m *codec.Message, b interface{}) error { + switch m.Type { + case codec.Request: + return j.c.Write(m, b) + case codec.Response: + return j.s.Write(m, b) + case codec.Publication: + data, err := json.Marshal(b) + if err != nil { + return err + } + _, err = j.rwc.Write(data) + return err + default: + return fmt.Errorf("Unrecognised message type: %v", m.Type) + } + return nil +} + +func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + j.buf.Reset() + j.mt = mt + + switch mt { + case codec.Request: + return j.s.ReadHeader(m) + case codec.Response: + return j.c.ReadHeader(m) + case codec.Publication: + io.Copy(j.buf, j.rwc) + default: + return fmt.Errorf("Unrecognised message type: %v", mt) + } + return nil +} + +func (j *jsonCodec) ReadBody(b interface{}) error { + switch j.mt { + case codec.Request: + return j.s.ReadBody(b) + case codec.Response: + return j.c.ReadBody(b) + case codec.Publication: + if b != nil { + return json.Unmarshal(j.buf.Bytes(), b) + } + default: + return fmt.Errorf("Unrecognised message type: %v", j.mt) + } + return nil +} + +func NewCodec(rwc io.ReadWriteCloser) codec.Codec { + return &jsonCodec{ + buf: bytes.NewBuffer(nil), + rwc: rwc, + c: newClientCodec(rwc), + s: newServerCodec(rwc), + } +} diff --git a/codec/jsonrpc/server.go b/codec/jsonrpc/server.go new file mode 100644 index 00000000..bf78ca43 --- /dev/null +++ b/codec/jsonrpc/server.go @@ -0,0 +1,111 @@ +package jsonrpc + +import ( + "encoding/json" + "errors" + "io" + "sync" + + "github.com/micro/go-micro/codec" +) + +type serverCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req serverRequest + resp serverResponse + + sync.Mutex + seq uint64 + pending map[uint64]*json.RawMessage +} + +type serverRequest struct { + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + ID *json.RawMessage `json:"id"` +} + +type serverResponse struct { + ID *json.RawMessage `json:"id"` + Result interface{} `json:"result"` + Error interface{} `json:"error"` +} + +func newServerCodec(conn io.ReadWriteCloser) *serverCodec { + return &serverCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]*json.RawMessage), + } +} + +func (r *serverRequest) reset() { + r.Method = "" + if r.Params != nil { + *r.Params = (*r.Params)[0:0] + } + if r.ID != nil { + *r.ID = (*r.ID)[0:0] + } +} + +func (c *serverCodec) ReadHeader(m *codec.Message) error { + c.req.reset() + if err := c.dec.Decode(&c.req); err != nil { + return err + } + m.Method = c.req.Method + + c.Lock() + c.seq++ + c.pending[c.seq] = c.req.ID + c.req.ID = nil + m.Id = c.seq + c.Unlock() + + return nil +} + +func (c *serverCodec) ReadBody(x interface{}) error { + if x == nil { + return nil + } + var params [1]interface{} + params[0] = x + return json.Unmarshal(*c.req.Params, ¶ms) +} + +var null = json.RawMessage([]byte("null")) + +func (c *serverCodec) Write(m *codec.Message, x interface{}) error { + var resp serverResponse + c.Lock() + b, ok := c.pending[m.Id] + if !ok { + c.Unlock() + return errors.New("invalid sequence number in response") + } + c.Unlock() + + if b == nil { + // Invalid request so no id. Use JSON null. + b = &null + } + resp.ID = b + resp.Result = x + if m.Error == "" { + resp.Error = nil + } else { + resp.Error = m.Error + } + return c.enc.Encode(resp) +} + +func (c *serverCodec) Close() error { + return c.c.Close() +} diff --git a/codec/protorpc/envelope.pb.go b/codec/protorpc/envelope.pb.go new file mode 100644 index 00000000..9af273e0 --- /dev/null +++ b/codec/protorpc/envelope.pb.go @@ -0,0 +1,83 @@ +// Code generated by protoc-gen-go. +// source: envelope.proto +// DO NOT EDIT! + +/* +Package proto is a generated protocol buffer package. + +It is generated from these files: + envelope.proto + +It has these top-level messages: + Request + Response +*/ +package protorpc + +import proto "github.com/golang/protobuf/proto" +import json "encoding/json" +import math "math" + +// Reference proto, json, and math imports to suppress error if they are not otherwise used. +var _ = proto.Marshal +var _ = &json.SyntaxError{} +var _ = math.Inf + +type Request struct { + ServiceMethod *string `protobuf:"bytes,1,opt,name=service_method" json:"service_method,omitempty"` + Seq *uint64 `protobuf:"fixed64,2,opt,name=seq" json:"seq,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} + +func (m *Request) GetServiceMethod() string { + if m != nil && m.ServiceMethod != nil { + return *m.ServiceMethod + } + return "" +} + +func (m *Request) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq + } + return 0 +} + +type Response struct { + ServiceMethod *string `protobuf:"bytes,1,opt,name=service_method" json:"service_method,omitempty"` + Seq *uint64 `protobuf:"fixed64,2,opt,name=seq" json:"seq,omitempty"` + Error *string `protobuf:"bytes,3,opt,name=error" json:"error,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} + +func (m *Response) GetServiceMethod() string { + if m != nil && m.ServiceMethod != nil { + return *m.ServiceMethod + } + return "" +} + +func (m *Response) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq + } + return 0 +} + +func (m *Response) GetError() string { + if m != nil && m.Error != nil { + return *m.Error + } + return "" +} + +func init() { +} diff --git a/codec/protorpc/envelope.proto b/codec/protorpc/envelope.proto new file mode 100644 index 00000000..9c4fd19d --- /dev/null +++ b/codec/protorpc/envelope.proto @@ -0,0 +1,12 @@ +package protorpc; + +message Request { + optional string service_method = 1; + optional fixed64 seq = 2; +} + +message Response { + optional string service_method = 1; + optional fixed64 seq = 2; + optional string error = 3; +} diff --git a/codec/protorpc/netstring.go b/codec/protorpc/netstring.go new file mode 100644 index 00000000..8204a0e2 --- /dev/null +++ b/codec/protorpc/netstring.go @@ -0,0 +1,36 @@ +package protorpc + +import ( + "encoding/binary" + "io" +) + +// WriteNetString writes data to a big-endian netstring on a Writer. +// Size is always a 32-bit unsigned int. +func WriteNetString(w io.Writer, data []byte) (written int, err error) { + size := make([]byte, 4) + binary.BigEndian.PutUint32(size, uint32(len(data))) + if written, err = w.Write(size); err != nil { + return + } + return w.Write(data) +} + +// ReadNetString reads data from a big-endian netstring. +func ReadNetString(r io.Reader) (data []byte, err error) { + sizeBuf := make([]byte, 4) + _, err = r.Read(sizeBuf) + if err != nil { + return nil, err + } + size := binary.BigEndian.Uint32(sizeBuf) + if size == 0 { + return nil, nil + } + data = make([]byte, size) + _, err = r.Read(data) + if err != nil { + return nil, err + } + return +} diff --git a/codec/protorpc/proto.go b/codec/protorpc/proto.go new file mode 100644 index 00000000..51f9e06c --- /dev/null +++ b/codec/protorpc/proto.go @@ -0,0 +1,162 @@ +package protorpc + +import ( + "bytes" + "fmt" + "io" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/codec" +) + +type flusher interface { + Flush() error +} + +type protoCodec struct { + sync.Mutex + rwc io.ReadWriteCloser + mt codec.MessageType + buf *bytes.Buffer +} + +func (c *protoCodec) Close() error { + c.buf.Reset() + return c.rwc.Close() +} + +func (c *protoCodec) String() string { + return "proto-rpc" +} + +func (c *protoCodec) Write(m *codec.Message, b interface{}) error { + switch m.Type { + case codec.Request: + c.Lock() + defer c.Unlock() + // This is protobuf, of course we copy it. + pbr := &Request{ServiceMethod: &m.Method, Seq: &m.Id} + data, err := proto.Marshal(pbr) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + // Of course this is a protobuf! Trust me or detonate the program. + data, err = proto.Marshal(b.(proto.Message)) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if flusher, ok := c.rwc.(flusher); ok { + err = flusher.Flush() + } + case codec.Response: + c.Lock() + defer c.Unlock() + rtmp := &Response{ServiceMethod: &m.Method, Seq: &m.Id, Error: &m.Error} + data, err := proto.Marshal(rtmp) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if pb, ok := b.(proto.Message); ok { + data, err = proto.Marshal(pb) + if err != nil { + return err + } + } else { + data = nil + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if flusher, ok := c.rwc.(flusher); ok { + err = flusher.Flush() + } + case codec.Publication: + data, err := proto.Marshal(b.(proto.Message)) + if err != nil { + return err + } + c.rwc.Write(data) + default: + return fmt.Errorf("Unrecognised message type: %v", m.Type) + } + return nil +} + +func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + c.buf.Reset() + c.mt = mt + + switch mt { + case codec.Request: + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + rtmp := new(Request) + err = proto.Unmarshal(data, rtmp) + if err != nil { + return err + } + m.Method = *rtmp.ServiceMethod + m.Id = *rtmp.Seq + case codec.Response: + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + rtmp := new(Response) + err = proto.Unmarshal(data, rtmp) + if err != nil { + return err + } + m.Method = *rtmp.ServiceMethod + m.Id = *rtmp.Seq + m.Error = *rtmp.Error + case codec.Publication: + io.Copy(c.buf, c.rwc) + default: + return fmt.Errorf("Unrecognised message type: %v", mt) + } + return nil +} + +func (c *protoCodec) ReadBody(b interface{}) error { + var data []byte + switch c.mt { + case codec.Request, codec.Response: + var err error + data, err = ReadNetString(c.rwc) + if err != nil { + return err + } + case codec.Publication: + data = c.buf.Bytes() + default: + return fmt.Errorf("Unrecognised message type: %v", c.mt) + } + if b != nil { + return proto.Unmarshal(data, b.(proto.Message)) + } + return nil +} + +func NewCodec(rwc io.ReadWriteCloser) codec.Codec { + return &protoCodec{ + buf: bytes.NewBuffer(nil), + rwc: rwc, + } +} diff --git a/examples/client/main.go b/examples/client/main.go index d5602e5d..b59b8f7c 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -113,35 +113,44 @@ func stream() { return } - stream.Close() + if err := stream.Close(); err != nil { + fmt.Println("stream close err:", err) + } } func main() { cmd.Init() - fmt.Println("\n--- Call example ---\n") - for i := 0; i < 10; i++ { - call(i) + // client.DefaultClient = client.NewClient( + // client.Codec("application/pb", pb.Codec), + // client.ContentType("application/pb"), + // ) + for { + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } + + fmt.Println("\n--- Streamer example ---\n") + stream() + + fmt.Println("\n--- Publisher example ---\n") + pub() + + fmt.Println("\n--- Wrapper example ---\n") + + // Wrap the default client + client.DefaultClient = logWrap(client.DefaultClient) + + call(0) + + // Wrap using client.Wrap option + client.DefaultClient = client.NewClient( + client.Wrap(traceWrap), + client.Wrap(logWrap), + ) + + call(1) + time.Sleep(time.Millisecond * 100) } - - fmt.Println("\n--- Streamer example ---\n") - stream() - - fmt.Println("\n--- Publisher example ---\n") - pub() - - fmt.Println("\n--- Wrapper example ---\n") - - // Wrap the default client - client.DefaultClient = logWrap(client.DefaultClient) - - call(0) - - // Wrap using client.Wrap option - client.DefaultClient = client.NewClient( - client.Wrap(traceWrap), - client.Wrap(logWrap), - ) - - call(1) } diff --git a/examples/server/main.go b/examples/server/main.go index ef535233..846b0241 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -12,6 +12,10 @@ func main() { // optionally setup command line usage cmd.Init() + // server.DefaultServer = server.NewServer( + // server.Codec("application/bson", bson.Codec), + // ) + // Initialise Server server.Init( server.Name("go.micro.srv.example"), diff --git a/server/buffer.go b/server/buffer.go index e833f1a6..4df03c27 100644 --- a/server/buffer.go +++ b/server/buffer.go @@ -1,13 +1,14 @@ package server import ( - "io" + "bytes" ) type buffer struct { - io.ReadWriter + *bytes.Buffer } func (b *buffer) Close() error { + b.Buffer.Reset() return nil } diff --git a/server/codec.go b/server/codec.go deleted file mode 100644 index b229d99a..00000000 --- a/server/codec.go +++ /dev/null @@ -1,74 +0,0 @@ -package server - -import ( - "io" - "net/rpc" - - "github.com/youtube/vitess/go/rpcplus" - "github.com/youtube/vitess/go/rpcplus/jsonrpc" - "github.com/youtube/vitess/go/rpcplus/pbrpc" -) - -var ( - defaultCodecs = map[string]codecFunc{ - "application/json": jsonrpc.NewServerCodec, - "application/json-rpc": jsonrpc.NewServerCodec, - "application/protobuf": pbrpc.NewServerCodec, - "application/proto-rpc": pbrpc.NewServerCodec, - "application/octet-stream": pbrpc.NewServerCodec, - } -) - -// CodecFunc is used to encode/decode requests/responses -type CodecFunc func(io.ReadWriteCloser) rpc.ServerCodec - -// for internal use only -type codecFunc func(io.ReadWriteCloser) rpcplus.ServerCodec - -// wraps an net/rpc ServerCodec to provide an rpcplus.ServerCodec -// temporary until we strip out use of rpcplus -type rpcCodecWrap struct { - r rpc.ServerCodec -} - -func (cw *rpcCodecWrap) ReadRequestHeader(r *rpcplus.Request) error { - rc := &rpc.Request{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, - } - err := cw.r.ReadRequestHeader(rc) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - return err -} - -func (cw *rpcCodecWrap) ReadRequestBody(b interface{}) error { - return cw.r.ReadRequestBody(b) -} - -func (cw *rpcCodecWrap) WriteResponse(r *rpcplus.Response, b interface{}, l bool) error { - rc := &rpc.Response{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, - Error: r.Error, - } - err := cw.r.WriteResponse(rc, b) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - r.Error = r.Error - return err -} - -func (cw *rpcCodecWrap) Close() error { - return cw.r.Close() -} - -// wraps a CodecFunc to provide an internal codecFunc -// temporary until we strip rpcplus out -func codecWrap(cf CodecFunc) codecFunc { - return func(rwc io.ReadWriteCloser) rpcplus.ServerCodec { - return &rpcCodecWrap{ - r: cf(rwc), - } - } -} diff --git a/server/options.go b/server/options.go index ce90c51a..ca17fb3c 100644 --- a/server/options.go +++ b/server/options.go @@ -2,12 +2,13 @@ package server import ( "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" ) type options struct { - codecs map[string]CodecFunc + codecs map[string]codec.NewCodec broker broker.Broker registry registry.Registry transport transport.Transport @@ -21,7 +22,7 @@ type options struct { func newOptions(opt ...Option) options { opts := options{ - codecs: make(map[string]CodecFunc), + codecs: make(map[string]codec.NewCodec), } for _, o := range opt { @@ -126,9 +127,9 @@ func Broker(b broker.Broker) Option { } // Codec to use to encode/decode requests for a given content type -func Codec(contentType string, cf CodecFunc) Option { +func Codec(contentType string, c codec.NewCodec) Option { return func(o *options) { - o.codecs[contentType] = cf + o.codecs[contentType] = c } } diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 31c1a476..44cd45b5 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -3,13 +3,16 @@ package server import ( "bytes" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" ) type rpcPlusCodec struct { socket transport.Socket - codec rpc.ServerCodec + codec codec.Codec req *transport.Message buf *readWriteCloser @@ -20,6 +23,16 @@ type readWriteCloser struct { rbuf *bytes.Buffer } +var ( + defaultCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } +) + func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { return rwc.rbuf.Read(p) } @@ -34,14 +47,14 @@ func (rwc *readWriteCloser) Close() error { return nil } -func newRpcPlusCodec(req *transport.Message, socket transport.Socket, cf codecFunc) rpc.ServerCodec { +func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) rpc.ServerCodec { rwc := &readWriteCloser{ rbuf: bytes.NewBuffer(req.Body), wbuf: bytes.NewBuffer(nil), } r := &rpcPlusCodec{ buf: rwc, - codec: cf(rwc), + codec: c(rwc), req: req, socket: socket, } @@ -49,16 +62,26 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, cf codecFu } func (c *rpcPlusCodec) ReadRequestHeader(r *rpc.Request) error { - return c.codec.ReadRequestHeader(r) + var m codec.Message + err := c.codec.ReadHeader(&m, codec.Request) + r.ServiceMethod = m.Method + r.Seq = m.Id + return err } -func (c *rpcPlusCodec) ReadRequestBody(r interface{}) error { - return c.codec.ReadRequestBody(r) +func (c *rpcPlusCodec) ReadRequestBody(b interface{}) error { + return c.codec.ReadBody(b) } func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) error { c.buf.wbuf.Reset() - if err := c.codec.WriteResponse(r, body, last); err != nil { + m := &codec.Message{ + Method: r.ServiceMethod, + Id: r.Seq, + Error: r.Error, + Type: codec.Response, + } + if err := c.codec.Write(m, body); err != nil { return err } return c.socket.Send(&transport.Message{ @@ -69,5 +92,6 @@ func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last boo func (c *rpcPlusCodec) Close() error { c.buf.Close() + c.codec.Close() return c.socket.Close() } diff --git a/server/rpc_server.go b/server/rpc_server.go index 98755db7..8e55fe15 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" @@ -43,7 +44,7 @@ func (s *rpcServer) accept(sock transport.Socket) { return } - cf, err := s.codecFunc(msg.Header["Content-Type"]) + cf, err := s.newCodec(msg.Header["Content-Type"]) // TODO: needs better error handling if err != nil { sock.Send(&transport.Message{ @@ -73,9 +74,9 @@ func (s *rpcServer) accept(sock transport.Socket) { } } -func (s *rpcServer) codecFunc(contentType string) (codecFunc, error) { +func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) { if cf, ok := s.opts.codecs[contentType]; ok { - return codecWrap(cf), nil + return cf, nil } if cf, ok := defaultCodecs[contentType]; ok { return cf, nil @@ -200,7 +201,7 @@ func (s *rpcServer) Register() error { defer s.Unlock() for sb, _ := range s.subscribers { - handler := createSubHandler(sb) + handler := s.createSubHandler(sb) sub, err := config.broker.Subscribe(sb.Topic(), handler) if err != nil { return err diff --git a/server/subscriber.go b/server/subscriber.go index 9395bc44..ea4d82f0 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -1,11 +1,11 @@ package server import ( - "encoding/json" + "bytes" "reflect" - "github.com/golang/protobuf/proto" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/registry" "golang.org/x/net/context" @@ -94,8 +94,19 @@ func newSubscriber(topic string, sub interface{}) Subscriber { } } -func createSubHandler(sb *subscriber) broker.Handler { +func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler { return func(msg *broker.Message) { + cf, err := s.newCodec(msg.Header["Content-Type"]) + if err != nil { + return + } + + b := &buffer{bytes.NewBuffer(msg.Body)} + co := cf(b) + if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { + return + } + hdr := make(map[string]string) for k, v := range msg.Header { hdr[k] = v @@ -107,7 +118,6 @@ func createSubHandler(sb *subscriber) broker.Handler { for _, handler := range sb.handlers { var isVal bool var req reflect.Value - var uerr error if handler.reqType.Kind() == reflect.Ptr { req = reflect.New(handler.reqType.Elem()) @@ -116,14 +126,7 @@ func createSubHandler(sb *subscriber) broker.Handler { isVal = true } - switch msg.Header["Content-Type"] { - case "application/octet-stream": - uerr = proto.Unmarshal(msg.Body, req.Interface().(proto.Message)) - case "application/json": - uerr = json.Unmarshal(msg.Body, req.Interface()) - } - - if uerr != nil { + if err := co.ReadBody(req.Interface()); err != nil { continue } diff --git a/transport/http_transport.go b/transport/http_transport.go index 429e7627..91527e57 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -22,10 +22,12 @@ type httpTransportClient struct { ht *httpTransport addr string conn net.Conn - buff *bufio.Reader dialOpts dialOptions r chan *http.Request once sync.Once + + sync.Mutex + buff *bufio.Reader } type httpTransportSocket struct { @@ -81,6 +83,12 @@ func (h *httpTransportClient) Recv(m *Message) error { r = rc } + h.Lock() + defer h.Unlock() + if h.buff == nil { + return io.EOF + } + rsp, err := http.ReadResponse(h.buff, r) if err != nil { return err @@ -110,11 +118,15 @@ func (h *httpTransportClient) Recv(m *Message) error { } func (h *httpTransportClient) Close() error { - h.buff.Reset(nil) + err := h.conn.Close() h.once.Do(func() { + h.Lock() + h.buff.Reset(nil) + h.buff = nil + h.Unlock() close(h.r) }) - return h.conn.Close() + return err } func (h *httpTransportSocket) Recv(m *Message) error {