Introduction
This is an idiomatic, minimally-opinionated Channel type for JavaScript that's
inspired by Go's channels. It works in browsers and in Node.js. If you know how
to use an Array then you already know most of how to use a Channel.
Why
Go's use of channels for concurrency is amazing and with JavaScript's
async/await feature we have the basis for it as well. All that's missing is a
solid Channel type. There are existing libraries but I wanted an idiomatic
Channel type that's simpler and minimally-opinionated.
This document assumes you're familiar with Go's channels and why you'd want to use them. For explanatory background, read my blog article on the subject.
Requirements
ES 2017
Installation
$ npm install @nodeguy/channel
Basic Use
Create a channel with Channel().
To send a value to a channel, use push. To receive a value from a channel,
use shift. Always precede the method calls with await:
const assert = require(`assert`)
const Channel = require(`@nodeguy/channel`)

const channel = Channel()

;(async () => {
  await channel.push(42)
})()

;(async () => {
  assert.equal(await channel.shift(), 42)
})()
The push and shift methods are usually called in different async functions.
They represent the two different ends of the channel and act to synchronize the
behavior of the async functions.
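To make the synchronization concrete, here is a minimal sketch of an unbuffered channel written from scratch. It is not this library's implementation: makeUnbufferedChannel and demo are hypothetical names, only push and shift are modeled, and closing is left out. The point it illustrates is that a push does not resolve until a matching shift has arrived.

```javascript
// Hypothetical stand-in, not the @nodeguy/channel implementation.
const makeUnbufferedChannel = () => {
  const pushers = [] // pending pushes: { value, resolve }
  const shifters = [] // pending shifts: resolve functions

  return {
    push: (value) =>
      new Promise((resolve) => {
        const shifter = shifters.shift()
        if (shifter) {
          shifter(value) // a receiver is already waiting: hand over now
          resolve()
        } else {
          pushers.push({ value, resolve }) // wait for a receiver
        }
      }),
    shift: () =>
      new Promise((resolve) => {
        const pusher = pushers.shift()
        if (pusher) {
          pusher.resolve() // release the blocked sender
          resolve(pusher.value)
        } else {
          shifters.push(resolve) // wait for a sender
        }
      })
  }
}

// Records the order in which the two ends of the channel make progress.
const demo = async () => {
  const channel = makeUnbufferedChannel()
  const log = []

  const sender = (async () => {
    log.push('pushing')
    await channel.push(42)
    log.push('pushed') // reached only after the receiver has called shift
  })()

  const receiver = (async () => {
    log.push('shifting')
    log.push(`received ${await channel.shift()}`)
  })()

  await Promise.all([sender, receiver])
  return log
}
```

In the log returned by demo(), the entry 'pushed' can only appear after 'shifting', because the sender stays suspended until the receiver shows up.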
API
New Properties
The following properties don't have equivalents in Array.
close() -> async
Closes the channel so that no more values can be pushed to it. Returns a promise that resolves when any remaining pushes in flight complete.
Attempting to push to a closed channel will throw an exception and shifting from
a closed channel will immediately return undefined.
readOnly() -> Channel
Returns a version of the channel that provides only read methods.
value
Set to the most recently shifted value. This is useful when used in
combination with select.
writeOnly() -> Channel
Returns a version of the channel that provides only write methods.
Channel.select(methods) -> async channel
Channel.select attempts to call multiple channel methods in parallel and
returns the channel of the first one that succeeds. Only the winning method is
executed to completion; the other methods have no effect.
All of the methods can be cancelled before completion by calling cancel on the
promise returned from select.
Examples
Imagine you're at a party and your next conversation depends on whom you run into first: Alice, Bob, or Charlie.
switch (await Channel.select(alice.shift(), bob.shift(), charlie.push(`Hi!`))) {
  case alice:
    console.log(`Alice said ${alice.value}.`)
    break
  case bob:
    console.log(`Bob said ${bob.value}.`)
    break
  case charlie:
    console.log(`I said "hi" to Charlie.`)
    break
}
Be careful of unintended side effects, however. Even though only one value is pushed in the following example, the counter is incremented twice because JavaScript evaluates both arguments before select is called.
let counter = 0
const increment = () => {
  counter++
  return counter
}
await Channel.select(alice.push(increment()), bob.push(increment()))
assert.equal(counter, 2)
Sometimes you don't want to wait until a method completes. You can use a closed channel to return immediately even if no other channels are ready:
const closed = Channel()
closed.close()
switch (await Channel.select(alice.shift(), bob.shift(), closed.shift())) {
  case alice:
    console.log(`Alice said ${alice.value}.`)
    break
  case bob:
    console.log(`Bob said ${bob.value}.`)
    break
  default:
    console.log(`No one has anything to say yet.`)
}
You can also arrange it so that the select completes within a timeout:
const timeout = Channel()
setTimeout(timeout.close, 1000)
switch (await Channel.select(alice.shift(), bob.shift(), timeout.shift())) {
  case alice:
    console.log(`Alice said ${alice.value}.`)
    break
  case bob:
    console.log(`Bob said ${bob.value}.`)
    break
  default:
    console.log(`I stopped listening after one second.`)
}
Array-like Methods
These methods are similar to the equivalently named methods of Array.
Channel
Channel([bufferLength = 0]) -> Channel
Create a new Channel with an optional buffer. This allows an async function
to push up to bufferLength values before blocking.
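The effect of the buffer can be sketched with a small stand-in written from scratch. This is not the library's implementation: makeBufferedChannel, tick, and demo are hypothetical names, the sketch assumes a bufferLength of at least 1, and it leaves out waiting receivers and closing. It shows two pushes resolving at once with a buffer of 2, while a third stays pending until a shift frees a slot.

```javascript
// Hypothetical stand-in, not the @nodeguy/channel implementation.
// Assumes bufferLength >= 1 and that shift() is only called when a value is
// buffered; waiting receivers are left out to keep the sketch short.
const makeBufferedChannel = (bufferLength) => {
  const buffer = [] // values accepted but not yet shifted
  const blocked = [] // pushes waiting for room: { value, resolve }

  return {
    push: (value) =>
      new Promise((resolve) => {
        if (buffer.length < bufferLength) {
          buffer.push(value)
          resolve() // there was room, so the sender continues at once
        } else {
          blocked.push({ value, resolve }) // the sender waits for a shift
        }
      }),
    shift: async () => {
      if (buffer.length === 0) throw new Error('empty channel is not modeled')
      const value = buffer.shift()
      const next = blocked.shift()
      if (next) {
        buffer.push(next.value) // room opened up: admit the oldest blocked push
        next.resolve()
      }
      return value
    }
  }
}

// Yields to the event loop so that settled promises can run their callbacks.
const tick = () => new Promise((resolve) => setTimeout(resolve, 0))

const demo = async () => {
  const channel = makeBufferedChannel(2)
  const settled = []
  const track = (label, promise) => promise.then(() => settled.push(label))

  track('first', channel.push(1))
  track('second', channel.push(2))
  track('third', channel.push(3)) // buffer is full, so this one stays pending

  await tick()
  const beforeShift = [...settled] // ['first', 'second']

  const value = await channel.shift() // 1
  await tick()

  return { beforeShift, value, afterShift: [...settled] }
}
```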
Channel.isChannel(value) -> Boolean
Return true if value is a channel, false otherwise.
Channel.of(...values) -> Channel
Pushes values into a new channel and then closes it.
Channel.from(iterable | stream.Readable) -> Channel
Create a new Channel from an iterable or a Node.js readable stream.
Channel Object
every(callbackfn[, thisArg]) -> async Boolean
callbackfn should be a function that accepts one argument and returns a value
that is coercible to the Boolean values true or false. every calls callbackfn
once for each value present in the channel until it finds one where callbackfn
returns false. If such a value is found, every immediately returns false.
Otherwise, if callbackfn returned true for all elements, every will return
true.
If a thisArg parameter is provided, it will be used as the this value for
each invocation of callbackfn. If it is not provided, undefined is used
instead.
Unlike in the Array version of every, callbackfn is called with only one
argument.
filter(callbackfn[, thisArg]) -> Channel
callbackfn should be a function that accepts an argument and returns a value
that is coercible to the Boolean values true or false. filter calls callbackfn
once for each value in the channel and constructs a new channel of all the
values for which callbackfn returns true.
If a thisArg parameter is provided, it will be used as the this value for
each invocation of callbackfn. If it is not provided, undefined is used
instead.
Unlike in the Array version of filter, callbackfn is called with only one
argument.
forEach(callbackfn[, thisArg]) -> async
The promise returned by forEach resolves when the channel is closed:
const toArray = async (channel) => {
  const array = []
  await channel.forEach((value) => {
    array.push(value)
  })
  return array
}
If callbackfn is async then forEach will wait for it before iterating to the
next value:
const pipe = async (source, sink) => {
  await source.forEach(sink.push)
  sink.close()
}
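The waiting behavior can be sketched without the library. Everything named here is hypothetical: closedChannelOf is a stand-in that models only shift on a closed channel, and forEachSketch is one plausible way to write such a loop, not the library's code. The sketch assumes that a closed channel delivers its remaining values and then undefined.

```javascript
// Hypothetical stand-in: a closed channel holding `values`; only shift() is
// modeled, and it resolves to undefined once the values run out.
const closedChannelOf = (...values) => {
  const queue = [...values]
  return { shift: async () => queue.shift() }
}

// Sketch of a forEach that waits for each callback before taking the next
// value from the channel.
const forEachSketch = async (channel, callbackfn) => {
  for (;;) {
    const value = await channel.shift()
    if (value === undefined) return // channel is closed and drained
    await callbackfn(value) // resolves immediately for a synchronous callback
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Each callback sleeps for a different time, yet the events never interleave.
const demo = async () => {
  const events = []
  await forEachSketch(closedChannelOf(15, 5, 10), async (ms) => {
    events.push(`start ${ms}`)
    await sleep(ms)
    events.push(`end ${ms}`)
  })
  return events
}
```

Because each callback is awaited, every 'end' entry is recorded before the next 'start', regardless of how long the individual callbacks take.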
join(separator) -> async String
map(callbackfn[, thisArg]) -> Channel
push(value) -> async bufferLength
Unlike Array's method, push accepts only one value at a time.
Sends the value into the channel and returns a promise that resolves when the value has been shifted out or placed in the buffer.
- Throws a TypeError when attempting to push to a closed channel.
- Throws a TypeError when attempting to push undefined because it's a reserved value used to indicate a closed channel.
reduce(callbackfn[, initialValue])
callbackfn should be a function that takes two arguments (unlike Array's
version which takes four). reduce calls the callback, as a function, once for
each value after the first value present in the channel.
callbackfn is called with two arguments: the previousValue (value from the
previous call to callbackfn) and the currentValue. The first time that
callback is called, the previousValue and currentValue can be one of two
values. If an initialValue was provided in the call to reduce, then
previousValue will be equal to initialValue and currentValue will be equal
to the first value in the channel. If no initialValue was provided, then
previousValue will be equal to the first value in the channel and
currentValue will be equal to the second. It is a TypeError if the channel
contains no values and initialValue is not provided.
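The rules above can be sketched as a standalone function. This is an illustration, not the library's code: closedChannelOf and reduceSketch are hypothetical names, only shift on a closed channel is modeled, and the sketch assumes that undefined marks the end of the values.

```javascript
// Hypothetical stand-in: a closed channel holding `values`; only shift() is
// modeled, and it resolves to undefined once the values run out.
const closedChannelOf = (...values) => {
  const queue = [...values]
  return { shift: async () => queue.shift() }
}

// Sketch of the reduce rules: callbackfn gets (previousValue, currentValue).
// The rest parameter tells "no initialValue" apart from an explicit one.
const reduceSketch = async (channel, callbackfn, ...rest) => {
  let previousValue
  if (rest.length > 0) {
    previousValue = rest[0] // initialValue was supplied
  } else {
    previousValue = await channel.shift() // seed with the first value
    if (previousValue === undefined) {
      throw new TypeError('reduce of empty channel with no initial value')
    }
  }
  for (;;) {
    const currentValue = await channel.shift()
    if (currentValue === undefined) return previousValue
    previousValue = callbackfn(previousValue, currentValue)
  }
}

const add = (previousValue, currentValue) => previousValue + currentValue

// Resolves to 6: the first value seeds the sum.
const withoutInitial = reduceSketch(closedChannelOf(1, 2, 3), add)

// Resolves to 16: initialValue seeds the sum.
const withInitial = reduceSketch(closedChannelOf(1, 2, 3), add, 10)
```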
shift() -> async
Returns a promise that resolves when a value is received from the channel.
Closed channels always return undefined immediately.
slice(start, end) -> Channel
Functional API
There is a parallel API to support functional-style programming. Every channel
method is also available as an independent function in the Channel namespace
that takes a channel as the final argument. For example, slice can be called
in either of the following two ways:
// method
channel.slice(10)
// function
Channel.slice(10, Infinity, channel)
You can also use partial application to pass the channel in later:
const skipTen = Channel.slice(10, Infinity)
skipTen(channel)
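One way such channel-last functions could be built is sketched below. This is a guess at the general technique, not the library's code: channelLast is a hypothetical helper, and an Array stands in for a channel so that the example runs without any dependency.

```javascript
// Hypothetical helper: wraps fn so it can be called with all of its arguments
// at once or with the leading ones first and the rest later.
// `arity` counts every parameter, including the trailing channel.
const channelLast = (arity, fn) => {
  const collect = (...args) =>
    args.length >= arity
      ? fn(...args)
      : (...more) => collect(...args, ...more)
  return collect
}

// An Array stands in for a channel here.
const slice = channelLast(3, (start, end, source) => source.slice(start, end))

const numbers = [0, 1, 2, 3, 4, 5]

// All arguments at once.
const direct = slice(2, Infinity, numbers) // [2, 3, 4, 5]

// Partial application: supply the source later.
const skipTwo = slice(2, Infinity)
const partial = skipTwo(numbers) // [2, 3, 4, 5]
```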
Contributing
Please submit an issue if you have a suggestion for how to make Channel more
Array-like (e.g., by adding another Array method).
Similar Projects
Copyright
Copyright 2017 David Braun
Licensed under the Apache License, Version 2.0 (the "License"); you may not use
these files except in compliance with the License. You may obtain a copy of the
License at http://www.apache.org/licenses/LICENSE-2.0. Unless required by
applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License.