Skip to content

Commit

Permalink
[flink] Make FileStoreLookupFunction.refreshBlacklist nullable to avo…
Browse files Browse the repository at this point in the history
…id performance regression
  • Loading branch information
JingsongLi committed Nov 5, 2024
1 parent c17cdd5 commit c7170e6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private final List<String> projectFields;
private final List<String> joinKeys;
@Nullable private final Predicate predicate;
private final RefreshBlacklist refreshBlacklist;
@Nullable private final RefreshBlacklist refreshBlacklist;

private transient File path;
private transient LookupTable lookupTable;
Expand Down Expand Up @@ -136,7 +136,7 @@ public FileStoreLookupFunction(
this.predicate = predicate;

this.refreshBlacklist =
new RefreshBlacklist(
RefreshBlacklist.create(
table.options().get(LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
}

Expand Down Expand Up @@ -284,7 +284,7 @@ private void reopen() {
@VisibleForTesting
void tryRefresh() throws Exception {
// 1. check if this time is in black list
if (!refreshBlacklist.canRefresh()) {
if (refreshBlacklist != null && !refreshBlacklist.canRefresh()) {
return;
}

Expand Down Expand Up @@ -335,7 +335,7 @@ LookupTable lookupTable() {

@VisibleForTesting
long nextBlacklistCheckTime() {
return refreshBlacklist.nextBlacklistCheckTime();
return refreshBlacklist == null ? -1 : refreshBlacklist.nextBlacklistCheckTime();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -40,12 +42,22 @@ public class RefreshBlacklist {

private long nextBlacklistCheckTime;

public RefreshBlacklist(String blacklist) {
this.timePeriodsBlacklist = parseTimePeriodsBlacklist(blacklist);
public RefreshBlacklist(List<Pair<Long, Long>> timePeriodsBlacklist) {
this.timePeriodsBlacklist = timePeriodsBlacklist;
this.nextBlacklistCheckTime = -1;
}

private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
@Nullable
public static RefreshBlacklist create(String blacklist) {
List<Pair<Long, Long>> timePeriodsBlacklist = parseTimePeriodsBlacklist(blacklist);
if (timePeriodsBlacklist.isEmpty()) {
return null;
}

return new RefreshBlacklist(timePeriodsBlacklist);
}

private static List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
return Collections.emptyList();
}
Expand All @@ -69,7 +81,7 @@ private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
return result;
}

private long parseToMillis(String dateTime) {
private static long parseToMillis(String dateTime) {
try {
return DateTimeUtils.parseTimestampData(dateTime + ":00", 3, TimeZone.getDefault())
.getMillisecond();
Expand Down

0 comments on commit c7170e6

Please sign in to comment.