Skip to content

Commit

Permalink
Add separate billing project parameter
Browse files Browse the repository at this point in the history
This CL adds the ability to specify a billing project as a URL
parameter in the connection string used to create a BQConnection
instance. Previously, we used the project defined in the URL's path
component for both the billing and default projects. Recall that the
default project and dataset are used to disambiguate unqualified
tables referenced in the query being processed.

Now, when the billing project is passed as a URL parameter, we only
use the path component for the default project. If no billing project
parameter is found, we revert back to our former behavior, using the
path component to define both projects.
  • Loading branch information
mmotoyama committed Nov 8, 2024
1 parent 3518182 commit 537dc93
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 43 deletions.
35 changes: 30 additions & 5 deletions src/main/java/net/starschema/clouddb/jdbc/BQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
* @author Gunics Balázs, Horváth Attila
*/
public class BQConnection implements Connection {
public static final String PROJECT_ID_PARAMETER_NAME = "quotaProjectId";

/** Variable to store auto commit mode */
private boolean autoCommitEnabled = false;

Expand All @@ -50,10 +52,14 @@ public class BQConnection implements Connection {
/** The bigquery client to access the service. */
private Bigquery bigquery = null;

/** The default dataset to configure on queries processed by this connection. */
private String dataset = null;

/** The ProjectId for the connection */
private String projectId = null;
/** The default ProjectId to configure on queries processed by this connection. */
private final String defaultProjectId;

/** The ProjectId to use for billing. */
private final String projectId;

/** Boolean to determine if the Connection is closed */
private boolean isclosed = false;
Expand Down Expand Up @@ -151,10 +157,10 @@ public BQConnection(String url, Properties loginProp, HttpTransport httpTranspor
Matcher matchData = projectAndDatasetMatcher.matcher(pathParams);

if (matchData.find()) {
this.projectId = CatalogName.toProjectId(matchData.group(1));
this.defaultProjectId = CatalogName.toProjectId(matchData.group(1));
this.dataset = matchData.group(2);
} else {
this.projectId = CatalogName.toProjectId(pathParams);
this.defaultProjectId = CatalogName.toProjectId(pathParams);
}
} catch (UnsupportedEncodingException e1) {
throw new BQSQLException(e1);
Expand All @@ -179,6 +185,11 @@ public BQConnection(String url, Properties loginProp, HttpTransport httpTranspor
throw new BQSQLException(e2);
}

// extract the 'quotaProjectId' parameter. if absent, then use the defaultProjectId for billing.
this.projectId =
caseInsensitiveProps.getProperty(
PROJECT_ID_PARAMETER_NAME.toLowerCase(), this.defaultProjectId);

String userId = caseInsensitiveProps.getProperty("user");
String userKey = caseInsensitiveProps.getProperty("password");
String userPath = caseInsensitiveProps.getProperty("path");
Expand Down Expand Up @@ -419,6 +430,13 @@ public String getDataSet() {
return this.dataset;
}

/**
* Returns the default dataset that should be configured on queries processed by this connection.
*/
public String getDefaultDataSet() {
return this.dataset;
}

/**
*
*
Expand Down Expand Up @@ -694,11 +712,18 @@ public DatabaseMetaData getMetaData() throws SQLException {
return metadata;
}

/** Getter method for projectId */
/** Getter method for the projectId to use for billing. */
public String getProjectId() {
return projectId;
}

/**
* Returns the default project that should be configured on queries processed by this connection.
*/
public String getDefaultProjectId() {
return this.defaultProjectId;
}

/**
*
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ public ResultSet executeQuery() throws SQLException {
this.connection.getBigquery(),
this.projectId,
this.RunnableStatement,
this.connection.getDataSet(),
this.connection.getDefaultDataSet(),
this.connection.getDefaultProjectId(),
this.connection.getUseLegacySql(),
this.connection.getMaxBillingBytes());
this.logger.info("Executing Query: " + this.RunnableStatement);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/net/starschema/clouddb/jdbc/BQStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ protected QueryResponse runSyncQuery(String querySql, boolean unlimitedBillingBy
this.connection.getBigquery(),
projectId,
querySql,
connection.getDataSet(),
connection.getDefaultDataSet(),
connection.getDefaultProjectId(),
this.connection.getUseLegacySql(),
!unlimitedBillingBytes ? this.connection.getMaxBillingBytes() : null,
getSyncTimeoutMillis(), // we need this to respond fast enough to avoid any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ private int executeDML(String sql) throws SQLException {
this.connection.getBigquery(),
projectId,
sql,
connection.getDataSet(),
connection.getDefaultDataSet(),
connection.getDefaultProjectId(),
this.connection.getUseLegacySql(),
this.connection.getMaxBillingBytes(),
(long) querytimeout * 1000,
Expand Down Expand Up @@ -322,7 +323,8 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
this.connection.getBigquery(),
projectId,
querySql,
connection.getDataSet(),
connection.getDefaultDataSet(),
connection.getDefaultProjectId(),
this.connection.getUseLegacySql(),
billingBytes,
(long) querytimeout * 1000,
Expand Down
103 changes: 83 additions & 20 deletions src/main/java/net/starschema/clouddb/jdbc/BQSupportFuncts.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,44 @@ public class BQSupportFuncts {
*/
public static String constructUrlFromPropertiesFile(
Properties properties, boolean full, String dataset) throws UnsupportedEncodingException {
String projectId = properties.getProperty("projectid");
String projectId = properties.getProperty("projectid"); // Represent the billing project.
logger.debug("projectId is: " + projectId);
String User = properties.getProperty("user");
String Password = properties.getProperty("password");
String path = properties.getProperty("path");
dataset = dataset == null ? properties.getProperty("dataset") : dataset;
String defaultProjectId = properties.getProperty("defaultprojectid");

if (projectId == null && defaultProjectId == null) {
return null;
} else if (projectId == null) {
projectId = defaultProjectId;
} else if (defaultProjectId == null) {
defaultProjectId = projectId;
}

String forreturn = "";
// Set to '?' for the first param and '&' for subsequent params.
String paramSep = "?";

// When the billing and default projects are identical, then we do not add the 'quotaProjectId'
// parameter to the URL, and we instead set the URL's path component to the shared project id.
// When they differ, we define the default project using the path component, and we configure
// the billing project by adding the 'quotaProjectId' parameter to the URL.
String pathComponent =
(Objects.equals(projectId, defaultProjectId)) ? projectId : defaultProjectId;

if (properties.getProperty("type").equals("installed")) {
if (User != null && Password != null && projectId != null) {
forreturn = BQDriver.getURLPrefix() + URLEncoder.encode(projectId, "UTF-8");
if (User != null && Password != null) {
forreturn = BQDriver.getURLPrefix() + URLEncoder.encode(pathComponent, "UTF-8");
} else {
return null;
}
} else if (properties.getProperty("type").equals("service")) {
if (User != null && Password != null && projectId != null) {
if (User != null && Password != null) {
forreturn =
BQDriver.getURLPrefix()
+ URLEncoder.encode(projectId, "UTF-8")
+ URLEncoder.encode(pathComponent, "UTF-8")
+ (dataset != null && full ? "/" + URLEncoder.encode(dataset, "UTF-8") : "")
+ "?withServiceAccount=true";
paramSep = "&";
Expand All @@ -108,10 +124,10 @@ public static String constructUrlFromPropertiesFile(
}
} else if (properties.getProperty("type").equals("oauth")) {
String accessToken = properties.getProperty("oauthaccesstoken");
if (accessToken != null && projectId != null) {
if (accessToken != null) {
forreturn =
BQDriver.getURLPrefix()
+ URLEncoder.encode(projectId, "UTF-8")
+ URLEncoder.encode(pathComponent, "UTF-8")
+ (dataset != null && full ? "/" + URLEncoder.encode(dataset, "UTF-8") : "");
if (full) {
forreturn += "?oAuthAccessToken=" + URLEncoder.encode(accessToken, "UTF-8");
Expand All @@ -123,14 +139,24 @@ public static String constructUrlFromPropertiesFile(
} else if (properties.getProperty("type").equals("applicationDefault")) {
forreturn =
BQDriver.getURLPrefix()
+ URLEncoder.encode(projectId, "UTF-8")
+ URLEncoder.encode(pathComponent, "UTF-8")
+ (dataset != null && full ? "/" + URLEncoder.encode(dataset, "UTF-8") : "")
+ "?withApplicationDefaultCredentials=true";
paramSep = "&";
} else {
return null;
}

// Specify the 'quotaProjectId' URL parameter when the billing and default projects differ.
if (!Objects.equals(projectId, defaultProjectId)) {
forreturn +=
paramSep
+ BQConnection.PROJECT_ID_PARAMETER_NAME
+ "="
+ URLEncoder.encode(projectId, "UTF-8");
paramSep = "&";
}

String useLegacySql = properties.getProperty("useLegacySql");
if (useLegacySql != null) {
forreturn += paramSep + "useLegacySql=" + useLegacySql;
Expand Down Expand Up @@ -621,10 +647,11 @@ public static Properties readFromPropFile(String filePath) throws IOException {
* Run a query using the synchronous jobs.query() BigQuery endpoint.
*
* @param bigquery The BigQuery API wrapper
* @param projectId
* @param projectId The ProjectId to use for billing
* @param querySql The SQL to execute
* @param dataSet default dataset, can be null
* @param useLegacySql
* @param defaultDataSet default dataset, can be null
* @param defaultProjectId default projectId, only used when the default dataset is non-null
* @param useLegacySql Use the legacy SQL dialect when true
* @param maxBillingBytes Maximum bytes that the API will allow to bill
* @param queryTimeoutMs The timeout at which point the API will return with an incomplete result
* NOTE: this does _not_ mean the query fails, just we have to get the results async
Expand All @@ -639,7 +666,8 @@ static QueryResponse runSyncQuery(
Bigquery bigquery,
String projectId,
String querySql,
String dataSet,
String defaultDataSet,
String defaultProjectId,
Boolean useLegacySql,
Long maxBillingBytes,
Long queryTimeoutMs,
Expand All @@ -652,7 +680,8 @@ static QueryResponse runSyncQuery(
bigquery,
projectId,
querySql,
dataSet,
defaultDataSet,
defaultProjectId,
useLegacySql,
maxBillingBytes,
queryTimeoutMs,
Expand All @@ -671,7 +700,8 @@ static Bigquery.Jobs.Query getSyncQuery(
Bigquery bigquery,
String projectId,
String querySql,
String dataSet,
String defaultDataSet,
String defaultProjectId,
Boolean useLegacySql,
Long maxBillingBytes,
Long queryTimeoutMs,
Expand All @@ -691,8 +721,9 @@ static Bigquery.Jobs.Query getSyncQuery(
if (jobCreationMode != null) {
qr = qr.setJobCreationMode(jobCreationMode.name());
}
if (dataSet != null) {
qr.setDefaultDataset(new DatasetReference().setDatasetId(dataSet).setProjectId(projectId));
if (defaultDataSet != null) {
qr.setDefaultDataset(
new DatasetReference().setDatasetId(defaultDataSet).setProjectId(defaultProjectId));
}
if (maxResults != null) {
qr.setMaxResults(maxResults);
Expand All @@ -701,12 +732,43 @@ static Bigquery.Jobs.Query getSyncQuery(
return bigquery.jobs().query(projectId, qr);
}

/**
* Starts a new query in async mode.
*
* <p>This method exists to maintain backwards compatibility with the prior releases of bqjdbc.
*
* @param bigquery The bigquery instance, which is authorized
* @param projectId The project ID to use for the billing and default projects
* @param querySql The sql query which we want to run
* @param dataset The default dataset, can be null
* @param useLegacySql Use the legacy SQL dialect when true
* @param maxBillingBytes Maximum bytes that the API will allow to bill
* @return A JobReference which we'll use to poll the bigquery, for its state, then for its mined
* data.
* @throws IOException
* <p>if the request for initializing or executing job fails
*/
public static Job startQuery(
Bigquery bigquery,
String projectId,
String querySql,
String dataset,
Boolean useLegacySql,
Long maxBillingBytes)
throws IOException {
return startQuery(
bigquery, projectId, querySql, dataset, projectId, useLegacySql, maxBillingBytes);
}

/**
* Starts a new query in async mode.
*
* @param bigquery The bigquery instance, which is authorized
* @param projectId The project's ID
* @param projectId The project ID to use for billing
* @param querySql The sql query which we want to run
* @param defaultDataSet The default dataset, can be null
* @param defaultProjectId The default project ID, only used when the default dataset is non-null
* @param useLegacySql Use the legacy SQL dialect when true
* @return A JobReference which we'll use to poll the bigquery, for its state, then for its mined
* data.
* @throws IOException
Expand All @@ -716,7 +778,8 @@ public static Job startQuery(
Bigquery bigquery,
String projectId,
String querySql,
String dataSet,
String defaultDataSet,
String defaultProjectId,
Boolean useLegacySql,
Long maxBillingBytes)
throws IOException {
Expand All @@ -730,9 +793,9 @@ public static Job startQuery(
JobReference jobReference = new JobReference().setProjectId(projectId).setJobId(jobId);
job.setJobReference(jobReference);

if (dataSet != null)
if (defaultDataSet != null)
queryConfig.setDefaultDataset(
new DatasetReference().setDatasetId(dataSet).setProjectId(projectId));
new DatasetReference().setDatasetId(defaultDataSet).setProjectId(defaultProjectId));

job.setConfiguration(config);
queryConfig.setQuery(querySql);
Expand Down
Loading

0 comments on commit 537dc93

Please sign in to comment.