반응형

업무에서 Cluster 기반의 Database 엔진을 사용하고 있다. Database 서버 내에서 굉장히 많은 양의 PL/SQL Function을 사용 중인데, 이것을 Spark로 바꾸고 있다. 성격상 이론보다 개발부터 진행 중이지만, 이론도 꼭 필요하다고 생각하여 공부 중이다.


Spark의 구조적 API란?

  • 구조적 API는 주로 정형 데이터 (CSV, JSON, Table) 등의 데이터를 처리하기 위한 고수준의 API이다. 
  • 정형화가 가능한 데이터들은 일반적으로 Table 형태로 표현할 수 있는데, Spark는 이러한 Table 구조의 데이터들을 빠르게 연산할 수 있는 다양한 API 등을 지원해 준다. 
  • 매우 쉽게 생각하면, Database의 Query 혹은 Function 등의 기능을 제공해 주는 기능이라고 생각하면 된다.
  • API라는 말이 이질적으로 다가올 수 있는데, Spark는 실행 계획 수립과 처리에 사용하는 자체 데이터 타입 정보를 가지고 있는 Catalyst 엔진을 사용한다. 즉, Pyspark를 통해 전달하는 데이터도 Catalyst 엔진에서 자체 데이터 타입으로 변경된다. 사용자는 Spark 코드를 짠다고 하지만, 실제로는 사용자의 코드가 그대로 연산되는 것이 아니라, Catalyst 엔진에서 수행 가능하도록 만들어 놓은 API를 호출하는 것이기 때문에, API라고 부른다. 
  • 개인적인 경험상, SQL Query Function을 Spark API로 바꾸는 것이 복잡하긴 해도, 80% 이상의 변환이 가능하였다. (안 되는 부분도 몇 단계의 과정을 거치면 거의 변환이 되었다.)

구조적 데이터 종류

  • DataFrame : Run time 단계에서 데이터 타입 일치를 확인
  • DataRow : Compile 단계에서 데이터 타입의 일치 여부를 확인, 다만 Spark와 같이 JVM 기반의 언어인 Scala와 Java에서만 지원한다.

 

Schema

  • DataFrame은 Schema로 정의되는데, Schema에는 칼럼명과 각 칼럼의 데이터 타입을 정의한다.
  • Schema는 데이터 소스에서 읽거나, 직접 정의가 가능하다. 
  • 간단한 데이터 분석 등에 대해서는 타입 추론을 많이 사용하지만, 운영을 위한 ETL 등에서는 데이터 타입을 미리 지정해 주는 것이 좋다.

 

Spark 구조적 데이터 타입 

  • 기본적으로 Python은 타입추론을 하여, 변수의 타입 선언에 민감하지 않다.
  • Spark는 앞서 말했듯, Python에서 연산을 처리하는 것이 아닌, Catalyst 엔진에서 데이터의 연산을 처리하기 때문에, DataType을 지정해줘야 한다.
  • 기본적으로 Pandas의 Dataframe처럼 DataFrame 생성 시, 데이터의 타입을 추론하지만, 특정 칼럼의 전 데이터가 NULL인 경우 등, 데이터 타입 추론이 불가한 경우에는 에러가 뜰 수 있기 때문에, DataFrame 선언 시 아래의 데이터 타입을 지정해 주는 것이 좋다. 
  • DataFrame 내에는 Array나 Struct 타입의 복합 데이터 타입도 지정할 수 있다.

[Spark, Python 데이터 타입 비교 및 사용 예시]

(편의를 위해, spark session 접근 및 "import pyspark.sql.functions as F"는 생략

Spark 데이터 타입 Python 데이터 타입 pyspark import & 사용예시
ByteType bytes from pyspark.sql.types import ByteType
byte_column = F.lit(byte_value).cast(ByteType())
ShortType int from pyspark.sql.types import ShortType
short_value = 10
short_column = F.lit(short_value).cast(ShortType())
IntegerType int from pyspark.sql.types import IntegerType
int_value = 100
int_column = F.lit(int_value).cast(IntegerType())
LongType int from pyspark.sql.types import LongType
long_value = 1000
long_column = F.lit(long_value).cast(LongType())
FloatType float from pyspark.sql.types import FloatType
float_value = 3.14
float_column = F.lit(float_value).cast(FloatType())
DoubleType float from pyspark.sql.types import DoubleType
double_value = 2.71828
double_column = F.lit(double_value).cast(DoubleType())
DecimalType decimal from pyspark.sql.types import DecimalType
decimal_value = Decimal('3.14159265359')
decimal_column = F.lit(decimal_value).cast(DecimalType(10, 10))  # (precision, scale)
StringType str from pyspark.sql.types import StringType
string_value = "Hello, World!"
string_column = F.lit(string_value).cast(StringType())
BinaryType bytes from pyspark.sql.types import BinaryType
binary_value = bytes([0, 1, 2, 3, 4])
binary_column = F.lit(binary_value).cast(BinaryType())
BooleanType bool from pyspark.sql.types import BooleanType
boolean_value = True
boolean_column = F.lit(boolean_value).cast(BooleanType())
TimestampType datetime의 timestamp from pyspark.sql.types import TimestampType
from datetime import datetime
timestamp_value = datetime(2023, 9, 3, 12, 0, 0)
timestamp_column = F.lit(timestamp_value).cast(TimestampType())
DateType date from pyspark.sql.types import DateType
from datetime import date
date_value = date(2023, 9, 3)
date_column = F.lit(date_value).cast(DateType())
ArrayType list from pyspark.sql.types import ArrayType, IntegerType

python_list = [1, 2, 3, 4, 5]
array_column = F.lit(python_list).cast(ArrayType(IntegerType()))
MapType dict from pyspark.sql.types import MapType, StringType, IntegerType

python_dict = {"apple": 1, "banana": 2, "cherry": 3}

map_column = F.lit(python_dict).cast(MapType(StringType(), IntegerType()))
StructType class
(C언어의 구조체 데이터 타입)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

python_struct = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]

schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True)
])

struct_column = F.lit(python_struct).cast(schema)
StructField class
(StructType의 각 필드에 대한 메타 정보 설정에 사용됨)

 

 

구조적 API의 실행 과정

  • Spark를 공부하면서 느낀 점은 Database 엔진과 매우 비슷하다는 점이다. Database 엔진은 SQL에 대해서 통계적 정보 등을 이용하여 실행 계획을 만드는데, Spark도 마찬가지이다. 
  • 구조적 API는 다음과 같이 실행된다.
    1. 다양한 언어를 통해, DataFrame/SQL 등을 이용한 코드를 작성한다.
      • 코드가 검증 전 논리적 실행 계획으로 변환된다. 이 과정은 코드의 유효성이나, 테이블, 칼럼 등의 존재 여부만 확인한다. 쉽게 말해서, 문법검사 단계만 진행한다고 생각하면 된다.
    2. 코드에서 이상이 없다면, Spark가 이 코드를 논리적 실행 계획으로 변환한다.
      • Spark의 Analyzer에 의해, 칼럼과 테이블등을 검증한다. 이 과정에서 Catalog와 DataFrame의 정보 등을 활용한다. 
      • Catalyst의 Optimizer에 의해 Predicate Pushing Down 등의 논리 계획 최적화가 이뤄진다. (Database 엔진의 Catalyst Optimizer와 거의 동일하다.)
    3. Spark의 논리적 실행 계획을 물리적 실행 계획으로 변환하고, 추가적 최적화를 진행한다.
      • 각 Plan에 대한 Cost를 고려하여 논리적 실행 계획을 물리적 실행 계획으로 변환한다. 
      • 특히, Spark는 Cluster 환경에서 많이 사용하기 때문에, 이 부분이 Spark의 강점이다. (사실 Cluster 기반의 Database 엔진의 강점이기도 하다.)
    4. Spark는 Cluster에서 물리적 실행계획(RDD 처리)을 실행한다.

 

 

구조적 API 기본 연산 

  • 구조적 API 사용을 위해, 기본적인 연산들을 알아놓아야 한다. 거의 대부분이 SQL에 있는 함수 및 쿼리이고, 동일한 동작을 한다.
  • 글 쓰는 시점에서 가장 최근인 3.4.1(23.6.20) 기준으로 작성했다. (Spark는 계속 업데이트 중이다. 사용 전에 함수를 살펴보는 것도 좋다. : https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html)
  • 매우 유용한 함수들이 많지만, 나의 기준에서 많이 활용했던 기본 Function 기준으로 소개한다.

 [DataFrame 관련 함수]

 

 

  •  Select
    • SQL에서와 마찬가지로, spark에서 칼럼 조회를 위해 사용하는 API이다. Query와 동일하게 select 안에 칼럼을 지정해 주면 되고, 모든 칼럼을 의미하는 '*'도 동작한다.
# 모든 칼럼을 선택
selected_df = df.select("*")

# 특정 칼럼을 선택
name_df = df.select("name")

# 다중 칼럼을 선택 가능
column_list = ['a','b','c']
selected_df = df.select(*column_list)

# Alias 사용 가능
selected_df = df.select("name").alias("name2")
  • SelectExpr
    • SelectExpr은 select와 비슷하지만, SQL처럼 표현식을 사용가능하다. 'as'를 이용한 alias 지정도 가능하고, 숫자형 칼럼에는 연산도 가능하다. 
    • Select를 SQL에 가까운 형태로 사용할 수 있다.
# 칼럼 이름 변경하여 선택
name_renamed_df = df.selectExpr("name as full_name")

# 새로운 칼럼 추가하여 선택
age_plus_10_df = df.selectExpr("*", "age + 10 as age_plus_10")
age_double_10_df = df.selectExpr("*", "age * 2 as age_double")
  • withColumn
    • withColumn은 DataFrame에 새로운 칼럼을 추가하거나, 기존 칼럼을 변경하는 데 사용된다. 
    • 한 번에 하나의 칼럼밖에 못 바꾸기 때문에, 여러 칼럼을 바꿔야 하면, withColumn을 계속 연결해서 사용해줘야 한다.
    • withColumnRenamed를 사용하면, 칼럼명을 바꿀 수 있다. (바꿀 칼럼명, 바뀔 칼럼명)
# withColumn으로 새로운 열 추가
df = df.withColumn("square_age", df["age"]*df["age"])

# withColumnRenamed로 칼럼명 변경
df = df.withColumnRenamed("square_age", "squared_age")
  • Drop
    • Drop은 DataFrame에서 칼럼을 제거할 때 사용한다. 
    • Drop의 호출만으로 DataFrame의 칼럼이 제거되는 것이 아닌, 새로운 DataFrame에 칼럼이 제거된 상태로 할당되는 구조이기 때문에, 꼭 반환값을 받아야 한다.
# 특정 칼럼을 제거한 DataFrame 생성
df_without_age = df.drop("age")
df_without_name = df.drop(col("age"))
  • Cast
    • Cast는 DataFrame의 칼럼의 데이터타입을 변경하는 함수이다. 
# "age" 칼럼의 데이터 타입을 문자열(string)로 변환
df_casted = df.withColumn("age_string", df["age"].cast("string"))
  • Where/Filter
    • Where와 Filter는 동일한 동작을 하는 함수로, DataFrame에서 특정 조건을 만족하는 행을 선택할 때 사용된다.
    • SQL의 Where와 같다. 
    • and 조건 (&)과 or 조건 (|) 연산자를 지원하기 때문에, 조건을 하나의 filter나 where에 담을 수 있다. (중복 조건일 때는 연속해서 where or filter를 걸어도 된다.)
# Filter와 Where 조건
filtered_df = df.filter(df["age"]>=30)
where_df = df.where(df["age"]>30)

# 다중 Filter
filtered_df = df.filter(df["age"] <=20 | df["age"] > 50)
filtered_df = df.filter(df["age"] <=30 & df["name"] == "Choi")
filtered_df = df.filter(df["age"] <=30).filter(df["name"] == "Choi")
  • Distinct
    • Distinct 함수는 중복 Row를 제거하고, 고유한 Row만 선택한다.
    • SQL의 distinct와 동일하다.
    • subset 매개변수로, 특정 칼럼들을 list 형태로 전달하면, 해당 칼럼을 기준으로 Distinct가 가능하다.
# 전체 칼럼 중, 중복행 제거
distinct_df= df.distinct()

# 특정 칼럼 중복행 제거
distinct_df= df.distinct(["name"])
  • sort, orderBy
    • sort와 OrderBy는 특정 칼럼 기준으로 순서를 나열하기 위해 사용하는 함수이다.
    • 둘 다 비슷한 동작을 하지만, 사용 방법이 약간 다르다.
    • sort는 ascending 인자를 True or False로 오름차순, 내림차순을 선택하고, orderBy는 칼럼기준으로 asc함수나 desc 함수를 사용하여, 정렬 방법을 지정해줘야 한다.
    • orderBy는 각 칼럼의 정렬순을 따로 지정(어떤 칼럼은 오름차순, 어떤 칼럼은 내림차순으로)할 수 있기 때문에 유용하다.
# sort, orderby를 이용한 정렬
sorted_desc_df = df.sort(col("age"), ascending=False)
sorted_desc_df = df.orderBy(col("age"))

# 다중 칼럼 정렬
sorted_desc_df = df.sort(["age","name","city"])
sorted_df = df.sort(col("age"), ascending=True).sort(col("name"), ascending=False).sort(col("city"), ascending=True)
sorted_df = df.orderBy(col("age").asc(), col("name").desc(), col("city").asc())
  •  limit
    • limit은 DataFrame에서 특정 수의 행을 선택하여 반환하는 데 사용한다. 
    • limit을 이용하여, 특정 칼럼 이상을 Filtering 하거나, 일부 데이터만 보는 데 사용할 수 있다. 
    • 다만, 특정 순으로 정렬되어 있지 않으면, 순서가 유지되지 않을 수 있다.
    • Oracle의 rownum 조건이나, Postgresql의 limit과 동일하다.
sort_df = df.orderBy[col("age")]
limited_df = sort_df.limit(2)
  • repartiton
    • repartition은 DataFrame의 partition을 변경하거나, 재분할하는 데 사용된다.
    • 숫자를 인자로 받아서, 해당 숫자만큼의 partition으로 나눈다. (칼럼을 지정하지 않으면, 모든 칼럼을 고려하여 나눈다.)
    • Spark의 강점이 데이터 분산 처리이기 때문에, 데이터 분산을 최적화하여, 성능에 큰 영향을 미친다. 
# 단순 partition 숫자 지정
repartitioned_df = df.repartition(4)

# partition 칼럼 지정
repartitioned_df = df.repartition(4, "name", "age")
  • coalesce
    • coaleasce는 DataFrame의 partition 수를 줄이는 데 사용된다. (repartition은 증/감이 모두 가능하다는 것과 다르다.)
    • partition을 줄일 때는 repartition보다 성능이 좋기 때문에(default가 shuffle을 사용하지 않기 때문에), 줄일때는 coalesce를 사용하는 것이 좋다.
    • 숫자를 인자로 받아서,  해당 숫자만큼의 partition으로 줄인다.
    • 불필요한 partition을 줄여서 오버헤드를 줄이는 데 사용할 수 있다. 
coalesced_df = df.coalesce(2)
  • collect, show, take
    • collect, show, take는 DataFrame에서 데이터를 가져오는 데 사용하는 함수이다.
    • collect는 DataFrame의 전체 row를 로컬 메모리에 array 형태로 변수 할당할 때 사용하는 함수이다. 이를 이용해 unittest 등에 활용 가능하다. 다만, 모든 데이터를 다 불러오기 때문에, 로컬에 메모리가 충분할 때만 사용하는 것이 좋다.
    • show는 DataFrame의 일부 행을 로컬 메모리에 출력한다. Default는 20이며, 인자로 숫자를 지정해 주면, 그 숫자만큼 이 출력된다. Spark는 비동기 처리이기 때문에, 중간중간에 show를 수행해 줘야, 데이터 처리의 중간단계를 확인 가능하다.
    • take는 collect와 비슷하나, 인자로 숫자를 받아, 해당 숫자만큼의 상위 데이터들을 가져와서 array 형태로 반환한다.
df.show(2)
data = df.take(2)
collect_data = df.collect()

[기본 연산을 위한 함수]

기본적으로 pyspark에서 function은 pyspark.sql.functions에서 import 가능하다. (보통 alia로 F를 지정해서 많이 사용한다.)

 

  • Lit
    • Lit은 column에 상수값을 추가하거나, 리터럴 값을 생성할 때 사용하는 함수이다. when이나 expr이랑 같이 사용하면, 복잡한 연산등을 같이 담을 수 있다.
df = df.withColumn("country", F.lit("KOREA"))
df_with_country = df.withColumn("d_day", F.lit(50))
  • nanvl
    • nanvl은 2개의 칼럼을 받아서, 첫 번째 칼럼이 nan이 아닐 경우 첫번째 칼럼을, nan일 경우 두 번째 칼럼을 반환하는 함수이다. 
    • SQL의 nvl이나 postgresql의 coalesce와 동일하다.
df_with_age = df.withColumn("age", F.nanvl(df["age"], 0.0))
  • union
    • union은 두 개 이상의 DataFrame을 결합하여 새로운 DataFrame을 생성하는 데, 사용된다.
    •  결합하는 DataFrame은 모든 행이 동일한 데이터 타입을 가져야 한다.
    • SQL의 merge와 다른 점은 중복행을 제거하지 않는다는 것이다. (만약, 필요하다면, union 후에 distinct 처리가 필요하다.)
# 두 개의 DataFrame 생성
data1 = [("Minsu", 30), ("Daehwi", 30)]
data2 = [("Jaeeun", 28), ("JoonSung", 29)]
columns = ["Name", "Age"]
df1 = spark.createDataFrame(data1, columns)
df2 = spark.createDataFrame(data2, columns)

# 두 개의 DataFrame을 합침
union_df = df1.union(df2)
  • join
    • join은 DataFrame을 합치는 데 사용된다. 
    • SQL의 join과 같은데, SQL이 inner, outer, left, right join이 각각 있는 것에 비해, join 함수에서 인자로 결합 방식을 선택한다.
    • on에 공통 열의 이름을, how에 join 방법(inner, outer, left, right)을 지정해 준다.
    • join 후에는 두 테이블의 모든 칼럼명이 존재한다. 이때, 테이블의 alias를 지정 안 하고, 두 테이블에 공통으로 존재하는 칼럼을 참조하려 하면, 어느 테이블의 칼럼을 의미하는지 몰라서(join에 참여했든 안 했든) 에러가 뜬다. 따라서, join 전에 드라이빙 테이블과 join 테이블 모두를 alias 지정해 주고, join 후에 select로 사용할 칼럼만 남기는 것이 좋다.
# 두 DataFrame을 "Name" 열을 기준으로 inner 조인
result_df = df1.alias("A").join(df2.alias("B"), on="Name", how="inner")
result_df = result_df.select("A.Name", "A.ID", "B.Occupation")

# 두 DataFrame을 "Name"과 "EmployeeName" 열을 기준으로 inner 조인
result_df = df1.join(df2, col("Name") == col("EmployeeName"), how="inner")
  • isNull
    • isNull은 DataFrame의 특정 칼럼이 null인지를 확인하기 위한 함수이다.
    • Null일 경우 True, 아닐 경우 False를 반환한다.
filtered_df = df.filter(col("age").isNull())
  • monotonically_increasing_id
    • monotonically_increasing_id는 DataFrame에 1씩 증가하는 interger 형태의 숫자를 부여한다.
    • GroupBy 등에서 grouping 번호를 지정할 때, 매우 요긴하게 사용 가능하다.
df_with_id = df.withColumn("ID", monotonically_increasing_id())

 

 

 


SQL을 많이 쓰다 보니, Spark의 직관적이지 않은 데이터 처리 방법이 익숙하지 않다. 다음 시간에는 데이터 타입에 특화된 함수를 중심으로 알아봐야겠다. 

+ Recent posts