Verify number of series migrated #12

Merged
merged 12 commits on May 9, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -31,3 +31,4 @@ node_modules
# Ignore AWS Influx migration script backup and performance files and directories
tools/python/influx-migration/**/performance.txt
tools/python/influx-migration/**/*influxdb-backup-*
tools/python/influx-migration/**/scripts/temp/*
14 changes: 14 additions & 0 deletions tools/python/influx-migration/README.md
@@ -651,3 +651,17 @@ Possible reasons for a restore failing include:
- Invalid InfluxDB destination token.
- A bucket existing in the destination instance with the same name as in the source instance. For individual bucket migrations use the `--dest-bucket` option to set a unique name for the migrated bucket.
- Connectivity failure, either with the source or destination hosts or with an optional S3 bucket.

### Determining Amount of Data Migrated

By default, the script logs the number of shards migrated, as reported by the Influx CLI,
or the number of rows migrated when `--csv` is used.
When the log level is set to `debug`, with the option `--log-level debug`, the
number of [series](https://docs.influxdata.com/influxdb/v2/reference/key-concepts/data-elements/#series) in each bucket, as reported by
the [InfluxDB `/metrics` endpoint](https://docs.influxdata.com/influxdb/v2/api/#operation/GetMetrics) under [bucket series number](https://docs.influxdata.com/influxdb/v2/reference/internals/metrics/#bucket-series-number),
will also be logged.

If a bucket is empty or has not been migrated, it will not be listed under bucket series number, and an error saying so will be logged.
Comparing the series counts reported for the source and destination instances can help determine whether data is being migrated successfully.

To manually verify migrated records, see the recommended queries listed in the [Migration Overview](#migration-overview) section, step 3.
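
As a minimal sketch of what the `debug` output is based on, a bucket's series count can also be read directly from the `/metrics` endpoint with the `influxdb-client` Python package; the host, token, and bucket name below are placeholder values:

```python
# A minimal sketch, independent of the migration script: prints the series
# count for one bucket. HOST, TOKEN, and BUCKET_NAME are placeholders.
from influxdb_client import InfluxDBClient
from influxdb_client.service.metrics_service import MetricsService

HOST = "http://localhost:8086"
TOKEN = "example-root-token"
BUCKET_NAME = "example-bucket"

with InfluxDBClient(url=HOST, token=TOKEN) as client:
    bucket = client.buckets_api().find_bucket_by_name(BUCKET_NAME)
    if bucket is None:
        raise SystemExit(f"Bucket {BUCKET_NAME} not found")
    metrics = MetricsService(client.api_client).get_metrics()
    for line in metrics.split("\n"):
        # Metric lines have the form: storage_bucket_series_num{bucket="<id>"} <count>
        if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
            print(f"{BUCKET_NAME} has {line.split(' ')[1]} series")
```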
116 changes: 105 additions & 11 deletions tools/python/influx-migration/influx_migration.py
@@ -40,6 +40,7 @@
from influxdb_client import BucketRetentionRules, InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.rest import ApiException
from influxdb_client.service.metrics_service import MetricsService

# Maximum number of retries for attempting to mount an S3 bucket
MAX_RETRIES = 20
@@ -49,6 +50,12 @@
# mount an S3 bucket
MOUNT_POINT_NAME = "influxdb-backups"

# The recommended amount of time to wait before scraping from the /metrics endpoint
# to ensure metrics are up to date
METRICS_SCRAPE_INTERVAL_SECONDS = 10

# Maximum number of buckets to request per page when listing buckets
BUCKET_PAGINATION_LIMIT = 100

script_duration = 0

# The user is already warned for using --skip-verify, this will de-clutter output
@@ -80,6 +87,12 @@ def backup(backup_path, root_token, src_host, bucket_name=None, full=False, skip
raise ValueError("bucket_name and full not provided, one must be provided")

logging.info("Backing up bucket data and metadata using the InfluxDB CLI")
    if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if full:
report_all_bucket_series_count(host=src_host, token=root_token)
else:
report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token)
start_time = time.time()

bucket_backup_command = ['influx', 'backup', backup_path, '-t', root_token,
@@ -97,6 +110,47 @@
duration = time.time() - start_time
log_performance_metrics("backup", start_time, duration)

def report_all_bucket_series_count(host, token, org=None):
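    """
    Logs the series count of each non-system bucket in an InfluxDB instance,
    paginating through the list of buckets BUCKET_PAGINATION_LIMIT buckets at a time.

    :param str host: The host for the InfluxDB instance.
    :param str token: The token to use for authentication.
    :param org: The name of the org to use when listing buckets.
    :type org: str or None
    """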
with InfluxDBClient(url=host, token=token) as client:
# CSV migration may use an all access token, meaning buckets will be scoped to an organization
if org is not None:
client.org = org
buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT)
for bucket in buckets.buckets:
if not bucket.name.startswith("_"):
report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org=org)
# Handle pagination
offset = 0
while buckets.links.next is not None:
offset += BUCKET_PAGINATION_LIMIT
buckets = client.buckets_api().find_buckets(limit=BUCKET_PAGINATION_LIMIT,
offset=offset)
for bucket in buckets.buckets:
if not bucket.name.startswith("_"):
report_bucket_series_count(bucket_name=bucket.name, host=host, token=token, org=org)

def report_bucket_series_count(bucket_name, host, token, org=None):
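    """
    Logs the series count for a single bucket, as reported by the InfluxDB
    /metrics endpoint under storage_bucket_series_num. Logs an error if the
    bucket or its series count cannot be found.

    :param str bucket_name: The name of the bucket to report on.
    :param str host: The host for the InfluxDB instance.
    :param str token: The token to use for authentication.
    :param org: The name of the org the bucket belongs to.
    :type org: str or None
    """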
try:
with InfluxDBClient(url=host, token=token) as client:
if org is not None:
client.org = org
            bucket = client.buckets_api().find_bucket_by_name(bucket_name=bucket_name)
            if bucket is None:
                raise ValueError(f"Bucket with name {bucket_name} could not be found in {host}")
metrics_service = MetricsService(client.api_client)
metrics = metrics_service.get_metrics()
            for line in metrics.split("\n"):
                if f'storage_bucket_series_num{{bucket="{bucket.id}"}}' in line:
                    fields = line.split(" ")
                    if len(fields) < 2:
                        raise ValueError(f"Bucket metrics for bucket with name {bucket.name} are "
                            "tracked in storage_bucket_series_num but its series count is missing. "
                            f"Check the {host}/metrics endpoint for more details")
                    logging.debug(f"Bucket with name {bucket.name}, in org {bucket.org_id}, has {fields[1]} series")
                    return
raise ValueError(f"Bucket series count could not be found in {host}/metrics")
except (ApiException, ValueError) as error:
logging.error(repr(error))
logging.error(f"Failed to get series count for bucket with name {bucket_name} in {host}")

def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False, skip_verify=False, src_org=None):
"""
Backups data and metadata stored in InfluxDB to a specified directory using csv for each
@@ -118,6 +172,12 @@ def backup_csv(backup_path, root_token, src_host, bucket_name=None, full=False,
:raises OSError: If writing bucket data to csv fails
"""
logging.info("Backing up bucket data and metadata using the InfluxDB v2 API")
    if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if full:
report_all_bucket_series_count(host=src_host, token=root_token, org=src_org)
else:
report_bucket_series_count(bucket_name=bucket_name, host=src_host, token=root_token, org=src_org)
start_time = time.time()

try:
@@ -170,30 +230,53 @@ def bucket_create_rollback(host, token, bucket_name, org, skip_verify):
client.close()
return True

def bucket_exists(host, token, bucket_name, skip_verify=False, org=None):
def bucket_exists(host, token, bucket_name, skip_verify=False, org_name=None):
"""
Checks for the existence of a bucket.

:param str host: The host for the InfluxDB instance.
:param str token: The token to use for verification.
:param str bucket_name: The name of the bucket to verify.
:param bool skip_verify: Whether to skip TLS certificate verification.
:param org: The name of the org to use for bucket verification
:type org: str or None
:param org_name: The name of the org to use for bucket verification
:type org_name: str or None
:returns: Whether the bucket exists in the instance.
:rtype: bool
"""
try:
client = InfluxDBClient(url=host,
token=token, timeout=MILLISECOND_TIMEOUT, verify_ssl=not skip_verify, org=org)
if client.buckets_api().find_bucket_by_name(bucket_name) is None:
return False
with InfluxDBClient(url=host, token=token, timeout=MILLISECOND_TIMEOUT,
verify_ssl=not skip_verify) as client:
org_list = []
if org_name is not None:
client.org = org_name
org_list = client.organizations_api().find_organizations(org=org_name)
if len(org_list) <= 0:
logging.debug(f"Org {org_name} could not be found when checking whether bucket {bucket_name} exists")
return False
logging.debug(f"{len(org_list)} orgs matched with name {org_name}")
bucket = client.buckets_api().find_bucket_by_name(bucket_name)
if bucket is None:
logging.debug(f"Bucket with name {bucket_name} could not be found "
f"in host {host}")
return False
if len(org_list) > 0:
for org in org_list:
if org.id == bucket.org_id:
logging.debug(f"Bucket with name {bucket_name} found in org "
f"{org_name} with org ID {org.id} in host {host}")
return True
# Bucket could not be found in any org with matching org_name
logging.debug(f"Bucket with name {bucket_name} could not be found "
f"in any org with name {org_name} in host {host}")
return False
# Org not specified and bucket has been found
logging.debug(f"Bucket with name {bucket_name} found in host {host}")
return True
except InfluxDBError as error:
logging.error(str(error))
logging.debug("An unexpected error occurred while checking the existence "
f"a bucket with name {bucket_name} in host {host}")
return False
finally:
client.close()
return True

def cleanup(mount_point=None, exec_s3_bucket_mount=None):
"""
@@ -611,7 +694,7 @@ def set_logging(log_level):

logging.addLevelName(logging.WARNING, yellow + logging.getLevelName(logging.WARNING) + reset)
logging.addLevelName(logging.ERROR, bold_red + logging.getLevelName(logging.ERROR) + reset)
log_format = '%(levelname)s: %(filename)s: %(message)s'
log_format = '%(levelname)s: %(asctime)s %(filename)s: %(message)s'

log_level = log_level.lower()
if log_level == "debug":
@@ -1084,6 +1167,17 @@ def main(args):
"--retry-restore-dir option and provide the previously-mentioned backup directory.")
raise

# Report number of series in destination after migration
        if logging.root.level <= logging.DEBUG:
time.sleep(METRICS_SCRAPE_INTERVAL_SECONDS)
if args.full:
                # For a full migration, the destination instance will contain
                # the source token after migration
report_all_bucket_series_count(host=args.dest_host, token=src_token)
else:
report_bucket_series_count(bucket_name=args.dest_bucket, host=args.dest_host,
token=dest_token, org=args.dest_org)

logging.info("Migration complete")
log_performance_metrics("influx_migration.py", script_start_time, script_duration)
except (ApiException, CalledProcessError, botocore.exceptions.ClientError, OSError, ValueError,