
Differences between FAILFAST, PERMISSIVE and DROPMALFORMED modes in Dataframes


There are a few differences between them, and we're going to explore them in this post. The mode option is a way to handle corrupted records: depending on the mode you choose, it allows you to validate Dataframes and keep your data consistent.


In this post we'll create a Dataframe with PySpark and compare the differences between these three modes:

  • PERMISSIVE

  • DROPMALFORMED

  • FAILFAST


CSV file content


The content below simulates some corrupted records. There are String values in the engines column, which we'll define as an Integer type in the schema.

"type","country","city","engines","first_flight","number_built"
"Airbus A220","Canada","Calgary",2,2013-03-02,179
"Airbus A220","Canada","Calgary","two",2013-03-02,179
"Airbus A220","Canada","Calgary",2,2013-03-02,179
"Airbus A320","France","Lyon","two",1986-06-10,10066
"Airbus A330","France","Lyon","two",1992-01-02,1521
"Boeing 737","USA","New York","two",1967-08-03,10636
"Boeing 737","USA","New York","two",1967-08-03,10636
"Boeing 737","USA","New York",2,1967-08-03,10636
"Airbus A220","Canada","Calgary",2,2013-03-02,179

Let's start by creating a simple Dataframe that loads the content above from a CSV file; let's suppose it comes from a file called airplanes.csv. To model the content, we're also creating a schema that will allow us to validate the data.
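If you want to follow along, this short snippet writes the sample content to disk (the airplanes.csv name matches the examples below):

```python
# Write the sample airplanes.csv file used throughout this post.
csv_content = """\
"type","country","city","engines","first_flight","number_built"
"Airbus A220","Canada","Calgary",2,2013-03-02,179
"Airbus A220","Canada","Calgary","two",2013-03-02,179
"Airbus A220","Canada","Calgary",2,2013-03-02,179
"Airbus A320","France","Lyon","two",1986-06-10,10066
"Airbus A330","France","Lyon","two",1992-01-02,1521
"Boeing 737","USA","New York","two",1967-08-03,10636
"Boeing 737","USA","New York","two",1967-08-03,10636
"Boeing 737","USA","New York",2,1967-08-03,10636
"Airbus A220","Canada","Calgary",2,2013-03-02,179
"""

with open("airplanes.csv", "w") as f:
    f.write(csv_content)
```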


Creating a Dataframe using PERMISSIVE mode


The PERMISSIVE mode sets field values to null when corrupted records are detected. This is also the default: if you don't set the mode option, Spark uses PERMISSIVE.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":

    spark = SparkSession.builder \
        .master("local[1]") \
        .appName("spark-app") \
        .getOrCreate()

    schema = StructType([
        StructField("TYPE", StringType()),
        StructField("COUNTRY", StringType()),
        StructField("CITY", StringType()),
        StructField("ENGINES", IntegerType()),
        StructField("FIRST_FLIGHT", StringType()),
        StructField("NUMBER_BUILT", IntegerType())
    ])

    read_df = spark.read \
        .option("header", "true") \
        .option("mode", "PERMISSIVE") \
        .format("csv") \
        .schema(schema) \
        .load("airplanes.csv")

    read_df.show(10)

Result of PERMISSIVE mode



 

Creating a Dataframe using DROPMALFORMED mode


The DROPMALFORMED mode ignores corrupted records. This means that, if you choose this mode, the corrupted records won't be listed in the resulting Dataframe.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":

    spark = SparkSession.builder \
        .master("local[1]") \
        .appName("spark-app") \
        .getOrCreate()

    schema = StructType([
        StructField("TYPE", StringType()),
        StructField("COUNTRY", StringType()),
        StructField("CITY", StringType()),
        StructField("ENGINES", IntegerType()),
        StructField("FIRST_FLIGHT", StringType()),
        StructField("NUMBER_BUILT", IntegerType())
    ])

    read_df = spark.read \
        .option("header", "true") \
        .option("mode", "DROPMALFORMED") \
        .format("csv") \
        .schema(schema) \
        .load("airplanes.csv")

    read_df.show(10)

Result of DROPMALFORMED mode



After the execution, you can see that the corrupted records are no longer present in the Dataframe.

 

Creating a Dataframe using FAILFAST mode


Unlike the DROPMALFORMED and PERMISSIVE modes, FAILFAST throws an exception when it detects corrupted records.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":

    spark = SparkSession.builder \
        .master("local[1]") \
        .appName("spark-app") \
        .getOrCreate()

    schema = StructType([
        StructField("TYPE", StringType()),
        StructField("COUNTRY", StringType()),
        StructField("CITY", StringType()),
        StructField("ENGINES", IntegerType()),
        StructField("FIRST_FLIGHT", StringType()),
        StructField("NUMBER_BUILT", IntegerType())
    ])

    read_df = spark.read \
        .option("header", "true") \
        .option("mode", "FAILFAST") \
        .format("csv") \
        .schema(schema) \
        .load("airplanes.csv")

    read_df.show(10)

Result of FAILFAST mode


ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.



Cool? I hope you enjoyed it!
