https://github.com/pupil-labs/pupil
Raw File
Tip revision: 4fc04a5559ef4ce3d12b86fff749a605060b50ad authored by Pablo Prietz on 20 November 2017, 09:43:44 UTC
Eye: Remove "Integrated Camera" from preferred camera list
Tip revision: 4fc04a5
background_helper.py
'''
(*)~---------------------------------------------------------------------------
Pupil - eye tracking platform
Copyright (C) 2012-2017  Pupil Labs

Distributed under the terms of the GNU
Lesser General Public License (LGPL v3.0).
See COPYING and COPYING.LESSER for license details.
---------------------------------------------------------------------------~(*)
'''

from ctypes import c_bool
import multiprocessing as mp
# mp = mp.get_context('fork')
import logging
logger = logging.getLogger(__name__)


class EarlyCancellationError(Exception):
    pass


class Task_Proxy(object):
    '''Future like object that runs a given generator in the background and returns is able to return the results incrementally'''
    def __init__(self, name, generator, args=(), kwargs={}):
        super().__init__()

        self._should_terminate_flag = mp.Value(c_bool, 0)
        self._completed = False
        self._canceled = False

        pipe_recv, pipe_send = mp.Pipe(False)
        wrapper_args = [pipe_send, self._should_terminate_flag, generator]
        wrapper_args.extend(args)
        self.process = mp.Process(target=self._wrapper, name=name, args=wrapper_args, kwargs=kwargs)
        self.process.start()
        self.pipe = pipe_recv

    def _wrapper(self, pipe, _should_terminate_flag, generator, *args, **kwargs):
        '''Executed in background, pipes generator results to foreground'''
        logger.debug('Entering _wrapper')
        try:
            for datum in generator(*args, **kwargs):
                if _should_terminate_flag.value:
                    raise EarlyCancellationError('Task was cancelled')
                pipe.send(datum)
        except Exception as e:
            pipe.send(e)
            import traceback
            logger.warning(traceback.format_exc())
        else:
            pipe.send(StopIteration())
        finally:
            pipe.close()
            logger.debug('Exiting _wrapper')

    def fetch(self):
        '''Fetches progress and available results from background'''
        while self.pipe.poll(0):
            try:
                datum = self.pipe.recv()
            except EOFError:
                logger.debug("Process canceled be user.")
                return
            else:
                if isinstance(datum, StopIteration):
                    self._completed = True
                    return
                elif isinstance(datum, EarlyCancellationError):
                    self._canceled = True
                    return
                elif isinstance(datum, Exception):
                    raise datum
                else:
                    yield datum

    def cancel(self, timeout=1):
        if self.process is not None:
            self._should_terminate_flag.value = True
            for x in self.fetch():
                # fetch to flush pipe to allow process to react to cancel comand.
                pass
            self.process.join(timeout)
            self.process = None

    @property
    def completed(self):
        return self._completed

    @property
    def canceled(self):
        return self._canceled

    def __del__(self):
        self.cancel(timeout=.1)
        self.process = None


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(processName)s - [%(levelname)s] %(name)s: %(message)s')

    def example_generator(mu=0., sigma=1., steps=100):
        '''samples `N(\mu, \sigma^2)`'''
        import numpy as np
        from time import sleep
        for i in range(steps):
            # yield progress, datum
            yield (i + 1) / steps, sigma * np.random.randn() + mu
            sleep(np.random.rand() * .1)

    # initialize task proxy
    task = Task_Proxy('Background', example_generator, args=(5., 3.), kwargs={'steps': 100})

    from time import time, sleep
    start = time()
    maximal_duration = 2.
    while time() - start < maximal_duration:
        # fetch all available results
        for progress, random_number in task.fetch():
            logger.debug('[{:3.0f}%] {:0.2f}'.format(progress * 100, random_number))

        # test if task is completed
        if task.completed:
            break
        sleep(1.)

    logger.debug('Canceling task')
    task.cancel(wait=True)
    logger.debug('Task done')
back to top