Update headers to remove X- prefix
This commit is contained in:
parent
7542aafd29
commit
8090f9968d
@ -487,8 +487,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
|||||||
|
|
||||||
id := uuid.New().String()
|
id := uuid.New().String()
|
||||||
md["Content-Type"] = msg.ContentType()
|
md["Content-Type"] = msg.ContentType()
|
||||||
md["X-Micro-Topic"] = msg.Topic()
|
md["Micro-Topic"] = msg.Topic()
|
||||||
md["X-Micro-Id"] = id
|
md["Micro-Id"] = id
|
||||||
|
|
||||||
// encode message body
|
// encode message body
|
||||||
cf, err := r.newCodec(msg.ContentType())
|
cf, err := r.newCodec(msg.ContentType())
|
||||||
@ -500,8 +500,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
|||||||
Target: msg.Topic(),
|
Target: msg.Topic(),
|
||||||
Type: codec.Publication,
|
Type: codec.Publication,
|
||||||
Header: map[string]string{
|
Header: map[string]string{
|
||||||
"X-Micro-Id": id,
|
"Micro-Id": id,
|
||||||
"X-Micro-Topic": msg.Topic(),
|
"Micro-Topic": msg.Topic(),
|
||||||
},
|
},
|
||||||
}, msg.Payload()); err != nil {
|
}, msg.Payload()); err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
|
@ -84,6 +84,47 @@ func (rwc *readWriteCloser) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getHeaders(m *codec.Message) {
|
||||||
|
get := func(hdr string) string {
|
||||||
|
if hd := m.Header[hdr]; len(hd) > 0 {
|
||||||
|
return hd
|
||||||
|
}
|
||||||
|
// old
|
||||||
|
return m.Header["X-"+hdr]
|
||||||
|
}
|
||||||
|
|
||||||
|
// check error in header
|
||||||
|
if len(m.Error) == 0 {
|
||||||
|
m.Error = get("Micro-Error")
|
||||||
|
}
|
||||||
|
|
||||||
|
// check endpoint in header
|
||||||
|
if len(m.Endpoint) == 0 {
|
||||||
|
m.Endpoint = get("Micro-Endpoint")
|
||||||
|
}
|
||||||
|
|
||||||
|
// check method in header
|
||||||
|
if len(m.Method) == 0 {
|
||||||
|
m.Method = get("Micro-Method")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(m.Id) == 0 {
|
||||||
|
m.Id = get("Micro-Id")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setHeaders(m *codec.Message) {
|
||||||
|
set := func(hdr, v string) {
|
||||||
|
m.Header[hdr] = v
|
||||||
|
m.Header["X-"+hdr] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
set("Micro-Id", m.Id)
|
||||||
|
set("Micro-Service", m.Target)
|
||||||
|
set("Micro-Method", m.Method)
|
||||||
|
set("Micro-Endpoint", m.Endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
// setupProtocol sets up the old protocol
|
// setupProtocol sets up the old protocol
|
||||||
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
|
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
|
||||||
protocol := node.Metadata["protocol"]
|
protocol := node.Metadata["protocol"]
|
||||||
@ -133,10 +174,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// set the mucp headers
|
// set the mucp headers
|
||||||
m.Header["X-Micro-Id"] = m.Id
|
setHeaders(m)
|
||||||
m.Header["X-Micro-Service"] = m.Target
|
|
||||||
m.Header["X-Micro-Method"] = m.Method
|
|
||||||
m.Header["X-Micro-Endpoint"] = m.Endpoint
|
|
||||||
|
|
||||||
// if body is bytes Frame don't encode
|
// if body is bytes Frame don't encode
|
||||||
if body != nil {
|
if body != nil {
|
||||||
@ -171,43 +209,25 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
|
func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error {
|
||||||
var m transport.Message
|
var tm transport.Message
|
||||||
if err := c.client.Recv(&m); err != nil {
|
|
||||||
|
// read message from transport
|
||||||
|
if err := c.client.Recv(&tm); err != nil {
|
||||||
return errors.InternalServerError("go.micro.client.transport", err.Error())
|
return errors.InternalServerError("go.micro.client.transport", err.Error())
|
||||||
}
|
}
|
||||||
c.buf.rbuf.Reset()
|
|
||||||
c.buf.rbuf.Write(m.Body)
|
|
||||||
|
|
||||||
var me codec.Message
|
c.buf.rbuf.Reset()
|
||||||
// set headers
|
c.buf.rbuf.Write(tm.Body)
|
||||||
me.Header = m.Header
|
|
||||||
|
// set headers from transport
|
||||||
|
m.Header = tm.Header
|
||||||
|
|
||||||
// read header
|
// read header
|
||||||
err := c.codec.ReadHeader(&me, r)
|
err := c.codec.ReadHeader(m, r)
|
||||||
wm.Endpoint = me.Endpoint
|
|
||||||
wm.Method = me.Method
|
|
||||||
wm.Id = me.Id
|
|
||||||
wm.Error = me.Error
|
|
||||||
|
|
||||||
// check error in header
|
// get headers
|
||||||
if len(me.Error) == 0 {
|
getHeaders(m)
|
||||||
wm.Error = me.Header["X-Micro-Error"]
|
|
||||||
}
|
|
||||||
|
|
||||||
// check endpoint in header
|
|
||||||
if len(me.Endpoint) == 0 {
|
|
||||||
wm.Endpoint = me.Header["X-Micro-Endpoint"]
|
|
||||||
}
|
|
||||||
|
|
||||||
// check method in header
|
|
||||||
if len(me.Method) == 0 {
|
|
||||||
wm.Method = me.Header["X-Micro-Method"]
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(me.Id) == 0 {
|
|
||||||
wm.Id = me.Header["X-Micro-Id"]
|
|
||||||
}
|
|
||||||
|
|
||||||
// return header error
|
// return header error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -29,8 +29,8 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
|
|||||||
// service method
|
// service method
|
||||||
path := m.Header[":path"]
|
path := m.Header[":path"]
|
||||||
if len(path) == 0 || path[0] != '/' {
|
if len(path) == 0 || path[0] != '/' {
|
||||||
m.Target = m.Header["X-Micro-Service"]
|
m.Target = m.Header["Micro-Service"]
|
||||||
m.Endpoint = m.Header["X-Micro-Endpoint"]
|
m.Endpoint = m.Header["Micro-Endpoint"]
|
||||||
} else {
|
} else {
|
||||||
// [ , a.package.Foo, Bar]
|
// [ , a.package.Foo, Bar]
|
||||||
parts := strings.Split(path, "/")
|
parts := strings.Split(path, "/")
|
||||||
|
2
micro.go
2
micro.go
@ -42,7 +42,7 @@ type Publisher interface {
|
|||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
HeaderPrefix = "X-Micro-"
|
HeaderPrefix = "Micro-"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewService creates and returns a new Service based on the packages within.
|
// NewService creates and returns a new Service based on the packages within.
|
||||||
|
@ -66,13 +66,63 @@ func (rwc *readWriteCloser) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getHeader(hdr string, md map[string]string) string {
|
||||||
|
if hd := md[hdr]; len(hd) > 0 {
|
||||||
|
return hd
|
||||||
|
}
|
||||||
|
return md["X-"+hdr]
|
||||||
|
}
|
||||||
|
|
||||||
|
func getHeaders(m *codec.Message) {
|
||||||
|
get := func(hdr, v string) string {
|
||||||
|
if len(v) > 0 {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
if hd := m.Header[hdr]; len(hd) > 0 {
|
||||||
|
return hd
|
||||||
|
}
|
||||||
|
|
||||||
|
// old
|
||||||
|
return m.Header["X-"+hdr]
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Id = get("Micro-Id", m.Id)
|
||||||
|
m.Error = get("Micro-Error", m.Error)
|
||||||
|
m.Endpoint = get("Micro-Endpoint", m.Endpoint)
|
||||||
|
m.Method = get("Micro-Method", m.Method)
|
||||||
|
m.Target = get("Micro-Service", m.Target)
|
||||||
|
|
||||||
|
// TODO: remove this cruft
|
||||||
|
if len(m.Endpoint) == 0 {
|
||||||
|
m.Endpoint = m.Method
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setHeaders(m, r *codec.Message) {
|
||||||
|
set := func(hdr, v string) {
|
||||||
|
if len(v) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.Header[hdr] = v
|
||||||
|
m.Header["X-"+hdr] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// set headers
|
||||||
|
set("Micro-Id", r.Id)
|
||||||
|
set("Micro-Service", r.Target)
|
||||||
|
set("Micro-Method", r.Method)
|
||||||
|
set("Micro-Endpoint", r.Endpoint)
|
||||||
|
set("Micro-Error", r.Error)
|
||||||
|
}
|
||||||
|
|
||||||
// setupProtocol sets up the old protocol
|
// setupProtocol sets up the old protocol
|
||||||
func setupProtocol(msg *transport.Message) codec.NewCodec {
|
func setupProtocol(msg *transport.Message) codec.NewCodec {
|
||||||
service := msg.Header["X-Micro-Service"]
|
service := getHeader("Micro-Service", msg.Header)
|
||||||
method := msg.Header["X-Micro-Method"]
|
method := getHeader("Micro-Method", msg.Header)
|
||||||
endpoint := msg.Header["X-Micro-Endpoint"]
|
endpoint := getHeader("Micro-Endpoint", msg.Header)
|
||||||
protocol := msg.Header["X-Micro-Protocol"]
|
protocol := getHeader("Micro-Protocol", msg.Header)
|
||||||
target := msg.Header["X-Micro-Target"]
|
target := getHeader("Micro-Target", msg.Header)
|
||||||
|
|
||||||
// if the protocol exists (mucp) do nothing
|
// if the protocol exists (mucp) do nothing
|
||||||
if len(protocol) > 0 {
|
if len(protocol) > 0 {
|
||||||
@ -91,12 +141,12 @@ func setupProtocol(msg *transport.Message) codec.NewCodec {
|
|||||||
|
|
||||||
// no method then set to endpoint
|
// no method then set to endpoint
|
||||||
if len(method) == 0 {
|
if len(method) == 0 {
|
||||||
msg.Header["X-Micro-Method"] = method
|
msg.Header["Micro-Method"] = endpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
// no endpoint then set to method
|
// no endpoint then set to method
|
||||||
if len(endpoint) == 0 {
|
if len(endpoint) == 0 {
|
||||||
msg.Header["X-Micro-Endpoint"] = method
|
msg.Header["Micro-Endpoint"] = method
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -118,7 +168,7 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
||||||
// the initieal message
|
// the initial message
|
||||||
m := codec.Message{
|
m := codec.Message{
|
||||||
Header: c.req.Header,
|
Header: c.req.Header,
|
||||||
Body: c.req.Body,
|
Body: c.req.Body,
|
||||||
@ -153,25 +203,17 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
|||||||
c.first = false
|
c.first = false
|
||||||
|
|
||||||
// set some internal things
|
// set some internal things
|
||||||
m.Target = m.Header["X-Micro-Service"]
|
getHeaders(&m)
|
||||||
m.Method = m.Header["X-Micro-Method"]
|
|
||||||
m.Endpoint = m.Header["X-Micro-Endpoint"]
|
|
||||||
m.Id = m.Header["X-Micro-Id"]
|
|
||||||
|
|
||||||
// read header via codec
|
// read header via codec
|
||||||
err := c.codec.ReadHeader(&m, codec.Request)
|
if err := c.codec.ReadHeader(&m, codec.Request); err != nil {
|
||||||
|
return err
|
||||||
// set the method/id
|
|
||||||
r.Method = m.Method
|
|
||||||
r.Endpoint = m.Endpoint
|
|
||||||
r.Id = m.Id
|
|
||||||
|
|
||||||
// TODO: remove the old legacy cruft
|
|
||||||
if len(r.Endpoint) == 0 {
|
|
||||||
r.Endpoint = r.Method
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
// set message
|
||||||
|
*r = m
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *rpcCodec) ReadBody(b interface{}) error {
|
func (c *rpcCodec) ReadBody(b interface{}) error {
|
||||||
@ -206,29 +248,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
|
|||||||
m.Header = map[string]string{}
|
m.Header = map[string]string{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set request id
|
setHeaders(m, r)
|
||||||
if len(r.Id) > 0 {
|
|
||||||
m.Header["X-Micro-Id"] = r.Id
|
|
||||||
}
|
|
||||||
|
|
||||||
// set target
|
|
||||||
if len(r.Target) > 0 {
|
|
||||||
m.Header["X-Micro-Service"] = r.Target
|
|
||||||
}
|
|
||||||
|
|
||||||
// set request method
|
|
||||||
if len(r.Method) > 0 {
|
|
||||||
m.Header["X-Micro-Method"] = r.Method
|
|
||||||
}
|
|
||||||
|
|
||||||
// set request endpoint
|
|
||||||
if len(r.Endpoint) > 0 {
|
|
||||||
m.Header["X-Micro-Endpoint"] = r.Endpoint
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(r.Error) > 0 {
|
|
||||||
m.Header["X-Micro-Error"] = r.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
// the body being sent
|
// the body being sent
|
||||||
var body []byte
|
var body []byte
|
||||||
@ -246,6 +266,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
|
|||||||
// write an error if it failed
|
// write an error if it failed
|
||||||
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
|
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
|
||||||
m.Header["X-Micro-Error"] = m.Error
|
m.Header["X-Micro-Error"] = m.Error
|
||||||
|
m.Header["Micro-Error"] = m.Error
|
||||||
// no body to write
|
// no body to write
|
||||||
if err := c.codec.Write(m, nil); err != nil {
|
if err := c.codec.Write(m, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -120,9 +120,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
|||||||
|
|
||||||
// internal request
|
// internal request
|
||||||
request := &rpcRequest{
|
request := &rpcRequest{
|
||||||
service: msg.Header["X-Micro-Service"],
|
service: getHeader("Micro-Service", msg.Header),
|
||||||
method: msg.Header["X-Micro-Method"],
|
method: getHeader("Micro-Method", msg.Header),
|
||||||
endpoint: msg.Header["X-Micro-Endpoint"],
|
endpoint: getHeader("Micro-Endpoint", msg.Header),
|
||||||
contentType: ct,
|
contentType: ct,
|
||||||
codec: rcodec,
|
codec: rcodec,
|
||||||
header: msg.Header,
|
header: msg.Header,
|
||||||
|
Loading…
Reference in New Issue
Block a user