Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query rewriter plugin - design only #24509

Open
wants to merge 1 commit into
base: ibm
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ public final class SystemSessionProperties
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";
public static final String NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION = "native_enforce_join_build_input_partition";
public static final String NATIVE_EXECUTION_SCALE_WRITER_THREADS_ENABLED = "native_execution_scale_writer_threads_enabled";
public static final String IS_QUERY_REWRITER_PLUGIN_ENABLED = "is_query_rewriter_plugin_enabled";
public static final String IS_QUERY_REWRITER_PLUGIN_SUCCEEDED = "is_query_rewriter_plugin_succeeded";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1858,7 +1860,27 @@ public SystemSessionProperties(
EXPRESSION_OPTIMIZER_NAME,
"Configure which expression optimizer to use",
featuresConfig.getExpressionOptimizerName(),
false));
false),
booleanProperty(
IS_QUERY_REWRITER_PLUGIN_ENABLED,
"Use queries rewriter plugin",
false,
true),
booleanProperty(
IS_QUERY_REWRITER_PLUGIN_SUCCEEDED,
"Query rewrite success",
false,
true));
}

public static boolean isQueryRewriterPluginSucceeded(Session session)
{
return session.getSystemProperty(IS_QUERY_REWRITER_PLUGIN_SUCCEEDED, Boolean.class);
}

public static boolean isQueryRewriterPluginEnabled(Session session)
{
return session.getSystemProperty(IS_QUERY_REWRITER_PLUGIN_ENABLED, Boolean.class);
}

public static boolean isSpoolingOutputBufferEnabled(Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.analyzer.PreparedQuery;
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.execution.QueryIdGenerator;
Expand All @@ -37,6 +38,7 @@
import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.facebook.presto.spi.resourceGroups.SelectionCriteria;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.util.concurrent.AbstractFuture;
Expand Down Expand Up @@ -92,9 +94,13 @@ public class DispatchManager

private final QueryPreparerProviderManager queryPreparerProviderManager;

private final AnalyzerProviderManager analyzerProviderManager;

private final QueryRewriterManager queryRewriterManager;

/**
* Dispatch Manager is used for the pre-queuing part of queries prior to the query execution phase.
*
* <p>
* Dispatch Manager object is instantiated when the presto server is launched by server bootstrap time. It is a critical component in resource management section of the query.
*
* @param queryIdGenerator query ID generator for generating a new query ID when a query is created
Expand Down Expand Up @@ -126,7 +132,9 @@ public DispatchManager(
QueryManagerConfig queryManagerConfig,
DispatchExecutor dispatchExecutor,
ClusterStatusSender clusterStatusSender,
Optional<ClusterQueryTrackerService> clusterQueryTrackerService)
Optional<ClusterQueryTrackerService> clusterQueryTrackerService,
AnalyzerProviderManager analyzerProviderManager,
QueryRewriterManager queryRewriterManager)
{
this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null");
this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null");
Expand All @@ -147,6 +155,10 @@ public DispatchManager(
this.clusterStatusSender = requireNonNull(clusterStatusSender, "clusterStatusSender is null");

this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor(), clusterQueryTrackerService);

this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");

this.queryRewriterManager = requireNonNull(queryRewriterManager, "queryRewriterManager is null");
}

/**
Expand Down Expand Up @@ -181,7 +193,7 @@ public QueryManagerStats getStats()

/**
* Create a query id
*
* <p>
* This method is called when a {@code Query} object is created
*
* @return {@link QueryId}
Expand Down Expand Up @@ -279,6 +291,17 @@ private <C> void createQueryInternal(QueryId queryId, String slug, int retryCoun
preparedQuery = queryPreparerProvider.getQueryPreparer().prepareQuery(analyzerOptions, query, sessionBuilder.getPreparedStatements(), sessionBuilder.getWarningCollector());
query = preparedQuery.getFormattedQuery().orElse(query);

// Rewrite the query
if (SystemSessionProperties.isQueryRewriterPluginEnabled(session)) {
QueryAndSessionProperties queryAndSessionProperties = queryRewriterManager.rewriteQueryAndSession(query, session, analyzerOptions,
analyzerProviderManager.getAnalyzerProvider(getAnalyzerType(session)), queryPreparerProvider);
if (queryAndSessionProperties.getPreparedQuery().isPresent()) {
queryAndSessionProperties.getSystemSessionProperties().forEach(sessionBuilder::setSystemProperty);
preparedQuery = queryAndSessionProperties.getPreparedQuery().get();
query = queryAndSessionProperties.getQuery();
}
}

// select resource group
Optional<QueryType> queryType = preparedQuery.getQueryType();
sessionBuilder.setQueryType(queryType);
Expand Down Expand Up @@ -377,7 +400,6 @@ public ListenableFuture<?> waitForDispatched(QueryId queryId)

/**
* Return a list of {@link BasicQueryInfo}.
*
*/
public List<BasicQueryInfo> getQueries()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.dispatcher;

import com.facebook.presto.common.analyzer.PreparedQuery;

import java.util.Map;
import java.util.Optional;

public class QueryAndSessionProperties
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done differently, the reason is we have SessionBuilder at dispatchManager, which can directly set the required properties.

{
private final String query;
private final Map<String, String> systemSessionProperties;
private final Optional<PreparedQuery> preparedQuery;

public QueryAndSessionProperties(String query, Map<String, String> systemSessionProperties, Optional<PreparedQuery> preparedQuery)
{
this.query = query;
this.systemSessionProperties = systemSessionProperties;
this.preparedQuery = preparedQuery;
}

public String getQuery()
{
return query;
}

public Map<String, String> getSystemSessionProperties()
{
return systemSessionProperties;
}

public Optional<PreparedQuery> getPreparedQuery()
{
return preparedQuery;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.dispatcher;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.common.analyzer.PreparedQuery;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.server.SessionContext;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
import com.facebook.presto.spi.analyzer.AnalyzerProvider;
import com.facebook.presto.spi.analyzer.QueryPreparerProvider;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.rewriter.QueryRewriterInput;
import com.facebook.presto.spi.rewriter.QueryRewriterOutput;
import com.facebook.presto.spi.rewriter.QueryRewriterProvider;
import com.facebook.presto.spi.rewriter.QueryRewriterProviderFactory;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.SystemSessionProperties.IS_QUERY_REWRITER_PLUGIN_ENABLED;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
* To provide a query rewriter plugin, i.e. a plugin that inputs a set of session properties and query and returns the
* updated session properties and rewritten query.
* 1) Provide implementation for QueryRewriterProviderFactory
* 2) Provide implementation for QueryRewriterProvider
* 3) Implement com.facebook.presto.spi.Plugin and provide implementation for Iterable<QueryRewriterProviderFactory> getQueryRewriterProviderFactory()
* 4) Finally provide implementation of QueryRewriter, which does actual query rewrite.
* For example:
* {@link presto-tests/com.facebook.presto.plugin.rewriter.UpperCasingQueryRewriterPlugin}
*/
public class QueryRewriterManager
{
private static final Logger log = Logger.get(QueryRewriterManager.class);

private final AtomicReference<Optional<QueryRewriterProviderFactory>> providerFactory = new AtomicReference<>(Optional.empty());
private final AtomicReference<Optional<QueryRewriterProvider>> provider = new AtomicReference<>(Optional.empty());
private final EventListenerManager eventListenerManager;

@Inject
public QueryRewriterManager(EventListenerManager eventListenerManager)
{
this.eventListenerManager = requireNonNull(eventListenerManager, "eventListenerManager is null");
}

public static Boolean isQueryRewriterPluginEnabled(SessionContext sessionContext)
{
return Boolean.parseBoolean(sessionContext.getSystemProperties().getOrDefault(IS_QUERY_REWRITER_PLUGIN_ENABLED, "false"));
}

public void addQueryRewriterProviderFactory(QueryRewriterProviderFactory queryRewriterProviderFactory)
{
requireNonNull(queryRewriterProviderFactory, "queryRewriterProviderFactory is null");
checkState(providerFactory.compareAndSet(Optional.empty(), Optional.of(queryRewriterProviderFactory)),
format("A query rewriter factory is already registered with name %s", queryRewriterProviderFactory.getName()));
}

public void loadQueryRewriterProvider()
{
List<EventListener> configuredEventListener = ImmutableList.of();
if (eventListenerManager.getConfiguredEventListener().isPresent()) {
configuredEventListener = ImmutableList.of(eventListenerManager.getConfiguredEventListener().get());
}
Optional<QueryRewriterProviderFactory> queryRewriterProviderFactory = providerFactory.get();
if (queryRewriterProviderFactory.isPresent()) {
QueryRewriterProvider queryRewriterProvider = queryRewriterProviderFactory.get().create(configuredEventListener);
checkState(provider.compareAndSet(Optional.empty(), Optional.of(queryRewriterProvider)),
format("A query rewriter provider is already registered %s", queryRewriterProvider));
}
}

public Optional<QueryRewriterProvider> getQueryRewriterProvider()
{
return provider.get();
}

public QueryAndSessionProperties rewriteQueryAndSession(
String query,
Session session,
AnalyzerOptions analyzerOptions,
AnalyzerProvider analyzerProvider,
QueryPreparerProvider queryPreparerProvider)
{
QueryId queryId = session.getQueryId();
QueryAndSessionProperties rewrittenQueryAndSessionProperties = new QueryAndSessionProperties(query, session.getSystemProperties(), Optional.empty());
if (getQueryRewriterProvider().isPresent()) {
QueryRewriterInput queryRewriterInput = new QueryRewriterInput.Builder()
.setQuery(query)
.setQueryId(queryId.getId())
.setCatalog(session.getCatalog())
.setSchema(session.getSchema())
.setPreparedStatements(session.getPreparedStatements())
.setWarningCollector(session.getWarningCollector())
.setSessionProperties(session.getSystemProperties())
.setAnalyzerOptions(analyzerOptions)
.setAnalyzerProvider(analyzerProvider)
.setQueryPreparer(queryPreparerProvider.getQueryPreparer())
.build();
try {
QueryRewriterProvider provider = getQueryRewriterProvider().get();
QueryRewriterOutput queryRewriterOutput = provider.getQueryRewriter().rewriteSQL(queryRewriterInput);
String rewrittenQuery = queryRewriterOutput.getRewrittenQuery();
// Checking if the rewritten query is parseable.
PreparedQuery preparedQuery = queryPreparerProvider.getQueryPreparer()
.prepareQuery(analyzerOptions, rewrittenQuery, session.getPreparedStatements(), session.getWarningCollector());
// apply updated session properties.
Map<String, String> systemPropertyOverrides = queryRewriterOutput.getSessionProperties();
rewrittenQueryAndSessionProperties = new QueryAndSessionProperties(rewrittenQuery, systemPropertyOverrides, Optional.of(preparedQuery));
log.info("createQueryInternal :: QueryId [%s] - Replacing with optimized query", queryId.getId());
}
catch (Exception e) {
// TODO : Implement a better way to test if rewritten query is parseable.
log.warn(format("rewritten query for query with id %s is discarded", session.getQueryId()), e);
}
}
return rewrittenQueryAndSessionProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
}
}

public Optional<EventListener> getConfiguredEventListener()
{
return configuredEventListener.get();
}

public void loadConfiguredEventListener()
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsManager;
import com.facebook.presto.dispatcher.QueryPrerequisitesManager;
import com.facebook.presto.dispatcher.QueryRewriterManager;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.metadata.Metadata;
Expand All @@ -42,6 +43,7 @@
import com.facebook.presto.spi.plan.PlanCheckerProviderFactory;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
import com.facebook.presto.spi.rewriter.QueryRewriterProviderFactory;
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
import com.facebook.presto.spi.security.PrestoAuthenticatorFactory;
import com.facebook.presto.spi.security.SystemAccessControlFactory;
Expand Down Expand Up @@ -144,6 +146,7 @@ public class PluginManager
private final ClientRequestFilterManager clientRequestFilterManager;
private final PlanCheckerProviderManager planCheckerProviderManager;
private final ExpressionOptimizerManager expressionOptimizerManager;
private final QueryRewriterManager queryRewriterManager;

@Inject
public PluginManager(
Expand All @@ -169,7 +172,8 @@ public PluginManager(
NodeStatusNotificationManager nodeStatusNotificationManager,
ClientRequestFilterManager clientRequestFilterManager,
PlanCheckerProviderManager planCheckerProviderManager,
ExpressionOptimizerManager expressionOptimizerManager)
ExpressionOptimizerManager expressionOptimizerManager,
QueryRewriterManager queryRewriterManager)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -205,6 +209,7 @@ public PluginManager(
this.clientRequestFilterManager = requireNonNull(clientRequestFilterManager, "clientRequestFilterManager is null");
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
this.expressionOptimizerManager = requireNonNull(expressionOptimizerManager, "expressionManager is null");
this.queryRewriterManager = requireNonNull(queryRewriterManager, "queryRewriterManager is null");
}

public void loadPlugins()
Expand Down Expand Up @@ -379,6 +384,11 @@ public void installPlugin(Plugin plugin)
log.info("Registering client request filter factory");
clientRequestFilterManager.registerClientRequestFilterFactory(clientRequestFilterFactory);
}

for (QueryRewriterProviderFactory queryRewriterProviderFactory : plugin.getQueryRewriterProviderFactories()) {
log.info("Registering query rewriter provider factory %s", queryRewriterProviderFactory.getName());
queryRewriterManager.addQueryRewriterProviderFactory(queryRewriterProviderFactory);
}
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.facebook.presto.ClientRequestFilterModule;
import com.facebook.presto.dispatcher.QueryPrerequisitesManager;
import com.facebook.presto.dispatcher.QueryPrerequisitesManagerModule;
import com.facebook.presto.dispatcher.QueryRewriterManager;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.eventlistener.EventListenerModule;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
Expand Down Expand Up @@ -190,6 +191,7 @@ public void run()
injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider();
injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification();
injector.getInstance(SessionPropertyManager.class).loadSessionPropertyProviders();
injector.getInstance(QueryRewriterManager.class).loadQueryRewriterProvider();
PlanCheckerProviderManager planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class);
InternalNodeManager nodeManager = injector.getInstance(DiscoveryNodeManager.class);
NodeInfo nodeInfo = injector.getInstance(NodeInfo.class);
Expand Down
Loading
Loading