-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingest interface #16
Ingest interface #16
Conversation
@rabee333 FYI |
@@ -22,7 +24,7 @@ public IngestionBlobInfo(String blobPath, String databaseName, String tableName) | |||
this.databaseName = databaseName; | |||
this.tableName = tableName; | |||
id = UUID.randomUUID(); | |||
retainBlobOnSuccess = false; | |||
retainBlobOnSuccess = true; //false doesn't seem to work |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean? If set to false, it does not mean the blob will be deleted at once.
Kusto will drop temporary containers once a day
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vladikbr I thought we agreed that we won't be in charge of GC for clients. I did the same in python..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will not be in charge for user GC (for the ingestFromBlob scenario).
But we will (and we do) take care of the intermediate blobs we create on ingestFromFile/Stream scenarios.
These are taken care of by the DM service by daily dropping of the containers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant that if this is set to false, the blob is still retained even after some time (I tried it a while back, so I might be wrong here).
But now I'm confused, if I set it to retainBlob...=true it will stay there forever?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vladikbr , we discussed this today, until we handle this better in the DM, ingesting from blob should mark this as true
, where as all other options mark it as false
. that is because we don't want to delete the source blob of the customer, just the intermediate blobs created when ingesting from a source that is not a blob (file, stream...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another way to look at it is that Kusto should only delete blobs on containers it owns.
That's why I think this parameter is confusing - as a user I'm not aware of the internal-intermediate blobs kusto manages, I only know my source ones. so when setting this to false I expect my source blob to be deleted...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thing is that the intermediate blobs are stored at DM storage accounts, therefore will be deleted after 1 week.
} | ||
|
||
@Override | ||
public KustoIngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, KustoIngestionProperties ingestionProperties) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vladikbr we said we are gonna drop ingestion results for now, and only support Status queues, which means ingestion methods are basically async and have no result right?
ingest/src/main/java/com/microsoft/azure/kusto/ingest/KustoIngestClientImpl.java
Outdated
Show resolved
Hide resolved
import java.time.Instant; | ||
import java.util.*; | ||
import java.util.stream.Collectors; | ||
public interface KustoIngestClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might "Kusto" should be removed from the name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree. We have Kusto in the package names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments and suggestions
|
||
public class KustoIngestClientFactory { | ||
|
||
public static KustoIngestClient createClient(KustoConnectionStringBuilder kcsb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we will have other types of clients in the future, then let's add ClientType enum parameter to the createClient() method (currently includes only one type), this is to leave the usage of this method as is in the future, without breaking the implementations of users when they upgrade to next versions.
-> Also I suggest to rename the method to getClient().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. This should resemble what we have in .Net - and this method should be called createQueuedIngestClient(). We will differentiate by method, not by an argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vladikbr we had a discussion on the whole usage of the term "queued" and how it confuses people/users getting started with Kusto. Let's continue offline.
@@ -1,4 +1,4 @@ | |||
package com.microsoft.azure.kusto.ingest;//import org.joda.time.DateTime; | |||
package com.microsoft.azure.kusto.ingest.result;//import org.joda.time.DateTime; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the comment
import com.microsoft.azure.storage.StorageException; | ||
|
||
public interface IKustoIngestionResult { | ||
public interface KustoIngestionResult { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Kusto" might be removed from the name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we have Kusto in all the package names already
@@ -30,8 +35,9 @@ public static void main(String[] args) throws Exception { | |||
KustoIngestionProperties ingestionProperties = new KustoIngestionProperties(dbName, tableName); | |||
ingestionProperties.setReportLevel(KustoIngestionProperties.IngestionReportLevel.FailuresAndSuccesses); | |||
ingestionProperties.setReportMethod(KustoIngestionProperties.IngestionReportMethod.Table); | |||
IKustoIngestionResult kustoIngestionResult = client.ingestFromMultipleBlobs(Collections.singletonList(blob), | |||
false, ingestionProperties); | |||
//IKustoIngestionResult kustoIngestionResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the comment
ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientFactory.java
Show resolved
Hide resolved
ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java
Show resolved
Hide resolved
ObjectMapper objectMapper = new ObjectMapper(); | ||
String serializedIngestionBlobInfo = objectMapper.writeValueAsString(ingestionBlobInfo); | ||
|
||
postMessageToQueue(resourceManager.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE), serializedIngestionBlobInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: can you split it to two lines for readability?
} | ||
} | ||
|
||
@Nullable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Nullable? Isn't 0 good enough?
|
||
|
||
// TODO: redesign to avoid those wrapper methods over static ones. | ||
public void postMessageToQueue(String queuePath, String serializedIngestionBlobInfo) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private?
AzureStorageHelper.postMessageToQueue(queuePath,serializedIngestionBlobInfo); | ||
} | ||
|
||
public CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private?
return AzureStorageHelper.uploadLocalFileToBlob(filePath, blobName, storageUri); | ||
} | ||
|
||
public CloudBlockBlob uploadStreamToBlob(InputStream inputStream, String blobName, String storageUri, boolean compress) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private
ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java
Show resolved
Hide resolved
@@ -22,7 +24,7 @@ public IngestionBlobInfo(String blobPath, String databaseName, String tableName) | |||
this.databaseName = databaseName; | |||
this.tableName = tableName; | |||
id = UUID.randomUUID(); | |||
retainBlobOnSuccess = false; | |||
retainBlobOnSuccess = true; //false doesn't seem to work |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thing is that the intermediate blobs are stored at DM storage accounts, therefore will be deleted after 1 week.
2261969
to
a542f03
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks Great! Thanks Tamir!
Refactor the ingest client implementation to use an interface and factory.
(ingest from stream merged here but not changed since its own PR)
Closes #16