Skip to content

Commit

Permalink
✨ feat: #58 cache more frame type bras and to real-time calc with lua
Browse files Browse the repository at this point in the history
1. add lua script and wrappers under scripts
2. load_lua_script during app.init
3. rebuild unclosed bars during app.init
4. realtime calc unclosed bars after 1m bars is fetched and cached
  • Loading branch information
aaron yang committed Jul 24, 2022
1 parent 30e40b4 commit c82fd07
Show file tree
Hide file tree
Showing 16 changed files with 569 additions and 27 deletions.
4 changes: 4 additions & 0 deletions docs/keyconcept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# 数据存储
Omega 2.0将数据主要存储在时间序列数据库(influxdb)中。上
# 数据同步
# 数据校准
32 changes: 23 additions & 9 deletions omega/master/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import arrow
import cfg4py
from cfg4py.config import Config
from coretypes import FrameType
from omicron.dal import cache
from omicron.models.timeframe import TimeFrame
from coretypes import Frame, FrameType
from omicron import cache, tf

from omega.core import constants
from omega.core.events import Events
Expand All @@ -27,6 +26,7 @@
sync_xrxd_reports,
)
from omega.master.tasks.synctask import BarsSyncTask, master_syncbars_task
from omega.scripts import close_frame, update_unclosed_bar

logger = logging.getLogger(__name__)
cfg: Config = cfg4py.get_instance()
Expand All @@ -35,10 +35,10 @@
async def get_after_hour_sync_job_task() -> Optional[BarsSyncTask]:
"""获取盘后同步的task实例"""
now = arrow.now().naive
if not TimeFrame.is_trade_day(now): # pragma: no cover
if not tf.is_trade_day(now): # pragma: no cover
logger.info("非交易日,不同步")
return
end = TimeFrame.last_min_frame(now, FrameType.MIN1)
end = tf.last_min_frame(now, FrameType.MIN1)
if now < end: # pragma: no cover
logger.info("当天未收盘,禁止同步")
return
Expand Down Expand Up @@ -74,14 +74,14 @@ async def get_sync_minute_date():
end = arrow.now().naive.replace(second=0, microsecond=0)
first = end.replace(hour=9, minute=30, second=0, microsecond=0)
# 检查当前时间是否在交易时间内
if not TimeFrame.is_trade_day(end): # pragma: no cover
if not tf.is_trade_day(end): # pragma: no cover
logger.info("非交易日,不同步")
return False
if end < first: # pragma: no cover
logger.info("时间过早,不能拿到k线数据")
return False

end = TimeFrame.floor(end, FrameType.MIN1)
end = tf.floor(end, FrameType.MIN1)
tail = await cache.sys.get(constants.BAR_SYNC_MINUTE_TAIL)
# tail = "2022-02-22 13:29:00"
if tail:
Expand All @@ -93,8 +93,8 @@ async def get_sync_minute_date():
tail = first

# 取上次同步截止时间+1 计算出n_bars
tail = TimeFrame.floor(tail + datetime.timedelta(minutes=1), FrameType.MIN1)
n_bars = TimeFrame.count_frames(tail, end, FrameType.MIN1) # 获取到一共有多少根k线
tail = tf.floor(tail + datetime.timedelta(minutes=1), FrameType.MIN1)
n_bars = tf.count_frames(tail, end, FrameType.MIN1) # 获取到一共有多少根k线
return end, n_bars


Expand Down Expand Up @@ -128,6 +128,20 @@ async def run_sync_minute_bars_task(task: BarsSyncTask):
constants.BAR_SYNC_MINUTE_TAIL,
task.end.strftime("%Y-%m-%d %H:%M:00"),
)
frame = task.end
for frame_type in (
FrameType.MIN5,
FrameType.MIN15,
FrameType.MIN30,
FrameType.MIN60,
FrameType.DAY,
FrameType.WEEK,
FrameType.MONTH,
):
update_unclosed_bar(frame_type, frame)

if frame == tf.ceiling(frame, frame_type):
close_frame(frame_type, frame)

return task

Expand Down
103 changes: 103 additions & 0 deletions omega/master/tasks/rebuild_unclosed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import logging

import arrow
import numpy as np
from coretypes import FrameType
from omicron import cache, tf
from omicron.models.security import Security
from omicron.models.stock import Stock
from omicron.notify.dingtalk import DingTalkMessage

logger = logging.getLogger(__name__)


async def _rebuild_min_level_unclosed_bars():
"""根据缓存中的分钟线,重建当日已收盘或者未收盘的分钟级别及日线级别数据"""
end = tf.floor(arrow.now().naive, FrameType.MIN1)
keys = await cache.security.keys("bars:1m:*")

errors = 0
for key in keys:
try:
sec = key.split(":")[2]
bars = await Stock._get_cached_bars(sec, end, 240, FrameType.MIN1)
except Exception as e:
logger.exception(e)
logger.warning("failed to get cached bars for %s", sec)
errors += 1
continue

try:
for frame_type in tf.minute_level_frames[1:]:
resampled = Stock.resample(bars, FrameType.MIN1, frame_type)
if tf.is_bar_closed(resampled[-1]["frame"], frame_type):
await Stock.cache_bars(sec, frame_type, resampled)
else:
await Stock.cache_bars(sec, frame_type, resampled[:-1])
await Stock.cache_unclosed_bars(sec, frame_type, resampled[-1:])

# 重建日线数据
resampled = Stock.resample(bars, FrameType.MIN1, FrameType.DAY)
await Stock.cache_unclosed_bars(sec, FrameType.DAY, resampled)
except Exception as e:
logger.exception(e)
logger.warning(
"failed to build unclosed bar for %s, frame type is %s", sec, frame_type
)
errors += 1

if errors > 0:
DingTalkMessage.text(f"重建分钟级缓存数据时,出现{errors}个错误。")


async def _rebuild_day_level_unclosed_bars():
"""重建当期未收盘的周线、月线
!!!Info:
最终我们需要实时更新年线和季线。目前数据库还没有同步这两种k线。
"""
codes = await Security.select().eval()
end = arrow.now().date()
# just to cover one month's day bars at most
n = 30
start = tf.day_shift(end, -n)

errors = 0
for code in codes:
try:
bars = await Stock._get_persisted_bars(
code, FrameType.DAY, begin=start, end=end
)
except Exception as e:
logger.exception(e)
logger.warning("failed to get persisted bars for %s from %s to %s", code, start, end)
errors += 1
continue

try:
unclosed_day = await Stock._get_cached_day_bar(code)
bars = np.concatenate([bars, unclosed_day])

week = Stock.resample(bars, FrameType.DAY, FrameType.WEEK)
await Stock.cache_unclosed_bars(code, FrameType.WEEK, week[-1:])

month = Stock.resample(bars, FrameType.DAY, FrameType.MONTH)
await Stock.cache_unclosed_bars(code, FrameType.MONTH, month[-1:])
except Exception as e:
logger.exception(e)
logger.warning(
"failed to build unclosed bar for %s, got bars %s", code, len(bars)
)
errors += 1

if errors > 0:
DingTalkMessage.text(f"重建日线级别缓存数据时,出现{errors}个错误。")


async def rebuild_unclosed_bars():
"""在omega启动时重建未收盘数据
后续未收盘数据的更新,将在每个分钟线同步完成后,调用lua脚本进行。
"""
await _rebuild_min_level_unclosed_bars()
await _rebuild_day_level_unclosed_bars()
62 changes: 62 additions & 0 deletions omega/scripts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import logging
import os

from coretypes import Frame, FrameType
from omicron import cache, tf
from omicron.notify.dingtalk import DingTalkMessage

logger = logging.getLogger(__name__)


async def load_lua_script():
"""加载lua脚本到redis中"""
dir_ = os.path.dirname(os.path.abspath(__file__))
for file in os.listdir(dir_):
if not file.endswith(".lua"):
continue

path = os.path.join(dir_, file)
with open(path, "r", encoding="utf-8") as f:
content = f.read()

r = await cache.sys.execute("FUNCTION", "LOAD", "REPLACE", content)
print(r)


async def update_unclosed_bar(frame_type: FrameType, source_min: Frame):
"""wraps the cognominal lua script function
Args:
frame_type: which frame type to be updated/merged
source_min: the minute bar to be merged from
"""
source = tf.time2int(source_min)
try:
await cache.security.execute(
"fcall", "update_unclosed", 0, frame_type.value, source
)
except Exception as e:
msg = f"实时合并{frame_type}未收盘行情数据错误:{source_min}"
logger.exception(e)
logging.warning(msg)
DingTalkMessage.text(msg)


async def close_frame(frame_type: FrameType, frame: Frame):
"""wraps the cognominal lua script function
Args:
frame_type: which frame type to be closed
frame: the closed frame
"""
dst = (
tf.date2int(frame) if frame_type in tf.day_level_frames else tf.time2int(frame)
)

try:
await cache.security.execute("fcall", "close_frame", 0, frame_type.value, dst)
except Exception as e:
msg = f"缓存收盘{frame_type}数据失败: {frame}"
logger.exception(e)
logger.warning(msg)
DingTalkMessage.text(msg)
112 changes: 112 additions & 0 deletions omega/scripts/omega.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#!lua name=omega

local function round2(num)
return math.floor(num * 100 + 0.5) / 100
end

local function newsplit(delimiter, str)
assert(type(delimiter) == "string")
assert(#delimiter > 0, "Must provide non empty delimiter")

-- Add escape characters if delimiter requires it
-- delimiter = delimiter:gsub("[%(%)%.%%%+%-%*%?%[%]%^%$]", "%%%0")

local start_index = 1
local result = {}

while true do
local delimiter_index, _ = str:find(delimiter, start_index)

if delimiter_index == nil then
table.insert(result, str:sub(start_index))
break
end

table.insert(result, str:sub(start_index, delimiter_index - 1))

start_index = delimiter_index + 1
end

return result
end

local function close_frame(keys_, args)
--local function close_frame(frame_type, frame)
-- close the frame, write unclosed_5m hash to bars:5m:{code} hash.
local frame_type, frame = unpack(args)
local hm = redis.call('hgetall', 'bars:' .. frame_type .. ':unclosed')

for i = 1, #hm, 2 do
local code = hm[i]
local bar = hm[i + 1]
redis.call('hset', 'bars:' .. frame_type .. ':' .. code, frame, bar)
end

redis.call('del', 'bars:' .. frame_type .. ':unclosed')
end

local function decode_bar(bars)
-- 将string表示的bar解码成为正确类型的OHLC,但对frame仍保持为字符串
local frame, open, high, low, close, volume, amount, factor = unpack(newsplit(',', bars))

return frame, round2(tonumber(open)), round2(tonumber(high)), round2(tonumber(low)), round2(tonumber(close)), tonumber(volume), tonumber(amount), tonumber(factor)
end

local function update_unclosed(keys_, args)
--local function update_unclosed(frame_type, min_frame)
-- merge bars:{frame_type.value}:unclosed with bars:1m:{code} hash.
-- args are: frame_type(str), min_frame(int, minute frame)

local frame_type, min_frame = unpack(args)
local unclosed_key = 'bars:' .. frame_type .. ':unclosed'

-- bars:1m:* should contains NO bars:1m:unclosed
local keys = redis.call('keys', 'bars:1m:*')

for _, key_ in ipairs(keys) do
local code = key_:match('bars:1m:(.*)')

-- get 1m bar to merge from
local mbar = redis.call('hget', key_, min_frame)
if mbar then
local t2, o2, h2, l2, c2, v2, a2, f2 = decode_bar(mbar)

-- get unclosed bar and do the merge
local unclosed = redis.call('hget', unclosed_key, code)

local t, opn, high, low, close, volume, amount, factor = '', o2, h2, l2, c2, v2, a2, f2
if unclosed then
local _, o1, h1, l1, c1, v1, a1, f1 = decode_bar(unclosed)
opn = o1
high = math.max(h1, h2)
low = math.min(l1, l2)
close = c2
volume = v1 + v2
amount = a1 + a2
factor = f2
end

-- save unclosed bar
local bar = min_frame .. ',' .. opn .. ',' .. high .. ',' .. low .. ',' .. close .. ',' .. volume .. ',' .. amount .. ',' .. factor
redis.call('hset', unclosed_key, code, bar)
end
end
end

-- update_unclosed('5m', 202207180935, 202207180931)
-- update_unclosed('5m', 202207180935, 202207180932)
-- update_unclosed('5m', 202207180935, 202207180933)
-- update_unclosed('5m', 202207180935, 202207180935)
-- update_unclosed('5m', 202207180935, 202207180936)


-- close_frame('5m', "2022020935")

redis.register_function(
'close_frame',
close_frame
)
redis.register_function(
'update_unclosed',
update_unclosed
)
Loading

0 comments on commit c82fd07

Please sign in to comment.