For the legacy

This commit is contained in:
Asim Aslam
2019-01-18 10:12:57 +00:00
parent 9ce9977d21
commit 2cd2258731
14 changed files with 99 additions and 21 deletions

View File

@@ -41,6 +41,15 @@ var (
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": raw.NewCodec,
}
// TODO: remove legacy codec list
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
@@ -57,6 +66,33 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message) codec.NewCodec {
// if the protocol exists do nothing
if len(msg.Header["X-Micro-Protocol"]) > 0 {
return nil
}
// if 0.17 - 0.21
if len(msg.Header["X-Micro-Service"]) > 0 {
// set method to endpoint
if len(msg.Header["X-Micro-Method"]) == 0 {
msg.Header["X-Micro-Method"] = msg.Header["X-Micro-Endpoint"]
}
// set endpoint to method
if len(msg.Header["X-Micro-Endpoint"]) == 0 {
msg.Header["X-Micro-Endpoint"] = msg.Header["X-Micro-Method"]
}
// done
return nil
}
// old ways
return defaultCodecs[msg.Header["Content-Type"]]
}
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{
rbuf: bytes.NewBuffer(req.Body),
@@ -109,6 +145,7 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// set some internal things
m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Id = m.Header["X-Micro-Id"]
@@ -141,6 +178,8 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// create a new message
m := &codec.Message{
Target: r.Target,
Method: r.Method,
Endpoint: r.Endpoint,
Id: r.Id,
Error: r.Error,
@@ -162,6 +201,11 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
m.Header["X-Micro-Service"] = r.Target
}
// set request method
if len(r.Method) > 0 {
m.Header["X-Micro-Method"] = r.Method
}
// set request endpoint
if len(r.Endpoint) > 0 {
m.Header["X-Micro-Endpoint"] = r.Endpoint

View File

@@ -7,6 +7,7 @@ import (
type rpcRequest struct {
service string
method string
endpoint string
contentType string
socket transport.Socket
@@ -34,6 +35,10 @@ func (r *rpcRequest) Service() string {
return r.service
}
func (r *rpcRequest) Method() string {
return r.method
}
func (r *rpcRequest) Endpoint() string {
return r.endpoint
}

View File

@@ -189,6 +189,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
r := &rpcRequest{
service: req.msg.Target,
contentType: req.msg.Header["Content-Type"],
method: req.msg.Method,
endpoint: req.msg.Endpoint,
body: req.msg.Body,
}

View File

@@ -97,17 +97,23 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
ct = DefaultContentType
}
// TODO: needs better error handling
cf, err := s.newCodec(ct)
if err != nil {
sock.Send(&transport.Message{
Header: map[string]string{
"Content-Type": "text/plain",
},
Body: []byte(err.Error()),
})
s.wg.Done()
return
// setup old protocol
cf := setupProtocol(&msg)
// no old codec
if cf == nil {
// TODO: needs better error handling
var err error
if cf, err = s.newCodec(ct); err != nil {
sock.Send(&transport.Message{
Header: map[string]string{
"Content-Type": "text/plain",
},
Body: []byte(err.Error()),
})
s.wg.Done()
return
}
}
rcodec := newRpcCodec(&msg, sock, cf)
@@ -115,6 +121,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// internal request
request := &rpcRequest{
service: msg.Header["X-Micro-Service"],
method: msg.Header["X-Micro-Method"],
endpoint: msg.Header["X-Micro-Endpoint"],
contentType: ct,
codec: rcodec,

View File

@@ -31,6 +31,8 @@ func (r *rpcStream) Send(msg interface{}) error {
defer r.Unlock()
resp := codec.Message{
Target: r.request.Service(),
Method: r.request.Method(),
Endpoint: r.request.Endpoint(),
Id: r.id,
Type: codec.Response,

View File

@@ -45,6 +45,8 @@ type Message interface {
type Request interface {
// Service name requested
Service() string
// The action requested
Method() string
// Endpoint name requested
Endpoint() string
// Content type provided