Skip to content

Commit

Permalink
[SPARK-48518][CORE] Make LZF compression be able to run in parallel
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR introduced a config that turns on LZF compression to parallel mode via using PLZFOutputStream.

FYI, https://github.com/ning/compress?tab=readme-ov-file#parallel-processing

### Why are the changes needed?

Improve performance

```
[info] OpenJDK 64-Bit Server VM 17.0.10+0 on Mac OS X 14.5
[info] Apple M2 Max
[info] Compress large objects:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] -----------------------------------------------------------------------------------------------------------------------------
[info] Compression 1024 array values in 7 threads                12             13           1          0.1       11788.2       1.0X
[info] Compression 1024 array values single-threaded             23             23           0          0.0       22512.7       0.5X
```

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

benchmark
### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#46858 from yaooqinn/SPARK-48518.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
  • Loading branch information
yaooqinn committed Jun 4, 2024
1 parent 02c6456 commit 90ee299
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 1 deletion.
19 changes: 19 additions & 0 deletions core/benchmarks/LZFBenchmark-jdk21-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
================================================================================================
Benchmark LZFCompressionCodec
================================================================================================

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
Compress small objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------
Compression 256000000 int values in parallel 598 600 2 428.2 2.3 1.0X
Compression 256000000 int values single-threaded 568 570 2 451.0 2.2 1.1X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
Compress large objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
Compression 1024 array values in 1 threads 39 45 5 0.0 38475.4 1.0X
Compression 1024 array values single-threaded 32 33 1 0.0 31154.5 1.2X


19 changes: 19 additions & 0 deletions core/benchmarks/LZFBenchmark-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
================================================================================================
Benchmark LZFCompressionCodec
================================================================================================

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
Compress small objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------
Compression 256000000 int values in parallel 602 612 6 425.1 2.4 1.0X
Compression 256000000 int values single-threaded 610 617 5 419.8 2.4 1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
Compress large objects: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
Compression 1024 array values in 1 threads 35 43 6 0.0 33806.8 1.0X
Compression 1024 array values single-threaded 32 32 0 0.0 30990.4 1.1X


Original file line number Diff line number Diff line change
Expand Up @@ -2031,6 +2031,13 @@ package object config {
.intConf
.createWithDefault(1)

private[spark] val IO_COMPRESSION_LZF_PARALLEL =
ConfigBuilder("spark.io.compression.lzf.parallel.enabled")
.doc("When true, LZF compression will use multiple threads to compress data in parallel.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
ConfigBuilder("spark.io.warning.largeFileThreshold")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Locale

import com.github.luben.zstd.{NoPool, RecyclingBufferPool, ZstdInputStreamNoFinalizer, ZstdOutputStreamNoFinalizer}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import com.ning.compress.lzf.parallel.PLZFOutputStream
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
import net.jpountz.xxhash.XXHashFactory
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
Expand Down Expand Up @@ -170,9 +171,14 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
*/
@DeveloperApi
class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
private val parallelCompression = conf.get(IO_COMPRESSION_LZF_PARALLEL)

override def compressedOutputStream(s: OutputStream): OutputStream = {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
if (parallelCompression) {
new PLZFOutputStream(s)
} else {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
}
}

override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
Expand Down
93 changes: 93 additions & 0 deletions core/src/test/scala/org/apache/spark/io/LZFBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.io

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.lang.management.ManagementFactory

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config.IO_COMPRESSION_LZF_PARALLEL

/**
* Benchmark for ZStandard codec performance.
* {{{
* To run this benchmark:
* 1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
* 2. build/sbt "core/Test/runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
* Results will be written to "benchmarks/ZStandardBenchmark-results.txt".
* }}}
*/
object LZFBenchmark extends BenchmarkBase {

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark("Benchmark LZFCompressionCodec") {
compressSmallObjects()
compressLargeObjects()
}
}

private def compressSmallObjects(): Unit = {
val N = 256_000_000
val benchmark = new Benchmark("Compress small objects", N, output = output)
Seq(true, false).foreach { parallel =>
val conf = new SparkConf(false).set(IO_COMPRESSION_LZF_PARALLEL, parallel)
val condition = if (parallel) "in parallel" else "single-threaded"
benchmark.addCase(s"Compression $N int values $condition") { _ =>
val os = new LZFCompressionCodec(conf).compressedOutputStream(new ByteArrayOutputStream())
for (i <- 1 until N) {
os.write(i)
}
os.close()
}
}
benchmark.run()
}

private def compressLargeObjects(): Unit = {
val N = 1024
val data: Array[Byte] = (1 until 128 * 1024 * 1024).map(_.toByte).toArray
val benchmark = new Benchmark(s"Compress large objects", N, output = output)

// com.ning.compress.lzf.parallel.PLZFOutputStream.getNThreads
def getNThreads: Int = {
var nThreads = Runtime.getRuntime.availableProcessors
val jmx = ManagementFactory.getOperatingSystemMXBean
if (jmx != null) {
val loadAverage = jmx.getSystemLoadAverage.toInt
if (nThreads > 1 && loadAverage >= 1) nThreads = Math.max(1, nThreads - loadAverage)
}
nThreads
}
Seq(true, false).foreach { parallel =>
val conf = new SparkConf(false).set(IO_COMPRESSION_LZF_PARALLEL, parallel)
val condition = if (parallel) s"in $getNThreads threads" else "single-threaded"
benchmark.addCase(s"Compression $N array values $condition") { _ =>
val baos = new ByteArrayOutputStream()
val zcos = new LZFCompressionCodec(conf).compressedOutputStream(baos)
val oos = new ObjectOutputStream(zcos)
1 to N foreach { _ =>
oos.writeObject(data)
}
oos.close()
}
}
benchmark.run()
}
}
8 changes: 8 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1895,6 +1895,14 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.io.compression.lzf.parallel.enabled</code></td>
<td>false</td>
<td>
When true, LZF compression will use multiple threads to compress data in parallel.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kryo.classesToRegister</code></td>
<td>(none)</td>
Expand Down

0 comments on commit 90ee299

Please sign in to comment.