Compare commits

..

18 Commits

Author SHA1 Message Date
6e6c31b5dd Merge pull request #69 from unistack-org/master
merge master
2021-12-28 09:30:34 +03:00
94929878fe Merge pull request #68 from unistack-org/improvements
improvements
2021-12-28 09:23:45 +03:00
8ce469a09e tracer: fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-12-28 09:18:52 +03:00
88788776d2 Merge branch 'master' into v3 2021-12-16 15:04:08 +03:00
e143e2b547 client: allow to set metadata for message
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-12-16 15:03:42 +03:00
a36f99e30b Merge pull request #66 from unistack-org/minor_changes
config: add new error type
2021-11-30 07:35:27 +03:00
326ee53333 config: add new error type
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-11-30 07:34:49 +03:00
1244c5bb4d Merge pull request #65 from unistack-org/master
merge changes from master
2021-11-24 00:59:00 +03:00
4ccc8a9c85 Merge pull request #64 from unistack-org/minor_changes
minor changes
2021-11-24 00:58:21 +03:00
8a2e84d489 minor changes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-11-24 00:57:59 +03:00
d29363b78d codec: add NewFrame helper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-11-19 09:22:13 +03:00
734f751055 Merge pull request #63 from unistack-org/master
util/http: add type alias
2021-11-19 03:04:55 +03:00
55d8a9ee20 util/http: add type alias
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-11-19 03:04:08 +03:00
07c93042ba Merge pull request #62 from unistack-org/master
merge stable
2021-11-18 16:01:10 +03:00
b9bbfdf159 config: add watch option helper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-11-18 15:57:14 +03:00
fbad257acc config: add helpers to load/save options (#60) 2021-11-18 15:46:30 +03:00
1829febb6e util/http: fix lint issues
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-11-09 17:07:52 +03:00
7838fa62a8 util/trie: import some code from chi router
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-11-09 16:34:05 +03:00
19 changed files with 139 additions and 60 deletions

View File

@@ -49,6 +49,7 @@ type Message interface {
Topic() string
Payload() interface{}
ContentType() string
Metadata() metadata.Metadata
}
// Request is the interface for a synchronous request used by Call or Stream

View File

@@ -12,7 +12,7 @@ import (
type LookupFunc func(context.Context, Request, CallOptions) ([]string, error)
// LookupRoute for a request using the router and then choose one using the selector
func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string, error) {
func LookupRoute(_ context.Context, req Request, opts CallOptions) ([]string, error) {
// check to see if an address was provided as a call option
if len(opts.Address) > 0 {
return opts.Address, nil

View File

@@ -139,6 +139,10 @@ func (n *noopMessage) ContentType() string {
return n.opts.ContentType
}
func (n *noopMessage) Metadata() metadata.Metadata {
return n.opts.Metadata
}
func (n *noopClient) newCodec(contentType string) (codec.Codec, error) {
if cf, ok := n.opts.Codecs[contentType]; ok {
return cf, nil

View File

@@ -8,6 +8,7 @@ import (
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/network/transport"
"go.unistack.org/micro/v3/register"
@@ -128,7 +129,7 @@ type PublishOptions struct {
// NewMessageOptions creates message options struct
func NewMessageOptions(opts ...MessageOption) MessageOptions {
options := MessageOptions{}
options := MessageOptions{Metadata: metadata.New(1)}
for _, o := range opts {
o(&options)
}
@@ -137,7 +138,10 @@ func NewMessageOptions(opts ...MessageOption) MessageOptions {
// MessageOptions holds client message options
type MessageOptions struct {
// Metadata additional metadata
Metadata metadata.Metadata
// ContentType specify content-type of message
// deprecated
ContentType string
}
@@ -517,6 +521,7 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption {
// Deprecated
func WithMessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.Metadata.Set(metadata.HeaderContentType, ct)
o.ContentType = ct
}
}
@@ -524,10 +529,18 @@ func WithMessageContentType(ct string) MessageOption {
// MessageContentType sets the message content type
func MessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.Metadata.Set(metadata.HeaderContentType, ct)
o.ContentType = ct
}
}
// MessageMetadata sets the message metadata
func MessageMetadata(k, v string) MessageOption {
return func(o *MessageOptions) {
o.Metadata.Set(k, v)
}
}
// StreamingRequest specifies that request is streaming
func StreamingRequest(b bool) RequestOption {
return func(o *RequestOptions) {

View File

@@ -20,7 +20,7 @@ func RetryNever(ctx context.Context, req Request, retryCount int, err error) (bo
}
// RetryOnError retries a request on a 500 or timeout error
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
func RetryOnError(_ context.Context, _ Request, _ int, err error) (bool, error) {
if err == nil {
return false, nil
}

View File

@@ -5,29 +5,40 @@ type Frame struct {
Data []byte
}
// NewFrame returns new frame with data
func NewFrame(data []byte) *Frame {
return &Frame{Data: data}
}
// MarshalJSON returns frame data
func (m *Frame) MarshalJSON() ([]byte, error) {
return m.Data, nil
return m.Marshal()
}
// UnmarshalJSON set frame data
func (m *Frame) UnmarshalJSON(data []byte) error {
m.Data = data
return nil
return m.Unmarshal(data)
}
// ProtoMessage noop func
func (m *Frame) ProtoMessage() {}
// Reset resets frame
func (m *Frame) Reset() {
*m = Frame{}
}
// String returns frame as string
func (m *Frame) String() string {
return string(m.Data)
}
// Marshal returns frame data
func (m *Frame) Marshal() ([]byte, error) {
return m.Data, nil
}
// Unmarshal set frame data
func (m *Frame) Unmarshal(data []byte) error {
m.Data = data
return nil

View File

@@ -23,6 +23,8 @@ var (
ErrInvalidStruct = errors.New("invalid struct specified")
// ErrWatcherStopped is returned when source watcher has been stopped
ErrWatcherStopped = errors.New("watcher stopped")
// ErrWatcherNotImplemented returned when config does not implement watch
ErrWatcherNotImplemented = errors.New("watcher not implemented")
)
// Config is an interface abstraction for dynamic configuration

View File

@@ -52,3 +52,13 @@ func SetLoadOption(k, v interface{}) LoadOption {
o.Context = context.WithValue(o.Context, k, v)
}
}
// SetWatchOption returns a function to setup a context with given value
func SetWatchOption(k, v interface{}) WatchOption {
return func(o *WatchOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

View File

@@ -2,7 +2,6 @@ package config
import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
@@ -271,7 +270,7 @@ func (c *defaultConfig) Name() string {
}
func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
return nil, fmt.Errorf("not implemented")
return nil, ErrWatcherNotImplemented
}
// NewConfig returns new default config source

View File

@@ -53,6 +53,22 @@ func (e *Error) Error() string {
return string(b)
}
/*
// Generator struct holds id of error
type Generator struct {
id string
}
// Generator can emit new error with static id
func NewGenerator(id string) *Generator {
return &Generator{id: id}
}
func (g *Generator) BadRequest(format string, args ...interface{}) error {
return BadRequest(g.id, format, args...)
}
*/
// New generates a custom error
func New(id, detail string, code int32) error {
return &Error{
@@ -75,121 +91,121 @@ func Parse(err string) *Error {
}
// BadRequest generates a 400 error.
func BadRequest(id, format string, a ...interface{}) error {
func BadRequest(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 400,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(400),
}
}
// Unauthorized generates a 401 error.
func Unauthorized(id, format string, a ...interface{}) error {
func Unauthorized(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 401,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(401),
}
}
// Forbidden generates a 403 error.
func Forbidden(id, format string, a ...interface{}) error {
func Forbidden(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 403,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(403),
}
}
// NotFound generates a 404 error.
func NotFound(id, format string, a ...interface{}) error {
func NotFound(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 404,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(404),
}
}
// MethodNotAllowed generates a 405 error.
func MethodNotAllowed(id, format string, a ...interface{}) error {
func MethodNotAllowed(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 405,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(405),
}
}
// Timeout generates a 408 error.
func Timeout(id, format string, a ...interface{}) error {
func Timeout(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 408,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(408),
}
}
// Conflict generates a 409 error.
func Conflict(id, format string, a ...interface{}) error {
func Conflict(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 409,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(409),
}
}
// InternalServerError generates a 500 error.
func InternalServerError(id, format string, a ...interface{}) error {
func InternalServerError(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 500,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(500),
}
}
// NotImplemented generates a 501 error
func NotImplemented(id, format string, a ...interface{}) error {
func NotImplemented(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 501,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(501),
}
}
// BadGateway generates a 502 error
func BadGateway(id, format string, a ...interface{}) error {
func BadGateway(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 502,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(502),
}
}
// ServiceUnavailable generates a 503 error
func ServiceUnavailable(id, format string, a ...interface{}) error {
func ServiceUnavailable(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 503,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(503),
}
}
// GatewayTimeout generates a 504 error
func GatewayTimeout(id, format string, a ...interface{}) error {
func GatewayTimeout(id, format string, args ...interface{}) error {
return &Error{
ID: id,
Code: 504,
Detail: fmt.Sprintf(format, a...),
Detail: fmt.Sprintf(format, args...),
Status: http.StatusText(504),
}
}

2
go.mod
View File

@@ -4,7 +4,7 @@ go 1.16
require (
github.com/ef-ds/deque v1.0.4
github.com/golang-jwt/jwt/v4 v4.1.0
github.com/golang-jwt/jwt/v4 v4.2.0
github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4

4
go.sum
View File

@@ -1,7 +1,7 @@
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/golang-jwt/jwt/v4 v4.1.0 h1:XUgk2Ex5veyVFVeLm0xhusUTQybEbexJXrvPNOKkSY0=
github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU=
github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=

View File

@@ -51,6 +51,4 @@ func TestExtractEndpoint(t *testing.T) {
if endpoints[0].Response != "TestResponse" {
t.Fatalf("Expected TestResponse got %s", endpoints[0].Response)
}
t.Logf("XXX %#+v\n", endpoints[0])
}

View File

@@ -16,6 +16,7 @@ import (
"go.unistack.org/micro/v3/network/transport"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/tracer"
"go.unistack.org/micro/v3/util/id"
)
// Option func
@@ -106,7 +107,7 @@ func NewOptions(opts ...Option) Options {
Address: DefaultAddress,
Name: DefaultName,
Version: DefaultVersion,
ID: DefaultID,
ID: id.Must(),
Namespace: DefaultNamespace,
}

View File

@@ -8,7 +8,6 @@ import (
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/util/id"
)
// DefaultServer default server
@@ -21,8 +20,6 @@ var (
DefaultName = "server"
// DefaultVersion will be used if no version passed
DefaultVersion = "latest"
// DefaultID will be used if no id passed
DefaultID = id.Must()
// DefaultRegisterCheck holds func that run before register server
DefaultRegisterCheck = func(context.Context) error { return nil }
// DefaultRegisterInterval holds interval for register

View File

@@ -18,11 +18,11 @@ var (
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
labels = append(labels, tracer.LabelString(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
labels = append(labels, tracer.LabelBool("error", true))
}
sp.SetLabels(labels...)
}
@@ -33,11 +33,11 @@ var (
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
labels = append(labels, tracer.LabelString(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
labels = append(labels, tracer.LabelBool("error", true))
}
sp.SetLabels(labels...)
}
@@ -48,11 +48,11 @@ var (
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
labels = append(labels, tracer.LabelString(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
labels = append(labels, tracer.LabelBool("error", true))
}
sp.SetLabels(labels...)
}
@@ -63,11 +63,11 @@ var (
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
labels = append(labels, tracer.LabelString(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
labels = append(labels, tracer.LabelBool("error", true))
}
sp.SetLabels(labels...)
}
@@ -78,11 +78,11 @@ var (
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
labels = append(labels, tracer.LabelString(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
labels = append(labels, tracer.LabelBool("error", true))
}
sp.SetLabels(labels...)
}
@@ -93,11 +93,11 @@ var (
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
labels = append(labels, tracer.LabelString(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
labels = append(labels, tracer.LabelBool("error", true))
}
sp.SetLabels(labels...)
}
@@ -229,7 +229,10 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{
}
}
sp := tracer.SpanFromContext(ctx)
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, endpoint)
}
defer sp.Finish()
err := ot.Client.Call(ctx, req, rsp, opts...)
@@ -249,7 +252,10 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie
}
}
sp := tracer.SpanFromContext(ctx)
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, endpoint)
}
defer sp.Finish()
stream, err := ot.Client.Stream(ctx, req, opts...)
@@ -262,7 +268,10 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
sp := tracer.SpanFromContext(ctx)
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, msg.Topic())
}
defer sp.Finish()
err := ot.Client.Publish(ctx, msg, opts...)
@@ -282,7 +291,10 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i
}
}
sp := tracer.SpanFromContext(ctx)
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
}
defer sp.Finish()
err := ot.serverHandler(ctx, req, rsp)
@@ -295,7 +307,10 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
sp := tracer.SpanFromContext(ctx)
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, msg.Topic())
}
defer sp.Finish()
err := ot.serverSubscriber(ctx, msg)
@@ -339,7 +354,10 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.
}
}
sp := tracer.SpanFromContext(ctx)
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, endpoint)
}
defer sp.Finish()
err := ot.clientCallFunc(ctx, addr, req, rsp, opts)

View File

@@ -79,6 +79,8 @@ func NewTrie() *Node {
return &Node{}
}
type Trie = Node
type Node struct {
// regexp matcher for regexp nodes
rex *regexp.Regexp

View File

@@ -5,6 +5,10 @@ import (
"testing"
)
func TestTrieBackwards(t *testing.T) {
_ = &Trie{}
}
func TestTrieWildcardPathPrefix(t *testing.T) {
var err error
type handler struct {

View File

@@ -91,7 +91,10 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
val, ok := mp[fname]
if !ok {
continue
val, ok = mp[dfld.Name]
if !ok {
continue
}
}
sval = reflect.ValueOf(val)