Categories
Mastering Development

Error while loading data from a BigQuery table to a Dataproc cluster

I'm new to Dataproc and PySpark and am facing issues while loading a BigQuery table into a Dataproc cluster via the JupyterLab API. Below is the code I used to load the BigQuery table into the cluster through a Jupyter notebook, but I get an error while loading the table: from pyspark.sql import SparkSession SparkSession.builder.appName('Jupyter […]
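A minimal sketch of how such a read is typically wired up with the spark-bigquery connector; the connector version and the table id below are assumptions, not values from the post:

from pyspark.sql import SparkSession

# Supply the BigQuery connector if it is not already installed on the Dataproc cluster.
spark = (SparkSession.builder
         .appName("JupyterBigQuery")
         .config("spark.jars.packages",
                 "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.17.3")
         .getOrCreate())

# Read the BigQuery table into a Spark dataframe (hypothetical table id).
df = (spark.read.format("bigquery")
      .option("table", "my_project.my_dataset.my_table")
      .load())

df.printSchema()
df.show(5)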

Categories
Mastering Development

How to configure Apache Spark 2.4.5 to connect to the MySQL metastore of Hive?

I am trying to run a Hive query using a HiveContext object and am receiving the following error: Py4JJavaError Traceback (most recent call last) /usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 62 try: --> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: /usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, […]
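A hedged sketch of how a Spark 2.4.x session is usually pointed at an existing Hive metastore; the thrift URI and warehouse path are placeholders, not values from the post:

from pyspark.sql import SparkSession

# Enable Hive support and point Spark at the running metastore service.
spark = (SparkSession.builder
         .appName("HiveMetastore")
         .config("hive.metastore.uris", "thrift://metastore-host:9083")   # placeholder URI
         .config("spark.sql.warehouse.dir", "/user/hive/warehouse")       # placeholder path
         .enableHiveSupport()
         .getOrCreate())

spark.sql("SHOW DATABASES").show()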

Categories
Mastering Development

PySpark dataframe shows an error while displaying the dataframe contents

I am using Spark 2.3.2 and PySpark to read from Hive. Here is my code: from pyspark import SparkContext from pyspark.sql import SQLContext sql_sc = SQLContext(sc) SparkContext.setSystemProperty("hive.metastore.uris", "thrift://17.20.24.186:9083") df=sql_sc.sql("SELECT * FROM mtsods.model_result_abt") df.show() ## here is where the error occurs Whenever I try to display the contents of the dataframe, an error occurs […]
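One frequent cause is setting the metastore property after the context already exists; a sketch of the usual ordering, reusing the URI and table name from the post only as examples:

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Set the metastore URI before any context or session is created,
# then enable Hive support on the session itself.
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://17.20.24.186:9083")
spark = (SparkSession.builder
         .appName("ReadFromHive")
         .enableHiveSupport()
         .getOrCreate())

df = spark.sql("SELECT * FROM mtsods.model_result_abt")
df.show()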

Categories
Development

Microsoft Azure Spark Kusto connector: is it possible to access Azure Storage files from Databricks?

I'm trying to read and write files in Azure Storage; here are my attempts so far. Creating the Spark session: import pyspark from pyspark.sql import SparkSession from pyspark.sql import SQLContext sparkOptions = {"executor_memory": "1G", "driver_memory": "1G", "max_results_size": "1G"} conf = pyspark.SparkConf().setAppName(app) conf = (conf.setMaster("local[*]") .set('spark.executor.memory', sparkOptions["executor_memory"])\ .set('spark.driver.memory', sparkOptions["driver_memory"])\ .set('spark.driver.maxResultSize', sparkOptions["max_results_size"])\ .set('spark.sql.crossJoin.enabled', "true")\ .set('spark.jars.packages', 'com.microsoft.azure.kusto:spark-kusto-connector:1.0.0')\ .set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")\ .set("fs.azure.account.auth.type", […]
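A minimal sketch of reading a file from Azure Blob Storage over the wasbs:// scheme once a session exists; the storage account, container, key, and path are all hypothetical, and this assumes the Azure Hadoop filesystem classes (bundled on Databricks) are on the classpath:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AzureBlobRead").getOrCreate()

# Register the storage account key (placeholder account name and key).
spark.conf.set(
    "fs.azure.account.key.mystorageaccount.blob.core.windows.net",
    "<storage-account-key>")

# Read a CSV file from the container over wasbs://.
df = (spark.read
      .option("header", "true")
      .csv("wasbs://mycontainer@mystorageaccount.blob.core.windows.net/path/to/file.csv"))

df.show(5)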

Categories
Development

Error in count after a filter on PySpark RDDs

My code: import findspark findspark.init() from pyspark.sql import SparkSession from pyspark import SparkContext sc = SparkContext(appName="test").getOrCreate() rdd = sc.textFile("sample.txt") filter_words = rdd.map(lambda line: line.split(":")[-1]) \ .flatMap(lambda line: line.split(" "))\ .filter(lambda words: str(words)[0] in ["A", "E", "I", "O", "U"] ) filter_words.take(3) gives ['AccessibleComputing', 'Anarchism', 'AfghanistanHistory'] but filter_words.count() gives an error as follows: <ipython-input-21-1f9bb262f4d6> in <module> ----> […]
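A likely explanation is that take(3) only evaluates a handful of records, while count() forces every record through the lambda, so an empty token makes str(words)[0] raise an IndexError. A sketch of the same pipeline with a guard against empty tokens (sample.txt as in the post):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.textFile("sample.txt")

filter_words = (rdd.map(lambda line: line.split(":")[-1])
                   .flatMap(lambda line: line.split(" "))
                   # Check the token is non-empty before indexing its first character.
                   .filter(lambda w: len(w) > 0 and w[0] in ["A", "E", "I", "O", "U"]))

print(filter_words.take(3))
print(filter_words.count())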

Categories
Development

How to use Kafka data source in Jupyter notebook?

I use a Jupyter notebook to run the following streaming query using Spark Structured Streaming. The script throws a StreamingQueryException. How could I fix this? Here are the versions of the programs that I use: Spark: 2.4.3 Scala: 2.11 Kafka: 2.1.1 I also used the additional jar files spark-sql-kafka-0-10_2.11-2.4.3.jar and kafka-clients-2.1.1.jar. Here is the Spark consumer code: from pyspark.sql […]
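One common fix is to let Spark resolve the Kafka source through spark.jars.packages (matching the Spark and Scala versions listed above) before the session is created; the broker address and topic name below are placeholders:

from pyspark.sql import SparkSession

# Pull in the Kafka source matching Spark 2.4.3 / Scala 2.11.
spark = (SparkSession.builder
         .appName("KafkaStructuredStreaming")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3")
         .getOrCreate())

# Subscribe to a topic (placeholder broker and topic).
stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "my_topic")
          .load())

# Cast the message payload to a string and print batches to the console.
query = (stream.selectExpr("CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())
query.awaitTermination()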

Categories
Development

Spark refuses to create an empty dataframe when using PyArrow

I want to create an empty dataframe out of an existing Spark dataframe. I use PyArrow support (enabled in the Spark conf). When I try to create an empty dataframe out of an empty RDD with the same schema as my existing dataframe, I get a java.lang.NegativeArraySizeException. Here is the entire code to reproduce the error […]
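A workaround that sidesteps converting an empty RDD through Arrow is to build the empty dataframe directly from an empty list plus the existing schema; a sketch with a stand-in dataframe, since the original code is truncated:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EmptyDF").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Stand-in for the existing dataframe from the post.
existing_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# Build the empty frame from an empty list instead of an empty RDD.
empty_df = spark.createDataFrame([], schema=existing_df.schema)
empty_df.printSchema()
print(empty_df.count())  # 0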

Categories
Development

reduceByKey with multiple values

Each value in my RDD is a tuple: temp = clustering.map(lambda x: (x[0][0], (1,1))) temp.take(10) [(0, (1, 1)), (0, (1, 1)), (6, (1, 1)), (0, (1, 1)), (0, (1, 1)), (0, (1, 1)), (0, (1, 1)), (0, (1, 1)), (7, (1, 1)), (0, (1, 1))] Then I tried to reduce it by key: temp.reduceByKey(lambda a,b: (a[1]+b[1])).collect() […]
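For a per-key element-wise sum, the reduce function must return a tuple of the same shape as its two inputs rather than a single number; a self-contained sketch using a few of the pairs shown above:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Toy stand-in for the (key, (1, 1)) pairs produced by the map above.
temp = sc.parallelize([(0, (1, 1)), (0, (1, 1)), (6, (1, 1)), (7, (1, 1))])

# Sum both tuple elements per key instead of indexing into only one of them.
counts = temp.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(counts.collect())  # e.g. [(0, (2, 2)), (6, (1, 1)), (7, (1, 1))] (order may vary)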