Change a few things
This commit is contained in:
		| @@ -4,6 +4,8 @@ package client | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/codec" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Client is the interface used to make requests to services. | // Client is the interface used to make requests to services. | ||||||
| @@ -20,6 +22,11 @@ type Client interface { | |||||||
| 	String() string | 	String() string | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Router manages request routing | ||||||
|  | type Router interface { | ||||||
|  | 	SendRequest(context.Context, Request) (Response, error) | ||||||
|  | } | ||||||
|  |  | ||||||
| // Message is the interface for publishing asynchronously | // Message is the interface for publishing asynchronously | ||||||
| type Message interface { | type Message interface { | ||||||
| 	Topic() string | 	Topic() string | ||||||
| @@ -29,14 +36,30 @@ type Message interface { | |||||||
|  |  | ||||||
| // Request is the interface for a synchronous request used by Call or Stream | // Request is the interface for a synchronous request used by Call or Stream | ||||||
| type Request interface { | type Request interface { | ||||||
|  | 	// The service to call | ||||||
| 	Service() string | 	Service() string | ||||||
|  | 	// The method to call | ||||||
| 	Method() string | 	Method() string | ||||||
|  | 	// The content type | ||||||
| 	ContentType() string | 	ContentType() string | ||||||
| 	Request() interface{} | 	// The unencoded request body | ||||||
|  | 	Body() interface{} | ||||||
|  | 	// Write to the encoded request writer. This is nil before a call is made | ||||||
|  | 	Codec() codec.Writer | ||||||
| 	// indicates whether the request will be a streaming one rather than unary | 	// indicates whether the request will be a streaming one rather than unary | ||||||
| 	Stream() bool | 	Stream() bool | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Response is the response received from a service | ||||||
|  | type Response interface { | ||||||
|  | 	// Read the response | ||||||
|  | 	Codec() codec.Reader | ||||||
|  | 	// read the header | ||||||
|  | 	Header() map[string]string | ||||||
|  | 	// Read the undecoded response | ||||||
|  | 	Read() ([]byte, error) | ||||||
|  | } | ||||||
|  |  | ||||||
| // Stream is the inteface for a bidirectional synchronous stream | // Stream is the inteface for a bidirectional synchronous stream | ||||||
| type Stream interface { | type Stream interface { | ||||||
| 	Context() context.Context | 	Context() context.Context | ||||||
|   | |||||||
| @@ -97,7 +97,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp | |||||||
| 		request: req, | 		request: req, | ||||||
| 		closed:  make(chan bool), | 		closed:  make(chan bool), | ||||||
| 		codec:   newRpcCodec(msg, c, cf), | 		codec:   newRpcCodec(msg, c, cf), | ||||||
| 		seq:     fmt.Sprintf("%v", seq), | 		id:      fmt.Sprintf("%v", seq), | ||||||
| 	} | 	} | ||||||
| 	defer stream.Close() | 	defer stream.Close() | ||||||
|  |  | ||||||
| @@ -111,7 +111,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp | |||||||
| 		}() | 		}() | ||||||
|  |  | ||||||
| 		// send request | 		// send request | ||||||
| 		if err := stream.Send(req.Request()); err != nil { | 		if err := stream.Send(req.Body()); err != nil { | ||||||
| 			ch <- err | 			ch <- err | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| @@ -183,7 +183,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt | |||||||
| 	ch := make(chan error, 1) | 	ch := make(chan error, 1) | ||||||
|  |  | ||||||
| 	go func() { | 	go func() { | ||||||
| 		ch <- stream.Send(req.Request()) | 		ch <- stream.Send(req.Body()) | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	var grr error | 	var grr error | ||||||
|   | |||||||
| @@ -3,7 +3,6 @@ package client | |||||||
| import ( | import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
| 	errs "errors" | 	errs "errors" | ||||||
| 	"fmt" |  | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/codec" | 	"github.com/micro/go-micro/codec" | ||||||
| 	raw "github.com/micro/go-micro/codec/bytes" | 	raw "github.com/micro/go-micro/codec/bytes" | ||||||
| @@ -85,7 +84,7 @@ func (rwc *readWriteCloser) Close() error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcCodec { | func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { | ||||||
| 	rwc := &readWriteCloser{ | 	rwc := &readWriteCloser{ | ||||||
| 		wbuf: bytes.NewBuffer(nil), | 		wbuf: bytes.NewBuffer(nil), | ||||||
| 		rbuf: bytes.NewBuffer(nil), | 		rbuf: bytes.NewBuffer(nil), | ||||||
| @@ -99,34 +98,45 @@ func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCod | |||||||
| 	return r | 	return r | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *rpcCodec) Write(req *request, body interface{}) error { | func (c *rpcCodec) Write(wm *codec.Message, body interface{}) error { | ||||||
| 	c.buf.wbuf.Reset() | 	c.buf.wbuf.Reset() | ||||||
|  |  | ||||||
| 	m := &codec.Message{ | 	m := &codec.Message{ | ||||||
| 		Id:     req.Seq, | 		Id:     wm.Id, | ||||||
| 		Target: req.Service, | 		Target: wm.Target, | ||||||
| 		Method: req.ServiceMethod, | 		Method: wm.Method, | ||||||
| 		Type:   codec.Request, | 		Type:   codec.Request, | ||||||
| 		Header: map[string]string{ | 		Header: map[string]string{ | ||||||
| 			"X-Micro-Id":      fmt.Sprintf("%v", req.Seq), | 			"X-Micro-Id":      wm.Id, | ||||||
| 			"X-Micro-Service": req.Service, | 			"X-Micro-Service": wm.Target, | ||||||
| 			"X-Micro-Method":  req.ServiceMethod, | 			"X-Micro-Method":  wm.Method, | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := c.codec.Write(m, body); err != nil { | 	if err := c.codec.Write(m, body); err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.client.codec", err.Error()) | 		return errors.InternalServerError("go.micro.client.codec", err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// set body | ||||||
|  | 	if len(wm.Body) > 0 { | ||||||
|  | 		c.req.Body = wm.Body | ||||||
|  | 	} else { | ||||||
| 		c.req.Body = c.buf.wbuf.Bytes() | 		c.req.Body = c.buf.wbuf.Bytes() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// set header | ||||||
| 	for k, v := range m.Header { | 	for k, v := range m.Header { | ||||||
| 		c.req.Header[k] = v | 		c.req.Header[k] = v | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// send the request | ||||||
| 	if err := c.client.Send(c.req); err != nil { | 	if err := c.client.Send(c.req); err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.client.transport", err.Error()) | 		return errors.InternalServerError("go.micro.client.transport", err.Error()) | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *rpcCodec) Read(r *response, b interface{}) error { | func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { | ||||||
| 	var m transport.Message | 	var m transport.Message | ||||||
| 	if err := c.client.Recv(&m); err != nil { | 	if err := c.client.Recv(&m); err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.client.transport", err.Error()) | 		return errors.InternalServerError("go.micro.client.transport", err.Error()) | ||||||
| @@ -139,34 +149,38 @@ func (c *rpcCodec) Read(r *response, b interface{}) error { | |||||||
| 	me.Header = m.Header | 	me.Header = m.Header | ||||||
|  |  | ||||||
| 	// read header | 	// read header | ||||||
| 	err := c.codec.ReadHeader(&me, codec.Response) | 	err := c.codec.ReadHeader(&me, r) | ||||||
| 	r.ServiceMethod = me.Method | 	wm.Method = me.Method | ||||||
| 	r.Seq = me.Id | 	wm.Id = me.Id | ||||||
| 	r.Error = me.Error | 	wm.Error = me.Error | ||||||
|  |  | ||||||
| 	// check error in header | 	// check error in header | ||||||
| 	if len(me.Error) == 0 { | 	if len(me.Error) == 0 { | ||||||
| 		r.Error = me.Header["X-Micro-Error"] | 		wm.Error = me.Header["X-Micro-Error"] | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// check method in header | 	// check method in header | ||||||
| 	if len(me.Method) == 0 { | 	if len(me.Method) == 0 { | ||||||
| 		r.ServiceMethod = me.Header["X-Micro-Method"] | 		wm.Method = me.Header["X-Micro-Method"] | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if len(me.Id) == 0 { | 	if len(me.Id) == 0 { | ||||||
| 		r.Seq = me.Header["X-Micro-Id"] | 		wm.Id = me.Header["X-Micro-Id"] | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// return header error | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.client.codec", err.Error()) | 		return errors.InternalServerError("go.micro.client.codec", err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *rpcCodec) ReadBody(b interface{}) error { | ||||||
| 	// read body | 	// read body | ||||||
| 	if err := c.codec.ReadBody(b); err != nil { | 	if err := c.codec.ReadBody(b); err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.client.codec", err.Error()) | 		return errors.InternalServerError("go.micro.client.codec", err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -178,3 +192,7 @@ func (c *rpcCodec) Close() error { | |||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (c *rpcCodec) String() string { | ||||||
|  | 	return "rpc" | ||||||
|  | } | ||||||
|   | |||||||
| @@ -1,10 +1,15 @@ | |||||||
| package client | package client | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"github.com/micro/go-micro/codec" | ||||||
|  | ) | ||||||
|  |  | ||||||
| type rpcRequest struct { | type rpcRequest struct { | ||||||
| 	service     string | 	service     string | ||||||
| 	method      string | 	method      string | ||||||
| 	contentType string | 	contentType string | ||||||
| 	request     interface{} | 	codec       codec.Codec | ||||||
|  | 	body        interface{} | ||||||
| 	opts        RequestOptions | 	opts        RequestOptions | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -23,7 +28,7 @@ func newRequest(service, method string, request interface{}, contentType string, | |||||||
| 	return &rpcRequest{ | 	return &rpcRequest{ | ||||||
| 		service:     service, | 		service:     service, | ||||||
| 		method:      method, | 		method:      method, | ||||||
| 		request:     request, | 		body:        request, | ||||||
| 		contentType: contentType, | 		contentType: contentType, | ||||||
| 		opts:        opts, | 		opts:        opts, | ||||||
| 	} | 	} | ||||||
| @@ -41,8 +46,12 @@ func (r *rpcRequest) Method() string { | |||||||
| 	return r.method | 	return r.method | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcRequest) Request() interface{} { | func (r *rpcRequest) Body() interface{} { | ||||||
| 	return r.request | 	return r.body | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (r *rpcRequest) Codec() codec.Writer { | ||||||
|  | 	return r.codec | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcRequest) Stream() bool { | func (r *rpcRequest) Stream() bool { | ||||||
|   | |||||||
| @@ -4,16 +4,18 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"io" | 	"io" | ||||||
| 	"sync" | 	"sync" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/codec" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Implements the streamer interface | // Implements the streamer interface | ||||||
| type rpcStream struct { | type rpcStream struct { | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	seq     string | 	id      string | ||||||
| 	closed  chan bool | 	closed  chan bool | ||||||
| 	err     error | 	err     error | ||||||
| 	request Request | 	request Request | ||||||
| 	codec   *rpcCodec | 	codec   codec.Codec | ||||||
| 	context context.Context | 	context context.Context | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -43,18 +45,18 @@ func (r *rpcStream) Send(msg interface{}) error { | |||||||
| 		return errShutdown | 		return errShutdown | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	seq := r.seq | 	req := codec.Message{ | ||||||
|  | 		Id:     r.id, | ||||||
| 	req := request{ | 		Target: r.request.Service(), | ||||||
| 		Service:       r.request.Service(), | 		Method: r.request.Method(), | ||||||
| 		Seq:           seq, | 		Type:   codec.Request, | ||||||
| 		ServiceMethod: r.request.Method(), |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := r.codec.Write(&req, msg); err != nil { | 	if err := r.codec.Write(&req, msg); err != nil { | ||||||
| 		r.err = err | 		r.err = err | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -67,9 +69,9 @@ func (r *rpcStream) Recv(msg interface{}) error { | |||||||
| 		return errShutdown | 		return errShutdown | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	var resp response | 	var resp codec.Message | ||||||
|  |  | ||||||
| 	if err := r.codec.Read(&resp, msg); err != nil { | 	if err := r.codec.ReadHeader(&resp, codec.Response); err != nil { | ||||||
| 		if err == io.EOF && !r.isClosed() { | 		if err == io.EOF && !r.isClosed() { | ||||||
| 			r.err = io.ErrUnexpectedEOF | 			r.err = io.ErrUnexpectedEOF | ||||||
| 			return io.ErrUnexpectedEOF | 			return io.ErrUnexpectedEOF | ||||||
| @@ -81,13 +83,20 @@ func (r *rpcStream) Recv(msg interface{}) error { | |||||||
| 	switch { | 	switch { | ||||||
| 	case len(resp.Error) > 0: | 	case len(resp.Error) > 0: | ||||||
| 		// We've got an error response. Give this to the request; | 		// We've got an error response. Give this to the request; | ||||||
| 		// any subsequent requests will get the ReadBody | 		// any subsequent requests will get the ReadResponseBody | ||||||
| 		// error if there is one. | 		// error if there is one. | ||||||
| 		if resp.Error != lastStreamResponseError { | 		if resp.Error != lastStreamResponseError { | ||||||
| 			r.err = serverError(resp.Error) | 			r.err = serverError(resp.Error) | ||||||
| 		} else { | 		} else { | ||||||
| 			r.err = io.EOF | 			r.err = io.EOF | ||||||
| 		} | 		} | ||||||
|  | 		if err := r.codec.ReadBody(nil); err != nil { | ||||||
|  | 			r.err = err | ||||||
|  | 		} | ||||||
|  | 	default: | ||||||
|  | 		if err := r.codec.ReadBody(msg); err != nil { | ||||||
|  | 			r.err = err | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return r.err | 	return r.err | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user