Hello!
In this article I would like to show how you can build your own RWMutex that is able to give up waiting for the lock, either on a timeout or when a context is cancelled. That is, we will implement TryLock(context.Context) and RTryLock(context.Context) on our own mutex.
The picture shows how to pour water into a very narrow neck.
First, a caveat: for 99% of tasks such methods are not needed at all. They are needed when a locked resource may not be released for a very long time. If a resource does stay locked for a long time, the first thing to try is reworking the logic so that the lock is held as briefly as possible.
For more information, see Example 2 in "Dancing with Mutexes in Go".
But if one goroutine nevertheless has to hold a resource for a long time, it seems to me that it will be hard to do without TryLock.
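The implementation relies on the atomic package for the lock state; waiting goroutines are handled with a channel guarded by a regular mutex.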
Mutex:
// RWTMutex - Read Write and Try Mutex
type RWTMutex struct {
	state int32
	mx    sync.Mutex
	ch    chan struct{}
}
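state - the state of the mutex; it is manipulated through atomic.AddInt32, atomic.LoadInt32 and atomic.CompareAndSwapInt32.
ch - the channel that waiting goroutines block on; closing it wakes them up.
mx - a regular mutex that guards creating and closing the channel.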
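Judging by the methods in this article, state appears to encode three situations: 0 for a free mutex, -1 for a write lock, and a positive number for the count of active readers. The helper below, describeState, is a hypothetical name introduced only to illustrate that reading; it is not part of the mutex.

```go
package main

import "fmt"

// describeState interprets the state counter the way the article's methods
// appear to use it. Hypothetical helper, for illustration only.
func describeState(state int32) string {
	switch {
	case state == 0:
		return "unlocked"
	case state == -1:
		return "write-locked"
	case state > 0:
		return fmt.Sprintf("read-locked by %d reader(s)", state)
	default:
		return "invalid"
	}
}

func main() {
	for _, s := range []int32{0, -1, 3} {
		fmt.Printf("%2d: %s\n", s, describeState(s))
	}
}
```

Keeping everything in a single int32 is what allows the fast paths to be one compare-and-swap each.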
Acquiring the lock:
// TryLock - tries to lock the mutex for writing, giving up when ctx is done
func (m *RWTMutex) TryLock(ctx context.Context) bool {
	// Fast path: the mutex is free
	if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
		return true
	}

	// Slow path
	return m.lockST(ctx)
}

// RTryLock - tries to lock the mutex for reading, giving up when ctx is done
func (m *RWTMutex) RTryLock(ctx context.Context) bool {
	// Fast path: no writer holds the mutex
	k := atomic.LoadInt32(&m.state)
	if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
		return true
	}

	// Slow path
	return m.rlockST(ctx)
}
As you can see, if the mutex is free it is simply locked on the fast path; otherwise we move on to a more complex scheme.
First we obtain the channel and enter an infinite loop. If the lock is acquired, we return success. If not, we wait for one of two events: either the channel is closed, or ctx.Done() fires:
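// chGet - returns the current wait channel, creating it if necessary
func (m *RWTMutex) chGet() chan struct{} {
	m.mx.Lock()
	if m.ch == nil {
		m.ch = make(chan struct{}, 1)
	}
	r := m.ch
	m.mx.Unlock()
	return r
}

// lockST - slow path of TryLock
func (m *RWTMutex) lockST(ctx context.Context) bool {
	ch := m.chGet()
	for {
		if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
			return true
		}

		// A nil context means "do not wait"
		if ctx == nil {
			return false
		}

		select {
		case <-ch:
			// The channel was closed by an unlock: take the current one and retry
			ch = m.chGet()
		case <-ctx.Done():
			return false
		}
	}
}

// rlockST - slow path of RTryLock
func (m *RWTMutex) rlockST(ctx context.Context) bool {
	ch := m.chGet()
	var k int32
	for {
		k = atomic.LoadInt32(&m.state)
		if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
			return true
		}

		// A nil context means "do not wait"
		if ctx == nil {
			return false
		}

		select {
		case <-ch:
			// The channel was closed by an unlock: take the current one and retry
			ch = m.chGet()
		case <-ctx.Done():
			return false
		}
	}
}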
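The waiting step in lockST and rlockST boils down to a select over two channels. Here is a minimal sketch of that step in isolation, assuming a hypothetical helper waitReleased that is not part of the mutex. The two demo functions make sure only one of the channels is ready, so the result does not depend on timing.

```go
package main

import (
	"context"
	"fmt"
)

// waitReleased (hypothetical helper) reports true if ch became readable,
// for example because it was closed, and false if ctx was done first.
func waitReleased(ctx context.Context, ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	case <-ctx.Done():
		return false
	}
}

// closedFirst waits on an already closed channel with a context that never ends.
func closedFirst() bool {
	ch := make(chan struct{})
	close(ch)
	return waitReleased(context.Background(), ch)
}

// cancelledFirst waits on a channel that is never closed with a cancelled context.
func cancelledFirst() bool {
	ch := make(chan struct{})
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return waitReleased(ctx, ch)
}

func main() {
	fmt.Println(closedFirst(), cancelledFirst()) // true false
}
```

When both channels are ready at the same moment, select picks one of them at random, which is why the mutex retries the compare-and-swap after waking up instead of assuming it now owns the lock.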
Now let's unlock the mutex.
We need to change the state and, if necessary, close the channel.
As noted above, once the channel is closed, case <-ch fires and the waiting goroutines continue.
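// chClose - closes the current wait channel, if any, waking all waiters
func (m *RWTMutex) chClose() {
	// Fast path: nobody has created a wait channel.
	// Note that this read of m.ch is done without holding mx.
	if m.ch == nil {
		return
	}

	var o chan struct{}
	m.mx.Lock()
	if m.ch != nil {
		o = m.ch
		m.ch = nil
	}
	m.mx.Unlock()

	if o != nil {
		close(o)
	}
}

// Unlock - unlocks a write lock
func (m *RWTMutex) Unlock() {
	if atomic.CompareAndSwapInt32(&m.state, -1, 0) {
		m.chClose()
		return
	}
	panic("RWTMutex: Unlock fail")
}

// RUnlock - unlocks a read lock
func (m *RWTMutex) RUnlock() {
	i := atomic.AddInt32(&m.state, -1)
	if i > 0 {
		// Other readers still hold the lock
		return
	}
	if i == 0 {
		// The last reader is gone: wake the waiters
		m.chClose()
		return
	}
	panic("RWTMutex: RUnlock fail")
}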
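Unlock and RUnlock rely on the fact that closing a channel releases every goroutine receiving from it, not just one. A small sketch of that behaviour on its own; wakeAll is a hypothetical name used only here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// wakeAll starts n goroutines that receive from the same channel, closes the
// channel once, and returns how many goroutines got past the receive.
func wakeAll(n int) int32 {
	ch := make(chan struct{})
	var woken int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ch // a receive on a closed channel returns immediately
			atomic.AddInt32(&woken, 1)
		}()
	}
	close(ch) // a single close releases every receiver
	wg.Wait()
	return atomic.LoadInt32(&woken)
}

func main() {
	fmt.Println(wakeAll(5)) // 5
}
```

A closed channel cannot be reused, which is presumably why chClose sets m.ch to nil and chGet creates a fresh channel for the next round of waiters.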
The mutex itself is ready. What remains is to write a couple of tests and the standard methods such as Lock() and RLock() for it.
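One possible way to add the standard methods is to call the Try variants with a context that never ends. The sketch below is an assumption, not the author's code: the mutex is condensed so that the example compiles on its own, the helper acquire and the function demo are hypothetical names, and chClose always takes mx instead of using the lock-free early return.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// RWTMutex is condensed from the article so that this sketch compiles alone.
type RWTMutex struct {
	state int32 // 0 free, -1 writer, >0 number of readers
	mx    sync.Mutex
	ch    chan struct{}
}

func (m *RWTMutex) chGet() chan struct{} {
	m.mx.Lock()
	defer m.mx.Unlock()
	if m.ch == nil {
		m.ch = make(chan struct{})
	}
	return m.ch
}

func (m *RWTMutex) chClose() {
	m.mx.Lock()
	o := m.ch
	m.ch = nil
	m.mx.Unlock()
	if o != nil {
		close(o)
	}
}

// acquire (hypothetical helper) retries try until it succeeds or ctx is done.
func (m *RWTMutex) acquire(ctx context.Context, try func() bool) bool {
	for {
		ch := m.chGet() // fetched before trying, so a later unlock closes it
		if try() {
			return true
		}
		select {
		case <-ch: // closed by Unlock or RUnlock: retry
		case <-ctx.Done():
			return false
		}
	}
}

func (m *RWTMutex) tryW() bool { return atomic.CompareAndSwapInt32(&m.state, 0, -1) }
func (m *RWTMutex) tryR() bool {
	k := atomic.LoadInt32(&m.state)
	return k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1)
}

func (m *RWTMutex) TryLock(ctx context.Context) bool  { return m.acquire(ctx, m.tryW) }
func (m *RWTMutex) RTryLock(ctx context.Context) bool { return m.acquire(ctx, m.tryR) }

// Lock and RLock wait indefinitely: context.Background() is never done.
func (m *RWTMutex) Lock()  { m.TryLock(context.Background()) }
func (m *RWTMutex) RLock() { m.RTryLock(context.Background()) }

func (m *RWTMutex) Unlock() {
	if !atomic.CompareAndSwapInt32(&m.state, -1, 0) {
		panic("RWTMutex: Unlock fail")
	}
	m.chClose()
}

func (m *RWTMutex) RUnlock() {
	if i := atomic.AddInt32(&m.state, -1); i == 0 {
		m.chClose()
	} else if i < 0 {
		panic("RWTMutex: RUnlock fail")
	}
}

// demo (hypothetical) exercises the wrappers from a single goroutine.
func demo() (writerTimedOut, readersShared bool) {
	var m RWTMutex
	m.RLock()
	m.RLock() // a second reader is admitted immediately
	readersShared = atomic.LoadInt32(&m.state) == 2
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	writerTimedOut = !m.TryLock(ctx) // the readers stay, so the writer gives up
	m.RUnlock()
	m.RUnlock()
	return
}

func main() {
	fmt.Println(demo()) // true true
}
```

With Lock, RLock, Unlock and RUnlock in place the type can stand in wherever a plain read-write lock is expected, while callers that can afford to give up use TryLock or RTryLock with a deadline.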
Benchmarks on my machine showed the following results:
BenchmarkRWTMutexTryLockUnlock-8 92154297 12.8 ns/op 0 B/op 0 allocs/op
BenchmarkRWTMutexTryRLockRUnlock-8 64337136 18.4 ns/op 0 B/op 0 allocs/op
RWMutex
BenchmarkRWMutexLockUnlock-8 44187962 25.8 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexRLockRUnlock-8 94655520 12.6 ns/op 0 B/op 0 allocs/op
Mutex
BenchmarkMutexLockUnlock-8 94345815 12.7 ns/op 0 B/op 0 allocs/op
That is, the performance is comparable to the standard sync.RWMutex and sync.Mutex.
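The benchmark code itself is not shown in the article. A sketch of how such figures might be measured, assuming a plain single-goroutine loop over a lock and unlock pair; benchMutex and benchRWMutexRead are hypothetical names, and only the standard library types are measured here.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// benchMutex measures an uncontended Lock/Unlock pair on sync.Mutex.
func benchMutex() testing.BenchmarkResult {
	return testing.Benchmark(func(b *testing.B) {
		var mu sync.Mutex
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			mu.Lock()
			mu.Unlock()
		}
	})
}

// benchRWMutexRead measures an uncontended RLock/RUnlock pair on sync.RWMutex.
func benchRWMutexRead() testing.BenchmarkResult {
	return testing.Benchmark(func(b *testing.B) {
		var mu sync.RWMutex
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			mu.RLock()
			mu.RUnlock()
		}
	})
}

func main() {
	fmt.Println("Mutex Lock/Unlock:     ", benchMutex())
	fmt.Println("RWMutex RLock/RUnlock: ", benchRWMutexRead())
}
```

The same loop around TryLock(ctx) and Unlock() would give the corresponding RWTMutex figure. A loop like this only exercises the fast path; behaviour under contention would need a separate benchmark, for example one based on b.RunParallel.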