commit 8e55cde513cbec9f3e3aaf0ed7beb637510fc9b2 Author: Asim Date: Tue Jan 13 23:31:27 2015 +0000 First diff --git a/README.me b/README.me new file mode 100644 index 00000000..ec5a678f --- /dev/null +++ b/README.me @@ -0,0 +1,3 @@ +# Go Micro - a microservices client/server library + +This a minimalistic step into microservices using HTTP/RPC and protobuf. diff --git a/client/buffer.go b/client/buffer.go new file mode 100644 index 00000000..72c3c529 --- /dev/null +++ b/client/buffer.go @@ -0,0 +1,13 @@ +package client + +import ( + "io" +) + +type buffer struct { + io.ReadWriter +} + +func (b *buffer) Close() error { + return nil +} diff --git a/client/client.go b/client/client.go new file mode 100644 index 00000000..bb405325 --- /dev/null +++ b/client/client.go @@ -0,0 +1,33 @@ +package client + +type Client interface { + NewRequest(string, string, interface{}) Request + NewProtoRequest(string, string, interface{}) Request + NewJsonRequest(string, string, interface{}) Request + Call(interface{}, interface{}) error + CallRemote(string, string, interface{}, interface{}) error +} + +var ( + client = NewRpcClient() +) + +func Call(request Request, response interface{}) error { + return client.Call(request, response) +} + +func CallRemote(address, path string, request Request, response interface{}) error { + return client.CallRemote(address, path, request, response) +} + +func NewRequest(service, method string, request interface{}) Request { + return client.NewRequest(service, method, request) +} + +func NewProtoRequest(service, method string, request interface{}) Request { + return client.NewProtoRequest(service, method, request) +} + +func NewJsonRequest(service, method string, request interface{}) Request { + return client.NewJsonRequest(service, method, request) +} diff --git a/client/headers.go b/client/headers.go new file mode 100644 index 00000000..bae845a3 --- /dev/null +++ b/client/headers.go @@ -0,0 +1,8 @@ +package client + +type Headers interface { + Add(string, string) + Del(string) + Get(string) string + Set(string, string) +} diff --git a/client/request.go b/client/request.go new file mode 100644 index 00000000..9cbe919c --- /dev/null +++ b/client/request.go @@ -0,0 +1,9 @@ +package client + +type Request interface { + Service() string + Method() string + ContentType() string + Request() interface{} + Headers() Headers +} diff --git a/client/rpc_client.go b/client/rpc_client.go new file mode 100644 index 00000000..438a6423 --- /dev/null +++ b/client/rpc_client.go @@ -0,0 +1,157 @@ +package client + +import ( + "bytes" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "time" + + "github.com/asim/go-micro/errors" + "github.com/asim/go-micro/registry" + rpc "github.com/youtube/vitess/go/rpcplus" + js "github.com/youtube/vitess/go/rpcplus/jsonrpc" + pb "github.com/youtube/vitess/go/rpcplus/pbrpc" +) + +type headerRoundTripper struct { + r http.RoundTripper +} + +type RpcClient struct{} + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +func (t *headerRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + r.Header.Set("X-Client-Version", "1.0") + return t.r.RoundTrip(r) +} + +func (r *RpcClient) call(address, path string, request Request, response interface{}) error { + pReq := &rpc.Request{ + ServiceMethod: request.Method(), + } + + reqB := bytes.NewBuffer(nil) + defer reqB.Reset() + buf := &buffer{ + reqB, + } + + var cc rpc.ClientCodec + switch request.ContentType() { + case "application/octet-stream": + cc = pb.NewClientCodec(buf) + case "application/json": + cc = js.NewClientCodec(buf) + default: + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Unsupported request type: %s", request.ContentType())) + } + + err := cc.WriteRequest(pReq, request.Request()) + if err != nil { + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error writing request: %v", err)) + } + + client := &http.Client{} + client.Transport = &headerRoundTripper{http.DefaultTransport} + + request.Headers().Set("Content-Type", request.ContentType()) + + hreq := &http.Request{ + Method: "POST", + URL: &url.URL{ + Scheme: "http", + Host: address, + Path: path, + }, + Header: request.Headers().(http.Header), + Body: buf, + ContentLength: int64(reqB.Len()), + Host: address, + } + + rsp, err := client.Do(hreq) + if err != nil { + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) + } + defer rsp.Body.Close() + + b, err := ioutil.ReadAll(rsp.Body) + if err != nil { + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error reading response: %v", err)) + } + + rspB := bytes.NewBuffer(b) + defer rspB.Reset() + rBuf := &buffer{ + rspB, + } + + switch rsp.Header.Get("Content-Type") { + case "application/octet-stream": + cc = pb.NewClientCodec(rBuf) + case "application/json": + cc = js.NewClientCodec(rBuf) + default: + return errors.InternalServerError("go.micro.client", string(b)) + } + + pRsp := &rpc.Response{} + err = cc.ReadResponseHeader(pRsp) + if err != nil { + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error reading response headers: %v", err)) + } + + if len(pRsp.Error) > 0 { + return errors.Parse(pRsp.Error) + } + + err = cc.ReadResponseBody(response) + if err != nil { + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error reading response body: %v", err)) + } + + return nil +} + +func (r *RpcClient) CallRemote(address, path string, request Request, response interface{}) error { + return r.call(address, path, request, response) +} + +// TODO: Call(..., opts *Options) error { +func (r *RpcClient) Call(request Request, response interface{}) error { + service, err := registry.GetService(request.Service()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + if len(service.Nodes()) == 0 { + return errors.NotFound("go.micro.client", "Service not found") + } + + n := rand.Int() % len(service.Nodes()) + node := service.Nodes()[n] + address := fmt.Sprintf("%s:%d", node.Address(), node.Port()) + return r.call(address, "/_rpc", request, response) +} + +func (r *RpcClient) NewRequest(service, method string, request interface{}) *RpcRequest { + return r.NewProtoRequest(service, method, request) +} + +func (r *RpcClient) NewProtoRequest(service, method string, request interface{}) *RpcRequest { + return newRpcRequest(service, method, request, "application/octet-stream") +} + +func (r *RpcClient) NewJsonRequest(service, method string, request interface{}) *RpcRequest { + return newRpcRequest(service, method, request, "application/json") +} + +func NewRpcClient() *RpcClient { + return &RpcClient{} +} diff --git a/client/rpc_request.go b/client/rpc_request.go new file mode 100644 index 00000000..e95d4ac3 --- /dev/null +++ b/client/rpc_request.go @@ -0,0 +1,45 @@ +package client + +import ( + "net/http" +) + +type RpcRequest struct { + service, method, contentType string + request interface{} + headers http.Header +} + +func newRpcRequest(service, method string, request interface{}, contentType string) *RpcRequest { + return &RpcRequest{ + service: service, + method: method, + request: request, + contentType: contentType, + headers: make(http.Header), + } +} + +func (r *RpcRequest) ContentType() string { + return r.contentType +} + +func (r *RpcRequest) Headers() Headers { + return r.headers +} + +func (r *RpcRequest) Service() string { + return r.service +} + +func (r *RpcRequest) Method() string { + return r.method +} + +func (r *RpcRequest) Request() interface{} { + return r.request +} + +func NewRpcRequest(service, method string, request interface{}, contentType string) *RpcRequest { + return newRpcRequest(service, method, request, contentType) +} diff --git a/errors/errors.go b/errors/errors.go new file mode 100644 index 00000000..cb9da542 --- /dev/null +++ b/errors/errors.go @@ -0,0 +1,81 @@ +package errors + +import ( + "encoding/json" + "net/http" +) + +type Error struct { + Id string `json:"id"` + Code int32 `json:"code"` + Detail string `json:"detail"` + Status string `json:"status"` +} + +func (e *Error) Error() string { + b, _ := json.Marshal(e) + return string(b) +} + +func New(id, detail string, code int32) error { + return &Error{ + Id: id, + Code: code, + Detail: detail, + Status: http.StatusText(int(code)), + } +} + +func Parse(err string) *Error { + var e *Error + errr := json.Unmarshal([]byte(err), &e) + if errr != nil { + e.Detail = err + } + return e +} + +func BadRequest(id, detail string) error { + return &Error{ + Id: id, + Code: 400, + Detail: detail, + Status: http.StatusText(400), + } +} + +func Unauthorized(id, detail string) error { + return &Error{ + Id: id, + Code: 401, + Detail: detail, + Status: http.StatusText(401), + } +} + +func Forbidden(id, detail string) error { + return &Error{ + Id: id, + Code: 403, + Detail: detail, + Status: http.StatusText(403), + } +} + +func NotFound(id, detail string) error { + return &Error{ + Id: id, + Code: 404, + Detail: detail, + Status: http.StatusText(404), + } +} + +func InternalServerError(id, detail string) error { + return &Error{ + Id: id, + Code: 500, + Detail: detail, + Status: http.StatusText(500), + } +} diff --git a/examples/service_client.go b/examples/service_client.go new file mode 100644 index 00000000..e0e4d080 --- /dev/null +++ b/examples/service_client.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + + "code.google.com/p/goprotobuf/proto" + "github.com/asim/go-micro/client" + example "github.com/asim/go-micro/template/proto/example" +) + +func main() { + // Create new request to service go.micro.service.go-template, method Example.Call + req := client.NewRequest("go.micro.service.template", "Example.Call", &example.Request{ + Name: proto.String("John"), + }) + + // Set arbitrary headers + req.Headers().Set("X-User-Id", "john") + req.Headers().Set("X-From-Id", "script") + + rsp := &example.Response{} + + // Call service + if err := client.Call(req, rsp); err != nil { + fmt.Println(err) + return + } + + fmt.Println(rsp.GetMsg()) +} diff --git a/registry/consul_node.go b/registry/consul_node.go new file mode 100644 index 00000000..5cfbbdc9 --- /dev/null +++ b/registry/consul_node.go @@ -0,0 +1,20 @@ +package registry + +type ConsulNode struct { + Node string + NodeId string + NodeAddress string + NodePort int +} + +func (c *ConsulNode) Id() string { + return c.NodeId +} + +func (c *ConsulNode) Address() string { + return c.NodeAddress +} + +func (c *ConsulNode) Port() int { + return c.NodePort +} diff --git a/registry/consul_registry.go b/registry/consul_registry.go new file mode 100644 index 00000000..e9bdbf3a --- /dev/null +++ b/registry/consul_registry.go @@ -0,0 +1,108 @@ +package registry + +import ( + "errors" + + consul "github.com/armon/consul-api" +) + +type ConsulRegistry struct { + Client *consul.Client +} + +var ( + ConsulCheckTTL = "30s" +) + +func (c *ConsulRegistry) Deregister(s Service) error { + if len(s.Nodes()) == 0 { + return errors.New("Require at least one node") + } + + node := s.Nodes()[0] + + _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ + Node: node.Id(), + Address: node.Address(), + ServiceID: node.Id(), + }, nil) + + return err +} + +func (c *ConsulRegistry) Register(s Service) error { + if len(s.Nodes()) == 0 { + return errors.New("Require at least one node") + } + + node := s.Nodes()[0] + + _, err := c.Client.Catalog().Register(&consul.CatalogRegistration{ + Node: node.Id(), + Address: node.Address(), + Service: &consul.AgentService{ + ID: node.Id(), + Service: s.Name(), + Port: node.Port(), + }, + }, nil) + + return err +} + +func (c *ConsulRegistry) GetService(name string) (Service, error) { + rsp, _, err := c.Client.Catalog().Service(name, "", nil) + if err != nil { + return nil, err + } + + cs := &ConsulService{} + + for _, s := range rsp { + if s.ServiceName != name { + continue + } + + cs.ServiceName = s.ServiceName + cs.ServiceNodes = append(cs.ServiceNodes, &ConsulNode{ + Node: s.Node, + NodeId: s.ServiceID, + NodeAddress: s.Address, + NodePort: s.ServicePort, + }) + } + + return cs, nil +} + +func (c *ConsulRegistry) NewService(name string, nodes ...Node) Service { + var snodes []*ConsulNode + + for _, node := range nodes { + if n, ok := node.(*ConsulNode); ok { + snodes = append(snodes, n) + } + } + + return &ConsulService{ + ServiceName: name, + ServiceNodes: snodes, + } +} + +func (c *ConsulRegistry) NewNode(id, address string, port int) Node { + return &ConsulNode{ + Node: id, + NodeId: id, + NodeAddress: address, + NodePort: port, + } +} + +func NewConsulRegistry() Registry { + client, _ := consul.NewClient(&consul.Config{}) + + return &ConsulRegistry{ + Client: client, + } +} diff --git a/registry/consul_service.go b/registry/consul_service.go new file mode 100644 index 00000000..4b9b88c2 --- /dev/null +++ b/registry/consul_service.go @@ -0,0 +1,20 @@ +package registry + +type ConsulService struct { + ServiceName string + ServiceNodes []*ConsulNode +} + +func (c *ConsulService) Name() string { + return c.ServiceName +} + +func (c *ConsulService) Nodes() []Node { + var nodes []Node + + for _, node := range c.ServiceNodes { + nodes = append(nodes, node) + } + + return nodes +} diff --git a/registry/kubernetes_node.go b/registry/kubernetes_node.go new file mode 100644 index 00000000..aa758a6b --- /dev/null +++ b/registry/kubernetes_node.go @@ -0,0 +1,19 @@ +package registry + +type KubernetesNode struct { + NodeId string + NodeAddress string + NodePort int +} + +func (c *KubernetesNode) Id() string { + return c.NodeId +} + +func (c *KubernetesNode) Address() string { + return c.NodeAddress +} + +func (c *KubernetesNode) Port() int { + return c.NodePort +} diff --git a/registry/kubernetes_registry.go b/registry/kubernetes_registry.go new file mode 100644 index 00000000..a6fa520a --- /dev/null +++ b/registry/kubernetes_registry.go @@ -0,0 +1,77 @@ +package registry + +import ( + "fmt" + "os" + + k8s "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +type KubernetesRegistry struct { + Client *k8s.Client + Namespace string +} + +func (c *KubernetesRegistry) Deregister(s Service) error { + return nil +} + +func (c *KubernetesRegistry) Register(s Service) error { + return nil +} + +func (c *KubernetesRegistry) GetService(name string) (Service, error) { + services, err := c.Client.Services(c.Namespace).List(labels.OneTermEqualSelector("name", name)) + if err != nil { + return nil, err + } + + if len(services.Items) == 0 { + return nil, fmt.Errorf("Service not found") + } + + ks := &KubernetesService{ServiceName: name} + for _, item := range services.Items { + ks.ServiceNodes = append(ks.ServiceNodes, &KubernetesNode{ + NodeAddress: item.Spec.PortalIP, + NodePort: item.Spec.Port, + }) + } + + return ks, nil +} + +func (c *KubernetesRegistry) NewService(name string, nodes ...Node) Service { + var snodes []*KubernetesNode + + for _, node := range nodes { + if n, ok := node.(*KubernetesNode); ok { + snodes = append(snodes, n) + } + } + + return &KubernetesService{ + ServiceName: name, + ServiceNodes: snodes, + } +} + +func (c *KubernetesRegistry) NewNode(id, address string, port int) Node { + return &KubernetesNode{ + NodeId: id, + NodeAddress: address, + NodePort: port, + } +} + +func NewKubernetesRegistry() Registry { + client, _ := k8s.New(&k8s.Config{ + Host: "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), + }) + + return &KubernetesRegistry{ + Client: client, + Namespace: "default", + } +} diff --git a/registry/kubernetes_service.go b/registry/kubernetes_service.go new file mode 100644 index 00000000..c81c7ae2 --- /dev/null +++ b/registry/kubernetes_service.go @@ -0,0 +1,20 @@ +package registry + +type KubernetesService struct { + ServiceName string + ServiceNodes []*KubernetesNode +} + +func (c *KubernetesService) Name() string { + return c.ServiceName +} + +func (c *KubernetesService) Nodes() []Node { + var nodes []Node + + for _, node := range c.ServiceNodes { + nodes = append(nodes, node) + } + + return nodes +} diff --git a/registry/node.go b/registry/node.go new file mode 100644 index 00000000..fe5831f2 --- /dev/null +++ b/registry/node.go @@ -0,0 +1,11 @@ +package registry + +type Node interface { + Id() string + Address() string + Port() int +} + +func NewNode(id, address string, port int) Node { + return DefaultRegistry.NewNode(id, address, port) +} diff --git a/registry/registry.go b/registry/registry.go new file mode 100644 index 00000000..35235b48 --- /dev/null +++ b/registry/registry.go @@ -0,0 +1,25 @@ +package registry + +type Registry interface { + Register(Service) error + Deregister(Service) error + GetService(string) (Service, error) + NewService(string, ...Node) Service + NewNode(string, string, int) Node +} + +var ( + DefaultRegistry = NewConsulRegistry() +) + +func Register(s Service) error { + return DefaultRegistry.Register(s) +} + +func Deregister(s Service) error { + return DefaultRegistry.Deregister(s) +} + +func GetService(name string) (Service, error) { + return DefaultRegistry.GetService(name) +} diff --git a/registry/service.go b/registry/service.go new file mode 100644 index 00000000..e3bd6964 --- /dev/null +++ b/registry/service.go @@ -0,0 +1,10 @@ +package registry + +type Service interface { + Name() string + Nodes() []Node +} + +func NewService(name string, nodes ...Node) Service { + return DefaultRegistry.NewService(name, nodes...) +} diff --git a/server/buffer.go b/server/buffer.go new file mode 100644 index 00000000..e3f6ebec --- /dev/null +++ b/server/buffer.go @@ -0,0 +1,14 @@ +package server + +import ( + "io" +) + +type buffer struct { + io.Reader + io.Writer +} + +func (b *buffer) Close() error { + return nil +} diff --git a/server/context.go b/server/context.go new file mode 100644 index 00000000..f556f0d3 --- /dev/null +++ b/server/context.go @@ -0,0 +1,35 @@ +package server + +import ( + "time" + + "code.google.com/p/go.net/context" +) + +type ctx struct{} + +func (ctx *ctx) Deadline() (deadline time.Time, ok bool) { + return time.Time{}, false +} + +func (ctx *ctx) Done() <-chan struct{} { + return nil +} + +func (ctx *ctx) Err() error { + return nil +} + +func (ctx *ctx) Value(key interface{}) interface{} { + return nil +} + +func newContext(parent context.Context, s *serverContext) context.Context { + return context.WithValue(parent, "serverContext", s) +} + +// return server.Context +func NewContext(ctx context.Context) (Context, bool) { + c, ok := ctx.Value("serverContext").(*serverContext) + return c, ok +} diff --git a/server/headers.go b/server/headers.go new file mode 100644 index 00000000..9cdf78e7 --- /dev/null +++ b/server/headers.go @@ -0,0 +1,8 @@ +package server + +type Headers interface { + Add(string, string) + Del(string) + Get(string) string + Set(string, string) +} diff --git a/server/health_checker.go b/server/health_checker.go new file mode 100644 index 00000000..612aaa58 --- /dev/null +++ b/server/health_checker.go @@ -0,0 +1,21 @@ +package server + +import ( + "io" + "net/http" + "net/url" +) + +func registerHealthChecker(mux *http.ServeMux) { + req := &http.Request{ + Method: "GET", + URL: &url.URL{ + Path: HealthPath, + }, + } + if _, path := mux.Handler(req); path != HealthPath { + mux.HandleFunc(HealthPath, func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, "ok") + }) + } +} diff --git a/server/receiver.go b/server/receiver.go new file mode 100644 index 00000000..f7498e30 --- /dev/null +++ b/server/receiver.go @@ -0,0 +1,6 @@ +package server + +type Receiver interface { + Name() string + Handler() interface{} +} diff --git a/server/request.go b/server/request.go new file mode 100644 index 00000000..6c620f4f --- /dev/null +++ b/server/request.go @@ -0,0 +1,6 @@ +package server + +type Request interface { + Headers() Headers + Session(string) string +} diff --git a/server/rpc_receiver.go b/server/rpc_receiver.go new file mode 100644 index 00000000..11d25bb1 --- /dev/null +++ b/server/rpc_receiver.go @@ -0,0 +1,29 @@ +package server + +type RpcReceiver struct { + name string + handler interface{} +} + +func newRpcReceiver(name string, handler interface{}) *RpcReceiver { + return &RpcReceiver{ + name: name, + handler: handler, + } +} + +func (r *RpcReceiver) Name() string { + return r.name +} + +func (r *RpcReceiver) Handler() interface{} { + return r.handler +} + +func NewRpcReceiver(handler interface{}) *RpcReceiver { + return newRpcReceiver("", handler) +} + +func NewNamedRpcReceiver(name string, handler interface{}) *RpcReceiver { + return newRpcReceiver(name, handler) +} diff --git a/server/rpc_server.go b/server/rpc_server.go new file mode 100644 index 00000000..c2f87491 --- /dev/null +++ b/server/rpc_server.go @@ -0,0 +1,215 @@ +package server + +import ( + "bytes" + "fmt" + "io/ioutil" + "net" + "net/http" + "runtime/debug" + "strconv" + "sync" + + "github.com/asim/go-micro/errors" + log "github.com/cihub/seelog" + rpc "github.com/youtube/vitess/go/rpcplus" + js "github.com/youtube/vitess/go/rpcplus/jsonrpc" + pb "github.com/youtube/vitess/go/rpcplus/pbrpc" +) + +type RpcServer struct { + mtx sync.RWMutex + rpc *rpc.Server + address string + exit chan chan error +} + +var ( + HealthPath = "/_status/health" + RpcPath = "/_rpc" +) + +func executeRequestSafely(c *serverContext, r *http.Request) { + defer func() { + if x := recover(); x != nil { + log.Criticalf("Panicked on request: %v", r) + log.Criticalf("%v: %v", x, string(debug.Stack())) + err := errors.InternalServerError("go.micro.server", "Unexpected error") + c.WriteHeader(500) + c.Write([]byte(err.Error())) + } + }() + + http.DefaultServeMux.ServeHTTP(c, r) +} + +func (s *RpcServer) handler(w http.ResponseWriter, r *http.Request) { + c := &serverContext{ + req: &serverRequest{r}, + outHeader: w.Header(), + } + + ctxs.Lock() + ctxs.m[r] = c + ctxs.Unlock() + defer func() { + ctxs.Lock() + delete(ctxs.m, r) + ctxs.Unlock() + }() + + // Patch up RemoteAddr so it looks reasonable. + if addr := r.Header.Get("X-Forwarded-For"); len(addr) > 0 { + r.RemoteAddr = addr + } else { + // Should not normally reach here, but pick a sensible default anyway. + r.RemoteAddr = "127.0.0.1" + } + // The address in the headers will most likely be of these forms: + // 123.123.123.123 + // 2001:db8::1 + // net/http.Request.RemoteAddr is specified to be in "IP:port" form. + if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { + // Assume the remote address is only a host; add a default port. + r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") + } + + executeRequestSafely(c, r) + c.outHeader = nil // make sure header changes aren't respected any more + + // Avoid nil Write call if c.Write is never called. + if c.outCode != 0 { + w.WriteHeader(c.outCode) + } + if c.outBody != nil { + w.Write(c.outBody) + } +} + +func (s *RpcServer) Address() string { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.address +} + +func (s *RpcServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + serveCtx := getServerContext(req) + + // TODO: get user scope from context + // check access + + if req.Method != "POST" { + err := errors.BadRequest("go.micro.server", "Method not allowed") + http.Error(w, err.Error(), http.StatusMethodNotAllowed) + return + } + defer req.Body.Close() + + b, err := ioutil.ReadAll(req.Body) + if err != nil { + errr := errors.InternalServerError("go.micro.server", fmt.Sprintf("Error reading request body: %v", err)) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + log.Errorf("Erroring reading request body: %v", err) + return + } + + rbq := bytes.NewBuffer(b) + rsp := bytes.NewBuffer(nil) + defer rsp.Reset() + defer rbq.Reset() + + buf := &buffer{ + rbq, + rsp, + } + + var cc rpc.ServerCodec + switch req.Header.Get("Content-Type") { + case "application/octet-stream": + cc = pb.NewServerCodec(buf) + case "application/json": + cc = js.NewServerCodec(buf) + default: + err = errors.InternalServerError("go.micro.server", fmt.Sprintf("Unsupported content-type: %v", req.Header.Get("Content-Type"))) + w.WriteHeader(500) + w.Write([]byte(err.Error())) + return + } + + ctx := newContext(&ctx{}, serveCtx) + err = s.rpc.ServeRequestWithContext(ctx, cc) + if err != nil { + // This should not be possible. + w.WriteHeader(500) + w.Write([]byte(err.Error())) + log.Errorf("Erroring serving request: %v", err) + return + } + + w.Header().Set("Content-Type", req.Header.Get("Content-Type")) + w.Header().Set("Content-Length", strconv.Itoa(rsp.Len())) + w.Write(rsp.Bytes()) +} + +func (s *RpcServer) Init() error { + log.Debugf("Rpc handler %s", RpcPath) + http.Handle(RpcPath, s) + return nil +} + +func (s *RpcServer) NewReceiver(handler interface{}) Receiver { + return newRpcReceiver("", handler) +} + +func (s *RpcServer) NewNamedReceiver(name string, handler interface{}) Receiver { + return newRpcReceiver(name, handler) +} + +func (s *RpcServer) Register(r Receiver) error { + if len(r.Name()) > 0 { + s.rpc.RegisterName(r.Name(), r.Handler()) + return nil + } + + s.rpc.Register(r.Handler()) + return nil +} + +func (s *RpcServer) Start() error { + registerHealthChecker(http.DefaultServeMux) + + l, err := net.Listen("tcp", s.address) + if err != nil { + return err + } + + log.Debugf("Listening on %s", l.Addr().String()) + + s.mtx.Lock() + s.address = l.Addr().String() + s.mtx.Unlock() + + go http.Serve(l, http.HandlerFunc(s.handler)) + + go func() { + ch := <-s.exit + ch <- l.Close() + }() + + return nil +} + +func (s *RpcServer) Stop() error { + ch := make(chan error) + s.exit <- ch + return <-ch +} + +func NewRpcServer(address string) *RpcServer { + return &RpcServer{ + rpc: rpc.NewServer(), + address: address, + exit: make(chan chan error), + } +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 00000000..92f22e77 --- /dev/null +++ b/server/server.go @@ -0,0 +1,112 @@ +package server + +import ( + "flag" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + + "code.google.com/p/go-uuid/uuid" + "github.com/asim/go-micro/registry" + "github.com/asim/go-micro/store" + log "github.com/cihub/seelog" +) + +type Server interface { + Address() string + Init() error + NewReceiver(interface{}) Receiver + NewNamedReceiver(string, interface{}) Receiver + Register(Receiver) error + Start() error + Stop() error +} + +var ( + Name string + Id string + DefaultServer Server + + flagRegistry string + flagBindAddress string +) + +func init() { + flag.StringVar(&flagRegistry, "registry", "consul", "Registry for discovery. kubernetes, consul, etc") + flag.StringVar(&flagBindAddress, "bind_address", ":0", "Bind address for the server. 127.0.0.1:8080") +} + +func Init() error { + flag.Parse() + + switch flagRegistry { + case "kubernetes": + registry.DefaultRegistry = registry.NewKubernetesRegistry() + store.DefaultStore = store.NewMemcacheStore() + } + + if len(Name) == 0 { + Name = "go-server" + } + + if len(Id) == 0 { + Id = Name + "-" + uuid.NewUUID().String() + } + + if DefaultServer == nil { + DefaultServer = NewRpcServer(flagBindAddress) + } + + return DefaultServer.Init() +} + +func NewReceiver(handler interface{}) Receiver { + return DefaultServer.NewReceiver(handler) +} + +func NewNamedReceiver(path string, handler interface{}) Receiver { + return DefaultServer.NewNamedReceiver(path, handler) +} + +func Register(r Receiver) error { + return DefaultServer.Register(r) +} + +func Run() error { + if err := Start(); err != nil { + return err + } + + // parse address for host, port + parts := strings.Split(DefaultServer.Address(), ":") + host := strings.Join(parts[:len(parts)-1], ":") + port, _ := strconv.Atoi(parts[len(parts)-1]) + + // register service + node := registry.NewNode(Id, host, port) + service := registry.NewService(Name, node) + + log.Debugf("Registering %s", node.Id()) + registry.Register(service) + + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + log.Debugf("Received signal %s", <-ch) + + log.Debugf("Deregistering %s", node.Id()) + registry.Deregister(service) + + return Stop() +} + +func Start() error { + log.Debugf("Starting server %s id %s", Name, Id) + return DefaultServer.Start() +} + +func Stop() error { + log.Debugf("Stopping server") + return DefaultServer.Stop() +} diff --git a/server/server_context.go b/server/server_context.go new file mode 100644 index 00000000..82955682 --- /dev/null +++ b/server/server_context.go @@ -0,0 +1,120 @@ +package server + +import ( + "net/http" + "sync" + + "github.com/asim/go-micro/client" + log "github.com/cihub/seelog" +) + +var ctxs = struct { + sync.Mutex + m map[*http.Request]*serverContext +}{ + m: make(map[*http.Request]*serverContext), +} + +// A server context interface +type Context interface { + Request() Request // the request made to the server + Headers() Headers // the response headers + NewRequest(string, string, interface{}) client.Request // a new scoped client request + NewProtoRequest(string, string, interface{}) client.Request // a new scoped client request + NewJsonRequest(string, string, interface{}) client.Request // a new scoped client request +} + +// context represents the context of an in-flight HTTP request. +// It implements the appengine.Context and http.ResponseWriter interfaces. +type serverContext struct { + req *serverRequest + outCode int + outHeader http.Header + outBody []byte +} + +// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status +// codes do not permit a response body (nor response entity headers such as +// Content-Length, Content-Type, etc). +func bodyAllowedForStatus(status int) bool { + switch { + case status >= 100 && status <= 199: + return false + case status == 204: + return false + case status == 304: + return false + } + return true +} + +func getServerContext(req *http.Request) *serverContext { + ctxs.Lock() + c := ctxs.m[req] + ctxs.Unlock() + + if c == nil { + // Someone passed in an http.Request that is not in-flight. + panic("NewContext passed an unknown http.Request") + } + return c +} + +func (c *serverContext) NewRequest(service, method string, request interface{}) client.Request { + req := client.NewRequest(service, method, request) + // TODO: set headers and scope + req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) + return req +} + +func (c *serverContext) NewProtoRequest(service, method string, request interface{}) client.Request { + req := client.NewProtoRequest(service, method, request) + // TODO: set headers and scope + req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) + return req +} + +func (c *serverContext) NewJsonRequest(service, method string, request interface{}) client.Request { + req := client.NewJsonRequest(service, method, request) + // TODO: set headers and scope + req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) + return req +} + +// The response headers +func (c *serverContext) Headers() Headers { + return c.outHeader +} + +// The response headers +func (c *serverContext) Header() http.Header { + return c.outHeader +} + +// The request made to the server +func (c *serverContext) Request() Request { + return c.req +} + +func (c *serverContext) Write(b []byte) (int, error) { + if c.outCode == 0 { + c.WriteHeader(http.StatusOK) + } + if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { + return 0, http.ErrBodyNotAllowed + } + c.outBody = append(c.outBody, b...) + return len(b), nil +} + +func (c *serverContext) WriteHeader(code int) { + if c.outCode != 0 { + log.Errorf("WriteHeader called multiple times on request.") + return + } + c.outCode = code +} + +func GetContext(r *http.Request) *serverContext { + return getServerContext(r) +} diff --git a/server/server_request.go b/server/server_request.go new file mode 100644 index 00000000..9058ec52 --- /dev/null +++ b/server/server_request.go @@ -0,0 +1,25 @@ +package server + +import ( + "net/http" +) + +type serverRequest struct { + req *http.Request +} + +func (s *serverRequest) Headers() Headers { + return s.req.Header +} + +func (s *serverRequest) Session(name string) string { + if sess := s.Headers().Get(name); len(sess) > 0 { + return sess + } + + c, err := s.req.Cookie(name) + if err != nil { + return "" + } + return c.Value +} diff --git a/store/consul_item.go b/store/consul_item.go new file mode 100644 index 00000000..00c5c718 --- /dev/null +++ b/store/consul_item.go @@ -0,0 +1,14 @@ +package store + +type ConsulItem struct { + key string + value []byte +} + +func (c *ConsulItem) Key() string { + return c.key +} + +func (c *ConsulItem) Value() []byte { + return c.value +} diff --git a/store/consul_store.go b/store/consul_store.go new file mode 100644 index 00000000..147c35c0 --- /dev/null +++ b/store/consul_store.go @@ -0,0 +1,55 @@ +package store + +import ( + "errors" + + consul "github.com/armon/consul-api" +) + +type ConsulStore struct { + Client *consul.Client +} + +func (c *ConsulStore) Get(key string) (Item, error) { + kv, _, err := c.Client.KV().Get(key, nil) + if err != nil { + return nil, err + } + if kv == nil { + return nil, errors.New("key not found") + } + + return &ConsulItem{ + key: kv.Key, + value: kv.Value, + }, nil +} + +func (c *ConsulStore) Del(key string) error { + _, err := c.Client.KV().Delete(key, nil) + return err +} + +func (c *ConsulStore) Put(item Item) error { + _, err := c.Client.KV().Put(&consul.KVPair{ + Key: item.Key(), + Value: item.Value(), + }, nil) + + return err +} + +func (c *ConsulStore) NewItem(key string, value []byte) Item { + return &ConsulItem{ + key: key, + value: value, + } +} + +func NewConsulStore() Store { + client, _ := consul.NewClient(&consul.Config{}) + + return &ConsulStore{ + Client: client, + } +} diff --git a/store/item.go b/store/item.go new file mode 100644 index 00000000..c0d7e23d --- /dev/null +++ b/store/item.go @@ -0,0 +1,6 @@ +package store + +type Item interface { + Key() string + Value() []byte +} diff --git a/store/memcached_item.go b/store/memcached_item.go new file mode 100644 index 00000000..7b35cf7a --- /dev/null +++ b/store/memcached_item.go @@ -0,0 +1,14 @@ +package store + +type MemcacheItem struct { + key string + value []byte +} + +func (m *MemcacheItem) Key() string { + return m.key +} + +func (m *MemcacheItem) Value() []byte { + return m.value +} diff --git a/store/memcached_store.go b/store/memcached_store.go new file mode 100644 index 00000000..54eb2306 --- /dev/null +++ b/store/memcached_store.go @@ -0,0 +1,65 @@ +package store + +import ( + "errors" + "os" + + mc "github.com/bradfitz/gomemcache/memcache" +) + +type MemcacheStore struct { + Client *mc.Client +} + +func (m *MemcacheStore) Get(key string) (Item, error) { + kv, err := m.Client.Get(key) + if err != nil && err == mc.ErrCacheMiss { + return nil, errors.New("key not found") + } else if err != nil { + return nil, err + } + + if kv == nil { + return nil, errors.New("key not found") + } + + return &MemcacheItem{ + key: kv.Key, + value: kv.Value, + }, nil +} + +func (m *MemcacheStore) Del(key string) error { + return m.Client.Delete(key) +} + +func (m *MemcacheStore) Put(item Item) error { + return m.Client.Set(&mc.Item{ + Key: item.Key(), + Value: item.Value(), + }) +} + +func (m *MemcacheStore) NewItem(key string, value []byte) Item { + return &MemcacheItem{ + key: key, + value: value, + } +} + +func NewMemcacheStore() Store { + server := os.Getenv("MEMCACHED_SERVICE_HOST") + port := os.Getenv("MEMCACHED_SERVICE_PORT") + + if len(server) == 0 { + server = "127.0.0.1" + } + + if len(port) == 0 { + port = "11211" + } + + return &MemcacheStore{ + Client: mc.New(server + ":" + port), + } +} diff --git a/store/store.go b/store/store.go new file mode 100644 index 00000000..e07ce125 --- /dev/null +++ b/store/store.go @@ -0,0 +1,28 @@ +package store + +type Store interface { + Get(string) (Item, error) + Del(string) error + Put(Item) error + NewItem(string, []byte) Item +} + +var ( + DefaultStore = NewConsulStore() +) + +func Get(key string) (Item, error) { + return DefaultStore.Get(key) +} + +func Del(key string) error { + return DefaultStore.Del(key) +} + +func Put(item Item) error { + return DefaultStore.Put(item) +} + +func NewItem(key string, value []byte) Item { + return DefaultStore.NewItem(key, value) +} diff --git a/template/Dockerfile b/template/Dockerfile new file mode 100644 index 00000000..4149ef71 --- /dev/null +++ b/template/Dockerfile @@ -0,0 +1,3 @@ +FROM scratch +ADD template / +ENTRYPOINT [ "/template" ] diff --git a/template/README.md b/template/README.md new file mode 100644 index 00000000..0262ada7 --- /dev/null +++ b/template/README.md @@ -0,0 +1,30 @@ +# Template Service + +An example Go service running with go-micro + +### Prerequisites + +Install Consul +[https://www.consul.io/intro/getting-started/install.html](https://www.consul.io/intro/getting-started/install.html) + +Run Consul +``` +$ consul agent -server -bootstrap-expect 1 -data-dir /tmp/consul +``` + +Run Service +``` +$ go run main.go + +1416690099281057746 [Debug] Rpc handler /_rpc +1416690099281092588 [Debug] Starting server go.micro.service.go-template id go.micro.service.go-template-c0bfcb44-728a-11e4-b099-68a86d0d36b6 +1416690099281192941 [Debug] Listening on [::]:58264 +1416690099281215346 [Debug] Registering go.micro.service.go-template-c0bfcb44-728a-11e4-b099-68a86d0d36b6 +``` + +Test Service +``` +$ go run go-micro/examples/service_client.go + +go.micro.service.go-template-c0bfcb44-728a-11e4-b099-68a86d0d36b6: Hello John +``` diff --git a/template/handler/example.go b/template/handler/example.go new file mode 100644 index 00000000..aeb47a1c --- /dev/null +++ b/template/handler/example.go @@ -0,0 +1,20 @@ +package handler + +import ( + "code.google.com/p/go.net/context" + "code.google.com/p/goprotobuf/proto" + + "github.com/asim/go-micro/server" + example "github.com/asim/go-micro/template/proto/example" + log "github.com/cihub/seelog" +) + +type Example struct{} + +func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { + log.Debug("Received Example.Call request") + + rsp.Msg = proto.String(server.Id + ": Hello " + req.GetName()) + + return nil +} diff --git a/template/main.go b/template/main.go new file mode 100644 index 00000000..f946c448 --- /dev/null +++ b/template/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "log" + + "github.com/asim/go-micro/server" + "github.com/asim/go-micro/template/handler" +) + +func main() { + server.Name = "go.micro.service.template" + + // Initialise Server + server.Init() + + // Register Handlers + server.Register( + server.NewReceiver( + new(handler.Example), + ), + ) + + // Run server + if err := server.Run(); err != nil { + log.Fatal(err) + } + +} diff --git a/template/pod.json b/template/pod.json new file mode 100644 index 00000000..1bb0e2ae --- /dev/null +++ b/template/pod.json @@ -0,0 +1,21 @@ +{ + "kind": "Pod", + "apiVersion": "v1beta1", + "id": "template-service", + "desiredState": { + "manifest": { + "version": "v1beta1", + "id": "template-service", + "containers": [{ + "name": "template-service", + "image": "chuhnk/go-template", + "ports": [{"name": "template-service", "containerPort": 8080}], + "command": ["--registry=kubernetes", "--bind_address=:8080"] + }], + } + }, + "labels": { + "name": "go.micro.service.template", + } +} + diff --git a/template/proto/example/example.pb.go b/template/proto/example/example.pb.go new file mode 100644 index 00000000..af50f655 --- /dev/null +++ b/template/proto/example/example.pb.go @@ -0,0 +1,57 @@ +// Code generated by protoc-gen-gogo. +// source: asim/go-micro/template/proto/example/example.proto +// DO NOT EDIT! + +/* +Package go_micro_service_template_example is a generated protocol buffer package. + +It is generated from these files: + asim/go-micro/template/proto/example/example.proto + +It has these top-level messages: + Request + Response +*/ +package go_micro_service_template_example + +import proto "code.google.com/p/gogoprotobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +type Request struct { + Name *string `protobuf:"bytes,1,req,name=name" json:"name,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} + +func (m *Request) GetName() string { + if m != nil && m.Name != nil { + return *m.Name + } + return "" +} + +type Response struct { + Msg *string `protobuf:"bytes,1,req,name=msg" json:"msg,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} + +func (m *Response) GetMsg() string { + if m != nil && m.Msg != nil { + return *m.Msg + } + return "" +} + +func init() { +} diff --git a/template/proto/example/example.proto b/template/proto/example/example.proto new file mode 100644 index 00000000..7e76068f --- /dev/null +++ b/template/proto/example/example.proto @@ -0,0 +1,9 @@ +package go.micro.service.template.example; + +message Request { + required string name = 1; +} + +message Response { + required string msg = 1; +} diff --git a/template/service.json b/template/service.json new file mode 100644 index 00000000..4dc64727 --- /dev/null +++ b/template/service.json @@ -0,0 +1,9 @@ +{ + "id": "template-service", + "kind": "Service", + "apiVersion": "v1beta1", + "port": 9091, + "containerPort": 8080, + "selector": { "name": "go.micro.service.template" }, + "labels": { "name": "go.micro.service.template" } +}