rewriting a lot

This commit is contained in:
Asim Aslam 2019-01-09 19:11:47 +00:00
parent 1561ccbc14
commit 873fc6d663
5 changed files with 79 additions and 15 deletions

View File

@ -2,13 +2,16 @@ package server
import (
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
)
type rpcRequest struct {
service string
method string
contentType string
socket transport.Socket
codec codec.Codec
header map[string]string
body []byte
stream bool
}
@ -35,8 +38,26 @@ func (r *rpcRequest) Method() string {
return r.method
}
func (r *rpcRequest) Body() []byte {
return r.body
func (r *rpcRequest) Header() map[string]string {
return r.header
}
func (r *rpcRequest) Read() ([]byte, error) {
// got a body
if r.body != nil {
b := r.body
r.body = nil
return b, nil
}
var msg transport.Message
err := r.socket.Recv(&msg)
if err != nil {
return nil, err
}
r.header = msg.Header
return msg.Body, nil
}
func (r *rpcRequest) Stream() bool {

29
server/rpc_response.go Normal file
View File

@ -0,0 +1,29 @@
package server
import (
"net/http"
"github.com/micro/go-micro/transport"
)
type rpcResponse struct {
header map[string]string
socket transport.Socket
}
func (r *rpcResponse) WriteHeader(hdr map[string]string) {
for k, v := range hdr {
r.header[k] = v
}
}
func (r *rpcResponse) Write(b []byte) error {
if _, ok := r.header["Content-Type"]; !ok {
r.header["Content-Type"] = http.DetectContentType(b)
}
return r.socket.Send(&transport.Message{
Header: r.header,
Body: b,
})
}

View File

@ -18,7 +18,6 @@ import (
"github.com/micro/go-log"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
)
var (
@ -449,7 +448,7 @@ func (router *router) Handle(h Handler) error {
return nil
}
func (router *router) ServeRequest(ctx context.Context, r Request, s transport.Socket) error {
func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) error {
cc := r.Codec()
sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := router.readRequest(r)

View File

@ -116,20 +116,26 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
method: msg.Header["X-Micro-Method"],
contentType: ct,
codec: newRpcCodec(&msg, sock, cf),
header: msg.Header,
body: msg.Body,
socket: sock,
stream: true,
}
// set router
var r Router
r = s.router
// internal response
response := &rpcResponse{
header: make(map[string]string),
socket: sock,
}
if s.opts.Router != nil {
r = s.opts.Router
// set router
r := s.opts.Router
if s.opts.Router == nil {
r = s.router
}
// TODO: needs better error handling
if err := r.ServeRequest(ctx, request, sock); err != nil {
if err := r.ServeRequest(ctx, request, response); err != nil {
s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err)
return

View File

@ -11,7 +11,6 @@ import (
"github.com/micro/go-log"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport"
)
// Server is a simple micro server abstraction
@ -32,7 +31,7 @@ type Server interface {
// Router handle serving messages
type Router interface {
// ServeRequest processes a request to completion
ServeRequest(context.Context, Request, transport.Socket) error
ServeRequest(context.Context, Request, Response) error
}
// Message is an async message interface
@ -48,16 +47,26 @@ type Request interface {
Service() string
// Method name requested
Method() string
// The initial request body
Body() []byte
// Content type provided
ContentType() string
// The codec for encoding/decoding messages
// Header of the request
Header() map[string]string
// Read the undecoded request body
Read() ([]byte, error)
// The encoded message stream
Codec() codec.Codec
// Indicates whether its a stream
Stream() bool
}
// Response is the response writer for unencoded messages
type Response interface {
// Write the header
WriteHeader(map[string]string)
// write a response directly to the client
Write([]byte) error
}
// Stream represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error().