server: remove unparsed body from request and message #102
@ -22,7 +22,7 @@ func (r *rpcMessage) Topic() string {
|
||||
return r.topic
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Payload() interface{} {
|
||||
func (r *rpcMessage) Body() interface{} {
|
||||
return r.payload
|
||||
}
|
||||
|
||||
@ -30,10 +30,6 @@ func (r *rpcMessage) Header() metadata.Metadata {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Body() []byte {
|
||||
return r.body
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Codec() codec.Codec {
|
||||
return r.codec
|
||||
}
|
||||
|
@ -75,13 +75,11 @@ type Message interface {
|
||||
// Topic of the message
|
||||
Topic() string
|
||||
// The decoded payload value
|
||||
Payload() interface{}
|
||||
Body() interface{}
|
||||
// The content type of the payload
|
||||
ContentType() string
|
||||
// The raw headers of the message
|
||||
Header() metadata.Metadata
|
||||
// The raw body of the message
|
||||
Body() []byte
|
||||
// Codec used to decode the message
|
||||
Codec() codec.Codec
|
||||
}
|
||||
|
@ -252,7 +252,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
||||
return err
|
||||
}
|
||||
rb := reflect.New(req.Type().Elem())
|
||||
if err = cf.ReadBody(bytes.NewReader(msg.Body()), rb.Interface()); err != nil {
|
||||
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
|
||||
return err
|
||||
}
|
||||
msg.(*rpcMessage).codec = cf
|
||||
@ -269,7 +269,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
||||
}
|
||||
payloads := reflect.MakeSlice(reqType, 0, len(ms))
|
||||
for _, m := range ms {
|
||||
payloads = reflect.Append(payloads, reflect.ValueOf(m.Payload()))
|
||||
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
|
||||
}
|
||||
vals = append(vals, payloads)
|
||||
|
||||
@ -381,7 +381,7 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
|
||||
vals = append(vals, reflect.ValueOf(ctx))
|
||||
}
|
||||
|
||||
vals = append(vals, reflect.ValueOf(msg.Payload()))
|
||||
vals = append(vals, reflect.ValueOf(msg.Body()))
|
||||
|
||||
returnValues := handler.method.Call(vals)
|
||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||
@ -406,7 +406,6 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
|
||||
contentType: ct,
|
||||
payload: req.Interface(),
|
||||
header: msg.Header,
|
||||
body: msg.Body,
|
||||
})
|
||||
results <- cerr
|
||||
}()
|
||||
|
Loading…
Reference in New Issue
Block a user