Skip to content

Commit

Permalink
add default wait time for aoss
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Nov 27, 2024
1 parent 368b17d commit df706c2
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 6 deletions.
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '<rootDir>/<indexName>/<UUID>' to isolate checkpoint data.
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 0.
- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 2000 if using aoss service, otherwise 0.
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
- `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class FlintOptions implements Serializable {

public static final String REQUEST_COMPLETION_DELAY_MILLIS = "request.completionDelayMillis";
public static final int DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS = 0;
public static final int DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS = 2000;

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

Expand Down Expand Up @@ -182,7 +183,10 @@ public int getSocketTimeoutMillis() {
}

public int getRequestCompletionDelayMillis() {
return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS)));
int defaultValue = SERVICE_NAME_AOSS.equals(getServiceName())
? DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS
: DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS;
return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(defaultValue)));
}

public String getDataSourceName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ object FlintSparkConf {
FlintConfig(s"spark.datasource.flint.${FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS}")
.datasourceOption()
.doc("delay in milliseconds after index creation is completed")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS))
.createOptional()
val DATA_SOURCE_NAME =
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
Expand Down Expand Up @@ -347,8 +347,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
SOCKET_TIMEOUT_MILLIS,
JOB_TYPE,
REPL_INACTIVITY_TIMEOUT_MILLIS,
BATCH_BYTES,
REQUEST_COMPLETION_DELAY_MILLIS)
BATCH_BYTES)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

Expand All @@ -362,7 +361,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
REQUEST_INDEX,
METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER,
EXCLUDE_JOB_IDS,
SCROLL_SIZE)
SCROLL_SIZE,
REQUEST_COMPLETION_DELAY_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.flatMap {
case (_, None) => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class FlintSparkConfSuite extends FlintSuite {
FlintSparkConf().flintOptions().getRequestCompletionDelayMillis shouldBe 0
}

test("test request completionDelayMillis default value for aoss") {
val options = FlintSparkConf(Map("auth.servicename" -> "aoss").asJava).flintOptions()
options.getRequestCompletionDelayMillis shouldBe 2000
}

test("test specified request completionDelayMillis") {
val options =
FlintSparkConf(Map("request.completionDelayMillis" -> "1000").asJava).flintOptions()
Expand Down

0 comments on commit df706c2

Please sign in to comment.