ES2017 新特性:共享内存 和 Atomics

10年服务1亿Tian开发工程师

本文为 《JavaScript 新书:探索 ES2016 与 ES2017》的内容章节,你可以点击链接查看完整目录。

ECMAScript 2017 特性 “”,由Lars T. Hansen设计。它引入了一个新的构造函数 SharedArrayBuffer 和 具有辅助函数的命名空间对象 Atomics 。本章介绍一些细节。

并行(Parallelism) vs. 并发(Concurrency)

在我们开始之前,让我们澄清两个相似但截然不同的术语:并行(Parallelism) 和 并发(Concurrency) 。他们存在许多定义,我使用的定义如下:

  • 并行(Parallelism) (parallel 并行 vs. serial 串行):同时执行多个任务;
  • 并发(Concurrency) (concurrent 并发 vs. sequential 连续):在重叠的时间段内(而不是一个接一个)执行几个任务。

两者密切相关,但不一样:

  • 并行(Parallelism) 中没有 并发(Concurrency) :单个指令,多数据(SIMD)。多次计算并行发生,但是在任何给定的时刻都只执行一个任务(指令)。
  • 并发(Concurrency) 中没有 并行(Parallelism):单核 CPU 上通过时间分配进行多任务处理。

然而,准确地使用这些术语是很难的,这就是为什么交换这些概念通常不是问题。

并行模式

两种并行模式是:

  • 数据并行:同一段代码并行执行多次。这些实例操作同一数据集的不同元素。例如: MapReduce 是一种数据并行本赛季模型。
  • 任务并行:并行执行不同的代码段。例如:web workers 和 Unix model of spawning processes。

JS并行的历史

  • JavaScript 在单线程中执行。某些任务可以异步执行:浏览器通常会在单线程中运行这些任务,然后通过回调将结果重新加入到单线程中。
  • Web workers 将任务并行引入了 JavaScript :这些是相对重量级的进程。每个 workers 都有自己的全局环境。默认情况下,不共享任何内容。 workers 之间的通信(或在 workers 和主线程之间的通信)发展:
  • 起初,你只能发送和接收字符串。
  • 然后,引入结构化克隆:可以发送和接收数据副本。(JSON 数据,TypedArray,正则表达式,Blob对象,ImageData对象等)。它甚至可以正确处理对象之间的循环引用。但是,不能克隆 error 对象,function 对象和 DOM 节点。
  • 可在 workers 之间的转移数据:当接收方获得数据时,发送方失去访问权限。
  • 通过 使用 GPU 计算(它倾向于数据并行处理) :。
  • 输入:您的数据,转换为图像(逐个像素)。
  • 处理:OpenGL像素着色器可以对 GPU 执行任意计算。 您的像素着色器会转换输入图像。
  • 输出:再次可以转换为你的数据类型的图像。
  • SIMD(低级数据并行):通过 支持。它允许你在多个整数或浮点数上执行操作(例如添加加法和平方根)。
  • PJS(代号 River Trail ):这个最终被遗弃的项目的计划是将高级数据并行(通过纯函数来考虑 map-reduce )引入到 JavaScript 。然而,开发人员和引擎实现人员没有足够的兴趣。如果没有实现,就无法对这个API进行实验,因为它不能被 polyfill 。在 2015-01-05,Lars T. Hansen 将从 Firefox 中删除这个实验性的实现。

下一步:SharedArrayBuffer

下一步是什么?对于低级并行,方向很清楚:尽可能地支持 SIMD 和 GPU 。然而,对于高级并行,不太明朗,特别是PJS失败后。

我们需要的是一种尝试多种方法的方式,以找出如何最好地将高级并行引入到JavaScript。遵循可扩展 web 声明的原则,提案 “共享内存 和 Atomics” (即 共享数组缓冲区) 通过提供低级原语来做到这一点,可用于实现更高级的构造。

共享数组缓冲区(Shared Array Buffers)

共享阵列缓冲区是更高并发抽象的基本构建块。它们允许您在多个 workers 和主线程之间共享 SharedArrayBuffer 对象的字节(该缓冲区是共享的,用于访问字节,将其封装在一个 TypedArray 中)这种共享有两个好处:

  • 你可以更快地在 workers 之间共享数据。
  • workers 之间的协调变得更简单和更快(与 postMessage() 相比)。

创建和发送一个共享数组缓冲区(Shared Array Buffers)

// main.js

const worker = new Worker('worker.js');

// To be shared
const sharedBuffer = new SharedArrayBuffer( // (A)
    10 * Int32Array.BYTES_PER_ELEMENT); // 10 elements

// Share sharedBuffer with the worker
worker.postMessage({sharedBuffer}); // clone

// Local only
const sharedArray = new Int32Array(sharedBuffer); // (B)

创建一个共享数组缓冲区(Shared Array Buffers)的方法与创建普通的数组缓冲区(Array Buffer)类似:通过调用构造函数,并以字节的形式指定缓冲区的大小(行A)。你与 workers 共享的是 缓冲区(buffer) 。对于你自己的本地使用,你通常将共享数组缓冲区封装在 TypedArray 中(行B)。

警告:克隆 共享数组缓冲区(Shared Array Buffers)是共享它的正确方式,但是有些引擎仍然会实现了较旧版本的API,并要求您进行传输:

worker.postMessage({sharedBuffer}, [sharedBuffer]); // transfer (deprecated)

在API的最终版本中,传输共享数组缓冲区(Shared Array Buffers)意味着你将无法访问它。

接受一个共享数组缓冲区(Shared Array Buffers)

workers的实现如下所列。

// worker.js

self.addEventListener('message', function (event) {
    const {sharedBuffer} = event.data;
    const sharedArray = new Int32Array(sharedBuffer); // (A)

    // ···
});

我们首先提取发送给我们的共享数组缓冲区(Shared Array Buffers),然后将它封装在一个 TypedArray 中(行A),这样我们就可以在本地使用它了。

Atomics: 安全访问共享数据

问题:优化使代码在 workers 之间无法预测

在单线程中,编译器可以进行优化,以破坏多线程代码。

以下列代码为例:

while (sharedArray[0] === 123) ;

在单线程中,当循环运行时,sharedArray[0] 的值不会改变(如果 sharedArray 是一个数组或 TypedArray ,它不会以某种方式打补丁)。因此,代码可以优化如下:

const tmp = sharedArray[0];
while (tmp === 123) ;

然而,在多线程环境中,这种优化可以防止我们使用此模式来等待另一个线程所做的更改。

另一个例子是以下代码:

// main.js
sharedArray[1] = 11;
sharedArray[2] = 22;

在单线程中,您可以重新排列这些写入操作,因为在中间没有读到任何内容。 对于多线程,当你期望以特定顺序执行写入操作时,就会遇到麻烦:

// worker.js
while (sharedArray[2] !== 22) ;
console.log(sharedArray[1]); // 0 or 11

这些优化使实际上不可能在同一个共享数组缓冲区(Shared Array Buffers)上同步多个 workers 的操作。

解决方案:Atomics

该提案提供了全局对象 Atomics ,该对象有三个主要用例。

用例:同步

Atomics 方法可以用来与其他 workers 进行同步。例如,以下两个操作可以让你读取和写入数据,并且不会被编译器重新排列:

  • Atomics.load(ta : TypedArray, index) : T
  • Atomics.store(ta : TypedArray, index, value : T) : T

这个想法是使用常规操作读取和写入大多数数据,而 Atomics 操作(loadstore 和其他操作)可确保读取和写入安全。通常,您将使用自定义同步机制,例如锁,其实现基于Atomics

这是一个非常简单的例子,它总是有效的,这要感谢 Atomics (我省略了 sharedArray 的设置):

// main.js
console.log('notifying...');
Atomics.store(sharedArray, 0, 123);

// worker.js
while (Atomics.load(sharedArray, 0) !== 123) ;
console.log('notified');

用例:等待通知

使用 while 循环等待通知的效率不是很高,这就是 Atomics 有帮助的原因:

  • Atomics.wait(ta: Int32Array, index, value, timeout)
    ta[index] 上等待通知,但只有当 ta[index]value 时。
  • Atomics.wake(ta : Int32Array, index, count)
    唤醒正在 ta[index] 上等待的 workers 。

用例:Atomics 操作

用 Atomics 操作执行算术运算,这样做不不会被中断,有助于同步。例如

  • Atomics.add(ta : TypedArray, index, value) : T

大致来说,此操作执行:

ta[index] += value;

问题:破坏值

共享内存的另一个有问题是破坏值(garbage,垃圾):读取时,您可能会看到一个中间值 – 既不是新值之前写入内存的值,也不是新值。

规范中的 “Tear-Free Reads” 表示,当且仅当以下情况下不会破坏:

  • 读写都通过 TypeArray(而不是数据视图)进行。
  • TypeArray 和 共享数组缓冲区(Shared Array Buffers) 都是对称的:
    sharedArray.byteOffset % sharedArray.BYTES_PER_ELEMENT === 0
  • TypeArray 每个元素的字节数相同。

换句话说,每当通过以下方式访问相同的 共享数组缓冲区(Shared Array Buffers)时,破坏值是一个问题:

  • 一个或多个数据视图
  • 一个或多个未对称的 TypeArray
  • 具有不同元素大小的 TypeArray

为避免这些情况下的破坏值,请使用 Atomics 或同步。

共享数组缓冲区(Shared Array Buffers)的使用

共享数组缓冲区(Shared Array Buffers)和 JavaScript 的运行到完成语义

JavaScript具有所谓的 run-to-completion semantics(运行到完成语义):每个函数都可以依赖于不被另一个线程中断,直到它完成。函数变成事务,并且可以执行完整的算法,而不需要任何人查看它们在中间状态下的数据。

共享数组缓冲区(Shared Array Buffers)中断运行到完成(RTC):函数正在运行时的数据可以在函数运行时被另一个线程更改。然而,代码完全控制了这种违反RTC的行为是否发生:如果它不使用共享数组缓冲区,那么它是安全的。

这与 async 异步函数违反 RTC 的方式有点类似。在这里,您可以通过关键字 await 进入一个阻塞操作。

共享数组缓冲区和 asm.js 和 WebAssembly

共享数组缓冲区使 emscripten 可以将 pthreads 编译成 asm.js 。 引用 emscripten 文档页面:

共享数组缓冲区使emscripten能够将pthreads编译成asm.js。:

[共享数组缓冲区允许] Emscripten 应用程序共享 web worker 之间的主内存堆。这与低级 atomics 和 futex 支持的原语一起使 Emscripten 能够实现对 Pthreads(POSIX线程)API的支持。

也就是说,您可以将多线程 C 和 C++ 代码编译为 asm.js 。

讨论如何最好地将多线程引入 WebAssembly 的 。鉴于 web worker 相对重量级,WebAssembly 可能会引入轻量级线程。您还可以在 上看到线程。

共享除整数之外的数据

目前,只能共享整数的数组(最多32位)。这意味着共享其他类型数据的唯一方式是将其编码为整数。有用的工具包括:

  • :前者将字符串转换为 Uint8Array 的实例。后者则相反。
  • :将字符串作为字符数组处理的库。使用 Array Buffers(数组缓冲区)。
  • :通过在平面内存(ArrayBuffer和SharedArrayBuffer) 中存储复杂数据结构( structs 、类 和 数组 )的方式来增强JavaScript 。JavaScript + FlatJS 被编译成普通的 JavaScript 。支持 JavaScript dialects( TypeScript 等)。
  • :是一种用于快速并行本赛季的 JavaScript dialects 。 它编译为 asm.js 和 WebAssembly 。

最终,可能会有更多的更高级别的共享数据机制。实验将继续找出这些机制应该是什么样的。

使用共享数组缓冲区的代码到底快多少?

Lars T. Hansen 编写了 Mandelbrot 算法的两个实现(如他的文章“ ”中所述,您可以在线尝试):使用串行版本和并行版本的多个 web worker。对于最多 4 个 web worker(以及处理器核数),加速几乎线性提高,从每秒 6.9 帧(1个web worker)到每秒25.4帧(4个 web worker)。更多 web worker 带来更多的性能改进但更适度。

Hansen 指出,加速令人印象深刻,但并行是代码更复杂的代价。

示例

我们来看一个更全面的例子。代码在 GitHub 的 仓库中获得 。你可以。

使用共享锁

在主线程中,我们设置了共享内存,使其编码一个封闭的锁,并将其发送给一个web worker员(A行)。 用户点击后,我们打开锁(B行)。

// main.js

// Set up the shared memory
const sharedBuffer = new SharedArrayBuffer(
    1 * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sharedBuffer);

// Set up the lock
Lock.initialize(sharedArray, 0);
const lock = new Lock(sharedArray, 0);
lock.lock(); // writes to sharedBuffer

worker.postMessage({sharedBuffer}); // (A)

document.getElementById('unlock').addEventListener(
    'click', event => {
        event.preventDefault();
        lock.unlock(); // (B)
    });

在 worker 中,我们设置了一个本地版本的锁(其状态通过共享数组缓冲区与主线程共享)。在 B 行中,我们等待解锁。在 A 和 C 行中,我们发送文本到主线程,它显示在我们的页面上(如何做到这一点没有在前面的代码片段中显示)。也就是说,我们在这两行中使用 self.postMessage() 非常类似于 console.log()

// worker.js

self.addEventListener('message', function (event) {
    const {sharedBuffer} = event.data;
    const lock = new Lock(new Int32Array(sharedBuffer), 0);

    self.postMessage('Waiting for lock...'); // (A)
    lock.lock(); // (B) blocks!
    self.postMessage('Unlocked'); // (C)
});

值得注意的是,在 B 行中等待解锁是完全停止的。这是真正的阻塞,直到目前为止JavaScript还没有(近似于 async 异步函数中的 await)。

实现共享锁

接下来,我们将看看 Lars T. Hansen ES6-ifie 版本的 Lock 实现 ,基于 SharedArrayBuffer 。

在本节中,我们(以及其他地方)需要以下 Atomics 函数:

  • Atomics.compareExchange(ta : TypedArray, index, expectedValue, replacementValue) : T
    如果 ta[index] 的当前的元素值为 expectedValue ,那么用 replacementValue 替换它。返回 index 索引值的前一个(或未更改)元素。

如果索引的当前元素是预期值,则用替换值替换它。返回索引中的前一个(或未更改的)元素。

实现从几个常量和构造函数开始:

const UNLOCKED = 0;
const LOCKED_NO_WAITERS = 1;
const LOCKED_POSSIBLE_WAITERS = 2;

// Number of shared Int32 locations needed by the lock.
const NUMINTS = 1;

class Lock {

    /**
     * @param iab an Int32Array wrapping a SharedArrayBuffer
     * @param ibase an index inside iab, leaving enough room for NUMINTS
     */
    constructor(iab, ibase) {
        // OMITTED: check parameters
        this.iab = iab;
        this.ibase = ibase;
    }

构造函数主要将其参数存储在实例属性中。

上锁方法如下。

/**
 * Acquire the lock, or block until we can. Locking is not recursive:
 * you must not hold the lock when calling this.
 */
lock() {
    const iab = this.iab;
    const stateIdx = this.ibase;
    var c;
    if ((c = Atomics.compareExchange(iab, stateIdx, // (A)
    UNLOCKED, LOCKED_NO_WAITERS)) !== UNLOCKED) {
        do {
            if (c === LOCKED_POSSIBLE_WAITERS // (B)
            || Atomics.compareExchange(iab, stateIdx,
            LOCKED_NO_WAITERS, LOCKED_POSSIBLE_WAITERS) !== UNLOCKED) {
                Atomics.wait(iab, stateIdx, // (C)
                    LOCKED_POSSIBLE_WAITERS, Number.POSITIVE_INFINITY);
            }
        } while ((c = Atomics.compareExchange(iab, stateIdx,
        UNLOCKED, LOCKED_POSSIBLE_WAITERS)) !== UNLOCKED);
    }
}

在A行中,如果 lock(锁) 当前值为 UNLOCKED ,则将锁定更改为 LOCKED_NO_WAITERS 。如果 lock(锁) 已经被上锁,我们只会进入当时的块(在这种情况下,compareExchange()没有改变任何东西)。

在B行中(在 do-while 循环内),我们检查 lock(锁) 是否被 waiters(等待者)上锁。鉴于我们即将等待,如果当前值为 LOCKED_NO_WAITERS ,那么 compareExchange() 也会切换到LOCKED_POSSIBLE_WAITERS

在C行中,如果 lock(锁) 的值为 LOCKED_POSSIBLE_WAITERS 我们将等待。
最后一个参数 Number.POSITIVE_INFINITY,意味着等待永不超时。

唤醒后,如果我们没有解锁,我们会继续循环。 如果 lock(锁) 为 UNLOCKED ,则 compareExchange() 也会切换到 LOCKED_POSSIBLE_WAITERS 。我们使用 LOCKED_POSSIBLE_WAITERS 而不是 LOCKED_NO_WAITERS ,因为我们需要在 unlock() 之后恢复这个值,所以暂时把它设置为 UNLOCKED ,并唤醒我们。

解锁方法如下。

    /**
     * Unlock a lock that is held.  Anyone can unlock a lock that
     * is held; nobody can unlock a lock that is not held.
     */
    unlock() {
        const iab = this.iab;
        const stateIdx = this.ibase;
        var v0 = Atomics.sub(iab, stateIdx, 1); // A

        // Wake up a waiter if there are any
        if (v0 !== LOCKED_NO_WAITERS) {
            Atomics.store(iab, stateIdx, UNLOCKED);
            Atomics.wake(iab, stateIdx, 1);
        }
    }

    // ···
}

在行A中,v0 得到 iab[stateIdx] 在从中减去1之前的值。减法表示我们(例如)从 LOCKED_NO_WAITERSUNLOCKED,从LOCKED_POSSIBLE_WAITERSLOCKED

如果该值以前是 LOCKED_NO_WAITERS ,那么它现在是 UNLOCKED ,一切都很好(没有一个被唤醒)。

否则,该值为 LOCKED_POSSIBLE_WAITERSUNLOCKED 。在前一种情况下,我们现在已经解锁了,必须唤醒某一个(谁通常会再次上锁)。在后一种情况下,我们必须回复由减法创建的非法值,而 wake() 只是简单的不做任何事情。

示例结论

这给出了一个大致的基于 SharedArrayBuffer 的 lock(锁) 的工作原理。请记住,多线程代码是非常难写的,因为任何时候事情都可能发生变化。比如:lock.js 是基于一篇记录 Linux 内核 futex 论文实现的。该论文的标题是 “”(PDF) 。

如果要更深入地使用Shared Array Buffers进行并行本赛季,请查看 以及。

共享内存 和 Atomics 的 API

SharedArrayBuffer

构造函数:

  • new SharedArrayBuffer(length)
    创建一个 length 字节的 buffer(缓冲区)。

静态属性:

  • get SharedArrayBuffer[Symbol.species]
    默认情况下返回 this。 覆盖以控制 slice() 的返回。

实例属性:

  • get SharedArrayBuffer.prototype.byteLength()
    返回 buffer(缓冲区) 的字节长度。
  • SharedArrayBuffer.prototype.slice(start, end)
    创建一个新的 this.constructor[Symbol.species] 实例,并用字节填充从(包括)开始到(不包括)结束的索引。

Atomics

Atomic 函数的主要操作数必须是 Int8ArrayUint8ArrayInt16ArrayUint16ArrayInt32ArrayUint32Array 的一个实例。它必须包裹一个 SharedArrayBuffer

所有函数都以 atomically 方式进行操作。存储操作的顺序是固定的并且不能由编译器或 CPU 重新排序。

加载和存储

  • Atomics.load(ta : TypedArray<T>, index) : T
    读取和返回 ta[index] 上的元素,返回数组指定位置上的值。
  • Atomics.store(ta : TypedArray<T>, index, value : T) : T
    ta[index] 上写入 value,并且返回 value
  • Atomics.exchange(ta : TypedArray<T>, index, value : T) : T
    ta[index] 上的元素设置为 value ,并且返回索引 index 原先的值。
  • Atomics.compareExchange(ta : TypedArray<T>, index, expectedValue, replacementValue) : T
    如果 ta[index] 上的当前元素为 expectedValue , 那么使用 replacementValue 替换。并且返回索引 index 原先(或者未改变)的值。

简单修改 TypeArray 元素

以下每个函数都会在给定索引处更改 TypeArray 元素:它将一个操作符应用于元素和参数,并将结果写回元素。它返回元素的原始值。

  • Atomics.add(ta : TypedArray<T>, index, value) : T
    执行 ta[index] += value 并返回 ta[index] 的原始值。
  • Atomics.sub(ta : TypedArray<T>, index, value) : T
    执行 ta[index] -= value 并返回 ta[index] 的原始值。
  • Atomics.and(ta : TypedArray<T>, index, value) : T
    执行 ta[index] &amp;= value 并返回 ta[index] 的原始值。
  • Atomics.or(ta : TypedArray<T>, index, value) : T
    执行 ta[index] |= value 并返回 ta[index] 的原始值。
  • Atomics.xor(ta : TypedArray<T>, index, value) : T
    执行 ta[index] ^= value 并返回 ta[index] 的原始值。

等待和唤醒

等待和唤醒要求参数 ta 必须是 Int32Array 的一个实例。

  • Atomics.wait(ta: Int32Array, index, value, timeout=Number.POSITIVE_INFINITY) : ('not-equal' | 'ok' | 'timed-out')
    如果 ta[index] 的当前值不是 value ,则返回 'not-equal'。否则继续等待,直到我们通过 Atomics.wake() 唤醒或直到等待超时。 在前一种情况下,返回 'ok'。在后一种情况下,返回'timed-out'timeout 以毫秒为单位。记住此函数执行的操作:“如果 ta[index]value,那么继续等待” 。
  • Atomics.wake(ta : Int32Array, index, count)
    唤醒等待在 ta[index] 上的 count workers。

其他

  • Atomics.isLockFree(size)
    这个函数允许您询问 JavaScript 引擎,如果使用给定 size (字节)的操作数可以在不锁定的情况下进行操作。这可以告诉算法是否需要依赖内置的原语( compareExchange() 等)或使用它们自己的锁。Atomics.isLockFree(4) 总是返回 true,因为这是当前所有相关的支持。

常见问题

什么浏览器支持共享数组缓冲区?

目前,我知道的几个:

  • Firefox (50.1.0+): 进入 about:config 并且设置 javascript.options.shared_memorytrue
  • Safari Technology Preview (Release 21+): 默认启用。
  • Chrome Canary (58.0+): 有两种方法开启:
  • 通过 chrome://flags/ (“Experimental enabled SharedArrayBuffer support in JavaScript”)
  • --js-flags=--harmony-sharedarraybuffer --enable-blink-feature=SharedArrayBuffer

进一步阅读

有关共享数组缓冲区和支持技术的更多信息:

  • “” by Lars T. Hansen
  • “” by Lars T. Hansen [一个很好的关于共享数组缓冲区的介绍]
  • “” (PDF), slides by Shu-yu Guo and Lars T. Hansen (2016-11-30) [slides accompanying the ES proposal]
  • “” by Eric Bidelman [web workers 介绍]

与并行相关的其他 JavaScript 技术:

  • “” by Dave Herman [放弃 PJS 后的 JavaScript]
  • “” by Steve Sanderson [令人着迷的演讲,介绍如何让 WebGL 在 GPU 上做计算]

并行背景:

  • “” by Rob Pike [Pike 使用的 并发和并行 术语,与本章略有不同,提供了一个有趣的补充观点]

致谢:我非常感谢 Lars T. Hansen 审查本章并回答了我一些关于 SharedArrayBuffer 的问题。

原文链接:

赞(0) 打赏
未经允许不得转载:WEBTian开发 » ES2017 新特性:共享内存 和 Atomics

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Tian开发相关广告投放 更专业 更精准

联系我们

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏