Simplify processOrders, fixing buffer off-by-one error.

This commit is contained in:
David Braun 2018-10-22 21:17:14 -04:00
parent 6066be83fe
commit 3e92604d18
No known key found for this signature in database
GPG key ID: 87EC41ADF710B7E2
2 changed files with 316 additions and 26 deletions

View file

@ -43,15 +43,10 @@ const Channel = function(length = 0) {
let buffered = 0;
let closed = false;
let lastValue;
let resolvedIndex = 0;
const pushes = [];
const shifts = [];
// Process the push and shift queues like an order book, looking for matches.
const processOrders = () => {
const index = { push: 0, shift: 0 };
// Match pushes and shifts.
const matchPushesAndShifts = index => {
while (index.push < pushes.length && index.shift < shifts.length) {
const push = pushes[index.push];
const shift = shifts[index.shift];
@ -63,45 +58,54 @@ const Channel = function(length = 0) {
} else {
lastValue = push.value;
shift.resolve(lastValue);
buffered = Math.max(0, buffered - 1);
index.push++;
index.shift++;
push.resolve(length);
index.push++;
buffered = Math.max(0, buffered - 1);
}
}
};
// Resolve push promises up to the end of the buffer.
// Resolve push promises up to the end of the buffer.
const resolveBufferedPushes = index => {
for (
;
resolvedIndex < index.push ||
(resolvedIndex < pushes.length && buffered < length);
let resolvedIndex = index.push + buffered;
resolvedIndex < pushes.length && buffered < length;
resolvedIndex++
) {
const { cancelled, resolve } = pushes[resolvedIndex];
if (!cancelled) {
if (resolvedIndex > index.push) {
buffered++;
}
buffered++;
resolve(length);
}
}
};
const resolveClosedShifts = index => {
for (; index.shift < shifts.length; index.shift++) {
const { cancelled, resolve } = shifts[index.shift];
if (!cancelled) {
lastValue = undefined;
resolve(lastValue);
}
}
};
// Process the push and shift queues like an order book, looking for matches.
const processOrders = () => {
const index = { push: 0, shift: 0 };
matchPushesAndShifts(index);
resolveBufferedPushes(index);
// If the channel is closed then resolve 'undefined' to remaining shifts.
if (closed) {
for (; index.shift < shifts.length; index.shift++) {
const { cancelled, resolve } = shifts[index.shift];
if (!cancelled) {
lastValue = undefined;
resolve(lastValue);
}
}
resolveClosedShifts(index);
}
pushes.splice(0, index.push);
shifts.splice(0, index.shift);
resolvedIndex -= index.push;
};
const readOnly = Object.freeze(
@ -317,7 +321,7 @@ const Channel = function(length = 0) {
// If value is a promise that rejects, catch it in case there hasn't
// been a matching shift yet in order to prevent an unhandledRejection
// error. Reject it again when there's a shift.
// error.
Promise.resolve(value).catch(() => {});
if (closed) {