Unable to update salesforce object (Task not serializable) #63

Open
it-am opened this issue Dec 9, 2020 · 2 comments
it-am commented Dec 9, 2020

Hi,

I am able to read from a Salesforce object using the code below:

salesforce_dataframe = spark.read.format("com.springml.spark.salesforce") \
    .option("soql", "select id, name from account") \
    .option("username", username) \
    .option("password", password_and_token) \
    .option("bulk", True) \
    .option("sfObject", "account") \
    .load()
salesforce_dynamicframe = DynamicFrame.fromDF(salesforce_dataframe, glueContext, "salesforce_dataframe")
s3_datasink = glueContext.write_dynamic_frame.from_options(
    frame=salesforce_dynamicframe,
    connection_type="s3",
    connection_options={"path": s3_staging_bucket},
    format=format)
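
(For context, both snippets assume the standard AWS Glue job boilerplate; a minimal sketch of that setup, with the bucket path and format as placeholder values:)

# Glue job setup assumed by the snippets in this issue (sketch; values are placeholders)
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

s3_staging_bucket = "s3://my-staging-bucket/salesforce/"  # placeholder
format = "csv"  # placeholder; any format Glue supports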

But I am unfortunately NOT able to update the Salesforce object using the following code in the same AWS Glue job:

ga = glueContext.create_dynamic_frame_from_options("s3", {'paths': [s3_staging_bucket]}, format=format)
df = ga.toDF()
df.write.format("com.springml.spark.salesforce") \
    .option("username", username) \
    .option("password", password_and_token) \
    .option("sfObject", source_object) \
    .save()
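
For completeness: the DataFrame written back still carries the id column from the extract above, since (as I understand the springml connector) the update is keyed on the Salesforce record Id. A trimmed sketch of the write, with illustrative field names:

# Keep only the Id plus the fields being updated before writing back
# ("name" is illustrative; the real job selects the changed columns)
update_df = df.select("id", "name")
update_df.write.format("com.springml.spark.salesforce") \
    .option("username", username) \
    .option("password", password_and_token) \
    .option("sfObject", source_object) \
    .save()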

I get the following error message in the AWS Glue job:

Traceback (most recent call last):
  File "/tmp/main.py", line 56, in <module>
    df.write.format("com.springml.spark.salesforce").option("username", username).option("password", password_and_token).option("sfObject", source_object).save()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 732, in save
    self._jwrite.save()
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling o88.save.
: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
	at com.springml.spark.salesforce.SFObjectWriter.writeData(SFObjectWriter.scala:36)
	at com.springml.spark.salesforce.DefaultSource.updateSalesforceObject(DefaultSource.scala:166)
	at com.springml.spark.salesforce.DefaultSource.createRelation(DefaultSource.scala:139)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: com.springml.salesforce.wave.impl.BulkAPIImpl
Serialization stack:
	- object not serializable (class: com.springml.salesforce.wave.impl.BulkAPIImpl, value: com.springml.salesforce.wave.impl.BulkAPIImpl@6aa40f4f)
	- field (class: com.springml.spark.salesforce.SFObjectWriter, name: bulkAPI, type: interface com.springml.salesforce.wave.api.BulkAPI)
	- object (class com.springml.spark.salesforce.SFObjectWriter, com.springml.spark.salesforce.SFObjectWriter@6f0231a0)
	- field (class: com.springml.spark.salesforce.SFObjectWriter$$anonfun$writeData$1, name: $outer, type: class com.springml.spark.salesforce.SFObjectWriter)
	- object (class com.springml.spark.salesforce.SFObjectWriter$$anonfun$writeData$1, <function2>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
	... 43 more
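
Reading the Serialization stack at the bottom of the trace, it looks like com.springml.spark.salesforce.SFObjectWriter holds a BulkAPIImpl in its bulkAPI field, and that object gets pulled into the closure passed to mapPartitionsWithIndex, where Spark fails to serialize it. To show the failure mode in miniature (a generic PySpark sketch of the closure-capture problem, not the connector's actual code; the real error is Java serialization on the Scala side, but the mechanics are analogous):

# Miniature reproduction of "Task not serializable" in PySpark
from pyspark.sql import SparkSession
import threading

spark = SparkSession.builder.master("local[1]").appName("closure-demo").getOrCreate()
rdd = spark.sparkContext.parallelize(range(4))

lock = threading.Lock()  # not picklable, playing the role of BulkAPIImpl

# Capturing `lock` in the closure fails, like SFObjectWriter capturing bulkAPI:
# rdd.map(lambda x: (lock.locked(), x)).count()   # raises a serialization error

# The usual fix: build the non-serializable object inside the task instead
def per_partition(rows):
    local_lock = threading.Lock()  # created on the executor, never serialized
    with local_lock:
        return [x * 2 for x in rows]

print(rdd.mapPartitions(per_partition).collect())
spark.stop()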

Could you please advise?

@dkennedy-eng

Hey @it-am, I can see you were able to get your bulk option working. What change did you make?

@Mounisha26

Hi @it-am - I am facing the same error while updating a Salesforce object. Any luck fixing the issue?
