Skip to content

Commit

Permalink
Move reconnect handling to faf server accessor (#3061)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheikah45 authored Nov 27, 2023
1 parent 8144555 commit d2aa3a5
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 34 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ dependencies {
implementation("io.projectreactor.addons:reactor-extra")
implementation("io.projectreactor:reactor-tools")

def commonsVersion = "deb598df95"
def commonsVersion = "0ff63ee928"

implementation("com.github.FAForever.faf-java-commons:faf-commons-data:${commonsVersion}") {
exclude module: 'guava'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public static class Irc {
@Data
public static class Server {
private String url;
private int retryDelaySeconds = 30;
private int retryAttempts = 10;
private int retryDelaySeconds = 5;
private int retryAttempts = 60;
}

@Data
Expand Down
102 changes: 71 additions & 31 deletions src/main/java/com/faforever/client/remote/FafServerAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.faforever.client.api.TokenRetriever;
import com.faforever.client.config.ClientProperties;
import com.faforever.client.config.ClientProperties.Server;
import com.faforever.client.domain.MatchmakerQueueBean;
import com.faforever.client.domain.PlayerBean;
import com.faforever.client.exception.UIDException;
Expand All @@ -22,6 +23,7 @@
import com.faforever.commons.lobby.GameLaunchResponse;
import com.faforever.commons.lobby.GameVisibility;
import com.faforever.commons.lobby.GpgGameOutboundMessage;
import com.faforever.commons.lobby.LoginException;
import com.faforever.commons.lobby.MatchmakerState;
import com.faforever.commons.lobby.MessageTarget;
import com.faforever.commons.lobby.NoticeInfo;
Expand All @@ -44,6 +46,8 @@
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.TupleUtils;
import reactor.util.retry.Retry;

import java.io.IOException;
import java.net.URL;
Expand All @@ -62,7 +66,8 @@
@RequiredArgsConstructor
public class FafServerAccessor implements InitializingBean, DisposableBean {

private final ReadOnlyObjectWrapper<ConnectionState> connectionState = new ReadOnlyObjectWrapper<>(ConnectionState.DISCONNECTED);
private final ReadOnlyObjectWrapper<ConnectionState> connectionState = new ReadOnlyObjectWrapper<>(
ConnectionState.DISCONNECTED);

private final NotificationService notificationService;
private final I18n i18n;
Expand All @@ -75,6 +80,8 @@ public class FafServerAccessor implements InitializingBean, DisposableBean {
@Qualifier("userWebClient")
private final ObjectFactory<WebClient> userWebClientFactory;

private boolean autoReconnect = false;

@Override
public void afterPropertiesSet() throws Exception {
eventBus.register(this);
Expand All @@ -89,6 +96,12 @@ public void afterPropertiesSet() throws Exception {
case CONNECTING -> ConnectionState.CONNECTING;
case CONNECTED -> ConnectionState.CONNECTED;
}).subscribe(connectionState::set, throwable -> log.error("Error processing connection status", throwable));

connectionState.subscribe((oldValue, newValue) -> {
if (autoReconnect && oldValue == ConnectionState.CONNECTED && newValue == ConnectionState.DISCONNECTED) {
connectAndLogIn().subscribe();
}
});
}

public <T extends ServerMessage> Flux<T> getEvents(Class<T> type) {
Expand All @@ -101,12 +114,12 @@ public <T extends ServerMessage> Flux<T> getEvents(Class<T> type) {
@Deprecated
public <T extends ServerMessage> void addEventListener(Class<T> type, Consumer<T> listener) {
lobbyClient.getEvents()
.ofType(type)
.flatMap(message -> Mono.fromRunnable(() -> listener.accept(message)).onErrorResume(throwable -> {
log.error("Could not process listener for `{}`", message, throwable);
return Mono.empty();
}))
.subscribe();
.ofType(type)
.flatMap(message -> Mono.fromRunnable(() -> listener.accept(message)).onErrorResume(throwable -> {
log.error("Could not process listener for `{}`", message, throwable);
return Mono.empty();
}))
.subscribe();
}

public ConnectionState getConnectionState() {
Expand All @@ -118,16 +131,34 @@ public ReadOnlyObjectProperty<ConnectionState> connectionStateProperty() {
}

public Mono<Player> connectAndLogIn() {
autoReconnect = true;
return userWebClientFactory.getObject()
.get()
.uri("/lobby/access")
.retrieve()
.bodyToMono(LobbyAccess.class)
.map(lobbyAccess -> new Config(tokenRetriever.getRefreshedTokenValue(), Version.getCurrentVersion(), clientProperties.getUserAgent(), lobbyAccess.accessUrl(), this::tryGenerateUid, 1024 * 1024, false, clientProperties.getServer()
.getRetryAttempts(), clientProperties.getServer().getRetryDelaySeconds()))
.flatMap(lobbyClient::connectAndLogin);
.get()
.uri("/lobby/access")
.retrieve()
.bodyToMono(LobbyAccess.class)
.map(LobbyAccess::accessUrl)
.zipWith(tokenRetriever.getRefreshedTokenValue())
.map(TupleUtils.function(
(lobbyUrl, token) -> new Config(token, Version.getCurrentVersion(),
clientProperties.getUserAgent(), lobbyUrl,
this::tryGenerateUid, 1024 * 1024, false)))
.flatMap(lobbyClient::connectAndLogin)
.timeout(Duration.ofSeconds(30))
.retryWhen(createRetrySpec(clientProperties.getServer()));
}

private Retry createRetrySpec(Server server) {
return Retry.fixedDelay(server.getRetryAttempts(), Duration.ofSeconds(server.getRetryDelaySeconds()))
.filter(exception -> !(exception instanceof LoginException))
.doBeforeRetry(
retry -> log.warn("Could not reach server retrying: Attempt #{} of {}", retry.totalRetries(),
server.getRetryAttempts(), retry.failure()))
.onRetryExhaustedThrow((spec, retrySignal) -> new LoginException(
"Could not reach server after %d attempts".formatted(spec.maxAttempts), retrySignal.failure()));
}


private String tryGenerateUid(Long sessionId) {
try {
return uidService.generate(String.valueOf(sessionId));
Expand All @@ -137,28 +168,30 @@ private String tryGenerateUid(Long sessionId) {
}

public CompletableFuture<GameLaunchResponse> requestHostGame(NewGameInfo newGameInfo) {
return lobbyClient.requestHostGame(newGameInfo.getTitle(), newGameInfo.getMap(), newGameInfo.getFeaturedMod()
.getTechnicalName(), GameVisibility.valueOf(newGameInfo.getGameVisibility()
.name()), newGameInfo.getPassword(), newGameInfo.getRatingMin(), newGameInfo.getRatingMax(), newGameInfo.getEnforceRatingRange())
.toFuture();
return lobbyClient.requestHostGame(newGameInfo.getTitle(), newGameInfo.getMap(),
newGameInfo.getFeaturedMod().getTechnicalName(),
GameVisibility.valueOf(newGameInfo.getGameVisibility().name()),
newGameInfo.getPassword(), newGameInfo.getRatingMin(),
newGameInfo.getRatingMax(), newGameInfo.getEnforceRatingRange()).toFuture();
}

public CompletableFuture<GameLaunchResponse> requestJoinGame(int gameId, String password) {
return lobbyClient.requestJoinGame(gameId, password).toFuture();
}

public void disconnect() {
autoReconnect = false;
log.info("Closing lobby server connection");
lobbyClient.disconnect();
}

public Mono<Player> reconnect() {
return lobbyClient.getConnectionStatus()
.filter(ConnectionStatus.DISCONNECTED::equals)
.next()
.take(Duration.ofSeconds(5))
.then(connectAndLogIn())
.doOnSubscribe(ignored -> disconnect());
.filter(ConnectionStatus.DISCONNECTED::equals)
.next()
.take(Duration.ofSeconds(5))
.then(connectAndLogIn())
.doOnSubscribe(ignored -> disconnect());
}

public void addFriend(int playerId) {
Expand All @@ -175,14 +208,15 @@ public void requestMatchmakerInfo() {

public CompletableFuture<GameLaunchResponse> startSearchMatchmaker() {
return lobbyClient.getEvents()
.filter(event -> event instanceof GameLaunchResponse)
.next()
.cast(GameLaunchResponse.class)
.toFuture();
.filter(event -> event instanceof GameLaunchResponse)
.next()
.cast(GameLaunchResponse.class)
.toFuture();
}

public void sendGpgMessage(GpgGameOutboundMessage message) {
lobbyClient.sendGpgGameMessage(new GpgGameOutboundMessage(message.getCommand(), message.getArgs(), MessageTarget.GAME));
lobbyClient.sendGpgGameMessage(
new GpgGameOutboundMessage(message.getCommand(), message.getArgs(), MessageTarget.GAME));
}

public void removeFriend(int playerId) {
Expand Down Expand Up @@ -223,8 +257,12 @@ private void onNotice(NoticeInfo noticeMessage) {

if (Objects.equals(noticeMessage.getStyle(), "kick")) {
log.info("Kicked from lobby, client closing after delay");
notificationService.addNotification(new ImmediateNotification(i18n.get("server.kicked.title"), i18n.get("server.kicked.message", clientProperties.getLinks()
.get("linksRules")), Severity.WARN, Collections.singletonList(new DismissAction(i18n))));
notificationService.addNotification(new ImmediateNotification(i18n.get("server.kicked.title"),
i18n.get("server.kicked.message",
clientProperties.getLinks()
.get("linksRules")),
Severity.WARN, Collections.singletonList(
new DismissAction(i18n))));
taskScheduler.scheduleWithFixedDelay(Platform::exit, Duration.ofSeconds(10));
}

Expand All @@ -243,7 +281,9 @@ private void onNotice(NoticeInfo noticeMessage) {
default -> Severity.INFO;
};
}
notificationService.addServerNotification(new ImmediateNotification(i18n.get("messageFromServer"), noticeMessage.getText(), severity, Collections.singletonList(new DismissAction(i18n))));
notificationService.addServerNotification(
new ImmediateNotification(i18n.get("messageFromServer"), noticeMessage.getText(), severity,
Collections.singletonList(new DismissAction(i18n))));
}

public void restoreGameSession(int id) {
Expand Down

0 comments on commit d2aa3a5

Please sign in to comment.