From db989b3e8a8794ecf572e377fb62ad2a756fd390 Mon Sep 17 00:00:00 2001 From: Bill Little Date: Tue, 24 Oct 2023 15:48:32 +0100 Subject: [PATCH] Add perceptual image hash support --- pytest_mpl/kernels.py | 252 +++++++++++++++++++++++++++++++++++++++ tests/test_kernels.py | 265 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 517 insertions(+) create mode 100644 pytest_mpl/kernels.py create mode 100644 tests/test_kernels.py diff --git a/pytest_mpl/kernels.py b/pytest_mpl/kernels.py new file mode 100644 index 00000000..d9fd0e0d --- /dev/null +++ b/pytest_mpl/kernels.py @@ -0,0 +1,252 @@ +""" +This module contains the supported hashing kernel implementations. + +""" +import hashlib +from abc import ABC, abstractmethod + +import imagehash +from PIL import Image + +#: The default hamming distance bit tolerance for "similar" imagehash hashes. +DEFAULT_HAMMING_TOLERANCE = 4 + +#: The default imagehash hash size (N), resulting in a hash of N**2 bits. +DEFAULT_HASH_SIZE = 16 + +#: Level of image detail (high) or structure (low) represented by phash. +DEFAULT_HIGH_FREQUENCY_FACTOR = 4 + +#: Registered kernel names. +KERNEL_PHASH = "phash" +KERNEL_SHA256 = "sha256" + +__all__ = [ + "DEFAULT_HAMMING_TOLERANCE", + "DEFAULT_HASH_SIZE", + "DEFAULT_HIGH_FREQUENCY_FACTOR", + "KERNEL_PHASH", + "KERNEL_SHA256", + "KernelPHash", + "KernelSHA256", + "kernel_factory", +] + + +class Kernel(ABC): + """ + Kernel abstract base class (ABC) which defines a simple common kernel API. + + """ + + def __init__(self, plugin): + # Containment of the plugin allows the kernel to cherry-pick required state. + self._plugin = plugin + + @abstractmethod + def equivalent_hash(self, result, baseline, marker=None): + """ + Determine whether the kernel considers the provided result (actual) + and baseline (expected) hashes as similar. + + Parameters + ---------- + result : str + The hash of the image generated by the test. + baseline : str + The hash of the baseline image.
+ marker : pytest.Mark + The test marker, which may contain kwarg options to be + applied to the equivalence test. + + Returns + ------- + bool + Whether the result and baseline hashes are deemed similar. + + """ + + @abstractmethod + def generate_hash(self, buffer): + """ + Computes the hash of the image from the in-memory/open byte stream + buffer. + + Parameters + ---------- + buffer : stream + The in-memory/open byte stream of the image. + + Returns + ------- + str + The string representation (hexdigest) of the image hash. + + """ + + def update_status(self, message): + """ + Append the kernel status message to the provided message. + + Parameters + ---------- + message : str + The existing status message. + + Returns + ------- + str + The updated status message. + + """ + return message + + def update_summary(self, summary): + """ + Refresh the image comparison summary with relevant kernel entries. + + Parameters + ---------- + summary : dict + Image comparison test report summary. + + Returns + ------- + None + + """ + # The "name" class property *must* be defined in derived child class. + summary["kernel"] = self.name + + @property + def metadata(self): + """ + The kernel metadata to be archived in a hash library with results. + + Returns + ------- + dict + The kernel metadata. + + """ + return dict(name=self.name) + + +class KernelPHash(Kernel): + """ + Kernel that calculates a perceptual hash of an image for the + specified hash size (N) and high frequency factor. + + Where the resultant perceptual hash will be composed of N**2 bits. + + """ + + name = KERNEL_PHASH + + def __init__(self, plugin): + super().__init__(plugin) + # Keep state of the equivalence result. + self.equivalent = None + # Keep state of hash hamming distance (whole number) result. + self.hamming_distance = None + # Value may be overridden by py.test marker kwarg. 
+ arg = self._plugin.hamming_tolerance + self.hamming_tolerance = ( + int(arg) if arg is not None else DEFAULT_HAMMING_TOLERANCE + ) + # The hash-size (N) defines the resultant N**2 bits hash size. + arg = self._plugin.hash_size + self.hash_size = int(arg) if arg is not None else DEFAULT_HASH_SIZE + # The level of image detail (high freq) or structure (low freq) + # represented in perceptual hash through discrete cosine transform. + arg = self._plugin.high_freq_factor + self.high_freq_factor = ( + int(arg) if arg is not None else DEFAULT_HIGH_FREQUENCY_FACTOR + ) + # py.test marker kwarg. + self.option = "hamming_tolerance" + + def equivalent_hash(self, result, baseline, marker=None): + if marker: + value = marker.kwargs.get(self.option) + if value is not None: + # Override with the decorator marker value. + self.hamming_tolerance = int(value) + # Convert string hexdigest hashes to imagehash.ImageHash instances. + result = imagehash.hex_to_hash(result) + baseline = imagehash.hex_to_hash(baseline) + # Unlike cryptographic hashes, perceptual hashes can measure the + # degree of "similarity" through hamming distance bit differences + # between the hashes. + try: + self.hamming_distance = result - baseline + self.equivalent = self.hamming_distance <= self.hamming_tolerance + except TypeError: + # imagehash won't compare hashes of different sizes, however + # let's gracefully support this for usability. + self.hamming_distance = None + self.equivalent = False + return self.equivalent + + def generate_hash(self, buffer): + buffer.seek(0) + data = Image.open(buffer) + phash = imagehash.phash( + data, hash_size=self.hash_size, highfreq_factor=self.high_freq_factor + ) + return str(phash) + + def update_status(self, message): + result = str() if message is None else str(message) + # Only update the status message for non-equivalent hash comparisons.
+ if self.equivalent is False: + msg = ( + f"Hash hamming distance of {self.hamming_distance} bits > " + f"hamming tolerance of {self.hamming_tolerance} bits." + ) + result = f"{message} {msg}" if len(result) else msg + return result + + def update_summary(self, summary): + super().update_summary(summary) + summary["hamming_distance"] = self.hamming_distance + summary["hamming_tolerance"] = self.hamming_tolerance + + @property + def metadata(self): + result = super().metadata + result["hash_size"] = self.hash_size + result["high_freq_factor"] = self.high_freq_factor + return result + + +class KernelSHA256(Kernel): + """ + A simple kernel that calculates a 256-bit cryptographic SHA hash + of an image. + + """ + + name = KERNEL_SHA256 + + def equivalent_hash(self, result, baseline, marker=None): + # Simple cryptographic hash binary comparison. Interpretation of + # the comparison result is that the hashes are either identical or + # not identical. For non-identical hashes, it is not possible to + # determine a heuristic of hash "similarity" due to the nature of + # cryptographic hashes. + return result == baseline + + def generate_hash(self, buffer): + buffer.seek(0) + data = buffer.read() + hasher = hashlib.sha256() + hasher.update(data) + return hasher.hexdigest() + + +#: Registry of available hashing kernel factories. 
+kernel_factory = { + KernelPHash.name: KernelPHash, + KernelSHA256.name: KernelSHA256, +} diff --git a/tests/test_kernels.py b/tests/test_kernels.py new file mode 100644 index 00000000..0d69aa00 --- /dev/null +++ b/tests/test_kernels.py @@ -0,0 +1,265 @@ +from pathlib import Path + +import pytest + +from pytest_mpl.kernels import (DEFAULT_HAMMING_TOLERANCE, DEFAULT_HASH_SIZE, + DEFAULT_HIGH_FREQUENCY_FACTOR, Kernel, + KernelPHash, KernelSHA256, kernel_factory) + +#: baseline hash (32-bit) +HASH_BASE_32 = "01234567" + +#: baseline hash (64-bit) +HASH_BASE = "0123456789abcdef" + +#: baseline hash with 2-bit delta (64-bit) +# ---X------------ +HASH_2BIT = "0120456789abcdef" + +#: baseline with 4-bit delta (64-bit) +# --XX-----------X +HASH_4BIT = "0100456789abcdee" + +#: baseline with 8-bit delta (64-bit) +# -X------------XX +HASH_8BIT = "0023456789abcd00" + + +#: Absolute path to test baseline image +baseline_image = Path(__file__).parent / "baseline" / "2.0.x" / "test_base_style.png" + +#: Verify availability of test baseline image +baseline_unavailable = not baseline_image.is_file() + +#: Convenience skipif reason +baseline_missing = f"missing baseline image {str(baseline_image)!r}" + + +class DummyMarker: + def __init__(self, hamming_tolerance=None): + self.kwargs = dict(hamming_tolerance=hamming_tolerance) + + +class DummyPlugin: + def __init__(self, hash_size=None, hamming_tolerance=None, high_freq_factor=None): + self.hash_size = hash_size + self.hamming_tolerance = hamming_tolerance + self.high_freq_factor = high_freq_factor + + +def test_kernel_abc(): + emsg = "Can't instantiate abstract class Kernel" + with pytest.raises(TypeError, match=emsg): + Kernel(None) + + +def test_phash_name(): + for name, factory in kernel_factory.items(): + assert name == factory.name + + +# +# KernelPHash +# + + +def test_phash_init__set(): + hash_size, hamming_tolerance, high_freq_factor = -1, -2, -3 + plugin = DummyPlugin( + hash_size=hash_size, +
hamming_tolerance=hamming_tolerance, + high_freq_factor=high_freq_factor, + ) + kernel = KernelPHash(plugin) + assert kernel.hash_size == hash_size + assert kernel.hamming_tolerance == hamming_tolerance + assert kernel.high_freq_factor == high_freq_factor + assert kernel.equivalent is None + assert kernel.hamming_distance is None + + +def test_phash_init__default(): + plugin = DummyPlugin() + kernel = KernelPHash(plugin) + assert kernel.hash_size == DEFAULT_HASH_SIZE + assert kernel.hamming_tolerance == DEFAULT_HAMMING_TOLERANCE + assert kernel.high_freq_factor == DEFAULT_HIGH_FREQUENCY_FACTOR + assert kernel.equivalent is None + assert kernel.hamming_distance is None + + +def test_phash_option(): + assert KernelPHash(DummyPlugin()).option == "hamming_tolerance" + + +@pytest.mark.parametrize( + "baseline,equivalent,distance", + [ + (HASH_BASE, True, 0), + (HASH_2BIT, True, 2), + (HASH_4BIT, True, 4), + (HASH_8BIT, False, 8), + (HASH_BASE_32, False, None), + ], +) +def test_phash_equivalent(baseline, equivalent, distance): + kernel = KernelPHash(DummyPlugin()) + assert kernel.equivalent_hash(HASH_BASE, baseline) is equivalent + assert kernel.equivalent is equivalent + assert kernel.hamming_distance == distance + + +def test_phash_equivalent__tolerance(): + hamming_tolerance = 10 + plugin = DummyPlugin(hamming_tolerance=hamming_tolerance) + kernel = KernelPHash(plugin) + assert kernel.equivalent_hash(HASH_BASE, HASH_4BIT) + assert kernel.equivalent is True + assert kernel.hamming_tolerance == hamming_tolerance + assert kernel.hamming_distance == 4 + + +@pytest.mark.parametrize( + "tolerance,equivalent", + [(10, True), (3, False)], +) +def test_phash_equivalent__marker(tolerance, equivalent): + marker = DummyMarker(hamming_tolerance=tolerance) + kernel = KernelPHash(DummyPlugin()) + assert kernel.hamming_tolerance == DEFAULT_HAMMING_TOLERANCE + assert kernel.equivalent_hash(HASH_BASE, HASH_4BIT, marker=marker) is equivalent + assert kernel.equivalent is equivalent + 
assert kernel.hamming_tolerance == tolerance + assert kernel.hamming_distance == 4 + + +@pytest.mark.skipif(baseline_unavailable, reason=baseline_missing) +@pytest.mark.parametrize( + "hash_size,hff,expected", + [ + ( + DEFAULT_HASH_SIZE, + DEFAULT_HIGH_FREQUENCY_FACTOR, + "800bc0555feab05f67ea8d1779fa83537e7ec0d17f9f003517ef200985532856", + ), + ( + DEFAULT_HASH_SIZE, + 8, + "800fc0155fe8b05f67ea8d1779fa83537e7ec0d57f9f003517ef200985532856", + ), + (8, DEFAULT_HIGH_FREQUENCY_FACTOR, "80c05fb1778d79c3"), + ( + DEFAULT_HASH_SIZE, + 16, + "800bc0155feab05f67ea8d1779fa83537e7ec0d57f9f003517ef200985532856", + ), + ], +) +def test_phash_generate_hash(hash_size, hff, expected): + plugin = DummyPlugin(hash_size=hash_size, high_freq_factor=hff) + kernel = KernelPHash(plugin) + with open(baseline_image, "rb") as fh: + actual = kernel.generate_hash(fh) + assert actual == expected + + +@pytest.mark.parametrize("message", (None, "", "one")) +@pytest.mark.parametrize("equivalent", (None, True)) +def test_phash_update_status__none(message, equivalent): + kernel = KernelPHash(DummyPlugin()) + kernel.equivalent = equivalent + result = kernel.update_status(message) + assert isinstance(result, str) + expected = 0 if message is None else len(message) + assert len(result) == expected + + +@pytest.mark.parametrize("message", ("", "one")) +@pytest.mark.parametrize("distance", (10, 20)) +@pytest.mark.parametrize("tolerance", (1, 2)) +def test_phash_update_status__equivalent(message, distance, tolerance): + plugin = DummyPlugin(hamming_tolerance=tolerance) + kernel = KernelPHash(plugin) + kernel.equivalent = False + kernel.hamming_distance = distance + result = kernel.update_status(message) + assert isinstance(result, str) + template = "Hash hamming distance of {} bits > hamming tolerance of {} bits." 
+ status = template.format(distance, tolerance) + expected = f"{message} {status}" if message else status + assert result == expected + + +@pytest.mark.parametrize( + "summary,distance,tolerance,count", + [({}, None, DEFAULT_HAMMING_TOLERANCE, 3), (dict(one=1), 2, 3, 4)], +) +def test_phash_update_summary(summary, distance, tolerance, count): + plugin = DummyPlugin(hamming_tolerance=tolerance) + kernel = KernelPHash(plugin) + kernel.hamming_distance = distance + kernel.update_summary(summary) + assert summary["kernel"] == KernelPHash.name + assert summary["hamming_distance"] == distance + assert summary["hamming_tolerance"] == tolerance + assert len(summary) == count + + +@pytest.mark.parametrize( + "hash_size,hff", + [(DEFAULT_HASH_SIZE, DEFAULT_HIGH_FREQUENCY_FACTOR), (32, 8)], +) +def test_phash_metadata(hash_size, hff): + plugin = DummyPlugin(hash_size=hash_size, high_freq_factor=hff) + kernel = KernelPHash(plugin) + metadata = kernel.metadata + assert {"name", "hash_size", "high_freq_factor"} == set(metadata) + assert metadata["name"] == KernelPHash.name + assert metadata["hash_size"] == hash_size + assert metadata["high_freq_factor"] == hff + + +# +# KernelSHA256 +# + + +@pytest.mark.parametrize( + "baseline, equivalent", + [(HASH_BASE, True), (HASH_2BIT, False), (HASH_4BIT, False)], +) +def test_sha256_equivalent(baseline, equivalent): + kernel = KernelSHA256(DummyPlugin()) + assert kernel.equivalent_hash(HASH_BASE, baseline) is equivalent + + +@pytest.mark.skipif(baseline_unavailable, reason=baseline_missing) +def test_sha256_generate_hash(): + kernel = KernelSHA256(DummyPlugin()) + with open(baseline_image, "rb") as fh: + actual = kernel.generate_hash(fh) + expected = "2dc4d32eefa5f5d11c365b10196f2fcdadc8ed604e370d595f9cf304363c13d2" + assert actual == expected + + +def test_sha256_update_status(): + kernel = KernelSHA256(DummyPlugin()) + message = "nop" + result = kernel.update_status(message) + assert result is message + + +def 
test_sha256_update_summary(): + kernel = KernelSHA256(DummyPlugin()) + summary = {} + kernel.update_summary(summary) + assert len(summary) == 1 + assert "kernel" in summary + assert summary["kernel"] == KernelSHA256.name + + +def test_sha256_metadata(): + kernel = KernelSHA256(DummyPlugin()) + metadata = kernel.metadata + assert {"name"} == set(metadata) + assert metadata["name"] == KernelSHA256.name