Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolve serviceUrl host separately when getConnection of each topic p… #9

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,13 @@ protected static String getTlsFileForClient(String name) {
protected PulsarService pulsar;
protected PulsarAdmin admin;
protected PulsarClient pulsarClient;
protected PulsarClient pulsarClientHttpUrlNotAllAvailable;
protected PulsarClient pulsarClientserviceUrlNotAllAvailable;
protected PortForwarder brokerGateway;
protected boolean enableBrokerGateway = false;
protected URL brokerUrl;
protected URL brokerUrlTls;
protected String brokerServiceUrl;

protected URI lookupUrl;

Expand Down Expand Up @@ -164,6 +167,8 @@ protected final void internalSetup() throws Exception {
}
}
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
pulsarClientHttpUrlNotAllAvailable = newPulsarClient(brokerUrl.toString() + ",localhost:5678,localhost:5677,localhost:5676", 0);
pulsarClientserviceUrlNotAllAvailable = newPulsarClient(brokerServiceUrl + ",localhost:5678,localhost:5677,localhost:5676", 0);
}

protected final void internalSetup(ServiceConfiguration serviceConfiguration) throws Exception {
Expand Down Expand Up @@ -319,6 +324,7 @@ protected void startBroker() throws Exception {

brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null;
brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null;
brokerServiceUrl = pulsar.getBrokerServiceUrl() != null ? pulsar.getBrokerServiceUrl() : null;

if (admin != null) {
admin.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,26 @@ public void testInitialSubscriptionCreationWithAutoCreationDisable()

Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
}

@Test
public void testCreateWhenServiceUrlNotAllAvailable() throws Exception {

final TopicName topic =
TopicName.get("persistent", "public", "default", "testCreateInitialSubscriptionOnPartitionedTopic");
admin.topics().createPartitionedTopic(topic.toString(), 20);

// use pulsar serviceUrl with unavailable host to new producer
Producer<byte[]> producer = pulsarClientserviceUrlNotAllAvailable.newProducer()
.topic(topic.toString())
.create();

producer.close();

// use pulsar httpServiceUrl with unavailable host to new producer
Producer<byte[]> producer2 = pulsarClientHttpUrlNotAllAvailable.newProducer()
.topic(topic.toString())
.create();

producer2.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(T
return findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
}

public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName, int currentIndex) {
InetSocketAddress socketAddress = serviceNameResolver.resolveHost(currentIndex);
return findBroker(socketAddress, false, topicName, 0);
}

/**
* calls broker binaryProto-lookup api to get metadata of partitioned-topic.
*
Expand Down Expand Up @@ -259,6 +264,10 @@ public String getServiceUrl() {
return serviceNameResolver.getServiceUrl();
}

public List<InetSocketAddress> getAddressList() {
return serviceNameResolver.getAddressList();
}

@Override
public InetSocketAddress resolveHost() {
return serviceNameResolver.resolveHost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -41,6 +43,7 @@ public class ConnectionHandler {
// Start with -1L because it gets incremented before sending on the first connection
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
private AtomicInteger serviceResolverIndex;

interface Connection {
void connectionFailed(PulsarClientException exception);
Expand All @@ -49,11 +52,18 @@ interface Connection {

protected Connection connection;

protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) {
protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection, List<InetSocketAddress> addressList) {
this.state = state;
this.connection = connection;
this.backoff = backoff;
CLIENT_CNX_UPDATER.set(this, null);
this.serviceResolverIndex = new AtomicInteger(randomIndex(addressList.size()));
}

private static int randomIndex(int numAddresses) {
return numAddresses == 1
?
0 : io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses);
}

protected void grabCnx() {
Expand All @@ -79,7 +89,7 @@ protected void grabCnx() {
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
cnxFuture = state.client.getConnection(state.topic); //
cnxFuture = state.client.getConnection(state.topic, serviceResolverIndex.getAndIncrement());
}
cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) //
.exceptionally(this::handleConnectionError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
this);
this, client.getLookup().getAddressList());

this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,90 @@ public <T> CompletableFuture<T> get(String path, Class<T> clazz) {

return future;
}

public <T> CompletableFuture<T> get(String path, Class<T> clazz, int currentIndex) {
final CompletableFuture<T> future = new CompletableFuture<>();
try {
URI hostUri = serviceNameResolver.resolveHostUri(currentIndex);
String requestUrl = new URL(hostUri.toURL(), path).toString();
String remoteHostName = hostUri.getHost();
AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);

CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();

// bring a authenticationStage for sasl auth.
if (authData.hasDataForHttp()) {
authentication.authenticationStage(requestUrl, authData, null, authFuture);
} else {
authFuture.complete(null);
}

// auth complete, do real request
authFuture.whenComplete((respHeaders, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to perform http request at authentication stage: {}",
requestUrl, ex.getMessage());
future.completeExceptionally(new PulsarClientException(ex));
return;
}

// auth complete, use a new builder
BoundRequestBuilder builder = httpClient.prepareGet(requestUrl)
.setHeader("Accept", "application/json");

if (authData.hasDataForHttp()) {
Set<Entry<String, String>> headers;
try {
headers = authentication.newRequestHeader(requestUrl, authData, respHeaders);
} catch (Exception e) {
log.warn("[{}] Error during HTTP get headers: {}", requestUrl, e.getMessage());
future.completeExceptionally(new PulsarClientException(e));
return;
}
if (headers != null) {
headers.forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue()));
}
}

builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
if (t != null) {
log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage());
future.completeExceptionally(new PulsarClientException(t));
return;
}

// request not success
if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) {
log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText());
Exception e;
if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
e = new NotFoundException("Not found: " + response2.getStatusText());
} else {
e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText());
}
future.completeExceptionally(e);
return;
}

try {
T data = ObjectMapperFactory.getMapper().reader().readValue(
response2.getResponseBodyAsBytes(), clazz);
future.complete(data);
} catch (Exception e) {
log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage());
future.completeExceptionally(new PulsarClientException(e));
}
});
});
} catch (Exception e) {
log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage());
if (e instanceof PulsarClientException) {
future.completeExceptionally(e);
} else {
future.completeExceptionally(new PulsarClientException(e));
}
}

return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,41 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(T
});
}

@Override
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName, int currentIndex) {
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
return httpClient.get(path, LookupData.class, currentIndex)
.thenCompose(lookupData -> {
// Convert LookupData into as SocketAddress, handling exceptions
URI uri = null;
try {
if (useTls) {
uri = new URI(lookupData.getBrokerUrlTls());
} else {
String serviceUrl = lookupData.getBrokerUrl();
if (serviceUrl == null) {
serviceUrl = lookupData.getNativeUrl();
}
uri = new URI(serviceUrl);
}

InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
} catch (Exception e) {
// Failed to parse url
log.warn("[{}] Lookup Failed due to invalid url {}, {}", topicName, uri, e.getMessage());
return FutureUtil.failedFuture(e);
}
});
}

@Override
public List<InetSocketAddress> getAddressList() {
return httpClient.serviceNameResolver.getAddressList();
}

@Override
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -58,6 +59,18 @@ public interface LookupService extends AutoCloseable {
*/
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName);

/**
* Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespace bundle that contains given
* topic. use currentIndex to select the indexed address of the serviceUrl address list
*
* @param topicName
* topic-name
* @param currentIndex
* index of multi-serviceUrl
* @return a pair of addresses, representing the logical and physical address of the broker that serves given topic
*/
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName, int currentIndex);

/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
*
Expand Down Expand Up @@ -90,6 +103,13 @@ public interface LookupService extends AutoCloseable {
*/
String getServiceUrl();

/**
* Returns serviceUrl address list.
*
* @return
*/
List<InetSocketAddress> getAddressList();

/**
* Resolves pulsar service url.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
.create(),
this);
this, client.getLookup().getAddressList());

grabCnx();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,12 @@ public CompletableFuture<ClientCnx> getConnection(final String topic) {
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight()));
}

public CompletableFuture<ClientCnx> getConnection(final String topic, int currentIndex) {
TopicName topicName = TopicName.get(topic);
return lookup.getBroker(topicName, currentIndex)
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight()));
}

public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
if (!(lookup instanceof BinaryProtoLookupService)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,39 @@ public InetSocketAddress resolveHost() {
}
}

public InetSocketAddress resolveHost(int index) {
List<InetSocketAddress> list = addressList;
checkState(
list != null, "No service url is provided yet");
checkState(
!list.isEmpty(), "No hosts found for service url : " + serviceUrl);
return list.get(index % list.size());
}

@Override
public URI resolveHostUri() {
InetSocketAddress host = resolveHost();
String hostUrl = serviceUri.getServiceScheme() + "://" + host.getHostString() + ":" + host.getPort();
return URI.create(hostUrl);
}

@Override
public URI resolveHostUri(int currentIndex) {
InetSocketAddress host = resolveHost(currentIndex);
String hostUrl = serviceUri.getServiceScheme() + "://" + host.getHostString() + ":" + host.getPort();
return URI.create(hostUrl);
}

@Override
public String getServiceUrl() {
return serviceUrl;
}

@Override
public List<InetSocketAddress> getAddressList() {
return new ArrayList<>(addressList);
}

@Override
public ServiceURI getServiceUri() {
return serviceUri;
Expand Down
Loading