Skip to content
This repository has been archived by the owner on Jan 24, 2023. It is now read-only.

Commit

Permalink
Merge pull request #62 from filipecosta90/revrange.and.mrevrange
Browse files Browse the repository at this point in the history
Added support for TS.REVRANGE and TS.MREVRANGE
  • Loading branch information
filipecosta90 authored Aug 31, 2020
2 parents 5558adc + 0a8a006 commit 365ad53
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 18 deletions.
41 changes: 39 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,42 @@ jobs:
. venv/bin/activate
codecov
build_nightly:
build_latest:
docker:
- image: circleci/python:3.7.1
- image: redislabs/redistimeseries:latest

working_directory: ~/repo

steps:
- checkout

- restore_cache: # Download and cache dependencies
keys:
- v1-dependencies-{{ checksum "requirements.txt" }}
# fallback to using the latest cache if no exact match is found
- v1-dependencies-

- run:
name: install dependencies
command: |
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
pip install codecov
- save_cache:
paths:
- ./venv
key: v1-dependencies-{{ checksum "requirements.txt" }}

- run:
name: run tests
command: |
. venv/bin/activate
REDIS_PORT=6379 python test_commands.py
build_edge:
docker:
- image: circleci/python:3.7.1
- image: redislabs/redistimeseries:edge
Expand Down Expand Up @@ -108,6 +143,7 @@ workflows:
commit:
jobs:
- build
- build_latest
nightly:
triggers:
- schedule:
Expand All @@ -117,4 +153,5 @@ workflows:
only:
- master
jobs:
- build_nightly
- build_edge
- build_latest
97 changes: 81 additions & 16 deletions redistimeseries/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ class Client(object): #changed from StrictRedis
CREATERULE_CMD = 'TS.CREATERULE'
DELETERULE_CMD = 'TS.DELETERULE'
RANGE_CMD = 'TS.RANGE'
REVRANGE_CMD = 'TS.REVRANGE'
MRANGE_CMD = 'TS.MRANGE'
MREVRANGE_CMD = 'TS.MREVRANGE'
GET_CMD = 'TS.GET'
MGET_CMD = 'TS.MGET'
INFO_CMD = 'TS.INFO'
Expand All @@ -107,7 +109,9 @@ def __init__(self, conn=None, *args, **kwargs):
self.CREATERULE_CMD : bool_ok,
self.DELETERULE_CMD : bool_ok,
self.RANGE_CMD : parse_range,
self.REVRANGE_CMD: parse_range,
self.MRANGE_CMD : parse_m_range,
self.MREVRANGE_CMD: parse_m_range,
self.GET_CMD : parse_get,
self.MGET_CMD : parse_m_get,
self.INFO_CMD : TSInfo,
Expand Down Expand Up @@ -254,33 +258,56 @@ def createrule(self, source_key, dest_key,
def deleterule(self, source_key, dest_key):
"""Deletes a compaction rule"""
return self.redis.execute_command(self.DELETERULE_CMD, source_key, dest_key)

def range(self, key, from_time, to_time, count=None,
aggregation_type=None, bucket_size_msec=0):

def __range_params(self, key, from_time, to_time, count, aggregation_type, bucket_size_msec):
"""
Query a range from ``key``, from ``from_time`` to ``to_time``.
``count`` limits the number of results.
Can Aggregate for ``bucket_size_msec`` where an ``aggregation_type``
can be ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
Internal method to create TS.RANGE and TS.REVRANGE arguments
"""
params = [key, from_time, to_time]
self.appendCount(params, count)
if aggregation_type is not None:
self.appendAggregation(params, aggregation_type, bucket_size_msec)
return params

def range(self, key, from_time, to_time, count=None,
aggregation_type=None, bucket_size_msec=0):
"""
Query a range in forward direction for a specific time-serie.
Args:
key: Key name for timeseries.
from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time: End timestamp for range query, + can be used to express the maximum possible timestamp.
count: Optional maximum number of returned results.
aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
bucket_size_msec: Time bucket for aggregation in milliseconds.
"""
params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec)
return self.redis.execute_command(self.RANGE_CMD, *params)

def mrange(self, from_time, to_time, filters, count=None,
aggregation_type=None, bucket_size_msec=0, with_labels=False):
def revrange(self, key, from_time, to_time, count=None,
aggregation_type=None, bucket_size_msec=0):
"""
Query a range based on filters,retention_msecs from ``from_time`` to ``to_time``.
``count`` limits the number of results.
``filters`` are a list strings such as ['Test=This'].
Can Aggregate for ``bucket_size_msec`` where an ``aggregation_type``
can be ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
Query a range in reverse direction for a specific time-serie.
Note: This command is only available since RedisTimeSeries >= v1.4
Args:
key: Key name for timeseries.
from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time: End timestamp for range query, + can be used to express the maximum possible timestamp.
count: Optional maximum number of returned results.
aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
``WITHLABELS`` appends labels to results.
bucket_size_msec: Time bucket for aggregation in milliseconds.
"""
params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec)
return self.redis.execute_command(self.REVRANGE_CMD, *params)


def __mrange_params(self, aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels):
"""
Internal method to create TS.MRANGE and TS.MREVRANGE arguments
"""
params = [from_time, to_time]
self.appendCount(params, count)
Expand All @@ -289,8 +316,46 @@ def mrange(self, from_time, to_time, filters, count=None,
self.appendWithLabels(params, with_labels)
params.extend(['FILTER'])
params += filters
return params

def mrange(self, from_time, to_time, filters, count=None,
aggregation_type=None, bucket_size_msec=0, with_labels=False):
"""
Query a range across multiple time-series by filters in forward direction.
Args:
from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time: End timestamp for range query, + can be used to express the maximum possible timestamp.
filters: filter to match the time-series labels.
count: Optional maximum number of returned results.
aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
bucket_size_msec: Time bucket for aggregation in milliseconds.
with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series.
If this argument is not set, by default, an empty Array will be replied on the labels array position.
"""
params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels)
return self.redis.execute_command(self.MRANGE_CMD, *params)

def mrevrange(self, from_time, to_time, filters, count=None,
aggregation_type=None, bucket_size_msec=0, with_labels=False):
"""
Query a range across multiple time-series by filters in reverse direction.
Args:
from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time: End timestamp for range query, + can be used to express the maximum possible timestamp.
filters: filter to match the time-series labels.
count: Optional maximum number of returned results.
aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first',
'last', 'std.p', 'std.s', 'var.p', 'var.s']
bucket_size_msec: Time bucket for aggregation in milliseconds.
with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series.
If this argument is not set, by default, an empty Array will be replied on the labels array position.
"""
params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels)
return self.redis.execute_command(self.MREVRANGE_CMD, *params)

def get(self, key):
"""Gets the last sample of ``key``"""
return self.redis.execute_command(self.GET_CMD, key)
Expand Down
54 changes: 54 additions & 0 deletions test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@
from redistimeseries.client import Client as RedisTimeSeries
from redis import Redis

version = None
rts = None
port = 6379

class RedisTimeSeriesTest(TestCase):
def setUp(self):
global rts
global version
rts = RedisTimeSeries(port=port)
rts.redis.flushdb()
modules = rts.redis.execute_command("module","list")
if modules is not None:
for module_info in modules:
if module_info[1] == b'timeseries':
version = int(module_info[3])

def testCreate(self):
'''Test TS.CREATE calls'''
Expand Down Expand Up @@ -111,6 +118,22 @@ def testRange(self):
self.assertEqual(20, len(rts.range(1, 0, 500, aggregation_type='avg', bucket_size_msec=10)))
self.assertEqual(10, len(rts.range(1, 0, 500, count=10)))

def testRevRange(self):
'''Test TS.REVRANGE calls which returns reverse range by key'''
# TS.REVRANGE is available since RedisTimeSeries >= v1.4
if version is None or version < 14000:
return

for i in range(100):
rts.add(1, i, i % 7)
self.assertEqual(100, len(rts.range(1, 0, 200)))
for i in range(100):
rts.add(1, i+200, i % 7)
self.assertEqual(200, len(rts.range(1, 0, 500)))
#first sample isn't returned
self.assertEqual(20, len(rts.revrange(1, 0, 500, aggregation_type='avg', bucket_size_msec=10)))
self.assertEqual(10, len(rts.revrange(1, 0, 500, count=10)))

def testMultiRange(self):
'''Test TS.MRANGE calls which returns range by filter'''

Expand Down Expand Up @@ -139,6 +162,37 @@ def testMultiRange(self):
res = rts.mrange(0, 200, filters=['Test=This'], with_labels=True)
self.assertEqual({'Test': 'This'}, res[0]['1'][0])

def testMultiReverseRange(self):
'''Test TS.MREVRANGE calls which returns range by filter'''
# TS.MREVRANGE is available since RedisTimeSeries >= v1.4
if version is None or version < 14000:
return

rts.create(1, labels={'Test': 'This'})
rts.create(2, labels={'Test': 'This', 'Taste': 'That'})
for i in range(100):
rts.add(1, i, i % 7)
rts.add(2, i, i % 11)

res = rts.mrange(0, 200, filters=['Test=This'])
self.assertEqual(2, len(res))
self.assertEqual(100, len(res[0]['1'][1]))

res = rts.mrange(0, 200, filters=['Test=This'], count=10)
self.assertEqual(10, len(res[0]['1'][1]))

for i in range(100):
rts.add(1, i + 200, i % 7)
res = rts.mrevrange(0, 500, filters=['Test=This'],
aggregation_type='avg', bucket_size_msec=10)
self.assertEqual(2, len(res))
self.assertEqual(20, len(res[0]['1'][1]))

# test withlabels
self.assertEqual({}, res[0]['1'][0])
res = rts.mrevrange(0, 200, filters=['Test=This'], with_labels=True)
self.assertEqual({'Test': 'This'}, res[0]['1'][0])

def testGet(self):
'''Test TS.GET calls'''

Expand Down

0 comments on commit 365ad53

Please sign in to comment.