diff --git a/client/rpc_client.go b/client/rpc_client.go index 3028d366..77c546cb 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -487,8 +487,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt id := uuid.New().String() md["Content-Type"] = msg.ContentType() - md["X-Micro-Topic"] = msg.Topic() - md["X-Micro-Id"] = id + md["Micro-Topic"] = msg.Topic() + md["Micro-Id"] = id // encode message body cf, err := r.newCodec(msg.ContentType()) @@ -500,8 +500,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt Target: msg.Topic(), Type: codec.Publication, Header: map[string]string{ - "X-Micro-Id": id, - "X-Micro-Topic": msg.Topic(), + "Micro-Id": id, + "Micro-Topic": msg.Topic(), }, }, msg.Payload()); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 35f64e41..53588ab9 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -84,6 +84,47 @@ func (rwc *readWriteCloser) Close() error { return nil } +func getHeaders(m *codec.Message) { + get := func(hdr string) string { + if hd := m.Header[hdr]; len(hd) > 0 { + return hd + } + // old + return m.Header["X-"+hdr] + } + + // check error in header + if len(m.Error) == 0 { + m.Error = get("Micro-Error") + } + + // check endpoint in header + if len(m.Endpoint) == 0 { + m.Endpoint = get("Micro-Endpoint") + } + + // check method in header + if len(m.Method) == 0 { + m.Method = get("Micro-Method") + } + + if len(m.Id) == 0 { + m.Id = get("Micro-Id") + } +} + +func setHeaders(m *codec.Message) { + set := func(hdr, v string) { + m.Header[hdr] = v + m.Header["X-"+hdr] = v + } + + set("Micro-Id", m.Id) + set("Micro-Service", m.Target) + set("Micro-Method", m.Method) + set("Micro-Endpoint", m.Endpoint) +} + // setupProtocol sets up the old protocol func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { protocol := node.Metadata["protocol"] @@ -133,10 +174,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { } // set the mucp headers - m.Header["X-Micro-Id"] = m.Id - m.Header["X-Micro-Service"] = m.Target - m.Header["X-Micro-Method"] = m.Method - m.Header["X-Micro-Endpoint"] = m.Endpoint + setHeaders(m) // if body is bytes Frame don't encode if body != nil { @@ -171,43 +209,25 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { return nil } -func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { - var m transport.Message - if err := c.client.Recv(&m); err != nil { +func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error { + var tm transport.Message + + // read message from transport + if err := c.client.Recv(&tm); err != nil { return errors.InternalServerError("go.micro.client.transport", err.Error()) } - c.buf.rbuf.Reset() - c.buf.rbuf.Write(m.Body) - var me codec.Message - // set headers - me.Header = m.Header + c.buf.rbuf.Reset() + c.buf.rbuf.Write(tm.Body) + + // set headers from transport + m.Header = tm.Header // read header - err := c.codec.ReadHeader(&me, r) - wm.Endpoint = me.Endpoint - wm.Method = me.Method - wm.Id = me.Id - wm.Error = me.Error + err := c.codec.ReadHeader(m, r) - // check error in header - if len(me.Error) == 0 { - wm.Error = me.Header["X-Micro-Error"] - } - - // check endpoint in header - if len(me.Endpoint) == 0 { - wm.Endpoint = me.Header["X-Micro-Endpoint"] - } - - // check method in header - if len(me.Method) == 0 { - wm.Method = me.Header["X-Micro-Method"] - } - - if len(me.Id) == 0 { - wm.Id = me.Header["X-Micro-Id"] - } + // get headers + getHeaders(m) // return header error if err != nil { diff --git a/codec/grpc/grpc.go b/codec/grpc/grpc.go index 732f0356..d347c31d 100644 --- a/codec/grpc/grpc.go +++ b/codec/grpc/grpc.go @@ -29,8 +29,8 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { // service method path := m.Header[":path"] if len(path) == 0 || path[0] != '/' { - m.Target = m.Header["X-Micro-Service"] - m.Endpoint = m.Header["X-Micro-Endpoint"] + m.Target = m.Header["Micro-Service"] + m.Endpoint = m.Header["Micro-Endpoint"] } else { // [ , a.package.Foo, Bar] parts := strings.Split(path, "/") diff --git a/micro.go b/micro.go index be9ee3ca..44bc52fc 100644 --- a/micro.go +++ b/micro.go @@ -42,7 +42,7 @@ type Publisher interface { type Option func(*Options) var ( - HeaderPrefix = "X-Micro-" + HeaderPrefix = "Micro-" ) // NewService creates and returns a new Service based on the packages within. diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 261b5fd1..ef2a169f 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -66,13 +66,63 @@ func (rwc *readWriteCloser) Close() error { return nil } +func getHeader(hdr string, md map[string]string) string { + if hd := md[hdr]; len(hd) > 0 { + return hd + } + return md["X-"+hdr] +} + +func getHeaders(m *codec.Message) { + get := func(hdr, v string) string { + if len(v) > 0 { + return v + } + + if hd := m.Header[hdr]; len(hd) > 0 { + return hd + } + + // old + return m.Header["X-"+hdr] + } + + m.Id = get("Micro-Id", m.Id) + m.Error = get("Micro-Error", m.Error) + m.Endpoint = get("Micro-Endpoint", m.Endpoint) + m.Method = get("Micro-Method", m.Method) + m.Target = get("Micro-Service", m.Target) + + // TODO: remove this cruft + if len(m.Endpoint) == 0 { + m.Endpoint = m.Method + } +} + +func setHeaders(m, r *codec.Message) { + set := func(hdr, v string) { + if len(v) == 0 { + return + } + m.Header[hdr] = v + m.Header["X-"+hdr] = v + } + + // set headers + set("Micro-Id", r.Id) + set("Micro-Service", r.Target) + set("Micro-Method", r.Method) + set("Micro-Endpoint", r.Endpoint) + set("Micro-Error", r.Error) +} + // setupProtocol sets up the old protocol func setupProtocol(msg *transport.Message) codec.NewCodec { - service := msg.Header["X-Micro-Service"] - method := msg.Header["X-Micro-Method"] - endpoint := msg.Header["X-Micro-Endpoint"] - protocol := msg.Header["X-Micro-Protocol"] - target := msg.Header["X-Micro-Target"] + service := getHeader("Micro-Service", msg.Header) + method := getHeader("Micro-Method", msg.Header) + endpoint := getHeader("Micro-Endpoint", msg.Header) + protocol := getHeader("Micro-Protocol", msg.Header) + target := getHeader("Micro-Target", msg.Header) // if the protocol exists (mucp) do nothing if len(protocol) > 0 { @@ -91,12 +141,12 @@ func setupProtocol(msg *transport.Message) codec.NewCodec { // no method then set to endpoint if len(method) == 0 { - msg.Header["X-Micro-Method"] = method + msg.Header["Micro-Method"] = endpoint } // no endpoint then set to method if len(endpoint) == 0 { - msg.Header["X-Micro-Endpoint"] = method + msg.Header["Micro-Endpoint"] = method } return nil @@ -118,7 +168,7 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod } func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { - // the initieal message + // the initial message m := codec.Message{ Header: c.req.Header, Body: c.req.Body, @@ -153,25 +203,17 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { c.first = false // set some internal things - m.Target = m.Header["X-Micro-Service"] - m.Method = m.Header["X-Micro-Method"] - m.Endpoint = m.Header["X-Micro-Endpoint"] - m.Id = m.Header["X-Micro-Id"] + getHeaders(&m) // read header via codec - err := c.codec.ReadHeader(&m, codec.Request) - - // set the method/id - r.Method = m.Method - r.Endpoint = m.Endpoint - r.Id = m.Id - - // TODO: remove the old legacy cruft - if len(r.Endpoint) == 0 { - r.Endpoint = r.Method + if err := c.codec.ReadHeader(&m, codec.Request); err != nil { + return err } - return err + // set message + *r = m + + return nil } func (c *rpcCodec) ReadBody(b interface{}) error { @@ -206,29 +248,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { m.Header = map[string]string{} } - // set request id - if len(r.Id) > 0 { - m.Header["X-Micro-Id"] = r.Id - } - - // set target - if len(r.Target) > 0 { - m.Header["X-Micro-Service"] = r.Target - } - - // set request method - if len(r.Method) > 0 { - m.Header["X-Micro-Method"] = r.Method - } - - // set request endpoint - if len(r.Endpoint) > 0 { - m.Header["X-Micro-Endpoint"] = r.Endpoint - } - - if len(r.Error) > 0 { - m.Header["X-Micro-Error"] = r.Error - } + setHeaders(m, r) // the body being sent var body []byte @@ -246,6 +266,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { // write an error if it failed m.Error = errors.Wrapf(err, "Unable to encode body").Error() m.Header["X-Micro-Error"] = m.Error + m.Header["Micro-Error"] = m.Error // no body to write if err := c.codec.Write(m, nil); err != nil { return err diff --git a/server/rpc_server.go b/server/rpc_server.go index 41462723..0ba11836 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -120,9 +120,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // internal request request := &rpcRequest{ - service: msg.Header["X-Micro-Service"], - method: msg.Header["X-Micro-Method"], - endpoint: msg.Header["X-Micro-Endpoint"], + service: getHeader("Micro-Service", msg.Header), + method: getHeader("Micro-Method", msg.Header), + endpoint: getHeader("Micro-Endpoint", msg.Header), contentType: ct, codec: rcodec, header: msg.Header,