PySpark menyediakan berbagai API yang dapat dimanfaatkan sesuai kebutuhan. Ada lebih dari satu cara yang dapat digunakan untuk dapat melakukan operasi DataFrame.
Selain menggunakan fungsi-fungsi python, kita dapat menggunakan ekspresi SQL untuk memanipulasi sebuah DataFrame. Ekspresi SQL tersebut dieksekusi menggunakan fungsi pyspark.sql.functions.expr(). Pada artikel ini, kita akan membahas fungsi tersebut beserta contoh penggunaannya.
Fungsi expr() di PySpark adalah bagian dari API DataFrame dalam package spark.sql.functions. Fungsi ini memungkinkan pengguna untuk melakukan transformasi menggunakan ekspresi serupa SQL.
Beberapa keuntungan dari fungsi expr():
Berikut ini package yang akan digunakan
from pyspark.sql import SparkSession
import pyspark.sql.functions as FSelanjutnya kita buat object SparkSession dengan nama aplikasi ‘Belajar Pyspark – Fungsi expr’.
spark = SparkSession.builder.appName("Belajar PySpark - expr").getOrCreate()DataFrame yang akan kita gunakan adalah sebagai berikut:
data = [['Agus','Fisika',100],['Windy','Fisika',200],
['Budi','Biologi',200],['Dina','Fisika',150],
['Bayu','Fisika',50],['Dedi','Biologi',50]]
kolom = ["nama","jurusan","nilai"]
df = spark.createDataFrame(data,kolom)
df.show()(gb 1)
Dengan fungsi expr() kita dapat menggunakan fungsi-fungsi SQL yang lebih familiar. Ekspresi SQL tersebut dapat digunakan untuk melakukan update, filter, agregasi, maupun menambah kolom baru.
Untuk mengkonversi nilai sebuah kolom menjadi huruf besar, kita dapat menggunakan fungsi SQL upper() dan fungsi withColumn():
df.withColumn("nama", F.expr("upper(nama)")).show()(gb 2)
Kita dapat melakukan filtering baris DataFrame menggunakan ekspresi SQL seperti berikut ini
df.filter(F.expr("nilai > 150")).show()(gb 3)
Untuk filter menggunakan gabungan beberapa kondisi
df.filter(F.expr("nama LIKE '%in%' AND nilai > 150")).show()(gb 4)
Operasi agregasi dapat dilakukan dengan menggunakan gabungan fungsi groupBy() dan expr() seperti contoh berikut
df.groupBy("jurusan").agg(F.expr("avg(nilai) as nilai_rata2")).show()(gb 5)
Untuk memilih beberapa kolom ataupun menambah kolom baru kita dapat menggunakan kombinasi perintah select() dan expr()
df.select(F.col("*"), F.expr('upper(nama) as nama1')).show()(gb 6)
Perintah di atas juga dapat ditulis dengan lebih singkat dengan menggunakan fungsi DataFrame.selectExpr()
df.selectExpr('*', 'upper(nama) as nama1').show()(gb 7)
Untuk mengekstraksi kolom sebuah DataFrame menggunakan ekspresi SQL, sebaiknya memilih selectExpr() dibandingkan expr(). Disamping sintaksnya lebih singkat dan jelas, kita juga tidak perlu melakukan import spark.sql.functions.
Fungsi selectExpr() dapat menerima beberapa ekspresi SQL sekaligus, yang dipisahkan dengan tanda koma (,). Oleh karena itu fungsi ini dapat mengembalikan beberapa kolom dari beberapa ekspresi SQL.
df.selectExpr("upper(nama) as nama1",
"upper(jurusan) as jurusan1",
"nilai").show()(gb 8)
Fungsi expr() juga dapat mengeksekusi ekspresi SQL yang kompleks, seperti misalnya statement kondisional menggunakan CASE WHEN.
df.withColumn("kode_jurusan",
F.expr("CASE WHEN jurusan = 'Fisika' THEN 'F'"
" WHEN jurusan = 'Biologi' THEN 'B'"
" ELSE 'NA' END")).show()(gb 9)
Untuk mengeksekusi beberapa ekspresi SQL sekaligus, misalnya untuk memilih beberapa kolom, gunakan fungsi selectExpr() seperti contoh berikut ini
df.selectExpr("nama", "jurusan",
"(CASE WHEN jurusan = 'Fisika' THEN 'F'"
" WHEN jurusan = 'Biologi' THEN 'B'"
" ELSE 'NA' END) as kode_jurusan",
"(CASE WHEN nilai < 100 THEN 'C'"
" WHEN nilai < 200 THEN 'B'"
" ELSE 'A' END) as kode_nilai",
).show()(gb 10)
Notebook untuk tutorial ini bisa diunduh di sini.
Artikel sebelumnya :