diff --git a/src/Orleans.Clustering.ZooKeeper/Orleans.Clustering.ZooKeeper.csproj b/src/Orleans.Clustering.ZooKeeper/Orleans.Clustering.ZooKeeper.csproj
index 13ecaf95e9..8167702a61 100644
--- a/src/Orleans.Clustering.ZooKeeper/Orleans.Clustering.ZooKeeper.csproj
+++ b/src/Orleans.Clustering.ZooKeeper/Orleans.Clustering.ZooKeeper.csproj
@@ -13,6 +13,7 @@
+
diff --git a/src/Orleans.Clustering.ZooKeeper/ZooKeeperBasedMembershipTable.cs b/src/Orleans.Clustering.ZooKeeper/ZooKeeperBasedMembershipTable.cs
index babee19e38..13338a3eb9 100644
--- a/src/Orleans.Clustering.ZooKeeper/ZooKeeperBasedMembershipTable.cs
+++ b/src/Orleans.Clustering.ZooKeeper/ZooKeeperBasedMembershipTable.cs
@@ -10,8 +10,8 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
-using Orleans.Runtime.Configuration;
using Orleans.Runtime.Host;
+using System.Threading;
namespace Orleans.Runtime.Membership
{
@@ -37,34 +37,29 @@ namespace Orleans.Runtime.Membership
///
public class ZooKeeperBasedMembershipTable : IMembershipTable
{
- private ILogger logger;
+ private ILogger _logger;
+ private readonly ZooKeeperClusteringSiloOptions _options;
+ private const int ZOOKEEPER_CONNECTION_TIMEOUT = 5000;
- private const int ZOOKEEPER_CONNECTION_TIMEOUT = 2000;
-
- private ZooKeeperWatcher watcher;
-
- ///
- /// The deployment connection string. for eg. "192.168.1.1,192.168.1.2/ClusterId"
- ///
- private string deploymentConnectionString;
///
/// the node name for this deployment. for eg. /ClusterId
///
- private string clusterPath;
+ private readonly string _clusterPath;
///
/// The root connection string. for eg. "192.168.1.1,192.168.1.2"
///
- private string rootConnectionString;
-
+ private readonly string _rootConnectionString;
+
public ZooKeeperBasedMembershipTable(
- ILogger logger,
- IOptions membershipTableOptions,
+ ILogger logger,
+ IOptions membershipTableOptions,
IOptions clusterOptions)
{
- this.logger = logger;
- var options = membershipTableOptions.Value;
- watcher = new ZooKeeperWatcher(logger);
- InitConfig(options.ConnectionString, clusterOptions.Value.ClusterId);
+ _logger = logger;
+ _options = membershipTableOptions.Value;
+
+ _clusterPath = $"/{clusterOptions.Value.ClusterId}";
+ _rootConnectionString = _options.ConnectionString;
}
///
@@ -78,28 +73,22 @@ public async Task InitializeMembershipTable(bool tryInitPath)
// try to insert an initial path if it is not already there,
// so we always have the path, before this silo starts working.
// note that when a zookeeper connection adds /ClusterId to the connection string, the nodes are relative
- await UsingZookeeper(rootConnectionString, async zk =>
+ await ZooKeeper.Using(_rootConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, new ZooKeeperWatcher(_logger), async zk =>
{
try
{
- await zk.createAsync(this.clusterPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- await zk.sync(this.clusterPath);
+ await zk.createAsync(_clusterPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ await zk.sync(_clusterPath);
//if we got here we know that we've just created the deployment path with version=0
- this.logger.Info("Created new deployment path: " + this.clusterPath);
+ _logger.Info($"Created new deployment path: {_clusterPath}");
}
catch (KeeperException.NodeExistsException)
{
- this.logger.Debug("Deployment path already exists: " + this.clusterPath);
+ _logger.Debug($"Deployment path already exists: {_clusterPath}");
}
});
}
- private void InitConfig(string dataConnectionString, string clusterId)
- {
- this.clusterPath = "/" + clusterId;
- deploymentConnectionString = dataConnectionString + this.clusterPath;
- rootConnectionString = dataConnectionString;
- }
///
/// Atomically reads the Membership Table information about a given silo.
@@ -109,27 +98,27 @@ private void InitConfig(string dataConnectionString, string clusterId)
/// The address of the silo whose membership information needs to be read.
/// The membership information for a given silo: MembershipTableData consisting one MembershipEntry entry and
/// TableVersion, read atomically.
- public Task ReadRow(SiloAddress siloAddress)
+ public async Task ReadRow(SiloAddress siloAddress)
{
- return UsingZookeeper(async zk =>
+ return await ZooKeeper.Using(_rootConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, new ZooKeeperWatcher(_logger), async zk =>
{
- var getRowTask = GetRow(zk, siloAddress);
- var getTableNodeTask = zk.getDataAsync("/");//get the current table version
+ var getRow = await GetRow(zk, siloAddress);
+ var getTableNode = await zk.getDataAsync(_clusterPath); //get the current table version
- List> rows = new List>(1);
+ var rows = new List>(1);
try
{
- await Task.WhenAll(getRowTask, getTableNodeTask);
- rows.Add(await getRowTask);
+ rows.Add(getRow.ToTuple());
}
catch (KeeperException.NoNodeException)
{
//that's ok because orleans expects an empty list in case of a missing row
}
- var tableVersion = ConvertToTableVersion((await getTableNodeTask).Stat);
+ var tableVersion = ConvertToTableVersion(getTableNode.Stat);
+
return new MembershipTableData(rows, tableVersion);
- }, this.deploymentConnectionString, this.watcher, true);
+ });
}
///
@@ -139,28 +128,25 @@ public Task ReadRow(SiloAddress siloAddress)
///
/// The membership information for a given table: MembershipTableData consisting multiple MembershipEntry entries and
/// TableVersion, all read atomically.
- public Task ReadAll()
+ public async Task ReadAll()
{
- return ReadAll(this.deploymentConnectionString, this.watcher);
- }
-
- internal static Task ReadAll(string deploymentConnectionString, ZooKeeperWatcher watcher)
- {
- return UsingZookeeper(async zk =>
+ return await ZooKeeper.Using(_rootConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, new ZooKeeperWatcher(_logger), async zk =>
{
- var childrenResult = await zk.getChildrenAsync("/");//get all the child nodes (without the data)
-
- var childrenTasks = //get the data from each child node
- childrenResult.Children.Select(child => GetRow(zk, SiloAddress.FromParsableString(child))).ToList();
+ var childrenResult = await zk.getChildrenAsync(_clusterPath);//get all the child nodes (without the data)
- var childrenTaskResults = await Task.WhenAll(childrenTasks);
+ var childrenList = new List>(childrenResult.Children.Count);
+ //get the data from each child node
+ foreach (var child in childrenResult.Children)
+ {
+ var childData = await GetRow(zk, SiloAddress.FromParsableString(child));
+ childrenList.Add(childData.ToTuple());
+ }
var tableVersion = ConvertToTableVersion(childrenResult.Stat);//this is the current table version
- return new MembershipTableData(childrenTaskResults.ToList(), tableVersion);
- }, deploymentConnectionString, watcher, true);
+ return new MembershipTableData(childrenList, tableVersion);
+ });
}
-
///
/// Atomically tries to insert (add) a new MembershipEntry for one silo and also update the TableVersion.
/// If operation succeeds, the following changes would be made to the table:
@@ -176,7 +162,7 @@ internal static Task ReadAll(string deploymentConnectionStr
/// MembershipEntry to be inserted.
/// The new TableVersion for this table, along with its etag.
/// True if the insert operation succeeded and false otherwise.
- public Task InsertRow(MembershipEntry entry, TableVersion tableVersion)
+ public async Task InsertRow(MembershipEntry entry, TableVersion tableVersion)
{
string rowPath = ConvertToRowPath(entry.SiloAddress);
string rowIAmAlivePath = ConvertToRowIAmAlivePath(entry.SiloAddress);
@@ -185,10 +171,11 @@ public Task InsertRow(MembershipEntry entry, TableVersion tableVersion)
int expectedTableVersion = int.Parse(tableVersion.VersionEtag, CultureInfo.InvariantCulture);
- return TryTransaction(t => t
- .setData("/", null, expectedTableVersion)//increments the version of node "/"
- .create(rowPath, newRowData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
- .create(rowIAmAlivePath, newRowIAmAliveData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+ return await ZooKeeper.Using(_rootConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, new ZooKeeperWatcher(_logger), async zk
+ => await zk.TryTransaction(tx
+ => tx.setData(_clusterPath, null, expectedTableVersion)//increments the version of node "/"
+ .create(rowPath, newRowData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+ .create(rowIAmAlivePath, newRowIAmAliveData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)));
}
///
@@ -208,7 +195,7 @@ public Task InsertRow(MembershipEntry entry, TableVersion tableVersion)
/// The etag for the given MembershipEntry.
/// The new TableVersion for this table, along with its etag.
/// True if the update operation succeeded and false otherwise.
- public Task UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion)
+ public async Task UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion)
{
string rowPath = ConvertToRowPath(entry.SiloAddress);
string rowIAmAlivePath = ConvertToRowIAmAlivePath(entry.SiloAddress);
@@ -218,10 +205,11 @@ public Task UpdateRow(MembershipEntry entry, string etag, TableVersion tab
int expectedTableVersion = int.Parse(tableVersion.VersionEtag, CultureInfo.InvariantCulture);
int expectedRowVersion = int.Parse(etag, CultureInfo.InvariantCulture);
- return TryTransaction(t => t
- .setData("/", null, expectedTableVersion)//increments the version of node "/"
- .setData(rowPath, newRowData, expectedRowVersion)//increments the version of node "/IP:Port@Gen"
- .setData(rowIAmAlivePath, newRowIAmAliveData));
+ return await ZooKeeper.Using(_rootConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, new ZooKeeperWatcher(_logger), async zk
+ => await zk.TryTransaction(tx
+ => tx.setData(_clusterPath, null, expectedTableVersion)//increments the version of node "/"
+ .setData(rowPath, newRowData, expectedRowVersion)//increments the version of node "/IP:Port@Gen"
+ .setData(rowIAmAlivePath, newRowIAmAliveData)));
}
///
@@ -236,44 +224,57 @@ public Task UpdateRow(MembershipEntry entry, string etag, TableVersion tab
///
/// The target MembershipEntry tp update
/// Task representing the successful execution of this operation.
- public Task UpdateIAmAlive(MembershipEntry entry)
+ public async Task UpdateIAmAlive(MembershipEntry entry)
{
string rowIAmAlivePath = ConvertToRowIAmAlivePath(entry.SiloAddress);
byte[] newRowIAmAliveData = Serialize(entry.IAmAliveTime);
+
//update the data for IAmAlive unconditionally
- return UsingZookeeper(zk => zk.setDataAsync(rowIAmAlivePath, newRowIAmAliveData), this.deploymentConnectionString, this.watcher);
+ await ZooKeeper.Using(_rootConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, new ZooKeeperWatcher(_logger), async zk
+ => await zk.setDataAsync(rowIAmAlivePath, newRowIAmAliveData));
}
///
/// Deletes all table entries of the given clusterId
///
- public Task DeleteMembershipTableEntries(string clusterId)
+ public async Task DeleteMembershipTableEntries(string clusterId)
{
- string pathToDelete = "/" + clusterId;
- return UsingZookeeper(rootConnectionString, async zk =>
+ string pathToDelete = $"/{clusterId}";
+
+ await ZooKeeper.Using(_rootConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, new ZooKeeperWatcher(_logger), async zk =>
{
await ZKUtil.deleteRecursiveAsync(zk, pathToDelete);
await zk.sync(pathToDelete);
});
}
- private async Task TryTransaction(Func transactionFunc)
+ public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
{
- try
- {
- await UsingZookeeper(zk => transactionFunc(zk.transaction()).commitAsync(), this.deploymentConnectionString, this.watcher);
- return true;
- }
- catch (KeeperException e)
+ _logger.LogInformation("Deleting defunct silo entries from {deletePath} before {beforeDate}", $"{_rootConnectionString}{_clusterPath}", beforeDate);
+
+ await ZooKeeper.Using(_rootConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, new ZooKeeperWatcher(_logger), async zk =>
{
- //these exceptions are thrown when the transaction fails to commit due to semantical reasons
- if (e is KeeperException.NodeExistsException || e is KeeperException.NoNodeException ||
- e is KeeperException.BadVersionException)
+ var childrenResult = await zk.getChildrenAsync(_clusterPath);
+ var deleteTasks = new List();
+ _logger.LogInformation("Deleting defunct silo entry children: {children}", string.Join(", ", childrenResult.Children));
+
+ foreach (var child in childrenResult.Children)
{
- return false;
+ var rowPath = $"{_clusterPath}/{child}";
+ var rowData = await GetRow(zk, SiloAddress.FromParsableString(child));
+
+ var membershipEntry = rowData.membershipEntry;
+
+ _logger.LogInformation("{child} status: {status} last: {alive}", child, membershipEntry.Status, membershipEntry.IAmAliveTime);
+ if (membershipEntry.Status == SiloStatus.Dead && membershipEntry.IAmAliveTime < beforeDate)
+ {
+ _logger.LogInformation("Deleting defunct silo entry {address} status: {status} last: {alive}.", membershipEntry.SiloAddress, membershipEntry.Status, membershipEntry.IAmAliveTime);
+ deleteTasks.Add(ZKUtil.deleteRecursiveAsync(zk, rowPath));
+ }
}
- throw;
- }
+
+ await Task.WhenAll(deleteTasks);
+ });
}
///
@@ -281,43 +282,26 @@ private async Task TryTransaction(Func transacti
///
/// The zookeeper instance used for the read
/// The silo address.
- private static async Task> GetRow(ZooKeeper zk, SiloAddress siloAddress)
+ private async Task<(MembershipEntry membershipEntry, string version)> GetRow(ZooKeeper zk, SiloAddress siloAddress)
{
string rowPath = ConvertToRowPath(siloAddress);
string rowIAmAlivePath = ConvertToRowIAmAlivePath(siloAddress);
- var rowDataTask = zk.getDataAsync(rowPath);
- var rowIAmAliveDataTask = zk.getDataAsync(rowIAmAlivePath);
-
- await Task.WhenAll(rowDataTask, rowIAmAliveDataTask);
-
- MembershipEntry me = Deserialize((await rowDataTask).Data);
- me.IAmAliveTime = Deserialize((await rowIAmAliveDataTask).Data);
+ var rowData = await zk.getDataAsync(rowPath);
+ var rowIAmAliveData = await zk.getDataAsync(rowIAmAlivePath);
- int rowVersion = (await rowDataTask).Stat.getVersion();
+ var me = Deserialize(rowData.Data);
+ me.IAmAliveTime = Deserialize(rowIAmAliveData.Data);
- return new Tuple(me, rowVersion.ToString(CultureInfo.InvariantCulture));
+ int rowVersion = rowData.Stat.getVersion();
+ return (membershipEntry: me, version: rowVersion.ToString(CultureInfo.InvariantCulture));
}
- private static Task UsingZookeeper(Func> zkMethod, string deploymentConnectionString, ZooKeeperWatcher watcher, bool canBeReadOnly = false)
- {
- return ZooKeeper.Using(deploymentConnectionString, ZOOKEEPER_CONNECTION_TIMEOUT, watcher, zkMethod, canBeReadOnly);
- }
+ private string ConvertToRowPath(SiloAddress siloAddress)
+ => $"{_clusterPath}/{siloAddress.ToParsableString()}";
- private Task UsingZookeeper(string connectString, Func zkMethod)
- {
- return ZooKeeper.Using(connectString, ZOOKEEPER_CONNECTION_TIMEOUT, watcher, zkMethod);
- }
-
- private static string ConvertToRowPath(SiloAddress siloAddress)
- {
- return "/" + siloAddress.ToParsableString();
- }
-
- private static string ConvertToRowIAmAlivePath(SiloAddress siloAddress)
- {
- return ConvertToRowPath(siloAddress) + "/IAmAlive";
- }
+ private string ConvertToRowIAmAlivePath(SiloAddress siloAddress)
+ => $"{ConvertToRowPath(siloAddress)}/IAmAlive";
private static TableVersion ConvertToTableVersion(Stat stat)
{
@@ -326,42 +310,28 @@ private static TableVersion ConvertToTableVersion(Stat stat)
}
private static byte[] Serialize(object obj)
- {
- return
- Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(obj, Formatting.None,
- MembershipSerializerSettings.Instance));
- }
+ => Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(obj, Formatting.None, MembershipSerializerSettings.Instance));
private static T Deserialize(byte[] data)
- {
- return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), MembershipSerializerSettings.Instance);
- }
-
- public Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
- {
- throw new NotImplementedException();
- }
+ => JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), MembershipSerializerSettings.Instance);
}
- ///
- /// the state of every ZooKeeper client and its push notifications are published using watchers.
- /// in orleans the watcher is only for debugging purposes
- ///
- internal class ZooKeeperWatcher : Watcher
+ public static class ZKExtensions
{
- private readonly ILogger logger;
- public ZooKeeperWatcher(ILogger logger)
+ public static async Task TryTransaction(this ZooKeeper zk, Func transactionFunc)
{
- this.logger = logger;
- }
+ try
+ {
+ var trx = transactionFunc(zk.transaction());
+ await trx.commitAsync();
- public override Task process(WatchedEvent @event)
- {
- if (logger.IsEnabled(LogLevel.Debug))
+ return true;
+ }
+ catch (KeeperException e)
+ when (e is KeeperException.NodeExistsException || e is KeeperException.NoNodeException || e is KeeperException.BadVersionException)
{
- logger.Debug(@event.ToString());
+ return false;
}
- return Task.CompletedTask;
}
}
}
diff --git a/src/Orleans.Clustering.ZooKeeper/ZooKeeperClusteringClientOptions.cs b/src/Orleans.Clustering.ZooKeeper/ZooKeeperClusteringClientOptions.cs
index 5b615b3dfb..2349ede8fc 100644
--- a/src/Orleans.Clustering.ZooKeeper/ZooKeeperClusteringClientOptions.cs
+++ b/src/Orleans.Clustering.ZooKeeper/ZooKeeperClusteringClientOptions.cs
@@ -1,4 +1,4 @@
-using System;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
@@ -9,74 +9,67 @@
using Microsoft.Extensions.Options;
using Orleans.Configuration;
-namespace Orleans.Runtime.Membership
+namespace Orleans.Runtime.Membership;
+
+public class ZooKeeperClusteringClientOptions : IGatewayListProvider
{
- public class ZooKeeperClusteringClientOptions : IGatewayListProvider
+ private TimeSpan maxStaleness;
+ private readonly ZooKeeperBasedMembershipTable zooKeeperBasedMembershipTable;
+
+ public ZooKeeperClusteringClientOptions(
+ ZooKeeperBasedMembershipTable zooKeeperBasedMembershipTable,
+ ILogger logger,
+ IOptions options,
+ IOptions gatewayOptions,
+ IOptions clusterOptions)
{
- private ZooKeeperWatcher watcher;
- ///
- /// the node name for this deployment. for eg. /ClusterId
- ///
- private string deploymentPath;
+ maxStaleness = gatewayOptions.Value.GatewayListRefreshPeriod;
+ this.zooKeeperBasedMembershipTable = zooKeeperBasedMembershipTable;
+ }
- ///
- /// The deployment connection string. for eg. "192.168.1.1,192.168.1.2/ClusterId"
- ///
- private string deploymentConnectionString;
- private TimeSpan maxStaleness;
- public ZooKeeperClusteringClientOptions(
- ILogger logger,
- IOptions options,
- IOptions gatewayOptions,
- IOptions clusterOptions)
- {
- watcher = new ZooKeeperWatcher(logger);
+ ///
+ /// Initializes the ZooKeeper based gateway provider
+ ///
+ public Task InitializeGatewayListProvider()
+ {
+ return Task.CompletedTask;
+ }
- deploymentPath = "/" + clusterOptions.Value.ClusterId;
- deploymentConnectionString = options.Value.ConnectionString + deploymentPath;
- maxStaleness = gatewayOptions.Value.GatewayListRefreshPeriod;
- }
+ ///
+ /// Returns the list of gateways (silos) that can be used by a client to connect to Orleans cluster.
+ /// The Uri is in the form of: "gwy.tcp://IP:port/Generation". See Utils.ToGatewayUri and Utils.ToSiloAddress for more details about Uri format.
+ ///
+ public async Task> GetGateways()
+ {
+ var membershipTableData = await zooKeeperBasedMembershipTable.ReadAll();
- ///
- /// Initializes the ZooKeeper based gateway provider
- ///
- public Task InitializeGatewayListProvider()
- {
- return Task.CompletedTask;
- }
+ return membershipTableData
+ .Members
+ .Select(e => e.Item1)
+ .Where(m => m.Status == SiloStatus.Active && m.ProxyPort != 0)
+ .Select(m =>
+ {
+ var endpoint = new IPEndPoint(m.SiloAddress.Endpoint.Address, m.ProxyPort);
+ var gatewayAddress = SiloAddress.New(endpoint, m.SiloAddress.Generation);
- ///
- /// Returns the list of gateways (silos) that can be used by a client to connect to Orleans cluster.
- /// The Uri is in the form of: "gwy.tcp://IP:port/Generation". See Utils.ToGatewayUri and Utils.ToSiloAddress for more details about Uri format.
- ///
- public async Task> GetGateways()
- {
- var membershipTableData = await ZooKeeperBasedMembershipTable.ReadAll(this.deploymentConnectionString, this.watcher);
- return membershipTableData.Members.Select(e => e.Item1).
- Where(m => m.Status == SiloStatus.Active && m.ProxyPort != 0).
- Select(m =>
- {
- var endpoint = new IPEndPoint(m.SiloAddress.Endpoint.Address, m.ProxyPort);
- var gatewayAddress = SiloAddress.New(endpoint, m.SiloAddress.Generation);
- return gatewayAddress.ToGatewayUri();
- }).ToList();
- }
+ return gatewayAddress.ToGatewayUri();
+ }).ToList();
+ }
- ///
- /// Specifies how often this IGatewayListProvider is refreshed, to have a bound on max staleness of its returned information.
- ///
- public TimeSpan MaxStaleness
- {
- get { return maxStaleness; }
- }
+ ///
+ /// Specifies how often this IGatewayListProvider is refreshed, to have a bound on max staleness of its returned information.
+ ///
+ public TimeSpan MaxStaleness
+ {
+ get { return maxStaleness; }
+ }
- ///
- /// Specifies whether this IGatewayListProvider ever refreshes its returned information, or always returns the same gw list.
- /// (currently only the static config based StaticGatewayListProvider is not updatable. All others are.)
- ///
- public bool IsUpdatable
- {
- get { return true; }
- }
+ ///
+ /// Specifies whether this IGatewayListProvider ever refreshes its returned information, or always returns the same gw list.
+ /// (currently only the static config based StaticGatewayListProvider is not updatable. All others are.)
+ ///
+ public bool IsUpdatable
+ {
+ get { return true; }
}
-}
+}
\ No newline at end of file
diff --git a/src/Orleans.Clustering.ZooKeeper/ZooKeeperWatcher.cs b/src/Orleans.Clustering.ZooKeeper/ZooKeeperWatcher.cs
new file mode 100644
index 0000000000..5d42edf9e9
--- /dev/null
+++ b/src/Orleans.Clustering.ZooKeeper/ZooKeeperWatcher.cs
@@ -0,0 +1,52 @@
+using System.Threading.Tasks;
+using org.apache.zookeeper;
+using Microsoft.Extensions.Logging;
+using static org.apache.zookeeper.Watcher.Event;
+using System.Threading;
+
+namespace Orleans.Runtime.Membership
+{
+ ///
+ /// the state of every ZooKeeper client and its push notifications are published using watchers.
+ /// in orleans the watcher is only for debugging purposes
+ ///
+ internal class ZooKeeperWatcher : Watcher
+ {
+ private readonly ILogger logger;
+
+ public ZooKeeperWatcher(ILogger logger)
+ {
+ this.logger = logger;
+ }
+
+ public override Task process(WatchedEvent ev)
+ {
+ if (logger.IsEnabled(LogLevel.Debug))
+ {
+ logger.Debug(ev.ToString());
+ }
+
+ switch (ev.getState())
+ {
+ case KeeperState.AuthFailed:
+ logger.LogError("ZooKeeper authentication failed", ev);
+ break;
+ case KeeperState.Expired:
+ logger.LogError("ZooKeeper session expired", ev);
+ break;
+ case KeeperState.Disconnected:
+ logger.LogError("ZooKeeper disconnected", ev);
+ break;
+ case KeeperState.SyncConnected:
+ logger.LogInformation("ZooKeeper connected", ev);
+ break;
+ case KeeperState.ConnectedReadOnly:
+ logger.LogInformation("ZooKeeper connected readonly", ev);
+ break;
+ }
+
+ return Task.CompletedTask;
+
+ }
+ }
+}
diff --git a/test/DefaultCluster.Tests/SerializationTests/SerializationTests.cs b/test/DefaultCluster.Tests/SerializationTests/SerializationTests.cs
index b0cf56075d..0e3f1d5d89 100644
--- a/test/DefaultCluster.Tests/SerializationTests/SerializationTests.cs
+++ b/test/DefaultCluster.Tests/SerializationTests/SerializationTests.cs
@@ -1,4 +1,6 @@
using System;
+using System.Net.Http;
+
using NodaTime;
using Orleans.Serialization;
using TestExtensions;
@@ -13,6 +15,26 @@ public SerializationTests(DefaultClusterFixture fixture) : base(fixture)
{
}
+ [Fact]
+ public void GigTest()
+ {
+ var expected = ThrowAndCatch(new HttpRequestException("HTTP request exception"));
+ var actual = this.HostedCluster.SerializationManager.RoundTripSerializationForTesting(expected);
+
+ Assert.Equal(expected.Message, actual.Message);
+ static T ThrowAndCatch(T exception) where T : Exception
+ {
+ try
+ {
+ throw exception;
+ }
+ catch (T ex)
+ {
+ return ex;
+ }
+ }
+ }
+
[Fact, TestCategory("BVT"), TestCategory("Serialization")]
public void Serialization_LargeTestData()
{