Belajar PySpark : SQL pada Dataframe dengan expr()

PySpark menyediakan berbagai API yang dapat dimanfaatkan sesuai kebutuhan. Ada lebih dari satu cara yang dapat digunakan untuk dapat melakukan operasi DataFrame.

Selain menggunakan fungsi-fungsi python, kita dapat menggunakan ekspresi SQL untuk memanipulasi sebuah DataFrame. Ekspresi SQL tersebut dieksekusi menggunakan fungsi pyspark.sql.functions.expr().  Pada artikel ini, kita akan membahas fungsi tersebut beserta contoh penggunaannya.

Apa itu fungsi expr() dalam PySpark?

Fungsi expr() di PySpark adalah bagian dari API DataFrame dalam package spark.sql.functions. Fungsi ini memungkinkan pengguna untuk melakukan transformasi menggunakan ekspresi serupa SQL.

Beberapa keuntungan dari fungsi expr():

  • Menyederhanakan kode sehingga lebih mudah untuk ditulis, dipahami, dan dikelola.
  • Menggunakan ekspresi Seperti SQL yang familiar, tanpa harus membuat temporary view.
  • Terintegrasi dengan Catalyst optimizer, yang memastikan bahwa transformasi dijalankan secara efisien. Hal ini menjamin kinerja yang tidak kalah dengan penggunaan fungsi-fungsi pyspark biasa.
  • Memungkinkan untuk menggunakan nilai kolom DataFrame yang ada sebagai argumen ekspresi ke fungsi Pyspark.

Berikut ini package yang akan digunakan

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

Selanjutnya kita buat object SparkSession dengan nama aplikasi ‘Belajar Pyspark – Fungsi expr’.

spark = SparkSession.builder.appName("Belajar PySpark - expr").getOrCreate()

DataFrame yang akan kita gunakan adalah sebagai berikut:

data = [['Agus','Fisika',100],['Windy','Fisika',200],
        ['Budi','Biologi',200],['Dina','Fisika',150],
        ['Bayu','Fisika',50],['Dedi','Biologi',50]]

kolom = ["nama","jurusan","nilai"]

df = spark.createDataFrame(data,kolom)
df.show()

(gb 1)

Mengeksekusi ekspresi SQL sederhana


Dengan fungsi expr() kita dapat menggunakan fungsi-fungsi SQL yang lebih familiar. Ekspresi SQL tersebut dapat digunakan untuk melakukan update, filter, agregasi, maupun menambah kolom baru.

Mengubah nilai kolom dengan withColumn dan expr


Untuk mengkonversi nilai sebuah kolom menjadi huruf besar, kita dapat menggunakan fungsi SQL upper() dan fungsi withColumn():

df.withColumn("nama", F.expr("upper(nama)")).show()

(gb 2)

Memfilter DataFrame

Kita dapat melakukan filtering baris DataFrame menggunakan ekspresi SQL seperti berikut ini

df.filter(F.expr("nilai > 150")).show()

(gb 3)

Untuk filter menggunakan gabungan beberapa kondisi

df.filter(F.expr("nama LIKE '%in%' AND nilai > 150")).show()

(gb 4)

Agregasi

Operasi agregasi dapat dilakukan dengan menggunakan gabungan fungsi groupBy() dan expr() seperti contoh berikut

df.groupBy("jurusan").agg(F.expr("avg(nilai) as nilai_rata2")).show()

(gb 5)

Memilih kolom dan menambahkan kolom baru

Untuk memilih beberapa kolom ataupun menambah kolom baru kita dapat menggunakan kombinasi perintah select() dan expr()

df.select(F.col("*"), F.expr('upper(nama) as nama1')).show()

(gb 6)

Perintah di atas juga dapat ditulis dengan lebih singkat dengan menggunakan fungsi DataFrame.selectExpr()

df.selectExpr('*', 'upper(nama) as nama1').show()

(gb 7)

Untuk mengekstraksi kolom sebuah DataFrame menggunakan ekspresi SQL, sebaiknya memilih selectExpr() dibandingkan expr(). Disamping sintaksnya lebih singkat dan jelas, kita juga tidak perlu melakukan import spark.sql.functions.

Fungsi selectExpr()  dapat menerima beberapa ekspresi SQL sekaligus, yang dipisahkan dengan tanda koma (,). Oleh karena itu fungsi ini dapat mengembalikan beberapa kolom dari beberapa ekspresi SQL.

df.selectExpr("upper(nama) as nama1",
              "upper(jurusan) as jurusan1",
              "nilai").show()

(gb 8)

Mengeksekusi ekspresi SQL yang kompleks

Fungsi expr() juga dapat mengeksekusi ekspresi SQL yang kompleks, seperti misalnya statement kondisional menggunakan CASE WHEN.

df.withColumn("kode_jurusan",
  F.expr("CASE WHEN jurusan = 'Fisika' THEN 'F'"
    " WHEN jurusan = 'Biologi' THEN 'B'"
    " ELSE 'NA' END")).show()

(gb 9)

Untuk mengeksekusi beberapa ekspresi SQL sekaligus, misalnya untuk memilih beberapa kolom, gunakan fungsi selectExpr() seperti contoh berikut ini

df.selectExpr("nama", "jurusan",
  "(CASE WHEN jurusan = 'Fisika' THEN 'F'"
      " WHEN jurusan = 'Biologi' THEN 'B'"
      " ELSE 'NA' END) as kode_jurusan",
  "(CASE WHEN nilai < 100 THEN 'C'"
      " WHEN nilai < 200 THEN 'B'"
      " ELSE 'A' END) as kode_nilai",
  ).show()

(gb 10)

Notebook untuk tutorial ini bisa diunduh di sini.

Artikel sebelumnya :

Sebuah program edukasi yang disusun oleh Solusi247 untuk membangun talenta data Indonesia

Contact Us

Segitiga Emas Business Park
Jl. Prof. Dr. Satrio KAV 6, Setia Budi
Jakarta Selatan
P: +62 21 579 511 32
M: info@datalearns247.com