Skip to content

Commit

Permalink
resolve serviceUrl host separately when getConnection of each topic p…
Browse files Browse the repository at this point in the history
…artition
  • Loading branch information
fanjianye committed Jun 6, 2023
1 parent f0e97f4 commit 6180556
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,13 @@ protected static String getTlsFileForClient(String name) {
protected PulsarService pulsar;
protected PulsarAdmin admin;
protected PulsarClient pulsarClient;
protected PulsarClient pulsarClientHttpUrlNotAllAvailable;
protected PulsarClient pulsarClientserviceUrlNotAllAvailable;
protected PortForwarder brokerGateway;
protected boolean enableBrokerGateway = false;
protected URL brokerUrl;
protected URL brokerUrlTls;
protected String brokerServiceUrl;

protected URI lookupUrl;

Expand Down Expand Up @@ -164,6 +167,8 @@ protected final void internalSetup() throws Exception {
}
}
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
pulsarClientHttpUrlNotAllAvailable = newPulsarClient(brokerUrl.toString() + ",localhost:5678,localhost:5677,localhost:5676", 0);
pulsarClientserviceUrlNotAllAvailable = newPulsarClient(brokerServiceUrl + ",localhost:5678,localhost:5677,localhost:5676", 0);
}

protected final void internalSetup(ServiceConfiguration serviceConfiguration) throws Exception {
Expand Down Expand Up @@ -319,6 +324,7 @@ protected void startBroker() throws Exception {

brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null;
brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null;
brokerServiceUrl = pulsar.getBrokerServiceUrl() != null ? pulsar.getBrokerServiceUrl() : null;

if (admin != null) {
admin.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,26 @@ public void testInitialSubscriptionCreationWithAutoCreationDisable()

Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
}

@Test
public void testCreateWhenServiceUrlNotAllAvailable() throws Exception {

final TopicName topic =
TopicName.get("persistent", "public", "default", "testCreateInitialSubscriptionOnPartitionedTopic");
admin.topics().createPartitionedTopic(topic.toString(), 20);

// use pulsar serviceUrl with unavailable host to new producer
Producer<byte[]> producer = pulsarClientserviceUrlNotAllAvailable.newProducer()
.topic(topic.toString())
.create();

producer.close();

// use pulsar httpServiceUrl with unavailable host to new producer
Producer<byte[]> producer2 = pulsarClientHttpUrlNotAllAvailable.newProducer()
.topic(topic.toString())
.create();

producer2.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(T
return findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
}

public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName, int currentIndex) {
InetSocketAddress socketAddress = serviceNameResolver.resolveHost(currentIndex);
return findBroker(socketAddress, false, topicName, 0);
}

/**
* calls broker binaryProto-lookup api to get metadata of partitioned-topic.
*
Expand Down Expand Up @@ -259,6 +264,10 @@ public String getServiceUrl() {
return serviceNameResolver.getServiceUrl();
}

public List<InetSocketAddress> getAddressList() {
return serviceNameResolver.getAddressList();
}

@Override
public InetSocketAddress resolveHost() {
return serviceNameResolver.resolveHost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -41,6 +43,7 @@ public class ConnectionHandler {
// Start with -1L because it gets incremented before sending on the first connection
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
private AtomicInteger serviceResolverIndex;

interface Connection {
void connectionFailed(PulsarClientException exception);
Expand All @@ -49,11 +52,18 @@ interface Connection {

protected Connection connection;

protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) {
protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection, List<InetSocketAddress> addressList) {
this.state = state;
this.connection = connection;
this.backoff = backoff;
CLIENT_CNX_UPDATER.set(this, null);
this.serviceResolverIndex = new AtomicInteger(randomIndex(addressList.size()));
}

private static int randomIndex(int numAddresses) {
return numAddresses == 1
?
0 : io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses);
}

protected void grabCnx() {
Expand All @@ -79,7 +89,7 @@ protected void grabCnx() {
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
cnxFuture = state.client.getConnection(state.topic); //
cnxFuture = state.client.getConnection(state.topic, serviceResolverIndex.getAndIncrement());
}
cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) //
.exceptionally(this::handleConnectionError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
this);
this, client.getLookup().getAddressList());

this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,90 @@ public <T> CompletableFuture<T> get(String path, Class<T> clazz) {

return future;
}

public <T> CompletableFuture<T> get(String path, Class<T> clazz, int currentIndex) {
final CompletableFuture<T> future = new CompletableFuture<>();
try {
URI hostUri = serviceNameResolver.resolveHostUri(currentIndex);
String requestUrl = new URL(hostUri.toURL(), path).toString();
String remoteHostName = hostUri.getHost();
AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);

CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();

// bring a authenticationStage for sasl auth.
if (authData.hasDataForHttp()) {
authentication.authenticationStage(requestUrl, authData, null, authFuture);
} else {
authFuture.complete(null);
}

// auth complete, do real request
authFuture.whenComplete((respHeaders, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to perform http request at authentication stage: {}",
requestUrl, ex.getMessage());
future.completeExceptionally(new PulsarClientException(ex));
return;
}

// auth complete, use a new builder
BoundRequestBuilder builder = httpClient.prepareGet(requestUrl)
.setHeader("Accept", "application/json");

if (authData.hasDataForHttp()) {
Set<Entry<String, String>> headers;
try {
headers = authentication.newRequestHeader(requestUrl, authData, respHeaders);
} catch (Exception e) {
log.warn("[{}] Error during HTTP get headers: {}", requestUrl, e.getMessage());
future.completeExceptionally(new PulsarClientException(e));
return;
}
if (headers != null) {
headers.forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue()));
}
}

builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
if (t != null) {
log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage());
future.completeExceptionally(new PulsarClientException(t));
return;
}

// request not success
if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) {
log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText());
Exception e;
if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
e = new NotFoundException("Not found: " + response2.getStatusText());
} else {
e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText());
}
future.completeExceptionally(e);
return;
}

try {
T data = ObjectMapperFactory.getMapper().reader().readValue(
response2.getResponseBodyAsBytes(), clazz);
future.complete(data);
} catch (Exception e) {
log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage());
future.completeExceptionally(new PulsarClientException(e));
}
});
});
} catch (Exception e) {
log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage());
if (e instanceof PulsarClientException) {
future.completeExceptionally(e);
} else {
future.completeExceptionally(new PulsarClientException(e));
}
}

return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,41 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(T
});
}

@Override
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName, int currentIndex) {
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
return httpClient.get(path, LookupData.class, currentIndex)
.thenCompose(lookupData -> {
// Convert LookupData into as SocketAddress, handling exceptions
URI uri = null;
try {
if (useTls) {
uri = new URI(lookupData.getBrokerUrlTls());
} else {
String serviceUrl = lookupData.getBrokerUrl();
if (serviceUrl == null) {
serviceUrl = lookupData.getNativeUrl();
}
uri = new URI(serviceUrl);
}

InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
} catch (Exception e) {
// Failed to parse url
log.warn("[{}] Lookup Failed due to invalid url {}, {}", topicName, uri, e.getMessage());
return FutureUtil.failedFuture(e);
}
});
}

@Override
public List<InetSocketAddress> getAddressList() {
return httpClient.serviceNameResolver.getAddressList();
}

@Override
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -58,6 +59,18 @@ public interface LookupService extends AutoCloseable {
*/
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName);

/**
* Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespace bundle that contains given
* topic. use currentIndex to select the indexed address of the serviceUrl address list
*
* @param topicName
* topic-name
* @param currentIndex
* index of multi-serviceUrl
* @return a pair of addresses, representing the logical and physical address of the broker that serves given topic
*/
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName, int currentIndex);

/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
*
Expand Down Expand Up @@ -90,6 +103,13 @@ public interface LookupService extends AutoCloseable {
*/
String getServiceUrl();

/**
* Returns serviceUrl address list.
*
* @return
*/
List<InetSocketAddress> getAddressList();

/**
* Resolves pulsar service url.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
.create(),
this);
this, client.getLookup().getAddressList());

grabCnx();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,12 @@ public CompletableFuture<ClientCnx> getConnection(final String topic) {
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight()));
}

public CompletableFuture<ClientCnx> getConnection(final String topic, int currentIndex) {
TopicName topicName = TopicName.get(topic);
return lookup.getBroker(topicName, currentIndex)
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight()));
}

public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
if (!(lookup instanceof BinaryProtoLookupService)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,39 @@ public InetSocketAddress resolveHost() {
}
}

public InetSocketAddress resolveHost(int index) {
List<InetSocketAddress> list = addressList;
checkState(
list != null, "No service url is provided yet");
checkState(
!list.isEmpty(), "No hosts found for service url : " + serviceUrl);
return list.get(index % list.size());
}

@Override
public URI resolveHostUri() {
InetSocketAddress host = resolveHost();
String hostUrl = serviceUri.getServiceScheme() + "://" + host.getHostString() + ":" + host.getPort();
return URI.create(hostUrl);
}

@Override
public URI resolveHostUri(int currentIndex) {
InetSocketAddress host = resolveHost(currentIndex);
String hostUrl = serviceUri.getServiceScheme() + "://" + host.getHostString() + ":" + host.getPort();
return URI.create(hostUrl);
}

@Override
public String getServiceUrl() {
return serviceUrl;
}

@Override
public List<InetSocketAddress> getAddressList() {
return new ArrayList<>(addressList);
}

@Override
public ServiceURI getServiceUri() {
return serviceUri;
Expand Down
Loading

0 comments on commit 6180556

Please sign in to comment.