Spark DataFrame 10mins#
Apache Spark is an in-memory, general-purpose engine for processing large volumes of data.
Hadoop MapReduce, the first widely used big data framework, is constrained by disk I/O at every Map/Reduce step. Spark instead keeps the data it reads in memory and reuses it through DAG (Directed Acyclic Graph) execution, which makes it roughly 10x to 100x faster than MapReduce.
Beyond that, Spark provides Scala and Java APIs and, notably, Python and R APIs, and supports DataFrame and SQL syntax for data processing, so it is also convenient to develop with. Unlike MapReduce, you do not have to think through all the details of writing an efficient distributed program: write code as if you were calling an API in a single-machine program, and it is automatically parallelized when run on a distributed cluster.
This pays off not only on distributed clusters but also on multi-core machines. Python and R normally use only a single core, and multi-core programming in them is awkward, but with Spark you can use all the cores for data processing without worrying about any of that.
import os
from pyspark.sql import SparkSession
# If you hit a Python version mismatch error, set os.environ directly before running:
# Exception: Python in worker has different version 2.7 than that in driver 3.5,
# PySpark cannot run with different minor versions.Please check environment variables
# PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
# os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
# os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
spark_home = os.environ.get('SPARK_HOME', None)
print(f"spark_home: {spark_home}")
spark = SparkSession.builder.master("local[*]").appName("spark")
spark = spark.config("spark.driver.memory", "1g")
spark = spark.config("spark.executor.memory", "1g")
spark = spark.config("spark.python.worker.memory", "1g")
spark = spark.getOrCreate()
sc = spark.sparkContext
spark
spark_home: /Users/comafire/Projects/skp_n4e_jupyter_sts/usr/spark-3.2.0-bin-hadoop3.2
SparkSession - in-memory
Creating a DataFrame#
# Import PySpark classes
from pyspark.sql import *
from pyspark.sql import functions as f
from pyspark.sql import types as t
# Create from an RDD via SparkContext
s = sc.parallelize([
(1, 'MacBook Pro', 2015, '15"', '16GB', '512GB SSD', 13.75, 9.48, 0.61, 4.02)
, (2, 'MacBook', 2016, '12"', '8GB', '256GB SSD', 11.04, 7.74, 0.52, 2.03)
, (3, 'MacBook Air', 2016, '13.3"', '8GB', '128GB SSD', 12.8, 8.94, 0.68, 2.96)
, (4, 'iMac', 2017, '27"', '64GB', '1TB SSD', 25.6, 8.0, 20.3, 20.8)
])
columns = ['Id', 'Model', 'Year', 'ScreenSize', 'RAM', 'HDD', 'W', 'D', 'H', 'Weight']
df = spark.createDataFrame(s, columns)
df.show()
+---+-----------+----+----------+----+---------+-----+----+----+------+
| Id| Model|Year|ScreenSize| RAM| HDD| W| D| H|Weight|
+---+-----------+----+----------+----+---------+-----+----+----+------+
| 1|MacBook Pro|2015| 15"|16GB|512GB SSD|13.75|9.48|0.61| 4.02|
| 2| MacBook|2016| 12"| 8GB|256GB SSD|11.04|7.74|0.52| 2.03|
| 3|MacBook Air|2016| 13.3"| 8GB|128GB SSD| 12.8|8.94|0.68| 2.96|
| 4| iMac|2017| 27"|64GB| 1TB SSD| 25.6| 8.0|20.3| 20.8|
+---+-----------+----+----------+----+---------+-----+----+----+------+
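Passing a plain column list lets Spark infer the types from the data. If you want to control the types yourself, you can pass an explicit schema instead; a minimal sketch (the types chosen below are an assumption, not from the original run):
schema = t.StructType([
    t.StructField("Id", t.LongType(), False),
    t.StructField("Model", t.StringType(), True),
    t.StructField("Year", t.LongType(), True),
    t.StructField("ScreenSize", t.StringType(), True),
    t.StructField("RAM", t.StringType(), True),
    t.StructField("HDD", t.StringType(), True),
    t.StructField("W", t.DoubleType(), True),
    t.StructField("D", t.DoubleType(), True),
    t.StructField("H", t.DoubleType(), True),
    t.StructField("Weight", t.DoubleType(), True),
])
df_typed = spark.createDataFrame(s, schema)
df_typed.printSchema()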
# Create rows directly with the Row class
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')
# Create rows via a custom Row class used as a row factory
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)
# Create the DepartmentWithEmployees instances from Departments and Employees
Group = Row("department", "employees")
group1 = Group(department1, [employee1, employee2])
group2 = Group(department2, [employee3, employee4])
group3 = Row(department=department3, employees=[employee1, employee4])
group4 = Row(department=department4, employees=[employee2, employee3])
print(department1)
print(employee1)
print(group1)
print(group3)
Row(id='123456', name='Computer Science')
Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000)
Row(department=Row(id='123456', name='Computer Science'), employees=[Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)])
Row(department=Row(id='345678', name='Theater and Drama'), employees=[Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(firstName=None, lastName='wendell', email='no-reply@berkeley.edu', salary=160000)])
s1 = [group1, group2]
df1 = spark.createDataFrame(s1)
df1.show()
s2 = [group3, group4]
df2 = spark.createDataFrame(s2)
df2.show(df2.count(), False)
+--------------------+--------------------+
| department| employees|
+--------------------+--------------------+
|{123456, Computer...|[{michael, armbru...|
|{789012, Mechanic...|[{matei, null, no...|
+--------------------+--------------------+
+---------------------------+----------------------------------------------------------------------------------------------------+
|department |employees |
+---------------------------+----------------------------------------------------------------------------------------------------+
|{345678, Theater and Drama}|[{michael, armbrust, no-reply@berkeley.edu, 100000}, {null, wendell, no-reply@berkeley.edu, 160000}]|
|{901234, Indoor Recreation}|[{xiangrui, meng, no-reply@stanford.edu, 120000}, {matei, null, no-reply@waterloo.edu, 140000}] |
+---------------------------+----------------------------------------------------------------------------------------------------+
# unionAll is a deprecated alias of union() as of Spark 2.0
df_union = df1.union(df2)
df_union.show(df_union.count(), False)
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|department |employees |
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|{123456, Computer Science} |[{michael, armbrust, no-reply@berkeley.edu, 100000}, {xiangrui, meng, no-reply@stanford.edu, 120000}]|
|{789012, Mechanical Engineering}|[{matei, null, no-reply@waterloo.edu, 140000}, {null, wendell, no-reply@berkeley.edu, 160000}] |
|{345678, Theater and Drama} |[{michael, armbrust, no-reply@berkeley.edu, 100000}, {null, wendell, no-reply@berkeley.edu, 160000}] |
|{901234, Indoor Recreation} |[{xiangrui, meng, no-reply@stanford.edu, 120000}, {matei, null, no-reply@waterloo.edu, 140000}] |
+--------------------------------+-----------------------------------------------------------------------------------------------------+
df_explode = df_union.select(f.col("department").alias("d"), f.explode("employees").alias("e"))
df_explode = df_explode.selectExpr("d.id", "d.name as departmentName", "e.firstName", "e.lastName", "e.email", "e.salary")
#df_explode = df_explode.select(f.col("d.id"), f.col("d.name").alias("departmentName"), f.col("e.firstName"), f.col("e.lastName"), f.col("e.email"), f.col("e.salary"))
df_explode.show(3)
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
only showing top 3 rows
Read/Write Parquet#
# https://docs.databricks.com/spark/latest/data-sources/read-parquet.html
path_explode = "/tmp/df_explode.parquet"
df = df_explode
df = df.repartition(1)
df.write.mode('overwrite').parquet(path_explode)
df = spark.read.parquet(path_explode)
df.show(3)
# When exploding, rows whose target value is an empty list ([]) are dropped entirely.
# To keep the row and get null instead, use explode_outer, supported since Spark 2.2+.
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
only showing top 3 rows
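As the comment above notes, explode drops rows whose array is empty, while explode_outer keeps them with a null value. A minimal sketch on a hypothetical toy DataFrame:
df_arr = spark.createDataFrame([(1, ['a', 'b']), (2, [])], 'id int, items array<string>')
df_arr.select("id", f.explode("items").alias("item")).show()        # row with id 2 is dropped
df_arr.select("id", f.explode_outer("items").alias("item")).show()  # row with id 2 kept, item is null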
Read/Write CSV#
# https://docs.databricks.com/spark/latest/data-sources/read-csv.html
path_explode = "/tmp/df_explode.csv"
df = df_explode
df = df.repartition(1)
df.write.format("csv").mode('overwrite').option("header", "true").save(path_explode)
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(path_explode)
df.show(3)
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
only showing top 3 rows
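Note that inferSchema triggers an extra pass over the file; for large CSVs it is often better to supply the schema explicitly. A minimal sketch, reusing the schema of df_explode:
df_csv = (spark.read.format("csv")
    .option("header", "true")
    .schema(df_explode.schema)
    .load(path_explode))
df_csv.printSchema()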
Read/Write JSON#
#https://docs.databricks.com/spark/latest/data-sources/read-json.html
path_explode = "/tmp/df_explode.json"
df = df_explode
df = df.repartition(1)
df.write.format("json").mode('overwrite').save(path_explode)
df = spark.read.format("json").load(path_explode)
df.show(3)
+--------------------+--------------------+---------+------+--------+------+
| departmentName| email|firstName| id|lastName|salary|
+--------------------+--------------------+---------+------+--------+------+
| Computer Science|no-reply@berkeley...| michael|123456|armbrust|100000|
| Computer Science|no-reply@stanford...| xiangrui|123456| meng|120000|
|Mechanical Engine...|no-reply@waterloo...| matei|789012| null|140000|
+--------------------+--------------------+---------+------+--------+------+
only showing top 3 rows
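Note that the JSON reader infers the schema and returns the columns in alphabetical order, as seen above. To keep the original column order and types, pass a schema explicitly; a minimal sketch:
df_json = spark.read.schema(df_explode.schema).json(path_explode)
df_json.show(3)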
Convert to Pandas#
import pandas as pd
df_pandas = df_explode.toPandas()
print(df_pandas.head())
df_spark = spark.createDataFrame(df_pandas)
df_spark.show()
id departmentName firstName lastName email \
0 123456 Computer Science michael armbrust no-reply@berkeley.edu
1 123456 Computer Science xiangrui meng no-reply@stanford.edu
2 789012 Mechanical Engineering matei None no-reply@waterloo.edu
3 789012 Mechanical Engineering None wendell no-reply@berkeley.edu
4 345678 Theater and Drama michael armbrust no-reply@berkeley.edu
salary
0 100000
1 120000
2 140000
3 160000
4 100000
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
|789012|Mechanical Engine...| null| wendell|no-reply@berkeley...|160000|
|345678| Theater and Drama| michael|armbrust|no-reply@berkeley...|100000|
|345678| Theater and Drama| null| wendell|no-reply@berkeley...|160000|
|901234| Indoor Recreation| xiangrui| meng|no-reply@stanford...|120000|
|901234| Indoor Recreation| matei| null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
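toPandas() collects the entire DataFrame to the driver, so it is only safe for data that fits in driver memory. On Spark 3.x, enabling Apache Arrow can make the conversion substantially faster; a sketch, assuming pyarrow is installed:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df_pandas = df_explode.toPandas()  # converted via Arrow when possible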
Inspecting a DataFrame#
df = df_explode
# Inspect DataFrame schema and column information
df.printSchema()
print(df.schema)
print(df.columns)
print(df.dtypes)
root
|-- id: string (nullable = true)
|-- departmentName: string (nullable = true)
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)
|-- email: string (nullable = true)
|-- salary: long (nullable = true)
StructType(List(StructField(id,StringType,true),StructField(departmentName,StringType,true),StructField(firstName,StringType,true),StructField(lastName,StringType,true),StructField(email,StringType,true),StructField(salary,LongType,true)))
['id', 'departmentName', 'firstName', 'lastName', 'email', 'salary']
[('id', 'string'), ('departmentName', 'string'), ('firstName', 'string'), ('lastName', 'string'), ('email', 'string'), ('salary', 'bigint')]
df.show(3)
df.show(3, False)
print(df.first())
print(df.head(2))
print(df.take(2))
print(df.count())
print(df.select("id").distinct().show())
print(df.select("id").distinct().count())
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
only showing top 3 rows
+------+----------------------+---------+--------+---------------------+------+
|id |departmentName |firstName|lastName|email |salary|
+------+----------------------+---------+--------+---------------------+------+
|123456|Computer Science |michael |armbrust|no-reply@berkeley.edu|100000|
|123456|Computer Science |xiangrui |meng |no-reply@stanford.edu|120000|
|789012|Mechanical Engineering|matei |null |no-reply@waterloo.edu|140000|
+------+----------------------+---------+--------+---------------------+------+
only showing top 3 rows
Row(id='123456', departmentName='Computer Science', firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000)
[Row(id='123456', departmentName='Computer Science', firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(id='123456', departmentName='Computer Science', firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)]
[Row(id='123456', departmentName='Computer Science', firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(id='123456', departmentName='Computer Science', firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)]
8
+------+
| id|
+------+
|123456|
|789012|
|901234|
|345678|
+------+
4
Manipulating a DataFrame#
Selection#
# Select columns
df.select("id", "departmentName", "firstName", "salary").show()
# Drop duplicates
df.dropDuplicates(subset=['firstName']).show()
+------+--------------------+---------+------+
| id| departmentName|firstName|salary|
+------+--------------------+---------+------+
|123456| Computer Science| michael|100000|
|123456| Computer Science| xiangrui|120000|
|789012|Mechanical Engine...| matei|140000|
|789012|Mechanical Engine...| null|160000|
|345678| Theater and Drama| michael|100000|
|345678| Theater and Drama| null|160000|
|901234| Indoor Recreation| xiangrui|120000|
|901234| Indoor Recreation| matei|140000|
+------+--------------------+---------+------+
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|789012|Mechanical Engine...| null| wendell|no-reply@berkeley...|160000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
+------+--------------------+---------+--------+--------------------+------+
# Select rows by condition
# Where, Filter
df.select("id", "salary").filter(df["salary"] > 140000).show()
df.select("id", "salary").where(f.col("salary") > 140000).show()
# Between
df.select("id", "salary").where(df["salary"].between(10000, 140000)).show()
+------+------+
| id|salary|
+------+------+
|789012|160000|
|345678|160000|
+------+------+
+------+------+
| id|salary|
+------+------+
|789012|160000|
|345678|160000|
+------+------+
+------+------+
| id|salary|
+------+------+
|123456|100000|
|123456|120000|
|789012|140000|
|345678|100000|
|901234|120000|
|901234|140000|
+------+------+
# Like
df.select("id", "departmentName").where(df['departmentName'].like("%Com%")).show()
# Startswith, endswith
df.select("id", "departmentName").where(df['departmentName'].startswith("Indoor")).show()
df.select("id", "departmentName").where(df['departmentName'].endswith("Drama")).show()
# isin
df.select("id", "departmentName").where(df["departmentName"].isin("Computer Science", "Indoor Recreation")).show()
+------+----------------+
| id| departmentName|
+------+----------------+
|123456|Computer Science|
|123456|Computer Science|
+------+----------------+
+------+-----------------+
| id| departmentName|
+------+-----------------+
|901234|Indoor Recreation|
|901234|Indoor Recreation|
+------+-----------------+
+------+-----------------+
| id| departmentName|
+------+-----------------+
|345678|Theater and Drama|
|345678|Theater and Drama|
+------+-----------------+
+------+-----------------+
| id| departmentName|
+------+-----------------+
|123456| Computer Science|
|123456| Computer Science|
|901234|Indoor Recreation|
|901234|Indoor Recreation|
+------+-----------------+
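Conditions can also be negated with ~ or matched against regular expressions with rlike; a minimal sketch:
# Regular expression match
df.select("id", "departmentName").where(df["departmentName"].rlike("^Comp")).show()
# Negation with ~
df.select("id", "departmentName").where(~df["departmentName"].isin("Computer Science")).show()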
Setting#
# Create columns from expressions
df.select("id", "salary", (df["salary"] * 0.5).alias("bonus")).show(2)
df.select("id", "salary").withColumn("bonus", df["salary"] * 0.5).show(2)
df.select("id", "departmentName", (df["departmentName"].substr(1, 3)).alias("substr")).show(2)
# Create columns from conditions
df.select("id", "salary", f.when(df["salary"] > 120000, "High").otherwise("Low").alias("cost")).show(3)
+------+------+-------+
| id|salary| bonus|
+------+------+-------+
|123456|100000|50000.0|
|123456|120000|60000.0|
+------+------+-------+
only showing top 2 rows
+------+------+-------+
| id|salary| bonus|
+------+------+-------+
|123456|100000|50000.0|
|123456|120000|60000.0|
+------+------+-------+
only showing top 2 rows
+------+----------------+------+
| id| departmentName|substr|
+------+----------------+------+
|123456|Computer Science| Com|
|123456|Computer Science| Com|
+------+----------------+------+
only showing top 2 rows
+------+------+----+
| id|salary|cost|
+------+------+----+
|123456|100000| Low|
|123456|120000| Low|
|789012|140000|High|
+------+------+----+
only showing top 3 rows
# Create columns with user-defined functions (UDFs)
# Lambda style
bonus = f.udf(lambda x, y: x * y, t.FloatType())
df.withColumn('bonus', bonus(df['salary'], f.lit(0.5))).show()
# Decorator style
@f.udf('float')
def bonus(x, y):
return x * y
df.withColumn('bonus', bonus(df['salary'], f.lit(0.5))).show()
+------+--------------------+---------+--------+--------------------+------+-------+
| id| departmentName|firstName|lastName| email|salary| bonus|
+------+--------------------+---------+--------+--------------------+------+-------+
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|50000.0|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|60000.0|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|70000.0|
|789012|Mechanical Engine...| null| wendell|no-reply@berkeley...|160000|80000.0|
|345678| Theater and Drama| michael|armbrust|no-reply@berkeley...|100000|50000.0|
|345678| Theater and Drama| null| wendell|no-reply@berkeley...|160000|80000.0|
|901234| Indoor Recreation| xiangrui| meng|no-reply@stanford...|120000|60000.0|
|901234| Indoor Recreation| matei| null|no-reply@waterloo...|140000|70000.0|
+------+--------------------+---------+--------+--------------------+------+-------+
+------+--------------------+---------+--------+--------------------+------+-------+
| id| departmentName|firstName|lastName| email|salary| bonus|
+------+--------------------+---------+--------+--------------------+------+-------+
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|50000.0|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|60000.0|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|70000.0|
|789012|Mechanical Engine...| null| wendell|no-reply@berkeley...|160000|80000.0|
|345678| Theater and Drama| michael|armbrust|no-reply@berkeley...|100000|50000.0|
|345678| Theater and Drama| null| wendell|no-reply@berkeley...|160000|80000.0|
|901234| Indoor Recreation| xiangrui| meng|no-reply@stanford...|120000|60000.0|
|901234| Indoor Recreation| matei| null|no-reply@waterloo...|140000|70000.0|
+------+--------------------+---------+--------+--------------------+------+-------+
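Row-at-a-time Python UDFs pay serialization overhead for every row. On Spark 3.x with pyarrow installed, a vectorized pandas UDF processes whole columns as pandas Series instead; a sketch under those assumptions:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def bonus_vectorized(salary: pd.Series) -> pd.Series:
    return salary * 0.5

df.withColumn('bonus', bonus_vectorized(df['salary'])).show()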
df1 = sc.parallelize([
[1, 2, 3, 4, 5],
[1, 2, 3, 4, 5],
[1, 2, 3, 4, 5],
]).toDF(['c1', 'c2', 'c3', 'c4', 'c5'])
df1.show()
# Operate on multiple columns by passing them as *args
def sum_two(*args):
return args[0] + args[1]
udf_sum_two = f.udf(sum_two, t.IntegerType())
def sum_all(*args):
return sum(args)
udf_sum_all = f.udf(sum_all, t.IntegerType())
# Sum Two Columns c1 + c2
df1.withColumn("sum_two", udf_sum_two(f.col("c1"), f.col("c2"))).show()
# Sum All Columns
df1.withColumn("sum_all", udf_sum_all(*[f.col(i) for i in df1.columns])).show()
# Sum Some Columns
df1.withColumn("sum_some", udf_sum_all(f.col("c1"), f.col("c2"), f.col("c3"))).show()
+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
| 1| 2| 3| 4| 5|
| 1| 2| 3| 4| 5|
| 1| 2| 3| 4| 5|
+---+---+---+---+---+
+---+---+---+---+---+-------+
| c1| c2| c3| c4| c5|sum_two|
+---+---+---+---+---+-------+
| 1| 2| 3| 4| 5| 3|
| 1| 2| 3| 4| 5| 3|
| 1| 2| 3| 4| 5| 3|
+---+---+---+---+---+-------+
+---+---+---+---+---+-------+
| c1| c2| c3| c4| c5|sum_all|
+---+---+---+---+---+-------+
| 1| 2| 3| 4| 5| 15|
| 1| 2| 3| 4| 5| 15|
| 1| 2| 3| 4| 5| 15|
+---+---+---+---+---+-------+
+---+---+---+---+---+--------+
| c1| c2| c3| c4| c5|sum_some|
+---+---+---+---+---+--------+
| 1| 2| 3| 4| 5| 6|
| 1| 2| 3| 4| 5| 6|
| 1| 2| 3| 4| 5| 6|
+---+---+---+---+---+--------+
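For simple arithmetic like this, a Python UDF is unnecessary: plain Column expressions run inside the JVM and avoid the Python round-trip. A minimal sketch of the same sums without a UDF:
from functools import reduce
from operator import add

df1.withColumn("sum_two", f.col("c1") + f.col("c2")) \
   .withColumn("sum_all", reduce(add, [f.col(c) for c in df1.columns])) \
   .show()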
Missing Value#
df.select("id", "firstName", "lastName").show()
# Check for missing values
df.select("id", "firstName").where(df["firstName"].isNotNull()).show()
df.select("id", "firstName").where(df["firstName"].isNull()).show()
+------+---------+--------+
| id|firstName|lastName|
+------+---------+--------+
|123456| michael|armbrust|
|123456| xiangrui| meng|
|789012| matei| null|
|789012| null| wendell|
|345678| michael|armbrust|
|345678| null| wendell|
|901234| xiangrui| meng|
|901234| matei| null|
+------+---------+--------+
+------+---------+
| id|firstName|
+------+---------+
|123456| michael|
|123456| xiangrui|
|789012| matei|
|345678| michael|
|901234| xiangrui|
|901234| matei|
+------+---------+
+------+---------+
| id|firstName|
+------+---------+
|789012| null|
|345678| null|
+------+---------+
# Fill with constants
df.select("id", "firstName", "lastName").fillna({ 'firstName': 'Unknown', 'lastName': 'Unknown' }).show()
+------+---------+--------+
| id|firstName|lastName|
+------+---------+--------+
|123456| michael|armbrust|
|123456| xiangrui| meng|
|789012| matei| Unknown|
|789012| Unknown| wendell|
|345678| michael|armbrust|
|345678| Unknown| wendell|
|901234| xiangrui| meng|
|901234| matei| Unknown|
+------+---------+--------+
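Instead of filling, rows containing nulls can also be dropped; a minimal sketch:
# Drop rows with a null in any column
df.select("id", "firstName", "lastName").na.drop().show()
# Drop rows where firstName is null
df.select("id", "firstName", "lastName").na.drop(subset=['firstName']).show()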
Operation#
# Rename a column
df.withColumnRenamed('id', 'ID').show()
# Drop a column
df.drop("email").show()
# Summary statistics
print("n rows: {}".format(df.count()))
df.describe().show()
+------+--------------------+---------+--------+--------------------+------+
| ID| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
|789012|Mechanical Engine...| null| wendell|no-reply@berkeley...|160000|
|345678| Theater and Drama| michael|armbrust|no-reply@berkeley...|100000|
|345678| Theater and Drama| null| wendell|no-reply@berkeley...|160000|
|901234| Indoor Recreation| xiangrui| meng|no-reply@stanford...|120000|
|901234| Indoor Recreation| matei| null|no-reply@waterloo...|140000|
+------+--------------------+---------+--------+--------------------+------+
+------+--------------------+---------+--------+------+
| id| departmentName|firstName|lastName|salary|
+------+--------------------+---------+--------+------+
|123456| Computer Science| michael|armbrust|100000|
|123456| Computer Science| xiangrui| meng|120000|
|789012|Mechanical Engine...| matei| null|140000|
|789012|Mechanical Engine...| null| wendell|160000|
|345678| Theater and Drama| michael|armbrust|100000|
|345678| Theater and Drama| null| wendell|160000|
|901234| Indoor Recreation| xiangrui| meng|120000|
|901234| Indoor Recreation| matei| null|140000|
+------+--------------------+---------+--------+------+
n rows: 8
+-------+-----------------+-----------------+---------+--------+--------------------+------------------+
|summary| id| departmentName|firstName|lastName| email| salary|
+-------+-----------------+-----------------+---------+--------+--------------------+------------------+
| count| 8| 8| 6| 6| 8| 8|
| mean| 539845.0| null| null| null| null| 130000.0|
| stddev|339649.7466592818| null| null| null| null|23904.572186687874|
| min| 123456| Computer Science| matei|armbrust|no-reply@berkeley...| 100000|
| max| 901234|Theater and Drama| xiangrui| wendell|no-reply@waterloo...| 160000|
+-------+-----------------+-----------------+---------+--------+--------------------+------------------+
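describe() covers count, mean, stddev, min, and max; on Spark 2.3+, summary() additionally reports percentiles and lets you pick the statistics; a minimal sketch:
df.select("salary").summary("count", "min", "25%", "50%", "75%", "max").show()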
# GroupBy
df.groupBy("departmentName").agg(f.sum('salary').alias('tot_salary'), f.mean('salary').alias('avg_salary')).show()
# Pivot
df.groupBy("departmentName").pivot('firstName').agg(f.mean('salary').alias('avg_salary')).show()
+--------------------+----------+----------+
| departmentName|tot_salary|avg_salary|
+--------------------+----------+----------+
| Theater and Drama| 260000| 130000.0|
|Mechanical Engine...| 300000| 150000.0|
| Computer Science| 220000| 110000.0|
| Indoor Recreation| 260000| 130000.0|
+--------------------+----------+----------+
+--------------------+--------+--------+--------+--------+
| departmentName| null| matei| michael|xiangrui|
+--------------------+--------+--------+--------+--------+
| Theater and Drama|160000.0| null|100000.0| null|
|Mechanical Engine...|160000.0|140000.0| null| null|
| Computer Science| null| null|100000.0|120000.0|
| Indoor Recreation| null|140000.0| null|120000.0|
+--------------------+--------+--------+--------+--------+
# Sort
df.sort(df['salary'].desc(), df['id'].asc()).show()
df.sort(["salary", "id"], ascending=[False, True]).show()
df.orderBy(["salary", "id"],ascending=[0, 1]).show()
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|345678| Theater and Drama| null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...| null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
|901234| Indoor Recreation| matei| null|no-reply@waterloo...|140000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|901234| Indoor Recreation| xiangrui| meng|no-reply@stanford...|120000|
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|345678| Theater and Drama| michael|armbrust|no-reply@berkeley...|100000|
+------+--------------------+---------+--------+--------------------+------+
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|345678| Theater and Drama| null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...| null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
|901234| Indoor Recreation| matei| null|no-reply@waterloo...|140000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|901234| Indoor Recreation| xiangrui| meng|no-reply@stanford...|120000|
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|345678| Theater and Drama| michael|armbrust|no-reply@berkeley...|100000|
+------+--------------------+---------+--------+--------------------+------+
+------+--------------------+---------+--------+--------------------+------+
| id| departmentName|firstName|lastName| email|salary|
+------+--------------------+---------+--------+--------------------+------+
|345678| Theater and Drama| null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...| null| wendell|no-reply@berkeley...|160000|
|789012|Mechanical Engine...| matei| null|no-reply@waterloo...|140000|
|901234| Indoor Recreation| matei| null|no-reply@waterloo...|140000|
|123456| Computer Science| xiangrui| meng|no-reply@stanford...|120000|
|901234| Indoor Recreation| xiangrui| meng|no-reply@stanford...|120000|
|123456| Computer Science| michael|armbrust|no-reply@berkeley...|100000|
|345678| Theater and Drama| michael|armbrust|no-reply@berkeley...|100000|
+------+--------------------+---------+--------+--------------------+------+
Merge#
sa = [(1, 'Pirate'),(2, 'Monkey'),(3, 'Ninja'),(4, 'Spaghetti')]
df_a = spark.createDataFrame(sa, ['a_id','a_name'])
sb = [(1, 'Rutabaga'),(2, 'Pirate'),(3, 'Ninja'),(4, 'Darth Vader')]
df_b = spark.createDataFrame(sb, ['b_id','b_name'])
df_a.show()
df_b.show()
# Join: Inner
df_join = df_a.alias('a').join(df_b.alias('b'), f.col("a.a_name") == f.col("b.b_name"), 'inner')
df_join = df_join.select(f.col("a.*"), f.col("b.*"))
df_join.show()
+----+---------+
|a_id| a_name|
+----+---------+
| 1| Pirate|
| 2| Monkey|
| 3| Ninja|
| 4|Spaghetti|
+----+---------+
+----+-----------+
|b_id| b_name|
+----+-----------+
| 1| Rutabaga|
| 2| Pirate|
| 3| Ninja|
| 4|Darth Vader|
+----+-----------+
+----+------+----+------+
|a_id|a_name|b_id|b_name|
+----+------+----+------+
| 3| Ninja| 3| Ninja|
| 1|Pirate| 2|Pirate|
+----+------+----+------+
# Join: Left
df_join = df_a.alias('a').join(df_b.alias('b'), f.col("a.a_name") == f.col("b.b_name"), 'left')
df_join = df_join.select(f.col("a.*"), f.col("b.*"))
df_join.show()
# Join: Right
df_join = df_a.alias('a').join(df_b.alias('b'), f.col("a.a_name") == f.col("b.b_name"), 'right')
df_join = df_join.select(f.col("a.*"), f.col("b.*"))
df_join.show()
# Join: Full
df_join = df_a.alias('a').join(df_b.alias('b'), f.col("a.a_name") == f.col("b.b_name"), 'full')
df_join = df_join.select(f.col("a.*"), f.col("b.*"))
df_join.show()
+----+---------+----+------+
|a_id| a_name|b_id|b_name|
+----+---------+----+------+
| 4|Spaghetti|null| null|
| 3| Ninja| 3| Ninja|
| 1| Pirate| 2|Pirate|
| 2| Monkey|null| null|
+----+---------+----+------+
+----+------+----+-----------+
|a_id|a_name|b_id| b_name|
+----+------+----+-----------+
|null| null| 1| Rutabaga|
| 3| Ninja| 3| Ninja|
| 1|Pirate| 2| Pirate|
|null| null| 4|Darth Vader|
+----+------+----+-----------+
+----+---------+----+-----------+
|a_id| a_name|b_id| b_name|
+----+---------+----+-----------+
|null| null| 1| Rutabaga|
| 4|Spaghetti|null| null|
| 3| Ninja| 3| Ninja|
| 1| Pirate| 2| Pirate|
| 2| Monkey|null| null|
|null| null| 4|Darth Vader|
+----+---------+----+-----------+
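Besides the join types above, Spark also supports left_semi (rows of the left table that have a match, keeping only the left columns) and left_anti (rows of the left table with no match); a minimal sketch:
# Rows of df_a whose name appears in df_b
df_a.join(df_b, df_a['a_name'] == df_b['b_name'], 'left_semi').show()
# Rows of df_a whose name does not appear in df_b
df_a.join(df_b, df_a['a_name'] == df_b['b_name'], 'left_anti').show()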
SQL Query#
# Register the DataFrame as a temp view so that we can query it using SQL
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is preferred
df.createOrReplaceTempView("df_example")
# Run the same aggregation as the groupBy example above, this time in SQL
df_table = spark.sql("SELECT departmentName, SUM(salary) AS tot_salary, AVG(salary) AS avg_salary FROM df_example GROUP BY departmentName")
df_table.show()
+--------------------+----------+----------+
| departmentName|tot_salary|avg_salary|
+--------------------+----------+----------+
| Theater and Drama| 260000| 130000.0|
|Mechanical Engine...| 300000| 150000.0|
| Computer Science| 220000| 110000.0|
| Indoor Recreation| 260000| 130000.0|
+--------------------+----------+----------+
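Whether a query is written with the DataFrame API or in SQL, it compiles to the same execution plan, which can be inspected with explain(); a minimal sketch:
df_table.explain()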
spark.stop()