forked from apache/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
schema_test.py
182 lines (134 loc) · 7.09 KB
/
schema_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import time
from cassandra.concurrent import execute_concurrent_with_args
from tools.assertions import assert_invalid, assert_all, assert_one
from tools.decorators import since
from dtest import Tester, create_ks
class TestSchema(Tester):

    def table_alteration_test(self):
        """
        Tests that table alters return as expected with many sstables at different schema points
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node1, = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)
        session.execute("use ks;")
        session.execute("create table tbl_o_churn (id int primary key, c0 text, c1 text) "
                        "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 };")

        insert_with_c0 = session.prepare("insert into tbl_o_churn (id, c0, c1) values (?, ?, ?)")
        batch_size = 50

        # Five flushed sstables written under the original schema.
        for batch in range(5):
            start = batch * batch_size
            args = [(row_id, 'aaa', 'bbb') for row_id in range(start, start + batch_size)]
            execute_concurrent_with_args(session, insert_with_c0, args, concurrency=batch_size)
            node1.flush()

        # Mutate the schema between write batches: add c2, drop c0.
        session.execute("alter table tbl_o_churn add c2 text")
        session.execute("alter table tbl_o_churn drop c0")

        insert_with_c2 = session.prepare("insert into tbl_o_churn (id, c1, c2) values (?, ?, ?);")

        # Five more flushed sstables written under the altered schema.
        for batch in range(5, 10):
            start = batch * batch_size
            args = [(row_id, 'ccc', 'ddd') for row_id in range(start, start + batch_size)]
            execute_concurrent_with_args(session, insert_with_c2, args, concurrency=batch_size)
            node1.flush()

        # Pre-alter rows must not expose c0 and have no c2; post-alter rows carry c1 and c2.
        for row in session.execute("select * from tbl_o_churn"):
            self.assertFalse(hasattr(row, 'c0'))
            if row.id < batch_size * 5:
                self.assertEqual(row.c1, 'bbb')
                self.assertIsNone(row.c2)
            else:
                self.assertEqual(row.c1, 'ccc')
                self.assertEqual(row.c2, 'ddd')

    @since("2.0", max_version="3.X")  # Compact Storage
    def drop_column_compact_test(self):
        """Dropping a column from a COMPACT STORAGE table must be rejected."""
        session = self.prepare()
        session.execute("USE ks")
        session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int) WITH COMPACT STORAGE")

        assert_invalid(session, "ALTER TABLE cf DROP c1", "Cannot drop columns from a")

    def drop_column_compaction_test(self):
        """Values written before a DROP are compacted away even when the column is re-added."""
        session = self.prepare()
        session.execute("USE ks")
        session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")

        # Seed three rows, then cycle c1 through a drop/re-add and write once more.
        for key in (0, 1, 2):
            session.execute("INSERT INTO cf (key, c1, c2) VALUES ({}, {}, {})".format(key, key + 1, key + 2))
        session.execute("ALTER TABLE cf DROP c1")
        session.execute("ALTER TABLE cf ADD c1 int")
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")

        node = self.cluster.nodelist()[0]
        node.flush()
        node.compact()

        # Only the post-re-add value survives; the compacted-away ones read back as null.
        session = self.patient_cql_connection(node)
        assert_all(session, "SELECT c1 FROM ks.cf", [[None], [None], [None], [4]], ignore_order=True)

    def drop_column_queries_test(self):
        """Reads (including via a secondary index) never surface pre-drop values of a re-added column."""
        session = self.prepare()
        session.execute("USE ks")
        session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")
        session.execute("CREATE INDEX ON cf(c2)")

        # Seed three rows, then cycle c1 through a drop/re-add and write once more.
        for key in (0, 1, 2):
            session.execute("INSERT INTO cf (key, c1, c2) VALUES ({}, {}, {})".format(key, key + 1, key + 2))
        session.execute("ALTER TABLE cf DROP c1")
        session.execute("ALTER TABLE cf ADD c1 int")
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")

        # Old (pre-drop) c1 values read as null; the new write is visible everywhere.
        assert_all(session, "SELECT c1 FROM cf", [[None], [None], [None], [4]], ignore_order=True)
        assert_all(session, "SELECT * FROM cf", [[0, None, 2], [1, None, 3], [2, None, 4], [3, 4, 5]], ignore_order=True)
        assert_one(session, "SELECT c1 FROM cf WHERE key = 0", [None])
        assert_one(session, "SELECT c1 FROM cf WHERE key = 3", [4])
        assert_one(session, "SELECT * FROM cf WHERE c2 = 2", [0, None, 2])
        assert_one(session, "SELECT * FROM cf WHERE c2 = 5", [3, 4, 5])

    def drop_column_and_restart_test(self):
        """
        Simply insert data in a table, drop a column involved in the insert and restart the node afterwards.
        This ensures that the dropped_columns system table is properly flushed on the alter or the restart
        fails as in CASSANDRA-11050.

        @jira_ticket CASSANDRA-11050
        """
        session = self.prepare()
        session.execute("USE ks")
        session.execute("CREATE TABLE t (k int PRIMARY KEY, c1 int, c2 int)")

        session.execute("INSERT INTO t (k, c1, c2) VALUES (0, 0, 0)")
        session.execute("ALTER TABLE t DROP c2")
        assert_one(session, "SELECT * FROM t", [0, 0])

        # A full stop/start must come back cleanly and still show the narrowed row.
        self.cluster.stop()
        self.cluster.start()

        session = self.patient_cql_connection(self.cluster.nodelist()[0])
        session.execute("USE ks")
        assert_one(session, "SELECT * FROM t", [0, 0])

    def drop_static_column_and_restart_test(self):
        """
        Dropping a static column caused an sstable corrupt exception after restarting, here
        we test that we can drop a static column and restart safely.

        @jira_ticket CASSANDRA-12582
        """
        session = self.prepare()
        session.execute("USE ks")
        session.execute("CREATE TABLE ts (id1 int, id2 int, id3 int static, val text, PRIMARY KEY (id1, id2))")

        session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (1, 1, 0, 'v1')")
        session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (1, 2, 0, 'v2')")
        session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (2, 1, 1, 'v3')")
        self.cluster.nodelist()[0].nodetool('flush ks ts')
        assert_all(session, "SELECT * FROM ts", [[1, 1, 0, 'v1'], [1, 2, 0, 'v2'], [2, 1, 1, 'v3']])

        # Drop the static column, then verify the data both before and after a restart.
        session.execute("alter table ts drop id3")
        assert_all(session, "SELECT * FROM ts", [[1, 1, 'v1'], [1, 2, 'v2'], [2, 1, 'v3']])

        self.cluster.stop()
        self.cluster.start()

        session = self.patient_cql_connection(self.cluster.nodelist()[0])
        session.execute("USE ks")
        assert_all(session, "SELECT * FROM ts", [[1, 1, 'v1'], [1, 2, 'v2'], [2, 1, 'v3']])

    def prepare(self):
        """Start a single-node cluster and return a session with keyspace 'ks' (RF=1) created."""
        cluster = self.cluster
        cluster.populate(1).start()
        time.sleep(.5)
        first_node = cluster.nodelist()[0]
        session = self.patient_cql_connection(first_node)
        create_ks(session, 'ks', 1)
        return session