Merge pull request #22 from micro/codec

Codec
This commit is contained in:
Asim 2015-11-28 19:44:50 +00:00
commit 69ee0e4738
22 changed files with 804 additions and 244 deletions

View File

@ -1,13 +1,14 @@
package client package client
import ( import (
"io" "bytes"
) )
type buffer struct { type buffer struct {
io.ReadWriter *bytes.Buffer
} }
func (b *buffer) Close() error { func (b *buffer) Close() error {
b.Buffer.Reset()
return nil return nil
} }

View File

@ -1,75 +0,0 @@
package client
import (
"io"
"net/rpc"
"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcplus/jsonrpc"
"github.com/youtube/vitess/go/rpcplus/pbrpc"
)
var (
defaultContentType = "application/octet-stream"
defaultCodecs = map[string]codecFunc{
"application/json": jsonrpc.NewClientCodec,
"application/json-rpc": jsonrpc.NewClientCodec,
"application/protobuf": pbrpc.NewClientCodec,
"application/proto-rpc": pbrpc.NewClientCodec,
"application/octet-stream": pbrpc.NewClientCodec,
}
)
type CodecFunc func(io.ReadWriteCloser) rpc.ClientCodec
// only for internal use
type codecFunc func(io.ReadWriteCloser) rpcplus.ClientCodec
// wraps an net/rpc ClientCodec to provide an rpcplus.ClientCodec
// temporary until we strip out use of rpcplus
type rpcCodecWrap struct {
r rpc.ClientCodec
}
func (cw *rpcCodecWrap) WriteRequest(r *rpcplus.Request, b interface{}) error {
rc := &rpc.Request{
ServiceMethod: r.ServiceMethod,
Seq: r.Seq,
}
err := cw.r.WriteRequest(rc, b)
r.ServiceMethod = rc.ServiceMethod
r.Seq = rc.Seq
return err
}
func (cw *rpcCodecWrap) ReadResponseHeader(r *rpcplus.Response) error {
rc := &rpc.Response{
ServiceMethod: r.ServiceMethod,
Seq: r.Seq,
Error: r.Error,
}
err := cw.r.ReadResponseHeader(rc)
r.ServiceMethod = rc.ServiceMethod
r.Seq = rc.Seq
r.Error = r.Error
return err
}
func (cw *rpcCodecWrap) ReadResponseBody(b interface{}) error {
return cw.r.ReadResponseBody(b)
}
func (cw *rpcCodecWrap) Close() error {
return cw.r.Close()
}
// wraps a CodecFunc to provide an internal codecFunc
// temporary until we strip rpcplus out
func codecWrap(cf CodecFunc) codecFunc {
return func(rwc io.ReadWriteCloser) rpcplus.ClientCodec {
return &rpcCodecWrap{
r: cf(rwc),
}
}
}

View File

@ -2,14 +2,15 @@ package client
import ( import (
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
type options struct { type options struct {
contentType string contentType string
codecs map[string]CodecFunc
broker broker.Broker broker broker.Broker
codecs map[string]codec.NewCodec
registry registry.Registry registry registry.Registry
transport transport.Transport transport transport.Transport
wrappers []Wrapper wrappers []Wrapper
@ -23,9 +24,9 @@ func Broker(b broker.Broker) Option {
} }
// Codec to be used to encode/decode requests for a given content type // Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, cf CodecFunc) Option { func Codec(contentType string, c codec.NewCodec) Option {
return func(o *options) { return func(o *options) {
o.codecs[contentType] = cf o.codecs[contentType] = c
} }
} }

View File

@ -1,19 +1,18 @@
package client package client
import ( import (
"encoding/json" "bytes"
"fmt" "fmt"
"sync" "sync"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
c "github.com/micro/go-micro/context" c "github.com/micro/go-micro/context"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
rpc "github.com/youtube/vitess/go/rpcplus" rpc "github.com/youtube/vitess/go/rpcplus"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -26,7 +25,7 @@ func newRpcClient(opt ...Option) Client {
var once sync.Once var once sync.Once
opts := options{ opts := options{
codecs: make(map[string]CodecFunc), codecs: make(map[string]codec.NewCodec),
} }
for _, o := range opt { for _, o := range opt {
@ -60,9 +59,9 @@ func newRpcClient(opt ...Option) Client {
return c return c
} }
func (r *rpcClient) codecFunc(contentType string) (codecFunc, error) { func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
if cf, ok := r.opts.codecs[contentType]; ok { if c, ok := r.opts.codecs[contentType]; ok {
return codecWrap(cf), nil return c, nil
} }
if cf, ok := defaultCodecs[contentType]; ok { if cf, ok := defaultCodecs[contentType]; ok {
return cf, nil return cf, nil
@ -84,7 +83,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
msg.Header["Content-Type"] = request.ContentType() msg.Header["Content-Type"] = request.ContentType()
cf, err := r.codecFunc(request.ContentType()) cf, err := r.newCodec(request.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
@ -117,7 +116,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request,
msg.Header["Content-Type"] = request.ContentType() msg.Header["Content-Type"] = request.ContentType()
cf, err := r.codecFunc(request.ContentType()) cf, err := r.newCodec(request.ContentType())
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
@ -192,30 +191,21 @@ func (r *rpcClient) Publish(ctx context.Context, p Publication) error {
md["Content-Type"] = p.ContentType() md["Content-Type"] = p.ContentType()
// encode message body // encode message body
var body []byte cf, err := r.newCodec(p.ContentType())
if err != nil {
switch p.ContentType() { return errors.InternalServerError("go.micro.client", err.Error())
case "application/octet-stream": }
b, err := proto.Marshal(p.Message().(proto.Message)) b := &buffer{bytes.NewBuffer(nil)}
if err != nil { if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil {
return err return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
case "application/json":
b, err := json.Marshal(p.Message())
if err != nil {
return err
}
body = b
} }
r.once.Do(func() { r.once.Do(func() {
r.opts.broker.Connect() r.opts.broker.Connect()
}) })
return r.opts.broker.Publish(p.Topic(), &broker.Message{ return r.opts.broker.Publish(p.Topic(), &broker.Message{
Header: md, Header: md,
Body: body, Body: b.Bytes(),
}) })
} }

View File

@ -3,13 +3,16 @@ package client
import ( import (
"bytes" "bytes"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
rpc "github.com/youtube/vitess/go/rpcplus" rpc "github.com/youtube/vitess/go/rpcplus"
) )
type rpcPlusCodec struct { type rpcPlusCodec struct {
client transport.Client client transport.Client
codec rpc.ClientCodec codec codec.Codec
req *transport.Message req *transport.Message
buf *readWriteCloser buf *readWriteCloser
@ -20,6 +23,18 @@ type readWriteCloser struct {
rbuf *bytes.Buffer rbuf *bytes.Buffer
} }
var (
defaultContentType = "application/octet-stream"
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
return rwc.rbuf.Read(p) return rwc.rbuf.Read(p)
} }
@ -34,7 +49,7 @@ func (rwc *readWriteCloser) Close() error {
return nil return nil
} }
func newRpcPlusCodec(req *transport.Message, client transport.Client, cf codecFunc) *rpcPlusCodec { func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcPlusCodec {
rwc := &readWriteCloser{ rwc := &readWriteCloser{
wbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil),
rbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil),
@ -42,14 +57,19 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, cf codecFu
r := &rpcPlusCodec{ r := &rpcPlusCodec{
buf: rwc, buf: rwc,
client: client, client: client,
codec: cf(rwc), codec: c(rwc),
req: req, req: req,
} }
return r return r
} }
func (c *rpcPlusCodec) WriteRequest(req *rpc.Request, body interface{}) error { func (c *rpcPlusCodec) WriteRequest(req *rpc.Request, body interface{}) error {
if err := c.codec.WriteRequest(req, body); err != nil { m := &codec.Message{
Id: req.Seq,
Method: req.ServiceMethod,
Type: codec.Request,
}
if err := c.codec.Write(m, body); err != nil {
return err return err
} }
c.req.Body = c.buf.wbuf.Bytes() c.req.Body = c.buf.wbuf.Bytes()
@ -63,14 +83,20 @@ func (c *rpcPlusCodec) ReadResponseHeader(r *rpc.Response) error {
} }
c.buf.rbuf.Reset() c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body) c.buf.rbuf.Write(m.Body)
return c.codec.ReadResponseHeader(r) var me codec.Message
err := c.codec.ReadHeader(&me, codec.Response)
r.ServiceMethod = me.Method
r.Seq = me.Id
r.Error = me.Error
return err
} }
func (c *rpcPlusCodec) ReadResponseBody(r interface{}) error { func (c *rpcPlusCodec) ReadResponseBody(b interface{}) error {
return c.codec.ReadResponseBody(r) return c.codec.ReadBody(b)
} }
func (c *rpcPlusCodec) Close() error { func (c *rpcPlusCodec) Close() error {
c.buf.Close() c.buf.Close()
c.codec.Close()
return c.client.Close() return c.client.Close()
} }

47
codec/codec.go Normal file
View File

@ -0,0 +1,47 @@
package codec
import (
"io"
)
const (
Error MessageType = iota
Request
Response
Publication
)
type MessageType int
// Takes in a connection/buffer and returns a new Codec
type NewCodec func(io.ReadWriteCloser) Codec
// Codec encodes/decodes various types of
// messages used within go-micro
type Codec interface {
Encoder
Decoder
Close() error
String() string
}
type Encoder interface {
Write(*Message, interface{}) error
}
type Decoder interface {
ReadHeader(*Message, MessageType) error
ReadBody(interface{}) error
}
// Message represents detailed information about
// the communication, likely followed by the body.
// In the case of an error, body may be nil.
type Message struct {
Id uint64
Type MessageType
Target string
Method string
Error string
Headers map[string]string
}

97
codec/jsonrpc/client.go Normal file
View File

@ -0,0 +1,97 @@
package jsonrpc
import (
"encoding/json"
"fmt"
"io"
"sync"
"github.com/micro/go-micro/codec"
)
type clientCodec struct {
dec *json.Decoder // for reading JSON values
enc *json.Encoder // for writing JSON values
c io.Closer
// temporary work space
req clientRequest
resp clientResponse
sync.Mutex
pending map[uint64]string
}
type clientRequest struct {
Method string `json:"method"`
Params [1]interface{} `json:"params"`
ID uint64 `json:"id"`
}
type clientResponse struct {
ID uint64 `json:"id"`
Result *json.RawMessage `json:"result"`
Error interface{} `json:"error"`
}
func newClientCodec(conn io.ReadWriteCloser) *clientCodec {
return &clientCodec{
dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn),
c: conn,
pending: make(map[uint64]string),
}
}
func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
c.Lock()
c.pending[m.Id] = m.Method
c.Unlock()
c.req.Method = m.Method
c.req.Params[0] = b
c.req.ID = m.Id
return c.enc.Encode(&c.req)
}
func (r *clientResponse) reset() {
r.ID = 0
r.Result = nil
r.Error = nil
}
func (c *clientCodec) ReadHeader(m *codec.Message) error {
c.resp.reset()
if err := c.dec.Decode(&c.resp); err != nil {
return err
}
c.Lock()
m.Method = c.pending[c.resp.ID]
delete(c.pending, c.resp.ID)
c.Unlock()
m.Error = ""
m.Id = c.resp.ID
if c.resp.Error != nil {
x, ok := c.resp.Error.(string)
if !ok {
return fmt.Errorf("invalid error %v", c.resp.Error)
}
if x == "" {
x = "unspecified error"
}
m.Error = x
}
return nil
}
func (c *clientCodec) ReadBody(x interface{}) error {
if x == nil {
return nil
}
return json.Unmarshal(*c.resp.Result, x)
}
func (c *clientCodec) Close() error {
return c.c.Close()
}

88
codec/jsonrpc/json.go Normal file
View File

@ -0,0 +1,88 @@
package jsonrpc
import (
"bytes"
"encoding/json"
"fmt"
"io"
"github.com/micro/go-micro/codec"
)
type jsonCodec struct {
buf *bytes.Buffer
mt codec.MessageType
rwc io.ReadWriteCloser
c *clientCodec
s *serverCodec
}
func (j *jsonCodec) Close() error {
j.buf.Reset()
return j.rwc.Close()
}
func (j *jsonCodec) String() string {
return "json-rpc"
}
func (j *jsonCodec) Write(m *codec.Message, b interface{}) error {
switch m.Type {
case codec.Request:
return j.c.Write(m, b)
case codec.Response:
return j.s.Write(m, b)
case codec.Publication:
data, err := json.Marshal(b)
if err != nil {
return err
}
_, err = j.rwc.Write(data)
return err
default:
return fmt.Errorf("Unrecognised message type: %v", m.Type)
}
return nil
}
func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
j.buf.Reset()
j.mt = mt
switch mt {
case codec.Request:
return j.s.ReadHeader(m)
case codec.Response:
return j.c.ReadHeader(m)
case codec.Publication:
io.Copy(j.buf, j.rwc)
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}
return nil
}
func (j *jsonCodec) ReadBody(b interface{}) error {
switch j.mt {
case codec.Request:
return j.s.ReadBody(b)
case codec.Response:
return j.c.ReadBody(b)
case codec.Publication:
if b != nil {
return json.Unmarshal(j.buf.Bytes(), b)
}
default:
return fmt.Errorf("Unrecognised message type: %v", j.mt)
}
return nil
}
func NewCodec(rwc io.ReadWriteCloser) codec.Codec {
return &jsonCodec{
buf: bytes.NewBuffer(nil),
rwc: rwc,
c: newClientCodec(rwc),
s: newServerCodec(rwc),
}
}

111
codec/jsonrpc/server.go Normal file
View File

@ -0,0 +1,111 @@
package jsonrpc
import (
"encoding/json"
"errors"
"io"
"sync"
"github.com/micro/go-micro/codec"
)
type serverCodec struct {
dec *json.Decoder // for reading JSON values
enc *json.Encoder // for writing JSON values
c io.Closer
// temporary work space
req serverRequest
resp serverResponse
sync.Mutex
seq uint64
pending map[uint64]*json.RawMessage
}
type serverRequest struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params"`
ID *json.RawMessage `json:"id"`
}
type serverResponse struct {
ID *json.RawMessage `json:"id"`
Result interface{} `json:"result"`
Error interface{} `json:"error"`
}
func newServerCodec(conn io.ReadWriteCloser) *serverCodec {
return &serverCodec{
dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn),
c: conn,
pending: make(map[uint64]*json.RawMessage),
}
}
func (r *serverRequest) reset() {
r.Method = ""
if r.Params != nil {
*r.Params = (*r.Params)[0:0]
}
if r.ID != nil {
*r.ID = (*r.ID)[0:0]
}
}
func (c *serverCodec) ReadHeader(m *codec.Message) error {
c.req.reset()
if err := c.dec.Decode(&c.req); err != nil {
return err
}
m.Method = c.req.Method
c.Lock()
c.seq++
c.pending[c.seq] = c.req.ID
c.req.ID = nil
m.Id = c.seq
c.Unlock()
return nil
}
func (c *serverCodec) ReadBody(x interface{}) error {
if x == nil {
return nil
}
var params [1]interface{}
params[0] = x
return json.Unmarshal(*c.req.Params, &params)
}
var null = json.RawMessage([]byte("null"))
func (c *serverCodec) Write(m *codec.Message, x interface{}) error {
var resp serverResponse
c.Lock()
b, ok := c.pending[m.Id]
if !ok {
c.Unlock()
return errors.New("invalid sequence number in response")
}
c.Unlock()
if b == nil {
// Invalid request so no id. Use JSON null.
b = &null
}
resp.ID = b
resp.Result = x
if m.Error == "" {
resp.Error = nil
} else {
resp.Error = m.Error
}
return c.enc.Encode(resp)
}
func (c *serverCodec) Close() error {
return c.c.Close()
}

View File

@ -0,0 +1,83 @@
// Code generated by protoc-gen-go.
// source: envelope.proto
// DO NOT EDIT!
/*
Package proto is a generated protocol buffer package.
It is generated from these files:
envelope.proto
It has these top-level messages:
Request
Response
*/
package protorpc
import proto "github.com/golang/protobuf/proto"
import json "encoding/json"
import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type Request struct {
ServiceMethod *string `protobuf:"bytes,1,opt,name=service_method" json:"service_method,omitempty"`
Seq *uint64 `protobuf:"fixed64,2,opt,name=seq" json:"seq,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (m *Request) GetServiceMethod() string {
if m != nil && m.ServiceMethod != nil {
return *m.ServiceMethod
}
return ""
}
func (m *Request) GetSeq() uint64 {
if m != nil && m.Seq != nil {
return *m.Seq
}
return 0
}
type Response struct {
ServiceMethod *string `protobuf:"bytes,1,opt,name=service_method" json:"service_method,omitempty"`
Seq *uint64 `protobuf:"fixed64,2,opt,name=seq" json:"seq,omitempty"`
Error *string `protobuf:"bytes,3,opt,name=error" json:"error,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Response) Reset() { *m = Response{} }
func (m *Response) String() string { return proto.CompactTextString(m) }
func (*Response) ProtoMessage() {}
func (m *Response) GetServiceMethod() string {
if m != nil && m.ServiceMethod != nil {
return *m.ServiceMethod
}
return ""
}
func (m *Response) GetSeq() uint64 {
if m != nil && m.Seq != nil {
return *m.Seq
}
return 0
}
func (m *Response) GetError() string {
if m != nil && m.Error != nil {
return *m.Error
}
return ""
}
func init() {
}

View File

@ -0,0 +1,12 @@
package protorpc;
message Request {
optional string service_method = 1;
optional fixed64 seq = 2;
}
message Response {
optional string service_method = 1;
optional fixed64 seq = 2;
optional string error = 3;
}

View File

@ -0,0 +1,36 @@
package protorpc
import (
"encoding/binary"
"io"
)
// WriteNetString writes data to a big-endian netstring on a Writer.
// Size is always a 32-bit unsigned int.
func WriteNetString(w io.Writer, data []byte) (written int, err error) {
size := make([]byte, 4)
binary.BigEndian.PutUint32(size, uint32(len(data)))
if written, err = w.Write(size); err != nil {
return
}
return w.Write(data)
}
// ReadNetString reads data from a big-endian netstring.
func ReadNetString(r io.Reader) (data []byte, err error) {
sizeBuf := make([]byte, 4)
_, err = r.Read(sizeBuf)
if err != nil {
return nil, err
}
size := binary.BigEndian.Uint32(sizeBuf)
if size == 0 {
return nil, nil
}
data = make([]byte, size)
_, err = r.Read(data)
if err != nil {
return nil, err
}
return
}

162
codec/protorpc/proto.go Normal file
View File

@ -0,0 +1,162 @@
package protorpc
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
)
type flusher interface {
Flush() error
}
type protoCodec struct {
sync.Mutex
rwc io.ReadWriteCloser
mt codec.MessageType
buf *bytes.Buffer
}
func (c *protoCodec) Close() error {
c.buf.Reset()
return c.rwc.Close()
}
func (c *protoCodec) String() string {
return "proto-rpc"
}
func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
switch m.Type {
case codec.Request:
c.Lock()
defer c.Unlock()
// This is protobuf, of course we copy it.
pbr := &Request{ServiceMethod: &m.Method, Seq: &m.Id}
data, err := proto.Marshal(pbr)
if err != nil {
return err
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return err
}
// Of course this is a protobuf! Trust me or detonate the program.
data, err = proto.Marshal(b.(proto.Message))
if err != nil {
return err
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return err
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
}
case codec.Response:
c.Lock()
defer c.Unlock()
rtmp := &Response{ServiceMethod: &m.Method, Seq: &m.Id, Error: &m.Error}
data, err := proto.Marshal(rtmp)
if err != nil {
return err
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return err
}
if pb, ok := b.(proto.Message); ok {
data, err = proto.Marshal(pb)
if err != nil {
return err
}
} else {
data = nil
}
_, err = WriteNetString(c.rwc, data)
if err != nil {
return err
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
}
case codec.Publication:
data, err := proto.Marshal(b.(proto.Message))
if err != nil {
return err
}
c.rwc.Write(data)
default:
return fmt.Errorf("Unrecognised message type: %v", m.Type)
}
return nil
}
func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
c.buf.Reset()
c.mt = mt
switch mt {
case codec.Request:
data, err := ReadNetString(c.rwc)
if err != nil {
return err
}
rtmp := new(Request)
err = proto.Unmarshal(data, rtmp)
if err != nil {
return err
}
m.Method = *rtmp.ServiceMethod
m.Id = *rtmp.Seq
case codec.Response:
data, err := ReadNetString(c.rwc)
if err != nil {
return err
}
rtmp := new(Response)
err = proto.Unmarshal(data, rtmp)
if err != nil {
return err
}
m.Method = *rtmp.ServiceMethod
m.Id = *rtmp.Seq
m.Error = *rtmp.Error
case codec.Publication:
io.Copy(c.buf, c.rwc)
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}
return nil
}
func (c *protoCodec) ReadBody(b interface{}) error {
var data []byte
switch c.mt {
case codec.Request, codec.Response:
var err error
data, err = ReadNetString(c.rwc)
if err != nil {
return err
}
case codec.Publication:
data = c.buf.Bytes()
default:
return fmt.Errorf("Unrecognised message type: %v", c.mt)
}
if b != nil {
return proto.Unmarshal(data, b.(proto.Message))
}
return nil
}
func NewCodec(rwc io.ReadWriteCloser) codec.Codec {
return &protoCodec{
buf: bytes.NewBuffer(nil),
rwc: rwc,
}
}

View File

@ -113,35 +113,44 @@ func stream() {
return return
} }
stream.Close() if err := stream.Close(); err != nil {
fmt.Println("stream close err:", err)
}
} }
func main() { func main() {
cmd.Init() cmd.Init()
fmt.Println("\n--- Call example ---\n") // client.DefaultClient = client.NewClient(
for i := 0; i < 10; i++ { // client.Codec("application/pb", pb.Codec),
call(i) // client.ContentType("application/pb"),
// )
for {
fmt.Println("\n--- Call example ---\n")
for i := 0; i < 10; i++ {
call(i)
}
fmt.Println("\n--- Streamer example ---\n")
stream()
fmt.Println("\n--- Publisher example ---\n")
pub()
fmt.Println("\n--- Wrapper example ---\n")
// Wrap the default client
client.DefaultClient = logWrap(client.DefaultClient)
call(0)
// Wrap using client.Wrap option
client.DefaultClient = client.NewClient(
client.Wrap(traceWrap),
client.Wrap(logWrap),
)
call(1)
time.Sleep(time.Millisecond * 100)
} }
fmt.Println("\n--- Streamer example ---\n")
stream()
fmt.Println("\n--- Publisher example ---\n")
pub()
fmt.Println("\n--- Wrapper example ---\n")
// Wrap the default client
client.DefaultClient = logWrap(client.DefaultClient)
call(0)
// Wrap using client.Wrap option
client.DefaultClient = client.NewClient(
client.Wrap(traceWrap),
client.Wrap(logWrap),
)
call(1)
} }

View File

@ -12,6 +12,10 @@ func main() {
// optionally setup command line usage // optionally setup command line usage
cmd.Init() cmd.Init()
// server.DefaultServer = server.NewServer(
// server.Codec("application/bson", bson.Codec),
// )
// Initialise Server // Initialise Server
server.Init( server.Init(
server.Name("go.micro.srv.example"), server.Name("go.micro.srv.example"),

View File

@ -1,13 +1,14 @@
package server package server
import ( import (
"io" "bytes"
) )
type buffer struct { type buffer struct {
io.ReadWriter *bytes.Buffer
} }
func (b *buffer) Close() error { func (b *buffer) Close() error {
b.Buffer.Reset()
return nil return nil
} }

View File

@ -1,74 +0,0 @@
package server
import (
"io"
"net/rpc"
"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcplus/jsonrpc"
"github.com/youtube/vitess/go/rpcplus/pbrpc"
)
var (
defaultCodecs = map[string]codecFunc{
"application/json": jsonrpc.NewServerCodec,
"application/json-rpc": jsonrpc.NewServerCodec,
"application/protobuf": pbrpc.NewServerCodec,
"application/proto-rpc": pbrpc.NewServerCodec,
"application/octet-stream": pbrpc.NewServerCodec,
}
)
// CodecFunc is used to encode/decode requests/responses
type CodecFunc func(io.ReadWriteCloser) rpc.ServerCodec
// for internal use only
type codecFunc func(io.ReadWriteCloser) rpcplus.ServerCodec
// wraps an net/rpc ServerCodec to provide an rpcplus.ServerCodec
// temporary until we strip out use of rpcplus
type rpcCodecWrap struct {
r rpc.ServerCodec
}
func (cw *rpcCodecWrap) ReadRequestHeader(r *rpcplus.Request) error {
rc := &rpc.Request{
ServiceMethod: r.ServiceMethod,
Seq: r.Seq,
}
err := cw.r.ReadRequestHeader(rc)
r.ServiceMethod = rc.ServiceMethod
r.Seq = rc.Seq
return err
}
func (cw *rpcCodecWrap) ReadRequestBody(b interface{}) error {
return cw.r.ReadRequestBody(b)
}
func (cw *rpcCodecWrap) WriteResponse(r *rpcplus.Response, b interface{}, l bool) error {
rc := &rpc.Response{
ServiceMethod: r.ServiceMethod,
Seq: r.Seq,
Error: r.Error,
}
err := cw.r.WriteResponse(rc, b)
r.ServiceMethod = rc.ServiceMethod
r.Seq = rc.Seq
r.Error = r.Error
return err
}
func (cw *rpcCodecWrap) Close() error {
return cw.r.Close()
}
// wraps a CodecFunc to provide an internal codecFunc
// temporary until we strip rpcplus out
func codecWrap(cf CodecFunc) codecFunc {
return func(rwc io.ReadWriteCloser) rpcplus.ServerCodec {
return &rpcCodecWrap{
r: cf(rwc),
}
}
}

View File

@ -2,12 +2,13 @@ package server
import ( import (
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
type options struct { type options struct {
codecs map[string]CodecFunc codecs map[string]codec.NewCodec
broker broker.Broker broker broker.Broker
registry registry.Registry registry registry.Registry
transport transport.Transport transport transport.Transport
@ -21,7 +22,7 @@ type options struct {
func newOptions(opt ...Option) options { func newOptions(opt ...Option) options {
opts := options{ opts := options{
codecs: make(map[string]CodecFunc), codecs: make(map[string]codec.NewCodec),
} }
for _, o := range opt { for _, o := range opt {
@ -126,9 +127,9 @@ func Broker(b broker.Broker) Option {
} }
// Codec to use to encode/decode requests for a given content type // Codec to use to encode/decode requests for a given content type
func Codec(contentType string, cf CodecFunc) Option { func Codec(contentType string, c codec.NewCodec) Option {
return func(o *options) { return func(o *options) {
o.codecs[contentType] = cf o.codecs[contentType] = c
} }
} }

View File

@ -3,13 +3,16 @@ package server
import ( import (
"bytes" "bytes"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
rpc "github.com/youtube/vitess/go/rpcplus" rpc "github.com/youtube/vitess/go/rpcplus"
) )
type rpcPlusCodec struct { type rpcPlusCodec struct {
socket transport.Socket socket transport.Socket
codec rpc.ServerCodec codec codec.Codec
req *transport.Message req *transport.Message
buf *readWriteCloser buf *readWriteCloser
@ -20,6 +23,16 @@ type readWriteCloser struct {
rbuf *bytes.Buffer rbuf *bytes.Buffer
} }
var (
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
return rwc.rbuf.Read(p) return rwc.rbuf.Read(p)
} }
@ -34,14 +47,14 @@ func (rwc *readWriteCloser) Close() error {
return nil return nil
} }
func newRpcPlusCodec(req *transport.Message, socket transport.Socket, cf codecFunc) rpc.ServerCodec { func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) rpc.ServerCodec {
rwc := &readWriteCloser{ rwc := &readWriteCloser{
rbuf: bytes.NewBuffer(req.Body), rbuf: bytes.NewBuffer(req.Body),
wbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil),
} }
r := &rpcPlusCodec{ r := &rpcPlusCodec{
buf: rwc, buf: rwc,
codec: cf(rwc), codec: c(rwc),
req: req, req: req,
socket: socket, socket: socket,
} }
@ -49,16 +62,26 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, cf codecFu
} }
func (c *rpcPlusCodec) ReadRequestHeader(r *rpc.Request) error { func (c *rpcPlusCodec) ReadRequestHeader(r *rpc.Request) error {
return c.codec.ReadRequestHeader(r) var m codec.Message
err := c.codec.ReadHeader(&m, codec.Request)
r.ServiceMethod = m.Method
r.Seq = m.Id
return err
} }
func (c *rpcPlusCodec) ReadRequestBody(r interface{}) error { func (c *rpcPlusCodec) ReadRequestBody(b interface{}) error {
return c.codec.ReadRequestBody(r) return c.codec.ReadBody(b)
} }
func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) error { func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) error {
c.buf.wbuf.Reset() c.buf.wbuf.Reset()
if err := c.codec.WriteResponse(r, body, last); err != nil { m := &codec.Message{
Method: r.ServiceMethod,
Id: r.Seq,
Error: r.Error,
Type: codec.Response,
}
if err := c.codec.Write(m, body); err != nil {
return err return err
} }
return c.socket.Send(&transport.Message{ return c.socket.Send(&transport.Message{
@ -69,5 +92,6 @@ func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last boo
func (c *rpcPlusCodec) Close() error { func (c *rpcPlusCodec) Close() error {
c.buf.Close() c.buf.Close()
c.codec.Close()
return c.socket.Close() return c.socket.Close()
} }

View File

@ -7,6 +7,7 @@ import (
"sync" "sync"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
c "github.com/micro/go-micro/context" c "github.com/micro/go-micro/context"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
@ -43,7 +44,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
return return
} }
cf, err := s.codecFunc(msg.Header["Content-Type"]) cf, err := s.newCodec(msg.Header["Content-Type"])
// TODO: needs better error handling // TODO: needs better error handling
if err != nil { if err != nil {
sock.Send(&transport.Message{ sock.Send(&transport.Message{
@ -73,9 +74,9 @@ func (s *rpcServer) accept(sock transport.Socket) {
} }
} }
func (s *rpcServer) codecFunc(contentType string) (codecFunc, error) { func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) {
if cf, ok := s.opts.codecs[contentType]; ok { if cf, ok := s.opts.codecs[contentType]; ok {
return codecWrap(cf), nil return cf, nil
} }
if cf, ok := defaultCodecs[contentType]; ok { if cf, ok := defaultCodecs[contentType]; ok {
return cf, nil return cf, nil
@ -200,7 +201,7 @@ func (s *rpcServer) Register() error {
defer s.Unlock() defer s.Unlock()
for sb, _ := range s.subscribers { for sb, _ := range s.subscribers {
handler := createSubHandler(sb) handler := s.createSubHandler(sb)
sub, err := config.broker.Subscribe(sb.Topic(), handler) sub, err := config.broker.Subscribe(sb.Topic(), handler)
if err != nil { if err != nil {
return err return err

View File

@ -1,11 +1,11 @@
package server package server
import ( import (
"encoding/json" "bytes"
"reflect" "reflect"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
c "github.com/micro/go-micro/context" c "github.com/micro/go-micro/context"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -94,8 +94,19 @@ func newSubscriber(topic string, sub interface{}) Subscriber {
} }
} }
func createSubHandler(sb *subscriber) broker.Handler { func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler {
return func(msg *broker.Message) { return func(msg *broker.Message) {
cf, err := s.newCodec(msg.Header["Content-Type"])
if err != nil {
return
}
b := &buffer{bytes.NewBuffer(msg.Body)}
co := cf(b)
if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
return
}
hdr := make(map[string]string) hdr := make(map[string]string)
for k, v := range msg.Header { for k, v := range msg.Header {
hdr[k] = v hdr[k] = v
@ -107,7 +118,6 @@ func createSubHandler(sb *subscriber) broker.Handler {
for _, handler := range sb.handlers { for _, handler := range sb.handlers {
var isVal bool var isVal bool
var req reflect.Value var req reflect.Value
var uerr error
if handler.reqType.Kind() == reflect.Ptr { if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem()) req = reflect.New(handler.reqType.Elem())
@ -116,14 +126,7 @@ func createSubHandler(sb *subscriber) broker.Handler {
isVal = true isVal = true
} }
switch msg.Header["Content-Type"] { if err := co.ReadBody(req.Interface()); err != nil {
case "application/octet-stream":
uerr = proto.Unmarshal(msg.Body, req.Interface().(proto.Message))
case "application/json":
uerr = json.Unmarshal(msg.Body, req.Interface())
}
if uerr != nil {
continue continue
} }

View File

@ -22,10 +22,12 @@ type httpTransportClient struct {
ht *httpTransport ht *httpTransport
addr string addr string
conn net.Conn conn net.Conn
buff *bufio.Reader
dialOpts dialOptions dialOpts dialOptions
r chan *http.Request r chan *http.Request
once sync.Once once sync.Once
sync.Mutex
buff *bufio.Reader
} }
type httpTransportSocket struct { type httpTransportSocket struct {
@ -81,6 +83,12 @@ func (h *httpTransportClient) Recv(m *Message) error {
r = rc r = rc
} }
h.Lock()
defer h.Unlock()
if h.buff == nil {
return io.EOF
}
rsp, err := http.ReadResponse(h.buff, r) rsp, err := http.ReadResponse(h.buff, r)
if err != nil { if err != nil {
return err return err
@ -110,11 +118,15 @@ func (h *httpTransportClient) Recv(m *Message) error {
} }
func (h *httpTransportClient) Close() error { func (h *httpTransportClient) Close() error {
h.buff.Reset(nil) err := h.conn.Close()
h.once.Do(func() { h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.buff = nil
h.Unlock()
close(h.r) close(h.r)
}) })
return h.conn.Close() return err
} }
func (h *httpTransportSocket) Recv(m *Message) error { func (h *httpTransportSocket) Recv(m *Message) error {