diff --git a/doc/API.md b/doc/API.md index a17f0b7..4b3a2d5 100644 --- a/doc/API.md +++ b/doc/API.md @@ -6,7 +6,7 @@ - [writeOnly() -> Channel](#writeonly---channel) - [Channel.select(promises) -> async channel](#channelselectpromises---async-channel) - [Examples](#examples) - - [value](#value) + - [value()](#value) - [Array-like Properties](#array-like-properties) - [Channel](#channel) - [Channel([bufferLength = 0]) -> Channel](#channelbufferlength--0---channel) @@ -71,11 +71,11 @@ switch (await Channel.select([ charlie.push(`Hi!`) ])) { case alice: - console.log(`Alice said ${alice.value}.`); + console.log(`Alice said ${alice.value()}.`); break; case bob: - console.log(`Bob said ${bob.value}.`); + console.log(`Bob said ${bob.value()}.`); break; case charlie: @@ -108,11 +108,11 @@ closed.close(); switch (await Channel.select([alice.shift(), bob.shift(), closed.shift())]) { case alice: - console.log(`Alice said ${alice.value}.`); + console.log(`Alice said ${alice.value()}.`); break; case bob: - console.log(`Bob said ${bob.value}.`); + console.log(`Bob said ${bob.value()}.`); break; default: @@ -128,11 +128,11 @@ setTimeout(timeout.close, 1000); switch (await Channel.select([alice.shift(), bob.shift(), timeout.shift())]) { case alice: - console.log(`Alice said ${alice.value}.`); + console.log(`Alice said ${alice.value()}.`); break; case bob: - console.log(`Bob said ${bob.value}.`); + console.log(`Bob said ${bob.value()}.`); break; default: @@ -140,9 +140,9 @@ switch (await Channel.select([alice.shift(), bob.shift(), timeout.shift())]) { } ``` -## value +## value() -Set to the most recently `shift`ed value. This is useful when used in +Return the most recently `shift`ed value. This is useful when used in combination with `select`. # Array-like Properties diff --git a/lib/index.js b/lib/index.js index 7037649..c761190 100644 --- a/lib/index.js +++ b/lib/index.js @@ -36,7 +36,7 @@ const Order = channel => { const prototype = {}; -const Channel = function(bufferLength = 0) { +const Channel = function(length = 0) { let buffered = 0; let closed = false; let lastValue; @@ -70,7 +70,7 @@ const Channel = function(bufferLength = 0) { for ( ; resolvedIndex < index.push || - (resolvedIndex < pushes.length && buffered < bufferLength); + (resolvedIndex < pushes.length && buffered < length); resolvedIndex++ ) { const { cancelled, resolve } = pushes[resolvedIndex]; @@ -80,7 +80,7 @@ const Channel = function(bufferLength = 0) { buffered++; } - resolve(bufferLength); + resolve(length); } } @@ -101,205 +101,209 @@ const Channel = function(bufferLength = 0) { resolvedIndex -= index.push; }; - const readOnly = Object.assign(Object.create(prototype), { - every: async (callbackfn, thisArg) => { - for (;;) { - const value = await readOnly.shift(); - - if (value === undefined) { - return true; - } else { - if (!callbackfn.call(thisArg, value)) { - return false; - } - } - } - }, - - filter: (callbackfn, thisArg) => { - const output = Channel(); - (async () => { + const readOnly = Object.freeze( + Object.assign(Object.create(prototype), { + every: async (callbackfn, thisArg) => { for (;;) { const value = await readOnly.shift(); if (value === undefined) { - await output.close(); - break; - } else if (callbackfn.call(thisArg, value)) { - await output.push(value); + return true; + } else { + if (!callbackfn.call(thisArg, value)) { + return false; + } } } - })(); + }, - return output; - }, + filter: (callbackfn, thisArg) => { + const output = Channel(); + (async () => { + for (;;) { + const value = await readOnly.shift(); - forEach: async (callbackfn, thisArg) => { - for (;;) { - const value = await readOnly.shift(); - - if (value === undefined) { - break; - } else { - await callbackfn.call(thisArg, value); - } - } - }, - - join: async separator => { - const elements = []; - - await readOnly.forEach(element => { - elements.push(element); - }); - - return elements.join(separator); - }, - - map: (callbackfn, thisArg) => { - const output = Channel(); - (async () => { - await readOnly.forEach(value => - output.push(callbackfn.call(thisArg, value)) - ); - - output.close(); - })(); - - return output; - }, - - readOnly: () => readOnly, - - reduce: async (callbackfn, ...initialValue) => { - let previousValue = initialValue[0]; - let previousValueDefined = initialValue.length > 0; - - await readOnly.forEach(currentValue => { - if (previousValueDefined) { - previousValue = callbackfn(previousValue, currentValue); - } else { - previousValue = currentValue; - previousValueDefined = true; - } - }); - - if (previousValueDefined) { - return previousValue; - } else { - throw new TypeError( - `No values in channel and initialValue wasn't provided.` - ); - } - }, - - shift: function() { - const { order, promise } = Order(this); - shifts.push(order); - setImmediate(processOrders); - return Object.freeze(promise); - }, - - slice: (start, end = Infinity) => { - const output = Channel(); - (async () => { - for (let index = 0; index < start; index++) { - const value = await readOnly.shift(); - - if (value === undefined) { - break; + if (value === undefined) { + await output.close(); + break; + } else if (callbackfn.call(thisArg, value)) { + await output.push(value); + } } - } + })(); - for (let index = start; index < end; index++) { + return output; + }, + + forEach: async (callbackfn, thisArg) => { + for (;;) { const value = await readOnly.shift(); if (value === undefined) { break; } else { - await output.push(value); + await callbackfn.call(thisArg, value); } } + }, - await output.close(); - })(); + join: async separator => { + const elements = []; - return output; - }, + await readOnly.forEach(element => { + elements.push(element); + }); - some: async (callbackfn, thisArg) => { - for (;;) { - const value = await readOnly.shift(); + return elements.join(separator); + }, - if (value === undefined) { - return false; - } else { - if (callbackfn.call(thisArg, value)) { - return true; + map: (callbackfn, thisArg) => { + const output = Channel(); + (async () => { + await readOnly.forEach(value => + output.push(callbackfn.call(thisArg, value)) + ); + + output.close(); + })(); + + return output; + }, + + readOnly: () => readOnly, + + reduce: async (callbackfn, ...initialValue) => { + let previousValue = initialValue[0]; + let previousValueDefined = initialValue.length > 0; + + await readOnly.forEach(currentValue => { + if (previousValueDefined) { + previousValue = callbackfn(previousValue, currentValue); + } else { + previousValue = currentValue; + previousValueDefined = true; } - } - } - }, + }); - toString: () => `Channel(${bufferLength})`, - - values: async () => { - const array = []; - - await readOnly.forEach(item => { - array.push(item); - }); - - return array; - } - }); - - Object.defineProperties(readOnly, { - length: { get: () => bufferLength }, - value: { get: () => lastValue } - }); - - Object.freeze(readOnly); - - const writeOnly = Object.freeze({ - close: () => - new Promise((resolve, reject) => { - if (closed) { - reject(new Error(`Can't close an already-closed channel.`)); + if (previousValueDefined) { + return previousValue; } else { - closed = true; - processOrders(); - - // Give remaining orders in flight time to resolve before returning. - setImmediate(resolve); + throw new TypeError( + `No values in channel and initialValue wasn't provided.` + ); } - }), + }, - push: function(value) { - const { order, promise } = Order(this); - order.value = value; - - if (closed) { - order.reject(new Error(`Can't push to closed channel.`)); - } else if (value === undefined) { - order.reject( - new TypeError(`Can't push 'undefined' to channel, use close instead.`) - ); - } else if (arguments.length > 1) { - order.reject(new Error(`Can't push more than one value at a time.`)); - } else { - pushes.push(order); + shift: function() { + const { order, promise } = Order(this); + shifts.push(order); setImmediate(processOrders); + return Object.freeze(promise); + }, + + slice: (start, end = Infinity) => { + const output = Channel(); + (async () => { + for (let index = 0; index < start; index++) { + const value = await readOnly.shift(); + + if (value === undefined) { + break; + } + } + + for (let index = start; index < end; index++) { + const value = await readOnly.shift(); + + if (value === undefined) { + break; + } else { + await output.push(value); + } + } + + await output.close(); + })(); + + return output; + }, + + some: async (callbackfn, thisArg) => { + for (;;) { + const value = await readOnly.shift(); + + if (value === undefined) { + return false; + } else { + if (callbackfn.call(thisArg, value)) { + return true; + } + } + } + }, + + toString: () => `Channel(${length})`, + + value: () => lastValue, + + values: async () => { + const array = []; + + await readOnly.forEach(item => { + array.push(item); + }); + + return array; } + }) + ); - return Object.freeze(promise); - }, + const writeOnly = Object.freeze( + Object.assign(Object.create(prototype), { + close: () => + new Promise((resolve, reject) => { + if (closed) { + reject(new Error(`Can't close an already-closed channel.`)); + } else { + closed = true; + processOrders(); - writeOnly: () => writeOnly - }); + // Give remaining orders in flight time to resolve before returning. + setImmediate(resolve); + } + }), - // Use Object.create because readOnly has a getter. - return Object.freeze(Object.assign(Object.create(readOnly), writeOnly)); + length, + + push: function(value) { + const { order, promise } = Order(this); + order.value = value; + + if (closed) { + order.reject(new Error(`Can't push to closed channel.`)); + } else if (value === undefined) { + order.reject( + new TypeError( + `Can't push 'undefined' to channel, use close instead.` + ) + ); + } else if (arguments.length > 1) { + order.reject(new Error(`Can't push more than one value at a time.`)); + } else { + pushes.push(order); + setImmediate(processOrders); + } + + return Object.freeze(promise); + }, + + writeOnly: () => writeOnly + }) + ); + + return Object.freeze( + Object.assign(Object.create(prototype), readOnly, writeOnly) + ); }; Channel.from = values => { @@ -384,7 +388,10 @@ Channel.select = methodPromises => // Channel.slice(10)(Infinity)(channel) const channel = Channel(); -const methods = Object.keys(channel).concat(Object.keys(channel.readOnly())); + +const methods = Object.keys(channel).filter( + method => typeof channel[method] === `function` +); methods.forEach(method => { const bound = function(...args) { diff --git a/test/index.js b/test/index.js index d1f8883..d7dcec4 100644 --- a/test/index.js +++ b/test/index.js @@ -63,6 +63,8 @@ describe(`Channel`, function() { it(`isChannel`, function() { assert(Channel.isChannel(Channel.of(0, 1, 2))); assert(!Channel.isChannel(Array.of(0, 1, 2))); + assert(Channel.isChannel(Channel().readOnly())); + assert(Channel.isChannel(Channel().writeOnly())); }); it(`of`, async function() { @@ -80,7 +82,7 @@ describe(`Channel`, function() { })(); assert.equal(await Channel.select([a.shift(), b.shift()]), b); - assert.equal(b.value, 0); + assert.equal(b.value(), 0); assert.equal(await a.shift(), 1); assert.equal(await Channel.select([a.push(0), b.shift()]), a); }); @@ -294,7 +296,7 @@ describe(`Channel object`, function() { assert.equal(readOnly.readOnly(), readOnly); assert.equal(await readOnly.shift(), 1); - assert.equal(readOnly.value, 1); + assert.equal(readOnly.value(), 1); assert.throws(() => { readOnly.frozen = false; @@ -367,21 +369,13 @@ describe(`Channel object`, function() { }); it(`value`, async function() { - const channel = Channel(); - (async () => { - await channel.push(0); - })(); - + const channel = Channel(1); + await channel.push(null); await channel.shift(); - assert.equal(channel.value, 0); - - assert.throws(() => { - channel.value = 1; - }); - + assert.equal(channel.value(), null); channel.close(); await channel.shift(); - assert.equal(channel.value, undefined); + assert.equal(channel.value(), undefined); }); it(`values`, async function() {