쿼터니언이란 무엇인가?

쿼터니언(Quaternion)이란 선형대수학에서 사용되는 개념으로 3차원 벡터 공간에서 회전을 표현하는 수학적 개념이다. 우리말로 사원수라고 일컬으며 4개의 요소로 구성된 복소수 시스템이다.

 

쿼터니언은 어떤 배경에서 고안되었는가?

쿼터니언은 3차원 공간의 회전을 표현한다고 했다. 이러한 3차원 공간의 회전을 표현하는 데 있어 원래는 오일러 각(roll, pitch, yaw)을 사용해 표현한다. 하지만 이 오일러 각을 사용할 경우 Gimbal Lock 현상이 발생한다. 잠깐 Gimbal에 대한 개념을 설명하자면, 단일 축에 대한 회전을 허용하는 수평 유지 장치를 의미한다. 

 

(좌): 각각의 짐벌은 roll, pitch, yaw의 자유도를 갖는다.

 

Gimbal Lock 현상은 3차원 공간상에서 두 축이 겹쳐 한 축이 소실되거나 자유도가 떨어지는 현상이다.  각 Gimbal들은 각자 특정 축(x, y, z)으로만 회전하도록 되어 있다. 하지만 위 우측 그림의 STATIC POSITION을 보면 맨 안쪽의 Gimbal은 다른 Gimbal을 회전시켜 두지 않는 이상 더 이상 roll 축으로 회전할 수 없게 된다. 이를 Gimbal Lock 현상이라 한다. 그렇다면 왜 Gimbal Lock 현상이 발생할까? 그 원인은 세 축이 의존적이기 때문이다. 예를 들어 z축을 회전시키면 x, y축도 함께 회전하게 되기 때문이다.

 

또 다르게 말하면 오일러 각에서 회전 자체를 세 축으로 나눠서 계산하기 때문이다. 예를 들어 어떤 물체 a가 있고 이 물체 a를 roll에 대해 회전하고 pitch에 대해 회전하고 yaw에 대해 회전한다고 가정하자. 어떤 물체 a는 roll에 대해 회전한 뒤 a’가 된다. 이후 pitch에 대해 회전한다고 하면 a에 대해 회전하는 게 아니라 a` 에서 회전하게 된다. 또 yaw에 대해 회전한다고 하면 a`가 아니라 a``에 대해 회전시킨다. 즉 각 축에 대한 회전은 이전 축에 의존하는 것이다.

 

쿼터니언은 이 Gimbal Lock 문제를 어떻게 해결할까?

쿼터니언은 정확히는 Gimbal Lock 문제를 해결하진 못한다. 다만 오일러 각을 사용해 회전을 표현할 때보다 Gimbal Lock 문제를 최소화시킬 수 있다. 쿼터니언은 4차원 복소수 공간의 벡터로 마찬가지로 벡터 공간에서의 회전을 표현하는 역할을 한다. 쿼터니언은 다음과 같은 형태를 가진다.

 

$$ q = xi + jy + zk + w $$

 

크게 보자면 쿼터니언 q는 xi + jy + zk는 벡터 요소, w는 스칼라 요소로 구성된다. i, j, k는 각각 x, y, z 축을 나타내는 허수 단위 벡터다. 이러한 쿼터니언은 4개의 요소로 구성되어 4차원 공간에서 회전을 표현한다. 이렇게 회전을 4차원으로 표현하는 쿼터니언은 세 개의 축을 사용하는 3차원 오일러 각과 달리 축의 의존성을 없애고 독립성을 가져오는 데 도움을 준다. 쿼터니언의 수식적 표현으로 파고들어 쿼터니언의 4차원 구조와 회전 연산 수식에 내재된 특성으로 짐벌락 문제를 해결하는 원리를 이해하는 데 도움될 수 있다 생각드나 짐벌락 문제 자체는 직관적으로 이해하는 것이 더 직접적이란 생각이 든다.

 

Reference

[1] https://handhp1.tistory.com/3

[2] https://luv-n-interest.tistory.com/805

[3] https://kangworld.tistory.com/185

[4] https://ko.wikipedia.org/wiki/짐벌

모델 학습 후 성능 최적화를 위해 하이퍼파라미터 튜닝을 수행할 때가 있다. Optuna 사용 방법은 여러 문서에 기술되어 있어 참고할 수 있지만 파이토치에서 분산처리를 적용한 Multi-GPU 환경에서 Optuna를 함께 실행하는 방법에 대한 레퍼런스가 적어 약간의 삽질을 수행했기 때문에 추후 참고 목적으로 작성한다. 또한 파이토치로 분산처리 적용하여 Multi-GPU 학습을 수행하고자할 때 mp.spawn의 리턴값을 일반적으로는 받아오지 못하는 경우가 있어 이에 대한 내용도 덧붙이고자 한다.

 

전체 의사코드는 아래와 같고, 각 함수마다 간단히 서술한다.

import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

def setup(rank, world_size):
    dist.init_process_group(backend='nccl', init_method="tcp://127.0.0.1:1234", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

def run_tuning(tuning_fn, world_size, epochs, lr, optimizer_name):
    parent_conn, child_conn = mp.Pipe()
    mp.spawn(tuning_fn, args=(world_size, epochs, lr, optimizer_name, child_conn), nprocs=world_size, join=True)
    while parent_conn.poll():
        best_mAP = parent_conn.recv()
    return best_mAP
    
def tune_hyperparameter(rank, world_size, optimizer, scheduler, epochs, trial):
    setup(rank, world_size)
    torch.cuda.set_device(rank)

    model = Model().to(rank)
    model = DDP(model, device_ids=[rank])
    
    train_dataset = ''
    valid_dataset = ''
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank, shuffle=True)
    valid_sampler = DistributedSampler(valid_dataset, num_replicas=world_size, rank=rank, shuffle=False)
    train_loader = torch.utils.data.DataLoader()
    valid_loader = torch.utils.data.DataLoader()

    for epoch in range(epochs):
        train()
        mAP = valid()
        ...

    conn.send(best_mAP)
    trial.report(mAP, epoch)
    if trial.should_prune():
        raise optuna.TrialPruned()

    cleanup()
    
def objective(trial, world_size):
    epochs = trial.suggest_int("epoch", 10, 20)
    lr = trial.suggest_float("lr", low=1e-7, high=1e-2, log=True)
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "AdamW", "AdamP", "RAdam"])
    
    best_mAP = run_tuning(tune_hyperparameter, world_size, epochs, lr, optimizer_name)
    return best_mAP
    
    
if __name__ == "__main__":
	seed = 42
    ngpus_per_node = torch.cuda.device_count()
    world_size = ngpus_per_node

    func = lambda trial: objective(trial, world_size, world_size)

    study = optuna.create_study(direction="maximize", sampler=RandomSampler(seed=seed))
    study.optimize(func, n_trials=5)

    pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
    complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

    trial = study.best_trial.value

 

Optuna 사용 핵심은 objective 함수 작성이다. 함수 내에는 튜닝할 하이퍼파라미터의 종류와 그 값의 범위값을 작성해준다. 이후 학습 루틴을 수행 후 그 결과를 리턴값으로 전달해주는 것이 끝이다. 학습루틴을 objective 함수 내부에 정의해도 되나 분산학습을 함께 적용하기 위해 별도의 함수로 만들어 실행한다. 

import optuna

def objective(trial, world_size):
    epochs = trial.suggest_int("epoch", 10, 20)
    lr = trial.suggest_float("lr", low=1e-7, high=1e-2, log=True)
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "AdamW", "AdamP", "RAdam"])
    
    best_mAP = run_tuning(tune_hyperparameter, world_size, epochs, lr, optimizer_name)
    return best_mAP
    
if __name__ == "__main__":
	func = lambda trial: objective(trial, world_size)
    study = optuna.create_study()
    study.optimize(func, n_trials=5)

run_tuning 함수는 인자로 실제로 하이퍼파라미터 튜닝을 수행할 함수를 받는다. run_tuning 함수는 다음과 같이 구성되어 있다. 

def run_tuning(tuning_fn, world_size, epochs, lr, optimizer_name):
    parent_conn, child_conn = mp.Pipe()
    mp.spawn(tuning_fn, args=(world_size, epochs, lr, optimizer_name, child_conn), nprocs=world_size, join=True)
    while parent_conn.poll():
        best_mAP = parent_conn.recv()
    return best_mAP

run_tuning은 실제로 하이퍼파라미터 튜닝을 수행할 함수를 첫 번째 인자(tuning_fn)로 받는다. mp.spawn은 리턴값이 없으므로 학습 관련 결과에 대한 값을 받아올 수 없다. 만약 multi-gpu를 통한 분산처리의 리턴값을 받아올 필요가 없다면 mp.spawn() 한 줄만 작성해주어도 된다. 하지만 필요한 경우가 있다. 이를 해결하기 위해 mp.Pipe()를 사용한다. 학습 관련 결과를 child_conn을 이용해 parent_conn으로 전송하는 것이다. child_conn은 학습 함수인 tuning_fn의 인자로 들어가서 학습 관련 결과에 대해 parent_conn으로 전달한다. mp.spawn이 종료되면 parent_conn에서 poll() 메서드와 recv() 메서드를 통해 그 결과를 가져올 수 있다. 

 

아래 함수는 실제로 파이토치에서 Multi-GPU를 활용해 학습하는 루틴을 정의한 함수다. 

def tune_hyperparameter(rank, world_size, optimizer, scheduler, epochs, trial):
    setup(rank, world_size)
    torch.cuda.set_device(rank)

    model = Model().to(rank)
    model = DDP(model, device_ids=[rank])
    
    train_dataset = ...
    valid_dataset = ...
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank, shuffle=True)
    valid_sampler = DistributedSampler(valid_dataset, num_replicas=world_size, rank=rank, shuffle=False)
    train_loader = torch.utils.data.DataLoader(...)
    valid_loader = torch.utils.data.DataLoader(...)

    for epoch in range(epochs):
        train()
        mAP = valid()
        ...

    conn.send(max_best_mAP)
    trial.report(mAP, epoch)
    if trial.should_prune():
        raise optuna.TrialPruned()

    cleanup()

함수의 가장 처음과 끝은 setup()과 cleanup()으로 각각 Multi-GPU로 학습 가능하도록 초기화하고 학습 이후 환경 초기화를 수행하는 역할을 한다. 모델은 DDP로 래핑해주고 데이터셋도 DistributedSampler를 사용한 결과로 데이터로더를 만들어준다. 이후 학습을 수행하고 conn.send()를 통해 그 결과를 parent_conn()으로 전달한다. 

논문 제목: Multi-Task Pre-Training for Plug-and-PlayTask-Oriented Dialogue System

게재 학회 / 게재 년도: ACL / 2022.05

 

이 논문은 기존의 모듈화된 dialogue system을 하나의 모델에서 end-to-end 방식으로 동작가능한 PPTOD라는 모델에 관한 논문이다. 기존의 dialogue system은 대개 4개의 모듈인 NLU, DST, DP, NLG로 나뉜 방식으로 구성된다. 논문에서는 이러한 모듈화된 방식에 순서성이 있다고 하여 cascaded 방식이라 부른다. 하지만 이 방식에는 크게 3가지 문제가 있다고 말하며 PPTOD 모델의 연구 배경을 말한다. 그 문제는 모듈화된 계단식 dialogue system 구성이 첫 번째로 이전 모듈에서 에러가 발생하면 이후 모듈에 에러가 전파된다는 것이며 두 번째로 모든 모듈에서 각 모델이 학습하기 위해 각각의 데이터셋 수집과 라벨링 과정이 필요하고 마지막으로 계단식이기에 필연적으로 Inference latency가 느려진다는 것이다.

따라서 이러한 문제점을 해결하기 위해 end-to-end 방식의 PPTOD라는 모델을 제시한다. 여기서 end-to-end란 기존의 NLU, DST, DP, NLG를 하나로 통합한 것이다. 따라서 유저의 utterance가 들어오면 한번에 1. 의도 분류 2. 슬롯 필링 3. 액션 생성 4. 대답 생성이 가능하다. 아래 PPTOD 모델을 만들기 위해 pre-training하는 예시를 살펴보자.

가장 앞에 "translate dialogue to A"는 하나의 모델에서 DST, NLU, DP, NLG 중 어떤 것인지 구분하기 위해 사용되는 일종의 prefix이다. 이 prefix는 위 그림과 같이 "belief state", "user intent", "dialogue act", "system response"가 있다. 두 번째로 DST를 위해 이전까지 유저와 봇이 주고 받은 대화와 현재 유저의 utterance를 모두 concatenation해준 다음 입력으로 넣어준다. 그렇게 되면 이제 prefix에 따라 생성되는 결과값이 4가지로 분리된다. 즉, slot filling된 결과, user intent, bot action, bot 응답이다. 간단히 정리하면 prefix와 유저 utterance가 입력으로 들어가고 4개의 결과를 모두 출력할 수 있는 구조인 것이다. 다르게 말하면 세부적으로 모듈화된 task-oriented dialogue 문제를 단일한 포맷의 text generation 문제로 바꾼 모델이다. 

이 T5 기반의 end-to-end 모델을 학습하기 위해 추가적인 pre-training과, fine-tuning 과정을 거쳤다. 데이터셋은 기존의 Dialogue System을 만들기 위해 공개된 아래 데이터셋을 조합했다.

총 80개 도메인의 230M개 가량의 유저 utterance 데이터셋을 사용했다. 우선 pre-training을 위해 사용한 세부적인 하이퍼파라미터는 learning rate: 5e-5, epoch: 10, optimizer: Adam, model: T5, max_seq_len: 1024, batch_size: 128, loss: maximum likelihood이며 구현의 용이성을 위해 허깅페이스 라이브러리를 사용했다. fine-tuning은 pre-training과 동일한 하이퍼 파라미터를 사용했다.

 

모델 학습에 사용하는 데이터셋의 형태는 $d = (z_t, x, y)$을 가진다. $d$는 데이터셋을 의미하고, $\displaystyle t \in \{NLU, DST, DP, NLG\}$이고 $z_t$는 "translate dialogue to A:" 형태의 prompt를 의미한다. $x$는 유저의 현재 발화 + 이전의 유저 발화 + 봇 응답이 전부 concatenation 된 형태이다. $y$는 생성해야할 target sequence를 의미한다. 학습에 사용한 loss 함수는 maximum likelihood를 사용했다. 

 

PPTOD 모델의 버전은 크게 3가지로 small, base, large 모델이 있다. 각각 t5-small, t5-base, t5-large에 대해 pre-training하고 fine-tuning한 결과이다. 다만 학습시킬 때 각각의 사이즈 별로 다른 configuration을 사용했다고 한다. 이에 대한 별도의 언급은 없다. t5 모델의 기본 configuration을 사용했을 것이다. 

 

PPTOD 모델을 평가하기 위해 크게 3가지 측면에서 벤치마킹을 수행했다. 1. end-to-end dialogue modeling 2. DST 3. user intent classification 측면이다. 벤치마킹을 위해 MultiWOZ 2.0과 MultiWOZ 2.1를 사용했다. 결과적으로 3가지 측면 모두 PPTOD 모델의 우수성을 이야기하고 있다. 

 

 

첨언하자면 MultiWOZ 데이터셋에 있는 Inform, Success, BLeU, Combined Score는 MultiWOZ 데이터셋에서 제시하는 평가 가이드라인이라고 한다. 또 Combined Score는 (Inform + Success) * 0.5 + BLUE로 계산된다고 한다. 위 표의 성능을 보면 PPTOD base 모델이 가장 좋다. PPTOD large 모델이 오히려 성능이 떨어지는 것은 사전 훈련 단계에서 보지 못했던 어휘에 대해 토큰을 생성하는 방법 학습할 때 능력이 떨어지는 것으로 분석한다고 말한다. 이 말이 잘 와닿진 않지만 일단은 PPTOD base 모델이 가장 좋다고 한다. 

 

이 논문의 저자들은 또 PPTOD 모델이 적은 데이터셋에서도 좋은 성능이 나는지 보기 위해서 학습 데이터셋을 1% 썼을 때, 5% 썼을 때, 10% 썼을 때, 20% 썼을 때에 대해 모델 성능을 비교했다. 참고로 표를 만들기 위해 5회 모델 학습에 대한 평균성능을 기재했다. 

1% 학습 데이터셋만으로도 다른 모델들보다 성능이 뛰어남을 보인다. 여기까지가 1. end-to-end dialogue modeling에 대한 벤치마크 평가다. 이외의 2. DST 측면의 평가와 3. user intent classification 측면의 평가도 모두 PPTOD large 모델이 우수하다는 것을 말하므로 생략한다. 

 

마지막으로 Inference latency 측면에서 PPTOD 모델(plug-and-play)이 기존의 cascaded 방식의 모델들보다 비약적으로 빨라졌다. 

 

200ms도 느리진 않지만 서비스 측면에서는 14배 빠른 end-to-end (plug-and-play) 모델이 경쟁력을 보일 수 있을 것이다. 

 

끝으로 이 논문의 핵심 컨트리뷰션은 기존의 챗봇이 NLU, DST, DP, NLG와 같이 모듈화되어 있었다면 이 모듈화된 것을 end-to-end 방식으로 바꿨다는 데 있다. 또 이를 통합함으로써 자연스럽게 모델 추론 속도가 향상되었다는 점이다. end-to-end 모델의 우수성은 이 논문의 PPTOD 모델로 증명되었다. 다만 TOD 챗봇을 위해 다양한 도메인의 데이터셋이 만들어진다면 앞으로 많은 end-to-end 연구가 이루어질 수 있을 것이라 생각한다.

multi-domain dialogue system의 기본적인 내용을 살펴보다 보니 2011년 논문까지 거슬러 올랐다. 이 논문은 multi-domain 챗봇 시스템에서 도메인을 어떻게 결정할 것이냐?는 문제에 대한 답을 한다. 이런 multi-domain 챗봇 시스템은 기존 single-domain 챗봇 시스템에서 새로운 도메인이 추가될 때 확장성이 요구되지만 이를 어떻게 충족시킬 수 있을까하는 물음으로부터 시작된 연구다. multi-domain 챗봇 시스템이 사용했던 기존의 방식은 사용자 발화에 대한 도메인을 결정하고 그 도메인에 해당하는 하위도메인 시스템이 결정되는 구조였다. 하지만 이는 한 가지 문제점이 있다. 만약 multi-turn일 경우 다음 이야기 하는 발화의 도메인이 이전에 했던 발화의 도메인과 의도치 않게 바뀔 수 있다는 것이다. 따라서 보통 이전 발화 정보에 가중치를 줘서 도메인 결정에 함께 사용했다고 한다. 하지만 이 방법 또한 한계점이 있다고 하는데 여기에 대해 언급된 바는 없었다. (유추해 보자면 가중치와는 무관하게 도메인이 추가될 때마다 새로 학습해야하는 것이 아닐까?)

 

다만 이 도메인 결정 문제를 해결하기 위해 3가지 요소를 사용했다. 단어, 화행, 이전 발화 도메인 정보다. 단어는 화자의 발화에 포함된 단어이고 화행은 용어에서 직관적인 이해가 되지 않았지만 예를 들어 만약 restaurant 도메인이면 ask_food, ask_location 등이 화행이다. 이전 발화 도메인 정보는 말 그대로 어떤 도메인인지를 말한다. 가령 이 논문에서는 아래와 같이 Weather, Resfinder, Songfinder, TVGuide라는 4가지 도메인을 사용했다. 

딥러닝 연구가 막 약동하기 시작할 때 쯤이여서 사용한 모델은 통계기반의 최대 엔트로피 모델을 사용했다. 결과적으로 이전 발화도메인 정보와 화행을 추가했을 때 아래와 같이 성능이 제일 높게 나왔다고 한다.

데이터셋이 적어서 일반화된 성능이라 보기는 어려울 것으로 보인다. 다만 이 논문을 통해 얻게 된 인사이트는 챗봇 시스템 구현에 있어서 이전 발화 도메인과 화행 정보를 고려해서 무분별한 도메인 전환이 이뤄지지 않도록 해야한다는 것이다. 아래의 대화 예제처럼 말이다.

 

또 도메인 결정 문제도 하나의 연구 주제가 될 수 있음을 알게 되었다. 최근 연구들에서는 어떻게 도메인 결정 문제를 해결하고 있을지 궁금해지는 대목이다.

 

발생 에러 메시지

return torch.stack(batch, 0, out=out)
RuntimeError: stack expects each tensor to be equal size, but got [3, 64] at entry 0 and [4, 64] at entry 1

 

데이터로더로부터 데이터를 가져와 학습시키기 전 발생한 오류다. 텐서 사이즈가 동일하지 않다고 한다. 원인은 배치 사이즈 별로 데이터가 묶이지 않아 발생한다고 한다. 직접적으로 텐서 사이즈를 조작 해주어도 잘 해결되지 않았다. 해결 방법은 DataLoader에 collate_fn 파라미터를 사용해줌으로써 학습을 시작할 수 있었다. collate_fn 파라미터는 텐서 간의 사이즈를 조절하는 역할을 한다. 아래와 같이 함수 하나를 만들어 주고 DataLoader(dataset, ..., collate_fn = collate_fn)과 같은 형식으로 입력해주면 문제 해결 가능하다.

 

def collate_fn(batch):
    return tuple(zip(*batch))

 

 

1. 분산 학습 개요

최근 몇 년간 Large Language Model을 만드는 추세가 계속해서 이어지고 있다. 이런 거대 모델의 경우 파라미터를 역전파로 업데이트하기 위해 많은 양의 메모리와 컴퓨팅 파워가 필요하다. 따라서 여러 프로세서에 분산시켜 모델을 학습하는 분산 학습이 필요하다. 분산 학습을 통해 CPU 또는 GPU 상의 학습 속도 향상을 이룰 수 있다. 많은 사람들이 사용하는 딥러닝 라이브러리인 파이토치에서 이런 분산 학습을 돕는 아래 API들이 있다.

 

1. torch.multiprocessing

  • 여러 파이썬 프로세스를 생성하는 역할. 일반적으로 CPU나 GPU 코어 수 만큼 프로세스 생성 가능

2. torch.distributed

  • 분산 학습을 진행할 수 있도록 각 프로세스 간의 통신을 가능하게 하는 일종의 IPC 역할

3. torch.utils.data.distributed.DistributedSampler

  • 학습 데이터셋을 프로세스 수 만큼 분할해 분산 학습 세션의 모든 프로세스가 동일한 양의 데이터로 학습하도록 만드는 역할.
  • 프로세스 수만큼 나누기 위해 world_size라는 인자를 사용. 

4. torch.nn.parallel.DistributedDataParallel

해당 API는 내부적으로 5가지 동작이 이뤄짐

  • 분산 환경에서 각 프로세스마다 고유한 모델 사본이 생성됨.
  • 고유 모델 사본 별 자체 옵티마이저를 갖고, 전역 이터레이션과 동기화됨
  • 각 분산 학습 이터레이션에서 개별 loss를 통해 기울기가 계산되고, 각 프로세스의 기울기 평균을 구함
  • 평균 기울기는 매개변수를 조정하는 각 모델 복사본에 전역으로 역전파됨.
  • 전역 역전파 때문에 모든 모델의 매개변수는 이터레이션마다 동일하도록 자동으로 동기화됨. 

 

2. 분산 학습 루틴 정의

그렇다면 실제로 위 API를 이용해 어떻게 분산 학습 루틴을 정의할 수 있을까? 아래 예시 코드를 통해 알아보자.

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import torch.multiprocessing as mp
import torch.distributed as dist

import os
import time
import argparse

class ConvNet(nn.Module):
	pass
    
def train(gpu_num, args):
    rank = args.machine_id * args.num_gpu_processes + gpu_num                        
    dist.init_process_group(backend='nccl', init_method='env://', world_size=args.world_size, rank=rank) 
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu_num)
    model.cuda(gpu_num)
    criterion = nn.NLLLoss().cuda(gpu_num) # nll is the negative likelihood loss
    
    train_dataset = datasets.MNIST('../data', train=True, download=True,
                                   transform=transforms.Compose([
                                       transforms.ToTensor(),
                                       transforms.Normalize((0.1302,), (0.3069,))]))  
                                       
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset,
        num_replicas=args.world_size,
        rank=rank
    )
    train_dataloader = torch.utils.data.DataLoader(
       dataset=train_dataset,
       batch_size=args.batch_size,
       shuffle=False,            
       num_workers=0,
       pin_memory=True,
       sampler=train_sampler)
       
    optimizer = optim.Adadelta(model.parameters(), lr=0.5)
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu_num])
    model.train()
    for epoch in range(args.epochs):
        for b_i, (X, y) in enumerate(train_dataloader):
            X, y = X.cuda(non_blocking=True), y.cuda(non_blocking=True)
            ... 
            if b_i % 10 == 0 and gpu_num==0:
                print (...)
            
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--num-machines', default=1, type=int,)
    parser.add_argument('--num-gpu-processes', default=1, type=int)
    parser.add_argument('--machine-id', default=0, type=int)
    parser.add_argument('--epochs', default=1, type=int)
    parser.add_argument('--batch-size', default=128, type=int)
    args = parser.parse_args()
    
    args.world_size = args.num_gpu_processes * args.num_machines                
    os.environ['MASTER_ADDR'] = '127.0.0.1'              
    os.environ['MASTER_PORT'] = '8892'      
    start = time.time()
    mp.spawn(train, nprocs=args.num_gpu_processes, args=(args,))
    print(f"Finished training in {time.time()-start} secs")
    
if __name__ == '__main__':
    main()

 

ConvNet 클래스는 코드가 길어지지 않도록 한 것으로 모종의 컨볼루션 레이어가 들어있다고 가정한다. 분산 학습의 루틴이 되는 로직의 핵심은 train 함수에 들어있다. 살펴보면 가장 먼저 첫번째로 rank가 할당된다. rank는 전체 분산시스템에서 프로세스 순서를 지칭하는 것이다. 예를 들어 4-CPU를 가진 시스템 2개가 있다면 8개의 프로세스를 생성할 수 있고 각 프로세스는 0~7번까지 고유 번호를 가질 것이다. rank를 구하기 위해 사용하는 식은 $rank = n*4 + k$이다. 여기서 n은 시스템 번호(0, 1)이 되고, k는 프로세스 번호(0, 1, 2, 3)이다. 참고로 rank가 0인 프로세스만 학습에 대한 로깅을 출력한다. 그 이유는 rank가 0인 프로세스가 다른 프로세스와의 통신의 주축이 되기 때문이다. 만약 그렇지 않다면 프로세스 개수만큼 로그가 출력될 것이다.

 

두 번째로 dist.init_process_group 메서드가 보인다. 이는 분산 학습을 진행하는 각 프로세스 간의 통신을 위해 사용한다.  정확히는 매개변수로 들어가는 backend 인자가 그 역할을 한다. pytorch에서 지원하는 backend 인자에는 Gloo, NCCL, MPI가 있다. 간단히 언급하면 주로 Gloo는 CPU 분산 학습에 NCCL은 GPU 분산 학습에, MPI는 고성능 분산 학습에 사용된다. init method는 각 프로세스가 서로 탐색하는 방법으로 URL이 입력되며 기본값으로 env://이 설정된다. world_size는 분산 학습에 사용할 전체 프로세스 수다. world_size 수 만큼 전체 학습 데이터셋 수가 분할된다.

 

세 번째로 torch.utils.data.distributed.DistributedSampler다. world_size 수 만큼 데이터셋을 분할하고 모든 프로세스가 동일한 양의 데이터셋을 갖도록 한다. 이후에 DataLoader가 나오는데 shuffle=False로 설정한 것은 프로세스 간 처리할 데이터셋의 중복을 피하기 위함이다. 

 

네 번째로 torch.nn.parallel.DistributedDataParallel다. 분산 환경에서 사용할 각각의 모델 복사본을 생성한다. 생성된 각 모델 복사본은 각자 옵티마이저를 갖고, loss function으로부터 기울기를 계산하고 rank 0을 가지는 프로세스와 통신하여 기울기의 평균을 구하고 rank 0을 갖는 프로세스로부터 평균 기울기를 받아 역전파를 수행한다. 이 DistributedDataparallel을 사용하면 각각 독립된 프로세스를 생성하므로 파이썬 속도 한계의 원인인 GIL 제약이 사라져 모델 학습 속도를 늘릴 수 있다는 장점이 있다. 

 

다섯 번째로 MASTER_ADDR은 rank가 0인 프로세스를 실행하는 시스템의 IP 주소를 의미하고 MASTER_PORT는 그 장치에서 사용 가능한 PORT를 의미한다. 이는 앞서 언급했듯 rank 0인 시스템이 모든 백엔드 통신을 설정하기 때문에 다른 프로세스들이 호스팅 시스템을 찾기 위해 사용한다. IP에 따라 local이나 remote를 설정해 사용할 수 있다. 

 

마지막으로 데이터로더에 pin_memory=True는 데이터셋이 로딩 된 장치(ex: CPU)에서 다양한 장치(GPU)로 데이터를 빠르게 전송할 수 있도록 한다. 예를 들어 데이터셋이 CPU가 사용하는 고정된 page-lock 메모리 영역에 할당되어 있다면 GPU는 이 CPU의 page-lock 메모리 영역을 참조하여 학습도중 필요한 데이터를 복사해 사용한다. pin_memory=True는 참고로 학습 루틴 상에서 non_blocking=True라는 매개변수와 함께 동작한다. 결과적으로 GPU 학습 속도가 향상되는 효과를 가져온다.

훈련이 완료된 ML/DL 모델은 실제 서비스에 적용하기 위해 프로덕션 레벨로 배포하고 서빙해야한다. 이를 위한 모델 서빙 종류는 로컬에 플라스크로 모델 서버 구축, 도커로 모델 서빙 환경 구축, 토치서브로 모델 서빙하는 방법, 클라우드로 모델 서빙하는 방법이 있다. 모델 서빙은 모델 훈련이 완료되고 저장되어 있어 재사용가능 할때 이루어지므로 간단히 모델 저장 방법에 대해 언급하고 넘어가자.

 

파이토치 모델 저장 및 로딩 방법

파이토처리 모델을 저장하고 로딩하는 방법은 2가지가 있다. 첫 번째 방법은 가장 간단한 방식으로 전체 모델 객체를 저장하고 로딩하는 방식이다.

torch.save(model, PATH)
model = torch.load(PATH)

모델 전체를 저장하므로 간편하다. 하지만 이 방식은 경우에 따라 문제가 발생할 수 있다. 그 이유는 모델 객체 전체를 저장하기 때문에 모델 객체 내부에 매개변수, 모델 클래스, 디렉터리 구조까지 함께 저장한다. 따라서 만약 추후 디렉터리 구조라도 변경된다면 모델 로딩에 실패하게 되고 문제 해결이 어려워질 수 있다. 따라서 모델 매개변수만 저장하는 아래 두 번째 방법을 사용하는 것이 좋다.

torch.save(model.state_dict(), PATH)
model = ConvNet()
model.load_state_dict(torch.load(PATH))

모델 매개변수만 저장하기 위해 state_dict() 메서드를 사용하고, 추후 빈 모델 객체를 인스턴스화해 빈 모델 객체에 매개 변수를 로딩해 사용하는 방법이다. 간단히 파이토치 모델 저장하는 방법을 알아보았으니 다음으로 이 모델을 배포하여 서빙하는 방법에 대해 알아보자. 

 

1. 플라스크로 로컬에 모델 서버 구축하기

가장 간단하게 설치해 사용할 수 있는 플라스크를 서버로 사용한다. 플라스크 서버에 입력(추론 요청)이 들어오면 추론한 결과를 출력값으로 하여 되돌려 보내준다. 중요한 것은 플라스크 서버 내부에서 추론해 결과를 돌려주는 일이므로 추론 함수 작성이 필요하다. 예시를 위해 MNIST 숫자를 예측하는 모델이라 가정한다. 그렇다면 다음과 같은 추론 함수를 작성할 수 있다. 

def run_model(input_tensor):
    model_input = input_tensor.unsqueeze(0)
    with torch.no_grad():
        model_output = model(model_input)[0]
    model_prediction = model_output.detach().numpy().argmax()
    return model_prediction

 

이 추론 함수를 플라스크 서버 내부에서 동작시키려면 플라스크 서버의 코드는 다음과 같은 형태로 작성할 수 있다. 

 

import os
import json
import numpy as np
from flask import Flask, request

import torch
import torch.nn as nn
import torch.nn.functional as F

class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.cn1 = nn.Conv2d(1, 16, 3, 1)
        self.cn2 = nn.Conv2d(16, 32, 3, 1)
        self.dp1 = nn.Dropout2d(0.10)
        self.dp2 = nn.Dropout2d(0.25)
        self.fc1 = nn.Linear(4608, 64) # 4608 is basically 12 X 12 X 32
        self.fc2 = nn.Linear(64, 10)
 
    def forward(self, x):
        x = self.cn1(x)
        x = F.relu(x)
        x = self.cn2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dp1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dp2(x)
        x = self.fc2(x)
        op = F.log_softmax(x, dim=1)
        return op
    
model = ConvNet()
PATH_TO_MODEL = "./convnet.pth"
model.load_state_dict(torch.load(PATH_TO_MODEL, map_location="cpu"))
model.eval()

def run_model(input_tensor):
    model_input = input_tensor.unsqueeze(0)
    with torch.no_grad():
        model_output = model(model_input)[0]
    model_prediction = model_output.detach().numpy().argmax()
    return model_prediction

def post_process(output):
    return str(output)

app = Flask(__name__)

@app.route("/test", methods=["POST"])
def test():
    data = request.files['data'].read()
    md = json.load(request.files['metadata'])
    input_array = np.frombuffer(data, dtype=np.float32)
    input_image_tensor = torch.from_numpy(input_array).view(md["dims"])
    output = run_model(input_image_tensor)
    final_output = post_process(output)
    return final_output

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8890)

 

위와 같이 작성된 플라스크 서버를 구동시키면 입력을 기다릴 것이고 입력 request를 날려주는 함수를 하나 작성하면 다음과 같이 작성할 수 있다.

 

import io
import json
import requests
from PIL import Image
from torchvision import transforms

def image_to_tensor(image):
    gray_image = transforms.functional.to_grayscale(image)
    resized_image = transforms.functional.resize(gray_image, (28, 28))
    input_image_tensor = transforms.functional.to_tensor(resized_image)
    input_image_tensor_norm = transforms.functional.normalize(input_image_tensor, (0.1302,), (0.3069,))
    return input_image_tensor_norm

image = Image.open("./digit_image.jpg")
image_tensor = image_to_tensor(image)
dimensions = io.StringIO(json.dumps({'dims': list(image_tensor.shape)}))
data = io.BytesIO(bytearray(image_tensor.numpy()))

res = requests.post('http://localhost:8890/test', files={'metadata': dimensions, 'data' : data})
response = json.loads(res.content)

print("Predicted digit :", response)

 

위와 같이 작성한 추론 요청 함수를 실행시키면 플라스크 모델 서버로부터 추론한 결과를 받을 수 있게 된다. 하지만 이와 같이 구축된 모델 추론 파이프라인을 다른 환경에서 동일하게 구축하려면 수동으로 동일한 라이브러리 설치부터 폴더구조를 맞춰주고 파일을 복사하는 등의 작업이 필요하다. 때문에 이 환경을 그대로 복제하여 다른 환경에서도 사용할 수 있도록 하는 확장성이 필요하다. 이 때 사용할 수 있는 것이 도커다. 도커를 통해 손쉽게 위 환경을 복제하고 실행하는 방법에 대해 알아보자. 

 

 

2. 도커로 모델 서빙 환경 구축하기

도커를 사용하면 서버 구축에 사용되었던 소프트웨어 환경을 손쉽게 만들 수 있다.

 

2.1 Dockerfile 만들기

도커를 사용하기 위해서는 가장 먼저 Dockerfile을 만들어야 한다. Dockerfile을 실행하면 결과적으로 위 플라스크로 구축했던 모델 서빙환경이 그대로 재현되어야 한다. Dockerfile 실행하면 도커 이미지가 생성되고 이 과정을 이미지 빌드라고 한다. 이를 위해 Dockerfile에 다음과 같은 스크립트를 작성할 수 있다.

FROM python:3.8-slim 

RUN apt-get -q update && apt-get -q install -y wget

COPY ./server.py ./
COPY ./requirements.txt ./

RUN wget -q https://raw.githubusercontent.com/wikibook/mpytc/main/Chapter10/convnet.pth
RUN wget -q https://github.com/wikibook/mpytc/raw/main/Chapter10/digit_image.jpg

RUN pip install --no-cache-dir -r requirements.txt

USER root
ENTRYPOINT ["python", "server.py"]

 

간단히 각 명령에 대해 설명하자면 FROM을 통해 도커에 python3.8이 포함된 표준 리눅스 OS를 가져오도록 지시하고, RUN을 통해 업데이트 하고 wget을 다운로드 한다. 이후 COPY를 통해 로컬에 만들어두었던 서버 파일과 환경구축에 필요한 requirements.txt 파일을 복사해준다. 이후 추론에 필요로한 모델과 예제 이미지를 다운로드 받고, pip install을 통해 파이썬 라이브러리를 모두 설치해준다. 참고로 requirements.txt에는 다음과 같은 라이브러리가 들어있다. (만약 동작하지 않는다면 Flask를 업데이트 할 것) 

torch==1.5.0
torchvision==0.5.0
Pillow==6.2.2
Flask==1.1.1

USER를 통해 루트 권한을 부여하고 ENTRYPOINT를 통해 python server.py 명령이 실행되면 도커상에서 플라스크 모델 서버가 실행된다. 

 

2.2 도커 이미지 빌드

도커 파일이 작성되었으면 이를 빌드해주어 도커 이미지로 만들어주어야 한다. 이를 위해 다음과 같은 명령어를 사용한다.

docker build -t <tag name> .

여기서 <tag name>은 임의 설정이다. 

 

2.3 도커 이미지 배포 (실행)

도커 이미지 빌드를 통해 만들어진 도커 이미지를 다음과 같은 명령 실행을 통해 배포가 가능하다.

docker run -p 8890:8890 <tag name>

 

위 명령이 실행되면 다음과 같이 도커 이미지가 실행되어 모델 서빙이 가능하도록 플라스크 서버가 구동되는 것을 확인할 수 있다.

 

구동된 플라스크 서버에 다시 추론을 요청하는 request를 날리면 이에 대한 응답이 결과로 반환되는 것을 확인할 수 있다.

 

도커 실행을 중지하려면 현재 실행 중인 도커 컨테이너를 확인해야 하며 이는 docker ps -a 명령으로 확인할 수 있다. 그러면 CONTAINER ID를 확인할 수 있고 이를 복사하여 docker stop <CONTAINER ID> -t 0을 실행시켜주면 도커 컨테이너가 중지된다. 만약 도커 컨테이너를 삭제하고 싶다면 docke rm <CONTAINER ID> 명령을 실행하면 된다. 빌드했던 도커 이미지까지 삭제하려면 docker images 명령을 통해 <tag name>을 확인하고 docker rmi <tag name>을 통해 삭제할 수 있다.

 

3. 토치서브로 모델 서빙하기

torchserve는 파이토치 모델 서버 라이브러리다. 메타와 AWS에서 만들었고, 파이토치 모델 배포를 도와주는 역할을 한다. torchserve를 사용하기 위해서 Java 11 SDK 설치와, pip install torchserve torch-model-archiver 명령 실행을 통해 라이브러리 설치가 우선적으로 필요하다. 여기서 torch-model-archiver는 압축 라이브러리다. 입력값을 3개 받아 .mar 파일로 만들어주는 역할을 한다. 입력값 3개란 1. 모델 클래스 파일 2. 훈련된 모델 파일 3. 핸들러(전처리, 후처리) 파일이다.

 

1. 클래스 파일은 모델 레이어가 구성된 파일로, 앞서 플라스크 서버 구축때 사용할 때 정의했던 ConvNet 클래스를 별도 파일로 만들어준 것이다. 

2. 훈련된 모델 파일은 convnet.pth와 같이 모델 학습이 완료된 파일이며

3. 핸들러는 torchvision의 transform 클래스와 같이 전처리를 할 수 있는 로직이나 별도의 후처리 로직이 담긴 파일이다.

 

torch-model-archiver를 사용해 아래 명령을 실행시켜주면 목표로하는 convnet.mar 파일이 생성된다. 

torch-model-archiver --model-name convnet --version 1.0 \
--model-file ./convnet.py --serialized-file ./convnet.pth --handler ./convnet_handler.py

이후 아래 명령을 통해 새 디렉터리를 생성하고 생성한 convnet.mar 파일을 옮겨준다.

mkdir model_store
mv convnet.mar model_store

이후 토치서브를 이용해 모델 서버를 런칭하는 명령 실행을 통해 서버를 구동해준다.

torchserve --start --ncs --model-store model_store --models convnet.mar

이후 curl을 활용해 토치서브 모델 서버에 추론 요청을 수행할 수 있다. 핸들러에 의해 어떤 입력이라도 전처리되어 텐서로 바뀌기 대문에 별도의 코드를 작성하지 않아도 된다.

curl http://127.0.0.1:8080/predictions/convnet -T ./digit_image.jpg

결과적으로 추론한 결과 숫자가 터미널에 출력될 것이다. 만약 토치서브로 구동한 서버를 종료하고 싶다면 torchserve --stop 명령을 통해 중지할 수 있다. 덧붙여 만약 서버의 모델 서빙 여부를 확인하기 위한 핑을 보내는 명령은 다음과 같이 사용할 수 있다. 참고로 포트 정보는 torchserve 실행에 의해 서버가 실행될 때 로그를 통해 확인 가능하다.

curl http://localhost:8081/models

 

 

4. 토치스크립트로 범용 파이토치 모델 만들기

지금까지는 로컬 플라스크 서버, 도커 환경, 토치서브 환경을 통해 모델 서버를 구현했다. 하지만 이는 파이썬 스크립트 환경에서 이루어진 것이다. 훈련한 모델이 반드시 훈련했던 환경에서 모델 서빙이 이뤄진다 할 수 없다. 파이썬이 실행되지 않는 외부환경에서도 실행될 수 있다. 따라서 파이토치 모델을 C++과 같은 타 언어에서도 실행될 수 있도록 중간 표현으로 만들 필요성이 있다. 이 때 토치스크립트를 사용할 수 있다. 토치스크립트는 별도의 라이브러리는 아니고 파이토치 내부에서 모델 연산 최적화를 위해 사용하는 JIT compiler에서 사용된다. 토치스크립트는 간단히 torch.jit.script(model) 또는 torch.jit.trace(model, input)를 실행시켜 만들 수 있다. 

 

파이토치 모델을 토치스크립트로 컴파일하기 위해선, 언급했듯 두 가지 방식이 존재한다. 첫 번째는 trace 방식이고 두 번째는 script 방식이다. trace 방식을 위해선 모델과 더미(임의) 입력값이 필요하다. 더미 입력값을 모델에 넣어서 입력값이 어떻게 흐르는지 추적해 기록한다. trace 방식을 사용해 모델을 만든다면 다음과 같은 형태로 만들 수 있다.

model = ConvNet()
PATH_TO_MODEL = "./convnet.pth"
model.load_state_dict(torch.load(PATH_TO_MODEL, map_location="cpu"))
model.eval()
for p in model.parameters():
    p.requires_grad_(False)
demo_input = torch.ones(1, 1, 28, 28)
traced_model = torch.jit.trace(model, demo_input)
# print (traced_model.graph)
# print(traced_model.code)
torch.jit.save(traced_model, 'traced_convnet.pt')
loaded_traced_model = torch.jit.load('traced_convnet.pt')

중간에 있는 torch.jit.trace() 메서드를 통해 토치스크립트 형식의 객체를 만들어주는 것이 핵심이다. 이후 모델을 저장하게 되면 C++과 같은 다른 언어에서도 파이토치 모델을 로딩해 추론할 수 있다. 위를 살펴보면 model과 loaded_trace_model 두 개가 있는데 향후 모델에 입력값을 넣고 추론해보면 추론 결과는 완벽히 동일하게 출력된다.

 

하지만 이러한 trace 방식을 이용하면 한 가지 큰 문제가 발생할 수 있다. 예를 들어 모델 순전파가 if나 for문과 같은 제어 흐름으로 구성된다면 trace는 여러 가능한 경로 중 하나만 토치 스크립트로 렌더링할 것이다. 따라서 기존 모델과 동일성을 보장할 수 없다. 이를 해결하기 위해서는 script 방식을 사용해 토치 스크립트로 컴파일 해야 한다. script 방식도 위 trace 방식과 동일하다. 다만 차이점이 있다면 더미 입력이 사용되지 않는 것이 특징이다. 

model = ConvNet()
PATH_TO_MODEL = "./convnet.pth"
model.load_state_dict(torch.load(PATH_TO_MODEL, map_location="cpu"))
model.eval()
for p in model.parameters():
    p.requires_grad_(False)
scripted_model = torch.jit.script(model)
# print(scripted_model.graph)
# print(scripted_model.code)
torch.jit.save(scripted_model, 'scripted_convnet.pt')
loaded_scripted_model = torch.jit.load('scripted_convnet.pt')

마찬가지로 중간에 있는 torch.jit.script() 메서드를 통해 토치스크립트 형식의 객체를 만들어주는 것이 핵심이다. 또 model과 loaded_scripted_model에 모두 입력값을 넣고 추론해보면 추론 결과가 동일하게 출력된다. 이렇게 script 방식을 사용해 토치스크립트 코드로 만들면 trace 방식에 비해 정확성을 더 얻을 수 있다. 다만 script 방식이 가지는 단점이 있다면 파이토치 모델이 토치스크립트에서 지원하지 않는 기능을 포함하면 동작할 수 없다. 이 때는 모델 순전파에 if for문 로직을 제거한 뒤 trace 방식을 사용해야 한다.

 

결론을 이야기하자면 사실상 토치스크립트는 모델 서빙을 위해 필수적으로 사용해야 한다. 그 이유는 파이썬 내부에서는 전역 인터프리터 잠금(GIL)이 설정되어 있어 한 번에 한 쓰레드만 실행될 수 있어 연산 병렬화가 불가능하기 때문이다. 하지만 토치스크립트를 통해 중간 표현으로 바꾸어 범용 형식으로 만들어두면 연산 병렬화가 가능해져 모델 서빙 속도 향상이 가능해진다. 

 

5. ONNX로 범용 파이토치 모델 만들기

토치스크립트와 마찬가지로 ONNX 프레임워크를 사용해도 파이토치 모델을 범용화할 수 있다. 그렇다면 토치스크립트와의 차이점은 무엇일까? 토치스크립트는 파이토치에서만 사용할 수 있다면 ONNX는 텐서플로우나 이외의 딥러닝 라이브러리와 같이 더 넓은 범위에서 표준화 시킬 수 있다. 예를 들어 파이토치로 만든 모델을 텐서플로우에서 로드하여 사용할 수 있다. 이를 위해 ONNX 라이브러리 설치가 필요하다. pip install onnx onnx-tf 명령을 통해 필요한 라이브러리를 설치할 수 있다. 

 

학습한 파이토치 모델을 ONNX 포맷으로 바꾸어 저장하기 위해서는 다음과 같은 예시 코드를 사용할 수 있다.

model = ConvNet()
PATH_TO_MODEL = "./convnet.pth"
model.load_state_dict(torch.load(PATH_TO_MODEL, map_location="cpu"))
model.eval()
for p in model.parameters():
    p.requires_grad_(False)
demo_input = torch.ones(1, 1, 28, 28)
torch.onnx.export(model, demo_input, "convnet.onnx")

토치스크립트에서 trace 방식과 마찬가지로 더미 입력값이 필요하다. 핵심은 torch.onnx.export() 메서드를 사용해 onnx 포맷 형식으로 파일을 생성하는 것이다. 이후 텐서플로우 모델로 변환하기 위해서는 다음과 같은 예시 코드를 사용할 수 있다.

 

import onnx
from onnx_tf.backend import prepare

model_onnx = onnx.load("./convnet.onnx")
tf_rep = prepare(model_onnx)
tf_rep.export_graph("./convnet.pb")

 

tf.rep.export_graph() 메서드가 실행되면 텐서플로우에서 사용 가능한 모델 파일인 convnet.pb가 생성된다.

 

이후 텐서플로우에서 사용하기 위해 모델 그래프를 파싱하는 과정이 필요하다. (참고로 TF 1.5 버전) 

with tf.gfile.GFile("./convnet.pb", "rb") as f:
    graph_definition = tf.GraphDef()
    graph_definition.ParseFromString(f.read())
    
with tf.Graph().as_default() as model_graph:
    tf.import_graph_def(graph_definition, name="")
    
for op in model_graph.get_operations():
    print(op.values())

 

그래프를 파싱하고 그 결과를 출력하면 아래와 같이(간략화된) 그래프의 입출력 노드 정보를 확인할 수 있다. 

(<tf.Tensor 'Const:0' shape=(16,) dtype=float32>,)
...
(<tf.Tensor 'input.1:0' shape=(1, 1, 28, 28) dtype=float32>,)
...
(<tf.Tensor '18:0' shape=(1, 10) dtype=float32>,)

 

입출력 노드 정보를 기반으로 모델의 입출력을 지정해줄 수 있고 텐서플로우를 통해 추론하면 아래와 같이 예측 확률분포가 출력되는 것을 확인할 수 있다. 

model_output = model_graph.get_tensor_by_name('18:0')
model_input = model_graph.get_tensor_by_name('input.1:0')

sess = tf.Session(graph=model_graph)
output = sess.run(model_output, feed_dict={model_input: input_tensor.unsqueeze(0)})
print(output)

> [[-9.35050774e+00 -1.20893326e+01 -2.23922171e-03 -8.92477798e+00
  -9.81972313e+00 -1.33498535e+01 -9.04598618e+00 -1.44924192e+01
  -6.30233145e+00 -1.22827682e+01]]

 

지금까지의 전체 과정을 요약하자면 파이토치 모델을 서빙하는 방법과 모델을 범용화 시키는 방법에 대해 알아봤다. 모델 서빙을 위해서는 크게 3가지로 로컬에서 플라스크 서버를 실행시켜 서빙하는 방법, 플라스크 서버를 도커 이미지화 하는 방법, 토치서브를 통해 서빙하는 방법이 있었다. 도커 이미지화를 위해선 Dockerfile을 생성해주고 몇몇 명령어를 실행시켜주었고, 토치서브를 실행시키기 위해 torch-model-archiver를 통해 .mar 파일로 만들어준 뒤, 서버를 실행시켜 추론 결과를 응답해주었다. 다음으로 파이토치 모델의 범용화를 위해 토치스크립트를 사용해 모델 범용화하는 방법과 ONNX를 통해 모델 범용화하는 방법이다.

 

잘 이뤄지는 모델 학습이  중요한 만큼 서비스와 제품화를 위해 모델을 배포하고 서빙하는 과정도 중요하다. 특히 실제 사용되는 서비스에서는 속도가 중요하므로 토치스크립트나 ONNX를 통해 모델을 범용 포맷으로 만듦으로써 파이썬 속도 한계의 원인인 전역 인터프리터 잠금(GIL)으로부터 벗어날 수 있다. 이후 플라스크 서버나 토치서브 환경을 도커 컨테이너로 만들어 둔다면 개발 속도 측면이나 효율성 등의 측면에서 많은 이점을 얻을 수 있을 것이다. 글을 작성하며 사용한 예시 코드의 전체 코드는 아래 링크를 참조하였다.

 

코드: https://github.com/wikibook/mpytc/tree/main/Chapter10

 

Reference

[1] 실전! 파이토치 딥러닝 프로젝트

 

Rasa 개요

챗봇을 구축하기 위해 사용할 수 있는 라이브러리로는 Rasa가 있고, 한글 버전으로는 Kochat이 있다. 웹 인터페이스 기반으로 코드를 사용하지 않고 쉽게 챗봇을 만들 수 있는 DialogFlow라는 클라우드 서비스가 있지만 간단한 챗봇을 만들 때만 쓸 수 있고, 디테일한 구현이 필요할 땐 Rasa와 같은 오픈소스 라이브러리를 써야한다. Rasa는 크게 Rasa NLU와 Rasa Core로 나뉜다. rasa nlu는 인텐트(intent) 분류와 엔티티(entity) 추출에 사용하는 라이브러리이다. 이를 통해 챗봇 커스터마이징이 가능하고 상상한 거의 모든 종류의 챗봇을 만들 수 있다고 한다. rasa core는 인텐트 추가와 같은 스케일업을 도와주는 라이브러리다. 이 rasa core를 통해 챗봇 응답과 챗봇 동작을 명시 가능하다. 이 명시되는 정보를 액션(Action)이라 하는데, 다르게 말하면 dialouge state에 응답하기 위해 취해야할 행동을 뜻한다. 더하여 rasa core는 과거 히스토리(대화내역)을 기반으로 다음에 이루어져야 할 액션을 예측하는 확률모델을 생성하는 역할을 한다. 

 

Rasa NLU

rasa nlu를 사용하면 학습과 추론을 매우 손쉽게 할 수 있다. 아래 예시 코드와 같이 라이브러리를 제외하면 단 9줄 가량의 코드로도 동작한다. 

from rasa_nlu.training_data import load_data
from rasa_nlu.model import Trainer
from rasa_nlu import config
from rasa_nlu.model import Interpreter

def train_horoscopebot(data_json, config_file, model_dir):
    training_data = load_data(data_json)
    trainer = Trainer(config.load(config_file))
    trainer.train(training_data)
    model_directory = trainer.persist(model_dir, fixed_model_name='horoscopebot')

def predict_intent(text):
    interpreter = Interpreter.load('./models/nlu/default/horoscopebot')
    print(interpreter.parse(text))

 

위 예시 코드는 별자리를 알려주는 태스크를 수행하는 챗봇을 구현하기 위해 사용된 것으로 사용자 입력이 들어오면 아래와 같이 추론 가능하다.

위 결과를 보면 네 개의 사전 정의한 인텐트(greeting, get_horoscope, dob_intent, subscription)중 예측한 인텐트에 대한 신뢰점수가 confidence로 계산되는 것을 확인할 수 있다. 사용자는 별자리를 알고 싶다 말했고, 학습을 진행했던 결과 사용자의 발화(utterance)는 별자리를 묻는 get_horoscope일 확률이 96.12%라는 것을 보여준다. 위와 같이 rasa nlu를 사용해 챗봇 시스템을 학습하고 추론하기 위해서는 간단한 아래 과정을 거친다.

 

1. 챗봇 구현 범위 설정

2. 데이터셋 생성 (data.json)

3. 모델 학습

4. 추론

 

네 가지지만 사실상 챗봇 구현 범위 설정과 데이터셋 생성이 대부분이다. 챗봇 구현 범위 설정이란 모종의 태스크를 수행하는 챗봇을 만든다 가정할 때 사용자가 어떤 종류의 의도를 가질 수 있을지 정하는 것이다. 가령 음식점에서 사용할 수 있는 챗봇을 만든다면 손님은, 주문하거나 결제하거나 화장실 위치를 묻거나 인사하거나 기타 의도를 가진 말을 할 것이다. 이러한 사용자 발화의 범위 설정이 챗봇 시스템 구현의 첫 단계이다. 이후엔 데이터셋 생성이다. rasa nlu를 통해 학습하기 위해서는 json 형태로 데이터를 만들어야 한다. 별자리를 알려주는 태스크를 학습하기 위해 생성한 데이터셋의 예시는 아래와 같다.

 

기본적으로 예상되는 사용자의 발화를 text에 입력하고 그에 따른 라벨이 되는 인텐트의 종류를 입력해준다. 빈 칸의 엔티티는 현재 별자리 운세를 알려주는 태스크에서는 사용되지 않는다. 하지만 간단한 설명을 하자면, 엔티티는 범주를 의미하는 것으로 가령 음식 주문 할 때 A라는 음식 2개를 가져달라고 요청할 수 있다고 가정하면 "음식 종류"라는 엔티티는 "A"라는 값을 가질 것이고 "음식 수량"이란 엔티티는 '2'라는 값을 가질 것이다. 이처럼 하나의 인텐트에는 여러 엔티티를 가질 수 있는 것이 특징이다. 

 

위와 같이 사용자 발화 별 인텐트와 엔티티를 지정하기 위해서 직접 json 포맷으로 작성해줄 수 있지만 쉽게 작성할 수 있는 웹 인터페이스가 있으면 좋을 것이다. rasa에서는 rasa-nlu-trainer라는 웹 인터페이스를 통해 데이터셋을 만드는 작업을 쉽게 할 수 있도록 한다. 일종의 annotation 도구인 것이다. rasa-nlu-trainer는 node.js 기반으로 동작하기 때문에 별도로 node.js를 설치해주어야 하고 sudo npm i -g rasa-nlu-trainer라는 명령어를 통해 설치할 수 있다. 여기서 i는 install이고 g는 global을 뜻한다. 이를 실행하게 되면 다음과 같이 웹으로 쉽게 데이터셋을 만들 수 있도록하는 인터페이스를 제공한다. 

 

 

데이터셋을 직접 만드는데 가장 시간이 많이 걸릴 것이다. 가급적이면 공개되어 있는 것을 가져올 수 있겠지만 내가 원하는 특정 태스크를 수행하기 위해서는 대부분 커스터마이징이 필요할 것이고 이에 따른 데이터는 직접 구축하는 경우가 많을 것이다. 

 

Rasa Core

rasa nlu를 통해 예측된 intent에 따라 챗봇의 응답과 동작이 이루어질 것이다. 이 때 사용하는 것이 rasa core이다. rasa core를 통해 챗봇 응답과 챗봇 동작을 명시할 수 있다. 챗봇에서 동작을 'Action'이라 한다. rasa core를 사용하면 과거의 대화내역을 기반으로 챗봇이 해야할 다음 Action을 예측하는 모델을 생성할 수 있다. 이를 위해 가장 먼저 도메인 파일이라 불리는 것을 만들어 주어야 한다. 도메인 파일이란 쉽게 말해 어떤 주제 속에서 대화가 이뤄지는지 정의하는 파일이다. 음식점에서 사용할 챗봇이면 도메인이 음식점이 되는 것과 같다. 이 도메인 파일은 크게 5가지 내용을 포함 해야한다. 1. 인텐트 2. 엔티티 3. 슬롯 4. 템플릿 5. 액션이다. 도메인 파일은 yml 확장자를 가지며 아래와 같은 형태로 정의할 수 있다.

 

 

1. 인텐트는 언급했듯 발화의 의도를 말한다.

2. 엔티티는 발화 속 캐치해야 할 핵심 키워드를 뜻하고

3. 슬롯은 키워드가 채워질 공간을 말한다. 별자리 별 운세를 알려주는 태스크에서는 월/일을 알아야 하므로 MM, DD같은 슬롯이 정의될 수 있다. 

4. 템플릿은 말 그대로 인텐트에 대한 기본 응답을 뜻한다. 여러 개의 기본 응답을 만들어 랜덤으로 사용한다면 다양하기에 사용자 경험에 있어 친근감을 줄 수 있는 요소가 될 것이다.

5. 액션은 사용자 발화에 따라 어떤 행동을 취할 수 있을지를 정의하는 것이다. 

 

이렇게 도메인 파일 내에 5개의 요소를 넣어 yml 파일로 정의했으면 다음으론 거의 끝으로 스토리 파일을 만들어 주어야 한다.

스토리 파일이란 대화가 어떤 시나리오로 흐를 수 있는지 정의한 파일이다. 파일은 마크다운 확장자로 만들어준다. 예시는 아래와 같다.

 

간단하다. 첫 번째 스토리는 챗봇이 사용자에게 인사하고 별자리가 무엇이냐고 물어본다. 이후 Capricorn(마갈궁자리)라는 응답을 받아 slot에 채워주고 현재 별자리 운세를 가져와서 보내주고 이 서비스를 구독하겠는지 묻고 끝나는 상황이다. 두 번째 스토리는 첫 번째와 달리 챗봇이 인사한 다음 사용자가 바로 별자리를 알려주고 cancer(게자리)라는 정보를 slot에 채우고 현재 별자리 운세를 가져와 사용자에게 전달하고 이 서비스를 구독하겠는지 묻고 사용자가 구독한다는(True) 발화를 캐치한 다음 구독자 목록에 추가하는 프로세스로 이뤄지는 것을 알 수 있다. 

 

사용자는 미리 정의된 템플릿에 따라 액션을 취할 수 있지만 이는 정적이다. 예컨데 운세를 가져오거나 구독자 추가와 같은 동적으로 이뤄저야 하는 것은 별도의 API를 사용해 외부 데이터베이스와 연결될 필요가 있다. 이 때는 Custom 액션이라 하여 파이썬 스크립트를 별도로 만들어 주어야 한다. 간단한 코드 예시는 다음과 같다.

 

class GetTodaysHoroscope(Action):
    def name(self):
        return "get_todays_horoscope"

    def run(self, dispatcher, tracker, domain):
        user_horoscope_sign = tracker.get_slot('horoscope_sign')
        base_url = http://horoscope-api.herokuapp.com/horoscope/{day}/{sign}
        url = base_url.format(**{'day':"today", 'sign':user_horoscope_sign})
        # http://horoscope-api.herokuapp.com/horoscope/today/capricorn
        res = requests.get(url)
        todays_horoscope = res.json()['horoscope']
        response = "Your today's horoscope:\n{}".format(todays_horoscope)

        dispatcher.utter_message(response)
        return [SlotSet("horoscope_sign", user_horoscope_sign)]

 

오늘 별자리 별 운세를 가져오기 위해 GetTodaysHoroscope라는 클래스를 만들어주고 내부에 run이라는 비즈니스 로직을 작성해줄 수 있다. run의 매개변수로 크게 dispatcher, tracker, domain이 들어가는 것을 알 수 있다. 먼저 tracker는 현재 상태 추적기로 슬롯에 접근해 값을 가져올 수 있고 이외에도 최근 사용자의 발화 정보 내역들에 접근할 수 있다. dispatcher는 별자리 정보를 기반으로 운세를 알려주는 API 호출 결과를 사용자에게 전달하는 역할을 한다. domain은 글자 그대로 이 챗봇의 도메인을 의미한다. 마지막으로 보면 SlotSet이 있는데 이는 추후 사용자 발화 정보를 히스토리로 활용하기 위해 슬롯이라는 공간에 저장하는 것이다. 

 

마지막으로 학습은 아래와 같이 간단한 로직으로 이루어진다.

""" Training a chatbot agent """
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from rasa_core import utils
from rasa_core.agent import Agent
from rasa_core.policies.keras_policy import KerasPolicy
from rasa_core.policies.memoization import MemoizationPolicy
from rasa_core.policies.sklearn_policy import SklearnPolicy
import warnings
import ruamel
warnings.simplefilter('ignore', ruamel.yaml.error.UnsafeLoaderWarning)

if __name__ == "__main__":
    utils.configure_colored_logging(loglevel="DEBUG")
    training_data_file = "./data/stories.md"
    model_path = "./models/dialogue"
    agent = Agent("horoscope_domain.yml", policies=[MemoizationPolicy(), KerasPolicy(), SklearnPolicy()])
    training_data = agent.load_data(training_data_file)
    agent.train(
        training_data,
        augmentation_factor=50,
        epochs=500,
        batch_size=10,
        validation_split=0.2
    )
    agent.persist(model_path)

 

main을 살펴보면 1. 로깅 설정 2.스토리 파일 경로 설정 3. 모델 저장 경로 설정 4. 다음 액션을 가져오는 에이전트 세팅 5~6. 학습, 마지막으로 에이전트 오브젝트(모델)를 지정된 경로에 저장하여 재사용하는 로직으로 구성된다. 이렇게 해주면 에이전트 모델 학습이 이루어진다. rasa nlu를 통해 추론한 사용자 발화 의도를 기반으로 rasa core를 통해 학습한 에이전트로 다음 챗봇 Action을 수행할 수 있다. 

 

Reference

[1] 파이썬으로 챗봇 만들기 

엔트로피(Entropy)

물리학에서 무질서도라는 개념으로 사용된 Entropy는 확률통계학에서 확률분포의 불확실성을 나타내는 척도이다. 이 불확실성의 척도는 확률분포에 대한 정보량의 기대값으로 표현한다. 하나의 확률분포에 대한 불확실성은 entropy로 계산하고, 두 확률분포 간의 불확실성은 cross entropy를 통해 계산한다. entropy와 cross entropy는 다음과 같이 정의된다.

$\displaystyle H(x) = -\sum_{i=1}^n p(x_i)\log(p(x_i))$

위 식에서 확률값인 $p(x_i)$의 총합은 1이기 때문에 수식은 $\displaystyle H(x) = -\sum_{i=1}^n \log p(x_i)$와도 같다. 간단한 예시인 동전과 주사위를 통해 먼저 entropy를 계산해보자. 동전의 경우 확률이 $\displaystyle 1 \over 2$인 이산확률분포를 가지며 주사위의 경우 확률이 $\displaystyle 1 \over 6$인 확률분포를 가진다. 동전과 주사위를 던지는 상황에 대한 entropy는 각각 다음과 같이 계산된다.

$\displaystyle H(x) = - \left({1\over 2}\log{1\over 2} + {1\over 2}\log{1\over 2}\right) = 0.3010$

$\displaystyle H(x) = -\left({1\over 6}\log {1\over 6} + {1\over 6}\log {1\over 6} + {1\over 6}\log {1\over 6} + {1\over 6}\log {1\over 6} + {1\over 6}\log {1\over 6} + {1\over 6}\log {1\over 6}\right) = 0.7782$


위 예시를 통해 알 수 있는 것은 확률변수가 다양하게 나올 수 있는 경우, 즉 확률에 대한 불확실성이 큰 경우 entropy가 더 크다는 것이다.

 

크로스 엔트로피(Cross Entropy)

entropy가 확률분포 하나에 대한 불확실성을 나타내는 척도라면 cross entropy는 확률분포 두 개에 대한 불확실성을 나타내는 척도다. 즉 cross entpry는 두 확률분포 간의 차이를 구하기 위해 사용한다. 머신러닝에서 실제 데이터의 확률분포와 모델이 예측한 확률분포 간의 차이를 구하는 것이다. Cross Entropy는 다음과 같은 함수로 정의된다.

$\displaystyle H_{p,q}(x) = -\sum_{i=1}^n p(x_i)\log(q(x_i))$

위 수식에서 $p(x)$는 실제 데이터의 확률 분포를 의미하며 $q(x)$는 모델이 예측한 확률분포 분포를 의미한다. 이를 활용하여 머신러닝에선 손실함수로 사용할 수 있다. 데이터셋으로부터 실제 확률분포인 $p(x)$를 계산할 수 있고 만든 모델로 예측을 통해 $q(x)$를 알 수 있기 때문이다. 이 둘 간의 차이를 손실로 두어 이 손실이 줄어드는 방향으로 학습하는 것이다.

간단한 예시를 통해 크로스 엔트로피를 계산해보자. 만약 A, B 두사람이 있고 한 개의 상자안에 있는 RGB 색상을 갖는 공이 $0.8, 0.1, 0.1$의 비율로 들어가 있다고 가정하자. 하지만 A는 직감적으로 $0.6, 0.2, 0.2$의 비율로 들어 있을 것 같다 예측했고 B는 $0.4, 0.3, 0.3$으로 들어 있을 것 같다 예측할 경우 cross entropy는 다음과 같이 계산된다.

$\displaystyle H_{p, q}(x) = - \left({8\over 10} \log({6\over 10}) + {1\over 10} \log({2 \over 10}) + {1\over 10}\log({2 \over 10})\right) = 0.3173 $

$\displaystyle H_{p, q}(x) = - \left({8\over 10} \log({4\over 10}) + {1\over 10} \log({3 \over 10}) + {1\over 10}\log({3 \over 10})\right) = 0.4230$


A의 예측은 0.3173이고 B의 예측은 0.4230이다. 즉 예측과 멀어지면 멀어질수록 cross entropy가 증가하는 것이다. 이러한 수식을 이용해 ML/DL에서 cross entropy 값이 줄어드는(불확실성이 줄어드는) 방향으로 학습을 진행한다.


KL Divergence란?

쿨백-라이블러 발산(Kullback-Leibler Divergence, KLD)은 두 확률분포 간의 차이를 측정하는 함수다. 두 확률분포 간의 차이는 $H_{p, q}(x) - H(x)$로 계산한다. 즉 크로스 엔트로피에서 엔트로피를 빼준 값이 KL Divergence 값이다.

$\displaystyle H(x) = -\sum_{i=1}^n p(x_i)\log(p(x_i))$
$\displaystyle H_{p,q}(x) = -\sum_{i=1}^n p(x_i)\log(q(x_i))$

$\displaystyle KL(p || q) = H_{p, q}(x) - H(x) = \sum_{i=1}^n p(x_i) \log {p(x_i)\over q(x_i)}$


두 확률분포는 $p(x), q(x)$로 표현하며 각각 사전 확률분포, 사후 확률분포이다. 달리 말해 $p(x)$는 실제 확률분포고 $q(x)$는 예측 확률분포이다. KL Divergence의 수식은 다음과 같이 이산형과 연속형으로 나뉘어 정의된다.

$KL(p || q) = \begin{cases} \displaystyle \sum_{i=1}^n p(x_i) \log{p(x_i) \over q(x_i)} \mbox{(이산형)} \\ \int p(x) \log{p(x) \over q(x)}dx \mbox{(연속형)} \end{cases}$

 

예측 확률분포인 $q(x)$가 실제 확률분포인 $p(x)$에 가까이갈수록 KL Divergence 값은 0에 가까워진다. 그렇다면 Cross Entropy와 KL Divergence의 차이점은 무엇일까? 둘 다 공통적으로 두 확률분포 간의 차이를 측정하는 척도이자 함수이다. KL Divergence를 최소화하는 것은 Cross Entropy를 최소화하는 것과 같다. 하지만 다른 점은 KL- Divergence 내의 $p(x)\log p(x)$는 실제 확률분포로서 알 수 없는 분포다. $H(x)$를 모르므로 KL-Divergence를 손실함수로 적용할 수 없다. 따라서 이를 제외하고 남은 $-p(x)\log q(x)$인 cross entropy를 사용한다.

 

 

Reference

[1] [머신러닝] 크로스 엔트로피(cross entropy) (rcchun)
[2] Cross-entropy 의 이해: 정보이론과의 관계 (DEEPPLAY)
[3] CrossEntropy와 KL-divergence (Jeff_Kang)

마르코프 체인 정의

마르코프 체인(Markov Chain)은 마르코프 성질을 지닌 이산확률과정이다. 마르코프 성질이란 $n+1$회의 상태(state)는 $n$회의 상태나 그 이전 $n-1, n-2, \dots$의 상태에 의해 결정되는 것이다. 달리 말해 과거 상태가 현재/미래 상태에 영향을 미치는 성질이다. 이산확률과정이란 시간의 진행에 대해 확률적인 변화를 가지는 구조를 의미한다. 이 마르코프 체인은 때때로 단순하지만 강력한 효과를 발휘하기에 사용한다. 실제로 예측을 위해선 많은 변수과 모델링을 거쳐야 하지만 마르코프 체인은 이런 비용을 줄여주기 때문이다.

 

마르코프 체인 예시

마르코프 성질 여부에 대한 흔한 예시로는 동전 앞뒤 예측과 날씨예측이 있다. 동전 앞뒤를 예측하는 것은 독립시행이기 때문에 n번째 상태가 n+1번째 상태에 영향을 주지 않으므로 마르코프 성질이 없다. 반면 날씨 예측과 같이 직관적으로 오늘 날씨에 의해 내일 날씨가 결정될 수 있으므로 마르코프 성질이 있다고 할 수 있다. 만약 오늘을 기반으로 하루 뒤를 예측한다면 1차 마코프 모델이라하고 이틀 뒤를 예측한다면 2차 마코프 모델이라 한다. 

 

 

마르코프 체인 활용

마르코프 체인은 주로 결합확률분포(Joint Probability Distribution)에 사용된다. 예를 들어 확률 변수 $X_1, X_2, \dots, X_n$이 있다고 가정하면 일반적으로 이 확률변수들의 결합확률분포는 다음과 같이 계산할 수 있다.

 

$P(X_1, X_2, \dots, X_n) = P(X_1) \times P(X_2|X_1) \times P(X_3|X_2,X_1) \times \dots \times P(X_n|X_{n-1}, X_{n-2}, \dots, X_1)$

 

하지만 마르코프 성질을 이용하면 위 보다 더 단순한 계산을 통해 결합확률분포를 구할 수 있다.

만약 어떠한 상태의 시점이 $t$고, 확률분포가 마르코프 성질을 따른다면 

 

$P(X_{t+1}|X_t, X_{t-1}, \dots, X_2, X_1) = P(X_{t+1}|X_t)$

 

로 단순화 할 수 있고 일반화를 적용하면 이전에 결합확률분포의 계산을 다음과 같이 단순화 가능하다.

 

$P(X_1, X_2, \dots, X_n) = P(X_1) \times P(X_2|X_1) \times P(X_3|X_2) \times P(X_4|X_3) \times \dots \times P(X_n|X_{n-1})$

 

이러한 마르코프 체인은 주로 베이지안 통계학이나 강화학습에 사용되며, MCMC(Markov Chain Monte Carlo)와도 연결되어 사용된다.

 

마르코프 모델

마르코프 모델이란 마르코프 체인을 기반으로 만든 확률 모델이다. 아래 날씨 예제를 통해 내일 날씨를 예측하는 마르코프 모델을 만들어보자.  마르코프 모델을 만들기 위해선 가장 먼저 각 상태를 정의해야 한다. 그리고 이 상태는 상태 전이 확률로 나타낸다. 상태 전이 확률이란 어떤 상태에서 다른 상태로 이동할 확률을 의미한다. 상태 전이 확률을 행렬로 표현한 것을 전이 행렬(Transition Matrix)라고 한다.

 

전이 행렬(Transition Matrix)

 

이 전이 행렬을 해석하자면 만약 오늘 날씨가 Sunny라면 내일 Suuny일 확률이 0.7, Rainy일 확률이 0.2, Cloudy일 확률이 0.1이라 예측한다. 이를 기반으로 오늘이 어떤 특정 날씨일 때 내일이 어떤 특정 날씨일 확률을 구하기 위해서는 위 전이 행렬이 $T$라고할 때 $T \times T$처럼 곱해주면 된다. 만약 오늘 날씨를 기반으로 이틀 뒤 날씨를 예측하고 싶다면 $T \times T \times T$를 계산하면 된다. 이렇게 반복적으로 전이행렬을 곱해줌으로써 일련의 예측을 함께 연결한다면 이를 마르코프 체인이라 한다.

 

참고로 전이 행렬을 거듭 곱하다보면 더 이상 전이 행렬의 값이 변하지 않고 수렴하는 상태가 오는데 이를 안정 상태(Steady State)라고 한다. 즉 행렬 $T$는 거듭곱해 어떤 행렬 $M$이 되지만 수렴한 뒤에는 $MT = T$가 된다. 또 전이 행렬은 도식화 가능하다. 이 도식화한 것을 상태 전이도(State Transition Diagram)라고 하며 아래와 같다.

 

상태 전이도 (State Transition Diagram)

 

전이 행렬 또는 상태 전이도를 기반으로 마르코프 모델을 만들어 날씨 예측을 할 수 있다. 아래는 10일간 날씨 예측을 할 수 있는 간단한 파이썬 스크립트다.

 

import numpy as np

transition_matrix = np.atleast_2d([[0.7, 0.2, 0.1],
                                   [0.1, 0.6, 0.3],
                                   [0.2, 0.5, 0.3]])

possible_states = ["sunny", "rainy", "cloudy"]
start_state = "sunny"
num = 10
index_dict = {}
future_state = []

for index in range(len(possible_states)): # {'sunny': 0, 'rainy': 1, 'cloudy': 2}
    index_dict[possible_states[index]] = index

for _ in range(num):
    new_state = np.random.choice(possible_states, p=transition_matrix[index_dict[start_state], :])
    future_state.append(new_state)

print (future_state) 
"""
> ['rainy', 'sunny', 'rainy', 'sunny', 'sunny', 'rainy', 'cloudy', 'sunny', 'sunny', 'sunny']
It will be different whenever you run it. because I used np.random.choice
"""

 

 

용어 정리

마르코프 체인(Markov Chain): 상태 전이 확률을 기반으로 일련의 예측을 연결한 것

마르코프 성질(Markov Property): $n+1$ state는 $n$ state에 의해 결정되는 것처럼 상태들 간의 인과성이 있는 성질

이산확률 과정(Discrete Stochastic Process): 시간 흐름에 따라 확률적인 변화를 가지는 구조. 특히 시간 흐름이 이산적일때를 의미

연속확률 과정(Continous Stochastic Process): 시간 흐름에 따라 확률적인 변화를 가지는 구조. 특히 시간 흐름이 연속적일 때를 의미

마르코프 모델(Markov Model): 미래 상태에 대한 확률 값이 과거에만 종속된 모델

상태 전이 확률(State Transition Probability): 어떤 상태에서 다른 상태로 이동할 확률

전이 행렬(Transition Matrix): 전이 확률을 행렬 형태로 나타낸 것

상태 전이도(State Transition Diagram): 상태 전이 확률을 정리하여 만든 도식

안정 상태(Steady State): 전이 행렬을 거듭 곱해도 더 이상 변하지 않는 상태

정적분포(Stationary Distribution): 전이 행렬이 안정 상태일 때 갖는 확률분포

상태 공간(State Space): $0, 1, 2, \dots, n-1, n$ 시점에서의 확률과정의 상태들의 집합

 

Reference

[1] Markov Chains

[2] Markov Chain Text Generation – Part 1 (KEN GRAHAM)

[3] 마르코프 체인 (Markov Chain) 정의 및 예시 (Data to Impact)

 

+ Recent posts