Queue System within the CLI with NodeJS
I wanted to build a simple CLI tool that has:
- An infinite loop
- A prompt
- A queue System
- Simple Processing (no workers, sadly…)
The queue was started from, and is 99% built on, https://github.com/MatthewMueller/enqueue. I added a few features that I need to monitor the process. It is very simple, but the idea was only to build a POC, because I have other projects that can benefit from this prototype.
The Code
Dependencies
const { exec } = require("node:child_process");
const readline = require("node:readline");
const { stdin: input, stdout: output } = require("node:process");
const sliced = require("sliced");
// Shared do-nothing callback: takes anything, returns undefined.
const noop = () => {};
Options
// Settings object handed to enqueue() when the queue is created.
const options = {
  // Read by enqueue() as its concurrency setting.
  concurrency: 2,
  // Milliseconds before a running job is finished with "job timed out"
  // (10 minutes = 600000 ms).
  timeout: 10 * 60 * 1000,
  // Queue size cap checked by add() before accepting a new job.
  limit: 100,
};
enqueue (99% of the code is from the enqueue package)
/**
 * Create a bounded job queue around the worker function `fn`.
 *
 * Jobs are started in the order they were added. Each job is invoked as
 * `fn(...jobArgs, done)` with `this` set to the object `add` was called on;
 * the worker must call `done` when the job is finished.
 *
 * @param {Function} fn - Worker function; receives the job arguments followed
 *   by a `done` callback.
 * @param {Object} [options]
 * @param {number} [options.concurrency=1] - Maximum number of jobs running at
 *   the same time.
 * @param {number} [options.timeout] - Milliseconds after which a running job
 *   is finished with a "job timed out" error. Falsy disables the timeout.
 * @param {number} [options.limit=Infinity] - Maximum number of jobs held by
 *   the queue (pending + running).
 * @returns {{getRunning: Function, getPending: Function, add: Function}}
 *   The queue handle.
 */
function enqueue(fn, options) {
  const settings = options || {};
  const concurrency = settings.concurrency || 1;
  const timeout = settings.timeout || false;
  const limit = settings.limit || Infinity;
  const tids = {}; // job id -> timeout handle of a running job
  const pending = []; // [id, ctx, args] tuples waiting for a free slot
  let running = 0;
  let id = 0;

  return {
    /** @returns {number} How many jobs are currently running. */
    getRunning() {
      return running;
    },

    /** @returns {number[]} Ids of the jobs still waiting to start. */
    getPending() {
      return pending.map((job) => job[0]);
    },

    /**
     * Queue a job. If the last argument is a function it is treated as the
     * job's "on end" callback: it is removed from the job arguments and is
     * called with whatever the worker passes to `done`.
     *
     * @param {...*} args - Job arguments, optionally followed by a callback.
     * @returns {*} An `Error` (returned, not thrown) when the queue is full;
     *   otherwise the worker's return value if the job started right away,
     *   or `undefined` if it had to wait.
     */
    add(...args) {
      // `>=` so the queue never holds more than `limit` jobs.
      if (pending.length + running >= limit) {
        return new Error("queue limit reached, try later");
      }

      const last = args[args.length - 1];
      // remove "on end" function if there is one
      const end = typeof last === "function" ? args.pop() : noop;
      const ctx = this;

      // Start the next pending job if a concurrency slot is free.
      function next() {
        // `>=` so at most `concurrency` jobs run at once.
        if (running >= concurrency) return undefined;
        const job = pending.shift();
        if (!job) return undefined;

        const [jobId, jobCtx, jobArgs] = job;
        const finish = jobArgs[jobArgs.length - 1];
        running++;

        // support timeouts
        if (timeout) {
          tids[jobId] = setTimeout(() => {
            finish(new Error("job timed out"));
          }, timeout);
        }

        // call the fn
        return fn.apply(jobCtx, jobArgs);
      }

      // Build the completion callback for job `jobId`.
      function done(jobId) {
        return function _done(...results) {
          clearTimeout(tids[jobId]);
          delete tids[jobId]; // do not keep handles of finished jobs around
          running--;
          next();
          return end.apply(this, results);
        };
      }

      id++;
      // `once` makes a late worker callback after a timeout a no-op.
      pending.push([id, ctx, args.concat(once(done(id)))]);
      return next();
    },
  };
}
/**
 * Wrap `fn` so that only the first call is forwarded to it.
 *
 * The first call runs `fn` with the caller's `this` and arguments and returns
 * its result; every later call returns the result of `noop()` instead.
 *
 * @param {Function} fn - Function to guard.
 * @returns {Function} The guarded wrapper.
 */
function once(fn) {
  let hasRun = false;
  return function _once(...callArgs) {
    if (!hasRun) {
      // Flip the flag before calling so a throwing `fn` is still not retried.
      hasRun = true;
      return fn.apply(this, callArgs);
    }
    return noop();
  };
}
Prompt (I reused the function that I have within yat-vault)
/**
 * Show `question` on the terminal and wait for one line of input.
 *
 * A fresh readline interface on the process stdin/stdout is created for each
 * call and closed again once the answer has been received.
 *
 * @param {string} question - Text displayed before the cursor.
 * @returns {Promise<string>} Resolves with the answer passed back by readline.
 */
async function ReadInput(question) {
  const rl = readline.createInterface({ input, output, prompt: "" });

  const ask = (resolve) => {
    rl.question(question, (answer) => {
      rl.output.write("\n\r");
      rl.close();
      resolve(answer);
    });
  };

  return new Promise(ask);
}
Processing Function
// The queue instance: each job runs a shell command through exec() and hands
// exec's callback arguments to the queue's `done`.
const q = enqueue((url, done) => {
  // `url` is the value given to q.add(); this placeholder command ignores it.
  exec('echo "It can be whatever you want here"', done);
}, options);
Main
/**
 * Main loop: print the queue counters, prompt for input, and queue one job
 * per non-empty answer. The loop never ends on its own; the prompt comes back
 * while earlier jobs are still being processed.
 */
(async () => {
  // Print how many jobs are waiting and how many are running.
  const logStatus = () => {
    console.log("Jobs Remaining", q.getPending().length);
    console.log("Jobs processing", q.getRunning());
  };

  while (true) {
    logStatus();

    const response = await ReadInput("Prompt: ");
    // Reject missing and whitespace-only answers.
    if (!response || response.trim() === "") {
      console.error("ERR: Invalid Prompt");
      continue;
    }

    // exec() calls back with (error, stdout, stderr) and the queue forwards
    // those arguments unchanged, so the callback is error-first.
    const rejected = q.add(response, (err, stdout, stderr) => {
      if (err) console.error("err: ", err);
      if (stderr) console.error("stderr: ", stderr);
      if (stdout) console.log("out: ", stdout);
      else if (!err) console.log("Ok!");
      logStatus();
    });

    // add() returns (does not throw) an Error when the queue is full.
    if (rejected instanceof Error) {
      console.error("ERR:", rejected.message);
    }
  }
})().catch((err) => {
  // Do not leave the top-level promise without a rejection handler.
  console.error("ERR:", err);
  process.exitCode = 1;
});
Voila !
The prompt will always be there and you will be able to send whatever input you want.
I have many other prototypes that maybe someday I will combine all together :shrugging: