https://github.com/donmesh/ALDONAr
Revision 71f0914d39c4dbf5bf56107955cfabfc5c29780b authored by donmesh on 13 January 2020, 19:06:47 UTC, committed by donmesh on 13 January 2020, 19:06:47 UTC
0 parent
Raw File
Tip revision: 71f0914d39c4dbf5bf56107955cfabfc5c29780b authored by donmesh on 13 January 2020, 19:06:47 UTC
Uploaded files
Tip revision: 71f0914
DataReader.py
import xml.etree.ElementTree as ET
import re
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
import os
import pandas as pd
import itertools

class Reader(): 
  class Review():
    def __init__(self):
      self.id = None
      self.sentences = {}
    def __str__(self):
      text = 'REVIEW: {}\n'.format(self.id)
      for sentence in self.sentences.values():
        text += str(sentence)
      return text           
  class Sentence():
    def __init__(self):
      self.id = None
      self.text = ''
      self.opinions = []
    def __str__(self):
      text = 'SENTENCE: {}\n{}\n'.format(self.id, self.text)
      if self.opinions:
        text += 'SENTENCE-LEVEL OPINIONS:\n'
        for opinion in self.opinions:
          text += str(opinion) + "\n"
      return text   
  class Opinion():
    def __init__(self):
      self.entity, self.attribute = None, None
      self.category, self.polarity, self.target = None, None, None
      self.start, self.end = None, None
    def __str__(self):
      if self.target:
        text = "[{}; {}] '{}' ({}-{})".format(self.category, self.polarity, self.target, self.start, self.end)
      return text

  def __init__(self, FLAGS):
    self.train_proportion = FLAGS.train_proportion
    self.seed = FLAGS.seed
    
  def cleaning(self, text):
    text = re.sub("(")", '"', text)
    text = re.sub("(')", "'", text)
    text = re.sub("(&)", "and", text) 
    text = re.sub(r"[^\w\s]", " ", text)  # Replace punctuation with spaces
    text = re.sub("\d", "", text)         # Remove numbers
    text = re.sub("\s+", " ", text)       # Replace all spaces with 1 space
    text = re.sub("^\s+|\s+$", "", text)  # Remove spaces in the beginning and in the end
    return text    
  
  def readData(self, file):
    self.reviews = {}
    tree = ET.parse(file)
    root = tree.getroot()
    sentences, aspects, polarities = [],[],[]
    longest_sentence = 0
    for R in root:
      review = self.Review()
      for s in R.find('sentences').findall('sentence'):
        sentence = self.Sentence()
        sentence.text = self.cleaning(s.find('text').text.lower())
        if s.get('OutOfScope'):
          continue
        else:
          if s.find('Opinions'):
            for o in s.find('Opinions').findall('Opinion'):
              opinion = self.Opinion()
              opinion.target = self.cleaning(o.get("target").lower())
              if ((len(opinion.target) > len(sentence.text)) or
                  (sentence.text.find(opinion.target) == -1) or
                  (opinion.target == "null")):
                   continue            
              if opinion.target != '':
                opinion.category = o.get('category').lower()
                opinion.entity, opinion.attribute = opinion.category.split('#')
                opinion.polarity = o.get('polarity').lower()
                opinion.start = int(o.get('from'))
                opinion.end = int(o.get('to'))
                sentence.opinions.append(opinion)
                sentences.append(sentence.text)
                aspects.append(opinion.target)
                polarities.append(opinion.polarity)  
            sentence.id = s.get('id')        
            review.sentences[sentence.id] = sentence
            if longest_sentence < len(sentence.text.split()):
              longest_sentence = len(sentence.text.split())
      review.id = R.get('rid')
      self.reviews[review.id] = review
    sentences = np.array(sentences).reshape([-1,1])
    aspects = np.array(aspects).reshape([-1,1])
    polarities = np.array(polarities).reshape([-1,1])
    data = np.concatenate([np.concatenate([sentences, aspects], axis=1), polarities], axis=1)
    return data, longest_sentence
  
  def splitTrainData(self, data):
    X_train, X_test, y_train, y_test = train_test_split(data[:,:2], data[:,2], 
                                                        train_size=self.train_proportion, random_state=self.seed)
    
    data_train = np.concatenate([X_train, y_train.reshape([-1,1])], axis=1)
    data_test = np.concatenate([X_test, y_test.reshape([-1,1])], axis=1)
    return data_train, data_test  
    
  def transformSent2idx(self, data, word2idx):
    sentences, aspects, polarities = [], [], []
    UNKNOWN_TOKEN = word2idx['UNK']
    for sentence, aspect, polarity in data:
      temp_aspect = [word2idx.get(word, UNKNOWN_TOKEN) for word in aspect.split()]
      aspect_new = [value for value in temp_aspect if value != UNKNOWN_TOKEN]
      if len(aspect_new) == 0:
        continue
      temp_sentence = [word2idx.get(word, UNKNOWN_TOKEN) for word in sentence.split()]
      sentence_new = [value for value in temp_sentence if value !=  UNKNOWN_TOKEN]
      aspects.append(aspect_new)
      sentences.append(sentence_new)
      if polarity == 'negative': polarities.append(np.array([1,0,0]))
      elif polarity == 'neutral': polarities.append(np.array([0,1,0]))
      elif polarity == 'positive': polarities.append(np.array([0,0,1]))
    sentences = np.array(list(itertools.zip_longest(*sentences, fillvalue=0))).T
    aspects = np.array(list(itertools.zip_longest(*aspects, fillvalue=0))).T
    polarities = np.array(polarities)
    data = [sentences, aspects, polarities]
    return data 

  def readEmbeddings(self, file):
    word2idx = {'PAD': 0 }
    embeddings = []    
    with open(file, 'r', encoding='utf8') as f: 
      for index, line in enumerate(f): 
        values = line.split()
        word = values[0]
        word_embeddings = np.asarray(values[1:], dtype=np.float32)
        word2idx[word] = index + 1
        embeddings.append(word_embeddings) 
    EMBEDDINGS_DIM = len(embeddings[0])
    embeddings.insert(0, np.zeros(EMBEDDINGS_DIM))
    embeddings.append(np.random.randn(EMBEDDINGS_DIM))
    embeddings = np.asarray(embeddings, dtype=np.float32)
    word2idx['UNK'] = len(embeddings)
    idx2word = {v: k for k, v in word2idx.items()}
    return embeddings, word2idx, idx2word 

  def aspectCategories(self, purpose):
    categories = []
    for r in self.reviews.values():
      for s in r.sentences.values():
        for o in s.opinions:
          categories.append(o.category)
    categories = pd.DataFrame(categories, columns=['category'])      
    cat_order = pd.unique(categories.category)
    cat_order.sort()
    g = sns.catplot('category', data=categories, kind='count', aspect=1.5, 
                    palette=sns.color_palette('gray', len(cat_order)),
                    order = cat_order)
    g.set_xticklabels(rotation=90, size=13)
    g.set_xlabels(size=15)
    g.set_ylabels(size=15)
    plt.title(purpose.title()+ ' data', fontsize=20, y=1.05)
    if not os.path.exists('./Results'):
      os.makedirs('./Results')
    plt.savefig('./Results/{}_aspects.png'.format(purpose), bbox_inches='tight')
    plt.show()
    
  def activationFunctions(self):
    sigmoid = lambda x: 1/(1+np.exp(-x))
    x = np.arange(-5,5,0.1)
    y= np.empty([2,100])
    y[0,:]= np.tanh(x)
    y[1,:] = sigmoid(x)
    i = 0
    for fun in ['tanh', 'sigmoid']:
        fig = plt.figure(fun)
        ax = fig.add_subplot(111)
        ax.plot(x,y[i,:])
        ax.spines['left'].set_position('zero')
        ax.spines['right'].set_color('none')
        ax.spines['bottom'].set_position('zero')
        ax.spines['top'].set_color('none')
        ax.spines['left'].set_smart_bounds(True)
        ax.spines['bottom'].set_smart_bounds(True)
        ax.xaxis.set_ticks_position('bottom')
        ax.yaxis.set_ticks_position('left')    
        ax.axhline(linewidth = 0.5, color = 'black')
        ax.axvline(linewidth = 0.5, color = 'black')
        plt.title('y = '+fun+'(x)')
        plt.show()
        i += 1    

  def transformPolarity(self, polarity):
    if polarity == 'negative': return np.array([1,0,0])
    elif polarity == 'neutral': return np.array([0,1,0])
    elif polarity == 'positive': return np.array([0,0,1])
    else: Exception(polarity) 

  def plotAttention(self, weights, sentence, file):
    plt.figure('attention', figsize=(14,14))
    weights = weights[weights!=0]
    labels = np.array([f'{sentence}\n{weights:.2f}' for sentence, weights in zip(sentence, weights)]).reshape([1,-1])
    sns.heatmap([weights], annot=labels, yticklabels=False, xticklabels=False, cbar=False, square=True, cmap='binary', fmt='', annot_kws={'size':15})
    plt.savefig(file)

  def plotPolarityDistribution(self, data_train, data_test, print_results, plot_results):
    train = np.array(list(map(self.transformPolarity, np.array(data_train)[:,2])))
    test = np.array(list(map(self.transformPolarity, np.array(data_test)[:,2])))
    total_train = train.sum()
    total_test = test.sum()
    
    nega, neua, pa = train.sum(0)
    max_height = np.max([nega,neua,pa])
    negb, neub, pb = test.sum(0)

    neg_train = np.ones([nega])*1.6
    neg_test = np.ones([negb])*1.8
    neu_train = np.ones([neua])*0.9
    neu_test = np.ones([neub])*1.1
    pos_train = np.ones([pa])*0.2
    pos_test = np.ones([pb])*0.4    
    
    if print_results:
        table = pd.DataFrame(columns = pd.MultiIndex.from_product([['Negative', 'Neutral', 'Positive', 'total'], 
                                                                  ["Freq.", "%"]]),
                            data    =  [[nega, np.round(100*nega/total_train,2),
                                         neua, np.round(100*neua/total_train,2), 
                                         pa, np.round(100*pa/total_train,2), 
                                         total_train, np.round(100*total_train/total_train)],
                                        [negb, np.round(100*negb/total_test,2), 
                                         neub, np.round(100*neub/total_test,2),
                                         pb, np.round(100*pb/total_test,2),
                                         total_test, np.round(100*total_test/total_test)]],
                            index   =  ["Train", "Test"])
        print(table)
        
    if plot_results:
      fig = plt.figure("Polarity distribution", figsize=(8, 3))
      fig.set_size_inches(8, 3.3, forward=True)
      ax = fig.add_subplot(111)
      
      ax.spines["left"].set_position("zero")
      ax.spines["right"].set_color("none")
      ax.spines["top"].set_color("none")
      ax.spines["left"].set_smart_bounds(True)
      ax.spines["bottom"].set_smart_bounds(True)
      ax.xaxis.set_ticks([])
      ax.yaxis.set_ticks_position("left")
      ax.yaxis.set_label_text("Number of sentences")
      ax.set_ylim([0, max_height+100])
      
      green = sns.color_palette("dark", 2)[1]
      red = sns.color_palette("OrRd",10)[8]
      ax.hist(pos_train, label = "Train positive", color=green, hatch = "//")
      ax.hist(pos_test, label = "Test positive", color=green, hatch = "\\\\")
      ax.hist(neu_train, label = "Train neutral", color="grey", hatch = "//")
      ax.hist(neu_test, label = "Test neutral", color="grey", hatch = "\\\\")
      ax.hist(neg_train, label = "Train negative", color=red, hatch = "//")
      ax.hist(neg_test, label = "Test negative", color=red, hatch = "\\\\")
      
      ax.annotate("{}%".format(np.round(100*pa/total_train,2)), xy=(0.12, 1325))
      ax.annotate("{}%".format(np.round(100*pb/total_test,2)), xy=(0.35, 495))
      ax.annotate("{}%".format(np.round(100*neua/total_train,2)), xy=(0.85, 85))
      ax.annotate("{}%".format(np.round(100*neub/total_test,2)), xy=(1.05, 40))  
      ax.annotate("{}%".format(np.round(100*nega/total_train,2)), xy=(1.52, 502))
      ax.annotate("{}%".format(np.round(100*negb/total_test,2)), xy=(1.73, 150))
      plt.legend(loc="upper right")
      plt.show()
back to top