Skip to content

Commit

Permalink
Add some diag on abfs leasing and test dual write outcome
Browse files Browse the repository at this point in the history
Uses reflection to call AbfsOutputStream.hasLease()
as it is a 3.3.1+ attribute.
  • Loading branch information
steveloughran committed Mar 6, 2023
1 parent 835be4b commit 6a170c3
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.hadoop.fs.store.diag;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand All @@ -29,9 +32,12 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.store.StoreDurationInfo;

import static org.apache.hadoop.fs.store.diag.CapabilityKeys.*;
Expand Down Expand Up @@ -86,6 +92,11 @@ public class AbfsDiagnosticsInfo extends StoreDiagnosticsInfo {
*/
public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;

public static final String FS_AZURE_INFINITE_LEASE_DIRECTORIES =
"fs.azure.infinite-lease.directories";

public static final String FS_AZURE_LEASE_THREADS = "fs.azure.lease.threads";

private static final Object[][] options = {

{"abfs.external.authorization.class", false, false},
Expand Down Expand Up @@ -140,14 +151,14 @@ public class AbfsDiagnosticsInfo extends StoreDiagnosticsInfo {
{"fs.azure.identity.transformer.service.principal.id", false, false},
{"fs.azure.identity.transformer.service.principal.substitution.list", false, false},
{"fs.azure.identity.transformer.skip.superuser.replacement", false, false},
{"fs.azure.infinite-lease.directories", false, false},
{FS_AZURE_INFINITE_LEASE_DIRECTORIES, false, false},
{"fs.azure.io.rate.limit", false, false},
{"fs.azure.io.read.tolerate.concurrent.append", false, false},
{"fs.azure.io.retry.backoff.interval", false, false},
{"fs.azure.io.retry.max.backoff.interval", false, false},
{"fs.azure.io.retry.max.retries", false, false},
{"fs.azure.io.retry.min.backoff.interval", false, false},
{"fs.azure.lease.threads", false, false},
{FS_AZURE_LEASE_THREADS, false, false},
{"fs.azure.list.max.results", false, false},
{"fs.azure.oauth.token.fetch.retry.max.backoff.interval", false, false},
{"fs.azure.oauth.token.fetch.retry.max.retries", false, false},
Expand Down Expand Up @@ -364,6 +375,20 @@ protected void validateConfig(final Printout printout,
validateBufferDir(printout, conf, FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR, HADOOP_TMP_DIR,
writeOperations);
}
int leaseThreads;
if (conf.getBoolean(FS_AZURE_INFINITE_LEASE_DIRECTORIES, false)) {
leaseThreads = conf.getInt(FS_AZURE_LEASE_THREADS, 0);
printout.println("Filesystem has directory leasing enabled with lease thread count of %,d", leaseThreads);

if (leaseThreads == 0) {
printout.warn("Lease thread count is 0 (set in %s)", FS_AZURE_LEASE_THREADS);
printout.warn("Leases will not be released");
} else if (leaseThreads == 1) {
printout.warn("Lease thread count is 1 (set in %s)", FS_AZURE_LEASE_THREADS);
printout.warn("Leases release may be slower than desired");
}
}

}

@Override
Expand Down Expand Up @@ -397,4 +422,37 @@ public void validateFilesystem(final Printout printout,
printout.debug("Error calling getIsNamespaceEnabled()", e);
}
}

@Override
public void validateOutputStream(
final Printout printout,
final FileSystem fs,
final Path file,
final FSDataOutputStream out) throws IOException {

final OutputStream wrappedStream = out.getWrappedStream();
printout.heading("Validating ABFS output stream");
if (!(wrappedStream instanceof AbfsOutputStream)) {
printout.warn("output stream %s is not an ABFS stream", wrappedStream);
return;
}
AbfsOutputStream abfsOut = (AbfsOutputStream) wrappedStream;

printout.println("Writing under %s with output stream %s", file, abfsOut);
try {
final Method hasLeaseM = AbfsOutputStream.class.getMethod("hasLease");
final boolean hasLease = (Boolean)hasLeaseM.invoke(abfsOut);
printout.println("Stream %s lease on the path",
hasLease ? "has a" : "has no");
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
LOG.debug("no method AbfsOutputStream.hasLease() on this release");
}

// now attempt to open the same path again
try (FSDataOutputStream out2 = fs.createFile(file).overwrite(true).build()) {
printout.println("Store permits multiple clients to open the same path for writing");
} catch (IOException ioe) {
LOG.info("Store doesn't permit concurrent writes", ioe);
}
}
}
5 changes: 4 additions & 1 deletion src/main/java/org/apache/hadoop/fs/store/diag/StoreDiag.java
Original file line number Diff line number Diff line change
Expand Up @@ -944,8 +944,11 @@ public void executeFileSystemOperations(final Path baseDir,
try (StoreDurationInfo ignored = new StoreDurationInfo(getOut(),
"Creating file %s", file)) {
FSDataOutputStream data = fs.create(file, true);
data.writeUTF(HELLO);
printStreamCapabilities(data, CapabilityKeys.OUTPUTSTREAM_CAPABILITIES);
storeInfo.validateOutputStream(this, fs,file, data);

heading("Writing data to %s", file);
data.writeUTF(HELLO);

try {
data.hsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
Expand Down Expand Up @@ -595,4 +596,20 @@ protected boolean sizeHint(
"Option %s has value %d. Recommend a value of at least %d",
option, val, recommend);
}

/**
* Validate an output stream of a file being written to.
* @param printout
* @param fs fs
* @param file path to file
* @param data output stream
* @throws IOException IO failure
*/
public void validateOutputStream(
final Printout printout,
final FileSystem fs,
final Path file,
final FSDataOutputStream data) throws IOException {

}
}

0 comments on commit 6a170c3

Please sign in to comment.