diff --git a/gnocchi/storage/__init__.py b/gnocchi/storage/__init__.py index 6f29234b1..f4aa47ae0 100644 --- a/gnocchi/storage/__init__.py +++ b/gnocchi/storage/__init__.py @@ -121,6 +121,10 @@ def time(self, name): class StorageDriver(object): + # NOTE(sileht): By default we use threads, but some driver can disable + # threads by setting this to utils.sequencial_map + MAP_METHOD = utils.parallel_map + def __init__(self, conf): self.statistics = Statistics() @@ -131,7 +135,7 @@ def upgrade(): def _get_splits(self, metrics_aggregations_keys, version=3): results = collections.defaultdict( lambda: collections.defaultdict(list)) - for metric, aggregation, split in utils.parallel_map( + for metric, aggregation, split in self.MAP_METHOD( lambda m, k, a, v: (m, a, self._get_splits_unbatched(m, k, a, v)), # noqa ((metric, key, aggregation, version) for metric, aggregations_and_keys @@ -168,7 +172,7 @@ def _get_or_create_unaggregated_timeseries(self, metrics, version=3): return dict( six.moves.zip( metrics, - utils.parallel_map( + self.MAP_METHOD( utils.return_none_on_failure( self._get_or_create_unaggregated_timeseries_unbatched), ((metric, version) for metric in metrics)))) @@ -189,7 +193,7 @@ def _store_unaggregated_timeseries(self, metrics_and_data, version=3): :param metrics_and_data: A list of (metric, serialized_data) tuples :param version: Storage engine data format version """ - utils.parallel_map( + self.MAP_METHOD( utils.return_none_on_failure( self._store_unaggregated_timeseries_unbatched), ((metric, data, version) for metric, data in metrics_and_data)) @@ -221,7 +225,7 @@ def _store_metric_splits(self, metrics_keys_aggregations_data_offset, data, offset) tuples. :param version: Storage engine format version. """ - utils.parallel_map( + self.MAP_METHOD( self._store_metric_splits_unbatched, ((metric, key, aggregation, data, offset, version) for metric, keys_aggregations_data_offset @@ -260,7 +264,7 @@ def get_aggregated_measures(self, metric, aggregations, :param to timestamp: The timestamp to get the measure to. """ keys = self._list_split_keys(metric, aggregations) - timeseries = utils.parallel_map( + timeseries = self.MAP_METHOD( self._get_measures_timeserie, ((metric, agg, keys[agg], from_timestamp, to_timestamp) for agg in aggregations)) @@ -561,7 +565,7 @@ def _delete_metric_splits(self, metrics_keys_aggregations, version=3): `storage.Metric` and values are lists of (key, aggregation) tuples. """ - utils.parallel_map( + self.MAP_METHOD( utils.return_none_on_failure(self._delete_metric_splits_unbatched), ((metric, key, aggregation) for metric, keys_and_aggregations diff --git a/gnocchi/storage/swift.py b/gnocchi/storage/swift.py index c1a1ff65a..f334ff86e 100644 --- a/gnocchi/storage/swift.py +++ b/gnocchi/storage/swift.py @@ -86,6 +86,9 @@ class SwiftStorage(storage.StorageDriver): WRITE_FULL = True + # NOTE(sileht): Using threads with swiftclient doesn't work + # as expected, so disable it + MAP_METHOD = utils.sequencial_map def __init__(self, conf): super(SwiftStorage, self).__init__(conf) diff --git a/gnocchi/utils.py b/gnocchi/utils.py index c04cc7aa4..ad2d2b882 100644 --- a/gnocchi/utils.py +++ b/gnocchi/utils.py @@ -299,11 +299,15 @@ def get_driver_class(namespace, conf): conf.driver).driver +def sequencial_map(fn, list_of_args): + return list(itertools.starmap(fn, list_of_args)) + + def parallel_map(fn, list_of_args): """Run a function in parallel.""" if parallel_map.MAX_WORKERS == 1: - return list(itertools.starmap(fn, list_of_args)) + return sequencial_map(fn, list_of_args) with futures.ThreadPoolExecutor( max_workers=parallel_map.MAX_WORKERS) as executor: