Skip to content

Commit

Permalink
Merge pull request #4 from stianeikeland/watcher
Browse files Browse the repository at this point in the history
Added eventemitter for watching changes
  • Loading branch information
stianeikeland committed Aug 25, 2013
2 parents bc5bcac + 1c9cf2f commit 5714773
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 1 deletion.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ c.get "/key", (err, val) ->
c.del "/key", (err, val) ->
console.log err, val

# Watch a value
# Watch a value (wait for a single value change)
c.watch "/key", (err, val) ->
console.log err, val

# Watcher retuns an eventemitter for continuously watching changes,
# it also handles reconnect on error, etc..
w = c.watcher '/key'

w.on 'change', console.log
w.on 'reconnect', console.log

# Set with expiry (time to live)
c.setTTL "/key", "value", 5, (err, val) ->
console.log err, val
Expand Down
5 changes: 5 additions & 0 deletions src/index.coffee
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
request = require 'request'
_ = require 'underscore'
Watcher = require './watcher'

class Etcd

Expand Down Expand Up @@ -64,6 +65,10 @@ class Etcd

request.post opt, @_responseHandler callback

# Returns an eventemitter that watches a key, emits 'change' on value change
# or 'reconnect' when trying to recover from errors.
watcher: (key, index = null) =>
return new Watcher this, key, index

# Get the etcd cluster machines
machines: (callback) ->
Expand Down
37 changes: 37 additions & 0 deletions src/watcher.coffee
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{EventEmitter} = require 'events'

# A eventemitter for watching changes on a given key for etcd.
# Emits:
# 'change' - on value change
# 'reconnect' - on errors/timeouts
#
# Automatically reconnects and backs off on errors.
#
class Watcher extends EventEmitter

constructor: (@etcd, @key, @index = null) ->
@retryAttempts = 0
@_watch()

_watch: () =>
if @index is null
@etcd.watch @key, @_respHandler
else
@etcd.watchIndex @key, @index, @_respHandler

_respHandler: (err, val) =>
if val?.index?
@retryAttempts = 0
@index = val.index + 1
@emit 'change', val
@_watch()

if err isnt null
@emit 'reconnect', { error: err, reconnectcount: @retryAttempts }
@_retry()

_retry: () =>
setTimeout @_watch, 500 * @retryAttempts
@retryAttempts++

exports = module.exports = Watcher
File renamed without changes.
63 changes: 63 additions & 0 deletions test/watcher.coffee
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
require 'should'
Watcher = require '../src/watcher.coffee'

class FakeEtcd
constructor: () ->
@cb = () ->

watch: (key, cb) ->
key.should.equal 'key'
@cb = cb

watchIndex: (key, index, cb) ->
key.should.equal 'key'
@cb = cb

change: (err, val) ->
@cb err, val


describe 'Watcher', () ->
it 'should emit change on watch change', (done) ->
etcd = new FakeEtcd
w = new Watcher etcd, 'key'

w.on 'change', (val) ->
val.should.include { index: 0 }
done()

etcd.change null, { index: 0 }

it 'should emit reconnect event on error', (done) ->
etcd = new FakeEtcd
w = new Watcher etcd, 'key'

w.on 'reconnect', (err) ->
err.should.include { error: "error" }
done()

etcd.change "error", null

it 'should reconnect (call watch again) on error', (done) ->
etcd = new FakeEtcd
w = new Watcher etcd, 'key'

etcd.watch = (key, cb) ->
w.retryAttempts.should.equal 1
done()

etcd.change "error", null

it 'should call watch on next index after getting change', (done) ->
etcd = new FakeEtcd
w = new Watcher etcd, 'key'

i = 5

etcd.watchIndex = (key, index, cb) ->
index.should.equal i + 1
done()

etcd.change null, { index: i }


0 comments on commit 5714773

Please sign in to comment.