昨今、深層学習の技術の発展によって、自然言語処理界隈においても深層学習を活用する例で賑わってきている印象です。
今回は、自然言語処理における機械翻訳や対話モデルなどタスクで利用されるSequence to sequence model: Seq2Seqと、その改良版であるAttention Seq2SeqのPythonによる実装コードを紹介します。
機械学習や自然言語処理に興味のある方や、実際に手を動かして学びたい方にとって参考になれば幸いです。
系列変換モデル(Sequence to Sequence Model: Seq2Seq)
自然言語処理の典型的な応用タスクとして、機械翻訳、文章要約、対話、質問応答、文章分類などが考えられます。
特に以下のタスクらは文章から文章への変換とみなせるタスクであり、系列から系列への変換を行うモデルで対応できそうなことがわかります。
- 機械翻訳 = 「翻訳元の言語の文章」から「翻訳先の言語の文章」への変換
- 対話・質疑応答 = 「相手の発言の文章」から「自分の発言の文章」への変換
- 要約 = 「元の文章」から「要約された文章」への変換
これについて以下の論文で提案された、受け取った系列を別の系列に変換する確率をモデル化したものが系列変換モデル(Sequence to Sequence Model)です。
- Sequence to Sequence Learning with Neural Networks: https://arxiv.org/abs/1409.3215
系列変換モデルはエンコーダー(符号化器)とデコーダー(復号化器)の2つの要素から構成され、エンコーダーでは入力文の単語のone-hotベクトル列を受け取り、RNNによって隠れベクトルを保存して、デコーダーではエンコーダーが作成した隠れ層のベクトル表現を用いて出力文を生成します。
系列変換モデルが盛んに研究されるようになってから、文献ごとにSequence-to-sequence、符号化復号化モデル(Encoder Decoder Model)、End-to-end学習といった新しい用語が使われるようになったみたいですが、概念的にはいずれも同じものと考えていいみたいです。
後述のAttentionの詳しい解説や、このような自然言語処理における深層学習適用の発展の経緯などについては、下記書籍が参考になるのでおすすめです。
Seq2Seqによる対話モデルの実装
実際に実装について見ていきます。
実はChainerやTensorFlowの公式チュートリアルにもSeq2Seqモデルの実装例が公開されています。
- TensorFlow: https://www.tensorflow.org/tutorials/seq2seq
特にChainerのチュートリアルコードは機械翻訳タスク用に、精度評価指標としてBLEU(BiLingual Evaluation Understudy)が使われています。
BLEUは、機械翻訳の分野において一般的に用いられる自動評価基準で、N-gramマッチ率に基づく手法を用いています。
今回ここで実装するコードに関しては特に評価基準などは使わず、実際に学習させたモデルで簡単な対話の出力させてみてその様子を確認してみることにします。
まずはデータコンバータークラスを以下のように作成します。
日本語で考える場合、文章を形態素解析で単語レベルに分解し、また単語自体も数値に置き換えることでモデルが認識できるようになります。
数値系列を受け取り、数値系列を返却するので、逆にどの数値がどの単語を表すのかも覚えておく必要があります。
import datetime
import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L
import MeCab
# GPUのセット
gpu = -1
if gpu >= 0: # numpyかcuda.cupyか
xp = chainer.cuda.cupy
chainer.cuda.get_device(gpu).use()
else:
xp = np
# データ変換クラスの定義
class DataConverter:
def __init__(self, batch_col_size):
"""クラスの初期化
Args:
batch_col_size: 学習時のミニバッチ単語数サイズ
"""
self.mecab = MeCab.Tagger() # 形態素解析器
self.vocab = {"<eos>":0, "<unknown>": 1} # 単語辞書
self.batch_col_size = batch_col_size
def load(self, data):
"""学習時に、教師データを読み込んでミニバッチサイズに対応したNumpy配列に変換する
Args:
data: 対話データ
"""
# 単語辞書の登録
self.vocab = {"<eos>":0, "<unknown>": 1} # 単語辞書を初期化
for d in data:
sentences = [d[0][0], d[1][0]] # 入力文、返答文
for sentence in sentences:
sentence_words = self.sentence2words(sentence) # 文章を単語に分解する
for word in sentence_words:
if word not in self.vocab:
self.vocab[word] = len(self.vocab)
# 教師データのID化と整理
queries, responses = [], []
for d in data:
query, response = d[0][0], d[1][0] # エンコード文、デコード文
queries.append(self.sentence2ids(sentence=query, train=True, sentence_type="query"))
responses.append(self.sentence2ids(sentence=response, train=True, sentence_type="response"))
self.train_queries = xp.vstack(queries)
self.train_responses = xp.vstack(responses)
def sentence2words(self, sentence):
"""文章を単語の配列にして返却する
Args:
sentence: 文章文字列
"""
sentence_words = []
for m in self.mecab.parse(sentence).split("\n"): # 形態素解析で単語に分解する
w = m.split("\t")[0].lower() # 単語
if len(w) == 0 or w == "eos": # 不正文字、EOSは省略
continue
sentence_words.append(w)
sentence_words.append("<eos>") # 最後にvocabに登録している<eos>を代入する
return sentence_words
def sentence2ids(self, sentence, train=True, sentence_type="query"):
"""文章を単語IDのNumpy配列に変換して返却する
Args:
sentence: 文章文字列
train: 学習用かどうか
sentence_type: 学習用でミニバッチ対応のためのサイズ補填方向をクエリー・レスポンスで変更するため"query"or"response"を指定
Returns:
ids: 単語IDのNumpy配列
"""
ids = [] # 単語IDに変換して格納する配列
sentence_words = self.sentence2words(sentence) # 文章を単語に分解する
for word in sentence_words:
if word in self.vocab: # 単語辞書に存在する単語ならば、IDに変換する
ids.append(self.vocab[word])
else: # 単語辞書に存在しない単語ならば、<unknown>に変換する
ids.append(self.vocab["<unknown>"])
# 学習時は、ミニバッチ対応のため、単語数サイズを調整してNumpy変換する
if train:
if sentence_type == "query": # クエリーの場合は前方にミニバッチ単語数サイズになるまで-1を補填する
while len(ids) > self.batch_col_size: # ミニバッチ単語サイズよりも大きければ、ミニバッチ単語サイズになるまで先頭から削る
ids.pop(0)
ids = xp.array([-1]*(self.batch_col_size-len(ids))+ids, dtype="int32")
elif sentence_type == "response": # レスポンスの場合は後方にミニバッチ単語数サイズになるまで-1を補填する
while len(ids) > self.batch_col_size: # ミニバッチ単語サイズよりも大きければ、ミニバッチ単語サイズになるまで末尾から削る
ids.pop()
ids = xp.array(ids+[-1]*(self.batch_col_size-len(ids)), dtype="int32")
else: # 予測時は、そのままNumpy変換する
ids = xp.array([ids], dtype="int32")
return ids
def ids2words(self, ids):
"""予測時に、単語IDのNumpy配列を単語に変換して返却する
Args:
ids: 単語IDのNumpy配列
Returns:
words: 単語の配列
"""
words = [] # 単語を格納する配列
for i in ids: # 順番に単語IDを単語辞書から参照して単語に変換する
words.append(list(self.vocab.keys())[list(self.vocab.values()).index(i)])
return words
次にSeq2Seqモデルを作ってみます。
エンコーダー用のLSTMとデコーダー用のLSTMを用意して、受け取った系列をエンコーダーが読み取り、デコーダーがまた系列を出力するように作ります。
# モデルクラスの定義
class LSTM_Encoder(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Encoderのインスタンス化
Args:
vocab_size: 使われる単語の種類数
embed_size: 単語をベクトル表現した際のサイズ
hidden_size: 隠れ層のサイズ
"""
super(LSTM_Encoder, self).__init__(
xe = L.EmbedID(vocab_size, embed_size, ignore_label=-1),
eh = L.Linear(embed_size, 4 * hidden_size),
hh = L.Linear(hidden_size, 4 * hidden_size)
)
def __call__(self, x, c, h):
"""Encoderの計算
Args:
x: one-hotな単語
c: 内部メモリ
h: 隠れ層
Returns:
次の内部メモリ, 次の隠れ層
"""
e = F.tanh(self.xe(x))
return F.lstm(c, self.eh(e) + self.hh(h))
class LSTM_Decoder(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Decoderのインスタンス化
Args:
vocab_size: 使われる単語の種類数(語彙数)
embed_size: 単語をベクトル表現した際のサイズ
hidden_size: 隠れ層のサイズ
"""
super(LSTM_Decoder, self).__init__(
ye = L.EmbedID(vocab_size, embed_size, ignore_label=-1),
eh = L.Linear(embed_size, 4 * hidden_size),
hh = L.Linear(hidden_size, 4 * hidden_size),
he = L.Linear(hidden_size, embed_size),
ey = L.Linear(embed_size, vocab_size)
)
def __call__(self, y, c, h):
"""Decoderの計算
Args:
y: one-hotな単語
c: 内部メモリ
h: 隠れ層
Returns:
予測単語、次の内部メモリ、次の隠れ層
"""
e = F.tanh(self.ye(y))
c, h = F.lstm(c, self.eh(e) + self.hh(h))
t = self.ey(F.tanh(self.he(h)))
return t, c, h
class Seq2Seq(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Seq2Seqのインスタンス化
Args:
vocab_size: 語彙サイズ
embed_size: 単語ベクトルのサイズ
hidden_size: 中間ベクトルのサイズ
"""
super(Seq2Seq, self).__init__(
encoder = LSTM_Encoder(vocab_size, embed_size, hidden_size), # Encoderのインスタンス化
decoder = LSTM_Decoder(vocab_size, embed_size, hidden_size) # Decoderのインスタンス化
)
self.vocab_size = vocab_size
self.embed_size = embed_size
self.hidden_size = hidden_size
self.decode_max_size = 20 # デコードはEOSが出力されれば終了する、出力されない場合の最大出力語彙数
def encode(self, words, batch_size):
"""Encoderの計算
Args:
words: 単語が記録されたリスト
"""
# 内部メモリ、中間ベクトルの初期化
c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
# エンコーダーに単語を順番に読み込ませる
for w in words:
c, h = self.encoder(w, c, h)
self.h = h # 計算した中間ベクトルをデコーダーに引き継ぐためにインスタンス変数にする
self.c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32')) # 内部メモリは引き継がないので、初期化
def decode(self, w):
"""Decoderの計算
Args:
w: 単語
Returns:
単語数サイズのベクトル
"""
t, self.c, self.h = self.decoder(w, self.c, self.h)
return t
def reset(self):
"""勾配の初期化
"""
self.zerograds()
def __call__(self, enc_words, dec_words=None, train=True):
"""順伝播の計算を行う関数
Args:
enc_words: 発話文の単語を記録したリスト
dec_words: 応答文の単語を記録したリスト
Returns:
計算した損失の合計 or 予測したデコード文字列
"""
enc_words = enc_words.T
if train:
dec_words = dec_words.T
batch_size = len(enc_words[0]) # バッチサイズを記録
self.reset() # model内に保存されている勾配をリセット
enc_words = [chainer.Variable(xp.array(row, dtype='int32')) for row in enc_words] # 発話リスト内の単語を、chainerの型であるVariable型に変更
self.encode(enc_words, batch_size) # エンコードの計算
t = chainer.Variable(xp.array([0 for _ in range(batch_size)], dtype='int32')) # <eos>をデコーダーに読み込ませる
loss = chainer.Variable(xp.zeros((), dtype='float32')) # 損失の初期化
ys = [] # デコーダーが生成するデコード文字列を格納する配列
# デコーダーの計算
if train: # 学習の場合は損失を計算する
for w in dec_words:
y = self.decode(t) # 1単語ずつをデコードする
t = chainer.Variable(xp.array(w, dtype='int32')) # 正解単語をVariable型に変換
loss += F.softmax_cross_entropy(y, t) # 正解単語と予測単語を照らし合わせて損失を計算
return loss
else: # 予測の場合はデコード文字列を生成する
for i in range(self.decode_max_size):
y = self.decode(t)
y = np.argmax(y.data) # 確率で出力されたままなので、確率が高い予測単語を取得する
ys.append(y)
t = chainer.Variable(xp.array([y], dtype='int32'))
if y == 0: # EOSを出力したならばデコードを終了する
break
return ys
例えば、以下のような簡単な問いかけ文章→応答文章の教師データがあるとして、以下のようにして学習させることができます。
# 教師データ
data = [
[["初めまして。"], ["初めまして。よろしくお願いします。"]],
[["どこから来たんですか?"], ["日本から来ました。"]],
[["日本のどこに住んでるんですか?"], ["東京に住んでいます。"]],
[["仕事は何してますか?"], ["私は会社員です。"]],
[["お会いできて嬉しかったです。"], ["私もです!"]],
[["おはよう。"], ["おはようございます。"]],
[["いつも何時に起きますか?"], ["6時に起きます。"]],
[["朝食は何を食べますか?"], ["たいていトーストと卵を食べます。"]],
[["朝食は毎日食べますか?"], ["たまに朝食を抜くことがあります。"]],
[["野菜をたくさん取っていますか?"], ["毎日野菜を取るようにしています。"]],
[["週末は何をしていますか?"], ["友達と会っていることが多いです。"]],
[["どこに行くのが好き?"], ["私たちは渋谷に行くのが好きです。"]]
]
# 定数
embed_size = 100
hidden_size = 100
batch_size = 6 # ミニバッチ学習のバッチサイズ数
epoch_num = 50 # エポック数
N = len(data) # 教師データの数
# 教師データの読み込み
data_converter = DataConverter(batch_col_size=batch_col_size) # データコンバーター
data_converter.load(data) # 教師データ読み込み
vocab_size = len(data_converter.vocab) # 単語数
# モデルの宣言
model = Seq2Seq(vocab_size=vocab_size, embed_size=embed_size, hidden_size=hidden_size)
opt = chainer.optimizers.Adam()
opt.setup(model)
opt.add_hook(chainer.optimizer.GradientClipping(5))
if gpu >= 0:
model.to_gpu(gpu)
model.reset()
# 学習
st = datetime.datetime.now()
for epoch in range(epoch_num):
# ミニバッチ学習
perm = np.random.permutation(N) # ランダムな整数列リストを取得
total_loss = 0
for i in range(0, N, batch_size):
enc_words = data_converter.train_queries[perm[i:i+batch_size]]
dec_words = data_converter.train_responses[perm[i:i+batch_size]]
model.reset()
loss = model(enc_words=enc_words, dec_words=dec_words, train=True)
loss.backward()
loss.unchain_backward()
total_loss += loss.data
opt.update()
if (epoch+1)%10 == 0:
ed = datetime.datetime.now()
print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
st = datetime.datetime.now()
epoch: 10 total loss: 69.04533767700195 time: 0:00:01.822704
epoch: 20 total loss: 43.514638900756836 time: 0:00:01.808647
epoch: 30 total loss: 23.216137886047363 time: 0:00:01.838429
epoch: 40 total loss: 9.63737440109253 time: 0:00:01.819828
epoch: 50 total loss: 4.097650527954102 time: 0:00:01.827059
損失が下がっていっているのが分かります。
同じ問いかけ文章ですが、学習させたモデルに文章を入力すると、応答文章を予測(出力)してくれるようになりました。
def predict(model, query):
enc_query = data_converter.sentence2ids(query, train=False)
dec_response = model(enc_words=enc_query, train=False)
response = data_converter.ids2words(dec_response)
print(query, "=>", response)
predict(model, "初めまして。")
predict(model, "どこから来たんですか?")
predict(model, "日本のどこに住んでるんですか?")
predict(model, "仕事は何してますか?")
predict(model, "お会いできて嬉しかったです。")
predict(model, "おはよう。")
predict(model, "いつも何時に起きますか?")
predict(model, "朝食は何を食べますか?")
predict(model, "朝食は毎日食べますか?")
predict(model, "野菜をたくさん取っていますか?")
predict(model, "週末は何をしていますか?")
predict(model, "どこに行くのが好き?")
初めまして。 => ['初め', 'まして', '。', 'よろしく', 'お願い', 'し', 'ます', '。', '<eos>']
どこから来たんですか? => ['日本', 'から', '来', 'まし', 'た', '。', '<eos>']
日本のどこに住んでるんですか? => ['東京', 'に', '住ん', 'で', 'い', 'ます', '。', '<eos>']
仕事は何してますか? => ['私', 'は', '会社', '員', 'です', '。', '<eos>']
お会いできて嬉しかったです。 => ['私', 'も', 'です', '!', '<eos>']
おはよう。 => ['おはよう', 'ござい', 'ます', '。', '<eos>']
いつも何時に起きますか? => ['6', '時', 'に', '起き', 'ます', '。', '<eos>']
朝食は何を食べますか? => ['たいてい', 'トースト', 'と', '卵', 'を', '食べ', 'ます', '。', '<eos>']
朝食は毎日食べますか? => ['たまに', '朝食', 'を', '抜く', 'こと', 'が', 'あり', 'ます', '。', '<eos>']
野菜をたくさん取っていますか? => ['毎日', '野菜', 'を', '取る', 'よう', 'に', 'し', 'て', 'い', 'ます', '。', '<eos>']
週末は何をしていますか? => ['友達', 'と', '会っ', 'て', 'いる', 'こと', 'が', '多い', 'です', '。', '<eos>']
どこに行くのが好き? => ['私', 'たち', 'は', '渋谷', 'に', '行く', 'の', 'が', '好き', 'です', '。', '<eos>']
注意機構(Attention)
複数のベクトルがあった時に、どのベクトルを重要視するかも含めて学習させる仕組みのことを注意機構(Attention mechanism)といいます。
特に, 複数ベクトルの重み付き平均を使う方法をソフト注意機構(Soft attention mechanism)といいます。
通常のSequence to Sequence Modelでは、エンコーダーで入力文章を通した最後の隠れ層ベクトルを使う仕組み上、入力文章前半の情報が反映されづらくなってしまいます。
これに対して、英語では文章を逆順にしたり、日本語・中国語はそのままの順序で読み込ませるなどをすることで、精度を上げることはできることが報告されていますが、本質的な解決にはなりません。
そこで、入力文章の各単語のベクトルをエンコードした時の隠れ層ベクトルをそれぞれ保存しておき、デコード時に、どの隠れ層ベクトルをどのくらいの重みで見るべきかまでを学習させるものが、ソフト注意機構になります。
ちなみにこれに対し、見るべき隠れ層ベクトルを一つに決めてしまう方法もあり、ハード注意機構(Hard attention mecanism)といいます。
詳しくは文献や前述で紹介した書籍などが参考になります。
- Neural Machine Translation by Jointly Learning to Align and Translate: https://arxiv.org/abs/1409.0473
それでは先ほど作成したSeq2Seqにソフト注意機構の方法でAttentionを追加実装してみようと思います。
Attention Seq2Seqによる対話モデルの実装
データコンバータークラスは先ほどのものを活用できます。
モデルクラスは以下のように作成してみました。
エンコーダーのLSTMは概ね変わらず、デコーダーにAttentionの仕組みを取り入れています。
また、エンコーダーも今度は順向きの文章の読み込みだけでなく、逆向きの文章の読み込みも加えた、Bi-directionalな入力を学習するようにしてみました。(なのでより正確にはBi-directional LSTMを用いたAttention Seq2Seqといったところでしょうか)
# モデルクラスの定義
# LSTMエンコーダークラス
class LSTMEncoder(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Encoderのインスタンス化
Args:
vocab_size: 使われる単語の種類数
embed_size: 単語をベクトル表現した際のサイズ
hidden_size: 隠れ層のサイズ
"""
super(LSTMEncoder, self).__init__(
xe = L.EmbedID(vocab_size, embed_size, ignore_label=-1),
eh = L.Linear(embed_size, 4 * hidden_size),
hh = L.Linear(hidden_size, 4 * hidden_size)
)
def __call__(self, x, c, h):
"""Encoderの計算
Args:
x: one-hotな単語
c: 内部メモリ
h: 隠れ層
Returns:
次の内部メモリ、次の隠れ層
"""
e = F.tanh(self.xe(x))
return F.lstm(c, self.eh(e) + self.hh(h))
# Attention Model + LSTMデコーダークラス
class AttLSTMDecoder(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Attention ModelのためのDecoderのインスタンス化
Args:
vocab_size: 語彙数
embed_size: 単語ベクトルのサイズ
hidden_size: 隠れ層のサイズ
"""
super(AttLSTMDecoder, self).__init__(
ye = L.EmbedID(vocab_size, embed_size, ignore_label=-1), # 単語を単語ベクトルに変換する層
eh = L.Linear(embed_size, 4 * hidden_size), # 単語ベクトルを隠れ層の4倍のサイズのベクトルに変換する層
hh = L.Linear(hidden_size, 4 * hidden_size), # Decoderの中間ベクトルを隠れ層の4倍のサイズのベクトルに変換する層
fh = L.Linear(hidden_size, 4 * hidden_size), # 順向きEncoderの中間ベクトルの加重平均を隠れ層の4倍のサイズのベクトルに変換する層
bh = L.Linear(hidden_size, 4 * hidden_size), # 順向きEncoderの中間ベクトルの加重平均を隠れ層の4倍のサイズのベクトルに変換する層
he = L.Linear(hidden_size, embed_size), # 隠れ層サイズのベクトルを単語ベクトルのサイズに変換する層
ey = L.Linear(embed_size, vocab_size) # 単語ベクトルを語彙数サイズのベクトルに変換する層
)
def __call__(self, y, c, h, f, b):
"""Decoderの計算
Args:
y: Decoderに入力する単語
c: 内部メモリ
h: Decoderの中間ベクトル
f: Attention Modelで計算された順向きEncoderの加重平均
b: Attention Modelで計算された逆向きEncoderの加重平均
Returns:
語彙数サイズのベクトル、更新された内部メモリ、更新された中間ベクトル
"""
e = F.tanh(self.ye(y)) # 単語を単語ベクトルに変換
c, h = F.lstm(c, self.eh(e) + self.hh(h) + self.fh(f) + self.bh(b)) # 単語ベクトル、Decoderの中間ベクトル、順向きEncoderのAttention、逆向きEncoderのAttentionを使ってLSTM
t = self.ey(F.tanh(self.he(h))) # LSTMから出力された中間ベクトルを語彙数サイズのベクトルに変換する
return t, c, h
# Attentionモデルクラス
class Attention(chainer.Chain):
def __init__(self, hidden_size):
"""Attentionのインスタンス化
Args:
hidden_size: 隠れ層のサイズ
"""
super(Attention, self).__init__(
fh = L.Linear(hidden_size, hidden_size), # 順向きのEncoderの中間ベクトルを隠れ層サイズのベクトルに変換する線形結合層
bh = L.Linear(hidden_size, hidden_size), # 逆向きのEncoderの中間ベクトルを隠れ層サイズのベクトルに変換する線形結合層
hh = L.Linear(hidden_size, hidden_size), # Decoderの中間ベクトルを隠れ層サイズのベクトルに変換する線形結合層
hw = L.Linear(hidden_size, 1), # 隠れ層サイズのベクトルをスカラーに変換するための線形結合層
)
self.hidden_size = hidden_size # 隠れ層のサイズを記憶
def __call__(self, fs, bs, h):
"""Attentionの計算
Args:
fs: 順向きのEncoderの中間ベクトルが記録されたリスト
bs: 逆向きのEncoderの中間ベクトルが記録されたリスト
h: Decoderで出力された中間ベクトル
Returns:
順向きのEncoderの中間ベクトルの加重平均、逆向きのEncoderの中間ベクトルの加重平均
"""
batch_size = h.data.shape[0] # ミニバッチのサイズを記憶
ws = [] # ウェイトを記録するためのリストの初期化
sum_w = chainer.Variable(xp.zeros((batch_size, 1), dtype='float32')) # ウェイトの合計値を計算するための値を初期化
# Encoderの中間ベクトルとDecoderの中間ベクトルを使ってウェイトの計算
for f, b in zip(fs, bs):
w = F.tanh(self.fh(f)+self.bh(b)+self.hh(h)) # 順向きEncoderの中間ベクトル、逆向きEncoderの中間ベクトル、Decoderの中間ベクトルを使ってウェイトの計算
w = F.exp(self.hw(w)) # softmax関数を使って正規化する
ws.append(w) # 計算したウェイトを記録
sum_w += w
# 出力する加重平均ベクトルの初期化
att_f = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
att_b = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
for f, b, w in zip(fs, bs, ws):
w /= sum_w # ウェイトの和が1になるように正規化
# ウェイト * Encoderの中間ベクトルを出力するベクトルに足していく
att_f += F.reshape(F.batch_matmul(f, w), (batch_size, self.hidden_size))
att_b += F.reshape(F.batch_matmul(b, w), (batch_size, self.hidden_size))
return att_f, att_b
# Attention Sequence to Sequence Modelクラス
class AttSeq2Seq(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size, batch_col_size):
"""Attention + Seq2Seqのインスタンス化
Args:
vocab_size: 語彙数のサイズ
embed_size: 単語ベクトルのサイズ
hidden_size: 隠れ層のサイズ
"""
super(AttSeq2Seq, self).__init__(
f_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), # 順向きのEncoder
b_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), # 逆向きのEncoder
attention = Attention(hidden_size), # Attention Model
decoder = AttLSTMDecoder(vocab_size, embed_size, hidden_size) # Decoder
)
self.vocab_size = vocab_size
self.embed_size = embed_size
self.hidden_size = hidden_size
self.decode_max_size = batch_col_size # デコードはEOSが出力されれば終了する、出力されない場合の最大出力語彙数
# 順向きのEncoderの中間ベクトル、逆向きのEncoderの中間ベクトルを保存するためのリストを初期化
self.fs = []
self.bs = []
def encode(self, words, batch_size):
"""Encoderの計算
Args:
words: 入力で使用する単語記録されたリスト
batch_size: ミニバッチのサイズ
"""
c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
# 順向きのEncoderの計算
for w in words:
c, h = self.f_encoder(w, c, h)
self.fs.append(h) # 計算された中間ベクトルを記録
# 内部メモリ、中間ベクトルの初期化
c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
# 逆向きのEncoderの計算
for w in reversed(words):
c, h = self.b_encoder(w, c, h)
self.bs.insert(0, h) # 計算された中間ベクトルを記録
# 内部メモリ、中間ベクトルの初期化
self.c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
self.h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
def decode(self, w):
"""Decoderの計算
Args:
w: Decoderで入力する単語
Returns:
予測単語
"""
att_f, att_b = self.attention(self.fs, self.bs, self.h)
t, self.c, self.h = self.decoder(w, self.c, self.h, att_f, att_b)
return t
def reset(self):
"""インスタンス変数を初期化する
"""
# Encoderの中間ベクトルを記録するリストの初期化
self.fs = []
self.bs = []
# 勾配の初期化
self.zerograds()
def __call__(self, enc_words, dec_words=None, train=True):
"""順伝播の計算を行う関数
Args:
enc_words: 発話文の単語を記録したリスト
dec_words: 応答文の単語を記録したリスト
train: 学習か予測か
Returns:
計算した損失の合計 or 予測したデコード文字列
"""
enc_words = enc_words.T
if train:
dec_words = dec_words.T
batch_size = len(enc_words[0]) # バッチサイズを記録
self.reset() # model内に保存されている勾配をリセット
enc_words = [chainer.Variable(xp.array(row, dtype='int32')) for row in enc_words] # 発話リスト内の単語をVariable型に変更
self.encode(enc_words, batch_size) # エンコードの計算
t = chainer.Variable(xp.array([0 for _ in range(batch_size)], dtype='int32')) # <eos>をデコーダーに読み込ませる
loss = chainer.Variable(xp.zeros((), dtype='float32')) # 損失の初期化
ys = [] # デコーダーが生成する単語を記録するリスト
# デコーダーの計算
if train: # 学習の場合は損失を計算する
for w in dec_words:
y = self.decode(t) # 1単語ずつをデコードする
t = chainer.Variable(xp.array(w, dtype='int32')) # 正解単語をVariable型に変換
loss += F.softmax_cross_entropy(y, t) # 正解単語と予測単語を照らし合わせて損失を計算
return loss
else: # 予測の場合はデコード文字列を生成する
for i in range(self.decode_max_size):
y = self.decode(t)
y = xp.argmax(y.data) # 確率で出力されたままなので、確率が高い予測単語を取得する
ys.append(y)
t = chainer.Variable(xp.array([y], dtype='int32'))
if y == 0: # EOSを出力したならばデコードを終了する
break
return ys
同じ教師データを使って学習させると、損失が下がっていくことが確認できます。
# 定数
embed_size = 100
hidden_size = 100
batch_size = 6 # ミニバッチ学習のバッチサイズ数
batch_col_size = 15
epoch_num = 50 # エポック数
N = len(data) # 教師データの数
# 教師データの読み込み
data_converter = DataConverter(batch_col_size=batch_col_size) # データコンバーター
data_converter.load(data) # 教師データ読み込み
vocab_size = len(data_converter.vocab) # 単語数
# モデルの宣言
model = AttSeq2Seq(vocab_size=vocab_size, embed_size=embed_size, hidden_size=hidden_size, batch_col_size=batch_col_size)
opt = chainer.optimizers.Adam()
opt.setup(model)
opt.add_hook(chainer.optimizer.GradientClipping(5))
if gpu >= 0:
model.to_gpu(gpu)
model.reset()
# 学習
st = datetime.datetime.now()
for epoch in range(epoch_num):
# ミニバッチ学習
perm = np.random.permutation(N) # ランダムな整数列リストを取得
total_loss = 0
for i in range(0, N, batch_size):
enc_words = data_converter.train_queries[perm[i:i+batch_size]]
dec_words = data_converter.train_responses[perm[i:i+batch_size]]
model.reset()
loss = model(enc_words=enc_words, dec_words=dec_words, train=True)
loss.backward()
loss.unchain_backward()
total_loss += loss.data
opt.update()
if (epoch+1)%10 == 0:
ed = datetime.datetime.now()
print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
st = datetime.datetime.now()
epoch: 10 total loss: 63.68022537231445 time: 0:00:18.342773
epoch: 20 total loss: 34.956777572631836 time: 0:00:18.312278
epoch: 30 total loss: 16.539709091186523 time: 0:00:18.366164
epoch: 40 total loss: 7.058552265167236 time: 0:00:18.306779
epoch: 50 total loss: 3.2891041040420532 time: 0:00:18.359869
同様にして、問いかけ文章を入力すると応答文章が返却されます。
どうせならSeq2Seqももう少し難しい文章とかを学習・予測させて、精度に違いが体感で現れるかどうか試せば良かったですね。
def predict(model, query):
enc_query = data_converter.sentence2ids(query, train=False)
dec_response = model(enc_words=enc_query, train=False)
response = data_converter.ids2words(dec_response)
print(query, "=>", response)
predict(model, "初めまして。")
predict(model, "どこから来たんですか?")
predict(model, "日本のどこに住んでるんですか?")
predict(model, "仕事は何してますか?")
predict(model, "お会いできて嬉しかったです。")
predict(model, "おはよう。")
predict(model, "いつも何時に起きますか?")
predict(model, "朝食は何を食べますか?")
predict(model, "朝食は毎日食べますか?")
predict(model, "野菜をたくさん取っていますか?")
predict(model, "週末は何をしていますか?")
predict(model, "どこに行くのが好き?")
初めまして。 => ['初め', 'まして', '。', 'よろしく', 'お願い', 'し', 'ます', '。', '<eos>']
どこから来たんですか? => ['日本', 'から', '来', 'まし', 'た', '。', '<eos>']
日本のどこに住んでるんですか? => ['東京', 'に', '住ん', 'で', 'い', 'ます', '。', '<eos>']
仕事は何してますか? => ['私', 'は', '会社', '員', 'です', '。', '<eos>']
お会いできて嬉しかったです。 => ['私', 'も', 'です', '!', '<eos>']
おはよう。 => ['おはよう', 'ござい', 'ます', '。', '<eos>']
いつも何時に起きますか? => ['6', '時', 'に', '起き', 'ます', '。', '<eos>']
朝食は何を食べますか? => ['たいてい', 'トースト', 'と', '卵', 'を', '食べ', 'ます', '。', '<eos>']
朝食は毎日食べますか? => ['たまに', '朝食', 'を', '抜く', 'こと', 'が', 'あり', 'ます', '。', '<eos>']
野菜をたくさん取っていますか? => ['毎日', '野菜', 'を', '取る', 'よう', 'に', 'し', 'て', 'い', 'ます', '。', '<eos>']
週末は何をしていますか? => ['友達', 'と', '会っ', 'て', 'いる', 'こと', 'が', '多い', 'です', '。', '<eos>']
どこに行くのが好き? => ['私', 'たち', 'は', '渋谷', 'に', '行く', 'の', 'が', '好き', 'です', '。', '<eos>']
まとめ
Seq2SeqおよびAttention Seq2Seqの実装について紹介しました。
自然言語処理や機械学習の分野では日々新しい手法や改良が進んでいますが、系列変換モデルは依然として非常に有用であり、Google翻訳の機械翻訳のベースとしても活用されています。
参考としてのコード紹介のみの記事となりましたが、皆さんの学習や実践に役立てば幸いです。