PySpark APIs

This note documents the PySpark APIs that are useful in practice, focusing on the key APIs that are used most often.

Here is the API reference

Python reference is here

range(3, 6) yields the numbers from 3 up to (but not including) 6: 3, 4, 5. In Python 3 it returns a lazy range object rather than a list.
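A quick illustration in plain Python (no Spark needed):

>>> list(range(3, 6))
[3, 4, 5]
>>> type(range(3, 6))
<class 'range'>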

pyspark flatmap

Search for: pyspark flatmap

Difference between map and flatMap transformations in Spark (pySpark)

The SparkSession.read property API is here


pyspark.sql.SparkSession

spark = SparkSession \
    .builder \
    .appName("PythonWordCount") \
    .getOrCreate()

DataFrameReader API

How to read a file using DataFrameReader.text() is here


>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
[Row(value='hello'), Row(value='this')]
>>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
>>> df.collect()
[Row(value='hello\nthis')]
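spark.read.text() returns a DataFrame with a single column named value. A common next step (used in the word-count script below) is to drop down to an RDD of plain strings; a minimal sketch against the same test file:

>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.rdd.map(lambda r: r[0]).collect()
['hello', 'this']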

With map, the returned element can be a singleton like a string or a number, or it can be a list or a tuple. Either way it is considered a single value and left as such.

With flatMap, once a list is returned, Spark splits that list into multiple elements and adds them to the data set. So when a flatMap function returns a list of 10 elements, 10 items are added to the resulting RDD. With map, this would have been a single element with 10 embedded values.
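A minimal sketch of the difference, assuming a SparkSession named spark is already available (the sample strings are illustrative):

>>> rdd = spark.sparkContext.parallelize(['hello world', 'this works'])
>>> rdd.map(lambda line: line.split(' ')).collect()
[['hello', 'world'], ['this', 'works']]
>>> rdd.flatMap(lambda line: line.split(' ')).collect()
['hello', 'world', 'this', 'works']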

How does reduceByKey work for multi-valued objects in Spark

Search for: How does reduceByKey work for multi-valued objects in Spark
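A small sketch, assuming the values are themselves tuples of (length, count); reduceByKey only merges the values, the key is left untouched (names and data here are illustrative, and the order of collected results may vary):

>>> pairs = spark.sparkContext.parallelize(
...     [('thy', (3, 1)), ('thy', (3, 1)), ('when', (4, 1))])
>>> pairs.reduceByKey(lambda a, b: (a[0], a[1] + b[1])).collect()
[('thy', (3, 2)), ('when', (4, 1))]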


###############################################
# RDD APIs tested:
#   flatMap
#   map
#   reduceByKey
#   sortBy
#   collect
#
# Others:
#   type(), len()
#   string.split(), .lower(), .format()
#
# General reminders in Python
#   Things inside () are tuples
#   Things inside [] are lists
#
# Key libraries used
#   pyspark.sql
#   pyspark.rdd
###############################################

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

#***********************************
#Utility print functions
#***********************************
def printBeginMsg(msg):
    print("*****************************")
    print("*" + msg)
    print("*****************************")

def printEndMsg(msg):
    print("*****************************")
    print("* End of " + msg)
    print("*****************************")

#RDDs are distributed and stay distributed
#until collect() (or another function that requires gathering) is called.
#This is a utility function for this demo only.
#In practice collect() is called very few times.
def printCollected(msg, rdd):
    rddCollected = rdd.collect()
    printBeginMsg(msg)
    for item in rddCollected:
        print(item)
    printEndMsg(msg)

#***********************************
#End of utility functions
#***********************************

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()


    #
    # spark.read is a property
    # it returns a DataFrameReader
    #
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    printCollected("Raw lines", lines)

    lineAsListOfWords = lines.map(lambda x: x.split(' '))
    printCollected("Raw lines split into words. Each line is a list of words",
                   lineAsListOfWords)

    justAListOfWords = lineAsListOfWords.flatMap(lambda x: x).map(lambda x: x.lower())
    printCollected("Just A List of Words, from flatmap", justAListOfWords)

    #***************************************************************
    # make each word a tuple: (word, length-of-that-word, 1)
    # WordObject: (word, length, howmany)
    #***************************************************************
    listOfWordObjects = justAListOfWords.map(lambda x: (x, len(x), 1))
    printCollected("List of Word Objects", listOfWordObjects)

    #***************************************************************
    # Even though there are duplicate words, each word's length is
    # determined multiple times.
    # Let's reduce the list first.
    # WordObject2: (word, howmany)
    #***************************************************************
    listOfWordObjects2 = justAListOfWords.map(lambda x: (x, 1))
    printCollected("List of Word Objects 2, no length", listOfWordObjects2)

    #***************************************************************
    #Let's count the similar words together.
    #Although I call it a list, it is really an RDD.
    #reduceByKey merges the counts for each key, here using the add operator.
    #So the output is really not a Python object but a Spark RDD.
    #***************************************************************
    listOfUniqueWordObjects2RDD = listOfWordObjects2.reduceByKey(add)
    printCollected("List of Unique Word Objects 2, no length", listOfUniqueWordObjects2RDD)

    #***************************************************************
    #This will print
    #Type of the RDD is: <class 'pyspark.rdd.PipelinedRDD'>
    #***************************************************************
    print("Type of the RDD is: {}".format(type(listOfUniqueWordObjects2RDD)))

    #***************************************************************
    # WordObject3: (word, length, howmany)
    #***************************************************************
    listOfUniqueWordObjects3RDD = listOfUniqueWordObjects2RDD.map(lambda x: (x[0], len(x[0]), x[1]))
    printCollected("List of Unique Word Objects 3, with length", listOfUniqueWordObjects3RDD)

    #***************************************************************
    #sort words by length
    #***************************************************************
    listOfUniqueWordObjects3SortedByLength = \
        listOfUniqueWordObjects3RDD.sortBy(lambda x: x[1], False)
    printCollected("List of Unique Word Objects sorted by length",
                   listOfUniqueWordObjects3SortedByLength)

    #***************************************************************
    #sort words by Frequency
    #***************************************************************
    listOfUniqueWordObjects3SortedByFreq = \
        listOfUniqueWordObjects3RDD.sortBy(lambda x: x[2], False)
    printCollected("List of Unique Word Objects sorted by frequency",
                   listOfUniqueWordObjects3SortedByFreq)

    spark.stop()

Sample input file (sonnet2.txt):

When forty winters shall beseige thy brow,
And dig deep trenches in thy beauty's field,
Thy youth's proud livery, so gazed on now,
Will be a tatter'd weed, of small worth held:
Then being ask'd where all thy beauty lies,
Where all the treasure of thy lusty days,
To say, within thine own deep-sunken eyes,
Were an all-eating shame and thriftless praise.
How much more praise deserved thy beauty's use,
If thou couldst answer 'This fair child of mine
Shall sum my count and make my old excuse,'
Proving his beauty by succession thine!
This were to be new made when thou art old,
And see thy blood warm when thou feel'st it cold.

Sample output:

*****************************
*Raw lines
*****************************
When forty winters shall beseige thy brow,
And dig deep trenches in thy beauty's field,
Thy youth's proud livery, so gazed on now,
Will be a tatter'd weed, of small worth held:
Then being ask'd where all thy beauty lies,
Where all the treasure of thy lusty days,
To say, within thine own deep-sunken eyes,
Were an all-eating shame and thriftless praise.
How much more praise deserved thy beauty's use,
If thou couldst answer 'This fair child of mine
Shall sum my count and make my old excuse,'
Proving his beauty by succession thine!
This were to be new made when thou art old,
And see thy blood warm when thou feel'st it cold.
*****************************
* End of Raw lines
*****************************
*****************************
*Raw lines split into words. Each line is a list of words
*****************************
['When', 'forty', 'winters', 'shall', 'beseige', 'thy', 'brow,']
['And', 'dig', 'deep', 'trenches', 'in', 'thy', "beauty's", 'field,']
['Thy', "youth's", 'proud', 'livery,', 'so', 'gazed', 'on', 'now,']
['Will', 'be', 'a', "tatter'd", 'weed,', 'of', 'small', 'worth', 'held:']
['Then', 'being', "ask'd", 'where', 'all', 'thy', 'beauty', 'lies,']
['Where', 'all', 'the', 'treasure', 'of', 'thy', 'lusty', 'days,']
['To', 'say,', 'within', 'thine', 'own', 'deep-sunken', 'eyes,']
['Were', 'an', 'all-eating', 'shame', 'and', 'thriftless', 'praise.']
['How', 'much', 'more', 'praise', 'deserved', 'thy', "beauty's", 'use,']
['If', 'thou', 'couldst', 'answer', "'This", 'fair', 'child', 'of', 'mine']
['Shall', 'sum', 'my', 'count', 'and', 'make', 'my', 'old', "excuse,'"]
['Proving', 'his', 'beauty', 'by', 'succession', 'thine!']
['This', 'were', 'to', 'be', 'new', 'made', 'when', 'thou', 'art', 'old,']
['And', 'see', 'thy', 'blood', 'warm', 'when', 'thou', "feel'st", 'it', 'cold.']
*****************************
* End of Raw lines split into words. Each line is a list of words
*****************************

*****************************
*Just A List of Words, from flatmap
*****************************
when
forty
winters
shall
beseige
thy
brow,
and
...

and a lot more

*****************************
*List of Word Objects
*****************************
('when', 4, 1)
('forty', 5, 1)
('winters', 7, 1)
('shall', 5, 1)
('beseige', 7, 1)
('thy', 3, 1)
('brow,', 5, 1)
('and', 3, 1)
('dig', 3, 1)
('deep', 4, 1)
('trenches', 8, 1)
('in', 2, 1)

.....

*****************************
*List of Word Objects 2, no length
*****************************
('when', 1)
('forty', 1)
('winters', 1)
('shall', 1)
('beseige', 1)
('thy', 1)
('brow,', 1)
....

*****************************
*List of Unique Word Objects 2, no length
*****************************
('when', 3)
('forty', 1)
('winters', 1)
('shall', 2)
('beseige', 1)
('thy', 7)
('brow,', 1)
....

*****************************
*List of Unique Word Objects 3, with length
*****************************
('when', 4, 3)
('forty', 5, 1)
('winters', 7, 1)
('shall', 5, 2)
('beseige', 7, 1)
('thy', 3, 7)
('brow,', 5, 1)

.....

*****************************
*List of Unique Word Objects sorted by length
*****************************
('deep-sunken', 11, 1)
('all-eating', 10, 1)
('thriftless', 10, 1)
('succession', 10, 1)
('trenches', 8, 1)
("beauty's", 8, 2)
("tatter'd", 8, 1)
('treasure', 8, 1)
('deserved', 8, 1)
("excuse,'", 8, 1)
('winters', 7, 1)
('beseige', 7, 1)
("youth's", 7, 1)
('livery,', 7, 1)
('praise.', 7, 1)
('couldst', 7, 1)
('proving', 7, 1)
("feel'st", 7, 1)
('field,', 6, 1)
('beauty', 6, 2)
('within', 6, 1)
('praise', 6, 1)
('answer', 6, 1)
('thine!', 6, 1)
('forty', 5, 1)
('shall', 5, 2)
('brow,', 5, 1)
....

*****************************
*List of Unique Word Objects sorted by frequency
*****************************
('thy', 3, 7)
('and', 3, 4)
('when', 4, 3)
('of', 2, 3)
('thou', 4, 3)
('shall', 5, 2)
("beauty's", 8, 2)
('be', 2, 2)
('where', 5, 2)
('all', 3, 2)
('beauty', 6, 2)
('to', 2, 2)
('were', 4, 2)
('my', 2, 2)

....

At some point I will post the GitHub links for these.


@echo off

@rem the spark examples are at
@rem c:\satya\i\spark\examples\src\main\python
@rem notice the spark bin directory in its installation path

@rem *****************************************************
@rem this is how to submit a spark job using .py file
@rem example: rs1.cmd wordcount.py sonnet2.txt
@rem 
@rem rs1.cmd : This batch file
@rem wordcount.py : pyspark program
@rem sonnet2.txt: input argument
@rem
@rem pwd: C:\satya\data\code\pyspark 
@rem     \wordcount.py
@rem     \sonnet2.txt
@rem
@rem *****************************************************
c:\satya\i\spark\bin\spark-submit --master local[4] %1 %2 %3