반응형

Scikit-learn 이란?

  • Python 기반의 머신러닝 라이브러리로, 머신러닝 관련 다양한 알고리즘과 함수들을 포함하고 있어, 머신러닝 프로젝트에서는 필수 라이브러리이다.

설치 방법

pip install scikit-learn

 

 

Scikit-learn 주요 함수 

  • 사실 Scikit-learn은 계속 새로운 버전이 등장하는 라이브러리이기 때문에, 라이브러리 내의 모든 함수를 보기 위해서는 공식 홈페이지에 방문하는 것이 좋다.
  • 아래 정리된 내용은 지금껏 Scikit-learn을 사용해 오면서, 순전히 주관적인 기준으로 Scikit-learn의 주요 함수를 정리한 것이다.
  • Scikit-learn은 크게 분류하면, Classification, Regression , Clustering 등의 특정 알고리즘을 구현해 놓은 알고리즘 함수와 전처리, 특성추출, 평가등을 통해, 머신러닝 데이터를 쉽게 처리할 수 있게 하는 비알고리즘 함수로 나눌 수 있다. 

 

[머신러닝 데이터처리 함수]

분류 함수명 설명
preprocessing StandardScaler z-score normalization
MinMaxScaler min-max normalization
OrdinalEncoder 범주형 변수 숫자로 encoding
model_selection train_test_split trainset, testset split
KFold K-fold 교차 검증
cross_validate 교차 검증
GridSearchCV 하이퍼파라미터 튜닝(Grid Search로)
RandomizedSearchCV 하이퍼파라미터 튜닝(Random Search로)
metrics accuracy_score accuracy 계산
top_k_accuracy_score top-k accuracy 계산
auc auc 계산
roc_curve roc curve(fpr, tpr)
roc_auc_score roc curve에서 auc 계산
confusion_matrix confusion matrix
recall_score reacall 계산
classification_report 분류 모델 성능 요약 report 생성
mean_squared_error MSE 계산
r2_score 결정계수 계산

 

 

<preprocessing>

preprocessing은 숫자형 데이터를 normalize하거나, encoding 하는 데 사용된다. 

  • StandardScaler : 평균 0, 표준편차 1로 값을 normalize한다. 
  • MinMaxScaler : min-max normalization을 진행한다. (0~1사이의 범위를 가짐)

→ fit을 이용하여, scaler를 조정(최대,최소 나 평균 표준편차)하고, transform을 통해, 데이터를 변환한다. fit_transform을 사용하면, 해당 데이터셋 내에서 scale을 조정할 수 있다. 

from sklearn import preprocessing
import numpy as np

if __name__ == '__main__':
    # x = [1,2,3,...,9]
    x = np.arange(10).reshape(10,1)
    
    # z-score normalize
    std_normal = preprocessing.StandardScaler()
    normalized_x =  std_normal.fit_transform(x)
    
    
    # min-max normalize
    minmax_normal = preprocessing.MinMaxScaler()
    normalized_x = minmax_normal.fit_transform(x)
  • OrdinalEncoder : 범주형 데이터들에 숫자를 mapping 해줄 때 사용한다.

→ Scaler와 마찬가지로, fit을 이용하여, 문자를 숫자로 만드는 dictionary 구조를 만들 수 있고, transform을 이용하여 적용할 수 있다. fit_transform으로 dictionary 생성 및 변환이 가능하고, inverse_transform을 통해, 거꾸로 숫자에서 문자로 바꾸는 변환도 가능하다.

from sklearn import preprocessing
import numpy as np

if __name__ == '__main__':
    category = np.array(["사과", "딸기", "배", "두리안"])
    category = np.expand_dims(category,axis=-1)
    encoder.fit_transform(category)
    
    # mapping 정보 확인 (해당 list의 index에 번호 대응)
    print(encoder.categories_)

 

 

<model_selection>

model_selection은 데이터를 나누거나, 교차검증, 하이퍼 파라미터등을 통해, model 실험에 도움이 되는 함수들이 존재한다.

 

  • train_test_split : data를 train과 test로 분할해준다. 비율을 지정할 수 있는 등, 굉장히 유용하여, 많이 사용한다.

→ array를 넣어주고, train size나 test size 중 1개를 설정하여(비율), 정한다. random_state를 명시적으로 지정하면, 복원 가능하다. 

from sklearn import datasets
from sklearn import model_selection

if __name__ == '__main__':
	# iris data load
	iris_data = datasets.load_iris()
    
    X = iris_data["data"]
	Y = iris_data["target"]
    
    # train과 test를 8:2로 분리
    x_train, x_test, y_train, y_test = model_selection.train_test_split(X,Y, train_size = 0.8)
    
    print(x_train.size) #480
    print(x_test.size)  #120

 

  • KFold : K-fold 교차 검증을 위한, 객체 생성에 사용된다. 

→ n_split를 통해, K값을 정해주고, shuffle을 통해, 각 반복에서 데이터를 섞을지 여부를 선택가능하다. 

from sklearn.datasets import load_iris
from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import numpy as np


if __name__ == '__main__':
    iris = load_iris()
    X, Y = iris.data, iris.target

    # K-Fold 교차 검증
    kfold = KFold(n_splits=5, shuffle=True, random_state=0)

    model = LogisticRegression()

    fold_accuracies = []

    for train_index, test_index in kfold.split(X):
        x_train, x_test = X[train_index], X[test_index]
        y_train, y_test = Y[train_index], Y[test_index]

        model.fit(x_train, y_train)

        y_pred = model.predict(x_test)

        accuracy = accuracy_score(y_test, y_pred)
        fold_accuracies.append(accuracy)
        
    for i, accuracy in enumerate(fold_accuracies, 1):
        print(f"Fold {i} Accuracy: {accuracy}")

    mean_accuracy = np.mean(fold_accuracies)
    print(f"\nMean Cross-Validation Accuracy: {mean_accuracy}")
  • GridSearchCV : GridSearch로 최적의 하이퍼파라미터를 찾는다. 
  • RandomizedSearchCV : Random Search로 최적의 하이퍼파라미터를 찾는다. GridSearch보다 빠르다. 

→ 최적화하려는 모델을 인자로 넣어준다. pram_grid 인자에 탐색할 하이퍼파라미터의 리스트들을 딕셔너리 형태로 넣어준다. scoring 인자를 통해, 모델의 성능 평가를 위한 지표 설정이 가능하다. cv를 통해, Fold 수를 지정 가능하다.

from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from scipy.stats import uniform, randint

if __name__ == '__main__':
    iris = load_iris()
    X, y = iris.data, iris.target

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

    model = LogisticRegression()

    param_grid = {
        'C': [0.001, 0.01, 0.1, 1, 10, 100],
        'penalty': ['l1', 'l2']
    }

    param_dist = {
        'C': uniform(0.001, 100),
        'penalty': ['l1', 'l2']
    }

    grid_search = GridSearchCV(model, param_grid, cv=5, scoring='accuracy')
    grid_search.fit(X_train, y_train)

    print("GridSearchCV - Best Parameters:", grid_search.best_params_)
    print("GridSearchCV - Best Accuracy:", grid_search.best_score_)

    random_search = RandomizedSearchCV(model, param_dist, n_iter=10, cv=5, scoring='accuracy', random_state=0)
    random_search.fit(X_train, y_train)

    print("\nRandomizedSearchCV - Best Parameters:", random_search.best_params_)
    print("RandomizedSearchCV - Best Accuracy:", random_search.best_score_)

 

 

<metrics> 

metrics는 모델의 성능을 쉽게 측정할 수 있도록 한다. torch 등의 딥러닝에서 얻은 데이터도 numpy나 list로 변환하여, 사이킷런 의 metrics를 이용하면, 매우 쉽게 성능을 구할 수 있다.

  • accuracy_score : accuracy 값을 구할 수 있다.
  • top_k_accuracy_score : top-k accuracy(상위 k개 중 정답 존재하는지) score를 구할 수 있다.
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, top_k_accuracy_score, confusion_matrix, recall_score, r2_score, classification_report

if __name__ == '__main__':
    iris = load_iris()
    X,y = iris["data"], iris["target"]
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    model = DecisionTreeClassifier()
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy Score: {accuracy}")
    # Accuracy Score: 1.0
    
    top_k_acc = top_k_accuracy_score(y_test, model.predict_proba(X_test), k=2)
    print(f"Top-k Accuracy Score: {top_k_acc}")
    # Top-k Accuracy Score: 1.0
  • roc_curve : 분류 모델의 roc curve의 x축과 y축 (각각 fpr, tpr) 값을 구할 수 있다.
  • auc : AUC(area under the curve)를 쉽게 구할 수 있다. auc를 사용하기 전에는 roc_curve함수를 먼저 사용하여, fpr과 tpr을 구해야 한다. 
  • roc_auc_score : auc 값을 roc_curve 선행 없이 구할 수 있다.
  • confusion_matrix : confusion matrix를 쉽게 구할 수 있다.
  • recall_score : recall 값을 구할 수 있다.
  • classification_report : recall, precision , f1 score등의 결과를 쉽게 구할 수 있다. 
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import auc, roc_curve, roc_auc_score, confusion_matrix, recall_score, classification_report
import numpy as np

if __name__ == '__main__':
    iris = load_iris()
    X,y = iris["data"], iris["target"]
    
    y = np.array([1 if i==0 else 0 for i in y])

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    model = DecisionTreeClassifier()
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]

    fpr, tpr, thresholds = roc_curve(y_test, y_prob)
    area_under_curve = auc(fpr, tpr)
    print(f"AUC Score: {area_under_curve}")
    # AUC Score: 1.0

    plt.plot(fpr, tpr, label='ROC Curve')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend()
    plt.show()

    roc_auc = roc_auc_score(y_test, y_prob)
    print(f"ROC AUC Score: {roc_auc}")
    # ROC AUC Score: 1.0

    conf_matrix = confusion_matrix(y_test, y_pred)
    print(f"Confusion Matrix:\n{conf_matrix}")
    # Confusion Matrix:
    # [[20  0]
    # [ 0 10]]

    recall = recall_score(y_test, y_pred)
    print(f"Recall Score: {recall}")
    # Recall Score: 1.0

    class_report = classification_report(y_test, y_pred)
    print(f"Classification Report:\n{class_report}")
    # Classification Report:
    #               precision    recall  f1-score   support
    # 
    #            0       1.00      1.00      1.00        20
    #            1       1.00      1.00      1.00        10
    # 
    #     accuracy                           1.00        30
    #    macro avg       1.00      1.00      1.00        30
    # weighted avg       1.00      1.00      1.00        30

 

  • mean_squared_error : 평균 제곱 오차(Mean Squared Error, MSE)를 구할 수 있다.
  • r2_score : 회귀 모델의 결정 계수를 구할 수 있다.
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
import numpy as np

if __name__ == '__main__':
    iris = load_iris()
    X,y = iris["data"][:,0].reshape(-1,1), iris["data"][:,-1]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    model = LinearRegression()
    model.fit(X_train, y_train)
    
    y_pred = model.predict(X_test)

    mse = mean_squared_error(y_test, y_pred)
    print(f"Mean Sqared Error:{mse}")
    # Mean Sqared Error:0.1541458433507937
    
    r2 = r2_score(y_test, y_pred) 
    print(f"R2 Score:{r2}")
    # R2 Score:0.7575009893273535

 

반응형

최근 회사에서 plsql로 수 천 줄이 넘는 query를 python으로 바꾸었다. 최초에는 기존 병렬 처리를 그대로 바꾸기 위해, pyspark로 바꾸는 방법을 선택했지만, 생각보다 속도가 매우 느리고, Python UDF 사용 시, 데이터 정합성이 틀어지는 문제등이 발생하였다. (아마 비동기 처리 때문일 것으로 예상된다.) 결국, 여러 가지 시도 후, 이 프로젝트에서는 pandas가 더 유용할 것이라고 판단되었고, 속도를 끌어올리기 위해 여러 방법을 사용해 보다가 발견한 방법을 소개한다.


Pandas Apply의 문제

  • Pandas는 C를 이용한 연산을 하는 numpy 기반으로 되어 있기 때문에, 어느 정도의 벡터화는 가능하지만, 무거운 연산등을 apply로 수행할 때, 속도적 한계를 가진다.
  • 이는 Python의 고질적 문제인 GIL(Global Interpreter Lock)으로 인한 병렬 처리의 한계와 타입 추론에 추가적인 cost가 든다는 점등이 있다. 
  • 특히, GIL 때문에, pandas는 하나의 쓰레드 만이 python 바이트 코드를 실행하기 때문에, 많은 row에 apply를 수행할 때, 병렬 처리의 효과를 보지 못해 오랜 연산 시간이 걸리게 된다.
  • 이를 해결하기 위해, dask나 swifter 같은 다양한 multiprocessing 지원 라이브러리가 나왔지만, ⓐ경험상 특정 연산이 아니면 속도적 이점이 뚜렷하지 않고, ⓑ처리해야 할 row수가 적은 경우에는 오히려 작업 병렬화에 드는 cost가 더 큰 경우가 많다.

2023.07.21 - [Python] - Pandas 성능 향상을 위한 방법들

 

Pandas 성능 향상을 위한 방법들

Pandas 란? Pandas는 파이썬에서 데이터 처리와 분석을 위한 라이브러리로, numpy를 기반으로 개발되었다. Pandas는 DataFrame과 Series라는 데이터 구조를 사용하여, 데이터를 쉽게 처리할 수 있도록 한다.

devhwi.tistory.com

  • 보통, 이런 경우에 다음 step으로 고민하는 것이 spark인데, 경험상 처리할 데이터가 수 GB 이하인 경우에는 pandas가 유리하다. 특히, 복잡한 로직이 전처리 과정에 많이 들어갈 경우에는 직렬화 cost가 많다는 점, API가 확실히 유연하지 않다는 점(예를 들어, 여러 칼럼에 대한 동시에 unnest를 수행할 때, pandas는 explode 한 번에 가능하지만, spark는 여러 칼럼을 묶었다 푸는 과정이 추가적으로 필요하다) 때문에 pandas가 소규모 데이터 처리에 더 큰 강점을 가진다고 생각된다.

 

Python Multiprocessing

  • 이를 해결하기 위해, 기존 apply를 python "multiprocessing" 라이브러리를 사용하여 처리 방식을 바꿔서 처리할 수 있다.
  • 원리는 처리해야하는 전체 row를 사용 가능한 process 수만큼으로 나눠서 처리하고, 그 연산 결과를 concat 하는 것이다.
  • 특히, 이 방식은 apply에서 참조하는 칼럼이 하나인 경우에는 연산처리 시간이 매우 크지 않아 차이가 덜한데, 여러 칼럼을 참조하는 경우(apply의 axis가 1인 경우)에는 그 효과가 매우 크다.
  • 물론, 이 방식이 항상 장점만 있는 것은 아니다. 우선 당연히 단일 코어가 아니여야하고, 적은 데이터에서는 프로세스 간 통신 오버헤드나 cocat 과정에서 발생하는 cost 등이 존재하여, 성능 상 오히려 손해를 볼 수도 있다. 또한, 메모리 사용량이 증가할 수 있다는 것이 단점이 될 수 있다.
  • 따라서, 모든 apply를 해당 방식으로 바꾸기보다는 time debugging 이후, 연산 시간이 너무 많이 소요되는 apply를 대체하여 비교하는 방식으로 적용하는 것을 추천한다.
  • 대체 코드는 다음과 같다. parallel_apply의 인자는 처리할 pandas 데이터와 그 함수에 적용할 func를 넘겨주면 된다.(이때, func에 lambda를 사용할 수 없는데, lambda는 직렬화가 되지 않기 때문이다. 따라서 일반 함수를 정의해서 사용하면 된다.)

 

[DataFrame으로 나누기]

import pandas as pd
from multiprocessing import Pool, cpu_count

def parallel_apply(data, func, num_processes=None):
    if num_processes is None:
        num_processes = cpu_count()

    chunk_size = len(data) // num_processes
    chunks = [data.iloc[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    with Pool(num_processes) as pool:
        results = pool.map(func, chunks)

    return pd.concat(results, axis=0)

 

[Row로 나누기]

def parallel_apply(data, func, num_processes=None):
    if num_processes is None:
        num_processes = cpu_count()
    # print("core 갯수 ", num_processes)

    with Pool(num_processes) as pool:
        results = pool.map(func, [row for index, row in df.iterrows()])
    return results

 

  • 데이터를 chunk 단위로 나누는 방식은 DataFrame 단위(하나의 큰 DataFrame을 여러 DataFrame으로 나눔)나, Row 단위(각 row의 연산을 병렬 수행함) 경험적으로 연산의 CPU 사용량 클 때는 Row 단위의 나눔이 성능 상 우위가 있는 것 같고, 아니면 chunk 단위로 나누는 것이 빠른 것 같다.

 

사용 예시

  • 우선 여러 DataFrame으로 나눠서 처리하는 예시이다. 실제 사용하는 함수에서는 apply에 들어가는 함수가 매우 heavy하여 그 효과가 더 컸지만, 그만큼 heavy 한 함수가 생각나지 않아 row의 크기를 늘렸다.
import time

import pandas as pd
import numpy as np
from multiprocessing import Pool, cpu_count
import math


def custom_func(row):
    return row['data1'] ** 2 - math.sqrt(row['data2'])

def parallel_custom_func(chunk):
    result = chunk.apply(custom_func, axis=1)
    return result


def parallel_apply(data, func, num_processes=None):
    if num_processes is None:
        num_processes = cpu_count()
    print("core 갯수 ", num_processes)

    chunk_size = len(data) // num_processes
    chunks = [data.iloc[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    with Pool(num_processes) as pool:
        results = pool.map(func, chunks)
    return pd.concat(results, axis=0)


if __name__ == '__main__':
    df = pd.DataFrame({'data1': np.arange(1000000), 'data2': np.arange(1000000)})

    # pandas apply
    start_time = time.time()
    df["data"] = df.apply(lambda row: row['data1'] ** 2 - math.sqrt(row['data2']), axis=1)
    print("일반 Apply",time.time() - start_time)

    # multiprocessing apply
    start_time = time.time()
    df["parallel_data"] = parallel_apply(df, parallel_custom_func)
    print("Parallel Apply",time.time() - start_time)

 

Apply 외 다른 함수에 적용 & 주의점

  • row 단위의 연산을 하는 다른 함수에도 적용 가능하다. 예를 들어, unnest를 위한 explode 함수등에도 사용 가능하다. 다만, 연산이 매우 무겁지 않은 경우에는 속도적 차이를 느끼지 못했다.
  • 경험상, pandas의 내장 함수가 가장 빠르기 때문에 내장 함수로 처리 가능한 것은 내장 함수를 우선적으로 선택하고(C를 이용한 벡터화가 매우 잘 되어 있을 것으로 예상됨), apply 함수 중, 연산 수행 시간이 매우 오래 걸리는 부분을 선택적으로 multiprocessing을 이용한 함수로 바꾸면 효과가 큰 것 같다.
  • 연산이 가볍고, row수가 많지 않을 때는 단순 apply가 빠르다. 꼭 time debugging 후 바꿔야한다!
  • 그럼에도 불구하고, multiprocessing이 swifter 등보다 좋은 점은 연산의 병렬 처리 정도를 연산에 참여하는 process 수를 조절하면서 적절하게 조정할 수 있다는 것이다. 단순히 apply를 사용하거나 전체 process를 다 사용하는 것이 아니라, concat 과정에서의 overhead와 비교해가면서 적절한 수준의 병렬처리를 해주면서 조절할 수 있다.
반응형

Python으로 데이터를 다루는 업무에서 Pandas는 거의 필수적이다. Pandas는 쉽고, 빠르고(Python에 비해), 유용하고, 간편하기 때문이다.  또한, 테이블 형태의 구조를 가지고 있는 pandas DataFrame은 매우 친숙하고, 안정적인 느낌을 준다. 평소에는 Pandas의 사용하는 기능만 사용하고, 필요한 기능이 있으면 그때그때 알아보면서 사용하고 있는데, 이번 기회에 Pandas의 함수를 정리해보고자 한다. 

(사실 pandas의 전체 함수를 정리하는 것은 거의 불가능하다. 내 기준으로 많이 사용하고 유용할 것 같은 함수를 정리했다. 필요한 함수가 있다면 https://pandas.pydata.org/docs/reference/index.html 를 직접 참조하는 것이 좋을 것이다.)


Pandas의 데이터 구조

  • Series : Pandas의 series는 1D array와 유사한 데이터구조로, index와 value로 구성되어 있다. Series가 1D의 형태를 가지고 있다 보니, axis가 1개 존재하는데, 이 axis를 index라고 한다. value로는 ndarray나, dictionary, 스칼라 값들이 들어갈 수 있다.
  • DataFrame : DatFrame은 2D의 데이터 구조로, SQL 테이블과 비슷하게, column과 row로 구성되어있다. DataFrame의 각혈은 Series로, DataFrame은 칼럼명이 key인 Series들의 dictionary로 생각할 수 있다. DataFrame은 Series나 2D ndarray, 다른 DataFrame, 1D 데이터(ndarray, list, Series 등등..)들로 구성된 Dictionary를 통해 만들 수 있다.
  • 기본적으로 DataFrame은 Numpy를 기반으로 만들어졌다. 따라서, Series는 Numpy 1D 배열(ndarray)과 동일하다고 생각하면 된다. 때문에, numpy에 있는 거의 모든 함수들이 Pandas 데이터 구조에 호환된다.  

 

Pandas의 함수

[Indexing/Selection]

  • Indexing/Selection은 Python의 리스트처럼 매우 직관적이여서, 공식 사이트의 부분으로 설명을 대체한다.

  • query : 위에서 나온 DataFrame 기본 indexing 말고도, SQL 형식으로 DataFrame에서 조건에 맞는 row를 가지고 올 수 있다. 
import seaborn as sns

if __name__ == '__main__':
    titanic_data = sns.load_dataset("titanic")
    print("[query 출력값]")
    print(titanic_data.query('age > 30'))

 

  • between : 특정 숫자형 열의 범위 내의 데이터를 filtering하기 위해 사용할 수 있는 함수이다. 
import seaborn as sns

if __name__ == '__main__':
    titanic_data = sns.load_dataset("titanic")
    print("[between 출력값]")
    print(titanic_data[titanic_data['age'].between(30,40)])

 

[데이터 연산]

  • 사실 Series와 DataFrame이 많이 사용되는 이유이기도 한데, Series나 DataFrame 끼리 연산이 된다.
  • 단순 연산이 간단할 뿐 아니라, 간단한 연산 위주로 수행한다면, 속도가 매우 빠르다. (Pandas는 numpy로 구성되고, numpy는 C로 변환하여 연산을 하기 때문에)
import pandas as pd
import numpy as np

if __name__ == '__main__':

    df1 = pd.DataFrame(np.arange(10))
    df2 = pd.DataFrame(np.arange(10)*2)

    print(df1+df2)
    print(df1-df2)
    print(df1*df2)
    print(df1/df2)

 

[데이터 읽기&쓰기]

  • read_csv, read_excel : 각각 csv 형태, excel 형태를 쉽게 읽어서, DataFrame 형태로 반환한다.
  • to_csv, to_execel : DataFrame을 각각 csv, excel 형태로 export 한다.

 

[데이터 요약&추출]

  • head : DataFrame의 처음 N개(인자값) 행을 출력한다. (default:5) 
  • tail :  DataFrame의 마지막 N개(인자값) 행을 출력한다. (default:5) 
  • sample : DataFrame에서 무작위 행 N개를(인자값) 추출한다. (default:1), DataFrame의 행을 무작위로 섞고 싶다면, frac 인자(추출 비율)를 1로 지정하면, 전체 DataFrame이 무작위로 섞여서 나온다. 

  • describe : DataFrame의 칼럼 중, 숫자 칼럼에 대한 통계값(평균, 표준편차, 최솟값, 사분위수, 백분위수, 최댓값)을 표현해 준다.

  • value_counts : Series 객체 내의 unique value의 빈도를 반환한다. Categorical 데이터 개수 파악에 주로 사용한다.

  • dtypes : dtypes는 함수가 아닌 속성이다. 각 열의 데이터 유형을 출력한다. pandas의 DataFrame은 기본적으로 열의 데이터 유형을 추정하는 방식을 사용하기 때문에, 추정한 유형을 알기 위해 출력이 필요하다.
  • info : DataFrame의 각 칼럼 값과, Index 범위, not null 인자 개수, 데이터 타입, 메모리 정보를 출력해 준다. 

 

import seaborn as sns

if __name__ == '__name__':
    titanic_data = sns.load_dataset("titanic")

    print("[head 출력값]")
    print(titanic_data.head(5))
    print("[tail 출력값]")
    print(titanic_data.tail(5))
    print("[sample 출력값]")
    print(titanic_data.sample(5))
    print("[describe 출력값]")
    print(titanic_data.describe())
    print("[value_counts 출력값]")
    print(titanic_data.value_counts())
    print("[dtypes 출력값]")
    print(titanic_data.dtypes)
    print("[info 출력값]")
    print(titanic_data.info())

 

[데이터 결합]

  • combine_first : 두 DataFrame을 결합할 때, 첫 번째 DataFrame이 누락된 경우, 두 번째 DataFrame의 값으로 대체하는 값이다. DataFrame을 이용한 coalesce or nvl 함수라고 보면 된다. 어떠한 DataFrame에 비어있는 값을 다른 DataFrame의 값으로 대체하기 위한 함수이다. 
  • combine : 두 DataFrame을 결합할 때, 개발자가 정의한 함수에 따라서 결합하기 위한 함수이다. combine_first가 단순히 하나의 DataFrame에 없는 값을 대체해 주는 것에 비해, combine은 결합 조건 및 결합 방식을 직접 정의 가능하다.
  • concat : DataFrame을 단순히 쌓는다. axis가 0이면, row에 append 하는 구조로 DataFrame을 결합하고, axis가 1이면 칼럼이 늘어나는 방식이다. 
  • join : SQL에서의 join처럼 두 DataFrame 간의 join을 위해 사용되는 함수이다. 다만, 인덱스 기준으로 DataFrame을 결합하기 때문에, index를 가공하지 않으면 사용이 제한적이다. join 방법을 지정하는 how 인자에 left, right, inner, outer를 지정할 수 있다.(default left)
  • merge : 조금 더, 유연한 방식의 join 방법이다. join은 DataFrame 함수이고, merge는 pandas 함수이기 때문에, join은 인덱스 기준으로만 join이 가능한 반면, merge는 join key를 지정해 줄 수 있다. 인자는 다양하지만, join key를 지정하는 "on", join 방식을 결정하는 "how" 정도만 사용하면 된다. "how" 인자로는 join과 마찬가지로  left, right, inner, outer를 지정할 수 있지만, default 값이 inner join이다. 
import pandas as pd
import numpy as np

if __name__ == '__main__':
    df1 = pd.DataFrame({'data': [1, 2, np.nan, 4, 5, 6]}, index=[np.arange(6)])
    df2 = pd.DataFrame({'data': [0, 0, 5, 0, 0, 0]}, index=[np.arange(6)])

    df1.columns = ['key']
    df2.columns = ['key']

    combine_df1 = df1.combine_first(df2)
    combine_df2 = df1.combine(df2, lambda x, y: x ** 2 + y ** 2)
    concat_df1 = pd.concat([df1, df2], axis=0)
    concat_df2 = pd.concat([df1, df2], axis=1)
    join_df = df1.join(df2, how='inner',lsuffix='left', rsuffix='right')
    merge_df = pd.merge(df1, df2, on='index', how='inner')

    print("[combine_first 출력값]")
    print(combine_df1)
    print("[combine 출력값]")
    print(combine_df2)
    print("[concat 행 방향 출력값]")
    print(concat_df1)
    print("[concat 열 방향 출력값]")
    print(concat_df2)
    print("[join 출력값]")
    print(join_df)
    print("[merge 출력값]")
    print(merge_df)

 

 

[데이터 변환]

  • apply : pandas에서 가장 유용한 함수라고 생각한다. 인자로 함수를 넣어주면, DataFrame의 각 행에 함수를 적용할 수 있다. 일반적으로 간단한 연산들은 단순 반복문보다 apply로 처리하는 게 속도적으로 유리하다.
  • fillna : DataFrame이나 Series의 누락된 값을 다른 값으로 채울 수 있다. 보통 결측치를 보정하는 데 사용한다.
  • replace : DataFrame의 특정 값을 다른 값으로 변환해 준다.
  • map : dictionary 형태의 mapping을 통해, DataFrame 내의 값을 다른 값으로 mapping 해줄 수 있다. 
import seaborn as sns

if __name__ == '__main__':
    titanic_data = sns.load_dataset("titanic")

    print("[apply 출력값]")
    print(titanic_data['fare'].apply(lambda x: (x+10)*2))
    print("[fillna 출력값]")
    print(titanic_data['age'].fillna(titanic_data['age'].mean()))
    print("[replace 출력값]")
    print(titanic_data['sex'].replace('male', 1))
    print("[map 출력값]")
    print(titanic_data['sex'].map({'male':1, 'female':2}))

 

[집계 함수]

  • mean, max, min, sum, median, std, count : pandas의 DataFrame은 Table 형태의 구조를 띄고 있기 때문에, 집계가 매우 용이하다. DataFrame의 특정 칼럼을 지정하고, 해당 함수를 사용하면, 칼럼의 통계값을 쉽게 얻을 수 있다. 
  • agg : 집계를 한 번에 적용할 때, 쉽게 사용할 수 있다. 보통 groupby와 함께 자주 사용된다. axis가 0이면 칼럼별로, 1이면 행별로 함수를 적용한다.
  • cut : 연속형 데이터를 구간으로 나누는 데 사용된다. 
if __name__ == '__main__':
    titanic_data = sns.load_dataset("titanic")

    print("[max 출력값]")
    print(titanic_data['fare'].max())
    print("[agg 출력값]")
    print(titanic_data['fare'].agg(['max', 'min', 'mean'], axis=0))
    print("[cut 출력값]")
    bins = [0, 18, 35, 50, 100]
    labels = ['미성년자', '청년', '중년', '노년']
    print(pd.cut(titanic_data['age'], bins=bins, labels=labels))

 

 

[정렬 함수]

  • sort_values : SQL의 orderby 함수처럼 특정 칼럼의 값에 따라 정렬할 수 있다. 기본적으로 오름차순으로 정렬한다."by" 인자에 열을 명시해줘야 한다.
  • sort_index : Index 기반으로 DataFrame을 정렬한다. 마찬가지로 기본적으로 오름차순으로 정렬한다.
  • nsamllest, nlargest : 각각 DataFrame 내에서 가장 큰 값, 가장 작은 값 N개를 가진 행을 찾는 데 사용된다. 
if __name__ == '__main__':
    titanic_data = sns.load_dataset("titanic")

    print("[sort_values 출력값]")
    print(titanic_data.sort_values(by='age'))
    print("[sort_values 출력값]")
    print(titanic_data.sort_values(by='age'))
    print("[sort_index 출력값]")
    titanic_data = titanic_data.sample(frac=1)
    print(titanic_data.sort_index())
    print("[nlargest 출력값]")
    print(titanic_data["age"].nlargest(5))
    print("[nsamllest 출력값]")
    print(titanic_data["age"].nsmallest(5))

 

 

[그 밖 유용한 함수들]

  • groupby : SQL의 groupby처럼 DataFrame을 grouping 하여, group 별 집계, 연산에 사용하는 함수이다. groupby key를 지정해 주면, 해당 기준으로 grouping 된다. 
if __name__ == '__main__':
    titanic_data = sns.load_dataset("titanic")

    group_titanic_data = titanic_data.groupby('sex')
    print(group_titanic_data)
    print(group_titanic_data["fare"].max())
    print(group_titanic_data["fare"].mean())
    print(group_titanic_data["fare"].min())
    print(group_titanic_data["fare"].agg(['max', 'mean', 'count']))

  • explode : SQL의 Unnest와 같다. DataFrame 내에 List 또는 Series로 저장된 요소들을 분해하여 풀어내는 데 사용된다. 길이가 같은 여러 칼럼의 list를 한 번에 풀 수 있는 게 매우 유용하다. (spark에선 이 기능이 직관적이지 않다.)
if __name__ == '__main__':
    titanic_data = sns.load_dataset("titanic")

    group_titanic_data = titanic_data.groupby('sex')
    # group by를 통해 각 age 요소를 list 형태로 만듬
    group_data = group_titanic_data['age'].agg(list).reset_index()
    print(group_data)
    print(group_data.explode('age'))

 

 

반응형

Pypi란?

  • Pypi(Python Package Index)는 Python을 위한 오픈소스 패키지 저장소이다.
  • Pypi는 오픈소스 패키지를 매우 쉽게 설치할 수 있게 하여, 지금의 Python의 인기를 만든 가장 큰 요인이다. 

Pypi 명령어

  • 패키지 설치
    • 기본적으로 패키지 설치는 pip install을 통해 가능하다.
    • 특정 버전을 명시할 수 있지만, 버전을 명시하지 않으면, pip 버전 내 저장소 안에 있는 가장 최근 버전의 패키지를 설치한다.
    • --upgrade를 붙여주면, pip 버전 내 저장소 안에 있는 가장 최근 버전의 패키지를 설치해 준다. 
    • 실제 코드에선 패키지가 매우 많고, 의존성이 복잡하기 때문에 별도의 파일로 관리하는데, (보통 requirements.txt) 이때, -r 옵션을 붙여주면, 해당 txt를 읽어서, 그 안에 존재하는 패키지를 모두 설치해 준다. 
    • requiremets.txt는 위부터 아래로 순서대로 실행되기 때문에, 의존성이 있는 경우에는 순서에 유의해야한다.
    • pypi는 기본적으로 의존성이 있는 패키지들은 자동으로 설치해 주기 때문에, 의존성에 민감한 패키지들의 경우, --no-deps를 추가하여, 해당 패키지만 설치하기도 한다.
pip install {패키지명}
pip install {패키지명} == {버전}
pip install --upgrade {패키지명}
pip install -r {패키지 목록 파일}
pip install {패키지명} --no-deps

 

  • 패키지 삭제
    • 패키지를 설치하다 보면, 의존성이 꼬이는 경우가 많다. 이런 경우, pip install --upgrade를 사용하는 방법도 있지만, 아예 package를 지우고 다시 설치하는 편이 좋은 경우도 있는데, uninstall을 통해 삭제한다.
    • pip uninstall을 하면, 해당 패키지를 위해 설치되었던, 의존성 있는 라이브러리들은 잔존하게 되는데, 이것은 pip-autoremove를 통해 모두 삭제할 수 있다.
pip uninstall {패키지명}
pip install pip-autoremove #패키지 설치
pip-autoremove {패키지명}

 

  • 설치 가능한 패키지 조회
    • 기존에는 pip search를 통해, 특정 패키지의 설치 가능한 모든 버전을 확인할 수 있었다.
    • 하지만, 너무 많은 API call이 있어서, 지원을 멈췄다고 한다. 
    • 설치 가능한 패키지를 확인하는 방법은 pypi.org(https://pypi.org/)에 접속하여, 직접 검색해 보는 것 밖에 없다. (예전 버전은 pip search {패키지명} 사용하면 된다.)

 

  • 설치된 패키지 조회
    • pip list는 현재 환경 (local or 가상환경) 내에 존재하는 패키지들과 각 패키지의 버전을 나열해 준다.
    • pip freeze도 pip list처럼 설치된 패키지 목록을 출력하지만, requirements.txt를 바로 구성할 수 있는 형태로 출력된다. 
    • 따라서, pip freeze를 requirements.txt로 export 하고, 다른 환경에서 install 하면 현재 가상환경의 pypi 패키지들을 그대로 설치할 수 있다. (테스트 환경 구축 시 많이 활용한다.) 
pip list
pip freeze
pip freeze > requirements.txt

pip list
pip frreze

 

  • 의존성 확인
    • 계속 언급하듯, pypi 내에서는 의존성이 꼬이는 경우가 많다.(가상환경이 자주 쓰이는 이유이다.)
    • 물론, 설치 시에 의존성에 대한 문제를 제기하겠지만, 당장 실행되면 넘어가는 경우가 많다.
    • 이렇게 의존성이 꼬이는 경우가 많아, 의존성을 check 하기 위한 명령어가 존재하는데, 바로 pip check이다.
pip check

 

 

폐쇄망 Pypi 사용법

  • pypi는 기본적으로 원격 저장소에서 패키지를 가져오는 것이기 때문에, 인터넷 연결이 존재해야 한다. 
  • 아예 오프라인 환경이나, 방화벽 등에 의해, 원격 저장소에 접근하지 못하는 환경에서는 단순 pip install을 이용하여 설치가 불가능하다.
  • 우선, 프락시 서버가 있는 경우에는 아래와 같이 proxy 서버를 명시하여 사용할 수 있다.
pip install --proxy {proxy 서버 IP}:{proxy 서버 port} {패키지명}
  • proxy 서버도 존재하지 않는 경우에는 인터넷 망에서 패키지를 설치하고, 오프라인 환경에서 빌드하여 사용하는 방법을 사용할 수 있다.
    • 우선 인터넷망에서 pip download를 통해, 패키지를 빌드 전 파일로 내려준다.
    • 설치된 파일들을 USB 등을 통해, 폐쇄망 서버로 옮기고, 아래와 같이 --no-index(index 서버를 사용하지 않겠다는 뜻)와 --find-link 명령어를 포함한 pip install 명령어를 통해 설치해 준다.
    • download를 하나하나 하기 귀찮다면(보통은 의존성 때문에), pip freeze를 이용하여, requirement 형태로 떨군 후, pip download -r requirements.txt를 이용하여, 모든 패키지를 설치하고, 이 패키지를 폐쇄망에서 설치하는 방법도 있다.
(인터넷망) pip download {패키지명}
(폐쇄망) pip install --no-index --find-links={패키지 파일 저장 경로}

 

 

 

반응형

JAX 란?

  • JAX란, 머신러닝의 연산을 가속화하기 위해, Google에서 발표한 컴파일러를 최적화하는 기술이다.
  • JAX는 머신러닝에서 필수적인 Autograd와 XLA(integrated with Accelerated Linear Algebra, 가속 선형 대수) 컴파일러를 통해, 머신러닝의 연산을 빠르게 실행해 준다.
  • JAX는 설치가 매우 쉽고, 기존 Python에서 구현된 Numpy를 쉽게 변환할 수 있어서, 많이 활용되고 있다. 
  • 다만 JAX는 구글의 공식 제품이 아닌, 연구 프로젝트 기 때문에, 아직은 이런저런 버그가 있을 수 있다고 한다.

 

JAX 설치 방법

  • JAX는 우선 기본적으로 Linux나 Mac 운영 체제에서 동작한다.
  • Window도 동작하기는 하지만, 실험버전으로 CPU를 활용한 jax만 지원된다. (WSL을 사용하면 GPU를 사용할 수 있긴 하다.)

[CPU 설치]

pip install --upgrade "jax[cpu]"

 

[GPU & TPU 설치]

  • GPU에서도 pypi를 통해 쉽게 설치가 가능하다. 하지만, GPU는 Linux 환경에서만 설치되는 것을 명심하자. (나의 경우에는 WSL로 진행했다.)
# CUDA 12 
pip install --upgrade "jax[cuda12_pip]" -f https://storage.googleapis.com/jax-releases/jax_cuda_releases.html
# CUDA 11
pip install --upgrade "jax[cuda11_pip]" -f https://storage.googleapis.com/jax-releases/jax_cuda_releases.html

 

JAX 기본 기능

jax.numpy

  • jax는 기본적으로 jax.numpy를 통해, numpy의 API를 그대로 호환해 준다.
  • jax.numpy와 numpy는 거의 비슷하지만, 차이가 있는데,  jax는 함수형 프로그래밍으로 설계되어 있다는 점이다.
  • 즉, numpy는 배열에 직접 접근해서, 값을 바꾸는 것이 허용되지만, jax.numpy는 데이터를 직접 조작하는 것이 허용되지 않는다. → 거의 모든 Python 가속기들의 특징인 것 같다.
  • 다만, 값을 직접 바꾸는 것은 불가능하지만, 해당 요소를 반영한 새로운 배열을 생성할 수 있다.
import jax.numpy as jnp

if __name__ == '__main__':
    data = jnp.array([1,2,3,4])
    data[0] = 5
    # ERROR

    data = data.at[0].set(5)
    # data = [5,2,3,4]

 

 

[grad]

  • JAX는 native Python 및 numpy 코드를 자동으로 미분할 수 있는 기능을 제공한다.
  • JAX의 grad 함수는 함수의 입력에 대한 gradient를 자동으로 계산해 주는 함수이다.
  • JAX의 grad 함수는 loss의 기울기를 구할 때, 매우 빠르고 쉽게 활용될 수 있다.
  • JAX의 grad는 N차 미분값까지 쉽게 구할 수 있다.
import jax
import jax.numpy as jnp

def square(x):
    return x ** 2

if __name__ == '__main__':
    grad_square = jax.grad(square)

    # Calculate Gradient
    x = jnp.array(2.0)
    grad_value = grad_square(x)

    print("Input:", x)
    print("Gradient:", grad_value)

 

 

[jit]

  • jax.jit 함수는 JAX에서 제공하는 함수를 최적화해 주는 메커니즘이다. 
  • jit 함수를 통해, 정의한 함수를 컴파일하여, 최적화된 코드로 변환하고, 이를 Cache에 저장해 둔 뒤, 호출 시, 최적화된 코드를 통해 빠르게 실행된다.
  • 최적화된 Code를 Cache에 저장해 두기 때문에, 반복 변환이나, 불필요한 변환은 피하는 것이 좋다.
  • 다만, jit은 아래와 같은 경우에는 속도 향상이 없거나, 오히려 늦어질 수 있다.
    • 변환하려는 함수 내에 제어문이 포함된 경우
    • 재귀함수 
    • 데이터를 조작하는 함수
    • 크고 복잡한 함수 → 변환을 위한 cost가 더 많이 들 수 있음
  • 다른 모듈처럼, jit 사용을 위해, 단순 decorator만 사용해 주면 된다. 하지만, 변환을 위한 cost가 더 많이 들 수 있기 때문에, 꼭 비교해 보고 사용하는 것이 좋다.
import jax
import jax.numpy as jnp

@jax.jit
def square(x):
    return x ** 2

if __name__ == '__main__':
    grad_square = jax.grad(square)

    # Calculate Gradient
    x = jnp.array(2.0)
    grad_value = grad_square(x)

    print("Input:", x)
    print("Gradient:", grad_value)

 

 

[vmap]

  • jax.vmap 함수는 함수를 Vector 화하여 mapping 하는 함수이다. 
  • vmap 함수를 통해, 배열의 각 요소에 함수를 병렬로 실행할 수 있다. (pandas의 apply와 비슷한 개념이다.)
  • jit과 vmap은 같이 사용될 수 있다. (jit을 먼저 래핑 한 후, vmap을 하거나, vmap을 래핑한 후, jit을 하거나 둘 다 가능하다.)
import jax
import jax.numpy as jnp

def dot_product(x, y):
    return jnp.dot(x, y)

if __name__ == '__main__':
    grad_square = jax.grad(dot_product)

    vectorized_dot_product = jax.vmap(dot_product)

    x = jnp.array([i for i in range(10000)])
    y = jnp.array([i for i in range(10000)])
    grad_value = dot_product(x, y)

 

 

JAX 사용후기

  • JAX는 기본적으로 multi GPU 환경이나, TPU 환경에서 유리하다. 나의 경우에는 single GPU 환경이기 때문에, JAX를 쓰면 오히려 변환에 더 오랜 시간이 걸렸다. (JAX가 분산에 최적화되었기 때문이다.)
  • JAX가 numpy를 호환한다고 하지만,  아직 torch 등의 딥러닝 프레임워크와 호환이 부족하다. 따라서, 단순 기존 코드의 최적화가 아닌, 분산 환경에서 속도를 향상시키기 위한 대대적 Refactoring이나 개발에 사용하는 것이 좋을 것 같다.
  • JAX는 현재 기준(2023.08.07)으로 CUDA 11버전까지만 지원한다. 이것도 환경을 제한하는 요소인 것 같다.
  • 그럼에도 불구하고, JAX는 딥러닝 코드를 Python 언어 내에서 최적화할 수 있는 선택지를 제공한다는 점에서 매우 유용한 것 같다.

 

 

반응형

Pandas 란?

  • Pandas는 파이썬에서 데이터 처리와 분석을 위한 라이브러리로, numpy를 기반으로 개발되었다.
  • Pandas는 DataFrame과 Series라는 데이터 구조를 사용하여, 데이터를 쉽게 처리할 수 있도록 한다.
  • Pandas는 C 언어 등, low level의 언어로 개발된 numpy를 기반으로 하고, 수년간의 버전 업그레이드로 그 자체에 최적화를 해놓았기 때문에, 일반적으로 Pandas에서 제공하는 함수들을 사용하는 것이 성능 면에서 가장 좋다.
  • 하지만, 데이터 크기와 연산의 복잡성에 따라, 특정한 상황에서는 Pandas의 성능을 최적화하기 위한 방법이 필요하다.

 

Pandas의 데이터 처리 최적화 원리

  • Pandas는 기본적으로 low level 언어로 최적화가 많이 되었기 때문에, Pandas 데이터 처리를 위한 연산 과정에 Python 언어로 처리하는 과정이 생략되는 것이 좋다. 
  • Pandas는 메모리 위에서 동작한다. 이에 따라, 메모리의 가용 용량을 벗어나는 데이터 처리 및 연산은 한 번에 처리할 수 없다. 따라서, 메모리를 효율적으로 사용할 수 있도록 변경하는 것이 좋다.

 

Pandas 데이터 로드

  • 사실, Pandas를 많이 활용하는 이유 중 하나는 Pandas의 Dataframe이  SQL 테이블 형태와 거의 유사하기 때문이다.
  • Pandas에 들어갈 데이터를 Code 내부에서 주입하는 경우도 있지만, 대부분의 경우, Database나, CSV File 등에서 Import 해오는 경우가 많다.
  •  Pandas는 앞서 말한대로, 메모리에 DataFrame을 올려놓고, 연산하는 형태이기 때문에, 메모리가 연산을 효율적으로 처리할 수 있도록, 작은 단위의 필요한 데이터만 사용하는 것이 연산 측면에서 유리하다.

 

1. Query 및 파일 최적화

  • Pandas에서 SQL이나 CSV 등의 Raw 형태의 데이터를 읽고, 이를 Filtering하여Filtering 하여 사용하는 경우가 많은데, 이는 Raw 데이터 전체를 메모리 올려, 메모리 & I/O 부담을 증가시킨다. 따라서, 필요한 데이터만 미리 Filtering 하여 가져오는 것이 좋다.
  • 이렇게 Pandas에서 필요한 데이터만 가져오면, 전체 Series의 갯수(Row 수)가 줄기 때문에, Index 활용 측면에서도 유리하다.
  • 예시로, 한국어 형태소 분석을 위해 SQL 테이블에서 1주일치 데이터를 읽어서, 처리하였는데, 하루씩 읽어서 7번 처리하는 게 속도 면에서 더 효율적이었다.  
  • 마찬가지로, 필요한 칼럼만 가져오는 것이 성능 면에서 유리하다.
df = pd.read_csv('raw_data.csv', usecols=['col1', 'col2', ...])

 

2. Data Type 미리 지정

  • Pandas는 자료 구조형 선언의 제약을 받지 않는 파이썬 위에서 돌아가기 때문에, 읽어온 데이터를 통해 Data Type을 추론하는 과정이 포함된다. 
  • Data Type을 사용자가 미리 지정하면, 1) Data Type 추론 과정을 줄일 수 있고, 2) 실제 필요한 데이터 크기에 맞는 정도만 메모리를 할당하기 때문에, 성능 면에서 유리하다.
  • 다만, Data Type을 미리 지정하는 것은, 추후 연산 시에 메모리를 효율적으로 사용할 수 있다는 장점이 있지만, 읽는 속도 자체에는 큰 영향을 미치지 않는다.
dtypes = {'col1': 'int', 'col2': 'float', ...}
df = pd.read_csv('raw_data.csv', dtype=dtypes)

 

3. chunk 옵션 사용

  • 위의 방법을 사용해도, 어쩔수 없이 대용량 데이터를 모두 사용해야 하는 경우가 많다.
  • 이와 같은 경우에는 DataFrame을 chunk 단위로 처리하는 것이 효율적이다.
  • chuncksize를 지정해줘서, 한 번에 읽을(메모리에 올릴) Series(row) 수를 지정할 수 있다.
  • 하지만, 전체 데이터가 같이 필요한 것들(특정 칼럼 sort 등)은 처리가 까다롭기 때문에, 다른 행들 간의 연산이 비교적 적은 경우에 활용하는 것이 좋다.
for chunk in pd.read_csv('raw_data.csv', chunksize=10000):
    processing(chunk)

 

4. Dask 사용

  • 만약, 메모리가 감당하기 어려운 정도의 어려운 정도의 데이터 양과 연산이 포함된다면, 대용량 데이터를 분산 처리하기 위해 개발된, Dask를 사용할 수 있다.
  • Dask는 Pandas와 달리, Disk에 저장된 데이터를 처리하기 때문에, 여러 머신에서 분산처리가 가능하고, 지연 연산을 사용하기 때문에, 실제 연산을 최적화하는 과정이 포함된다.(SQL의 실행 Plan을 생각하면 된다.) 따라서, 초 대용량 데이터 처리에는 Dask의 강점이 있다.
  • 하지만, 메모리가 감당 가능한 수준의 연산에서는 메모리와 디스크의 속도 차이 등 때문에, Pandas가 유리하다.
import dask.dataframe as dd
if __name__ == "__main__":
	df = dd.read_csv('raw_data.csv')

 

 

Pandas 연산 & 조회

[실험 데이터셋]

  • Pandas 연산 테스트를 위해, 예시 데이터로 Kaggle 데이터셋을 사용했다. (https://www.kaggle.com/datasets/jordankrishnayah/45m-headlines-from-2007-2022-10-largest-sites?resource=download)
  • 사용할 데이터는, 4.5M 분량의 2007년부터 2022년 주요 언론사의 기사 제목 headline 데이터이다. 데이터는 총, 4405392개 row로 구성되어 있고, [Date, Publication, Headline, URL]의 4개의 칼럼으로 구성되어 있다.

 

1. 반복문 최적화

  • Pandas 연산에서 가장 큰 성능 개선 포인트는 반복문 연산이다.
  • Pandas가 Python 언어로 동작하기 때문에, Python의 list의 개념으로 Dataframe을 다루기 때문에, 이런 문제가 많이 발생한다.
  • Pandas를 For문을 통해, 각 row에 접근하는 경우에는, 각 row마다 연산을 각각 실행한다. 이에 따라, 데이터 크기가 커질수록 연산의 Overhead는 가중화된다.
  • 가장 쉬운 방법은 Pandas의 apply를 사용하는 것이다. 
  • 또한, Numpy Vectorize를 사용하는 방법, iterrows, itertuples를 사용하는 방법들이 있는데, 일반적으로 itertuples와 numpy vectorize는 Pandas Apply보다 좋은 성능을 보인다고 알려져 있다.
  • 추가적으로 멀티스레드를 이용하여, Pandas 연산을 병렬화 하고, 효율적으로 처리하는 swifter가 있다.

 

(테스트 상황)

  • headline 데이터셋에서 URL의 "http://" or "https://" 부분을 제거하는 작업을 테스트
  • headline 데이터셋에서 Date의 연도를 제거하는 작업을 테스트

 

(1) 단순 For문

for i in range(data.shape[0]):
    data["URL"][i] = data["URL"][i].replace('http://', '').replace('https://', '')

→ 실행 시간 : 94848.09 s(1000 row 실행시간: 21.53으로 추정)

 

for i in range(data.shape[0]):
    data["Date"][i] = data["Date"][i]%10000

→ 실행 시간 : 572.70s(1000 row 실행시간: 0.13으로 추정)

 

 

(2) Pandas Apply

data["URL"] = data["URL"].apply(lambda x: x.replace('http://', '').replace('https://', ''))

→ 실행 시간 : 1.47s

data["Date"] = data["Date"].apply(lambda x: x % 10000)

→ 실행 시간 : 0.88s

 

 

(3) Numpy Vectorize

  • numpy에서 제공하는 vectorize를 통해, 연산하고자 하는 함수를 vectorize화 할 수 있다.
  • numpy를 import 하여 쉽게 사용할 수 있다.
import numpy as np
vectorize_function = np.vectorize(lambda x: x.replace('http://', '').replace('https://', ''))
data["URL"] = vectorize_function(data["URL"])

→ 실행 시간 : 59.09s

import numpy as np
vectorize_function = np.vectorize(lambda x: x % 10000)
data["Date"] = vectorize_function(data["Date"])

→ 실행 시간 : 0.46s

 

(4) Vector화 된, For문 사용

  • Pandas는 numpy로 만들어졌기 때문에, numpy의 데이터를 조회하기 위한 iterator의 순회문을 사용하면, 빠르게 데이터를 순회할 수 있다.

[iterrows]

temp_date = []
for i, row in data.iterrows():
	temp_url.append(row["URL"].replace('http://', '').replace('https://', ''))
data["URL"] = temp_url

→ 실행 시간 : 88.10s

temp_date = []
for i, row in data.iterrows():
	temp_date.append(row["Date"]%10000)
data["Date"] = temp_date

→ 실행 시간 : 86.14s

 

 

[itertuples]

temp_url = []
for row in data.itertuples(index=False):
    temp_url.append(row.URL.replace('http://', '').replace('https://', ''))
data["URL"] = temp_url

→ 실행 시간 : 3.37s

temp_date = []
for row in data.itertuples(index=False):
	temp_date.append(row.Date%10000)
data["Date"] = temp_date

→ 실행 시간 : 2.69s

 

 

(5) Swifter

  • Swifter는 pandas의 apply를 멀티스레드를 통해, 병렬화하여 빠르게 처리하도록 하는 파이썬 패키지이다.
  • pip install swifter를 통해, 설치 가능하다.
data["URL"] = data["URL"].swifter.apply(lambda x: x.replace('http://', '').replace('https://', ''))

→ 실행 시간 : 3.98s

data["Date"] = data["Date"].swifter.apply(lambda x: x % 10000)

→ 실행 시간 : 0.37s

 

[실험 결과] 

  • Python 영역의 연산을 활용하는 경우(Python 라이브러리 or String 사용 등)에는 apply나 itertuples를 사용한 순회가 가장 좋은 성능을 보인다. → 첫 번째 실험
  • Python 영역의 연산을 활용하지 않는 경우, 즉, Cython으로 변환이 가능한 연산등은 np.vectorize가 가장 좋은 성능을 보인다.
  • 데이터의 타입, 크기, 연산에 따라, 가장 적합한 연산은 다르겠지만,
    • 일반적으로 apply나 itertuples를 사용한 순회가 가장 좋다.(일반적으로 대용량에선 itertuples가 apply보다 낫다고 함.)
    • 연산이 간단한 경우(Cython으로 변환이 될만한 간단한 연산)에는 np.vectorize를 통해 최적화가 가능하다.
    • 단순 for문은 사용하지 않는 것이 좋다. 
    • Swifter는 데이터의 크기가 매우 크고, 연산이 복잡하지 않은 연산에서 효과적이다.

 

2. 특정 조건 데이터 조회

  • 특정 조건 데이터 조회는 Pandas에서 자주 사용된다. 
  • 연산에 비해, 긴 시간이 걸리지는 않지만, 데이터가 많고, 연산이 복잡해질수록 조건에 맞는 데이터를 찾는 시간이 오래 걸린다.

(테스트 상황)

  • headline 데이터셋에서 2022년부터 데이터 중, New York Times의 데이터를 찾으려고 한다.

 

(1) Boolean Type으로 indexing

  • 가장 일반적인 방법이다. 여러 개의 칼럼들의 조건의 boolean 형태로 각각 연산하여 구할 True인 값만 가져올 수 있다.
filtered_data = data[(data["Date"]>20220000) & (data["Publication"]=='New York Times')]

→ 실행 시간 :0.14s

 

 

(2) loc를 이용한 indexing

  • Boolean Type으로 indexing과 동일하다.
filtered_data = data.loc[(data["Date"]>20220000) & (data["Publication"]=='New York Times')]

→ 실행 시간 :0.14s

 

 

 

(3) query를 사용한 조회

  • Dataframe은 query를 지원한다. (하지만, like 등의 조건은 지원하지 않는다.)
  • 참조하는 칼럼이 많고, 데이터가 클수록, query를 내부적으로 최적화하는 단계가 있어 더 좋은 성능을 보인다.
filtered_data = data.query("Date >20220000 and Publication == 'New York Times'")

→ 실행 시간 :0.07s

 

 

 

(4) isin을 사용한 indexing

  • 큰 범위애서 보면, Boolean Type으로 indexing에 속하는데, 특정 문자열과 일치하는 조건을 찾을 때는, boolean type에 isin을 넣어주면 더 빨리 찾을 수 있다.
filtered_data = data[(data["Publication"].isin(['New York Times'])) & (data["Date"] > 20220000)]

→ 실행 시간 :0.05s

 

 

(5) itertuples를 사용한 순회

  • 순회를 이용한 데이터 indexing은 별로 좋은 방법은 아니다.
  • 하지만, 연산과 조회를 같이하는 경우에는 한 번의 순회에 조회 조건을 넣어, 데이터를 찾는 것도 고려해 볼 수 있다.
find_index = []
for i, row in enumerate(data.itertuples(index=False), start=0):
    if row.Date > 20220000 and row.Publication == 'New York Times':
        find_index.append(i)

filtered_data = data.iloc[find_index]

→ 실행 시간 :2.01s

 

[실험 결과] 

  • 일반적으로 사용되는 boolean을 사용하는 것이 좋다고 알려져 있지만, 참조하는 칼럼이 많고, 데이터가 많을 경우에는 query를 사용하는 것이 효과적일 수 있다.
  • Boolean Type도 단순히 조건을 넣어서 indexing 하는 것보다, isin  같은 pandas 연산자를 함께 사용해서 데이터를 찾는 것이 효율적이다.

 

3. 문자열 포함 검색

  • SQL에서는 LIKE라는 특정 문자열을 포함했는지 여부를 찾는 방법이 있지만, Pandas에서는 LIKE를 지원하지 않는다. 
  • 생각보다, 특정 문자를 포함하는지 여부를 점검하는 경우가 많은데, 이런 경우 어느 방법이 효과적일까?

 

(테스트 상황)

  • headline 데이터셋에서 URL이 https 형식을 사용하는 row만 추출하려 한다.

(1) Pandas str contains를 사용한 검색

  • Pandas에서는 문자열 존재 여부를 체크해 주는 str.contains가 존재한다. 
filtered_data = data[data["URL"].str.contains("https://")]

→ 실행 시간 :1.18s

 

 

(2) apply를 통한, indexing (in)

  • Python에서 특정 문자가 포함되었는지 여부는 in을 통해 쉽게 파악할 수 있다.
  • pandas의 apply를 통한 indexing을 이용해 쉽게 문자열을 검색할 수 있다.
filtered_data = data[data["URL"].apply(lambda x: "https://" in x)]

→ 실행 시간 : 0.62s

 

 

(3) apply를 통한, indexing (startswith)

  • in과 마찬가지지만, in은 위치를 특정해주지는 못한다. 이럴 때는 startswith나 endwith 등을 사용하여 indexing 할 수 있다.
filtered_data = data[data["URL"].apply(lambda x: x.startswith("https://"))]

→ 실행 시간 : 0.69s

 

 

(4) apply를 통한, indexing (re)

  • in과 statrswith 등으로 문자열 검색이 가능하지만, 실제 검색하고자 하는 데이터들은 특정 형태를 가지고 있는 경우가 많다. 
  • 이런 경우, 보통 for문을 통한 순회를 가장 먼저 생각하는데, 위에서 보인대로, 단순 for문은 너무 많은 시간이 걸린다.
  • 이런 경우, apply에 re를 사용하여, 정규 표현식 형태로 문자를 검색할 수 있다.
import re
filtered_data = data[data["URL"].apply(lambda x: True if re.search("https://", x) else False)]

→ 실행 시간 : 2.38s

 

 

[실험 결과] 

  • str contains보다, pandas apply를 통해, boolean 형태의 output을 내는 함수를 정의하고, 이것을 indexing 하는 것이 더 빠르다.
  • 가장 중요한 것은 단순 반복문 형태의 순회는 최대한 지양하는 것이 성능 측면에서 도움이 된다.

 

 

반응형

Transformer는 사실, NLP 분야뿐만 아니라, 다양한 분야에서 많이 사용되기 때문에, 그만큼 구현 소스를 쉽게 찾을 수 있다. 나도, Transformer를 자주 사용하지만, 라이브러리에서 읽어오는 형태로 사용하기 때문에, 그 상세 구조에 대해서는 대략적으로만 알고 있다. 이번 기회에 Transformer를 pytorch로 직접 짜보면서 그 구조를 정확히 이해하고자 한다.

 

Full source : https://github.com/daehwichoi/transformer-pytorch/blob/main/model/transformer.py

 

구현 방향

  • 사실, pytorch로 Transformer를 구현한 사례는 google 검색만 해도 굉장히 많이 나온다. 하지만, original transformer를 직접 구현해보고 싶어서, 논문을 그대로 구현하는데 초점을 맞췄다. 
  • 모델 학습을 위한 layer(Dropout 등)나, dataloader는 task마다 다르고, 구현 목적이 Transformer 모델을 구현하는 것이기 때문에, 모델 구현만 진행했다.

 

참고 자료

  • Transformer 논문 내, 구조 설명 부분

2023.05.08 - [NLP 논문] - Transformer (Attention Is All You Need) - (1) 리뷰

 

Transformer (Attention Is All You Need) - (1) 리뷰

Transformer 배경 설명 Transformer는 Google Brain이 2017년 "Attention is All You Need"라는 논문에서 제안된 딥러닝 모델이다. Transformer는 기존 자연어 처리 분야에서 주로 사용되던 RNN, LSTM 같은 순환 신경망 모

devhwi.tistory.com

 

 

구조 설명

  • Transformer는 크게 Encoder 부분과 Decoder 부분, input&output embedding, postional encoding으로 나뉜다.
  • Encoder 부분은 N개의 Encoder가 연결된 구조로 구성되어 있고, Decoder도 N개의 Decoder가 연결된 구조로 구성되어 있다.
  • Ecoder는 크게, Multi-Head Attention(self-attention)과 redidual 부분(residual add  & layer norm), Feed Forward로 구성되어 있다. 
  • Decoder는 크게, Masked-Multi-Head Attention(self-attention)과 resiedidual 부분(residual add  & layer norm), Multi-Head Attention(encoder-decoder attention), Feed Forward로 구성되어 있다.

 

구현 내용 설명

(순서는 내가 구현한 순서이다.)

1. Multi-Head Attention

[Sacled Dot-Product Attention]

  • Multi-Head Attention의 핵심은 scaled_dot_product_attention이다.
  • scaled_dot_product는 Query, Key, Value가 있을 때, Query와 Key의 Transpose의 Matmul(Dot Product)를 통해, similarity를 계산하고, similarity 기반으로 Value 값을 참조한다.
  • Scaled Dot-product는 Network 여러 부분에서 사용되지만, Decoder 부분에서는 masking 처리를 해야 하는 부분이 있기 때문에, mask부분을 포함해서 함께 구현했다.

    def scaled_dot_product_attention(self, q, k, v, mask=None):
        d_k = k.size()[-1]
        k_transpose = torch.transpose(k, 3, 2)

        output = torch.matmul(q, k_transpose)
        output = output / math.sqrt(d_k)
        if mask is not None:
            output = output.masked_fill(mask.unsqueeze(1).unsqueeze(-1), 0)

        output = F.softmax(output, -1)
        output = torch.matmul(output, v)

        return output

[Multi-Head Attention]

  • Multi-Head Attention은 scaled Dot-Product Attention을 query에 해당하는 value 값들을 참조하기 위해 사용하는데, query, key, value를 그대로 사용하는 것이 아니라, 여러 개의 head로 나누고, query, key, value를 linear projection 한 후, 사용한다. 
  • Scaled Dot-Product Attention 이후, 각 head의 value 값을 concat하고, linear layer을 거쳐 output을 낸다.
  • 주의할 점은, sequence의 순서가 중요하기 때문에, contiguous를 사용해서, 순서를 유지한다는 점이다.

class MultiHeadAttention(nn.Module):
    def __init__(self, dim_num=512, head_num=8):
        super().__init__()
        self.head_num = head_num
        self.dim_num = dim_num

        self.query_embed = nn.Linear(dim_num, dim_num)
        self.key_embed = nn.Linear(dim_num, dim_num)
        self.value_embed = nn.Linear(dim_num, dim_num)
        self.output_embed = nn.Linear(dim_num, dim_num)

    def scaled_dot_product_attention(self, q, k, v, mask=None):
    ...
    
    def forward(self, q, k, v, mask=None):
        batch_size = q.size()[0]

        # 순서 유지 때문에 view 후 transpose 사용
        q = self.query_embed(q).view(batch_size, -1, self.head_num, self.dim_num // self.head_num).transpose(1, 2)
        k = self.key_embed(k).view(batch_size, -1, self.head_num, self.dim_num // self.head_num).transpose(1, 2)
        v = self.value_embed(v).view(batch_size, -1, self.head_num, self.dim_num // self.head_num).transpose(1, 2)

        output = self.scaled_dot_product_attention(q, k, v, mask)
        batch_num, head_num, seq_num, hidden_num = output.size()
        output = torch.transpose(output, 1, 2).contiguous().view((batch_size, -1, hidden_num * self.head_num))

        return output

 

2. Residual Add & Layer Norm

[Layer Norm]

  • Layer Norm은 dimension layer 방향으로 평균을 빼고, 표준 편차로 나누는 Normalization 기법이다.
  • 이 부분은 nn.LayerNorm을 통해, 구현할 수 있다. 
   def layer_norm(self, input):
        mean = torch.mean(input, dim=-1, keepdim=True)
        std = torch.std(input, dim=-1, keepdim=True)
        output = (input - mean) / std
        return output

[Add & Layer Norm]

  • 이전 층의 output을 layer norm을 통해, normalization 한 후, residual 값을 더해준다.
class AddLayerNorm(nn.Module):
    def __init__(self):
        super().__init__()

    def layer_norm(self, input):
    ...

    def forward(self, input, residual):
        return residual + self.layer_norm(input)

 

3. Feed Forward

  • Feed Forward는 Fully Connected Layer → Relu →  Fully Connected Layer로 구성되어 있다.

class FeedForward(nn.Module):
    def __init__(self, dim_num=512):
        super().__init__()
        self.layer1 = nn.Linear(dim_num, dim_num * 4)
        self.layer2 = nn.Linear(dim_num * 4, dim_num)

    def forward(self, input):
        output = self.layer1(input)
        output = self.layer2(F.relu(output))

        return output

 

4. Encoder

  • Encoder는 Multi-Head Attention → Residual Add & Layer Norm → Feed Forward → Residual Add & Layer Norm 순으로 구성되어 있다. 
  • Encoder는 단순히, 앞서 선언했던, sub layer들을 연결하는 방식으로 구현했다.

class Encoder(nn.Module):
    def __init__(self, dim_num=512):
        super().__init__()
        self.multihead = MultiHeadAttention(dim_num=dim_num)
        self.residual_layer1 = AddLayerNorm()
        self.feed_forward = FeedForward(dim_num=dim_num)
        self.residual_layer2 = AddLayerNorm()

    def forward(self, q, k, v):
        multihead_output = self.multihead(q, k, v)
        residual1_output = self.residual_layer1(multihead_output, q)
        feedforward_output = self.feed_forward(residual1_output)
        output = self.residual_layer2(feedforward_output, residual1_output)

        return output

 

5. Decoder

  • Decoder는 Masked Multi-Head Attention → Residual Add & Layer Norm → Multi-Head Attention → Residual Add & Layer Norm Feed Forward → Residual Add & Layer Norm 순으로 구성되어 있다.
  • Encoder와 마찬가지로, 앞서 구현해놓은 sub-layer를 연결하면 되지만, 중간 Multi-Head Attention은 Query와 Key를 Encoder의 Output을 사용하기 때문에, 이 점을 명시해야 한다.
  • Decoder는 Ecoder와 다르게, masking을 이용하여, mask를 인자로 받는 것도 주의해야 한다.

class Decoder(nn.Module):
    def __init__(self, dim_num=512):
        super().__init__()

        self.masked_multihead = MultiHeadAttention(dim_num=dim_num)
        self.residual_layer1 = AddLayerNorm()
        self.multihead = MultiHeadAttention(dim_num=dim_num)
        self.residual_layer2 = AddLayerNorm()
        self.feed_forward = FeedForward(dim_num=dim_num)
        self.residual_layer3 = AddLayerNorm()

    def forward(self, o_q, o_k, o_v, encoder_output, mask):
        masked_multihead_output = self.masked_multihead(o_q, o_k, o_v, mask)
        residual1_output = self.residual_layer1(masked_multihead_output, o_q)
        multihead_output = self.multihead(encoder_output, encoder_output, residual1_output, mask)
        residual2_output = self.residual_layer2(multihead_output, residual1_output)
        feedforward_output = self.feed_forward(residual2_output)
        output = self.residual_layer3(feedforward_output, residual2_output)

        return output

 

6. Transformer

  • 전체 Transformer는 Input Embedding, Positional Encoding, Output Embedding, N개의 encoder와 N개의 decoder로 구성되어 있다. 

[positional_encoding]

  • positinal encoding은 짝수번째 token과 홀수번째 token이 각기 다른 식을 따른다. 아래 식에서 i는 hidden dimension 방향의 index이고, pos는 positional 방향(몇 번째 seq인지)을 의미한다.
  • positional encoding은 크게, 두 부분에서 사용되는데, Input과 Output의 sequence length 길이가 다를 수 있기 때문에, 이것을 인자로 받는 형태로 구현했다.
  • 마지막에 self.register_buffer는 추후, model parameter 학습 시, psotional encoding이 학습되지 않도록 막아주기 위한 용도이다. 

    def position_encoding(self, position_max_length=100):
        position = torch.arange(0, position_max_length, dtype=torch.float).unsqueeze(1)
        pe = torch.zeros(position_max_length, self.hidden_dim)
        div_term = torch.pow(torch.ones(self.hidden_dim // 2).fill_(10000),
                             torch.arange(0, self.hidden_dim, 2) / torch.tensor(self.hidden_dim, dtype=torch.float32))
        pe[:, 0::2] = torch.sin(position / div_term)
        pe[:, 1::2] = torch.cos(position / div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

        return pe

 

[input & output Embedding]

  • Embedding은 nn.Embedding을 통해, 쉽게 구현할 수 있다.
  • Embedding의 첫 번째 인자는 input 데이터의 total word 개수, 두 번째 인자는 hidden dimension의 수이다.
  • total_word_num은 sequence dictionary에 존재하는 unique value의 개수를 의미한다. (전체 단어가 아님을 주의)
  • 사실 편의를 위해, 공통 total_word_num을 사용했는데, 번역과 같은 경우, input의 단어 개수와 output의 단어 개수가 다를 수 있어, task에 따라서는 다른 인자를 받는 게 맞다.
 self.input_data_embed = nn.Embedding(total_word_num, self.hidden_dim)
 self.output_data_embed = nn.Embedding(total_word_num, self.hidden_dim)

 

[Transformer]

  • Transformer의 Encoder 부분은 앞서 구현했던, Encoder를 N번 반복하는 구조로 구현되어 있다. 
  • Encoder 부분에 들어가는 query, key, value는 문장의 embedding 한 값으로 모두 같고, (참조를 위한 query와 key가 비효율적이다.) 전번째 encoder의 결과가 다음 encoder의 query, key, value가 된다.
  • Decoder 부분도 비슷하지만, Encoder의 output이 사용된다는 점, Decoder 단에서는 다음 sequence를 볼 수 없기 때문에, 그 부분을 처리하기 위한 mask가 존재한다는 점이 다르다. 
  • Decoder에 masking으로 0 값을 넣어주었지만, 실제 학습해서는 매우 작은 값을 넣어주는 것이 학습 측면에서 유리하다고 한다.
  • Encoder 부분과 Decoder 부분을 모두 거치면, 목적에 맞는 fully connected layer를 연결하여, output을 낸다. 
class Transformer(nn.Module):
    def __init__(self, encoder_num=6, decoder_num=6, hidden_dim=512, max_encoder_seq_length=100,
                 max_decoder_seq_length=100):
        super().__init__()

        self.encoder_num = encoder_num
        self.hidden_dim = hidden_dim
        self.max_encoder_seq_length = max_encoder_seq_length
        self.max_decoder_seq_length = max_decoder_seq_length

        self.input_data_embed = nn.Embedding(max_seq_length, self.hidden_dim)
        self.Encoders = [Encoder(dim_num=hidden_dim) for _ in range(encoder_num)]

        self.output_data_embed = nn.Embedding(max_seq_length, self.hidden_dim)
        self.Decoders = [Decoder(dim_num=hidden_dim) for _ in range(decoder_num)]

        self.last_linear_layer = nn.Linear(self.hidden_dim, max_seq_length)

    def position_encoding(self, position_max_length=100):
    ...

    def forward(self, input, output, mask):

        input_embed = self.input_data_embed(input)
        input_embed += self.position_encoding(self.max_encoder_seq_length)
        q, k, v = input_embed, input_embed, input_embed

        for encoder in self.Encoders:
            encoder_output = encoder(q, k, v)
            q = encoder_output
            k = encoder_output
            v = encoder_output

        output_embed = self.output_data_embed(output)
        output += self.position_encoding(self.max_decoder_seq_length)
        output_embed = output_embed.masked_fill(mask.unsqueeze(-1), 0)
        d_q, d_k, d_v = output_embed, output_embed, output_embed

        for decoder in self.Decoders:
            decoder_output = decoder(d_q, d_k, d_v, encoder_output, mask)
            d_q = decoder_output
            d_k = decoder_output
            d_v = decoder_output

        output = F.softmax(self.last_linear_layer(decoder_output), dim=-1)
        return output

 

총평

  • 실제 NLP 단어 예측 등, 데이터셋을 넣어보기 위해, dataloader와 학습 등을 연결해 봐야겠다.
  • 특정 task를 풀기 위해, 데이터셋을 처리하기 위한 model을 짜는 것도 좋지만, 가끔은 논문을 그대로 구현해 보는 것도 좋을 것 같다. 특히, 그림과 글만 보고 구현을 하려고 하니, 내가 정확하게 알지 못했던 부분, 특히 머리로 이해하고 넘어간 부분을 완전히 알게 된 것 같아 좋다. 
반응형

Introduction

  • Pytorch 학습 중, Resource와 모델 구조에 대한 profiling은 torch profiler를 이용해 가능하였다.

2023.07.09 - [Python] - Pytorch 구조 & Resource Profiler 도구 (torch profiler)

 

Pytorch Resource & 모델 구조 Profiler 도구 (torch profiler)

Introduction 딥러닝 학습을 잘(?)한다는 것을 정의하기는 어렵지만, 더 빠른 시간 안에 많은 양을 학습하는 것은 매우 중요하다. 딥러닝의 모델은 다수의 layer로 구성되어 있기 때문에, 각 layer의 결

devhwi.tistory.com

  • 이때, profiling의 결과는 테이블 구조의 텍스트 형태로 터미널에 출력 or 파일에 저장 가능하다.
  • 이 결과로도 Insight를 충분히 추출할 수 있지만, 텍스트 형태로 분석하다 보면, 가시성이 떨어진다는 점과, profiling 이력 간 비교가 어렵다는 점이 아쉽다.
  • 이를 해결할 수 있는 딥러닝의 시각화 툴인 Tensorboard에 profiling 결과를 올리는 방법이 있어서, 이 방법을 알아보고자 한다.

 

원리

  • torch profiler의 옵션에 "on_trace_ready"라는 옵션이 존재한다. 이것은 profiling 결과가 준비되었을 때, 호출할 callback 함수를 지정하는 것인데, 이 callback 함수로 tensorboard에서 제공하는 trace handler를 연결하여, tensorboard에서 읽을 수 있는 log 형태로 떨궈준다.

 

Setup

  • tensorboard에서 torch profiler의 결과를 읽어서 표현할 수 있는 plugin을 추가로 설치해야한다. (당연히, tensorboard가 필요하기 때문에, 아래 plugin을 설치하면, tensorboard도 자동으로 설치된다.)
pip install torch_tb_profiler

 

사용법

  • torch profiler를 이용하기 위한, 콘텍스트 관리자(with 절)에  on_trace_ready 옵션에, "tensorboard_trace_handler" 함수를 지정해 준다.
  • 이때, tensorboard_trace_handler 함수의 인자로, tensorboard에서 읽을 수 있도록 log 디렉터리를 지정해 준다.
  ...
  with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA], record_shapes=True,
                 profile_memory=True, on_trace_ready=torch.profiler.tensorboard_trace_handler('./log/resnet18')) as prof:
        for epoch in range(TRAIN_EPOCH):
            running_loss = 0.0
            for i, data in enumerate(trainloader, 0):
                inputs, labels = data[0].to(device), data[1].to(device)
                optimizer.zero_grad()
                with record_function("model_inference"):
                    outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                if i % TRAIN_PRINT_FREQUENCY == TRAIN_PRINT_FREQUENCY - 1:
                    print(f'Epoch: {epoch + 1}, Batch: {i + 1}, Loss: {running_loss / 200:.3f}')
                    running_loss = 0.0
  ...
  • 그 후에, tensorboard 실행을 해주면 끝난다.
tensorboard --logdir=./log/resnet18

 

결과

[NORMAL]

  • Overview : Overview는 전체적인 프로파일링의 결과에 대해 보여주는 화면이다.
    • Configuration : Profiling 시 사용된 설정 정보를 표시한다. 
    • Execution Summary : 전체 수행 시간과, 각 단계에 소요된 수행시간을 보여준다.
    • Spen Time Breakdown : 코드 또는 연산의 실행 시간을 단계별(Kernel, Memcpy, Memset, Runtime, DataLoader, CPU Exec, Other)로 분해하여 보여준다. 
    • Performance Recommentation : profiling 결과를 기반으로 한 성능 개선 권장 사항을 자동으로 생성해 준다. (실제로 어떤 원리로 동작하는지는 잘 모른다.)

  • Operator : 연산에 대한 profiling 결과를 보여준다. torch profiling 결과를 터미널에서 출력하였을 때, 보여주던 결과를 그대로 보여준다고 생각하면 된다. 추가적으로 Tensor Cores Eligible 옵션이 있는데, 해당 연산이 GPU를 사용할 수 있는지에 대한 가능 여부를 표시한 것이다. Group By 조건을 바꾸면, input shape도 볼 수 있다.

 

  • Trace : 함수 및 연산의 실행 시간을 시간 경과에 따라 그래프 형태로 표시한다. 해당 코드의 실행에 사용된 Process와 그 안의 Thread의 동작을 확인 할 수 있다.  (사실 이 UI는 torch profiler의 chrome tracing 기능으로도 볼 수 있다.)

 

  • Memory : 실행 시간에 따른 Memory 사용 추이를 보여준다. 각 연산마다 할당한 메모리와, Allocation Time과 Release Time, Duration을 보여준다. 코드 실행에 사용한 H/W 별로 볼 수 있다. (다만, 해당 UI에서 Memory를 많이 사용하는지, Chrome이 계속 죽는다.)

 

 

[DIFF]

  • Tensorboard를 통한 torch profiler 시각화의 가장 큰 장점이라고 할 수 있는 이력 간 비교 기능이다. Baseline의 log를 정한 뒤, 비교하고자 하는 log를 대입하면, 그 둘 간의 profiling 결과의 delta 값을 보여준다. 
  • 이를 통해, Profiling 결과를 비교하면서, H/W 효율화를 위한 구조 개선을 진행할 수 있다.

 

 

Torch 모델에서 torch profiling을 시각화하여 비교하는 방법을 알아보았다. 개인 프로젝트에서는 그 효용이 덜하겠지만, 많은 사람들이 같은 모델을 연구할 때, 성능과 profiling 결과를 모두 tensorboard를 통해 시각화하여, 성능을 유지하면서 모델의 연산 효율성을 향상하거나, 모델의 연산  효율성을 유지하면서 모델의 성능을 향상하는 데, 사용하면 매우 유용할 것이다. 

+ Recent posts