forked from thepaul/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsstable_generation_loading_test.py
215 lines (174 loc) · 8.25 KB
/
sstable_generation_loading_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
from distutils import dir_util
import os
import subprocess
import time
from dtest import Tester, debug
from ccmlib import common as ccmcommon
class TestSSTableGenerationAndLoading(Tester):
    """Tests covering sstable generation, recovery after losing sstable
    components, and round-tripping data through the sstableloader tool."""

    def __init__(self, *argv, **kwargs):
        # Thrift RPC must be on for the legacy stress/CQL paths used below.
        kwargs['cluster_options'] = {'start_rpc': 'true'}
        super(TestSSTableGenerationAndLoading, self).__init__(*argv, **kwargs)
        # Some tests deliberately delete sstable components, so errors in the
        # node logs are expected and must not fail the harness.
        self.allow_log_errors = True

    def incompressible_data_in_compressed_table_test(self):
        """
        tests for the bug that caused #3370:
        https://issues.apache.org/jira/browse/CASSANDRA-3370

        inserts random data into a compressed table. The compressed SSTable was
        compared to the uncompressed and was found to indeed be larger than
        uncompressed.
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node1 = cluster.nodelist()[0]
        time.sleep(.5)

        session = self.patient_cql_connection(node1)
        self.create_ks(session, 'ks', 1)
        self.create_cf(session, 'cf', compression="Deflate")

        # make unique column names, and values that are incompressible
        for col in xrange(10):
            col_name = str(col)
            # os.urandom output is incompressible; hex-encode it so it forms
            # a valid CQL string literal.
            col_val = os.urandom(5000)
            col_val = col_val.encode('hex')
            cql = "UPDATE cf SET v='%s' WHERE KEY='0' AND c='%s'" % (col_val, col_name)
            session.execute(cql)

        node1.flush()
        time.sleep(2)
        # If compression mishandled the incompressible data the read would
        # fail or return nothing.
        rows = session.execute("SELECT * FROM cf WHERE KEY = '0' AND c < '8'")
        assert len(rows) > 0

    def _remove_sstable_components(self, path):
        """Delete the Index/Filter/Statistics/Digest sstable components under
        ``path``, leaving only the Data files.

        Uses glob + os.remove instead of shelling out to ``rm`` so that an
        empty or wrong path cannot silently expand against the wrong
        directory and so removal errors surface as exceptions.
        """
        import glob
        for pattern in ('*Index.db', '*Filter.db', '*Statistics.db', '*Digest.sha1'):
            for component in glob.glob(os.path.join(path, pattern)):
                os.remove(component)

    def remove_index_file_test(self):
        """
        tests for situations similar to that found in #343:
        https://issues.apache.org/jira/browse/CASSANDRA-343

        Writes data, removes every sstable component except the Data files,
        and verifies the node restarts without deleting the data.
        """
        cluster = self.cluster
        cluster.populate(1).start(wait_for_binary_proto=True)
        node1 = cluster.nodelist()[0]

        # Making sure the cluster is ready to accept the subsequent
        # stress connection. This was an issue on Windows.
        version = cluster.version()
        if version < "2.1":
            node1.stress(['--num-keys=10000'])
        else:
            node1.stress(['write', 'n=10000', '-rate', 'threads=8'])

        node1.flush()
        node1.compact()
        node1.stop()
        time.sleep(1)

        # Locate the column-family data directory; its layout differs
        # between pre-2.1 and 2.1+ (cf directories gain a uuid suffix).
        path = ""
        if version < "2.1":
            path = os.path.join(node1.get_path(), 'data', 'Keyspace1', 'Standard1')
        else:
            basepath = os.path.join(node1.get_path(), 'data', 'keyspace1')
            for x in os.listdir(basepath):
                if x.startswith("standard1"):
                    path = os.path.join(basepath, x)

        # Guard against deleting against an empty path if the directory
        # layout was not what we expected.
        assert path, "could not locate the standard1 sstable directory"
        self._remove_sstable_components(path)

        node1.start()
        time.sleep(10)

        data_found = 0
        for fname in os.listdir(path):
            if fname.endswith('Data.db'):
                data_found += 1
        assert data_found > 0, "After removing index, filter, stats, and digest files, the data file was deleted!"

    def sstableloader_compression_none_to_none_test(self):
        self.load_sstable_with_configuration(None, None)

    def sstableloader_compression_none_to_snappy_test(self):
        self.load_sstable_with_configuration(None, 'Snappy')

    def sstableloader_compression_none_to_deflate_test(self):
        self.load_sstable_with_configuration(None, 'Deflate')

    def sstableloader_compression_snappy_to_none_test(self):
        self.load_sstable_with_configuration('Snappy', None)

    def sstableloader_compression_snappy_to_snappy_test(self):
        self.load_sstable_with_configuration('Snappy', 'Snappy')

    def sstableloader_compression_snappy_to_deflate_test(self):
        self.load_sstable_with_configuration('Snappy', 'Deflate')

    def sstableloader_compression_deflate_to_none_test(self):
        self.load_sstable_with_configuration('Deflate', None)

    def sstableloader_compression_deflate_to_snappy_test(self):
        self.load_sstable_with_configuration('Deflate', 'Snappy')

    def sstableloader_compression_deflate_to_deflate_test(self):
        self.load_sstable_with_configuration('Deflate', 'Deflate')

    def load_sstable_with_configuration(self, pre_compression=None, post_compression=None):
        """
        tests that the sstableloader works by using it to load data.
        Compression of the columnfamilies being loaded, and loaded into
        can be specified.

        pre_compression and post_compression can be these values:
        None, 'Snappy', or 'Deflate'.
        """
        NUM_KEYS = 1000

        for compression_option in (pre_compression, post_compression):
            assert compression_option in (None, 'Snappy', 'Deflate')

        debug("Testing sstableloader with pre_compression=%s and post_compression=%s" % (pre_compression, post_compression))

        cluster = self.cluster
        cluster.populate(2).start()
        node1, node2 = cluster.nodelist()
        time.sleep(.5)

        def create_schema(session, compression):
            # rf=2 so both nodes own every key and the loader streams to both.
            self.create_ks(session, "ks", rf=2)
            self.create_cf(session, "standard1", compression=compression)
            self.create_cf(session, "counter1", compression=compression, columns={'v': 'counter'})

        debug("creating keyspace and inserting")
        session = self.cql_connection(node1)
        create_schema(session, pre_compression)

        for i in range(NUM_KEYS):
            session.execute("UPDATE standard1 SET v='%d' WHERE KEY='%d' AND c='col'" % (i, i))
            session.execute("UPDATE counter1 SET v=v+1 WHERE KEY='%d'" % i)

        # Drain so every write is flushed to sstables before the copy.
        node1.nodetool('drain')
        node1.stop()
        node2.nodetool('drain')
        node2.stop()

        debug("Making a copy of the sstables")
        # make a copy of the sstables
        data_dir = os.path.join(node1.get_path(), 'data')
        copy_root = os.path.join(node1.get_path(), 'data_copy')
        for ddir in os.listdir(data_dir):
            keyspace_dir = os.path.join(data_dir, ddir)
            if os.path.isdir(keyspace_dir) and ddir != 'system':
                copy_dir = os.path.join(copy_root, ddir)
                dir_util.copy_tree(keyspace_dir, copy_dir)

        debug("Wiping out the data and restarting cluster")
        # wipe out the node data.
        cluster.clear()
        cluster.start()
        time.sleep(5)  # let gossip figure out what is going on

        debug("re-creating the keyspace and column families.")
        session = self.cql_connection(node1)
        create_schema(session, post_compression)
        time.sleep(2)

        debug("Calling sstableloader")
        # call sstableloader to re-load each cf.
        cdir = node1.get_install_dir()
        sstableloader = os.path.join(cdir, 'bin', ccmcommon.platform_binary('sstableloader'))
        env = ccmcommon.make_cassandra_env(cdir, node1.get_path())
        host = node1.address()
        sstablecopy_dir = copy_root + '/ks'
        for cf_dir in os.listdir(sstablecopy_dir):
            full_cf_dir = os.path.join(sstablecopy_dir, cf_dir)
            if os.path.isdir(full_cf_dir):
                cmd_args = [sstableloader, '--nodes', host, full_cf_dir]
                p = subprocess.Popen(cmd_args, env=env)
                exit_status = p.wait()
                self.assertEqual(0, exit_status,
                                 "sstableloader exited with a non-zero status: %d" % exit_status)

        def read_and_validate_data(session):
            # Every key must round-trip: standard1 row is [key, 'col', value],
            # counter1 row is [key, 1] since each counter was incremented once.
            for i in range(NUM_KEYS):
                rows = session.execute("SELECT * FROM standard1 WHERE KEY='%d'" % i)
                self.assertEqual([str(i), 'col', str(i)], list(rows[0]))
                rows = session.execute("SELECT * FROM counter1 WHERE KEY='%d'" % i)
                self.assertEqual([str(i), 1], list(rows[0]))

        debug("Reading data back")
        # Now we should have sstables with the loaded data, and the existing
        # data. Lets read it all to make sure it is all there.
        read_and_validate_data(session)

        debug("scrubbing, compacting, and repairing")
        # do some operations and try reading the data again.
        node1.nodetool('scrub')
        node1.nodetool('compact')
        node1.nodetool('repair')

        debug("Reading data back one more time")
        read_and_validate_data(session)