Skip to content

Commit

Permalink
Include the shard ID in the piped records
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Oct 12, 2018
1 parent cd5d904 commit 1bb389b
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions lib/records-decoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class RecordsDecoder extends Transform {
}

async _transform({ headers, payload }, encoding, callback) {
const { compressionLib, logger } = internal(this);
const { compressionLib, logger, shard } = internal(this);
const msgType = headers[':message-type'];
const eventType = headers[':event-type'];
const { ShardId: shardId } = shard;

if (msgType !== 'event') {
this.emit('error', new Error(`Unknown event stream message type "${msgType}".`));
Expand All @@ -31,25 +32,42 @@ class RecordsDecoder extends Transform {

if (eventType === 'SubscribeToShardEvent') {
try {
const decodedRecords = await Promise.all(
payload.Records.map(async record => {
const { Data } = record;
let parsedData;
if (compressionLib) parsedData = await compressionLib.decompress(Data);
else parsedData = Buffer.from(Data).toString('utf8');
if (isJson(parsedData)) parsedData = JSON.parse(parsedData);
return { ...record, Data: parsedData };
const {
ContinuationSequenceNumber: continuationSequenceNumber,
MillisBehindLatest: millisBehindLatest,
Records
} = payload;
const records = await Promise.all(
Records.map(async record => {
const {
ApproximateArrivalTimestamp: approximateArrivalTimestamp,
Data,
EncryptionType: encryptionType,
PartitionKey: partitionKey,
SequenceNumber: sequenceNumber
} = record;
let data;
if (compressionLib) data = await compressionLib.decompress(Data);
else data = Buffer.from(Data, 'base64').toString('utf8');
if (isJson(data)) data = JSON.parse(data);
return {
approximateArrivalTimestamp,
data,
encryptionType,
partitionKey,
sequenceNumber
};
})
);
this.push({ ...payload, Records: decodedRecords });
this.push({ continuationSequenceNumber, millisBehindLatest, records, shardId });
callback();
return;
} catch (err) {
this.emit('error', err);
}
return;
}

logger.debug(`Emiting "${eventType}"`);
logger.debug(`Event "${eventType}" emitted.`);
this.emit(eventType, payload);
callback();
}
Expand Down

0 comments on commit 1bb389b

Please sign in to comment.