forked from thepaul/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeprecated_repair_test.py
185 lines (164 loc) · 9.45 KB
/
deprecated_repair_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from ccmlib.common import is_win
from dtest import Tester, debug
from tools import since, insert_c1c2, query_c1c2
from jmxutils import JolokiaAgent, make_mbean, remove_perf_disable_shared_mem
@since("2.2")
class TestDeprecatedRepairAPI(Tester):
"""
@jira_ticket CASSANDRA-9570
Test if depreated repair JMX API runs with expected parameters
"""
def check_rows_on_node(self, node_to_check, rows, found=None, missings=None, restart=True):
if found is None:
found = []
if missings is None:
missings = []
stopped_nodes = []
for node in self.cluster.nodes.values():
if node.is_running() and node is not node_to_check:
stopped_nodes.append(node)
node.stop(wait_other_notice=True)
session = self.patient_cql_connection(node_to_check, 'ks')
result = session.execute("SELECT * FROM cf LIMIT %d" % (rows * 2))
assert len(result) == rows, len(result)
for k in found:
query_c1c2(session, k, ConsistencyLevel.ONE)
for k in missings:
query = SimpleStatement("SELECT c1, c2 FROM cf WHERE key='k%d'" % k, consistency_level=ConsistencyLevel.ONE)
res = session.execute(query)
assert len(filter(lambda x: len(x) != 0, res)) == 0, res
if restart:
for node in stopped_nodes:
node.start(wait_other_notice=True)
def force_repair_async_1_test(self, ):
"""
test forceRepairAsync(String keyspace, boolean isSequential,
Collection<String> dataCenters,
Collection<String> hosts,
boolean primaryRange, boolean fullRepair, String... columnFamilies)
"""
opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,boolean,java.util.Collection,java.util.Collection,boolean,boolean,[Ljava.lang.String;)",
['ks', True, [], [], False, False, ["cf"]])
self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt)
self.assertEqual(opt["primary_range"], "false", opt)
self.assertEqual(opt["incremental"], "true", opt)
self.assertEqual(opt["job_threads"], "1", opt)
self.assertEqual(opt["data_centers"], "[]", opt)
self.assertEqual(opt["hosts"], "[]", opt)
self.assertEqual(opt["column_families"], "[cf]", opt)
def force_repair_async_2_test(self, ):
"""
test forceRepairAsync(String keyspace, int parallelismDegree,
Collection<String> dataCenters,
Collection<String> hosts,
boolean primaryRange, boolean fullRepair, String... columnFamilies)
"""
opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,int,java.util.Collection,java.util.Collection,boolean,boolean,[Ljava.lang.String;)",
['ks', 1, [], [], True, True, []])
self.assertEqual(opt["parallelism"], "parallel", opt)
self.assertEqual(opt["primary_range"], "true", opt)
self.assertEqual(opt["incremental"], "false", opt)
self.assertEqual(opt["job_threads"], "1", opt)
self.assertEqual(opt["data_centers"], "[]", opt)
self.assertEqual(opt["hosts"], "[]", opt)
self.assertEqual(opt["column_families"], "[]", opt)
def force_repair_async_3_test(self, ):
"""
test forceRepairAsync(String keyspace, boolean isSequential,
boolean isLocal, boolean primaryRange,
boolean fullRepair, String... columnFamilies)
"""
opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,boolean,boolean,boolean,boolean,[Ljava.lang.String;)",
['ks', False, False, False, False, ["cf"]])
self.assertEqual(opt["parallelism"], "parallel", opt)
self.assertEqual(opt["primary_range"], "false", opt)
self.assertEqual(opt["incremental"], "true", opt)
self.assertEqual(opt["job_threads"], "1", opt)
self.assertEqual(opt["data_centers"], "[]", opt)
self.assertEqual(opt["hosts"], "[]", opt)
self.assertEqual(opt["column_families"], "[cf]", opt)
def force_repair_range_async_1_test(self, ):
"""
test forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential,
Collection<String> dataCenters,
Collection<String> hosts, boolean fullRepair,
String... columnFamilies)
"""
opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,boolean,java.util.Collection,java.util.Collection,boolean,[Ljava.lang.String;)",
["0", "1000", "ks", True, ["dc1"], [], False, ["cf"]])
self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt)
self.assertEqual(opt["primary_range"], "false", opt)
self.assertEqual(opt["incremental"], "true", opt)
self.assertEqual(opt["job_threads"], "1", opt)
self.assertEqual(opt["data_centers"], "[dc1]", opt)
self.assertEqual(opt["hosts"], "[]", opt)
self.assertEqual(opt["ranges"], "1", opt)
self.assertEqual(opt["column_families"], "[cf]", opt)
def force_repair_range_async_2_test(self, ):
"""
test forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, int parallelismDegree,
Collection<String> dataCenters,
Collection<String> hosts,
boolean fullRepair, String... columnFamilies)
"""
opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,int,java.util.Collection,java.util.Collection,boolean,[Ljava.lang.String;)",
["0", "1000", "ks", 2, [], [], True, ["cf"]])
self.assertEqual(opt["parallelism"], "parallel" if is_win() else "dc_parallel", opt)
self.assertEqual(opt["primary_range"], "false", opt)
self.assertEqual(opt["incremental"], "false", opt)
self.assertEqual(opt["job_threads"], "1", opt)
self.assertEqual(opt["data_centers"], "[]", opt)
self.assertEqual(opt["hosts"], "[]", opt)
self.assertEqual(opt["ranges"], "1", opt)
self.assertEqual(opt["column_families"], "[cf]", opt)
def force_repair_range_async_3_test(self, ):
"""
test forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential,
boolean isLocal, boolean fullRepair,
String... columnFamilies)
"""
opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,boolean,boolean,boolean,[Ljava.lang.String;)",
["0", "1000", "ks", True, True, True, ["cf"]])
self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt)
self.assertEqual(opt["primary_range"], "false", opt)
self.assertEqual(opt["incremental"], "false", opt)
self.assertEqual(opt["job_threads"], "1", opt)
self.assertEqual(opt["data_centers"], "[dc1]", opt)
self.assertEqual(opt["hosts"], "[]", opt)
self.assertEqual(opt["ranges"], "1", opt)
self.assertEqual(opt["column_families"], "[cf]", opt)
def _deprecated_repair_jmx(self, method, arguments):
cluster = self.cluster
debug("Starting cluster..")
cluster.populate([1, 1])
node1, node2 = cluster.nodelist()
remove_perf_disable_shared_mem(node1)
cluster.start()
session = self.patient_cql_connection(node1)
self.create_ks(session, 'ks', 2)
self.create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'})
insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ALL)
# Run repair
mbean = make_mbean('db', 'StorageService')
with JolokiaAgent(node1) as jmx:
# assert repair runs and returns valid cmd number
self.assertEqual(jmx.execute_method(mbean, method, arguments), 1)
# wait for log to start
node1.watch_log_for("Starting repair command")
# get repair parameters from the log
l = node1.grep_log("Starting repair command #1, repairing keyspace ks with repair options \(parallelism: (?P<parallelism>\w+), primary range: (?P<pr>\w+), incremental: (?P<incremental>\w+), job threads: (?P<jobs>\d+), ColumnFamilies: (?P<cfs>.+), dataCenters: (?P<dc>.+), hosts: (?P<hosts>.+), # of ranges: (?P<ranges>\d+)\)")
self.assertEqual(len(l), 1)
line, m = l[0]
return {"parallelism": m.group("parallelism"),
"primary_range": m.group("pr"),
"incremental": m.group("incremental"),
"job_threads": m.group("jobs"),
"column_families": m.group("cfs"),
"data_centers": m.group("dc"),
"hosts": m.group("hosts"),
"ranges": m.group("ranges")}