Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve TDS protocol violation occurring due to parallel query enforced #1937

Merged
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
92a3ae6
Fixed error: “lost connection to parallel worker” when running parall…
Oct 5, 2023
ec6a648
Merge branch 'BABEL_3_X_DEV' into jira-babel-4393
Deepesh125 Oct 12, 2023
6b2a509
Re-enable parallel query tests marked for BABEL-4393
Oct 12, 2023
eabf316
Fix test cases
Oct 12, 2023
e8745f7
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Oct 18, 2023
9225395
Address review comments
Oct 18, 2023
c5ddb5d
Address review comments
Oct 18, 2023
eb02b03
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Oct 19, 2023
4a8dc87
Resolve TDS protocol violation occurring due to parallel query enforced
Oct 19, 2023
7fd2270
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Oct 20, 2023
8e007d7
Addressed review comment
Oct 20, 2023
7e26010
Merge branch 'jira-babel-4393' into jira-babel-4424
Oct 20, 2023
7741ad1
Resolve infinite recursion
Oct 23, 2023
dcf94a3
Handle Append and MergeAppend plan node differently
Oct 23, 2023
dd1bc90
Update schedule file for parallel testing
Oct 23, 2023
38c48fc
Fix test cases
Oct 23, 2023
5742ab2
Ignore test time out issue
Oct 24, 2023
9f17a66
Fixed union all test cases
Oct 24, 2023
6f1eae3
Merge branch 'BABEL_3_X_DEV' into jira-babel-4393
Oct 25, 2023
72eeb5d
Handle more cases to hold interrupts
Oct 25, 2023
296260b
Merge branch 'jira-babel-4393' into jira-babel-4424
Oct 25, 2023
514b46e
Merge branch 'BABEL_3_X_DEV' into jira-babel-4424
Oct 25, 2023
d300ae8
Merge branch 'BABEL_3_X_DEV' into jira-babel-4424
Oct 25, 2023
09c4680
Additional test cases
Oct 26, 2023
c480446
Additional test cases for mergeappend node
Oct 26, 2023
69f165e
Additional test cases for mergeappend node
Oct 26, 2023
d74f930
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Oct 27, 2023
4501556
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Oct 30, 2023
6432276
Another fixes
Oct 31, 2023
459fe61
Fix new test cases
Nov 2, 2023
61b78cf
Test cases fixes
Nov 3, 2023
1b4326d
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Nov 3, 2023
1c7f31d
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Nov 7, 2023
b1d3287
Fixed errors
Nov 7, 2023
4a47172
Fixed test cases
Nov 7, 2023
0ac1f17
Fixed test cases
Nov 7, 2023
8633335
Donot override typmod
Nov 8, 2023
ce3e193
Additional test cases
Nov 8, 2023
e89cbe9
Merge branch 'BABEL_3_X_DEV' into jira-babel-4424
Nov 10, 2023
7a682e0
Addressed test cases
Nov 10, 2023
3dd1093
Merge branch 'BABEL_3_X_DEV' into jira-babel-4424
Deepesh125 Nov 10, 2023
02c5670
Resolved comments
Nov 10, 2023
829e1c9
Fixed output file
Nov 10, 2023
9993fc0
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Nov 16, 2023
d412745
Addressed comments
Nov 16, 2023
6dfc565
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Nov 16, 2023
e29ac8c
Initial commit for Babelfish v3.5 (#2027)
basasairohan Nov 17, 2023
ecfc790
Reimplement datediff and dateadd in c to improve performance (#1998)
Jakeowen1 Nov 17, 2023
2aaac7e
Fix test output plan verification (#2022)
wboettge Nov 17, 2023
70b845f
fix jdbc tests fails locally with error " 'tds_fdw' extension does no…
skumawat2025 Nov 20, 2023
4b3e651
Additional checks
Nov 20, 2023
2c0524c
Merge branch 'babelfish-for-postgresql:BABEL_3_X_DEV' into jira-babel…
Deepesh125 Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion contrib/babelfishpg_tds/src/backend/tds/tdsprinttup.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ TdsPrinttupStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
DR_printtup *myState = (DR_printtup *) self;
Portal portal = myState->portal;
PlannedStmt *plannedStmt = PortalGetPrimaryStmt(portal);
List *targetList = NIL;

if (portal->strategy != PORTAL_MULTI_QUERY)
Deepesh125 marked this conversation as resolved.
Show resolved Hide resolved
{
targetList = FetchStatementTargetList((Node *) plannedStmt);
Deepesh125 marked this conversation as resolved.
Show resolved Hide resolved
}

/*
* Create I/O buffer to be used for all messages. This cannot be inside
Expand All @@ -78,7 +85,8 @@ TdsPrinttupStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
ALLOCSET_DEFAULT_SIZES);

TdsSendRowDescription(typeinfo,
FetchPortalTargetList(portal),
plannedStmt,
targetList,
portal->formats);
return;
}
Expand Down
155 changes: 138 additions & 17 deletions contrib/babelfishpg_tds/src/backend/tds/tdsresponse.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "miscadmin.h"
#include "nodes/pathnodes.h"
#include "parser/parse_coerce.h"
#include "parser/parsetree.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
Expand Down Expand Up @@ -128,6 +129,8 @@ static void FillTabNameWithoutNumParts(StringInfo buf, uint8 numParts, TdsRelati
static void SetTdsEstateErrorData(void);
static void ResetTdsEstateErrorData(void);
static void SetAttributesForColmetada(TdsColumnMetaData *col);
static int32 resolve_numeric_typmod_from_exp(Plan *plan, Node *expr);
static int32 resolve_numeric_typmod_outer_var(Plan *plan, AttrNumber attno);

static inline void
SendPendingDone(bool more)
Expand Down Expand Up @@ -401,9 +404,112 @@ PrintTupPrepareInfo(DR_printtup *myState, TupleDesc typeinfo, int numAttrs)
}
}

static int32
resolve_numeric_typmod_from_append_or_mergeappend(Plan *plan, AttrNumber attno)
{
ListCell *lc;
int32 max_precision = 0,
max_scale = 0,
precision = 0,
scale = 0,
integralDigitCount = 0,
typmod = -1,
result_typmod = -1;
List *planlist = NIL;
if (IsA(plan, Append))
{
planlist = ((Append *) plan)->appendplans;
}
else if(IsA(plan, MergeAppend))
{
planlist = ((MergeAppend *) plan)->mergeplans;
}

Assert(planlist != NIL);
foreach(lc, planlist)
{
TargetEntry *tle;
Plan *outerplan = (Plan *) lfirst(lc);

/* if outerplan is SubqueryScan then use actual subplan */
if (IsA(outerplan, SubqueryScan))
outerplan = ((SubqueryScan *)outerplan)->subplan;

tle = get_tle_by_resno(outerplan->targetlist, attno);
if (IsA(tle->expr, Var))
{
Var *var = (Var *)tle->expr;
if (var->varno == OUTER_VAR)
{
typmod = resolve_numeric_typmod_outer_var(outerplan, var->varattno);
}
else
{
typmod = resolve_numeric_typmod_from_exp(outerplan, (Node *)tle->expr);
}
}
else
{
typmod = resolve_numeric_typmod_from_exp(outerplan, (Node *)tle->expr);
}
if (typmod == -1)
continue;
scale = (typmod - VARHDRSZ) & 0xffff;
precision = ((typmod - VARHDRSZ) >> 16) & 0xffff;
integralDigitCount = Max(precision - scale, max_precision - max_scale);
max_scale = Max(max_scale, scale);
Deepesh125 marked this conversation as resolved.
Show resolved Hide resolved
max_precision = integralDigitCount + max_scale;
Deepesh125 marked this conversation as resolved.
Show resolved Hide resolved
/*
* If max_precision is more than TDS_MAX_NUM_PRECISION then adjust precision
* to TDS_MAX_NUM_PRECISION at the cost of scale.
*/
if (max_precision > TDS_MAX_NUM_PRECISION)
{
max_scale = Max(0, max_scale - (max_precision - TDS_MAX_NUM_PRECISION));
max_precision = TDS_MAX_NUM_PRECISION;
}
result_typmod = ((max_precision << 16) | max_scale) + VARHDRSZ;
}
/* If max_precision is still default then use tds specific defaults */
if (result_typmod == -1)
{
result_typmod = ((tds_default_numeric_precision << 16) | tds_default_numeric_scale) + VARHDRSZ;
}
return result_typmod;
}

static int32
resolve_numeric_typmod_outer_var(Plan *plan, AttrNumber attno)
{
TargetEntry *tle;
Plan *outerplan = NULL;

if (IsA(plan, Append) || IsA(plan, MergeAppend))
return resolve_numeric_typmod_from_append_or_mergeappend(plan, attno);
else
outerplan = outerPlan(plan);

/* if outerplan is SubqueryScan then use actual subplan */
if (IsA(outerplan, SubqueryScan))
outerplan = ((SubqueryScan *)outerplan)->subplan;

/* outerplan must not be NULL */
Assert(outerplan);
tle = get_tle_by_resno(outerplan->targetlist, attno);
if (IsA(tle->expr, Var))
{
Var *var = (Var *)tle->expr;
if (var->varno == OUTER_VAR)
{
return resolve_numeric_typmod_outer_var(outerplan, var->varattno);
}
}
return resolve_numeric_typmod_from_exp(outerplan, (Node *)tle->expr);
}

/* look for a typmod to return from a numeric expression */
static int32
resolve_numeric_typmod_from_exp(Node *expr)
resolve_numeric_typmod_from_exp(Plan *plan, Node *expr)
{
if (expr == NULL)
return -1;
Expand Down Expand Up @@ -434,6 +540,12 @@ resolve_numeric_typmod_from_exp(Node *expr)
{
Var *var = (Var *) expr;

/* If this var referes to tuple returned by its outer plan then find the original tle from it */
if (var->varno == OUTER_VAR)
{
Assert(plan);
return (resolve_numeric_typmod_outer_var(plan, var->varattno));
}
return var->vartypmod;
}
case T_OpExpr:
Expand Down Expand Up @@ -464,8 +576,8 @@ resolve_numeric_typmod_from_exp(Node *expr)
{
arg1 = linitial(op->args);
arg2 = lsecond(op->args);
typmod1 = resolve_numeric_typmod_from_exp(arg1);
typmod2 = resolve_numeric_typmod_from_exp(arg2);
typmod1 = resolve_numeric_typmod_from_exp(plan, arg1);
typmod2 = resolve_numeric_typmod_from_exp(plan, arg2);
scale1 = (typmod1 - VARHDRSZ) & 0xffff;
precision1 = ((typmod1 - VARHDRSZ) >> 16) & 0xffff;
scale2 = (typmod2 - VARHDRSZ) & 0xffff;
Expand All @@ -474,7 +586,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
else if (list_length(op->args) == 1)
{
arg1 = linitial(op->args);
typmod1 = resolve_numeric_typmod_from_exp(arg1);
typmod1 = resolve_numeric_typmod_from_exp(plan, arg1);
scale1 = (typmod1 - VARHDRSZ) & 0xffff;
precision1 = ((typmod1 - VARHDRSZ) >> 16) & 0xffff;
scale2 = 0;
Expand Down Expand Up @@ -545,7 +657,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
scale = Min(precision, TDS_MAX_NUM_PRECISION) - integralDigitCount;

/*
* precisionn adjustment to TDS_MAX_NUM_PRECISION
* precision adjustment to TDS_MAX_NUM_PRECISION
*/
if (precision > TDS_MAX_NUM_PRECISION)
precision = TDS_MAX_NUM_PRECISION;
Expand Down Expand Up @@ -653,7 +765,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
Assert(nullif->args != NIL);

arg1 = linitial(nullif->args);
return resolve_numeric_typmod_from_exp(arg1);
return resolve_numeric_typmod_from_exp(plan, arg1);
}
case T_CoalesceExpr:
{
Expand All @@ -676,7 +788,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
foreach(lc, coale->args)
{
arg = lfirst(lc);
arg_typmod = resolve_numeric_typmod_from_exp(arg);
arg_typmod = resolve_numeric_typmod_from_exp(plan, arg);
/* return -1 if we fail to resolve one of the arg's typmod */
if (arg_typmod == -1)
return -1;
Expand Down Expand Up @@ -717,7 +829,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
{
casewhen = lfirst(lc);
casewhen_result = (Node *) casewhen->result;
typmod = resolve_numeric_typmod_from_exp(casewhen_result);
typmod = resolve_numeric_typmod_from_exp(plan, casewhen_result);

/*
* return -1 if we fail to resolve one of the result's
Expand Down Expand Up @@ -752,7 +864,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
Assert(aggref->args != NIL);

te = (TargetEntry *) linitial(aggref->args);
typmod = resolve_numeric_typmod_from_exp((Node *) te->expr);
typmod = resolve_numeric_typmod_from_exp(plan, (Node *) te->expr);
aggFuncName = get_func_name(aggref->aggfnoid);

scale = (typmod - VARHDRSZ) & 0xffff;
Expand Down Expand Up @@ -798,7 +910,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
{
PlaceHolderVar *phv = (PlaceHolderVar *) expr;

return resolve_numeric_typmod_from_exp((Node *) phv->phexpr);
return resolve_numeric_typmod_from_exp(plan, (Node *) phv->phexpr);
}
case T_RelabelType:
{
Expand All @@ -807,7 +919,7 @@ resolve_numeric_typmod_from_exp(Node *expr)
if (rlt->resulttypmod != -1)
return rlt->resulttypmod;
else
return resolve_numeric_typmod_from_exp((Node *) rlt->arg);
return resolve_numeric_typmod_from_exp(plan, (Node *) rlt->arg);
}
/* TODO handle more Expr types if needed */
default:
Expand Down Expand Up @@ -1562,8 +1674,8 @@ TdsGetGenericTypmod(Node *expr)
* for a relation. (used for keyset and dynamic cursors)
*/
void
PrepareRowDescription(TupleDesc typeinfo, List *targetlist, int16 *formats,
bool extendedInfo, bool fetchPkeys)
PrepareRowDescription(TupleDesc typeinfo, PlannedStmt *plannedstmt, List *targetlist,
int16 *formats, bool extendedInfo, bool fetchPkeys)
{
int natts = typeinfo->natts;
int attno;
Expand Down Expand Up @@ -1782,7 +1894,15 @@ PrepareRowDescription(TupleDesc typeinfo, List *targetlist, int16 *formats,
* than -1.
*/
if (atttypmod == -1 && tle != NULL)
atttypmod = resolve_numeric_typmod_from_exp((Node *) tle->expr);
{
if (!plannedstmt)
{
ereport(ERROR,
Deepesh125 marked this conversation as resolved.
Show resolved Hide resolved
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Internal error detected while calculating the precision of numeric expression")));
}
atttypmod = resolve_numeric_typmod_from_exp(plannedstmt->planTree, (Node *) tle->expr);
}

/*
* Get the precision and scale out of the typmod value if
Expand Down Expand Up @@ -2558,7 +2678,7 @@ TdsSendInfoOrError(int token, int number, int state, int class,
}

void
TdsSendRowDescription(TupleDesc typeinfo,
TdsSendRowDescription(TupleDesc typeinfo, PlannedStmt *plannedstmt,
List *targetlist, int16 *formats)
{
TDSRequest request = TdsRequestCtrl->request;
Expand All @@ -2567,7 +2687,7 @@ TdsSendRowDescription(TupleDesc typeinfo,
Assert(typeinfo != NULL);

/* Prepare the column metadata first */
PrepareRowDescription(typeinfo, targetlist, formats, false, false);
PrepareRowDescription(typeinfo, plannedstmt, targetlist, formats, false, false);

/*
* If fNoMetadata flags is set in RPC header flag, the server doesn't need
Expand Down Expand Up @@ -3293,7 +3413,8 @@ TDSStatementExceptionCallback(PLtsql_execstate *estate, PLtsql_stmt *stmt, bool
void
SendColumnMetadata(TupleDesc typeinfo, List *targetlist, int16 *formats)
{
TdsSendRowDescription(typeinfo, targetlist, formats);
/* This will only be used for sp_preapre request hence do not need to pass plannedstmt */
TdsSendRowDescription(typeinfo, NULL, targetlist, formats);
TdsPrintTupShutdown();
}

Expand Down
13 changes: 9 additions & 4 deletions contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -2334,10 +2334,15 @@ static void
SendCursorResponse(TDSRequestSP req)
{
int cmd_type = TDS_CMD_UNKNOWN;
Portal portal;

/* fetch the portal */
portal = GetPortalFromCursorHandle(req->cursorHandle, false);
Portal portal = GetPortalFromCursorHandle(req->cursorHandle, false);
Deepesh125 marked this conversation as resolved.
Show resolved Hide resolved
PlannedStmt *plannedStmt = PortalGetPrimaryStmt(portal);
List *targetList = NIL;

if (portal->strategy != PORTAL_MULTI_QUERY)
{
targetList = FetchStatementTargetList((Node *) plannedStmt);
}

/*
* If we are in aborted transaction state, we can't run
Expand Down Expand Up @@ -2366,7 +2371,7 @@ SendCursorResponse(TDSRequestSP req)
* break the protocol. We also need to fetch the primary keys for dynamic
* and keyset cursors (XXX: these cursors are not yet implemented).
*/
PrepareRowDescription(portal->tupDesc, FetchPortalTargetList(portal),
PrepareRowDescription(portal->tupDesc, plannedStmt, targetList,
portal->formats, true,
(req->scrollopt & (SP_CURSOR_SCROLLOPT_DYNAMIC | SP_CURSOR_SCROLLOPT_KEYSET)));

Expand Down
6 changes: 3 additions & 3 deletions contrib/babelfishpg_tds/src/include/tds_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ extern void TdsSendDone(int tag, int status,
extern void SendColumnMetadataToken(int natts, bool sendRowStat);
extern void SendTabNameToken(void);
extern void SendColInfoToken(int natts, bool sendRowStat);
extern void PrepareRowDescription(TupleDesc typeinfo, List *targetlist, int16 *formats,
bool extendedInfo, bool fetchPkeys);
extern void PrepareRowDescription(TupleDesc typeinfo, PlannedStmt *plannedstmt, List *targetlist,
int16 *formats, bool extendedInfo, bool fetchPkeys);
extern void SendReturnValueTokenInternal(ParameterToken token, uint8 status,
FmgrInfo *finfo, Datum datum, bool isNull,
bool forceCoercion);
Expand All @@ -85,7 +85,7 @@ extern void TdsSendEnvChangeBinary(int envid,
void *old, int old_nbytes);
extern void TdsSendReturnStatus(int status);
extern void TdsSendHandle(void);
extern void TdsSendRowDescription(TupleDesc typeinfo,
extern void TdsSendRowDescription(TupleDesc typeinfo, PlannedStmt *PlannedStmt,
List *targetlist, int16 *formats);
extern bool TdsPrintTup(TupleTableSlot *slot, DestReceiver *self);
extern void TdsPrintTupShutdown(void);
Expand Down
Loading