Skip to content

Commit

Permalink
Nats subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
Igor-Adlab committed Feb 17, 2017
1 parent 8db7860 commit d793123
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 31 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
node_modules
.idea
node_modules
dist
coverage
typings
npm-debug.log
6 changes: 6 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
src
node_modules
typings
tsconfig.json
typings.json
tslint.json
41 changes: 41 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"name": "subscriptions-transport-nats",
"version": "1.0.3",
"description": "",
"main": "dist/index.js",
"scripts": {
"test": "npm run integration",
"compile": "tsc",
"watch": "tsc -w",
"pretest": "npm run compile",
"prepublish": "npm run compile",
"integration": "npm run compile && mocha --compilers js:babel-core/register --reporter spec --full-trace ./dist/test/integration-tests.js "
},
"repository": {
"type": "git",
"url": "git+https://igor-adlab:Adlab123@github.com/Igor-Adlab/subscriptions-transport-nats.git"
},
"author": "",
"license": "ISC",
"bugs": {
"url": "https://github.com/Igor-Adlab/subscriptions-transport-nats/issues"
},
"homepage": "https://github.com/Igor-Adlab/subscriptions-transport-nats#readme",
"dependencies": {
"@types/mocha": "^2.2.39",
"@types/node": "^7.0.5",
"async": "^2.1.4",
"babel-core": "^6.23.1",
"bunyan": "^1.8.5",
"cdm-logger": "^0.3.2",
"chai": "^3.5.0",
"chai-as-promised": "^6.0.0",
"graphql": "^0.9.1",
"graphql-subscriptions": "^0.3.0",
"lodash": "^4.17.4",
"nats": "^0.7.4"
},
"devDependencies": {
"mocha": "^3.2.0"
}
}
6 changes: 0 additions & 6 deletions src/INatsOptions.ts

This file was deleted.

94 changes: 73 additions & 21 deletions src/PubSub.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,95 @@
import { PubSubEngine } from 'graphql-subscriptions/dist/pubsub';
import { connect } from 'nats';
import { Logger } from 'bunyan';
import Logger from 'bunyan';
import _ from 'lodash';

interface NatsConnection {
url: String
token: String
user: String
pass: String
}

interface NatsPubSubOptions {
config: NatsConnection | string,
export interface NatsPubSubOptions {
config: string | any,
connectionListener?: (err: Error) => void;
logger?: Logger;
}

class PubSub implements PubSubEngine {
export class PubSub implements PubSubEngine {
logger: Logger;
connection: any;
nextId: number;
refs: Map;
subscriptions: Map;
listeners: Map<number, any>; // { [subId]: onMessage }
refs: Map<any, Array<number>>; // { [topic]: [ subId1, subId2, ... ] }
subscriptions: Map<any, any>; // { [topic]: { sid } } -- NATS Subscriptions

constructor(options: NatsPubSubOptions) {
public constructor(options: NatsPubSubOptions) {
this.connection = connect(options.config);
this.logger = options.logger;

this.subscriptionMap = {};
this.refs = {};
this.subscriptions = new Map();
this.listeners = new Map<number, any>();
this.refs = new Map();
this.nextId = 0;
}

public publish(topic, payload): boolean {
public publish(topic, payload: any): boolean {
this.logger.trace("publishing for queue '%s' (%j)", topic, payload);
try {
payload = JSON.stringify(payload)
} catch (e) {
this.logger.trace("Can not publish message: %j", payload);
payload = "{}";
}
this.connection.publish(topic, payload);

return true;
}

public subscribe(topic, onMessage): Promise<number> {
public subscribe(topic: String, onMessage: Function): Promise<number> {
const id = this.nextId++;
const subscription = this.subscriptions.get(topic);
let sid;


if(!subscription) {
sid = this.connection.subscribe(topic, msg => this.onMessage(topic, msg));
this.subscriptions.set(topic, { sid });
} else {
sid = subscription.sid;
}

this.listeners.set(id, { id, topic, handler: onMessage, sid });
this.refs.set(topic, [...(this.refs.get(topic) || []), id]);

return Promise.resolve(id);
}
}

export default PubSub
public unsubscribe(sid: number) {
const subscription = this.listeners.get(sid);
if(subscription) {
const { topic } = subscription;
const ids = this.refs.get(topic) || [];
_.remove(ids, id => id === sid);

if(!ids.length) {
this.connection.unsubscribe(subscription.sid);
this.subscriptions.delete(topic);
this.refs.delete(topic);
this.listeners.delete(sid);
} else {
this.refs.set(topic, ids);
}
}
}

private onMessage(topic, msg) {
try {
msg = JSON.parse(msg)
} catch(e) {
this.logger.trace("Can not parse message: %j", msg);
msg = {};
}
const listeners = this.refs.get(topic) || [];
listeners.forEach(sid => {
if(this.listeners.has(sid)) {
this.logger.trace("sending message to subscriber callback function '(%j)'", msg);
const { handler } = this.listeners.get(sid);
handler(msg);
}
});
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { PubSub as NatsPubSub } from './PubSub'
24 changes: 21 additions & 3 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
{
"compilerOptions": {
"module": "commonjs",
"target": "es5",
"module": "commonjs",
"experimentalDecorators": true,
"declaration": true,
"removeComments": true,
"noImplicitAny": false,
"sourceMap": false
}
"noLib": false,
"preserveConstEnums": false,
"moduleResolution": "node",
"sourceMap": true,
"rootDir": "./src",
"outDir": "./dist",
"allowSyntheticDefaultImports": true,
"pretty": true,
"lib": [
"es2015",
"es5"
]
},
"exclude": [
"node_modules",
"dist"
]
}
134 changes: 134 additions & 0 deletions tslint.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
{
"rules": {
"align": [
false,
"parameters",
"arguments",
"statements"
],
"ban": false,
"class-name": true,
"curly": true,
"eofline": true,
"forin": true,
"indent": [
true,
"spaces"
],
"interface-name": false,
"jsdoc-format": true,
"label-position": true,
"max-line-length": [
true,
140
],
"member-access": true,
"member-ordering": [
true,
"public-before-private",
"static-before-instance",
"variables-before-functions"
],
"no-any": false,
"no-arg": true,
"no-bitwise": true,
"no-conditional-assignment": true,
"no-consecutive-blank-lines": false,
"no-console": [
true,
"log",
"debug",
"info",
"time",
"timeEnd",
"trace"
],
"no-construct": true,
"no-debugger": true,
"no-duplicate-variable": true,
"no-empty": true,
"no-eval": true,
"no-inferrable-types": false,
"no-internal-module": true,
"no-null-keyword": false,
"no-require-imports": false,
"no-shadowed-variable": true,
"no-switch-case-fall-through": true,
"no-trailing-whitespace": true,
"no-unused-expression": true,
"no-use-before-declare": true,
"no-var-keyword": true,
"no-var-requires": true,
"object-literal-sort-keys": false,
"one-line": [
true,
"check-open-brace",
"check-catch",
"check-else",
"check-finally",
"check-whitespace"
],
"quotemark": [
true,
"single",
"avoid-escape"
],
"radix": true,
"semicolon": [
true,
"always"
],
"switch-default": true,
"trailing-comma": [
true,
{
"multiline": "always",
"singleline": "never"
}
],
"triple-equals": [
true,
"allow-null-check"
],
"typedef": [
false,
"call-signature",
"parameter",
"arrow-parameter",
"property-declaration",
"variable-declaration",
"member-variable-declaration"
],
"typedef-whitespace": [
true,
{
"call-signature": "nospace",
"index-signature": "nospace",
"parameter": "nospace",
"property-declaration": "nospace",
"variable-declaration": "nospace"
},
{
"call-signature": "space",
"index-signature": "space",
"parameter": "space",
"property-declaration": "space",
"variable-declaration": "space"
}
],
"variable-name": [
true,
"check-format",
"allow-leading-underscore",
"ban-keywords"
],
"whitespace": [
true,
"check-branch",
"check-decl",
"check-operator",
"check-separator",
"check-type"
]
}
}
7 changes: 7 additions & 0 deletions typings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"dependencies": {},
"devDependencies": {},
"ambientDependencies": {
"node": "github:DefinitelyTyped/DefinitelyTyped/node/node.d.ts#1c56e368e17bb28ca57577250624ca5bd561aa81"
}
}

0 comments on commit d793123

Please sign in to comment.