
Sunday, November 18, 2018

PySpark: references to variable number of columns in UDF

Problem statement:

  Suppose that you want to create a column in a DataFrame based on many existing columns, but you don't know in advance how many, for example because the list of columns is supplied by the user or by another program.

This is how you can do it:
import numpy as np
import pyspark.sql.functions as f
import pyspark.sql.types as t
# ...
# Create two more vector columns as copies of columnA, for illustration.
data_frame = data_frame.withColumn('columnB', data_frame['columnA'])
data_frame = data_frame.withColumn('columnC', data_frame['columnA'])
attrs = ['columnA', 'columnB', 'columnC']

# Concatenate the given columns. Each column is of type SparseVector in this case.
def udf_concat_vec(*a):
    result = np.array([])
    # a is a tuple of size 1, because the columns are packed with f.array() below
    var1 = a[0]
    # var1 is a list with one vector per column (size 3 here)
    for var2 in var1:
        result = np.concatenate((result, var2.toArray()))
    return result.tolist()

my_udf_concat_vec = f.UserDefinedFunction(udf_concat_vec, t.ArrayType(t.FloatType()))
data_frame = data_frame.withColumn("together", my_udf_concat_vec(f.array(attrs)))
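
An alternative worth considering is to unpack the column list at the call site, so that the function receives one argument per column instead of a single array. The sketch below is only an illustration under a few assumptions: concat_vectors, add_concatenated_column and ListVector are names made up for this example, the return type is DoubleType rather than the FloatType used above, and the Spark part is not exercised here. Only the plain Python function is run, with ListVector standing in for a SparseVector.

```python
def concat_vectors(*vectors):
    """Concatenate any number of vector-like values into one flat list of floats."""
    result = []
    for vec in vectors:
        # Spark vectors expose toArray(); plain sequences are used as they are.
        values = vec.toArray() if hasattr(vec, "toArray") else vec
        # float() turns NumPy scalars into plain Python floats.
        result.extend(float(x) for x in values)
    return result


def add_concatenated_column(data_frame, attrs, output_col="together"):
    """Hypothetical helper: wrap concat_vectors in a UDF and apply it to attrs."""
    # Imported lazily so that concat_vectors can be tried without Spark installed.
    import pyspark.sql.functions as f
    import pyspark.sql.types as t

    concat_udf = f.udf(concat_vectors, t.ArrayType(t.DoubleType()))
    # *attrs unpacks the column names, so the UDF gets one argument per column.
    return data_frame.withColumn(output_col, concat_udf(*attrs))


class ListVector:
    """Stand-in for a Spark vector, used only to try concat_vectors locally."""

    def __init__(self, values):
        self._values = list(values)

    def toArray(self):
        return self._values


print(concat_vectors(ListVector([1, 0]), ListVector([0, 2]), ListVector([3])))
# prints [1.0, 0.0, 0.0, 2.0, 3.0]
```

With this shape, the call would look like add_concatenated_column(data_frame, attrs), and the number of columns in attrs can vary freely.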

Saturday, November 17, 2018

PySpark: Concatenate two DataFrame columns using UDF

Problem statement:
  You have a PySpark DataFrame with two columns that hold vectors of floats, and you want to create a new column containing the concatenation of the two.

This is how you can do it:

import numpy as np
import pyspark.sql.functions as f
import pyspark.sql.types as t
# ...
def udf_concat_vec(a, b):
    # a and b of type SparseVector
    return np.concatenate((a.toArray(), b.toArray())).tolist()

my_udf_concat_vec = f.UserDefinedFunction(udf_concat_vec, t.ArrayType(t.FloatType()))
df2 = df.withColumn("togetherAB", my_udf_concat_vec('columnA', 'columnB'))

Thursday, November 15, 2018

PySpark, NLP and Pandas UDF

Problem statement:
  Assume that your DataFrame in PySpark has a column with text. Assume that you want to apply NLP and vectorize this text, creating a new column.

This is how to do it using @pandas_udf.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
import spacy

# Alternative: the larger model, which ships with pretrained word vectors
# nlp = spacy.load('en_core_web_lg')
nlp = spacy.load('en_core_web_sm')

# Use pandas_udf to define a Pandas UDF.
# The input is a pandas.Series with strings. The output is a pandas.Series of arrays of double.
@pandas_udf('array<double>', PandasUDFType.SCALAR)
def pandas_nlp(s):
    return s.fillna("_NO_₦Ӑ_").replace('', '_NO_ӖӍΡṬΫ_').transform(lambda x: (nlp(x).vector.tolist()))

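spaCy is the NLP library used (see https://spacy.io/api/doc ). The call nlp(astring) processes the text and returns a Doc, whose vector attribute holds the vector for that text. The s.fillna("_NO_₦Ӑ_") call replaces missing values with a placeholder string, and .replace('', '_NO_ӖӍΡṬΫ_') does the same for empty strings, so nlp() always receives a non-empty string.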
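
The cleaning and vectorizing steps can be tried on a plain pandas Series, without Spark or spaCy. This is a minimal sketch under assumptions: fake_vector is a made-up stand-in for nlp(x).vector.tolist() that returns two simple numbers per string, and clean and vectorize are names introduced only for this example.

```python
import pandas as pd

NA_TOKEN = "_NO_₦Ӑ_"        # placeholder for missing values
EMPTY_TOKEN = "_NO_ӖӍΡṬΫ_"  # placeholder for empty strings


def fake_vector(text):
    # Hypothetical stand-in for nlp(text).vector.tolist():
    # returns [number of characters, number of spaces].
    return [float(len(text)), float(text.count(" "))]


def clean(s):
    # fillna handles None/NaN; replace swaps values that are exactly ''.
    return s.fillna(NA_TOKEN).replace("", EMPTY_TOKEN)


def vectorize(s):
    # transform applies fake_vector to each element and keeps the index.
    return clean(s).transform(fake_vector)


s = pd.Series(["hello world", None, ""])
print(clean(s).tolist())
print(vectorize(s).iloc[0])  # prints [11.0, 1.0]
```

The result has one list of floats per input row, which is the shape a scalar Pandas UDF declared as 'array<double>' is expected to return.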

Now you can create a new column in the DataFrame by calling the function.

dataframe = dataframe.withColumn('description_vec', pandas_nlp('description'))

For more information on Pandas UDF see

Tuesday, November 13, 2018

Pandas UDF for PySpark, handling missing data

Problem statement:
  You have a DataFrame in which one column holds string values, some of which are the empty string. You need to apply the OneHotEncoder, but it does not accept the empty string.

  Use a Pandas UDF to translate the empty strings into another constant string.

First, consider the function to apply the OneHotEncoder:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
# ...
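def one_hot_encode(_df, input_column, output_column):
    # Map each distinct string to a numeric index; rows with invalid values are skipped.
    indexer = StringIndexer(inputCol=input_column, outputCol=input_column+"_indexed", handleInvalid='skip')
    _model = indexer.fit(_df)
    _td = _model.transform(_df)
    # Spark 2.x API. In Spark 3.x OneHotEncoder is an estimator,
    # so use encoder.fit(_td).transform(_td) there.
    encoder = OneHotEncoder(inputCol=input_column+"_indexed", outputCol=output_column, dropLast=True)
    _df2 = encoder.transform(_td)
    return _df2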

Now the interesting part. This is the Pandas UDF function:

from pyspark.sql.functions import pandas_udf, PandasUDFType
# Use pandas_udf to define a Pandas UDF
# Input/output are both a pandas.Series of string
@pandas_udf('string', PandasUDFType.SCALAR)
def pandas_not_null(s):
    return s.fillna("_NO_₦Ӑ_").replace('', '_NO_ӖӍΡṬΫ_')

And now you can create a new column and apply the OneHotEncoder:

dataframe = dataframe.withColumn('ACOLUMN_not_null', pandas_not_null('ACOLUMN'))
dataframe = one_hot_encode(dataframe, "ACOLUMN_not_null", "ACOLUMN_one_hot")

For more information, see https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html and http://pandas.pydata.org/pandas-docs/stable/generated/pandas.Series.str.replace.html#pandas.Series.str.replace .

This is the exception you get if you don't replace the empty string:

   File "/Users/vivaomengo/anaconda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/vivaomengo/anaconda/lib/python3.6/site-packages/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Cannot have an empty string for name.'