Add 'Channel.all'.
This commit is contained in:
parent
fb598b0270
commit
fd503a978c
3 changed files with 33 additions and 1 deletions
|
@ -4,6 +4,7 @@
|
||||||
- [close() -> (async)](#close-async)
|
- [close() -> (async)](#close-async)
|
||||||
- [readOnly() -> Channel](#readonly-channel)
|
- [readOnly() -> Channel](#readonly-channel)
|
||||||
- [writeOnly() -> Channel](#writeonly-channel)
|
- [writeOnly() -> Channel](#writeonly-channel)
|
||||||
|
- [Channel.all(channels) -> Channel](#channelallchannels-channel)
|
||||||
- [Channel.select(promises) -> (async) channel](#channelselectpromises-async-channel)
|
- [Channel.select(promises) -> (async) channel](#channelselectpromises-async-channel)
|
||||||
- [Examples](#examples)
|
- [Examples](#examples)
|
||||||
- [value()](#value)
|
- [value()](#value)
|
||||||
|
@ -44,7 +45,7 @@ The following properties don't have equivalents in `Array`.
|
||||||
Close the channel so that no more values can be pushed to it. Return a promise
|
Close the channel so that no more values can be pushed to it. Return a promise
|
||||||
that resolves when any remaining pushes in flight complete.
|
that resolves when any remaining pushes in flight complete.
|
||||||
|
|
||||||
Attempting to push to a closed channel will throw an exception and shifting from
|
Attempting to push to a closed channel will throw an exception. Shifting from
|
||||||
a closed channel will immediately return `undefined`.
|
a closed channel will immediately return `undefined`.
|
||||||
|
|
||||||
## readOnly() -> Channel
|
## readOnly() -> Channel
|
||||||
|
@ -55,6 +56,11 @@ Return a version of the channel that provides only read methods.
|
||||||
|
|
||||||
Return a version of the channel that provides only write methods.
|
Return a version of the channel that provides only write methods.
|
||||||
|
|
||||||
|
## Channel.all(channels) -> Channel
|
||||||
|
|
||||||
|
Take an array of channels and wait for the next value in each one before pushing
|
||||||
|
an array of the values to a newly created channel. Similar to `Promise.all`.
|
||||||
|
|
||||||
## Channel.select(promises) -> (async) channel
|
## Channel.select(promises) -> (async) channel
|
||||||
|
|
||||||
Wait for the first channel method promise to succeed and then cancel the rest.
|
Wait for the first channel method promise to succeed and then cancel the rest.
|
||||||
|
|
20
lib/index.js
20
lib/index.js
|
@ -364,6 +364,26 @@ const Channel = function(length = 0) {
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Channel.all = channels => {
|
||||||
|
const output = Channel();
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
for (;;) {
|
||||||
|
const values = await Promise.all(channels.map(Channel.shift));
|
||||||
|
|
||||||
|
if (values.every(value => value === undefined)) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
await output.push(values);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await output.close();
|
||||||
|
})();
|
||||||
|
|
||||||
|
return output;
|
||||||
|
};
|
||||||
|
|
||||||
// Node.js stream.readable
|
// Node.js stream.readable
|
||||||
const fromNodeStream = (channel, stream) => {
|
const fromNodeStream = (channel, stream) => {
|
||||||
stream.on(`readable`, async () => {
|
stream.on(`readable`, async () => {
|
||||||
|
|
|
@ -44,6 +44,12 @@ describe(`Channel function`, function() {
|
||||||
assert.deepEqual(await channel.values(), [0, 1, 2]);
|
assert.deepEqual(await channel.values(), [0, 1, 2]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it(`all`, async function() {
|
||||||
|
const a = Channel.of(0, 1);
|
||||||
|
const b = Channel.of(2, 3);
|
||||||
|
assert.deepEqual(await Channel.all([a, b]).values(), [[0, 2], [1, 3]]);
|
||||||
|
});
|
||||||
|
|
||||||
describe(`from`, function() {
|
describe(`from`, function() {
|
||||||
describe(`types`, function() {
|
describe(`types`, function() {
|
||||||
it(`callback`, async function() {
|
it(`callback`, async function() {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue