Compare commits

..

40 Commits

Author SHA1 Message Date
ecb60e4dc5 fix lint
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-28 23:43:43 +03:00
a1999ff81c util/http: trie add more tests
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-28 01:02:28 +03:00
d0f2bc8346 util/http: add trie matching func
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-27 23:30:53 +03:00
dependabot[bot]
dd29bf457e chore(deps): bump actions/github-script from 4 to 5 (#58)
Bumps [actions/github-script](https://github.com/actions/github-script) from 4 to 5.
- [Release notes](https://github.com/actions/github-script/releases)
- [Commits](https://github.com/actions/github-script/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/github-script
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-09-27 09:33:25 +03:00
d062c248e3 codec: fieldaligment
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-22 17:09:26 +03:00
875f66d36e codec: implement proto v1 message for Frame
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-22 16:59:52 +03:00
818a0e6356 codec: add context helper funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-22 01:07:27 +03:00
56e02ec463 codec: add ability to pass codec options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-22 00:57:10 +03:00
6ca851401d update workflow
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-21 21:46:23 +03:00
bd8216b397 update workflows
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-17 07:47:23 +03:00
2b13b3f128 Revert "update workflows"
This reverts commit 9957380b6d.
2021-09-17 07:42:46 +03:00
9957380b6d update workflows
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-17 07:41:17 +03:00
e10f8c0fa0 util/id: move tests to micro-tests repo
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-16 15:31:01 +03:00
45252fe4a6 enable automerge
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-16 10:30:59 +03:00
faad082efe util/rand: add Shuffle func
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-15 17:51:25 +03:00
8ab35cbd9b update dependabot
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-12 16:17:28 +03:00
ad58ab6943 fix codeql issue
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-06 10:51:13 +03:00
0e97049e1d Create SECURITY.md 2021-09-06 10:49:18 +03:00
edb0bbf9cf add codeql
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-09-06 10:30:21 +03:00
dependabot[bot]
1b01bd22a6 build(deps): bump github.com/unistack-org/micro-proto (#57) 2021-09-06 06:56:44 +00:00
2fbaa26f0f logger: add Clone method
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-30 16:21:01 +03:00
35d3e4b332 logger: breaking changes to log level parsing
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-30 16:21:01 +03:00
dependabot[bot]
e98a93d530 build(deps): bump github.com/unistack-org/micro-proto (#56)
Bumps [github.com/unistack-org/micro-proto](https://github.com/unistack-org/micro-proto) from 0.0.5 to 0.0.8.
- [Release notes](https://github.com/unistack-org/micro-proto/releases)
- [Commits](https://github.com/unistack-org/micro-proto/compare/v0.0.5...v0.0.8)

---
updated-dependencies:
- dependency-name: github.com/unistack-org/micro-proto
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-08-30 16:20:10 +03:00
e3545532e8 minor changes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-21 01:00:10 +03:00
09653c2fb2 util/id: specify default size for uuid behaviour
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-20 22:48:03 +03:00
70adfeab0d fix flow
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-20 22:44:17 +03:00
a45b672c98 drop uuid and use modified nanoid
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-20 22:40:48 +03:00
4509323cae update and regen all
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-16 23:56:50 +03:00
b3f4c670d5 regen all
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-16 18:57:19 +03:00
778dd449e2 logger: add NewStdLogger and RedirectStdLogger
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 13:45:11 +03:00
1d16983b67 logger: add NewStdLogger that can be used as std *log.Logger
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 11:52:04 +03:00
f386bffd37 logger: change logger interface
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 02:15:57 +03:00
772bde7938 network/tunnel/broker: fix metadata compile issue
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 02:14:56 +03:00
ea16f5f825 config/default: not implement watcher as it cant change
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 16:04:58 +03:00
c2f34df493 config: minor changes to split config and watcher files
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 13:51:43 +03:00
efe215cd60 config/default: watcher send changes only on non nil
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 12:25:29 +03:00
b4f332bf0d config/default: return error on Next() call
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 01:15:50 +03:00
f47fbb1030 config: add jitter interval for watcher to avoid dos
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 00:37:56 +03:00
1e8e57a708 config/default: minor changes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-03 00:49:21 +03:00
dependabot[bot]
5d0959b0a1 build(deps): bump github.com/golang-jwt/jwt (#54)
Bumps [github.com/golang-jwt/jwt](https://github.com/golang-jwt/jwt) from 3.2.1+incompatible to 3.2.2+incompatible.
- [Release notes](https://github.com/golang-jwt/jwt/releases)
- [Changelog](https://github.com/golang-jwt/jwt/blob/main/VERSION_HISTORY.md)
- [Commits](https://github.com/golang-jwt/jwt/compare/v3.2.1...v3.2.2)

---
updated-dependencies:
- dependency-name: github.com/golang-jwt/jwt
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-08-03 00:27:35 +03:00
66 changed files with 1238 additions and 432 deletions

View File

@@ -11,9 +11,16 @@ updates:
directory: "/"
schedule:
interval: "daily"
commit-message:
prefix: "chore"
include: "scope"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"
commit-message:
prefix: "chore"
include: "scope"

75
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@@ -0,0 +1,75 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: Checkout repository
uses: actions/checkout@v2
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1

View File

@@ -0,0 +1,66 @@
name: "prautomerge"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
permissions:
contents: write
pull-requests: write
jobs:
Dependabot-Automerge:
runs-on: ubuntu-latest
# Contains workaround to execute if dependabot updates the PR by checking for the base branch in the linked PR
# The the github.event.workflow_run.event value is 'push' and not 'pull_request'
# dont work with multiple workflows when last returns success
if: >-
github.event.workflow_run.conclusion == 'success'
&& github.actor == 'dependabot[bot]'
&& github.event.sender.login == 'dependabot[bot]'
&& github.event.sender.type == 'Bot'
&& (github.event.workflow_run.event == 'pull_request'
|| (github.event.workflow_run.event == 'push' && github.event.workflow_run.pull_requests[0].base.ref == github.event.repository.default_branch ))
steps:
- name: Approve Changes and Merge changes if label 'dependencies' is set
uses: actions/github-script@v5
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
console.log(context.payload.workflow_run);
var labelNames = await github.paginate(
github.issues.listLabelsOnIssue,
{
repo: context.repo.repo,
owner: context.repo.owner,
issue_number: context.payload.workflow_run.pull_requests[0].number,
},
(response) => response.data.map(
(label) => label.name
)
);
console.log(labelNames);
if (labelNames.includes('dependencies')) {
console.log('Found label');
await github.pulls.createReview({
repo: context.repo.repo,
owner: context.repo.owner,
pull_number: context.payload.workflow_run.pull_requests[0].number,
event: 'APPROVE'
});
console.log('Approved PR');
await github.pulls.merge({
repo: context.repo.repo,
owner: context.repo.owner,
pull_number: context.payload.workflow_run.pull_requests[0].number,
});
console.log('Merged PR');
}

15
SECURITY.md Normal file
View File

@@ -0,0 +1,15 @@
# Security Policy
## Supported Versions
Use this section to tell people about which versions of your project are
currently being supported with security updates.
| Version | Supported |
| ------- | ------------------ |
| 3.7.x | :white_check_mark: |
| < 3.7.0 | :x: |
## Reporting a Vulnerability
If you find any issue, please create github issue in this repo

View File

@@ -55,7 +55,7 @@ type Auth interface {
type Account struct {
// Metadata any other associated metadata
Metadata metadata.Metadata `json:"metadata"`
// ID of the account e.g. email or uuid
// ID of the account e.g. email or id
ID string `json:"id"`
// Type of the account, e.g. service
Type string `json:"type"`

View File

@@ -1,7 +1,7 @@
package auth
import (
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/util/id"
)
type noopAuth struct {
@@ -61,11 +61,11 @@ func (n *noopAuth) Verify(acc *Account, res *Resource, opts ...VerifyOption) err
// Inspect a token
func (n *noopAuth) Inspect(token string) (*Account, error) {
uid, err := uuid.NewRandom()
id, err := id.New()
if err != nil {
return nil, err
}
return &Account{ID: uid.String(), Issuer: n.Options().Issuer}, nil
return &Account{ID: id, Issuer: n.Options().Issuer}, nil
}
// Token generation using an account id and secret

View File

@@ -4,10 +4,10 @@ import (
"context"
"sync"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
maddr "github.com/unistack-org/micro/v3/util/addr"
"github.com/unistack-org/micro/v3/util/id"
mnet "github.com/unistack-org/micro/v3/util/net"
"github.com/unistack-org/micro/v3/util/rand"
)
@@ -114,8 +114,8 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message,
}
type msgWrapper struct {
topic string
body interface{}
topic string
}
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
@@ -180,7 +180,7 @@ func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...Pub
beh = sub.opts.BatchErrorHandler
}
if beh != nil {
beh(ms)
_ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
@@ -199,7 +199,7 @@ func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...Pub
eh = sub.opts.ErrorHandler
}
if eh != nil {
eh(p)
_ = eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
@@ -224,7 +224,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
}
m.RUnlock()
id, err := uuid.NewRandom()
sid, err := id.New()
if err != nil {
return nil, err
}
@@ -233,7 +233,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: id.String(),
id: sid,
topic: topic,
batchhandler: handler,
opts: options,
@@ -269,7 +269,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
}
m.RUnlock()
id, err := uuid.NewRandom()
sid, err := id.New()
if err != nil {
return nil, err
}
@@ -278,7 +278,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: id.String(),
id: sid,
topic: topic,
handler: handler,
opts: options,

View File

@@ -53,6 +53,7 @@ func TestMemoryBatchBroker(t *testing.T) {
t.Fatalf("Unexpected connect error %v", err)
}
}
func TestMemoryBroker(t *testing.T) {
b := NewBroker()
ctx := context.Background()

View File

@@ -41,11 +41,11 @@ type MessageType int
// connection. ReadBody may be called with a nil argument to force the
// body to be read and discarded.
type Codec interface {
ReadHeader(io.Reader, *Message, MessageType) error
ReadBody(io.Reader, interface{}) error
Write(io.Writer, *Message, interface{}) error
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
ReadHeader(r io.Reader, m *Message, mt MessageType) error
ReadBody(r io.Reader, v interface{}) error
Write(w io.Writer, m *Message, v interface{}) error
Marshal(v interface{}, opts ...Option) ([]byte, error)
Unmarshal(b []byte, v interface{}, opts ...Option) error
String() string
}

34
codec/context.go Normal file
View File

@@ -0,0 +1,34 @@
package codec
import (
"context"
)
type codecKey struct{}
// FromContext returns codec from context
func FromContext(ctx context.Context) (Codec, bool) {
if ctx == nil {
return nil, false
}
c, ok := ctx.Value(codecKey{}).(Codec)
return c, ok
}
// NewContext put codec in context
func NewContext(ctx context.Context, c Codec) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, codecKey{}, c)
}
// SetOption returns a function to setup a context with given value
func SetOption(k, v interface{}) Option {
return func(o *Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

View File

@@ -4,3 +4,31 @@ package codec
type Frame struct {
Data []byte
}
func (m *Frame) MarshalJSON() ([]byte, error) {
return m.Data, nil
}
func (m *Frame) UnmarshalJSON(data []byte) error {
m.Data = data
return nil
}
func (m *Frame) ProtoMessage() {}
func (m *Frame) Reset() {
*m = Frame{}
}
func (m *Frame) String() string {
return string(m.Data)
}
func (m *Frame) Marshal() ([]byte, error) {
return m.Data, nil
}
func (m *Frame) Unmarshal(data []byte) error {
m.Data = data
return nil
}

View File

@@ -5,7 +5,9 @@ import (
"io"
)
type noopCodec struct{}
type noopCodec struct {
opts Options
}
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
return nil
@@ -69,11 +71,11 @@ func (c *noopCodec) String() string {
}
// NewCodec returns new noop codec
func NewCodec() Codec {
return &noopCodec{}
func NewCodec(opts ...Option) Codec {
return &noopCodec{opts: NewOptions(opts...)}
}
func (c *noopCodec) Marshal(v interface{}) ([]byte, error) {
func (c *noopCodec) Marshal(v interface{}, opts ...Option) ([]byte, error) {
if v == nil {
return nil, nil
}
@@ -96,7 +98,7 @@ func (c *noopCodec) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (c *noopCodec) Unmarshal(d []byte, v interface{}) error {
func (c *noopCodec) Unmarshal(d []byte, v interface{}, opts ...Option) error {
if v == nil {
return nil
}

View File

@@ -1,6 +1,8 @@
package codec
import (
"context"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/tracer"
@@ -17,6 +19,10 @@ type Options struct {
Logger logger.Logger
// Tracer used for tracing
Tracer tracer.Tracer
// Context stores additional codec options
Context context.Context
// TagName specifies tag name in struct to control codec
TagName string
// MaxMsgSize specifies max messages size that reads by codec
MaxMsgSize int
}
@@ -28,6 +34,13 @@ func MaxMsgSize(n int) Option {
}
}
// TagName sets the codec tag name in struct
func TagName(n string) Option {
return func(o *Options) {
o.TagName = n
}
}
// Logger sets the logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
@@ -52,10 +65,12 @@ func Meter(m meter.Meter) Option {
// NewOptions returns new options
func NewOptions(opts ...Option) Options {
options := Options{
Context: context.Background(),
Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
MaxMsgSize: DefaultMaxMsgSize,
TagName: DefaultTagName,
}
for _, o := range opts {

View File

@@ -10,8 +10,11 @@ import (
// DefaultConfig default config
var DefaultConfig Config = NewConfig()
// DefaultWatcherInterval default interval for poll changes
var DefaultWatcherInterval = 5 * time.Second
// DefaultWatcherMinInterval default min interval for poll changes
var DefaultWatcherMinInterval = 5 * time.Second
// DefaultWatcherMaxInterval default max interval for poll changes
var DefaultWatcherMaxInterval = 9 * time.Second
var (
// ErrCodecMissing is returned when codec needed and not specified

View File

@@ -2,10 +2,10 @@ package config
import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"time"
"github.com/imdario/mergo"
rutil "github.com/unistack-org/micro/v3/util/reflect"
@@ -271,17 +271,7 @@ func (c *defaultConfig) Name() string {
}
func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
w := &defaultWatcher{
opts: c.opts,
wopts: NewWatchOptions(opts...),
done: make(chan bool),
vchan: make(chan map[string]interface{}),
echan: make(chan error),
}
go w.run()
return w, nil
return nil, fmt.Errorf("not implemented")
}
// NewConfig returns new default config source
@@ -292,73 +282,3 @@ func NewConfig(opts ...Option) Config {
}
return &defaultConfig{opts: options}
}
type defaultWatcher struct {
opts Options
wopts WatchOptions
done chan bool
ticker *time.Ticker
vchan chan map[string]interface{}
echan chan error
}
func (w *defaultWatcher) run() {
ticker := time.NewTicker(w.wopts.Interval)
defer ticker.Stop()
src := w.opts.Struct
if w.wopts.Struct != nil {
src = w.wopts.Struct
}
for {
select {
case <-w.done:
return
case <-ticker.C:
dst, err := rutil.Zero(src)
if err == nil {
err = fillValues(reflect.ValueOf(dst), w.opts.StructTag)
}
if err != nil {
w.echan <- err
return
}
srcmp, err := rutil.StructFieldsMap(src)
if err != nil {
w.echan <- err
return
}
dstmp, err := rutil.StructFieldsMap(dst)
if err != nil {
w.echan <- err
return
}
for sk, sv := range srcmp {
if reflect.DeepEqual(dstmp[sk], sv) {
delete(dstmp, sk)
}
}
w.vchan <- dstmp
src = dst
}
}
}
func (w *defaultWatcher) Next() (map[string]interface{}, error) {
select {
case <-w.done:
break
case v, ok := <-w.vchan:
if !ok {
break
}
return v, nil
}
return nil, ErrWatcherStopped
}
func (w *defaultWatcher) Stop() error {
close(w.done)
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/unistack-org/micro/v3/config"
)
@@ -18,57 +17,6 @@ type Cfg struct {
IntValue int `default:"99"`
}
func TestWatch(t *testing.T) {
ctx := context.Background()
conf := &Cfg{IntValue: 10}
cfg := config.NewConfig(config.Struct(conf))
if err := cfg.Init(); err != nil {
t.Fatal(err)
}
if err := cfg.Load(ctx); err != nil {
t.Fatal(err)
}
w, err := cfg.Watch(ctx, config.WatchInterval(500*time.Millisecond))
if err != nil {
t.Fatal(err)
}
defer func() {
_ = w.Stop()
}()
done := make(chan struct{})
go func() {
for {
mp, err := w.Next()
if err != nil && err != config.ErrWatcherStopped {
t.Fatal(err)
} else if err == config.ErrWatcherStopped {
return
}
if len(mp) != 1 {
t.Fatal(fmt.Errorf("default watcher err: %v", mp))
}
v, ok := mp["IntValue"]
if !ok {
t.Fatal(fmt.Errorf("default watcher err: %v", v))
}
if nv, ok := v.(int); !ok || nv != 99 {
t.Fatal(fmt.Errorf("default watcher err: %v", v))
}
close(done)
return
}
}()
<-done
}
func TestDefault(t *testing.T) {
ctx := context.Background()
conf := &Cfg{IntValue: 10}
@@ -100,5 +48,5 @@ func TestDefault(t *testing.T) {
t.Fatal("AfterLoad option not working")
}
_ = conf
//t.Logf("%#+v\n", conf)
// t.Logf("%#+v\n", conf)
}

View File

@@ -211,8 +211,10 @@ type WatchOptions struct {
Context context.Context
// Coalesce multiple events to one
Coalesce bool
// Interval to periodically pull changes if config source not supports async notify
Interval time.Duration
// MinInterval specifies the min time.Duration interval for poll changes
MinInterval time.Duration
// MaxInterval specifies the max time.Duration interval for poll changes
MaxInterval time.Duration
// Struct for filling
Struct interface{}
}
@@ -221,8 +223,9 @@ type WatchOption func(*WatchOptions)
func NewWatchOptions(opts ...WatchOption) WatchOptions {
options := WatchOptions{
Context: context.Background(),
Interval: DefaultWatcherInterval,
Context: context.Background(),
MinInterval: DefaultWatcherMinInterval,
MaxInterval: DefaultWatcherMaxInterval,
}
for _, o := range opts {
o(&options)
@@ -244,10 +247,11 @@ func WatchCoalesce(b bool) WatchOption {
}
}
// WatchInterval specifies time.Duration for pulling changes
func WatchInterval(td time.Duration) WatchOption {
// WatchInterval specifies min and max time.Duration for pulling changes
func WatchInterval(min, max time.Duration) WatchOption {
return func(o *WatchOptions) {
o.Interval = td
o.MinInterval = min
o.MaxInterval = max
}
}

View File

@@ -37,7 +37,7 @@ var (
// Error type
type Error struct {
// Id holds error id or service, usually someting like my_service or uuid
// Id holds error id or service, usually someting like my_service or id
Id string
// Detail holds some useful details about error
Detail string

View File

@@ -6,13 +6,13 @@ import (
"path/filepath"
"sync"
"github.com/google/uuid"
"github.com/silas/dag"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/store"
"github.com/unistack-org/micro/v3/util/id"
)
type microFlow struct {
@@ -149,18 +149,18 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
return steps, nil
}
func (w *microWorkflow) Abort(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
}
func (w *microWorkflow) Suspend(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
func (w *microWorkflow) Suspend(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
}
func (w *microWorkflow) Resume(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
func (w *microWorkflow) Resume(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
}
@@ -176,11 +176,10 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
}
w.Unlock()
uid, err := uuid.NewRandom()
eid, err := id.New()
if err != nil {
return "", err
}
eid := uid.String()
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
@@ -265,17 +264,16 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
}
cherr <- serr
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
}(cstep)
wg.Wait()
@@ -299,16 +297,15 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
}
cherr <- serr
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr
return
}
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr
return
}
}
}
@@ -330,7 +327,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
close(cherr)
case <-chstatus:
close(chstatus)
return uid.String(), nil
return eid, nil
}
switch {
@@ -338,20 +335,17 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err == nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
}
return uid.String(), err
return eid, err
}
func NewFlow(opts ...Option) Flow {
@@ -500,10 +494,12 @@ func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...Execu
rsp := &codec.Frame{}
copts := []client.CallOption{client.WithRetries(0)}
if options.Timeout > 0 {
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
copts = append(copts,
client.WithRequestTimeout(options.Timeout),
client.WithDialTimeout(options.Timeout))
}
nctx := metadata.NewOutgoingContext(ctx, req.Header)
err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp)
err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp, copts...)
if err != nil {
return nil, err
}
@@ -554,7 +550,7 @@ func (s *microPublishStep) String() string {
if s.opts.ID != "" {
return s.opts.ID
}
return fmt.Sprintf("%s", s.topic)
return s.topic
}
func (s *microPublishStep) Name() string {

View File

@@ -116,11 +116,11 @@ type Workflow interface {
// Steps returns steps slice where parallel steps returned on the same level
Steps() ([][]Step, error)
// Suspend suspends execution
Suspend(ctx context.Context, eid string) error
Suspend(ctx context.Context, id string) error
// Resume resumes execution
Resume(ctx context.Context, eid string) error
Resume(ctx context.Context, id string) error
// Abort abort execution
Abort(ctx context.Context, eid string) error
Abort(ctx context.Context, id string) error
}
// Flow the base interface to interact with workflows

9
go.mod
View File

@@ -4,10 +4,11 @@ go 1.16
require (
github.com/ef-ds/deque v1.0.4
github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/google/uuid v1.3.0
github.com/golang-jwt/jwt/v4 v4.1.0
github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
golang.org/x/net v0.0.0-20210510120150-4163338589ed
github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4
github.com/unistack-org/micro-proto v0.0.9
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

27
go.sum
View File

@@ -1,22 +1,37 @@
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v4 v4.1.0 h1:XUgk2Ex5veyVFVeLm0xhusUTQybEbexJXrvPNOKkSY0=
github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I=
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4 h1:fOH64AB0C3ixGf9emky61STvPJL3smxJg+1Zwx1oCdg=
github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/unistack-org/micro-proto v0.0.9 h1:KrWLS4FUX7UAWNAilQf70uad6ZPf/0EudeddCXllRVc=
github.com/unistack-org/micro-proto v0.0.9/go.mod h1:Cckwmzd89gvS7ThxzZp9kQR/EOdksFQcsTAtDDyKwrg=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b h1:eB48h3HiRycXNy8E0Gf5e0hv7YT6Kt14L/D73G1fuwo=
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -11,15 +11,6 @@ import (
"time"
)
func init() {
lvl, err := GetLevel(os.Getenv("MICRO_LOG_LEVEL"))
if err != nil {
lvl = InfoLevel
}
DefaultLogger = NewLogger(WithLevel(lvl))
}
type defaultLogger struct {
enc *json.Encoder
opts Options
@@ -40,7 +31,6 @@ func (l *defaultLogger) Init(opts ...Option) error {
l.logFunc = l.opts.Wrappers[i-1].Log(l.logFunc)
l.logfFunc = l.opts.Wrappers[i-1].Logf(l.logfFunc)
}
l.Unlock()
return nil
}
@@ -49,6 +39,28 @@ func (l *defaultLogger) String() string {
return "micro"
}
func (l *defaultLogger) Clone(opts ...Option) Logger {
newopts := NewOptions(opts...)
oldopts := l.opts
for _, o := range opts {
o(&newopts)
o(&oldopts)
}
oldopts.Wrappers = newopts.Wrappers
l.Lock()
cl := &defaultLogger{opts: oldopts, logFunc: l.logFunc, logfFunc: l.logfFunc}
l.Unlock()
// wrap the Log func
for i := len(newopts.Wrappers); i > 0; i-- {
cl.logFunc = newopts.Wrappers[i-1].Log(cl.logFunc)
cl.logfFunc = newopts.Wrappers[i-1].Logf(cl.logfFunc)
}
return cl
}
func (l *defaultLogger) V(level Level) bool {
l.RLock()
ok := l.opts.Level.Enabled(level)
@@ -56,26 +68,26 @@ func (l *defaultLogger) V(level Level) bool {
return ok
}
func (l *defaultLogger) Fields(fields map[string]interface{}) Logger {
nl := &defaultLogger{opts: l.opts, enc: l.enc}
nl.opts.Fields = make(map[string]interface{}, len(l.opts.Fields)+len(fields))
l.RLock()
for k, v := range l.opts.Fields {
nl.opts.Fields[k] = v
}
l.RUnlock()
func (l *defaultLogger) Level(level Level) {
l.Lock()
l.opts.Level = level
l.Unlock()
}
for k, v := range fields {
nl.opts.Fields[k] = v
func (l *defaultLogger) Fields(fields ...interface{}) Logger {
nl := &defaultLogger{opts: l.opts, enc: l.enc}
if len(fields) == 0 {
return nl
} else if len(fields)%2 != 0 {
fields = fields[:len(fields)-1]
}
nl.opts.Fields = append(nl.opts.Fields, fields...)
return nl
}
func copyFields(src map[string]interface{}) map[string]interface{} {
dst := make(map[string]interface{}, len(src))
for k, v := range src {
dst[k] = v
}
func copyFields(src []interface{}) []interface{} {
dst := make([]interface{}, len(src))
copy(dst, src)
return dst
}
@@ -162,19 +174,23 @@ func (l *defaultLogger) Log(ctx context.Context, level Level, args ...interface{
fields := copyFields(l.opts.Fields)
l.RUnlock()
fields["level"] = level.String()
fields = append(fields, "level", level.String())
if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok {
fields["caller"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line)
fields = append(fields, "caller", fmt.Sprintf("%s:%d", logCallerfilePath(file), line))
}
fields = append(fields, "timestamp", time.Now().Format("2006-01-02 15:04:05"))
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
if len(args) > 0 {
fields["msg"] = fmt.Sprint(args...)
fields = append(fields, "msg", fmt.Sprint(args...))
}
out := make(map[string]interface{}, len(fields)/2)
for i := 0; i < len(fields); i += 2 {
out[fields[i].(string)] = fields[i+1]
}
l.RLock()
_ = l.enc.Encode(fields)
_ = l.enc.Encode(out)
l.RUnlock()
}
@@ -187,30 +203,30 @@ func (l *defaultLogger) Logf(ctx context.Context, level Level, msg string, args
fields := copyFields(l.opts.Fields)
l.RUnlock()
fields["level"] = level.String()
fields = append(fields, "level", level.String())
if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok {
fields["caller"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line)
fields = append(fields, "caller", fmt.Sprintf("%s:%d", logCallerfilePath(file), line))
}
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
fields = append(fields, "timestamp", time.Now().Format("2006-01-02 15:04:05"))
if len(args) > 0 {
fields["msg"] = fmt.Sprintf(msg, args...)
fields = append(fields, "msg", fmt.Sprintf(msg, args...))
} else if msg != "" {
fields["msg"] = msg
fields = append(fields, "msg", msg)
}
out := make(map[string]interface{}, len(fields)/2)
for i := 0; i < len(fields); i += 2 {
out[fields[i].(string)] = fields[i+1]
}
l.RLock()
_ = l.enc.Encode(fields)
_ = l.enc.Encode(out)
l.RUnlock()
}
func (l *defaultLogger) Options() Options {
// not guard against options Context values
l.RLock()
opts := l.opts
opts.Fields = copyFields(l.opts.Fields)
l.RUnlock()
return opts
return l.opts
}
// NewLogger builds a new logger based on options

View File

@@ -1,24 +1,20 @@
package logger
import (
"fmt"
)
// Level means logger level
type Level int8
const (
// TraceLevel level. Designates finer-grained informational events than the Debug.
// TraceLevel level usually used to find bugs, very verbose
TraceLevel Level = iota - 2
// DebugLevel level. Usually only enabled when debugging. Very verbose logging.
// DebugLevel level used only when enabled debugging
DebugLevel
// InfoLevel level. General operational entries about what's going on inside the application.
// InfoLevel level used for general info about what's going on inside the application
InfoLevel
// WarnLevel level. Non-critical entries that deserve eyes.
// WarnLevel level used for non-critical entries
WarnLevel
// ErrorLevel level. Used for errors that should definitely be noted.
// ErrorLevel level used for errors that should definitely be noted
ErrorLevel
// FatalLevel level. Logs and then calls `os.Exit(1)`. highest level of severity.
// FatalLevel level used for critical errors and then calls `os.Exit(1)`
FatalLevel
)
@@ -38,7 +34,7 @@ func (l Level) String() string {
case FatalLevel:
return "fatal"
}
return ""
return "info"
}
// Enabled returns true if the given level is at or above this level.
@@ -46,22 +42,22 @@ func (l Level) Enabled(lvl Level) bool {
return lvl >= l
}
// GetLevel converts a level string into a logger Level value.
// returns an error if the input string does not match known values.
func GetLevel(levelStr string) (Level, error) {
switch levelStr {
// ParseLevel converts a level string into a logger Level value.
// returns an InfoLevel if the input string does not match known values.
func ParseLevel(lvl string) Level {
switch lvl {
case TraceLevel.String():
return TraceLevel, nil
return TraceLevel
case DebugLevel.String():
return DebugLevel, nil
return DebugLevel
case InfoLevel.String():
return InfoLevel, nil
return InfoLevel
case WarnLevel.String():
return WarnLevel, nil
return WarnLevel
case ErrorLevel.String():
return ErrorLevel, nil
return ErrorLevel
case FatalLevel.String():
return FatalLevel, nil
return FatalLevel
}
return InfoLevel, fmt.Errorf("unknown Level String: '%s', use InfoLevel", levelStr)
return InfoLevel
}

View File

@@ -1,11 +1,14 @@
// Package logger provides a log interface
package logger
import "context"
import (
"context"
"os"
)
var (
// DefaultLogger variable
DefaultLogger Logger = NewLogger()
DefaultLogger Logger = NewLogger(WithLevel(ParseLevel(os.Getenv("MICRO_LOG_LEVEL"))))
// DefaultLevel used by logger
DefaultLevel Level = InfoLevel
// DefaultCallerSkipCount used by logger
@@ -16,12 +19,16 @@ var (
type Logger interface {
// Init initialises options
Init(opts ...Option) error
// Clone create logger copy with new options
Clone(opts ...Option) Logger
// V compare provided verbosity level with current log level
V(level Level) bool
// Level sets the log level for logger
Level(level Level)
// The Logger options
Options() Options
// Fields set fields to always be logged
Fields(fields map[string]interface{}) Logger
// Fields set fields to always be logged with keyval pairs
Fields(fields ...interface{}) Logger
// Info level message
Info(ctx context.Context, args ...interface{})
// Trace level message
@@ -54,6 +61,9 @@ type Logger interface {
String() string
}
// Field contains keyval pair
type Field interface{}
// Info writes msg to default logger on info level
func Info(ctx context.Context, args ...interface{}) {
DefaultLogger.Info(ctx, args...)
@@ -125,6 +135,6 @@ func Init(opts ...Option) error {
}
// Fields create logger with specific fields
func Fields(fields map[string]interface{}) Logger {
return DefaultLogger.Fields(fields)
func Fields(fields ...interface{}) Logger {
return DefaultLogger.Fields(fields...)
}

View File

@@ -3,9 +3,58 @@ package logger
import (
"bytes"
"context"
"log"
"testing"
)
func TestClone(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
if err := l.Init(); err != nil {
t.Fatal(err)
}
nl := l.Clone(WithLevel(ErrorLevel))
if err := nl.Init(); err != nil {
t.Fatal(err)
}
nl.Info(ctx, "info message")
if len(buf.Bytes()) != 0 {
t.Fatal("message must not be logged")
}
l.Info(ctx, "info message")
if len(buf.Bytes()) == 0 {
t.Fatal("message must be logged")
}
}
func TestRedirectStdLogger(t *testing.T) {
buf := bytes.NewBuffer(nil)
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
if err := l.Init(); err != nil {
t.Fatal(err)
}
fn := RedirectStdLogger(l, ErrorLevel)
defer fn()
log.Print("test")
if !bytes.Contains(buf.Bytes(), []byte(`"level":"error","msg":"test","timestamp"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
}
func TestStdLogger(t *testing.T) {
buf := bytes.NewBuffer(nil)
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
if err := l.Init(); err != nil {
t.Fatal(err)
}
lg := NewStdLogger(l, ErrorLevel)
lg.Print("test")
if !bytes.Contains(buf.Bytes(), []byte(`"level":"error","msg":"test","timestamp"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
}
func TestLogger(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
@@ -15,7 +64,7 @@ func TestLogger(t *testing.T) {
}
l.Trace(ctx, "trace_msg1")
l.Warn(ctx, "warn_msg1")
l.Fields(map[string]interface{}{"error": "test"}).Info(ctx, "error message")
l.Fields("error", "test").Info(ctx, "error message")
l.Warn(ctx, "first", " ", "second")
if !bytes.Contains(buf.Bytes(), []byte(`"level":"trace","msg":"trace_msg1"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())

View File

@@ -16,7 +16,7 @@ type Options struct {
// Context holds exernal options
Context context.Context
// Fields holds additional metadata
Fields map[string]interface{}
Fields []interface{}
// Name holds the logger name
Name string
// CallerSkipCount number of frmaes to skip
@@ -31,7 +31,7 @@ type Options struct {
func NewOptions(opts ...Option) Options {
options := Options{
Level: DefaultLevel,
Fields: make(map[string]interface{}),
Fields: make([]interface{}, 0, 6),
Out: os.Stderr,
CallerSkipCount: DefaultCallerSkipCount,
Context: context.Background(),
@@ -43,7 +43,7 @@ func NewOptions(opts ...Option) Options {
}
// WithFields set default fields for the logger
func WithFields(fields map[string]interface{}) Option {
func WithFields(fields ...interface{}) Option {
return func(o *Options) {
o.Fields = fields
}

35
logger/stdlogger.go Normal file
View File

@@ -0,0 +1,35 @@
package logger
import (
"bytes"
"log"
)
type stdLogger struct {
l Logger
level Level
}
func NewStdLogger(l Logger, level Level) *log.Logger {
return log.New(&stdLogger{l: l, level: level}, "" /* prefix */, 0 /* flags */)
}
func (sl *stdLogger) Write(p []byte) (int, error) {
p = bytes.TrimSpace(p)
sl.l.Log(sl.l.Options().Context, sl.level, string(p))
return len(p), nil
}
func RedirectStdLogger(l Logger, level Level) func() {
flags := log.Flags()
prefix := log.Prefix()
writer := log.Writer()
log.SetFlags(0)
log.SetPrefix("")
log.SetOutput(&stdLogger{l: l, level: level})
return func() {
log.SetFlags(flags)
log.SetPrefix(prefix)
log.SetOutput(writer)
}
}

View File

@@ -20,9 +20,7 @@ type Wrapper interface {
Logf(LogfFunc) LogfFunc
}
var (
_ Logger = &OmitLogger{}
)
var _ Logger = &OmitLogger{}
type OmitLogger struct {
l Logger
@@ -40,12 +38,20 @@ func (w *OmitLogger) V(level Level) bool {
return w.l.V(level)
}
func (w *OmitLogger) Level(level Level) {
w.l.Level(level)
}
func (w *OmitLogger) Clone(opts ...Option) Logger {
return w.l.Clone(opts...)
}
func (w *OmitLogger) Options() Options {
return w.l.Options()
}
func (w *OmitLogger) Fields(fields map[string]interface{}) Logger {
return w.l.Fields(fields)
func (w *OmitLogger) Fields(fields ...interface{}) Logger {
return w.l.Fields(fields...)
}
func (w *OmitLogger) Info(ctx context.Context, args ...interface{}) {
@@ -119,23 +125,29 @@ func getArgs(args []interface{}) []interface{} {
var err error
for _, arg := range args {
val := reflect.ValueOf(arg)
switch val.Kind() {
case reflect.Ptr:
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
narg := arg
if val.Kind() == reflect.Struct {
if narg, err = rutil.Zero(arg); err == nil {
rutil.CopyDefaults(narg, arg)
if flds, ferr := rutil.StructFields(narg); ferr == nil {
for _, fld := range flds {
if tv, ok := fld.Field.Tag.Lookup("logger"); ok && tv == "omit" {
fld.Value.Set(reflect.Zero(fld.Value.Type()))
}
}
if val.Kind() != reflect.Struct {
nargs = append(nargs, narg)
continue
}
if narg, err = rutil.Zero(arg); err != nil {
nargs = append(nargs, narg)
continue
}
rutil.CopyDefaults(narg, arg)
if flds, ferr := rutil.StructFields(narg); ferr == nil {
for _, fld := range flds {
if tv, ok := fld.Field.Tag.Lookup("logger"); ok && tv == "omit" {
fld.Value.Set(reflect.Zero(fld.Value.Type()))
}
}
}
nargs = append(nargs, narg)
}
return nargs

View File

@@ -1,3 +1,12 @@
package meter
//go:generate protoc -I./handler -I../ -I/home/vtolstov/.cache/go-path/pkg/mod/github.com/unistack-org/micro-proto@v0.0.1 --micro_out=components=micro|http|server,standalone=false,debug=true,paths=source_relative:./handler handler/handler.proto
//go:generate sh -c "protoc -I./handler -I../ -I$(go list -f '{{ .Dir }}' -m github.com/unistack-org/micro-proto) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./handler handler/handler.proto"
import (
// import required packages
_ "github.com/unistack-org/micro-proto/api"
// import required packages
_ "github.com/unistack-org/micro-proto/openapiv2"
)

View File

@@ -11,17 +11,17 @@ service Meter {
rpc Metrics(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv2.openapiv2_operation) = {
operation_id: "Metrics";
responses: {
key: "default";
responses: {
response_code: {
name: "default";
value: {
description: "Error response";
schema: {
json_schema: {
ref: "micro.codec.Frame";
}
}
}
}
json_reference: {
description: "Error response";
_ref: "micro.codec.Frame";
};
};
};
};
};
option (micro.api.http) = { get: "/metrics"; };
};

View File

@@ -1,22 +1,31 @@
// Code generated by protoc-gen-micro
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.4.2
// source: handler.proto
package handler
import (
context "context"
api "github.com/unistack-org/micro/v3/api"
codec "github.com/unistack-org/micro/v3/codec"
)
func NewMeterEndpoints() []*api.Endpoint {
return []*api.Endpoint{
&api.Endpoint{
var (
MeterName = "Meter"
MeterEndpoints = []api.Endpoint{
{
Name: "Meter.Metrics",
Path: []string{"/metrics"},
Method: []string{"GET"},
Handler: "rpc",
},
}
)
func NewMeterEndpoints() []api.Endpoint {
return MeterEndpoints
}
type MeterServer interface {

View File

@@ -1,9 +1,12 @@
// Code generated by protoc-gen-micro
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.4.2
// source: handler.proto
package handler
import (
context "context"
api "github.com/unistack-org/micro/v3/api"
codec "github.com/unistack-org/micro/v3/codec"
server "github.com/unistack-org/micro/v3/server"
@@ -26,8 +29,8 @@ func RegisterMeterServer(s server.Server, sh MeterServer, opts ...server.Handler
}
h := &meterServer{sh}
var nopts []server.HandlerOption
for _, endpoint := range NewMeterEndpoints() {
nopts = append(nopts, api.WithEndpoint(endpoint))
for _, endpoint := range MeterEndpoints {
nopts = append(nopts, api.WithEndpoint(&endpoint))
}
return s.Handle(s.NewHandler(&Meter{h}, append(nopts, opts...)...))
}

View File

@@ -33,7 +33,7 @@ func TestBuildLabels(t *testing.T) {
}
data := []testData{
testData{
{
src: []string{"zerolabel", "value3", "firstlabel", "value2"},
dst: []string{"firstlabel", "value2", "zerolabel", "value3"},
},
@@ -48,15 +48,15 @@ func TestBuildLabels(t *testing.T) {
func TestBuildName(t *testing.T) {
data := map[string][]string{
`my_metric{firstlabel="value2",zerolabel="value3"}`: []string{
`my_metric{firstlabel="value2",zerolabel="value3"}`: {
"my_metric",
"zerolabel", "value3", "firstlabel", "value2",
},
`my_metric{broker="broker2",register="mdns",server="tcp"}`: []string{
`my_metric{broker="broker2",register="mdns",server="tcp"}`: {
"my_metric",
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
},
`my_metric{aaa="aaa"}`: []string{
`my_metric{aaa="aaa"}`: {
"my_metric",
"aaa", "aaa",
},

View File

@@ -1,13 +1,13 @@
package network
import (
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/network/tunnel"
"github.com/unistack-org/micro/v3/proxy"
"github.com/unistack-org/micro/v3/router"
"github.com/unistack-org/micro/v3/tracer"
"github.com/unistack-org/micro/v3/util/id"
)
// Option func
@@ -119,7 +119,7 @@ func Tracer(t tracer.Tracer) Option {
// NewOptions returns network default options
func NewOptions(opts ...Option) Options {
options := Options{
Id: uuid.New().String(),
Id: id.Must(),
Name: "go.micro",
Address: ":0",
Logger: logger.DefaultLogger,

View File

@@ -7,6 +7,7 @@ import (
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/network/tunnel"
)
@@ -81,7 +82,7 @@ func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, op
topic, _ := msg.Header.Get(metadata.HeaderTopic)
c, ok := topicMap[topic]
if !ok {
c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
c, err = t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}

View File

@@ -3,11 +3,11 @@ package tunnel
import (
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/tracer"
"github.com/unistack-org/micro/v3/util/id"
)
var (
@@ -164,7 +164,7 @@ func DialWait(b bool) DialOption {
// NewOptions returns router default options with filled values
func NewOptions(opts ...Option) Options {
options := Options{
ID: uuid.New().String(),
ID: id.Must(),
Address: DefaultAddress,
Token: DefaultToken,
Logger: logger.DefaultLogger,

View File

@@ -31,7 +31,7 @@ func (p *profiler) writeHeap(f *os.File) {
select {
case <-t.C:
runtime.GC()
pprof.WriteHeapProfile(f)
_ = pprof.WriteHeapProfile(f)
case <-p.exit:
return
}

View File

@@ -6,8 +6,8 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/util/id"
)
var (
@@ -378,13 +378,16 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi
}
func (m *memory) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
id, err := id.New()
if err != nil {
return nil, err
}
wo := NewWatchOptions(opts...)
// construct the watcher
w := &watcher{
exit: make(chan bool),
res: make(chan *Result),
id: uuid.New().String(),
id: id,
wo: wo,
}

View File

@@ -40,8 +40,9 @@ func (d *dns) Lookup(opts ...QueryOption) ([]Route, error) {
// check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000
host, port, err := net.SplitHostPort(options.Service)
if err == nil {
var ips []string
// lookup the service using A records
ips, err := net.LookupHost(host)
ips, err = net.LookupHost(host)
if err != nil {
return nil, err
}
@@ -53,7 +54,7 @@ func (d *dns) Lookup(opts ...QueryOption) ([]Route, error) {
for i, ip := range ips {
result[i] = Route{
Service: options.Service,
Address: fmt.Sprintf("%s:%d", ip, uint16(p)),
Address: fmt.Sprintf("%s:%d", ip, p),
}
}
return result, nil

View File

@@ -3,9 +3,9 @@ package router
import (
"context"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/util/id"
)
// Options are router options
@@ -80,7 +80,7 @@ func Name(n string) Option {
// NewOptions returns router default options
func NewOptions(opts ...Option) Options {
options := Options{
Id: uuid.New().String(),
Id: id.Must(),
Network: DefaultNetwork,
Register: register.DefaultRegister,
Logger: logger.DefaultLogger,

View File

@@ -1,3 +1,12 @@
package server
//go:generate protoc -I./health -I../ -I/home/vtolstov/.cache/go-path/pkg/mod/github.com/unistack-org/micro-proto@v0.0.1 --micro_out=components=micro|http|server,standalone=false,debug=true,paths=source_relative:./health health/health.proto
//go:generate sh -c "protoc -I./health -I../ -I$(go list -f '{{ .Dir }}' -m github.com/unistack-org/micro-proto) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./health health/health.proto"
import (
// import required packages
_ "github.com/unistack-org/micro-proto/api"
// import required packages
_ "github.com/unistack-org/micro-proto/openapiv2"
)

View File

@@ -11,51 +11,51 @@ service Health {
rpc Live(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv2.openapiv2_operation) = {
operation_id: "Live";
responses: {
key: "default";
responses: {
response_code: {
name: "default";
value: {
description: "Error response";
schema: {
json_schema: {
ref: "micro.codec.Frame";
}
}
}
}
json_reference: {
description: "Error response";
_ref: "micro.codec.Frame";
};
};
};
};
};
option (micro.api.http) = { get: "/live"; };
};
rpc Ready(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv2.openapiv2_operation) = {
operation_id: "Ready";
responses: {
key: "default";
responses: {
response_code: {
name: "default";
value: {
description: "Error response";
schema: {
json_schema: {
ref: "micro.codec.Frame";
}
}
}
}
json_reference: {
description: "Error response";
_ref: "micro.codec.Frame";
};
};
};
};
};
option (micro.api.http) = { get: "/ready"; };
};
rpc Version(micro.codec.Frame) returns (micro.codec.Frame) {
option (micro.openapiv2.openapiv2_operation) = {
operation_id: "Version";
responses: {
key: "default";
responses: {
response_code: {
name: "default";
value: {
description: "Error response";
schema: {
json_schema: {
ref: "micro.codec.Frame";
}
}
}
}
json_reference: {
description: "Error response";
_ref: "micro.codec.Frame";
};
};
};
};
};
option (micro.api.http) = { get: "/version"; };
};

View File

@@ -1,34 +1,43 @@
// Code generated by protoc-gen-micro
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.4.2
// source: health.proto
package health
import (
context "context"
api "github.com/unistack-org/micro/v3/api"
codec "github.com/unistack-org/micro/v3/codec"
)
func NewHealthEndpoints() []*api.Endpoint {
return []*api.Endpoint{
&api.Endpoint{
var (
HealthName = "Health"
HealthEndpoints = []api.Endpoint{
{
Name: "Health.Live",
Path: []string{"/live"},
Method: []string{"GET"},
Handler: "rpc",
},
&api.Endpoint{
{
Name: "Health.Ready",
Path: []string{"/ready"},
Method: []string{"GET"},
Handler: "rpc",
},
&api.Endpoint{
{
Name: "Health.Version",
Path: []string{"/version"},
Method: []string{"GET"},
Handler: "rpc",
},
}
)
func NewHealthEndpoints() []api.Endpoint {
return HealthEndpoints
}
type HealthServer interface {

View File

@@ -1,9 +1,12 @@
// Code generated by protoc-gen-micro
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v3.4.2
// source: health.proto
package health
import (
context "context"
api "github.com/unistack-org/micro/v3/api"
codec "github.com/unistack-org/micro/v3/codec"
server "github.com/unistack-org/micro/v3/server"
@@ -36,8 +39,8 @@ func RegisterHealthServer(s server.Server, sh HealthServer, opts ...server.Handl
}
h := &healthServer{sh}
var nopts []server.HandlerOption
for _, endpoint := range NewHealthEndpoints() {
nopts = append(nopts, api.WithEndpoint(endpoint))
for _, endpoint := range HealthEndpoints {
nopts = append(nopts, api.WithEndpoint(&endpoint))
}
return s.Handle(s.NewHandler(&Health{h}, append(nopts, opts...)...))
}

View File

@@ -190,7 +190,7 @@ func (n *noopServer) Register() error {
}
// register the service
if err := DefaultRegisterFunc(service, config); err != nil {
if err = DefaultRegisterFunc(service, config); err != nil {
return err
}

View File

@@ -20,12 +20,8 @@ type TestMessage struct {
Name string
}
var (
numMsg int = 8
)
func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error {
//fmt.Printf("msg %s\n", msg.Data)
// fmt.Printf("msg %s\n", msg.Data)
return nil
}

View File

@@ -5,10 +5,10 @@ import (
"context"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/util/id"
)
// DefaultServer default server
@@ -22,7 +22,7 @@ var (
// DefaultVersion will be used if no version passed
DefaultVersion = "latest"
// DefaultID will be used if no id passed
DefaultID = uuid.New().String()
DefaultID = id.Must()
// DefaultRegisterCheck holds func that run before register server
DefaultRegisterCheck = func(context.Context) error { return nil }
// DefaultRegisterInterval holds interval for register

View File

@@ -11,6 +11,7 @@ import (
"unicode/utf8"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
@@ -72,7 +73,7 @@ func ValidateSubscriber(sub Subscriber) error {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%s", argType), "[]interface{}") == 0 {
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
}
@@ -244,9 +245,9 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
}
reqType := handler.reqType
var cf codec.Codec
for _, msg := range msgs {
cf, err := n.newCodec(msg.ContentType())
cf, err = n.newCodec(msg.ContentType())
if err != nil {
return err
}

View File

@@ -32,8 +32,8 @@ type Options struct {
Namespace string
// Addrs contains store address
Addrs []string
//Wrappers store wrapper that called before actual functions
//Wrappers []Wrapper
// Wrappers store wrapper that called before actual functions
// Wrappers []Wrapper
}
// NewOptions creates options struct

View File

@@ -5,21 +5,19 @@ import (
)
// LogfFunc function used for Logf method
//type LogfFunc func(ctx context.Context, level Level, msg string, args ...interface{})
//type Wrapper interface {
// Logf logs message with needed level
//Logf(LogfFunc) LogfFunc
//}
// type LogfFunc func(ctx context.Context, level Level, msg string, args ...interface{})
// type Wrapper interface {
// Logf logs message with needed level
// Logf(LogfFunc) LogfFunc
// }
// NamespaceStore wrap store with namespace
type NamespaceStore struct {
s Store
ns string
}
var (
_ Store = &NamespaceStore{}
)
var _ Store = &NamespaceStore{}
func NewNamespaceStore(s Store, ns string) Store {
return &NamespaceStore{s: s, ns: ns}
@@ -69,7 +67,7 @@ func (w *NamespaceStore) String() string {
return w.s.String()
}
//type NamespaceWrapper struct{}
// type NamespaceWrapper struct{}
//func NewNamespaceWrapper() Wrapper {
// return &NamespaceWrapper{}

View File

@@ -4,9 +4,9 @@ import (
"context"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/auth"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/util/id"
)
// Verify the auth credentials and refresh the auth token periodically
@@ -22,7 +22,11 @@ func Verify(a auth.Auth) error {
auth.WithScopes("service"),
}
acc, err := a.Generate(uuid.New().String(), opts...)
id, err := id.New()
if err != nil {
return err
}
acc, err := a.Generate(id, opts...)
if err != nil {
return err
}

View File

@@ -3,6 +3,7 @@
package http
import (
"io"
"net"
"net/http"
"testing"
@@ -80,5 +81,4 @@ func TestRoundTripper(t *testing.T) {
if string(b) != "hello world" {
t.Fatal("response is", string(b))
}
}

201
util/http/trie.go Normal file
View File

@@ -0,0 +1,201 @@
package http
import (
"regexp"
"strings"
"sync"
)
// Tree is a trie tree.
type Trie struct {
node *node
rcache map[string]*regexp.Regexp
rmu sync.RWMutex
}
// node is a node of tree
type node struct {
actions map[string]interface{} // key is method, val is handler interface
children map[string]*node // key is label of next nodes
label string
}
const (
pathRoot string = "/"
pathDelimiter string = "/"
paramDelimiter string = ":"
leftPtnDelimiter string = "{"
rightPtnDelimiter string = "}"
ptnWildcard string = "(.+)"
)
// NewTree creates a new trie tree.
func NewTrie() *Trie {
return &Trie{
node: &node{
label: pathRoot,
actions: make(map[string]interface{}),
children: make(map[string]*node),
},
rcache: make(map[string]*regexp.Regexp),
}
}
// Insert inserts a route definition to tree.
func (t *Trie) Insert(methods []string, path string, handler interface{}) {
curNode := t.node
if path == pathRoot {
curNode.label = path
for _, method := range methods {
curNode.actions[method] = handler
}
return
}
ep := splitPath(path)
for i, p := range ep {
nextNode, ok := curNode.children[p]
if ok {
curNode = nextNode
}
// Create a new node.
if !ok {
curNode.children[p] = &node{
label: p,
actions: make(map[string]interface{}),
children: make(map[string]*node),
}
curNode = curNode.children[p]
}
// last loop.
// If there is already registered data, overwrite it.
if i == len(ep)-1 {
curNode.label = p
for _, method := range methods {
curNode.actions[method] = handler
}
break
}
}
}
// Search searches a path from a tree.
func (t *Trie) Search(method string, path string) (interface{}, map[string]string, bool) {
params := make(map[string]string)
curNode := t.node
for _, p := range splitPath(path) {
nextNode, ok := curNode.children[p]
if ok {
curNode = nextNode
continue
}
if len(curNode.children) == 0 {
if curNode.label != p {
// no matching path was found.
return nil, nil, false
}
break
}
isParamMatch := false
for c := range curNode.children {
if string([]rune(c)[0]) == leftPtnDelimiter {
ptn := getPattern(c)
t.rmu.RLock()
reg, ok := t.rcache[ptn]
t.rmu.RUnlock()
if !ok {
var err error
reg, err = regexp.Compile(ptn)
if err != nil {
return nil, nil, false
}
t.rmu.Lock()
t.rcache[ptn] = reg
t.rmu.Unlock()
}
if reg.Match([]byte(p)) {
pn := getParamName(c)
params[pn] = p
curNode = curNode.children[c]
isParamMatch = true
break
}
// no matching param was found.
return nil, nil, false
}
}
if !isParamMatch {
return nil, nil, false
}
}
if path == pathRoot {
if len(curNode.actions) == 0 {
return nil, nil, false
}
}
handler, ok := curNode.actions[method]
if !ok || handler == nil {
return nil, nil, false
}
return handler, params, true
}
// getPattern gets a pattern from a label
// {id:[^\d+$]} -> ^\d+$
// {id} -> (.+)
func getPattern(label string) string {
leftI := strings.Index(label, leftPtnDelimiter)
rightI := strings.Index(label, paramDelimiter)
// if label doesn't have any pattern, return wild card pattern as default.
if leftI == -1 || rightI == -1 {
return ptnWildcard
}
return label[rightI+1 : len(label)-1]
}
// getParamName gets a parameter from a label
// {id:[^\d+$]} -> id
// {id} -> id
func getParamName(label string) string {
leftI := strings.Index(label, leftPtnDelimiter)
rightI := func(l string) int {
r := []rune(l)
var n int
loop:
for i := 0; i < len(r); i++ {
n = i
switch string(r[i]) {
case paramDelimiter:
n = i
break loop
case rightPtnDelimiter:
n = i
break loop
}
if i == len(r)-1 {
n = i + 1
break loop
}
}
return n
}(label)
return label[leftI+1 : rightI]
}
// splitPath removes an empty value in slice.
func splitPath(path string) []string {
s := strings.Split(path, pathDelimiter)
var r []string
for _, str := range s {
if str != "" {
r = append(r, str)
}
}
return r
}

81
util/http/trie_test.go Normal file
View File

@@ -0,0 +1,81 @@
package http
import (
"net/http"
"testing"
)
func TestTrieContentType(t *testing.T) {
type handler struct {
name string
}
tr := NewTrie()
tr.Insert([]string{"application/json"}, "/v1/create/{id}", &handler{name: "test"})
h, _, ok := tr.Search("application/json", "/v1/create/12")
if !ok {
t.Fatalf("must be found error")
}
if h.(*handler).name != "test" {
t.Fatalf("invalid handler %v", h)
}
_, _, ok = tr.Search("text/xml", "/v1/create/12")
if ok {
t.Fatalf("must be not found error")
}
}
func TestTrieNoMatchMethod(t *testing.T) {
tr := NewTrie()
tr.Insert([]string{http.MethodPut}, "/v1/create/{id}", nil)
_, _, ok := tr.Search(http.MethodPost, "/v1/create")
if ok {
t.Fatalf("must be not found error")
}
}
func TestTrieMatchRegexp(t *testing.T) {
type handler struct{}
tr := NewTrie()
tr.Insert([]string{http.MethodPut}, "/v1/create/{category}/{id:[0-9]+}", &handler{})
_, params, ok := tr.Search(http.MethodPut, "/v1/create/test_cat/12345")
if !ok {
t.Fatalf("route not found")
} else if len(params) != 2 {
t.Fatalf("param matching error %v", params)
} else if params["category"] != "test_cat" {
t.Fatalf("param matching error %v", params)
}
}
func TestTrieMatchRegexpFail(t *testing.T) {
type handler struct{}
tr := NewTrie()
tr.Insert([]string{http.MethodPut}, "/v1/create/{id:[a-z]+}", &handler{})
_, _, ok := tr.Search(http.MethodPut, "/v1/create/12345")
if ok {
t.Fatalf("route must not be not found")
}
}
func TestTrieMatchLongest(t *testing.T) {
type handler struct {
name string
}
tr := NewTrie()
tr.Insert([]string{http.MethodPut}, "/v1/create", &handler{name: "first"})
tr.Insert([]string{http.MethodPut}, "/v1/create/{id:[0-9]+}", &handler{name: "second"})
if h, _, ok := tr.Search(http.MethodPut, "/v1/create/12345"); !ok {
t.Fatalf("route must be found")
} else if h.(*handler).name != "second" {
t.Fatalf("invalid handler found: %s != %s", h.(*handler).name, "second")
}
if h, _, ok := tr.Search(http.MethodPut, "/v1/create"); !ok {
t.Fatalf("route must be found")
} else if h.(*handler).name != "first" {
t.Fatalf("invalid handler found: %s != %s", h.(*handler).name, "first")
}
}

22
util/id/LICENSE Normal file
View File

@@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2018-2021 Matous Dzivjak <matousdzivjak@gmail.com>
Copyright (c) 2021 Unistack LLC <v.tolstov@unistack.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

112
util/id/id.go Normal file
View File

@@ -0,0 +1,112 @@
package id
import (
"context"
"crypto/rand"
"errors"
"math"
"github.com/unistack-org/micro/v3/logger"
)
// DefaultAlphabet is the alphabet used for ID characters by default
var DefaultAlphabet = []rune("6789BCDFGHJKLMNPQRTWbcdfghjkmnpqrtwz")
// DefaultSize is the size used for ID by default
// To get uuid like collision specify 21
var DefaultSize = 16
// getMask generates bit mask used to obtain bits from the random bytes that are used to get index of random character
// from the alphabet. Example: if the alphabet has 6 = (110)_2 characters it is sufficient to use mask 7 = (111)_2
func getMask(alphabetSize int) int {
for i := 1; i <= 8; i++ {
mask := (2 << uint(i)) - 1
if mask >= alphabetSize-1 {
return mask
}
}
return 0
}
// New returns new id or error
func New(opts ...Option) (string, error) {
options := NewOptions(opts...)
if len(options.Alphabet) == 0 || len(options.Alphabet) > 255 {
return "", errors.New("alphabet must not be empty and contain no more than 255 chars")
}
if options.Size <= 0 {
return "", errors.New("size must be positive integer")
}
chars := options.Alphabet
mask := getMask(len(chars))
// estimate how many random bytes we will need for the ID, we might actually need more but this is tradeoff
// between average case and worst case
ceilArg := 1.6 * float64(mask*options.Size) / float64(len(options.Alphabet))
step := int(math.Ceil(ceilArg))
id := make([]rune, options.Size)
bytes := make([]byte, step)
for j := 0; ; {
_, err := rand.Read(bytes)
if err != nil {
return "", err
}
for i := 0; i < step; i++ {
currByte := bytes[i] & byte(mask)
if currByte < byte(len(chars)) {
id[j] = chars[currByte]
j++
if j == options.Size {
return string(id[:options.Size]), nil
}
}
}
}
}
// Must is the same as New but fatals on error
func Must(opts ...Option) string {
id, err := New(opts...)
if err != nil {
logger.Fatal(context.TODO(), err)
}
return id
}
// Options contains id deneration options
type Options struct {
Alphabet []rune
Size int
}
// Option func signature
type Option func(*Options)
// Alphabet specifies alphabet to use
func Alphabet(alphabet string) Option {
return func(o *Options) {
o.Alphabet = []rune(alphabet)
}
}
// Size specifies id size
func Size(size int) Option {
return func(o *Options) {
o.Size = size
}
}
// NewOptions returns new Options struct filled by opts
func NewOptions(opts ...Option) Options {
options := Options{
Alphabet: DefaultAlphabet,
Size: DefaultSize,
}
for _, o := range opts {
o(&options)
}
return options
}

View File

@@ -7,8 +7,8 @@ import (
"github.com/unistack-org/micro/v3/util/rand"
)
// Do returns a random time to jitter with max cap specified
func Do(d time.Duration) time.Duration {
// Random returns a random time to jitter with max cap specified
func Random(d time.Duration) time.Duration {
var rng rand.Rand
v := rng.Float64() * float64(d.Nanoseconds())
return time.Duration(v)

65
util/jitter/ticker.go Normal file
View File

@@ -0,0 +1,65 @@
package jitter
import (
"time"
"github.com/unistack-org/micro/v3/util/rand"
)
// Ticker is similar to time.Ticker but ticks at random intervals between
// the min and max duration values (stored internally as int64 nanosecond
// counts).
type Ticker struct {
C chan time.Time
done chan chan struct{}
min int64
max int64
rng rand.Rand
}
// NewTicker returns a pointer to an initialized instance of the Ticker.
// Min and max are durations of the shortest and longest allowed
// ticks. Ticker will run in a goroutine until explicitly stopped.
func NewTicker(min, max time.Duration) *Ticker {
ticker := &Ticker{
C: make(chan time.Time),
done: make(chan chan struct{}),
min: min.Nanoseconds(),
max: max.Nanoseconds(),
}
go ticker.run()
return ticker
}
// Stop terminates the ticker goroutine and closes the C channel.
func (ticker *Ticker) Stop() {
c := make(chan struct{})
ticker.done <- c
<-c
}
func (ticker *Ticker) run() {
defer close(ticker.C)
t := time.NewTimer(ticker.nextInterval())
for {
// either a stop signal or a timeout
select {
case c := <-ticker.done:
t.Stop()
close(c)
return
case <-t.C:
select {
case ticker.C <- time.Now():
t.Stop()
t = time.NewTimer(ticker.nextInterval())
default:
// there could be noone receiving...
}
}
}
}
func (ticker *Ticker) nextInterval() time.Duration {
return time.Duration(ticker.rng.Int63n(ticker.max-ticker.min)+ticker.min) * time.Nanosecond
}

View File

@@ -5,8 +5,8 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/util/id"
)
type pool struct {
@@ -87,9 +87,13 @@ func (p *pool) Get(ctx context.Context, addr string, opts ...transport.DialOptio
if err != nil {
return nil, err
}
id, err := id.New()
if err != nil {
return nil, err
}
return &poolConn{
Client: c,
id: uuid.New().String(),
id: id,
created: time.Now(),
}, nil
}

View File

@@ -1,17 +1,17 @@
package rand
import (
"crypto/rand"
crand "crypto/rand"
"encoding/binary"
)
// Rand is a wrapper around crypto/rand that adds some convenience functions known from math/rand.
// Rand is a wrapper around crypto/rand that adds some convenience functions known from math/rand
type Rand struct {
buf [8]byte
}
func (r *Rand) Int31() int32 {
_, _ = rand.Read(r.buf[:4])
_, _ = crand.Read(r.buf[:4])
return int32(binary.BigEndian.Uint32(r.buf[:4]) & ^uint32(1<<31))
}
@@ -54,11 +54,11 @@ func (r *Rand) Intn(n int) int {
}
func (r *Rand) Int63() int64 {
_, _ = rand.Read(r.buf[:])
_, _ = crand.Read(r.buf[:])
return int64(binary.BigEndian.Uint64(r.buf[:]) & ^uint64(1<<63))
}
// copied from the standard library math/rand implementation of Int63n
// Int31n copied from the standard library math/rand implementation of Int31n
func (r *Rand) Int31n(n int32) int32 {
if n&(n-1) == 0 { // n is power of two, can mask
return r.Int31() & (n - 1)
@@ -71,6 +71,7 @@ func (r *Rand) Int31n(n int32) int32 {
return v % n
}
// Int63n copied from the standard library math/rand implementation of Int63n
func (r *Rand) Int63n(n int64) int64 {
if n&(n-1) == 0 { // n is power of two, can mask
return r.Int63() & (n - 1)
@@ -82,3 +83,26 @@ func (r *Rand) Int63n(n int64) int64 {
}
return v % n
}
// Shuffle copied from the standard library math/rand implementation of Shuffle
func (r *Rand) Shuffle(n int, swap func(i, j int)) {
if n < 0 {
panic("invalid argument to Shuffle")
}
// Fisher-Yates shuffle: https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
// Shuffle really ought not be called with n that doesn't fit in 32 bits.
// Not only will it take a very long time, but with 2³¹! possible permutations,
// there's no way that any PRNG can have a big enough internal state to
// generate even a minuscule percentage of the possible permutations.
// Nevertheless, the right API signature accepts an int n, so handle it as best we can.
i := n - 1
for ; i > 1<<31-1-1; i-- {
j := int(r.Int63n(int64(i + 1)))
swap(i, j)
}
for ; i > 0; i-- {
j := int(r.Int31n(int32(i + 1)))
swap(i, j)
}
}

View File

@@ -12,10 +12,9 @@ import (
// ErrInvalidParam specifies invalid url query params
var ErrInvalidParam = errors.New("invalid url query param provided")
// var timeKind = reflect.ValueOf(time.Time{}).Kind()
var bracketSplitter = regexp.MustCompile(`\[|\]`)
//var timeKind = reflect.ValueOf(time.Time{}).Kind()
type StructField struct {
Field reflect.StructField
Value reflect.Value
@@ -149,9 +148,9 @@ func StructFields(src interface{}) ([]StructField, error) {
}
switch val.Kind() {
//case timeKind:
//fmt.Printf("GGG\n")
//fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
// case timeKind:
// fmt.Printf("GGG\n")
// fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
case reflect.Struct:
infields, err := StructFields(val.Interface())
if err != nil {

View File

@@ -2,7 +2,7 @@ package reflect
import (
"net/url"
"reflect"
rfl "reflect"
rfl "reflect"
"testing"
)

View File

@@ -5,7 +5,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/util/id"
)
// Buffer is ring buffer
@@ -112,7 +112,7 @@ func (b *Buffer) Stream() (<-chan *Entry, chan bool) {
defer b.Unlock()
entries := make(chan *Entry, 128)
id := uuid.New().String()
id := id.Must()
stop := make(chan bool)
b.streams[id] = &Stream{

View File

@@ -6,9 +6,9 @@ import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/auth"
"github.com/unistack-org/micro/v3/store"
"github.com/unistack-org/micro/v3/util/id"
"github.com/unistack-org/micro/v3/util/token"
)
@@ -44,7 +44,11 @@ func (b *Basic) Generate(acc *auth.Account, opts ...token.GenerateOption) (*toke
}
// write to the store
key := uuid.New().String()
key, err := id.New()
if err != nil {
return nil, err
}
err = b.store.Write(context.Background(), fmt.Sprintf("%v%v", StorePrefix, key), bytes, store.WriteTTL(options.Expiry))
if err != nil {
return nil, err

View File

@@ -4,7 +4,7 @@ import (
"encoding/base64"
"time"
"github.com/golang-jwt/jwt"
"github.com/golang-jwt/jwt/v4"
"github.com/unistack-org/micro/v3/auth"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/util/token"
@@ -13,7 +13,7 @@ import (
// authClaims to be encoded in the JWT
type authClaims struct {
Metadata metadata.Metadata `json:"metadata"`
jwt.StandardClaims
jwt.RegisteredClaims
Type string `json:"type"`
Scopes []string `json:"scopes"`
}
@@ -50,10 +50,10 @@ func (j *JWT) Generate(acc *auth.Account, opts ...token.GenerateOption) (*token.
// generate the JWT
expiry := time.Now().Add(options.Expiry)
t := jwt.NewWithClaims(jwt.SigningMethodRS256, authClaims{
Type: acc.Type, Scopes: acc.Scopes, Metadata: acc.Metadata, StandardClaims: jwt.StandardClaims{
Type: acc.Type, Scopes: acc.Scopes, Metadata: acc.Metadata, RegisteredClaims: jwt.RegisteredClaims{
Subject: acc.ID,
Issuer: acc.Issuer,
ExpiresAt: expiry.Unix(),
ExpiresAt: jwt.NewNumericDate(expiry),
},
})
tok, err := t.SignedString(key)