From 6b1c55cff3c59406699c47f664336430675d2761 Mon Sep 17 00:00:00 2001 From: David Braun Date: Wed, 24 Apr 2019 09:27:01 -0400 Subject: [PATCH] Add `flat` and `flatMap`. --- .eslintrc.js | 2 +- README.md | 32 ++++++++--------- doc/API.md | 98 ++++++++++++++++++++++++++++++--------------------- lib/index.js | 22 ++++++++++++ test/index.js | 37 +++++++++++++++++-- 5 files changed, 129 insertions(+), 62 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index d57a208..431d136 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -5,7 +5,7 @@ module.exports = { node: true }, parserOptions: { - ecmaVersion: 2017 + ecmaVersion: 2018 }, rules: { quotes: ["error", "backtick"], diff --git a/README.md b/README.md index ea1d9b1..74bc3e8 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,21 @@ # Introduction This is an idiomatic, minimally-opinionated `Channel` type for JavaScript that's -inspired by [Go's channels](https://golang.org/ref/spec#Channel_types). It -works in browsers and in Node.js. If you know how to use an `Array` then you +inspired by [Go's channels](https://golang.org/ref/spec#Channel_types). It +works in browsers and in Node.js. If you know how to use an `Array` then you already know most of how to use a `Channel`. ## Why Go's use of channels for concurrency is amazing and with JavaScript's -async/await feature we have the basis for it as well. All that's missing is a -solid `Channel` type. There are existing libraries but I wanted an idiomatic +async/await feature we have the basis for it as well. All that's missing is a +solid `Channel` type. There are existing libraries but I wanted an idiomatic `Channel` type that's simple and minimally-opinionated. This document assumes you're familiar with Go's channels and why you'd want to -use them. For explanatory background, see my [blog +use them. For explanatory background, see my [blog article](https://www.nodeguy.com/channels-for-javascript/) on the subject. -## Requirements - -ES 2017 - ## Installation ```shell @@ -30,8 +26,8 @@ $ npm install @nodeguy/channel Create a channel with `Channel()`. -To send a value to a channel use `push`. To receive a value from a channel use -`shift`. Always precede the method calls with `await`. Close the channel when +To send a value to a channel use `push`. To receive a value from a channel use +`shift`. Always precede the method calls with `await`. Close the channel when there are no more values to push. ```JavaScript @@ -64,19 +60,19 @@ The [API](doc/API.md) is in the `doc` directory. # Similar Projects -* [Channel](https://github.com/gozala/channel) -* [cochan](https://github.com/skozin/cochan) -* [js-csp](https://github.com/ubolonton/js-csp) -* [node-csp](https://github.com/olahol/node-csp) +- [Channel](https://github.com/gozala/channel) +- [cochan](https://github.com/skozin/cochan) +- [js-csp](https://github.com/ubolonton/js-csp) +- [node-csp](https://github.com/olahol/node-csp) # Copyright Copyright 2017 [David Braun](https://www.NodeGuy.com/) Licensed under the Apache License, Version 2.0 (the "License"); you may not use -these files except in compliance with the License. You may obtain a copy of the -License at `http://www.apache.org/licenses/LICENSE-2.0`. Unless required by +these files except in compliance with the License. You may obtain a copy of the +License at `http://www.apache.org/licenses/LICENSE-2.0`. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the specific language +KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/doc/API.md b/doc/API.md index 348ddf0..8b0bfc5 100644 --- a/doc/API.md +++ b/doc/API.md @@ -1,35 +1,37 @@ -* [New Properties](#new-properties) - * [close() -> (async)](#close---async) - * [readOnly() -> Channel](#readonly---channel) - * [writeOnly() -> Channel](#writeonly---channel) - * [Channel.select(promises) -> (async) channel](#channelselectpromises---async-channel) - * [Examples](#examples) - * [value()](#value) -* [Array-like Properties](#array-like-properties) - * [Channel](#channel) - * [Channel([bufferLength]) -> Channel](#channelbufferlength---channel) - * [Channel.isChannel(value) -> Boolean](#channelischannelvalue---boolean) - * [Channel.of(...values) -> read-only Channel](#channelofvalues---read-only-channel) - * [Channel.from(callback | iterable | stream.Readable[, mapfn [, thisArg]]) -> read-only Channel](#channelfromcallback--iterable--streamreadable-mapfn--thisarg---read-only-channel) - * [Examples](#examples-1) - * [Channel Object](#channel-object) - * [concat(...arguments) -> Channel](#concatarguments---channel) - * [every(callbackfn[, thisArg]) -> (async) Boolean](#everycallbackfn-thisarg---async-boolean) - * [filter(callbackfn[, thisArg]) -> Channel](#filtercallbackfn-thisarg---channel) - * [forEach(callbackfn[, thisArg]) -> (async)](#foreachcallbackfn-thisarg---async) - * [join(separator) -> (async) String](#joinseparator---async-string) - * [length](#length) - * [map(callbackfn[, thisArg]) -> Channel](#mapcallbackfn-thisarg---channel) - * [push(value) -> (async) bufferLength](#pushvalue---async-bufferlength) - * [reduce(callbackfn[, initialValue]) -> (async)](#reducecallbackfn-initialvalue---async) - * [shift() -> (async)](#shift---async) - * [slice(start[, end]) -> Channel](#slicestart-end---channel) - * [some(callbackfn[, thisArg])](#somecallbackfn-thisarg) - * [toString() -> String](#tostring---string) - * [values() -> (async) iterator](#values---async-iterator) -* [Functional API](#functional-api) +- [New Properties](#new-properties) + - [close() -> (async)](#close-async) + - [readOnly() -> Channel](#readonly-channel) + - [writeOnly() -> Channel](#writeonly-channel) + - [Channel.select(promises) -> (async) channel](#channelselectpromises-async-channel) + - [Examples](#examples) + - [value()](#value) +- [Array-like Properties](#array-like-properties) + - [Channel](#channel) + - [Channel([bufferLength]) -> Channel](#channelbufferlength-channel) + - [Channel.isChannel(value) -> Boolean](#channelischannelvalue-boolean) + - [Channel.of(...values) -> read-only Channel](#channelofvalues-read-only-channel) + - [Channel.from(callback | iterable | stream.Readable[, mapfn [, thisArg]]) -> read-only Channel](#channelfromcallback-iterable-streamreadable-mapfn-thisarg-read-only-channel) + - [Examples](#examples-1) + - [Channel Object](#channel-object) + - [concat(...arguments) -> Channel](#concatarguments-channel) + - [every(callbackfn[, thisArg]) -> (async) Boolean](#everycallbackfn-thisarg-async-boolean) + - [filter(callbackfn[, thisArg]) -> Channel](#filtercallbackfn-thisarg-channel) + - [flat([depth = 1]) -> Channel](#flatdepth-1-channel) + - [flatMap (mapperFunction[, thisArg]) -> Channel](#flatmap-mapperfunction-thisarg-channel) + - [forEach(callbackfn[, thisArg]) -> (async)](#foreachcallbackfn-thisarg-async) + - [join(separator) -> (async) String](#joinseparator-async-string) + - [length](#length) + - [map(mapperFunction[, thisArg]) -> Channel](#mapmapperfunction-thisarg-channel) + - [push(value) -> (async) bufferLength](#pushvalue-async-bufferlength) + - [reduce(callbackfn[, initialValue]) -> (async)](#reducecallbackfn-initialvalue-async) + - [shift() -> (async)](#shift-async) + - [slice(start[, end]) -> Channel](#slicestart-end-channel) + - [some(callbackfn[, thisArg])](#somecallbackfn-thisarg) + - [toString() -> String](#tostring-string) + - [values() -> (async) iterator](#values-async-iterator) +- [Functional API](#functional-api) @@ -239,6 +241,23 @@ instead. Unlike in the Array version of `filter`, `callbackfn` is called with only one argument. +### flat([depth = 1]) -> Channel + +Create a new channel with values from the existing channel. If any of the +values are themselves channels, flatten them by pushing their values into the +new channel instead (while repeating this behavior up to `depth` times). + +### flatMap (mapperFunction[, thisArg]) -> Channel + +Call `mapperFunction` once for each value in the channel and flatten the result +(with depth 1). + +If `thisArg` is provided it will be used as the `this` value for each invocation +of `mapperFunction`. If it is not provided, `undefined` is used instead. + +Unlike in `Array`'s `flatMap` method, `mapperFunction` is called with only one +argument. + ### forEach(callbackfn[, thisArg]) -> (async) The promise returned by `forEach` resolves when the channel is closed: @@ -275,25 +294,24 @@ provided, a single comma is used as the separator. The length of the channel's buffer. -### map(callbackfn[, thisArg]) -> Channel +### map(mapperFunction[, thisArg]) -> Channel -`callbackfn` should be a function that accepts one argument. `map` calls -`callbackfn` once for each value in the channel and constructs a new Channel -from the results. +Call `mapperFunction` once for each value in the channel and construct a new +channel with the results. -If a `thisArg` parameter is provided, it will be used as the `this` value for -each invocation of `callbackfn`. If it is not provided, `undefined` is used -instead. +If `thisArg` is provided it will be used as the `this` value for each invocation +of `mapperFunction`. If it is not provided, `undefined` is used instead. -Unlike `Array`'s method, `callbackfn` is called with only one argument. +Unlike in `Array`'s `map` method, `mapperFunction` is called with only one +argument. ### push(value) -> (async) bufferLength Send the value into the channel and return a promise that resolves when the value has been shifted or placed in the buffer. -* Throw a `TypeError` when attempting to push to a closed channel. -* Throw a `TypeError` when attempting to push `undefined` because it's a +- Throw a `TypeError` when attempting to push to a closed channel. +- Throw a `TypeError` when attempting to push `undefined` because it's a reserved value used to indicate a closed channel. The push can be cancelled before completion by calling `cancel` on the returned diff --git a/lib/index.js b/lib/index.js index 0eecf99..4981957 100644 --- a/lib/index.js +++ b/lib/index.js @@ -172,6 +172,28 @@ const Channel = function(length = 0) { return output; }, + flat: depth => { + const output = Channel(); + + (async () => { + await readOnly.forEach(async value => { + if (Channel.isChannel(value)) { + const input = depth > 1 ? value.flat(depth - 1) : value; + await input.forEach(output.push); + } else { + await output.push(value); + } + }); + + await output.close(); + })(); + + return output; + }, + + flatMap: (mapperFunction, thisArg) => + readOnly.map(mapperFunction, thisArg).flat(), + forEach: async (callbackfn, thisArg) => { for (;;) { const value = await readOnly.shift(); diff --git a/test/index.js b/test/index.js index 8ae24c7..7328319 100644 --- a/test/index.js +++ b/test/index.js @@ -1,8 +1,8 @@ "use strict"; -const assert = require("@nodeguy/assert"); -const Channel = require("../lib"); -const stream = require("stream"); +const assert = require(`@nodeguy/assert`); +const Channel = require(`../lib`); +const stream = require(`stream`); const assertRejects = async (callback, reason) => { try { @@ -250,6 +250,37 @@ describe(`Channel object`, function() { ); }); + it(`flat`, async function() { + const flat1 = Channel.of(1, 2, Channel.of(3, 4)).flat(); + assert.deepEqual(await flat1.values(), [1, 2, 3, 4]); + + const flat2 = Channel.of(1, 2, Channel.of(3, 4, Channel.of(5, 6))).flat(); + assert.equal(await flat2.shift(), 1); + assert.equal(await flat2.shift(), 2); + assert.equal(await flat2.shift(), 3); + assert.equal(await flat2.shift(), 4); + assert.deepEqual(await (await flat2.shift()).values(), [5, 6]); + + const flat3 = Channel.of(1, 2, Channel.of(3, 4, Channel.of(5, 6))).flat(2); + assert.deepEqual(await flat3.values(), [1, 2, 3, 4, 5, 6]); + }); + + it(`flatMap`, async function() { + assert.deepEqual( + await Channel.of(1, 2, 3, 4) + .flatMap(x => Channel.of(x * 2)) + .values(), + [2, 4, 6, 8] + ); + + assert.deepEqual( + await Channel.of(`it's Sunny in`, ``, `California`) + .flatMap(x => Channel.from(x.split(` `))) + .values(), + [`it's`, `Sunny`, `in`, ``, `California`] + ); + }); + it(`forEach`, async function() { const output = []; await Channel.of(0, 1, 2).forEach(value => output.push(value));