Improvements to playlist queue

This commit is contained in:
calzoneman 2013-09-29 11:30:36 -05:00
parent 3d3d21a3f9
commit 21bd270813
4 changed files with 110 additions and 6 deletions

View File

@ -8,14 +8,21 @@ AsyncQueue.prototype.next = function () {
if (this._q.length > 0) { if (this._q.length > 0) {
if (!this.lock()) if (!this.lock())
return; return;
var fn = this._q.shift(); var item = this._q.shift();
var fn = item[0], tm = item[1];
this._tm = Date.now() + item[1];
fn(this); fn(this);
} }
}; };
AsyncQueue.prototype.lock = function () { AsyncQueue.prototype.lock = function () {
if (this._lock) if (this._lock) {
if (this._tm > 0 && Date.now() > this._tm) {
this._tm = 0;
return true;
}
return false; return false;
}
this._lock = true; this._lock = true;
return true; return true;
@ -27,7 +34,7 @@ AsyncQueue.prototype.release = function () {
return false; return false;
self._lock = false; self._lock = false;
process.nextTick(function () { setImmediate(function () {
self.next(); self.next();
}); });
return true; return true;
@ -35,7 +42,7 @@ AsyncQueue.prototype.release = function () {
AsyncQueue.prototype.queue = function (fn) { AsyncQueue.prototype.queue = function (fn) {
var self = this; var self = this;
self._q.push(fn); self._q.push([fn, 20000]);
self.next(); self.next();
}; };
@ -43,3 +50,5 @@ AsyncQueue.prototype.reset = function () {
this._q = []; this._q = [];
this._lock = false; this._lock = false;
}; };
module.exports = AsyncQueue;

View File

@ -23,6 +23,7 @@ module.exports = function (Server) {
// This should cut down on needing to restart the server // This should cut down on needing to restart the server
var d = domain.create(); var d = domain.create();
d.on("error", function (err) { d.on("error", function (err) {
Logger.errlog.log(err.trace());
Logger.errlog.log("urlRetrieve failed: " + err); Logger.errlog.log("urlRetrieve failed: " + err);
Logger.errlog.log("Request was: " + options.host + options.path); Logger.errlog.log("Request was: " + options.host + options.path);
callback(503, err); callback(503, err);
@ -88,7 +89,8 @@ module.exports = function (Server) {
callback(true, null); callback(true, null);
return; return;
} }
var buffer = data;
try { try {
data = JSON.parse(data); data = JSON.parse(data);
var seconds = data.entry.media$group.yt$duration.seconds; var seconds = data.entry.media$group.yt$duration.seconds;
@ -599,6 +601,8 @@ module.exports = function (Server) {
getMedia: function (id, type, callback) { getMedia: function (id, type, callback) {
if(type in this.Getters) { if(type in this.Getters) {
this.Getters[type](id, callback); this.Getters[type](id, callback);
} else {
callback("Unknown media type '" + type + "'", null);
} }
} }
}; };

View File

@ -10,6 +10,7 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
*/ */
ULList = require("./ullist").ULList; ULList = require("./ullist").ULList;
var AsyncQueue = require("./asyncqueue");
var Media = require("./media").Media; var Media = require("./media").Media;
var AllPlaylists = {}; var AllPlaylists = {};
@ -52,6 +53,7 @@ function Playlist(chan) {
}; };
this.lock = false; this.lock = false;
this.action_queue = []; this.action_queue = [];
this.fnqueue = new AsyncQueue();
this._qaInterval = false; this._qaInterval = false;
AllPlaylists[name] = this; AllPlaylists[name] = this;
@ -247,7 +249,58 @@ Playlist.prototype.addCachedMedia = function(data, callback) {
this.queueAction(action); this.queueAction(action);
} }
Playlist.prototype.addMedia = function(data, callback) { Playlist.prototype.addMedia = function (data, cb) {
var self = this;
if (data.type === "yp") {
this.addYouTubePlaylist(data, cb);
return;
}
this.fnqueue.queue(function (q) {
var afterData = function (m) {
if (data.maxlength && data.seconds > m.maxlength) {
setImmediate(function () {
cb("Media is too long!", null);
});
q.release();
return;
}
var it = self.makeItem(m);
it.queueby = data.queueby;
it.temp = data.temp;
var pos = "append";
if(data.pos == "next") {
if (!self.current)
pos = "prepend";
else
pos = self.current.uid;
}
var err = self.add(it, pos);
setImmediate(function () {
cb(err, err ? null : it);
});
q.release();
};
if (typeof data.title === "string") {
afterData(new Media(data.id, data.title, data.seconds, data.type));
} else {
self.server.infogetter.getMedia(data.id, data.type,
function (e, m) {
if (e) {
setImmediate(function () { cb(e, null); });
q.release();
return;
}
afterData(m);
});
}
});
};
Playlist.prototype._addMedia = function(data, callback) {
if(data.type == "yp") { if(data.type == "yp") {
this.addYouTubePlaylist(data, callback); this.addYouTubePlaylist(data, callback);

View File

@ -0,0 +1,38 @@
var fs = require('fs');
var io = require('socket.io-client');
var socket = io.connect('http://localhost:1337');
socket.on('connect', function () {
socket.emit('login', { name: 'test', pw: 'test' });
socket.emit('joinChannel', { name: 'test' });
});
socket.on('queueFail', function (msg) {
console.log(msg);
});
/* Stress test adding a lot of videos in a very short timespan */
function testAddVideos() {
var pl = fs.readFileSync('largepl.json') + '';
pl = JSON.parse(pl);
var ids = [];
for (var i = 0; i < pl.length; i++) {
if (pl[i].type === 'yt')
ids.push(pl[i].id);
}
for (var i = 0; i < ids.length; i++) {
(function (i) {
setTimeout(function () {
console.log('queue', ids[i]);
socket.emit('queue', {
id: ids[i],
type: 'yt',
pos: 'end'
});
}, 1050 * i);
})(i);
}
}
testAddVideos();