diff --git a/client/client.go b/client/client.go index 6a048d87..21ef9732 100644 --- a/client/client.go +++ b/client/client.go @@ -2,14 +2,15 @@ package client import ( "github.com/myodc/go-micro/transport" + "golang.org/x/net/context" ) type Client interface { NewRequest(string, string, interface{}) Request NewProtoRequest(string, string, interface{}) Request NewJsonRequest(string, string, interface{}) Request - Call(Request, interface{}) error - CallRemote(string, string, Request, interface{}) error + Call(context.Context, Request, interface{}) error + CallRemote(context.Context, string, Request, interface{}) error } type options struct { @@ -28,12 +29,12 @@ func Transport(t transport.Transport) Option { } } -func Call(request Request, response interface{}) error { - return DefaultClient.Call(request, response) +func Call(ctx context.Context, request Request, response interface{}) error { + return DefaultClient.Call(ctx, request, response) } -func CallRemote(address, path string, request Request, response interface{}) error { - return DefaultClient.CallRemote(address, path, request, response) +func CallRemote(ctx context.Context, address string, request Request, response interface{}) error { + return DefaultClient.CallRemote(ctx, address, request, response) } func NewRequest(service, method string, request interface{}) Request { diff --git a/client/headers.go b/client/headers.go deleted file mode 100644 index bae845a3..00000000 --- a/client/headers.go +++ /dev/null @@ -1,8 +0,0 @@ -package client - -type Headers interface { - Add(string, string) - Del(string) - Get(string) string - Set(string, string) -} diff --git a/client/request.go b/client/request.go index 9cbe919c..7a2b7607 100644 --- a/client/request.go +++ b/client/request.go @@ -5,5 +5,4 @@ type Request interface { Method() string ContentType() string Request() interface{} - Headers() Headers } diff --git a/client/rpc_client.go b/client/rpc_client.go index d1dc2a74..7fbd6e31 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -7,13 +7,16 @@ import ( "net/http" "time" + c "github.com/myodc/go-micro/context" "github.com/myodc/go-micro/errors" "github.com/myodc/go-micro/registry" "github.com/myodc/go-micro/transport" + rpc "github.com/youtube/vitess/go/rpcplus" js "github.com/youtube/vitess/go/rpcplus/jsonrpc" pb "github.com/youtube/vitess/go/rpcplus/pbrpc" - ctx "golang.org/x/net/context" + + "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -34,14 +37,14 @@ func (t *headerRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) return t.r.RoundTrip(r) } -func (r *RpcClient) call(address, path string, request Request, response interface{}) error { +func (r *RpcClient) call(ctx context.Context, address string, request Request, response interface{}) error { switch request.ContentType() { case "application/grpc": cc, err := grpc.Dial(address) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error connecting to server: %v", err)) } - if err := grpc.Invoke(ctx.Background(), path, request.Request(), response, cc); err != nil { + if err := grpc.Invoke(ctx, request.Method(), request.Request(), response, cc); err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } return nil @@ -77,10 +80,10 @@ func (r *RpcClient) call(address, path string, request Request, response interfa Body: reqB.Bytes(), } - h, _ := request.Headers().(http.Header) - for k, v := range h { - if len(v) > 0 { - msg.Header[k] = v[0] + md, ok := c.GetMetaData(ctx) + if ok { + for k, v := range md { + msg.Header[k] = v } } @@ -129,12 +132,12 @@ func (r *RpcClient) call(address, path string, request Request, response interfa return nil } -func (r *RpcClient) CallRemote(address, path string, request Request, response interface{}) error { - return r.call(address, path, request, response) +func (r *RpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}) error { + return r.call(ctx, address, request, response) } // TODO: Call(..., opts *Options) error { -func (r *RpcClient) Call(request Request, response interface{}) error { +func (r *RpcClient) Call(ctx context.Context, request Request, response interface{}) error { service, err := registry.GetService(request.Service()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) @@ -152,7 +155,7 @@ func (r *RpcClient) Call(request Request, response interface{}) error { address = fmt.Sprintf("%s:%d", address, node.Port()) } - return r.call(address, "", request, response) + return r.call(ctx, address, request, response) } func (r *RpcClient) NewRequest(service, method string, request interface{}) Request { diff --git a/client/rpc_request.go b/client/rpc_request.go index e95d4ac3..5025b1db 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -1,13 +1,10 @@ package client -import ( - "net/http" -) - type RpcRequest struct { - service, method, contentType string - request interface{} - headers http.Header + service string + method string + contentType string + request interface{} } func newRpcRequest(service, method string, request interface{}, contentType string) *RpcRequest { @@ -16,7 +13,6 @@ func newRpcRequest(service, method string, request interface{}, contentType stri method: method, request: request, contentType: contentType, - headers: make(http.Header), } } @@ -24,10 +20,6 @@ func (r *RpcRequest) ContentType() string { return r.contentType } -func (r *RpcRequest) Headers() Headers { - return r.headers -} - func (r *RpcRequest) Service() string { return r.service } diff --git a/context/context.go b/context/context.go new file mode 100644 index 00000000..ac28f7e5 --- /dev/null +++ b/context/context.go @@ -0,0 +1,22 @@ +package context + +import ( + "golang.org/x/net/context" +) + +type key int + +const ( + mdKey = key(0) +) + +type MetaData map[string]string + +func GetMetaData(ctx context.Context) (MetaData, bool) { + md, ok := ctx.Value(mdKey).(MetaData) + return md, ok +} + +func WithMetaData(ctx context.Context, md MetaData) context.Context { + return context.WithValue(ctx, mdKey, md) +} diff --git a/examples/grpc_client.go b/examples/grpc_client.go index 55c5c3a5..9c8be094 100644 --- a/examples/grpc_client.go +++ b/examples/grpc_client.go @@ -5,16 +5,17 @@ import ( h "github.com/grpc/grpc-common/go/helloworld" "github.com/myodc/go-micro/client" + "golang.org/x/net/context" ) // run github.com/grpc/grpc-common/go/greeter_server/main.go func main() { - req := client.NewRpcRequest("helloworld.Greeter", "SayHello", &h.HelloRequest{ + req := client.NewRpcRequest("helloworld.Greeter", "helloworld.Greeter/SayHello", &h.HelloRequest{ Name: "John", }, "application/grpc") rsp := &h.HelloReply{} - err := client.CallRemote("localhost:50051", "helloworld.Greeter/SayHello", req, rsp) + err := client.CallRemote(context.Background(), "localhost:50051", req, rsp) if err != nil { fmt.Println(err) } diff --git a/examples/service_client.go b/examples/service_client.go index 5821bb7b..83d6339d 100644 --- a/examples/service_client.go +++ b/examples/service_client.go @@ -5,7 +5,9 @@ import ( "github.com/myodc/go-micro/client" "github.com/myodc/go-micro/cmd" + c "github.com/myodc/go-micro/context" example "github.com/myodc/go-micro/template/proto/example" + "golang.org/x/net/context" ) func main() { @@ -16,14 +18,16 @@ func main() { Name: "John", }) - // Set arbitrary headers - req.Headers().Set("X-User-Id", "john") - req.Headers().Set("X-From-Id", "script") + // create context with metadata + ctx := c.WithMetaData(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) rsp := &example.Response{} // Call service - if err := client.Call(req, rsp); err != nil { + if err := client.Call(ctx, req, rsp); err != nil { fmt.Println(err) return } diff --git a/server/context.go b/server/context.go deleted file mode 100644 index f556f0d3..00000000 --- a/server/context.go +++ /dev/null @@ -1,35 +0,0 @@ -package server - -import ( - "time" - - "code.google.com/p/go.net/context" -) - -type ctx struct{} - -func (ctx *ctx) Deadline() (deadline time.Time, ok bool) { - return time.Time{}, false -} - -func (ctx *ctx) Done() <-chan struct{} { - return nil -} - -func (ctx *ctx) Err() error { - return nil -} - -func (ctx *ctx) Value(key interface{}) interface{} { - return nil -} - -func newContext(parent context.Context, s *serverContext) context.Context { - return context.WithValue(parent, "serverContext", s) -} - -// return server.Context -func NewContext(ctx context.Context) (Context, bool) { - c, ok := ctx.Value("serverContext").(*serverContext) - return c, ok -} diff --git a/server/headers.go b/server/headers.go deleted file mode 100644 index 9cdf78e7..00000000 --- a/server/headers.go +++ /dev/null @@ -1,8 +0,0 @@ -package server - -type Headers interface { - Add(string, string) - Del(string) - Get(string) string - Set(string, string) -} diff --git a/server/request.go b/server/request.go deleted file mode 100644 index 6c620f4f..00000000 --- a/server/request.go +++ /dev/null @@ -1,6 +0,0 @@ -package server - -type Request interface { - Headers() Headers - Session(string) string -} diff --git a/server/rpc_server.go b/server/rpc_server.go index b5184b83..9a2ddcae 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -4,11 +4,14 @@ import ( "bytes" "sync" - log "github.com/golang/glog" + c "github.com/myodc/go-micro/context" "github.com/myodc/go-micro/transport" + + log "github.com/golang/glog" rpc "github.com/youtube/vitess/go/rpcplus" js "github.com/youtube/vitess/go/rpcplus/jsonrpc" pb "github.com/youtube/vitess/go/rpcplus/pbrpc" + "golang.org/x/net/context" ) @@ -26,7 +29,6 @@ var ( ) func (s *RpcServer) accept(sock transport.Socket) { - // serveCtx := getServerContext(req) var msg transport.Message if err := sock.Recv(&msg); err != nil { return @@ -50,17 +52,21 @@ func (s *RpcServer) accept(sock transport.Socket) { cc = js.NewServerCodec(buf) default: return - // return nil, errors.InternalServerError("go.micro.server", fmt.Sprintf("Unsupported content-type: %v", req.Header.Get("Content-Type"))) } - //ctx := newContext(&ctx{}, serveCtx) - if err := s.rpc.ServeRequestWithContext(context.Background(), cc); err != nil { + // strip our headers + ct := msg.Header["Content-Type"] + delete(msg.Header, "Content-Type") + + ctx := c.WithMetaData(context.Background(), msg.Header) + + if err := s.rpc.ServeRequestWithContext(ctx, cc); err != nil { return } sock.Send(&transport.Message{ Header: map[string]string{ - "Content-Type": msg.Header["Content-Type"], + "Content-Type": ct, }, Body: rsp.Bytes(), }) diff --git a/server/server_context.go b/server/server_context.go deleted file mode 100644 index cc2a7a7d..00000000 --- a/server/server_context.go +++ /dev/null @@ -1,120 +0,0 @@ -package server - -import ( - "net/http" - "sync" - - log "github.com/golang/glog" - "github.com/myodc/go-micro/client" -) - -var ctxs = struct { - sync.Mutex - m map[*http.Request]*serverContext -}{ - m: make(map[*http.Request]*serverContext), -} - -// A server context interface -type Context interface { - Request() Request // the request made to the server - Headers() Headers // the response headers - NewRequest(string, string, interface{}) client.Request // a new scoped client request - NewProtoRequest(string, string, interface{}) client.Request // a new scoped client request - NewJsonRequest(string, string, interface{}) client.Request // a new scoped client request -} - -// context represents the context of an in-flight HTTP request. -// It implements the appengine.Context and http.ResponseWriter interfaces. -type serverContext struct { - req *serverRequest - outCode int - outHeader http.Header - outBody []byte -} - -// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status -// codes do not permit a response body (nor response entity headers such as -// Content-Length, Content-Type, etc). -func bodyAllowedForStatus(status int) bool { - switch { - case status >= 100 && status <= 199: - return false - case status == 204: - return false - case status == 304: - return false - } - return true -} - -func getServerContext(req *http.Request) *serverContext { - ctxs.Lock() - c := ctxs.m[req] - ctxs.Unlock() - - if c == nil { - // Someone passed in an http.Request that is not in-flight. - panic("NewContext passed an unknown http.Request") - } - return c -} - -func (c *serverContext) NewRequest(service, method string, request interface{}) client.Request { - req := client.NewRequest(service, method, request) - // TODO: set headers and scope - req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) - return req -} - -func (c *serverContext) NewProtoRequest(service, method string, request interface{}) client.Request { - req := client.NewProtoRequest(service, method, request) - // TODO: set headers and scope - req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) - return req -} - -func (c *serverContext) NewJsonRequest(service, method string, request interface{}) client.Request { - req := client.NewJsonRequest(service, method, request) - // TODO: set headers and scope - req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) - return req -} - -// The response headers -func (c *serverContext) Headers() Headers { - return c.outHeader -} - -// The response headers -func (c *serverContext) Header() http.Header { - return c.outHeader -} - -// The request made to the server -func (c *serverContext) Request() Request { - return c.req -} - -func (c *serverContext) Write(b []byte) (int, error) { - if c.outCode == 0 { - c.WriteHeader(http.StatusOK) - } - if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { - return 0, http.ErrBodyNotAllowed - } - c.outBody = append(c.outBody, b...) - return len(b), nil -} - -func (c *serverContext) WriteHeader(code int) { - if c.outCode != 0 { - log.Error("WriteHeader called multiple times on request.") - return - } - c.outCode = code -} - -func GetContext(r *http.Request) *serverContext { - return getServerContext(r) -} diff --git a/server/server_request.go b/server/server_request.go deleted file mode 100644 index 9058ec52..00000000 --- a/server/server_request.go +++ /dev/null @@ -1,25 +0,0 @@ -package server - -import ( - "net/http" -) - -type serverRequest struct { - req *http.Request -} - -func (s *serverRequest) Headers() Headers { - return s.req.Header -} - -func (s *serverRequest) Session(name string) string { - if sess := s.Headers().Get(name); len(sess) > 0 { - return sess - } - - c, err := s.req.Cookie(name) - if err != nil { - return "" - } - return c.Value -} diff --git a/template/handler/example.go b/template/handler/example.go index f55e3f1b..1823376f 100644 --- a/template/handler/example.go +++ b/template/handler/example.go @@ -1,17 +1,23 @@ package handler import ( - "code.google.com/p/go.net/context" - log "github.com/golang/glog" + c "github.com/myodc/go-micro/context" "github.com/myodc/go-micro/server" example "github.com/myodc/go-micro/template/proto/example" + + "golang.org/x/net/context" ) type Example struct{} func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { - log.Info("Received Example.Call request") + md, ok := c.GetMetaData(ctx) + if ok { + log.Infof("Received Example.Call request with metadata: %v", md) + } else { + log.Info("Received Example.Call request") + } rsp.Msg = server.Id + ": Hello " + req.Name diff --git a/transport/http_transport.go b/transport/http_transport.go index 278a524e..48f004da 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -54,7 +54,6 @@ func (h *HttpTransportClient) Send(m *Message) (*Message, error) { URL: &url.URL{ Scheme: "http", Host: h.addr, - // Path: path, }, Header: header, Body: buf,