from: Accept a callback function.

This commit is contained in:
David Braun 2017-10-21 15:31:59 -04:00
parent 8c26f23ead
commit 6b577ea6cd
No known key found for this signature in database
GPG key ID: 5694EEC4D129BDCF
3 changed files with 38 additions and 14 deletions

View file

@ -12,7 +12,7 @@
- [Channel([bufferLength = 0]) -> Channel](#channelbufferlength--0---channel) - [Channel([bufferLength = 0]) -> Channel](#channelbufferlength--0---channel)
- [Channel.isChannel(value) -> Boolean](#channelischannelvalue---boolean) - [Channel.isChannel(value) -> Boolean](#channelischannelvalue---boolean)
- [Channel.of(...values) -> Channel](#channelofvalues---channel) - [Channel.of(...values) -> Channel](#channelofvalues---channel)
- [Channel.from(iterable | stream.Readable) -> Channel](#channelfromiterable--streamreadable---channel) - [Channel.from(callback | iterable | stream.Readable) -> Channel](#channelfromcallback--iterable--streamreadable---channel)
- [Channel Object](#channel-object) - [Channel Object](#channel-object)
- [every(callbackfn[, thisArg]) -> async Boolean](#everycallbackfn-thisarg---async-boolean) - [every(callbackfn[, thisArg]) -> async Boolean](#everycallbackfn-thisarg---async-boolean)
- [filter(callbackfn[, thisArg]) -> Channel](#filtercallbackfn-thisarg---channel) - [filter(callbackfn[, thisArg]) -> Channel](#filtercallbackfn-thisarg---channel)
@ -164,10 +164,14 @@ Return `true` if `value` is a channel, `false` otherwise.
Push `values` into a new channel and then close it. Push `values` into a new channel and then close it.
### Channel.from(iterable | stream.Readable) -> Channel ### Channel.from(callback | iterable | stream.Readable) -> Channel
Create a new `Channel` from an iterable or a [Node.js readable Create a new `Channel` from a callback function, an iterable, or a [Node.js
stream](https://nodejs.org/api/stream.html#stream_readable_streams). readable stream](https://nodejs.org/api/stream.html#stream_readable_streams).
If given a callback function, call the function repeatedly to obtain values for
pushing into the channel. Close the channel when the function returns
`undefined`.
## Channel Object ## Channel Object

View file

@ -308,30 +308,44 @@ const Channel = function(length = 0) {
Channel.from = values => { Channel.from = values => {
const channel = Channel(); const channel = Channel();
(async () => { (async () => {
// iterable
try { try {
// iterable
for (let value of values) { for (let value of values) {
await channel.push(value); await channel.push(value);
} }
await channel.close(); await channel.close();
} catch (exception) { } catch (exception) {
// Assume it's a Node.js stream.readable. // callback function
try {
for (;;) {
const value = values();
values.on("readable", async () => { if (value === undefined) {
while (true) { await channel.close();
const data = values.read();
if (data === null) {
break; break;
} else { } else {
await channel.push(data); await channel.push(value);
} }
} }
}); } catch (exception) {
// Node.js stream.readable
values.on("readable", async () => {
while (true) {
const data = values.read();
values.once("end", channel.close); if (data === null) {
break;
} else {
await channel.push(data);
}
}
});
values.once("end", channel.close);
}
} }
})(); })();

View file

@ -45,6 +45,12 @@ describe(`Channel`, function() {
}); });
describe(`from`, function() { describe(`from`, function() {
it(`callback`, async function() {
let counter = 0;
const callback = () => (counter < 3 ? counter++ : undefined);
assert.deepEqual(await Channel.from(callback).values(), [0, 1, 2]);
});
it(`iterable`, async function() { it(`iterable`, async function() {
assert.deepEqual(await Channel.from([0, 1, 2]).values(), [0, 1, 2]); assert.deepEqual(await Channel.from([0, 1, 2]).values(), [0, 1, 2]);
}); });