diff --git a/lib/index.js b/lib/index.js index 7142d39..6b7a75d 100644 --- a/lib/index.js +++ b/lib/index.js @@ -60,8 +60,13 @@ const Channel = function(length = 0) { } else if (shift.cancelled) { index.shift++; } else { - lastValue = push.value; - shift.resolve(lastValue); + if (`reason` in push) { + shift.reject(push.reason); + } else { + lastValue = push.value; + shift.resolve(lastValue); + } + buffered = Math.max(0, buffered - 1); index.push++; index.shift++; @@ -320,6 +325,13 @@ const Channel = function(length = 0) { const { order, promise } = Order(this); order.value = value; + // If value is a promise that rejects, catch it in case there hasn't + // been a matching shift yet in order to prevent an unhandledRejection + // error. Reject it again when there's a shift. + Promise.resolve(value).catch(reason => { + order.reason = reason; + }); + if (closed) { order.reject(new Error(`Can't push to closed channel.`)); } else if (value === undefined) { diff --git a/test/index.js b/test/index.js index 53eb6f3..33c1125 100644 --- a/test/index.js +++ b/test/index.js @@ -453,3 +453,25 @@ describe(`Channel object`, function() { }); }); }); + +it(`allows promises to be sent through a channel`, function() { + return new Promise(async (resolve, reject) => { + process.once(`unhandledRejection`, reject); + + const channel = Channel.of( + Promise.resolve(`resolved`), + new Promise((resolve, reject) => { + setImmediate(reject, new Error(`rejected`)); + }) + ); + + assert.equal(await channel.shift(), `resolved`); + + try { + await channel.shift(); + } catch (exception) { + assert.equal(exception.message, `rejected`); + resolve(); + } + }); +});