Compare commits

..

54 Commits

Author SHA1 Message Date
3a60103aed server: drop Internal option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-20 12:45:14 +03:00
41837a67f8 register: drop verbose values export
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-20 12:39:21 +03:00
852f19598d util/reflect: fix protobuf field name detection
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-19 11:34:28 +03:00
6537b35773 util/reflect: add interface merging
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-19 01:19:37 +03:00
b733f1316f remove stale generate stuff
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-16 17:36:27 +03:00
Renovate Bot
840af5574c fix(deps): update golang.org/x/net commit hash to e915ea6 2021-04-16 00:56:52 +00:00
Renovate Bot
56e5b7001c fix(deps): update golang.org/x/net commit hash to 0645797 2021-04-14 21:41:15 +00:00
Renovate Bot
11dc6fd752 fix(deps): update golang.org/x/net commit hash to afb366f 2021-04-10 11:09:36 +00:00
a2695d8699 util/reflect: rewrite struct merging with map
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-10 01:22:40 +03:00
618421de05 client: allow to set content-type for call
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-09 23:09:12 +03:00
Renovate Bot
30baaabd9f fix(deps): update golang.org/x/net commit hash to a5a99cb 2021-04-05 19:46:46 +00:00
df5bce1191 util/reflect: fix StructURLValues
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-03 11:50:23 +03:00
Renovate Bot
089d0fe4df fix(deps): update golang.org/x/net commit hash to 0fccb6f 2021-03-31 22:50:38 +00:00
a06f535303 util/reflect: add StructURLValues func
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-01 00:30:26 +03:00
Renovate Bot
eba586a329 fix(deps): update golang.org/x/net commit hash to cb1fcc7 2021-03-31 09:19:27 +00:00
Renovate Bot
d74a8645e8 fix(deps): update golang.org/x/net commit hash to e572328 2021-03-31 00:52:53 +00:00
Renovate Bot
5a00786192 fix(deps): update golang.org/x/net commit hash to cd0ac97 2021-03-30 22:28:38 +00:00
Renovate Bot
b3e9941634 fix(deps): update golang.org/x/net commit hash to c8897c2 2021-03-30 16:02:28 +00:00
Renovate Bot
a5a5904302 fix(deps): update golang.org/x/net commit hash to 22f4162 2021-03-30 11:44:00 +00:00
Renovate Bot
a59832e57e fix(deps): update golang.org/x/net commit hash to df645c7 2021-03-30 05:11:12 +00:00
0e42033e7f meter/handler: more idiomatic option handling
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-29 17:51:44 +03:00
52d8255974 service init with own context
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-28 23:42:02 +03:00
9830cb48a9 fix compilation
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-28 19:31:03 +03:00
92d7ab2105 regen handlers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-28 19:28:01 +03:00
d2935ef399 meter/handler: fix proto and generated code
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-28 19:18:11 +03:00
ce4c96ae0a server/health: add health check handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-28 19:18:11 +03:00
Renovate Bot
14026d15be fix(deps): update golang.org/x/net commit hash to 61e0566 2021-03-27 01:29:30 +00:00
Renovate Bot
2df0c7643e fix(deps): update golang.org/x/net commit hash to 6b15177 2021-03-26 19:17:30 +00:00
e13c2c48fd client: use router.DefaultRouter in NewOptions helper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-26 17:45:55 +03:00
8db55d2e55 router: use dns as default router
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-26 17:43:39 +03:00
ed61cad961 meter/handler: regen
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-26 15:45:09 +03:00
040fc4548f client: add TLSConfig option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-26 15:44:34 +03:00
6189a1b980 add SkipEndpoints for wrappers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-25 23:30:38 +03:00
eb2a450a7b meter/handler: fix func signature
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-25 12:36:48 +03:00
Renovate Bot
d2a30a5da1 fix(deps): update golang.org/x/net commit hash to d1beb07 2021-03-24 23:28:38 +00:00
65889c66f6 meter/wrapper: add DefaultSkipEndpoints
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-25 00:06:38 +03:00
dcdf133d5b server: mask router
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-24 13:26:36 +03:00
8742b55305 auth: fix Init method
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-24 13:12:03 +03:00
4a64ee72f7 meter/handler: provide default metrics handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-23 17:26:29 +03:00
881d7afeea meter/handler: provide initial meter handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-23 17:12:13 +03:00
8c95448535 util/reflect: add IsZero helper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-21 16:17:50 +03:00
c1dc041d8c client: fix NewOptions with CallOptions filling
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-16 19:09:49 +03:00
Renovate Bot
25be0ac0f0 fix(deps): update golang.org/x/net commit hash to d523dce 2021-03-16 13:12:17 +00:00
Renovate Bot
86f73cac4e fix(deps): update golang.org/x/net commit hash to 34ac3e1 2021-03-15 22:09:13 +00:00
485eda6ce9 meter/wrapper: fix wrapper build
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-15 00:49:26 +03:00
dbbdb24631 meter: rework labels
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-15 00:44:13 +03:00
723ceb4f32 regsiter: fix extractor
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-12 16:09:20 +03:00
bac9869bb3 register: support map in ExtractValue
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-12 15:48:05 +03:00
610427445f codec: provide proto for codec.Frame
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-11 07:42:42 +03:00
Renovate Bot
c84a66c713 fix(deps): update module github.com/imdario/mergo to v0.3.12 2021-03-10 00:35:57 +00:00
00eaae717b lint fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 23:44:54 +03:00
a102e95433 spell fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 23:33:37 +03:00
39f66cc86c add logger wrapper, fix default logger fields method
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 23:26:47 +03:00
bbbcb22565 fieldalignment of all structs to save memory
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 19:45:13 +03:00
97 changed files with 2910 additions and 1675 deletions

15
.github/generate.sh vendored
View File

@@ -1,15 +0,0 @@
#!/bin/bash -e
find . -type f -name '*.pb.*.go' -o -name '*.pb.go' -a ! -name 'message.pb.go' -delete
PROTOS=$(find . -type f -name '*.proto' | grep -v proto/google/api)
mkdir -p proto/google/api
curl -s -o proto/google/api/annotations.proto -L https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/annotations.proto
curl -s -o proto/google/api/http.proto -L https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/http.proto
for PROTO in $PROTOS; do
echo $PROTO
protoc -I./proto -I. -I$(dirname $PROTO) --go-grpc_out=paths=source_relative:. --go_out=paths=source_relative:. --micro_out=paths=source_relative:. $PROTO
done
rm -r proto

View File

@@ -34,20 +34,20 @@ type Option func(*Options) error
type Endpoint struct {
// Name Greeter.Hello
Name string
// Description e.g what's this endpoint for
// Desciption for endpoint
Description string
// Handler e.g rpc, proxy
Handler string
// Body destination
// "*" or "" - top level message value
// "string" - inner message value
Body string
// Host e.g example.com
Host []string
// Method e.g GET, POST
Method []string
// Path e.g /greeter. Expect POSIX regex
Path []string
// Body destination
// "*" or "" - top level message value
// "string" - inner message value
Body string
// Stream flag
Stream bool
}

View File

@@ -13,11 +13,11 @@ var (
// Options struct holds handler options
type Options struct {
MaxRecvSize int64
Namespace string
Router router.Router
Client client.Client
Logger logger.Logger
Namespace string
MaxRecvSize int64
}
// Option func signature

View File

@@ -8,9 +8,12 @@ import (
// Options struct
type Options struct {
Handler string
// Context is for external defined options
Context context.Context
// Handler name
Handler string
// ServicePrefix is the prefix
ServicePrefix string
Context context.Context
}
// Option func

View File

@@ -11,11 +11,16 @@ import (
// Options holds the options for api router
type Options struct {
Handler string
// Register for service lookup
Register register.Register
// Resolver to use
Resolver resolver.Resolver
Logger logger.Logger
Context context.Context
// Logger micro logger
Logger logger.Logger
// Context is for external options
Context context.Context
// Handler name
Handler string
}
// Option func signature

View File

@@ -30,7 +30,7 @@ var (
// Auth provides authentication and authorization
type Auth interface {
// Init the auth
Init(opts ...Option)
Init(opts ...Option) error
// Options set for auth
Options() Options
// Generate a new account
@@ -53,30 +53,30 @@ type Auth interface {
// Account provided by an auth provider
type Account struct {
// ID of the account e.g. email
// Metadata any other associated metadata
Metadata metadata.Metadata `json:"metadata"`
// ID of the account e.g. email or uuid
ID string `json:"id"`
// Type of the account, e.g. service
Type string `json:"type"`
// Issuer of the account
Issuer string `json:"issuer"`
// Any other associated metadata
Metadata metadata.Metadata `json:"metadata"`
// Scopes the account has access to
Scopes []string `json:"scopes"`
// Secret for the account, e.g. the password
Secret string `json:"secret"`
// Scopes the account has access to
Scopes []string `json:"scopes"`
}
// Token can be short or long lived
type Token struct {
// The token to be used for accessing resources
AccessToken string `json:"access_token"`
// RefreshToken to be used to generate a new token
RefreshToken string `json:"refresh_token"`
// Time of token creation
Created time.Time `json:"created"`
// Time of token expiry
Expiry time.Time `json:"expiry"`
// The token to be used for accessing resources
AccessToken string `json:"access_token"`
// RefreshToken to be used to generate a new token
RefreshToken string `json:"refresh_token"`
}
// Expired returns a boolean indicating if the token needs to be refreshed
@@ -106,17 +106,15 @@ const (
// Rule is used to verify access to a resource
type Rule struct {
// ID of the rule, e.g. "public"
ID string
// Scope the rule requires, a blank scope indicates open to the public and * indicates the rule
// applies to any valid account
Scope string
// Resource the rule applies to
// Resource that rule belongs to
Resource *Resource
// Access determines if the rule grants or denies access to the resource
// ID of the rule
ID string
// Scope of the rule
Scope string
// Access flag allow/deny
Access Access
// Priority the rule should take when verifying a request, the higher the value the sooner the
// rule will be applied
// Priority holds the rule priority
Priority int32
}

View File

@@ -14,10 +14,11 @@ func (n *noopAuth) String() string {
}
// Init the auth
func (n *noopAuth) Init(opts ...Option) {
func (n *noopAuth) Init(opts ...Option) error {
for _, o := range opts {
o(&n.opts)
}
return nil
}
// Options set for auth

View File

@@ -26,33 +26,34 @@ func NewOptions(opts ...Option) Options {
// Options struct holds auth options
type Options struct {
Name string
// Issuer of the service's account
Issuer string
// ID is the services auth ID
ID string
// Secret is used to authenticate the service
Secret string
// Context holds the external options
Context context.Context
// Meter used for metrics
Meter meter.Meter
// Logger used for logging
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
// Store used for stre data
Store store.Store
// Token is the services token used to authenticate itself
Token *Token
// PublicKey for decoding JWTs
PublicKey string
// PrivateKey for encoding JWTs
PrivateKey string
// LoginURL is the relative url path where a user can login
LoginURL string
// Store to back auth
Store store.Store
// PrivateKey for encoding JWTs
PrivateKey string
// PublicKey for decoding JWTs
PublicKey string
// Secret is used to authenticate the service
Secret string
// ID is the services auth ID
ID string
// Issuer of the service's account
Issuer string
// Name holds the auth name
Name string
// Addrs sets the addresses of auth
Addrs []string
// Logger sets the logger
Logger logger.Logger
// Meter sets tht meter
Meter meter.Meter
// Tracer
Tracer tracer.Tracer
// Context to store other options
Context context.Context
}
// Option func
@@ -124,18 +125,12 @@ func LoginURL(url string) Option {
// GenerateOptions struct
type GenerateOptions struct {
// Metadata associated with the account
Metadata metadata.Metadata
// Scopes the account has access too
Scopes []string
// Provider of the account, e.g. oauth
Provider string
// Type of the account, e.g. user
Type string
// Secret used to authenticate the account
Secret string
// Issuer of the account, e.g. micro
Issuer string
Type string
Secret string
Issuer string
Scopes []string
}
// GenerateOption func
@@ -194,16 +189,11 @@ func NewGenerateOptions(opts ...GenerateOption) GenerateOptions {
// TokenOptions struct
type TokenOptions struct {
// ID for the account
ID string
// Secret for the account
Secret string
// RefreshToken is used to refesh a token
ID string
Secret string
RefreshToken string
// Expiry is the time the token should live for
Expiry time.Duration
// Issuer of the account
Issuer string
Issuer string
Expiry time.Duration
}
// TokenOption func

View File

@@ -13,28 +13,27 @@ import (
)
type memoryBroker struct {
opts Options
addr string
sync.RWMutex
connected bool
opts Options
Subscribers map[string][]*memorySubscriber
addr string
sync.RWMutex
connected bool
}
type memoryEvent struct {
opts Options
topic string
err error
message interface{}
topic string
}
type memorySubscriber struct {
id string
topic string
exit chan bool
handler Handler
opts SubscribeOptions
ctx context.Context
exit chan bool
handler Handler
id string
topic string
}
func (m *memoryBroker) Options() Options {

View File

@@ -13,25 +13,26 @@ import (
// Options struct
type Options struct {
Name string
// Addrs useed by broker
Addrs []string
// ErrorHandler executed when errors occur processing messages
ErrorHandler Handler
// Codec used to marshal/unmarshal messages
Codec codec.Codec
// Logger the used logger
Logger logger.Logger
// Meter the used for metrics
Meter meter.Meter
// Tracer used for trace
// Tracer used for tracing
Tracer tracer.Tracer
// TLSConfig for secure communication
TLSConfig *tls.Config
// Register used for clustering
// Register can be used for clustering
Register register.Register
// Context is used for non default options
// Codec holds the codec for marshal/unmarshal
Codec codec.Codec
// Logger used for logging
Logger logger.Logger
// Meter used for metrics
Meter meter.Meter
// Context holds external options
Context context.Context
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// Name holds the broker name
Name string
// Addrs holds the broker address
Addrs []string
}
// NewOptions create new Options
@@ -59,10 +60,10 @@ func Context(ctx context.Context) Option {
// PublishOptions struct
type PublishOptions struct {
// BodyOnly says that only body of the message must be published
BodyOnly bool
// Context for non default options
// Context holds external options
Context context.Context
// BodyOnly flag says the message contains raw body bytes
BodyOnly bool
}
// NewPublishOptions creates PublishOptions struct
@@ -80,22 +81,16 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
// SubscribeOptions struct
type SubscribeOptions struct {
// AutoAck ack messages if handler returns nil err
AutoAck bool
// ErrorHandler executed when errors occur processing messages
ErrorHandler Handler
// Group for subscriber, Subscribers with the same group name
// will create a shared subscription where each
// receives a subset of messages.
Group string
// BodyOnly says that consumed only body of the message
BodyOnly bool
// Context is used for non default options
// Context holds external options
Context context.Context
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// Group holds consumer group
Group string
// AutoAck flag specifies auto ack of incoming message when no error happens
AutoAck bool
// BodyOnly flag specifies that message contains only body bytes without header
BodyOnly bool
}
// Option func

View File

@@ -21,12 +21,12 @@ type Source struct {
// Package is packaged format for source
type Package struct {
// Source of the package
Source *Source
// Name of the package
Name string
// Location of the package
Path string
// Type of package e.g tarball, binary, docker
Type string
// Source of the package
Source *Source
}

View File

@@ -31,12 +31,12 @@ type noopMessage struct {
}
type noopRequest struct {
body interface{}
codec codec.Codec
service string
method string
endpoint string
contentType string
body interface{}
codec codec.Codec
stream bool
}

View File

@@ -2,6 +2,7 @@ package client
import (
"context"
"crypto/tls"
"time"
"github.com/unistack-org/micro/v3/broker"
@@ -18,35 +19,42 @@ import (
// Options holds client options
type Options struct {
Name string
// Used to select codec
ContentType string
// Proxy address to send requests via
Proxy string
// Plugged interfaces
Broker broker.Broker
Codecs map[string]codec.Codec
Router router.Router
Selector selector.Selector
Transport transport.Transport
Logger logger.Logger
Meter meter.Meter
// Lookup used for looking up routes
Lookup LookupFunc
// Connection Pool
PoolSize int
PoolTTL time.Duration
Tracer tracer.Tracer
// Wrapper that used client
Wrappers []Wrapper
// CallOptions that used by default
// CallOptions contains default CallOptions
CallOptions CallOptions
// Context is used for non default options
// Logger used to log messages
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
// Broker used to publish messages
Broker broker.Broker
// Meter used for metrics
Meter meter.Meter
// Router used to get route
Router router.Router
// Selector used to select needed address
Selector selector.Selector
// Transport used for transfer messages
Transport transport.Transport
// Context is used for external options
Context context.Context
// Codecs map
Codecs map[string]codec.Codec
// Lookup func used to get destination addr
Lookup LookupFunc
// Proxy is used for proxy requests
Proxy string
// ContentType is Used to select codec
ContentType string
// Name is the client name
Name string
// Wrappers contains wrappers
Wrappers []Wrapper
// PoolSize connection pool size
PoolSize int
// PoolTTL connection pool ttl
PoolTTL time.Duration
// TLSConfig specifies tls.Config for secure connection
TLSConfig *tls.Config
}
// NewCallOptions creates new call options struct
@@ -60,34 +68,36 @@ func NewCallOptions(opts ...CallOption) CallOptions {
// CallOptions holds client call options
type CallOptions struct {
// Address of remote hosts
Address []string
// Backoff func
Backoff BackoffFunc
// DialTimeout is the transport Dial Timeout
DialTimeout time.Duration
// Retries is the number of Call attempts
Retries int
// Retry func to be used for retries
Retry RetryFunc
// RequestTimeout specifies request timeout
RequestTimeout time.Duration
// Router to use for this call
// Router used for route
Router router.Router
// Selector to use for the call
// Selector selects addr
Selector selector.Selector
// SelectOptions to use when selecting a route
SelectOptions []selector.SelectOption
// StreamTimeout timeout for the stream
StreamTimeout time.Duration
// AuthToken specifies the auth token as the authorization header
AuthToken bool
// Network to lookup the route within
Network string
// CallWrappers is for low level call func
CallWrappers []CallWrapper
// Context is used for non default options
// Context used for deadline
Context context.Context
// Retry func used for retries
Retry RetryFunc
// Backoff func used for backoff when retry
Backoff BackoffFunc
// Network name
Network string
// Content-Type
ContentType string
// CallWrappers call wrappers
CallWrappers []CallWrapper
// SelectOptions selector options
SelectOptions []selector.SelectOption
// Address specifies static addr list
Address []string
// Retries specifies retries num
Retries int
// StreamTimeout stream timeout
StreamTimeout time.Duration
// RequestTimeout request timeout
RequestTimeout time.Duration
// DialTimeout dial timeout
DialTimeout time.Duration
// AuthToken flag
AuthToken bool
}
// Context pass context to client
@@ -108,10 +118,10 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
// PublishOptions holds publish options
type PublishOptions struct {
// Exchange is the routing exchange for the message
Exchange string
// Context holds additional options
// Context used for external options
Context context.Context
// Exchange topic exchange name
Exchange string
}
// NewMessageOptions creates message options struct
@@ -125,6 +135,7 @@ func NewMessageOptions(opts ...MessageOption) MessageOptions {
// MessageOptions holds client message options
type MessageOptions struct {
// ContentType specify content-type of message
ContentType string
}
@@ -139,12 +150,12 @@ func NewRequestOptions(opts ...RequestOption) RequestOptions {
// RequestOptions holds client request options
type RequestOptions struct {
// ContentType specify content-type of request
ContentType string
// Stream says that request is the streaming
Stream bool
// Context can hold other options
// Context used for external options
Context context.Context
// ContentType specify content-type of message
ContentType string
// Stream flag
Stream bool
}
// NewOptions creates new options struct
@@ -154,20 +165,23 @@ func NewOptions(opts ...Option) Options {
ContentType: DefaultContentType,
Codecs: make(map[string]codec.Codec),
CallOptions: CallOptions{
Context: context.Background(),
Backoff: DefaultBackoff,
Retry: DefaultRetry,
Retries: DefaultRetries,
RequestTimeout: DefaultRequestTimeout,
DialTimeout: transport.DefaultDialTimeout,
},
Lookup: LookupRoute,
PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL,
Selector: random.NewSelector(),
Logger: logger.DefaultLogger,
Broker: broker.DefaultBroker,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
Lookup: LookupRoute,
PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL,
Selector: random.NewSelector(),
Logger: logger.DefaultLogger,
Broker: broker.DefaultBroker,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
Router: router.DefaultRouter,
Transport: transport.DefaultTransport,
}
for _, o := range opts {
@@ -305,6 +319,22 @@ func Lookup(l LookupFunc) Option {
}
}
// TLSConfig specifies a *tls.Config
func TLSConfig(t *tls.Config) Option {
return func(o *Options) {
// set the internal tls
o.TLSConfig = t
// set the default transport if one is not
// already set. Required for Init call below.
// set the transport tls
o.Transport.Init(
transport.TLSConfig(t),
)
}
}
// Retries sets the retry count when making the request.
func Retries(i int) Option {
return func(o *Options) {
@@ -354,6 +384,13 @@ func PublishContext(ctx context.Context) PublishOption {
}
}
// WithContentType specifies call content type
func WithContentType(ct string) CallOption {
return func(o *CallOptions) {
o.ContentType = ct
}
}
// WithAddress sets the remote addresses to use rather than using service discovery
func WithAddress(a ...string) CallOption {
return func(o *CallOptions) {
@@ -458,16 +495,16 @@ func WithMessageContentType(ct string) MessageOption {
}
}
// WithContentType specifies request content type
func WithContentType(ct string) RequestOption {
// StreamingRequest specifies that request is streaming
func StreamingRequest(b bool) RequestOption {
return func(o *RequestOptions) {
o.Stream = b
}
}
// RequestContentType specifies request content type
func RequestContentType(ct string) RequestOption {
return func(o *RequestOptions) {
o.ContentType = ct
}
}
// StreamingRequest specifies that request is streaming
func StreamingRequest() RequestOption {
return func(o *RequestOptions) {
o.Stream = true
}
}

View File

@@ -5,13 +5,13 @@ import (
)
type testRequest struct {
service string
opts RequestOptions
codec codec.Codec
body interface{}
method string
endpoint string
contentType string
codec codec.Codec
body interface{}
opts RequestOptions
service string
}
func (r *testRequest) ContentType() string {

View File

@@ -51,16 +51,14 @@ type Codec interface {
// the communication, likely followed by the body.
// In the case of an error, body may be nil.
type Message struct {
Id string
Type MessageType
Header metadata.Metadata
Target string
Method string
Endpoint string
Error string
// The values read from the socket
Header metadata.Metadata
Body []byte
Id string
Body []byte
Type MessageType
}
// NewMessage creates new codec message

6
codec/frame.go Normal file
View File

@@ -0,0 +1,6 @@
package codec
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
}

28
codec/frame.proto Normal file
View File

@@ -0,0 +1,28 @@
// Copyright 2021 Unistack LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package micro.codec;
option cc_enable_arenas = true;
option go_package = "github.com/unistack-org/micro/v3/codec;codec";
option java_multiple_files = true;
option java_outer_classname = "MicroCodec";
option java_package = "micro.codec";
option objc_class_prefix = "MCODEC";
message Frame {
bytes data = 1;
}

View File

@@ -8,11 +8,6 @@ import (
type noopCodec struct {
}
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
}
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
return nil
}

View File

@@ -11,10 +11,14 @@ type Option func(*Options)
// Options contains codec options
type Options struct {
// Meter used for metrics
Meter meter.Meter
// Logger used for logging
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
// MaxMsgSize specifies max messages size that reads by codec
MaxMsgSize int
Meter meter.Meter
Logger logger.Logger
Tracer tracer.Tracer
}
// MaxMsgSize sets the max message size

View File

@@ -11,26 +11,32 @@ import (
// Options hold the config options
type Options struct {
Name string
AllowFail bool
BeforeLoad []func(context.Context, Config) error
AfterLoad []func(context.Context, Config) error
BeforeSave []func(context.Context, Config) error
AfterSave []func(context.Context, Config) error
// Struct that holds config data
// Struct holds the destination config struct
Struct interface{}
// StructTag name
StructTag string
// Logger that will be used
Logger logger.Logger
// Meter that will be used
Meter meter.Meter
// Tracer used for trace
Tracer tracer.Tracer
// Codec that used for load/save
Codec codec.Codec
// Context for alternative data
// Tracer that will be used
Tracer tracer.Tracer
// Meter that will be used
Meter meter.Meter
// Logger that will be used
Logger logger.Logger
// Context used for external options
Context context.Context
// Name of the config
Name string
// StructTag name
StructTag string
// BeforeSave contains slice of funcs that runs before save
BeforeSave []func(context.Context, Config) error
// AfterLoad contains slice of funcs that runs after load
AfterLoad []func(context.Context, Config) error
// BeforeLoad contains slice of funcs that runs before load
BeforeLoad []func(context.Context, Config) error
// AfterSave contains slice of funcs that runs after save
AfterSave []func(context.Context, Config) error
// AllowFail flag to allow fail in config source
AllowFail bool
}
// Option function signature

View File

@@ -37,10 +37,14 @@ var (
// Error type
type Error struct {
Id string
Code int32
// Id holds error id or service, usually someting like my_service or uuid
Id string
// Detail holds some useful details about error
Detail string
// Status usually holds text of http status
Status string
// Code holds error code
Code int32
}
// Error satisfies error interface
@@ -49,7 +53,7 @@ func (e *Error) Error() string {
return string(b)
}
// New generates a custom error.
// New generates a custom error
func New(id, detail string, code int32) error {
return &Error{
Id: id,

View File

@@ -1,3 +0,0 @@
package micro
//go:generate ./.github/generate.sh

4
go.mod
View File

@@ -6,8 +6,8 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4
github.com/google/uuid v1.2.0
github.com/imdario/mergo v0.3.11
github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d
)

11
go.sum
View File

@@ -4,17 +4,18 @@ github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d h1:BgJvlyh+UqCUaPlscHJ+PN8GcpfrFdr7NHjd1JL0+Gs=
golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=

View File

@@ -21,9 +21,9 @@ func init() {
}
type defaultLogger struct {
sync.RWMutex
opts Options
enc *json.Encoder
sync.RWMutex
}
// Init(opts...) should only overwrite provided options
@@ -49,10 +49,18 @@ func (l *defaultLogger) V(level Level) bool {
}
func (l *defaultLogger) Fields(fields map[string]interface{}) Logger {
l.Lock()
l.opts.Fields = copyFields(fields)
l.Unlock()
return l
nl := &defaultLogger{opts: l.opts, enc: l.enc}
nl.opts.Fields = make(map[string]interface{}, len(l.opts.Fields)+len(fields))
l.RLock()
for k, v := range l.opts.Fields {
nl.opts.Fields[k] = v
}
l.RUnlock()
for k, v := range fields {
nl.opts.Fields[k] = v
}
return nl
}
func copyFields(src map[string]interface{}) map[string]interface{} {
@@ -153,7 +161,9 @@ func (l *defaultLogger) Log(ctx context.Context, level Level, args ...interface{
}
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
fields["msg"] = fmt.Sprint(args...)
if len(args) > 0 {
fields["msg"] = fmt.Sprint(args...)
}
l.RLock()
_ = l.enc.Encode(fields)
@@ -178,7 +188,7 @@ func (l *defaultLogger) Logf(ctx context.Context, level Level, msg string, args
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
if len(args) > 0 {
fields["msg"] = fmt.Sprintf(msg, args...)
} else {
} else if msg != "" {
fields["msg"] = msg
}
l.RLock()

View File

@@ -11,17 +11,18 @@ type Option func(*Options)
// Options holds logger options
type Options struct {
Name string
// The logging level the logger should log at. default is `InfoLevel`
Level Level
// fields to always be logged
Fields map[string]interface{}
// It's common to set this to a file, or leave it default which is `os.Stderr`
// Out holds the output writer
Out io.Writer
// Caller skip frame count for file:line info
CallerSkipCount int
// Alternative options
// Context holds exernal options
Context context.Context
// Fields holds additional metadata
Fields map[string]interface{}
// Name holds the logger name
Name string
// CallerSkipCount number of frmaes to skip
CallerSkipCount int
// The logging level the logger should log
Level Level
}
// NewOptions creates new options struct

408
logger/wrapper/wrapper.go Normal file
View File

@@ -0,0 +1,408 @@
// Package wrapper provides wrapper for Logger
package wrapper
import (
"context"
"fmt"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/server"
)
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
labels := []string{"endpoint", msg.Topic()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, err error) []string {
labels := []string{"endpoint", msg.Topic()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultSkipEndpoints = []string{"Meter.Metrics"}
)
type lWrapper struct {
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
opts Options
}
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string
type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
type ServerSubscriberObserver func(context.Context, server.Message, error) []string
// Options struct for wrapper
type Options struct {
// Logger that used for log
Logger logger.Logger
// Level for logger
Level logger.Level
// Enabled flag
Enabled bool
// ClientCallObservers funcs
ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs
ClientStreamObservers []ClientStreamObserver
// ClientPublishObservers funcs
ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions creates Options from Option slice
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Level: logger.TraceLevel,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
}
return options
}
// WithEnabled enable/diable flag
func WithEnabled(b bool) Option {
return func(o *Options) {
o.Enabled = b
}
}
// WithLevel log level
func WithLevel(l logger.Level) Option {
return func(o *Options) {
o.Level = l
}
}
// WithLogger logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
// WithClientStreamObservers funcs
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
// WithClientPublishObservers funcs
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
// WithClientCallFuncObservers funcs
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
// WithServerHandlerObservers funcs
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
// WithServerSubscriberObservers funcs
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
// SkipEndpoins
func SkipEndpoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
err := l.Client.Call(ctx, req, rsp, opts...)
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientCallObservers {
labels = append(labels, o(ctx, req, rsp, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
stream, err := l.Client.Stream(ctx, req, opts...)
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return stream, err
}
}
if !l.opts.Enabled {
return stream, err
}
var labels []string
for _, o := range l.opts.ClientStreamObservers {
labels = append(labels, o(ctx, req, opts, stream, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return stream, err
}
func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
err := l.Client.Publish(ctx, msg, opts...)
endpoint := msg.Topic()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientPublishObservers {
labels = append(labels, o(ctx, msg, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
err := l.serverHandler(ctx, req, rsp)
endpoint := req.Endpoint()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ServerHandlerObservers {
labels = append(labels, o(ctx, req, rsp, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
err := l.serverSubscriber(ctx, msg)
endpoint := msg.Topic()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ServerSubscriberObservers {
labels = append(labels, o(ctx, msg, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
// NewClientWrapper accepts an open options and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &lWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an options and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, clientCallFunc: h}
return l.ClientCallFunc
}
}
func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
err := l.clientCallFunc(ctx, addr, req, rsp, opts)
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientCallFuncObservers {
labels = append(labels, o(ctx, addr, req, rsp, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, serverHandler: h}
return l.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an options and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, serverSubscriber: h}
return l.ServerSubscriber
}
}

View File

@@ -27,10 +27,10 @@ var (
// Iterator used to iterate over metadata with order
type Iterator struct {
md Metadata
keys []string
cur int
cnt int
keys []string
md Metadata
}
// Next advance iterator to next element
@@ -112,6 +112,7 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata {
return nmd
}
// Pairs from which metadata created
func Pairs(kv ...string) (Metadata, bool) {
if len(kv)%2 == 1 {
return nil, false

3
meter/generate.go Normal file
View File

@@ -0,0 +1,3 @@
package meter
//go:generate protoc -I./handler -I../ -I/home/vtolstov/.cache/go-path/pkg/mod/github.com/unistack-org/micro-proto@v0.0.1 --micro_out=components=micro|http|server,standalone=false,debug=true,paths=source_relative:./handler handler/handler.proto

69
meter/handler/handler.go Normal file
View File

@@ -0,0 +1,69 @@
package handler
import (
"bytes"
"context"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/meter"
)
var (
// guard to fail early
_ MeterServer = &handler{}
)
type handler struct {
opts Options
}
type Option func(*Options)
type Options struct {
Meter meter.Meter
MeterOptions []meter.Option
Name string
}
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
func Name(name string) Option {
return func(o *Options) {
o.Name = name
}
}
func MeterOptions(opts ...meter.Option) Option {
return func(o *Options) {
o.MeterOptions = append(o.MeterOptions, opts...)
}
}
func NewOptions(opts ...Option) Options {
options := Options{Meter: meter.DefaultMeter}
for _, o := range opts {
o(&options)
}
return options
}
func NewHandler(opts ...Option) *handler {
options := NewOptions(opts...)
return &handler{opts: options}
}
func (h *handler) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
buf := bytes.NewBuffer(nil)
if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil {
return errors.InternalServerError(h.opts.Name, "%v", err)
}
rsp.Data = buf.Bytes()
return nil
}

View File

@@ -0,0 +1,28 @@
syntax = "proto3";
package micro.meter.handler;
option go_package = "github.com/unistack-org/micro/v3/meter/handler;handler";
import "api/annotations.proto";
import "openapiv2/annotations.proto";
import "codec/frame.proto";
service Meter {
rpc Metrics(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv2.openapiv2_operation) = {
operation_id: "Metrics";
responses: {
key: "default";
value: {
description: "Error response";
schema: {
json_schema: {
ref: "micro.codec.Frame";
}
}
}
}
};
option (micro.api.http) = { get: "/metrics"; };
};
};

View File

@@ -0,0 +1,24 @@
// Code generated by protoc-gen-micro
// source: handler.proto
package handler
import (
context "context"
api "github.com/unistack-org/micro/v3/api"
codec "github.com/unistack-org/micro/v3/codec"
)
func NewMeterEndpoints() []*api.Endpoint {
return []*api.Endpoint{
&api.Endpoint{
Name: "Meter.Metrics",
Path: []string{"/metrics"},
Method: []string{"GET"},
Handler: "rpc",
},
}
}
type MeterServer interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}

View File

@@ -0,0 +1,33 @@
// Code generated by protoc-gen-micro
// source: handler.proto
package handler
import (
context "context"
api "github.com/unistack-org/micro/v3/api"
codec "github.com/unistack-org/micro/v3/codec"
server "github.com/unistack-org/micro/v3/server"
)
type meterServer struct {
MeterServer
}
func (h *meterServer) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.MeterServer.Metrics(ctx, req, rsp)
}
func RegisterMeterServer(s server.Server, sh MeterServer, opts ...server.HandlerOption) error {
type meter interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}
type Meter struct {
meter
}
h := &meterServer{sh}
var nopts []server.HandlerOption
for _, endpoint := range NewMeterEndpoints() {
nopts = append(nopts, api.WithEndpoint(endpoint))
}
return s.Handle(s.NewHandler(&Meter{h}, append(nopts, opts...)...))
}

View File

@@ -3,6 +3,7 @@ package meter
import (
"io"
"reflect"
"sort"
"time"
)
@@ -76,66 +77,36 @@ type Summary interface {
UpdateDuration(time.Time)
}
// Labels holds the metrics labels with k, v
type Labels struct {
keys []string
vals []string
type byKey []string
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
func (k byKey) Swap(i, j int) {
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1]
}
// Append adds labels to label set
func (ls Labels) Append(nls Labels) Labels {
for n := range nls.keys {
ls.keys = append(ls.keys, nls.keys[n])
ls.vals = append(ls.vals, nls.vals[n])
func Sort(slice *[]string) {
bk := byKey(*slice)
if bk.Len() <= 1 {
return
}
return ls
}
// Len returns number of labels
func (ls Labels) Len() int {
return len(ls.keys)
}
type labels Labels
func (ls labels) Len() int {
return len(ls.keys)
}
func (ls labels) Sort() {
sort.Sort(ls)
}
func (ls labels) Swap(i, j int) {
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
}
func (ls labels) Less(i, j int) bool {
return ls.keys[i] < ls.keys[j]
}
// LabelIter holds the
type LabelIter struct {
labels Labels
cnt int
cur int
}
// Iter returns labels iterator
func (ls Labels) Iter() *LabelIter {
labels(ls).Sort()
return &LabelIter{labels: ls, cnt: len(ls.keys)}
}
// Next advance itarator to new pos
func (iter *LabelIter) Next(k, v *string) bool {
if iter.cur+1 > iter.cnt {
return false
sort.Sort(bk)
v := reflect.ValueOf(slice).Elem()
cnt := 0
key := 0
val := 1
for key < v.Len() {
if len(bk) > key+2 && bk[key] == bk[key+2] {
key += 2
val += 2
continue
}
v.Index(cnt).Set(v.Index(key))
cnt++
v.Index(cnt).Set(v.Index(val))
cnt++
key += 2
val += 2
}
*k = iter.labels.keys[iter.cur]
*v = iter.labels.vals[iter.cur]
iter.cur++
return true
v.SetLen(cnt)
}

View File

@@ -10,46 +10,15 @@ func TestNoopMeter(t *testing.T) {
t.Fatalf("invalid options parsing: %v", m.Options())
}
cnt := m.Counter("counter", Label("server", "noop"))
cnt := m.Counter("counter", Labels("server", "noop"))
cnt.Inc()
}
func TestLabelsAppend(t *testing.T) {
var ls Labels
ls.keys = []string{"type", "server"}
ls.vals = []string{"noop", "http"}
func TestLabelsSort(t *testing.T) {
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"}
Sort(&ls)
var nls Labels
nls.keys = []string{"register"}
nls.vals = []string{"gossip"}
ls = ls.Append(nls)
//ls.Sort()
if ls.keys[0] != "type" || ls.vals[0] != "noop" {
t.Fatalf("append error: %v", ls)
}
}
func TestIterator(t *testing.T) {
options := NewOptions(
Label("name", "svc1"),
Label("version", "0.0.1"),
Label("id", "12345"),
Label("type", "noop"),
Label("server", "http"),
Label("register", "gossip"),
Label("aa", "kk"),
Label("zz", "kk"),
)
iter := options.Labels.Iter()
var k, v string
cnt := 0
for iter.Next(&k, &v) {
if cnt == 4 && (k != "server" || v != "http") {
t.Fatalf("iter error: %s != %s || %s != %s", k, "server", v, "http")
}
cnt++
if ls[0] != "broker" || ls[1] != "broker2" {
t.Fatalf("sort error: %v", ls)
}
}

View File

@@ -107,7 +107,7 @@ func (r *noopMeter) String() string {
}
type noopCounter struct {
labels Labels
labels []string
}
func (r *noopCounter) Add(int) {
@@ -131,7 +131,7 @@ func (r *noopCounter) Set(uint64) {
}
type noopFloatCounter struct {
labels Labels
labels []string
}
func (r *noopFloatCounter) Add(float64) {
@@ -151,7 +151,7 @@ func (r *noopFloatCounter) Sub(float64) {
}
type noopGauge struct {
labels Labels
labels []string
}
func (r *noopGauge) Get() float64 {
@@ -159,7 +159,7 @@ func (r *noopGauge) Get() float64 {
}
type noopSummary struct {
labels Labels
labels []string
}
func (r *noopSummary) Update(float64) {
@@ -171,7 +171,7 @@ func (r *noopSummary) UpdateDuration(time.Time) {
}
type noopHistogram struct {
labels Labels
labels []string
}
func (r *noopHistogram) Reset() {

View File

@@ -11,17 +11,26 @@ type Option func(*Options)
// Options for metrics implementations:
type Options struct {
Name string
// Logger used for logging
Logger logger.Logger
// Context holds external options
Context context.Context
// Name holds the meter name
Name string
// Address holds the address that serves metrics
Address string
Path string
Labels Labels
//TimingObjectives map[float64]float64
Logger logger.Logger
Context context.Context
MetricPrefix string
LabelPrefix string
// Path holds the path for metrics
Path string
// MetricPrefix holds the prefix for all metrics
MetricPrefix string
// LabelPrefix holds the prefix for all labels
LabelPrefix string
// Labels holds the default labels
Labels []string
// WriteProcessMetrics flag to write process metrics
WriteProcessMetrics bool
WriteFDMetrics bool
// WriteFDMetrics flag to write fd metrics
WriteFDMetrics bool
}
// NewOptions prepares a set of options:
@@ -79,11 +88,9 @@ func Logger(l logger.Logger) Option {
}
}
// Label sets the label
func Label(key, val string) Option {
func Labels(ls ...string) Option {
return func(o *Options) {
o.Labels.keys = append(o.Labels.keys, key)
o.Labels.vals = append(o.Labels.vals, val)
o.Labels = ls
}
}

View File

@@ -28,19 +28,24 @@ var (
labelFailure = "failure"
labelStatus = "status"
labelEndpoint = "endpoint"
// DefaultSkipEndpoints contains list of endpoints that not evaluted by wrapper
DefaultSkipEndpoints = []string{"Meter.Metrics"}
)
type Options struct {
Meter meter.Meter
lopts []meter.Option
Meter meter.Meter
lopts []meter.Option
SkipEndpoints []string
}
type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{
Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5),
Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5),
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
@@ -50,19 +55,19 @@ func NewOptions(opts ...Option) Options {
func ServiceName(name string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("name", name))
o.lopts = append(o.lopts, meter.Labels("name", name))
}
}
func ServiceVersion(version string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("version", version))
o.lopts = append(o.lopts, meter.Labels("version", version))
}
}
func ServiceID(id string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("id", id))
o.lopts = append(o.lopts, meter.Labels("id", id))
}
}
@@ -72,10 +77,16 @@ func Meter(m meter.Meter) Option {
}
}
func SkipEndoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
type wrapper struct {
opts Options
callFunc client.CallFunc
client.Client
callFunc client.CallFunc
opts Options
}
func NewClientWrapper(opts ...Option) client.Wrapper {
@@ -100,21 +111,25 @@ func NewCallWrapper(opts ...Option) client.CallWrapper {
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.callFunc(ctx, addr, req, rsp, opts)
}
}
ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -123,21 +138,26 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Call(ctx, req, rsp, opts...)
}
}
ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -146,21 +166,26 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Stream(ctx, req, opts...)
}
}
ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -175,15 +200,15 @@ func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(PublishMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(PublishMessageTotal, lopts...).Inc()
@@ -200,21 +225,26 @@ func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint()
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return fn(ctx, req, rsp)
}
}
ts := time.Now()
err := fn(ctx, req, rsp)
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ServerRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ServerRequestTotal, lopts...).Inc()
@@ -238,15 +268,15 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(SubscribeMessageTotal, lopts...).Inc()

View File

@@ -15,6 +15,18 @@ type Option func(*Options)
// Options configure network
type Options struct {
// Router used for routing
Router router.Router
// Proxy holds the proxy
Proxy proxy.Proxy
// Logger used for logging
Logger logger.Logger
// Meter used for metrics
Meter meter.Meter
// Tracer used for tracing
Tracer tracer.Tracer
// Tunnel used for transfer data
Tunnel tunnel.Tunnel
// Id of the node
Id string
// Name of the network
@@ -25,18 +37,6 @@ type Options struct {
Advertise string
// Nodes is a list of nodes to connect to
Nodes []string
// Tunnel is network tunnel
Tunnel tunnel.Tunnel
// Router is network router
Router router.Router
// Proxy is network proxy
Proxy proxy.Proxy
// Logger
Logger logger.Logger
// Meter
Meter meter.Meter
// Tracer
Tracer tracer.Tracer
}
// Id sets the id of the network node

View File

@@ -14,19 +14,14 @@ import (
)
type memorySocket struct {
recv chan *Message
send chan *Message
// sock exit
exit chan bool
// listener exit
lexit chan bool
local string
remote string
// for send/recv transport.Timeout
timeout time.Duration
ctx context.Context
recv chan *Message
exit chan bool
lexit chan bool
send chan *Message
local string
remote string
timeout time.Duration
sync.RWMutex
}
@@ -36,19 +31,19 @@ type memoryClient struct {
}
type memoryListener struct {
addr string
topts Options
ctx context.Context
lopts ListenOptions
exit chan bool
conn chan *memorySocket
lopts ListenOptions
topts Options
addr string
sync.RWMutex
ctx context.Context
}
type memoryTransport struct {
opts Options
sync.RWMutex
opts Options
listeners map[string]*memoryListener
sync.RWMutex
}
func (ms *memorySocket) Recv(m *Message) error {

View File

@@ -13,26 +13,24 @@ import (
// Options struct holds the transport options
type Options struct {
Name string
// Addrs is the list of intermediary addresses to connect to
Addrs []string
// Codec is the codec interface to use where headers are not supported
// by the transport and the entire payload must be encoded
Codec codec.Codec
// TLSConfig to secure the connection. The assumption is that this
// is mTLS keypair
TLSConfig *tls.Config
// Timeout sets the timeout for Send/Recv
Timeout time.Duration
// Logger sets the logger
Logger logger.Logger
// Meter sets the meter
// Meter used for metrics
Meter meter.Meter
// Tracer sets the tracer
// Tracer used for tracing
Tracer tracer.Tracer
// Other options for implementations of the interface
// can be stored in a context
// Codec used for marshal/unmarshal messages
Codec codec.Codec
// Logger used for logging
Logger logger.Logger
// Context holds external options
Context context.Context
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Name holds the transport name
Name string
// Addrs holds the transport addrs
Addrs []string
// Timeout holds the timeout
Timeout time.Duration
}
// NewOptions returns new options
@@ -53,18 +51,12 @@ func NewOptions(opts ...Option) Options {
// DialOptions struct
type DialOptions struct {
// Tells the transport this is a streaming connection with
// multiple calls to send/recv and that send may not even be called
Stream bool
// Timeout for dialing
Timeout time.Duration
// TODO: add tls options when dialling
// Currently set in global options
// Other options for implementations of the interface
// can be stored in a context
// Context holds the external options
Context context.Context
// Timeout holds the timeout
Timeout time.Duration
// Stream flag
Stream bool
}
// NewDialOptions returns new DialOptions
@@ -85,10 +77,10 @@ func NewDialOptions(opts ...DialOption) DialOptions {
type ListenOptions struct {
// TODO: add tls options when listening
// Currently set in global options
// Other options for implementations of the interface
// can be stored in a context
// Context holds the external options
Context context.Context
// TLSConfig holds the *tls.Config options
TLSConfig *tls.Config
}
// NewListenOptions returns new ListenOptions

View File

@@ -17,17 +17,16 @@ type tunBroker struct {
}
type tunSubscriber struct {
topic string
handler broker.Handler
opts broker.SubscribeOptions
closed chan bool
opts broker.SubscribeOptions
listener tunnel.Listener
handler broker.Handler
closed chan bool
topic string
}
type tunEvent struct {
topic string
message *broker.Message
topic string
}
// used to access tunnel from options context

View File

@@ -22,23 +22,24 @@ type Option func(*Options)
// Options provides network configuration options
type Options struct {
Name string
// Id is tunnel id
Id string
// Address is tunnel address
Address string
// Nodes are remote nodes
Nodes []string
// The shared auth token
Token string
// Transport listens to incoming connections
Transport transport.Transport
// Logger
// Logger used for logging
Logger logger.Logger
// Meter
// Meter used for metrics
Meter meter.Meter
// Tracer
// Tracer used for tracing
Tracer tracer.Tracer
// Transport used for communication
Transport transport.Transport
// Token the shared auth token
Token string
// Name holds the tunnel name
Name string
// Id holds the tunnel id
Id string
// Address holds the tunnel address
Address string
// Nodes holds the tunnel nodes
Nodes []string
}
// DialOption func
@@ -61,9 +62,9 @@ type ListenOption func(*ListenOptions)
// ListenOptions provides listen options
type ListenOptions struct {
// specify mode of the session
// Mode specify mode of the session
Mode Mode
// The read timeout
// Timeout the read timeout
Timeout time.Duration
}

View File

@@ -23,33 +23,44 @@ import (
// Options for micro service
type Options struct {
Name string
Version string
Metadata metadata.Metadata
Auths []auth.Auth
Brokers []broker.Broker
Loggers []logger.Logger
Meters []meter.Meter
Configs []config.Config
Clients []client.Client
Servers []server.Server
Stores []store.Store
Registers []register.Register
Tracers []tracer.Tracer
Routers []router.Router
// Runtime runtime.Runtime
// Profile profile.Profile
// Before and After funcs
BeforeStart []func(context.Context) error
BeforeStop []func(context.Context) error
AfterStart []func(context.Context) error
AfterStop []func(context.Context) error
// Other options for implementations of the interface
// can be stored in a context
// Context holds external options or cancel stuff
Context context.Context
// Metadata holds service metadata
Metadata metadata.Metadata
// Version holds service version
Version string
// Name holds service name
Name string
// Brokers holds brokers
Brokers []broker.Broker
// Loggers holds loggers
Loggers []logger.Logger
// Meters holds meter
Meters []meter.Meter
// Configs holds config
Configs []config.Config
// Clients holds clients
Clients []client.Client
// Auths holds auths
Auths []auth.Auth
// Stores holds stores
Stores []store.Store
// Registers holds registers
Registers []register.Register
// Tracers holds tracers
Tracers []tracer.Tracer
// Routers holds routers
Routers []router.Router
// BeforeStart holds funcs that runs before service starts
BeforeStart []func(context.Context) error
// BeforeStop holds funcs that runs before service stops
BeforeStop []func(context.Context) error
// AfterStart holds funcs that runs after service starts
AfterStart []func(context.Context) error
// AfterStop holds funcs that runs after service stops
AfterStop []func(context.Context) error
// Servers holds servers
Servers []server.Server
}
// NewOptions returns new Options filled with defaults and overrided by provided opts

View File

@@ -11,9 +11,9 @@ import (
)
type httpProfile struct {
server *http.Server
sync.Mutex
running bool
server *http.Server
}
var (

View File

@@ -13,16 +13,12 @@ import (
)
type profiler struct {
opts profile.Options
exit chan bool
cpuFile *os.File
memFile *os.File
opts profile.Options
sync.Mutex
running bool
exit chan bool
// where the cpu profile is written
cpuFile *os.File
// where the mem profile is written
memFile *os.File
}
func (p *profiler) writeHeap(f *os.File) {

View File

@@ -11,20 +11,20 @@ import (
// Options for proxy
type Options struct {
// Specific endpoint to always call
Endpoint string
// The default client to use
Client client.Client
// The default router to use
Router router.Router
// Extra links for different clients
Links map[string]client.Client
// Logger
Logger logger.Logger
// Meter
Meter meter.Meter
// Tracer
// Tracer used for tracing
Tracer tracer.Tracer
// Client for communication
Client client.Client
// Router for routing
Router router.Router
// Logger used for logging
Logger logger.Logger
// Meter used for metrics
Meter meter.Meter
// Links holds the communication links
Links map[string]client.Client
// Endpoint holds the destination address
Endpoint string
}
// Option func signature

View File

@@ -3,7 +3,6 @@ package register
import (
"fmt"
"reflect"
"strings"
"unicode"
"unicode/utf8"
@@ -11,20 +10,21 @@ import (
)
// ExtractValue from reflect.Type from specified depth
func ExtractValue(v reflect.Type, d int) *Value {
func ExtractValue(v reflect.Type, d int) string {
if d == 3 {
return nil
return ""
}
if v == nil {
return nil
return ""
}
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
if len(v.Name()) == 0 {
return nil
// slices and maps don't have a defined name
if (v.Kind() == reflect.Slice || v.Kind() == reflect.Map) || len(v.Name()) == 0 {
return ""
}
// get the rune character
@@ -32,48 +32,10 @@ func ExtractValue(v reflect.Type, d int) *Value {
// crude check for is unexported field
if unicode.IsLower(a) {
return nil
return ""
}
arg := &Value{
Name: v.Name(),
Type: v.Name(),
}
switch v.Kind() {
case reflect.Struct:
for i := 0; i < v.NumField(); i++ {
f := v.Field(i)
val := ExtractValue(f.Type, d+1)
if val == nil {
continue
}
// if we can find a json tag use it
if tags := f.Tag.Get("json"); len(tags) > 0 {
parts := strings.Split(tags, ",")
if parts[0] == "-" || parts[0] == "omitempty" {
continue
}
val.Name = parts[0]
}
// if there's no name default it
if len(val.Name) == 0 {
val.Name = v.Field(i).Name
}
arg.Values = append(arg.Values, val)
}
case reflect.Slice:
p := v.Elem()
if p.Kind() == reflect.Ptr {
p = p.Elem()
}
arg.Type = "[]" + p.Name()
}
return arg
return v.Name()
}
// ExtractEndpoint extract *Endpoint from reflect.Method
@@ -105,7 +67,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint {
request := ExtractValue(reqType, 0)
response := ExtractValue(rspType, 0)
if request == nil || response == nil {
if request == "" || response == "" {
return nil
}
@@ -124,7 +86,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint {
}
// ExtractSubValue exctact *Value from reflect.Type
func ExtractSubValue(typ reflect.Type) *Value {
func ExtractSubValue(typ reflect.Type) string {
var reqType reflect.Type
switch typ.NumIn() {
case 1:
@@ -134,7 +96,7 @@ func ExtractSubValue(typ reflect.Type) *Value {
case 3:
reqType = typ.In(2)
default:
return nil
return ""
}
return ExtractValue(reqType, 0)
}

View File

@@ -36,28 +36,21 @@ func TestExtractEndpoint(t *testing.T) {
t.Fatalf("Expected handler Test, got %s", endpoints[0].Name)
}
if endpoints[0].Request == nil {
if endpoints[0].Request == "" {
t.Fatal("Expected non nil Request")
}
if endpoints[0].Response == nil {
if endpoints[0].Response == "" {
t.Fatal("Expected non nil Request")
}
if endpoints[0].Request.Name != "TestRequest" {
t.Fatalf("Expected TestRequest got %s", endpoints[0].Request.Name)
if endpoints[0].Request != "TestRequest" {
t.Fatalf("Expected TestRequest got %s", endpoints[0].Request)
}
if endpoints[0].Response.Name != "TestResponse" {
t.Fatalf("Expected TestResponse got %s", endpoints[0].Response.Name)
}
if endpoints[0].Request.Type != "TestRequest" {
t.Fatalf("Expected TestRequest type got %s", endpoints[0].Request.Type)
}
if endpoints[0].Response.Type != "TestResponse" {
t.Fatalf("Expected TestResponse type got %s", endpoints[0].Response.Type)
if endpoints[0].Response != "TestResponse" {
t.Fatalf("Expected TestResponse got %s", endpoints[0].Response)
}
t.Logf("XXX %#+v\n", endpoints[0])
}

View File

@@ -16,9 +16,9 @@ var (
)
type node struct {
*Node
TTL time.Duration
LastSeen time.Time
*Node
TTL time.Duration
}
type record struct {
@@ -405,10 +405,10 @@ func (m *memory) String() string {
}
type watcher struct {
id string
wo WatchOptions
res chan *Result
exit chan bool
wo WatchOptions
id string
}
func (m *watcher) Next() (*Result, error) {
@@ -490,15 +490,6 @@ func recordToService(r *record, domain string) *Service {
endpoints := make([]*Endpoint, len(r.Endpoints))
for i, e := range r.Endpoints {
request := new(Value)
if e.Request != nil {
*request = *e.Request
}
response := new(Value)
if e.Response != nil {
*response = *e.Response
}
metadata := make(map[string]string, len(e.Metadata))
for k, v := range e.Metadata {
metadata[k] = v
@@ -506,8 +497,8 @@ func recordToService(r *record, domain string) *Service {
endpoints[i] = &Endpoint{
Name: e.Name,
Request: request,
Response: response,
Request: e.Request,
Response: e.Response,
Metadata: metadata,
}
}

View File

@@ -12,20 +12,22 @@ import (
// Options holds options for register
type Options struct {
Name string
Addrs []string
Timeout time.Duration
TLSConfig *tls.Config
// Logger that will be used
Logger logger.Logger
// Meter that will be used
Meter meter.Meter
// Tracer
// Tracer used for tracing
Tracer tracer.Tracer
// Other options for implementations of the interface
// can be stored in a context
// Context holds external options
Context context.Context
// Logged used for logging
Logger logger.Logger
// Meter used for metrics
Meter meter.Meter
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Name holds the name of register
Name string
// Addrs specifies register addrs
Addrs []string
// Timeout specifies timeout
Timeout time.Duration
}
// NewOptions returns options that filled by opts
@@ -44,13 +46,9 @@ func NewOptions(opts ...Option) Options {
// RegisterOptions holds options for register method
type RegisterOptions struct {
TTL time.Duration
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
// Domain to register the service in
Domain string
// Attempts specify attempts for register
Context context.Context
Domain string
TTL time.Duration
Attempts int
}

View File

@@ -52,24 +52,17 @@ type Service struct {
// Node holds node register info
type Node struct {
Metadata metadata.Metadata `json:"metadata"`
Id string `json:"id"`
Address string `json:"address"`
Metadata metadata.Metadata `json:"metadata"`
}
// Endpoint holds endpoint register info
type Endpoint struct {
Name string `json:"name"`
Request *Value `json:"request"`
Response *Value `json:"response"`
Request string `json:"request"`
Response string `json:"response"`
Metadata metadata.Metadata `json:"metadata"`
}
// Value holds additional kv stuff
type Value struct {
Name string `json:"name"`
Type string `json:"type"`
Values []*Value `json:"values"`
Name string `json:"name"`
}
// Option func signature

View File

@@ -7,14 +7,17 @@ import "time"
type Watcher interface {
// Next is a blocking call
Next() (*Result, error)
// Stop stops the watcher
Stop()
}
// Result is returned by a call to Next on
// the watcher. Actions can be create, update, delete
type Result struct {
Action string
// Service holds register service
Service *Service
// Action holds the action
Action string
}
// EventType defines register event type
@@ -45,12 +48,12 @@ func (t EventType) String() string {
// Event is register event
type Event struct {
// Id is register id
Id string
// Type defines type of event
Type EventType
// Timestamp is event timestamp
Timestamp time.Time
// Service is register service
Service *Service
// Id is register id
Id string
// Type defines type of event
Type EventType
}

View File

@@ -12,9 +12,9 @@ import (
// Resolver is a DNS network resolve
type Resolver struct {
// The resolver address to use
Address string
goresolver *net.Resolver
// Address of resolver to use
Address string
sync.RWMutex
}

91
router/dns.go Normal file
View File

@@ -0,0 +1,91 @@
package router
import (
"fmt"
"net"
"strconv"
)
// NewRouter returns an initialized dns router
func NewRouter(opts ...Option) Router {
options := NewOptions(opts...)
return &dns{options}
}
type dns struct {
options Options
}
func (d *dns) Init(opts ...Option) error {
for _, o := range opts {
o(&d.options)
}
return nil
}
func (d *dns) Options() Options {
return d.options
}
func (d *dns) Table() Table {
return nil
}
func (d *dns) Close() error {
return nil
}
func (d *dns) Lookup(opts ...QueryOption) ([]Route, error) {
options := NewQuery(opts...)
// check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000
host, port, err := net.SplitHostPort(options.Service)
if err == nil {
// lookup the service using A records
ips, err := net.LookupHost(host)
if err != nil {
return nil, err
}
p, _ := strconv.Atoi(port)
// convert the ip addresses to routes
result := make([]Route, len(ips))
for i, ip := range ips {
result[i] = Route{
Service: options.Service,
Address: fmt.Sprintf("%s:%d", ip, uint16(p)),
}
}
return result, nil
}
// we didn't get the port so we'll lookup the service using SRV records. If we can't lookup the
// service using the SRV record, we return the error.
_, nodes, err := net.LookupSRV(options.Service, "tcp", d.options.Network)
if err != nil {
return nil, err
}
// convert the nodes (net services) to routes
result := make([]Route, len(nodes))
for i, n := range nodes {
result[i] = Route{
Service: options.Service,
Address: fmt.Sprintf("%s:%d", n.Target, n.Port),
Network: d.options.Network,
}
}
return result, nil
}
func (d *dns) Watch(opts ...WatchOption) (Watcher, error) {
return nil, nil
}
func (d *dns) Name() string {
return d.options.Name
}
func (d *dns) String() string {
return "dns"
}

View File

@@ -10,23 +10,15 @@ import (
// Options are router options
type Options struct {
Name string
// Id is router id
Id string
// Address is router address
Address string
// Gateway is network gateway
Gateway string
// Network is network address
Network string
// Register is the local register
Logger logger.Logger
Context context.Context
Register register.Register
// Precache routes
Name string
Gateway string
Network string
Id string
Address string
Precache bool
// Logger
Logger logger.Logger
// Context for additional options
Context context.Context
}
// Id sets Router Id

View File

@@ -15,10 +15,10 @@ var (
// Route is network route
type Route struct {
// Metadata for the route
Metadata metadata.Metadata
// Service is destination service name
Service string
// Address is service node address
Address string
// Gateway is route gateway
Gateway string
// Network is network address
@@ -27,10 +27,10 @@ type Route struct {
Router string
// Link is network link
Link string
// Address is service node address
Address string
// Metric is the route cost metric
Metric int64
// Metadata for the route
Metadata metadata.Metadata
}
// Hash returns route hash sum.

View File

@@ -7,7 +7,7 @@ import (
var (
// DefaultRouter is the global default router
DefaultRouter Router
DefaultRouter Router = NewRouter()
// DefaultNetwork is default micro network
DefaultNetwork = "micro"
// ErrRouteNotFound is returned when no route was found in the routing table

View File

@@ -38,14 +38,14 @@ func (t EventType) String() string {
// Event is returned by a call to Next on the watcher.
type Event struct {
// Unique id of the event
// Route is table route
Route Route
// Timestamp is event timestamp
Timestamp time.Time
// Id of the event
Id string
// Type defines type of event
Type EventType
// Timestamp is event timestamp
Timestamp time.Time
// Route is table route
Route Route
}
// Watcher defines routing table watcher interface

View File

@@ -10,18 +10,12 @@ import (
// Options configure runtime
type Options struct {
// Scheduler for updates
Scheduler Scheduler
// Service type to manage
Type string
// Source of the services repository
Source string
// Base image to use
Image string
// Client to use when making requests
Client client.Client
// Logger
Logger logger.Logger
Client client.Client
Logger logger.Logger
Type string
Source string
Image string
}
// Option func signature
@@ -77,42 +71,26 @@ type ReadOption func(o *ReadOptions)
// CreateOptions configure runtime services
type CreateOptions struct {
// Command to execut
Command []string
// Args to pass into command
Args []string
// Environment to configure
Env []string
// Log output
Output io.Writer
// Type of service to create
Type string
// Retries before failing deploy
Retries int
// Specify the image to use
Image string
// Namespace to create the service in
Namespace string
// Specify the context to use
Context context.Context
// Secrets to use
Secrets map[string]string
// Resources to allocate the service
Context context.Context
Output io.Writer
Resources *Resources
Secrets map[string]string
Image string
Namespace string
Type string
Command []string
Args []string
Env []string
Retries int
}
// ReadOptions queries runtime services
type ReadOptions struct {
// Service name
Service string
// Version queries services with given version
Version string
// Type of service
Type string
// Namespace the service is running in
Context context.Context
Service string
Version string
Type string
Namespace string
// Specify the context to use
Context context.Context
}
// CreateType sets the type of service to create
@@ -238,12 +216,9 @@ type UpdateOption func(o *UpdateOptions)
// UpdateOptions struct
type UpdateOptions struct {
// Namespace the service is running in
Context context.Context
Secrets map[string]string
Namespace string
// Specify the context to use
Context context.Context
// Secrets to use
Secrets map[string]string
}
// UpdateSecret sets a secret to provide the service with
@@ -276,10 +251,8 @@ type DeleteOption func(o *DeleteOptions)
// DeleteOptions struct
type DeleteOptions struct {
// Namespace the service is running in
Context context.Context
Namespace string
// Specify the context to use
Context context.Context
}
// DeleteNamespace sets the namespace
@@ -301,14 +274,10 @@ type LogsOption func(o *LogsOptions)
// LogsOptions configure runtime logging
type LogsOptions struct {
// How many existing lines to show
Count int64
// Stream new lines?
Stream bool
// Namespace the service is running in
Context context.Context
Namespace string
// Specify the context to use
Context context.Context
Count int64
Stream bool
}
// LogsCount confiures how many existing lines to show

View File

@@ -37,15 +37,20 @@ type Runtime interface {
// Logs returns a log stream
type Logs interface {
// Error returns error
Error() error
// Chan return chan log
Chan() chan Log
// Stop stops the log stream
Stop() error
}
// Log is a log message
type Log struct {
Message string
// Metadata holds metadata
Metadata metadata.Metadata
// Message holds the message
Message string
}
// Scheduler is a runtime service scheduler
@@ -84,28 +89,28 @@ func (t EventType) String() string {
// Event is notification event
type Event struct {
// ID of the event
ID string
// Type is event type
Type EventType
// Timestamp is event timestamp
// Timestamp of event
Timestamp time.Time
// Service the event relates to
Service *Service
// Options to use when processing the event
Options *CreateOptions
// ID of the event
ID string
// Type is event type
Type EventType
}
// Service is runtime service
type Service struct {
// Metadata stores metadata
Metadata metadata.Metadata
// Name of the service
Name string
// Version of the service
Version string
// url location of source
// Name of the service
Source string
// Metadata stores metadata
Metadata metadata.Metadata
}
// Resources which are allocated to a serivce

3
server/generate.go Normal file
View File

@@ -0,0 +1,3 @@
package server
//go:generate protoc -I./health -I../ -I/home/vtolstov/.cache/go-path/pkg/mod/github.com/unistack-org/micro-proto@v0.0.1 --micro_out=components=micro|http|server,standalone=false,debug=true,paths=source_relative:./health health/health.proto

View File

@@ -7,10 +7,10 @@ import (
)
type rpcHandler struct {
name string
handler interface{}
endpoints []*register.Endpoint
opts HandlerOptions
handler interface{}
name string
endpoints []*register.Endpoint
}
func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {

87
server/health/health.go Normal file
View File

@@ -0,0 +1,87 @@
package health
import (
"context"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/server"
)
var (
// guard to fail early
_ HealthServer = &handler{}
)
type handler struct {
server server.Server
opts Options
}
type CheckFunc func(context.Context) error
type Option func(*Options)
type Options struct {
LiveChecks []CheckFunc
ReadyChecks []CheckFunc
Version string
Name string
}
func LiveChecks(fns ...CheckFunc) Option {
return func(o *Options) {
o.LiveChecks = append(o.LiveChecks, fns...)
}
}
func ReadyChecks(fns ...CheckFunc) Option {
return func(o *Options) {
o.ReadyChecks = append(o.ReadyChecks, fns...)
}
}
func Name(name string) Option {
return func(o *Options) {
o.Name = name
}
}
func Version(version string) Option {
return func(o *Options) {
o.Version = version
}
}
func NewHandler(opts ...Option) *handler {
options := Options{}
for _, o := range opts {
o(&options)
}
return &handler{opts: options}
}
func (h *handler) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
var err error
for _, fn := range h.opts.LiveChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *handler) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
var err error
for _, fn := range h.opts.ReadyChecks {
if err = fn(ctx); err != nil {
return errors.ServiceUnavailable(h.opts.Name, "%v", err)
}
}
return nil
}
func (h *handler) Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
rsp.Data = []byte(h.opts.Version)
return nil
}

View File

@@ -0,0 +1,62 @@
syntax = "proto3";
package micro.server.health;
option go_package = "github.com/unistack-org/micro/v3/server/health;health";
import "api/annotations.proto";
import "openapiv2/annotations.proto";
import "codec/frame.proto";
service Health {
rpc Live(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv2.openapiv2_operation) = {
operation_id: "Live";
responses: {
key: "default";
value: {
description: "Error response";
schema: {
json_schema: {
ref: "micro.codec.Frame";
}
}
}
}
};
option (micro.api.http) = { get: "/live"; };
};
rpc Ready(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv2.openapiv2_operation) = {
operation_id: "Ready";
responses: {
key: "default";
value: {
description: "Error response";
schema: {
json_schema: {
ref: "micro.codec.Frame";
}
}
}
}
};
option (micro.api.http) = { get: "/ready"; };
};
rpc Version(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv2.openapiv2_operation) = {
operation_id: "Version";
responses: {
key: "default";
value: {
description: "Error response";
schema: {
json_schema: {
ref: "micro.codec.Frame";
}
}
}
}
};
option (micro.api.http) = { get: "/version"; };
};
};

View File

@@ -0,0 +1,38 @@
// Code generated by protoc-gen-micro
// source: health.proto
package health
import (
context "context"
api "github.com/unistack-org/micro/v3/api"
codec "github.com/unistack-org/micro/v3/codec"
)
func NewHealthEndpoints() []*api.Endpoint {
return []*api.Endpoint{
&api.Endpoint{
Name: "Health.Live",
Path: []string{"/live"},
Method: []string{"GET"},
Handler: "rpc",
},
&api.Endpoint{
Name: "Health.Ready",
Path: []string{"/ready"},
Method: []string{"GET"},
Handler: "rpc",
},
&api.Endpoint{
Name: "Health.Version",
Path: []string{"/version"},
Method: []string{"GET"},
Handler: "rpc",
},
}
}
type HealthServer interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}

View File

@@ -0,0 +1,43 @@
// Code generated by protoc-gen-micro
// source: health.proto
package health
import (
context "context"
api "github.com/unistack-org/micro/v3/api"
codec "github.com/unistack-org/micro/v3/codec"
server "github.com/unistack-org/micro/v3/server"
)
type healthServer struct {
HealthServer
}
func (h *healthServer) Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServer.Live(ctx, req, rsp)
}
func (h *healthServer) Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServer.Ready(ctx, req, rsp)
}
func (h *healthServer) Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
return h.HealthServer.Version(ctx, req, rsp)
}
func RegisterHealthServer(s server.Server, sh HealthServer, opts ...server.HandlerOption) error {
type health interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Version(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
}
type Health struct {
health
}
h := &healthServer{sh}
var nopts []server.HandlerOption
for _, endpoint := range NewHealthEndpoints() {
nopts = append(nopts, api.WithEndpoint(endpoint))
}
return s.Handle(s.NewHandler(&Health{h}, append(nopts, opts...)...))
}

View File

@@ -32,16 +32,16 @@ const (
)
type noopServer struct {
h Handler
opts Options
h Handler
rsvc *register.Service
handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
registered bool
started bool
exit chan chan error
wg *sync.WaitGroup
sync.RWMutex
registered bool
started bool
}
// NewServer returns new noop server
@@ -160,21 +160,15 @@ func (n *noopServer) Register() error {
n.RLock()
// Maps are ordered randomly, sort the keys for consistency
var handlerList []string
for n, e := range n.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {
handlerList = append(handlerList, n)
}
for n, _ := range n.handlers {
handlerList = append(handlerList, n)
}
sort.Strings(handlerList)
var subscriberList []*subscriber
for e := range n.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
subscriberList = append(subscriberList, e)
}
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic

View File

@@ -23,50 +23,66 @@ type Option func(*Options)
// Options server struct
type Options struct {
Codecs map[string]codec.Codec
Broker broker.Broker
Register register.Register
Tracer tracer.Tracer
Auth auth.Auth
Logger logger.Logger
Meter meter.Meter
Transport transport.Transport
Metadata metadata.Metadata
Name string
Address string
Advertise string
Id string
Namespace string
Version string
HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper
// Context holds the external options and can be used for server shutdown
Context context.Context
// Broker holds the server broker
Broker broker.Broker
// Register holds the register
Register register.Register
// Tracer holds the tracer
Tracer tracer.Tracer
// Auth holds the auth
Auth auth.Auth
// Logger holds the logger
Logger logger.Logger
// Meter holds the meter
Meter meter.Meter
// Transport holds the transport
Transport transport.Transport
// RegisterCheck runs a check function before registering the service
RegisterCheck func(context.Context) error
// The register expiry time
RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// RegisterAttempts specify how many times try to register
RegisterAttempts int
// DeegisterAttempts specify how many times try to deregister
DeregisterAttempts int
// The router for requests
Router Router
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
Wait *sync.WaitGroup
/*
// Router for requests
Router Router
*/
// Listener may be passed if already created
Listener net.Listener
// MaxConn limit connections to server
// Wait group
Wait *sync.WaitGroup
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
// Metadata holds the server metadata
Metadata metadata.Metadata
// RegisterCheck run before register server
RegisterCheck func(context.Context) error
// Codecs map to handle content-type
Codecs map[string]codec.Codec
// Id holds the id of the server
Id string
// Namespace for te server
Namespace string
// Name holds the server name
Name string
// Address holds the server address
Address string
// Advertise holds the advertise address
Advertise string
// Version holds the server version
Version string
// SubWrappers holds the server subscribe wrappers
SubWrappers []SubscriberWrapper
// HdlrWrappers holds the handler wrappers
HdlrWrappers []HandlerWrapper
// RegisterAttempts holds the number of register attempts before error
RegisterAttempts int
// RegisterInterval holds he interval for re-register
RegisterInterval time.Duration
// RegisterTTL specifies TTL for register record
RegisterTTL time.Duration
// MaxConn limits number of connections
MaxConn int
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
// DeregisterAttempts holds the number of deregister attempts before error
DeregisterAttempts int
}
// NewOptions returns new options struct with default or passed values
@@ -250,12 +266,14 @@ func TLSConfig(t *tls.Config) Option {
}
}
/*
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {
o.Router = r
}
}
*/
// Wait tells the server to wait for requests to finish before exiting
// If `wg` is nil, server only wait for completion of rpc handler.
@@ -303,9 +321,10 @@ type HandlerOption func(*HandlerOptions)
// HandlerOptions struct
type HandlerOptions struct {
Internal bool
// Context holds external options
Context context.Context
// Metadata for hondler
Metadata map[string]metadata.Metadata
Context context.Context
}
// NewHandlerOptions creates new HandlerOptions
@@ -327,13 +346,14 @@ type SubscriberOption func(*SubscriberOptions)
// SubscriberOptions struct
type SubscriberOptions struct {
// AutoAck defaults to true. When a handler returns
// with a nil error the message is acked.
AutoAck bool
Queue string
Internal bool
// Context holds the external options
Context context.Context
// Queue holds the subscription queue
Queue string
// AutoAck flag for auto ack messages after processing
AutoAck bool
// BodyOnly flag specifies that message without headers
BodyOnly bool
Context context.Context
}
// NewSubscriberOptions create new SubscriberOptions
@@ -358,23 +378,6 @@ func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
}
}
// InternalHandler options specifies that a handler is not advertised
// to the discovery system. In the future this may also limit request
// to the internal network or authorised user.
func InternalHandler(b bool) HandlerOption {
return func(o *HandlerOptions) {
o.Internal = b
}
}
// InternalSubscriber options specifies that a subscriber is not advertised
// to the discovery system.
func InternalSubscriber(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Internal = b
}
}
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscriberOption {

View File

@@ -6,12 +6,12 @@ import (
)
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
payload interface{}
header metadata.Metadata
body []byte
codec codec.Codec
}
func (r *rpcMessage) ContentType() string {

View File

@@ -65,6 +65,7 @@ type Server interface {
String() string
}
/*
// Router handle serving messages
type Router interface {
// ProcessMessage processes a message
@@ -72,6 +73,7 @@ type Router interface {
// ServeRequest processes a request to completion
ServeRequest(ctx context.Context, req Request, rsp Response) error
}
*/
// Message is an async message interface
type Message interface {

View File

@@ -28,19 +28,19 @@ var (
)
type handler struct {
method reflect.Value
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type subscriber struct {
topic string
rcvr reflect.Value
opts SubscriberOptions
typ reflect.Type
subscriber interface{}
rcvr reflect.Value
topic string
handlers []*handler
endpoints []*register.Endpoint
opts SubscriberOptions
}
// Is this an exported - upper case - name?

View File

@@ -103,48 +103,48 @@ func (s *service) Init(opts ...Option) error {
// skip config as the struct not passed
continue
}
if err = cfg.Init(config.Context(s.opts.Context)); err != nil {
if err = cfg.Init(config.Context(cfg.Options().Context)); err != nil {
return err
}
if err = cfg.Load(s.opts.Context); err != nil {
if err = cfg.Load(cfg.Options().Context); err != nil {
return err
}
}
for _, log := range s.opts.Loggers {
if err = log.Init(logger.WithContext(s.opts.Context)); err != nil {
if err = log.Init(logger.WithContext(log.Options().Context)); err != nil {
return err
}
}
for _, reg := range s.opts.Registers {
if err = reg.Init(register.Context(s.opts.Context)); err != nil {
if err = reg.Init(register.Context(reg.Options().Context)); err != nil {
return err
}
}
for _, brk := range s.opts.Brokers {
if err = brk.Init(broker.Context(s.opts.Context)); err != nil {
if err = brk.Init(broker.Context(brk.Options().Context)); err != nil {
return err
}
}
for _, str := range s.opts.Stores {
if err = str.Init(store.Context(s.opts.Context)); err != nil {
if err = str.Init(store.Context(str.Options().Context)); err != nil {
return err
}
}
for _, srv := range s.opts.Servers {
if err = srv.Init(server.Context(s.opts.Context)); err != nil {
if err = srv.Init(server.Context(srv.Options().Context)); err != nil {
return err
}
}
for _, cli := range s.opts.Clients {
if err = cli.Init(client.Context(s.opts.Context)); err != nil {
if err = cli.Init(client.Context(cli.Options().Context)); err != nil {
return err
}
}

View File

@@ -14,28 +14,27 @@ import (
// Options contains configuration for the Store
type Options struct {
// Meter used for metrics
Meter meter.Meter
// Tracer used for tracing
Tracer tracer.Tracer
// Context holds external options
Context context.Context
// Codec used to marshal/unmarshal
Codec codec.Codec
// Logger used for logging
Logger logger.Logger
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Name specifies store name
Name string
// Nodes contains the addresses or other connection information of the backing storage.
// For example, an etcd implementation would contain the nodes of the cluster.
// A SQL implementation could contain one or more connection strings.
Nodes []string
// Database allows multiple isolated stores to be kept in one backend, if supported.
// Database specifies store database
Database string
// Table is analag for a table in database backends or a key prefix in KV backends
// Table specifies store table
Table string
// Codec that used for marshal/unmarshal value
Codec codec.Codec
// Logger the logger
Logger logger.Logger
// Meter the meter
Meter meter.Meter
// Tracer the tacer
Tracer tracer.Tracer
// TLSConfig specifies tls.Config for secure
TLSConfig *tls.Config
// Context should contain all implementation specific options
Context context.Context
// Nodes contains store address
// TODO: replace with Addrs
Nodes []string
}
// NewOptions creates options struct
@@ -139,10 +138,14 @@ func NewReadOptions(opts ...ReadOption) ReadOptions {
// ReadOptions configures an individual Read operation
type ReadOptions struct {
Database string
Table string
// Context holds external options
Context context.Context
// Database holds the database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
Context context.Context
}
// ReadOption sets values in ReadOptions
@@ -167,12 +170,18 @@ func NewWriteOptions(opts ...WriteOption) WriteOptions {
// WriteOptions configures an individual Write operation
type WriteOptions struct {
Database string
Table string
TTL time.Duration
Metadata metadata.Metadata
// Context holds external options
Context context.Context
// Metadata contains additional metadata
Metadata metadata.Metadata
// Database holds database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
Context context.Context
// TTL specifies key TTL
TTL time.Duration
}
// WriteOption sets values in WriteOptions
@@ -211,10 +220,14 @@ func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
// DeleteOptions configures an individual Delete operation
type DeleteOptions struct {
Database string
Table string
// Context holds external options
Context context.Context
// Database holds database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
Context context.Context
}
// DeleteOption sets values in DeleteOptions
@@ -239,18 +252,14 @@ func NewListOptions(opts ...ListOption) ListOptions {
// ListOptions configures an individual List operation
type ListOptions struct {
// List from the following
Database, Table string
// Prefix returns all keys that are prefixed with key
Prefix string
// Suffix returns all keys that end with key
Suffix string
// Limit limits the number of returned keys
Limit uint
// Offset when combined with Limit supports pagination
Offset uint
Namespace string
Context context.Context
Database string
Prefix string
Suffix string
Namespace string
Table string
Limit uint
Offset uint
}
// ListOption sets values in ListOptions
@@ -297,8 +306,10 @@ type ExistsOption func(*ExistsOptions)
// ExistsOptions holds options for Exists method
type ExistsOptions struct {
// Context holds external options
Context context.Context
// Namespace contains namespace
Namespace string
Context context.Context
}
// NewExistsOptions helper for Exists method

View File

@@ -7,23 +7,22 @@ import (
type memorySync struct {
options Options
mtx gosync.RWMutex
locks map[string]*memoryLock
locks map[string]*memoryLock
mtx gosync.RWMutex
}
type memoryLock struct {
id string
time time.Time
ttl time.Duration
release chan bool
id string
ttl time.Duration
}
type memoryLeader struct {
opts LeaderOptions
id string
resign func(id string) error
status chan bool
id string
}
func (m *memoryLeader) Resign() error {

View File

@@ -10,11 +10,17 @@ import (
// Options holds the sync options
type Options struct {
Nodes []string
Prefix string
// Logger used for logging
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
Meter meter.Meter
// Meter used for merics
Meter meter.Meter
// Prefix holds prefix?
Prefix string
// Nodes holds the nodes
// TODO: change to Addrs ?
Nodes []string
}
// Option func signature

View File

@@ -32,9 +32,9 @@ func (t *noopTracer) Name() string {
}
type noopSpan struct {
name string
ctx context.Context
tracer Tracer
name string
}
func (s *noopSpan) Finish(opts ...SpanOption) {

View File

@@ -14,10 +14,10 @@ type EventOption func(o *EventOptions)
// Options struct
type Options struct {
// Logger used for logging
Logger logger.Logger
// Name of the tracer
Name string
// Logger is the logger for messages
Logger logger.Logger
}
// Option func

View File

@@ -36,30 +36,30 @@ type Span interface {
}
type Label struct {
key string
val interface{}
key string
}
func Any(k string, v interface{}) Label {
return Label{k, v}
return Label{key: k, val: v}
}
func String(k string, v string) Label {
return Label{k, v}
return Label{key: k, val: v}
}
func Int(k string, v int) Label {
return Label{k, v}
return Label{key: k, val: v}
}
func Int64(k string, v int64) Label {
return Label{k, v}
return Label{key: k, val: v}
}
func Float64(k string, v float64) Label {
return Label{k, v}
return Label{key: k, val: v}
}
func Bool(k string, v bool) Label {
return Label{k, v}
return Label{key: k, val: v}
}

View File

@@ -11,12 +11,106 @@ import (
"github.com/unistack-org/micro/v3/tracer"
)
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultSkipEndpoints = []string{"Meter.Metrics"}
)
type tWrapper struct {
opts Options
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
client.Client
opts Options
}
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
@@ -26,18 +120,30 @@ type ClientCallFuncObserver func(context.Context, string, client.Request, interf
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
// Options struct
type Options struct {
Tracer tracer.Tracer
ClientCallObservers []ClientCallObserver
ClientStreamObservers []ClientStreamObserver
ClientPublishObservers []ClientPublishObserver
ClientCallFuncObservers []ClientCallFuncObserver
ServerHandlerObservers []ServerHandlerObserver
// Tracer that used for tracing
Tracer tracer.Tracer
// ClientCallObservers funcs
ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs
ClientStreamObservers []ClientStreamObserver
// ClientPublishObservers funcs
ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions create Options from Option slice
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
@@ -47,6 +153,7 @@ func NewOptions(opts ...Option) Options {
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
@@ -56,139 +163,70 @@ func NewOptions(opts ...Option) Options {
return options
}
// WithTracer pass tracer
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
// SkipEndponts
func SkipEndpoins(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
// WithClientStreamObservers funcs
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
// WithClientPublishObservers funcs
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
// WithClientCallFuncObservers funcs
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
// WithServerHandlerObservers funcs
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
// WithServerSubscriberObservers funcs
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Call(ctx, req, rsp, opts...)
}
}
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
@@ -202,6 +240,13 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{
}
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Stream(ctx, req, opts...)
}
}
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
@@ -228,6 +273,13 @@ func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...cli
}
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint()
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.serverHandler(ctx, req, rsp)
}
}
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
@@ -278,6 +330,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper {
}
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
}
}
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()

View File

@@ -11,17 +11,16 @@ import (
// CertOptions are passed to cert options
type CertOptions struct {
IsCA bool
NotAfter time.Time
NotBefore time.Time
Parent *x509.Certificate
SerialNumber *big.Int
Subject pkix.Name
DNSNames []string
IPAddresses []net.IP
SerialNumber *big.Int
NotBefore time.Time
NotAfter time.Time
Parent *x509.Certificate
Pub ed25519.PublicKey
Priv ed25519.PrivateKey
Pub ed25519.PublicKey
Priv ed25519.PrivateKey
IsCA bool
}
// CertOption sets CertOptions

View File

@@ -10,18 +10,17 @@ import (
)
type pool struct {
size int
ttl time.Duration
tr transport.Transport
sync.Mutex
tr transport.Transport
conns map[string][]*poolConn
size int
ttl time.Duration
sync.Mutex
}
type poolConn struct {
transport.Client
id string
created time.Time
transport.Client
id string
}
func newPool(options Options) *pool {

File diff suppressed because it is too large Load Diff

View File

@@ -1,45 +0,0 @@
package reflect
import (
"net/url"
"testing"
)
func TestURLSliceVars(t *testing.T) {
u, err := url.Parse("http://localhost/v1/test/call/my_name?key=arg1&key=arg2&key=arg3")
if err != nil {
t.Fatal(err)
}
mp, err := URLMap(u.RawQuery)
if err != nil {
t.Fatal(err)
}
v, ok := mp["key"]
if !ok {
t.Fatalf("key not exists: %#+v", mp)
}
vm, ok := v.([]interface{})
if !ok {
t.Fatalf("invalid key value")
}
if len(vm) != 3 {
t.Fatalf("missing key value: %#+v", mp)
}
}
func TestURLVars(t *testing.T) {
u, err := url.Parse("http://localhost/v1/test/call/my_name?req=key&arg1=arg1&arg2=12345&nested.string_args=str1&nested.string_args=str2&arg2=54321")
if err != nil {
t.Fatal(err)
}
mp, err := URLMap(u.RawQuery)
if err != nil {
t.Fatal(err)
}
_ = mp
}

398
util/reflect/struct.go Normal file
View File

@@ -0,0 +1,398 @@
package reflect
import (
"errors"
"fmt"
"net/url"
"reflect"
"regexp"
"strings"
)
var (
// ErrInvalidParam specifies invalid url query params
ErrInvalidParam = errors.New("invalid url query param provided")
)
var (
bracketSplitter = regexp.MustCompile(`\[|\]`)
)
// StructFields returns slice of struct fields
func StructFields(src interface{}) ([]reflect.StructField, error) {
var fields []reflect.StructField
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
continue
}
if val.Kind() == reflect.Struct {
infields, err := StructFields(val.Interface())
if err != nil {
return nil, err
}
fields = append(fields, infields...)
} else {
fields = append(fields, fld)
}
}
return fields, nil
}
// CopyDefaults for a from b
// a and b should be pointers to the same kind of struct
func CopyDefaults(a, b interface{}) {
pt := reflect.TypeOf(a)
t := pt.Elem()
va := reflect.ValueOf(a).Elem()
vb := reflect.ValueOf(b).Elem()
for i := 0; i < t.NumField(); i++ {
aField := va.Field(i)
if aField.CanSet() {
bField := vb.Field(i)
aField.Set(bField)
}
}
}
// CopyFrom sets the public members of a from b
// a and b should be pointers to structs
// a can be a different type from b
// Only the Fields which have the same name and assignable type on a
// and b will be set.
func CopyFrom(a, b interface{}) {
ta := reflect.TypeOf(a).Elem()
tb := reflect.TypeOf(b).Elem()
va := reflect.ValueOf(a).Elem()
vb := reflect.ValueOf(b).Elem()
for i := 0; i < tb.NumField(); i++ {
bField := vb.Field(i)
tbField := tb.Field(i)
name := tbField.Name
aField := va.FieldByName(name)
taField, found := ta.FieldByName(name)
if found && aField.IsValid() && bField.IsValid() && aField.CanSet() && tbField.Type.AssignableTo(taField.Type) {
aField.Set(bField)
}
}
}
func StructURLValues(src interface{}, pref string, tags []string) (url.Values, error) {
data := url.Values{}
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 || !val.IsValid() {
continue
}
var t *tag
for _, tn := range tags {
ts, ok := fld.Tag.Lookup(tn)
if !ok {
continue
}
tp := strings.Split(ts, ",")
// special
switch tn {
case "protobuf": // special
t = &tag{key: tn, name: tp[3][5:], opts: append(tp[:3], tp[4:]...)}
default:
t = &tag{key: tn, name: tp[0], opts: tp[1:]}
}
if t.name != "" {
break
}
}
if t.name == "" {
// fallback to lowercase
t.name = strings.ToLower(fld.Name)
}
if pref != "" {
t.name = pref + "." + t.name
}
if !val.IsValid() || val.IsZero() {
continue
}
switch val.Kind() {
case reflect.Struct, reflect.Ptr:
if val.IsNil() {
continue
}
ndata, err := StructURLValues(val.Interface(), t.name, tags)
if err != nil {
return ndata, err
}
for k, v := range ndata {
data[k] = v
}
default:
switch val.Kind() {
case reflect.Slice:
for i := 0; i < val.Len(); i++ {
va := val.Index(i)
//if va.Type().Elem().Kind() != reflect.Ptr {
if va.Kind() != reflect.Ptr {
data.Set(t.name, fmt.Sprintf("%v", va.Interface()))
continue
}
switch va.Type().Elem().String() {
case "wrapperspb.BoolValue", "wrapperspb.BytesValue", "wrapperspb.StringValue":
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
}
case "wrapperspb.DoubleValue", "wrapperspb.FloatValue":
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
}
case "wrapperspb.Int32Value", "wrapperspb.Int64Value":
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
}
case "wrapperspb.UInt32Value", "wrapperspb.UInt64Value":
if eva := reflect.Indirect(va).FieldByName("Value"); eva.IsValid() {
data.Add(t.name, fmt.Sprintf("%v", eva.Interface()))
}
default:
data.Add(t.name, fmt.Sprintf("%v", val.Index(i).Interface()))
}
}
default:
data.Set(t.name, fmt.Sprintf("%v", val.Interface()))
}
}
}
return data, nil
}
// URLMap returns map of url query params
func URLMap(query string) (map[string]interface{}, error) {
var (
mp interface{} = make(map[string]interface{})
)
params := strings.Split(query, "&")
for _, part := range params {
tm, err := queryToMap(part)
if err != nil {
return nil, err
}
mp = merge(mp, tm)
}
return mp.(map[string]interface{}), nil
}
// FlattenMap expand key.subkey to nested map
func FlattenMap(a map[string]interface{}) map[string]interface{} {
// preprocess map
nb := make(map[string]interface{}, len(a))
for k, v := range a {
ps := strings.Split(k, ".")
if len(ps) == 1 {
nb[k] = v
continue
}
em := make(map[string]interface{})
em[ps[len(ps)-1]] = v
for i := len(ps) - 2; i > 0; i-- {
nm := make(map[string]interface{})
nm[ps[i]] = em
em = nm
}
if vm, ok := nb[ps[0]]; ok {
// nested map
nm := vm.(map[string]interface{})
for vk, vv := range em {
nm[vk] = vv
}
nb[ps[0]] = nm
} else {
nb[ps[0]] = em
}
}
return nb
}
/*
case reflect.String:
fn := func(c rune) bool { return c == ',' || c == ';' || c == ' ' }
slice := strings.FieldsFunc(vb.String(), fn)
if va.IsNil() {
va.Set(reflect.MakeSlice(va.Type(), len(slice), len(slice)))
}
*/
func btSplitter(str string) []string {
r := bracketSplitter.Split(str, -1)
for idx, s := range r {
if len(s) == 0 {
if len(r) > idx+1 {
copy(r[idx:], r[idx+1:])
r = r[:len(r)-1]
}
}
}
return r
}
// queryToMap turns something like a[b][c]=4 into
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{
// "c": 4,
// },
// },
// }
func queryToMap(param string) (map[string]interface{}, error) {
rawKey, rawValue, err := splitKeyAndValue(param)
if err != nil {
return nil, err
}
rawValue, err = url.QueryUnescape(rawValue)
if err != nil {
return nil, err
}
rawKey, err = url.QueryUnescape(rawKey)
if err != nil {
return nil, err
}
pieces := btSplitter(rawKey)
key := pieces[0]
// If len==1 then rawKey has no [] chars and we can just
// decode this as key=value into {key: value}
if len(pieces) == 1 {
return map[string]interface{}{
key: rawValue,
}, nil
}
// If len > 1 then we have something like a[b][c]=2
// so we need to turn this into {"a": {"b": {"c": 2}}}
// To do this we break our key into two pieces:
// a and b[c]
// and then we set {"a": queryToMap("b[c]", value)}
ret := make(map[string]interface{})
ret[key], err = queryToMap(buildNewKey(rawKey) + "=" + rawValue)
if err != nil {
return nil, err
}
// When URL params have a set of empty brackets (eg a[]=1)
// it is assumed to be an array. This will get us the
// correct value for the array item and return it as an
// []interface{} so that it can be merged properly.
if pieces[1] == "" {
temp := ret[key].(map[string]interface{})
ret[key] = []interface{}{temp[""]}
}
return ret, nil
}
// buildNewKey will take something like:
// origKey = "bar[one][two]"
// pieces = [bar one two ]
// and return "one[two]"
func buildNewKey(origKey string) string {
pieces := btSplitter(origKey)
ret := origKey[len(pieces[0])+1:]
ret = ret[:len(pieces[1])] + ret[len(pieces[1])+1:]
return ret
}
// splitKeyAndValue splits a URL param at the last equal
// sign and returns the two strings. If no equal sign is
// found, the ErrInvalidParam error is returned.
func splitKeyAndValue(param string) (string, string, error) {
li := strings.LastIndex(param, "=")
if li == -1 {
return "", "", ErrInvalidParam
}
return param[:li], param[li+1:], nil
}
// merge merges a with b if they are either both slices
// or map[string]interface{} types. Otherwise it returns b.
func merge(a interface{}, b interface{}) interface{} {
if av, aok := a.(map[string]interface{}); aok {
if bv, bok := b.(map[string]interface{}); bok {
return mergeMapIface(av, bv)
}
}
if av, aok := a.([]interface{}); aok {
if bv, bok := b.([]interface{}); bok {
return mergeSliceIface(av, bv)
}
}
va := reflect.ValueOf(a)
vb := reflect.ValueOf(b)
if (va.Type().Kind() == reflect.Slice) && (va.Type().Elem().Kind() == vb.Type().Kind() || vb.Type().ConvertibleTo(va.Type().Elem())) {
va = reflect.Append(va, vb.Convert(va.Type().Elem()))
return va.Interface()
}
return b
}
// mergeMap merges a with b, attempting to merge any nested
// values in nested maps but eventually overwriting anything
// in a that can't be merged with whatever is in b.
func mergeMapIface(a map[string]interface{}, b map[string]interface{}) map[string]interface{} {
for bK, bV := range b {
if aV, ok := a[bK]; ok {
if (reflect.ValueOf(aV).Type().Kind() == reflect.ValueOf(bV).Type().Kind()) ||
((reflect.ValueOf(aV).Type().Kind() == reflect.Slice) && reflect.ValueOf(aV).Type().Elem().Kind() == reflect.ValueOf(bV).Type().Kind()) {
nV := []interface{}{aV, bV}
a[bK] = nV
} else {
a[bK] = merge(a[bK], bV)
}
} else {
a[bK] = bV
}
}
return a
}
// mergeSlice merges a with b and returns the result.
func mergeSliceIface(a []interface{}, b []interface{}) []interface{} {
a = append(a, b...)
return a
}
type tag struct {
key string
name string
opts []string
}

110
util/reflect/struct_test.go Normal file
View File

@@ -0,0 +1,110 @@
package reflect
import (
"net/url"
"testing"
)
func TestStructURLValues(t *testing.T) {
type Str struct {
Name string `json:"name"`
Args []int `json:"args"`
Str *Str `json:"str"`
}
val := &Str{Name: "test_name", Args: []int{1, 2, 3}, Str: &Str{Name: "nested_name"}}
data, err := StructURLValues(val, "", []string{"json"})
if err != nil {
t.Fatal(err)
}
if data.Get("name") != "test_name" {
t.Fatalf("invalid data: %v", data)
}
}
func TestURLSliceVars(t *testing.T) {
u, err := url.Parse("http://localhost/v1/test/call/my_name?key=arg1&key=arg2&key=arg3")
if err != nil {
t.Fatal(err)
}
mp, err := URLMap(u.RawQuery)
if err != nil {
t.Fatal(err)
}
v, ok := mp["key"]
if !ok {
t.Fatalf("key not exists: %#+v", mp)
}
vm, ok := v.([]interface{})
if !ok {
t.Fatalf("invalid key value")
}
if len(vm) != 3 {
t.Fatalf("missing key value: %#+v", mp)
}
}
func TestURLVars(t *testing.T) {
u, err := url.Parse("http://localhost/v1/test/call/my_name?req=key&arg1=arg1&arg2=12345&nested.string_args=str1&nested.string_args=str2&arg2=54321")
if err != nil {
t.Fatal(err)
}
mp, err := URLMap(u.RawQuery)
if err != nil {
t.Fatal(err)
}
_ = mp
}
func TestIsZero(t *testing.T) {
testStr1 := struct {
Name string
Value string
Nested struct {
NestedName string
}
}{
Name: "test_name",
Value: "test_value",
}
testStr1.Nested.NestedName = "nested_name"
if ok := IsZero(testStr1); ok {
t.Fatalf("zero ret on non zero struct: %#+v", testStr1)
}
testStr1.Name = ""
testStr1.Value = ""
testStr1.Nested.NestedName = ""
if ok := IsZero(testStr1); !ok {
t.Fatalf("non zero ret on zero struct: %#+v", testStr1)
}
type testStr3 struct {
Nested string
}
type testStr2 struct {
Name string
Nested *testStr3
}
vtest := &testStr2{
Name: "test_name",
Nested: &testStr3{Nested: "nested_name"},
}
if ok := IsZero(vtest); ok {
t.Fatalf("zero ret on non zero struct: %#+v", vtest)
}
vtest.Nested = nil
vtest.Name = ""
if ok := IsZero(vtest); !ok {
t.Fatalf("non zero ret on zero struct: %#+v", vtest)
}
//t.Logf("XX %#+v\n", ok)
}

View File

@@ -10,11 +10,10 @@ import (
// Buffer is ring buffer
type Buffer struct {
size int
sync.RWMutex
vals []*Entry
streams map[string]*Stream
vals []*Entry
size int
sync.RWMutex
}
// Entry is ring buffer data entry
@@ -25,12 +24,12 @@ type Entry struct {
// Stream is used to stream the buffer
type Stream struct {
// Id of the stream
Id string
// Buffered entries
Entries chan *Entry
// Stop channel
Stop chan bool
// Id of the stream
Id string
}
// Put adds a new value to ring buffer

View File

@@ -8,18 +8,18 @@ const (
// Template is a compiled representation of path templates.
type Template struct {
// Version is the version number of the format.
Version int
// OpCodes is a sequence of operations.
// Verb is a VERB part in the template
Verb string
// Original template (example: /v1/a_bit_of_everything)
Template string
// OpCodes is a sequence of operations
OpCodes []int
// Pool is a constant pool
Pool []string
// Verb is a VERB part in the template.
Verb string
// Fields is a list of field paths bound in this template.
// Fields is a list of field paths bound in this template
Fields []string
// Original template (example: /v1/a_bit_of_everything)
Template string
// Version is the version number of the format
Version int
}
// Compiler compiles utilities representation of path templates into marshallable operations.
@@ -29,14 +29,8 @@ type Compiler interface {
}
type op struct {
// code is the opcode of the operation
code OpCode
// str is a string operand of the code.
// operand is ignored if str is not empty.
str string
// operand is a numeric operand of the code.
str string
code OpCode
operand int
}

View File

@@ -6,8 +6,8 @@ import (
)
type apiRouter struct {
routes []router.Route
router.Router
routes []router.Route
}
func (r *apiRouter) Lookup(...router.QueryOption) ([]router.Route, error) {

View File

@@ -25,9 +25,10 @@ type rop struct {
// Pattern is a template pattern of http request paths defined in github.com/googleapis/googleapis/google/api/http.proto.
type Pattern struct {
verb string
// ops is a list of operations
ops []rop
// pool is a constant pool indexed by the operands or vars.
// pool is a constant pool indexed by the operands or vars
pool []string
// vars is a list of variables names to be bound by this pattern
vars []string
@@ -35,8 +36,6 @@ type Pattern struct {
stacksize int
// tailLen is the length of the fixed-size segments after a deep wildcard
tailLen int
// verb is the VERB part of the path pattern. It is empty if the pattern does not have VERB part.
verb string
// assumeColonVerb indicates whether a path suffix after a final
// colon may only be interpreted as a verb.
assumeColonVerb bool

View File

@@ -8,9 +8,9 @@ import (
)
type template struct {
segments []segment
verb string
template string
segments []segment
}
type segment interface {

View File

@@ -6,8 +6,8 @@ import (
// Pool holds the socket pool
type Pool struct {
sync.RWMutex
pool map[string]*Socket
sync.RWMutex
}
// Get socket from pool

View File

@@ -9,17 +9,12 @@ import (
// Socket is our pseudo socket for transport.Socket
type Socket struct {
id string
// closed
closed chan bool
// remote addr
send chan *transport.Message
recv chan *transport.Message
id string
remote string
// local addr
local string
// send chan
send chan *transport.Message
// recv chan
recv chan *transport.Message
local string
}
// SetLocal sets the local addr

View File

@@ -21,10 +21,9 @@ type Stream interface {
type stream struct {
Stream
sync.RWMutex
err error
request *request
sync.RWMutex
}
type request struct {

View File

@@ -21,9 +21,9 @@ type Sync interface {
type syncStore struct {
storeOpts store.Options
syncOpts Options
pendingWrites []*deque.Deque
pendingWriteTickers []*time.Ticker
syncOpts Options
sync.RWMutex
}
@@ -123,7 +123,7 @@ func (c *syncStore) Sync() error {
}
type internalRecord struct {
expiresAt time.Time
key string
value []byte
expiresAt time.Time
}

View File

@@ -12,11 +12,10 @@ import (
// authClaims to be encoded in the JWT
type authClaims struct {
Type string `json:"type"`
Scopes []string `json:"scopes"`
Metadata metadata.Metadata `json:"metadata"`
jwt.StandardClaims
Type string `json:"type"`
Scopes []string `json:"scopes"`
}
// JWT implementation of token provider
@@ -51,7 +50,7 @@ func (j *JWT) Generate(acc *auth.Account, opts ...token.GenerateOption) (*token.
// generate the JWT
expiry := time.Now().Add(options.Expiry)
t := jwt.NewWithClaims(jwt.SigningMethodRS256, authClaims{
acc.Type, acc.Scopes, acc.Metadata, jwt.StandardClaims{
Type: acc.Type, Scopes: acc.Scopes, Metadata: acc.Metadata, StandardClaims: jwt.StandardClaims{
Subject: acc.ID,
Issuer: acc.Issuer,
ExpiresAt: expiry.Unix(),

View File

@@ -25,10 +25,10 @@ type Provider interface {
// Token holds the auth token
type Token struct {
// The actual token
Token string `json:"token"`
// Time of token creation
// Created time of token created
Created time.Time `json:"created"`
// Time of token expiry
// Expiry ime of the token
Expiry time.Time `json:"expiry"`
// Token holds the actual token
Token string `json:"token"`
}