Training a PySpark ML model on a dataset with mixed data types: a guide for rusty dummies

Do you already know how to work with multiple data types in PySpark ML? No? Then you urgently need to read on.






Hello! I want to cover in detail an interesting topic that, unfortunately, is missing from the Spark documentation: how do you train a model in PySpark ML on a dataset with mixed data types (strings and numbers)? I decided to write this article after spending several days searching the Internet for a suitable article with code. The official Spark tutorial shows an example that uses not just features of a single data type but a single feature altogether, and it says nothing about how to work with several columns, let alone columns of different types. However, after studying PySpark's data-handling capabilities in detail, I managed to write working code and understand how everything fits together, and I want to share that with you. So full speed ahead, friends!



First, let's import all the libraries we need, and then go through the code in detail so that any self-respecting "rusty teapot" (which, by the way, I was until recently) can follow everything:



# import the required libraries
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import GBTRegressor
# other types of regression models
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator


Now let's create a (local) Spark context and a Spark session and check if everything works by displaying it on the screen. Creating a Spark session is the starting point for working with datasets in Spark:



# create a local Spark context and a Spark session
sc = SparkContext('local')
spark = SparkSession(sc)
spark






Now that we have a tool for working with data, let's load the data itself. This article uses a dataset from Kaggle, the machine learning competition site:

https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions

After downloading, the dataset is saved as a .csv file at path_csv. When reading it, we set the following options:



  • header: if the first line of the file is a header, set it to "true"
  • delimiter: the character that separates the fields within a line, most often "," or ";"
  • inferSchema: if "true", PySpark detects the type of each column automatically; otherwise you have to specify the types yourself


# read the .csv file located at path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", ",")\
        .option("inferSchema", "true")\
        .load(path_csv)
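
To get a feel for what inferSchema does, here is a deliberately simplified plain-Python sketch (no Spark involved). It picks the narrowest of three types that fits every value in a column. The helper infer_type and the two sample rows are hypothetical, and Spark's real inference recognizes more types than this, so treat it only as an illustration of the idea:

```python
import csv
import io

def infer_type(values):
    """Return the narrowest of int, double, string that fits every non-empty value."""
    def fits(cast):
        try:
            for v in values:
                if v != "":
                    cast(v)
            return True
        except ValueError:
            return False
    if fits(int):
        return "int"
    if fits(float):
        return "double"
    return "string"

# Hypothetical sample rows that reuse the column names from this article
sample = io.StringIO(
    "country_or_area,year,value,category\n"
    "Austria,2010,1.5,co2\n"
    "Belgium,2011,2,ch4\n"
)
reader = csv.reader(sample, delimiter=",")
header = next(reader)            # header = "true": the first line holds column names
columns = list(zip(*reader))     # turn the rows into columns
schema = {name: infer_type(col) for name, col in zip(header, columns)}
print(schema)
```

Without such inference every column would simply be read as a string, which is why the article asks PySpark to detect the types.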


To better understand what kind of data we are dealing with, let's look at a few of its rows:



# display the first rows of the dataset
data.show()




Let's also see how many rows we have in the dataset:

# count the rows
data.select('year').count()






And finally, let's print the types of our columns, which, as we remember, we asked PySpark to detect automatically with option("inferSchema", "true"):



# print the schema with the detected column types
data.printSchema()






Now let's move on to the main course: working with several features of different data types. Spark trains a model on transformed data in which all the features are combined into a single vector column and the value to predict sits in a separate label column, which complicates the task... But we don't give up: to train the model in PySpark we will use a Pipeline, to which we pass an action plan (the stages variable):



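  1. label_stringIdx step: with StringIndexer we convert the value column that we want to predict into a column of numeric indices named label. The parameter handleInvalid = 'keep' means that invalid entries (nulls or values not seen during fitting) are placed in a separate extra bucket instead of raising an error. Note that StringIndexer treats every distinct value as a category, so label holds a frequency-ranked category index rather than the original number
  2. stringIndexer step: convert each string column into a column of numeric category indices
  3. encoder step: convert the category indices into one-hot vectors with OneHotEncoder()
  4. assembler step: Spark expects all features in a single vector column, so with VectorAssembler() we combine the encoded categorical columns and the numeric columns (assemblerInputs) into one column named "features"
  5. gbt step: the model itself, here PySpark ML's GBTRegressor, a gradient-boosted tree regressor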


# value is the column we want to predict; it becomes the label
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]

# the prediction depends on categorical columns: country and type of emission
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
    # index the string column, then one-hot encode the resulting index
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

# numeric columns: the year
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
# combine all feature columns into a single vector column named "features"
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
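
To make these stages less abstract, here is a plain-Python sketch (no Spark involved) of what indexing, one-hot encoding and assembling do to a single row. The helpers fit_string_index, index_of, one_hot and assemble, as well as the toy country list, are hypothetical. The sketch assumes that the most frequent label gets index 0, that unseen labels go to one extra bucket (as with handleInvalid = 'keep'), and that the last category is dropped during encoding. Spark itself stores the result as a sparse vector, and the exact vector width may depend on the Spark version and encoder settings:

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def index_of(mapping, label):
    """Look up a label; unseen labels fall into one extra bucket after the known ones."""
    return mapping.get(label, len(mapping))

def one_hot(index, num_categories, drop_last=True):
    """One-hot encode an index; with drop_last the final category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

# Toy training column (hypothetical values, not taken from the dataset)
countries = ["Austria", "Belgium", "Austria", "Canada", "Austria", "Belgium"]
country_index = fit_string_index(countries)
num_buckets = len(country_index) + 1  # known labels plus the "unseen" bucket

def assemble(country, year):
    """Concatenate the encoded category and the numeric column into one feature list."""
    encoded = one_hot(index_of(country_index, country), num_buckets)
    return encoded + [float(year)]

features_seen = assemble("Belgium", 2010)
features_unseen = assemble("Denmark", 2010)
print(country_index)
print(features_seen)
print(features_unseen)
```

The point to take away is that after the assembler every row carries one flat numeric vector, no matter how many string and numeric columns went in.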


Let's split our dataset into training and test sets in the favorite 70% to 30% ratio and set up the model: a gradient-boosted tree regressor (GBTRegressor) that predicts the label column from the features previously combined into the single "features" vector, with the number of boosting iterations limited by maxIter = 10:



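# split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# define the model (gradient-boosted tree regressor)
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]

# pass the stages list to the Pipeline
pipeline = Pipeline(stages=stages)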
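
One detail of the split is worth illustrating: the rows are assigned to the splits at random, one by one, so the 70/30 ratio holds only approximately. Below is a plain-Python sketch of that idea (no Spark involved); the function random_split is a hypothetical stand-in, not Spark's implementation:

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to a split at random, in proportion to the weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # cumulative boundaries of the normalized weights, e.g. [0.7, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            if draw < bound:
                splits[i].append(row)
                break
        else:
            # guard against floating-point rounding at the upper edge
            splits[-1].append(row)
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 700 and 300, but usually not exact
```

PySpark's randomSplit also accepts an optional seed argument, for example data.randomSplit([0.7, 0.3], seed=42), which helps when you want to reproduce the same split between runs.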


And now we just need to send the computer an action plan and a training dataset:



# train the model
model = pipeline.fit(trainingData)

# make predictions on the test set
predictions = model.transform(testData)
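
The article imports RegressionEvaluator but does not use it, so as a hedged aside here is how prediction quality is usually measured. The plain-Python sketch below computes the root mean squared error (RMSE) by hand on made-up numbers; the function rmse and the sample values are hypothetical:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between true labels and predicted values."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up labels and predictions, only to show the calculation
labels = [3.0, 5.0, 2.0]
predicted = [2.0, 5.0, 4.0]
error = rmse(labels, predicted)
print(error)
```

In Spark the same metric can be obtained with RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse").evaluate(predictions), assuming the default name of the prediction column.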


Let's save our model so that we can always return to using it without re-training:



# save the fitted model (not the unfitted pipeline)
model.write().overwrite().save('model/gbtregr_model')


And if you decide to start using the trained model for predictions again, then simply write:



# load the saved model
from pyspark.ml import PipelineModel
load_model = PipelineModel.load('model/gbtregr_model')




So, we have looked at how PySpark, a Python tool for working with big data, handles several feature columns of different data types.



Now it's time to apply this to your models ...


