Skip to content

Commit

Permalink
Merge pull request #4 from rashidkpc/master
Browse files Browse the repository at this point in the history
Mapper and tests
  • Loading branch information
Rashid Khan committed Feb 24, 2014
2 parents 0929c34 + b7620be commit c99bdbb
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"console": true
},

"camelcase": true,
"camelcase": false,
"white": true,
"bitwise": false,
"eqnull": true,
Expand Down
8 changes: 7 additions & 1 deletion src/courier/courier.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ define(function (require) {
var SearchSource = require('courier/data_source/search');
var HastyRefresh = require('courier/errors').HastyRefresh;

var Mapper = require('courier/mapper.js');

// map constructors to type keywords
var sourceTypes = {
doc: DocSource,
Expand Down Expand Up @@ -54,7 +56,8 @@ define(function (require) {
// default config values
var defaults = {
fetchInterval: 30000,
docInterval: 2500
docInterval: 2500,
internalIndex: 'kibana4-int'
};

/**
Expand Down Expand Up @@ -100,6 +103,9 @@ define(function (require) {
// interval hook/fn for each type
this._onInterval = {};

// make the mapper accessable
this._mapper = new Mapper(this);

_.each(sourceTypes, function (fn, type) {
var courier = this;
// the name used outside of this module
Expand Down
29 changes: 27 additions & 2 deletions src/courier/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@ define(function (require) {
HastyRefresh.prototype.constructor = HastyRefresh;
errors.HastyRefresh = HastyRefresh;

// a non-critical cache write to elasticseach failed
function CacheWriteFailure() {
this.name = 'CacheWriteFailure';
this.message = 'A Elasticsearch cache write has failed.';
}
CacheWriteFailure.prototype = new Error();
CacheWriteFailure.prototype.constructor = CacheWriteFailure;
errors.CacheWriteFailure = CacheWriteFailure;

// when a field mapping is requested for an unknown field
function FieldNotFoundInCache(name) {
this.name = 'FieldNotFoundInCache';
this.message = 'The ' + name + ' field was not found in the cached mappings';
}
FieldNotFoundInCache.prototype = new Error();
FieldNotFoundInCache.prototype.constructor = FieldNotFoundInCache;
errors.FieldNotFoundInCache = FieldNotFoundInCache;

// where there is an error getting a doc
function DocFetchFailure(resp) {
Expand All @@ -22,16 +39,24 @@ define(function (require) {
DocFetchFailure.prototype.constructor = DocFetchFailure;
errors.DocFetchFailure = DocFetchFailure;


// there was a conflict storing a doc
function VersionConflict(resp) {
this.name = 'VersionConflict';
this.resp = resp;
this.message = 'Failed to store document changes do to a version conflict.';
this.message = 'Failed to store document changes due to a version conflict.';
}
VersionConflict.prototype = new Error();
VersionConflict.prototype.constructor = VersionConflict;
errors.VersionConflict = VersionConflict;

// there was a conflict storing a doc
function MappingConflict(field) {
this.name = 'MappingConflict';
this.message = 'Field ' + field + ' is defined as at least two different types in indices matching the pattern';
}
MappingConflict.prototype = new Error();
MappingConflict.prototype.constructor = MappingConflict;
errors.MappingConflict = MappingConflict;

return errors;
});
207 changes: 198 additions & 9 deletions src/courier/mapper.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,215 @@
define(function (require) {
var _ = require('lodash');
var Error = require('courier/errors');

/**
* - Resolves index patterns
* - Fetches mappings from elasticsearch
* - casts result object fields using mappings
*
* @class Mapper
*/
function Mapper(client) {
function Mapper(courier) {

var client = courier._getClient();

// Exclude anything wirh empty mapping except these
var reservedFields = {
'_id': { type: 'string' },
'_type': { type: 'string' },
'_index': { type: 'string' }
};

// Save a reference to this
var self = this;

// STUB Until we have another way to get the config object.
var config = {
index: 'kibana4-int'
};

// Store mappings we've already loaded from Elasticsearch
var mappings = {};

/**
* Gets an object containing all fields with their mappings
* @param {dataSource} [dataSource]
* @param {Function} [callback] A function to be executed with the results.
* @param {String} [type]
* @return {Object} A hash containing fields and their related mapping
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
this.getFields = function (dataSource, callback) {
if (self.getFieldsFromObject(dataSource)) {
// If we already have the fields in our object, use that.
setTimeout(callback(undefined, self.getFieldsFromObject(dataSource)), 0);
} else {
// Otherwise, try to get fields from Elasticsearch cache
self.getFieldsFromCache(dataSource, function (err, fields) {
if (err) {
// If we are unable to get the fields from cache, get them from mapping
self.getFieldsFromMapping(dataSource, function (err, fields) {
if (err) return courier._error(err);

// And then cache them
cacheFieldsToElasticsearch(config, dataSource._state.index, fields, function (err, response) {
if (err) return courier._error(new Error.CacheWriteError());
});

cacheFieldsToObject(dataSource, fields);
callback(err, fields);
});
} else {
cacheFieldsToObject(dataSource, fields);
callback(err, fields);
}
});
}
};

/**
* Gets an object containing the mapping for a field
* @param {dataSource} dataSource
* @param {String} field The dot notated name of a field to get the mapping for
* @param {Function} callback A function to be executed with the results.
*/
this.getFieldMapping = function (dataSource, field, callback) {
self.getFields(dataSource, function (err, fields) {
if (_.isUndefined(fields[field])) return courier._error(new Error.FieldNotFoundInCache());
callback(err, fields[field]);
});
};

/**
* Gets an object containing the mapping for a field
* @param {dataSource} dataSource
* @param {Array} fields The dot notated names of a fields to get the mapping for
* @param {Function} callback A function to be executed with the results.
*/
this.getFieldsMapping = function (dataSource, fields, callback) {
self.getFields(dataSource, function (err, fields) {
var _mapping = _.object(_.map(fields, function (field) {
if (_.isUndefined(fields[field])) return courier._error(new Error.FieldNotFoundInCache());
return [field, fields[field]];
}));
callback(err, _mapping);
});
};

/**
* Gets an object containing all fields with their mappings from kibana's cache in Elasticsearch
* @param {dataSource} dataSource
* @return {Object} An object containing fields with their mappings, or false if not found.
*/
this.getFieldsFromObject = function (dataSource) {
return !_.isUndefined(mappings[dataSource._state.index]) ? mappings[dataSource._state.index] : false;
};

/**
* Gets an object containing all fields with their mappings from kibana's cache in Elasticsearch
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
this.getFields = function (dataSource, callback, type) {
client.indices.getFieldMapping({index: dataSource.index}, callback);
this.getFieldsFromCache = function (dataSource, callback) {
var params = {
index: config.index,
type: 'mapping',
id: dataSource._state.index,
};

client.getSource(params, callback);
};

/**
* Gets an object containing all fields with their mappings directly from Elasticsearch
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
this.getFieldsFromMapping = function (dataSource, callback) {
var params = {
// TODO: Change index to be newest resolved index. Change _state.index to id().
index: dataSource._state.index,
field: '*',
};

// TODO: Add week/month check
client.indices.getFieldMapping(params, function (err, response, status) {

// TODO: Add error message

var fields = {};

_.each(response, function (index) {
_.each(index.mappings, function (type) {
_.each(type, function (field, name) {
if (_.isUndefined(field.mapping) || name[0] === '_') return;
if (!_.isUndefined(fields[name]) && fields[name] !== field.mapping[_.keys(field.mapping)[0]])
return courier._error(new Error.MappingConflict());
fields[name] = field.mapping[_.keys(field.mapping)[0]];
});
});
});

// TODO if these are mapped differently this might cause problems
_.assign(fields, reservedFields);

callback(err, fields);
});
};

/**
* Stores processed mappings in Elasticsearch
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
var cacheFieldsToElasticsearch = function (config, index, fields, callback) {
client.index({
index : config.index,
type: 'mapping',
id : index,
body : fields
}, callback);
};

this.getFieldType = function (dataSource, field, type) {
return field, type;
/**
* Stores processed mappings in an object
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
var cacheFieldsToObject = function (dataSource, fields) {
mappings[dataSource._state.index] = _.clone(fields);
return !_.isUndefined(mappings[dataSource._state.index]) ? true : false;
};

/**
* Clears mapping caches from elasticsearch and from local object
* @param {dataSource} dataSource
* @param {Function} callback A function to be executed with the results.
*/
this.clearCache = function (dataSource, callback) {
if (!_.isUndefined(mappings[dataSource._state.index])) {
delete mappings[dataSource._state.index];
}
client.delete({
index : config.index,
type: 'mapping',
id : dataSource._state.index
}, callback);
};


/**
* Sets a number of fields to be ignored in the mapping. Not sure this is a good idea?
* @param {dataSource} dataSource
* @param {Array} fields An array of fields to be ignored
* @param {Function} callback A function to be executed with the results.
*/
this.ignoreFields = function (dataSource, fields, callback) {
if (!_.isArray(fields)) fields = [fields];
var ignore = _.object(_.map(fields, function (field) {
return [field, {type: 'ignore'}];
}));
self.getFields(dataSource, function (err, mapping) {
_.assign(mapping, ignore);
callback(err, mapping);
});
};

}
Expand Down
30 changes: 24 additions & 6 deletions src/kibana/controllers/kibana.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,38 @@ define(function (require) {
return {
restrict: 'E',
scope: {
type: '@'
type: '@',
fields: '@'
},
template: '<strong style="float:left">{{count}} :&nbsp;</strong><pre>{{json}}</pre>',
template: 'Mappings:<br><div ng-repeat="(name,mapping) in mappedFields">{{name}} = {{mapping.type}}</div><hr>' +
'<strong style="float:left">{{count}} :&nbsp;</strong><pre>{{json}}</pre>',
controller: function ($rootScope, $scope, courier) {
$scope.count = 0;
$scope.mappedFields = {};

var source = $rootScope.dataSource.extend()
.index('logstash-*')
.type($scope.type)
.source({
include: 'country'
include: 'geo'
})
.on('results', function (resp) {
$scope.count ++;
$scope.json = JSON.stringify(resp.hits, null, ' ');
});

courier.mapper.getFields($rootScope.dataSource, function (data) {
$scope.json = data;
var fields = $scope.fields.split(',');


_.each(fields, function (field) {
courier._mapper.getFieldMapping(source, field, function (err, mapping) {
$scope.mappedFields[field] = mapping;
});
});


courier._mapper.getFields(source, function (err, response, status) {
console.log(response);
});

$scope.$watch('type', source.type);
Expand All @@ -49,10 +63,13 @@ define(function (require) {
type: '@',
index: '@'
},
template: '<strong style="float:left">{{count}} : <button ng-click="click()">reindex</button> :&nbsp;</strong><pre>{{json}}</pre>',
template: '<strong style="float:left">{{count}} : <button ng-click="click()">reindex</button> :&nbsp;</strong>' +
'<pre>{{json}} BEER</pre>',
controller: function (courier, $scope) {
$scope.count = 0;

console.log(courier);

var currentSource;
$scope.click = function () {
if (currentSource) {
Expand All @@ -69,6 +86,7 @@ define(function (require) {
$scope.count ++;
$scope.json = JSON.stringify(doc, null, ' ');
});

}
};
});
Expand Down
4 changes: 2 additions & 2 deletions test/unit/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
}
})
require([
'/specs/courier.js',
'/specs/data_source.js',
//'/specs/courier.js',
//'/specs/data_source.js',
'/specs/mapper.js'
], function () {
window.mochaRunner = mocha.run().on('end', function(){
Expand Down
Loading

0 comments on commit c99bdbb

Please sign in to comment.