swh:1:snp:634d2b8906a7a2f6511ccb358da84e19b290d2c9
Raw File
Tip revision: 534313a165a8c662771ddfa092ed29d835b58582 authored by James Vega on 11 January 2005, 14:40:36 UTC
Updated to 0.80.0rc2.
Tip revision: 534313a
testsupport.py
###
# Copyright (c) 2002-2004, Jeremiah Fincher
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#   * Redistributions of source code must retain the above copyright notice,
#     this list of conditions, and the following disclaimer.
#   * Redistributions in binary form must reproduce the above copyright notice,
#     this list of conditions, and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#   * Neither the name of the author of this software nor the name of
#     contributors to this software may be used to endorse or promote products
#     derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###

__revision__ = "$Id$"

import supybot.fix as fix

import gc
import os
import re
import sys
import time
started = time.time()
import shutil
import unittest
import threading

import supybot.log as log
import supybot.conf as conf
import supybot.utils as utils
import supybot.ircdb as ircdb
import supybot.world as world
world.startedAt = started
import supybot.irclib as irclib
import supybot.drivers as drivers
import supybot.ircmsgs as ircmsgs
import supybot.registry as registry
import supybot.ircutils as ircutils
import supybot.callbacks as callbacks

import supybot.Owner as Owner

network = True

nicks = ['fatjim','scn','moshez','LordVan','MetaCosm','pythong','fishfart',
         'alb','d0rt','jemfinch','StyxAlso','fors','deltab','gd',
         'hellz_hunter','are_j|pub_comp','jason_','dreid','sayke_','winjer',
         'TenOfTen','GoNoVas','queuetue','the|zzz','Hellfried','Therion',
         'shro','DaCa','rexec','polin8','r0ky','aaron_','ironfroggy','eugene',
         'faassen','tirloni','mackstann','Yhg1s','ElBarono','vegai','shang',
         'typo_','kikoforgetme','asqui','TazyTiggy','fab','nixman','liiwi',
         'AdamV','paolo','red_one','_AleX_','lament','jamessan','supybot',
         'macr0_zzz','plaisthos','redghost','disco','mphardy','gt3','mathie',
         'jonez','r0ky-office','tic','d33p','ES3merge','talin','af','flippo',
         'sholden','ameoba','shepherg','j2','Acapnotic','dash','merlin262',
         'Taaus','_moshez','rik','jafo__','blk-majik','JT__','itamar',
         'kermit-','davidmccabe','glyph','jojo','dave_p','goo','hyjinx',
         'SamB','exarkun','drewp','Ragica','skylan','redgore','k3','Ra1stlin',
         'StevenK','carball','h3x','carljm','_jacob','teratorn','frangen',
         'phed','datazone','Yaggo','acct_','nowhere','pyn','ThomasWaldmann',
         'dunker','pilotLight','brainless','LoganH_','jmpnz','steinn',
         'EliasREC','lowks__','OldSmrf','Mad77','snibril','delta','psy',
         'skimpIzu','Kengur','MoonFallen','kotkis','Hyperi']

fd = file(os.path.join('test', 'rfc2812.msgs'), 'r')
rawmsgs = [line.strip() for line in fd]
fd.close()

msgs = []
for s in rawmsgs:
    try:
        msgs.append(ircmsgs.IrcMsg(s))
    except:
        print 'IrcMsg constructor failed: %r' % s

nicks += [msg.nick for msg in msgs if msg.nick]

originalCallbacksGetHelp = callbacks.getHelp
lastGetHelp = 'x'*1000
def cachingGetHelp(method, name=None):
    global lastGetHelp
    lastGetHelp = originalCallbacksGetHelp(method, name)
    return lastGetHelp
callbacks.getHelp = cachingGetHelp

def getTestIrc():
    irc = irclib.Irc('test')
    while irc.takeMsg():
        pass
    return irc

class TimeoutError(AssertionError):
    def __str__(self):
        return '%r timed out' % self.args[0]

class TestPlugin(callbacks.Privmsg):
    def eval(self, irc, msg, args):
        """<text>

        This is the help for eval.
        """
        try:
            irc.reply(repr(eval(' '.join(args))))
        except Exception, e:
            irc.reply(utils.exnToString(e))
TestInstance = TestPlugin()
conf.registerPlugin('TestPlugin', True, public=False)

class SupyTestCase(unittest.TestCase):
    def setUp(self):
        log.critical('Beginning test case %s', self.id())
        threads = [t.getName() for t in threading.enumerate()]
        log.critical('Threads: %s' % utils.commaAndify(threads))
        unittest.TestCase.setUp(self)

    def tearDown(self):
        for irc in world.ircs[:]:
            irc._reallyDie()


class PluginTestCase(SupyTestCase):
    """Subclass this to write a test case for a plugin.  See test/test_Fun.py
    for an example.
    """
    timeout = 10
    plugins = None
    cleanConfDir = True
    cleanDataDir = True
    config = {}
    def __init__(self, methodName='runTest'):
        originalRunTest = getattr(self, methodName)
        def runTest(self):
            run = True
            if hasattr(self, 'irc') and self.irc:
                for cb in self.irc.callbacks:
                    cbModule = sys.modules[cb.__class__.__module__]
                    if hasattr(cbModule, 'deprecated') and cbModule.deprecated:
                        print 
                        print 'Ignored, %s is deprecated.' % cb.name()
                        run = False
            if run:
                originalRunTest()
        runTest = utils.changeFunctionName(runTest, methodName)
        setattr(self.__class__, methodName, runTest)
        SupyTestCase.__init__(self, methodName=methodName)
        self.originals = {}

    def setUp(self, nick='test'):
        if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
            # Necessary because there's a test in here that shouldn\'t run.
            return
        SupyTestCase.setUp(self)
        # Just in case, let's do this.  Too many people forget to call their
        # super methods.
        for irc in world.ircs[:]:
            irc._reallyDie()
        # Set conf variables appropriately.
        conf.supybot.reply.whenAddressedBy.chars.setValue('@')
        conf.supybot.reply.error.detailed.setValue(True)
        conf.supybot.reply.whenNotCommand.setValue(True)
        self.myVerbose = world.myVerbose
        def rmFiles(dir):
            for filename in os.listdir(dir):
                file = os.path.join(dir, filename)
                if os.path.isfile(file):
                    os.remove(file)
                else:
                    shutil.rmtree(file)
        if self.cleanConfDir:
            rmFiles(conf.supybot.directories.conf())
        if self.cleanDataDir:
            rmFiles(conf.supybot.directories.data())
        ircdb.users.reload()
        ircdb.ignores.reload()
        ircdb.channels.reload()
        if self.plugins is None:
            raise ValueError, 'PluginTestCase must have a "plugins" attribute.'
        self.nick = nick
        self.prefix = ircutils.joinHostmask(nick, 'user', 'host.domain.tld')
        self.irc = getTestIrc()
        MiscModule = Owner.loadPluginModule('Misc')
        ConfigModule = Owner.loadPluginModule('Config')
        _ = Owner.loadPluginClass(self.irc, Owner)
        _ = Owner.loadPluginClass(self.irc, MiscModule)
        _ = Owner.loadPluginClass(self.irc, ConfigModule)
        if isinstance(self.plugins, str):
            self.plugins = [self.plugins]
        else:
            for name in self.plugins:
                if name not in ('Owner', 'Misc', 'Config'):
                    module = Owner.loadPluginModule(name,ignoreDeprecation=True)
                    cb = Owner.loadPluginClass(self.irc, module)
        self.irc.addCallback(TestInstance)
        for (name, value) in self.config.iteritems():
            group = conf.supybot
            parts = registry.split(name)
            if parts[0] == 'supybot':
                parts.pop(0)
            for part in parts:
                group = group.get(part)
            self.originals[group] = group()
            group.setValue(value)

    def tearDown(self):
        if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
            # Necessary because there's a test in here that shouldn\'t run.
            return
        for (group, original) in self.originals.iteritems():
            group.setValue(original)
        ircdb.users.close()
        ircdb.ignores.close()
        ircdb.channels.close()
        SupyTestCase.tearDown(self)
        self.irc = None
        gc.collect()

    def _feedMsg(self, query, timeout=None, to=None, frm=None,
                 usePrefixChar=True):
        if to is None:
            to = self.irc.nick
        if frm is None:
            frm = self.prefix
        if timeout is None:
            timeout = self.timeout
        if self.myVerbose:
            print # Extra newline, so it's pretty.
        prefixChars = conf.supybot.reply.whenAddressedBy.chars()
        if not usePrefixChar and query[0] in prefixChars:
            query = query[1:]
        msg = ircmsgs.privmsg(to, query, prefix=frm)
        if self.myVerbose:
            print 'Feeding: %r' % msg
        self.irc.feedMsg(msg)
        fed = time.time()
        response = self.irc.takeMsg()
        while response is None and time.time() - fed < timeout:
            time.sleep(0.1) # So it doesn't suck up 100% cpu.
            drivers.run()
            response = self.irc.takeMsg()
        if self.myVerbose:
            print 'Response: %r' % response
        return response

    def getMsg(self, query, **kwargs):
        return self._feedMsg(query, **kwargs)

    def feedMsg(self, query, to=None, frm=None):
        """Just feeds it a message, that's all."""
        if to is None:
            to = self.irc.nick
        if frm is None:
            frm = self.prefix
        self.irc.feedMsg(ircmsgs.privmsg(to, query, prefix=frm))

    # These assertError/assertNoError are somewhat fragile.  The proper way to
    # do them would be to use a proxy for the irc object and intercept .error.
    # But that would be hard, so I don't bother.  When this breaks, it'll get
    # fixed, but not until then.
    def assertError(self, query, **kwargs):
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError, query
        if lastGetHelp not in m.args[1]:
            self.failUnless(m.args[1].startswith('Error:'),
                            '%r did not error: %s' % (query, m.args[1]))
        return m

    def assertSnarfError(self, query, **kwargs):
        return self.assertError(query, usePrefixChar=False, **kwargs)

    def assertNotError(self, query, **kwargs):
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError, query
        self.failIf(m.args[1].startswith('Error:'),
                    '%r errored: %s' % (query, m.args[1]))
        self.failIf(lastGetHelp in m.args[1],
                    '%r returned the help string.' % query)
        return m

    def assertSnarfNotError(self, query, **kwargs):
        return self.assertNotError(query, usePrefixChar=False, **kwargs)

    def assertHelp(self, query, **kwargs):
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError, query
        self.failUnless(lastGetHelp in m.args[1],
                        '%s is not the help (%s)' % (m.args[1], lastGetHelp))
        return m

    def assertNoResponse(self, query, timeout=0, **kwargs):
        m = self._feedMsg(query, timeout=timeout, **kwargs)
        self.failIf(m, 'Unexpected response: %r' % m)
        return m

    def assertSnarfNoResponse(self, query, timeout=0, **kwargs):
        return self.assertNoResponse(query, timeout=timeout,
                                     usePrefixChar=False, **kwargs)

    def assertResponse(self, query, expectedResponse, **kwargs):
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError, query
        self.assertEqual(m.args[1], expectedResponse,
                         '%r != %r' % (expectedResponse, m.args[1]))
        return m

    def assertSnarfResponse(self, query, expectedResponse, **kwargs):
        return self.assertResponse(query, expectedResponse,
                                   usePrefixChar=False, **kwargs)

    def assertRegexp(self, query, regexp, flags=re.I, **kwargs):
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError, query
        self.failUnless(re.search(regexp, m.args[1], flags),
                        '%r does not match %r' % (m.args[1], regexp))
        return m

    def assertSnarfRegexp(self, query, regexp, flags=re.I, **kwargs):
        return self.assertRegexp(query, regexp, flags=re.I,
                                 usePrefixChar=False, **kwargs)

    def assertNotRegexp(self, query, regexp, flags=re.I, **kwargs):
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError, query
        self.failUnless(re.search(regexp, m.args[1], flags) is None,
                        '%r matched %r' % (m.args[1], regexp))
        return m

    def assertSnarfNotRegexp(self, query, regexp, flags=re.I, **kwargs):
        return self.assertNotRegexp(query, regexp, flags=re.I,
                                    usePrefixChar=False, **kwargs)

    def assertAction(self, query, expectedResponse=None, **kwargs):
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError, query
        self.failUnless(ircmsgs.isAction(m), '%r is not an action.' % m)
        if expectedResponse is not None:
            s = ircmsgs.unAction(m)
            self.assertEqual(s, expectedResponse, '%r != %r' % (s, m))
        return m

    def assertSnarfAction(self, query, expectedResponse=None, **kwargs):
        return self.assertAction(query, expectedResponse=None,
                                 usePrefixChar=False, **kwargs)

    def assertActionRegexp(self, query, regexp, flags=re.I, **kwargs):
        m = self._feedMsg(query, **kwargs)
        if m is None:
            raise TimeoutError, query
        self.failUnless(ircmsgs.isAction(m))
        s = ircmsgs.unAction(m)
        self.failUnless(re.search(regexp, s, flags),
                        '%r does not match %r' % (s, regexp))

    def assertSnarfActionRegexp(self, query, regexp, flags=re.I, **kwargs):
        return self.assertActionRegexp(query, regexp, flags=re.I,
                                       usePrefixChar=False, **kwargs)

    _noTestDoc = ('Admin', 'Channel', 'Config',
                  'Misc', 'Owner', 'User', 'TestPlugin')
    def testDocumentation(self):
        if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
            return
        for cb in self.irc.callbacks:
            name = cb.name()
            if ((name in self._noTestDoc) and \
               not name.lower() in self.__class__.__name__.lower()) or \
               isinstance(cb, callbacks.PrivmsgRegexp):
                continue
            self.failUnless(sys.modules[cb.__class__.__name__].__doc__,
                            '%s has no module documentation.' % name)
            if hasattr(cb, 'isCommand'):
                for attr in dir(cb):
                    if cb.isCommand(attr) and \
                       attr == callbacks.canonicalName(attr):
                        self.failUnless(getattr(cb, attr, None).__doc__,
                                        '%s.%s has no help.' % (name, attr))


class ChannelPluginTestCase(PluginTestCase):
    channel = '#test'
    def setUp(self):
        if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
            return
        PluginTestCase.setUp(self)
        self.irc.feedMsg(ircmsgs.join(self.channel, prefix=self.prefix))
        m = self.irc.takeMsg()
        self.failIf(m is None, 'No message back from joining channel.')
        self.assertEqual(m.command, 'MODE')
        m = self.irc.takeMsg()
        self.failIf(m is None, 'No message back from joining channel.')
        self.assertEqual(m.command, 'WHO')

    def _feedMsg(self, query, timeout=None, to=None, frm=None, private=False,
                 usePrefixChar=True):
        if to is None:
            if private:
                to = self.irc.nick
            else:
                to = self.channel
        if frm is None:
            frm = self.prefix
        if timeout is None:
            timeout = self.timeout
        if self.myVerbose:
            print # Newline, just like PluginTestCase.
        prefixChars = conf.supybot.reply.whenAddressedBy.chars()
        if query[0] not in prefixChars and usePrefixChar:
            query = prefixChars[0] + query
        msg = ircmsgs.privmsg(to, query, prefix=frm)
        if self.myVerbose:
            print 'Feeding: %r' % msg
        self.irc.feedMsg(msg)
        fed = time.time()
        response = self.irc.takeMsg()
        while response is None and time.time() - fed < timeout:
            time.sleep(0.1)
            drivers.run()
            response = self.irc.takeMsg()
        if response is not None:
            if response.command == 'PRIVMSG':
                args = list(response.args)
                # Strip off nick: at beginning of response.
                if args[1].startswith(self.nick) or \
                   args[1].startswith(ircutils.nickFromHostmask(self.prefix)):
                    try:
                        args[1] = args[1].split(' ', 1)[1]
                    except IndexError:
                        # Odd.  We'll skip this.
                        pass
                ret = ircmsgs.privmsg(*args)
            else:
                ret = response
        else:
            ret = None
        if self.myVerbose:
            print 'Returning: %r' % ret
        return ret

    def feedMsg(self, query, to=None, frm=None, private=False):
        """Just feeds it a message, that's all."""
        if to is None:
            if private:
                to = self.irc.nick
            else:
                to = self.channel
        if frm is None:
            frm = self.prefix
        self.irc.feedMsg(ircmsgs.privmsg(to, query, prefix=frm))


class PluginDocumentation:
    pass # This is old stuff, it should be removed some day.


# vim:set shiftwidth=4 tabstop=8 expandtab textwidth=78:

back to top