Increase some integration test timeouts (#1374)
dpkp authored Feb 9, 2018
1 parent 7d8f9a4 commit 8655c75
Showing 5 changed files with 26 additions and 11 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -13,4 +13,5 @@ docs/_build
 .cache*
 .idea/
 integration-test/
-tests-env/
+tests-env/
+.pytest_cache/
6 changes: 4 additions & 2 deletions test/fixtures.py
@@ -141,7 +141,7 @@ def open(self):

         # Party!
         timeout = 5
-        max_timeout = 30
+        max_timeout = 120
         backoff = 1
         end_at = time.time() + max_timeout
         tries = 1
@@ -161,6 +161,7 @@ def open(self):
             timeout *= 2
             time.sleep(backoff)
             tries += 1
+            backoff += 1
         else:
             raise RuntimeError('Failed to start Zookeeper before max_timeout')
         self.out("Done!")
@@ -278,7 +279,7 @@ def open(self):
         env = self.kafka_run_class_env()

         timeout = 5
-        max_timeout = 30
+        max_timeout = 120
         backoff = 1
         end_at = time.time() + max_timeout
         tries = 1
@@ -301,6 +302,7 @@ def open(self):
             timeout *= 2
             time.sleep(backoff)
             tries += 1
+            backoff += 1
         else:
             raise RuntimeError('Failed to start KafkaInstance before max_timeout')
         self.out("Done!")
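Both fixture loops (Zookeeper and KafkaInstance) follow the same shape: each failed attempt doubles the per-try wait and, with this commit, also lengthens the sleep between attempts by one second. A minimal standalone sketch of the pattern — `try_start` is a hypothetical stand-in for the real SpawnedService start/wait calls:

```python
import time

def start_with_retries(try_start, max_timeout=120):
    """Sketch of the fixtures.py retry loop; try_start is a hypothetical
    callable returning True once the service is up within `timeout` seconds."""
    timeout = 5
    backoff = 1
    end_at = time.time() + max_timeout
    tries = 1
    while time.time() < end_at:
        if try_start(timeout=min(timeout, max(end_at - time.time(), 0))):
            return
        timeout *= 2        # wait longer on the next attempt
        time.sleep(backoff)
        tries += 1          # attempt counter (used for logging in the real fixture)
        backoff += 1        # pause longer between attempts (the change in this commit)
    raise RuntimeError('Failed to start service before max_timeout')
```

Raising `max_timeout` from 30 to 120 seconds gives slow CI hosts several more attempts before the fixture gives up.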
10 changes: 6 additions & 4 deletions test/test_consumer_integration.py
@@ -647,13 +647,14 @@ def test_kafka_consumer_offsets_for_time(self):
         early_time = late_time - 2000
         tp = TopicPartition(self.topic, 0)

+        timeout = 10
         kafka_producer = self.kafka_producer()
         early_msg = kafka_producer.send(
             self.topic, partition=0, value=b"first",
-            timestamp_ms=early_time).get(1)
+            timestamp_ms=early_time).get(timeout)
         late_msg = kafka_producer.send(
             self.topic, partition=0, value=b"last",
-            timestamp_ms=late_time).get(1)
+            timestamp_ms=late_time).get(timeout)

         consumer = self.kafka_consumer()
         offsets = consumer.offsets_for_times({tp: early_time})
@@ -699,12 +700,13 @@ def test_kafka_consumer_offsets_search_many_partitions(self):

         kafka_producer = self.kafka_producer()
         send_time = int(time.time() * 1000)
+        timeout = 10
         p0msg = kafka_producer.send(
             self.topic, partition=0, value=b"XXX",
-            timestamp_ms=send_time).get()
+            timestamp_ms=send_time).get(timeout)
         p1msg = kafka_producer.send(
             self.topic, partition=1, value=b"XXX",
-            timestamp_ms=send_time).get()
+            timestamp_ms=send_time).get(timeout)

         consumer = self.kafka_consumer()
         offsets = consumer.offsets_for_times({
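`KafkaProducer.send()` returns a future; calling `.get(timeout)` on it blocks up to `timeout` seconds for the broker's acknowledgment and raises `KafkaTimeoutError` if it doesn't arrive. The old code passed a bare `1` — a one-second wait, easily exceeded by a just-started broker. A sketch of the call (broker address and topic are hypothetical):

```python
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('my-topic', value=b'payload')
try:
    metadata = future.get(timeout=10)  # block up to 10s for the broker ack
    print(metadata.topic, metadata.partition, metadata.offset)
except KafkaTimeoutError:
    print('no acknowledgment within 10 seconds')
```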
6 changes: 3 additions & 3 deletions test/test_producer.py
@@ -38,12 +38,12 @@ def test_end_to_end(kafka_broker, compression):
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
-                             max_block_ms=10000,
+                             max_block_ms=30000,
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,
                              group_id=None,
-                             consumer_timeout_ms=10000,
+                             consumer_timeout_ms=30000,
                              auto_offset_reset='earliest',
                              value_deserializer=bytes.decode)
@@ -87,7 +87,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
-                             max_block_ms=10000,
+                             max_block_ms=30000,
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()

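The two raised settings guard different phases: `max_block_ms` caps how long `send()` itself may block waiting for metadata or buffer space, while `consumer_timeout_ms` ends consumer iteration with `StopIteration` after that long without a record, so the test fails cleanly instead of hanging. A sketch with a hypothetical broker address:

```python
from kafka import KafkaConsumer, KafkaProducer

connect_str = 'localhost:9092'  # hypothetical; the tests build this from the fixture

# send() may block up to max_block_ms (fetching metadata, waiting for buffer
# space) before raising KafkaTimeoutError -- separate from the future's .get().
producer = KafkaProducer(bootstrap_servers=connect_str, max_block_ms=30000)

# Iteration raises StopIteration after 30s with no records, ending the loop.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers=connect_str,
                         consumer_timeout_ms=30000,
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)
```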
12 changes: 11 additions & 1 deletion test/testutil.py
@@ -12,6 +12,7 @@
 from . import unittest

 from kafka import SimpleClient
+from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
 from kafka.structs import OffsetRequestPayload

 __all__ = [
@@ -98,7 +99,16 @@ def setUp(self):
         if self.create_client:
             self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port))

-        self.client.ensure_topic_exists(self.topic)
+        timeout = time.time() + 30
+        while time.time() < timeout:
+            try:
+                self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
+                if self.client.has_metadata_for_topic(self.topic):
+                    break
+            except LeaderNotAvailableError:
+                time.sleep(1)
+        else:
+            raise KafkaTimeoutError('Timeout loading topic metadata!')

         self._messages = {}
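The replacement polling loop leans on Python's `while`/`else`: the `else` block runs only when the loop condition goes false without hitting `break`, i.e. the 30-second deadline passed before topic metadata (including a leader) became available. The same idiom, stripped to its core with a hypothetical `predicate`:

```python
import time

def wait_until(predicate, max_wait=30, interval=1):
    """Poll `predicate` (a hypothetical callable) until it returns True
    or `max_wait` seconds elapse -- the shape of the testutil.py loop."""
    deadline = time.time() + max_wait
    while time.time() < deadline:
        if predicate():
            break
        time.sleep(interval)
    else:
        # Runs only if the while condition went false without a break.
        raise TimeoutError('condition not met within %d seconds' % max_wait)
```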
