
Spark query cancellation #267

Merged · 9 commits · Sep 5, 2019

Conversation

Contributor

@se7entyse7en commented Aug 27, 2019

Closes #223.
Depends on src-d/PyHive#2.

This PR adds support for query cancellation by canceling the underlying Spark job. This is done by saving the JSON-serialized operationHandle of the running statement in the query object's extra field. The operationHandle is later retrieved, marshaled back into a Thrift object, and sent to the Thrift server to request the cancellation.
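
For illustration, the storing side looks roughly like this (a minimal sketch, not the actual diff; the field names follow the handle dict visible in the worker logs below, and access to the cursor's Thrift operation handle is assumed to come from the PyHive fork in src-d/PyHive#2):

def _dumps_operation_handle(op_handle):
    # Turn the Thrift TOperationHandle into JSON-friendly types so it can be
    # stored in the query's `extra` field; guid/secret are raw bytes, so they
    # are decoded with a byte-preserving codec.
    return {
        'operation_id': {
            'guid': op_handle.operationId.guid.decode('ISO-8859-1'),
            'secret': op_handle.operationId.secret.decode('ISO-8859-1'),
        },
        'operation_type': op_handle.operationType,
        'has_result_set': op_handle.hasResultSet,
        'modified_row_count': op_handle.modifiedRowCount,
    }

On cancellation the stored dict is turned back into a Thrift handle and sent to the Thrift server; a sketch of that inverse step is shown further down in the review discussion.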

This also adds some logging and sets the log level and log file for the Celery worker.

At the Celery layer, the task ends correctly without raising any error:

[2019-08-27 13:59:30,711: INFO/MainProcess] Received task: sql_lab.get_sql_results[95b522b4-2336-492a-9784-baadce9f1cc1]
[2019-08-27 13:59:31,098: INFO/MainProcess] Parsing with sqlparse statement SELECT
    r.repository_id AS repo,
    c.commit_author_when AS date,
    cm.file_path AS path,
    UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
FROM
    repositories AS r
    NATURAL JOIN ref_commits AS rc
    NATURAL JOIN commits AS c
    NATURAL JOIN commit_files AS cm
    NATURAL JOIN files AS f
WHERE rc.ref_name = 'HEAD'
    AND f.file_path REGEXP('.*.go$')
    AND NOT IS_VENDOR(f.file_path)
    AND NOT IS_BINARY(f.blob_content)
    AND f.blob_size < 1000000
[2019-08-27 13:59:31,109: INFO/MainProcess] Executing 1 statement(s)
[2019-08-27 13:59:31,109: INFO/MainProcess] Set query to 'running'
[2019-08-27 13:59:31,109: INFO/MainProcess] Database.get_sqla_engine(). Masked URL: sparksql://gsc:10000/default
[2019-08-27 13:59:31,448: INFO/MainProcess] USE `default`
[2019-08-27 13:59:32,221: INFO/MainProcess] Running statement 1 out of 1
[2019-08-27 13:59:32,244: INFO/MainProcess] Parsing with sqlparse statement SELECT
    r.repository_id AS repo,
    c.commit_author_when AS date,
    cm.file_path AS path,
    UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
FROM
    repositories AS r
    NATURAL JOIN ref_commits AS rc
    NATURAL JOIN commits AS c
    NATURAL JOIN commit_files AS cm
    NATURAL JOIN files AS f
WHERE rc.ref_name = 'HEAD'
    AND f.file_path REGEXP('.*.go$')
    AND NOT IS_VENDOR(f.file_path)
    AND NOT IS_BINARY(f.blob_content)
    AND f.blob_size < 1000000
[2019-08-27 13:59:32,256: INFO/MainProcess] Parsing with sqlparse statement SELECT
    r.repository_id AS repo,
    c.commit_author_when AS date,
    cm.file_path AS path,
    UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
FROM
    repositories AS r
    NATURAL JOIN ref_commits AS rc
    NATURAL JOIN commits AS c
    NATURAL JOIN commit_files AS cm
    NATURAL JOIN files AS f
WHERE rc.ref_name = 'HEAD'
    AND f.file_path REGEXP('.*.go$')
    AND NOT IS_VENDOR(f.file_path)
    AND NOT IS_BINARY(f.blob_content)
    AND f.blob_size < 1000000
[2019-08-27 13:59:32,265: INFO/MainProcess] Running query:
SELECT
    r.repository_id AS repo,
    c.commit_author_when AS date,
    cm.file_path AS path,
    UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
FROM
    repositories AS r
    NATURAL JOIN ref_commits AS rc
    NATURAL JOIN commits AS c
    NATURAL JOIN commit_files AS cm
    NATURAL JOIN files AS f
WHERE rc.ref_name = 'HEAD'
    AND f.file_path REGEXP('.*.go$')
    AND NOT IS_VENDOR(f.file_path)
    AND NOT IS_BINARY(f.blob_content)
    AND f.blob_size < 1000000 LIMIT 100000
[2019-08-27 13:59:32,266: INFO/MainProcess] SELECT
    r.repository_id AS repo,
    c.commit_author_when AS date,
    cm.file_path AS path,
    UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
FROM
    repositories AS r
    NATURAL JOIN ref_commits AS rc
    NATURAL JOIN commits AS c
    NATURAL JOIN commit_files AS cm
    NATURAL JOIN files AS f
WHERE rc.ref_name = 'HEAD'
    AND f.file_path REGEXP('.*.go$')
    AND NOT IS_VENDOR(f.file_path)
    AND NOT IS_BINARY(f.blob_content)
    AND f.blob_size < 1000000 LIMIT 100000
[2019-08-27 13:59:32,272: INFO/MainProcess] Handling cursor
[2019-08-27 13:59:32,334: INFO/MainProcess] Current operation handles: [{'operation_id': {'guid': 'È\x88\x8e°u\x85OL\x84bÚãORÿ\x86', 'secret': 'f´\x10£a\x1eNI\x95ä@ø?ÝâÂ'}, 'operation_type': 0, 'has_result_set': True, 'modified_row_count': None}]
[2019-08-27 13:59:35,859: WARNING/MainProcess] An exception occured while fetching data, returning empty result
[2019-08-27 13:59:36,194: INFO/MainProcess] Storing results in results backend, key: 0368a910-337e-4d2a-b50a-f3c788322976
[2019-08-27 13:59:36,227: INFO/MainProcess] Task sql_lab.get_sql_results[95b522b4-2336-492a-9784-baadce9f1cc1] succeeded in 5.507471300006728s: None

At the Spark level, it's possible to see that the job is canceled by looking at the dashboard at http://localhost:4040/jobs:

Screenshot 2019-08-27 at 16 05 52

On the other hand, an exception is not handled by gsc:

sourced-ui_1    | 2019-08-27 13:59:30,464:INFO:root:Parsing with sqlparse statement SELECT
sourced-ui_1    |     r.repository_id AS repo,
sourced-ui_1    |     c.commit_author_when AS date,
sourced-ui_1    |     cm.file_path AS path,
sourced-ui_1    |     UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
sourced-ui_1    | FROM
sourced-ui_1    |     repositories AS r
sourced-ui_1    |     NATURAL JOIN ref_commits AS rc
sourced-ui_1    |     NATURAL JOIN commits AS c
sourced-ui_1    |     NATURAL JOIN commit_files AS cm
sourced-ui_1    |     NATURAL JOIN files AS f
sourced-ui_1    | WHERE rc.ref_name = 'HEAD'
sourced-ui_1    |     AND f.file_path REGEXP('.*.go$')
sourced-ui_1    |     AND NOT IS_VENDOR(f.file_path)
sourced-ui_1    |     AND NOT IS_BINARY(f.blob_content)
sourced-ui_1    |     AND f.blob_size < 1000000
sourced-ui_1    | 2019-08-27 13:59:30,543:INFO:root:Parsing with sqlparse statement SELECT
sourced-ui_1    |     r.repository_id AS repo,
sourced-ui_1    |     c.commit_author_when AS date,
sourced-ui_1    |     cm.file_path AS path,
sourced-ui_1    |     UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
sourced-ui_1    | FROM
sourced-ui_1    |     repositories AS r
sourced-ui_1    |     NATURAL JOIN ref_commits AS rc
sourced-ui_1    |     NATURAL JOIN commits AS c
sourced-ui_1    |     NATURAL JOIN commit_files AS cm
sourced-ui_1    |     NATURAL JOIN files AS f
sourced-ui_1    | WHERE rc.ref_name = 'HEAD'
sourced-ui_1    |     AND f.file_path REGEXP('.*.go$')
sourced-ui_1    |     AND NOT IS_VENDOR(f.file_path)
sourced-ui_1    |     AND NOT IS_BINARY(f.blob_content)
sourced-ui_1    |     AND f.blob_size < 1000000
sourced-ui_1    | 2019-08-27 13:59:30,602:INFO:root:Triggering query_id: 2
sourced-ui_1    | 2019-08-27 13:59:30,617:INFO:root:Running query on a Celery worker
gsc_1           | 2019-08-27 13:59:31 INFO  ThriftCLIService:243 - Client protocol version: HIVE_CLI_SERVICE_PROTOCOL_V6
gsc_1           | 2019-08-27 13:59:31 INFO  SessionState:641 - Created HDFS directory: /tmp/hive/superset
gsc_1           | 2019-08-27 13:59:31 INFO  SessionState:641 - Created local directory: /tmp/d10bef94-f4ac-4a03-a7dd-79fb5fb23895_resources
gsc_1           | 2019-08-27 13:59:31 INFO  SessionState:641 - Created HDFS directory: /tmp/hive/superset/d10bef94-f4ac-4a03-a7dd-79fb5fb23895
gsc_1           | 2019-08-27 13:59:31 INFO  SessionState:641 - Created local directory: /tmp/root/d10bef94-f4ac-4a03-a7dd-79fb5fb23895
gsc_1           | 2019-08-27 13:59:31 INFO  SessionState:641 - Created HDFS directory: /tmp/hive/superset/d10bef94-f4ac-4a03-a7dd-79fb5fb23895/_tmp_space.db
gsc_1           | 2019-08-27 13:59:31 INFO  HiveSessionImpl:318 - Operation log session directory is created: /tmp/root/operation_logs/d10bef94-f4ac-4a03-a7dd-79fb5fb23895
gsc_1           | 2019-08-27 13:59:31 INFO  SparkExecuteStatementOperation:54 - Running query 'USE `default`' with 1ddc7292-c160-4d74-a7b0-f0c932a124d8
gsc_1           | 2019-08-27 13:59:32 INFO  CodeGenerator:54 - Code generated in 378.3003 ms
gsc_1           | 2019-08-27 13:59:32 INFO  DAGScheduler:54 - Asked to cancel job group 1ddc7292-c160-4d74-a7b0-f0c932a124d8
gsc_1           | 2019-08-27 13:59:32 INFO  SparkExecuteStatementOperation:54 - Running query 'SELECT
gsc_1           |     r.repository_id AS repo,
gsc_1           |     c.commit_author_when AS date,
gsc_1           |     cm.file_path AS path,
gsc_1           |     UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
gsc_1           | FROM
gsc_1           |     repositories AS r
gsc_1           |     NATURAL JOIN ref_commits AS rc
gsc_1           |     NATURAL JOIN commits AS c
gsc_1           |     NATURAL JOIN commit_files AS cm
gsc_1           |     NATURAL JOIN files AS f
gsc_1           | WHERE rc.ref_name = 'HEAD'
gsc_1           |     AND f.file_path REGEXP('.*.go$')
gsc_1           |     AND NOT IS_VENDOR(f.file_path)
gsc_1           |     AND NOT IS_BINARY(f.blob_content)
gsc_1           |     AND f.blob_size < 1000000 LIMIT 100000' with 19b28028-c118-4cff-8e9f-83887ad073a9
gsc_1           | 2019-08-27 13:59:33 INFO  CodeGenerator:54 - Code generated in 162.9486 ms
sourced-ui_1    | 2019-08-27 13:59:34,320:INFO:root:Requested stop query for client id `u3t9XvZtm`
sourced-ui_1    | 2019-08-27 13:59:34,335:INFO:root:Query retrieved with id `2`
sourced-ui_1    | 2019-08-27 13:59:34,393:INFO:root:Committed status change for query with id `2`
sourced-ui_1    | 2019-08-27 13:59:34,408:INFO:root:Query with id `2` has connection id `1`
sourced-ui_1    | 2019-08-27 13:59:34,430:INFO:root:Database.get_sqla_engine(). Masked URL: sparksql://gsc:10000/default
gsc_1           | 2019-08-27 13:59:34 INFO  CodeGenerator:54 - Code generated in 103.8353 ms
gsc_1           | 2019-08-27 13:59:34 INFO  CodeGenerator:54 - Code generated in 30.1023 ms
gsc_1           | 2019-08-27 13:59:34 INFO  ThriftCLIService:243 - Client protocol version: HIVE_CLI_SERVICE_PROTOCOL_V6
gsc_1           | 2019-08-27 13:59:35 INFO  SessionState:641 - Created local directory: /tmp/62dd5a06-063a-4128-8ec0-d762268c4249_resources
gsc_1           | 2019-08-27 13:59:35 INFO  SessionState:641 - Created HDFS directory: /tmp/hive/superset/62dd5a06-063a-4128-8ec0-d762268c4249
gsc_1           | 2019-08-27 13:59:35 INFO  SessionState:641 - Created local directory: /tmp/root/62dd5a06-063a-4128-8ec0-d762268c4249
gsc_1           | 2019-08-27 13:59:35 INFO  SessionState:641 - Created HDFS directory: /tmp/hive/superset/62dd5a06-063a-4128-8ec0-d762268c4249/_tmp_space.db
gsc_1           | 2019-08-27 13:59:35 INFO  HiveSessionImpl:318 - Operation log session directory is created: /tmp/root/operation_logs/62dd5a06-063a-4128-8ec0-d762268c4249
sourced-ui_1    | 2019-08-27 13:59:35,375:INFO:pyhive.hive:USE `default`
gsc_1           | 2019-08-27 13:59:35 INFO  SparkExecuteStatementOperation:54 - Running query 'USE `default`' with 21cd9a0e-bb90-4ab7-8ddd-d25feeabe492
gsc_1           | 2019-08-27 13:59:35 INFO  DAGScheduler:54 - Asked to cancel job group 21cd9a0e-bb90-4ab7-8ddd-d25feeabe492
sourced-ui_1    | 2019-08-27 13:59:35,471:INFO:root:Calling `cancel_query` on db engine
sourced-ui_1    | 2019-08-27 13:59:35,471:INFO:root:Cancelling query with id: `2`
sourced-ui_1    | 2019-08-27 13:59:35,472:INFO:root:Cancelling operation handle: `{'operation_id': {'guid': 'È\x88\x8e°u\x85OL\x84bÚãORÿ\x86', 'secret': 'f´\x10£a\x1eNI\x95ä@ø?ÝâÂ'}, 'operation_type': 0, 'has_result_set': True, 'modified_row_count': None}`
gsc_1           | 2019-08-27 13:59:35 INFO  SparkExecuteStatementOperation:54 - Cancel 'SELECT
gsc_1           |     r.repository_id AS repo,
gsc_1           |     c.commit_author_when AS date,
gsc_1           |     cm.file_path AS path,
gsc_1           |     UAST_EXTRACT(UAST(f.blob_content, 'Go', '//uast:Import/Path'), 'Value') AS imports
gsc_1           | FROM
gsc_1           |     repositories AS r
gsc_1           |     NATURAL JOIN ref_commits AS rc
gsc_1           |     NATURAL JOIN commits AS c
gsc_1           |     NATURAL JOIN commit_files AS cm
gsc_1           |     NATURAL JOIN files AS f
gsc_1           | WHERE rc.ref_name = 'HEAD'
gsc_1           |     AND f.file_path REGEXP('.*.go$')
gsc_1           |     AND NOT IS_VENDOR(f.file_path)
gsc_1           |     AND NOT IS_BINARY(f.blob_content)
gsc_1           |     AND f.blob_size < 1000000 LIMIT 100000' with 19b28028-c118-4cff-8e9f-83887ad073a9
gsc_1           | 2019-08-27 13:59:35 INFO  DAGScheduler:54 - Asked to cancel job group 19b28028-c118-4cff-8e9f-83887ad073a9
gsc_1           | 2019-08-27 13:59:35 INFO  SparkContext:54 - Starting job: run at AccessController.java:0
gsc_1           | 2019-08-27 13:59:35 INFO  ContextCleaner:54 - Cleaned accumulator 0
gsc_1           | 2019-08-27 13:59:35 ERROR SparkExecuteStatementOperation:91 - Error executing query, currentState CANCELED,
gsc_1           | java.lang.InterruptedException
gsc_1           | 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
gsc_1           | 	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
gsc_1           | 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
gsc_1           | 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
gsc_1           | 	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222)
gsc_1           | 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633)
gsc_1           | 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
gsc_1           | 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
gsc_1           | 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
gsc_1           | 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
gsc_1           | 	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
gsc_1           | 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
gsc_1           | 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
gsc_1           | 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
gsc_1           | 	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
gsc_1           | 	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
gsc_1           | 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
gsc_1           | 	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
gsc_1           | 	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
gsc_1           | 	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
gsc_1           | 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
gsc_1           | 	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
gsc_1           | 	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:246)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:175)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:171)
gsc_1           | 	at java.security.AccessController.doPrivileged(Native Method)
gsc_1           | 	at javax.security.auth.Subject.doAs(Subject.java:422)
gsc_1           | 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:185)
gsc_1           | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
gsc_1           | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
gsc_1           | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
gsc_1           | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
gsc_1           | 	at java.lang.Thread.run(Thread.java:748)
gsc_1           | 2019-08-27 13:59:35 WARN  ThriftCLIService:630 - Error fetching results:
gsc_1           | org.apache.hive.service.cli.HiveSQLException: Expected state FINISHED, but found CANCELED
gsc_1           | 	at org.apache.hive.service.cli.operation.Operation.assertState(Operation.java:161)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.getNextRowSet(SparkExecuteStatementOperation.scala:113)
gsc_1           | 	at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:220)
gsc_1           | 	at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:767)
gsc_1           | 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
gsc_1           | 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
gsc_1           | 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
gsc_1           | 	at java.lang.reflect.Method.invoke(Method.java:498)
gsc_1           | 	at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
gsc_1           | 	at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
gsc_1           | 	at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
gsc_1           | 	at java.security.AccessController.doPrivileged(Native Method)
gsc_1           | 	at javax.security.auth.Subject.doAs(Subject.java:422)
gsc_1           | 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
gsc_1           | 	at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
gsc_1           | 	at com.sun.proxy.$Proxy22.fetchResults(Unknown Source)
gsc_1           | 	at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:455)
gsc_1           | 	at org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:621)
gsc_1           | 	at org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1553)
gsc_1           | 	at org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1538)
gsc_1           | 	at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
gsc_1           | 	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
gsc_1           | 	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
gsc_1           | 	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
gsc_1           | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
gsc_1           | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
gsc_1           | 	at java.lang.Thread.run(Thread.java:748)
gsc_1           | 2019-08-27 13:59:35 ERROR SparkExecuteStatementOperation:179 - Error running hive query:
gsc_1           | org.apache.hive.service.cli.HiveSQLException: Illegal Operation state transition from CANCELED to ERROR
gsc_1           | 	at org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:92)
gsc_1           | 	at org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:98)
gsc_1           | 	at org.apache.hive.service.cli.operation.Operation.setState(Operation.java:126)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:266)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:175)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:171)
gsc_1           | 	at java.security.AccessController.doPrivileged(Native Method)
gsc_1           | 	at javax.security.auth.Subject.doAs(Subject.java:422)
gsc_1           | 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
gsc_1           | 	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:185)
gsc_1           | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
gsc_1           | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
gsc_1           | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
gsc_1           | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
gsc_1           | 	at java.lang.Thread.run(Thread.java:748)
gsc_1           | 2019-08-27 13:59:35 INFO  DAGScheduler:54 - Registering RDD 2 (run at AccessController.java:0)
gsc_1           | 2019-08-27 13:59:35 INFO  DAGScheduler:54 - Got job 0 (run at AccessController.java:0) with 1 output partitions
gsc_1           | 2019-08-27 13:59:35 INFO  DAGScheduler:54 - Final stage: ResultStage 1 (run at AccessController.java:0)
gsc_1           | 2019-08-27 13:59:35 INFO  SparkExecuteStatementOperation:54 - Result Schema: StructType(StructField(repo,StringType,true), StructField(date,TimestampType,true), StructField(path,StringType,true), StructField(imports,ArrayType(StringType,true),true))
gsc_1           | 2019-08-27 13:59:35 INFO  DAGScheduler:54 - Parents of final stage: List(ShuffleMapStage 0)
gsc_1           | 2019-08-27 13:59:35 INFO  DAGScheduler:54 - Missing parents: List(ShuffleMapStage 0)
gsc_1           | 2019-08-27 13:59:35 INFO  DAGScheduler:54 - Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at run at AccessController.java:0), which has no missing parents
gsc_1           | 2019-08-27 13:59:36 INFO  DAGScheduler:54 - Asked to cancel job group 19b28028-c118-4cff-8e9f-83887ad073a9
gsc_1           | 2019-08-27 13:59:36 INFO  MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 6.7 KB, free 366.3 MB)
gsc_1           | 2019-08-27 13:59:36 INFO  MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.8 KB, free 366.3 MB)
gsc_1           | 2019-08-27 13:59:36 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on bc56b54576bf:38809 (size: 3.8 KB, free: 366.3 MB)
gsc_1           | 2019-08-27 13:59:36 INFO  SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:1039
gsc_1           | 2019-08-27 13:59:36 INFO  DAGScheduler:54 - Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at run at AccessController.java:0) (first 15 tasks are for partitions Vector(0))
gsc_1           | 2019-08-27 13:59:36 INFO  TaskSchedulerImpl:54 - Adding task set 0.0 with 1 tasks
gsc_1           | 2019-08-27 13:59:36 INFO  TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 10396 bytes)
gsc_1           | 2019-08-27 13:59:36 INFO  TaskSchedulerImpl:54 - Cancelling stage 0
gsc_1           | 2019-08-27 13:59:36 INFO  Executor:54 - Executor is trying to kill task 0.0 in stage 0.0 (TID 0), reason: Stage cancelled
gsc_1           | 2019-08-27 13:59:36 INFO  TaskSchedulerImpl:54 - Stage 0 was cancelled
gsc_1           | 2019-08-27 13:59:36 INFO  Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
gsc_1           | 2019-08-27 13:59:36 INFO  DAGScheduler:54 - ShuffleMapStage 0 (run at AccessController.java:0) failed in 0.543 s due to Job 0 cancelled part of cancelled job group 19b28028-c118-4cff-8e9f-83887ad073a9
gsc_1           | 2019-08-27 13:59:36 INFO  Executor:54 - Fetching spark://bc56b54576bf:36451/jars/gitbase-spark-connector-enterprise-uber.jar with timestamp 1566914245093
gsc_1           | 2019-08-27 13:59:36 INFO  TransportClientFactory:267 - Successfully created connection to bc56b54576bf/172.31.0.9:36451 after 88 ms (0 ms spent in bootstraps)
gsc_1           | 2019-08-27 13:59:36 INFO  Utils:54 - Fetching spark://bc56b54576bf:36451/jars/gitbase-spark-connector-enterprise-uber.jar to /tmp/spark-8ac04fdd-1031-4d4f-9e0f-5db04a411b25/userFiles-b7f9e9dc-0a08-450c-995a-bc06c59ac55d/fetchFileTemp4891356638895890777.tmp
gsc_1           | 2019-08-27 13:59:37 INFO  Executor:54 - Adding file:/tmp/spark-8ac04fdd-1031-4d4f-9e0f-5db04a411b25/userFiles-b7f9e9dc-0a08-450c-995a-bc06c59ac55d/gitbase-spark-connector-enterprise-uber.jar to class loader
gsc_1           | 2019-08-27 13:59:37 INFO  Executor:54 - Executor killed task 0.0 in stage 0.0 (TID 0), reason: Stage cancelled
gsc_1           | 2019-08-27 13:59:37 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): TaskKilled (Stage cancelled)
gsc_1           | 2019-08-27 13:59:37 INFO  TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool

  • I have updated the CHANGELOG file according to the conventions in keepachangelog.com
  • This PR contains changes that do not require a mention in the CHANGELOG file

@se7entyse7en
Contributor Author

/cc @src-d/data-processing for error logs of gsc.

connection of the cursor, but that's not an integer, and here an
integer is required."""

return 1
Contributor Author

An alternative would be to change this condition.

@@ -37,7 +37,7 @@ fi
 if [ "$#" -ne 0 ]; then
 exec "$@"
 elif [ "$SUPERSET_ENV" = "development" ]; then
-celery worker --app=superset.sql_lab:celery_app --pool=gevent -Ofair &
+celery worker --app=superset.sql_lab:celery_app --logfile=/tmp/celery.log --loglevel=INFO --pool=gevent -Ofair &
Contributor Author

It may be worth exposing logfile and loglevel as env vars, WDYT?

Contributor

Let's do it in a separate PR.

@@ -37,7 +37,7 @@ fi
 if [ "$#" -ne 0 ]; then
 exec "$@"
 elif [ "$SUPERSET_ENV" = "development" ]; then
-celery worker --app=superset.sql_lab:celery_app --pool=gevent -Ofair &
+celery worker --app=superset.sql_lab:celery_app --logfile=/tmp/celery.log --loglevel=INFO --pool=gevent -Ofair &
Contributor

should we redirect it to stdout of the container instead?

Contributor Author

I would prefer not to mix it with the application's logs, but I agree that it would make the logs much more accessible. What I would do is redirect to stdout, but run Celery in a separate container. Do you see any particular reason why it needs to run in the same container?

Contributor

We can consider moving Celery to a separate container. I don't see any disadvantages except one more container, but let's do it in a separate issue.
Do you mind just reverting this change for now?
One of the problems I see with it: INFO is very verbose and I'm not sure log rotation works correctly in this case. If not, /tmp/celery.log would grow very large very fast and eventually crash the container with an out-of-disk-space error.

Contributor Author

Done here.

Contributor Author

Created #269 to discuss moving Celery to a separate container.

@@ -1416,6 +1421,74 @@ class SparkSQLEngineSpec(HiveEngineSpec):
 
     engine = 'sparksql'
 
+    @classmethod
+    def _dumps_operation_handle(cls, op_handle):
+        return {
Contributor

I didn't check how the pyhive types are defined, but most probably you can just use the __dict__ attribute instead.

Contributor Author

It doesn't work because op_handle.operationId is an instance of hive.ttypes.THandleIdentifier. One way would be to define a custom JSON encoder/decoder, but IMO it isn't worth the effort.
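
For reference, such an encoder would look roughly like this (purely illustrative of the dismissed alternative; class and variable names are made up):

import json

class OperationHandleEncoder(json.JSONEncoder):
    # Teach json.dumps how to serialize the thrift objects: raw bytes are
    # decoded to text, and any other thrift struct falls back to its __dict__.
    def default(self, o):
        if isinstance(o, bytes):
            return o.decode('ISO-8859-1')
        if hasattr(o, '__dict__'):
            return o.__dict__
        return json.JSONEncoder.default(self, o)

# usage: json.dumps(op_handle, cls=OperationHandleEncoder)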

Contributor

how about?

dict(op_handle.__dict__, operation_id=op_handle.operationId.__dict__)

or

dict(op_handle.__dict__, operation_id={
    'guid': op_handle.operationId.guid.decode('ISO-8859-1'),
    'secret': op_handle.operationId.secret.decode('ISO-8859-1'),
})

if decoding is required.

The idea is to remove the direct dependency on TOperationHandle internals.
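
The inverse step on the cancel path would then look roughly like this (a sketch; TOperationHandle, THandleIdentifier and TCancelOperationReq are the thrift-generated classes shipped with PyHive, while the way the Thrift client is obtained from the connection is an assumption):

from TCLIService.ttypes import (TCancelOperationReq, THandleIdentifier,
                                TOperationHandle)

def _loads_operation_handle(dumped):
    # Rebuild the Thrift handle from the dict stored in the query's extra field.
    return TOperationHandle(
        operationId=THandleIdentifier(
            guid=dumped['operation_id']['guid'].encode('ISO-8859-1'),
            secret=dumped['operation_id']['secret'].encode('ISO-8859-1')),
        operationType=dumped['operation_type'],
        hasResultSet=dumped['has_result_set'],
        modifiedRowCount=dumped['modified_row_count'])

def cancel_operation(thrift_client, dumped_handle):
    # Ask the Thrift server to cancel the operation; the Spark thrift server
    # then cancels the corresponding job group ("Asked to cancel job group ..."
    # in the gsc logs above).
    thrift_client.CancelOperation(
        TCancelOperationReq(operationHandle=_loads_operation_handle(dumped_handle)))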

Contributor Author

Fixed here.

Contributor

looks much nicer!!!

@@ -1151,7 +1151,12 @@ def fetch_data(cls, cursor, limit):
         raise Exception('Query error', state.errorMessage)
     try:
         return super(HiveEngineSpec, cls).fetch_data(cursor, limit)
-    except pyhive.exc.ProgrammingError:
+    except (pyhive.exc.ProgrammingError, pyhive.exc.OperationalError):
Contributor

I'm not sure it's a good idea. Let's check in which cases OperationalError is raised. I believe there should be some cases where we should return it to the user/log instead of just skipping it.

Contributor Author

> Let's check in which cases OperationalError is raised.

Actually, I didn't check the other cases in which this error is raised, and I agree that in some cases we should return it.

My idea was for this to be temporary until gsc handles cancellation correctly without raising an exception. I could have a look at the specific exception raised during cancellation and skip only that one. WDYT?

Contributor

If it's possible to catch only the cancellation error, that would be much better.

Contributor Author

Fixed here.

# The `pyhive.exc.OperationalError` exception is raised when a
# query is cancelled. This seems to happen because `gsc` expects
# the state to be `FINISHED` but got `CANCELED`.
cancelation_error_msg = 'Expected state FINISHED, but found CANCELED'
Contributor

Please add a FIXME comment, as it's needed only while gsc doesn't support CANCELED.

Contributor Author

Done here.
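
For reference, the resulting handling is roughly the following (a sketch based on the snippet above; the exact way the error message is read off the exception may differ in the committed code):

import pyhive.exc

def fetch_data(cls, cursor, limit):
    try:
        return super(HiveEngineSpec, cls).fetch_data(cursor, limit)
    except pyhive.exc.ProgrammingError:
        return []
    except pyhive.exc.OperationalError as ex:
        # FIXME: needed only while gsc reports 'Expected state FINISHED, but
        # found CANCELED' for a cancelled query instead of supporting CANCELED.
        cancelation_error_msg = 'Expected state FINISHED, but found CANCELED'
        if cancelation_error_msg in str(ex):
            return []
        raise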

@se7entyse7en
Contributor Author

I'm gonna address DCO and rebase after approvals.

@se7entyse7en
Contributor Author

[private] https://src-d.slack.com/archives/C7TB5NEDN/p1567074879029100

This PR only sends the request to cancel the query downstream. To also have the underlying query on gitbase stopped, at least gitbase:v0.24.0-beta3 is needed.

@se7entyse7en force-pushed the spark-query-cancellation branch from 0c06f07 to 0bc3296 on August 29, 2019 16:34
Contributor Author

@se7entyse7en commented Aug 29, 2019

Rebased and force-pushed. After rebasing, the cancellation request wasn't being sent, so I needed to add this. The reason is that a few lines later there's a refresh of the query object that causes the connection_id to get lost.

I tried with gitbase:v0.24.0-beta3 before rebasing and it seemed that the underlying query was getting canceled correctly. Now it seems that even though gsc receives the cancel request, it still waits for gitbase to finish the query ([private] https://src-d.slack.com/archives/C7UDG0GP6/p1567092636042600). But this behavior shouldn't be affected by the rebase.

This needs further investigation 😞.

Contributor

@smacker commented Aug 29, 2019

All linters are failing.

Signed-off-by: Lou Marvin Caraig <[email protected]>
Contributor Author

@se7entyse7en commented Aug 30, 2019

Hi, I need some help here, especially from the @src-d/data-processing team. Here's what currently happens using this PR for sourced-ui, with gitbase:v0.24.0-beta3 and gsc:v0.6.0:

  1. I run the query from sourced-ui

Screenshot 2019-08-30 at 11 06 46

  2. gsc receives the query

Screenshot 2019-08-30 at 11 07 07

  3. gitbase receives the query

Screenshot 2019-08-30 at 11 08 56

  4. I stop the query, and sourced-ui sends the cancel request

Screenshot 2019-08-30 at 11 18 41

  5. The cancel request arrives at gsc

Screenshot 2019-08-30 at 11 10 10

  6. After 3–4 minutes, gitbase logs the following and gsc kills the task (AFAIU this means that gitbase successfully ended the query; correct me if I'm wrong)

Screenshot 2019-08-30 at 11 11 21

The problem is that between point 5 and point 6, if I run SHOW PROCESSLIST in gitbase, I can still see the query running. And the status of the Spark job is not very clear: the Spark UI says Status: FAILED and shows a failed stage with cancellation as the failure reason, but it also says that one task is running:

Screenshot 2019-08-30 at 10 58 46

After point 6, running SHOW PROCESSLIST in gitbase shows no query, and the Spark UI shows the task as killed:

Screenshot 2019-08-30 at 13 38 43

So it seems like the Spark task was killed after gitbase finished the query. Is this the expected behavior? Am I missing something or misinterpreting the logs?

In the layers above gsc everything seems fine: the celery task exits normally, and the UI alerts the user that the query has been successfully stopped.

@se7entyse7en
Contributor Author

Okay, I asked @ajnavarro and, to recap: Spark cancellation is not instant, and gitbase simply checks whether the TCP connection is still open, so this is the most we can do so far: [private] slack thread.

@se7entyse7en
Contributor Author

@smacker do you understand what pylint is complaining about here?

Contributor

@smacker commented Aug 30, 2019

@se7entyse7en somehow pylint thinks that pyhive.exc.OperationalError.args[0] is a string.

I found a similar issue but it's the opposite of what we see. And I don't see any relevant issue in the astroid repository.

Anyway, pylint shouldn't be able to infer the type of args[0] in this case. It can be anything, and that anything doesn't have to have a status attribute.
Most probably we can just silence this error if we know for sure there is always an object with a status field.
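
Assuming the complaint is pylint's no-member check, it can be silenced locally on the offending line, e.g. (illustrative only):

# We know args[0] is always the Thrift response object carrying a `status`
# field, so the inferred-type false positive can be suppressed inline.
error_message = ex.args[0].status.errorMessage  # pylint: disable=no-member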

Signed-off-by: Lou Marvin Caraig <[email protected]>
Signed-off-by: Lou Marvin Caraig <[email protected]>
@se7entyse7en
Contributor Author

Fixed the requirement with the proper release here.

@se7entyse7en
Contributor Author

To fix test coverage we would need to add tests for SparkSQLEngineSpec, but this essentially means mocking everything or adding Spark to the test setup. The former doesn't make much sense IMO; if we want to do the latter, I'd open a separate issue.

Contributor

@smacker commented Sep 5, 2019

Let's just merge. I don't see a point in mocking everything. We can add testing on Spark instead of MySQL in the gsc branch, though we don't have a test for cancellation on MySQL either.

@se7entyse7en merged commit 2d0723f into src-d:master on Sep 5, 2019
@se7entyse7en deleted the spark-query-cancellation branch on September 5, 2019 11:58

Successfully merging this pull request may close these issues: Kill Spark Query.

3 participants