Spark : Operations on RDDs

Map Transformation  :


Map is a transformation that applies a function to every item in an RDD and produces a new RDD containing the modified items.


Example :

val distData = sc.parallelize(List("Hello", "Data", "engineers"))
val newData = distData.map(line => line.length)

Output :

Array(5, 4, 9)

In the example above, distData holds three string items. Applying map with the length function yields a resultant RDD containing the length of each string.


Filter Transformation  :


Filter is a transformation that tests every data item of the input RDD against a given predicate, and keeps in the resultant RDD only the items for which the predicate returns true.


>>val RDD1 = sc.parallelize(List(10,20,30,40,50)) 

>>val FilterRDD = RDD1.filter(x => x != 20) 

>>FilterRDD.collect()

Output :

Array[Int] = Array(10, 30, 40, 50)


FlatMap Transformation :


FlatMap is a transformation in which every data item in the input RDD can produce 0 or more items, and all of the resulting items are flattened into a single resultant RDD.

Example :

sc.parallelize([1,2,3,4,5]).flatMap(lambda x: [x, x*x]).collect()

Output:
[1, 1,  2, 4, 3, 9, 4, 16, 5, 25]

Similarly, when the same function is applied with the map transformation, the per-element lists are kept nested:

sc.parallelize([1,2,3,4,5]).map(lambda x: [x, x*x]).collect()

Output:
[[1, 1],[2, 4] , [3, 9], [4, 16], [5, 25]]
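The difference between the two can also be reproduced in plain Python without Spark; this is just a sketch of the semantics using the standard library:

```python
from itertools import chain

data = [1, 2, 3, 4, 5]

# map: one output element (here, a two-item list) per input element
mapped = [[x, x * x] for x in data]

# flatMap: the per-element lists are flattened into a single sequence
flat_mapped = list(chain.from_iterable([x, x * x] for x in data))

print(mapped)       # [[1, 1], [2, 4], [3, 9], [4, 16], [5, 25]]
print(flat_mapped)  # [1, 1, 2, 4, 3, 9, 4, 16, 5, 25]
```

In short: use flatMap when each input item should expand into a variable number of output items, and map when the mapping is strictly one-to-one.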

MapPartitions Transformation :


mapPartitions is a transformation in which the function is applied to each partition of an RDD at once, instead of to every individual data item. Because any setup work inside the function (such as initialising a variable or opening a connection) happens once per partition rather than once per element, mapPartitions can optimise performance in Spark.
It can be better explained with the scenario below.


val data = sc.parallelize(List(10,20,30,40,50,60,70,80,90), 3)

Map:

def sumfuncmap(numbers: Int): Int = {
  val sum = 10
  sum + numbers
}

data.map(sumfuncmap).collect

returns Array[Int] = Array(20, 30, 40, 50, 60, 70, 80, 90, 100)

Map Partition :


def sumfuncpartition(numbers: Iterator[Int]): Iterator[Int] = {
  var sum = 1
  while (numbers.hasNext) {
    sum = sum + numbers.next()
  }
  Iterator(sum)
}

data.mapPartitions(sumfuncpartition).collect

returns


Array[Int] = Array(61, 151, 241)
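The partition-level behaviour can be simulated in plain Python (no Spark needed); the partition boundaries are hard-coded here purely for illustration:

```python
# Plain-Python sketch of mapPartitions semantics: the function receives an
# iterator over a whole partition, not a single element at a time.
def sum_partition(numbers):
    total = 1                      # mirrors "var sum = 1" in the Scala example
    for n in numbers:              # the per-partition setup above ran only once
        total += n
    yield total                    # one output element per partition

# List(10..90) split into 3 partitions, as in the Scala example
partitions = [[10, 20, 30], [40, 50, 60], [70, 80, 90]]
result = [x for part in partitions for x in sum_partition(iter(part))]
print(result)  # [61, 151, 241]
```

Compare with map, where the initialisation would run once per element (nine times) instead of once per partition (three times).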

mapPartitionsWithIndex Transformation :

It is the same as the mapPartitions transformation, except that the function also receives the index of the partition being processed, so each data item can be tagged with the partition it belongs to.

example :

> val parallel = sc.parallelize(1 to 9)

> parallel.mapPartitionsWithIndex ( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect

res1: Array[String] = Array(0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9)   (each string is "partitionIndex, value"; the indices depend on the default number of partitions)

mapPartitionsWithIndex with an explicit number of partitions :

> val RDD1= sc.parallelize(1 to 9, 3)
> RDD1.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res2: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

Union Transformation : 

Union is a set-operation transformation that appends the data items of one RDD to those of another. Duplicate elements are kept in the result; apply the distinct transformation afterwards to remove them.

Example : 

val R1 = sc.parallelize(List("H","e","l","l","o"))
val R2 = sc.parallelize(List("T","e","a","m"))
val result = R1.union(R2).collect()

Array("H", "e", "l", "l", "o", "T", "e", "a", "m")


Intersection Transformation : 

Intersection is a set-operation transformation that pulls the data items common to both RDDs. The result contains no duplicates.


Example : 

val R1 = sc.parallelize(List("H","e","l","l","o"))
val R2 = sc.parallelize(List("T","e","a","m"))
val result = R1.intersection(R2).collect()


Array("e")


Distinct Transformation : 

This transformation removes duplicate data items from an RDD.

Example : 

val R1 = sc.parallelize(List("H","e","l","l","o"))
val R2 = sc.parallelize(List("T","e","a","m"))
val result = R1.union(R2).distinct().collect()

Array("H", "e", "l", "o", "T", "a", "m")   (ordering may vary)
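All three set operations can be sketched with plain Python collections; this is only an illustration of the semantics — in Spark the operations are distributed and the output ordering is not guaranteed:

```python
r1 = ["H", "e", "l", "l", "o"]
r2 = ["T", "e", "a", "m"]

union = r1 + r2                           # union keeps duplicates
intersection = sorted(set(r1) & set(r2))  # intersection de-duplicates
distinct = list(dict.fromkeys(r1 + r2))   # distinct, preserving first-seen order

print(union)         # ['H', 'e', 'l', 'l', 'o', 'T', 'e', 'a', 'm']
print(intersection)  # ['e']
print(distinct)      # ['H', 'e', 'l', 'o', 'T', 'a', 'm']
```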


GroupbyKey Transformation : 

groupByKey is a transformation that groups the data elements of an RDD by key; the RDD must be in key-value pair ([K, V]) format. It performs a full shuffle of the data across partitions.


Example : 

val rdd1 = sc.parallelize(List("We are here for learning we are here to grow"))

val rdd2 = rdd1.flatMap(words => words.split(" ")).map(x => (x, 1))

rdd2.collect()

Array((We,1), (are,1), (here,1), (for,1), (learning,1), (we,1), (are,1), (here,1), (to,1), (grow,1))

rdd2.groupByKey().collect()

Array((We,CompactBuffer(1)), (are,CompactBuffer(1, 1)), (here,CompactBuffer(1, 1)), (for,CompactBuffer(1)), (learning,CompactBuffer(1)), (we,CompactBuffer(1)), (to,CompactBuffer(1)), (grow,CompactBuffer(1)))

Note that "We" and "we" are different keys, so they are grouped separately.
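The grouping itself can be sketched in plain Python with a dictionary; this illustrates the semantics only, not Spark's distributed shuffle:

```python
from collections import defaultdict

pairs = [("We", 1), ("are", 1), ("here", 1), ("for", 1), ("learning", 1),
         ("we", 1), ("are", 1), ("here", 1), ("to", 1), ("grow", 1)]

groups = defaultdict(list)
for key, value in pairs:     # every value is routed to its key's group
    groups[key].append(value)

print(dict(groups))
# {'We': [1], 'are': [1, 1], 'here': [1, 1], 'for': [1], 'learning': [1],
#  'we': [1], 'to': [1], 'grow': [1]}
```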



ReducebyKey Transformation : 

reduceByKey is a transformation that groups the data elements of an RDD by key and reduces each group with the supplied function; the RDD must be in key-value pair ([K, V]) format. It is preferred over groupByKey on large data sets because values are combined within each partition before being shuffled, so far less data moves across the network.

Example : 

val rdd1 = sc.parallelize(Seq((5,10),(5,15),(4,8),(4,12),(5,20),(10,50)))

val reducedByKey = rdd1.reduceByKey(_ + _).collect()

Output : 

Array((5,45),(4,20),(10,50))
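Why reduceByKey shuffles less than groupByKey can be sketched in plain Python: values are combined inside each (simulated) partition first, and only the small per-partition results are merged:

```python
from collections import defaultdict

# Two simulated partitions of (key, value) pairs
partitions = [[(5, 10), (5, 15), (4, 8)], [(4, 12), (5, 20), (10, 50)]]

# Step 1: combine locally within each partition (the "map-side" reduce)
local = []
for part in partitions:
    acc = defaultdict(int)
    for k, v in part:
        acc[k] += v
    local.append(dict(acc))

# Step 2: only the small per-partition results are shuffled and merged
merged = defaultdict(int)
for acc in local:
    for k, v in acc.items():
        merged[k] += v

print(dict(merged))  # {5: 45, 4: 20, 10: 50}
```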

sortByKey Transformation : 

sortByKey is a transformation in which the data elements of a key-value pair (K, V) RDD are ordered by key, in ascending, descending, or custom order.

Example : 

val RDD1 = sc.parallelize(Seq(("Parle",10), ("Britania",70), ("Apple",20), ("Creamtreat",90)))

val sorted1 = RDD1.sortByKey().collect()

Output : 

Array(("Apple",20), ("Britania",70) ,("Creamtreat",90),("Parle",10))


Join Transformation : 

Join is a transformation that combines two pair RDDs into one RDD by matching their keys (an inner join by default).

// List containing (customer_id, (first_name, last_name))

val nameRDD1 = List((101, ("Mark", "Zuckenburg")), 
              (102, ("Silvester", "Stallone")), 
              (103, ("John", "Cena")),
              (104, ("Rocky", "Balboa")))
val subscriptions = sc.parallelize(nameRDD1) // Pair RDD with key = customer_id, value = (first_name, last_name)

// List containing (customer_id, item) for every customer who has placed an order

val ls = List((101, "Shoes"), 
              (101, "Watch"), 
              (102, "Mobile"), 
              (102, "Groceries"), 
              (103, "Medicine"), 
              (103, "Cool Drink"))


val locations = sc.parallelize(ls)

subscriptions.join(locations) produces : 

(101,((Mark,Zuckenburg),Shoes))
(101,((Mark,Zuckenburg),Watch))
(102,((Silvester,Stallone),Mobile))
(102,((Silvester,Stallone),Groceries))
(103,((John,Cena),Medicine))
(103,((John,Cena),Cool Drink))
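The inner-join semantics can be sketched in plain Python; the list contents mirror the Spark example above:

```python
customers = [(101, ("Mark", "Zuckenburg")), (102, ("Silvester", "Stallone")),
             (103, ("John", "Cena")), (104, ("Rocky", "Balboa"))]
orders = [(101, "Shoes"), (101, "Watch"), (102, "Mobile"),
          (102, "Groceries"), (103, "Medicine"), (103, "Cool Drink")]

# Inner join: emit a pair only for keys present on both sides, once per
# matching combination. Customer 104 has no orders, so it is dropped.
left = dict(customers)
joined = [(cid, (left[cid], item)) for cid, item in orders if cid in left]
print(joined[0])  # (101, (('Mark', 'Zuckenburg'), 'Shoes'))
```

Note that customer 104 disappears from the result; a leftOuterJoin in Spark would keep it with a None on the right side.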

Cartesian Transformation : 

Cartesian is a transformation that computes the Cartesian product of two RDDs: every element in RDD1 is paired with every element in RDD2 to form the resulting data set.

Example : 

>>val RDD1=sc.parallelize(Array("A","B","C"))

>>val RDD2=sc.parallelize(Array("1","2"))

>>RDD1.cartesian(RDD2).collect()

Array((A,1),(A,2),(B,1),(B,2),(C,1),(C,2))
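The same product can be sketched with Python's itertools; this is an illustration of the semantics, not Spark itself:

```python
from itertools import product

rdd1 = ["A", "B", "C"]
rdd2 = ["1", "2"]

# Every element of rdd1 is paired with every element of rdd2,
# so the result has len(rdd1) * len(rdd2) pairs.
pairs = list(product(rdd1, rdd2))
print(pairs)  # [('A', '1'), ('A', '2'), ('B', '1'), ('B', '2'), ('C', '1'), ('C', '2')]
```

Because the output size is the product of the two input sizes, cartesian should be used with care on large RDDs.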

Pipe Transformation : 

Pipe is a transformation that lets the elements of an RDD be processed by an external program outside Scala and Java, such as a shell or Perl script: each partition's elements are written to the process's stdin, and the lines of its stdout form the resulting RDD.

Example :

>>val RDD1=sc.parallelize(Array("A","B","C"))

>>RDD1.pipe("echo 'Pipe used here'").collect()

COALESCE Transformation : 

Coalesce is a transformation that reduces the number of partitions of an RDD. It avoids a full shuffle by merging existing partitions locally, so there is little data movement. It is used for performance tuning and to reduce I/O operations.

Example : 

>>val RDD1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)

>>RDD1.glom().collect()

Partition 0001: 1, 2
Partition 0002: 3, 4, 5
Partition 0003: 6, 7

Partition 0004: 8, 9, 10

>> RDD1.coalesce(2).glom().collect()

Partition 0001: 1, 2, 3, 4, 5

Partition 0003: 6, 7, 8, 9, 10
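A rough sketch of the merge in plain Python; Spark's actual partition placement also considers data locality, so this is only an approximation:

```python
# Simulated partitions of the RDD above
partitions = [[1, 2], [3, 4, 5], [6, 7], [8, 9, 10]]

def coalesce(parts, n):
    # Group existing partitions into n buckets, concatenating within each
    # bucket; elements never leave their bucket, so no full shuffle is needed.
    size = -(-len(parts) // n)  # ceiling division
    return [sum(parts[i:i + size], []) for i in range(0, len(parts), size)]

print(coalesce(partitions, 2))  # [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
```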

Repartition Transformation : 

Repartition is a transformation that changes the number of partitions of an RDD through a full data shuffle, producing partitions with a roughly equal number of items. It is used for performance tuning, and unlike coalesce it can also increase the number of partitions.

Example : 

>> val RDD1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)

>> RDD1.glom().collect()

Partition 0001: 1, 2
Partition 0002: 3, 4, 5
Partition 0003: 6, 7

Partition 0004: 8, 9, 10

>> RDD1.repartition(5).glom().collect()

Partition 0001: 1,2
Partition 0002: 3,4 
Partition 0003: 5,6

Partition 0004: 7,8
Partition 0005: 9,10