From 994a19891884c88c283f2bf9726335edd3a5e9df Mon Sep 17 00:00:00 2001 From: malakaganga Date: Sat, 6 May 2023 15:51:53 +0530 Subject: [PATCH] Fix Passthrough Threads getting stucked due to request message discard Adding new worker pool (RequestMessageDiscardWorkerPool) to discard request messages So ClientWorker (Passthrough Message Processor) threads won't get stucked while draining the input stream. Also remove setting REQUEST_DONE state in SourceContext when Error Scenario since when discarding the message we automatically set REQUEST_DONE in SourceHandler. Fixes: https://github.com/wso2/api-manager/issues/1792 --- .../transport/passthru/ClientWorker.java | 5 - .../passthru/MessageDiscardWorker.java | 97 +++++++++++++++++++ .../transport/passthru/TargetHandler.java | 10 +- .../passthru/config/BaseConfiguration.java | 20 ++++ .../config/PassThroughConfigPNames.java | 20 ++++ .../config/PassThroughConfiguration.java | 18 ++++ 6 files changed, 164 insertions(+), 6 deletions(-) create mode 100644 modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/MessageDiscardWorker.java diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java index dbee9d95d2..ff373322d1 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java @@ -246,11 +246,6 @@ public void run() { getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis()); } try { - // If an error has happened in the request processing, consumes the data in pipe completely and discard it - // If the consumeAndDiscard property is set to true - if (response.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscard()) { - RelayUtils.discardRequestMessage(requestMessageContext); - } if (expectEntityBody) { String cType = response.getHeader(HTTP.CONTENT_TYPE); if(cType == null){ diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/MessageDiscardWorker.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/MessageDiscardWorker.java new file mode 100644 index 0000000000..1c7603cca7 --- /dev/null +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/MessageDiscardWorker.java @@ -0,0 +1,97 @@ +/** + * Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.synapse.transport.passthru; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.context.MessageContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.nio.NHttpClientConnection; +import org.apache.http.nio.NHttpServerConnection; +import org.apache.synapse.transport.passthru.config.TargetConfiguration; +import org.apache.synapse.transport.passthru.util.RelayUtils; + +public class MessageDiscardWorker implements Runnable { + + private Log log = LogFactory.getLog(MessageDiscardWorker.class); + private ClientWorker clientWorker = null; + + TargetConfiguration targetConfiguration = null; + + private TargetResponse response = null; + + private MessageContext requestMessageContext; + + NHttpClientConnection conn = null; + + public MessageDiscardWorker(MessageContext requestMsgContext, TargetResponse response, + TargetConfiguration targetConfiguration, ClientWorker clientWorker, NHttpClientConnection conn) { + this.response = response; + this.requestMessageContext = requestMsgContext; + this.targetConfiguration = targetConfiguration; + this.clientWorker = clientWorker; + this.conn = conn; + } + + public void run() { + + // If an error has happened in the request processing, consumes the data in pipe completely and discard it + try { + RelayUtils.discardRequestMessage(requestMessageContext); + } catch (AxisFault af) { + log.error("Fault discarding request message", af); + } + + targetConfiguration.getWorkerPool().execute(clientWorker); + + targetConfiguration.getMetrics().incrementMessagesReceived(); + + NHttpServerConnection sourceConn = (NHttpServerConnection) requestMessageContext.getProperty( + PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); + if (sourceConn != null) { + sourceConn.getContext().setAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME) + ); + conn.getContext().removeAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME); + + sourceConn.getContext().setAttribute(PassThroughConstants.REQ_DEPARTURE_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.REQ_DEPARTURE_TIME) + ); + conn.getContext().removeAttribute(PassThroughConstants.REQ_DEPARTURE_TIME); + sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME) + ); + + conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME); + sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME) + ); + conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME); + sourceConn.getContext().setAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME) + ); + conn.getContext().removeAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME); + + } + + } +} diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java index b8719dd9d4..7d3f9f44ac 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java @@ -458,10 +458,18 @@ public void responseReceived(NHttpClientConnection conn) { if (statusCode == HttpStatus.SC_ACCEPTED && handle202(requestMsgContext)) { return; } + if (targetResponse.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscard()) { + ClientWorker clientWorker = new ClientWorker(targetConfiguration, requestMsgContext, targetResponse, + allowedResponseProperties); + targetConfiguration.getSecondaryWorkerPool().execute(new MessageDiscardWorker(requestMsgContext, + targetResponse, targetConfiguration, clientWorker, conn)); + return; + } + WorkerPool workerPool = targetConfiguration.getWorkerPool(); workerPool.execute( new ClientWorker(targetConfiguration, requestMsgContext, targetResponse)); - if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) { + if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) { conn.getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_SIDE_QUEUED_TIME, System.currentTimeMillis()); } diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/BaseConfiguration.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/BaseConfiguration.java index a7f92d42cf..0457a1695c 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/BaseConfiguration.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/BaseConfiguration.java @@ -49,6 +49,9 @@ public abstract class BaseConfiguration { /** The thread pool for executing the messages passing through */ private WorkerPool workerPool = null; + /** The secondary thread pool for executing the messages passing through */ + private WorkerPool secondaryWorkerPool = null; + /** The Axis2 ConfigurationContext */ protected ConfigurationContext configurationContext = null; @@ -75,6 +78,10 @@ public abstract class BaseConfiguration { private static final String PASSTHROUGH_THREAD_GROUP = "Pass-through Message Processing Thread Group"; private static final String PASSTHROUGH_THREAD_ID ="PassThroughMessageProcessor"; + private static final String SECONDARY_PASSTHROUGH_THREAD_GROUP = "Secondary Pass-through Message Processing " + + "Thread Group"; + private static final String SECONDARY_PASSTHROUGH_THREAD_ID = "PassThroughMessageSecondaryProcessor"; + private Integer socketTimeout = null; private Integer connectionTimeout = null; @@ -101,6 +108,16 @@ public void build() throws AxisFault { PASSTHROUGH_THREAD_ID); } + if (secondaryWorkerPool == null) { + secondaryWorkerPool = WorkerPoolFactory.getWorkerPool( + conf.getSecondaryWorkerPoolCoreSize(), + conf.getSecondaryWorkerPoolMaxSize(), + conf.getSecondaryWorkerThreadKeepaliveSec(), + conf.getSecondaryWorkerPoolQueueLen(), + SECONDARY_PASSTHROUGH_THREAD_GROUP, + SECONDARY_PASSTHROUGH_THREAD_ID); + } + httpParams = buildHttpParams(); ioReactorConfig = buildIOReactorConfig(); String sysCorrelationStatus = System.getProperty(PassThroughConstants.CORRELATION_LOGS_SYS_PROPERTY); @@ -147,6 +164,9 @@ public int getIOBufferSize() { public WorkerPool getWorkerPool() { return workerPool; } + public WorkerPool getSecondaryWorkerPool() { + return secondaryWorkerPool; + } public ConfigurationContext getConfigurationContext() { return configurationContext; diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java index e7ee53d7dc..8e193cfb91 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java @@ -49,6 +49,26 @@ public interface PassThroughConfigPNames { */ public String IO_THREADS_PER_REACTOR = "io_threads_per_reactor"; + /** + * Defines the core size (number of threads) of the secondary worker thread pool. + */ + public String SECONDARY_WORKER_POOL_SIZE_CORE = "secondary_worker_pool_size_core"; + + /** + * Defines the maximum size (number of threads) of the secondary worker thread pool. + */ + public String SECONDARY_WORKER_POOL_SIZE_MAX = "secondary_worker_pool_size_max"; + + /** + * Defines the keep-alive time for extra threads in the secondary worker pool. + */ + public String SECONDARY_WORKER_THREAD_KEEPALIVE_SEC = "secondary_worker_thread_keepalive_sec"; + + /** + * Defines the length of the queue that is used to hold Runnable tasks to be executed by the + * secondary worker pool. + */ + public String SECONDARY_WORKER_POOL_QUEUE_LENGTH = "secondary_worker_pool_queue_length"; /** * Defines the IO buffer size */ diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java index 25313614af..c7d18d2562 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java @@ -89,22 +89,40 @@ public int getWorkerPoolCoreSize() { return getIntProperty(PassThroughConfigPNames.WORKER_POOL_SIZE_CORE, DEFAULT_WORKER_POOL_SIZE_CORE); } + public int getSecondaryWorkerPoolCoreSize() { + return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.SECONDARY_WORKER_POOL_SIZE_CORE, + DEFAULT_WORKER_POOL_SIZE_CORE, props); + } public int getWorkerPoolMaxSize() { return getIntProperty(PassThroughConfigPNames.WORKER_POOL_SIZE_MAX, DEFAULT_WORKER_POOL_SIZE_MAX); } + public int getSecondaryWorkerPoolMaxSize() { + return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.SECONDARY_WORKER_POOL_SIZE_MAX, + DEFAULT_WORKER_POOL_SIZE_MAX, props); + } public int getWorkerThreadKeepaliveSec() { return getIntProperty(PassThroughConfigPNames.WORKER_THREAD_KEEP_ALIVE_SEC, DEFAULT_WORKER_THREAD_KEEPALIVE_SEC); } + public int getSecondaryWorkerThreadKeepaliveSec() { + return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.SECONDARY_WORKER_THREAD_KEEPALIVE_SEC, + DEFAULT_WORKER_THREAD_KEEPALIVE_SEC, props); + } + public int getWorkerPoolQueueLen() { return getIntProperty(PassThroughConfigPNames.WORKER_POOL_QUEUE_LENGTH, DEFAULT_WORKER_POOL_QUEUE_LENGTH); } + public int getSecondaryWorkerPoolQueueLen() { + return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.SECONDARY_WORKER_POOL_QUEUE_LENGTH, + DEFAULT_WORKER_POOL_QUEUE_LENGTH, props); + } + public int getIOThreadsPerReactor() { return getIntProperty(PassThroughConfigPNames.IO_THREADS_PER_REACTOR, DEFAULT_IO_THREADS_PER_REACTOR);