https://github.com/aghie/lam
Tip revision: 20ebff57e945cec9c24175abdc1d6d6b92d6d8e9 authored by DVC on 03 April 2018, 13:35:05 UTC
Create LICENSE
preprocess.py
import string
import nltk
import sys
import numpy as np
from nltk.tokenize import StanfordTokenizer
from nltk.tokenize import TweetTokenizer
from stop_words import get_stop_words
from collections import OrderedDict
from nltk.stem.snowball import SnowballStemmer
from nltk.tag import StanfordNERTagger
from model_init.subjective_lexicon import mpqa_priors_to_dict
stopwords = get_stop_words("en")
#Stopwords typically occurring in debates
stopwords += [
"gentleman",
"hon",
"right",
"rose—",
"minister",
"member",
"members",
"mr",
"mrs",
"friend",
"mr_speaker",
"mr","speaker",
"chief_secretary",
"her_majesty",
"her","majesty",
"house_of_commons",
"house_of_lords",
]
"""
Remove documents that are either too long or too short
"""
def remove_texts(docs,high=60, low=10, strategy='zipfs'):
lengths = [len(doc) for doc_id, doc in docs.items()]
new_docs = OrderedDict({})
mean = np.mean(lengths)
std = np.std(lengths)
print("------------------------------",
"Printing some texts statistics",
"------------------------------")
print ("Mean:",mean,"Std: +/-",std)
for percentile in range(100,10,-5):
percentile_value = np.percentile(lengths, percentile)
print ("Percentile "+str(percentile)+" = "+str(percentile_value))
if strategy == 'mean-std': #not ideal: the standard deviation is too large
for doc_id, doc in docs.items():
if mean < len(doc) and len(doc) < mean+2*std:
new_docs[doc_id] = doc
elif strategy == 'zipfs':
for doc_id, doc in docs.items():
if low <= len(doc) and len(doc) <= high:
new_docs[doc_id] = doc
elif strategy == 'percentile':
for doc_id, doc in docs.items():
if np.percentile(lengths,15) <= len(doc) and len(doc) <= np.percentile(lengths,95):
new_docs[doc_id] = doc
return new_docs
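# e.g. with the default 'zipfs' strategy (low=10, high=60) a 5-token document
# would be dropped while a 20-token document would be kept.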
"""
Removes the words that are too common or too rare
docs maps each textid to a list of (word, tag) tuples
"""
def remove_words(docs, high=120, low=0, strategy='mean-std'):
vocab = {}
for doc_id, doc in docs.items():
for word,postag in doc:
if word in vocab:
vocab[word] += 1
else:
vocab[word] = 1
lengths = np.array([vocab[word] for word in vocab])
sorted_lengths = sorted(lengths,reverse=True)
mean = np.mean(lengths)
std = np.std(lengths)
print("------------------------------",
"Printing some word statistics",
"------------------------------")
print ("Mean:",mean,"Std: +/-",std)
for percentile in range(100,10,-5):
percentile_value = np.percentile(lengths, percentile)
print ("Percentile "+str(percentile)+" = "+str(percentile_value))
new_docs = OrderedDict({})
if strategy == 'mean-std':
for doc_id, doc in docs.items():
new_words = [(word,postag) for word,postag in doc
if (mean-2*std) < vocab[word] and vocab[word] < (mean+2*std)]
if new_words != []:
new_docs[doc_id] = new_words
elif strategy == 'zipfs':
for doc_id, doc in docs.items():
new_docs[doc_id] = []
for word,postag in doc:
if vocab[word] <= high and vocab[word] >= low:
new_docs[doc_id].append((word,postag))
elif strategy == 'percentile':
for doc_id, doc in docs.items():
new_docs[doc_id] = []
for word,postag in doc:
if vocab[word] <= np.percentile(lengths,99) and vocab[word] >= np.percentile(lengths,65):
new_docs[doc_id].append((word,postag))
return new_docs
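# e.g. under strategy='zipfs' a token is kept only if its corpus frequency f
# satisfies low <= f <= high, so with low=0 and high=120 (the defaults) every
# word occurring more than 120 times in the collection is dropped.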
"""
Determines if a string s is a number
"""
def is_number(s):
try:
complex(s)
except ValueError:
return False
return True
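# e.g. is_number("3"), is_number("4.5") and is_number("3+2j") return True,
# while is_number("abc") returns False because complex("abc") raises ValueError.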
def _is_content_word(postag):
#expects a lowercased PoS tag: noun, adjective, verb or adverb prefixes
return postag.startswith('n') or postag.startswith('jj') or postag.startswith('vb') or postag.startswith('rb')
"""
Lemmatizes a list of documents
docs maps each textid to a list of (word, tag) tuples
"""
def lemmatize_words(docs):
from nltk.stem import WordNetLemmatizer
wordnet_lemmatizer = WordNetLemmatizer()
new_docs = OrderedDict({})
for doc_id, doc in docs.items():
new_docs[doc_id] = []
for word,postag in doc:
try:
lemma = wordnet_lemmatizer.lemmatize(word,pos=postag.lower()[0])
new_docs[doc_id].append((lemma.lower(),postag))
except KeyError:
lemma = word
new_docs[doc_id].append((lemma.lower(),postag))
return new_docs
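# e.g. ("cats","NNS") becomes ("cat","NNS") and ("said","VBD") becomes ("say","VBD");
# tags whose first letter is not a valid WordNet PoS (e.g. "JJ" -> "j") raise a
# KeyError inside the lemmatizer, so the surface form is kept instead.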
"""
Removes the stopwords from a list of documents
docs maps each textid to a list of (word, tag) tuples
"""
def remove_stopwords(docs):
punctuation = string.punctuation + string.digits+"”"+"“"
new_docs = OrderedDict({})
for doc_id, doc in docs.items():
new_docs[doc_id] = []
for word,postag in doc:
#the length check also filters out rare web characters that might not appear in the stopword lists
if word not in stopwords and word not in punctuation and not is_number(word) and len(word) > 1:
new_docs[doc_id].append((word,postag))
return new_docs
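# e.g. [("the","DT"), ("minister","NN"), ("resigned","VBD"), ("!",".")] reduces to
# [("resigned","VBD")]: "the" is an English stopword, "minister" is in the
# debate-specific list above, and "!" is punctuation.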
# """
# Returns a list of docs annotated with NER information
# Each doc_i is a list of tuples (textid,[(word,tag)])
# """
# def NER(docs):
#
# print ("NER... (it might take some seconds/minutes)")
# st = StanfordNERTagger('/home/david.vilares/Descargas/stanford-ner-2012-11-11-nodistsim/conll.closed.iob2.crf.ser.gz',
# '/home/david.vilares/Descargas/stanford-ner-2015-12-09/stanford-ner.jar',
# encoding='utf-8')
#
# new_docs = OrderedDict({})
# #We batch all docs so the NER jar is not invoked once per document
# aux_docs = []
# docs_id = []
# for doc_id, doc in docs.items():
# aux_docs.append(doc)
# docs_id.append(doc_id)
# ner_docs = st.tag_sents(aux_docs)
#
# if len(docs_id) != len(ner_docs): raise ValueError
# #We can do this zip because we assumed docs is an ordered dict!
# for doc_id,ner_doc in zip(docs_id,ner_docs):
# composed_ner = []
# aux = []
# for word, ner in ner_doc:
# if len(word) > 0:
# if ner == 'O':
# #If we finished computing a multiword NER
# #we needed to put it in the list first
# if composed_ner != []:
# aux.append('_'.join(composed_ner))
# composed_ner = []
# aux.append(word)
# else:
# if ner.startswith('B-') and composed_ner != []:
# aux.append('_'.join(composed_ner))
# composed_ner = [word]
# else:
# composed_ner.append(word)
# new_docs[doc_id] = aux
# return new_docs
"""
Lowercases every word
docs maps each textid to a list of (word, tag) tuples
"""
def lowercase(docs):
new_docs = OrderedDict({})
for doc_id, doc in docs.items():
new_docs[doc_id] = [(w.lower(),p) for w,p in doc]
return new_docs
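"""
Tokenizes a collection of documents
docs maps each textid to its raw text
"""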
def tokenize(docs, punctuation = string.punctuation,
tokenizer = TweetTokenizer()):
new_docs = OrderedDict({})
for doc_id, doc in docs.items():
new_docs[doc_id] = tokenizer.tokenize(doc)
return new_docs
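# e.g. the TweetTokenizer keeps emoticons together and splits standard punctuation:
# "Hello, world! :)" is expected to yield ['Hello', ',', 'world', '!', ':)'].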
"""
Tags a collection of documents
docs maps each textid to a list of tokens
"""
def postagging(docs):
new_docs = OrderedDict({})
for doc_id, doc in docs.items():
new_docs[doc_id] = nltk.pos_tag(doc)
return new_docs
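# e.g. nltk.pos_tag(['The', 'minister', 'resigned']) yields Penn Treebank tags
# such as [('The','DT'), ('minister','NN'), ('resigned','VBD')] (the exact tags
# depend on the tagger model shipped with NLTK).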
def _index_inside_negating_scope(index, indexes_negating, scope = 4):
aux = False
for index_negating in indexes_negating:
if index_negating < index and index_negating + scope > index:
return True
return aux
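# e.g. with indexes_negating=[2] and the default scope of 4, indices 3, 4 and 5
# are inside the negating scope, while index 2 itself and index 6 are not.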
"""
Applies a simple negation heuristic to create 'negation bi-grams'
(e.g. it transforms 'not good' into 'not_good')
docs maps each textid to a list of (word, tag) tuples
"""
def neg_subjective(docs, negating_terms=['not','no'], subjective_dict_priors={}):
new_docs = OrderedDict({})
for doc_id, doc in docs.items():
new_docs[doc_id] = []
indexes_negating = []
#identify negating terms
index = 0
for word,postag in doc:
if word in negating_terms:
indexes_negating.append(index)
index+=1
index = 0
for word,postag in doc:
if _index_inside_negating_scope(index, indexes_negating) and word in subjective_dict_priors:
new_docs[doc_id].append(("not_"+word,postag))
#reverse subjective priors
if (len(subjective_dict_priors[word]) == 2 or len(subjective_dict_priors[word]) % 2 !=0 ):
subjective_dict_priors["not_"+word] = subjective_dict_priors[word][::-1]
else:
raise NotImplementedError("The list of subjectivity priors must have length 2 or an odd length in order to reverse the priors")
else:
new_docs[doc_id].append((word,postag))
index+=1
return new_docs
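# e.g. with subjective_dict_priors={"good": [0.9, 0.1]}, the sequence
# [("not","RB"), ("good","JJ")] becomes [("not","RB"), ("not_good","JJ")]
# and the priors dict gains the reversed entry "not_good": [0.1, 0.9].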
"""
Removes outlier words and outlier texts from the collection
"""
def remove_outlines(docs):
print ("Removing most common/rare words...")
non_outlier_words_docs = remove_words(docs, strategy='percentile')
print("Removing the largest/shortest texts...")
non_outlier_docs = remove_texts(non_outlier_words_docs, strategy='percentile')
return non_outlier_docs
"""
ori_docs maps each textid to its raw text
"""
def preprocess(ori_docs, subjective_dict_priors):
print ("Tokenizing...")
tokenized_docs = tokenize(ori_docs, tokenizer= TweetTokenizer())
print ("PoS tagging...")
postagged_docs = postagging(tokenized_docs)
# ner_docs = NER(tokenized_docs)
print ("Lemmatizing...")
lemmatized_docs = lemmatize_words(postagged_docs)
print ("Lowercasing...")
lowercase_docs = lowercase(lemmatized_docs)
if subjective_dict_priors is not None:
print ("Applying simple negation scope")
neg_subjective_docs= neg_subjective(lowercase_docs,
negating_terms = ['not'],
subjective_dict_priors=subjective_dict_priors)
else:
neg_subjective_docs = lowercase_docs
print ("Removing stopwords...")
non_stopwords_docs = remove_stopwords(neg_subjective_docs)
return non_stopwords_docs
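# Minimal usage sketch: it runs the full preprocessing chain on a toy document.
# It assumes the NLTK data packages 'averaged_perceptron_tagger' and 'wordnet'
# are installed, and the toy priors dict below is only an illustrative stand-in
# for the real output of mpqa_priors_to_dict.
if __name__ == "__main__":
    toy_docs = OrderedDict([
        ("debate_001", "The minister said the new policy is not good for small businesses .")
    ])
    toy_priors = {"good": [0.9, 0.1]}
    processed = preprocess(toy_docs, toy_priors)
    for doc_id, doc in processed.items():
        print(doc_id, doc)
    #after the negation heuristic, 'good' is expected to surface as 'not_good'
    #and toy_priors to gain a reversed entry for 'not_good'
    print(toy_priors)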