Verify number of series migrated #12

Merged
merged 12 commits on May 9, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -31,3 +31,4 @@ node_modules
# Ignore AWS Influx migration script backup and performance files and directories
tools/python/influx-migration/**/performance.txt
tools/python/influx-migration/**/*influxdb-backup-*
tools/python/influx-migration/**/scripts/temp/*
14 changes: 14 additions & 0 deletions tools/python/influx-migration/README.md
@@ -651,3 +651,17 @@ Possible reasons for a restore failing include:
- Invalid InfluxDB destination token.
- A bucket existing in the destination instance with the same name as in the source instance. For individual bucket migrations, use the `--dest-bucket` option to set a unique name for the migrated bucket.
- Connectivity failure, either with the source or destination hosts or with an optional S3 bucket.

### Determining the Amount of Data Migrated

By default, the number of shards migrated, as reported by the Influx CLI, or the
number of rows migrated when `--csv` is used, is logged.
When the log level is set to `debug`, using the `--log-level debug` option, the
number of [series](https://docs.influxdata.com/influxdb/v2/reference/key-concepts/data-elements/#series), as reported by
the [InfluxDB `/metrics` endpoint](https://docs.influxdata.com/influxdb/v2/api/#operation/GetMetrics) under [bucket series number](https://docs.influxdata.com/influxdb/v2/reference/internals/metrics/#bucket-series-number),
is also logged.

If a bucket is empty or has not been migrated, it will not be listed under bucket series number, and an error indicating this will be logged.
This can help determine whether data is being migrated successfully.

To manually verify migrated records, see the recommended queries listed in step 3 of the [Migration Overview](#migration-overview) section, or run a standalone check like the sketch below.
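
The same series-count lookup the script performs can also be run by hand with the [InfluxDB Python client](https://github.com/influxdata/influxdb-client-python). The following is a minimal sketch, not part of the migration tooling; the host, token, and bucket name are placeholder values to replace with your own.

```python
from influxdb_client import InfluxDBClient
from influxdb_client.service.metrics_service import MetricsService

# Placeholder connection details; replace with your own instance and token
HOST = "http://localhost:8086"
TOKEN = "example-operator-token"
BUCKET_NAME = "example-bucket"

with InfluxDBClient(url=HOST, token=TOKEN) as client:
    # /metrics keys series counts by bucket ID rather than name, so
    # resolve the bucket name to its ID first
    buckets = client.buckets_api().find_buckets(name=BUCKET_NAME)
    for bucket in buckets.buckets:
        # Scrape the Prometheus-format metrics output and look for the
        # storage_bucket_series_num gauge belonging to this bucket
        metrics = MetricsService(client.api_client).get_metrics()
        for line in metrics.split("\n"):
            if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
                print(f"{BUCKET_NAME}: {line.split(' ')[1]} series")
```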
110 changes: 97 additions & 13 deletions tools/python/influx-migration/influx_migration.py
@@ -40,6 +40,7 @@
from influxdb_client import BucketRetentionRules, InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.rest import ApiException
from influxdb_client.service.metrics_service import MetricsService

# Maximum number of retries for attempting to mount an S3 bucket
MAX_RETRIES = 20
@@ -49,6 +50,12 @@
# mount an S3 bucket
MOUNT_POINT_NAME = "influxdb-backups"

# The recommended amount of time to wait before scraping from the /metrics endpoint
# to ensure metrics are up to date
METRICS_SCRAPE_INTERVAL_SECONDS = 10

BUCKET_PAGINATION_LIMIT = 100

script_duration = 0

# The user is already warned about using --skip-verify; this will de-clutter output
@@ -80,6 +87,12 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip
raise ValueError("bucket_name and full not provided, one must be provided")

logging.info("Backing up bucket data and metadata using the InfluxDB CLI")
    if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if full:
report_all_bucket_series_count(host=src_host, token=root_token)
else:
report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org_name=src_org)
start_time = time.time()

bucket_backup_command = ['influx', 'backup', backup_path, '-t', root_token,
@@ -97,6 +110,46 @@
duration = time.time() - start_time
log_performance_metrics("backup", start_time, duration)

def report_all_bucket_series_count(host, token, org_name=None):
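    """
    Logs the series count of every user bucket in an InfluxDB instance,
    paginating through buckets BUCKET_PAGINATION_LIMIT at a time and
    skipping system buckets, i.e., buckets whose names start with "_".

    :param str host: The host for the InfluxDB instance.
    :param str token: The token to use for listing buckets and fetching metrics.
    :param org_name: The name of the org to scope the bucket listing to.
    :type org_name: str or None
    """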
with InfluxDBClient(url=host, token=token) as client:
        # CSV migration may use an all-access token, meaning buckets will be scoped to an organization
if org_name is not None:
client.org = org_name
buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT)
offset = 0
while len(buckets.buckets) > 0:
for bucket in buckets.buckets:
if not bucket.name.startswith("_"):
report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org_name=org_name)
offset += BUCKET_PAGINATION_LIMIT
buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT,
offset=offset)

def report_bucket_series_count(bucket_name, host, token, org_name=None):
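    """
    Logs the number of series in a bucket, as reported by the InfluxDB
    /metrics endpoint under storage_bucket_series_num. If the bucket has
    no entry in the endpoint output, e.g., because it is empty or has not
    been migrated, an error is logged instead.

    :param str bucket_name: The name of the bucket to report on.
    :param str host: The host for the InfluxDB instance.
    :param str token: The token to use for fetching bucket metrics.
    :param org_name: The name of the org to use for the bucket lookup.
    :type org_name: str or None
    """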
try:
with InfluxDBClient(url=host, token=token) as client:
if org_name is not None:
client.org = org_name
buckets = client.buckets_api().find_buckets(name=bucket_name) \
if org_name is None else \
client.buckets_api().find_buckets(name=bucket_name, org=org_name)
for bucket in buckets.buckets:
metrics_service = MetricsService(client.api_client)
metrics = metrics_service.get_metrics()
for line in metrics.split("\n"):
if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
line = line.split(" ")
if len(line) < 2:
raise ValueError(f"Bucket metrics for bucket with name {bucket.name} are "
"tracked in storage_bucket_series_num but its series count is missing. "
f"Check the {host}/metrics endpoint for more details")
logging.debug(f"Bucket with name {bucket.name}, in org {bucket.org_id}, has {line[1]} series")
return
raise ValueError(f"Bucket series count could not be found in {host}/metrics")
except (ApiException, ValueError) as error:
logging.error(repr(error))
logging.error(f"Failed to get series count for bucket with name {bucket_name} in {host}")

def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False, skip_verify=False, src_org=None):
"""
    Backs up data and metadata stored in InfluxDB to a specified directory using csv for each
@@ -118,6 +171,12 @@ def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False,
:raises OSError: If writing bucket data to csv fails
"""
logging.info("Backing up bucket data and metadata using the InfluxDB v2 API")
    if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if full:
report_all_bucket_series_count(host=src_host, token=root_token, org_name=src_org)
else:
report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org_name=src_org)
start_time = time.time()

try:
@@ -170,30 +229,44 @@ def bucket_create_rollback(host, token, bucket_name, org, skip_verify):
client.close()
return True

def bucket_exists(host, token, bucket_name, skip_verify=False, org=None):
def bucket_exists(host, token, bucket_name, org_name=None, skip_verify=False):
"""
Checks for the existence of a bucket.

:param str host: The host for the InfluxDB instance.
:param str token: The token to use for verification.
:param str bucket_name: The name of the bucket to verify.
:param org_name: The name of the org to use for bucket verification.
:type org_name: str or None
:param bool skip_verify: Whether to skip TLS certificate verification.
:param org: The name of the org to use for bucket verification
:type org: str or None
:returns: Whether the bucket exists in the instance.
:rtype: bool
"""
try:
client = InfluxDBClient(url=host,
token=token, timeout=MILLISECOND_TIMEOUT, verify_ssl=not skip_verify, org=org)
if client.buckets_api().find_bucket_by_name(bucket_name) is None:
return False
with InfluxDBClient(url=host, token=token, timeout=MILLISECOND_TIMEOUT,
verify_ssl=not skip_verify) as client:
# If the org name is provided, set the client org before making
# any requests
if org_name is not None:
client.org = org_name
# Buckets may have the same name in multiple organizations
buckets = client.buckets_api().find_buckets(name=bucket_name) \
if org_name is None else \
client.buckets_api().find_buckets(name=bucket_name, org=org_name)
if len(buckets.buckets) <= 0:
logging.debug(f"Bucket with name {bucket_name} could not be found "
f"in host {host}")
return False
logging.debug(f"Bucket with name {bucket_name} found in host {host}")
if org_name is not None:
logging.debug(f"Bucket found in org with name {org_name} and ID {buckets.buckets[0].org_id}")
logging.debug(f"{len(buckets.buckets)} buckets found")
return True
except InfluxDBError as error:
logging.error(str(error))
logging.debug("An unexpected error occurred while checking the existence "
f"a bucket with name {bucket_name} in host {host}")
return False
finally:
client.close()
return True

def cleanup(mount_point=None, exec_s3_bucket_mount=None):
"""
@@ -611,7 +684,7 @@ def set_logging(log_level):

logging.addLevelName(logging.WARNING, yellow + logging.getLevelName(logging.WARNING) + reset)
logging.addLevelName(logging.ERROR, bold_red + logging.getLevelName(logging.ERROR) + reset)
log_format = '%(levelname)s: %(filename)s: %(message)s'
log_format = '%(levelname)s: %(asctime)s %(filename)s: %(message)s'

log_level = log_level.lower()
if log_level == "debug":
@@ -785,7 +858,7 @@ def verify_instances(args, src_token, dest_token):
if args.src_org is not None and not verify_org(args.src_host, src_token, args.src_org, args.skip_verify):
raise InfluxDBError(message="The source org could not be verified")
if args.src_bucket is not None and args.full is False and \
not bucket_exists(args.src_host, src_token, args.src_bucket, args.skip_verify, args.src_org):
not bucket_exists(args.src_host, src_token, args.src_bucket, args.src_org, args.skip_verify):
raise InfluxDBError(message="The source bucket could not be found")
if not args.skip_verify and not verify_tls(args.src_host):
raise InfluxDBError(message="TLS certificate could not be verified for source host")
@@ -800,7 +873,7 @@
if args.dest_org is not None and not verify_org(args.dest_host, dest_token, args.dest_org, args.skip_verify):
raise InfluxDBError(message="The destination org could not be verified")
if args.dest_bucket is not None and args.full is False and \
bucket_exists(args.dest_host, dest_token, args.dest_bucket, args.skip_verify, args.dest_org):
bucket_exists(args.dest_host, dest_token, args.dest_bucket, args.dest_org, args.skip_verify):
message = (f"The destination bucket {args.dest_bucket} already exists in the "
"destination instance")
if args.dest_org is not None:
@@ -1084,6 +1157,17 @@ def main(args):
"--retry-restore-dir option and provide the previously-mentioned backup directory.")
raise

# Report number of series in destination after migration
        if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if args.full:
                # For a full migration, the destination instance will
                # contain the source token after migration
report_all_bucket_series_count(host=args.dest_host, token=src_token)
else:
report_bucket_series_count(bucket_name=args.dest_bucket, host=args.dest_host,
token=dest_token, org_name=args.dest_org)

logging.info("Migration complete")
log_performance_metrics("influx_migration.py", script_start_time, script_duration)
except (ApiException, CalledProcessError, botocore.exceptions.ClientError, OSError, ValueError,