Initial import
[samba] / source / stf / comfychair.py
diff --git a/source/stf/comfychair.py b/source/stf/comfychair.py
new file mode 100644 (file)
index 0000000..522f9be
--- /dev/null
@@ -0,0 +1,445 @@
+#! /usr/bin/env python
+
+# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
+# Copyright (C) 2003 by Tim Potter <tpot@samba.org>
+# 
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+"""comfychair: a Python-based instrument of software torture.
+
+Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
+Copyright (C) 2003 by Tim Potter <tpot@samba.org>
+
+This is a test framework designed for testing programs written in
+Python, or (through a fork/exec interface) any other language.
+
+For more information, see the file README.comfychair.
+
+To run a test suite based on ComfyChair, just run it as a program.
+"""
+
+import sys, re
+
+
+class TestCase:
+    """A base class for tests.  This class defines required functions which
+    can optionally be overridden by subclasses.  It also provides some
+    utility functions for"""
+
+    def __init__(self):
+        self.test_log = ""
+        self.background_pids = []
+        self._cleanups = []
+        self._enter_rundir()
+        self._save_environment()
+        self.add_cleanup(self.teardown)
+
+
+    # --------------------------------------------------
+    # Save and restore directory
+    def _enter_rundir(self):
+        import os
+        self.basedir = os.getcwd()
+        self.add_cleanup(self._restore_directory)
+        self.rundir = os.path.join(self.basedir,
+                                   'testtmp', 
+                                   self.__class__.__name__)
+        self.tmpdir = os.path.join(self.rundir, 'tmp')
+        os.system("rm -fr %s" % self.rundir)
+        os.makedirs(self.tmpdir)
+        os.system("mkdir -p %s" % self.rundir)
+        os.chdir(self.rundir)
+
+    def _restore_directory(self):
+        import os
+        os.chdir(self.basedir)
+
+    # --------------------------------------------------
+    # Save and restore environment
+    def _save_environment(self):
+        import os
+        self._saved_environ = os.environ.copy()
+        self.add_cleanup(self._restore_environment)
+
+    def _restore_environment(self):
+        import os
+        os.environ.clear()
+        os.environ.update(self._saved_environ)
+
+    
+    def setup(self):
+        """Set up test fixture."""
+        pass
+
+    def teardown(self):
+        """Tear down test fixture."""
+        pass
+
+    def runtest(self):
+        """Run the test."""
+        pass
+
+
+    def add_cleanup(self, c):
+        """Queue a cleanup to be run when the test is complete."""
+        self._cleanups.append(c)
+        
+
+    def fail(self, reason = ""):
+        """Say the test failed."""
+        raise AssertionError(reason)
+
+
+    #############################################################
+    # Requisition methods
+
+    def require(self, predicate, message):
+        """Check a predicate for running this test.
+
+If the predicate value is not true, the test is skipped with a message explaining
+why."""
+        if not predicate:
+            raise NotRunError, message
+
+    def require_root(self):
+        """Skip this test unless run by root."""
+        import os
+        self.require(os.getuid() == 0,
+                     "must be root to run this test")
+
+    #############################################################
+    # Assertion methods
+
+    def assert_(self, expr, reason = ""):
+        if not expr:
+            raise AssertionError(reason)
+
+    def assert_equal(self, a, b):
+        if not a == b:
+            raise AssertionError("assertEquals failed: %s" % `(a, b)`)
+            
+    def assert_notequal(self, a, b):
+        if a == b:
+            raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)
+
+    def assert_re_match(self, pattern, s):
+        """Assert that a string matches a particular pattern
+
+        Inputs:
+          pattern      string: regular expression
+          s            string: to be matched
+
+        Raises:
+          AssertionError if not matched
+          """
+        if not re.match(pattern, s):
+            raise AssertionError("string does not match regexp\n"
+                                 "    string: %s\n"
+                                 "    re: %s" % (`s`, `pattern`))
+
+    def assert_re_search(self, pattern, s):
+        """Assert that a string *contains* a particular pattern
+
+        Inputs:
+          pattern      string: regular expression
+          s            string: to be searched
+
+        Raises:
+          AssertionError if not matched
+          """
+        if not re.search(pattern, s):
+            raise AssertionError("string does not contain regexp\n"
+                                 "    string: %s\n"
+                                 "    re: %s" % (`s`, `pattern`))
+
+
+    def assert_no_file(self, filename):
+        import os.path
+        assert not os.path.exists(filename), ("file exists but should not: %s" % filename)
+
+
+    #############################################################
+    # Methods for running programs
+
+    def runcmd_background(self, cmd):
+        import os
+        self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
+        pid = os.fork()
+        if pid == 0:
+            # child
+            try:
+                os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
+            finally:
+                os._exit(127)
+        self.test_log = self.test_log + "pid: %d\n" % pid
+        return pid
+
+
+    def runcmd(self, cmd, expectedResult = 0):
+        """Run a command, fail if the command returns an unexpected exit
+        code.  Return the output produced."""
+        rc, output, stderr = self.runcmd_unchecked(cmd)
+        if rc != expectedResult:
+            raise AssertionError("""command returned %d; expected %s: \"%s\"
+stdout:
+%s
+stderr:
+%s""" % (rc, expectedResult, cmd, output, stderr))
+
+        return output, stderr
+
+
+    def run_captured(self, cmd):
+        """Run a command, capturing stdout and stderr.
+
+        Based in part on popen2.py
+
+        Returns (waitstatus, stdout, stderr)."""
+        import os, types
+        pid = os.fork()
+        if pid == 0:
+            # child
+            try: 
+                pid = os.getpid()
+                openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
+
+                outfd = os.open('%d.out' % pid, openmode, 0666)
+                os.dup2(outfd, 1)
+                os.close(outfd)
+
+                errfd = os.open('%d.err' % pid, openmode, 0666)
+                os.dup2(errfd, 2)
+                os.close(errfd)
+
+                if isinstance(cmd, types.StringType):
+                    cmd = ['/bin/sh', '-c', cmd]
+
+                os.execvp(cmd[0], cmd)
+            finally:
+                os._exit(127)
+        else:
+            # parent
+            exited_pid, waitstatus = os.waitpid(pid, 0)
+            stdout = open('%d.out' % pid).read()
+            stderr = open('%d.err' % pid).read()
+            return waitstatus, stdout, stderr
+
+
+    def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
+        """Invoke a command; return (exitcode, stdout, stderr)"""
+        import os
+        waitstatus, stdout, stderr = self.run_captured(cmd)
+        assert not os.WIFSIGNALED(waitstatus), \
+               ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
+        rc = os.WEXITSTATUS(waitstatus)
+        self.test_log = self.test_log + ("""Run command: %s
+Wait status: %#x (exit code %d, signal %d)
+stdout:
+%s
+stderr:
+%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
+         stdout, stderr))
+        if skip_on_noexec and rc == 127:
+            # Either we could not execute the command or the command
+            # returned exit code 127.  According to system(3) we can't
+            # tell the difference.
+            raise NotRunError, "could not execute %s" % `cmd`
+        return rc, stdout, stderr
+    
+
+    def explain_failure(self, exc_info = None):
+        print "test_log:"
+        print self.test_log
+
+
+    def log(self, msg):
+        """Log a message to the test log.  This message is displayed if
+        the test fails, or when the runtests function is invoked with
+        the verbose option."""
+        self.test_log = self.test_log + msg + "\n"
+
+
+class NotRunError(Exception):
+    """Raised if a test must be skipped because of missing resources"""
+    def __init__(self, value = None):
+        self.value = value
+
+
+def _report_error(case, debugger):
+    """Ask the test case to explain failure, and optionally run a debugger
+
+    Input:
+      case         TestCase instance
+      debugger     if true, a debugger function to be applied to the traceback
+"""
+    import sys
+    ex = sys.exc_info()
+    print "-----------------------------------------------------------------"
+    if ex:
+        import traceback
+        traceback.print_exc(file=sys.stdout)
+    case.explain_failure()
+    print "-----------------------------------------------------------------"
+
+    if debugger:
+        tb = ex[2]
+        debugger(tb)
+
+
+def runtests(test_list, verbose = 0, debugger = None):
+    """Run a series of tests.
+
+    Inputs:
+      test_list    sequence of TestCase classes
+      verbose      print more information as testing proceeds
+      debugger     debugger object to be applied to errors
+
+    Returns:
+      unix return code: 0 for success, 1 for failures, 2 for test failure
+    """
+    import traceback
+    ret = 0
+    for test_class in test_list:
+        print "%-30s" % _test_name(test_class),
+        # flush now so that long running tests are easier to follow
+        sys.stdout.flush()
+
+        obj = None
+        try:
+            try: # run test and show result
+                obj = test_class()
+                obj.setup()
+                obj.runtest()
+                print "OK"
+            except KeyboardInterrupt:
+                print "INTERRUPT"
+                _report_error(obj, debugger)
+                ret = 2
+                break
+            except NotRunError, msg:
+                print "NOTRUN, %s" % msg.value
+            except:
+                print "FAIL"
+                _report_error(obj, debugger)
+                ret = 1
+        finally:
+            while obj and obj._cleanups:
+                try:
+                    apply(obj._cleanups.pop())
+                except KeyboardInterrupt:
+                    print "interrupted during teardown"
+                    _report_error(obj, debugger)
+                    ret = 2
+                    break
+                except:
+                    print "error during teardown"
+                    _report_error(obj, debugger)
+                    ret = 1
+        # Display log file if we're verbose
+        if ret == 0 and verbose:
+            obj.explain_failure()
+            
+    return ret
+
+
+def _test_name(test_class):
+    """Return a human-readable name for a test class.
+    """
+    try:
+        return test_class.__name__
+    except:
+        return `test_class`
+
+
+def print_help():
+    """Help for people running tests"""
+    import sys
+    print """%s: software test suite based on ComfyChair
+
+usage:
+    To run all tests, just run this program.  To run particular tests,
+    list them on the command line.
+
+options:
+    --help              show usage message
+    --list              list available tests
+    --verbose, -v       show more information while running tests
+    --post-mortem, -p   enter Python debugger on error
+""" % sys.argv[0]
+
+
+def print_list(test_list):
+    """Show list of available tests"""
+    for test_class in test_list:
+        print "    %s" % _test_name(test_class)
+
+
+def main(tests, extra_tests=[]):
+    """Main entry point for test suites based on ComfyChair.
+
+    inputs:
+      tests       Sequence of TestCase subclasses to be run by default.
+      extra_tests Sequence of TestCase subclasses that are available but
+                  not run by default.
+
+Test suites should contain this boilerplate:
+
+    if __name__ == '__main__':
+        comfychair.main(tests)
+
+This function handles standard options such as --help and --list, and
+by default runs all tests in the suggested order.
+
+Calls sys.exit() on completion.
+"""
+    from sys import argv
+    import getopt, sys
+
+    opt_verbose = 0
+    debugger = None
+
+    opts, args = getopt.getopt(argv[1:], 'pv',
+                               ['help', 'list', 'verbose', 'post-mortem'])
+    for opt, opt_arg in opts:
+        if opt == '--help':
+            print_help()
+            return
+        elif opt == '--list':
+            print_list(tests + extra_tests)
+            return
+        elif opt == '--verbose' or opt == '-v':
+            opt_verbose = 1
+        elif opt == '--post-mortem' or opt == '-p':
+            import pdb
+            debugger = pdb.post_mortem
+
+    if args:
+        all_tests = tests + extra_tests
+        by_name = {}
+        for t in all_tests:
+            by_name[_test_name(t)] = t
+        which_tests = []
+        for name in args:
+            which_tests.append(by_name[name])
+    else:
+        which_tests = tests
+
+    sys.exit(runtests(which_tests, verbose=opt_verbose,
+                      debugger=debugger))
+
+
+if __name__ == '__main__':
+    print __doc__