Add network proxying (#1556)

* Add network proxying

* go fmt
This commit is contained in:
Asim Aslam 2020-04-21 15:54:40 +01:00 committed by GitHub
parent 05d2b34e10
commit d7ecb58f6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 55 deletions

View File

@ -6,7 +6,6 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
"os"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -20,6 +19,7 @@ import (
"github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/metadata"
"github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/util/config" "github.com/micro/go-micro/v2/util/config"
pnet "github.com/micro/go-micro/v2/util/net"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@ -74,27 +74,13 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
} }
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
service := request.Service() service, address, _ := pnet.Proxy(request.Service(), opts.Address)
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
// default name
if prx == "service" {
prx = "go.micro.proxy"
}
service = prx
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = []string{prx}
}
// return remote address // return remote address
if len(opts.Address) > 0 { if len(address) > 0 {
return func() (*registry.Node, error) { return func() (*registry.Node, error) {
return &registry.Node{ return &registry.Node{
Address: opts.Address[0], Address: address[0],
}, nil }, nil
}, nil }, nil
} }

View File

@ -3,7 +3,6 @@ package client
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"sync/atomic" "sync/atomic"
"time" "time"
@ -17,6 +16,7 @@ import (
"github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/transport" "github.com/micro/go-micro/v2/transport"
"github.com/micro/go-micro/v2/util/buf" "github.com/micro/go-micro/v2/util/buf"
"github.com/micro/go-micro/v2/util/net"
"github.com/micro/go-micro/v2/util/pool" "github.com/micro/go-micro/v2/util/pool"
) )
@ -322,46 +322,18 @@ func (r *rpcClient) Options() Options {
return r.opts return r.opts
} }
// hasProxy checks if we have proxy set in the environment
func (r *rpcClient) hasProxy() bool {
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
return true
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
return true
}
return false
}
// next returns an iterator for the next nodes to call // next returns an iterator for the next nodes to call
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
service := request.Service() // try get the proxy
service, address, _ := net.Proxy(request.Service(), opts.Address)
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
// default name
if prx == "service" {
prx = "go.micro.proxy"
}
service = prx
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = []string{prx}
}
// return remote address // return remote address
if len(opts.Address) > 0 { if len(address) > 0 {
nodes := make([]*registry.Node, len(opts.Address)) nodes := make([]*registry.Node, len(address))
for i, address := range opts.Address { for i, addr := range address {
nodes[i] = &registry.Node{ nodes[i] = &registry.Node{
Address: address, Address: addr,
// Set the protocol // Set the protocol
Metadata: map[string]string{ Metadata: map[string]string{
"protocol": "mucp", "protocol": "mucp",
@ -461,7 +433,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
retries := callOpts.Retries retries := callOpts.Retries
// disable retries when using a proxy // disable retries when using a proxy
if r.hasProxy() { if _, _, ok := net.Proxy(request.Service(), callOpts.Address); ok {
retries = 0 retries = 0
} }
@ -552,7 +524,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
retries := callOpts.Retries retries := callOpts.Retries
// disable retries when using a proxy // disable retries when using a proxy
if r.hasProxy() { if _, _, ok := net.Proxy(request.Service(), callOpts.Address); ok {
retries = 0 retries = 0
} }

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"os"
"strconv" "strconv"
"strings" "strings"
) )
@ -77,3 +78,40 @@ func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, e
// why are we here? // why are we here?
return nil, fmt.Errorf("unable to bind to %s", addr) return nil, fmt.Errorf("unable to bind to %s", addr)
} }
// Proxy returns the proxy and the address if it exits
func Proxy(service string, address []string) (string, []string, bool) {
var hasProxy bool
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
// default name
if prx == "service" {
prx = "go.micro.proxy"
}
service = prx
hasProxy = true
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
address = []string{prx}
hasProxy = true
}
if prx := os.Getenv("MICRO_NETWORK"); len(prx) > 0 {
// default name
if prx == "service" {
prx = "go.micro.network"
}
service = prx
hasProxy = true
}
if prx := os.Getenv("MICRO_NEWORK_ADDRESS"); len(prx) > 0 {
address = []string{prx}
hasProxy = true
}
return service, address, hasProxy
}