server: remove unparsed body from request and message #102
@ -22,7 +22,7 @@ func (r *rpcMessage) Topic() string {
|
|||||||
return r.topic
|
return r.topic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcMessage) Payload() interface{} {
|
func (r *rpcMessage) Body() interface{} {
|
||||||
return r.payload
|
return r.payload
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -30,10 +30,6 @@ func (r *rpcMessage) Header() metadata.Metadata {
|
|||||||
return r.header
|
return r.header
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcMessage) Body() []byte {
|
|
||||||
return r.body
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Codec() codec.Codec {
|
func (r *rpcMessage) Codec() codec.Codec {
|
||||||
return r.codec
|
return r.codec
|
||||||
}
|
}
|
||||||
|
@ -75,13 +75,11 @@ type Message interface {
|
|||||||
// Topic of the message
|
// Topic of the message
|
||||||
Topic() string
|
Topic() string
|
||||||
// The decoded payload value
|
// The decoded payload value
|
||||||
Payload() interface{}
|
Body() interface{}
|
||||||
// The content type of the payload
|
// The content type of the payload
|
||||||
ContentType() string
|
ContentType() string
|
||||||
// The raw headers of the message
|
// The raw headers of the message
|
||||||
Header() metadata.Metadata
|
Header() metadata.Metadata
|
||||||
// The raw body of the message
|
|
||||||
Body() []byte
|
|
||||||
// Codec used to decode the message
|
// Codec used to decode the message
|
||||||
Codec() codec.Codec
|
Codec() codec.Codec
|
||||||
}
|
}
|
||||||
|
@ -252,7 +252,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
rb := reflect.New(req.Type().Elem())
|
rb := reflect.New(req.Type().Elem())
|
||||||
if err = cf.ReadBody(bytes.NewReader(msg.Body()), rb.Interface()); err != nil {
|
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
msg.(*rpcMessage).codec = cf
|
msg.(*rpcMessage).codec = cf
|
||||||
@ -269,7 +269,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
|||||||
}
|
}
|
||||||
payloads := reflect.MakeSlice(reqType, 0, len(ms))
|
payloads := reflect.MakeSlice(reqType, 0, len(ms))
|
||||||
for _, m := range ms {
|
for _, m := range ms {
|
||||||
payloads = reflect.Append(payloads, reflect.ValueOf(m.Payload()))
|
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
|
||||||
}
|
}
|
||||||
vals = append(vals, payloads)
|
vals = append(vals, payloads)
|
||||||
|
|
||||||
@ -381,7 +381,7 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
|
|||||||
vals = append(vals, reflect.ValueOf(ctx))
|
vals = append(vals, reflect.ValueOf(ctx))
|
||||||
}
|
}
|
||||||
|
|
||||||
vals = append(vals, reflect.ValueOf(msg.Payload()))
|
vals = append(vals, reflect.ValueOf(msg.Body()))
|
||||||
|
|
||||||
returnValues := handler.method.Call(vals)
|
returnValues := handler.method.Call(vals)
|
||||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||||
@ -406,7 +406,6 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
|
|||||||
contentType: ct,
|
contentType: ct,
|
||||||
payload: req.Interface(),
|
payload: req.Interface(),
|
||||||
header: msg.Header,
|
header: msg.Header,
|
||||||
body: msg.Body,
|
|
||||||
})
|
})
|
||||||
results <- cerr
|
results <- cerr
|
||||||
}()
|
}()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user