
[Keras] Sentiment Analysis with Attention: Implementing Hierarchical Attention Networks for Document Classification

by 달죽 2020. 11. 10.

 

 

 

Dataset: IMDB

 

 

import tensorflow as tf

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Restrict TensorFlow to use only the first GPU
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
    except RuntimeError as e:
        print(e)
    
MAX_SENTENCES = 10
MAX_SENTENCE_LENGTH = 25
import os, re 
import pandas as pd 
import tensorflow as tf 
from tensorflow.keras import utils 


dataset = tf.keras.utils.get_file(fname='imdb.tar.gz',  # name to save the downloaded archive as
                                  origin="http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz",  # download URL
                                  extract=True)

basic_dir = os.path.dirname(dataset)
print(basic_dir)
os.path.join(basic_dir, 'aclImdb')  # os.path.join simply joins path components, so this gives the path of the extracted aclImdb folder one level below basic_dir

path_train_pos = os.path.join(basic_dir, 'aclImdb', 'train', 'pos') 
path_train_pos

#'C:\\Users\\user\\.keras\\datasets\\aclImdb\\train\\pos'

# refer: http://ai.stanford.edu/~amaas/data/sentiment/

import os

data_dir = os.path.join(basic_dir, 'aclImdb')
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')

def load_dataset(split='train'):
    x_data = []
    y_data = []
    
    base_dir = os.path.join(basic_dir, 'aclImdb')
    split_dir = os.path.join(base_dir, split)
    for sentiment, y in [('neg', 0), ('pos', 1)]:
        data_dir = os.path.join(split_dir, sentiment)
        for file_name in os.listdir(data_dir):
            file_path = os.path.join(data_dir, file_name)
            with open(file_path, 'r', encoding='utf-8') as fp:
                review = fp.read()
            x_data.append(review)
            y_data.append(y)
            
    return x_data, y_data
            
train_x_data, train_y_data = load_dataset(split='train')
test_x_data, test_y_data = load_dataset(split='test')

print("len(train_x_data): {}".format(len(train_x_data)))
print("len(test_x_data): {}".format(len(test_x_data)))

len(train_x_data): 25000

len(test_x_data): 25000

 

import nltk
nltk.download('punkt')

from nltk.tokenize import sent_tokenize


def average(x):
    return sum(x) / len(x)

avg_nb_words = average([len(review.strip().split()) for review in train_x_data])
print("Average # of words: {}".format(avg_nb_words))
# Average # of words: 233.7872

avg_nb_sentences = average([len(sent_tokenize(review)) for review in train_x_data])
print("Average # of sentences: {}".format(avg_nb_sentences))
# Average # of sentences: 10.83412

avg_nb_words_in_sentence = average([average([len(sent.strip().split()) for sent in sent_tokenize(review)]) for review in train_x_data])
print("Average # of words in sentence: {}".format(avg_nb_words_in_sentence))
# Average # of words in sentence: 24.953852357350815
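
These averages line up with the MAX_SENTENCES = 10 and MAX_SENTENCE_LENGTH = 25 constants set at the top. As an extra sanity check (my own addition, not part of the original post), you can also estimate what fraction of reviews fits inside those limits:

# Hedged sketch: coverage of the truncation limits chosen above.
# Assumes train_x_data, sent_tokenize, MAX_SENTENCES and MAX_SENTENCE_LENGTH from the cells above.
fit_sent = sum(len(sent_tokenize(r)) <= MAX_SENTENCES for r in train_x_data)
print("reviews with <= {} sentences: {:.1%}".format(MAX_SENTENCES, fit_sent / len(train_x_data)))

fit_words = sum(all(len(s.split()) <= MAX_SENTENCE_LENGTH for s in sent_tokenize(r))
                for r in train_x_data)
print("reviews whose sentences all fit in {} words: {:.1%}".format(MAX_SENTENCE_LENGTH, fit_words / len(train_x_data)))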

import numpy as np
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from nltk.tokenize import sent_tokenize

tokenizer = Tokenizer()
# Note: the tokenizer is fitted on both the train and test texts,
# so every test-set word also receives an index.
tokenizer.fit_on_texts(train_x_data)
tokenizer.fit_on_texts(test_x_data)

max_nb_words = len(tokenizer.word_index) + 1

def doc2hierarchical(text,
                     max_sentences=MAX_SENTENCES,
                     max_sentence_length=MAX_SENTENCE_LENGTH):
    sentences = sent_tokenize(text)
    tokenized_sentences = tokenizer.texts_to_sequences(sentences)
    tokenized_sentences = pad_sequences(tokenized_sentences, maxlen=max_sentence_length)

    pad_size = max_sentences - tokenized_sentences.shape[0]

    if pad_size <= 0:  # already has at least max_sentences sentences, so truncate
        tokenized_sentences = tokenized_sentences[:max_sentences]
    else:
        tokenized_sentences = np.pad(
            tokenized_sentences, ((0, pad_size), (0, 0)),
            mode='constant', constant_values=0
        )
    
    return tokenized_sentences
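
A quick look at what this returns (my own check, not in the original post): a single review becomes a (MAX_SENTENCES, MAX_SENTENCE_LENGTH) integer array, padded with zeros.

# Hedged sketch: encode one review and inspect the result.
sample = doc2hierarchical(train_x_data[0])
print(sample.shape)   # expected: (10, 25)
print(sample[0])      # word indices of the first sentence, left-padded with zeros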
            
def build_dataset(x_data, y_data, 
                  max_sentences=MAX_SENTENCES, 
                  max_sentence_length=MAX_SENTENCE_LENGTH,
                  tokenizer=tokenizer):
    
    nb_instances = len(x_data)
    X_data = np.zeros((nb_instances, max_sentences, max_sentence_length), dtype='int32')
    for i, review in enumerate(x_data):
        tokenized_sentences = doc2hierarchical(review)
            
        X_data[i] = tokenized_sentences[None, ...]
        
    nb_classes = len(set(y_data))
    Y_data = to_categorical(y_data, nb_classes)
    
    return X_data, Y_data

train_X_data, train_Y_data = build_dataset(train_x_data, train_y_data)
test_X_data, test_Y_data = build_dataset(test_x_data, test_y_data)

print("train_X_data.shape: {}".format(train_X_data.shape))
print("test_X_data.shape: {}".format(test_X_data.shape))

train_X_data.shape: (25000, 10, 25)

test_X_data.shape: (25000, 10, 25)

 

from sklearn.model_selection import train_test_split
train_X_data, val_X_data, train_Y_data, val_Y_data = train_test_split(train_X_data, train_Y_data, 
                                                                      test_size=0.1, 
                                                                      random_state=42)

print("train_X_data.shape: {}".format(train_X_data.shape))
print("train_Y_data.shape: {}".format(train_Y_data.shape))
print("val_X_data.shape: {}".format(val_X_data.shape))
print("val_Y_data.shape: {}".format(val_Y_data.shape))

train_X_data.shape: (22500, 10, 25)

train_Y_data.shape: (22500, 2)

val_X_data.shape: (2500, 10, 25)

val_Y_data.shape: (2500, 2)

 

embedding_dir = './embeddings/'

def load_word2vec(tokenizer=tokenizer):
    # Assumes the pretrained GoogleNews-vectors-negative300.bin file has already
    # been downloaded into ./embeddings/ (get_file above does not fetch it).
    from gensim.models import KeyedVectors
    embedding_path = os.path.join(embedding_dir, 'GoogleNews-vectors-negative300.bin')
    embeddings_index = KeyedVectors.load_word2vec_format(embedding_path, binary=True)
    
    return embeddings_index
    
def load_embedding(embedding_type='word2vec',
                   tokenizer=tokenizer,
                   embedding_dim=300):
    
    if embedding_type == 'word2vec':
        embeddings_index = load_word2vec()
        
    embedding_matrix = np.random.normal(0, 1, (max_nb_words, embedding_dim))
    for word, i in tokenizer.word_index.items():
        try:
            embedding_vector = embeddings_index[word]
        except KeyError:
            embedding_vector = None
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector
            
    return embedding_matrix

embedding_matrix = load_embedding('word2vec')

print("embedding_matrix.shape: {}".format(embedding_matrix.shape))

embedding_matrix.shape: (124253, 300)
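
Out of curiosity (this check is my own addition, not in the original post), you can also count how many vocabulary words actually got a pretrained vector instead of a random one. It reloads the word2vec file, so it is slow:

# Hedged sketch: vocabulary coverage of the pretrained word2vec vectors.
from gensim.models import KeyedVectors

w2v = KeyedVectors.load_word2vec_format(
    os.path.join(embedding_dir, 'GoogleNews-vectors-negative300.bin'), binary=True)
covered = sum(1 for word in tokenizer.word_index if word in w2v)
print("{} / {} words have a pretrained vector".format(covered, len(tokenizer.word_index)))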

 

import tensorflow.keras as keras
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.layers import Layer, InputSpec

from tensorflow.python.keras.layers import Input, Embedding, Dense
from tensorflow.python.keras.layers import Lambda, Permute, RepeatVector, Multiply
from tensorflow.python.keras.layers import Bidirectional, TimeDistributed
from tensorflow.python.keras.layers import GRU
from tensorflow.python.keras.layers import BatchNormalization, Dropout
from tensorflow.python.keras.models import Model, Sequential
from tensorflow.python.keras.callbacks import ModelCheckpoint


class AttentionLayer(Layer):
    def __init__(self, attention_dim, **kwargs):
        self.attention_dim = attention_dim
        super(AttentionLayer, self).__init__(**kwargs)
    
    def build(self, input_shape):
        self.W = self.add_weight(name='Attention_Weight',
                                 shape=(input_shape[-1], self.attention_dim),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(name='Attention_Bias',
                                 shape=(self.attention_dim, ),
                                 initializer='random_normal',
                                 trainable=True)
        self.u = self.add_weight(name='Attention_Context_Vector',
                                 shape=(self.attention_dim, 1),
                                 initializer='random_normal',
                                 trainable=True)
        super(AttentionLayer, self).build(input_shape)
        
    def call(self, x):
        # refer to the original paper
        # link: https://www.cs.cmu.edu/~hovy/papers/16HLT-hierarchical-attention-networks.pdf
        u_it = K.tanh(K.dot(x, self.W) + self.b)
        a_it = K.dot(u_it, self.u)
        a_it = K.squeeze(a_it, -1)
        a_it = K.softmax(a_it)
        
        return a_it
        
    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[1])
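
For reference, this layer follows the word-level attention equations from the HAN paper (linked in the comment above): each hidden state h_it is projected with W and b, scored against the trainable context vector u, and normalized with a softmax. The layer returns only the weights a_it; the weighted sum s_i itself is computed by WeightedSum below.

u_it = tanh(W · h_it + b)
a_it = softmax(u_itᵀ · u)
s_i  = Σ_t a_it · h_it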
    

def WeightedSum(attentions, representations):
    # from shape (batch_size, timesteps) to shape (batch_size, rnn_dim * 2, timesteps)
    repeated_attentions = RepeatVector(K.int_shape(representations)[-1])(attentions)
    # from shape (batch_size, rnn_dim * 2, timesteps) to shape (batch_size, timesteps, rnn_dim * 2)
    repeated_attentions = Permute([2, 1])(repeated_attentions)

    # compute representation as the weighted sum of representations
    aggregated_representation = Multiply()([representations, repeated_attentions])
    aggregated_representation = Lambda(lambda x: K.sum(x, axis=1))(aggregated_representation)

    return aggregated_representation
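
A small shape check (my own sketch, not part of the original post) shows how the two pieces fit together: for an input shaped like the BiGRU output (batch, timesteps, 2 * rnn_dim), AttentionLayer returns (batch, timesteps) and WeightedSum collapses the timestep axis back to (batch, 2 * rnn_dim).

# Hedged sketch: verify the shapes produced by AttentionLayer and WeightedSum.
dummy_input = Input(shape=(MAX_SENTENCE_LENGTH, 2 * 150))   # stands in for a BiGRU output with rnn_dim=150
dummy_attention = AttentionLayer(100)(dummy_input)          # -> (None, 25)
dummy_summed = WeightedSum(dummy_attention, dummy_input)    # -> (None, 300)
print(dummy_attention.shape, dummy_summed.shape)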
    
    
def HieAtt(embedding_matrix,
           max_sentences,
           max_sentence_length,
           nb_classes,
           embedding_dim=300,
           attention_dim=100,
           rnn_dim=150,
           include_dense_batch_normalization=False,
           include_dense_dropout=True,
           nb_dense=1,
           dense_dim=300,
           dense_dropout=0.2,
           optimizer = keras.optimizers.Adam(lr=0.001)):

    # embedding_matrix = (max_nb_words + 1, embedding_dim)
    max_nb_words = embedding_matrix.shape[0] - 1
    embedding_layer = Embedding(max_nb_words + 1, 
                                embedding_dim,
                                weights=[embedding_matrix],
                                input_length=max_sentence_length,
                                trainable=False)

    # first, build a sentence encoder
    sentence_input = Input(shape=(max_sentence_length, ), dtype='int32')
    embedded_sentence = embedding_layer(sentence_input)
    embedded_sentence = Dropout(dense_dropout)(embedded_sentence)
    contextualized_sentence = Bidirectional(GRU(rnn_dim, return_sequences=True))(embedded_sentence)
    
    # word attention computation
    word_attention = AttentionLayer(attention_dim)(contextualized_sentence)
    sentence_representation = WeightedSum(word_attention, contextualized_sentence)
    
    sentence_encoder = Model(inputs=[sentence_input], 
                             outputs=[sentence_representation])

    # then, build a document encoder
    document_input = Input(shape=(max_sentences, max_sentence_length), dtype='int32')
    embedded_document = TimeDistributed(sentence_encoder)(document_input)
    contextualized_document = Bidirectional(GRU(rnn_dim, return_sequences=True))(embedded_document)
    
    # sentence attention computation
    sentence_attention = AttentionLayer(attention_dim)(contextualized_document)
    document_representation = WeightedSum(sentence_attention, contextualized_document)
    
    # finally, add fc layers for classification
    fc_layers = Sequential()
    for _ in range(nb_dense):
        if include_dense_batch_normalization == True:
            fc_layers.add(BatchNormalization())
        fc_layers.add(Dense(dense_dim, activation='relu'))
        if include_dense_dropout == True:
            fc_layers.add(Dropout(dense_dropout))
    fc_layers.add(Dense(nb_classes, activation='softmax'))
    
    pred_sentiment = fc_layers(document_representation)

    model = Model(inputs=[document_input],
                  outputs=[pred_sentiment])
    
    ############### build attention extractor ###############
    word_attention_extractor = Model(inputs=[sentence_input],
                                     outputs=[word_attention])
    word_attentions = TimeDistributed(word_attention_extractor)(document_input)
    attention_extractor = Model(inputs=[document_input],
                                     outputs=[word_attentions, sentence_attention])
    
    model.compile(loss=['categorical_crossentropy'],
              optimizer=optimizer,
              metrics=['accuracy'])

    return model, attention_extractor


model_name = "HieAtt"
model_path = './models/checkpoints/{}.h5'.format(model_name)
os.makedirs(os.path.dirname(model_path), exist_ok=True)  # make sure the checkpoint directory exists
checkpointer = tf.keras.callbacks.ModelCheckpoint(filepath=model_path,
                                                  monitor='val_accuracy',  # TF2 logs 'val_accuracy', not 'val_acc'
                                                  verbose=True,
                                                  save_best_only=True,
                                                  mode='max')

callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=3)

model, attention_extractor = HieAtt(embedding_matrix=embedding_matrix,
                                    max_sentences=MAX_SENTENCES,
                                    max_sentence_length=MAX_SENTENCE_LENGTH,
                                    nb_classes=2,
                                    embedding_dim=300,
                                    attention_dim=100,
                                    rnn_dim=150,
                                    include_dense_batch_normalization=False,
                                    include_dense_dropout=True,
                                    nb_dense=1,
                                    dense_dim=300,
                                    dense_dropout=0.2,
                                    optimizer = keras.optimizers.Adam(lr=0.001))
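
Before training, it can be reassuring to check what attention_extractor returns (this check is my own addition): word attentions shaped (batch, MAX_SENTENCES, MAX_SENTENCE_LENGTH) and sentence attentions shaped (batch, MAX_SENTENCES).

# Hedged sketch: inspect the attention extractor's output shapes on a few documents.
word_att, sent_att = attention_extractor.predict(train_X_data[:4])
print(word_att.shape)   # expected: (4, 10, 25)
print(sent_att.shape)   # expected: (4, 10)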

history = model.fit(x=train_X_data,
                    y=train_Y_data,
                    batch_size=128,
                    epochs=30,
                    verbose=True,
                    validation_data=(val_X_data, val_Y_data),
                    callbacks=[checkpointer, callback])

val_X_data.shape 

# (2500, 10, 25)

score = model.evaluate(test_X_data, test_Y_data, verbose=0, batch_size=128)
print("Test Accuracy of {}: {}".format(model_name, score[1]))

#Test Accuracy of HieAtt: 0.

import matplotlib.pyplot as plt

history_dict = history.history
loss = history_dict['loss']
val_loss = history_dict['val_loss']

epochs = range(1, len(loss) + 1)

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.show()

 

plt.clf()
acc = history_dict['accuracy']
val_acc = history_dict['val_accuracy']

epochs = range(1, len(loss) + 1)

plt.plot(epochs, acc, 'bo', label='Training acc')
plt.plot(epochs, val_acc, 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.show()

model.summary()

 

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
==================================================================================================
input_2 (InputLayer)            [(None, 10, 25)]     0                                            
__________________________________________________________________________________________________
time_distributed (TimeDistribut (None, 10, 300)      37712000    input_2[0][0]                    
__________________________________________________________________________________________________
bidirectional_1 (Bidirectional) (None, 10, 300)      405900      time_distributed[0][0]           
__________________________________________________________________________________________________
attention_layer_1 (AttentionLay (None, 10)           30200       bidirectional_1[0][0]            
__________________________________________________________________________________________________
repeat_vector_1 (RepeatVector)  (None, 300, 10)      0           attention_layer_1[0][0]          
__________________________________________________________________________________________________
permute_1 (Permute)             (None, 10, 300)      0           repeat_vector_1[0][0]            
__________________________________________________________________________________________________
multiply_1 (Multiply)           (None, 10, 300)      0           bidirectional_1[0][0]            
                                                                 permute_1[0][0]                  
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 300)          0           multiply_1[0][0]                 
__________________________________________________________________________________________________
sequential (Sequential)         (None, 2)            90902       lambda_1[0][0]                   
==================================================================================================
Total params: 38,239,002
Trainable params: 963,102
Non-trainable params: 37,275,900

 

 

import seaborn as sn

word_rev_index = {}
for word, i in tokenizer.word_index.items():
    word_rev_index[i] = word

def sentiment_analysis(review):        
    tokenized_sentences = doc2hierarchical(review)
    
    # extract only the word-level attentions
    pred_attention = attention_extractor.predict(np.asarray([tokenized_sentences]))[0][0]
    for sent_idx, sentence in enumerate(tokenized_sentences):
        if sentence[-1] == 0:
            continue
            
        for word_idx in range(MAX_SENTENCE_LENGTH):
            if sentence[word_idx] != 0:
                words = [word_rev_index[word_id] for word_id in sentence[word_idx:]]
                pred_att = pred_attention[sent_idx][-len(words):]
                pred_att = np.expand_dims(pred_att, axis=0)
                break

        fig, ax = plt.subplots(figsize=(len(words), 1))
        plt.rc('xtick', labelsize=16)
        midpoint = (pred_att.max() + pred_att.min()) / 2  # center the colormap between the min and max attention
        heatmap = sn.heatmap(pred_att, xticklabels=words, yticklabels=False, square=True, linewidths=0.1, cmap='coolwarm', center=midpoint, vmin=0, vmax=1)
        plt.xticks(rotation=45)
        plt.show()
        
# sentiment_analysis("Delicious healthy food. The steak is amazing. Fish and pork are awesome too. Service is above and beyond. Not a bad thing to say about this place. Worth every penny!")
sentiment_analysis("i want to go home")

The model attended nicely to my wanting to go home..

blog.naver.com/PostView.nhn?blogId=hist0134&logNo=221386940063&parentCategoryNo=&categoryNo=21&viewDate=&isShowPopularPosts=true&from=search

 

 

I referred to this blog! I fixed all the Keras version errors and got it running.

 

 

 

I'm planning to review the Hierarchical Attention Networks for Document Classification paper itself soon..

 
