Skip to content

Commit

Permalink
asynciterator still shows value instead of event
Browse files Browse the repository at this point in the history
  • Loading branch information
cdmbase committed Aug 24, 2017
1 parent 265f4e6 commit 249b666
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('GraphQL-JS asyncIterator', () => {
const results = subscribe(schema, query);
const payload1 = results.next();

expect(isAsyncIterable(results)).toBeTruthy;
expect(isAsyncIterable(results)).toBeTruthy();

const r = payload1.then(res => {
expect(res.value.data.testSubscription).toEqual('FIRST_EVENT');
Expand Down
54 changes: 54 additions & 0 deletions src/__tests__/async-iterator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { NatsPubSub } from '../nats-pubsub';
import { logger } from './logger';
import { isAsyncIterable } from 'iterall';
import 'jest';
const customLogger = logger.child({ className: '---------------async-iterator.test------------' });
describe('AsyncIterator', () => {
it('should expose valid asyncIterator for a specific event', () => {
const eventName = 'test1';
const ps = new NatsPubSub({ logger });
const iterator = ps.asyncIterator(eventName);
expect(iterator).toBeDefined();
expect(isAsyncIterable(iterator)).toBeTruthy();
});

it('should trigger event on asyncIterator when published', done => {
const eventName = 'test2';
const ps = new NatsPubSub({ logger });
const iterator = ps.asyncIterator(eventName);

iterator.next().then(result => {
customLogger.trace('result: (%j)', result);
expect(result).toBeDefined();
expect(result.value).toBeDefined();
expect(result.done).toBeDefined();
done();
});
ps.publish(eventName, { test: true });
});

it('should not trigger event on asyncIterator when publishing other event', () => {
const eventName = 'test3';
const ps = new NatsPubSub({ logger });
const iterator = ps.asyncIterator('test');
const spy = jest.fn();
iterator.next().then(spy);
ps.publish(eventName, { test: true });
expect(spy).not.toBeCalled();
});

it('register to multiple events', done => {
const eventName = 'test4';
const ps = new NatsPubSub({ logger });
const iterator = ps.asyncIterator(['test', 'test2']);
const spy = jest.fn();

iterator.next().then(() => {
spy();
expect(spy).toHaveBeenCalled();
done();
});
ps.publish(eventName, { test: true });
});
});

Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ describe('SubscriptionManager', function () {
});
});

it('can subscribe with a nameless query and gets a subId back', function () {
const query = 'subscription { testSubscription }';
const callback = () => null;
subManager.subscribe({ query, operationName: 'X', callback }).then(subId => {
expect(typeof subId).toBe('number');
subManager.unsubscribe(subId);
});
});

it('can subscribe with a valid query and get the root value', (done) => {
const query = 'subscription X{ testSubscription}';
const callback = function (err, payload) {
Expand Down Expand Up @@ -189,6 +198,34 @@ describe('SubscriptionManager', function () {
}, 20);
});
});
it('can use a filter function that returns a promise', function (done) {
const query = `subscription Filter2($filterBoolean: Boolean){
testFilter(filterBoolean: $filterBoolean)
}`;
const callback = function (err, payload) {
if (err) {
done.fail(err);
return;
}
try {
expect(payload.data.testFilter).toBe('goodFilter');
} catch (e) {
done.fail(e);
return;
}
done();
};
subManager.subscribe({
query,
operationName: 'Filter2',
variables: { filterBoolean: true },
callback,
}).then(subId => {
subManager.publish('Filter2', { filterBoolean: false });
subManager.publish('Filter2', { filterBoolean: true });
subManager.unsubscribe(subId);
});
});
it('can subscribe to more than one trigger', function (done) {
// I also used this for testing arg parsing (with console.log)
// args a and b can safely be removed.
Expand Down Expand Up @@ -301,3 +338,4 @@ describe('SubscriptionManager', function () {

});
});

29 changes: 29 additions & 0 deletions src/__tests__/pusub.testNOT.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { NatsPubSub } from '../nats-pubsub';
import { logger } from './logger';


describe('PubSub', function () {
it('can subscribe and is called when events happen', function (done) {
const ps = new NatsPubSub({ logger });
ps.subscribe('a', payload => {
expect(payload).toBe('test');
done();
}).then(() => {
const succeed = ps.publish('a', 'test');
expect(succeed).toBeTruthy();
});
});

it('can unsubscribe', function (done) {
const ps = new NatsPubSub({ logger });
ps.subscribe('a', payload => {
expect(payload).toBe('test');
}).then(subId => {
ps.unsubscribe(subId);
const succeed = ps.publish('a', 'test');
expect(succeed).toBeTruthy(); // True because publish success is not
// indicated by trigger having subscriptions
done(); // works because pubsub is synchronous
});
});
});
20 changes: 11 additions & 9 deletions src/nats-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ export class NatsPubSub implements PubSubEngine {
}

public publish(trigger: string, payload: any): boolean {
this.logger && this.logger.trace("publishing for queue '%s' (%j)",
this.logger.trace("publishing to queue '%s' (%j)",
trigger, payload);
const message = Buffer.from(JSON.stringify(payload), this.parseMessageWithEncoding);
this.natsConnection.publish(trigger, message);
return true;
}

public subscribe(trigger: string, onMessage: Function, options?: object): Promise<number> {
this.logger && this.logger.trace("subscribing to queue '%s' with onMessage (%j), and options (%j) ",
this.logger.trace("subscribing to queue '%s' with onMessage (%j), and options (%j) ",
trigger, onMessage, options);

const triggerName: string = this.triggerTransform(trigger, options);
Expand All @@ -90,13 +90,13 @@ export class NatsPubSub implements PubSubEngine {

let refs = this.subsRefsMap[triggerName];
if (refs && refs.length > 0) {
this.logger.trace('refs already exist calling it (%j)', refs);
this.logger.trace('relavent topic (%s) is already subscribed', triggerName);
const newRefs = [...refs, id];
this.subsRefsMap[triggerName] = newRefs;
this.logger.trace('returing subId (%s)', id);
return Promise.resolve(id);
} else {
return new Promise<number>((resolve, reject) => {
this.logger.trace('topic (%s) is new and yet to be subscribed', triggerName);
// 1. Resolve options object
this.subscribeOptionsResolver(trigger, options).then(subscriptionOptions => {
this.logger.trace('resolve subscriptionoptions with options (%j)', options);
Expand All @@ -112,31 +112,32 @@ export class NatsPubSub implements PubSubEngine {

public unsubscribe(subId: number) {
const [triggerName = null] = this.subscriptionMap[subId] || [];
this.logger.trace('dumping saved subsMappers (%j)', this.subsRefsMap);
const refs = this.subsRefsMap[triggerName];
const natsSubId = this.natsSubMap[triggerName];
this.logger && this.logger.trace("unsubscribing to queue '%s' and natsSid: (%s)", subId, natsSubId);
this.logger.trace("unsubscribing to queue '%s' and natsSid: (%s)", subId, natsSubId);
if (!refs) {
this.logger.error('there are no subscriptions for triggerName (%s) and natsSid (%s)', triggerName, natsSubId);
throw new Error(`There is no subscription of id "${subId}"`);
}
if (refs.length === 1) {
this.logger.trace("unsubscribing and there won't be any subscriber");
this.natsConnection.unsubscribe(natsSubId);
delete this.natsSubMap[triggerName];
delete this.subsRefsMap[triggerName];
this.logger.trace('unsubscribe on nats for subId (%s) is completed', natsSubId);
this.logger.trace('unsubscribe on nats for subId (%s) is completed and there is no subscriber to topic (%s)',
natsSubId, triggerName);
} else {
const index = refs.indexOf(subId);
const newRefs = index === -1 ? refs : [...refs.slice(0, index), ...refs.slice(index + 1)];
this.subsRefsMap[triggerName] = newRefs;
this.logger.trace('unsubscribe on nats for subId (%s) is completed and there are still (%s) subscribers', newRefs.length);
}

delete this.subscriptionMap[subId];
}

public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers);
this.logger.trace('asyncIterator called with trigger (%j)', triggers);
return new PubSubAsyncIterator<T>(this, triggers, this.logger);
}

private onMessage(topic: string, message: Buffer) {
Expand All @@ -158,6 +159,7 @@ export class NatsPubSub implements PubSubEngine {

for (const subId of subscribers) {
const listener = this.subscriptionMap[subId][1];
this.logger.trace('subscription listener to run for subId (%s) and listener (%j)', subId, listener);
listener(parsedMessage);
}
}
Expand Down
34 changes: 22 additions & 12 deletions src/pubsub-async-iterator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { $$asyncIterator } from 'iterall';
import { PubSubEngine } from 'graphql-subscriptions/dist/pubsub-engine';

import * as Logger from 'bunyan';
/**
* A class for digesting PubSubEngine events via the new AsyncIterator interface.
* This implementation is a generic version of the one located at
Expand Down Expand Up @@ -32,8 +32,17 @@ import { PubSubEngine } from 'graphql-subscriptions/dist/pubsub-engine';
*/
export class PubSubAsyncIterator<T> implements AsyncIterator<T> {

constructor(pubsub: PubSubEngine, eventNames: string | string[]) {
private pullQueue: Function[];
private pushQueue: any[];
private eventsArray: string[];
private allSubscribed: Promise<number[]>;
private listening: boolean;
private pubsub: PubSubEngine;
private logger: Logger;

constructor(pubsub: PubSubEngine, eventNames: string | string[], logger?: Logger) {
this.pubsub = pubsub;
this.logger = logger.child({className: 'pubsub-async-iterator'});
this.pullQueue = [];
this.pushQueue = [];
this.listening = true;
Expand All @@ -42,16 +51,19 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
}

public async next() {
this.logger.trace('calling next pullQueue: (%j) pushQueue: (%j)', this.pullQueue, this.pushQueue);
await this.allSubscribed;
return this.listening ? this.pullValue() : this.return();
}

public async return() {
this.logger.trace('calling return');
this.emptyQueue(await this.allSubscribed);
return { value: undefined, done: true };
}

public async throw(error) {
this.logger.trace('throwing error');
this.emptyQueue(await this.allSubscribed);
return Promise.reject(error);
}
Expand All @@ -60,14 +72,8 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
return this;
}

private pullQueue: Function[];
private pushQueue: any[];
private eventsArray: string[];
private allSubscribed: Promise<number[]>;
private listening: boolean;
private pubsub: PubSubEngine;

private async pushValue(event) {
this.logger.trace('pushing event (%j) into queue', event);
await this.allSubscribed;
if (this.pullQueue.length !== 0) {
this.pullQueue.shift()({ value: event, done: false });
Expand All @@ -77,6 +83,7 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
}

private pullValue() {
this.logger.trace('pulling event from queue (%j)', this.pushQueue);
return new Promise((resolve => {
if (this.pushQueue.length !== 0) {
resolve({ value: this.pushQueue.shift(), done: false });
Expand All @@ -98,14 +105,17 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {

private subscribeAll() {
return Promise.all(this.eventsArray.map(
eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), {}),
eventName => {
this.logger.trace('subscribing to eventName (%j), pushValue: (%j)', eventName, this.pushValue);
return this.pubsub.subscribe(eventName, this.pushValue.bind(this), {});
},
));
}

private unsubscribeAll(subscriptionIds: number[]) {
this.logger.trace('unsubscribed to all subIds (%j)', subscriptionIds);
for (const subscriptionId of subscriptionIds) {
this.pubsub.unsubscribe(subscriptionId);
}
}

}
}

0 comments on commit 249b666

Please sign in to comment.