A Spark program consists of a driver application and worker programs.
The SparkContext, created in the driver, contains all of the information about the cluster needed to run Spark code.
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('spark-app').setMaster('local[*]')
sc = SparkContext.getOrCreate(conf=conf)
sc
An RDD (Resilient Distributed Dataset) is a partitioned collection of objects spread across a cluster, stored in memory or on disk.
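To make "partitioned" concrete, here is a plain-Python model (not Spark code). It splits 16 numbers into 4 contiguous partitions, the layout `sc.parallelize(range(16), 4)` is meant to produce. The helper name `split_into_partitions` is hypothetical, and PySpark's exact slicing rules may differ in edge cases.

```python
# Plain-Python model (not Spark code): split a collection into
# num_slices contiguous partitions, keeping the original order.
def split_into_partitions(values, num_slices):
    """Return num_slices lists that together hold every value, in order."""
    values = list(values)
    size = len(values)
    # Partition i covers the index range [bounds[i], bounds[i + 1]).
    bounds = [i * size // num_slices for i in range(num_slices + 1)]
    return [values[bounds[i]:bounds[i + 1]] for i in range(num_slices)]


partitions = split_into_partitions(range(16), 4)
print(partitions)
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
```

On a real RDD, `rdd.glom().collect()` returns one list per partition. It is a quick way to check how the data was actually split.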
There are 3 ways of creating an RDD. 1) Parallelize an existing collection from the driver:
array = range(10)
array
range(0, 10)
rdd = sc.parallelize(array)
rdd
PythonRDD[1] at RDD at PythonRDD.scala:48
2) Load an external dataset, here a CSV file read line by line:
titanic = sc.textFile('data/titanic.csv')
titanic
data/titanic.csv MapPartitionsRDD[3] at textFile at <unknown>:0
titanic.take(3)
['PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked', '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S', '2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C']
3) Transform an existing RDD, which returns a new RDD:
rdd.map(lambda number: number * 2)
PythonRDD[5] at RDD at PythonRDD.scala:48
rdd.map(lambda number: number * 2).collect()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Let's create an RDD from the numbers 0 to 15, split into 4 partitions, and play with it.
rdd = sc.parallelize(range(16), 4)
rdd.cache()
PythonRDD[8] at RDD at PythonRDD.scala:48
print(rdd) # prints only info on RDD, no evaluation
PythonRDD[8] at RDD at PythonRDD.scala:48
print(rdd.map(lambda x: x*2)) # still lazy: an action such as collect() is needed to bring data back to the driver
PythonRDD[9] at RDD at PythonRDD.scala:48
print(rdd.map(lambda num: num + 1).toDebugString()) # check RDD lineage
b'(4) PythonRDD[10] at RDD at PythonRDD.scala:48 []\n | PythonRDD[8] at RDD at PythonRDD.scala:48 []\n | ParallelCollectionRDD[7] at parallelize at PythonRDD.scala:489 []'
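The outputs above show that transformations are lazy. Printing an RDD or calling `map` only records a step in the lineage. The tiny plain-Python model below (the hypothetical class `LazyDataset`, not Spark's implementation) makes that observable: no work happens until the action `collect()` runs.

```python
# Plain-Python model (not Spark's implementation) of lazy evaluation:
# transformations only record a step; the action collect() runs them.
class LazyDataset:
    def __init__(self, source, steps=()):
        self.source = source        # original data, never modified
        self.steps = tuple(steps)   # recorded (kind, function) pairs: the lineage

    def map(self, f):
        # Return a new dataset with one more recorded step; compute nothing.
        return LazyDataset(self.source, self.steps + (("map", f),))

    def filter(self, predicate):
        return LazyDataset(self.source, self.steps + (("filter", predicate),))

    def lineage(self):
        return [kind for kind, _ in self.steps]

    def collect(self):
        # Action: replay every recorded step, in order, on the source data.
        data = list(self.source)
        for kind, f in self.steps:
            if kind == "map":
                data = [f(x) for x in data]
            else:
                data = [x for x in data if f(x)]
        return data


calls = []


def double(x):
    calls.append(x)  # record each evaluation so laziness is observable
    return x * 2


ds = LazyDataset(range(16)).map(double).filter(lambda x: x % 4 == 0)
print(len(calls))     # 0: defining transformations did no work
print(ds.lineage())   # ['map', 'filter']
print(ds.collect())   # [0, 4, 8, 12, 16, 20, 24, 28]
print(len(calls))     # 16: the action evaluated double once per element
```

Spark does more than this model. It pipelines steps inside each partition and can recompute a lost partition from its lineage. The core idea is the same, though: nothing is computed before an action.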
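RDD operations come in two types: transformations and actions.
Transformations shape your dataset: each one returns a new RDD and is evaluated lazily, only when an action needs its result.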
filter: Return a new RDD containing only the elements that satisfy a predicate.
Ex : return only even numbers.
rdd.filter(lambda x: x % 2 == 0).collect()
[0, 2, 4, 6, 8, 10, 12, 14]
map: Return a new RDD by applying a function to each element of this RDD.
Ex : multiply all numbers by 2.
rdd.map(lambda x: x * 2).collect()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30]
flatMap: Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
Ex : map each of the 16 elements to the list [1, 2, 3], then flatten those lists into a single RDD of 48 numbers.
rdd.flatMap(lambda num: [1, 2, 3]).take(6)
[1, 2, 3, 1, 2, 3]
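The difference between `map` and `flatMap` is easiest to see side by side. This plain-Python equivalent uses the same function as the example above. On the RDD, `rdd.map(f).count()` would give 16 and `rdd.flatMap(f).count()` would give 48. The name `to_row` is illustrative.

```python
from itertools import chain

data = range(16)


def to_row(num):
    return [1, 2, 3]  # same function as in the flatMap example; ignores num


# map: exactly one output per input, so we get 16 lists.
mapped = [to_row(n) for n in data]
# flatMap: apply the function, then flatten the lists, so we get 16 * 3 = 48 numbers.
flat_mapped = list(chain.from_iterable(to_row(n) for n in data))

print(len(mapped), len(flat_mapped))  # 16 48
print(flat_mapped[:6])                # [1, 2, 3, 1, 2, 3]
```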
distinct: Return a new RDD containing the distinct elements in this RDD.
rdd.map(lambda num: 0 if num % 2 == 0 else 1).distinct().collect()
[0, 1]
Actions trigger execution: they run the job together with all the pending transformations it depends on, and return a result to the driver.
collect / take: collect() returns a list that contains all of the elements in this RDD, while take(n) returns only the first n elements.
Note these methods should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
rdd.take(5)
[0, 1, 2, 3, 4]
count: Return the number of elements in this RDD.
rdd.count()
16
reduce: Reduces the elements of this RDD using the specified commutative and associative binary operator. Each partition is reduced locally first, then the partial results are combined on the driver.
Ex : sum all the numbers in the RDD.
rdd.reduce(lambda x,y: x + y)
120
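Why must the operator be commutative and associative? Because each partition is reduced on its own before the partial results are combined. The plain-Python model below (hypothetical helper `partitioned_reduce`) shows what happens with a different operator. Addition gives 120 however the data is split, while subtraction gives partition-dependent answers.

```python
from functools import reduce


def partitioned_reduce(partitions, op):
    """Reduce each non-empty partition locally, then combine the partial results."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)


def add(x, y):
    return x + y


def sub(x, y):
    return x - y


four_parts = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
two_parts = [list(range(8)), list(range(8, 16))]

# Addition is commutative and associative: partitioning does not matter.
print(partitioned_reduce(four_parts, add), partitioned_reduce(two_parts, add))  # 120 120
# Subtraction is neither: the same 16 numbers give different answers.
print(partitioned_reduce(four_parts, sub), partitioned_reduce(two_parts, sub))  # 60 48
```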
Key/value RDDs are commonly used to perform aggregations, and often we will do some initial ETL (extract, transform, and load) to get our data into a key/value format.
Key/value RDDs expose new operations (e.g., counting up reviews for each product, grouping together data with the same key, and grouping together two different RDDs).
reduceByKey: Merge the values for each key using an associative and commutative reduce function.
Ex : Add all numbers associated to each key.
rdd = sc.parallelize([('a', 1), ('b', 0), ('b', 2), ('a', 5)], 4)
rdd.reduceByKey(lambda x,y: x + y).collect()
[('b', 2), ('a', 6)]
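`reduceByKey` needs an associative and commutative function for the same reason as `reduce`. Values are first combined inside each partition (a map-side combine), and the partial results for each key are then merged after the shuffle. Here is a plain-Python model of those two steps; `reduce_by_key` is a hypothetical helper, not Spark's code. Spark does not guarantee the order of the collected pairs, which is why `'b'` comes first in the output above.

```python
def reduce_by_key(partitions, op):
    """Model of reduceByKey: combine inside each partition, then merge by key."""
    # Step 1: per-partition (map-side) combine, before any data moves.
    local_results = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = op(acc[key], value) if key in acc else value
        local_results.append(acc)
    # Step 2: merge the partial results for each key (the shuffle in Spark).
    merged = {}
    for acc in local_results:
        for key, value in acc.items():
            merged[key] = op(merged[key], value) if key in merged else value
    return merged


pairs = [('a', 1), ('b', 0), ('b', 2), ('a', 5)]
partitions = [[pair] for pair in pairs]  # 4 partitions of one pair each
result = reduce_by_key(partitions, lambda x, y: x + y)
print(result)  # {'a': 6, 'b': 2}
```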
join: Return an RDD containing all pairs of elements with matching keys in self and other (an inner join).
Ex : sum the numbers associated with vowels and with consonants ('d' has no count, so the join drops it).
countLetter = sc.parallelize([('a', 1), ('b', 6), ('c', 2), ('a', 5)], 4)
defLetter = sc.parallelize([('a', 'vowel'), ('b', 'consonant'), ('c', 'consonant'), ('d', 'consonant')], 4)
countLetter.join(defLetter).map(lambda x: (x[1][1], x[1][0])).reduceByKey(lambda x,y: x + y).collect()
[('consonant', 8), ('vowel', 6)]
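The pipeline above becomes clearer when the same steps are spelled out on plain Python lists. `inner_join` is a hypothetical helper that mimics the inner-join semantics of `join`: every left pair is matched with every right pair sharing its key. The key `'d'` has no match, so it disappears before the per-category sum.

```python
def inner_join(left, right):
    """Pair every (k, v) on the left with every (k, w) on the right sharing key k."""
    result = []
    for key, v in left:
        for other_key, w in right:
            if key == other_key:
                result.append((key, (v, w)))
    return result


count_letter = [('a', 1), ('b', 6), ('c', 2), ('a', 5)]
def_letter = [('a', 'vowel'), ('b', 'consonant'), ('c', 'consonant'), ('d', 'consonant')]

joined = inner_join(count_letter, def_letter)
print(joined)
# [('a', (1, 'vowel')), ('b', (6, 'consonant')), ('c', (2, 'consonant')), ('a', (5, 'vowel'))]

# Same as .map(lambda x: (x[1][1], x[1][0])).reduceByKey(add)
totals = {}
for _, (count, kind) in joined:
    totals[kind] = totals.get(kind, 0) + count
print(totals)  # {'vowel': 6, 'consonant': 8}
```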
Ex : a word count on a text file.
rdd = sc.textFile('data/lorem.txt')
rdd.flatMap(lambda row: [(r, 1) for r in row.split(' ')]).reduceByKey(lambda x,y: x + y).take(6)
[('Lorem', 1), ('ipsum', 3), ('consectetur', 2), ('elit.', 2), ('est', 4), ('mattis', 5)]
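The word count follows the classic flatMap-then-reduceByKey pattern. Splitting on a single space keeps punctuation attached to the words, which is why `'elit.'` appears as a key in the output above. The sketch below replays both steps in plain Python. The two input lines are a hypothetical sample, not the contents of `data/lorem.txt`.

```python
from collections import Counter

# Hypothetical sample lines (NOT the contents of data/lorem.txt).
lines = [
    "Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
    "Sed est est ipsum.",
]

# flatMap step: one (word, 1) pair per space-separated token.
pairs = [(word, 1) for line in lines for word in line.split(' ')]

# reduceByKey step: sum the 1s for each word.
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

print(counts['est'], counts['ipsum'], counts['ipsum.'])  # 2 1 1
```

To count `'ipsum'` and `'ipsum.'` as the same word, normalize each token (for example strip punctuation and lowercase it) inside the flatMap function.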
Resilient Distributed Datasets (RDDs) are distributed collections of immutable JVM objects that can be processed in parallel across the cluster; they are the backbone of Apache Spark.
sc.stop()
Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.
This section introduces Spark SQL, Spark’s interface for working with structured and semistructured data.
SparkSession is the entry point to programming Spark with the Dataset and DataFrame API.
from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName('spark-app').setMaster('local[*]')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark
SparkSession - in-memory
Under the hood, a DataFrame is an RDD composed of Row objects with additional schema information about the type of each column. Row objects are just wrappers around arrays of basic types.
titanic = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/titanic.csv')
titanic.createOrReplaceTempView('titanic')
titanic.show(8)
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|          330877| 8.4583| null|       Q|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|           17463|51.8625|  E46|       S|
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1|          349909| 21.075| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 8 rows
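The `inferSchema` option is what gives each column its type in the schema. The plain-Python sketch below shows the idea under a simplifying assumption: each column gets the narrowest of int, double or string that parses every non-empty value. Spark's real CSV inference also considers other types, such as booleans and timestamps. The input is the header and the first two data lines printed by `titanic.take(3)` earlier; `infer_type` is a hypothetical helper.

```python
import csv
import io

# First lines of data/titanic.csv, copied from the titanic.take(3) output earlier.
RAW = (
    'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked\n'
    '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S\n'
    '2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C\n'
)


def infer_type(values):
    """Pick the narrowest of int, double, string that parses every non-empty value."""
    non_empty = [v for v in values if v != '']  # empty fields become null
    if not non_empty:
        return 'string'
    for type_name, cast in (('int', int), ('double', float)):
        try:
            for v in non_empty:
                cast(v)
            return type_name
        except ValueError:
            continue  # some value did not parse: try a wider type
    return 'string'


rows = list(csv.reader(io.StringIO(RAW)))  # csv handles the quoted commas in Name
header, data = rows[0], rows[1:]
schema = {col: infer_type([row[i] for row in data]) for i, col in enumerate(header)}
print(schema)
```

With only these two rows, Age comes out as int. The full file also holds fractional ages, which is consistent with the 22.0 shown in the table above (a double column).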
titanic.filter(titanic.Sex == 'male').select(['Name', 'Sex', 'Survived']).show(3)
+--------------------+----+--------+
|                Name| Sex|Survived|
+--------------------+----+--------+
|Braund, Mr. Owen ...|male|       0|
|Allen, Mr. Willia...|male|       0|
|    Moran, Mr. James|male|       0|
+--------------------+----+--------+
only showing top 3 rows
sql: a function on SparkSession that runs SQL queries programmatically against temporary views, such as the titanic view registered above with createOrReplaceTempView.
spark.sql("SELECT Name, Sex, Survived FROM titanic WHERE Sex = 'male'").show(3)
+--------------------+----+--------+
|                Name| Sex|Survived|
+--------------------+----+--------+
|Braund, Mr. Owen ...|male|       0|
|Allen, Mr. Willia...|male|       0|
|    Moran, Mr. James|male|       0|
+--------------------+----+--------+
only showing top 3 rows
Spark provides a single interface for reading and saving data (spark.read and DataFrame.write), which is implemented for multiple storage formats: JSON, Parquet, JDBC, ORC, LibSVM, CSV and text.
ransomware = spark.read.json('data/ransomware.json')
ransomware.printSchema()
root
 |-- comment: string (nullable = true)
 |-- decryptor: string (nullable = true)
 |-- encryptionAlgorithm: string (nullable = true)
 |-- extensionPattern: string (nullable = true)
 |-- extensions: string (nullable = true)
 |-- iocs: string (nullable = true)
 |-- microsoftDetectionName: string (nullable = true)
 |-- microsoftInfo: string (nullable = true)
 |-- name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ransomNoteFilenames: string (nullable = true)
 |-- resources: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sandbox: string (nullable = true)
 |-- screenshots: string (nullable = true)
 |-- snort: string (nullable = true)
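Catalyst is an extensible query optimizer used internally by Spark SQL for planning and defining the execution of Spark SQL queries. Calling explain() on a DataFrame prints the physical plan Catalyst chose; note the PushedFilters entry, which pushes the Sex filter down into the CSV scan.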
titanic[titanic['Sex'] == 'male'].select(['Name', 'Sex']).explain()
== Physical Plan ==
*Project [Name#15, Sex#16]
+- *Filter (isnotnull(Sex#16) && (Sex#16 = male))
   +- *FileScan csv [Name#15,Sex#16] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/workspaceperso/pyspark-interactive-lecture/notebooks/data/titanic.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Sex), EqualTo(Sex,male)], ReadSchema: struct<Name:string,Sex:string>
MLlib is Spark’s machine learning (ML) library. It has an RDD-based API, now in maintenance mode, and a DataFrame-based API.
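A Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. Here, StringIndexer is itself an Estimator: fit() learns the label-to-index mapping and returns a StringIndexerModel, which is the Transformer that adds the SexIndex column.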
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
titanic_indexed = indexer.fit(titanic).transform(titanic)
titanic_indexed.show(8)
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|     1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|     1.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|     0.0|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|          330877| 8.4583| null|       Q|     0.0|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|           17463|51.8625|  E46|       S|     0.0|
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1|          349909| 21.075| null|       S|     0.0|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+
only showing top 8 rows
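Why is male 0.0 and female 1.0? By default (stringOrderType='frequencyDesc'), StringIndexer gives index 0.0 to the most frequent label. The plain-Python model below mirrors that fit/transform split on the 8 Sex values shown above. `fit_string_indexer` is a hypothetical helper, and alphabetical tie-breaking is an assumption of this sketch.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Hypothetical model of fitting: map each label to an index by descending frequency."""
    counts = Counter(labels)
    # Most frequent label first; ties broken alphabetically (an assumption here).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


# Sex values of the 8 rows shown above.
sexes = ['male', 'female', 'female', 'female', 'male', 'male', 'male', 'male']
mapping = fit_string_indexer(sexes)    # "fit": learn the mapping (Estimator)
indexed = [mapping[s] for s in sexes]  # "transform": apply it (Model / Transformer)
print(mapping)   # {'male': 0.0, 'female': 1.0}
print(indexed)   # [0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0]
```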
An Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer.
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["SexIndex", "Fare"], outputCol="features")
titanic_train = assembler.transform(titanic_indexed)
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10)
model = rf.fit(titanic_train)
model.transform(titanic_train).select(["Survived", "prediction", "probability"]).show(8)
+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.94189369125263...|
|       1|       1.0|[0.21883383407637...|
|       1|       1.0|[0.46619780756453...|
|       1|       1.0|[0.02089552238805...|
|       0|       0.0|[0.87832770415448...|
|       0|       0.0|[0.84656818503583...|
|       0|       0.0|[0.66412205718598...|
|       0|       0.0|[0.86223713039307...|
+--------+----------+--------------------+
only showing top 8 rows
MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, assembler, rf])
model = pipeline.fit(titanic)
model.transform(titanic).select(["Survived", "prediction", "probability"]).show(8)
+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.94189369125263...|
|       1|       1.0|[0.21883383407637...|
|       1|       1.0|[0.46619780756453...|
|       1|       1.0|[0.02089552238805...|
|       0|       0.0|[0.87832770415448...|
|       0|       0.0|[0.84656818503583...|
|       0|       0.0|[0.66412205718598...|
|       0|       0.0|[0.86223713039307...|
+--------+----------+--------------------+
only showing top 8 rows
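Conceptually, fitting a Pipeline walks through its stages in order. An Estimator stage is fitted on the output of the stages before it, a Transformer stage is used as-is, and the result is a PipelineModel that applies every fitted stage in turn. The plain-Python model below sketches that contract. All class names here are hypothetical stand-ins, not pyspark.ml classes, and Spark's own Pipeline.fit differs in details.

```python
class Transformer:
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    def fit(self, rows):
        raise NotImplementedError  # must return a Transformer


class PipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:  # apply every fitted stage in order
            rows = stage.transform(rows)
        return rows


class Pipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fitted on the output of the previous stages.
            model = stage.fit(rows) if isinstance(stage, Estimator) else stage
            fitted.append(model)
            if i < len(self.stages) - 1:
                rows = model.transform(rows)  # feed the next stage
        return PipelineModel(fitted)


class Doubler(Transformer):  # hypothetical stateless stage
    def transform(self, rows):
        return [2 * x for x in rows]


class MeanCentererModel(Transformer):
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [x - self.mean for x in rows]


class MeanCenterer(Estimator):  # hypothetical stage that learns a parameter
    def fit(self, rows):
        return MeanCentererModel(sum(rows) / len(rows))


model = Pipeline([Doubler(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(model.stages[1].mean)              # 4.0 (mean of the doubled rows)
print(model.transform([1.0, 2.0, 3.0]))  # [-2.0, 0.0, 2.0]
```

In the notebook's pipeline, the indexer and rf play the Estimator role and the assembler is a plain Transformer. That is why a single pipeline.fit(titanic) call replaces the separate fit and transform calls made earlier.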
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
# Prepare a netcat server listening on localhost:9999 before launching launchSparkStreaming
import nclib  # third-party package: pip install nclib

# nc = nclib.Netcat(listen=('localhost', 9999), verbose=True)
# for i in range(1000):
#     nc.send_line(b'hello world')
# nc.close()
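Spark Streaming (DStreams) chops a live stream into micro-batches and runs an ordinary Spark job on each one. The plain-Python model below mimics that on the 1000 "hello world" lines the commented netcat loop would send. It cuts batches by line count purely for simplicity, whereas Spark Streaming cuts them by a batch time interval. `micro_batches` and `word_counts` are hypothetical helpers.

```python
def micro_batches(lines, batch_size):
    """Group an incoming stream of lines into fixed-size micro-batches."""
    batch = []
    for line in lines:
        batch.append(line)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # emit the last, partially filled batch
        yield batch


def word_counts(batch):
    """The per-batch job: a word count, as in the RDD example earlier."""
    counts = {}
    for line in batch:
        for word in line.split(' '):
            counts[word] = counts.get(word, 0) + 1
    return counts


stream = ('hello world' for _ in range(1000))  # stands in for the socket data
results = [word_counts(batch) for batch in micro_batches(stream, 400)]
print(len(results))  # 3 batches: 400, 400 and 200 lines
print(results[-1])   # {'hello': 200, 'world': 200}
```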
To support graph computation, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge.
NB : GraphX has no Python bindings under active development; take a look at GraphFrames, a separate package for graph computation on DataFrames that acts as an unofficial DataFrame-based counterpart to GraphX.
Spark's main contribution is to let previously disparate cluster workloads be composed in a single application. In the following example, we build a logistic model on the titanic dataset, save it to disk and push it to Spark Streaming for real-time inference.
spark.stop()