Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guide/readme/example for using with AWS Glue ETL job #82

Open
leeprevost opened this issue Feb 28, 2024 · 12 comments
Open

Guide/readme/example for using with AWS Glue ETL job #82

leeprevost opened this issue Feb 28, 2024 · 12 comments

Comments

@leeprevost
Copy link

I wonder if you could make suggestions on how to use this in an AWS glue job. My method does not involve using spark-submit but rather creating job definitions and run-job using boto3 tools.

When I try to use this in my script, i get:
pyspark.sql.utils.IllegalArgumentException: Compression codec nl.basjes.hadoop.io.compress.SplittableGzipCodec not found.

have tried passing --conf nl.basjes.hadoop.io.compress.SplittableGzipCodec, -packages nl.basjes.hadoop.io.compress.SplittableGzipCodec and other methods as args to job to no avail. I think I must need to put a copy of the codec on s3 and point to it with extra-files or other arg?

@nielsbasjes
Copy link
Owner

First off: I have never used AWS at all so I have no experience with any of the tools you mention. In addition my Python and Spark knowledge is negligible. I simply use different tools.

Now this library was written in Java and explicitly only hooks into specific Hadoop APIs which are also used by Spark.

See: https://github.com/nielsbasjes/splittablegzip/blob/main/README-Spark.md

So to use this you will need:

  • Some JVM based framework that uses the Hadoop APIs for reading compressed files (for example MapReduce and Spark).
    • I see pyspark in your error so I suspect that should work.
  • Make sure that this code is available in the JVM that actually does the work (i.e. this jar must be made available to the JVM code running AWS).
    • I have no idea on how to achieve this in your case. Normally it would be added as a dependency to the application, but since you are doing Python I have no clue on how to do this.
  • And you must configure the Hadoop code to not use the default Gzip codec and use this implementation instead.
    • Specifically the io.compression.codecs must be nl.basjes.hadoop.io.compress.SplittableGzipCodec

This is all I have for you.
Good luck in getting it to work.

If you have figured it out I'm willing to add your insights to the documentation.

@leeprevost
Copy link
Author

Thanks for the response. I think I am good on your bullet 1 and 3 within my scripts (yes, using pyspark). But, on item 2, I'm struggling with the following:

AWS Glue requires passing an --extra-jars flag and an s3 path to the "jars." So, I'm developing these scripts in python using pyspark. i use windows. So, not familiar with java, "jars" or even Maven, at all. My assumption is that Maven is to java as "pip" is to python. I don't think Glue will install from the maven repo so I think I need to download the "jars" files to my s3 path and just point to them.

I see the java in your repo but am not sure how to determine what I need to satifsy AWS Glue's "--extra-jars" option. Does that make sense?

@leeprevost
Copy link
Author

Further, from AWS docs,
--extra-jars
The Amazon S3 paths to additional Java .jar files that AWS Glue adds to the Java classpath before executing your script. Multiple values must be complete paths separated by a comma (,).

Again, thank you for your help.

@leeprevost
Copy link
Author

So, even further reducing my question, I think all I need is to get some of these (which) on my s3 and add the extra jars path. Again, don't have maven installed on windows machine and have zero java experience. do I need them all?

https://repo1.maven.org/maven2/nl/basjes/hadoop/splittablegzip/1.3/

@nielsbasjes
Copy link
Owner

Maven is just a tool to manage the build process of a Java based system. A jar is nothing more than a file with some "ready to run" java code.

The maven.org site is just a site where you can reliably download such jar files.

For you project you only need to download this single file and put it on S3
https://repo1.maven.org/maven2/nl/basjes/hadoop/splittablegzip/1.3/splittablegzip-1.3.jar
That is the file you specify under extra jars.

@leeprevost
Copy link
Author

Thank you. I was making it much harder than necessary. Am testing now and will report back.

@leeprevost
Copy link
Author

Ok, I added these two parameters to my jobs definition:

 '--extra-jars': "s3://aws-glue-assets-[my account num]-us-east-1/jars/",   # path to the splittablegzip-1.3.jar file
 '--user-jars-first': "true",

I then added this to my script:
spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
And used this as my read/load:

options = {
            "sep" :'\t',
            "header" : 'false',
            "io.compression.codecs ": 'nl.basjes.hadoop.io.compress.SplittableGzipCodec'

        }
        df = spark.read.load(
            input_paths,
            format="csv",
            schema=schema,
            **options,
        )
       

in my logs, I see the job starts and reports passed args and it reports my show statment from my lazy load. But, when it goes to write the resulting large file, the job shuts down with a series of warnings and then a shut down failure:

in reverse order

24/02/29 19:43:39 ERROR ExecutorTaskManagement: threshold for executors failed after launch reached
24/02/29 19:43:39 WARN ExecutorTaskManagement: executor status Success(FAILED) for 10 g-fab6a5b5deb98bda8ca701f28cfc65a98dfa965d
24/02/29 19:43:39 WARN ExecutorTaskManagement: executor status Success(FAILED) for 9 g-8d96cb3851173fc44c5684822792da8e639f5f73
24/02/29 19:43:39 INFO TaskGroupInterface: getting status for executor task g-fab6a5b5deb98bda8ca701f28cfc65a98dfa965d
24/02/29 19:43:39 INFO ExecutorTaskManagement: polling for executor task status
24/02/29 19:43:39 INFO ExecutorTaskManagement: polling 2 pending JES executor tasks for status
24/02/29 19:43:39 INFO TaskGroupInterface: getting status for executor task g-8d96cb3851173fc44c5684822792da8e639f5f73
24/02/29 19:43:39 INFO JESSchedulerBackend: polling for JES task status
24/02/29 19:43:35 INFO MultipartUploadOutputStream: close closed:false s3://aws-glue-assets-997744839392-us-east-1/sparkHistoryLogs/spark-application-1709235518164.inprogress
24/02/29 19:43:35 INFO MultipartUploadOutputStream: close closed:false s3://aws-glue-assets-997744839392-us-east-1/sparkHistoryLogs/jr_e7e6cc777685fee21dca25913f3be5ef4c6f024ffb843747f30004c11cb0e5c6.inprogress
24/02/29 19:43:35 INFO LogPusher: uploading /tmp/spark-event-logs/ to s3://aws-glue-assets-997744839392-us-east-1/sparkHistoryLogs/
24/02/29 19:43:31 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/02/29 19:43:16 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

I dont' think I've changed anything else in my script which successfully runs (but very slow due to large gz files). Now seems to not run. I suspect:

  1. Am not successfully passing the right pointer to the new codec? Not sure how nl.basjes.hadoop.io.compress.SplittableGzipCode gets resolved to the splittablegzip-1.3.jar file?
  2. Searching this error, looks like mostly arond OOM errors. But, surely if it ran without the splittable codec, it would run better and within memory limits now with the parallelization.
  3. spark does seem to run up until the write so doesn't appear to be something on the init.

@leeprevost
Copy link
Author

OK, am reporting back that I commented out the changes above and script is running fine but with everything loaded on one executor ,not parallelization, and slow! So, something about the write statement that causes job to fail but using same write

        (df.write
         .partitionBy(*partitions.keys())
         .option("compression", "gzip")
         .option("path", output_paths[parqdb])
         .option("maxRecordsPerFile", "100000000")
         .mode("overwrite")
         .format("parquet")
         .save()
         ) 

have tries with and without the maxRecordsPerFile option

@leeprevost
Copy link
Author

Thinking about this some more:
Am wondering if the last post on this thread on the spark jira is the answer:

https://issues.apache.org/jira/browse/SPARK-29102

or, AWS glue has a capaqbility to install python libraries using PIP, the equivalent of Maven. But don't see a similar capability to kickoff a maven install. Only the way to pass the jar file using --extra-jars like above. However, am seeing some things where for exmaple Glue can be configured for a delta lake using spark.conf settings.

I don't use "spark-submit" for an aws glue job so therefore can't pass -- package arg.

Hoping you see anything in this madness!

@leeprevost
Copy link
Author

This looks promising.

so question

again, I see I need extra jars with pointer to the jar file on s3. No problem there. But in the config statement, I can pass what your guide says to pass to —packages. But again, I don’t see how the two resolve.

@leeprevost
Copy link
Author

leeprevost commented Mar 1, 2024

OK, I think I'm getting very close but job still failing on my read statement with:

raise error_class(parsed_response, operation_name)
botocore.errorfactory.IllegalSessionStateException: An error occurred (IllegalSessionStateException) when calling the GetStatement operation: Session bd9fb206-9f6b-49d6-897a-c30e0771e0fc unavailable, fail to call ReplServer

On startup, my spark session seems to initialize properly including recognition of my jar files directory:

Current idle_timeout is None minutes.
idle_timeout has been set to 120 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 3
Extra jars to be included:
s3://aws-glue-assets-XXXXXXXXXX-us-east-1/jars/
s3://aws-glue-assets-XXXXXXXXXX-us-east-1/jars/
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 3
Session ID: bd9fb206-9f6b-49d6-897a-c30e0771e0fc
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
--user-jars-first true
--extra-jars s3://aws-glue-assets-XXXXXXXXXXXXX-us-east-1/jars/
Waiting for session bd9fb206-9f6b-49d6-897a-c30e0771e0fc to get into ready status...
Session bd9fb206-9f6b-49d6-897a-c30e0771e0fc has been created.

The s3 pointer on the --extra-jars flag is where I have uploaded splittablegzip-1.3.jar

I then attempt to set config but get an error:

spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
spark.conf.set('spark.jars.packages', 'io.compression.codecs:nl.basjes.hadoop.io.compress.SplittableGzipCodec')

>>AnalysisException:  Cannot modify the value of a Spark config: spark.jars.packages. See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements' 

And, when I run the read statement, I get the error above:


options = {
            "sep" :'\t',
            "header" : 'false',
            "io.compression.codecs": 'nl.basjes.hadoop.io.compress.SplittableGzipCodec'

        }
df = spark.read.load(
    sm_file,
    format="csv",
   # schema=schema,
    **options,
).count()

so, its got to be something wrong with my second conf.set statement above ....

@leeprevost
Copy link
Author

Related post for help SO

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants