반응형

DeepViT 배경 설명

  • DeepVit는 2021년에 ViT에 후속으로 나온 논문이다.
  • ViT의 등장 이후, CNN 처럼 ViT를 깊게 쌓기 위한 방법을 제시한 논문으로, ImageNet classification에서 기존 CNN 기반의 SOTA를 넘어서는 성능을 보였다고 한다.

 

Abstract

  • 이 논문에서는 Image Classification에서 Layer가 깊어질수록 좋은 성능을 내는 CNN과 달리, ViT의 performance는 layer가 깊어질수록 성능이 더 빨리 saturate 되는 것을 발견했다.
  • 이것은 transformer의 사이즈가 커지면서, attention map들이 점점 비슷한 형태를 띠는 "attention collapse issue" 때문이다.
  • 이것은 ViT에 deeper layer들을 적용하기 위해서 self-attention 방식이 효과적이지 못함을 보여준다.
  • 이러한 결과로, 이 논문에서는 간단하면서 효과적인 "Re-attention"이라는 attention map을 재생성해서, layer들의 다양성을 향상하는 방법을 제안한다.
  • 이 방법은 기존 ViT를 조금만 수정하더라도 좋은 성능을 보인다.

 

Introduction

[배경]

  • CNN 방식은 Image Classification 학습 시에 더 깊은 layer를 사용할 수록, 더 풍부하고, 복잡한 형태의 representations를 학습하기 때문에, layer를 어떻게 더 효과적으로 쌓는지에 대해서 연구가 많이 되고 있다.
  • ViT가 좋은 성능을 보이면서, 자연스럽게 ViT도 model을 깊게 쌓으면 CNN 처럼 성능이 좋아질 것인가? 에 대한 관심이 생기고 있다.

[ViT 깊이 실험]

  • 이를 검증하기 위해, 각기 다른 block number의 ViT를 가지고 ImageNet classification 성능을 비교해보았다.
  • 실험 결과, ViT 깊이가 커질수록 성능이 좋아지지 않는다는 것을 발견했고, 심지어 성능이 떨어지기도 하였다.
  • 실험적으로 이유를 확인해 보았을 때, ViT의 깊이가 깊어지면, 특정 layer 이상에서 attention map이 비슷해지는 현상을 발생하였다. (즉, attention의 역할을 제대로 수행하지 못함.) 더 깊어지면, 아예 모든 값들이 같아짐을 확인했다.
  • 즉, ViT의 깊이가 깊어지면 self-attention 방식이 더 이상 working 하지 않음을 의미한다.

[방법 제안]

  • 이러한 "attention collapse" 문제를 해결하고, ViT를 효과적으로 scale 하기 위해, 이 논문에서는 간단하지만 효과적인 self-attention 방식인, "Re-Attention" 방식을 소개한다.
  • Re-Attention은 Multi-Head self-attention 구조를 따르고, 다른 attention head들의 information을 이용하여, 좋은 성능을 내게 한다.
  • 이 방식을 도입하면, 별도의 augmentation이나 regularization 추가 없이도, 더 깊은 block의 ViT를 효과적으로 학습하여, 성능향상을 확인할 수 있다. (SOTA)

 

Attention Collapse

  • ViT에서 Transformer block 개수를 다르게 ImageNet Classification을 수행해 보았을 때, 기존 ViT에서는 block 개수가 커질수록 Improvement가 점점 감소되고, 심지어 성능이 줄어드는 것을 확인할 수 있다.
  • 이 이유를 CNN에는 없는 self-attention 때문으로 지목했는데, model의 깊이에 따른 attention을 확인해 보았다.
  • Transformer block이 32개 일 때, 각 block layer에서 다른 layer (인접한 k개의 layer) 와의 유사도를 구해본 결과 17번째 block을 넘어서는 순간 90% 이상의 거의 비슷한 output을 냄을 확인할 수 있다. 즉, 이후의 attention map들이 거의 비슷한 형태를 보이고, MHSA가 MLP를 악화시킬 수 있다는 것을 의미한다. 

 

Re-Attention

  • Attention Collapse를 해결하기 위해, 두 가지 방법을 제안한다. 첫 번째는, self-attention 연산을 위한 hidden dimension의 수를 늘리는 것이고, 두 번째는,  re-attention 방식이다.

[Self-Attention in Higher Diemnsion Space]

  • Self-Attention이 비슷해지는 것을 방지하기 위해, Dimension size를 늘리면, 더 많은 정보를 가지고 있게 되고, attention map이 더 다양해지고, 비슷해지지 않게 된다.
  • 아래 그림과 표를 보면, 12 Block의 ViT에서 Dimension size를 늘렸을 때, 비슷한 Block들의 수가 줄어들면서, ImageNet 성능이 향상됨을 확인할 수 있다.
  • 하지만, 이러한 방식은 성능적 한계가 있다는 점과, Parameter 숫자가 매우 늘었다는 단점이 있다.

[Re-Attention]

  • 다른 Transformer block 사이의 attnention map은 매우 비슷하지만, 동일 transformer block에서 다른 head 사이에서는 similarity가 작음을 확인했다.
  • 같은 attention layer의 다른 head들은 각기 다른 aspect에 집중하고 있기 때문이다.
  • 이 결과를 바탕으로, cross-head communication을 위해, attention map들을 재생성하는 방식을 제안한다.
  • 이를 위해, learnable parameter인 transformation matrix(H X H)를 개념을 도입하여, self-attention map의 head dimension 방향으로 곱해준다. 그 후 layer normalization을 진행하여 "Re-Attention" 구성한다. 

  • Re-Attention의 장점은 크게 2가지이다. 첫 번째는 Re-Attention map은 다른 attention head 들 사이에 정보를 교환할 수 있어, 상호 보완이 가능하고, attention map의 다양성을 늘린다. 또한, Re-Attention map은 효과적이면서 간단하다. 

→ 쉽게 말하면, self-attention을 진행할 때, 기존처럼 단순 softmax 값으로 값 참조를 하는 것이 아닌, 별도의 learnable parameter로 다양성을 향상하자는 개념임.

 

Experiments

  • 실험에서는 attention collapse 문제에 대한 설명을 위한 실험을 진행한다. 추가적으로 Re-attention의 장점에 대한 추가적인 실험을 진행한다. (생략) 
  • 논문에서 주장한 것처럼 Re-Attention을 사용하였을 때, 비슷한 attention 패턴이 매우 줄고, 이로 인해 image classification에서 기존 ViT보다 더 높은 성능을 보인다. (ImageNet)

  • Image Classification SOTA 모델들과 비교해 보았을 때도, 더 좋은 성능을 보인다.

Reference

ZHOU, Daquan, et al. Deepvit: Towards deeper vision transformer. arXiv preprint arXiv:2103.11886, 2021.

 

논문 총평

  • 내 식견이 넓지 않은 까닭인지 저자들이 주장하는 Attention Collapse 현상과 Re-Attention 논리 구조를 100% 이해하진 못했다. 
  • 다만, CNN SOTA와 비견할 정도로 높은 성능을 보인다는 점에서 좋은 연구였다고 생각한다.
반응형

SQL에서 가장 유용하다고 생각하는 기능을 집계함수이다. 테이블 내에서 Grouping을 통해 통계값을 추출하고, 값을 모아 연산하는 과정이 다른 절차형 언어들에 비해 SQL이 가지는 특장점이라고 생각한다. Spark의 구조적 API도 SQL의 기본적인 기능을 제공하기 때문에, 마찬가지로 집계 연산 함수를 위한 API를 제공한다. 


Spark 집계 연산 함수란?

  • 집계함수는 키나 그룹을 지정하고, 하나 이상의 칼럼을 특정 연산으로 모아서, 그룹별로 결과 내는 데 사용하는 함수이다.
  • 기본적으로 Spark의 집계함수는 pyspark.sql.functions 내에 정의되어 있다.
  • SQL의 문법과 거의 비슷하게 지원하지만, SQL의 모든 기능을 100% 지원하지는 않는다. (2023년 9월 기준)

 

Spark의 groupBy

  • Spark의 groupBy는 데이터를 특정 열 기준으로 그룹화하고, 그룹별로 집계 연산을 수행하는 데 사용된다.
  • Spark의 DataFrame에서 groupBy 함수에 그룹화를 위한 칼럼을 인자로 넣어주고, agg 함수의 인자로 집계 연산 식을 넣어준다. 
  • Spark는 groupBy 작업을 여러 파티션에 병렬로 분산 처리하기 때문에, 효율적 그룹화와 집계가 가능하다.
from sklearn import datasets
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    iris = datasets.load_iris()
    df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    df["species"] = iris.target

    spark = SparkSession.builder.appName("iris").getOrCreate()
    df = spark.createDataFrame(df)

    grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")).alias("avg petal width (cm)"))

    grouped_df.show()

Spark의 partitionBy

  • SQL에서 지원하는 partition by를 Spark에서도 사용할 수 있다.
  • groupBy와의 차이는 groupBy는 개별데이터가 없어지고, 그룹화에 참여하지 않은 칼럼 정보가 사라지지만, partitionBy는 그렇지 않다는 점이다. (group by 하고, join을 한 것 동일한 효과를 낸다.)
  • pyspark.sql.window에 Window를 사용하여, window(그룹화 방식)을 설정해주고, 집계함수 뒤에 over 함수에 window를 넣어줘서 partitionBy를 수행한다.
from sklearn import datasets
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

if __name__ == '__main__':
    iris = datasets.load_iris()
    df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    df["species"] = iris.target

    spark = SparkSession.builder.appName("iris").getOrCreate()
    df = spark.createDataFrame(df)

    window = Window.partitionBy(F.col("species"))

    partitioned_df = df.select("*", F.avg(F.col("petal width (cm)")).over(window).alias("avg petal width (cm)"))

    partitioned_df.show()

 

Spark의 집계 연산 함수 종류

Pyspark에서 집계함수를 위한 API도 pyspark.sql.functions에 선언되어 있다. 
 
[Agg 함수]

  • count
    • 가장 기본적인 집계함수로 해당 그룹에 포함되는 row의 개수를 세는데 사용한다.
    • spark에서는 SQL과 동일하게 count라는 API를 제공한다.
grouped_df = df.groupBy(F.col("species")).agg(F.count(F.col("petal width (cm)")))
grouped_df.show()

 

  • countDistinct 
    • 집계를 하다보면, 단순 포함되는 element 개수가 중요한 경우도 있지만, 값 자체가 중요한 경우가 많다.
    • 중복 값을 제외하고, 고유한 값의 개수를 집계한다.
    • SQL에서는 count(distinct column)의 형식으로 표현되지만, spark에서는 countDistinct라는 별도의 API로 집계한다.
grouped_df = df.groupBy(F.col("species")).agg(F.countDistinct(F.col("petal width (cm)")))
grouped_df.show()
  • approx_count_distinct
    • 기본적으로 count distinct 연산은 속도와 메모리를 많이 사용한다. 고윳값의 개수를 정확히 알지 않아도 되는 연산등에서는 이러한 연산은 불필요한 cost를 발생시킨다.
    • 예를 들어, 통계분포를 사용하기 위해, 대략 고윳값이 30개 이상인지 아닌지를 파악하기 위해서, countDistinct 함수를 사용하는 것은 매우 비효율적이다.
    • pyspark에서는 approx_count_distinct 함수를 제공하여, 대략적인 고윳값의 개수를 파악할 수 있다. approx_count_distinct 함수는 HyperLogLog 알고리즘을 사용하여, 근사치를 사용하여 계산한다. (메모리 내의 데이터 분포로 전체 데이터의 고윳값 개수를 추정)
    • relativeSD 인자를 통해, 정확성을 조정 가능하다. (기본값은 0.05) 정확성이 높아질수록, 메모리 사용량 및 연산 시간은 늘어남.
grouped_df = df.groupBy(F.col("species")).agg(F.approx_count(F.col("petal width (cm)")))
grouped_df.show()

 

  • first, last
    • 특정 순서로 정렬되어 있는 데이터들에서 그룹의 첫 값이나, 마지막 값이 필요한 경우가 있다. 
    • Spark에서는 first, last 함수를 제공하여, 해당 그룹의 첫번째 행의 값과 마지막 행의 값을 표기할 수 있다. 
    • first와 last가 유의미한 값을 갖기 위해서는 orderBy와 함께 사용되어야 한다. first와 last는 행의 정렬 기준으로 값을 추출하기 때문에 정렬되지 않은 DataFrame에서 유의미한 결과가 아닐 수 있다.
df = df.orderBy(F.col("petal width (cm)"))
grouped_df = df.groupBy(F.col("species")).agg(F.first(F.col("petal width (cm)")), F.last(F.col("petal width (cm)")))
grouped_df.show()

 

  • min, max
    • 통계값의 꽃 min, max이다. Spark에서도 모두 활용 가능하다.
    • min, max는 수치형 데이터 타입, 소수형 데이터 타입, timestamp 데이터 타입, datetype 등에 모두 사용 가능하다. 
    • 문자형 데이터 타입에서도 사용할 수 있는데, 사전순으로 정렬하여 가장 큰 값을 찾는다.
df = df.orderBy(F.col("petal width (cm)"))
grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")), F.min(F.col("petal width (cm)")),
                                              F.max(F.col("petal width (cm)")))
grouped_df.show()
  • avg, sum
    • avg, sum도 Spark에서 활용 가능하다.
    • avg, sum은 수치형 데이터 타입, 소수형 데이터 타입에만 활용 가능하다. 
grouped_df = df.groupBy(F.col("species")).agg(F.avg(F.col("petal width (cm)")), F.sum(F.col("petal width (cm)")))
grouped_df.show()

 

  • sumDistinct
    • countDistinct와 같이, 중복을 제외하고 고유한 값들을 더하기 위해, sumDistinct가 존재한다.
grouped_df = df.groupBy(F.col("species")).agg(F.avgDistinct(F.col("petal width (cm)")))
grouped_df.show()

 

  • collect_list, collect_set
    • 그룹에 있는 모든 원소의 값들이 필요한 경우도 존재한다. 이 경우 collect_list와 collect_set을 이용하여, 각각 칼럼 값들을 원소로 한 list와 set을 구할 수 있다.
    • 특히, 그룹화 후, 파이썬등으로 인자를 넘길때, collect_list와 collect set을 많이 활용한다.
    • 다만, 그룹화 시에, row 순서가 유지되지 않기 때문에 주의해야 한다.
grouped_df = df.groupBy(F.col("species")).agg(F.collect_list(F.col("petal width (cm)")),
                                              F.collect_set(F.col("petal width (cm)")))
grouped_df.show()

 

  • mode
    • Spark 3.4 버전에서 새로 등장한, 최빈값을 추출하는 집계함수이다.
grouped_df = df.groupBy(F.col("species")).agg(F.mode(F.col("petal width (cm)")))
grouped_df.show()
  • product
    • 그룹 내의 모든 값을 곱하는 집계함수이다. 
grouped_df = df.groupBy(F.col("species")).agg(F.product(F.col("petal width (cm)")))
grouped_df.show()

 
이 밖에도, 표준편차나 분산, 첨도 등을 구하는 다양한 함수들이 존재하니, Spark 공식 문서를 참고하는 것이 좋다.
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

Functions — PySpark 3.4.1 documentation

Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).

spark.apache.org

 
 
[Window 함수]

  • row_number, rank, dense_rank
    • 그룹화한 후에, 그룹 내에서 순위등을 매기는 경우가 있다. 이러한 경우에는 row_number나 rank, dense_rank를 사용할 수 있다.
    • row_number와 rank, dense_rank는 window에 orderBy 절을 추가해야 한다.
    • row_number는 정렬 후 row 순대로 값이 나오기 때문에 빈값이 없지만, 값이 같은 경우 orderBy 된 순서가 달라질 수 있다. rank의 경우에는 빈값이 존재 가능하다. (1위 동률이 많으면, 2등이 없다.) dense_rank는 빈값 없이 순위를 매긴다. 
window = Window.partitionBy(F.col("species")).orderBy(F.col("petal width (cm)"))
partitioned_df = df.select("*", F.row_number().over(window).alias("row_num"), F.rank().over(window).alias("rank"),
                           F.dense_rank().over(window).alias("dense_rank"))
partitioned_df.show()

 
 

Pyspark 집계 연산 함수의 아쉬운 점

  • Spark의 집계 함수를 사용할 때, 가장 헷갈리는 부분은 groupBy 시에 collect_list와 같은 복합 집계 함수를 사용하였을 때, 순서가 유지되느냐였다.
  • 우선, collect_list를 여러 칼럼에 사용하면, 묶는 순서는 유지된다. 즉, collect_list를 여러 칼럼에 사용했더라도, 같은 index에 존재하는 값은 같은 row를 통해서 온 값이다.
  • 하지만 Pyspark의 집계함수가 아쉬운 점이 하나 있는데, groupBy의 collect_list 때, 순서를 지정해 줄 수 없다는 점이다. 
  • SQL에서는 아래와 같은 기능이 하나의 쿼리를 통해 구현된다. 
SELECT STRING_AGG(COL1,',' ORDER BY COL4),STRING_AGG(COL2,',' ORDER BY COL5) FROM TABLE GROUP BY COL3
  • Pyspark의 경우, 각기 다른 collect_list가 같은 정렬 기준으로 결합된다면, orderBy 후에 groupBy를 수행하면 되지만, 위와 같은 SQL을 처리하기 위해서는 orderBy를 여러 번 호출해서 withColumn 형식으로 groupBy 테이블에 추가해줘야 한다는 점이다.
  • 그래도, 여러 단계를 거치면 구현이 가능하다는 점과, Spark는 계속 발전하고 있다는 점이 긍정적이다...

 


SQL의 문법이 익숙해서, Pyspark가 약간 아쉬운 면도 있는 것 같다. 하지만, UDF의 호환성이 훌륭하다는 점에서 Pyspark는 매우 좋은 것 같다.

반응형

업무에서 Cluster 기반의 Database 엔진을 사용하고 있다. Database 서버 내에서 굉장히 많은 양의 PL/SQL Function을 사용 중인데, 이것을 Spark로 바꾸고 있다. 성격상 이론보다 개발부터 진행 중이지만, 이론도 꼭 필요하다고 생각하여 공부 중이다.


Spark의 구조적 API란?

  • 구조적 API는 주로 정형 데이터 (CSV, JSON, Table) 등의 데이터를 처리하기 위한 고수준의 API이다. 
  • 정형화가 가능한 데이터들은 일반적으로 Table 형태로 표현할 수 있는데, Spark는 이러한 Table 구조의 데이터들을 빠르게 연산할 수 있는 다양한 API 등을 지원해 준다. 
  • 매우 쉽게 생각하면, Database의 Query 혹은 Function 등의 기능을 제공해 주는 기능이라고 생각하면 된다.
  • API라는 말이 이질적으로 다가올 수 있는데, Spark는 실행 계획 수립과 처리에 사용하는 자체 데이터 타입 정보를 가지고 있는 Catalyst 엔진을 사용한다. 즉, Pyspark를 통해 전달하는 데이터도 Catalyst 엔진에서 자체 데이터 타입으로 변경된다. 사용자는 Spark 코드를 짠다고 하지만, 실제로는 사용자의 코드가 그대로 연산되는 것이 아니라, Catalyst 엔진에서 수행 가능하도록 만들어 놓은 API를 호출하는 것이기 때문에, API라고 부른다. 
  • 개인적인 경험상, SQL Query Function을 Spark API로 바꾸는 것이 복잡하긴 해도, 80% 이상의 변환이 가능하였다. (안 되는 부분도 몇 단계의 과정을 거치면 거의 변환이 되었다.)

구조적 데이터 종류

  • DataFrame : Run time 단계에서 데이터 타입 일치를 확인
  • DataRow : Compile 단계에서 데이터 타입의 일치 여부를 확인, 다만 Spark와 같이 JVM 기반의 언어인 Scala와 Java에서만 지원한다.

 

Schema

  • DataFrame은 Schema로 정의되는데, Schema에는 칼럼명과 각 칼럼의 데이터 타입을 정의한다.
  • Schema는 데이터 소스에서 읽거나, 직접 정의가 가능하다. 
  • 간단한 데이터 분석 등에 대해서는 타입 추론을 많이 사용하지만, 운영을 위한 ETL 등에서는 데이터 타입을 미리 지정해 주는 것이 좋다.

 

Spark 구조적 데이터 타입 

  • 기본적으로 Python은 타입추론을 하여, 변수의 타입 선언에 민감하지 않다.
  • Spark는 앞서 말했듯, Python에서 연산을 처리하는 것이 아닌, Catalyst 엔진에서 데이터의 연산을 처리하기 때문에, DataType을 지정해줘야 한다.
  • 기본적으로 Pandas의 Dataframe처럼 DataFrame 생성 시, 데이터의 타입을 추론하지만, 특정 칼럼의 전 데이터가 NULL인 경우 등, 데이터 타입 추론이 불가한 경우에는 에러가 뜰 수 있기 때문에, DataFrame 선언 시 아래의 데이터 타입을 지정해 주는 것이 좋다. 
  • DataFrame 내에는 Array나 Struct 타입의 복합 데이터 타입도 지정할 수 있다.

[Spark, Python 데이터 타입 비교 및 사용 예시]

(편의를 위해, spark session 접근 및 "import pyspark.sql.functions as F"는 생략

Spark 데이터 타입 Python 데이터 타입 pyspark import & 사용예시
ByteType bytes from pyspark.sql.types import ByteType
byte_column = F.lit(byte_value).cast(ByteType())
ShortType int from pyspark.sql.types import ShortType
short_value = 10
short_column = F.lit(short_value).cast(ShortType())
IntegerType int from pyspark.sql.types import IntegerType
int_value = 100
int_column = F.lit(int_value).cast(IntegerType())
LongType int from pyspark.sql.types import LongType
long_value = 1000
long_column = F.lit(long_value).cast(LongType())
FloatType float from pyspark.sql.types import FloatType
float_value = 3.14
float_column = F.lit(float_value).cast(FloatType())
DoubleType float from pyspark.sql.types import DoubleType
double_value = 2.71828
double_column = F.lit(double_value).cast(DoubleType())
DecimalType decimal from pyspark.sql.types import DecimalType
decimal_value = Decimal('3.14159265359')
decimal_column = F.lit(decimal_value).cast(DecimalType(10, 10))  # (precision, scale)
StringType str from pyspark.sql.types import StringType
string_value = "Hello, World!"
string_column = F.lit(string_value).cast(StringType())
BinaryType bytes from pyspark.sql.types import BinaryType
binary_value = bytes([0, 1, 2, 3, 4])
binary_column = F.lit(binary_value).cast(BinaryType())
BooleanType bool from pyspark.sql.types import BooleanType
boolean_value = True
boolean_column = F.lit(boolean_value).cast(BooleanType())
TimestampType datetime의 timestamp from pyspark.sql.types import TimestampType
from datetime import datetime
timestamp_value = datetime(2023, 9, 3, 12, 0, 0)
timestamp_column = F.lit(timestamp_value).cast(TimestampType())
DateType date from pyspark.sql.types import DateType
from datetime import date
date_value = date(2023, 9, 3)
date_column = F.lit(date_value).cast(DateType())
ArrayType list from pyspark.sql.types import ArrayType, IntegerType

python_list = [1, 2, 3, 4, 5]
array_column = F.lit(python_list).cast(ArrayType(IntegerType()))
MapType dict from pyspark.sql.types import MapType, StringType, IntegerType

python_dict = {"apple": 1, "banana": 2, "cherry": 3}

map_column = F.lit(python_dict).cast(MapType(StringType(), IntegerType()))
StructType class
(C언어의 구조체 데이터 타입)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

python_struct = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]

schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True)
])

struct_column = F.lit(python_struct).cast(schema)
StructField class
(StructType의 각 필드에 대한 메타 정보 설정에 사용됨)

 

 

구조적 API의 실행 과정

  • Spark를 공부하면서 느낀 점은 Database 엔진과 매우 비슷하다는 점이다. Database 엔진은 SQL에 대해서 통계적 정보 등을 이용하여 실행 계획을 만드는데, Spark도 마찬가지이다. 
  • 구조적 API는 다음과 같이 실행된다.
    1. 다양한 언어를 통해, DataFrame/SQL 등을 이용한 코드를 작성한다.
      • 코드가 검증 전 논리적 실행 계획으로 변환된다. 이 과정은 코드의 유효성이나, 테이블, 칼럼 등의 존재 여부만 확인한다. 쉽게 말해서, 문법검사 단계만 진행한다고 생각하면 된다.
    2. 코드에서 이상이 없다면, Spark가 이 코드를 논리적 실행 계획으로 변환한다.
      • Spark의 Analyzer에 의해, 칼럼과 테이블등을 검증한다. 이 과정에서 Catalog와 DataFrame의 정보 등을 활용한다. 
      • Catalyst의 Optimizer에 의해 Predicate Pushing Down 등의 논리 계획 최적화가 이뤄진다. (Database 엔진의 Catalyst Optimizer와 거의 동일하다.)
    3. Spark의 논리적 실행 계획을 물리적 실행 계획으로 변환하고, 추가적 최적화를 진행한다.
      • 각 Plan에 대한 Cost를 고려하여 논리적 실행 계획을 물리적 실행 계획으로 변환한다. 
      • 특히, Spark는 Cluster 환경에서 많이 사용하기 때문에, 이 부분이 Spark의 강점이다. (사실 Cluster 기반의 Database 엔진의 강점이기도 하다.)
    4. Spark는 Cluster에서 물리적 실행계획(RDD 처리)을 실행한다.

 

 

구조적 API 기본 연산 

  • 구조적 API 사용을 위해, 기본적인 연산들을 알아놓아야 한다. 거의 대부분이 SQL에 있는 함수 및 쿼리이고, 동일한 동작을 한다.
  • 글 쓰는 시점에서 가장 최근인 3.4.1(23.6.20) 기준으로 작성했다. (Spark는 계속 업데이트 중이다. 사용 전에 함수를 살펴보는 것도 좋다. : https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html)
  • 매우 유용한 함수들이 많지만, 나의 기준에서 많이 활용했던 기본 Function 기준으로 소개한다.

 [DataFrame 관련 함수]

 

 

  •  Select
    • SQL에서와 마찬가지로, spark에서 칼럼 조회를 위해 사용하는 API이다. Query와 동일하게 select 안에 칼럼을 지정해 주면 되고, 모든 칼럼을 의미하는 '*'도 동작한다.
# 모든 칼럼을 선택
selected_df = df.select("*")

# 특정 칼럼을 선택
name_df = df.select("name")

# 다중 칼럼을 선택 가능
column_list = ['a','b','c']
selected_df = df.select(*column_list)

# Alias 사용 가능
selected_df = df.select("name").alias("name2")
  • SelectExpr
    • SelectExpr은 select와 비슷하지만, SQL처럼 표현식을 사용가능하다. 'as'를 이용한 alias 지정도 가능하고, 숫자형 칼럼에는 연산도 가능하다. 
    • Select를 SQL에 가까운 형태로 사용할 수 있다.
# 칼럼 이름 변경하여 선택
name_renamed_df = df.selectExpr("name as full_name")

# 새로운 칼럼 추가하여 선택
age_plus_10_df = df.selectExpr("*", "age + 10 as age_plus_10")
age_double_10_df = df.selectExpr("*", "age * 2 as age_double")
  • withColumn
    • withColumn은 DataFrame에 새로운 칼럼을 추가하거나, 기존 칼럼을 변경하는 데 사용된다. 
    • 한 번에 하나의 칼럼밖에 못 바꾸기 때문에, 여러 칼럼을 바꿔야 하면, withColumn을 계속 연결해서 사용해줘야 한다.
    • withColumnRenamed를 사용하면, 칼럼명을 바꿀 수 있다. (바꿀 칼럼명, 바뀔 칼럼명)
# withColumn으로 새로운 열 추가
df = df.withColumn("square_age", df["age"]*df["age"])

# withColumnRenamed로 칼럼명 변경
df = df.withColumnRenamed("square_age", "squared_age")
  • Drop
    • Drop은 DataFrame에서 칼럼을 제거할 때 사용한다. 
    • Drop의 호출만으로 DataFrame의 칼럼이 제거되는 것이 아닌, 새로운 DataFrame에 칼럼이 제거된 상태로 할당되는 구조이기 때문에, 꼭 반환값을 받아야 한다.
# 특정 칼럼을 제거한 DataFrame 생성
df_without_age = df.drop("age")
df_without_name = df.drop(col("age"))
  • Cast
    • Cast는 DataFrame의 칼럼의 데이터타입을 변경하는 함수이다. 
# "age" 칼럼의 데이터 타입을 문자열(string)로 변환
df_casted = df.withColumn("age_string", df["age"].cast("string"))
  • Where/Filter
    • Where와 Filter는 동일한 동작을 하는 함수로, DataFrame에서 특정 조건을 만족하는 행을 선택할 때 사용된다.
    • SQL의 Where와 같다. 
    • and 조건 (&)과 or 조건 (|) 연산자를 지원하기 때문에, 조건을 하나의 filter나 where에 담을 수 있다. (중복 조건일 때는 연속해서 where or filter를 걸어도 된다.)
# Filter와 Where 조건
filtered_df = df.filter(df["age"]>=30)
where_df = df.where(df["age"]>30)

# 다중 Filter
filtered_df = df.filter(df["age"] <=20 | df["age"] > 50)
filtered_df = df.filter(df["age"] <=30 & df["name"] == "Choi")
filtered_df = df.filter(df["age"] <=30).filter(df["name"] == "Choi")
  • Distinct
    • Distinct 함수는 중복 Row를 제거하고, 고유한 Row만 선택한다.
    • SQL의 distinct와 동일하다.
    • subset 매개변수로, 특정 칼럼들을 list 형태로 전달하면, 해당 칼럼을 기준으로 Distinct가 가능하다.
# 전체 칼럼 중, 중복행 제거
distinct_df= df.distinct()

# 특정 칼럼 중복행 제거
distinct_df= df.distinct(["name"])
  • sort, orderBy
    • sort와 OrderBy는 특정 칼럼 기준으로 순서를 나열하기 위해 사용하는 함수이다.
    • 둘 다 비슷한 동작을 하지만, 사용 방법이 약간 다르다.
    • sort는 ascending 인자를 True or False로 오름차순, 내림차순을 선택하고, orderBy는 칼럼기준으로 asc함수나 desc 함수를 사용하여, 정렬 방법을 지정해줘야 한다.
    • orderBy는 각 칼럼의 정렬순을 따로 지정(어떤 칼럼은 오름차순, 어떤 칼럼은 내림차순으로)할 수 있기 때문에 유용하다.
# sort, orderby를 이용한 정렬
sorted_desc_df = df.sort(col("age"), ascending=False)
sorted_desc_df = df.orderBy(col("age"))

# 다중 칼럼 정렬
sorted_desc_df = df.sort(["age","name","city"])
sorted_df = df.sort(col("age"), ascending=True).sort(col("name"), ascending=False).sort(col("city"), ascending=True)
sorted_df = df.orderBy(col("age").asc(), col("name").desc(), col("city").asc())
  •  limit
    • limit은 DataFrame에서 특정 수의 행을 선택하여 반환하는 데 사용한다. 
    • limit을 이용하여, 특정 칼럼 이상을 Filtering 하거나, 일부 데이터만 보는 데 사용할 수 있다. 
    • 다만, 특정 순으로 정렬되어 있지 않으면, 순서가 유지되지 않을 수 있다.
    • Oracle의 rownum 조건이나, Postgresql의 limit과 동일하다.
sort_df = df.orderBy[col("age")]
limited_df = sort_df.limit(2)
  • repartiton
    • repartition은 DataFrame의 partition을 변경하거나, 재분할하는 데 사용된다.
    • 숫자를 인자로 받아서, 해당 숫자만큼의 partition으로 나눈다. (칼럼을 지정하지 않으면, 모든 칼럼을 고려하여 나눈다.)
    • Spark의 강점이 데이터 분산 처리이기 때문에, 데이터 분산을 최적화하여, 성능에 큰 영향을 미친다. 
# 단순 partition 숫자 지정
repartitioned_df = df.repartition(4)

# partition 칼럼 지정
repartitioned_df = df.repartition(4, "name", "age")
  • coalesce
    • coaleasce는 DataFrame의 partition 수를 줄이는 데 사용된다. (repartition은 증/감이 모두 가능하다는 것과 다르다.)
    • partition을 줄일 때는 repartition보다 성능이 좋기 때문에(default가 shuffle을 사용하지 않기 때문에), 줄일때는 coalesce를 사용하는 것이 좋다.
    • 숫자를 인자로 받아서,  해당 숫자만큼의 partition으로 줄인다.
    • 불필요한 partition을 줄여서 오버헤드를 줄이는 데 사용할 수 있다. 
coalesced_df = df.coalesce(2)
  • collect, show, take
    • collect, show, take는 DataFrame에서 데이터를 가져오는 데 사용하는 함수이다.
    • collect는 DataFrame의 전체 row를 로컬 메모리에 array 형태로 변수 할당할 때 사용하는 함수이다. 이를 이용해 unittest 등에 활용 가능하다. 다만, 모든 데이터를 다 불러오기 때문에, 로컬에 메모리가 충분할 때만 사용하는 것이 좋다.
    • show는 DataFrame의 일부 행을 로컬 메모리에 출력한다. Default는 20이며, 인자로 숫자를 지정해 주면, 그 숫자만큼 이 출력된다. Spark는 비동기 처리이기 때문에, 중간중간에 show를 수행해 줘야, 데이터 처리의 중간단계를 확인 가능하다.
    • take는 collect와 비슷하나, 인자로 숫자를 받아, 해당 숫자만큼의 상위 데이터들을 가져와서 array 형태로 반환한다.
df.show(2)
data = df.take(2)
collect_data = df.collect()

[기본 연산을 위한 함수]

기본적으로 pyspark에서 function은 pyspark.sql.functions에서 import 가능하다. (보통 alia로 F를 지정해서 많이 사용한다.)

 

  • Lit
    • Lit은 column에 상수값을 추가하거나, 리터럴 값을 생성할 때 사용하는 함수이다. when이나 expr이랑 같이 사용하면, 복잡한 연산등을 같이 담을 수 있다.
df = df.withColumn("country", F.lit("KOREA"))
df_with_country = df.withColumn("d_day", F.lit(50))
  • nanvl
    • nanvl은 2개의 칼럼을 받아서, 첫 번째 칼럼이 nan이 아닐 경우 첫번째 칼럼을, nan일 경우 두 번째 칼럼을 반환하는 함수이다. 
    • SQL의 nvl이나 postgresql의 coalesce와 동일하다.
df_with_age = df.withColumn("age", F.nanvl(df["age"], 0.0))
  • union
    • union은 두 개 이상의 DataFrame을 결합하여 새로운 DataFrame을 생성하는 데, 사용된다.
    •  결합하는 DataFrame은 모든 행이 동일한 데이터 타입을 가져야 한다.
    • SQL의 merge와 다른 점은 중복행을 제거하지 않는다는 것이다. (만약, 필요하다면, union 후에 distinct 처리가 필요하다.)
# 두 개의 DataFrame 생성
data1 = [("Minsu", 30), ("Daehwi", 30)]
data2 = [("Jaeeun", 28), ("JoonSung", 29)]
columns = ["Name", "Age"]
df1 = spark.createDataFrame(data1, columns)
df2 = spark.createDataFrame(data2, columns)

# 두 개의 DataFrame을 합침
union_df = df1.union(df2)
  • join
    • join은 DataFrame을 합치는 데 사용된다. 
    • SQL의 join과 같은데, SQL이 inner, outer, left, right join이 각각 있는 것에 비해, join 함수에서 인자로 결합 방식을 선택한다.
    • on에 공통 열의 이름을, how에 join 방법(inner, outer, left, right)을 지정해 준다.
    • join 후에는 두 테이블의 모든 칼럼명이 존재한다. 이때, 테이블의 alias를 지정 안 하고, 두 테이블에 공통으로 존재하는 칼럼을 참조하려 하면, 어느 테이블의 칼럼을 의미하는지 몰라서(join에 참여했든 안 했든) 에러가 뜬다. 따라서, join 전에 드라이빙 테이블과 join 테이블 모두를 alias 지정해 주고, join 후에 select로 사용할 칼럼만 남기는 것이 좋다.
# 두 DataFrame을 "Name" 열을 기준으로 inner 조인
result_df = df1.alias("A").join(df2.alias("B"), on="Name", how="inner")
result_df = result_df.select("A.Name", "A.ID", "B.Occupation")

# 두 DataFrame을 "Name"과 "EmployeeName" 열을 기준으로 inner 조인
result_df = df1.join(df2, col("Name") == col("EmployeeName"), how="inner")
  • isNull
    • isNull은 DataFrame의 특정 칼럼이 null인지를 확인하기 위한 함수이다.
    • Null일 경우 True, 아닐 경우 False를 반환한다.
filtered_df = df.filter(col("age").isNull())
  • monotonically_increasing_id
    • monotonically_increasing_id는 DataFrame에 1씩 증가하는 interger 형태의 숫자를 부여한다.
    • GroupBy 등에서 grouping 번호를 지정할 때, 매우 요긴하게 사용 가능하다.
df_with_id = df.withColumn("ID", monotonically_increasing_id())

 

 

 


SQL을 많이 쓰다 보니, Spark의 직관적이지 않은 데이터 처리 방법이 익숙하지 않다. 다음 시간에는 데이터 타입에 특화된 함수를 중심으로 알아봐야겠다. 

반응형

사실, 비정형 데이터를 분석하는 게 더 재밌고 공부할 부분도 많지만, 내 기준으로 현업에서는 정형데이터를 다루는 경우가 많은 것 같다. 정형 데이터를 처리할 때, 처리 속도가 빠르고, 설명력이 좋은 머신러닝 알고리즘을 선호하는 경우가 많은데, 대표적인 것이 바로 의사결정나무이다. 


의사결정나무란?

  • 의사결정나무는 계층적으로 데이터를 분할하면서, 의사 결정 규칙을 학습하여, 데이터 분류와 회귀 분석에 사용할 수 있는 예측 모델이다.
  • 의사결정나무의 가장 큰 장점은 해석력이다. 의사결정나무는 다른 머신러닝 기법들과는 다르게, 직관적으로 분할 규칙을 이해할 수 있다.
  • 의사결정나무는 특성중요도(Feature Importance, 어떤 Feature가 예측에 큰 영향을 미치는지)를 쉽게 파악할 수 있다.  
  • 의사결정나무는 일반적으로 연산 cost가 적은 편이다. 
  • 의사결정나무는 Overfitting에 취약하다. 따라서, Depth 제한 등을 둬야한다.
  • 또한, 의사결정 나무는 각 분기마다의 최적을 계산하기 때문에, 성능을 조금이라도 더 올려야 하는 Global Optimum을 찾는 문제에 약하다. 
  • CNN의 유형에 다양한 모델이 있듯, 의사결정나무에도 ID3, C4.5, CART, CHARID 등 다양한 알고리즘 등이 존재한다. 

 

의사결정나무 구성요소

의사결정나무에는 Node, Branch, Depth 개념이 있다.

  • Node : 의사결정나무에서 분할 기준과 그 답(위의 결정 나무 그림에서 박스로 표시되는 부분)
    • Root Node: 맨 상위에 존재하는 Node
    • Inter Node : 중간에 존재하는 Node
    • Terminal or Leaf Node : 맨 마지막, 자식이 없는 node
  • Branch : 하나의 Node로부터 끝 Node까지 연결된 Node들
  • Depth : Branch를 이루는 Node의 개수

 

의사결정나무 기본 원리

  • 의사결정나무의 핵심 아이디어는 각 분기마다, 데이터를 가장 잘 구분할 수 있는 특성을 뽑아서, 데이터를 분할하는 것이다.
  • 즉, 데이터를 분할하는 방식이 매우 중요한데, 이를 위해 불순도(Impurity) 개념을 사용한다.
  • 불순도란 특정 노드 내에 있는 데이터들이 얼마나 서로 다른 클래스 또는 값들을 가지고 있는지를 의미한다. 즉, 현재 분할된 데이터의 집합들이 불순도가 낮다면, 이 데이터를 나누었던 분할과정이 잘 수행되었다고 할 수 있다. (같은 특징만을 가진 데이터끼리 모은 분할이기 때문에)
  • 다만, 단순 불순도를 낮추는 방향으로 학습하면, 이미 잘 분할된 데이터도 계속 분할하는 상태가 발생한다. 이를 예방하기 위해 , 현재 노드의 불순도와 자식 노드의 불순도 차이를 Information Gain이라고 하고, Information Gain이 클수록 좋은 분할 기준이다. (가중 평균 불순도는 왼쪽 자식노드와 오른쪽 자식노드의 불순도를 각 노드의 데이터개수에 대한 가중평균을 통해 합한 값을 의미한다.)

  • 즉, 의사결정나무는 재귀적으로, Information Gain이 큰 방향으로 데이터를 분할하는 과정이다. 

 

불순도 측정

불순도를 측정하는 방법은 크게 3가지가 있다.

  • Gini 지수 : Gini 수는 무작위로 선택한 데이터의 Class가 오분류되었을 확률을 나타낸다. 전 노드에서 완벽하게 분할되었다면, 불순도가 하나의 class가 p가 1.0이 될 것이고, Gini 지수는 0이 된다. 

  • Entropy : Entropy는 자주 등장하는 개념이어서 익숙할 것이다. 데이터의 혼잡도를 의미한다. 마찬가지로, 전 노드에서 완벽하게 분할되었다면, 하나의 class만 있고, p가 1.0이나 0.0 값이다. 두 경우 모두 식의 값이 0이 되고, (1인 경우는 log항에 의해, 0인 경우는 극한을 생각해 보면 쉽게 이해 간다.) 합인 entropy도 0이 된다.

  • Variance : 회귀분석에서는 일반적으로 불순도를 분산으로 정의한다.

 

가지치기 (Pruning)

  • 위의 의사결정나무 과정을 요약하면, Information Gain이 최대가 되는 방향으로 자식노드를 만들어, 데이터를 분할하는 것이다. 
  • 다만, 이 경우에는 모든 데이터가 Terminal Node가 되어(Full Tree, 모든 데이터를 고유의 특성으로 각각 분할하여 일반화의 특성이 없어짐) Overfitting이 발생한다. 
  • 이를 해결하기 위해, 오히려 분기를 합치는 과정이 필요하다. 가지치기에는 2가지 방법이 있는데, 사전가지치기와 사후가지치기이다.
    • 사전 가지치기 (Pre-Pruning) : 분할하는 조건을 사전에 정의하여, 해당 조건을 만족하지 않으면 더 이상 분할하지 않는다. 한 Node의 데이터 개수가 일정 값 미만이면 분할을 멈추는 등의 방법이 있다.
    • 사후 가지치기 (Post-Pruning) : 우선 full tree를 만들고, 불필요한 Branch를 제거하는 방식. Pruning 후의 예측 성능과 복잡성 사이의 균형을 측정하도록 아래와 같이 Cost를 줌.

 

 

의사결정나무 파이썬 구현

  • 의사결정나무는 sklearn을 통해 쉽게 사용 가능하다. 
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier

if __name__ == '__main__':
    # 데이터 불러오기
    iris = load_iris()
    X = iris.data
    y = iris.target

    # 의사결정 트리 모델 학습
    clf = DecisionTreeClassifier()
    clf.fit(X, y)
    
    # 예측
    new_data = [[5.1, 3.5, 1.4, 0.2]]
    predicted_class = clf.predict(new_data)
    print("Predicted class:", predicted_class)
반응형

Pypi란?

  • Pypi(Python Package Index)는 Python을 위한 오픈소스 패키지 저장소이다.
  • Pypi는 오픈소스 패키지를 매우 쉽게 설치할 수 있게 하여, 지금의 Python의 인기를 만든 가장 큰 요인이다. 

Pypi 명령어

  • 패키지 설치
    • 기본적으로 패키지 설치는 pip install을 통해 가능하다.
    • 특정 버전을 명시할 수 있지만, 버전을 명시하지 않으면, pip 버전 내 저장소 안에 있는 가장 최근 버전의 패키지를 설치한다.
    • --upgrade를 붙여주면, pip 버전 내 저장소 안에 있는 가장 최근 버전의 패키지를 설치해 준다. 
    • 실제 코드에선 패키지가 매우 많고, 의존성이 복잡하기 때문에 별도의 파일로 관리하는데, (보통 requirements.txt) 이때, -r 옵션을 붙여주면, 해당 txt를 읽어서, 그 안에 존재하는 패키지를 모두 설치해 준다. 
    • requiremets.txt는 위부터 아래로 순서대로 실행되기 때문에, 의존성이 있는 경우에는 순서에 유의해야한다.
    • pypi는 기본적으로 의존성이 있는 패키지들은 자동으로 설치해 주기 때문에, 의존성에 민감한 패키지들의 경우, --no-deps를 추가하여, 해당 패키지만 설치하기도 한다.
pip install {패키지명}
pip install {패키지명} == {버전}
pip install --upgrade {패키지명}
pip install -r {패키지 목록 파일}
pip install {패키지명} --no-deps

 

  • 패키지 삭제
    • 패키지를 설치하다 보면, 의존성이 꼬이는 경우가 많다. 이런 경우, pip install --upgrade를 사용하는 방법도 있지만, 아예 package를 지우고 다시 설치하는 편이 좋은 경우도 있는데, uninstall을 통해 삭제한다.
    • pip uninstall을 하면, 해당 패키지를 위해 설치되었던, 의존성 있는 라이브러리들은 잔존하게 되는데, 이것은 pip-autoremove를 통해 모두 삭제할 수 있다.
pip uninstall {패키지명}
pip install pip-autoremove #패키지 설치
pip-autoremove {패키지명}

 

  • 설치 가능한 패키지 조회
    • 기존에는 pip search를 통해, 특정 패키지의 설치 가능한 모든 버전을 확인할 수 있었다.
    • 하지만, 너무 많은 API call이 있어서, 지원을 멈췄다고 한다. 
    • 설치 가능한 패키지를 확인하는 방법은 pypi.org(https://pypi.org/)에 접속하여, 직접 검색해 보는 것 밖에 없다. (예전 버전은 pip search {패키지명} 사용하면 된다.)

 

  • 설치된 패키지 조회
    • pip list는 현재 환경 (local or 가상환경) 내에 존재하는 패키지들과 각 패키지의 버전을 나열해 준다.
    • pip freeze도 pip list처럼 설치된 패키지 목록을 출력하지만, requirements.txt를 바로 구성할 수 있는 형태로 출력된다. 
    • 따라서, pip freeze를 requirements.txt로 export 하고, 다른 환경에서 install 하면 현재 가상환경의 pypi 패키지들을 그대로 설치할 수 있다. (테스트 환경 구축 시 많이 활용한다.) 
pip list
pip freeze
pip freeze > requirements.txt

pip list
pip frreze

 

  • 의존성 확인
    • 계속 언급하듯, pypi 내에서는 의존성이 꼬이는 경우가 많다.(가상환경이 자주 쓰이는 이유이다.)
    • 물론, 설치 시에 의존성에 대한 문제를 제기하겠지만, 당장 실행되면 넘어가는 경우가 많다.
    • 이렇게 의존성이 꼬이는 경우가 많아, 의존성을 check 하기 위한 명령어가 존재하는데, 바로 pip check이다.
pip check

 

 

폐쇄망 Pypi 사용법

  • pypi는 기본적으로 원격 저장소에서 패키지를 가져오는 것이기 때문에, 인터넷 연결이 존재해야 한다. 
  • 아예 오프라인 환경이나, 방화벽 등에 의해, 원격 저장소에 접근하지 못하는 환경에서는 단순 pip install을 이용하여 설치가 불가능하다.
  • 우선, 프락시 서버가 있는 경우에는 아래와 같이 proxy 서버를 명시하여 사용할 수 있다.
pip install --proxy {proxy 서버 IP}:{proxy 서버 port} {패키지명}
  • proxy 서버도 존재하지 않는 경우에는 인터넷 망에서 패키지를 설치하고, 오프라인 환경에서 빌드하여 사용하는 방법을 사용할 수 있다.
    • 우선 인터넷망에서 pip download를 통해, 패키지를 빌드 전 파일로 내려준다.
    • 설치된 파일들을 USB 등을 통해, 폐쇄망 서버로 옮기고, 아래와 같이 --no-index(index 서버를 사용하지 않겠다는 뜻)와 --find-link 명령어를 포함한 pip install 명령어를 통해 설치해 준다.
    • download를 하나하나 하기 귀찮다면(보통은 의존성 때문에), pip freeze를 이용하여, requirement 형태로 떨군 후, pip download -r requirements.txt를 이용하여, 모든 패키지를 설치하고, 이 패키지를 폐쇄망에서 설치하는 방법도 있다.
(인터넷망) pip download {패키지명}
(폐쇄망) pip install --no-index --find-links={패키지 파일 저장 경로}

 

 

 

반응형

NaViT 배경 설명

  • NaViT은 Google DeepMind에서 2023년 7월(리뷰 시점에서 1달 전)에 나온 논문이다. 
  • Model 크기에 맞게 Input size를 조정하던 기존의 CNN 구조에서 벗어나, ViT로 다양한 resolution의 input을 학습하고자 하였다. 

 

Abstact

  • computer vision model에서 이미지 처리 전에 고정된 이미지 resoultion은 최적이 아님에도 불구하고, 보편적으로 사용된다.
  • ViT 같은 모델은 flexible한 sequence-based modeling을 제공하기 때문에, input sequence 길이를 가변적으로 사용 가능하다.
  • 이 논문에서는 ViT의 특징을 이용한, 학습과정에서 무작위의 resolution과 aspect ratio을 다룰 수 있도록 하는 sequence packing을 사용했다. 
  • 이 논문에서는 large-scale의 supervised 및 contrastive image-text pretraining을 통해 모델의 효과성을 보여준다.
  • 또한, 이 모델은 image, video classification이나, object detection, semantic segment등에 transfer 되어, 성능 향상에 사용될 수 있다.
  • inference time에서, input resolution의 유연성은 time cost와 performance의 trade-off 사이에서 최적을 찾을 수 있게 한다.

 

Introduction

[배경]

  • ViT는 간단하고, flexible, scalable 하며, 기존 CNN의 대체로 사용할 수 있다. 
  • ViT는 input image들을 resize 하고, 고정된 aspec ratio의 patch들로 잘라서, 모델의 input으로 사용한다.
  • 최근 고정된 aspect ratio을 사용하는 것을 대체할 방법을 제시하는 논문들이 등장하고 있다. 
  • FliexiViT는 다양한 patch size를 하나의 architecture에서 다룬다. 이것은 각 training step에서 patch size를 random sampling 하고, reszing algorithm이 초기 convolutional embedding이 다양한 patch size를 다룰 수 있게 하기 때문이다. (이 논문도 읽어 봐야겠다.)

[소개]

  • 이 논문에서 다른 이미지들에서의 여러 patch들이, single sequence로 묶여 처리하는 NaViT라는 모델을 소개한다. 논문의 아이디어는 NLP에서 다양한 example들을 singlel sequence로 처리하는 example packing에서 영감을 받았다.
  • NaViT은 다음과 같은 장점이 있다.
  • 학습 과정에서 training cost를 줄이기 위해, resolution을 randomly sampling 한다.
  • NaViT는 다양한 resolutons에서 좋은 성능을 보이고, cost-performance가 smooth 한 trade-off를 제공한다.
  • 고정된 batch shape은 aspect-ratio preserving resoltion sampling이나, variable token dropping rates, adaptive computation 등의 새로운 아이디어를 이끈다.
  • 동일한  computational cost 내에서, NaViT는 ViT를 능가하는 성능을 보인다.  이는 NaViT 한정된 computational cost 내에서 더 많은 양의 training example을 처리할 수 있기 때문이다.
  • 향상된 efficiency는 fine-tuning process에서도 이어진다. NaViT는 pre-training과 fine-tuning에서 다양한 resolution으로 학습되었기 때문에, 무작위 resolution으로 평가하였을 때, NaViT는 더욱 효과적이다. 

 

Method

  • 기존 딥러닝 모델들은 fixed 된 이미지 사이즈를 사용한다. 이를 위해, 이미지를 resizing 하거나, padding을 하는데, 성능이 떨어지고, 비효율적이다.
  • 한편, Language Modeling에서는 example packing(다양한 example들을 하나의 sequence에 묶어서 학습을 가속화함)을 이용하여, 고정된 sequence length의 한계를 넘는다. 
  • 이 논문에서는 image를 token으로 대하여, ViT도 같은 방법으로 효과를 볼 수 있음을 보여준다. 이를 Patch n'Pack이라 명명한다. 또, 이미지가 native resolution으로 학습될 수 있기 때문에, 이 모델을 NaViT로 명명한다.

[구조 변화]

  • NaViT는 ViT를 기반으로 만들어졌다. Patch n'Pack을 가능하게 하기 위해, 몇 가지의 구조적 변화를 주었다.

1. Masked self attention and masked pooling

  • example들이 서로 겹치는 것을 막기 위해, self-attention masked를 추가하였다.
  • 마찬가지로, encode 최상단의 masked pooling은 token을 pooling 하는 것을 목표로 한다. sequence 내의 각 example은 single vector representation으로 표현된다. 

2. Factorized & fractional positional embeddings

  • 무작위 resolution과 aspect ratio를 다루기 위해, factorized position embeddings를 제안한다. 
  • factorized position embeddings에서는 embeddings을 x방향과 y방향으로 decomposition 하고, 각각 합해진다.
  • 2개의 스키마를 고려하였는데, absolute embeddings와 factional embeddings이다. 특히, fractional embeddings는 상대적 거리이기 때문에, 이미지 사이즈와 무관하지만, original aspect ratio가 깨질 수 있다. 
  • 학습한 embedding과 sinusoidal embeddings, NeRF를 사용하여 학습된 Foueier positional embeddings을 고려한다.

 
[Training changes] 

  • Patch n' pack을 이용하여 학습과정에서 몇가지 새로운 테크닉을 사용할 수 있다. 

1. Continuous Token Dropping

  • Token dropping (학습 과정에서 input patch를 무작위로 빼는 것)은 학습을 빠르게 하기 위해 고안되었다. 
  • 기존 Token dropping에서는 모든 example들에서 동일한 비율로 token이 drop 되지만,  continuous token dropping은 이미지마다 drop 비율이 달라질 수 있다.
  • 이로 인해, 학습이 빨라지고(처리 token 양이 줄기 때문) , 학습 시에 완전 이미지(drop 안 한 이미지)도 같이 학습할 수 있다는 장점이 있다.

2. Resolution sampling

  • NaViT은 original image의 aspect ratio를 유지하면서, random sampling 된 사이즈를 이용한다. 
  • original ViT에서는 작은 이미지를 통해 throughput이 커지면, performance도 증가하는 특징을 가지고 있다. 
  • NaviT은 다양한 resolution을 학습 시에 사용하기 때문에, 높은 throughput과 큰 이미지로 학습이 모두 포함되기 때문에, original ViT보다 성능 향상을 보인다. 

 
[Efficiency of NaViT]

  • NaViT의 computational cost에 대한 장이다.

1. Self attention cost

  • 원래 이미지의 patch를 자를수록 computational cost가 매우 증가하지만(quadratic 하게), Transformer의 hidden dimension이 늘리면, computational cost가 original image를 한 번에 처리하는 것보다 아주 조금만 늘어난다.
  • 매우 긴 sequence를 사용하면, memory cost가 많아, 속도가 느려지는 현상이 있는데,  memory-efficient 방법을 사용하여 이 문제를 해결하였다.

 
2. Packing, and sequence-level padding

  • prediction을 위한 최종 sequence length는 동일해야 한다. 
  • 길이를 맞추기 위해, example들을 perfect combination 하는 것이 아닌, fixed length가 되도록, 더해서 맞추거나, padding을 사용한다. 
  • padding 되는 toekn은 전체의 2% 이하이기 때문에, 간단한 방법으로 충분하다.

 
3. Padding examples and the contrastive loss

  • per-token loss가 직관적이지만, 많은 computer vision model들은 example 단위의 loss를 구한다. 
  • 이를 도입하기 위해서는 example 개수의 max를 정해놓고, 그 이상은 drop 하는 방법을 사용한다. 그런데, 이럼 학습 시에 아무것도 학습하지 않지만, computational cost를 낭비하는 상황이 발생한다.
  • contrastive learning은 이 상황에 취약한데, time과 memory의 loss computational scale이 quadratic 하게 증가한다.
  • 이를 방지하기 위해, chunked contrastive loss라는 방법을 사용하는데, softmax 계산을 위해 모든 데이터를 모으는 것이 아닌, local device subset에 각각 데이터를 모으고, global softmax normalization을 위한 통계값만 뽑는 형식이다.
  • 이로 인해, 최대 example 수를 늘릴 수 있다.  

 
 

Experiments

  • NaViT은 기본적으로 original ViT을 따랐다. 
  • NaViT을 2개 setup으로 학습했다. classification training으로 JFT-4B를 사용했고(sigmoid cross-entropy loss 사용), contrastive language-image training으로 WebLI를 사용(contrastive image-text loss 사용)했다.
  • FLAX  library를 이용했고, JAX로 구현했다. 

 
[Improved Training Efficiency and performance]

  • 동일 computational cost에서 NaViT은 ViT을 능가한다. 
  • 동일 성능(Accuracy)을 위해, NaViT은 ViT보다 4배 이상의 빠르게 학습되었다.  

 

 
[variable resolution의 장점]

  • variable-resolution pre-training : fixed image로 학습한 최고의 결과도 variable resolution과 동일할 정도로, variable resolution이 우위에 있다.
  • variable-resolution fine-tuning : variable resolution으로 fine-tuning 한 게 성능이 더 좋고, fine-tuning을 low resolution으로 했더라도, higher resolution에 대한 성능을 유지하는 것을 보여준다. 

 
 

Reference

Dehghani, Mostafa, et al. "Patch n'Pack: NaViT, a Vision Transformer for any Aspect Ratio and Resolution." arXiv preprint arXiv:2307.06304 (2023).
 
 

논문 총평

  • 아이디어가 매우 간단하고, 직관적이여서 좋았다. 
  • 사실 아이디어도 중요하지만, 소스가 JAX로 짜놨다고해서, 코드를 한 번 보고 싶다!
반응형

JAX 란?

  • JAX란, 머신러닝의 연산을 가속화하기 위해, Google에서 발표한 컴파일러를 최적화하는 기술이다.
  • JAX는 머신러닝에서 필수적인 Autograd와 XLA(integrated with Accelerated Linear Algebra, 가속 선형 대수) 컴파일러를 통해, 머신러닝의 연산을 빠르게 실행해 준다.
  • JAX는 설치가 매우 쉽고, 기존 Python에서 구현된 Numpy를 쉽게 변환할 수 있어서, 많이 활용되고 있다. 
  • 다만 JAX는 구글의 공식 제품이 아닌, 연구 프로젝트 기 때문에, 아직은 이런저런 버그가 있을 수 있다고 한다.

 

JAX 설치 방법

  • JAX는 우선 기본적으로 Linux나 Mac 운영 체제에서 동작한다.
  • Window도 동작하기는 하지만, 실험버전으로 CPU를 활용한 jax만 지원된다. (WSL을 사용하면 GPU를 사용할 수 있긴 하다.)

[CPU 설치]

pip install --upgrade "jax[cpu]"

 

[GPU & TPU 설치]

  • GPU에서도 pypi를 통해 쉽게 설치가 가능하다. 하지만, GPU는 Linux 환경에서만 설치되는 것을 명심하자. (나의 경우에는 WSL로 진행했다.)
# CUDA 12 
pip install --upgrade "jax[cuda12_pip]" -f https://storage.googleapis.com/jax-releases/jax_cuda_releases.html
# CUDA 11
pip install --upgrade "jax[cuda11_pip]" -f https://storage.googleapis.com/jax-releases/jax_cuda_releases.html

 

JAX 기본 기능

jax.numpy

  • jax는 기본적으로 jax.numpy를 통해, numpy의 API를 그대로 호환해 준다.
  • jax.numpy와 numpy는 거의 비슷하지만, 차이가 있는데,  jax는 함수형 프로그래밍으로 설계되어 있다는 점이다.
  • 즉, numpy는 배열에 직접 접근해서, 값을 바꾸는 것이 허용되지만, jax.numpy는 데이터를 직접 조작하는 것이 허용되지 않는다. → 거의 모든 Python 가속기들의 특징인 것 같다.
  • 다만, 값을 직접 바꾸는 것은 불가능하지만, 해당 요소를 반영한 새로운 배열을 생성할 수 있다.
import jax.numpy as jnp

if __name__ == '__main__':
    data = jnp.array([1,2,3,4])
    data[0] = 5
    # ERROR

    data = data.at[0].set(5)
    # data = [5,2,3,4]

 

 

[grad]

  • JAX는 native Python 및 numpy 코드를 자동으로 미분할 수 있는 기능을 제공한다.
  • JAX의 grad 함수는 함수의 입력에 대한 gradient를 자동으로 계산해 주는 함수이다.
  • JAX의 grad 함수는 loss의 기울기를 구할 때, 매우 빠르고 쉽게 활용될 수 있다.
  • JAX의 grad는 N차 미분값까지 쉽게 구할 수 있다.
import jax
import jax.numpy as jnp

def square(x):
    return x ** 2

if __name__ == '__main__':
    grad_square = jax.grad(square)

    # Calculate Gradient
    x = jnp.array(2.0)
    grad_value = grad_square(x)

    print("Input:", x)
    print("Gradient:", grad_value)

 

 

[jit]

  • jax.jit 함수는 JAX에서 제공하는 함수를 최적화해 주는 메커니즘이다. 
  • jit 함수를 통해, 정의한 함수를 컴파일하여, 최적화된 코드로 변환하고, 이를 Cache에 저장해 둔 뒤, 호출 시, 최적화된 코드를 통해 빠르게 실행된다.
  • 최적화된 Code를 Cache에 저장해 두기 때문에, 반복 변환이나, 불필요한 변환은 피하는 것이 좋다.
  • 다만, jit은 아래와 같은 경우에는 속도 향상이 없거나, 오히려 늦어질 수 있다.
    • 변환하려는 함수 내에 제어문이 포함된 경우
    • 재귀함수 
    • 데이터를 조작하는 함수
    • 크고 복잡한 함수 → 변환을 위한 cost가 더 많이 들 수 있음
  • 다른 모듈처럼, jit 사용을 위해, 단순 decorator만 사용해 주면 된다. 하지만, 변환을 위한 cost가 더 많이 들 수 있기 때문에, 꼭 비교해 보고 사용하는 것이 좋다.
import jax
import jax.numpy as jnp

@jax.jit
def square(x):
    return x ** 2

if __name__ == '__main__':
    grad_square = jax.grad(square)

    # Calculate Gradient
    x = jnp.array(2.0)
    grad_value = grad_square(x)

    print("Input:", x)
    print("Gradient:", grad_value)

 

 

[vmap]

  • jax.vmap 함수는 함수를 Vector 화하여 mapping 하는 함수이다. 
  • vmap 함수를 통해, 배열의 각 요소에 함수를 병렬로 실행할 수 있다. (pandas의 apply와 비슷한 개념이다.)
  • jit과 vmap은 같이 사용될 수 있다. (jit을 먼저 래핑 한 후, vmap을 하거나, vmap을 래핑한 후, jit을 하거나 둘 다 가능하다.)
import jax
import jax.numpy as jnp

def dot_product(x, y):
    return jnp.dot(x, y)

if __name__ == '__main__':
    grad_square = jax.grad(dot_product)

    vectorized_dot_product = jax.vmap(dot_product)

    x = jnp.array([i for i in range(10000)])
    y = jnp.array([i for i in range(10000)])
    grad_value = dot_product(x, y)

 

 

JAX 사용후기

  • JAX는 기본적으로 multi GPU 환경이나, TPU 환경에서 유리하다. 나의 경우에는 single GPU 환경이기 때문에, JAX를 쓰면 오히려 변환에 더 오랜 시간이 걸렸다. (JAX가 분산에 최적화되었기 때문이다.)
  • JAX가 numpy를 호환한다고 하지만,  아직 torch 등의 딥러닝 프레임워크와 호환이 부족하다. 따라서, 단순 기존 코드의 최적화가 아닌, 분산 환경에서 속도를 향상시키기 위한 대대적 Refactoring이나 개발에 사용하는 것이 좋을 것 같다.
  • JAX는 현재 기준(2023.08.07)으로 CUDA 11버전까지만 지원한다. 이것도 환경을 제한하는 요소인 것 같다.
  • 그럼에도 불구하고, JAX는 딥러닝 코드를 Python 언어 내에서 최적화할 수 있는 선택지를 제공한다는 점에서 매우 유용한 것 같다.

 

 

반응형

Pandas 란?

  • Pandas는 파이썬에서 데이터 처리와 분석을 위한 라이브러리로, numpy를 기반으로 개발되었다.
  • Pandas는 DataFrame과 Series라는 데이터 구조를 사용하여, 데이터를 쉽게 처리할 수 있도록 한다.
  • Pandas는 C 언어 등, low level의 언어로 개발된 numpy를 기반으로 하고, 수년간의 버전 업그레이드로 그 자체에 최적화를 해놓았기 때문에, 일반적으로 Pandas에서 제공하는 함수들을 사용하는 것이 성능 면에서 가장 좋다.
  • 하지만, 데이터 크기와 연산의 복잡성에 따라, 특정한 상황에서는 Pandas의 성능을 최적화하기 위한 방법이 필요하다.

 

Pandas의 데이터 처리 최적화 원리

  • Pandas는 기본적으로 low level 언어로 최적화가 많이 되었기 때문에, Pandas 데이터 처리를 위한 연산 과정에 Python 언어로 처리하는 과정이 생략되는 것이 좋다. 
  • Pandas는 메모리 위에서 동작한다. 이에 따라, 메모리의 가용 용량을 벗어나는 데이터 처리 및 연산은 한 번에 처리할 수 없다. 따라서, 메모리를 효율적으로 사용할 수 있도록 변경하는 것이 좋다.

 

Pandas 데이터 로드

  • 사실, Pandas를 많이 활용하는 이유 중 하나는 Pandas의 Dataframe이  SQL 테이블 형태와 거의 유사하기 때문이다.
  • Pandas에 들어갈 데이터를 Code 내부에서 주입하는 경우도 있지만, 대부분의 경우, Database나, CSV File 등에서 Import 해오는 경우가 많다.
  •  Pandas는 앞서 말한대로, 메모리에 DataFrame을 올려놓고, 연산하는 형태이기 때문에, 메모리가 연산을 효율적으로 처리할 수 있도록, 작은 단위의 필요한 데이터만 사용하는 것이 연산 측면에서 유리하다.

 

1. Query 및 파일 최적화

  • Pandas에서 SQL이나 CSV 등의 Raw 형태의 데이터를 읽고, 이를 Filtering하여Filtering 하여 사용하는 경우가 많은데, 이는 Raw 데이터 전체를 메모리 올려, 메모리 & I/O 부담을 증가시킨다. 따라서, 필요한 데이터만 미리 Filtering 하여 가져오는 것이 좋다.
  • 이렇게 Pandas에서 필요한 데이터만 가져오면, 전체 Series의 갯수(Row 수)가 줄기 때문에, Index 활용 측면에서도 유리하다.
  • 예시로, 한국어 형태소 분석을 위해 SQL 테이블에서 1주일치 데이터를 읽어서, 처리하였는데, 하루씩 읽어서 7번 처리하는 게 속도 면에서 더 효율적이었다.  
  • 마찬가지로, 필요한 칼럼만 가져오는 것이 성능 면에서 유리하다.
df = pd.read_csv('raw_data.csv', usecols=['col1', 'col2', ...])

 

2. Data Type 미리 지정

  • Pandas는 자료 구조형 선언의 제약을 받지 않는 파이썬 위에서 돌아가기 때문에, 읽어온 데이터를 통해 Data Type을 추론하는 과정이 포함된다. 
  • Data Type을 사용자가 미리 지정하면, 1) Data Type 추론 과정을 줄일 수 있고, 2) 실제 필요한 데이터 크기에 맞는 정도만 메모리를 할당하기 때문에, 성능 면에서 유리하다.
  • 다만, Data Type을 미리 지정하는 것은, 추후 연산 시에 메모리를 효율적으로 사용할 수 있다는 장점이 있지만, 읽는 속도 자체에는 큰 영향을 미치지 않는다.
dtypes = {'col1': 'int', 'col2': 'float', ...}
df = pd.read_csv('raw_data.csv', dtype=dtypes)

 

3. chunk 옵션 사용

  • 위의 방법을 사용해도, 어쩔수 없이 대용량 데이터를 모두 사용해야 하는 경우가 많다.
  • 이와 같은 경우에는 DataFrame을 chunk 단위로 처리하는 것이 효율적이다.
  • chuncksize를 지정해줘서, 한 번에 읽을(메모리에 올릴) Series(row) 수를 지정할 수 있다.
  • 하지만, 전체 데이터가 같이 필요한 것들(특정 칼럼 sort 등)은 처리가 까다롭기 때문에, 다른 행들 간의 연산이 비교적 적은 경우에 활용하는 것이 좋다.
for chunk in pd.read_csv('raw_data.csv', chunksize=10000):
    processing(chunk)

 

4. Dask 사용

  • 만약, 메모리가 감당하기 어려운 정도의 어려운 정도의 데이터 양과 연산이 포함된다면, 대용량 데이터를 분산 처리하기 위해 개발된, Dask를 사용할 수 있다.
  • Dask는 Pandas와 달리, Disk에 저장된 데이터를 처리하기 때문에, 여러 머신에서 분산처리가 가능하고, 지연 연산을 사용하기 때문에, 실제 연산을 최적화하는 과정이 포함된다.(SQL의 실행 Plan을 생각하면 된다.) 따라서, 초 대용량 데이터 처리에는 Dask의 강점이 있다.
  • 하지만, 메모리가 감당 가능한 수준의 연산에서는 메모리와 디스크의 속도 차이 등 때문에, Pandas가 유리하다.
import dask.dataframe as dd
if __name__ == "__main__":
	df = dd.read_csv('raw_data.csv')

 

 

Pandas 연산 & 조회

[실험 데이터셋]

  • Pandas 연산 테스트를 위해, 예시 데이터로 Kaggle 데이터셋을 사용했다. (https://www.kaggle.com/datasets/jordankrishnayah/45m-headlines-from-2007-2022-10-largest-sites?resource=download)
  • 사용할 데이터는, 4.5M 분량의 2007년부터 2022년 주요 언론사의 기사 제목 headline 데이터이다. 데이터는 총, 4405392개 row로 구성되어 있고, [Date, Publication, Headline, URL]의 4개의 칼럼으로 구성되어 있다.

 

1. 반복문 최적화

  • Pandas 연산에서 가장 큰 성능 개선 포인트는 반복문 연산이다.
  • Pandas가 Python 언어로 동작하기 때문에, Python의 list의 개념으로 Dataframe을 다루기 때문에, 이런 문제가 많이 발생한다.
  • Pandas를 For문을 통해, 각 row에 접근하는 경우에는, 각 row마다 연산을 각각 실행한다. 이에 따라, 데이터 크기가 커질수록 연산의 Overhead는 가중화된다.
  • 가장 쉬운 방법은 Pandas의 apply를 사용하는 것이다. 
  • 또한, Numpy Vectorize를 사용하는 방법, iterrows, itertuples를 사용하는 방법들이 있는데, 일반적으로 itertuples와 numpy vectorize는 Pandas Apply보다 좋은 성능을 보인다고 알려져 있다.
  • 추가적으로 멀티스레드를 이용하여, Pandas 연산을 병렬화 하고, 효율적으로 처리하는 swifter가 있다.

 

(테스트 상황)

  • headline 데이터셋에서 URL의 "http://" or "https://" 부분을 제거하는 작업을 테스트
  • headline 데이터셋에서 Date의 연도를 제거하는 작업을 테스트

 

(1) 단순 For문

for i in range(data.shape[0]):
    data["URL"][i] = data["URL"][i].replace('http://', '').replace('https://', '')

→ 실행 시간 : 94848.09 s(1000 row 실행시간: 21.53으로 추정)

 

for i in range(data.shape[0]):
    data["Date"][i] = data["Date"][i]%10000

→ 실행 시간 : 572.70s(1000 row 실행시간: 0.13으로 추정)

 

 

(2) Pandas Apply

data["URL"] = data["URL"].apply(lambda x: x.replace('http://', '').replace('https://', ''))

→ 실행 시간 : 1.47s

data["Date"] = data["Date"].apply(lambda x: x % 10000)

→ 실행 시간 : 0.88s

 

 

(3) Numpy Vectorize

  • numpy에서 제공하는 vectorize를 통해, 연산하고자 하는 함수를 vectorize화 할 수 있다.
  • numpy를 import 하여 쉽게 사용할 수 있다.
import numpy as np
vectorize_function = np.vectorize(lambda x: x.replace('http://', '').replace('https://', ''))
data["URL"] = vectorize_function(data["URL"])

→ 실행 시간 : 59.09s

import numpy as np
vectorize_function = np.vectorize(lambda x: x % 10000)
data["Date"] = vectorize_function(data["Date"])

→ 실행 시간 : 0.46s

 

(4) Vector화 된, For문 사용

  • Pandas는 numpy로 만들어졌기 때문에, numpy의 데이터를 조회하기 위한 iterator의 순회문을 사용하면, 빠르게 데이터를 순회할 수 있다.

[iterrows]

temp_date = []
for i, row in data.iterrows():
	temp_url.append(row["URL"].replace('http://', '').replace('https://', ''))
data["URL"] = temp_url

→ 실행 시간 : 88.10s

temp_date = []
for i, row in data.iterrows():
	temp_date.append(row["Date"]%10000)
data["Date"] = temp_date

→ 실행 시간 : 86.14s

 

 

[itertuples]

temp_url = []
for row in data.itertuples(index=False):
    temp_url.append(row.URL.replace('http://', '').replace('https://', ''))
data["URL"] = temp_url

→ 실행 시간 : 3.37s

temp_date = []
for row in data.itertuples(index=False):
	temp_date.append(row.Date%10000)
data["Date"] = temp_date

→ 실행 시간 : 2.69s

 

 

(5) Swifter

  • Swifter는 pandas의 apply를 멀티스레드를 통해, 병렬화하여 빠르게 처리하도록 하는 파이썬 패키지이다.
  • pip install swifter를 통해, 설치 가능하다.
data["URL"] = data["URL"].swifter.apply(lambda x: x.replace('http://', '').replace('https://', ''))

→ 실행 시간 : 3.98s

data["Date"] = data["Date"].swifter.apply(lambda x: x % 10000)

→ 실행 시간 : 0.37s

 

[실험 결과] 

  • Python 영역의 연산을 활용하는 경우(Python 라이브러리 or String 사용 등)에는 apply나 itertuples를 사용한 순회가 가장 좋은 성능을 보인다. → 첫 번째 실험
  • Python 영역의 연산을 활용하지 않는 경우, 즉, Cython으로 변환이 가능한 연산등은 np.vectorize가 가장 좋은 성능을 보인다.
  • 데이터의 타입, 크기, 연산에 따라, 가장 적합한 연산은 다르겠지만,
    • 일반적으로 apply나 itertuples를 사용한 순회가 가장 좋다.(일반적으로 대용량에선 itertuples가 apply보다 낫다고 함.)
    • 연산이 간단한 경우(Cython으로 변환이 될만한 간단한 연산)에는 np.vectorize를 통해 최적화가 가능하다.
    • 단순 for문은 사용하지 않는 것이 좋다. 
    • Swifter는 데이터의 크기가 매우 크고, 연산이 복잡하지 않은 연산에서 효과적이다.

 

2. 특정 조건 데이터 조회

  • 특정 조건 데이터 조회는 Pandas에서 자주 사용된다. 
  • 연산에 비해, 긴 시간이 걸리지는 않지만, 데이터가 많고, 연산이 복잡해질수록 조건에 맞는 데이터를 찾는 시간이 오래 걸린다.

(테스트 상황)

  • headline 데이터셋에서 2022년부터 데이터 중, New York Times의 데이터를 찾으려고 한다.

 

(1) Boolean Type으로 indexing

  • 가장 일반적인 방법이다. 여러 개의 칼럼들의 조건의 boolean 형태로 각각 연산하여 구할 True인 값만 가져올 수 있다.
filtered_data = data[(data["Date"]>20220000) & (data["Publication"]=='New York Times')]

→ 실행 시간 :0.14s

 

 

(2) loc를 이용한 indexing

  • Boolean Type으로 indexing과 동일하다.
filtered_data = data.loc[(data["Date"]>20220000) & (data["Publication"]=='New York Times')]

→ 실행 시간 :0.14s

 

 

 

(3) query를 사용한 조회

  • Dataframe은 query를 지원한다. (하지만, like 등의 조건은 지원하지 않는다.)
  • 참조하는 칼럼이 많고, 데이터가 클수록, query를 내부적으로 최적화하는 단계가 있어 더 좋은 성능을 보인다.
filtered_data = data.query("Date >20220000 and Publication == 'New York Times'")

→ 실행 시간 :0.07s

 

 

 

(4) isin을 사용한 indexing

  • 큰 범위애서 보면, Boolean Type으로 indexing에 속하는데, 특정 문자열과 일치하는 조건을 찾을 때는, boolean type에 isin을 넣어주면 더 빨리 찾을 수 있다.
filtered_data = data[(data["Publication"].isin(['New York Times'])) & (data["Date"] > 20220000)]

→ 실행 시간 :0.05s

 

 

(5) itertuples를 사용한 순회

  • 순회를 이용한 데이터 indexing은 별로 좋은 방법은 아니다.
  • 하지만, 연산과 조회를 같이하는 경우에는 한 번의 순회에 조회 조건을 넣어, 데이터를 찾는 것도 고려해 볼 수 있다.
find_index = []
for i, row in enumerate(data.itertuples(index=False), start=0):
    if row.Date > 20220000 and row.Publication == 'New York Times':
        find_index.append(i)

filtered_data = data.iloc[find_index]

→ 실행 시간 :2.01s

 

[실험 결과] 

  • 일반적으로 사용되는 boolean을 사용하는 것이 좋다고 알려져 있지만, 참조하는 칼럼이 많고, 데이터가 많을 경우에는 query를 사용하는 것이 효과적일 수 있다.
  • Boolean Type도 단순히 조건을 넣어서 indexing 하는 것보다, isin  같은 pandas 연산자를 함께 사용해서 데이터를 찾는 것이 효율적이다.

 

3. 문자열 포함 검색

  • SQL에서는 LIKE라는 특정 문자열을 포함했는지 여부를 찾는 방법이 있지만, Pandas에서는 LIKE를 지원하지 않는다. 
  • 생각보다, 특정 문자를 포함하는지 여부를 점검하는 경우가 많은데, 이런 경우 어느 방법이 효과적일까?

 

(테스트 상황)

  • headline 데이터셋에서 URL이 https 형식을 사용하는 row만 추출하려 한다.

(1) Pandas str contains를 사용한 검색

  • Pandas에서는 문자열 존재 여부를 체크해 주는 str.contains가 존재한다. 
filtered_data = data[data["URL"].str.contains("https://")]

→ 실행 시간 :1.18s

 

 

(2) apply를 통한, indexing (in)

  • Python에서 특정 문자가 포함되었는지 여부는 in을 통해 쉽게 파악할 수 있다.
  • pandas의 apply를 통한 indexing을 이용해 쉽게 문자열을 검색할 수 있다.
filtered_data = data[data["URL"].apply(lambda x: "https://" in x)]

→ 실행 시간 : 0.62s

 

 

(3) apply를 통한, indexing (startswith)

  • in과 마찬가지지만, in은 위치를 특정해주지는 못한다. 이럴 때는 startswith나 endwith 등을 사용하여 indexing 할 수 있다.
filtered_data = data[data["URL"].apply(lambda x: x.startswith("https://"))]

→ 실행 시간 : 0.69s

 

 

(4) apply를 통한, indexing (re)

  • in과 statrswith 등으로 문자열 검색이 가능하지만, 실제 검색하고자 하는 데이터들은 특정 형태를 가지고 있는 경우가 많다. 
  • 이런 경우, 보통 for문을 통한 순회를 가장 먼저 생각하는데, 위에서 보인대로, 단순 for문은 너무 많은 시간이 걸린다.
  • 이런 경우, apply에 re를 사용하여, 정규 표현식 형태로 문자를 검색할 수 있다.
import re
filtered_data = data[data["URL"].apply(lambda x: True if re.search("https://", x) else False)]

→ 실행 시간 : 2.38s

 

 

[실험 결과] 

  • str contains보다, pandas apply를 통해, boolean 형태의 output을 내는 함수를 정의하고, 이것을 indexing 하는 것이 더 빠르다.
  • 가장 중요한 것은 단순 반복문 형태의 순회는 최대한 지양하는 것이 성능 측면에서 도움이 된다.

 

 

+ Recent posts