From bc18ebded10bc763b98bcba46db925a3a5031ecd Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 22 May 2018 14:33:09 +0200 Subject: [PATCH] swift: avoid Connection aborted This change removes usage of threads with swift driver. This avoids to get "Connection aborted" because a thread is stuck and the server side decide to break the connection. Related-bug: #509 --- gnocchi/incoming/__init__.py | 12 +++++++----- gnocchi/incoming/swift.py | 5 +++++ gnocchi/storage/__init__.py | 16 ++++++++++------ gnocchi/storage/swift.py | 3 +++ gnocchi/utils.py | 6 +++++- 5 files changed, 30 insertions(+), 12 deletions(-) diff --git a/gnocchi/incoming/__init__.py b/gnocchi/incoming/__init__.py index 51b72d108..562a18ee8 100644 --- a/gnocchi/incoming/__init__.py +++ b/gnocchi/incoming/__init__.py @@ -106,6 +106,9 @@ class IncomingDriver(object): SACK_NAME_FORMAT = "incoming{total}-{number}" CFG_PREFIX = 'gnocchi-config' CFG_SACKS = 'sacks' + # NOTE(sileht): By default we use threads, but some driver can disable + # threads by setting this to utils.sequencial_map + MAP_METHOD = staticmethod(utils.parallel_map) @property def NUM_SACKS(self): @@ -188,11 +191,10 @@ def add_measures_batch(self, metrics_and_measures): and values are a list of :py:class:`gnocchi.incoming.Measure`. """ - utils.parallel_map( - self._store_new_measures, - ((metric_id, self._encode_measures(measures)) - for metric_id, measures - in six.iteritems(metrics_and_measures))) + self.MAP_METHOD(self._store_new_measures, + ((metric_id, self._encode_measures(measures)) + for metric_id, measures + in six.iteritems(metrics_and_measures))) @staticmethod def _store_new_measures(metric_id, data): diff --git a/gnocchi/incoming/swift.py b/gnocchi/incoming/swift.py index 4445eee40..b232bfd72 100644 --- a/gnocchi/incoming/swift.py +++ b/gnocchi/incoming/swift.py @@ -22,6 +22,7 @@ from gnocchi.common import swift from gnocchi import incoming +from gnocchi import utils swclient = swift.swclient swift_utils = swift.swift_utils @@ -30,6 +31,10 @@ class SwiftStorage(incoming.IncomingDriver): + # NOTE(sileht): Using threads with swiftclient doesn't work + # as expected, so disable it + MAP_METHOD = staticmethod(utils.sequencial_map) + def __init__(self, conf, greedy=True): super(SwiftStorage, self).__init__(conf) self.swift = swift.get_connection(conf) diff --git a/gnocchi/storage/__init__.py b/gnocchi/storage/__init__.py index 6f29234b1..79ec19dbb 100644 --- a/gnocchi/storage/__init__.py +++ b/gnocchi/storage/__init__.py @@ -121,6 +121,10 @@ def time(self, name): class StorageDriver(object): + # NOTE(sileht): By default we use threads, but some driver can disable + # threads by setting this to utils.sequencial_map + MAP_METHOD = staticmethod(utils.parallel_map) + def __init__(self, conf): self.statistics = Statistics() @@ -131,7 +135,7 @@ def upgrade(): def _get_splits(self, metrics_aggregations_keys, version=3): results = collections.defaultdict( lambda: collections.defaultdict(list)) - for metric, aggregation, split in utils.parallel_map( + for metric, aggregation, split in self.MAP_METHOD( lambda m, k, a, v: (m, a, self._get_splits_unbatched(m, k, a, v)), # noqa ((metric, key, aggregation, version) for metric, aggregations_and_keys @@ -168,7 +172,7 @@ def _get_or_create_unaggregated_timeseries(self, metrics, version=3): return dict( six.moves.zip( metrics, - utils.parallel_map( + self.MAP_METHOD( utils.return_none_on_failure( self._get_or_create_unaggregated_timeseries_unbatched), ((metric, version) for metric in metrics)))) @@ -189,7 +193,7 @@ def _store_unaggregated_timeseries(self, metrics_and_data, version=3): :param metrics_and_data: A list of (metric, serialized_data) tuples :param version: Storage engine data format version """ - utils.parallel_map( + self.MAP_METHOD( utils.return_none_on_failure( self._store_unaggregated_timeseries_unbatched), ((metric, data, version) for metric, data in metrics_and_data)) @@ -221,7 +225,7 @@ def _store_metric_splits(self, metrics_keys_aggregations_data_offset, data, offset) tuples. :param version: Storage engine format version. """ - utils.parallel_map( + self.MAP_METHOD( self._store_metric_splits_unbatched, ((metric, key, aggregation, data, offset, version) for metric, keys_aggregations_data_offset @@ -260,7 +264,7 @@ def get_aggregated_measures(self, metric, aggregations, :param to timestamp: The timestamp to get the measure to. """ keys = self._list_split_keys(metric, aggregations) - timeseries = utils.parallel_map( + timeseries = self.MAP_METHOD( self._get_measures_timeserie, ((metric, agg, keys[agg], from_timestamp, to_timestamp) for agg in aggregations)) @@ -561,7 +565,7 @@ def _delete_metric_splits(self, metrics_keys_aggregations, version=3): `storage.Metric` and values are lists of (key, aggregation) tuples. """ - utils.parallel_map( + self.MAP_METHOD( utils.return_none_on_failure(self._delete_metric_splits_unbatched), ((metric, key, aggregation) for metric, keys_and_aggregations diff --git a/gnocchi/storage/swift.py b/gnocchi/storage/swift.py index c1a1ff65a..9f8a8cbd0 100644 --- a/gnocchi/storage/swift.py +++ b/gnocchi/storage/swift.py @@ -86,6 +86,9 @@ class SwiftStorage(storage.StorageDriver): WRITE_FULL = True + # NOTE(sileht): Using threads with swiftclient doesn't work + # as expected, so disable it + MAP_METHOD = staticmethod(utils.sequencial_map) def __init__(self, conf): super(SwiftStorage, self).__init__(conf) diff --git a/gnocchi/utils.py b/gnocchi/utils.py index c04cc7aa4..ad2d2b882 100644 --- a/gnocchi/utils.py +++ b/gnocchi/utils.py @@ -299,11 +299,15 @@ def get_driver_class(namespace, conf): conf.driver).driver +def sequencial_map(fn, list_of_args): + return list(itertools.starmap(fn, list_of_args)) + + def parallel_map(fn, list_of_args): """Run a function in parallel.""" if parallel_map.MAX_WORKERS == 1: - return list(itertools.starmap(fn, list_of_args)) + return sequencial_map(fn, list_of_args) with futures.ThreadPoolExecutor( max_workers=parallel_map.MAX_WORKERS) as executor: