client: add proxy option (#1885)
* client: add proxy option * client: add WithProxy CallOption * use address option * ProxyAddress => Proxy
This commit is contained in:
parent
006bbefaf3
commit
e9fc5b1671
@ -411,6 +411,11 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
callOpts.Selector = g.opts.Selector
|
callOpts.Selector = g.opts.Selector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// inject proxy address
|
||||||
|
if len(callOpts.Address) == 0 && len(g.opts.Proxy) > 0 {
|
||||||
|
callOpts.Address = []string{g.opts.Proxy}
|
||||||
|
}
|
||||||
|
|
||||||
// lookup the route to send the reques to
|
// lookup the route to send the reques to
|
||||||
route, err := client.LookupRoute(req, callOpts)
|
route, err := client.LookupRoute(req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -514,6 +519,11 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
callOpts.Selector = g.opts.Selector
|
callOpts.Selector = g.opts.Selector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// inject proxy address
|
||||||
|
if len(callOpts.Address) == 0 && len(g.opts.Proxy) > 0 {
|
||||||
|
callOpts.Address = []string{g.opts.Proxy}
|
||||||
|
}
|
||||||
|
|
||||||
// lookup the route to send the reques to
|
// lookup the route to send the reques to
|
||||||
route, err := client.LookupRoute(req, callOpts)
|
route, err := client.LookupRoute(req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -17,7 +17,6 @@ import (
|
|||||||
"github.com/micro/go-micro/v3/registry"
|
"github.com/micro/go-micro/v3/registry"
|
||||||
"github.com/micro/go-micro/v3/transport"
|
"github.com/micro/go-micro/v3/transport"
|
||||||
"github.com/micro/go-micro/v3/util/buf"
|
"github.com/micro/go-micro/v3/util/buf"
|
||||||
"github.com/micro/go-micro/v3/util/net"
|
|
||||||
"github.com/micro/go-micro/v3/util/pool"
|
"github.com/micro/go-micro/v3/util/pool"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -379,6 +378,11 @@ func (r *rpcClient) Call(ctx context.Context, request client.Request, response i
|
|||||||
callOpts.Selector = r.opts.Selector
|
callOpts.Selector = r.opts.Selector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// inject proxy address
|
||||||
|
if len(callOpts.Address) == 0 && len(r.opts.Proxy) > 0 {
|
||||||
|
callOpts.Address = []string{r.opts.Proxy}
|
||||||
|
}
|
||||||
|
|
||||||
// lookup the route to send the request via
|
// lookup the route to send the request via
|
||||||
route, err := client.LookupRoute(request, callOpts)
|
route, err := client.LookupRoute(request, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -403,7 +407,7 @@ func (r *rpcClient) Call(ctx context.Context, request client.Request, response i
|
|||||||
retries := callOpts.Retries
|
retries := callOpts.Retries
|
||||||
|
|
||||||
// disable retries when using a proxy
|
// disable retries when using a proxy
|
||||||
if _, _, ok := net.Proxy(request.Service(), callOpts.Address); ok {
|
if len(r.opts.Proxy) > 0 {
|
||||||
retries = 0
|
retries = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -475,6 +479,11 @@ func (r *rpcClient) Stream(ctx context.Context, request client.Request, opts ...
|
|||||||
callOpts.Selector = r.opts.Selector
|
callOpts.Selector = r.opts.Selector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// inject proxy address
|
||||||
|
if len(callOpts.Address) == 0 && len(r.opts.Proxy) > 0 {
|
||||||
|
callOpts.Address = []string{r.opts.Proxy}
|
||||||
|
}
|
||||||
|
|
||||||
// lookup the route to send the request via
|
// lookup the route to send the request via
|
||||||
route, err := client.LookupRoute(request, callOpts)
|
route, err := client.LookupRoute(request, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -504,7 +513,7 @@ func (r *rpcClient) Stream(ctx context.Context, request client.Request, opts ...
|
|||||||
retries := callOpts.Retries
|
retries := callOpts.Retries
|
||||||
|
|
||||||
// disable retries when using a proxy
|
// disable retries when using a proxy
|
||||||
if _, _, ok := net.Proxy(request.Service(), callOpts.Address); ok {
|
if len(r.opts.Proxy) > 0 {
|
||||||
retries = 0
|
retries = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,8 @@ import (
|
|||||||
type Options struct {
|
type Options struct {
|
||||||
// Used to select codec
|
// Used to select codec
|
||||||
ContentType string
|
ContentType string
|
||||||
|
// Proxy address to send requests via
|
||||||
|
Proxy string
|
||||||
|
|
||||||
// Plugged interfaces
|
// Plugged interfaces
|
||||||
Broker broker.Broker
|
Broker broker.Broker
|
||||||
@ -149,6 +151,13 @@ func ContentType(ct string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Proxy sets the proxy address
|
||||||
|
func Proxy(addr string) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Proxy = addr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// PoolSize sets the connection pool size
|
// PoolSize sets the connection pool size
|
||||||
func PoolSize(d int) Option {
|
func PoolSize(d int) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@ -6,19 +6,15 @@ import (
|
|||||||
"github.com/micro/go-micro/v3/errors"
|
"github.com/micro/go-micro/v3/errors"
|
||||||
"github.com/micro/go-micro/v3/router"
|
"github.com/micro/go-micro/v3/router"
|
||||||
"github.com/micro/go-micro/v3/selector"
|
"github.com/micro/go-micro/v3/selector"
|
||||||
pnet "github.com/micro/go-micro/v3/util/net"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// LookupRoute for a request using the router and then choose one using the selector
|
// LookupRoute for a request using the router and then choose one using the selector
|
||||||
func LookupRoute(req Request, opts CallOptions) (*router.Route, error) {
|
func LookupRoute(req Request, opts CallOptions) (*router.Route, error) {
|
||||||
// check to see if the proxy has been set, if it has we don't need to lookup the routes; net.Proxy
|
// check to see if an address was provided as a call option
|
||||||
// returns a slice of addresses, so we'll use a random one. Eventually we should to use the
|
if len(opts.Address) > 0 {
|
||||||
// selector for this.
|
|
||||||
service, addresses, _ := pnet.Proxy(req.Service(), opts.Address)
|
|
||||||
if len(addresses) > 0 {
|
|
||||||
return &router.Route{
|
return &router.Route{
|
||||||
Service: service,
|
Service: req.Service(),
|
||||||
Address: addresses[rand.Int()%len(addresses)],
|
Address: opts.Address[rand.Int()%len(opts.Address)],
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
@ -78,43 +77,3 @@ func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, e
|
|||||||
// why are we here?
|
// why are we here?
|
||||||
return nil, fmt.Errorf("unable to bind to %s", addr)
|
return nil, fmt.Errorf("unable to bind to %s", addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proxy returns the proxy and the address if it exits
|
|
||||||
func Proxy(service string, address []string) (string, []string, bool) {
|
|
||||||
var hasProxy bool
|
|
||||||
|
|
||||||
// get proxy. we parse out address if present
|
|
||||||
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
|
|
||||||
// default name
|
|
||||||
if prx == "service" {
|
|
||||||
prx = "go.micro.proxy"
|
|
||||||
address = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if its an address
|
|
||||||
if v := strings.Split(prx, ":"); len(v) > 1 {
|
|
||||||
address = []string{prx}
|
|
||||||
}
|
|
||||||
|
|
||||||
service = prx
|
|
||||||
hasProxy = true
|
|
||||||
|
|
||||||
return service, address, hasProxy
|
|
||||||
}
|
|
||||||
|
|
||||||
if prx := os.Getenv("MICRO_NETWORK"); len(prx) > 0 {
|
|
||||||
// default name
|
|
||||||
if prx == "service" {
|
|
||||||
prx = "go.micro.network"
|
|
||||||
}
|
|
||||||
service = prx
|
|
||||||
hasProxy = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if prx := os.Getenv("MICRO_NETWORK_ADDRESS"); len(prx) > 0 {
|
|
||||||
address = []string{prx}
|
|
||||||
hasProxy = true
|
|
||||||
}
|
|
||||||
|
|
||||||
return service, address, hasProxy
|
|
||||||
}
|
|
||||||
|
@ -2,7 +2,6 @@ package net
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
"os"
|
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -25,38 +24,3 @@ func TestListen(t *testing.T) {
|
|||||||
// Expect addr DO NOT has extra ":" at the end!
|
// Expect addr DO NOT has extra ":" at the end!
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestProxyEnv checks whether we have proxy/network settings in env
|
|
||||||
func TestProxyEnv(t *testing.T) {
|
|
||||||
service := "foo"
|
|
||||||
address := []string{"bar"}
|
|
||||||
|
|
||||||
s, a, ok := Proxy(service, address)
|
|
||||||
if ok {
|
|
||||||
t.Fatal("Should not have proxy", s, a, ok)
|
|
||||||
}
|
|
||||||
|
|
||||||
test := func(key, val, expectSrv, expectAddr string) {
|
|
||||||
// set env
|
|
||||||
os.Setenv(key, val)
|
|
||||||
|
|
||||||
s, a, ok := Proxy(service, address)
|
|
||||||
if !ok {
|
|
||||||
t.Fatal("Expected proxy")
|
|
||||||
}
|
|
||||||
if len(expectSrv) > 0 && s != expectSrv {
|
|
||||||
t.Fatal("Expected proxy service", expectSrv, "got", s)
|
|
||||||
}
|
|
||||||
if len(expectAddr) > 0 {
|
|
||||||
if len(a) == 0 || a[0] != expectAddr {
|
|
||||||
t.Fatal("Expected proxy address", expectAddr, "got", a)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
os.Unsetenv(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
test("MICRO_PROXY", "service", "go.micro.proxy", "")
|
|
||||||
test("MICRO_NETWORK", "service", "go.micro.network", "")
|
|
||||||
test("MICRO_NETWORK_ADDRESS", "10.0.0.1:8081", "", "10.0.0.1:8081")
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user