Example of using a Scalar Pandas UDF and a regular UDF
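
The cells below assume a live SparkSession bound to the name spark, as you get in a pyspark shell or a notebook launched with a Spark kernel. If you run the code as a standalone script instead, a minimal sketch along these lines creates one (the application name is arbitrary):

from pyspark.sql import SparkSession

# Create (or reuse) a local SparkSession for the examples below.
spark = SparkSession.builder.appName("pandas-udf-example").getOrCreate()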

In [3]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

schema = StructType(
    [
        StructField("Name", StringType(), True),
        StructField("Age", IntegerType(), True),
        StructField("Gender", StringType(), True),
    ]
)

# Three sample rows repeated 1,000,000 times -> a 3,000,000-row DataFrame.
df = spark.createDataFrame(
    [['Raju', 24, 'Male'], ['Sita', 26, 'Female'], ['Andrew', 25, 'Male']] * 1000000,
    schema=schema,
)
In [4]:
df.show()
+------+---+------+
|  Name|Age|Gender|
+------+---+------+
|  Raju| 24|  Male|
|  Sita| 26|Female|
|Andrew| 25|  Male|
|  Raju| 24|  Male|
|  Sita| 26|Female|
|Andrew| 25|  Male|
|  Raju| 24|  Male|
|  Sita| 26|Female|
|Andrew| 25|  Male|
|  Raju| 24|  Male|
|  Sita| 26|Female|
|Andrew| 25|  Male|
|  Raju| 24|  Male|
|  Sita| 26|Female|
|Andrew| 25|  Male|
|  Raju| 24|  Male|
|  Sita| 26|Female|
|Andrew| 25|  Male|
|  Raju| 24|  Male|
|  Sita| 26|Female|
+------+---+------+
only showing top 20 rows
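
The three sample rows are repeated one million times, so df holds 3,000,000 rows in total, enough for UDF serialization overhead to show up in the timings below. A quick sanity check, not part of the original notebook:

df.count()  # 3 distinct rows * 1,000,000 repetitions = 3000000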

In [5]:
%%time

def square(x):
    return x**2

# Row-at-a-time Python UDF: called once per row, with each value
# pickled between the JVM and the Python worker.
square_udf_int = udf(square, IntegerType())

df.select('Name', 'Age', square_udf_int('Age').alias('Age_squared')).collect()
CPU times: user 10.7 s, sys: 520 ms, total: 11.2 s
Wall time: 21.2 s
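
With a regular UDF, Spark invokes the Python function once per row, pickling every Age value between the JVM and the Python worker, which is largely what the roughly 21 s wall time above is paying for. For a transformation this simple, a built-in column expression avoids the Python round trip altogether; a minimal sketch, not timed in the original notebook:

# Pure column expression: evaluated inside the JVM, no Python workers involved.
df.select('Name', 'Age', (col('Age') * col('Age')).alias('Age_squared')).collect()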
In [6]:
%%time

from pyspark.sql.functions import PandasUDFType, pandas_udf

def square_ages(ages):
    return ages**2

# Scalar pandas UDF: applied to whole pandas Series batches, transferred
# between the JVM and the Python workers via Apache Arrow.
@pandas_udf('integer', PandasUDFType.SCALAR)
def pandas_udf_square(ages):
    return square_ages(ages)

df.select('Name', 'Age', pandas_udf_square('Age').alias('Age_squared')).collect()
CPU times: user 10.6 s, sys: 420 ms, total: 11 s
Wall time: 20.9 s
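
The scalar pandas UDF receives the Age column as pandas Series batches transferred via Apache Arrow rather than one pickled value per row. In this particular run the two wall times are nearly identical (roughly 21 s each), most likely because collect() shipping all three million result rows back to the driver dominates the measurement rather than the UDF itself. Note also that PandasUDFType.SCALAR is the Spark 2.x API; from Spark 3.0 onward the function type is inferred from Python type hints. A sketch of the equivalent definition under Spark 3.x, assuming pandas is available on the workers:

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Spark 3.x style: the scalar function type is inferred from the
# pd.Series -> pd.Series type hints, so PandasUDFType is no longer needed.
@pandas_udf('integer')
def pandas_udf_square_v3(ages: pd.Series) -> pd.Series:
    return ages ** 2

df.select('Name', 'Age', pandas_udf_square_v3('Age').alias('Age_squared'))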
Published: May 31, 2019
Category: Data Engineering