diff --git a/README.md b/README.md index 9deaa9f..a77cc7c 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,17 @@ c.get "/key", (err, val) -> c.del "/key", (err, val) -> console.log err, val -# Watch a value +# Watch a value (wait for a single value change) c.watch "/key", (err, val) -> console.log err, val +# Watcher retuns an eventemitter for continuously watching changes, +# it also handles reconnect on error, etc.. +w = c.watcher '/key' + +w.on 'change', console.log +w.on 'reconnect', console.log + # Set with expiry (time to live) c.setTTL "/key", "value", 5, (err, val) -> console.log err, val diff --git a/src/index.coffee b/src/index.coffee index e9e42f0..77d26d5 100644 --- a/src/index.coffee +++ b/src/index.coffee @@ -1,5 +1,6 @@ request = require 'request' _ = require 'underscore' +Watcher = require './watcher' class Etcd @@ -64,6 +65,10 @@ class Etcd request.post opt, @_responseHandler callback + # Returns an eventemitter that watches a key, emits 'change' on value change + # or 'reconnect' when trying to recover from errors. + watcher: (key, index = null) => + return new Watcher this, key, index # Get the etcd cluster machines machines: (callback) -> diff --git a/src/watcher.coffee b/src/watcher.coffee new file mode 100644 index 0000000..180c768 --- /dev/null +++ b/src/watcher.coffee @@ -0,0 +1,37 @@ +{EventEmitter} = require 'events' + +# A eventemitter for watching changes on a given key for etcd. +# Emits: +# 'change' - on value change +# 'reconnect' - on errors/timeouts +# +# Automatically reconnects and backs off on errors. +# +class Watcher extends EventEmitter + + constructor: (@etcd, @key, @index = null) -> + @retryAttempts = 0 + @_watch() + + _watch: () => + if @index is null + @etcd.watch @key, @_respHandler + else + @etcd.watchIndex @key, @index, @_respHandler + + _respHandler: (err, val) => + if val?.index? + @retryAttempts = 0 + @index = val.index + 1 + @emit 'change', val + @_watch() + + if err isnt null + @emit 'reconnect', { error: err, reconnectcount: @retryAttempts } + @_retry() + + _retry: () => + setTimeout @_watch, 500 * @retryAttempts + @retryAttempts++ + +exports = module.exports = Watcher diff --git a/test/test.coffee b/test/index.coffee similarity index 100% rename from test/test.coffee rename to test/index.coffee diff --git a/test/watcher.coffee b/test/watcher.coffee new file mode 100644 index 0000000..ccffc4c --- /dev/null +++ b/test/watcher.coffee @@ -0,0 +1,63 @@ +require 'should' +Watcher = require '../src/watcher.coffee' + +class FakeEtcd + constructor: () -> + @cb = () -> + + watch: (key, cb) -> + key.should.equal 'key' + @cb = cb + + watchIndex: (key, index, cb) -> + key.should.equal 'key' + @cb = cb + + change: (err, val) -> + @cb err, val + + +describe 'Watcher', () -> + it 'should emit change on watch change', (done) -> + etcd = new FakeEtcd + w = new Watcher etcd, 'key' + + w.on 'change', (val) -> + val.should.include { index: 0 } + done() + + etcd.change null, { index: 0 } + + it 'should emit reconnect event on error', (done) -> + etcd = new FakeEtcd + w = new Watcher etcd, 'key' + + w.on 'reconnect', (err) -> + err.should.include { error: "error" } + done() + + etcd.change "error", null + + it 'should reconnect (call watch again) on error', (done) -> + etcd = new FakeEtcd + w = new Watcher etcd, 'key' + + etcd.watch = (key, cb) -> + w.retryAttempts.should.equal 1 + done() + + etcd.change "error", null + + it 'should call watch on next index after getting change', (done) -> + etcd = new FakeEtcd + w = new Watcher etcd, 'key' + + i = 5 + + etcd.watchIndex = (key, index, cb) -> + index.should.equal i + 1 + done() + + etcd.change null, { index: i } + +