Skip to content

Commit

Permalink
Merge pull request OSGeo#11915 from rouault/fix_11904
Browse files Browse the repository at this point in the history
VRT and multitthreaded warp: propagate errors from auxiliary threads to main thread
  • Loading branch information
rouault authored Mar 5, 2025
2 parents cf9dc1a + 4970a4f commit 94f13bf
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 288 deletions.
49 changes: 29 additions & 20 deletions alg/gdalwarpoperation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "cpl_config.h"
#include "cpl_conv.h"
#include "cpl_error.h"
#include "cpl_error_internal.h"
#include "cpl_mask.h"
#include "cpl_multiproc.h"
#include "cpl_string.h"
Expand Down Expand Up @@ -1044,20 +1045,22 @@ CPLErr GDALChunkAndWarpImage(GDALWarpOperationH hOperation, int nDstXOff,
/* ChunkThreadMain() */
/************************************************************************/

typedef struct
struct ChunkThreadData
{
GDALWarpOperation *poOperation;
GDALWarpChunk *pasChunkInfo;
CPLJoinableThread *hThreadHandle;
CPLErr eErr;
double dfProgressBase;
double dfProgressScale;
CPLMutex *hIOMutex;

CPLMutex *hCondMutex;
volatile int bIOMutexTaken;
CPLCond *hCond;
} ChunkThreadData;
GDALWarpOperation *poOperation = nullptr;
GDALWarpChunk *pasChunkInfo = nullptr;
CPLJoinableThread *hThreadHandle = nullptr;
CPLErr eErr = CE_None;
double dfProgressBase = 0;
double dfProgressScale = 0;
CPLMutex *hIOMutex = nullptr;

CPLMutex *hCondMutex = nullptr;
volatile int bIOMutexTaken = 0;
CPLCond *hCond = nullptr;

CPLErrorAccumulator *poErrorAccumulator = nullptr;
};

static void ChunkThreadMain(void *pThreadData)

Expand Down Expand Up @@ -1086,6 +1089,10 @@ static void ChunkThreadMain(void *pThreadData)
CPLReleaseMutex(psData->hCondMutex);
}

auto oAccumulator =
psData->poErrorAccumulator->InstallForCurrentScope();
CPL_IGNORE_RET_VAL(oAccumulator);

psData->eErr = psData->poOperation->WarpRegion(
pasChunkInfo->dx, pasChunkInfo->dy, pasChunkInfo->dsx,
pasChunkInfo->dsy, pasChunkInfo->sx, pasChunkInfo->sy,
Expand Down Expand Up @@ -1150,13 +1157,13 @@ CPLErr GDALWarpOperation::ChunkAndWarpMulti(int nDstXOff, int nDstYOff,
/* information for each region. */
/* -------------------------------------------------------------------- */
ChunkThreadData volatile asThreadData[2] = {};
memset(reinterpret_cast<void *>(
const_cast<ChunkThreadData(*)[2]>(&asThreadData)),
0, sizeof(asThreadData));
asThreadData[0].poOperation = this;
asThreadData[0].hIOMutex = hIOMutex;
asThreadData[1].poOperation = this;
asThreadData[1].hIOMutex = hIOMutex;
CPLErrorAccumulator oErrorAccumulator;
for (int i = 0; i < 2; ++i)
{
asThreadData[i].poOperation = this;
asThreadData[i].hIOMutex = hIOMutex;
asThreadData[i].poErrorAccumulator = &oErrorAccumulator;
}

double dfPixelsProcessed = 0.0;
double dfTotalPixels = static_cast<double>(nDstXSize) * nDstYSize;
Expand Down Expand Up @@ -1260,6 +1267,8 @@ CPLErr GDALWarpOperation::ChunkAndWarpMulti(int nDstXOff, int nDstYOff,

WipeChunkList();

oErrorAccumulator.ReplayErrors();

psOptions->pfnProgress(1.0, "", psOptions->pProgressArg);

return eErr;
Expand Down
21 changes: 9 additions & 12 deletions apps/gdalwarp_bin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,17 @@ MAIN_START(argc, argv)
}
else
{
std::vector<CPLErrorHandlerAccumulatorStruct> aoErrors;
CPLInstallErrorHandlerAccumulator(aoErrors);
hDstDS = GDALOpenEx(
sOptionsForBinary.osDstFilename.c_str(),
GDAL_OF_RASTER | GDAL_OF_VERBOSE_ERROR | GDAL_OF_UPDATE, nullptr,
sOptionsForBinary.aosDestOpenOptions.List(), nullptr);
CPLUninstallErrorHandlerAccumulator();
CPLErrorAccumulator oErrorAccumulator;
{
auto oAccumulator = oErrorAccumulator.InstallForCurrentScope();
hDstDS = GDALOpenEx(
sOptionsForBinary.osDstFilename.c_str(),
GDAL_OF_RASTER | GDAL_OF_VERBOSE_ERROR | GDAL_OF_UPDATE,
nullptr, sOptionsForBinary.aosDestOpenOptions.List(), nullptr);
}
if (hDstDS != nullptr)
{
for (size_t i = 0; i < aoErrors.size(); i++)
{
CPLError(aoErrors[i].type, aoErrors[i].no, "%s",
aoErrors[i].msg.c_str());
}
oErrorAccumulator.ReplayErrors();
}
}

Expand Down
27 changes: 27 additions & 0 deletions autotest/alg/warp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1964,3 +1964,30 @@ def test_warp_nodata_substitution(dt, expected_val, resampling):
struct.unpack("d", out_ds.ReadRaster(0, 0, 1, 1, buf_type=gdal.GDT_Float64))[0]
== expected_val
)


###############################################################################
# Test propagation of errors from I/O threads to main thread in multi-threaded reading


@gdaltest.enable_exceptions()
def test_warp_multi_threaded_errors(tmp_vsimem):

filename1 = str(tmp_vsimem / "tmp1.tif")
ds = gdal.GetDriverByName("GTiff").Create(filename1, 1, 1)
ds.SetGeoTransform([2, 1, 0, 49, 0, -1])
ds.Close()

filename2 = str(tmp_vsimem / "tmp2.tif")
ds = gdal.GetDriverByName("GTiff").Create(filename2, 1, 1)
ds.SetGeoTransform([3, 1, 0, 49, 0, -1])
ds.Close()

vrt_filename = str(tmp_vsimem / "tmp.vrt")
gdal.BuildVRT(vrt_filename, [filename1, filename2])

gdal.Unlink(filename2)

with gdal.Open(vrt_filename) as ds:
with pytest.raises(Exception):
gdal.Warp("", ds, format="MEM", multithread=True)
27 changes: 27 additions & 0 deletions autotest/gcore/vrt_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -2805,6 +2805,33 @@ def test_vrt_read_multi_threaded_disabled_since_overlapping_sources():
)


###############################################################################
# Test propagation of errors from threads to main thread in multi-threaded reading


@gdaltest.enable_exceptions()
def test_vrt_read_multi_threaded_errors(tmp_vsimem):

filename1 = str(tmp_vsimem / "tmp1.tif")
ds = gdal.GetDriverByName("GTiff").Create(filename1, 1, 1)
ds.SetGeoTransform([2, 1, 0, 49, 0, -1])
ds.Close()

filename2 = str(tmp_vsimem / "tmp2.tif")
ds = gdal.GetDriverByName("GTiff").Create(filename2, 1, 1)
ds.SetGeoTransform([3, 1, 0, 49, 0, -1])
ds.Close()

vrt_filename = str(tmp_vsimem / "tmp.vrt")
gdal.BuildVRT(vrt_filename, [filename1, filename2])

gdal.Unlink(filename2)

with gdal.Open(vrt_filename) as ds:
with pytest.raises(Exception):
ds.GetRasterBand(1).ReadRaster()


###############################################################################
# Test reading a VRT with a <VRTDataset> inside a <SimpleSource>

Expand Down
32 changes: 7 additions & 25 deletions frmts/gti/gdaltileindexdataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,11 @@ class GDALTileIndexDataset final : public GDALPamDataset
{
std::atomic<int> *pnCompletedJobs = nullptr;
std::atomic<bool> *pbSuccess = nullptr;
CPLErrorAccumulator *poErrorAccumulator = nullptr;
GDALTileIndexDataset *poDS = nullptr;
GDALTileIndexDataset::QueueWorkingStates *poQueueWorkingStates =
nullptr;
int nBandNrMax = 0;
std::string *posErrorMsg = nullptr;

int nXOff = 0;
int nYOff = 0;
Expand Down Expand Up @@ -4624,6 +4624,7 @@ CPLErr GDALTileIndexDataset::IRasterIO(

if (m_bLastMustUseMultiThreading)
{
CPLErrorAccumulator oErrorAccumulator;
std::atomic<bool> bSuccess = true;
const int nContributingSources =
static_cast<int>(m_aoSourceDesc.size());
Expand Down Expand Up @@ -4654,16 +4655,15 @@ CPLErr GDALTileIndexDataset::IRasterIO(

auto oQueue = psThreadPool->CreateJobQueue();
std::atomic<int> nCompletedJobs = 0;
std::string osErrorMsg;
for (auto &oSourceDesc : m_aoSourceDesc)
{
auto psJob = new RasterIOJob();
psJob->poDS = this;
psJob->pbSuccess = &bSuccess;
psJob->poErrorAccumulator = &oErrorAccumulator;
psJob->pnCompletedJobs = &nCompletedJobs;
psJob->poQueueWorkingStates = &m_oQueueWorkingStates;
psJob->nBandNrMax = nBandNrMax;
psJob->posErrorMsg = &osErrorMsg;
psJob->nXOff = nXOff;
psJob->nYOff = nYOff;
psJob->nXSize = nXSize;
Expand Down Expand Up @@ -4702,10 +4702,7 @@ CPLErr GDALTileIndexDataset::IRasterIO(
}
}

if (!osErrorMsg.empty())
{
CPLError(CE_Failure, CPLE_AppDefined, "%s", osErrorMsg.c_str());
}
oErrorAccumulator.ReplayErrors();

if (bSuccess && psExtraArg->pfnProgress)
{
Expand Down Expand Up @@ -4754,22 +4751,16 @@ void GDALTileIndexDataset::RasterIOJob::Func(void *pData)

SourceDesc oSourceDesc;

std::vector<CPLErrorHandlerAccumulatorStruct> aoErrors;
CPLInstallErrorHandlerAccumulator(aoErrors);
auto oAccumulator = psJob->poErrorAccumulator->InstallForCurrentScope();
CPL_IGNORE_RET_VAL(oAccumulator);

const bool bCanOpenSource =
psJob->poDS->GetSourceDesc(osTileName, oSourceDesc,
&psJob->poQueueWorkingStates->oMutex) &&
oSourceDesc.poDS;
CPLUninstallErrorHandlerAccumulator();

if (!bCanOpenSource)
{
if (!aoErrors.empty())
{
std::lock_guard oLock(psJob->poQueueWorkingStates->oMutex);
if (psJob->posErrorMsg->empty())
*(psJob->posErrorMsg) = aoErrors.back().msg;
}
*psJob->pbSuccess = false;
}
else
Expand Down Expand Up @@ -4799,8 +4790,6 @@ void GDALTileIndexDataset::RasterIOJob::Func(void *pData)
dfYSize = psJob->psExtraArg->dfYSize;
}

aoErrors.clear();
CPLInstallErrorHandlerAccumulator(aoErrors);
const bool bRenderOK =
psJob->poDS->RenderSource(
oSourceDesc, /*bNeedInitBuffer = */ true, psJob->nBandNrMax,
Expand All @@ -4810,16 +4799,9 @@ void GDALTileIndexDataset::RasterIOJob::Func(void *pData)
psJob->nBandCount, psJob->panBandMap, psJob->nPixelSpace,
psJob->nLineSpace, psJob->nBandSpace, &sArg,
*(poWorkingState.get())) == CE_None;
CPLUninstallErrorHandlerAccumulator();

if (!bRenderOK)
{
if (!aoErrors.empty())
{
std::lock_guard oLock(psJob->poQueueWorkingStates->oMutex);
if (psJob->posErrorMsg->empty())
*(psJob->posErrorMsg) = aoErrors.back().msg;
}
*psJob->pbSuccess = false;
}

Expand Down
Loading

0 comments on commit 94f13bf

Please sign in to comment.