Skip to content

derfred/resque-fanout

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

24 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Resque Plugin for pubsub routing between queues

This plugin allows you to define "exchanges" which look like regular Resque queues, but rather than processing jobs they distribute them to one or more other queues. The mapping between exchanges and queues can change at runtime. A code based configuration method as well as a web frontend are provided.

This is useful for loosely coupled asynchronous communication between multiple applications.

Consider a system consisting of three applications. The main frontend application, an internal one processing billing and another handling user accounts. When a new user registers work needs to be performed in the last two.

In the billing application define the following worker:

class BillingListener

  @queue = :payroll

  def self.perform(user_hash)
    
  end

end

Resque.subscribe :new_user, :class => BillingListener

In the user accounts application define this worker:

class AccountListener

  @queue = :accounts

  def self.perform(user_hash)
    
  end

end

Resque.subscribe :new_user, :class => AccountListener

When a new user registers in the frontend application execute:

Resque.publish :new_user, :user_name => :feynman, :account_type => :qed

The job will then be processed by both the BillingListener and the AccountListener in the context of the respective applications.

PubSub semantics

The mapping between exchange and queues is maintained in the Redis server and can therefore change at runtime. The job distribution is performed upon job submission. Exchanges override queues with the same name.

The mapping is written to Redis by the following call:

Resque.subscribe :new_user, :class => AccountListener

This call will do two things

  1. Add the queue defined for AccountListener to the outbound list for the exchange "new_user"
  2. Set the class AccountListener as the class handling the request on the queue defined in step 1

The second step is necessary because the class handling the jobs sent to the exchange "new_user" might be different in the respective receiving applications. The receiving class is set by setting the "class" attribute in the Job definition when it is pushed to the queue.

The above semantics allow you to reroute an existing queue. Consider the above example but the frontend application also defines the following worker:

class NewUserListener

  @queue = :new_user

  def self.perform(user_hash)
    
  end

end

Resque.subscribe :new_user, :class => NewUserListener

If you then do the following in the frontend application,

Resque.enqueue(NewUserListener, :user_name => :feynman, :account_type => :qed)

the job will be performed by all three workers in their respective application contexts.

The Resque.subscribe method provides another mode which does not set the handling class:

Resque.subscribe :new_user, :queue => :accounts

This will of course only work if the receiving applications have a worker called NewUserListener. Using Resque.publish with this exchange will cause an exception.

Managing the Exchange -> Queue mapping

The mapping can be defined either by the Resque.subscribe method or through the web frontend. A relevant mapping is created by the Resque.subscribe call. Therefore if you delete a mapping in the web frontend but rerun Resque.subscribe it will reappear. Depending on your circumstances this may or may not be what you expect. It is recommended to place your mapping definitions in a Rake task similar to rake db:migrate

About

Resque Plugin adding runtime configurable routing to queues

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages