-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgeodiff.py
256 lines (226 loc) · 8.45 KB
/
geodiff.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import argparse
import math
import os
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional
from colorama import Fore, Style
from osgeo import gdal
from yirgacheffe.layers import RasterLayer
gdal.UseExceptions()
gdal.SetConfigOption('CPL_LOG', '/dev/null')
class Result(Enum):
SUCCESS=1
WARNING=2
FAIL=3
@dataclass
class ReportEntry:
key: str
left_value: Any
right_value: Any
success: Result
notes: str
def gdal_datatype_to_str(gdt: int) -> str:
if gdal.GDT_Byte == gdt:
return "byte"
if gdal.GDT_Int8 == gdt:
return "int 8"
if gdal.GDT_Int16 == gdt:
return "int 16"
if gdal.GDT_Int32 == gdt:
return "int 32"
if gdal.GDT_Int64 == gdt:
return "int 64"
if gdal.GDT_UInt16 == gdt:
return "unsigned int 16"
if gdal.GDT_UInt32 == gdt:
return "unsigned int 32"
if gdal.GDT_Int64 == gdt:
return "unsigned int 64"
if gdal.GDT_Float32 == gdt:
return "float 32"
if gdal.GDT_Float64 == gdt:
return "float 64"
if gdal.GDT_CFloat32 == gdt:
return "complex float 32"
if gdal.GDT_CFloat64 == gdt:
return "complex float 64"
if gdal.GDT_CInt16 == gdt:
return "complex int 16"
if gdal.GDT_CInt32 == gdt:
return "complex int 32"
if gdal.GDT_TypeCount == gdt:
return "type count"
if gdal.GDT_Unknown == gdt:
return "gdal unknown"
return "actual unknown"
def geodiff(
left_file_path: str,
right_file_path: str,
save_raster_path: Optional[str],
) -> List[ReportEntry]:
report = {
"left": left_file_path,
"right": right_file_path,
"report": []
}
left = RasterLayer.layer_from_file(left_file_path)
right = RasterLayer.layer_from_file(right_file_path)
same_scale = left.pixel_scale == right.pixel_scale
report["report"].append(ReportEntry(
key="Pixel Scale",
left_value=left.pixel_scale,
right_value=right.pixel_scale,
success=Result.SUCCESS if same_scale else Result.FAIL,
notes="Pass" if same_scale else "Pixel matching will not be attepted"
))
same_projection = left.projection == right.projection
report["report"].append(ReportEntry(
key="Projection",
left_value=left.projection,
right_value=right.projection,
success=Result.SUCCESS if same_projection else Result.FAIL,
notes="Pass" if same_projection else "Pixel matching will not be attempted"
))
same_datatype = left.datatype == right.datatype
report["report"].append(ReportEntry(
key="Data type",
left_value=gdal_datatype_to_str(left.datatype),
right_value=gdal_datatype_to_str(right.datatype),
success=Result.SUCCESS if same_datatype else Result.WARNING,
notes="Pass" if same_datatype else "Pixel matching results will be suspect"
))
same_area = left.area == right.area
report["report"].append(ReportEntry(
key="Area",
left_value=left.area,
right_value=right.area,
success=Result.SUCCESS if same_area else Result.WARNING,
notes="Pass" if same_area else "Pixel matching only for overlapping area"
))
try:
intersection = RasterLayer.find_intersection([left, right])
except ValueError:
intersection = None
report["report"].append(ReportEntry(
key="Intersection",
left_value=left.area,
right_value=right.area,
success=Result.SUCCESS if intersection is not None else Result.FAIL,
notes="Pass" if intersection is not None else "Pixel matching will not be attempted"
))
# Metadata comparisons over, so return early if we can't do anything
# with matching pixels
if (not same_scale) or (not same_projection) or (intersection is None):
return report
left.set_window_for_intersection(intersection)
right.set_window_for_intersection(intersection)
diff_save_name = None
if save_raster_path is not None:
_, left_filename = os.path.split(left_file_path)
diff_save_name = os.path.join(save_raster_path, left_filename)
diff_layer = RasterLayer.empty_raster_layer_like(
left,
filename=diff_save_name,
datatype=gdal.GDT_Byte,
nbits=1
)
diff_calc = left.numpy_apply(lambda left, right: left != right, right)
sum = diff_calc.save(diff_layer, and_sum=True)
report["report"].append(ReportEntry(
key="Pixel diff count",
left_value=(float(sum) / (left.window.xsize * left.window.ysize)) * 100 ,
right_value=None,
success=Result.SUCCESS if sum == 0 else Result.FAIL,
notes="Pass" if sum == 0 else "Different pixel values found (percentage)"
))
# If at this stage both images are the same, our work is done
if sum == 0:
return
# Stats are a tuple of (min, max, mean, stddev)
left_stats = left._dataset.GetRasterBand(1).GetStatistics(False, True)
right_stats = right._dataset.GetRasterBand(1).GetStatistics(False, True)
same_min = math.isclose(left_stats[0], right_stats[0])
report["report"].append(ReportEntry(
key="Minimum",
left_value=left_stats[0],
right_value=right_stats[0],
success=Result.SUCCESS if same_min else Result.FAIL,
notes="Pass" if same_min else "Different lower bound pixel value"
))
same_max = math.isclose(left_stats[1], right_stats[1])
report["report"].append(ReportEntry(
key="Maximum",
left_value=left_stats[1],
right_value=right_stats[1],
success=Result.SUCCESS if same_max else Result.FAIL,
notes="Pass" if same_min else "Different upper bound pixel value"
))
# This is a guess that we're using an enumerated dataset. Another option would
# to be test for NBITS, but we already want to do the min/max check anyway
if same_min and same_max and same_datatype and (gdal.GDT_Byte == left.datatype) and int((left_stats[1] - left_stats[0])) < 16:
for value in range(int(left_stats[0]), int(left_stats[1]) + 1):
left_count = left.numpy_apply(lambda chunk: chunk == value).sum()
right_count = right.numpy_apply(lambda chunk: chunk == value).sum()
same_val = left_count == right_count
report["report"].append(ReportEntry(
key=f"Band {value}",
left_value=left_count,
right_value=right_count,
success=Result.SUCCESS if same_val else Result.FAIL,
notes="Pass" if same_val else f"Different counts for band by {abs(left_count - right_count)}"
))
return report
def pretty_print_report(report:dict) -> None:
print(f"left: {report['left']}")
print(f"right: {report['right']}")
for report in report["report"]:
print(f"{report.key}: ", end='')
if Result.SUCCESS == report.success:
print(f"{Fore.GREEN}SUCCESS{Style.RESET_ALL}")
elif Result.WARNING == report.success:
print(f"{Fore.YELLOW}WARNING!{Style.RESET_ALL}")
elif Result.FAIL == report.success:
print(f"{Fore.RED}FAIL!!{Style.RESET_ALL}")
else:
print("UNKNOWN RESULT")
if Result.SUCCESS != report.success:
print(f"\t{report.notes}")
if report.right_value is not None:
print(f"\tLeft: {report.left_value}")
print(f"\tRight: {report.right_value}")
else:
print(f"\tDifference: {report.left_value}")
def main() -> None:
parser = argparse.ArgumentParser(description="Diffs GeoTIFFs")
parser.add_argument(
"--left",
type=str,
help="GeoTIFF or folder of GeoTIFFs",
required=True,
dest="left_path",
)
parser.add_argument(
"--right",
type=str,
help="GeoTIFF or folder of GeoTIFFs",
required=True,
dest="right_path",
)
parser.add_argument(
"--save-diff-raster",
type=str,
help="Path of directory where to save the diff between rasters if possible",
required=False,
default=None,
dest="save_raster_path"
)
args = parser.parse_args()
if args.save_raster_path is not None:
os.makedirs(args.save_raster_path, exist_ok=True)
# TODO: Add support for folders
report = geodiff(args.left_path, args.right_path, args.save_raster_path)
# TODO: Add option to save report as JSON
pretty_print_report(report)
if __name__ == "__main__":
main()