Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cdmbase committed Oct 15, 2017
1 parent 4ff1df5 commit 4f8951d
Show file tree
Hide file tree
Showing 9 changed files with 2,712 additions and 1,362 deletions.
3,810 changes: 2,574 additions & 1,236 deletions package-lock.json

Large diffs are not rendered by default.

28 changes: 16 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "graphql-nats-subscriptions",
"version": "1.0.6",
"version": "1.1.0",
"description": "",
"main": "dist/index.js",
"scripts": {
Expand Down Expand Up @@ -39,24 +39,28 @@
"testRegex": "/__tests__/.*test*\\.(ts|tsx|js)$"
},
"dependencies": {
"graphql-subscriptions": "^0.4.4",
"graphql-subscriptions": "^0.5.4",
"lodash": "^4.17.4",
"nats": "^0.7.20"
},
"devDependencies": {
"@cdm-logger/server": "^3.0.2",
"@types/bunyan": "^1.8.0",
"@types/graphql": "^0.10.0",
"@types/jest": "^20.0.7",
"@types/bunyan": "^1.8.3",
"@types/graphql": "^0.11.5",
"@types/jest": "^21.1.2",
"@types/node": "^8.0.0",
"graphql": "^0.10.5",
"iterall": "^1.1.1",
"jest": "^20.0.4",
"jest-cli": "^20.0.4",
"graphql": "^0.11.7",
"iterall": "^1.1.3",
"jest": "^21.2.1",
"jest-cli": "^21.2.1",
"simple-mock": "^0.8.0",
"ts-jest": "^20.0.10",
"ts-jest": "^21.1.2",
"ts-node": "^3.3.0",
"tslint": "^5.2.0",
"typescript": "2.3.4"
"tslint": "^5.7.0",
"typescript": "2.5.3"
},
"typings": "dist/index.d.ts",
"typescript": {
"definition": "dist/index.d.ts"
}
}
4 changes: 2 additions & 2 deletions src/__tests__/async-iterator-subscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
parse,
GraphQLSchema,
GraphQLObjectType,
GraphQLString,
GraphQLString, ExecutionResult,
} from 'graphql';
import { withFilter } from 'graphql-subscriptions';
import { subscribe } from 'graphql/subscription';
Expand Down Expand Up @@ -55,7 +55,7 @@ describe('GraphQL-JS asyncIterator', () => {
const orig2Iterator = pubsub.asyncIterator('TEST123');

const schema = buildSchema(origIterator);
const results = subscribe(schema, query);
const results: AsyncIterator<ExecutionResult> = subscribe(schema, query);
const payload1 = results.next();

expect(isAsyncIterable(results)).toBeTruthy();
Expand Down
93 changes: 93 additions & 0 deletions src/__tests__/async-iterator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { NatsPubSub } from '../nats-pubsub';
import { logger } from './logger';
import { isAsyncIterable } from 'iterall';
import 'jest';
const customLogger = logger.child({ className: '---------------async-iterator.test------------' });
describe('AsyncIterator', () => {
// it('should expose valid asyncIterator for a specific event', () => {
// const eventName = 'test1';
// const ps = new NatsPubSub({ logger });
// const iterator = ps.asyncIterator(eventName);
// expect(iterator).toBeDefined();
// expect(isAsyncIterable(iterator)).toBeTruthy();
// });

// it('should trigger event on asyncIterator when published', done => {
// const eventName = 'test2';
// const ps = new NatsPubSub({ logger });
// const iterator = ps.asyncIterator(eventName);

// iterator.next().then(result => {
// customLogger.trace('result: (%j)', result);
// try {
// expect(result).toBeDefined();
// expect(result.value).toBeDefined();
// expect(result.done).toBeDefined();
// done();
// } catch (e) {
// done.fail(e);
// }

// });
// ps.publish(eventName, { test: true });
// });

// it('should not trigger event on asyncIterator when publishing other event', () => {
// const eventName = 'test3';
// const ps = new NatsPubSub({ logger });
// const iterator = ps.asyncIterator('test');
// const spy = jest.fn();
// iterator.next().then(spy);
// ps.publish(eventName, { test: true });
// expect(spy).not.toBeCalled();
// });

// it('register to multiple events', done => {
// const eventName = 'test4';
// const ps = new NatsPubSub({ logger });
// const iterator = ps.asyncIterator(['test', 'test2']);
// const spy = jest.fn();

// iterator.next().then(() => {
// spy();
// expect(spy).toHaveBeenCalled();
// done();
// });
// ps.publish(eventName, { test: true });
// });

it('should not trigger event on asyncIterator already returned', done => {
const eventName = 'test5';
const ps = new NatsPubSub({ logger });
const iterator = ps.asyncIterator(eventName);

iterator.next().then(result => {
customLogger.trace('result: (%j)', result);

try {
expect(result).toBeDefined();
expect(result.value).toBeDefined();
expect(result.done).toBeFalsy();
} catch (e) {
done.fail(e);
}
});

ps.publish(eventName, { test: 'word' });

iterator.next().then(result => {
customLogger.trace('result: (%j)', result);
try {
expect(result).toBeDefined();
expect(result.value).not.toBeDefined();
expect(result.done).toBeTruthy();
done();
} catch (e) {
done.fail(e);
}
});

iterator.return();
ps.publish(eventName, { test: true });
});
});
93 changes: 0 additions & 93 deletions src/__tests__/async-iterator.testNOT.ts

This file was deleted.

File renamed without changes.
17 changes: 9 additions & 8 deletions src/nats-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class NatsPubSub implements PubSubEngine {
return true;
}

public subscribe(trigger: string, onMessage: Function, options?: object): Promise<number> {
public async subscribe(trigger: string, onMessage: Function, options?: object): Promise<number> {
this.logger.trace("subscribing to queue '%s' with onMessage (%j), and options (%j) ",
trigger, onMessage, options);

Expand All @@ -93,20 +93,21 @@ export class NatsPubSub implements PubSubEngine {
this.logger.trace('relavent topic (%s) is already subscribed', triggerName);
const newRefs = [...refs, id];
this.subsRefsMap[triggerName] = newRefs;
return Promise.resolve(id);
return await id;
} else {
return new Promise<number>((resolve, reject) => {
// return new Promise<number>((resolve, reject) => {
this.logger.trace('topic (%s) is new and yet to be subscribed', triggerName);
// 1. Resolve options object
this.subscribeOptionsResolver(trigger, options).then(subscriptionOptions => {
// this.subscribeOptionsResolver(trigger, options).then(subscriptionOptions => {
this.logger.trace('resolve subscriptionoptions with options (%j)', options);
// 2. Subscribing using NATS
const subId = this.natsConnection.subscribe(triggerName, (msg) => this.onMessage(triggerName, msg));
this.subsRefsMap[triggerName] = [...(this.subsRefsMap[triggerName] || []), id];
this.natsSubMap[triggerName] = subId;
resolve(id);
});
});
return await id;
// });
// });

}
}

Expand Down Expand Up @@ -159,7 +160,7 @@ export class NatsPubSub implements PubSubEngine {

for (const subId of subscribers) {
const listener = this.subscriptionMap[subId][1];
this.logger.trace('subscription listener to run for subId (%s) and listener (%j)', subId, listener);
this.logger.trace('subscription listener to run for subId (%s)', subId);
listener(parsedMessage);
}
}
Expand Down
26 changes: 17 additions & 9 deletions src/pubsub-async-iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
private logger: Logger;

constructor(pubsub: PubSubEngine, eventNames: string | string[], logger?: Logger) {
this.logger = logger.child({ className: 'pubsub-async-iterator' });
this.pubsub = pubsub;
this.logger = logger.child({className: 'pubsub-async-iterator'});
this.pullQueue = [];
this.pushQueue = [];
this.listening = true;
Expand All @@ -51,13 +51,13 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
}

public async next() {
this.logger.trace('calling next pullQueue: (%j) pushQueue: (%j)', this.pullQueue, this.pushQueue);
this.logger.trace('next has been called, current state [ pullQueue: (%j) pushQueue: (%j)]', this.pullQueue, this.pushQueue);
await this.allSubscribed;
return this.listening ? this.pullValue() : this.return();
}

public async return() {
this.logger.trace('calling return');
this.logger.trace('calling [return]');
this.emptyQueue(await this.allSubscribed);
return { value: undefined, done: true };
}
Expand All @@ -69,32 +69,39 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
}

public [$$asyncIterator]() {
this.logger.trace('[$$asyncIterator]');
return this;
}

private async pushValue(event) {
this.logger.trace('pushing event (%j) into queue', event);
this.logger.trace('[pushValue] with event (%j)', event);
await this.allSubscribed;
if (this.pullQueue.length !== 0) {
this.logger.trace('pull event (%j) from pullQueue (%j)', event, this.pullQueue);
this.pullQueue.shift()({ value: event, done: false });
} else {
this.pushQueue.push(event);
this.logger.trace('push event (%j) to pushQueue (%j)', event, this.pullQueue);
}
}

private pullValue() {
this.logger.trace('pulling event from queue (%j)', this.pushQueue);
private pullValue(): Promise<IteratorResult<any>> {
this.logger.trace('[pullValue] ');
return new Promise((resolve => {
if (this.pushQueue.length !== 0) {
this.logger.trace('pluck last event from pushQueue (%j)', this.pushQueue);
resolve({ value: this.pushQueue.shift(), done: false });
} else {
this.pullQueue.push(resolve);
this.logger.trace('push Promise.resolve into pullQueue (%j)', this.pullQueue);
}
}).bind(this));
}));
}

private emptyQueue(subscriptionIds: number[]) {
this.logger.trace('[emptyQueue] ');
if (this.listening) {
this.logger.trace('listening is true, it will unsubscribeAll, will empty all elements in pullQueue (%j)', this.pullQueue);
this.listening = false;
this.unsubscribeAll(subscriptionIds);
this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
Expand All @@ -104,16 +111,17 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
}

private subscribeAll() {
this.logger.trace('[subscribeAll] ');
return Promise.all(this.eventsArray.map(
eventName => {
this.logger.trace('subscribing to eventName (%j), pushValue: (%j)', eventName, this.pushValue);
this.logger.trace('subscribing to eventName (%j) with onMessage as this.pushValue', eventName);
return this.pubsub.subscribe(eventName, this.pushValue.bind(this), {});
},
));
}

private unsubscribeAll(subscriptionIds: number[]) {
this.logger.trace('unsubscribed to all subIds (%j)', subscriptionIds);
this.logger.trace('unsubscribeAll to all subIds (%j)', subscriptionIds);
for (const subscriptionId of subscriptionIds) {
this.pubsub.unsubscribe(subscriptionId);
}
Expand Down
3 changes: 1 addition & 2 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
"noLib": false,
"preserveConstEnums": false,
"moduleResolution": "node",
// "sourceMap": true,
"inlineSourceMap": true,
"sourceMap": true,
"rootDir": "./src",
"outDir": "./dist",
"allowSyntheticDefaultImports": true,
Expand Down

0 comments on commit 4f8951d

Please sign in to comment.