Using Atomics.wait(), Atomics.notify(), and Atomics.waitAsync()

The static methods Atomics.wait() and Atomics.notify() are low-level synchronization primitives that can be used to implement mutexes and similar mechanisms. However, because Atomics.wait() is blocking, it cannot be called on the main thread: attempting to do so throws a TypeError.



Since version 8.7, the V8 engine supports a non-blocking variant of Atomics.wait() called Atomics.waitAsync(). This new method can be used on the main thread. In this article we show how to use these low-level APIs to build a mutex that works both synchronously (on worker threads) and asynchronously (on worker threads or on the main thread).











Atomics.wait() and Atomics.waitAsync()



The methods Atomics.wait() and Atomics.waitAsync() take the following parameters:



  • buffer: an Int32Array or BigInt64Array backed by a SharedArrayBuffer.
  • index: a valid index into the array.
  • expectedValue: the value we expect to find at the memory location described by buffer and index.
  • timeout: a timeout in milliseconds (optional, defaults to Infinity).


Atomics.wait() returns a string. If the memory location does not contain the expected value, Atomics.wait() returns immediately with the value 'not-equal'. Otherwise, the thread is blocked. For the thread to become unblocked, one of two things must happen: either another thread calls Atomics.notify() on the same memory location that Atomics.wait() is waiting on, or the timeout expires. In the first case Atomics.wait() returns 'ok', in the second it returns 'timed-out'.
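
As a quick illustration, two of these outcomes can be observed from a single thread. This is a minimal sketch that assumes an environment where Atomics.wait() is permitted on the calling thread (a worker thread or Node.js); the variable names and the 10 ms timeout are arbitrary choices made for the example.

```javascript
// A 16-byte shared buffer viewed as four 32-bit integers, all initially 0.
const i32a = new Int32Array(new SharedArrayBuffer(16));

// Store 1 at index 0 so that an expected value of 0 no longer matches.
Atomics.store(i32a, 0, 1);

// Expected value (0) differs from the stored value (1): returns at once.
const mismatch = Atomics.wait(i32a, 0, 0, 10);

// Expected value (1) matches, so the thread blocks. Nobody calls
// Atomics.notify(), so the call returns once the 10 ms timeout expires.
const expired = Atomics.wait(i32a, 0, 1, 10);

console.log(mismatch); // prints: not-equal
console.log(expired);  // prints: timed-out
```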



The method Atomics.notify() takes the following parameters:



  • typedArray: an Int32Array or BigInt64Array backed by a SharedArrayBuffer.
  • index: a valid index into the array.
  • count: the maximum number of waiting agents to notify (optional, defaults to Infinity).


Atomics.notify() wakes up the specified number of agents that are waiting on the location described by typedArray and index, going through them in FIFO order. If several Atomics.wait() or Atomics.waitAsync() calls are waiting on the same memory location, they all end up in the same queue.
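
Atomics.notify() also returns the number of waiters it woke up, which makes the effect of the count parameter easy to observe. The sketch below registers two waiters on the same location with Atomics.waitAsync() (so nothing blocks and everything runs on one thread) and wakes them in two steps. The variable names are illustrative only.

```javascript
const i32a = new Int32Array(new SharedArrayBuffer(16));

// Nobody is waiting on index 0 yet, so nothing is woken.
const wokenNone = Atomics.notify(i32a, 0);

// Register two asynchronous waiters on the same location. Both end up
// in the same queue, in the order in which they were registered.
const first = Atomics.waitAsync(i32a, 0, 0);
const second = Atomics.waitAsync(i32a, 0, 0);

// count = 1 wakes only the waiter at the head of the queue.
const wokenOne = Atomics.notify(i32a, 0, 1);

// Omitting count wakes every remaining waiter (here: one).
const wokenRest = Atomics.notify(i32a, 0);

console.log(wokenNone, wokenOne, wokenRest); // prints: 0 1 1
```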



Unlike Atomics.wait(), Atomics.waitAsync() always returns immediately. The return value is one of the following:



  • { async: false, value: 'not-equal' } - if the specified memory location does not contain the expected value.
  • { async: false, value: 'timed-out' } - only when the timeout is set to 0.
  • { async: true, value: promise } - in other cases.


The promise may later be resolved with the string 'ok' (if Atomics.notify() was called for the same memory location that was passed to Atomics.waitAsync()) or with the string 'timed-out' (if the timeout expired). The promise is never rejected.



The following example demonstrates the basic usage of Atomics.waitAsync():



const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
//                                     |  |  ^ timeout (optional)
//                                     |  ^ expected value
//                                     ^ index

if (result.value === 'not-equal') {
  // The value in the SharedArrayBuffer was not the expected one.
} else {
  result.value instanceof Promise; // true
  result.value.then(
    (value) => {
      if (value == 'ok') { /* notified */ }
      else { /* value is 'timed-out' */ }
    });
}

// In this thread, or in another thread:
Atomics.notify(i32a, 0);
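
The three possible return shapes can also be checked synchronously, without waiting for any promise to settle. A minimal sketch (the expected value 42 is an arbitrary number that does not match the zero-initialized buffer):

```javascript
const i32a = new Int32Array(new SharedArrayBuffer(16)); // all zeros

// The stored value (0) is not the expected 42: no waiting takes place.
const notEqual = Atomics.waitAsync(i32a, 0, 42);

// The value matches, but a timeout of 0 expires immediately.
const timedOut = Atomics.waitAsync(i32a, 0, 0, 0);

// The value matches and the timeout is non-zero: a promise is returned.
const pending = Atomics.waitAsync(i32a, 0, 0, 1000);

console.log(notEqual.async, notEqual.value); // prints: false not-equal
console.log(timedOut.async, timedOut.value); // prints: false timed-out
console.log(pending.async, pending.value instanceof Promise); // prints: true true

// Wake the pending waiter so that it does not linger until the timeout.
Atomics.notify(i32a, 0);
```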


Now let's look at how to build a mutex that can be used both synchronously and asynchronously. Note that synchronous mutex implementations have been discussed before, for example in this material.



In this example we do not use the timeout parameter when calling Atomics.wait() and Atomics.waitAsync(). That parameter can be used to implement timeout-based logic, such as condition variables with a timeout.
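
To give an idea of what the timeout parameter makes possible, here is a sketch of a lock attempt that gives up after a deadline. The function lockWithTimeout() and its parameters are hypothetical and are not part of the AsyncLock class described below. The sketch assumes it runs where Atomics.wait() is allowed (a worker thread or Node.js).

```javascript
const UNLOCKED = 0;
const LOCKED = 1;

// Hypothetical helper: tries to acquire the lock stored at i32a[index],
// giving up after timeoutMs milliseconds. Returns true on success.
function lockWithTimeout(i32a, index, timeoutMs) {
  const deadline = Date.now() + timeoutMs;
  while (true) {
    const oldValue = Atomics.compareExchange(i32a, index, UNLOCKED, LOCKED);
    if (oldValue === UNLOCKED) {
      return true; // the lock was free and is now ours
    }
    const remaining = deadline - Date.now();
    if (remaining <= 0) {
      return false; // the deadline passed while the lock was still held
    }
    // Sleep until notified or until the remaining time runs out.
    Atomics.wait(i32a, index, LOCKED, remaining);
  }
}

const i32a = new Int32Array(new SharedArrayBuffer(4));
const firstAttempt = lockWithTimeout(i32a, 0, 20);  // the lock is free
const secondAttempt = lockWithTimeout(i32a, 0, 20); // the lock is already held
console.log(firstAttempt, secondAttempt); // prints: true false
```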



Our AsyncLock class, which represents the mutex, operates on a SharedArrayBuffer and implements the following methods:



  • lock(): blocks the thread until the mutex can be acquired (usable only in a worker thread).
  • unlock(): releases the mutex (the counterpart of lock()).
  • executeLocked(callback): acquires the lock without blocking the thread, so it can be used on the main thread. It schedules the callback to run once the lock has been acquired.


Let's look at how each of these methods can be implemented. The class definition includes constants and a constructor that takes the SharedArrayBuffer as a parameter.



class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab;
    this.i32a = new Int32Array(sab);
  }

  lock() {
    /* … */
  }

  unlock() {
    /* … */
  }

  executeLocked(f) {
    /* … */
  }
}


Here i32a[0] contains either the value LOCKED or the value UNLOCKED. It is also the memory location that Atomics.wait() and Atomics.waitAsync() wait on. The AsyncLock class maintains the following invariants:



  1. If i32a[0] == LOCKED and a thread starts waiting on i32a[0] (via Atomics.wait() or Atomics.waitAsync()), it will eventually be notified.
  2. After being notified, the thread tries to acquire the lock. If it succeeds, it will call Atomics.notify() when it releases the lock.


Acquiring and releasing the lock synchronously



Let's look at the code of the lock() method, which can only be called from a worker thread.



lock() {
  while (true) {
    const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                        /* old value >>> */  AsyncLock.UNLOCKED,
                        /* new value >>> */  AsyncLock.LOCKED);
    if (oldValue == AsyncLock.UNLOCKED) {
      return;
    }
    Atomics.wait(this.i32a, AsyncLock.INDEX,
                 AsyncLock.LOCKED); // <<< expected value at start
  }
}


When lock() is called from a thread, it first tries to acquire the lock by using Atomics.compareExchange() to change the lock state from UNLOCKED to LOCKED. Atomics.compareExchange() attempts the state change atomically and returns the value that was originally stored at the memory location. If the original value was UNLOCKED, the state change succeeded and the thread has acquired the lock. Nothing more needs to be done.
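
The return value of Atomics.compareExchange() is what tells success from failure. A small single-threaded sketch (the constants mirror the ones in AsyncLock but are redeclared here so that the snippet stands on its own):

```javascript
const UNLOCKED = 0;
const LOCKED = 1;
const i32a = new Int32Array(new SharedArrayBuffer(4)); // i32a[0] is UNLOCKED

// First attempt: the slot holds UNLOCKED, so it is swapped to LOCKED and
// the previous value (UNLOCKED) is returned, which signals success.
const firstAttempt = Atomics.compareExchange(i32a, 0, UNLOCKED, LOCKED);

// Second attempt: the slot already holds LOCKED, so nothing is written and
// the current value (LOCKED) is returned, which signals failure.
const secondAttempt = Atomics.compareExchange(i32a, 0, UNLOCKED, LOCKED);

console.log(firstAttempt === UNLOCKED);  // prints: true
console.log(secondAttempt === UNLOCKED); // prints: false
```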



If Atomics.compareExchange() fails to change the lock state, another thread must be holding the lock. The thread that called lock() therefore uses Atomics.wait() to wait until the other thread releases the lock. If the memory location still holds the expected value (in our case AsyncLock.LOCKED), the call to Atomics.wait() blocks the thread, and it returns only when another thread calls Atomics.notify().



The unlock() method releases the lock by setting it to the UNLOCKED state and calls Atomics.notify() to wake up one agent that is waiting for the lock. The state change is expected to always succeed, because the thread performing it is holding the lock and nothing else should call unlock() in the meantime.



unlock() {
  const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                      /* old value >>> */  AsyncLock.LOCKED,
                      /* new value >>> */  AsyncLock.UNLOCKED);
  if (oldValue != AsyncLock.LOCKED) {
    throw new Error('Tried to unlock while not holding the mutex');
  }
  Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}


In the typical case, everything goes as follows: the lock is free and thread T1 acquires it by changing the lock state with Atomics.compareExchange(). Thread T2 tries to acquire the lock by calling Atomics.compareExchange(), but fails to change the state. T2 then calls Atomics.wait(), which blocks the thread. At some point T1 releases the lock and calls Atomics.notify(). This makes the Atomics.wait() call in T2 return 'ok', and T2 wakes up. T2 then tries to acquire the lock again, and this time it succeeds.



There are also two special cases. They demonstrate why Atomics.wait() and Atomics.waitAsync() check for a specific value at the given index of the array:



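  • T1 is holding the lock and T2 tries to acquire it. First, T2 tries to change the lock state with Atomics.compareExchange(), but fails. Then T1 releases the lock before T2 manages to call Atomics.wait(). When T2 calls Atomics.wait(), it returns immediately with the value 'not-equal'. T2 then continues with the next loop iteration and tries to acquire the lock again.
  • T1 is holding the lock and T2 is waiting for it with Atomics.wait(). T1 releases the lock, T2 wakes up (the Atomics.wait() call returns) and calls Atomics.compareExchange() to acquire the lock. However, another thread, T3, was faster and has already acquired the lock. The Atomics.compareExchange() call in T2 therefore fails. T2 calls Atomics.wait() again and blocks until T3 releases the lock.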
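
The first of these cases can be replayed step by step on a single thread, which shows why the expected-value check prevents a missed wake-up. This is only a sketch: the "threads" T1 and T2 are simulated by consecutive statements, and it assumes an environment where Atomics.wait() may be called (a worker thread or Node.js).

```javascript
const UNLOCKED = 0;
const LOCKED = 1;
const i32a = new Int32Array(new SharedArrayBuffer(4));

// T1 acquires the free lock.
const t1Acquire = Atomics.compareExchange(i32a, 0, UNLOCKED, LOCKED);

// T2 tries to acquire it and fails, because the state is already LOCKED.
const t2FirstTry = Atomics.compareExchange(i32a, 0, UNLOCKED, LOCKED);

// T1 releases the lock before T2 has started waiting.
Atomics.store(i32a, 0, UNLOCKED);
Atomics.notify(i32a, 0, 1); // nobody is waiting yet, so nobody is woken

// T2 now calls Atomics.wait(). The state is no longer LOCKED, so the call
// returns at once instead of sleeping through a notification it missed.
const t2Wait = Atomics.wait(i32a, 0, LOCKED);

// T2 goes around the loop again and this time gets the lock.
const t2SecondTry = Atomics.compareExchange(i32a, 0, UNLOCKED, LOCKED);

console.log(t2Wait);                   // prints: not-equal
console.log(t2SecondTry === UNLOCKED); // prints: true
```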


The last special case shows that our mutex is not fair: T2 may have been waiting for the lock, yet T3 managed to acquire it immediately after it was released. A lock implementation better suited for real-world use may use several lock states to distinguish between "locked" and "locked with contention".
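
As a rough idea of what several lock states could look like, here is a sketch of a three-state lock in the style commonly used for futex-based mutexes. It is not the implementation from this article, and all names in it (CONTENDED, the free-standing lock() and unlock() functions) are assumptions made for illustration. Note that the extra state does not make the lock fair; it only lets unlock() skip Atomics.notify() when no thread can be waiting.

```javascript
const UNLOCKED = 0;  // nobody holds the lock
const LOCKED = 1;    // held, no known waiters
const CONTENDED = 2; // held, and at least one thread may be waiting

// Blocking acquire; only for threads that may call Atomics.wait().
function lock(i32a, index) {
  let state = Atomics.compareExchange(i32a, index, UNLOCKED, LOCKED);
  if (state === UNLOCKED) {
    return; // fast path: the lock was free
  }
  do {
    // Mark the lock as contended (unless it already is) and sleep
    // for as long as it stays in that state.
    if (state === CONTENDED ||
        Atomics.compareExchange(i32a, index, LOCKED, CONTENDED) !== UNLOCKED) {
      Atomics.wait(i32a, index, CONTENDED);
    }
    // Retry. Acquire as CONTENDED, because other waiters may remain.
    state = Atomics.compareExchange(i32a, index, UNLOCKED, CONTENDED);
  } while (state !== UNLOCKED);
}

function unlock(i32a, index) {
  // Atomics.sub() returns the value stored before the subtraction.
  if (Atomics.sub(i32a, index, 1) !== LOCKED) {
    // The state was CONTENDED: reset it and wake one waiter.
    Atomics.store(i32a, index, UNLOCKED);
    Atomics.notify(i32a, index, 1);
  }
}

// Uncontended use on a single thread: no waiting and no notification.
const i32a = new Int32Array(new SharedArrayBuffer(4));
lock(i32a, 0);
const stateWhileHeld = Atomics.load(i32a, 0);
unlock(i32a, 0);
const stateAfterRelease = Atomics.load(i32a, 0);
console.log(stateWhileHeld, stateAfterRelease); // prints: 1 0
```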



Acquiring the lock asynchronously



Unlike lock(), the non-blocking executeLocked() method can be called from the main thread. It takes a callback function as its only parameter and schedules the callback to run once the lock has been successfully acquired.



executeLocked(f) {
  const self = this;

  async function tryGetLock() {
    while (true) {
      const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
                          /* old value >>> */  AsyncLock.UNLOCKED,
                          /* new value >>> */  AsyncLock.LOCKED);
      if (oldValue == AsyncLock.UNLOCKED) {
        f();
        self.unlock();
        return;
      }
      const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
                                       AsyncLock.LOCKED);
                                   //  ^ expected value at start
      await result.value;
    }
  }

  tryGetLock();
}


The inner function tryGetLock() first tries to acquire the lock with Atomics.compareExchange(). If the lock state is changed successfully, the function calls the callback, releases the lock, and returns.



If Atomics.compareExchange() fails to acquire the lock, we need to try again at a moment when the lock is likely to be free. We cannot block the thread and wait for the lock to be released. Instead, we schedule a new attempt using Atomics.waitAsync() and the promise it returns.



If the Atomics.waitAsync() call started waiting successfully, the promise it returned is resolved when the thread holding the lock calls Atomics.notify(). After that, the thread that was waiting for the lock tries to acquire it again, as before.



The same special cases as in the synchronous version are possible here: the lock may be released between the Atomics.compareExchange() and Atomics.waitAsync() calls, and the lock may be acquired by another thread between the moment the promise resolves and the Atomics.compareExchange() call. Code intended for real projects must handle both robustly.
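
To see executeLocked() in action, the sketch below repeats the relevant parts of the class (lock() is omitted for brevity) and exercises both paths on one thread. The contended case is simulated by writing LOCKED into the buffer directly, which stands in for another thread holding the lock; the events array is purely illustrative.

```javascript
class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.i32a = new Int32Array(sab);
  }

  unlock() {
    const oldValue = Atomics.compareExchange(
      this.i32a, AsyncLock.INDEX, AsyncLock.LOCKED, AsyncLock.UNLOCKED);
    if (oldValue != AsyncLock.LOCKED) {
      throw new Error('Tried to unlock while not holding the mutex');
    }
    Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
  }

  executeLocked(f) {
    const self = this;
    async function tryGetLock() {
      while (true) {
        const oldValue = Atomics.compareExchange(
          self.i32a, AsyncLock.INDEX, AsyncLock.UNLOCKED, AsyncLock.LOCKED);
        if (oldValue == AsyncLock.UNLOCKED) {
          f();
          self.unlock();
          return;
        }
        const result = Atomics.waitAsync(
          self.i32a, AsyncLock.INDEX, AsyncLock.LOCKED);
        await result.value;
      }
    }
    tryGetLock();
  }
}

const mutex = new AsyncLock(new SharedArrayBuffer(4));
const events = [];

// Uncontended: the callback runs before executeLocked() returns.
mutex.executeLocked(() => events.push('first'));
const afterFirstCall = events.slice();

// Simulate another thread holding the lock, then request it again.
Atomics.store(mutex.i32a, AsyncLock.INDEX, AsyncLock.LOCKED);
mutex.executeLocked(() => events.push('second'));
const afterSecondCall = events.slice(); // 'second' has not run yet

// The simulated holder releases the lock. The deferred callback is
// expected to run later, once the waitAsync promise has been resolved.
mutex.unlock();
```

Because an async function runs synchronously up to its first await, the uncontended callback has already run by the time executeLocked() returns, while the contended one is deferred until after the lock has been released.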



Summary



In this article we looked at the low-level synchronization primitives Atomics.wait(), Atomics.waitAsync(), and Atomics.notify(). We walked through an example of building a mutex on top of them that can be used both on the main thread and in worker threads.



Will Atomics.wait(), Atomics.waitAsync(), and Atomics.notify() be useful in your projects?


