Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
Fix client cache. (#292)
Browse files Browse the repository at this point in the history
fix: client cache key's hashcode is not unique, use serialized json string instead.
  • Loading branch information
syhily authored Apr 13, 2021
1 parent 4cfa5c0 commit fee9979
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ jobs:
sudo cp -R protoc/include/* $BASE/include
- name: Run check and test
run: |
./tools/retry.sh mvn -B -ntp clean install
mvn -B -ntp clean install
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<testRetryCount>2</testRetryCount>
<testRetryCount>3</testRetryCount>

<!-- use Pulsar stable version -->
<pulsar.version>2.7.0</pulsar.version>
Expand Down Expand Up @@ -238,6 +238,7 @@
<log4j.configuration>file:${project.build.testOutputDirectory}/log4j-tests.properties</log4j.configuration>
<pulsar.systemtest.image>${pulsar.systemtest.image}</pulsar.systemtest.image>
</systemPropertyVariables>
<rerunFailingTestsCount>${testRetryCount}</rerunFailingTestsCount>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import javax.annotation.Nonnull;

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
Expand Down Expand Up @@ -173,7 +172,7 @@ public PulsarClient getClient() {
if (pulsarClient == null) {
try {
pulsarClient = CachedPulsarClient.getOrCreate(pulsarConfiguration);
} catch (ExecutionException e) {
} catch (PulsarClientException e) {
throw new IllegalStateException("Cannot initialize pulsar client", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,6 @@ protected Producer<T> createProducer(
} catch (PulsarClientException e) {
log.error("Failed to create producer for topic {}", topic);
throw new RuntimeException(e);
} catch (ExecutionException e) {
log.error("Failed to getOrCreate a PulsarClient");
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -503,7 +500,7 @@ protected void recoverAndCommit(PulsarTransactionState<T> transaction) {
TransactionCoordinatorClientImpl tcClient = CachedPulsarClient.getOrCreate(clientConfigurationData).getTcClient();
TxnID transactionalId = transaction.transactionalId;
tcClient.commit(transactionalId, transaction.pendingMessages);
} catch (ExecutionException executionException) {
} catch (PulsarClientException executionException) {
log.error("Failed to getOrCreate a PulsarClient");
throw new RuntimeException(executionException);
} catch (TransactionCoordinatorClientException.InvalidTxnStatusException statusException) {
Expand All @@ -526,7 +523,7 @@ protected void recoverAndAbort(PulsarTransactionState<T> transaction) {
TransactionCoordinatorClientImpl tcClient = CachedPulsarClient.getOrCreate(clientConfigurationData).getTcClient();
TxnID transactionalId = transaction.transactionalId;
tcClient.abort(transactionalId, transaction.pendingMessages);
} catch (ExecutionException executionException) {
} catch (PulsarClientException executionException) {
log.error("Failed to getOrCreate a PulsarClient");
throw new RuntimeException(executionException);
} catch (TransactionCoordinatorClientException.InvalidTxnStatusException statusException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,31 @@

package org.apache.flink.streaming.connectors.pulsar.internal;

import org.apache.flink.annotation.VisibleForTesting;

import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.cache.CacheBuilder;
import org.apache.pulsar.shade.com.google.common.cache.CacheLoader;
import org.apache.pulsar.shade.com.google.common.cache.LoadingCache;
import org.apache.pulsar.shade.com.google.common.cache.RemovalListener;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

/**
* Enable the sharing of same PulsarClient among tasks in a same process.
*/
@Slf4j
@UtilityClass
public class CachedPulsarClient {

private static final ObjectMapper mapper = new ObjectMapper();

private static int cacheSize = 100;

public static void setCacheSize(int newSize) {
Expand All @@ -42,78 +49,73 @@ public static int getCacheSize() {
return cacheSize;
}

private static CacheLoader<ClientConfigurationData, PulsarClientImpl> cacheLoader =
new CacheLoader<ClientConfigurationData, PulsarClientImpl>() {
@Override
public PulsarClientImpl load(ClientConfigurationData key) throws Exception {
return createPulsarClient(key);
}
};

private static RemovalListener<ClientConfigurationData, PulsarClientImpl> removalListener = notification -> {
ClientConfigurationData config = notification.getKey();
private static final RemovalListener<String, PulsarClientImpl> removalListener = notification -> {
String config = notification.getKey();
PulsarClientImpl client = notification.getValue();
log.debug("Evicting pulsar client {} with config {}, due to {}",
client.toString(), config.toString(), notification.getCause().toString());
log.debug("Evicting pulsar client {} with config {}, due to {}", client, config, notification.getCause());

close(config, client);
};

private static volatile LoadingCache<ClientConfigurationData, PulsarClientImpl> guavaCache;

private static LoadingCache<ClientConfigurationData, PulsarClientImpl> getGuavaCache() {
if (guavaCache != null) {
return guavaCache;
}
synchronized (CachedPulsarClient.class) {
if (guavaCache != null) {
return guavaCache;
}
guavaCache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
.removalListener(removalListener)
.build(cacheLoader);
return guavaCache;
}
}
private static final Cache<String, PulsarClientImpl> clientCache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
.removalListener(removalListener)
.build();

private static PulsarClientImpl createPulsarClient(
ClientConfigurationData clientConfig) throws PulsarClientException {
private static PulsarClientImpl createPulsarClient(ClientConfigurationData clientConfig) throws PulsarClientException {
PulsarClientImpl client;
try {
client = new PulsarClientImpl(clientConfig);
log.debug("Created a new instance of PulsarClientImpl for clientConf = {}",
clientConfig.toString());
log.debug("Created a new instance of PulsarClientImpl for clientConf = {}", clientConfig);
} catch (PulsarClientException e) {
log.error("Failed to create PulsarClientImpl for clientConf = {}",
clientConfig.toString());
log.error("Failed to create PulsarClientImpl for clientConf = {}", clientConfig);
throw e;
}
return client;
}

public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
return getGuavaCache().get(config);
public static synchronized PulsarClientImpl getOrCreate(ClientConfigurationData config) throws PulsarClientException {
String key = serializeKey(config);
PulsarClientImpl client = clientCache.getIfPresent(key);

if (client == null) {
client = createPulsarClient(config);
clientCache.put(key, client);
}

return client;
}

private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
try {
log.info("Closing the Pulsar client with conifg {}", clientConfig.toString());
client.close();
} catch (PulsarClientException e) {
log.warn(String.format("Error while closing the Pulsar client %s", clientConfig.toString()), e);
private static void close(String clientConfig, PulsarClientImpl client) {
if (client != null) {
try {
log.info("Closing the Pulsar client with config {}", clientConfig);
client.close();
} catch (PulsarClientException e) {
log.warn(String.format("Error while closing the Pulsar client %s", clientConfig), e);
}
}
}

@SneakyThrows
private String serializeKey(ClientConfigurationData clientConfig) {
return mapper.writeValueAsString(clientConfig);
}

@VisibleForTesting
static void close(ClientConfigurationData clientConfig) {
getGuavaCache().invalidate(clientConfig);
String key = serializeKey(clientConfig);
clientCache.invalidate(key);
}

@VisibleForTesting
static void clear() {
log.info("Cleaning up guava cache.");
getGuavaCache().invalidateAll();
clientCache.invalidateAll();
}

static ConcurrentMap<ClientConfigurationData, PulsarClientImpl> getAsMap() {
return getGuavaCache().asMap();
@VisibleForTesting
static ConcurrentMap<String, PulsarClientImpl> getAsMap() {
return clientCache.asMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
Expand All @@ -27,7 +28,6 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -118,9 +118,8 @@ public void run() {
}
}

protected void createActualReader() throws org.apache.pulsar.client.api.PulsarClientException, ExecutionException {
ReaderBuilder<T> readerBuilder = CachedPulsarClient
.getOrCreate(clientConf)
protected void createActualReader() throws PulsarClientException {
ReaderBuilder<T> readerBuilder = CachedPulsarClient.getOrCreate(clientConf)
.newReader(deserializer.getSchema())
.topic(topicRange.getTopic())
.startMessageId(startMessageId)
Expand All @@ -135,7 +134,7 @@ protected void createActualReader() throws org.apache.pulsar.client.api.PulsarCl
reader = readerBuilder.create();
}

protected void skipFirstMessageIfNeeded() throws org.apache.pulsar.client.api.PulsarClientException {
protected void skipFirstMessageIfNeeded() throws PulsarClientException {
Message<?> currentMessage = null;
MessageId currentId;
boolean failOnDataLoss = this.failOnDataLoss;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;

import java.text.MessageFormat;
import java.util.ArrayList;
Expand All @@ -43,8 +43,8 @@ public abstract class PulsarTestBaseWithFlink extends PulsarTestBase {

protected ClusterClient<?> client;

@ClassRule
public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource(
@Rule
public MiniClusterWithClientResource flink = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getFlinkConfiguration())
.setNumberTaskManagers(NUM_TMS)
Expand Down Expand Up @@ -86,7 +86,6 @@ public static void cancelRunningJobs(ClusterClient<?> client) throws Exception {
}

protected String createTableSql(String tableName, String topic, TableSchema tableSchema, String formatType) {

List<String> columns = new ArrayList<>();
for (TableColumn tableColumn : tableSchema.getTableColumns()) {
final String column = MessageFormat.format(" `{0}` {1}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotSame;

/**
* Unit test of {@link CachedPulsarClient}.
Expand All @@ -46,7 +46,7 @@ public void testClientConfClone() {
ClientConfigurationData conf2 = conf1.clone();
conf2.setTlsTrustCertsFilePath("def");

assertTrue(conf1 != conf2);
assertNotSame(conf1, conf2);
assertEquals(conf1.getTlsTrustCertsFilePath(), "abc");
assertEquals(conf2.getTlsTrustCertsFilePath(), "def");
}
Expand Down Expand Up @@ -99,12 +99,12 @@ public void testShouldCloseTheCorrectClient() throws Exception {

assertNotEquals(client1, client2);

ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
ConcurrentMap<String, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
assertEquals(map1.size(), 2);

CachedPulsarClient.close(conf2);

ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
ConcurrentMap<String, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
assertEquals(map2.size(), 1);

assertEquals(map2.values().iterator().next(), client1);
Expand Down

0 comments on commit fee9979

Please sign in to comment.