Webux Lab

By Studio Webux

NodeJS - CLI Queue System

TG
Tommy Gingras Studio Webux 2023-09-09

Queue System within the CLI with NodeJS

I wanted to build a simple CLI tool that has a prompt that is always available and that pushes every input into a queue to be processed.


The queue was started from, and is 99% built on, https://github.com/MatthewMueller/enqueue. I added some features that I need to monitor the process. It is very simple, but the idea was only to build a POC, because I have other projects that can benefit from this prototype.

The Code

Dependencies

const { exec } = require("node:child_process");
const readline = require("node:readline");
const { stdin: input, stdout: output } = require("node:process");
const sliced = require("sliced");
const noop = function () {};

Options

// Settings handed to enqueue() when the job queue is created.
const options = {
  // Number of jobs allowed to run at the same time.
  concurrency: 2,
  // Per-job timeout in milliseconds (10 minutes).
  timeout: 10 * 60 * 1000,
  // Cap on the number of jobs the queue accepts.
  limit: 100,
};

enqueue (99% of the code is from the enqueue package)

/**
 * Wrap a worker function in a bounded queue with limited concurrency.
 *
 * @param {Function} fn - Worker called as `fn(...jobArgs, done)`. It must call
 *   `done` (at most the first call counts) when the job is finished.
 * @param {Object} [options]
 * @param {number} [options.concurrency=1] - Maximum number of jobs running at once.
 * @param {number|false} [options.timeout=false] - Milliseconds after which a
 *   running job is finished with a "job timed out" error. Falsy disables it.
 * @param {number} [options.limit=Infinity] - Maximum number of jobs
 *   (running + pending) the queue holds.
 * @returns {{getRunning: Function, getPending: Function, add: Function}}
 *   `getRunning()` returns the number of running jobs, `getPending()` returns
 *   the ids of the waiting jobs, `add(...args[, onEnd])` queues a job and
 *   returns an `Error` (not thrown) when the limit is reached.
 */
function enqueue(fn, options) {
  const opts = options || {};

  const concurrency = opts.concurrency || 1;
  const timeout = opts.timeout || false;
  const limit = opts.limit || Infinity;
  const timers = new Map();
  const pending = [];
  let running = 0;
  let id = 0;

  return {
    getRunning() {
      return running;
    },

    getPending() {
      return pending.map((job) => job[0]);
    },

    add(...args) {
      // `>=` so the queue never holds more than `limit` jobs in total.
      if (pending.length + running >= limit) {
        return new Error("queue limit reached, try later");
      }

      // A trailing function is the "on end" callback, not a job argument.
      const last = args[args.length - 1];
      const end = typeof last === "function" ? args.pop() : noop;
      const ctx = this;
      id++;

      pending.push([id, ctx, args.concat(once(done(id)))]);
      return next();

      // Start the next pending job if a concurrency slot is free.
      function next() {
        // `>=` so that at most `concurrency` jobs run simultaneously.
        if (running >= concurrency) return;
        const job = pending.shift();
        if (!job) return;

        const [jobId, jobCtx, jobArgs] = job;
        const finish = jobArgs[jobArgs.length - 1];

        running++;

        // support timeouts
        if (timeout) {
          timers.set(
            jobId,
            setTimeout(() => finish(new Error("job timed out")), timeout)
          );
        }

        // call the fn
        return fn.apply(jobCtx, jobArgs);
      }

      // Build the completion callback for one job.
      function done(jobId) {
        return function _done(...results) {
          clearTimeout(timers.get(jobId));
          // Drop the entry so finished jobs do not accumulate.
          timers.delete(jobId);
          running--;
          next();
          return end.apply(this, results);
        };
      }
    },
  };
}

/**
 * Wrap `fn` so that only the first invocation reaches it.
 *
 * The first call forwards `this` and all arguments to `fn` and returns its
 * result; every later call goes to `noop` instead.
 */

function once(fn) {
  let hasRun = false;
  return function _once(...params) {
    if (!hasRun) {
      hasRun = true;
      return fn.apply(this, params);
    }
    return noop();
  };
}

Prompt (I reused the function that I have within yat-vault)

/**
 * Ask one question on the terminal and resolve with what the user typed.
 *
 * A readline interface is created for each call and closed once the answer
 * has been received.
 *
 * @param {string} question - Text shown as the prompt.
 * @returns {Promise<string>} The answer passed to the readline callback.
 */
async function ReadInput(question) {
  return new Promise((resolve) => {
    const terminal = readline.createInterface({ input, output, prompt: "" });

    const onAnswer = (answer) => {
      // Emit a line break after the answer, then release the interface.
      terminal.output.write("\n\r");
      terminal.close();
      return resolve(answer);
    };

    terminal.question(question, onAnswer);
  });
}

Processing Function

/**
 * Worker run by the queue for every job: executes a shell command and hands
 * the result of `exec` to `done`. The first argument is the value given to
 * `q.add`; it is not used by the command yet.
 */
function runJob(url, done) {
  const cmd = `echo "It can be whatever you want here"`;
  exec(cmd, done);
}

// The queue instance used by the main loop.
const q = enqueue(runJob, options);

Main

// Main loop: show the queue status, read one line from the prompt, and queue
// it as a job. Runs until the process is stopped.
(async () => {
  while (true) {
    console.log("Jobs Remaining", q.getPending().length);
    console.log("Jobs processing", q.getRunning());

    const response = await ReadInput("Prompt: ");
    if (!response) {
      console.error("ERR: Invalid Prompt");
      continue;
    }

    // The job callback is error-first: `exec` calls back with
    // (error, stdout, stderr) and a timeout passes an Error as first argument.
    const rejected = q.add(response, (err, out) => {
      if (err) console.error("err: ", err);
      if (out) console.log("out: ", out);
      else if (!err) console.log("Ok!");

      console.log("Jobs Remaining", q.getPending().length);
      console.log("Jobs processing", q.getRunning());
    });

    // `add` returns (does not throw) an Error when the queue is full.
    if (rejected instanceof Error) {
      console.error("ERR:", rejected.message);
    }
  }
})().catch((err) => {
  // Do not leave the loop's promise floating: report and flag the failure.
  console.error(err);
  process.exitCode = 1;
});

Voila !

The prompt will always be there and you will be able to send whatever input you want.

I have many other prototypes that maybe someday I will combine all together :shrugging:


Search