From fd503a978c435c27d782adf94b5f20b93b979f76 Mon Sep 17 00:00:00 2001 From: David <> Date: Tue, 2 Feb 2021 14:58:39 -0600 Subject: [PATCH] Add 'Channel.all'. --- doc/API.md | 8 +++++++- lib/index.js | 20 ++++++++++++++++++++ test/index.js | 6 ++++++ 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/doc/API.md b/doc/API.md index 8b0bfc5..baf90e4 100644 --- a/doc/API.md +++ b/doc/API.md @@ -4,6 +4,7 @@ - [close() -> (async)](#close-async) - [readOnly() -> Channel](#readonly-channel) - [writeOnly() -> Channel](#writeonly-channel) + - [Channel.all(channels) -> Channel](#channelallchannels-channel) - [Channel.select(promises) -> (async) channel](#channelselectpromises-async-channel) - [Examples](#examples) - [value()](#value) @@ -44,7 +45,7 @@ The following properties don't have equivalents in `Array`. Close the channel so that no more values can be pushed to it. Return a promise that resolves when any remaining pushes in flight complete. -Attempting to push to a closed channel will throw an exception and shifting from +Attempting to push to a closed channel will throw an exception. Shifting from a closed channel will immediately return `undefined`. ## readOnly() -> Channel @@ -55,6 +56,11 @@ Return a version of the channel that provides only read methods. Return a version of the channel that provides only write methods. +## Channel.all(channels) -> Channel + +Take an array of channels and wait for the next value in each one before pushing +an array of the values to a newly created channel. Similar to `Promise.all`. + ## Channel.select(promises) -> (async) channel Wait for the first channel method promise to succeed and then cancel the rest. diff --git a/lib/index.js b/lib/index.js index f2629c2..f33180d 100644 --- a/lib/index.js +++ b/lib/index.js @@ -364,6 +364,26 @@ const Channel = function(length = 0) { ); }; +Channel.all = channels => { + const output = Channel(); + + (async () => { + for (;;) { + const values = await Promise.all(channels.map(Channel.shift)); + + if (values.every(value => value === undefined)) { + break; + } else { + await output.push(values); + } + } + + await output.close(); + })(); + + return output; +}; + // Node.js stream.readable const fromNodeStream = (channel, stream) => { stream.on(`readable`, async () => { diff --git a/test/index.js b/test/index.js index 4813f4d..0aac8f0 100644 --- a/test/index.js +++ b/test/index.js @@ -44,6 +44,12 @@ describe(`Channel function`, function() { assert.deepEqual(await channel.values(), [0, 1, 2]); }); + it(`all`, async function() { + const a = Channel.of(0, 1); + const b = Channel.of(2, 3); + assert.deepEqual(await Channel.all([a, b]).values(), [[0, 2], [1, 3]]); + }); + describe(`from`, function() { describe(`types`, function() { it(`callback`, async function() {