Skip to content

Commit

Permalink
Merge pull request #267 from se7entyse7en/spark-query-cancellation
Browse files Browse the repository at this point in the history
Spark query cancellation
  • Loading branch information
se7entyse7en authored Sep 5, 2019
2 parents 2bde4ab + 073e415 commit 2d0723f
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
### Added

- Superset updated to v0.34.0rc1 which brings many improvements and bug fixes ([#250](https://github.com/src-d/sourced-ui/issues/250))
- Add support for Spark query cancelation ([#223](https://github.com/src-d/sourced-ui/issues/223))

### Changed

Expand Down
2 changes: 1 addition & 1 deletion superset/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-e git+https://github.com/src-d/[email protected]srcd1#egg=PyHive
-e git+https://github.com/src-d/[email protected]srcd2#egg=PyHive
black==19.3b0
coverage==4.5.3
flake8-import-order==0.18.1
Expand Down
82 changes: 82 additions & 0 deletions superset/superset/db_engine_specs/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,85 @@ def select_star(
latest_partition=latest_partition,
cols=cols,
)

@classmethod
def fetch_data(cls, cursor, limit):
import pyhive
from TCLIService import ttypes

try:
return super(SparkSQLEngineSpec, cls).fetch_data(cursor, limit)
except pyhive.exc.OperationalError as op_err:
# FIXME: The `pyhive.exc.OperationalError` exception is raised when
# a query is cancelled. This seems to happen because `gsc` expects
# the state to be `FINISHED` but got `CANCELED`. This has to be
# fixed once `gsc` correctly handles cancelation.
cancelation_error_msg = "Expected state FINISHED, but found CANCELED"
if (
len(op_err.args) > 0
and isinstance(op_err.args[0], ttypes.TFetchResultsResp)
# pylint: disable=no-member
and op_err.args[0].status.errorMessage == cancelation_error_msg
):
logging.warning("Query has been cancelled, returning empty result")
return []

raise op_err

@classmethod
def _dumps_operation_handle(cls, op_handle):
return dict(
op_handle.__dict__,
operationId={
"guid": op_handle.operationId.guid.decode("ISO-8859-1"),
"secret": op_handle.operationId.secret.decode("ISO-8859-1"),
},
)

@classmethod
def _loads_operation_handle(cls, op_handle):
from pyhive import hive

op_handle["operationId"] = hive.ttypes.THandleIdentifier(
**{k: v.encode("ISO-8859-1") for k, v in op_handle["operationId"].items()}
)

return hive.ttypes.TOperationHandle(**op_handle)

@classmethod
def get_connection_id(cls, cursor):
"""Returns connection id for a cursor
Just uses a hard-coded dummy id. This is done because queries
corresponding to a cursor without a connection id are interpreted as
non-cancellable.
A more suitable value should be the session id of the underlying
connection of the cursor, but that"s not an integer, and here an
integer is required."""

return 1

@classmethod
def handle_cursor(cls, cursor, query, session):
"""Handle a live cursor between the execute and fetchall calls
Adds the json dumps of the `pyhive.hive.ttypes.TOperationHandle`s
to the query object."""

operation_handles = query.extra.get("operation_handles", [])
operation_handles.append(cls._dumps_operation_handle(cursor._operationHandle))
query.set_extra_json_key("operation_handles", operation_handles)
session.commit()
logging.info("Current operation handles: %s", operation_handles)

super(SparkSQLEngineSpec, cls).handle_cursor(cursor, query, session)

@classmethod
def cancel_query(cls, cursor, query):
"""Cancels query in the underlying database"""

logging.info("Cancelling query with id: `%s`", query.id)
for op_handle in query.extra.get("operation_handles", []):
logging.info("Cancelling operation handle: `%s`", op_handle)
cursor.cancel(operation_handle=cls._loads_operation_handle(op_handle))
6 changes: 6 additions & 0 deletions superset/superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def execute_sql_statements(
with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
query.connection_id = db_engine_spec.get_connection_id(cursor)
session.commit()
statement_count = len(statements)
for i, statement in enumerate(statements):
# check if the query was stopped
Expand Down Expand Up @@ -368,7 +369,11 @@ def execute_sql_statements(


def cancel_query(query, user_name):
logging.info(
"Query with id `%s` has connection id `%s`", query.id, query.connection_id
)
if not query.connection_id:
logging.info("No connection id found, query cancellation skipped")
return

database = query.database
Expand All @@ -382,4 +387,5 @@ def cancel_query(query, user_name):

with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
logging.info("Calling `cancel_query` on db engine")
db_engine_spec.cancel_query(cursor, query)
7 changes: 5 additions & 2 deletions superset/superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2428,11 +2428,14 @@ def stop_query(self):
client_id = request.form.get("client_id")
try:
query = db.session.query(Query).filter_by(client_id=client_id).one()
logging.info("Query retrieved with id `%s`", query.id)
query.status = QueryStatus.STOPPED

db.session.commit()
logging.info("Committed status change for query with id `%s`", query.id)
sql_lab.cancel_query(query, g.user.username if g.user else None)
except Exception:
pass
except Exception as e:
return json_error_response("{}".format(e))
return self.json_response("OK")

@has_access_api
Expand Down

0 comments on commit 2d0723f

Please sign in to comment.