server: remove unparsed body from request and message

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2022-03-21 15:23:41 +03:00
parent 74633f4290
commit 4904cad8ef
3 changed files with 5 additions and 12 deletions

View File

@ -22,7 +22,7 @@ func (r *rpcMessage) Topic() string {
return r.topic return r.topic
} }
func (r *rpcMessage) Payload() interface{} { func (r *rpcMessage) Body() interface{} {
return r.payload return r.payload
} }
@ -30,10 +30,6 @@ func (r *rpcMessage) Header() metadata.Metadata {
return r.header return r.header
} }
func (r *rpcMessage) Body() []byte {
return r.body
}
func (r *rpcMessage) Codec() codec.Codec { func (r *rpcMessage) Codec() codec.Codec {
return r.codec return r.codec
} }

View File

@ -75,13 +75,11 @@ type Message interface {
// Topic of the message // Topic of the message
Topic() string Topic() string
// The decoded payload value // The decoded payload value
Payload() interface{} Body() interface{}
// The content type of the payload // The content type of the payload
ContentType() string ContentType() string
// The raw headers of the message // The raw headers of the message
Header() metadata.Metadata Header() metadata.Metadata
// The raw body of the message
Body() []byte
// Codec used to decode the message // Codec used to decode the message
Codec() codec.Codec Codec() codec.Codec
} }

View File

@ -252,7 +252,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
return err return err
} }
rb := reflect.New(req.Type().Elem()) rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.Body()), rb.Interface()); err != nil { if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
return err return err
} }
msg.(*rpcMessage).codec = cf msg.(*rpcMessage).codec = cf
@ -269,7 +269,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
} }
payloads := reflect.MakeSlice(reqType, 0, len(ms)) payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms { for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Payload())) payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
} }
vals = append(vals, payloads) vals = append(vals, payloads)
@ -381,7 +381,7 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
vals = append(vals, reflect.ValueOf(ctx)) vals = append(vals, reflect.ValueOf(ctx))
} }
vals = append(vals, reflect.ValueOf(msg.Payload())) vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil { if rerr := returnValues[0].Interface(); rerr != nil {
@ -406,7 +406,6 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
contentType: ct, contentType: ct,
payload: req.Interface(), payload: req.Interface(),
header: msg.Header, header: msg.Header,
body: msg.Body,
}) })
results <- cerr results <- cerr
}() }()