Compare commits

...

41 Commits

Author SHA1 Message Date
919520219c client: WithBodyOnly publish option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-06-11 14:14:41 +03:00
60a5e737f8 util/reflect: return pointer from helper funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-05-25 22:44:22 +03:00
34f0b209cc codec: add ability to control codec via struct tags
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-05-25 22:20:39 +03:00
ba8e1889fe dependabot
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-05-16 17:18:56 +03:00
dae5c57a60 Create dependabot.yml 2021-05-15 14:46:22 +03:00
ea590d57df meter/wrapper: add inflight request/message count (#47)
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-05-10 17:59:40 +03:00
Renovate Bot
9aa6969836 fix(deps): update golang.org/x/net commit hash to 4163338 2021-05-10 14:29:32 +00:00
Renovate Bot
c00c705c24 fix(deps): update golang.org/x/net commit hash to 16afe75 2021-05-08 09:02:01 +00:00
Renovate Bot
0239f795d8 fix(deps): update golang.org/x/net commit hash to 7fd8e65 2021-05-03 10:07:41 +00:00
Renovate Bot
e69b43881d fix(deps): update golang.org/x/net commit hash to f8dd838 2021-05-01 23:20:59 +00:00
3a48a613fe not fail on lint now
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-27 08:36:11 +03:00
86626c5922 fieldalignment of all structs to save memory
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-27 08:32:47 +03:00
ee11f39a2f fieldaligment
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-27 00:03:18 +03:00
3bdfdd8fd2 meter: fix labels
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-27 00:03:18 +03:00
6dfdff7fd8 fieldaligment
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-27 00:03:18 +03:00
00a4785df3 fixup
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-27 00:03:18 +03:00
Renovate Bot
bae3b0ef94 fix(deps): update golang.org/x/net commit hash to 5f58ad6 2021-04-23 23:52:34 +00:00
Renovate Bot
89b0565062 fix(deps): update golang.org/x/net commit hash to 4e50805 2021-04-22 03:04:18 +00:00
1f8b0aeb61 store: remove unused Value type
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-22 00:57:06 +03:00
Renovate Bot
5b6f849e0a fix(deps): update golang.org/x/net commit hash to 798c215 2021-04-20 23:13:59 +00:00
Renovate Bot
3b416fffde fix(deps): update golang.org/x/net commit hash to d25e304 2021-04-20 15:04:17 +00:00
3a60103aed server: drop Internal option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-20 12:45:14 +03:00
41837a67f8 register: drop verbose values export
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-20 12:39:21 +03:00
852f19598d util/reflect: fix protobuf field name detection
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-19 11:34:28 +03:00
6537b35773 util/reflect: add interface merging
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-19 01:19:37 +03:00
b733f1316f remove stale generate stuff
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-16 17:36:27 +03:00
Renovate Bot
840af5574c fix(deps): update golang.org/x/net commit hash to e915ea6 2021-04-16 00:56:52 +00:00
Renovate Bot
56e5b7001c fix(deps): update golang.org/x/net commit hash to 0645797 2021-04-14 21:41:15 +00:00
Renovate Bot
11dc6fd752 fix(deps): update golang.org/x/net commit hash to afb366f 2021-04-10 11:09:36 +00:00
a2695d8699 util/reflect: rewrite struct merging with map
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-10 01:22:40 +03:00
618421de05 client: allow to set content-type for call
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-09 23:09:12 +03:00
Renovate Bot
30baaabd9f fix(deps): update golang.org/x/net commit hash to a5a99cb 2021-04-05 19:46:46 +00:00
df5bce1191 util/reflect: fix StructURLValues
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-03 11:50:23 +03:00
Renovate Bot
089d0fe4df fix(deps): update golang.org/x/net commit hash to 0fccb6f 2021-03-31 22:50:38 +00:00
a06f535303 util/reflect: add StructURLValues func
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-01 00:30:26 +03:00
Renovate Bot
eba586a329 fix(deps): update golang.org/x/net commit hash to cb1fcc7 2021-03-31 09:19:27 +00:00
Renovate Bot
d74a8645e8 fix(deps): update golang.org/x/net commit hash to e572328 2021-03-31 00:52:53 +00:00
Renovate Bot
5a00786192 fix(deps): update golang.org/x/net commit hash to cd0ac97 2021-03-30 22:28:38 +00:00
Renovate Bot
b3e9941634 fix(deps): update golang.org/x/net commit hash to c8897c2 2021-03-30 16:02:28 +00:00
Renovate Bot
a5a5904302 fix(deps): update golang.org/x/net commit hash to 22f4162 2021-03-30 11:44:00 +00:00
Renovate Bot
a59832e57e fix(deps): update golang.org/x/net commit hash to df645c7 2021-03-30 05:11:12 +00:00
100 changed files with 1503 additions and 1289 deletions

19
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

15
.github/generate.sh vendored
View File

@@ -1,15 +0,0 @@
#!/bin/bash -e
find . -type f -name '*.pb.*.go' -o -name '*.pb.go' -a ! -name 'message.pb.go' -delete
PROTOS=$(find . -type f -name '*.proto' | grep -v proto/google/api)
mkdir -p proto/google/api
curl -s -o proto/google/api/annotations.proto -L https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/annotations.proto
curl -s -o proto/google/api/http.proto -L https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/http.proto
for PROTO in $PROTOS; do
echo $PROTO
protoc -I./proto -I. -I$(dirname $PROTO) --go-grpc_out=paths=source_relative:. --go_out=paths=source_relative:. --micro_out=paths=source_relative:. $PROTO
done
rm -r proto

20
.github/renovate.json vendored
View File

@@ -1,20 +0,0 @@
{
"extends": [
"config:base"
],
"postUpdateOptions": ["gomodTidy"],
"packageRules": [
{
"matchUpdateTypes": ["minor", "patch", "pin", "digest"],
"automerge": true
},
{
"groupName": "all deps",
"separateMajorMinor": true,
"groupSlug": "all",
"packagePatterns": [
"*"
]
}
]
}

13
.github/stale.sh vendored
View File

@@ -1,13 +0,0 @@
#!/bin/bash -ex
export PATH=$PATH:$(pwd)/bin
export GO111MODULE=on
export GOBIN=$(pwd)/bin
#go get github.com/rvflash/goup@v0.4.1
#goup -v ./...
#go get github.com/psampaz/go-mod-outdated@v0.6.0
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
#go list -u -m -json all | go-mod-outdated -update

View File

@@ -53,7 +53,7 @@ jobs:
continue-on-error: true continue-on-error: true
with: with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30 version: v1.39
# Optional: working directory, useful for monorepos # Optional: working directory, useful for monorepos
# working-directory: somedir # working-directory: somedir
# Optional: golangci-lint command line arguments. # Optional: golangci-lint command line arguments.

View File

@@ -53,7 +53,7 @@ jobs:
continue-on-error: true continue-on-error: true
with: with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30 version: v1.39
# Optional: working directory, useful for monorepos # Optional: working directory, useful for monorepos
# working-directory: somedir # working-directory: somedir
# Optional: golangci-lint command line arguments. # Optional: golangci-lint command line arguments.

View File

@@ -1,30 +1,44 @@
run: run:
concurrency: 4
deadline: 5m deadline: 5m
modules-download-mode: readonly issues-exit-code: 1
skip-files: tests: true
- ".*\\.pb\\.go$"
- ".*\\.pb\\.micro\\.go$" linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters: linters:
disable-all: false
enable-all: false
enable: enable:
- megacheck
- staticcheck
- deadcode
- varcheck
- gosimple
- unused
- prealloc
- scopelint
- gocritic
- goimports
- unconvert
- govet - govet
- nakedret - deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck - structcheck
- gosec
disable:
- maligned
- interfacer
- typecheck - typecheck
- dupl - unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@@ -149,5 +149,4 @@ func TestValidate(t *testing.T) {
if err := Validate(epPcreInvalid); err == nil { if err := Validate(epPcreInvalid); err == nil {
t.Fatalf("invalid pcre %v", epPcreInvalid.Path[0]) t.Fatalf("invalid pcre %v", epPcreInvalid.Path[0])
} }
} }

View File

@@ -6,10 +6,8 @@ import (
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
) )
var ( // DefaultMaxRecvSize specifies max recv size for handler
// DefaultMaxRecvSize specifies max recv size for handler var DefaultMaxRecvSize int64 = 1024 * 1024 * 100 // 10Mb
DefaultMaxRecvSize int64 = 1024 * 1024 * 100 // 10Mb
)
// Options struct holds handler options // Options struct holds handler options
type Options struct { type Options struct {

View File

@@ -15,12 +15,12 @@ import (
// NewResolver creates new subdomain api resolver // NewResolver creates new subdomain api resolver
func NewResolver(parent resolver.Resolver, opts ...resolver.Option) resolver.Resolver { func NewResolver(parent resolver.Resolver, opts ...resolver.Option) resolver.Resolver {
options := resolver.NewOptions(opts...) options := resolver.NewOptions(opts...)
return &subdomainResolver{options, parent} return &subdomainResolver{opts: options, Resolver: parent}
} }
type subdomainResolver struct { type subdomainResolver struct {
opts resolver.Options
resolver.Resolver resolver.Resolver
opts resolver.Options
} }
// Resolve resolve endpoint based on subdomain // Resolve resolve endpoint based on subdomain

View File

@@ -19,9 +19,7 @@ type vpathResolver struct {
opts resolver.Options opts resolver.Options
} }
var ( var re = regexp.MustCompile("^v[0-9]+$")
re = regexp.MustCompile("^v[0-9]+$")
)
// Resolve endpoint // Resolve endpoint
func (r *vpathResolver) Resolve(req *http.Request, opts ...resolver.ResolveOption) (*resolver.Endpoint, error) { func (r *vpathResolver) Resolve(req *http.Request, opts ...resolver.ResolveOption) (*resolver.Endpoint, error) {

View File

@@ -7,10 +7,8 @@ import (
"github.com/unistack-org/micro/v3/api" "github.com/unistack-org/micro/v3/api"
) )
var ( // DefaultRouter contains default router implementation
// DefaultRouter contains default router implementation var DefaultRouter Router
DefaultRouter Router
)
// Router is used to determine an endpoint for a request // Router is used to determine an endpoint for a request
type Router interface { type Router interface {

View File

@@ -24,11 +24,11 @@ func TestVerify(t *testing.T) {
} }
tt := []struct { tt := []struct {
Name string Error error
Rules []*Rule
Account *Account Account *Account
Resource *Resource Resource *Resource
Error error Name string
Rules []*Rule
}{ }{
{ {
Name: "NoRules", Name: "NoRules",

View File

@@ -7,10 +7,8 @@ import (
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
) )
var ( // DefaultBroker default broker
// DefaultBroker default broker var DefaultBroker Broker = NewBroker()
DefaultBroker Broker = NewBroker()
)
// Broker is an interface used for asynchronous messaging. // Broker is an interface used for asynchronous messaging.
type Broker interface { type Broker interface {

View File

@@ -13,27 +13,27 @@ import (
) )
type memoryBroker struct { type memoryBroker struct {
opts Options
Subscribers map[string][]*memorySubscriber Subscribers map[string][]*memorySubscriber
addr string addr string
opts Options
sync.RWMutex sync.RWMutex
connected bool connected bool
} }
type memoryEvent struct { type memoryEvent struct {
opts Options
err error err error
message interface{} message interface{}
topic string topic string
opts Options
} }
type memorySubscriber struct { type memorySubscriber struct {
opts SubscribeOptions
ctx context.Context ctx context.Context
exit chan bool exit chan bool
handler Handler handler Handler
id string id string
topic string topic string
opts SubscribeOptions
} }
func (m *memoryBroker) Options() Options { func (m *memoryBroker) Options() Options {

View File

@@ -9,16 +9,10 @@ import (
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
) )
var ( // DefaultCodecs will be used to encode/decode data
// DefaultCodecs will be used to encode/decode data var DefaultCodecs = map[string]codec.Codec{
DefaultCodecs = map[string]codec.Codec{ "application/octet-stream": codec.NewCodec(),
//"application/json": cjson.NewCodec, }
//"application/json-rpc": cjsonrpc.NewCodec,
//"application/protobuf": cproto.NewCodec,
//"application/proto-rpc": cprotorpc.NewCodec,
"application/octet-stream": codec.NewCodec(),
}
)
type noopClient struct { type noopClient struct {
opts Options opts Options

View File

@@ -19,8 +19,8 @@ import (
// Options holds client options // Options holds client options
type Options struct { type Options struct {
// CallOptions contains default CallOptions // Selector used to select needed address
CallOptions CallOptions Selector selector.Selector
// Logger used to log messages // Logger used to log messages
Logger logger.Logger Logger logger.Logger
// Tracer used for tracing // Tracer used for tracing
@@ -31,30 +31,30 @@ type Options struct {
Meter meter.Meter Meter meter.Meter
// Router used to get route // Router used to get route
Router router.Router Router router.Router
// Selector used to select needed address
Selector selector.Selector
// Transport used for transfer messages // Transport used for transfer messages
Transport transport.Transport Transport transport.Transport
// Context is used for external options // Context is used for external options
Context context.Context Context context.Context
// Codecs map
Codecs map[string]codec.Codec
// Lookup func used to get destination addr // Lookup func used to get destination addr
Lookup LookupFunc Lookup LookupFunc
// Codecs map
Codecs map[string]codec.Codec
// TLSConfig specifies tls.Config for secure connection
TLSConfig *tls.Config
// Proxy is used for proxy requests // Proxy is used for proxy requests
Proxy string Proxy string
// ContentType is Used to select codec // ContentType is used to select codec
ContentType string ContentType string
// Name is the client name // Name is the client name
Name string Name string
// Wrappers contains wrappers // Wrappers contains wrappers
Wrappers []Wrapper Wrappers []Wrapper
// CallOptions contains default CallOptions
CallOptions CallOptions
// PoolSize connection pool size // PoolSize connection pool size
PoolSize int PoolSize int
// PoolTTL connection pool ttl // PoolTTL connection pool ttl
PoolTTL time.Duration PoolTTL time.Duration
// TLSConfig specifies tls.Config for secure connection
TLSConfig *tls.Config
} }
// NewCallOptions creates new call options struct // NewCallOptions creates new call options struct
@@ -80,6 +80,8 @@ type CallOptions struct {
Backoff BackoffFunc Backoff BackoffFunc
// Network name // Network name
Network string Network string
// Content-Type
ContentType string
// CallWrappers call wrappers // CallWrappers call wrappers
CallWrappers []CallWrapper CallWrappers []CallWrapper
// SelectOptions selector options // SelectOptions selector options
@@ -116,6 +118,8 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
// PublishOptions holds publish options // PublishOptions holds publish options
type PublishOptions struct { type PublishOptions struct {
// BodyOnly will publish only message body
BodyOnly bool
// Context used for external options // Context used for external options
Context context.Context Context context.Context
// Exchange topic exchange name // Exchange topic exchange name
@@ -375,6 +379,13 @@ func WithExchange(e string) PublishOption {
} }
} }
// WithBodyOnly publish only message body
func WithBodyOnly(b bool) PublishOption {
return func(o *PublishOptions) {
o.BodyOnly = b
}
}
// PublishContext sets the context in publish options // PublishContext sets the context in publish options
func PublishContext(ctx context.Context) PublishOption { func PublishContext(ctx context.Context) PublishOption {
return func(o *PublishOptions) { return func(o *PublishOptions) {
@@ -382,6 +393,13 @@ func PublishContext(ctx context.Context) PublishOption {
} }
} }
// WithContentType specifies call content type
func WithContentType(ct string) CallOption {
return func(o *CallOptions) {
o.ContentType = ct
}
}
// WithAddress sets the remote addresses to use rather than using service discovery // WithAddress sets the remote addresses to use rather than using service discovery
func WithAddress(a ...string) CallOption { func WithAddress(a ...string) CallOption {
return func(o *CallOptions) { return func(o *CallOptions) {
@@ -486,16 +504,16 @@ func WithMessageContentType(ct string) MessageOption {
} }
} }
// WithContentType specifies request content type // StreamingRequest specifies that request is streaming
func WithContentType(ct string) RequestOption { func StreamingRequest(b bool) RequestOption {
return func(o *RequestOptions) {
o.Stream = b
}
}
// RequestContentType specifies request content type
func RequestContentType(ct string) RequestOption {
return func(o *RequestOptions) { return func(o *RequestOptions) {
o.ContentType = ct o.ContentType = ct
} }
} }
// StreamingRequest specifies that request is streaming
func StreamingRequest() RequestOption {
return func(o *RequestOptions) {
o.Stream = true
}
}

View File

@@ -5,13 +5,13 @@ import (
) )
type testRequest struct { type testRequest struct {
opts RequestOptions
codec codec.Codec codec codec.Codec
body interface{} body interface{}
service string
method string method string
endpoint string endpoint string
contentType string contentType string
service string opts RequestOptions
} }
func (r *testRequest) ContentType() string { func (r *testRequest) ContentType() string {

View File

@@ -28,6 +28,8 @@ var (
DefaultMaxMsgSize int = 1024 * 1024 * 4 // 4Mb DefaultMaxMsgSize int = 1024 * 1024 * 4 // 4Mb
// DefaultCodec is the global default codec // DefaultCodec is the global default codec
DefaultCodec Codec = NewCodec() DefaultCodec Codec = NewCodec()
// DefaultTagName specifies struct tag name to control codec Marshal/Unmarshal
DefaultTagName = "codec"
) )
// MessageType specifies message type for codec // MessageType specifies message type for codec

View File

@@ -5,8 +5,7 @@ import (
"io" "io"
) )
type noopCodec struct { type noopCodec struct{}
}
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error { func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
return nil return nil

View File

@@ -6,10 +6,8 @@ import (
"errors" "errors"
) )
var ( // DefaultConfig default config
// DefaultConfig default config var DefaultConfig Config = NewConfig()
DefaultConfig Config = NewConfig()
)
var ( var (
// ErrCodecMissing is returned when codec needed and not specified // ErrCodecMissing is returned when codec needed and not specified
@@ -37,10 +35,10 @@ type Config interface {
} }
// Watcher is the config watcher // Watcher is the config watcher
//type Watcher interface { // type Watcher interface {
// Next() (, error) // Next() (, error)
// Stop() error // Stop() error
//} // }
// Load loads config from config sources // Load loads config from config sources
func Load(ctx context.Context, cs ...Config) error { func Load(ctx context.Context, cs ...Config) error {

View File

@@ -35,7 +35,7 @@ func (c *defaultConfig) Load(ctx context.Context) error {
src, err := rutil.Zero(c.opts.Struct) src, err := rutil.Zero(c.opts.Struct)
if err == nil { if err == nil {
valueOf := reflect.ValueOf(src) valueOf := reflect.ValueOf(src)
if err = c.fillValues(ctx, valueOf); err == nil { if err = c.fillValues(valueOf); err == nil {
err = mergo.Merge(c.opts.Struct, src, mergo.WithOverride, mergo.WithTypeCheck, mergo.WithAppendSlice) err = mergo.Merge(c.opts.Struct, src, mergo.WithOverride, mergo.WithTypeCheck, mergo.WithAppendSlice)
} }
} }
@@ -54,7 +54,7 @@ func (c *defaultConfig) Load(ctx context.Context) error {
} }
//nolint:gocyclo //nolint:gocyclo
func (c *defaultConfig) fillValue(ctx context.Context, value reflect.Value, val string) error { func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
if !rutil.IsEmpty(value) { if !rutil.IsEmpty(value) {
return nil return nil
} }
@@ -71,10 +71,10 @@ func (c *defaultConfig) fillValue(ctx context.Context, value reflect.Value, val
kv := strings.FieldsFunc(nval, func(c rune) bool { return c == '=' }) kv := strings.FieldsFunc(nval, func(c rune) bool { return c == '=' })
mkey := reflect.Indirect(reflect.New(kt)) mkey := reflect.Indirect(reflect.New(kt))
mval := reflect.Indirect(reflect.New(et)) mval := reflect.Indirect(reflect.New(et))
if err := c.fillValue(ctx, mkey, kv[0]); err != nil { if err := c.fillValue(mkey, kv[0]); err != nil {
return err return err
} }
if err := c.fillValue(ctx, mval, kv[1]); err != nil { if err := c.fillValue(mval, kv[1]); err != nil {
return err return err
} }
value.SetMapIndex(mkey, mval) value.SetMapIndex(mkey, mval)
@@ -84,7 +84,7 @@ func (c *defaultConfig) fillValue(ctx context.Context, value reflect.Value, val
value.Set(reflect.MakeSlice(reflect.SliceOf(value.Type().Elem()), len(nvals), len(nvals))) value.Set(reflect.MakeSlice(reflect.SliceOf(value.Type().Elem()), len(nvals), len(nvals)))
for idx, nval := range nvals { for idx, nval := range nvals {
nvalue := reflect.Indirect(reflect.New(value.Type().Elem())) nvalue := reflect.Indirect(reflect.New(value.Type().Elem()))
if err := c.fillValue(ctx, nvalue, nval); err != nil { if err := c.fillValue(nvalue, nval); err != nil {
return err return err
} }
value.Index(idx).Set(nvalue) value.Index(idx).Set(nvalue)
@@ -173,7 +173,7 @@ func (c *defaultConfig) fillValue(ctx context.Context, value reflect.Value, val
return nil return nil
} }
func (c *defaultConfig) fillValues(ctx context.Context, valueOf reflect.Value) error { func (c *defaultConfig) fillValues(valueOf reflect.Value) error {
var values reflect.Value var values reflect.Value
if valueOf.Kind() == reflect.Ptr { if valueOf.Kind() == reflect.Ptr {
@@ -200,7 +200,7 @@ func (c *defaultConfig) fillValues(ctx context.Context, valueOf reflect.Value) e
switch value.Kind() { switch value.Kind() {
case reflect.Struct: case reflect.Struct:
value.Set(reflect.Indirect(reflect.New(value.Type()))) value.Set(reflect.Indirect(reflect.New(value.Type())))
if err := c.fillValues(ctx, value); err != nil { if err := c.fillValues(value); err != nil {
return err return err
} }
continue continue
@@ -214,7 +214,7 @@ func (c *defaultConfig) fillValues(ctx context.Context, valueOf reflect.Value) e
value.Set(reflect.New(value.Type().Elem())) value.Set(reflect.New(value.Type().Elem()))
} }
value = value.Elem() value = value.Elem()
if err := c.fillValues(ctx, value); err != nil { if err := c.fillValues(value); err != nil {
return err return err
} }
continue continue
@@ -224,7 +224,7 @@ func (c *defaultConfig) fillValues(ctx context.Context, valueOf reflect.Value) e
continue continue
} }
if err := c.fillValue(ctx, value, tag); err != nil { if err := c.fillValue(value, tag); err != nil {
return err return err
} }
} }

View File

@@ -10,30 +10,30 @@ import (
type Cfg struct { type Cfg struct {
StringValue string `default:"string_value"` StringValue string `default:"string_value"`
IntValue int `default:"99"`
IgnoreValue string `json:"-"` IgnoreValue string `json:"-"`
StructValue struct { StructValue struct {
StringValue string `default:"string_value"` StringValue string `default:"string_value"`
} }
IntValue int `default:"99"`
} }
func TestDefault(t *testing.T) { func TestDefault(t *testing.T) {
ctx := context.Background() ctx := context.Background()
conf := &Cfg{IntValue: 10} conf := &Cfg{IntValue: 10}
blfn := func(ctx context.Context, cfg config.Config) error { blfn := func(ctx context.Context, cfg config.Config) error {
conf, ok := cfg.Options().Struct.(*Cfg) nconf, ok := cfg.Options().Struct.(*Cfg)
if !ok { if !ok {
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options()) return fmt.Errorf("failed to get Struct from options: %v", cfg.Options())
} }
conf.StringValue = "before_load" nconf.StringValue = "before_load"
return nil return nil
} }
alfn := func(ctx context.Context, cfg config.Config) error { alfn := func(ctx context.Context, cfg config.Config) error {
conf, ok := cfg.Options().Struct.(*Cfg) nconf, ok := cfg.Options().Struct.(*Cfg)
if !ok { if !ok {
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options()) return fmt.Errorf("failed to get Struct from options: %v", cfg.Options())
} }
conf.StringValue = "after_load" nconf.StringValue = "after_load"
return nil return nil
} }

View File

@@ -7,9 +7,9 @@ import (
) )
type Config struct { type Config struct {
Value string
SubConfig *SubConfig SubConfig *SubConfig
Config *Config Config *Config
Value string
} }
type SubConfig struct { type SubConfig struct {

View File

@@ -17,7 +17,6 @@ func TestFromError(t *testing.T) {
if merr.Id != "go.micro.test" || merr.Code != 404 { if merr.Id != "go.micro.test" || merr.Code != 404 {
t.Fatalf("invalid conversation %v != %v", err, merr) t.Fatalf("invalid conversation %v != %v", err, merr)
} }
} }
func TestEqual(t *testing.T) { func TestEqual(t *testing.T) {
@@ -32,7 +31,6 @@ func TestEqual(t *testing.T) {
if Equal(err1, err3) { if Equal(err1, err3) {
t.Fatal("errors must be not equal") t.Fatal("errors must be not equal")
} }
} }
func TestErrors(t *testing.T) { func TestErrors(t *testing.T) {

View File

@@ -44,10 +44,10 @@ func TestDag(t *testing.T) {
var steps [][]string var steps [][]string
fn := func(n dag.Vertex, idx int) error { fn := func(n dag.Vertex, idx int) error {
if idx == 0 { if idx == 0 {
steps = make([][]string, 1, 1) steps = make([][]string, 1)
steps[0] = make([]string, 0, 1) steps[0] = make([]string, 0, 1)
} else if idx >= len(steps) { } else if idx >= len(steps) {
tsteps := make([][]string, idx+1, idx+1) tsteps := make([][]string, idx+1)
copy(tsteps, steps) copy(tsteps, steps)
steps = tsteps steps = tsteps
steps[idx] = make([]string, 0, 1) steps[idx] = make([]string, 0, 1)

View File

@@ -1,7 +1,17 @@
// Package flow is an interface used for saga pattern messaging // Package flow is an interface used for saga pattern microservice workflow
package flow package flow
type Step interface { type Step interface {
// Endpoint returns service_name.service_method // Endpoint returns rpc endpoint service_name.service_method or broker topic
Endpoint() string Endpoint() string
} }
type Workflow interface {
Steps() [][]Step
Stop() error
}
type Flow interface {
Start(Workflow) error
Stop(Workflow)
}

View File

@@ -1,3 +0,0 @@
package micro
//go:generate ./.github/generate.sh

2
go.mod
View File

@@ -9,5 +9,5 @@ require (
github.com/imdario/mergo v0.3.12 github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
golang.org/x/net v0.0.0-20210326220855-61e056675ecf golang.org/x/net v0.0.0-20210510120150-4163338589ed
) )

8
go.sum
View File

@@ -10,12 +10,12 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.0.0-20210326220855-61e056675ecf h1:WUcCxqQqDT0aXO4VnQbfMvp4zh7m1Gb2clVuHUAGGRE= golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I=
golang.org/x/net v0.0.0-20210326220855-61e056675ecf/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=

View File

@@ -21,8 +21,8 @@ func init() {
} }
type defaultLogger struct { type defaultLogger struct {
opts Options
enc *json.Encoder enc *json.Encoder
opts Options
sync.RWMutex sync.RWMutex
} }

View File

@@ -76,7 +76,7 @@ func WithContext(ctx context.Context) Option {
} }
// WithName sets the name // WithName sets the name
func withName(n string) Option { func WithName(n string) Option {
return func(o *Options) { return func(o *Options) {
o.Name = n o.Name = n
} }

View File

@@ -70,21 +70,23 @@ type lWrapper struct {
opts Options opts Options
} }
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string type (
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string
type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string
type ServerSubscriberObserver func(context.Context, server.Message, error) []string ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
ServerSubscriberObserver func(context.Context, server.Message, error) []string
)
// Options struct for wrapper // Options struct for wrapper
type Options struct { type Options struct {
// Logger that used for log // Logger that used for log
Logger logger.Logger Logger logger.Logger
// Level for logger // ServerHandlerObservers funcs
Level logger.Level ServerHandlerObservers []ServerHandlerObserver
// Enabled flag // ServerSubscriberObservers funcs
Enabled bool ServerSubscriberObservers []ServerSubscriberObserver
// ClientCallObservers funcs // ClientCallObservers funcs
ClientCallObservers []ClientCallObserver ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs // ClientStreamObservers funcs
@@ -93,12 +95,12 @@ type Options struct {
ClientPublishObservers []ClientPublishObserver ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs // ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints // SkipEndpoints
SkipEndpoints []string SkipEndpoints []string
// Level for logger
Level logger.Level
// Enabled flag
Enabled bool
} }
// Option func signature // Option func signature
@@ -213,7 +215,7 @@ func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}
for _, o := range l.opts.ClientCallObservers { for _, o := range l.opts.ClientCallObservers {
labels = append(labels, o(ctx, req, rsp, opts, err)...) labels = append(labels, o(ctx, req, rsp, opts, err)...)
} }
fields := make(map[string]interface{}, int(len(labels)/2)) fields := make(map[string]interface{}, len(labels)/2)
for i := 0; i < len(labels); i += 2 { for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1] fields[labels[i]] = labels[i+1]
} }
@@ -240,7 +242,7 @@ func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...clien
for _, o := range l.opts.ClientStreamObservers { for _, o := range l.opts.ClientStreamObservers {
labels = append(labels, o(ctx, req, opts, stream, err)...) labels = append(labels, o(ctx, req, opts, stream, err)...)
} }
fields := make(map[string]interface{}, int(len(labels)/2)) fields := make(map[string]interface{}, len(labels)/2)
for i := 0; i < len(labels); i += 2 { for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1] fields[labels[i]] = labels[i+1]
} }
@@ -267,7 +269,7 @@ func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...clie
for _, o := range l.opts.ClientPublishObservers { for _, o := range l.opts.ClientPublishObservers {
labels = append(labels, o(ctx, msg, opts, err)...) labels = append(labels, o(ctx, msg, opts, err)...)
} }
fields := make(map[string]interface{}, int(len(labels)/2)) fields := make(map[string]interface{}, len(labels)/2)
for i := 0; i < len(labels); i += 2 { for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1] fields[labels[i]] = labels[i+1]
} }
@@ -294,7 +296,7 @@ func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp in
for _, o := range l.opts.ServerHandlerObservers { for _, o := range l.opts.ServerHandlerObservers {
labels = append(labels, o(ctx, req, rsp, err)...) labels = append(labels, o(ctx, req, rsp, err)...)
} }
fields := make(map[string]interface{}, int(len(labels)/2)) fields := make(map[string]interface{}, len(labels)/2)
for i := 0; i < len(labels); i += 2 { for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1] fields[labels[i]] = labels[i+1]
} }
@@ -321,7 +323,7 @@ func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) err
for _, o := range l.opts.ServerSubscriberObservers { for _, o := range l.opts.ServerSubscriberObservers {
labels = append(labels, o(ctx, msg, err)...) labels = append(labels, o(ctx, msg, err)...)
} }
fields := make(map[string]interface{}, int(len(labels)/2)) fields := make(map[string]interface{}, len(labels)/2)
for i := 0; i < len(labels); i += 2 { for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1] fields[labels[i]] = labels[i+1]
} }
@@ -372,7 +374,7 @@ func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.R
for _, o := range l.opts.ClientCallFuncObservers { for _, o := range l.opts.ClientCallFuncObservers {
labels = append(labels, o(ctx, addr, req, rsp, opts, err)...) labels = append(labels, o(ctx, addr, req, rsp, opts, err)...)
} }
fields := make(map[string]interface{}, int(len(labels)/2)) fields := make(map[string]interface{}, len(labels)/2)
for i := 0; i < len(labels); i += 2 { for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1] fields[labels[i]] = labels[i+1]
} }

View File

@@ -5,9 +5,11 @@ import (
"context" "context"
) )
type mdIncomingKey struct{} type (
type mdOutgoingKey struct{} mdIncomingKey struct{}
type mdKey struct{} mdOutgoingKey struct{}
mdKey struct{}
)
// FromIncomingContext returns metadata from incoming ctx // FromIncomingContext returns metadata from incoming ctx
// returned metadata shoud not be modified or race condition happens // returned metadata shoud not be modified or race condition happens

View File

@@ -6,10 +6,8 @@ import (
"sort" "sort"
) )
var ( // HeaderPrefix for all headers passed
// HeaderPrefix for all headers passed var HeaderPrefix = "Micro-"
HeaderPrefix = "Micro-"
)
// Metadata is our way of representing request headers internally. // Metadata is our way of representing request headers internally.
// They're used at the RPC level and translate back and forth // They're used at the RPC level and translate back and forth
@@ -20,10 +18,8 @@ type rawMetadata struct {
md Metadata md Metadata
} }
var ( // defaultMetadataSize used when need to init new Metadata
// defaultMetadataSize used when need to init new Metadata var defaultMetadataSize = 2
defaultMetadataSize = 2
)
// Iterator used to iterate over metadata with order // Iterator used to iterate over metadata with order
type Iterator struct { type Iterator struct {

View File

@@ -76,7 +76,7 @@ func TestIterator(t *testing.T) {
var k, v string var k, v string
for iter.Next(&k, &v) { for iter.Next(&k, &v) {
//fmt.Printf("k: %s, v: %s\n", k, v) // fmt.Printf("k: %s, v: %s\n", k, v)
} }
} }
@@ -102,7 +102,6 @@ func TestMedataCanonicalKey(t *testing.T) {
} else if v != "12345" { } else if v != "12345" {
t.Fatalf("invalid metadata value: %s != %s", "12345", v) t.Fatalf("invalid metadata value: %s != %s", "12345", v)
} }
} }
func TestMetadataSet(t *testing.T) { func TestMetadataSet(t *testing.T) {
@@ -130,7 +129,6 @@ func TestMetadataDelete(t *testing.T) {
if ok { if ok {
t.Fatal("key Baz not deleted") t.Fatal("key Baz not deleted")
} }
} }
func TestNilContext(t *testing.T) { func TestNilContext(t *testing.T) {

View File

@@ -9,12 +9,10 @@ import (
"github.com/unistack-org/micro/v3/meter" "github.com/unistack-org/micro/v3/meter"
) )
var ( // guard to fail early
// guard to fail early var _ MeterServer = &Handler{}
_ MeterServer = &handler{}
)
type handler struct { type Handler struct {
opts Options opts Options
} }
@@ -22,8 +20,8 @@ type Option func(*Options)
type Options struct { type Options struct {
Meter meter.Meter Meter meter.Meter
MeterOptions []meter.Option
Name string Name string
MeterOptions []meter.Option
} }
func Meter(m meter.Meter) Option { func Meter(m meter.Meter) Option {
@@ -52,12 +50,12 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
func NewHandler(opts ...Option) *handler { func NewHandler(opts ...Option) *Handler {
options := NewOptions(opts...) options := NewOptions(opts...)
return &handler{opts: options} return &Handler{opts: options}
} }
func (h *handler) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error { func (h *Handler) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil { if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil {
return errors.InternalServerError(h.opts.Name, "%v", err) return errors.InternalServerError(h.opts.Name, "%v", err)

View File

@@ -29,13 +29,13 @@ var (
type Meter interface { type Meter interface {
Name() string Name() string
Init(opts ...Option) error Init(opts ...Option) error
Counter(name string, opts ...Option) Counter Counter(name string, labels ...string) Counter
FloatCounter(name string, opts ...Option) FloatCounter FloatCounter(name string, labels ...string) FloatCounter
Gauge(name string, fn func() float64, opts ...Option) Gauge Gauge(name string, fn func() float64, labels ...string) Gauge
Set(opts ...Option) Meter Set(opts ...Option) Meter
Histogram(name string, opts ...Option) Histogram Histogram(name string, labels ...string) Histogram
Summary(name string, opts ...Option) Summary Summary(name string, labels ...string) Summary
SummaryExt(name string, window time.Duration, quantiles []float64, opts ...Option) Summary SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
Write(w io.Writer, opts ...Option) error Write(w io.Writer, opts ...Option) error
Options() Options Options() Options
String() string String() string

View File

@@ -10,7 +10,7 @@ func TestNoopMeter(t *testing.T) {
t.Fatalf("invalid options parsing: %v", m.Options()) t.Fatalf("invalid options parsing: %v", m.Options())
} }
cnt := m.Counter("counter", Labels("server", "noop")) cnt := m.Counter("counter", "server", "noop")
cnt.Inc() cnt.Inc()
} }

View File

@@ -28,57 +28,33 @@ func (r *noopMeter) Init(opts ...Option) error {
} }
// Counter implements the Meter interface // Counter implements the Meter interface
func (r *noopMeter) Counter(name string, opts ...Option) Counter { func (r *noopMeter) Counter(name string, labels ...string) Counter {
options := Options{} return &noopCounter{labels: labels}
for _, o := range opts {
o(&options)
}
return &noopCounter{labels: options.Labels}
} }
// FloatCounter implements the Meter interface // FloatCounter implements the Meter interface
func (r *noopMeter) FloatCounter(name string, opts ...Option) FloatCounter { func (r *noopMeter) FloatCounter(name string, labels ...string) FloatCounter {
options := Options{} return &noopFloatCounter{labels: labels}
for _, o := range opts {
o(&options)
}
return &noopFloatCounter{labels: options.Labels}
} }
// Gauge implements the Meter interface // Gauge implements the Meter interface
func (r *noopMeter) Gauge(name string, f func() float64, opts ...Option) Gauge { func (r *noopMeter) Gauge(name string, f func() float64, labels ...string) Gauge {
options := Options{} return &noopGauge{labels: labels}
for _, o := range opts {
o(&options)
}
return &noopGauge{labels: options.Labels}
} }
// Summary implements the Meter interface // Summary implements the Meter interface
func (r *noopMeter) Summary(name string, opts ...Option) Summary { func (r *noopMeter) Summary(name string, labels ...string) Summary {
options := Options{} return &noopSummary{labels: labels}
for _, o := range opts {
o(&options)
}
return &noopSummary{labels: options.Labels}
} }
// SummaryExt implements the Meter interface // SummaryExt implements the Meter interface
func (r *noopMeter) SummaryExt(name string, window time.Duration, quantiles []float64, opts ...Option) Summary { func (r *noopMeter) SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary {
options := Options{} return &noopSummary{labels: labels}
for _, o := range opts {
o(&options)
}
return &noopSummary{labels: options.Labels}
} }
// Histogram implements the Meter interface // Histogram implements the Meter interface
func (r *noopMeter) Histogram(name string, opts ...Option) Histogram { func (r *noopMeter) Histogram(name string, labels ...string) Histogram {
options := Options{} return &noopHistogram{labels: labels}
for _, o := range opts {
o(&options)
}
return &noopHistogram{labels: options.Labels}
} }
// Set implements the Meter interface // Set implements the Meter interface
@@ -111,11 +87,9 @@ type noopCounter struct {
} }
func (r *noopCounter) Add(int) { func (r *noopCounter) Add(int) {
} }
func (r *noopCounter) Dec() { func (r *noopCounter) Dec() {
} }
func (r *noopCounter) Get() uint64 { func (r *noopCounter) Get() uint64 {
@@ -123,11 +97,9 @@ func (r *noopCounter) Get() uint64 {
} }
func (r *noopCounter) Inc() { func (r *noopCounter) Inc() {
} }
func (r *noopCounter) Set(uint64) { func (r *noopCounter) Set(uint64) {
} }
type noopFloatCounter struct { type noopFloatCounter struct {
@@ -135,7 +107,6 @@ type noopFloatCounter struct {
} }
func (r *noopFloatCounter) Add(float64) { func (r *noopFloatCounter) Add(float64) {
} }
func (r *noopFloatCounter) Get() float64 { func (r *noopFloatCounter) Get() float64 {
@@ -143,11 +114,9 @@ func (r *noopFloatCounter) Get() float64 {
} }
func (r *noopFloatCounter) Set(float64) { func (r *noopFloatCounter) Set(float64) {
} }
func (r *noopFloatCounter) Sub(float64) { func (r *noopFloatCounter) Sub(float64) {
} }
type noopGauge struct { type noopGauge struct {
@@ -163,11 +132,9 @@ type noopSummary struct {
} }
func (r *noopSummary) Update(float64) { func (r *noopSummary) Update(float64) {
} }
func (r *noopSummary) UpdateDuration(time.Time) { func (r *noopSummary) UpdateDuration(time.Time) {
} }
type noopHistogram struct { type noopHistogram struct {
@@ -175,15 +142,12 @@ type noopHistogram struct {
} }
func (r *noopHistogram) Reset() { func (r *noopHistogram) Reset() {
} }
func (r *noopHistogram) Update(float64) { func (r *noopHistogram) Update(float64) {
} }
func (r *noopHistogram) UpdateDuration(time.Time) { func (r *noopHistogram) UpdateDuration(time.Time) {
} }
//func (r *noopHistogram) VisitNonZeroBuckets(f func(vmrange string, count uint64)) {} // func (r *noopHistogram) VisitNonZeroBuckets(f func(vmrange string, count uint64)) {}

View File

@@ -90,7 +90,7 @@ func Logger(l logger.Logger) Option {
func Labels(ls ...string) Option { func Labels(ls ...string) Option {
return func(o *Options) { return func(o *Options) {
o.Labels = ls o.Labels = append(o.Labels, ls...)
} }
} }

View File

@@ -11,18 +11,25 @@ import (
) )
var ( var (
ClientRequestDurationSeconds = "client_request_duration_seconds" ClientRequestDurationSeconds = "client_request_duration_seconds"
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds" ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
ClientRequestTotal = "client_request_total" ClientRequestTotal = "client_request_total"
ServerRequestDurationSeconds = "server_request_duration_seconds" ClientRequestInflight = "client_request_inflight"
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
ServerRequestTotal = "server_request_total" ServerRequestDurationSeconds = "server_request_duration_seconds"
PublishMessageDurationSeconds = "publish_message_duration_seconds" ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds" ServerRequestTotal = "server_request_total"
PublishMessageTotal = "publish_message_total" ServerRequestInflight = "server_request_inflight"
PublishMessageDurationSeconds = "publish_message_duration_seconds"
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
PublishMessageTotal = "publish_message_total"
PublishMessageInflight = "publish_message_inflight"
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds" SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds" SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
SubscribeMessageTotal = "subscribe_message_total" SubscribeMessageTotal = "subscribe_message_total"
SubscribeMessageInflight = "subscribe_message_inflight"
labelSuccess = "success" labelSuccess = "success"
labelFailure = "failure" labelFailure = "failure"
@@ -116,22 +123,25 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
return w.callFunc(ctx, addr, req, rsp, opts) return w.callFunc(ctx, addr, req, rsp, opts)
} }
} }
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts) err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
lopts := w.opts.lopts w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint)) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess)) labels = append(labels, labelStatus, labelSuccess)
} else { } else {
lopts = append(lopts, meter.Labels(labelStatus, labelFailure)) labels = append(labels, labelStatus, labelFailure)
} }
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc() w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return err return err
} }
@@ -144,22 +154,24 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
} }
} }
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...) err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
lopts := w.opts.lopts w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint)) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess)) labels = append(labels, labelStatus, labelSuccess)
} else { } else {
lopts = append(lopts, meter.Labels(labelStatus, labelFailure)) labels = append(labels, labelStatus, labelFailure)
} }
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc() w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return err return err
} }
@@ -172,22 +184,24 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
} }
} }
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...) stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
lopts := w.opts.lopts w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint)) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess)) labels = append(labels, labelStatus, labelSuccess)
} else { } else {
lopts = append(lopts, meter.Labels(labelStatus, labelFailure)) labels = append(labels, labelStatus, labelFailure)
} }
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc() w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return stream, err return stream, err
} }
@@ -195,22 +209,24 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic() endpoint := p.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := w.Client.Publish(ctx, p, opts...) err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
lopts := w.opts.lopts w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint)) w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(PublishMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess)) labels = append(labels, labelStatus, labelSuccess)
} else { } else {
lopts = append(lopts, meter.Labels(labelStatus, labelFailure)) labels = append(labels, labelStatus, labelFailure)
} }
w.opts.Meter.Counter(PublishMessageTotal, lopts...).Inc() w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
return err return err
} }
@@ -231,22 +247,24 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
} }
} }
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := fn(ctx, req, rsp) err := fn(ctx, req, rsp)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec()
lopts := w.opts.lopts w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint)) w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ServerRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess)) labels = append(labels, labelStatus, labelSuccess)
} else { } else {
lopts = append(lopts, meter.Labels(labelStatus, labelFailure)) labels = append(labels, labelStatus, labelFailure)
} }
w.opts.Meter.Counter(ServerRequestTotal, lopts...).Inc() w.opts.Meter.Counter(ServerRequestTotal, labels...).Inc()
return err return err
} }
@@ -263,22 +281,24 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
return func(ctx context.Context, msg server.Message) error { return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic() endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := fn(ctx, msg) err := fn(ctx, msg)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
lopts := w.opts.lopts w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint)) w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess)) labels = append(labels, labelStatus, labelSuccess)
} else { } else {
lopts = append(lopts, meter.Labels(labelStatus, labelFailure)) labels = append(labels, labelStatus, labelFailure)
} }
w.opts.Meter.Counter(SubscribeMessageTotal, lopts...).Inc() w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
return err return err
} }

View File

@@ -31,18 +31,18 @@ type memoryClient struct {
} }
type memoryListener struct { type memoryListener struct {
topts Options
ctx context.Context
lopts ListenOptions lopts ListenOptions
ctx context.Context
exit chan bool exit chan bool
conn chan *memorySocket conn chan *memorySocket
addr string addr string
topts Options
sync.RWMutex sync.RWMutex
} }
type memoryTransport struct { type memoryTransport struct {
opts Options
listeners map[string]*memoryListener listeners map[string]*memoryListener
opts Options
sync.RWMutex sync.RWMutex
} }

View File

@@ -27,9 +27,9 @@ func TestMemoryTransport(t *testing.T) {
if len(os.Getenv("INTEGRATION_TESTS")) == 0 { if len(os.Getenv("INTEGRATION_TESTS")) == 0 {
t.Logf("Server Received %s", string(m.Body)) t.Logf("Server Received %s", string(m.Body))
} }
if err := sock.Send(&Message{ if cerr := sock.Send(&Message{
Body: []byte(`pong`), Body: []byte(`pong`),
}); err != nil { }); cerr != nil {
return return
} }
} }
@@ -60,7 +60,6 @@ func TestMemoryTransport(t *testing.T) {
t.Logf("Client Received %s", string(m.Body)) t.Logf("Client Received %s", string(m.Body))
} }
} }
} }
func TestListener(t *testing.T) { func TestListener(t *testing.T) {

View File

@@ -12,16 +12,16 @@ import (
) )
type tunBroker struct { type tunBroker struct {
opts broker.Options
tunnel tunnel.Tunnel tunnel tunnel.Tunnel
opts broker.Options
} }
type tunSubscriber struct { type tunSubscriber struct {
opts broker.SubscribeOptions
listener tunnel.Listener listener tunnel.Listener
handler broker.Handler handler broker.Handler
closed chan bool closed chan bool
topic string topic string
opts broker.SubscribeOptions
} }
type tunEvent struct { type tunEvent struct {
@@ -30,8 +30,10 @@ type tunEvent struct {
} }
// used to access tunnel from options context // used to access tunnel from options context
type tunnelKey struct{} type (
type tunnelAddr struct{} tunnelKey struct{}
tunnelAddr struct{}
)
func (t *tunBroker) Init(opts ...broker.Option) error { func (t *tunBroker) Init(opts ...broker.Option) error {
for _, o := range opts { for _, o := range opts {

View File

@@ -34,8 +34,8 @@ type Options struct {
Token string Token string
// Name holds the tunnel name // Name holds the tunnel name
Name string Name string
// Id holds the tunnel id // ID holds the tunnel id
Id string ID string
// Address holds the tunnel address // Address holds the tunnel address
Address string Address string
// Nodes holds the tunnel nodes // Nodes holds the tunnel nodes
@@ -68,10 +68,10 @@ type ListenOptions struct {
Timeout time.Duration Timeout time.Duration
} }
// Id sets the tunnel id // ID sets the tunnel id
func Id(id string) Option { func ID(id string) Option {
return func(o *Options) { return func(o *Options) {
o.Id = id o.ID = id
} }
} }
@@ -164,7 +164,7 @@ func DialWait(b bool) DialOption {
// NewOptions returns router default options with filled values // NewOptions returns router default options with filled values
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Id: uuid.New().String(), ID: uuid.New().String(),
Address: DefaultAddress, Address: DefaultAddress,
Token: DefaultToken, Token: DefaultToken,
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,

View File

@@ -10,9 +10,8 @@ import (
) )
type tunTransport struct { type tunTransport struct {
tunnel tunnel.Tunnel
options transport.Options options transport.Options
tunnel tunnel.Tunnel
} }
type tunnelKey struct{} type tunnelKey struct{}
@@ -88,7 +87,7 @@ func NewTransport(opts ...transport.Option) transport.Transport {
} }
// initialise // initialise
//t.Init(opts...) // t.Init(opts...)
return t return t
} }

View File

@@ -9,10 +9,8 @@ import (
"github.com/unistack-org/micro/v3/network/transport" "github.com/unistack-org/micro/v3/network/transport"
) )
var ( // DefaultTunnel contains default tunnel implementation
// DefaultTunnel contains default tunnel implementation var DefaultTunnel Tunnel
DefaultTunnel Tunnel
)
const ( const (
// Unicast send over one link // Unicast send over one link

View File

@@ -17,8 +17,6 @@ import (
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
"github.com/unistack-org/micro/v3/store" "github.com/unistack-org/micro/v3/store"
"github.com/unistack-org/micro/v3/tracer" "github.com/unistack-org/micro/v3/tracer"
// "github.com/unistack-org/micro/v3/profiler"
// "github.com/unistack-org/micro/v3/runtime"
) )
// Options for micro service // Options for micro service
@@ -78,8 +76,8 @@ func NewOptions(opts ...Option) Options {
Meters: []meter.Meter{meter.DefaultMeter}, Meters: []meter.Meter{meter.DefaultMeter},
Configs: []config.Config{config.DefaultConfig}, Configs: []config.Config{config.DefaultConfig},
Stores: []store.Store{store.DefaultStore}, Stores: []store.Store{store.DefaultStore},
//Runtime runtime.Runtime // Runtime runtime.Runtime
//Profile profile.Profile // Profile profile.Profile
} }
for _, o := range opts { for _, o := range opts {

View File

@@ -16,10 +16,8 @@ type httpProfile struct {
running bool running bool
} }
var ( // DefaultAddress for http profiler
// DefaultAddress for http profiler var DefaultAddress = ":6060"
DefaultAddress = ":6060"
)
// Start the profiler // Start the profiler
func (h *httpProfile) Start() error { func (h *httpProfile) Start() error {

View File

@@ -11,10 +11,8 @@ type Profiler interface {
String() string String() string
} }
var ( // DefaultProfiler holds the default profiler
// DefaultProfiler holds the default profiler var DefaultProfiler Profiler = NewProfiler()
DefaultProfiler Profiler = NewProfiler()
)
// Options holds the options for profiler // Options holds the options for profiler
type Options struct { type Options struct {

View File

@@ -7,10 +7,8 @@ import (
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
) )
var ( // DefaultEndpoint holds default proxy address
// DefaultEndpoint holds default proxy address var DefaultEndpoint = "localhost:9090"
DefaultEndpoint = "localhost:9090"
)
// Proxy can be used as a proxy server for micro services // Proxy can be used as a proxy server for micro services
type Proxy interface { type Proxy interface {

View File

@@ -3,7 +3,6 @@ package register
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"strings"
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
@@ -11,12 +10,12 @@ import (
) )
// ExtractValue from reflect.Type from specified depth // ExtractValue from reflect.Type from specified depth
func ExtractValue(v reflect.Type, d int) *Value { func ExtractValue(v reflect.Type, d int) string {
if d == 3 { if d == 3 {
return nil return ""
} }
if v == nil { if v == nil {
return nil return ""
} }
if v.Kind() == reflect.Ptr { if v.Kind() == reflect.Ptr {
@@ -25,7 +24,7 @@ func ExtractValue(v reflect.Type, d int) *Value {
// slices and maps don't have a defined name // slices and maps don't have a defined name
if (v.Kind() == reflect.Slice || v.Kind() == reflect.Map) || len(v.Name()) == 0 { if (v.Kind() == reflect.Slice || v.Kind() == reflect.Map) || len(v.Name()) == 0 {
return nil return ""
} }
// get the rune character // get the rune character
@@ -33,58 +32,10 @@ func ExtractValue(v reflect.Type, d int) *Value {
// crude check for is unexported field // crude check for is unexported field
if unicode.IsLower(a) { if unicode.IsLower(a) {
return nil return ""
} }
arg := &Value{ return v.Name()
Name: v.Name(),
Type: v.Name(),
}
switch v.Kind() {
case reflect.Struct:
for i := 0; i < v.NumField(); i++ {
f := v.Field(i)
val := ExtractValue(f.Type, d+1)
if val == nil {
continue
}
// if we can find a json tag use it
if tags := f.Tag.Get("json"); len(tags) > 0 {
parts := strings.Split(tags, ",")
if parts[0] == "-" || parts[0] == "omitempty" {
continue
}
val.Name = parts[0]
}
// if there's no name default it
if len(val.Name) == 0 {
val.Name = v.Field(i).Name
}
arg.Values = append(arg.Values, val)
}
case reflect.Slice:
p := v.Elem()
if p.Kind() == reflect.Ptr {
p = p.Elem()
}
arg.Type = "[]" + p.Name()
case reflect.Map:
p := v.Elem()
if p.Kind() == reflect.Ptr {
p = p.Elem()
}
key := v.Key()
if key.Kind() == reflect.Ptr {
key = key.Elem()
}
arg.Type = fmt.Sprintf("map[%s]%s", key.Name(), p.Name())
}
return arg
} }
// ExtractEndpoint extract *Endpoint from reflect.Method // ExtractEndpoint extract *Endpoint from reflect.Method
@@ -116,7 +67,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint {
request := ExtractValue(reqType, 0) request := ExtractValue(reqType, 0)
response := ExtractValue(rspType, 0) response := ExtractValue(rspType, 0)
if request == nil || response == nil { if request == "" || response == "" {
return nil return nil
} }
@@ -135,7 +86,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint {
} }
// ExtractSubValue exctact *Value from reflect.Type // ExtractSubValue exctact *Value from reflect.Type
func ExtractSubValue(typ reflect.Type) *Value { func ExtractSubValue(typ reflect.Type) string {
var reqType reflect.Type var reqType reflect.Type
switch typ.NumIn() { switch typ.NumIn() {
case 1: case 1:
@@ -145,7 +96,7 @@ func ExtractSubValue(typ reflect.Type) *Value {
case 3: case 3:
reqType = typ.In(2) reqType = typ.In(2)
default: default:
return nil return ""
} }
return ExtractValue(reqType, 0) return ExtractValue(reqType, 0)
} }

View File

@@ -36,28 +36,21 @@ func TestExtractEndpoint(t *testing.T) {
t.Fatalf("Expected handler Test, got %s", endpoints[0].Name) t.Fatalf("Expected handler Test, got %s", endpoints[0].Name)
} }
if endpoints[0].Request == nil { if endpoints[0].Request == "" {
t.Fatal("Expected non nil Request") t.Fatal("Expected non nil Request")
} }
if endpoints[0].Response == nil { if endpoints[0].Response == "" {
t.Fatal("Expected non nil Request") t.Fatal("Expected non nil Request")
} }
if endpoints[0].Request.Name != "TestRequest" { if endpoints[0].Request != "TestRequest" {
t.Fatalf("Expected TestRequest got %s", endpoints[0].Request.Name) t.Fatalf("Expected TestRequest got %s", endpoints[0].Request)
} }
if endpoints[0].Response.Name != "TestResponse" { if endpoints[0].Response != "TestResponse" {
t.Fatalf("Expected TestResponse got %s", endpoints[0].Response.Name) t.Fatalf("Expected TestResponse got %s", endpoints[0].Response)
}
if endpoints[0].Request.Type != "TestRequest" {
t.Fatalf("Expected TestRequest type got %s", endpoints[0].Request.Type)
}
if endpoints[0].Response.Type != "TestResponse" {
t.Fatalf("Expected TestResponse type got %s", endpoints[0].Response.Type)
} }
t.Logf("XXX %#+v\n", endpoints[0])
} }

View File

@@ -30,10 +30,9 @@ type record struct {
} }
type memory struct { type memory struct {
opts Options
// records is a KV map with domain name as the key and a services map as the value
records map[string]services records map[string]services
watchers map[string]*watcher watchers map[string]*watcher
opts Options
sync.RWMutex sync.RWMutex
} }
@@ -65,7 +64,7 @@ func (m *memory) ttlPrune() {
for id, n := range record.Nodes { for id, n := range record.Nodes {
if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL { if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL {
if m.opts.Logger.V(logger.DebugLevel) { if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debugf(m.opts.Context, "Register TTL expired for node %s of service %s", n.Id, service) m.opts.Logger.Debugf(m.opts.Context, "Register TTL expired for node %s of service %s", n.ID, service)
} }
delete(m.records[domain][service][version].Nodes, id) delete(m.records[domain][service][version].Nodes, id)
} }
@@ -162,7 +161,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
for _, n := range s.Nodes { for _, n := range s.Nodes {
// check if already exists // check if already exists
if _, ok := srvs[s.Name][s.Version].Nodes[n.Id]; ok { if _, ok := srvs[s.Name][s.Version].Nodes[n.ID]; ok {
continue continue
} }
@@ -177,9 +176,9 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
metadata["domain"] = options.Domain metadata["domain"] = options.Domain
// add the node // add the node
srvs[s.Name][s.Version].Nodes[n.Id] = &node{ srvs[s.Name][s.Version].Nodes[n.ID] = &node{
Node: &Node{ Node: &Node{
Id: n.Id, ID: n.ID,
Address: n.Address, Address: n.Address,
Metadata: metadata, Metadata: metadata,
}, },
@@ -201,8 +200,8 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
if m.opts.Logger.V(logger.DebugLevel) { if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debugf(m.opts.Context, "Updated registration for service: %s, version: %s", s.Name, s.Version) m.opts.Logger.Debugf(m.opts.Context, "Updated registration for service: %s, version: %s", s.Name, s.Version)
} }
srvs[s.Name][s.Version].Nodes[n.Id].TTL = options.TTL srvs[s.Name][s.Version].Nodes[n.ID].TTL = options.TTL
srvs[s.Name][s.Version].Nodes[n.Id].LastSeen = time.Now() srvs[s.Name][s.Version].Nodes[n.ID].LastSeen = time.Now()
} }
} }
@@ -242,11 +241,11 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO
// deregister all of the service nodes from this version // deregister all of the service nodes from this version
for _, n := range s.Nodes { for _, n := range s.Nodes {
if _, ok := version.Nodes[n.Id]; ok { if _, ok := version.Nodes[n.ID]; ok {
if m.opts.Logger.V(logger.DebugLevel) { if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debugf(m.opts.Context, "Register removed node from service: %s, version: %s", s.Name, s.Version) m.opts.Logger.Debugf(m.opts.Context, "Register removed node from service: %s, version: %s", s.Name, s.Version)
} }
delete(version.Nodes, n.Id) delete(version.Nodes, n.ID)
} }
} }
@@ -458,7 +457,7 @@ func serviceToRecord(s *Service, ttl time.Duration) *record {
nodes := make(map[string]*node, len(s.Nodes)) nodes := make(map[string]*node, len(s.Nodes))
for _, n := range s.Nodes { for _, n := range s.Nodes {
nodes[n.Id] = &node{ nodes[n.ID] = &node{
Node: n, Node: n,
TTL: ttl, TTL: ttl,
LastSeen: time.Now(), LastSeen: time.Now(),
@@ -490,40 +489,31 @@ func recordToService(r *record, domain string) *Service {
endpoints := make([]*Endpoint, len(r.Endpoints)) endpoints := make([]*Endpoint, len(r.Endpoints))
for i, e := range r.Endpoints { for i, e := range r.Endpoints {
request := new(Value) md := make(map[string]string, len(e.Metadata))
if e.Request != nil {
*request = *e.Request
}
response := new(Value)
if e.Response != nil {
*response = *e.Response
}
metadata := make(map[string]string, len(e.Metadata))
for k, v := range e.Metadata { for k, v := range e.Metadata {
metadata[k] = v md[k] = v
} }
endpoints[i] = &Endpoint{ endpoints[i] = &Endpoint{
Name: e.Name, Name: e.Name,
Request: request, Request: e.Request,
Response: response, Response: e.Response,
Metadata: metadata, Metadata: md,
} }
} }
nodes := make([]*Node, len(r.Nodes)) nodes := make([]*Node, len(r.Nodes))
i := 0 i := 0
for _, n := range r.Nodes { for _, n := range r.Nodes {
metadata := make(map[string]string, len(n.Metadata)) md := make(map[string]string, len(n.Metadata))
for k, v := range n.Metadata { for k, v := range n.Metadata {
metadata[k] = v md[k] = v
} }
nodes[i] = &Node{ nodes[i] = &Node{
Id: n.Id, ID: n.ID,
Address: n.Address, Address: n.Address,
Metadata: metadata, Metadata: md,
} }
i++ i++
} }

View File

@@ -8,72 +8,70 @@ import (
"time" "time"
) )
var ( var testData = map[string][]*Service{
testData = map[string][]*Service{ "foo": {
"foo": { {
{ Name: "foo",
Name: "foo", Version: "1.0.0",
Version: "1.0.0", Nodes: []*Node{
Nodes: []*Node{ {
{ ID: "foo-1.0.0-123",
Id: "foo-1.0.0-123", Address: "localhost:9999",
Address: "localhost:9999",
},
{
Id: "foo-1.0.0-321",
Address: "localhost:9999",
},
}, },
}, {
{ ID: "foo-1.0.0-321",
Name: "foo", Address: "localhost:9999",
Version: "1.0.1",
Nodes: []*Node{
{
Id: "foo-1.0.1-321",
Address: "localhost:6666",
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*Node{
{
Id: "foo-1.0.3-345",
Address: "localhost:8888",
},
}, },
}, },
}, },
"bar": { {
{ Name: "foo",
Name: "bar", Version: "1.0.1",
Version: "default", Nodes: []*Node{
Nodes: []*Node{ {
{ ID: "foo-1.0.1-321",
Id: "bar-1.0.0-123", Address: "localhost:6666",
Address: "localhost:9999",
},
{
Id: "bar-1.0.0-321",
Address: "localhost:9999",
},
},
},
{
Name: "bar",
Version: "latest",
Nodes: []*Node{
{
Id: "bar-1.0.1-321",
Address: "localhost:6666",
},
}, },
}, },
}, },
} {
) Name: "foo",
Version: "1.0.3",
Nodes: []*Node{
{
ID: "foo-1.0.3-345",
Address: "localhost:8888",
},
},
},
},
"bar": {
{
Name: "bar",
Version: "default",
Nodes: []*Node{
{
ID: "bar-1.0.0-123",
Address: "localhost:9999",
},
{
ID: "bar-1.0.0-321",
Address: "localhost:9999",
},
},
},
{
Name: "bar",
Version: "latest",
Nodes: []*Node{
{
ID: "bar-1.0.1-321",
Address: "localhost:6666",
},
},
},
},
}
//nolint:gocyclo //nolint:gocyclo
func TestMemoryRegistry(t *testing.T) { func TestMemoryRegistry(t *testing.T) {

View File

@@ -44,6 +44,7 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
// nolint: golint
// RegisterOptions holds options for register method // RegisterOptions holds options for register method
type RegisterOptions struct { type RegisterOptions struct {
Context context.Context Context context.Context
@@ -196,6 +197,7 @@ func TLSConfig(t *tls.Config) Option {
} }
} }
// nolint: golint
// RegisterAttempts specifies register atempts count // RegisterAttempts specifies register atempts count
func RegisterAttempts(t int) RegisterOption { func RegisterAttempts(t int) RegisterOption {
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
@@ -203,6 +205,7 @@ func RegisterAttempts(t int) RegisterOption {
} }
} }
// nolint: golint
// RegisterTTL specifies register ttl // RegisterTTL specifies register ttl
func RegisterTTL(t time.Duration) RegisterOption { func RegisterTTL(t time.Duration) RegisterOption {
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
@@ -210,6 +213,7 @@ func RegisterTTL(t time.Duration) RegisterOption {
} }
} }
// nolint: golint
// RegisterContext sets the register context // RegisterContext sets the register context
func RegisterContext(ctx context.Context) RegisterOption { func RegisterContext(ctx context.Context) RegisterOption {
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
@@ -217,6 +221,7 @@ func RegisterContext(ctx context.Context) RegisterOption {
} }
} }
// nolint: golint
// RegisterDomain secifies register domain // RegisterDomain secifies register domain
func RegisterDomain(d string) RegisterOption { func RegisterDomain(d string) RegisterOption {
return func(o *RegisterOptions) { return func(o *RegisterOptions) {

View File

@@ -53,29 +53,23 @@ type Service struct {
// Node holds node register info // Node holds node register info
type Node struct { type Node struct {
Metadata metadata.Metadata `json:"metadata"` Metadata metadata.Metadata `json:"metadata"`
Id string `json:"id"` ID string `json:"id"`
Address string `json:"address"` Address string `json:"address"`
} }
// Endpoint holds endpoint register info // Endpoint holds endpoint register info
type Endpoint struct { type Endpoint struct {
Request *Value `json:"request"` Request string `json:"request"`
Response *Value `json:"response"` Response string `json:"response"`
Metadata metadata.Metadata `json:"metadata"` Metadata metadata.Metadata `json:"metadata"`
Name string `json:"name"` Name string `json:"name"`
} }
// Value holds additional kv stuff
type Value struct {
Name string `json:"name"`
Type string `json:"type"`
Values []*Value `json:"values"`
}
// Option func signature // Option func signature
type Option func(*Options) type Option func(*Options)
// RegisterOption option is used to register service // RegisterOption option is used to register service
// nolint: golint
type RegisterOption func(*RegisterOptions) type RegisterOption func(*RegisterOptions)
// WatchOption option is used to watch service changes // WatchOption option is used to watch service changes

View File

@@ -52,8 +52,8 @@ type Event struct {
Timestamp time.Time Timestamp time.Time
// Service is register service // Service is register service
Service *Service Service *Service
// Id is register id // ID is register id
Id string ID string
// Type defines type of event // Type defines type of event
Type EventType Type EventType
} }

View File

@@ -5,10 +5,8 @@ import (
"time" "time"
) )
var ( // ErrWatcherStopped is returned when routing table watcher has been stopped
// ErrWatcherStopped is returned when routing table watcher has been stopped var ErrWatcherStopped = errors.New("watcher stopped")
ErrWatcherStopped = errors.New("watcher stopped")
)
// EventType defines routing table event // EventType defines routing table event
type EventType int type EventType int
@@ -38,12 +36,12 @@ func (t EventType) String() string {
// Event is returned by a call to Next on the watcher. // Event is returned by a call to Next on the watcher.
type Event struct { type Event struct {
// Route is table route
Route Route
// Timestamp is event timestamp // Timestamp is event timestamp
Timestamp time.Time Timestamp time.Time
// Id of the event // Id of the event
Id string Id string
// Route is table route
Route Route
// Type defines type of event // Type defines type of event
Type EventType Type EventType
} }

View File

@@ -8,10 +8,8 @@ import (
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
) )
var ( // ErrAlreadyExists error
// ErrAlreadyExists error var ErrAlreadyExists = errors.New("already exists")
ErrAlreadyExists = errors.New("already exists")
)
// Runtime is a service runtime manager // Runtime is a service runtime manager
type Runtime interface { type Runtime interface {

View File

@@ -5,10 +5,8 @@ import (
"errors" "errors"
) )
var ( // ErrNoneAvailable is returned by select when no routes were provided to select from
// ErrNoneAvailable is returned by select when no routes were provided to select from var ErrNoneAvailable = errors.New("none available")
ErrNoneAvailable = errors.New("none available")
)
// Selector selects a route from a pool // Selector selects a route from a pool
type Selector interface { type Selector interface {

View File

@@ -13,7 +13,7 @@ type rpcHandler struct {
endpoints []*register.Endpoint endpoints []*register.Endpoint
} }
func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler { func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
options := NewHandlerOptions(opts...) options := NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler) typ := reflect.TypeOf(handler)

View File

@@ -5,17 +5,12 @@ import (
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors" "github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/server"
) )
var ( var _ HealthServer = &Handler{}
// guard to fail early
_ HealthServer = &handler{}
)
type handler struct { type Handler struct {
server server.Server opts Options
opts Options
} }
type CheckFunc func(context.Context) error type CheckFunc func(context.Context) error
@@ -23,10 +18,10 @@ type CheckFunc func(context.Context) error
type Option func(*Options) type Option func(*Options)
type Options struct { type Options struct {
LiveChecks []CheckFunc
ReadyChecks []CheckFunc
Version string Version string
Name string Name string
LiveChecks []CheckFunc
ReadyChecks []CheckFunc
} }
func LiveChecks(fns ...CheckFunc) Option { func LiveChecks(fns ...CheckFunc) Option {
@@ -53,15 +48,15 @@ func Version(version string) Option {
} }
} }
func NewHandler(opts ...Option) *handler { func NewHandler(opts ...Option) *Handler {
options := Options{} options := Options{}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
return &handler{opts: options} return &Handler{opts: options}
} }
func (h *handler) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error { func (h *Handler) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
var err error var err error
for _, fn := range h.opts.LiveChecks { for _, fn := range h.opts.LiveChecks {
if err = fn(ctx); err != nil { if err = fn(ctx); err != nil {
@@ -71,7 +66,7 @@ func (h *handler) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame)
return nil return nil
} }
func (h *handler) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error { func (h *Handler) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
var err error var err error
for _, fn := range h.opts.ReadyChecks { for _, fn := range h.opts.ReadyChecks {
if err = fn(ctx); err != nil { if err = fn(ctx); err != nil {
@@ -81,7 +76,7 @@ func (h *handler) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame)
return nil return nil
} }
func (h *handler) Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error { func (h *Handler) Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
rsp.Data = []byte(h.opts.Version) rsp.Data = []byte(h.opts.Version)
return nil return nil
} }

View File

@@ -6,9 +6,6 @@ import (
"sync" "sync"
"time" "time"
// cjson "github.com/unistack-org/micro-codec-json"
// cjsonrpc "github.com/unistack-org/micro-codec-jsonrpc"
// cproto "github.com/unistack-org/micro-codec-proto"
// cprotorpc "github.com/unistack-org/micro-codec-protorpc" // cprotorpc "github.com/unistack-org/micro-codec-protorpc"
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
@@ -16,29 +13,23 @@ import (
"github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/register"
) )
var ( // DefaultCodecs will be used to encode/decode
// DefaultCodecs will be used to encode/decode var DefaultCodecs = map[string]codec.Codec{
DefaultCodecs = map[string]codec.Codec{ "application/octet-stream": codec.NewCodec(),
//"application/json": cjson.NewCodec, }
//"application/json-rpc": cjsonrpc.NewCodec,
//"application/protobuf": cproto.NewCodec,
//"application/proto-rpc": cprotorpc.NewCodec,
"application/octet-stream": codec.NewCodec(),
}
)
const ( const (
defaultContentType = "application/json" defaultContentType = "application/json"
) )
type noopServer struct { type noopServer struct {
opts Options
h Handler h Handler
wg *sync.WaitGroup
rsvc *register.Service rsvc *register.Service
handlers map[string]Handler handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber subscribers map[*subscriber][]broker.Subscriber
exit chan chan error exit chan chan error
wg *sync.WaitGroup opts Options
sync.RWMutex sync.RWMutex
registered bool registered bool
started bool started bool
@@ -103,7 +94,7 @@ func (n *noopServer) Subscribe(sb Subscriber) error {
} }
func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRpcHandler(h, opts...) return newRPCHandler(h, opts...)
} }
func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
@@ -158,23 +149,16 @@ func (n *noopServer) Register() error {
} }
n.RLock() n.RLock()
// Maps are ordered randomly, sort the keys for consistency handlerList := make([]string, 0, len(n.handlers))
var handlerList []string for n := range n.handlers {
for n, e := range n.handlers { handlerList = append(handlerList, n)
// Only advertise non internal handlers
if !e.Options().Internal {
handlerList = append(handlerList, n)
}
} }
sort.Strings(handlerList) sort.Strings(handlerList)
var subscriberList []*subscriber subscriberList := make([]*subscriber, 0, len(n.subscribers))
for e := range n.subscribers { for e := range n.subscribers {
// Only advertise non internal subscribers subscriberList = append(subscriberList, e)
if !e.Options().Internal {
subscriberList = append(subscriberList, e)
}
} }
sort.Slice(subscriberList, func(i, j int) bool { sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic return subscriberList[i].topic > subscriberList[j].topic
@@ -190,7 +174,7 @@ func (n *noopServer) Register() error {
n.RUnlock() n.RUnlock()
service.Nodes[0].Metadata["protocol"] = "noop" service.Nodes[0].Metadata["protocol"] = "noop"
service.Nodes[0].Metadata["transport"] = "noop" service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"]
service.Endpoints = endpoints service.Endpoints = endpoints
n.RLock() n.RLock()
@@ -199,7 +183,7 @@ func (n *noopServer) Register() error {
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].Id) config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
} }
} }
@@ -262,7 +246,7 @@ func (n *noopServer) Deregister() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].Id) config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID)
} }
if err := DefaultDeregisterFunc(service, config); err != nil { if err := DefaultDeregisterFunc(service, config); err != nil {
@@ -344,9 +328,10 @@ func (n *noopServer) Start() error {
} }
// use RegisterCheck func before register // use RegisterCheck func before register
// nolint: nestif
if err := config.RegisterCheck(config.Context); err != nil { if err := config.RegisterCheck(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.Id, err) config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err)
} }
} else { } else {
// announce self to the world // announce self to the world
@@ -378,25 +363,26 @@ func (n *noopServer) Start() error {
registered := n.registered registered := n.registered
n.RUnlock() n.RUnlock()
rerr := config.RegisterCheck(config.Context) rerr := config.RegisterCheck(config.Context)
// nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
} }
// deregister self in case of error // deregister self in case of error
if err := n.Deregister(); err != nil { if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.Id, err) config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err)
} }
} }
} else if rerr != nil && !registered { } else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.Id, rerr) config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr)
} }
continue continue
} }
if err := n.Register(); err != nil { if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.Id, err) config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err)
} }
} }
// wait for exit // wait for exit

View File

@@ -57,8 +57,8 @@ type Options struct {
RegisterCheck func(context.Context) error RegisterCheck func(context.Context) error
// Codecs map to handle content-type // Codecs map to handle content-type
Codecs map[string]codec.Codec Codecs map[string]codec.Codec
// Id holds the id of the server // ID holds the id of the server
Id string ID string
// Namespace for te server // Namespace for te server
Namespace string Namespace string
// Name holds the server name // Name holds the server name
@@ -104,7 +104,7 @@ func NewOptions(opts ...Option) Options {
Address: DefaultAddress, Address: DefaultAddress,
Name: DefaultName, Name: DefaultName,
Version: DefaultVersion, Version: DefaultVersion,
Id: DefaultId, ID: DefaultID,
Namespace: DefaultNamespace, Namespace: DefaultNamespace,
} }
@@ -143,10 +143,10 @@ func Meter(m meter.Meter) Option {
} }
} }
// Id unique server id // ID unique server id
func Id(id string) Option { func ID(id string) Option {
return func(o *Options) { return func(o *Options) {
o.Id = id o.ID = id
} }
} }
@@ -325,8 +325,6 @@ type HandlerOptions struct {
Context context.Context Context context.Context
// Metadata for hondler // Metadata for hondler
Metadata map[string]metadata.Metadata Metadata map[string]metadata.Metadata
// Internal flag limits exporting to other nodes via register
Internal bool
} }
// NewHandlerOptions creates new HandlerOptions // NewHandlerOptions creates new HandlerOptions
@@ -354,8 +352,6 @@ type SubscriberOptions struct {
Queue string Queue string
// AutoAck flag for auto ack messages after processing // AutoAck flag for auto ack messages after processing
AutoAck bool AutoAck bool
// Internal flag limit exporting info via register
Internal bool
// BodyOnly flag specifies that message without headers // BodyOnly flag specifies that message without headers
BodyOnly bool BodyOnly bool
} }
@@ -382,23 +378,6 @@ func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
} }
} }
// InternalHandler options specifies that a handler is not advertised
// to the discovery system. In the future this may also limit request
// to the internal network or authorised user.
func InternalHandler(b bool) HandlerOption {
return func(o *HandlerOptions) {
o.Internal = b
}
}
// InternalSubscriber options specifies that a subscriber is not advertised
// to the discovery system.
func InternalSubscriber(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Internal = b
}
}
// DisableAutoAck will disable auto acking of messages // DisableAutoAck will disable auto acking of messages
// after they have been handled. // after they have been handled.
func DisableAutoAck() SubscriberOption { func DisableAutoAck() SubscriberOption {

View File

@@ -72,7 +72,7 @@ func NewRegisterService(s Server) (*register.Service, error) {
} }
node := &register.Node{ node := &register.Node{
Id: opts.Name + "-" + opts.Id, ID: opts.Name + "-" + opts.ID,
Address: net.JoinHostPort(addr, port), Address: net.JoinHostPort(addr, port),
} }
node.Metadata = metadata.Copy(opts.Metadata) node.Metadata = metadata.Copy(opts.Metadata)

View File

@@ -11,10 +11,8 @@ import (
"github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/register"
) )
var ( // DefaultServer default server
// DefaultServer default server var DefaultServer Server = NewServer()
DefaultServer Server = NewServer()
)
var ( var (
// DefaultAddress will be used if no address passed // DefaultAddress will be used if no address passed
@@ -23,8 +21,8 @@ var (
DefaultName = "server" DefaultName = "server"
// DefaultVersion will be used if no version passed // DefaultVersion will be used if no version passed
DefaultVersion = "latest" DefaultVersion = "latest"
// DefaultId will be used if no id passed // DefaultID will be used if no id passed
DefaultId = uuid.New().String() DefaultID = uuid.New().String()
// DefaultRegisterCheck holds func that run before register server // DefaultRegisterCheck holds func that run before register server
DefaultRegisterCheck = func(context.Context) error { return nil } DefaultRegisterCheck = func(context.Context) error { return nil }
// DefaultRegisterInterval holds interval for register // DefaultRegisterInterval holds interval for register

View File

@@ -21,11 +21,9 @@ const (
subSig = "func(context.Context, interface{}) error" subSig = "func(context.Context, interface{}) error"
) )
var ( // Precompute the reflect type for error. Can't use error directly
// Precompute the reflect type for error. Can't use error directly // because Typeof takes an empty interface value. This is annoying.
// because Typeof takes an empty interface value. This is annoying. var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
type handler struct { type handler struct {
reqType reflect.Type reqType reflect.Type
@@ -64,7 +62,8 @@ func ValidateSubscriber(sub Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber()) typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type var argType reflect.Type
if typ.Kind() == reflect.Func { switch typ.Kind() {
case reflect.Func:
name := "Func" name := "Func"
switch typ.NumIn() { switch typ.NumIn() {
case 2: case 2:
@@ -82,7 +81,7 @@ func ValidateSubscriber(sub Subscriber) error {
if returnType := typ.Out(0); returnType != typeOfError { if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String()) return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
} }
} else { default:
hdlr := reflect.ValueOf(sub.Subscriber()) hdlr := reflect.ValueOf(sub.Subscriber())
name := reflect.Indirect(hdlr).Type().Name() name := reflect.Indirect(hdlr).Type().Name()
@@ -276,14 +275,14 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
if n.wg != nil { if n.wg != nil {
defer n.wg.Done() defer n.wg.Done()
} }
err := fn(ctx, &rpcMessage{ cerr := fn(ctx, &rpcMessage{
topic: sb.topic, topic: sb.topic,
contentType: ct, contentType: ct,
payload: req.Interface(), payload: req.Interface(),
header: msg.Header, header: msg.Header,
body: msg.Body, body: msg.Body,
}) })
results <- err results <- cerr
}() }()
} }
var errors []string var errors []string

View File

@@ -162,7 +162,6 @@ func (s *service) Broker(names ...string) broker.Broker {
idx = getNameIndex(names[0], s.opts.Brokers) idx = getNameIndex(names[0], s.opts.Brokers)
} }
return s.opts.Brokers[idx] return s.opts.Brokers[idx]
} }
func (s *service) Tracer(names ...string) tracer.Tracer { func (s *service) Tracer(names ...string) tracer.Tracer {

View File

@@ -28,8 +28,8 @@ func (m *memoryStore) Disconnect(ctx context.Context) error {
} }
type memoryStore struct { type memoryStore struct {
opts Options
store *cache.Cache store *cache.Cache
opts Options
} }
func (m *memoryStore) key(prefix, key string) string { func (m *memoryStore) key(prefix, key string) string {

View File

@@ -5,8 +5,6 @@ package store
import ( import (
"context" "context"
"errors" "errors"
"github.com/unistack-org/micro/v3/metadata"
) )
var ( var (
@@ -42,12 +40,3 @@ type Store interface {
// String returns the name of the implementation. // String returns the name of the implementation.
String() string String() string
} }
// Value is an item stored or retrieved from a Store
// may be used in store implementations to provide metadata
type Value struct {
// Data holds underline struct
Data interface{} `json:"data"`
// Metadata associated with data for indexing
Metadata metadata.Metadata `json:"metadata"`
}

View File

@@ -6,8 +6,8 @@ import (
) )
type memorySync struct { type memorySync struct {
options Options
locks map[string]*memoryLock locks map[string]*memoryLock
options Options
mtx gosync.RWMutex mtx gosync.RWMutex
} }

View File

@@ -5,10 +5,8 @@ import (
"errors" "errors"
) )
var ( // ErrLockTimeout error
// ErrLockTimeout error var ErrLockTimeout = errors.New("lock timeout")
ErrLockTimeout = errors.New("lock timeout")
)
// Sync is an interface for distributed synchronization // Sync is an interface for distributed synchronization
type Sync interface { type Sync interface {

View File

@@ -38,7 +38,6 @@ type noopSpan struct {
} }
func (s *noopSpan) Finish(opts ...SpanOption) { func (s *noopSpan) Finish(opts ...SpanOption) {
} }
func (s *noopSpan) Context() context.Context { func (s *noopSpan) Context() context.Context {
@@ -50,7 +49,6 @@ func (s *noopSpan) Tracer() Tracer {
} }
func (s *noopSpan) AddEvent(name string, opts ...EventOption) { func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
} }
func (s *noopSpan) SetName(name string) { func (s *noopSpan) SetName(name string) {
@@ -58,7 +56,6 @@ func (s *noopSpan) SetName(name string) {
} }
func (s *noopSpan) SetLabels(labels ...Label) { func (s *noopSpan) SetLabels(labels ...Label) {
} }
// NewTracer returns new memory tracer // NewTracer returns new memory tracer

View File

@@ -2,13 +2,11 @@ package tracer
import "github.com/unistack-org/micro/v3/logger" import "github.com/unistack-org/micro/v3/logger"
type SpanOptions struct { type SpanOptions struct{}
}
type SpanOption func(o *SpanOptions) type SpanOption func(o *SpanOptions)
type EventOptions struct { type EventOptions struct{}
}
type EventOption func(o *EventOptions) type EventOption func(o *EventOptions)

View File

@@ -5,10 +5,8 @@ import (
"context" "context"
) )
var ( // DefaultTracer is the global default tracer
// DefaultTracer is the global default tracer var DefaultTracer Tracer = NewTracer()
DefaultTracer Tracer = NewTracer()
)
// Tracer is an interface for distributed tracing // Tracer is an interface for distributed tracing
type Tracer interface { type Tracer interface {

View File

@@ -113,12 +113,14 @@ type tWrapper struct {
opts Options opts Options
} }
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error) type (
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error) ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error) ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error) ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error) ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error) ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
)
// Options struct // Options struct
type Options struct { type Options struct {

View File

@@ -5,9 +5,7 @@ import (
"net" "net"
) )
var ( var privateBlocks []*net.IPNet
privateBlocks []*net.IPNet
)
func init() { func init() {
for _, b := range []string{"10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "fd00::/8"} { for _, b := range []string{"10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "fd00::/8"} {

View File

@@ -54,7 +54,6 @@ func TestExtractor(t *testing.T) {
t.Errorf("Expected %s got %s", d.expect, addr) t.Errorf("Expected %s got %s", d.expect, addr)
} }
} }
} }
func TestAppendPrivateBlocks(t *testing.T) { func TestAppendPrivateBlocks(t *testing.T) {

View File

@@ -4,20 +4,20 @@ import (
"bytes" "bytes"
) )
type buffer struct { type Buffer struct {
*bytes.Buffer *bytes.Buffer
} }
// Close reset buffer contents // Close reset buffer contents
func (b *buffer) Close() error { func (b *Buffer) Close() error {
b.Buffer.Reset() b.Buffer.Reset()
return nil return nil
} }
// New creates new buffer that satisfies Closer interface // New creates new buffer that satisfies Closer interface
func New(b *bytes.Buffer) *buffer { func New(b *bytes.Buffer) *Buffer {
if b == nil { if b == nil {
b = bytes.NewBuffer(nil) b = bytes.NewBuffer(nil)
} }
return &buffer{b} return &Buffer{b}
} }

View File

@@ -27,7 +27,6 @@ func HostPort(addr string, port interface{}) string {
// Listen takes addr:portmin-portmax and binds to the first available port // Listen takes addr:portmin-portmax and binds to the first available port
// Example: Listen("localhost:5000-6000", fn) // Example: Listen("localhost:5000-6000", fn)
func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, error) { func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, error) {
if strings.Count(addr, ":") == 1 && strings.Count(addr, "-") == 0 { if strings.Count(addr, ":") == 1 && strings.Count(addr, "-") == 0 {
return fn(addr) return fn(addr)
} }

View File

@@ -22,5 +22,4 @@ func TestListen(t *testing.T) {
// TODO nats case test // TODO nats case test
// natsAddr := "_INBOX.bID2CMRvlNp0vt4tgNBHWf" // natsAddr := "_INBOX.bID2CMRvlNp0vt4tgNBHWf"
// Expect addr DO NOT has extra ":" at the end! // Expect addr DO NOT has extra ":" at the end!
} }

View File

@@ -8,9 +8,8 @@ import (
"crypto/rand" "crypto/rand"
"crypto/x509" "crypto/x509"
"encoding/pem" "encoding/pem"
"fmt"
"errors" "errors"
"fmt"
) )
// GenerateKey returns an ed25519 key // GenerateKey returns an ed25519 key
@@ -46,14 +45,14 @@ func CA(opts ...CertOption) ([]byte, []byte, error) {
return nil, nil, err return nil, nil, err
} }
cert, key := &bytes.Buffer{}, &bytes.Buffer{} cert, key := &bytes.Buffer{}, &bytes.Buffer{}
if err := pem.Encode(cert, &pem.Block{Type: "CERTIFICATE", Bytes: x509Cert}); err != nil { if err = pem.Encode(cert, &pem.Block{Type: "CERTIFICATE", Bytes: x509Cert}); err != nil {
return nil, nil, err return nil, nil, err
} }
x509Key, err := x509.MarshalPKCS8PrivateKey(options.Priv) x509Key, err := x509.MarshalPKCS8PrivateKey(options.Priv)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if err := pem.Encode(key, &pem.Block{Type: "PRIVATE KEY", Bytes: x509Key}); err != nil { if err = pem.Encode(key, &pem.Block{Type: "PRIVATE KEY", Bytes: x509Key}); err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@@ -63,9 +63,7 @@ func Unmarshal(dst interface{}, query string) error {
// possible. Eg the example above would output: // possible. Eg the example above would output:
// {"bar":{"one":{"two":2,"red":112}}} // {"bar":{"one":{"two":2,"red":112}}}
func ToJSON(query string) ([]byte, error) { func ToJSON(query string) ([]byte, error) {
var ( var builder interface{} = make(map[string]interface{})
builder interface{} = make(map[string]interface{})
)
params := strings.Split(query, "&") params := strings.Split(query, "&")
for _, part := range params { for _, part := range params {
tempMap, err := queryToMap(part) tempMap, err := queryToMap(part)

View File

@@ -32,9 +32,10 @@ type unmarshalT struct {
A string `json:"a"` A string `json:"a"`
B unmarshalB `json:"b"` B unmarshalB `json:"b"`
} }
type unmarshalB struct { type unmarshalB struct {
C int `json:"c"`
D string `json:"D"` D string `json:"D"`
C int `json:"c"`
} }
func TestUnmarshal(t *testing.T) { func TestUnmarshal(t *testing.T) {

View File

@@ -11,7 +11,7 @@ type Rand struct {
} }
func (r *Rand) Int31() int32 { func (r *Rand) Int31() int32 {
rand.Read(r.buf[:4]) _, _ = rand.Read(r.buf[:4])
return int32(binary.BigEndian.Uint32(r.buf[:4]) & ^uint32(1<<31)) return int32(binary.BigEndian.Uint32(r.buf[:4]) & ^uint32(1<<31))
} }
@@ -54,7 +54,7 @@ func (r *Rand) Intn(n int) int {
} }
func (r *Rand) Int63() int64 { func (r *Rand) Int63() int64 {
rand.Read(r.buf[:]) _, _ = rand.Read(r.buf[:])
return int64(binary.BigEndian.Uint64(r.buf[:]) & ^uint64(1<<63)) return int64(binary.BigEndian.Uint64(r.buf[:]) & ^uint64(1<<63))
} }

View File

@@ -5,7 +5,6 @@ import (
) )
func TestPath(t *testing.T) { func TestPath(t *testing.T) {
type Nested2 struct { type Nested2 struct {
Name string Name string
} }

File diff suppressed because it is too large Load Diff

475
util/reflect/struct.go Normal file
View File

@@ -0,0 +1,475 @@
package reflect
import (
"errors"
"fmt"
"net/url"
"reflect"
"regexp"
"strings"
)
// ErrInvalidParam specifies invalid url query params
var ErrInvalidParam = errors.New("invalid url query param provided")
var bracketSplitter = regexp.MustCompile(`\[|\]`)
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
continue
}
if ts, ok := fld.Tag.Lookup(tkey); ok {
for _, p := range strings.Split(ts, ",") {
if p == tval {
if val.Kind() != reflect.Ptr && val.CanAddr() {
val = val.Addr()
}
return val.Interface(), nil
}
}
}
switch val.Kind() {
case reflect.Ptr:
if val = val.Elem(); val.Kind() == reflect.Struct {
if iface, err := StructFieldByTag(val.Interface(), tkey, tval); err == nil {
return iface, nil
}
}
case reflect.Struct:
if iface, err := StructFieldByTag(val.Interface(), tkey, tval); err == nil {
return iface, nil
}
}
}
return nil, ErrNotFound
}
func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
continue
}
if fld.Name == tkey {
if val.Kind() != reflect.Ptr && val.CanAddr() {
val = val.Addr()
}
return val.Interface(), nil
}
switch val.Kind() {
case reflect.Ptr:
if val = val.Elem(); val.Kind() == reflect.Struct {
if iface, err := StructFieldByName(val.Interface(), tkey); err == nil {
return iface, nil
}
}
case reflect.Struct:
if iface, err := StructFieldByName(val.Interface(), tkey); err == nil {
return iface, nil
}
}
}
return nil, ErrNotFound
}
// StructFields returns slice of struct fields
func StructFields(src interface{}) ([]reflect.StructField, error) {
var fields []reflect.StructField
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
continue
}
if val.Kind() == reflect.Struct {
infields, err := StructFields(val.Interface())
if err != nil {
return nil, err
}
fields = append(fields, infields...)
} else {
fields = append(fields, fld)
}
}
return fields, nil
}
// CopyDefaults for a from b
// a and b should be pointers to the same kind of struct
func CopyDefaults(a, b interface{}) {
pt := reflect.TypeOf(a)
t := pt.Elem()
va := reflect.ValueOf(a).Elem()
vb := reflect.ValueOf(b).Elem()
for i := 0; i < t.NumField(); i++ {
aField := va.Field(i)
if aField.CanSet() {
bField := vb.Field(i)
aField.Set(bField)
}
}
}
// CopyFrom sets the public members of a from b
// a and b should be pointers to structs
// a can be a different type from b
// Only the Fields which have the same name and assignable type on a
// and b will be set.
func CopyFrom(a, b interface{}) {
ta := reflect.TypeOf(a).Elem()
tb := reflect.TypeOf(b).Elem()
va := reflect.ValueOf(a).Elem()
vb := reflect.ValueOf(b).Elem()
for i := 0; i < tb.NumField(); i++ {
bField := vb.Field(i)
tbField := tb.Field(i)
name := tbField.Name
aField := va.FieldByName(name)
taField, found := ta.FieldByName(name)
if found && aField.IsValid() && bField.IsValid() && aField.CanSet() && tbField.Type.AssignableTo(taField.Type) {
aField.Set(bField)
}
}
}
func StructURLValues(src interface{}, pref string, tags []string) (url.Values, error) {
data := url.Values{}
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 || !val.IsValid() {
continue
}
var t *tag
for _, tn := range tags {
ts, ok := fld.Tag.Lookup(tn)
if !ok {
continue
}
tp := strings.Split(ts, ",")
// special
switch tn {
case "protobuf": // special
t = &tag{key: tn, name: tp[3][5:], opts: append(tp[:3], tp[4:]...)}
default:
t = &tag{key: tn, name: tp[0], opts: tp[1:]}
}
if t.name != "" {
break
}
}
if t.name == "" {
// fallback to lowercase
t.name = strings.ToLower(fld.Name)
}
if pref != "" {
t.name = pref + "." + t.name
}
if !val.IsValid() || val.IsZero() {
continue
}
switch val.Kind() {
case reflect.Struct, reflect.Ptr:
if val.IsNil() {
continue
}
ndata, err := StructURLValues(val.Interface(), t.name, tags)
if err != nil {
return ndata, err
}
for k, v := range ndata {
data[k] = v
}
default:
switch val.Kind() {
case reflect.Slice:
for i := 0; i < val.Len(); i++ {
va := val.Index(i)
// if va.Type().Elem().Kind() != reflect.Ptr {
if va.Kind() != reflect.Ptr {
data.Set(t.name, fmt.Sprintf("%v", va.Interface()))
continue
}
switch va.Type().Elem().String() {
case "wrapperspb.BoolValue", "wrapperspb.BytesValue", "wrapperspb.StringValue":
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
}
case "wrapperspb.DoubleValue", "wrapperspb.FloatValue":
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
}
case "wrapperspb.Int32Value", "wrapperspb.Int64Value":
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
}
case "wrapperspb.UInt32Value", "wrapperspb.UInt64Value":
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
}
default:
data.Add(t.name, fmt.Sprintf("%v", val.Index(i).Interface()))
}
}
default:
data.Set(t.name, fmt.Sprintf("%v", val.Interface()))
}
}
}
return data, nil
}
// URLMap returns map of url query params
func URLMap(query string) (map[string]interface{}, error) {
var mp interface{} = make(map[string]interface{})
params := strings.Split(query, "&")
for _, part := range params {
tm, err := queryToMap(part)
if err != nil {
return nil, err
}
mp = merge(mp, tm)
}
return mp.(map[string]interface{}), nil
}
// FlattenMap expand key.subkey to nested map
func FlattenMap(a map[string]interface{}) map[string]interface{} {
// preprocess map
nb := make(map[string]interface{}, len(a))
for k, v := range a {
ps := strings.Split(k, ".")
if len(ps) == 1 {
nb[k] = v
continue
}
em := make(map[string]interface{})
em[ps[len(ps)-1]] = v
for i := len(ps) - 2; i > 0; i-- {
nm := make(map[string]interface{})
nm[ps[i]] = em
em = nm
}
if vm, ok := nb[ps[0]]; ok {
// nested map
nm := vm.(map[string]interface{})
for vk, vv := range em {
nm[vk] = vv
}
nb[ps[0]] = nm
} else {
nb[ps[0]] = em
}
}
return nb
}
/*
case reflect.String:
fn := func(c rune) bool { return c == ',' || c == ';' || c == ' ' }
slice := strings.FieldsFunc(vb.String(), fn)
if va.IsNil() {
va.Set(reflect.MakeSlice(va.Type(), len(slice), len(slice)))
}
*/
func btSplitter(str string) []string {
r := bracketSplitter.Split(str, -1)
for idx, s := range r {
if len(s) == 0 {
if len(r) > idx+1 {
copy(r[idx:], r[idx+1:])
r = r[:len(r)-1]
}
}
}
return r
}
// queryToMap turns something like a[b][c]=4 into
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{
// "c": 4,
// },
// },
// }
func queryToMap(param string) (map[string]interface{}, error) {
rawKey, rawValue, err := splitKeyAndValue(param)
if err != nil {
return nil, err
}
rawValue, err = url.QueryUnescape(rawValue)
if err != nil {
return nil, err
}
rawKey, err = url.QueryUnescape(rawKey)
if err != nil {
return nil, err
}
pieces := btSplitter(rawKey)
key := pieces[0]
// If len==1 then rawKey has no [] chars and we can just
// decode this as key=value into {key: value}
if len(pieces) == 1 {
return map[string]interface{}{
key: rawValue,
}, nil
}
// If len > 1 then we have something like a[b][c]=2
// so we need to turn this into {"a": {"b": {"c": 2}}}
// To do this we break our key into two pieces:
// a and b[c]
// and then we set {"a": queryToMap("b[c]", value)}
ret := make(map[string]interface{})
ret[key], err = queryToMap(buildNewKey(rawKey) + "=" + rawValue)
if err != nil {
return nil, err
}
// When URL params have a set of empty brackets (eg a[]=1)
// it is assumed to be an array. This will get us the
// correct value for the array item and return it as an
// []interface{} so that it can be merged properly.
if pieces[1] == "" {
temp := ret[key].(map[string]interface{})
ret[key] = []interface{}{temp[""]}
}
return ret, nil
}
// buildNewKey will take something like:
// origKey = "bar[one][two]"
// pieces = [bar one two ]
// and return "one[two]"
func buildNewKey(origKey string) string {
pieces := btSplitter(origKey)
ret := origKey[len(pieces[0])+1:]
ret = ret[:len(pieces[1])] + ret[len(pieces[1])+1:]
return ret
}
// splitKeyAndValue splits a URL param at the last equal
// sign and returns the two strings. If no equal sign is
// found, the ErrInvalidParam error is returned.
func splitKeyAndValue(param string) (string, string, error) {
li := strings.LastIndex(param, "=")
if li == -1 {
return "", "", ErrInvalidParam
}
return param[:li], param[li+1:], nil
}
// merge merges a with b if they are either both slices
// or map[string]interface{} types. Otherwise it returns b.
func merge(a interface{}, b interface{}) interface{} {
if av, aok := a.(map[string]interface{}); aok {
if bv, bok := b.(map[string]interface{}); bok {
return mergeMapIface(av, bv)
}
}
if av, aok := a.([]interface{}); aok {
if bv, bok := b.([]interface{}); bok {
return mergeSliceIface(av, bv)
}
}
va := reflect.ValueOf(a)
vb := reflect.ValueOf(b)
if (va.Type().Kind() == reflect.Slice) && (va.Type().Elem().Kind() == vb.Type().Kind() || vb.Type().ConvertibleTo(va.Type().Elem())) {
va = reflect.Append(va, vb.Convert(va.Type().Elem()))
return va.Interface()
}
return b
}
// mergeMap merges a with b, attempting to merge any nested
// values in nested maps but eventually overwriting anything
// in a that can't be merged with whatever is in b.
func mergeMapIface(a map[string]interface{}, b map[string]interface{}) map[string]interface{} {
for bK, bV := range b {
if aV, ok := a[bK]; ok {
if (reflect.ValueOf(aV).Type().Kind() == reflect.ValueOf(bV).Type().Kind()) ||
((reflect.ValueOf(aV).Type().Kind() == reflect.Slice) && reflect.ValueOf(aV).Type().Elem().Kind() == reflect.ValueOf(bV).Type().Kind()) {
nV := []interface{}{aV, bV}
a[bK] = nV
} else {
a[bK] = merge(a[bK], bV)
}
} else {
a[bK] = bV
}
}
return a
}
// mergeSlice merges a with b and returns the result.
func mergeSliceIface(a []interface{}, b []interface{}) []interface{} {
a = append(a, b...)
return a
}
type tag struct {
key string
name string
opts []string
}

View File

@@ -5,6 +5,62 @@ import (
"testing" "testing"
) )
func TestStructByTag(t *testing.T) {
type Str struct {
Name []string `json:"name" codec:"flatten"`
}
val := &Str{Name: []string{"first", "second"}}
iface, err := StructFieldByTag(val, "codec", "flatten")
if err != nil {
t.Fatal(err)
}
if v, ok := iface.(*[]string); !ok {
t.Fatalf("not *[]string %v", iface)
} else if len(*v) != 2 {
t.Fatalf("invalid number %v", iface)
}
}
func TestStructByName(t *testing.T) {
type Str struct {
Name []string `json:"name" codec:"flatten"`
}
val := &Str{Name: []string{"first", "second"}}
iface, err := StructFieldByName(val, "Name")
if err != nil {
t.Fatal(err)
}
if v, ok := iface.(*[]string); !ok {
t.Fatalf("not *[]string %v", iface)
} else if len(*v) != 2 {
t.Fatalf("invalid number %v", iface)
}
}
func TestStructURLValues(t *testing.T) {
type Str struct {
Str *Str `json:"str"`
Name string `json:"name"`
Args []int `json:"args"`
}
val := &Str{Name: "test_name", Args: []int{1, 2, 3}, Str: &Str{Name: "nested_name"}}
data, err := StructURLValues(val, "", []string{"json"})
if err != nil {
t.Fatal(err)
}
if data.Get("name") != "test_name" {
t.Fatalf("invalid data: %v", data)
}
}
func TestURLSliceVars(t *testing.T) { func TestURLSliceVars(t *testing.T) {
u, err := url.Parse("http://localhost/v1/test/call/my_name?key=arg1&key=arg2&key=arg3") u, err := url.Parse("http://localhost/v1/test/call/my_name?key=arg1&key=arg2&key=arg3")
if err != nil { if err != nil {
@@ -72,8 +128,8 @@ func TestIsZero(t *testing.T) {
Nested string Nested string
} }
type testStr2 struct { type testStr2 struct {
Name string
Nested *testStr3 Nested *testStr3
Name string
} }
vtest := &testStr2{ vtest := &testStr2{
Name: "test_name", Name: "test_name",
@@ -88,5 +144,5 @@ func TestIsZero(t *testing.T) {
t.Fatalf("non zero ret on zero struct: %#+v", vtest) t.Fatalf("non zero ret on zero struct: %#+v", vtest)
} }
//t.Logf("XX %#+v\n", ok) // t.Logf("XX %#+v\n", ok)
} }

View File

@@ -5,11 +5,11 @@ import (
) )
func addNodes(old, neu []*register.Node) []*register.Node { func addNodes(old, neu []*register.Node) []*register.Node {
nodes := make([]*register.Node, len(neu)) nodes := make([]*register.Node, 0, len(neu))
// add all new nodes // add all new nodes
for i, n := range neu { for _, n := range neu {
node := *n node := *n
nodes[i] = &node nodes = append(nodes, &node)
} }
// look at old nodes // look at old nodes
@@ -19,7 +19,7 @@ func addNodes(old, neu []*register.Node) []*register.Node {
// check against new nodes // check against new nodes
for _, n := range nodes { for _, n := range nodes {
// ids match then skip // ids match then skip
if o.Id == n.Id { if o.ID == n.ID {
exists = true exists = true
break break
} }
@@ -40,7 +40,7 @@ func delNodes(old, del []*register.Node) []*register.Node {
for _, o := range old { for _, o := range old {
var rem bool var rem bool
for _, n := range del { for _, n := range del {
if o.Id == n.Id { if o.ID == n.ID {
rem = true rem = true
break break
} }

View File

@@ -14,7 +14,7 @@ func TestRemove(t *testing.T) {
Version: "1.0.0", Version: "1.0.0",
Nodes: []*register.Node{ Nodes: []*register.Node{
{ {
Id: "foo-123", ID: "foo-123",
Address: "localhost:9999", Address: "localhost:9999",
}, },
}, },
@@ -24,7 +24,7 @@ func TestRemove(t *testing.T) {
Version: "1.0.0", Version: "1.0.0",
Nodes: []*register.Node{ Nodes: []*register.Node{
{ {
Id: "foo-123", ID: "foo-123",
Address: "localhost:6666", Address: "localhost:6666",
}, },
}, },
@@ -47,11 +47,11 @@ func TestRemoveNodes(t *testing.T) {
Version: "1.0.0", Version: "1.0.0",
Nodes: []*register.Node{ Nodes: []*register.Node{
{ {
Id: "foo-123", ID: "foo-123",
Address: "localhost:9999", Address: "localhost:9999",
}, },
{ {
Id: "foo-321", ID: "foo-321",
Address: "localhost:6666", Address: "localhost:6666",
}, },
}, },
@@ -61,7 +61,7 @@ func TestRemoveNodes(t *testing.T) {
Version: "1.0.0", Version: "1.0.0",
Nodes: []*register.Node{ Nodes: []*register.Node{
{ {
Id: "foo-123", ID: "foo-123",
Address: "localhost:6666", Address: "localhost:6666",
}, },
}, },

View File

@@ -77,7 +77,7 @@ func (t template) Compile() Template {
rawOps = append(rawOps, s.compile()...) rawOps = append(rawOps, s.compile()...)
} }
//ops := make([]int, 0, len(rawOps)) // ops := make([]int, 0, len(rawOps))
var ( var (
ops []int ops []int
pool []string pool []string

View File

@@ -50,9 +50,7 @@ func tokenize(path string) (tokens []string, verb string) {
field field
nested nested
) )
var ( st := init
st = init
)
for path != "" { for path != "" {
var idx int var idx int
switch st { switch st {

View File

@@ -209,7 +209,8 @@ func TestParseSegments(t *testing.T) {
"a", "/", "b", "/", "*", "/", "c", "a", "/", "b", "/", "*", "/", "c",
"}", "/", "}", "/",
"**", "**",
eof}, eof,
},
want: []segment{ want: []segment{
literal("v1"), literal("v1"),
variable{ variable{

View File

@@ -40,7 +40,7 @@ func (c *syncStore) processQueue(index int) {
} }
var opts []store.WriteOption var opts []store.WriteOption
if !ir.expiresAt.IsZero() { if !ir.expiresAt.IsZero() {
opts = append(opts, store.WriteTTL(ir.expiresAt.Sub(time.Now()))) opts = append(opts, store.WriteTTL(time.Until(ir.expiresAt)))
} }
// Todo = internal queue also has to hold the corresponding store.WriteOptions // Todo = internal queue also has to hold the corresponding store.WriteOptions
if err := c.syncOpts.Stores[index+1].Write(c.storeOpts.Context, ir.key, ir.value, opts...); err != nil { if err := c.syncOpts.Stores[index+1].Write(c.storeOpts.Context, ir.key, ir.value, opts...); err != nil {

View File

@@ -17,10 +17,8 @@ type Basic struct {
store store.Store store store.Store
} }
var ( // StorePrefix to isolate tokens
// StorePrefix to isolate tokens var StorePrefix = "tokens/"
StorePrefix = "tokens/"
)
// NewTokenProvider returns an initialized basic provider // NewTokenProvider returns an initialized basic provider
func NewTokenProvider(opts ...token.Option) token.Provider { func NewTokenProvider(opts ...token.Option) token.Provider {

View File

@@ -83,5 +83,4 @@ func TestInspect(t *testing.T) {
t.Fatalf("Inspect returned %v error, expected %v", err, token.ErrInvalidToken) t.Fatalf("Inspect returned %v error, expected %v", err, token.ErrInvalidToken)
} }
}) })
} }

View File

@@ -46,7 +46,7 @@ func NewOptions(opts ...Option) Options {
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
//set default store // set default store
if options.Store == nil { if options.Store == nil {
options.Store = store.DefaultStore options.Store = store.DefaultStore
} }
@@ -75,7 +75,7 @@ func NewGenerateOptions(opts ...GenerateOption) GenerateOptions {
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
//set default Expiry of token // set default Expiry of token
if options.Expiry == 0 { if options.Expiry == 0 {
options.Expiry = time.Minute * 15 options.Expiry = time.Minute * 15
} }