Skip to content

Commit

Permalink
Merge branch 'master' into dotnetstandard1_3
Browse files Browse the repository at this point in the history
  • Loading branch information
tameraw committed Mar 21, 2017
2 parents 1e15baa + 9f96703 commit 4b1b36f
Show file tree
Hide file tree
Showing 6 changed files with 598 additions and 128 deletions.
2 changes: 1 addition & 1 deletion device/Microsoft.Azure.Devices.Client/Common/Singleton.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ bool TrySet(TaskCompletionSource<TValue> tcs)
}
}

bool TryRemove()
public bool TryRemove()
{
lock (this.syncLock)
{
Expand Down
83 changes: 53 additions & 30 deletions device/Microsoft.Azure.Devices.Client/IotHubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ abstract class IotHubConnection
readonly int port;

static readonly AmqpVersion AmqpVersion_1_0_0 = new AmqpVersion(1, 0, 0);
const string DisableServerCertificateValidationKeyName = "Microsoft.Azure.Devices.DisableServerCertificateValidation";
static readonly Lazy<bool> DisableServerCertificateValidation = new Lazy<bool>(InitializeDisableServerCertificateValidation);

const string DisableServerCertificateValidationKeyName =
"Microsoft.Azure.Devices.DisableServerCertificateValidation";

static readonly Lazy<bool> DisableServerCertificateValidation =
new Lazy<bool>(InitializeDisableServerCertificateValidation);

private SemaphoreSlim sessionSemaphore = new SemaphoreSlim(1, 1);

protected IotHubConnection(string hostName, int port, AmqpTransportSettings amqpTransportSettings)
{
Expand All @@ -48,25 +54,22 @@ protected IotHubConnection(string hostName, int port, AmqpTransportSettings amqp

public abstract void SafeClose(Exception exception);

public async Task<SendingAmqpLink> CreateSendingLinkAsync(string path, IotHubConnectionString connectionString, TimeSpan timeout, CancellationToken cancellationToken)
public async Task<SendingAmqpLink> CreateSendingLinkAsync(string path, IotHubConnectionString connectionString,
TimeSpan timeout, CancellationToken cancellationToken)
{
this.OnCreateSendingLink(connectionString);

var timeoutHelper = new TimeoutHelper(timeout);

AmqpSession session;
if (!this.FaultTolerantSession.TryGetOpenedObject(out session))
{
session = await this.FaultTolerantSession.GetOrCreateAsync(timeoutHelper.RemainingTime(), cancellationToken);
}
AmqpSession session = await this.GetSessionAsync(timeoutHelper, cancellationToken);

var linkAddress = this.BuildLinkAddress(connectionString, path);

var linkSettings = new AmqpLinkSettings()
{
Role = false,
InitialDeliveryCount = 0,
Target = new Target() { Address = linkAddress.AbsoluteUri },
Target = new Target() {Address = linkAddress.AbsoluteUri},
SndSettleMode = null, // SenderSettleMode.Unsettled (null as it is the default and to avoid bytes on the wire)
RcvSettleMode = null, // (byte)ReceiverSettleMode.First (null as it is the default and to avoid bytes on the wire)
LinkName = Guid.NewGuid().ToString("N") // Use a human readable link name to help with debugging
Expand All @@ -83,17 +86,15 @@ public async Task<SendingAmqpLink> CreateSendingLinkAsync(string path, IotHubCon
return link;
}

public async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(string path, IotHubConnectionString connectionString, TimeSpan timeout, uint prefetchCount, CancellationToken cancellationToken)
public async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(string path,
IotHubConnectionString connectionString, TimeSpan timeout, uint prefetchCount,
CancellationToken cancellationToken)
{
this.OnCreateReceivingLink(connectionString);

var timeoutHelper = new TimeoutHelper(timeout);

AmqpSession session;
if (!this.FaultTolerantSession.TryGetOpenedObject(out session))
{
session = await this.FaultTolerantSession.GetOrCreateAsync(timeoutHelper.RemainingTime(), cancellationToken);
}
AmqpSession session = await this.GetSessionAsync(timeoutHelper, cancellationToken);

var linkAddress = this.BuildLinkAddress(connectionString, path);

Expand All @@ -102,7 +103,7 @@ public async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(string path, IotHu
Role = true,
TotalLinkCredit = prefetchCount,
AutoSendFlow = prefetchCount > 0,
Source = new Source() { Address = linkAddress.AbsoluteUri },
Source = new Source() {Address = linkAddress.AbsoluteUri},
SndSettleMode = null, // SenderSettleMode.Unsettled (null as it is the default and to avoid bytes on the wire)
RcvSettleMode = (byte)ReceiverSettleMode.Second,
LinkName = Guid.NewGuid().ToString("N") // Use a human readable link name to help with debuggin
Expand All @@ -119,25 +120,23 @@ public async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(string path, IotHu
return link;
}

public async Task<SendingAmqpLink> CreateMethodSendingLinkAsync(string path, IotHubConnectionString connectionString, TimeSpan timeout, CancellationToken cancellationToken, string deviceId)
public async Task<SendingAmqpLink> CreateMethodSendingLinkAsync(string path,
IotHubConnectionString connectionString, TimeSpan timeout, CancellationToken cancellationToken,
string deviceId)
{
this.OnCreateSendingLink(connectionString);

var timeoutHelper = new TimeoutHelper(timeout);

AmqpSession session;
if (!this.FaultTolerantSession.TryGetOpenedObject(out session))
{
session = await this.FaultTolerantSession.GetOrCreateAsync(timeoutHelper.RemainingTime(), cancellationToken);
}
AmqpSession session = await this.GetSessionAsync(timeoutHelper, cancellationToken);

var linkAddress = this.BuildLinkAddress(connectionString, path);

var linkSettings = new AmqpLinkSettings()
{
Role = false,
InitialDeliveryCount = 0,
Target = new Target() { Address = linkAddress.AbsoluteUri },
Target = new Target() {Address = linkAddress.AbsoluteUri},
SndSettleMode = (byte)SenderSettleMode.Settled,
RcvSettleMode = (byte)ReceiverSettleMode.First,
LinkName = Guid.NewGuid().ToString("N") // Use a human readable link name to help with debugging
Expand All @@ -156,19 +155,16 @@ public async Task<SendingAmqpLink> CreateMethodSendingLinkAsync(string path, Iot
}

public async Task<ReceivingAmqpLink> CreateMethodReceivingLinkAsync(
string path, IotHubConnectionString connectionString, TimeSpan timeout, uint prefetchCount, CancellationToken cancellationToken,
string path, IotHubConnectionString connectionString, TimeSpan timeout, uint prefetchCount,
CancellationToken cancellationToken,
string deviceId, Action<AmqpMessage, ReceivingAmqpLink> messageListenerAction)
{
this.OnCreateReceivingLink(connectionString);

var timeoutHelper = new TimeoutHelper(timeout);

AmqpSession session;
if (!this.FaultTolerantSession.TryGetOpenedObject(out session))
{
session = await this.FaultTolerantSession.GetOrCreateAsync(timeoutHelper.RemainingTime(), cancellationToken);
}

AmqpSession session = await this.GetSessionAsync(timeoutHelper, cancellationToken);

var linkAddress = this.BuildLinkAddress(connectionString, path);

var linkSettings = new AmqpLinkSettings()
Expand All @@ -195,6 +191,33 @@ public async Task<ReceivingAmqpLink> CreateMethodReceivingLinkAsync(
return link;
}

private async Task<AmqpSession> GetSessionAsync(TimeoutHelper timeoutHelper, CancellationToken token)
{
AmqpSession session;
try
{
await sessionSemaphore.WaitAsync();

session = await this.FaultTolerantSession.GetOrCreateAsync(timeoutHelper.RemainingTime(), token);

Fx.Assert(session != null, "Amqp Session cannot be null.");
if (session.State != AmqpObjectState.Opened)
{
if (session.State == AmqpObjectState.End)
{
this.FaultTolerantSession.TryRemove();
}
session = await this.FaultTolerantSession.GetOrCreateAsync(timeoutHelper.RemainingTime(), token);
}
}
finally
{
sessionSemaphore.Release();
}

return session;
}

public void CloseLink(AmqpLink link)
{
link.SafeClose();
Expand Down
Loading

0 comments on commit 4b1b36f

Please sign in to comment.