Compare commits

..

No commits in common. "v3" and "upstream" have entirely different histories.
v3 ... upstream

21 changed files with 228 additions and 1122 deletions

View File

@ -1,24 +0,0 @@
---
name: Bug report
about: For reporting bugs in go-micro
title: "[BUG]"
labels: ''
assignees: ''
---
**Describe the bug**
1. What are you trying to do?
2. What did you expect to happen?
3. What happens instead?
**How to reproduce the bug:**
If possible, please include a minimal code snippet here.
**Environment:**
Go Version: please paste `go version` output here
```
please paste `go env` output here
```

View File

@ -1,17 +0,0 @@
---
name: Feature request / Enhancement
about: If you have a need not served by go-micro
title: "[FEATURE]"
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Additional context**
Add any other context or screenshots about the feature request here.

View File

@ -1,14 +0,0 @@
---
name: Question
about: Ask a question about go-micro
title: ''
labels: ''
assignees: ''
---
Before asking, please check if your question has already been answered:
1. Check the documentation - https://micro.mu/docs/
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
3. Search existing issues

View File

@ -1,9 +0,0 @@
## Pull Request template
Please, go through these steps before clicking submit on this PR.
1. Give a descriptive title to your PR.
2. Provide a description of your changes.
3. Make sure you have some relevant tests.
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**

View File

@ -1,19 +0,0 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

View File

@ -1,20 +0,0 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,21 +0,0 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -1,47 +0,0 @@
name: build
on:
push:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

View File

@ -1,78 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@ -1,27 +0,0 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -1,47 +0,0 @@
name: prbuild
on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

View File

@ -1,44 +0,0 @@
run:
concurrency: 4
deadline: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@ -1 +0,0 @@
0a6e451539fd046bbfea625b05ef2e3006c69214

191
LICENSE
View File

@ -1,191 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
Copyright 2020 Unistack LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

5
go.mod
View File

@ -1,5 +0,0 @@
module go.unistack.org/micro-router-register/v3
go 1.16
require go.unistack.org/micro/v3 v3.10.14

8
go.sum
View File

@ -1,8 +0,0 @@
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
go.unistack.org/micro/v3 v3.10.14 h1:7fgLpwGlCN67twhwtngJDEQvrMkUBDSA5vzZqxIDqNE=
go.unistack.org/micro/v3 v3.10.14/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,13 +1,14 @@
package register // import "go.unistack.org/micro-router-register/v3" package registry
import ( import (
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v3/logger" "github.com/micro/go-micro/v3/logger"
"go.unistack.org/micro/v3/register" "github.com/micro/go-micro/v3/registry"
"go.unistack.org/micro/v3/router" "github.com/micro/go-micro/v3/router"
) )
var ( var (
@ -20,26 +21,33 @@ var (
// rtr implements router interface // rtr implements router interface
type rtr struct { type rtr struct {
sync.RWMutex sync.RWMutex
running bool
table *table table *table
options router.Options
exit chan bool exit chan bool
initChan chan bool initChan chan bool
opts router.Options
running bool
} }
// NewRouter creates new router and returns it // NewRouter creates new router and returns it
func NewRouter(opts ...router.Option) router.Router { func NewRouter(opts ...router.Option) router.Router {
options := router.NewOptions(opts...) // get default options
options := router.DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
// construct the router // construct the router
r := &rtr{ r := &rtr{
opts: options, options: options,
initChan: make(chan bool), initChan: make(chan bool),
} }
// create the new table, passing the fetchRoute method in as a fallback if // create the new table, passing the fetchRoute method in as a fallback if
// the table doesn't contain the result for a query. // the table doesn't contain the result for a query.
r.table = newTable(r.lookup) r.table = newTable()
// start the router // start the router
r.start() r.start()
@ -50,16 +58,12 @@ func NewRouter(opts ...router.Option) router.Router {
func (r *rtr) Init(opts ...router.Option) error { func (r *rtr) Init(opts ...router.Option) error {
r.Lock() r.Lock()
for _, o := range opts { for _, o := range opts {
o(&r.opts) o(&r.options)
} }
r.Unlock() r.Unlock()
if r.opts.Register == nil {
return fmt.Errorf("register not set")
}
// push a message to the init chan so the watchers // push a message to the init chan so the watchers
// can reset in the case the register was changed // can reset in the case the registry was changed
go func() { go func() {
r.initChan <- true r.initChan <- true
}() }()
@ -72,7 +76,7 @@ func (r *rtr) Options() router.Options {
r.RLock() r.RLock()
defer r.RUnlock() defer r.RUnlock()
options := r.opts options := r.options
return options return options
} }
@ -84,9 +88,9 @@ func (r *rtr) Table() router.Table {
return r.table return r.table
} }
func getDomain(srv *register.Service) string { func getDomain(srv *registry.Service) string {
// check the service metadata for domain // check the service metadata for domain
// TODO: domain as Domain field in register? // TODO: domain as Domain field in registry?
if srv.Metadata != nil && len(srv.Metadata["domain"]) > 0 { if srv.Metadata != nil && len(srv.Metadata["domain"]) > 0 {
return srv.Metadata["domain"] return srv.Metadata["domain"]
} else if len(srv.Nodes) > 0 && srv.Nodes[0].Metadata != nil { } else if len(srv.Nodes) > 0 && srv.Nodes[0].Metadata != nil {
@ -95,7 +99,7 @@ func getDomain(srv *register.Service) string {
// otherwise return wildcard // otherwise return wildcard
// TODO: return GlobalDomain or PublicDomain // TODO: return GlobalDomain or PublicDomain
return register.DefaultDomain return registry.DefaultDomain
} }
// manageRoute applies action on a given route // manageRoute applies action on a given route
@ -121,8 +125,8 @@ func (r *rtr) manageRoute(route router.Route, action string) error {
} }
// createRoutes turns a service into a list routes basically converting nodes to routes // createRoutes turns a service into a list routes basically converting nodes to routes
func (r *rtr) createRoutes(service *register.Service, network string) []router.Route { func (r *rtr) createRoutes(service *registry.Service, network string) []router.Route {
routes := make([]router.Route, 0, len(service.Nodes)) var routes []router.Route
for _, node := range service.Nodes { for _, node := range service.Nodes {
routes = append(routes, router.Route{ routes = append(routes, router.Route{
@ -130,9 +134,9 @@ func (r *rtr) createRoutes(service *register.Service, network string) []router.R
Address: node.Address, Address: node.Address,
Gateway: "", Gateway: "",
Network: network, Network: network,
Router: r.opts.ID, Router: r.options.Id,
Link: router.DefaultLink, Link: router.DefaultLink,
Metric: router.DefaultLocalMetric, Metric: router.DefaultMetric,
Metadata: node.Metadata, Metadata: node.Metadata,
}) })
} }
@ -142,7 +146,10 @@ func (r *rtr) createRoutes(service *register.Service, network string) []router.R
// manageServiceRoutes applies action to all routes of the service. // manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error. // It returns error of the action fails with error.
func (r *rtr) manageRoutes(service *register.Service, action, network string) error { func (r *rtr) manageRoutes(service *registry.Service, action, network string) error {
// action is the routing table action
action = strings.ToLower(action)
// create a set of routes from the service // create a set of routes from the service
routes := r.createRoutes(service, network) routes := r.createRoutes(service, network)
@ -157,9 +164,7 @@ func (r *rtr) manageRoutes(service *register.Service, action, network string) er
// create the routes in the table // create the routes in the table
for _, route := range routes { for _, route := range routes {
if r.opts.Logger.V(logger.TraceLevel) { logger.Tracef("Creating route %v domain: %v", route, network)
r.opts.Logger.Tracef(r.opts.Context, "Creating route %v domain: %v", route, network)
}
if err := r.manageRoute(route, action); err != nil { if err := r.manageRoute(route, action); err != nil {
return err return err
} }
@ -168,10 +173,10 @@ func (r *rtr) manageRoutes(service *register.Service, action, network string) er
return nil return nil
} }
// manageRegisterRoutes applies action to all routes of each service found in the register. // manageRegistryRoutes applies action to all routes of each service found in the registry.
// It returns error if either the services failed to be listed or the routing table action fails. // It returns error if either the services failed to be listed or the routing table action fails.
func (r *rtr) loadRoutes(reg register.Register) error { func (r *rtr) loadRoutes(reg registry.Registry) error {
services, err := reg.ListServices(r.opts.Context, register.ListDomain(register.WildcardDomain)) services, err := reg.ListServices(registry.ListDomain(registry.WildcardDomain))
if err != nil { if err != nil {
return fmt.Errorf("failed listing services: %v", err) return fmt.Errorf("failed listing services: %v", err)
} }
@ -186,9 +191,7 @@ func (r *rtr) loadRoutes(reg register.Register) error {
// if the routes exist save them // if the routes exist save them
if len(routes) > 0 { if len(routes) > 0 {
if r.opts.Logger.V(logger.TraceLevel) { logger.Tracef("Creating routes for service %v domain: %v", service, domain)
r.opts.Logger.Tracef(r.opts.Context, "Creating routes for service %v domain: %v", service, domain)
}
for _, rt := range routes { for _, rt := range routes {
err := r.table.Create(rt) err := r.table.Create(rt)
@ -198,9 +201,7 @@ func (r *rtr) loadRoutes(reg register.Register) error {
} }
if err != nil { if err != nil {
if r.opts.Logger.V(logger.ErrorLevel) { logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
r.opts.Logger.Errorf(r.opts.Context, "Error creating route for service %v in domain %v: %v", service, domain, err)
}
} }
} }
continue continue
@ -209,11 +210,9 @@ func (r *rtr) loadRoutes(reg register.Register) error {
// otherwise get all the service info // otherwise get all the service info
// get the service to retrieve all its info // get the service to retrieve all its info
srvs, err := reg.LookupService(r.opts.Context, service.Name, register.LookupDomain(domain)) srvs, err := reg.GetService(service.Name, registry.GetDomain(domain))
if err != nil { if err != nil {
if r.opts.Logger.V(logger.TraceLevel) { logger.Tracef("Failed to get service %s domain: %s", service.Name, domain)
r.opts.Logger.Tracef(r.opts.Context, "Failed to get service %s domain: %s", service.Name, domain)
}
continue continue
} }
@ -222,9 +221,7 @@ func (r *rtr) loadRoutes(reg register.Register) error {
routes := r.createRoutes(srv, domain) routes := r.createRoutes(srv, domain)
if len(routes) > 0 { if len(routes) > 0 {
if r.opts.Logger.V(logger.TraceLevel) { logger.Tracef("Creating routes for service %v domain: %v", srv, domain)
r.opts.Logger.Tracef(r.opts.Context, "Creating routes for service %v domain: %v", srv, domain)
}
for _, rt := range routes { for _, rt := range routes {
err := r.table.Create(rt) err := r.table.Create(rt)
@ -234,9 +231,7 @@ func (r *rtr) loadRoutes(reg register.Register) error {
} }
if err != nil { if err != nil {
if r.opts.Logger.V(logger.ErrorLevel) { logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
r.opts.Logger.Errorf(r.opts.Context, "Error creating route for service %v in domain %v: %v", service, domain, err)
}
} }
} }
} }
@ -246,27 +241,52 @@ func (r *rtr) loadRoutes(reg register.Register) error {
return nil return nil
} }
// lookup retrieves all the routes for a given service and creates them in the routing table // Close the router
func (r *rtr) lookup(service string) ([]router.Route, error) { func (r *rtr) Close() error {
if r.opts.Logger.V(logger.TraceLevel) { r.Lock()
r.opts.Logger.Tracef(r.opts.Context, "Fetching route for %s domain: %v", service, register.WildcardDomain) defer r.Unlock()
select {
case <-r.exit:
return nil
default:
if !r.running {
return nil
}
close(r.exit)
} }
services, err := r.opts.Register.LookupService(r.opts.Context, service, register.LookupDomain(register.WildcardDomain)) r.running = false
if err == register.ErrNotFound { return nil
if r.opts.Logger.V(logger.TraceLevel) { }
r.opts.Logger.Tracef(r.opts.Context, "Failed to find route for %s", service)
// lookup retrieves all the routes for a given service and creates them in the routing table
func (r *rtr) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
q := router.NewLookup(opts...)
// if we find the routes filter and return them
routes, err := r.table.Read(router.ReadService(service))
if err == nil {
routes = router.Filter(routes, q)
if len(routes) == 0 {
return nil, router.ErrRouteNotFound
} }
return routes, nil
}
// lookup the route
logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain)
services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
if err == registry.ErrNotFound {
logger.Tracef("Failed to find route for %s", service)
return nil, router.ErrRouteNotFound return nil, router.ErrRouteNotFound
} else if err != nil { } else if err != nil {
if r.opts.Logger.V(logger.TraceLevel) { logger.Tracef("Failed to find route for %s: %v", service, err)
r.opts.Logger.Tracef(r.opts.Context, "Failed to find route for %s: %v", service, err)
}
return nil, fmt.Errorf("failed getting services: %v", err) return nil, fmt.Errorf("failed getting services: %v", err)
} }
var routes []router.Route
for _, srv := range services { for _, srv := range services {
domain := getDomain(srv) domain := getDomain(srv)
// TODO: should we continue to send the event indicating we created a route? // TODO: should we continue to send the event indicating we created a route?
@ -274,12 +294,23 @@ func (r *rtr) lookup(service string) ([]router.Route, error) {
routes = append(routes, r.createRoutes(srv, domain)...) routes = append(routes, r.createRoutes(srv, domain)...)
} }
// if we're supposed to cache then save the routes
if r.options.Cache {
for _, route := range routes {
r.table.Create(route)
}
}
routes = router.Filter(routes, q)
if len(routes) == 0 {
return nil, router.ErrRouteNotFound
}
return routes, nil return routes, nil
} }
// watchRegister watches register and updates routing table based on the received events. // watchRegistry watches registry and updates routing table based on the received events.
// It returns error if either the register watcher fails with error or if the routing table update fails. // It returns error if either the registry watcher fails with error or if the routing table update fails.
func (r *rtr) watchRegister(w register.Watcher) error { func (r *rtr) watchRegistry(w registry.Watcher) error {
exit := make(chan bool) exit := make(chan bool)
defer func() { defer func() {
@ -303,7 +334,7 @@ func (r *rtr) watchRegister(w register.Watcher) error {
// get the next service // get the next service
res, err := w.Next() res, err := w.Next()
if err != nil { if err != nil {
if err != register.ErrWatcherStopped { if err != registry.ErrWatcherStopped {
return err return err
} }
break break
@ -311,15 +342,11 @@ func (r *rtr) watchRegister(w register.Watcher) error {
// don't process nil entries // don't process nil entries
if res.Service == nil { if res.Service == nil {
if logger.V(logger.TraceLevel) { logger.Trace("Received a nil service")
logger.Trace(r.opts.Context, "Received a nil service")
}
continue continue
} }
if r.opts.Logger.V(logger.TraceLevel) { logger.Tracef("Router dealing with next route %s %+v\n", res.Action, res.Service)
r.opts.Logger.Tracef(r.opts.Context, "Router dealing with next route %s %+v\n", res.Action, res.Service)
}
// get the services domain from metadata. Fallback to wildcard. // get the services domain from metadata. Fallback to wildcard.
domain := getDomain(res.Service) domain := getDomain(res.Service)
@ -339,24 +366,17 @@ func (r *rtr) start() error {
return nil return nil
} }
if r.opts.Precache {
// add all local service routes into the routing table
if err := r.loadRoutes(r.opts.Register); err != nil {
return fmt.Errorf("failed loading register routes: %s", err)
}
}
// add default gateway into routing table // add default gateway into routing table
if r.opts.Gateway != "" { if r.options.Gateway != "" {
// note, the only non-default value is the gateway // note, the only non-default value is the gateway
route := router.Route{ route := router.Route{
Service: "*", Service: "*",
Address: "*", Address: "*",
Gateway: r.opts.Gateway, Gateway: r.options.Gateway,
Network: "*", Network: "*",
Router: r.opts.ID, Router: r.options.Id,
Link: router.DefaultLink, Link: router.DefaultLink,
Metric: router.DefaultLocalMetric, Metric: router.DefaultMetric,
} }
if err := r.table.Create(route); err != nil { if err := r.table.Create(route); err != nil {
return fmt.Errorf("failed adding default gateway route: %s", err) return fmt.Errorf("failed adding default gateway route: %s", err)
@ -365,27 +385,59 @@ func (r *rtr) start() error {
// create error and exit channels // create error and exit channels
r.exit = make(chan bool) r.exit = make(chan bool)
r.running = true
// periodically refresh all the routes // only cache if told to do so
if !r.options.Cache {
return nil
}
// create a refresh notify channel
refresh := make(chan bool, 1)
// fires the refresh for loading routes
refreshRoutes := func() {
select {
case refresh <- true:
default:
}
}
// refresh all the routes in the event of a failure watching the registry
go func() { go func() {
t1 := time.NewTicker(RefreshInterval) var lastRefresh time.Time
defer t1.Stop()
t2 := time.NewTicker(PruneInterval) // load a refresh
defer t2.Stop() refreshRoutes()
for { for {
select { select {
case <-r.exit: case <-r.exit:
return return
case <-t2.C: case <-refresh:
r.table.pruneRoutes(RefreshInterval) // don't refresh if we've done so in the past minute
case <-t1.C: if !lastRefresh.IsZero() && time.Since(lastRefresh) < time.Minute {
if err := r.loadRoutes(r.opts.Register); err != nil { continue
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debugf(r.opts.Context, "failed refreshing register routes: %s", err)
}
} }
// load new routes
if err := r.loadRoutes(r.options.Registry); err != nil {
logger.Debugf("failed refreshing registry routes: %s", err)
// in this don't prune
continue
}
// first time so nothing to prune
if !lastRefresh.IsZero() {
// prune any routes since last refresh since we've
// updated basically everything we care about
r.table.pruneRoutes(time.Since(lastRefresh))
}
// update the refresh time
lastRefresh = time.Now()
case <-time.After(RefreshInterval):
refreshRoutes()
} }
} }
}() }()
@ -396,69 +448,40 @@ func (r *rtr) start() error {
case <-r.exit: case <-r.exit:
return return
default: default:
if r.opts.Logger.V(logger.TraceLevel) { logger.Tracef("Router starting registry watch")
r.opts.Logger.Tracef(r.opts.Context, "Router starting register watch") w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain))
}
w, err := r.opts.Register.Watch(r.opts.Context, register.WatchDomain(register.WildcardDomain))
if err != nil { if err != nil {
if r.opts.Logger.V(logger.DebugLevel) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
r.opts.Logger.Debug(r.opts.Context, "failed creating register watcher: %v", err) logger.Debugf("failed creating registry watcher: %v", err)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
// in the event of an error reload routes
refreshRoutes()
continue continue
} }
// watchRegister calls stop when it's done // watchRegistry calls stop when it's done
if err := r.watchRegister(w); err != nil { if err := r.watchRegistry(w); err != nil {
if r.opts.Logger.V(logger.DebugLevel) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
r.opts.Logger.Debugf(r.opts.Context, "Error watching the register: %v", err) logger.Debugf("Error watching the registry: %v", err)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
// in the event of an error reload routes
refreshRoutes()
} }
} }
} }
}() }()
r.running = true
return nil return nil
} }
// Lookup routes in the routing table
func (r *rtr) Lookup(q ...router.QueryOption) ([]router.Route, error) {
return r.Table().Query(q...)
}
// Watch routes // Watch routes
func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) { func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return r.table.Watch(opts...) return r.table.Watch(opts...)
} }
// Close the router
func (r *rtr) Close() error {
r.Lock()
defer r.Unlock()
select {
case <-r.exit:
return nil
default:
if !r.running {
return nil
}
close(r.exit)
}
r.running = false
return nil
}
// String prints debugging information about router // String prints debugging information about router
func (r *rtr) String() string { func (r *rtr) String() string {
return "register" return "registry"
}
func (r *rtr) Name() string {
return r.opts.Name
} }

View File

@ -1,19 +1,16 @@
//go:build ignore package registry
// +build ignore
package register
import ( import (
"os" "os"
"testing" "testing"
"go.unistack.org/micro/v3/register/memory" "github.com/micro/go-micro/v3/registry/memory"
"go.unistack.org/micro/v3/router" "github.com/micro/go-micro/v3/router"
) )
func routerTestSetup() router.Router { func routerTestSetup() router.Router {
r := memory.NewRegister() r := memory.NewRegistry()
return NewRouter(router.Register(r)) return NewRouter(router.Registry(r))
} }
func TestRouterClose(t *testing.T) { func TestRouterClose(t *testing.T) {
@ -22,7 +19,7 @@ func TestRouterClose(t *testing.T) {
if err := r.Close(); err != nil { if err := r.Close(); err != nil {
t.Errorf("failed to stop router: %v", err) t.Errorf("failed to stop router: %v", err)
} }
if len(os.Getenv("INTEGRATION_TESTS")) == 0 { if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
t.Logf("TestRouterStartStop STOPPED") t.Logf("TestRouterStartStop STOPPED")
} }
} }

204
table.go
View File

@ -1,38 +1,33 @@
package register package registry
import ( import (
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v3/logger" "github.com/google/uuid"
"go.unistack.org/micro/v3/router" "github.com/micro/go-micro/v3/logger"
"go.unistack.org/micro/v3/util/id" "github.com/micro/go-micro/v3/router"
) )
// table is an in-memory routing table // table is an in-memory routing table
type table struct { type table struct {
sync.RWMutex sync.RWMutex
// lookup for a service
lookup func(string) ([]router.Route, error)
// routes stores service routes // routes stores service routes
routes map[string]map[uint64]*route routes map[string]map[uint64]*route
// watchers stores table watchers // watchers stores table watchers
watchers map[string]*tableWatcher watchers map[string]*tableWatcher
opts router.Options
} }
type route struct { type route struct {
updated time.Time
route router.Route route router.Route
updated time.Time
} }
// newtable creates a new routing table and returns it // newtable creates a new routing table and returns it
func newTable(lookup func(string) ([]router.Route, error), opts ...router.Option) *table { func newTable() *table {
return &table{ return &table{
lookup: lookup,
routes: make(map[string]map[uint64]*route), routes: make(map[string]map[uint64]*route),
watchers: make(map[string]*tableWatcher), watchers: make(map[string]*tableWatcher),
opts: router.NewOptions(opts...),
} }
} }
@ -96,8 +91,8 @@ func (t *table) sendEvent(e *router.Event) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
if len(e.ID) == 0 { if len(e.Id) == 0 {
e.ID, _ = id.New() e.Id = uuid.New().String()
} }
for _, w := range t.watchers { for _, w := range t.watchers {
@ -129,10 +124,10 @@ func (t *table) Create(r router.Route) error {
} }
// create the route // create the route
t.routes[service][sum] = &route{updated: time.Now(), route: r} t.routes[service][sum] = &route{r, time.Now()}
if t.opts.Logger.V(logger.DebugLevel) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
t.opts.Logger.Debugf(t.opts.Context, "Router emitting %s for route: %s", router.Create, r.Address) logger.Debugf("Router emitting %s for route: %s", router.Create, r.Address)
} }
// send a route created event // send a route created event
@ -165,8 +160,8 @@ func (t *table) Delete(r router.Route) error {
delete(t.routes, service) delete(t.routes, service)
} }
if t.opts.Logger.V(logger.DebugLevel) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
t.opts.Logger.Debugf(t.opts.Context, "Router emitting %s for route: %s", router.Delete, r.Address) logger.Debugf("Router emitting %s for route: %s", router.Delete, r.Address)
} }
go t.sendEvent(&router.Event{Type: router.Delete, Timestamp: time.Now(), Route: r}) go t.sendEvent(&router.Event{Type: router.Delete, Timestamp: time.Now(), Route: r})
@ -188,170 +183,55 @@ func (t *table) Update(r router.Route) error {
if _, ok := t.routes[service][sum]; !ok { if _, ok := t.routes[service][sum]; !ok {
// update the route // update the route
t.routes[service][sum] = &route{updated: time.Now(), route: r} t.routes[service][sum] = &route{r, time.Now()}
if t.opts.Logger.V(logger.DebugLevel) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
t.opts.Logger.Debugf(t.opts.Context, "Router emitting %s for route: %s", router.Update, r.Address) logger.Debugf("Router emitting %s for route: %s", router.Update, r.Address)
} }
go t.sendEvent(&router.Event{Type: router.Update, Timestamp: time.Now(), Route: r}) go t.sendEvent(&router.Event{Type: router.Update, Timestamp: time.Now(), Route: r})
return nil return nil
} }
// just update the route, but dont emit Update event // just update the route, but dont emit Update event
t.routes[service][sum] = &route{updated: time.Now(), route: r} t.routes[service][sum] = &route{r, time.Now()}
return nil return nil
} }
// List returns a list of all routes in the table // Read entries from the table
func (t *table) List() ([]router.Route, error) { func (t *table) Read(opts ...router.ReadOption) ([]router.Route, error) {
var options router.ReadOptions
for _, o := range opts {
o(&options)
}
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
var routes []router.Route var routes []router.Route
for _, rmap := range t.routes {
for _, route := range rmap { // get the routes based on options passed
routes = append(routes, route.route) if len(options.Service) > 0 {
routeMap, ok := t.routes[options.Service]
if !ok {
return nil, router.ErrRouteNotFound
}
for _, rt := range routeMap {
routes = append(routes, rt.route)
}
return routes, nil
}
// otherwise get all routes
for _, serviceRoutes := range t.routes {
for _, rt := range serviceRoutes {
routes = append(routes, rt.route)
} }
} }
return routes, nil return routes, nil
} }
// isMatch checks if the route matches given query options
func isMatch(route router.Route, address, gateway, network, rtr, link string) bool {
// matches the values provided
match := func(a, b string) bool {
if a == "*" || b == "*" || a == b {
return true
}
return false
}
// a simple struct to hold our values
type compare struct {
a string
b string
}
// compare the following values
values := []compare{
{gateway, route.Gateway},
{network, route.Network},
{rtr, route.Router},
{address, route.Address},
{link, route.Link},
}
for _, v := range values {
// attempt to match each value
if !match(v.a, v.b) {
return false
}
}
return true
}
// filterRoutes finds all the routes for given network and router and returns them
func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.Route {
address := opts.Address
gateway := opts.Gateway
network := opts.Network
rtr := opts.Router
link := opts.Link
// routeMap stores the routes we're going to advertise
routeMap := make(map[string][]router.Route)
var routeCnt int
for _, rt := range routes {
// get the actual route
route := rt.route
if isMatch(route, address, gateway, network, rtr, link) {
// add matchihg route to the routeMap
routeKey := route.Service + "@" + route.Network
routeMap[routeKey] = append(routeMap[routeKey], route)
routeCnt++
}
}
results := make([]router.Route, 0, routeCnt)
for _, route := range routeMap {
results = append(results, route...)
}
return results
}
// Lookup queries routing table and returns all routes that match the lookup query
func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) {
// create new query options
opts := router.NewQuery(q...)
// create a cwslicelist of query results
results := make([]router.Route, 0, len(t.routes))
// readAndFilter routes for this service under read lock.
readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) {
t.RLock()
defer t.RUnlock()
routes, ok := t.routes[q.Service]
if !ok || len(routes) == 0 {
return nil, false
}
return filterRoutes(routes, q), true
}
if opts.Service != "*" {
// try and load services from the cache
if routes, ok := readAndFilter(opts); ok {
return routes, nil
}
// lookup the route and try again
// TODO: move this logic out of the hot path
// being hammered on queries will require multiple lookups
routes, err := t.lookup(opts.Service)
if err != nil {
return nil, err
}
// cache the routes
for _, rt := range routes {
t.Create(rt)
}
// try again
if routes, ok := readAndFilter(opts); ok {
return routes, nil
}
return nil, router.ErrRouteNotFound
}
// search through all destinations
t.RLock()
for _, routes := range t.routes {
// filter the routes
found := filterRoutes(routes, opts)
// ensure we don't append zero length routes
if len(found) == 0 {
continue
}
results = append(results, found...)
}
t.RUnlock()
return results, nil
}
// Watch returns routing table entry watcher // Watch returns routing table entry watcher
func (t *table) Watch(opts ...router.WatchOption) (router.Watcher, error) { func (t *table) Watch(opts ...router.WatchOption) (router.Watcher, error) {
// by default watch everything // by default watch everything
@ -364,11 +244,11 @@ func (t *table) Watch(opts ...router.WatchOption) (router.Watcher, error) {
} }
w := &tableWatcher{ w := &tableWatcher{
id: uuid.New().String(),
opts: wopts, opts: wopts,
resChan: make(chan *router.Event, 10), resChan: make(chan *router.Event, 10),
done: make(chan struct{}), done: make(chan struct{}),
} }
w.id, _ = id.New()
// when the watcher is stopped delete it // when the watcher is stopped delete it
go func() { go func() {

View File

@ -1,23 +1,13 @@
//go:build ignore package registry
// +build ignore
package register
import ( import (
"fmt"
"testing" "testing"
"go.unistack.org/micro/v3/router" "github.com/micro/go-micro/v3/router"
) )
func testSetup(t *testing.T) (*table, router.Route) { func testSetup() (*table, router.Route) {
r, err := NewRouter() table := newTable()
if err != nil {
t.Fatal(err)
}
routr := r.(*rtr)
table := newTable(routr.lookup)
route := router.Route{ route := router.Route{
Service: "dest.svc", Service: "dest.svc",
@ -33,7 +23,7 @@ func testSetup(t *testing.T) (*table, router.Route) {
} }
func TestCreate(t *testing.T) { func TestCreate(t *testing.T) {
table, route := testSetup(t) table, route := testSetup()
if err := table.Create(route); err != nil { if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err) t.Fatalf("error adding route: %s", err)
@ -53,13 +43,13 @@ func TestCreate(t *testing.T) {
} }
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
table, route := testSetup(t) table, route := testSetup()
if err := table.Create(route); err != nil { if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err) t.Fatalf("error adding route: %s", err)
} }
// should fail to delete non-existent route // should fail to delete non-existant route
prevSvc := route.Service prevSvc := route.Service
route.Service = "randDest" route.Service = "randDest"
@ -76,7 +66,7 @@ func TestDelete(t *testing.T) {
} }
func TestUpdate(t *testing.T) { func TestUpdate(t *testing.T) {
table, route := testSetup(t) table, route := testSetup()
if err := table.Create(route); err != nil { if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err) t.Fatalf("error adding route: %s", err)
@ -98,7 +88,7 @@ func TestUpdate(t *testing.T) {
} }
func TestList(t *testing.T) { func TestList(t *testing.T) {
table, route := testSetup(t) table, route := testSetup()
svc := []string{"one.svc", "two.svc", "three.svc"} svc := []string{"one.svc", "two.svc", "three.svc"}
@ -109,7 +99,7 @@ func TestList(t *testing.T) {
} }
} }
routes, err := table.List() routes, err := table.Read()
if err != nil { if err != nil {
t.Fatalf("error listing routes: %s", err) t.Fatalf("error listing routes: %s", err)
} }
@ -120,234 +110,22 @@ func TestList(t *testing.T) {
} }
func TestQuery(t *testing.T) { func TestQuery(t *testing.T) {
table, route := testSetup(t) table, route := testSetup()
svc := []string{"svc1", "svc2", "svc3", "svc1"} if err := table.Create(route); err != nil {
net := []string{"net1", "net2", "net1", "net3"} t.Fatalf("error adding route: %s", err)
gw := []string{"gw1", "gw2", "gw3", "gw3"}
rtr := []string{"rtr1", "rt2", "rt3", "rtr3"}
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
route.Network = net[i]
route.Gateway = gw[i]
route.Router = rtr[i]
route.Link = router.DefaultLink
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
} }
// return all routes rt, err := table.Read(router.ReadService(route.Service))
routes, err := table.Query()
if err != nil { if err != nil {
t.Fatalf("error looking up routes: %s", err) t.Fatal("Expected a route got err", err)
} else if len(routes) == 0 {
t.Fatalf("error looking up routes: not found")
} }
// query routes particular network if len(rt) != 1 {
network := "net1" t.Fatalf("Expected one route got %d", len(rt))
routes, err = table.Query(router.QueryNetwork(network))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
} }
if len(routes) != 2 { if rt[0].Hash() != route.Hash() {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes)) t.Fatal("Mismatched routes received")
}
for _, route := range routes {
if route.Network != network {
t.Fatalf("incorrect route returned. Expected network: %s, found: %s", network, route.Network)
}
}
// query routes for particular gateway
gateway := "gw1"
routes, err = table.Query(router.QueryGateway(gateway))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Gateway != gateway {
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
// query routes for particular router
rt := "rtr1"
routes, err = table.Query(router.QueryRouter(rt))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Router != rt {
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
}
// query particular gateway and network
query := []router.QueryOption{
router.QueryGateway(gateway),
router.QueryNetwork(network),
router.QueryRouter(rt),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Gateway != gateway {
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
if routes[0].Network != network {
t.Fatalf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network)
}
if routes[0].Router != rt {
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
}
// non-existen route query
routes, err = table.Query(router.QueryService("foobar"))
if err != router.ErrRouteNotFound {
t.Fatalf("error looking up routes. Expected: %s, found: %s", router.ErrRouteNotFound, err)
}
if len(routes) != 0 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
}
// query NO routes
query = []router.QueryOption{
router.QueryGateway(gateway),
router.QueryNetwork(network),
router.QueryLink("network"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) > 0 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
}
// insert local routes to query
for i := 0; i < 2; i++ {
route.Link = "foobar"
route.Address = fmt.Sprintf("local.route.address-%d", i)
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
}
// query local routes
query = []router.QueryOption{
router.QueryGateway("*"),
router.QueryNetwork("*"),
router.QueryLink("foobar"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
}
// add two different routes for svcX with different metric
for i := 0; i < 2; i++ {
route.Service = "svcX"
route.Address = fmt.Sprintf("svcX.route.address-%d", i)
route.Metric = int64(100 + i)
route.Link = router.DefaultLink
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
}
query = []router.QueryOption{
router.QueryService("svcX"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
}
func TestFallback(t *testing.T) {
r := &rtr{
options: router.NewOptions(),
}
route := router.Route{
Service: "go.micro.service.foo",
Router: r.options.ID,
Link: router.DefaultLink,
Metric: router.DefaultLocalMetric,
}
r.table = newTable(func(s string) ([]router.Route, error) {
return []router.Route{route}, nil
})
r.start()
rts, err := r.Lookup(router.QueryService("go.micro.service.foo"))
if err != nil {
t.Fatalf("error looking up service %s", err)
}
if len(rts) != 1 {
t.Fatalf("incorrect number of routes returned %d", len(rts))
}
// deleting from the table but the next query should invoke the fallback that we passed during new table creation
if err := r.table.Delete(route); err != nil {
t.Fatalf("error deleting route %s", err)
}
rts, err = r.Lookup(router.QueryService("go.micro.service.foo"))
if err != nil {
t.Fatalf("error looking up service %s", err)
}
if len(rts) != 1 {
t.Fatalf("incorrect number of routes returned %d", len(rts))
}
}
func TestFallbackError(t *testing.T) {
r := &rtr{
options: router.NewOptions(),
}
r.table = newTable(func(s string) ([]router.Route, error) {
return nil, fmt.Errorf("ERROR")
})
r.start()
_, err := r.Lookup(router.QueryService("go.micro.service.foo"))
if err == nil {
t.Fatalf("expected error looking up service but none returned")
} }
} }

View File

@ -1,18 +1,18 @@
package register package registry
import ( import (
"sync" "sync"
"go.unistack.org/micro/v3/router" "github.com/micro/go-micro/v3/router"
) )
// tableWatcher implements routing table Watcher // tableWatcher implements routing table Watcher
type tableWatcher struct { type tableWatcher struct {
sync.RWMutex sync.RWMutex
resChan chan *router.Event
done chan struct{}
id string id string
opts router.WatchOptions opts router.WatchOptions
resChan chan *router.Event
done chan struct{}
} }
// Next returns the next noticed action taken on table // Next returns the next noticed action taken on table