Merge pull request #838 from micro/etcd

Use etcd serializable option
This commit is contained in:
Asim Aslam 2019-10-10 19:22:10 +01:00 committed by GitHub
commit 98e1f2c2d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -8,6 +8,7 @@ import (
"errors" "errors"
"net" "net"
"path" "path"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -20,7 +21,7 @@ import (
) )
var ( var (
prefix = "/micro/registry" prefix = "/micro/registry/"
) )
type etcdRegistry struct { type etcdRegistry struct {
@ -148,7 +149,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
defer cancel() defer cancel()
// look for the existing key // look for the existing key
rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id)) rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable())
if err != nil { if err != nil {
return err return err
} }
@ -310,7 +311,7 @@ func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel() defer cancel()
rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -344,6 +345,7 @@ func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) {
for _, service := range serviceMap { for _, service := range serviceMap {
services = append(services, service) services = append(services, service)
} }
return services, nil return services, nil
} }
@ -354,7 +356,7 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel() defer cancel()
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -381,6 +383,9 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) {
services = append(services, service) services = append(services, service)
} }
// sort the services
sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
return services, nil return services, nil
} }