Resolve TDS protocol violation occurring due to parallel query enforced (babelfish-for-postgresql#1937)

This issue was mainly caused by incorrect inference of typmod information for a numeric expression: the typmod is set
to its default when a parallel worker is planned. The reason is explained below:

When parallel query is enforced, the Postgres engine plans a Gather node on top of the top-level plan. For example, consider
the following query -

SELECT amount + 100 FROM overflow_test where id = 1

-- If parallel query is not enforced- 
Query Text: SELECT amount + 100 FROM overflow_test where id = 1
Index Scan using overflow_test_pkey on master_dbo.overflow_test  (cost=0.15..8.17 rows=1 width=32)
  Output: ((amount)::numeric + '100'::numeric)
  Index Cond: (overflow_test.id = 1)

-- If parallel query is enforced then Postgres will wrap the above plan under Gather node - 
Query Text: SELECT amount + 100 FROM overflow_test where id = 1
Gather  (cost=0.15..8.17 rows=1 width=32)
  Output: (((amount)::numeric + '100'::numeric))
  Workers Planned: 1
  Single Copy: true
  ->  Index Scan using overflow_test_pkey on master_dbo.overflow_test  (cost=0.15..8.17 rows=1 width=32)
        Output: ((amount)::numeric + '100'::numeric)
        Index Cond: (overflow_test.id = 1)

When Postgres does this, it also modifies the targetlist of the Gather node to refer to the tuples returned by its lefttree
subplan, i.e. its outer plan (the Index Scan in this case). Hence, the targetlist of the Gather node contains only one tle in
the above case, which is a Var with varno = OUTER_VAR (indicating a reference to its outer plan) and vartypmod = -1.

Now, while sending metadata from the TDS layer, we use this typmod to deduce max_scale and precision for numeric. These
would be set to the defaults since vartypmod = -1, whereas the actual data being computed may not fit in them, so we run
into an error while sending the numeric response. The result is a hang or crash of the end client software.
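
As a minimal sketch of the typmod layout this relies on, the helpers below pack and unpack precision and scale with the same bit arithmetic the diff uses. The function names are hypothetical and exist only for illustration; VARHDRSZ is defined locally as 4 to mirror PostgreSQL's varlena header size.

```c
#include <stdint.h>

/* Mirrors PostgreSQL's varlena header size; numeric typmods are offset by it. */
#define VARHDRSZ 4

/* Hypothetical helper: pack precision and scale into a numeric typmod. */
static int32_t
numeric_typmod_pack(int32_t precision, int32_t scale)
{
	return ((precision << 16) | scale) + VARHDRSZ;
}

/* Hypothetical helper: extract precision (upper 16 bits after removing the offset). */
static int32_t
numeric_typmod_precision(int32_t typmod)
{
	return ((typmod - VARHDRSZ) >> 16) & 0xffff;
}

/* Hypothetical helper: extract scale (lower 16 bits after removing the offset). */
static int32_t
numeric_typmod_scale(int32_t typmod)
{
	return (typmod - VARHDRSZ) & 0xffff;
}

/* A typmod of -1 carries no precision/scale and must be checked before decoding. */
static int
numeric_typmod_is_unknown(int32_t typmod)
{
	return typmod == -1;
}
```

Decoding only makes sense after the -1 check: with typmod = -1 there is nothing to extract, which is why the TDS layer falls back to its defaults.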

This commit fixes the issue by taking the special Var into account: we extract the outer plan from the planned stmt
and use it to get the original/referenced tle from the outer plan.
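
The lookup described above can be sketched with a toy plan tree. This is not the PostgreSQL data model: ToyPlan, ToyEntry, TOY_OUTER_VAR and toy_resolve_typmod are hypothetical stand-ins, assuming each plan node has at most one outer child and a flat target list.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical marker standing in for OUTER_VAR; not the real constant. */
#define TOY_OUTER_VAR (-2)

typedef struct ToyEntry
{
	int		varno;		/* TOY_OUTER_VAR: refers to column 'attno' of the child */
	int		attno;		/* 1-based column number in the child's target list */
	int32_t	typmod;		/* meaningful only when varno != TOY_OUTER_VAR */
} ToyEntry;

typedef struct ToyPlan
{
	struct ToyPlan *outer;		/* child plan (lefttree), NULL for a leaf */
	const ToyEntry *targets;	/* target list of this node */
	int			ntargets;
} ToyPlan;

/*
 * Resolve the typmod of 1-based column 'attno' of 'plan', following
 * outer-var references down the tree until a concrete entry is found.
 * Returns -1 if the reference cannot be resolved.
 */
static int32_t
toy_resolve_typmod(const ToyPlan *plan, int attno)
{
	while (plan != NULL && attno >= 1 && attno <= plan->ntargets)
	{
		const ToyEntry *e = &plan->targets[attno - 1];

		if (e->varno != TOY_OUTER_VAR)
			return e->typmod;

		/* Follow the reference into the child plan. */
		attno = e->attno;
		plan = plan->outer;
	}
	return -1;
}
```

With a Gather-like node whose only entry points at column 1 of a scan node, the function returns the scan's typmod rather than the -1 stored on the referencing entry.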

This commit further improves the implementation of resolve_numeric_typmod_from_exp to handle references to an outer
Var correctly. It can now also handle Append and MergeAppend nodes, which was required for the following test case -

-- setup
create table t1 (a numeric(6,4), b numeric(6,3));
insert into t1 values (4, 16);
insert into t1 values (10.1234, 10.123);
insert into t1 values (1.2, 6);
insert into t1 values (NULL, 101.123);

-- test
select a from t1 UNION All
select b from t1;

For this UNION ALL expression, the typmod was always computed as -1, which is not right; ideally it should correspond to numeric(7,4). This commit introduces a helper function called resolve_numeric_typmod_from_append_or_mergeappend to compute the typmod for such expressions correctly.
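
To see where (7,4) comes from, here is a standalone sketch of the folding rule used in the diff: keep the largest number of integral digits and the largest scale across branches, then cap the precision. The function name merge_numeric_typmods and the local MAX_NUM_PRECISION value of 38 are assumptions made for illustration (the real code uses TDS_MAX_NUM_PRECISION and falls back to TDS defaults instead of returning -1).

```c
#include <stdint.h>

#define VARHDRSZ 4
/* Assumed stand-in for TDS_MAX_NUM_PRECISION. */
#define MAX_NUM_PRECISION 38

static int32_t
imax(int32_t a, int32_t b)
{
	return a > b ? a : b;
}

/*
 * Fold the numeric typmods of all branches into a single typmod.
 * Branches with typmod -1 are skipped; returns -1 if every branch is unknown.
 */
static int32_t
merge_numeric_typmods(const int32_t *typmods, int n)
{
	int32_t		max_precision = 0;
	int32_t		max_scale = 0;
	int32_t		result = -1;

	for (int i = 0; i < n; i++)
	{
		int32_t		scale, precision, integral;

		if (typmods[i] == -1)
			continue;

		scale = (typmods[i] - VARHDRSZ) & 0xffff;
		precision = ((typmods[i] - VARHDRSZ) >> 16) & 0xffff;

		/* Widest integral part and widest fractional part seen so far. */
		integral = imax(precision - scale, max_precision - max_scale);
		max_scale = imax(max_scale, scale);
		max_precision = integral + max_scale;

		/* Cap the precision, giving up scale digits first. */
		if (max_precision > MAX_NUM_PRECISION)
		{
			max_scale = imax(0, max_scale - (max_precision - MAX_NUM_PRECISION));
			max_precision = MAX_NUM_PRECISION;
		}
		result = ((max_precision << 16) | max_scale) + VARHDRSZ;
	}
	return result;
}
```

For numeric(6,4) and numeric(6,3), the integral parts are 2 and 3 digits and the scales are 4 and 3, so the merged type needs 3 integral digits plus 4 fractional digits, i.e. numeric(7,4).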

Task: BABEL-4424, BABEL-4359
Signed-off-by: Dipesh Dhameliya <[email protected]>
Deepesh125 authored and Jason Teng committed Dec 28, 2023
1 parent 369fe79 commit 85a6a3a
Showing 12 changed files with 1,475 additions and 75 deletions.
10 changes: 9 additions & 1 deletion contrib/babelfishpg_tds/src/backend/tds/tdsprinttup.c
@@ -60,6 +60,13 @@ TdsPrinttupStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
DR_printtup *myState = (DR_printtup *) self;
Portal portal = myState->portal;
PlannedStmt *plannedStmt = PortalGetPrimaryStmt(portal);
List *targetList = NIL;

if (portal->strategy != PORTAL_MULTI_QUERY)
{
targetList = FetchStatementTargetList((Node *) plannedStmt);
}

/*
* Create I/O buffer to be used for all messages. This cannot be inside
@@ -78,7 +85,8 @@ TdsPrinttupStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
ALLOCSET_DEFAULT_SIZES);

TdsSendRowDescription(typeinfo,
FetchPortalTargetList(portal),
plannedStmt,
targetList,
portal->formats);
return;
}
156 changes: 139 additions & 17 deletions contrib/babelfishpg_tds/src/backend/tds/tdsresponse.c
@@ -27,6 +27,7 @@
#include "miscadmin.h"
#include "nodes/pathnodes.h"
#include "parser/parse_coerce.h"
#include "parser/parsetree.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@@ -129,6 +130,8 @@ static void FillTabNameWithoutNumParts(StringInfo buf, uint8 numParts, TdsRelati
static void SetTdsEstateErrorData(void);
static void ResetTdsEstateErrorData(void);
static void SetAttributesForColmetada(TdsColumnMetaData *col);
static int32 resolve_numeric_typmod_from_exp(Plan *plan, Node *expr);
static int32 resolve_numeric_typmod_outer_var(Plan *plan, AttrNumber attno);

static inline void
SendPendingDone(bool more)
@@ -402,9 +405,112 @@ PrintTupPrepareInfo(DR_printtup *myState, TupleDesc typeinfo, int numAttrs)
}
}

static int32
resolve_numeric_typmod_from_append_or_mergeappend(Plan *plan, AttrNumber attno)
{
ListCell *lc;
int32 max_precision = 0,
max_scale = 0,
precision = 0,
scale = 0,
integralDigitCount = 0,
typmod = -1,
result_typmod = -1;
List *planlist = NIL;
if (IsA(plan, Append))
{
planlist = ((Append *) plan)->appendplans;
}
else if(IsA(plan, MergeAppend))
{
planlist = ((MergeAppend *) plan)->mergeplans;
}

Assert(planlist != NIL);
foreach(lc, planlist)
{
TargetEntry *tle;
Plan *outerplan = (Plan *) lfirst(lc);

/* if outerplan is SubqueryScan then use actual subplan */
if (IsA(outerplan, SubqueryScan))
outerplan = ((SubqueryScan *)outerplan)->subplan;

tle = get_tle_by_resno(outerplan->targetlist, attno);
if (IsA(tle->expr, Var))
{
Var *var = (Var *)tle->expr;
if (var->varno == OUTER_VAR)
{
typmod = resolve_numeric_typmod_outer_var(outerplan, var->varattno);
}
else
{
typmod = resolve_numeric_typmod_from_exp(outerplan, (Node *)tle->expr);
}
}
else
{
typmod = resolve_numeric_typmod_from_exp(outerplan, (Node *)tle->expr);
}
if (typmod == -1)
continue;
scale = (typmod - VARHDRSZ) & 0xffff;
precision = ((typmod - VARHDRSZ) >> 16) & 0xffff;
integralDigitCount = Max(precision - scale, max_precision - max_scale);
max_scale = Max(max_scale, scale);
max_precision = integralDigitCount + max_scale;
/*
* If max_precision is more than TDS_MAX_NUM_PRECISION then adjust precision
* to TDS_MAX_NUM_PRECISION at the cost of scale.
*/
if (max_precision > TDS_MAX_NUM_PRECISION)
{
max_scale = Max(0, max_scale - (max_precision - TDS_MAX_NUM_PRECISION));
max_precision = TDS_MAX_NUM_PRECISION;
}
result_typmod = ((max_precision << 16) | max_scale) + VARHDRSZ;
}
/* If max_precision is still default then use tds specific defaults */
if (result_typmod == -1)
{
result_typmod = ((tds_default_numeric_precision << 16) | tds_default_numeric_scale) + VARHDRSZ;
}
return result_typmod;
}

static int32
resolve_numeric_typmod_outer_var(Plan *plan, AttrNumber attno)
{
TargetEntry *tle;
Plan *outerplan = NULL;

if (IsA(plan, Append) || IsA(plan, MergeAppend))
return resolve_numeric_typmod_from_append_or_mergeappend(plan, attno);
else
outerplan = outerPlan(plan);

/* outerplan must not be NULL */
Assert(outerplan);

/* if outerplan is SubqueryScan then use actual subplan */
if (IsA(outerplan, SubqueryScan))
outerplan = ((SubqueryScan *)outerplan)->subplan;
tle = get_tle_by_resno(outerplan->targetlist, attno);
if (IsA(tle->expr, Var))
{
Var *var = (Var *)tle->expr;
if (var->varno == OUTER_VAR)
{
return resolve_numeric_typmod_outer_var(outerplan, var->varattno);
}
}
return resolve_numeric_typmod_from_exp(outerplan, (Node *)tle->expr);
}

/* look for a typmod to return from a numeric expression */
static int32
resolve_numeric_typmod_from_exp(Node *expr)
resolve_numeric_typmod_from_exp(Plan *plan, Node *expr)
{
if (expr == NULL)
return -1;
@@ -435,6 +541,12 @@ resolve_numeric_typmod_from_exp(Node *expr)
{
Var *var = (Var *) expr;

/* If this var refers to a tuple returned by its outer plan then find the original tle from it */
if (var->varno == OUTER_VAR)
{
Assert(plan);
return (resolve_numeric_typmod_outer_var(plan, var->varattno));
}
return var->vartypmod;
}
case T_OpExpr:
@@ -465,8 +577,8 @@ resolve_numeric_typmod_from_exp(Node *expr)
{
arg1 = linitial(op->args);
arg2 = lsecond(op->args);
typmod1 = resolve_numeric_typmod_from_exp(arg1);
typmod2 = resolve_numeric_typmod_from_exp(arg2);
typmod1 = resolve_numeric_typmod_from_exp(plan, arg1);
typmod2 = resolve_numeric_typmod_from_exp(plan, arg2);
scale1 = (typmod1 - VARHDRSZ) & 0xffff;
precision1 = ((typmod1 - VARHDRSZ) >> 16) & 0xffff;
scale2 = (typmod2 - VARHDRSZ) & 0xffff;
@@ -475,7 +587,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
else if (list_length(op->args) == 1)
{
arg1 = linitial(op->args);
typmod1 = resolve_numeric_typmod_from_exp(arg1);
typmod1 = resolve_numeric_typmod_from_exp(plan, arg1);
scale1 = (typmod1 - VARHDRSZ) & 0xffff;
precision1 = ((typmod1 - VARHDRSZ) >> 16) & 0xffff;
scale2 = 0;
@@ -546,7 +658,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
scale = Min(precision, TDS_MAX_NUM_PRECISION) - integralDigitCount;

/*
* precisionn adjustment to TDS_MAX_NUM_PRECISION
* precision adjustment to TDS_MAX_NUM_PRECISION
*/
if (precision > TDS_MAX_NUM_PRECISION)
precision = TDS_MAX_NUM_PRECISION;
@@ -654,7 +766,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
Assert(nullif->args != NIL);

arg1 = linitial(nullif->args);
return resolve_numeric_typmod_from_exp(arg1);
return resolve_numeric_typmod_from_exp(plan, arg1);
}
case T_CoalesceExpr:
{
@@ -677,7 +789,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
foreach(lc, coale->args)
{
arg = lfirst(lc);
arg_typmod = resolve_numeric_typmod_from_exp(arg);
arg_typmod = resolve_numeric_typmod_from_exp(plan, arg);
/* return -1 if we fail to resolve one of the arg's typmod */
if (arg_typmod == -1)
return -1;
@@ -718,7 +830,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
{
casewhen = lfirst(lc);
casewhen_result = (Node *) casewhen->result;
typmod = resolve_numeric_typmod_from_exp(casewhen_result);
typmod = resolve_numeric_typmod_from_exp(plan, casewhen_result);

/*
* return -1 if we fail to resolve one of the result's
@@ -753,7 +865,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
Assert(aggref->args != NIL);

te = (TargetEntry *) linitial(aggref->args);
typmod = resolve_numeric_typmod_from_exp((Node *) te->expr);
typmod = resolve_numeric_typmod_from_exp(plan, (Node *) te->expr);
aggFuncName = get_func_name(aggref->aggfnoid);

scale = (typmod - VARHDRSZ) & 0xffff;
@@ -799,7 +911,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
{
PlaceHolderVar *phv = (PlaceHolderVar *) expr;

return resolve_numeric_typmod_from_exp((Node *) phv->phexpr);
return resolve_numeric_typmod_from_exp(plan, (Node *) phv->phexpr);
}
case T_RelabelType:
{
@@ -808,7 +920,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
if (rlt->resulttypmod != -1)
return rlt->resulttypmod;
else
return resolve_numeric_typmod_from_exp((Node *) rlt->arg);
return resolve_numeric_typmod_from_exp(plan, (Node *) rlt->arg);
}
/* TODO handle more Expr types if needed */
default:
@@ -1563,8 +1675,8 @@ TdsGetGenericTypmod(Node *expr)
* for a relation. (used for keyset and dynamic cursors)
*/
void
PrepareRowDescription(TupleDesc typeinfo, List *targetlist, int16 *formats,
bool extendedInfo, bool fetchPkeys)
PrepareRowDescription(TupleDesc typeinfo, PlannedStmt *plannedstmt, List *targetlist,
int16 *formats, bool extendedInfo, bool fetchPkeys)
{
int natts = typeinfo->natts;
int attno;
@@ -1783,7 +1895,16 @@ PrepareRowDescription(TupleDesc typeinfo, List *targetlist, int16 *formats,
* than -1.
*/
if (atttypmod == -1 && tle != NULL)
atttypmod = resolve_numeric_typmod_from_exp((Node *) tle->expr);
{
if (!plannedstmt || !plannedstmt->planTree)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Internal error detected while calculating the precision of numeric expression"),
errhint("plannedstmt is NULL while calculating the precision of numeric expression when it contains outer var")));
}
atttypmod = resolve_numeric_typmod_from_exp(plannedstmt->planTree, (Node *) tle->expr);
}

/*
* Get the precision and scale out of the typmod value if
@@ -2559,7 +2680,7 @@ TdsSendInfoOrError(int token, int number, int state, int class,
}

void
TdsSendRowDescription(TupleDesc typeinfo,
TdsSendRowDescription(TupleDesc typeinfo, PlannedStmt *plannedstmt,
List *targetlist, int16 *formats)
{
TDSRequest request = TdsRequestCtrl->request;
@@ -2568,7 +2689,7 @@ TdsSendRowDescription(TupleDesc typeinfo,
Assert(typeinfo != NULL);

/* Prepare the column metadata first */
PrepareRowDescription(typeinfo, targetlist, formats, false, false);
PrepareRowDescription(typeinfo, plannedstmt, targetlist, formats, false, false);

/*
* If fNoMetadata flags is set in RPC header flag, the server doesn't need
@@ -3294,7 +3415,8 @@ TDSStatementExceptionCallback(PLtsql_execstate *estate, PLtsql_stmt *stmt, bool
void
SendColumnMetadata(TupleDesc typeinfo, List *targetlist, int16 *formats)
{
TdsSendRowDescription(typeinfo, targetlist, formats);
/* This will only be used for sp_prepare requests, hence there is no need to pass plannedstmt */
TdsSendRowDescription(typeinfo, NULL, targetlist, formats);
TdsPrintTupShutdown();
}

13 changes: 9 additions & 4 deletions contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c
@@ -2334,10 +2334,15 @@ static void
SendCursorResponse(TDSRequestSP req)
{
int cmd_type = TDS_CMD_UNKNOWN;
Portal portal;

/* fetch the portal */
portal = GetPortalFromCursorHandle(req->cursorHandle, false);
Portal portal = GetPortalFromCursorHandle(req->cursorHandle, false);
PlannedStmt *plannedStmt = PortalGetPrimaryStmt(portal);
List *targetList = NIL;

if (portal->strategy != PORTAL_MULTI_QUERY)
{
targetList = FetchStatementTargetList((Node *) plannedStmt);
}

/*
* If we are in aborted transaction state, we can't run
@@ -2366,7 +2371,7 @@ SendCursorResponse(TDSRequestSP req)
* break the protocol. We also need to fetch the primary keys for dynamic
* and keyset cursors (XXX: these cursors are not yet implemented).
*/
PrepareRowDescription(portal->tupDesc, FetchPortalTargetList(portal),
PrepareRowDescription(portal->tupDesc, plannedStmt, targetList,
portal->formats, true,
(req->scrollopt & (SP_CURSOR_SCROLLOPT_DYNAMIC | SP_CURSOR_SCROLLOPT_KEYSET)));

6 changes: 3 additions & 3 deletions contrib/babelfishpg_tds/src/include/tds_response.h
@@ -70,8 +70,8 @@ extern void TdsSendDone(int tag, int status,
extern void SendColumnMetadataToken(int natts, bool sendRowStat);
extern void SendTabNameToken(void);
extern void SendColInfoToken(int natts, bool sendRowStat);
extern void PrepareRowDescription(TupleDesc typeinfo, List *targetlist, int16 *formats,
bool extendedInfo, bool fetchPkeys);
extern void PrepareRowDescription(TupleDesc typeinfo, PlannedStmt *plannedstmt, List *targetlist,
int16 *formats, bool extendedInfo, bool fetchPkeys);
extern void SendReturnValueTokenInternal(ParameterToken token, uint8 status,
FmgrInfo *finfo, Datum datum, bool isNull,
bool forceCoercion);
@@ -85,7 +85,7 @@ extern void TdsSendEnvChangeBinary(int envid,
void *old, int old_nbytes);
extern void TdsSendReturnStatus(int status);
extern void TdsSendHandle(void);
extern void TdsSendRowDescription(TupleDesc typeinfo,
extern void TdsSendRowDescription(TupleDesc typeinfo, PlannedStmt *plannedstmt,
List *targetlist, int16 *formats);
extern bool TdsPrintTup(TupleTableSlot *slot, DestReceiver *self);
extern void TdsPrintTupShutdown(void);