Spark DataFrame 10mins#

아파치 스파크는 In-memory 기반 범용 대용량 데이터 처리 엔진입니다.

빅데이터 처리를 위해 초기에 널리 쓰였던 Hadoop Map/Reduce 는 처리 과정에서 발생하는 디스크 I/O 때문에 속도에 제약이 있습니다. 반면 스파크는 읽어온 데이터를 메모리에 저장하고 DAG(Directed Acyclic Graph) 연산을 통해 재사용하므로, Map/Reduce 대비 10~100배 빠르게 데이터를 처리할 수 있습니다.

그 외에도 Scala, Java 그리고 특히 Python, R API를 지원하고 데이터 처리시 Dataframe 및 SQL 문법을 지원함으로써 개발 편의성 또한 갖추고 있습니다. 그리고 Map/Reduce 와 같이 효율적인 분산 프로그램을 짜기 위해 많은 것들을 고려할 필요 없이 단지 싱글 프로그램에서 API 를 이용하듯 개발하면 분산 클러스터에서 수행시 자동으로 분산 처리가 된다는 장점이 있습니다.

이러한 특징은 분산 클러스터뿐만 아니라 멀티 코어 머신에서도 진가를 발휘합니다. 일반적으로 Python 및 R 언어는 싱글 코어만을 사용하며 멀티 코어 프로그래밍을 하는 것이 까다롭지만, 스파크를 이용하면 이러한 걱정 없이 멀티 코어를 모두 활용하여 데이터를 처리할 수 있습니다.

참조

import os
from pyspark.sql import SparkSession

# If you hit a Python version mismatch error such as the one below, set the
# interpreter paths explicitly through os.environ before creating the session:
#   Exception: Python in worker has different version 2.7 than that in driver 3.5,
#   PySpark cannot run with different minor versions.Please check environment variables
#   PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
#
# os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
# os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

spark_home = os.environ.get('SPARK_HOME')
print(f"spark_home: {spark_home}")

# Build (or reuse) a session against the local[*] master, with each of the
# three memory settings fixed at 1g.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.memory", "1g")
    .config("spark.python.worker.memory", "1g")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext for the RDD examples.
sc = spark.sparkContext

spark
spark_home: /Users/comafire/Projects/skp_n4e_jupyter_sts/usr/spark-3.2.0-bin-hadoop3.2

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.2.0
Master
local[*]
AppName
spark

DataFrame 생성#

# import pyspark class
from pyspark.sql import *
from pyspark.sql import functions as f
from pyspark.sql import types as t


# Build an RDD of tuples through the SparkContext, then turn it into a
# DataFrame by supplying the column names.
rows = [
    (1, 'MacBook Pro', 2015, '15"', '16GB', '512GB SSD', 13.75, 9.48, 0.61, 4.02),
    (2, 'MacBook', 2016, '12"', '8GB', '256GB SSD', 11.04, 7.74, 0.52, 2.03),
    (3, 'MacBook Air', 2016, '13.3"', '8GB', '128GB SSD', 12.8, 8.94, 0.68, 2.96),
    (4, 'iMac', 2017, '27"', '64GB', '1TB SSD', 25.6, 8.0, 20.3, 20.8),
]
s = sc.parallelize(rows)

columns = [
    'Id', 'Model', 'Year', 'ScreenSize', 'RAM',
    'HDD', 'W', 'D', 'H', 'Weight',
]
df = spark.createDataFrame(s, columns)
df.show()
+---+-----------+----+----------+----+---------+-----+----+----+------+
| Id|      Model|Year|ScreenSize| RAM|      HDD|    W|   D|   H|Weight|
+---+-----------+----+----------+----+---------+-----+----+----+------+
|  1|MacBook Pro|2015|       15"|16GB|512GB SSD|13.75|9.48|0.61|  4.02|
|  2|    MacBook|2016|       12"| 8GB|256GB SSD|11.04|7.74|0.52|  2.03|
|  3|MacBook Air|2016|     13.3"| 8GB|128GB SSD| 12.8|8.94|0.68|  2.96|
|  4|       iMac|2017|       27"|64GB|  1TB SSD| 25.6| 8.0|20.3|  20.8|
+---+-----------+----+----------+----+---------+-----+----+----+------+
# Create rows directly with the Row class, naming each field via keyword arguments.
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create rows through a reusable Row "class": declare the field names once,
# then pass the values positionally. None marks a missing value.
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)

# Create the DepartmentWithEmployees instances from Departments and Employees
# (nested rows: one department struct plus a list of employee rows).
Group = Row("department", "employees")
group1 = Group(department1, [employee1, employee2])
group2 = Group(department2, [employee3, employee4])

# Same nested structure, built with the keyword-argument form instead.
group3 = Row(department=department3, employees=[employee1, employee4])
group4 = Row(department=department4, employees=[employee2, employee3])

# Both construction styles print as Row(field=value, ...).
print(department1)
print(employee1)
print(group1)
print(group3)
Row(id='123456', name='Computer Science')
Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000)
Row(department=Row(id='123456', name='Computer Science'), employees=[Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)])
Row(department=Row(id='345678', name='Theater and Drama'), employees=[Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(firstName=None, lastName='wendell', email='no-reply@berkeley.edu', salary=160000)])
# DataFrame from the rows built with the Group class; the default show()
# truncates long cell values.
s1 = [group1, group2]
df1 = spark.createDataFrame(s1)
df1.show()

# DataFrame from the keyword-argument rows; display every row with
# truncation switched off.
s2 = [group3, group4]
df2 = spark.createDataFrame(s2)
df2.show(n=df2.count(), truncate=False)
+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|{123456, Computer...|[{michael, armbru...|
|{789012, Mechanic...|[{matei, null, no...|
+--------------------+--------------------+

+---------------------------+----------------------------------------------------------------------------------------------------+
|department                 |employees                                                                                           |
+---------------------------+----------------------------------------------------------------------------------------------------+
|{345678, Theater and Drama}|[{michael, armbrust, no-reply@berkeley.edu, 100000}, {null, wendell, no-reply@berkeley.edu, 160000}]|
|{901234, Indoor Recreation}|[{xiangrui, meng, no-reply@stanford.edu, 120000}, {matei, null, no-reply@waterloo.edu, 140000}]     |
+---------------------------+----------------------------------------------------------------------------------------------------+
# Append df2's rows to df1. DataFrame.union() replaces unionAll(), which has
# been deprecated since Spark 2.0; columns are matched by position.
df_union = df1.union(df2)
# NOTE: show() prints the table itself and returns None, so wrapping it in
# print() also emits a trailing "None" line.
print(df_union.show(df_union.count(), False))
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|department                      |employees                                                                                            |
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|{123456, Computer Science}      |[{michael, armbrust, no-reply@berkeley.edu, 100000}, {xiangrui, meng, no-reply@stanford.edu, 120000}]|
|{789012, Mechanical Engineering}|[{matei, null, no-reply@waterloo.edu, 140000}, {null, wendell, no-reply@berkeley.edu, 160000}]       |
|{345678, Theater and Drama}     |[{michael, armbrust, no-reply@berkeley.edu, 100000}, {null, wendell, no-reply@berkeley.edu, 160000}] |
|{901234, Indoor Recreation}     |[{xiangrui, meng, no-reply@stanford.edu, 120000}, {matei, null, no-reply@waterloo.edu, 140000}]      |
+--------------------------------+-----------------------------------------------------------------------------------------------------+

None
# Alias the department struct as "d" and explode the employees array into one
# row per element, aliased as "e".
df_explode = df_union.select(f.col("department").alias("d"), f.explode("employees").alias("e"))
# Flatten the nested fields into top-level columns using SQL expressions.
df_explode = df_explode.selectExpr("d.id", "d.name as departmentName", "e.firstName", "e.lastName", "e.email", "e.salary")
# Alternative using the column API instead of selectExpr:
#df_explode = df_explode.select(f.col("d.id"), f.col("d.name").alias("departmentName"), f.col("e.firstName"), f.col("e.lastName"), f.col("e.email"), f.col("e.salary"))

df_explode.show(3)
+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
only showing top 3 rows

Read/Write Parquet#

# https://docs.databricks.com/spark/latest/data-sources/read-parquet.html

path_explode = "/tmp/df_explode.parquet"

# Repartition to a single partition, write as Parquet (replacing any earlier
# output at the path), then read the data back.
df = df_explode.repartition(1)
df.write.mode('overwrite').parquet(path_explode)
df = spark.read.parquet(path_explode)
df.show(3)

# Note on explode: when the value being exploded is an empty list ([]), the
# corresponding row is dropped. To keep the row and get null instead, use the
# explode_outer function, available since Spark 2.2.
+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
only showing top 3 rows

Read/Write CSV#

# https://docs.databricks.com/spark/latest/data-sources/read-csv.html

path_explode = "/tmp/df_explode.csv"

df = df_explode.repartition(1)

# Write with a header row, replacing any previous output at the path.
csv_writer = df.write.format("csv").mode('overwrite').option("header", "true")
csv_writer.save(path_explode)

# Read back with the header and inferSchema options enabled.
csv_reader = spark.read.format("csv").option("header", "true").option("inferSchema", "true")
df = csv_reader.load(path_explode)
df.show(3)
+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
only showing top 3 rows

Read/Write JSON#

# https://docs.databricks.com/spark/latest/data-sources/read-json.html

path_explode = "/tmp/df_explode.json"

df = df_explode.repartition(1)

# Write as JSON, replacing any previous output at the path.
json_writer = df.write.format("json").mode('overwrite')
json_writer.save(path_explode)

# Read back without supplying a schema; in the captured output the columns
# come back in alphabetical order rather than the original order.
df = spark.read.format("json").load(path_explode)
df.show(3)
+--------------------+--------------------+---------+------+--------+------+
|      departmentName|               email|firstName|    id|lastName|salary|
+--------------------+--------------------+---------+------+--------+------+
|    Computer Science|no-reply@berkeley...|  michael|123456|armbrust|100000|
|    Computer Science|no-reply@stanford...| xiangrui|123456|    meng|120000|
|Mechanical Engine...|no-reply@waterloo...|    matei|789012|    null|140000|
+--------------------+--------------------+---------+------+--------+------+
only showing top 3 rows

Convert to Pandas#

import pandas as pd

# Convert the Spark DataFrame into a pandas DataFrame and preview it.
df_pandas = df_explode.toPandas()
print(df_pandas.head())

# Build a Spark DataFrame back from the pandas DataFrame.
df_spark = spark.createDataFrame(df_pandas)
# NOTE: show() prints the table itself and returns None, so the outer print()
# adds a trailing "None" line to the output.
print(df_spark.show())
       id          departmentName firstName  lastName                  email  \
0  123456        Computer Science   michael  armbrust  no-reply@berkeley.edu   
1  123456        Computer Science  xiangrui      meng  no-reply@stanford.edu   
2  789012  Mechanical Engineering     matei      None  no-reply@waterloo.edu   
3  789012  Mechanical Engineering      None   wendell  no-reply@berkeley.edu   
4  345678       Theater and Drama   michael  armbrust  no-reply@berkeley.edu   

   salary  
0  100000  
1  120000  
2  140000  
3  160000  
4  100000  
+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
|789012|Mechanical Engine...|     null| wendell|no-reply@berkeley...|160000|
|345678|   Theater and Drama|  michael|armbrust|no-reply@berkeley...|100000|
|345678|   Theater and Drama|     null| wendell|no-reply@berkeley...|160000|
|901234|   Indoor Recreation| xiangrui|    meng|no-reply@stanford...|120000|
|901234|   Indoor Recreation|    matei|    null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+

None

DataFrame 정보보기#

df = df_explode

# Inspect the DataFrame's column information.
# printSchema() prints the schema tree itself and returns None, so the outer
# print() emits an extra "None" line.
print(df.printSchema())
# Schema as a StructType object.
print(df.schema)
# Column names as a list of strings.
print(df.columns)
# (column name, type name) pairs.
print(df.dtypes)
root
 |-- id: string (nullable = true)
 |-- departmentName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- salary: long (nullable = true)

None
StructType(List(StructField(id,StringType,true),StructField(departmentName,StringType,true),StructField(firstName,StringType,true),StructField(lastName,StringType,true),StructField(email,StringType,true),StructField(salary,LongType,true)))
['id', 'departmentName', 'firstName', 'lastName', 'email', 'salary']
[('id', 'string'), ('departmentName', 'string'), ('firstName', 'string'), ('lastName', 'string'), ('email', 'string'), ('salary', 'bigint')]
# First 3 rows, with long values truncated.
df.show(3)
# First 3 rows, with values shown in full.
df.show(3, False)
# first() returns a single Row; head(2) and take(2) both return a list of 2 Rows.
print(df.first())
print(df.head(2))
print(df.take(2))
# Total number of rows.
print(df.count())
# Distinct ids (show() returns None, hence the extra "None" line in the
# output) and how many there are.
print(df.select("id").distinct().show())
print(df.select("id").distinct().count())
+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
only showing top 3 rows

+------+----------------------+---------+--------+---------------------+------+
|id    |departmentName        |firstName|lastName|email                |salary|
+------+----------------------+---------+--------+---------------------+------+
|123456|Computer Science      |michael  |armbrust|no-reply@berkeley.edu|100000|
|123456|Computer Science      |xiangrui |meng    |no-reply@stanford.edu|120000|
|789012|Mechanical Engineering|matei    |null    |no-reply@waterloo.edu|140000|
+------+----------------------+---------+--------+---------------------+------+
only showing top 3 rows

Row(id='123456', departmentName='Computer Science', firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000)
[Row(id='123456', departmentName='Computer Science', firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(id='123456', departmentName='Computer Science', firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)]
[Row(id='123456', departmentName='Computer Science', firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(id='123456', departmentName='Computer Science', firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)]
8
+------+
|    id|
+------+
|123456|
|789012|
|901234|
|345678|
+------+

None
4

DataFrame 조작하기#

Selection#

# Select columns
df.select("id", "departmentName", "firstName", "salary").show()

# Remove duplicates: keep one row per distinct firstName value
df.drop_duplicates(subset = ['firstName']).show()
+------+--------------------+---------+------+
|    id|      departmentName|firstName|salary|
+------+--------------------+---------+------+
|123456|    Computer Science|  michael|100000|
|123456|    Computer Science| xiangrui|120000|
|789012|Mechanical Engine...|    matei|140000|
|789012|Mechanical Engine...|     null|160000|
|345678|   Theater and Drama|  michael|100000|
|345678|   Theater and Drama|     null|160000|
|901234|   Indoor Recreation| xiangrui|120000|
|901234|   Indoor Recreation|    matei|140000|
+------+--------------------+---------+------+

+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|789012|Mechanical Engine...|     null| wendell|no-reply@berkeley...|160000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
+------+--------------------+---------+--------+--------------------+------+
# Select rows by condition
# Where, Filter: both accept a boolean column expression, written here once
# with df[...] and once with f.col(...); the two calls print the same result.
df.select("id", "salary").filter(df["salary"] > 140000).show()
df.select("id", "salary").where(f.col("salary") > 140000).show()

# Between: the upper bound is inclusive (rows with salary 140000 are returned).
df.select("id", "salary").where(df["salary"].between(10000, 140000)).show()
+------+------+
|    id|salary|
+------+------+
|789012|160000|
|345678|160000|
+------+------+

+------+------+
|    id|salary|
+------+------+
|789012|160000|
|345678|160000|
+------+------+

+------+------+
|    id|salary|
+------+------+
|123456|100000|
|123456|120000|
|789012|140000|
|345678|100000|
|901234|120000|
|901234|140000|
+------+------+
# Like: SQL LIKE pattern; "%Com%" matches values containing "Com"
df.select("id", "departmentName").where(df['departmentName'].like("%Com%")).show()

# Startswith, endswith: match on a literal prefix / suffix
df.select("id", "departmentName").where(df['departmentName'].startswith("Indoor")).show()
df.select("id", "departmentName").where(df['departmentName'].endswith("Drama")).show()

# isin: keep rows whose value equals any of the listed values
df.select("id", "departmentName").where(df["departmentName"].isin("Computer Science", "Indoor Recreation")).show()
+------+----------------+
|    id|  departmentName|
+------+----------------+
|123456|Computer Science|
|123456|Computer Science|
+------+----------------+

+------+-----------------+
|    id|   departmentName|
+------+-----------------+
|901234|Indoor Recreation|
|901234|Indoor Recreation|
+------+-----------------+

+------+-----------------+
|    id|   departmentName|
+------+-----------------+
|345678|Theater and Drama|
|345678|Theater and Drama|
+------+-----------------+

+------+-----------------+
|    id|   departmentName|
+------+-----------------+
|123456| Computer Science|
|123456| Computer Science|
|901234|Indoor Recreation|
|901234|Indoor Recreation|
+------+-----------------+

Setting#

# Create a column from an expression.
# The first two lines build the same "bonus" column, once inside select()
# with an alias and once with withColumn().
df.select("id", "salary", (df["salary"] * 0.5).alias("bonus")).show(2)
df.select("id", "salary").withColumn("bonus", df["salary"] * 0.5).show(2)
# substr(1, 3) takes the first three characters (positions are 1-based).
df.select("id", "departmentName", (df["departmentName"].substr(1, 3)).alias("substr")).show(2)

# Create a column from a condition: "High" when salary > 120000, otherwise "Low".
df.select("id", "salary", f.when(df["salary"] > 120000, "High").otherwise("Low").alias("cost")).show(3)
+------+------+-------+
|    id|salary|  bonus|
+------+------+-------+
|123456|100000|50000.0|
|123456|120000|60000.0|
+------+------+-------+
only showing top 2 rows

+------+------+-------+
|    id|salary|  bonus|
+------+------+-------+
|123456|100000|50000.0|
|123456|120000|60000.0|
+------+------+-------+
only showing top 2 rows

+------+----------------+------+
|    id|  departmentName|substr|
+------+----------------+------+
|123456|Computer Science|   Com|
|123456|Computer Science|   Com|
+------+----------------+------+
only showing top 2 rows

+------+------+----+
|    id|salary|cost|
+------+------+----+
|123456|100000| Low|
|123456|120000| Low|
|789012|140000|High|
+------+------+----+
only showing top 3 rows
# Create a column with a user-defined function (UDF).
# Lambda style: wrap a lambda with f.udf and declare the return type as FloatType.
bonus = f.udf(lambda x, y: x * y, t.FloatType())
df.withColumn('bonus', bonus(df['salary'], f.lit(0.5))).show()

# Decorator style: the same UDF declared with @f.udf and a type string.
# This rebinds the name `bonus` defined above.
@f.udf('float')
def bonus(x, y):
    """Return the product of x and y."""
    return x * y

df.withColumn('bonus', bonus(df['salary'], f.lit(0.5))).show()
+------+--------------------+---------+--------+--------------------+------+-------+
|    id|      departmentName|firstName|lastName|               email|salary|  bonus|
+------+--------------------+---------+--------+--------------------+------+-------+
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|50000.0|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|60000.0|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|70000.0|
|789012|Mechanical Engine...|     null| wendell|no-reply@berkeley...|160000|80000.0|
|345678|   Theater and Drama|  michael|armbrust|no-reply@berkeley...|100000|50000.0|
|345678|   Theater and Drama|     null| wendell|no-reply@berkeley...|160000|80000.0|
|901234|   Indoor Recreation| xiangrui|    meng|no-reply@stanford...|120000|60000.0|
|901234|   Indoor Recreation|    matei|    null|no-reply@waterloo...|140000|70000.0|
+------+--------------------+---------+--------+--------------------+------+-------+

+------+--------------------+---------+--------+--------------------+------+-------+
|    id|      departmentName|firstName|lastName|               email|salary|  bonus|
+------+--------------------+---------+--------+--------------------+------+-------+
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|50000.0|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|60000.0|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|70000.0|
|789012|Mechanical Engine...|     null| wendell|no-reply@berkeley...|160000|80000.0|
|345678|   Theater and Drama|  michael|armbrust|no-reply@berkeley...|100000|50000.0|
|345678|   Theater and Drama|     null| wendell|no-reply@berkeley...|160000|80000.0|
|901234|   Indoor Recreation| xiangrui|    meng|no-reply@stanford...|120000|60000.0|
|901234|   Indoor Recreation|    matei|    null|no-reply@waterloo...|140000|70000.0|
+------+--------------------+---------+--------+--------------------+------+-------+
# Small all-integer DataFrame (three identical rows, columns c1..c5).
# NOTE: this rebinds df1, which earlier held the department/employee rows.
df1 = sc.parallelize([
  [1, 2, 3, 4, 5], 
  [1, 2, 3, 4, 5], 
  [1, 2, 3, 4, 5], 
]).toDF(['c1', 'c2', 'c3', 'c4', 'c5'])
df1.show()

# Operate on several columns at once by accepting them as variadic arguments.
def sum_two(*args):
    """Return the sum of the first two positional arguments.

    Any further arguments are ignored; fewer than two raises IndexError.
    """
    first, second = args[0], args[1]
    return first + second

udf_sum_two = f.udf(sum_two, t.IntegerType())
  
def sum_all(*args):
    """Return the sum of every positional argument (0 when none are given)."""
    total = 0
    for value in args:
        total = total + value
    return total

udf_sum_all = f.udf(sum_all, t.IntegerType())


# Sum Two Columns c1 + c2
df1.withColumn("sum_two", udf_sum_two(f.col("c1"), f.col("c2"))).show()

# Sum All Columns: unpack one column object per column of df1 into the UDF call
df1.withColumn("sum_all", udf_sum_all(*[f.col(i) for i in df1.columns])).show()

# Sum Some Columns: the same UDF accepts any number of columns
df1.withColumn("sum_some", udf_sum_all(f.col("c1"), f.col("c2"), f.col("c3"))).show()
+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
|  1|  2|  3|  4|  5|
|  1|  2|  3|  4|  5|
|  1|  2|  3|  4|  5|
+---+---+---+---+---+

+---+---+---+---+---+-------+
| c1| c2| c3| c4| c5|sum_two|
+---+---+---+---+---+-------+
|  1|  2|  3|  4|  5|      3|
|  1|  2|  3|  4|  5|      3|
|  1|  2|  3|  4|  5|      3|
+---+---+---+---+---+-------+

+---+---+---+---+---+-------+
| c1| c2| c3| c4| c5|sum_all|
+---+---+---+---+---+-------+
|  1|  2|  3|  4|  5|     15|
|  1|  2|  3|  4|  5|     15|
|  1|  2|  3|  4|  5|     15|
+---+---+---+---+---+-------+

+---+---+---+---+---+--------+
| c1| c2| c3| c4| c5|sum_some|
+---+---+---+---+---+--------+
|  1|  2|  3|  4|  5|       6|
|  1|  2|  3|  4|  5|       6|
|  1|  2|  3|  4|  5|       6|
+---+---+---+---+---+--------+

Missing Value#

# Columns that contain missing (null) values.
df.select("id", "firstName", "lastName").show()

# Check for missing values: rows where firstName is present, then rows where it is null.
df.select("id", "firstName").where(df["firstName"].isNotNull()).show()
df.select("id", "firstName").where(df["firstName"].isNull()).show()
+------+---------+--------+
|    id|firstName|lastName|
+------+---------+--------+
|123456|  michael|armbrust|
|123456| xiangrui|    meng|
|789012|    matei|    null|
|789012|     null| wendell|
|345678|  michael|armbrust|
|345678|     null| wendell|
|901234| xiangrui|    meng|
|901234|    matei|    null|
+------+---------+--------+

+------+---------+
|    id|firstName|
+------+---------+
|123456|  michael|
|123456| xiangrui|
|789012|    matei|
|345678|  michael|
|901234| xiangrui|
|901234|    matei|
+------+---------+

+------+---------+
|    id|firstName|
+------+---------+
|789012|     null|
|345678|     null|
+------+---------+
# Fill missing values with constants, given as a column -> replacement mapping.
df.select("id", "firstName", "lastName").fillna({ 'firstName': 'Unknown', 'lastName': 'Unknown' }).show()
+------+---------+--------+
|    id|firstName|lastName|
+------+---------+--------+
|123456|  michael|armbrust|
|123456| xiangrui|    meng|
|789012|    matei| Unknown|
|789012|  Unknown| wendell|
|345678|  michael|armbrust|
|345678|  Unknown| wendell|
|901234| xiangrui|    meng|
|901234|    matei| Unknown|
+------+---------+--------+

Operation#

# Rename a column (the result is only displayed; df itself is not reassigned).
df.withColumnRenamed('id', 'ID').show()

# Drop a column (again, only for display).
df.drop("email").show()

# Row count followed by summary statistics for each column.
row_count = df.count()
print(f"n rows: {row_count}")
df.describe().show()
+------+--------------------+---------+--------+--------------------+------+
|    ID|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
|789012|Mechanical Engine...|     null| wendell|no-reply@berkeley...|160000|
|345678|   Theater and Drama|  michael|armbrust|no-reply@berkeley...|100000|
|345678|   Theater and Drama|     null| wendell|no-reply@berkeley...|160000|
|901234|   Indoor Recreation| xiangrui|    meng|no-reply@stanford...|120000|
|901234|   Indoor Recreation|    matei|    null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+

+------+--------------------+---------+--------+------+
|    id|      departmentName|firstName|lastName|salary|
+------+--------------------+---------+--------+------+
|123456|    Computer Science|  michael|armbrust|100000|
|123456|    Computer Science| xiangrui|    meng|120000|
|789012|Mechanical Engine...|    matei|    null|140000|
|789012|Mechanical Engine...|     null| wendell|160000|
|345678|   Theater and Drama|  michael|armbrust|100000|
|345678|   Theater and Drama|     null| wendell|160000|
|901234|   Indoor Recreation| xiangrui|    meng|120000|
|901234|   Indoor Recreation|    matei|    null|140000|
+------+--------------------+---------+--------+------+

n rows: 8
+-------+-----------------+-----------------+---------+--------+--------------------+------------------+
|summary|               id|   departmentName|firstName|lastName|               email|            salary|
+-------+-----------------+-----------------+---------+--------+--------------------+------------------+
|  count|                8|                8|        6|       6|                   8|                 8|
|   mean|         539845.0|             null|     null|    null|                null|          130000.0|
| stddev|339649.7466592818|             null|     null|    null|                null|23904.572186687874|
|    min|           123456| Computer Science|    matei|armbrust|no-reply@berkeley...|            100000|
|    max|           901234|Theater and Drama| xiangrui| wendell|no-reply@waterloo...|            160000|
+-------+-----------------+-----------------+---------+--------+--------------------+------------------+
# GroupBy: total and average salary per department
df.groupBy("departmentName").agg(f.sum('salary').alias('tot_salary'), f.mean('salary').alias('avg_salary')).show()

# Pivot: one output column per firstName value (including null), holding the
# average salary for that department/firstName pair
df.groupBy("departmentName").pivot('firstName').agg(f.mean('salary').alias('avg_salary')).show()
+--------------------+----------+----------+
|      departmentName|tot_salary|avg_salary|
+--------------------+----------+----------+
|   Theater and Drama|    260000|  130000.0|
|Mechanical Engine...|    300000|  150000.0|
|    Computer Science|    220000|  110000.0|
|   Indoor Recreation|    260000|  130000.0|
+--------------------+----------+----------+

+--------------------+--------+--------+--------+--------+
|      departmentName|    null|   matei| michael|xiangrui|
+--------------------+--------+--------+--------+--------+
|   Theater and Drama|160000.0|    null|100000.0|    null|
|Mechanical Engine...|160000.0|140000.0|    null|    null|
|    Computer Science|    null|    null|100000.0|120000.0|
|   Indoor Recreation|    null|140000.0|    null|120000.0|
+--------------------+--------+--------+--------+--------+
# Sort: three spellings of the same ordering (salary descending, then id
# ascending); all three print identical tables.
# 1) column expressions with .desc() / .asc()
df.sort(df['salary'].desc(), df['id'].asc()).show()
# 2) column names plus a list of booleans for ascending
df.sort(["salary", "id"], ascending=[False, True]).show()
# 3) orderBy with 0/1 flags for ascending
df.orderBy(["salary", "id"],ascending=[0, 1]).show()
+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|345678|   Theater and Drama|     null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...|     null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
|901234|   Indoor Recreation|    matei|    null|no-reply@waterloo...|140000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|901234|   Indoor Recreation| xiangrui|    meng|no-reply@stanford...|120000|
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|345678|   Theater and Drama|  michael|armbrust|no-reply@berkeley...|100000|
+------+--------------------+---------+--------+--------------------+------+

+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|345678|   Theater and Drama|     null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...|     null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
|901234|   Indoor Recreation|    matei|    null|no-reply@waterloo...|140000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|901234|   Indoor Recreation| xiangrui|    meng|no-reply@stanford...|120000|
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|345678|   Theater and Drama|  michael|armbrust|no-reply@berkeley...|100000|
+------+--------------------+---------+--------+--------------------+------+

+------+--------------------+---------+--------+--------------------+------+
|    id|      departmentName|firstName|lastName|               email|salary|
+------+--------------------+---------+--------+--------------------+------+
|345678|   Theater and Drama|     null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...|     null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...|    matei|    null|no-reply@waterloo...|140000|
|901234|   Indoor Recreation|    matei|    null|no-reply@waterloo...|140000|
|123456|    Computer Science| xiangrui|    meng|no-reply@stanford...|120000|
|901234|   Indoor Recreation| xiangrui|    meng|no-reply@stanford...|120000|
|123456|    Computer Science|  michael|armbrust|no-reply@berkeley...|100000|
|345678|   Theater and Drama|  michael|armbrust|no-reply@berkeley...|100000|
+------+--------------------+---------+--------+--------------------+------+

Merge#

# Two small DataFrames whose name columns partially overlap.
sa = [(1, 'Pirate'),(2, 'Monkey'),(3, 'Ninja'),(4, 'Spaghetti')]
df_a = spark.createDataFrame(sa, ['a_id','a_name'])
 
sb = [(1, 'Rutabaga'),(2, 'Pirate'),(3, 'Ninja'),(4, 'Darth Vader')]
df_b = spark.createDataFrame(sb, ['b_id','b_name'])
 
df_a.show()
df_b.show()

# Join; Inner
# Alias both sides so the join condition and the select can refer to columns
# as "a.<col>" / "b.<col>"; only names present on both sides are kept.
df_join = df_a.alias('a').join(df_b.alias('b'), f.col("a.a_name") == f.col("b.b_name"), 'inner')
df_join = df_join.select(f.col("a.*"), f.col("b.*"))
df_join.show()
+----+---------+
|a_id|   a_name|
+----+---------+
|   1|   Pirate|
|   2|   Monkey|
|   3|    Ninja|
|   4|Spaghetti|
+----+---------+

+----+-----------+
|b_id|     b_name|
+----+-----------+
|   1|   Rutabaga|
|   2|     Pirate|
|   3|      Ninja|
|   4|Darth Vader|
+----+-----------+

+----+------+----+------+
|a_id|a_name|b_id|b_name|
+----+------+----+------+
|   3| Ninja|   3| Ninja|
|   1|Pirate|   2|Pirate|
+----+------+----+------+
# Join: Left (every row of df_a is kept; unmatched b columns are null)
df_join = df_a.alias('a').join(df_b.alias('b'), f.col("a.a_name") == f.col("b.b_name"), 'left')
df_join = df_join.select(f.col("a.*"), f.col("b.*"))
df_join.show()

# Join: Right (every row of df_b is kept; unmatched a columns are null)
df_join = df_a.alias('a').join(df_b.alias('b'), f.col("a.a_name") == f.col("b.b_name"), 'right')
df_join = df_join.select(f.col("a.*"), f.col("b.*"))
df_join.show()

# Join: Full (rows from both sides are kept; the unmatched side is null)
df_join = df_a.alias('a').join(df_b.alias('b'), f.col("a.a_name") == f.col("b.b_name"), 'full')
df_join = df_join.select(f.col("a.*"), f.col("b.*"))
df_join.show()
+----+---------+----+------+
|a_id|   a_name|b_id|b_name|
+----+---------+----+------+
|   4|Spaghetti|null|  null|
|   3|    Ninja|   3| Ninja|
|   1|   Pirate|   2|Pirate|
|   2|   Monkey|null|  null|
+----+---------+----+------+

+----+------+----+-----------+
|a_id|a_name|b_id|     b_name|
+----+------+----+-----------+
|null|  null|   1|   Rutabaga|
|   3| Ninja|   3|      Ninja|
|   1|Pirate|   2|     Pirate|
|null|  null|   4|Darth Vader|
+----+------+----+-----------+

+----+---------+----+-----------+
|a_id|   a_name|b_id|     b_name|
+----+---------+----+-----------+
|null|     null|   1|   Rutabaga|
|   4|Spaghetti|null|       null|
|   3|    Ninja|   3|      Ninja|
|   1|   Pirate|   2|     Pirate|
|   2|   Monkey|null|       null|
|null|     null|   4|Darth Vader|
+----+---------+----+-----------+

SQL Query#

# Register the DataFrame as a temporary view so that it can be queried with SQL.
# createOrReplaceTempView() supersedes registerTempTable(), which has been
# deprecated since Spark 2.0.
df.createOrReplaceTempView("df_example")

# Run the same per-department aggregation as the earlier groupBy example,
# expressed as a SQL query against the temporary view.
df_table = spark.sql(
    "SELECT departmentName, SUM(salary) AS tot_salary, AVG(salary) AS avg_salary "
    "FROM df_example GROUP BY departmentName"
)
df_table.show()
+--------------------+----------+----------+
|      departmentName|tot_salary|avg_salary|
+--------------------+----------+----------+
|   Theater and Drama|    260000|  130000.0|
|Mechanical Engine...|    300000|  150000.0|
|    Computer Science|    220000|  110000.0|
|   Indoor Recreation|    260000|  130000.0|
+--------------------+----------+----------+
# Stop the Spark session now that the walkthrough is finished.
spark.stop()