Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #345, Introduces KustoRequest wrapper object and async methods #358

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b2b670d
Introduces KustoQuery wrapper object
Mar 26, 2024
ab1102c
Merge branch 'master' into implement_kusto_query_object
The-Funk Apr 9, 2024
77aaf02
Merge branch 'master' into implement_kusto_query_object
The-Funk Apr 11, 2024
1f1f30d
working on async client
May 4, 2024
939d4a7
updating client with KustoQuery object
May 15, 2024
64b100b
updating client with KustoQuery object
May 15, 2024
0989842
updating client with KustoQuery object
May 15, 2024
abd512d
Merge remote-tracking branch 'origin/implement_kusto_query_object' in…
May 15, 2024
7c07207
Merge branch 'master' into implement_kusto_query_object
The-Funk May 15, 2024
4c4cc46
will simplify later.
May 20, 2024
d000948
Merge remote-tracking branch 'origin/implement_kusto_query_object' in…
May 20, 2024
8099b73
will simplify later.
May 20, 2024
610778e
will simplify later.
May 20, 2024
5972c30
Merge branch 'master' into implement_kusto_query_object
The-Funk May 21, 2024
39d479a
ran formatter, fixed test, fixed bad merge
May 21, 2024
c8c90a5
AtomicBooleans in already modified test. Save some lines, might as well.
May 21, 2024
c59c124
Merge branch 'master' into implement_kusto_query_object
The-Funk May 21, 2024
a5b6d37
hide KustoQuery object
May 22, 2024
2ff7eda
Merge remote-tracking branch 'origin/implement_kusto_query_object' in…
May 22, 2024
e329eef
rename and hide KustoRequest
May 22, 2024
7e7ccc2
adding E2ETests for async queries
Jun 11, 2024
1c96d72
ran formatter
Jun 11, 2024
ab201e4
no complete in sink
The-Funk Jun 21, 2024
96e3527
move blocking operations off the io thread
The-Funk Jun 21, 2024
5cd3b31
run formatter
The-Funk Jun 21, 2024
ae0c72e
replace execute utilization
The-Funk Jun 21, 2024
2904187
fixes teardown. async still doesn't work yet.
Aug 1, 2024
4924c7d
fixes msal4j issue
Aug 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 70 additions & 16 deletions data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.util.Optional;
import java.util.function.BiConsumer;

public abstract class BaseClient implements Client, StreamingClient {

Expand All @@ -30,27 +33,78 @@ public BaseClient(HttpClient httpClient) {
}

protected String post(HttpRequest request) throws DataServiceException {
The-Funk marked this conversation as resolved.
Show resolved Hide resolved

// Todo: Add async version of this method

// Execute and get the response
try (HttpResponse response = httpClient.sendSync(request, Context.NONE)) {
String responseBody = Utils.isGzipResponse(response) ? Utils.gzipedInputToString(response.getBodyAsBinaryData().toStream())
: response.getBodyAsBinaryData().toString();

if (responseBody != null) {
switch (response.getStatusCode()) {
case HttpStatus.OK:
return responseBody;
case HttpStatus.TOO_MANY_REQS:
throw new ThrottleException(request.getUrl().toString());
default:
throw createExceptionFromResponse(request.getUrl().toString(), response, null, responseBody);
}
return processResponseBody(response);
}
}

protected Mono<String> postAsync(HttpRequest request) {
// Execute and get the response
return httpClient.send(request)
.handle(processResponseBodyAsync);
}

// TODO: Asaf and Ohad, thoughts?
The-Funk marked this conversation as resolved.
Show resolved Hide resolved
private final BiConsumer<HttpResponse, SynchronousSink<String>> processResponseBodyAsync = (response, sink) -> {
// To the best of my knowledge, reactive pipelines cannot throw checked exceptions, as they must always complete normally.

// So, in the case of reactive streams we do not want to throw exceptions,
// but instead insert them into the pipeline as an error state.

// This leaves two simple options:
// 1. Slightly abandon DRY and repeat some lines of code when doing synchronous transformations.
// 2. Use exceptions from synchronous methods as flow control and take a slight performance hit.

// In short, instead of making the sync methods blocking wrappers of the async methods, I recommended wrapping
// synchronous transformations like those done below, in order to fit them into a reactive pipeline appropriately.

// Commented code below is an alternative approach that doesn't duplicate code but instead uses exceptions as flow control.
// It is slightly slower on error due to a performance hit from catching exceptions
//try {
// String body = processResponseBody(response);
// if (body != null) {
// sink.next(body);
// }
// sink.complete();
//} catch (Exception e) {
// sink.error(e);
//}

// And here's the main idea, it is slightly redundant/less DRY but uses sink and completes normally without using exceptions as flow control, which is generally not good.
String responseBody = Utils.isGzipResponse(response) ? Utils.gzipedInputToString(response.getBodyAsBinaryData().toStream())
: response.getBodyAsBinaryData().toString();

if (responseBody != null) {
switch (response.getStatusCode()) {
case HttpStatus.OK:
sink.next(responseBody);
case HttpStatus.TOO_MANY_REQS:
sink.error(new ThrottleException(response.getRequest().getUrl().toString()));
default:
sink.error(createExceptionFromResponse(response.getRequest().getUrl().toString(), response, null, responseBody));
}
}
// If null, complete void
sink.complete();
};


return null;
private String processResponseBody(HttpResponse response) throws DataServiceException {
String responseBody = Utils.isGzipResponse(response) ? Utils.gzipedInputToString(response.getBodyAsBinaryData().toStream())
: response.getBodyAsBinaryData().toString();

if (responseBody == null) {
return null;
}
switch (response.getStatusCode()) {
case HttpStatus.OK:
return responseBody;
case HttpStatus.TOO_MANY_REQS:
throw new ThrottleException(response.getRequest().getUrl().toString());
default:
throw createExceptionFromResponse(response.getRequest().getUrl().toString(), response, null, responseBody);
}
}

protected InputStream postToStreamingOutput(HttpRequest request) throws DataServiceException {
Expand Down
31 changes: 31 additions & 0 deletions data/src/main/java/com/microsoft/azure/kusto/data/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,62 @@

import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.req.KustoQuery;
import reactor.core.publisher.Mono;

import java.io.Closeable;

public interface Client extends Closeable {

KustoOperationResult execute(KustoQuery kq) throws DataServiceException, DataClientException;

KustoOperationResult executeQuery(KustoQuery kq) throws DataServiceException, DataClientException;

KustoOperationResult executeMgmt(KustoQuery kq) throws DataServiceException, DataClientException;

Mono<KustoOperationResult> executeAsync(KustoQuery kq);

Mono<KustoOperationResult> executeQueryAsync(KustoQuery kq);

Mono<KustoOperationResult> executeMgmtAsync(KustoQuery kq);

String executeToJsonResult(KustoQuery kq) throws DataServiceException, DataClientException;

Mono<String> executeToJsonResultAsync(KustoQuery kq);

@Deprecated
KustoOperationResult execute(String command) throws DataServiceException, DataClientException;

@Deprecated
KustoOperationResult execute(String database, String command) throws DataServiceException, DataClientException;

@Deprecated
KustoOperationResult execute(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException;

@Deprecated
KustoOperationResult executeQuery(String command) throws DataServiceException, DataClientException;

@Deprecated
KustoOperationResult executeQuery(String database, String command) throws DataServiceException, DataClientException;

@Deprecated
KustoOperationResult executeQuery(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException;

@Deprecated
KustoOperationResult executeMgmt(String command) throws DataServiceException, DataClientException;

@Deprecated
KustoOperationResult executeMgmt(String database, String command) throws DataServiceException, DataClientException;

@Deprecated
KustoOperationResult executeMgmt(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException;

@Deprecated
String executeToJsonResult(String database) throws DataServiceException, DataClientException;

@Deprecated
String executeToJsonResult(String database, String command) throws DataServiceException, DataClientException;

@Deprecated
String executeToJsonResult(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException;
}
134 changes: 92 additions & 42 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import com.microsoft.azure.kusto.data.instrumentation.SupplierOneException;
import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import com.microsoft.azure.kusto.data.req.KustoQuery;
import org.apache.commons.lang3.StringUtils;

import org.apache.http.client.utils.URIBuilder;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -83,6 +85,42 @@ public ClientImpl(ConnectionStringBuilder csb, HttpClient httpClient) throws URI
clientDetails = new ClientDetails(csb.getApplicationNameForTracing(), csb.getUserNameForTracing(), csb.getClientVersionForTracing());
}

@Override
public KustoOperationResult execute(KustoQuery kq) throws DataServiceException, DataClientException {
return executeImpl(kq);
}

@Override
public KustoOperationResult executeQuery(KustoQuery kq) throws DataServiceException, DataClientException {
if (kq != null && kq.getCommandType() != CommandType.QUERY) {
kq.setCommandType(CommandType.QUERY);
}
return executeImpl(kq);
}

@Override
public KustoOperationResult executeMgmt(KustoQuery kq) throws DataServiceException, DataClientException {
if (kq != null && kq.getCommandType() != CommandType.ADMIN_COMMAND) {
kq.setCommandType(CommandType.ADMIN_COMMAND);
}
return executeImpl(kq);
}

@Override
public Mono<KustoOperationResult> executeAsync(KustoQuery kq) {
return null;
}

@Override
public Mono<KustoOperationResult> executeQueryAsync(KustoQuery kq) {
return null;
}

@Override
public Mono<KustoOperationResult> executeMgmtAsync(KustoQuery kq) {
return null;
}

@Override
public KustoOperationResult execute(String command) throws DataServiceException, DataClientException {
return execute(DEFAULT_DATABASE_NAME, command);
Expand All @@ -100,9 +138,9 @@ public KustoOperationResult execute(String database, String command, ClientReque

private KustoOperationResult execute(String database, String command, ClientRequestProperties properties, CommandType commandType)
throws DataServiceException, DataClientException {
KustoQuery kq = new KustoQuery(command, database, properties, commandType);
return MonitoredActivity.invoke(
(SupplierTwoExceptions<KustoOperationResult, DataServiceException, DataClientException>) () -> executeImpl(database, command, properties,
commandType),
(SupplierTwoExceptions<KustoOperationResult, DataServiceException, DataClientException>) () -> executeImpl(kq),
commandType.getActivityTypeSuffix().concat(".execute"),
updateAndGetExecuteTracingAttributes(database, properties));
}
Expand Down Expand Up @@ -150,10 +188,9 @@ private Map<String, String> updateAndGetExecuteTracingAttributes(String database
}

@NotNull
private KustoOperationResult executeImpl(String database, String command, ClientRequestProperties properties, CommandType commandType)
throws DataServiceException, DataClientException {
String response = executeToJsonResult(database, command, properties);
String clusterEndpoint = String.format(commandType.getEndpoint(), clusterUrl);
private KustoOperationResult executeImpl(KustoQuery kq) throws DataServiceException, DataClientException {
String response = executeToJsonResult(kq);
String clusterEndpoint = String.format(kq.getCommandType().getEndpoint(), clusterUrl);
try {
return new KustoOperationResult(response, clusterEndpoint.endsWith("v2/rest/query") ? "v2" : "v1");
} catch (KustoServiceQueryError e) {
Expand All @@ -165,53 +202,62 @@ private KustoOperationResult executeImpl(String database, String command, Client
}

@Override
public String executeToJsonResult(String command) throws DataServiceException, DataClientException {
return executeToJsonResult(DEFAULT_DATABASE_NAME, command);
}
public String executeToJsonResult(KustoQuery kq) throws DataServiceException, DataClientException {
if (kq == null) {
throw new IllegalArgumentException("KustoQuery object cannot be null in order to be executed.");
}

@Override
public String executeToJsonResult(String database, String command) throws DataServiceException, DataClientException {
return executeToJsonResult(database, command, null);
}
// Validate and optimize the query object
kq.validateAndOptimize();

@Override
public String executeToJsonResult(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException {
// Argument validation
if (StringUtils.isEmpty(database)) {
throw new IllegalArgumentException("Database is empty");
}
if (StringUtils.isEmpty(command)) {
throw new IllegalArgumentException("Command is empty");
}
command = command.trim();
CommandType commandType = determineCommandType(command);
String clusterEndpoint = String.format(commandType.getEndpoint(), clusterUrl);
String clusterEndpoint = String.format(kq.getCommandType().getEndpoint(), clusterUrl);
String authorization = getAuthorizationHeaderValue();

// Validate the endpoint
// Validate the endpoint (?)
validateEndpoint();

// Build the tracing object
HttpTracing tracing = HttpTracing
.newBuilder()
.withProperties(properties)
.withProperties(kq.getProperties())
.withRequestPrefix("KJC.execute")
.withActivitySuffix(commandType.getActivityTypeSuffix())
.withActivitySuffix(kq.getCommandType().getActivityTypeSuffix())
.withClientDetails(clientDetails)
.build();

// Build the HTTP request
HttpRequest request = HttpRequestBuilder
.newPost(clusterEndpoint)
.createCommandPayload(database, command, properties)
.createCommandPayload(kq)
.withTracing(tracing)
.withAuthorization(authorization)
.build();

// Get the response and trace the call
return MonitoredActivity.invoke(
(SupplierOneException<String, DataServiceException>) () -> post(request),
commandType.getActivityTypeSuffix().concat(".executeToJsonResult"));
kq.getCommandType().getActivityTypeSuffix().concat(".executeToJsonResult"));
}

@Override
public Mono<String> executeToJsonResultAsync(KustoQuery kq) {
The-Funk marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

@Override
public String executeToJsonResult(String command) throws DataServiceException, DataClientException {
return executeToJsonResult(DEFAULT_DATABASE_NAME, command);
}

@Override
public String executeToJsonResult(String database, String command) throws DataServiceException, DataClientException {
return executeToJsonResult(database, command, null);
}

@Override
public String executeToJsonResult(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException {
KustoQuery kq = new KustoQuery(command, database, properties);
return executeToJsonResult(kq);
}

private void validateEndpoint() throws DataServiceException, DataClientException {
Expand Down Expand Up @@ -350,15 +396,19 @@ public InputStream executeStreamingQuery(String database, String command) throws
@Override
public InputStream executeStreamingQuery(String database, String command, ClientRequestProperties properties)
throws DataServiceException, DataClientException {
if (StringUtils.isEmpty(database)) {
throw new IllegalArgumentException("Database is empty");
}
if (StringUtils.isEmpty(command)) {
throw new IllegalArgumentException("Command is empty");
KustoQuery kq = new KustoQuery(command, database, properties);
return executeStreamingQuery(kq);
}

public InputStream executeStreamingQuery(KustoQuery kq) throws DataServiceException, DataClientException {
if (kq == null) {
throw new IllegalArgumentException("KustoQuery object cannot be null in order to be executed.");
}
command = command.trim();
CommandType commandType = determineCommandType(command);
String clusterEndpoint = String.format(commandType.getEndpoint(), clusterUrl);

// Validate and optimize the query object
kq.validateAndOptimize();

String clusterEndpoint = String.format(kq.getCommandType().getEndpoint(), clusterUrl);
String authorization = getAuthorizationHeaderValue();

// Validate the endpoint
Expand All @@ -367,24 +417,24 @@ public InputStream executeStreamingQuery(String database, String command, Client
// Build the tracing object
HttpTracing tracing = HttpTracing
.newBuilder()
.withProperties(properties)
.withProperties(kq.getProperties())
.withRequestPrefix("KJC.executeStreaming")
.withActivitySuffix(commandType.getActivityTypeSuffix())
.withActivitySuffix(kq.getCommandType().getActivityTypeSuffix())
.withClientDetails(clientDetails)
.build();

// Build the HTTP request
HttpRequest request = HttpRequestBuilder
.newPost(clusterEndpoint)
.createCommandPayload(database, command, properties)
.createCommandPayload(kq)
.withTracing(tracing)
.withAuthorization(authorization)
.build();

// Get the response and trace the call
return MonitoredActivity.invoke(
(SupplierOneException<InputStream, DataServiceException>) () -> postToStreamingOutput(request),
"ClientImpl.executeStreamingQuery", updateAndGetExecuteTracingAttributes(database, properties));
"ClientImpl.executeStreamingQuery", updateAndGetExecuteTracingAttributes(kq.getDatabase(), kq.getProperties()));
}

private CommandType determineCommandType(String command) {
Expand Down
Loading