PySpark APIs
satya - 9/6/2019, 9:13:34 AM
Goal
Document here all the APIs that are useful in PySpark
Focus on key APIs
APIs that are used often
satya - 9/7/2019, 11:18:28 AM
range(3,6) returns the numbers between 3 and 6: 3, 4, 5 (in Python 3 this is a lazy range object, not a list)
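A minimal sketch (assuming spark is an existing SparkSession, created as shown later in this note):

# range(start, stop) yields start .. stop-1
nums = list(range(3, 6))
print(nums)                        # [3, 4, 5]

# handy for building a small RDD to experiment with
rdd = spark.sparkContext.parallelize(range(3, 6))
print(rdd.collect())               # [3, 4, 5]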
satya - 9/7/2019, 11:25:05 AM
Difference between map and flatMap transformations in Spark (PySpark)
satya - 9/7/2019, 11:53:17 AM
The SparkSession.read property returns a DataFrameReader; its API is here
satya - 9/7/2019, 11:54:26 AM
The SparkSession class is
pyspark.sql.SparkSession
satya - 9/7/2019, 11:54:49 AM
Here is how to establish it
spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
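A slightly fuller sketch of the same builder chain; the .master() setting is an assumption for local testing only and is normally supplied by spark-submit instead:

from pyspark.sql import SparkSession

# getOrCreate() returns the active session if one already exists
spark = (SparkSession.builder
         .appName("PythonWordCount")
         .master("local[4]")        # assumed here for local testing only
         .getOrCreate())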
satya - 9/7/2019, 11:58:44 AM
How to read a file using DataFrameReader.text() is here
satya - 9/7/2019, 11:59:03 AM
Example
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
[Row(value='hello'), Row(value='this')]
>>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
>>> df.collect()
[Row(value='hello\nthis')]
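The sample program later in this note uses the same text() call and immediately converts the resulting DataFrame to an RDD of plain strings:

# each Row has a single 'value' column; r[0] extracts the string
lines = spark.read.text('python/test_support/sql/text-test.txt').rdd.map(lambda r: r[0])
print(lines.collect())             # ['hello', 'this']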
satya - 9/7/2019, 6:14:41 PM
A map takes an element and returns an element.
The returned element can be a singleton like a string or a number,
or it can be a list or a tuple;
either way it is treated as a single value and left as such.
satya - 9/7/2019, 6:16:52 PM
A flatMap takes an element and explicitly interprets what the function returns as a list.
Once the list is returned, flatMap splits it into multiple elements and adds each of them to the data set. So when a flatMap function returns a list of 10 elements, 10 items are added to the output. With map, this would have been a single element with 10 embedded values. A contrasting sketch follows.
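A minimal sketch contrasting the two (assuming sc is an existing SparkContext, e.g. spark.sparkContext):

rdd = sc.parallelize(["hello world", "good day"])

# map: one element in, one element out (here each output element is a list)
mapped = rdd.map(lambda line: line.split(' '))
print(mapped.collect())    # [['hello', 'world'], ['good', 'day']] : 2 elements

# flatMap: the returned lists are flattened into individual elements
flat = rdd.flatMap(lambda line: line.split(' '))
print(flat.collect())      # ['hello', 'world', 'good', 'day'] : 4 elements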
satya - 9/7/2019, 6:17:36 PM
How does reduceByKey work for multi-valued objects in Spark?
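The notes leave this question open; here is a sketch of the behavior with made-up data (sc is assumed to be an existing SparkContext). reduceByKey touches only the values; the combining function must merge two values into one of the same shape:

# each element: (word, (count, total_length))
pairs = sc.parallelize([("thy", (1, 3)), ("thy", (1, 3)), ("deep", (1, 4))])

# merge two multi-valued values field by field
merged = pairs.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(merged.collect())    # [('thy', (2, 6)), ('deep', (1, 4))] (order may vary)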
satya - 9/11/2019, 2:05:54 PM
A first sample program: Analyzing Sonnet2
###############################################
# RDD APIs tested:
#   flatMap
#   map
#   reduceByKey
#   sortBy
#   collect
#
# Others:
#   type(), len()
#   string.split(), .lower(), .format()
#
# General reminders in Python:
#   Things inside () are tuples
#   Things inside [] are lists
#
# Key libraries used:
#   pyspark.sql
#   pyspark.rdd
###############################################
from __future__ import print_function
import sys
from operator import add

from pyspark.sql import SparkSession

#***********************************
# Utility print functions
#***********************************
def printBeginMsg(msg):
    print("*****************************")
    print("*" + msg)
    print("*****************************")

def printEndMsg(msg):
    print("*****************************")
    print("* End of " + msg)
    print("*****************************")

# RDDs are distributed and stay distributed
# until collect() (or another function that requires gathering) is called.
# This is a utility function for this demo only.
# In practice collect() is called very few times.
def printCollected(msg, rdd):
    rddCollected = rdd.collect()
    printBeginMsg(msg)
    for item in rddCollected:
        print(item)
    printEndMsg(msg)
#***********************************
# End of functions
#***********************************

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    # spark.read is a property;
    # it returns a DataFrameReader
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    printCollected("Raw lines", lines)

    lineAsListOfWords = lines.map(lambda x: x.split(' '))
    printCollected("Raw lines split into words. Each line is a list of words", lineAsListOfWords)

    justAListOfWords = lineAsListOfWords.flatMap(lambda x: x).map(lambda x: x.lower())
    printCollected("Just A List of Words, from flatmap", justAListOfWords)

    #***************************************************************
    # Make each word a tuple: (word, length-of-that-word, 1)
    # WordObject: (word, length, howmany)
    #***************************************************************
    listOfWordObjects = justAListOfWords.map(lambda x: (x, len(x), 1))
    printCollected("List of Word Objects", listOfWordObjects)

    #***************************************************************
    # Because there are duplicated words, each word's length is
    # determined multiple times.
    # Let's reduce the list first.
    # WordObject2: (word, howmany)
    #***************************************************************
    listOfWordObjects2 = justAListOfWords.map(lambda x: (x, 1))
    printCollected("List of Word Objects 2, no length", listOfWordObjects2)

    #***************************************************************
    # Let's count the similar words together.
    # Although I call it a list, it is really an RDD.
    # The lambda function returns a tuple;
    # the transformation on the RDD yields an RDD again,
    # so the output is really not a Python object but a Spark object.
    #***************************************************************
    listOfUniqueWordObjects2RDD = listOfWordObjects2.reduceByKey(add)
    printCollected("List of Unique Word Objects 2, no length", listOfUniqueWordObjects2RDD)

    #***************************************************************
    # This will print:
    # Type of .map on an RDD is: <class 'pyspark.rdd.PipelinedRDD'>
    #***************************************************************
    print("Type of .map on an RDD is: {}".format(type(listOfUniqueWordObjects2RDD)))

    #***************************************************************
    # WordObject3: (word, length, howmany)
    #***************************************************************
    listOfUniqueWordObjects3RDD = listOfUniqueWordObjects2RDD.map(lambda x: (x[0], len(x[0]), x[1]))
    printCollected("List of Unique Word Objects 3, with length", listOfUniqueWordObjects3RDD)

    #***************************************************************
    # Sort words by length, descending
    #***************************************************************
    listOfUniqueWordObjects3SortByLength = listOfUniqueWordObjects3RDD.sortBy(lambda x: x[1], False)
    printCollected("List of Unique Word Objects sorted by length", listOfUniqueWordObjects3SortByLength)

    #***************************************************************
    # Sort words by frequency, descending
    #***************************************************************
    listOfUniqueWordObjects3SortByFreq = listOfUniqueWordObjects3RDD.sortBy(lambda x: x[2], False)
    printCollected("List of Unique Word Objects sorted by frequency", listOfUniqueWordObjects3SortByFreq)

    spark.stop()
satya - 9/11/2019, 2:06:30 PM
Sonnet2.txt
When forty winters shall beseige thy brow,
And dig deep trenches in thy beauty's field,
Thy youth's proud livery, so gazed on now,
Will be a tatter'd weed, of small worth held:
Then being ask'd where all thy beauty lies,
Where all the treasure of thy lusty days,
To say, within thine own deep-sunken eyes,
Were an all-eating shame and thriftless praise.
How much more praise deserved thy beauty's use,
If thou couldst answer 'This fair child of mine
Shall sum my count and make my old excuse,'
Proving his beauty by succession thine!
This were to be new made when thou art old,
And see thy blood warm when thou feel'st it cold.
satya - 9/11/2019, 2:07:11 PM
First two sets of output
*****************************
*Raw lines
*****************************
When forty winters shall beseige thy brow,
And dig deep trenches in thy beauty's field,
Thy youth's proud livery, so gazed on now,
Will be a tatter'd weed, of small worth held:
Then being ask'd where all thy beauty lies,
Where all the treasure of thy lusty days,
To say, within thine own deep-sunken eyes,
Were an all-eating shame and thriftless praise.
How much more praise deserved thy beauty's use,
If thou couldst answer 'This fair child of mine
Shall sum my count and make my old excuse,'
Proving his beauty by succession thine!
This were to be new made when thou art old,
And see thy blood warm when thou feel'st it cold.
*****************************
* End of Raw lines
*****************************
*****************************
*Raw lines split into words. Each line is a list of words
*****************************
['When', 'forty', 'winters', 'shall', 'beseige', 'thy', 'brow,']
['And', 'dig', 'deep', 'trenches', 'in', 'thy', "beauty's", 'field,']
['Thy', "youth's", 'proud', 'livery,', 'so', 'gazed', 'on', 'now,']
['Will', 'be', 'a', "tatter'd", 'weed,', 'of', 'small', 'worth', 'held:']
['Then', 'being', "ask'd", 'where', 'all', 'thy', 'beauty', 'lies,']
['Where', 'all', 'the', 'treasure', 'of', 'thy', 'lusty', 'days,']
['To', 'say,', 'within', 'thine', 'own', 'deep-sunken', 'eyes,']
['Were', 'an', 'all-eating', 'shame', 'and', 'thriftless', 'praise.']
['How', 'much', 'more', 'praise', 'deserved', 'thy', "beauty's", 'use,']
['If', 'thou', 'couldst', 'answer', "'This", 'fair', 'child', 'of', 'mine']
['Shall', 'sum', 'my', 'count', 'and', 'make', 'my', 'old', "excuse,'"]
['Proving', 'his', 'beauty', 'by', 'succession', 'thine!']
['This', 'were', 'to', 'be', 'new', 'made', 'when', 'thou', 'art', 'old,']
['And', 'see', 'thy', 'blood', 'warm', 'when', 'thou', "feel'st", 'it', 'cold.']
*****************************
* End of Raw lines split into words. Each line is a list of words
*****************************
satya - 9/11/2019, 2:07:47 PM
From flatmap
*****************************
*Just A List of Words, from flatmap
*****************************
when
forty
winters
shall
beseige
thy
brow,
and
...
..
and a lot more
satya - 9/11/2019, 2:08:15 PM
List of word objects
*****************************
*List of Word Objects
*****************************
('when', 4, 1)
('forty', 5, 1)
('winters', 7, 1)
('shall', 5, 1)
('beseige', 7, 1)
('thy', 3, 1)
('brow,', 5, 1)
('and', 3, 1)
('dig', 3, 1)
('deep', 4, 1)
('trenches', 8, 1)
('in', 2, 1)
.....
satya - 9/11/2019, 2:08:33 PM
Next
*****************************
*List of Word Objects 2, no length
*****************************
('when', 1)
('forty', 1)
('winters', 1)
('shall', 1)
('beseige', 1)
('thy', 1)
('brow,', 1)
....
satya - 9/11/2019, 2:09:01 PM
Next
*****************************
*List of Unique Word Objects 2, no length
*****************************
('when', 3)
('forty', 1)
('winters', 1)
('shall', 2)
('beseige', 1)
('thy', 7)
('brow,', 1)
....
satya - 9/11/2019, 2:09:23 PM
Next
*****************************
*List of Unique Word Objects 3, with length
*****************************
('when', 4, 3)
('forty', 5, 1)
('winters', 7, 1)
('shall', 5, 2)
('beseige', 7, 1)
('thy', 3, 7)
('brow,', 5, 1)
.....
satya - 9/11/2019, 2:09:43 PM
Next
*****************************
*List of Unique Word Objects sorted by length
*****************************
('deep-sunken', 11, 1)
('all-eating', 10, 1)
('thriftless', 10, 1)
('succession', 10, 1)
('trenches', 8, 1)
("beauty's", 8, 2)
("tatter'd", 8, 1)
('treasure', 8, 1)
('deserved', 8, 1)
("excuse,'", 8, 1)
('winters', 7, 1)
('beseige', 7, 1)
("youth's", 7, 1)
('livery,', 7, 1)
('praise.', 7, 1)
('couldst', 7, 1)
('proving', 7, 1)
("feel'st", 7, 1)
('field,', 6, 1)
('beauty', 6, 2)
('within', 6, 1)
('praise', 6, 1)
('answer', 6, 1)
('thine!', 6, 1)
('forty', 5, 1)
('shall', 5, 2)
('brow,', 5, 1)
....
satya - 9/11/2019, 2:10:01 PM
Next
*****************************
*List of Unique Word Objects sorted by frequency
*****************************
('thy', 3, 7)
('and', 3, 4)
('when', 4, 3)
('of', 2, 3)
('thou', 4, 3)
('shall', 5, 2)
("beauty's", 8, 2)
('be', 2, 2)
('where', 5, 2)
('all', 3, 2)
('beauty', 6, 2)
('to', 2, 2)
('were', 4, 2)
('my', 2, 2)
....
satya - 9/11/2019, 2:10:39 PM
At some point I will post the github links for these
satya - 7/12/2020, 6:55:05 PM
Submitting a pyspark program to run on Windows: a batch file, rs1.bat
@echo off
@rem the spark examples are at
@rem c:\satya\i\spark\examples\src\main\python
@rem notice the spark bin directory in its installation path
@rem *****************************************************
@rem this is how to submit a spark job using a .py file
@rem example: rs1.bat wordcount.py sonnet2.txt
@rem
@rem rs1.bat : This batch file
@rem wordcount.py : pyspark program
@rem sonnet2.txt : input argument
@rem
@rem pwd: C:\satya\data\code\pyspark
@rem \wordcount.py
@rem \sonnet2.txt
@rem
@rem *****************************************************
c:\satya\i\spark\bin\spark-submit --master local[4] %1 %2 %3