Skip to content

Commit

Permalink
Query rewriter plugin - design only
Browse files Browse the repository at this point in the history
  • Loading branch information
ScrapCodes committed Feb 6, 2025
1 parent a7a7f80 commit 2cd1b94
Show file tree
Hide file tree
Showing 20 changed files with 991 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ public final class SystemSessionProperties
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";
public static final String NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION = "native_enforce_join_build_input_partition";
public static final String NATIVE_EXECUTION_SCALE_WRITER_THREADS_ENABLED = "native_execution_scale_writer_threads_enabled";
public static final String IS_QUERY_REWRITER_PLUGIN_ENABLED = "is_query_rewriter_plugin_enabled";
public static final String IS_QUERY_REWRITER_PLUGIN_SUCCEEDED = "is_query_rewriter_plugin_succeeded";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1858,7 +1860,27 @@ public SystemSessionProperties(
EXPRESSION_OPTIMIZER_NAME,
"Configure which expression optimizer to use",
featuresConfig.getExpressionOptimizerName(),
false));
false),
booleanProperty(
IS_QUERY_REWRITER_PLUGIN_ENABLED,
"Use queries rewriter plugin",
false,
true),
booleanProperty(
IS_QUERY_REWRITER_PLUGIN_SUCCEEDED,
"Query rewrite success",
false,
true));
}

public static boolean isQueryRewriterPluginSucceeded(Session session)
{
return session.getSystemProperty(IS_QUERY_REWRITER_PLUGIN_SUCCEEDED, Boolean.class);
}

public static boolean isQueryRewriterPluginEnabled(Session session)
{
return session.getSystemProperty(IS_QUERY_REWRITER_PLUGIN_ENABLED, Boolean.class);
}

public static boolean isSpoolingOutputBufferEnabled(Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.analyzer.PreparedQuery;
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.execution.QueryIdGenerator;
Expand All @@ -37,6 +38,7 @@
import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.facebook.presto.spi.resourceGroups.SelectionCriteria;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.util.concurrent.AbstractFuture;
Expand Down Expand Up @@ -92,9 +94,13 @@ public class DispatchManager

private final QueryPreparerProviderManager queryPreparerProviderManager;

private final AnalyzerProviderManager analyzerProviderManager;

private final QueryRewriterManager queryRewriterManager;

/**
* Dispatch Manager is used for the pre-queuing part of queries prior to the query execution phase.
*
* <p>
* Dispatch Manager object is instantiated when the presto server is launched by server bootstrap time. It is a critical component in resource management section of the query.
*
* @param queryIdGenerator query ID generator for generating a new query ID when a query is created
Expand Down Expand Up @@ -126,7 +132,9 @@ public DispatchManager(
QueryManagerConfig queryManagerConfig,
DispatchExecutor dispatchExecutor,
ClusterStatusSender clusterStatusSender,
Optional<ClusterQueryTrackerService> clusterQueryTrackerService)
Optional<ClusterQueryTrackerService> clusterQueryTrackerService,
AnalyzerProviderManager analyzerProviderManager,
QueryRewriterManager queryRewriterManager)
{
this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null");
this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null");
Expand All @@ -147,6 +155,10 @@ public DispatchManager(
this.clusterStatusSender = requireNonNull(clusterStatusSender, "clusterStatusSender is null");

this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor(), clusterQueryTrackerService);

this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");

this.queryRewriterManager = requireNonNull(queryRewriterManager, "queryRewriterManager is null");
}

/**
Expand Down Expand Up @@ -181,7 +193,7 @@ public QueryManagerStats getStats()

/**
* Create a query id
*
* <p>
* This method is called when a {@code Query} object is created
*
* @return {@link QueryId}
Expand Down Expand Up @@ -279,6 +291,17 @@ private <C> void createQueryInternal(QueryId queryId, String slug, int retryCoun
preparedQuery = queryPreparerProvider.getQueryPreparer().prepareQuery(analyzerOptions, query, sessionBuilder.getPreparedStatements(), sessionBuilder.getWarningCollector());
query = preparedQuery.getFormattedQuery().orElse(query);

// Rewrite the query
if (SystemSessionProperties.isQueryRewriterPluginEnabled(session)) {
QueryAndSessionProperties queryAndSessionProperties = queryRewriterManager.rewriteQueryAndSession(query, session, analyzerOptions,
analyzerProviderManager.getAnalyzerProvider(getAnalyzerType(session)), queryPreparerProvider);
if (queryAndSessionProperties.getPreparedQuery().isPresent()) {
queryAndSessionProperties.getSystemSessionProperties().forEach(sessionBuilder::setSystemProperty);
preparedQuery = queryAndSessionProperties.getPreparedQuery().get();
query = queryAndSessionProperties.getQuery();
}
}

// select resource group
Optional<QueryType> queryType = preparedQuery.getQueryType();
sessionBuilder.setQueryType(queryType);
Expand Down Expand Up @@ -377,7 +400,6 @@ public ListenableFuture<?> waitForDispatched(QueryId queryId)

/**
* Return a list of {@link BasicQueryInfo}.
*
*/
public List<BasicQueryInfo> getQueries()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.dispatcher;

import com.facebook.presto.common.analyzer.PreparedQuery;

import java.util.Map;
import java.util.Optional;

public class QueryAndSessionProperties
{
private final String query;
private final Map<String, String> systemSessionProperties;
private final Optional<PreparedQuery> preparedQuery;

public QueryAndSessionProperties(String query, Map<String, String> systemSessionProperties, Optional<PreparedQuery> preparedQuery)
{
this.query = query;
this.systemSessionProperties = systemSessionProperties;
this.preparedQuery = preparedQuery;
}

public String getQuery()
{
return query;
}

public Map<String, String> getSystemSessionProperties()
{
return systemSessionProperties;
}

public Optional<PreparedQuery> getPreparedQuery()
{
return preparedQuery;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.dispatcher;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.common.analyzer.PreparedQuery;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.server.SessionContext;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
import com.facebook.presto.spi.analyzer.AnalyzerProvider;
import com.facebook.presto.spi.analyzer.QueryPreparerProvider;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.rewriter.QueryRewriterInput;
import com.facebook.presto.spi.rewriter.QueryRewriterOutput;
import com.facebook.presto.spi.rewriter.QueryRewriterProvider;
import com.facebook.presto.spi.rewriter.QueryRewriterProviderFactory;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.SystemSessionProperties.IS_QUERY_REWRITER_PLUGIN_ENABLED;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
* To provide a query rewriter plugin, i.e. a plugin that inputs a set of session properties and query and returns the
* updated session properties and rewritten query.
* 1) Provide implementation for QueryRewriterProviderFactory
* 2) Provide implementation for QueryRewriterProvider
* 3) Implement com.facebook.presto.spi.Plugin and provide implementation for Iterable<QueryRewriterProviderFactory> getQueryRewriterProviderFactory()
* 4) Finally provide implementation of QueryRewriter, which does actual query rewrite.
* For example:
* {@link presto-tests/com.facebook.presto.plugin.rewriter.UpperCasingQueryRewriterPlugin}
*/
public class QueryRewriterManager
{
private static final Logger log = Logger.get(QueryRewriterManager.class);

private final AtomicReference<Optional<QueryRewriterProviderFactory>> providerFactory = new AtomicReference<>(Optional.empty());
private final AtomicReference<Optional<QueryRewriterProvider>> provider = new AtomicReference<>(Optional.empty());
private final EventListenerManager eventListenerManager;

@Inject
public QueryRewriterManager(EventListenerManager eventListenerManager)
{
this.eventListenerManager = requireNonNull(eventListenerManager, "eventListenerManager is null");
}

public static Boolean isQueryRewriterPluginEnabled(SessionContext sessionContext)
{
return Boolean.parseBoolean(sessionContext.getSystemProperties().getOrDefault(IS_QUERY_REWRITER_PLUGIN_ENABLED, "false"));
}

public void addQueryRewriterProviderFactory(QueryRewriterProviderFactory queryRewriterProviderFactory)
{
requireNonNull(queryRewriterProviderFactory, "queryRewriterProviderFactory is null");
checkState(providerFactory.compareAndSet(Optional.empty(), Optional.of(queryRewriterProviderFactory)),
format("A query rewriter factory is already registered with name %s", queryRewriterProviderFactory.getName()));
}

public void loadQueryRewriterProvider()
{
List<EventListener> configuredEventListener = ImmutableList.of();
if (eventListenerManager.getConfiguredEventListener().isPresent()) {
configuredEventListener = ImmutableList.of(eventListenerManager.getConfiguredEventListener().get());
}
Optional<QueryRewriterProviderFactory> queryRewriterProviderFactory = providerFactory.get();
if (queryRewriterProviderFactory.isPresent()) {
QueryRewriterProvider queryRewriterProvider = queryRewriterProviderFactory.get().create(configuredEventListener);
checkState(provider.compareAndSet(Optional.empty(), Optional.of(queryRewriterProvider)),
format("A query rewriter provider is already registered %s", queryRewriterProvider));
}
}

public Optional<QueryRewriterProvider> getQueryRewriterProvider()
{
return provider.get();
}

public QueryAndSessionProperties rewriteQueryAndSession(
String query,
Session session,
AnalyzerOptions analyzerOptions,
AnalyzerProvider analyzerProvider,
QueryPreparerProvider queryPreparerProvider)
{
QueryId queryId = session.getQueryId();
QueryAndSessionProperties rewrittenQueryAndSessionProperties = new QueryAndSessionProperties(query, session.getSystemProperties(), Optional.empty());
if (getQueryRewriterProvider().isPresent()) {
QueryRewriterInput queryRewriterInput = new QueryRewriterInput.Builder()
.setQuery(query)
.setQueryId(queryId.getId())
.setCatalog(session.getCatalog())
.setSchema(session.getSchema())
.setPreparedStatements(session.getPreparedStatements())
.setWarningCollector(session.getWarningCollector())
.setSessionProperties(session.getSystemProperties())
.setAnalyzerOptions(analyzerOptions)
.setAnalyzerProvider(analyzerProvider)
.setQueryPreparer(queryPreparerProvider.getQueryPreparer())
.build();
try {
QueryRewriterProvider provider = getQueryRewriterProvider().get();
QueryRewriterOutput queryRewriterOutput = provider.getQueryRewriter().rewriteSQL(queryRewriterInput);
String rewrittenQuery = queryRewriterOutput.getRewrittenQuery();
// Checking if the rewritten query is parseable.
PreparedQuery preparedQuery = queryPreparerProvider.getQueryPreparer()
.prepareQuery(analyzerOptions, rewrittenQuery, session.getPreparedStatements(), session.getWarningCollector());
// apply updated session properties.
Map<String, String> systemPropertyOverrides = queryRewriterOutput.getSessionProperties();
rewrittenQueryAndSessionProperties = new QueryAndSessionProperties(rewrittenQuery, systemPropertyOverrides, Optional.of(preparedQuery));
log.info("createQueryInternal :: QueryId [%s] - Replacing with optimized query", queryId.getId());
}
catch (Exception e) {
// TODO : Implement a better way to test if rewritten query is parseable.
log.warn(format("rewritten query for query with id %s is discarded", session.getQueryId()), e);
}
}
return rewrittenQueryAndSessionProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
}
}

public Optional<EventListener> getConfiguredEventListener()
{
return configuredEventListener.get();
}

public void loadConfiguredEventListener()
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsManager;
import com.facebook.presto.dispatcher.QueryPrerequisitesManager;
import com.facebook.presto.dispatcher.QueryRewriterManager;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.metadata.Metadata;
Expand All @@ -42,6 +43,7 @@
import com.facebook.presto.spi.plan.PlanCheckerProviderFactory;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
import com.facebook.presto.spi.rewriter.QueryRewriterProviderFactory;
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
import com.facebook.presto.spi.security.PrestoAuthenticatorFactory;
import com.facebook.presto.spi.security.SystemAccessControlFactory;
Expand Down Expand Up @@ -144,6 +146,7 @@ public class PluginManager
private final ClientRequestFilterManager clientRequestFilterManager;
private final PlanCheckerProviderManager planCheckerProviderManager;
private final ExpressionOptimizerManager expressionOptimizerManager;
private final QueryRewriterManager queryRewriterManager;

@Inject
public PluginManager(
Expand All @@ -169,7 +172,8 @@ public PluginManager(
NodeStatusNotificationManager nodeStatusNotificationManager,
ClientRequestFilterManager clientRequestFilterManager,
PlanCheckerProviderManager planCheckerProviderManager,
ExpressionOptimizerManager expressionOptimizerManager)
ExpressionOptimizerManager expressionOptimizerManager,
QueryRewriterManager queryRewriterManager)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -205,6 +209,7 @@ public PluginManager(
this.clientRequestFilterManager = requireNonNull(clientRequestFilterManager, "clientRequestFilterManager is null");
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
this.expressionOptimizerManager = requireNonNull(expressionOptimizerManager, "expressionManager is null");
this.queryRewriterManager = requireNonNull(queryRewriterManager, "queryRewriterManager is null");
}

public void loadPlugins()
Expand Down Expand Up @@ -379,6 +384,11 @@ public void installPlugin(Plugin plugin)
log.info("Registering client request filter factory");
clientRequestFilterManager.registerClientRequestFilterFactory(clientRequestFilterFactory);
}

for (QueryRewriterProviderFactory queryRewriterProviderFactory : plugin.getQueryRewriterProviderFactories()) {
log.info("Registering query rewriter provider factory %s", queryRewriterProviderFactory.getName());
queryRewriterManager.addQueryRewriterProviderFactory(queryRewriterProviderFactory);
}
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.facebook.presto.ClientRequestFilterModule;
import com.facebook.presto.dispatcher.QueryPrerequisitesManager;
import com.facebook.presto.dispatcher.QueryPrerequisitesManagerModule;
import com.facebook.presto.dispatcher.QueryRewriterManager;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.eventlistener.EventListenerModule;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
Expand Down Expand Up @@ -190,6 +191,7 @@ public void run()
injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider();
injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification();
injector.getInstance(SessionPropertyManager.class).loadSessionPropertyProviders();
injector.getInstance(QueryRewriterManager.class).loadQueryRewriterProvider();
PlanCheckerProviderManager planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class);
InternalNodeManager nodeManager = injector.getInstance(DiscoveryNodeManager.class);
NodeInfo nodeInfo = injector.getInstance(NodeInfo.class);
Expand Down
Loading

0 comments on commit 2cd1b94

Please sign in to comment.