
Tuesday, November 13, 2018

Pandas UDF for PySpark, handling missing data


Problem statement:
  You have a DataFrame with a string column, but some of the values are the empty string. You need to apply the OneHotEncoder, which fails on empty strings.

Solution:
  Use a Pandas UDF to translate the empty strings into another constant string.

First, consider the function to apply the OneHotEncoder:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
# ...
def one_hot_encode(_df, input_column, output_column):
    # Map each distinct string value to a numeric index; skip rows with invalid values
    indexer = StringIndexer(inputCol=input_column, outputCol=input_column + "_indexed", handleInvalid='skip')
    _model = indexer.fit(_df)
    _td = _model.transform(_df)
    # One-hot encode the numeric indices into a sparse vector column
    encoder = OneHotEncoder(inputCol=input_column + "_indexed", outputCol=output_column, dropLast=True)
    _df2 = encoder.transform(_td)
    return _df2
Now the interesting part. This is the Pandas UDF function:
from pyspark.sql.functions import pandas_udf
#...
# Use pandas_udf to define a Pandas UDF
# Input and output are both a pandas.Series of strings
@pandas_udf('string')
def pandas_not_null(s):
    # Replace nulls and empty strings with sentinel values the encoder can accept
    return s.fillna("_NO_₦Ӑ_").replace('', '_NO_ӖӍΡṬΫ_')

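Because Spark hands the column to the UDF as a plain pandas.Series, you can test the cleaning logic in pure pandas without a Spark session. Here is a minimal sketch of what `pandas_not_null` does, using ASCII sentinel strings (the sentinel names are illustrative, not the ones from the UDF above):

```python
import pandas as pd

# Toy Series mimicking the problem column: a value, an empty string, and a null
s = pd.Series(["blue", "", None])

# Same logic as pandas_not_null: fill nulls first, then replace empty strings
cleaned = s.fillna("_NO_NA_").replace("", "_NO_EMPTY_")

print(cleaned.tolist())  # ['blue', '_NO_EMPTY_', '_NO_NA_']
```

Note the order matters: `fillna` handles the nulls first, so `replace` only has to deal with the empty strings.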
And now you can create a new column and apply the OneHotEncoder:

dataframe = dataframe.withColumn('ACOLUMN_not_null', pandas_not_null('ACOLUMN'))
dataframe = one_hot_encode(dataframe, "ACOLUMN_not_null", "ACOLUMN_one_hot")

For more information, see https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html and http://pandas.pydata.org/pandas-docs/stable/generated/pandas.Series.str.replace.html#pandas.Series.str.replace.

This is the exception you get if you don't replace the empty string:

   File "/Users/vivaomengo/anaconda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/vivaomengo/anaconda/lib/python3.6/site-packages/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Cannot have an empty string for name.'

