Skip to content

Commit

Permalink
Overloading the subscription client is bad, time for local in-memory …
Browse files Browse the repository at this point in the history
…sagas for subsriptions
  • Loading branch information
phatboyg committed Jul 15, 2011
1 parent de529c5 commit 9c590f6
Show file tree
Hide file tree
Showing 18 changed files with 235 additions and 201 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ src/logs/*
bin
obj
_ReSharper*
src/Samples/HeavyLoad/logs

*.csproj.user
*.resharper.user
Expand Down
18 changes: 15 additions & 3 deletions src/MassTransit.resharper.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
<CodeStyleSettings>
<CSS>
<FormatSettings />
<Naming2 />
</CSS>
<CSharp>
<FormatSettings>
<ALIGN_MULTILINE_ARGUMENT>False</ALIGN_MULTILINE_ARGUMENT>
<ALIGN_MULTILINE_ARRAY_AND_OBJECT_INITIALIZER>False</ALIGN_MULTILINE_ARRAY_AND_OBJECT_INITIALIZER>
<ALIGN_MULTILINE_EXTENDS_LIST>False</ALIGN_MULTILINE_EXTENDS_LIST>
<ALIGN_MULTLINE_TYPE_PARAMETER_CONSTRAINS>False</ALIGN_MULTLINE_TYPE_PARAMETER_CONSTRAINS>
<BLANK_LINES_AFTER_START_COMMENT>0</BLANK_LINES_AFTER_START_COMMENT>
<EXPLICIT_PRIVATE_MODIFIER>False</EXPLICIT_PRIVATE_MODIFIER>
<INDENT_ANONYMOUS_METHOD_BLOCK>False</INDENT_ANONYMOUS_METHOD_BLOCK>
<INDENT_EMBRACED_INITIALIZER_BLOCK>False</INDENT_EMBRACED_INITIALIZER_BLOCK>
Expand Down Expand Up @@ -275,6 +280,13 @@ II.2.12 <HandlesEvent />
</Patterns>
]]></CustomMemberReorderingPatterns>
</CSharp>
<HTML>
<FormatSettings />
</HTML>
<JavaScript>
<FormatSettings />
<Naming2 />
</JavaScript>
<VB>
<FormatSettings />
<ImportsSettings />
Expand All @@ -286,12 +298,12 @@ II.2.12 <HandlesEvent />
<Web>
<Naming2 />
</Web>
<Xaml>
<Naming2 />
</Xaml>
<XML>
<FormatSettings />
</XML>
<Xaml>
<Naming2 />
</Xaml>
<FileHeader><![CDATA[Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use
Expand Down
58 changes: 26 additions & 32 deletions src/MassTransit/Context/ContextStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,25 @@
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.Context
{
using System;
using System.Web;

using System;
using System.Web;

namespace MassTransit.Context
{
/// <summary>
/// The default context provider using thread local storage
/// </summary>
public static class ContextStorage
{
static ContextStorageProvider _provider;

static ContextStorageProvider Provider
{
get { return _provider ?? (_provider = GetDefaultProvider()); }
}

static ContextStorageProvider GetDefaultProvider()
{
if (HttpContext.Current != null)
return new HttpContextContextStorageProvider();

return new ThreadStaticContextStorageProvider();
}
private static ContextStorageProvider _provider;

public static ISendContext CurrentSendContext
{
get
{
return Provider.SendContext;
}
get { return Provider.SendContext; }
set
{
if (value == null || value.GetType() == typeof(InvalidSendContext))
if (value == null || value.GetType() == typeof (InvalidSendContext))
Provider.SendContext = null;
else
Provider.SendContext = value;
Expand All @@ -53,19 +37,21 @@ public static ISendContext CurrentSendContext

public static IConsumeContext CurrentConsumeContext
{
get
{
return Provider.ConsumeContext;
}
get { return Provider.ConsumeContext; }
set
{
if (value == null || value.GetType() == typeof(InvalidConsumeContext))
if (value == null || value.GetType() == typeof (InvalidConsumeContext))
Provider.ConsumeContext = null;
else
Provider.ConsumeContext = value;
}
}

private static ContextStorageProvider Provider
{
get { return _provider ?? (_provider = GetDefaultProvider()); }
}

public static IConsumeContext<T> MessageContext<T>()
where T : class
{
Expand All @@ -79,14 +65,14 @@ public static IConsumeContext<T> MessageContext<T>()
public static PublishContext<T> CreatePublishContext<T>(T message)
where T : class
{
var publishContext = PublishContext<T>.FromMessage(message);
PublishContext<T> publishContext = PublishContext<T>.FromMessage(message);

return publishContext;
}

public static IConsumeContext Context()
{
var context = CurrentConsumeContext;
IConsumeContext context = CurrentConsumeContext;
if (context == null)
throw new InvalidOperationException("No consumer context was found");

Expand All @@ -95,7 +81,7 @@ public static IConsumeContext Context()

public static void Context(Action<IConsumeContext> contextCallback)
{
var context = CurrentConsumeContext;
IConsumeContext context = CurrentConsumeContext;
if (context == null)
throw new InvalidOperationException("No consumer context was found");

Expand All @@ -104,11 +90,19 @@ public static void Context(Action<IConsumeContext> contextCallback)

public static TResult Context<TResult>(Func<IConsumeContext, TResult> contextCallback)
{
var context = CurrentConsumeContext;
IConsumeContext context = CurrentConsumeContext;
if (context == null)
throw new InvalidOperationException("No consumer context was found");

return contextCallback(context);
}

private static ContextStorageProvider GetDefaultProvider()
{
if (HttpContext.Current != null)
return new HttpContextContextStorageProvider();

return new ThreadStaticContextStorageProvider();
}
}
}
4 changes: 0 additions & 4 deletions src/MassTransit/Context/ServiceBusReceiveContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ public void ReceiveFromEndpoint()

_bus.Endpoint.Receive(context =>
{
if (_log.IsDebugEnabled)
_log.DebugFormat("Enumerating pipeline on {0} from thread {1}", _bus.Endpoint.Address.Uri,
Thread.CurrentThread.ManagedThreadId);
context.SetBus(_bus);
IEnumerable<Action<IConsumeContext>> enumerable = _bus.InboundPipeline.Enumerate(context);
Expand Down
1 change: 1 addition & 0 deletions src/MassTransit/MassTransit.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@
<Compile Include="Transports\ConnectionHandler.cs" />
<Compile Include="Transports\ConnectionPolicy.cs" />
<Compile Include="Transports\ConnectionPolicyChain.cs" />
<Compile Include="Transports\DisposedConnectionPolicy.cs" />
<Compile Include="Transports\IDuplexTransport.cs" />
<Compile Include="Transports\IInboundTransport.cs" />
<Compile Include="Transports\InvalidConnectionException.cs" />
Expand Down
1 change: 0 additions & 1 deletion src/MassTransit/Pipeline/IInboundMessagePipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
namespace MassTransit.Pipeline
{
using System;
using Context;

public interface IInboundMessagePipeline :
IPipelineSink<IConsumeContext>
Expand Down
1 change: 0 additions & 1 deletion src/MassTransit/Pipeline/InboundMessagePipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace MassTransit.Pipeline
{
using System;
using System.Collections.Generic;
using Context;
using Magnum.Concurrency;

public class InboundMessagePipeline :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ namespace MassTransit.Services.Subscriptions.Client
using System.ComponentModel;
using System.Linq;
using System.Reflection;
using log4net;
using Magnum;
using Magnum.Extensions;
using Messages;
using Pipeline;
using Util;
using log4net;

public class SubscriptionCoordinator :
ISubscriptionService,
Expand All @@ -34,11 +34,12 @@ public class SubscriptionCoordinator :
Consumes<RemoveSubscription>.All
{
const BindingFlags _bindingFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public;
static readonly ILog _log = LogManager.GetLogger(typeof(SubscriptionCoordinator));
static readonly ILog _log = LogManager.GetLogger(typeof (SubscriptionCoordinator));
static readonly ClientSubscriptionInfoMapper _mapper = new ClientSubscriptionInfoMapper();
readonly IServiceBus _bus;
readonly HashSet<string> _ignoredSubscriptions;
readonly EndpointUriList _localEndpoints;
readonly bool _multicast;
readonly string _network;
readonly IEndpoint _outboundEndpoint;
readonly SequenceNumberGenerator _sequence;
Expand All @@ -52,7 +53,6 @@ public class SubscriptionCoordinator :
volatile bool _disposed;

UnsubscribeAction _unsubscribeAction;
readonly bool _multicast;

public SubscriptionCoordinator(IServiceBus bus, IEndpoint outboundEndpoint, string network, bool multicast)
{
Expand Down Expand Up @@ -140,8 +140,8 @@ public void Dispose()

public void Start(IServiceBus bus)
{
if (_log.IsDebugEnabled)
_log.DebugFormat("Starting subscription coordinator on network '{0}'", _network);
if (_log.IsDebugEnabled)
_log.DebugFormat("Starting subscription coordinator on network '{0}'", _network);

ConnectBus(bus);

Expand Down Expand Up @@ -193,17 +193,6 @@ public UnregisterAction Register(IEndpointSubscriptionEvent consumer)

public event Action OnRefresh;

void ConnectBus(IServiceBus bus)
{
var publisher = new SubscriptionPublisher(this);
publisher.Start(bus);
_services.Add(publisher);

var consumer = new SubscriptionConsumer(this);
consumer.Start(bus);
_services.Add(consumer);
}

protected virtual void Dispose(bool disposing)
{
if (!disposing || _disposed) return;
Expand All @@ -223,17 +212,28 @@ protected virtual void Dispose(bool disposing)
_disposed = true;
}

void ConnectBus(IServiceBus bus)
{
var publisher = new SubscriptionPublisher(this);
publisher.Start(bus);
_services.Add(publisher);

var consumer = new SubscriptionConsumer(this);
consumer.Start(bus);
_services.Add(consumer);
}

bool ShouldIgnoreMessage<T>(T message)
{
if (_bus.Context().SourceAddress == _bus.Endpoint.Address.Uri)
{
_log.Debug("Ignoring subscription because its source address equals the busses address");
_log.Debug("Ignoring subscription because its source address equals the busses address");
return true;
}

if (!string.Equals(_bus.Context().Network, _network))
{
_log.DebugFormat("Ignoring subscription because the network '{0}' != ours '{1}1", _bus.Context().Network, _network);
_log.DebugFormat("Ignoring subscription because the network '{0}' != ours '{1}1", _bus.Context().Network, _network);
return true;
}

Expand Down Expand Up @@ -289,7 +289,7 @@ void Add(SubscriptionInformation sub)

_log.Debug("SubscriptionClient Add: " + sub);

var messageType = Type.GetType(sub.MessageName);
Type messageType = Type.GetType(sub.MessageName);
if (messageType == null)
{
_log.InfoFormat("Unknown message type '{0}', unable to add subscription", sub.MessageName);
Expand Down Expand Up @@ -425,8 +425,8 @@ UnsubscribeAction AddToClientsWithCorrelation<T, K>(K key, Uri endpointUri)

bool IgnoreIfLocalEndpoint(Uri endpointUri)
{
var r = _localEndpoints.Contains(endpointUri);
_log.Debug("Ignored a subscription because its endpoint was local.");
bool r = _localEndpoints.Contains(endpointUri);
_log.Debug("Ignored a subscription because its endpoint was local.");

return r;
}
Expand Down
6 changes: 4 additions & 2 deletions src/MassTransit/Transports/ConnectionHandlerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public class ConnectionHandlerImpl<T> :
readonly object _lock = new object();
readonly ILog _log = LogManager.GetLogger(typeof (ConnectionHandlerImpl<T>));
readonly ConnectionPolicyChainImpl _policyChain;
bool _connected;
bool _bound;
bool _connected;
bool _disposed;

public ConnectionHandlerImpl(T connection)
Expand All @@ -44,7 +44,7 @@ public void Connect()
{
lock (_lock)
{
if(!_connected)
if (!_connected)
_connection.Connect();

_connected = true;
Expand Down Expand Up @@ -146,6 +146,8 @@ void Dispose(bool disposing)
Disconnect();

_connection.Dispose();

_policyChain.Set(new DisposedConnectionPolicy());
}

_disposed = true;
Expand Down
9 changes: 9 additions & 0 deletions src/MassTransit/Transports/ConnectionPolicyChainImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,14 @@ public void Execute(Action callback)
{
Next(callback);
}

public void Set(ConnectionPolicy connectionPolicy)
{
lock (_policies)
{
_policies.Clear();
_policies.Push(connectionPolicy);
}
}
}
}
Loading

0 comments on commit 9c590f6

Please sign in to comment.