- A Java Developer Kit (JDK), version 11 or later
- Maven
- Clone the project and enter the samples directory:
git clone https://github.com/Azure/azure-kusto-java.git
cd samples
This sample will demonstrate how to execute a query.
Sample Code
- Create Azure Data Explorer Cluster and DB
- Create Azure Active Directory App Registration and grant it permissions to DB (save the app key and the application ID for later)
- Build Connection string and initialize the client
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
System.getProperty("clusterPath"),
System.getProperty("appId"),
System.getProperty("appKey"),
System.getProperty("appTenant"));
Client client = ClientFactory.createClient(csb);
If you'd like to tweak the underlying HTTP client used to make the requests, build an HTTP client properties object and use that along the Connection string to initialise the client:
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
System.getProperty("clusterPath"),
System.getProperty("appId"),
System.getProperty("appKey"),
System.getProperty("appTenant"));
HttpClientProperties properties = HttpClientProperties.builder()
.keepAlive(true)
.maxKeepAliveTime(120)
.maxConnectionsTotal(40)
.build();
Client client = ClientFactory.createClient(csb, properties);
- Execute query
KustoOperationResult results = client.execute( System.getProperty("dbName"), System.getProperty("query"));
cd samples
mvn clean compile exec:java -Dexec.mainClass="Query" \
-DclusterPath="cluster/path" \
-DappId="app-id" \
-DappKey="appKey" \
-DappTenant="subscription-id" \
-DdbName="dbName" \
-Dquery="your | query"
This sample shows some more advanced options available when querying data, like using query parameters to guard against injection attacks and extracting individual values from the query results.
Sample Code for Advanced Query
- Create Azure Data Explorer Cluster and DB
- Create Azure Active Directory App Registration and grant it permissions to DB (save the app key and the application ID for later). Principal's permission must be at least 'Database user'.
- Creating a table with initial data using the .set-or-replace command
String tableCommand = String.join(newLine,
".set-or-replace Events <|",
"range x from 1 to 100 step 1",
"| extend ts = totimespan(strcat(x,'.00:00:00'))",
"| project timestamp = now(ts), eventName = strcat('event ', x)");
client.execute(database, tableCommand);
- Using query parameters to guard against injection attacks
ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
clientRequestProperties.setParameter("eventNameFilter", "event 1");
String query = String.join(newLine,
"declare query_parameters(eventNameFilter:string);",
"Events",
"| where eventName == eventNameFilter");
KustoOperationResult results = client.execute(database, query, clientRequestProperties);
- Extracting individual values from the query results
KustoResultSetTable mainTableResult = results.getPrimaryResults();
System.out.printf("Kusto sent back %s rows.%n", mainTableResult.count());
// iterate values
List<Event> events = new ArrayList<>();
while (mainTableResult.next()) {
events.add(new Event(
mainTableResult.getKustoDateTime("timestamp"),
mainTableResult.getString("eventName")));
}
Note: Running this sample will create a table named Events
in the given database.
cd samples
mvn clean compile exec:java -Dexec.mainClass="AdvancedQuery" \
-DclusterPath="cluster/path" \
-DappId="app-id" \
-DappKey="appKey" \
-DappTenant="tenant-id" \
-DdbName="dbName"
This sample will demonstrate how to ingest data from a file into table.
Sample Code
- Create Azure Data Explorer Cluster and DB
- Create Azure Active Directory App Registration and grant it permissions to DB (save the app key and the application ID for later)
- Create a table in the DB
- Create a mapping between the file and the table
- Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(System.getProperty("clusterPath"),
System.getProperty("appId"),
System.getProperty("appKey"),
System.getProperty("appTenant"));
- Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"),
System.getProperty("tableName"));
ingestionProperties.getIngestionMapping().setIngestionMappingReference(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Csv);
- Load file and ingest it into table
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0);
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties);
cd samples
mvn clean compile exec:java -Dexec.cleanupDaemonThreads=false \
-Dexec.mainClass="FileIngestion" \
-DclusterPath="cluster/path" \
-DappId="app-id" \
-DappKey="appKey" \
-DappTenant="subscription-id" \
-DdbName="dbName" \
-DtableName="tableName" \
-DdataMappingName="dataMappingName" \
-DfilePath="file/path"
This sample will demonstrate how to ingest data using the streaming ingest client. {Sample Code](src/main/java/StreamingIngest)
- Create Azure Data Explorer Cluster and DB
- Create Azure Active Directory App Registration and grant it permissions to DB (save the app key and the application ID for later)
- Create a table in the DB:
.create table StreamingIngest (rownumber:int, rowguid:string, xdouble:real, xfloat:real, xbool:bool, xint16:int, xint32:int, xint64:long, xuint8:long, xuint16:long, xuint32:long, xuint64:long, xdate:datetime, xsmalltext:string, xtext:string, xnumberAsText:string, xtime:timespan, xtextWithNulls:string, xdynamicWithNulls:dynamic)
.create table StreamingIngest ingestion json mapping "JsonMapping" '[{"column":"rownumber","path": "$.rownumber", "datatype":"int" },{"column":"rowguid", "path":"$.rowguid","datatype":"string" },{"column":"xdouble", "path":"$.xdouble", "datatype":"real" },{"column":"xfloat", "path":"$.xfloat", "datatype":"real" },{"column":"xbool", "path":"$.xbool", "datatype":"bool" },{"column":"xint16", "path":"$.xint16", "datatype":"int" },{"column":"xint32", "path":"$.xint32", "datatype":"int" },{"column":"xint64", "path":"$.xint64", "datatype":"long" },{"column":"xuint8", "path":"$.xuint8", "datatype":"long"},{"column":"xuint16", "path":"$.xuint16", "datatype":"long"},{"column":"xuint32", "path":"$.xuint32", "datatype":"long"},{"column":"xuint64", "path":"$.xuint64", "datatype":"long"},{"column":"xdate", "path":"$.xdate", "datatype":"datetime"},{"column":"xsmalltext", "path":"$.xsmalltext","datatype":"string"},{"column":"xtext", "path":"$.xtext","datatype":"string"},{"column":"xnumberAsText", "path":"$.xnumberAsText","datatype":"string"},{"column":"xtime", "path":"$.xtime","datatype":"timespan"}, {"column":"xtextWithNulls", "path":"$.xtextWithNulls","datatype":"string"}, {"column":"xdynamicWithNulls", "path":"$.xdynamicWithNulls","datatype":"dynamic"}]'
- Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(System.getProperty("clusterPath"),
System.getProperty("appId"),
System.getProperty("appKey"),
System.getProperty("appTenant"));
- Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties = new IngestionProperties(System.getProperty("dbName"),
System.getProperty("tableName"));
- Create Source info
StreamSourceInfo:
InputStream inputStream = new ByteArrayInputStream(Charset.forName("UTF-8").encode(data).array());
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
If the data is compressed:
streamSourceInfo.setCompressionType(CompressionType.gz);
FileSourceInfo:
FileSourceInfo fileSourceInfo = new FileSourceInfo(path, new File(path).length());
- Ingest into table and verify ingestion status
From stream:
OperationStatus status = streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
From File:
OperationStatus status = streamingIngestClient.ingestFromFile(fileSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
cd samples
mvn clean compile exec:java -Dexec.cleanupDaemonThreads=false \
-Dexec.mainClass="StreamingIngest" \
-DclusterPath="cluster/path" \
-DappId="app-id" \
-DappKey="appKey" \
-DappTenant="subscription-id" \
-DdbName="dbName" \
-DtableName="tableName" \
-DdataMappingName="dataMappingName"
Take a look at this Sample Code to learn how to run File Ingestion using CompletableFutures in order to make the calls asynchronously.
Note: The implementation itself of the File Ingestion API, like all the other APIs in this version, is not asynchronous.
This sample will demonstrate how to retrieve ingestion status.
Sample Code
- Create Azure Data Explorer Cluster and DB
- Create Azure Active Directory App Registration and grant it permissions to DB (save the app key and the application ID for later)
- Create a table in the DB
- Create a mapping between the file and the table
- Set timeout for status retrieval :
Integer timeoutInSec = Integer.getInteger("timeoutInSec");
- Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials( System.getProperty("clusterPath"),
System.getProperty("appId"),
System.getProperty("appKey"),
System.getProperty("appTenant"));
- Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties = new IngestionProperties( System.getProperty("dbName"),
System.getProperty("tableName"));
ingestionProperties.getIngestionMapping().setIngestionMappingReference(System.getProperty("dataMappingName"), IngestionMapping.IngestionMappingKind.Csv);
ingestionProperties.setReportMethod(QueueAndTable);
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
- Load file and ingest it into table
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"), 0);
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties);
- Retrieve ingestion status and wait for result
List<IngestionStatus> statuses = ingestionResult.getIngestionStatusCollection();
while (statuses.get(0).status == OperationStatus.Pending && timeoutInSec > 0) {
Thread.sleep(1000);
timeoutInSec -= 1;
statuses = ingestionResult.getIngestionStatusCollection();
}
To run this sample:
cd samples
mvn clean compile exec:java -Dexec.mainClass="TableStatus" \
-DclusterPath="cluster/path" \
-DappId="app-id" \
-DappKey="appKey" \
-DappTenant="subscription-id" \
-DdbName="dbName" \
-DtableName="tableName" \
-DdataMappingName="dataMappingName" \
-DfilePath="file/path"
-DtimeoutInSec=300
- Simple Jmeter sample to test stress test of the clients and cluster
Fill the right parameters in jmeter_test_load.properties file, the application provided should be granted User and Ingestor permissions on the database. The performance we tested here was of the client, and therefore we use the logs to compare total run time of each request. A warn log is logged for each request with and can be parsed as one TXT (given column name is 'data') with the following KQL:
jmeterLog | parse-where data with Datetime:string " WARN " Text:string "after: " ms:int * | summarize percentiles(ms, 5,80, 90,95),count(), avg(ms) by substring(Text,39)
If you don't have a Microsoft Azure subscription you can get a FREE trial account here
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact [email protected] with any additional questions or comments.