Compare commits
44 Commits
Author | SHA1 | Date | |
---|---|---|---|
919520219c | |||
60a5e737f8 | |||
34f0b209cc | |||
ba8e1889fe | |||
dae5c57a60 | |||
ea590d57df | |||
|
9aa6969836 | ||
|
c00c705c24 | ||
|
0239f795d8 | ||
|
e69b43881d | ||
3a48a613fe | |||
86626c5922 | |||
ee11f39a2f | |||
3bdfdd8fd2 | |||
6dfdff7fd8 | |||
00a4785df3 | |||
|
bae3b0ef94 | ||
|
89b0565062 | ||
1f8b0aeb61 | |||
|
5b6f849e0a | ||
|
3b416fffde | ||
3a60103aed | |||
41837a67f8 | |||
852f19598d | |||
6537b35773 | |||
b733f1316f | |||
|
840af5574c | ||
|
56e5b7001c | ||
|
11dc6fd752 | ||
a2695d8699 | |||
618421de05 | |||
|
30baaabd9f | ||
df5bce1191 | |||
|
089d0fe4df | ||
a06f535303 | |||
|
eba586a329 | ||
|
d74a8645e8 | ||
|
5a00786192 | ||
|
b3e9941634 | ||
|
a5a5904302 | ||
|
a59832e57e | ||
0e42033e7f | |||
52d8255974 | |||
9830cb48a9 |
19
.github/dependabot.yml
vendored
Normal file
19
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
# To get started with Dependabot version updates, you'll need to specify which
|
||||
# package ecosystems to update and where the package manifests are located.
|
||||
# Please see the documentation for all configuration options:
|
||||
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
|
||||
|
||||
version: 2
|
||||
updates:
|
||||
|
||||
# Maintain dependencies for GitHub Actions
|
||||
- package-ecosystem: "github-actions"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
|
||||
# Maintain dependencies for Golang
|
||||
- package-ecosystem: "gomod"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
15
.github/generate.sh
vendored
15
.github/generate.sh
vendored
@@ -1,15 +0,0 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
find . -type f -name '*.pb.*.go' -o -name '*.pb.go' -a ! -name 'message.pb.go' -delete
|
||||
PROTOS=$(find . -type f -name '*.proto' | grep -v proto/google/api)
|
||||
|
||||
mkdir -p proto/google/api
|
||||
curl -s -o proto/google/api/annotations.proto -L https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/annotations.proto
|
||||
curl -s -o proto/google/api/http.proto -L https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/http.proto
|
||||
|
||||
for PROTO in $PROTOS; do
|
||||
echo $PROTO
|
||||
protoc -I./proto -I. -I$(dirname $PROTO) --go-grpc_out=paths=source_relative:. --go_out=paths=source_relative:. --micro_out=paths=source_relative:. $PROTO
|
||||
done
|
||||
|
||||
rm -r proto
|
20
.github/renovate.json
vendored
20
.github/renovate.json
vendored
@@ -1,20 +0,0 @@
|
||||
{
|
||||
"extends": [
|
||||
"config:base"
|
||||
],
|
||||
"postUpdateOptions": ["gomodTidy"],
|
||||
"packageRules": [
|
||||
{
|
||||
"matchUpdateTypes": ["minor", "patch", "pin", "digest"],
|
||||
"automerge": true
|
||||
},
|
||||
{
|
||||
"groupName": "all deps",
|
||||
"separateMajorMinor": true,
|
||||
"groupSlug": "all",
|
||||
"packagePatterns": [
|
||||
"*"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
13
.github/stale.sh
vendored
13
.github/stale.sh
vendored
@@ -1,13 +0,0 @@
|
||||
#!/bin/bash -ex
|
||||
|
||||
export PATH=$PATH:$(pwd)/bin
|
||||
export GO111MODULE=on
|
||||
export GOBIN=$(pwd)/bin
|
||||
|
||||
#go get github.com/rvflash/goup@v0.4.1
|
||||
|
||||
#goup -v ./...
|
||||
#go get github.com/psampaz/go-mod-outdated@v0.6.0
|
||||
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
|
||||
|
||||
#go list -u -m -json all | go-mod-outdated -update
|
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -53,7 +53,7 @@ jobs:
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
version: v1.30
|
||||
version: v1.39
|
||||
# Optional: working directory, useful for monorepos
|
||||
# working-directory: somedir
|
||||
# Optional: golangci-lint command line arguments.
|
||||
|
2
.github/workflows/pr.yml
vendored
2
.github/workflows/pr.yml
vendored
@@ -53,7 +53,7 @@ jobs:
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
version: v1.30
|
||||
version: v1.39
|
||||
# Optional: working directory, useful for monorepos
|
||||
# working-directory: somedir
|
||||
# Optional: golangci-lint command line arguments.
|
||||
|
@@ -1,30 +1,44 @@
|
||||
run:
|
||||
concurrency: 4
|
||||
deadline: 5m
|
||||
modules-download-mode: readonly
|
||||
skip-files:
|
||||
- ".*\\.pb\\.go$"
|
||||
- ".*\\.pb\\.micro\\.go$"
|
||||
issues-exit-code: 1
|
||||
tests: true
|
||||
|
||||
linters-settings:
|
||||
govet:
|
||||
check-shadowing: true
|
||||
enable:
|
||||
- fieldalignment
|
||||
|
||||
linters:
|
||||
disable-all: false
|
||||
enable-all: false
|
||||
enable:
|
||||
- megacheck
|
||||
- staticcheck
|
||||
- deadcode
|
||||
- varcheck
|
||||
- gosimple
|
||||
- unused
|
||||
- prealloc
|
||||
- scopelint
|
||||
- gocritic
|
||||
- goimports
|
||||
- unconvert
|
||||
- govet
|
||||
- nakedret
|
||||
- deadcode
|
||||
- errcheck
|
||||
- govet
|
||||
- ineffassign
|
||||
- staticcheck
|
||||
- structcheck
|
||||
- gosec
|
||||
disable:
|
||||
- maligned
|
||||
- interfacer
|
||||
- typecheck
|
||||
- dupl
|
||||
- unused
|
||||
- varcheck
|
||||
- bodyclose
|
||||
- gci
|
||||
- goconst
|
||||
- gocritic
|
||||
- gosimple
|
||||
- gofmt
|
||||
- gofumpt
|
||||
- goimports
|
||||
- golint
|
||||
- gosec
|
||||
- makezero
|
||||
- misspell
|
||||
- nakedret
|
||||
- nestif
|
||||
- nilerr
|
||||
- noctx
|
||||
- prealloc
|
||||
- unconvert
|
||||
- unparam
|
||||
disable-all: false
|
||||
|
@@ -149,5 +149,4 @@ func TestValidate(t *testing.T) {
|
||||
if err := Validate(epPcreInvalid); err == nil {
|
||||
t.Fatalf("invalid pcre %v", epPcreInvalid.Path[0])
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -6,10 +6,8 @@ import (
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultMaxRecvSize specifies max recv size for handler
|
||||
DefaultMaxRecvSize int64 = 1024 * 1024 * 100 // 10Mb
|
||||
)
|
||||
// DefaultMaxRecvSize specifies max recv size for handler
|
||||
var DefaultMaxRecvSize int64 = 1024 * 1024 * 100 // 10Mb
|
||||
|
||||
// Options struct holds handler options
|
||||
type Options struct {
|
||||
|
@@ -15,12 +15,12 @@ import (
|
||||
// NewResolver creates new subdomain api resolver
|
||||
func NewResolver(parent resolver.Resolver, opts ...resolver.Option) resolver.Resolver {
|
||||
options := resolver.NewOptions(opts...)
|
||||
return &subdomainResolver{options, parent}
|
||||
return &subdomainResolver{opts: options, Resolver: parent}
|
||||
}
|
||||
|
||||
type subdomainResolver struct {
|
||||
opts resolver.Options
|
||||
resolver.Resolver
|
||||
opts resolver.Options
|
||||
}
|
||||
|
||||
// Resolve resolve endpoint based on subdomain
|
||||
|
@@ -19,9 +19,7 @@ type vpathResolver struct {
|
||||
opts resolver.Options
|
||||
}
|
||||
|
||||
var (
|
||||
re = regexp.MustCompile("^v[0-9]+$")
|
||||
)
|
||||
var re = regexp.MustCompile("^v[0-9]+$")
|
||||
|
||||
// Resolve endpoint
|
||||
func (r *vpathResolver) Resolve(req *http.Request, opts ...resolver.ResolveOption) (*resolver.Endpoint, error) {
|
||||
|
@@ -7,10 +7,8 @@ import (
|
||||
"github.com/unistack-org/micro/v3/api"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultRouter contains default router implementation
|
||||
DefaultRouter Router
|
||||
)
|
||||
// DefaultRouter contains default router implementation
|
||||
var DefaultRouter Router
|
||||
|
||||
// Router is used to determine an endpoint for a request
|
||||
type Router interface {
|
||||
|
@@ -24,11 +24,11 @@ func TestVerify(t *testing.T) {
|
||||
}
|
||||
|
||||
tt := []struct {
|
||||
Name string
|
||||
Rules []*Rule
|
||||
Error error
|
||||
Account *Account
|
||||
Resource *Resource
|
||||
Error error
|
||||
Name string
|
||||
Rules []*Rule
|
||||
}{
|
||||
{
|
||||
Name: "NoRules",
|
||||
|
@@ -7,10 +7,8 @@ import (
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultBroker default broker
|
||||
DefaultBroker Broker = NewBroker()
|
||||
)
|
||||
// DefaultBroker default broker
|
||||
var DefaultBroker Broker = NewBroker()
|
||||
|
||||
// Broker is an interface used for asynchronous messaging.
|
||||
type Broker interface {
|
||||
|
@@ -13,27 +13,27 @@ import (
|
||||
)
|
||||
|
||||
type memoryBroker struct {
|
||||
opts Options
|
||||
Subscribers map[string][]*memorySubscriber
|
||||
addr string
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
connected bool
|
||||
}
|
||||
|
||||
type memoryEvent struct {
|
||||
opts Options
|
||||
err error
|
||||
message interface{}
|
||||
topic string
|
||||
opts Options
|
||||
}
|
||||
|
||||
type memorySubscriber struct {
|
||||
opts SubscribeOptions
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler Handler
|
||||
id string
|
||||
topic string
|
||||
opts SubscribeOptions
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Options() Options {
|
||||
|
@@ -9,16 +9,10 @@ import (
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultCodecs will be used to encode/decode data
|
||||
DefaultCodecs = map[string]codec.Codec{
|
||||
//"application/json": cjson.NewCodec,
|
||||
//"application/json-rpc": cjsonrpc.NewCodec,
|
||||
//"application/protobuf": cproto.NewCodec,
|
||||
//"application/proto-rpc": cprotorpc.NewCodec,
|
||||
"application/octet-stream": codec.NewCodec(),
|
||||
}
|
||||
)
|
||||
// DefaultCodecs will be used to encode/decode data
|
||||
var DefaultCodecs = map[string]codec.Codec{
|
||||
"application/octet-stream": codec.NewCodec(),
|
||||
}
|
||||
|
||||
type noopClient struct {
|
||||
opts Options
|
||||
|
@@ -19,8 +19,8 @@ import (
|
||||
|
||||
// Options holds client options
|
||||
type Options struct {
|
||||
// CallOptions contains default CallOptions
|
||||
CallOptions CallOptions
|
||||
// Selector used to select needed address
|
||||
Selector selector.Selector
|
||||
// Logger used to log messages
|
||||
Logger logger.Logger
|
||||
// Tracer used for tracing
|
||||
@@ -31,30 +31,30 @@ type Options struct {
|
||||
Meter meter.Meter
|
||||
// Router used to get route
|
||||
Router router.Router
|
||||
// Selector used to select needed address
|
||||
Selector selector.Selector
|
||||
// Transport used for transfer messages
|
||||
Transport transport.Transport
|
||||
// Context is used for external options
|
||||
Context context.Context
|
||||
// Codecs map
|
||||
Codecs map[string]codec.Codec
|
||||
// Lookup func used to get destination addr
|
||||
Lookup LookupFunc
|
||||
// Codecs map
|
||||
Codecs map[string]codec.Codec
|
||||
// TLSConfig specifies tls.Config for secure connection
|
||||
TLSConfig *tls.Config
|
||||
// Proxy is used for proxy requests
|
||||
Proxy string
|
||||
// ContentType is Used to select codec
|
||||
// ContentType is used to select codec
|
||||
ContentType string
|
||||
// Name is the client name
|
||||
Name string
|
||||
// Wrappers contains wrappers
|
||||
Wrappers []Wrapper
|
||||
// CallOptions contains default CallOptions
|
||||
CallOptions CallOptions
|
||||
// PoolSize connection pool size
|
||||
PoolSize int
|
||||
// PoolTTL connection pool ttl
|
||||
PoolTTL time.Duration
|
||||
// TLSConfig specifies tls.Config for secure connection
|
||||
TLSConfig *tls.Config
|
||||
}
|
||||
|
||||
// NewCallOptions creates new call options struct
|
||||
@@ -80,6 +80,8 @@ type CallOptions struct {
|
||||
Backoff BackoffFunc
|
||||
// Network name
|
||||
Network string
|
||||
// Content-Type
|
||||
ContentType string
|
||||
// CallWrappers call wrappers
|
||||
CallWrappers []CallWrapper
|
||||
// SelectOptions selector options
|
||||
@@ -116,6 +118,8 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
||||
|
||||
// PublishOptions holds publish options
|
||||
type PublishOptions struct {
|
||||
// BodyOnly will publish only message body
|
||||
BodyOnly bool
|
||||
// Context used for external options
|
||||
Context context.Context
|
||||
// Exchange topic exchange name
|
||||
@@ -375,6 +379,13 @@ func WithExchange(e string) PublishOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithBodyOnly publish only message body
|
||||
func WithBodyOnly(b bool) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// PublishContext sets the context in publish options
|
||||
func PublishContext(ctx context.Context) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
@@ -382,6 +393,13 @@ func PublishContext(ctx context.Context) PublishOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithContentType specifies call content type
|
||||
func WithContentType(ct string) CallOption {
|
||||
return func(o *CallOptions) {
|
||||
o.ContentType = ct
|
||||
}
|
||||
}
|
||||
|
||||
// WithAddress sets the remote addresses to use rather than using service discovery
|
||||
func WithAddress(a ...string) CallOption {
|
||||
return func(o *CallOptions) {
|
||||
@@ -486,16 +504,16 @@ func WithMessageContentType(ct string) MessageOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithContentType specifies request content type
|
||||
func WithContentType(ct string) RequestOption {
|
||||
// StreamingRequest specifies that request is streaming
|
||||
func StreamingRequest(b bool) RequestOption {
|
||||
return func(o *RequestOptions) {
|
||||
o.Stream = b
|
||||
}
|
||||
}
|
||||
|
||||
// RequestContentType specifies request content type
|
||||
func RequestContentType(ct string) RequestOption {
|
||||
return func(o *RequestOptions) {
|
||||
o.ContentType = ct
|
||||
}
|
||||
}
|
||||
|
||||
// StreamingRequest specifies that request is streaming
|
||||
func StreamingRequest() RequestOption {
|
||||
return func(o *RequestOptions) {
|
||||
o.Stream = true
|
||||
}
|
||||
}
|
||||
|
@@ -5,13 +5,13 @@ import (
|
||||
)
|
||||
|
||||
type testRequest struct {
|
||||
opts RequestOptions
|
||||
codec codec.Codec
|
||||
body interface{}
|
||||
service string
|
||||
method string
|
||||
endpoint string
|
||||
contentType string
|
||||
service string
|
||||
opts RequestOptions
|
||||
}
|
||||
|
||||
func (r *testRequest) ContentType() string {
|
||||
|
@@ -28,6 +28,8 @@ var (
|
||||
DefaultMaxMsgSize int = 1024 * 1024 * 4 // 4Mb
|
||||
// DefaultCodec is the global default codec
|
||||
DefaultCodec Codec = NewCodec()
|
||||
// DefaultTagName specifies struct tag name to control codec Marshal/Unmarshal
|
||||
DefaultTagName = "codec"
|
||||
)
|
||||
|
||||
// MessageType specifies message type for codec
|
||||
|
@@ -5,8 +5,7 @@ import (
|
||||
"io"
|
||||
)
|
||||
|
||||
type noopCodec struct {
|
||||
}
|
||||
type noopCodec struct{}
|
||||
|
||||
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
|
||||
return nil
|
||||
|
@@ -6,10 +6,8 @@ import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultConfig default config
|
||||
DefaultConfig Config = NewConfig()
|
||||
)
|
||||
// DefaultConfig default config
|
||||
var DefaultConfig Config = NewConfig()
|
||||
|
||||
var (
|
||||
// ErrCodecMissing is returned when codec needed and not specified
|
||||
@@ -37,10 +35,10 @@ type Config interface {
|
||||
}
|
||||
|
||||
// Watcher is the config watcher
|
||||
//type Watcher interface {
|
||||
// type Watcher interface {
|
||||
// Next() (, error)
|
||||
// Stop() error
|
||||
//}
|
||||
// }
|
||||
|
||||
// Load loads config from config sources
|
||||
func Load(ctx context.Context, cs ...Config) error {
|
||||
|
@@ -35,7 +35,7 @@ func (c *defaultConfig) Load(ctx context.Context) error {
|
||||
src, err := rutil.Zero(c.opts.Struct)
|
||||
if err == nil {
|
||||
valueOf := reflect.ValueOf(src)
|
||||
if err = c.fillValues(ctx, valueOf); err == nil {
|
||||
if err = c.fillValues(valueOf); err == nil {
|
||||
err = mergo.Merge(c.opts.Struct, src, mergo.WithOverride, mergo.WithTypeCheck, mergo.WithAppendSlice)
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,7 @@ func (c *defaultConfig) Load(ctx context.Context) error {
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (c *defaultConfig) fillValue(ctx context.Context, value reflect.Value, val string) error {
|
||||
func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
|
||||
if !rutil.IsEmpty(value) {
|
||||
return nil
|
||||
}
|
||||
@@ -71,10 +71,10 @@ func (c *defaultConfig) fillValue(ctx context.Context, value reflect.Value, val
|
||||
kv := strings.FieldsFunc(nval, func(c rune) bool { return c == '=' })
|
||||
mkey := reflect.Indirect(reflect.New(kt))
|
||||
mval := reflect.Indirect(reflect.New(et))
|
||||
if err := c.fillValue(ctx, mkey, kv[0]); err != nil {
|
||||
if err := c.fillValue(mkey, kv[0]); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.fillValue(ctx, mval, kv[1]); err != nil {
|
||||
if err := c.fillValue(mval, kv[1]); err != nil {
|
||||
return err
|
||||
}
|
||||
value.SetMapIndex(mkey, mval)
|
||||
@@ -84,7 +84,7 @@ func (c *defaultConfig) fillValue(ctx context.Context, value reflect.Value, val
|
||||
value.Set(reflect.MakeSlice(reflect.SliceOf(value.Type().Elem()), len(nvals), len(nvals)))
|
||||
for idx, nval := range nvals {
|
||||
nvalue := reflect.Indirect(reflect.New(value.Type().Elem()))
|
||||
if err := c.fillValue(ctx, nvalue, nval); err != nil {
|
||||
if err := c.fillValue(nvalue, nval); err != nil {
|
||||
return err
|
||||
}
|
||||
value.Index(idx).Set(nvalue)
|
||||
@@ -173,7 +173,7 @@ func (c *defaultConfig) fillValue(ctx context.Context, value reflect.Value, val
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *defaultConfig) fillValues(ctx context.Context, valueOf reflect.Value) error {
|
||||
func (c *defaultConfig) fillValues(valueOf reflect.Value) error {
|
||||
var values reflect.Value
|
||||
|
||||
if valueOf.Kind() == reflect.Ptr {
|
||||
@@ -200,7 +200,7 @@ func (c *defaultConfig) fillValues(ctx context.Context, valueOf reflect.Value) e
|
||||
switch value.Kind() {
|
||||
case reflect.Struct:
|
||||
value.Set(reflect.Indirect(reflect.New(value.Type())))
|
||||
if err := c.fillValues(ctx, value); err != nil {
|
||||
if err := c.fillValues(value); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
@@ -214,7 +214,7 @@ func (c *defaultConfig) fillValues(ctx context.Context, valueOf reflect.Value) e
|
||||
value.Set(reflect.New(value.Type().Elem()))
|
||||
}
|
||||
value = value.Elem()
|
||||
if err := c.fillValues(ctx, value); err != nil {
|
||||
if err := c.fillValues(value); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
@@ -224,7 +224,7 @@ func (c *defaultConfig) fillValues(ctx context.Context, valueOf reflect.Value) e
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.fillValue(ctx, value, tag); err != nil {
|
||||
if err := c.fillValue(value, tag); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@@ -10,30 +10,30 @@ import (
|
||||
|
||||
type Cfg struct {
|
||||
StringValue string `default:"string_value"`
|
||||
IntValue int `default:"99"`
|
||||
IgnoreValue string `json:"-"`
|
||||
StructValue struct {
|
||||
StringValue string `default:"string_value"`
|
||||
}
|
||||
IntValue int `default:"99"`
|
||||
}
|
||||
|
||||
func TestDefault(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
conf := &Cfg{IntValue: 10}
|
||||
blfn := func(ctx context.Context, cfg config.Config) error {
|
||||
conf, ok := cfg.Options().Struct.(*Cfg)
|
||||
nconf, ok := cfg.Options().Struct.(*Cfg)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options())
|
||||
}
|
||||
conf.StringValue = "before_load"
|
||||
nconf.StringValue = "before_load"
|
||||
return nil
|
||||
}
|
||||
alfn := func(ctx context.Context, cfg config.Config) error {
|
||||
conf, ok := cfg.Options().Struct.(*Cfg)
|
||||
nconf, ok := cfg.Options().Struct.(*Cfg)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options())
|
||||
}
|
||||
conf.StringValue = "after_load"
|
||||
nconf.StringValue = "after_load"
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -7,9 +7,9 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Value string
|
||||
SubConfig *SubConfig
|
||||
Config *Config
|
||||
Value string
|
||||
}
|
||||
|
||||
type SubConfig struct {
|
||||
|
@@ -17,7 +17,6 @@ func TestFromError(t *testing.T) {
|
||||
if merr.Id != "go.micro.test" || merr.Code != 404 {
|
||||
t.Fatalf("invalid conversation %v != %v", err, merr)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEqual(t *testing.T) {
|
||||
@@ -32,7 +31,6 @@ func TestEqual(t *testing.T) {
|
||||
if Equal(err1, err3) {
|
||||
t.Fatal("errors must be not equal")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestErrors(t *testing.T) {
|
||||
|
@@ -44,10 +44,10 @@ func TestDag(t *testing.T) {
|
||||
var steps [][]string
|
||||
fn := func(n dag.Vertex, idx int) error {
|
||||
if idx == 0 {
|
||||
steps = make([][]string, 1, 1)
|
||||
steps = make([][]string, 1)
|
||||
steps[0] = make([]string, 0, 1)
|
||||
} else if idx >= len(steps) {
|
||||
tsteps := make([][]string, idx+1, idx+1)
|
||||
tsteps := make([][]string, idx+1)
|
||||
copy(tsteps, steps)
|
||||
steps = tsteps
|
||||
steps[idx] = make([]string, 0, 1)
|
||||
|
14
flow/flow.go
14
flow/flow.go
@@ -1,7 +1,17 @@
|
||||
// Package flow is an interface used for saga pattern messaging
|
||||
// Package flow is an interface used for saga pattern microservice workflow
|
||||
package flow
|
||||
|
||||
type Step interface {
|
||||
// Endpoint returns service_name.service_method
|
||||
// Endpoint returns rpc endpoint service_name.service_method or broker topic
|
||||
Endpoint() string
|
||||
}
|
||||
|
||||
type Workflow interface {
|
||||
Steps() [][]Step
|
||||
Stop() error
|
||||
}
|
||||
|
||||
type Flow interface {
|
||||
Start(Workflow) error
|
||||
Stop(Workflow)
|
||||
}
|
||||
|
@@ -1,3 +0,0 @@
|
||||
package micro
|
||||
|
||||
//go:generate ./.github/generate.sh
|
2
go.mod
2
go.mod
@@ -9,5 +9,5 @@ require (
|
||||
github.com/imdario/mergo v0.3.12
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
|
||||
golang.org/x/net v0.0.0-20210326220855-61e056675ecf
|
||||
golang.org/x/net v0.0.0-20210510120150-4163338589ed
|
||||
)
|
||||
|
7
go.sum
7
go.sum
@@ -10,11 +10,12 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
golang.org/x/net v0.0.0-20210326220855-61e056675ecf/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k=
|
||||
golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I=
|
||||
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
||||
|
@@ -21,8 +21,8 @@ func init() {
|
||||
}
|
||||
|
||||
type defaultLogger struct {
|
||||
opts Options
|
||||
enc *json.Encoder
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
|
@@ -76,7 +76,7 @@ func WithContext(ctx context.Context) Option {
|
||||
}
|
||||
|
||||
// WithName sets the name
|
||||
func withName(n string) Option {
|
||||
func WithName(n string) Option {
|
||||
return func(o *Options) {
|
||||
o.Name = n
|
||||
}
|
||||
|
@@ -70,21 +70,23 @@ type lWrapper struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string
|
||||
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string
|
||||
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string
|
||||
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string
|
||||
type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
|
||||
type ServerSubscriberObserver func(context.Context, server.Message, error) []string
|
||||
type (
|
||||
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string
|
||||
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string
|
||||
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string
|
||||
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string
|
||||
ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
|
||||
ServerSubscriberObserver func(context.Context, server.Message, error) []string
|
||||
)
|
||||
|
||||
// Options struct for wrapper
|
||||
type Options struct {
|
||||
// Logger that used for log
|
||||
Logger logger.Logger
|
||||
// Level for logger
|
||||
Level logger.Level
|
||||
// Enabled flag
|
||||
Enabled bool
|
||||
// ServerHandlerObservers funcs
|
||||
ServerHandlerObservers []ServerHandlerObserver
|
||||
// ServerSubscriberObservers funcs
|
||||
ServerSubscriberObservers []ServerSubscriberObserver
|
||||
// ClientCallObservers funcs
|
||||
ClientCallObservers []ClientCallObserver
|
||||
// ClientStreamObservers funcs
|
||||
@@ -93,12 +95,12 @@ type Options struct {
|
||||
ClientPublishObservers []ClientPublishObserver
|
||||
// ClientCallFuncObservers funcs
|
||||
ClientCallFuncObservers []ClientCallFuncObserver
|
||||
// ServerHandlerObservers funcs
|
||||
ServerHandlerObservers []ServerHandlerObserver
|
||||
// ServerSubscriberObservers funcs
|
||||
ServerSubscriberObservers []ServerSubscriberObserver
|
||||
// SkipEndpoints
|
||||
SkipEndpoints []string
|
||||
// Level for logger
|
||||
Level logger.Level
|
||||
// Enabled flag
|
||||
Enabled bool
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
@@ -213,7 +215,7 @@ func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}
|
||||
for _, o := range l.opts.ClientCallObservers {
|
||||
labels = append(labels, o(ctx, req, rsp, opts, err)...)
|
||||
}
|
||||
fields := make(map[string]interface{}, int(len(labels)/2))
|
||||
fields := make(map[string]interface{}, len(labels)/2)
|
||||
for i := 0; i < len(labels); i += 2 {
|
||||
fields[labels[i]] = labels[i+1]
|
||||
}
|
||||
@@ -240,7 +242,7 @@ func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...clien
|
||||
for _, o := range l.opts.ClientStreamObservers {
|
||||
labels = append(labels, o(ctx, req, opts, stream, err)...)
|
||||
}
|
||||
fields := make(map[string]interface{}, int(len(labels)/2))
|
||||
fields := make(map[string]interface{}, len(labels)/2)
|
||||
for i := 0; i < len(labels); i += 2 {
|
||||
fields[labels[i]] = labels[i+1]
|
||||
}
|
||||
@@ -267,7 +269,7 @@ func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...clie
|
||||
for _, o := range l.opts.ClientPublishObservers {
|
||||
labels = append(labels, o(ctx, msg, opts, err)...)
|
||||
}
|
||||
fields := make(map[string]interface{}, int(len(labels)/2))
|
||||
fields := make(map[string]interface{}, len(labels)/2)
|
||||
for i := 0; i < len(labels); i += 2 {
|
||||
fields[labels[i]] = labels[i+1]
|
||||
}
|
||||
@@ -294,7 +296,7 @@ func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp in
|
||||
for _, o := range l.opts.ServerHandlerObservers {
|
||||
labels = append(labels, o(ctx, req, rsp, err)...)
|
||||
}
|
||||
fields := make(map[string]interface{}, int(len(labels)/2))
|
||||
fields := make(map[string]interface{}, len(labels)/2)
|
||||
for i := 0; i < len(labels); i += 2 {
|
||||
fields[labels[i]] = labels[i+1]
|
||||
}
|
||||
@@ -321,7 +323,7 @@ func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) err
|
||||
for _, o := range l.opts.ServerSubscriberObservers {
|
||||
labels = append(labels, o(ctx, msg, err)...)
|
||||
}
|
||||
fields := make(map[string]interface{}, int(len(labels)/2))
|
||||
fields := make(map[string]interface{}, len(labels)/2)
|
||||
for i := 0; i < len(labels); i += 2 {
|
||||
fields[labels[i]] = labels[i+1]
|
||||
}
|
||||
@@ -372,7 +374,7 @@ func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.R
|
||||
for _, o := range l.opts.ClientCallFuncObservers {
|
||||
labels = append(labels, o(ctx, addr, req, rsp, opts, err)...)
|
||||
}
|
||||
fields := make(map[string]interface{}, int(len(labels)/2))
|
||||
fields := make(map[string]interface{}, len(labels)/2)
|
||||
for i := 0; i < len(labels); i += 2 {
|
||||
fields[labels[i]] = labels[i+1]
|
||||
}
|
||||
|
@@ -5,9 +5,11 @@ import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type mdIncomingKey struct{}
|
||||
type mdOutgoingKey struct{}
|
||||
type mdKey struct{}
|
||||
type (
|
||||
mdIncomingKey struct{}
|
||||
mdOutgoingKey struct{}
|
||||
mdKey struct{}
|
||||
)
|
||||
|
||||
// FromIncomingContext returns metadata from incoming ctx
|
||||
// returned metadata shoud not be modified or race condition happens
|
||||
|
@@ -6,10 +6,8 @@ import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
var (
|
||||
// HeaderPrefix for all headers passed
|
||||
HeaderPrefix = "Micro-"
|
||||
)
|
||||
// HeaderPrefix for all headers passed
|
||||
var HeaderPrefix = "Micro-"
|
||||
|
||||
// Metadata is our way of representing request headers internally.
|
||||
// They're used at the RPC level and translate back and forth
|
||||
@@ -20,10 +18,8 @@ type rawMetadata struct {
|
||||
md Metadata
|
||||
}
|
||||
|
||||
var (
|
||||
// defaultMetadataSize used when need to init new Metadata
|
||||
defaultMetadataSize = 2
|
||||
)
|
||||
// defaultMetadataSize used when need to init new Metadata
|
||||
var defaultMetadataSize = 2
|
||||
|
||||
// Iterator used to iterate over metadata with order
|
||||
type Iterator struct {
|
||||
|
@@ -76,7 +76,7 @@ func TestIterator(t *testing.T) {
|
||||
var k, v string
|
||||
|
||||
for iter.Next(&k, &v) {
|
||||
//fmt.Printf("k: %s, v: %s\n", k, v)
|
||||
// fmt.Printf("k: %s, v: %s\n", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,7 +102,6 @@ func TestMedataCanonicalKey(t *testing.T) {
|
||||
} else if v != "12345" {
|
||||
t.Fatalf("invalid metadata value: %s != %s", "12345", v)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestMetadataSet(t *testing.T) {
|
||||
@@ -130,7 +129,6 @@ func TestMetadataDelete(t *testing.T) {
|
||||
if ok {
|
||||
t.Fatal("key Baz not deleted")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestNilContext(t *testing.T) {
|
||||
|
@@ -1,31 +1,64 @@
|
||||
package pb
|
||||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/errors"
|
||||
"github.com/unistack-org/micro/v3/meter"
|
||||
)
|
||||
|
||||
var (
|
||||
// guard to fail early
|
||||
_ MeterServer = &handler{}
|
||||
)
|
||||
// guard to fail early
|
||||
var _ MeterServer = &Handler{}
|
||||
|
||||
type handler struct {
|
||||
meter meter.Meter
|
||||
opts []meter.Option
|
||||
type Handler struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
func NewHandler(meter meter.Meter, opts ...meter.Option) *handler {
|
||||
return &handler{meter: meter, opts: opts}
|
||||
type Option func(*Options)
|
||||
|
||||
type Options struct {
|
||||
Meter meter.Meter
|
||||
Name string
|
||||
MeterOptions []meter.Option
|
||||
}
|
||||
|
||||
func (h *handler) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
func Meter(m meter.Meter) Option {
|
||||
return func(o *Options) {
|
||||
o.Meter = m
|
||||
}
|
||||
}
|
||||
|
||||
func Name(name string) Option {
|
||||
return func(o *Options) {
|
||||
o.Name = name
|
||||
}
|
||||
}
|
||||
|
||||
func MeterOptions(opts ...meter.Option) Option {
|
||||
return func(o *Options) {
|
||||
o.MeterOptions = append(o.MeterOptions, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{Meter: meter.DefaultMeter}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
func NewHandler(opts ...Option) *Handler {
|
||||
options := NewOptions(opts...)
|
||||
return &Handler{opts: options}
|
||||
}
|
||||
|
||||
func (h *Handler) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
if err := h.meter.Write(buf, h.opts...); err != nil {
|
||||
return err
|
||||
if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil {
|
||||
return errors.InternalServerError(h.opts.Name, "%v", err)
|
||||
}
|
||||
|
||||
rsp.Data = buf.Bytes()
|
||||
|
@@ -29,13 +29,13 @@ var (
|
||||
type Meter interface {
|
||||
Name() string
|
||||
Init(opts ...Option) error
|
||||
Counter(name string, opts ...Option) Counter
|
||||
FloatCounter(name string, opts ...Option) FloatCounter
|
||||
Gauge(name string, fn func() float64, opts ...Option) Gauge
|
||||
Counter(name string, labels ...string) Counter
|
||||
FloatCounter(name string, labels ...string) FloatCounter
|
||||
Gauge(name string, fn func() float64, labels ...string) Gauge
|
||||
Set(opts ...Option) Meter
|
||||
Histogram(name string, opts ...Option) Histogram
|
||||
Summary(name string, opts ...Option) Summary
|
||||
SummaryExt(name string, window time.Duration, quantiles []float64, opts ...Option) Summary
|
||||
Histogram(name string, labels ...string) Histogram
|
||||
Summary(name string, labels ...string) Summary
|
||||
SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
|
||||
Write(w io.Writer, opts ...Option) error
|
||||
Options() Options
|
||||
String() string
|
||||
|
@@ -10,7 +10,7 @@ func TestNoopMeter(t *testing.T) {
|
||||
t.Fatalf("invalid options parsing: %v", m.Options())
|
||||
}
|
||||
|
||||
cnt := m.Counter("counter", Labels("server", "noop"))
|
||||
cnt := m.Counter("counter", "server", "noop")
|
||||
cnt.Inc()
|
||||
}
|
||||
|
||||
|
@@ -28,57 +28,33 @@ func (r *noopMeter) Init(opts ...Option) error {
|
||||
}
|
||||
|
||||
// Counter implements the Meter interface
|
||||
func (r *noopMeter) Counter(name string, opts ...Option) Counter {
|
||||
options := Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &noopCounter{labels: options.Labels}
|
||||
func (r *noopMeter) Counter(name string, labels ...string) Counter {
|
||||
return &noopCounter{labels: labels}
|
||||
}
|
||||
|
||||
// FloatCounter implements the Meter interface
|
||||
func (r *noopMeter) FloatCounter(name string, opts ...Option) FloatCounter {
|
||||
options := Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &noopFloatCounter{labels: options.Labels}
|
||||
func (r *noopMeter) FloatCounter(name string, labels ...string) FloatCounter {
|
||||
return &noopFloatCounter{labels: labels}
|
||||
}
|
||||
|
||||
// Gauge implements the Meter interface
|
||||
func (r *noopMeter) Gauge(name string, f func() float64, opts ...Option) Gauge {
|
||||
options := Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &noopGauge{labels: options.Labels}
|
||||
func (r *noopMeter) Gauge(name string, f func() float64, labels ...string) Gauge {
|
||||
return &noopGauge{labels: labels}
|
||||
}
|
||||
|
||||
// Summary implements the Meter interface
|
||||
func (r *noopMeter) Summary(name string, opts ...Option) Summary {
|
||||
options := Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &noopSummary{labels: options.Labels}
|
||||
func (r *noopMeter) Summary(name string, labels ...string) Summary {
|
||||
return &noopSummary{labels: labels}
|
||||
}
|
||||
|
||||
// SummaryExt implements the Meter interface
|
||||
func (r *noopMeter) SummaryExt(name string, window time.Duration, quantiles []float64, opts ...Option) Summary {
|
||||
options := Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &noopSummary{labels: options.Labels}
|
||||
func (r *noopMeter) SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary {
|
||||
return &noopSummary{labels: labels}
|
||||
}
|
||||
|
||||
// Histogram implements the Meter interface
|
||||
func (r *noopMeter) Histogram(name string, opts ...Option) Histogram {
|
||||
options := Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &noopHistogram{labels: options.Labels}
|
||||
func (r *noopMeter) Histogram(name string, labels ...string) Histogram {
|
||||
return &noopHistogram{labels: labels}
|
||||
}
|
||||
|
||||
// Set implements the Meter interface
|
||||
@@ -111,11 +87,9 @@ type noopCounter struct {
|
||||
}
|
||||
|
||||
func (r *noopCounter) Add(int) {
|
||||
|
||||
}
|
||||
|
||||
func (r *noopCounter) Dec() {
|
||||
|
||||
}
|
||||
|
||||
func (r *noopCounter) Get() uint64 {
|
||||
@@ -123,11 +97,9 @@ func (r *noopCounter) Get() uint64 {
|
||||
}
|
||||
|
||||
func (r *noopCounter) Inc() {
|
||||
|
||||
}
|
||||
|
||||
func (r *noopCounter) Set(uint64) {
|
||||
|
||||
}
|
||||
|
||||
type noopFloatCounter struct {
|
||||
@@ -135,7 +107,6 @@ type noopFloatCounter struct {
|
||||
}
|
||||
|
||||
func (r *noopFloatCounter) Add(float64) {
|
||||
|
||||
}
|
||||
|
||||
func (r *noopFloatCounter) Get() float64 {
|
||||
@@ -143,11 +114,9 @@ func (r *noopFloatCounter) Get() float64 {
|
||||
}
|
||||
|
||||
func (r *noopFloatCounter) Set(float64) {
|
||||
|
||||
}
|
||||
|
||||
func (r *noopFloatCounter) Sub(float64) {
|
||||
|
||||
}
|
||||
|
||||
type noopGauge struct {
|
||||
@@ -163,11 +132,9 @@ type noopSummary struct {
|
||||
}
|
||||
|
||||
func (r *noopSummary) Update(float64) {
|
||||
|
||||
}
|
||||
|
||||
func (r *noopSummary) UpdateDuration(time.Time) {
|
||||
|
||||
}
|
||||
|
||||
type noopHistogram struct {
|
||||
@@ -175,15 +142,12 @@ type noopHistogram struct {
|
||||
}
|
||||
|
||||
func (r *noopHistogram) Reset() {
|
||||
|
||||
}
|
||||
|
||||
func (r *noopHistogram) Update(float64) {
|
||||
|
||||
}
|
||||
|
||||
func (r *noopHistogram) UpdateDuration(time.Time) {
|
||||
|
||||
}
|
||||
|
||||
//func (r *noopHistogram) VisitNonZeroBuckets(f func(vmrange string, count uint64)) {}
|
||||
// func (r *noopHistogram) VisitNonZeroBuckets(f func(vmrange string, count uint64)) {}
|
||||
|
@@ -90,7 +90,7 @@ func Logger(l logger.Logger) Option {
|
||||
|
||||
func Labels(ls ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.Labels = ls
|
||||
o.Labels = append(o.Labels, ls...)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -11,18 +11,25 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ClientRequestDurationSeconds = "client_request_duration_seconds"
|
||||
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
|
||||
ClientRequestTotal = "client_request_total"
|
||||
ServerRequestDurationSeconds = "server_request_duration_seconds"
|
||||
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
|
||||
ServerRequestTotal = "server_request_total"
|
||||
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
||||
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
||||
PublishMessageTotal = "publish_message_total"
|
||||
ClientRequestDurationSeconds = "client_request_duration_seconds"
|
||||
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
|
||||
ClientRequestTotal = "client_request_total"
|
||||
ClientRequestInflight = "client_request_inflight"
|
||||
|
||||
ServerRequestDurationSeconds = "server_request_duration_seconds"
|
||||
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
|
||||
ServerRequestTotal = "server_request_total"
|
||||
ServerRequestInflight = "server_request_inflight"
|
||||
|
||||
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
||||
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
||||
PublishMessageTotal = "publish_message_total"
|
||||
PublishMessageInflight = "publish_message_inflight"
|
||||
|
||||
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
||||
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
||||
SubscribeMessageTotal = "subscribe_message_total"
|
||||
SubscribeMessageInflight = "subscribe_message_inflight"
|
||||
|
||||
labelSuccess = "success"
|
||||
labelFailure = "failure"
|
||||
@@ -116,22 +123,25 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
|
||||
return w.callFunc(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
}
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := w.callFunc(ctx, addr, req, rsp, opts)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
|
||||
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -144,22 +154,24 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
|
||||
}
|
||||
}
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := w.Client.Call(ctx, req, rsp, opts...)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
|
||||
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -172,22 +184,24 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
|
||||
}
|
||||
}
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
stream, err := w.Client.Stream(ctx, req, opts...)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
|
||||
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
|
||||
|
||||
return stream, err
|
||||
}
|
||||
@@ -195,22 +209,24 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
|
||||
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||
endpoint := p.Topic()
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := w.Client.Publish(ctx, p, opts...)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(PublishMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(PublishMessageTotal, lopts...).Inc()
|
||||
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -231,22 +247,24 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := fn(ctx, req, rsp)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec()
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(ServerRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(ServerRequestTotal, lopts...).Inc()
|
||||
w.opts.Meter.Counter(ServerRequestTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -263,22 +281,24 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
|
||||
return func(ctx context.Context, msg server.Message) error {
|
||||
endpoint := msg.Topic()
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := fn(ctx, msg)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(SubscribeMessageTotal, lopts...).Inc()
|
||||
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
|
@@ -31,18 +31,18 @@ type memoryClient struct {
|
||||
}
|
||||
|
||||
type memoryListener struct {
|
||||
topts Options
|
||||
ctx context.Context
|
||||
lopts ListenOptions
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
conn chan *memorySocket
|
||||
addr string
|
||||
topts Options
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
type memoryTransport struct {
|
||||
opts Options
|
||||
listeners map[string]*memoryListener
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
|
@@ -27,9 +27,9 @@ func TestMemoryTransport(t *testing.T) {
|
||||
if len(os.Getenv("INTEGRATION_TESTS")) == 0 {
|
||||
t.Logf("Server Received %s", string(m.Body))
|
||||
}
|
||||
if err := sock.Send(&Message{
|
||||
if cerr := sock.Send(&Message{
|
||||
Body: []byte(`pong`),
|
||||
}); err != nil {
|
||||
}); cerr != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -60,7 +60,6 @@ func TestMemoryTransport(t *testing.T) {
|
||||
t.Logf("Client Received %s", string(m.Body))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestListener(t *testing.T) {
|
||||
|
@@ -12,16 +12,16 @@ import (
|
||||
)
|
||||
|
||||
type tunBroker struct {
|
||||
opts broker.Options
|
||||
tunnel tunnel.Tunnel
|
||||
opts broker.Options
|
||||
}
|
||||
|
||||
type tunSubscriber struct {
|
||||
opts broker.SubscribeOptions
|
||||
listener tunnel.Listener
|
||||
handler broker.Handler
|
||||
closed chan bool
|
||||
topic string
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
type tunEvent struct {
|
||||
@@ -30,8 +30,10 @@ type tunEvent struct {
|
||||
}
|
||||
|
||||
// used to access tunnel from options context
|
||||
type tunnelKey struct{}
|
||||
type tunnelAddr struct{}
|
||||
type (
|
||||
tunnelKey struct{}
|
||||
tunnelAddr struct{}
|
||||
)
|
||||
|
||||
func (t *tunBroker) Init(opts ...broker.Option) error {
|
||||
for _, o := range opts {
|
||||
|
@@ -34,8 +34,8 @@ type Options struct {
|
||||
Token string
|
||||
// Name holds the tunnel name
|
||||
Name string
|
||||
// Id holds the tunnel id
|
||||
Id string
|
||||
// ID holds the tunnel id
|
||||
ID string
|
||||
// Address holds the tunnel address
|
||||
Address string
|
||||
// Nodes holds the tunnel nodes
|
||||
@@ -68,10 +68,10 @@ type ListenOptions struct {
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
// Id sets the tunnel id
|
||||
func Id(id string) Option {
|
||||
// ID sets the tunnel id
|
||||
func ID(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.Id = id
|
||||
o.ID = id
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,7 +164,7 @@ func DialWait(b bool) DialOption {
|
||||
// NewOptions returns router default options with filled values
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Id: uuid.New().String(),
|
||||
ID: uuid.New().String(),
|
||||
Address: DefaultAddress,
|
||||
Token: DefaultToken,
|
||||
Logger: logger.DefaultLogger,
|
||||
|
@@ -10,9 +10,8 @@ import (
|
||||
)
|
||||
|
||||
type tunTransport struct {
|
||||
tunnel tunnel.Tunnel
|
||||
options transport.Options
|
||||
|
||||
tunnel tunnel.Tunnel
|
||||
}
|
||||
|
||||
type tunnelKey struct{}
|
||||
@@ -88,7 +87,7 @@ func NewTransport(opts ...transport.Option) transport.Transport {
|
||||
}
|
||||
|
||||
// initialise
|
||||
//t.Init(opts...)
|
||||
// t.Init(opts...)
|
||||
|
||||
return t
|
||||
}
|
||||
|
@@ -9,10 +9,8 @@ import (
|
||||
"github.com/unistack-org/micro/v3/network/transport"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultTunnel contains default tunnel implementation
|
||||
DefaultTunnel Tunnel
|
||||
)
|
||||
// DefaultTunnel contains default tunnel implementation
|
||||
var DefaultTunnel Tunnel
|
||||
|
||||
const (
|
||||
// Unicast send over one link
|
||||
|
@@ -17,8 +17,6 @@ import (
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/store"
|
||||
"github.com/unistack-org/micro/v3/tracer"
|
||||
// "github.com/unistack-org/micro/v3/profiler"
|
||||
// "github.com/unistack-org/micro/v3/runtime"
|
||||
)
|
||||
|
||||
// Options for micro service
|
||||
@@ -78,8 +76,8 @@ func NewOptions(opts ...Option) Options {
|
||||
Meters: []meter.Meter{meter.DefaultMeter},
|
||||
Configs: []config.Config{config.DefaultConfig},
|
||||
Stores: []store.Store{store.DefaultStore},
|
||||
//Runtime runtime.Runtime
|
||||
//Profile profile.Profile
|
||||
// Runtime runtime.Runtime
|
||||
// Profile profile.Profile
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
|
@@ -16,10 +16,8 @@ type httpProfile struct {
|
||||
running bool
|
||||
}
|
||||
|
||||
var (
|
||||
// DefaultAddress for http profiler
|
||||
DefaultAddress = ":6060"
|
||||
)
|
||||
// DefaultAddress for http profiler
|
||||
var DefaultAddress = ":6060"
|
||||
|
||||
// Start the profiler
|
||||
func (h *httpProfile) Start() error {
|
||||
|
@@ -11,10 +11,8 @@ type Profiler interface {
|
||||
String() string
|
||||
}
|
||||
|
||||
var (
|
||||
// DefaultProfiler holds the default profiler
|
||||
DefaultProfiler Profiler = NewProfiler()
|
||||
)
|
||||
// DefaultProfiler holds the default profiler
|
||||
var DefaultProfiler Profiler = NewProfiler()
|
||||
|
||||
// Options holds the options for profiler
|
||||
type Options struct {
|
||||
|
@@ -7,10 +7,8 @@ import (
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultEndpoint holds default proxy address
|
||||
DefaultEndpoint = "localhost:9090"
|
||||
)
|
||||
// DefaultEndpoint holds default proxy address
|
||||
var DefaultEndpoint = "localhost:9090"
|
||||
|
||||
// Proxy can be used as a proxy server for micro services
|
||||
type Proxy interface {
|
||||
|
@@ -3,7 +3,6 @@ package register
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
@@ -11,12 +10,12 @@ import (
|
||||
)
|
||||
|
||||
// ExtractValue from reflect.Type from specified depth
|
||||
func ExtractValue(v reflect.Type, d int) *Value {
|
||||
func ExtractValue(v reflect.Type, d int) string {
|
||||
if d == 3 {
|
||||
return nil
|
||||
return ""
|
||||
}
|
||||
if v == nil {
|
||||
return nil
|
||||
return ""
|
||||
}
|
||||
|
||||
if v.Kind() == reflect.Ptr {
|
||||
@@ -25,7 +24,7 @@ func ExtractValue(v reflect.Type, d int) *Value {
|
||||
|
||||
// slices and maps don't have a defined name
|
||||
if (v.Kind() == reflect.Slice || v.Kind() == reflect.Map) || len(v.Name()) == 0 {
|
||||
return nil
|
||||
return ""
|
||||
}
|
||||
|
||||
// get the rune character
|
||||
@@ -33,58 +32,10 @@ func ExtractValue(v reflect.Type, d int) *Value {
|
||||
|
||||
// crude check for is unexported field
|
||||
if unicode.IsLower(a) {
|
||||
return nil
|
||||
return ""
|
||||
}
|
||||
|
||||
arg := &Value{
|
||||
Name: v.Name(),
|
||||
Type: v.Name(),
|
||||
}
|
||||
|
||||
switch v.Kind() {
|
||||
case reflect.Struct:
|
||||
for i := 0; i < v.NumField(); i++ {
|
||||
f := v.Field(i)
|
||||
val := ExtractValue(f.Type, d+1)
|
||||
if val == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// if we can find a json tag use it
|
||||
if tags := f.Tag.Get("json"); len(tags) > 0 {
|
||||
parts := strings.Split(tags, ",")
|
||||
if parts[0] == "-" || parts[0] == "omitempty" {
|
||||
continue
|
||||
}
|
||||
val.Name = parts[0]
|
||||
}
|
||||
|
||||
// if there's no name default it
|
||||
if len(val.Name) == 0 {
|
||||
val.Name = v.Field(i).Name
|
||||
}
|
||||
|
||||
arg.Values = append(arg.Values, val)
|
||||
}
|
||||
case reflect.Slice:
|
||||
p := v.Elem()
|
||||
if p.Kind() == reflect.Ptr {
|
||||
p = p.Elem()
|
||||
}
|
||||
arg.Type = "[]" + p.Name()
|
||||
case reflect.Map:
|
||||
p := v.Elem()
|
||||
if p.Kind() == reflect.Ptr {
|
||||
p = p.Elem()
|
||||
}
|
||||
key := v.Key()
|
||||
if key.Kind() == reflect.Ptr {
|
||||
key = key.Elem()
|
||||
}
|
||||
arg.Type = fmt.Sprintf("map[%s]%s", key.Name(), p.Name())
|
||||
}
|
||||
|
||||
return arg
|
||||
return v.Name()
|
||||
}
|
||||
|
||||
// ExtractEndpoint extract *Endpoint from reflect.Method
|
||||
@@ -116,7 +67,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint {
|
||||
|
||||
request := ExtractValue(reqType, 0)
|
||||
response := ExtractValue(rspType, 0)
|
||||
if request == nil || response == nil {
|
||||
if request == "" || response == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -135,7 +86,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint {
|
||||
}
|
||||
|
||||
// ExtractSubValue exctact *Value from reflect.Type
|
||||
func ExtractSubValue(typ reflect.Type) *Value {
|
||||
func ExtractSubValue(typ reflect.Type) string {
|
||||
var reqType reflect.Type
|
||||
switch typ.NumIn() {
|
||||
case 1:
|
||||
@@ -145,7 +96,7 @@ func ExtractSubValue(typ reflect.Type) *Value {
|
||||
case 3:
|
||||
reqType = typ.In(2)
|
||||
default:
|
||||
return nil
|
||||
return ""
|
||||
}
|
||||
return ExtractValue(reqType, 0)
|
||||
}
|
||||
|
@@ -36,28 +36,21 @@ func TestExtractEndpoint(t *testing.T) {
|
||||
t.Fatalf("Expected handler Test, got %s", endpoints[0].Name)
|
||||
}
|
||||
|
||||
if endpoints[0].Request == nil {
|
||||
if endpoints[0].Request == "" {
|
||||
t.Fatal("Expected non nil Request")
|
||||
}
|
||||
|
||||
if endpoints[0].Response == nil {
|
||||
if endpoints[0].Response == "" {
|
||||
t.Fatal("Expected non nil Request")
|
||||
}
|
||||
|
||||
if endpoints[0].Request.Name != "TestRequest" {
|
||||
t.Fatalf("Expected TestRequest got %s", endpoints[0].Request.Name)
|
||||
if endpoints[0].Request != "TestRequest" {
|
||||
t.Fatalf("Expected TestRequest got %s", endpoints[0].Request)
|
||||
}
|
||||
|
||||
if endpoints[0].Response.Name != "TestResponse" {
|
||||
t.Fatalf("Expected TestResponse got %s", endpoints[0].Response.Name)
|
||||
}
|
||||
|
||||
if endpoints[0].Request.Type != "TestRequest" {
|
||||
t.Fatalf("Expected TestRequest type got %s", endpoints[0].Request.Type)
|
||||
}
|
||||
|
||||
if endpoints[0].Response.Type != "TestResponse" {
|
||||
t.Fatalf("Expected TestResponse type got %s", endpoints[0].Response.Type)
|
||||
if endpoints[0].Response != "TestResponse" {
|
||||
t.Fatalf("Expected TestResponse got %s", endpoints[0].Response)
|
||||
}
|
||||
|
||||
t.Logf("XXX %#+v\n", endpoints[0])
|
||||
}
|
||||
|
@@ -30,10 +30,9 @@ type record struct {
|
||||
}
|
||||
|
||||
type memory struct {
|
||||
opts Options
|
||||
// records is a KV map with domain name as the key and a services map as the value
|
||||
records map[string]services
|
||||
watchers map[string]*watcher
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -65,7 +64,7 @@ func (m *memory) ttlPrune() {
|
||||
for id, n := range record.Nodes {
|
||||
if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL {
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register TTL expired for node %s of service %s", n.Id, service)
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register TTL expired for node %s of service %s", n.ID, service)
|
||||
}
|
||||
delete(m.records[domain][service][version].Nodes, id)
|
||||
}
|
||||
@@ -162,7 +161,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
|
||||
|
||||
for _, n := range s.Nodes {
|
||||
// check if already exists
|
||||
if _, ok := srvs[s.Name][s.Version].Nodes[n.Id]; ok {
|
||||
if _, ok := srvs[s.Name][s.Version].Nodes[n.ID]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -177,9 +176,9 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
|
||||
metadata["domain"] = options.Domain
|
||||
|
||||
// add the node
|
||||
srvs[s.Name][s.Version].Nodes[n.Id] = &node{
|
||||
srvs[s.Name][s.Version].Nodes[n.ID] = &node{
|
||||
Node: &Node{
|
||||
Id: n.Id,
|
||||
ID: n.ID,
|
||||
Address: n.Address,
|
||||
Metadata: metadata,
|
||||
},
|
||||
@@ -201,8 +200,8 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Updated registration for service: %s, version: %s", s.Name, s.Version)
|
||||
}
|
||||
srvs[s.Name][s.Version].Nodes[n.Id].TTL = options.TTL
|
||||
srvs[s.Name][s.Version].Nodes[n.Id].LastSeen = time.Now()
|
||||
srvs[s.Name][s.Version].Nodes[n.ID].TTL = options.TTL
|
||||
srvs[s.Name][s.Version].Nodes[n.ID].LastSeen = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,11 +241,11 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO
|
||||
|
||||
// deregister all of the service nodes from this version
|
||||
for _, n := range s.Nodes {
|
||||
if _, ok := version.Nodes[n.Id]; ok {
|
||||
if _, ok := version.Nodes[n.ID]; ok {
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register removed node from service: %s, version: %s", s.Name, s.Version)
|
||||
}
|
||||
delete(version.Nodes, n.Id)
|
||||
delete(version.Nodes, n.ID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -458,7 +457,7 @@ func serviceToRecord(s *Service, ttl time.Duration) *record {
|
||||
|
||||
nodes := make(map[string]*node, len(s.Nodes))
|
||||
for _, n := range s.Nodes {
|
||||
nodes[n.Id] = &node{
|
||||
nodes[n.ID] = &node{
|
||||
Node: n,
|
||||
TTL: ttl,
|
||||
LastSeen: time.Now(),
|
||||
@@ -490,40 +489,31 @@ func recordToService(r *record, domain string) *Service {
|
||||
|
||||
endpoints := make([]*Endpoint, len(r.Endpoints))
|
||||
for i, e := range r.Endpoints {
|
||||
request := new(Value)
|
||||
if e.Request != nil {
|
||||
*request = *e.Request
|
||||
}
|
||||
response := new(Value)
|
||||
if e.Response != nil {
|
||||
*response = *e.Response
|
||||
}
|
||||
|
||||
metadata := make(map[string]string, len(e.Metadata))
|
||||
md := make(map[string]string, len(e.Metadata))
|
||||
for k, v := range e.Metadata {
|
||||
metadata[k] = v
|
||||
md[k] = v
|
||||
}
|
||||
|
||||
endpoints[i] = &Endpoint{
|
||||
Name: e.Name,
|
||||
Request: request,
|
||||
Response: response,
|
||||
Metadata: metadata,
|
||||
Request: e.Request,
|
||||
Response: e.Response,
|
||||
Metadata: md,
|
||||
}
|
||||
}
|
||||
|
||||
nodes := make([]*Node, len(r.Nodes))
|
||||
i := 0
|
||||
for _, n := range r.Nodes {
|
||||
metadata := make(map[string]string, len(n.Metadata))
|
||||
md := make(map[string]string, len(n.Metadata))
|
||||
for k, v := range n.Metadata {
|
||||
metadata[k] = v
|
||||
md[k] = v
|
||||
}
|
||||
|
||||
nodes[i] = &Node{
|
||||
Id: n.Id,
|
||||
ID: n.ID,
|
||||
Address: n.Address,
|
||||
Metadata: metadata,
|
||||
Metadata: md,
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
@@ -8,72 +8,70 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
testData = map[string][]*Service{
|
||||
"foo": {
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
Id: "foo-1.0.0-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
{
|
||||
Id: "foo-1.0.0-321",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
var testData = map[string][]*Service{
|
||||
"foo": {
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
ID: "foo-1.0.0-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.1",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
Id: "foo-1.0.1-321",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.3",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
Id: "foo-1.0.3-345",
|
||||
Address: "localhost:8888",
|
||||
},
|
||||
{
|
||||
ID: "foo-1.0.0-321",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
},
|
||||
},
|
||||
"bar": {
|
||||
{
|
||||
Name: "bar",
|
||||
Version: "default",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
Id: "bar-1.0.0-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
{
|
||||
Id: "bar-1.0.0-321",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "bar",
|
||||
Version: "latest",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
Id: "bar-1.0.1-321",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.1",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
ID: "foo-1.0.1-321",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.3",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
ID: "foo-1.0.3-345",
|
||||
Address: "localhost:8888",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"bar": {
|
||||
{
|
||||
Name: "bar",
|
||||
Version: "default",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
ID: "bar-1.0.0-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
{
|
||||
ID: "bar-1.0.0-321",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "bar",
|
||||
Version: "latest",
|
||||
Nodes: []*Node{
|
||||
{
|
||||
ID: "bar-1.0.1-321",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func TestMemoryRegistry(t *testing.T) {
|
||||
|
@@ -44,6 +44,7 @@ func NewOptions(opts ...Option) Options {
|
||||
return options
|
||||
}
|
||||
|
||||
// nolint: golint
|
||||
// RegisterOptions holds options for register method
|
||||
type RegisterOptions struct {
|
||||
Context context.Context
|
||||
@@ -196,6 +197,7 @@ func TLSConfig(t *tls.Config) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: golint
|
||||
// RegisterAttempts specifies register atempts count
|
||||
func RegisterAttempts(t int) RegisterOption {
|
||||
return func(o *RegisterOptions) {
|
||||
@@ -203,6 +205,7 @@ func RegisterAttempts(t int) RegisterOption {
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: golint
|
||||
// RegisterTTL specifies register ttl
|
||||
func RegisterTTL(t time.Duration) RegisterOption {
|
||||
return func(o *RegisterOptions) {
|
||||
@@ -210,6 +213,7 @@ func RegisterTTL(t time.Duration) RegisterOption {
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: golint
|
||||
// RegisterContext sets the register context
|
||||
func RegisterContext(ctx context.Context) RegisterOption {
|
||||
return func(o *RegisterOptions) {
|
||||
@@ -217,6 +221,7 @@ func RegisterContext(ctx context.Context) RegisterOption {
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: golint
|
||||
// RegisterDomain secifies register domain
|
||||
func RegisterDomain(d string) RegisterOption {
|
||||
return func(o *RegisterOptions) {
|
||||
|
@@ -53,29 +53,23 @@ type Service struct {
|
||||
// Node holds node register info
|
||||
type Node struct {
|
||||
Metadata metadata.Metadata `json:"metadata"`
|
||||
Id string `json:"id"`
|
||||
ID string `json:"id"`
|
||||
Address string `json:"address"`
|
||||
}
|
||||
|
||||
// Endpoint holds endpoint register info
|
||||
type Endpoint struct {
|
||||
Request *Value `json:"request"`
|
||||
Response *Value `json:"response"`
|
||||
Request string `json:"request"`
|
||||
Response string `json:"response"`
|
||||
Metadata metadata.Metadata `json:"metadata"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// Value holds additional kv stuff
|
||||
type Value struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Values []*Value `json:"values"`
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
// RegisterOption option is used to register service
|
||||
// nolint: golint
|
||||
type RegisterOption func(*RegisterOptions)
|
||||
|
||||
// WatchOption option is used to watch service changes
|
||||
|
@@ -52,8 +52,8 @@ type Event struct {
|
||||
Timestamp time.Time
|
||||
// Service is register service
|
||||
Service *Service
|
||||
// Id is register id
|
||||
Id string
|
||||
// ID is register id
|
||||
ID string
|
||||
// Type defines type of event
|
||||
Type EventType
|
||||
}
|
||||
|
@@ -5,10 +5,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrWatcherStopped is returned when routing table watcher has been stopped
|
||||
ErrWatcherStopped = errors.New("watcher stopped")
|
||||
)
|
||||
// ErrWatcherStopped is returned when routing table watcher has been stopped
|
||||
var ErrWatcherStopped = errors.New("watcher stopped")
|
||||
|
||||
// EventType defines routing table event
|
||||
type EventType int
|
||||
@@ -38,12 +36,12 @@ func (t EventType) String() string {
|
||||
|
||||
// Event is returned by a call to Next on the watcher.
|
||||
type Event struct {
|
||||
// Route is table route
|
||||
Route Route
|
||||
// Timestamp is event timestamp
|
||||
Timestamp time.Time
|
||||
// Id of the event
|
||||
Id string
|
||||
// Route is table route
|
||||
Route Route
|
||||
// Type defines type of event
|
||||
Type EventType
|
||||
}
|
||||
|
@@ -8,10 +8,8 @@ import (
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrAlreadyExists error
|
||||
ErrAlreadyExists = errors.New("already exists")
|
||||
)
|
||||
// ErrAlreadyExists error
|
||||
var ErrAlreadyExists = errors.New("already exists")
|
||||
|
||||
// Runtime is a service runtime manager
|
||||
type Runtime interface {
|
||||
|
@@ -5,10 +5,8 @@ import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrNoneAvailable is returned by select when no routes were provided to select from
|
||||
ErrNoneAvailable = errors.New("none available")
|
||||
)
|
||||
// ErrNoneAvailable is returned by select when no routes were provided to select from
|
||||
var ErrNoneAvailable = errors.New("none available")
|
||||
|
||||
// Selector selects a route from a pool
|
||||
type Selector interface {
|
||||
|
@@ -13,7 +13,7 @@ type rpcHandler struct {
|
||||
endpoints []*register.Endpoint
|
||||
}
|
||||
|
||||
func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {
|
||||
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
|
||||
options := NewHandlerOptions(opts...)
|
||||
|
||||
typ := reflect.TypeOf(handler)
|
||||
|
@@ -1,21 +1,16 @@
|
||||
package pb
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/errors"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
var (
|
||||
// guard to fail early
|
||||
_ HealthServer = &handler{}
|
||||
)
|
||||
var _ HealthServer = &Handler{}
|
||||
|
||||
type handler struct {
|
||||
server server.Server
|
||||
opts Options
|
||||
type Handler struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
type CheckFunc func(context.Context) error
|
||||
@@ -23,10 +18,10 @@ type CheckFunc func(context.Context) error
|
||||
type Option func(*Options)
|
||||
|
||||
type Options struct {
|
||||
LiveChecks []CheckFunc
|
||||
ReadyChecks []CheckFunc
|
||||
Version string
|
||||
Name string
|
||||
LiveChecks []CheckFunc
|
||||
ReadyChecks []CheckFunc
|
||||
}
|
||||
|
||||
func LiveChecks(fns ...CheckFunc) Option {
|
||||
@@ -53,15 +48,15 @@ func Version(version string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func NewHandler(opts ...Option) *handler {
|
||||
func NewHandler(opts ...Option) *Handler {
|
||||
options := Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &handler{opts: options}
|
||||
return &Handler{opts: options}
|
||||
}
|
||||
|
||||
func (h *handler) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
func (h *Handler) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
var err error
|
||||
for _, fn := range h.opts.LiveChecks {
|
||||
if err = fn(ctx); err != nil {
|
||||
@@ -71,7 +66,7 @@ func (h *handler) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *handler) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
func (h *Handler) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
var err error
|
||||
for _, fn := range h.opts.ReadyChecks {
|
||||
if err = fn(ctx); err != nil {
|
||||
@@ -81,7 +76,7 @@ func (h *handler) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *handler) Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
func (h *Handler) Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
rsp.Data = []byte(h.opts.Version)
|
||||
return nil
|
||||
}
|
||||
|
@@ -6,9 +6,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
// cjson "github.com/unistack-org/micro-codec-json"
|
||||
// cjsonrpc "github.com/unistack-org/micro-codec-jsonrpc"
|
||||
// cproto "github.com/unistack-org/micro-codec-proto"
|
||||
// cprotorpc "github.com/unistack-org/micro-codec-protorpc"
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
@@ -16,29 +13,23 @@ import (
|
||||
"github.com/unistack-org/micro/v3/register"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultCodecs will be used to encode/decode
|
||||
DefaultCodecs = map[string]codec.Codec{
|
||||
//"application/json": cjson.NewCodec,
|
||||
//"application/json-rpc": cjsonrpc.NewCodec,
|
||||
//"application/protobuf": cproto.NewCodec,
|
||||
//"application/proto-rpc": cprotorpc.NewCodec,
|
||||
"application/octet-stream": codec.NewCodec(),
|
||||
}
|
||||
)
|
||||
// DefaultCodecs will be used to encode/decode
|
||||
var DefaultCodecs = map[string]codec.Codec{
|
||||
"application/octet-stream": codec.NewCodec(),
|
||||
}
|
||||
|
||||
const (
|
||||
defaultContentType = "application/json"
|
||||
)
|
||||
|
||||
type noopServer struct {
|
||||
opts Options
|
||||
h Handler
|
||||
wg *sync.WaitGroup
|
||||
rsvc *register.Service
|
||||
handlers map[string]Handler
|
||||
subscribers map[*subscriber][]broker.Subscriber
|
||||
exit chan chan error
|
||||
wg *sync.WaitGroup
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
registered bool
|
||||
started bool
|
||||
@@ -103,7 +94,7 @@ func (n *noopServer) Subscribe(sb Subscriber) error {
|
||||
}
|
||||
|
||||
func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
|
||||
return newRpcHandler(h, opts...)
|
||||
return newRPCHandler(h, opts...)
|
||||
}
|
||||
|
||||
func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
|
||||
@@ -158,23 +149,16 @@ func (n *noopServer) Register() error {
|
||||
}
|
||||
|
||||
n.RLock()
|
||||
// Maps are ordered randomly, sort the keys for consistency
|
||||
var handlerList []string
|
||||
for n, e := range n.handlers {
|
||||
// Only advertise non internal handlers
|
||||
if !e.Options().Internal {
|
||||
handlerList = append(handlerList, n)
|
||||
}
|
||||
handlerList := make([]string, 0, len(n.handlers))
|
||||
for n := range n.handlers {
|
||||
handlerList = append(handlerList, n)
|
||||
}
|
||||
|
||||
sort.Strings(handlerList)
|
||||
|
||||
var subscriberList []*subscriber
|
||||
subscriberList := make([]*subscriber, 0, len(n.subscribers))
|
||||
for e := range n.subscribers {
|
||||
// Only advertise non internal subscribers
|
||||
if !e.Options().Internal {
|
||||
subscriberList = append(subscriberList, e)
|
||||
}
|
||||
subscriberList = append(subscriberList, e)
|
||||
}
|
||||
sort.Slice(subscriberList, func(i, j int) bool {
|
||||
return subscriberList[i].topic > subscriberList[j].topic
|
||||
@@ -190,7 +174,7 @@ func (n *noopServer) Register() error {
|
||||
n.RUnlock()
|
||||
|
||||
service.Nodes[0].Metadata["protocol"] = "noop"
|
||||
service.Nodes[0].Metadata["transport"] = "noop"
|
||||
service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"]
|
||||
service.Endpoints = endpoints
|
||||
|
||||
n.RLock()
|
||||
@@ -199,7 +183,7 @@ func (n *noopServer) Register() error {
|
||||
|
||||
if !registered {
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].Id)
|
||||
config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -262,7 +246,7 @@ func (n *noopServer) Deregister() error {
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].Id)
|
||||
config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID)
|
||||
}
|
||||
|
||||
if err := DefaultDeregisterFunc(service, config); err != nil {
|
||||
@@ -344,9 +328,10 @@ func (n *noopServer) Start() error {
|
||||
}
|
||||
|
||||
// use RegisterCheck func before register
|
||||
// nolint: nestif
|
||||
if err := config.RegisterCheck(config.Context); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.Id, err)
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err)
|
||||
}
|
||||
} else {
|
||||
// announce self to the world
|
||||
@@ -378,25 +363,26 @@ func (n *noopServer) Start() error {
|
||||
registered := n.registered
|
||||
n.RUnlock()
|
||||
rerr := config.RegisterCheck(config.Context)
|
||||
// nolint: nestif
|
||||
if rerr != nil && registered {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
|
||||
}
|
||||
// deregister self in case of error
|
||||
if err := n.Deregister(); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.Id, err)
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err)
|
||||
}
|
||||
}
|
||||
} else if rerr != nil && !registered {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.Id, rerr)
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := n.Register(); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.Id, err)
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err)
|
||||
}
|
||||
}
|
||||
// wait for exit
|
||||
|
@@ -57,8 +57,8 @@ type Options struct {
|
||||
RegisterCheck func(context.Context) error
|
||||
// Codecs map to handle content-type
|
||||
Codecs map[string]codec.Codec
|
||||
// Id holds the id of the server
|
||||
Id string
|
||||
// ID holds the id of the server
|
||||
ID string
|
||||
// Namespace for te server
|
||||
Namespace string
|
||||
// Name holds the server name
|
||||
@@ -104,7 +104,7 @@ func NewOptions(opts ...Option) Options {
|
||||
Address: DefaultAddress,
|
||||
Name: DefaultName,
|
||||
Version: DefaultVersion,
|
||||
Id: DefaultId,
|
||||
ID: DefaultID,
|
||||
Namespace: DefaultNamespace,
|
||||
}
|
||||
|
||||
@@ -143,10 +143,10 @@ func Meter(m meter.Meter) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Id unique server id
|
||||
func Id(id string) Option {
|
||||
// ID unique server id
|
||||
func ID(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.Id = id
|
||||
o.ID = id
|
||||
}
|
||||
}
|
||||
|
||||
@@ -325,8 +325,6 @@ type HandlerOptions struct {
|
||||
Context context.Context
|
||||
// Metadata for hondler
|
||||
Metadata map[string]metadata.Metadata
|
||||
// Internal flag limits exporting to other nodes via register
|
||||
Internal bool
|
||||
}
|
||||
|
||||
// NewHandlerOptions creates new HandlerOptions
|
||||
@@ -354,8 +352,6 @@ type SubscriberOptions struct {
|
||||
Queue string
|
||||
// AutoAck flag for auto ack messages after processing
|
||||
AutoAck bool
|
||||
// Internal flag limit exporting info via register
|
||||
Internal bool
|
||||
// BodyOnly flag specifies that message without headers
|
||||
BodyOnly bool
|
||||
}
|
||||
@@ -382,23 +378,6 @@ func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
|
||||
}
|
||||
}
|
||||
|
||||
// InternalHandler options specifies that a handler is not advertised
|
||||
// to the discovery system. In the future this may also limit request
|
||||
// to the internal network or authorised user.
|
||||
func InternalHandler(b bool) HandlerOption {
|
||||
return func(o *HandlerOptions) {
|
||||
o.Internal = b
|
||||
}
|
||||
}
|
||||
|
||||
// InternalSubscriber options specifies that a subscriber is not advertised
|
||||
// to the discovery system.
|
||||
func InternalSubscriber(b bool) SubscriberOption {
|
||||
return func(o *SubscriberOptions) {
|
||||
o.Internal = b
|
||||
}
|
||||
}
|
||||
|
||||
// DisableAutoAck will disable auto acking of messages
|
||||
// after they have been handled.
|
||||
func DisableAutoAck() SubscriberOption {
|
||||
|
@@ -72,7 +72,7 @@ func NewRegisterService(s Server) (*register.Service, error) {
|
||||
}
|
||||
|
||||
node := ®ister.Node{
|
||||
Id: opts.Name + "-" + opts.Id,
|
||||
ID: opts.Name + "-" + opts.ID,
|
||||
Address: net.JoinHostPort(addr, port),
|
||||
}
|
||||
node.Metadata = metadata.Copy(opts.Metadata)
|
||||
|
@@ -11,10 +11,8 @@ import (
|
||||
"github.com/unistack-org/micro/v3/register"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultServer default server
|
||||
DefaultServer Server = NewServer()
|
||||
)
|
||||
// DefaultServer default server
|
||||
var DefaultServer Server = NewServer()
|
||||
|
||||
var (
|
||||
// DefaultAddress will be used if no address passed
|
||||
@@ -23,8 +21,8 @@ var (
|
||||
DefaultName = "server"
|
||||
// DefaultVersion will be used if no version passed
|
||||
DefaultVersion = "latest"
|
||||
// DefaultId will be used if no id passed
|
||||
DefaultId = uuid.New().String()
|
||||
// DefaultID will be used if no id passed
|
||||
DefaultID = uuid.New().String()
|
||||
// DefaultRegisterCheck holds func that run before register server
|
||||
DefaultRegisterCheck = func(context.Context) error { return nil }
|
||||
// DefaultRegisterInterval holds interval for register
|
||||
|
@@ -21,11 +21,9 @@ const (
|
||||
subSig = "func(context.Context, interface{}) error"
|
||||
)
|
||||
|
||||
var (
|
||||
// Precompute the reflect type for error. Can't use error directly
|
||||
// because Typeof takes an empty interface value. This is annoying.
|
||||
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||
)
|
||||
// Precompute the reflect type for error. Can't use error directly
|
||||
// because Typeof takes an empty interface value. This is annoying.
|
||||
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||
|
||||
type handler struct {
|
||||
reqType reflect.Type
|
||||
@@ -64,7 +62,8 @@ func ValidateSubscriber(sub Subscriber) error {
|
||||
typ := reflect.TypeOf(sub.Subscriber())
|
||||
var argType reflect.Type
|
||||
|
||||
if typ.Kind() == reflect.Func {
|
||||
switch typ.Kind() {
|
||||
case reflect.Func:
|
||||
name := "Func"
|
||||
switch typ.NumIn() {
|
||||
case 2:
|
||||
@@ -82,7 +81,7 @@ func ValidateSubscriber(sub Subscriber) error {
|
||||
if returnType := typ.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
||||
}
|
||||
} else {
|
||||
default:
|
||||
hdlr := reflect.ValueOf(sub.Subscriber())
|
||||
name := reflect.Indirect(hdlr).Type().Name()
|
||||
|
||||
@@ -276,14 +275,14 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
|
||||
if n.wg != nil {
|
||||
defer n.wg.Done()
|
||||
}
|
||||
err := fn(ctx, &rpcMessage{
|
||||
cerr := fn(ctx, &rpcMessage{
|
||||
topic: sb.topic,
|
||||
contentType: ct,
|
||||
payload: req.Interface(),
|
||||
header: msg.Header,
|
||||
body: msg.Body,
|
||||
})
|
||||
results <- err
|
||||
results <- cerr
|
||||
}()
|
||||
}
|
||||
var errors []string
|
||||
|
17
service.go
17
service.go
@@ -103,48 +103,48 @@ func (s *service) Init(opts ...Option) error {
|
||||
// skip config as the struct not passed
|
||||
continue
|
||||
}
|
||||
if err = cfg.Init(config.Context(s.opts.Context)); err != nil {
|
||||
if err = cfg.Init(config.Context(cfg.Options().Context)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = cfg.Load(s.opts.Context); err != nil {
|
||||
if err = cfg.Load(cfg.Options().Context); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for _, log := range s.opts.Loggers {
|
||||
if err = log.Init(logger.WithContext(s.opts.Context)); err != nil {
|
||||
if err = log.Init(logger.WithContext(log.Options().Context)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, reg := range s.opts.Registers {
|
||||
if err = reg.Init(register.Context(s.opts.Context)); err != nil {
|
||||
if err = reg.Init(register.Context(reg.Options().Context)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, brk := range s.opts.Brokers {
|
||||
if err = brk.Init(broker.Context(s.opts.Context)); err != nil {
|
||||
if err = brk.Init(broker.Context(brk.Options().Context)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, str := range s.opts.Stores {
|
||||
if err = str.Init(store.Context(s.opts.Context)); err != nil {
|
||||
if err = str.Init(store.Context(str.Options().Context)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, srv := range s.opts.Servers {
|
||||
if err = srv.Init(server.Context(s.opts.Context)); err != nil {
|
||||
if err = srv.Init(server.Context(srv.Options().Context)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, cli := range s.opts.Clients {
|
||||
if err = cli.Init(client.Context(s.opts.Context)); err != nil {
|
||||
if err = cli.Init(client.Context(cli.Options().Context)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -162,7 +162,6 @@ func (s *service) Broker(names ...string) broker.Broker {
|
||||
idx = getNameIndex(names[0], s.opts.Brokers)
|
||||
}
|
||||
return s.opts.Brokers[idx]
|
||||
|
||||
}
|
||||
|
||||
func (s *service) Tracer(names ...string) tracer.Tracer {
|
||||
|
@@ -28,8 +28,8 @@ func (m *memoryStore) Disconnect(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type memoryStore struct {
|
||||
opts Options
|
||||
store *cache.Cache
|
||||
opts Options
|
||||
}
|
||||
|
||||
func (m *memoryStore) key(prefix, key string) string {
|
||||
|
@@ -5,8 +5,6 @@ package store
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -42,12 +40,3 @@ type Store interface {
|
||||
// String returns the name of the implementation.
|
||||
String() string
|
||||
}
|
||||
|
||||
// Value is an item stored or retrieved from a Store
|
||||
// may be used in store implementations to provide metadata
|
||||
type Value struct {
|
||||
// Data holds underline struct
|
||||
Data interface{} `json:"data"`
|
||||
// Metadata associated with data for indexing
|
||||
Metadata metadata.Metadata `json:"metadata"`
|
||||
}
|
||||
|
@@ -6,8 +6,8 @@ import (
|
||||
)
|
||||
|
||||
type memorySync struct {
|
||||
options Options
|
||||
locks map[string]*memoryLock
|
||||
options Options
|
||||
mtx gosync.RWMutex
|
||||
}
|
||||
|
||||
|
@@ -5,10 +5,8 @@ import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrLockTimeout error
|
||||
ErrLockTimeout = errors.New("lock timeout")
|
||||
)
|
||||
// ErrLockTimeout error
|
||||
var ErrLockTimeout = errors.New("lock timeout")
|
||||
|
||||
// Sync is an interface for distributed synchronization
|
||||
type Sync interface {
|
||||
|
@@ -38,7 +38,6 @@ type noopSpan struct {
|
||||
}
|
||||
|
||||
func (s *noopSpan) Finish(opts ...SpanOption) {
|
||||
|
||||
}
|
||||
|
||||
func (s *noopSpan) Context() context.Context {
|
||||
@@ -50,7 +49,6 @@ func (s *noopSpan) Tracer() Tracer {
|
||||
}
|
||||
|
||||
func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
|
||||
|
||||
}
|
||||
|
||||
func (s *noopSpan) SetName(name string) {
|
||||
@@ -58,7 +56,6 @@ func (s *noopSpan) SetName(name string) {
|
||||
}
|
||||
|
||||
func (s *noopSpan) SetLabels(labels ...Label) {
|
||||
|
||||
}
|
||||
|
||||
// NewTracer returns new memory tracer
|
||||
|
@@ -2,13 +2,11 @@ package tracer
|
||||
|
||||
import "github.com/unistack-org/micro/v3/logger"
|
||||
|
||||
type SpanOptions struct {
|
||||
}
|
||||
type SpanOptions struct{}
|
||||
|
||||
type SpanOption func(o *SpanOptions)
|
||||
|
||||
type EventOptions struct {
|
||||
}
|
||||
type EventOptions struct{}
|
||||
|
||||
type EventOption func(o *EventOptions)
|
||||
|
||||
|
@@ -5,10 +5,8 @@ import (
|
||||
"context"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultTracer is the global default tracer
|
||||
DefaultTracer Tracer = NewTracer()
|
||||
)
|
||||
// DefaultTracer is the global default tracer
|
||||
var DefaultTracer Tracer = NewTracer()
|
||||
|
||||
// Tracer is an interface for distributed tracing
|
||||
type Tracer interface {
|
||||
|
@@ -113,12 +113,14 @@ type tWrapper struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
|
||||
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
|
||||
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
|
||||
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
|
||||
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
|
||||
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
|
||||
type (
|
||||
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
|
||||
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
|
||||
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
|
||||
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
|
||||
ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
|
||||
ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
|
||||
)
|
||||
|
||||
// Options struct
|
||||
type Options struct {
|
||||
|
@@ -5,9 +5,7 @@ import (
|
||||
"net"
|
||||
)
|
||||
|
||||
var (
|
||||
privateBlocks []*net.IPNet
|
||||
)
|
||||
var privateBlocks []*net.IPNet
|
||||
|
||||
func init() {
|
||||
for _, b := range []string{"10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "fd00::/8"} {
|
||||
|
@@ -54,7 +54,6 @@ func TestExtractor(t *testing.T) {
|
||||
t.Errorf("Expected %s got %s", d.expect, addr)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAppendPrivateBlocks(t *testing.T) {
|
||||
|
@@ -4,20 +4,20 @@ import (
|
||||
"bytes"
|
||||
)
|
||||
|
||||
type buffer struct {
|
||||
type Buffer struct {
|
||||
*bytes.Buffer
|
||||
}
|
||||
|
||||
// Close reset buffer contents
|
||||
func (b *buffer) Close() error {
|
||||
func (b *Buffer) Close() error {
|
||||
b.Buffer.Reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
// New creates new buffer that satisfies Closer interface
|
||||
func New(b *bytes.Buffer) *buffer {
|
||||
func New(b *bytes.Buffer) *Buffer {
|
||||
if b == nil {
|
||||
b = bytes.NewBuffer(nil)
|
||||
}
|
||||
return &buffer{b}
|
||||
return &Buffer{b}
|
||||
}
|
||||
|
@@ -27,7 +27,6 @@ func HostPort(addr string, port interface{}) string {
|
||||
// Listen takes addr:portmin-portmax and binds to the first available port
|
||||
// Example: Listen("localhost:5000-6000", fn)
|
||||
func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, error) {
|
||||
|
||||
if strings.Count(addr, ":") == 1 && strings.Count(addr, "-") == 0 {
|
||||
return fn(addr)
|
||||
}
|
||||
|
@@ -22,5 +22,4 @@ func TestListen(t *testing.T) {
|
||||
// TODO nats case test
|
||||
// natsAddr := "_INBOX.bID2CMRvlNp0vt4tgNBHWf"
|
||||
// Expect addr DO NOT has extra ":" at the end!
|
||||
|
||||
}
|
||||
|
@@ -8,9 +8,8 @@ import (
|
||||
"crypto/rand"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// GenerateKey returns an ed25519 key
|
||||
@@ -46,14 +45,14 @@ func CA(opts ...CertOption) ([]byte, []byte, error) {
|
||||
return nil, nil, err
|
||||
}
|
||||
cert, key := &bytes.Buffer{}, &bytes.Buffer{}
|
||||
if err := pem.Encode(cert, &pem.Block{Type: "CERTIFICATE", Bytes: x509Cert}); err != nil {
|
||||
if err = pem.Encode(cert, &pem.Block{Type: "CERTIFICATE", Bytes: x509Cert}); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
x509Key, err := x509.MarshalPKCS8PrivateKey(options.Priv)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := pem.Encode(key, &pem.Block{Type: "PRIVATE KEY", Bytes: x509Key}); err != nil {
|
||||
if err = pem.Encode(key, &pem.Block{Type: "PRIVATE KEY", Bytes: x509Key}); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
|
@@ -63,9 +63,7 @@ func Unmarshal(dst interface{}, query string) error {
|
||||
// possible. Eg the example above would output:
|
||||
// {"bar":{"one":{"two":2,"red":112}}}
|
||||
func ToJSON(query string) ([]byte, error) {
|
||||
var (
|
||||
builder interface{} = make(map[string]interface{})
|
||||
)
|
||||
var builder interface{} = make(map[string]interface{})
|
||||
params := strings.Split(query, "&")
|
||||
for _, part := range params {
|
||||
tempMap, err := queryToMap(part)
|
||||
|
@@ -32,9 +32,10 @@ type unmarshalT struct {
|
||||
A string `json:"a"`
|
||||
B unmarshalB `json:"b"`
|
||||
}
|
||||
|
||||
type unmarshalB struct {
|
||||
C int `json:"c"`
|
||||
D string `json:"D"`
|
||||
C int `json:"c"`
|
||||
}
|
||||
|
||||
func TestUnmarshal(t *testing.T) {
|
||||
|
@@ -11,7 +11,7 @@ type Rand struct {
|
||||
}
|
||||
|
||||
func (r *Rand) Int31() int32 {
|
||||
rand.Read(r.buf[:4])
|
||||
_, _ = rand.Read(r.buf[:4])
|
||||
return int32(binary.BigEndian.Uint32(r.buf[:4]) & ^uint32(1<<31))
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ func (r *Rand) Intn(n int) int {
|
||||
}
|
||||
|
||||
func (r *Rand) Int63() int64 {
|
||||
rand.Read(r.buf[:])
|
||||
_, _ = rand.Read(r.buf[:])
|
||||
return int64(binary.BigEndian.Uint64(r.buf[:]) & ^uint64(1<<63))
|
||||
}
|
||||
|
||||
|
@@ -5,7 +5,6 @@ import (
|
||||
)
|
||||
|
||||
func TestPath(t *testing.T) {
|
||||
|
||||
type Nested2 struct {
|
||||
Name string
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
475
util/reflect/struct.go
Normal file
475
util/reflect/struct.go
Normal file
@@ -0,0 +1,475 @@
|
||||
package reflect
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// ErrInvalidParam specifies invalid url query params
|
||||
var ErrInvalidParam = errors.New("invalid url query param provided")
|
||||
|
||||
var bracketSplitter = regexp.MustCompile(`\[|\]`)
|
||||
|
||||
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
|
||||
sv := reflect.ValueOf(src)
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
sv = sv.Elem()
|
||||
}
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return nil, ErrInvalidStruct
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if ts, ok := fld.Tag.Lookup(tkey); ok {
|
||||
for _, p := range strings.Split(ts, ",") {
|
||||
if p == tval {
|
||||
if val.Kind() != reflect.Ptr && val.CanAddr() {
|
||||
val = val.Addr()
|
||||
}
|
||||
return val.Interface(), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch val.Kind() {
|
||||
case reflect.Ptr:
|
||||
if val = val.Elem(); val.Kind() == reflect.Struct {
|
||||
if iface, err := StructFieldByTag(val.Interface(), tkey, tval); err == nil {
|
||||
return iface, nil
|
||||
}
|
||||
}
|
||||
case reflect.Struct:
|
||||
if iface, err := StructFieldByTag(val.Interface(), tkey, tval); err == nil {
|
||||
return iface, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
|
||||
sv := reflect.ValueOf(src)
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
sv = sv.Elem()
|
||||
}
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return nil, ErrInvalidStruct
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
if fld.Name == tkey {
|
||||
if val.Kind() != reflect.Ptr && val.CanAddr() {
|
||||
val = val.Addr()
|
||||
}
|
||||
return val.Interface(), nil
|
||||
}
|
||||
|
||||
switch val.Kind() {
|
||||
case reflect.Ptr:
|
||||
if val = val.Elem(); val.Kind() == reflect.Struct {
|
||||
if iface, err := StructFieldByName(val.Interface(), tkey); err == nil {
|
||||
return iface, nil
|
||||
}
|
||||
}
|
||||
case reflect.Struct:
|
||||
if iface, err := StructFieldByName(val.Interface(), tkey); err == nil {
|
||||
return iface, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
// StructFields returns slice of struct fields
|
||||
func StructFields(src interface{}) ([]reflect.StructField, error) {
|
||||
var fields []reflect.StructField
|
||||
|
||||
sv := reflect.ValueOf(src)
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
sv = sv.Elem()
|
||||
}
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return nil, ErrInvalidStruct
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
if val.Kind() == reflect.Struct {
|
||||
infields, err := StructFields(val.Interface())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fields = append(fields, infields...)
|
||||
} else {
|
||||
fields = append(fields, fld)
|
||||
}
|
||||
}
|
||||
|
||||
return fields, nil
|
||||
}
|
||||
|
||||
// CopyDefaults for a from b
|
||||
// a and b should be pointers to the same kind of struct
|
||||
func CopyDefaults(a, b interface{}) {
|
||||
pt := reflect.TypeOf(a)
|
||||
t := pt.Elem()
|
||||
va := reflect.ValueOf(a).Elem()
|
||||
vb := reflect.ValueOf(b).Elem()
|
||||
for i := 0; i < t.NumField(); i++ {
|
||||
aField := va.Field(i)
|
||||
if aField.CanSet() {
|
||||
bField := vb.Field(i)
|
||||
aField.Set(bField)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CopyFrom sets the public members of a from b
|
||||
// a and b should be pointers to structs
|
||||
// a can be a different type from b
|
||||
// Only the Fields which have the same name and assignable type on a
|
||||
// and b will be set.
|
||||
func CopyFrom(a, b interface{}) {
|
||||
ta := reflect.TypeOf(a).Elem()
|
||||
tb := reflect.TypeOf(b).Elem()
|
||||
va := reflect.ValueOf(a).Elem()
|
||||
vb := reflect.ValueOf(b).Elem()
|
||||
for i := 0; i < tb.NumField(); i++ {
|
||||
bField := vb.Field(i)
|
||||
tbField := tb.Field(i)
|
||||
name := tbField.Name
|
||||
aField := va.FieldByName(name)
|
||||
taField, found := ta.FieldByName(name)
|
||||
if found && aField.IsValid() && bField.IsValid() && aField.CanSet() && tbField.Type.AssignableTo(taField.Type) {
|
||||
aField.Set(bField)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func StructURLValues(src interface{}, pref string, tags []string) (url.Values, error) {
|
||||
data := url.Values{}
|
||||
|
||||
sv := reflect.ValueOf(src)
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
sv = sv.Elem()
|
||||
}
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return nil, ErrInvalidStruct
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 || !val.IsValid() {
|
||||
continue
|
||||
}
|
||||
|
||||
var t *tag
|
||||
for _, tn := range tags {
|
||||
ts, ok := fld.Tag.Lookup(tn)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
tp := strings.Split(ts, ",")
|
||||
// special
|
||||
switch tn {
|
||||
case "protobuf": // special
|
||||
t = &tag{key: tn, name: tp[3][5:], opts: append(tp[:3], tp[4:]...)}
|
||||
default:
|
||||
t = &tag{key: tn, name: tp[0], opts: tp[1:]}
|
||||
}
|
||||
if t.name != "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if t.name == "" {
|
||||
// fallback to lowercase
|
||||
t.name = strings.ToLower(fld.Name)
|
||||
}
|
||||
if pref != "" {
|
||||
t.name = pref + "." + t.name
|
||||
}
|
||||
|
||||
if !val.IsValid() || val.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
switch val.Kind() {
|
||||
case reflect.Struct, reflect.Ptr:
|
||||
if val.IsNil() {
|
||||
continue
|
||||
}
|
||||
ndata, err := StructURLValues(val.Interface(), t.name, tags)
|
||||
if err != nil {
|
||||
return ndata, err
|
||||
}
|
||||
for k, v := range ndata {
|
||||
data[k] = v
|
||||
}
|
||||
default:
|
||||
switch val.Kind() {
|
||||
case reflect.Slice:
|
||||
for i := 0; i < val.Len(); i++ {
|
||||
va := val.Index(i)
|
||||
// if va.Type().Elem().Kind() != reflect.Ptr {
|
||||
if va.Kind() != reflect.Ptr {
|
||||
data.Set(t.name, fmt.Sprintf("%v", va.Interface()))
|
||||
continue
|
||||
}
|
||||
switch va.Type().Elem().String() {
|
||||
case "wrapperspb.BoolValue", "wrapperspb.BytesValue", "wrapperspb.StringValue":
|
||||
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
|
||||
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
|
||||
}
|
||||
case "wrapperspb.DoubleValue", "wrapperspb.FloatValue":
|
||||
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
|
||||
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
|
||||
}
|
||||
case "wrapperspb.Int32Value", "wrapperspb.Int64Value":
|
||||
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
|
||||
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
|
||||
}
|
||||
case "wrapperspb.UInt32Value", "wrapperspb.UInt64Value":
|
||||
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
|
||||
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
|
||||
}
|
||||
default:
|
||||
data.Add(t.name, fmt.Sprintf("%v", val.Index(i).Interface()))
|
||||
}
|
||||
}
|
||||
default:
|
||||
data.Set(t.name, fmt.Sprintf("%v", val.Interface()))
|
||||
}
|
||||
}
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// URLMap returns map of url query params
|
||||
func URLMap(query string) (map[string]interface{}, error) {
|
||||
var mp interface{} = make(map[string]interface{})
|
||||
|
||||
params := strings.Split(query, "&")
|
||||
|
||||
for _, part := range params {
|
||||
tm, err := queryToMap(part)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mp = merge(mp, tm)
|
||||
}
|
||||
|
||||
return mp.(map[string]interface{}), nil
|
||||
}
|
||||
|
||||
// FlattenMap expand key.subkey to nested map
|
||||
func FlattenMap(a map[string]interface{}) map[string]interface{} {
|
||||
// preprocess map
|
||||
nb := make(map[string]interface{}, len(a))
|
||||
for k, v := range a {
|
||||
ps := strings.Split(k, ".")
|
||||
if len(ps) == 1 {
|
||||
nb[k] = v
|
||||
continue
|
||||
}
|
||||
em := make(map[string]interface{})
|
||||
em[ps[len(ps)-1]] = v
|
||||
for i := len(ps) - 2; i > 0; i-- {
|
||||
nm := make(map[string]interface{})
|
||||
nm[ps[i]] = em
|
||||
em = nm
|
||||
}
|
||||
if vm, ok := nb[ps[0]]; ok {
|
||||
// nested map
|
||||
nm := vm.(map[string]interface{})
|
||||
for vk, vv := range em {
|
||||
nm[vk] = vv
|
||||
}
|
||||
nb[ps[0]] = nm
|
||||
} else {
|
||||
nb[ps[0]] = em
|
||||
}
|
||||
}
|
||||
return nb
|
||||
}
|
||||
|
||||
/*
|
||||
case reflect.String:
|
||||
fn := func(c rune) bool { return c == ',' || c == ';' || c == ' ' }
|
||||
slice := strings.FieldsFunc(vb.String(), fn)
|
||||
if va.IsNil() {
|
||||
va.Set(reflect.MakeSlice(va.Type(), len(slice), len(slice)))
|
||||
}
|
||||
*/
|
||||
|
||||
func btSplitter(str string) []string {
|
||||
r := bracketSplitter.Split(str, -1)
|
||||
for idx, s := range r {
|
||||
if len(s) == 0 {
|
||||
if len(r) > idx+1 {
|
||||
copy(r[idx:], r[idx+1:])
|
||||
r = r[:len(r)-1]
|
||||
}
|
||||
}
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// queryToMap turns something like a[b][c]=4 into
|
||||
// map[string]interface{}{
|
||||
// "a": map[string]interface{}{
|
||||
// "b": map[string]interface{}{
|
||||
// "c": 4,
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
func queryToMap(param string) (map[string]interface{}, error) {
|
||||
rawKey, rawValue, err := splitKeyAndValue(param)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rawValue, err = url.QueryUnescape(rawValue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rawKey, err = url.QueryUnescape(rawKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pieces := btSplitter(rawKey)
|
||||
key := pieces[0]
|
||||
|
||||
// If len==1 then rawKey has no [] chars and we can just
|
||||
// decode this as key=value into {key: value}
|
||||
if len(pieces) == 1 {
|
||||
return map[string]interface{}{
|
||||
key: rawValue,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// If len > 1 then we have something like a[b][c]=2
|
||||
// so we need to turn this into {"a": {"b": {"c": 2}}}
|
||||
// To do this we break our key into two pieces:
|
||||
// a and b[c]
|
||||
// and then we set {"a": queryToMap("b[c]", value)}
|
||||
ret := make(map[string]interface{})
|
||||
ret[key], err = queryToMap(buildNewKey(rawKey) + "=" + rawValue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// When URL params have a set of empty brackets (eg a[]=1)
|
||||
// it is assumed to be an array. This will get us the
|
||||
// correct value for the array item and return it as an
|
||||
// []interface{} so that it can be merged properly.
|
||||
if pieces[1] == "" {
|
||||
temp := ret[key].(map[string]interface{})
|
||||
ret[key] = []interface{}{temp[""]}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// buildNewKey will take something like:
|
||||
// origKey = "bar[one][two]"
|
||||
// pieces = [bar one two ]
|
||||
// and return "one[two]"
|
||||
func buildNewKey(origKey string) string {
|
||||
pieces := btSplitter(origKey)
|
||||
|
||||
ret := origKey[len(pieces[0])+1:]
|
||||
ret = ret[:len(pieces[1])] + ret[len(pieces[1])+1:]
|
||||
return ret
|
||||
}
|
||||
|
||||
// splitKeyAndValue splits a URL param at the last equal
|
||||
// sign and returns the two strings. If no equal sign is
|
||||
// found, the ErrInvalidParam error is returned.
|
||||
func splitKeyAndValue(param string) (string, string, error) {
|
||||
li := strings.LastIndex(param, "=")
|
||||
if li == -1 {
|
||||
return "", "", ErrInvalidParam
|
||||
}
|
||||
return param[:li], param[li+1:], nil
|
||||
}
|
||||
|
||||
// merge merges a with b if they are either both slices
|
||||
// or map[string]interface{} types. Otherwise it returns b.
|
||||
func merge(a interface{}, b interface{}) interface{} {
|
||||
if av, aok := a.(map[string]interface{}); aok {
|
||||
if bv, bok := b.(map[string]interface{}); bok {
|
||||
return mergeMapIface(av, bv)
|
||||
}
|
||||
}
|
||||
if av, aok := a.([]interface{}); aok {
|
||||
if bv, bok := b.([]interface{}); bok {
|
||||
return mergeSliceIface(av, bv)
|
||||
}
|
||||
}
|
||||
|
||||
va := reflect.ValueOf(a)
|
||||
vb := reflect.ValueOf(b)
|
||||
if (va.Type().Kind() == reflect.Slice) && (va.Type().Elem().Kind() == vb.Type().Kind() || vb.Type().ConvertibleTo(va.Type().Elem())) {
|
||||
va = reflect.Append(va, vb.Convert(va.Type().Elem()))
|
||||
return va.Interface()
|
||||
}
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
// mergeMap merges a with b, attempting to merge any nested
|
||||
// values in nested maps but eventually overwriting anything
|
||||
// in a that can't be merged with whatever is in b.
|
||||
func mergeMapIface(a map[string]interface{}, b map[string]interface{}) map[string]interface{} {
|
||||
for bK, bV := range b {
|
||||
if aV, ok := a[bK]; ok {
|
||||
if (reflect.ValueOf(aV).Type().Kind() == reflect.ValueOf(bV).Type().Kind()) ||
|
||||
((reflect.ValueOf(aV).Type().Kind() == reflect.Slice) && reflect.ValueOf(aV).Type().Elem().Kind() == reflect.ValueOf(bV).Type().Kind()) {
|
||||
nV := []interface{}{aV, bV}
|
||||
a[bK] = nV
|
||||
} else {
|
||||
a[bK] = merge(a[bK], bV)
|
||||
}
|
||||
} else {
|
||||
a[bK] = bV
|
||||
}
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// mergeSlice merges a with b and returns the result.
|
||||
func mergeSliceIface(a []interface{}, b []interface{}) []interface{} {
|
||||
a = append(a, b...)
|
||||
return a
|
||||
}
|
||||
|
||||
type tag struct {
|
||||
key string
|
||||
name string
|
||||
opts []string
|
||||
}
|
@@ -5,6 +5,62 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStructByTag(t *testing.T) {
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
}
|
||||
|
||||
val := &Str{Name: []string{"first", "second"}}
|
||||
|
||||
iface, err := StructFieldByTag(val, "codec", "flatten")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if v, ok := iface.(*[]string); !ok {
|
||||
t.Fatalf("not *[]string %v", iface)
|
||||
} else if len(*v) != 2 {
|
||||
t.Fatalf("invalid number %v", iface)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructByName(t *testing.T) {
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
}
|
||||
|
||||
val := &Str{Name: []string{"first", "second"}}
|
||||
|
||||
iface, err := StructFieldByName(val, "Name")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if v, ok := iface.(*[]string); !ok {
|
||||
t.Fatalf("not *[]string %v", iface)
|
||||
} else if len(*v) != 2 {
|
||||
t.Fatalf("invalid number %v", iface)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructURLValues(t *testing.T) {
|
||||
type Str struct {
|
||||
Str *Str `json:"str"`
|
||||
Name string `json:"name"`
|
||||
Args []int `json:"args"`
|
||||
}
|
||||
|
||||
val := &Str{Name: "test_name", Args: []int{1, 2, 3}, Str: &Str{Name: "nested_name"}}
|
||||
data, err := StructURLValues(val, "", []string{"json"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if data.Get("name") != "test_name" {
|
||||
t.Fatalf("invalid data: %v", data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestURLSliceVars(t *testing.T) {
|
||||
u, err := url.Parse("http://localhost/v1/test/call/my_name?key=arg1&key=arg2&key=arg3")
|
||||
if err != nil {
|
||||
@@ -72,8 +128,8 @@ func TestIsZero(t *testing.T) {
|
||||
Nested string
|
||||
}
|
||||
type testStr2 struct {
|
||||
Name string
|
||||
Nested *testStr3
|
||||
Name string
|
||||
}
|
||||
vtest := &testStr2{
|
||||
Name: "test_name",
|
||||
@@ -88,5 +144,5 @@ func TestIsZero(t *testing.T) {
|
||||
t.Fatalf("non zero ret on zero struct: %#+v", vtest)
|
||||
}
|
||||
|
||||
//t.Logf("XX %#+v\n", ok)
|
||||
// t.Logf("XX %#+v\n", ok)
|
||||
}
|
@@ -5,11 +5,11 @@ import (
|
||||
)
|
||||
|
||||
func addNodes(old, neu []*register.Node) []*register.Node {
|
||||
nodes := make([]*register.Node, len(neu))
|
||||
nodes := make([]*register.Node, 0, len(neu))
|
||||
// add all new nodes
|
||||
for i, n := range neu {
|
||||
for _, n := range neu {
|
||||
node := *n
|
||||
nodes[i] = &node
|
||||
nodes = append(nodes, &node)
|
||||
}
|
||||
|
||||
// look at old nodes
|
||||
@@ -19,7 +19,7 @@ func addNodes(old, neu []*register.Node) []*register.Node {
|
||||
// check against new nodes
|
||||
for _, n := range nodes {
|
||||
// ids match then skip
|
||||
if o.Id == n.Id {
|
||||
if o.ID == n.ID {
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
@@ -40,7 +40,7 @@ func delNodes(old, del []*register.Node) []*register.Node {
|
||||
for _, o := range old {
|
||||
var rem bool
|
||||
for _, n := range del {
|
||||
if o.Id == n.Id {
|
||||
if o.ID == n.ID {
|
||||
rem = true
|
||||
break
|
||||
}
|
||||
|
@@ -14,7 +14,7 @@ func TestRemove(t *testing.T) {
|
||||
Version: "1.0.0",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
ID: "foo-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
},
|
||||
@@ -24,7 +24,7 @@ func TestRemove(t *testing.T) {
|
||||
Version: "1.0.0",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
ID: "foo-123",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
@@ -47,11 +47,11 @@ func TestRemoveNodes(t *testing.T) {
|
||||
Version: "1.0.0",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
ID: "foo-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
{
|
||||
Id: "foo-321",
|
||||
ID: "foo-321",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
@@ -61,7 +61,7 @@ func TestRemoveNodes(t *testing.T) {
|
||||
Version: "1.0.0",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
ID: "foo-123",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
|
@@ -77,7 +77,7 @@ func (t template) Compile() Template {
|
||||
rawOps = append(rawOps, s.compile()...)
|
||||
}
|
||||
|
||||
//ops := make([]int, 0, len(rawOps))
|
||||
// ops := make([]int, 0, len(rawOps))
|
||||
var (
|
||||
ops []int
|
||||
pool []string
|
||||
|
@@ -50,9 +50,7 @@ func tokenize(path string) (tokens []string, verb string) {
|
||||
field
|
||||
nested
|
||||
)
|
||||
var (
|
||||
st = init
|
||||
)
|
||||
st := init
|
||||
for path != "" {
|
||||
var idx int
|
||||
switch st {
|
||||
|
@@ -209,7 +209,8 @@ func TestParseSegments(t *testing.T) {
|
||||
"a", "/", "b", "/", "*", "/", "c",
|
||||
"}", "/",
|
||||
"**",
|
||||
eof},
|
||||
eof,
|
||||
},
|
||||
want: []segment{
|
||||
literal("v1"),
|
||||
variable{
|
||||
|
@@ -40,7 +40,7 @@ func (c *syncStore) processQueue(index int) {
|
||||
}
|
||||
var opts []store.WriteOption
|
||||
if !ir.expiresAt.IsZero() {
|
||||
opts = append(opts, store.WriteTTL(ir.expiresAt.Sub(time.Now())))
|
||||
opts = append(opts, store.WriteTTL(time.Until(ir.expiresAt)))
|
||||
}
|
||||
// Todo = internal queue also has to hold the corresponding store.WriteOptions
|
||||
if err := c.syncOpts.Stores[index+1].Write(c.storeOpts.Context, ir.key, ir.value, opts...); err != nil {
|
||||
|
@@ -17,10 +17,8 @@ type Basic struct {
|
||||
store store.Store
|
||||
}
|
||||
|
||||
var (
|
||||
// StorePrefix to isolate tokens
|
||||
StorePrefix = "tokens/"
|
||||
)
|
||||
// StorePrefix to isolate tokens
|
||||
var StorePrefix = "tokens/"
|
||||
|
||||
// NewTokenProvider returns an initialized basic provider
|
||||
func NewTokenProvider(opts ...token.Option) token.Provider {
|
||||
|
@@ -83,5 +83,4 @@ func TestInspect(t *testing.T) {
|
||||
t.Fatalf("Inspect returned %v error, expected %v", err, token.ErrInvalidToken)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
@@ -46,7 +46,7 @@ func NewOptions(opts ...Option) Options {
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
//set default store
|
||||
// set default store
|
||||
if options.Store == nil {
|
||||
options.Store = store.DefaultStore
|
||||
}
|
||||
@@ -75,7 +75,7 @@ func NewGenerateOptions(opts ...GenerateOption) GenerateOptions {
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
//set default Expiry of token
|
||||
// set default Expiry of token
|
||||
if options.Expiry == 0 {
|
||||
options.Expiry = time.Minute * 15
|
||||
}
|
||||
|
Reference in New Issue
Block a user