major codec upgrade

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-11-23 16:18:47 +03:00
parent daffa9e548
commit c9049c3845
30 changed files with 196 additions and 1004 deletions

View File

@ -1,495 +0,0 @@
// Package rpc is a go-micro rpc handler.
package rpc
import (
"encoding/json"
"io"
"net/http"
"strconv"
"strings"
jsonpatch "github.com/evanphx/json-patch/v5"
"github.com/oxtoacart/bpool"
jsonrpc "github.com/unistack-org/micro-codec-jsonrpc"
protorpc "github.com/unistack-org/micro-codec-protorpc"
"github.com/unistack-org/micro/v3/api"
"github.com/unistack-org/micro/v3/api/handler"
"github.com/unistack-org/micro/v3/api/internal/proto"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/util/ctx"
"github.com/unistack-org/micro/v3/util/qson"
"github.com/unistack-org/micro/v3/util/router"
)
const (
Handler = "rpc"
)
var (
// supported json codecs
jsonCodecs = []string{
"application/grpc+json",
"application/json",
"application/json-rpc",
}
// support proto codecs
protoCodecs = []string{
"application/grpc",
"application/grpc+proto",
"application/proto",
"application/protobuf",
"application/proto-rpc",
"application/octet-stream",
}
bufferPool = bpool.NewSizedBufferPool(1024, 8)
)
type rpcHandler struct {
opts handler.Options
s *api.Service
}
type buffer struct {
io.ReadCloser
}
func (b *buffer) Write(_ []byte) (int, error) {
return 0, nil
}
func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
bsize := handler.DefaultMaxRecvSize
if h.opts.MaxRecvSize > 0 {
bsize = h.opts.MaxRecvSize
}
r.Body = http.MaxBytesReader(w, r.Body, bsize)
defer r.Body.Close()
var service *api.Service
if h.s != nil {
// we were given the service
service = h.s
} else if h.opts.Router != nil {
// try get service from router
s, err := h.opts.Router.Route(r)
if err != nil {
writeError(w, r, errors.InternalServerError("go.micro.api", err.Error()))
return
}
service = s
} else {
// we have no way of routing the request
writeError(w, r, errors.InternalServerError("go.micro.api", "no route found"))
return
}
ct := r.Header.Get("Content-Type")
// Strip charset from Content-Type (like `application/json; charset=UTF-8`)
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
// micro client
c := h.opts.Client
// create context
cx := ctx.FromRequest(r)
// set merged context to request
*r = *r.Clone(cx)
// if stream we currently only support json
if isStream(r, service) {
serveWebsocket(cx, w, r, service, c)
return
}
// create custom router
callOpt := client.WithRouter(router.New(service.Services))
// walk the standard call path
// get payload
br, err := requestPayload(r)
if err != nil {
writeError(w, r, err)
return
}
var rsp []byte
switch {
// proto codecs
case hasCodec(ct, protoCodecs):
request := &proto.Message{}
// if the extracted payload isn't empty lets use it
if len(br) > 0 {
request = proto.NewMessage(br)
}
// create request/response
response := &proto.Message{}
req := c.NewRequest(
service.Name,
service.Endpoint.Name,
request,
client.WithContentType(ct),
)
// make the call
if err := c.Call(cx, req, response, callOpt); err != nil {
writeError(w, r, err)
return
}
// marshall response
rsp, err = response.Marshal()
if err != nil {
writeError(w, r, err)
return
}
default:
// if json codec is not present set to json
if !hasCodec(ct, jsonCodecs) {
ct = "application/json"
}
// default to trying json
var request json.RawMessage
// if the extracted payload isn't empty lets use it
if len(br) > 0 {
request = json.RawMessage(br)
}
// create request/response
var response json.RawMessage
req := c.NewRequest(
service.Name,
service.Endpoint.Name,
&request,
client.WithContentType(ct),
)
// make the call
if err := c.Call(cx, req, &response, callOpt); err != nil {
writeError(w, r, err)
return
}
// marshall response
rsp, err = response.MarshalJSON()
if err != nil {
writeError(w, r, err)
return
}
}
// write the response
writeResponse(w, r, rsp)
}
func (rh *rpcHandler) String() string {
return "rpc"
}
func hasCodec(ct string, codecs []string) bool {
for _, codec := range codecs {
if ct == codec {
return true
}
}
return false
}
// requestPayload takes a *http.Request.
// If the request is a GET the query string parameters are extracted and marshaled to JSON and the raw bytes are returned.
// If the request method is a POST the request body is read and returned
func requestPayload(r *http.Request) ([]byte, error) {
var err error
// we have to decode json-rpc and proto-rpc because we suck
// well actually because there's no proxy codec right now
ct := r.Header.Get("Content-Type")
switch {
case strings.Contains(ct, "application/json-rpc"):
msg := codec.Message{
Type: codec.Request,
Header: metadata.New(0),
}
c := jsonrpc.NewCodec(&buffer{r.Body})
if err = c.ReadHeader(&msg, codec.Request); err != nil {
return nil, err
}
var raw json.RawMessage
if err = c.ReadBody(&raw); err != nil {
return nil, err
}
return ([]byte)(raw), nil
case strings.Contains(ct, "application/proto-rpc"), strings.Contains(ct, "application/octet-stream"):
msg := codec.Message{
Type: codec.Request,
Header: metadata.New(0),
}
c := protorpc.NewCodec(&buffer{r.Body})
if err = c.ReadHeader(&msg, codec.Request); err != nil {
return nil, err
}
var raw proto.Message
if err = c.ReadBody(&raw); err != nil {
return nil, err
}
return raw.Marshal()
case strings.Contains(ct, "application/www-x-form-urlencoded"):
if err = r.ParseForm(); err != nil {
return nil, err
}
// generate a new set of values from the form
vals := make(map[string]string, len(r.Form))
for k, v := range r.Form {
vals[k] = strings.Join(v, ",")
}
// marshal
return json.Marshal(vals)
// TODO: application/grpc
}
// otherwise as per usual
ctx := r.Context()
// dont user metadata.FromContext as it mangles names
md, ok := metadata.FromContext(ctx)
if !ok {
md = metadata.New(0)
}
// allocate maximum
matches := make(map[string]interface{}, len(md))
bodydst := ""
// get fields from url path
for k, v := range md {
k = strings.ToLower(k)
// filter own keys
if strings.HasPrefix(k, "x-api-field-") {
matches[strings.TrimPrefix(k, "x-api-field-")] = v
delete(md, k)
} else if k == "x-api-body" {
bodydst = v
delete(md, k)
}
}
// map of all fields
req := make(map[string]interface{}, len(md))
// get fields from url values
if len(r.URL.RawQuery) > 0 {
umd := make(map[string]interface{})
err = qson.Unmarshal(&umd, r.URL.RawQuery)
if err != nil {
return nil, err
}
for k, v := range umd {
matches[k] = v
}
}
// restore context without fields
*r = *r.Clone(metadata.NewContext(ctx, md))
for k, v := range matches {
ps := strings.Split(k, ".")
if len(ps) == 1 {
req[k] = v
continue
}
em := make(map[string]interface{})
em[ps[len(ps)-1]] = v
for i := len(ps) - 2; i > 0; i-- {
nm := make(map[string]interface{})
nm[ps[i]] = em
em = nm
}
if vm, ok := req[ps[0]]; ok {
// nested map
nm := vm.(map[string]interface{})
for vk, vv := range em {
nm[vk] = vv
}
req[ps[0]] = nm
} else {
req[ps[0]] = em
}
}
pathbuf := []byte("{}")
if len(req) > 0 {
pathbuf, err = json.Marshal(req)
if err != nil {
return nil, err
}
}
urlbuf := []byte("{}")
out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf)
if err != nil {
return nil, err
}
switch r.Method {
case "GET":
// empty response
if strings.Contains(ct, "application/json") && string(out) == "{}" {
return out, nil
} else if string(out) == "{}" && !strings.Contains(ct, "application/json") {
return []byte{}, nil
}
return out, nil
case "PATCH", "POST", "PUT", "DELETE":
bodybuf := []byte("{}")
buf := bufferPool.Get()
defer bufferPool.Put(buf)
if _, err := buf.ReadFrom(r.Body); err != nil {
return nil, err
}
if b := buf.Bytes(); len(b) > 0 {
bodybuf = b
}
if bodydst == "" || bodydst == "*" {
if out, err = jsonpatch.MergeMergePatches(out, bodybuf); err == nil {
return out, nil
}
}
var jsonbody map[string]interface{}
if json.Valid(bodybuf) {
if err = json.Unmarshal(bodybuf, &jsonbody); err != nil {
return nil, err
}
}
dstmap := make(map[string]interface{})
ps := strings.Split(bodydst, ".")
if len(ps) == 1 {
if jsonbody != nil {
dstmap[ps[0]] = jsonbody
} else {
// old unexpected behaviour
dstmap[ps[0]] = bodybuf
}
} else {
em := make(map[string]interface{})
if jsonbody != nil {
em[ps[len(ps)-1]] = jsonbody
} else {
// old unexpected behaviour
em[ps[len(ps)-1]] = bodybuf
}
for i := len(ps) - 2; i > 0; i-- {
nm := make(map[string]interface{})
nm[ps[i]] = em
em = nm
}
dstmap[ps[0]] = em
}
bodyout, err := json.Marshal(dstmap)
if err != nil {
return nil, err
}
if out, err = jsonpatch.MergeMergePatches(out, bodyout); err == nil {
return out, nil
}
//fallback to previous unknown behaviour
return bodybuf, nil
}
return []byte{}, nil
}
func writeError(w http.ResponseWriter, r *http.Request, err error) {
ce := errors.Parse(err.Error())
switch ce.Code {
case 0:
// assuming it's totally screwed
ce.Code = 500
ce.Id = "go.micro.api"
ce.Status = http.StatusText(500)
ce.Detail = "error during request: " + ce.Detail
w.WriteHeader(500)
default:
w.WriteHeader(int(ce.Code))
}
// response content type
w.Header().Set("Content-Type", "application/json")
// Set trailers
if strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
w.Header().Set("Trailer", "grpc-status")
w.Header().Set("Trailer", "grpc-message")
w.Header().Set("grpc-status", "13")
w.Header().Set("grpc-message", ce.Detail)
}
_, werr := w.Write([]byte(ce.Error()))
if werr != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(werr.Error())
}
}
}
func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) {
w.Header().Set("Content-Type", r.Header.Get("Content-Type"))
w.Header().Set("Content-Length", strconv.Itoa(len(rsp)))
// Set trailers
if strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
w.Header().Set("Trailer", "grpc-status")
w.Header().Set("Trailer", "grpc-message")
w.Header().Set("grpc-status", "0")
w.Header().Set("grpc-message", "")
}
// write 204 status if rsp is nil
if len(rsp) == 0 {
w.WriteHeader(http.StatusNoContent)
}
// write response
_, err := w.Write(rsp)
if err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
}
}
func NewHandler(opts ...handler.Option) handler.Handler {
options := handler.NewOptions(opts...)
return &rpcHandler{
opts: options,
}
}
func WithService(s *api.Service, opts ...handler.Option) handler.Handler {
options := handler.NewOptions(opts...)
return &rpcHandler{
opts: options,
s: s,
}
}

View File

@ -1,112 +0,0 @@
package rpc
import (
"bytes"
"net/http"
"testing"
go_api "github.com/unistack-org/micro/v3/api/proto"
jsonpb "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func TestRequestPayloadFromRequest(t *testing.T) {
// our test event so that we can validate serialising / deserializing of true protos works
protoEvent := go_api.Event{
Name: "Test",
}
protoBytes, err := proto.Marshal(&protoEvent)
if err != nil {
t.Fatal("Failed to marshal proto", err)
}
jsonBytes, err := jsonpb.Marshal(&protoEvent)
if err != nil {
t.Fatal("Failed to marshal proto to JSON ", err)
}
jsonUrlBytes := []byte(`{"key1":"val1","key2":"val2","name":"Test"}`)
t.Run("extracting a json from a POST request with url params", func(t *testing.T) {
r, err := http.NewRequest("POST", "http://localhost/my/path?key1=val1&key2=val2", bytes.NewReader(jsonBytes))
if err != nil {
t.Fatalf("Failed to created http.Request: %v", err)
}
extByte, err := requestPayload(r)
if err != nil {
t.Fatalf("Failed to extract payload from request: %v", err)
}
if string(extByte) != string(jsonUrlBytes) {
t.Fatalf("Expected %v and %v to match", string(extByte), jsonUrlBytes)
}
})
t.Run("extracting a proto from a POST request", func(t *testing.T) {
r, err := http.NewRequest("POST", "http://localhost/my/path", bytes.NewReader(protoBytes))
if err != nil {
t.Fatalf("Failed to created http.Request: %v", err)
}
extByte, err := requestPayload(r)
if err != nil {
t.Fatalf("Failed to extract payload from request: %v", err)
}
if string(extByte) != string(protoBytes) {
t.Fatalf("Expected %v and %v to match", string(extByte), string(protoBytes))
}
})
t.Run("extracting JSON from a POST request", func(t *testing.T) {
r, err := http.NewRequest("POST", "http://localhost/my/path", bytes.NewReader(jsonBytes))
if err != nil {
t.Fatalf("Failed to created http.Request: %v", err)
}
extByte, err := requestPayload(r)
if err != nil {
t.Fatalf("Failed to extract payload from request: %v", err)
}
if string(extByte) != string(jsonBytes) {
t.Fatalf("Expected %v and %v to match", string(extByte), string(jsonBytes))
}
})
t.Run("extracting params from a GET request", func(t *testing.T) {
r, err := http.NewRequest("GET", "http://localhost/my/path", nil)
if err != nil {
t.Fatalf("Failed to created http.Request: %v", err)
}
q := r.URL.Query()
q.Add("name", "Test")
r.URL.RawQuery = q.Encode()
extByte, err := requestPayload(r)
if err != nil {
t.Fatalf("Failed to extract payload from request: %v", err)
}
if string(extByte) != string(jsonBytes) {
t.Fatalf("Expected %v and %v to match", string(extByte), string(jsonBytes))
}
})
t.Run("GET request with no params", func(t *testing.T) {
r, err := http.NewRequest("GET", "http://localhost/my/path", nil)
if err != nil {
t.Fatalf("Failed to created http.Request: %v", err)
}
extByte, err := requestPayload(r)
if err != nil {
t.Fatalf("Failed to extract payload from request: %v", err)
}
if string(extByte) != "" {
t.Fatalf("Expected %v and %v to match", string(extByte), "")
}
})
}

View File

@ -1,263 +0,0 @@
package rpc
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"strings"
"time"
"github.com/gobwas/httphead"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
raw "github.com/unistack-org/micro-codec-bytes"
"github.com/unistack-org/micro/v3/api"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/util/router"
)
// serveWebsocket will stream rpc back over websockets assuming json
func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *api.Service, c client.Client) {
var op ws.OpCode
ct := r.Header.Get("Content-Type")
// Strip charset from Content-Type (like `application/json; charset=UTF-8`)
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
// check proto from request
switch ct {
case "application/json":
op = ws.OpText
default:
op = ws.OpBinary
}
hdr := make(http.Header)
if proto, ok := r.Header["Sec-WebSocket-Protocol"]; ok {
for _, p := range proto {
switch p {
case "binary":
hdr["Sec-WebSocket-Protocol"] = []string{"binary"}
op = ws.OpBinary
default:
op = ws.OpBinary
}
}
}
payload, err := requestPayload(r)
if err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
upgrader := ws.HTTPUpgrader{Timeout: 5 * time.Second,
Protocol: func(proto string) bool {
if strings.Contains(proto, "binary") {
return true
}
// fallback to support all protocols now
return true
},
Extension: func(httphead.Option) bool {
// disable extensions for compatibility
return false
},
Header: hdr,
}
conn, rw, _, err := upgrader.Upgrade(r, w)
if err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
defer func() {
if err := conn.Close(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
}()
var request interface{}
if !bytes.Equal(payload, []byte(`{}`)) {
switch ct {
case "application/json", "":
m := json.RawMessage(payload)
request = &m
default:
request = &raw.Frame{Data: payload}
}
}
// we always need to set content type for message
if ct == "" {
ct = "application/json"
}
req := c.NewRequest(
service.Name,
service.Endpoint.Name,
request,
client.WithContentType(ct),
client.StreamingRequest(),
)
// create custom router
callOpt := client.WithRouter(router.New(service.Services))
// create a new stream
stream, err := c.Stream(ctx, req, callOpt)
if err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
if request != nil {
if err = stream.Send(request); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
}
go writeLoop(rw, stream)
rsp := stream.Response()
// receive from stream and send to client
for {
select {
case <-ctx.Done():
return
case <-stream.Context().Done():
return
default:
// read backend response body
buf, err := rsp.Read()
if err != nil {
// wants to avoid import grpc/status.Status
if strings.Contains(err.Error(), "context canceled") {
return
}
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
// write the response
if err := wsutil.WriteServerMessage(rw, op, buf); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
if err = rw.Flush(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
}
}
}
// writeLoop
func writeLoop(rw io.ReadWriter, stream client.Stream) {
// close stream when done
defer stream.Close()
for {
select {
case <-stream.Context().Done():
return
default:
buf, op, err := wsutil.ReadClientData(rw)
if err != nil {
if wserr, ok := err.(wsutil.ClosedError); ok {
switch wserr.Code {
case ws.StatusGoingAway:
// this happens when user leave the page
return
case ws.StatusNormalClosure, ws.StatusNoStatusRcvd:
// this happens when user close ws connection, or we don't get any status
return
}
}
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
switch op {
default:
// not relevant
continue
case ws.OpText, ws.OpBinary:
break
}
// send to backend
// default to trying json
// if the extracted payload isn't empty lets use it
request := &raw.Frame{Data: buf}
if err := stream.Send(request); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(err.Error())
}
return
}
}
}
}
func isStream(r *http.Request, srv *api.Service) bool {
// check if it's a web socket
if !isWebSocket(r) {
return false
}
// check if the endpoint supports streaming
for _, service := range srv.Services {
for _, ep := range service.Endpoints {
// skip if it doesn't match the name
if ep.Name != srv.Endpoint.Name {
continue
}
// matched if the name
if v := ep.Metadata["stream"]; v == "true" {
return true
}
}
}
return false
}
func isWebSocket(r *http.Request) bool {
contains := func(key, val string) bool {
vv := strings.Split(r.Header.Get(key), ",")
for _, v := range vv {
if val == strings.ToLower(strings.TrimSpace(v)) {
return true
}
}
return false
}
if contains("Connection", "upgrade") && contains("Upgrade", "websocket") {
return true
}
return false
}

View File

@ -138,7 +138,7 @@ func (s *storage) Stat(key string) (certmagic.KeyInfo, error) {
}, nil }, nil
} }
// NewStorage returns a certmagic.Storage backed by a go-micro/lock and go-micro/store // NewStorage returns a certmagic.Storage backed by a micro/lock and micro/store
func NewStorage(lock sync.Sync, store store.Store) certmagic.Storage { func NewStorage(lock sync.Sync, store store.Store) certmagic.Storage {
return &storage{ return &storage{
lock: lock, lock: lock,

View File

@ -74,7 +74,7 @@ func (s *httpServer) Start() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("HTTP API Listening on %s", l.Addr().String()) config.Logger.Infof("HTTP API Listening on %s", l.Addr().String())
} }
s.Lock() s.Lock()
@ -85,7 +85,7 @@ func (s *httpServer) Start() error {
if err := http.Serve(l, s.mux); err != nil { if err := http.Serve(l, s.mux); err != nil {
// temporary fix // temporary fix
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("serve err: %v", err) config.Logger.Errorf("serve err: %v", err)
} }
s.Stop() s.Stop()
} }

View File

@ -13,7 +13,7 @@ import (
type Options struct { type Options struct {
Addrs []string Addrs []string
Secure bool Secure bool
Codec codec.Marshaler Codec codec.Codec
// Logger // Logger
Logger logger.Logger Logger logger.Logger
@ -125,7 +125,7 @@ func Addrs(addrs ...string) Option {
// Codec sets the codec used for encoding/decoding used where // Codec sets the codec used for encoding/decoding used where
// a broker does not support headers // a broker does not support headers
func Codec(c codec.Marshaler) Option { func Codec(c codec.Codec) Option {
return func(o *Options) { return func(o *Options) {
o.Codec = c o.Codec = c
} }

View File

@ -48,7 +48,7 @@ type Request interface {
// The unencoded request body // The unencoded request body
Body() interface{} Body() interface{}
// Write to the encoded request writer. This is nil before a call is made // Write to the encoded request writer. This is nil before a call is made
Codec() codec.Writer Codec() codec.Codec
// indicates whether the request will be a streaming one rather than unary // indicates whether the request will be a streaming one rather than unary
Stream() bool Stream() bool
} }
@ -56,7 +56,7 @@ type Request interface {
// Response is the response received from a service // Response is the response received from a service
type Response interface { type Response interface {
// Read the response // Read the response
Codec() codec.Reader Codec() codec.Codec
// read the header // read the header
Header() metadata.Metadata Header() metadata.Metadata
// Read the undecoded response // Read the undecoded response

View File

@ -3,14 +3,26 @@ package client
import ( import (
"context" "context"
raw "github.com/unistack-org/micro-codec-bytes"
json "github.com/unistack-org/micro-codec-json"
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors" "github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
) )
var (
DefaultCodecs = map[string]codec.Codec{
//"application/json": cjson.NewCodec,
//"application/json-rpc": cjsonrpc.NewCodec,
//"application/protobuf": cproto.NewCodec,
//"application/proto-rpc": cprotorpc.NewCodec,
"application/octet-stream": codec.NewCodec(),
}
)
const (
defaultContentType = "application/json"
)
type noopClient struct { type noopClient struct {
opts Options opts Options
} }
@ -27,7 +39,7 @@ type noopRequest struct {
endpoint string endpoint string
contentType string contentType string
body interface{} body interface{}
codec codec.Writer codec codec.Codec
stream bool stream bool
} }
@ -56,7 +68,7 @@ func (n *noopRequest) Body() interface{} {
return n.body return n.body
} }
func (n *noopRequest) Codec() codec.Writer { func (n *noopRequest) Codec() codec.Codec {
return n.codec return n.codec
} }
@ -65,11 +77,11 @@ func (n *noopRequest) Stream() bool {
} }
type noopResponse struct { type noopResponse struct {
codec codec.Reader codec codec.Codec
header metadata.Metadata header metadata.Metadata
} }
func (n *noopResponse) Codec() codec.Reader { func (n *noopResponse) Codec() codec.Codec {
return n.codec return n.codec
} }
@ -123,6 +135,16 @@ func (n *noopMessage) ContentType() string {
return n.opts.ContentType return n.opts.ContentType
} }
func (n *noopClient) newCodec(contentType string) (codec.Codec, error) {
if cf, ok := n.opts.Codecs[contentType]; ok {
return cf, nil
}
if cf, ok := DefaultCodecs[contentType]; ok {
return cf, nil
}
return nil, codec.ErrUnknownContentType
}
func (n *noopClient) Init(opts ...Option) error { func (n *noopClient) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
o(&n.opts) o(&n.opts)
@ -168,21 +190,15 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti
md["Micro-Topic"] = p.Topic() md["Micro-Topic"] = p.Topic()
// passed in raw data // passed in raw data
if d, ok := p.Payload().(*raw.Frame); ok { if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data body = d.Data
} else { } else {
cf := n.opts.Broker.Options().Codec // use codec for payload
if cf == nil { cf, err := n.newCodec(p.ContentType())
cf = json.Marshaler{} if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
} }
/*
// use codec for payload
cf, err := n.opts.Codecs[p.ContentType()]
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
*/
// set the body // set the body
b, err := cf.Marshal(p.Payload()) b, err := cf.Marshal(p.Payload())
if err != nil { if err != nil {

View File

@ -22,7 +22,7 @@ type Options struct {
// Plugged interfaces // Plugged interfaces
Broker broker.Broker Broker broker.Broker
Codecs map[string]codec.NewCodec Codecs map[string]codec.Codec
Router router.Router Router router.Router
Selector selector.Selector Selector selector.Selector
Transport transport.Transport Transport transport.Transport
@ -141,8 +141,8 @@ type RequestOptions struct {
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Context: context.Background(), Context: context.Background(),
ContentType: "application/protobuf", ContentType: "application/json",
Codecs: make(map[string]codec.NewCodec), Codecs: make(map[string]codec.Codec),
CallOptions: CallOptions{ CallOptions: CallOptions{
Backoff: DefaultBackoff, Backoff: DefaultBackoff,
Retry: DefaultRetry, Retry: DefaultRetry,
@ -179,7 +179,7 @@ func Logger(l logger.Logger) Option {
} }
// Codec to be used to encode/decode requests for a given content type // Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c codec.NewCodec) Option { func Codec(contentType string, c codec.Codec) Option {
return func(o *Options) { return func(o *Options) {
o.Codecs[contentType] = c o.Codecs[contentType] = c
} }

View File

@ -34,7 +34,7 @@ func (r *testRequest) Body() interface{} {
return r.body return r.body
} }
func (r *testRequest) Codec() codec.Writer { func (r *testRequest) Codec() codec.Codec {
return r.codec return r.codec
} }

View File

@ -18,40 +18,22 @@ const (
var ( var (
// ErrInvalidMessage returned when invalid messge passed to codec // ErrInvalidMessage returned when invalid messge passed to codec
ErrInvalidMessage = errors.New("invalid message") ErrInvalidMessage = errors.New("invalid message")
// ErrUnknownContentType returned when content-type is unknown
ErrUnknownContentType = errors.New("unknown content-type")
) )
// MessageType // MessageType
type MessageType int type MessageType int
// NewCodec takes in a connection/buffer and returns a new Codec // Codec encodes/decodes various types of messages used within micro.
type NewCodec func(io.ReadWriteCloser) Codec
// Codec encodes/decodes various types of messages used within go-micro.
// ReadHeader and ReadBody are called in pairs to read requests/responses // ReadHeader and ReadBody are called in pairs to read requests/responses
// from the connection. Close is called when finished with the // from the connection. Close is called when finished with the
// connection. ReadBody may be called with a nil argument to force the // connection. ReadBody may be called with a nil argument to force the
// body to be read and discarded. // body to be read and discarded.
type Codec interface { type Codec interface {
Reader ReadHeader(io.ReadWriter, *Message, MessageType) error
Writer ReadBody(io.ReadWriter, interface{}) error
Close() error Write(io.ReadWriter, *Message, interface{}) error
String() string
}
// Reader interface
type Reader interface {
ReadHeader(*Message, MessageType) error
ReadBody(interface{}) error
}
// Writer interface
type Writer interface {
Write(*Message, interface{}) error
}
// Marshaler is a simple encoding interface used for the broker/transport
// where headers are not supported by the underlying implementation.
type Marshaler interface {
Marshal(interface{}) ([]byte, error) Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error Unmarshal([]byte, interface{}) error
String() string String() string

93
codec/noop.go Normal file
View File

@ -0,0 +1,93 @@
package codec
import (
"io"
"io/ioutil"
)
type noopCodec struct {
}
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
}
func (c *noopCodec) ReadHeader(conn io.ReadWriter, m *Message, t MessageType) error {
return nil
}
func (c *noopCodec) ReadBody(conn io.ReadWriter, b interface{}) error {
// read bytes
buf, err := ioutil.ReadAll(conn)
if err != nil {
return err
}
if b == nil {
return nil
}
switch v := b.(type) {
case []byte:
v = buf
case *[]byte:
*v = buf
case *Frame:
v.Data = buf
default:
return ErrInvalidMessage
}
return nil
}
func (c *noopCodec) Write(conn io.ReadWriter, m *Message, b interface{}) error {
var v []byte
switch vb := b.(type) {
case nil:
return nil
case *Frame:
v = vb.Data
case *[]byte:
v = *vb
case []byte:
v = vb
default:
return ErrInvalidMessage
}
_, err := conn.Write(v)
return err
}
func (c *noopCodec) String() string {
return "noop"
}
func NewCodec() Codec {
return &noopCodec{}
}
func (n *noopCodec) Marshal(v interface{}) ([]byte, error) {
switch ve := v.(type) {
case *[]byte:
return *ve, nil
case []byte:
return ve, nil
case *Message:
return ve.Body, nil
}
return nil, ErrInvalidMessage
}
func (n *noopCodec) Unmarshal(d []byte, v interface{}) error {
switch ve := v.(type) {
case []byte:
ve = d
case *[]byte:
*ve = d
case *Message:
ve.Body = d
}
return ErrInvalidMessage
}

8
go.mod
View File

@ -7,11 +7,8 @@ require (
github.com/caddyserver/certmagic v0.10.6 github.com/caddyserver/certmagic v0.10.6
github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1
github.com/evanphx/json-patch/v5 v5.1.0
github.com/ghodss/yaml v1.0.0 github.com/ghodss/yaml v1.0.0
github.com/go-acme/lego/v3 v3.4.0 github.com/go-acme/lego/v3 v3.4.0
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee
github.com/gobwas/ws v1.0.3
github.com/golang/protobuf v1.4.3 github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.1.2 github.com/google/uuid v1.1.2
github.com/hashicorp/hcl v1.0.0 github.com/hashicorp/hcl v1.0.0
@ -19,11 +16,6 @@ require (
github.com/miekg/dns v1.1.31 github.com/miekg/dns v1.1.31
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/stretchr/testify v1.5.1 github.com/stretchr/testify v1.5.1
github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844
github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c
github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd
github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92
github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077
github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/net v0.0.0-20200904194848-62affa334b73 golang.org/x/net v0.0.0-20200904194848-62affa334b73

11
go.sum
View File

@ -258,25 +258,14 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY= github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY=
github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY= github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/unistack-org/micro-codec-bytes v0.0.0-20200827104921-3616a69473a6/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE=
github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 h1:5b1yuSllbsMm/9fUIlIXSr8DbsKT/sAKSCgOx6+SAfI= github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 h1:5b1yuSllbsMm/9fUIlIXSr8DbsKT/sAKSCgOx6+SAfI=
github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE= github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE=
github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c h1:RtcNaK8rQSl7xAoy1W437dvZLCVjSC6e4JcolepSQs0=
github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c/go.mod h1:dG5aUyhBv+ebOl/UFW2Aj2GTfVxxXWi6AcynpePOAhQ=
github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd h1:qXSiEfVnCgrwTHYvAnEPSHEai3+5EUH9ZYovLpxGDwg=
github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd/go.mod h1:PFyvkGhavl+3tEPgOaLAhoJJX4/webVGW59BSOXDfNM=
github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92 h1:1rPDBu7Nwo3ZL6r6H5rj7qNchHSdBF4zcewAeTUEMC4=
github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92/go.mod h1:31JMo683bBQ+uN9YufpUU6ESHphyx3DFmTXEnjpJV9Y=
github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077 h1:uK7owL8TPSwoQiDM1V/0swmgCEepSQKXoi8GEnGxtlU=
github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077/go.mod h1:Ct4uAVZaDEyBZj9Q0poDkbzu6zKXUCcSqJkv/MWPpeI=
github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k=
github.com/unistack-org/micro-config-cmd v0.0.0-20200909210346-ec89783dc46c/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210346-ec89783dc46c/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k=
github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34/go.mod h1:fT1gYn+TtfVZZ5tNx56bZIncJjmlji66g7GKdWua5hE= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34/go.mod h1:fT1gYn+TtfVZZ5tNx56bZIncJjmlji66g7GKdWua5hE=
github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.mod h1:il8nz4ZEcX3Usyfrtwy+YtQcb7xSUSFJdSe8PBJ9gOA= github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.mod h1:il8nz4ZEcX3Usyfrtwy+YtQcb7xSUSFJdSe8PBJ9gOA=
github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a h1:VjlqP1qZkjC0Chmx5MKFPIbtSCigeICFDf8vaLZGh9o= github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a h1:VjlqP1qZkjC0Chmx5MKFPIbtSCigeICFDf8vaLZGh9o=
github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a/go.mod h1:MzMg+qh1wORZwYtg5AVgFkNFrXVVbdPKW7s/Is+A994= github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a/go.mod h1:MzMg+qh1wORZwYtg5AVgFkNFrXVVbdPKW7s/Is+A994=
github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4= github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4=
github.com/unistack-org/micro/v3 v3.0.0-gamma/go.mod h1:iEtpu3wTYCRs3pQ3VsFEO7JBO4lOMpkOwMyrpZyIDPo=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM=

View File

@ -12,7 +12,7 @@ import (
type serviceKey struct{} type serviceKey struct{}
// Service is an interface that wraps the lower level libraries // Service is an interface that wraps the lower level libraries
// within go-micro. Its a convenience method for building // within micro. Its a convenience method for building
// and initialising services. // and initialising services.
type Service interface { type Service interface {
// The service name // The service name

View File

@ -12,7 +12,7 @@ type Options struct {
// Database to write to // Database to write to
Database string Database string
// for serialising // for serialising
Codec codec.Marshaler Codec codec.Codec
// for locking // for locking
Sync sync.Sync Sync sync.Sync
// for storage // for storage

View File

@ -14,7 +14,7 @@ type Options struct {
Addrs []string Addrs []string
// Codec is the codec interface to use where headers are not supported // Codec is the codec interface to use where headers are not supported
// by the transport and the entire payload must be encoded // by the transport and the entire payload must be encoded
Codec codec.Marshaler Codec codec.Codec
// Secure tells the transport to secure the connection. // Secure tells the transport to secure the connection.
// In the case TLSConfig is not specified best effort self-signed // In the case TLSConfig is not specified best effort self-signed
// certs should be used // certs should be used
@ -121,7 +121,7 @@ func Context(ctx context.Context) Option {
// Codec sets the codec used for encoding where the transport // Codec sets the codec used for encoding where the transport
// does not support message headers // does not support message headers
func Codec(c codec.Marshaler) Option { func Codec(c codec.Codec) Option {
return func(o *Options) { return func(o *Options) {
o.Codec = c o.Codec = c
} }

View File

@ -46,7 +46,7 @@ var (
// Mode of the session // Mode of the session
type Mode uint8 type Mode uint8
// Tunnel creates a gre tunnel on top of the go-micro/transport. // Tunnel creates a gre tunnel on top of the micro/transport.
// It establishes multiple streams using the Micro-Tunnel-Channel header // It establishes multiple streams using the Micro-Tunnel-Channel header
// and Micro-Tunnel-Session header. The tunnel id is a hash of // and Micro-Tunnel-Session header. The tunnel id is a hash of
// the address being requested. // the address being requested.

View File

@ -1,4 +1,4 @@
// Package proxy is a transparent proxy built on the go-micro/server // Package proxy is a transparent proxy built on the micro/server
package proxy package proxy
import ( import (

View File

@ -1,4 +1,4 @@
// Package proxy is a transparent proxy built on the go-micro/server // Package proxy is a transparent proxy built on the micro/server
package proxy package proxy
import ( import (
@ -7,7 +7,7 @@ import (
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
) )
// Proxy can be used as a proxy server for go-micro services // Proxy can be used as a proxy server for micro services
type Proxy interface { type Proxy interface {
// ProcessMessage handles inbound messages // ProcessMessage handles inbound messages
ProcessMessage(context.Context, server.Message) error ProcessMessage(context.Context, server.Message) error

View File

@ -1,4 +1,4 @@
// Package registry resolves names using the go-micro registry // Package registry resolves names using the micro registry
package registry package registry
import ( import (

View File

@ -1,17 +1,15 @@
package server package server
import ( import (
"bytes"
"fmt" "fmt"
"sort" "sort"
"sync" "sync"
"time" "time"
craw "github.com/unistack-org/micro-codec-bytes" // cjson "github.com/unistack-org/micro-codec-json"
cjson "github.com/unistack-org/micro-codec-json" // cjsonrpc "github.com/unistack-org/micro-codec-jsonrpc"
cjsonrpc "github.com/unistack-org/micro-codec-jsonrpc" // cproto "github.com/unistack-org/micro-codec-proto"
cproto "github.com/unistack-org/micro-codec-proto" // cprotorpc "github.com/unistack-org/micro-codec-protorpc"
cprotorpc "github.com/unistack-org/micro-codec-protorpc"
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
@ -19,12 +17,12 @@ import (
) )
var ( var (
DefaultCodecs = map[string]codec.NewCodec{ DefaultCodecs = map[string]codec.Codec{
"application/json": cjson.NewCodec, //"application/json": cjson.NewCodec,
"application/json-rpc": cjsonrpc.NewCodec, //"application/json-rpc": cjsonrpc.NewCodec,
"application/protobuf": cproto.NewCodec, //"application/protobuf": cproto.NewCodec,
"application/proto-rpc": cprotorpc.NewCodec, //"application/proto-rpc": cprotorpc.NewCodec,
"application/octet-stream": craw.NewCodec, "application/octet-stream": codec.NewCodec(),
} }
) )
@ -50,14 +48,14 @@ func NewServer(opts ...Option) Server {
return &noopServer{opts: NewOptions(opts...)} return &noopServer{opts: NewOptions(opts...)}
} }
func (n *noopServer) newCodec(contentType string) (codec.NewCodec, error) { func (n *noopServer) newCodec(contentType string) (codec.Codec, error) {
if cf, ok := n.opts.Codecs[contentType]; ok { if cf, ok := n.opts.Codecs[contentType]; ok {
return cf, nil return cf, nil
} }
if cf, ok := DefaultCodecs[contentType]; ok { if cf, ok := DefaultCodecs[contentType]; ok {
return cf, nil return cf, nil
} }
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) return nil, codec.ErrUnknownContentType
} }
func (n *noopServer) Handle(handler Handler) error { func (n *noopServer) Handle(handler Handler) error {
@ -188,7 +186,7 @@ func (n *noopServer) Register() error {
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) config.Logger.Infof("registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id)
} }
} }
@ -221,7 +219,7 @@ func (n *noopServer) Register() error {
opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)) opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck))
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Subscribing to topic: %s", sb.Topic()) config.Logger.Infof("subscribing to topic: %s", sb.Topic())
} }
sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...) sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...)
if err != nil { if err != nil {
@ -251,7 +249,7 @@ func (n *noopServer) Deregister() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("deregistering node: %s", service.Nodes[0].Id) config.Logger.Infof("deregistering node: %s", service.Nodes[0].Id)
} }
if err := DefaultDeregisterFunc(service, config); err != nil { if err := DefaultDeregisterFunc(service, config); err != nil {
@ -281,11 +279,11 @@ func (n *noopServer) Deregister() error {
go func(s broker.Subscriber) { go func(s broker.Subscriber) {
defer wg.Done() defer wg.Done()
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("unsubscribing from topic: %s", s.Topic()) config.Logger.Infof("unsubscribing from topic: %s", s.Topic())
} }
if err := s.Unsubscribe(cx); err != nil { if err := s.Unsubscribe(cx); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("unsubscribing from topic: %s err: %v", s.Topic(), err) config.Logger.Errorf("unsubscribing from topic: %s err: %v", s.Topic(), err)
} }
} }
}(sub) }(sub)
@ -308,7 +306,7 @@ func (n *noopServer) Start() error {
n.RUnlock() n.RUnlock()
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Server [noop] Listening on %s", config.Address) config.Logger.Infof("server [noop] Listening on %s", config.Address)
} }
n.Lock() n.Lock()
if len(config.Advertise) == 0 { if len(config.Advertise) == 0 {
@ -321,26 +319,26 @@ func (n *noopServer) Start() error {
// connect to the broker // connect to the broker
if err := config.Broker.Connect(config.Context); err != nil { if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Broker [%s] connect error: %v", config.Broker.String(), err) config.Logger.Errorf("broker [%s] connect error: %v", config.Broker.String(), err)
} }
return err return err
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) config.Logger.Infof("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
} }
} }
// use RegisterCheck func before register // use RegisterCheck func before register
if err := config.RegisterCheck(config.Context); err != nil { if err := config.RegisterCheck(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, err) config.Logger.Errorf("server %s-%s register check error: %s", config.Name, config.Id, err)
} }
} else { } else {
// announce self to the world // announce self to the world
if err := n.Register(); err != nil { if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server register error: %v", err) config.Logger.Errorf("server register error: %v", err)
} }
} }
} }
@ -368,23 +366,23 @@ func (n *noopServer) Start() error {
rerr := config.RegisterCheck(config.Context) rerr := config.RegisterCheck(config.Context)
if rerr != nil && registered { if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) config.Logger.Errorf("server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
} }
// deregister self in case of error // deregister self in case of error
if err := n.Deregister(); err != nil { if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err) config.Logger.Errorf("server %s-%s deregister error: %s", config.Name, config.Id, err)
} }
} }
} else if rerr != nil && !registered { } else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr) config.Logger.Errorf("server %s-%s register check error: %s", config.Name, config.Id, rerr)
} }
continue continue
} }
if err := n.Register(); err != nil { if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err) config.Logger.Errorf("server %s-%s register error: %s", config.Name, config.Id, err)
} }
} }
// wait for exit // wait for exit
@ -396,7 +394,7 @@ func (n *noopServer) Start() error {
// deregister self // deregister self
if err := n.Deregister(); err != nil { if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server deregister error: ", err) config.Logger.Errorf("server deregister error: ", err)
} }
} }
@ -409,12 +407,12 @@ func (n *noopServer) Start() error {
ch <- nil ch <- nil
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) config.Logger.Infof("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
} }
// disconnect broker // disconnect broker
if err := config.Broker.Disconnect(config.Context); err != nil { if err := config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Broker [%s] disconnect error: %v", config.Broker.String(), err) config.Logger.Errorf("broker [%s] disconnect error: %v", config.Broker.String(), err)
} }
} }
}() }()
@ -446,11 +444,3 @@ func (n *noopServer) Stop() error {
return err return err
} }
type noopcodec struct {
*bytes.Buffer
}
func (c noopcodec) Close() error {
return nil
}

View File

@ -18,7 +18,7 @@ import (
// Options server struct // Options server struct
type Options struct { type Options struct {
Codecs map[string]codec.NewCodec Codecs map[string]codec.Codec
Broker broker.Broker Broker broker.Broker
Registry registry.Registry Registry registry.Registry
Tracer tracer.Tracer Tracer tracer.Tracer
@ -62,7 +62,7 @@ type Options struct {
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Auth: auth.DefaultAuth, Auth: auth.DefaultAuth,
Codecs: make(map[string]codec.NewCodec), Codecs: make(map[string]codec.Codec),
Context: context.Background(), Context: context.Background(),
Metadata: metadata.New(0), Metadata: metadata.New(0),
RegisterInterval: DefaultRegisterInterval, RegisterInterval: DefaultRegisterInterval,
@ -144,7 +144,7 @@ func Broker(b broker.Broker) Option {
} }
// Codec to use to encode/decode requests for a given content type // Codec to use to encode/decode requests for a given content type
func Codec(contentType string, c codec.NewCodec) Option { func Codec(contentType string, c codec.Codec) Option {
return func(o *Options) { return func(o *Options) {
o.Codecs[contentType] = c o.Codecs[contentType] = c
} }

View File

@ -34,6 +34,6 @@ func (r *rpcMessage) Body() []byte {
return r.body return r.body
} }
func (r *rpcMessage) Codec() codec.Reader { func (r *rpcMessage) Codec() codec.Codec {
return r.codec return r.codec
} }

View File

@ -58,7 +58,7 @@ type Message interface {
// The raw body of the message // The raw body of the message
Body() []byte Body() []byte
// Codec used to decode the message // Codec used to decode the message
Codec() codec.Reader Codec() codec.Codec
} }
// Request is a synchronous request interface // Request is a synchronous request interface
@ -78,7 +78,7 @@ type Request interface {
// Read the undecoded request body // Read the undecoded request body
Read() ([]byte, error) Read() ([]byte, error)
// The encoded message stream // The encoded message stream
Codec() codec.Reader Codec() codec.Codec
// Indicates whether its a stream // Indicates whether its a stream
Stream() bool Stream() bool
} }
@ -86,7 +86,7 @@ type Request interface {
// Response is the response writer for unencoded messages // Response is the response writer for unencoded messages
type Response interface { type Response interface {
// Encoded writer // Encoded writer
Codec() codec.Writer Codec() codec.Codec
// Write the header // Write the header
WriteHeader(metadata.Metadata) WriteHeader(metadata.Metadata)
// write a response directly to the client // write a response directly to the client

View File

@ -239,7 +239,7 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
req = req.Elem() req = req.Elem()
} }
if err = cf(noopcodec{bytes.NewBuffer(msg.Body)}).ReadBody(req.Interface()); err != nil { if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil {
return err return err
} }

View File

@ -168,7 +168,7 @@ func (s *service) Start() error {
s.RUnlock() s.RUnlock()
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Starting [service] %s", s.Name()) config.Logger.Infof("starting [service] %s", s.Name())
} }
for _, fn := range s.opts.BeforeStart { for _, fn := range s.opts.BeforeStart {
@ -218,7 +218,7 @@ func (s *service) Stop() error {
s.RUnlock() s.RUnlock()
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Stoppping [service] %s", s.Name()) config.Logger.Infof("stoppping [service] %s", s.Name())
} }
var err error var err error

View File

@ -23,7 +23,7 @@ var (
// ErrReadNamespace is returned when the names could not be read from service account // ErrReadNamespace is returned when the names could not be read from service account
ErrReadNamespace = errors.New("Could not read namespace from service account secret") ErrReadNamespace = errors.New("Could not read namespace from service account secret")
// DefaultImage is default micro image // DefaultImage is default micro image
DefaultImage = "micro/go-micro" DefaultImage = "micro/micro"
// DefaultNamespace is the default k8s namespace // DefaultNamespace is the default k8s namespace
DefaultNamespace = "default" DefaultNamespace = "default"
) )

View File

@ -35,7 +35,7 @@ func TestFormatName(t *testing.T) {
{"foo-bar", "foo-bar"}, {"foo-bar", "foo-bar"},
{"foo.bar", "foo-bar"}, {"foo.bar", "foo-bar"},
{"Foo.Bar", "foo-bar"}, {"Foo.Bar", "foo-bar"},
{"go.micro.foo.bar", "go-micro-foo-bar"}, {"micro.foo.bar", "micro-foo-bar"},
} }
for _, test := range testCases { for _, test := range testCases {

View File

@ -31,8 +31,8 @@ type request struct {
context context.Context context context.Context
} }
func (r *request) Codec() codec.Reader { func (r *request) Codec() codec.Codec {
return r.Request.Codec().(codec.Reader) return r.Request.Codec()
} }
func (r *request) Header() metadata.Metadata { func (r *request) Header() metadata.Metadata {