
pyspark writing jdbc times out

So basically I am using pyspark (jdbc format) to read tables from a database and then write that data to an Azure Data Lake. The code that I’ve written works, except for very large tables (400k rows, 50 cols), which fail with the following error:

Py4JJavaError: An error occurred while calling o94.parquet.
: org.apache.spark.SparkException: Job aborted.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 2.0 failed 1 times, most recent failure: Lost task 23.0 in stage 2.0 (TID 25, 2612a419099c, executor driver): com.microsoft.sqlserver.jdbc.SQLServerException: SQL Server returned an incomplete response. The connection has been closed.

I assumed this issue was caused by insufficient memory, so I increased the executor and driver memory to 10g each. However, the problem persisted. Here is my code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month

# Spark config for ADLS Gen2 (abfss) access via shared key
spkConfig = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) \
    .set(f"fs.azure.account.auth.type.{azStorageAccount}.dfs.core.windows.net", "SharedKey") \
    .set(f"fs.azure.account.key.{azStorageAccount}.dfs.core.windows.net", azStorageKey) \
    .set("spark.executor.memory", "10g") \
    .set("spark.driver.memory", "10g") \
    .set("spark.cores.max", "5")
spkContext = SparkContext(conf=spkConfig)
sqlContext = SQLContext(spkContext)
spark = sqlContext.sparkSession

##
# Read table from DB to Dataframe partitioned on ID
##
def read_data(tableName, partitionCol, dbQuery, partitionSize, partitionUpperBound):  
    jdbcDF = spark.read.format("jdbc") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", f"jdbc:sqlserver://{dbHost}:{dbPort};databaseName={dbDatabase}") \
    .option("dbtable", dbQuery) \
    .option("user", dbUsername) \
    .option("password", dbPassword) \
    .option("queryTimeout", 10) \
    .option("numPartitions", partitionSize) \
    .option("partitionColumn", partitionCol) \
    .option("lowerBound", 1) \
    .option("upperBound", partitionUpperBound) \
    .load()

    return jdbcDF

##
# Write Dataframe as automatically partitioned parquet files for each month
##
def write_data(tableName, tableDF, partitionCol):
    tableDF \
    .withColumn("year", year(partitionCol)) \
    .withColumn("month", month(partitionCol)) \
    .write.mode('overwrite')\
    .partitionBy('year', 'month') \
    .parquet(f"abfss://{azStorageContainer}@{azStorageAccount}.dfs.core.windows.net/" + tableName + ".parquet")
