JavaScript API 之Atomics 与 SharedArrayBuffer – MasterH杂货铺

JavaScript API 之Atomics 与 SharedArrayBuffer

多个上下文访问 SharedArrayBuffer 时,如果同时对缓冲区执行操作,就可能出现资源争用问题。 Atomics API 通过强制同一时刻只能对缓冲区执行一个操作,可以让多个上下文安全地读写一个SharedArrayBuffer。

SharedArrayBuffer

此 API 与 ArrayBuffer 有相同的 API,不同点在于 ArrayBuffer 必须在不同执行上下文间切换,而 SharedArrayBuffer 可以被任意多个执行上下文同时使用。
下面例子演示了4个专用工作线程访问同一个 SharedArrayBuffer 导致的资源争用问题。

const workerScript = `
    self.onmessage = ({data}) => {
    const view = new Uint32Array(data);
    // 执行 1 000 000 次加操作
    for (let i = 0; i < 1E6; ++i) {
    // 线程不安全加操作会导致资源争用
    view[0] += 1;
    }
    self.postMessage(null);
    };
`;

const workerScriptBlobUrl = URL.createObjectURL(new Blob([workerScript]));
// 创建容量为 4 的工作线程池
const workers = [];
for (let i = 0; i < 4; ++i) {
    workers.push(new Worker(workerScriptBlobUrl));
}
// 在最后一个工作线程完成后打印出最终值
let responseCount = 0;
for (const worker of workers) {
    worker.onmessage = () => {
        if (++responseCount == workers.length) {
            console.log(`Final buffer value: ${view[0]}`);
        }
    };
}
// 初始化 SharedArrayBuffer
const sharedArrayBuffer = new SharedArrayBuffer(4);
const view = new Uint32Array(sharedArrayBuffer);
view[0] = 1;
// 把 SharedArrayBuffer 发送到每个工作线程
for (const worker of workers) {
    worker.postMessage(sharedArrayBuffer);
}
//(期待结果为 4000001。实际输出可能类似这样:)
// Final buffer value: 2145106

为了解决这个问题,Atomics API 出来了。此 API 可以保证 SharedArrayBuffer 上的 JavaScript 操作是线程安全的。

原子操作基础

算术及位操作方法

Atomics API 提供了一套简单的方法用以执行就地修改操作。这些操作会使读取、修改、回写操作按顺序执行,不会被其他线程中断。
所有算术方法:

// 创建大小为 1 的缓冲区
let sharedArrayBuffer = new SharedArrayBuffer(1);
// 基于缓冲创建 Uint8Array
let typedArray = new Uint8Array(sharedArrayBuffer);
// 所有 ArrayBuffer 全部初始化为 0
console.log(typedArray); // Uint8Array[0]
const index = 0;
const increment = 5;
// 对索引 0 处的值执行原子加 5
Atomics.add(typedArray, index, increment);
console.log(typedArray); // Uint8Array[5]
// 对索引 0 处的值执行原子减 5
Atomics.sub(typedArray, index, increment);
console.log(typedArray); // Uint8Array[0]

所有位方法:

// 创建大小为 1 的缓冲区
let sharedArrayBuffer = new SharedArrayBuffer(1);
// 基于缓冲创建 Uint8Array
let typedArray = new Uint8Array(sharedArrayBuffer);
// 所有 ArrayBuffer 全部初始化为 0
console.log(typedArray); // Uint8Array[0]
const index = 0;
// 对索引 0 处的值执行原子或 0b1111
Atomics.or(typedArray, index, 0b1111);
console.log(typedArray); // Uint8Array[15]
// 对索引 0 处的值执行原子与 0b1111
Atomics.and(typedArray, index, 0b1100);
console.log(typedArray); // Uint8Array[12]
// 对索引 0 处的值执行原子异或 0b1111
Atomics.xor(typedArray, index, 0b1111);
console.log(typedArray); // Uint8Array[3]

前面线程不安全的例子改写为安全的:

const workerScript = `
    self.onmessage = ({data}) => {
        const view = new Uint32Array(data);
        // 执行 1 000 000 次加操作
        for (let i = 0; i < 1E6; ++i) {
            // 线程安全的加操作
            Atomics.add(view, 0, 1);
        }
    self.postMessage(null);
};
`;
const workerScriptBlobUrl = URL.createObjectURL(new Blob([workerScript]));
// 创建容量为 4 的工作线程池
const workers = [];
for (let i = 0; i < 4; ++i) {
    workers.push(new Worker(workerScriptBlobUrl));
}
// 在最后一个工作线程完成后打印出最终值
let responseCount = 0;
for (const worker of workers) {
    worker.onmessage = () => {
        if (++responseCount == workers.length) {
            console.log(`Final buffer value: ${view[0]}`);
    }
};
}
// 初始化 SharedArrayBuffer
const sharedArrayBuffer = new SharedArrayBuffer(4);
const view = new Uint32Array(sharedArrayBuffer);
view[0] = 1;
// 把 SharedArrayBuffer 发送到每个工作线程
for (const worker of workers) {
    worker.postMessage(sharedArrayBuffer);
}
//(期待结果为 4000001)
// Final buffer value: 4000001

原子读和写

Atomics API 的两种主要作用:

  • 所有原子指令相互之间的顺序永远不会重排
  • 使用原子读或原子写保证所有指令(包括原子和非原子指令)都不会相对原子读/写重新排序。这意味着位于原子读/写之前的所有指令会在原子读/写发生前完成,而位于原子读/写之后的所有指令会在原子读/写完成后才会开始
    Atomics.load() 和 Atomics.store() 可以构建“代码围栏”。JavaScript引擎保证非原子指令可以相对于 load() 或 store() 本地重排,但这个重排不会侵犯原子读/写的边界。

    const sharedArrayBuffer = new SharedArrayBuffer(4);
    const view = new Uint32Array(sharedArrayBuffer);
    // 执行非原子写
    view[0] = 1;
    // 非原子写可以保证在这个读操作之前完成,因此这里一定会读到 1
    console.log(Atomics.load(view, 0)); // 1
    // 执行原子写
    Atomics.store(view, 0, 2);
    // 非原子读可以保证在原子写完成后发生,因此这里一定会读到 2
    console.log(view[0]); // 2

原子交换

exchange() 和 compareExchange() 可以保证不间断的先读后写。
exchange() 只执行简单的交换:

const sharedArrayBuffer = new SharedArrayBuffer(4);
const view = new Uint32Array(sharedArrayBuffer);
// 在索引 0 处写入 3
Atomics.store(view, 0, 3);
// 从索引 0 处读取值,然后在索引 0 处写入 4
console.log(Atomics.exchange(view, 0, 4)); // 3
// 从索引 0 处读取值
console.log(Atomics.load(view, 0)); // 4

compareExchange() 只在目标索引处的值与预期值匹配时才会执行写操作:

const sharedArrayBuffer = new SharedArrayBuffer(4);
const view = new Uint32Array(sharedArrayBuffer);
// 在索引 0 处写入 5
Atomics.store(view, 0, 5);
// 从缓冲区读取值
let initial = Atomics.load(view, 0);
// 对这个值执行非原子操作
let result = initial ** 2;
// 只在缓冲区未被修改的情况下才会向缓冲区写入新值
Atomics.compareExchange(view, 0, initial, result);
// 检查写入成功
console.log(Atomics.load(view, 0)); // 25

如果值不匹配, compareExchange()调用则什么也不做:

const sharedArrayBuffer = new SharedArrayBuffer(4);
const view = new Uint32Array(sharedArrayBuffer);
// 在索引 0 处写入 5
Atomics.store(view, 0, 5);
// 从缓冲区读取值
let initial = Atomics.load(view, 0);
// 对这个值执行非原子操作
let result = initial ** 2;
// 只在缓冲区未被修改的情况下才会向缓冲区写入新值
Atomics.compareExchange(view, 0, -1, result);
// 检查写入失败
console.log(Atomics.load(view, 0)); // 5

原子 Futex 操作与加锁

所有原子 Futex 操作只能用于 Int32Array 视图。而且,也只能用在工作线程内部。
与此相关的的 API 有 Atomics.wait() 和 Atomics.notify()
创建 4 个工作线程,用于对长度为 1 的 Int32Array 进行操作。

const workerScript = `
self.onmessage = ({data}) => {
    const view = new Int32Array(data);
    console.log('Waiting to obtain lock');
    // 遇到初始值则停止, 10 000 毫秒超时
    Atomics.wait(view, 0, 0, 1E5);
    console.log('Obtained lock');
    // 在索引 0 处加 1
    Atomics.add(view, 0, 1);
    console.log('Releasing lock');
    // 只允许 1 个工作线程继续执行
    Atomics.notify(view, 0, 1);
    self.postMessage(null);
};
`;
const workerScriptBlobUrl = URL.createObjectURL(new Blob([workerScript]));
const workers = [];
for (let i = 0; i < 4; ++i) {
    workers.push(new Worker(workerScriptBlobUrl));
}
// 在最后一个工作线程完成后打印出最终值
let responseCount = 0;
for (const worker of workers) {
    worker.onmessage = () => {
        if (++responseCount == workers.length) {
            console.log(`Final buffer value: ${view[0]}`);
        }
    };
}
// 初始化 SharedArrayBuffer
const sharedArrayBuffer = new SharedArrayBuffer(8);
const view = new Int32Array(sharedArrayBuffer);
// 把 SharedArrayBuffer 发送到每个工作线程
for (const worker of workers) {
    worker.postMessage(sharedArrayBuffer);
}
// 1000 毫秒后释放第一个锁
setTimeout(() => Atomics.notify(view, 0, 1), 1000);
// Waiting to obtain lock
// Waiting to obtain lock
// Waiting to obtain lock
// Waiting to obtain lock
// Obtained lock
// Releasing lock
// Obtained lock
// Releasing lock
// Obtained lock
// Releasing lock
// Obtained lock
// Releasing lock
// Final buffer value: 4

因为是使用 0 来初始化 SharedArrayBuffer,所以每个工作线程都会到达 Atomics.wait()并停止执行。在停止状态下,执行线程存在于一个等待队列中,在经过指定时间或在相应索引上调用Atomics.notify() 之前,一直保持暂停状态 。1000 毫秒之后, 顶部执行上下文会调用 Atomics.notify()释放其中一个等待的线程。这个线程执行完毕后会再次调用 Atomics.notify() 释放另一个线程。这个过程会持续到所有线程都执行完毕并通过 postMessage()传出最终的值。

Related Posts

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注