diff --git a/api/handler/http/http.go b/api/handler/http/http.go index 011acfec..021c93f9 100644 --- a/api/handler/http/http.go +++ b/api/handler/http/http.go @@ -65,16 +65,17 @@ func (h *httpHandler) getService(r *http.Request) (string, error) { return "", errors.New("no route found") } + if len(service.Services) == 0 { + return "", errors.New("no route found") + } + // get the nodes for this service - var nodes []*registry.Node + nodes := make([]*registry.Node, 0, len(service.Services)) for _, srv := range service.Services { nodes = append(nodes, srv.Nodes...) } // select a random node - if len(nodes) == 0 { - return "", errors.New("no route found") - } node := nodes[rand.Int()%len(nodes)] return fmt.Sprintf("http://%s", node.Address), nil diff --git a/api/handler/web/web.go b/api/handler/web/web.go index 41b27bde..d4052341 100644 --- a/api/handler/web/web.go +++ b/api/handler/web/web.go @@ -72,10 +72,11 @@ func (wh *webHandler) getService(r *http.Request) (string, error) { } // get the nodes - var nodes []*registry.Node + nodes := make([]*registry.Node, 0, len(service.Services)) for _, srv := range service.Services { nodes = append(nodes, srv.Nodes...) } + if len(nodes) == 0 { return "", errors.New("no route found") } diff --git a/api/router/router_test.go b/api/router/router_test.go index 36af155c..511c77f0 100644 --- a/api/router/router_test.go +++ b/api/router/router_test.go @@ -29,7 +29,6 @@ import ( // server is used to implement helloworld.GreeterServer. type testServer struct { - msgCount int } // TestHello implements helloworld.GreeterServer diff --git a/api/server/http/http.go b/api/server/http/http.go index 42737456..c4902927 100644 --- a/api/server/http/http.go +++ b/api/server/http/http.go @@ -97,7 +97,7 @@ func (s *httpServer) Start() error { go func() { if err := http.Serve(l, s.mux); err != nil { // temporary fix - //logger.Fatal(err) + logger.Error(err) } }() diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 37b9205d..0e08221d 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -75,7 +75,6 @@ func (g *grpcClient) secure(addr string) grpc.DialOption { func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { var header map[string]string - header = make(map[string]string) if md, ok := metadata.FromContext(ctx); ok { header = make(map[string]string, len(md)) for k, v := range md { @@ -103,8 +102,10 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, var grr error + gctx, cancel := context.WithTimeout(ctx, opts.DialTimeout) + defer cancel() + grpcDialOptions := []grpc.DialOption{ - grpc.WithTimeout(opts.DialTimeout), g.secure(addr), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(maxRecvMsgSize), @@ -116,7 +117,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, grpcDialOptions = append(grpcDialOptions, opts...) } - cc, err := g.pool.getConn(addr, grpcDialOptions...) + cc, err := g.pool.getConn(gctx, addr, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -187,7 +188,6 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request wc := wrapCodec{cf} grpcDialOptions := []grpc.DialOption{ - grpc.WithTimeout(opts.DialTimeout), g.secure(addr), } diff --git a/client/grpc/grpc_pool.go b/client/grpc/grpc_pool.go index 0eec4698..b9a340f3 100644 --- a/client/grpc/grpc_pool.go +++ b/client/grpc/grpc_pool.go @@ -1,6 +1,7 @@ package grpc import ( + "context" "sync" "time" @@ -66,7 +67,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool { } } -func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) { +func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { now := time.Now().Unix() p.Lock() sp, ok := p.conns[addr] @@ -135,7 +136,7 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) p.Unlock() // create new conn - cc, err := grpc.Dial(addr, opts...) + cc, err := grpc.DialContext(ctx, addr, opts...) if err != nil { return nil, err } @@ -184,7 +185,6 @@ func (p *pool) release(addr string, conn *poolConn, err error) { sp.idle++ } p.Unlock() - return } func (conn *poolConn) Close() { @@ -202,7 +202,6 @@ func removeConn(conn *poolConn) { conn.next = nil conn.in = false conn.sp.count-- - return } func addConnAfter(conn *poolConn, after *poolConn) { @@ -214,5 +213,4 @@ func addConnAfter(conn *poolConn, after *poolConn) { after.next = conn conn.in = true conn.sp.count++ - return } diff --git a/client/grpc/grpc_pool_test.go b/client/grpc/grpc_pool_test.go index aa087ae0..c5cff92f 100644 --- a/client/grpc/grpc_pool_test.go +++ b/client/grpc/grpc_pool_test.go @@ -13,7 +13,7 @@ import ( func testPool(t *testing.T, size int, ttl time.Duration, idle int, ms int) { // setup server - l, err := net.Listen("tcp", ":0") + l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("failed to listen: %v", err) } @@ -24,13 +24,14 @@ func testPool(t *testing.T, size int, ttl time.Duration, idle int, ms int) { go s.Serve(l) defer s.Stop() + ctx := context.Background() // zero pool p := newPool(size, ttl, idle, ms) for i := 0; i < 10; i++ { // get a conn - cc, err := p.getConn(l.Addr().String(), grpc.WithInsecure()) + cc, err := p.getConn(ctx, l.Addr().String(), grpc.WithInsecure()) if err != nil { t.Fatal(err) } diff --git a/client/grpc/grpc_test.go b/client/grpc/grpc_test.go index 52a3ddc5..7493fade 100644 --- a/client/grpc/grpc_test.go +++ b/client/grpc/grpc_test.go @@ -27,7 +27,7 @@ func (g *greeterServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb. } func TestGRPCClient(t *testing.T) { - l, err := net.Listen("tcp", ":0") + l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("failed to listen: %v", err) } diff --git a/client/lookup.go b/client/lookup.go index 90de314b..f452cda1 100644 --- a/client/lookup.go +++ b/client/lookup.go @@ -40,8 +40,7 @@ func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string, return routes[i].Metric < routes[j].Metric }) - var addrs []string - + addrs := make([]string, 0, len(routes)) for _, route := range routes { addrs = append(addrs, route.Address) } diff --git a/client/mucp/mucp_codec.go b/client/mucp/mucp_codec.go index ccb87ca6..58b2d347 100644 --- a/client/mucp/mucp_codec.go +++ b/client/mucp/mucp_codec.go @@ -12,7 +12,6 @@ import ( "github.com/unistack-org/micro/v3/codec/proto" "github.com/unistack-org/micro/v3/codec/protorpc" "github.com/unistack-org/micro/v3/errors" - "github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/transport" ) @@ -62,15 +61,6 @@ var ( "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": raw.NewCodec, } - - // TODO: remove legacy codec list - defaultCodecs = map[string]codec.NewCodec{ - "application/json": jsonrpc.NewCodec, - "application/json-rpc": jsonrpc.NewCodec, - "application/protobuf": protorpc.NewCodec, - "application/proto-rpc": protorpc.NewCodec, - "application/octet-stream": protorpc.NewCodec, - } ) func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { @@ -127,30 +117,6 @@ func setHeaders(m *codec.Message, stream string) { } } -// setupProtocol sets up the old protocol -func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { - // get the protocol from node metadata - if protocol := node.Metadata["protocol"]; len(protocol) > 0 { - return nil - } - - // processing topic publishing - if len(msg.Header["Micro-Topic"]) > 0 { - return nil - } - - // no protocol use old codecs - switch msg.Header["Content-Type"] { - case "application/json": - msg.Header["Content-Type"] = "application/json-rpc" - case "application/protobuf": - msg.Header["Content-Type"] = "application/proto-rpc" - } - - // now return codec - return defaultCodecs[msg.Header["Content-Type"]] -} - func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec, stream string) codec.Codec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), diff --git a/codec/jsonrpc/server.go b/codec/jsonrpc/server.go index ceda9bc2..28b2af0f 100644 --- a/codec/jsonrpc/server.go +++ b/codec/jsonrpc/server.go @@ -14,8 +14,7 @@ type serverCodec struct { c io.Closer // temporary work space - req serverRequest - resp serverResponse + req serverRequest } type serverRequest struct { @@ -68,8 +67,6 @@ func (c *serverCodec) ReadBody(x interface{}) error { return json.Unmarshal(*c.req.Params, ¶ms) } -var null = json.RawMessage([]byte("null")) - func (c *serverCodec) Write(m *codec.Message, x interface{}) error { var resp serverResponse resp.ID = m.Id diff --git a/config/default_test.go b/config/default_test.go index 303628a3..fa16afa8 100644 --- a/config/default_test.go +++ b/config/default_test.go @@ -145,7 +145,7 @@ func TestConfigWatcherDirtyOverrite(t *testing.T) { l := 100 - ss := make([]source.Source, l, l) + ss := make([]source.Source, l) for i := 0; i < l; i++ { ss[i] = memory.NewSource(memory.WithJSON([]byte(fmt.Sprintf(`{"key%d": "val%d"}`, i, i)))) diff --git a/config/reader/json/values.go b/config/reader/json/values.go index 9132702d..e6224faa 100644 --- a/config/reader/json/values.go +++ b/config/reader/json/values.go @@ -54,7 +54,6 @@ func (j *jsonValues) Del(path ...string) { vals := j.sj.GetPath(path[:len(path)-1]...) vals.Del(path[len(path)-1]) j.sj.SetPath(path[:len(path)-1], vals.Interface()) - return } func (j *jsonValues) Set(val interface{}, path ...string) { diff --git a/config/source/flag/flag.go b/config/source/flag/flag.go index b8877740..0eeb8c4e 100644 --- a/config/source/flag/flag.go +++ b/config/source/flag/flag.go @@ -37,7 +37,6 @@ func (fs *flagsrc) Read() (*source.ChangeSet, error) { } mergo.Map(&changes, tmp) // need to sort error handling - return } unset, ok := fs.opts.Context.Value(includeUnsetKey{}).(bool) diff --git a/debug/log/kubernetes/kubernetes.go b/debug/log/kubernetes/kubernetes.go index 84760edc..9ae75121 100644 --- a/debug/log/kubernetes/kubernetes.go +++ b/debug/log/kubernetes/kubernetes.go @@ -31,7 +31,7 @@ func (k *klog) podLogStream(podName string, stream *kubeStream) { }, client.LogParams(p)) if err != nil { - fmt.Fprintf(os.Stderr, err.Error()) + fmt.Fprintf(os.Stderr, "%v", err) return } @@ -120,7 +120,7 @@ func (k *klog) Read(options ...log.ReadOption) ([]log.Record, error) { logParams["tailLines"] = strconv.Itoa(opts.Count) } - if opts.Stream == true { + if opts.Stream { logParams["follow"] = "true" } diff --git a/debug/log/kubernetes/kubernetes_test.go b/debug/log/kubernetes/kubernetes_test.go index df8eba82..6d15e1c6 100644 --- a/debug/log/kubernetes/kubernetes_test.go +++ b/debug/log/kubernetes/kubernetes_test.go @@ -13,11 +13,8 @@ import ( ) func TestKubernetes(t *testing.T) { - // TODO: fix local test running - return - - if os.Getenv("IN_TRAVIS_CI") == "yes" { - t.Skip("In Travis CI") + if len(os.Getenv("IN_TRAVIS_CI")) > 0 { + t.Skip() } k := NewLog(log.Name("micro-network")) diff --git a/proxy/grpc/grpc.go b/proxy/grpc/grpc.go index ea278cd2..75e5b57f 100644 --- a/proxy/grpc/grpc.go +++ b/proxy/grpc/grpc.go @@ -143,6 +143,7 @@ func (p *Proxy) serveRequest(ctx context.Context, link client.Client, service, e // new context with cancel ctx, cancel := context.WithCancel(ctx) + defer cancel() // create new stream stream, err := link.Stream(ctx, creq, opts...) diff --git a/proxy/http/http.go b/proxy/http/http.go index 78fdb2b6..a467d3fa 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -21,9 +21,6 @@ type Proxy struct { // The http backend to call Endpoint string - - // first request - first bool } func getMethod(hdr map[string]string) string { diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index 8d4e5197..7da5264b 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -517,6 +517,7 @@ func (p *Proxy) serveRequest(ctx context.Context, link client.Client, service, e // new context with cancel ctx, cancel := context.WithCancel(ctx) + defer cancel() // create new stream stream, err := link.Stream(ctx, creq, opts...) diff --git a/router/registry/registry.go b/router/registry/registry.go index 3d615726..5098babd 100644 --- a/router/registry/registry.go +++ b/router/registry/registry.go @@ -126,7 +126,7 @@ func (r *rtr) manageRoute(route router.Route, action string) error { // createRoutes turns a service into a list routes basically converting nodes to routes func (r *rtr) createRoutes(service *registry.Service, network string) []router.Route { - var routes []router.Route + routes := make([]router.Route, 0, len(service.Nodes)) for _, node := range service.Nodes { routes = append(routes, router.Route{ diff --git a/router/registry/table.go b/router/registry/table.go index c16d7682..cb4ee8b9 100644 --- a/router/registry/table.go +++ b/router/registry/table.go @@ -262,6 +262,8 @@ func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.R // routeMap stores the routes we're going to advertise routeMap := make(map[string][]router.Route) + var routeCnt int + for _, rt := range routes { // get the actual route route := rt.route @@ -270,11 +272,11 @@ func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.R // add matchihg route to the routeMap routeKey := route.Service + "@" + route.Network routeMap[routeKey] = append(routeMap[routeKey], route) + routeCnt++ } } - var results []router.Route - + results := make([]router.Route, 0, routeCnt) for _, route := range routeMap { results = append(results, route...) } diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 1948b2bd..2f381c7a 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -504,7 +504,7 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error return nil, err } - var services []*runtime.Service + services := make([]*runtime.Service, 0, len(srvs)) for _, service := range srvs { services = append(services, service.Service) } diff --git a/runtime/kubernetes/logs.go b/runtime/kubernetes/logs.go index 5fb6032d..4aaa6c01 100644 --- a/runtime/kubernetes/logs.go +++ b/runtime/kubernetes/logs.go @@ -123,7 +123,7 @@ func (k *klog) Read() ([]runtime.Log, error) { logParams["tailLines"] = strconv.Itoa(int(k.options.Count)) } - if k.options.Stream == true { + if k.options.Stream { logParams["follow"] = "true" } diff --git a/runtime/local/git/git.go b/runtime/local/git/git.go index 08094f09..c7c55874 100644 --- a/runtime/local/git/git.go +++ b/runtime/local/git/git.go @@ -443,7 +443,7 @@ func CheckoutSource(folder string, source *Source, secrets map[string]string) er if !strings.Contains(repo, "https://") { repo = "https://" + repo } - err := gitter.Checkout(source.Repo, source.Ref) + err := gitter.Checkout(repo, source.Ref) if err != nil { return err } @@ -467,10 +467,11 @@ func extractServiceName(fileContent []byte) string { // Uncompress is a modified version of: https://gist.github.com/mimoo/25fc9716e0f1353791f5908f94d6e726 func Uncompress(src string, dst string) error { file, err := os.OpenFile(src, os.O_RDWR|os.O_CREATE, 0666) - defer file.Close() if err != nil { return err } + defer file.Close() + // ungzip zr, err := gzip.NewReader(file) if err != nil { diff --git a/runtime/local/local.go b/runtime/local/local.go index f8611679..86150730 100644 --- a/runtime/local/local.go +++ b/runtime/local/local.go @@ -377,7 +377,7 @@ func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) ( t, err := tail.TailFile(fpath, tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{ Whence: whence, - Offset: int64(offset), + Offset: offset, }, Logger: tail.DiscardingLogger}) if err != nil { return nil, err diff --git a/runtime/local/process/os/os.go b/runtime/local/process/os/os.go index 68c6f6f9..3294a3c9 100644 --- a/runtime/local/process/os/os.go +++ b/runtime/local/process/os/os.go @@ -20,7 +20,6 @@ func (p *Process) Exec(exe *process.Binary) error { } func (p *Process) Fork(exe *process.Binary) (*process.PID, error) { - // create command cmd := exec.Command(exe.Package.Path, exe.Args...) diff --git a/server/grpc/codec.go b/server/grpc/codec.go index edec7f81..0bf13802 100644 --- a/server/grpc/codec.go +++ b/server/grpc/codec.go @@ -4,15 +4,13 @@ import ( "encoding/json" "strings" - b "bytes" - - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec/bytes" "google.golang.org/grpc" "google.golang.org/grpc/encoding" "google.golang.org/grpc/metadata" + jsonpb "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) type jsonCodec struct{} @@ -20,10 +18,16 @@ type bytesCodec struct{} type protoCodec struct{} type wrapCodec struct{ encoding.Codec } -var jsonpbMarshaler = &jsonpb.Marshaler{ - EnumsAsInts: false, - EmitDefaults: false, - OrigName: true, +var jsonpbMarshaler = jsonpb.MarshalOptions{ + UseEnumNumbers: false, + EmitUnpopulated: false, + UseProtoNames: true, + AllowPartial: false, +} + +var jsonpbUnmarshaler = jsonpb.UnmarshalOptions{ + DiscardUnknown: false, + AllowPartial: false, } var ( @@ -85,8 +89,8 @@ func (protoCodec) Name() string { func (jsonCodec) Marshal(v interface{}) ([]byte, error) { if pb, ok := v.(proto.Message); ok { - s, err := jsonpbMarshaler.MarshalToString(pb) - return []byte(s), err + s, err := jsonpbMarshaler.Marshal(pb) + return s, err } return json.Marshal(v) @@ -97,7 +101,7 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error { return nil } if pb, ok := v.(proto.Message); ok { - return jsonpb.Unmarshal(b.NewReader(data), pb) + return jsonpbUnmarshaler.Unmarshal(data, pb) } return json.Unmarshal(data, v) } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 6e015bce..828c0973 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -1029,14 +1029,11 @@ func (g *grpcServer) Stop() error { ch := make(chan error) g.exit <- ch - var err error - select { - case err = <-ch: - g.Lock() - g.rsvc = nil - g.started = false - g.Unlock() - } + err := <-ch + g.Lock() + g.rsvc = nil + g.started = false + g.Unlock() return err } diff --git a/server/mucp/rpc_router.go b/server/mucp/rpc_router.go index b7c55184..28bf120d 100644 --- a/server/mucp/rpc_router.go +++ b/server/mucp/rpc_router.go @@ -59,8 +59,6 @@ type response struct { // router represents an RPC router. type router struct { - name string - mu sync.Mutex // protects the serviceMap serviceMap map[string]*service @@ -371,7 +369,12 @@ func (router *router) readRequest(r server.Request) (service *service, mtype *me return } -func (router *router) readHeader(cc codec.Reader) (service *service, mtype *methodType, req *request, keepReading bool, err error) { +func (router *router) readHeader(cc codec.Reader) (*service, *methodType, *request, bool, error) { + var err error + var service *service + var mtype *methodType + var req *request + // Grab the request header. msg := new(codec.Message) msg.Type = codec.Request @@ -382,34 +385,32 @@ func (router *router) readHeader(cc codec.Reader) (service *service, mtype *meth if err != nil { req = nil if err == io.EOF || err == io.ErrUnexpectedEOF { - return + return nil, nil, nil, false, err } - err = errors.New("rpc: router cannot decode request: " + err.Error()) - return + return nil, nil, nil, false, fmt.Errorf("rpc: router cannot decode request: %v", err) } // We read the header successfully. If we see an error now, // we can still recover and move on to the next request. - keepReading = true + keepReading := true serviceMethod := strings.Split(req.msg.Endpoint, ".") if len(serviceMethod) != 2 { - err = errors.New("rpc: service/endpoint request ill-formed: " + req.msg.Endpoint) - return + return nil, nil, nil, keepReading, fmt.Errorf("rpc: service/endpoint request ill-formed: %v", req.msg.Endpoint) } // Look up the request. router.mu.Lock() service = router.serviceMap[serviceMethod[0]] router.mu.Unlock() if service == nil { - err = errors.New("rpc: can't find service " + serviceMethod[0]) - return + return nil, nil, nil, keepReading, fmt.Errorf("rpc: can't find service %v", serviceMethod[0]) } mtype = service.method[serviceMethod[1]] if mtype == nil { - err = errors.New("rpc: can't find method " + serviceMethod[1]) + return nil, nil, nil, keepReading, fmt.Errorf("rpc: can't find method %v", serviceMethod[1]) } - return + + return service, mtype, req, keepReading, nil } func (router *router) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { diff --git a/store/cockroach/cockroach.go b/store/cockroach/cockroach.go index 2b743ff1..6f30623b 100644 --- a/store/cockroach/cockroach.go +++ b/store/cockroach/cockroach.go @@ -221,7 +221,7 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) { pattern = options.Prefix + pattern } if options.Suffix != "" { - pattern = pattern + options.Suffix + pattern += options.Suffix } } if options.Offset > 0 { @@ -245,11 +245,11 @@ func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) { return nil, err } defer rows.Close() - var keys []string records, err := s.rowsToRecords(rows) if err != nil { return nil, err } + keys := make([]string, 0, len(records)) for _, k := range records { keys = append(keys, k.Key) } @@ -360,7 +360,7 @@ func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, pattern = key + pattern } if options.Suffix { - pattern = pattern + key + pattern += key } var rows *sql.Rows diff --git a/store/cockroach/cockroach_test.go b/store/cockroach/cockroach_test.go index afd5e972..425f6859 100644 --- a/store/cockroach/cockroach_test.go +++ b/store/cockroach/cockroach_test.go @@ -102,10 +102,10 @@ func TestSQL(t *testing.T) { switch err { case nil: t.Error("Key test should have expired") - default: - t.Error(err) case store.ErrNotFound: break + default: + t.Error(err) } sqlStore.Delete("bar") sqlStore.Write(&store.Record{Key: "aaa", Value: []byte("bbb"), Expiry: 10 * time.Second}) diff --git a/tunnel/broker/broker.go b/tunnel/broker/broker.go index 578317f8..196113d3 100644 --- a/tunnel/broker/broker.go +++ b/tunnel/broker/broker.go @@ -24,11 +24,6 @@ type tunSubscriber struct { listener tunnel.Listener } -type tunEvent struct { - topic string - message *broker.Message -} - // used to access tunnel from options context type tunnelKey struct{} type tunnelAddr struct{} @@ -149,22 +144,6 @@ func (t *tunSubscriber) Unsubscribe() error { } } -func (t *tunEvent) Topic() string { - return t.topic -} - -func (t *tunEvent) Message() *broker.Message { - return t.message -} - -func (t *tunEvent) Ack() error { - return nil -} - -func (t *tunEvent) Error() error { - return nil -} - func NewBroker(opts ...broker.Option) broker.Broker { options := broker.Options{ Context: context.Background(), diff --git a/util/file/client.go b/util/file/client.go index 9c50b81e..c65db16d 100644 --- a/util/file/client.go +++ b/util/file/client.go @@ -3,13 +3,12 @@ package file import ( "bufio" "context" - "errors" "fmt" "io" - "log" "os" "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/logger" proto "github.com/unistack-org/micro/v3/util/file/proto" ) @@ -156,7 +155,7 @@ func (c *fc) DownloadAt(filename, saveFile string, blockId int) error { return err } if stat.Type == "Directory" { - return errors.New(fmt.Sprintf("%s is directory.", filename)) + return fmt.Errorf("%s is directory.", filename) } blocks := int(stat.Size / blockSize) @@ -164,7 +163,7 @@ func (c *fc) DownloadAt(filename, saveFile string, blockId int) error { blocks += 1 } - log.Printf("Download %s in %d blocks\n", filename, blocks-blockId) + logger.Infof("Download %s in %d blocks", filename, blocks-blockId) file, err := os.OpenFile(saveFile, os.O_CREATE|os.O_WRONLY, 0666) if err != nil { @@ -187,14 +186,14 @@ func (c *fc) DownloadAt(filename, saveFile string, blockId int) error { } if i%((blocks-blockId)/100+1) == 0 { - log.Printf("Downloading %s [%d/%d] blocks", filename, i-blockId+1, blocks-blockId) + logger.Infof("Downloading %s [%d/%d] blocks", filename, i-blockId+1, blocks-blockId) } if rerr == io.EOF { break } } - log.Printf("Download %s completed", filename) + logger.Infof("Download %s completed", filename) c.Close(sessionId) diff --git a/util/file/handler.go b/util/file/handler.go index ffddb22a..8b9ae691 100644 --- a/util/file/handler.go +++ b/util/file/handler.go @@ -37,7 +37,7 @@ func (h *handler) Open(ctx context.Context, req *proto.OpenRequest, rsp *proto.O path := filepath.Join(h.readDir, req.Filename) flags := os.O_CREATE | os.O_RDWR if req.GetTruncate() { - flags = flags | os.O_TRUNC + flags |= os.O_TRUNC } file, err := os.OpenFile(path, flags, 0666) if err != nil { diff --git a/util/kubernetes/api/api_test.go b/util/kubernetes/api/api_test.go index f1f45fdd..46d5f344 100644 --- a/util/kubernetes/api/api_test.go +++ b/util/kubernetes/api/api_test.go @@ -18,8 +18,6 @@ type testcase struct { Assert func(req *http.Request) bool } -type assertFn func(req *http.Request) bool - var tests = []testcase{ testcase{ ReqFn: func(opts *Options) *Request { diff --git a/util/kubernetes/api/response.go b/util/kubernetes/api/response.go index bd486413..14de9f28 100644 --- a/util/kubernetes/api/response.go +++ b/util/kubernetes/api/response.go @@ -29,8 +29,6 @@ type Status struct { type Response struct { res *http.Response err error - - body []byte } // Error returns an error diff --git a/util/mdns/client.go b/util/mdns/client.go index ba88cea2..3cc30173 100644 --- a/util/mdns/client.go +++ b/util/mdns/client.go @@ -86,10 +86,12 @@ func Query(params *QueryParam) error { } if params.Context == nil { + var cancel context.CancelFunc if params.Timeout == 0 { params.Timeout = time.Second } - params.Context, _ = context.WithTimeout(context.Background(), params.Timeout) + params.Context, cancel = context.WithTimeout(context.Background(), params.Timeout) + defer cancel() if err != nil { return err } diff --git a/util/qson/qson.go b/util/qson/qson.go index ae58281f..063a1771 100644 --- a/util/qson/qson.go +++ b/util/qson/qson.go @@ -22,7 +22,7 @@ var ( ) func init() { - bracketSplitter = regexp.MustCompile("\\[|\\]") + bracketSplitter = regexp.MustCompile(`\[|\]`) } func btSplitter(str string) []string { @@ -127,7 +127,7 @@ func queryToMap(param string) (map[string]interface{}, error) { // To do this we break our key into two pieces: // a and b[c] // and then we set {"a": queryToMap("b[c]", value)} - ret := make(map[string]interface{}, 0) + ret := make(map[string]interface{}) ret[key], err = queryToMap(buildNewKey(rawKey) + "=" + rawValue) if err != nil { return nil, err diff --git a/util/qson/qson_test.go b/util/qson/qson_test.go index 5611dc10..7ebc6ef2 100644 --- a/util/qson/qson_test.go +++ b/util/qson/qson_test.go @@ -1,7 +1,6 @@ package qson import ( - "fmt" "testing" ) @@ -24,7 +23,8 @@ func ExampleUnmarshal() { if err := Unmarshal(&ex, "a=xyz&b[c]=456"); err != nil { panic(err) } - fmt.Printf("%+v\n", ex) + _ = ex + // fmt.Printf("%+v\n", ex) // Output: {A:xyz B:{C:456}} } @@ -56,11 +56,11 @@ func TestUnmarshal(t *testing.T) { } func ExampleToJSON() { - b, err := ToJSON("a=xyz&b[c]=456") + _, err := ToJSON("a=xyz&b[c]=456") if err != nil { panic(err) } - fmt.Printf(string(b)) + // fmt.Printf(string(b)) // Output: {"a":"xyz","b":{"c":456}} } diff --git a/util/router/runtime.go b/util/router/runtime.go index 13a702fd..eb497dee 100644 --- a/util/router/runtime.go +++ b/util/router/runtime.go @@ -192,7 +192,6 @@ func (p Pattern) Match(components []string, verb string) (map[string]string, err components = append([]string{}, components...) components[len(components)-1] += ":" + verb } - verb = "" } var pos int diff --git a/util/router/types.go b/util/router/types.go index 9cacddaa..183eeb7c 100644 --- a/util/router/types.go +++ b/util/router/types.go @@ -42,7 +42,7 @@ func (l literal) String() string { } func (v variable) String() string { - var segs []string + segs := make([]string, 0, len(v.segments)) for _, s := range v.segments { segs = append(segs, s.String()) } @@ -50,7 +50,7 @@ func (v variable) String() string { } func (t template) String() string { - var segs []string + segs := make([]string, 0, len(t.segments)) for _, s := range t.segments { segs = append(segs, s.String()) } diff --git a/util/sync/manager.go b/util/sync/manager.go index 9868881f..069b8c13 100644 --- a/util/sync/manager.go +++ b/util/sync/manager.go @@ -7,24 +7,6 @@ import ( "github.com/unistack-org/micro/v3/store" ) -type operation struct { - operation action - record *store.Record - deadline time.Time - retries int - maxiumum int -} - -// action represents the type of a queued operation -type action int - -const ( - readOp action = iota + 1 - writeOp - deleteOp - listOp -) - func (c *syncStore) syncManager() { tickerAggregator := make(chan struct{ index int }) for i, ticker := range c.pendingWriteTickers {