Skip to content

Commit

Permalink
Merge pull request #5 from spenceralger/application_boot
Browse files Browse the repository at this point in the history
Application boot
  • Loading branch information
spenceralger committed Feb 24, 2014
2 parents c99bdbb + 3dbbc97 commit c385df2
Show file tree
Hide file tree
Showing 22 changed files with 772 additions and 340 deletions.
2 changes: 1 addition & 1 deletion src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ define(function () {
* The default ES index to use for storing Kibana specific object
* such as stored dashboards
*/
kibanaIndex: 'kibana-int'
kibanaIndex: 'kibana4-int'
};
});
63 changes: 37 additions & 26 deletions src/courier/courier.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ define(function (require) {
var DocSource = require('courier/data_source/doc');
var SearchSource = require('courier/data_source/search');
var HastyRefresh = require('courier/errors').HastyRefresh;
var nextTick = require('utils/next_tick');

var Mapper = require('courier/mapper.js');
var Mapper = require('courier/mapper');

// map constructors to type keywords
var sourceTypes = {
Expand All @@ -31,17 +32,15 @@ define(function (require) {
courier._refs.search,
function (err) {
if (err) return courier._error(err);
courier._activeSearchRequest = null;
});
},

// validate that all of the DocSource objects are up to date
// then fetch the onces that are not
doc: function (courier) {
DocSource.validate(courier, courier._refs.doc, function (err, invalid) {
if (err) {
courier.stop();
return courier.emit('error', err);
}
if (err) return courier._error(err);

// if all of the docs are up to date we don't need to do anything else
if (invalid.length === 0) return;
Expand All @@ -56,8 +55,9 @@ define(function (require) {
// default config values
var defaults = {
fetchInterval: 30000,
docInterval: 2500,
internalIndex: 'kibana4-int'
docInterval: 1500,
internalIndex: 'kibana4-int',
mapperCacheType: 'mappings'
};

/**
Expand All @@ -66,12 +66,13 @@ define(function (require) {
* search:
* - inherits filters, and other query properties
* - automatically emit results on a set interval
*
* doc:
* - tracks doc versions
* - emits same results event when the doc is updated
* - helps seperate versions of kibana running on the same machine stay in sync
* - (NI) tracks version and uses it when new versions of a doc are reindexed
* - (NI) helps deal with conflicts
* - tracks version and uses it to verify that updates are safe to make
* - emits conflict event when that happens
*
* @param {object} config
* @param {Client} config.client - The elasticsearch.js client to use for querying. Should be
Expand Down Expand Up @@ -104,7 +105,10 @@ define(function (require) {
this._onInterval = {};

// make the mapper accessable
this._mapper = new Mapper(this);
this._mapper = new Mapper(this, {
cacheIndex: config.internalIndex,
cacheType: config.mapperCacheType
});

_.each(sourceTypes, function (fn, type) {
var courier = this;
Expand All @@ -121,7 +125,7 @@ define(function (require) {

// store a quick "bound" method for triggering
this._onInterval[type] = function () {
if (courier._refs[type].length) onFetch[type](courier);
courier.fetch(type);
courier._schedule(type);
};

Expand Down Expand Up @@ -151,7 +155,7 @@ define(function (require) {

// is the courier currently running?
Courier.prototype.running = function () {
return !!this._fetchTimer;
return !!_.size(this._timer);
};

// stop the courier from fetching more results
Expand All @@ -170,11 +174,18 @@ define(function (require) {
}, this);
};

// force a fetch of all datasources right now
Courier.prototype.fetch = function () {
_.forOwn(onFetch, function (fn, type) {
if (this._refs[type].length) fn(this);
}, this);
// force a fetch of all datasources right now, optionally filter by type
Courier.prototype.fetch = function (onlyType) {
var courier = this;
nextTick(function () {
_.forOwn(onFetch, function (fn, type) {
if (onlyType && onlyType !== type) return;
if (courier._refs[type].length) fn(courier);
courier._refs[type].forEach(function (ref) {
ref.fetchCount ++;
});
});
});
};

// data source factory
Expand All @@ -187,6 +198,7 @@ define(function (require) {
return new Constructor(this, initialState);
};


/*****
* PRIVATE API
*****/
Expand All @@ -212,7 +224,8 @@ define(function (require) {
var refs = this._refs[source._getType()];
if (!_.find(refs, { source: source })) {
refs.push({
source: source
source: source,
fetchCount: 0
});
}
};
Expand All @@ -235,7 +248,8 @@ define(function (require) {

// properly clear scheduled fetches
Courier.prototype._clearScheduled = function (type) {
this._timer[type] = clearTimeout(this._timer[type]);
clearTimeout(this._timer[type]);
delete this._timer[type];
};

// alert the courior that a doc has been updated
Expand All @@ -246,18 +260,15 @@ define(function (require) {
_.each(this._refs.doc, function (ref) {
var state = ref.source._state;
if (
state === updated
|| (
state.id === updated.id
&& state.type === updated.type
&& state.index === updated.index
)
state.id === updated.id
&& state.type === updated.type
&& state.index === updated.index
) {
delete ref.version;
}
});

onFetch.doc(this);
this.fetch('doc');
};

return Courier;
Expand Down
49 changes: 49 additions & 0 deletions src/courier/data_source/data_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ define(function (require) {
return courier.createSource(this._getType()).inherits(this);
};

this.courier = function (newCourier) {
courier = this._courier = newCourier;
return this;
};

// get/set internal state values
this._methods.forEach(function (name) {
this[name] = function (val) {
Expand Down Expand Up @@ -96,6 +101,50 @@ define(function (require) {
return JSON.stringify(this.toJSON());
};

/**
* Set the $scope for a datasource, when a datasource is bound
* to a scope, it's event listeners will be wrapped in a call to that
* scope's $apply method (safely).
*
* This also binds the DataSource to the lifetime of the scope: when the scope
* is destroyed, the datasource is closed
*
* @param {AngularScope} $scope - the scope where the event emitter "occurs",
* helps angular determine where to start checking for changes
* @return {this} - chainable
*/
DataSource.prototype.$scope = function ($scope) {
var emitter = this;

if (emitter._emitter$scope) {
emitter._emitter$scope = $scope;
return this;
}

emitter._emitter$scope = $scope;
var origOn = emitter.on;

emitter.on = function (event, listener) {
var wrapped = function () {
var args = arguments;
// always use the stored ref so that it can be updated if needed
var $scope = emitter._emitter$scope;
$scope[$scope.$$phase ? '$eval' : '$apply'](function () {
listener.apply(emitter, args);
});
};
wrapped.listener = listener;
return origOn.call(emitter, event, wrapped);
};

emitter.on.restore = function () {
delete emitter._emitter$scope;
emitter.on = origOn;
};

return this;
};

/*****
* PRIVATE API
*****/
Expand Down
66 changes: 40 additions & 26 deletions src/courier/data_source/doc.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
define(function (require) {
var DataSource = require('courier/data_source/data_source');
var inherits = require('utils/inherits');
var nextTick = require('utils/next_tick');
var errors = require('courier/errors');
var listenerCount = require('utils/event_emitter').listenerCount;
var _ = require('lodash');
Expand All @@ -23,7 +24,7 @@ define(function (require) {
DocSource.fetch = function (courier, refs, cb) {
var client = courier._getClient();
var allRefs = [];
var body = {
var getBody = {
docs: []
};

Expand All @@ -32,25 +33,30 @@ define(function (require) {
if (source._getType() !== 'doc') return;

allRefs.push(ref);
body.docs.push(source._flatten());
getBody.docs.push(source._flatten());
});

return client.mget({ body: body }, function (err, resp) {
if (err) return cb(err);

_.each(resp.docs, function (resp, i) {
var ref = allRefs[i];
var source = ref.source;

if (resp.error) return source._error(new errors.DocFetchFailure(resp));
if (ref.version === resp._version) return; // no change
ref.version = resp._version;
source._storeVersion(resp._version);
source.emit('results', resp);
});

cb(err, resp);
});
return client.mget({ body: getBody })
.then(function (resp) {
_.each(resp.docs, function (resp, i) {
var ref = allRefs[i];
var source = ref.source;

if (resp.error) return source._error(new errors.DocFetchFailure(resp));
if (resp.found) {
if (ref.version === resp._version) return; // no change
ref.version = resp._version;
source._storeVersion(resp._version);
} else {
ref.version = void 0;
source._clearVersion();
}
source.emit('results', resp);
});

cb(void 0, resp);
})
.catch(cb);
};

/**
Expand All @@ -63,11 +69,10 @@ define(function (require) {
DocSource.validate = function (courier, refs, cb) {
var invalid = _.filter(refs, function (ref) {
var storedVersion = ref.source._getVersion();
if (ref.version !== storedVersion) return true;
});
setTimeout(function () {
cb(void 0, invalid);
/* jshint eqeqeq: false */
return (!ref.fetchCount || ref.version != storedVersion);
});
nextTick(cb, void 0, invalid);
};

/*****
Expand Down Expand Up @@ -102,6 +107,7 @@ define(function (require) {
id: state.id,
type: state.type,
index: state.index,
version: source._getVersion(),
body: {
doc: fields
}
Expand Down Expand Up @@ -129,7 +135,6 @@ define(function (require) {
id: state.id,
type: state.type,
index: state.index,
version: source._getVersion(),
body: body,
ignore: [409]
}, function (err, resp) {
Expand Down Expand Up @@ -201,8 +206,8 @@ define(function (require) {
* @return {number} - the version number, or NaN
*/
DocSource.prototype._getVersion = function () {
var id = this._versionKey();
return _.parseInt(localStorage.getItem(id));
var v = localStorage.getItem(this._versionKey());
return v ? _.parseInt(v) : void 0;
};

/**
Expand All @@ -212,8 +217,17 @@ define(function (require) {
*/
DocSource.prototype._storeVersion = function (version) {
var id = this._versionKey();
localStorage.setItem(id, version);
if (version) {
localStorage.setItem(id, version);
} else {
localStorage.removeItem(id);
}
};

/**
* Clears the stored version for a DocSource
*/
DocSource.prototype._clearVersion = DocSource.prototype._storeVersion;

return DocSource;
});
Loading

0 comments on commit c385df2

Please sign in to comment.