Refactor from
.
This commit is contained in:
parent
a6233e3884
commit
c0b2e8b411
1 changed files with 18 additions and 14 deletions
32
lib/index.js
32
lib/index.js
|
@ -342,6 +342,23 @@ const Channel = function(length = 0) {
|
|||
);
|
||||
};
|
||||
|
||||
// Node.js stream.readable
|
||||
const fromNodeStream = (channel, stream) => {
|
||||
stream.on(`readable`, async () => {
|
||||
for (;;) {
|
||||
const data = stream.read();
|
||||
|
||||
if (data === null) {
|
||||
break;
|
||||
} else {
|
||||
await channel.push(data);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
stream.once(`end`, channel.close);
|
||||
};
|
||||
|
||||
Channel.from = (values, mapfn, thisArg) => {
|
||||
const channel = Channel();
|
||||
|
||||
|
@ -367,20 +384,7 @@ Channel.from = (values, mapfn, thisArg) => {
|
|||
}
|
||||
}
|
||||
} catch (exception) {
|
||||
// Node.js stream.readable
|
||||
values.on(`readable`, async () => {
|
||||
while (true) {
|
||||
const data = values.read();
|
||||
|
||||
if (data === null) {
|
||||
break;
|
||||
} else {
|
||||
await channel.push(data);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
values.once(`end`, channel.close);
|
||||
fromNodeStream(channel, values);
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue