whoami

  • Ruben Berenguel (@berenguel)
  • PhD in Mathematics
  • (big) data consultant
  • Lead data engineer using Python, Go and Scala
  • Right now at Affectv

What is Pandas?

  • Python Data Analysis library
  • Used everywhere data and Python appear in job offers
  • Efficient (columnar, with a C and Cython backend)

How does Pandas manage columnar data?
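A quick way to see it (a minimal sketch, any recent pandas): every column has a single dtype and is backed by NumPy storage, and same-dtype columns are consolidated internally into 2D blocks.

import pandas as pd

# Toy frame: two int64 columns and one float64 column
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "x": [0.1, 0.2, 0.3]})

# One dtype per column; "a" and "b" typically end up sharing an int64 block
print(df.dtypes)
print(df["x"].to_numpy())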

What is Arrow?

  • Cross-language in-memory columnar format library
  • Optimised for efficiency across languages
  • Integrates seamlessly with Pandas

How does Arrow manage columnar data?
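A minimal sketch with pyarrow: each column of a Table is typed and stored in contiguous buffers, organised into record batches.

import pyarrow as pa

# Small in-memory Arrow table built from Python lists
table = pa.Table.from_pydict({"id": [1, 2, 3], "x": [0.1, 0.2, 0.3]})

print(table.schema)   # column names and types
print(table["x"])     # a chunked, typed column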

🏹 ❤️ 🐼

  • Arrow uses RecordBatches
  • Pandas uses blocks handled by a BlockManager
  • You can convert an Arrow Table into a Pandas DataFrame easily
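A sketch of that round trip (pandas and pyarrow only, no Spark involved): for plain numeric columns the conversion back to pandas can often avoid copying the data.

import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"id": [1, 2, 3], "x": [0.1, 0.2, 0.3]})

# Pandas -> Arrow: the block-managed frame becomes a columnar Table
table = pa.Table.from_pandas(pdf)

# Arrow -> Pandas: back to a DataFrame
round_tripped = table.to_pandas()
print(round_tripped.equals(pdf))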

What is Spark?

  • Distributed Computation framework
  • Open source
  • Easy to use
  • Scales horizontally and vertically

How does Spark work?

Spark usually runs on top of a cluster manager

And a distributed storage layer

A Spark program runs in the driver

The driver requests resources from the cluster manager to run tasks
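A minimal sketch of those pieces from PySpark: the session below uses a local master, but on a real cluster .master(...) would point at the cluster manager (for example YARN) that hands the driver its executors.

from pyspark.sql import SparkSession

# The driver process: it builds the session and talks to the cluster manager
spark = (SparkSession.builder
         .appName("driver-example")
         .master("local[4]")   # stand-in for a real cluster manager URL
         .getOrCreate())

print(spark.sparkContext.defaultParallelism)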

The main building block is the RDD:

Resilient Distributed Dataset
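A toy RDD pipeline (a sketch, assuming a local session): the collection is split into partitions, transformations run on the executors, and the reduce brings the result back to the driver.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-example").getOrCreate()
sc = spark.sparkContext

# Distribute a local collection over 4 partitions, transform, then aggregate
rdd = sc.parallelize(range(10), 4)
squares = rdd.map(lambda n: n * n)
print(squares.reduce(lambda a, b: a + b))  # sum of squares of 0..9 = 285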

PySpark

PySpark offers a Python API to the Scala core of Spark

It uses the Py4J bridge

# Excerpt from PySpark's Py4J gateway setup (JavaGateway, GatewayParameters
# and java_import come from py4j.java_gateway)
gateway = JavaGateway(
    gateway_parameters=GatewayParameters(
        port=gateway_port,
        auth_token=gateway_secret,
        auto_convert=True))
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
...
return gateway

The main entry points are RDD and PipelinedRDD(RDD)

PipelinedRDD

builds in the JVM a

PythonRDD

The magic is in

compute

compute is run on each executor and starts a Python worker via PythonRunner

Workers act as standalone processors of streams of data

  • Connects back to the JVM that started it
  • Loads the included Python libraries
  • Deserializes the pickled function coming from the stream
  • Applies the function to the data coming from the stream
  • Sends the output back
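As a purely conceptual sketch of that loop (not the actual pyspark.worker code; read_stream and write_stream are made-up helpers standing in for Spark's framed socket protocol and serializers):

import pickle

def toy_worker_loop(read_stream, write_stream):
    # Conceptual only: the real worker uses Spark's own wire format.
    # 1. Deserialize the pickled user function shipped from the JVM
    func = pickle.loads(read_stream.read_function_bytes())
    # 2. Apply it to every deserialized record coming in...
    for record in read_stream.iter_records():
        result = func(record)
        # 3. ...and stream the results back to the JVM
        write_stream.write_record(result)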

But… wasn’t Spark magically optimising everything?

Yes, for Spark DataFrames

Spark will generate a plan

(a Directed Acyclic Graph)

to compute the result

And the plan will be optimised using Catalyst
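You can inspect that plan from PySpark with explain; a small sketch, assuming a running session:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("plan-example").getOrCreate()

df = spark.range(1000).withColumn("doubled", col("id") * 2)

# Prints the parsed, analysed, optimised (Catalyst) and physical plans
df.filter(col("id") > 10).explain(True)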

Depending on the function, the optimiser will choose

PythonUDFRunner

or

PythonArrowRunner

(both extend PythonRunner)

If we can define our functions using Pandas Series transformations, we can speed up PySpark code by 3x to 100x!
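For instance, a scalar Pandas UDF receives and returns pandas Series, and the data moves between the JVM and Python through Arrow in batches rather than row by row (a sketch, using the Spark 2.x-style PandasUDFType API as in the examples below):

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("scalar-udf-example").getOrCreate()

# Series in, Series out: the transformation runs on whole Arrow batches
@pandas_udf("long", PandasUDFType.SCALAR)
def plus_one(s):
    return s + 1

df = spark.range(1 << 20)
df.select(plus_one(df["id"]).alias("id_plus_one")).show(3)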

Quick examples

The basics: toPandas

from pyspark.sql.functions import rand

df = spark.range(1 << 20).toDF("id").withColumn("x", rand())

# Direct conversion, Arrow disabled
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
pandas_df = df.toPandas()

# Same conversion with the Arrow path enabled
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_df = df.toPandas()
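A sketch of how one could measure the difference (assuming a running spark session; the config key is the Spark 2.x name, renamed to spark.sql.execution.arrow.pyspark.enabled in Spark 3). The table at the end of the deck comes from this kind of comparison.

import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.appName("topandas-timing").getOrCreate()
df = spark.range(1 << 20).toDF("id").withColumn("x", rand())

def timed_to_pandas(arrow_enabled):
    # Toggle the Arrow conversion path, then time a full toPandas
    spark.conf.set("spark.sql.execution.arrow.enabled", arrow_enabled)
    start = time.time()
    df.toPandas()
    return time.time() - start

print("direct:    ", timed_to_pandas("false"))
print("with Arrow:", timed_to_pandas("true"))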

The fun: .groupBy

from pyspark.sql.functions import rand, randn, floor
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.range(20000000).toDF("row").drop("row") \
     .withColumn("id", floor(rand()*10000)).withColumn("spent", (randn()+3)*100)

@pandas_udf("id long, spent double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    spent = pdf.spent
    return pdf.assign(spent=spent - spent.mean())

df_to_pandas_arrow = df.groupby("id").apply(subtract_mean).toPandas()

Before, you may have done something like…

import numpy as np
from pyspark.sql.functions import collect_list

grouped = df2.groupby("id").agg(collect_list('spent').alias("spent_list"))
as_pandas = grouped.toPandas()
as_pandas["mean"] = as_pandas["spent_list"].apply(np.mean)
as_pandas["substracted"] = as_pandas["spent_list"].apply(np.array) - as_pandas["mean"]
df_to_pandas = as_pandas.drop(columns=["spent_list", "mean"]).explode("substracted")
import numpy as np
from pyspark.sql.functions import collect_list

grouped = df2.groupby("id").agg(collect_list('spent').alias("spent_list"))
as_pandas = grouped.toPandas()
as_pandas["mean"] = as_pandas["spent_list"].apply(np.mean)
as_pandas["substracted"] = as_pandas["spent_list"].apply(np.array) - as_pandas["mean"]
df_to_pandas = as_pandas.drop(columns=["spent_list", "mean"]).explode("substracted")

TLDR:

Use Arrow and Pandas UDFs¹

¹ in PySpark

Questions?

Thanks!

Get the slides from my GitHub:

github.com/rberenguel/

The repository is

pyspark-arrow-pandas

Further references

Table for toPandas

| Rows (2^x) | Direct (s) | With Arrow (s) | Factor |
|------------|------------|----------------|--------|
| 2^17       | 1.08       | 0.18           | 5.97   |
| 2^18       | 1.69       | 0.26           | 6.45   |
| 2^19       | 4.16       | 0.30           | 13.87  |
| 2^20       | 5.76       | 0.61           | 9.44   |
| 2^21       | 9.73       | 0.96           | 10.14  |
| 2^22       | 17.90      | 1.64           | 10.91  |
| 2^23       | (OOM)      | 3.42           |        |
| 2^24       | (OOM)      | 11.40          |        |

EOF
