Skip to content

Commit

Permalink
Move nuclide-analytics to modules (3/6)
Browse files Browse the repository at this point in the history
Summary: Move ScribeProcess to modules

Reviewed By: ebluestein

Differential Revision: D10181552

fbshipit-source-id: af4f436258e43f2076cce1b343f0b5a1b4380186
  • Loading branch information
velocityboy authored and pelmers committed Nov 15, 2018
1 parent 6fa126f commit 37bfd88
Show file tree
Hide file tree
Showing 2 changed files with 323 additions and 0 deletions.
202 changes: 202 additions & 0 deletions modules/nuclide-commons/ScribeProcess.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/**
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
* @flow strict-local
* @format
*/

import {getLogger} from 'log4js';
import performanceNow from './performanceNow';
import os from 'os';
import {spawn} from './process';
import which from './which';
import once from './once';
import passesGK from './passesGK';

// Default upper bound, in milliseconds, on how long join() waits for the
// child process to exit before giving up.
const DEFAULT_JOIN_TIMEOUT = 5 * 1000;

// Executable that gets spawned for every Scribe category. Declared with `let`
// so that tests can substitute a mock command.
let SCRIBE_CAT_COMMAND = 'scribe_cat';

// The Mac OS flavour of `scribe_cat` differs from the server-side one in that
// it only dumps its logs on exit. Joining the process periodically keeps log
// delivery timely there; on every other platform no interval is used.
const DEFAULT_JOIN_INTERVAL = process.platform !== 'darwin' ? null : 60 * 1000;

// Spawn-duration threshold in milliseconds. A spawn slower than this disables
// ScribeProcess, because Node sometimes runs into strange issues where
// spawning starts to block: https://github.com/nodejs/node/issues/14917
const SPAWN_TOO_LONG_MS = 2 * 1000;

/**
* A wrapper around the `scribe_cat` command
* (https://github.com/facebookarchive/scribe/blob/master/examples/scribe_cat).
* Call `new ScribeProcess($scribeCategoryName)` to create a process, then call
* `scribeProcess.write($message)` to append a string message to that scribe category.
* It will also recover from `scribe_cat` failure automatically.
*/
export default class ScribeProcess {
static _enabled: boolean = true;

_scribeCategory: string;
_childPromise: ?Promise<child_process$ChildProcess>;
_subscription: ?rxjs$ISubscription;
_joinTimer: ?TimeoutID;
_joinInterval: ?number;

constructor(
scribeCategory: string,
joinInterval: ?number = DEFAULT_JOIN_INTERVAL,
) {
this._scribeCategory = scribeCategory;
this._joinInterval = joinInterval;
this._getChildProcess();
}

/**
* Check if `scribe_cat` exists in PATH.
*/
static isScribeCatOnPath: () => Promise<boolean> = once(async () => {
const [whichCmd, gkEnabled] = await Promise.all([
which(SCRIBE_CAT_COMMAND),
process.platform === 'darwin'
? passesGK('nuclide_scribe_macos')
: Promise.resolve(true),
]);
return whichCmd != null && gkEnabled;
});

static isEnabled(): boolean {
return ScribeProcess._enabled;
}

/**
* Write a string to a Scribe category.
* Ensure newlines are properly escaped.
* Returns false if something is wrong with the Scribe process (use a fallback instead.)
*/
async write(message: string): Promise<boolean> {
if (!ScribeProcess._enabled) {
return false;
}
let child;
try {
child = await this._getChildProcess();
} catch (err) {
ScribeProcess._enabled = false;
// Note: Logging errors is potentially recursive, since they go through Scribe!
// It's important that we set _enabled before logging errors in this file.
getLogger('ScribeProcess').error(
'Disabling ScribeProcess due to spawn error:',
err,
);
return false;
}
await new Promise(resolve => {
child.stdin.write(`${message}${os.EOL}`, resolve);
});
return true;
}

/**
* Waits for the remaining messages to be written, then closes the write stream. Resolves once the
* process has exited. This method is called when the server shuts down in order to guarantee we
* capture logging during shutdown.
*/
async join(timeout: number = DEFAULT_JOIN_TIMEOUT): Promise<void> {
const {_childPromise, _subscription} = this;
if (_childPromise == null || _subscription == null) {
return;
}

// join() renders the existing process unusable.
// The next call to write() should create a new process, so clear out the references.
// Note that we stored them in local variables already above.
this._clear();

const child = await _childPromise;
const {stdin} = child;
const waitForExit = new Promise(resolve => {
child.on('exit', () => {
resolve();
});
setTimeout(() => {
_subscription.unsubscribe();
resolve();
}, timeout);
});
// Make sure stdin has drained before ending it.
if (!stdin.write(os.EOL)) {
stdin.once('drain', () => stdin.end());
} else {
stdin.end();
}
return waitForExit;
}

_getChildProcess(): Promise<child_process$ChildProcess> {
if (this._childPromise) {
return this._childPromise;
}

// Obtain a promise to get the child process, but don't start it yet.
// this._subscription will have control over starting / stopping the process.
const startTime = performanceNow();
const processStream = spawn(SCRIBE_CAT_COMMAND, [this._scribeCategory], {
dontLogInNuclide: true,
})
.do(child => {
const duration = performanceNow() - startTime;
if (duration > SPAWN_TOO_LONG_MS) {
ScribeProcess._enabled = false;
getLogger('ScribeProcess').error(
`Disabling ScribeProcess because spawn took too long (${duration}ms)`,
);
// Don't raise any errors and allow the current write to complete.
// However, the next write will fail due to the _enabled check.
this.join();
}
child.stdin.setDefaultEncoding('utf8');
})
.finally(() => {
// We may have already started a new process in the meantime.
if (this._childPromise === childPromise) {
this._clear();
}
})
.publish();

const childPromise = (this._childPromise = processStream
.first()
.toPromise());
this._subscription = processStream.connect();

if (this._joinInterval != null) {
this._joinTimer = setTimeout(() => {
this._joinTimer = null;
this.join();
}, this._joinInterval);
}

return childPromise;
}

_clear() {
this._childPromise = null;
this._subscription = null;
if (this._joinTimer != null) {
clearTimeout(this._joinTimer);
this._joinTimer = null;
}
}
}

export const __test__ = {
setScribeCatCommand(newCommand: string): string {
const originalCommand = SCRIBE_CAT_COMMAND;
SCRIBE_CAT_COMMAND = newCommand;
return originalCommand;
},
};
121 changes: 121 additions & 0 deletions modules/nuclide-commons/__tests__/ScribeProcess-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
* @flow
* @format
* @emails oncall+nuclide
*/
import fs from 'fs';
import fsPromise from 'nuclide-commons/fsPromise';
import nuclideUri from 'nuclide-commons/nuclideUri';
import ScribeProcess, {__test__} from '../ScribeProcess';
import waitsFor from '../../../jest/waits_for';

// Skipped: ScribeProcess does not pass the ENV vars properly to the spawned
// process, and scribe_cat_mock fails on os.environ['SCRIBE_MOCK_PATH'].
describe.skip('scribe_cat test suites', () => {
// Temporary directory the mock writes category files into (set in beforeEach).
let tempDir = '';
// Command that was active before the mock was installed (restored in afterEach).
let originalCommand = '';

// Reads the file named after `category` inside tempDir and returns its
// non-empty lines.
function getContentOfScribeCategory(category: string): Array<string> {
const categoryFilePath = nuclideUri.join(tempDir, category);
const content = fs.readFileSync(categoryFilePath, 'utf8');
const result = content.split('\n').filter(item => item.length > 0);
return result;
}

beforeEach(async () => {
// Simulated scribe_cat script which saves data into:
// ${process.env['SCRIBE_MOCK_PATH'] + category_name}
// It terminates once we cut off the stdin stream.
const scribeCatMockCommandPath = nuclideUri.join(
nuclideUri.dirname(__filename),
'../__mocks__/scripts',
'scribe_cat_mock',
);
tempDir = await fsPromise.tempdir();
originalCommand = __test__.setScribeCatCommand(scribeCatMockCommandPath);
process.env.SCRIBE_MOCK_PATH = tempDir;
});

afterEach(async () => {
// Put the original command back so that later tests do not spawn the mock.
__test__.setScribeCatCommand(originalCommand);
});

it('Saves data to scribe category', async () => {
const localScribeProcess = new ScribeProcess('test');

const messages = [
'A',
'nuclide',
'is',
'an',
'atomic',
'species',
'characterized',
'by',
'the',
'specific',
'constitution',
'of',
'its',
'nucleus.',
];
// The promises returned by write() are not awaited here.
// NOTE(review): this presumably relies on join() below running after the
// queued writes -- confirm.
messages.map(message => localScribeProcess.write(message));

// Wait for `scribe_cat_mock` to flush data into disk.
await localScribeProcess.join();
expect(messages).toEqual(getContentOfScribeCategory('test'));
});

it('Saves data to scribe category and resume from error', async () => {
const localScribeProcess = new ScribeProcess('test');

const firstPart = 'A nuclide is an atomic species'.split(' ');
const secondPart = 'characterized by the specific constitution of its nucleus.'.split(
' ',
);

firstPart.map(message => localScribeProcess.write(message));
// Kill the existing process.
await localScribeProcess.join();
// Writing after join() is expected to go through a freshly spawned process.
secondPart.map(message => localScribeProcess.write(message));
// Wait for `scribe_cat_mock` to flush data into disk.
await localScribeProcess.join();
expect(firstPart.concat(secondPart)).toEqual(
getContentOfScribeCategory('test'),
);
});

it('Can automatically join', async () => {
// The second constructor argument is the join interval (100 ms here).
const localScribeProcess = new ScribeProcess('test', 100);
localScribeProcess.write('test1');

await waitsFor(() => getContentOfScribeCategory('test').includes('test1'));

localScribeProcess.write('test2');
localScribeProcess.write('test3');
// Right after the writes are issued, the file should still hold only 'test1'.
expect(getContentOfScribeCategory('test')).toEqual(['test1']);

await waitsFor(() => getContentOfScribeCategory('test').includes('test3'));

expect(getContentOfScribeCategory('test')).toEqual([
'test1',
'test2',
'test3',
]);
});

it('disables itself when spawning fails', async () => {
__test__.setScribeCatCommand('not a valid command');
const scribeProcess = new ScribeProcess('test', 100);
// The first failed write disables ScribeProcess; later writes keep returning false.
expect(await scribeProcess.write('hi')).toBe(false);
expect(ScribeProcess.isEnabled()).toBe(false);
expect(await scribeProcess.write('hi')).toBe(false);
});
});

0 comments on commit 37bfd88

Please sign in to comment.