Atelier Big Data avancée 20/21
TP2 Introduction à spark et
scala
Introduction
Ce TP s'appuie sur une distribution hadoop qu'on déploie avec Docker et que l'on trouve sur
github :
[Link]
On utilise également Zeppelin. C'est un notebook qui permet d'exécuter des codes scala et
de voir de façon conviviale les résultats de certaines commandes. Ca ressemble un peu au
iPython Notebook.
Installation de l'environnement
# récupérer le docker
git clone [Link]
cd docker-single-node-hadoop
docker-compose build
docker-compose up -d
# entrer dans le docker
docker exec -ti dockersinglenodehadoop_hsn_1 bash
# aller voir zeppelin dans son navigateur
# aller sur [Link]
# volumes partagés par docker
# le répertoire docker_data du repository est monté sur /data à l'intérieur du docker
Quelques rappel des principes, les RDD dans spark
Spark manipule des RDD (Resilient Distributed Dataset). Des RDD sont des listes
immutables.
Dans spark on enchaine des traitements sur des RDD pour obtenir de nouveaux RDD pour
l'étape d'après. C'est par ces enchaînements (workflow) qu'on fait des opérations avec
spark.
On a 2 types d'opérations sur le RDD :
Les transformations ne font que définir le workflow des RDD mais n'exécutent pas
les calculs ([Link], filter(), ...)
Les actions lancent les calculs proprement dit et renvoient un résultat (count(),
saveAsTextFile,...)
RSI31 Page 1
Atelier Big Data avancée 20/21
Opérations usuelles
Créer un RDD à la main
// créer un RDD à la main
val lines = [Link](List("chien", "chat"))
// compter les lignes du RDD
[Link]()
lire un fichier présent dans HDFS
Dans le docker :
Créer un fichier texte chemin_faisant.txt contenant le texte suivant :
Marcheur, ce sont tes traces
ce chemin, et rien de plus ;
Marcheur, il n'y a pas de chemin,
Le chemin se construit en marchant.
En marchant se construit le chemin,
Et en regardant en arrière
On voit la sente que jamais
On ne foulera à nouveau.
Marcheur, il n'y a pas de chemin,
Seulement des sillages sur la mer.
Antonio Machado
# copier le fichier dans hdfs
hdfs dfs -put chemin_faisant.txt /tmp/chemin_faisant.txt
# vérifier qu'il a bien été transféré
hdfs dfs -put chemin_faisant.txt /tmp/chemin_faisant.txt
Aller dans zeppelin à l'adresse [Link] et créer un nouveau notebook.
Dans ce notebook, exécutez les commandes suivantes :
// créer un RDD avec le contenu du fichier.
// note : dans notre docker, localhost:9000 est le host et le port de hdfs.
var lines = [Link]("hdfs://localhost:9000/tmp/chemin_faisant.txt")
// compte le nombre d'élément du RDD
[Link]()
// renvoie la 1ère ligne du RDD
[Link]()
// prend les 5 premières lignes du RDD et les affiches les unes sous les autres
[Link](5).foreach(println)
RSI31 Page 2
Atelier Big Data avancée 20/21
/* retour :
lines: [Link][String] = MapPartitionsRDD[15] at textFile at <console>:25
res22: Long = 12
res25: String = Marcheur, ce sont tes traces
Marcheur, ce sont tes traces
ce chemin, et rien de plus ;
Marcheur, il n'y a pas de chemin,
Le chemin se construit en marchant.
En marchant se construit le chemin,
*/
Filtrer des lignes
On va enchaîner 2 RDD
var lines = [Link]("hdfs://localhost:9000/tmp/chemin_faisant.txt")
var cheminLines = [Link](line => [Link]("chemin"))
[Link]()
[Link]()
/** @returns
lines: [Link][String] = MapPartitionsRDD[19] at textFile at <console>:23
cheminLines: [Link][String] = MapPartitionsRDD[20] at filter at <console>:25
res28: Long = 12
res29: Long = 5
*/
Compter les mots
val input = [Link]("hdfs://localhost:9000/tmp/chemin_faisant.txt")
val words = [Link](line => [Link](" "))
val counts = [Link](word => (word, 1)).reduceByKey{case (x, y) => x + y}
[Link]("/data/transfert/tuto_spark/word_count_result")
// lire les fichiers part-00000 et part-00001 pour voir le résultat du calcul
Mettre un RDD en cache
var lines = [Link]("hdfs://localhost:9000/tmp/chemin_faisant.txt")
var cheminLines = [Link](line => [Link]("chemin"))
// ici, on indique que cheminLines doit être enregistré en cache
// on a donc pas à le recalculer 2 fois pour le count() et pour le first()
[Link]
[Link]()
[Link]()
RSI31 Page 3
Atelier Big Data avancée 20/21
Unir 2 RDD : union
Warning, le processus ne dédoublonne pas. Certaines lignes se retrouvent 2 fois dans le
RDD final.
var lines = [Link]("hdfs://localhost:9000/tmp/chemin_faisant.txt")
var cheminLines = [Link](line => [Link]("chemin"))
var marcheurLines = [Link](line => [Link]("Marcheur"))
var cheminOuMarcheurLines = [Link](marcheurLines)
[Link]
[Link]()
[Link]()
[Link]()
[Link]()
/* returns
res83: Long = 8
res84: String = ce chemin, et rien de plus ;
res85: Long = 5
res86: Long = 3
*/
map : mapping simple, 1 élément vers 1 élément
val input = [Link](List(1, 2, 3, 4))
val result = [Link](x => x + x)
[Link]()
println([Link]().mkString(","))
/* résultat
res128: Array[Int] = Array(2, 4, 6, 8)
2,4,6,8
*/
flatMap
val lines = [Link](List("hello world", "bonjour le monde"))
val words = [Link](line => [Link](" "))
[Link]()
/* renvoie :
res131: Array[String] = Array(hello, world, bonjour, le, monde)
*/
Quelques fonctions utiles
// des fonctions simples
var rdd = [Link]()
var rdd = [Link](rdd2)
var rdd = [Link](rdd2)
RSI31 Page 4
Atelier Big Data avancée 20/21
var rdd = [Link](rdd2)
// cartesian product
val letters = [Link](List("A", "B", "C"))
var digits = [Link](List(1, 2))
var rdd = [Link](digits)
[Link]()
// returns Array[(String, Int)] = Array((A,1), (A,2), (B,1), (B,2), (C,1), (C,2))
Extraction de sous-ensembles
sample permet de récupérer un sous ensemble
val sample = [Link](withReplacement, fraction, [seed])
val lines = [Link]("hdfs://localhost:9000/tmp/chemin_faisant.txt")
val extract = [Link](false, 0.5)
[Link]().foreach(println)
[Link]()
[Link]()
/* résultat non prédictible, mais la moitié de l'échantillon
Marcheur, ce sont tes traces
Le chemin se construit en marchant.
Et en regardant en arrière
On voit la sente que jamais
Marcheur, il n'y a pas de chemin,
Seulement des sillages sur la mer.
Antonio Machado
6
12
*/
Les actions possibles
// réduction simple
val input = [Link](List(1, 2, 3, 4))
[Link]((x,y) => x+y)
// renvoie 10
// fold : idem réduction mais avec une "zero value" qui est l'identité de votre opération
val input = [Link](List(1, 2, 3, 4))
[Link](0)((x,y) => x+y)
// renvoie 10 aussi
// renvoie toute la collection
[Link]()
// on devine ce que ça fait
[Link]()
[Link]()
RSI31 Page 5
Atelier Big Data avancée 20/21
[Link](3) // 3 elements
[Link](4) // 4 top elements
[Link](5)(ordering) // 5 élément ordonnées suivant la fonction fournie
[Link](false, 12) // 12 éléments au pif
[Link](func) // applique la func aux élements du RDD
// aggrégation
[Link](zeroValue)(seqOp, combOp)
[Link]() // moyenne
Retour sur la persistance
La persistance peut se faire en mémoire ou sur disque ou un peu des deux.
import [Link]
val input = [Link](List(1, 2, 3, 4))
val result = [Link](x => x + x)
[Link](StorageLevel.DISK_ONLY)
println([Link]())
println([Link]().mkString(","))
/*
5 types de persistance. Avec SER, c'est avec sérialisation. Ca prend plus de CPU et moins de place.
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
DISK_ONLY
*/
RSI31 Page 6