fix typo in comments (#1840)
* remove global error tracking * rpc_server: fix invalid register err * fix typo Co-authored-by: Asim Aslam <asim@aslam.me>
This commit is contained in:
parent
dfa50a888d
commit
f9bf562393
@ -2,7 +2,7 @@
|
|||||||
package command
|
package command
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// Commmands keyed by golang/regexp patterns
|
// Commands keyed by golang/regexp patterns
|
||||||
// regexp.Match(key, input) is used to match
|
// regexp.Match(key, input) is used to match
|
||||||
Commands = map[string]Command{}
|
Commands = map[string]Command{}
|
||||||
)
|
)
|
||||||
|
@ -18,7 +18,7 @@ type Api interface {
|
|||||||
Register(*Endpoint) error
|
Register(*Endpoint) error
|
||||||
// Register a route
|
// Register a route
|
||||||
Deregister(*Endpoint) error
|
Deregister(*Endpoint) error
|
||||||
// Implemenation of api
|
// Implementation of api
|
||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ const (
|
|||||||
type httpHandler struct {
|
type httpHandler struct {
|
||||||
options handler.Options
|
options handler.Options
|
||||||
|
|
||||||
// set with different initialiser
|
// set with different initializer
|
||||||
s *api.Service
|
s *api.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,7 +62,7 @@ func WithClient(c client.Client) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithmaxRecvSize specifies max body size
|
// WithMaxRecvSize specifies max body size
|
||||||
func WithMaxRecvSize(size int64) Option {
|
func WithMaxRecvSize(size int64) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.MaxRecvSize = size
|
o.MaxRecvSize = size
|
||||||
|
@ -265,7 +265,7 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
|||||||
|
|
||||||
// otherwise as per usual
|
// otherwise as per usual
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
// dont user meadata.FromContext as it mangles names
|
// dont user metadata.FromContext as it mangles names
|
||||||
md, ok := metadata.FromContext(ctx)
|
md, ok := metadata.FromContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
md = make(map[string]string)
|
md = make(map[string]string)
|
||||||
|
@ -13,7 +13,7 @@ type Options struct {
|
|||||||
Secure bool
|
Secure bool
|
||||||
Codec codec.Marshaler
|
Codec codec.Marshaler
|
||||||
|
|
||||||
// Handler executed when error happens in broker mesage
|
// Handler executed when error happens in broker message
|
||||||
// processing
|
// processing
|
||||||
ErrorHandler Handler
|
ErrorHandler Handler
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ type Response interface {
|
|||||||
Read() ([]byte, error)
|
Read() ([]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream is the inteface for a bidirectional synchronous stream
|
// Stream is the interface for a bidirectional synchronous stream
|
||||||
type Stream interface {
|
type Stream interface {
|
||||||
// Context for the stream
|
// Context for the stream
|
||||||
Context() context.Context
|
Context() context.Context
|
||||||
|
@ -378,7 +378,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// pass a node to enable backwards compatability as changing the
|
// pass a node to enable backwards comparability as changing the
|
||||||
// call func would be a breaking change.
|
// call func would be a breaking change.
|
||||||
// todo v3: change the call func to accept a route
|
// todo v3: change the call func to accept a route
|
||||||
node := ®istry.Node{Address: route.Address, Metadata: route.Metadata}
|
node := ®istry.Node{Address: route.Address, Metadata: route.Metadata}
|
||||||
|
@ -32,7 +32,7 @@ type Entity interface {
|
|||||||
Name() string
|
Name() string
|
||||||
// The value associated with the entity
|
// The value associated with the entity
|
||||||
Value() interface{}
|
Value() interface{}
|
||||||
// Attributes of the enity
|
// Attributes of the entity
|
||||||
Attributes() map[string]interface{}
|
Attributes() map[string]interface{}
|
||||||
// Read a value as a concrete type
|
// Read a value as a concrete type
|
||||||
Read(v interface{}) error
|
Read(v interface{}) error
|
||||||
|
@ -634,7 +634,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
|
|||||||
// receive control message queue
|
// receive control message queue
|
||||||
recv := make(chan *message, 128)
|
recv := make(chan *message, 128)
|
||||||
|
|
||||||
// accept ControlChannel cconnections
|
// accept ControlChannel connections
|
||||||
go n.acceptCtrlConn(listener, recv)
|
go n.acceptCtrlConn(listener, recv)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -660,7 +660,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
|
|||||||
logger.Debugf("Network received advert message from: %s", pbRtrAdvert.Id)
|
logger.Debugf("Network received advert message from: %s", pbRtrAdvert.Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// loookup advertising node in our peer topology
|
// lookup advertising node in our peer topology
|
||||||
advertNode := n.node.GetPeerNode(pbRtrAdvert.Id)
|
advertNode := n.node.GetPeerNode(pbRtrAdvert.Id)
|
||||||
if advertNode == nil {
|
if advertNode == nil {
|
||||||
// if we can't find the node in our topology (MaxDepth) we skipp prcessing adverts
|
// if we can't find the node in our topology (MaxDepth) we skipp prcessing adverts
|
||||||
@ -913,7 +913,7 @@ func (n *network) processNetChan(listener tunnel.Listener) {
|
|||||||
logger.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err)
|
logger.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: we don't unpack MaxDepth toplogy
|
// NOTE: we don't unpack MaxDepth topology
|
||||||
peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1)
|
peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1)
|
||||||
// update the link
|
// update the link
|
||||||
peer.link = m.msg.Header["Micro-Link"]
|
peer.link = m.msg.Header["Micro-Link"]
|
||||||
@ -1217,7 +1217,7 @@ func (n *network) manage() {
|
|||||||
lastSent := links[peer.link]
|
lastSent := links[peer.link]
|
||||||
|
|
||||||
// check when we last sent to the peer
|
// check when we last sent to the peer
|
||||||
// and send a peer message if we havent
|
// and send a peer message if we haven't
|
||||||
if lastSent.IsZero() || time.Since(lastSent) > KeepAliveTime {
|
if lastSent.IsZero() || time.Since(lastSent) > KeepAliveTime {
|
||||||
link := peer.link
|
link := peer.link
|
||||||
id := peer.id
|
id := peer.id
|
||||||
@ -1351,7 +1351,7 @@ func (n *network) manage() {
|
|||||||
|
|
||||||
// pick a random peer from the list of peers and request full sync
|
// pick a random peer from the list of peers and request full sync
|
||||||
peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id())
|
peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id())
|
||||||
// skip if we can't find randmly selected peer
|
// skip if we can't find randomly selected peer
|
||||||
if peer == nil {
|
if peer == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -167,7 +167,7 @@ func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node)
|
|||||||
return visited
|
return visited
|
||||||
}
|
}
|
||||||
// iterate through all of the node peers
|
// iterate through all of the node peers
|
||||||
// mark the visited nodes; enqueue the non-visted
|
// mark the visited nodes; enqueue the non-visited
|
||||||
for id, peer := range qnode.Value.(*node).peers {
|
for id, peer := range qnode.Value.(*node).peers {
|
||||||
action(qnode.Value.(*node), peer)
|
action(qnode.Value.(*node), peer)
|
||||||
if _, ok := visited[id]; !ok {
|
if _, ok := visited[id]; !ok {
|
||||||
@ -274,7 +274,7 @@ func (n *node) RefreshSync(now time.Time) error {
|
|||||||
// Nodes returns a slice of all nodes in the whole node topology
|
// Nodes returns a slice of all nodes in the whole node topology
|
||||||
func (n *node) Nodes() []Node {
|
func (n *node) Nodes() []Node {
|
||||||
// we need to freeze the network graph here
|
// we need to freeze the network graph here
|
||||||
// otherwise we might get inconsisten results
|
// otherwise we might get inconsistent results
|
||||||
n.RLock()
|
n.RLock()
|
||||||
defer n.RUnlock()
|
defer n.RUnlock()
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ func (p *Proxy) filterRoutes(ctx context.Context, routes []router.Route) []route
|
|||||||
// process only routes for this id
|
// process only routes for this id
|
||||||
if id, ok := md.Get("Micro-Router"); ok && len(id) > 0 {
|
if id, ok := md.Get("Micro-Router"); ok && len(id) > 0 {
|
||||||
if route.Router != id {
|
if route.Router != id {
|
||||||
// skip routes that don't mwatch
|
// skip routes that don't match
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -130,7 +130,7 @@ func (p *Proxy) filterRoutes(ctx context.Context, routes []router.Route) []route
|
|||||||
// only process routes with this network
|
// only process routes with this network
|
||||||
if net, ok := md.Get("Micro-Namespace"); ok && len(net) > 0 {
|
if net, ok := md.Get("Micro-Namespace"); ok && len(net) > 0 {
|
||||||
if route.Network != router.DefaultNetwork && route.Network != net {
|
if route.Network != router.DefaultNetwork && route.Network != net {
|
||||||
// skip routes that don't mwatch
|
// skip routes that don't match
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,9 +23,9 @@ type EventType int
|
|||||||
const (
|
const (
|
||||||
// Create is emitted when a new service is registered
|
// Create is emitted when a new service is registered
|
||||||
Create EventType = iota
|
Create EventType = iota
|
||||||
// Delete is emitted when an existing service is deregsitered
|
// Delete is emitted when an existing service is deregistered
|
||||||
Delete
|
Delete
|
||||||
// Update is emitted when an existing servicec is updated
|
// Update is emitted when an existing service is updated
|
||||||
Update
|
Update
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ func newService(opts ...Option) Service {
|
|||||||
|
|
||||||
// pass the services auth namespace to the auth handler so it
|
// pass the services auth namespace to the auth handler so it
|
||||||
// uses this to verify requests, preventing the reliance on the
|
// uses this to verify requests, preventing the reliance on the
|
||||||
// unsecure Micro-Namespace header.
|
// insecure Micro-Namespace header.
|
||||||
handlerNS := wrapper.AuthHandlerNamespace(options.Auth.Options().Issuer)
|
handlerNS := wrapper.AuthHandlerNamespace(options.Auth.Options().Issuer)
|
||||||
|
|
||||||
// wrap the server to provide handler stats
|
// wrap the server to provide handler stats
|
||||||
|
@ -8,7 +8,7 @@ import (
|
|||||||
"github.com/micro/go-micro/v2/logger"
|
"github.com/micro/go-micro/v2/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Verify the auth credentials and refresh the auth token periodicallay
|
// Verify the auth credentials and refresh the auth token periodically
|
||||||
func Verify(a auth.Auth) error {
|
func Verify(a auth.Auth) error {
|
||||||
// extract the account creds from options, these can be set by flags
|
// extract the account creds from options, these can be set by flags
|
||||||
accID := a.Options().ID
|
accID := a.Options().ID
|
||||||
|
@ -90,7 +90,7 @@ func (r *Request) Resource(s string) *Request {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubResource sets a subresource on a resource,
|
// SubResource sets a sub resource on a resource,
|
||||||
// e.g. pods/log for pod logs
|
// e.g. pods/log for pod logs
|
||||||
func (r *Request) SubResource(s string) *Request {
|
func (r *Request) SubResource(s string) *Request {
|
||||||
r.subResource = &s
|
r.subResource = &s
|
||||||
@ -132,7 +132,7 @@ func (r *Request) Body(in interface{}) *Request {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
// Params isused to set paramters on a request
|
// Params is used to set parameters on a request
|
||||||
func (r *Request) Params(p *Params) *Request {
|
func (r *Request) Params(p *Params) *Request {
|
||||||
for k, v := range p.LabelSelector {
|
for k, v := range p.LabelSelector {
|
||||||
// create new key=value pair
|
// create new key=value pair
|
||||||
|
@ -37,7 +37,7 @@ type client struct {
|
|||||||
type Client interface {
|
type Client interface {
|
||||||
// Create creates new API resource
|
// Create creates new API resource
|
||||||
Create(*Resource, ...CreateOption) error
|
Create(*Resource, ...CreateOption) error
|
||||||
// Get queries API resrouces
|
// Get queries API resources
|
||||||
Get(*Resource, ...GetOption) error
|
Get(*Resource, ...GetOption) error
|
||||||
// Update patches existing API object
|
// Update patches existing API object
|
||||||
Update(*Resource, ...UpdateOption) error
|
Update(*Resource, ...UpdateOption) error
|
||||||
|
@ -13,7 +13,7 @@ type Pool interface {
|
|||||||
Close() error
|
Close() error
|
||||||
// Get a connection
|
// Get a connection
|
||||||
Get(addr string, opts ...transport.DialOption) (Conn, error)
|
Get(addr string, opts ...transport.DialOption) (Conn, error)
|
||||||
// Releaes the connection
|
// Release the connection
|
||||||
Release(c Conn, status error) error
|
Release(c Conn, status error) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,7 +5,7 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ShutDownSingals returns all the singals that are being watched for to shut down services.
|
// ShutDownSignals returns all the signals that are being watched for to shut down services.
|
||||||
func Shutdown() []os.Signal {
|
func Shutdown() []os.Signal {
|
||||||
return []os.Signal{
|
return []os.Signal{
|
||||||
syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL,
|
syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL,
|
||||||
|
@ -380,7 +380,7 @@ func TestCacheWrapper(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}, cli)
|
}, cli)
|
||||||
|
|
||||||
// perfroming two requests should increment the call count by two indicating the cache wasn't
|
// performing two requests should increment the call count by two indicating the cache wasn't
|
||||||
// used even though the WithCache option was passed.
|
// used even though the WithCache option was passed.
|
||||||
w.Call(context.TODO(), req, nil, client.WithCache(time.Minute))
|
w.Call(context.TODO(), req, nil, client.WithCache(time.Minute))
|
||||||
w.Call(context.TODO(), req, nil, client.WithCache(time.Minute))
|
w.Call(context.TODO(), req, nil, client.WithCache(time.Minute))
|
||||||
@ -398,7 +398,7 @@ func TestCacheWrapper(t *testing.T) {
|
|||||||
return cache
|
return cache
|
||||||
}, cli)
|
}, cli)
|
||||||
|
|
||||||
// perfroming two requests should increment the call count by two since we didn't pass the WithCache
|
// performing two requests should increment the call count by two since we didn't pass the WithCache
|
||||||
// option to Call.
|
// option to Call.
|
||||||
w.Call(context.TODO(), req, nil)
|
w.Call(context.TODO(), req, nil)
|
||||||
w.Call(context.TODO(), req, nil)
|
w.Call(context.TODO(), req, nil)
|
||||||
@ -417,7 +417,7 @@ func TestCacheWrapper(t *testing.T) {
|
|||||||
return cache
|
return cache
|
||||||
}, cli)
|
}, cli)
|
||||||
|
|
||||||
// perfroming two requests should increment the call count by once since the second request should
|
// performing two requests should increment the call count by once since the second request should
|
||||||
// have used the cache. The correct value should be set on both responses and no errors should
|
// have used the cache. The correct value should be set on both responses and no errors should
|
||||||
// be returned.
|
// be returned.
|
||||||
rsp1 := &testRsp{}
|
rsp1 := &testRsp{}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user