Replace JavaScript Standard Style with Prettier.
This commit is contained in:
parent
b9638f2113
commit
d7355e17e1
9 changed files with 647 additions and 675 deletions
320
lib/index.js
320
lib/index.js
|
@ -1,342 +1,332 @@
|
|||
'use strict'
|
||||
"use strict";
|
||||
|
||||
// An order represents a pending push or shift.
|
||||
const Order = (channel) => {
|
||||
let order
|
||||
const preonFulfilleds = []
|
||||
const Order = channel => {
|
||||
let order;
|
||||
const preonFulfilleds = [];
|
||||
|
||||
const promise = new Promise((resolve, reject) => {
|
||||
order = {
|
||||
resolve: (value) => {
|
||||
preonFulfilleds.forEach((preonFulfilled) => {
|
||||
preonFulfilled(value)
|
||||
})
|
||||
resolve: value => {
|
||||
preonFulfilleds.forEach(preonFulfilled => {
|
||||
preonFulfilled(value);
|
||||
});
|
||||
|
||||
resolve(value)
|
||||
resolve(value);
|
||||
},
|
||||
|
||||
reject
|
||||
}
|
||||
})
|
||||
};
|
||||
});
|
||||
|
||||
Object.assign(promise, {
|
||||
cancel: () => {
|
||||
order.cancelled = true
|
||||
order.cancelled = true;
|
||||
},
|
||||
|
||||
channel,
|
||||
|
||||
prethen: (onFulfilled) => {
|
||||
preonFulfilleds.push(onFulfilled)
|
||||
prethen: onFulfilled => {
|
||||
preonFulfilleds.push(onFulfilled);
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
return {order, promise}
|
||||
}
|
||||
return { order, promise };
|
||||
};
|
||||
|
||||
const prototype = {}
|
||||
const prototype = {};
|
||||
|
||||
const Channel = function (bufferLength = 0) {
|
||||
let buffered = 0
|
||||
let closed = false
|
||||
let lastValue
|
||||
let resolvedIndex = 0
|
||||
const pushes = []
|
||||
const shifts = []
|
||||
const Channel = function(bufferLength = 0) {
|
||||
let buffered = 0;
|
||||
let closed = false;
|
||||
let lastValue;
|
||||
let resolvedIndex = 0;
|
||||
const pushes = [];
|
||||
const shifts = [];
|
||||
|
||||
// Process the push and shift queues like an order book, looking for matches.
|
||||
const processOrders = () => {
|
||||
const index = {push: 0, shift: 0}
|
||||
const index = { push: 0, shift: 0 };
|
||||
|
||||
// Match pushes and shifts.
|
||||
while ((index.push < pushes.length) && (index.shift < shifts.length)) {
|
||||
const push = pushes[index.push]
|
||||
const shift = shifts[index.shift]
|
||||
while (index.push < pushes.length && index.shift < shifts.length) {
|
||||
const push = pushes[index.push];
|
||||
const shift = shifts[index.shift];
|
||||
|
||||
if (push.cancelled) {
|
||||
index.push++
|
||||
index.push++;
|
||||
} else if (shift.cancelled) {
|
||||
index.shift++
|
||||
index.shift++;
|
||||
} else {
|
||||
lastValue = push.value
|
||||
shift.resolve(lastValue)
|
||||
buffered = Math.max(0, buffered - 1)
|
||||
index.push++
|
||||
index.shift++
|
||||
lastValue = push.value;
|
||||
shift.resolve(lastValue);
|
||||
buffered = Math.max(0, buffered - 1);
|
||||
index.push++;
|
||||
index.shift++;
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve push promises up to the end of the buffer.
|
||||
for (
|
||||
;
|
||||
(resolvedIndex < index.push) ||
|
||||
((resolvedIndex < pushes.length) && (buffered < bufferLength));
|
||||
resolvedIndex < index.push ||
|
||||
(resolvedIndex < pushes.length && buffered < bufferLength);
|
||||
resolvedIndex++
|
||||
) {
|
||||
const {cancelled, resolve} = pushes[resolvedIndex]
|
||||
const { cancelled, resolve } = pushes[resolvedIndex];
|
||||
|
||||
if (!cancelled) {
|
||||
if (resolvedIndex > index.push) {
|
||||
buffered++
|
||||
buffered++;
|
||||
}
|
||||
|
||||
resolve(bufferLength)
|
||||
resolve(bufferLength);
|
||||
}
|
||||
}
|
||||
|
||||
// If the channel is closed then resolve 'undefined' to remaining shifts.
|
||||
if (closed) {
|
||||
for (;
|
||||
index.shift < shifts.length;
|
||||
index.shift++
|
||||
) {
|
||||
const {cancelled, resolve} = shifts[index.shift]
|
||||
for (; index.shift < shifts.length; index.shift++) {
|
||||
const { cancelled, resolve } = shifts[index.shift];
|
||||
|
||||
if (!cancelled) {
|
||||
lastValue = undefined
|
||||
resolve(lastValue)
|
||||
lastValue = undefined;
|
||||
resolve(lastValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pushes.splice(0, index.push)
|
||||
shifts.splice(0, index.shift)
|
||||
resolvedIndex -= index.push
|
||||
}
|
||||
pushes.splice(0, index.push);
|
||||
shifts.splice(0, index.shift);
|
||||
resolvedIndex -= index.push;
|
||||
};
|
||||
|
||||
const readOnly = Object.assign(Object.create(prototype), {
|
||||
every: async (callbackfn, thisArg) => {
|
||||
for (;;) {
|
||||
const value = await readOnly.shift()
|
||||
const value = await readOnly.shift();
|
||||
|
||||
if (value === undefined) {
|
||||
return true
|
||||
return true;
|
||||
} else {
|
||||
if (!callbackfn.call(thisArg, value)) {
|
||||
return false
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
filter: (callbackfn, thisArg) => {
|
||||
const output = Channel()
|
||||
|
||||
;(async () => {
|
||||
const output = Channel();
|
||||
(async () => {
|
||||
for (;;) {
|
||||
const value = await readOnly.shift()
|
||||
const value = await readOnly.shift();
|
||||
|
||||
if (value === undefined) {
|
||||
await output.close()
|
||||
break
|
||||
await output.close();
|
||||
break;
|
||||
} else if (callbackfn.call(thisArg, value)) {
|
||||
await output.push(value)
|
||||
await output.push(value);
|
||||
}
|
||||
}
|
||||
})()
|
||||
})();
|
||||
|
||||
return output
|
||||
return output;
|
||||
},
|
||||
|
||||
forEach: async (callbackfn, thisArg) => {
|
||||
for (;;) {
|
||||
const value = await readOnly.shift()
|
||||
const value = await readOnly.shift();
|
||||
|
||||
if (value === undefined) {
|
||||
break
|
||||
break;
|
||||
} else {
|
||||
await callbackfn.call(thisArg, value)
|
||||
await callbackfn.call(thisArg, value);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
join: async (separator) => {
|
||||
const elements = []
|
||||
join: async separator => {
|
||||
const elements = [];
|
||||
|
||||
await readOnly.forEach((element) => {
|
||||
elements.push(element)
|
||||
})
|
||||
await readOnly.forEach(element => {
|
||||
elements.push(element);
|
||||
});
|
||||
|
||||
return elements.join(separator)
|
||||
return elements.join(separator);
|
||||
},
|
||||
|
||||
map: (callbackfn, thisArg) => {
|
||||
const output = Channel()
|
||||
|
||||
;(async () => {
|
||||
await readOnly.forEach((value) =>
|
||||
const output = Channel();
|
||||
(async () => {
|
||||
await readOnly.forEach(value =>
|
||||
output.push(callbackfn.call(thisArg, value))
|
||||
)
|
||||
);
|
||||
|
||||
output.close()
|
||||
})()
|
||||
output.close();
|
||||
})();
|
||||
|
||||
return output
|
||||
return output;
|
||||
},
|
||||
|
||||
readOnly: () => readOnly,
|
||||
|
||||
reduce: async (callbackfn, ...initialValue) => {
|
||||
let previousValue = initialValue[0]
|
||||
let previousValueDefined = initialValue.length > 0
|
||||
let previousValue = initialValue[0];
|
||||
let previousValueDefined = initialValue.length > 0;
|
||||
|
||||
await readOnly.forEach((currentValue) => {
|
||||
await readOnly.forEach(currentValue => {
|
||||
if (previousValueDefined) {
|
||||
previousValue = callbackfn(previousValue, currentValue)
|
||||
previousValue = callbackfn(previousValue, currentValue);
|
||||
} else {
|
||||
previousValue = currentValue
|
||||
previousValueDefined = true
|
||||
previousValue = currentValue;
|
||||
previousValueDefined = true;
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
if (previousValueDefined) {
|
||||
return previousValue
|
||||
return previousValue;
|
||||
} else {
|
||||
throw new TypeError(
|
||||
`No values in channel and initialValue wasn't provided.`
|
||||
)
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
shift: function () {
|
||||
const {order, promise} = Order(this)
|
||||
shifts.push(order)
|
||||
setImmediate(processOrders)
|
||||
return Object.freeze(promise)
|
||||
shift: function() {
|
||||
const { order, promise } = Order(this);
|
||||
shifts.push(order);
|
||||
setImmediate(processOrders);
|
||||
return Object.freeze(promise);
|
||||
},
|
||||
|
||||
slice: (start, end = Infinity) => {
|
||||
const output = Channel()
|
||||
|
||||
;(async () => {
|
||||
const output = Channel();
|
||||
(async () => {
|
||||
for (let index = 0; index < end; index++) {
|
||||
const value = await readOnly.shift()
|
||||
const value = await readOnly.shift();
|
||||
|
||||
if (value === undefined) {
|
||||
break
|
||||
break;
|
||||
} else if (index >= start) {
|
||||
await output.push(value)
|
||||
await output.push(value);
|
||||
}
|
||||
}
|
||||
|
||||
await output.close()
|
||||
})()
|
||||
await output.close();
|
||||
})();
|
||||
|
||||
return output
|
||||
return output;
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
Object.defineProperty(readOnly, `value`, {get: () => lastValue})
|
||||
Object.freeze(readOnly)
|
||||
Object.defineProperty(readOnly, `value`, { get: () => lastValue });
|
||||
Object.freeze(readOnly);
|
||||
|
||||
const writeOnly = Object.freeze({
|
||||
close: () =>
|
||||
new Promise((resolve, reject) => {
|
||||
if (closed) {
|
||||
reject(new Error(`Can't close an already-closed channel.`))
|
||||
reject(new Error(`Can't close an already-closed channel.`));
|
||||
} else {
|
||||
closed = true
|
||||
processOrders()
|
||||
closed = true;
|
||||
processOrders();
|
||||
|
||||
// Give remaining orders in flight time to resolve before returning.
|
||||
setImmediate(resolve)
|
||||
setImmediate(resolve);
|
||||
}
|
||||
}),
|
||||
|
||||
push: function (value) {
|
||||
const {order, promise} = Order(this)
|
||||
order.value = value
|
||||
push: function(value) {
|
||||
const { order, promise } = Order(this);
|
||||
order.value = value;
|
||||
|
||||
if (closed) {
|
||||
order.reject(new Error(`Can't push to closed channel.`))
|
||||
order.reject(new Error(`Can't push to closed channel.`));
|
||||
} else if (value === undefined) {
|
||||
order.reject(new TypeError(
|
||||
`Can't push 'undefined' to channel, use close instead.`
|
||||
))
|
||||
order.reject(
|
||||
new TypeError(`Can't push 'undefined' to channel, use close instead.`)
|
||||
);
|
||||
} else if (arguments.length > 1) {
|
||||
order.reject(new Error(`Can't push more than one value at a time.`))
|
||||
order.reject(new Error(`Can't push more than one value at a time.`));
|
||||
} else {
|
||||
pushes.push(order)
|
||||
setImmediate(processOrders)
|
||||
pushes.push(order);
|
||||
setImmediate(processOrders);
|
||||
}
|
||||
|
||||
return Object.freeze(promise)
|
||||
return Object.freeze(promise);
|
||||
},
|
||||
|
||||
writeOnly: () => writeOnly
|
||||
})
|
||||
});
|
||||
|
||||
// Use Object.create because readOnly has a getter.
|
||||
return Object.freeze(Object.assign(Object.create(readOnly), writeOnly))
|
||||
}
|
||||
return Object.freeze(Object.assign(Object.create(readOnly), writeOnly));
|
||||
};
|
||||
|
||||
Channel.from = (values) => {
|
||||
const channel = Channel()
|
||||
|
||||
;(async () => {
|
||||
Channel.from = values => {
|
||||
const channel = Channel();
|
||||
(async () => {
|
||||
try {
|
||||
// iterable
|
||||
for (let value of values) {
|
||||
await channel.push(value)
|
||||
await channel.push(value);
|
||||
}
|
||||
|
||||
await channel.close()
|
||||
await channel.close();
|
||||
} catch (exception) {
|
||||
// Assume it's a Node.js stream.readable.
|
||||
|
||||
values.on('readable', async () => {
|
||||
values.on("readable", async () => {
|
||||
while (true) {
|
||||
const data = values.read()
|
||||
const data = values.read();
|
||||
|
||||
if (data === null) {
|
||||
break
|
||||
break;
|
||||
} else {
|
||||
await channel.push(data)
|
||||
await channel.push(data);
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
values.once('end', channel.close)
|
||||
values.once("end", channel.close);
|
||||
}
|
||||
})()
|
||||
})();
|
||||
|
||||
return channel
|
||||
}
|
||||
return channel;
|
||||
};
|
||||
|
||||
Channel.of = (...values) =>
|
||||
Channel.from(values)
|
||||
Channel.of = (...values) => Channel.from(values);
|
||||
|
||||
Channel.isChannel = (arg) =>
|
||||
prototype.isPrototypeOf(arg)
|
||||
Channel.isChannel = arg => prototype.isPrototypeOf(arg);
|
||||
|
||||
Channel.select = (...methodPromises) =>
|
||||
Object.assign(
|
||||
new Promise((resolve, reject) => {
|
||||
methodPromises.forEach(async (promise) => {
|
||||
methodPromises.forEach(async promise => {
|
||||
promise.prethen(() => {
|
||||
// We've been given a heads-up that this method will complete first so
|
||||
// cancel the other method calls.
|
||||
methodPromises.forEach((other) => {
|
||||
methodPromises.forEach(other => {
|
||||
if (other !== promise) {
|
||||
other.cancel()
|
||||
other.cancel();
|
||||
}
|
||||
})
|
||||
})
|
||||
});
|
||||
});
|
||||
|
||||
try {
|
||||
await promise
|
||||
await promise;
|
||||
} catch (exception) {
|
||||
reject(exception)
|
||||
reject(exception);
|
||||
}
|
||||
|
||||
resolve(promise.channel)
|
||||
})
|
||||
resolve(promise.channel);
|
||||
});
|
||||
}),
|
||||
{
|
||||
cancel: () =>
|
||||
Promise.all(methodPromises.map((promise) => promise.cancel()))
|
||||
cancel: () => Promise.all(methodPromises.map(promise => promise.cancel()))
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
// functional interface allowing full or partial application
|
||||
//
|
||||
|
@ -347,19 +337,17 @@ Channel.select = (...methodPromises) =>
|
|||
// const skipTen = Channel.slice(10, Infinity)
|
||||
// skipTen(channel)
|
||||
|
||||
const channel = Channel()
|
||||
const methods = Object.keys(channel).concat(Object.keys(channel.readOnly()))
|
||||
const channel = Channel();
|
||||
const methods = Object.keys(channel).concat(Object.keys(channel.readOnly()));
|
||||
|
||||
methods.forEach((method) => {
|
||||
methods.forEach(method => {
|
||||
Channel[method] = (...args) => {
|
||||
const arity = method === `slice`
|
||||
? 3
|
||||
: channel[method].length
|
||||
const arity = method === `slice` ? 3 : channel[method].length;
|
||||
|
||||
return args.length >= arity
|
||||
? args[arity - 1][method](...args.slice(0, arity - 1))
|
||||
: (channel) => channel[method](...args)
|
||||
}
|
||||
})
|
||||
: channel => channel[method](...args);
|
||||
};
|
||||
});
|
||||
|
||||
module.exports = Object.freeze(Channel)
|
||||
module.exports = Object.freeze(Channel);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue