rewrite request queue, fixes #118

This commit is contained in:
jomo 2015-05-28 00:45:20 +02:00
parent 189698bed9
commit 01049cb34d
3 changed files with 44 additions and 49 deletions

View File

@ -107,39 +107,38 @@ function store_cape(rid, userId, profile, cache_details, callback) {
// used by store_images to queue simultaneous requests for identical userId
// the first request has to be completed until all others are continued
var currently_running = [];
// calls back all queued requests that match userId and type
function callback_for(userId, type, err, hash) {
var req_count = 0;
for (var i = 0; i < currently_running.length; i++) {
var current = currently_running[i];
if (current.userid === userId && current.type === type) {
req_count++;
if (req_count !== 1) {
// otherwise this would show up on single/first requests, too
logging.debug(current.rid, "queued", type + " request continued");
}
currently_running.splice(i, 1); // remove from array
current.callback(err, hash);
i--;
}
}
if (req_count > 1) {
logging.debug(req_count + " simultaneous requests for", userId);
// otherwise we risk running into Mojang's rate limit and deleting the cached skin
var requests = {
skin: {},
cape: {}
};
function push_request(userId, type, fun) {
if (!requests[type][userId]) {
requests[type][userId] = [];
}
requests[type][userId].push(fun);
}
// returns true if any object in +arr+ has a +property+ that matches +value+
//
// deep_property_check([{foo: "bar"}, {foo: "baz"}], "foo", "baz");
//
function deep_property_check(arr, property, value) {
for (var i = 0; i < arr.length; i++) {
if (arr[i][property] === value) {
return true;
// calls back all queued requests that match userId and type
function resume(userId, type, err, hash) {
var callbacks = requests[type][userId];
if (callbacks) {
if (callbacks.length > 1) {
logging.debug(callbacks.length, "simultaneous requests for", userId);
}
for (var i = 0; i < callbacks.length; i++) {
// continue the request
callbacks[i](err, hash);
// remove from array
callbacks.splice(i, 1);
i--;
}
// it's still an empty array
delete requests[type][userId];
}
return false;
}
// downloads the images for +userId+ while checking the cache
@ -148,14 +147,13 @@ function deep_property_check(arr, property, value) {
// callback: error, image hash
function store_images(rid, userId, cache_details, type, callback) {
var is_uuid = userId.length > 16;
var new_hash = {
rid: rid,
userid: userId,
type: type,
callback: callback
};
if (!deep_property_check(currently_running, "userid", userId)) {
currently_running.push(new_hash);
if (requests[type][userId]) {
logging.log(rid, "adding to request queue");
push_request(userId, type, callback);
} else {
// add request to the queue
push_request(userId, type, callback);
networking.get_profile(rid, (is_uuid ? userId : null), function(err, profile) {
if (err || (is_uuid && !profile)) {
// error or uuid without profile
@ -163,40 +161,37 @@ function store_images(rid, userId, cache_details, type, callback) {
// no error, but uuid without profile
cache.save_hash(rid, userId, null, null, function(cache_err) {
// we have no profile, so we have neither skin nor cape
callback_for(userId, "skin", cache_err, null);
callback_for(userId, "cape", cache_err, null);
resume(userId, "skin", cache_err, null);
resume(userId, "cape", cache_err, null);
});
} else {
// an error occured, not caching. we can try in 60 seconds
callback_for(userId, type, err, null);
resume(userId, type, err, null);
}
} else {
// no error and we have a profile (if it's a uuid)
store_skin(rid, userId, profile, cache_details, function(store_err, skin_hash) {
if (store_err && !skin_hash) {
// an error occured, not caching. we can try in 60 seconds
callback_for(userId, "skin", store_err, null);
resume(userId, "skin", store_err, null);
} else {
cache.save_hash(rid, userId, skin_hash, undefined, function(cache_err) {
callback_for(userId, "skin", (store_err || cache_err), skin_hash);
resume(userId, "skin", (store_err || cache_err), skin_hash);
});
}
});
store_cape(rid, userId, profile, cache_details, function(store_err, cape_hash) {
if (store_err && !cape_hash) {
// an error occured, not caching. we can try in 60 seconds
callback_for(userId, "cape", (store_err), cape_hash);
resume(userId, "cape", (store_err), cape_hash);
} else {
cache.save_hash(rid, userId, undefined, cape_hash, function(cache_err) {
callback_for(userId, "cape", (store_err || cache_err), cape_hash);
resume(userId, "cape", (store_err || cache_err), cape_hash);
});
}
});
}
});
} else {
logging.log(rid, "ID already being processed, adding to queue");
currently_running.push(new_hash);
}
}

View File

@ -74,7 +74,7 @@ exp.get_from_options = function(rid, url, options, callback) {
// log url + code + description
var code = response && response.statusCode;
if (error) {
logging.error(url, error);
logging.error(rid, url, error);
} else {
var logfunc = code && code < 405 ? logging.log : logging.warn;
logfunc(rid, url, code, http_code[code]);

View File

@ -46,8 +46,8 @@ module.exports = function(request, response, result) {
};
if (result.err) {
logging.error(result.err);
logging.error(result.err.stack);
logging.error(request.id, result.err);
logging.error(request.id, result.err.stack);
result.status = -1;
}