반응형

SQL에서 가장 유용하다고 생각하는 기능을 집계함수이다. 테이블 내에서 Grouping을 통해 통계값을 추출하고, 값을 모아 연산하는 과정이 다른 절차형 언어들에 비해 SQL이 가지는 특장점이라고 생각한다. Spark의 구조적 API도 SQL의 기본적인 기능을 제공하기 때문에, 마찬가지로 집계 연산 함수를 위한 API를 제공한다. 


Spark 집계 연산 함수란?

  • 집계함수는 키나 그룹을 지정하고, 하나 이상의 칼럼을 특정 연산으로 모아서, 그룹별로 결과 내는 데 사용하는 함수이다.
  • 기본적으로 Spark의 집계함수는 pyspark.sql.functions 내에 정의되어 있다.
  • SQL의 문법과 거의 비슷하게 지원하지만, SQL의 모든 기능을 100% 지원하지는 않는다. (2023년 9월 기준)

 

Spark의 groupBy

  • Spark의 groupBy는 데이터를 특정 열 기준으로 그룹화하고, 그룹별로 집계 연산을 수행하는 데 사용된다.
  • Spark의 DataFrame에서 groupBy 함수에 그룹화를 위한 칼럼을 인자로 넣어주고, agg 함수의 인자로 집계 연산 식을 넣어준다. 
  • Spark는 groupBy 작업을 여러 파티션에 병렬로 분산 처리하기 때문에, 효율적 그룹화와 집계가 가능하다.
from sklearn import datasets
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    iris = datasets.load_iris()
    df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    df["species"] = iris.target

    spark = SparkSession.builder.appName("iris").getOrCreate()
    df = spark.createDataFrame(df)

    grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")).alias("avg petal width (cm)"))

    grouped_df.show()

Spark의 partitionBy

  • SQL에서 지원하는 partition by를 Spark에서도 사용할 수 있다.
  • groupBy와의 차이는 groupBy는 개별데이터가 없어지고, 그룹화에 참여하지 않은 칼럼 정보가 사라지지만, partitionBy는 그렇지 않다는 점이다. (group by 하고, join을 한 것 동일한 효과를 낸다.)
  • pyspark.sql.window에 Window를 사용하여, window(그룹화 방식)을 설정해주고, 집계함수 뒤에 over 함수에 window를 넣어줘서 partitionBy를 수행한다.
from sklearn import datasets
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

if __name__ == '__main__':
    iris = datasets.load_iris()
    df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    df["species"] = iris.target

    spark = SparkSession.builder.appName("iris").getOrCreate()
    df = spark.createDataFrame(df)

    window = Window.partitionBy(F.col("species"))

    partitioned_df = df.select("*", F.avg(F.col("petal width (cm)")).over(window).alias("avg petal width (cm)"))

    partitioned_df.show()

 

Spark의 집계 연산 함수 종류

Pyspark에서 집계함수를 위한 API도 pyspark.sql.functions에 선언되어 있다. 
 
[Agg 함수]

  • count
    • 가장 기본적인 집계함수로 해당 그룹에 포함되는 row의 개수를 세는데 사용한다.
    • spark에서는 SQL과 동일하게 count라는 API를 제공한다.
grouped_df = df.groupBy(F.col("species")).agg(F.count(F.col("petal width (cm)")))
grouped_df.show()

 

  • countDistinct 
    • 집계를 하다보면, 단순 포함되는 element 개수가 중요한 경우도 있지만, 값 자체가 중요한 경우가 많다.
    • 중복 값을 제외하고, 고유한 값의 개수를 집계한다.
    • SQL에서는 count(distinct column)의 형식으로 표현되지만, spark에서는 countDistinct라는 별도의 API로 집계한다.
grouped_df = df.groupBy(F.col("species")).agg(F.countDistinct(F.col("petal width (cm)")))
grouped_df.show()
  • approx_count_distinct
    • 기본적으로 count distinct 연산은 속도와 메모리를 많이 사용한다. 고윳값의 개수를 정확히 알지 않아도 되는 연산등에서는 이러한 연산은 불필요한 cost를 발생시킨다.
    • 예를 들어, 통계분포를 사용하기 위해, 대략 고윳값이 30개 이상인지 아닌지를 파악하기 위해서, countDistinct 함수를 사용하는 것은 매우 비효율적이다.
    • pyspark에서는 approx_count_distinct 함수를 제공하여, 대략적인 고윳값의 개수를 파악할 수 있다. approx_count_distinct 함수는 HyperLogLog 알고리즘을 사용하여, 근사치를 사용하여 계산한다. (메모리 내의 데이터 분포로 전체 데이터의 고윳값 개수를 추정)
    • relativeSD 인자를 통해, 정확성을 조정 가능하다. (기본값은 0.05) 정확성이 높아질수록, 메모리 사용량 및 연산 시간은 늘어남.
grouped_df = df.groupBy(F.col("species")).agg(F.approx_count(F.col("petal width (cm)")))
grouped_df.show()

 

  • first, last
    • 특정 순서로 정렬되어 있는 데이터들에서 그룹의 첫 값이나, 마지막 값이 필요한 경우가 있다. 
    • Spark에서는 first, last 함수를 제공하여, 해당 그룹의 첫번째 행의 값과 마지막 행의 값을 표기할 수 있다. 
    • first와 last가 유의미한 값을 갖기 위해서는 orderBy와 함께 사용되어야 한다. first와 last는 행의 정렬 기준으로 값을 추출하기 때문에 정렬되지 않은 DataFrame에서 유의미한 결과가 아닐 수 있다.
df = df.orderBy(F.col("petal width (cm)"))
grouped_df = df.groupBy(F.col("species")).agg(F.first(F.col("petal width (cm)")), F.last(F.col("petal width (cm)")))
grouped_df.show()

 

  • min, max
    • 통계값의 꽃 min, max이다. Spark에서도 모두 활용 가능하다.
    • min, max는 수치형 데이터 타입, 소수형 데이터 타입, timestamp 데이터 타입, datetype 등에 모두 사용 가능하다. 
    • 문자형 데이터 타입에서도 사용할 수 있는데, 사전순으로 정렬하여 가장 큰 값을 찾는다.
df = df.orderBy(F.col("petal width (cm)"))
grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")), F.min(F.col("petal width (cm)")),
                                              F.max(F.col("petal width (cm)")))
grouped_df.show()
  • avg, sum
    • avg, sum도 Spark에서 활용 가능하다.
    • avg, sum은 수치형 데이터 타입, 소수형 데이터 타입에만 활용 가능하다. 
grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")), F.sum(F.col("petal width (cm)")))
grouped_df.show()

 

  • sumDistinct
    • countDistinct와 같이, 중복을 제외하고 고유한 값들을 더하기 위해, sumDistinct가 존재한다.
grouped_df = df.groupBy(F.col("species")).agg(F.avgDistinct(F.col("petal width (cm)")))
grouped_df.show()

 

  • collect_list, collect_set
    • 그룹에 있는 모든 원소의 값들이 필요한 경우도 존재한다. 이 경우 collect_list와 collect_set을 이용하여, 각각 칼럼 값들을 원소로 한 list와 set을 구할 수 있다.
    • 특히, 그룹화 후, 파이썬등으로 인자를 넘길때, collect_list와 collect set을 많이 활용한다.
    • 다만, 그룹화 시에, row 순서가 유지되지 않기 때문에 주의해야 한다.
grouped_df = df.groupBy(F.col("species")).agg(F.collect_list(F.col("petal width (cm)")),
                                              F.collect_set(F.col("petal width (cm)")))
grouped_df.show()

 

  • mode
    • Spark 3.4 버전에서 새로 등장한, 최빈값을 추출하는 집계함수이다.
grouped_df = df.groupBy(F.col("species")).agg(F.mode(F.col("petal width (cm)")))
grouped_df.show()
  • product
    • 그룹 내의 모든 값을 곱하는 집계함수이다. 
grouped_df = df.groupBy(F.col("species")).agg(F.product(F.col("petal width (cm)")))
grouped_df.show()

 
이 밖에도, 표준편차나 분산, 첨도 등을 구하는 다양한 함수들이 존재하니, Spark 공식 문서를 참고하는 것이 좋다.
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

Functions — PySpark 3.4.1 documentation

Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).

spark.apache.org

 
 
[Window 함수]

  • row_number, rank, dense_rank
    • 그룹화한 후에, 그룹 내에서 순위등을 매기는 경우가 있다. 이러한 경우에는 row_number나 rank, dense_rank를 사용할 수 있다.
    • row_number와 rank, dense_rank는 window에 orderBy 절을 추가해야 한다.
    • row_number는 정렬 후 row 순대로 값이 나오기 때문에 빈값이 없지만, 값이 같은 경우 orderBy 된 순서가 달라질 수 있다. rank의 경우에는 빈값이 존재 가능하다. (1위 동률이 많으면, 2등이 없다.) dense_rank는 빈값 없이 순위를 매긴다. 
window = Window.partitionBy(F.col("species")).orderBy(F.col("petal width (cm)"))
partitioned_df = df.select("*", F.row_number().over(window).alias("row_num"), F.rank().over(window).alias("rank"),
                           F.dense_rank().over(window).alias("dense_rank"))
partitioned_df.show()

 
 

Pyspark 집계 연산 함수의 아쉬운 점

  • Spark의 집계 함수를 사용할 때, 가장 헷갈리는 부분은 groupBy 시에 collect_list와 같은 복합 집계 함수를 사용하였을 때, 순서가 유지되느냐였다.
  • 우선, collect_list를 여러 칼럼에 사용하면, 묶는 순서는 유지된다. 즉, collect_list를 여러 칼럼에 사용했더라도, 같은 index에 존재하는 값은 같은 row를 통해서 온 값이다.
  • 하지만 Pyspark의 집계함수가 아쉬운 점이 하나 있는데, groupBy의 collect_list 때, 순서를 지정해 줄 수 없다는 점이다. 
  • SQL에서는 아래와 같은 기능이 하나의 쿼리를 통해 구현된다. 
SELECT STRING_AGG(COL1,',' ORDER BY COL4),STRING_AGG(COL2,',' ORDER BY COL5) FROM TABLE GROUP BY COL3
  • Pyspark의 경우, 각기 다른 collect_list가 같은 정렬 기준으로 결합된다면, orderBy 후에 groupBy를 수행하면 되지만, 위와 같은 SQL을 처리하기 위해서는 orderBy를 여러 번 호출해서 withColumn 형식으로 groupBy 테이블에 추가해줘야 한다는 점이다.
  • 그래도, 여러 단계를 거치면 구현이 가능하다는 점과, Spark는 계속 발전하고 있다는 점이 긍정적이다...

 


SQL의 문법이 익숙해서, Pyspark가 약간 아쉬운 면도 있는 것 같다. 하지만, UDF의 호환성이 훌륭하다는 점에서 Pyspark는 매우 좋은 것 같다.

반응형

업무에서 Cluster 기반의 Database 엔진을 사용하고 있다. Database 서버 내에서 굉장히 많은 양의 PL/SQL Function을 사용 중인데, 이것을 Spark로 바꾸고 있다. 성격상 이론보다 개발부터 진행 중이지만, 이론도 꼭 필요하다고 생각하여 공부 중이다.


Spark의 구조적 API란?

  • 구조적 API는 주로 정형 데이터 (CSV, JSON, Table) 등의 데이터를 처리하기 위한 고수준의 API이다. 
  • 정형화가 가능한 데이터들은 일반적으로 Table 형태로 표현할 수 있는데, Spark는 이러한 Table 구조의 데이터들을 빠르게 연산할 수 있는 다양한 API 등을 지원해 준다. 
  • 매우 쉽게 생각하면, Database의 Query 혹은 Function 등의 기능을 제공해 주는 기능이라고 생각하면 된다.
  • API라는 말이 이질적으로 다가올 수 있는데, Spark는 실행 계획 수립과 처리에 사용하는 자체 데이터 타입 정보를 가지고 있는 Catalyst 엔진을 사용한다. 즉, Pyspark를 통해 전달하는 데이터도 Catalyst 엔진에서 자체 데이터 타입으로 변경된다. 사용자는 Spark 코드를 짠다고 하지만, 실제로는 사용자의 코드가 그대로 연산되는 것이 아니라, Catalyst 엔진에서 수행 가능하도록 만들어 놓은 API를 호출하는 것이기 때문에, API라고 부른다. 
  • 개인적인 경험상, SQL Query Function을 Spark API로 바꾸는 것이 복잡하긴 해도, 80% 이상의 변환이 가능하였다. (안 되는 부분도 몇 단계의 과정을 거치면 거의 변환이 되었다.)

구조적 데이터 종류

  • DataFrame : Run time 단계에서 데이터 타입 일치를 확인
  • DataRow : Compile 단계에서 데이터 타입의 일치 여부를 확인, 다만 Spark와 같이 JVM 기반의 언어인 Scala와 Java에서만 지원한다.

 

Schema

  • DataFrame은 Schema로 정의되는데, Schema에는 칼럼명과 각 칼럼의 데이터 타입을 정의한다.
  • Schema는 데이터 소스에서 읽거나, 직접 정의가 가능하다. 
  • 간단한 데이터 분석 등에 대해서는 타입 추론을 많이 사용하지만, 운영을 위한 ETL 등에서는 데이터 타입을 미리 지정해 주는 것이 좋다.

 

Spark 구조적 데이터 타입 

  • 기본적으로 Python은 타입추론을 하여, 변수의 타입 선언에 민감하지 않다.
  • Spark는 앞서 말했듯, Python에서 연산을 처리하는 것이 아닌, Catalyst 엔진에서 데이터의 연산을 처리하기 때문에, DataType을 지정해줘야 한다.
  • 기본적으로 Pandas의 Dataframe처럼 DataFrame 생성 시, 데이터의 타입을 추론하지만, 특정 칼럼의 전 데이터가 NULL인 경우 등, 데이터 타입 추론이 불가한 경우에는 에러가 뜰 수 있기 때문에, DataFrame 선언 시 아래의 데이터 타입을 지정해 주는 것이 좋다. 
  • DataFrame 내에는 Array나 Struct 타입의 복합 데이터 타입도 지정할 수 있다.

[Spark, Python 데이터 타입 비교 및 사용 예시]

(편의를 위해, spark session 접근 및 "import pyspark.sql.functions as F"는 생략

Spark 데이터 타입 Python 데이터 타입 pyspark import & 사용예시
ByteType bytes from pyspark.sql.types import ByteType
byte_column = F.lit(byte_value).cast(ByteType())
ShortType int from pyspark.sql.types import ShortType
short_value = 10
short_column = F.lit(short_value).cast(ShortType())
IntegerType int from pyspark.sql.types import IntegerType
int_value = 100
int_column = F.lit(int_value).cast(IntegerType())
LongType int from pyspark.sql.types import LongType
long_value = 1000
long_column = F.lit(long_value).cast(LongType())
FloatType float from pyspark.sql.types import FloatType
float_value = 3.14
float_column = F.lit(float_value).cast(FloatType())
DoubleType float from pyspark.sql.types import DoubleType
double_value = 2.71828
double_column = F.lit(double_value).cast(DoubleType())
DecimalType decimal from pyspark.sql.types import DecimalType
decimal_value = Decimal('3.14159265359')
decimal_column = F.lit(decimal_value).cast(DecimalType(10, 10))  # (precision, scale)
StringType str from pyspark.sql.types import StringType
string_value = "Hello, World!"
string_column = F.lit(string_value).cast(StringType())
BinaryType bytes from pyspark.sql.types import BinaryType
binary_value = bytes([0, 1, 2, 3, 4])
binary_column = F.lit(binary_value).cast(BinaryType())
BooleanType bool from pyspark.sql.types import BooleanType
boolean_value = True
boolean_column = F.lit(boolean_value).cast(BooleanType())
TimestampType datetime의 timestamp from pyspark.sql.types import TimestampType
from datetime import datetime
timestamp_value = datetime(2023, 9, 3, 12, 0, 0)
timestamp_column = F.lit(timestamp_value).cast(TimestampType())
DateType date from pyspark.sql.types import DateType
from datetime import date
date_value = date(2023, 9, 3)
date_column = F.lit(date_value).cast(DateType())
ArrayType list from pyspark.sql.types import ArrayType, IntegerType

python_list = [1, 2, 3, 4, 5]
array_column = F.lit(python_list).cast(ArrayType(IntegerType()))
MapType dict from pyspark.sql.types import MapType, StringType, IntegerType

python_dict = {"apple": 1, "banana": 2, "cherry": 3}

map_column = F.lit(python_dict).cast(MapType(StringType(), IntegerType()))
StructType class
(C언어의 구조체 데이터 타입)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

python_struct = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]

schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True)
])

struct_column = F.lit(python_struct).cast(schema)
StructField class
(StructType의 각 필드에 대한 메타 정보 설정에 사용됨)

 

 

구조적 API의 실행 과정

  • Spark를 공부하면서 느낀 점은 Database 엔진과 매우 비슷하다는 점이다. Database 엔진은 SQL에 대해서 통계적 정보 등을 이용하여 실행 계획을 만드는데, Spark도 마찬가지이다. 
  • 구조적 API는 다음과 같이 실행된다.
    1. 다양한 언어를 통해, DataFrame/SQL 등을 이용한 코드를 작성한다.
      • 코드가 검증 전 논리적 실행 계획으로 변환된다. 이 과정은 코드의 유효성이나, 테이블, 칼럼 등의 존재 여부만 확인한다. 쉽게 말해서, 문법검사 단계만 진행한다고 생각하면 된다.
    2. 코드에서 이상이 없다면, Spark가 이 코드를 논리적 실행 계획으로 변환한다.
      • Spark의 Analyzer에 의해, 칼럼과 테이블등을 검증한다. 이 과정에서 Catalog와 DataFrame의 정보 등을 활용한다. 
      • Catalyst의 Optimizer에 의해 Predicate Pushing Down 등의 논리 계획 최적화가 이뤄진다. (Database 엔진의 Catalyst Optimizer와 거의 동일하다.)
    3. Spark의 논리적 실행 계획을 물리적 실행 계획으로 변환하고, 추가적 최적화를 진행한다.
      • 각 Plan에 대한 Cost를 고려하여 논리적 실행 계획을 물리적 실행 계획으로 변환한다. 
      • 특히, Spark는 Cluster 환경에서 많이 사용하기 때문에, 이 부분이 Spark의 강점이다. (사실 Cluster 기반의 Database 엔진의 강점이기도 하다.)
    4. Spark는 Cluster에서 물리적 실행계획(RDD 처리)을 실행한다.

 

 

구조적 API 기본 연산 

  • 구조적 API 사용을 위해, 기본적인 연산들을 알아놓아야 한다. 거의 대부분이 SQL에 있는 함수 및 쿼리이고, 동일한 동작을 한다.
  • 글 쓰는 시점에서 가장 최근인 3.4.1(23.6.20) 기준으로 작성했다. (Spark는 계속 업데이트 중이다. 사용 전에 함수를 살펴보는 것도 좋다. : https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html)
  • 매우 유용한 함수들이 많지만, 나의 기준에서 많이 활용했던 기본 Function 기준으로 소개한다.

 [DataFrame 관련 함수]

 

 

  •  Select
    • SQL에서와 마찬가지로, spark에서 칼럼 조회를 위해 사용하는 API이다. Query와 동일하게 select 안에 칼럼을 지정해 주면 되고, 모든 칼럼을 의미하는 '*'도 동작한다.
# 모든 칼럼을 선택
selected_df = df.select("*")

# 특정 칼럼을 선택
name_df = df.select("name")

# 다중 칼럼을 선택 가능
column_list = ['a','b','c']
selected_df = df.select(*column_list)

# Alias 사용 가능
selected_df = df.select("name").alias("name2")
  • SelectExpr
    • SelectExpr은 select와 비슷하지만, SQL처럼 표현식을 사용가능하다. 'as'를 이용한 alias 지정도 가능하고, 숫자형 칼럼에는 연산도 가능하다. 
    • Select를 SQL에 가까운 형태로 사용할 수 있다.
# 칼럼 이름 변경하여 선택
name_renamed_df = df.selectExpr("name as full_name")

# 새로운 칼럼 추가하여 선택
age_plus_10_df = df.selectExpr("*", "age + 10 as age_plus_10")
age_double_10_df = df.selectExpr("*", "age * 2 as age_double")
  • withColumn
    • withColumn은 DataFrame에 새로운 칼럼을 추가하거나, 기존 칼럼을 변경하는 데 사용된다. 
    • 한 번에 하나의 칼럼밖에 못 바꾸기 때문에, 여러 칼럼을 바꿔야 하면, withColumn을 계속 연결해서 사용해줘야 한다.
    • withColumnRenamed를 사용하면, 칼럼명을 바꿀 수 있다. (바꿀 칼럼명, 바뀔 칼럼명)
# withColumn으로 새로운 열 추가
df = df.withColumn("square_age", df["age"]*df["age"])

# withColumnRenamed로 칼럼명 변경
df = df.withColumnRenamed("square_age", "squared_age")
  • Drop
    • Drop은 DataFrame에서 칼럼을 제거할 때 사용한다. 
    • Drop의 호출만으로 DataFrame의 칼럼이 제거되는 것이 아닌, 새로운 DataFrame에 칼럼이 제거된 상태로 할당되는 구조이기 때문에, 꼭 반환값을 받아야 한다.
# 특정 칼럼을 제거한 DataFrame 생성
df_without_age = df.drop("age")
df_without_name = df.drop(col("age"))
  • Cast
    • Cast는 DataFrame의 칼럼의 데이터타입을 변경하는 함수이다. 
# "age" 칼럼의 데이터 타입을 문자열(string)로 변환
df_casted = df.withColumn("age_string", df["age"].cast("string"))
  • Where/Filter
    • Where와 Filter는 동일한 동작을 하는 함수로, DataFrame에서 특정 조건을 만족하는 행을 선택할 때 사용된다.
    • SQL의 Where와 같다. 
    • and 조건 (&)과 or 조건 (|) 연산자를 지원하기 때문에, 조건을 하나의 filter나 where에 담을 수 있다. (중복 조건일 때는 연속해서 where or filter를 걸어도 된다.)
# Filter와 Where 조건
filtered_df = df.filter(df["age"]>=30)
where_df = df.where(df["age"]>30)

# 다중 Filter
filtered_df = df.filter(df["age"] <=20 | df["age"] > 50)
filtered_df = df.filter(df["age"] <=30 & df["name"] == "Choi")
filtered_df = df.filter(df["age"] <=30).filter(df["name"] == "Choi")
  • Distinct
    • Distinct 함수는 중복 Row를 제거하고, 고유한 Row만 선택한다.
    • SQL의 distinct와 동일하다.
    • subset 매개변수로, 특정 칼럼들을 list 형태로 전달하면, 해당 칼럼을 기준으로 Distinct가 가능하다.
# 전체 칼럼 중, 중복행 제거
distinct_df= df.distinct()

# 특정 칼럼 중복행 제거
distinct_df= df.distinct(["name"])
  • sort, orderBy
    • sort와 OrderBy는 특정 칼럼 기준으로 순서를 나열하기 위해 사용하는 함수이다.
    • 둘 다 비슷한 동작을 하지만, 사용 방법이 약간 다르다.
    • sort는 ascending 인자를 True or False로 오름차순, 내림차순을 선택하고, orderBy는 칼럼기준으로 asc함수나 desc 함수를 사용하여, 정렬 방법을 지정해줘야 한다.
    • orderBy는 각 칼럼의 정렬순을 따로 지정(어떤 칼럼은 오름차순, 어떤 칼럼은 내림차순으로)할 수 있기 때문에 유용하다.
# sort, orderby를 이용한 정렬
sorted_desc_df = df.sort(col("age"), ascending=False)
sorted_desc_df = df.orderBy(col("age"))

# 다중 칼럼 정렬
sorted_desc_df = df.sort(["age","name","city"])
sorted_df = df.sort(col("age"), ascending=True).sort(col("name"), ascending=False).sort(col("city"), ascending=True)
sorted_df = df.orderBy(col("age").asc(), col("name").desc(), col("city").asc())
  •  limit
    • limit은 DataFrame에서 특정 수의 행을 선택하여 반환하는 데 사용한다. 
    • limit을 이용하여, 특정 칼럼 이상을 Filtering 하거나, 일부 데이터만 보는 데 사용할 수 있다. 
    • 다만, 특정 순으로 정렬되어 있지 않으면, 순서가 유지되지 않을 수 있다.
    • Oracle의 rownum 조건이나, Postgresql의 limit과 동일하다.
sort_df = df.orderBy[col("age")]
limited_df = sort_df.limit(2)
  • repartiton
    • repartition은 DataFrame의 partition을 변경하거나, 재분할하는 데 사용된다.
    • 숫자를 인자로 받아서, 해당 숫자만큼의 partition으로 나눈다. (칼럼을 지정하지 않으면, 모든 칼럼을 고려하여 나눈다.)
    • Spark의 강점이 데이터 분산 처리이기 때문에, 데이터 분산을 최적화하여, 성능에 큰 영향을 미친다. 
# 단순 partition 숫자 지정
repartitioned_df = df.repartition(4)

# partition 칼럼 지정
repartitioned_df = df.repartition(4, "name", "age")
  • coalesce
    • coaleasce는 DataFrame의 partition 수를 줄이는 데 사용된다. (repartition은 증/감이 모두 가능하다는 것과 다르다.)
    • partition을 줄일 때는 repartition보다 성능이 좋기 때문에(default가 shuffle을 사용하지 않기 때문에), 줄일때는 coalesce를 사용하는 것이 좋다.
    • 숫자를 인자로 받아서,  해당 숫자만큼의 partition으로 줄인다.
    • 불필요한 partition을 줄여서 오버헤드를 줄이는 데 사용할 수 있다. 
coalesced_df = df.coalesce(2)
  • collect, show, take
    • collect, show, take는 DataFrame에서 데이터를 가져오는 데 사용하는 함수이다.
    • collect는 DataFrame의 전체 row를 로컬 메모리에 array 형태로 변수 할당할 때 사용하는 함수이다. 이를 이용해 unittest 등에 활용 가능하다. 다만, 모든 데이터를 다 불러오기 때문에, 로컬에 메모리가 충분할 때만 사용하는 것이 좋다.
    • show는 DataFrame의 일부 행을 로컬 메모리에 출력한다. Default는 20이며, 인자로 숫자를 지정해 주면, 그 숫자만큼 이 출력된다. Spark는 비동기 처리이기 때문에, 중간중간에 show를 수행해 줘야, 데이터 처리의 중간단계를 확인 가능하다.
    • take는 collect와 비슷하나, 인자로 숫자를 받아, 해당 숫자만큼의 상위 데이터들을 가져와서 array 형태로 반환한다.
df.show(2)
data = df.take(2)
collect_data = df.collect()

[기본 연산을 위한 함수]

기본적으로 pyspark에서 function은 pyspark.sql.functions에서 import 가능하다. (보통 alia로 F를 지정해서 많이 사용한다.)

 

  • Lit
    • Lit은 column에 상수값을 추가하거나, 리터럴 값을 생성할 때 사용하는 함수이다. when이나 expr이랑 같이 사용하면, 복잡한 연산등을 같이 담을 수 있다.
df = df.withColumn("country", F.lit("KOREA"))
df_with_country = df.withColumn("d_day", F.lit(50))
  • nanvl
    • nanvl은 2개의 칼럼을 받아서, 첫 번째 칼럼이 nan이 아닐 경우 첫번째 칼럼을, nan일 경우 두 번째 칼럼을 반환하는 함수이다. 
    • SQL의 nvl이나 postgresql의 coalesce와 동일하다.
df_with_age = df.withColumn("age", F.nanvl(df["age"], 0.0))
  • union
    • union은 두 개 이상의 DataFrame을 결합하여 새로운 DataFrame을 생성하는 데, 사용된다.
    •  결합하는 DataFrame은 모든 행이 동일한 데이터 타입을 가져야 한다.
    • SQL의 merge와 다른 점은 중복행을 제거하지 않는다는 것이다. (만약, 필요하다면, union 후에 distinct 처리가 필요하다.)
# 두 개의 DataFrame 생성
data1 = [("Minsu", 30), ("Daehwi", 30)]
data2 = [("Jaeeun", 28), ("JoonSung", 29)]
columns = ["Name", "Age"]
df1 = spark.createDataFrame(data1, columns)
df2 = spark.createDataFrame(data2, columns)

# 두 개의 DataFrame을 합침
union_df = df1.union(df2)
  • join
    • join은 DataFrame을 합치는 데 사용된다. 
    • SQL의 join과 같은데, SQL이 inner, outer, left, right join이 각각 있는 것에 비해, join 함수에서 인자로 결합 방식을 선택한다.
    • on에 공통 열의 이름을, how에 join 방법(inner, outer, left, right)을 지정해 준다.
    • join 후에는 두 테이블의 모든 칼럼명이 존재한다. 이때, 테이블의 alias를 지정 안 하고, 두 테이블에 공통으로 존재하는 칼럼을 참조하려 하면, 어느 테이블의 칼럼을 의미하는지 몰라서(join에 참여했든 안 했든) 에러가 뜬다. 따라서, join 전에 드라이빙 테이블과 join 테이블 모두를 alias 지정해 주고, join 후에 select로 사용할 칼럼만 남기는 것이 좋다.
# 두 DataFrame을 "Name" 열을 기준으로 inner 조인
result_df = df1.alias("A").join(df2.alias("B"), on="Name", how="inner")
result_df = result_df.select("A.Name", "A.ID", "B.Occupation")

# 두 DataFrame을 "Name"과 "EmployeeName" 열을 기준으로 inner 조인
result_df = df1.join(df2, col("Name") == col("EmployeeName"), how="inner")
  • isNull
    • isNull은 DataFrame의 특정 칼럼이 null인지를 확인하기 위한 함수이다.
    • Null일 경우 True, 아닐 경우 False를 반환한다.
filtered_df = df.filter(col("age").isNull())
  • monotonically_increasing_id
    • monotonically_increasing_id는 DataFrame에 1씩 증가하는 interger 형태의 숫자를 부여한다.
    • GroupBy 등에서 grouping 번호를 지정할 때, 매우 요긴하게 사용 가능하다.
df_with_id = df.withColumn("ID", monotonically_increasing_id())

 

 

 


SQL을 많이 쓰다 보니, Spark의 직관적이지 않은 데이터 처리 방법이 익숙하지 않다. 다음 시간에는 데이터 타입에 특화된 함수를 중심으로 알아봐야겠다. 

반응형

Spark에 대해 집중적으로 공부하기 전에 Spark의 전체적인 구조와 개념들에 대해 알아보자. 


Spark 기본 아키텍처

  • 이전 글에서 언급한대로, Spark는 클러스터 자원 내에서 분산 처리를 위한 프레임워크이다. (물론, 단일 머신 내에서도 사용 가능하다.)
  • Spark는 클러스터 내의 데이터 처리를 총괄하는 클러스터 매니저Spark Application을 요청하고, 클러스터 매니저가 Spark Application에 대한 자원을 할당하는 방식으로 처리된다. 
  • Spark Application은 2종류의 Process로 구성된다.
    • Driver Process : Cluster 내의 단 하나의 노드에서 실행됨. 대한 정보, 사용자 프로그램과 입력에 대한 응답, Excutor Process의 작업에 대한 분석, 배포, 스케줄링 역할을 수행.  
    • Executor Process : Cluster 환경에 맞게 여러 개의 노드에서 실행될 수 있음. Driver Process가 할당한 작업을 처리하고, 진행 상황을 Driver에 전송. 

스파크 아키텍처 (출처 : https://learn.microsoft.com/ko-kr/dotnet/spark/what-is-spark)

 

SparkSession

  • Spark 아키텍처를 보면, Driver process 내에 SparkSession이 존재하는 것을 볼 수 있다. 
  • SparkSession은 Spark에서 작업을 수행하기 위한 진입점이다. (이전 버전의 SparkContext, SQL Context 등을 통합)
  • SparkSession은 Spark Application과 데이터 소스와의 연결, Cluster 간의 연결을 관리하고, 데이터 처리를 위한 API를 제공하는 역할을 한다.

 

Spark의  API

  • Spark는 기본적으로 Scala, Java, Python, SQL, R 등의 다양한 언어를 지원한다. 사용자들은 본인들이 편한 언어를 통해서, Spark의 API를 호출하면 된다. 
  • 이것은 Spark Session이 각 언어에서 작성된 코드를 API 형태로 받아, Spark 코드로 변환해주기 때문에 가능하다. (이 때문에, 어느 언어의 API를 사용하던 실행 시간등에 큰 차이가 없다.) 
  • Spark는 추상화 수준에 따라 크게 2가지 종류의 API를 제공한다.
    • 저수준의 비구조적 API : RDD, SparkContext, Accumulator, Broadcast Variable
    • 고수준의 구조적 API : DataSet, DataFrame, SQL Table
  • Spark 초기 버전에는 RDD와 같은 저수준의 비구조적 API가 많이 사용되었지만, 더 복잡하고 낮은 추상화 수준을 가지기 때문에, 최신 버전에서는 DataFrame이나 Dataset과 같은 고수준의 구조적 API가 많이 활용된다. 
  • 실제 고수준의 구조화된 API들도, 실제 연산 시에는 RDD로 처리된다. (Spark 내부적 처리)

 

Spark 데이터 처리 방식

  • Spark는 데이터구조는 기본적으로 불변성의 특성(변경하지 못함)을 가지고 있다. 데이터 변경을 위해서는 원하는 변경 방법을 Spark에 "Transformation"라는 명령을 보내야한다.
  • Spark의 연산은 기본적으로 지연 연산 방식을 사용한다. 데이터에 직접적인 수정을 하지 않고, Raw 데이터에 적용할 Transformation의 실행 계획을 생성하고, 물리적 실행 계획으로 Compile한다.
  • 실행 계획에 대한 실제 연산은 "Action" 명령을 통해 이뤄진다.  (즉, 실제 사용될때 연산이 진행된다.)

 

 

 

반응형

 

데이터를 다루는 입장에서는 데이터 처리 속도를 신경 쓰지 않을 수 없다. 최근 운영 중인 시스템의 처리 속도를 개선하기 위해 다양한 방법을 고민하던 중,  Spark에 대한 관심이 생겨서, 공부한 내용을 정리해 보기로 한다. 


Introduction

Apache Spark란?

  • Apache Spark는 빅데이터 처리를 위한 오픈소스 분산, 통합 컴퓨팅 시스템이다. 

[통합]

  • Apache Spark 전에는 각기 다른 시스템으로 구성된 데이터의 흐름(SQL 처리, 데이터 스트리밍, 데이터 분석)을  개발자가 직접 조합하여 Application을 만들었다.
  •  Apache Spark 내에서는 다양한 데이터 흐름을 처리하는 시스템 및 API들을 일관성있는 API로 통합하여, 개발자들이 더 쉽고 효율적인 코드를 개발할 수 있도록 하였다. 

[분산]

  • 많은 데이터 분석가 & 데이터 엔지니어들이 Apache Spark에 집중하는 이유는 분산 처리를 이용한 빠른 데이터 처리 때문이다. Apache Spark는 같은 데이터 분산 처리 플랫폼인 Hadoop을 개선한 플랫폼인데, Hadoop과의 차이를 통해서 Spark의 특징을 살펴보자. 
  Hadoop Spark
목적 데이터 분산 처리 & 저장 데이터 분산 처리 & 분석
데이터 처리 모델 HDFS를 사용한 데이터 저장
MapReduce 기반 분산 처리 모델
인메모리 기반 분산 처리 모델
저장소 HDFS HDFS, S3, Elastic Search 등
속도 작은 규모의 데이터셋의 경우 Hadoop이 더 빠를 수 있음
(Spark 메모리 Load 시간 때문)
반복적인 처리가 필요한 작업에서는 Hadoop의 최대 100배 빠름
호환성 별도의 Package와 연계해야함 언어 : Python, R, Java, Scala, SQL 등 지원
분석 : MLlib, GraphX API 등 지원
  • 대체적으로 Spark가 더 좋은 플랫폼이구나라는 생각을 할 수 있지만, Hadoop이 Spark에 비해 갖는 장점이 있는데, 바로 데이터 보존에 대한 안정성이다. Spark는 인메모리 기반의 데이터 처리를 진행하기 때문에, 서버 다운 등의 장애에 취약하다. 따라서, 최근(?)에는 Hadoop과 Spark를 같이 사용하는 경우가 많다.

 

+ Recent posts