Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17546. Implementing HostsFileReader timeout #6873

Merged
merged 10 commits into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -120,4 +125,37 @@ private CombinedHostsFileReader() {
}
return allDNs;
}

/**
* Wrapper to call readFile with timeout via Future Tasks.
* If timeout is reached, it will throw IOException
* @param hostsFile the input json file to read from
* @param readTimeout timeout for FutureTask execution in milliseconds
* @return the set of DatanodeAdminProperties
* @throws IOException
*/
public static DatanodeAdminProperties[]
readFileWithTimeout(final String hostsFile, final int readTimeout) throws IOException {
FutureTask<DatanodeAdminProperties[]> futureTask = new FutureTask<>(
new Callable<DatanodeAdminProperties[]>() {
@Override
public DatanodeAdminProperties[] call() throws Exception {
return readFile(hostsFile);
}
});

Thread thread = new Thread(futureTask);
thread.start();

try {
return futureTask.get(readTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
futureTask.cancel(true);
LOG.error("refresh File read operation timed out");
throw new IOException("host file read operation timed out");
} catch (InterruptedException | ExecutionException e) {
LOG.error("File read operation interrupted : " + e.getMessage());
throw new IOException("host file read operation timed out");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.hosts.provider.classname";
public static final String DFS_HOSTS = "dfs.hosts";
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
public static final String DFS_HOSTS_TIMEOUT = "dfs.hosts.timeout";
public static final int DFS_HOSTS_TIMEOUT_DEFAULT = 0;
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,15 @@ public Configuration getConf() {

@Override
public void refresh() throws IOException {
refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""));
refresh(conf.get(DFSConfigKeys.DFS_HOSTS, ""),
conf.getInt(DFSConfigKeys.DFS_HOSTS_TIMEOUT, DFSConfigKeys.DFS_HOSTS_TIMEOUT_DEFAULT)
);
}
private void refresh(final String hostsFile) throws IOException {
private void refresh(final String hostsFile, final int readTimeout) throws IOException {
HostProperties hostProps = new HostProperties();
DatanodeAdminProperties[] all =
CombinedHostsFileReader.readFile(hostsFile);
DatanodeAdminProperties[] all = readTimeout != DFSConfigKeys.DFS_HOSTS_TIMEOUT_DEFAULT
? CombinedHostsFileReader.readFileWithTimeout(hostsFile, readTimeout)
: CombinedHostsFileReader.readFile(hostsFile);
for(DatanodeAdminProperties properties : all) {
InetSocketAddress addr = parseEntry(hostsFile,
properties.getHostName(), properties.getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,14 @@
not permitted to connect to the namenode. The full pathname of the
file must be specified. If the value is empty, no hosts are
excluded.</description>
</property>
</property>

<property>
<name>dfs.hosts.timeout</name>
<value>0</value>
<description>Specifies a timeout (in milliseconds) for reading the dfs.hosts file.
A value of zero indicates no timeout to be set.</description>
</property>

<property>
<name>dfs.namenode.max.objects</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.Callable;

import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Rule;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary import.

import org.junit.After;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.junit.rules.ExpectedException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary import.

import org.mockito.Mock;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

/**
* Test for JSON based HostsFileReader.
Expand All @@ -44,8 +53,15 @@ public class TestCombinedHostsFileReader {
private final File legacyFile =
new File(TESTCACHEDATADIR, "legacy.dfs.hosts.json");

@Rule
public final ExpectedException exception = ExpectedException.none();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is not used anywhere.


@Mock
private Callable<DatanodeAdminProperties[]> callable;

@Before
public void setUp() throws Exception {
callable = Mockito.mock(Callable.class);
}

@After
Expand Down Expand Up @@ -87,4 +103,50 @@ public void testEmptyCombinedHostsFileReader() throws Exception {
CombinedHostsFileReader.readFile(newFile.getAbsolutePath());
assertEquals(0, all.length);
}

/*
* When timeout is enabled, test for success when reading file within timeout
* limits
*/
@Test
public void testReadFileWithTimeoutSuccess() throws Exception {

DatanodeAdminProperties[] all = CombinedHostsFileReader.readFileWithTimeout(
jsonFile.getAbsolutePath(), 1000);
assertEquals(7, all.length);
}

/*
* When timeout is enabled, test for IOException when reading file exceeds
* timeout limits
*/
@Test(expected = IOException.class)
public void testReadFileWithTimeoutTimeoutException() throws Exception {
when(callable.call()).thenAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(2000);
return null;
}
});

CombinedHostsFileReader.readFileWithTimeout(
jsonFile.getAbsolutePath(), 1);
}

/*
* When timeout is enabled, test for IOException when execution is interrupted
*/
@Test(expected = IOException.class)
public void testReadFileWithTimeoutInterruptedException() throws Exception {
when(callable.call()).thenAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
throw new InterruptedException();
}
});

CombinedHostsFileReader.readFileWithTimeout(
jsonFile.getAbsolutePath(), 1);
}
}
Loading