Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[generator:regions] Speedup locality index building: parallel regions stripping #79

Merged
merged 1 commit into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions base/thread_pool_computational.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>
#include <queue>
#include <thread>
#include <vector>

namespace base
{
Expand Down Expand Up @@ -100,6 +101,23 @@ class ThreadPool
m_condition.notify_one();
}

// Submit min(|workersCountHint|, Size()) tasks and wait completions.
// func - task to be performed.
// Warning: If the thread pool is stopped then the call will be ignored.
template <typename F, typename... Args>
void PerformParallelWorks(F && func, size_t workersCountHint)
{
size_t const workersCount = std::min(std::max(size_t{1}, workersCountHint), Size());

std::vector<std::future<void>> workers{};
workers.reserve(workersCount);
for (size_t i = 0; i < workersCount; ++i)
workers.push_back(Submit(func));

for (auto & worker : workers)
worker.wait();
}

// Stop a ThreadPool.
// Removes the tasks that are not yet started from the queue.
// Unlike the destructor, this function does not wait for all runnables to complete:
Expand All @@ -125,6 +143,8 @@ class ThreadPool
m_joiner.Join();
}

size_t Size() const noexcept { return m_threads.size(); }

private:
void Worker()
{
Expand Down
20 changes: 13 additions & 7 deletions generator/covering_index_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "base/logging.hpp"
#include "base/scope_guard.hpp"
#include "base/string_utils.hpp"
#include "base/thread_utils.hpp"
#include "base/thread_pool_computational.hpp"
#include "base/timer.hpp"

#include "defines.hpp"
Expand All @@ -43,7 +43,8 @@ namespace generator
class CoveredObjectBuilder
{
public:
CoveredObjectBuilder()
CoveredObjectBuilder(base::thread_pool::computational::ThreadPool & threadPool)
: m_threadPool(threadPool)
{
m_header.SetGeometryCodingParams(serial::GeometryCodingParams());
m_header.SetScales({scales::GetUpperScale()});
Expand Down Expand Up @@ -125,7 +126,7 @@ class CoveredObjectBuilder

if (points.size() > 2)
{
if (!holder.TryToMakeStrip(points))
if (!holder.TryToMakeStrip(points, m_threadPool))
{
m2::ConvexHull hull(points, 1e-16);
vector<m2::PointD> hullPoints = hull.Points();
Expand All @@ -151,20 +152,22 @@ class CoveredObjectBuilder
DataHeader m_header;
indexer::CoveredObject m_coveredObject;
buffer_vector<m2::PointD, 32> m_pointsBuffer;
base::thread_pool::computational::ThreadPool & m_threadPool;
};

template <typename FeatureFilter, typename IndexBuilder>
void CoverFeatures(
std::string const & featuresFile, FeatureFilter && featureFilter,
IndexBuilder && indexBuilder, unsigned int threadsCount, uint64_t chunkFeaturesCount,
base::thread_pool::computational::ThreadPool & threadPool,
covering::ObjectsCovering & objectsCovering)
{
std::list<covering::ObjectsCovering> coveringsParts{};
auto makeProcessor = [&] {
coveringsParts.emplace_back();
auto & covering = coveringsParts.back();

CoveredObjectBuilder localityObjectBuilder;
CoveredObjectBuilder localityObjectBuilder{threadPool};
auto processor = [featureFilter, &indexBuilder, &covering, localityObjectBuilder]
(FeatureBuilder & fb, uint64_t /* currPos */) mutable
{
Expand Down Expand Up @@ -229,11 +232,13 @@ bool ParseNodes(string nodesFile, set<uint64_t> & nodeIds)
bool GenerateRegionsIndex(std::string const & outPath, std::string const & featuresFile,
unsigned int threadsCount)
{
base::thread_pool::computational::ThreadPool threadPool{threadsCount};
auto const featuresFilter = [](FeatureBuilder & fb) { return fb.IsArea(); };
indexer::RegionsIndexBuilder indexBuilder;

covering::ObjectsCovering objectsCovering;
CoverFeatures(featuresFile, featuresFilter, indexBuilder, threadsCount,
1 /* chunkFeaturesCount */, objectsCovering);
1 /* chunkFeaturesCount */, threadPool, objectsCovering);

LOG(LINFO, ("Build locality index..."));
if (!indexBuilder.BuildCoveringIndex(std::move(objectsCovering), outPath))
Expand All @@ -248,6 +253,7 @@ bool GenerateGeoObjectsIndex(
boost::optional<std::string> const & nodesFile,
boost::optional<std::string> const & streetsFeaturesFile)
{
base::thread_pool::computational::ThreadPool threadPool{threadsCount};
covering::ObjectsCovering objectsCovering;
indexer::GeoObjectsIndexBuilder indexBuilder;

Expand All @@ -268,7 +274,7 @@ bool GenerateGeoObjectsIndex(
};

CoverFeatures(geoObjectsFeaturesFile, geoObjectsFilter, indexBuilder, threadsCount,
10 /* chunkFeaturesCount */, objectsCovering);
10 /* chunkFeaturesCount */, threadPool, objectsCovering);

if (streetsFeaturesFile)
{
Expand All @@ -278,7 +284,7 @@ bool GenerateGeoObjectsIndex(
};

CoverFeatures(*streetsFeaturesFile, streetsFilter, indexBuilder, threadsCount,
1 /* chunkFeaturesCount */, objectsCovering);
1 /* chunkFeaturesCount */, threadPool, objectsCovering);
}

LOG(LINFO, ("Build objects index..."));
Expand Down
31 changes: 27 additions & 4 deletions generator/geometry_holder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "indexer/data_header.hpp"

#include "base/thread_pool_computational.hpp"

#include <cstdint>
#include <functional>
#include <limits>
Expand Down Expand Up @@ -79,6 +81,30 @@ class GeometryHolder
bool NeedProcessTriangles() const { return !m_trgInner || m_buffer.m_innerTrg.empty(); }

bool TryToMakeStrip(Points & points)
{
auto singleStripFinder = [](Points & points) {
auto visiblityInspector =
IsDiagonalVisibleFunctor<Points::const_iterator>(points.begin(), points.end());
return FindSingleStrip(points.size(), visiblityInspector);
};

return TryToMakeSingleStrip(points, singleStripFinder);
}

bool TryToMakeStrip(Points & points,
base::thread_pool::computational::ThreadPool & threadPool)
{
auto singleStripFinder = [&threadPool](Points & points) {
auto visiblityInspector =
IsDiagonalVisibleFunctor<Points::const_iterator>(points.begin(), points.end());
return FindSingleStrip(points.size(), visiblityInspector, threadPool);
};

return TryToMakeSingleStrip(points, singleStripFinder);
}

template <typename SingleStripFinder>
bool TryToMakeSingleStrip(Points & points, SingleStripFinder && singleStripFinder)
{
size_t const count = points.size();
if (!m_trgInner || (count >= 2 && count - 2 > m_maxNumTriangles))
Expand Down Expand Up @@ -109,9 +135,7 @@ class GeometryHolder
return false;
}

size_t const index = FindSingleStrip(
count, IsDiagonalVisibleFunctor<Points::const_iterator>(points.begin(), points.end()));

size_t const index = singleStripFinder(points);
if (index == count)
{
// can't find strip
Expand All @@ -125,7 +149,6 @@ class GeometryHolder
return true;
}


private:
class StripEmitter
{
Expand Down
40 changes: 37 additions & 3 deletions geometry/polygon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
#include "base/base.hpp"
#include "base/math.hpp"
#include "base/stl_helpers.hpp"
#include "base/thread_pool_computational.hpp"

#include <iterator>
#include <limits>

template <typename IsVisibleF>
bool FindSingleStripForIndex(size_t i, size_t n, IsVisibleF isVisible)
bool FindSingleStripForIndex(size_t i, size_t n, IsVisibleF && isVisible)
{
// Searching for a strip only in a single direction, because the opposite direction
// is traversed from the last vertex of the possible strip.
Expand All @@ -35,17 +36,50 @@ bool FindSingleStripForIndex(size_t i, size_t n, IsVisibleF isVisible)

// If polygon with n vertices is a single strip, return the start index of the strip or n otherwise.
template <typename IsVisibleF>
size_t FindSingleStrip(size_t n, IsVisibleF isVisible)
size_t FindSingleStrip(size_t n, IsVisibleF && isVisible)
{
for (size_t i = 0; i < n; ++i)
{
if (FindSingleStripForIndex(i, n, isVisible))
return i;
}

return n;
}

// If polygon with n vertices is a single strip, return the start index of the strip or n otherwise.
template <typename IsVisibleF>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/IsVisibleF/IsVisibleFn/

size_t FindSingleStrip(size_t n, IsVisibleF && isVisible,
base::thread_pool::computational::ThreadPool & threadPool)
{
size_t const & tasksCount = n * n * n / 100'000'000;
if (tasksCount <= 1)
return FindSingleStrip(n, isVisible);

// Try to find instantly.
if (FindSingleStripForIndex(0, n, isVisible))
return 0;

std::atomic_size_t foundIndex{0};
std::atomic_size_t unprocessedIndex{1};
auto find = [n, &isVisible, &foundIndex, &unprocessedIndex]() {
while (foundIndex)
{
auto const i = unprocessedIndex++;
if (i >= n)
return;

if (FindSingleStripForIndex(i, n, isVisible))
foundIndex = i;
}
};

size_t const tasksPerWorker = 10;
size_t const workersCount = std::max(size_t{2}, tasksCount / tasksPerWorker);
threadPool.PerformParallelWorks(find, workersCount);

return foundIndex ? foundIndex.load() : n;
}

#ifdef DEBUG
template <typename IterT> bool TestPolygonPreconditions(IterT beg, IterT end)
{
Expand Down