Apache Spark - Structured API

Apache Spark's Structured API is a high-level programming interface that enables users to manipulate and analyze structured and semi-structured data in a distributed computing environment. It is built on top of the Spark Core engine, providing a user-friendly abstraction layer. Developers write against the Structured API in high-level languages such as Scala and Python, and Spark translates that code into an optimized execution plan and, ultimately, low-level JVM bytecode that is executed across the cluster. The Structured API includes three main components:-

DataFrames:-

DataFrames are the most commonly used component of the Structured API. They are distributed collections of data organized into named columns. DataFrames can be created from a variety of sources, such as structured data files, Hive tables, and external databases.

Datasets:-

Datasets are an extension of DataFrames, and provide type-safety and object-oriented programming features. Datasets are strongly-typed collections of domain-specific objects that can be processed in a distributed environment.

SQL:-

The Structured API also includes a SQL interface for working with data using SQL queries. SQL queries can be executed on DataFrames and Datasets, making it easy to leverage existing SQL skills and tools.

RDDs vs DataFrames vs Datasets:-

RDDs, DataFrames, and Datasets are all abstractions in Apache Spark for processing data in a distributed computing environment.

RDDs are the basic building block of Spark and provide a low-level API for distributed data processing. They are resilient, distributed, and can be processed in parallel.

DataFrames are a higher-level abstraction built on top of RDDs that provide a more convenient and optimized API for working with structured and semi-structured data. They offer a schema-based approach, similar to a table in a relational database.

Datasets, introduced in Spark 1.6, provide a strongly typed, object-oriented programming interface, similar to RDDs. However, unlike RDDs, Datasets also incorporate the performance optimizations and ease of use provided by DataFrames.

In general, RDDs are more flexible and can be used for arbitrary data types and processing tasks, while DataFrames and Datasets provide more structured and optimized APIs for specific types of data processing tasks.

Datasets provide compile-time type safety, while DataFrames do not.

Challenges/limitations:-

RDDs, DataFrames, and Datasets are all core abstractions in Apache Spark that provide different levels of abstraction for processing data in distributed environments. RDDs provide a low-level, flexible API that leaves optimization largely to the developer and lacks the performance optimizations of higher-level abstractions like DataFrames and Datasets. DataFrames and Datasets provide higher-level abstractions with built-in optimization techniques like query optimization, predicate pushdown, and code generation. Datasets provide additional compile-time type safety over DataFrames, but require the structure of the data to be defined up front, typically via a case class. Each abstraction has its own set of challenges, such as the difficulty of debugging and optimizing complex RDD transformations, the challenges of working with nullable data in DataFrames, and the complexity of dealing with type safety and serialization issues in Datasets.

Preferred choice - DataFrames vs Datasets:-

The choice between DataFrames and Datasets depends on the specific needs and use cases of your application. However, in many cases, DataFrames are more commonly used than Datasets because they offer a good balance of ease of use and performance.

While there can be overhead involved in converting DataFrames to Datasets, this can be offset by the benefits that Datasets offer, such as compile-time safety, stronger typing, and functional programming constructs. DataFrames keep their data in Spark's efficient Tungsten binary format, whereas Datasets rely on encoders to convert between JVM objects and that binary format, and this conversion adds serialization overhead.

Using Datasets helps cut down on developer mistakes, but comes at the extra cost of converting rows to domain objects and the more expensive serialization that goes with it.

Spark Session:-

SparkSession is the primary interface for working with data in Spark, serving as a unified entry point for data processing. It allows users to create RDDs, DataFrames, and Datasets, as well as execute SQL queries and read/write data from different sources. A SparkSession is typically created once per application using a builder pattern that can be configured with various settings and options. These settings include the master URL, application name, and other Spark configurations.

import org.apache.spark.sql.SparkSession

object structured_api_1 extends App {

  // Create a SparkSession object with the given configuration
  val spark = SparkSession.builder()
              .appName("my first app")
              .master("local[*]")  // Run Spark locally using all available cores
              .getOrCreate()  // If a SparkSession already exists, return it; otherwise create a new one

  // Do some Spark operations here...

  // Stop the SparkSession object to free up resources
  spark.stop()

}

Another way of creating a SparkSession:-

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object api_2 extends App {

  val sparkConf = new SparkConf()

  sparkConf.set("spark.app.name", "my app 1")
  sparkConf.set("spark.master", "local[*]")

  val spark = SparkSession.builder().
              config(sparkConf).
              getOrCreate()

  spark.stop()
}

Loading data - DataFrame reader API:-

Once a SparkSession is created - we can use it to read data from various sources, such as CSV files, JSON files, Parquet files, and more. We can also use it to perform various operations on the data, such as filtering, grouping, aggregating, and joining, using the Spark SQL API or the DataFrame API. Finally, we can also use SparkSession to write the processed data to different output formats, such as CSV, JSON, Parquet, and more.

Although Spark can read directly from external sources such as relational databases, doing so is not always efficient for very large tables, and there may be cases where you need to load the data into HDFS before processing it with Spark. In such cases, you can use Sqoop, a tool that efficiently transfers data between external databases and Hadoop.

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.log4j.Level
import org.apache.log4j.Logger

// Define a new object named "api_3" that extends the App trait
object api_3 extends App {

  // Set the log level to ERROR for the "org" logger
  Logger.getLogger("org").setLevel(Level.ERROR)

  // Create a new SparkConf object and set the application name and master URL
  val sparkConf = new SparkConf()
  sparkConf.set("spark.app.name", "my first app")
  sparkConf.set("spark.master", "local[*]")

  // Create a new SparkSession object using the SparkConf
  val spark = SparkSession.builder().
              config(sparkConf).
              getOrCreate()

  // Shorthand: read the CSV file directly into a DataFrame
  val df0 = spark.read.
            option("header", true).
            option("inferSchema", true).
            csv("/C:/Users/gvpre/Downloads/orders-201019-002101.csv")

  // Standard way: specify the format explicitly and call load()
  val df1 = spark.read.
            format("csv").
            option("header", true).
            option("inferSchema", true).
            option("path", "/C:/Users/gvpre/Downloads/orders-201019-002101.csv").
            option("mode", "permissive").
            load()

  // Print the schema of the DataFrame to the console
  df1.printSchema()

  // Show the first 20 rows of the DataFrame in the console
  df1.show()

  // Stop the SparkSession
  spark.stop()

}

DataFrame reader API - Read modes:-

The DataFrame reader API in Spark provides three read modes to handle data sources that may contain malformed records. The "Permissive" mode, which is the default, reads all records from the source and sets any fields it cannot parse to null. The "DropMalformed" mode drops any records that cannot be parsed or contain malformed data. The "FailFast" mode immediately fails the read operation upon encountering any malformed record.

Any of these modes can be combined with schema inference or an explicitly supplied schema, which gives flexibility in handling data sources with different levels of data quality.
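
For example, switching the earlier read to failFast mode is just a matter of setting the mode option (a minimal sketch reusing the same file path; dropMalformed and permissive are set the same way):

// Fail the whole read as soon as a single malformed record is encountered
val strictDf = spark.read.
               format("csv").
               option("header", true).
               option("inferSchema", true).
               option("mode", "failFast").
               option("path", "/C:/Users/gvpre/Downloads/orders-201019-002101.csv").
               load()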

Note:-

Using inferSchema is not always recommended, especially when working with large datasets or when the schema of the data is complex or ambiguous, since inferring the schema requires an extra pass over the data. Instead, it is recommended to define the schema explicitly using a StructType object or a case class, as this gives better performance and avoids potential errors that may arise from inferring the schema.

Converting a DataFrame to a Dataset using a case class:-

// Import necessary Spark libraries
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.log4j.Level
import org.apache.log4j.Logger
import java.sql.Timestamp

// Define the name of the Scala object
object api_5 extends App {

  // Set log level to ERROR
  Logger.getLogger("org").setLevel(Level.ERROR) 

  // Configure Spark
  val sparkConf = new SparkConf()
  sparkConf.set("spark.master", "local[2]")  // Run Spark locally on 2 cores
  sparkConf.set("spark.app.name", "app 2")  // Set the name of the Spark application

  // Create a SparkSession
  val spark = SparkSession.builder().
              config(sparkConf).
              getOrCreate()

  // Define a case class to represent the orders data
  case class OrdersData (
    order_id: Int,
    order_date: Timestamp,
    order_customer_id: Int,
    order_status: String
  )


  // Read the orders data from a CSV file and infer the schema
  val df1 = spark.read.
          option("header", true).
          option("inferSchema", true).
          csv("/C:/Users/gvpre/Downloads/orders-201019-002101.csv")

  // Import SparkSession implicits
  import spark.implicits._

  // Convert the DataFrame to a Dataset of ordersData
  val ds1 = df1.as[OrdersData]

  // Show the contents of the Dataset
  ds1.show()  
}

Defining an explicit schema using StructType and a DDL string:-

// Programmatic style using a StructType
import org.apache.spark.sql.types._

val ordersSchema1 = StructType(List(
  StructField("order_id", IntegerType),
  StructField("order_date", TimestampType),
  StructField("order_customer_id", IntegerType),
  StructField("order_status", StringType)
))

val df1 = spark.read.
          format("csv").
          option("header", true).
          schema(ordersSchema1).
          load("path/to/csv/file.csv")

// DDL-string style
val ordersSchema2 = "order_id Int, order_date String, order_customer_id Int, order_status String"

val df2 = spark.read.
          format("csv").
          option("header", true).
          schema(ordersSchema2).
          load("path/to/csv/file.csv")

DataFrame writer API:-

df.write.
format("csv").
mode(SaveMode.Overwrite).
option("path", "path/folder").
save()

DataFrame writer API - save modes:-

overwrite: Overwrite the existing data at the target location.

append: Append the new data to the existing data at the target location.

ignore: Do not write the new data if the target location already has data.

error or errorifexists: Throw an exception if the target location already has data. This is the default mode.

Spark file layout:-

If we want the data to be written to a specific number of files, we can use the repartition method to change the number of partitions of the DataFrame before invoking the writer API. Note that the number of files created will equal the number of partitions of the DataFrame. This method is not always preferred because it involves a full shuffle, and since the output is not organized by column values, partition pruning is not possible when the data is read back.
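
A minimal sketch, assuming the df1 orders DataFrame read earlier (the partition count and output path are illustrative; SaveMode comes from org.apache.spark.sql.SaveMode):

// Produce exactly 4 output files by repartitioning before the write
df1.repartition(4).
    write.
    format("csv").
    mode(SaveMode.Overwrite).
    option("path", "output/orders_four_files").
    save()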

We also have the following options to control the file layout while using the DataFrame writer API:-

partitionBy:-

partitionBy is a method in Spark's DataFrameWriter API that allows you to specify the column(s) to partition the output by when writing data to a file. Partitioning is a way of horizontally dividing data into separate directories or files based on the values of one or more columns. This can improve query performance by allowing Spark to skip reading irrelevant data.

val df = Seq(
  ("Alice", "2019", "January", 100),
  ("Bob", "2020", "February", 200),
  ("Charlie", "2020", "February", 300),
  ("David", "2021", "March", 400)
).toDF("name", "year", "month", "amount")

df.write
  .partitionBy("year", "month")
  .parquet("/path/to/output/directory")

bucketBy:-

The bucketBy method in Spark's DataFrameWriter API hashes the values of the specified column(s) into a fixed number of buckets and writes each bucket into its own set of files within the table directory. Unlike partitionBy, it only works together with saveAsTable, and it is useful for optimizing joins and aggregations on the bucketed column and for managing large datasets.

df.write.bucketBy(4, "order_customer_id").mode(SaveMode.Overwrite).saveAsTable("retail.orders")

MaxRecordsPerFile:-

The maxRecordsPerFile option in Spark's DataFrameWriter API sets the maximum number of records to be written per file. It helps to manage the size of output files and prevents individual files from becoming too large, which can hurt downstream read performance and put pressure on the file system.

df.write.format("csv").option("maxRecordsPerFile", 1000).save("path/to/directory")

Handling unsupported file formats:-

If we want to read or write data in a format (for example Avro or XML) that Spark does not support out of the box, we have to download and add the corresponding package or JAR file to the classpath.
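
As an illustration, writing Avro works once the external spark-avro module has been added (for example via the --packages option of spark-submit or as a build dependency); the output path here is made up:

// Only works when the spark-avro package is on the classpath
df1.write.
    format("avro").
    mode(SaveMode.Overwrite).
    option("path", "output/orders_avro").
    save()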

Saving data to tables:-

We can use the Writer API to save the data into a persistent storage system in the form of a table. The table created through the Writer API has two components: data and metadata.

The data component is stored in the Hadoop Distributed File System (HDFS) or the local file system, depending on the configuration. By default, the data is stored in the directory specified by the spark.sql.warehouse.dir configuration property. The metadata component, on the other hand, is stored in the catalog metastore. By default, the metadata is stored in an in-memory catalog, which means that the metadata will be lost when the application terminates. However, we can also configure Spark to use a persistent metastore, such as Hive Metastore, to store the metadata permanently.

// Import required classes and packages
import java.sql.Timestamp
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

// Define an object with the main function
object api_7 extends App {

  // Set the logging level to ERROR for the 'org' logger
  Logger.getLogger("org").setLevel(Level.ERROR)

  // Create a SparkConf object and set some configuration properties
  val sparkConf = new SparkConf()
  sparkConf.set("spark.master", "local[2]") // Set the master URL to run on two local threads
  sparkConf.set("spark.app.name", "app 2") // Set the name of the Spark application

  // Create a SparkSession object with the given SparkConf and enable Hive support
  val spark = SparkSession.builder().
              config(sparkConf).
              enableHiveSupport().
              getOrCreate()

  // Read a CSV file into a DataFrame using SparkSession's read method
  val df1 = spark.read.
            format("csv").
            option("header", true).
            option("inferSchema", true).
            option("path", "/C:/Users/gvpre/Downloads/orders-201019-002101.csv").
            load()

  // Create a new database called 'retail4' if it doesn't exist
  spark.sql("create database if not exists retail4")

  // Write the DataFrame to a table called 'retail4.orders' using the CSV format
  // in overwrite mode (if the table already exists, it will be replaced)
  df1.write.
  format("csv").
  mode(SaveMode.Overwrite).
  saveAsTable("retail4.orders")

  // List all the tables in the 'retail4' database and display them in a tabular format
  spark.catalog.listTables("retail4").show() 
}

Spark SQL:-

The Spark SQL module within Apache Spark provides a programming interface for working with structured and semi-structured data using SQL queries. We can create a temporary view from a DataFrame using the createOrReplaceTempView method and then run SQL queries against it.

import java.sql.Timestamp
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


object api_7 extends App {

  Logger.getLogger("org").setLevel(Level.ERROR)


  val sparkConf = new SparkConf()
  sparkConf.set("spark.master", "local[2]")
  sparkConf.set("spark.app.name", "app 2")


  val spark = SparkSession.builder().
              config(sparkConf).
              getOrCreate()


  val df1 = spark.read.
            format("csv").
            option("header", true).
            option("inferSchema", true).
            option("path", "/C:/Users/gvpre/Downloads/orders-201019-002101.csv").
            load()       

  df1.createOrReplaceTempView("orders")

// df2 will be a Dataframe
  val df2 = spark.sql("select order_status, count(*) from orders group by order_status")

  df2.show()

  }

How to refer to a column in a DataFrame/Dataset:-

We can refer to a column in a DataFrame or Dataset in two ways: as a column string or as a column object.

Column String:

We can refer to a column by its name as a string. For example, if you have a DataFrame named df with a column named name, you can refer to that column as "name".

Column Object:

We can also refer to a column using a Column object. This allows us to perform more complex operations on the column, such as applying functions or aggregating data.

Column expression:-

A column expression is a logical expression that represents a computation to be performed on one or more columns in a Spark DataFrame.

import org.apache.spark.sql.functions.{col, column}

// Refer to columns using plain strings
val df_str = df.select("name", "order_id")

// Refer to columns using Column objects
val df_col = df.select(column("order_id"), col("order_date"))

// Refer to columns using column expressions
val df_expr = df.selectExpr("order_id", "order_date", "concat(order_status, '_status')")

Spark execution model:-

Apache Spark's Structured API operates lazily until an action is called. Each action triggers one or more parallel computations called jobs, each of which consists of one or more stages. A stage represents a sequence of tasks that can be executed without shuffling data, and a task corresponds to a partition of data. The number of tasks is equal to the number of partitions, which can be set explicitly or determined automatically by Spark. To improve performance, it is best to minimize the number of shuffles in a Spark job, since shuffling involves expensive data transfers between nodes.
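
As a small sketch using the earlier orders DataFrame: the groupBy below introduces a shuffle, so the job triggered by show() runs as two stages, and the number of tasks in the second stage follows spark.sql.shuffle.partitions:

// Tasks in the first stage == number of partitions of the input DataFrame
println(df1.rdd.getNumPartitions)

// Lower the default of 200 shuffle partitions for a small local run
spark.conf.set("spark.sql.shuffle.partitions", "4")

// show() is the action; the groupBy is the shuffle boundary between stages
df1.groupBy("order_status").count().show()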

Aggregate transformations:-

Simple Aggregations:

These are basic aggregate functions that perform computations on a single column of a DataFrame or RDD. Some examples of simple aggregations are count, sum, avg, min, max, mean, first, and last. These functions take a column as input and return a scalar value as output.
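
A minimal sketch on the orders DataFrame read earlier (column names are assumed from that example; the functions come from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions._

// Collapse the whole DataFrame into a single summary row
df1.select(
    count("*").as("total_orders"),
    countDistinct("order_customer_id").as("distinct_customers"),
    max("order_date").as("latest_order_date")
).show()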

Grouping Aggregations:

Grouping Aggregations are used to group data based on one or more columns and perform aggregations on those groups. Examples of grouping aggregations include groupBy, agg, rollup, cube, pivot, collect_list, collect_set, approx_count_distinct, and percentile_approx. These functions allow you to compute aggregates on subsets of data based on different grouping criteria.
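
Continuing the same sketch, a grouping aggregation on order_status:

// One aggregated row per order_status value
df1.groupBy("order_status").
    agg(
      count("*").as("order_count"),
      countDistinct("order_customer_id").as("distinct_customers")
    ).
    show()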

Window Aggregations:

Window functions are used to perform computations across multiple rows of data within a window or a partition. Examples of window aggregations include rank, dense_rank, percent_rank, cume_dist, lag, lead, ntile, sum, avg, min, max, and count. Window functions allow you to compute values over a sliding window of data, based on a specified partitioning and ordering of rows.
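
And a window aggregation sketch, ranking orders by date within each status (assumes the same DataFrame and the imports shown below):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

// Rank each order within its order_status partition, ordered by order_date
val statusWindow = Window.partitionBy("order_status").orderBy("order_date")

df1.withColumn("rank_in_status", rank().over(statusWindow)).show()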

Joins:-

Simple join:-
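
A simple join matches rows of two DataFrames on a join condition; when both sides are large, Spark shuffles them so that matching keys land on the same executor. A minimal sketch, where customersDf is a hypothetical DataFrame with a customer_id column:

// customersDf is assumed to exist with a customer_id column
val joinedDf = df1.join(
  customersDf,
  df1("order_customer_id") === customersDf("customer_id"),
  "inner"
)
joinedDf.show()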

Broadcast join:-
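
A broadcast join avoids the shuffle by sending a copy of the smaller DataFrame to every executor. Spark does this automatically when one side is below spark.sql.autoBroadcastJoinThreshold, or it can be requested explicitly with the broadcast hint, as in this sketch with the same hypothetical customersDf:

import org.apache.spark.sql.functions.broadcast

// Explicitly broadcast the (small) customers DataFrame to all executors
val broadcastJoinedDf = df1.join(
  broadcast(customersDf),
  df1("order_customer_id") === customersDf("customer_id")
)
broadcastJoinedDf.show()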

Whole-Stage Code Generation (WSCG):-

Spark Whole-Stage Code Generation (WSCG) is an optimization technique that generates optimized code for entire stages of a query to speed up query execution.

When executing a query in Spark, it goes through a series of stages. Each stage has a set of tasks that can be executed in parallel. Normally, the data for each task is passed through multiple operators to perform various transformations before generating the final result. With WSCG, Spark generates optimized code for the entire stage, including all the operators and transformations within it. This optimized code is then compiled and executed on the executor nodes, resulting in significant performance improvements. However, WSCG may not always provide performance improvements, especially for small queries or those with a lot of data shuffling. Therefore, it's important to evaluate query performance before and after enabling WSCG to determine its effectiveness.

WSCG is controlled by the configuration property spark.sql.codegen.wholeStage, which is enabled (set to true) by default in recent Spark versions.
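
As a small sketch, the setting can be made explicit and its effect inspected with explain(): operators fused by whole-stage codegen are prefixed with an asterisk and a codegen stage id in the physical plan:

// Whole-stage codegen is already on by default; this only makes it explicit
spark.conf.set("spark.sql.codegen.wholeStage", "true")

// Fused operators appear as *(1), *(2), ... in the printed physical plan
df1.groupBy("order_status").count().explain()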

Catalyst optimizer:-

Catalyst optimizer is a query optimizer that helps to optimize Spark SQL queries. It is a rule-based optimizer that uses a set of predefined rules to optimize queries. These rules are applied to the query plan to improve performance by reducing the amount of data that needs to be processed.

The optimizer works by generating an initial query plan, and then applying a series of optimization rules to the plan to generate a more efficient plan. These rules include things like predicate pushdown, which pushes filters closer to the data source to reduce the amount of data that needs to be processed, and column pruning, which removes unused columns from the query plan to reduce the amount of data that needs to be transferred over the network.
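
To see these rules in action, explain(true) prints the parsed, analyzed, optimized logical, and physical plans; with a filter and a narrow projection on the earlier orders DataFrame, predicate pushdown and column pruning should be visible in the optimized plan (a sketch):

// Compare the analyzed plan with the optimized plan in the output:
// the filter moves closer to the scan and unused columns disappear
df1.filter("order_status = 'CLOSED'").
    select("order_id", "order_status").
    explain(true)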

In addition to these rules, the Catalyst optimizer also supports cost-based optimization, which estimates the cost of different query plans and selects the plan with the lowest cost.

Overall, the Catalyst optimizer is an important component of Spark that helps to improve the performance of Spark SQL queries by optimizing the query plan.