From 8356b6a0bd3efe614108d95e03245f97ebef8515 Mon Sep 17 00:00:00 2001 From: "milos.colic" Date: Mon, 26 Feb 2024 14:07:15 +0000 Subject: [PATCH] Add COG to format extensions list. Add createInfo concept, this will contain driver, parentPath, currentPath, etc. Make gdal programs no-failure operations. Capture errors and warnings of gdal programs in the raster tile metadata. Add RST_Transform expression. Add ReadAsPath reading strategy. --- python/mosaic/api/raster.py | 27 ++ .../mosaic/core/raster/api/FormatLookup.scala | 1 + .../labs/mosaic/core/raster/api/GDAL.scala | 30 +- .../core/raster/gdal/MosaicRasterGDAL.scala | 151 ++++++---- .../mosaic/core/raster/io/RasterReader.scala | 38 +-- .../raster/operator/gdal/GDALBuildVRT.scala | 19 +- .../core/raster/operator/gdal/GDALCalc.scala | 31 ++- .../core/raster/operator/gdal/GDALInfo.scala | 14 +- .../raster/operator/gdal/GDALTranslate.scala | 20 +- .../core/raster/operator/gdal/GDALWarp.scala | 25 +- .../operator/retile/OverlappingTiles.scala | 2 +- .../operator/retile/RasterTessellate.scala | 4 +- .../core/raster/operator/retile/ReTile.scala | 2 +- .../operator/separate/SeparateBands.scala | 13 +- .../mosaic/core/types/RasterTileType.scala | 3 +- .../core/types/model/MosaicRasterTile.scala | 48 ++-- .../mosaic/datasource/gdal/ReTileOnRead.scala | 5 +- .../mosaic/datasource/gdal/ReadAsPath.scala | 124 +++++++++ .../mosaic/datasource/gdal/ReadInMemory.scala | 12 +- .../mosaic/datasource/gdal/ReadStrategy.scala | 1 + .../expressions/raster/RST_CombineAvg.scala | 7 +- .../raster/RST_CombineAvgAgg.scala | 5 +- .../expressions/raster/RST_DerivedBand.scala | 4 +- .../raster/RST_DerivedBandAgg.scala | 5 +- .../expressions/raster/RST_FromContent.scala | 5 +- .../expressions/raster/RST_FromFile.scala | 7 +- .../expressions/raster/RST_MakeTiles.scala | 5 +- .../expressions/raster/RST_MapAlgebra.scala | 8 +- .../expressions/raster/RST_MergeAgg.scala | 5 +- .../expressions/raster/RST_SetSRID.scala | 8 +- 
.../expressions/raster/RST_Transform.scala | 61 ++++ .../mosaic/expressions/raster/package.scala | 19 +- .../labs/mosaic/functions/MosaicContext.scala | 3 + .../labs/mosaic/gdal/MosaicGDAL.scala | 1 + .../core/raster/TestRasterBandGDAL.scala | 28 +- .../mosaic/core/raster/TestRasterGDAL.scala | 262 +++++++++--------- .../raster/RST_CombineAvgBehaviors.scala | 6 +- .../raster/RST_DerivedBandBehaviors.scala | 6 +- .../raster/RST_MergeBehaviors.scala | 8 +- .../raster/RST_TessellateBehaviors.scala | 2 +- .../raster/RST_TransformBehaviors.scala | 49 ++++ .../raster/RST_TransformTest.scala | 32 +++ 42 files changed, 744 insertions(+), 362 deletions(-) create mode 100644 src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadAsPath.scala create mode 100644 src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Transform.scala create mode 100644 src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TransformBehaviors.scala create mode 100644 src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TransformTest.scala diff --git a/python/mosaic/api/raster.py b/python/mosaic/api/raster.py index 3638510dc..c61bafcd2 100644 --- a/python/mosaic/api/raster.py +++ b/python/mosaic/api/raster.py @@ -55,6 +55,7 @@ "rst_subdivide", "rst_summary", "rst_tessellate", + "rst_transform", "rst_to_overlapping_tiles", "rst_tryopen", "rst_upperleftx", @@ -997,6 +998,32 @@ def rst_tessellate(raster_tile: ColumnOrName, resolution: ColumnOrName) -> Colum ) +def rst_transform(raster_tile: ColumnOrName, srid: ColumnOrName) -> Column: + """ + Transforms the raster to the given SRID. + The result is a Mosaic raster tile struct of the transformed raster. + The result is stored in the checkpoint directory. + + Parameters + ---------- + raster_tile : Column (RasterTileType) + Mosaic raster tile struct column. + srid : Column (IntegerType) + EPSG authority code for the file's projection. 
+ + Returns + ------- + Column (RasterTileType) + Mosaic raster tile struct column. + + """ + return config.mosaic_context.invoke_function( + "rst_transform", + pyspark_to_java_column(raster_tile), + pyspark_to_java_column(srid), + ) + + def rst_fromcontent( raster_bin: ColumnOrName, driver: ColumnOrName, size_in_mb: Any = -1 ) -> Column: diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/api/FormatLookup.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/api/FormatLookup.scala index e3aeb5296..8bf2d9cdb 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/api/FormatLookup.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/api/FormatLookup.scala @@ -17,6 +17,7 @@ object FormatLookup { "CAD" -> "dwg", "CEOS" -> "ceos", "COASP" -> "coasp", + "COG" -> "tif", "COSAR" -> "cosar", "CPG" -> "cpg", "CSW" -> "csw", diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala index b86489359..6e2fee0f6 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala @@ -96,27 +96,26 @@ object GDAL { */ def readRaster( inputRaster: Any, - parentPath: String, - shortDriverName: String, + createInfo: Map[String, String], inputDT: DataType ): MosaicRasterGDAL = { inputDT match { case StringType => val path = inputRaster.asInstanceOf[UTF8String].toString - MosaicRasterGDAL.readRaster(path, parentPath) + MosaicRasterGDAL.readRaster(createInfo) case BinaryType => val bytes = inputRaster.asInstanceOf[Array[Byte]] - val raster = MosaicRasterGDAL.readRaster(bytes, parentPath, shortDriverName) + val raster = MosaicRasterGDAL.readRaster(bytes, createInfo) // If the raster is coming as a byte array, we can't check for zip condition. // We first try to read the raster directly, if it fails, we read it as a zip. 
if (raster == null) { + val parentPath = createInfo("parentPath") val zippedPath = s"/vsizip/$parentPath" - MosaicRasterGDAL.readRaster(bytes, zippedPath, shortDriverName) + MosaicRasterGDAL.readRaster(bytes, createInfo + ("path" -> zippedPath)) } else { raster } - case _ => - throw new IllegalArgumentException(s"Unsupported data type: $inputDT") + case _ => throw new IllegalArgumentException(s"Unsupported data type: $inputDT") } } @@ -160,7 +159,10 @@ object GDAL { * @return * Returns a Raster object. */ - def raster(path: String, parentPath: String): MosaicRasterGDAL = MosaicRasterGDAL.readRaster(path, parentPath) + def raster(path: String, parentPath: String): MosaicRasterGDAL = { + val createInfo = Map("path" -> path, "parentPath" -> parentPath) + MosaicRasterGDAL.readRaster(createInfo) + } /** * Reads a raster from the given byte array. If the byte array is a zip @@ -171,8 +173,10 @@ object GDAL { * @return * Returns a Raster object. */ - def raster(content: Array[Byte], parentPath: String, driverShortName: String): MosaicRasterGDAL = - MosaicRasterGDAL.readRaster(content, parentPath, driverShortName) + def raster(content: Array[Byte], parentPath: String, driverShortName: String): MosaicRasterGDAL = { + val createInfo = Map("parentPath" -> parentPath, "driver" -> driverShortName) + MosaicRasterGDAL.readRaster(content, createInfo) + } /** * Reads a raster from the given path. It extracts the specified band from @@ -186,8 +190,10 @@ object GDAL { * @return * Returns a Raster band object. */ - def band(path: String, bandIndex: Int, parentPath: String): MosaicRasterBandGDAL = - MosaicRasterGDAL.readBand(path, bandIndex, parentPath) + def band(path: String, bandIndex: Int, parentPath: String): MosaicRasterBandGDAL = { + val createInfo = Map("path" -> path, "parentPath" -> parentPath) + MosaicRasterGDAL.readBand(bandIndex, createInfo) + } /** * Converts raster x, y coordinates to lat, lon coordinates. 
diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala index 108591529..10f04416d 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala @@ -26,13 +26,17 @@ import scala.util.{Failure, Success, Try} //noinspection DuplicatedCode case class MosaicRasterGDAL( raster: Dataset, - path: String, - parentPath: String, - driverShortName: String, + createInfo: Map[String, String], memSize: Long ) extends RasterWriter with RasterCleaner { + def path: String = createInfo("path") + + def parentPath: String = createInfo("parentPath") + + def driverShortName: Option[String] = createInfo.get("driver") + def getWriteOptions: MosaicRasterWriteOptions = MosaicRasterWriteOptions(this) def getCompression: String = { @@ -72,7 +76,9 @@ case class MosaicRasterGDAL( * @return * The raster's driver short name. */ - def getDriversShortName: String = driverShortName + def getDriversShortName: String = driverShortName.getOrElse( + Try(raster.GetDriver().getShortName).getOrElse("NONE") + ) /** * @return @@ -114,7 +120,7 @@ case class MosaicRasterGDAL( def pixelDiagSize: Double = math.sqrt(pixelXSize * pixelXSize + pixelYSize * pixelYSize) /** @return Returns file extension. */ - def getRasterFileExtension: String = GDAL.getExtension(driverShortName) + def getRasterFileExtension: String = GDAL.getExtension(getDriversShortName) /** @return Returns the raster's bands as a Seq. */ def getBands: Seq[MosaicRasterBandGDAL] = (1 to numBands).map(getBand) @@ -141,7 +147,7 @@ case class MosaicRasterGDAL( * A MosaicRaster object. 
*/ def openRaster(path: String): Dataset = { - MosaicRasterGDAL.openRaster(path, Some(driverShortName)) + MosaicRasterGDAL.openRaster(path, driverShortName) } /** @@ -202,7 +208,6 @@ case class MosaicRasterGDAL( .toInt } - /** * @return * Sets the raster's SRID. This is the EPSG code of the raster's CRS. @@ -212,15 +217,18 @@ case class MosaicRasterGDAL( srs.ImportFromEPSG(srid) raster.SetSpatialRef(srs) val driver = raster.GetDriver() - val newPath = PathUtils.createTmpFilePath(GDAL.getExtension(driverShortName)) + val newPath = PathUtils.createTmpFilePath(GDAL.getExtension(getDriversShortName)) driver.CreateCopy(newPath, raster) - val newRaster = MosaicRasterGDAL.openRaster(newPath, Some(driverShortName)) + val newRaster = MosaicRasterGDAL.openRaster(newPath, driverShortName) dispose(this) - MosaicRasterGDAL(newRaster, newPath, parentPath, driverShortName, -1) + val createInfo = Map( + "path" -> newPath, + "parentPath" -> parentPath, + "driver" -> getDriversShortName + ) + MosaicRasterGDAL(newRaster, createInfo, -1) } - - /** * @return * Returns the raster's proj4 string. 
@@ -340,10 +348,9 @@ case class MosaicRasterGDAL( def isEmpty: Boolean = { val bands = getBands if (bands.isEmpty) { - subdatasets - .values - .filter(_.toLowerCase(Locale.ROOT).startsWith(driverShortName.toLowerCase(Locale.ROOT))) - .flatMap(readRaster(_, path).getBands) + subdatasets.values + .filter(_.toLowerCase(Locale.ROOT).startsWith(getDriversShortName.toLowerCase(Locale.ROOT))) + .flatMap(bp => readRaster(createInfo + ("path" -> bp)).getBands) .takeWhile(_.isEmpty) .nonEmpty } else { @@ -381,7 +388,7 @@ case class MosaicRasterGDAL( val cleanPath = filePath.replace("/vsizip/", "") val zipPath = if (cleanPath.endsWith("zip")) cleanPath else s"$cleanPath.zip" if (path != PathUtils.getCleanPath(parentPath)) { - Try(gdal.GetDriverByName(driverShortName).Delete(path)) + Try(gdal.GetDriverByName(getDriversShortName).Delete(path)) Try(Files.deleteIfExists(Paths.get(cleanPath))) Try(Files.deleteIfExists(Paths.get(path))) Try(Files.deleteIfExists(Paths.get(filePath))) @@ -502,7 +509,7 @@ case class MosaicRasterGDAL( * usable again. */ def refresh(): MosaicRasterGDAL = { - MosaicRasterGDAL(openRaster(path), path, parentPath, driverShortName, memSize) + MosaicRasterGDAL(openRaster(path), createInfo, memSize) } /** @@ -555,22 +562,35 @@ case class MosaicRasterGDAL( * Returns the raster's subdataset with given name. */ def getSubdataset(subsetName: String): MosaicRasterGDAL = { - val path = subdatasets.getOrElse( - s"${subsetName}_tmp", - throw new Exception(s""" - |Subdataset $subsetName not found! - |Available subdatasets: - | ${subdatasets.keys.filterNot(_.startsWith("SUBDATASET_")).mkString(", ")} - | """.stripMargin) - ) - val sanitized = PathUtils.getCleanPath(path) + val path = subdatasets.get(s"${subsetName}_tmp") + val gdalError = gdal.GetLastErrorMsg() + val error = path match { + case Some(_) => "" + case None => + s""" + |Subdataset $subsetName not found! 
+ |Available subdatasets: + | ${subdatasets.keys.filterNot(_.startsWith("SUBDATASET_")).mkString(", ")} + | """.stripMargin + } + val sanitized = PathUtils.getCleanPath(path.getOrElse(PathUtils.NO_PATH_STRING)) val subdatasetPath = PathUtils.getSubdatasetPath(sanitized) val ds = openRaster(subdatasetPath) // Avoid costly IO to compute MEM size here // It will be available when the raster is serialized for next operation // If value is needed then it will be computed when getMemSize is called - MosaicRasterGDAL(ds, path, parentPath, driverShortName, -1) + val createInfo = Map( + "path" -> path.getOrElse(PathUtils.NO_PATH_STRING), + "parentPath" -> parentPath, + "driver" -> getDriversShortName, + "last_error" -> + s""" + |GDAL Error: $gdalError + |$error + |""".stripMargin + ) + MosaicRasterGDAL(ds, createInfo, -1) } def convolve(kernel: Array[Array[Double]]): MosaicRasterGDAL = { @@ -584,7 +604,13 @@ case class MosaicRasterGDAL( band.convolve(kernel) } - MosaicRasterGDAL(outputRaster, resultRasterPath, parentPath, driverShortName, -1) + val createInfo = Map( + "path" -> resultRasterPath, + "parentPath" -> parentPath, + "driver" -> getDriversShortName + ) + + MosaicRasterGDAL(outputRaster, createInfo, -1) } @@ -604,7 +630,13 @@ case class MosaicRasterGDAL( band.filter(kernelSize, operation, outputBand) } - val result = MosaicRasterGDAL(outputRaster, resultRasterPath, parentPath, driverShortName, this.memSize) + val createInfo = Map( + "path" -> resultRasterPath, + "parentPath" -> parentPath, + "driver" -> getDriversShortName + ) + + val result = MosaicRasterGDAL(outputRaster, createInfo, this.memSize) result.flushCache() } @@ -659,25 +691,44 @@ object MosaicRasterGDAL extends RasterReader { * @example * Raster: path = "file:///path/to/file.tif" Subdataset: path = * "file:///path/to/file.tif:subdataset" - * @param inPath - * The path to the raster file. + * @param createInfo + * The create info for the raster. 
This should contain the following + * keys: + * - path: The path to the raster file. + * - parentPath: The path of the parent raster file. * @return * A MosaicRaster object. */ - override def readRaster(inPath: String, parentPath: String): MosaicRasterGDAL = { + override def readRaster(createInfo: Map[String, String]): MosaicRasterGDAL = { + val inPath = createInfo("path") val isSubdataset = PathUtils.isSubdataset(inPath) val path = PathUtils.getCleanPath(inPath) val readPath = if (isSubdataset) PathUtils.getSubdatasetPath(path) else PathUtils.getZipPath(path) val dataset = openRaster(readPath, None) - val driverShortName = dataset.GetDriver().getShortName - + val error = + if (dataset == null) { + val error = gdal.GetLastErrorMsg() + s""" + Error reading raster from path: $readPath + Error: $error + """ + } else "" + val driverShortName = Try(dataset.GetDriver().getShortName).getOrElse("NONE") // Avoid costly IO to compute MEM size here // It will be available when the raster is serialized for next operation // If value is needed then it will be computed when getMemSize is called // We cannot just use memSize value of the parent due to the fact that the raster could be a subdataset - val raster = MosaicRasterGDAL(dataset, path, parentPath, driverShortName, -1) + val raster = MosaicRasterGDAL( + dataset, + createInfo ++ + Map( + "driver" -> driverShortName, + "last_error" -> error + ), + -1 + ) raster } @@ -685,17 +736,19 @@ object MosaicRasterGDAL extends RasterReader { * Reads a raster from a byte array. * @param contentBytes * The byte array containing the raster data. - * @param driverShortName - * The driver short name of the raster. + * @param createInfo + * Mosaic creation info of the raster. Note: This is not the same as the + * metadata of the raster. This is not the same as GDAL creation options. * @return * A MosaicRaster object. 
*/ - override def readRaster(contentBytes: Array[Byte], parentPath: String, driverShortName: String): MosaicRasterGDAL = { + override def readRaster(contentBytes: Array[Byte], createInfo: Map[String, String]): MosaicRasterGDAL = { if (Option(contentBytes).isEmpty || contentBytes.isEmpty) { - MosaicRasterGDAL(null, "", parentPath, "", -1) + MosaicRasterGDAL(null, createInfo, -1) } else { // This is a temp UUID for purposes of reading the raster through GDAL from memory // The stable UUID is kept in metadata of the raster + val driverShortName = createInfo("driver") val extension = GDAL.getExtension(driverShortName) val tmpPath = PathUtils.createTmpFilePath(extension) Files.write(Paths.get(tmpPath), contentBytes) @@ -721,12 +774,12 @@ object MosaicRasterGDAL extends RasterReader { if (dataset == null) { throw new Exception(s"Error reading raster from bytes: ${prompt._3}") } - MosaicRasterGDAL(dataset, unzippedPath, parentPath, driverShortName, contentBytes.length) + MosaicRasterGDAL(dataset, createInfo + ("path" -> unzippedPath), contentBytes.length) } else { - MosaicRasterGDAL(ds, readPath, parentPath, driverShortName, contentBytes.length) + MosaicRasterGDAL(ds, createInfo + ("path" -> readPath), contentBytes.length) } } else { - MosaicRasterGDAL(dataset, tmpPath, parentPath, driverShortName, contentBytes.length) + MosaicRasterGDAL(dataset, createInfo + ("path" -> tmpPath), contentBytes.length) } } } @@ -738,15 +791,19 @@ object MosaicRasterGDAL extends RasterReader { * @example * Raster: path = "file:///path/to/file.tif" Subdataset: path = * "file:///path/to/file.tif:subdataset" - * @param path - * The path to the raster file. + * @param createInfo + * The create info for the raster. This should contain the following + * keys: + * - path: The path to the raster file. + * - parentPath: The path of the parent raster file. + * - driver: Optional: The driver short name of the raster file * @param bandIndex * The band index to read. * @return * A MosaicRaster object. 
*/ - override def readBand(path: String, bandIndex: Int, parentPath: String): MosaicRasterBandGDAL = { - val raster = readRaster(path, parentPath) + override def readBand(bandIndex: Int, createInfo: Map[String, String]): MosaicRasterBandGDAL = { + val raster = readRaster(createInfo) // TODO: Raster and Band are coupled, this can cause a pointer leak raster.getBand(bandIndex) } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/io/RasterReader.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/io/RasterReader.scala index b207789ae..d8a1a90c1 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/io/RasterReader.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/io/RasterReader.scala @@ -20,14 +20,16 @@ trait RasterReader extends Logging { * @example * Raster: path = "/path/to/file.tif" Subdataset: path = * "FORMAT:/path/to/file.tif:subdataset" - * @param path - * The path to the raster file. - * @param parentPath - * The path of the parent raster file. + * @param createInfo + * The create info for the raster. This should contain the following + * keys: + * - path: The path to the raster file. + * - parentPath: The path of the parent raster file. + * - driver: Optional: The driver short name of the raster file * @return * A MosaicRaster object. */ - def readRaster(path: String, parentPath: String): MosaicRasterGDAL + def readRaster(createInfo: Map[String, String]): MosaicRasterGDAL /** * Reads a raster from an in memory buffer. Use the buffer bytes to produce @@ -35,30 +37,32 @@ trait RasterReader extends Logging { * * @param contentBytes * The file bytes. - * @param parentPath - * The path of the parent raster file. - * @param driverShortName - * The driver short name of the raster file. + * @param createInfo + * The create info for the raster. This should contain the following + * keys: + * - parentPath: The path of the parent raster file. 
+ * - driver: The driver short name of the raster file * @return * A MosaicRaster object. */ - def readRaster(contentBytes: Array[Byte], parentPath: String, driverShortName: String): MosaicRasterGDAL + def readRaster(contentBytes: Array[Byte], createInfo: Map[String, String]): MosaicRasterGDAL /** * Reads a raster band from a file system path. Reads a subdataset band if * the path is to a subdataset. + * * @example * Raster: path = "/path/to/file.tif" Subdataset: path = * "FORMAT:/path/to/file.tif:subdataset" - * @param path - * The path to the raster file. - * @param bandIndex - * The band index to read. - * @param parentPath - * The path of the parent raster file. + * @param createInfo + * The create info for the raster. This should contain the following + * keys: + * - path: The path to the raster file. + * - parentPath: The path of the parent raster file. + * - driver: Optional: The driver short name of the raster file * @return * A MosaicRaster object. */ - def readBand(path: String, bandIndex: Int, parentPath: String): MosaicRasterBandGDAL + def readBand(bandIndex: Int, createInfo: Map[String, String]): MosaicRasterBandGDAL } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALBuildVRT.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALBuildVRT.scala index 9e1e97401..cb79dc263 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALBuildVRT.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALBuildVRT.scala @@ -24,16 +24,17 @@ object GDALBuildVRT { val vrtOptionsVec = OperatorOptions.parseOptions(effectiveCommand) val vrtOptions = new BuildVRTOptions(vrtOptionsVec) val result = gdal.BuildVRT(outputPath, rasters.map(_.getRaster).toArray, vrtOptions) - if (result == null) { - throw new Exception(s""" - |Build VRT failed. 
- |Command: $effectiveCommand - |Error: ${gdal.GetLastErrorMsg} - |""".stripMargin) - } - // TODO: Figure out multiple parents, should this be an array? + val errorMsg = gdal.GetLastErrorMsg + val createInfo = Map( + "path" -> outputPath, + "parentPath" -> rasters.head.getParentPath, + "driver" -> "VRT", + "last_command" -> effectiveCommand, + "last_error" -> errorMsg, + "all_parents" -> rasters.map(_.getParentPath).mkString(";") + ) // VRT files are just meta files, mem size doesnt make much sense so we keep -1 - MosaicRasterGDAL(result, outputPath, rasters.head.getParentPath, "VRT", -1).flushCache() + MosaicRasterGDAL(result, createInfo, -1).flushCache() } } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALCalc.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALCalc.scala index fa92c3b37..e22228817 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALCalc.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALCalc.scala @@ -3,6 +3,7 @@ package com.databricks.labs.mosaic.core.raster.operator.gdal import com.databricks.labs.mosaic.core.raster.api.GDAL import com.databricks.labs.mosaic.core.raster.gdal.{MosaicRasterGDAL, MosaicRasterWriteOptions} import com.databricks.labs.mosaic.utils.SysUtils +import org.gdal.gdal.gdal /** GDALCalc is a helper object for executing GDAL Calc commands. 
*/ object GDALCalc { @@ -33,18 +34,26 @@ object GDALCalc { val effectiveCommand = OperatorOptions.appendOptions(gdalCalcCommand, MosaicRasterWriteOptions.GTiff) val toRun = effectiveCommand.replace("gdal_calc", gdal_calc) val commandRes = SysUtils.runCommand(s"python3 $toRun") - if (commandRes._1.startsWith("ERROR")) { - throw new RuntimeException(s""" - |GDAL Calc command failed: - |$toRun - |STDOUT: - |${commandRes._2} - |STDERR: - |${commandRes._3} - |""".stripMargin) - } + val errorMsg = gdal.GetLastErrorMsg val result = GDAL.raster(resultPath, resultPath) - result + val createInfo = Map( + "path" -> resultPath, + "parentPath" -> resultPath, + "driver" -> "GTiff", + "last_command" -> effectiveCommand, + "last_error" -> errorMsg, + "all_parents" -> resultPath, + "full_error" -> s""" + |GDAL Calc command failed: + |GDAL err: + |$errorMsg + |STDOUT: + |${commandRes._2} + |STDERR: + |${commandRes._3} + |""".stripMargin + ) + result.copy(createInfo = createInfo) } } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALInfo.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALInfo.scala index 7a60a837a..d3ccd471b 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALInfo.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALInfo.scala @@ -25,14 +25,14 @@ object GDALInfo { val gdalInfo = gdal.GDALInfo(raster.getRaster, infoOptions) if (gdalInfo == null) { - throw new Exception(s""" - |GDAL Info failed. - |Command: $command - |Error: ${gdal.GetLastErrorMsg} - |""".stripMargin) + s""" + |GDAL Info failed. 
+ |Command: $command + |Error: ${gdal.GetLastErrorMsg} + |""".stripMargin + } else { + gdalInfo } - - gdalInfo } } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALTranslate.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALTranslate.scala index fd24a0f73..2fb106fda 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALTranslate.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALTranslate.scala @@ -31,15 +31,19 @@ object GDALTranslate { val translateOptionsVec = OperatorOptions.parseOptions(effectiveCommand) val translateOptions = new TranslateOptions(translateOptionsVec) val result = gdal.Translate(outputPath, raster.getRaster, translateOptions) - if (result == null) { - throw new Exception(s""" - |Translate failed. - |Command: $effectiveCommand - |Error: ${gdal.GetLastErrorMsg} - |""".stripMargin) - } + val errorMsg = gdal.GetLastErrorMsg val size = Files.size(Paths.get(outputPath)) - raster.copy(raster = result, path = outputPath, memSize = size, driverShortName = writeOptions.format).flushCache() + val createInfo = Map( + "path" -> outputPath, + "parentPath" -> raster.getParentPath, + "driver" -> writeOptions.format, + "last_command" -> effectiveCommand, + "last_error" -> errorMsg, + "all_parents" -> raster.getParentPath + ) + raster + .copy(raster = result, createInfo = createInfo, memSize = size) + .flushCache() } } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALWarp.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALWarp.scala index ba6dce58d..516560a76 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALWarp.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/gdal/GDALWarp.scala @@ -27,23 +27,18 @@ object GDALWarp { val warpOptionsVec = OperatorOptions.parseOptions(effectiveCommand) val warpOptions = new 
WarpOptions(warpOptionsVec) val result = gdal.Warp(outputPath, rasters.map(_.getRaster).toArray, warpOptions) - // TODO: Figure out multiple parents, should this be an array? // Format will always be the same as the first raster - if (result == null) { - throw new Exception(s""" - |Warp failed. - |Command: $effectiveCommand - |Error: ${gdal.GetLastErrorMsg} - |""".stripMargin) - } + val errorMsg = gdal.GetLastErrorMsg val size = Files.size(Paths.get(outputPath)) - rasters.head - .copy( - raster = result, - path = outputPath, - memSize = size - ) - .flushCache() + val createInfo = Map( + "path" -> outputPath, + "parentPath" -> rasters.head.getParentPath, + "driver" -> rasters.head.getWriteOptions.format, + "last_command" -> effectiveCommand, + "last_error" -> errorMsg, + "all_parents" -> rasters.map(_.getParentPath).mkString(";") + ) + rasters.head.copy(raster = result, createInfo = createInfo, memSize = size).flushCache() } } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/OverlappingTiles.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/OverlappingTiles.scala index 4e9f61c5e..072380666 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/OverlappingTiles.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/OverlappingTiles.scala @@ -69,7 +69,7 @@ object OverlappingTiles { val (_, valid) = tiles.flatten.partition(_._1) - valid.map(t => MosaicRasterTile(null, t._2, raster.getParentPath, raster.getDriversShortName)) + valid.map(t => MosaicRasterTile(null, t._2)) } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/RasterTessellate.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/RasterTessellate.scala index 701cf8cf1..9920af923 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/RasterTessellate.scala +++ 
b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/RasterTessellate.scala @@ -38,13 +38,13 @@ object RasterTessellate { val cellID = cell.cellIdAsLong(indexSystem) val isValidCell = indexSystem.isValid(cellID) if (!isValidCell) { - (false, MosaicRasterTile(cell.index, null, "", "")) + (false, MosaicRasterTile(cell.index, null)) } else { val cellRaster = tmpRaster.getRasterForCell(cellID, indexSystem, geometryAPI) val isValidRaster = !cellRaster.isEmpty ( isValidRaster, - MosaicRasterTile(cell.index, cellRaster, raster.getParentPath, raster.getDriversShortName) + MosaicRasterTile(cell.index, cellRaster) ) } }) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/ReTile.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/ReTile.scala index b12a8f847..7b218199e 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/ReTile.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/retile/ReTile.scala @@ -58,7 +58,7 @@ object ReTile { val (_, valid) = tiles.partition(_._1) - valid.map(t => MosaicRasterTile(null, t._2, raster.getParentPath, raster.getDriversShortName)) + valid.map(t => MosaicRasterTile(null, t._2)) } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/separate/SeparateBands.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/separate/SeparateBands.scala index 25f73bf8b..9580cc441 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/separate/SeparateBands.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/operator/separate/SeparateBands.scala @@ -5,7 +5,10 @@ import com.databricks.labs.mosaic.core.raster.operator.gdal.GDALTranslate import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile import com.databricks.labs.mosaic.utils.PathUtils -/** ReTile is a helper object for splitting multi-band rasters into single-band-per-row. 
*/ +/** + * ReTile is a helper object for splitting multi-band rasters into + * single-band-per-row. + */ object SeparateBands { /** @@ -24,11 +27,13 @@ object SeparateBands { val fileExtension = raster.getRasterFileExtension val rasterPath = PathUtils.createTmpFilePath(fileExtension) val shortDriver = raster.getDriversShortName + val outOptions = raster.getWriteOptions val result = GDALTranslate.executeTranslate( rasterPath, raster, - command = s"gdal_translate -of $shortDriver -b ${i + 1} -co COMPRESS=DEFLATE" + command = s"gdal_translate -of $shortDriver -b ${i + 1}", + writeOptions = outOptions ) val isEmpty = result.isEmpty @@ -38,13 +43,13 @@ object SeparateBands { if (isEmpty) dispose(result) - (isEmpty, result, i) + (isEmpty, result.copy(createInfo = result.createInfo ++ Map("bandIndex" -> (i + 1).toString)), i) } val (_, valid) = tiles.partition(_._1) - valid.map(t => new MosaicRasterTile(null, t._2, raster.getParentPath, raster.getDriversShortName)) + valid.map(t => new MosaicRasterTile(null, t._2)) } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/types/RasterTileType.scala b/src/main/scala/com/databricks/labs/mosaic/core/types/RasterTileType.scala index 5203178e0..137d482ce 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/types/RasterTileType.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/types/RasterTileType.scala @@ -36,8 +36,7 @@ object RasterTileType { Array( StructField("index_id", idType), StructField("raster", rasterType), - StructField("parentPath", StringType), - StructField("driver", StringType) + StructField("metadata", MapType(StringType, StringType)) ) ) } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicRasterTile.scala b/src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicRasterTile.scala index a0710dbe9..bf36ea8f2 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicRasterTile.scala +++ 
b/src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicRasterTile.scala @@ -3,6 +3,7 @@ package com.databricks.labs.mosaic.core.types.model import com.databricks.labs.mosaic.core.index.IndexSystem import com.databricks.labs.mosaic.core.raster.api.GDAL import com.databricks.labs.mosaic.core.raster.gdal.MosaicRasterGDAL +import com.databricks.labs.mosaic.expressions.raster.{buildMapString, extractMap} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.{BinaryType, DataType, LongType, StringType} import org.apache.spark.unsafe.types.UTF8String @@ -16,18 +17,16 @@ import scala.util.{Failure, Success, Try} * Index ID. * @param raster * Raster instance corresponding to the tile. - * @param parentPath - * Parent path of the raster. - * @param driver - * Driver used to read the raster. */ case class MosaicRasterTile( index: Either[Long, String], - raster: MosaicRasterGDAL, - parentPath: String, - driver: String + raster: MosaicRasterGDAL ) { + def parentPath: String = raster.createInfo("parentPath") + + def driver: String = raster.createInfo("driver") + def getIndex: Either[Long, String] = index def getParentPath: String = parentPath @@ -57,18 +56,8 @@ case class MosaicRasterTile( (indexSystem.getCellIdDataType, index) match { case (_: LongType, Left(_)) => this case (_: StringType, Right(_)) => this - case (_: LongType, Right(value)) => new MosaicRasterTile( - index = Left(indexSystem.parse(value)), - raster = raster, - parentPath = parentPath, - driver = driver - ) - case (_: StringType, Left(value)) => new MosaicRasterTile( - index = Right(indexSystem.format(value)), - raster = raster, - parentPath = parentPath, - driver = driver - ) + case (_: LongType, Right(value)) => this.copy(index = Left(indexSystem.parse(value))) + case (_: StringType, Left(value)) => this.copy(index = Right(indexSystem.format(value))) case _ => throw new IllegalArgumentException("Invalid cell id data type") } } @@ -110,22 +99,21 @@ case class 
MosaicRasterTile( def serialize( rasterDataType: DataType ): InternalRow = { - val parentPathUTF8 = UTF8String.fromString(parentPath) - val driverUTF8 = UTF8String.fromString(driver) val encodedRaster = encodeRaster(rasterDataType) + val mapData = buildMapString(raster.createInfo) if (Option(index).isDefined) { if (index.isLeft) InternalRow.fromSeq( - Seq(index.left.get, encodedRaster, parentPathUTF8, driverUTF8) + Seq(index.left.get, encodedRaster, mapData) ) else { // Copy from tmp to checkpoint. // Have to use GDAL Driver to do this since sidecar files are not copied by spark. InternalRow.fromSeq( - Seq(UTF8String.fromString(index.right.get), encodedRaster, parentPathUTF8, driverUTF8) + Seq(UTF8String.fromString(index.right.get), encodedRaster, mapData) ) } } else { - InternalRow.fromSeq(Seq(null, encodedRaster, parentPathUTF8, driverUTF8)) + InternalRow.fromSeq(Seq(null, encodedRaster, mapData)) } } @@ -147,6 +135,7 @@ case class MosaicRasterTile( case Success(value) => value.toInt case Failure(_) => -1 } + } /** Companion object. 
*/ @@ -165,18 +154,17 @@ object MosaicRasterTile { def deserialize(row: InternalRow, idDataType: DataType, rasterType: DataType): MosaicRasterTile = { val index = row.get(0, idDataType) val rawRaster = row.get(1, rasterType) - val parentPath = row.get(2, StringType).toString - val driver = row.get(3, StringType).toString - val raster = GDAL.readRaster(rawRaster, parentPath, driver, rasterType) + val createInfo = extractMap(row.getMap(2)) + val raster = GDAL.readRaster(rawRaster, createInfo, rasterType) // noinspection TypeCheckCanBeMatch if (Option(index).isDefined) { if (index.isInstanceOf[Long]) { - new MosaicRasterTile(Left(index.asInstanceOf[Long]), raster, parentPath, driver) + new MosaicRasterTile(Left(index.asInstanceOf[Long]), raster) } else { - new MosaicRasterTile(Right(index.asInstanceOf[UTF8String].toString), raster, parentPath, driver) + new MosaicRasterTile(Right(index.asInstanceOf[UTF8String].toString), raster) } } else { - new MosaicRasterTile(null, raster, parentPath, driver) + new MosaicRasterTile(null, raster) } } diff --git a/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReTileOnRead.scala b/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReTileOnRead.scala index a38e76900..867167c58 100644 --- a/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReTileOnRead.scala +++ b/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReTileOnRead.scala @@ -134,8 +134,9 @@ object ReTileOnRead extends ReadStrategy { */ def localSubdivide(inPath: String, parentPath: String, sizeInMB: Int): Seq[MosaicRasterTile] = { val cleanPath = PathUtils.getCleanPath(inPath) - val raster = MosaicRasterGDAL.readRaster(cleanPath, parentPath) - val inTile = new MosaicRasterTile(null, raster, parentPath, raster.getDriversShortName) + val createInfo = Map("path" -> cleanPath, "parentPath" -> parentPath) + val raster = MosaicRasterGDAL.readRaster(createInfo) + val inTile = new MosaicRasterTile(null, raster) val tiles = 
BalancedSubdivision.splitRaster(inTile, sizeInMB) RasterCleaner.dispose(raster) RasterCleaner.dispose(inTile) diff --git a/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadAsPath.scala b/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadAsPath.scala new file mode 100644 index 000000000..0973146ef --- /dev/null +++ b/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadAsPath.scala @@ -0,0 +1,124 @@ +package com.databricks.labs.mosaic.datasource.gdal + +import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory} +import com.databricks.labs.mosaic.core.raster.gdal.MosaicRasterGDAL +import com.databricks.labs.mosaic.core.raster.io.RasterCleaner +import com.databricks.labs.mosaic.core.types.RasterTileType +import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile +import com.databricks.labs.mosaic.datasource.Utils +import com.databricks.labs.mosaic.datasource.gdal.GDALFileFormat._ +import com.databricks.labs.mosaic.utils.PathUtils +import org.apache.hadoop.fs.{FileStatus, FileSystem} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ + +import java.nio.file.{Files, Paths} + +/** An object defining the retiling read strategy for the GDAL file format. */ +object ReadAsPath extends ReadStrategy { + + val tileDataType: DataType = StringType + + // noinspection DuplicatedCode + /** + * Returns the schema of the GDAL file format. + * @note + * Different read strategies can have different schemas. This is because + * the schema is defined by the read strategy. For retiling we always use + * checkpoint location. In this case rasters are stored off spark rows. + * If you need the tiles in memory please load them from path stored in + * the tile returned by the reader. + * + * @param options + * Options passed to the reader. + * @param files + * List of files to read. + * @param parentSchema + * Parent schema. 
+ * @param sparkSession + * Spark session. + * + * @return + * Schema of the GDAL file format. + */ + override def getSchema( + options: Map[String, String], + files: Seq[FileStatus], + parentSchema: StructType, + sparkSession: SparkSession + ): StructType = { + val trimmedSchema = parentSchema.filter(field => field.name != CONTENT && field.name != LENGTH) + val indexSystem = IndexSystemFactory.getIndexSystem(sparkSession) + StructType(trimmedSchema) + .add(StructField(UUID, LongType, nullable = false)) + .add(StructField(X_SIZE, IntegerType, nullable = false)) + .add(StructField(Y_SIZE, IntegerType, nullable = false)) + .add(StructField(BAND_COUNT, IntegerType, nullable = false)) + .add(StructField(METADATA, MapType(StringType, StringType), nullable = false)) + .add(StructField(SUBDATASETS, MapType(StringType, StringType), nullable = false)) + .add(StructField(SRID, IntegerType, nullable = false)) + .add(StructField(LENGTH, LongType, nullable = false)) + // Note that for retiling we always use checkpoint location. + // In this case rasters are stored off spark rows. + // If you need the tiles in memory please load them from path stored in the tile returned by the reader. + .add(StructField(TILE, RasterTileType(indexSystem.getCellIdDataType, tileDataType), nullable = false)) + } + + /** + * Reads the content of the file. + * @param status + * File status. + * @param fs + * File system. + * @param requiredSchema + * Required schema. + * @param options + * Options passed to the reader. + * @param indexSystem + * Index system. + * + * @return + * Iterator of internal rows. 
+ */ + override def read( + status: FileStatus, + fs: FileSystem, + requiredSchema: StructType, + options: Map[String, String], + indexSystem: IndexSystem + ): Iterator[InternalRow] = { + val inPath = status.getPath.toString + val uuid = getUUID(status) + + val tmpPath = PathUtils.copyToTmp(inPath) + val createInfo = Map("path" -> tmpPath, "parentPath" -> inPath) + val raster = MosaicRasterGDAL.readRaster(createInfo) + val tile = MosaicRasterTile(null, raster) + + val trimmedSchema = StructType(requiredSchema.filter(field => field.name != TILE)) + val fields = trimmedSchema.fieldNames.map { + case PATH => status.getPath.toString + case MODIFICATION_TIME => status.getModificationTime + case UUID => uuid + case X_SIZE => tile.getRaster.xSize + case Y_SIZE => tile.getRaster.ySize + case BAND_COUNT => tile.getRaster.numBands + case METADATA => tile.getRaster.metadata + case SUBDATASETS => tile.getRaster.subdatasets + case SRID => tile.getRaster.SRID + case LENGTH => tile.getRaster.getMemSize + case other => throw new RuntimeException(s"Unsupported field name: $other") + } + // Writing to bytes is destructive so we delay reading content and content length until the last possible moment + val row = Utils.createRow(fields ++ Seq(tile.formatCellId(indexSystem).serialize(tileDataType))) + RasterCleaner.dispose(tile) + + val rows = Seq(row) + + Files.deleteIfExists(Paths.get(tmpPath)) + + rows.iterator + } + +} diff --git a/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadInMemory.scala b/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadInMemory.scala index 15ddec2ed..7e6687079 100644 --- a/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadInMemory.scala +++ b/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadInMemory.scala @@ -6,12 +6,12 @@ import com.databricks.labs.mosaic.core.raster.io.RasterCleaner import com.databricks.labs.mosaic.core.types.RasterTileType import com.databricks.labs.mosaic.datasource.Utils import 
com.databricks.labs.mosaic.datasource.gdal.GDALFileFormat._ +import com.databricks.labs.mosaic.expressions.raster.buildMapString import com.databricks.labs.mosaic.utils.PathUtils import org.apache.hadoop.fs.{FileStatus, FileSystem} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String /** An object defining the in memory read strategy for the GDAL file format. */ object ReadInMemory extends ReadStrategy { @@ -78,9 +78,12 @@ object ReadInMemory extends ReadStrategy { ): Iterator[InternalRow] = { val inPath = status.getPath.toString val readPath = PathUtils.getCleanPath(inPath) - val driverShortName = MosaicRasterGDAL.identifyDriver(readPath) val contentBytes: Array[Byte] = readContent(fs, status) - val raster = MosaicRasterGDAL.readRaster(readPath, inPath) + val createInfo = Map( + "path" -> readPath, + "parentPath" -> inPath + ) + val raster = MosaicRasterGDAL.readRaster(createInfo) val uuid = getUUID(status) val fields = requiredSchema.fieldNames.filter(_ != TILE).map { @@ -96,8 +99,9 @@ object ReadInMemory extends ReadStrategy { case SRID => raster.SRID case other => throw new RuntimeException(s"Unsupported field name: $other") } + val mapData = buildMapString(raster.createInfo) val rasterTileSer = InternalRow.fromSeq( - Seq(null, contentBytes, UTF8String.fromString(inPath), UTF8String.fromString(driverShortName), null) + Seq(null, contentBytes, mapData) ) val row = Utils.createRow( fields ++ Seq(rasterTileSer) diff --git a/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadStrategy.scala b/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadStrategy.scala index cacc1c133..ab141b069 100644 --- a/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadStrategy.scala +++ b/src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReadStrategy.scala @@ -72,6 +72,7 @@ object ReadStrategy { readStrategy match { case 
MOSAIC_RASTER_READ_IN_MEMORY => ReadInMemory case MOSAIC_RASTER_RE_TILE_ON_READ => ReTileOnRead + case MOSAIC_RASTER_READ_AS_PATH => ReadAsPath case _ => ReadInMemory } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvg.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvg.scala index f94eae6e8..de163ca37 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvg.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvg.scala @@ -28,12 +28,7 @@ case class RST_CombineAvg( /** Combines the rasters using average of pixels. */ override def rasterTransform(tiles: Seq[MosaicRasterTile]): Any = { val index = if (tiles.map(_.getIndex).groupBy(identity).size == 1) tiles.head.getIndex else null - MosaicRasterTile( - index, - CombineAVG.compute(tiles.map(_.getRaster)), - tiles.head.getParentPath, - tiles.head.getDriver - ) + MosaicRasterTile(index, CombineAVG.compute(tiles.map(_.getRaster))) } } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvgAgg.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvgAgg.scala index be12bd12c..3bf9248c2 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvgAgg.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvgAgg.scala @@ -79,11 +79,8 @@ case class RST_CombineAvgAgg( // If merging multiple index rasters, the index value is dropped val idx = if (tiles.map(_.getIndex).groupBy(identity).size == 1) tiles.head.getIndex else null var combined = CombineAVG.compute(tiles.map(_.getRaster)).flushCache() - // TODO: should parent path be an array? 
- val parentPath = tiles.head.getParentPath - val driver = tiles.head.getDriver - val result = MosaicRasterTile(idx, combined, parentPath, driver) + val result = MosaicRasterTile(idx, combined) .formatCellId(IndexSystemFactory.getIndexSystem(expressionConfig.getIndexSystem)) .serialize(tileType) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBand.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBand.scala index 3e7de13b5..459f1774a 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBand.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBand.scala @@ -37,9 +37,7 @@ case class RST_DerivedBand( val index = if (tiles.map(_.getIndex).groupBy(identity).size == 1) tiles.head.getIndex else null MosaicRasterTile( index, - PixelCombineRasters.combine(tiles.map(_.getRaster), pythonFunc, funcName), - tiles.head.getParentPath, - tiles.head.getDriver + PixelCombineRasters.combine(tiles.map(_.getRaster), pythonFunc, funcName) ) } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBandAgg.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBandAgg.scala index 7d972b6e1..836b79cbd 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBandAgg.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBandAgg.scala @@ -89,11 +89,8 @@ case class RST_DerivedBandAgg( val idx = if (tiles.map(_.getIndex).groupBy(identity).size == 1) tiles.head.getIndex else null var combined = PixelCombineRasters.combine(tiles.map(_.getRaster), pythonFunc, funcName) - // TODO: should parent path be an array? 
- val parentPath = tiles.head.getParentPath - val driver = tiles.head.getDriver - val result = MosaicRasterTile(idx, combined, parentPath, driver) + val result = MosaicRasterTile(idx, combined) .formatCellId(IndexSystemFactory.getIndexSystem(expressionConfig.getIndexSystem)) .serialize(BinaryType) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_FromContent.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_FromContent.scala index 956c9c049..1021eb083 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_FromContent.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_FromContent.scala @@ -68,8 +68,9 @@ case class RST_FromContent( val targetSize = sizeInMB.eval(input).asInstanceOf[Int] if (targetSize <= 0 || rasterArr.length <= targetSize) { // - no split required - var raster = MosaicRasterGDAL.readRaster(rasterArr, PathUtils.NO_PATH_STRING, driver) - var tile = MosaicRasterTile(null, raster, PathUtils.NO_PATH_STRING, driver) + val createInfo = Map("parentPath" -> PathUtils.NO_PATH_STRING, "driver" -> driver) + var raster = MosaicRasterGDAL.readRaster(rasterArr, createInfo) + var tile = MosaicRasterTile(null, raster) val row = tile.formatCellId(indexSystem).serialize(tileType) RasterCleaner.dispose(raster) RasterCleaner.dispose(tile) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_FromFile.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_FromFile.scala index ddd4c6af2..cd5808f30 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_FromFile.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_FromFile.scala @@ -15,7 +15,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import 
org.apache.spark.sql.catalyst.expressions.{CollectionGenerator, Expression, Literal, NullIntolerant} -import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import java.nio.file.{Files, Paths, StandardCopyOption} @@ -66,8 +66,9 @@ case class RST_FromFile( val driver = MosaicRasterGDAL.identifyDriver(path) val targetSize = sizeInMB.eval(input).asInstanceOf[Int] if (targetSize <= 0 && Files.size(Paths.get(readPath)) <= Integer.MAX_VALUE) { - var raster = MosaicRasterGDAL.readRaster(readPath, path) - var tile = MosaicRasterTile(null, raster, path, raster.getDriversShortName) + val createInfo = Map("path" -> readPath, "parentPath" -> path) + var raster = MosaicRasterGDAL.readRaster(createInfo) + var tile = MosaicRasterTile(null, raster) val row = tile.formatCellId(indexSystem).serialize(tileType) RasterCleaner.dispose(raster) RasterCleaner.dispose(tile) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MakeTiles.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MakeTiles.scala index 586337556..12c70d025 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MakeTiles.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MakeTiles.scala @@ -125,8 +125,9 @@ case class RST_MakeTiles( if (targetSize <= 0 && inputSize <= Integer.MAX_VALUE) { // - no split required - val raster = GDAL.readRaster(rawInput, PathUtils.NO_PATH_STRING, driver, inputExpr.dataType) - val tile = MosaicRasterTile(null, raster, PathUtils.NO_PATH_STRING, driver) + val createInfo = Map("parentPath" -> PathUtils.NO_PATH_STRING, "driver" -> driver) + val raster = GDAL.readRaster(rawInput, createInfo, inputExpr.dataType) + val tile = MosaicRasterTile(null, raster) val row = tile.formatCellId(indexSystem).serialize(tileType) RasterCleaner.dispose(raster) 
RasterCleaner.dispose(tile) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MapAlgebra.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MapAlgebra.scala index 606f8cc72..bc2aea949 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MapAlgebra.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MapAlgebra.scala @@ -45,12 +45,8 @@ case class RST_MapAlgebra( val resultPath = PathUtils.createTmpFilePath(extension) val command = parseSpec(jsonSpec, resultPath, tiles) val index = if (tiles.map(_.getIndex).groupBy(identity).size == 1) tiles.head.getIndex else null - MosaicRasterTile( - index, - GDALCalc.executeCalc(command, resultPath), - resultPath, - tiles.head.getDriver - ) + val result = GDALCalc.executeCalc(command, resultPath) + MosaicRasterTile(index, result) } def parseSpec(jsonSpec: String, resultPath: String, tiles: Seq[MosaicRasterTile]): String = { diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala index b639a42da..ae56a01ab 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeAgg.scala @@ -80,11 +80,8 @@ case class RST_MergeAgg( // If merging multiple index rasters, the index value is dropped val idx = if (tiles.map(_.getIndex).groupBy(identity).size == 1) tiles.head.getIndex else null var merged = MergeRasters.merge(tiles.map(_.getRaster)).flushCache() - // TODO: should parent path be an array? 
- val parentPath = tiles.head.getParentPath - val driver = tiles.head.getDriver - val result = MosaicRasterTile(idx, merged, parentPath, driver) + val result = MosaicRasterTile(idx, merged) .formatCellId(IndexSystemFactory.getIndexSystem(expressionConfig.getIndexSystem)) .serialize(BinaryType) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_SetSRID.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_SetSRID.scala index a3d44289a..2aabb3df9 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_SetSRID.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_SetSRID.scala @@ -1,8 +1,6 @@ package com.databricks.labs.mosaic.expressions.raster import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI -import com.databricks.labs.mosaic.core.raster.io.RasterCleaner -import com.databricks.labs.mosaic.core.raster.operator.clip.RasterClipByVector import com.databricks.labs.mosaic.core.types.RasterTileType import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile import com.databricks.labs.mosaic.expressions.base.{GenericExpressionFactory, WithExpressionInfo} @@ -11,6 +9,7 @@ import com.databricks.labs.mosaic.functions.MosaicExpressionConfig import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.expressions.{Expression, NullIntolerant} +import org.apache.spark.sql.types.DataType /** The expression for clipping a raster by a vector. 
 */ case class RST_SetSRID( @@ -19,14 +18,15 @@ expressionConfig: MosaicExpressionConfig ) extends Raster1ArgExpression[RST_SetSRID]( rastersExpr, - sridExpr, - RasterTileType(expressionConfig.getCellIdType), + sridExpr, returnsRaster = true, expressionConfig = expressionConfig ) with NullIntolerant with CodegenFallback { + + override def dataType: DataType = RasterTileType(expressionConfig.getCellIdType, rastersExpr) + val geometryAPI: GeometryAPI = GeometryAPI(expressionConfig.getGeometryAPI) /** diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Transform.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Transform.scala new file mode 100644 index 000000000..7681f2bba --- /dev/null +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Transform.scala @@ -0,0 +1,61 @@ +package com.databricks.labs.mosaic.expressions.raster + +import com.databricks.labs.mosaic.core.raster.operator.proj.RasterProject +import com.databricks.labs.mosaic.core.types.RasterTileType +import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile +import com.databricks.labs.mosaic.expressions.base.{GenericExpressionFactory, WithExpressionInfo} +import com.databricks.labs.mosaic.expressions.raster.base.Raster1ArgExpression +import com.databricks.labs.mosaic.functions.MosaicExpressionConfig +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.{Expression, NullIntolerant} +import org.apache.spark.sql.types._ +import org.gdal.osr.SpatialReference + +/** Transforms (reprojects) the raster tile to the given SRID.
+ */ +case class RST_Transform( + tileExpr: Expression, + srid: Expression, + expressionConfig: MosaicExpressionConfig +) extends Raster1ArgExpression[RST_Transform]( + tileExpr, + srid, + returnsRaster = true, + expressionConfig + ) + with NullIntolerant + with CodegenFallback { + + override def dataType: DataType = RasterTileType(expressionConfig.getCellIdType, tileExpr) + + /** Reprojects the tile's raster to the EPSG SRID given as the second argument. */ + override def rasterTransform(tile: MosaicRasterTile, arg1: Any): Any = { + val srid = arg1.asInstanceOf[Int] + val sReff = new SpatialReference() + sReff.ImportFromEPSG(srid) + sReff.SetAxisMappingStrategy(org.gdal.osr.osrConstants.OAMS_TRADITIONAL_GIS_ORDER) + val result = RasterProject.project(tile.raster, sReff) + tile.copy(raster = result) + } + +} + +/** Expression info required for the expression registration for spark SQL. */ +object RST_Transform extends WithExpressionInfo { + + override def name: String = "rst_transform" + + override def usage: String = "_FUNC_(expr1, expr2) - Returns a raster tile reprojected to the given SRID."
 + + override def example: String = + """ + | Examples: + | > SELECT _FUNC_(raster_tile, 4326); + | {index_id: 593308294097928191, raster: ..., metadata: ...} + | """.stripMargin + + override def builder(expressionConfig: MosaicExpressionConfig): FunctionBuilder = { + GenericExpressionFactory.getBaseBuilder[RST_Transform](2, expressionConfig) + } + +} diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/package.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/package.scala index a229aae89..7db83db25 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/package.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/package.scala @@ -1,6 +1,6 @@ package com.databricks.labs.mosaic.expressions -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapBuilder, ArrayBasedMapData, ArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapBuilder, ArrayBasedMapData, ArrayData, MapData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -21,8 +21,8 @@ package object raster { * The measure type of the resulting pixel value. * * @return - * The datatype to be used for serialization of the result of - * [[com.databricks.labs.mosaic.expressions.raster.base.RasterToGridExpression]]. + * The datatype to be used for serialization of the result of + * [[com.databricks.labs.mosaic.expressions.raster.base.RasterToGridExpression]]. */ def RasterToGridType(cellIDType: DataType, measureType: DataType): DataType = { ArrayType( @@ -49,6 +49,19 @@ package object raster { mapBuilder.build() } + /** + * Extracts a scala Map[String, String] from a spark map. + * @param mapData + * The map to be used. + * @return + * Deserialized map.
+ */ + def extractMap(mapData: MapData): Map[String, String] = { + val keys = mapData.keyArray().toArray[UTF8String](StringType).map(_.toString) + val values = mapData.valueArray().toArray[UTF8String](StringType).map(_.toString) + keys.zip(values).toMap + } + /** * Builds a spark map from a scala Map[String, Double]. * @param metaData diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index cc0a611dd..293d27593 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -305,6 +305,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends mosaicRegistry.registerExpression[RST_Subdatasets](expressionConfig) mosaicRegistry.registerExpression[RST_Summary](expressionConfig) mosaicRegistry.registerExpression[RST_Tessellate](expressionConfig) + mosaicRegistry.registerExpression[RST_Transform](expressionConfig) mosaicRegistry.registerExpression[RST_FromContent](expressionConfig) mosaicRegistry.registerExpression[RST_FromFile](expressionConfig) mosaicRegistry.registerExpression[RST_ToOverlappingTiles](expressionConfig) @@ -749,6 +750,8 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends def rst_summary(raster: Column): Column = ColumnAdapter(RST_Summary(raster.expr, expressionConfig)) def rst_tessellate(raster: Column, resolution: Column): Column = ColumnAdapter(RST_Tessellate(raster.expr, resolution.expr, expressionConfig)) + def rst_transform(raster: Column, srid: Column): Column = + ColumnAdapter(RST_Transform(raster.expr, srid.expr, expressionConfig)) def rst_tessellate(raster: Column, resolution: Int): Column = ColumnAdapter(RST_Tessellate(raster.expr, lit(resolution).expr, expressionConfig)) def rst_fromcontent(raster: Column, driver: Column): Column = diff --git 
a/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala b/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala index c7676a88d..b9e972d6f 100644 --- a/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala @@ -47,6 +47,7 @@ object MosaicGDAL extends Logging { /** Configures the GDAL environment. */ def configureGDAL(mosaicConfig: MosaicExpressionConfig): Unit = { val CPL_TMPDIR = MosaicContext.tmpDir + val GDAL_PAM_PROXY_DIR = MosaicContext.tmpDir gdal.SetConfigOption("GDAL_VRT_ENABLE_PYTHON", "YES") gdal.SetConfigOption("GDAL_DISABLE_READDIR_ON_OPEN", "TRUE") gdal.SetConfigOption("CPL_TMPDIR", CPL_TMPDIR) diff --git a/src/test/scala/com/databricks/labs/mosaic/core/raster/TestRasterBandGDAL.scala b/src/test/scala/com/databricks/labs/mosaic/core/raster/TestRasterBandGDAL.scala index 1337ae6d2..88c0f4bbb 100644 --- a/src/test/scala/com/databricks/labs/mosaic/core/raster/TestRasterBandGDAL.scala +++ b/src/test/scala/com/databricks/labs/mosaic/core/raster/TestRasterBandGDAL.scala @@ -10,10 +10,11 @@ class TestRasterBandGDAL extends SharedSparkSessionGDAL { test("Read band metadata and pixel data from GeoTIFF file.") { assume(System.getProperty("os.name") == "Linux") - val testRaster = MosaicRasterGDAL.readRaster( - filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF"), - filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF") + val createInfo = Map( + "path" -> filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF"), + "parentPath" -> filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF") ) + val testRaster = MosaicRasterGDAL.readRaster(createInfo) val testBand = testRaster.getBand(1) testBand.getBand testBand.index shouldBe 1 @@ -36,10 +37,11 @@ class TestRasterBandGDAL extends SharedSparkSessionGDAL { test("Read band metadata and pixel data from a GRIdded Binary file.") { assume(System.getProperty("os.name") == 
"Linux") - val testRaster = MosaicRasterGDAL.readRaster( - filePath("/binary/grib-cams/adaptor.mars.internal-1650626995.380916-11651-14-ca8e7236-16ca-4e11-919d-bdbd5a51da35.grb"), - filePath("/binary/grib-cams/adaptor.mars.internal-1650626995.380916-11651-14-ca8e7236-16ca-4e11-919d-bdbd5a51da35.grb") + val createInfo = Map( + "path" -> filePath("/binary/grib-cams/adaptor.mars.internal-1650626995.380916-11651-14-ca8e7236-16ca-4e11-919d-bdbd5a51da35.grb"), + "parentPath" -> filePath("/binary/grib-cams/adaptor.mars.internal-1650626995.380916-11651-14-ca8e7236-16ca-4e11-919d-bdbd5a51da35.grb") ) + val testRaster = MosaicRasterGDAL.readRaster(createInfo) val testBand = testRaster.getBand(1) testBand.description shouldBe "1[-] HYBL=\"Hybrid level\"" testBand.dataType shouldBe 7 @@ -55,15 +57,17 @@ class TestRasterBandGDAL extends SharedSparkSessionGDAL { test("Read band metadata and pixel data from a NetCDF file.") { assume(System.getProperty("os.name") == "Linux") - val superRaster = MosaicRasterGDAL.readRaster( - filePath("/binary/netcdf-coral/ct5km_baa-max-7d_v3.1_20220101.nc"), - filePath("/binary/netcdf-coral/ct5km_baa-max-7d_v3.1_20220101.nc") + val createInfo = Map( + "path" -> filePath("/binary/netcdf-coral/ct5km_baa-max-7d_v3.1_20220101.nc"), + "parentPath" -> filePath("/binary/netcdf-coral/ct5km_baa-max-7d_v3.1_20220101.nc") ) + val superRaster = MosaicRasterGDAL.readRaster(createInfo) val subdatasetPath = superRaster.subdatasets("bleaching_alert_area") - val testRaster = MosaicRasterGDAL.readRaster( - subdatasetPath, - subdatasetPath + val sdCreate = Map( + "path" -> subdatasetPath, + "parentPath" -> subdatasetPath ) + val testRaster = MosaicRasterGDAL.readRaster(sdCreate) val testBand = testRaster.getBand(1) testBand.dataType shouldBe 1 diff --git a/src/test/scala/com/databricks/labs/mosaic/core/raster/TestRasterGDAL.scala b/src/test/scala/com/databricks/labs/mosaic/core/raster/TestRasterGDAL.scala index bb53d6b79..8bbe4b1f3 100644 --- 
a/src/test/scala/com/databricks/labs/mosaic/core/raster/TestRasterGDAL.scala +++ b/src/test/scala/com/databricks/labs/mosaic/core/raster/TestRasterGDAL.scala @@ -34,11 +34,12 @@ class TestRasterGDAL extends SharedSparkSessionGDAL { test("Read raster metadata from GeoTIFF file.") { assume(System.getProperty("os.name") == "Linux") - - val testRaster = MosaicRasterGDAL.readRaster( - filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF"), - filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF") + + val createInfo = Map( + "path" -> filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF"), + "parentPath" -> filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF") ) + val testRaster = MosaicRasterGDAL.readRaster(createInfo) testRaster.xSize shouldBe 2400 testRaster.ySize shouldBe 2400 testRaster.numBands shouldBe 1 @@ -56,10 +57,11 @@ class TestRasterGDAL extends SharedSparkSessionGDAL { test("Read raster metadata from a GRIdded Binary file.") { assume(System.getProperty("os.name") == "Linux") - val testRaster = MosaicRasterGDAL.readRaster( - filePath("/binary/grib-cams/adaptor.mars.internal-1650626995.380916-11651-14-ca8e7236-16ca-4e11-919d-bdbd5a51da35.grb"), - filePath("/binary/grib-cams/adaptor.mars.internal-1650626995.380916-11651-14-ca8e7236-16ca-4e11-919d-bdbd5a51da35.grb") + val createInfo = Map( + "path" -> filePath("/binary/grib-cams/adaptor.mars.internal-1650626995.380916-11651-14-ca8e7236-16ca-4e11-919d-bdbd5a51da35.grb"), + "parentPath" -> filePath("/binary/grib-cams/adaptor.mars.internal-1650626995.380916-11651-14-ca8e7236-16ca-4e11-919d-bdbd5a51da35.grb") ) + val testRaster = MosaicRasterGDAL.readRaster(createInfo) testRaster.xSize shouldBe 14 testRaster.ySize shouldBe 14 testRaster.numBands shouldBe 14 @@ -72,17 +74,19 @@ class TestRasterGDAL extends SharedSparkSessionGDAL { test("Read raster metadata from a NetCDF file.") { assume(System.getProperty("os.name") == "Linux") - - val superRaster = 
MosaicRasterGDAL.readRaster( - filePath("/binary/netcdf-coral/ct5km_baa-max-7d_v3.1_20220101.nc"), - filePath("/binary/netcdf-coral/ct5km_baa-max-7d_v3.1_20220101.nc") + + val createInfo = Map( + "path" -> filePath("/binary/netcdf-coral/ct5km_baa-max-7d_v3.1_20220101.nc"), + "parentPath" -> filePath("/binary/netcdf-coral/ct5km_baa-max-7d_v3.1_20220101.nc") ) + val superRaster = MosaicRasterGDAL.readRaster(createInfo) val subdatasetPath = superRaster.subdatasets("bleaching_alert_area") - val testRaster = MosaicRasterGDAL.readRaster( - subdatasetPath, - subdatasetPath + val sdCreateInfo = Map( + "path" -> subdatasetPath, + "parentPath" -> subdatasetPath ) + val testRaster = MosaicRasterGDAL.readRaster(sdCreateInfo) testRaster.xSize shouldBe 7200 testRaster.ySize shouldBe 3600 @@ -98,10 +102,11 @@ class TestRasterGDAL extends SharedSparkSessionGDAL { test("Raster pixel and extent sizes are correct.") { assume(System.getProperty("os.name") == "Linux") - val testRaster = MosaicRasterGDAL.readRaster( - filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF"), - filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF") + val createInfo = Map( + "path" -> filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF"), + "parentPath" -> filePath("/modis/MCD43A4.A2018185.h10v07.006.2018194033728_B01.TIF") ) + val testRaster = MosaicRasterGDAL.readRaster(createInfo) testRaster.pixelXSize - 463.312716527 < 0.0000001 shouldBe true testRaster.pixelYSize - -463.312716527 < 0.0000001 shouldBe true @@ -125,13 +130,18 @@ class TestRasterGDAL extends SharedSparkSessionGDAL { MosaicGDAL.setBlockSize(30) - val ds = gdalJNI.GetDriverByName("GTiff").Create("/mosaic_tmp/test.tif", 50, 50, 1, gdalconst.gdalconstConstants.GDT_Float32) + val ds = gdalJNI.GetDriverByName("GTiff").Create("/tmp/mosaic_tmp/test.tif", 50, 50, 1, gdalconst.gdalconstConstants.GDT_Float32) val values = 0 until 50 * 50 ds.GetRasterBand(1).WriteRaster(0, 0, 50, 50, values.toArray) 
ds.FlushCache() - var result = MosaicRasterGDAL(ds, "", "", "GTiff", -1).filter(5, "avg").flushCache() + val createInfo = Map( + "path" -> "", + "parentPath" -> "", + "driver" -> "GTiff" + ) + var result = MosaicRasterGDAL(ds, createInfo, -1).filter(5, "avg").flushCache() var resultValues = result.getBand(1).values @@ -158,7 +168,7 @@ class TestRasterGDAL extends SharedSparkSessionGDAL { // mode - result = MosaicRasterGDAL(ds, "", "", "GTiff", -1).filter(5, "mode").flushCache() + result = MosaicRasterGDAL(ds, createInfo, -1).filter(5, "mode").flushCache() resultValues = result.getBand(1).values @@ -194,136 +204,136 @@ class TestRasterGDAL extends SharedSparkSessionGDAL { inputMatrix(12)(12), inputMatrix(12)(13) ).groupBy(identity).maxBy(_._2.size)._1.toDouble - + // corner resultMatrix(49)(49) shouldBe Seq( - inputMatrix(47)(47), - inputMatrix(47)(48), - inputMatrix(47)(49), - inputMatrix(48)(47), - inputMatrix(48)(48), - inputMatrix(48)(49), - inputMatrix(49)(47), - inputMatrix(49)(48), - inputMatrix(49)(49) + inputMatrix(47)(47), + inputMatrix(47)(48), + inputMatrix(47)(49), + inputMatrix(48)(47), + inputMatrix(48)(48), + inputMatrix(48)(49), + inputMatrix(49)(47), + inputMatrix(49)(48), + inputMatrix(49)(49) ).groupBy(identity).maxBy(_._2.size)._1.toDouble - + // median - - result = MosaicRasterGDAL(ds, "", "", "GTiff", -1).filter(5, "median").flushCache() - + + result = MosaicRasterGDAL(ds, createInfo, -1).filter(5, "median").flushCache() + resultValues = result.getBand(1).values - + inputMatrix = values.toArray.grouped(50).toArray resultMatrix = resultValues.grouped(50).toArray - + // first block - + resultMatrix(10)(11) shouldBe Seq( - inputMatrix(8)(9), - inputMatrix(8)(10), - inputMatrix(8)(11), - inputMatrix(8)(12), - inputMatrix(8)(13), - inputMatrix(9)(9), - inputMatrix(9)(10), - inputMatrix(9)(11), - inputMatrix(9)(12), - inputMatrix(9)(13), - inputMatrix(10)(9), - inputMatrix(10)(10), - inputMatrix(10)(11), - inputMatrix(10)(12), - inputMatrix(10)(13), 
- inputMatrix(11)(9), - inputMatrix(11)(10), - inputMatrix(11)(11), - inputMatrix(11)(12), - inputMatrix(11)(13), - inputMatrix(12)(9), - inputMatrix(12)(10), - inputMatrix(12)(11), - inputMatrix(12)(12), - inputMatrix(12)(13) + inputMatrix(8)(9), + inputMatrix(8)(10), + inputMatrix(8)(11), + inputMatrix(8)(12), + inputMatrix(8)(13), + inputMatrix(9)(9), + inputMatrix(9)(10), + inputMatrix(9)(11), + inputMatrix(9)(12), + inputMatrix(9)(13), + inputMatrix(10)(9), + inputMatrix(10)(10), + inputMatrix(10)(11), + inputMatrix(10)(12), + inputMatrix(10)(13), + inputMatrix(11)(9), + inputMatrix(11)(10), + inputMatrix(11)(11), + inputMatrix(11)(12), + inputMatrix(11)(13), + inputMatrix(12)(9), + inputMatrix(12)(10), + inputMatrix(12)(11), + inputMatrix(12)(12), + inputMatrix(12)(13) ).sorted.apply(12).toDouble - + // min filter - - result = MosaicRasterGDAL(ds, "", "", "GTiff", -1).filter(5, "min").flushCache() - + + result = MosaicRasterGDAL(ds, createInfo, -1).filter(5, "min").flushCache() + resultValues = result.getBand(1).values - + inputMatrix = values.toArray.grouped(50).toArray resultMatrix = resultValues.grouped(50).toArray - + // first block - + resultMatrix(10)(11) shouldBe Seq( - inputMatrix(8)(9), - inputMatrix(8)(10), - inputMatrix(8)(11), - inputMatrix(8)(12), - inputMatrix(8)(13), - inputMatrix(9)(9), - inputMatrix(9)(10), - inputMatrix(9)(11), - inputMatrix(9)(12), - inputMatrix(9)(13), - inputMatrix(10)(9), - inputMatrix(10)(10), - inputMatrix(10)(11), - inputMatrix(10)(12), - inputMatrix(10)(13), - inputMatrix(11)(9), - inputMatrix(11)(10), - inputMatrix(11)(11), - inputMatrix(11)(12), - inputMatrix(11)(13), - inputMatrix(12)(9), - inputMatrix(12)(10), - inputMatrix(12)(11), - inputMatrix(12)(12), - inputMatrix(12)(13) + inputMatrix(8)(9), + inputMatrix(8)(10), + inputMatrix(8)(11), + inputMatrix(8)(12), + inputMatrix(8)(13), + inputMatrix(9)(9), + inputMatrix(9)(10), + inputMatrix(9)(11), + inputMatrix(9)(12), + inputMatrix(9)(13), + inputMatrix(10)(9), 
+ inputMatrix(10)(10), + inputMatrix(10)(11), + inputMatrix(10)(12), + inputMatrix(10)(13), + inputMatrix(11)(9), + inputMatrix(11)(10), + inputMatrix(11)(11), + inputMatrix(11)(12), + inputMatrix(11)(13), + inputMatrix(12)(9), + inputMatrix(12)(10), + inputMatrix(12)(11), + inputMatrix(12)(12), + inputMatrix(12)(13) ).min.toDouble - + // max filter - - result = MosaicRasterGDAL(ds, "", "", "GTiff", -1).filter(5, "max").flushCache() - + + result = MosaicRasterGDAL(ds, createInfo, -1).filter(5, "max").flushCache() + resultValues = result.getBand(1).values - + inputMatrix = values.toArray.grouped(50).toArray resultMatrix = resultValues.grouped(50).toArray - + // first block - + resultMatrix(10)(11) shouldBe Seq( - inputMatrix(8)(9), - inputMatrix(8)(10), - inputMatrix(8)(11), - inputMatrix(8)(12), - inputMatrix(8)(13), - inputMatrix(9)(9), - inputMatrix(9)(10), - inputMatrix(9)(11), - inputMatrix(9)(12), - inputMatrix(9)(13), - inputMatrix(10)(9), - inputMatrix(10)(10), - inputMatrix(10)(11), - inputMatrix(10)(12), - inputMatrix(10)(13), - inputMatrix(11)(9), - inputMatrix(11)(10), - inputMatrix(11)(11), - inputMatrix(11)(12), - inputMatrix(11)(13), - inputMatrix(12)(9), - inputMatrix(12)(10), - inputMatrix(12)(11), - inputMatrix(12)(12), - inputMatrix(12)(13) + inputMatrix(8)(9), + inputMatrix(8)(10), + inputMatrix(8)(11), + inputMatrix(8)(12), + inputMatrix(8)(13), + inputMatrix(9)(9), + inputMatrix(9)(10), + inputMatrix(9)(11), + inputMatrix(9)(12), + inputMatrix(9)(13), + inputMatrix(10)(9), + inputMatrix(10)(10), + inputMatrix(10)(11), + inputMatrix(10)(12), + inputMatrix(10)(13), + inputMatrix(11)(9), + inputMatrix(11)(10), + inputMatrix(11)(11), + inputMatrix(11)(12), + inputMatrix(11)(13), + inputMatrix(12)(9), + inputMatrix(12)(10), + inputMatrix(12)(11), + inputMatrix(12)(12), + inputMatrix(12)(13) ).max.toDouble } diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvgBehaviors.scala 
b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvgBehaviors.scala index 611bf8f77..4b7943d8a 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvgBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_CombineAvgBehaviors.scala @@ -4,7 +4,7 @@ import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI import com.databricks.labs.mosaic.core.index.IndexSystem import com.databricks.labs.mosaic.functions.MosaicContext import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.functions.collect_set +import org.apache.spark.sql.functions.{collect_list, collect_set} import org.scalatest.matchers.should.Matchers._ trait RST_CombineAvgBehaviors extends QueryTest { @@ -28,7 +28,7 @@ trait RST_CombineAvgBehaviors extends QueryTest { .select("path", "tiles") .groupBy("path") .agg( - rst_combineavg(collect_set($"tiles")).as("tiles") + rst_combineavg(collect_list($"tiles")).as("tiles") ) .select("tiles") @@ -38,7 +38,7 @@ trait RST_CombineAvgBehaviors extends QueryTest { //noException should be thrownBy spark.sql(""" - |select rst_combineavg(collect_set(tiles)) as tiles + |select rst_combineavg(collect_list(tiles)) as tiles |from ( | select path, rst_tessellate(tile, 2) as tiles | from source diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBandBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBandBehaviors.scala index ef6466a88..d883fc5cc 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBandBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_DerivedBandBehaviors.scala @@ -4,7 +4,7 @@ import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI import com.databricks.labs.mosaic.core.index.IndexSystem import com.databricks.labs.mosaic.functions.MosaicContext import org.apache.spark.sql.QueryTest -import 
org.apache.spark.sql.functions.{collect_set, lit} +import org.apache.spark.sql.functions.{collect_list, lit} import org.scalatest.matchers.should.Matchers._ trait RST_DerivedBandBehaviors extends QueryTest { @@ -40,7 +40,7 @@ trait RST_DerivedBandBehaviors extends QueryTest { .select("path", "tiles") .groupBy("path") .agg( - rst_derivedband(collect_set($"tiles"), lit(pyFuncCode), lit(funcName)).as("tiles") + rst_derivedband(collect_list($"tiles"), lit(pyFuncCode), lit(funcName)).as("tiles") ) .select("tiles") @@ -52,7 +52,7 @@ trait RST_DerivedBandBehaviors extends QueryTest { noException should be thrownBy spark.sql( """ |select rst_derivedband( - | collect_set(tiles), + | collect_list(tiles), |" |import numpy as np |def multiply(in_ar, out_ar, xoff, yoff, xsize, ysize, raster_xsize,raster_ysize, buf_radius, gt, **kwargs): diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeBehaviors.scala index 893d6bdf4..330345f20 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_MergeBehaviors.scala @@ -4,7 +4,7 @@ import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI import com.databricks.labs.mosaic.core.index.IndexSystem import com.databricks.labs.mosaic.functions.MosaicContext import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.functions.collect_set +import org.apache.spark.sql.functions.collect_list import org.scalatest.matchers.should.Matchers._ trait RST_MergeBehaviors extends QueryTest { @@ -29,7 +29,7 @@ trait RST_MergeBehaviors extends QueryTest { .select("path", "tile") .groupBy("path") .agg( - collect_set("tile").as("tiles") + collect_list("tile").as("tiles") ) .select( rst_merge($"tiles").as("tile") @@ -41,7 +41,7 @@ trait RST_MergeBehaviors extends QueryTest { spark.sql(""" |select rst_merge(tiles) as 
tile |from ( - | select collect_set(tile) as tiles + | select collect_list(tile) as tiles | from ( | select path, rst_tessellate(tile, 3) as tile | from source @@ -55,7 +55,7 @@ trait RST_MergeBehaviors extends QueryTest { .select("path", "tile") .groupBy("path") .agg( - collect_set("tile").as("tiles") + collect_list("tile").as("tiles") ) .select( rst_merge($"tiles").as("tile") diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TessellateBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TessellateBehaviors.scala index 050e5cb4d..38d3fc778 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TessellateBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TessellateBehaviors.scala @@ -42,7 +42,7 @@ trait RST_TessellateBehaviors extends QueryTest { val result = gridTiles.select(explode(col("avg")).alias("a")).groupBy("a").count().collect() - result.length should be(462) + result.length should be(441) val netcdf = spark.read .format("gdal") diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TransformBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TransformBehaviors.scala new file mode 100644 index 000000000..9ce449b13 --- /dev/null +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TransformBehaviors.scala @@ -0,0 +1,49 @@ +package com.databricks.labs.mosaic.expressions.raster + +import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI +import com.databricks.labs.mosaic.core.index.IndexSystem +import com.databricks.labs.mosaic.functions.MosaicContext +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.functions._ +import org.scalatest.matchers.should.Matchers._ + +trait RST_TransformBehaviors extends QueryTest { + + // noinspection MapGetGet + def behavior(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = { + 
spark.sparkContext.setLogLevel("FATAL") + val mc = MosaicContext.build(indexSystem, geometryAPI) + mc.register() + val sc = spark + import mc.functions._ + import sc.implicits._ + + val rastersInMemory = spark.read + .format("gdal") + .option("raster_storage", "in-memory") + .load("src/test/resources/modis") + + val gridTiles = rastersInMemory + .withColumn("tile", rst_transform($"tile", lit(27700))) + .withColumn("bbox", st_aswkt(rst_boundingbox($"tile"))) + .select("bbox", "path", "tile") + .withColumn("avg", rst_avg($"tile")) + + rastersInMemory + .createOrReplaceTempView("source") + + noException should be thrownBy spark.sql(""" + |select rst_transform(tile, 27700) from source + |""".stripMargin) + + noException should be thrownBy rastersInMemory + .withColumn("tile", rst_transform($"tile", lit(27700))) + .select("tile") + + val result = gridTiles.select(explode(col("avg")).alias("a")).groupBy("a").count().collect() + + result.length should be(7) + + } + +} diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TransformTest.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TransformTest.scala new file mode 100644 index 000000000..b7c10e548 --- /dev/null +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_TransformTest.scala @@ -0,0 +1,32 @@ +package com.databricks.labs.mosaic.expressions.raster + +import com.databricks.labs.mosaic.core.geometry.api.JTS +import com.databricks.labs.mosaic.core.index.H3IndexSystem +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSessionGDAL + +import scala.util.Try + +class RST_TransformTest extends QueryTest with SharedSparkSessionGDAL with RST_TransformBehaviors { + + private val noCodegen = + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", + SQLConf.CODEGEN_FACTORY_MODE.key -> 
CodegenObjectFactoryMode.NO_CODEGEN.toString + ) _ + + // Hotfix for SharedSparkSession afterAll cleanup. + override def afterAll(): Unit = Try(super.afterAll()) + + // These tests are not index system nor geometry API specific. + // Only testing one pairing is sufficient. + test("Testing RST_Transform with manual GDAL registration (H3, JTS).") { + noCodegen { + assume(System.getProperty("os.name") == "Linux") + behavior(H3IndexSystem, JTS) + } + } + +}