Skip to content
This repository has been archived by the owner on Oct 2, 2024. It is now read-only.

feat: add jetstream push subscribe wrappers #481

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions components/channel/jetstreamPushSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { realizeChannelName, camelCase, getMessageType, messageHasNullPayload, realizeParametersForChannelWrapper, renderJSDocParameters} from '../../utils/index';
import { unwrap } from './ChannelParameterUnwrap';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';

/**
* Component which returns a function which subscribes to the given channel
*
* @param {string} defaultContentType
* @param {string} channelName to subscribe to
* @param {Message} message which is being received
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPushSubscription(channelName, message, channelParameters) {
const messageType = getMessageType(message);
let parameters = [];
parameters = Object.entries(channelParameters).map(([parameterName]) => {
return `${camelCase(parameterName)}Param`;
});
const hasNullPayload = messageHasNullPayload(message.payload());

//Determine the callback process when receiving messages.
//If the message payload is null no hooks are called to process the received data.
let whenReceivingMessage = `onDataCallback(undefined, null ${parameters.length > 0 && `, ${parameters.join(',')}`});`;
if (!hasNullPayload) {
whenReceivingMessage = `
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, ${messageType}.unmarshal(receivedData) ${parameters.length > 0 && `, ${parameters.join(',')}`});
`;
}

return `

/**
* Internal functionality to setup jetstream push subscription on the \`${channelName}\` channel
*
* @param onDataCallback to call when messages are received
* @param nc to subscribe with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
export function jetStreamPushSubscribe(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg?: ${messageType}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any >
${realizeParametersForChannelWrapper(channelParameters)},
options: Nats.ConsumerOptsBuilder | Partial<Nats.ConsumerOpts>
): Promise < Nats.JetStreamSubscription > {
return new Promise(async (resolve, reject) => {
try {
let subscription = js.subscribe(${realizeChannelName(channelParameters, channelName)}, options);
(async () => {
for await (const msg of await subscription) {
${unwrap(channelName, channelParameters)}

${whenReceivingMessage}
}
console.log("subscription closed");
})();
resolve(subscription);
} catch (e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
`;
}
55 changes: 55 additions & 0 deletions components/index/jetstreamPushSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { pascalCase, camelCase, getMessageType, realizeParametersForChannelWrapper, realizeParametersForChannelWithoutType, renderJSDocParameters} from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';

/**
* Component which returns a subscribe to function for the client
*
* @param {string} defaultContentType
* @param {string} channelName to publish to
* @param {Message} message which is being received
* @param {string} messageDescription
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPushSubscription(channelName, message, messageDescription, channelParameters) {
return `
/**
* Push subscription to the \`${channelName}\`
*
* ${messageDescription}
*
* @param onDataCallback to call when messages are received
${renderJSDocParameters(channelParameters)}
* @param flush ensure client is force flushed after subscribing
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPushSubscribeTo${pascalCase(channelName)}(
onDataCallback: (
err?: NatsTypescriptTemplateError,
msg?: ${getMessageType(message)}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void
${realizeParametersForChannelWrapper(channelParameters)},
options: Nats.ConsumerOptsBuilder | Partial<Nats.ConsumerOpts>
): Promise<Nats.JetStreamSubscription> {
return new Promise(async (resolve, reject) => {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
try {
const sub = await ${camelCase(channelName)}Channel.jetStreamPushSubscribe(
onDataCallback,
this.js,
this.codec,
${Object.keys(channelParameters).length ? `${realizeParametersForChannelWithoutType(channelParameters)},` : ''}
options
);
resolve(sub);
} catch (e: any) {
reject(e);
}
} else {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED));
}
});
}
`;
}
17 changes: 17 additions & 0 deletions examples/simple-publish/asyncapi-nats-client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ The test/mirror client which is the reverse to the normal NatsAsyncApiClient.
* [.connectToLocal()](#NatsAsyncApiTestClient+connectToLocal)
* [.subscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+subscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPullStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiTestClient+jetStreamPullStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon)

<a name="NatsAsyncApiTestClient+connect"></a>

Expand Down Expand Up @@ -296,3 +297,19 @@ Channel for the turn on command which should turn on the streetlight
| streetlight_id | parameter to use in topic |
| options | to pull message with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiTestClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon"></a>

### natsAsyncApiTestClient.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)
Push subscription to the `streetlight/{streetlight_id}/command/turnon`

Channel for the turn on command which should turn on the streetlight

**Kind**: instance method of [<code>NatsAsyncApiTestClient</code>](#NatsAsyncApiTestClient)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| streetlight_id | parameter to use in topic |
| flush | ensure client is force flushed after subscribing |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,40 @@ export class NatsAsyncApiTestClient {
throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED);
}
}
/**
* Push subscription to the `streetlight/{streetlight_id}/command/turnon`
*
* Channel for the turn on command which should turn on the streetlight
*
* @param onDataCallback to call when messages are received
* @param streetlight_id parameter to use in topic
* @param flush ensure client is force flushed after subscribing
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string,
options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts >
): Promise < Nats.JetStreamSubscription > {
return new Promise(async (resolve, reject) => {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
try {
const sub = await streetlightStreetlightIdCommandTurnonChannel.jetStreamPushSubscribe(
onDataCallback,
this.js,
this.codec,
streetlight_id,
options
);
resolve(sub);
} catch (e: any) {
reject(e);
}
} else {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,48 @@ export function jetStreamPull(
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam, msg);
})();
}
/**
* Internal functionality to setup jetstream push subscription on the `streetlight/{streetlight_id}/command/turnon` channel
*
* @param onDataCallback to call when messages are received
* @param nc to subscribe with
* @param codec used to convert messages
* @param streetlight_id parameter to use in topic
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
export function jetStreamPushSubscribe(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any > , streetlight_id: string,
options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts >
): Promise < Nats.JetStreamSubscription > {
return new Promise(async (resolve, reject) => {
try {
let subscription = js.subscribe(`streetlight.${streetlight_id}.command.turnon`, options);
(async () => {
for await (const msg of await subscription) {
const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`;
let channel = msg.subject;
const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}");
const splits = [
streetlightIdSplit[0],
streetlightIdSplit[1]
];
channel = channel.substring(splits[0].length);
const streetlightIdEnd = channel.indexOf(splits[1]);
const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd);
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam);
}
console.log("subscription closed");
})();
resolve(subscription);
} catch (e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
33 changes: 33 additions & 0 deletions examples/simple-subscribe/asyncapi-nats-client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Module which wraps functionality for the `streetlight/{streetlight_id}/command/t
* [streetlightStreetlightIdCommandTurnon](#module_streetlightStreetlightIdCommandTurnon)
* [~subscribe(onDataCallback, nc, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..subscribe)
* [~jetStreamPull(onDataCallback, js, codec, streetlight_id)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPull)
* [~jetStreamPushSubscribe(onDataCallback, nc, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPushSubscribe)

<a name="module_streetlightStreetlightIdCommandTurnon..subscribe"></a>

Expand Down Expand Up @@ -58,6 +59,21 @@ Internal functionality to setup jetstrema pull on the `streetlight/{streetlight_
| codec | used to convert messages |
| streetlight_id | parameter to use in topic |

<a name="module_streetlightStreetlightIdCommandTurnon..jetStreamPushSubscribe"></a>

### streetlightStreetlightIdCommandTurnon~jetStreamPushSubscribe(onDataCallback, nc, codec, streetlight_id, options)
Internal functionality to setup jetstream push subscription on the `streetlight/{streetlight_id}/command/turnon` channel

**Kind**: inner method of [<code>streetlightStreetlightIdCommandTurnon</code>](#module_streetlightStreetlightIdCommandTurnon)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| nc | to subscribe with |
| codec | used to convert messages |
| streetlight_id | parameter to use in topic |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiClient"></a>

## NatsAsyncApiClient
Expand All @@ -77,6 +93,7 @@ The generated client based on your AsyncAPI document.
* [.connectToLocal()](#NatsAsyncApiClient+connectToLocal)
* [.subscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+subscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPullStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiClient+jetStreamPullStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon)

<a name="NatsAsyncApiClient+connect"></a>

Expand Down Expand Up @@ -178,6 +195,22 @@ Channel for the turn on command which should turn on the streetlight
| streetlight_id | parameter to use in topic |
| options | to pull message with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon"></a>

### natsAsyncApiClient.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)
Push subscription to the `streetlight/{streetlight_id}/command/turnon`

Channel for the turn on command which should turn on the streetlight

**Kind**: instance method of [<code>NatsAsyncApiClient</code>](#NatsAsyncApiClient)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| streetlight_id | parameter to use in topic |
| flush | ensure client is force flushed after subscribing |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiTestClient"></a>

## NatsAsyncApiTestClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,48 @@ export function jetStreamPull(
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam, msg);
})();
}
/**
* Internal functionality to setup jetstream push subscription on the `streetlight/{streetlight_id}/command/turnon` channel
*
* @param onDataCallback to call when messages are received
* @param nc to subscribe with
* @param codec used to convert messages
* @param streetlight_id parameter to use in topic
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
export function jetStreamPushSubscribe(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any > , streetlight_id: string,
options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts >
): Promise < Nats.JetStreamSubscription > {
return new Promise(async (resolve, reject) => {
try {
let subscription = js.subscribe(`streetlight.${streetlight_id}.command.turnon`, options);
(async () => {
for await (const msg of await subscription) {
const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`;
let channel = msg.subject;
const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}");
const splits = [
streetlightIdSplit[0],
streetlightIdSplit[1]
];
channel = channel.substring(splits[0].length);
const streetlightIdEnd = channel.indexOf(splits[1]);
const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd);
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam);
}
console.log("subscription closed");
})();
resolve(subscription);
} catch (e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
Loading