[d34684]: / run_tests  Maximize  Restore  History

Download this file

226 lines (187 with data), 8.6 kB

#!/usr/bin/env python

#       Licensed to the Apache Software Foundation (ASF) under one
#       or more contributor license agreements.  See the NOTICE file
#       distributed with this work for additional information
#       regarding copyright ownership.  The ASF licenses this file
#       to you under the Apache License, Version 2.0 (the
#       "License"); you may not use this file except in compliance
#       with the License.  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#       Unless required by applicable law or agreed to in writing,
#       software distributed under the License is distributed on an
#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#       KIND, either express or implied.  See the License for the
#       specific language governing permissions and limitations
#       under the License.

import argparse
from glob import glob
import json
import multiprocessing
from multiprocessing.pool import ThreadPool
import subprocess
import sys
import threading
import os

import six

# Number of CPUs reported by the multiprocessing module.
CPUS = multiprocessing.cpu_count()
# Default for the -n option: a quarter of the CPUs, or all of them on
# machines with fewer than 4 (where the integer quotient would be 0).
CONCURRENT_SUITES = CPUS // 4 if CPUS >= 4 else CPUS
# Default for the -m option: CPUs divided evenly among the concurrent suites.
CONCURRENT_TESTS = CPUS // CONCURRENT_SUITES

# Package name -> test path passed to pytest for that package.
# Packages not listed here get an empty path argument.
ALT_PKG_PATHS = dict(Allura='allura/tests/')

# Packages whose suites are always run with a single test process
# (no "-n ... --dist loadfile" arguments are added for them).
NOT_MULTIPROC_SAFE = ['ForgeGit', 'ForgeSVN']

# Rather than changing the encoding of stdout itself, coerce every chunk
# before writing it: py2 gets utf-8 encoded bytes, py3 gets unicode text.
if six.PY2:
    print_ensured = six.ensure_binary
else:
    print_ensured = six.ensure_text


def run_one(cmd, **popen_kwargs):
    """
    Run one command, echoing its combined stdout/stderr to our stdout as
    it is produced.

    cmd: command to run (through the shell, unless popen_kwargs overrides that)
    popen_kwargs: extra keyword arguments for subprocess.Popen; these take
        precedence over the defaults set below

    Returns the finished subprocess.Popen object, so callers can read
    its returncode.
    """
    cmd_to_show = '`{}` in {}'.format(cmd, popen_kwargs.get('cwd', '.'))
    print('{} running {}\n'.format(threading.current_thread(), cmd_to_show))
    sys.stdout.flush()

    all_popen_kwargs = dict(shell=True, stderr=subprocess.STDOUT,
                            stdout=subprocess.PIPE,
                            bufsize=1,  # 1 == line-buffered
                            text=True,
                            close_fds='posix' in sys.builtin_module_names)
    all_popen_kwargs.update(popen_kwargs)
    proc = subprocess.Popen(cmd, **all_popen_kwargs)
    # Read until EOF instead of looping on proc.poll(): a process that exits
    # with output still buffered in the pipe would otherwise have those lines
    # skip the "No data to combine" check below, and a process that closes its
    # stdout while still running would make a poll()-based loop spin.
    while True:
        line = proc.stdout.readline()
        if not line:
            break
        sys.stdout.write(print_ensured(line))
        if 'No data to combine' in line:
            sys.stdout.write('^^ error from "coverage combine" command.  Make sure your package has a setup.cfg with coverage settings like other packages\n')
        sys.stdout.flush()
    # the pipe is drained at this point; communicate() closes it and waits
    # for the process so that returncode is set
    out_remainder, _ = proc.communicate()
    if out_remainder:
        sys.stdout.write(print_ensured(out_remainder))
    sys.stdout.flush()
    print('finished {}, with returncode: {}'.format(cmd_to_show, proc.returncode))
    sys.stdout.flush()
    return proc


def run_many(cmds, processes=None):
    """
    Run several commands concurrently via run_one() on a thread pool.

    cmds: list of shell commands, or list of (shell cmds, popen_kwargs)
    processes: number of worker threads, or None to let ThreadPool pick
        its default

    Returns the list of return codes, in the same order as cmds.
    """
    thread_pool = ThreadPool(processes=processes)

    async_results = []
    try:
        for cmd_kwds in cmds:
            # A bare string is a command with no extra Popen arguments.
            # (The previous check, `type(cmd_kwds) == ()`, compared a type
            # with an empty tuple and so could never be true.)
            if isinstance(cmd_kwds, str):
                cmd = cmd_kwds
                kwds = {}
            else:
                cmd = cmd_kwds[0]
                kwds = cmd_kwds[1]
            result = thread_pool.apply_async(run_one, args=(cmd,), kwds=kwds)
            async_results.append(result)
    finally:
        # always shut the pool down, even if queuing a command failed
        thread_pool.close()
        thread_pool.join()

    # .get() re-raises any exception that escaped run_one() in a worker
    procs = [async_result.get() for async_result in async_results]
    return [p.returncode for p in procs]


def get_packages():
    """
    Return the sorted names of the directories directly under the current
    working directory that contain a setup.py.

    'AlluraTest' is moved to the front when it is present.
    """
    packages = sorted(os.path.dirname(p) for p in glob("*/setup.py"))

    # make it first, to catch syntax errors
    # (guarded so that a checkout without AlluraTest doesn't raise ValueError)
    if 'AlluraTest' in packages:
        packages.remove('AlluraTest')
        packages.insert(0, 'AlluraTest')
    return packages


def check_packages(packages):
    """
    Yield, in order, each name in packages whose lower-cased form can be
    imported as a module; print a notice for every name that cannot.
    """
    for candidate in packages:
        module_name = candidate.lower()
        try:
            __import__(module_name)
        except ImportError:
            print("Not running tests for {}, since it isn't set up".format(candidate))
            continue
        yield candidate


def run_tests_in_parallel(options, runner_args):
    """
    Build one pytest command per importable package, add the JS lint
    command, and run them all concurrently through run_many().

    options: parsed command-line options; the ``packages``, ``coverage``,
        ``concurrent_suites`` and ``concurrent_tests`` attributes are read
    runner_args: list of extra argument strings appended to every pytest
        command line

    Returns the list of return codes from run_many(): one per package that
    passed check_packages(), followed by one for ``npm run lint-es6``.
    """
    default_args = [
        # '-c /dev/null',  # pytest's equivalent of nose's NOSE_IGNORE_CONFIG_FILES='1' is '-c /dev/null/'
        '--disable-warnings',
    ]

    def get_pkg_path(pkg):
        # path argument for pytest; empty unless the package has an override
        return ALT_PKG_PATHS.get(pkg, '')

    def get_concurrent_tests(pkg):
        if pkg in NOT_MULTIPROC_SAFE:
            return 1
        return options.concurrent_tests

    def get_multiproc_args(pkg):
        if get_concurrent_tests(pkg) > 1:
            return '-n {procs_per_suite} --dist loadfile'.format(
                procs_per_suite=options.concurrent_tests
            )
        return ''

    def get_concurrent_suites():
        # Honour -n only when it was given on the command line; otherwise
        # use every CPU.  argparse also accepts the attached spellings
        # ("-n4", "-n=4"), so match on the prefix rather than on the exact
        # "-n" token.
        if any(arg.startswith('-n') for arg in sys.argv[1:]):
            return options.concurrent_suites
        return CPUS

    cmds = []
    env = dict(os.environ)
    concurrent_tests_map = dict()
    for package in check_packages(options.packages):
        runner = 'pytest'
        if options.coverage:
            # This is the recommended way to run coverage + pytest  https://coverage.readthedocs.io/en/6.5.0/
            #
            # And using config settings in setup.cfg seems to work well with parallel processes
            # Otherwise need to run with a complex setup like:
            #     PYTHONPATH=/tmp/cov coverage run --rcfile /tmp/cov/.covrc
            # And /tmp/cov/sitecustomize.py containing:
            #     import os
            #     os.environ['COVERAGE_PROCESS_START'] = '/tmp/cov/.covrc'
            #     import coverage
            #     coverage.process_startup()
            runner = f'coverage run -m {runner}'

        multiproc_args = get_multiproc_args(package)
        cmd = "{runner} {pkg_path} {args} {multiproc_args}".format(
            runner=runner,
            pkg_path=get_pkg_path(package),
            args=' '.join(default_args + runner_args),
            multiproc_args=multiproc_args,
        )
        num_concurrent_tests = get_concurrent_tests(package)
        # '.' marks suites that run without extra test processes in the summary below
        concurrent_tests_map[package] = num_concurrent_tests if num_concurrent_tests > 1 else '.'
        if options.coverage:
            cmd += ' && coverage combine'  # merge separate files present from multiprocessing config being on
            cmd += ' && coverage report --include=./* --omit="*/tests/*"'
        # each suite runs with the package directory as its working directory
        cmds.append((cmd, dict(cwd=package, env=env)))

    # TODO: add a way to include this or not; and add xml output for Jenkins
    cmds.append(('npm run lint-es6', {}))

    num_concurrent_suites = get_concurrent_suites()

    print('Running tests in parallel...')
    print('\t{:>20}: {}'.format('# CPUS',                       CPUS))
    print('\t{:>20}: {}'.format('# Suites in Parallel',         num_concurrent_suites))
    print('\t{:>20}: {}'.format('# Test in Parallel Per Suite', json.dumps(concurrent_tests_map, indent=2)))

    ret_codes = run_many(cmds, processes=num_concurrent_suites)

    # Only build a combined report when every command succeeded and more
    # than one package was requested.
    if options.coverage and not any(ret_codes) and len(options.packages) > 1:
        # best effort: the exit status of rm is deliberately ignored
        subprocess.call('rm .coverage', shell=True)
        subprocess.check_call('cp --backup=numbered */.coverage .', shell=True)
        subprocess.check_call('coverage combine --append', shell=True)
        report_cmd = "coverage report --omit='*/tests/*'"
        subprocess.check_call(report_cmd, shell=True)
        print('\nFor HTML coverage report run: {}'.format(report_cmd.replace(' report', ' html')))

    return ret_codes


def parse_args():
    """
    Parse the command line.

    Returns the (options, extra_args) pair produced by
    ArgumentParser.parse_known_args(): the namespace of recognised options
    and the list of unrecognised arguments (meant to be passed to pytest).
    """
    # Look the packages up once and reuse the result for both the allowed
    # choices and the default, instead of globbing the filesystem twice.
    packages = get_packages()

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''All additional arguments are passed along to pytest (e.g. -v)''')
    parser.add_argument('-n', help='Number of test suites to run concurrently in separate '
                                   'processes. Default: # CPUs / 4',
                        dest='concurrent_suites', type=int, default=CONCURRENT_SUITES)
    parser.add_argument('-m', help='Number of tests to run concurrently in separate '
                                   'processes, per suite. Default: # CPUs / # concurrent suites. '
                                   '(equivalent to pytest-xdist\'s -n option)',
                        dest='concurrent_tests', type=int, default=CONCURRENT_TESTS)
    parser.add_argument('--coverage', action='store_true',
                        help='Collect code coverage details, and report')
    parser.add_argument(
        '-p', help='List of packages to run tests on. Default: all',
        dest='packages', choices=packages, default=list(packages),
        nargs='+')
    return parser.parse_known_args()


if __name__ == "__main__":
    # parse_args() yields (options, extra args meant for pytest)
    parsed_options, passthrough_args = parse_args()
    ret_codes = run_tests_in_parallel(parsed_options, passthrough_args)
    # exit non-zero as soon as any command reported a non-zero return code
    sys.exit(any(ret_codes))