Skip to content

Commit

Permalink
swift: avoid Connection aborted
Browse files Browse the repository at this point in the history
This change removes usage of threads with swift driver.

This avoids to get "Connection aborted" because a thread is stuck
and the server side decide to break the connection.

Related-bug: #509
  • Loading branch information
sileht authored and mergify[bot] committed May 22, 2018
1 parent 2a76e73 commit bc18ebd
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 12 deletions.
12 changes: 7 additions & 5 deletions gnocchi/incoming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class IncomingDriver(object):
SACK_NAME_FORMAT = "incoming{total}-{number}"
CFG_PREFIX = 'gnocchi-config'
CFG_SACKS = 'sacks'
# NOTE(sileht): By default we use threads, but some driver can disable
# threads by setting this to utils.sequencial_map
MAP_METHOD = staticmethod(utils.parallel_map)

@property
def NUM_SACKS(self):
Expand Down Expand Up @@ -188,11 +191,10 @@ def add_measures_batch(self, metrics_and_measures):
and values are a list of
:py:class:`gnocchi.incoming.Measure`.
"""
utils.parallel_map(
self._store_new_measures,
((metric_id, self._encode_measures(measures))
for metric_id, measures
in six.iteritems(metrics_and_measures)))
self.MAP_METHOD(self._store_new_measures,
((metric_id, self._encode_measures(measures))
for metric_id, measures
in six.iteritems(metrics_and_measures)))

@staticmethod
def _store_new_measures(metric_id, data):
Expand Down
5 changes: 5 additions & 0 deletions gnocchi/incoming/swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from gnocchi.common import swift
from gnocchi import incoming
from gnocchi import utils

swclient = swift.swclient
swift_utils = swift.swift_utils
Expand All @@ -30,6 +31,10 @@


class SwiftStorage(incoming.IncomingDriver):
# NOTE(sileht): Using threads with swiftclient doesn't work
# as expected, so disable it
MAP_METHOD = staticmethod(utils.sequencial_map)

def __init__(self, conf, greedy=True):
super(SwiftStorage, self).__init__(conf)
self.swift = swift.get_connection(conf)
Expand Down
16 changes: 10 additions & 6 deletions gnocchi/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ def time(self, name):

class StorageDriver(object):

# NOTE(sileht): By default we use threads, but some driver can disable
# threads by setting this to utils.sequencial_map
MAP_METHOD = staticmethod(utils.parallel_map)

def __init__(self, conf):
self.statistics = Statistics()

Expand All @@ -131,7 +135,7 @@ def upgrade():
def _get_splits(self, metrics_aggregations_keys, version=3):
results = collections.defaultdict(
lambda: collections.defaultdict(list))
for metric, aggregation, split in utils.parallel_map(
for metric, aggregation, split in self.MAP_METHOD(
lambda m, k, a, v: (m, a, self._get_splits_unbatched(m, k, a, v)), # noqa
((metric, key, aggregation, version)
for metric, aggregations_and_keys
Expand Down Expand Up @@ -168,7 +172,7 @@ def _get_or_create_unaggregated_timeseries(self, metrics, version=3):
return dict(
six.moves.zip(
metrics,
utils.parallel_map(
self.MAP_METHOD(
utils.return_none_on_failure(
self._get_or_create_unaggregated_timeseries_unbatched),
((metric, version) for metric in metrics))))
Expand All @@ -189,7 +193,7 @@ def _store_unaggregated_timeseries(self, metrics_and_data, version=3):
:param metrics_and_data: A list of (metric, serialized_data) tuples
:param version: Storage engine data format version
"""
utils.parallel_map(
self.MAP_METHOD(
utils.return_none_on_failure(
self._store_unaggregated_timeseries_unbatched),
((metric, data, version) for metric, data in metrics_and_data))
Expand Down Expand Up @@ -221,7 +225,7 @@ def _store_metric_splits(self, metrics_keys_aggregations_data_offset,
data, offset) tuples.
:param version: Storage engine format version.
"""
utils.parallel_map(
self.MAP_METHOD(
self._store_metric_splits_unbatched,
((metric, key, aggregation, data, offset, version)
for metric, keys_aggregations_data_offset
Expand Down Expand Up @@ -260,7 +264,7 @@ def get_aggregated_measures(self, metric, aggregations,
:param to timestamp: The timestamp to get the measure to.
"""
keys = self._list_split_keys(metric, aggregations)
timeseries = utils.parallel_map(
timeseries = self.MAP_METHOD(
self._get_measures_timeserie,
((metric, agg, keys[agg], from_timestamp, to_timestamp)
for agg in aggregations))
Expand Down Expand Up @@ -561,7 +565,7 @@ def _delete_metric_splits(self, metrics_keys_aggregations, version=3):
`storage.Metric` and values are lists
of (key, aggregation) tuples.
"""
utils.parallel_map(
self.MAP_METHOD(
utils.return_none_on_failure(self._delete_metric_splits_unbatched),
((metric, key, aggregation)
for metric, keys_and_aggregations
Expand Down
3 changes: 3 additions & 0 deletions gnocchi/storage/swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
class SwiftStorage(storage.StorageDriver):

WRITE_FULL = True
# NOTE(sileht): Using threads with swiftclient doesn't work
# as expected, so disable it
MAP_METHOD = staticmethod(utils.sequencial_map)

def __init__(self, conf):
super(SwiftStorage, self).__init__(conf)
Expand Down
6 changes: 5 additions & 1 deletion gnocchi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,15 @@ def get_driver_class(namespace, conf):
conf.driver).driver


def sequencial_map(fn, list_of_args):
return list(itertools.starmap(fn, list_of_args))


def parallel_map(fn, list_of_args):
"""Run a function in parallel."""

if parallel_map.MAX_WORKERS == 1:
return list(itertools.starmap(fn, list_of_args))
return sequencial_map(fn, list_of_args)

with futures.ThreadPoolExecutor(
max_workers=parallel_map.MAX_WORKERS) as executor:
Expand Down

0 comments on commit bc18ebd

Please sign in to comment.