From a1342c23fbd100db482b8592e1b97cf2286454d8 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 14 Oct 2019 15:17:25 +0100 Subject: [PATCH] add mutex lock implementation --- sync/lock/lock.go | 5 ++ sync/lock/mutex/mutex.go | 142 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 sync/lock/mutex/mutex.go diff --git a/sync/lock/lock.go b/sync/lock/lock.go index 8be6629f..c21a9125 100644 --- a/sync/lock/lock.go +++ b/sync/lock/lock.go @@ -2,9 +2,14 @@ package lock import ( + "errors" "time" ) +var ( + ErrLockTimeout = errors.New("lock timeout") +) + // Lock is a distributed locking interface type Lock interface { // Acquire a lock with given id diff --git a/sync/lock/mutex/mutex.go b/sync/lock/mutex/mutex.go new file mode 100644 index 00000000..81059ab3 --- /dev/null +++ b/sync/lock/mutex/mutex.go @@ -0,0 +1,142 @@ +// Package mutex provides a sync.Mutex implementation of the lock for local use +package mutex + +import ( + "sync" + "time" + + lock "github.com/micro/go-micro/sync/lock" +) + +type mutexLock struct { + sync.RWMutex + locks map[string]*mlock +} + +type mlock struct { + id string + time time.Time + ttl time.Duration + release chan bool +} + +func (m *mutexLock) Acquire(id string, opts ...lock.AcquireOption) error { + // lock our access + m.Lock() + + var options lock.AcquireOptions + for _, o := range opts { + o(&options) + } + + lk, ok := m.locks[id] + if !ok { + m.locks[id] = &mlock{ + id: id, + time: time.Now(), + ttl: options.TTL, + release: make(chan bool), + } + // unlock + m.Unlock() + return nil + } + + m.Unlock() + + // set wait time + var wait <-chan time.Time + var ttl <-chan time.Time + + // decide if we should wait + if options.Wait > time.Duration(0) { + wait = time.After(options.Wait) + } + + // check the ttl of the lock + if lk.ttl > time.Duration(0) { + // time lived for the lock + live := time.Since(lk.time) + + // set a timer for the leftover ttl + if live > lk.ttl { + // release the lock if it expired + m.Release(id) + } else { + ttl = time.After(live) + } + } + +lockLoop: + for { + // wait for the lock to be released + select { + case <-lk.release: + m.Lock() + + // someone locked before us + lk, ok = m.locks[id] + if ok { + m.Unlock() + continue + } + + // got chance to lock + m.locks[id] = &mlock{ + id: id, + time: time.Now(), + ttl: options.TTL, + release: make(chan bool), + } + + m.Unlock() + + break lockLoop + case <-ttl: + // ttl exceeded + m.Release(id) + // TODO: check the ttl again above + ttl = nil + // try acquire + continue + case <-wait: + return lock.ErrLockTimeout + } + } + + return nil +} + +func (m *mutexLock) Release(id string) error { + m.Lock() + defer m.Unlock() + + lk, ok := m.locks[id] + // no lock exists + if !ok { + return nil + } + + // delete the lock + delete(m.locks, id) + + select { + case <-lk.release: + return nil + default: + close(lk.release) + } + + return nil +} + +func NewLock(opts ...lock.Option) lock.Lock { + var options lock.Options + for _, o := range opts { + o(&options) + } + + return &mutexLock{ + locks: make(map[string]*mlock), + } +}