From 0705ba0312b3e0965efcf5fc25f3d8e3472baf7e Mon Sep 17 00:00:00 2001 From: Hong Date: Tue, 8 Jun 2021 17:33:56 +0800 Subject: [PATCH] [SQL-DS-CACHE-136]Update the documents for OAP 1.1.1 (#138) * [SQL-DS-CACHE-136]Update the documents for OAP 1.1.1 * [SQL-DS-CACHE-136]Update the documents for OAP 1.1.1 and remote additional cache strategies info * [SQL-DS-CACHE-136]Modify the developer guide * Update the OAP Developer Guide --- CHANGELOG.md | 154 +++++++++++- docs/Advanced-Configuration.md | 284 --------------------- docs/Developer-Guide.md | 110 ++------ docs/HCFS-User-Guide.md | 6 +- docs/OAP-Developer-Guide.md | 49 ++-- docs/OAP-Installation-Guide.md | 8 +- docs/User-Guide.md | 29 +-- docs/numa-binding-spark-3.0.0.patch | 378 ---------------------------- mkdocs.yml | 2 +- 9 files changed, 197 insertions(+), 823 deletions(-) delete mode 100644 docs/numa-binding-spark-3.0.0.patch diff --git a/CHANGELOG.md b/CHANGELOG.md index 86b48d454..60fd14471 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,13 +1,151 @@ # Change log -Generated on 2021-04-29 +Generated on 2021-06-02 + +## Release 1.1.1 + +### Native SQL Engine + +#### Features +||| +|:---|:---| +|[#304](https://github.com/oap-project/native-sql-engine/issues/304)|Upgrade to Arrow 4.0.0| +|[#285](https://github.com/oap-project/native-sql-engine/issues/285)|ColumnarWindow: Support Date/Timestamp input in MAX/MIN| +|[#297](https://github.com/oap-project/native-sql-engine/issues/297)|Disable incremental compiler in CI| +|[#245](https://github.com/oap-project/native-sql-engine/issues/245)|Support columnar rdd cache| +|[#276](https://github.com/oap-project/native-sql-engine/issues/276)|Add option to switch Hadoop version| +|[#274](https://github.com/oap-project/native-sql-engine/issues/274)|Comment to trigger tpc-h RAM test| +|[#256](https://github.com/oap-project/native-sql-engine/issues/256)|CI: do not run ram report for each PR| + +#### Bugs Fixed +||| +|:---|:---| +|[#325](https://github.com/oap-project/native-sql-engine/issues/325)|java.util.ConcurrentModificationException: mutation occurred during iteration| +|[#329](https://github.com/oap-project/native-sql-engine/issues/329)|numPartitions are not the same| +|[#318](https://github.com/oap-project/native-sql-engine/issues/318)|fix Spark 311 on data source v2| +|[#311](https://github.com/oap-project/native-sql-engine/issues/311)|Build reports errors| +|[#302](https://github.com/oap-project/native-sql-engine/issues/302)|test on v2 failed due to an exception| +|[#257](https://github.com/oap-project/native-sql-engine/issues/257)|different version of slf4j-log4j| +|[#293](https://github.com/oap-project/native-sql-engine/issues/293)|Fix BHJ loss if key = 0| +|[#248](https://github.com/oap-project/native-sql-engine/issues/248)|arrow dependency must put after arrow installation| + +#### PRs +||| +|:---|:---| +|[#332](https://github.com/oap-project/native-sql-engine/pull/332)|[NSE-325] fix incremental compile issue with 4.5.x scala-maven-plugin| +|[#335](https://github.com/oap-project/native-sql-engine/pull/335)|[NSE-329] fix out partitioning in BHJ and SHJ| +|[#328](https://github.com/oap-project/native-sql-engine/pull/328)|[NSE-318]check schema before reuse exchange| +|[#307](https://github.com/oap-project/native-sql-engine/pull/307)|[NSE-304] Upgrade to Arrow 4.0.0| +|[#312](https://github.com/oap-project/native-sql-engine/pull/312)|[NSE-311] Build reports errors| +|[#272](https://github.com/oap-project/native-sql-engine/pull/272)|[NSE-273] support spark311| 
+|[#303](https://github.com/oap-project/native-sql-engine/pull/303)|[NSE-302] fix v2 test| +|[#306](https://github.com/oap-project/native-sql-engine/pull/306)|[NSE-304] Upgrade to Arrow 4.0.0: Change basic GHA TPC-H test target …| +|[#286](https://github.com/oap-project/native-sql-engine/pull/286)|[NSE-285] ColumnarWindow: Support Date input in MAX/MIN| +|[#298](https://github.com/oap-project/native-sql-engine/pull/298)|[NSE-297] Disable incremental compiler in GHA CI| +|[#291](https://github.com/oap-project/native-sql-engine/pull/291)|[NSE-257] fix multiple slf4j bindings| +|[#294](https://github.com/oap-project/native-sql-engine/pull/294)|[NSE-293] fix unsafemap with key = '0'| +|[#233](https://github.com/oap-project/native-sql-engine/pull/233)|[NSE-207] fix issues found from aggregate unit tests| +|[#246](https://github.com/oap-project/native-sql-engine/pull/246)|[NSE-245]Adding columnar RDD cache support| +|[#289](https://github.com/oap-project/native-sql-engine/pull/289)|[NSE-206]Update installation guide and configuration guide.| +|[#277](https://github.com/oap-project/native-sql-engine/pull/277)|[NSE-276] Add option to switch Hadoop version| +|[#275](https://github.com/oap-project/native-sql-engine/pull/275)|[NSE-274] Comment to trigger tpc-h RAM test| +|[#271](https://github.com/oap-project/native-sql-engine/pull/271)|[NSE-196] clean up configs in unit tests| +|[#258](https://github.com/oap-project/native-sql-engine/pull/258)|[NSE-257] fix different versions of slf4j-log4j12| +|[#259](https://github.com/oap-project/native-sql-engine/pull/259)|[NSE-248] fix arrow dependency order| +|[#249](https://github.com/oap-project/native-sql-engine/pull/249)|[NSE-241] fix hashagg result length| +|[#255](https://github.com/oap-project/native-sql-engine/pull/255)|[NSE-256] do not run ram report test on each PR| + + +### SQL DS Cache + +#### Features +||| +|:---|:---| +|[#118](https://github.com/oap-project/sql-ds-cache/issues/118)|port to Spark 3.1.1| + +#### Bugs Fixed +||| +|:---|:---| +|[#121](https://github.com/oap-project/sql-ds-cache/issues/121)|OAP Index creation stuck issue| + +#### PRs +||| +|:---|:---| +|[#132](https://github.com/oap-project/sql-ds-cache/pull/132)|Fix SampleBasedStatisticsSuite UnitTest case| +|[#122](https://github.com/oap-project/sql-ds-cache/pull/122)|[ sql-ds-cache-121] Fix Index stuck issues| +|[#119](https://github.com/oap-project/sql-ds-cache/pull/119)|[SQL-DS-CACHE-118][POAE7-1130] port sql-ds-cache to Spark3.1.1| + + +### OAP MLlib + +#### Features +||| +|:---|:---| +|[#26](https://github.com/oap-project/oap-mllib/issues/26)|[PIP] Support Spark 3.0.1 / 3.0.2 and upcoming 3.1.1| + +#### PRs +||| +|:---|:---| +|[#39](https://github.com/oap-project/oap-mllib/pull/39)|[ML-26] Build for different spark version by -Pprofile| + + +### PMEM Spill + +#### Features +||| +|:---|:---| +|[#34](https://github.com/oap-project/pmem-spill/issues/34)|Support vanilla spark 3.1.1| + +#### PRs +||| +|:---|:---| +|[#41](https://github.com/oap-project/pmem-spill/pull/41)|[PMEM-SPILL-34][POAE7-1119]Port RDD cache to Spark 3.1.1 as separate module| + + +### PMEM Common + +#### Features +||| +|:---|:---| +|[#10](https://github.com/oap-project/pmem-common/issues/10)|add -mclflushopt flag to enable clflushopt for gcc| +|[#8](https://github.com/oap-project/pmem-common/issues/8)|use clflushopt instead of clflush | + +#### PRs +||| +|:---|:---| +|[#11](https://github.com/oap-project/pmem-common/pull/11)|[PMEM-COMMON-10][POAE7-1010]Add -mclflushopt flag to enable clflushop…| 
+|[#9](https://github.com/oap-project/pmem-common/pull/9)|[PMEM-COMMON-8][POAE7-896]use clflush optimize version for clflush| + + +### PMEM Shuffle + +#### Features +||| +|:---|:---| +|[#15](https://github.com/oap-project/pmem-shuffle/issues/15)|Doesn't work with Spark3.1.1| + +#### PRs +||| +|:---|:---| +|[#16](https://github.com/oap-project/pmem-shuffle/pull/16)|[pmem-shuffle-15] Make pmem-shuffle support Spark3.1.1| + + +### Remote Shuffle + +#### Features +||| +|:---|:---| +|[#18](https://github.com/oap-project/remote-shuffle/issues/18)|upgrade to Spark-3.1.1| +|[#11](https://github.com/oap-project/remote-shuffle/issues/11)|Support DAOS Object Async API| + +#### PRs +||| +|:---|:---| +|[#19](https://github.com/oap-project/remote-shuffle/pull/19)|[REMOTE-SHUFFLE-18] upgrade to Spark-3.1.1| +|[#14](https://github.com/oap-project/remote-shuffle/pull/14)|[REMOTE-SHUFFLE-11] Support DAOS Object Async API| + + ## Release 1.1.0 -* [Native SQL Engine](#native-sql-engine) -* [SQL DS Cache](#sql-ds-cache) -* [OAP MLlib](#oap-mllib) -* [PMEM Spill](#pmem-spill) -* [PMEM Shuffle](#pmem-shuffle) -* [Remote Shuffle](#remote-shuffle) ### Native SQL Engine @@ -264,7 +402,7 @@ Generated on 2021-04-29 |[#6](https://github.com/oap-project/pmem-shuffle/pull/6)|[PMEM-SHUFFLE-7] enable fsdax mode in pmem-shuffle| -### Remote-Shuffle +### Remote Shuffle #### Features ||| diff --git a/docs/Advanced-Configuration.md b/docs/Advanced-Configuration.md index d25e2d91c..3a762ed24 100644 --- a/docs/Advanced-Configuration.md +++ b/docs/Advanced-Configuration.md @@ -2,290 +2,6 @@ In addition to usage information provided in [User Guide](User-Guide.md), we provide more strategies for SQL Index and Data Source Cache in this section. -Their needed dependencies like ***Memkind*** ,***Vmemcache*** and ***Plasma*** can be automatically installed when following [OAP Installation Guide](OAP-Installation-Guide.md), corresponding feature jars can be found under `$HOME/miniconda2/envs/oapenv/oap_jars`. - -- [Additional Cache Strategies](#Additional-Cache-Strategies) In addition to **external** cache strategy, SQL Data Source Cache also supports 3 other cache strategies: **guava**, **noevict** and **vmemcache**. -- [Index and Data Cache Separation](#Index-and-Data-Cache-Separation) To optimize the cache media utilization, SQL Data Source Cache supports cache separation of data and index, by using same or different cache media with DRAM and PMem. -- [Cache Hot Tables](#Cache-Hot-Tables) Data Source Cache also supports caching specific tables according to actual situations, these tables are usually hot tables. -- [Column Vector Cache](#Column-Vector-Cache) This document above uses **binary** cache as example for Parquet file format, if your cluster memory resources is abundant enough, you can choose ColumnVector data cache instead of binary cache for Parquet to spare computation time. -- [Large Scale and Heterogeneous Cluster Support](#Large-Scale-and-Heterogeneous-Cluster-Support) Introduce an external database to store cache locality info to support large-scale and heterogeneous clusters. - -## Additional Cache Strategies - -Following table shows features of 4 cache strategies on PMem. - -| guava | noevict | vmemcache | external cache | -| :----- | :----- | :----- | :-----| -| Use memkind lib to operate on PMem and guava cache strategy when data eviction happens. | Use memkind lib to operate on PMem and doesn't allow data eviction. | Use vmemcache lib to operate on PMem and LRU cache strategy when data eviction happens. 
| Use Plasma/dlmalloc to operate on PMem and LRU cache strategy when data eviction happens. | -| Need numa patch in Spark for better performance. | Need numa patch in Spark for better performance. | Need numa patch in Spark for better performance. | Doesn't need numa patch. | -| Suggest using 2 executors one node to keep aligned with PMem paths and numa nodes number. | Suggest using 2 executors one node to keep aligned with PMem paths and numa nodes number. | Suggest using 2 executors one node to keep aligned with PMem paths and numa nodes number. | Node-level cache so there are no limitation for executor number. | -| Cache data cleaned once executors exited. | Cache data cleaned once executors exited. | Cache data cleaned once executors exited. | No data loss when executors exit thus is friendly to dynamic allocation. But currently it has performance overhead than other cache solutions. | - -- For cache solution `guava/noevict`, make sure [Memkind](https://github.com/memkind/memkind/tree/v1.10.1-rc2) library installed on every cluster worker node. If you have finished [OAP Installation Guide](OAP-Installation-Guide.md), libmemkind will be installed. Or manually build and install it following [memkind-installation](./Developer-Guide.md#memkind-installation), then place `libmemkind.so.0` under `/lib64/` on each worker node. - -- For cache solution `vmemcahe/external` cache, make sure [Vmemcache](https://github.com/pmem/vmemcache) library has been installed on every cluster worker node. If you have finished [OAP Installation Guide](OAP-Installation-Guide.md), libvmemcache will be installed. Or you can follow the [vmemcache-installation](./Developer-Guide.md#vmemcache-installation) steps and make sure `libvmemcache.so.0` exist under `/lib64/` directory on each worker node. - -If you have followed [OAP Installation Guide](OAP-Installation-Guide.md), ***Memkind*** ,***Vmemcache*** and ***Plasma*** will be automatically installed. -Or you can refer to [OAP Developer-Guide](OAP-Developer-Guide.md), there is a shell script to help you install these dependencies automatically. - -### Use PMem Cache - -#### Prerequisites - -The following are required to configure OAP to use PMem cache. - -- PMem hardware is successfully deployed on each node in cluster. - -- Directories exposing PMem hardware on each socket. For example, on a two socket system the mounted PMem directories should appear as `/mnt/pmem0` and `/mnt/pmem1`. Correctly installed PMem must be formatted and mounted on every cluster worker node. You can follow these commands to destroy interleaved PMem device which you set in [User-Guide](./User-Guide.md#prerequisites-1): - -``` - # destroy interleaved PMem device which you set when using external cache strategy - umount /mnt/pmem - dmsetup remove striped-pmem - echo y | mkfs.ext4 /dev/pmem0 - echo y | mkfs.ext4 /dev/pmem1 - mkdir -p /mnt/pmem0 - mkdir -p /mnt/pmem1 - mount -o dax /dev/pmem0 /mnt/pmem0 - mount -o dax /dev/pmem1 /mnt/pmem1 -``` - - In this case file systems are generated for 2 NUMA nodes, which can be checked by "numactl --hardware". For a different number of NUMA nodes, a corresponding number of namespaces should be created to assure correct file system paths mapping to NUMA nodes. - - For more information you can refer to [Quick Start Guide: Provision Intel® Optane™ DC Persistent Memory](https://software.intel.com/content/www/us/en/develop/articles/quick-start-guide-configure-intel-optane-dc-persistent-memory-on-linux.html) - -#### Configuration for NUMA - -1. 
Install `numactl` to bind the executor to the PMem device on the same NUMA node. - - ```yum install numactl -y ``` - -2. We strongly recommend you use NUMA-patched Spark to achieve better performance gain for the following 3 cache strategies. Besides, currently using Community Spark occasionally has the problem of two executors being bound to the same PMem path. - - Build Spark from source to enable NUMA-binding support, refer to [Enabling-NUMA-binding-for-PMem-in-Spark](./Developer-Guide.md#Enabling-NUMA-binding-for-PMem-in-Spark). - - -#### Configuration for PMem - -Create `persistent-memory.xml` under `$SPARK_HOME/conf` if it doesn't exist. Use the following template and change the `initialPath` to your mounted paths for PMem devices. - -``` - - - - - /mnt/pmem0 - - - /mnt/pmem1 - - -``` - -### Guava cache - -Guava cache is based on memkind library, built on top of jemalloc and provides memory characteristics. To use it in your workload, follow [prerequisites](#prerequisites) to set up PMem hardware correctly, also make sure memkind library installed. Then follow configurations below. - -**NOTE**: `spark.executor.sql.oap.cache.persistent.memory.reserved.size`: When we use PMem as memory through memkind library, some portion of the space needs to be reserved for memory management overhead, such as memory segmentation. We suggest reserving 20% - 25% of the available PMem capacity to avoid memory allocation failure. But even with an allocation failure, OAP will continue the operation to read data from original input data and will not cache the data block. - -``` -# enable numa -spark.yarn.numa.enabled true -spark.executorEnv.MEMKIND_ARENA_NUM_PER_KIND 1 -# for Parquet file format, enable binary cache -spark.sql.oap.parquet.binary.cache.enabled true -# for ORC file format, enable binary cache -spark.sql.oap.orc.binary.cache.enabled true - -spark.sql.oap.cache.memory.manager pm -spark.oap.cache.strategy guava -# PMem capacity per executor, according to your cluster -spark.executor.sql.oap.cache.persistent.memory.initial.size 256g -# Reserved space per executor, according to your cluster -spark.executor.sql.oap.cache.persistent.memory.reserved.size 50g -# enable SQL Index and Data Source Cache jar in Spark -spark.sql.extensions org.apache.spark.sql.OapExtensions -# absolute path of the jar on your working node, when in Yarn client mode -spark.files $HOME/miniconda2/envs/oapenv/oap_jars/plasma-sql-ds-cache--with-spark-.jar,$HOME/miniconda2/envs/oapenv/oap_jars/pmem-common--with-spark-.jar -# relative path to spark.files, just specify jar name in current dir, when in Yarn client mode -spark.executor.extraClassPath ./plasma-sql-ds-cache--with-spark-.jar:./pmem-common--with-spark-.jar -# absolute path of the jar on your working node,when in Yarn client mode -spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/plasma-sql-ds-cache--with-spark-.jar:$HOME/miniconda2/envs/oapenv/oap_jars/pmem-common--with-spark-.jar - -``` - -Memkind library also support DAX KMEM mode. Refer to [Kernel](https://github.com/memkind/memkind#kernel), this chapter will guide how to configure PMem as system RAM. Or [Memkind support for KMEM DAX option](https://pmem.io/2020/01/20/memkind-dax-kmem.html) for more details. - -Please note that DAX KMEM mode need kernel version 5.x and memkind version 1.10 or above. If you choose KMEM mode, change memory manager from `pm` to `kmem` as below. 
-``` -spark.sql.oap.cache.memory.manager kmem -``` - -### Noevict cache - -The noevict cache strategy is also supported in OAP based on the memkind library for PMem. - -To use it in your workload, follow [prerequisites](#prerequisites) to set up PMem hardware correctly, also make sure memkind library installed. Then follow the configuration below. - -``` -# enable numa -spark.yarn.numa.enabled true -spark.executorEnv.MEMKIND_ARENA_NUM_PER_KIND 1 -# for Parquet file format, enable binary cache -spark.sql.oap.parquet.binary.cache.enabled true -# for ORC file format, enable binary cache -spark.sql.oap.orc.binary.cache.enabled true -spark.oap.cache.strategy noevict -spark.executor.sql.oap.cache.persistent.memory.initial.size 256g - -# Enable OAP extension in Spark -spark.sql.extensions org.apache.spark.sql.OapExtensions - -# absolute path of the jar on your working node, when in Yarn client mode -spark.files $HOME/miniconda2/envs/oapenv/oap_jars/plasma-sql-ds-cache--with-spark-.jar,$HOME/miniconda2/envs/oapenv/oap_jars/pmem-common--with-spark-.jar -# relative path to spark.files, just specify jar name in current dir, when in Yarn client mode -spark.executor.extraClassPath ./plasma-sql-ds-cache--with-spark-.jar:./pmem-common--with-spark-.jar -# absolute path of the jar on your working node,when in Yarn client mode -spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/plasma-sql-ds-cache--with-spark-.jar:$HOME/miniconda2/envs/oapenv/oap_jars/pmem-common--with-spark-.jar - -``` - -### Vmemcache - -- Make sure [Vmemcache](https://github.com/pmem/vmemcache) library has been installed on every cluster worker node if vmemcache strategy is chosen for PMem cache. If you have finished [OAP-Installation-Guide](OAP-Installation-Guide.md), vmemcache library will be automatically installed by Conda. - - Or you can follow the [build/install](./Developer-Guide.md#build-and-install-vmemcache) steps and make sure `libvmemcache.so` exist in `/lib64` directory on each worker node. -- To use it in your workload, follow [prerequisites](#prerequisites) to set up PMem hardware correctly. - -#### Configure to enable PMem cache - -Make the following configuration changes in `$SPARK_HOME/conf/spark-defaults.conf`. 
- -``` -# 2x number of your worker nodes -spark.executor.instances 6 -# enable numa -spark.yarn.numa.enabled true -# Enable OAP extension in Spark -spark.sql.extensions org.apache.spark.sql.OapExtensions - -# absolute path of the jar on your working node, when in Yarn client mode -spark.files $HOME/miniconda2/envs/oapenv/oap_jars/plasma-sql-ds-cache--with-spark-.jar,$HOME/miniconda2/envs/oapenv/oap_jars/pmem-common--with-spark-.jar -# relative path to spark.files, just specify jar name in current dir, when in Yarn client mode -spark.executor.extraClassPath ./plasma-sql-ds-cache--with-spark-.jar:./pmem-common--with-spark-.jar -# absolute path of the jar on your working node,when in Yarn client mode -spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/plasma-sql-ds-cache--with-spark-.jar:$HOME/miniconda2/envs/oapenv/oap_jars/pmem-common--with-spark-.jar - -# for parquet file format, enable binary cache -spark.sql.oap.parquet.binary.cache.enabled true -# for ORC file format, enable binary cache -spark.sql.oap.orc.binary.cache.enabled true -# enable vmemcache strategy -spark.oap.cache.strategy vmem -spark.executor.sql.oap.cache.persistent.memory.initial.size 256g -# according to your cluster -spark.executor.sql.oap.cache.guardian.memory.size 10g -``` -The `vmem` cache strategy is based on libvmemcache (buffer based LRU cache), which provides a key-value store API. Follow these steps to enable vmemcache support in Data Source Cache. - -- `spark.executor.instances`: We suggest setting the value to 2X the number of worker nodes when NUMA binding is enabled. Each worker node runs two executors, each executor is bound to one of the two sockets, and accesses the corresponding PMem device on that socket. -- `spark.executor.sql.oap.cache.persistent.memory.initial.size`: It is configured to the available PMem capacity to be used as data cache per exectutor. - -**NOTE**: If "PendingFiber Size" (on spark web-UI OAP page) is large, or some tasks fail with "cache guardian use too much memory" error, set `spark.executor.sql.oap.cache.guardian.memory.size ` to a larger number as the default size is 10GB. The user could also increase `spark.sql.oap.cache.guardian.free.thread.nums` or decrease `spark.sql.oap.cache.dispose.timeout.ms` to free memory more quickly. - -#### Verify PMem cache functionality - -- After finishing configuration, restart Spark Thrift Server for the configuration changes to take effect. Start at step 2 of the [Use DRAM Cache](./User-Guide.md#use-dram-cache) guide to verify that cache is working correctly. - -- Verify NUMA binding status by confirming keywords like `numactl --cpubind=1 --membind=1` contained in executor launch command. - -- Check PMem cache size by checking disk space with `df -h`.For `vmemcache` strategy, disk usage will reach the initial cache size once the PMem cache is initialized and will not change during workload execution. For `Guava/Noevict` strategies, the command will show disk space usage increases along with workload execution. - - -## Index and Data Cache Separation - -SQL Index and Data Source Cache now supports different cache strategies for DRAM and PMem. To optimize the cache media utilization, you can enable cache separation of data and index with same or different cache media. When Sharing same media, data cache and index cache will use different fiber cache ratio. - - -Here we list 4 different kinds of configuration for index/cache separation, if you choose one of them, please add corresponding configuration to `spark-defaults.conf`. -1. 
DRAM as cache media, `guava` strategy as index & data cache backend. - -``` -spark.sql.oap.index.data.cache.separation.enabled       true -spark.oap.cache.strategy                                mix -spark.sql.oap.cache.memory.manager                 offheap -``` -The rest configuration you can refer to [Use DRAM Cache](./User-Guide.md#use-dram-cache) - -2. PMem as cache media, `external` strategy as index & data cache backend. - -``` -spark.sql.oap.index.data.cache.separation.enabled       true -spark.oap.cache.strategy                                mix -spark.sql.oap.cache.memory.manager tmp -spark.sql.oap.mix.data.cache.backend external -spark.sql.oap.mix.index.cache.backend external - -``` -The rest configurations can refer to the configurations of [PMem Cache](./User-Guide.md#use-pmem-cache) and [External cache](./User-Guide.md#Configuration-for-enabling-PMem-cache) - -3. DRAM(`offheap`)/`guava` as `index` cache media and backend, PMem(`tmp`)/`external` as `data` cache media and backend. - -``` -spark.sql.oap.index.data.cache.separation.enabled true -spark.oap.cache.strategy mix -spark.sql.oap.cache.memory.manager mix -spark.sql.oap.mix.data.cache.backend external - -# 2x number of your worker nodes -spark.executor.instances 6 -# enable numa -spark.yarn.numa.enabled true -spark.memory.offHeap.enabled false - -spark.sql.oap.dcpmm.free.wait.threshold 50000000000 -# according to your executor core number -spark.executor.sql.oap.cache.external.client.pool.size 10 - -# equal to the size of executor.memoryOverhead -spark.executor.sql.oap.cache.offheap.memory.size 50g -# according to the resource of cluster -spark.executor.memoryOverhead 50g - -# for ORC file format -spark.sql.oap.orc.binary.cache.enabled true -# for Parquet file format -spark.sql.oap.parquet.binary.cache.enabled true -``` - -4. DRAM(`offheap`)/`guava` as `index` cache media and backend, PMem(`pm`)/`guava` as `data` cache media and backend. - -``` -spark.sql.oap.index.data.cache.separation.enabled true -spark.oap.cache.strategy mix -spark.sql.oap.cache.memory.manager mix - -# 2x number of your worker nodes -spark.executor.instances 6 -# enable numa -spark.yarn.numa.enabled true -spark.executorEnv.MEMKIND_ARENA_NUM_PER_KIND 1 -spark.memory.offHeap.enabled false -# PMem capacity per executor -spark.executor.sql.oap.cache.persistent.memory.initial.size 256g -# Reserved space per executor -spark.executor.sql.oap.cache.persistent.memory.reserved.size 50g - -# equal to the size of executor.memoryOverhead -spark.executor.sql.oap.cache.offheap.memory.size 50g -# according to the resource of cluster -spark.executor.memoryOverhead 50g -# for ORC file format -spark.sql.oap.orc.binary.cache.enabled true -# for Parquet file format -spark.sql.oap.parquet.binary.cache.enabled true -``` - ## Cache Hot Tables Data Source Cache also supports caching specific tables by configuring items according to actual situations, these tables are usually hot tables. diff --git a/docs/Developer-Guide.md b/docs/Developer-Guide.md index 4a3d8a6e2..7294a930a 100644 --- a/docs/Developer-Guide.md +++ b/docs/Developer-Guide.md @@ -3,12 +3,9 @@ This document is a supplement to the whole [OAP Developer Guide](OAP-Developer-Guide.md) for SQL Index and Data Source Cache. After following that document, you can continue more details for SQL Index and Data Source Cache. 
-* [Building](#Building) -* [Enabling NUMA binding for Intel® Optane™ DC Persistent Memory in Spark](#enabling-numa-binding-for-pmem-in-spark) - ## Building -### Building SQL DS Cache +### Prerequisites for building Building with [Apache Maven\*](http://maven.apache.org/). @@ -20,68 +17,18 @@ cd pmem-common mvn clean install -DskipTests ``` -Build the SQL DS Cache package: - -``` -git clone -b https://github.com/oap-project/sql-ds-cache.git -cd sql-ds-cache -mvn clean -DskipTests package -``` - -### Running Tests - -Run all the tests: -``` -mvn clean test -``` -Run a specific test suite, for example `OapDDLSuite`: -``` -mvn -DwildcardSuites=org.apache.spark.sql.execution.datasources.oap.OapDDLSuite test -``` -**NOTE**: Log level of unit tests currently default to ERROR, please override oap-cache/oap/src/test/resources/log4j.properties if needed. - -### Building with Intel® Optane™ DC Persistent Memory Module - -#### Prerequisites for building with PMem support - Install the required packages on the build system: -- [cmake](https://help.directadmin.com/item.php?id=494) -- [memkind](https://github.com/memkind/memkind/tree/v1.10.1) -- [vmemcache](https://github.com/pmem/vmemcache) +- [cmake](https://cmake.org/install/) - [Plasma](http://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/) -#### memkind installation - -The memkind library depends on `libnuma` at the runtime, so it must already exist in the worker node system. Build the latest memkind lib from source: - -``` -git clone -b v1.10.1 https://github.com/memkind/memkind -cd memkind -./autogen.sh -./configure -make -make install -``` -#### vmemcache installation - -To build vmemcache library from source, you can (for RPM-based linux as example): -``` -git clone https://github.com/pmem/vmemcache -cd vmemcache -mkdir build -cd build -cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DCPACK_GENERATOR=rpm -make package -sudo rpm -i libvmemcache*.rpm -``` #### Plasma installation To use optimized Plasma cache with OAP, you need following components: (1) `libarrow.so`, `libplasma.so`, `libplasma_java.so`: dynamic libraries, will be used in Plasma client. (2) `plasma-store-server`: executable file, Plasma cache service. - (3) `arrow-plasma-3.0.0.jar`: will be used when compile oap and spark runtime also need it. + (3) `arrow-plasma-4.0.0.jar`: will be used when compile oap and spark runtime also need it. - `.so` file and binary file Clone code from Arrow repo and run following commands, this will install `libplasma.so`, `libarrow.so`, `libplasma_java.so` and `plasma-store-server` to your system path(`/usr/lib64` by default). And if you are using Spark in a cluster environment, you can copy these files to all nodes in your cluster if the OS or distribution are same, otherwise, you need compile it on each node. @@ -89,7 +36,7 @@ To use optimized Plasma cache with OAP, you need following components: ``` cd /tmp git clone https://github.com/oap-project/arrow.git -cd arrow && git checkout arrow-3.0.0-oap +cd arrow && git checkout arrow-4.0.0-oap-1.1.1 cd cpp mkdir release cd release @@ -99,8 +46,8 @@ make -j$(nproc) sudo make install -j$(nproc) ``` -- arrow-plasma-3.0.0.jar - Run following command, this will install arrow jars to your local maven repo. Besides, you need copy arrow-plasma-3.0.0.jar to `$SPARK_HOME/jars/` dir, cause this jar is needed when using external cache. +- arrow-plasma-4.0.0.jar + Run following command, this will install arrow jars to your local maven repo. 
Besides, you need to copy arrow-plasma-4.0.0.jar to the `$SPARK_HOME/jars/` directory, because this jar is needed when using the external cache.
 
 ```
 cd /tmp/arrow/java
 mvn clean -q -pl plasma -am -DskipTests install
 ```
 
-#### Building the package
-You need to add `-Ppersistent-memory` to build with PMem support. For `noevict` cache strategy, you also need to build with `-Ppersistent-memory` parameter.
-```
-cd /pmem-common
-mvn clean install -Ppersistent-memory -DskipTests
-cd /sql-ds-cache
-mvn clean -DskipTests package
-```
-
-For vmemcache cache strategy, please build with command:
+Build the SQL DS Cache package:
 
 ```
-cd /pmem-common
-mvn clean install -Pvmemcache -DskipTests
-cd /sql-ds-cache
+git clone -b https://github.com/oap-project/sql-ds-cache.git
+cd sql-ds-cache
 mvn clean -DskipTests package
 ```
 
-Build with this command to use all of them:
+### Running Tests
 
+Run all the tests:
 ```
-cd /pmem-common
-mvn clean install -Ppersistent-memory -Pvmemcache -DskipTests
-cd /sql-ds-cache
-mvn clean -DskipTests package
+mvn clean test
 ```
-
-## Enabling NUMA binding for PMem in Spark
-
-### Rebuilding Spark packages with NUMA binding patch
-
-When using PMem as a cache medium apply the [NUMA](https://www.kernel.org/doc/html/v4.18/vm/numa.html) binding patch [numa-binding-spark-3.0.0.patch](./numa-binding-spark-3.0.0.patch) to Spark source code for best performance.
-
-1. Download src for [Spark-3.0.0](https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0.tgz) and clone the src from github.
-
-2. Apply this patch and [rebuild](https://spark.apache.org/docs/latest/building-spark.html) the Spark package.
-
+Run a specific test suite, for example `OapDDLSuite`:
 ```
-git apply numa-binding-spark-3.0.0.patch
+mvn -DwildcardSuites=org.apache.spark.sql.execution.datasources.oap.OapDDLSuite test
 ```
+
-
-3. Add these configuration items to the Spark configuration file $SPARK_HOME/conf/spark-defaults.conf to enable NUMA binding.
-```
-spark.yarn.numa.enabled true
-```
-**NOTE**: If you are using a customized Spark, you will need to manually resolve the conflicts.
-
+**NOTE**: The log level of unit tests currently defaults to ERROR; please override `sql-ds-cache/Plasma-based-cache/src/test/resources/log4j.properties` if needed.
 
 ###### \*Other names and brands may be claimed as the property of others.
 
diff --git a/docs/HCFS-User-Guide.md b/docs/HCFS-User-Guide.md
index cc1c534f5..6fd80d832 100644
--- a/docs/HCFS-User-Guide.md
+++ b/docs/HCFS-User-Guide.md
@@ -1,11 +1,11 @@
 # HCFS User Guide
 
 * [Prerequisites](#prerequisites)
-* [Configurations](#configuration)
+* [Configurations](#configurations)
 
 ## Prerequisites
 
-HCFS based Data Source Cache on Spark 3.0.0 requires a working Hadoop cluster with YARN and Spark. Running Spark on YARN requires a binary distribution of Spark, which is built with YARN support. The HCFS based Data Source Cache also need to install plasma and redis, please follow [OAP-Installation-Guide](OAP-Installation-Guide.md) for how to install plasma and redis.
+HCFS based Data Source Cache on Spark 3.1.1 requires a working Hadoop cluster with YARN and Spark. Running Spark on YARN requires a binary distribution of Spark, which is built with YARN support. The HCFS based Data Source Cache also needs plasma and redis to be installed; please follow [OAP-Installation-Guide](OAP-Installation-Guide.md) for how to install them.
 
## Configurations @@ -35,7 +35,7 @@ spark.hadoop.fs.cachedFs.redis.port $PORT ### Configuration for HCFS cache location policy We provide three HCFS cache location policies, you can choose the best one for you workload -* defalut policy +* default policy This policy the file block locations consist of cached blocks and hdfs blocks (if cached blocks are incomplete) * cache_over_hdfs This policy use cached block location only if all requested content is cached, otherwise use HDFS block locations diff --git a/docs/OAP-Developer-Guide.md b/docs/OAP-Developer-Guide.md index bf7b036b6..b1eb677ed 100644 --- a/docs/OAP-Developer-Guide.md +++ b/docs/OAP-Developer-Guide.md @@ -3,13 +3,13 @@ This document contains the instructions & scripts on installing necessary dependencies and building OAP modules. You can get more detailed information from OAP each module below. -* [SQL Index and Data Source Cache](https://github.com/oap-project/sql-ds-cache/blob/v1.1.0-spark-3.0.0/docs/Developer-Guide.md) -* [PMem Common](https://github.com/oap-project/pmem-common/tree/v1.1.0-spark-3.0.0) -* [PMem Spill](https://github.com/oap-project/pmem-spill/tree/v1.1.0-spark-3.0.0) -* [PMem Shuffle](https://github.com/oap-project/pmem-shuffle/tree/v1.1.0-spark-3.0.0#5-install-dependencies-for-pmem-shuffle) -* [Remote Shuffle](https://github.com/oap-project/remote-shuffle/tree/v1.1.0-spark-3.0.0) -* [OAP MLlib](https://github.com/oap-project/oap-mllib/tree/v1.1.0-spark-3.0.0) -* [Native SQL Engine](https://github.com/oap-project/native-sql-engine/tree/v1.1.0-spark-3.0.0) +* [SQL Index and Data Source Cache](https://github.com/oap-project/sql-ds-cache/blob/v1.1.1-spark-3.1.1/docs/Developer-Guide.md) +* [PMem Common](https://github.com/oap-project/pmem-common/tree/v1.1.1-spark-3.1.1) +* [PMem Spill](https://github.com/oap-project/pmem-spill/tree/v1.1.1-spark-3.1.1) +* [PMem Shuffle](https://github.com/oap-project/pmem-shuffle/tree/v1.1.1-spark-3.1.1#5-install-dependencies-for-pmem-shuffle) +* [Remote Shuffle](https://github.com/oap-project/remote-shuffle/tree/v1.1.1-spark-3.1.1) +* [OAP MLlib](https://github.com/oap-project/oap-mllib/tree/v1.1.1-spark-3.1.1) +* [Native SQL Engine](https://github.com/oap-project/native-sql-engine/tree/v1.1.1-spark-3.1.1) ## Building OAP @@ -22,45 +22,42 @@ We provide scripts to help automatically install dependencies required, please c # cd oap-tools # sh dev/install-compile-time-dependencies.sh ``` -*Note*: oap-tools tag version `v1.1.0-spark-3.0.0` corresponds to all OAP modules' tag version `v1.1.0-spark-3.0.0`. +*Note*: oap-tools tag version `v1.1.1-spark-3.1.1` corresponds to all OAP modules' tag version `v1.1.1-spark-3.1.1`. Then the dependencies below will be installed: -* [Cmake](https://help.directadmin.com/item.php?id=494) +* [Cmake](https://cmake.org/install/) * [GCC > 7](https://gcc.gnu.org/wiki/InstallingGCC) * [Memkind](https://github.com/memkind/memkind/tree/v1.10.1) * [Vmemcache](https://github.com/pmem/vmemcache) * [HPNL](https://github.com/Intel-bigdata/HPNL) * [PMDK](https://github.com/pmem/pmdk) * [OneAPI](https://software.intel.com/content/www/us/en/develop/tools/oneapi.html) -* [Arrow](https://github.com/oap-project/arrow/tree/arrow-3.0.0-oap-1.1) +* [Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap-1.1.1) * [LLVM](https://llvm.org/) -Run the following command to learn more. - -``` -# sh dev/scripts/prepare_oap_env.sh --help -``` - -Run the following command to automatically install specific dependency such as Maven. 
- -``` -# sh dev/scripts/prepare_oap_env.sh --prepare_maven -``` - - **Requirements for Shuffle Remote PMem Extension** If enable Shuffle Remote PMem extension with RDMA, you can refer to [PMem Shuffle](https://github.com/oap-project/pmem-shuffle) to configure and validate RDMA in advance. ### Building +#### Building OAP + OAP is built with [Apache Maven](http://maven.apache.org/) and Oracle Java 8. -To build OAP package, run command below then you can find a tarball named `oap-$VERSION-bin-spark-$VERSION.tar.gz` under directory `$OAP_TOOLS_HOME/dev/release-package `. +To build OAP package, run command below then you can find a tarball named `oap-$VERSION-bin-spark-$VERSION.tar.gz` under directory `$OAP_TOOLS_HOME/dev/release-package `, which contains all OAP module jars. +Change to `root` user, run + ``` -$ sh $OAP_TOOLS_HOME/dev/compile-oap.sh +# cd oap-tools +# sh dev/compile-oap.sh ``` -Building specified OAP Module, such as `sql-ds-cache`, run: +#### Building OAP specific module + +If you just want to build a specific OAP Module, such as `sql-ds-cache`, change to `root` user, then run: + ``` -$ sh $OAP_TOOLS_HOME/dev/compile-oap.sh --sql-ds-cache +# cd oap-tools +# sh dev/compile-oap.sh --sql-ds-cache ``` diff --git a/docs/OAP-Installation-Guide.md b/docs/OAP-Installation-Guide.md index c269b978e..bdf10b745 100644 --- a/docs/OAP-Installation-Guide.md +++ b/docs/OAP-Installation-Guide.md @@ -29,17 +29,17 @@ Create a Conda environment and install OAP Conda package. ```bash $ conda create -n oapenv -y python=3.7 $ conda activate oapenv -$ conda install -c conda-forge -c intel -y oap=1.1.0 +$ conda install -c conda-forge -c intel -y oap=1.1.1 ``` Once finished steps above, you have completed OAP dependencies installation and OAP building, and will find built OAP jars under `$HOME/miniconda2/envs/oapenv/oap_jars` Dependencies below are required by OAP and all of them are included in OAP Conda package, they will be automatically installed in your cluster when you Conda install OAP. Ensure you have activated environment which you created in the previous steps. -- [Arrow](https://github.com/Intel-bigdata/arrow) +- [Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap-1.1.1) - [Plasma](http://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/) -- [Memkind](https://anaconda.org/intel/memkind) -- [Vmemcache](https://anaconda.org/intel/vmemcache) +- [Memkind](https://github.com/memkind/memkind/tree/v1.10.1) +- [Vmemcache](https://github.com/pmem/vmemcache.git) - [HPNL](https://anaconda.org/intel/hpnl) - [PMDK](https://github.com/pmem/pmdk) - [OneAPI](https://software.intel.com/content/www/us/en/develop/tools/oneapi.html) diff --git a/docs/User-Guide.md b/docs/User-Guide.md index 3bb5a50a2..c18ed4719 100644 --- a/docs/User-Guide.md +++ b/docs/User-Guide.md @@ -12,7 +12,7 @@ ## Prerequisites -SQL Index and Data Source Cache on Spark 3.0.0 requires a working Hadoop cluster with YARN and Spark. Running Spark on YARN requires a binary distribution of Spark, which is built with YARN support. +SQL Index and Data Source Cache on Spark 3.1.1 requires a working Hadoop cluster with YARN and Spark. Running Spark on YARN requires a binary distribution of Spark, which is built with YARN support. 
## Getting Started @@ -263,21 +263,12 @@ Socket Configuration -> Intel UPI General Configuration -> Stale AtoS : Disable For more information you can refer to [Quick Start Guide: Provision Intel® Optane™ DC Persistent Memory](https://software.intel.com/content/www/us/en/develop/articles/quick-start-guide-configure-intel-optane-dc-persistent-memory-on-linux.html) -- SQL Data Source Cache uses Plasma as a node-level external cache service, the benefit of using external cache is data could be shared across process boundaries. [Plasma](http://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/) is a high-performance shared-memory object store and a component of [Apache Arrow](https://github.com/apache/arrow). We have modified Plasma to support PMem, and make it open source on [oap-project-Arrow](https://github.com/oap-project/arrow/tree/arrow-3.0.0-oap) repo. If you have finished [OAP Installation Guide](OAP-Installation-Guide.md), Plasma will be automatically installed and then you just need copy `arrow-plasma-3.0.0.jar` to `$SPARK_HOME/jars`. For manual building and installation steps you can refer to [Plasma installation](./Developer-Guide.md#Plasma-installation). +- SQL Data Source Cache uses Plasma as a node-level external cache service, the benefit of using external cache is data could be shared across process boundaries. [Plasma](http://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/) is a high-performance shared-memory object store and a component of [Apache Arrow](https://github.com/apache/arrow). We have modified Plasma to support PMem, and make it open source on [oap-project-Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap-1.1.1) repo. If you have finished [OAP Installation Guide](OAP-Installation-Guide.md), Plasma will be automatically installed and then you just need copy `arrow-plasma-4.0.0.jar` to `$SPARK_HOME/jars`. For manual building and installation steps you can refer to [Plasma installation](./Developer-Guide.md#Plasma-installation). - Refer to configuration below to apply external cache strategy and start Plasma service on each node and start your workload. -#### Configuration for NUMA - -Install `numactl` to bind the executor to the PMem device on the same NUMA node. - -```yum install numactl -y ``` - -We recommend you use NUMA-patched Spark to achieve better performance gain for the `external` strategy compared with Community Spark. -Build Spark from source to enable NUMA-binding support, refer to [Enabling-NUMA-binding-for-PMem-in-Spark](./Developer-Guide.md#Enabling-NUMA-binding-for-PMem-in-Spark). - #### Configuration for enabling PMem cache Add the following configuration to `$SPARK_HOME/conf/spark-defaults.conf`. @@ -285,8 +276,6 @@ Add the following configuration to `$SPARK_HOME/conf/spark-defaults.conf`. ``` # 2x number of your worker nodes spark.executor.instances 6 -# enable numa -spark.yarn.numa.enabled true # enable SQL Index and Data Source Cache extension in Spark spark.sql.extensions org.apache.spark.sql.OapExtensions @@ -364,7 +353,7 @@ Run ```yarn app -destroy plasma-store-service```to destroy it. This section provides instructions and tools for running TPC-DS queries to evaluate the cache performance of various configurations. The TPC-DS suite has many queries and we select 9 I/O intensive queries to simplify performance evaluation. 
-We created some tool scripts [oap-benchmark-tool.zip](https://github.com/oap-project/oap-tools/releases/download/v1.1.0-spark-3.0.0/oap-benchmark-tool.zip) to simplify running the workload. If you are already familiar with TPC-DS data generation and running a TPC-DS tool suite, skip our tool and use the TPC-DS tool suite directly. +We created some tool scripts [oap-benchmark-tool.zip](https://github.com/oap-project/oap-tools/releases/download/v1.1.1-spark-3.1.1/oap-benchmark-tool.zip) to simplify running the workload. If you are already familiar with TPC-DS data generation and running a TPC-DS tool suite, skip our tool and use the TPC-DS tool suite directly. ### Prerequisites @@ -372,7 +361,7 @@ We created some tool scripts [oap-benchmark-tool.zip](https://github.com/oap-pro ### Prepare the Tool -1. Download [oap-benchmark-tool.zip](https://github.com/oap-project/oap-tools/releases/download/v1.1.0-spark-3.0.0/oap-benchmark-tool.zip) and unzip to a folder (for example, `oap-benchmark-tool` folder) on your working node. +1. Download [oap-benchmark-tool.zip](https://github.com/oap-project/oap-tools/releases/download/v1.1.1-spark-3.1.1/oap-benchmark-tool.zip) and unzip to a folder (for example, `oap-benchmark-tool` folder) on your working node. 2. Copy `oap-benchmark-tool/tools/tpcds-kits` to ***ALL*** worker nodes under the same folder (for example, `/home/oap/tpcds-kits`). ### Generate TPC-DS Data @@ -389,7 +378,7 @@ We created some tool scripts [oap-benchmark-tool.zip](https://github.com/oap-pro For example: ``` -export SPARK_HOME=/home/oap/spark-3.0.0 +export SPARK_HOME=/home/oap/spark-3.1.1 export TPCDS_KITS_DIR=/home/oap/tpcds-kits export NAMENODE_ADDRESS=mynamenode:9000 export THRIFT_SERVER_ADDRESS=mythriftserver @@ -465,14 +454,6 @@ And the Spark webUI OAP tab has more specific OAP cache metrics just as [section ## Advanced Configuration -- [Additional Cache Strategies](./Advanced-Configuration.md#Additional-Cache-Strategies) - - In addition to **external** cache strategy, SQL Data Source Cache also supports 3 other cache strategies: **guava**, **noevict** and **vmemcache**. - -- [Index and Data Cache Separation](./Advanced-Configuration.md#Index-and-Data-Cache-Separation) - - To optimize the cache media utilization, SQL Data Source Cache supports cache separation of data and index, by using same or different cache media with DRAM and PMem. - - [Cache Hot Tables](./Advanced-Configuration.md#Cache-Hot-Tables) Data Source Cache also supports caching specific tables according to actual situations, these tables are usually hot tables. 
diff --git a/docs/numa-binding-spark-3.0.0.patch b/docs/numa-binding-spark-3.0.0.patch deleted file mode 100644 index 448dd4c00..000000000 --- a/docs/numa-binding-spark-3.0.0.patch +++ /dev/null @@ -1,378 +0,0 @@ -diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala -index 25c5b9812f..25e25376cb 100644 ---- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala -+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala -@@ -49,6 +49,7 @@ private[spark] class CoarseGrainedExecutorBackend( - override val rpcEnv: RpcEnv, - driverUrl: String, - executorId: String, -+ numaNodeId: Option[String], - bindAddress: String, - hostname: String, - cores: Int, -@@ -253,6 +254,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { - case class Arguments( - driverUrl: String, - executorId: String, -+ numaNodeId: Option[String], - bindAddress: String, - hostname: String, - cores: Int, -@@ -266,8 +268,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { - val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => - CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) => - new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, -- arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env, -- arguments.resourcesFileOpt, resourceProfile) -+ arguments.numaNodeId, arguments.bindAddress, arguments.hostname, arguments.cores, -+ arguments.userClassPath, env, arguments.resourcesFileOpt, resourceProfile) - } - run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) - System.exit(0) -@@ -322,14 +324,17 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { - driverConf.set(key, value) - } - } -+ driverConf.set("spark.executor.numa.id", s"${arguments.numaNodeId.getOrElse(-1)}") - - cfg.hadoopDelegationCreds.foreach { tokens => - SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf) - } - - driverConf.set(EXECUTOR_ID, arguments.executorId) -- val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, -- arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) -+ val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, -+ arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, -+ isLocal = false) -+ SparkEnv.get.conf.set("spark.executor.numa.id", s"${arguments.numaNodeId.getOrElse(-1)}") - - env.rpcEnv.setupEndpoint("Executor", - backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)) -@@ -343,6 +348,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { - def parseArguments(args: Array[String], classNameForEntry: String): Arguments = { - var driverUrl: String = null - var executorId: String = null -+ var numaNodeId: Option[String] = None - var bindAddress: String = null - var hostname: String = null - var cores: Int = 0 -@@ -376,6 +382,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { - case ("--app-id") :: value :: tail => - appId = value - argv = tail -+ case ("--numa-node-id") :: value :: tail => -+ numaNodeId = Some(value.trim) -+ argv = tail - case ("--worker-url") :: value :: tail => - // Worker url is used in spark standalone mode to enforce fate-sharing with worker - workerUrl = Some(value) -@@ -408,7 +417,7 @@ 
private[spark] object CoarseGrainedExecutorBackend extends Logging {
-       bindAddress = hostname
-     }
-
--    Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
-+    Arguments(driverUrl, executorId, numaNodeId, bindAddress, hostname, cores, appId, workerUrl,
-       userClassPath, resourcesFileOpt, resourceProfileId)
-   }
-
-@@ -422,6 +431,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
-       |   --driver-url <driverUrl>
-       |   --executor-id <executorId>
-       |   --bind-address <bindAddress>
-+      |   --numa-node-id <numaNodeId>
-       |   --hostname <hostname>
-       |   --cores <cores>
-       |   --resourcesFile <fileWithJSONResourceInformation>
-diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
-index 3134a738b3..e0d5db3312 100644
---- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
-+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
-@@ -55,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
-     val env = createMockEnv(conf, serializer)
-
-     // we don't really use this, just need it to get at the parser function
--    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
-       4, Seq.empty[URL], env, None, resourceProfile)
-     withTempDir { tmpDir =>
-       val testResourceArgs: JObject = ("" -> "")
-@@ -76,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
-     val serializer = new JavaSerializer(conf)
-     val env = createMockEnv(conf, serializer)
-     // we don't really use this, just need it to get at the parser function
--    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
-       4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
-     withTempDir { tmpDir =>
-       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
-@@ -110,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
-     val serializer = new JavaSerializer(conf)
-     val env = createMockEnv(conf, serializer)
-     // we don't really use this, just need it to get at the parser function
--    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", None, "host1", "host1",
-       4, Seq.empty[URL], env, None, resourceProfile)
-
-     withTempDir { tmpDir =>
-@@ -137,8 +137,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
-     val serializer = new JavaSerializer(conf)
-     val env = createMockEnv(conf, serializer)
-     // we don't really use this, just need it to get at the parser function
--    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
--      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
-+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
-+      "host1", 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
-
-     // not enough gpu's on the executor
-     withTempDir { tmpDir =>
-@@ -190,8 +190,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
-     val serializer = new JavaSerializer(conf)
-     val env = createMockEnv(conf, serializer)
-     // we don't really use this, just need it to get at the parser function
--    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
--      4, Seq.empty[URL], env, None, resourceProfile)
-+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
-+      "host1", 4, Seq.empty[URL], env, None, resourceProfile)
-
-     // executor resources < required
-     withTempDir { tmpDir =>
-@@ -221,8 +221,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
-     val env = createMockEnv(conf, serializer)
-
-     // we don't really use this, just need it to get at the parser function
--    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
--      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
-+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1",
-+      "host1", 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
-
-     val parsedResources = backend.parseOrFindResources(None)
-
-@@ -268,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
-     val env = createMockEnv(conf, serializer)
-
-     // we don't really use this, just need it to get at the parser function
--    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", None, "host1", "host1",
-       4, Seq.empty[URL], env, None, resourceProfile)
-     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
-     val ja = Extraction.decompose(Seq(gpuArgs))
-@@ -293,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
-     try {
-       val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
-       val env = createMockEnv(conf, serializer, Some(rpcEnv))
--      backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-+      backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1", None,
-         "host1", "host1", 4, Seq.empty[URL], env, None,
-         resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
-       assert(backend.taskResources.isEmpty)
-diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
-index 862acd8c03..1248028a63 100644
---- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
-+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
-@@ -457,7 +457,7 @@ private[spark] class ApplicationMaster(
-       val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
-       val executorCores = _sparkConf.get(EXECUTOR_CORES)
-       val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "",
--        "", executorMemory, executorCores, appId, securityMgr, localResources,
-+        None, "", executorMemory, executorCores, appId, securityMgr, localResources,
-         ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-       dummyRunner.launchContextDebugInfo()
-     }
-diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
-index d9262bbac6..85a99af1ea 100644
---- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
-+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
-@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
- import org.apache.hadoop.yarn.util.Records
-
- import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-+import org.apache.spark.deploy.yarn.config._
- import org.apache.spark.internal.Logging
- import org.apache.spark.internal.config._
- import org.apache.spark.network.util.JavaUtils
-@@ -49,6 +50,7 @@ private[yarn] class ExecutorRunnable(
-     sparkConf: SparkConf,
-     masterAddress: String,
-     executorId: String,
-+    numaNodeId: Option[String],
-     hostname: String,
-     executorMemory: Int,
-     executorCores: Int,
-@@ -200,9 +202,23 @@ private[yarn] class ExecutorRunnable(
-         Seq("--user-class-path", "file:" + absPath)
-       }.toSeq
-
-+    val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED)
-+
-+    logInfo(s"[NUMACHECK] numaEnabled $numaEnabled executorId $executorId")
-+    // Don't need numa binding for driver.
-+    val (numaCtlCommand, numaNodeOpts) = if (numaEnabled && executorId != ""
-+      && numaNodeId.nonEmpty) {
-+      logInfo(s"numaNodeId ${numaNodeId.get}")
-+      val command = s"numactl --cpubind=${numaNodeId.get} --membind=${numaNodeId.get} "
-+      (command, Seq("--numa-node-id", numaNodeId.get.toString))
-+    } else {
-+      ("", Nil)
-+    }
-+
-+    logInfo(s"[NUMACHECK] numactl command $numaCtlCommand")
-     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
-     val commands = prefixEnv ++
--      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
-+      Seq(numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
-       javaOpts ++
-       Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
-         "--driver-url", masterAddress,
-@@ -211,11 +227,13 @@ private[yarn] class ExecutorRunnable(
-         "--cores", executorCores.toString,
-         "--app-id", appId,
-         "--resourceProfileId", resourceProfileId.toString) ++
-+      numaNodeOpts ++
-       userClassPath ++
-       Seq(
-         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
-         s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
-
-+    logInfo(s"[NUMACHECK] container command $commands")
-     // TODO: it would be nicer to just make sure there are no null commands here
-     commands.map(s => if (s == null) "null" else s).toList
-   }
-diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
-index 09414cbbe5..ada16307c9 100644
---- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
-+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
-@@ -184,6 +184,14 @@ private[yarn] class YarnAllocator(
-
-   def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
-
-+  // The total number of numa node
-+  private[yarn] val totalNumaNumber = sparkConf.get(SPARK_YARN_NUMA_NUMBER)
-+  // Mapping from host to executor counter
-+  private[yarn] case class NumaInfo(cotainer2numa: mutable.HashMap[String, Int],
-+    numaUsed: Array[Int])
-+
-+  private[yarn] val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]()
-+
-   /**
-    * A sequence of pending container requests that have not yet been fulfilled.
-    */
-@@ -532,11 +540,25 @@ private[yarn] class YarnAllocator(
-     for (container <- containersToUse) {
-       executorIdCounter += 1
-       val executorHostname = container.getNodeId.getHost
-+      // Setting the numa id that the executor should binding.
-+      // new numaid binding method
-+      val numaInfo = hostToNumaInfo.getOrElseUpdate(executorHostname,
-+        NumaInfo(new mutable.HashMap[String, Int], new Array[Int](totalNumaNumber)))
-+      val minUsed = numaInfo.numaUsed.min
-+      val newNumaNodeId = numaInfo.numaUsed.indexOf(minUsed)
-+      numaInfo.cotainer2numa.put(container.getId.toString, newNumaNodeId)
-+      numaInfo.numaUsed(newNumaNodeId) += 1
-+
-+      val numaNodeId = newNumaNodeId.toString
-+      logInfo(s"numaNodeId: $numaNodeId on host $executorHostname," +
-+        "container: " + container.getId.toString +
-+        ", minUsed: " + minUsed)
-+
-       val containerId = container.getId
-       val executorId = executorIdCounter.toString
-       assert(container.getResource.getMemory >= resource.getMemory)
-       logInfo(s"Launching container $containerId on host $executorHostname " +
--        s"for executor with ID $executorId")
-+        s"for executor with ID $executorId with numa ID $numaNodeId")
-
-       def updateInternalState(): Unit = synchronized {
-         runningExecutors.add(executorId)
-@@ -561,6 +583,7 @@ private[yarn] class YarnAllocator(
-             sparkConf,
-             driverUrl,
-             executorId,
-+            Some(numaNodeId),
-             executorHostname,
-             executorMemory,
-             executorCores,
-@@ -619,6 +642,17 @@ private[yarn] class YarnAllocator(
-         // there are some exit status' we shouldn't necessarily count against us, but for
-         // now I think its ok as none of the containers are expected to exit.
-         val exitStatus = completedContainer.getExitStatus
-+
-+        var numaNodeId = -1
-+        val hostName = hostOpt.getOrElse("nohost")
-+        val numaInfoOp = hostToNumaInfo.get(hostName)
-+        numaInfoOp match {
-+          case Some(numaInfo) =>
-+            numaNodeId = numaInfo.cotainer2numa.get(containerId.toString).getOrElse(-1)
-+            if(-1 != numaNodeId) numaInfo.numaUsed(numaNodeId) -= 1
-+          case _ => numaNodeId = -1
-+        }
-+
-         val (exitCausedByApp, containerExitReason) = exitStatus match {
-           case ContainerExitStatus.SUCCESS =>
-             (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
-diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
-index 3797491bb2..7de7d8c468 100644
---- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
-+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
-@@ -158,6 +158,17 @@ package object config {
-
-   /* Launcher configuration. */
-
-+  private[spark] val SPARK_YARN_NUMA_ENABLED = ConfigBuilder("spark.yarn.numa.enabled")
-+    .doc("Whether enabling numa binding when executor start up. This is recommend to true " +
-+      "when persistent memory is enabled.")
-+    .booleanConf
-+    .createWithDefault(false)
-+
-+  private[spark] val SPARK_YARN_NUMA_NUMBER = ConfigBuilder("spark.yarn.numa.number")
-+    .doc("Total number of numanodes in physical server")
-+    .intConf
-+    .createWithDefault(2)
-+
-   private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
-     .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
-       "launcher process.")
-diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
-index 669e39fb7c..1aec20bd9f 100644
---- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
-+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
-@@ -35,6 +35,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
-     rpcEnv: RpcEnv,
-     driverUrl: String,
-     executorId: String,
-+    numaNodeId: Option[String],
-     bindAddress: String,
-     hostname: String,
-     cores: Int,
-@@ -46,6 +47,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
-     rpcEnv,
-     driverUrl,
-     executorId,
-+    numaNodeId,
-     bindAddress,
-     hostname,
-     cores,
-@@ -73,8 +75,8 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
-     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
-       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
-       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
--        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
--        arguments.resourcesFileOpt, resourceProfile)
-+        arguments.numaNodeId, arguments.bindAddress, arguments.hostname, arguments.cores,
-+        arguments.userClassPath, env, arguments.resourcesFileOpt, resourceProfile)
-     }
-     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
-       this.getClass.getCanonicalName.stripSuffix("$"))
-diff --git a/scalastyle-config.xml b/scalastyle-config.xml
-index 73ac14fdba..053a35ea01 100644
---- a/scalastyle-config.xml
-+++ b/scalastyle-config.xml
-@@ -94,7 +94,7 @@ This file is divided into 3 sections:
-
-
-
--
-+
-
-
-
diff --git a/mkdocs.yml b/mkdocs.yml
index e550fa5a8..b4e8f7938 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -21,6 +21,6 @@
 theme: readthedocs
 plugins:
     - search
     - mkdocs-versioning:
-        version: master
+        version: 1.1.1
         exclude_from_nav: ["image", "js", "css", "fonts", "img"]