Skip to content

Commit

Permalink
Refactor queue related logic.
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Wang <skygragon@gmail.com>
  • Loading branch information
skygragon committed Dec 26, 2017
1 parent a25cfa7 commit c3c6e81
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 96 deletions.
89 changes: 38 additions & 51 deletions lib/commands/submission.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var sprintf = require('sprintf-js').sprintf;
var h = require('../helper');
var chalk = require('../chalk');
var log = require('../log');
var queue = require('../queue');
var Queue = require('../queue');
var core = require('../core');
var session = require('../session');

Expand Down Expand Up @@ -41,67 +41,62 @@ var cmd = {
}
};

function onTaskDone(e, msg, problem, cb) {
// NOTE: msg color means different purpose:
// - red: error
// - green: accepted, fresh download
// - yellow: not ac-ed, fresh download
// - white: existed already, skip download
log.printf('[%3d] %-60s %s', problem.id, problem.name,
(e ? chalk.red('ERROR: ' + (e.msg || e)) : msg));
if (cb) cb(e);
}

function onTaskRun(argv, problem, cb) {
var done = _.partial(onTaskDone, _, _, problem, cb);
function doTask(problem, queue, cb) {
var argv = queue.ctx.argv;

function onTaskDone(e, msg) {
// NOTE: msg color means different purpose:
// - red: error
// - green: accepted, fresh download
// - yellow: not ac-ed, fresh download
// - white: existed already, skip download
log.printf('[%3d] %-60s %s', problem.id, problem.name,
(e ? chalk.red('ERROR: ' + (e.msg || e)) : msg));
if (cb) cb(e);
}

if (argv.extra) {
// have to get problem details, e.g. problem description.
core.getProblem(problem.id, function(e, problem) {
if (e) return done(e);

exportSubmission(argv, problem, done);
exportSubmission(problem, argv, onTaskDone);
});
} else {
exportSubmission(argv, problem, done);
exportSubmission(problem, argv, onTaskDone);
}
}

function exportSubmission(argv, problem, cb) {
function exportSubmission(problem, argv, cb) {
core.getSubmissions(problem, function(e, submissions) {
if (e) return cb(e);
if (submissions.length === 0) return cb('no submissions?');
if (submissions.length === 0)
return cb('No submissions?');

// get obj list contain required filetype
var submissionInTargetType = _.filter(submissions, function(x) {
submissions = _.filter(submissions, function(x) {
return argv.lang === 'all' || argv.lang === x.lang;
});
if (submissionInTargetType.length === 0) {
return cb('No previous submission in required language.');
}
var submission = _.find(submissionInTargetType, function(x) {
return x.status_display === 'Accepted';
});

var submissionState = submission === undefined ? 'notac' : 'ac';
if (submissions.length === 0)
return cb('No submissions in required language.');

// if no accepted, use the latest non-accepted one
submission = submission || submissionInTargetType[0];

h.mkdir(argv.outdir);
var submission = _.find(submissions, function(x) {
return x.status_display === 'Accepted';
}) || submissions[0];
submission.ac = (submission.status_display === 'Accepted');

var filename = sprintf('%s/%d.%s.%s.%s%s',
var f = sprintf('%s/%d.%s.%s.%s%s',
argv.outdir,
problem.id,
problem.slug,
submission.id,
submissionState,
submission.ac ? 'ac' : 'notac',
h.langToExt(submission.lang));

h.mkdir(argv.outdir);
// skip the existing cached submissions
if (fs.existsSync(filename)) {
return cb(null, chalk.underline(filename));
}
if (fs.existsSync(f))
return cb(null, chalk.underline(f));

core.getSubmission(submission, function(e, submission) {
if (e) return cb(e);
Expand All @@ -111,29 +106,22 @@ function exportSubmission(argv, problem, cb) {
code: submission.code,
tpl: argv.extra ? 'detailed' : 'codeonly'
};
fs.writeFileSync(filename, core.exportProblem(problem, opts));

if (submission.status_display === 'Accepted')
cb(null, chalk.green.underline(filename));
else
cb(null, chalk.yellow.underline(filename));
fs.writeFileSync(f, core.exportProblem(problem, opts));
cb(null, submission.ac ? chalk.green.underline(f)
: chalk.yellow.underline(f));
});
});
}

cmd.handler = function(argv) {
session.argv = argv;
var doTask = _.partial(onTaskRun, argv, _, _);
var q = new Queue(null, {argv: argv}, doTask);

if (argv.all) {
core.getProblems(function(e, problems) {
if (e) return log.fail(e);

problems = problems.filter(function(q) {
return q.state === 'ac' || q.state === 'notac';
});

queue.run(problems, doTask);
problems = problems.filter(function(q) { return q.state === 'ac' || q.state === 'notac'; });
q.addTasks(problems).run();
});
return;
}
Expand All @@ -143,8 +131,7 @@ cmd.handler = function(argv) {

core.getProblem(argv.keyword, function(e, problem) {
if (e) return log.fail(e);

queue.run([problem], doTask);
q.addTask(problem).run();
});
};

Expand Down
48 changes: 27 additions & 21 deletions lib/plugins/leetcode.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ var config = require('../config');
var h = require('../helper');
var log = require('../log');
var Plugin = require('../plugin');
var queue = require('../queue');
var Queue = require('../queue');
var session = require('../session');

var plugin = new Plugin(10, 'leetcode', '',
Expand Down Expand Up @@ -52,20 +52,21 @@ function checkError(e, resp, expectedStatus) {
plugin.getProblems = function(cb) {
log.debug('running leetcode.getProblems');
var problems = [];
var doTask = function(category, taskDone) {
var getCategory = function(category, queue, cb) {
plugin.getCategoryProblems(category, function(e, _problems) {
if (e) {
log.debug(category + ': failed to getProblems: ' + e.msg);
} else {
log.debug(category + ': getProblems got ' + _problems.length + ' problems');
problems = problems.concat(_problems);
}
return taskDone(e);
return cb(e);
});
};

spin = h.spin('Downloading problems');
queue.run(config.sys.categories, doTask, function(e) {
var q = new Queue(config.sys.categories, {}, getCategory);
q.run(null, function(e) {
spin.stop();
return cb(e, problems);
});
Expand Down Expand Up @@ -144,7 +145,7 @@ plugin.getProblem = function(problem, cb) {
operationName: 'getQuestionDetail'
};

spin = h.spin('Downloading ' + problem.slug);
var spin = h.spin('Downloading ' + problem.slug);
request.post(opts, function(e, resp, body) {
spin.stop();
e = checkError(e, resp, 200);
Expand Down Expand Up @@ -181,7 +182,7 @@ function runCode(opts, problem, cb) {
typed_code: h.getFileData(problem.file)
});

spin = h.spin('Sending code to judge');
var spin = h.spin('Sending code to judge');
request(opts, function(e, resp, body) {
spin.stop();
e = checkError(e, resp, 200);
Expand Down Expand Up @@ -209,13 +210,12 @@ function runCode(opts, problem, cb) {
});
}

function verifyResult(opts, jobs, results, cb) {
if (jobs.length === 0) return cb(null, results);

function verifyResult(task, queue, cb) {
var opts = queue.ctx.opts;
opts.method = 'GET';
opts.url = config.sys.urls.verify.replace('$id', jobs[0].id);
opts.url = config.sys.urls.verify.replace('$id', task.id);

spin = h.spin('Waiting for judge result');
var spin = h.spin('Waiting for judge result');
request(opts, function(e, resp, body) {
spin.stop();
e = checkError(e, resp, 200);
Expand All @@ -224,12 +224,12 @@ function verifyResult(opts, jobs, results, cb) {
var result = JSON.parse(body);
if (result.state === 'SUCCESS') {
result = formatResult(result);
_.extendOwn(result, jobs[0]);
results.push(result);
jobs.shift();
_.extendOwn(result, task);
queue.ctx.results.push(result);
} else {
queue.addTask(task);
}

setImmediate(verifyResult, opts, jobs, results, cb);
return cb();
});
}

Expand Down Expand Up @@ -274,11 +274,14 @@ plugin.testProblem = function(problem, cb) {
runCode(opts, problem, function(e, task) {
if (e) return cb(e);

var jobs = [
var tasks = [
{type: 'Actual', id: task.interpret_id},
{type: 'Expected', id: task.interpret_expected_id}
];
verifyResult(opts, jobs, [], cb);
var q = new Queue(tasks, {opts: opts, results: []}, verifyResult);
q.run(null, function(e, ctx) {
return cb(e, ctx.results);
});
});
};

Expand All @@ -290,8 +293,11 @@ plugin.submitProblem = function(problem, cb) {
runCode(opts, problem, function(e, task) {
if (e) return cb(e);

var jobs = [{type: 'Actual', id: task.submission_id}];
verifyResult(opts, jobs, [], cb);
var tasks = [{type: 'Actual', id: task.submission_id}];
var q = new Queue(tasks, {opts: opts, results: []}, verifyResult);
q.run(null, function(e, ctx) {
return cb(e, ctx.results);
});
});
};

Expand Down Expand Up @@ -376,7 +382,7 @@ plugin.getFavorites = function(cb) {

plugin.signin = function(user, cb) {
log.debug('running leetcode.signin');
spin = h.spin('Signing in leetcode.com');
var spin = h.spin('Signing in leetcode.com');
request(config.sys.urls.login, function(e, resp, body) {
spin.stop();
e = checkError(e, resp, 200);
Expand Down
60 changes: 36 additions & 24 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,49 @@ var _ = require('underscore');

var config = require('./config');

var queue = {};
function Queue(tasks, ctx, onTask) {
this.tasks = _.clone(tasks) || [];
this.ctx = ctx || {};
this.onTask = onTask;
this.error = null;
}

Queue.prototype.addTask = function(task) {
this.tasks.push(task);
return this;
};

Queue.prototype.addTasks = function(tasks) {
this.tasks = this.tasks.concat(tasks);
return this;
};

Queue.prototype.run = function(concurrency, onDone) {
this.concurrency = concurrency || config.network.concurrency || 1;
this.onDone = onDone;

function workerRun(ctx) {
var self = this;
for (var i = 0; i < this.concurrency; ++i) {
setImmediate(function() { self.workerRun(); });
}
};

Queue.prototype.workerRun = function() {
// no more tasks, quit now
if (ctx.tasks.length === 0) {
if (--ctx.workers === 0 && ctx.cb)
ctx.cb(ctx.error);
if (this.tasks.length === 0) {
if (--this.concurrency === 0 && this.onDone)
this.onDone(this.error, this.ctx);
return;
}

var task = ctx.tasks.shift();
ctx.doTask(task, function(e) {
if (e) ctx.error = e;
var task = this.tasks.shift();
var self = this;
this.onTask(task, self, function(e) {
if (e) self.error = e;

// TODO: could retry failed task here.
setImmediate(workerRun, ctx);
setImmediate(function() { self.workerRun(); });
});
}

queue.run = function(tasks, doTask, cb) {
var ctx = {
tasks: _.clone(tasks),
doTask: doTask,
cb: cb,
workers: config.network.concurrency || 1,
error: null
};

for (var i = 0; i < ctx.workers; ++i) {
setImmediate(workerRun, ctx);
}
};

module.exports = queue;
module.exports = Queue;

0 comments on commit c3c6e81

Please sign in to comment.