Compare commits

...

16 Commits

Author SHA1 Message Date
39f66cc86c add logger wrapper, fix default logger fields method
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 23:26:47 +03:00
bbbcb22565 fieldalignment of all structs to save memory
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 19:45:13 +03:00
cb70dfa664 meter/wrapper: use meter.DefaultMeter in NewOptions
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-05 17:40:03 +03:00
1f0482fbd5 tracer: finalize tracer implementation
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-04 01:12:16 +03:00
a862562284 fixup domain in ListServices
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-03 18:16:54 +03:00
c320c23913 metadata: minor fixup for NewXXXContext functions
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-01 13:00:53 +03:00
Renovate Bot
ae848ba8bb fix(deps): update golang.org/x/net commit hash to e18ecbb 2021-02-26 19:46:14 +00:00
Renovate Bot
8e264cbb3e fix(deps): update golang.org/x/net commit hash to 39120d0 2021-02-26 12:05:10 +00:00
Renovate Bot
54e523ab3f fix(deps): update golang.org/x/net commit hash to 3d97a24 2021-02-26 08:40:37 +00:00
09973af099 server: add error helper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-22 00:52:18 +03:00
3247da3dd0 metadata: add Pairs helper func
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-22 00:08:05 +03:00
b505455f7c run go mod tidy in renovate update
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-21 23:55:19 +03:00
293949f081 metadata: add Append func to Incoming/Outgoing context
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-21 23:54:59 +03:00
8d7e442b3a server: add SubscriberBodyOnly option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-20 18:12:13 +03:00
renovate[bot]
f7b5211af3 fix(deps): update golang.org/x/net commit hash to 5f55cee (#20)
Co-authored-by: Renovate Bot <bot@renovateapp.com>
2021-02-20 11:40:35 +03:00
7eb6d030dc meter: fix internal labels sorting
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-18 15:57:42 +03:00
77 changed files with 1707 additions and 926 deletions

View File

@@ -2,6 +2,7 @@
"extends": [
"config:base"
],
"postUpdateOptions": ["gomodTidy"],
"packageRules": [
{
"matchUpdateTypes": ["minor", "patch", "pin", "digest"],

View File

@@ -34,20 +34,20 @@ type Option func(*Options) error
type Endpoint struct {
// Name Greeter.Hello
Name string
// Description e.g what's this endpoint for
// Desciption for endpoint
Description string
// Handler e.g rpc, proxy
Handler string
// Body destination
// "*" or "" - top level message value
// "string" - inner message value
Body string
// Host e.g example.com
Host []string
// Method e.g GET, POST
Method []string
// Path e.g /greeter. Expect POSIX regex
Path []string
// Body destination
// "*" or "" - top level message value
// "string" - inner message value
Body string
// Stream flag
Stream bool
}

View File

@@ -13,11 +13,11 @@ var (
// Options struct holds handler options
type Options struct {
MaxRecvSize int64
Namespace string
Router router.Router
Client client.Client
Logger logger.Logger
Namespace string
MaxRecvSize int64
}
// Option func signature

View File

@@ -8,9 +8,12 @@ import (
// Options struct
type Options struct {
Handler string
// Context is for external defined options
Context context.Context
// Handler name
Handler string
// ServicePrefix is the prefix
ServicePrefix string
Context context.Context
}
// Option func

View File

@@ -11,11 +11,16 @@ import (
// Options holds the options for api router
type Options struct {
Handler string
// Register for service lookup
Register register.Register
// Resolver to use
Resolver resolver.Resolver
Logger logger.Logger
Context context.Context
// Logger micro logger
Logger logger.Logger
// Context is for external options
Context context.Context
// Handler name
Handler string
}
// Option func signature

View File

@@ -53,30 +53,30 @@ type Auth interface {
// Account provided by an auth provider
type Account struct {
// ID of the account e.g. email
// Metadata any other associated metadata
Metadata metadata.Metadata `json:"metadata"`
// ID of the account e.g. email or uuid
ID string `json:"id"`
// Type of the account, e.g. service
Type string `json:"type"`
// Issuer of the account
Issuer string `json:"issuer"`
// Any other associated metadata
Metadata metadata.Metadata `json:"metadata"`
// Scopes the account has access to
Scopes []string `json:"scopes"`
// Secret for the account, e.g. the password
Secret string `json:"secret"`
// Scopes the account has access to
Scopes []string `json:"scopes"`
}
// Token can be short or long lived
type Token struct {
// The token to be used for accessing resources
AccessToken string `json:"access_token"`
// RefreshToken to be used to generate a new token
RefreshToken string `json:"refresh_token"`
// Time of token creation
Created time.Time `json:"created"`
// Time of token expiry
Expiry time.Time `json:"expiry"`
// The token to be used for accessing resources
AccessToken string `json:"access_token"`
// RefreshToken to be used to generate a new token
RefreshToken string `json:"refresh_token"`
}
// Expired returns a boolean indicating if the token needs to be refreshed
@@ -106,17 +106,15 @@ const (
// Rule is used to verify access to a resource
type Rule struct {
// ID of the rule, e.g. "public"
ID string
// Scope the rule requires, a blank scope indicates open to the public and * indicates the rule
// applies to any valid account
Scope string
// Resource the rule applies to
// Resource that rule belongs to
Resource *Resource
// Access determines if the rule grants or denies access to the resource
// ID of the rule
ID string
// Scope of the rule
Scope string
// Access flag allow/deny
Access Access
// Priority the rule should take when verifying a request, the higher the value the sooner the
// rule will be applied
// Priority holds the rule priority
Priority int32
}

View File

@@ -26,33 +26,34 @@ func NewOptions(opts ...Option) Options {
// Options struct holds auth options
type Options struct {
Name string
// Issuer of the service's account
Issuer string
// ID is the services auth ID
ID string
// Secret is used to authenticate the service
Secret string
// Context holds the external options
Context context.Context
// Meter used for metrics
Meter meter.Meter
// Logger used for logging
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
// Store used for stre data
Store store.Store
// Token is the services token used to authenticate itself
Token *Token
// PublicKey for decoding JWTs
PublicKey string
// PrivateKey for encoding JWTs
PrivateKey string
// LoginURL is the relative url path where a user can login
LoginURL string
// Store to back auth
Store store.Store
// PrivateKey for encoding JWTs
PrivateKey string
// PublicKey for decoding JWTs
PublicKey string
// Secret is used to authenticate the service
Secret string
// ID is the services auth ID
ID string
// Issuer of the service's account
Issuer string
// Name holds the auth name
Name string
// Addrs sets the addresses of auth
Addrs []string
// Logger sets the logger
Logger logger.Logger
// Meter sets tht meter
Meter meter.Meter
// Tracer
Tracer tracer.Tracer
// Context to store other options
Context context.Context
}
// Option func
@@ -124,18 +125,12 @@ func LoginURL(url string) Option {
// GenerateOptions struct
type GenerateOptions struct {
// Metadata associated with the account
Metadata metadata.Metadata
// Scopes the account has access too
Scopes []string
// Provider of the account, e.g. oauth
Provider string
// Type of the account, e.g. user
Type string
// Secret used to authenticate the account
Secret string
// Issuer of the account, e.g. micro
Issuer string
Type string
Secret string
Issuer string
Scopes []string
}
// GenerateOption func
@@ -194,16 +189,11 @@ func NewGenerateOptions(opts ...GenerateOption) GenerateOptions {
// TokenOptions struct
type TokenOptions struct {
// ID for the account
ID string
// Secret for the account
Secret string
// RefreshToken is used to refesh a token
ID string
Secret string
RefreshToken string
// Expiry is the time the token should live for
Expiry time.Duration
// Issuer of the account
Issuer string
Issuer string
Expiry time.Duration
}
// TokenOption func

View File

@@ -13,28 +13,27 @@ import (
)
type memoryBroker struct {
opts Options
addr string
sync.RWMutex
connected bool
opts Options
Subscribers map[string][]*memorySubscriber
addr string
sync.RWMutex
connected bool
}
type memoryEvent struct {
opts Options
topic string
err error
message interface{}
topic string
}
type memorySubscriber struct {
id string
topic string
exit chan bool
handler Handler
opts SubscribeOptions
ctx context.Context
exit chan bool
handler Handler
id string
topic string
}
func (m *memoryBroker) Options() Options {

View File

@@ -13,25 +13,26 @@ import (
// Options struct
type Options struct {
Name string
// Addrs useed by broker
Addrs []string
// ErrorHandler executed when errors occur processing messages
ErrorHandler Handler
// Codec used to marshal/unmarshal messages
Codec codec.Codec
// Logger the used logger
Logger logger.Logger
// Meter the used for metrics
Meter meter.Meter
// Tracer used for trace
// Tracer used for tracing
Tracer tracer.Tracer
// TLSConfig for secure communication
TLSConfig *tls.Config
// Register used for clustering
// Register can be used for clustering
Register register.Register
// Context is used for non default options
// Codec holds the codec for marshal/unmarshal
Codec codec.Codec
// Logger used for logging
Logger logger.Logger
// Meter used for metrics
Meter meter.Meter
// Context holds external options
Context context.Context
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// Name holds the broker name
Name string
// Addrs holds the broker address
Addrs []string
}
// NewOptions create new Options
@@ -59,10 +60,10 @@ func Context(ctx context.Context) Option {
// PublishOptions struct
type PublishOptions struct {
// BodyOnly says that only body of the message must be published
BodyOnly bool
// Context for non default options
// Context holds external options
Context context.Context
// BodyOnly flag says the message contains raw body bytes
BodyOnly bool
}
// NewPublishOptions creates PublishOptions struct
@@ -80,22 +81,16 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
// SubscribeOptions struct
type SubscribeOptions struct {
// AutoAck ack messages if handler returns nil err
AutoAck bool
// ErrorHandler executed when errors occur processing messages
ErrorHandler Handler
// Group for subscriber, Subscribers with the same group name
// will create a shared subscription where each
// receives a subset of messages.
Group string
// BodyOnly says that consumed only body of the message
BodyOnly bool
// Context is used for non default options
// Context holds external options
Context context.Context
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// Group holds consumer group
Group string
// AutoAck flag specifies auto ack of incoming message when no error happens
AutoAck bool
// BodyOnly flag specifies that message contains only body bytes without header
BodyOnly bool
}
// Option func

View File

@@ -21,12 +21,12 @@ type Source struct {
// Package is packaged format for source
type Package struct {
// Source of the package
Source *Source
// Name of the package
Name string
// Location of the package
Path string
// Type of package e.g tarball, binary, docker
Type string
// Source of the package
Source *Source
}

View File

@@ -31,12 +31,12 @@ type noopMessage struct {
}
type noopRequest struct {
body interface{}
codec codec.Codec
service string
method string
endpoint string
contentType string
body interface{}
codec codec.Codec
stream bool
}
@@ -175,7 +175,7 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
}
func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request {
return &noopRequest{}
return &noopRequest{service: service, endpoint: endpoint}
}
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {

View File

@@ -18,35 +18,40 @@ import (
// Options holds client options
type Options struct {
Name string
// Used to select codec
ContentType string
// Proxy address to send requests via
Proxy string
// Plugged interfaces
Broker broker.Broker
Codecs map[string]codec.Codec
Router router.Router
Selector selector.Selector
Transport transport.Transport
Logger logger.Logger
Meter meter.Meter
// Lookup used for looking up routes
Lookup LookupFunc
// Connection Pool
PoolSize int
PoolTTL time.Duration
Tracer tracer.Tracer
// Wrapper that used client
Wrappers []Wrapper
// CallOptions that used by default
// CallOptions contains default CallOptions
CallOptions CallOptions
// Context is used for non default options
// Logger used to log messages
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
// Broker used to publish messages
Broker broker.Broker
// Meter used for metrics
Meter meter.Meter
// Router used to get route
Router router.Router
// Selector used to select needed address
Selector selector.Selector
// Transport used for transfer messages
Transport transport.Transport
// Context is used for external options
Context context.Context
// Codecs map
Codecs map[string]codec.Codec
// Lookup func used to get destination addr
Lookup LookupFunc
// Proxy is used for proxy requests
Proxy string
// ContentType is Used to select codec
ContentType string
// Name is the client name
Name string
// Wrappers contains wrappers
Wrappers []Wrapper
// PoolSize connection pool size
PoolSize int
// PoolTTL conection pool ttl
PoolTTL time.Duration
}
// NewCallOptions creates new call options struct
@@ -60,34 +65,34 @@ func NewCallOptions(opts ...CallOption) CallOptions {
// CallOptions holds client call options
type CallOptions struct {
// Address of remote hosts
Address []string
// Backoff func
Backoff BackoffFunc
// DialTimeout is the transport Dial Timeout
DialTimeout time.Duration
// Retries is the number of Call attempts
Retries int
// Retry func to be used for retries
Retry RetryFunc
// RequestTimeout specifies request timeout
RequestTimeout time.Duration
// Router to use for this call
// Router used for route
Router router.Router
// Selector to use for the call
// Selector selects addr
Selector selector.Selector
// SelectOptions to use when selecting a route
SelectOptions []selector.SelectOption
// StreamTimeout timeout for the stream
StreamTimeout time.Duration
// AuthToken specifies the auth token as the authorization header
AuthToken bool
// Network to lookup the route within
Network string
// CallWrappers is for low level call func
CallWrappers []CallWrapper
// Context is used for non default options
// Context used for deadline
Context context.Context
// Retry func used for retries
Retry RetryFunc
// Backoff func used for backoff when retry
Backoff BackoffFunc
// Network name
Network string
// CallWrappers call wrappers
CallWrappers []CallWrapper
// SelectOptions selector options
SelectOptions []selector.SelectOption
// Address specifies static addr list
Address []string
// Retries specifies retries num
Retries int
// StreamTimeout stream timeout
StreamTimeout time.Duration
// RequestTimeout request timeout
RequestTimeout time.Duration
// DialTimeout dial timeout
DialTimeout time.Duration
// AuthToken flag
AuthToken bool
}
// Context pass context to client
@@ -108,10 +113,10 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
// PublishOptions holds publish options
type PublishOptions struct {
// Exchange is the routing exchange for the message
Exchange string
// Context holds additional options
// Context used for external options
Context context.Context
// Exchange topic exchange name
Exchange string
}
// NewMessageOptions creates message options struct
@@ -125,6 +130,7 @@ func NewMessageOptions(opts ...MessageOption) MessageOptions {
// MessageOptions holds client message options
type MessageOptions struct {
// ContentType specify content-type of message
ContentType string
}
@@ -139,12 +145,12 @@ func NewRequestOptions(opts ...RequestOption) RequestOptions {
// RequestOptions holds client request options
type RequestOptions struct {
// ContentType specify content-type of request
ContentType string
// Stream says that request is the streaming
Stream bool
// Context can hold other options
// Context used for external options
Context context.Context
// ContentType specify content-type of message
ContentType string
// Stream flag
Stream bool
}
// NewOptions creates new options struct

View File

@@ -5,13 +5,13 @@ import (
)
type testRequest struct {
service string
opts RequestOptions
codec codec.Codec
body interface{}
method string
endpoint string
contentType string
codec codec.Codec
body interface{}
opts RequestOptions
service string
}
func (r *testRequest) ContentType() string {

View File

@@ -51,16 +51,14 @@ type Codec interface {
// the communication, likely followed by the body.
// In the case of an error, body may be nil.
type Message struct {
Id string
Type MessageType
Header metadata.Metadata
Target string
Method string
Endpoint string
Error string
// The values read from the socket
Header metadata.Metadata
Body []byte
Id string
Body []byte
Type MessageType
}
// NewMessage creates new codec message

View File

@@ -11,10 +11,14 @@ type Option func(*Options)
// Options contains codec options
type Options struct {
// Meter used for metrics
Meter meter.Meter
// Logger used for logging
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
// MaxMsgSize specifies max messages size that reads by codec
MaxMsgSize int
Meter meter.Meter
Logger logger.Logger
Tracer tracer.Tracer
}
// MaxMsgSize sets the max message size

View File

@@ -11,26 +11,32 @@ import (
// Options hold the config options
type Options struct {
Name string
AllowFail bool
BeforeLoad []func(context.Context, Config) error
AfterLoad []func(context.Context, Config) error
BeforeSave []func(context.Context, Config) error
AfterSave []func(context.Context, Config) error
// Struct that holds config data
// Struct holds the destination config struct
Struct interface{}
// StructTag name
StructTag string
// Logger that will be used
Logger logger.Logger
// Meter that will be used
Meter meter.Meter
// Tracer used for trace
Tracer tracer.Tracer
// Codec that used for load/save
Codec codec.Codec
// Context for alternative data
// Tracer that will be used
Tracer tracer.Tracer
// Meter that will be used
Meter meter.Meter
// Logger that will be used
Logger logger.Logger
// Context used for external options
Context context.Context
// Name of the config
Name string
// StructTag name
StructTag string
// BeforeSave contains slice of funcs that runs before save
BeforeSave []func(context.Context, Config) error
// AfterLoad contains slice of funcs that runs after load
AfterLoad []func(context.Context, Config) error
// BeforeLoad contains slice of funcs that runs before load
BeforeLoad []func(context.Context, Config) error
// AfterSave contains slice of funcs that runs after save
AfterSave []func(context.Context, Config) error
// AllowFail flag to allow fail in config source
AllowFail bool
}
// Option function signature

View File

@@ -37,10 +37,14 @@ var (
// Error type
type Error struct {
Id string
Code int32
// Id holds error id or service, usually someting like my_service or uuid
Id string
// Detail holds some useful details about error
Detail string
// Status usually holds text of http status
Status string
// Code holds error code
Code int32
}
// Error satisfies error interface
@@ -49,7 +53,7 @@ func (e *Error) Error() string {
return string(b)
}
// New generates a custom error.
// New generates a custom error
func New(id, detail string, code int32) error {
return &Error{
Id: id,

8
go.mod
View File

@@ -6,12 +6,8 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4
github.com/google/uuid v1.2.0
github.com/heimdalr/dag v1.0.1 // indirect
github.com/imdario/mergo v0.3.11
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 // indirect
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
)

18
go.sum
View File

@@ -1,35 +1,21 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M=
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/heimdalr/dag v1.0.1 h1:iR2K3DSUFDYx0GeV7iXBnZkedWS1xePSGrylQ197uxg=
github.com/heimdalr/dag v1.0.1/go.mod h1:t+ZkR+sjKL4xhlE1B9rwpvwfo+x+2R0363efS+Oghns=
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -21,9 +21,9 @@ func init() {
}
type defaultLogger struct {
sync.RWMutex
opts Options
enc *json.Encoder
sync.RWMutex
}
// Init(opts...) should only overwrite provided options
@@ -49,10 +49,18 @@ func (l *defaultLogger) V(level Level) bool {
}
func (l *defaultLogger) Fields(fields map[string]interface{}) Logger {
l.Lock()
l.opts.Fields = copyFields(fields)
l.Unlock()
return l
nl := &defaultLogger{opts: l.opts, enc: l.enc}
nl.opts.Fields = make(map[string]interface{}, len(l.opts.Fields)+len(fields))
l.RLock()
for k, v := range l.opts.Fields {
nl.opts.Fields[k] = v
}
l.RUnlock()
for k, v := range fields {
nl.opts.Fields[k] = v
}
return nl
}
func copyFields(src map[string]interface{}) map[string]interface{} {
@@ -153,7 +161,9 @@ func (l *defaultLogger) Log(ctx context.Context, level Level, args ...interface{
}
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
fields["msg"] = fmt.Sprint(args...)
if len(args) > 0 {
fields["msg"] = fmt.Sprint(args...)
}
l.RLock()
_ = l.enc.Encode(fields)
@@ -178,7 +188,7 @@ func (l *defaultLogger) Logf(ctx context.Context, level Level, msg string, args
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
if len(args) > 0 {
fields["msg"] = fmt.Sprintf(msg, args...)
} else {
} else if msg != "" {
fields["msg"] = msg
}
l.RLock()

View File

@@ -11,17 +11,18 @@ type Option func(*Options)
// Options holds logger options
type Options struct {
Name string
// The logging level the logger should log at. default is `InfoLevel`
Level Level
// fields to always be logged
Fields map[string]interface{}
// It's common to set this to a file, or leave it default which is `os.Stderr`
// Out holds the output writer
Out io.Writer
// Caller skip frame count for file:line info
CallerSkipCount int
// Alternative options
// Context holds exernal options
Context context.Context
// Fields holds additional metadata
Fields map[string]interface{}
// Name holds the logger name
Name string
// CallerSkipCount number of frmaes to skip
CallerSkipCount int
// The logging level the logger should log
Level Level
}
// NewOptions creates new options struct

330
logger/wrapper/wrapper.go Normal file
View File

@@ -0,0 +1,330 @@
// Package wrapper provides wrapper for Tracer
package wrapper
import (
"context"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/server"
)
type lWrapper struct {
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
opts Options
}
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string
type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
type ServerSubscriberObserver func(context.Context, server.Message, error) []string
type Options struct {
Logger logger.Logger
Level logger.Level
Enabled bool
ClientCallObservers []ClientCallObserver
ClientStreamObservers []ClientStreamObserver
ClientPublishObservers []ClientPublishObserver
ClientCallFuncObservers []ClientCallFuncObserver
ServerHandlerObservers []ServerHandlerObserver
ServerSubscriberObservers []ServerSubscriberObserver
}
type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Level: logger.TraceLevel,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
}
for _, o := range opts {
o(&options)
}
return options
}
func WithEnabled(b bool) Option {
return func(o *Options) {
o.Enabled = b
}
}
func WithLevel(l logger.Level) Option {
return func(o *Options) {
o.Level = l
}
}
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
labels := []string{"endpoint", msg.Topic()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, err error) []string {
labels := []string{"endpoint", msg.Topic()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
err := l.Client.Call(ctx, req, rsp, opts...)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientCallObservers {
labels = append(labels, o(ctx, req, rsp, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
stream, err := l.Client.Stream(ctx, req, opts...)
if !l.opts.Enabled {
return stream, err
}
var labels []string
for _, o := range l.opts.ClientStreamObservers {
labels = append(labels, o(ctx, req, opts, stream, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return stream, err
}
func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
err := l.Client.Publish(ctx, msg, opts...)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientPublishObservers {
labels = append(labels, o(ctx, msg, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
err := l.serverHandler(ctx, req, rsp)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ServerHandlerObservers {
labels = append(labels, o(ctx, req, rsp, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
err := l.serverSubscriber(ctx, msg)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ServerSubscriberObservers {
labels = append(labels, o(ctx, msg, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &lWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, clientCallFunc: h}
return l.ClientCallFunc
}
}
func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
err := l.clientCallFunc(ctx, addr, req, rsp, opts)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientCallFuncObservers {
labels = append(labels, o(ctx, addr, req, rsp, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, serverHandler: h}
return l.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, serverSubscriber: h}
return l.ServerSubscriber
}
}

View File

@@ -93,7 +93,9 @@ func NewIncomingContext(ctx context.Context, md Metadata) context.Context {
ctx = context.Background()
}
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{md})
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
if v, ok := ctx.Value(mdOutgoingKey{}).(*rawMetadata); !ok || v == nil {
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
}
return ctx
}
@@ -103,6 +105,40 @@ func NewOutgoingContext(ctx context.Context, md Metadata) context.Context {
ctx = context.Background()
}
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{md})
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
if v, ok := ctx.Value(mdIncomingKey{}).(*rawMetadata); !ok || v == nil {
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
}
return ctx
}
// AppendOutgoingContext apends new md to context
func AppendOutgoingContext(ctx context.Context, kv ...string) context.Context {
md, ok := Pairs(kv...)
if !ok {
return ctx
}
omd, ok := FromOutgoingContext(ctx)
if !ok {
return NewOutgoingContext(ctx, md)
}
for k, v := range md {
omd.Set(k, v)
}
return NewOutgoingContext(ctx, omd)
}
// AppendIncomingContext apends new md to context
func AppendIncomingContext(ctx context.Context, kv ...string) context.Context {
md, ok := Pairs(kv...)
if !ok {
return ctx
}
omd, ok := FromIncomingContext(ctx)
if !ok {
return NewIncomingContext(ctx, md)
}
for k, v := range md {
omd.Set(k, v)
}
return NewIncomingContext(ctx, omd)
}

View File

@@ -27,10 +27,10 @@ var (
// Iterator used to iterate over metadata with order
type Iterator struct {
md Metadata
keys []string
cur int
cnt int
keys []string
md Metadata
}
// Next advance iterator to next element
@@ -111,3 +111,19 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata {
}
return nmd
}
func Pairs(kv ...string) (Metadata, bool) {
if len(kv)%2 == 1 {
return nil, false
}
md := New(len(kv) / 2)
var k string
for i, v := range kv {
if i%2 == 0 {
k = v
continue
}
md.Set(k, v)
}
return md, true
}

View File

@@ -5,6 +5,28 @@ import (
"testing"
)
func TestAppend(t *testing.T) {
ctx := context.Background()
ctx = AppendIncomingContext(ctx, "key1", "val1", "key2", "val2")
md, ok := FromIncomingContext(ctx)
if !ok {
t.Fatal("metadata empty")
}
if _, ok := md.Get("key1"); !ok {
t.Fatal("key1 not found")
}
}
func TestPairs(t *testing.T) {
md, ok := Pairs("key1", "val1", "key2", "val2")
if !ok {
t.Fatal("odd number of kv")
}
if _, ok = md.Get("key1"); !ok {
t.Fatal("key1 not found")
}
}
func testCtx(ctx context.Context) {
md := New(2)
md.Set("Key1", "Val1_new")

View File

@@ -82,25 +82,6 @@ type Labels struct {
vals []string
}
type labels Labels
func (ls labels) sort() {
sort.Sort(ls)
}
func (ls labels) Len() int {
return len(ls.keys)
}
func (ls labels) Swap(i, j int) {
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
}
func (ls labels) Less(i, j int) bool {
return ls.vals[i] < ls.vals[j]
}
// Append adds labels to label set
func (ls Labels) Append(nls Labels) Labels {
for n := range nls.keys {
@@ -110,10 +91,30 @@ func (ls Labels) Append(nls Labels) Labels {
return ls
}
// Len returns number of labels
func (ls Labels) Len() int {
return len(ls.keys)
}
type labels Labels
func (ls labels) Len() int {
return len(ls.keys)
}
func (ls labels) Sort() {
sort.Sort(ls)
}
func (ls labels) Swap(i, j int) {
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
}
func (ls labels) Less(i, j int) bool {
return ls.keys[i] < ls.keys[j]
}
// LabelIter holds the
type LabelIter struct {
labels Labels
@@ -123,7 +124,7 @@ type LabelIter struct {
// Iter returns labels iterator
func (ls Labels) Iter() *LabelIter {
labels(ls).sort()
labels(ls).Sort()
return &LabelIter{labels: ls, cnt: len(ls.keys)}
}

View File

@@ -5,12 +5,12 @@ import (
)
func TestNoopMeter(t *testing.T) {
meter := NewMeter(Path("/noop"))
if "/noop" != meter.Options().Path {
t.Fatalf("invalid options parsing: %v", meter.Options())
m := NewMeter(Path("/noop"))
if "/noop" != m.Options().Path {
t.Fatalf("invalid options parsing: %v", m.Options())
}
cnt := meter.Counter("counter", Label("server", "noop"))
cnt := m.Counter("counter", Label("server", "noop"))
cnt.Inc()
}
@@ -32,15 +32,22 @@ func TestLabelsAppend(t *testing.T) {
}
func TestIterator(t *testing.T) {
var ls Labels
ls.keys = []string{"type", "server", "register"}
ls.vals = []string{"noop", "http", "gossip"}
options := NewOptions(
Label("name", "svc1"),
Label("version", "0.0.1"),
Label("id", "12345"),
Label("type", "noop"),
Label("server", "http"),
Label("register", "gossip"),
Label("aa", "kk"),
Label("zz", "kk"),
)
iter := ls.Iter()
iter := options.Labels.Iter()
var k, v string
cnt := 0
for iter.Next(&k, &v) {
if cnt == 1 && (k != "server" || v != "http") {
if cnt == 4 && (k != "server" || v != "http") {
t.Fatalf("iter error: %s != %s || %s != %s", k, "server", v, "http")
}
cnt++

View File

@@ -11,17 +11,26 @@ type Option func(*Options)
// Options for metrics implementations:
type Options struct {
Name string
// Logger used for logging
Logger logger.Logger
// Context holds external options
Context context.Context
// Name holds the meter name
Name string
// Address holds the address that serves metrics
Address string
Path string
Labels Labels
//TimingObjectives map[float64]float64
Logger logger.Logger
Context context.Context
MetricPrefix string
LabelPrefix string
// Path holds the path for metrics
Path string
// MetricPrefix holds the prefix for all metrics
MetricPrefix string
// LabelPrefix holds the prefix for all labels
LabelPrefix string
// Labels holds the default labels
Labels Labels
// WriteProcessMetrics flag to write process metrics
WriteProcessMetrics bool
WriteFDMetrics bool
// WriteFDMetrics flag to write fd metrics
WriteFDMetrics bool
}
// NewOptions prepares a set of options:

View File

@@ -31,16 +31,17 @@ var (
)
type Options struct {
Meter meter.Meter
Name string
Version string
ID string
Meter meter.Meter
lopts []meter.Option
}
type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{}
options := Options{
Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5),
}
for _, o := range opts {
o(&options)
}
@@ -49,19 +50,19 @@ func NewOptions(opts ...Option) Options {
func ServiceName(name string) Option {
return func(o *Options) {
o.Name = name
o.lopts = append(o.lopts, meter.Label("name", name))
}
}
func ServiceVersion(version string) Option {
return func(o *Options) {
o.Version = version
o.lopts = append(o.lopts, meter.Label("version", version))
}
}
func ServiceID(id string) Option {
return func(o *Options) {
o.ID = id
o.lopts = append(o.lopts, meter.Label("id", id))
}
}
@@ -72,9 +73,9 @@ func Meter(m meter.Meter) Option {
}
type wrapper struct {
opts Options
callFunc client.CallFunc
client.Client
callFunc client.CallFunc
opts Options
}
func NewClientWrapper(opts ...Option) client.Wrapper {
@@ -104,7 +105,7 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -127,7 +128,7 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -150,7 +151,7 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -173,7 +174,7 @@ func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -204,7 +205,7 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
err := fn(ctx, req, rsp)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -236,7 +237,7 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
err := fn(ctx, msg)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))

View File

@@ -15,6 +15,18 @@ type Option func(*Options)
// Options configure network
type Options struct {
// Router used for routing
Router router.Router
// Proxy holds the proxy
Proxy proxy.Proxy
// Logger used for logging
Logger logger.Logger
// Meter used for metrics
Meter meter.Meter
// Tracer used for tracing
Tracer tracer.Tracer
// Tunnel used for transfer data
Tunnel tunnel.Tunnel
// Id of the node
Id string
// Name of the network
@@ -25,18 +37,6 @@ type Options struct {
Advertise string
// Nodes is a list of nodes to connect to
Nodes []string
// Tunnel is network tunnel
Tunnel tunnel.Tunnel
// Router is network router
Router router.Router
// Proxy is network proxy
Proxy proxy.Proxy
// Logger
Logger logger.Logger
// Meter
Meter meter.Meter
// Tracer
Tracer tracer.Tracer
}
// Id sets the id of the network node

View File

@@ -14,19 +14,14 @@ import (
)
type memorySocket struct {
recv chan *Message
send chan *Message
// sock exit
exit chan bool
// listener exit
lexit chan bool
local string
remote string
// for send/recv transport.Timeout
timeout time.Duration
ctx context.Context
recv chan *Message
exit chan bool
lexit chan bool
send chan *Message
local string
remote string
timeout time.Duration
sync.RWMutex
}
@@ -36,19 +31,19 @@ type memoryClient struct {
}
type memoryListener struct {
addr string
topts Options
ctx context.Context
lopts ListenOptions
exit chan bool
conn chan *memorySocket
lopts ListenOptions
topts Options
addr string
sync.RWMutex
ctx context.Context
}
type memoryTransport struct {
opts Options
sync.RWMutex
opts Options
listeners map[string]*memoryListener
sync.RWMutex
}
func (ms *memorySocket) Recv(m *Message) error {

View File

@@ -13,26 +13,24 @@ import (
// Options struct holds the transport options
type Options struct {
Name string
// Addrs is the list of intermediary addresses to connect to
Addrs []string
// Codec is the codec interface to use where headers are not supported
// by the transport and the entire payload must be encoded
Codec codec.Codec
// TLSConfig to secure the connection. The assumption is that this
// is mTLS keypair
TLSConfig *tls.Config
// Timeout sets the timeout for Send/Recv
Timeout time.Duration
// Logger sets the logger
Logger logger.Logger
// Meter sets the meter
// Meter used for metrics
Meter meter.Meter
// Tracer sets the tracer
// Tracer used for tracing
Tracer tracer.Tracer
// Other options for implementations of the interface
// can be stored in a context
// Codec used for marshal/unmarshal messages
Codec codec.Codec
// Logger used for logging
Logger logger.Logger
// Context holds external options
Context context.Context
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Name holds the transport name
Name string
// Addrs holds the transport addrs
Addrs []string
// Timeout holds the timeout
Timeout time.Duration
}
// NewOptions returns new options
@@ -53,18 +51,12 @@ func NewOptions(opts ...Option) Options {
// DialOptions struct
type DialOptions struct {
// Tells the transport this is a streaming connection with
// multiple calls to send/recv and that send may not even be called
Stream bool
// Timeout for dialing
Timeout time.Duration
// TODO: add tls options when dialling
// Currently set in global options
// Other options for implementations of the interface
// can be stored in a context
// Context holds the external options
Context context.Context
// Timeout holds the timeout
Timeout time.Duration
// Stream flag
Stream bool
}
// NewDialOptions returns new DialOptions
@@ -85,10 +77,10 @@ func NewDialOptions(opts ...DialOption) DialOptions {
type ListenOptions struct {
// TODO: add tls options when listening
// Currently set in global options
// Other options for implementations of the interface
// can be stored in a context
// Context holds the external options
Context context.Context
// TLSConfig holds the *tls.Config options
TLSConfig *tls.Config
}
// NewListenOptions returns new ListenOptions

View File

@@ -17,17 +17,16 @@ type tunBroker struct {
}
type tunSubscriber struct {
topic string
handler broker.Handler
opts broker.SubscribeOptions
closed chan bool
opts broker.SubscribeOptions
listener tunnel.Listener
handler broker.Handler
closed chan bool
topic string
}
type tunEvent struct {
topic string
message *broker.Message
topic string
}
// used to access tunnel from options context

View File

@@ -22,23 +22,24 @@ type Option func(*Options)
// Options provides network configuration options
type Options struct {
Name string
// Id is tunnel id
Id string
// Address is tunnel address
Address string
// Nodes are remote nodes
Nodes []string
// The shared auth token
Token string
// Transport listens to incoming connections
Transport transport.Transport
// Logger
// Logger used for logging
Logger logger.Logger
// Meter
// Meter used for metrics
Meter meter.Meter
// Tracer
// Tracer used for tracing
Tracer tracer.Tracer
// Transport used for communication
Transport transport.Transport
// Token the shared auth token
Token string
// Name holds the tunnel name
Name string
// Id holds the tunnel id
Id string
// Address holds the tunnel address
Address string
// Nodes holds the tunnel nodes
Nodes []string
}
// DialOption func
@@ -61,9 +62,9 @@ type ListenOption func(*ListenOptions)
// ListenOptions provides listen options
type ListenOptions struct {
// specify mode of the session
// Mode specify mode of the session
Mode Mode
// The read timeout
// Timeout the read timeout
Timeout time.Duration
}

View File

@@ -23,33 +23,44 @@ import (
// Options for micro service
type Options struct {
Name string
Version string
Metadata metadata.Metadata
Auths []auth.Auth
Brokers []broker.Broker
Loggers []logger.Logger
Meters []meter.Meter
Configs []config.Config
Clients []client.Client
Servers []server.Server
Stores []store.Store
Registers []register.Register
Tracers []tracer.Tracer
Routers []router.Router
// Runtime runtime.Runtime
// Profile profile.Profile
// Before and After funcs
BeforeStart []func(context.Context) error
BeforeStop []func(context.Context) error
AfterStart []func(context.Context) error
AfterStop []func(context.Context) error
// Other options for implementations of the interface
// can be stored in a context
// Context holds external options or cancel stuff
Context context.Context
// Metadata holds service metadata
Metadata metadata.Metadata
// Version holds service version
Version string
// Name holds service name
Name string
// Brokers holds brokers
Brokers []broker.Broker
// Loggers holds loggers
Loggers []logger.Logger
// Meters holds meter
Meters []meter.Meter
// Configs holds config
Configs []config.Config
// Clients holds clients
Clients []client.Client
// Auths holds auths
Auths []auth.Auth
// Stores holds stores
Stores []store.Store
// Registers holds registers
Registers []register.Register
// Tracers holds tracers
Tracers []tracer.Tracer
// Routers holds routers
Routers []router.Router
// BeforeStart holds funcs that runs before service starts
BeforeStart []func(context.Context) error
// BeforeStop holds funcs that runs before service stops
BeforeStop []func(context.Context) error
// AfterStart holds funcs that runs after service starts
AfterStart []func(context.Context) error
// AfterStop holds funcs that runs after service stops
AfterStop []func(context.Context) error
// Servers holds servers
Servers []server.Server
}
// NewOptions returns new Options filled with defaults and overrided by provided opts

View File

@@ -11,9 +11,9 @@ import (
)
type httpProfile struct {
server *http.Server
sync.Mutex
running bool
server *http.Server
}
var (

View File

@@ -13,16 +13,12 @@ import (
)
type profiler struct {
opts profile.Options
exit chan bool
cpuFile *os.File
memFile *os.File
opts profile.Options
sync.Mutex
running bool
exit chan bool
// where the cpu profile is written
cpuFile *os.File
// where the mem profile is written
memFile *os.File
}
func (p *profiler) writeHeap(f *os.File) {

View File

@@ -11,20 +11,20 @@ import (
// Options for proxy
type Options struct {
// Specific endpoint to always call
Endpoint string
// The default client to use
Client client.Client
// The default router to use
Router router.Router
// Extra links for different clients
Links map[string]client.Client
// Logger
Logger logger.Logger
// Meter
Meter meter.Meter
// Tracer
// Tracer used for tracing
Tracer tracer.Tracer
// Client for communication
Client client.Client
// Router for routing
Router router.Router
// Logger used for logging
Logger logger.Logger
// Meter used for metrics
Meter meter.Meter
// Links holds the communication links
Links map[string]client.Client
// Endpoint holds the destination address
Endpoint string
}
// Option func signature

View File

@@ -16,9 +16,9 @@ var (
)
type node struct {
*Node
TTL time.Duration
LastSeen time.Time
*Node
TTL time.Duration
}
type record struct {
@@ -369,9 +369,9 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi
// serialize the result, each version counts as an individual service
var result []*Service
for domain, service := range services {
for _, service := range services {
for _, version := range service {
result = append(result, recordToService(version, domain))
result = append(result, recordToService(version, options.Domain))
}
}
@@ -405,10 +405,10 @@ func (m *memory) String() string {
}
type watcher struct {
id string
wo WatchOptions
res chan *Result
exit chan bool
wo WatchOptions
id string
}
func (m *watcher) Next() (*Result, error) {

View File

@@ -12,20 +12,22 @@ import (
// Options holds options for register
type Options struct {
Name string
Addrs []string
Timeout time.Duration
TLSConfig *tls.Config
// Logger that will be used
Logger logger.Logger
// Meter that will be used
Meter meter.Meter
// Tracer
// Tracer used for tracing
Tracer tracer.Tracer
// Other options for implementations of the interface
// can be stored in a context
// Context holds external options
Context context.Context
// Logged used for logging
Logger logger.Logger
// Meter used for metrics
Meter meter.Meter
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Name holds the name of register
Name string
// Addrs specifies register addrs
Addrs []string
// Timeout specifies timeout
Timeout time.Duration
}
// NewOptions returns options that filled by opts
@@ -44,13 +46,9 @@ func NewOptions(opts ...Option) Options {
// RegisterOptions holds options for register method
type RegisterOptions struct {
TTL time.Duration
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
// Domain to register the service in
Domain string
// Attempts specify attempts for register
Context context.Context
Domain string
TTL time.Duration
Attempts int
}

View File

@@ -52,17 +52,17 @@ type Service struct {
// Node holds node register info
type Node struct {
Metadata metadata.Metadata `json:"metadata"`
Id string `json:"id"`
Address string `json:"address"`
Metadata metadata.Metadata `json:"metadata"`
}
// Endpoint holds endpoint register info
type Endpoint struct {
Name string `json:"name"`
Request *Value `json:"request"`
Response *Value `json:"response"`
Metadata metadata.Metadata `json:"metadata"`
Name string `json:"name"`
}
// Value holds additional kv stuff

View File

@@ -7,14 +7,17 @@ import "time"
type Watcher interface {
// Next is a blocking call
Next() (*Result, error)
// Stop stops the watcher
Stop()
}
// Result is returned by a call to Next on
// the watcher. Actions can be create, update, delete
type Result struct {
Action string
// Service holds register service
Service *Service
// Action holds the action
Action string
}
// EventType defines register event type
@@ -45,12 +48,12 @@ func (t EventType) String() string {
// Event is register event
type Event struct {
// Id is register id
Id string
// Type defines type of event
Type EventType
// Timestamp is event timestamp
Timestamp time.Time
// Service is register service
Service *Service
// Id is register id
Id string
// Type defines type of event
Type EventType
}

View File

@@ -12,9 +12,9 @@ import (
// Resolver is a DNS network resolve
type Resolver struct {
// The resolver address to use
Address string
goresolver *net.Resolver
// Address of resolver to use
Address string
sync.RWMutex
}

View File

@@ -10,23 +10,15 @@ import (
// Options are router options
type Options struct {
Name string
// Id is router id
Id string
// Address is router address
Address string
// Gateway is network gateway
Gateway string
// Network is network address
Network string
// Register is the local register
Logger logger.Logger
Context context.Context
Register register.Register
// Precache routes
Name string
Gateway string
Network string
Id string
Address string
Precache bool
// Logger
Logger logger.Logger
// Context for additional options
Context context.Context
}
// Id sets Router Id

View File

@@ -15,10 +15,10 @@ var (
// Route is network route
type Route struct {
// Metadata for the route
Metadata metadata.Metadata
// Service is destination service name
Service string
// Address is service node address
Address string
// Gateway is route gateway
Gateway string
// Network is network address
@@ -27,10 +27,10 @@ type Route struct {
Router string
// Link is network link
Link string
// Address is service node address
Address string
// Metric is the route cost metric
Metric int64
// Metadata for the route
Metadata metadata.Metadata
}
// Hash returns route hash sum.

View File

@@ -38,14 +38,14 @@ func (t EventType) String() string {
// Event is returned by a call to Next on the watcher.
type Event struct {
// Unique id of the event
// Route is table route
Route Route
// Timestamp is event timestamp
Timestamp time.Time
// Id of the event
Id string
// Type defines type of event
Type EventType
// Timestamp is event timestamp
Timestamp time.Time
// Route is table route
Route Route
}
// Watcher defines routing table watcher interface

View File

@@ -10,18 +10,12 @@ import (
// Options configure runtime
type Options struct {
// Scheduler for updates
Scheduler Scheduler
// Service type to manage
Type string
// Source of the services repository
Source string
// Base image to use
Image string
// Client to use when making requests
Client client.Client
// Logger
Logger logger.Logger
Client client.Client
Logger logger.Logger
Type string
Source string
Image string
}
// Option func signature
@@ -77,42 +71,26 @@ type ReadOption func(o *ReadOptions)
// CreateOptions configure runtime services
type CreateOptions struct {
// Command to execut
Command []string
// Args to pass into command
Args []string
// Environment to configure
Env []string
// Log output
Output io.Writer
// Type of service to create
Type string
// Retries before failing deploy
Retries int
// Specify the image to use
Image string
// Namespace to create the service in
Namespace string
// Specify the context to use
Context context.Context
// Secrets to use
Secrets map[string]string
// Resources to allocate the service
Context context.Context
Output io.Writer
Resources *Resources
Secrets map[string]string
Image string
Namespace string
Type string
Command []string
Args []string
Env []string
Retries int
}
// ReadOptions queries runtime services
type ReadOptions struct {
// Service name
Service string
// Version queries services with given version
Version string
// Type of service
Type string
// Namespace the service is running in
Context context.Context
Service string
Version string
Type string
Namespace string
// Specify the context to use
Context context.Context
}
// CreateType sets the type of service to create
@@ -238,12 +216,9 @@ type UpdateOption func(o *UpdateOptions)
// UpdateOptions struct
type UpdateOptions struct {
// Namespace the service is running in
Context context.Context
Secrets map[string]string
Namespace string
// Specify the context to use
Context context.Context
// Secrets to use
Secrets map[string]string
}
// UpdateSecret sets a secret to provide the service with
@@ -276,10 +251,8 @@ type DeleteOption func(o *DeleteOptions)
// DeleteOptions struct
type DeleteOptions struct {
// Namespace the service is running in
Context context.Context
Namespace string
// Specify the context to use
Context context.Context
}
// DeleteNamespace sets the namespace
@@ -301,14 +274,10 @@ type LogsOption func(o *LogsOptions)
// LogsOptions configure runtime logging
type LogsOptions struct {
// How many existing lines to show
Count int64
// Stream new lines?
Stream bool
// Namespace the service is running in
Context context.Context
Namespace string
// Specify the context to use
Context context.Context
Count int64
Stream bool
}
// LogsCount confiures how many existing lines to show

View File

@@ -37,15 +37,20 @@ type Runtime interface {
// Logs returns a log stream
type Logs interface {
// Error retuns error
Error() error
// Chan return chan log
Chan() chan Log
// Stop stops the log stream
Stop() error
}
// Log is a log message
type Log struct {
Message string
// Metadata holds metadata
Metadata metadata.Metadata
// Message holds the message
Message string
}
// Scheduler is a runtime service scheduler
@@ -84,28 +89,28 @@ func (t EventType) String() string {
// Event is notification event
type Event struct {
// ID of the event
ID string
// Type is event type
Type EventType
// Timestamp is event timestamp
// Timestamp of event
Timestamp time.Time
// Service the event relates to
Service *Service
// Options to use when processing the event
Options *CreateOptions
// ID of the event
ID string
// Type is event type
Type EventType
}
// Service is runtime service
type Service struct {
// Metadata stores metadata
Metadata metadata.Metadata
// Name of the service
Name string
// Version of the service
Version string
// url location of source
// Name of the service
Source string
// Metadata stores metadata
Metadata metadata.Metadata
}
// Resources which are allocated to a serivce

59
server/errors.go Normal file
View File

@@ -0,0 +1,59 @@
package server
import "github.com/unistack-org/micro/v3/errors"
type Error struct {
id string
}
func NewError(id string) *Error {
return &Error{id}
}
func (e *Error) BadRequest(format string, a ...interface{}) error {
return errors.BadRequest(e.id, format, a...)
}
func (e *Error) Unauthorized(format string, a ...interface{}) error {
return errors.Unauthorized(e.id, format, a...)
}
func (e *Error) Forbidden(format string, a ...interface{}) error {
return errors.Forbidden(e.id, format, a...)
}
func (e *Error) NotFound(format string, a ...interface{}) error {
return errors.NotFound(e.id, format, a...)
}
func (e *Error) MethodNotAllowed(format string, a ...interface{}) error {
return errors.MethodNotAllowed(e.id, format, a...)
}
func (e *Error) Timeout(format string, a ...interface{}) error {
return errors.Timeout(e.id, format, a...)
}
func (e *Error) Conflict(format string, a ...interface{}) error {
return errors.Conflict(e.id, format, a...)
}
func (e *Error) InternalServerError(format string, a ...interface{}) error {
return errors.InternalServerError(e.id, format, a...)
}
func (e *Error) NotImplemented(format string, a ...interface{}) error {
return errors.NotImplemented(e.id, format, a...)
}
func (e *Error) BadGateway(format string, a ...interface{}) error {
return errors.BadGateway(e.id, format, a...)
}
func (e *Error) ServiceUnavailable(format string, a ...interface{}) error {
return errors.ServiceUnavailable(e.id, format, a...)
}
func (e *Error) GatewayTimeout(format string, a ...interface{}) error {
return errors.GatewayTimeout(e.id, format, a...)
}

19
server/errors_test.go Normal file
View File

@@ -0,0 +1,19 @@
package server
import (
"testing"
"github.com/unistack-org/micro/v3/errors"
)
func TestError(t *testing.T) {
e := NewError("svc1")
err := e.BadRequest("%s", "test")
merr, ok := err.(*errors.Error)
if !ok {
t.Fatal("error not *errors.Error")
}
if merr.Id != "svc1" {
t.Fatal("id != svc1")
}
}

View File

@@ -7,10 +7,10 @@ import (
)
type rpcHandler struct {
name string
handler interface{}
endpoints []*register.Endpoint
opts HandlerOptions
handler interface{}
name string
endpoints []*register.Endpoint
}
func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {

View File

@@ -32,16 +32,16 @@ const (
)
type noopServer struct {
h Handler
opts Options
h Handler
rsvc *register.Service
handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
registered bool
started bool
exit chan chan error
wg *sync.WaitGroup
sync.RWMutex
registered bool
started bool
}
// NewServer returns new noop server

View File

@@ -23,50 +23,62 @@ type Option func(*Options)
// Options server struct
type Options struct {
Codecs map[string]codec.Codec
Broker broker.Broker
Register register.Register
Tracer tracer.Tracer
Auth auth.Auth
Logger logger.Logger
Meter meter.Meter
Transport transport.Transport
Metadata metadata.Metadata
Name string
Address string
Advertise string
Id string
Namespace string
Version string
HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper
// RegisterCheck runs a check function before registering the service
RegisterCheck func(context.Context) error
// The register expiry time
RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// RegisterAttempts specify how many times try to register
RegisterAttempts int
// DeegisterAttempts specify how many times try to deregister
DeregisterAttempts int
// The router for requests
// Context holds the external options and can be used for server shutdown
Context context.Context
// Broker holds the server broker
Broker broker.Broker
// Register holds the register
Register register.Register
// Tracer holds the tracer
Tracer tracer.Tracer
// Auth holds the auth
Auth auth.Auth
// Logger holds the logger
Logger logger.Logger
// Meter holds the meter
Meter meter.Meter
// Transport holds the transport
Transport transport.Transport
// Router for requests
Router Router
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
Wait *sync.WaitGroup
// Listener may be passed if already created
Listener net.Listener
// MaxConn limit connections to server
// Wait group
Wait *sync.WaitGroup
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
// Metadata holds the server metadata
Metadata metadata.Metadata
// RegisterCheck run before register server
RegisterCheck func(context.Context) error
// Codecs map to handle content-type
Codecs map[string]codec.Codec
// Id holds the id of the server
Id string
// Namespace for te server
Namespace string
// Name holds the server name
Name string
// Address holds the server address
Address string
// Advertise holds the advertie addres
Advertise string
// Version holds the server version
Version string
// SubWrappers holds the server subscribe wrappers
SubWrappers []SubscriberWrapper
// HdlrWrappers holds the handler wrappers
HdlrWrappers []HandlerWrapper
// RegisterAttempts holds the number of register attempts before error
RegisterAttempts int
// RegisterInterval holds he interval for re-register
RegisterInterval time.Duration
// RegisterTTL specifies TTL for register record
RegisterTTL time.Duration
// MaxConn limits number of connections
MaxConn int
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
// DeregisterAttempts holds the number of deregister attempts before error
DeregisterAttempts int
}
// NewOptions returns new options struct with default or passed values
@@ -303,9 +315,12 @@ type HandlerOption func(*HandlerOptions)
// HandlerOptions struct
type HandlerOptions struct {
Internal bool
// Context holds external options
Context context.Context
// Metadata for hondler
Metadata map[string]metadata.Metadata
Context context.Context
// Internal flag limits exporting to other nodes via register
Internal bool
}
// NewHandlerOptions creates new HandlerOptions
@@ -327,12 +342,16 @@ type SubscriberOption func(*SubscriberOptions)
// SubscriberOptions struct
type SubscriberOptions struct {
// AutoAck defaults to true. When a handler returns
// with a nil error the message is acked.
AutoAck bool
Queue string
// Context holds the external options
Context context.Context
// Queue holds the subscribtion queue
Queue string
// AutoAck flag for auto ack messages after processing
AutoAck bool
// Internal flag limit exporting info via register
Internal bool
Context context.Context
// BodyOnly flag specifies that message without headers
BodyOnly bool
}
// NewSubscriberOptions create new SubscriberOptions
@@ -389,6 +408,20 @@ func SubscriberQueue(n string) SubscriberOption {
}
}
// SubscriberGroup sets the shared group name distributed messages across subscribers
func SubscriberGroup(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}
// SubscriberBodyOnly says broker that message contains raw data with absence of micro broker.Message format
func SubscriberBodyOnly(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.BodyOnly = b
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {

View File

@@ -6,12 +6,12 @@ import (
)
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
payload interface{}
header metadata.Metadata
body []byte
codec codec.Codec
}
func (r *rpcMessage) ContentType() string {

View File

@@ -28,19 +28,19 @@ var (
)
type handler struct {
method reflect.Value
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type subscriber struct {
topic string
rcvr reflect.Value
opts SubscriberOptions
typ reflect.Type
subscriber interface{}
rcvr reflect.Value
topic string
handlers []*handler
endpoints []*register.Endpoint
opts SubscriberOptions
}
// Is this an exported - upper case - name?

View File

@@ -14,28 +14,27 @@ import (
// Options contains configuration for the Store
type Options struct {
// Meter used for metrics
Meter meter.Meter
// Tracer used for tracing
Tracer tracer.Tracer
// Context holds external options
Context context.Context
// Codec used to marshal/unmarshal
Codec codec.Codec
// Logger used for logging
Logger logger.Logger
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Name specifies store name
Name string
// Nodes contains the addresses or other connection information of the backing storage.
// For example, an etcd implementation would contain the nodes of the cluster.
// A SQL implementation could contain one or more connection strings.
Nodes []string
// Database allows multiple isolated stores to be kept in one backend, if supported.
// Database specifies store database
Database string
// Table is analag for a table in database backends or a key prefix in KV backends
// Table specifies store table
Table string
// Codec that used for marshal/unmarshal value
Codec codec.Codec
// Logger the logger
Logger logger.Logger
// Meter the meter
Meter meter.Meter
// Tracer the tacer
Tracer tracer.Tracer
// TLSConfig specifies tls.Config for secure
TLSConfig *tls.Config
// Context should contain all implementation specific options
Context context.Context
// Nodes contains store address
// TODO: replace with Addrs
Nodes []string
}
// NewOptions creates options struct
@@ -139,10 +138,14 @@ func NewReadOptions(opts ...ReadOption) ReadOptions {
// ReadOptions configures an individual Read operation
type ReadOptions struct {
Database string
Table string
// Context holds external options
Context context.Context
// Database holds the database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
Context context.Context
}
// ReadOption sets values in ReadOptions
@@ -167,12 +170,18 @@ func NewWriteOptions(opts ...WriteOption) WriteOptions {
// WriteOptions configures an individual Write operation
type WriteOptions struct {
Database string
Table string
TTL time.Duration
Metadata metadata.Metadata
// Context holds external options
Context context.Context
// Metadata contains additional metadata
Metadata metadata.Metadata
// Database holds database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
Context context.Context
// TTL specifies key TTL
TTL time.Duration
}
// WriteOption sets values in WriteOptions
@@ -211,10 +220,14 @@ func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
// DeleteOptions configures an individual Delete operation
type DeleteOptions struct {
Database string
Table string
// Context holds external options
Context context.Context
// Database holds database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
Context context.Context
}
// DeleteOption sets values in DeleteOptions
@@ -239,18 +252,14 @@ func NewListOptions(opts ...ListOption) ListOptions {
// ListOptions configures an individual List operation
type ListOptions struct {
// List from the following
Database, Table string
// Prefix returns all keys that are prefixed with key
Prefix string
// Suffix returns all keys that end with key
Suffix string
// Limit limits the number of returned keys
Limit uint
// Offset when combined with Limit supports pagination
Offset uint
Namespace string
Context context.Context
Database string
Prefix string
Suffix string
Namespace string
Table string
Limit uint
Offset uint
}
// ListOption sets values in ListOptions
@@ -297,8 +306,10 @@ type ExistsOption func(*ExistsOptions)
// ExistsOptions holds options for Exists method
type ExistsOptions struct {
// Context holds external options
Context context.Context
// Namespace contains namespace
Namespace string
Context context.Context
}
// NewExistsOptions helper for Exists method

View File

@@ -7,23 +7,22 @@ import (
type memorySync struct {
options Options
mtx gosync.RWMutex
locks map[string]*memoryLock
locks map[string]*memoryLock
mtx gosync.RWMutex
}
type memoryLock struct {
id string
time time.Time
ttl time.Duration
release chan bool
id string
ttl time.Duration
}
type memoryLeader struct {
opts LeaderOptions
id string
resign func(id string) error
status chan bool
id string
}
func (m *memoryLeader) Resign() error {

View File

@@ -10,11 +10,17 @@ import (
// Options holds the sync options
type Options struct {
Nodes []string
Prefix string
// Logger used for logging
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
Meter meter.Meter
// Meter used for merics
Meter meter.Meter
// Prefix holds prefix?
Prefix string
// Nodes holds the nodes
// TODO: change to Addrs ?
Nodes []string
}
// Option func signature

View File

@@ -3,41 +3,46 @@ package tracer
import (
"context"
"github.com/unistack-org/micro/v3/metadata"
)
const (
traceIDKey = "Micro-Trace-Id"
spanIDKey = "Micro-Span-Id"
)
type tracerKey struct{}
// FromContext returns a span from context
func FromContext(ctx context.Context) (traceID string, parentSpanID string, isFound bool) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", "", false
// FromContext returns a tracer from context
func FromContext(ctx context.Context) Tracer {
if ctx == nil {
return DefaultTracer
}
traceID, traceOk := md.Get(traceIDKey)
microID, microOk := md.Get("Micro-Id")
if !traceOk && !microOk {
isFound = false
return
if tracer, ok := ctx.Value(tracerKey{}).(Tracer); ok {
return tracer
}
if !traceOk {
traceID = microID
}
parentSpanID, ok = md.Get(spanIDKey)
return traceID, parentSpanID, ok
return DefaultTracer
}
// NewContext saves the trace and span ids in the context
func NewContext(ctx context.Context, traceID, parentSpanID string) context.Context {
md, ok := metadata.FromContext(ctx)
if !ok {
md = metadata.New(2)
// NewContext saves the tracer in the context
func NewContext(ctx context.Context, tracer Tracer) context.Context {
if ctx == nil {
ctx = context.Background()
}
md.Set(traceIDKey, traceID)
md.Set(spanIDKey, parentSpanID)
return metadata.NewContext(ctx, md)
return context.WithValue(ctx, tracerKey{}, tracer)
}
type spanKey struct{}
// SpanFromContext returns a span from context
func SpanFromContext(ctx context.Context) Span {
if ctx == nil {
return &noopSpan{}
}
if span, ok := ctx.Value(spanKey{}).(Span); ok {
return span
}
return &noopSpan{}
}
// NewSpanContext saves the span in the context
func NewSpanContext(ctx context.Context, span Span) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, spanKey{}, span)
}

View File

@@ -1,99 +0,0 @@
package tracer
import (
"context"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/util/ring"
)
type tracer struct {
opts Options
// ring buffer of traces
buffer *ring.Buffer
}
func (t *tracer) Read(opts ...ReadOption) ([]*Span, error) {
var options ReadOptions
for _, o := range opts {
o(&options)
}
sp := t.buffer.Get(t.buffer.Size())
spans := make([]*Span, 0, len(sp))
for _, span := range sp {
val := span.Value.(*Span)
// skip if trace id is specified and doesn't match
if len(options.Trace) > 0 && val.Trace != options.Trace {
continue
}
spans = append(spans, val)
}
return spans, nil
}
func (t *tracer) Start(ctx context.Context, name string) (context.Context, *Span) {
span := &Span{
Name: name,
Trace: uuid.New().String(),
Id: uuid.New().String(),
Started: time.Now(),
Metadata: make(map[string]string),
}
// return span if no context
if ctx == nil {
return NewContext(context.Background(), span.Trace, span.Id), span
}
traceID, parentSpanID, ok := FromContext(ctx)
// If the trace can not be found in the header,
// that means this is where the trace is created.
if !ok {
return NewContext(ctx, span.Trace, span.Id), span
}
// set trace id
span.Trace = traceID
// set parent
span.Parent = parentSpanID
// return the span
return NewContext(ctx, span.Trace, span.Id), span
}
func (t *tracer) Finish(s *Span) error {
// set finished time
s.Duration = time.Since(s.Started)
// save the span
t.buffer.Put(s)
return nil
}
func (t *tracer) Init(opts ...Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *tracer) Lookup(ctx context.Context) (*Span, error) {
return nil, nil
}
func (t *tracer) Name() string {
return t.opts.Name
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &tracer{
opts: NewOptions(opts...),
// the last 256 requests
buffer: ring.New(256),
}
}

69
tracer/noop.go Normal file
View File

@@ -0,0 +1,69 @@
package tracer
import (
"context"
)
type noopTracer struct {
opts Options
}
func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) {
span := &noopSpan{
name: name,
ctx: ctx,
tracer: t,
}
if span.ctx == nil {
span.ctx = context.Background()
}
return NewSpanContext(ctx, span), span
}
func (t *noopTracer) Init(opts ...Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *noopTracer) Name() string {
return t.opts.Name
}
type noopSpan struct {
ctx context.Context
tracer Tracer
name string
}
func (s *noopSpan) Finish(opts ...SpanOption) {
}
func (s *noopSpan) Context() context.Context {
return s.ctx
}
func (s *noopSpan) Tracer() Tracer {
return s.tracer
}
func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
}
func (s *noopSpan) SetName(name string) {
s.name = name
}
func (s *noopSpan) SetLabels(labels ...Label) {
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &noopTracer{
opts: NewOptions(opts...),
}
}

View File

@@ -2,39 +2,27 @@ package tracer
import "github.com/unistack-org/micro/v3/logger"
var (
// DefaultSize of the buffer
DefaultSize = 64
)
type SpanOptions struct {
}
type SpanOption func(o *SpanOptions)
type EventOptions struct {
}
type EventOption func(o *EventOptions)
// Options struct
type Options struct {
Name string
// Logger is the logger for messages
// Logger used for logging
Logger logger.Logger
// Size is the size of ring buffer
Size int
// Name of the tracer
Name string
}
// Option func
type Option func(o *Options)
// ReadOptions struct
type ReadOptions struct {
// Trace id
Trace string
}
// ReadOption func
type ReadOption func(o *ReadOptions)
// ReadTrace read the given trace
func ReadTrace(t string) ReadOption {
return func(o *ReadOptions) {
o.Trace = t
}
}
// Logger sets the logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
@@ -46,7 +34,6 @@ func Logger(l logger.Logger) Option {
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Size: DefaultSize,
}
for _, o := range opts {
o(&options)

View File

@@ -3,9 +3,6 @@ package tracer
import (
"context"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@@ -15,44 +12,54 @@ var (
// Tracer is an interface for distributed tracing
type Tracer interface {
// Name return tracer name
Name() string
// Init tracer with options
Init(...Option) error
// Start a trace
Start(ctx context.Context, name string) (context.Context, *Span)
// Finish the trace
Finish(*Span) error
// Lookup get span from context
Lookup(ctx context.Context) (*Span, error)
// Read the traces
Read(...ReadOption) ([]*Span, error)
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
}
// SpanType describe the nature of the trace span
type SpanType int
const (
// SpanTypeRequestInbound is a span created when serving a request
SpanTypeRequestInbound SpanType = iota
// SpanTypeRequestOutbound is a span created when making a service call
SpanTypeRequestOutbound
)
// Span is used to record an entry
type Span struct {
// Id of the trace
Trace string
// name of the span
Name string
// id of the span
Id string
// parent span id
Parent string
// Start time
Started time.Time
// Duration in nano seconds
Duration time.Duration
// associated data
Metadata metadata.Metadata
// Type
Type SpanType
type Span interface {
// Tracer return underlining tracer
Tracer() Tracer
// Finish complete and send span
Finish(opts ...SpanOption)
// AddEvent add event to span
AddEvent(name string, opts ...EventOption)
// Context return context with span
Context() context.Context
// SetName set the span name
SetName(name string)
// SetLabels set the span labels
SetLabels(labels ...Label)
}
type Label struct {
val interface{}
key string
}
func Any(k string, v interface{}) Label {
return Label{key: k, val: v}
}
func String(k string, v string) Label {
return Label{key: k, val: v}
}
func Int(k string, v int) Label {
return Label{key: k, val: v}
}
func Int64(k string, v int64) Label {
return Label{key: k, val: v}
}
func Float64(k string, v float64) Label {
return Label{key: k, val: v}
}
func Bool(k string, v bool) Label {
return Label{key: k, val: v}
}

317
tracer/wrapper/wrapper.go Normal file
View File

@@ -0,0 +1,317 @@
// Package wrapper provides wrapper for Tracer
package wrapper
import (
"context"
"fmt"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/server"
"github.com/unistack-org/micro/v3/tracer"
)
type tWrapper struct {
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
opts Options
}
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
type Options struct {
Tracer tracer.Tracer
ClientCallObservers []ClientCallObserver
ClientStreamObservers []ClientStreamObserver
ClientPublishObservers []ClientPublishObserver
ClientCallFuncObservers []ClientCallFuncObserver
ServerHandlerObservers []ServerHandlerObserver
ServerSubscriberObservers []ServerSubscriberObserver
}
type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
}
for _, o := range opts {
o(&options)
}
return options
}
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.Client.Call(ctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers {
o(ctx, req, rsp, opts, sp, err)
}
return err
}
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
stream, err := ot.Client.Stream(ctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers {
o(ctx, req, opts, stream, sp, err)
}
return stream, err
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.Client.Publish(ctx, msg, opts...)
for _, o := range ot.opts.ClientPublishObservers {
o(ctx, msg, opts, sp, err)
}
return err
}
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.serverHandler(ctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers {
o(ctx, req, rsp, sp, err)
}
return err
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.serverSubscriber(ctx, msg)
for _, o := range ot.opts.ServerSubscriberObservers {
o(ctx, msg, sp, err)
}
return err
}
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &tWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, clientCallFunc: h}
return ot.ClientCallFunc
}
}
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.clientCallFunc(ctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers {
o(ctx, addr, req, rsp, opts, sp, err)
}
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverHandler: h}
return ot.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverSubscriber: h}
return ot.ServerSubscriber
}
}

View File

@@ -11,17 +11,16 @@ import (
// CertOptions are passed to cert options
type CertOptions struct {
IsCA bool
NotAfter time.Time
NotBefore time.Time
Parent *x509.Certificate
SerialNumber *big.Int
Subject pkix.Name
DNSNames []string
IPAddresses []net.IP
SerialNumber *big.Int
NotBefore time.Time
NotAfter time.Time
Parent *x509.Certificate
Pub ed25519.PublicKey
Priv ed25519.PrivateKey
Pub ed25519.PublicKey
Priv ed25519.PrivateKey
IsCA bool
}
// CertOption sets CertOptions

View File

@@ -10,18 +10,17 @@ import (
)
type pool struct {
size int
ttl time.Duration
tr transport.Transport
sync.Mutex
tr transport.Transport
conns map[string][]*poolConn
size int
ttl time.Duration
sync.Mutex
}
type poolConn struct {
transport.Client
id string
created time.Time
transport.Client
id string
}
func newPool(options Options) *pool {

View File

@@ -10,11 +10,10 @@ import (
// Buffer is ring buffer
type Buffer struct {
size int
sync.RWMutex
vals []*Entry
streams map[string]*Stream
vals []*Entry
size int
sync.RWMutex
}
// Entry is ring buffer data entry
@@ -25,12 +24,12 @@ type Entry struct {
// Stream is used to stream the buffer
type Stream struct {
// Id of the stream
Id string
// Buffered entries
Entries chan *Entry
// Stop channel
Stop chan bool
// Id of the stream
Id string
}
// Put adds a new value to ring buffer

View File

@@ -8,18 +8,18 @@ const (
// Template is a compiled representation of path templates.
type Template struct {
// Version is the version number of the format.
Version int
// OpCodes is a sequence of operations.
// Verb is a VERB part in the template
Verb string
// Original template (example: /v1/a_bit_of_everything)
Template string
// OpCodes is a sequence of operations
OpCodes []int
// Pool is a constant pool
Pool []string
// Verb is a VERB part in the template.
Verb string
// Fields is a list of field paths bound in this template.
// Fields is a list of field paths bound in this template
Fields []string
// Original template (example: /v1/a_bit_of_everything)
Template string
// Version is the version number of the format
Version int
}
// Compiler compiles utilities representation of path templates into marshallable operations.
@@ -29,14 +29,8 @@ type Compiler interface {
}
type op struct {
// code is the opcode of the operation
code OpCode
// str is a string operand of the code.
// operand is ignored if str is not empty.
str string
// operand is a numeric operand of the code.
str string
code OpCode
operand int
}

View File

@@ -6,8 +6,8 @@ import (
)
type apiRouter struct {
routes []router.Route
router.Router
routes []router.Route
}
func (r *apiRouter) Lookup(...router.QueryOption) ([]router.Route, error) {

View File

@@ -25,9 +25,10 @@ type rop struct {
// Pattern is a template pattern of http request paths defined in github.com/googleapis/googleapis/google/api/http.proto.
type Pattern struct {
verb string
// ops is a list of operations
ops []rop
// pool is a constant pool indexed by the operands or vars.
// pool is a constant pool indexed by the operands or vars
pool []string
// vars is a list of variables names to be bound by this pattern
vars []string
@@ -35,8 +36,6 @@ type Pattern struct {
stacksize int
// tailLen is the length of the fixed-size segments after a deep wildcard
tailLen int
// verb is the VERB part of the path pattern. It is empty if the pattern does not have VERB part.
verb string
// assumeColonVerb indicates whether a path suffix after a final
// colon may only be interpreted as a verb.
assumeColonVerb bool

View File

@@ -8,9 +8,9 @@ import (
)
type template struct {
segments []segment
verb string
template string
segments []segment
}
type segment interface {

View File

@@ -6,8 +6,8 @@ import (
// Pool holds the socket pool
type Pool struct {
sync.RWMutex
pool map[string]*Socket
sync.RWMutex
}
// Get socket from pool

View File

@@ -9,17 +9,12 @@ import (
// Socket is our pseudo socket for transport.Socket
type Socket struct {
id string
// closed
closed chan bool
// remote addr
send chan *transport.Message
recv chan *transport.Message
id string
remote string
// local addr
local string
// send chan
send chan *transport.Message
// recv chan
recv chan *transport.Message
local string
}
// SetLocal sets the local addr

View File

@@ -21,10 +21,9 @@ type Stream interface {
type stream struct {
Stream
sync.RWMutex
err error
request *request
sync.RWMutex
}
type request struct {

View File

@@ -21,9 +21,9 @@ type Sync interface {
type syncStore struct {
storeOpts store.Options
syncOpts Options
pendingWrites []*deque.Deque
pendingWriteTickers []*time.Ticker
syncOpts Options
sync.RWMutex
}
@@ -123,7 +123,7 @@ func (c *syncStore) Sync() error {
}
type internalRecord struct {
expiresAt time.Time
key string
value []byte
expiresAt time.Time
}

View File

@@ -12,11 +12,10 @@ import (
// authClaims to be encoded in the JWT
type authClaims struct {
Type string `json:"type"`
Scopes []string `json:"scopes"`
Metadata metadata.Metadata `json:"metadata"`
jwt.StandardClaims
Type string `json:"type"`
Scopes []string `json:"scopes"`
}
// JWT implementation of token provider
@@ -51,7 +50,7 @@ func (j *JWT) Generate(acc *auth.Account, opts ...token.GenerateOption) (*token.
// generate the JWT
expiry := time.Now().Add(options.Expiry)
t := jwt.NewWithClaims(jwt.SigningMethodRS256, authClaims{
acc.Type, acc.Scopes, acc.Metadata, jwt.StandardClaims{
Type: acc.Type, Scopes: acc.Scopes, Metadata: acc.Metadata, StandardClaims: jwt.StandardClaims{
Subject: acc.ID,
Issuer: acc.Issuer,
ExpiresAt: expiry.Unix(),

View File

@@ -25,10 +25,10 @@ type Provider interface {
// Token holds the auth token
type Token struct {
// The actual token
Token string `json:"token"`
// Time of token creation
// Created time of token created
Created time.Time `json:"created"`
// Time of token expiry
// Expiry ime of the token
Expiry time.Time `json:"expiry"`
// Token holds the actual token
Token string `json:"token"`
}