restore_cluster.py
# -*- coding: utf-8 -*-
# Copyright 2020- Datastax, Inc. All rights reserved.
# Copyright 2019 Spotify AB. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import operator
import socket
import sys
import traceback
import uuid
import medusa.config
import medusa.utils
from medusa.cassandra_utils import CqlSessionProvider, Cassandra
from medusa.host_man import HostMan
from medusa.monitoring import Monitoring
from medusa.network.hostname_resolver import HostnameResolver
from medusa.orchestration import Orchestration
from medusa.schema import parse_schema
from medusa.storage import Storage
from medusa.verify_restore import verify_restore
def orchestrate(config, backup_name, seed_target, temp_dir, host_list, keep_auth, bypass_checks, verify, keyspaces,
tables, parallel_restores, use_sstableloader=False, version_target=None, ignore_racks=False):
monitoring = Monitoring(config=config.monitoring)
try:
restore_start_time = datetime.datetime.now()
if seed_target is None and host_list is None:
# if neither a target node nor a host list file is provided, default to the local node as the seed target
hostname_resolver = HostnameResolver(medusa.utils.evaluate_boolean(config.cassandra.resolve_ip_addresses),
medusa.utils.evaluate_boolean(
config.kubernetes.enabled if config.kubernetes else False))
seed_target = hostname_resolver.resolve_fqdn(socket.gethostbyname(socket.getfqdn()))
logging.warning("Seed target was not provided, using the local hostname: {}".format(seed_target))
if seed_target is not None and host_list is not None:
err_msg = 'You must either provide a seed target or a list of hosts, not both'
logging.error(err_msg)
raise RuntimeError(err_msg)
if not temp_dir.is_dir():
err_msg = '{} is not a directory'.format(temp_dir)
logging.error(err_msg)
raise RuntimeError(err_msg)
with Storage(config=config.storage) as storage:
try:
cluster_backup = storage.get_cluster_backup(backup_name)
except KeyError:
err_msg = 'No such backup --> {}'.format(backup_name)
logging.error(err_msg)
raise RuntimeError(err_msg)
restore = RestoreJob(cluster_backup, config, temp_dir, host_list, seed_target, keep_auth, verify,
parallel_restores, keyspaces, tables, bypass_checks, use_sstableloader, version_target,
ignore_racks)
restore.execute()
restore_end_time = datetime.datetime.now()
restore_duration = restore_end_time - restore_start_time
logging.debug('Emitting metrics')
logging.info('Restore duration: {}'.format(restore_duration.total_seconds()))
tags = ['medusa-cluster-restore', 'restore-duration', backup_name]
monitoring.send(tags, restore_duration.total_seconds())
tags = ['medusa-cluster-restore', 'restore-error', backup_name]
monitoring.send(tags, 0)
logging.debug('Done emitting metrics')
logging.info('Successfully restored the cluster')
except Exception as e:
tags = ['medusa-cluster-restore', 'restore-error', backup_name]
monitoring.send(tags, 1)
logging.error('This error happened during the cluster restore: {}'.format(str(e)))
traceback.print_exc()
sys.exit(1)
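# Expands a repeatable CLI option into a flag string,
# e.g. expand_repeatable_option('keyspace', ['ks1', 'ks2']) returns '--keyspace ks1 --keyspace ks2'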
def expand_repeatable_option(option, values):
return ' '.join(['--{} {}'.format(option, value) for value in values])
class RestoreJob(object):
def __init__(self, cluster_backup, config, temp_dir, host_list, seed_target, keep_auth, verify,
parallel_restores, keyspaces=None, tables=None, bypass_checks=False, use_sstableloader=False,
version_target=None, ignore_racks=False):
self.id = uuid.uuid4()
self.ringmap = None
self.cluster_backup = cluster_backup
self.session_provider = None
self.orchestration = Orchestration(config, parallel_restores)
self.config = config
self.host_list = host_list
self.seed_target = seed_target
self.keep_auth = keep_auth
self.verify = verify
self.in_place = None
self.temp_dir = temp_dir # temporary files
self.work_dir = self.temp_dir / 'medusa-job-{id}'.format(id=self.id)
self.host_map = {} # Map of backup host/target host for the restore process
self.keyspaces = keyspaces if keyspaces else {}
self.tables = tables if tables else {}
self.bypass_checks = bypass_checks
self.use_sstableloader = use_sstableloader
self.pssh_pool_size = parallel_restores
self.cassandra = Cassandra(config)
fqdn_resolver = medusa.utils.evaluate_boolean(self.config.cassandra.resolve_ip_addresses)
k8s_mode = medusa.utils.evaluate_boolean(config.kubernetes.enabled if config.kubernetes else False)
self.fqdn_resolver = HostnameResolver(fqdn_resolver, k8s_mode)
self._version_target = version_target
self.ignore_racks = ignore_racks
def prepare_restore(self):
logging.info('Ensuring the backup is found and is complete')
if not self.config.kubernetes.enabled and not self.cluster_backup.is_complete():
raise RuntimeError('Backup is not complete')
# CASE 1 : We're restoring using a seed target. Source/target mapping will be built based on tokenmap.
if self.seed_target is not None:
self.session_provider = CqlSessionProvider([self.seed_target], self.config)
with self.session_provider.new_session() as session:
self._populate_ringmap(self.cluster_backup.tokenmap, session.tokenmap())
self._capture_release_version(session)
# CASE 2 : We're restoring a backup on a different cluster
if self.host_list is not None:
logging.info('Restore will happen on new hardware')
self.in_place = False
self._populate_hostmap()
self._capture_release_version(session=None)
logging.info('Starting restore on all the nodes in this list: {}'.format(self.host_list))
def execute(self):
self.prepare_restore()
self._restore_data()
def _validate_ringmap(self, tokenmap, target_tokenmap):
def _ringmap_to_rack_topology(ringmap):
rack_topology = list()
for rack in self._tokenmap_to_nodes_per_rack(ringmap):
rack_topology.append(len(rack[1]))
return rack_topology
for host, ring_item in target_tokenmap.items():
if not ring_item.get('is_up'):
raise RuntimeError('Target {host} is not up!'.format(host=host))
if len(target_tokenmap) != len(tokenmap):
return False
if not self.ignore_racks:
backup_topology = _ringmap_to_rack_topology(tokenmap)
target_topology = _ringmap_to_rack_topology(target_tokenmap)
if backup_topology != target_topology:
logging.error("Rack aware cluster topology of the backup cluster does not match the target")
return False
return True
def _populate_ringmap(self, tokenmap, target_tokenmap):
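# e.g. a ringitem {'tokens': [-9100, 42]} yields '-9100,42'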
def _tokens_from_ringitem(ringitem):
return ','.join(map(str, ringitem['tokens']))
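# Note: returns the token count of the first host in the map, assuming every node uses the same num_tokens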
def _token_counts_per_host(tokenmap):
for host, ringitem in tokenmap.items():
return len(ringitem['tokens'])
def _hosts_from_tokenmap(tokenmap):
hosts = set()
for host, ringitem in tokenmap.items():
hosts.add(host)
return hosts
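# Round-robins my_list into nb_chunks groups,
# e.g. _chunk(['a', 'b', 'c', 'd', 'e'], 2) returns [['a', 'c', 'e'], ['b', 'd']]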
def _chunk(my_list, nb_chunks):
groups = []
for i in range(nb_chunks):
groups.append([])
for i in range(len(my_list)):
groups[i % nb_chunks].append(my_list[i])
return groups
target_tokens = {}
backup_tokens = {}
topology_matches = self._validate_ringmap(tokenmap, target_tokenmap)
self.in_place = self._is_restore_in_place(tokenmap, target_tokenmap)
if self.in_place:
logging.info("Restoring on the same cluster that was the backup was taken on (in place fashion)")
self.keep_auth = False
else:
logging.info("Restoring on a different cluster than the backup one (remote fashion)")
if self.keep_auth:
logging.info('system_auth keyspace will be left untouched on the target nodes')
else:
# ops might not be aware of the underlying behavior towards auth. Let's ask what to do...
really_keep_auth = None
while (really_keep_auth != 'Y' and really_keep_auth != 'n') and not self.bypass_checks:
really_keep_auth = input('Do you want to skip restoring the system_auth keyspace and keep the'
+ ' credentials of the target cluster? (Y/n)')
self.keep_auth = (really_keep_auth == 'Y')
if topology_matches:
target_tokens = {_tokens_from_ringitem(ringitem): host for host, ringitem in target_tokenmap.items()}
backup_tokens = {_tokens_from_ringitem(ringitem): host for host, ringitem in tokenmap.items()}
target_tokens_per_host = _token_counts_per_host(target_tokenmap)
backup_tokens_per_host = _token_counts_per_host(tokenmap)
# we must have the same number of tokens per host in both vnode and normal clusters
if target_tokens_per_host != backup_tokens_per_host:
logging.info('Source/target rings have different number of tokens per node: {}/{}'.format(
backup_tokens_per_host,
target_tokens_per_host
))
topology_matches = False
# if not using vnodes, the tokens must match exactly
if backup_tokens_per_host == 1 and target_tokens.keys() != backup_tokens.keys():
extras = target_tokens.keys() ^ backup_tokens.keys()
logging.info('Tokenmap is differently distributed. Extra items: {}'.format(extras))
topology_matches = False
if topology_matches and self.ignore_racks:
# backup and restore nodes are ordered by smallest token and associated one by one
sorted_backup_nodes = self._tokenmap_to_sorted_nodes(tokenmap)
sorted_target_nodes = self._tokenmap_to_sorted_nodes(target_tokenmap)
for i in range(len(sorted_backup_nodes)):
restore_host = sorted_target_nodes[i][0]
is_seed = self.fqdn_resolver.resolve_fqdn(restore_host) in self._get_seeds_fqdn()
self.host_map[restore_host] = {'source': [sorted_backup_nodes[i][0]], 'seed': is_seed}
elif topology_matches and not self.ignore_racks:
# restore data from the same backup rack into a single rack with matching node count
backup_nodes_per_rack = self._tokenmap_to_nodes_per_rack(tokenmap)
target_nodes_per_rack = self._tokenmap_to_nodes_per_rack(target_tokenmap)
for i in range(len(backup_nodes_per_rack)):
backup_rack = backup_nodes_per_rack[i][1]
target_rack = target_nodes_per_rack[i][1]
for j in range(len(backup_rack)):
backup_host = backup_rack[j][0]
restore_host = target_rack[j][0]
is_seed = self.fqdn_resolver.resolve_fqdn(restore_host) in self._get_seeds_fqdn()
self.host_map[restore_host] = {'source': [backup_host], 'seed': is_seed}
else:
# Topologies are different between backup and restore clusters. Using the sstableloader for restore.
self.use_sstableloader = True
backup_hosts = _hosts_from_tokenmap(tokenmap)
restore_hosts = list(_hosts_from_tokenmap(target_tokenmap))
if len(backup_hosts) >= len(restore_hosts):
grouped_backups = _chunk(list(backup_hosts), len(restore_hosts))
else:
grouped_backups = _chunk(list(backup_hosts), len(backup_hosts))
for i in range(min([len(grouped_backups), len(restore_hosts)])):
# associate one restore host with several backups as we don't have the same number of nodes.
self.host_map[restore_hosts[i]] = {'source': grouped_backups[i], 'seed': False}
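# Returns a list of (host, first_token) tuples sorted by each node's first token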
def _tokenmap_to_sorted_nodes(self, tokenmap):
nodes = dict()
for node in tokenmap.keys():
nodes[node] = tokenmap[node]['tokens'][0]
return sorted(nodes.items(), key=operator.itemgetter(1))
def _tokenmap_to_nodes_per_rack(self, tokenmap):
# Returns a list of (rack, [(node, first_token), ...]) tuples, sorted alphabetically by rack name
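# e.g. [('rack1', [('node1', -9000), ('node3', -3000)]), ('rack2', [('node2', -6000)])]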
nodes_per_rack = dict()
for node in tokenmap.keys():
rack = tokenmap[node].get('rack', "")
first_token = tokenmap[node].get('tokens', [""])[0]
nodes_per_rack.setdefault(rack, []).append((node, first_token))
return sorted(nodes_per_rack.items())
@staticmethod
def _is_restore_in_place(backup_tokenmap, target_tokenmap):
# If at least one node is part of both tokenmaps, then we're restoring in place
# Otherwise we're restoring a remote cluster
logging.info(f"backup tokenmap keys: {backup_tokenmap.keys()}")
logging.info(f"target tokenmap keys: {target_tokenmap.keys()}")
return len(set(backup_tokenmap.keys()).intersection(set(target_tokenmap.keys()))) > 0
def _get_seeds_fqdn(self):
seeds = list()
for seed in self.cassandra.seeds:
seeds.append(self.fqdn_resolver.resolve_fqdn(seed))
logging.debug("seeds are: {}".format(seeds))
return seeds
def _populate_hostmap(self):
"""
When there are no seed nodes to pull cluster topology from, the essential information required for a restore
can be passed in via a simple file using the --host-list CLI argument.
Each line in the file must have three pieces of information in this order:
- the string `True` or `False`, indicating whether the restored node should be treated as a seed
- the host/ip that the restore operation is to take place on (the destination node)
- the host/ip where the data came from (the source node)
Fields are separated by the configured storage.host_file_separator (a comma in the example below).
E.g., using Medusa to restore a 4-node cluster from a previous backup taken of that same cluster:
medusa@cassandra-node01:~$ cat nodes.list
True,10.10.1.127,10.10.1.127
True,10.10.1.128,10.10.1.128
False,10.10.1.129,10.10.1.129
False,10.10.1.130,10.10.1.130
:return:
"""
with open(self.host_list, 'r') as f:
for line in f.readlines():
# Remove leading/trailing whitespace
_line = line.strip()
# Ignore comment lines
if not _line or _line.startswith('#'):
continue
seed, target, source = _line.split(self.config.storage.host_file_separator)
# In Python, bool('False') evaluates to True, so we test membership in ['True'] below instead
target_resolved = self.fqdn_resolver.resolve_fqdn(target.strip())
source_resolved = self.fqdn_resolver.resolve_fqdn(source.strip())
self.host_map[target_resolved] = {'source': [source_resolved], 'seed': seed in ['True']}
def _restore_data(self):
# create workdir on each target host
# Later: distribute a credential
# construct command for each target host
# invoke `nohup medusa-wrapper #{command}` on each target host
# wait for exit on each
logging.info('Starting cluster restore...')
logging.info('Working directory for this execution: {}'.format(self.work_dir))
for target, sources in self.host_map.items():
logging.info('About to restore on {} using {} as backup source'.format(target, sources))
logging.info("This will delete all data on the target nodes and replace it with backup '{}'."
.format(self.cluster_backup.name))
proceed = None
while (proceed != 'Y' and proceed != 'n') and not self.bypass_checks:
proceed = input('Are you sure you want to proceed? (Y/n)')
if proceed == 'n':
err_msg = 'Restore manually cancelled'
logging.error(err_msg)
raise RuntimeError(err_msg)
# work out which nodes are seeds in the target cluster
target_seeds = [t for t, s in self.host_map.items() if s['seed']]
logging.info("target seeds : {}".format(target_seeds))
# work out the full list of target hosts
target_hosts = list(self.host_map.keys())
logging.info("target hosts : {}".format(target_hosts))
if self.use_sstableloader is False:
# stop all target nodes
logging.info('Stopping Cassandra on all nodes currently up')
# Generate a Job ID for this run
job_id = str(uuid.uuid4())
logging.debug('Job id is: {}'.format(job_id))
# Define command to run
command = self.config.cassandra.stop_cmd
logging.debug('Command to run is: {}'.format(command))
self.orchestration.pssh_run(target_hosts, command, hosts_variables={})
else:
# we're using the sstableloader, which requires (re)creating the schema and emptying the tables
logging.info("Restoring schema on the target cluster")
self._restore_schema()
# trigger restores everywhere at once
# pass in seed info so that non-seeds can wait for seeds before starting
# seeds, naturally, don't wait for anything
hosts_variables = []
for target, source in [(t, s['source']) for t, s in self.host_map.items()]:
logging.info('Restoring data on {}...'.format(target))
seeds = '' if target in target_seeds or len(target_seeds) == 0 \
else '--seeds {}'.format(','.join(target_seeds))
hosts_variables.append((','.join(source), seeds))
command = self._build_restore_cmd()
pssh_run_success = self.orchestration.pssh_run(target_hosts,
command,
hosts_variables=hosts_variables)
if not pssh_run_success:
# we could implement a retry.
err_msg = 'Some nodes failed to restore. Exiting'
logging.error(err_msg)
raise RuntimeError(err_msg)
logging.info('Restore process is complete. The cluster should be up shortly.')
if self.verify:
verify_restore(target_hosts, self.config)
def _build_restore_cmd(self):
in_place_option = '--in-place' if self.in_place else '--remote'
keep_auth_option = '--keep-auth' if self.keep_auth else ''
keyspace_options = expand_repeatable_option('keyspace', self.keyspaces)
table_options = expand_repeatable_option('table', self.tables)
# We explicitly set --no-verify since we are doing verification here in this module
# from the control node
verify_option = '--no-verify'
# %s placeholders in the below command will get replaced by pssh using per host command substitution
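# (the first %s receives the comma-joined source fqdn(s), the second the optional '--seeds ...' flag
# built in _restore_data)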
command = 'mkdir -p {work}; cd {work} && medusa-wrapper {sudo} medusa {config} ' \
'--fqdn=%s -vvv restore-node ' \
'{in_place} {keep_auth} %s {verify} --backup-name {backup} --temp-dir {temp_dir} ' \
'{use_sstableloader} {keyspaces} {tables}' \
.format(work=self.work_dir,
sudo='sudo' if medusa.utils.evaluate_boolean(self.config.cassandra.use_sudo) else '',
config=f'--config-file {self.config.file_path}' if self.config.file_path else '',
in_place=in_place_option,
keep_auth=keep_auth_option,
verify=verify_option,
backup=self.cluster_backup.name,
temp_dir=self.temp_dir,
use_sstableloader='--use-sstableloader' if self.use_sstableloader else '',
keyspaces=keyspace_options,
tables=table_options)
logging.debug('Preparing to restore on all nodes with the following command: {}'.format(command))
return command
def _restore_schema(self):
schema = parse_schema(self.cluster_backup.schema)
with self.session_provider.new_session() as session:
for keyspace in schema.keys():
if keyspace.startswith("system"):
continue
else:
self._create_or_recreate_schema_objects(session, keyspace, schema[keyspace])
def _create_or_recreate_schema_objects(self, session, keyspace, keyspace_schema):
logging.info("(Re)creating schema for keyspace {}".format(keyspace))
if keyspace not in session.cluster.metadata.keyspaces:
# Keyspace doesn't exist on the target cluster. We need to create it and all the tables as well.
session.execute(keyspace_schema['create_statement'])
for mv in keyspace_schema['materialized_views'].items():
# MVs need to be dropped before we drop the tables
logging.debug("Dropping MV {}.{}".format(keyspace, mv[0]))
session.execute("DROP MATERIALIZED VIEW IF EXISTS {}.{}".format(keyspace, mv[0]))
for table in keyspace_schema['tables'].items():
logging.debug("Dropping table {}.{}".format(keyspace, table[0]))
session.execute("DROP TABLE IF EXISTS {}.{}".format(keyspace, table[0]))
for udt in keyspace_schema['udt'].items():
# then custom types as they can be used in tables
session.execute("DROP TYPE IF EXISTS {}.{}".format(keyspace, udt[0]))
# Then we create the missing ones
session.execute(udt[1])
for table in keyspace_schema['tables'].items():
logging.debug("Creating table {}.{}".format(keyspace, table[0]))
# Create the tables
session.execute(table[1])
for index in keyspace_schema['indices'].items():
# indices were dropped with their base tables
logging.debug("Creating index {}.{}".format(keyspace, index[0]))
session.execute(index[1])
for mv in keyspace_schema['materialized_views'].items():
# Base tables are created now, we can create the MVs
logging.debug("Creating MV {}.{}".format(keyspace, mv[0]))
session.execute(mv[1])
# Capture release version as specified, from driver, or use default.
# This is necessary for logic that requires knowledge of differences between Cassandra 2, 3, and 4.
def _capture_release_version(self, session):
# If no version specified via CLI, but have a session, get version from driver.
if not self._version_target and session:
driver_app_version = session.cluster.application_version
if driver_app_version:
logging.debug('Driver version provided as: {}'.format(driver_app_version))
HostMan.set_release_version(driver_app_version)
else:
logging.debug('Unable to obtain app_version via driver or command line, '
'using default: {}'.format(HostMan.DEFAULT_RELEASE_VERSION))
# Using default as target wasn't found by driver or provided to RestoreJob
HostMan.set_release_version(HostMan.DEFAULT_RELEASE_VERSION)
# If no session available or specified version from CLI, use default.
elif not self._version_target:
# Use default
HostMan.set_release_version(HostMan.DEFAULT_RELEASE_VERSION)
else:
# Use what is specified from CLI as version.
logging.debug('Target version provided as: {}'.format(self._version_target))
HostMan.set_release_version(self._version_target)