swh:1:snp:7ce5f1105410d5ee1ad6abfdc873986c25b579e5
Tip revision: 3f1ccee247290d97430561f25b945309e6aaa7eb authored by Dirk Roorda on 29 October 2020, 15:49:03 UTC
small fix
small fix
Tip revision: 3f1ccee
mql.py
"""
.. include:: ../../docs/convert/mql.md
"""
import os
import re
from itertools import chain
from ..core.data import WARP
from ..core.helpers import (
cleanName,
isClean,
specFromRanges,
rangesFromList,
setFromSpec,
nbytes,
console,
)
# If a feature, with type string, has no more than ENUM_LIMIT distinct values,
# an enumeration type for it will be created,
# provided all values of that feature are a valid name for MQL.
ENUM_LIMIT = 1000
# When True, the values of all enumerable features are written into one
# shared enumeration (all_enum); when False, every enumerable feature gets
# its own enumeration named <feature>_enum.
ONE_ENUM_TYPE = True
class MQL(object):
    """Write a set of loaded features as an MQL dump file.

    The constructor checks the features; `write()` produces the file
    `<mqlDir>/<mqlName>.mql`, consisting of a database header, enumerations,
    object type declarations and the objects themselves.

    Attributes set by the constructor:

    * `mqlDir`, `mqlName`: target directory and (cleaned) database name;
    * `tfFeatures`: the mapping of feature names to feature objects as passed;
    * `tmObj`: object providing `info`, `error` and `indent` for messaging;
    * `features`, `featureList`: the features selected for export,
      keyed by / listed under their cleaned names;
    * `enums`: feature name => sorted list of values, filled while writing;
    * `good`: whether all prerequisites for exporting are met.
    """

    def __init__(self, mqlDir, mqlName, tfFeatures, tmObj):
        """Store the parameters and check the features.

        Parameters
        ----------
        mqlDir: string
            Directory in which the MQL file will be created.
        mqlName: string
            Name of the database; it is passed through `cleanName` and a
            message is issued if that changes the name.
        tfFeatures: dict
            Feature name => feature object.
        tmObj: object
            Must provide the callables `info`, `error` and `indent`.
        """
        error = tmObj.error

        self.mqlDir = mqlDir
        cleanDb = cleanName(mqlName)
        if cleanDb != mqlName:
            error(f'db name "{mqlName}" => "{cleanDb}"')
        self.mqlName = cleanDb
        self.tfFeatures = tfFeatures
        self.tmObj = tmObj
        self.enums = {}
        self._check()

    def write(self):
        """Load all selected features and write the MQL file.

        Does nothing when the check in the constructor failed.
        When the directory or the file cannot be created, an error is
        reported, `self.good` is set to `False` and nothing is written.
        The output file is closed in all cases, also when one of the
        writing steps raises an exception.
        """
        tmObj = self.tmObj
        error = tmObj.error
        info = tmObj.info
        indent = tmObj.indent

        if not self.good:
            return

        try:
            os.makedirs(self.mqlDir, exist_ok=True)
        except OSError:
            error(f'Cannot create directory "{self.mqlDir}"')
            self.good = False
            return

        mqlPath = f"{self.mqlDir}/{self.mqlName}.mql"
        try:
            fm = open(mqlPath, "w", encoding="utf8")
        except (OSError, ValueError):
            error(f"Could not write to {mqlPath}")
            self.good = False
            return

        # _writeEndDb closes the file on success; the with-statement makes
        # sure it is also closed on failure (closing twice is harmless).
        with fm:
            info(f"Loading {len(self.featureList)} features")
            for ft in self.featureList:
                self.features[ft].load()

            self.fm = fm
            self._writeStartDb()
            self._writeEnums()
            self._writeTypes()
            self._writeDataAll()
            self._writeEndDb()

        indent(level=0)
        info("Done")

    def _check(self):
        """Select the features to export and verify the prerequisites.

        Features that have a `method`, features named in `WARP` and config
        features are skipped. The remaining ones are registered under their
        cleaned names in `self.features` / `self.featureList`.

        The features `WARP[0]`, `WARP[1]` and `__levels__` must be present
        and loadable; the outcome is stored in `self.good`.
        """
        tmObj = self.tmObj
        error = tmObj.error
        info = tmObj.info
        indent = tmObj.indent

        info(f"Checking features of dataset {self.mqlName}")

        self.features = {}
        self.featureList = []
        indent(level=1)
        for (f, fo) in sorted(self.tfFeatures.items()):
            if fo.method is not None or f in WARP:
                continue
            fo.load(metaOnly=True)
            if fo.isConfig:
                continue
            cleanF = cleanName(f)
            if cleanF != f:
                error(f'feature "{f}" => "{cleanF}"')
            self.featureList.append(cleanF)
            self.features[cleanF] = fo

        good = True
        for feat in (WARP[0], WARP[1], "__levels__"):
            if feat not in self.tfFeatures:
                kind = (
                    "Warp"
                    if feat in WARP
                    else "Computed"
                    if feat.startswith("__")
                    else "Data"
                )
                error(f"{kind} feature {feat} is missing from data set")
                good = False
            elif not self.tfFeatures[feat].load():
                good = False

        indent(level=0)
        if good:
            info(f"{len(self.featureList)} features to export to MQL ...")
        else:
            error("Export to MQL aborted")
        self.good = good

    def _writeStartDb(self):
        """Write the statements that create and select the database."""
        name = self.mqlName
        self.fm.write(
            f"\nCREATE DATABASE '{name}'\nGO\nUSE DATABASE '{name}'\nGO\n"
        )

    def _writeEndDb(self):
        """Write the closing statement and close the output file."""
        self.fm.write("\nVACUUM DATABASE ANALYZE\nGO\n")
        self.fm.close()

    def _writeEnums(self):
        """Determine which features become enumerations and write them.

        A non-edge, non-integer feature is enumerable when it has at most
        `ENUM_LIMIT` distinct values and `isClean` accepts every value.
        The sorted values of enumerable features are stored in `self.enums`.
        Features with too many values are skipped silently; features with
        unclean values are reported on the console.
        """
        tmObj = self.tmObj
        info = tmObj.info
        indent = tmObj.indent

        indent(level=0)
        info("Writing enumerations")
        indent(level=1)

        for ft in self.featureList:
            fObj = self.features[ft]
            if fObj.isEdge or fObj.dataType == "int":
                continue
            fValues = sorted(set(fObj.data.values()))
            if len(fValues) > ENUM_LIMIT:
                continue
            unclean = [fVal for fVal in fValues if not isClean(fVal)]
            if unclean:
                console(
                    "\t{:<15}: {:>4} values, {} not a name, e.g. «{}»".format(
                        ft, len(fValues), len(unclean), unclean[0],
                    )
                )
                continue
            self.enums[ft] = fValues

        if ONE_ENUM_TYPE:
            self._writeEnumsAsOne()
        else:
            for ft in sorted(self.enums):
                self._writeEnum(ft)

        indent(level=0)
        info(f"Written {len(self.enums)} enumerations")

    def _writeEnumsAsOne(self):
        """Write one enumeration `all_enum` with the values of all enums.

        Values that occur in more than one feature are written only once,
        and the values are sorted, so that every constant in the enumeration
        is unique and the output is the same on every run.
        Nothing is written when there are no values at all.
        """
        info = self.tmObj.info

        fValues = sorted(set(chain.from_iterable(self.enums.values())))
        if not fValues:
            return

        info(f"Writing an all-in-one enum with {len(fValues):>4} values")
        fValuesEnumerated = ",\n\t".join(
            f"{fVal} = {i}" for (i, fVal) in enumerate(fValues)
        )
        self.fm.write(
            f"\nCREATE ENUMERATION all_enum = {{\n{fValuesEnumerated}\n}}\nGO\n"
        )

    def _writeEnum(self, ft):
        """Write the enumeration `<ft>_enum` for the single feature `ft`.

        Nothing is written when the feature has no values.
        """
        info = self.tmObj.info

        fValues = self.enums[ft]
        if not fValues:
            return

        info(f"enum {ft:<15} with {len(fValues):>4} values")
        fValuesEnumerated = ",\n\t".join(
            f"{fVal} = {i}" for (i, fVal) in enumerate(fValues)
        )
        self.fm.write(
            f"\nCREATE ENUMERATION {ft}_enum = {{\n{fValuesEnumerated}\n}}\nGO\n"
        )

    def _writeTypes(self):
        """Map the features onto object types and write the type declarations.

        Sets:

        * `self.levels`: the data of feature `__levels__` in reversed order;
        * `self.featureTypes`: feature => MQL type declaration;
        * `self.featureMethods`: feature => function that formats a value;
        * `self.otypes`: cleaned object type => features that have a value
          for at least one node in the node range of that type.
        """

        def valInt(n):
            # Also used for enumerated string values: they are written bare.
            return str(n)

        def valStr(s):
            # NOTE(review): backslashes and newlines are written verbatim;
            # confirm that this is acceptable inside MQL string literals.
            if "'" in s:
                return '"{}"'.format(s.replace('"', '\\"'))
            return "'{}'".format(s)

        def valIds(ids):
            return "({})".format(",".join(str(i) for i in ids))

        tmObj = self.tmObj
        error = tmObj.error
        info = tmObj.info
        indent = tmObj.indent

        self.levels = self.tfFeatures["__levels__"].data[::-1]
        indent(level=0)
        info(
            "Mapping {} features onto {} object types".format(
                len(self.featureList), len(self.levels),
            )
        )

        otypeSupport = {}
        for (otype, av, start, end) in self.levels:
            cleanOtype = cleanName(otype)
            if cleanOtype != otype:
                error(f'otype "{otype}" => "{cleanOtype}"')
            otypeSupport[cleanOtype] = set(range(start, end + 1))

        self.otypes = {}
        self.featureTypes = {}
        self.featureMethods = {}

        for ft in self.featureList:
            fObj = self.features[ft]
            if fObj.isEdge:
                dataType = "LIST OF id_d"
                method = valIds
            elif fObj.dataType == "int":
                dataType = "integer DEFAULT 0"
                method = valInt
            else:
                dataType = 'string DEFAULT ""'
                isEnum = fObj.dataType == "str" and ft in self.enums
                method = valInt if isEnum else valStr
            self.featureTypes[ft] = dataType
            self.featureMethods[ft] = method

            support = set(fObj.data.keys())
            for (otype, nodes) in otypeSupport.items():
                if support & nodes:
                    self.otypes.setdefault(otype, []).append(ft)

        for otype in (cleanName(x[0]) for x in self.levels):
            self._writeType(otype)

    def _writeType(self, otype):
        """Write the declaration of object type `otype` with its features.

        An object type for which no feature has been registered in
        `self.otypes` is declared without features.
        """
        self.fm.write(f"\nCREATE OBJECT TYPE\n[{otype}\n")
        for ft in self.otypes.get(otype, []):
            if ft in self.enums:
                fType = "{}_enum".format("all" if ONE_ENUM_TYPE else ft)
            else:
                fType = self.featureTypes[ft]
            self.fm.write(f" {ft}:{fType};\n")
        self.fm.write("\n]\nGO\n")

    def _writeDataAll(self):
        """Write the objects of all object types, one type after another.

        The object types are addressed by their cleaned names, which are the
        names under which they have been declared and under which their
        features are registered in `self.otypes`.
        """
        info = self.tmObj.info
        info(
            "Writing {} features as data in {} object types".format(
                len(self.featureList), len(self.levels),
            )
        )
        oslotsData = self.tfFeatures[WARP[1]].data
        self.oslots = oslotsData[0]
        self.maxSlot = oslotsData[1]
        for (otype, av, start, end) in self.levels:
            self._writeData(cleanName(otype), start, end)

    def _writeData(self, otype, start, end):
        """Write the objects of type `otype`: the nodes `start` .. `end`.

        The objects are written in batches of `LIMIT` objects; after each
        batch a progress message with the size of the batch is issued.
        Nodes up to `self.maxSlot` have themselves as monad; for other nodes
        the monads are derived from `self.oslots`.
        """
        tmObj = self.tmObj
        info = tmObj.info
        indent = tmObj.indent
        fm = self.fm

        indent(level=1, reset=True)
        info(f"{otype} data ...")

        oslots = self.oslots
        maxSlot = self.maxSlot
        # an object type without features is not registered in self.otypes
        oFeats = self.otypes.get(otype, [])
        features = self.features
        featureMethods = self.featureMethods

        fm.write(
            f"\nDROP INDEXES ON OBJECT TYPE[{otype}]\nGO\n"
            f"CREATE OBJECTS\nWITH OBJECT TYPE[{otype}]\n"
        )

        curSize = 0
        LIMIT = 50000
        t = 0
        j = 0

        indent(level=2, reset=True)
        for n in range(start, end + 1):
            monads = (
                n
                if n <= maxSlot
                else specFromRanges(rangesFromList(oslots[n - maxSlot - 1]))
            )
            parts = [
                f"\nCREATE OBJECT\nFROM MONADS= {{ {monads} }}\nWITH ID_D={n} [\n"
            ]
            for ft in oFeats:
                fMap = features[ft].data
                if n in fMap:
                    parts.append(f"{ft}:={featureMethods[ft](fMap[n])};\n")
            parts.append("\n]\n")
            oMql = "".join(parts)

            fm.write(oMql)
            curSize += len(oMql.encode("utf8"))
            t += 1
            j += 1
            if j == LIMIT:
                fm.write(f"\nGO\nCREATE OBJECTS\nWITH OBJECT TYPE[{otype}]\n")
                info(
                    f"batch of size {nbytes(curSize):>20} with {j:>7} of {t:>7} {otype}s"
                )
                j = 0
                curSize = 0

        info(f"batch of size {nbytes(curSize):>20} with {j:>7} of {t:>7} {otype}s")
        fm.write(f"\nGO\nCREATE INDEXES ON OBJECT TYPE[{otype}]\nGO\n")

        indent(level=1)
        info("{} data: {} objects".format(otype, t))
# MQL IMPORT
# One or more consecutive byte escapes of the form \xHH (two hex digits).
uniscan = re.compile(r"(?:\\x[0-9A-Fa-f]{2})+")


def makeuni(match):
    """Make proper unicode of a text that contains byte escape codes
    such as backslash xb6

    The escapes in the matched text are taken as the bytes of a UTF-8
    encoded string. They are decoded without evaluating the text as code.

    Parameters
    ----------
    match: re.Match
        A match whose full text is a run of `\\xHH` escapes.

    Returns
    -------
    string
        The decoded text; the matched text itself if it is not a run of
        well-formed hex escapes.

    Raises
    ------
    UnicodeDecodeError
        If the bytes do not form valid UTF-8.
    """
    escapes = match.group(0)
    try:
        byts = bytes.fromhex(escapes.replace("\\x", ""))
    except ValueError:
        # not a run of hex escapes: leave the text as it is
        return escapes
    return byts.decode("utf-8")


def uni(line):
    """Replace all runs of `\\xHH` byte escapes in `line` by the text they encode."""
    return uniscan.sub(makeuni, line)
def tfFromMql(mqlFile, tmObj, slotType=None, otext=None, meta=None):
    """Convert an MQL file into feature data.

    The file is parsed by `parseMql` and the result is handed to
    `tfFromData`.

    Parameters
    ----------
    mqlFile: string
        Path of the MQL file.
    tmObj: object
        Must provide the callables `info` and `error`.
    slotType: string
        Name of the object type that acts as slot type. Mandatory.
    otext: dict, optional
        Passed on to `tfFromData`.
    meta: dict, optional
        Passed on to `tfFromData`.

    Returns
    -------
    tuple
        `(good, nodeFeatures, edgeFeatures, metaData)` as delivered by
        `tfFromData`, or `(False, {}, {}, {})` if no slot type has been
        given or if parsing failed.
    """
    if slotType is None:
        tmObj.error("ERROR: no slotType specified")
        return (False, {}, {}, {})

    # the names follow the order in which parseMql returns its results
    # and in which tfFromData expects them: node features before edge features
    (good, objectTypes, tables, nodeF, edgeF) = parseMql(mqlFile, tmObj)
    if not good:
        return (False, {}, {}, {})

    return tfFromData(tmObj, objectTypes, tables, nodeF, edgeF, slotType, otext, meta)
def parseMql(mqlFile, tmObj):
    """Parse an MQL file, line by line, into plain data structures.

    The parser recognizes enumeration declarations, object type declarations
    and object creation statements. Parsing stops at the first line that
    cannot be interpreted inside an object.

    Parameters
    ----------
    mqlFile: string
        Path of the MQL file; it is read as UTF-8.
    tmObj: object
        Must provide the callables `info` and `error`.

    Returns
    -------
    tuple
        `(good, objectTypes, tables, nodeF, edgeF)` where

        * `good`: `False` if an unrecognized line has been encountered;
        * `objectTypes`: object type => feature => `(ftype, default)`;
        * `tables`: object type => id_d =>
          `dict(feats=<feature => value>, monads=<set>)`;
        * `nodeF`: object type => set of names of its non-`id_d` features;
        * `edgeF`: object type => set of names of its `id_d` features.
    """
    info = tmObj.info
    error = tmObj.error

    info("Parsing mql source ...")

    objectTypes = dict()
    tables = dict()

    edgeF = dict()
    nodeF = dict()

    curId = None
    curEnum = None
    curObjectType = None
    curTable = None
    curObject = None
    curValue = None
    curFeature = None
    seeObjects = False
    inObjectTypeFeatures = False

    STRING_TYPES = {"ascii", "string"}

    enums = dict()

    chunkSize = 1000000
    inThisChunk = 0

    good = True
    # stays -1 for an empty file, so that 0 lines are reported
    ln = -1

    with open(mqlFile, encoding="utf8") as fh:
        for (ln, line) in enumerate(fh):
            inThisChunk += 1
            if inThisChunk == chunkSize:
                info(f"\tline {ln + 1:>9}")
                inThisChunk = 0

            if curValue is not None:
                # We are inside a string value that spans several lines.
                # Such lines are data: they must not be mistaken for
                # structural lines, and blank lines must be retained.
                if line.rstrip().endswith('";'):
                    curValue += line.rstrip().rstrip(";").rstrip('"')
                    curObject["feats"][curFeature] = uni(curValue)
                    curValue = None
                    curFeature = None
                else:
                    curValue += line
                continue

            if line.startswith("CREATE OBJECTS WITH OBJECT TYPE") or line.startswith(
                "WITH OBJECT TYPE"
            ):
                comps = line.rstrip().rstrip("]").split("[", 1)
                curTable = comps[1]
                info(f"\t\tobjects in {curTable}")
                curObject = None
                if curTable not in tables:
                    tables[curTable] = dict()
                seeObjects = True
            elif line == "CREATE OBJECT\n":
                curObject = dict(feats=dict(), monads=None)
                curId = None
                seeObjects = True
            elif curEnum is not None:
                # inside an enumeration: one value per line
                if line.startswith("}"):
                    curEnum = None
                    continue
                comps = line.strip().rstrip(",").split("=", 1)
                comp = comps[0].strip()
                words = comp.split()
                if words[0] == "DEFAULT":
                    enums[curEnum]["default"] = uni(words[1])
                    value = words[1]
                else:
                    value = words[0]
                enums[curEnum]["values"].append(value)
            elif curObjectType is not None:
                # inside an object type declaration;
                # curObjectType is True until the name of the type has been seen
                if line.startswith("]"):
                    curObjectType = None
                    inObjectTypeFeatures = False
                    continue
                if curObjectType is True:
                    if line.startswith("["):
                        curObjectType = line.rstrip()[1:]
                        objectTypes[curObjectType] = dict()
                        info(f"\t\totype {curObjectType}")
                        inObjectTypeFeatures = True
                        continue
                if inObjectTypeFeatures:
                    if not line.strip():
                        # a blank line between the feature declarations
                        continue
                    comps = line.strip().rstrip(";").split(":", 1)
                    feature = comps[0].strip()
                    fInfo = comps[1].strip()
                    fCleanInfo = fInfo.replace("FROM SET", "")
                    fInfoComps = fCleanInfo.split(" ", 1)
                    fMQLType = fInfoComps[0]
                    if len(fInfoComps) == 2:
                        fDefaultComps = fInfoComps[1].strip().split(" ", 1)
                        fDefault = fDefaultComps[1] if len(fDefaultComps) > 1 else None
                    else:
                        fDefault = None
                    if fDefault is not None and fMQLType in STRING_TYPES:
                        # strip the quotes around the default value
                        fDefault = uni(fDefault[1:-1])
                    default = enums.get(fMQLType, {}).get("default", fDefault)
                    ftype = (
                        "str"
                        if fMQLType in enums
                        else "int"
                        if fMQLType == "integer"
                        else "str"
                        if fMQLType in STRING_TYPES
                        else "int"
                        if fInfo == "id_d"
                        else "str"
                    )
                    isEdge = fMQLType == "id_d"
                    if isEdge:
                        edgeF.setdefault(curObjectType, set()).add(feature)
                    else:
                        nodeF.setdefault(curObjectType, set()).add(feature)

                    objectTypes[curObjectType][feature] = (ftype, default)
                    info(
                        "\t\t\tfeature {} ({}) =def= {} : {}".format(
                            feature, ftype, default, "edge" if isEdge else "node"
                        )
                    )
            elif seeObjects:
                if curObject is not None:
                    if line.startswith("]"):
                        # end of the object: supply the defaults and store it
                        objectType = objectTypes[curTable]
                        for (feature, (ftype, default)) in objectType.items():
                            if feature not in curObject["feats"] and default is not None:
                                curObject["feats"][feature] = default
                        tables[curTable][curId] = curObject
                        curObject = None
                        continue
                    elif line.startswith("["):
                        name = line.rstrip()[1:]
                        if len(name):
                            curTable = name
                            if curTable not in tables:
                                tables[curTable] = dict()
                    elif line.startswith("FROM MONADS"):
                        monads = (
                            line.split("=", 1)[1]
                            .replace("{", "")
                            .replace("}", "")
                            .replace(" ", "")
                            .strip()
                        )
                        curObject["monads"] = setFromSpec(monads)
                    elif line.startswith("WITH ID_D"):
                        comps = line.replace("[", "").rstrip().split("=", 1)
                        curId = int(comps[1])
                    elif line.startswith("GO"):
                        pass
                    elif line.strip() == "":
                        pass
                    elif ":=" in line:
                        (featurePart, valuePart) = line.split("=", 1)
                        # featurePart still ends with the colon of ":="
                        feature = featurePart[0:-1].strip()
                        valuePart = valuePart.lstrip()
                        isText = ':="' in line
                        toBeContinued = isText and not line.rstrip().endswith('";')
                        if toBeContinued:
                            # this happens if a feature value
                            # contains a new line
                            # we must continue scanning lines
                            # until we meet the end of the value
                            curFeature = feature
                            curValue = valuePart.lstrip('"')
                        else:
                            value = valuePart.rstrip().rstrip(";").strip('"')
                            curObject["feats"][feature] = (
                                uni(value) if isText else value
                            )
                    else:
                        error(f"ERROR: line {ln + 1}: unrecognized line -->{line}<--")
                        good = False
                        break
                else:
                    if line.startswith("CREATE OBJECT"):
                        curObject = dict(feats=dict(), monads=None)
                        curId = None
            else:
                if line.startswith("CREATE ENUMERATION"):
                    words = line.split()
                    curEnum = words[2]
                    enums[curEnum] = dict(default=None, values=[])
                    info(f"\t\tenum {curEnum}")
                elif line.startswith("CREATE OBJECT TYPE"):
                    curObjectType = True
                    inObjectTypeFeatures = False

    info(f"{ln + 1} lines parsed")
    for table in tables:
        info(f"{len(tables[table])} objects of type {table}")
    if len(tables) == 0:
        info("No objects found")

    return (good, objectTypes, tables, nodeF, edgeF)
def tfFromData(tmObj, objectTypes, tables, nodeF, edgeF, slotType, otext, meta):
    """Turn parsed MQL data into node features, edge features and metadata.

    The objects of type `slotType` become the slots: they are numbered
    from 1 in the order of their (first) monad. The objects of the other
    types are numbered after the slots, type by type in alphabetical order
    and, within a type, by ascending id_d.

    Parameters
    ----------
    tmObj: object
        Must provide the callable `info`.
    objectTypes: dict
        Object type => feature => `(ftype, default)`.
    tables: dict
        Object type => id_d => `dict(feats=..., monads=...)`.
    nodeF: dict
        Object type => names of its node features.
    edgeF: dict
        Object type => names of its edge features.
    slotType: string
        Name of the object type that acts as slot type.
    otext: dict | None
        Stored as such under the key `otext` in the metadata.
    meta: dict | None
        Stored under the key `""` in the metadata (`{}` when `None`).

    Returns
    -------
    tuple
        `(True, nodeFeatures, edgeFeatures, metaData)`.
    """
    report = tmObj.info

    report("Making TF data ...")

    nilValues = {"nil", "NIL", "Nil"}

    otherTypes = [name for name in sorted(tables) if name != slotType]
    allTypes = [slotType, *otherTypes]

    nodeFeatures = {}
    edgeFeatures = {}
    metaData = {
        # metadata that ends up in every feature
        "": {} if meta is None else meta,
        # the config feature otext
        "otext": otext,
    }

    report("Monad - idd mapping ...")
    iddFromMonad = {
        list(obj["monads"])[0]: idd
        for (idd, obj) in tables.get(slotType, {}).items()
    }

    report("Removing holes in the monad sequence")
    # consecutive slot numbers are handed out in the order of the monads
    slotFromMonad = {}
    nodeFromIdd = {}
    otype = {}
    for (slot, monad) in enumerate(sorted(iddFromMonad), start=1):
        slotFromMonad[monad] = slot
        nodeFromIdd[iddFromMonad[monad]] = slot
        otype[slot] = slotType
    maxSlot = len(iddFromMonad)
    report(f"maxSlot={maxSlot}")

    report("Node mapping and otype ...")
    lastNode = maxSlot
    for name in otherTypes:
        for idd in sorted(tables[name]):
            lastNode += 1
            nodeFromIdd[idd] = lastNode
            otype[lastNode] = name

    nodeFeatures["otype"] = otype
    metaData["otype"] = dict(valueType="str")

    report("oslots ...")
    edgeFeatures["oslots"] = {
        nodeFromIdd[idd]: {slotFromMonad[m] for m in obj["monads"]}
        for name in otherTypes
        for (idd, obj) in tables.get(name, {}).items()
    }
    metaData["oslots"] = dict(valueType="str")

    report("metadata ...")
    for (name, featNames) in nodeF.items():
        for feat in featNames:
            metaData.setdefault(feat, {})["valueType"] = objectTypes[name][feat][0]
    for featNames in edgeF.values():
        for feat in featNames:
            metaData.setdefault(feat, {})["valueType"] = "str"

    report("features ...")
    chunkSize = 100000
    for name in allTypes:
        report(f"\tfeatures from {name}s")
        table = tables.get(name, {})
        edgeNames = edgeF.get(name, set())
        sinceReport = 0
        for (i, (idd, obj)) in enumerate(table.items()):
            sinceReport += 1
            if sinceReport == chunkSize:
                report(f"\t{i + 1:>9} {name}s")
                sinceReport = 0
            node = nodeFromIdd[idd]
            for (feat, value) in obj["feats"].items():
                if feat not in edgeNames:
                    nodeFeatures.setdefault(feat, {})[node] = value
                elif value not in nilValues:
                    # the value of an edge feature is the id_d of the target
                    target = nodeFromIdd[int(value)]
                    edgeFeatures.setdefault(feat, {}).setdefault(node, set()).add(
                        target
                    )
        report(f"\t{len(table):>9} {name}s")

    return (True, nodeFeatures, edgeFeatures, metaData)