Skip to content

Commit

Permalink
feat(event-stream): implement event stream sra (#4695)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewFossAWS authored May 30, 2023
1 parent e91addb commit 9ba012d
Show file tree
Hide file tree
Showing 25 changed files with 629 additions and 178 deletions.
53 changes: 51 additions & 2 deletions packages/eventstream-codec/src/EventStreamCodec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { Crc32 } from "@aws-crypto/crc32";
import { Message, MessageHeaders } from "@aws-sdk/types";
import {
AvailableMessage,
AvailableMessages,
Message,
MessageDecoder,
MessageEncoder,
MessageHeaders,
} from "@aws-sdk/types";
import { Decoder, Encoder } from "@aws-sdk/types";

import { HeaderMarshaller } from "./HeaderMarshaller";
Expand All @@ -9,11 +16,53 @@ import { splitMessage } from "./splitMessage";
* A Codec that can convert binary-packed event stream messages into
* JavaScript objects and back again into their binary format.
*/
export class EventStreamCodec {
export class EventStreamCodec implements MessageEncoder, MessageDecoder {
private readonly headerMarshaller: HeaderMarshaller;
private messageBuffer: Message[];

private isEndOfStream: boolean;

constructor(toUtf8: Encoder, fromUtf8: Decoder) {
this.headerMarshaller = new HeaderMarshaller(toUtf8, fromUtf8);
this.messageBuffer = [];
this.isEndOfStream = false;
}

feed(message: ArrayBufferView): void {
this.messageBuffer.push(this.decode(message));
}

endOfStream(): void {
this.isEndOfStream = true;
}

getMessage(): AvailableMessage {
const message = this.messageBuffer.pop();
const isEndOfStream = this.isEndOfStream;

return {
getMessage(): Message | undefined {
return message;
},
isEndOfStream(): boolean {
return isEndOfStream;
},
};
}

getAvailableMessages(): AvailableMessages {
const messages = this.messageBuffer;
this.messageBuffer = [];
const isEndOfStream = this.isEndOfStream;

return {
getMessages(): Message[] {
return messages;
},
isEndOfStream(): boolean {
return isEndOfStream;
},
};
}

/**
Expand Down
43 changes: 43 additions & 0 deletions packages/eventstream-codec/src/MessageDecoderStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Message } from "@aws-sdk/types";

import { MessageDecoderStream } from "./MessageDecoderStream";

describe("MessageDecoderStream", () => {
it("returns decoded messages", async () => {
const message1 = {
headers: {},
body: new Uint8Array(1),
};

const message2 = {
headers: {},
body: new Uint8Array(2),
};

const messageDecoderMock = {
decode: jest.fn().mockReturnValueOnce(message1).mockReturnValueOnce(message2),
feed: jest.fn(),
endOfStream: jest.fn(),
getMessage: jest.fn(),
getAvailableMessages: jest.fn(),
};

const inputStream = async function* () {
yield new Uint8Array(0);
yield new Uint8Array(1);
};

const messageDecoderStream = new MessageDecoderStream({
decoder: messageDecoderMock,
inputStream: inputStream(),
});

const messages: Array<Message> = [];
for await (const message of messageDecoderStream) {
messages.push(message);
}
expect(messages.length).toEqual(2);
expect(messages[0]).toEqual(message1);
expect(messages[1]).toEqual(message2);
});
});
27 changes: 27 additions & 0 deletions packages/eventstream-codec/src/MessageDecoderStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Message, MessageDecoder } from "@aws-sdk/types";

/**
* @internal
*/
export interface MessageDecoderStreamOptions {
inputStream: AsyncIterable<Uint8Array>;
decoder: MessageDecoder;
}

/**
* @internal
*/
export class MessageDecoderStream implements AsyncIterable<Message> {
constructor(private readonly options: MessageDecoderStreamOptions) {}

[Symbol.asyncIterator](): AsyncIterator<Message> {
return this.asyncIterator();
}

private async *asyncIterator() {
for await (const bytes of this.options.inputStream) {
const decoded = this.options.decoder.decode(bytes);
yield decoded;
}
}
}
73 changes: 73 additions & 0 deletions packages/eventstream-codec/src/MessageEncoderStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { MessageEncoderStream } from "./MessageEncoderStream";

describe("MessageEncoderStream", () => {
it("returns encoded stream with end frame", async () => {
const message1 = {
headers: {},
body: new Uint8Array(1),
};

const message2 = {
headers: {},
body: new Uint8Array(2),
};

const messageEncoderMock = {
encode: jest.fn().mockReturnValueOnce(new Uint8Array(1)).mockReturnValueOnce(new Uint8Array(2)),
};

const inputStream = async function* () {
yield message1;
yield message2;
};

const messageEncoderStream = new MessageEncoderStream({
encoder: messageEncoderMock,
messageStream: inputStream(),
includeEndFrame: true,
});

const messages: Array<Uint8Array> = [];
for await (const encoded of messageEncoderStream) {
messages.push(encoded);
}
expect(messages.length).toEqual(3);
expect(messages[0]).toEqual(new Uint8Array(1));
expect(messages[1]).toEqual(new Uint8Array(2));
expect(messages[2]).toEqual(new Uint8Array(0));
});

it("returns encoded stream without end frame", async () => {
const message1 = {
headers: {},
body: new Uint8Array(1),
};

const message2 = {
headers: {},
body: new Uint8Array(2),
};

const messageEncoderMock = {
encode: jest.fn().mockReturnValueOnce(new Uint8Array(1)).mockReturnValueOnce(new Uint8Array(2)),
};

const inputStream = async function* () {
yield message1;
yield message2;
};

const messageEncoderStream = new MessageEncoderStream({
encoder: messageEncoderMock,
messageStream: inputStream(),
});

const messages: Array<Uint8Array> = [];
for await (const encoded of messageEncoderStream) {
messages.push(encoded);
}
expect(messages.length).toEqual(2);
expect(messages[0]).toEqual(new Uint8Array(1));
expect(messages[1]).toEqual(new Uint8Array(2));
});
});
31 changes: 31 additions & 0 deletions packages/eventstream-codec/src/MessageEncoderStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Message, MessageEncoder } from "@aws-sdk/types";

/**
* @internal
*/
export interface MessageEncoderStreamOptions {
messageStream: AsyncIterable<Message>;
encoder: MessageEncoder;
includeEndFrame?: Boolean;
}

/**
* @internal
*/
export class MessageEncoderStream implements AsyncIterable<Uint8Array> {
constructor(private readonly options: MessageEncoderStreamOptions) {}

[Symbol.asyncIterator](): AsyncIterator<Uint8Array> {
return this.asyncIterator();
}

private async *asyncIterator() {
for await (const msg of this.options.messageStream) {
const encoded = this.options.encoder.encode(msg);
yield encoded;
}
if (this.options.includeEndFrame) {
yield new Uint8Array(0);
}
}
}
38 changes: 38 additions & 0 deletions packages/eventstream-codec/src/SmithyMessageDecoderStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { SmithyMessageDecoderStream } from "./SmithyMessageDecoderStream";

describe("SmithyMessageDecoderStream", () => {
it("returns decoded stream", async () => {
const message1 = {
headers: {},
body: new Uint8Array(1),
};

const message2 = {
headers: {},
body: new Uint8Array(2),
};

const deserializer = jest
.fn()
.mockReturnValueOnce(Promise.resolve("first"))
.mockReturnValueOnce(Promise.resolve("second"));

const inputStream = async function* () {
yield message1;
yield message2;
};

const stream = new SmithyMessageDecoderStream<String>({
messageStream: inputStream(),
deserializer: deserializer,
});

const messages: Array<String> = [];
for await (const str of stream) {
messages.push(str);
}
expect(messages.length).toEqual(2);
expect(messages[0]).toEqual("first");
expect(messages[1]).toEqual("second");
});
});
28 changes: 28 additions & 0 deletions packages/eventstream-codec/src/SmithyMessageDecoderStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Message } from "@aws-sdk/types";

/**
* @internal
*/
export interface SmithyMessageDecoderStreamOptions<T> {
readonly messageStream: AsyncIterable<Message>;
readonly deserializer: (input: Message) => Promise<T | undefined>;
}

/**
* @internal
*/
export class SmithyMessageDecoderStream<T> implements AsyncIterable<T> {
constructor(private readonly options: SmithyMessageDecoderStreamOptions<T>) {}

[Symbol.asyncIterator](): AsyncIterator<T> {
return this.asyncIterator();
}

private async *asyncIterator() {
for await (const message of this.options.messageStream) {
const deserialized = await this.options.deserializer(message);
if (deserialized === undefined) continue;
yield deserialized;
}
}
}
37 changes: 37 additions & 0 deletions packages/eventstream-codec/src/SmithyMessageEncoderStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { Message } from "@aws-sdk/types";

import { SmithyMessageEncoderStream } from "./SmithyMessageEncoderStream";

describe("SmithyMessageEncoderStream", () => {
it("returns encoded stream", async () => {
const message1 = {
headers: {},
body: new Uint8Array(1),
};

const message2 = {
headers: {},
body: new Uint8Array(2),
};

const serializer = jest.fn().mockReturnValueOnce(message1).mockReturnValueOnce(message2);

const inputStream = async function* () {
yield "first";
yield "second";
};

const stream = new SmithyMessageEncoderStream<String>({
inputStream: inputStream(),
serializer: serializer,
});

const messages: Array<Message> = [];
for await (const str of stream) {
messages.push(str);
}
expect(messages.length).toEqual(2);
expect(messages[0]).toEqual(message1);
expect(messages[1]).toEqual(message2);
});
});
27 changes: 27 additions & 0 deletions packages/eventstream-codec/src/SmithyMessageEncoderStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Message } from "@aws-sdk/types";

/**
* @internal
*/
export interface SmithyMessageEncoderStreamOptions<T> {
inputStream: AsyncIterable<T>;
serializer: (event: T) => Message;
}

/**
* @internal
*/
export class SmithyMessageEncoderStream<T> implements AsyncIterable<Message> {
constructor(private readonly options: SmithyMessageEncoderStreamOptions<T>) {}

[Symbol.asyncIterator](): AsyncIterator<Message> {
return this.asyncIterator();
}

private async *asyncIterator() {
for await (const chunk of this.options.inputStream) {
const payloadBuf = this.options.serializer(chunk);
yield payloadBuf;
}
}
}
5 changes: 5 additions & 0 deletions packages/eventstream-codec/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
export * from "./EventStreamCodec";
export * from "./HeaderMarshaller";
export * from "./Int64";
export * from "./Message";
export * from "./MessageDecoderStream";
export * from "./MessageEncoderStream";
export * from "./SmithyMessageDecoderStream";
export * from "./SmithyMessageEncoderStream";
Loading

0 comments on commit 9ba012d

Please sign in to comment.