Raw File
"""
# Text-Fabric kernel

Text-Fabric can be used as a service.
The full API of Text-Fabric needs a lot of memory, which makes it unusably for
rapid successions of loading and unloading, like when used in a web server context.

However, you can start TF as a service process,
after which many clients can connect to it,
all looking at the same (read-only) data. We call this a **TF kernel**.

The API that the TF kernel offers is limited,
it is primarily template search that is offered.
see *Kernel API* below.

## Start

You can run the TF kernel as follows:

```sh
python -m tf.server.kernel ddd
```

where `ddd` points to a corpus, see `tf.app.use`.

!!! example
    See the
    [start-up script](https://github.com/annotation/text-fabric/blob/master/tf/server/start.py)
    of the text-fabric browser.

## Connect

The TF kernel can be connected by an other Python program as follows:

```python
from tf.server.kernel import makeTfConnection
TF = makeTfConnection(lhost, port)
api = TF.connect()
```

After this, `api` can be used to obtain information from the TF kernel.

See the web server of the text-fabric browser, `tf.server.web`.

## Kernel API

The API of the TF kernel is created by the function `makeTfKernel`.

It returns a class `TfKernel` with a number of exposed methods
that can be called by other programs.

For the machinery of interprocess communication we rely on the
[rpyc](https://github.com/tomerfiliba/rpyc) module.
See especially the docs on
[services](https://rpyc.readthedocs.io/en/latest/docs/services.html#services).

!!! explanation "Shadow objects"
    The way rpyc works in the case of data transmission has a pitfall.
    When a service returns a Python object to the client, it
    does not return the object itself, but only a shadow object
    so called *netref* objects. This strategy is called
    [boxing](https://rpyc.readthedocs.io/en/latest/docs/theory.html#boxing).
    To the client the shadow object looks like the real thing,
    but when the client needs to access members, they will be fetched
    on the fly.

    This is a performance problem when the service sends a big list or dict,
    and the client iterates over all its items. Each item will be fetched in
    a separate interprocess call, which causes an enormous overhead.

    Boxing only happens for mutable objects. And here lies the work-around:

    The service must send big chunks of data as immutable objects,
    such as tuples. They are sent within a single interprocess call,
    and fly swiftly through the connecting pipe.
"""

import sys
import pickle
from functools import reduce

from ..parameters import GH
from ..capable import Capable
from ..core.helpers import console
from ..core.timestamp import AUTO
from ..advanced.app import findApp
from ..advanced.highlight import getPassageHighlights
from ..advanced.search import runSearch, runSearchCondensed
from ..advanced.helpers import getRowsX
from ..advanced.tables import compose, composeP, composeT
from ..advanced.text import specialCharacters

from .command import argKernel


Cap = Capable("browser")
rpyc = Cap.load("rpyc")
ThreadedServer = Cap.loadFrom("rpyc", "ThreadedServer")


TF_DONE = "TF setup done."
TF_ERROR = "Could not set up TF"


# KERNEL CREATION


def makeTfKernel(app, appName, port):
    if not Cap.can("browser"):
        console(f"{TF_ERROR}")
        return False

    if not app.api:
        console(f"{TF_ERROR}")
        return False

    TF = app.api.TF
    reset = TF.reset
    cache = TF.cache

    reset()
    cache = {}
    console(f"{TF_DONE}\nKernel listening at port {port}")

    class TfKernel(rpyc.Service):
        def on_connect(self, conn):
            self.app = app
            pass

        def on_disconnect(self, conn):
            self.app = None
            pass

        def exposed_monitor(self):
            """A utility function that spits out some information from the kernel
            to the outside world.

            At this moment it is only used for debugging, but later it can be useful
            to monitor the kernel or manage it while it remains running.
            """

            app = self.app
            api = app.api
            S = api.S

            searchExe = getattr(S, "exe", None)
            if searchExe:
                searchExe = searchExe.outerTemplate

            _msgCache = cache(_asString=True)

            data = dict(searchExe=searchExe, _msgCache=_msgCache)
            return data

        def exposed_header(self):
            """Fetches all the stuff to create a header.

            This is shown after loading a data set.
            It contains links to data and documentation of the data source.
            """

            app = self.app
            return app.header()

        def exposed_provenance(self):
            """Fetches provenance metadata to be shown on exported data pages."""

            app = self.app
            aContext = app.context
            backend = app.backend
            org = aContext.org
            repo = aContext.repo
            commit = aContext.commit
            appProvenance = (
                (
                    ("backend", backend),
                    ("name", appName),
                    ("org", org),
                    ("repo", repo),
                    ("commit", commit),
                ),
            )
            return (appProvenance, app.provenance)

        def exposed_setNames(self):
            """Gets the names of the custom sets that the kernel has loaded.

            The kernel can load additional sets of data triggered by the
            `--sets=` command line argument with which the kernel
            was started.

            A web server kan use this informatiomn to write out provenance info.
            """

            app = self.app
            return (
                tuple(sorted(app.sets.keys()))
                if hasattr(app, "sets") and type(app.sets) is dict
                else ()
            )

        def exposed_css(self):
            """Delivers the CSS code to be inserted on the browser page."""

            app = self.app
            return f'<style type="text/css">{app.loadCss()}</style>'

        def exposed_characters(self, fmt=None):
            """Delivers the HTML for a widget of hard-to-type characters."""

            app = self.app
            return specialCharacters(app, fmt=fmt, _browse=True)

        def exposed_context(self):
            """Fetches the TF app context settings for the corpus."""

            app = self.app

            return pickle.dumps(app.context)

        def exposed_passage(
            self,
            features,
            query,
            sec0,
            sec1=None,
            sec2=None,
            opened=set(),
            getx=None,
            **options,
        ):
            """Gets passages, i.e. sections of level 1 (chapter-like).

            The material will be displayed as a sequence of plain
            representations of the sec2s (verse-like), which can be expanded to pretty
            displays when the user chooses to do so.

            Parameters
            ----------
            features: string | iterable
                The features that should be displayed in pretty displays when expanding
                a plain representation of a sec2 into a pretty display

            query: string
                The query whose results should be highlighted in the passage display.

            sec0: string | int
                The level 0 section (book)-like label in which the passage occurs

            sec1: string | int, optional None
                The level 1 section (chapter)-like label to fetch

            sec2: string | int, optional None
                The level 2 section (verse-like) label that should get focus

            opened: set, optional, `set()`
                The set of items that are currently expanded into pretty display

            getx: string | int, optional None
                If given, only a single sec2 (verse) will be fetched, but in pretty
                display.
                `getx` is the identifier (section label, verse number) of the item/

            options: dict
                Additional, optional display options, see `tf.advanced.options`.
            """

            app = self.app
            api = app.api
            F = api.F
            L = api.L
            T = api.T

            aContext = app.context
            browseNavLevel = aContext.browseNavLevel
            browseContentPretty = aContext.browseContentPretty

            sectionFeatureTypes = T.sectionFeatureTypes
            sec0Type = T.sectionTypes[0]
            sec1Type = T.sectionTypes[1]
            sectionDepth = len(T.sectionTypes)
            browseNavLevel = min((sectionDepth, browseNavLevel))
            finalSecType = T.sectionTypes[browseNavLevel]
            finalSec = (sec0, sec1, sec2)[browseNavLevel]

            if sec0:
                if sectionFeatureTypes[0] == "int":
                    sec0 = int(sec0)
            if sec1 and browseNavLevel == 2:
                if sectionFeatureTypes[1] == "int":
                    sec1 = int(sec1)

            sec0Node = T.nodeFromSection((sec0,)) if sec0 else None
            sec1Node = T.nodeFromSection((sec0, sec1)) if sec0 and sec1 else None

            contentNode = (sec0Node, sec1Node)[browseNavLevel - 1]

            if getx is not None:
                if sectionFeatureTypes[browseNavLevel] == "int":
                    getx = int(getx)

            sec0s = tuple(T.sectionFromNode(s)[0] for s in F.otype.s(sec0Type))
            sec1s = ()
            if browseNavLevel == 2:
                sec1s = (
                    ()
                    if sec0Node is None
                    else tuple(
                        T.sectionFromNode(s)[1] for s in L.d(sec0Node, otype=sec1Type)
                    )
                )

            items = (
                contentNode
                if browseContentPretty
                else L.d(contentNode, otype=finalSecType)
                if contentNode
                else []
            )

            highlights = (
                getPassageHighlights(app, contentNode, query, cache) if items else set()
            )

            passage = ""

            if items:
                passage = composeP(
                    app,
                    browseNavLevel,
                    finalSecType,
                    features,
                    items,
                    opened,
                    finalSec,
                    getx=getx,
                    highlights=highlights,
                    **options,
                )

            return (passage, sec0Type, pickle.dumps((sec0s, sec1s)), browseNavLevel)

        def exposed_rawSearch(self, query):
            app = self.app
            rawSearch = app.api.S.search

            (results, messages) = rawSearch(query, _msgCache=True)
            if messages:
                # console(messages, error=True)
                results = ()
            else:
                results = tuple(sorted(results))
                # console(f'{len(results)} results')
            return (results, messages)

        def exposed_table(
            self,
            kind,
            task,
            features,
            opened=set(),
            getx=None,
            **options,
        ):
            """Fetches material corresponding to a list of sections or tuples of nodes.

            Parameters
            ----------
            kind: string
                Either `sections` or `tuples`:
                whether to find section material or tuple material.

            task: iterable
                The list of things (sections or tuples) to retrieve the material for;
                Typically coming from the *section pad* / *node pad* in the browser.

            features: string | iterable
                The features that should be displayed in pretty displays when expanding
                a plain representation of a sec2 into a pretty display

            opened: set, optional, `set()`
                The set of items that are currently expanded into pretty display

            getx: string | int, optional None
                If given, only a single sec2 (verse) will be fetched, but in pretty
                display.
                `getx` is the identifier (section label, verse number) of the item/

            options: dict
                Additional, optional display options, see `tf.advanced.options`.
            """

            app = self.app

            if kind == "sections":
                results = []
                messages = []
                if task:
                    lines = task.split("\n")
                    for (i, line) in enumerate(lines):
                        line = line.strip()
                        node = app.nodeFromSectionStr(line)
                        if type(node) is not int:
                            messages.append(str(node))
                        else:
                            results.append((i + 1, (node,)))
                results = tuple(results)
                messages = "\n".join(messages)
            elif kind == "tuples":
                results = ()
                messages = ""
                if task:
                    lines = task.split("\n")
                    try:
                        results = tuple(
                            (i + 1, tuple(int(n) for n in t.strip().split(",")))
                            for (i, t) in enumerate(lines)
                            if t.strip()
                        )
                    except Exception as e:
                        messages = f"{e}"

            allResults = ((None, kind),) + results
            table = composeT(app, features, allResults, opened, getx=getx, **options)
            return (table, messages)

        def exposed_search(
            self,
            query,
            batch,
            position=1,
            opened=set(),
            getx=None,
            **options,
        ):
            """Executes a TF search template, retrieves formatted results.

            The very work horse of this API.

            Formatted results for additional nodes and sections are also retrieved.

            Parameters
            ----------
            query: string
                The query whose results should be highlighted in the passage display.
                Typically coming from the *search pad* in the browser.

            batch: int
                The number of table rows to show on one page in the browser.

            position: integer, optional 1
                The position that is in focus in the browser.
                The navigation links take this position as the central point,
                and enable the user to navigate to neighbouring results,
                in ever bigger strides.

            opened: set, optional set()
                The set of items that are currently expanded into pretty display.
                Normally, only the information to provide a *plain*
                representation of a result is being fetched,
                but for the opened ones information is gathered for
                pretty displays.

            getx: string | int, optional None
                If given, only a single sec2 (verse) will be fetched, but in pretty
                display.
                `getx` is the identifier (section label, verse number) of the item/
            """

            app = self.app
            display = app.display
            dContext = display.distill(options)
            condensed = dContext.condensed
            condenseType = dContext.condenseType

            total = 0

            results = ()
            status = True
            messages = ("", "")
            if query:
                (results, status, messages, features) = (
                    runSearchCondensed(app, query, cache, condenseType)
                    if condensed and condenseType
                    else runSearch(app, query, cache)
                )

                status = status[0] and status[1]
                if not status:
                    results = ()
                total += len(results)

            (start, end) = _batchAround(total, position, batch)

            selectedResults = results[start - 1 : end]
            opened = set(opened)

            before = {n for n in opened if n > 0 and n < start}
            after = {n for n in opened if n > end and n <= len(results)}
            beforeResults = tuple((n, results[n - 1]) for n in sorted(before))
            afterResults = tuple((n, results[n - 1]) for n in sorted(after))

            allResults = (
                ((None, "results"),)
                + beforeResults
                + tuple((i + start, r) for (i, r) in enumerate(selectedResults))
                + afterResults
            )
            features = set(reduce(set.union, (x[1] for x in features), set()))
            featureStr = " ".join(sorted(features))
            table = compose(
                app,
                allResults,
                featureStr,
                position,
                opened,
                start=start,
                getx=getx,
                **options,
            )
            return (table, status, " ".join(messages), featureStr, start, total)

        def exposed_csvs(self, query, tuples, sections, **options):
            """Gets query results etc. in plain csv format.

            The query results, tuples, and sections are retrieved, as in
            `exposed_search`, but this function only needs some features per node.
            """

            app = self.app
            display = app.display
            dContext = display.distill(options)
            fmt = dContext.fmt
            condensed = dContext.condensed
            condenseType = dContext.condenseType

            sectionResults = []
            if sections:
                sectionLines = sections.split("\n")
                for sectionLine in sectionLines:
                    sectionLine = sectionLine.strip()
                    node = app.nodeFromSectionStr(sectionLine)
                    if type(node) is int:
                        sectionResults.append((node,))
            sectionResults = tuple(sectionResults)

            tupleResults = ()
            if tuples:
                tupleLines = tuples.split("\n")
                try:
                    tupleResults = tuple(
                        tuple(int(n) for n in t.strip().split(","))
                        for t in tupleLines
                        if t.strip()
                    )
                except Exception:
                    pass

            queryResults = ()
            queryMessages = ("", "")
            features = ()
            if query:
                (queryResults, queryStatus, queryMessages, features) = runSearch(
                    app, query, cache
                )
                (queryResultsC, queryStatusC, queryMessagesC, featuresC) = (
                    runSearchCondensed(app, query, cache, condenseType)
                    if queryStatus[0] and queryStatus[1] and condensed and condenseType
                    else (None, (False, False), ("", ""), None)
                )

                queryStatus = queryStatus[0] and queryStatus[1]
                queryStatusC = queryStatusC[0] and queryStatusC[1]
                if not queryStatus:
                    queryResults = ()
                if not queryStatusC:
                    queryResultsC = ()

            csvs = (
                ("sections", sectionResults),
                ("nodes", tupleResults),
                ("results", queryResults),
            )
            if condensed and condenseType:
                csvs += ((f"resultsBy{condenseType}", queryResultsC),)

            tupleResultsX = getRowsX(
                app,
                tupleResults,
                features,
                condenseType,
                fmt=fmt,
            )
            queryResultsX = getRowsX(
                app,
                queryResults,
                features,
                condenseType,
                fmt=fmt,
            )
            return (
                queryStatus,
                " ".join(queryMessages[0]),
                pickle.dumps(csvs),
                pickle.dumps(tupleResultsX),
                pickle.dumps(queryResultsX),
            )

    return TfKernel()
    return ThreadedServer(
        TfKernel(),
        port=int(port),
        protocol_config={
            # 'allow_pickle': True,
            # 'allow_public_attrs': True,
        },
    )


# KERNEL CONNECTION


def makeTfConnection(lhost, port, timeout):
    if not Cap.can("browser"):
        return None

    class TfConnection:
        def connect(self):
            try:
                connection = rpyc.connect(
                    lhost, port, config=dict(sync_request_timeout=timeout)
                )
                self.connection = connection
            except ConnectionRefusedError as e:
                self.connection = None
                return str(e)
            return connection.root

    return TfConnection()


# TOP LEVEL


def main(cargs=sys.argv):
    args = argKernel(cargs)
    if not args:
        console(f"{TF_ERROR}")
        return

    if not Cap.can("browser"):
        console(f"{TF_ERROR}")
        return

    (dataSource, portKernel) = args
    backend = dataSource.get("backend", GH) or GH
    appName = dataSource["appName"]
    checkout = dataSource["checkout"]
    checkoutApp = dataSource["checkoutApp"]
    dataLoc = dataSource["dataLoc"]
    moduleRefs = dataSource["moduleRefs"]
    locations = dataSource["locations"]
    modules = dataSource["modules"]
    setFile = dataSource["setFile"]
    version = dataSource["version"]

    if checkout is None:
        checkout = ""

    versionRep = "" if version is None else f" version {version}"
    console(
        f"Setting up TF kernel for {appName} {moduleRefs or ''} "
        f"{setFile or ''}{versionRep}"
    )
    app = findApp(
        appName,
        checkoutApp,
        dataLoc,
        backend,
        True,
        silent=AUTO,
        checkout=checkout,
        mod=moduleRefs,
        locations=locations,
        modules=modules,
        setFile=setFile,
        version=version,
    )
    if app is None:
        console(f"{TF_ERROR}")
        return

    kernel = makeTfKernel(app, appName, portKernel)
    if kernel:
        server = ThreadedServer(
            kernel,
            port=int(portKernel),
            protocol_config={
                # 'allow_pickle': True,
                # 'allow_public_attrs': True,
            },
        )
        server.start()


# LOWER LEVEL


def _batchAround(nResults, position, batch):
    halfBatch = int((batch + 1) / 2)
    left = min(max(position - halfBatch, 1), nResults)
    right = max(min(position + halfBatch, nResults), 1)
    discrepancy = batch - (right - left + 1)
    if discrepancy != 0:
        right += discrepancy
    if right > nResults:
        right = nResults
    return (left, right)


if __name__ == "__main__":
    main()
back to top