diff --git a/packages/eventstream-codec/src/EventStreamCodec.ts b/packages/eventstream-codec/src/EventStreamCodec.ts index f145b8c98cf9..8c04c0ff22c1 100644 --- a/packages/eventstream-codec/src/EventStreamCodec.ts +++ b/packages/eventstream-codec/src/EventStreamCodec.ts @@ -1,5 +1,12 @@ import { Crc32 } from "@aws-crypto/crc32"; -import { Message, MessageHeaders } from "@aws-sdk/types"; +import { + AvailableMessage, + AvailableMessages, + Message, + MessageDecoder, + MessageEncoder, + MessageHeaders, +} from "@aws-sdk/types"; import { Decoder, Encoder } from "@aws-sdk/types"; import { HeaderMarshaller } from "./HeaderMarshaller"; @@ -9,11 +16,53 @@ import { splitMessage } from "./splitMessage"; * A Codec that can convert binary-packed event stream messages into * JavaScript objects and back again into their binary format. */ -export class EventStreamCodec { +export class EventStreamCodec implements MessageEncoder, MessageDecoder { private readonly headerMarshaller: HeaderMarshaller; + private messageBuffer: Message[]; + + private isEndOfStream: boolean; constructor(toUtf8: Encoder, fromUtf8: Decoder) { this.headerMarshaller = new HeaderMarshaller(toUtf8, fromUtf8); + this.messageBuffer = []; + this.isEndOfStream = false; + } + + feed(message: ArrayBufferView): void { + this.messageBuffer.push(this.decode(message)); + } + + endOfStream(): void { + this.isEndOfStream = true; + } + + getMessage(): AvailableMessage { + const message = this.messageBuffer.pop(); + const isEndOfStream = this.isEndOfStream; + + return { + getMessage(): Message | undefined { + return message; + }, + isEndOfStream(): boolean { + return isEndOfStream; + }, + }; + } + + getAvailableMessages(): AvailableMessages { + const messages = this.messageBuffer; + this.messageBuffer = []; + const isEndOfStream = this.isEndOfStream; + + return { + getMessages(): Message[] { + return messages; + }, + isEndOfStream(): boolean { + return isEndOfStream; + }, + }; } /** diff --git a/packages/eventstream-codec/src/MessageDecoderStream.spec.ts b/packages/eventstream-codec/src/MessageDecoderStream.spec.ts new file mode 100644 index 000000000000..4e3878fee4d7 --- /dev/null +++ b/packages/eventstream-codec/src/MessageDecoderStream.spec.ts @@ -0,0 +1,43 @@ +import { Message } from "@aws-sdk/types"; + +import { MessageDecoderStream } from "./MessageDecoderStream"; + +describe("MessageDecoderStream", () => { + it("returns decoded messages", async () => { + const message1 = { + headers: {}, + body: new Uint8Array(1), + }; + + const message2 = { + headers: {}, + body: new Uint8Array(2), + }; + + const messageDecoderMock = { + decode: jest.fn().mockReturnValueOnce(message1).mockReturnValueOnce(message2), + feed: jest.fn(), + endOfStream: jest.fn(), + getMessage: jest.fn(), + getAvailableMessages: jest.fn(), + }; + + const inputStream = async function* () { + yield new Uint8Array(0); + yield new Uint8Array(1); + }; + + const messageDecoderStream = new MessageDecoderStream({ + decoder: messageDecoderMock, + inputStream: inputStream(), + }); + + const messages: Array = []; + for await (const message of messageDecoderStream) { + messages.push(message); + } + expect(messages.length).toEqual(2); + expect(messages[0]).toEqual(message1); + expect(messages[1]).toEqual(message2); + }); +}); diff --git a/packages/eventstream-codec/src/MessageDecoderStream.ts b/packages/eventstream-codec/src/MessageDecoderStream.ts new file mode 100644 index 000000000000..6257dbee4154 --- /dev/null +++ b/packages/eventstream-codec/src/MessageDecoderStream.ts @@ -0,0 +1,27 @@ +import { Message, MessageDecoder } from "@aws-sdk/types"; + +/** + * @internal + */ +export interface MessageDecoderStreamOptions { + inputStream: AsyncIterable; + decoder: MessageDecoder; +} + +/** + * @internal + */ +export class MessageDecoderStream implements AsyncIterable { + constructor(private readonly options: MessageDecoderStreamOptions) {} + + [Symbol.asyncIterator](): AsyncIterator { + return this.asyncIterator(); + } + + private async *asyncIterator() { + for await (const bytes of this.options.inputStream) { + const decoded = this.options.decoder.decode(bytes); + yield decoded; + } + } +} diff --git a/packages/eventstream-codec/src/MessageEncoderStream.spec.ts b/packages/eventstream-codec/src/MessageEncoderStream.spec.ts new file mode 100644 index 000000000000..b8b1a4d583b1 --- /dev/null +++ b/packages/eventstream-codec/src/MessageEncoderStream.spec.ts @@ -0,0 +1,73 @@ +import { MessageEncoderStream } from "./MessageEncoderStream"; + +describe("MessageEncoderStream", () => { + it("returns encoded stream with end frame", async () => { + const message1 = { + headers: {}, + body: new Uint8Array(1), + }; + + const message2 = { + headers: {}, + body: new Uint8Array(2), + }; + + const messageEncoderMock = { + encode: jest.fn().mockReturnValueOnce(new Uint8Array(1)).mockReturnValueOnce(new Uint8Array(2)), + }; + + const inputStream = async function* () { + yield message1; + yield message2; + }; + + const messageEncoderStream = new MessageEncoderStream({ + encoder: messageEncoderMock, + messageStream: inputStream(), + includeEndFrame: true, + }); + + const messages: Array = []; + for await (const encoded of messageEncoderStream) { + messages.push(encoded); + } + expect(messages.length).toEqual(3); + expect(messages[0]).toEqual(new Uint8Array(1)); + expect(messages[1]).toEqual(new Uint8Array(2)); + expect(messages[2]).toEqual(new Uint8Array(0)); + }); + + it("returns encoded stream without end frame", async () => { + const message1 = { + headers: {}, + body: new Uint8Array(1), + }; + + const message2 = { + headers: {}, + body: new Uint8Array(2), + }; + + const messageEncoderMock = { + encode: jest.fn().mockReturnValueOnce(new Uint8Array(1)).mockReturnValueOnce(new Uint8Array(2)), + }; + + const inputStream = async function* () { + yield message1; + yield message2; + }; + + const messageEncoderStream = new MessageEncoderStream({ + encoder: messageEncoderMock, + messageStream: inputStream(), + }); + + const messages: Array = []; + for await (const encoded of messageEncoderStream) { + messages.push(encoded); + } + expect(messages.length).toEqual(2); + expect(messages[0]).toEqual(new Uint8Array(1)); + expect(messages[1]).toEqual(new Uint8Array(2)); + }); +}); diff --git a/packages/eventstream-codec/src/MessageEncoderStream.ts b/packages/eventstream-codec/src/MessageEncoderStream.ts new file mode 100644 index 000000000000..0531c9f3b318 --- /dev/null +++ b/packages/eventstream-codec/src/MessageEncoderStream.ts @@ -0,0 +1,31 @@ +import { Message, MessageEncoder } from "@aws-sdk/types"; + +/** + * @internal + */ +export interface MessageEncoderStreamOptions { + messageStream: AsyncIterable; + encoder: MessageEncoder; + includeEndFrame?: Boolean; +} + +/** + * @internal + */ +export class MessageEncoderStream implements AsyncIterable { + constructor(private readonly options: MessageEncoderStreamOptions) {} + + [Symbol.asyncIterator](): AsyncIterator { + return this.asyncIterator(); + } + + private async *asyncIterator() { + for await (const msg of this.options.messageStream) { + const encoded = this.options.encoder.encode(msg); + yield encoded; + } + if (this.options.includeEndFrame) { + yield new Uint8Array(0); + } + } +} diff --git a/packages/eventstream-codec/src/SmithyMessageDecoderStream.spec.ts b/packages/eventstream-codec/src/SmithyMessageDecoderStream.spec.ts new file mode 100644 index 000000000000..41be8bac2726 --- /dev/null +++ b/packages/eventstream-codec/src/SmithyMessageDecoderStream.spec.ts @@ -0,0 +1,38 @@ +import { SmithyMessageDecoderStream } from "./SmithyMessageDecoderStream"; + +describe("SmithyMessageDecoderStream", () => { + it("returns decoded stream", async () => { + const message1 = { + headers: {}, + body: new Uint8Array(1), + }; + + const message2 = { + headers: {}, + body: new Uint8Array(2), + }; + + const deserializer = jest + .fn() + .mockReturnValueOnce(Promise.resolve("first")) + .mockReturnValueOnce(Promise.resolve("second")); + + const inputStream = async function* () { + yield message1; + yield message2; + }; + + const stream = new SmithyMessageDecoderStream({ + messageStream: inputStream(), + deserializer: deserializer, + }); + + const messages: Array = []; + for await (const str of stream) { + messages.push(str); + } + expect(messages.length).toEqual(2); + expect(messages[0]).toEqual("first"); + expect(messages[1]).toEqual("second"); + }); +}); diff --git a/packages/eventstream-codec/src/SmithyMessageDecoderStream.ts b/packages/eventstream-codec/src/SmithyMessageDecoderStream.ts new file mode 100644 index 000000000000..ae7970b704ae --- /dev/null +++ b/packages/eventstream-codec/src/SmithyMessageDecoderStream.ts @@ -0,0 +1,28 @@ +import { Message } from "@aws-sdk/types"; + +/** + * @internal + */ +export interface SmithyMessageDecoderStreamOptions { + readonly messageStream: AsyncIterable; + readonly deserializer: (input: Message) => Promise; +} + +/** + * @internal + */ +export class SmithyMessageDecoderStream implements AsyncIterable { + constructor(private readonly options: SmithyMessageDecoderStreamOptions) {} + + [Symbol.asyncIterator](): AsyncIterator { + return this.asyncIterator(); + } + + private async *asyncIterator() { + for await (const message of this.options.messageStream) { + const deserialized = await this.options.deserializer(message); + if (deserialized === undefined) continue; + yield deserialized; + } + } +} diff --git a/packages/eventstream-codec/src/SmithyMessageEncoderStream.spec.ts b/packages/eventstream-codec/src/SmithyMessageEncoderStream.spec.ts new file mode 100644 index 000000000000..b54d62e3567f --- /dev/null +++ b/packages/eventstream-codec/src/SmithyMessageEncoderStream.spec.ts @@ -0,0 +1,37 @@ +import { Message } from "@aws-sdk/types"; + +import { SmithyMessageEncoderStream } from "./SmithyMessageEncoderStream"; + +describe("SmithyMessageEncoderStream", () => { + it("returns encoded stream", async () => { + const message1 = { + headers: {}, + body: new Uint8Array(1), + }; + + const message2 = { + headers: {}, + body: new Uint8Array(2), + }; + + const serializer = jest.fn().mockReturnValueOnce(message1).mockReturnValueOnce(message2); + + const inputStream = async function* () { + yield "first"; + yield "second"; + }; + + const stream = new SmithyMessageEncoderStream({ + inputStream: inputStream(), + serializer: serializer, + }); + + const messages: Array = []; + for await (const str of stream) { + messages.push(str); + } + expect(messages.length).toEqual(2); + expect(messages[0]).toEqual(message1); + expect(messages[1]).toEqual(message2); + }); +}); diff --git a/packages/eventstream-codec/src/SmithyMessageEncoderStream.ts b/packages/eventstream-codec/src/SmithyMessageEncoderStream.ts new file mode 100644 index 000000000000..b489da126e1b --- /dev/null +++ b/packages/eventstream-codec/src/SmithyMessageEncoderStream.ts @@ -0,0 +1,27 @@ +import { Message } from "@aws-sdk/types"; + +/** + * @internal + */ +export interface SmithyMessageEncoderStreamOptions { + inputStream: AsyncIterable; + serializer: (event: T) => Message; +} + +/** + * @internal + */ +export class SmithyMessageEncoderStream implements AsyncIterable { + constructor(private readonly options: SmithyMessageEncoderStreamOptions) {} + + [Symbol.asyncIterator](): AsyncIterator { + return this.asyncIterator(); + } + + private async *asyncIterator() { + for await (const chunk of this.options.inputStream) { + const payloadBuf = this.options.serializer(chunk); + yield payloadBuf; + } + } +} diff --git a/packages/eventstream-codec/src/index.ts b/packages/eventstream-codec/src/index.ts index 35ffad501072..458feabc1542 100644 --- a/packages/eventstream-codec/src/index.ts +++ b/packages/eventstream-codec/src/index.ts @@ -1,3 +1,8 @@ export * from "./EventStreamCodec"; +export * from "./HeaderMarshaller"; export * from "./Int64"; export * from "./Message"; +export * from "./MessageDecoderStream"; +export * from "./MessageEncoderStream"; +export * from "./SmithyMessageDecoderStream"; +export * from "./SmithyMessageEncoderStream"; diff --git a/packages/eventstream-handler-node/src/EventSigningStream.spec.ts b/packages/eventstream-handler-node/src/EventSigningStream.spec.ts index a6bca057542a..8d5e50417918 100644 --- a/packages/eventstream-handler-node/src/EventSigningStream.spec.ts +++ b/packages/eventstream-handler-node/src/EventSigningStream.spec.ts @@ -1,5 +1,5 @@ import { EventStreamCodec } from "@aws-sdk/eventstream-codec"; -import { Message, MessageHeaders } from "@aws-sdk/types"; +import { Message, MessageHeaders, SignedMessage } from "@aws-sdk/types"; import { fromUtf8, toUtf8 } from "@aws-sdk/util-utf8"; import { EventSigningStream } from "./EventSigningStream"; @@ -13,18 +13,17 @@ describe("EventSigningStream", () => { it("should sign a eventstream payload properly", (done) => { const eventStreamCodec = new EventStreamCodec(toUtf8, fromUtf8); - const inputChunks: Array = ( - [ - { - headers: {}, - body: fromUtf8("foo"), - }, - { - headers: {}, - body: fromUtf8("bar"), - }, - ] as Array - ).map((event) => eventStreamCodec.encode(event)); + const message1: Message = { + headers: {}, + body: fromUtf8("foo"), + }; + const message2: Message = { + headers: {}, + body: fromUtf8("bar"), + }; + const inputChunks: Array = ([message1, message2] as Array).map((event) => + eventStreamCodec.encode(event) + ); const expected: Array = [ { ":date": { type: "timestamp", value: new Date(1546045446000) }, @@ -41,10 +40,10 @@ describe("EventSigningStream", () => { }, }, ]; - const mockEventSigner = jest + const mockMessageSigner = jest .fn() - .mockReturnValueOnce("7369676e617475726531") //'signature1' - .mockReturnValueOnce("7369676e617475726532"); //'signature2' + .mockReturnValueOnce({ message: message1, signature: "7369676e617475726531" } as SignedMessage) //'signature1' + .mockReturnValueOnce({ message: message2, signature: "7369676e617475726532" } as SignedMessage); //'signature2' // mock 'new Date()' let mockDateCount = 0; // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -58,7 +57,10 @@ describe("EventSigningStream", () => { }); const signingStream = new EventSigningStream({ priorSignature: "initial", - eventSigner: { sign: mockEventSigner }, + messageSigner: { + sign: mockMessageSigner, + signMessage: mockMessageSigner, + }, eventStreamCodec, }); const output: Array = []; @@ -67,12 +69,12 @@ describe("EventSigningStream", () => { }); signingStream.on("end", () => { expect(output).toEqual(expected); - expect(mockEventSigner.mock.calls[0][1].priorSignature).toBe("initial"); - expect(mockEventSigner.mock.calls[0][1].signingDate.getTime()).toBe( + expect(mockMessageSigner.mock.calls[0][0].priorSignature).toBe("initial"); + expect(mockMessageSigner.mock.calls[0][1].signingDate.getTime()).toBe( (expected[0][":date"].value as Date).getTime() ); - expect(mockEventSigner.mock.calls[1][1].priorSignature).toBe("7369676e617475726531"); - expect(mockEventSigner.mock.calls[1][1].signingDate.getTime()).toBe( + expect(mockMessageSigner.mock.calls[1][0].priorSignature).toBe("7369676e617475726531"); + expect(mockMessageSigner.mock.calls[1][1].signingDate.getTime()).toBe( (expected[1][":date"].value as Date).getTime() ); done(); diff --git a/packages/eventstream-handler-node/src/EventSigningStream.ts b/packages/eventstream-handler-node/src/EventSigningStream.ts index 98bfa38f030f..747dfacb9428 100644 --- a/packages/eventstream-handler-node/src/EventSigningStream.ts +++ b/packages/eventstream-handler-node/src/EventSigningStream.ts @@ -1,5 +1,5 @@ import { EventStreamCodec } from "@aws-sdk/eventstream-codec"; -import { EventSigner, MessageHeaders } from "@aws-sdk/types"; +import { MessageHeaders, MessageSigner } from "@aws-sdk/types"; import { Transform, TransformCallback, TransformOptions } from "stream"; /** @@ -7,18 +7,18 @@ import { Transform, TransformCallback, TransformOptions } from "stream"; */ export interface EventSigningStreamOptions extends TransformOptions { priorSignature: string; - eventSigner: EventSigner; + messageSigner: MessageSigner; eventStreamCodec: EventStreamCodec; } /** * @internal - * + * * A transform stream that signs the eventstream */ export class EventSigningStream extends Transform { private priorSignature: string; - private eventSigner: EventSigner; + private messageSigner: MessageSigner; private eventStreamCodec: EventStreamCodec; constructor(options: EventSigningStreamOptions) { @@ -30,8 +30,8 @@ export class EventSigningStream extends Transform { }); this.priorSignature = options.priorSignature; - this.eventSigner = options.eventSigner; this.eventStreamCodec = options.eventStreamCodec; + this.messageSigner = options.messageSigner; } async _transform(chunk: Uint8Array, encoding: string, callback: TransformCallback): Promise { @@ -40,23 +40,25 @@ export class EventSigningStream extends Transform { const dateHeader: MessageHeaders = { ":date": { type: "timestamp", value: now }, }; - const signature = await this.eventSigner.sign( + const signedMessage = await this.messageSigner.sign( { - payload: chunk, - headers: this.eventStreamCodec.formatHeaders(dateHeader), + message: { + body: chunk, + headers: dateHeader, + }, + priorSignature: this.priorSignature, }, { - priorSignature: this.priorSignature, signingDate: now, } ); - this.priorSignature = signature; + this.priorSignature = signedMessage.signature; const serializedSigned = this.eventStreamCodec.encode({ headers: { ...dateHeader, ":chunk-signature": { type: "binary", - value: getSignatureBinary(signature), + value: getSignatureBinary(signedMessage.signature), }, }, body: chunk, diff --git a/packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts b/packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts index 87c85bd62e47..1e69eb3ea047 100644 --- a/packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts +++ b/packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts @@ -1,5 +1,12 @@ import { EventStreamCodec } from "@aws-sdk/eventstream-codec"; -import { Decoder, Encoder, EventSigner, FinalizeHandler, FinalizeHandlerArguments, HttpRequest } from "@aws-sdk/types"; +import { + Decoder, + Encoder, + FinalizeHandler, + FinalizeHandlerArguments, + HttpRequest, + MessageSigner, +} from "@aws-sdk/types"; import { PassThrough, Readable } from "stream"; import { EventSigningStream } from "./EventSigningStream"; @@ -9,8 +16,9 @@ jest.mock("./EventSigningStream"); jest.mock("@aws-sdk/eventstream-codec"); describe(EventStreamPayloadHandler.name, () => { - const mockSigner: EventSigner = { + const mockMessageSigner: MessageSigner = { sign: jest.fn(), + signMessage: jest.fn(), }; const mockUtf8Decoder: Decoder = jest.fn(); const mockUtf8encoder: Encoder = jest.fn(); @@ -27,7 +35,7 @@ describe(EventStreamPayloadHandler.name, () => { it("should throw if request payload is not a stream", () => { const handler = new EventStreamPayloadHandler({ - eventSigner: () => Promise.resolve(mockSigner), + messageSigner: () => Promise.resolve(mockMessageSigner), utf8Decoder: mockUtf8Decoder, utf8Encoder: mockUtf8encoder, }); @@ -44,7 +52,7 @@ describe(EventStreamPayloadHandler.name, () => { (mockNextHandler as any).mockImplementationOnce(() => Promise.reject(mockError)); const handler = new EventStreamPayloadHandler({ - eventSigner: () => Promise.resolve(mockSigner), + messageSigner: () => Promise.resolve(mockMessageSigner), utf8Decoder: mockUtf8Decoder, utf8Encoder: mockUtf8encoder, }); @@ -74,7 +82,7 @@ describe(EventStreamPayloadHandler.name, () => { } as any; const handler = new EventStreamPayloadHandler({ - eventSigner: () => Promise.resolve(mockSigner), + messageSigner: () => Promise.resolve(mockMessageSigner), utf8Decoder: mockUtf8Decoder, utf8Encoder: mockUtf8encoder, }); @@ -88,7 +96,7 @@ describe(EventStreamPayloadHandler.name, () => { expect(EventSigningStream).toHaveBeenCalledWith({ priorSignature, eventStreamCodec: expect.anything(), - eventSigner: expect.anything(), + messageSigner: expect.anything(), }); }); @@ -105,7 +113,7 @@ describe(EventStreamPayloadHandler.name, () => { } as any; const handler = new EventStreamPayloadHandler({ - eventSigner: () => Promise.resolve(mockSigner), + messageSigner: () => Promise.resolve(mockMessageSigner), utf8Decoder: mockUtf8Decoder, utf8Encoder: mockUtf8encoder, }); @@ -119,7 +127,7 @@ describe(EventStreamPayloadHandler.name, () => { expect(EventSigningStream).toHaveBeenCalledWith({ priorSignature, eventStreamCodec: expect.anything(), - eventSigner: expect.anything(), + messageSigner: expect.anything(), }); }); @@ -132,7 +140,7 @@ describe(EventStreamPayloadHandler.name, () => { headers: { authorization }, } as any; const handler = new EventStreamPayloadHandler({ - eventSigner: () => Promise.resolve(mockSigner), + messageSigner: () => Promise.resolve(mockMessageSigner), utf8Decoder: mockUtf8Decoder, utf8Encoder: mockUtf8encoder, }); diff --git a/packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts b/packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts index edb4b0cfdcf7..8ae1433035ac 100644 --- a/packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts +++ b/packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts @@ -2,13 +2,13 @@ import { EventStreamCodec } from "@aws-sdk/eventstream-codec"; import { Decoder, Encoder, - EventSigner, EventStreamPayloadHandler as IEventStreamPayloadHandler, FinalizeHandler, FinalizeHandlerArguments, FinalizeHandlerOutput, HandlerExecutionContext, HttpRequest, + MessageSigner, MetadataBearer, Provider, } from "@aws-sdk/types"; @@ -20,14 +20,14 @@ import { EventSigningStream } from "./EventSigningStream"; * @internal */ export interface EventStreamPayloadHandlerOptions { - eventSigner: Provider; + messageSigner: Provider; utf8Encoder: Encoder; utf8Decoder: Decoder; } /** * @internal - * + * * A handler that control the eventstream payload flow: * 1. Pause stream for initial attempt. * 2. Close the stream is attempt fails. @@ -35,11 +35,11 @@ export interface EventStreamPayloadHandlerOptions { * 4. Sign the payload after payload stream starting to flow. */ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler { - private readonly eventSigner: Provider; + private readonly messageSigner: Provider; private readonly eventStreamCodec: EventStreamCodec; constructor(options: EventStreamPayloadHandlerOptions) { - this.eventSigner = options.eventSigner; + this.messageSigner = options.messageSigner; this.eventStreamCodec = new EventStreamCodec(options.utf8Encoder, options.utf8Decoder); } @@ -78,7 +78,7 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler { const signingStream = new EventSigningStream({ priorSignature, eventStreamCodec: this.eventStreamCodec, - eventSigner: await this.eventSigner(), + messageSigner: await this.messageSigner(), }); pipeline(payloadStream, signingStream, request.body, (err: NodeJS.ErrnoException | null) => { diff --git a/packages/eventstream-handler-node/src/provider.ts b/packages/eventstream-handler-node/src/provider.ts index f54783b50914..98bba6e38a1b 100644 --- a/packages/eventstream-handler-node/src/provider.ts +++ b/packages/eventstream-handler-node/src/provider.ts @@ -1,4 +1,11 @@ -import { Decoder, Encoder, EventSigner, EventStreamPayloadHandlerProvider, Provider } from "@aws-sdk/types"; +import { + Decoder, + Encoder, + EventSigner, + EventStreamPayloadHandlerProvider, + MessageSigner, + Provider, +} from "@aws-sdk/types"; import { EventStreamPayloadHandler } from "./EventStreamPayloadHandler"; @@ -6,5 +13,5 @@ import { EventStreamPayloadHandler } from "./EventStreamPayloadHandler"; export const eventStreamPayloadHandlerProvider: EventStreamPayloadHandlerProvider = (options: { utf8Encoder: Encoder; utf8Decoder: Decoder; - eventSigner: Provider; + messageSigner: Provider; }) => new EventStreamPayloadHandler(options); diff --git a/packages/eventstream-serde-universal/src/EventStreamMarshaller.ts b/packages/eventstream-serde-universal/src/EventStreamMarshaller.ts index c1c5ff315a96..7fac6b16e859 100644 --- a/packages/eventstream-serde-universal/src/EventStreamMarshaller.ts +++ b/packages/eventstream-serde-universal/src/EventStreamMarshaller.ts @@ -1,8 +1,14 @@ -import { EventStreamCodec } from "@aws-sdk/eventstream-codec"; +import { + EventStreamCodec, + MessageDecoderStream, + MessageEncoderStream, + SmithyMessageDecoderStream, + SmithyMessageEncoderStream, +} from "@aws-sdk/eventstream-codec"; import { Decoder, Encoder, EventStreamMarshaller as IEventStreamMarshaller, Message } from "@aws-sdk/types"; import { getChunkedStream } from "./getChunkedStream"; -import { getUnmarshalledStream } from "./getUnmarshalledStream"; +import { getMessageUnmarshaller } from "./getUnmarshalledStream"; /** * @internal @@ -33,30 +39,20 @@ export class EventStreamMarshaller { body: AsyncIterable, deserializer: (input: Record) => Promise ): AsyncIterable { - const chunkedStream = getChunkedStream(body); - const unmarshalledStream = getUnmarshalledStream(chunkedStream, { - eventStreamCodec: this.eventStreamCodec, + const inputStream = getChunkedStream(body); + // @ts-expect-error Type 'SmithyMessageDecoderStream>' is not assignable to type 'AsyncIterable' + return new SmithyMessageDecoderStream({ + messageStream: new MessageDecoderStream({ inputStream, decoder: this.eventStreamCodec }), // @ts-expect-error Type 'T' is not assignable to type 'Record' - deserializer, - toUtf8: this.utfEncoder, + deserializer: getMessageUnmarshaller(deserializer, this.utfEncoder), }); - // @ts-expect-error 'T' could be instantiated with an arbitrary type which could be unrelated to 'Record'. - return unmarshalledStream; } - serialize(input: AsyncIterable, serializer: (event: T) => Message): AsyncIterable { - // eslint-disable-next-line @typescript-eslint/no-this-alias - const self = this; - const serializedIterator = async function* () { - for await (const chunk of input) { - const payloadBuf = self.eventStreamCodec.encode(serializer(chunk)); - yield payloadBuf; - } - // Ending frame - yield new Uint8Array(0); - }; - return { - [Symbol.asyncIterator]: serializedIterator, - }; + serialize(inputStream: AsyncIterable, serializer: (event: T) => Message): AsyncIterable { + return new MessageEncoderStream({ + messageStream: new SmithyMessageEncoderStream({ inputStream, serializer }), + encoder: this.eventStreamCodec, + includeEndFrame: true, + }); } } diff --git a/packages/eventstream-serde-universal/src/getUnmarshalledStream.ts b/packages/eventstream-serde-universal/src/getUnmarshalledStream.ts index 81fbd5b9a779..e0d5b83687ea 100644 --- a/packages/eventstream-serde-universal/src/getUnmarshalledStream.ts +++ b/packages/eventstream-serde-universal/src/getUnmarshalledStream.ts @@ -17,40 +17,55 @@ export function getUnmarshalledStream>( source: AsyncIterable, options: UnmarshalledStreamOptions ): AsyncIterable { + const messageUnmarshaller = getMessageUnmarshaller(options.deserializer, options.toUtf8); return { [Symbol.asyncIterator]: async function* () { for await (const chunk of source) { const message = options.eventStreamCodec.decode(chunk); - const { value: messageType } = message.headers[":message-type"]; - if (messageType === "error") { - // Unmodeled exception in event - const unmodeledError = new Error((message.headers[":error-message"].value as string) || "UnknownError"); - unmodeledError.name = message.headers[":error-code"].value as string; - throw unmodeledError; - } else if (messageType === "exception") { - // For modeled exception, push it to deserializer and throw after deserializing - const code = message.headers[":exception-type"].value as string; - const exception = { [code]: message }; - // Get parsed exception event in key(error code) value(structured error) pair. - const deserializedException = await options.deserializer(exception); - if (deserializedException.$unknown) { - //this is an unmodeled exception then try parsing it with best effort - const error = new Error(options.toUtf8(message.body)); - error.name = code; - throw error; - } - throw deserializedException[code]; - } else if (messageType === "event") { - const event = { - [message.headers[":event-type"].value as string]: message, - }; - const deserialized = await options.deserializer(event); - if (deserialized.$unknown) continue; - yield deserialized; - } else { - throw Error(`Unrecognizable event type: ${message.headers[":event-type"].value}`); - } + const type = await messageUnmarshaller(message); + if (type === undefined) continue; + yield type; } }, }; } + +/** + * @internal + */ +export function getMessageUnmarshaller>( + deserializer: (input: Record) => Promise, + toUtf8: Encoder +): (input: Message) => Promise { + return async function (message: Message): Promise { + const { value: messageType } = message.headers[":message-type"]; + if (messageType === "error") { + // Unmodeled exception in event + const unmodeledError = new Error((message.headers[":error-message"].value as string) || "UnknownError"); + unmodeledError.name = message.headers[":error-code"].value as string; + throw unmodeledError; + } else if (messageType === "exception") { + // For modeled exception, push it to deserializer and throw after deserializing + const code = message.headers[":exception-type"].value as string; + const exception = { [code]: message }; + // Get parsed exception event in key(error code) value(structured error) pair. + const deserializedException = await deserializer(exception); + if (deserializedException.$unknown) { + //this is an unmodeled exception then try parsing it with best effort + const error = new Error(toUtf8(message.body)); + error.name = code; + throw error; + } + throw deserializedException[code]; + } else if (messageType === "event") { + const event = { + [message.headers[":event-type"].value as string]: message, + }; + const deserialized = await deserializer(event); + if (deserialized.$unknown) return; + return deserialized; + } else { + throw Error(`Unrecognizable event type: ${message.headers[":event-type"].value}`); + } + }; +} diff --git a/packages/middleware-eventstream/src/eventStreamConfiguration.ts b/packages/middleware-eventstream/src/eventStreamConfiguration.ts index 6df1f1f64987..1f813b0572f9 100644 --- a/packages/middleware-eventstream/src/eventStreamConfiguration.ts +++ b/packages/middleware-eventstream/src/eventStreamConfiguration.ts @@ -32,9 +32,10 @@ export function resolveEventStreamConfig( input: T & PreviouslyResolved & EventStreamInputConfig ): T & EventStreamResolvedConfig { const eventSigner = input.signer; + const messageSigner = input.signer; const eventStreamPayloadHandler = input.eventStreamPayloadHandlerProvider({ ...input, - eventSigner, + messageSigner, }); return { ...input, diff --git a/packages/signature-v4/package.json b/packages/signature-v4/package.json index 75d71bbae4df..ecec9964d448 100644 --- a/packages/signature-v4/package.json +++ b/packages/signature-v4/package.json @@ -28,6 +28,7 @@ "@aws-sdk/util-middleware": "*", "@aws-sdk/util-uri-escape": "*", "@aws-sdk/util-utf8": "*", + "@aws-sdk/eventstream-codec": "*", "tslib": "^2.5.0" }, "devDependencies": { diff --git a/packages/signature-v4/src/SignatureV4.spec.ts b/packages/signature-v4/src/SignatureV4.spec.ts index 142636f8d7d9..3da58981b6db 100644 --- a/packages/signature-v4/src/SignatureV4.spec.ts +++ b/packages/signature-v4/src/SignatureV4.spec.ts @@ -1,6 +1,6 @@ import { Sha256 } from "@aws-crypto/sha256-js"; import { HttpRequest } from "@aws-sdk/protocol-http"; -import { AwsCredentialIdentity } from "@aws-sdk/types"; +import { AwsCredentialIdentity, SignableMessage, TimestampHeaderValue } from "@aws-sdk/types"; import { ALGORITHM_IDENTIFIER, @@ -786,6 +786,45 @@ describe("SignatureV4", () => { }); }); + describe("#sign (message)", () => { + const signerInit = { + service: "SERVICE", + region: "REGION", + credentials: { + accessKeyId: "akid", + secretAccessKey: "secret", + }, + sha256: Sha256, + }; + + it("support message signing", async () => { + const signer = new SignatureV4(signerInit); + + const headers = { + ":date": { + type: "timestamp", + value: new Date("2018-12-29T01:04:06.000Z"), + } as TimestampHeaderValue, + }; + + const signedMessage = await signer.sign( + { + message: { + headers, + body: "foo" as any, + }, + priorSignature: "", + } as SignableMessage, + { + signingDate: new Date(1369353600000), + } + ); + expect(signedMessage.signature).toEqual("204bb5e2713e95354680e9522986d3ac0304aeafd33397f39e6540ca51ffe226"); + expect(signedMessage.message.body).toEqual("foo"); + expect(signedMessage.message.headers).toEqual(headers); + }); + }); + describe("ambient Date usage", () => { let dateSpy; const mockDate = new Date(); diff --git a/packages/signature-v4/src/SignatureV4.ts b/packages/signature-v4/src/SignatureV4.ts index f89424971d3a..4c984737a3e8 100644 --- a/packages/signature-v4/src/SignatureV4.ts +++ b/packages/signature-v4/src/SignatureV4.ts @@ -1,3 +1,4 @@ +import { HeaderMarshaller } from "@aws-sdk/eventstream-codec"; import { AwsCredentialIdentity, ChecksumConstructor, @@ -8,17 +9,20 @@ import { HashConstructor, HeaderBag, HttpRequest, + MessageSigner, Provider, RequestPresigner, RequestPresigningArguments, RequestSigner, RequestSigningArguments, + SignableMessage, + SignedMessage, SigningArguments, StringSigner, } from "@aws-sdk/types"; import { toHex } from "@aws-sdk/util-hex-encoding"; import { normalizeProvider } from "@aws-sdk/util-middleware"; -import { toUint8Array } from "@aws-sdk/util-utf8"; +import { fromUtf8, toUint8Array, toUtf8 } from "@aws-sdk/util-utf8"; import { ALGORITHM_IDENTIFIER, @@ -93,13 +97,14 @@ export interface SignatureV4CryptoInit { sha256: ChecksumConstructor | HashConstructor; } -export class SignatureV4 implements RequestPresigner, RequestSigner, StringSigner, EventSigner { +export class SignatureV4 implements RequestPresigner, RequestSigner, StringSigner, EventSigner, MessageSigner { private readonly service: string; private readonly regionProvider: Provider; private readonly credentialProvider: Provider; private readonly sha256: ChecksumConstructor | HashConstructor; private readonly uriEscapePath: boolean; private readonly applyChecksum: boolean; + private readonly headerMarshaller = new HeaderMarshaller(toUtf8, fromUtf8); constructor({ applyChecksum, @@ -165,12 +170,15 @@ export class SignatureV4 implements RequestPresigner, RequestSigner, StringSigne public async sign(stringToSign: string, options?: SigningArguments): Promise; public async sign(event: FormattedEvent, options: EventSigningArguments): Promise; + public async sign(event: SignableMessage, options: SigningArguments): Promise; public async sign(requestToSign: HttpRequest, options?: RequestSigningArguments): Promise; public async sign(toSign: any, options: any): Promise { if (typeof toSign === "string") { return this.signString(toSign, options); } else if (toSign.headers && toSign.payload) { return this.signEvent(toSign, options); + } else if (toSign.message) { + return this.signMessage(toSign, options); } else { return this.signRequest(toSign, options); } @@ -198,6 +206,28 @@ export class SignatureV4 implements RequestPresigner, RequestSigner, StringSigne return this.signString(stringToSign, { signingDate, signingRegion: region, signingService }); } + async signMessage( + signableMessage: SignableMessage, + { signingDate = new Date(), signingRegion, signingService }: SigningArguments + ): Promise { + const promise = this.signEvent( + { + headers: this.headerMarshaller.format(signableMessage.message.headers), + payload: signableMessage.message.body, + }, + { + signingDate, + signingRegion, + signingService, + priorSignature: signableMessage.priorSignature, + } + ); + + return promise.then((signature) => { + return { message: signableMessage.message, signature }; + }); + } + private async signString( stringToSign: string, { signingDate = new Date(), signingRegion, signingService }: SigningArguments = {} diff --git a/packages/types/src/encode.ts b/packages/types/src/encode.ts new file mode 100644 index 000000000000..2f0754a10449 --- /dev/null +++ b/packages/types/src/encode.ts @@ -0,0 +1,24 @@ +import { Message } from "./eventStream"; + +export interface MessageEncoder { + encode(message: Message): Uint8Array; +} + +export interface MessageDecoder { + decode(message: ArrayBufferView): Message; + feed(message: ArrayBufferView): void; + endOfStream(): void; + getMessage(): AvailableMessage; + getAvailableMessages(): AvailableMessages; +} + +export interface AvailableMessage { + getMessage(): Message | undefined; + + isEndOfStream(): boolean; +} + +export interface AvailableMessages { + getMessages(): Message[]; + isEndOfStream(): boolean; +} diff --git a/packages/types/src/eventStream.ts b/packages/types/src/eventStream.ts index f63a7f9d956a..ae1400e3ca19 100644 --- a/packages/types/src/eventStream.ts +++ b/packages/types/src/eventStream.ts @@ -23,77 +23,17 @@ export interface Message { */ export type MessageHeaders = Record; -/** - * @public - */ -export interface BooleanHeaderValue { - type: "boolean"; - value: boolean; -} - -/** - * @public - */ -export interface ByteHeaderValue { - type: "byte"; - value: number; -} - -/** - * @public - */ -export interface ShortHeaderValue { - type: "short"; - value: number; -} - -/** - * @public - */ -export interface IntegerHeaderValue { - type: "integer"; - value: number; -} - -/** - * @public - */ -export interface LongHeaderValue { - type: "long"; - value: Int64; -} - -/** - * @public - */ -export interface BinaryHeaderValue { - type: "binary"; - value: Uint8Array; -} - -/** - * @public - */ -export interface StringHeaderValue { - type: "string"; - value: string; -} - -/** - * @public - */ -export interface TimestampHeaderValue { - type: "timestamp"; - value: Date; -} - -/** - * @public - */ -export interface UuidHeaderValue { - type: "uuid"; - value: string; -} +type HeaderValue = { type: K; value: V }; + +export type BooleanHeaderValue = HeaderValue<"boolean", boolean>; +export type ByteHeaderValue = HeaderValue<"byte", number>; +export type ShortHeaderValue = HeaderValue<"short", number>; +export type IntegerHeaderValue = HeaderValue<"integer", number>; +export type LongHeaderValue = HeaderValue<"long", Int64>; +export type BinaryHeaderValue = HeaderValue<"binary", Uint8Array>; +export type StringHeaderValue = HeaderValue<"string", string>; +export type TimestampHeaderValue = HeaderValue<"timestamp", Date>; +export type UuidHeaderValue = HeaderValue<"uuid", string>; /** * @public diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 17e9f0010e55..ce1e6cf85815 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -7,6 +7,7 @@ export * from "./connection"; export * from "./credentials"; export * from "./crypto"; export * from "./dns"; +export * from "./encode"; export * from "./endpoint"; export * from "./eventStream"; export * from "./http"; diff --git a/packages/types/src/signature.ts b/packages/types/src/signature.ts index 4edc59e97a2c..f1a2b3f89f37 100644 --- a/packages/types/src/signature.ts +++ b/packages/types/src/signature.ts @@ -1,3 +1,4 @@ +import { Message } from "./eventStream"; import { HttpRequest } from "./http"; /** @@ -139,3 +140,29 @@ export interface EventSigner { */ sign(event: FormattedEvent, options: EventSigningArguments): Promise; } + +/** + * @public + */ +export interface SignableMessage { + message: Message; + + priorSignature: string; +} + +/** + * @public + */ +export interface SignedMessage { + message: Message; + + signature: string; +} + +/** + * @public + */ +export interface MessageSigner { + signMessage(message: SignableMessage, args: SigningArguments): Promise; + sign(event: SignableMessage, options: SigningArguments): Promise; +}