Skip to content

Commit

Permalink
Merge pull request #2449 from Drakkar-Software/candles_db
Browse files Browse the repository at this point in the history
[Community] handle candles history fetch
  • Loading branch information
GuillaumeDSM authored Oct 24, 2023
2 parents c7941b8 + c82f1ee commit 4464a4f
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
85 changes: 85 additions & 0 deletions octobot/community/supabase_backend/community_supabase_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import octobot_commons.authentication as authentication
import octobot_commons.logging as commons_logging
import octobot_commons.profiles as commons_profiles
import octobot_commons.enums as commons_enums
import octobot_commons.constants as commons_constants
import octobot.constants as constants
import octobot.community.errors as errors
import octobot.community.supabase_backend.enums as enums
Expand Down Expand Up @@ -322,6 +324,89 @@ async def upsert_portfolio_history(self, portfolio_histories) -> list:
on_conflict=f"{enums.PortfolioHistoryKeys.TIME.value},{enums.PortfolioHistoryKeys.PORTFOLIO_ID.value}"
).execute()).data

async def fetch_candles_history_range(
self, exchange: str, symbol: str, time_frame: commons_enums.TimeFrames
) -> (typing.Union[float, None], typing.Union[float, None]):
min_max = json.loads(
(await self.postgres_functions().invoke(
"get_ohlcv_range",
{"body": {
"exchange_internal_name": exchange,
"symbol": symbol,
"time_frame": time_frame.value,
}}
))["data"]
)[0]
return (
self.get_parsed_time(min_max["min_value"]).timestamp() if min_max["min_value"] else None,
self.get_parsed_time(min_max["max_value"]).timestamp() if min_max["max_value"] else None,
)

async def fetch_candles_history(
self, exchange: str, symbol: str, time_frame: commons_enums.TimeFrames,
first_open_time: float, last_open_time: float
) -> list:
total_candles_count = (last_open_time - first_open_time) // (
commons_enums.TimeFramesMinutes[time_frame] * commons_constants.MINUTE_TO_SECONDS
)
offset = 0
max_size = 0
total_candles = []
max_requests_count = 100
request_count = 0
while request_count < max_requests_count:
request = (
self.table("temp_ohlcv_history").select("*")
.match({
"exchange_internal_name": exchange,
"symbol": symbol,
"time_frame": time_frame.value,
}).gte(
"timestamp", self.get_formatted_time(first_open_time)
).lte(
"timestamp", self.get_formatted_time(last_open_time)
).order(
"timestamp", desc=False
)
)
if offset:
request = request.range(offset, offset+max_size)
fetched_candles = (await request.execute()).data
total_candles += fetched_candles
if len(fetched_candles) < max_size or (max_size == 0 and len(fetched_candles) == total_candles_count):
# fetched everything
break
offset += len(fetched_candles)
if max_size == 0:
max_size = offset
request_count += 1

if request_count == max_requests_count:
commons_logging.get_logger(self.__class__.__name__).info(
f"OHLCV fetch error: too many requests ({request_count}), fetched: {len(total_candles)} candles"
)
return self._format_ohlcvs(total_candles)

def _format_ohlcvs(self, ohlcvs: list):
# uses PriceIndexes order
# IND_PRICE_TIME = 0
# IND_PRICE_OPEN = 1
# IND_PRICE_HIGH = 2
# IND_PRICE_LOW = 3
# IND_PRICE_CLOSE = 4
# IND_PRICE_VOL = 5
return [
[
int(self.get_parsed_time(ohlcv["timestamp"]).timestamp()),
ohlcv["open"],
ohlcv["high"],
ohlcv["low"],
ohlcv["close"],
ohlcv["volume"],
]
for ohlcv in ohlcvs
]

async def get_asset_id(self, bucket_id: str, asset_name: str) -> str:
"""
Not implemented for authenticated users
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Drakkar-Software requirements
OctoBot-Commons==1.9.28
OctoBot-Trading==2.4.35
OctoBot-Trading==2.4.36
OctoBot-Evaluators==1.9.1
OctoBot-Tentacles-Manager==2.9.5
OctoBot-Services==1.6.4
Expand Down

0 comments on commit 4464a4f

Please sign in to comment.