Skip to content
This repository has been archived by the owner on Jan 24, 2023. It is now read-only.

Commit

Permalink
Pipeline (#30)
Browse files Browse the repository at this point in the history
* Added pipeline + tests
  • Loading branch information
Ariel Shtul authored and gkorland committed Dec 20, 2019
1 parent b2e46d9 commit 132c751
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 8 deletions.
37 changes: 30 additions & 7 deletions redistimeseries/client.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
import six
import redis
from redis import Redis, RedisError
from redis import Redis
from redis.client import Pipeline
from redis.client import bool_ok
from redis.client import int_or_none
from redis._compat import (long, nativestr)
from redis.exceptions import DataError
from redis._compat import nativestr

class TSInfo(object):
rules = []
labels = []
sourceKey = None
chunk_count = None
last_time_stamp = None
memory_usage = None
total_samples = None
retention_msecs = None
last_time_stamp = None
first_time_stamp = None
max_samples_per_chunk = None

def __init__(self, args):
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
self.rules = response['rules']
self.sourceKey = response['sourceKey']
self.chunkCount = response['chunkCount']
self.memory_usage = response['memoryUsage']
self.total_samples = response['totalSamples']
self.labels = list_to_dict(response['labels'])
self.lastTimeStamp = response['lastTimestamp']
self.retention_msecs = response['retentionTime']
self.lastTimeStamp = response['lastTimestamp']
self.first_time_stamp = response['firstTimestamp']
self.maxSamplesPerChunk = response['maxSamplesPerChunk']

def list_to_dict(aList):
Expand Down Expand Up @@ -262,3 +266,22 @@ def info(self, key):
def queryindex(self, filters):
"""Get all the keys matching the ``filter`` list."""
return self.execute_command(self.QUERYINDEX_CMD, *filters)

def pipeline(self, transaction=True, shard_hint=None):
"""
Return a new pipeline object that can queue multiple commands for
later execution. ``transaction`` indicates whether all commands
should be executed atomically. Apart from making a group of operations
atomic, pipelines are useful for reducing the back-and-forth overhead
between the client and server.
Overridden in order to provide the right client through the pipeline.
"""
p = Pipeline(
connection_pool=self.connection_pool,
response_callbacks=self.response_callbacks,
transaction=transaction,
shard_hint=shard_hint)
return p

class Pipeline(Pipeline, Client):
"Pipeline for Redis TimeSeries Client"
17 changes: 16 additions & 1 deletion test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def testAlter(self):
rts.alter(1, labels={'Time':'Series'})
self.assertEqual('Series', rts.info(1).labels['Time'])
self.assertEqual(10, rts.info(1).retention_msecs)

pipe = rts.pipeline()
self.assertTrue(pipe.create(2))
def testAdd(self):
'''Test TS.ADD calls'''

Expand Down Expand Up @@ -141,11 +142,25 @@ def testInfo(self):
self.assertEqual(info.labels['currentLabel'], 'currentData')

def testQueryIndex(self):
'''Test TS.QUERYINDEX calls'''
rts.create(1, labels={'Test':'This'})
rts.create(2, labels={'Test':'This', 'Taste':'That'})
self.assertEqual(2, len(rts.queryindex(['Test=This'])))
self.assertEqual(1, len(rts.queryindex(['Taste=That'])))
self.assertEqual(['2'], rts.queryindex(['Taste=That']))

def testPipeline(self):
'''Test pipeline'''
pipeline = rts.pipeline()
pipeline.create('with_pipeline')
for i in range(100):
pipeline.add('with_pipeline', i, 1.1 * i)
pipeline.execute()

info = rts.info('with_pipeline')
self.assertEqual(info.lastTimeStamp, 99)
self.assertEqual(info.total_samples, 100)
self.assertEqual(rts.get('with_pipeline')[1], 99 * 1.1)

if __name__ == '__main__':
unittest.main()

0 comments on commit 132c751

Please sign in to comment.