반응형

SQL에서 가장 유용하다고 생각하는 기능을 집계함수이다. 테이블 내에서 Grouping을 통해 통계값을 추출하고, 값을 모아 연산하는 과정이 다른 절차형 언어들에 비해 SQL이 가지는 특장점이라고 생각한다. Spark의 구조적 API도 SQL의 기본적인 기능을 제공하기 때문에, 마찬가지로 집계 연산 함수를 위한 API를 제공한다. 


Spark 집계 연산 함수란?

  • 집계함수는 키나 그룹을 지정하고, 하나 이상의 칼럼을 특정 연산으로 모아서, 그룹별로 결과 내는 데 사용하는 함수이다.
  • 기본적으로 Spark의 집계함수는 pyspark.sql.functions 내에 정의되어 있다.
  • SQL의 문법과 거의 비슷하게 지원하지만, SQL의 모든 기능을 100% 지원하지는 않는다. (2023년 9월 기준)

 

Spark의 groupBy

  • Spark의 groupBy는 데이터를 특정 열 기준으로 그룹화하고, 그룹별로 집계 연산을 수행하는 데 사용된다.
  • Spark의 DataFrame에서 groupBy 함수에 그룹화를 위한 칼럼을 인자로 넣어주고, agg 함수의 인자로 집계 연산 식을 넣어준다. 
  • Spark는 groupBy 작업을 여러 파티션에 병렬로 분산 처리하기 때문에, 효율적 그룹화와 집계가 가능하다.
from sklearn import datasets
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    iris = datasets.load_iris()
    df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    df["species"] = iris.target

    spark = SparkSession.builder.appName("iris").getOrCreate()
    df = spark.createDataFrame(df)

    grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")).alias("avg petal width (cm)"))

    grouped_df.show()

Spark의 partitionBy

  • SQL에서 지원하는 partition by를 Spark에서도 사용할 수 있다.
  • groupBy와의 차이는 groupBy는 개별데이터가 없어지고, 그룹화에 참여하지 않은 칼럼 정보가 사라지지만, partitionBy는 그렇지 않다는 점이다. (group by 하고, join을 한 것 동일한 효과를 낸다.)
  • pyspark.sql.window에 Window를 사용하여, window(그룹화 방식)을 설정해주고, 집계함수 뒤에 over 함수에 window를 넣어줘서 partitionBy를 수행한다.
from sklearn import datasets
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

if __name__ == '__main__':
    iris = datasets.load_iris()
    df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    df["species"] = iris.target

    spark = SparkSession.builder.appName("iris").getOrCreate()
    df = spark.createDataFrame(df)

    window = Window.partitionBy(F.col("species"))

    partitioned_df = df.select("*", F.avg(F.col("petal width (cm)")).over(window).alias("avg petal width (cm)"))

    partitioned_df.show()

 

Spark의 집계 연산 함수 종류

Pyspark에서 집계함수를 위한 API도 pyspark.sql.functions에 선언되어 있다. 
 
[Agg 함수]

  • count
    • 가장 기본적인 집계함수로 해당 그룹에 포함되는 row의 개수를 세는데 사용한다.
    • spark에서는 SQL과 동일하게 count라는 API를 제공한다.
grouped_df = df.groupBy(F.col("species")).agg(F.count(F.col("petal width (cm)")))
grouped_df.show()

 

  • countDistinct 
    • 집계를 하다보면, 단순 포함되는 element 개수가 중요한 경우도 있지만, 값 자체가 중요한 경우가 많다.
    • 중복 값을 제외하고, 고유한 값의 개수를 집계한다.
    • SQL에서는 count(distinct column)의 형식으로 표현되지만, spark에서는 countDistinct라는 별도의 API로 집계한다.
grouped_df = df.groupBy(F.col("species")).agg(F.countDistinct(F.col("petal width (cm)")))
grouped_df.show()
  • approx_count_distinct
    • 기본적으로 count distinct 연산은 속도와 메모리를 많이 사용한다. 고윳값의 개수를 정확히 알지 않아도 되는 연산등에서는 이러한 연산은 불필요한 cost를 발생시킨다.
    • 예를 들어, 통계분포를 사용하기 위해, 대략 고윳값이 30개 이상인지 아닌지를 파악하기 위해서, countDistinct 함수를 사용하는 것은 매우 비효율적이다.
    • pyspark에서는 approx_count_distinct 함수를 제공하여, 대략적인 고윳값의 개수를 파악할 수 있다. approx_count_distinct 함수는 HyperLogLog 알고리즘을 사용하여, 근사치를 사용하여 계산한다. (메모리 내의 데이터 분포로 전체 데이터의 고윳값 개수를 추정)
    • relativeSD 인자를 통해, 정확성을 조정 가능하다. (기본값은 0.05) 정확성이 높아질수록, 메모리 사용량 및 연산 시간은 늘어남.
grouped_df = df.groupBy(F.col("species")).agg(F.approx_count(F.col("petal width (cm)")))
grouped_df.show()

 

  • first, last
    • 특정 순서로 정렬되어 있는 데이터들에서 그룹의 첫 값이나, 마지막 값이 필요한 경우가 있다. 
    • Spark에서는 first, last 함수를 제공하여, 해당 그룹의 첫번째 행의 값과 마지막 행의 값을 표기할 수 있다. 
    • first와 last가 유의미한 값을 갖기 위해서는 orderBy와 함께 사용되어야 한다. first와 last는 행의 정렬 기준으로 값을 추출하기 때문에 정렬되지 않은 DataFrame에서 유의미한 결과가 아닐 수 있다.
df = df.orderBy(F.col("petal width (cm)"))
grouped_df = df.groupBy(F.col("species")).agg(F.first(F.col("petal width (cm)")), F.last(F.col("petal width (cm)")))
grouped_df.show()

 

  • min, max
    • 통계값의 꽃 min, max이다. Spark에서도 모두 활용 가능하다.
    • min, max는 수치형 데이터 타입, 소수형 데이터 타입, timestamp 데이터 타입, datetype 등에 모두 사용 가능하다. 
    • 문자형 데이터 타입에서도 사용할 수 있는데, 사전순으로 정렬하여 가장 큰 값을 찾는다.
df = df.orderBy(F.col("petal width (cm)"))
grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")), F.min(F.col("petal width (cm)")),
                                              F.max(F.col("petal width (cm)")))
grouped_df.show()
  • avg, sum
    • avg, sum도 Spark에서 활용 가능하다.
    • avg, sum은 수치형 데이터 타입, 소수형 데이터 타입에만 활용 가능하다. 
grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")), F.sum(F.col("petal width (cm)")))
grouped_df.show()

 

  • sumDistinct
    • countDistinct와 같이, 중복을 제외하고 고유한 값들을 더하기 위해, sumDistinct가 존재한다.
grouped_df = df.groupBy(F.col("species")).agg(F.avgDistinct(F.col("petal width (cm)")))
grouped_df.show()

 

  • collect_list, collect_set
    • 그룹에 있는 모든 원소의 값들이 필요한 경우도 존재한다. 이 경우 collect_list와 collect_set을 이용하여, 각각 칼럼 값들을 원소로 한 list와 set을 구할 수 있다.
    • 특히, 그룹화 후, 파이썬등으로 인자를 넘길때, collect_list와 collect set을 많이 활용한다.
    • 다만, 그룹화 시에, row 순서가 유지되지 않기 때문에 주의해야 한다.
grouped_df = df.groupBy(F.col("species")).agg(F.collect_list(F.col("petal width (cm)")),
                                              F.collect_set(F.col("petal width (cm)")))
grouped_df.show()

 

  • mode
    • Spark 3.4 버전에서 새로 등장한, 최빈값을 추출하는 집계함수이다.
grouped_df = df.groupBy(F.col("species")).agg(F.mode(F.col("petal width (cm)")))
grouped_df.show()
  • product
    • 그룹 내의 모든 값을 곱하는 집계함수이다. 
grouped_df = df.groupBy(F.col("species")).agg(F.product(F.col("petal width (cm)")))
grouped_df.show()

 
이 밖에도, 표준편차나 분산, 첨도 등을 구하는 다양한 함수들이 존재하니, Spark 공식 문서를 참고하는 것이 좋다.
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

Functions — PySpark 3.4.1 documentation

Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).

spark.apache.org

 
 
[Window 함수]

  • row_number, rank, dense_rank
    • 그룹화한 후에, 그룹 내에서 순위등을 매기는 경우가 있다. 이러한 경우에는 row_number나 rank, dense_rank를 사용할 수 있다.
    • row_number와 rank, dense_rank는 window에 orderBy 절을 추가해야 한다.
    • row_number는 정렬 후 row 순대로 값이 나오기 때문에 빈값이 없지만, 값이 같은 경우 orderBy 된 순서가 달라질 수 있다. rank의 경우에는 빈값이 존재 가능하다. (1위 동률이 많으면, 2등이 없다.) dense_rank는 빈값 없이 순위를 매긴다. 
window = Window.partitionBy(F.col("species")).orderBy(F.col("petal width (cm)"))
partitioned_df = df.select("*", F.row_number().over(window).alias("row_num"), F.rank().over(window).alias("rank"),
                           F.dense_rank().over(window).alias("dense_rank"))
partitioned_df.show()

 
 

Pyspark 집계 연산 함수의 아쉬운 점

  • Spark의 집계 함수를 사용할 때, 가장 헷갈리는 부분은 groupBy 시에 collect_list와 같은 복합 집계 함수를 사용하였을 때, 순서가 유지되느냐였다.
  • 우선, collect_list를 여러 칼럼에 사용하면, 묶는 순서는 유지된다. 즉, collect_list를 여러 칼럼에 사용했더라도, 같은 index에 존재하는 값은 같은 row를 통해서 온 값이다.
  • 하지만 Pyspark의 집계함수가 아쉬운 점이 하나 있는데, groupBy의 collect_list 때, 순서를 지정해 줄 수 없다는 점이다. 
  • SQL에서는 아래와 같은 기능이 하나의 쿼리를 통해 구현된다. 
SELECT STRING_AGG(COL1,',' ORDER BY COL4),STRING_AGG(COL2,',' ORDER BY COL5) FROM TABLE GROUP BY COL3
  • Pyspark의 경우, 각기 다른 collect_list가 같은 정렬 기준으로 결합된다면, orderBy 후에 groupBy를 수행하면 되지만, 위와 같은 SQL을 처리하기 위해서는 orderBy를 여러 번 호출해서 withColumn 형식으로 groupBy 테이블에 추가해줘야 한다는 점이다.
  • 그래도, 여러 단계를 거치면 구현이 가능하다는 점과, Spark는 계속 발전하고 있다는 점이 긍정적이다...

 


SQL의 문법이 익숙해서, Pyspark가 약간 아쉬운 면도 있는 것 같다. 하지만, UDF의 호환성이 훌륭하다는 점에서 Pyspark는 매우 좋은 것 같다.

+ Recent posts