Skip to content

Commit

Permalink
Merge pull request #197 from Azure/DevToMaster
Browse files Browse the repository at this point in the history
Dev to master
  • Loading branch information
sjkwak authored Aug 18, 2017
2 parents 52b1ce1 + 9fe0a7a commit f2de124
Show file tree
Hide file tree
Showing 27 changed files with 482 additions and 269 deletions.
6 changes: 3 additions & 3 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ matrix:
image: Visual Studio 2017
environment:
azure-event-hubs-dotnet/ClientSecret:
secure: /8H7C81iNS+gVi7LhJCEOPUlNaa30y4KcY4nw/g2C4HGxOv6SrcroyDvMnqD+5GN
secure: +30vtVYAECW7t7ieidatPhCEhvVjjbDUc2UA1SYtDwHa8NxR354pfn9ezxaMqIAR
azure-event-hubs-dotnet/TenantId:
secure: xohonz/X8PPLOVIdT3ch2C5XeSa30RwR6NuXFh4e85svXT1mJNGGO1HQEGxCk3wp
secure: pDX+hB+9zbM0v7OQ44t1Ols0mfRl0BQlJY+ycuteSdZaK1NGJuMHFwS9ULeSBYS/
azure-event-hubs-dotnet/AppId:
secure: c+H140oRJfHtmFHZxSRLWocv5AU0q33X7kgMcTcXxhJvtVhk2WAk1dRQkSN+SyoA
secure: NYwBqWBHvwwNfcKZjjLjCDSxPEVq1AH0DvJXQjVjPA/xpDT/e9SxCj4YpIMWwd+t
matrix:
# First build
- DotNetRunTime: netcoreapp1.0
Expand Down
1 change: 1 addition & 0 deletions build/build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ function Delete-AzureResources

Build-Solution
if (-Not $canDeploy -and -Not [bool][Environment]::GetEnvironmentVariable($connectionStringVariableName)) {
Write-Host 'Skipping deploy and unit tests - canDeploy:' $canDeploy
return
}
try {
Expand Down
4 changes: 2 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

|Build/Package|Status|
|------|-------------|
|master|[![Build status](https://ci.appveyor.com/api/projects/status/p71xb6o7233m7gi3/branch/master?svg=true)](https://ci.appveyor.com/project/jtaubensee/azure-event-hubs-dotnet/branch/master) [![codecov](https://codecov.io/gh/Azure/azure-event-hubs-dotnet/branch/master/graph/badge.svg)](https://codecov.io/gh/Azure/azure-event-hubs-dotnet)|
|dev|[![Build status](https://ci.appveyor.com/api/projects/status/p71xb6o7233m7gi3/branch/master?svg=true)](https://ci.appveyor.com/project/jtaubensee/azure-event-hubs-dotnet/branch/dev) [![codecov](https://codecov.io/gh/Azure/azure-event-hubs-dotnet/branch/dev/graph/badge.svg)](https://codecov.io/gh/Azure/azure-event-hubs-dotnet)|
|master|[![Build status](https://ci.appveyor.com/api/projects/status/p71xb6o7233m7gi3/branch/master?svg=true)](https://ci.appveyor.com/project/serkantkaraca/azure-event-hubs-dotnet/branch/master) [![codecov](https://codecov.io/gh/Azure/azure-event-hubs-dotnet/branch/master/graph/badge.svg)](https://codecov.io/gh/Azure/azure-event-hubs-dotnet)|
|dev|[![Build status](https://ci.appveyor.com/api/projects/status/p71xb6o7233m7gi3/branch/master?svg=true)](https://ci.appveyor.com/project/serkantkaraca/azure-event-hubs-dotnet/branch/dev) [![codecov](https://codecov.io/gh/Azure/azure-event-hubs-dotnet/branch/dev/graph/badge.svg)](https://codecov.io/gh/Azure/azure-event-hubs-dotnet)|
|Microsoft.Azure.EventHubs|[![NuGet Version and Downloads count](https://buildstats.info/nuget/Microsoft.Azure.EventHubs?includePreReleases=true)](https://www.nuget.org/packages/Microsoft.Azure.EventHubs/)|
|Microsoft.Azure.EventHubs.Processor|[![NuGet Version and Downloads count](https://buildstats.info/nuget/Microsoft.Azure.EventHubs.Processor?includePreReleases=true)](https://www.nuget.org/packages/Microsoft.Azure.EventHubs.Processor/)|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,7 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
}
catch (StorageException se)
{
if (WasLeaseLost(partitionId, se))
{
retval = false;
}
else
{
throw;
}
throw HandleStorageException(partitionId, se);
}

return retval;
Expand All @@ -362,12 +355,7 @@ async Task<bool> RenewLeaseCoreAsync(AzureBlobLease lease)
}
catch (StorageException se)
{
if (WasLeaseLost(partitionId, se))
{
throw new LeaseLostException(partitionId, se);
}

throw;
throw HandleStorageException(partitionId, se);
}

return true;
Expand Down Expand Up @@ -398,12 +386,7 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
}
catch (StorageException se)
{
if (WasLeaseLost(partitionId, se))
{
throw new LeaseLostException(partitionId, se);
}

throw;
throw HandleStorageException(partitionId, se);
}

return true;
Expand Down Expand Up @@ -442,12 +425,7 @@ async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)
}
catch (StorageException se)
{
if (WasLeaseLost(partitionId, se))
{
throw new LeaseLostException(partitionId, se);
}

throw;
throw HandleStorageException(partitionId, se);
}

return true;
Expand All @@ -462,31 +440,32 @@ async Task<AzureBlobLease> DownloadLeaseAsync(string partitionId, CloudBlockBlob
AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob);
return blobLease;
}
bool WasLeaseLost(string partitionId, StorageException se)

Exception HandleStorageException(string partitionId, StorageException se)
{
bool retval = false;
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, partitionId, "WAS LEASE LOST?");
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, partitionId, "HttpStatusCode " + se.RequestInformation.HttpStatusCode);
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, partitionId, "HandleStorageException - HttpStatusCode " + se.RequestInformation.HttpStatusCode);
if (se.RequestInformation.HttpStatusCode == 409 || // conflict
se.RequestInformation.HttpStatusCode == 412) // precondition failed
{
StorageExtendedErrorInformation extendedErrorInfo = se.RequestInformation.ExtendedErrorInformation;

if (extendedErrorInfo != null)
{
string errorCode = extendedErrorInfo.ErrorCode;
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, partitionId, "Error code: " + errorCode);
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, partitionId, "Error message: " + extendedErrorInfo.ErrorMessage);
if (errorCode == BlobErrorCodeStrings.LeaseLost ||
errorCode == BlobErrorCodeStrings.LeaseIdMismatchWithLeaseOperation ||
errorCode == BlobErrorCodeStrings.LeaseIdMismatchWithBlobOperation)
{
retval = true;
}
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, partitionId, "HandleStorageException - Error code: " + errorCode);
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, partitionId, "HandleStorageException - Error message: " + extendedErrorInfo.ErrorMessage);
}

if (extendedErrorInfo == null ||
extendedErrorInfo.ErrorCode == BlobErrorCodeStrings.LeaseLost ||
extendedErrorInfo.ErrorCode == BlobErrorCodeStrings.LeaseIdMismatchWithLeaseOperation ||
extendedErrorInfo.ErrorCode == BlobErrorCodeStrings.LeaseIdMismatchWithBlobOperation)
{
return new LeaseLostException(partitionId, se);
}
}

return retval;
return se;
}

CloudBlockBlob GetBlockBlobReference(string partitionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected override async Task OnOpenAsync()
if (this.PumpStatus == PartitionPumpStatus.OpenFailed)
{
this.PumpStatus = PartitionPumpStatus.Closing;
await this.CleanUpClientsAsync();
await this.CleanUpClientsAsync().ConfigureAwait(false);
this.PumpStatus = PartitionPumpStatus.Closed;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class LeaseLostException : Exception
readonly string partitionId;

internal LeaseLostException(string partitionId, Exception innerException)
: base(string.Empty, innerException)
: base(innerException.Message, innerException)
{
if (partitionId == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<Description>This is the next generation Azure Event Hubs .NET Standard Event Processor Host library. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
<AssemblyTitle>Microsoft.Azure.EventHubs.Processor</AssemblyTitle>
<VersionPrefix>1.0.1</VersionPrefix>
<VersionPrefix>1.0.3</VersionPrefix>
<Authors>Microsoft</Authors>
<TargetFrameworks>net451;netstandard1.3;uap10.0;</TargetFrameworks>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
Expand All @@ -24,6 +24,7 @@
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<DebugType>full</DebugType>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\Microsoft.Azure.EventHubs.Processor.xml</DocumentationFile>
<Version>1.0.3</Version>
</PropertyGroup>

<PropertyGroup Condition="'$(TargetFramework)' == 'uap10.0'">
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ protected async Task ProcessEventsAsync(IEnumerable<EventData> events)
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsError(this.Host.Id, this.PartitionContext.PartitionId, e.ToString());
await this.ProcessErrorAsync(e);
await this.ProcessErrorAsync(e).ConfigureAwait(false);
}
finally
{
Expand Down
8 changes: 7 additions & 1 deletion src/Microsoft.Azure.EventHubs/Amqp/AmqpClientConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,16 @@ class AmqpClientConstants
public const string ManagementEventHubPartitionCount = "partition_count";
public const string ManagementEventHubPartitionIds = "partition_ids";

// Response codes
public const string ResponseStatusCode = "status-code";
public const string ResponseErrorCondition = "error-condition";
public const string ResponseStatusDescription = "status-description";

public const int AmqpMinimumOpenSessionTimeoutInSeconds = 60;
// Web-sockets related constants
public const string WebSocketsPathSuffix = "/$servicebus/websocket/";
public const string UriSchemeWss = "wss";

// Miscellaneous
public const int AmqpSessionTimeoutInSeconds = 30;
}
}
7 changes: 3 additions & 4 deletions src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ async Task<SendingAmqpLink> CreateLinkAsync(TimeSpan timeout)
{
var amqpEventHubClient = ((AmqpEventHubClient)this.EventHubClient);

// Allow at least AmqpMinimumOpenSessionTimeoutInSeconds seconds to open the session.
var openSessionTimeout = AmqpClientConstants.AmqpMinimumOpenSessionTimeoutInSeconds > timeout.TotalSeconds ?
TimeSpan.FromSeconds(AmqpClientConstants.AmqpMinimumOpenSessionTimeoutInSeconds) : timeout;
var timeoutHelper = new TimeoutHelper(openSessionTimeout);
// We won't use remaining timeout during create session call.
// For large or small operation timeout values using remaining time won't make any sense.
var timeoutHelper = new TimeoutHelper(TimeSpan.FromSeconds(AmqpClientConstants.AmqpSessionTimeoutInSeconds));

AmqpConnection connection = await amqpEventHubClient.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);

Expand Down
59 changes: 34 additions & 25 deletions src/Microsoft.Azure.EventHubs/Amqp/AmqpEventHubClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace Microsoft.Azure.EventHubs.Amqp
using System.Threading.Tasks;
using Microsoft.Azure.Amqp.Sasl;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Amqp.Transport;
using Microsoft.Azure.EventHubs.Amqp.Management;

Expand Down Expand Up @@ -160,33 +159,38 @@ internal static AmqpSettings CreateAmqpSettings(
return settings;
}

static TransportSettings CreateTcpTransportSettings(
string networkHost,
string hostName,
int port,
bool useSslStreamSecurity,
bool sslStreamUpgrade = false,
string sslHostName = null)
static TransportSettings CreateTcpTlsTransportSettings(string hostName, int port)
{
TcpTransportSettings tcpSettings = new TcpTransportSettings
{
Host = networkHost,
Host = hostName,
Port = port < 0 ? AmqpConstants.DefaultSecurePort : port,
ReceiveBufferSize = AmqpConstants.TransportBufferSize,
SendBufferSize = AmqpConstants.TransportBufferSize
};

TransportSettings tpSettings = tcpSettings;
if (useSslStreamSecurity && !sslStreamUpgrade)
TlsTransportSettings tlsSettings = new TlsTransportSettings(tcpSettings)
{
TlsTransportSettings tlsSettings = new TlsTransportSettings(tcpSettings)
{
TargetHost = sslHostName ?? hostName,
};
tpSettings = tlsSettings;
}
TargetHost = hostName,
};

return tpSettings;
return tlsSettings;
}

static TransportSettings CreateWebSocketsTransportSettings(string hostName)
{
var uriBuilder = new UriBuilder(hostName)
{
Path = AmqpClientConstants.WebSocketsPathSuffix,
Scheme = AmqpClientConstants.UriSchemeWss,
Port = -1 // Port will be assigned on transport listener.
};
var ts = new WebSocketTransportSettings()
{
Uri = uriBuilder.Uri
};

return ts;
}

static AmqpConnectionSettings CreateAmqpConnectionSettings(uint maxFrameSize, string containerId, string hostName)
Expand All @@ -205,20 +209,25 @@ static AmqpConnectionSettings CreateAmqpConnectionSettings(uint maxFrameSize, st
async Task<AmqpConnection> CreateConnectionAsync(TimeSpan timeout)
{
string hostName = this.ConnectionStringBuilder.Endpoint.Host;
string networkHost = this.ConnectionStringBuilder.Endpoint.Host;
int port = this.ConnectionStringBuilder.Endpoint.Port;
bool useWebSockets = this.ConnectionStringBuilder.TransportType == Microsoft.Azure.EventHubs.TransportType.AmqpWebSockets;

var timeoutHelper = new TimeoutHelper(timeout);
var amqpSettings = CreateAmqpSettings(
amqpVersion: this.AmqpVersion,
useSslStreamSecurity: true,
hasTokenProvider: true);
hasTokenProvider: true,
useWebSockets: useWebSockets);

TransportSettings tpSettings = CreateTcpTransportSettings(
networkHost: networkHost,
hostName: hostName,
port: port,
useSslStreamSecurity: true);
TransportSettings tpSettings = null;
if (useWebSockets)
{
tpSettings = CreateWebSocketsTransportSettings(hostName);
}
else
{
tpSettings = CreateTcpTlsTransportSettings(hostName, port);
}

var initiator = new AmqpTransportInitiator(amqpSettings, tpSettings);
var transport = await initiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
Expand Down
Loading

0 comments on commit f2de124

Please sign in to comment.