Skip to content

Commit

Permalink
Merge branch 'master' of github.com:jessepollak/mixpanel-python-async
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse Pollak committed Nov 25, 2014
2 parents 73f1d50 + 4bcf3d7 commit 616e25a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
27 changes: 14 additions & 13 deletions mixpanel_async/async_buffered_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import threading
from datetime import datetime, timedelta
from mixpanel import BufferedConsumer as SynchronousBufferedConsumer
from mixpanel import MixpanelException

class FlushThread(threading.Thread):
'''
Expand Down Expand Up @@ -44,18 +45,18 @@ class AsyncBufferedConsumer(SynchronousBufferedConsumer):
Because AsyncBufferedConsumer holds events until the `flush_after` timeout
or an endpoint queue hits the size of _max_queue_size, you should call
flush(async=False) before you terminate any process where you have been
flush(async=False) before you terminate any process where you have been
using the AsyncBufferedConsumer.
'''

# constants used in the _should_flush method
ALL = "ALL"
ENDPOINT = "ENDPOINT"

def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
events_url=None, people_url=None, *args, **kwargs):
'''
Create a new instance of a AsyncBufferedConsumer class.
Create a new instance of a AsyncBufferedConsumer class.
:param flush_after (datetime.timedelta): the time period after which
the AsyncBufferedConsumer will flush the events upon receiving a
Expand All @@ -68,12 +69,12 @@ def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
:param people_url: the Mixpanel API URL that people events will be sent to
'''
super(AsyncBufferedConsumer, self).__init__(
max_size=max_size,
events_url=events_url,
max_size=max_size,
events_url=events_url,
people_url=people_url
)

# remove the minimum max size that the SynchronousBufferedConsumer
# remove the minimum max size that the SynchronousBufferedConsumer
# class sets
self._max_size = max_size
self.flush_after = flush_after
Expand Down Expand Up @@ -149,7 +150,7 @@ def send(self, endpoint, json_message):
:raises: MixpanelException
'''
if endpoint not in self._async_buffers:
raise MixpanelException('No such endpoint "{0}". Valid endpoints are one of {1}'.format(self._async_buffers.keys()))
raise MixpanelException('No such endpoint "{0}". Valid endpoints are one of {1}'.format(endpoint, self._async_buffers.keys()))

buf = self._async_buffers[endpoint]
buf.append(json_message)
Expand All @@ -175,8 +176,8 @@ def flush(self, endpoint=None, async=True):
thrown will have a message property, containing the text of the message,
and an endpoint property containing the endpoint that failed.
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
for sending the data
:param async (bool): Whether to flush the data in a seperate thread or not
'''
Expand Down Expand Up @@ -209,7 +210,7 @@ def flush(self, endpoint=None, async=True):
# event is added this second flush will be retriggered and
# will complete.
flushing = False

else:
self.transfer_buffers(endpoint=endpoint)
self._sync_flush(endpoint=endpoint)
Expand All @@ -226,17 +227,17 @@ def transfer_buffers(self, endpoint=None):
Transfer events from the `_async_buffers` where they are stored to the
`_buffers` where they will be flushed from by the flushing thread.
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
:param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
that is about to be flushed
"""
if endpoint:
if endpoint:
keys = [endpoint]
else:
keys = self._async_buffers.keys()

for key in keys:
buf = self._async_buffers[key]
while buf:
while buf:
self._buffers[key].append(buf.pop(0))


Expand Down
12 changes: 8 additions & 4 deletions tests/async_buffered_consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
except ImportError:
raise Exception(
"""
mixpanel-python-async requires the mock package to run the test suite.
Please run:
mixpanel-python-async requires the mock package to run the test suite.
Please run:
$ pip install mock
""")
Expand All @@ -32,7 +32,7 @@ class AsyncBufferedConsumerTestCase(unittest.TestCase):

def setUp(self):
self.consumer = AsyncBufferedConsumer(
max_size=self.MAX_SIZE,
max_size=self.MAX_SIZE,
flush_first=False
)

Expand All @@ -58,7 +58,7 @@ def test_sync_flush_calls_buffered_consumer_flush_endpoint(self, flush_endpoint)
flush_endpoint.assert_called_with(self.ENDPOINT)


@patch.object(AsyncBufferedConsumer, '_sync_flush')
@patch.object(AsyncBufferedConsumer, '_sync_flush')
def test_flush_gets_called_in_different_thread_if_async(self, sync_flush):
main_thread_id = thread.get_ident()
flush_thread_id = None
Expand Down Expand Up @@ -146,6 +146,10 @@ def test_does_not_drop_events(self):
send_patch.assert_called_once_with(self.ENDPOINT, '[{"test": true}]')
self.assertEqual(self.consumer._async_buffers[self.ENDPOINT], [self.JSON])

def test_raises_exception_with_bad_endpoint(self):
with self.assertRaises(mixpanel.MixpanelException):
self.consumer.send('badendpoint', True)

def send_event(self, endpoint=None):
endpoint = endpoint or self.ENDPOINT
self.consumer.send(endpoint, self.JSON)
Expand Down

0 comments on commit 616e25a

Please sign in to comment.