diff --git a/lib/trino-filesystem-alluxio/pom.xml b/lib/trino-filesystem-alluxio/pom.xml
new file mode 100644
index 000000000000..8f6523ebe09a
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/pom.xml
@@ -0,0 +1,125 @@
+
+
+ 4.0.0
+
+
+ io.trino
+ trino-root
+ 460-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-filesystem-alluxio
+ Trino Filesystem - Alluxio
+
+
+ ${project.parent.basedir}
+ true
+
+
+
+
+ com.google.inject
+ guice
+
+
+
+ io.trino
+ trino-filesystem
+
+
+
+ io.trino
+ trino-memory-context
+
+
+
+ io.trino
+ trino-spi
+
+
+
+ org.alluxio
+ alluxio-core-client-fs
+
+
+
+ org.alluxio
+ alluxio-core-common
+
+
+
+ org.alluxio
+ alluxio-core-transport
+
+
+
+ com.github.docker-java
+ docker-java-api
+ test
+
+
+
+ io.airlift
+ junit-extensions
+ test
+
+
+
+ io.trino
+ trino-filesystem
+ ${project.version}
+ tests
+ test
+
+
+
+ io.trino
+ trino-spi
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-testing
+ test
+
+
+
+ io.trino
+ trino-testing-containers
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java
new file mode 100644
index 000000000000..ba16de33df26
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.client.file.URIStatus;
+import io.trino.filesystem.FileEntry;
+import io.trino.filesystem.FileIterator;
+import io.trino.filesystem.Location;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import static io.trino.filesystem.alluxio.AlluxioUtils.convertToLocation;
+import static java.util.Objects.requireNonNull;
+
+public class AlluxioFileIterator
+ implements FileIterator
+{
+ private final Iterator files;
+ private final String mountRoot;
+
+ public AlluxioFileIterator(List files, String mountRoot)
+ {
+ this.files = requireNonNull(files.iterator(), "files is null");
+ this.mountRoot = requireNonNull(mountRoot, "mountRoot is null");
+ }
+
+ @Override
+ public boolean hasNext()
+ throws IOException
+ {
+ return files.hasNext();
+ }
+
+ @Override
+ public FileEntry next()
+ throws IOException
+ {
+ if (!hasNext()) {
+ return null;
+ }
+ URIStatus fileStatus = files.next();
+ Location location = convertToLocation(fileStatus, mountRoot);
+ return new FileEntry(
+ location,
+ fileStatus.getLength(),
+ Instant.ofEpochMilli(fileStatus.getLastModificationTimeMs()),
+ Optional.empty());
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java
new file mode 100644
index 000000000000..371e521baecb
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.AlluxioURI;
+import alluxio.client.file.FileSystem;
+import alluxio.client.file.URIStatus;
+import alluxio.exception.AlluxioException;
+import alluxio.exception.FileDoesNotExistException;
+import alluxio.exception.runtime.NotFoundRuntimeException;
+import alluxio.grpc.CreateDirectoryPOptions;
+import alluxio.grpc.DeletePOptions;
+import alluxio.grpc.ListStatusPOptions;
+import io.trino.filesystem.FileIterator;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoOutputFile;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI;
+import static java.util.Objects.requireNonNull;
+import static java.util.UUID.randomUUID;
+
+public class AlluxioFileSystem
+ implements TrinoFileSystem
+{
+ private final FileSystem alluxioClient;
+ private final String mountRoot;
+ private final Location rootLocation;
+
+ public AlluxioFileSystem(FileSystem alluxioClient)
+ {
+ this(alluxioClient, "/", Location.of("alluxio:///"));
+ }
+
+ public AlluxioFileSystem(FileSystem alluxioClient, String mountRoot, Location rootLocation)
+ {
+ this.alluxioClient = requireNonNull(alluxioClient, "filesystem is null");
+ this.mountRoot = mountRoot; // default alluxio mount root
+ this.rootLocation = requireNonNull(rootLocation, "rootLocation is null");
+ }
+
+ public String getMountRoot()
+ {
+ return mountRoot;
+ }
+
+ @Override
+ public TrinoInputFile newInputFile(Location location)
+ {
+ ensureNotRootLocation(location);
+ ensureNotEndWithSlash(location);
+ return new AlluxioFileSystemInputFile(location, null, alluxioClient, mountRoot, Optional.empty());
+ }
+
+ @Override
+ public TrinoInputFile newInputFile(Location location, long length)
+ {
+ ensureNotRootLocation(location);
+ ensureNotEndWithSlash(location);
+ return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.empty());
+ }
+
+ @Override
+ public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
+ {
+ ensureNotRootLocation(location);
+ ensureNotEndWithSlash(location);
+ return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.of(lastModified));
+ }
+
+ @Override
+ public TrinoOutputFile newOutputFile(Location location)
+ {
+ ensureNotRootLocation(location);
+ ensureNotEndWithSlash(location);
+ return new AlluxioFileSystemOutputFile(rootLocation, location, alluxioClient, mountRoot);
+ }
+
+ @Override
+ public void deleteFile(Location location)
+ throws IOException
+ {
+ ensureNotRootLocation(location);
+ ensureNotEndWithSlash(location);
+ try {
+ alluxioClient.delete(convertToAlluxioURI(location, mountRoot));
+ }
+ catch (FileDoesNotExistException _) {
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error deleteFile %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public void deleteDirectory(Location location)
+ throws IOException
+ {
+ try {
+ AlluxioURI uri = convertToAlluxioURI(location, mountRoot);
+ URIStatus status = alluxioClient.getStatus(uri);
+ if (status == null) {
+ return;
+ }
+ if (!status.isFolder()) {
+ throw new IOException("delete directory cannot be called on a file %s".formatted(location));
+ }
+ DeletePOptions deletePOptions = DeletePOptions.newBuilder().setRecursive(true).build();
+ // recursive delete on the root directory must be handled manually
+ if (location.path().isEmpty() || location.path().equals(mountRoot)) {
+ for (URIStatus uriStatus : alluxioClient.listStatus(uri)) {
+ alluxioClient.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions);
+ }
+ }
+ else {
+ alluxioClient.delete(uri, deletePOptions);
+ }
+ }
+ catch (FileDoesNotExistException | NotFoundRuntimeException e) {
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error deleteDirectory %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public void renameFile(Location source, Location target)
+ throws IOException
+ {
+ try {
+ ensureNotRootLocation(source);
+ ensureNotEndWithSlash(source);
+ ensureNotRootLocation(target);
+ ensureNotEndWithSlash(target);
+ }
+ catch (IllegalStateException e) {
+ throw new IllegalStateException(
+ "Cannot rename file from %s to %s as one of them is root location".formatted(source, target), e);
+ }
+ AlluxioURI sourceUri = convertToAlluxioURI(source, mountRoot);
+ AlluxioURI targetUri = convertToAlluxioURI(target, mountRoot);
+
+ try {
+ if (!alluxioClient.exists(sourceUri)) {
+ throw new IOException(
+ "Cannot rename file %s to %s as file %s doesn't exist".formatted(source, target, source));
+ }
+ if (alluxioClient.exists(targetUri)) {
+ throw new IOException(
+ "Cannot rename file %s to %s as file %s already exists".formatted(source, target, target));
+ }
+ URIStatus status = alluxioClient.getStatus(sourceUri);
+ if (status.isFolder()) {
+ throw new IOException(
+ "Cannot rename file %s to %s as %s is a directory".formatted(source, target, source));
+ }
+ alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot));
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error renameFile from %s to %s".formatted(source, target), e);
+ }
+ }
+
+ @Override
+ public FileIterator listFiles(Location location)
+ throws IOException
+ {
+ try {
+ URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot));
+ if (status == null) {
+ new AlluxioFileIterator(Collections.emptyList(), mountRoot);
+ }
+ if (!status.isFolder()) {
+ throw new IOException("Location is not a directory: %s".formatted(location));
+ }
+ }
+ catch (NotFoundRuntimeException | AlluxioException e) {
+ return new AlluxioFileIterator(Collections.emptyList(), mountRoot);
+ }
+
+ try {
+ List filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot),
+ ListStatusPOptions.newBuilder().setRecursive(true).build());
+ return new AlluxioFileIterator(filesStatus.stream().filter(status -> !status.isFolder() & status.isCompleted()).toList(), mountRoot);
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error listFiles %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public Optional directoryExists(Location location)
+ throws IOException
+ {
+ if (location.path().isEmpty()) {
+ return Optional.of(true);
+ }
+ try {
+ URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot));
+ if (status != null && status.isFolder()) {
+ return Optional.of(true);
+ }
+ return Optional.of(false);
+ }
+ catch (FileDoesNotExistException | FileNotFoundException | NotFoundRuntimeException e) {
+ return Optional.of(false);
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error directoryExists %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public void createDirectory(Location location)
+ throws IOException
+ {
+ try {
+ AlluxioURI locationUri = convertToAlluxioURI(location, mountRoot);
+ if (alluxioClient.exists(locationUri)) {
+ URIStatus status = alluxioClient.getStatus(locationUri);
+ if (!status.isFolder()) {
+ throw new IOException(
+ "Cannot create a directory for an existing file location %s".formatted(location));
+ }
+ }
+ alluxioClient.createDirectory(
+ locationUri,
+ CreateDirectoryPOptions.newBuilder()
+ .setAllowExists(true)
+ .setRecursive(true)
+ .build());
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error createDirectory %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public void renameDirectory(Location source, Location target)
+ throws IOException
+ {
+ try {
+ ensureNotRootLocation(source);
+ ensureNotRootLocation(target);
+ }
+ catch (IllegalStateException e) {
+ throw new IOException(
+ "Cannot rename directory from %s to %s as one of them is root location".formatted(source, target), e);
+ }
+ try {
+ if (alluxioClient.exists(convertToAlluxioURI(target, mountRoot))) {
+ throw new IOException(
+ "Cannot rename %s to %s as file %s already exists".formatted(source, target, target));
+ }
+ alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot));
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error renameDirectory from %s to %s".formatted(source, target), e);
+ }
+ }
+
+ @Override
+ public Set listDirectories(Location location)
+ throws IOException
+ {
+ try {
+ if (isFile(location)) {
+ throw new IOException("Cannot list directories for a file %s".formatted(location));
+ }
+ List filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot));
+ return filesStatus.stream()
+ .filter(URIStatus::isFolder)
+ .map((URIStatus fileStatus) -> AlluxioUtils.convertToLocation(fileStatus, mountRoot))
+ .map(loc -> {
+ if (!loc.toString().endsWith("/")) {
+ return Location.of(loc + "/");
+ }
+ else {
+ return loc;
+ }
+ })
+ .collect(Collectors.toSet());
+ }
+ catch (FileDoesNotExistException | FileNotFoundException | NotFoundRuntimeException e) {
+ return Collections.emptySet();
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error listDirectories %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix)
+ throws IOException
+ {
+ // allow for absolute or relative temporary prefix
+ Location temporary;
+ if (temporaryPrefix.startsWith("/")) {
+ String prefix = temporaryPrefix;
+ while (prefix.startsWith("/")) {
+ prefix = prefix.substring(1);
+ }
+ temporary = targetPath.appendPath(prefix);
+ }
+ else {
+ temporary = targetPath.appendPath(temporaryPrefix);
+ }
+
+ temporary = temporary.appendPath(randomUUID().toString());
+
+ createDirectory(temporary);
+ return Optional.of(temporary);
+ }
+
+ private void ensureNotRootLocation(Location location)
+ {
+ String locationPath = location.path();
+ while (locationPath.endsWith("/")) {
+ locationPath = locationPath.substring(0, locationPath.length() - 1);
+ }
+
+ String rootLocationPath = rootLocation.path();
+ while (rootLocationPath.endsWith("/")) {
+ rootLocationPath = rootLocationPath.substring(0, rootLocationPath.length() - 1);
+ }
+
+ if (rootLocationPath.equals(locationPath)) {
+ throw new IllegalStateException("Illegal operation on %s".formatted(location));
+ }
+ }
+
+ private void ensureNotEndWithSlash(Location location)
+ {
+ String locationPath = location.path();
+ if (locationPath.endsWith("/")) {
+ throw new IllegalStateException("Illegal operation on %s".formatted(location));
+ }
+ }
+
+ private boolean isFile(Location location)
+ {
+ try {
+ URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot));
+ if (status == null) {
+ return false;
+ }
+ return !status.isFolder();
+ }
+ catch (NotFoundRuntimeException | AlluxioException | IOException e) {
+ return false;
+ }
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java
new file mode 100644
index 000000000000..cc7b6303e061
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.client.file.FileSystem;
+import alluxio.client.file.FileSystemContext;
+import alluxio.conf.AlluxioConfiguration;
+import alluxio.conf.Configuration;
+import com.google.inject.Inject;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.spi.security.ConnectorIdentity;
+
+public class AlluxioFileSystemFactory
+ implements TrinoFileSystemFactory
+{
+ private final AlluxioConfiguration conf;
+
+ @Inject
+ public AlluxioFileSystemFactory()
+ {
+ this(Configuration.global());
+ }
+
+ public AlluxioFileSystemFactory(AlluxioConfiguration conf)
+ {
+ this.conf = conf;
+ }
+
+ @Override
+ public TrinoFileSystem create(ConnectorIdentity identity)
+ {
+ FileSystemContext fsContext = FileSystemContext.create(conf);
+ FileSystem alluxioClient = FileSystem.Factory.create(fsContext);
+ return new AlluxioFileSystem(alluxioClient);
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java
new file mode 100644
index 000000000000..44843c5ad587
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.client.file.FileInStream;
+import io.trino.filesystem.TrinoInput;
+import io.trino.filesystem.TrinoInputFile;
+
+import java.io.IOException;
+
+import static java.lang.Math.min;
+import static java.util.Objects.checkFromIndexSize;
+import static java.util.Objects.requireNonNull;
+
+public class AlluxioFileSystemInput
+ implements TrinoInput
+{
+ private final FileInStream stream;
+ private final TrinoInputFile inputFile;
+
+ private volatile boolean closed;
+
+ public AlluxioFileSystemInput(FileInStream stream, TrinoInputFile inputFile)
+ {
+ this.stream = requireNonNull(stream, "stream is null");
+ this.inputFile = requireNonNull(inputFile, "inputFile is null");
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
+ throws IOException
+ {
+ ensureOpen();
+ checkFromIndexSize(bufferOffset, bufferLength, buffer.length);
+ if (position + bufferLength > inputFile.length()) {
+ throw new IOException("readFully position overflow %s. pos %d + buffer length %d > file size %d"
+ .formatted(inputFile.location(), position, bufferLength, inputFile.length()));
+ }
+ stream.positionedRead(position, buffer, bufferOffset, bufferLength);
+ }
+
+ @Override
+ public int readTail(byte[] buffer, int bufferOffset, int bufferLength)
+ throws IOException
+ {
+ ensureOpen();
+ checkFromIndexSize(bufferOffset, bufferLength, buffer.length);
+ long fileSize = inputFile.length();
+ int readSize = (int) min(fileSize, bufferLength);
+ readFully(fileSize - readSize, buffer, bufferOffset, readSize);
+ return readSize;
+ }
+
+ @Override
+ public String toString()
+ {
+ return inputFile.toString();
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ closed = true;
+ stream.close();
+ }
+
+ private void ensureOpen()
+ throws IOException
+ {
+ if (closed) {
+ throw new IOException("Input stream closed: " + this);
+ }
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java
new file mode 100644
index 000000000000..ab9e59b5d836
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.client.file.FileInStream;
+import alluxio.client.file.FileSystem;
+import alluxio.client.file.URIStatus;
+import alluxio.exception.AlluxioException;
+import alluxio.exception.FileDoesNotExistException;
+import alluxio.exception.runtime.NotFoundRuntimeException;
+import alluxio.grpc.OpenFilePOptions;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoInput;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoInputStream;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+
+import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI;
+import static java.util.Objects.requireNonNull;
+
+public class AlluxioFileSystemInputFile
+ implements TrinoInputFile
+{
+ private final Location location;
+ private final FileSystem fileSystem;
+ private final String mountRoot;
+
+ private Optional lastModified;
+ private Long length;
+ private URIStatus status;
+
+ public AlluxioFileSystemInputFile(Location location, Long length, FileSystem fileSystem, String mountRoot, Optional lastModified)
+ {
+ this.location = requireNonNull(location, "location is null");
+ this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
+ this.mountRoot = requireNonNull(mountRoot, "mountRoot is null");
+ this.length = length;
+ this.lastModified = requireNonNull(lastModified, "lastModified is null");
+ }
+
+ @Override
+ public TrinoInput newInput()
+ throws IOException
+ {
+ try {
+ return new AlluxioFileSystemInput(openFile(), this);
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error newInput() file: %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public TrinoInputStream newStream()
+ throws IOException
+ {
+ try {
+ return new AlluxioTrinoInputStream(location, openFile(), getURIStatus());
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error newStream() file: %s".formatted(location), e);
+ }
+ }
+
+ private FileInStream openFile()
+ throws IOException, AlluxioException
+ {
+ if (!exists()) {
+ throw new FileNotFoundException("File does not exist: " + location);
+ }
+ return fileSystem.openFile(getURIStatus(), OpenFilePOptions.getDefaultInstance());
+ }
+
+ private void loadFileStatus()
+ throws IOException
+ {
+ if (status == null) {
+ URIStatus fileStatus = getURIStatus();
+ if (length == null) {
+ length = fileStatus.getLength();
+ }
+ if (lastModified.isEmpty()) {
+ lastModified = Optional.of(Instant.ofEpochMilli(fileStatus.getLastModificationTimeMs()));
+ }
+ }
+ }
+
+ private URIStatus getURIStatus()
+ throws IOException
+ {
+ try {
+ //TODO: create a URIStatus object based on the location field
+ status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot));
+ }
+ catch (FileDoesNotExistException | NotFoundRuntimeException e) {
+ throw new FileNotFoundException("File does not exist: %s".formatted(location));
+ }
+ catch (AlluxioException | IOException e) {
+ throw new IOException("Get status for file %s failed: %s".formatted(location, e.getMessage()), e);
+ }
+ return status;
+ }
+
+ @Override
+ public long length()
+ throws IOException
+ {
+ if (length == null) {
+ loadFileStatus();
+ }
+ return requireNonNull(length, "length is null");
+ }
+
+ @Override
+ public Instant lastModified()
+ throws IOException
+ {
+ if (lastModified.isEmpty()) {
+ loadFileStatus();
+ }
+ return lastModified.orElseThrow();
+ }
+
+ @Override
+ public boolean exists()
+ throws IOException
+ {
+ try {
+ return fileSystem.exists(convertToAlluxioURI(location, mountRoot));
+ }
+ catch (AlluxioException e) {
+ throw new IOException("fail to check file existence", e);
+ }
+ }
+
+ @Override
+ public Location location()
+ {
+ return location;
+ }
+
+ @Override
+ public String toString()
+ {
+ return location().toString();
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java
new file mode 100644
index 000000000000..7a61711762c5
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+
+import static com.google.inject.Scopes.SINGLETON;
+
+public class AlluxioFileSystemModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(AlluxioFileSystemFactory.class).in(SINGLETON);
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java
new file mode 100644
index 000000000000..c632fe081d7f
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.client.file.FileOutStream;
+import alluxio.client.file.FileSystem;
+import alluxio.exception.AlluxioException;
+import alluxio.grpc.CreateFilePOptions;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoOutputFile;
+import io.trino.memory.context.AggregatedMemoryContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+
+import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI;
+import static java.util.Objects.requireNonNull;
+
+public class AlluxioFileSystemOutputFile
+ implements TrinoOutputFile
+{
+ private final Location rootLocation;
+ private final Location location;
+ private final FileSystem fileSystem;
+ private final String mountRoot;
+
+ public AlluxioFileSystemOutputFile(Location rootLocation, Location location, FileSystem fileSystem, String mountRoot)
+ {
+ this.rootLocation = requireNonNull(rootLocation, "root location is null");
+ this.location = requireNonNull(location, "location is null");
+ this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
+ this.mountRoot = requireNonNull(mountRoot, "mountRoot is null");
+ }
+
+ @Override
+ public void createOrOverwrite(byte[] data)
+ throws IOException
+ {
+ ensureOutputFileNotOutsideOfRoot(location);
+ try (FileOutStream outStream = fileSystem.createFile(
+ convertToAlluxioURI(location, mountRoot),
+ CreateFilePOptions.newBuilder().setOverwrite(true).setRecursive(true).build())) {
+ outStream.write(data);
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error createOrOverwrite %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public OutputStream create(AggregatedMemoryContext memoryContext)
+ throws IOException
+ {
+ throwIfAlreadyExists();
+ try {
+ return new AlluxioTrinoOutputStream(
+ location,
+ fileSystem.createFile(
+ convertToAlluxioURI(location, mountRoot),
+ CreateFilePOptions.newBuilder().setRecursive(true).build()));
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error create %s".formatted(location), e);
+ }
+ }
+
+ @Override
+ public Location location()
+ {
+ return location;
+ }
+
+ @Override
+ public String toString()
+ {
+ return location().toString();
+ }
+
+ private void ensureOutputFileNotOutsideOfRoot(Location location)
+ throws IOException
+ {
+ String path = AlluxioUtils.simplifyPath(location.path());
+ if (rootLocation != null && !path.startsWith(rootLocation.path())) {
+ throw new IOException("Output file %s outside of root is not allowed".formatted(location));
+ }
+ }
+
+ private void throwIfAlreadyExists()
+ throws IOException
+ {
+ try {
+ if (fileSystem.exists(convertToAlluxioURI(location, mountRoot))) {
+ throw new FileAlreadyExistsException("File %s already exists".formatted(location));
+ }
+ }
+ catch (AlluxioException e) {
+ throw new IOException("Error create %s".formatted(location), e);
+ }
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java
new file mode 100644
index 000000000000..e23d61c5cf61
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.client.file.FileInStream;
+import alluxio.client.file.URIStatus;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoInputStream;
+
+import java.io.IOException;
+
+import static java.util.Objects.checkFromIndexSize;
+import static java.util.Objects.requireNonNull;
+
+public final class AlluxioTrinoInputStream
+ extends TrinoInputStream
+{
+ private final Location location;
+ private final FileInStream stream;
+ private final URIStatus fileStatus;
+
+ private boolean closed;
+
+ public AlluxioTrinoInputStream(Location location, FileInStream stream, URIStatus fileStatus)
+ {
+ this.location = requireNonNull(location, "location is null");
+ this.stream = requireNonNull(stream, "stream is null");
+ this.fileStatus = requireNonNull(fileStatus, "fileStatus is null");
+ }
+
+ @Override
+ public long getPosition()
+ throws IOException
+ {
+ ensureOpen();
+ try {
+ return stream.getPos();
+ }
+ catch (IOException e) {
+ throw new IOException("Get position for file %s failed: %s".formatted(location, e.getMessage()), e);
+ }
+ }
+
+ @Override
+ public void seek(long position)
+ throws IOException
+ {
+ ensureOpen();
+ if (position < 0) {
+ throw new IOException("Negative seek offset");
+ }
+ if (position > fileStatus.getLength()) {
+ throw new IOException("Cannot seek to %s. File size is %s: %s".formatted(position, fileStatus.getLength(), location));
+ }
+ try {
+ stream.seek(position);
+ }
+ catch (IOException e) {
+ throw new IOException("Cannot seek to %s: %s".formatted(position, location));
+ }
+ }
+
+ @Override
+ public long skip(long n)
+ throws IOException
+ {
+ ensureOpen();
+ try {
+ return stream.skip(n);
+ }
+ catch (IOException e) {
+ throw new IOException("Skipping %s bytes of file %s failed: %s".formatted(n, location, e.getMessage()), e);
+ }
+ }
+
+ @Override
+ public int read()
+ throws IOException
+ {
+ ensureOpen();
+ try {
+ return stream.read();
+ }
+ catch (IOException e) {
+ throw new IOException("Read of file %s failed: %s".formatted(location, e.getMessage()), e);
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len)
+ throws IOException
+ {
+ ensureOpen();
+ checkFromIndexSize(off, len, b.length);
+ try {
+ return stream.read(b, off, len);
+ }
+ catch (IOException e) {
+ throw new IOException("Read of file %s failed: %s".formatted(location, e.getMessage()), e);
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ closed = true;
+ stream.close();
+ }
+
+ private void ensureOpen()
+ throws IOException
+ {
+ if (closed) {
+ throw new IOException("Input stream closed: " + location);
+ }
+ }
+
+ @Override
+ public int available()
+ throws IOException
+ {
+ ensureOpen();
+ return super.available();
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java
new file mode 100644
index 000000000000..0421b4229ea3
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import io.trino.filesystem.Location;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public final class AlluxioTrinoOutputStream
+ extends OutputStream
+{
+ private final Location location;
+ private final OutputStream delegate;
+
+ private volatile boolean closed;
+
+ public AlluxioTrinoOutputStream(Location location, OutputStream delegate)
+ {
+ this.location = location;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void write(int b)
+ throws IOException
+ {
+ ensureOpen();
+ delegate.write(b);
+ }
+
+ @Override
+ public void flush()
+ throws IOException
+ {
+ ensureOpen();
+ delegate.flush();
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ closed = true;
+ delegate.close();
+ }
+
+ private void ensureOpen()
+ throws IOException
+ {
+ if (closed) {
+ throw new IOException("Output stream for %s closed: ".formatted(location));
+ }
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java
new file mode 100644
index 000000000000..493e808fb35e
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.AlluxioURI;
+import alluxio.client.file.URIStatus;
+import io.trino.filesystem.Location;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Optional;
+
+public class AlluxioUtils
+{
+ private AlluxioUtils() {}
+
+ public static Location convertToLocation(URIStatus fileStatus, String mountRoot)
+ {
+ if (fileStatus == null) {
+ return null;
+ }
+ String path = fileStatus.getPath();
+ if (path == null) {
+ return null;
+ }
+ if (path.isEmpty()) {
+ return Location.of("");
+ }
+ if (path.startsWith("alluxio://")) {
+ return Location.of(fileStatus.getPath());
+ }
+
+ String schema = "alluxio://";
+ if (path.startsWith("/")) {
+ while (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ }
+ String mountRootWithSlash = mountRoot;
+ if (!mountRoot.endsWith("/")) {
+ mountRootWithSlash = mountRoot + "/";
+ }
+ return Location.of(schema + mountRootWithSlash + path);
+ }
+
+ public static String simplifyPath(String path)
+ {
+ // Use a deque to store the path components
+ Deque deque = new ArrayDeque<>();
+ String[] segments = path.split("/");
+
+ for (String segment : segments) {
+ if (segment.isEmpty() || segment.equals(".")) {
+ // Ignore empty and current directory segments
+ continue;
+ }
+ if (segment.equals("..")) {
+ // If there's a valid directory to go back to, remove it
+ if (!deque.isEmpty()) {
+ deque.pollLast();
+ }
+ }
+ else {
+ // Add the directory to the deque
+ deque.offerLast(segment);
+ }
+ }
+
+ // Build the final simplified path from the deque
+ StringBuilder simplifiedPath = new StringBuilder();
+ for (String dir : deque) {
+ simplifiedPath.append(dir).append("/");
+ }
+
+ // Retain trailing slash if it was in the original path
+ if (!path.endsWith("/") && simplifiedPath.length() > 0) {
+ simplifiedPath.setLength(simplifiedPath.length() - 1);
+ }
+
+ return simplifiedPath.length() == 0 ? "" : simplifiedPath.toString();
+ }
+
+ public static AlluxioURI convertToAlluxioURI(Location location, String mountRoot)
+ {
+ Optional scheme = location.scheme();
+ if (scheme.isPresent()) {
+ if (!scheme.get().equals("alluxio")) {
+ return new AlluxioURI(location.toString());
+ }
+ }
+ String path = location.path();
+ while (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ if (!mountRoot.endsWith("/")) {
+ mountRoot = mountRoot + "/";
+ }
+ return new AlluxioURI(mountRoot + path);
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java
new file mode 100644
index 000000000000..f455f4a0e5dc
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.AlluxioURI;
+import alluxio.client.file.FileSystem;
+import alluxio.client.file.FileSystemContext;
+import alluxio.client.file.URIStatus;
+import alluxio.conf.Configuration;
+import alluxio.conf.InstancedConfiguration;
+import alluxio.exception.AlluxioException;
+import alluxio.grpc.DeletePOptions;
+import io.trino.filesystem.AbstractTestTrinoFileSystem;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.spi.security.ConnectorIdentity;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestInstance;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractTestAlluxioFileSystem
+ extends AbstractTestTrinoFileSystem
+{
+ private TrinoFileSystem fileSystem;
+ private Location rootLocation;
+ private FileSystem alluxioFs;
+ private AlluxioFileSystemFactory alluxioFileSystemFactory;
+
+ protected void initialize()
+ throws IOException
+ {
+ this.rootLocation = Location.of("alluxio:///");
+ InstancedConfiguration conf = Configuration.copyGlobal();
+ FileSystemContext fsContext = FileSystemContext.create(conf);
+ this.alluxioFs = FileSystem.Factory.create(fsContext);
+ this.alluxioFileSystemFactory = new AlluxioFileSystemFactory(conf);
+ this.fileSystem = alluxioFileSystemFactory.create(ConnectorIdentity.ofUser("alluxio"));
+ }
+
+ @AfterAll
+ void tearDown()
+ {
+ fileSystem = null;
+ alluxioFs = null;
+ rootLocation = null;
+ alluxioFileSystemFactory = null;
+ }
+
+ @AfterEach
+ void afterEach()
+ throws IOException, AlluxioException
+ {
+ AlluxioURI root = new AlluxioURI(getRootLocation().toString());
+
+ for (URIStatus status : alluxioFs.listStatus(root)) {
+ alluxioFs.delete(new AlluxioURI(status.getPath()), DeletePOptions.newBuilder().setRecursive(true).build());
+ }
+ }
+
+ @Override
+ protected boolean isHierarchical()
+ {
+ return true;
+ }
+
+ @Override
+ protected TrinoFileSystem getFileSystem()
+ {
+ return fileSystem;
+ }
+
+ @Override
+ protected Location getRootLocation()
+ {
+ return rootLocation;
+ }
+
+ @Override
+ protected void verifyFileSystemIsEmpty()
+ {
+ AlluxioURI bucket =
+ AlluxioUtils.convertToAlluxioURI(rootLocation, ((AlluxioFileSystem) fileSystem).getMountRoot());
+ try {
+ assertThat(alluxioFs.listStatus(bucket)).isEmpty();
+ }
+ catch (IOException | AlluxioException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected final boolean supportsCreateExclusive()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean supportsIncompleteWriteNoClobber()
+ {
+ return false;
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java
new file mode 100644
index 000000000000..188d7e917cab
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Testcontainers
+public class TestAlluxioFileSystem
+ extends AbstractTestAlluxioFileSystem
+{
+ private static final String IMAGE_NAME = "alluxio/alluxio:2.9.5";
+ public static final DockerImageName ALLUXIO_IMAGE = DockerImageName.parse(IMAGE_NAME);
+
+ @Container
+ private static final GenericContainer> ALLUXIO_MASTER_CONTAINER = createAlluxioMasterContainer();
+
+ @Container
+ private static final GenericContainer> ALLUXIO_WORKER_CONTAINER = createAlluxioWorkerContainer();
+
+ private static GenericContainer> createAlluxioMasterContainer()
+ {
+ GenericContainer> container = new GenericContainer<>(ALLUXIO_IMAGE);
+ container.withCommand("master-only")
+ .withEnv("ALLUXIO_JAVA_OPTS",
+ "-Dalluxio.security.authentication.type=NOSASL "
+ + "-Dalluxio.master.hostname=localhost "
+ + "-Dalluxio.worker.hostname=localhost "
+ + "-Dalluxio.master.mount.table.root.ufs=/opt/alluxio/underFSStorage "
+ + "-Dalluxio.master.journal.type=NOOP "
+ + "-Dalluxio.security.authorization.permission.enabled=false "
+ + "-Dalluxio.security.authorization.plugins.enabled=false ")
+ .withNetworkMode("host")
+ .withAccessToHost(true)
+ .waitingFor(Wait.forLogMessage(".*Primary started*\n", 1));
+ return container;
+ }
+
+ private static GenericContainer> createAlluxioWorkerContainer()
+ {
+ GenericContainer> container = new GenericContainer<>(ALLUXIO_IMAGE);
+ container.withCommand("worker-only")
+ .withNetworkMode("host")
+ .withEnv("ALLUXIO_JAVA_OPTS",
+ "-Dalluxio.security.authentication.type=NOSASL "
+ + "-Dalluxio.worker.ramdisk.size=128MB "
+ + "-Dalluxio.worker.hostname=localhost "
+ + "-Dalluxio.worker.tieredstore.level0.alias=HDD "
+ + "-Dalluxio.worker.tieredstore.level0.dirs.path=/tmp "
+ + "-Dalluxio.master.hostname=localhost "
+ + "-Dalluxio.security.authorization.permission.enabled=false "
+ + "-Dalluxio.security.authorization.plugins.enabled=false ")
+ .withAccessToHost(true)
+ .dependsOn(ALLUXIO_MASTER_CONTAINER)
+ .waitingFor(Wait.forLogMessage(".*Alluxio worker started.*\n", 1));
+ return container;
+ }
+
+ @BeforeAll
+ void setup()
+ throws IOException
+ {
+ initialize();
+ // the SSHD container will be stopped by TestContainers on shutdown
+ // https://github.com/trinodb/trino/discussions/21969
+ System.setProperty("ReportLeakedContainers.disabled", "true");
+ }
+
+ @Test
+ void testContainer()
+ {
+ assertThat(ALLUXIO_MASTER_CONTAINER.isRunning()).isTrue();
+ assertThat(ALLUXIO_WORKER_CONTAINER.isRunning()).isTrue();
+ }
+}
diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java
new file mode 100644
index 000000000000..f38ec2f6db49
--- /dev/null
+++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.alluxio;
+
+import alluxio.AlluxioURI;
+import alluxio.client.file.URIStatus;
+import alluxio.wire.FileInfo;
+import io.trino.filesystem.Location;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestAlluxioUtils
+{
+ @Test
+ public void testSimplifyPath()
+ {
+ String path = "test/level0-file0";
+ assertThat(path).isEqualTo(AlluxioUtils.simplifyPath(path));
+ path = "a/./b/../../c/";
+ assertThat("c/").isEqualTo(AlluxioUtils.simplifyPath(path));
+ }
+
+ @Test
+ public void convertToLocation()
+ {
+ String mountRoot = "/";
+ URIStatus fileStatus = new URIStatus(new FileInfo().setPath("/mnt/test/level0-file0"));
+ assertThat(Location.of("alluxio:///mnt/test/level0-file0")).isEqualTo(AlluxioUtils.convertToLocation(fileStatus, mountRoot));
+ fileStatus = new URIStatus(new FileInfo().setPath("/mnt/test/level0/level1-file0"));
+ assertThat(Location.of("alluxio:///mnt/test/level0/level1-file0")).isEqualTo(AlluxioUtils.convertToLocation(fileStatus, mountRoot));
+ fileStatus = new URIStatus(new FileInfo().setPath("/mnt/test2/level0/level1/level2-file0"));
+ assertThat(Location.of("alluxio:///mnt/test2/level0/level1/level2-file0")).isEqualTo(AlluxioUtils.convertToLocation(fileStatus, mountRoot));
+ }
+
+ @Test
+ public void testConvertToAlluxioURI()
+ {
+ Location location = Location.of("alluxio:///mnt/test/level0-file0");
+ String mountRoot = "/";
+ assertThat(new AlluxioURI("/mnt/test/level0-file0")).isEqualTo(AlluxioUtils.convertToAlluxioURI(location, mountRoot));
+ location = Location.of("alluxio:///mnt/test/level0/level1-file0");
+ assertThat(new AlluxioURI("/mnt/test/level0/level1-file0")).isEqualTo(AlluxioUtils.convertToAlluxioURI(location, mountRoot));
+ location = Location.of("alluxio:///mnt/test2/level0/level1/level2-file0");
+ assertThat(new AlluxioURI("/mnt/test2/level0/level1/level2-file0")).isEqualTo(AlluxioUtils.convertToAlluxioURI(location, mountRoot));
+ }
+}
diff --git a/lib/trino-filesystem-manager/pom.xml b/lib/trino-filesystem-manager/pom.xml
index 5eadbce9d3fb..429a0476bf48 100644
--- a/lib/trino-filesystem-manager/pom.xml
+++ b/lib/trino-filesystem-manager/pom.xml
@@ -42,6 +42,11 @@
trino-filesystem
+
+ io.trino
+ trino-filesystem-alluxio
+
+
io.trino
trino-filesystem-azure
diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java
index 4c5b639dfff2..7394476288d9 100644
--- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java
+++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java
@@ -18,6 +18,7 @@
public class FileSystemConfig
{
private boolean hadoopEnabled;
+ private boolean alluxioEnabled;
private boolean nativeAzureEnabled;
private boolean nativeS3Enabled;
private boolean nativeGcsEnabled;
@@ -35,6 +36,18 @@ public FileSystemConfig setHadoopEnabled(boolean hadoopEnabled)
return this;
}
+ public boolean isAlluxioEnabled()
+ {
+ return alluxioEnabled;
+ }
+
+ @Config("fs.alluxio.enabled")
+ public FileSystemConfig setAlluxioEnabled(boolean nativeAlluxioEnabled)
+ {
+ this.alluxioEnabled = nativeAlluxioEnabled;
+ return this;
+ }
+
public boolean isNativeAzureEnabled()
{
return nativeAzureEnabled;
diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
index 4312c9ab359f..d0e980ec5409 100644
--- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
+++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
@@ -24,6 +24,8 @@
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.alluxio.AlluxioFileSystemCacheModule;
+import io.trino.filesystem.alluxio.AlluxioFileSystemFactory;
+import io.trino.filesystem.alluxio.AlluxioFileSystemModule;
import io.trino.filesystem.azure.AzureFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemModule;
import io.trino.filesystem.cache.CacheFileSystemFactory;
@@ -89,6 +91,11 @@ protected void setup(Binder binder)
var factories = newMapBinder(binder, String.class, TrinoFileSystemFactory.class);
+ if (config.isAlluxioEnabled()) {
+ install(new AlluxioFileSystemModule());
+ factories.addBinding("alluxio").to(AlluxioFileSystemFactory.class);
+ }
+
if (config.isNativeAzureEnabled()) {
install(new AzureFileSystemModule());
factories.addBinding("abfs").to(AzureFileSystemFactory.class);
diff --git a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java
index 9609484c0a5b..088d7653d53c 100644
--- a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java
+++ b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java
@@ -29,6 +29,7 @@ public void testDefaults()
{
assertRecordedDefaults(recordDefaults(FileSystemConfig.class)
.setHadoopEnabled(false)
+ .setAlluxioEnabled(false)
.setNativeAzureEnabled(false)
.setNativeS3Enabled(false)
.setNativeGcsEnabled(false)
@@ -40,6 +41,7 @@ public void testExplicitPropertyMappings()
{
Map properties = ImmutableMap.builder()
.put("fs.hadoop.enabled", "true")
+ .put("fs.alluxio.enabled", "true")
.put("fs.native-azure.enabled", "true")
.put("fs.native-s3.enabled", "true")
.put("fs.native-gcs.enabled", "true")
@@ -48,6 +50,7 @@ public void testExplicitPropertyMappings()
FileSystemConfig expected = new FileSystemConfig()
.setHadoopEnabled(true)
+ .setAlluxioEnabled(true)
.setNativeAzureEnabled(true)
.setNativeS3Enabled(true)
.setNativeGcsEnabled(true)
diff --git a/pom.xml b/pom.xml
index 6ab39ea9ebd4..1f3202abf0ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
lib/trino-array
lib/trino-cache
lib/trino-filesystem
+ lib/trino-filesystem-alluxio
lib/trino-filesystem-azure
lib/trino-filesystem-cache-alluxio
lib/trino-filesystem-gcs
@@ -1101,6 +1102,12 @@
test-jar
+
+ io.trino
+ trino-filesystem-alluxio
+ ${project.version}
+
+
io.trino
trino-filesystem-azure
@@ -1729,14 +1736,6 @@
alluxio-core-client-fs
${dep.alluxio.version}
-
- io.grpc
- grpc-core
-
-
- io.grpc
- grpc-stub
-
org.alluxio
alluxio-core-common
@@ -1745,10 +1744,34 @@
org.alluxio
alluxio-core-transport
+
+ org.apache.logging.log4j
+ log4j-api
+
org.apache.logging.log4j
log4j-core
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+ org.rocksdb
+ rocksdbjni
+
+
+ org.slf4j
+ log4j-over-slf4j
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ slf4j-reload4j
+
@@ -1757,10 +1780,18 @@
alluxio-core-common
${dep.alluxio.version}
+
+ com.amazonaws
+ aws-java-sdk-core
+
com.rabbitmq
amqp-client
+
+ commons-cli
+ commons-cli
+
commons-logging
commons-logging
@@ -1770,28 +1801,12 @@
jetcd-core
- io.grpc
- grpc-api
-
-
- io.grpc
- grpc-core
+ io.swagger
+ swagger-annotations
- io.grpc
- grpc-netty
-
-
- io.grpc
- grpc-services
-
-
- io.grpc
- grpc-stub
-
-
- io.netty
- netty-tcnative-boringssl-static
+ jakarta.servlet
+ jakarta.servlet-api
org.alluxio
@@ -1817,6 +1832,58 @@
org.eclipse.jetty
jetty-servlet
+
+ org.reflections
+ reflections
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ slf4j-reload4j
+
+
+
+
+
+ org.alluxio
+ alluxio-core-transport
+ ${dep.alluxio.version}
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+ org.slf4j
+ log4j-over-slf4j
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ slf4j-reload4j
+
+
+
+
+
+ org.alluxio
+ alluxio-underfs-local
+ ${dep.alluxio.version}
+
+
+ org.apache.logging.log4j
+ log4j-core
+
@@ -2567,6 +2634,10 @@
org.alluxio
alluxio-core-transport
+
+ org.alluxio
+ alluxio-underfs-local
+
git.properties