# corpus_reader.py
# Source repository: https://gricad-gitlab.univ-grenoble-alpes.fr/coavouxm/flaubertagger.git
# Tip revision: c939ca9fac094ac3c379256ef3d3d4d14a5a4bf1 ("up", authored by m, 23 February 2024, 16:44:50 UTC)
import os
from collections import defaultdict
from enum import IntEnum
import logging
SPECIAL_CHARS = {"-LRB-", "-RRB-", "#RRB#", "#LRB#"} # warning: duplicate from word_encoders
def get_corpus_filenames(corpus_id):
    """Return the (train, dev, test) file paths for the requested corpus.

    ``corpus_id`` is either ``"ftb_spmrl"`` (SPMRL French treebank, paths
    returned unchecked) or one of the UD French treebank ids; UD split
    files that do not exist on disk are replaced by None.
    """
    spmrl_template = os.path.expanduser(
        "~/data/FRENCH_SPMRL/gold/conll/{split}/{split}.French.gold.conll")
    if corpus_id == "ftb_spmrl":
        return tuple(spmrl_template.format(split=name)
                     for name in ("train", "dev", "test"))
    # UD corpus id -> suffix of the treebank folder name.
    ud_folder_suffix = {"fqb": "FQB", "ftb": "FTB", "gsd": "GSD",
                        "partut": "ParTUT", "pud": "PUD",
                        "sequoia": "Sequoia", "spoken": "Spoken"}
    assert(corpus_id in ud_folder_suffix)
    folder = os.path.expanduser(
        f"~/data/ud-treebanks-v2.6/UD_French-{ud_folder_suffix[corpus_id]}")
    paths = []
    for split_name in ("train", "dev", "test"):
        candidate = f"{folder}/fr_{corpus_id}-ud-{split_name}.conllu"
        paths.append(candidate if os.path.exists(candidate) else None)
    return tuple(paths)
def normalize(features):
    """Canonicalize a CoNLL feature string: drop "_" parts and sort the rest.

    The bare placeholder "_" (no features at all) is returned untouched.
    """
    if features == "_":
        return features
    kept = sorted(part for part in features.split("|") if part != "_")
    return "|".join(kept)
def feats2dict(features):
    """Parse a "k1=v1|k2=v2" feature string into a {name: value} dict."""
    return dict(item.split("=") for item in features.split("|"))
def dict2feats(feats):
    """Serialize a feature dict back into a "k=v|..." string, sorted by key."""
    return "|".join(f"{name}={value}" for name, value in sorted(feats.items()))
def bin_features(features, bins):
    """Split a feature string into one serialized sub-string per bin.

    Each bin is a tuple of feature names; the result holds, for each bin,
    either the features of that bin re-serialized with dict2feats, or "_"
    when the token carries none of them.
    """
    if features == "":
        features = "_"
    if features == "_":
        return ["_"] * len(bins)
    available = feats2dict(features)
    result = []
    for bin_names in bins:
        selected = {name: available[name] for name in bin_names if name in available}
        result.append(dict2feats(selected) if selected else "_")
    return result
def get_bins(corpus_id):
    """Return the feature bins for a corpus.

    SPMRL uses short attribute names; every UD corpus shares one common
    three-bin layout of full UD feature names.
    """
    if corpus_id == "ftb_spmrl":
        return [("m", "t"), ("p", "n"), ("mwehead", "pred"), ("s", "g")]
    return [("Mood", "VerbForm", "Tense"),
            ("Number", "Person"),
            ("Definite", "Gender", "NumType", "Polarity", "Poss",
             "PronType", "Reflex", "Voice", "Typo", "Foreign", "Case")]
class ConllID(IntEnum):
    """Column indices of a CoNLL(-U) line."""
    ID = 0
    TOKEN = 1
    LEMMA = 2
    CPOS = 3
    FPOS = 4
    FEATURES = 5
    HEAD = 6
    FUN = 7
class DepTree:
    """A dependency-annotated sentence stored as parallel per-token lists.

    Attributes (lists indexed by token position):
      tokens    -- surface forms
      cpos      -- coarse POS tags ("_" placeholders if not given)
      fpos      -- fine POS tags
      features  -- normalized morphological feature strings (see normalize)
      heads     -- integer head indices, or "_" placeholders
      functions -- dependency labels
    corpus_id selects the feature bins used by get_training_example.
    """
    # CoNLL placeholder for a missing value.
    NONE = "_"
    #ID, TOKEN, LEMMA, CPOS, FPOS, FEATURES, HEAD, FUN = list(range(8))
    def __init__(self, tokens, cpos=None, fpos=None, features=None, heads=None, functions=None, corpus_id=None):
        """Store the annotation layers; any missing layer defaults to a "_" column."""
        # Example CoNLL line: 1 Certes certes ADV ADV _ 5 mod 5 mod
        self.tokens = tokens
        #self.lemmas = lemmas
        self.cpos = cpos
        self.fpos = fpos
        self.heads = heads
        self.features = features
        self.functions = functions
        self.corpus_id = corpus_id
        if cpos is None:
            self.cpos = [DepTree.NONE] * len(self.tokens)
        if fpos is None:
            self.fpos = [DepTree.NONE] * len(self.tokens)
        if heads is None: # WARNING: have to solve amalgams to make heads right
            self.heads = [DepTree.NONE] * len(self.tokens)
        if features is None:
            self.features = [DepTree.NONE] * len(self.tokens)
        else:
            # Canonicalize feature strings (sorted, "_" parts dropped).
            self.features = [normalize(feats) for feats in self.features]
        if functions is None:
            self.functions = [DepTree.NONE] * len(self.tokens)
    def get_training_example(self, coarse):
        """Return a dict with "tokens", "pos" and "all_tags" for the tagger.

        coarse selects cpos over fpos. "all_tags" holds, per token, the POS
        tag followed by one binned feature string per bin of get_bins.
        """
        bins = get_bins(self.corpus_id)
        pos = self.cpos if coarse else self.fpos
        return {"tokens": self.tokens,
                "pos": pos,
                #"features": [bin_features(feats, bins) for feats in self.features],
                "all_tags": [[pos_tag] + bin_features(feats, bins) for pos_tag, feats in zip(pos, self.features)]}
    def __len__(self):
        """Number of tokens in the sentence."""
        return len(self.tokens)
    def __str__(self):
        """Render the tree as a 10-column CoNLL block (lemma and last columns "_")."""
        result = []
        for i in range(len(self)):
            line = f"{i+1}\t{self.tokens[i]}\t_\t{self.cpos[i]}\t{self.fpos[i]}\t{self.features[i]}\t{self.heads[i]}\t{self.functions[i]}\t_\t_"
            result.append(line)
        return "\n".join(result)
    @staticmethod
    def from_string(string, mapping=None, corpus_id=None):
        """Parse one CoNLL(-U) sentence block into a DepTree; None if no tokens.

        mapping is currently unused. Blank lines and comment lines ("#...")
        are skipped. A multi-word token line (range id "a-b") is merged with
        the two lines that follow it. NOTE(review): this assumes every range
        spans exactly two words (i += 3) -- confirm for the corpora used.
        """
        tokens = []
        cpos = []
        fpos = []
        features = []
        heads = []
        functions = []
        lines = string.split("\n")
        i = 0
        while i < len(lines):
            line = lines[i]
            if not line.strip():
                i += 1
                continue
            if line[0] == "#":
                i += 1
                continue
            line = line.split("\t")
            if "-" in line[0]:
                # Amalgam (multi-word token, e.g. "du" = "de"+"le"): keep the
                # surface form of the range line, join tags/labels of the two
                # word lines with "+".
                first = lines[i+1].split("\t")
                second = lines[i+2].split("\t")
                tokens.append(line[ConllID.TOKEN])
                cpos.append(first[ConllID.CPOS] + "+" + second[ConllID.CPOS])
                fpos.append(first[ConllID.FPOS] + "+" + second[ConllID.FPOS])
                feats = [first[ConllID.FEATURES], second[ConllID.FEATURES]]
                feats = [f for f in feats if f != "_"]
                features.append("|".join(feats))
                h1 = first[ConllID.HEAD]
                h2 = second[ConllID.HEAD]
                h = h1
                if h1 != h2:
                    # Prefer a head lying outside the amalgam itself, so the
                    # merged token is not headed by one of its own parts.
                    ids = [first[ConllID.ID], second[ConllID.ID]]
                    not_in_ids = [potential_head for potential_head in [h1, h2] if potential_head not in ids]
                    assert(len(not_in_ids) > 0)
                    h = not_in_ids[-1]
                heads.append(int(h))
                functions.append(first[ConllID.FUN] + "+" + second[ConllID.FUN])
                i += 3
            else:
                tokens.append(line[ConllID.TOKEN])
                cpos.append(line[ConllID.CPOS])
                fpos.append(line[ConllID.FPOS])
                features.append(line[ConllID.FEATURES])
                # NOTE(review): heads are only collected when annotated, so
                # len(heads) can differ from len(tokens) for partially
                # annotated input -- confirm callers never rely on alignment
                # in that case.
                if line[ConllID.HEAD] != "_":
                    heads.append(int(line[ConllID.HEAD]))
                functions.append(line[ConllID.FUN])
                i += 1
        if len(tokens) > 0:
            return DepTree(tokens, cpos=cpos, fpos=fpos, features=features, heads=heads, functions=functions, corpus_id=corpus_id)
        return None
def read_conll_treebank(filename, corpus_id):
    """Read a CoNLL file and return a list of DepTree; None if filename is None."""
    if filename is None:
        return None
    with open(filename, encoding="utf8") as stream:
        blocks = stream.read().split("\n\n")
    parsed = (DepTree.from_string(block, corpus_id=corpus_id) for block in blocks)
    return [tree for tree in parsed if tree is not None]
def write_conll_treebank(trees, filename):
    """Write trees to filename as CoNLL blocks, each followed by a blank line."""
    with open(filename, "w", encoding="utf8") as out:
        for tree in trees:
            out.write(f"{tree}\n\n")
"""
def load_FTB(args):
corpus_id = "ftb_spmrl"
FTB_train, FTB_dev, FTB_test = get_corpus_filenames(corpus_id)
train = read_conll_treebank(FTB_train, corpus_id)
dev = read_conll_treebank(FTB_dev, corpus_id)
test = read_conll_treebank(FTB_test, corpus_id)
if args.S is not None:
train = train[:args.S]
dev = dev[:args.S]
tags = [defaultdict(int) for i in range(5)]
for tree in train:
training_example = tree.get_training_example()
for token_tags in training_example["all_tags"]:
for i, tag in enumerate(token_tags):
tags[i][tag] += 1
i2tags = [sorted(tagset) for tagset in tags]
tags2i = [{t: i for i, t in enumerate(tagset)} for tagset in i2tags]
return {"corpus": (train, dev, test),
"i2tags": i2tags,
"tags2i": tags2i,
"stats": tags,
"header": ["POS", "tense_mood", "number_person", "mwe", "subcat_gender"]}
"""
def reassign_tokens(train, dev, test):
    """Overwrite UD-FTB tokens in place with the SPMRL FTB tokenization.

    Both corpora are assumed to list the same sentences in the same order;
    sentences whose first tokens disagree are printed for inspection, but
    the SPMRL tokens are copied over in every case.
    """
    spmrl_paths = get_corpus_filenames("ftb_spmrl")
    spmrl_trees = []
    for path in spmrl_paths:
        spmrl_trees.extend(read_conll_treebank(path, "ftb_spmrl"))
    for ud_tree, spmrl_tree in zip(train + dev + test, spmrl_trees):
        if ud_tree.tokens[0] != spmrl_tree.tokens[0]:
            print(ud_tree.tokens)
            print(spmrl_tree.tokens)
            print()
        ud_tree.tokens = [tok for tok in spmrl_tree.tokens]
def load_corpus(args):
    """Load train/dev/test trees plus tag, character and word vocabularies.

    args must provide: corpus (corpus id), S (optional truncation size for
    quick runs), aux_data (optional extra CoNLL file path), cpos (bool,
    use coarse POS tags). Returns a dict with the corpora, auxiliary data,
    char/word/tag index maps, raw tag frequency stats, and a header naming
    each tagging task (POS plus one name per feature bin).
    """
    corpus_id = args.corpus
    train, dev, test = get_corpus_filenames(corpus_id)
    train = read_conll_treebank(train, corpus_id)
    dev = read_conll_treebank(dev, corpus_id)
    test = read_conll_treebank(test, corpus_id)
    # Some corpus specific pre-treatment
    if corpus_id == "ftb":
        # Align UD-FTB tokens with the SPMRL FTB tokenization.
        reassign_tokens(train, dev, test)
    if corpus_id == "partut":
        # Replace soft-hyphen tokens (U+00AD) by plain hyphens.
        for t in train + dev + test:
            for i in range(len(t)):
                if t.tokens[i] == "\xad":
                    t.tokens[i] = "-"
    if args.S is not None:
        # Truncate train/dev for debugging-sized runs.
        train = train[:args.S]
        dev = dev[:args.S]
    aux_data = []
    if args.aux_data is not None:
        aux_data = read_conll_treebank(args.aux_data, "artificial")
    bins = get_bins(corpus_id)
    # One tag inventory per tagging task: POS plus one per feature bin.
    tags = [defaultdict(int) for i in range(len(bins)+1)]
    char_freqs = defaultdict(int)
    word_freqs = defaultdict(int)
    all_features = {k for ks in bins for k in ks}
    for tree in train + aux_data:
        training_example = tree.get_training_example(args.cpos)
        for token_tags in training_example["all_tags"]:
            for i, tag in enumerate(token_tags):
                tags[i][tag] += 1
        for token_features in tree.features:
            if token_features not in {"_", ""}:
                token_features = feats2dict(token_features)
                # Warn about feature types that belong to no bin (they are
                # silently dropped by bin_features).
                ignored = [f for f in token_features if f not in all_features]
                for ign in ignored:
                    logging.warning(f"Ignored feature type {ign}")
        for token in training_example["tokens"]:
            word_freqs[token] += 1
            if token in SPECIAL_CHARS:
                # NOTE(review): special bracket tokens are counted as one
                # "character" here AND their component characters are also
                # counted below -- confirm the double count is intended.
                char_freqs[token] += 1
            for char in token:
                char_freqs[char] += 1
    # Vocabularies: reserved symbols first, then symbols by decreasing frequency.
    i2tags = [sorted(tagset) for tagset in tags]
    tags2i = [{t: i for i, t in enumerate(tagset)} for tagset in i2tags]
    bins = ["_".join(binn) for binn in bins]
    i2chars = ["<PAD>", "<UNK>", "<START>", "<STOP>", "<SOS>", "<EOS>"] + sorted(char_freqs, key = lambda x: char_freqs[x], reverse=True)
    i2words = ["<PAD>", "<UNK>", "<SOS>", "<EOS>"] + sorted(word_freqs, key=lambda x: word_freqs[x], reverse=True)
    return {"corpus": (train, dev, test),
            "aux_data": aux_data,
            "char2i": {c: i for i, c in enumerate(i2chars)},
            "word2i": {w: i for i, w in enumerate(i2words)},
            "i2tags": i2tags,
            "tags2i": tags2i,
            "stats": tags,
            "header": ["POS"] + bins}
            # "tense_mood", "number_person", "mwe", "subcat_gender"]}
if __name__ == "__main__":
    # Round-trip sanity check: read the Sequoia training set, write it out,
    # read the copy back, and write it again, so that the files "test" and
    # "test2" can be diffed to verify that serialization is stable.
    corpus_id = "sequoia"
    train, dev, test = get_corpus_filenames(corpus_id)
    train = read_conll_treebank(train, corpus_id)
    write_conll_treebank(train, "test")
    train = read_conll_treebank("test", corpus_id)
    write_conll_treebank(train, "test2")