diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go deleted file mode 100644 index 7ddb980d..00000000 --- a/api/handler/rpc/rpc.go +++ /dev/null @@ -1,495 +0,0 @@ -// Package rpc is a go-micro rpc handler. -package rpc - -import ( - "encoding/json" - "io" - "net/http" - "strconv" - "strings" - - jsonpatch "github.com/evanphx/json-patch/v5" - "github.com/oxtoacart/bpool" - jsonrpc "github.com/unistack-org/micro-codec-jsonrpc" - protorpc "github.com/unistack-org/micro-codec-protorpc" - "github.com/unistack-org/micro/v3/api" - "github.com/unistack-org/micro/v3/api/handler" - "github.com/unistack-org/micro/v3/api/internal/proto" - "github.com/unistack-org/micro/v3/client" - "github.com/unistack-org/micro/v3/codec" - "github.com/unistack-org/micro/v3/errors" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/util/ctx" - "github.com/unistack-org/micro/v3/util/qson" - "github.com/unistack-org/micro/v3/util/router" -) - -const ( - Handler = "rpc" -) - -var ( - // supported json codecs - jsonCodecs = []string{ - "application/grpc+json", - "application/json", - "application/json-rpc", - } - - // support proto codecs - protoCodecs = []string{ - "application/grpc", - "application/grpc+proto", - "application/proto", - "application/protobuf", - "application/proto-rpc", - "application/octet-stream", - } - - bufferPool = bpool.NewSizedBufferPool(1024, 8) -) - -type rpcHandler struct { - opts handler.Options - s *api.Service -} - -type buffer struct { - io.ReadCloser -} - -func (b *buffer) Write(_ []byte) (int, error) { - return 0, nil -} - -func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - bsize := handler.DefaultMaxRecvSize - if h.opts.MaxRecvSize > 0 { - bsize = h.opts.MaxRecvSize - } - - r.Body = http.MaxBytesReader(w, r.Body, bsize) - - defer r.Body.Close() - var service *api.Service - - if h.s != nil { - // we were given the service - service = h.s - } else if h.opts.Router != nil { - // try get service from router - s, err := h.opts.Router.Route(r) - if err != nil { - writeError(w, r, errors.InternalServerError("go.micro.api", err.Error())) - return - } - service = s - } else { - // we have no way of routing the request - writeError(w, r, errors.InternalServerError("go.micro.api", "no route found")) - return - } - - ct := r.Header.Get("Content-Type") - - // Strip charset from Content-Type (like `application/json; charset=UTF-8`) - if idx := strings.IndexRune(ct, ';'); idx >= 0 { - ct = ct[:idx] - } - - // micro client - c := h.opts.Client - - // create context - cx := ctx.FromRequest(r) - - // set merged context to request - *r = *r.Clone(cx) - // if stream we currently only support json - if isStream(r, service) { - serveWebsocket(cx, w, r, service, c) - return - } - - // create custom router - callOpt := client.WithRouter(router.New(service.Services)) - - // walk the standard call path - // get payload - br, err := requestPayload(r) - if err != nil { - writeError(w, r, err) - return - } - - var rsp []byte - - switch { - // proto codecs - case hasCodec(ct, protoCodecs): - request := &proto.Message{} - // if the extracted payload isn't empty lets use it - if len(br) > 0 { - request = proto.NewMessage(br) - } - - // create request/response - response := &proto.Message{} - - req := c.NewRequest( - service.Name, - service.Endpoint.Name, - request, - client.WithContentType(ct), - ) - - // make the call - if err := c.Call(cx, req, response, callOpt); err != nil { - writeError(w, r, err) - return - } - - // marshall response - rsp, err = response.Marshal() - if err != nil { - writeError(w, r, err) - return - } - - default: - // if json codec is not present set to json - if !hasCodec(ct, jsonCodecs) { - ct = "application/json" - } - - // default to trying json - var request json.RawMessage - // if the extracted payload isn't empty lets use it - if len(br) > 0 { - request = json.RawMessage(br) - } - - // create request/response - var response json.RawMessage - - req := c.NewRequest( - service.Name, - service.Endpoint.Name, - &request, - client.WithContentType(ct), - ) - // make the call - if err := c.Call(cx, req, &response, callOpt); err != nil { - writeError(w, r, err) - return - } - - // marshall response - rsp, err = response.MarshalJSON() - if err != nil { - writeError(w, r, err) - return - } - } - - // write the response - writeResponse(w, r, rsp) -} - -func (rh *rpcHandler) String() string { - return "rpc" -} - -func hasCodec(ct string, codecs []string) bool { - for _, codec := range codecs { - if ct == codec { - return true - } - } - return false -} - -// requestPayload takes a *http.Request. -// If the request is a GET the query string parameters are extracted and marshaled to JSON and the raw bytes are returned. -// If the request method is a POST the request body is read and returned -func requestPayload(r *http.Request) ([]byte, error) { - var err error - - // we have to decode json-rpc and proto-rpc because we suck - // well actually because there's no proxy codec right now - - ct := r.Header.Get("Content-Type") - switch { - case strings.Contains(ct, "application/json-rpc"): - msg := codec.Message{ - Type: codec.Request, - Header: metadata.New(0), - } - c := jsonrpc.NewCodec(&buffer{r.Body}) - if err = c.ReadHeader(&msg, codec.Request); err != nil { - return nil, err - } - var raw json.RawMessage - if err = c.ReadBody(&raw); err != nil { - return nil, err - } - return ([]byte)(raw), nil - case strings.Contains(ct, "application/proto-rpc"), strings.Contains(ct, "application/octet-stream"): - msg := codec.Message{ - Type: codec.Request, - Header: metadata.New(0), - } - c := protorpc.NewCodec(&buffer{r.Body}) - if err = c.ReadHeader(&msg, codec.Request); err != nil { - return nil, err - } - var raw proto.Message - if err = c.ReadBody(&raw); err != nil { - return nil, err - } - return raw.Marshal() - case strings.Contains(ct, "application/www-x-form-urlencoded"): - if err = r.ParseForm(); err != nil { - return nil, err - } - - // generate a new set of values from the form - vals := make(map[string]string, len(r.Form)) - for k, v := range r.Form { - vals[k] = strings.Join(v, ",") - } - - // marshal - return json.Marshal(vals) - // TODO: application/grpc - } - - // otherwise as per usual - ctx := r.Context() - // dont user metadata.FromContext as it mangles names - md, ok := metadata.FromContext(ctx) - if !ok { - md = metadata.New(0) - } - - // allocate maximum - matches := make(map[string]interface{}, len(md)) - bodydst := "" - - // get fields from url path - for k, v := range md { - k = strings.ToLower(k) - // filter own keys - if strings.HasPrefix(k, "x-api-field-") { - matches[strings.TrimPrefix(k, "x-api-field-")] = v - delete(md, k) - } else if k == "x-api-body" { - bodydst = v - delete(md, k) - } - } - - // map of all fields - req := make(map[string]interface{}, len(md)) - - // get fields from url values - if len(r.URL.RawQuery) > 0 { - umd := make(map[string]interface{}) - err = qson.Unmarshal(&umd, r.URL.RawQuery) - if err != nil { - return nil, err - } - for k, v := range umd { - matches[k] = v - } - } - - // restore context without fields - *r = *r.Clone(metadata.NewContext(ctx, md)) - - for k, v := range matches { - ps := strings.Split(k, ".") - if len(ps) == 1 { - req[k] = v - continue - } - em := make(map[string]interface{}) - em[ps[len(ps)-1]] = v - for i := len(ps) - 2; i > 0; i-- { - nm := make(map[string]interface{}) - nm[ps[i]] = em - em = nm - } - if vm, ok := req[ps[0]]; ok { - // nested map - nm := vm.(map[string]interface{}) - for vk, vv := range em { - nm[vk] = vv - } - req[ps[0]] = nm - } else { - req[ps[0]] = em - } - } - pathbuf := []byte("{}") - if len(req) > 0 { - pathbuf, err = json.Marshal(req) - if err != nil { - return nil, err - } - } - - urlbuf := []byte("{}") - out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf) - if err != nil { - return nil, err - } - - switch r.Method { - case "GET": - // empty response - if strings.Contains(ct, "application/json") && string(out) == "{}" { - return out, nil - } else if string(out) == "{}" && !strings.Contains(ct, "application/json") { - return []byte{}, nil - } - return out, nil - case "PATCH", "POST", "PUT", "DELETE": - bodybuf := []byte("{}") - buf := bufferPool.Get() - defer bufferPool.Put(buf) - if _, err := buf.ReadFrom(r.Body); err != nil { - return nil, err - } - if b := buf.Bytes(); len(b) > 0 { - bodybuf = b - } - if bodydst == "" || bodydst == "*" { - if out, err = jsonpatch.MergeMergePatches(out, bodybuf); err == nil { - return out, nil - } - } - var jsonbody map[string]interface{} - if json.Valid(bodybuf) { - if err = json.Unmarshal(bodybuf, &jsonbody); err != nil { - return nil, err - } - } - dstmap := make(map[string]interface{}) - ps := strings.Split(bodydst, ".") - if len(ps) == 1 { - if jsonbody != nil { - dstmap[ps[0]] = jsonbody - } else { - // old unexpected behaviour - dstmap[ps[0]] = bodybuf - } - } else { - em := make(map[string]interface{}) - if jsonbody != nil { - em[ps[len(ps)-1]] = jsonbody - } else { - // old unexpected behaviour - em[ps[len(ps)-1]] = bodybuf - } - for i := len(ps) - 2; i > 0; i-- { - nm := make(map[string]interface{}) - nm[ps[i]] = em - em = nm - } - dstmap[ps[0]] = em - } - - bodyout, err := json.Marshal(dstmap) - if err != nil { - return nil, err - } - - if out, err = jsonpatch.MergeMergePatches(out, bodyout); err == nil { - return out, nil - } - - //fallback to previous unknown behaviour - return bodybuf, nil - - } - - return []byte{}, nil -} - -func writeError(w http.ResponseWriter, r *http.Request, err error) { - ce := errors.Parse(err.Error()) - - switch ce.Code { - case 0: - // assuming it's totally screwed - ce.Code = 500 - ce.Id = "go.micro.api" - ce.Status = http.StatusText(500) - ce.Detail = "error during request: " + ce.Detail - w.WriteHeader(500) - default: - w.WriteHeader(int(ce.Code)) - } - - // response content type - w.Header().Set("Content-Type", "application/json") - - // Set trailers - if strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { - w.Header().Set("Trailer", "grpc-status") - w.Header().Set("Trailer", "grpc-message") - w.Header().Set("grpc-status", "13") - w.Header().Set("grpc-message", ce.Detail) - } - - _, werr := w.Write([]byte(ce.Error())) - if werr != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(werr.Error()) - } - } -} - -func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) { - w.Header().Set("Content-Type", r.Header.Get("Content-Type")) - w.Header().Set("Content-Length", strconv.Itoa(len(rsp))) - - // Set trailers - if strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { - w.Header().Set("Trailer", "grpc-status") - w.Header().Set("Trailer", "grpc-message") - w.Header().Set("grpc-status", "0") - w.Header().Set("grpc-message", "") - } - - // write 204 status if rsp is nil - if len(rsp) == 0 { - w.WriteHeader(http.StatusNoContent) - } - - // write response - _, err := w.Write(rsp) - if err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - } - -} - -func NewHandler(opts ...handler.Option) handler.Handler { - options := handler.NewOptions(opts...) - return &rpcHandler{ - opts: options, - } -} - -func WithService(s *api.Service, opts ...handler.Option) handler.Handler { - options := handler.NewOptions(opts...) - return &rpcHandler{ - opts: options, - s: s, - } -} diff --git a/api/handler/rpc/rpc_test.go b/api/handler/rpc/rpc_test.go deleted file mode 100644 index 15453c44..00000000 --- a/api/handler/rpc/rpc_test.go +++ /dev/null @@ -1,112 +0,0 @@ -package rpc - -import ( - "bytes" - "net/http" - "testing" - - go_api "github.com/unistack-org/micro/v3/api/proto" - jsonpb "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" -) - -func TestRequestPayloadFromRequest(t *testing.T) { - - // our test event so that we can validate serialising / deserializing of true protos works - protoEvent := go_api.Event{ - Name: "Test", - } - - protoBytes, err := proto.Marshal(&protoEvent) - if err != nil { - t.Fatal("Failed to marshal proto", err) - } - - jsonBytes, err := jsonpb.Marshal(&protoEvent) - if err != nil { - t.Fatal("Failed to marshal proto to JSON ", err) - } - - jsonUrlBytes := []byte(`{"key1":"val1","key2":"val2","name":"Test"}`) - - t.Run("extracting a json from a POST request with url params", func(t *testing.T) { - r, err := http.NewRequest("POST", "http://localhost/my/path?key1=val1&key2=val2", bytes.NewReader(jsonBytes)) - if err != nil { - t.Fatalf("Failed to created http.Request: %v", err) - } - - extByte, err := requestPayload(r) - if err != nil { - t.Fatalf("Failed to extract payload from request: %v", err) - } - if string(extByte) != string(jsonUrlBytes) { - t.Fatalf("Expected %v and %v to match", string(extByte), jsonUrlBytes) - } - }) - - t.Run("extracting a proto from a POST request", func(t *testing.T) { - r, err := http.NewRequest("POST", "http://localhost/my/path", bytes.NewReader(protoBytes)) - if err != nil { - t.Fatalf("Failed to created http.Request: %v", err) - } - - extByte, err := requestPayload(r) - if err != nil { - t.Fatalf("Failed to extract payload from request: %v", err) - } - if string(extByte) != string(protoBytes) { - t.Fatalf("Expected %v and %v to match", string(extByte), string(protoBytes)) - } - }) - - t.Run("extracting JSON from a POST request", func(t *testing.T) { - r, err := http.NewRequest("POST", "http://localhost/my/path", bytes.NewReader(jsonBytes)) - if err != nil { - t.Fatalf("Failed to created http.Request: %v", err) - } - - extByte, err := requestPayload(r) - if err != nil { - t.Fatalf("Failed to extract payload from request: %v", err) - } - if string(extByte) != string(jsonBytes) { - t.Fatalf("Expected %v and %v to match", string(extByte), string(jsonBytes)) - } - }) - - t.Run("extracting params from a GET request", func(t *testing.T) { - - r, err := http.NewRequest("GET", "http://localhost/my/path", nil) - if err != nil { - t.Fatalf("Failed to created http.Request: %v", err) - } - - q := r.URL.Query() - q.Add("name", "Test") - r.URL.RawQuery = q.Encode() - - extByte, err := requestPayload(r) - if err != nil { - t.Fatalf("Failed to extract payload from request: %v", err) - } - if string(extByte) != string(jsonBytes) { - t.Fatalf("Expected %v and %v to match", string(extByte), string(jsonBytes)) - } - }) - - t.Run("GET request with no params", func(t *testing.T) { - - r, err := http.NewRequest("GET", "http://localhost/my/path", nil) - if err != nil { - t.Fatalf("Failed to created http.Request: %v", err) - } - - extByte, err := requestPayload(r) - if err != nil { - t.Fatalf("Failed to extract payload from request: %v", err) - } - if string(extByte) != "" { - t.Fatalf("Expected %v and %v to match", string(extByte), "") - } - }) -} diff --git a/api/handler/rpc/stream.go b/api/handler/rpc/stream.go deleted file mode 100644 index e93c46ce..00000000 --- a/api/handler/rpc/stream.go +++ /dev/null @@ -1,263 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "encoding/json" - "io" - "net/http" - "strings" - "time" - - "github.com/gobwas/httphead" - "github.com/gobwas/ws" - "github.com/gobwas/ws/wsutil" - raw "github.com/unistack-org/micro-codec-bytes" - "github.com/unistack-org/micro/v3/api" - "github.com/unistack-org/micro/v3/client" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/util/router" -) - -// serveWebsocket will stream rpc back over websockets assuming json -func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *api.Service, c client.Client) { - var op ws.OpCode - - ct := r.Header.Get("Content-Type") - // Strip charset from Content-Type (like `application/json; charset=UTF-8`) - if idx := strings.IndexRune(ct, ';'); idx >= 0 { - ct = ct[:idx] - } - - // check proto from request - switch ct { - case "application/json": - op = ws.OpText - default: - op = ws.OpBinary - } - - hdr := make(http.Header) - if proto, ok := r.Header["Sec-WebSocket-Protocol"]; ok { - for _, p := range proto { - switch p { - case "binary": - hdr["Sec-WebSocket-Protocol"] = []string{"binary"} - op = ws.OpBinary - default: - op = ws.OpBinary - } - } - } - payload, err := requestPayload(r) - if err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - - upgrader := ws.HTTPUpgrader{Timeout: 5 * time.Second, - Protocol: func(proto string) bool { - if strings.Contains(proto, "binary") { - return true - } - // fallback to support all protocols now - return true - }, - Extension: func(httphead.Option) bool { - // disable extensions for compatibility - return false - }, - Header: hdr, - } - - conn, rw, _, err := upgrader.Upgrade(r, w) - if err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - - defer func() { - if err := conn.Close(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - }() - - var request interface{} - if !bytes.Equal(payload, []byte(`{}`)) { - switch ct { - case "application/json", "": - m := json.RawMessage(payload) - request = &m - default: - request = &raw.Frame{Data: payload} - } - } - - // we always need to set content type for message - if ct == "" { - ct = "application/json" - } - req := c.NewRequest( - service.Name, - service.Endpoint.Name, - request, - client.WithContentType(ct), - client.StreamingRequest(), - ) - - // create custom router - callOpt := client.WithRouter(router.New(service.Services)) - - // create a new stream - stream, err := c.Stream(ctx, req, callOpt) - if err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - - if request != nil { - if err = stream.Send(request); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - } - - go writeLoop(rw, stream) - - rsp := stream.Response() - - // receive from stream and send to client - for { - select { - case <-ctx.Done(): - return - case <-stream.Context().Done(): - return - default: - // read backend response body - buf, err := rsp.Read() - if err != nil { - // wants to avoid import grpc/status.Status - if strings.Contains(err.Error(), "context canceled") { - return - } - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - - // write the response - if err := wsutil.WriteServerMessage(rw, op, buf); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - if err = rw.Flush(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - } - } -} - -// writeLoop -func writeLoop(rw io.ReadWriter, stream client.Stream) { - // close stream when done - defer stream.Close() - - for { - select { - case <-stream.Context().Done(): - return - default: - buf, op, err := wsutil.ReadClientData(rw) - if err != nil { - if wserr, ok := err.(wsutil.ClosedError); ok { - switch wserr.Code { - case ws.StatusGoingAway: - // this happens when user leave the page - return - case ws.StatusNormalClosure, ws.StatusNoStatusRcvd: - // this happens when user close ws connection, or we don't get any status - return - } - } - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - switch op { - default: - // not relevant - continue - case ws.OpText, ws.OpBinary: - break - } - // send to backend - // default to trying json - // if the extracted payload isn't empty lets use it - request := &raw.Frame{Data: buf} - if err := stream.Send(request); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) - } - return - } - } - } -} - -func isStream(r *http.Request, srv *api.Service) bool { - // check if it's a web socket - if !isWebSocket(r) { - return false - } - // check if the endpoint supports streaming - for _, service := range srv.Services { - for _, ep := range service.Endpoints { - // skip if it doesn't match the name - if ep.Name != srv.Endpoint.Name { - continue - } - // matched if the name - if v := ep.Metadata["stream"]; v == "true" { - return true - } - } - } - return false -} - -func isWebSocket(r *http.Request) bool { - contains := func(key, val string) bool { - vv := strings.Split(r.Header.Get(key), ",") - for _, v := range vv { - if val == strings.ToLower(strings.TrimSpace(v)) { - return true - } - } - return false - } - - if contains("Connection", "upgrade") && contains("Upgrade", "websocket") { - return true - } - - return false -} diff --git a/api/server/acme/certmagic/storage.go b/api/server/acme/certmagic/storage.go index ea8b4b1b..241b630e 100644 --- a/api/server/acme/certmagic/storage.go +++ b/api/server/acme/certmagic/storage.go @@ -138,7 +138,7 @@ func (s *storage) Stat(key string) (certmagic.KeyInfo, error) { }, nil } -// NewStorage returns a certmagic.Storage backed by a go-micro/lock and go-micro/store +// NewStorage returns a certmagic.Storage backed by a micro/lock and micro/store func NewStorage(lock sync.Sync, store store.Store) certmagic.Storage { return &storage{ lock: lock, diff --git a/api/server/http/http.go b/api/server/http/http.go index 1366dde3..86c6b112 100644 --- a/api/server/http/http.go +++ b/api/server/http/http.go @@ -74,7 +74,7 @@ func (s *httpServer) Start() error { } if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("HTTP API Listening on %s", l.Addr().String()) + config.Logger.Infof("HTTP API Listening on %s", l.Addr().String()) } s.Lock() @@ -85,7 +85,7 @@ func (s *httpServer) Start() error { if err := http.Serve(l, s.mux); err != nil { // temporary fix if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("serve err: %v", err) + config.Logger.Errorf("serve err: %v", err) } s.Stop() } diff --git a/broker/options.go b/broker/options.go index 22751108..fae5aaaf 100644 --- a/broker/options.go +++ b/broker/options.go @@ -13,7 +13,7 @@ import ( type Options struct { Addrs []string Secure bool - Codec codec.Marshaler + Codec codec.Codec // Logger Logger logger.Logger @@ -125,7 +125,7 @@ func Addrs(addrs ...string) Option { // Codec sets the codec used for encoding/decoding used where // a broker does not support headers -func Codec(c codec.Marshaler) Option { +func Codec(c codec.Codec) Option { return func(o *Options) { o.Codec = c } diff --git a/client/client.go b/client/client.go index b89f6e0f..e06769da 100644 --- a/client/client.go +++ b/client/client.go @@ -48,7 +48,7 @@ type Request interface { // The unencoded request body Body() interface{} // Write to the encoded request writer. This is nil before a call is made - Codec() codec.Writer + Codec() codec.Codec // indicates whether the request will be a streaming one rather than unary Stream() bool } @@ -56,7 +56,7 @@ type Request interface { // Response is the response received from a service type Response interface { // Read the response - Codec() codec.Reader + Codec() codec.Codec // read the header Header() metadata.Metadata // Read the undecoded response diff --git a/client/noop.go b/client/noop.go index b9e99d19..3b78dc39 100644 --- a/client/noop.go +++ b/client/noop.go @@ -3,14 +3,26 @@ package client import ( "context" - raw "github.com/unistack-org/micro-codec-bytes" - json "github.com/unistack-org/micro-codec-json" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/errors" "github.com/unistack-org/micro/v3/metadata" ) +var ( + DefaultCodecs = map[string]codec.Codec{ + //"application/json": cjson.NewCodec, + //"application/json-rpc": cjsonrpc.NewCodec, + //"application/protobuf": cproto.NewCodec, + //"application/proto-rpc": cprotorpc.NewCodec, + "application/octet-stream": codec.NewCodec(), + } +) + +const ( + defaultContentType = "application/json" +) + type noopClient struct { opts Options } @@ -27,7 +39,7 @@ type noopRequest struct { endpoint string contentType string body interface{} - codec codec.Writer + codec codec.Codec stream bool } @@ -56,7 +68,7 @@ func (n *noopRequest) Body() interface{} { return n.body } -func (n *noopRequest) Codec() codec.Writer { +func (n *noopRequest) Codec() codec.Codec { return n.codec } @@ -65,11 +77,11 @@ func (n *noopRequest) Stream() bool { } type noopResponse struct { - codec codec.Reader + codec codec.Codec header metadata.Metadata } -func (n *noopResponse) Codec() codec.Reader { +func (n *noopResponse) Codec() codec.Codec { return n.codec } @@ -123,6 +135,16 @@ func (n *noopMessage) ContentType() string { return n.opts.ContentType } +func (n *noopClient) newCodec(contentType string) (codec.Codec, error) { + if cf, ok := n.opts.Codecs[contentType]; ok { + return cf, nil + } + if cf, ok := DefaultCodecs[contentType]; ok { + return cf, nil + } + return nil, codec.ErrUnknownContentType +} + func (n *noopClient) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) @@ -168,21 +190,15 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti md["Micro-Topic"] = p.Topic() // passed in raw data - if d, ok := p.Payload().(*raw.Frame); ok { + if d, ok := p.Payload().(*codec.Frame); ok { body = d.Data } else { - cf := n.opts.Broker.Options().Codec - if cf == nil { - cf = json.Marshaler{} + // use codec for payload + cf, err := n.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) } - /* - // use codec for payload - cf, err := n.opts.Codecs[p.ContentType()] - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - */ // set the body b, err := cf.Marshal(p.Payload()) if err != nil { diff --git a/client/options.go b/client/options.go index 169d0f15..03450997 100644 --- a/client/options.go +++ b/client/options.go @@ -22,7 +22,7 @@ type Options struct { // Plugged interfaces Broker broker.Broker - Codecs map[string]codec.NewCodec + Codecs map[string]codec.Codec Router router.Router Selector selector.Selector Transport transport.Transport @@ -141,8 +141,8 @@ type RequestOptions struct { func NewOptions(opts ...Option) Options { options := Options{ Context: context.Background(), - ContentType: "application/protobuf", - Codecs: make(map[string]codec.NewCodec), + ContentType: "application/json", + Codecs: make(map[string]codec.Codec), CallOptions: CallOptions{ Backoff: DefaultBackoff, Retry: DefaultRetry, @@ -179,7 +179,7 @@ func Logger(l logger.Logger) Option { } // Codec to be used to encode/decode requests for a given content type -func Codec(contentType string, c codec.NewCodec) Option { +func Codec(contentType string, c codec.Codec) Option { return func(o *Options) { o.Codecs[contentType] = c } diff --git a/client/test_request.go b/client/test_request.go index bc4a7f53..91db6302 100644 --- a/client/test_request.go +++ b/client/test_request.go @@ -34,7 +34,7 @@ func (r *testRequest) Body() interface{} { return r.body } -func (r *testRequest) Codec() codec.Writer { +func (r *testRequest) Codec() codec.Codec { return r.codec } diff --git a/codec/codec.go b/codec/codec.go index 554969d2..f8d8d7ae 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -18,40 +18,22 @@ const ( var ( // ErrInvalidMessage returned when invalid messge passed to codec ErrInvalidMessage = errors.New("invalid message") + // ErrUnknownContentType returned when content-type is unknown + ErrUnknownContentType = errors.New("unknown content-type") ) // MessageType type MessageType int -// NewCodec takes in a connection/buffer and returns a new Codec -type NewCodec func(io.ReadWriteCloser) Codec - -// Codec encodes/decodes various types of messages used within go-micro. +// Codec encodes/decodes various types of messages used within micro. // ReadHeader and ReadBody are called in pairs to read requests/responses // from the connection. Close is called when finished with the // connection. ReadBody may be called with a nil argument to force the // body to be read and discarded. type Codec interface { - Reader - Writer - Close() error - String() string -} - -// Reader interface -type Reader interface { - ReadHeader(*Message, MessageType) error - ReadBody(interface{}) error -} - -// Writer interface -type Writer interface { - Write(*Message, interface{}) error -} - -// Marshaler is a simple encoding interface used for the broker/transport -// where headers are not supported by the underlying implementation. -type Marshaler interface { + ReadHeader(io.ReadWriter, *Message, MessageType) error + ReadBody(io.ReadWriter, interface{}) error + Write(io.ReadWriter, *Message, interface{}) error Marshal(interface{}) ([]byte, error) Unmarshal([]byte, interface{}) error String() string diff --git a/codec/noop.go b/codec/noop.go new file mode 100644 index 00000000..31606e0f --- /dev/null +++ b/codec/noop.go @@ -0,0 +1,93 @@ +package codec + +import ( + "io" + "io/ioutil" +) + +type noopCodec struct { +} + +// Frame gives us the ability to define raw data to send over the pipes +type Frame struct { + Data []byte +} + +func (c *noopCodec) ReadHeader(conn io.ReadWriter, m *Message, t MessageType) error { + return nil +} + +func (c *noopCodec) ReadBody(conn io.ReadWriter, b interface{}) error { + // read bytes + buf, err := ioutil.ReadAll(conn) + if err != nil { + return err + } + + if b == nil { + return nil + } + + switch v := b.(type) { + case []byte: + v = buf + case *[]byte: + *v = buf + case *Frame: + v.Data = buf + default: + return ErrInvalidMessage + } + + return nil +} + +func (c *noopCodec) Write(conn io.ReadWriter, m *Message, b interface{}) error { + var v []byte + switch vb := b.(type) { + case nil: + return nil + case *Frame: + v = vb.Data + case *[]byte: + v = *vb + case []byte: + v = vb + default: + return ErrInvalidMessage + } + _, err := conn.Write(v) + return err +} + +func (c *noopCodec) String() string { + return "noop" +} + +func NewCodec() Codec { + return &noopCodec{} +} + +func (n *noopCodec) Marshal(v interface{}) ([]byte, error) { + switch ve := v.(type) { + case *[]byte: + return *ve, nil + case []byte: + return ve, nil + case *Message: + return ve.Body, nil + } + return nil, ErrInvalidMessage +} + +func (n *noopCodec) Unmarshal(d []byte, v interface{}) error { + switch ve := v.(type) { + case []byte: + ve = d + case *[]byte: + *ve = d + case *Message: + ve.Body = d + } + return ErrInvalidMessage +} diff --git a/go.mod b/go.mod index c29ef04d..282b9abb 100644 --- a/go.mod +++ b/go.mod @@ -7,11 +7,8 @@ require ( github.com/caddyserver/certmagic v0.10.6 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 - github.com/evanphx/json-patch/v5 v5.1.0 github.com/ghodss/yaml v1.0.0 github.com/go-acme/lego/v3 v3.4.0 - github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee - github.com/gobwas/ws v1.0.3 github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.1.2 github.com/hashicorp/hcl v1.0.0 @@ -19,11 +16,6 @@ require ( github.com/miekg/dns v1.1.31 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/stretchr/testify v1.5.1 - github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 - github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c - github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd - github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92 - github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077 github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/net v0.0.0-20200904194848-62affa334b73 diff --git a/go.sum b/go.sum index 970dd432..f5bec3b4 100644 --- a/go.sum +++ b/go.sum @@ -258,25 +258,14 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY= github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= -github.com/unistack-org/micro-codec-bytes v0.0.0-20200827104921-3616a69473a6/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE= github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 h1:5b1yuSllbsMm/9fUIlIXSr8DbsKT/sAKSCgOx6+SAfI= github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE= -github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c h1:RtcNaK8rQSl7xAoy1W437dvZLCVjSC6e4JcolepSQs0= -github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c/go.mod h1:dG5aUyhBv+ebOl/UFW2Aj2GTfVxxXWi6AcynpePOAhQ= -github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd h1:qXSiEfVnCgrwTHYvAnEPSHEai3+5EUH9ZYovLpxGDwg= -github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd/go.mod h1:PFyvkGhavl+3tEPgOaLAhoJJX4/webVGW59BSOXDfNM= -github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92 h1:1rPDBu7Nwo3ZL6r6H5rj7qNchHSdBF4zcewAeTUEMC4= -github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92/go.mod h1:31JMo683bBQ+uN9YufpUU6ESHphyx3DFmTXEnjpJV9Y= -github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077 h1:uK7owL8TPSwoQiDM1V/0swmgCEepSQKXoi8GEnGxtlU= -github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077/go.mod h1:Ct4uAVZaDEyBZj9Q0poDkbzu6zKXUCcSqJkv/MWPpeI= -github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210346-ec89783dc46c/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34/go.mod h1:fT1gYn+TtfVZZ5tNx56bZIncJjmlji66g7GKdWua5hE= github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.mod h1:il8nz4ZEcX3Usyfrtwy+YtQcb7xSUSFJdSe8PBJ9gOA= github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a h1:VjlqP1qZkjC0Chmx5MKFPIbtSCigeICFDf8vaLZGh9o= github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a/go.mod h1:MzMg+qh1wORZwYtg5AVgFkNFrXVVbdPKW7s/Is+A994= github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4= -github.com/unistack-org/micro/v3 v3.0.0-gamma/go.mod h1:iEtpu3wTYCRs3pQ3VsFEO7JBO4lOMpkOwMyrpZyIDPo= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= diff --git a/micro.go b/micro.go index 3b26cc52..87f2a6d4 100644 --- a/micro.go +++ b/micro.go @@ -12,7 +12,7 @@ import ( type serviceKey struct{} // Service is an interface that wraps the lower level libraries -// within go-micro. Its a convenience method for building +// within micro. Its a convenience method for building // and initialising services. type Service interface { // The service name diff --git a/model/options.go b/model/options.go index 6e1284eb..b5421ba9 100644 --- a/model/options.go +++ b/model/options.go @@ -12,7 +12,7 @@ type Options struct { // Database to write to Database string // for serialising - Codec codec.Marshaler + Codec codec.Codec // for locking Sync sync.Sync // for storage diff --git a/network/transport/options.go b/network/transport/options.go index 9c71d2f4..b04d9c4c 100644 --- a/network/transport/options.go +++ b/network/transport/options.go @@ -14,7 +14,7 @@ type Options struct { Addrs []string // Codec is the codec interface to use where headers are not supported // by the transport and the entire payload must be encoded - Codec codec.Marshaler + Codec codec.Codec // Secure tells the transport to secure the connection. // In the case TLSConfig is not specified best effort self-signed // certs should be used @@ -121,7 +121,7 @@ func Context(ctx context.Context) Option { // Codec sets the codec used for encoding where the transport // does not support message headers -func Codec(c codec.Marshaler) Option { +func Codec(c codec.Codec) Option { return func(o *Options) { o.Codec = c } diff --git a/network/tunnel/tunnel.go b/network/tunnel/tunnel.go index 64d794d2..9bd98976 100644 --- a/network/tunnel/tunnel.go +++ b/network/tunnel/tunnel.go @@ -46,7 +46,7 @@ var ( // Mode of the session type Mode uint8 -// Tunnel creates a gre tunnel on top of the go-micro/transport. +// Tunnel creates a gre tunnel on top of the micro/transport. // It establishes multiple streams using the Micro-Tunnel-Channel header // and Micro-Tunnel-Session header. The tunnel id is a hash of // the address being requested. diff --git a/proxy/options.go b/proxy/options.go index e5a518bc..6ce8084a 100644 --- a/proxy/options.go +++ b/proxy/options.go @@ -1,4 +1,4 @@ -// Package proxy is a transparent proxy built on the go-micro/server +// Package proxy is a transparent proxy built on the micro/server package proxy import ( diff --git a/proxy/proxy.go b/proxy/proxy.go index 05204a4f..01ab196d 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1,4 +1,4 @@ -// Package proxy is a transparent proxy built on the go-micro/server +// Package proxy is a transparent proxy built on the micro/server package proxy import ( @@ -7,7 +7,7 @@ import ( "github.com/unistack-org/micro/v3/server" ) -// Proxy can be used as a proxy server for go-micro services +// Proxy can be used as a proxy server for micro services type Proxy interface { // ProcessMessage handles inbound messages ProcessMessage(context.Context, server.Message) error diff --git a/resolver/registry/registry.go b/resolver/registry/registry.go index b91c57c9..6ebab330 100644 --- a/resolver/registry/registry.go +++ b/resolver/registry/registry.go @@ -1,4 +1,4 @@ -// Package registry resolves names using the go-micro registry +// Package registry resolves names using the micro registry package registry import ( diff --git a/server/noop.go b/server/noop.go index 71a61ebe..1279b11b 100644 --- a/server/noop.go +++ b/server/noop.go @@ -1,17 +1,15 @@ package server import ( - "bytes" "fmt" "sort" "sync" "time" - craw "github.com/unistack-org/micro-codec-bytes" - cjson "github.com/unistack-org/micro-codec-json" - cjsonrpc "github.com/unistack-org/micro-codec-jsonrpc" - cproto "github.com/unistack-org/micro-codec-proto" - cprotorpc "github.com/unistack-org/micro-codec-protorpc" + // cjson "github.com/unistack-org/micro-codec-json" + // cjsonrpc "github.com/unistack-org/micro-codec-jsonrpc" + // cproto "github.com/unistack-org/micro-codec-proto" + // cprotorpc "github.com/unistack-org/micro-codec-protorpc" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" @@ -19,12 +17,12 @@ import ( ) var ( - DefaultCodecs = map[string]codec.NewCodec{ - "application/json": cjson.NewCodec, - "application/json-rpc": cjsonrpc.NewCodec, - "application/protobuf": cproto.NewCodec, - "application/proto-rpc": cprotorpc.NewCodec, - "application/octet-stream": craw.NewCodec, + DefaultCodecs = map[string]codec.Codec{ + //"application/json": cjson.NewCodec, + //"application/json-rpc": cjsonrpc.NewCodec, + //"application/protobuf": cproto.NewCodec, + //"application/proto-rpc": cprotorpc.NewCodec, + "application/octet-stream": codec.NewCodec(), } ) @@ -50,14 +48,14 @@ func NewServer(opts ...Option) Server { return &noopServer{opts: NewOptions(opts...)} } -func (n *noopServer) newCodec(contentType string) (codec.NewCodec, error) { +func (n *noopServer) newCodec(contentType string) (codec.Codec, error) { if cf, ok := n.opts.Codecs[contentType]; ok { return cf, nil } if cf, ok := DefaultCodecs[contentType]; ok { return cf, nil } - return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) + return nil, codec.ErrUnknownContentType } func (n *noopServer) Handle(handler Handler) error { @@ -188,7 +186,7 @@ func (n *noopServer) Register() error { if !registered { if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) + config.Logger.Infof("registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) } } @@ -221,7 +219,7 @@ func (n *noopServer) Register() error { opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)) if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("Subscribing to topic: %s", sb.Topic()) + config.Logger.Infof("subscribing to topic: %s", sb.Topic()) } sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...) if err != nil { @@ -251,7 +249,7 @@ func (n *noopServer) Deregister() error { } if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("deregistering node: %s", service.Nodes[0].Id) + config.Logger.Infof("deregistering node: %s", service.Nodes[0].Id) } if err := DefaultDeregisterFunc(service, config); err != nil { @@ -281,11 +279,11 @@ func (n *noopServer) Deregister() error { go func(s broker.Subscriber) { defer wg.Done() if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("unsubscribing from topic: %s", s.Topic()) + config.Logger.Infof("unsubscribing from topic: %s", s.Topic()) } if err := s.Unsubscribe(cx); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("unsubscribing from topic: %s err: %v", s.Topic(), err) + config.Logger.Errorf("unsubscribing from topic: %s err: %v", s.Topic(), err) } } }(sub) @@ -308,7 +306,7 @@ func (n *noopServer) Start() error { n.RUnlock() if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("Server [noop] Listening on %s", config.Address) + config.Logger.Infof("server [noop] Listening on %s", config.Address) } n.Lock() if len(config.Advertise) == 0 { @@ -321,26 +319,26 @@ func (n *noopServer) Start() error { // connect to the broker if err := config.Broker.Connect(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Broker [%s] connect error: %v", config.Broker.String(), err) + config.Logger.Errorf("broker [%s] connect error: %v", config.Broker.String(), err) } return err } if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + config.Logger.Infof("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } } // use RegisterCheck func before register if err := config.RegisterCheck(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, err) + config.Logger.Errorf("server %s-%s register check error: %s", config.Name, config.Id, err) } } else { // announce self to the world if err := n.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Server register error: %v", err) + config.Logger.Errorf("server register error: %v", err) } } } @@ -368,23 +366,23 @@ func (n *noopServer) Start() error { rerr := config.RegisterCheck(config.Context) if rerr != nil && registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) + config.Logger.Errorf("server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) } // deregister self in case of error if err := n.Deregister(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err) + config.Logger.Errorf("server %s-%s deregister error: %s", config.Name, config.Id, err) } } } else if rerr != nil && !registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr) + config.Logger.Errorf("server %s-%s register check error: %s", config.Name, config.Id, rerr) } continue } if err := n.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err) + config.Logger.Errorf("server %s-%s register error: %s", config.Name, config.Id, err) } } // wait for exit @@ -396,7 +394,7 @@ func (n *noopServer) Start() error { // deregister self if err := n.Deregister(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Server deregister error: ", err) + config.Logger.Errorf("server deregister error: ", err) } } @@ -409,12 +407,12 @@ func (n *noopServer) Start() error { ch <- nil if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + config.Logger.Infof("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker if err := config.Broker.Disconnect(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("Broker [%s] disconnect error: %v", config.Broker.String(), err) + config.Logger.Errorf("broker [%s] disconnect error: %v", config.Broker.String(), err) } } }() @@ -446,11 +444,3 @@ func (n *noopServer) Stop() error { return err } - -type noopcodec struct { - *bytes.Buffer -} - -func (c noopcodec) Close() error { - return nil -} diff --git a/server/options.go b/server/options.go index d165918a..aa91f956 100644 --- a/server/options.go +++ b/server/options.go @@ -18,7 +18,7 @@ import ( // Options server struct type Options struct { - Codecs map[string]codec.NewCodec + Codecs map[string]codec.Codec Broker broker.Broker Registry registry.Registry Tracer tracer.Tracer @@ -62,7 +62,7 @@ type Options struct { func NewOptions(opts ...Option) Options { options := Options{ Auth: auth.DefaultAuth, - Codecs: make(map[string]codec.NewCodec), + Codecs: make(map[string]codec.Codec), Context: context.Background(), Metadata: metadata.New(0), RegisterInterval: DefaultRegisterInterval, @@ -144,7 +144,7 @@ func Broker(b broker.Broker) Option { } // Codec to use to encode/decode requests for a given content type -func Codec(contentType string, c codec.NewCodec) Option { +func Codec(contentType string, c codec.Codec) Option { return func(o *Options) { o.Codecs[contentType] = c } diff --git a/server/request.go b/server/request.go index f4496482..cd9880ee 100644 --- a/server/request.go +++ b/server/request.go @@ -34,6 +34,6 @@ func (r *rpcMessage) Body() []byte { return r.body } -func (r *rpcMessage) Codec() codec.Reader { +func (r *rpcMessage) Codec() codec.Codec { return r.codec } diff --git a/server/server.go b/server/server.go index 802390b0..5fb1a3b3 100644 --- a/server/server.go +++ b/server/server.go @@ -58,7 +58,7 @@ type Message interface { // The raw body of the message Body() []byte // Codec used to decode the message - Codec() codec.Reader + Codec() codec.Codec } // Request is a synchronous request interface @@ -78,7 +78,7 @@ type Request interface { // Read the undecoded request body Read() ([]byte, error) // The encoded message stream - Codec() codec.Reader + Codec() codec.Codec // Indicates whether its a stream Stream() bool } @@ -86,7 +86,7 @@ type Request interface { // Response is the response writer for unencoded messages type Response interface { // Encoded writer - Codec() codec.Writer + Codec() codec.Codec // Write the header WriteHeader(metadata.Metadata) // write a response directly to the client diff --git a/server/subscriber.go b/server/subscriber.go index 21ec7d14..9d3575b5 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -239,7 +239,7 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl req = req.Elem() } - if err = cf(noopcodec{bytes.NewBuffer(msg.Body)}).ReadBody(req.Interface()); err != nil { + if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil { return err } diff --git a/service.go b/service.go index bba6bfd5..075e1103 100644 --- a/service.go +++ b/service.go @@ -168,7 +168,7 @@ func (s *service) Start() error { s.RUnlock() if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("Starting [service] %s", s.Name()) + config.Logger.Infof("starting [service] %s", s.Name()) } for _, fn := range s.opts.BeforeStart { @@ -218,7 +218,7 @@ func (s *service) Stop() error { s.RUnlock() if config.Logger.V(logger.InfoLevel) { - config.Logger.Info("Stoppping [service] %s", s.Name()) + config.Logger.Infof("stoppping [service] %s", s.Name()) } var err error diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index d37a2bd1..39214384 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -23,7 +23,7 @@ var ( // ErrReadNamespace is returned when the names could not be read from service account ErrReadNamespace = errors.New("Could not read namespace from service account secret") // DefaultImage is default micro image - DefaultImage = "micro/go-micro" + DefaultImage = "micro/micro" // DefaultNamespace is the default k8s namespace DefaultNamespace = "default" ) diff --git a/util/kubernetes/client/util_test.go b/util/kubernetes/client/util_test.go index 8073a049..54a6139c 100644 --- a/util/kubernetes/client/util_test.go +++ b/util/kubernetes/client/util_test.go @@ -35,7 +35,7 @@ func TestFormatName(t *testing.T) { {"foo-bar", "foo-bar"}, {"foo.bar", "foo-bar"}, {"Foo.Bar", "foo-bar"}, - {"go.micro.foo.bar", "go-micro-foo-bar"}, + {"micro.foo.bar", "micro-foo-bar"}, } for _, test := range testCases { diff --git a/util/stream/stream.go b/util/stream/stream.go index 158d1450..4476a10d 100644 --- a/util/stream/stream.go +++ b/util/stream/stream.go @@ -31,8 +31,8 @@ type request struct { context context.Context } -func (r *request) Codec() codec.Reader { - return r.Request.Codec().(codec.Reader) +func (r *request) Codec() codec.Codec { + return r.Request.Codec() } func (r *request) Header() metadata.Metadata {