'use strict' // An order represents a pending push or shift. const Order = (channel) => { let order const preonFulfilleds = [] const promise = new Promise((resolve, reject) => { order = { resolve: (value) => { preonFulfilleds.forEach((preonFulfilled) => { preonFulfilled(value) }) resolve(value) }, reject } }) Object.assign(promise, { cancel: () => { order.cancelled = true }, channel, prethen: (onFulfilled) => { preonFulfilleds.push(onFulfilled) } }) return {order, promise} } const prototype = {} const Channel = function (bufferLength = 0) { let buffered = 0 let closed = false let lastValue let resolvedIndex = 0 const pushes = [] const shifts = [] // Process the push and shift queues like an order book, looking for matches. const processOrders = () => { const index = {push: 0, shift: 0} // Match pushes and shifts. while ((index.push < pushes.length) && (index.shift < shifts.length)) { const push = pushes[index.push] const shift = shifts[index.shift] if (push.cancelled) { index.push++ } else if (shift.cancelled) { index.shift++ } else { lastValue = push.value shift.resolve(lastValue) buffered = Math.max(0, buffered - 1) index.push++ index.shift++ } } // Resolve push promises up to the end of the buffer. for ( ; (resolvedIndex < index.push) || ((resolvedIndex < pushes.length) && (buffered < bufferLength)); resolvedIndex++ ) { const {cancelled, resolve} = pushes[resolvedIndex] if (!cancelled) { if (resolvedIndex > index.push) { buffered++ } resolve(bufferLength) } } // If the channel is closed then resolve 'undefined' to remaining shifts. if (closed) { for (; index.shift < shifts.length; index.shift++ ) { const {cancelled, resolve} = shifts[index.shift] if (!cancelled) { lastValue = undefined resolve(lastValue) } } } pushes.splice(0, index.push) shifts.splice(0, index.shift) resolvedIndex -= index.push } const readOnly = Object.assign(Object.create(prototype), { every: async (callbackfn, thisArg) => { for (;;) { const value = await readOnly.shift() if (value === undefined) { return true } else { if (!callbackfn.call(thisArg, value)) { return false } } } }, filter: (callbackfn, thisArg) => { const output = Channel() ;(async () => { for (;;) { const value = await readOnly.shift() if (value === undefined) { await output.close() break } else if (callbackfn.call(thisArg, value)) { await output.push(value) } } })() return output }, forEach: async (callbackfn, thisArg) => { for (;;) { const value = await readOnly.shift() if (value === undefined) { break } else { await callbackfn.call(thisArg, value) } } }, join: async (separator) => { const elements = [] await readOnly.forEach((element) => { elements.push(element) }) return elements.join(separator) }, map: (callbackfn, thisArg) => { const output = Channel() ;(async () => { await readOnly.forEach((value) => output.push(callbackfn.call(thisArg, value)) ) output.close() })() return output }, readOnly: () => readOnly, reduce: async (callbackfn, ...initialValue) => { let previousValue = initialValue[0] let previousValueDefined = initialValue.length > 0 await readOnly.forEach((currentValue) => { if (previousValueDefined) { previousValue = callbackfn(previousValue, currentValue) } else { previousValue = currentValue previousValueDefined = true } }) if (previousValueDefined) { return previousValue } else { throw new TypeError( `No values in channel and initialValue wasn't provided.` ) } }, shift: function () { const {order, promise} = Order(this) shifts.push(order) setImmediate(processOrders) return Object.freeze(promise) }, slice: (start, end = Infinity) => { const output = Channel() ;(async () => { for (let index = 0; index < end; index++) { const value = await readOnly.shift() if (value === undefined) { break } else if (index >= start) { await output.push(value) } } await output.close() })() return output } }) Object.defineProperty(readOnly, `value`, {get: () => lastValue}) Object.freeze(readOnly) const writeOnly = Object.freeze({ close: () => new Promise((resolve, reject) => { if (closed) { reject(new Error(`Can't close an already-closed channel.`)) } else { closed = true processOrders() // Give remaining orders in flight time to resolve before returning. setImmediate(resolve) } }), push: function (value) { const {order, promise} = Order(this) order.value = value if (closed) { order.reject(new Error(`Can't push to closed channel.`)) } else if (value === undefined) { order.reject(new TypeError( `Can't push 'undefined' to channel, use close instead.` )) } else if (arguments.length > 1) { order.reject(new Error(`Can't push more than one value at a time.`)) } else { pushes.push(order) setImmediate(processOrders) } return Object.freeze(promise) }, writeOnly: () => writeOnly }) // Use Object.create because readOnly has a getter. return Object.freeze(Object.assign(Object.create(readOnly), writeOnly)) } Channel.from = (values) => { const channel = Channel() ;(async () => { try { // iterable for (let value of values) { await channel.push(value) } await channel.close() } catch (exception) { // Assume it's a Node.js stream.readable. values.on('readable', async () => { while (true) { const data = values.read() if (data === null) { break } else { await channel.push(data) } } }) values.once('end', channel.close) } })() return channel } Channel.of = (...values) => Channel.from(values) Channel.isChannel = (arg) => prototype.isPrototypeOf(arg) Channel.select = (...methodPromises) => { const promise = new Promise((resolve, reject) => { methodPromises.forEach(async (promise) => { promise.prethen(() => { // We've been given a heads-up that this method will complete first so // cancel the other method calls. methodPromises.forEach((other) => { if (other !== promise) { other.cancel() } }) }) try { await promise } catch (exception) { reject(exception) } resolve(promise.channel) }) }) return promise } // functional interface allowing full or partial application // // Channel.slice(10, Infinity, channel) // // or // // const skipTen = Channel.slice(10, Infinity) // skipTen(channel) const channel = Channel() const methods = Object.keys(channel).concat(Object.keys(channel.readOnly())) methods.forEach((method) => { Channel[method] = (...args) => { const arity = method === `slice` ? 3 : channel[method].length return args.length >= arity ? args[arity - 1][method](...args.slice(0, arity - 1)) : (channel) => channel[method](...args) } }) module.exports = Object.freeze(Channel)