forked from pytorch/pytorch
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcontext.py
106 lines (79 loc) · 2.75 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
## @package context
# Module caffe2.python.context
import inspect
import threading
import functools
class _ContextInfo:
def __init__(self, cls, allow_default):
self.cls = cls
self.allow_default = allow_default
self._local_stack = threading.local()
@property
def _stack(self):
if not hasattr(self._local_stack, 'obj'):
self._local_stack.obj = []
return self._local_stack.obj
def enter(self, value):
self._stack.append(value)
def exit(self, value):
assert len(self._stack) > 0, 'Context %s is empty.' % self.cls
assert self._stack.pop() == value
def get_active(self, required=True):
if len(self._stack) == 0:
if not required:
return None
assert self.allow_default, (
'Context %s is required but none is active.' % self.cls)
self.enter(self.cls())
return self._stack[-1]
class _ContextRegistry:
def __init__(self):
self._ctxs = {}
def get(self, cls):
if cls not in self._ctxs:
assert issubclass(cls, Managed), "must be a context managed class, got {}".format(cls)
self._ctxs[cls] = _ContextInfo(cls, allow_default=issubclass(cls, DefaultManaged))
return self._ctxs[cls]
_CONTEXT_REGISTRY = _ContextRegistry()
def _context_registry():
global _CONTEXT_REGISTRY
return _CONTEXT_REGISTRY
def _get_managed_classes(obj):
return [
cls for cls in inspect.getmro(obj.__class__)
if issubclass(cls, Managed) and cls != Managed and cls != DefaultManaged
]
class Managed:
"""
Managed makes the inheritted class a context managed class.
class Foo(Managed): ...
with Foo() as f:
assert f == Foo.current()
"""
@classmethod
def current(cls, value=None, required=True):
ctx_info = _context_registry().get(cls)
if value is not None:
assert isinstance(value, cls), (
'Wrong context type. Expected: %s, got %s.' % (cls, type(value)))
return value
return ctx_info.get_active(required=required)
def __enter__(self):
for cls in _get_managed_classes(self):
_context_registry().get(cls).enter(self)
return self
def __exit__(self, *args):
for cls in _get_managed_classes(self):
_context_registry().get(cls).exit(self)
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with self:
return func(*args, **kwargs)
return wrapper
class DefaultManaged(Managed):
"""
DefaultManaged is similar to Managed but if there is no parent when
current() is called it makes a new one.
"""
pass