Skip to content

Commit

Permalink
introduce advanced logging controls (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
thenewguy39 authored Dec 1, 2023
1 parent b8d562e commit 7374a4e
Show file tree
Hide file tree
Showing 7 changed files with 959 additions and 58 deletions.
4 changes: 2 additions & 2 deletions src/CallbackContext.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

const BeforeExitListener = require('./BeforeExitListener.js');
const { toFormatted, intoError } = require('./Errors');
const { structuredConsole } = require('./LogPatch');

/**
* Build the callback function and the part of the context which exposes
Expand All @@ -20,7 +20,7 @@ const { toFormatted, intoError } = require('./Errors');
*/
function _rawCallbackContext(client, id, scheduleNext) {
const postError = (err, callback) => {
console.error('Invoke Error', toFormatted(intoError(err)));
structuredConsole.logError('Invoke Error', err);
client.postInvocationError(err, id, callback);
};

Expand Down
204 changes: 169 additions & 35 deletions src/LogPatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,57 @@

const util = require('util');
const fs = require('fs');
const Errors = require('./Errors');

const levels = Object.freeze({
INFO: { name: 'INFO' },
DEBUG: { name: 'DEBUG' },
WARN: { name: 'WARN' },
ERROR: { name: 'ERROR' },
TRACE: { name: 'TRACE' },
FATAL: { name: 'FATAL' },
});
const structuredConsole = {};

const jsonErrorReplacer = (_, value) => {
if (value instanceof Error) {
let serializedErr = Object.assign(
{
errorType: value?.constructor?.name ?? 'UnknownError',
errorMessage: value.message,
stackTrace:
typeof value.stack === 'string'
? value.stack.split('\n')
: value.stack,
},
value,
);
return serializedErr;
}
return value;
};

function formatJsonMessage(requestId, timestamp, level, ...messageParams) {
let result = {
timestamp: timestamp,
level: level.name,
requestId: requestId,
};

if (messageParams.length === 1) {
result.message = messageParams[0];
try {
return JSON.stringify(result, jsonErrorReplacer);
} catch (_) {
result.message = util.format(result.message);
return JSON.stringify(result);
}
}

result.message = util.format(...messageParams);
for (const param of messageParams) {
if (param instanceof Error) {
result.errorType = param?.constructor?.name ?? 'UnknownError';
result.errorMessage = param.message;
result.stackTrace =
typeof param.stack === 'string' ? param.stack.split('\n') : [];
break;
}
}
return JSON.stringify(result);
}

/* Use a unique symbol to provide global access without risk of name clashes. */
const REQUEST_ID_SYMBOL = Symbol.for('aws.lambda.runtime.requestId');
Expand All @@ -26,10 +68,21 @@ let _currentRequestId = {
/**
* Write logs to stdout.
*/
let _logToStdout = (level, message) => {
let logTextToStdout = (level, message, ...params) => {
let time = new Date().toISOString();
let requestId = _currentRequestId.get();
let line = `${time}\t${requestId}\t${level.name}\t${util.format(
message,
...params,
)}`;
line = line.replace(/\n/g, '\r');
process.stdout.write(line + '\n');
};

let logJsonToStdout = (level, message, ...params) => {
let time = new Date().toISOString();
let requestId = _currentRequestId.get();
let line = `${time}\t${requestId}\t${level.name}\t${message}`;
let line = formatJsonMessage(requestId, time, level, message, ...params);
line = line.replace(/\n/g, '\r');
process.stdout.write(line + '\n');
};
Expand All @@ -46,15 +99,41 @@ let _logToStdout = (level, message) => {
* The next 8 bytes are the UNIX timestamp of the message with microseconds precision.
* The remaining bytes ar ethe message itself. Byte order is big-endian.
*/
let _logToFd = function (logTarget) {
let logTextToFd = function (logTarget) {
let typeAndLength = Buffer.alloc(16);
typeAndLength.writeUInt32BE(0xa55a0003, 0);
return (level, message, ...params) => {
let date = new Date();
let time = date.toISOString();
let requestId = _currentRequestId.get();
let enrichedMessage = `${time}\t${requestId}\t${level.name}\t${util.format(
message,
...params,
)}\n`;

return (level, message) => {
typeAndLength.writeUInt32BE((0xa55a0003 | level.tlvMask) >>> 0, 0);
let messageBytes = Buffer.from(enrichedMessage, 'utf8');
typeAndLength.writeInt32BE(messageBytes.length, 4);
typeAndLength.writeBigInt64BE(BigInt(date.valueOf()) * 1000n, 8);
fs.writeSync(logTarget, typeAndLength);
fs.writeSync(logTarget, messageBytes);
};
};

let logJsonToFd = function (logTarget) {
let typeAndLength = Buffer.alloc(16);
return (level, message, ...params) => {
let date = new Date();
let time = date.toISOString();
let requestId = _currentRequestId.get();
let enrichedMessage = `${time}\t${requestId}\t${level.name}\t${message}\n`;
let enrichedMessage = formatJsonMessage(
requestId,
time,
level,
message,
...params,
);

typeAndLength.writeUInt32BE((0xa55a0002 | level.tlvMask) >>> 0, 0);
let messageBytes = Buffer.from(enrichedMessage, 'utf8');
typeAndLength.writeInt32BE(messageBytes.length, 4);
typeAndLength.writeBigInt64BE(BigInt(date.valueOf()) * 1000n, 8);
Expand All @@ -66,45 +145,100 @@ let _logToFd = function (logTarget) {
/**
* Replace console functions with a log function.
* @param {Function(level, String)} log
* Apply log filters, based on `AWS_LAMBDA_LOG_LEVEL` env var
*/
function _patchConsoleWith(log) {
console.log = (msg, ...params) => {
log(levels.INFO, util.format(msg, ...params));
};
console.debug = (msg, ...params) => {
log(levels.DEBUG, util.format(msg, ...params));
};
console.info = (msg, ...params) => {
log(levels.INFO, util.format(msg, ...params));
};
console.warn = (msg, ...params) => {
log(levels.WARN, util.format(msg, ...params));
};
console.error = (msg, ...params) => {
log(levels.ERROR, util.format(msg, ...params));
};
console.trace = (msg, ...params) => {
log(levels.TRACE, util.format(msg, ...params));
};
const NopLog = (_message, ..._params) => {};
const levels = Object.freeze({
TRACE: { name: 'TRACE', priority: 1, tlvMask: 0b00100 },
DEBUG: { name: 'DEBUG', priority: 2, tlvMask: 0b01000 },
INFO: { name: 'INFO', priority: 3, tlvMask: 0b01100 },
WARN: { name: 'WARN', priority: 4, tlvMask: 0b10000 },
ERROR: { name: 'ERROR', priority: 5, tlvMask: 0b10100 },
FATAL: { name: 'FATAL', priority: 6, tlvMask: 0b11000 },
});
let awsLambdaLogLevel =
levels[process.env['AWS_LAMBDA_LOG_LEVEL']?.toUpperCase()] ?? levels.TRACE;

if (levels.TRACE.priority >= awsLambdaLogLevel.priority) {
console.trace = (msg, ...params) => {
log(levels.TRACE, msg, ...params);
};
} else {
console.trace = NopLog;
}
if (levels.DEBUG.priority >= awsLambdaLogLevel.priority) {
console.debug = (msg, ...params) => {
log(levels.DEBUG, msg, ...params);
};
} else {
console.debug = NopLog;
}
if (levels.INFO.priority >= awsLambdaLogLevel.priority) {
console.info = (msg, ...params) => {
log(levels.INFO, msg, ...params);
};
} else {
console.info = NopLog;
}
console.log = console.info;
if (levels.WARN.priority >= awsLambdaLogLevel.priority) {
console.warn = (msg, ...params) => {
log(levels.WARN, msg, ...params);
};
} else {
console.warn = NopLog;
}
if (levels.ERROR.priority >= awsLambdaLogLevel.priority) {
console.error = (msg, ...params) => {
log(levels.ERROR, msg, ...params);
};
} else {
console.error = NopLog;
}
console.fatal = (msg, ...params) => {
log(levels.FATAL, util.format(msg, ...params));
log(levels.FATAL, msg, ...params);
};
}

let _patchConsole = () => {
const JsonName = 'JSON',
TextName = 'TEXT';
let awsLambdaLogFormat =
process.env['AWS_LAMBDA_LOG_FORMAT']?.toUpperCase() === JsonName
? JsonName
: TextName;
let jsonErrorLogger = (_, err) => {
console.error(Errors.intoError(err));
},
textErrorLogger = (msg, err) => {
console.error(msg, Errors.toFormatted(Errors.intoError(err)));
};

/**
Resolve log format here, instead of inside log function.
This avoids conditional statements in the log function hot path.
**/
let logger;
if (
process.env['_LAMBDA_TELEMETRY_LOG_FD'] != null &&
process.env['_LAMBDA_TELEMETRY_LOG_FD'] != undefined
) {
let logFd = parseInt(process.env['_LAMBDA_TELEMETRY_LOG_FD']);
_patchConsoleWith(_logToFd(logFd));
delete process.env['_LAMBDA_TELEMETRY_LOG_FD'];
logger =
awsLambdaLogFormat === JsonName ? logJsonToFd(logFd) : logTextToFd(logFd);
} else {
_patchConsoleWith(_logToStdout);
logger =
awsLambdaLogFormat === JsonName ? logJsonToStdout : logTextToStdout;
}
_patchConsoleWith(logger);
structuredConsole.logError =
awsLambdaLogFormat === JsonName ? jsonErrorLogger : textErrorLogger;
};

module.exports = {
setCurrentRequestId: _currentRequestId.set,
patchConsole: _patchConsole,
structuredConsole: structuredConsole,
};
9 changes: 3 additions & 6 deletions src/StreamingContext.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
'use strict';

const BeforeExitListener = require('./BeforeExitListener.js');
const {
InvalidStreamingOperation,
toFormatted,
intoError,
} = require('./Errors');
const { InvalidStreamingOperation } = require('./Errors');
const { verbose, vverbose } = require('./VerboseLog.js').logger('STREAM');
const { tryCallFail } = require('./ResponseStream');
const { structuredConsole } = require('./LogPatch');

/**
* Construct the base-context object which includes the required flags and
Expand Down Expand Up @@ -67,7 +64,7 @@ module.exports.build = function (client, id, scheduleNext, options) {

return {
fail: (err, callback) => {
console.error('Invoke Error', toFormatted(intoError(err)));
structuredConsole.logError('Invoke Error', err);

tryCallFail(responseStream, err, callback);
},
Expand Down
4 changes: 2 additions & 2 deletions src/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ export async function run(appRootOrHandler, handler = '') {
};

process.on('uncaughtException', (error) => {
console.error('Uncaught Exception', Errors.toFormatted(error));
LogPatch.structuredConsole.logError('Uncaught Exception', error);
errorCallbacks.uncaughtException(error);
});

process.on('unhandledRejection', (reason, promise) => {
let error = new Errors.UnhandledPromiseRejection(reason, promise);
console.error('Unhandled Promise Rejection', Errors.toFormatted(error));
LogPatch.structuredConsole.logError('Unhandled Promise Rejection', error);
errorCallbacks.unhandledRejection(error);
});

Expand Down
37 changes: 33 additions & 4 deletions test/unit/FakeTelemetryTarget.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,16 @@ const fs = require('fs');
const path = require('path');
const assert = require('assert');

const _LOG_IDENTIFIER = Buffer.from('a55a0003', 'hex');
const levels = Object.freeze({
TRACE: { name: 'TRACE', tlvMask: 0b00100 },
DEBUG: { name: 'DEBUG', tlvMask: 0b01000 },
INFO: { name: 'INFO', tlvMask: 0b01100 },
WARN: { name: 'WARN', tlvMask: 0b10000 },
ERROR: { name: 'ERROR', tlvMask: 0b10100 },
FATAL: { name: 'FATAL', tlvMask: 0b11000 },
});

const TextName = 'TEXT';

/**
* A fake implementation of the multilne logging protocol.
Expand Down Expand Up @@ -55,7 +64,7 @@ module.exports = class FakeTelemetryTarget {
* - the prefix is malformed
* - there aren't enough bytes
*/
readLine() {
readLine(level = 'INFO', format = TextName, expectEmpty = false) {
let readLength = () => {
let logPrefix = Buffer.alloc(16);
let actualReadBytes = fs.readSync(
Expand All @@ -64,17 +73,34 @@ module.exports = class FakeTelemetryTarget {
0,
logPrefix.length,
);

if (expectEmpty) {
assert.strictEqual(
actualReadBytes,
0,
`Expected actualReadBytes[${actualReadBytes}] = 0`,
);
return 0;
}

assert.strictEqual(
actualReadBytes,
logPrefix.length,
`Expected actualReadBytes[${actualReadBytes}] = ${logPrefix.length}`,
);

var _tlvHeader;
if (format === TextName)
_tlvHeader = (0xa55a0003 | levels[level].tlvMask) >>> 0;
else _tlvHeader = (0xa55a0002 | levels[level].tlvMask) >>> 0;

let _logIdentifier = Buffer.from(_tlvHeader.toString(16), 'hex');
assert.strictEqual(
logPrefix.lastIndexOf(_LOG_IDENTIFIER),
logPrefix.lastIndexOf(_logIdentifier),
0,
`log prefix ${logPrefix.toString(
'hex',
)} should start with ${_LOG_IDENTIFIER.toString('hex')}`,
)} should start with ${_logIdentifier.toString('hex')}`,
);
let len = logPrefix.readUInt32BE(4);
// discard the timestamp
Expand All @@ -83,6 +109,9 @@ module.exports = class FakeTelemetryTarget {
};

let lineLength = readLength();
if (lineLength === 0) {
return '';
}
let lineBytes = Buffer.alloc(lineLength);
let actualLineSize = fs.readSync(
this.readTarget,
Expand Down
Loading

0 comments on commit 7374a4e

Please sign in to comment.