From 46f1e5d508b777d7f235ff8faa5eb5ea247a3986 Mon Sep 17 00:00:00 2001 From: Julien Gilli Date: Fri, 9 Mar 2018 17:11:09 -0800 Subject: [PATCH] TRITON-239 VMAPI should use reusable moray buckets initialization/data migrations module --- lib/apis/moray.js | 633 +----------------- lib/data-migrations/controller.js | 305 --------- lib/data-migrations/loader.js | 257 ------- lib/data-migrations/noop-controller.js | 49 -- lib/endpoints/ping.js | 89 ++- lib/errors.js | 10 +- lib/interceptors.js | 18 +- lib/moray/moray-buckets-initializer.js | 276 -------- lib/moray/moray-init.js | 29 +- lib/vmapi.js | 30 +- package.json | 1 + server.js | 57 +- .../vms/001-invalid-file-extension.foo | 9 - .../vms/invalid-file-name.js | 9 - .../vmapi-server-throwing-expected-stderr.txt | 4 +- .../vmapi-server-with-throwing-handler.js | 16 +- test/vms.data-migrations.test.js | 572 ++++------------ ...index-moray-bucket-transient-error.test.js | 151 ++--- ...e-moray-bucket-non-transient-error.test.js | 207 ------ ...e-moray-bucket-removes-index-fails.test.js | 213 ------ ...pdate-moray-bucket-transient-error.test.js | 91 +-- ...vms.update-moray-bucket-versioning.test.js | 520 -------------- tools/add-test-vms.js | 2 +- tools/fix-no-owner.js | 2 +- tools/kvm-backfill.js | 2 +- 25 files changed, 396 insertions(+), 3156 deletions(-) delete mode 100644 lib/data-migrations/controller.js delete mode 100644 lib/data-migrations/loader.js delete mode 100644 lib/data-migrations/noop-controller.js delete mode 100644 lib/moray/moray-buckets-initializer.js delete mode 100644 test/fixtures/data-migrations-invalid-filenames/vms/001-invalid-file-extension.foo delete mode 100644 test/fixtures/data-migrations-invalid-filenames/vms/invalid-file-name.js delete mode 100644 test/vms.update-moray-bucket-non-transient-error.test.js delete mode 100644 test/vms.update-moray-bucket-removes-index-fails.test.js delete mode 100644 test/vms.update-moray-bucket-versioning.test.js diff --git a/lib/apis/moray.js b/lib/apis/moray.js index 8f3970a7..995872e9 100644 --- a/lib/apis/moray.js +++ b/lib/apis/moray.js @@ -80,6 +80,8 @@ function Moray(options) { assert.object(options.bucketsConfig, 'options.bucketsConfig'); assert.object(options.changefeedPublisher, 'options.changefeedPublisher'); assert.optionalObject(options.log, 'options.log'); + assert.object(options.morayBucketsInitializer, + 'options.morayBucketsInitializer'); this._bucketsConfig = options.bucketsConfig; this._changefeedPublisher = options.changefeedPublisher; @@ -88,16 +90,12 @@ function Moray(options) { level: options.logLevel || 'info', serializers: restify.bunyan.serializers }); + this._morayBucketsInitializer = options.morayBucketsInitializer; this._morayClient = options.morayClient; - this._bucketsSetup = false; - this._lastBucketsSetupError = null; - this._reindexingBuckets = false; - this._settingUpBuckets = false; - - this._latestCompletedMigration = undefined; - _validateBucketsConfig(this._bucketsConfig); + this._VMS_BUCKET_NAME = options.bucketsConfig.vms.name; + this._VM_ROLE_TAGS_BUCKET_NAME = options.bucketsConfig.vm_role_tags.name; } /* @@ -113,52 +111,6 @@ function _validateBucketsConfig(bucketsConfig) { assert.object(bucketsConfig.vm_role_tags, 'bucketsConfig.vm_role_tags'); } -/* - * Validates that data migrations represented by "dataMigrations" are sound. For - * instance, it checks that for each model that needs to be migrated, its - * corresponding moray bucket configuration includes a "data_version" indexed - * field. It also makes sure that versioning of subsequent data migrations for a - * given model follows a sequence. - */ -Moray.prototype.validateDataMigrations = -function validateDataMigrations(dataMigrations) { - var bucketConfig; - var bucketName; - var expectedDataVersion; - var idx; - var migrationsForBucket; - - assert.object(this._bucketsConfig, 'this._bucketsConfig'); - assert.object(dataMigrations, 'dataMigrations'); - - for (bucketName in dataMigrations) { - bucketConfig = this._bucketsConfig[bucketName]; - - assert.object(bucketConfig, 'bucketConfig'); - assert.object(bucketConfig.schema.index.data_version, - 'data_version indexed field should be present in bucket config'); - assert.equal(bucketConfig.schema.index.data_version.type, 'number', - 'data_version indexed field should be of type \'number\''); - - migrationsForBucket = dataMigrations[bucketName]; - expectedDataVersion = 1; - /* - * Validates that all data migrations that need to be performed are - * valid. For instance, that their DATA_VERSION numbers are a proper - * sequence starting at 1, and that they export a function named - * "migrateRecord". - */ - for (idx = 0; idx < migrationsForBucket.length; ++idx) { - assert.equal(migrationsForBucket[idx].DATA_VERSION, - expectedDataVersion, 'Data version of migration ' + (idx + 1) + - ' should be ' + expectedDataVersion); - assert.func(migrationsForBucket[idx].migrateRecord, - 'migrationsForBucket[' + idx + '].migrateRecord'); - ++expectedDataVersion; - } - } -}; - /* * Returns whether the application model represented by the string "modelName" * is valid. Currently it means it has a representation in Moray. @@ -181,136 +133,48 @@ Moray.prototype._modelToBucketName = function _modelToBucketName(modelName) { return this._bucketsConfig[modelName].name; }; -/* - * Returns true if the "err" error object represents a transient error (an error - * that could be solved after retrying the same action) that can happen during - * the process of setting up Moray buckets. - */ -Moray.prototype.isBucketsSetupErrorTransient = - function isBucketsSetupErrorTransient(err) { - assert.object(err, 'err'); - assert.string(err.name, 'err.name'); - - var nonTransientErrName; - var NON_TRANSIENT_ERROR_NAMES = [ - /* Errors sent by the moray server */ - 'InvalidBucketConfigError', - 'InvalidBucketNameError', - 'InvalidIndexDefinitionError', - 'NotFunctionError', - 'BucketVersionError', - /* Custom errors generated by this Moray abstraction layer */ - 'InvalidIndexesRemovalError' - ]; - - for (var idx in NON_TRANSIENT_ERROR_NAMES) { - nonTransientErrName = NON_TRANSIENT_ERROR_NAMES[idx]; - if (verror.hasCauseWithName(err, nonTransientErrName)) { - return false; - } - } - - return true; - }; - - -/* - * Sets up VMAPI's moray buckets, including creating them if they're - * missing, or updating them if they already exist. Calls the 'callback' - * function when that setup completed. - * - * It does not perform any reindexing of rows that would need to be reindexed - * after a bucket was updated to add one or more indexes. To reindex rows of all - * buckets, use the "Moray.prototype.reindexBuckets" function. - * - * If the setup results in an error, the first argument of the 'callback' - * function is an Error object. The - * 'Moray.prototype.isBucketsSetupErrorNonTransient' function can be used to - * determine whether that error is non transient, and how to act on it depending - * on the program's expectations and behavior. - * - * The "Moray.prototype.setupBuckets" function can be called more than once per - * instance of the Moray constructor, as long as each call is made after the - * previous setup process terminated, either successfully or with an error, by - * calling the 'callback' function passed as a parameter. Calling this method - * while a previous call is still in flight will throw an error. - */ -Moray.prototype.setupBuckets = function setupBuckets(callback) { - var self = this; - var bucketsList = []; - var bucketConfig; - - if (self._settingUpBuckets === true) { - throw new Error('setupBuckets cannot be called when a setup ' + - 'process is in progress'); - } - - self._lastBucketsSetupError = null; - self._settingUpBuckets = true; - - self._log.info({bucketsConfig: self._bucketsConfig}, - 'Setting up moray buckets...'); - - self._VMS_BUCKET_NAME = self._bucketsConfig.vms.name; - self._VM_ROLE_TAGS_BUCKET_NAME = self._bucketsConfig.vm_role_tags.name; - - for (bucketConfig in self._bucketsConfig) { - bucketsList.push(self._bucketsConfig[bucketConfig]); - } - - self._trySetupBuckets(bucketsList, function (setupBucketsErr) { - self._settingUpBuckets = false; - self._lastBucketsSetupError = setupBucketsErr; - - if (setupBucketsErr) { - self._log.error({ error: setupBucketsErr }, - 'Error when setting up moray buckets'); - } else { - self._log.info('Buckets have been setup successfully'); - self._bucketsSetup = true; - } - - callback(setupBucketsErr); - }); -}; - - /* * Returns true if VMAPI's moray buckets have been setup successfully, false * otherwise. */ Moray.prototype.bucketsSetup = function bucketsSetup() { - return this._bucketsSetup; + return this._morayBucketsInitializer && + this._morayBucketsInitializer.status() && + this._morayBucketsInitializer.status().bucketsSetup.state === 'DONE'; }; - -/* - * Returns an Error instance that represents the latest error that occured - * during the process of setting up Moray buckets (but not reindexing), or null - * if no error occurred since the last time "Moray.prototype.setupBuckets" was - * called. - */ -Moray.prototype.lastBucketsSetupError = function lastBucketsSetupError() { - return this._lastBucketsSetupError; -}; - - /* * Returns a string representing an error message to signal that the * Moray layer's setup process has not completed yet. */ Moray.prototype._createMorayBucketsNotSetupErrMsg = function _createMorayBucketsNotSetupErrMsg() { - var errMsg = 'moray buckets are not setup'; + var bucketsSetupErr; + var bucketsSetupStatus; + var errMsg; + var morayBucketsInitStatus; + + assert.object(this._morayBucketsInitializer, + 'this._morayBucketsInitializer'); + + morayBucketsInitStatus = this._morayBucketsInitializer.status(); + assert.object(morayBucketsInitStatus, 'morayBucketsInitStatus'); + + bucketsSetupStatus = morayBucketsInitStatus.bucketsSetup; + assert.object(bucketsSetupStatus, 'bucketsSetupStatus'); + + bucketsSetupErr = bucketsSetupStatus.latestError; - if (this._lastBucketsSetupError !== null) { - errMsg += ', reason: ' + this._lastBucketsSetupError; + errMsg = 'moray buckets are not setup'; + if (bucketsSetupErr !== null) { + errMsg += ', reason: ' + bucketsSetupErr; } return errMsg; }; + /* * Pings Moray by calling its ping method */ @@ -331,8 +195,7 @@ Moray.prototype.getVm = function getVm(params, cb) { var error; if (!this.bucketsSetup()) { - cb(new verror.VError('Moray buckets are not setup', - this._lastBucketsSetupError)); + cb(new Error(this._createMorayBucketsNotSetupErrMsg())); return; } @@ -1166,195 +1029,6 @@ Moray.prototype._addTagsFilter = function (params, filter) { }; -/* - * Tries to setup VMAPI's moray buckets as specified by the array "buckets". - * Calls the function "cb" when done. If there was an error, the "cb" function - * is called with an error object as its first parameter, otherwise it is called - * without passing any parameter. - */ -Moray.prototype._trySetupBuckets = - function _trySetupBuckets(buckets, cb) { - assert.arrayOfObject(buckets, 'buckets'); - assert.func(cb, 'cb'); - - var self = this; - - vasync.forEachPipeline({ - func: function setupEachBucket(newBucketConfig, done) { - var bucketName = newBucketConfig.name; - assert.string(bucketName, 'bucketName'); - - self._trySetupBucket(bucketName, newBucketConfig, done); - }, - inputs: buckets - }, cb); - }; - - -/* - * Returns true if the updating a moray bucket from the bucket schema - * "oldBucketSchema" to "newBucketSchema" would imply removing at least one - * index. Returns false otherwise. - */ -function indexesRemovedBySchemaChange(oldBucketSchema, newBucketSchema) { - assert.object(oldBucketSchema, 'oldBucketSchema'); - assert.object(newBucketSchema, 'newBucketSchema'); - - var oldBucketIndexNames = []; - var newBucketIndexNames = []; - - if (oldBucketSchema.index) { - oldBucketIndexNames = Object.keys(oldBucketSchema.index); - } - - if (newBucketSchema.index) { - newBucketIndexNames = Object.keys(newBucketSchema.index); - } - - var indexesRemoved = - oldBucketIndexNames.filter(function indexMissingInNewSchema(indexName) { - return newBucketIndexNames.indexOf(indexName) === -1; - }); - - return indexesRemoved; -} - - -/* - * Tries to set up bucket with name "bucketName" to have configuration - * "bucketConfig". The setup process includes, in the following order: - * - * 1. creating the bucket if it does not exist. - * - * 2. updating the bucket's indexes to add indexes. Indexes cannot be removed - * because it's a backward incompitble change: if a code rollback is performed, - * older code that would rely on the deleted indexes wouldn't be able to work - * properly, and removing indexes will generate an error. - * - */ -function _trySetupBucket(bucketName, bucketConfig, cb) { - assert.string(bucketName, 'bucketName'); - assert.object(bucketConfig, 'bucketConfig'); - assert.object(bucketConfig.schema, 'bucketConfig.schema'); - assert.optionalObject(bucketConfig.schema.options, - 'bucketConfig.schema.options'); - if (bucketConfig.schema.options) { - assert.optionalNumber(bucketConfig.schema.options.version, - 'bucketConfig.schema.options.version'); - } - - assert.func(cb, 'cb'); - - var self = this; - - var newBucketSchema = bucketConfig.schema; - - vasync.waterfall([ - function loadBucket(next) { - self._getBucket(bucketName, function (err, oldBucketSchema) { - if (err && - verror.hasCauseWithName(err, 'BucketNotFoundError')) { - err = null; - } - - next(err, oldBucketSchema); - }); - }, - function createBucket(oldBucketSchema, next) { - if (!oldBucketSchema) { - self._log.info({bucketName: bucketName}, - 'Bucket not found, creating it...'); - self._createBucket(bucketName, bucketConfig.schema, - function createDone(createErr) { - if (createErr) { - self._log.error({ - bucketName: bucketName, - error: createErr.toString() - }, 'Error when creating bucket'); - } else { - self._log.info('Bucket ' + - bucketName + - ' created successfully'); - } - - next(createErr, oldBucketSchema); - }); - } else { - self._log.info({bucketName: bucketName}, - 'Bucket already exists, not creating it.'); - next(null, oldBucketSchema); - } - }, - function updateBucketSchema(oldBucketSchema, next) { - assert.optionalObject(oldBucketSchema, 'oldBucketSchema'); - - var oldVersion = 0; - var newVersion = 0; - var removedIndexes = []; - - if (oldBucketSchema && oldBucketSchema.options && - oldBucketSchema.options.version) { - oldVersion = oldBucketSchema.options.version; - } - - if (newBucketSchema.options && newBucketSchema.options.version) { - newVersion = newBucketSchema.options.version; - } - - /* - * If the bucket's version was bumped, update the bucket, otherwise: - * - * 1. the version number wasn't bumped because no change was made - * and there's nothing to do. - * - * 2. the version number is lower than the current version number in - * moray. This can be the result of a code rollback. Since we make - * only backward compatible changes for moray buckets, and - * decrementing a bucket's version number is an error, it's ok to - * not change the bucket. - */ - if (oldBucketSchema && newVersion > oldVersion) { - removedIndexes = indexesRemovedBySchemaChange(oldBucketSchema, - newBucketSchema); - if (removedIndexes.length > 0) { - /* - * Removing indexes is considered to be a backward - * incompatible change. We don't allow them so that after - * rolling back to a previous version of the code, the code - * can still use any index that it relies on. - */ - next(new errors.InvalidIndexesRemovalError(removedIndexes)); - return; - } - - self._log.info('Updating bucket ' + bucketName + ' from ' + - 'version ' + oldVersion + ' to version ' + newVersion + - '...'); - - self._updateBucket(bucketName, newBucketSchema, - function updateDone(updateErr) { - if (updateErr) { - self._log.error({error: updateErr}, - 'Error when updating bucket ' + - bucketName); - } else { - self._log.info('Bucket ' + bucketName + - ' updated successfully'); - } - - next(updateErr); - }); - } else { - self._log.info('Bucket ' + bucketName + ' already at version ' + - '>= ' + newVersion + ', no need to update it'); - next(null); - } - } - ], cb); -} -Moray.prototype._trySetupBucket = _trySetupBucket; - - /* * Gets a bucket */ @@ -1555,255 +1229,4 @@ Moray.prototype.delVmRoleTags = function (uuid, cb) { }; - -/* - * Reindexes all objects in the bucket with name "bucketName" and calls the - * function "callback" when it's done. - * - * @param moray {MorayClient} - * @param bucketName {Name of the bucket to reindex} - * @param callback {Function} `function (err)` - */ -Moray.prototype._reindexBucket = - function _reindexBucket(bucketName, callback) { - assert.string(bucketName, 'bucketName'); - assert.func(callback, 'callback'); - - var self = this; - - self._morayClient.reindexObjects(bucketName, 100, - function onReindexBucketDone(reindexErr, res) { - if (reindexErr || res.processed < 1) { - callback(reindexErr); - return; - } - - self._reindexBucket(bucketName, callback); - }); - }; - -/* - * Reindexes all buckets and calls "callback" when it's done. - * - * @param {Function} callback - a function called when either the reindexing - * process is complete for all buckets, or when an error occurs. It is called - * as "callback(null)" if the reindexing process completed with no error, or - * "callback(err)"" if the error "err" occurred. - */ -Moray.prototype.reindexBuckets = function reindexBuckets(callback) { - assert.func(callback, 'callback'); - - var bucketsList = []; - var bucketConfigName; - var self = this; - - if (self._reindexingBuckets === true) { - throw new Error('reindexBuckets cannot be called when a reindexing ' + - 'process is in progress'); - } - - self._reindexingBuckets = true; - - for (bucketConfigName in this._bucketsConfig) { - bucketsList.push(this._bucketsConfig[bucketConfigName]); - } - - vasync.forEachPipeline({ - func: function reindexBucket(bucketConfig, done) { - assert.object(bucketConfig, 'bucketConfig'); - assert.string(bucketConfig.name, 'bucketConfig.name'); - - var bucketName = bucketConfig.name; - - self._log.info('Reindexing bucket ' + bucketName + '...'); - - self._reindexBucket(bucketName, function reindexDone(reindexErr) { - if (reindexErr) { - self._log.error({err: reindexErr}, - 'Error when reindexing bucket ' + bucketName); - } else { - self._log.info('Bucket ' + bucketName + - ' reindexed successfully'); - } - - done(reindexErr); - }); - }, - inputs: bucketsList - }, function onAllBucketsReindexed(reindexErr) { - self._reindexingBuckets = false; - callback(reindexErr); - }); -}; - -/* - * Finds the next chunk of records that need to be changed to be migrated to - * version "version". - * - * @param {String} modelName: the name of the model (e.g "vms", "vm_role_tags", - * "server_vms") for which to find records to migrate - * - * @param {Number} version: must be >= 1. - * - * @param {Object} options: - * - log {Object}: the bunyan log instance to use to output log messages. - * - * @param {Function} callback: called with two parameters: (error, records) - * where "error" is any error that occurred when trying to find those records, - * and "records" is an array of objects representing VM objects that need to - * be changed to be migrated to version "version". - */ -Moray.prototype.findRecordsToMigrate = -function findRecordsToMigrate(modelName, version, options, callback) { - assert.string(modelName, 'bucketName'); - assert.number(version, 'version'); - assert.ok(version >= 1, 'version >= 1'); - assert.object(options, 'options'); - assert.func(callback, 'callback'); - - var bucketName = this._modelToBucketName(modelName); - var log = this._log; - var morayFilter; - var records = []; - var RETRY_DELAY_IN_MS = 10000; - var self = this; - - /* - * !!!! WARNING !!!! - * - * When updating these LDAP filters, make sure that they don't break the - * assumption below that an InvalidQueryError can be treated as a transient - * error (See below why.). - * - * !!!! WARNING !!!! - */ - if (version === 1) { - /* - * Version 1 is special, in the sense that there's no anterior version - * for which data_version has a value. Instead, the version before - * version 1 is represented by an absence of value for the data_version - * field. - */ - morayFilter = '(!(data_version=*))'; - } else { - /* - * For any migration whose version number is greater than one, they only - * migrate records at version N - 1. This is safe because: - * - * 1. all new records created are created at the latest version - * supported by VMAPI - * - * 2. migrations are always done in sequence, starting from the - * migration that migrates records without a data_version to records - * with a data_version === 1. - */ - morayFilter = util.format('(|(!(data_version=*))(data_version=%s))', - version - 1); - } - - log.debug({filter: morayFilter, version: version}, - 'generated LDAP filter to find records at version less than given ' + - 'version'); - - /* - * It would be useful to pass either the requireIndexes: true or - * requireOnlineReindexing: true options to findObjects here, as that would - * allow us to make sure that we can actually rely on the results from this - * query. However: - * - * 1. We don't want to rely on a specific version of the Moray server. - * Support for these options is fairly new (see - * http://smartos.org/bugview/MORAY-104 and - * http://smartos.org/bugview/MORAY-428) and being able to perform data - * migrations is a basic requirement of the service, so we don't want to - * prevent that from happening if Moray was rolled back in a DC to a - * version that doesn't support those flags. Moreover, at the time data - * migrations were added, the latest version of the manta-moray image in - * the "support" channel of updates.joyent.com did not include MORAY-104 - * or MORAY-428. - * - * 2. Since this filter uses only one field, Moray already has a mechanism - * that will return an InvalidQueryError in case this field is not - * indexed, which effectively acts similarly to those two different - * options mentioned above. - */ - var req = this._morayClient.findObjects(bucketName, morayFilter); - - req.once('error', function onRecordsNotAtVersionError(err) { - log.error({err: err}, - 'Error when finding next chunk of records to migrate'); - - if (verror.hasCauseWithName(err, 'InvalidQueryError')) { - /* - * We treat InvalidQueryError here as a transient error and retry - * when it occurs because: - * - * 1. We know that the LDAP filter passed to the findObjects request - * uses only one field, and that field was added with the same - * code change than this code. - * - * 2. We know that data migrations are run *after* reindexing of all - * buckets is completed and successful. - * - * As a result, we can rely on this field being indexed and - * searchable, and we know that an InvalidQueryError is returned by - * the Moray server only when the bucket cache of the Moray instance - * that responded has not been refreshed yet. - */ - log.info('Scheduling retry in ' + RETRY_DELAY_IN_MS + ' ms'); - setTimeout(function retry() { - log.info({version: version}, - 'Retrying to find records at version less than'); - self.findRecordsToMigrate(modelName, version, options, - callback); - }, RETRY_DELAY_IN_MS); - } - }); - - req.on('record', function onRecord(record) { - records.push(record); - }); - - req.once('end', function onEnd() { - callback(null, records); - }); -}; - -/* - * Generates a Moray batch request to PUT all objects in the array of objects - * "records", and call "callback" when it's done. - * - * @params {String} modelName: the name of the model (e.g "vms", "vm_role_tags", - * "server_vms") for which to generate a PUT batch operation - * - * @params {ArrayOfObjects} records - * - * @params {Function} callback(err) - */ -Moray.prototype.putBatch = function putBatch(modelName, records, callback) { - assert.string(modelName, 'modelName'); - assert.arrayOfObject(records, 'records'); - assert.func(callback, 'callback'); - - var bucketName = this._modelToBucketName(modelName); - assert.string(bucketName, 'bucketName'); - - this._morayClient.batch(records.map(function generateVmPutBatch(record) { - return { - bucket: bucketName, - operation: 'put', - key: record.value.uuid, - value: record.value, - etag: record._etag - }; - }), function onBatch(batchErr, meta) { - /* - * We don't care about the data in "meta" for now (the list of etags - * resulting from writing all records), and adding it later would be - * backward compatible. - */ - callback(batchErr); - }); -}; - module.exports = Moray; diff --git a/lib/data-migrations/controller.js b/lib/data-migrations/controller.js deleted file mode 100644 index 87ee1f0b..00000000 --- a/lib/data-migrations/controller.js +++ /dev/null @@ -1,305 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2017, Joyent, Inc. - */ - -var assert = require('assert-plus'); -var backoff = require('backoff'); -var EventEmitter = require('events'); -var util = require('util'); -var vasync = require('vasync'); -var VError = require('verror'); - -function DataMigrationsController(options) { - assert.object(options, 'options'); - assert.object(options.log, 'options.log'); - assert.object(options.migrations, 'options.migrations'); - assert.object(options.moray, 'options.moray'); - - EventEmitter.call(this); - - this._latestErrors = undefined; - this._latestCompletedMigrations = {}; - this._log = options.log; - this._migrations = options.migrations; - this._moray = options.moray; -} -util.inherits(DataMigrationsController, EventEmitter); - -function dataMigrationErrorTransient(error) { - assert.object(error, 'error'); - - var idx; - var nonTransientErrors = [ - /* - * For now, we consider a bucket not found to be a non-transient error - * because it's not clear how that error would resolve itself by - * retrying the data migrations process. - */ - 'BucketNotFoundError', - 'InvalidIndexTypeError', - 'InvalidQueryError', - /* - * We consider NotIndexedError errors to be non-transient because data - * migrations happen *after any schema migration, including reindexing - * of all affected buckets* is considered to be complete. As a result, - * when data migrations start, the indexes that are present will not - * change, and so retrying on such an error would lead to the same error - * occurring. - */ - 'NotIndexedError', - /* - * Unless a specific data migration handles a UniqueAttributeError - * itself, we consider that retrying that migration would have the same - * result, so we treat it as a non-transient error. - */ - 'UniqueAttributeError' - ]; - - for (idx = 0; idx < nonTransientErrors.length; ++idx) { - if (VError.hasCauseWithName(error, nonTransientErrors[idx])) { - return false; - } - } - - return true; -} - -DataMigrationsController.prototype.getLatestCompletedMigrationForModel = -function getLatestCompletedMigrationForModel(modelName) { - assert.string(modelName, 'modelName'); - - return this._latestCompletedMigrations[modelName]; -}; - -DataMigrationsController.prototype.getLatestCompletedMigrations = -function getLatestCompletedMigrations() { - return this._latestCompletedMigrations; -}; - -DataMigrationsController.prototype.getLatestErrors = -function getLatestErrors() { - return this._latestErrors; -}; - -DataMigrationsController.prototype.start = function start() { - var dataMigrationsBackoff = backoff.exponential(); - var moray = this._moray; - var self = this; - - moray.validateDataMigrations(this._migrations); - this._latestErrors = undefined; - - dataMigrationsBackoff.on('backoff', - function onDataMigrationBackoff(number, delay) { - self._log.info('Data migration backed off, will retry in %sms', - delay); - }); - - dataMigrationsBackoff.on('ready', function onMigrationReady(number, delay) { - self.runMigrations(function onMigrationsRan(dataMigrationErr) { - if (dataMigrationErr) { - self._log.error({ - err: dataMigrationErr, - number: number, - delay: delay - }, 'Error when running data migrations'); - - if (dataMigrationErrorTransient(dataMigrationErr)) { - self._log.info('Error is transient, backing off'); - dataMigrationsBackoff.backoff(); - } else { - self._log.error(dataMigrationErr, - 'Error is not transient, emitting error'); - self.emit('error', dataMigrationErr); - } - } else { - self._log.info('All data migrations ran successfully'); - self.emit('done'); - } - }); - }); - - dataMigrationsBackoff.backoff(); -}; - -DataMigrationsController.prototype.runMigrations = -function runMigrations(callback) { - var modelNames; - var log = this._log; - var self = this; - - assert.object(this._migrations, 'this._dataMigrations'); - - log.info({dataMigrations: self._migrations}, 'Running data migrations'); - - modelNames = Object.keys(this._migrations); - - /* - * We run data migrations for separate models in *parallel* on purpose. Data - * migrations are heavily I/O bound, and the number of records for each - * "model" (or Moray bucket) can vary widely. Thus, performing them in - * sequence would mean that the migration of a model with very few objects - * could be significantly delayed by the migration of a model with a much - * higher number of objects. Instead, data migrations process objects in - * chunks of a bounded number of objects (currently 1000, the default Moray - * "page" limit), and thus these data migrations are interleaved, making - * none of them blocked on each other. - */ - vasync.forEachParallel({ - func: function runAllMigrationsForSingleModel(modelName, done) { - self._runMigrationsForModel(modelName, self._migrations[modelName], - done); - }, - inputs: modelNames - }, callback); -}; - -DataMigrationsController.prototype._runMigrationsForModel = -function _runMigrationsForModel(modelName, dataMigrations, callback) { - assert.string(modelName, 'modelName'); - assert.arrayOfObject(dataMigrations, 'dataMigrations'); - assert.func(callback, 'callback'); - - assert.object(this._log, 'this._log'); - var log = this._log; - var self = this; - - log.info('Starting data migrations for model %s', modelName); - self._latestCompletedMigrations = {}; - - vasync.forEachPipeline({ - func: function runSingleMigration(migration, next) { - assert.number(migration.DATA_VERSION, 'migration.DATA_VERSION'); - assert.ok(migration.DATA_VERSION >= 1, - 'migration.DATA_VERSION >= 1'); - - self._runSingleMigration(modelName, migration, { - log: log - }, function onMigration(migrationErr) { - if (migrationErr) { - if (self._latestErrors === undefined) { - self._latestErrors = {}; - } - - self._latestErrors[modelName] = migrationErr; - - log.error({err: migrationErr}, - 'Error when running migration to data version: ' + - migration.DATA_VERSION); - } else { - self._latestCompletedMigrations[modelName] = - migration.DATA_VERSION; - if (self._latestErrors && self._latestErrors[modelName]) { - delete self._latestErrors[modelName]; - if (Object.keys(self._latestErrors).length === 0) { - self._latestErrors = undefined; - } - } - log.info('Data migration to data version: ' + - migration.DATA_VERSION + ' ran successfully'); - } - - next(migrationErr); - }); - }, - inputs: dataMigrations - }, function onAllMigrationsDone(migrationsErr, results) { - var err; - - if (migrationsErr) { - err = new VError(migrationsErr, 'Failed to run data migrations'); - } - - callback(err); - }); -}; - -DataMigrationsController.prototype._runSingleMigration = -function _runSingleMigration(modelName, migration, options, callback) { - assert.string(modelName, 'modelName'); - assert.object(migration, 'migration'); - assert.func(migration.migrateRecord, 'migration.migrateRecord'); - assert.number(migration.DATA_VERSION, 'migration.DATA_VERSION'); - assert.ok(migration.DATA_VERSION >= 1, - 'migration.DATA_VERSION >= 1'); - assert.object(options, 'options'); - assert.func(callback, 'callback'); - - var context = {}; - var log = this._log; - var self = this; - var version = migration.DATA_VERSION; - - log.info('Running migration for model %s to data version: %s', modelName, - version); - - function processNextChunk() { - vasync.pipeline({arg: context, funcs: [ - function findRecords(ctx, next) { - self._moray.findRecordsToMigrate(modelName, version, { - log: log - }, function onFindRecords(findErr, records) { - if (findErr) { - log.error({err: findErr}, - 'Error when finding records not at version: ' + - version); - } else { - log.info('Found ' + records.length + ' records'); - ctx.records = records; - } - - next(findErr); - }); - }, - function migrateRecords(ctx, next) { - var migrateRecordFunc = migration.migrateRecord; - var migratedRecords; - var records = ctx.records; - - assert.arrayOfObject(records, 'records'); - - if (records.length === 0) { - next(); - return; - } - - migratedRecords = records.map(function migrate(record) { - return migrateRecordFunc(record, {log: log}); - }); - - log.trace({migratedRecords: migratedRecords}, - 'Migrated records'); - - self._moray.putBatch(modelName, migratedRecords, next); - } - ]}, function onChunkProcessed(chunkProcessingErr) { - var records = context.records; - - if (chunkProcessingErr) { - log.error({err: chunkProcessingErr}, - 'Error when processing chunk'); - callback(chunkProcessingErr); - return; - } - - if (!records || records.length === 0) { - log.info('No more records at version: ' + version + - ', migration done'); - callback(); - } else { - log.info('Processed ' + records.length + ' records, ' + - 'scheduling processing of next chunk'); - setImmediate(processNextChunk); - } - }); - } - - processNextChunk(); -}; -module.exports = DataMigrationsController; \ No newline at end of file diff --git a/lib/data-migrations/loader.js b/lib/data-migrations/loader.js deleted file mode 100644 index a5c2c663..00000000 --- a/lib/data-migrations/loader.js +++ /dev/null @@ -1,257 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2017, Joyent, Inc. - */ - -/* - * This module implements a "loadMigrations" function that loads migration code - * from a directory on the filesystem. It is used both by the VMAPI server to - * load actual data migrations code and by tests exercising the data migrations - * process to load migration fixtures. - */ - -var assert = require('assert-plus'); -var EventEmitter = require('events'); -var fs = require('fs'); -var path = require('path'); -var vasync = require('vasync'); -var util = require('util'); - -var errors = require('../errors'); - -var DEFAULT_MIGRATIONS_ROOT_PATH = path.resolve(__dirname, 'migrations'); -var InvalidDataMigrationFileNamesError = - errors.InvalidDataMigrationFileNamesError; -/* - * A migration module file name *must* start with three digits (in order to make - * it clear when listing files the order with which the code in these files will - * be executed), and *must* end with a ".js" file extension. - */ -var MIGRATION_FILE_RE = /^\d{3}-.*\.js$/; - -/* - * Loads all of the data migration code present in a data migrations directory. - * A data migrations directory is of the following form: - * - * data-migrations-root-dir/ - * vms/ - * 001-some-data-migration.js - * 002-some-other-data-migration.js - * server_vms/ - * 001-some-data-migration.js - * 002-some-other-data-migration.js - * vm_role_tags/ - * 001-some-data-migration.js - * 002-some-other-data-migration.js - * - * The data migrations root dir ("data-migrations-root-dir" in the example - * above) can have any name. Each of its sub-directory must have the name of a - * VMAPI Moray bucket, but not all VMAPI Moray buckets must have a data - * migrations sub-directory: Moray buckets that don't need to have any migration - * running don't need to have an empty directory present. - * - * Each sub-directory must have files using the ".js" extension that can be - * loaded as a Node.js module using the "require" statement. - * - * For a given data migrations sub-directory, the alphanumerical order will be - * used to determine in which order each data migration is performed. - * - * @params {Object} options (optional) - * - {String} migrationsRootPath: the root directory where the data migrations - * modules are present - * - * @params {Function} callback (required): the function called when all data - * migration modules have been loaded - */ -function loadMigrations(options, callback) { - var context = { - migrations: {} - }; - var log; - var migrationsRootPath; - - if (typeof (options) === 'function') { - callback = options; - options = undefined; - } - - assert.object(options, 'options'); - assert.object(options.log, 'options.log'); - assert.optionalString(options.migrationsRootPath, - 'options.migrationsRootPath'); - assert.func(callback, 'callback'); - - log = options.log; - - migrationsRootPath = options.migrationsRootPath; - if (migrationsRootPath === undefined) { - migrationsRootPath = DEFAULT_MIGRATIONS_ROOT_PATH; - } - - log.info('Loading data migrations from root directory %s', - migrationsRootPath); - - vasync.pipeline({arg: context, funcs: [ - readRootMigrationDir, - checkRootMigrationDirEntries, - readMigrationsDirs - ]}, function onMigrationsLoaded(err, results) { - if (err) { - log.error(err, 'Error when loading data migrations'); - } else { - log.info('Data migrations loaded successfully'); - } - - callback(err, context.migrations); - }); - - /* - * First, read the sub-directories under the top-level root directory - * that represents the containers of migration files for each Moray - * bucket name. - */ - function readRootMigrationDir(ctx, next) { - log.debug('Reading root migration directory'); - fs.readdir(migrationsRootPath, - function onRootDirRead(rootDirReadErr, dirEntries) { - if (rootDirReadErr) { - log.debug(rootDirReadErr, - 'Error when reading root migration directory'); - } else { - log.debug({dirEntries: dirEntries}, - 'Successfully read root migration directory'); - } - - if (dirEntries) { - ctx.migrationsDirPaths = - dirEntries.map(function getFullPath(dirEntry) { - return path.join(migrationsRootPath, dirEntry); - }); - } - - next(rootDirReadErr); - }); - } - - /* - * Then, check that these directory entries are actually - * (sub-)directories, and not any type of directory entry (files, etc.). - */ - function checkRootMigrationDirEntries(ctx, next) { - assert.arrayOfString(ctx.migrationsDirPaths, - 'ctx.migrationsDirPaths'); - - log.debug({migrationsDirPaths: ctx.migrationsDirPaths}, - 'Checking top level migration dir entries'); - - vasync.forEachParallel({ - func: function checkIsDirectory(dirPath, done) { - var err; - - fs.lstat(dirPath, - function onLstat(lstatErr, stats) { - if (lstatErr) { - done(lstatErr); - return; - } - - if (!stats || !stats.isDirectory()) { - err = new Error(dirPath + - ' is not a directory'); - } - - done(err); - }); - }, - inputs: ctx.migrationsDirPaths - }, function onTopLevelDirsChecked(checkErr) { - if (checkErr) { - log.debug(checkErr, - 'Error when checking root migration dir entries'); - } else { - log.debug('Checked root migration dir entries ' + - 'successfully'); - } - - next(checkErr); - }); - } - - /* - * Finally, load each file in those sub-directories as a JS module. - */ - function readMigrationsDirs(ctx, next) { - log.debug('Reading data migrations subdirectories'); - - vasync.forEachParallel({func: function loadFiles(dirPath, done) { - var modelName = path.basename(dirPath); - - log.debug({ - dirPath: dirPath - }, 'Reading data migrations subdirectory'); - - fs.readdir(dirPath, function onDirRead(dirReadErr, migrationFiles) { - var invalidFileNames; - - log.trace({migrationFiles: migrationFiles}, 'migration files'); - - if (dirReadErr) { - log.error({ - dirPath: dirPath, - err: dirReadErr - }, 'Error when reading data migrations subdirectory'); - done(dirReadErr); - return; - } - - invalidFileNames = - migrationFiles.filter( - function isInvalidMigrationFilename(fileName) { - return !MIGRATION_FILE_RE.test(fileName); - }); - - log.trace({invalidFileNames: invalidFileNames}, - 'Found %d invalid file names', invalidFileNames.length); - - if (invalidFileNames.length !== 0) { - done(new - InvalidDataMigrationFileNamesError(invalidFileNames)); - return; - } - - /* - * Array.sort() sorts "according to unicode code points", so - * migration files will be sorted alphanumerically. E.g - * 001-foo.js will be sorted (and thus run) before 002-bar.js. - */ - migrationFiles.sort(); - - ctx.migrations[modelName] = - migrationFiles.map(function load(file) { - return require(path.join(dirPath, file)); - }); - - done(); - }); - }, inputs: ctx.migrationsDirPaths - }, function onMigrationDirsRead(readDirsErr) { - if (readDirsErr) { - log.error({readDirsErr: readDirsErr}, - 'Error when reading migration dirs'); - } else { - log.info('Read migration dirs successfully'); - } - - next(readDirsErr); - }); - } -} - -module.exports = { - loadMigrations: loadMigrations -}; \ No newline at end of file diff --git a/lib/data-migrations/noop-controller.js b/lib/data-migrations/noop-controller.js deleted file mode 100644 index 43ccc9ca..00000000 --- a/lib/data-migrations/noop-controller.js +++ /dev/null @@ -1,49 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2017, Joyent, Inc. - */ - -/* - * This module implements a mocked data migrations controller that immediately - * emits an event signaling that all migrations completed successfully. It is - * meant to be used when a VmapiApp instance needs to be created but we don't - * really care about data migrations (e.g tests that do not test data migrations - * specifically). - */ - -var assert = require('assert-plus'); -var EventEmitter = require('events'); -var util = require('util'); - -function NoopDataMigrationsController() { - EventEmitter.call(this); -} -util.inherits(NoopDataMigrationsController, EventEmitter); - -NoopDataMigrationsController.prototype.start = function start() { - this.emit('done'); -}; - -NoopDataMigrationsController.prototype.getLatestCompletedMigrations = -function getLatestCompletedMigrations() { - return {}; -}; - -NoopDataMigrationsController.prototype.getLatestErrors = -function getLatestErrors() { - return undefined; -}; - -NoopDataMigrationsController.prototype.getLatestCompletedMigrationForModel = -function getLatestCompletedMigrationForModel(modelName) { - assert.string(modelName, 'modelName'); - - return undefined; -}; - -module.exports = NoopDataMigrationsController; \ No newline at end of file diff --git a/lib/endpoints/ping.js b/lib/endpoints/ping.js index 0958e4d7..80453b59 100644 --- a/lib/endpoints/ping.js +++ b/lib/endpoints/ping.js @@ -29,7 +29,6 @@ var NOT_OK_OVERALL_STATUS = 'some services are not ready'; * GET /ping */ function ping(req, res, next) { - var dataMigrationsStatus = {}; var morayInitialization; var morayStatus = OFFLINE_STATUS; var wfapiServiceStatus = OFFLINE_STATUS; @@ -64,43 +63,64 @@ function ping(req, res, next) { }); }, function getMorayInitialization(done) { + var dataMigrationError; + var modelName; + + assert.object(req.app.morayBucketsInitializer, + 'req.app.morayBucketsInitializer'); + req.log.debug('checking moray initialization status...'); var morayBucketsInitStatus = req.app.morayBucketsInitializer.status(); - var morayBucketsInitError = - req.app.morayBucketsInitializer.lastInitError(); - var morayBucketsSetup = req.app.moray.bucketsSetup(); - - assert.optionalObject(morayBucketsInitError, - 'morayBucketsInitError'); - if (morayBucketsInitError) { - morayBucketsInitError = morayBucketsInitError.toString(); - } + var morayBucketsReindexStatus; + var morayBucketsSetupStatus; + + assert.object(morayBucketsInitStatus, 'morayBucketsInitStatus'); + morayBucketsReindexStatus = morayBucketsInitStatus.bucketsReindex; + morayBucketsSetupStatus = morayBucketsInitStatus.bucketsSetup; + + assert.object(morayBucketsReindexStatus, + 'morayBucketsReindexStatus'); + assert.object(morayBucketsSetupStatus, + 'morayBucketsSetupStatus'); - if (morayBucketsInitError || - ((morayBucketsInitStatus !== 'BUCKETS_SETUP_DONE') && - (morayBucketsInitStatus !== 'BUCKETS_REINDEX_DONE'))) { + if (morayBucketsSetupStatus.state !== 'DONE' || + morayBucketsReindexStatus.state !== 'DONE') { overallHealthy = false; } - if (!morayBucketsSetup) { + if (morayBucketsSetupStatus.state !== 'DONE') { overallStatus = NOT_OK_OVERALL_STATUS; } - req.log.debug({ - error: morayBucketsInitError, - status: morayBucketsInitStatus - }, 'moray initialization check results'); + morayInitialization = morayBucketsInitStatus; - morayInitialization = { - status: morayBucketsInitStatus - }; + /* + * Render all error objects so that they are human readable when + * sent as part of the JSON output of the endpoint. + */ + if (morayBucketsSetupStatus.latestError) { + morayInitialization.bucketsSetup.latestError = + morayInitialization.bucketsSetup.latestError.toString(); + } - if (morayBucketsInitError) { - morayInitialization.error = morayBucketsInitError; + if (morayInitialization.bucketsReindex.latestError) { + morayInitialization.bucketsReindex.latestError = + morayInitialization.bucketsReindex.latestError.toString(); } + for (modelName in + morayInitialization.dataMigrations.latestErrors) { + dataMigrationError = + morayInitialization.dataMigrations.latestErrors[modelName]; + morayInitialization.dataMigrations.latestErrors[modelName] = + dataMigrationError.toString(); + } + + req.log.debug(morayInitialization, + 'moray buckets initialization status'); + done(); }, function getWfApiConnectivity(done) { @@ -118,28 +138,6 @@ function ping(req, res, next) { status: wfapiServiceStatus }, 'wfapi connectivity check results'); - done(); - }, - function getDataMigrationsStatus(done) { - var latestErrors; - var modelName; - - req.log.debug('Checking data migrations status'); - - if (req.app.dataMigrationsCtrl) { - dataMigrationsStatus.latestCompletedMigrations = - req.app.dataMigrationsCtrl.getLatestCompletedMigrations(); - latestErrors = req.app.dataMigrationsCtrl.getLatestErrors(); - } - - if (latestErrors) { - for (modelName in latestErrors) { - latestErrors[modelName] = - latestErrors[modelName].toString(); - } - dataMigrationsStatus.latestErrors = latestErrors; - } - done(); } ]}, function allStatusInfoRetrieved(err) { @@ -154,7 +152,6 @@ function ping(req, res, next) { responseCode = 503; } - response.dataMigrations = dataMigrationsStatus; response.healthy = overallHealthy; response.initialization = { moray: morayInitialization diff --git a/lib/errors.js b/lib/errors.js index e038af78..6d5d6745 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -231,12 +231,12 @@ exports.ValidationFailedError = ValidationFailedError; exports.BrandNotSupportedError = BrandNotSupportedError; exports.VmNotRunningError = VmNotRunningError; -function MorayBucketsNotSetupError(lastInitError) { - assert.optionalObject(lastInitError, 'lastInitError'); +function MorayBucketsNotSetupError(bucketsSetupError) { + assert.optionalObject(bucketsSetupError, 'bucketsSetupError'); var message = 'Moray buckets are not setup'; - if (lastInitError) { - message += ', last buckets setup error: ' + lastInitError; + if (bucketsSetupError) { + message += ', last buckets setup error: ' + bucketsSetupError; } restify.ServiceUnavailableError.call(this, { @@ -246,7 +246,7 @@ function MorayBucketsNotSetupError(lastInitError) { body: { code: this.constructor.restCode, message: message, - lastInitError: lastInitError + lastInitError: bucketsSetupError } }); } diff --git a/lib/interceptors.js b/lib/interceptors.js index 9ec2fd19..3f055588 100644 --- a/lib/interceptors.js +++ b/lib/interceptors.js @@ -75,15 +75,25 @@ exports.checkMorayBucketsSetup = function checkMorayBucketsSetup(req, res, next) { assert.object(req, 'req'); assert.object(req.app, 'req.app'); + assert.object(req.app.morayBucketsInitializer, + 'req.app.morayBucketsInitializer'); assert.object(res, 'res'); assert.func(next, 'next'); - var lastMorayBucketsSetupError; var err; + var bucketsSetupError; + var morayBucketsInitStatus; + var morayBucketsSetupStatus; - if (!req.app.moray.bucketsSetup()) { - lastMorayBucketsSetupError = req.app.moray.lastBucketsSetupError(); - err = new errors.MorayBucketsNotSetupError(lastMorayBucketsSetupError); + morayBucketsInitStatus = req.app.morayBucketsInitializer.status(); + assert.object(morayBucketsInitStatus, 'morayBucketsInitStatus'); + + morayBucketsSetupStatus = morayBucketsInitStatus.bucketsSetup; + assert.object(morayBucketsSetupStatus, 'morayBucketsSetupStatus'); + + if (morayBucketsSetupStatus.state !== 'DONE') { + bucketsSetupError = morayBucketsSetupStatus.latestError; + err = new errors.MorayBucketsNotSetupError(bucketsSetupError); } next(err); diff --git a/lib/moray/moray-buckets-initializer.js b/lib/moray/moray-buckets-initializer.js deleted file mode 100644 index c7b9097b..00000000 --- a/lib/moray/moray-buckets-initializer.js +++ /dev/null @@ -1,276 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2017, Joyent, Inc. - */ - -var assert = require('assert-plus'); -var backoff = require('backoff'); -var bunyan = require('bunyan'); -var events = require('events'); -var restify = require('restify'); -var util = require('util'); -var vasync = require('vasync'); -var verror = require('verror'); - -/* - * MorayBucketsInitializer instances drive the process that sets up _and_ - * reindexes the moray buckets that need to be present for VMAPI to function - * properly. They take an instance of the "Moray" constructor and an object that - * represents the desired configuration of moray buckets used by VMAPI as input. - * - * Once an instance of MorayBucketsInitializer has been created, its "start" - * method can be called to actually start the process. - * - * If the process completes successfully, a 'done' event is emitted by a - * MorayBucketsInitializer instance. If the process encounters an unrecoverable - * error, it emits an 'error' event. - */ - -/* - * The constructor for the MorayBucketsInitializer class. It derives from - * events.EventEmitter. - * - * Its parameters are: - * - * - "options" (mandatory): an object with properties and values that can be - * used to tweak the behavior of the initializer. The following properties are - * supported: - * - * * "maxBucketsSetupAttempts" (optional): the number of attempts to setup - * (create and/or update) buckets before an 'error' event is emitted. - * Its default value is "undefined" and it causes the process to be retried - * indefinitely, unless a non-transient error is encountered. - * - * * "maxBucketsReindexAttempts" (optional): the number of attempts to reindex - * buckets before an 'error' event is emitted. Its default value is - * "undefined" and it causes the process to be retried indefinitely, unless - * a non-transient error is encountered. - * - * * "log" (mandatory): the bunyan logger instance to use. - */ -function MorayBucketsInitializer(options) { - events.EventEmitter.call(this); - - assert.object(options, 'options'); - - assert.optionalNumber(options.maxBucketsSetupAttempts, - 'options.maxBucketsSetupAttempts'); - this._maxBucketsSetupAttempts = options.maxBucketsSetupAttempts; - - assert.optionalNumber(options.maxBucketsReindexAttempts, - 'options.maxBucketsReindexAttempts'); - this._maxBucketsReindexAttempts = options.maxBucketsReindexAttempts; - - assert.object(options.log, 'options.log'); - this.log = options.log; - - this._lastInitError = null; - this._status = 'NOT_STARTED'; -} -util.inherits(MorayBucketsInitializer, events.EventEmitter); - -MorayBucketsInitializer.prototype.status = function status() { - return this._status; -}; - -/* - * Returns an object representing the latest error encountered when setting up - * VMAPI's moray buckets, null otherwise. - */ -MorayBucketsInitializer.prototype.lastInitError = function lastInitError() { - return this._lastInitError; -}; - -/* - * The "start" method can be used to actually start the process of setting up - * and reindexing VMAPI's moray buckets. - * - * Its parameters are: - * - * - "moray": an instance of the Moray constructor used to - * actually perform operations against the moray key/value store. - * - * - "morayBucketsConfig": an object that represents the configuration of the - * buckets that need to be setup in moray for VMAPI to be able to function - * properly. - * - * When the process completes successfully, the 'done' event is emitted on the - * MorayBucketsInitializer instance. - * - * When the process encounters an error, it emits an 'error' event if the error - * is considered to be unrecoverable. If the error is considered to be - * recoverable, it restarts the process until it succeeds, or until the maximum - * number of retries has been reached. - * - * If the maximum number of retries has been reached, the 'error' event is - * emitted. - * - * Transient moray errors are considered to be recoverable and non-transient - * errors (such as bad bucket configuration errors) are considered to be - * unrecoverable. - */ -MorayBucketsInitializer.prototype.start = - function start(moray) { - assert.object(moray, 'moray'); - - var self = this; - - self._status = 'STARTED'; - - vasync.pipeline({arg: {}, funcs: [ - function setupBuckets(arg, next) { - self.log.info('Starting setting up buckets'); - self._setupBuckets(moray, function onBucketsSetup(bucketsSetupErr) { - if (!bucketsSetupErr) { - self.log.info('Buckets setup successfully'); - self._status = 'BUCKETS_SETUP_DONE'; - } else { - self.log.error({err: bucketsSetupErr}, - 'Error when setting up buckets'); - } - - next(bucketsSetupErr); - }); - }, - function reindexBuckets(arg, next) { - self.log.info('Starting reindexing buckets'); - self._reindexBuckets(moray, - function onBucketsReindexed(bucketsReindexErr) { - if (!bucketsReindexErr) { - self.log.info('Buckets reindexed successfully'); - self._status = 'BUCKETS_REINDEX_DONE'; - } else { - self.log.error({err: bucketsReindexErr}, - 'Error when reindexing buckets'); - } - - next(bucketsReindexErr); - }); - } - ]}, function onBucketsInitialized(bucketsInitErr) { - if (bucketsInitErr) { - self.log.error({err: bucketsInitErr}, - 'Error when initializing moray buckets'); - self._status = 'FAILED'; - self.emit('error', bucketsInitErr); - } else { - self.log.info('Buckets initialized successfully'); - self.emit('done'); - } - }); -}; - -MorayBucketsInitializer.prototype._performBackedOffProcess = - function _performBackedOffProcess(processName, fun, options, callback) { - assert.string(processName, 'processName'); - assert.func(fun, 'fun'); - assert.object(options, 'options'); - assert.optionalNumber(options.maxAttempts, 'options.maxAttempts'); - assert.func(options.isErrTransientFun, 'options.isErrTransientFun'); - assert.func(callback, 'callback'); - - var INITIAL_SETUP_BUCKET_BACKOFF_DELAY_MS = 10; - var MAX_SETUP_BUCKET_BACKOFF_DELAY_MS = 5000; - - var processBackoff = backoff.exponential({ - initialDelay: INITIAL_SETUP_BUCKET_BACKOFF_DELAY_MS, - maxDelay: MAX_SETUP_BUCKET_BACKOFF_DELAY_MS - }); - var self = this; - - if (options.maxAttempts !== undefined) { - processBackoff.failAfter(options.maxAttempts); - } - - function onProcessDone(processErr) { - var errTransient = true; - - if (processErr) { - self._lastInitError = processErr; - - errTransient = options.isErrTransientFun(processErr); - if (!errTransient) { - self.log.error({error: processErr}, - 'Non transient error when performing moray initializer ' + - 'process ' + processName); - - self.log.debug('stopping moray process backoff'); - processBackoff.reset(); - - callback(processErr); - return; - } else { - self.log.warn({error: processErr}, - 'Transient error encountered, backing off'); - processBackoff.backoff(); - return; - } - } else { - self._lastInitError = null; - self.log.info('Moray process done!'); - processBackoff.reset(); - callback(); - return; - } - } - - processBackoff.on('ready', function onSetupBucketsBackoffReady() { - fun(onProcessDone); - }); - - processBackoff.on('backoff', function onMorayProcessBackoff(number, delay) { - self.log.warn({ - number: number, - delay: delay - }, 'Moray process backed off'); - }); - - processBackoff.on('fail', function onProcessFail() { - callback(new Error('Maximum number of tries reached when ' + - 'performing ' + processName)); - }); - - processBackoff.backoff(); -}; - -MorayBucketsInitializer.prototype._setupBuckets = - function _setupBuckets(moray, callback) { - assert.object(moray, 'moray'); - assert.func(callback, 'callback'); - - var self = this; - - self._performBackedOffProcess('buckets setup', - moray.setupBuckets.bind(moray), { - maxAttempts: self._maxBucketsSetupAttempts, - isErrTransientFun: - moray.isBucketsSetupErrorTransient.bind(moray) - }, callback); -}; - -MorayBucketsInitializer.prototype._reindexBuckets = - function _reindexBuckets(moray, callback) { - - assert.object(moray, 'moray'); - assert.func(callback, 'callback'); - - var self = this; - - self._performBackedOffProcess('buckets reindex', - moray.reindexBuckets.bind(moray), { - maxAttempts: self._maxBucketsReindexAttempts, - isErrTransientFun: function isReindexErrorTransient(err) { - /* - * Reindexing errors are always transient. - */ - return true; - } - }, callback); -}; - -module.exports = MorayBucketsInitializer; \ No newline at end of file diff --git a/lib/moray/moray-init.js b/lib/moray/moray-init.js index 269799ec..9c1ad25f 100644 --- a/lib/moray/moray-init.js +++ b/lib/moray/moray-init.js @@ -23,7 +23,7 @@ var mod_moray = require('moray'); var restify = require('restify'); var Moray = require('../apis/moray'); -var MorayBucketsInitializer = require('./moray-buckets-initializer.js'); +var MorayBucketsInitializer = require('moray-buckets').MorayBucketsInitializer; var DEFAULT_MORAY_BUCKETS_CONFIG = require('./moray-buckets-config.js'); /* @@ -92,7 +92,9 @@ function startMorayInit(options) { options = options || {}; - assert.object(options.morayConfig, 'options.morayConfig'); + assert.object(options.changefeedPublisher, 'options.changefeedPublisher'); + assert.optionalString(options.dataMigrationsPath, + 'options.dataMigrationsPath'); assert.optionalObject(options.log, 'options.log'); assert.optionalNumber(options.maxBucketsReindexAttempts, 'options.maxBucketsReindexAttempts'); @@ -100,9 +102,10 @@ function startMorayInit(options) { 'options.maxBucketsSetupAttempts'); assert.optionalObject(options.morayBucketsConfig, 'options.morayBucketsConfig'); - assert.object(options.changefeedPublisher, 'options.changefeedPublisher'); + assert.object(options.morayConfig, 'options.morayConfig'); var changefeedPublisher = options.changefeedPublisher; + var dataMigrationsPath = options.dataMigrationsPath; var log = options.log; var maxBucketsReindexAttempts = options.maxBucketsReindexAttempts; var maxBucketsSetupAttempts = options.maxBucketsSetupAttempts; @@ -138,21 +141,25 @@ function startMorayInit(options) { component: 'moray-buckets-initializer' }, true); + var morayBucketsInitializer = new MorayBucketsInitializer({ + bucketsConfig: morayBucketsConfig, + dataMigrationsPath: dataMigrationsPath, + log: morayBucketsInitializerLog, + maxBucketsReindexAttempts: maxBucketsReindexAttempts, + maxBucketsSetupAttempts: maxBucketsSetupAttempts, + morayClient: morayClient + }); + moray = new Moray({ - changefeedPublisher: changefeedPublisher, bucketsConfig: morayBucketsConfig, + changefeedPublisher: changefeedPublisher, + morayBucketsInitializer: morayBucketsInitializer, morayClient: morayClient, log: morayStorageLog }); - var morayBucketsInitializer = new MorayBucketsInitializer({ - maxBucketsSetupAttempts: maxBucketsSetupAttempts, - maxBucketsReindexAttempts: maxBucketsReindexAttempts, - log: morayBucketsInitializerLog - }); - morayClient.on('connect', function onMorayClientConnected() { - morayBucketsInitializer.start(moray); + morayBucketsInitializer.start(); }); return { diff --git a/lib/vmapi.js b/lib/vmapi.js index 4e42b5cf..e9896a82 100644 --- a/lib/vmapi.js +++ b/lib/vmapi.js @@ -122,16 +122,6 @@ function VmapiApp(options) { this.options = options; - /* - * We make it mandatory to pass a data migrations controller so that we - * don't omit to pass it to the VMAPI application constructor by mistake at - * some point, even though technically in a lot of use cases (e.g tests) - * when we don't need to perform data migrations, it'd be perfectly fine to - * omit it. - */ - assert.object(options.dataMigrationsCtrl, 'options.dataMigrationsCtrl'); - this.dataMigrationsCtrl = options.dataMigrationsCtrl; - validations.init(options); this._initApis(options); } @@ -408,14 +398,26 @@ VmapiApp.prototype.listen = function (options, callback) { VmapiApp.prototype.getLatestCompletedDataMigrationForModel = function getLatestCompletedDataMigrationForModel(modelName) { + var completedDataMigrations; + var morayBucketsInitializer = this.morayBucketsInitializer; + var morayBucketsInitStatus; + var dataMigrationsStatus; + assert.ok(this.moray.isValidModelName(modelName), modelName + ' is valid'); + assert.object(morayBucketsInitializer, 'morayBucketsInitializer'); - var dataMigrationsCtrl = this.dataMigrationsCtrl; - if (dataMigrationsCtrl === undefined) { - return; + morayBucketsInitStatus = morayBucketsInitializer.status(); + assert.object(morayBucketsInitStatus, 'morayBucketsInitStatus'); + + dataMigrationsStatus = morayBucketsInitStatus.dataMigrations; + assert.object(dataMigrationsStatus, 'dataMigrationsStatus'); + + completedDataMigrations = dataMigrationsStatus.completed; + if (completedDataMigrations === undefined) { + return undefined; } - return dataMigrationsCtrl.getLatestCompletedMigrationForModel(modelName); + return completedDataMigrations[modelName]; }; diff --git a/package.json b/package.json index 2f7d67b3..cb8e14f2 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "ldap-filter": "0.3.3", "libuuid": "0.2.1", "moray": "3.1.1", + "moray-buckets": "1.0.0", "nodeunit": "0.9.1", "once": "^1.3.3", "restify": "4.3.0", diff --git a/server.js b/server.js index e3011f7e..7a483738 100644 --- a/server.js +++ b/server.js @@ -36,8 +36,6 @@ var WFAPI = require('./lib/apis/wfapi'); var configLoader = require('./lib/config-loader'); var createMetricsManager = require('./lib/metrics').createMetricsManager; -var DataMigrationsController = require('./lib/data-migrations/controller'); -var dataMigrationsLoader = require('./lib/data-migrations/loader'); var morayInit = require('./lib/moray/moray-init.js'); var DATA_MIGRATIONS; @@ -140,8 +138,6 @@ function startVmapiService() { var changefeedPublisher; var configFilePath = path.join(__dirname, 'config.json'); var config = configLoader.loadConfig(configFilePath); - var dataMigrations; - var dataMigrationsCtrl; var metricsManager; var vmapiLog = bunyan.createLogger({ name: 'vmapi', @@ -173,24 +169,6 @@ function startVmapiService() { next(); }); }, - function loadDataMigrations(_, next) { - vmapiLog.info('Loading data migrations modules'); - - dataMigrationsLoader.loadMigrations({ - log: vmapiLog.child({ component: 'migrations-loader' }, true) - }, function onMigrationsLoaded(migrationsLoadErr, migrations) { - if (migrationsLoadErr) { - vmapiLog.error({err: migrationsLoadErr}, - 'Error when loading data migrations modules'); - } else { - vmapiLog.info({migrations: migrations}, - 'Loaded data migrations modules successfully!'); - } - - dataMigrations = migrations; - next(migrationsLoadErr); - }); - }, function initMoray(_, next) { assert.object(changefeedPublisher, 'changefeedPublisher'); @@ -199,43 +177,17 @@ function startVmapiService() { morayConfig.changefeedPublisher = changefeedPublisher; var moraySetup = morayInit.startMorayInit({ - morayConfig: morayConfig, + changefeedPublisher: changefeedPublisher, + dataMigrationsPath: path.join(__dirname, 'lib', + 'data-migrations', 'migrations'), log: vmapiLog.child({ component: 'moray-init' }, true), - changefeedPublisher: changefeedPublisher + morayConfig: morayConfig }); morayBucketsInitializer = moraySetup.morayBucketsInitializer; morayClient = moraySetup.morayClient; moray = moraySetup.moray; - /* - * We don't set an 'error' event listener because we want the - * process to abort when there's a non-transient data migration - * error. - */ - dataMigrationsCtrl = new DataMigrationsController({ - log: vmapiLog.child({ - component: 'migrations-controller' - }, true), - migrations: dataMigrations, - moray: moray - }); - - /* - * We purposely start data migrations *only when all buckets are - * updated and reindexed*. Otherwise, if we we migrated records that - * have a value for a field for which a new index was just added, - * moray could discard that field when fetching the object using - * findObjects or getObject requests (See - * http://smartos.org/bugview/MORAY-104 and - * http://smartos.org/bugview/MORAY-428). We could thus migrate - * those records erroneously, and in the end write bogus data. - */ - morayBucketsInitializer.on('done', - function onMorayBucketsInitialized() { - dataMigrationsCtrl.start(); - }); - /* * We don't want to wait for the Moray initialization process to be * done before creating the HTTP server that will provide VMAPI's @@ -298,7 +250,6 @@ function startVmapiService() { var vmapiApp = new VmapiApp({ apiClients: apiClients, changefeedPublisher: changefeedPublisher, - dataMigrationsCtrl: dataMigrationsCtrl, log: vmapiLog.child({ component: 'http-api' }, true), metricsManager: metricsManager, moray: moray, diff --git a/test/fixtures/data-migrations-invalid-filenames/vms/001-invalid-file-extension.foo b/test/fixtures/data-migrations-invalid-filenames/vms/001-invalid-file-extension.foo deleted file mode 100644 index e3e417b8..00000000 --- a/test/fixtures/data-migrations-invalid-filenames/vms/001-invalid-file-extension.foo +++ /dev/null @@ -1,9 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2017, Joyent, Inc. - */ \ No newline at end of file diff --git a/test/fixtures/data-migrations-invalid-filenames/vms/invalid-file-name.js b/test/fixtures/data-migrations-invalid-filenames/vms/invalid-file-name.js deleted file mode 100644 index e3e417b8..00000000 --- a/test/fixtures/data-migrations-invalid-filenames/vms/invalid-file-name.js +++ /dev/null @@ -1,9 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2017, Joyent, Inc. - */ \ No newline at end of file diff --git a/test/fixtures/vmapi-server-throwing-expected-stderr.txt b/test/fixtures/vmapi-server-throwing-expected-stderr.txt index 79ea48ef..6e93d325 100644 --- a/test/fixtures/vmapi-server-throwing-expected-stderr.txt +++ b/test/fixtures/vmapi-server-throwing-expected-stderr.txt @@ -1,6 +1,6 @@ -/opt/smartdc/vmapi/test/fixtures/vmapi-server-with-throwing-handler.js:24 +/opt/smartdc/vmapi/test/fixtures/vmapi-server-with-throwing-handler.js:22 throw new Error('boom'); ^ Error: boom - at Server.throwingRestifyHandler (/opt/smartdc/vmapi/test/fixtures/vmapi-server-with-throwing-handler.js:24:11) + at Server.throwingRestifyHandler (/opt/smartdc/vmapi/test/fixtures/vmapi-server-with-throwing-handler.js:22:11) diff --git a/test/fixtures/vmapi-server-with-throwing-handler.js b/test/fixtures/vmapi-server-with-throwing-handler.js index eaa921c5..e4a81c28 100644 --- a/test/fixtures/vmapi-server-with-throwing-handler.js +++ b/test/fixtures/vmapi-server-with-throwing-handler.js @@ -14,8 +14,6 @@ var path = require('path'); var vasync = require('vasync'); var changefeedUtils = require('../../lib/changefeed'); -var NoopDataMigrationsController = - require('../../lib/data-migrations/noop-controller'); var VmapiApp = require('../../lib/vmapi'); var UNIQUE_ENDPOINT_PATH = '/' + libuuid.create(); @@ -49,10 +47,20 @@ vasync.pipeline({funcs: [ bucketsSetup: function bucketsSetup() { return true; } }, changefeedPublisher: changefeedUtils.createNoopCfPublisher(), - dataMigrationsCtrl: new NoopDataMigrationsController(), metricsManager: mockedMetricsManager, morayBucketsInitializer: { - status: function status() { return 'BUCKETS_REINDEX_DONE'; }, + status: function status() { + return { + /* + * This needs to be 'DONE' so that the restify + * middleware/interceptor that makes all requests error + * _before_ their custom handler runs does not kick in. + */ + bucketsSetup: { state: 'DONE' }, + bucketsReindex: { state: 'NOT_STARTED' }, + dataMigrations: { state: 'NOT_STARTED' } + }; + }, lastInitError: function lastInitError() { return null; } } }); diff --git a/test/vms.data-migrations.test.js b/test/vms.data-migrations.test.js index 059d16d5..e702dcaa 100644 --- a/test/vms.data-migrations.test.js +++ b/test/vms.data-migrations.test.js @@ -11,8 +11,6 @@ var assert = require('assert-plus'); var bunyan = require('bunyan'); var jsprim = require('jsprim'); -var libuuid = require('libuuid'); -var once = require('once'); var path = require('path'); var restify = require('restify'); var util = require('util'); @@ -22,8 +20,6 @@ var VMAPI = require('sdc-clients').VMAPI; var changefeedUtils = require('../lib/changefeed'); var common = require('./common'); -var DataMigrationsController = require('../lib/data-migrations/controller'); -var dataMigrationsLoader = require('../lib/data-migrations/loader'); var morayInit = require('../lib/moray/moray-init'); var testMoray = require('./lib/moray.js'); var VmapiApp = require('../lib/vmapi'); @@ -43,29 +39,20 @@ var VMS_BUCKET_NAME = 'test_vmapi_vms_data_migrations'; var SERVER_VMS_BUCKET_NAME = 'test_vmapi_server_vms_data_migrations'; var ROLE_TAGS_BUCKET_NAME = 'test_vmapi_vm_role_tags_data_migrations'; -/* - * We use two versions for the VMS_BUCKET_CONFIG (VMS_BUCKET_CONFIG_V1 and - * VMS_BUCKET_CONFIG_V2) to exercise the code path where finding objects to - * migrate fails with an InvalidQueryError due to the fact that some Moray - * instances do not have the data_version field indexed in their bucket cache. - * See https://smartos.org/bugview/TRITON-214 for context. - */ -var VMS_BUCKET_CONFIG_V1 = { - name: VMS_BUCKET_NAME, - schema: { - index: { - foo: { type: 'string' }, - bar: { type: 'string' } - } - } -}; - -var VMS_BUCKET_CONFIG_V2 = { +var VMS_BUCKET_CONFIG = { name: VMS_BUCKET_NAME, schema: { index: { foo: { type: 'string' }, bar: { type: 'string' }, + /* + * The "uuid" and "internal_metadata_search_array" indexes are + * required to be able to make sure that filtering on + * "internal_metadata" works as expected once all data migrations + * completed successfully. + */ + uuid: { type: 'string' }, + internal_metadata_search_array: { type: '[string]'}, data_version: { type: 'number' } }, options: { @@ -85,14 +72,8 @@ var ROLE_TAGS_MORAY_BUCKET_CONFIG = { } }; -var TEST_BUCKETS_CONFIG_V1 = { - vms: VMS_BUCKET_CONFIG_V1, - server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG, - vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG -}; - -var TEST_BUCKETS_CONFIG_V2 = { - vms: VMS_BUCKET_CONFIG_V2, +var TEST_BUCKETS_CONFIG = { + vms: VMS_BUCKET_CONFIG, server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG, vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG }; @@ -111,14 +92,13 @@ function findAllObjects(morayClient, bucketName, filter, callback) { assert.string(bucketName, 'bucketName'); assert.func(callback, 'callback'); - var callbackOnce = once(callback); var allRecords = []; var findAllObjectsReq = morayClient.findObjects(bucketName, filter); findAllObjectsReq.once('error', function onError(findErr) { cleanup(); - callbackOnce(findErr); + callback(findErr); }); findAllObjectsReq.on('record', function onRecord(record) { @@ -127,7 +107,7 @@ function findAllObjects(morayClient, bucketName, filter, callback) { findAllObjectsReq.once('end', function onGotAllRecords() { cleanup(); - callbackOnce(null, allRecords); + callback(null, allRecords); }); function cleanup() { @@ -137,38 +117,12 @@ function findAllObjects(morayClient, bucketName, filter, callback) { } } -exports.data_migrations_invalid_filenames = function (t) { - var dataMigrationsLoaderLogger = bunyan.createLogger({ - name: 'data-migrations-loader', - level: 'debug', - serializers: restify.bunyan.serializers - }); - - dataMigrationsLoader.loadMigrations({ - log: dataMigrationsLoaderLogger, - migrationsRootPath: path.resolve(__dirname, 'fixtures', - 'data-migrations-invalid-filenames') - }, function onMigrationsLoaded(loadMigrationsErr, migrations) { - var expectedErrorName = 'InvalidDataMigrationFileNamesError'; - - t.ok(loadMigrationsErr, - 'loading migrations with invalid filenames should error'); - - if (loadMigrationsErr) { - t.ok(VError.hasCauseWithName(loadMigrationsErr, expectedErrorName), - 'error should have a cause of ' + expectedErrorName); - } - - t.done(); - }); -}; - exports.data_migrations = function (t) { var context = {}; var TRANSIENT_ERROR_MSG = 'Mocked transient error'; vasync.pipeline({arg: context, funcs: [ - function cleanup(ctx, next) { + function cleanupBuckets(ctx, next) { testMoray.cleanupLeftoverBuckets([ VMS_BUCKET_NAME, SERVER_VMS_BUCKET_NAME, @@ -180,43 +134,64 @@ exports.data_migrations = function (t) { next(cleanupErr); }); }, - function setupMorayBuckets(ctx, next) { + /* + * Start the buckets initialization process again after injecting a + * transient error in the data migration process. This way we can check + * that VMAPI reacts properly to errors at this specific stage of the + * buckets init process. When we're done, we'll remove the injected + * transient error, and make sure VMAPI can provide the functionality + * that depends on data migrations successfully. + */ + function startDataMigrationsWithTransientErr(ctx, next) { var morayBucketsInitializer; var morayClient; - var moraySetup = morayInit.startMorayInit({ + var moraySetup; + + moraySetup = morayInit.startMorayInit({ + changefeedPublisher: changefeedUtils.createNoopCfPublisher(), + dataMigrationsPath: path.join(__dirname, 'fixtures', + 'data-migrations-valid'), morayConfig: common.config.moray, - morayBucketsConfig: TEST_BUCKETS_CONFIG_V1, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() + morayBucketsConfig: TEST_BUCKETS_CONFIG }); - var nextOnce = once(next); ctx.moray = moraySetup.moray; ctx.morayBucketsInitializer = morayBucketsInitializer = moraySetup.morayBucketsInitializer; ctx.morayClient = morayClient = moraySetup.morayClient; - function cleanUp() { - morayBucketsInitializer.removeAllListeners('error'); - morayBucketsInitializer.removeAllListeners('done'); - } - - morayBucketsInitializer.on('done', function onMorayBucketsInit() { - t.ok(true, - 'original moray buckets setup should be ' + - 'successful'); + ctx.originalBatch = ctx.morayClient.batch; + ctx.morayClient.batch = + function mockedBatch(listOpts, callback) { + assert.arrayOfObject(listOpts, 'listOpts'); + assert.func(callback, 'callback'); - cleanUp(); - nextOnce(); - }); + callback(new Error(TRANSIENT_ERROR_MSG)); + }; - morayBucketsInitializer.on('error', - function onMorayBucketsInitError(morayBucketsInitErr) { - t.ok(!morayBucketsInitErr, - 'original moray buckets initialization should ' + - 'not error'); + ctx.morayBucketsInitializer.once('done', + function onBucketsInitDone() { + t.ok(false, 'Moray buckets init should not complete when ' + + 'transient error injected in data migrations'); + }); - cleanUp(); - nextOnce(morayBucketsInitErr); + ctx.morayBucketsInitializer.once('error', + function onBucketsInitError(bucketsInitErr) { + t.ok(false, 'Moray buckets init should not error when ' + + 'transient error injected in data migrations'); + }); + /* + * Move on to the next step of this test only when reindexing has + * completed successfully, so that we know that at some point (once + * all buckets caches are refreshed) we can test that the search on + * internal_metadata should be successful, since the required index + * will be present. + */ + ctx.morayBucketsInitializer.once('buckets-reindex-done', + function onBucketsSetupDone() { + t.ok(true, 'Moray buckets setup should complete when ' + + 'transient error injected in data migrations'); + next(); }); }, function writeTestObjects(ctx, next) { @@ -225,89 +200,17 @@ exports.data_migrations = function (t) { testMoray.writeObjects(ctx.morayClient, VMS_BUCKET_NAME, { foo: 'foo' }, NUM_TEST_OBJECTS, function onTestObjectsWritten(writeErr) { - ctx.morayClient.close(); - t.ok(!writeErr, 'writing test objects should not error, got: ' + util.inspect(writeErr)); next(writeErr); }); }, - function migrateSchemaForDataMigrations(ctx, next) { - var morayBucketsInitializer; - var morayClient; - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: TEST_BUCKETS_CONFIG_V2, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - var nextOnce = once(next); - - ctx.moray = moraySetup.moray; - ctx.morayBucketsInitializer = morayBucketsInitializer = - moraySetup.morayBucketsInitializer; - ctx.morayClient = morayClient = moraySetup.morayClient; - - function cleanUp() { - morayBucketsInitializer.removeAllListeners('error'); - morayBucketsInitializer.removeAllListeners('done'); - } - - morayBucketsInitializer.on('done', function onMorayBucketsInit() { - t.ok(true, 'migration of moray buckets should be successful'); - - cleanUp(); - nextOnce(); - }); - - morayBucketsInitializer.on('error', - function onMorayBucketsInitError(morayBucketsInitErr) { - t.ok(!morayBucketsInitErr, - 'moray buckets migration should not error, got: ' + - morayBucketsInitErr); - - cleanUp(); - nextOnce(morayBucketsInitErr); - }); - }, - function loadDataMigrations(ctx, next) { - var dataMigrationsLoaderLogger = bunyan.createLogger({ - name: 'data-migrations-loader', - level: 'info', - serializers: restify.bunyan.serializers - }); - - dataMigrationsLoader.loadMigrations({ - log: dataMigrationsLoaderLogger, - migrationsRootPath: path.resolve(__dirname, 'fixtures', - 'data-migrations-valid') - }, function onMigrationsLoaded(loadMigrationsErr, migrations) { - ctx.migrations = migrations; - next(loadMigrationsErr); - }); - }, - function createMigrationsController(ctx, next) { - assert.object(ctx.migrations, 'ctx.migrations'); - assert.object(ctx.moray, 'ctx.moray'); - - ctx.dataMigrationsCtrl = new DataMigrationsController({ - log: bunyan.createLogger({ - name: 'data-migratons-controller', - level: 'info', - serializers: restify.bunyan.serializers - }), - migrations: ctx.migrations, - moray: ctx.moray - }); - - next(); - }, function startVmapiService(ctx, next) { ctx.vmapiApp = new VmapiApp({ apiClients: { wfapi: MOCKED_WFAPI_CLIENT }, changefeedPublisher: changefeedUtils.createNoopCfPublisher(), - dataMigrationsCtrl: ctx.dataMigrationsCtrl, metricsManager: MOCKED_METRICS_MANAGER, morayBucketsInitializer: ctx.morayBucketsInitializer, moray: ctx.moray @@ -329,53 +232,6 @@ exports.data_migrations = function (t) { next(); }); }, - function checkDataMigrationsNoneStarted(ctx, next) { - assert.object(ctx.vmapiClient, 'ctx.vmapiClient'); - - ctx.vmapiClient.ping(function onVmapiPing(pingErr, obj, req, res) { - t.ok(!pingErr, 'pinging VMAPI when data migrations have not ' + - 'started yet should not error'); - t.ok(obj, 'pinging VMAPI when data migrations have not ' + - 'started should return a non-empty response'); - if (obj) { - t.ok(obj.dataMigrations && - obj.dataMigrations.latestCompletedMigrations, - 'ping response should have a ' + - 'dataMigrations.latestCompletedMigrations ' + - 'property'); - } - next(); - }); - }, - function injectTransientError(ctx, next) { - ctx.originalPutBatch = ctx.moray.putBatch; - ctx.moray.putBatch = - function mockedPutBatch(modelName, records, callback) { - assert.string(modelName, 'modelName'); - assert.arrayOfObject(records, 'records'); - assert.func(callback, 'callback'); - - callback(new Error(TRANSIENT_ERROR_MSG)); - }; - next(); - }, - function startMigrations(ctx, next) { - ctx.dataMigrationsCtrl.start(); - - ctx.dataMigrationsCtrl.once('done', - function onDataMigrationsDone() { - t.ok(false, 'data migrations should not complete when ' + - 'transient error injected'); - }); - - ctx.dataMigrationsCtrl.once('error', - function onDataMigrationsError(dataMigrationErr) { - t.ok(false, 'data migrations should not error when ' + - 'transient error injected'); - }); - - next(); - }, function checkDataMigrationsTransientError(ctx, next) { var MAX_NUM_TRIES; /* @@ -399,26 +255,29 @@ exports.data_migrations = function (t) { ctx.vmapiClient.ping(function onPing(pingErr, obj, req, res) { var foundExpectedErrMsg; var latestVmsMigrationsErr; - - console.log('pingErr:', pingErr); - console.log('obj:', obj); + var morayInitStatus; t.ok(!pingErr, 'pinging VMAPI when data migrations fail ' + 'should return a non-error status, got: ' + pingErr); t.ok(obj, 'pinging VMAPI when data migrations fail ' + 'should return a non-empty response, got: ' + obj); - if (obj.dataMigrations && - obj.dataMigrations.latestErrors && - obj.dataMigrations.latestErrors.vms) { + + if (obj) { + morayInitStatus = obj.initialization.moray; + } + + if (morayInitStatus && morayInitStatus.dataMigrations && + morayInitStatus.dataMigrations.latestErrors && + morayInitStatus.dataMigrations.latestErrors.vms) { latestVmsMigrationsErr = - obj.dataMigrations.latestErrors.vms; + morayInitStatus.dataMigrations.latestErrors.vms; foundExpectedErrMsg = latestVmsMigrationsErr.indexOf(TRANSIENT_ERROR_MSG) !== -1; t.ok(foundExpectedErrMsg, 'data migrations latest error should include ' + TRANSIENT_ERROR_MSG + ', got: ' + - obj.dataMigrations.latestErrors.vms); + latestVmsMigrationsErr); next(); } else { if (NUM_TRIES >= MAX_NUM_TRIES) { @@ -452,51 +311,25 @@ exports.data_migrations = function (t) { }); }, function removeTransientError(ctx, next) { - ctx.dataMigrationsCtrl.removeAllListeners('done'); - ctx.dataMigrationsCtrl.removeAllListeners('error'); + ctx.morayBucketsInitializer.removeAllListeners('done'); + ctx.morayBucketsInitializer.removeAllListeners('error'); - ctx.moray.putBatch = ctx.originalPutBatch; + ctx.morayClient.batch = ctx.originalBatch; - ctx.dataMigrationsCtrl.once('done', - function onDataMigrationsDone() { + ctx.morayBucketsInitializer.once('done', + function onBucketsInitDone() { t.ok(true, - 'data migration should eventually complete ' + - 'successfully'); + 'Moray buckets init should eventually complete ' + + 'successfully after removing transient error'); next(); }); - ctx.dataMigrationsCtrl.once('error', - function onDataMigrationsError(dataMigrationErr) { - t.ok(false, 'data migrations should not error, got: ', - util.inspect(dataMigrationErr)); - next(dataMigrationErr); - }); - }, - function readTestObjects(ctx, next) { - assert.object(ctx.morayClient, 'ctx.morayClient'); - - findAllObjects(ctx.morayClient, VMS_BUCKET_NAME, '(foo=*)', - function onFindAllObjects(findErr, objects) { - var nonMigratedObjects; - - t.ok(!findErr, - 'reading all objects back should not error, got: ' + - util.inspect(findErr)); - t.ok(objects, - 'reading all objects should not return empty response'); - - if (objects) { - nonMigratedObjects = - objects.filter(function checkObjects(object) { - return object.value.bar !== 'foo'; - }); - t.equal(nonMigratedObjects.length, 0, - 'data migrations should have migrated all objects' + - ', got the following non-migrated objects: ' + - nonMigratedObjects.join(', ')); - } - - next(findErr); + ctx.morayBucketsInitializer.once('error', + function onBucketsINitError(bucketsInitErr) { + t.ok(false, 'Moray buckets init should not error after ' + + 'removing transient error, got: ', + util.inspect(bucketsInitErr)); + next(bucketsInitErr); }); }, function checkDataMigrationsDone(ctx, next) { @@ -505,15 +338,22 @@ exports.data_migrations = function (t) { assert.object(ctx.vmapiClient, 'ctx.vmapiClient'); ctx.vmapiClient.ping(function onVmapiPing(pingErr, obj, req, res) { + var latestCompletedMigrations; + var morayInitStatus; + t.ok(!pingErr, 'ping VMAPI when data migrations suceeded ' + 'should not error, got: ' + pingErr); t.ok(obj, 'pinging VMAPI when data migrations succeeded ' + 'should return a non-empty response'); - if (obj && - obj.dataMigrations && - obj.dataMigrations.latestCompletedMigrations) { - t.equal(obj.dataMigrations.latestCompletedMigrations.vms, + if (obj) { + morayInitStatus = obj.initialization.moray; + } + + if (morayInitStatus && morayInitStatus.dataMigrations) { + latestCompletedMigrations = + morayInitStatus.dataMigrations.completed; + t.equal(latestCompletedMigrations.vms, latestExpectedCompletedVmsMigration, 'latest completed data migration for vms model ' + 'should be at version ' + @@ -526,6 +366,53 @@ exports.data_migrations = function (t) { next(); }); + }, + /* + * Eventually, when all buckets caches are refreshed on all Moray + * instances, and since we know all data migrations succeeded, we should + * be able to search on the internal_metadata field. + */ + function checkInternalMetadataSearchSuccess(ctx, next) { + var expectedErrMsg = 'invalid filter'; + var MAX_NUM_TRIES; + /* + * We wait for the moray bucket cache to be refreshed on all Moray + * instances, which can be up to 5 minutes currently, and then some. + * This is the maximum delay during which InvalidQueryError can + * occur due to stale buckets cache. + */ + var MAX_TRIES_DURATION_IN_MS = 6 * 60 * 1000; + var NUM_TRIES = 0; + var RETRY_DELAY_IN_MS = 10000; + + MAX_NUM_TRIES = MAX_TRIES_DURATION_IN_MS / RETRY_DELAY_IN_MS; + + assert.object(ctx.vmapiClient, 'ctx.vmapiClient'); + + function listVmsWithInternalMetadataFilter() { + ++NUM_TRIES; + + ctx.vmapiClient.listVms({'internal_metadata.foo': 'bar'}, + function onListVms(listVmsErr, vms, req, res) { + if (listVmsErr && NUM_TRIES < MAX_NUM_TRIES && + listVmsErr.body && listVmsErr.body.message && + listVmsErr.body.message.indexOf(expectedErrMsg) !== + -1) { + t.ok(true, 'Got expected transient error, ' + + 'retrying in ' + RETRY_DELAY_IN_MS + 'ms...'); + setTimeout(listVmsWithInternalMetadataFilter, + RETRY_DELAY_IN_MS); + } else { + t.ok(!listVmsErr, + 'searching on internal_metadata when the ' + + 'corresponding data migration has ' + + 'completed should not error'); + next(); + } + }); + } + + listVmsWithInternalMetadataFilter(); } ]}, function allMigrationsDone(allMigrationsErr) { t.ok(!allMigrationsErr, 'data migrations test should not error'); @@ -545,180 +432,3 @@ exports.data_migrations = function (t) { t.done(); }); }; - -exports.data_migrations_non_transient_error = function (t) { - var context = {}; - - vasync.pipeline({arg: context, funcs: [ - function cleanup(ctx, next) { - testMoray.cleanupLeftoverBuckets([ - VMS_BUCKET_NAME, - SERVER_VMS_BUCKET_NAME, - ROLE_TAGS_BUCKET_NAME - ], - function onCleanupLeftoverBuckets(cleanupErr) { - t.ok(!cleanupErr, - 'cleaning up leftover buckets should be successful'); - next(cleanupErr); - }); - }, - function setupMorayBuckets(ctx, next) { - var morayBucketsInitializer; - var morayClient; - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: TEST_BUCKETS_CONFIG_V1, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - var nextOnce = once(next); - - ctx.moray = moraySetup.moray; - ctx.morayBucketsInitializer = morayBucketsInitializer = - moraySetup.morayBucketsInitializer; - ctx.morayClient = morayClient = moraySetup.morayClient; - - function cleanUp() { - morayBucketsInitializer.removeAllListeners('error'); - morayBucketsInitializer.removeAllListeners('done'); - } - - morayBucketsInitializer.on('done', function onMorayBucketsInit() { - t.ok(true, - 'original moray buckets setup should be ' + - 'successful'); - - cleanUp(); - nextOnce(); - }); - - morayBucketsInitializer.on('error', - function onMorayBucketsInitError(morayBucketsInitErr) { - t.ok(!morayBucketsInitErr, - 'original moray buckets initialization should ' + - 'not error'); - - cleanUp(); - nextOnce(morayBucketsInitErr); - }); - }, - function writeTestObjects(ctx, next) { - assert.object(ctx.morayClient, 'ctx.morayClient'); - - testMoray.writeObjects(ctx.morayClient, VMS_BUCKET_NAME, { - foo: 'foo' - }, NUM_TEST_OBJECTS, function onTestObjectsWritten(writeErr) { - ctx.morayClient.close(); - - t.ok(!writeErr, 'writing test objects should not error, got: ' + - util.inspect(writeErr)); - next(writeErr); - }); - }, - function migrateSchemaForDataMigrations(ctx, next) { - var morayBucketsInitializer; - var morayClient; - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: TEST_BUCKETS_CONFIG_V2, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - var nextOnce = once(next); - - ctx.moray = moraySetup.moray; - ctx.morayBucketsInitializer = morayBucketsInitializer = - moraySetup.morayBucketsInitializer; - ctx.morayClient = morayClient = moraySetup.morayClient; - - function cleanUp() { - morayBucketsInitializer.removeAllListeners('error'); - morayBucketsInitializer.removeAllListeners('done'); - } - - morayBucketsInitializer.on('done', function onMorayBucketsInit() { - t.ok(true, 'migration of moray buckets should be successful'); - - cleanUp(); - nextOnce(); - }); - - morayBucketsInitializer.on('error', - function onMorayBucketsInitError(morayBucketsInitErr) { - t.ok(!morayBucketsInitErr, - 'moray buckets migration should not error, got: ' + - morayBucketsInitErr); - - cleanUp(); - nextOnce(morayBucketsInitErr); - }); - }, - function loadDataMigrations(ctx, next) { - var dataMigrationsLoaderLogger = bunyan.createLogger({ - name: 'data-migrations-loader', - level: 'info', - serializers: restify.bunyan.serializers - }); - - dataMigrationsLoader.loadMigrations({ - log: dataMigrationsLoaderLogger, - migrationsRootPath: path.resolve(__dirname, 'fixtures', - 'data-migrations-valid') - }, function onMigrationsLoaded(loadMigrationsErr, migrations) { - ctx.migrations = migrations; - next(loadMigrationsErr); - }); - }, - function injectNonTransientError(ctx, next) { - ctx.originalPutBatch = ctx.moray.putBatch; - ctx.moray.putBatch = - function mockedPutBatch(modelName, records, callback) { - assert.string(modelName, 'modelName'); - assert.arrayOfObject(records, 'records'); - assert.func(callback, 'callback'); - - callback(new VError({ - name: 'BucketNotFoundError' - }, 'non-transient error')); - }; - next(); - }, - function startMigrations(ctx, next) { - assert.object(ctx.migrations, 'ctx.migrations'); - assert.object(ctx.moray, 'ctx.moray'); - - ctx.dataMigrationsCtrl = new DataMigrationsController({ - log: bunyan.createLogger({ - name: 'data-migratons-controller', - level: 'info', - serializers: restify.bunyan.serializers - }), - migrations: ctx.migrations, - moray: ctx.moray - }); - - ctx.dataMigrationsCtrl.start(); - - ctx.dataMigrationsCtrl.once('done', - function onDataMigrationsDone() { - t.ok(false, 'data migration should not complete when ' + - 'non-transient error injected'); - }); - - ctx.dataMigrationsCtrl.once('error', - function onDataMigrationsError(dataMigrationErr) { - t.ok(true, 'data migrations should error when ' + - 'non-transient error injected, got: ' + - dataMigrationErr.toString()); - next(); - }); - } - ]}, function allMigrationsDone(allMigrationsErr) { - t.equal(allMigrationsErr, undefined, - 'data migrations test should not error'); - - if (context.morayClient) { - context.morayClient.close(); - } - - t.done(); - }); -}; diff --git a/test/vms.reindex-moray-bucket-transient-error.test.js b/test/vms.reindex-moray-bucket-transient-error.test.js index e13b6d5a..b064d498 100644 --- a/test/vms.reindex-moray-bucket-transient-error.test.js +++ b/test/vms.reindex-moray-bucket-transient-error.test.js @@ -10,12 +10,10 @@ /* * This test is about making sure that, when a transient error is encountered by - * the moray buckets reindexing process, the process is retried until that error - * is resolved. This test also makes sure that, in the meantime, VMAPI's /ping - * endpoint responds with an "OK" status, but still includes the reindexing - * error in its moray initialization status. Finally, it also makes sure that - * listing VMs succeeds while the reindexing process encounters transient - * errors. + * the moray buckets reindexing process, VMAPI's /ping endpoint responds with an + * "OK" status, but still includes the reindexing error in its moray + * initialization status. Finally, it also makes sure that listing VMs succeeds + * while the reindexing process encounters transient errors. */ var assert = require('assert-plus'); @@ -29,46 +27,15 @@ var vasync = require('vasync'); var changefeedUtils = require('../lib/changefeed'); var common = require('./common'); var morayInit = require('../lib/moray/moray-init'); -var NoopDataMigrationsController = - require('../lib/data-migrations/noop-controller'); var VmapiApp = require('../lib/vmapi'); var TRANSIENT_ERROR_MSG = 'Mocked transient error'; -var VMS_BUCKET_CONFIG = { - name: 'test_vmapi_vms_reindex_transient_error', - schema: { - index: { - uuid: { type: 'string', unique: true} - } - } -}; - -var SERVER_VMS_MORAY_BUCKET_CONFIG = { - name: 'test_vmapi_server_vms_reindex_transient_error', - schema: {} -}; - -var ROLE_TAGS_MORAY_BUCKET_CONFIG = { - name: 'test_vmapi_vm_role_tags_reindex_transient_error', - schema: { - index: { - role_tags: { type: '[string]' } - } - } -}; - -var MORAY_BUCKETS_CONFIG = { - vms: VMS_BUCKET_CONFIG, - server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG, - vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG -}; - exports.moray_init_transient_error = function (t) { + var moray; var morayBucketsInitializer; var morayClient; - var moray; - var origMorayReindexBucket; + var origMorayReindexObjects; var mockedMetricsManager = { update: function () {} @@ -85,18 +52,46 @@ exports.moray_init_transient_error = function (t) { var vmapiClient; vasync.pipeline({funcs: [ - function initMorayStorage(arg, next) { + function initMorayWithTransientError(arg, next) { var moraySetup = morayInit.startMorayInit({ morayConfig: common.config.moray, - morayBucketsConfig: MORAY_BUCKETS_CONFIG, changefeedPublisher: changefeedUtils.createNoopCfPublisher() }); + moray = moraySetup.moray; morayBucketsInitializer = moraySetup.morayBucketsInitializer; morayClient = moraySetup.morayClient; - moray = moraySetup.moray; - origMorayReindexBucket = moray._reindexBucket; + origMorayReindexObjects = morayClient.reindexObjects; + + /* + * Monkey patch the Moray client's "reindexObjects" method to inject + * a transient error, so that we can test that VMAPI API behave + * correctly in that case. + */ + morayClient.reindexObjects = + function _mockedReindexObjects(bucketName, nbObjs, callback) { + assert.string(bucketName, 'bucketName'); + assert.number(nbObjs, 'nbObjs'); + assert.func(callback, 'callback'); + + callback(new Error(TRANSIENT_ERROR_MSG)); + }; + + morayBucketsInitializer.once('done', onMorayBucketsInitDone); + morayBucketsInitializer.once('error', onMorayBucketsInitError); + + function onMorayBucketsInitDone() { + t.ok(false, 'moray buckets init should not complete when ' + + 'transient error injected'); + morayBucketsInitializer.removeAllListeners('error'); + } + + function onMorayBucketsInitError(morayBucketsInitError) { + t.ok(false, 'moray buckets init should not error when ' + + 'transient error injected'); + morayBucketsInitializer.removeAllListeners('done'); + } next(); }, @@ -106,10 +101,9 @@ exports.moray_init_transient_error = function (t) { wfapi: mockedWfapiClient }, changefeedPublisher: changefeedUtils.createNoopCfPublisher(), - dataMigrationsCtrl: new NoopDataMigrationsController(), metricsManager: mockedMetricsManager, - morayBucketsInitializer: morayBucketsInitializer, - moray: moray + moray: moray, + morayBucketsInitializer: morayBucketsInitializer }); next(); @@ -129,34 +123,6 @@ exports.moray_init_transient_error = function (t) { next(); }); }, - function initMorayWithTransientError(arg, next) { - /* - * Monkey patch VMAPI's moray layer "_reindexBucket" method to - * inject a transient error, so that we can test that the moray - * initializer and the VMAPI API behave correctly in that case. - */ - moray._reindexBucket = - function _reindexBucket(bucketName, callback) { - callback(new Error(TRANSIENT_ERROR_MSG)); - }; - - morayBucketsInitializer.once('done', onMorayBucketsInitDone); - morayBucketsInitializer.once('error', onMorayBucketsInitError); - - function onMorayBucketsInitDone() { - t.ok(false, 'moray buckets init should not complete when ' + - 'transient error injected'); - morayBucketsInitializer.removeAllListeners('error'); - } - - function onMorayBucketsInitError(morayBucketsInitError) { - t.ok(false, 'moray buckets init should not error when ' + - 'transient error injected'); - morayBucketsInitializer.removeAllListeners('done'); - } - - next(); - }, function checkMorayStatusWithTransientErr(arg, next) { var nbVmapiStatusCheckSoFar = 0; var MAX_NB_VMAPI_STATUS_CHECKS = 10; @@ -172,15 +138,22 @@ exports.moray_init_transient_error = function (t) { * we expect the "healthiness" to be false. */ var expectedHealthiness = false; - var expectedMorayInitStatus = 'BUCKETS_SETUP_DONE'; var expectedStatus = 'OK'; + var morayInitStatus; + var overallHealthy; + var overallStatus; + + if (obj) { + morayInitStatus = obj.initialization.moray; + overallHealthy = obj.healthy; + overallStatus = obj.status; + } - if (obj && - obj.status === expectedStatus && - obj.healthy === expectedHealthiness && - obj.initialization.moray.status === - expectedMorayInitStatus && - obj.initialization.moray.error === + if (overallStatus === expectedStatus && + overallHealthy === expectedHealthiness && + morayInitStatus.bucketsSetup.state === 'DONE' && + morayInitStatus.bucketsReindex.state === 'ERROR' && + morayInitStatus.bucketsReindex.latestError === expectedErrString) { callback(true); } else { @@ -244,12 +217,12 @@ exports.moray_init_transient_error = function (t) { morayBucketsInitializer.once('error', onMockedMorayBucketsInitFailed); - moray._reindexBucket = origMorayReindexBucket; + morayClient.reindexObjects = origMorayReindexObjects; function onMockedMorayBucketsInitDone() { vmapiClient.ping(function onVmapiPing(pingErr, obj, req, res) { - var actualMorayInitStatus = obj.initialization.moray.status; - var expectedMorayInitStatus = 'BUCKETS_REINDEX_DONE'; + var actualMorayInitStatus = obj.initialization.moray; + var expectedMorayReindexStatus = 'DONE'; var expectedResponseHttpStatus = 200; t.equal(res.statusCode, expectedResponseHttpStatus, @@ -258,11 +231,13 @@ exports.moray_init_transient_error = function (t) { t.equal(pingErr, null, 'ping endpoint should not ' + 'error when no error injected in moray ' + 'initialization'); - t.equal(actualMorayInitStatus, expectedMorayInitStatus, + t.equal(actualMorayInitStatus.bucketsReindex.state, + expectedMorayReindexStatus, 'Moray initialization status should be: ' + - expectedMorayInitStatus + ' and is: ' + - actualMorayInitStatus); - t.equal(obj.initialization.moray.error, undefined, + expectedMorayReindexStatus + ' and is: ' + + actualMorayInitStatus.bucketsReindex.state); + t.equal(actualMorayInitStatus.bucketsReindex.latestError, + undefined, 'Moray initialization status should have no error'); morayBucketsInitializer.removeAllListeners('error'); diff --git a/test/vms.update-moray-bucket-non-transient-error.test.js b/test/vms.update-moray-bucket-non-transient-error.test.js deleted file mode 100644 index 2733c3a6..00000000 --- a/test/vms.update-moray-bucket-non-transient-error.test.js +++ /dev/null @@ -1,207 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2018, Joyent, Inc. - */ - -/* - * This test is about making sure that, when a non transient error is - * encountered while setting up moray buckets, the MorayBucketsInitializer - * instance emits an error event. Not handling that error event would make the - * process exit , which is what we want to happen when running the VMAPI - * service. - */ - -var jsprim = require('jsprim'); -var Logger = require('bunyan'); -var VMAPI = require('sdc-clients').VMAPI; -var path = require('path'); -var restify = require('restify'); -var vasync = require('vasync'); - -var changefeedUtils = require('../lib/changefeed'); -var common = require('./common'); -var morayInit = require('../lib/moray/moray-init'); -var NoopDataMigrationsController = - require('../lib/data-migrations/noop-controller'); -var testMoray = require('./lib/moray'); -var VmapiApp = require('../lib/vmapi'); - -var VMS_BUCKET_CONFIG_WITH_ERROR = { - name: 'test_vmapi_vms_non_transient_error', - schema: { - index: { - uuid: { type: 'string', unique: true}, - owner_uuid: { type: 'string' }, - image_uuid: { type: 'string' }, - billing_id: { type: 'string' }, - server_uuid: { type: 'string' }, - package_name: { type: 'string' }, - package_version: { type: 'string' }, - tags: { type: 'string' }, - brand: { type: 'string' }, - state: { type: 'string' }, - alias: { type: 'string' }, - max_physical_memory: { type: 'number' }, - create_timestamp: { type: 'number' }, - /* - * The typo in "booleaan" is intentional: it is used to trigger what - * we consider to be a non-transient error when setting up VMAPI's - * moray buckets, and test that the moray buckets setup process - * handles this error appropriately, in that case by emitting an - * 'error' event. - */ - docker: { type: 'booleaan' } - }, - options: { - version: 1 - } - } -}; - -var SERVER_VMS_MORAY_BUCKET_CONFIG = { - name: 'test_vmapi_server_vms_non_transient_error', - schema: {} -}; - -var ROLE_TAGS_MORAY_BUCKET_CONFIG = { - name: 'test_vmapi_vm_role_tags_non_transient_error', - schema: { - index: { - role_tags: { type: '[string]' } - } - } -}; - -var morayBucketsConfigWithError = { - vms: VMS_BUCKET_CONFIG_WITH_ERROR, - server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG, - vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG -}; - -exports.moray_init_non_transient_error = function (t) { - var mockedMetricsManager = { - update: function () {} - }; - - var mockedWfapiClient = { - connected: true, - connect: function mockedWfapiConnect(callback) { - callback(); - } - }; - var morayBucketsInitializer; - var morayClient; - var moray; - var vmapiApp; - var vmapiClient; - - vasync.pipeline({funcs: [ - function cleanLeftoverTestBuckets(arg, next) { - testMoray.cleanupLeftoverBuckets([ - morayBucketsConfigWithError.vms.name, - morayBucketsConfigWithError.server_vms.name, - morayBucketsConfigWithError.vm_role_tags.name - ], - function onCleanupLeftoverBuckets(cleanupErr) { - t.ifError(cleanupErr, - 'cleaning up leftover buckets should be successful'); - next(cleanupErr); - }); - }, - function initMorayStorage(arg, next) { - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: morayBucketsConfigWithError, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - - morayBucketsInitializer = moraySetup.morayBucketsInitializer; - morayClient = moraySetup.morayClient; - moray = moraySetup.moray; - - morayBucketsInitializer.on('error', - function onMorayBucketsInitError(morayBucketsInitErr) { - t.ok(morayBucketsInitErr, - 'moray initialization should error'); - next(); - }); - }, - function initVmapi(arg, next) { - vmapiApp = new VmapiApp({ - apiClients: { - wfapi: mockedWfapiClient - }, - changefeedPublisher: changefeedUtils.createNoopCfPublisher(), - dataMigrationsCtrl: new NoopDataMigrationsController(), - metricsManager: mockedMetricsManager, - morayBucketsInitializer: morayBucketsInitializer, - moray: moray - }); - - next(); - }, - function listenOnVmapiServer(arg, next) { - vmapiApp.listen({ - port: 0 - }, next); - } - ]}, function onVmapiServiceReady(initErr) { - var vmapiServerAddress; - var vmapiServerUrl; - - t.ifError(initErr, 'initialization of VMAPI app and its dependencies ' + - 'should be successful'); - - if (initErr) { - t.done(); - return; - } - - vmapiServerAddress = vmapiApp.server.address(); - vmapiServerUrl = 'http://' + vmapiServerAddress.address + - ':' + vmapiServerAddress.port; - - vmapiClient = new VMAPI({ - url: vmapiServerUrl - }); - - vmapiClient.ping(function onVmapiPing(pingErr, obj, req, res) { - var errBody = pingErr.body; - var expectedErrString = 'bucket.index[\'docker\'].type should be ' + - 'equal to one of the allowed values'; - var expectedHealthiness = false; - var expectedResponseHttpStatus = 503; - var expectedStatus = 'some services are not ready'; - var morayInitError; - - console.log('errBody:', errBody); - - t.equal(res.statusCode, expectedResponseHttpStatus, - 'Response\'s HTTP status code must be ' + - expectedResponseHttpStatus); - t.equal(errBody.status, - expectedStatus, 'status property of the error ' + - 'message should be equal to "' + expectedStatus + - '"'); - t.equal(errBody.healthy, expectedHealthiness, - 'healthy property of the error message should be "' + - expectedHealthiness + '"'); - - morayInitError = errBody.initialization.moray.error; - t.ok(morayInitError.indexOf(expectedErrString) !== -1, - 'Error string for moray initialization error should ' + - 'contain: "' + expectedErrString + '", but is: ' + - morayInitError); - - vmapiClient.close(); - vmapiApp.close(); - morayClient.close(); - t.done(); - }); - }); -}; diff --git a/test/vms.update-moray-bucket-removes-index-fails.test.js b/test/vms.update-moray-bucket-removes-index-fails.test.js deleted file mode 100644 index 5151654e..00000000 --- a/test/vms.update-moray-bucket-removes-index-fails.test.js +++ /dev/null @@ -1,213 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2018, Joyent, Inc. - */ - -/* - * This test is about making sure that, when a moray bucket is changed in a way - * that an index is removed, which is a backward incompatible change, the - * MorayBucketsInitializer instance emits an error event. - */ - -var jsprim = require('jsprim'); -var Logger = require('bunyan'); -var VMAPI = require('sdc-clients').VMAPI; -var path = require('path'); -var restify = require('restify'); -var vasync = require('vasync'); - -var changefeedUtils = require('../lib/changefeed'); -var common = require('./common'); -var morayInit = require('../lib/moray/moray-init'); -var NoopDataMigrationsController = - require('../lib/data-migrations/noop-controller'); -var testMoray = require('./lib/moray'); -var VmapiApp = require('../lib/vmapi'); - -var VMS_BUCKET_CONFIG_V0 = { - name: 'test_vmapi_vms_invalid_index_removal', - schema: { - index: { - uuid: { type: 'string', unique: true}, - some_index: { type: 'string' } - } - } -}; - -var VMS_BUCKET_CONFIG_V1 = { - name: 'test_vmapi_vms_invalid_index_removal', - schema: { - index: { - uuid: { type: 'string', unique: true} - }, - options: { - version: 1 - } - } -}; - -var SERVER_VMS_MORAY_BUCKET_CONFIG = { - name: 'test_vmapi_server_vms_invalid_index_removal', - schema: {} -}; - -var ROLE_TAGS_MORAY_BUCKET_CONFIG = { - name: 'test_vmapi_vm_role_tags_invalid_index_removal', - schema: { - index: { - role_tags: { type: '[string]' } - } - } -}; - -var morayBucketsConfigV0 = { - vms: VMS_BUCKET_CONFIG_V0, - server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG, - vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG -}; - -var morayBucketsConfigV1 = { - vms: VMS_BUCKET_CONFIG_V1, - server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG, - vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG -}; - -var morayBucketsInitializer; - -exports.moray_init_invalid_index_removal = function (t) { - var morayClient; - var moray; - var vmapiApp; - - var mockedMetricsManager = { - update: function () {} - }; - - var mockedWfapiClient = { - connected: true, - connect: function mockedWfapiConnect(callback) { - callback(); - } - }; - - var vmapiClient; - - vasync.pipeline({funcs: [ - function cleanLeftoverTestBuckets(arg, next) { - testMoray.cleanupLeftoverBuckets([ - morayBucketsConfigV0.vms.name, - morayBucketsConfigV0.server_vms.name, - morayBucketsConfigV0.vm_role_tags.name - ], - function onCleanupLeftoverBuckets(cleanupErr) { - t.ifError(cleanupErr, - 'cleaning up leftover buckets should be successful'); - next(cleanupErr); - }); - }, - function setupMorayWithBucketsFirstVersion(arg, next) { - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: morayBucketsConfigV0, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - - morayBucketsInitializer = moraySetup.morayBucketsInitializer; - morayClient = moraySetup.morayClient; - moray = moraySetup.moray; - - morayBucketsInitializer.on('done', - function onMorayBucketsInit() { - t.ok(true, - 'moray buckets initialization with correct ' + - 'configuration should be successfull'); - - morayClient.close(); - - next(); - }); - }, - function setupMorayWithIncorrectBucketsConfig(arg, next) { - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: morayBucketsConfigV1, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - - morayBucketsInitializer = moraySetup.morayBucketsInitializer; - morayClient = moraySetup.morayClient; - moray = moraySetup.moray; - - morayBucketsInitializer.on('error', - function onMorayBucketsInit() { - t.ok(true, - 'moray buckets initialization with incorrect ' + - 'configuration should error'); - - next(); - }); - }, - function initVmapi(arg, next) { - - vmapiApp = new VmapiApp({ - apiClients: { - wfapi: mockedWfapiClient - }, - changefeedPublisher: changefeedUtils.createNoopCfPublisher(), - dataMigrationsCtrl: new NoopDataMigrationsController(), - metricsManager: mockedMetricsManager, - morayBucketsInitializer: morayBucketsInitializer, - moray: moray - }); - - next(); - }, - function listenOnVmapiServer(arg, next) { - vmapiApp.listen({ - port: 0 - }, next); - } - ]}, function onVmapiServiceReady(initErr) { - var vmapiServerAddress = vmapiApp.server.address(); - var vmapiServerUrl = 'http://' + vmapiServerAddress.address + - ':' + vmapiServerAddress.port; - - vmapiClient = new VMAPI({ - url: vmapiServerUrl - }); - - vmapiClient.ping(function onVmapiPing(pingErr, obj, req, res) { - var errBody = pingErr.body; - var expectedErrString = - 'InvalidIndexesRemovalError: Invalid removal of ' + - 'indexes: some_index'; - var expectedHealthiness = false; - var expectedResponseHttpStatus = 503; - var expectedStatus = 'some services are not ready'; - - t.equal(res.statusCode, expectedResponseHttpStatus, - 'Response\'s HTTP status code must be ' + - expectedResponseHttpStatus); - t.equal(errBody.status, - expectedStatus, 'status property of the error ' + - 'message should be equal to "' + expectedStatus + - '"'); - t.equal(errBody.healthy, expectedHealthiness, - 'healthy property of the error message should be "' + - expectedHealthiness + '"'); - t.equal(errBody.initialization.moray.error, expectedErrString, - 'Error string for moray initialization error should ' + - 'be: "' + expectedErrString + '"'); - - vmapiClient.close(); - vmapiApp.close(); - morayClient.close(); - t.done(); - }); - }); -}; diff --git a/test/vms.update-moray-bucket-transient-error.test.js b/test/vms.update-moray-bucket-transient-error.test.js index a72c70a9..83a8ba93 100644 --- a/test/vms.update-moray-bucket-transient-error.test.js +++ b/test/vms.update-moray-bucket-transient-error.test.js @@ -10,8 +10,7 @@ /* * This test is about making sure that, when a transient error is encountered by - * the moray buckets setup process, the process is retried until that error is - * resolved and that, in the meantime, VMAPI's /ping endpoint responds with the + * the moray buckets setup process, VMAPI's /ping endpoint responds with the * proper status error. */ @@ -21,13 +20,12 @@ var Logger = require('bunyan'); var VMAPI = require('sdc-clients').VMAPI; var path = require('path'); var restify = require('restify'); +var util = require('util'); var vasync = require('vasync'); var changefeedUtils = require('../lib/changefeed'); var common = require('./common'); var morayInit = require('../lib/moray/moray-init'); -var NoopDataMigrationsController = - require('../lib/data-migrations/noop-controller'); var VmapiApp = require('../lib/vmapi'); var TRANSIENT_ERROR_MSG = 'Mocked transient error'; @@ -53,7 +51,7 @@ exports.moray_init_transient_error = function (t) { var vmapiClient; vasync.pipeline({funcs: [ - function initMorayStorage(arg, next) { + function initMorayStorageWithTransientError(arg, next) { var moraySetup = morayInit.startMorayInit({ morayConfig: common.config.moray, changefeedPublisher: changefeedUtils.createNoopCfPublisher() @@ -65,6 +63,31 @@ exports.moray_init_transient_error = function (t) { moray = moraySetup.moray; + /* + * Monkey patch moray client's getBucket method to inject a + * transient error, so that we can test that the moray initializer + * and the VMAPI API behave correctly in that case. + */ + morayClient.getBucket = + function mockedGetBucket(bucketName, callback) { + callback(new Error(TRANSIENT_ERROR_MSG)); + }; + + morayBucketsInitializer.once('done', onMorayBucketsInitDone); + morayBucketsInitializer.once('error', onMorayBucketsInitError); + + function onMorayBucketsInitDone() { + t.ok(false, 'moray buckets init should not complete when ' + + 'transient error injected'); + morayBucketsInitializer.removeAllListeners('error'); + } + + function onMorayBucketsInitError(morayBucketsInitError) { + t.ok(false, 'moray buckets init should not error when ' + + 'transient error injected'); + morayBucketsInitializer.removeAllListeners('done'); + } + next(); }, function initVmapi(arg, next) { @@ -73,7 +96,6 @@ exports.moray_init_transient_error = function (t) { wfapi: mockedWfapiClient }, changefeedPublisher: changefeedUtils.createNoopCfPublisher(), - dataMigrationsCtrl: new NoopDataMigrationsController(), metricsManager: mockedMetricsManager, morayBucketsInitializer: morayBucketsInitializer, moray: moray @@ -96,34 +118,6 @@ exports.moray_init_transient_error = function (t) { next(); }); }, - function initMorayWithTransientError(arg, next) { - /* - * Monkey patch moray client's getBucket method to inject a - * transient error, so that we can test that the moray initializer - * and the VMAPI API behave correctly in that case. - */ - morayClient.getBucket = - function mockedGetBucket(bucketName, callback) { - callback(new Error(TRANSIENT_ERROR_MSG)); - }; - - morayBucketsInitializer.once('done', onMorayBucketsInitDone); - morayBucketsInitializer.once('error', onMorayBucketsInitError); - - function onMorayBucketsInitDone() { - t.ok(false, 'moray buckets init should not complete when ' + - 'transient error injected'); - morayBucketsInitializer.removeAllListeners('error'); - } - - function onMorayBucketsInitError(morayBucketsInitError) { - t.ok(false, 'moray buckets init should not error when ' + - 'transient error injected'); - morayBucketsInitializer.removeAllListeners('done'); - } - - next(); - }, function checkMorayStatusWithTransientErr(arg, next) { var nbVmapiStatusCheckSoFar = 0; var MAX_NB_VMAPI_STATUS_CHECKS = 10; @@ -135,18 +129,22 @@ exports.moray_init_transient_error = function (t) { var expectedErrString = 'Error: ' + TRANSIENT_ERROR_MSG; var expectedHealthiness = false; var expectedStatus = 'some services are not ready'; - - console.log('pingErr:', pingErr); - - if (errBody && - errBody.status === expectedStatus && - errBody.healthy === expectedHealthiness && - errBody.initialization.moray.error === - expectedErrString) { - callback(true); - } else { - callback(false); + var morayInitStatus; + + if (errBody && errBody.initialization && + errBody.initialization.moray) { + morayInitStatus = errBody.initialization.moray; + + if (errBody.status === expectedStatus && + errBody.healthy === expectedHealthiness && + morayInitStatus.bucketsSetup.latestError === + expectedErrString) { + callback(true); + return; + } } + + callback(false); }); } @@ -217,6 +215,9 @@ exports.moray_init_transient_error = function (t) { function onMockedMorayBucketsSetup() { vmapiClient.ping(function onVmapiPing(pingErr, obj, req, res) { + console.log('pingErr 2:', pingErr); + console.log('obj 2:', obj); + var expectedResponseHttpStatus = 200; t.equal(res.statusCode, expectedResponseHttpStatus, diff --git a/test/vms.update-moray-bucket-versioning.test.js b/test/vms.update-moray-bucket-versioning.test.js deleted file mode 100644 index 274107a0..00000000 --- a/test/vms.update-moray-bucket-versioning.test.js +++ /dev/null @@ -1,520 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -/* - * Copyright (c) 2018, Joyent, Inc. - */ - -var assert = require('assert-plus'); -var jsprim = require('jsprim'); -var libuuid = require('libuuid'); -var once = require('once'); -var path = require('path'); -var restify = require('restify'); -var util = require('util'); -var vasync = require('vasync'); -var VMAPI = require('sdc-clients').VMAPI; - -var changefeedUtils = require('../lib/changefeed'); -var common = require('./common'); -var morayInit = require('../lib/moray/moray-init'); -var NoopDataMigrationsController = - require('../lib/data-migrations/noop-controller'); -var testMoray = require('./lib/moray.js'); -var VmapiApp = require('../lib/vmapi'); - -var MOCKED_METRICS_MANAGER = { - update: function () {} -}; - -var MOCKED_WFAPI_CLIENT = { - connected: true, - connect: function mockedWfapiConnect(callback) { - callback(); - } -}; - -var VMS_BUCKET_NAME = 'test_vmapi_vms_versioning'; -var SERVER_VMS_BUCKET_NAME = 'test_vmapi_server_vms_versioning'; -var ROLE_TAGS_BUCKET_NAME = 'test_vmapi_vm_role_tags_versioning'; - -/* - * Initial buckets configuration, version 0. - */ -var VMS_BUCKET_CONFIG_V0 = { - name: VMS_BUCKET_NAME, - schema: { - index: { - uuid: { type: 'string', unique: true } - } - } -}; - -var SERVER_VMS_MORAY_BUCKET_CONFIG_V0 = { - name: SERVER_VMS_BUCKET_NAME, - schema: {} -}; - -var ROLE_TAGS_MORAY_BUCKET_CONFIG_V0 = { - name: ROLE_TAGS_BUCKET_NAME, - schema: { - } -}; - -/* - * Buckets configuration at version 1: an index is added on the property named - * "indexed_property". The upgrade from version 0 to version 1 is valid. - */ -var VMS_BUCKET_CONFIG_V1 = { - name: VMS_BUCKET_NAME, - schema: { - index: { - uuid: { type: 'string', unique: true }, - indexed_property: { type: 'string' } - }, - options: { - version: 1 - } - } -}; - -var SERVER_VMS_MORAY_BUCKET_CONFIG_V1 = { - name: SERVER_VMS_BUCKET_NAME, - schema: {} -}; - -var ROLE_TAGS_MORAY_BUCKET_CONFIG_V1 = { - name: ROLE_TAGS_BUCKET_NAME, - schema: { - index: { - role_tags: { type: '[string]' } - } - } -}; - -/* - * Buckets configuration at version 2: an index is added on the property named - * "another_indexed_property". The upgrade from version 1 to version 2 is valid. - */ -var VMS_BUCKET_CONFIG_V2 = { - name: VMS_BUCKET_NAME, - schema: { - index: { - uuid: { type: 'string', unique: true }, - indexed_property: { type: 'string' }, - another_indexed_property: { type: 'string' } - }, - options: { - version: 2 - } - } -}; - -var SERVER_VMS_MORAY_BUCKET_CONFIG_V2 = { - name: SERVER_VMS_BUCKET_NAME, - schema: {} -}; - -var ROLE_TAGS_MORAY_BUCKET_CONFIG_V2 = { - name: ROLE_TAGS_BUCKET_NAME, - schema: { - index: { - role_tags: { type: '[string]' } - } - } -}; - -var testBucketsConfigV0 = { - vms: VMS_BUCKET_CONFIG_V0, - server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG_V0, - vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG_V0 -}; - -var testBucketsConfigV1 = { - vms: VMS_BUCKET_CONFIG_V1, - server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG_V1, - vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG_V1 -}; - -var testBucketsConfigV2 = { - vms: VMS_BUCKET_CONFIG_V2, - server_vms: SERVER_VMS_MORAY_BUCKET_CONFIG_V2, - vm_role_tags: ROLE_TAGS_MORAY_BUCKET_CONFIG_V2 -}; - -var NB_TEST_OBJECTS = 200; - -function getAllObjects(morayClient, bucketName, callback) { - assert.object(morayClient, 'morayClient'); - assert.string(bucketName, 'bucketName'); - assert.func(callback, 'callback'); - - var callbackOnce = once(callback); - var allRecords = []; - - var findAllObjectsReq = morayClient.sql('select _rver from ' + - VMS_BUCKET_NAME); - - findAllObjectsReq.once('error', function onSqlError(sqlErr) { - cleanup(); - callbackOnce(sqlErr); - }); - - findAllObjectsReq.on('record', function onRecord(record) { - allRecords.push(record); - }); - - findAllObjectsReq.once('end', function onGotAllRecords() { - cleanup(); - callbackOnce(null, allRecords); - }); - - function cleanup() { - findAllObjectsReq.removeAllListeners('error'); - findAllObjectsReq.removeAllListeners('record'); - findAllObjectsReq.removeAllListeners('end'); - } -} - -function testMigrationToBucketsConfig(bucketsConfig, options, t, callback) { - assert.object(bucketsConfig, 'bucketsConfig'); - assert.object(options, 'options'); - assert.arrayOfObject(options.expectedResults, 'options.expectedResults'); - assert.object(t, 't'); - assert.func(callback, 'callback'); - - var morayBucketsInitializer; - var morayClient; - var storage; - - var vmapiApp; - - vasync.pipeline({funcs: [ - function initMorayStorage(arg, next) { - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: bucketsConfig, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - - morayBucketsInitializer = moraySetup.morayBucketsInitializer; - morayClient = moraySetup.morayClient; - storage = moraySetup.moray; - - morayBucketsInitializer.on('done', - function onMorayBucketsInit() { - t.ok(true, - 'moray initialization should be successfull'); - next(); - }); - }, - /* - * After a moray bucket is migrated to a version that adds a new index, - * it is important to make sure that it's safe to use for both read and - * write operations. For instance, search filters will not work as - * expected when a bucket is being reindexed and putobject operations - * will also not use the updated bucket schema if they write to a row - * that hasn't been reindexed yet, leading to data corruption. - * - * To check that a bucket has been properly reindexed after an update, - * we need to check that: - * - * 1. The migrated bucket is at the expected version. - * - * 2. The 'reindex_active' column of the row representing the migrated - * bucket in the 'buckets_config'' table has a value representing an - * empty object. - * - * 3. All rows in the table storing the migrated bucket's data' have the - * expected version number. - */ - function checkBucketsAtExpectedVersion(arg, next) { - var expectedResults = options.expectedResults; - - vasync.forEachPipeline({ - func: function checkBucketVersion(expectedResult, done) { - assert.object(expectedResult, 'expectedResult'); - - var bucketName = expectedResult.bucketName; - assert.string(bucketName, 'bucketName'); - - var expectedVersion = expectedResult.version; - assert.number(expectedVersion, 'expectedVersion'); - - morayClient.getBucket(bucketName, - function onGetBucket(getBucketErr, bucket) { - t.equal(bucket.options.version, expectedVersion, - 'Bucket with name ' + bucketName + - ' should be at version ' + expectedVersion); - - done(); - }); - }, - inputs: expectedResults - }, next); - }, - function checkObjectsAtExpectedVersion(arg, next) { - var expectedResults = options.expectedResults; - - vasync.forEachPipeline({ - func: function checkObjectsVersion(expectedResult, done) { - assert.object(expectedResult, 'expectedResult'); - - var bucketName = expectedResult.bucketName; - assert.string(bucketName, 'bucketName'); - - var expectedVersion = expectedResult.version; - assert.number(expectedVersion, 'expectedVersion'); - - getAllObjects(morayClient, bucketName, - function onGetAllObjects(versionCheckErr, allRecords) { - var allRecordsAtExpectedVersion = false; - - t.strictEqual(allRecords.length, NB_TEST_OBJECTS, - NB_TEST_OBJECTS + ' records must have been ' + - 'checked'); - - allRecordsAtExpectedVersion = - allRecords.every(function checkVersion(record) { - assert.object(record, 'record'); - - return record._rver === expectedVersion; - }); - - t.ok(allRecordsAtExpectedVersion, - 'all records should be at version ' + - expectedVersion.version); - - done(); - }); - }, - inputs: expectedResults - }, function allVersionsChecked(err) { - next(err); - }); - }, - function checkNoBucketHasReindexingActive(arg, next) { - var expectedResults = options.expectedResults; - - vasync.forEachPipeline({ - func: function checkNoReindexingActive(expectedResult, done) { - var bucketName = expectedResult.bucketName; - assert.string(bucketName, 'bucketName'); - - morayClient.getBucket(bucketName, - function onGetVmBucket(getBucketErr, bucket) { - var reindexActive = - bucket.reindex_active !== undefined && - Object.keys(bucket.reindex_active) > 0; - - t.ok(!getBucketErr, 'Getting bucket ' + bucketName + - ' should not error'); - t.ok(!reindexActive, 'bucket ' + bucketName + - ' should not be reindexing'); - - done(); - }); - }, - inputs: expectedResults - }, next); - }, - function initVmapi(arg, next) { - vmapiApp = new VmapiApp({ - apiClients: { - wfapi: MOCKED_WFAPI_CLIENT - }, - changefeedPublisher: changefeedUtils.createNoopCfPublisher(), - dataMigrationsCtrl: new NoopDataMigrationsController(), - metricsManager: MOCKED_METRICS_MANAGER, - morayBucketsInitializer: morayBucketsInitializer, - moray: storage - }); - - next(); - }, - function listenOnVmapiServer(arg, next) { - vmapiApp.listen({ - port: 0 - }, next); - }, - function testPingEndpoint(arg, next) { - var vmapiClient; - - var vmapiServerAddress = vmapiApp.server.address(); - var vmapiServerUrl = 'http://' + vmapiServerAddress.address + - ':' + vmapiServerAddress.port; - - vmapiClient = new VMAPI({ - url: vmapiServerUrl - }); - - vmapiClient.ping(function onVmapiPing(pingErr, obj) { - var expectedErrValue = null; - var expectedHealthiness = true; - var expectedMorayInitStatus = 'BUCKETS_REINDEX_DONE'; - var expectedStatus = 'OK'; - - t.equal(pingErr, undefined, 'ping endpoint should not error'); - t.equal(obj.status, - expectedStatus, 'status property of the response ' + - 'message should be equal to "' + - expectedStatus + '"'); - t.equal(obj.healthy, expectedHealthiness, - 'healthy property of the response message should ' + - ' be"' + expectedHealthiness + '"'); - t.equal(obj.initialization.moray.error, expectedErrValue, - 'Error string for moray initialization error ' + - 'should be: "' + expectedErrValue + '"'); - t.equal(obj.initialization.moray.status, - expectedMorayInitStatus, - 'Error string for moray initialization error ' + - 'should be: "' + expectedErrValue + '"'); - - vmapiClient.close(); - - next(); - }); - } - ]}, function allMigrationTestsDone(migrationTestsErr) { - t.equal(migrationTestsErr, undefined, - 'migration test should not error'); - - if (vmapiApp) { - vmapiApp.close(); - } - - morayClient.close(); - - callback(); - }); -} - -exports.moray_init_bucket_versioning = function (t) { - vasync.pipeline({funcs: [ - function cleanup(arg, next) { - testMoray.cleanupLeftoverBuckets([ - VMS_BUCKET_NAME, - SERVER_VMS_BUCKET_NAME, - ROLE_TAGS_BUCKET_NAME - ], - function onCleanupLeftoverBuckets(cleanupErr) { - t.ok(!cleanupErr, - 'cleaning up leftover buckets should be successful'); - next(cleanupErr); - }); - }, - function setupOriginalMorayBuckets(arg, next) { - var morayBucketsInitializer; - var morayClient; - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: testBucketsConfigV0, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - var nextOnce = once(next); - - morayBucketsInitializer = moraySetup.morayBucketsInitializer; - morayClient = moraySetup.morayClient; - - function cleanUp() { - morayBucketsInitializer.removeAllListeners('error'); - morayBucketsInitializer.removeAllListeners('done'); - morayClient.close(); - } - - morayBucketsInitializer.on('done', - function onMorayBucketsInit() { - t.ok(true, - 'original moray buckets setup should be ' + - 'successful'); - - cleanUp(); - nextOnce(); - }); - - morayBucketsInitializer.on('error', - function onMorayBucketsInitError(morayBucketsInitErr) { - t.ok(!morayBucketsInitErr, - 'original moray buckets initialization should ' + - 'not error'); - - cleanUp(); - nextOnce(morayBucketsInitErr); - }); - }, - function writeTestObjects(arg, next) { - var morayBucketsInitializer; - var morayClient; - var moraySetup = morayInit.startMorayInit({ - morayConfig: common.config.moray, - morayBucketsConfig: testBucketsConfigV0, - changefeedPublisher: changefeedUtils.createNoopCfPublisher() - }); - - morayBucketsInitializer = moraySetup.morayBucketsInitializer; - morayClient = moraySetup.morayClient; - - morayBucketsInitializer.on('done', - function onMorayBucketsInitialized() { - testMoray.writeObjects(morayClient, VMS_BUCKET_NAME, { - indexed_property: 'foo' - }, NB_TEST_OBJECTS, function onTestObjectsWritten(err) { - t.ok(!err, 'writing test objects should not error'); - morayClient.close(); - next(err); - }); - }); - }, - /* - * First, migrate from version 0 to 1, which is a valid migration and - * results in the bucket storing VM objects to be at version 1. - */ - function migrateFromV0ToV1(arg, next) { - testMigrationToBucketsConfig(testBucketsConfigV1, { - expectedResults: [ - { - bucketName: VMS_BUCKET_NAME, - version: 1 - } - ] - }, t, next); - }, - /* - * Then, attempt to migrate from version 1 to 0 (a downgrade), which is - * a valid migration but results in the bucket storing VM objects to - * stay at version 1. - */ - function migrateFromV1ToV0(arg, next) { - testMigrationToBucketsConfig(testBucketsConfigV0, { - expectedResults: [ - { - bucketName: VMS_BUCKET_NAME, - version: 1 - } - ] - }, t, next); - }, - /* - * Finally, migrate from version 1 to 2, which is a valid migration and - * results in the bucket storing VM objects to be at version 2. - */ - function migrateFromV1ToV2(arg, next) { - testMigrationToBucketsConfig(testBucketsConfigV2, { - expectedResults: [ - { - bucketName: VMS_BUCKET_NAME, - version: 2 - } - ] - }, t, next); - } - ]}, function allMigrationsDone(allMigrationsErr) { - t.equal(allMigrationsErr, undefined, - 'versioning test should not error'); - - t.done(); - }); -}; diff --git a/tools/add-test-vms.js b/tools/add-test-vms.js index 991ac8b4..ba981af3 100755 --- a/tools/add-test-vms.js +++ b/tools/add-test-vms.js @@ -101,7 +101,7 @@ function addTestVms(nbVms, concurrency, data) { moray = moraySetup.moray; morayBucketsInitializer = moraySetup.morayBucketsInitializer; - morayBucketsInitializer.on('done', + morayBucketsInitializer.on('buckets-setup-done', function onMorayBucketsSetup() { log.debug('Number of test VMs to create:', nbVms); assert.number(nbVms); diff --git a/tools/fix-no-owner.js b/tools/fix-no-owner.js index 17acfaeb..75d2dd42 100644 --- a/tools/fix-no-owner.js +++ b/tools/fix-no-owner.js @@ -75,7 +75,7 @@ vasync.pipeline({funcs: [ next(morayBucketsInitErr); }); - morayBucketsInitializer.on('done', next); + morayBucketsInitializer.on('buckets-setup-done', next); }, function initWfApi(ctx, next) { wfapi = new WFAPI(config.wfapi); diff --git a/tools/kvm-backfill.js b/tools/kvm-backfill.js index 6974f1eb..f443a0d5 100644 --- a/tools/kvm-backfill.js +++ b/tools/kvm-backfill.js @@ -75,7 +75,7 @@ vasync.pipeline({funcs: [ next(morayBucketsSetupErr); }); - morayBucketsInitializer.on('done', + morayBucketsInitializer.on('buckets-setup-done', function onMorayBucketsInitDone() { next(); });