-
Notifications
You must be signed in to change notification settings - Fork 47
/
temporal_tables.c
179 lines (148 loc) · 4.8 KB
/
temporal_tables.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
/* -------------------------------------------------------------------------
*
* temporal_tables.c
*
* Copyright (c) 2012-2023 Vladislav Arkhipov <vlad@arkhipov.ru>
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "nodes/pg_list.h"
#include "utils/memutils.h"
#include "temporal_tables.h"
#include "utils/elog.h"
PG_MODULE_MAGIC;
PGDLLEXPORT void _PG_init(void);
static void temporal_tables_xact_callback(XactEvent event,
void *arg);
static void temporal_tables_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid,
void *arg);
/* TemporalContext stack */
static List *temporal_contexts;
void
_PG_init(void)
{
MemoryContext oldcxt;
TemporalContext *ctx;
// Create the TemporalContexts stack and add the top temporal context
// into it. We use the special value InvalidSubTransactionId to denote
// the top TemporalContext. This context is never modified directly
// since we will set it as the current context in the case of a
// transaction rolling back. If a transaction commits, we will copy the
// current TemporalContext content to this context. Note that this
// context is created in the TopMemoryContext as it must survive
// transaction boundaries.
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
ctx = palloc0(sizeof(TemporalContext));
ctx->system_time_mode = CurrentTransactionStartTimestamp;
ctx->subid = InvalidSubTransactionId;
temporal_contexts = list_make1(ctx);
MemoryContextSwitchTo(oldcxt);
// Register some callback functions that manage the TemporalContexts
// stack.
RegisterXactCallback(temporal_tables_xact_callback, NULL);
RegisterSubXactCallback(temporal_tables_subxact_callback, NULL);
}
static void
copy_temporal_context(TemporalContext *dest, TemporalContext *src,
SubTransactionId subid)
{
memcpy(dest, src, sizeof(TemporalContext));
dest->subid = subid;
}
static TemporalContext *
push_temporal_context(SubTransactionId subid)
{
MemoryContext oldcxt;
TemporalContext *ctx;
// We use the TopTransactionContext because we sometimes propagate the
// context to the parent subtransaction without copying it.
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
ctx = palloc(sizeof(TemporalContext));
copy_temporal_context(ctx, linitial(temporal_contexts), subid);
temporal_contexts = lcons(ctx, temporal_contexts);
MemoryContextSwitchTo(oldcxt);
return ctx;
}
/*
* A callback routine on a transaction commit/abort. It pops the top from
* the TemporalContexts stack and copy its content to the top
* TemporalContext in the case of the transaction committed.
*/
static void
temporal_tables_xact_callback(XactEvent event, void *arg)
{
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT)
{
TemporalContext *ctx = linitial(temporal_contexts);
if (ctx->subid != InvalidSubTransactionId)
{
temporal_contexts = list_delete_first(temporal_contexts);
if (event == XACT_EVENT_COMMIT)
{
TemporalContext *top_ctx = linitial(temporal_contexts);
Assert(top_ctx->subid == InvalidSubTransactionId);
copy_temporal_context(top_ctx, ctx,
InvalidSubTransactionId);
}
}
}
}
/*
* A callback routine on a subtransaction commit/abort. It pops the top from
* the TemporalContexts stack.
*/
static void
temporal_tables_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg)
{
if (event == SUBXACT_EVENT_COMMIT_SUB ||
event == SUBXACT_EVENT_ABORT_SUB)
{
TemporalContext *ctx = linitial(temporal_contexts);
// If subid does not equal to the current subtransaction id, that
// means that we have not pushed a new temporal context in this
// subtransaction, so there is nothing to pop here.
if (ctx->subid == GetCurrentSubTransactionId())
{
if (event == SUBXACT_EVENT_ABORT_SUB)
{
temporal_contexts = list_delete_first(temporal_contexts);
pfree(ctx);
}
else // SUBXACT_EVENT_COMMIT_SUB
{
TemporalContext *parent_ctx = lsecond(temporal_contexts);
if (parent_ctx->subid != parentSubid)
{
ctx->subid = parentSubid;
}
else
{
copy_temporal_context(parent_ctx, ctx, parentSubid);
temporal_contexts = list_delete_first(temporal_contexts);
pfree(ctx);
}
}
}
}
}
TemporalContext *
get_current_temporal_context(bool will_modify)
{
TemporalContext *ctx = linitial(temporal_contexts);
SubTransactionId subid;
// Since the operation is read-only, we can safely return the top from
// the TemporalContexts stack as that TemporalContext has not been
// changed since the transaction it was created in.
if (!will_modify)
return ctx;
subid = GetCurrentSubTransactionId();
if (ctx->subid == subid)
return ctx;
return push_temporal_context(subid);
}