diff --git a/doc/API.md b/doc/API.md index 4b3a2d5..24458fd 100644 --- a/doc/API.md +++ b/doc/API.md @@ -12,7 +12,7 @@ - [Channel([bufferLength = 0]) -> Channel](#channelbufferlength--0---channel) - [Channel.isChannel(value) -> Boolean](#channelischannelvalue---boolean) - [Channel.of(...values) -> Channel](#channelofvalues---channel) - - [Channel.from(iterable | stream.Readable) -> Channel](#channelfromiterable--streamreadable---channel) + - [Channel.from(callback | iterable | stream.Readable) -> Channel](#channelfromcallback--iterable--streamreadable---channel) - [Channel Object](#channel-object) - [every(callbackfn[, thisArg]) -> async Boolean](#everycallbackfn-thisarg---async-boolean) - [filter(callbackfn[, thisArg]) -> Channel](#filtercallbackfn-thisarg---channel) @@ -164,10 +164,14 @@ Return `true` if `value` is a channel, `false` otherwise. Push `values` into a new channel and then close it. -### Channel.from(iterable | stream.Readable) -> Channel +### Channel.from(callback | iterable | stream.Readable) -> Channel -Create a new `Channel` from an iterable or a [Node.js readable -stream](https://nodejs.org/api/stream.html#stream_readable_streams). +Create a new `Channel` from a callback function, an iterable, or a [Node.js +readable stream](https://nodejs.org/api/stream.html#stream_readable_streams). + +If given a callback function, call the function repeatedly to obtain values for +pushing into the channel. Close the channel when the function returns +`undefined`. ## Channel Object diff --git a/lib/index.js b/lib/index.js index c761190..66a71d8 100644 --- a/lib/index.js +++ b/lib/index.js @@ -308,30 +308,44 @@ const Channel = function(length = 0) { Channel.from = values => { const channel = Channel(); + (async () => { + // iterable try { - // iterable for (let value of values) { await channel.push(value); } await channel.close(); } catch (exception) { - // Assume it's a Node.js stream.readable. + // callback function + try { + for (;;) { + const value = values(); - values.on("readable", async () => { - while (true) { - const data = values.read(); - - if (data === null) { + if (value === undefined) { + await channel.close(); break; } else { - await channel.push(data); + await channel.push(value); } } - }); + } catch (exception) { + // Node.js stream.readable + values.on("readable", async () => { + while (true) { + const data = values.read(); - values.once("end", channel.close); + if (data === null) { + break; + } else { + await channel.push(data); + } + } + }); + + values.once("end", channel.close); + } } })(); diff --git a/test/index.js b/test/index.js index 79bd2b1..377bf3e 100644 --- a/test/index.js +++ b/test/index.js @@ -45,6 +45,12 @@ describe(`Channel`, function() { }); describe(`from`, function() { + it(`callback`, async function() { + let counter = 0; + const callback = () => (counter < 3 ? counter++ : undefined); + assert.deepEqual(await Channel.from(callback).values(), [0, 1, 2]); + }); + it(`iterable`, async function() { assert.deepEqual(await Channel.from([0, 1, 2]).values(), [0, 1, 2]); });