Deep Learning/from scratch II

[밑바닥부터 시작하는 딥러닝 2] - 7장 RNN을 사용한 문장 생성 I

해파리냉채무침 2024. 2. 21. 01:25

언어 모델을 사용한 문장 생성

RNN을 사용한 문장 생성의 순서

시계열 데이터(T개분 만큼) 모아 처리하는 Time LSTM과 Time Affine 계층을 만들었다.

I라는 단어를 주었을 때 출력한 확률분포는 다음과 같이 보인다. 이것을 기초로 다음 단어를 생성하는 방법은 두가지가 있다. 

https://dltpgh7060.tistory.com/82

1) 확률이 가장 높은 단어 선택하기. 결과가 일정하게 정해짐 (결정적 방법)

2) 확률적으로 선택, 각 후보 단어의 확률에 맞게 선택하는것, 확률이 높은 단어는 선택되기 쉽고 확률이 낮은 단어는 선택되기 어렵다. 이 방식은 선택되는 단어가 매번 다르다. (확률적 결정 방법)

2번째 방법(확률적 선택)으로 샘플링을 수행한 결과로 say가 선택되었다. 다만 결정적이 아니고 확률적으로 결정되었다는 특징이 있다. 다른 단어들도 해당 단어의 출현 확률에 따라 정해진 비율만큼 샘플링될 가능성이 있다. 

https://dltpgh7060.tistory.com/82

앞서 생성된 단어 say를 언어 모델에 입력하여 단어의 확률분포를 얻고, 확률 분포를 기준으로 다음 출현할 단어를 샘플링한다. 이 과정을 반복한다. 이렇게 생성된 문장은 훈련 데이터에서 사용된 단어의 정렬 패턴을 학습한 것이다. 

문장 생성 구현

코드를 구현하면 다음과 같다.

import sys
sys.path.append('..')
import numpy as np
from functions import softmax
from rnnlm import Rnnlm
from better_rnnlm import BetterRnnlm


class RnnlmGen(Rnnlm):
    def generate(self, start_id, skip_ids=None, sample_size=100):
    #start_id는 최초로 주는 단어 ID,  sample_size는 샘플링 하는 단어수, skip_id는 샘플링 되지 않도록 하는 단어 ID
        word_ids = [start_id]

        x = start_id
        while len(word_ids) < sample_size: 
            x = np.array(x).reshape(1, 1) #predict는 미니배치 처리를 하므로 x는 2차원 배열이여함
            score = self.predict(x)
            p = softmax(score.flatten()) #softmax 정규화를 통한 확률분포 p 얻기

            sampled = np.random.choice(len(p), size=1, p=p)
            if (skip_ids is None) or (sampled not in skip_ids):
                x = sampled
                word_ids.append(int(x))

        return word_ids

위에서 생성한 RnnlmGen으로 문장을 생성해본다.

import sys
sys.path.append('..')
from rnnlm_gen import RnnlmGen
from dataset import ptb


corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)
corpus_size = len(corpus)

model = RnnlmGen()
#model.load_params('../ch06/Rnnlm.pkl')

# start문자와 skip 문자 설정
start_word = 'you' #첫단어
start_id = word_to_id[start_word]
skip_words = ['N', '<unk>', '$'] #샘플링 하지 않는 단어
skip_ids = [word_to_id[w] for w in skip_words]
# 문장 생성
word_ids = model.generate(start_id, skip_ids)
txt = ' '.join([id_to_word[i] for i in word_ids]) #리스트와 단어들 사이 구분자 삽입하며 모두 연결
txt = txt.replace(' <eos>', '.\n')
print(txt)

join 함수는 다음과 같이 단어 배열을 문장으로 변환해준다.

' '.join(['you','say','goodbye'])

'you say goodbye'

 

앞의 코드를 실행하면 결과는 다음과 같이 문장 배열의 맥락, 순서나 문장이라고 할 수 없을 정도로 무작위로 단어가 나열되어있다. 

https://dltpgh7060.tistory.com/82

앞에서 학습을 끝낸 가중치 pickle파일을 이용해 문장을 생성하면 다음과 같다.( 책의 내용 그대로 가져옴)

you'll include one of them a good problems.

moreover so if not gene' s corr experience with the heat of bridges a new

deficits model is non-violent what it' s rule must exploit it.

there 's no tires industry could occur.

beyond my hours where hs is n't going home says and janpanese letter.

...

노란색으로 하이라이트 친 문장은 올바른 순서로 배치한 문장들이라고 할 수 있다. 이전 문장 배열보다 훨씬 올바른 문장들이 많이 보였다. 더 나은 모델로 더 올바른 문장들을 생성해야한다.

 seq2seq

seq2seq의 원리

seq2seq(sequence to sequence) 는 2개의 RNN를 이용한다. Encoder(부호화)-Decoder (암호화)모델이라고도 한다.

'나는 고양이로소이다' 라는 출발어 문장을 인코딩 , 인코딩 정보를 decoder에 전달, decoder가 'I am a cat'이라는 도착어 문장을 생성한다. encoder에는 인코딩한 정보에는 변역에 필요한 정보가 조밀하게 응축되어있다. 

encoder 계층은 다음과 같이 구성된다.

https://dltpgh7060.tistory.com/83

RNN를 이용해 시계열 데이터를 h라는 은닉상태 벡터로 변환한다. h는 LSTM 계층의 마지막 은닉상태로 , 입력문장(출발어)를 번역하는데 필요한 정보가 인코딩된다. LSTM 의 은닉상태 h는 고정길이 벡터이다. 

결론은 인코딩 = 문장을 고정길이 벡터로 변환 이다.

 

decoder 계층은 다음과 같이 구성된다. 

특이점이 있다면 LSTM 계층이 벡터 h를 입력받는다. 앞의 모델에서는 LSTM 계층이 아무것도 받지 않았다( 은닉상태로 영벡터를 받았다고 할 수 있음)

 

decoder와 encoder를 연결하면 다음과 같다.

Seq2seq는 encoder의 LSTM과 decoder의 LSTM 로 구성된다. 이때 LSTM 계층의 은닉상태 h가 decoder와 encoder를 이어주는 가교가 된다. 순전파시 encoder에서 인코딩된 정보가 LSTM 계층의 은닉상태를 통해 decoder로 전해진다.

시계열 데이터 변환용 장난감 문제 

seq2seq로 더하기를 다룬다. 단어 단위가 아닌 문자 단위로 분할한다. 만약 "57+5" 입력되면 ['5','7' ,'+', '5'] 리스트로 처리된다. 

가변 길이 시계열 데이터

덧셈 시 문자수가 문제마다 다르다는 특성이 있다.

예를 들면 "57+5"는 총 4문자, 628+521은 총 7문자이다.  신경망 학습 시 미니배치로 학습하게 되는데, 미니 배치 학습 시 속한 샘플들의 데이터 형상이 모두 같아야함 -> 이문제를 해결하기 위해 패딩을 써서 의미없는 데이터를 채워 모든 데이터의 길이를 균일하게 맞추도록 한다. 

https://dltpgh7060.tistory.com/83

모든 입력 데이터의 길이를 통일하고, 남는 공간에는 의미없는 데이터(공백)을 채운다. 출력앞의 _는 질문과 정답을 구분하기 위한 구분자이다. 이렇게 출력 데이터는 총 5자로 통일한다.

패딩을 하면 존재하지 않던 패딩용 문자까지 seq2seq가 처리하기 때문에, 정확성을 위해서는 seq2seq에 패딩 전용처리를 추가해야한다. 

덧셈 데이터셋

seq2seq용 텍스트 파일을 아래 코드로 쉽게 처리할 수 있다.

학습 데이터

import sys
sys.path.append('..')
import os
import numpy


id_to_char = {}
char_to_id = {}


def _update_vocab(txt):
    chars = list(txt)

    for i, char in enumerate(chars):
        if char not in char_to_id:
            tmp_id = len(char_to_id)
            char_to_id[char] = tmp_id
            id_to_char[tmp_id] = char


def load_data(file_name='addition.txt', seed=1984):#텍스트를 문자ID로 변환
    file_path = os.path.dirname(os.path.abspath(__file__)) + '/' + file_name

    if not os.path.exists(file_path):
        print('No file: %s' % file_name)
        return None

    questions, answers = [], []

    for line in open(file_path, 'r'): #파일 읽기
        idx = line.find('_')
        questions.append(line[:idx])
        answers.append(line[idx:-1])

    # create vocab dict
    for i in range(len(questions)):
        q, a = questions[i], answers[i]
        _update_vocab(q)
        _update_vocab(a)

    # create numpy array
    x = numpy.zeros((len(questions), len(questions[0])), dtype=numpy.int)
    t = numpy.zeros((len(questions), len(answers[0])), dtype=numpy.int)

    for i, sentence in enumerate(questions):
        x[i] = [char_to_id[c] for c in list(sentence)]
    for i, sentence in enumerate(answers):
        t[i] = [char_to_id[c] for c in list(sentence)]

    # shuffle
    indices = numpy.arange(len(x))
    if seed is not None:
        numpy.random.seed(seed)
    numpy.random.shuffle(indices) #무작위수 사용하여 전체 데이터 섞기
    x = x[indices]
    t = t[indices]

    # 10% for validation set
    split_at = len(x) - len(x) // 10
    (x_train, x_test) = x[:split_at], x[split_at:]
    (t_train, t_test) = t[:split_at], t[split_at:]

    return (x_train, t_train), (x_test, t_test)


def get_vocab(): #문자와 문자 ID의 대응 관게를 담은 딕셔너리 반환
    return char_to_id, id_to_char
import sys
sys.path.append('..')
from dataset import sequence


(x_train, t_train), (x_test, t_test) = \
    sequence.load_data('addition.txt', seed=1984)
char_to_id, id_to_char = sequence.get_vocab()

print(x_train.shape, t_train.shape) #문자ID가 저장되어 있음
print(x_test.shape, t_test.shape)
# (45000, 7) (45000, 5)
# (5000, 7) (5000, 5)

print(x_train[0]) 
print(t_train[0])
# [ 3  0  2  0  0 11  5]
# [ 6  0 11  7  5]

print(''.join([id_to_char[c] for c in x_train[0]]))
print(''.join([id_to_char[c] for c in t_train[0]]))
# 71+118
# _189

상위의 코드로 지정한 텍스트 파일을 읽어 텍스트를 문자 ID로 변환한후, 이를 train data와 test data로 나눠 반환한다. 

 

seq2seq 구현

seq2seq는 2개의 RNN을 연결한 신경망이다.

Encoder 클래스

encoder 클래스는 embedding 계층과 LSTM 계층으로 구성된다. 

embedding 계층:  문자(정확히 문자ID)를 문자 벡터로 변환. 변환된 벡터가 LSTM 계층으로 이동

LSTM 계층: 오른쪽으로 은닉상태와 셀 출력, 위쪽상태로는 은닉상태만 출력,  더 위로는 다른 계층이 없으니 LSTM 계층의 위쪽 출력은 폐기한다.

LSTM의 은닉상태 h를 출력하고, 이를 decoder에 전달한다. LSTM의 셀은 자기 자신만 사용한다는 전제로 설계되었기 때문에 다른 계층에 전달하는 일은 흔치 않다.

 

Time 계층을 이용하면  encoder는  다음과 같이 구현된다.

https://yerimoh.github.io/DL18/

import sys
sys.path.append('..')
from time_layers import *
from base_model import BaseModel


class Encoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        #vocab_size는어휘수(문자 종류) , 문자벡터차원수, 은닉상태 벡터 차원수 
        rn = np.random.randn

        embed_W = (rn(V, D) / 100).astype('f') #가중치 매개변수 초기화
        lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4 * H).astype('f')

        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=False)#timeLSTM 계층이 상태를 유지하지 않음

        self.params = self.embed.params + self.lstm.params #매개변수 기울기 보관
        self.grads = self.embed.grads + self.lstm.grads
        self.hs = None

    def forward(self, xs): #순전파 
        xs = self.embed.forward(xs)
        hs = self.lstm.forward(xs)
        self.hs = hs
        return hs[:, -1, :] #마지막 은닉상태만을 추출해 forward 메서드 출력 반환

    def backward(self, dh):
        dhs = np.zeros_like(self.hs) # 원소가 모두 0인 텐서 생성
        dhs[:, -1, :] = dh #마지막 은닉상태 dhs의 해당위치에 할당

        dout = self.lstm.backward(dhs)
        dout = self.embed.backward(dout)
        return dout

Decoder 클래스

encoder 클래스가 출력한 h를 받아 목적으로 하는 다른 문자열을 출력한다. decoder는 RNN으로 구현할 수 있다. 

정답 데이터는 _62이고, 입력 데이터는 ['_','6','2',' ']로 주고, 대응하는 출력이 ['6','2',' ', ' ']이 되도록 한다. 이 문장생성 방법에는 확률적(확률 분포 바탕) 이 아닌 결정적(가장 높은 확률)로 선택한다. 

 

decoder 문자열 생성 순서는 다음과 같다.

argmax 노드는 affine 계층의 출력 중 값이 가장 큰 원소의 인덱스를 반환한다. softmax 를 사용하지 않고 affine 계층이 출력하는 점수가 가장 큰 문자 ID를 반환한다. (softmax 를 생략하는 이유는 softmax 계층이  입력된 벡터를 정규화하는데, 정규화 과정에서 벡터의 각 원소값이 달라져도 대소 관계는 바뀌지 않기 때문이다) 

 

decoder 클래스는 다음과 같이 구성된다. 

class Decoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4 * H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)#timelstm이 상태를 갖도록함, encoder의 h를 유지한다.
        self.affine = TimeAffine(affine_W, affine_b)

        self.params, self.grads = [], []
        for layer in (self.embed, self.lstm, self.affine):
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, xs, h):
        self.lstm.set_state(h)

        out = self.embed.forward(xs)
        out = self.lstm.forward(out)
        score = self.affine.forward(out)
        return score

    def backward(self, dscore): #위쪽 계층에서 기울기 dscore를받아옴 
        dout = self.affine.backward(dscore)
        dout = self.lstm.backward(dout)
        dout = self.embed.backward(dout)
        dh = self.lstm.dh #timelstm 기울기 dh 반환
        return dh

    def generate(self, h, start_id, sample_size):#encoder로부터 받은 은닉상태 h, 생성문자수 sample_size
        sampled = []
        sample_id = start_id
        self.lstm.set_state(h)

        for _ in range(sample_size):
            x = np.array(sample_id).reshape((1, 1))
            out = self.embed.forward(x)
            out = self.lstm.forward(out)
            score = self.affine.forward(out)

            sample_id = np.argmax(score.flatten()) #affine 계층이 출력하는 점수가 가장 큰 문자 ID를 선택
            sampled.append(int(sample_id))

        return sampled

time affine -> time LSTM -> time embedding 계층 순서로 전파 

Time LSTM 계층 시간방향 기울기는 TimeLSTM 클래스의 인스턴스 변수 dh에 저장되어있음. 이 시간 방향의 기울기 dh를 꺼내서 backward() 출력으로 반환. decoder 클래스는 학습시와 문장 생성시의 동작이 다르다. 

seq2seq 클래스

seq2seq는 encoder 클래스와 decoder 클래스를 연결해준다

코드는 사실상 위의 코드와 같음

class Seq2seq(BaseModel):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        self.encoder = Encoder(V, D, H)
        self.decoder = Decoder(V, D, H)
        self.softmax = TimeSoftmaxWithLoss()

        self.params = self.encoder.params + self.decoder.params
        self.grads = self.encoder.grads + self.decoder.grads

    def forward(self, xs, ts):
        decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:]

        h = self.encoder.forward(xs)
        score = self.decoder.forward(decoder_xs, h)
        loss = self.softmax.forward(score, decoder_ts)
        return loss

    def backward(self, dout=1):
        dout = self.softmax.backward(dout)
        dh = self.decoder.backward(dout)
        dout = self.encoder.backward(dh)
        return dout

    def generate(self, xs, start_id, sample_size):
        h = self.encoder.forward(xs)
        sampled = self.decoder.generate(h, start_id, sample_size)
        return sampled

seq2seq 평가

1) 학습 데이터에서 미니배치 선택

2) 미니배치로부터 기울기계산

3) 기울기를 사용하여 매개변수 갱신

seq2seq가 문자열 생성을 수행하여 학습 중간 정답률을 측정한다.

import sys
sys.path.append('..')
import numpy as np
import matplotlib.pyplot as plt
from dataset import sequence
from optimizer import Adam
from trainer import Trainer
from util import eval_seq2seq
from seq2seq import Seq2seq
from peeky_seq2seq import PeekySeq2seq


# 데이터셋 읽기
(x_train, t_train), (x_test, t_test) = sequence.load_data('addition.txt')
char_to_id, id_to_char = sequence.get_vocab()

# 하이퍼 파라미터 설정
vocab_size = len(char_to_id)
wordvec_size = 16
hidden_size = 128
batch_size = 128
max_epoch = 25
max_grad = 5.0

#모델, 옵티마지어, 트레이너 생성
model = Seq2seq(vocab_size, wordvec_size, hidden_size) 
optimizer = Adam()
trainer = Trainer(model, optimizer)

acc_list = []
for epoch in range(max_epoch):
    trainer.fit(x_train, t_train, max_epoch=1,
                batch_size=batch_size, max_grad=max_grad)

    correct_num = 0
    for i in range(len(x_test)):
        question, correct = x_test[[i]], t_test[[i]]
        verbose = i < 10
        correct_num += eval_seq2seq(model, question, correct,
                                    id_to_char, verbose, is_reverse)

    acc = float(correct_num) / len(x_test) #평가 척도로 정답률 사용
    acc_list.append(acc)
    print('val acc %.3f%%' % (acc * 100))

정답률 측정에는 다음과 같은 메서드를 사용했다.

def eval_seq2seq(model, question, correct, id_to_char,
                 verbose=False, is_reverse=False):
    correct = correct.flatten()
    start_id = correct[0]
    correct = correct[1:]
    guess = model.generate(question, start_id, len(correct))

    question = ''.join([id_to_char[int(c)] for c in question.flatten()])
    correct = ''.join([id_to_char[int(c)] for c in correct])
    guess = ''.join([id_to_char[int(c)] for c in guess])

    if verbose:
        if is_reverse:
            question = question[::-1]

        colors = {'ok': '\033[92m', 'fail': '\033[91m', 'close': '\033[0m'}
        print('Q', question)
        print('T', correct)

        is_windows = os.name == 'nt'

        if correct == guess:
            mark = colors['ok'] + '☑' + colors['close']
            if is_windows:
                mark = 'O'
            print(mark + ' ' + guess)
        else:
            mark = colors['fail'] + '☒' + colors['close']
            if is_windows:
                mark = 'X'
            print(mark + ' ' + guess)
        print('---')

    return 1 if guess == correct else 0 #답이 맞으면 1, 아니면 0을 반환

코드를 실행하면 다음결과가 나타난다. T가 답이고 아래 x는 모델이 내놓은 답이다. 정답을 맞추면☑를 반환하게 된다.  

학습이 진행됨에 따라 accuracy는 다음과 같다. 초기에는 정답을 거의 맞추지 못하다가 epoch이 20을 조금 넘었을 때 정답률이 약 10%정도인것을 알 수 있다.