Skip to content

Commit

Permalink
[Streaming] Streaming data transfer java (ray-project#6474)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang authored and raulchen committed Dec 22, 2019
1 parent 1b14fbe commit 7bbfa85
Show file tree
Hide file tree
Showing 146 changed files with 3,923 additions and 786 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ java/runtime/native_dependencies/

# streaming/python
streaming/python/generated/
streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/generated/
streaming/build/java
.clwb
streaming/**/.settings
streaming/java/**/target
streaming/java/**/.classpath
streaming/java/**/.project
streaming/java/**/*.log

# python virtual env
venv
Expand Down
12 changes: 9 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,25 @@ matrix:
- ./java/test.sh

- os: linux
env: BAZEL_PYTHON_VERSION=PY3 PYTHON=3.5 PYTHONWARNINGS=ignore TESTSUITE=streaming
env:
- TESTSUITE=streaming
- JDK='Oracle JDK 8'
- RAY_INSTALL_JAVA=1
- BAZEL_PYTHON_VERSION=PY3
- PYTHON=3.5 PYTHONWARNINGS=ignore
install:
- python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py
- eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py`
- if [ $RAY_CI_STREAMING_PYTHON_AFFECTED != "1" ]; then exit; fi
- if [[ $RAY_CI_STREAMING_PYTHON_AFFECTED != "1" && $RAY_CI_STREAMING_JAVA_AFFECTED != "1" ]]; then exit; fi
- ./ci/suppress_output ./ci/travis/install-bazel.sh
- ./ci/suppress_output ./ci/travis/install-dependencies.sh
- export PATH="$HOME/miniconda/bin:$PATH"
- ./ci/suppress_output ./ci/travis/install-ray.sh
script:
# Streaming cpp test.
- if [ $RAY_CI_STREAMING_CPP_AFFECTED == "1" ]; then ./ci/suppress_output bash streaming/src/test/run_streaming_queue_test.sh; fi
- if [ RAY_CI_STREAMING_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=5 --timeout=300 python/ray/streaming/tests/; fi
- if [ $RAY_CI_STREAMING_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=5 --timeout=300 streaming/python/tests/; fi
- if [ $RAY_CI_STREAMING_JAVA_AFFECTED == "1" ]; then ./streaming/java/test.sh; fi

- os: linux
env: LINT=1 PYTHONWARNINGS=ignore
Expand Down
13 changes: 13 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,23 @@ cc_binary(
"@bazel_tools//src/conditions:darwin": ["external/bazel_tools/tools/jdk/include/darwin"],
"//conditions:default": ["external/bazel_tools/tools/jdk/include/linux"],
}),
# Export ray ABI symbols, which can then be used by libstreaming_java.so. see `//:_raylet`
linkopts = select({
"@bazel_tools//src/conditions:darwin": [
"-Wl,-exported_symbols_list,$(location //:src/ray/ray_exported_symbols.lds)",
],
"@bazel_tools//src/conditions:windows": [
],
"//conditions:default": [
"-Wl,--version-script,$(location //:src/ray/ray_version_script.lds)",
],
}),
linkshared = 1,
linkstatic = 1,
deps = [
"//:core_worker_lib",
"//:src/ray/ray_exported_symbols.lds",
"//:src/ray/ray_version_script.lds",
],
)

Expand Down
4 changes: 2 additions & 2 deletions bazel/ray.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def define_java_module(
)
checkstyle_test(
name = "org_ray_ray_" + name + "-checkstyle",
target = "//java:org_ray_ray_" + name,
target = ":org_ray_ray_" + name,
config = "//java:checkstyle.xml",
suppressions = "//java:checkstyle-suppressions.xml",
size = "small",
Expand All @@ -63,7 +63,7 @@ def define_java_module(
)
checkstyle_test(
name = "org_ray_ray_" + name + "_test-checkstyle",
target = "//java:org_ray_ray_" + name + "_test",
target = ":org_ray_ray_" + name + "_test",
config = "//java:checkstyle.xml",
suppressions = "//java:checkstyle-suppressions.xml",
size = "small",
Expand Down
2 changes: 2 additions & 0 deletions bazel/ray_deps_build_all.bzl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
load("@com_github_ray_project_ray//java:dependencies.bzl", "gen_java_deps")
load("@com_github_ray_project_ray//streaming/java:dependencies.bzl", "gen_streaming_java_deps")
load("@com_github_nelhage_rules_boost//:boost/boost.bzl", "boost_deps")
load("@com_github_jupp0r_prometheus_cpp//bazel:repositories.bzl", "prometheus_cpp_repositories")
load("@com_github_checkstyle_java//:repo.bzl", "checkstyle_deps")
Expand All @@ -9,6 +10,7 @@ load("@rules_proto_grpc//:repositories.bzl", "rules_proto_grpc_toolchains")

def ray_deps_build_all():
gen_java_deps()
gen_streaming_java_deps()
checkstyle_deps()
boost_deps()
prometheus_cpp_repositories()
Expand Down
2 changes: 1 addition & 1 deletion ci/travis/bazel-format.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ done

pushd $ROOT_DIR/../..
BAZEL_FILES="bazel/BUILD bazel/BUILD.plasma bazel/ray.bzl BUILD.bazel
streaming/BUILD.bazel WORKSPACE"
streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE"
buildifier -mode=$RUN_TYPE -diff_command="diff -u" $BAZEL_FILES
popd
12 changes: 12 additions & 0 deletions ci/travis/determine_tests_to_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def list_changed_files(commit_range):
RAY_CI_MACOS_WHEELS_AFFECTED = 0
RAY_CI_STREAMING_CPP_AFFECTED = 0
RAY_CI_STREAMING_PYTHON_AFFECTED = 0
RAY_CI_STREAMING_JAVA_AFFECTED = 0

if os.environ["TRAVIS_EVENT_TYPE"] == "pull_request":

Expand Down Expand Up @@ -76,6 +77,7 @@ def list_changed_files(commit_range):
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
elif changed_file.startswith("java/"):
RAY_CI_JAVA_AFFECTED = 1
RAY_CI_STREAMING_JAVA_AFFECTED = 1
elif any(
changed_file.startswith(prefix)
for prefix in skip_prefix_list):
Expand All @@ -91,11 +93,15 @@ def list_changed_files(commit_range):
RAY_CI_MACOS_WHEELS_AFFECTED = 1
RAY_CI_STREAMING_CPP_AFFECTED = 1
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
RAY_CI_STREAMING_JAVA_AFFECTED = 1
elif changed_file.startswith("streaming/src"):
RAY_CI_STREAMING_CPP_AFFECTED = 1
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
RAY_CI_STREAMING_JAVA_AFFECTED = 1
elif changed_file.startswith("streaming/python"):
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
elif changed_file.startswith("streaming/java"):
RAY_CI_STREAMING_JAVA_AFFECTED = 1
else:
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
Expand All @@ -105,6 +111,8 @@ def list_changed_files(commit_range):
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
RAY_CI_STREAMING_CPP_AFFECTED = 1
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
RAY_CI_STREAMING_JAVA_AFFECTED = 1
else:
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
Expand All @@ -114,6 +122,8 @@ def list_changed_files(commit_range):
RAY_CI_LINUX_WHEELS_AFFECTED = 1
RAY_CI_MACOS_WHEELS_AFFECTED = 1
RAY_CI_STREAMING_CPP_AFFECTED = 1
RAY_CI_STREAMING_PYTHON_AFFECTED = 1
RAY_CI_STREAMING_JAVA_AFFECTED = 1

# Log the modified environment variables visible in console.
for output_stream in [sys.stdout, sys.stderr]:
Expand All @@ -132,3 +142,5 @@ def list_changed_files(commit_range):
.format(RAY_CI_STREAMING_CPP_AFFECTED))
_print("export RAY_CI_STREAMING_PYTHON_AFFECTED={}"
.format(RAY_CI_STREAMING_PYTHON_AFFECTED))
_print("export RAY_CI_STREAMING_JAVA_AFFECTED={}"
.format(RAY_CI_STREAMING_JAVA_AFFECTED))
41 changes: 3 additions & 38 deletions java/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ exports_files([
"testng.xml",
"checkstyle.xml",
"checkstyle-suppressions.xml",
"streaming/testng.xml",
])

all_modules = [
"api",
"runtime",
"test",
"tutorial",
"streaming",
]

java_import(
Expand All @@ -25,14 +23,11 @@ java_import(
] + [
"all_tests_deploy.jar",
"all_tests_deploy-src.jar",
"streaming_tests_deploy.jar",
"streaming_tests_deploy-src.jar",
],
deps = [
":org_ray_ray_" + module for module in all_modules
] + [
":all_tests",
":streaming_tests",
],
)

Expand All @@ -45,6 +40,7 @@ define_java_module(
"@maven//:com_sun_xml_bind_jaxb_core",
"@maven//:com_sun_xml_bind_jaxb_impl",
],
visibility = ["//visibility:public"]
)

define_java_module(
Expand Down Expand Up @@ -79,7 +75,9 @@ define_java_module(
"@maven//:org_slf4j_slf4j_api",
"@maven//:org_slf4j_slf4j_log4j12",
"@maven//:redis_clients_jedis",
"@maven//:net_java_dev_jna_jna",
],
visibility = ["//visibility:public"]
)

define_java_module(
Expand Down Expand Up @@ -107,28 +105,6 @@ define_java_module(
],
)

define_java_module(
name = "streaming",
deps = [
":org_ray_ray_api",
":org_ray_ray_runtime",
"@maven//:com_google_guava_guava",
"@maven//:org_slf4j_slf4j_api",
"@maven//:org_slf4j_slf4j_log4j12",
],
define_test_lib = True,
test_deps = [
":org_ray_ray_api",
":org_ray_ray_runtime",
":org_ray_ray_streaming",
"@maven//:com_beust_jcommander",
"@maven//:com_google_guava_guava",
"@maven//:org_slf4j_slf4j_api",
"@maven//:org_slf4j_slf4j_log4j12",
"@maven//:org_testng_testng",
],
)

java_binary(
name = "all_tests",
main_class = "org.testng.TestNG",
Expand All @@ -140,16 +116,6 @@ java_binary(
],
)

java_binary(
name = "streaming_tests",
main_class = "org.testng.TestNG",
data = ["streaming/testng.xml"],
args = ["java/streaming/testng.xml"],
runtime_deps = [
":org_ray_ray_streaming_test",
],
)

java_proto_compile(
name = "common_java_proto",
deps = ["@//:common_proto"],
Expand Down Expand Up @@ -236,7 +202,6 @@ genrule(
cp -f $(location //java:org_ray_ray_runtime_pom) $$WORK_DIR/java/runtime/pom.xml
cp -f $(location //java:org_ray_ray_tutorial_pom) $$WORK_DIR/java/tutorial/pom.xml
cp -f $(location //java:org_ray_ray_test_pom) $$WORK_DIR/java/test/pom.xml
cp -f $(location //java:org_ray_ray_streaming_pom) $$WORK_DIR/java/streaming/pom.xml
echo $$(date) > $@
""",
local = 1,
Expand Down
3 changes: 2 additions & 1 deletion java/dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ def gen_java_deps():
"org.slf4j:slf4j-log4j12:1.7.25",
"org.testng:testng:6.9.10",
"redis.clients:jedis:2.8.0",
"net.java.dev.jna:jna:5.5.0"
],
repositories = [
"https://repo1.maven.org/maven2",
"https://repo1.maven.org/maven2/",
],
)
1 change: 0 additions & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
<module>api</module>
<module>runtime</module>
<module>test</module>
<module>streaming</module>
<module>tutorial</module>
</modules>

Expand Down
5 changes: 5 additions & 0 deletions java/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<artifactId>fst</artifactId>
<version>2.57</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
46 changes: 9 additions & 37 deletions java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package org.ray.runtime;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
Expand All @@ -22,7 +20,7 @@
import org.ray.runtime.task.NativeTaskExecutor;
import org.ray.runtime.task.NativeTaskSubmitter;
import org.ray.runtime.task.TaskExecutor;
import org.ray.runtime.util.FileUtil;
import org.ray.runtime.util.JniUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,16 +40,11 @@ public final class RayNativeRuntime extends AbstractRayRuntime {

static {
LOGGER.debug("Loading native libraries.");
// Load native libraries.
String[] libraries = new String[]{"core_worker_library_java"};
for (String library : libraries) {
String fileName = System.mapLibraryName(library);
try (FileUtil.TempFile libFile = FileUtil.getTempFileFromResource(fileName)) {
System.load(libFile.getFile().getAbsolutePath());
}
LOGGER.debug("Native libraries loaded.");
}

// Expose ray ABI symbols which may be depended by other shared
// libraries such as libstreaming_java.so.
// See BUILD.bazel:libcore_worker_library_java.so
JniUtils.loadLibrary("core_worker_library_java", true);
LOGGER.debug("Native libraries loaded.");
RayConfig globalRayConfig = RayConfig.create();
resetLibraryPath(globalRayConfig);

Expand All @@ -65,30 +58,9 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
}

private static void resetLibraryPath(RayConfig rayConfig) {
if (rayConfig.libraryPath.isEmpty()) {
return;
}

String path = System.getProperty("java.library.path");
if (Strings.isNullOrEmpty(path)) {
path = "";
} else {
path += ":";
}
path += String.join(":", rayConfig.libraryPath);

// This is a hack to reset library path at runtime,
// see https://stackoverflow.com/questions/15409223/.
System.setProperty("java.library.path", path);
// Set sys_paths to null so that java.library.path will be re-evaluated next time it is needed.
final Field sysPathsField;
try {
sysPathsField = ClassLoader.class.getDeclaredField("sys_paths");
sysPathsField.setAccessible(true);
sysPathsField.set(null, null);
} catch (NoSuchFieldException | IllegalAccessException e) {
LOGGER.error("Failed to set library path.", e);
}
String separator = System.getProperty("path.separator");
String libraryPath = String.join(separator, rayConfig.libraryPath);
JniUtils.resetLibraryPath(libraryPath);
}

public RayNativeRuntime(RayConfig rayConfig, FunctionManager functionManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public class DefaultWorker {
public static void main(String[] args) {
try {
System.setProperty("ray.worker.mode", "WORKER");
// Set run-mode to `CLUSTER` explicitly, to prevent the DefaultWorker to receive
// a wrong run-mode parameter through jvm options.
System.setProperty("ray.run-mode", "CLUSTER");
Thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e) -> {
LOGGER.error("Uncaught worker exception in thread {}: {}", t, e);
});
Expand Down
Loading

0 comments on commit 7bbfa85

Please sign in to comment.