Add selector code
This commit is contained in:
		| @@ -18,11 +18,9 @@ import ( | ||||
| type rpcClient struct { | ||||
| 	once sync.Once | ||||
| 	opts options | ||||
| 	sel  NodeSelector | ||||
| } | ||||
|  | ||||
| func newRpcClient(opt ...Option) Client { | ||||
| 	var sel NodeSelector | ||||
| 	var once sync.Once | ||||
|  | ||||
| 	opts := options{ | ||||
| @@ -37,28 +35,27 @@ func newRpcClient(opt ...Option) Client { | ||||
| 		opts.contentType = defaultContentType | ||||
| 	} | ||||
|  | ||||
| 	if opts.transport == nil { | ||||
| 		opts.transport = transport.DefaultTransport | ||||
| 	if opts.broker == nil { | ||||
| 		opts.broker = broker.DefaultBroker | ||||
| 	} | ||||
|  | ||||
| 	if opts.registry == nil { | ||||
| 		opts.registry = registry.DefaultRegistry | ||||
| 	} | ||||
|  | ||||
| 	if opts.broker == nil { | ||||
| 		opts.broker = broker.DefaultBroker | ||||
| 	if opts.selector == nil { | ||||
| 		opts.selector = registry.NewRandomSelector( | ||||
| 			registry.SelectorRegistry(opts.registry), | ||||
| 		) | ||||
| 	} | ||||
|  | ||||
| 	if opts.selector != nil { | ||||
| 		sel = opts.selector(opts.registry) | ||||
| 	} else { | ||||
| 		sel = &nodeSelector{opts.registry} | ||||
| 	if opts.transport == nil { | ||||
| 		opts.transport = transport.DefaultTransport | ||||
| 	} | ||||
|  | ||||
| 	rc := &rpcClient{ | ||||
| 		once: once, | ||||
| 		opts: opts, | ||||
| 		sel:  sel, | ||||
| 	} | ||||
|  | ||||
| 	c := Client(rc) | ||||
| @@ -153,7 +150,17 @@ func (r *rpcClient) CallRemote(ctx context.Context, address string, request Requ | ||||
| } | ||||
|  | ||||
| func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { | ||||
| 	node, err := r.sel.Select(ctx, request) | ||||
| 	var copts callOptions | ||||
| 	for _, opt := range opts { | ||||
| 		opt(&copts) | ||||
| 	} | ||||
|  | ||||
| 	next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	node, err := next() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -164,7 +171,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac | ||||
| 	} | ||||
|  | ||||
| 	err = r.call(ctx, address, request, response) | ||||
| 	r.sel.Response(node, err) | ||||
| 	r.opts.selector.Mark(request.Service(), node, err) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| @@ -173,7 +180,17 @@ func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Re | ||||
| } | ||||
|  | ||||
| func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { | ||||
| 	node, err := r.sel.Select(ctx, request) | ||||
| 	var copts callOptions | ||||
| 	for _, opt := range opts { | ||||
| 		opt(&copts) | ||||
| 	} | ||||
|  | ||||
| 	next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	node, err := next() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -184,7 +201,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in | ||||
| 	} | ||||
|  | ||||
| 	stream, err := r.stream(ctx, address, request, responseChan) | ||||
| 	r.sel.Response(node, err) | ||||
| 	r.opts.selector.Mark(request.Service(), node, err) | ||||
| 	return stream, err | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user