Switch up the selector so it actually allows you to inform it how the node performed
This commit is contained in:
		| @@ -18,9 +18,11 @@ import ( | ||||
| type rpcClient struct { | ||||
| 	once sync.Once | ||||
| 	opts options | ||||
| 	sel  NodeSelector | ||||
| } | ||||
|  | ||||
| func newRpcClient(opt ...Option) Client { | ||||
| 	var sel NodeSelector | ||||
| 	var once sync.Once | ||||
|  | ||||
| 	opts := options{ | ||||
| @@ -39,17 +41,24 @@ func newRpcClient(opt ...Option) Client { | ||||
| 		opts.transport = transport.DefaultTransport | ||||
| 	} | ||||
|  | ||||
| 	if opts.registry == nil { | ||||
| 		opts.registry = registry.DefaultRegistry | ||||
| 	} | ||||
|  | ||||
| 	if opts.broker == nil { | ||||
| 		opts.broker = broker.DefaultBroker | ||||
| 	} | ||||
|  | ||||
| 	if opts.selector == nil { | ||||
| 		opts.selector = nodeSelector | ||||
| 	if opts.selector != nil { | ||||
| 		sel = opts.selector(opts.registry) | ||||
| 	} else { | ||||
| 		sel = &nodeSelector{opts.registry} | ||||
| 	} | ||||
|  | ||||
| 	rc := &rpcClient{ | ||||
| 		once: once, | ||||
| 		opts: opts, | ||||
| 		sel:  sel, | ||||
| 	} | ||||
|  | ||||
| 	c := Client(rc) | ||||
| @@ -145,12 +154,7 @@ func (r *rpcClient) CallRemote(ctx context.Context, address string, request Requ | ||||
|  | ||||
| // TODO: Call(..., opts *Options) error { | ||||
| func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}) error { | ||||
| 	service, err := registry.GetService(request.Service()) | ||||
| 	if err != nil { | ||||
| 		return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	node, err := r.opts.selector(service) | ||||
| 	node, err := r.sel.Retrieve(ctx, request) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -160,7 +164,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac | ||||
| 		address = fmt.Sprintf("%s:%d", address, node.Port) | ||||
| 	} | ||||
|  | ||||
| 	return r.call(ctx, address, request, response) | ||||
| 	err = r.call(ctx, address, request, response) | ||||
| 	r.sel.Response(node, err) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}) (Streamer, error) { | ||||
| @@ -168,12 +174,7 @@ func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Re | ||||
| } | ||||
|  | ||||
| func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan interface{}) (Streamer, error) { | ||||
| 	service, err := registry.GetService(request.Service()) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	node, err := r.opts.selector(service) | ||||
| 	node, err := r.sel.Retrieve(ctx, request) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -183,7 +184,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in | ||||
| 		address = fmt.Sprintf("%s:%d", address, node.Port) | ||||
| 	} | ||||
|  | ||||
| 	return r.stream(ctx, address, request, responseChan) | ||||
| 	stream, err := r.stream(ctx, address, request, responseChan) | ||||
| 	r.sel.Response(node, err) | ||||
| 	return stream, err | ||||
| } | ||||
|  | ||||
| func (r *rpcClient) Publish(ctx context.Context, p Publication) error { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user