Skip to content

Commit

Permalink
swift: avoid Connection aborted
Browse files Browse the repository at this point in the history
This change removes usage of threads with swift driver.

This avoids getting "Connection aborted" errors that occur when a thread
gets stuck and the server side decides to break the connection.

Related-bug: gnocchixyz#509
  • Loading branch information
sileht committed May 22, 2018
1 parent 2a76e73 commit ad655e4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
16 changes: 10 additions & 6 deletions gnocchi/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ def time(self, name):

class StorageDriver(object):

# NOTE(sileht): By default we use threads, but some drivers can disable
# threading by setting this to utils.sequencial_map
MAP_METHOD = utils.parallel_map

def __init__(self, conf):
self.statistics = Statistics()

Expand All @@ -131,7 +135,7 @@ def upgrade():
def _get_splits(self, metrics_aggregations_keys, version=3):
results = collections.defaultdict(
lambda: collections.defaultdict(list))
for metric, aggregation, split in utils.parallel_map(
for metric, aggregation, split in self.MAP_METHOD(
lambda m, k, a, v: (m, a, self._get_splits_unbatched(m, k, a, v)), # noqa
((metric, key, aggregation, version)
for metric, aggregations_and_keys
Expand Down Expand Up @@ -168,7 +172,7 @@ def _get_or_create_unaggregated_timeseries(self, metrics, version=3):
return dict(
six.moves.zip(
metrics,
utils.parallel_map(
self.MAP_METHOD(
utils.return_none_on_failure(
self._get_or_create_unaggregated_timeseries_unbatched),
((metric, version) for metric in metrics))))
Expand All @@ -189,7 +193,7 @@ def _store_unaggregated_timeseries(self, metrics_and_data, version=3):
:param metrics_and_data: A list of (metric, serialized_data) tuples
:param version: Storage engine data format version
"""
utils.parallel_map(
self.MAP_METHOD(
utils.return_none_on_failure(
self._store_unaggregated_timeseries_unbatched),
((metric, data, version) for metric, data in metrics_and_data))
Expand Down Expand Up @@ -221,7 +225,7 @@ def _store_metric_splits(self, metrics_keys_aggregations_data_offset,
data, offset) tuples.
:param version: Storage engine format version.
"""
utils.parallel_map(
self.MAP_METHOD(
self._store_metric_splits_unbatched,
((metric, key, aggregation, data, offset, version)
for metric, keys_aggregations_data_offset
Expand Down Expand Up @@ -260,7 +264,7 @@ def get_aggregated_measures(self, metric, aggregations,
:param to timestamp: The timestamp to get the measure to.
"""
keys = self._list_split_keys(metric, aggregations)
timeseries = utils.parallel_map(
timeseries = self.MAP_METHOD(
self._get_measures_timeserie,
((metric, agg, keys[agg], from_timestamp, to_timestamp)
for agg in aggregations))
Expand Down Expand Up @@ -561,7 +565,7 @@ def _delete_metric_splits(self, metrics_keys_aggregations, version=3):
`storage.Metric` and values are lists
of (key, aggregation) tuples.
"""
utils.parallel_map(
self.MAP_METHOD(
utils.return_none_on_failure(self._delete_metric_splits_unbatched),
((metric, key, aggregation)
for metric, keys_and_aggregations
Expand Down
3 changes: 3 additions & 0 deletions gnocchi/storage/swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
class SwiftStorage(storage.StorageDriver):

WRITE_FULL = True
# NOTE(sileht): Using threads with swiftclient doesn't work
# as expected, so disable it
MAP_METHOD = utils.sequencial_map

def __init__(self, conf):
super(SwiftStorage, self).__init__(conf)
Expand Down
6 changes: 5 additions & 1 deletion gnocchi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,15 @@ def get_driver_class(namespace, conf):
conf.driver).driver


def sequencial_map(fn, list_of_args):
    """Apply *fn* to each tuple of arguments, one at a time.

    Serial (single-threaded) counterpart of `parallel_map`: each element
    of *list_of_args* is unpacked as positional arguments to *fn*.

    :param fn: Callable invoked once per argument tuple.
    :param list_of_args: Iterable of argument tuples.
    :return: List of results, in input order.
    """
    return [fn(*args) for args in list_of_args]


def parallel_map(fn, list_of_args):
"""Run a function in parallel."""

if parallel_map.MAX_WORKERS == 1:
return list(itertools.starmap(fn, list_of_args))
return sequencial_map(fn, list_of_args)

with futures.ThreadPoolExecutor(
max_workers=parallel_map.MAX_WORKERS) as executor:
Expand Down

0 comments on commit ad655e4

Please sign in to comment.