- Introduction
- Installation
- Usage
README.md is auto-generated from the test files by the script tests/generate_readme_markdown.py.
DO NOT EDIT IT DIRECTLY! ;)
python3 tests/generate_readme_markdown.py
A Python 3.6+ toolbox with many useful utilities, functions, and decorators written in a Pythonic way, fully tested from Python 3.6 to Python 3.11.
pip3 install pythonic-toolbox --upgrade
import pytest
from pythonic_toolbox.decorators.common import ignore_unexpected_kwargs
# Following functions are named under Metasyntactic Variables, like:
# foobar, foo, bar, baz, qux, quux, quuz, corge,
# grault, garply, waldo, fred, plugh, xyzzy, thud
def foo(a, b=0, c=3):
    """Return the three arguments unchanged as an ``(a, b, c)`` tuple."""
    return a, b, c
# 'd' is not a parameter of foo, so passing the dict directly raises TypeError.
dct = {'a': 1, 'b': 2, 'd': 4}
with pytest.raises(TypeError) as __:
    assert foo(**dct) == (1, 2, 3)

# The wrapped function accepts the same dict; c falls back to foo's default.
wrapped_foo = ignore_unexpected_kwargs(foo)
assert wrapped_foo(**dct) == (1, 2, 3)
assert wrapped_foo(0, 0, 0) == (0, 0, 0)
assert wrapped_foo(a=1, b=2, c=3) == (1, 2, 3)
@ignore_unexpected_kwargs
def bar(*args: int):
    """Return the sum of all positional arguments."""
    return sum(args)


# should not change original behavior
assert bar(1, 2, 3) == 6
assert bar(1, 2, 3, unexpected='Gotcha') == 6
nums = [1, 2, 3]
assert bar(*nums, unexpected='Gotcha') == 6
@ignore_unexpected_kwargs
def qux(a, b, **kwargs):
    """Return ``(a, b, c, d)``; c and d are read from kwargs, defaulting to 3 and 4."""
    # function with Parameter.VAR_KEYWORD Aka **kwargs
    return a, b, kwargs.get('c', 3), kwargs.get('d', 4)


assert qux(**{'a': 1, 'b': 2, 'd': 4, 'e': 5}) == (1, 2, 3, 4)
class Person:
    """Example class whose constructor, classmethod and staticmethod are all decorated."""

    @ignore_unexpected_kwargs
    def __init__(self, name, age, sex):
        self.name = name
        self.age = age
        self.sex = sex

    @classmethod
    @ignore_unexpected_kwargs
    def create(cls, name, age, sex):
        """Alternate constructor that forwards to ``cls(name, age, sex)``."""
        return cls(name, age, sex)

    @staticmethod
    @ignore_unexpected_kwargs
    def greetings(name):
        """Return a greeting string that contains *name*."""
        return f'Hello, I am {name}'
# 'height' is an extra key that is passed to every Person callable below.
params = {
'name': 'albert',
'age': 34,
'sex': 'male',
'height': '170cm',
}
__ = Person(**params)
__ = Person('albert', 35, 'male', height='170cm')
# test cases for classmethod, staticmethod
__ = Person.create(**params)
assert Person.greetings(**params)
import pytest
from pythonic_toolbox.decorators.common import retry
# use decorator without any arguments, using retry default params
@retry
def func_fail_first_time():
    """func_fail_first_time"""
    # The docstring above is asserted verbatim below, so it must stay unchanged.
    # The function object itself carries the call counter between invocations.
    self = func_fail_first_time
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'


assert func_fail_first_time() == 'ok'
assert func_fail_first_time.call_times == 2
assert func_fail_first_time.__doc__ == 'func_fail_first_time'
@retry(tries=2, delay=0.1)  # use decorator with customized params
def func_fail_twice():
    """func_fail_twice"""
    # The docstring above is asserted verbatim below, so it must stay unchanged.
    self = func_fail_twice
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times <= 2:
        raise Exception('Fail when called first, second time')
    return 'ok'


assert func_fail_twice() == 'ok'
assert func_fail_twice.call_times == 3
assert func_fail_twice.__doc__ == 'func_fail_twice'
@retry(tries=2, delay=0.1)
def func_fail_three_times():
    """func_fail_three_times"""
    self = func_fail_three_times
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times <= 3:  # 1, 2, 3
        raise Exception('Fail when called first, second, third time')
    return 'ok'


# The function needs a fourth call to succeed; the asserts show it is called three times
# and that the last exception reaches the caller.
with pytest.raises(Exception) as exec_info:
    func_fail_three_times()
assert func_fail_three_times.call_times == 3
assert exec_info.value.args[0] == 'Fail when called first, second, third time'
def raw_func_fail_first_time():
    """func_fail_first_time"""
    # Undecorated variant: raises on the first call only, returns 'ok' afterwards.
    self = raw_func_fail_first_time
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'
assert retry(raw_func_fail_first_time)() == 'ok'


# test cases when function has arguments, kwargs
@retry(tries=1, delay=0.1)
def func_fail_first_time_with_parameters(p1, p2):
    """func_fail_first_time"""
    self = func_fail_first_time_with_parameters
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return p1 + p2


assert func_fail_first_time_with_parameters(1, 2) == 3
def func_fail_first_time_with_parameters(p1, p2):
    """func_fail_first_time"""
    # Undecorated variant: raises on the first call only, then returns p1 + p2.
    self = func_fail_first_time_with_parameters
    if not hasattr(self, 'call_times'):
        # set attribute call_times for function, to count call times
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return p1 + p2
# retry can also be applied at call time, with positional or keyword arguments for the target.
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(1, 2) == 3
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(p1=1, p2=2) == 3
import asyncio
@retry
async def async_func_fail_first_time():
    """async_func_fail_first_time"""
    # The docstring above is asserted verbatim by async_main, so it must stay unchanged.
    self = async_func_fail_first_time
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'
@retry(delay=0.1)
async def async_func_fail_first_time2():
    """async_func_fail_first_time2"""
    # The docstring above is asserted verbatim by async_main, so it must stay unchanged.
    self = async_func_fail_first_time2
    if not hasattr(self, 'call_times'):
        self.call_times = 0
    self.call_times += 1
    if self.call_times == 1:
        raise Exception('Fail when first called')
    return 'ok'
async def async_main():
    """Await both retry-decorated coroutines and check result, call count and docstring."""
    assert await async_func_fail_first_time() == 'ok'
    assert async_func_fail_first_time.__doc__ == 'async_func_fail_first_time'
    assert async_func_fail_first_time.call_times == 2

    assert await async_func_fail_first_time2() == 'ok'
    assert async_func_fail_first_time2.call_times == 2
    assert async_func_fail_first_time2.__doc__ == 'async_func_fail_first_time2'
# Run the coroutine on a dedicated, freshly created event loop and always close it.
# asyncio.get_event_loop() is deprecated on newer Python versions when no loop is
# running, and it may hand back a loop that was already closed; new_event_loop()
# avoids both problems and still works on Python 3.6 (unlike asyncio.run()).
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(async_main())
finally:
    loop.close()
import random
fail_count = 0
@retry(delay=0.1)
async def always_fail_func():
nonlocal fail_count
fail_count += 1
await asyncio.sleep(random.random())
raise ValueError()
async def async_main_for_always_fail():
    """Run three always-failing calls concurrently and check how often the body ran."""
    # fail_count is only read here, so no nonlocal/global declaration is needed.
    tasks = [always_fail_func() for _ in range(3)]
    # return_exceptions=True collects the raised exceptions instead of propagating them.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    assert all(isinstance(e, ValueError) for e in results)
    assert fail_count == 2 * 3  # each func run twice, three func calls
# Run the coroutine on a dedicated, freshly created event loop and always close it.
# asyncio.get_event_loop() is deprecated on newer Python versions when no loop is
# running, and it may hand back a loop that was already closed; new_event_loop()
# avoids both problems and still works on Python 3.6 (unlike asyncio.run()).
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(async_main_for_always_fail())
finally:
    loop.close()
from collections import deque
import pytest
from pythonic_toolbox.utils.deque_utils import deque_pop_any
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=1) == 2
assert queue == deque([1, 3, 4, 5])

# edge case: same as deque.popleft()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=0) == 1
assert queue == deque([2, 3, 4, 5])

# edge case: same as deque.pop()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=len(queue) - 1) == 5
assert queue == deque([1, 2, 3, 4])

# an index beyond the end of the deque raises IndexError
queue = deque([1, 2, 3, 4, 5])
with pytest.raises(IndexError) as exec_info:
    deque_pop_any(queue, idx=102)

# edge case: pop from empty deque
queue = deque()
with pytest.raises(IndexError) as exec_info:
    deque_pop_any(queue, idx=0)
assert exec_info.value.args[0] == 'pop from empty deque'
import pytest
from collections import deque
from pythonic_toolbox.utils.deque_utils import deque_split
# the first deque receives the first `num` items, the second one the rest
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=3)
assert queue1 == deque([1, 2, 3])
assert queue2 == deque([4, 5])

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=0)
assert queue1 == deque([])
assert queue2 == deque([1, 2, 3, 4, 5])

# num larger than the deque length puts everything into the first deque
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=100)
assert queue1 == deque([1, 2, 3, 4, 5])
assert queue2 == deque([])

with pytest.raises(ValueError) as exec_info:
    deque_split(deque([1, 2, 3, 4, 5]), -1)
assert exec_info.value.args[0] == 'num must be integer: 0 <= num <= sys.maxsize'
from copy import deepcopy
import pytest
from pythonic_toolbox.utils.dict_utils import DictObj
naive_dct = {
    'key1': 'val1',
    'key2': 'val2',
}
obj = DictObj(naive_dct)

# test basic functional methods like dict
assert len(obj) == 2
assert bool(obj) is True
# same behavior like ordinary dict according to the python version (FILO for popitem for 3.6+)
assert obj.popitem() == ('key2', 'val2')
assert obj.popitem() == ('key1', 'val1')
with pytest.raises(KeyError) as __:
    obj.popitem()

# a key can be treated like an attribute
# an attribute can be treated like a key
obj.key3 = 'val3'
assert obj.pop('key3') == 'val3'
with pytest.raises(KeyError) as __:
    obj.pop('key4')

obj.key5 = 'val5'
del obj.key5
with pytest.raises(KeyError) as __:
    obj.pop('key5')
with pytest.raises(AttributeError) as __:
    del obj.key5

# test deepcopy
obj = DictObj({'languages': ['Chinese', 'English']})
copied_obj = deepcopy(obj)
assert copied_obj == obj
copied_obj.languages = obj.languages + ['Japanese']
assert obj.languages == ['Chinese', 'English']
assert copied_obj.languages == ['Chinese', 'English', 'Japanese']
assert copied_obj != obj
# DictObj built from a plain dict: equality, to_dict() round-trip, attribute and item access.
person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}
person = DictObj(person_dct)
assert DictObj(person_dct) == DictObj(person_dct)
assert person.to_dict() == person_dct
assert set(person.keys()) == {'name', 'age', 'sex', 'languages'}
assert hasattr(person, 'name') is True
assert person.name == 'Albert'
assert person['name'] == 'Albert'
person.languages.append('Japanese')
assert person.languages == ['Chinese', 'English', 'Japanese']
# a value assigned as an attribute is readable as an item and is listed by keys()
person.height = '170'
assert person['height'] == '170'
assert 'height' in person
assert 'height' in person.keys()
assert hasattr(person, 'height') is True
del person['height']
assert 'height' not in person
assert 'height' not in person.keys()
person['height'] = '170cm'
# update() is called with a plain dict first and with another DictObj afterwards
person.update({'weight': '50'})
weight_val = person.pop('weight')
assert weight_val == '50'
person.update(DictObj({'weight': '50kg'}))
assert person.weight == '50kg'
expected = {
'name': 'Albert', 'age': '34', 'sex': 'Male',
'languages': ['Chinese', 'English', 'Japanese'], # appended new language
'height': '170cm', # new added attribute
'weight': '50kg', # new added attribute
}
assert person.to_dict() == expected
# the expected repr is spelled like the repr of a plain dict with the same items
repr_expected: str = ("{'name': 'Albert', 'age': '34', 'sex': 'Male', "
"'languages': ['Chinese', 'English', 'Japanese'],"
" 'height': '170cm', 'weight': '50kg'}")
assert repr(person) == repr_expected
# nested structure will be detected, and changed to DictObj
chessboard_data = {
'position': [
[{'name': 'knight'}, {'name': 'pawn'}],
[{'name': 'pawn'}, {'name': 'queen'}],
]
}
chessboard_obj = DictObj(chessboard_data)
# test comparing instances of DictObj
assert DictObj(chessboard_data) == DictObj(chessboard_data)
# lists stay lists; the dicts inside them are exposed as DictObj instances
assert isinstance(chessboard_obj.position, list)
assert len(chessboard_obj.position) == 2
assert isinstance(chessboard_obj.position[0][0], DictObj)
assert chessboard_obj.position[0][0].name == 'knight'
assert chessboard_obj.position[1][1].name == 'queen'
# edge case empty DictObj
empty_dict_obj = DictObj({})
assert len(empty_dict_obj) == 0
assert bool(empty_dict_obj) is False
# a key named 'data' is readable as an attribute like any other key
obj_dict = DictObj({'data': 'oops'})
assert obj_dict.data == 'oops'
# params validation
invalid_key_dct = {
    1: '1',
}
# test when dict's key is not str
with pytest.raises(ValueError) as __:
    __ = DictObj(invalid_key_dct)

complicated_key_dct = {
    '1abc': 'Gotcha',  # '1abc' is not valid identifier for Python, so obj.1abc will cause SyntaxError
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)
assert obj_dict['1abc'] == 'Gotcha'
assert getattr(obj_dict, '1abc') == 'Gotcha'
# you can access '1abc' as attribute by adding prefix '_'
assert obj_dict._1abc == 'Gotcha'
del obj_dict._1abc

assert obj_dict['class'] == 'MyClass'
assert getattr(obj_dict, 'class') == 'MyClass'
# you can access 'class' as attribute by adding prefix '_'
assert obj_dict._class == 'MyClass'

# test re-assign new value for 'class'
obj_dict._class = 'MyClass2'
assert obj_dict._class == 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
assert getattr(obj_dict, 'class') == 'MyClass2'
del obj_dict._class

# if you assign brand-new attributes (_2x, _try), DictObj stores them exactly as given
# this is fully considered by design, you're not encouraged to mess up keys
obj_dict._2x = 'NewAttr'
assert obj_dict._2x == 'NewAttr'
assert obj_dict['_2x'] == 'NewAttr'
with pytest.raises(KeyError):
    __ = obj_dict['2x']
with pytest.raises(AttributeError):
    __ = getattr(obj_dict, '2x')

obj_dict._try = 'NewAttr'
assert obj_dict._try == 'NewAttr'
assert obj_dict['_try'] == 'NewAttr'
# Look up the un-prefixed key 'try', mirroring the '2x' checks above; the original
# looked up the assigned *value* 'NewAttr', which never exercised the key mapping.
with pytest.raises(KeyError):
    __ = obj_dict['try']
with pytest.raises(AttributeError):
    __ = getattr(obj_dict, 'try')
# Demo for messing up key 'class'
# delete and re-assign _class
complicated_key_dct = {
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)
assert obj_dict['class'] == 'MyClass'
obj_dict._class = 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
del obj_dict._class

# obj_dict has no knowledge about 'class' or '_class'
# so '_class' is a brand-new attribute, and will be stored as '_class'
obj_dict._class = 'MyClass3'
with pytest.raises(KeyError):
    # Oops!!! by-design
    # 'class' cannot be accessed as key anymore,
    # because we store '_class' as key as other valid keys behave
    assert obj_dict['class'] == 'MyClass3'
assert obj_dict['_class'] == 'MyClass3'
# thread safe testing
import sys
from threading import Thread
from pythonic_toolbox.decorators.decorator_utils import method_synchronized
class MyObjDict(DictObj):
    """DictObj subclass that adds a synchronized increment of its ``cnt`` entry."""

    # implement a thread-safe method to increase the value of cnt
    @method_synchronized
    def increase_cnt_by_n(self, n):
        self.cnt += n
def increase_cnt_by_100(dict_obj):
    """Call ``dict_obj.increase_cnt_by_n(1)`` one hundred times."""
    for _ in range(100):
        dict_obj.increase_cnt_by_n(1)
# Remember the interpreter's thread switch interval so it can be restored afterwards.
sw_interval = sys.getswitchinterval()
try:
    # A very short switch interval makes thread switches far more frequent.
    sys.setswitchinterval(0.0001)
    my_dict_obj = MyObjDict({'cnt': 0})
    threads = [Thread(target=increase_cnt_by_100, args=(my_dict_obj,)) for _ in range(100)]
    # Plain loops instead of list comprehensions: start()/join() are called for their
    # side effects only, so no throwaway list of None values is built.
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # 100 threads, each adding 1 a hundred times
    assert my_dict_obj.cnt == 10000
finally:
    sys.setswitchinterval(sw_interval)
# test copy/deepcopy of DictObj
import copy
# A shallow copy shares the nested DictObj; a deep copy holds an equal but distinct one.
person = DictObj({'name': 'albert', 'age': 33})
team = DictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader
from typing import cast
import pytest
from pythonic_toolbox.utils.dict_utils import FinalDictObj
person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}
fixed_person = FinalDictObj(person_dct)
assert fixed_person.name == 'Albert'

# FINAL means once initialized, you cannot change the key/attribute anymore
with pytest.raises(RuntimeError) as exec_info:
    fixed_person.name = 'Steve'
expected_error_str = 'Cannot modify attribute/item in an already initialized FinalDictObj'
assert exec_info.value.args[0] == expected_error_str

with pytest.raises(RuntimeError) as __:
    fixed_person.popitem()

with pytest.raises(RuntimeError) as __:
    fixed_person.pop('name')

assert isinstance(fixed_person.languages, tuple)
with pytest.raises(AttributeError) as exec_info:
    # list values are changed into tuple to avoid being modified
    cast(list, fixed_person.languages).append('Japanese')
expected_error_str = "'tuple' object has no attribute 'append'"
assert exec_info.value.args[0] == expected_error_str
assert fixed_person.to_dict() == person_dct
# nested structure will be detected, and changed to FinalDictObj
chessboard_data = {
    'position': [
        [{'name': 'knight'}, {'name': 'pawn'}],
        [{'name': 'pawn'}, {'name': 'queen'}],
    ]
}
chessboard_obj = FinalDictObj(chessboard_data)
# test comparing instances of FinalDictObj
assert FinalDictObj(chessboard_data) == FinalDictObj(chessboard_data)
assert isinstance(chessboard_obj.position, tuple)
assert isinstance(chessboard_obj.position[0][0], FinalDictObj)
assert chessboard_obj.position[1][1].name == 'queen'
# nested objects reject modification as well
with pytest.raises(RuntimeError) as __:
    chessboard_obj.position[1][1].name = 'knight'

# test for keyword/non-identifier key as attribute
final_obj_dict = FinalDictObj({
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
})
assert final_obj_dict['class'] == 'MyClass'
assert getattr(final_obj_dict, 'class') == 'MyClass'
assert final_obj_dict._class == 'MyClass'
# test copy/deepcopy of FinalDictObj
import copy
# A shallow copy shares the nested object; a deep copy holds an equal but distinct one.
person = FinalDictObj({'name': 'albert', 'age': 33})
team = FinalDictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
assert team.leader == shallow_copy_of_team.leader
deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader
import pytest
from pythonic_toolbox.utils.dict_utils import RangeKeyDict
# test normal case
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
    (float('-inf'), 0): 'Negative',
    (0, 60): 'F',  # 0 <= val < 60
    (60, 70): 'D',  # 60 <= val < 70
    (70, 80): 'C',  # 70 <= val < 80
    (80, 90): 'B',  # 80 <= val < 90
    (90, 100): 'A',  # 90 <= val < 100
    100: 'A+',  # val == 100
})

# Big O of querying is O(log n), n is the number of ranges, due to using bisect inside
assert range_key_dict[-1] == 'Negative'
assert range_key_dict[0] == 'F'
assert range_key_dict[55] == 'F'
assert range_key_dict[60] == 'D'
assert range_key_dict[75] == 'C'
assert range_key_dict[85] == 'B'
assert range_key_dict[95] == 'A'
assert range_key_dict[100] == 'A+'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict['95']  # when key is not comparable with other integer keys
assert exec_info.value.args[0] == "KeyError: '95' is not comparable with other keys"

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[150]
assert exec_info.value.args[0] == 'KeyError: 150'

assert range_key_dict.get(150, 'N/A') == 'N/A'
# test comparison with other RangeKeyDict
assert RangeKeyDict({(0, 10): '1'}) == RangeKeyDict({(0, 10): '1'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 10): '2'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 1000): '1'})

with pytest.raises(ValueError):
    # [1, 1) is not a valid range
    # there's no value x satisfy 1 <= x < 1
    RangeKeyDict({(1, 1): '1'})

with pytest.raises(ValueError):
    # [1, -1) is not a valid range
    RangeKeyDict({(1, -1): '1'})

# validate input keys types and detect range overlaps(segment intersect)
with pytest.raises(ValueError) as exec_info:
    RangeKeyDict({
        (0, 10): 'val-between-0-and-10',
        (0, 5): 'val-between-0-and-5'
    })
expected_error_msg = ("Duplicated left boundary key 0 detected: "
                      "(0, 10): 'val-between-0-and-10', (0, 5): 'val-between-0-and-5'")
assert exec_info.value.args[0] == expected_error_msg

with pytest.raises(ValueError) as exec_info:
    RangeKeyDict({
        (0, 10): 'val-between-0-and-10',
        (5, 15): 'val-between-5-and-15'
    })
expected_error_msg = ("Overlap detected: "
                      "(0, 10): 'val-between-0-and-10', (5, 15): 'val-between-5-and-15'")
assert exec_info.value.args[0] == expected_error_msg
# test RangeKeyDict with no continuous ranges
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
    (0, 60): 'F',  # 0 <= val < 60
    (70, 80): 'C',  # 70 <= val < 80
})

assert range_key_dict[10] == 'F'

# below the lowest range
with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[-100]
assert exec_info.value.args[0] == 'KeyError: -100'

# inside the gap between the two ranges
with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[65]
assert exec_info.value.args[0] == 'KeyError: 65'

# above the highest range
with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[100]
assert exec_info.value.args[0] == 'KeyError: 100'
from functools import total_ordering
@total_ordering
class Age:
    """Orderable, hashable wrapper around a numeric age value.

    Raises:
        ValueError: if *val* is neither an int nor a float.
    """

    def __init__(self, val: float):
        if not isinstance(val, (int, float)):
            raise ValueError('Invalid age value')
        self.val = val

    def __eq__(self, other):
        # total_ordering derives <, > and >= from __le__ *and* __eq__. Without a
        # value-based __eq__, two distinct Age(2) objects compare unequal, which makes
        # Age(2) < Age(2) evaluate to True and leaves __hash__ inconsistent with equality.
        if not isinstance(other, Age):
            return NotImplemented
        return self.val == other.val

    def __le__(self, other):
        # NotImplemented lets Python raise the conventional TypeError for foreign types.
        if not isinstance(other, Age):
            return NotImplemented
        return self.val <= other.val

    def __repr__(self):
        return f'Age({repr(self.val)})'

    def __hash__(self):
        return hash(self.val)
# RangeKeyDict keyed by ranges of custom Age objects instead of plain numbers.
age_categories_map: RangeKeyDict[Age, str] = RangeKeyDict({
(Age(0), Age(2)): 'Baby',
(Age(2), Age(15)): 'Children',
(Age(15), Age(25)): 'Youth',
(Age(25), Age(65)): 'Adults',
(Age(65), Age(123)): 'Seniors',
})
# lookups use Age instances as well
assert age_categories_map[Age(0.5)] == 'Baby'
assert age_categories_map[Age(12)] == 'Children'
assert age_categories_map[Age(20)] == 'Youth'
assert age_categories_map[Age(35)] == 'Adults'
assert age_categories_map[Age(70)] == 'Seniors'
import pytest
from pythonic_toolbox.utils.dict_utils import StrKeyIdDict
# The input mixes int and str keys; the asserts below show all keys end up as str.
data = {1: 'a', 2: 'b', '3': 'c'}
my_dict = StrKeyIdDict(data)
# usage: value can be accessed by id (str: int-like/uuid-like/whatever) or id (int)
assert my_dict['1'] == my_dict[1] == 'a'
assert my_dict.keys() == {'1', '2', '3'} # all keys are str type
# item assignment and update() with an int key both land on the str key '4'
my_dict['4'] = 'd'
assert my_dict['4'] == 'd'
my_dict[4] = 'd'
assert my_dict['4'] == 'd'
my_dict.update({4: 'd'})
assert my_dict['4'] == 'd'
# test comparing instances of the class
assert StrKeyIdDict(data) == StrKeyIdDict(data)
assert StrKeyIdDict(data) != StrKeyIdDict(dict(data, **{'4': 'd'}))
assert StrKeyIdDict(data) == {'1': 'a', '2': 'b', '3': 'c'}
assert StrKeyIdDict(data) != {'1': 'a', '2': 'b', '3': 'd'}
assert StrKeyIdDict(data) != {1: 'a', 2: 'b', 3: 'c'} # StrKeyIdDict assumes all keys are strings
# test delete key
del my_dict[4]
assert my_dict.keys() == {'1', '2', '3'} # '4' is not in the dict anymore
# assign value to an arbitrary string key that is not in the dict
my_dict.update({'some-uuid': 'something'})
assert my_dict['some-uuid'] == 'something'
with pytest.raises(TypeError):
    # key '1', 1 both stands for key '1',
    # so we get duplicated keys when initializing instance, oops!
    # The constructor raises before the assignment happens, so my_dict keeps its old value.
    my_dict = StrKeyIdDict({'1': 'a', 1: 'A'})
# get() accepts int keys too and supports a default for missing keys
assert my_dict.get(1) == 'a'
assert my_dict.get('NotExistKey') is None
assert my_dict.get('NotExistKey', 'NotExistValue') == 'NotExistValue'
# test edge cases
assert StrKeyIdDict() == {}
# test shallow copy
my_dict[5] = ['e1', 'e2', 'e3']
copy_dict = my_dict.copy()
copy_dict[1] = 'A'
assert my_dict[1] == 'a'
# the list value is shared between the original and its shallow copy
my_dict['5'].append('e4')
assert copy_dict['5'] == ['e1', 'e2', 'e3', 'e4']
# test deep copy
from copy import deepcopy
copy_dict = deepcopy(my_dict)
my_dict[5].append('e5')
assert my_dict['5'] == ['e1', 'e2', 'e3', 'e4', 'e5']
assert copy_dict[5] == ['e1', 'e2', 'e3', 'e4']
# test constructor
my_dict = StrKeyIdDict(uuid1='a', uuid2='b')
assert my_dict['uuid1'] == 'a'
# test constructor (from keys)
my_dict = StrKeyIdDict.fromkeys([1, 2, 3], None)
assert my_dict == {'1': None, '2': None, '3': None}
# test update and overwrite
my_dict.update(StrKeyIdDict({1: 'a', 2: 'b', 3: 'c', 4: 'd'}))
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
# construction from an iterable of (key, value) pairs
my_dict = StrKeyIdDict([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])
assert my_dict['1'] == my_dict[1] == 'a'
# reassign StrKeyIdDict instance to another StrKeyIdDict instance
my_dict = StrKeyIdDict(my_dict)
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
assert dict(my_dict) == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
# test case when "key" is "data", which is a reserved keyword inside StrKeyIdDict
my_dict = StrKeyIdDict({'data': 'data_value', '1': 'a'})
assert my_dict['data'] == 'data_value'
assert my_dict['1'] == 'a'
# delete key 'data', should not affect other keys
del my_dict['data']
assert my_dict['1'] == 'a'
from pythonic_toolbox.utils.dict_utils import collect_leaves
# a nested dict-like struct
my_dict = {
'node_1': {
'node_1_1': {
'node_1_1_1': 'A',
},
'node_1_2': {
'node_1_2_1': 'B',
'node_1_2_2': 'C',
'node_1_2_3': None,
},
'node_1_3': [ # dict list
{
'node_1_3_1_1': 'D',
'node_1_3_1_2': 'E',
},
{
'node_1_3_2_1': 'FF',
'node_1_3_2_2': 'GG',
}
]
}}
# without predicates every leaf value is returned, None included
expected = ['A', 'B', 'C', None, 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict) == expected
# leaf_pred filters on the leaf value; a truthiness predicate drops the None leaf
expected = ['A', 'B', 'C', 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf) == expected
# keypath_pred receives an indexable key path (kp[-1], kp[-2] and len(kp) are used below)
assert collect_leaves(my_dict, keypath_pred=lambda kp: len(kp) == 1) == []
expected = ['B', 'C']
assert collect_leaves(my_dict, keypath_pred=lambda kp: kp[-1] in {'node_1_2_1', 'node_1_2_2'}) == expected
expected = ['C']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf == 'C') == expected
# when both predicates are given, a leaf is returned only if both accept it
assert collect_leaves(my_dict,
keypath_pred=lambda kp: kp[-1] == 'node_1_2_2',
leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
keypath_pred=lambda kp: kp[-1] == 'node_1_1_1',
leaf_pred=lambda lf: lf == 'C') == []
expected = ['D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict,
keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3') == expected
expected = ['FF', 'GG']
assert collect_leaves(my_dict,
keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3',
leaf_pred=lambda lf: isinstance(lf, str) and len(lf) == 2) == expected
# edge cases
assert collect_leaves([]) == []
assert collect_leaves({}) == []
assert collect_leaves(None) == []
from pythonic_toolbox.utils.dict_utils import dict_until
# Keys are tried in the given order; 'name' is absent from data, 'full_name' is present.
data = {'full_name': 'Albert Lee', 'pen_name': None}
assert dict_until(data, keys=['name', 'full_name']) == 'Albert Lee'
assert dict_until(data, keys=['full_name', 'name']) == 'Albert Lee'
# none of the keys exists: None is returned unless a default is supplied
assert dict_until(data, keys=['name', 'english_name']) is None
assert dict_until(data, keys=['name', 'english_name'], default='anonymous') == 'anonymous'
# test when pen_name is set None on purpose
assert dict_until(data, keys=['pen_name'], default='anonymous') is None
# test when value with None value is not acceptable
assert dict_until(data, keys=['pen_name'], terminate=lambda x: x is not None, default='anonymous') == 'anonymous'
from pythonic_toolbox.utils.dict_utils import select_list_of_dicts
# Sample records; two are identical and one has no 'age' field.
dict_lst = [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# another Peter Parker from multiverse
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# age unknown for Carol Danvers, no age field
{'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
{'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'},
]
# look_like selects the dicts whose items match the given key/value pairs
assert select_list_of_dicts(dict_lst, look_like={'name': 'Peter Parker'}) == [
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}) == [
{'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
{'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'}]
# keys restricts each returned dict to the listed fields
assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}, keys=['name']) == [
{'name': 'Carol Danvers'}, {'name': 'Natasha Romanoff'}]
# unique is supported for return list
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age']) == [
{'name': 'Tony Stark', 'age': 49},
{'name': 'Peter Parker', 'age': 16},
{'name': 'Peter Parker', 'age': 16},
]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age'], unique=True) == [
{'name': 'Tony Stark', 'age': 49},
{'name': 'Peter Parker', 'age': 16}]
# dict keys are ordered as the keys passed-in
assert list(select_list_of_dicts(dict_lst, keys=['name', 'age'], unique=True)[0].keys()) == ['name', 'age']
assert list(select_list_of_dicts(dict_lst, keys=['age', 'name'], unique=True)[0].keys()) == ['age', 'name']
# locate Captain Marvel, with default val for missing key
assert select_list_of_dicts(dict_lst,
look_like={'alias': 'Captain Marvel'},
keys=['name', 'sex', 'age', 'alias'],
val_for_missing_key='Unknown')[0]['age'] == 'Unknown'
# edge cases, get the original dict
assert select_list_of_dicts([]) == []
assert select_list_of_dicts(dict_lst) == dict_lst
# new list of dicts is returned, leaving the original list of dicts untouched
black_widow = select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]
black_widow['age'] += 1
assert black_widow['age'] == 36
# we don't modify the original dict data, Natasha is always 35 years old
assert select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]['age'] == 35
# preds provide more flexibility, filter the ones with age info
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0])) == 4
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0], unique=True)) == 3
# combine look_like and preds parameters
expected = [{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
preds=[lambda d: 'age' in d, lambda d: d['age'] > 20]) == expected
# empty list is returned if no dict matches the criteria
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
preds=[lambda d: 'sex' in d and d['sex'] == 'female']) == []
from pythonic_toolbox.utils.dict_utils import unique_list_of_dicts
# Sample records containing exact duplicates, one of them with a different key order.
dict_lst = [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# Peter Parkers from multiverse in same age.
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# test same dict, but the order of dict is different
{'name': 'Peter Parker', 'sex': 'male', 'alias': 'Spider Man', 'age': 16},
]
# Only one Peter Parker will be kept, for all data are exactly same.
assert unique_list_of_dicts(dict_lst) == [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
]
# edge cases
assert unique_list_of_dicts([]) == []
from pythonic_toolbox.utils.dict_utils import walk_leaves
# Input structure and the result expected after doubling every int leaf.
data = {
'k1': {
'k1_1': 1,
'k1_2': 2,
},
'k2': 'N/A', # stands for not available
}
expected = {
'k1': {
'k1_1': 2,
'k1_2': 4,
},
'k2': 'N/A', # stands for not available
}
assert walk_leaves(data) == data # no transform function provided, just a deepcopy
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
# if inplace is set True, will change data inplace, return nothing
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected
# the same calls work on a list of dicts
data = [{'name': 'lml', 'age': 33}, {'name': 'albert', 'age': 18}]
expected = [{'name': 'lml', 'age': 66}, {'name': 'albert', 'age': 36}]
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected
# edge cases
assert walk_leaves(None) is None
assert walk_leaves([]) == []
assert walk_leaves({}) == {}
assert walk_leaves(None, inplace=True) is None
assert walk_leaves([], inplace=True) is None
assert walk_leaves({}, inplace=True) is None
from pythonic_toolbox.utils.functional_utils import lfilter_multi, filter_multi
from collections.abc import Iterable
def is_even(x):
    """Return True when *x* is divisible by 2."""
    return x % 2 == 0
def is_divisible_by_5(x):
    """Return True when *x* is divisible by 5."""
    return x % 5 == 0
# select numbers which are divisible by 2 and 5
# lfilter_multi returns a list, as the comparisons with list literals below show
assert lfilter_multi([is_even, is_divisible_by_5], range(1, 30)) == [10, 20]
assert lfilter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20]) == [10, 20]
from itertools import count, takewhile
# if you want to pass an iterator, make sure the iterator will end/break,
# Note: a bare count(start=0, step=2) will generate number like 0, 2, 4, 6, .... (never ends)
even_numbers_less_equal_than_50 = takewhile(lambda x: x <= 50, count(start=0, step=2))
expected = [0, 10, 20, 30, 40, 50]
assert lfilter_multi([is_even, is_divisible_by_5], even_numbers_less_equal_than_50) == expected
# testing for filter_multi, not converted to list directly
num_iterator = filter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20])
assert type(num_iterator) is filter
assert isinstance(num_iterator, Iterable)
expected = [10, 20]
for idx, value in enumerate(num_iterator):
assert value == expected[idx]
# when items are infinite, choose filter_multi instead of lfilter_multi
expected = [0, 10, 20, 30, 40, 50]
for idx, value in enumerate(filter_multi([is_even, is_divisible_by_5], count(start=0, step=1))):
if value > 50:
break
else:
assert value == expected[idx]
from pythonic_toolbox.utils.list_utils import filter_allowable
fruits = ['apple', 'banana', 'orange']
vegetables = ['carrot', 'potato', 'tomato']
meats = ['beef', 'chicken', 'fish']
foods = [*fruits, *vegetables, *meats]

# each entry: (keyword options for filter_allowable, items expected to remain)
food_cases = [
    ({}, foods),
    ({'allow_list': [], 'block_list': []}, foods),
    ({'allow_list': ['apple', 'banana', 'blueberry']}, ['apple', 'banana']),
    ({'allow_list': [], 'block_list': foods}, []),
    ({'block_list': meats}, fruits + vegetables),
    ({'allow_list': ['apple'], 'block_list': []}, ['apple']),
    ({'allow_list': ['apple'], 'block_list': ['apple']}, []),
]
for options, remaining in food_cases:
    assert list(filter_allowable(foods, **options)) == remaining

assert list(filter_allowable([*foods, 'blueberry'], allow_list=[], block_list=foods)) == ['blueberry']

blueberry_cases = [
    ({'allow_list': [], 'block_list': []}, ['blueberry']),
    ({'allow_list': [], 'block_list': ['apple', 'banana']}, ['blueberry']),
    ({'allow_list': ['orange'], 'block_list': ['apple', 'banana']}, []),
]
for options, remaining in blueberry_cases:
    assert list(filter_allowable(['blueberry'], **options)) == remaining


# test cases with parameter key: candidates are matched by their first letter
def first_letter(word):
    return word[0]


assert list(filter_allowable(foods, allow_list=['a', 'b'], key=first_letter)) == ['apple', 'banana', 'beef']

# test some basic cases: nothing to filter always gives an empty result
basic_calls = (
    {},
    {'candidates': None},
    {'candidates': []},
    {'candidates': [], 'allow_list': [], 'block_list': []},
)
for options in basic_calls:
    assert list(filter_allowable(**options)) == []
from operator import itemgetter
from typing import List
import pytest
from pythonic_toolbox.utils.list_utils import sort_with_custom_orders
# basic usage
branches = ['branch2', 'branch1', 'branch3', 'master', 'release']
expected = ['master', 'release', 'branch1', 'branch2', 'branch3']
assert sort_with_custom_orders(branches, prefix_orders=['master', 'release']) == expected
reversed_result = sort_with_custom_orders(branches, prefix_orders=['master', 'release'], reverse=True)
assert reversed_result == list(reversed(expected))

numbers = [1, 2, 3, 9, 9]
assert sort_with_custom_orders(numbers, prefix_orders=[9, 8, 7]) == [9, 9, 1, 2, 3]

numbers = [1, 2, 3, 9]
assert sort_with_custom_orders(numbers, prefix_orders=[9], suffix_orders=[1]) == [9, 2, 3, 1]

# an empty input gives an empty result, whatever orders are passed
for options in ({}, {'prefix_orders': [], 'suffix_orders': []}, {'prefix_orders': ['master']}):
    assert sort_with_custom_orders([], **options) == []

# tests for unhashable values
pairs = [[2, 2], [1, 1], [3, 3], [6, 0]]
assert sort_with_custom_orders(pairs, prefix_orders=[[3, 3]]) == [[3, 3], [1, 1], [2, 2], [6, 0]]

# if "key" is provided, items are sorted in order of key(item)
# items in prefix_orders/suffix_orders don't need to be one-one correspondence with items to sort
# sum([6]) == sum([3, 3]) == sum([6, 0])
assert sort_with_custom_orders(pairs, prefix_orders=[[6]], key=sum) == [[3, 3], [6, 0], [1, 1], [2, 2]]


# tests for list of dicts
def total_of_values(data):
    return sum(data.values())


records = [{2: 2}, {1: 1}, {1: 2}]
assert sort_with_custom_orders(records, prefix_orders=[{2: 2}], key=total_of_values) == [{2: 2}, {1: 2}, {1: 1}]

# Assume that we prefer choosing branch in order: release > master > others (develop, hotfix etc.)
# each entry: (branch records to sort, records in the expected order)
branch_cases: List[tuple] = [
    (
        [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'release', 'commit_id': 'v1.1'}],
        [{'branch': 'release', 'commit_id': 'v1.1'}, {'branch': 'master', 'commit_id': 'v1.2'}],
    ),
    (
        [{'branch': 'develop', 'commit_id': 'v1.3'}, {'branch': 'master', 'commit_id': 'v1.2'}],
        [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'develop', 'commit_id': 'v1.3'}],
    ),
]
for branch_info, expected in branch_cases:
    res = sort_with_custom_orders(branch_info,
                                  prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
                                  key=itemgetter('branch'))
    assert res == expected

# tests for exceptions
# each entry: (invalid order options, message carried by the ValueError)
invalid_orders = [
    ({'prefix_orders': [3], 'suffix_orders': [3]}, 'prefix and suffix contains same value'),
    ({'prefix_orders': [1, 1]}, 'prefix_orders contains duplicated values'),
]
for options, message in invalid_orders:
    with pytest.raises(ValueError) as exec_info:
        sort_with_custom_orders([1, 2, 3], **options)
    assert exec_info.value.args[0] == message
# tests for class
class Person:
    """Sample record that is ordered and compared by ``age``.

    NOTE(review): equality is age-based while the hash is id-based, so two
    persons that compare equal may still hash differently. This is kept
    exactly as in the original example.
    """

    def __init__(self, id, name, age):
        self.id = id
        self.name = name
        self.age = age

    def __lt__(self, other: 'Person'):
        # Returning NotImplemented lets Python try the reflected operation and
        # raise a proper TypeError instead of an AttributeError on ``other.age``.
        if not isinstance(other, Person):
            return NotImplemented
        return self.age < other.age

    def __eq__(self, other: 'Person'):
        # Comparing with a non-Person (e.g. None) yields False rather than
        # crashing on the missing ``age`` attribute.
        if not isinstance(other, Person):
            return NotImplemented
        return self.age == other.age

    def __hash__(self):
        return self.id

    def __str__(self):
        return f'Person({self.id}, {self.name}, {self.age})'

    def __repr__(self):
        return str(self)
albert = Person(1, 'Albert', 28)
alice = Person(2, 'Alice', 26)
menglong = Person(3, 'Menglong', 33)
persons = [albert, alice, menglong]

# without custom orders the persons come out in ascending age (Person compares by age)
assert sort_with_custom_orders(persons) == [alice, albert, menglong]

# 'Anyone' appears in prefix_orders but is not one of the persons to sort
stranger = Person(4, 'Anyone', 40)
assert sort_with_custom_orders(persons, prefix_orders=[menglong, stranger]) == [menglong, alice, albert]
import pytest
from pythonic_toolbox.utils.list_utils import unpack_list
# more items than requested: only the first target_num items are unpacked
first, second, third = unpack_list(['a', 'b', 'c', 'd'], target_num=3)
assert (first, second, third) == ('a', 'b', 'c')

# fewer items than requested: the missing positions hold the default
first, second, third = unpack_list(['a', 'b'], target_num=3, default=None)
assert (first, second) == ('a', 'b') and third is None

first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert (first, second) == (1, 2) and third is None

first, second, third = unpack_list([], target_num=3, default=0)
assert [first, second, third] == [0, 0, 0]

first, second, *rest = unpack_list(['a', 'b', 'c'], target_num=4, default='x')
assert (first, second) == ('a', 'b') and rest == ['c', 'x']

# test case for type range
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert (first, second) == (1, 2) and third is None
def fib():
    """Yield the Fibonacci numbers 0, 1, 1, 2, 3, 5, ... without ever stopping."""
    current, upcoming = 0, 1
    while True:
        yield current
        current, upcoming = upcoming, current + upcoming
# test case for type generator
fib_generator = fib()  # generates data like [0, 1, 1, 2, 3, 5, 8, 13, 21 ...]
first, second, third, *rest = unpack_list(fib_generator, target_num=6)
assert [first, second, third] == [0, 1, 1]
assert rest == [2, 3, 5]

# the next call on the same generator yields the 7th and 8th numbers
seventh, eighth = unpack_list(fib_generator, target_num=2)
assert (seventh, eighth) == (8, 13)

# test edge case, nothing to unpack
assert unpack_list([], target_num=0, default=None) == []
assert unpack_list([], target_num=2, default=None) == [None, None]
assert unpack_list(['a', 'b'], target_num=0, default=None) == []
assert unpack_list(range(0, 0), target_num=0) == []
assert unpack_list(iter([]), target_num=0, default=None) == []

with pytest.raises(ValueError):
    # ValueError: not enough values to unpack (expected 3, got 2)
    first, second, third = unpack_list([1, 2], target_num=2)
from itertools import count
from pythonic_toolbox.utils.list_utils import until
def greater_than_10(x):
    return x > 10


def divisible_by_2(x):
    return x % 2 == 0


def at_least_5(x):
    return x >= 5


# basic usage
counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, greater_than_10) == 11
assert until([1, 2, 3], greater_than_10, default=11) == 11

# test case for when there's no default value and no item in the iterable satisfies the condition
assert until([1, 2, 3], greater_than_10) is None

# edge cases
assert until([], default=3) == 3  # nothing provided, return default
assert until(None, greater_than_10, default=11) == 11

# test case for when there's no item in the counter satisfies the condition
# the following codes will run forever, so comment them out
# counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
# assert until(counter, lambda x: x % 2 == 0) is None

# test case for when max_iter_num is provided, only iterate the counter for max_iter_num times
counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, divisible_by_2, default=None, max_iter_num=100) is None

numbers = [1, 2, 3, 4, 5, 6]
# with 1 or 4 iterations the value 5 is not reached yet
for limit in (1, 4):
    assert until(numbers, at_least_5, default=None, max_iter_num=limit) is None
# with 5 or more iterations the value 5 is found
for limit in (5, 100):
    assert until(numbers, at_least_5, default=None, max_iter_num=limit) == 5
from unittest.mock import patch, PropertyMock
import pytest
from pythonic_toolbox.utils.string_utils import substitute_string_template_dict, CycleError
# simple usage
# both $variable ${variable} declarations are supported in string template format
str_template_dict = dict(
    greeting='Good Morning, Everyone!',
    first_name='Albert',
    last_name='Lee',
    full_name='$first_name $last_name',
    age=34,
    speech='$greeting, I am $full_name, a ${age}-year-old programmer, very glad to meet you!',
)
output_dict = substitute_string_template_dict(str_template_dict)
assert output_dict['full_name'] == 'Albert Lee'
expected_speech = 'Good Morning, Everyone!, I am Albert Lee, a 34-year-old programmer, very glad to meet you!'
assert output_dict['speech'] == expected_speech

# complex usage, with dynamic values, and multi value-providing holders
str_template_dict = dict(
    first_name='Daenerys',
    last_name='Targaryen',
    nick_name='Dany',
    full_name='$first_name $last_name',
    speech="$nick_name: I'm $full_name ($title1, $title2, $title3), it's $current_time_str, $greeting!",
)
# only two of the three titles referenced by the speech template are given here
variables_dict = dict(title1='Queen of Meereen', title2='Mother of Dragons')
class DynamicVariables:
    """Value holder whose ``current_time_str`` is computed on every access."""

    @property
    def current_time_str(self):
        """Return the current time formatted as ``HH:MM:SS``."""
        from datetime import datetime
        moment = datetime.now()
        return moment.strftime("%H:%M:%S")
class DefaultUnknownTitle:
    """
    Value holder that answers 'UnknownTitle' for every attribute whose name
    starts with 'title' and ends with a digit (title1, title2, ...).
    Any other attribute is looked up in the normal way.
    """

    def __getattribute__(self, item):
        looks_like_title = (
            isinstance(item, str)
            and item.startswith('title')
            and item[-1:].isdigit()
        )
        if not looks_like_title:
            return super().__getattribute__(item)
        return 'UnknownTitle'
expected_speech = ("Dany: I'm Daenerys Targaryen (Queen of Meereen, Mother of Dragons, UnknownTitle), "
                   "it's 08:00:00, good morning everyone!")

# using mock to make DynamicVariables().current_time_str always return 08:00:00
frozen_clock = patch.object(DynamicVariables, 'current_time_str',
                            return_value='08:00:00', new_callable=PropertyMock)
with frozen_clock:
    # value holders are passed positionally, extra variables as keyword arguments
    value_holders = (variables_dict, DynamicVariables(), DefaultUnknownTitle())
    output_dict = substitute_string_template_dict(str_template_dict, *value_holders,
                                                  greeting='good morning everyone')
    assert output_dict['speech'] == expected_speech

# edge cases
assert substitute_string_template_dict({}) == {}

# cycle detection
str_template_dict = dict(
    variable_a='Hello $variable_b',  # variable_a depends on variable_b
    variable_b='Hello $variable_a',  # variable_b depends on variable_a, it's a cycle!
)
with pytest.raises(CycleError) as exec_info:
    substitute_string_template_dict(str_template_dict)
import itertools
import pytest
from pythonic_toolbox.utils.context_utils import SkipContext
# Usage: define a class that inherits from SkipContext
# and decides for itself whether the code block is skipped
class MyWorkStation(SkipContext):
    """SkipContext that is created with skip=True for weekend days.

    Raises ValueError when ``week_day`` is not one of the seven day names
    (compared case-insensitively).
    """

    def __init__(self, week_day: str):
        weekends = {'saturday', 'sunday'}
        working_days = {'monday', 'tuesday', 'wednesday', 'thursday', 'friday'}
        day = week_day.lower()
        if day not in working_days | weekends:
            raise ValueError(f'Invalid weekday {week_day}')
        super().__init__(skip=day in weekends)
seven_week_days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
hours_per_working_day = 8
logged_opening_days = []
total_working_hours = 0
for cur_week_day in seven_week_days:
    # MyWorkStation will skip the code block when encountering weekends
    with MyWorkStation(week_day=cur_week_day):
        # log this working day
        logged_opening_days.append(cur_week_day)
        # accumulate working hours, 8 hours on each working day
        total_working_hours += hours_per_working_day

# only working days are logged
assert logged_opening_days == seven_week_days[:5]
assert total_working_hours == 5 * hours_per_working_day

# test basic SkipContext
count_iterator = itertools.count(start=0, step=1)

with SkipContext(skip=True):
    # if skip = True, all codes inside the context will be skipped(not executed)
    next(count_iterator)  # this will not be executed
    assert sum([1, 1]) == 3
    raise Exception('Codes will not be executed')
# the iterator is still at its start, so the previous context was skipped
assert next(count_iterator) == 0

with SkipContext(skip=False):
    # codes will be executed as normal, if skip = False
    next(count_iterator)  # generate value 1
    assert sum([1, 1]) == 2
# value 1 was consumed inside the context, so the previous context was executed
assert next(count_iterator) == 2

with pytest.raises(Exception) as exec_info:
    with SkipContext(skip=False):
        # if skip = False, this SkipContextManager is transparent,
        # internal exception will be detected as normal
        raise Exception('MyError')
assert exec_info.value.args[0] == 'MyError'
# another example: ensure that only the one job which acquires the lock performs the +1 increase
from multiprocessing import Manager, Pool
import time
from pythonic_toolbox.utils.context_utils import SkipContext
def plain_cronjob_increase(ns, lock):
    """Increase ``ns.cnt`` by one unless waiting for ``lock`` took 0.5s or longer.

    :param ns: object with a numeric ``cnt`` attribute
    :param lock: context-manager lock guarding the counter
    :return: the value of ``ns.cnt`` after the call
    """
    # time.monotonic() is used for the interval: unlike time.time() it cannot
    # jump when the system clock is adjusted
    wait_started = time.monotonic()
    with lock:
        waited = time.monotonic() - wait_started
        if waited < 0.5:
            ns.cnt += 1
            # hold the lock for one more second, presumably so that competing
            # jobs wait longer than the 0.5s limit and skip their increase.
            # NOTE(review): the sleep is assumed to belong to the increase
            # branch; confirm against the original test file.
            time.sleep(1)
    return ns.cnt
class PreemptiveLockContext(SkipContext):
    """SkipContext that tries to take ``lock`` for at most 0.5s when created.

    The context is created with skip=True when the lock could not be acquired.
    """

    def __init__(self, lock):
        self.start_time = time.perf_counter()
        self.lock = lock
        self.acquired = self.lock.acquire(timeout=0.5)
        super().__init__(skip=not self.acquired)

    def __exit__(self, type, value, traceback):
        if self.acquired:
            # keep the lock for one more second before handing it back
            time.sleep(1)
            self.lock.release()
        if type is None:
            return None  # no exception
        # suppress only SkipContentException; any other exception propagates
        return issubclass(type, self.SkipContentException)
def cronjob_increase(ns, lock):
    """Increase ``ns.cnt`` by one inside a PreemptiveLockContext and return ``ns.cnt``."""
    # jobs that cannot acquire the lock within some time simply
    # have this context block skipped
    with PreemptiveLockContext(lock):
        ns.cnt = ns.cnt + 1
    current_count = ns.cnt
    return current_count
# Manager and Pool are used as context managers so that the manager process and
# the worker processes are shut down even when one of the assertions fails
with Manager() as manager, Pool(2) as pool:
    lock = manager.Lock()
    ns = manager.Namespace()

    # two competing jobs using the plain lock: both report 1 and the counter ends at 1
    ns.cnt = 0
    processes = [pool.apply_async(plain_cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
    result = [p.get() for p in processes]
    assert result == [1, 1]
    assert ns.cnt == 1

    # reset global cnt=0
    ns.cnt = 0
    # the same competition using the SkipContext based job gives the same outcome
    processes = [pool.apply_async(cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
    result = [p.get() for p in processes]
    assert result == [1, 1]
    assert ns.cnt == 1