Problem with writing RecordIO format, section 'The Amazon Record format' in README.md file probably needs a change - an old import from mllib (should be ml) #131

Open
borysrybak opened this issue Dec 2, 2020 · 0 comments

borysrybak commented Dec 2, 2020

System Information

  • Spark/PySpark version: v.2.4.4
  • Working in Jupyter Notebook

Describe the problem

I was trying to write a DataFrame to S3 by following the formatting instructions:

(...) Writing a DataFrame using the "sagemaker" format serializes a column named "label", expected to contain Doubles, and a column named "features", expected to contain a Sparse or Dense org.apache.mllib.linalg.Vector. If the features column contains a SparseVector, SageMaker Spark sparsely-encodes the Vector into the Amazon Record. If the features column contains a DenseVector, SageMaker Spark densely-encodes the Vector into the Amazon Record. (...)

and by executing exactly the following code:

myDataFrame.write
    .format("sagemaker")
    .option("labelColumnName", "myLabelColumn")
    .option("featuresColumnName", "myFeaturesColumn")
    .save("s3://my-s3-bucket/my-s3-prefix")

In my case, I prepared the dataset with the schema described in the section mentioned above, so my data types were:

  • 'myFeatures':'pyspark.mllib.linalg.VectorUDT'
  • 'myLabels':'pyspark.sql.types.DoubleType'

but whenever I executed the code:

df.write
    .format("sagemaker")
    .option("labelColumnName", "myLabels")
    .option("featuresColumnName", "myFeatures")
    .save("s3://...")

I received this error every time:

scala.MatchError: (11,[0,3,7,10],[1.0,1.0,1.0,1.0]) (of class org.apache.spark.mllib.linalg.SparseVector)

I double-checked the format of my SparseVector and was sure that the vector type was explicitly defined, and that it was a SparseVector from the mllib.linalg package.
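For reference, a minimal way to check which package the vectors actually come from (using the df with the myFeatures/myLabels columns from the snippet above) is:

from pyspark.mllib.linalg import SparseVector as MLlibSparseVector

# Schema-level check: which VectorUDT (ml vs. mllib) the features column uses
print({s.name: type(s.dataType) for s in df.schema})

# Row-level check: the concrete class of a vector value
first_vec = df.first()["myFeatures"]
print(type(first_vec))                           # e.g. pyspark.mllib.linalg.SparseVector
print(isinstance(first_vec, MLlibSparseVector))  # True means the "sagemaker" writer will fail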

After a few tests and experiments, it turned out that the features column should not contain a SparseVector from mllib.linalg, but one from ml.linalg :)

This is the correct/working schema I used, and it allowed me to write the DataFrame to S3 in RecordIO format:
'features': pyspark.ml.linalg.VectorUDT,
'label': pyspark.sql.types.DoubleType,
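If a DataFrame has already been built with mllib vectors, it doesn't have to be recreated from scratch; one possible shortcut (not something the README mentions, just an option) is to convert the vector column with MLUtils before writing:

from pyspark.mllib.util import MLUtils

# Convert the listed column(s) from mllib.linalg vectors to ml.linalg vectors;
# the column names here follow the schema above and are only illustrative.
df_ml = MLUtils.convertVectorColumnsToML(df, "features")

df_ml.write \
    .format("sagemaker") \
    .option("labelColumnName", "label") \
    .option("featuresColumnName", "features") \
    .save("s3://my-s3-bucket/my-s3-prefix")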


MWE (reproducing the error with mllib.linalg):

from pyspark.mllib.linalg import VectorUDT, SparseVector

df = spark.createDataFrame([
    (1.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (0.0, SparseVector(4, {1: 1.0, 2: 5.5}))
], ['labels', 'features'])

df.show():

+------+-------------------+
|labels|           features|
+------+-------------------+
|   1.0|(4,[1,3],[1.0,5.5])|
|   0.0|(4,[1,2],[1.0,5.5])|
+------+-------------------+

{s.name: type(s.dataType) for s in df.schema}:

{'features': pyspark.mllib.linalg.VectorUDT,
 'labels': pyspark.sql.types.DoubleType}

and after executing:

df.write.format(
    "sagemaker"
).option(
    "labelColumnName", "labels"
).option(
    "featuresColumnName", "features"
).save(
    my_path
)

log:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-61-58bc3a4fb49d> in <module>()
      6     "featuresColumnName", "features"
      7 ).save(
----> 8     my_path
      9 )

/usr/lib/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    736             self._jwrite.save()
    737         else:
--> 738             self._jwrite.save(path)
    739 
    740     @since(1.4)

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o8427.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 63 in stage 309.0 failed 4 times, most recent failure: Lost task 63.3 in stage 309.0 (TID 15086, ip-10-128-40-43.eu-west-1.compute.internal, executor 19): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: (2,[1,2],[1.0,5.5]) (of class org.apache.spark.mllib.linalg.SparseVector)
	at com.amazonaws.services.sagemaker.sparksdk.protobuf.ProtobufConverter$.rowToProtobuf(ProtobufConverter.scala:69)
	at com.amazonaws.services.sagemaker.sparksdk.protobuf.SageMakerProtobufWriter.write(SageMakerProtobufWriter.scala:84)
	at com.amazonaws.services.sagemaker.sparksdk.protobuf.SageMakerProtobufWriter.write(SageMakerProtobufWriter.scala:72)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
	... 10 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 32 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: scala.MatchError: (2,[1,2],[1.0,5.5]) (of class org.apache.spark.mllib.linalg.SparseVector)
	at com.amazonaws.services.sagemaker.sparksdk.protobuf.ProtobufConverter$.rowToProtobuf(ProtobufConverter.scala:69)
	at com.amazonaws.services.sagemaker.sparksdk.protobuf.SageMakerProtobufWriter.write(SageMakerProtobufWriter.scala:84)
	at com.amazonaws.services.sagemaker.sparksdk.protobuf.SageMakerProtobufWriter.write(SageMakerProtobufWriter.scala:72)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
	... 10 more


And here is the same flow once the vectors are created with ml.linalg:

from pyspark.ml.linalg import VectorUDT, SparseVector

df = spark.createDataFrame([
    (1.0, SparseVector(4, [1, 3], [1.0, 5.5])),
    (0.0, SparseVector(4, [1, 2], [1.0, 5.5]))
], ['labels', 'features'])
df.show()

+------+-------------------+
|labels|           features|
+------+-------------------+
|   1.0|(4,[1,3],[1.0,5.5])|
|   0.0|(4,[1,2],[1.0,5.5])|
+------+-------------------+

{s.name: type(s.dataType) for s in df.schema}
->
{'features': pyspark.ml.linalg.VectorUDT,
 'labels': pyspark.sql.types.DoubleType}

df.write.format(
    "sagemaker"
).option(
    "labelColumnName", "labels"
).option(
    "featuresColumnName", "features"
).save(
    my_path
)

everything went OK and the data was written to S3.


My suggestion is to change/update the imports used as an example in that section - it may cause some confusion :)
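Concretely, the Python-side example in that section could use the ml imports, something along these lines (a sketch, not the exact README wording):

# ml.linalg, not mllib.linalg, is what the "sagemaker" write format expects
from pyspark.ml.linalg import SparseVector, DenseVector, VectorUDT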
