From 0a8a0067053cc73657d7d55cd7bf9dac715f1c87 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 31 Aug 2020 00:13:04 +0100 Subject: [PATCH] [add] Added support for TS.REVRANGE and TS.MREVRANGE --- .circleci/config.yml | 41 ++++++++++++++++- redistimeseries/client.py | 97 ++++++++++++++++++++++++++++++++------- test_commands.py | 54 ++++++++++++++++++++++ 3 files changed, 174 insertions(+), 18 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index e9565bf..5218f5d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -66,7 +66,42 @@ jobs: . venv/bin/activate codecov - build_nightly: + build_latest: + docker: + - image: circleci/python:3.7.1 + - image: redislabs/redistimeseries:latest + + working_directory: ~/repo + + steps: + - checkout + + - restore_cache: # Download and cache dependencies + keys: + - v1-dependencies-{{ checksum "requirements.txt" }} + # fallback to using the latest cache if no exact match is found + - v1-dependencies- + + - run: + name: install dependencies + command: | + virtualenv venv + . venv/bin/activate + pip install -r requirements.txt + pip install codecov + + - save_cache: + paths: + - ./venv + key: v1-dependencies-{{ checksum "requirements.txt" }} + + - run: + name: run tests + command: | + . venv/bin/activate + REDIS_PORT=6379 python test_commands.py + + build_edge: docker: - image: circleci/python:3.7.1 - image: redislabs/redistimeseries:edge @@ -108,6 +143,7 @@ workflows: commit: jobs: - build + - build_latest nightly: triggers: - schedule: @@ -117,4 +153,5 @@ workflows: only: - master jobs: - - build_nightly + - build_edge + - build_latest diff --git a/redistimeseries/client.py b/redistimeseries/client.py index 5cf3113..62aed79 100644 --- a/redistimeseries/client.py +++ b/redistimeseries/client.py @@ -88,7 +88,9 @@ class Client(object): #changed from StrictRedis CREATERULE_CMD = 'TS.CREATERULE' DELETERULE_CMD = 'TS.DELETERULE' RANGE_CMD = 'TS.RANGE' + REVRANGE_CMD = 'TS.REVRANGE' MRANGE_CMD = 'TS.MRANGE' + MREVRANGE_CMD = 'TS.MREVRANGE' GET_CMD = 'TS.GET' MGET_CMD = 'TS.MGET' INFO_CMD = 'TS.INFO' @@ -107,7 +109,9 @@ def __init__(self, conn=None, *args, **kwargs): self.CREATERULE_CMD : bool_ok, self.DELETERULE_CMD : bool_ok, self.RANGE_CMD : parse_range, + self.REVRANGE_CMD: parse_range, self.MRANGE_CMD : parse_m_range, + self.MREVRANGE_CMD: parse_m_range, self.GET_CMD : parse_get, self.MGET_CMD : parse_m_get, self.INFO_CMD : TSInfo, @@ -254,33 +258,56 @@ def createrule(self, source_key, dest_key, def deleterule(self, source_key, dest_key): """Deletes a compaction rule""" return self.redis.execute_command(self.DELETERULE_CMD, source_key, dest_key) - - def range(self, key, from_time, to_time, count=None, - aggregation_type=None, bucket_size_msec=0): + + def __range_params(self, key, from_time, to_time, count, aggregation_type, bucket_size_msec): """ - Query a range from ``key``, from ``from_time`` to ``to_time``. - ``count`` limits the number of results. - Can Aggregate for ``bucket_size_msec`` where an ``aggregation_type`` - can be ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', - 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + Internal method to create TS.RANGE and TS.REVRANGE arguments """ params = [key, from_time, to_time] self.appendCount(params, count) if aggregation_type is not None: self.appendAggregation(params, aggregation_type, bucket_size_msec) + return params + def range(self, key, from_time, to_time, count=None, + aggregation_type=None, bucket_size_msec=0): + """ + Query a range in forward direction for a specific time-serie. + + Args: + key: Key name for timeseries. + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + """ + params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec) return self.redis.execute_command(self.RANGE_CMD, *params) - def mrange(self, from_time, to_time, filters, count=None, - aggregation_type=None, bucket_size_msec=0, with_labels=False): + def revrange(self, key, from_time, to_time, count=None, + aggregation_type=None, bucket_size_msec=0): """ - Query a range based on filters,retention_msecs from ``from_time`` to ``to_time``. - ``count`` limits the number of results. - ``filters`` are a list strings such as ['Test=This']. - Can Aggregate for ``bucket_size_msec`` where an ``aggregation_type`` - can be ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + Query a range in reverse direction for a specific time-serie. + Note: This command is only available since RedisTimeSeries >= v1.4 + + Args: + key: Key name for timeseries. + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', 'last', 'std.p', 'std.s', 'var.p', 'var.s'] - ``WITHLABELS`` appends labels to results. + bucket_size_msec: Time bucket for aggregation in milliseconds. + """ + params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec) + return self.redis.execute_command(self.REVRANGE_CMD, *params) + + + def __mrange_params(self, aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels): + """ + Internal method to create TS.MRANGE and TS.MREVRANGE arguments """ params = [from_time, to_time] self.appendCount(params, count) @@ -289,8 +316,46 @@ def mrange(self, from_time, to_time, filters, count=None, self.appendWithLabels(params, with_labels) params.extend(['FILTER']) params += filters + return params + + def mrange(self, from_time, to_time, filters, count=None, + aggregation_type=None, bucket_size_msec=0, with_labels=False): + """ + Query a range across multiple time-series by filters in forward direction. + + Args: + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + filters: filter to match the time-series labels. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series. + If this argument is not set, by default, an empty Array will be replied on the labels array position. + """ + params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels) return self.redis.execute_command(self.MRANGE_CMD, *params) + def mrevrange(self, from_time, to_time, filters, count=None, + aggregation_type=None, bucket_size_msec=0, with_labels=False): + """ + Query a range across multiple time-series by filters in reverse direction. + + Args: + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + filters: filter to match the time-series labels. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series. + If this argument is not set, by default, an empty Array will be replied on the labels array position. + """ + params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels) + return self.redis.execute_command(self.MREVRANGE_CMD, *params) + def get(self, key): """Gets the last sample of ``key``""" return self.redis.execute_command(self.GET_CMD, key) diff --git a/test_commands.py b/test_commands.py index f6f9965..f4eb4bb 100644 --- a/test_commands.py +++ b/test_commands.py @@ -5,14 +5,21 @@ from redistimeseries.client import Client as RedisTimeSeries from redis import Redis +version = None rts = None port = 6379 class RedisTimeSeriesTest(TestCase): def setUp(self): global rts + global version rts = RedisTimeSeries(port=port) rts.redis.flushdb() + modules = rts.redis.execute_command("module","list") + if modules is not None: + for module_info in modules: + if module_info[1] == b'timeseries': + version = int(module_info[3]) def testCreate(self): '''Test TS.CREATE calls''' @@ -111,6 +118,22 @@ def testRange(self): self.assertEqual(20, len(rts.range(1, 0, 500, aggregation_type='avg', bucket_size_msec=10))) self.assertEqual(10, len(rts.range(1, 0, 500, count=10))) + def testRevRange(self): + '''Test TS.REVRANGE calls which returns reverse range by key''' + # TS.REVRANGE is available since RedisTimeSeries >= v1.4 + if version is None or version < 14000: + return + + for i in range(100): + rts.add(1, i, i % 7) + self.assertEqual(100, len(rts.range(1, 0, 200))) + for i in range(100): + rts.add(1, i+200, i % 7) + self.assertEqual(200, len(rts.range(1, 0, 500))) + #first sample isn't returned + self.assertEqual(20, len(rts.revrange(1, 0, 500, aggregation_type='avg', bucket_size_msec=10))) + self.assertEqual(10, len(rts.revrange(1, 0, 500, count=10))) + def testMultiRange(self): '''Test TS.MRANGE calls which returns range by filter''' @@ -139,6 +162,37 @@ def testMultiRange(self): res = rts.mrange(0, 200, filters=['Test=This'], with_labels=True) self.assertEqual({'Test': 'This'}, res[0]['1'][0]) + def testMultiReverseRange(self): + '''Test TS.MREVRANGE calls which returns range by filter''' + # TS.MREVRANGE is available since RedisTimeSeries >= v1.4 + if version is None or version < 14000: + return + + rts.create(1, labels={'Test': 'This'}) + rts.create(2, labels={'Test': 'This', 'Taste': 'That'}) + for i in range(100): + rts.add(1, i, i % 7) + rts.add(2, i, i % 11) + + res = rts.mrange(0, 200, filters=['Test=This']) + self.assertEqual(2, len(res)) + self.assertEqual(100, len(res[0]['1'][1])) + + res = rts.mrange(0, 200, filters=['Test=This'], count=10) + self.assertEqual(10, len(res[0]['1'][1])) + + for i in range(100): + rts.add(1, i + 200, i % 7) + res = rts.mrevrange(0, 500, filters=['Test=This'], + aggregation_type='avg', bucket_size_msec=10) + self.assertEqual(2, len(res)) + self.assertEqual(20, len(res[0]['1'][1])) + + # test withlabels + self.assertEqual({}, res[0]['1'][0]) + res = rts.mrevrange(0, 200, filters=['Test=This'], with_labels=True) + self.assertEqual({'Test': 'This'}, res[0]['1'][0]) + def testGet(self): '''Test TS.GET calls'''