We are hiring ! See our job offers.
Raw File
RNN_subFun.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 16 10:41:49 2020

@author: sanoev
Subfunction to use the RNN_Model
"""

import math
import numpy as np
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, LSTM, Bidirectional, Embedding
import nltk
from nltk.lm.preprocessing import padded_everygram_pipeline
from nltk.lm import MLE
from nltk.lm import Vocabulary

#%%
def shuffle_and_split_training_set(sentences_original, next_original, percentage_test=2):
    # shuffle at unison
    print('Shuffling sentences')

    tmp_sentences = []; tmp_sentences_inx = [];
    tmp_next_word = [];
    for i in np.random.permutation(len(sentences_original)):
        tmp_sentences_inx.append(i)
        tmp_sentences.append(sentences_original[i])
        tmp_next_word.append(next_original[i])

    cut_index = int(len(sentences_original) * (1.-(percentage_test/100.)))
    x_train, x_test = tmp_sentences[:cut_index], tmp_sentences[cut_index:]
    y_train, y_test = tmp_next_word[:cut_index], tmp_next_word[cut_index:]

    print("Size of training set = %d" % len(x_train))
    print("Size of test set = %d" % len(y_test))
    return (x_train, y_train), (x_test, y_test), (tmp_sentences_inx, cut_index)


# Data generator for fit and evaluate
def generator(sentence_list, next_word_list, batch_size, SEQUENCE_LEN, word_indices):
    index = 0
    while True:
        x = np.zeros((batch_size, SEQUENCE_LEN), dtype=np.int32)
        y = np.zeros((batch_size), dtype=np.int32)
        for i in range(batch_size):
            for t, w in enumerate(sentence_list[index % len(sentence_list)]):
                x[i, t] = word_indices[w]
            y[i] = word_indices[next_word_list[index % len(sentence_list)]]
            index = index + 1
        yield x, y

# Data generator for fit and evaluate
def generator_return(sentence_list, next_word_list, batch_size, SEQUENCE_LEN, word_indices):
    index = 0
    x = np.zeros((batch_size, SEQUENCE_LEN), dtype=np.int32)
    y = np.zeros((batch_size), dtype=np.int32)
    for i in range(batch_size):
        for t, w in enumerate(sentence_list[index % len(sentence_list)]):
            x[i, t] = word_indices[w]
        y[i] = word_indices[next_word_list[index % len(sentence_list)]]
        index = index + 1
    return x, y

def cal_perplexity_model(sentences, next_words, Ngram = 0):
    # https://stackoverflow.com/questions/54941966/how-can-i-calculate-perplexity-using-nltk   
    n=2    
    tokenized_text = [sen + [next_words[it]] for it,sen in enumerate(sentences)]
    train_data = [nltk.bigrams(t,  pad_right=False, pad_left=False) for t in tokenized_text]
    words = [word for sent in tokenized_text for word in sent]
    padded_vocab = Vocabulary(words)
    NLTKmodel = MLE(n)
    NLTKmodel.fit(train_data, padded_vocab)
    return NLTKmodel

def get_model(words, SEQUENCE_LEN = [], dropout=0.2, EmbLay = []):
    print('Build model...')
    model = Sequential()
    if len(EmbLay) == 0:
        model.add(Embedding(input_dim=len(words), output_dim=300))
    else:       
        model.add(Embedding(input_dim=len(words), output_dim = len(EmbLay[0])))
        model.layers[0].set_weights([EmbLay])
        model.layers[0].trainable = False
    model.add(Bidirectional(LSTM(128)))
    if dropout > 0:
        model.add(Dropout(dropout))
    model.add(Dense(len(words)))
    model.add(Activation('softmax'))
    return model


# Functions from keras-team/keras/blob/master/examples/lstm_text_generation.py
def sample(preds, temperature=1.0):
    # helper function to sample an index from a probability array
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds) / temperature
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

#%% on ending of an epoch
# define a class for the model
class CustomModelEval(keras.callbacks.Callback):
    def __init__(self, examples_file, meanProb_file, SEQUENCE_LEN, word_indices, indices_word, sentences, next_words, sentences_test, next_words_test, bigramModel):
        self.sentences = sentences
        self.next_words = next_words
        self.sentences_test = sentences_test
        self.next_words_test= next_words_test
        self.examples_file = examples_file
        self.meanProb_file = meanProb_file
        self.SEQUENCE_LEN = SEQUENCE_LEN
        self.word_indices = word_indices
        self.indices_word = indices_word
        self.bigram = bigramModel

    def on_epoch_end(self,epoch, logs):
        # Function invoked at end of each epoch. Prints generated text.
        self.examples_file.write('\n----- Generating text after Epoch: %d\n' % epoch)
    
        # Randomly pick a seed sequence
        seed_index = np.random.randint(len(self.sentences+self.sentences_test))
        seed = (self.sentences+self.sentences_test)[seed_index]
    
        for diversity in [0.3, 0.4, 0.5, 0.6, 0.7]:
            sentence = seed
            self.examples_file.write('----- Diversity:' + str(diversity) + '\n')
            self.examples_file.write('----- Generating with seed:\n"' + ' '.join(sentence) + '"\n')
            self.examples_file.write(' '.join(sentence))
    
            for i in range(50):
                x_pred = np.zeros((1, self.SEQUENCE_LEN))
                for t, word in enumerate(sentence):
                    x_pred[0, t] = self.word_indices[word]
    
                preds = self.model.predict(x_pred, verbose=0)[0]
                next_index = sample(preds, diversity)
                next_word = self.indices_word[next_index]
    
                sentence = sentence[1:]
                sentence.append(next_word)
    
                self.examples_file.write(" "+next_word)
            self.examples_file.write('\n')
        self.examples_file.write('='*80 + '\n')
        self.examples_file.flush()
        
        # get the mean probability and perplexity of the model:
        # do in many batches...
        bs = 10
        stepT = int(len(self.sentences)/bs)
        AP = np.array([])
        indMaxWord = np.array([])
        for bv in range(4):            
            rangeV = [stepT*bv,stepT*(bv+1)]
            add = stepT*bv
            if bv == 3:
                rangeV[1]=len(self.sentences)+1
               
            # probability
            [modelInput,labels] = generator_return(self.sentences[rangeV[0]:rangeV[1]], self.next_words[rangeV[0]:rangeV[1]], len(self.sentences[rangeV[0]:rangeV[1]]), 10, self.word_indices)
            preds = self.model.predict(modelInput)     
            indMaxWord=np.append(indMaxWord, np.argmax(preds,1))
            for it in range(preds.shape[0]):
                AP = np.append(AP, preds[:,labels[it+add]])
                      
        # perplexity
        tokenized_text = [sen + [self.indices_word[indMaxWord[it]]] for it,sen in enumerate(self.sentences)] # the whole sentence        
        test_data = [nltk.bigrams(t,  pad_right=False, pad_left=False) for t in tokenized_text]
        tokenized_text_bi = [[sen[-1]] + [self.indices_word[indMaxWord[it]]] for it,sen in enumerate(self.sentences)] # the whole sentence        
        test_data_bigram = [nltk.bigrams(t,  pad_right=False, pad_left=False) for t in tokenized_text_bi]
         
        adpr = 0; adper = 0; adbi = 0; cntper = 0; cntbi = 0
        for it in range(len(self.sentences)):           
            #adpr = adpr+preds[it,labels[it]] # probability actual output
            adpr = adpr+AP[it]
            per = self.bigram.perplexity(test_data[it])
            bi = self.bigram.perplexity(test_data_bigram[it])
            if math.isinf(per) == False:
                adper = adper + per
                cntper = cntper+1
            if math.isinf(bi) == False:
                adbi = adbi + bi
                cntbi = cntbi+1
        meanP = adpr/len(self.next_words)
        meanPer = adper/cntper
        meanBi= adbi/cntbi
                     
        # validation data
        # probability
        [modelInput,labels] = generator_return(self.sentences_test, self.next_words_test, len(self.sentences_test), 10, self.word_indices)
        preds = self.model.predict(modelInput)
        # perplexity
        indMaxWord = np.argmax(preds,1)
        tokenized_text = [sen + [self.indices_word[indMaxWord[it]]] for it,sen in enumerate(self.sentences_test)] # the whole sentence        
        test_data = [nltk.bigrams(t,  pad_right=False, pad_left=False) for t in tokenized_text]
        tokenized_text_bi = [[sen[-1]] + [self.indices_word[indMaxWord[it]]] for it,sen in enumerate(self.sentences_test)] # the whole sentence        
        test_data_bigram = [nltk.bigrams(t,  pad_right=False, pad_left=False) for t in tokenized_text_bi]
        
        adpr = 0; adper = 0; adbi = 0; cntperVal = 0; cntbiVal = 0
        for it in range(len(self.sentences_test)):           
            adpr = adpr+preds[it,labels[it]] # probability actual output
            per = self.bigram.perplexity(test_data[it])
            bi = self.bigram.perplexity(test_data_bigram[it])
            if math.isinf(per) == False:
                adper = adper + per
                cntperVal = cntperVal+1
            if math.isinf(bi) == False:
                adbi = adbi + bi
                cntbiVal = cntbiVal+1
        meanPVal = adpr/len(self.next_words_test)
        meanPerVal = adper/cntperVal
        meanBiVal = adbi/cntbiVal
       
        self.meanProb_file.write('\n----- Prop/Per/Bigram after Epoch: %d\n' % epoch)
        self.meanProb_file.write('PropTest\t' + str(meanP) + '\t' + str(len(self.next_words)) + '\n')
        self.meanProb_file.write('PropVal\t' + str(meanPVal) + '\t' + str(len(self.next_words_test)) +  '\n')
        self.meanProb_file.write('PerplexicityTest\t' + str(meanPer) + '\t' + str(cntper/len(self.next_words)) + '\n')
        self.meanProb_file.write('PerplexityVal\t' + str(meanPerVal) + '\t' + str(cntperVal/len(self.next_words_test)) + '\n')
        self.meanProb_file.write('BigramTest\t' + str(meanBi) + '\t' + str(cntbi/len(self.next_words)) + '\n')
        self.meanProb_file.write('BigramVal\t' + str(meanBiVal) + '\t' + str(cntbiVal/len(self.next_words_test)) +  '\n')
        self.meanProb_file.flush()
    
#%% on ending of an epoch
# define a class for the model
class CustomModelEval_v2(keras.callbacks.Callback):
    def __init__(self, examples_file, meanProb_file, SEQUENCE_LEN, word_indices, indices_word, sentences, next_words, sentences_test, next_words_test, bigramModel):
        self.sentences = sentences
        self.next_words = next_words
        self.sentences_test = sentences_test
        self.next_words_test= next_words_test
        self.examples_file = examples_file
        self.meanProb_file = meanProb_file
        self.SEQUENCE_LEN = SEQUENCE_LEN
        self.word_indices = word_indices
        self.indices_word = indices_word
        self.bigram = bigramModel

    def on_epoch_end(self,epoch, logs):
        # Function invoked at end of each epoch. Prints generated text.
        self.examples_file.write('\n----- Generating text after Epoch: %d\n' % epoch)
    
        # Randomly pick a seed sequence
        seed_index = np.random.randint(len(self.sentences+self.sentences_test))
        seed = (self.sentences+self.sentences_test)[seed_index]
    
        for diversity in [0.3, 0.4, 0.5, 0.6, 0.7]:
            sentence = seed
            self.examples_file.write('----- Diversity:' + str(diversity) + '\n')
            self.examples_file.write('----- Generating with seed:\n"' + ' '.join(sentence) + '"\n')
            self.examples_file.write(' '.join(sentence))
    
            for i in range(50):
                x_pred = np.zeros((1, self.SEQUENCE_LEN))
                for t, word in enumerate(sentence):
                    x_pred[0, t] = self.word_indices[word]
    
                preds = self.model.predict(x_pred, verbose=0)[0]
                next_index = sample(preds, diversity)
                next_word = self.indices_word[next_index]
    
                sentence = sentence[1:]
                sentence.append(next_word)
    
                self.examples_file.write(" "+next_word)
            self.examples_file.write('\n')
        self.examples_file.write('='*80 + '\n')
        self.examples_file.flush()
        
        # get the mean probability and perplexity of the model (only of valdation).. Training takes too long:
        # do in many batches...
    
                     
        # validation data
        # probability
        [modelInput,labels] = generator_return(self.sentences_test, self.next_words_test, len(self.sentences_test), 10, self.word_indices)
        preds = self.model.predict(modelInput)
        # perplexity
        indMaxWord = np.argmax(preds,1)
        tokenized_text = [sen + [self.indices_word[indMaxWord[it]]] for it,sen in enumerate(self.sentences_test)] # the whole sentence        
        test_data = [nltk.bigrams(t,  pad_right=False, pad_left=False) for t in tokenized_text]
        tokenized_text_bi = [[sen[-1]] + [self.indices_word[indMaxWord[it]]] for it,sen in enumerate(self.sentences_test)] # the whole sentence        
        test_data_bigram = [nltk.bigrams(t,  pad_right=False, pad_left=False) for t in tokenized_text_bi]
        
        adpr = 0; adper = 0; adbi = 0; cntperVal = 0; cntbiVal = 0
        for it in range(len(self.sentences_test)):           
            adpr = adpr+preds[it,labels[it]] # probability actual output
            per = self.bigram.perplexity(test_data[it])
            bi = self.bigram.perplexity(test_data_bigram[it])
            if math.isinf(per) == False:
                adper = adper + per
                cntperVal = cntperVal+1
            if math.isinf(bi) == False:
                adbi = adbi + bi
                cntbiVal = cntbiVal+1
        meanPVal = adpr/len(self.next_words_test)
        meanPerVal = adper/cntperVal
        meanBiVal = adbi/cntbiVal
        cntper = 0
        meanP = 0
        meanPer = 0
        meanBi = 0
        cntbi = 0
        
       
        self.meanProb_file.write('\n----- Prop/Per/Bigram after Epoch: %d\n' % epoch)
        self.meanProb_file.write('PropTest\t' + str(meanP) + '\t' + str(len(self.next_words)) + '\n')
        self.meanProb_file.write('PropVal\t' + str(meanPVal) + '\t' + str(len(self.next_words_test)) +  '\n')
        self.meanProb_file.write('PerplexicityTest\t' + str(meanPer) + '\t' + str(cntper/len(self.next_words)) + '\n')
        self.meanProb_file.write('PerplexityVal\t' + str(meanPerVal) + '\t' + str(cntperVal/len(self.next_words_test)) + '\n')
        self.meanProb_file.write('BigramTest\t' + str(meanBi) + '\t' + str(cntbi/len(self.next_words)) + '\n')
        self.meanProb_file.write('BigramVal\t' + str(meanBiVal) + '\t' + str(cntbiVal/len(self.next_words_test)) + '\n')
        self.meanProb_file.flush()
back to top