Skip to content

Commit

Permalink
AND query pushdown for EXCEL and ODF
Browse files: browse the repository at this point in the history
  • Loading branch information
ergo70 committed Feb 13, 2020
1 parent 95c4192 commit 223da83
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions cloudfs_fdw/cloudfs_fdw.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def can_sort(self, sortkeys):

if self.format in ['xls', 'xlsx', 'odf']:
return sortkeys

return can_sort

def execute(self, quals, columns, sortkeys=None):
Expand All @@ -116,7 +116,7 @@ def execute(self, quals, columns, sortkeys=None):
yield row

elif self.format in ['xls', 'xlsx', 'odf']:
for row in self._render_excel_or_odf(data_stream, sortkeys):
for row in self._render_excel_or_odf(data_stream, quals, sortkeys):
yield row

else:
Expand All @@ -139,7 +139,7 @@ def _render_json(self, data_stream):
for obj in object_stream:
yield obj.values()[:len(self.columns)]

def _render_excel_or_odf(self, data_stream, sortkeys):
def _render_excel_or_odf(self, data_stream, quals, sortkeys):
engine = 'xlrd'

if self.format == 'odf':
Expand All @@ -148,16 +148,35 @@ def _render_excel_or_odf(self, data_stream, sortkeys):
object_stream = pandas.read_excel(
data_stream, sheet_name=self.sheet, header=0 if self.skip_header else None, engine=engine)

object_stream.columns = [column.replace(
" ", "_") for column in object_stream.columns]
object_stream.columns = [column.replace(
":", "_") for column in object_stream.columns]

if quals or sortkeys:
df_columns = object_stream.columns.values

if quals:
    # Push the quals down into pandas: build one comparison clause per
    # qual and AND them together in a single DataFrame.query() expression.
    column_names = list(self.columns.keys())
    clauses = []

    for qual in quals:
        # Foreign-table columns are matched to DataFrame columns by position.
        column_index = column_names.index(qual.field_name)
        # Only '=' is translated (to '=='); any other operator is passed
        # through unchanged.
        operator = '==' if qual.operator == '=' else qual.operator

        # Quote string values only. The earlier check
        # `type(qual.value is str)` was always truthy, so numeric values
        # were quoted as well. repr() also escapes embedded quotes and
        # backslashes, which plain '"' + value + '"' did not.
        if isinstance(qual.value, str):
            value = repr(qual.value)
        else:
            value = str(qual.value)

        clauses.append(' '.join((df_columns[column_index], operator, value)))

    object_stream.query(expr=' and '.join(clauses), inplace=True)

if sortkeys:
columns = object_stream.columns.values
sort_columns = []
sort_orders = []

for sortkey in sortkeys:
sort_columns.append(columns[sortkey.attnum - 1])
sort_columns.append(df_columns[sortkey.attnum - 1])
sort_orders.append(not sortkey.is_reversed)

object_stream.sort_values(by=sort_columns, axis=0, ascending=sort_orders, inplace=True)
object_stream.sort_values(
by=sort_columns, axis=0, ascending=sort_orders, inplace=True)

for row in object_stream.iterrows():
yield row[1].values[:len(self.columns)]

0 comments on commit 223da83

Please sign in to comment.