Skip to content

Commit

Permalink
[generator:regions] Speedup locality index building: parallel regions…
Browse files Browse the repository at this point in the history
… stripping
  • Loading branch information
Anatoly Serdtcev committed Jan 13, 2020
1 parent 7305cc1 commit 2aaf3a0
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 14 deletions.
20 changes: 20 additions & 0 deletions base/thread_pool_computational.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>
#include <queue>
#include <thread>
#include <vector>

namespace base
{
Expand Down Expand Up @@ -100,6 +101,23 @@ class ThreadPool
m_condition.notify_one();
}

// Submit min(|workersCountHint|, Size()) tasks and wait completions.
// func - task to be performed.
// Warning: If the thread pool is stopped then the call will be ignored.
template <typename F, typename... Args>
void PerformParallelWorks(F && func, size_t workersCountHint)
{
size_t const workersCount = std::min(std::max(size_t{1}, workersCountHint), Size());

std::vector<std::future<void>> workers{};
workers.reserve(workersCount);
for (size_t i = 0; i < workersCount; ++i)
workers.push_back(Submit(func));

for (auto & worker : workers)
worker.wait();
}

// Stop a ThreadPool.
// Removes the tasks that are not yet started from the queue.
// Unlike the destructor, this function does not wait for all runnables to complete:
Expand All @@ -125,6 +143,8 @@ class ThreadPool
m_joiner.Join();
}

size_t Size() const noexcept { return m_threads.size(); }

private:
void Worker()
{
Expand Down
20 changes: 13 additions & 7 deletions generator/covering_index_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "base/logging.hpp"
#include "base/scope_guard.hpp"
#include "base/string_utils.hpp"
#include "base/thread_utils.hpp"
#include "base/thread_pool_computational.hpp"
#include "base/timer.hpp"

#include "defines.hpp"
Expand All @@ -43,7 +43,8 @@ namespace generator
class CoveredObjectBuilder
{
public:
CoveredObjectBuilder()
CoveredObjectBuilder(base::thread_pool::computational::ThreadPool & threadPool)
: m_threadPool(threadPool)
{
m_header.SetGeometryCodingParams(serial::GeometryCodingParams());
m_header.SetScales({scales::GetUpperScale()});
Expand Down Expand Up @@ -125,7 +126,7 @@ class CoveredObjectBuilder

if (points.size() > 2)
{
if (!holder.TryToMakeStrip(points))
if (!holder.TryToMakeStrip(points, m_threadPool))
{
m2::ConvexHull hull(points, 1e-16);
vector<m2::PointD> hullPoints = hull.Points();
Expand All @@ -151,20 +152,22 @@ class CoveredObjectBuilder
DataHeader m_header;
indexer::CoveredObject m_coveredObject;
buffer_vector<m2::PointD, 32> m_pointsBuffer;
base::thread_pool::computational::ThreadPool & m_threadPool;
};

template <typename FeatureFilter, typename IndexBuilder>
void CoverFeatures(
std::string const & featuresFile, FeatureFilter && featureFilter,
IndexBuilder && indexBuilder, unsigned int threadsCount, uint64_t chunkFeaturesCount,
base::thread_pool::computational::ThreadPool & threadPool,
covering::ObjectsCovering & objectsCovering)
{
std::list<covering::ObjectsCovering> coveringsParts{};
auto makeProcessor = [&] {
coveringsParts.emplace_back();
auto & covering = coveringsParts.back();

CoveredObjectBuilder localityObjectBuilder;
CoveredObjectBuilder localityObjectBuilder{threadPool};
auto processor = [featureFilter, &indexBuilder, &covering, localityObjectBuilder]
(FeatureBuilder & fb, uint64_t /* currPos */) mutable
{
Expand Down Expand Up @@ -229,11 +232,13 @@ bool ParseNodes(string nodesFile, set<uint64_t> & nodeIds)
bool GenerateRegionsIndex(std::string const & outPath, std::string const & featuresFile,
unsigned int threadsCount)
{
base::thread_pool::computational::ThreadPool threadPool{threadsCount};
auto const featuresFilter = [](FeatureBuilder & fb) { return fb.IsArea(); };
indexer::RegionsIndexBuilder indexBuilder;

covering::ObjectsCovering objectsCovering;
CoverFeatures(featuresFile, featuresFilter, indexBuilder, threadsCount,
1 /* chunkFeaturesCount */, objectsCovering);
1 /* chunkFeaturesCount */, threadPool, objectsCovering);

LOG(LINFO, ("Build locality index..."));
if (!indexBuilder.BuildCoveringIndex(std::move(objectsCovering), outPath))
Expand All @@ -248,6 +253,7 @@ bool GenerateGeoObjectsIndex(
boost::optional<std::string> const & nodesFile,
boost::optional<std::string> const & streetsFeaturesFile)
{
base::thread_pool::computational::ThreadPool threadPool{threadsCount};
covering::ObjectsCovering objectsCovering;
indexer::GeoObjectsIndexBuilder indexBuilder;

Expand All @@ -268,7 +274,7 @@ bool GenerateGeoObjectsIndex(
};

CoverFeatures(geoObjectsFeaturesFile, geoObjectsFilter, indexBuilder, threadsCount,
10 /* chunkFeaturesCount */, objectsCovering);
10 /* chunkFeaturesCount */, threadPool, objectsCovering);

if (streetsFeaturesFile)
{
Expand All @@ -278,7 +284,7 @@ bool GenerateGeoObjectsIndex(
};

CoverFeatures(*streetsFeaturesFile, streetsFilter, indexBuilder, threadsCount,
1 /* chunkFeaturesCount */, objectsCovering);
1 /* chunkFeaturesCount */, threadPool, objectsCovering);
}

LOG(LINFO, ("Build objects index..."));
Expand Down
31 changes: 27 additions & 4 deletions generator/geometry_holder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "indexer/data_header.hpp"

#include "base/thread_pool_computational.hpp"

#include <cstdint>
#include <functional>
#include <limits>
Expand Down Expand Up @@ -79,6 +81,30 @@ class GeometryHolder
bool NeedProcessTriangles() const { return !m_trgInner || m_buffer.m_innerTrg.empty(); }

bool TryToMakeStrip(Points & points)
{
auto singleStripFinder = [](Points & points) {
auto visiblityInspector =
IsDiagonalVisibleFunctor<Points::const_iterator>(points.begin(), points.end());
return FindSingleStrip(points.size(), visiblityInspector);
};

return TryToMakeSingleStrip(points, singleStripFinder);
}

bool TryToMakeStrip(Points & points,
base::thread_pool::computational::ThreadPool & threadPool)
{
auto singleStripFinder = [&threadPool](Points & points) {
auto visiblityInspector =
IsDiagonalVisibleFunctor<Points::const_iterator>(points.begin(), points.end());
return FindSingleStrip(points.size(), visiblityInspector, threadPool);
};

return TryToMakeSingleStrip(points, singleStripFinder);
}

template <typename SingleStripFinder>
bool TryToMakeSingleStrip(Points & points, SingleStripFinder && singleStripFinder)
{
size_t const count = points.size();
if (!m_trgInner || (count >= 2 && count - 2 > m_maxNumTriangles))
Expand Down Expand Up @@ -109,9 +135,7 @@ class GeometryHolder
return false;
}

size_t const index = FindSingleStrip(
count, IsDiagonalVisibleFunctor<Points::const_iterator>(points.begin(), points.end()));

size_t const index = singleStripFinder(points);
if (index == count)
{
// can't find strip
Expand All @@ -125,7 +149,6 @@ class GeometryHolder
return true;
}


private:
class StripEmitter
{
Expand Down
40 changes: 37 additions & 3 deletions geometry/polygon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
#include "base/base.hpp"
#include "base/math.hpp"
#include "base/stl_helpers.hpp"
#include "base/thread_pool_computational.hpp"

#include <iterator>
#include <limits>

template <typename IsVisibleF>
bool FindSingleStripForIndex(size_t i, size_t n, IsVisibleF isVisible)
bool FindSingleStripForIndex(size_t i, size_t n, IsVisibleF && isVisible)
{
// Searching for a strip only in a single direction, because the opposite direction
// is traversed from the last vertex of the possible strip.
Expand All @@ -35,17 +36,50 @@ bool FindSingleStripForIndex(size_t i, size_t n, IsVisibleF isVisible)

// If polygon with n vertices is a single strip, return the start index of the strip or n otherwise.
template <typename IsVisibleF>
size_t FindSingleStrip(size_t n, IsVisibleF isVisible)
size_t FindSingleStrip(size_t n, IsVisibleF && isVisible)
{
for (size_t i = 0; i < n; ++i)
{
if (FindSingleStripForIndex(i, n, isVisible))
return i;
}

return n;
}

// If polygon with n vertices is a single strip, return the start index of the strip or n otherwise.
template <typename IsVisibleF>
size_t FindSingleStrip(size_t n, IsVisibleF && isVisible,
base::thread_pool::computational::ThreadPool & threadPool)
{
size_t const & tasksCount = n * n * n / 100'000'000;
if (tasksCount <= 1)
return FindSingleStrip(n, isVisible);

// Try to find instantly.
if (FindSingleStripForIndex(0, n, isVisible))
return 0;

std::atomic_size_t foundIndex{0};
std::atomic_size_t unprocessedIndex{1};
auto find = [n, &isVisible, &foundIndex, &unprocessedIndex]() {
while (foundIndex)
{
auto const i = unprocessedIndex++;
if (i >= n)
return;

if (FindSingleStripForIndex(i, n, isVisible))
foundIndex = i;
}
};

size_t const tasksPerWorker = 10;
size_t const workersCount = std::max(size_t{2}, tasksCount / tasksPerWorker);
threadPool.PerformParallelWorks(find, workersCount);

return foundIndex ? foundIndex.load() : n;
}

#ifdef DEBUG
template <typename IterT> bool TestPolygonPreconditions(IterT beg, IterT end)
{
Expand Down

0 comments on commit 2aaf3a0

Please sign in to comment.