### Dependencies:
* python >= 3.8 at `/usr/local/bin/python3`
* [HandBrakeCLI](https://handbrake.fr/downloads2.php) and [ffmpeg](https://www.ffmpeg.org/download.html) on your `$PATH`
-* [numpy](https://pypi.org/project/numpy/)
* [cv2](https://pypi.org/project/opencv-python/)
+* [dill](https://pypi.org/project/dill/)
+* [imutils](https://pypi.org/project/imutils/)
+* [numpy](https://pypi.org/project/numpy/)
+* [scikit-image](https://pypi.org/project/scikit-image/)
-import argparse
-import cv2
-import numpy as np
-import os
-import subprocess
-import sys
-parser = argparse.ArgumentParser()
-parser.add_argument("filename", help="Source filename (or \"all\" to compare all files which exist in both ./source/ and ./hevc/)")
-parser.add_argument("num_frames", nargs="?", default=5, type=int, help="Number of comparison frames to generate")
-parser.add_argument("-s", "--stack", action="store_true", help="Also create 2-up stacked comparison")
-args = parser.parse_args()
-if not set(["source", "hevc"]).issubset(set(os.listdir())):
- sys.exit("Invalid working directory, exiting.")
-if args.filename.lower() == "all":
- source_files = [filename for filename in os.listdir("source") if os.path.splitext(filename)[1] == ".mp4"]
-elif args.filename.endswith(".mp4"):
- source_files = [args.filename]
- sys.exit("Invalid filename, exiting.")
-print("\nComparison frames:\t{frames}".format(frames=args.num_frames))
-for source_file in source_files:
- source_file_path = os.path.join("source", source_file)
- source_file_size = int(os.path.getsize(source_file_path)/1000000)
- source_file_handle = cv2.VideoCapture(source_file_path)
- hevc_files = [filename for filename in os.listdir("hevc") if filename.startswith(os.path.splitext(source_file)[0])]
- for hevc_file in hevc_files:
- output_directory = os.path.join(os.path.relpath("comparison"), os.path.splitext(os.path.basename(hevc_file))[0])
- hevc_file_path = os.path.join("hevc", hevc_file)
- hevc_file_handle = cv2.VideoCapture(hevc_file_path)
- hevc_file_size = int(os.path.getsize(hevc_file_path)/1000000)
- compression_ratio = int(100-(hevc_file_size/source_file_size*100))
- total_frames = source_file_handle.get(cv2.CAP_PROP_FRAME_COUNT)
- stride = int(total_frames/(args.num_frames+1))
- print("\nFilename:\t\t{filename}".format(filename=hevc_file))
- if source_file_handle.get(cv2.CAP_PROP_FRAME_COUNT) != hevc_file_handle.get(cv2.CAP_PROP_FRAME_COUNT):
- print("\t\t\t!!! WARNING: Frame counts do not match, screencaps may be time-shifted")
- print("\tSource Size:\t{size} MB".format(size=source_file_size))
- print("\tHEVC Size:\t{size} MB".format(size=hevc_file_size))
- print("\tReduction:\t{ratio}%\n".format(ratio=compression_ratio))
- if not os.path.exists(output_directory): os.makedirs(output_directory)
- with open(os.path.join(output_directory, "summary.txt"), "w") as summary_file:
- summary_file.write("{source} MB to {hevc} MB, saving {ratio}%".format(source=str(source_file_size), hevc=str(hevc_file_size), ratio=str(compression_ratio)))
- for frame in range(1, args.num_frames+1):
- source_file_handle.set(cv2.CAP_PROP_POS_FRAMES,stride*frame)
- hevc_file_handle.set(cv2.CAP_PROP_POS_FRAMES,stride*frame)
- ret,source_frame = source_file_handle.read()
- ret,hevc_frame = hevc_file_handle.read()
- if args.stack:
- comparison_frame = np.vstack((source_frame,hevc_frame))
- cv2.imwrite(os.path.join(output_directory, "{number}-2up.png".format(number=frame)), comparison_frame, [cv2.IMWRITE_PNG_COMPRESSION, 0])
- cv2.imwrite(os.path.join(output_directory, "{number}-source.png".format(number=frame)), source_frame, [cv2.IMWRITE_PNG_COMPRESSION, 0])
- cv2.imwrite(os.path.join(output_directory, "{number}-x265.png".format(number=frame)), hevc_frame)
- hevc_file_handle.release()
- source_file_handle.release()
+import argparse
+import cv2
+import dill
+import imutils
+import numpy as np
+import os
+from skimage.metrics import structural_similarity
+import subprocess
+import sys
+# Verify script is colocated with ./lib/ and import dependencies
+sys.path.append(os.path.join(sys.path[0], "lib"))
+ from common import get_choice_from_menu
+except ImportError:
+ sys.exit("FATAL: failed to import dependencies from ./lib/\n")
+# Parse command-line arguments
+parser = argparse.ArgumentParser()
+source_group = parser.add_mutually_exclusive_group()
+source_group.add_argument("--source", help="Source filename to compare that exists in both ./source/ and ./hevc/)")
+source_group.add_argument("--all", action="store_true", help="Compare all files which exist in both ./source/ and ./hevc/")
+parser.add_argument("--frame", nargs=1, type=int, help="Compare SSIM values for specific frames from previously generated comparisons")
+parser.add_argument("--num_frames", nargs="?", default=5, type=int, help="Number of comparison frames to generate")
+parser.add_argument("--dill", action="store_true")
+args = parser.parse_args()
+# Verify we're working from a directory that contains expected subdirectories
+if not set(["source", "hevc", "performance"]).issubset(set(os.listdir())):
+ sys.exit("Invalid working directory, exiting.")
+if args.all and args.frame:
+ sys.exit("Error: --frame must be used with --source, not --all. Exiting.")
+elif args.all:
+ source_files = [filename for filename in os.listdir("source") if os.path.splitext(filename)[1] == ".mp4"]
+elif args.source.endswith(".mp4"):
+ source_files = [args.source]
+# if args.frame, fail unless args.source exists
+ # if args.frame, verify filename exists in "comparison"
+ sys.exit("Invalid filename, exiting.")
+# if comparison directory already exists, exit
+print("\nComparison frames:\t{frames}".format(frames=args.num_frames))
+for source_file in source_files:
+ source_file_path = os.path.join("source", source_file)
+ source_file_size = int(os.path.getsize(source_file_path)/1000000)
+ source_file_handle = cv2.VideoCapture(source_file_path)
+ hevc_files = [filename for filename in os.listdir("hevc") if filename.startswith(os.path.splitext(source_file)[0])]
+ for hevc_file in hevc_files:
+ evaluate_frames = True
+ output_directory = os.path.join(os.path.relpath("comparison"), os.path.splitext(os.path.basename(hevc_file))[0])
+ hevc_file_path = os.path.join("hevc", hevc_file)
+ hevc_file_handle = cv2.VideoCapture(hevc_file_path)
+ hevc_file_size = int(os.path.getsize(hevc_file_path)/1000000)
+ compression_ratio = int(100-(hevc_file_size/source_file_size*100))
+ total_frames = source_file_handle.get(cv2.CAP_PROP_FRAME_COUNT)
+ # get other attributes from ./performance/<<>>.log
+ stride = int(total_frames/(args.num_frames+1))
+ print("\nFilename:\t\t{filename}".format(filename=hevc_file))
+ if source_file_handle.get(cv2.CAP_PROP_FRAME_COUNT) != hevc_file_handle.get(cv2.CAP_PROP_FRAME_COUNT):
+ print("\t\t\t!!! WARNING: Frame counts do not match, screencaps may be time-shifted")
+ evaluate_frames = False
+ print("\tSource Size:\t{size} MB".format(size=source_file_size))
+ print("\tHEVC Size:\t{size} MB".format(size=hevc_file_size))
+ print("\tReduction:\t{ratio}%\n".format(ratio=compression_ratio))
+ ssim_total = 0.0
+ ssim_values = {}
+ print("\tSSIM:")
+ if not os.path.exists(output_directory): os.makedirs(output_directory)
+ #if frame_resolution_differs: # e.g. letterboxing removed -- where does this go?
+ for frame in range(1, args.num_frames+1):
+ source_file_handle.set(cv2.CAP_PROP_POS_FRAMES,stride*frame)
+ hevc_file_handle.set(cv2.CAP_PROP_POS_FRAMES,stride*frame)
+ ret,source_frame = source_file_handle.read()
+ ret,hevc_frame = hevc_file_handle.read()
+ cv2.imwrite(os.path.join(output_directory, "{number}-source.png".format(number=frame)), source_frame, [cv2.IMWRITE_PNG_COMPRESSION, 0])
+ cv2.imwrite(os.path.join(output_directory, "{number}-x265.png".format(number=frame)), hevc_frame, [cv2.IMWRITE_PNG_COMPRESSION, 0])
+ if evaluate_frames:
+ try:
+ ssim = structural_similarity(cv2.cvtColor(source_frame, cv2.COLOR_BGR2GRAY), cv2.cvtColor(hevc_frame, cv2.COLOR_BGR2GRAY))
+ except ValueError as error:
+ print("\tERROR: " +str(error))
+ evaluate_frames = False
+ else:
+ ssim_values[frame] = ssim
+ ssim_total += ssim
+ print("\t Frame {frame}:\t{ssim}".format(frame=frame, ssim=ssim))
+ ssim_average = ssim_total/args.num_frames
+ print("\tAverage:\t{average}\n".format(average=ssim_average))
+ with open(os.path.join("performance", hevc_file[:-4] + ".log"), "r") as performance_file:
+ duration = performance_file.readline().rstrip()
+ fps = "{:0.2f}".format(float(performance_file.readline().rstrip().split(" ")[0]))
+ with open(os.path.join(output_directory, "summary.txt"), "w") as summary_file:
+ summary_file.write("SSIM Avg:\t{average}\nDuration:\t{duration}\nFPS:\t\t{fps}\nCompression:\t{compression}%\n\n".format(average=ssim_average, duration=duration, fps=fps, compression=compression_ratio))
+ if evaluate_frames:
+ for iterator in range(1, args.num_frames+1):
+ summary_file.write("\t{iterator}:\t{ssim}\n".format(iterator=iterator, ssim=ssim_values[iterator]))
+ hevc_file_handle.release()
+ source_file_handle.release()
\ No newline at end of file
+import argparse
+import cv2
+import imutils
+import os
+from skimage.metrics import structural_similarity
+import sys
+sys.path.append(os.path.join(sys.path[0], "lib"))
+ from common import get_choice_from_menu
+except ImportError:
+ sys.exit("FATAL: failed to import dependencies from ./lib/\n")
+parser = argparse.ArgumentParser()
+args = parser.parse_args()
+if not args.dir:
+ choices = sorted([file for file in os.listdir("comparison") if os.path.isdir(os.path.join("comparison", file))])
+ if len(choices) == 0:
+ sys.exit("\nNo transcode directories to evaluate.\n")
+ else:
+ print("\nChoose a transcode to evaluate:")
+ transcode = choices[get_choice_from_menu(choices)]
+ if args.dir in os.listdir("comparison"):
+ transcode = args.dir
+ else:
+ sys.exit("Invalid directory.\n")
+# TODO: frame arg
+screenshots = sorted([file for file in os.listdir(os.path.join("comparison", transcode)) if file.endswith(".png")], key=lambda filename: int(filename.split("-")[0]))
+if not (len(screenshots) % 2 == 0):
+ sys.exit("ERROR: Odd number of screenshots found in {directory}".format(directory=transcode))
+ num_screenshots = int(len(screenshots)/2)
+#TODO: integrate into compareEncoding.py, error out if source/hevc dimenions !=
+if os.path.exists(os.path.join("comparison", transcode, "summary.txt")):
+ sys.exit("\nsummary.txt exists, {transcode} has already been evaluated.\n\nExiting.\n".format(transcode=transcode))
+file_info = {"filename": transcode}
+with open(os.path.join("performance", transcode + ".log"), "r") as log_file:
+ file_info["duration"] = log_file.readline().rstrip()
+ file_info["fps"] = "{:0.2f}".format(float(log_file.readline().rstrip().split(" ")[0]))
+ file_info["compression"] = log_file.readline().rstrip().split(" ")[0]
+ for line in log_file:
+ if "bitrate" in line:
+ file_info["bitrate"] = int(line.rstrip().split(": ")[2][:-1])
+ elif "height" in line:
+ file_info["height"] = line.rstrip().split(": ")[1][:-1]
+ elif "width" in line:
+ file_info["width"] = line.rstrip().split(": ")[1][:-2]
+ elif "encoder_quality" in line:
+ file_info["encoder_quality"] = line.rstrip().split(": ")[1][:-1]
+ elif "encoder_preset" in line:
+ file_info["encoder_preset"] = line.rstrip().split(": ")[1][1:-2]
+ elif "encoder_options" in line:
+ file_info["encoder_options"] = line.rstrip().split(": ")[1][1:-2]
+print(" Resolution:\t{resolution}".format(resolution=file_info["width"] + "x" + file_info["height"]))
+print(" Bitrate:\t{bitrate}".format(bitrate=str(int(file_info["bitrate"] / 1000)) + "kbps"))
+print(" Encoder:\t{settings}".format(settings=str("RF" + file_info["encoder_quality"] + " " + file_info["encoder_preset"] + ", " + file_info["encoder_options"])))
+print(" Duration:\t{duration}".format(duration=file_info["duration"]))
+print(" FPS:\t\t{fps}".format(fps=str(file_info["fps"])))
+print(" Compression:\t{ratio}".format(ratio=file_info["compression"]))
+print("\n SSIM:")
+ssim_total = 0.0
+ssim_values = {}
+for image_iterator in range(1, num_screenshots+1):
+ screenshot_pair = sorted([os.path.join("comparison", transcode, screenshot) for screenshot in screenshots if screenshot.split("-")[0] == str(image_iterator)])
+ ssim = structural_similarity(cv2.cvtColor(cv2.imread(screenshot_pair[0]), cv2.COLOR_BGR2GRAY), cv2.cvtColor(cv2.imread(screenshot_pair[1]), cv2.COLOR_BGR2GRAY))
+ #(score, diff) = structural_similarity(source_grayscale, hevc_grayscale, full=True)
+ # What does the full image get me?
+ ssim_values[image_iterator] = ssim
+ print(" Frame {image_iterator}:\t{ssim}".format(image_iterator=image_iterator, ssim=ssim))
+ ssim_total += ssim
+ssim_average = ssim_total/num_screenshots
+print(" Average:\t{average}\n".format(average=ssim_average))
+with open(os.path.join("comparison", transcode, "summary.txt"), "w") as summary_file:
+ summary_file.write("SSIM Avg:\t{average}\nDuration:\t{duration}\nFPS:\t\t{fps}\nCompression:\t{compression}\n\n".format(average=ssim_average, duration=file_info["duration"], fps=file_info["fps"], compression=file_info["compression"]))
+ for iterator in range(1, num_screenshots+1):
+ summary_file.write("\t{iterator}:\t{ssim}\n".format(iterator=iterator, ssim=ssim_values[iterator]))
+import argparse
+import cv2
+from datetime import datetime, timedelta
+import dill
+import imutils
+import numpy as np
+import os
+from pprint import pprint
+from skimage.metrics import structural_similarity
+import subprocess
+import sys
+# Verify script is colocated with ./lib/ and import dependencies
+sys.path.append(os.path.join(sys.path[0], "lib"))
+ from common import get_choice_from_menu
+except ImportError:
+ sys.exit("FATAL: failed to import dependencies from ./lib/\n")
+# Verify we're working from a directory that contains expected subdirectories
+if not set(["source", "performance"]).issubset(set(os.listdir())):
+ sys.exit("Invalid working directory, exiting.")
+source_files = [filename for filename in os.listdir("source") if os.path.splitext(filename)[1] == ".mp4"]
+comparisons = {}
+for source_file in source_files:
+ comparison_directories = [filename for filename in os.listdir("comparison") if filename.startswith(os.path.splitext(source_file)[0])]
+ movie_name = source_file.split(".")[0]
+ transcodes = {}
+ for directory in comparison_directories:
+ summary_file = os.path.join("comparison", directory, "summary.txt")
+ transcode_options = directory.split("-")[1].split("_")
+ transcode_options.pop(0)
+ if "Baseline" in transcode_options:
+ quality = "Baseline"
+ else:
+ quality = transcode_options[0]
+ transcode_options.pop(0)
+ if quality not in transcodes:
+ transcodes[quality] = {}
+ with open(summary_file, "r") as file:
+ data = {"ssim": file.readline().rstrip().split("\t")[1]}
+ data["duration"] = file.readline().rstrip().split("\t")[1]
+ data["fps"] = file.readline().rstrip().split("\t")[2]
+ data["compression"] = file.readline().rstrip().split("\t")[1]
+ if quality == "Baseline":
+ transcodes[quality] = data
+ else:
+ transcodes[quality]["_".join(transcode_options)] = data
+ comparisons[movie_name] = transcodes
+for movie_name in sorted(comparisons.keys()):
+ print(movie_name)
+ for key, value in sorted(comparisons[movie_name].items()):
+ if key == "Baseline":
+ print("Baseline", value["ssim"], "-", value["duration"], "-", str(value["compression"]) + "%", "-", value["fps"], sep="\t")
+ else:
+ for variant in comparisons[movie_name][key]:
+ name = key + "_" + variant
+ ssim_delta = float(value[variant]["ssim"]) - float(comparisons[movie_name]["Baseline"]["ssim"])
+ #duration_delta = timedelta(seconds = (datetime.strptime(value[variant]["duration"], "%H:%M:%S.%f") - datetime.strptime(comparisons[movie_name]["Baseline"]["duration"], "%H:%M:%S.%f")).total_seconds())
+ compression_delta = str(int(value[variant]["compression"]) - int(comparisons[movie_name]["Baseline"]["compression"])) + "%"
+ print(name, value[variant]["ssim"], ssim_delta, value[variant]["duration"], "", str(value[variant]["compression"]) + "%", compression_delta, value[variant]["fps"], sep="\t")
+ print()
+ print()
+ print()
from datetime import datetime
+import dill
import json
import os
from pprint import pprint
"codec": metadata["codec_name"]
            }
if self.source["height"] < 720:
- self.encoder_quality = 21
+ self.encoder_quality = 18
+ #self.encoder_quality = 21
self.encoder_options = "ctu=32:qg-size=16"
elif 720 <= self.source["height"] < 1080:
- self.encoder_quality = 22
+ self.encoder_quality = 20
+ #self.encoder_quality = 22
self.encoder_options = "ctu=32:qg-size=32"
elif 1080 <= self.source["height"] < 2160:
- self.encoder_quality = 23
+ self.encoder_quality = 21
+ #self.encoder_quality = 23
self.encoder_options = "ctu=64:qg-size=64"
elif 2160 <= self.source["height"]:
- self.encoder_quality = 26
+ self.encoder_quality = 24
+ #self.encoder_quality = 26
self.encoder_options = "ctu=64:qg-size=64"
# Create empty attributes for dynamic session options
self.output["filename"] = self.source["filename"] + self.output["file_decorator"]
self.path["output"] = os.path.join("hevc", self.output["filename"] + ".mp4")
self.path["log"] = os.path.join("performance", self.output["filename"] + ".log")
+ self.path["session"] = os.path.join("performance", self.output["filename"] + ".session")
# Verify no attributes are None
@@ -126,6 +132,8 @@ def finish(self):
self.output["compression_ratio"] = int(100 - (self.output["filesize"] / self.source["filesize"] * 100))
self.fps = self.source["frames"] / self.time["duration"].seconds
self.log(self.time["duration"], self.fps, self.output["compression_ratio"])
+ with open(self.path["session"], "wb") as session_file:
+ dill.dump(self, session_file)
if self.args.delete:
""" Summarizes transcode session for screen and log
summary = "{duration}\n{fps:.2f} fps\n{compression_ratio}% reduction ({source_size}mb to {output_size}mb)".format(duration=self.time["duration"], fps=self.fps, compression_ratio=self.output["compression_ratio"], source_size=int(self.source["filesize"] / 1000000), output_size=int(self.output["filesize"] / 1000000))
- with open(self.path["log"], "w") as logfile:
- logfile.write(summary + "\n\n" + self.command + "\n\n")
- pprint(vars(self), logfile)
+ with open(self.path["log"], "w") as log_file:
+ log_file.write(summary + "\n\n" + self.command + "\n\n")
+ pprint(vars(self), log_file)
except FileNotFoundError:
print("Session.cleanup():", self.path["output"], "does not exist.")
+ def repair(self):
+ with open(os.path.join("performance", self.output["filename"] + ".log"), "r") as log_file:
+ self.output["filesize"] = os.path.getsize(self.path["output"])
+ self.time = {"duration": log_file.readline().rstrip()}
+ self.fps = "{:0.2f}".format(float(log_file.readline().rstrip().split(" ")[0]))
+ self.output["compression_ratio"] = log_file.readline().rstrip().split(" ")[0][:-1]
+ if self.output["compression_ratio"] == "":
+ self.output["filesize"] = os.path.getsize(self.path["output"])
+ self.output["compression_ratio"] = int(100 - (self.output["filesize"] / self.source["filesize"] * 100))
+ with open(self.path["session"], "wb") as session_file:
+ dill.dump(self, session_file)
+ print("Wrote " + self.path["session"])
# Check for Python 3.8 (required for shlex usage)
if not (sys.version_info[0] >= 3 and sys.version_info[1] >= 8):
-def get_user_response():
+def get_yn_answer():
""" Accepts yes/no answer as user input and returns answer as boolean
while "need response":
return response
+def get_choice_from_menu(choices):
+ choice = 1
+ for option in choices:
+ print(" {choice}: {option}".format(choice=choice, option=option))
+ choice += 1
+ print()
+ user_input = 0
+ while True:
+ try:
+ user_input = int(input("Choice: (1-{num_choices}) ".format(num_choices=len(choices))))
+ if user_input not in range(1, len(choices)+1):
+ continue
+ else:
+ pass
+ except KeyboardInterrupt:
+ sys.exit()
+ except:
+ pass
+ else:
+ print("\n{choice}".format(choice=choices[user_input-1]))
+ break
+ return user_input-1
if __name__ == "__main__":
sys.exit("I am a module, not a script.")
return source_files
def main():
args = evaluate_args()
source_files = build_source_list(args)