From 87a4455ec069ea121a0f58be45df7c3cc1cbd44f Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Wed, 20 Jun 2012 13:22:55 +0200 Subject: [PATCH 01/11] fix missing start / stop on websocket --- lib/cube/metric.js | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index 3421aea2..deb38ba4 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -8,7 +8,8 @@ var parser = require("./metric-expression"), var metric_fields = {v: 1}, metric_options = {sort: {"_id.t": 1}, batchSize: 1000}, - event_options = {sort: {t: 1}, batchSize: 1000}; + event_options = {sort: {t: 1}, batchSize: 1000}, + limitMax = 1e4; // Query for metrics. exports.getter = function(db) { @@ -18,13 +19,30 @@ exports.getter = function(db) { meta = event.putter(db); function getter(request, callback) { + + var limit = +request.limit, + step = +request.step; + + // Provide default start and stop times for recent events. + // If the limit is not specified, or too big, use the maximum limit. + if (!(step)) step = 1e4; + if (!("stop" in request)) request.stop = Math.floor(Date.now() / step) * step; + if (!("start" in request)) request.start = 0; + if (!(limit <= limitMax)) limit = limitMax; + + // If the time between start and stop is too long, then bring the start time + // forward so that only the most recent results are returned. This is only + // approximate in the case of months, but why would you want to return + // exactly ten thousand months? Don't rely on exact limits! var start = new Date(request.start), stop = new Date(request.stop), id = request.id; + if ((stop - start) / step > limit) start = new Date(stop - step * limit); // Validate the dates. - if (isNaN(start)) return callback({error: "invalid start"}), -1; - if (isNaN(stop)) return callback({error: "invalid stop"}), -1; + // edit: not needed anymore + //if (isNaN(start)) return callback({error: "invalid start"}), -1; + //if (isNaN(stop)) return callback({error: "invalid stop"}), -1; // Parse the expression. var expression; @@ -35,7 +53,7 @@ exports.getter = function(db) { } // Round start and stop to the appropriate time step. - var tier = tiers[+request.step]; + var tier = tiers[step]; if (!tier) return callback({error: "invalid step"}), -1; start = tier.floor(start); stop = tier.ceil(stop); From 6b806db029a7b6283d914b4e0b956eb92e826d4a Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Wed, 20 Jun 2012 17:46:10 +0200 Subject: [PATCH 02/11] adding websocket polling to metric --- lib/cube/metric.js | 125 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 105 insertions(+), 20 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index deb38ba4..e1434f5e 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -4,45 +4,40 @@ var parser = require("./metric-expression"), tiers = require("./tiers"), types = require("./types"), reduces = require("./reduces"), - event = require("./event"); + event = require("./event"), + util = require("util"); var metric_fields = {v: 1}, metric_options = {sort: {"_id.t": 1}, batchSize: 1000}, event_options = {sort: {t: 1}, batchSize: 1000}, limitMax = 1e4; +var streamInterval = 1000; + // Query for metrics. exports.getter = function(db) { var collection = types(db), Double = db.bson_serializer.Double, queueByName = {}, - meta = event.putter(db); + meta = event.putter(db), + streamsBySource = {}; function getter(request, callback) { - var limit = +request.limit, - step = +request.step; - // Provide default start and stop times for recent events. // If the limit is not specified, or too big, use the maximum limit. - if (!(step)) step = 1e4; - if (!("stop" in request)) request.stop = Math.floor(Date.now() / step) * step; - if (!("start" in request)) request.start = 0; - if (!(limit <= limitMax)) limit = limitMax; + var stream = !("stop" in request), + limit = !(+request.limit >= limitMax) ? request.limit : limitMax, + step = +request.step ? +request.step : 1e4, + stop = ("stop" in request) ? new Date(request.stop) : new Date(Math.floor(Date.now() / step) * step); + start = ("start" in request) ? new Date(request.start) : new Date(0), + id = request.id; // If the time between start and stop is too long, then bring the start time // forward so that only the most recent results are returned. This is only // approximate in the case of months, but why would you want to return // exactly ten thousand months? Don't rely on exact limits! - var start = new Date(request.start), - stop = new Date(request.stop), - id = request.id; - if ((stop - start) / step > limit) start = new Date(stop - step * limit); - - // Validate the dates. - // edit: not needed anymore - //if (isNaN(start)) return callback({error: "invalid start"}), -1; - //if (isNaN(stop)) return callback({error: "invalid stop"}), -1; + if ((stop - start) / step > limit) start = new Date(stop - step * limit); // Parse the expression. var expression; @@ -52,16 +47,98 @@ exports.getter = function(db) { return callback({error: "invalid expression"}), -1; } + // Copy any expression filters into the query object. + var filter = {t: {$gte: start, $lt: stop}}; + expression.filter(filter); + // Round start and stop to the appropriate time step. var tier = tiers[step]; if (!tier) return callback({error: "invalid step"}), -1; start = tier.floor(start); stop = tier.ceil(stop); - // Compute the request metric! - measure(expression, start, stop, tier, "id" in request + // For streaming queries, share streams for efficient polling. + if (stream) { + + if (!streamsBySource[expression.source]) + streamsBySource[expression.source] = []; + + var streams = streamsBySource[expression.source][tier.key]; + + // If there is an existing stream to attach to, backfill the initial set + // of results to catch the client up to the stream. Add the new callback + // to a queue, so that when the shared stream finishes its current poll, + // it begins notifying the new client. Note that we don't pass the null + // (end terminator) to the callback, because more results are to come! + if (streams) { + + filter.t.$lt = streams.time; + streams.waiting.push(callback); + measure(expression, start, stop, tier, "id" in request ? function(time, value) { callback({time: time, value: value, id: id}); } : function(time, value) { callback({time: time, value: value}); }); + } + + // Otherwise, we're creating a new stream, so we're responsible for + // starting the polling loop. This means notifying active callbacks, + // detecting when active callbacks are closed, advancing the time window, + // and moving waiting clients to active clients. + else { + streams = streamsBySource[expression.source][tier.key] = {time: stop, waiting: [], active: [callback]}; + (function poll() { + + poll.triggered = false; + + var callbackFunction = function(metric) { + + //util.log('processed : ' + JSON.stringify(metric)); + + + // If there's a value in metric, send it to all active, open clients. + if (metric.value !== undefined) { + streams.active.forEach(function(callback) { + // TODO + if (!callback.closed) callback(metric); + }); + } + + // If poll isn't triggered, that's mean we have to do it. + // merge the waiting callbacks into the active callbacks. Advance + // the time range, and set a timeout for the next poll. + + if (!poll.triggered) { + streams.active = streams.active.concat(streams.waiting).filter(open); + streams.waiting = []; + + // If no clients remain, then it's safe to delete the shared + // stream, and we'll no longer be responsible for polling. + if (!streams.active.length) { + delete streamsBySource[expression.source][tier.key]; + if (streamsBySource[expression.source].length === 0) + delete streamsBySource[expression.source]; + return; + } + + filter.t.$gte = filter.t.$lt; + filter.t.$lt = streams.time = tier.ceil(new Date(Date.now())); + setTimeout(poll, tier.key); + poll.triggered = true; + } + + }; + + measure(expression, filter.t.$gte, filter.t.$lt, tier, "id" in request + ? function(time, value) { callbackFunction({time: time, value: value, id: id}); } + : function(time, value) { callbackFunction({time: time, value: value}); }); + })(); + } + } else { + + // Compute the request metric! + measure(expression, start, stop, tier, "id" in request + ? function(time, value) { callback({time: time, value: value, id: id}); } + : function(time, value) { callback({time: time, value: value}); }); + } } // Computes the metric for the given expression for the time interval from @@ -256,9 +333,17 @@ exports.getter = function(db) { }); } + getter.close = function(callback) { + callback.closed = true; + }; + return getter; }; function handle(error) { if (error) throw error; } + +function open(callback) { + return !callback.closed; +} \ No newline at end of file From ddee9c2f2da909404b3bc57066c8cf78720a32fc Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Thu, 21 Jun 2012 12:20:52 +0200 Subject: [PATCH 03/11] FIX : uncaught exception: TypeError: Cannot use 'in' operator to search for 'start' in ... --- lib/cube/metric.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index e1434f5e..fdfe02b9 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -26,11 +26,11 @@ exports.getter = function(db) { // Provide default start and stop times for recent events. // If the limit is not specified, or too big, use the maximum limit. - var stream = !("stop" in request), + var stream = (request.stop === undefined) ? true : false,//stream = !("stop" in request), limit = !(+request.limit >= limitMax) ? request.limit : limitMax, step = +request.step ? +request.step : 1e4, - stop = ("stop" in request) ? new Date(request.stop) : new Date(Math.floor(Date.now() / step) * step); - start = ("start" in request) ? new Date(request.start) : new Date(0), + stop = (request.stop !== undefined) ? new Date(request.stop) : new Date(Math.floor(Date.now() / step) * step); + start = (request.start !== undefined) ? new Date(request.start) : new Date(0), id = request.id; // If the time between start and stop is too long, then bring the start time From d9c0387388bda0b6c1fc27965a1ece12f3c81f0f Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Fri, 22 Jun 2012 10:57:48 +0200 Subject: [PATCH 04/11] stream feature for metric --- lib/cube/metric.js | 104 +++++++++++++++------------------------------ lib/cube/server.js | 1 + 2 files changed, 36 insertions(+), 69 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index fdfe02b9..ee234242 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -33,6 +33,8 @@ exports.getter = function(db) { start = (request.start !== undefined) ? new Date(request.start) : new Date(0), id = request.id; + util.log('>>> request for : ' + id + ' - ' + request.expression); + // If the time between start and stop is too long, then bring the start time // forward so that only the most recent results are returned. This is only // approximate in the case of months, but why would you want to return @@ -57,84 +59,51 @@ exports.getter = function(db) { start = tier.floor(start); stop = tier.ceil(stop); - // For streaming queries, share streams for efficient polling. + // TODO share streams for efficient polling if (stream) { + + var streamHook = { time: stop, id: request.id, callback: callback, triggered: false, + poll: (function() { - if (!streamsBySource[expression.source]) - streamsBySource[expression.source] = []; - - var streams = streamsBySource[expression.source][tier.key]; - - // If there is an existing stream to attach to, backfill the initial set - // of results to catch the client up to the stream. Add the new callback - // to a queue, so that when the shared stream finishes its current poll, - // it begins notifying the new client. Note that we don't pass the null - // (end terminator) to the callback, because more results are to come! - if (streams) { - - filter.t.$lt = streams.time; - streams.waiting.push(callback); - measure(expression, start, stop, tier, "id" in request - ? function(time, value) { callback({time: time, value: value, id: id}); } - : function(time, value) { callback({time: time, value: value}); }); - } - - // Otherwise, we're creating a new stream, so we're responsible for - // starting the polling loop. This means notifying active callbacks, - // detecting when active callbacks are closed, advancing the time window, - // and moving waiting clients to active clients. - else { - streams = streamsBySource[expression.source][tier.key] = {time: stop, waiting: [], active: [callback]}; - (function poll() { + streamHook.triggered = false; - poll.triggered = false; - - var callbackFunction = function(metric) { + callback.onClose = function() { + if(streamHook.triggered !== false) + clearTimeout(streamHook.triggered); + streamHook = null; + }; - //util.log('processed : ' + JSON.stringify(metric)); + measure(expression, filter.t.$gte, filter.t.$lt, tier, function(time, value) { + (function(metric) { + + if (metric.value !== undefined) { + metric.id = streamHook.id; + if (!streamHook.callback.closed) { + streamHook.callback(metric); + } else { + // We might have a problem here. + return; + } + } + if (!streamHook.triggered) { - // If there's a value in metric, send it to all active, open clients. - if (metric.value !== undefined) { - streams.active.forEach(function(callback) { - // TODO - if (!callback.closed) callback(metric); - }); - } + filter.t.$gte = filter.t.$lt; + filter.t.$lt = streamHook.time = tier.ceil(new Date(Date.now())); - // If poll isn't triggered, that's mean we have to do it. - // merge the waiting callbacks into the active callbacks. Advance - // the time range, and set a timeout for the next poll. - - if (!poll.triggered) { - streams.active = streams.active.concat(streams.waiting).filter(open); - streams.waiting = []; - - // If no clients remain, then it's safe to delete the shared - // stream, and we'll no longer be responsible for polling. - if (!streams.active.length) { - delete streamsBySource[expression.source][tier.key]; - if (streamsBySource[expression.source].length === 0) - delete streamsBySource[expression.source]; - return; + streamHook.triggered = setTimeout(streamHook.poll, tier.key); } - filter.t.$gte = filter.t.$lt; - filter.t.$lt = streams.time = tier.ceil(new Date(Date.now())); - setTimeout(poll, tier.key); - poll.triggered = true; - } + })({time: time, value: value}); + }); + }) + }; - }; + streamHook.poll(); - measure(expression, filter.t.$gte, filter.t.$lt, tier, "id" in request - ? function(time, value) { callbackFunction({time: time, value: value, id: id}); } - : function(time, value) { callbackFunction({time: time, value: value}); }); - })(); - } } else { - // Compute the request metric! + // Compute the request metric! measure(expression, start, stop, tier, "id" in request ? function(time, value) { callback({time: time, value: value, id: id}); } : function(time, value) { callback({time: time, value: value}); }); @@ -335,6 +304,7 @@ exports.getter = function(db) { getter.close = function(callback) { callback.closed = true; + util.log(' >>> closing callback'); }; return getter; @@ -342,8 +312,4 @@ exports.getter = function(db) { function handle(error) { if (error) throw error; -} - -function open(callback) { - return !callback.closed; } \ No newline at end of file diff --git a/lib/cube/server.js b/lib/cube/server.js index 8a2ceda2..84a9a4fc 100644 --- a/lib/cube/server.js +++ b/lib/cube/server.js @@ -82,6 +82,7 @@ module.exports = function(options) { // Listen for socket disconnect. if (e.dispatch.close) connection.socket.on("end", function() { e.dispatch.close(callback); + if( callback.onClose ) callback.onClose(); }); connection.on("message", function(message) { From 374e6ccdf5c0ebbbdb75c90c51a68e5436c1cf71 Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Fri, 22 Jun 2012 15:30:29 +0200 Subject: [PATCH 05/11] resolve close streamHook problem --- lib/cube/metric.js | 30 +++++++++++++++++++----------- lib/cube/server.js | 1 - 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index ee234242..89375d33 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -63,27 +63,35 @@ exports.getter = function(db) { if (stream) { var streamHook = { time: stop, id: request.id, callback: callback, triggered: false, - poll: (function() { - streamHook.triggered = false; + poll: (function() { - callback.onClose = function() { + if(streamHook.callback.closed) { if(streamHook.triggered !== false) clearTimeout(streamHook.triggered); streamHook = null; - }; + return; + } + + streamHook.triggered = false; + + //util.log('>>> request : ' + filter.t.$gte + ' <-> ' + filter.t.$lt); measure(expression, filter.t.$gte, filter.t.$lt, tier, function(time, value) { (function(metric) { + if (!streamHook) { + // streamHook closed already + return; + } + + //util.log('>>> metric : ' + JSON.stringify(metric)); if (metric.value !== undefined) { - metric.id = streamHook.id; - if (!streamHook.callback.closed) { - streamHook.callback(metric); - } else { - // We might have a problem here. - return; - } + //util.log(' >>> sending : ' + streamHook.id); + metric.id = streamHook.id; + if (!streamHook.callback.closed) { + streamHook.callback(metric); + } } if (!streamHook.triggered) { diff --git a/lib/cube/server.js b/lib/cube/server.js index 84a9a4fc..8a2ceda2 100644 --- a/lib/cube/server.js +++ b/lib/cube/server.js @@ -82,7 +82,6 @@ module.exports = function(options) { // Listen for socket disconnect. if (e.dispatch.close) connection.socket.on("end", function() { e.dispatch.close(callback); - if( callback.onClose ) callback.onClose(); }); connection.on("message", function(message) { From f8d24a951ab3de74812d4b075115b18d2491f825 Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Fri, 22 Jun 2012 17:08:01 +0200 Subject: [PATCH 06/11] better design for polling metrics : share polling function by tier --- lib/cube/metric.js | 113 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 105 insertions(+), 8 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index 89375d33..9937b667 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -20,7 +20,7 @@ exports.getter = function(db) { Double = db.bson_serializer.Double, queueByName = {}, meta = event.putter(db), - streamsBySource = {}; + streamsByTier = {}; function getter(request, callback) { @@ -59,8 +59,105 @@ exports.getter = function(db) { start = tier.floor(start); stop = tier.ceil(stop); - // TODO share streams for efficient polling if (stream) { + streams = streamsByTier[tier.key]; + + if (streams) { + streams.waiting.push({ + expression: expression, + id: id, + callback: callback + }); + // Compute the request metric! + measure(expression, start, stop, tier, "id" in request + ? function(time, value) { if (value) callback({time: time, value: value, id: id}); } + : function(time, value) { if (value) callback({time: time, value: value}); }); + } + else + { + streams = streamsByTier[tier.key] = { + tier: tier, + time: stop, + waiting: [], + active: [{ + expression: expression, + id: id, + callback: callback + }] + }; + + (function poll(streams){ + + util.log(' >>> + about to filter active connections : ' + streams.active.length); + streams.active = streams.active.concat(streams.waiting).filter(open); + util.log(' >>> - after filtering active connections : ' + streams.active.length); + streams.waiting = []; + + if (!streams.active.length) { + delete streamsByTier[streams.tier.key]; + } + else + { + streams.active.forEach(function(stream) { + util.log('stream : ', stream); + measure(stream.expression, streams.time - streams.tier.key, streams.time, streams.tier, function(time, value) { + + if (stream.callback.closed) { + // callback closed already + util.log(' >>> callback closed'); + return; + } + + if (value !== undefined) { + var metric = { + time: time, + value: value + } + if (stream.id) { + metric.id = stream.id; + } + + stream.callback(metric); + + util.log(' >>> sending : '); + util.log(' >>> ' + metric.time); + util.log(' >>> ' + metric.value); + if (stream.id) util.log(' >>> ' + metric.id); + } + else { + util.log(' >>> no value : ' + value); + } + }); + }); + } + + // TODO tier still are obscure to me. + streams.time = streams.time + streams.tier.key; + // calculate next step + var timer = streams.tier.key;// - tier.ceil(new Date(Date.now())); + + setTimeout(poll.bind(this, streams), timer); + + })(streams); + + + } // if streams + + + } // if stream + + + + + + + + + + + + // TODO share streams for efficient polling + /*if (stream) { var streamHook = { time: stop, id: request.id, callback: callback, triggered: false, @@ -75,8 +172,6 @@ exports.getter = function(db) { streamHook.triggered = false; - //util.log('>>> request : ' + filter.t.$gte + ' <-> ' + filter.t.$lt); - measure(expression, filter.t.$gte, filter.t.$lt, tier, function(time, value) { (function(metric) { @@ -84,10 +179,7 @@ exports.getter = function(db) { // streamHook closed already return; } - - //util.log('>>> metric : ' + JSON.stringify(metric)); if (metric.value !== undefined) { - //util.log(' >>> sending : ' + streamHook.id); metric.id = streamHook.id; if (!streamHook.callback.closed) { streamHook.callback(metric); @@ -96,6 +188,7 @@ exports.getter = function(db) { if (!streamHook.triggered) { + filter.t.$gte = filter.t.$lt; filter.t.$lt = streamHook.time = tier.ceil(new Date(Date.now())); @@ -109,7 +202,7 @@ exports.getter = function(db) { streamHook.poll(); - } else { + } */else { // Compute the request metric! measure(expression, start, stop, tier, "id" in request @@ -320,4 +413,8 @@ exports.getter = function(db) { function handle(error) { if (error) throw error; +} + +function open(request) { + return !request.callback.closed; } \ No newline at end of file From 004fbb7b8cb796e551b593097ef42e8f2ddd2791 Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Mon, 25 Jun 2012 12:38:59 +0200 Subject: [PATCH 07/11] sharing poll function by Tier --- lib/cube/metric.js | 168 ++++++++++++++------------------------------- 1 file changed, 52 insertions(+), 116 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index 9937b667..b5888e20 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -33,8 +33,6 @@ exports.getter = function(db) { start = (request.start !== undefined) ? new Date(request.start) : new Date(0), id = request.id; - util.log('>>> request for : ' + id + ' - ' + request.expression); - // If the time between start and stop is too long, then bring the start time // forward so that only the most recent results are returned. This is only // approximate in the case of months, but why would you want to return @@ -59,25 +57,34 @@ exports.getter = function(db) { start = tier.floor(start); stop = tier.ceil(stop); + // Compute the request metric + measure(expression, start, stop, tier, "id" in request + ? function(time, value) { if (value) callback({time: time, value: value, id: request.id}); } + : function(time, value) { if (value) callback({time: time, value: value}); }); + if (stream) { - streams = streamsByTier[tier.key]; + // for efficient polling, polling function handle all request with the same tier. + var streams = streamsByTier[tier.key]; + + // A poll function already exist for this interval : + // just push this request on the waiting stack, ready to be executed on next poll. if (streams) { streams.waiting.push({ expression: expression, id: id, callback: callback }); - // Compute the request metric! - measure(expression, start, stop, tier, "id" in request - ? function(time, value) { if (value) callback({time: time, value: value, id: id}); } - : function(time, value) { if (value) callback({time: time, value: value}); }); } + + // No poll function exist for this interval, let's create a new one. else { + streams = streamsByTier[tier.key] = { tier: tier, - time: stop, + start: stop, + stop: new Date(stop + tier.key), waiting: [], active: [{ expression: expression, @@ -86,130 +93,60 @@ exports.getter = function(db) { }] }; - (function poll(streams){ - - util.log(' >>> + about to filter active connections : ' + streams.active.length); - streams.active = streams.active.concat(streams.waiting).filter(open); - util.log(' >>> - after filtering active connections : ' + streams.active.length); - streams.waiting = []; - - if (!streams.active.length) { - delete streamsByTier[streams.tier.key]; - } - else - { - streams.active.forEach(function(stream) { - util.log('stream : ', stream); - measure(stream.expression, streams.time - streams.tier.key, streams.time, streams.tier, function(time, value) { - - if (stream.callback.closed) { - // callback closed already - util.log(' >>> callback closed'); - return; - } - - if (value !== undefined) { - var metric = { - time: time, - value: value - } - if (stream.id) { - metric.id = stream.id; - } - - stream.callback(metric); - - util.log(' >>> sending : '); - util.log(' >>> ' + metric.time); - util.log(' >>> ' + metric.value); - if (stream.id) util.log(' >>> ' + metric.id); - } - else { - util.log(' >>> no value : ' + value); - } - }); - }); - } - - // TODO tier still are obscure to me. - streams.time = streams.time + streams.tier.key; - // calculate next step - var timer = streams.tier.key;// - tier.ceil(new Date(Date.now())); - - setTimeout(poll.bind(this, streams), timer); + // We call the poll function for the next loop. + // no need to call it right now because measure() already have been called for the current range. - })(streams); + var timer = streams.stop.getTime() - Date.now(); + if (timer < 0) timer += tier.key; + setTimeout(poll.bind(this, streams), timer); } // if streams - - } // if stream + } + function poll(streams){ + streams.active = streams.active.concat(streams.waiting).filter(open); + streams.waiting = []; + if (!streams.active.length) { + delete streamsByTier[streams.tier.key]; + return; + } + else + { + streams.active.forEach(function(stream) { + measure(stream.expression, streams.start, streams.stop, streams.tier, function(time, value) { - - - - - - - - - // TODO share streams for efficient polling - /*if (stream) { - - var streamHook = { time: stop, id: request.id, callback: callback, triggered: false, - - poll: (function() { - - if(streamHook.callback.closed) { - if(streamHook.triggered !== false) - clearTimeout(streamHook.triggered); - streamHook = null; + if (stream.callback.closed) { + // callback closed already return; } - streamHook.triggered = false; - - measure(expression, filter.t.$gte, filter.t.$lt, tier, function(time, value) { - (function(metric) { - - if (!streamHook) { - // streamHook closed already - return; - } - if (metric.value !== undefined) { - metric.id = streamHook.id; - if (!streamHook.callback.closed) { - streamHook.callback(metric); - } - } - - if (!streamHook.triggered) { - + if (value !== undefined) { + var metric = { + time: time, + value: value + }; - filter.t.$gte = filter.t.$lt; - filter.t.$lt = streamHook.time = tier.ceil(new Date(Date.now())); + if (stream.id) { + metric.id = stream.id; + } - streamHook.triggered = setTimeout(streamHook.poll, tier.key); - } + stream.callback(metric); + } + }); + }); + } - })({time: time, value: value}); - }); - }) - }; + streams.start = streams.stop; + streams.stop = streams.tier.floor(Date.now()); - streamHook.poll(); + var timer = streams.stop.getTime() + streams.tier.key - Date.now(); + setTimeout(poll.bind(this, streams), timer); + } - } */else { - // Compute the request metric! - measure(expression, start, stop, tier, "id" in request - ? function(time, value) { callback({time: time, value: value, id: id}); } - : function(time, value) { callback({time: time, value: value}); }); - } - } // Computes the metric for the given expression for the time interval from // start (inclusive) to stop (exclusive). The time granularity is determined @@ -405,7 +342,6 @@ exports.getter = function(db) { getter.close = function(callback) { callback.closed = true; - util.log(' >>> closing callback'); }; return getter; From 516271a1fcf6a7f42baeb19daf7a79f5c39b671d Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Tue, 26 Jun 2012 15:48:01 +0200 Subject: [PATCH 08/11] test passed --- lib/cube/metric.js | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index b5888e20..7aab2cb4 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -47,9 +47,13 @@ exports.getter = function(db) { return callback({error: "invalid expression"}), -1; } - // Copy any expression filters into the query object. - var filter = {t: {$gte: start, $lt: stop}}; - expression.filter(filter); + // I don't understand why sometime expression.filter is defined, and sometime it's not. + // this condition make the tests successful. + if(expression.filter) { + // Copy any expression filters into the query object. + var filter = {t: {$gte: start, $lt: stop}}; + expression.filter(filter); + } // Round start and stop to the appropriate time step. var tier = tiers[step]; @@ -59,10 +63,11 @@ exports.getter = function(db) { // Compute the request metric measure(expression, start, stop, tier, "id" in request - ? function(time, value) { if (value) callback({time: time, value: value, id: request.id}); } - : function(time, value) { if (value) callback({time: time, value: value}); }); + ? function(time, value) { callback({time: time, value: value, id: request.id}); } + : function(time, value) { callback({time: time, value: value}); }); if (stream) { + // for efficient polling, polling function handle all request with the same tier. var streams = streamsByTier[tier.key]; From f8e276dfe29f63c6adcb6f358c6a89741de772a0 Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Thu, 5 Jul 2012 15:09:45 +0200 Subject: [PATCH 09/11] fix global leak start, id --- lib/cube/metric.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index 7aab2cb4..746f7bdc 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -29,7 +29,7 @@ exports.getter = function(db) { var stream = (request.stop === undefined) ? true : false,//stream = !("stop" in request), limit = !(+request.limit >= limitMax) ? request.limit : limitMax, step = +request.step ? +request.step : 1e4, - stop = (request.stop !== undefined) ? new Date(request.stop) : new Date(Math.floor(Date.now() / step) * step); + stop = (request.stop !== undefined) ? new Date(request.stop) : new Date(Math.floor(Date.now() / step) * step), start = (request.start !== undefined) ? new Date(request.start) : new Date(0), id = request.id; From fb1987c1f5c543bbd8e81da6c708a288b414bc93 Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Wed, 11 Jul 2012 11:18:57 +0200 Subject: [PATCH 10/11] might fix an error about the number of results with no start and limits --- lib/cube/metric.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index 746f7bdc..40972f35 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -59,7 +59,8 @@ exports.getter = function(db) { var tier = tiers[step]; if (!tier) return callback({error: "invalid step"}), -1; start = tier.floor(start); - stop = tier.ceil(stop); + //stop = tier.ceil(stop); + stop = tier.floor(stop); // Compute the request metric measure(expression, start, stop, tier, "id" in request From 26498e6e74c110c14c75cb0607dc6f7605c6a05a Mon Sep 17 00:00:00 2001 From: Etienne Brodu Date: Tue, 11 Sep 2012 12:42:22 +0200 Subject: [PATCH 11/11] default start date in websocket + id persistence --- lib/cube/event.js | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/lib/cube/event.js b/lib/cube/event.js index 4825904e..89d3e36e 100644 --- a/lib/cube/event.js +++ b/lib/cube/event.js @@ -147,13 +147,18 @@ exports.getter = function(db) { function getter(request, callback) { var stream = !("stop" in request), delay = "delay" in request ? +request.delay : streamDelayDefault, - start = new Date(request.start), + start = "start" in request ? new Date(request.start) : new Date(0), stop = stream ? new Date(Date.now() - delay) : new Date(request.stop); + id = "id" in request ? request.id : undefined; // Validate the dates. if (isNaN(start)) return callback({error: "invalid start"}), -1; if (isNaN(stop)) return callback({error: "invalid stop"}), -1; + // Convert them to ObjectIDs. + start = ObjectID.createFromTime(start/1000); + stop = ObjectID.createFromTime(stop/1000); + // Parse the expression. var expression; try { @@ -188,12 +193,22 @@ exports.getter = function(db) { handle(error); // A null event indicates that there are no more results. - if (event) callback({id: event._id instanceof ObjectID ? undefined : event._id, time: event.t, data: event.d}); + if (event) callback({id: event._id instanceof ObjectID ? id : event._id, time: event.t, data: event.d}); else callback(null); }); }); } + function formatEvent(event) { + + if (event) { + event.id = id; + callback(event); + } + else + callback(null); + } + // For streaming queries, share streams for efficient polling. if (stream) { var streams = streamsBySource[expression.source]; @@ -205,7 +220,7 @@ exports.getter = function(db) { // (end terminator) to the callback, because more results are to come! if (streams) { filter.t.$lt = streams.time; - streams.waiting.push(callback); + streams.waiting.push(formatEvent); query(function(event) { if (event) callback(event); }); } @@ -214,7 +229,7 @@ exports.getter = function(db) { // detecting when active callbacks are closed, advancing the time window, // and moving waiting clients to active clients. else { - streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [callback]}; + streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [formatEvent]}; (function poll() { query(function(event) {