반응형

최근 회사에서 plsql로 수 천 줄이 넘는 query를 python으로 바꾸었다. 최초에는 기존 병렬 처리를 그대로 바꾸기 위해, pyspark로 바꾸는 방법을 선택했지만, 생각보다 속도가 매우 느리고, Python UDF 사용 시, 데이터 정합성이 틀어지는 문제등이 발생하였다. (아마 비동기 처리 때문일 것으로 예상된다.) 결국, 여러 가지 시도 후, 이 프로젝트에서는 pandas가 더 유용할 것이라고 판단되었고, 속도를 끌어올리기 위해 여러 방법을 사용해 보다가 발견한 방법을 소개한다.


Pandas Apply의 문제

  • Pandas는 C를 이용한 연산을 하는 numpy 기반으로 되어 있기 때문에, 어느 정도의 벡터화는 가능하지만, 무거운 연산등을 apply로 수행할 때, 속도적 한계를 가진다.
  • 이는 Python의 고질적 문제인 GIL(Global Interpreter Lock)으로 인한 병렬 처리의 한계와 타입 추론에 추가적인 cost가 든다는 점등이 있다. 
  • 특히, GIL 때문에, pandas는 하나의 쓰레드 만이 python 바이트 코드를 실행하기 때문에, 많은 row에 apply를 수행할 때, 병렬 처리의 효과를 보지 못해 오랜 연산 시간이 걸리게 된다.
  • 이를 해결하기 위해, dask나 swifter 같은 다양한 multiprocessing 지원 라이브러리가 나왔지만, ⓐ경험상 특정 연산이 아니면 속도적 이점이 뚜렷하지 않고, ⓑ처리해야 할 row수가 적은 경우에는 오히려 작업 병렬화에 드는 cost가 더 큰 경우가 많다.

2023.07.21 - [Python] - Pandas 성능 향상을 위한 방법들

 

Pandas 성능 향상을 위한 방법들

Pandas 란? Pandas는 파이썬에서 데이터 처리와 분석을 위한 라이브러리로, numpy를 기반으로 개발되었다. Pandas는 DataFrame과 Series라는 데이터 구조를 사용하여, 데이터를 쉽게 처리할 수 있도록 한다.

devhwi.tistory.com

  • 보통, 이런 경우에 다음 step으로 고민하는 것이 spark인데, 경험상 처리할 데이터가 수 GB 이하인 경우에는 pandas가 유리하다. 특히, 복잡한 로직이 전처리 과정에 많이 들어갈 경우에는 직렬화 cost가 많다는 점, API가 확실히 유연하지 않다는 점(예를 들어, 여러 칼럼에 대한 동시에 unnest를 수행할 때, pandas는 explode 한 번에 가능하지만, spark는 여러 칼럼을 묶었다 푸는 과정이 추가적으로 필요하다) 때문에 pandas가 소규모 데이터 처리에 더 큰 강점을 가진다고 생각된다.

 

Python Multiprocessing

  • 이를 해결하기 위해, 기존 apply를 python "multiprocessing" 라이브러리를 사용하여 처리 방식을 바꿔서 처리할 수 있다.
  • 원리는 처리해야하는 전체 row를 사용 가능한 process 수만큼으로 나눠서 처리하고, 그 연산 결과를 concat 하는 것이다.
  • 특히, 이 방식은 apply에서 참조하는 칼럼이 하나인 경우에는 연산처리 시간이 매우 크지 않아 차이가 덜한데, 여러 칼럼을 참조하는 경우(apply의 axis가 1인 경우)에는 그 효과가 매우 크다.
  • 물론, 이 방식이 항상 장점만 있는 것은 아니다. 우선 당연히 단일 코어가 아니여야하고, 적은 데이터에서는 프로세스 간 통신 오버헤드나 cocat 과정에서 발생하는 cost 등이 존재하여, 성능 상 오히려 손해를 볼 수도 있다. 또한, 메모리 사용량이 증가할 수 있다는 것이 단점이 될 수 있다.
  • 따라서, 모든 apply를 해당 방식으로 바꾸기보다는 time debugging 이후, 연산 시간이 너무 많이 소요되는 apply를 대체하여 비교하는 방식으로 적용하는 것을 추천한다.
  • 대체 코드는 다음과 같다. parallel_apply의 인자는 처리할 pandas 데이터와 그 함수에 적용할 func를 넘겨주면 된다.(이때, func에 lambda를 사용할 수 없는데, lambda는 직렬화가 되지 않기 때문이다. 따라서 일반 함수를 정의해서 사용하면 된다.)

 

[DataFrame으로 나누기]

import pandas as pd
from multiprocessing import Pool, cpu_count

def parallel_apply(data, func, num_processes=None):
    if num_processes is None:
        num_processes = cpu_count()

    chunk_size = len(data) // num_processes
    chunks = [data.iloc[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    with Pool(num_processes) as pool:
        results = pool.map(func, chunks)

    return pd.concat(results, axis=0)

 

[Row로 나누기]

def parallel_apply(data, func, num_processes=None):
    if num_processes is None:
        num_processes = cpu_count()
    # print("core 갯수 ", num_processes)

    with Pool(num_processes) as pool:
        results = pool.map(func, [row for index, row in df.iterrows()])
    return results

 

  • 데이터를 chunk 단위로 나누는 방식은 DataFrame 단위(하나의 큰 DataFrame을 여러 DataFrame으로 나눔)나, Row 단위(각 row의 연산을 병렬 수행함) 경험적으로 연산의 CPU 사용량 클 때는 Row 단위의 나눔이 성능 상 우위가 있는 것 같고, 아니면 chunk 단위로 나누는 것이 빠른 것 같다.

 

사용 예시

  • 우선 여러 DataFrame으로 나눠서 처리하는 예시이다. 실제 사용하는 함수에서는 apply에 들어가는 함수가 매우 heavy하여 그 효과가 더 컸지만, 그만큼 heavy 한 함수가 생각나지 않아 row의 크기를 늘렸다.
import time

import pandas as pd
import numpy as np
from multiprocessing import Pool, cpu_count
import math


def custom_func(row):
    return row['data1'] ** 2 - math.sqrt(row['data2'])

def parallel_custom_func(chunk):
    result = chunk.apply(custom_func, axis=1)
    return result


def parallel_apply(data, func, num_processes=None):
    if num_processes is None:
        num_processes = cpu_count()
    print("core 갯수 ", num_processes)

    chunk_size = len(data) // num_processes
    chunks = [data.iloc[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    with Pool(num_processes) as pool:
        results = pool.map(func, chunks)
    return pd.concat(results, axis=0)


if __name__ == '__main__':
    df = pd.DataFrame({'data1': np.arange(1000000), 'data2': np.arange(1000000)})

    # pandas apply
    start_time = time.time()
    df["data"] = df.apply(lambda row: row['data1'] ** 2 - math.sqrt(row['data2']), axis=1)
    print("일반 Apply",time.time() - start_time)

    # multiprocessing apply
    start_time = time.time()
    df["parallel_data"] = parallel_apply(df, parallel_custom_func)
    print("Parallel Apply",time.time() - start_time)

 

Apply 외 다른 함수에 적용 & 주의점

  • row 단위의 연산을 하는 다른 함수에도 적용 가능하다. 예를 들어, unnest를 위한 explode 함수등에도 사용 가능하다. 다만, 연산이 매우 무겁지 않은 경우에는 속도적 차이를 느끼지 못했다.
  • 경험상, pandas의 내장 함수가 가장 빠르기 때문에 내장 함수로 처리 가능한 것은 내장 함수를 우선적으로 선택하고(C를 이용한 벡터화가 매우 잘 되어 있을 것으로 예상됨), apply 함수 중, 연산 수행 시간이 매우 오래 걸리는 부분을 선택적으로 multiprocessing을 이용한 함수로 바꾸면 효과가 큰 것 같다.
  • 연산이 가볍고, row수가 많지 않을 때는 단순 apply가 빠르다. 꼭 time debugging 후 바꿔야한다!
  • 그럼에도 불구하고, multiprocessing이 swifter 등보다 좋은 점은 연산의 병렬 처리 정도를 연산에 참여하는 process 수를 조절하면서 적절하게 조정할 수 있다는 것이다. 단순히 apply를 사용하거나 전체 process를 다 사용하는 것이 아니라, concat 과정에서의 overhead와 비교해가면서 적절한 수준의 병렬처리를 해주면서 조절할 수 있다.

+ Recent posts