#! /usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright 2005 Lars Wirzenius (liw@iki.fi)
# Copyright © 2011-2019 Andreas Beckmann (anbe@debian.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/>


"""Distributed piuparts processing, slave program

Lars Wirzenius <liw@iki.fi>
"""
from __future__ import print_function

import fcntl
import logging
import os
import random
import shlex
import subprocess
import sys
import time
from signal import SIGALRM, SIGHUP, SIGINT, SIGKILL, SIGUSR1, alarm, signal

import apt_pkg

import piupartslib.conf
import piupartslib.packagesdb
from piupartslib.conf import MissingSection

# Initialise apt_pkg once at import time; apt_pkg.version_compare() is used
# later by Section._test_package().
apt_pkg.init_system()


# Main configuration file, read by every Section.
CONFIG_FILE = "/etc/piuparts/piuparts.conf"
# Distribution description file handed to piupartslib.conf.DistroConfig.
DISTRO_CONFIG_FILE = "/etc/piuparts/distros.conf"
# Timeout (in seconds, i.e. 90 minutes) passed to run_test_with_timeout()
# for a single piuparts run.
MAX_WAIT_TEST_RUN = 90 * 60

# Set by sigint_handler() (and on KeyboardInterrupt): stop starting new work.
interrupted = False
# SIGINT handler that was active before _test_package() installed sigint_handler().
old_sigint_handler = None
# Set by sighup_handler() and sigusr1_handler(): stop processing, flush logs.
got_sighup = False
# Set by sigusr1_handler(): a restart was requested.
got_sigusr1 = False


def setup_logging(log_level, log_file_name):
    """Configure the root logger of the slave.

    Messages always go to stderr; if ``log_file_name`` is non-empty they are
    additionally written to that file.  Both destinations use the same
    timestamped format (previously the file handler was added without the
    formatter, so log file entries carried no timestamp).

    log_level     -- level set on the root logger (e.g. logging.INFO)
    log_file_name -- path of the log file, or a false value for stderr only
    """
    logger = logging.getLogger()
    logger.setLevel(log_level)

    formatter = logging.Formatter(fmt="%(asctime)s %(message)s", datefmt="%H:%M:%S")

    handlers = [logging.StreamHandler(sys.stderr)]
    if log_file_name:
        handlers.append(logging.FileHandler(log_file_name))

    for handler in handlers:
        handler.setFormatter(formatter)
        logger.addHandler(handler)


class Config(piupartslib.conf.Config):
    """Configuration of one slave section.

    Wraps piupartslib.conf.Config with the option table used by the slave
    (presumably the per-option default values; verify against
    piupartslib.conf.Config) and remembers the section name in
    ``self.section``.
    """

    def __init__(self, section="slave", defaults_section=None):
        self.section = section
        option_table = {
            "sections": "slave",
            "basetgz-sections": "",
            "idle-sleep": 300,
            "max-tgz-age": 2592000,
            "min-tgz-retry-delay": 21600,
            "master-host": None,
            "master-user": None,
            "master-command": None,
            "proxy": None,
            "mirror": None,
            "setarch": None,
            "piuparts-command": "sudo piuparts",
            "piuparts-flags": "",
            "tmpdir": None,
            "distro": None,
            "area": None,
            "components": None,
            "chroot-tgz": None,
            "upgrade-test-distros": None,
            "basetgz-directory": ".",
            "chroot-meta-auto": None,
            "chroot-meta-directory": None,
            "max-reserved": 1,
            "debug": "no",
            "keep-sources-list": "no",
            "arch": None,
            "precedence": "1",
            "slave-load-max": None,
            "slave-flush-interval": 0,
        }
        piupartslib.conf.Config.__init__(
            self,
            section,
            option_table,
            defaults_section=defaults_section,
        )


class Alarm(Exception):
    """Raised by alarm_handler() to signal that a timeout alarm went off."""


def alarm_handler(signum, frame):
    """Signal handler: convert the delivered signal into an Alarm exception."""
    raise Alarm()


def sigint_handler(signum, frame):
    """Handle the first Ctrl-C: request a graceful stop.

    Sets the module-wide ``interrupted`` flag, tells the user what happens
    next and re-installs the previously saved SIGINT handler so that a
    second Ctrl-C is handled by it.
    """
    global interrupted
    interrupted = True
    for message in (
        "\nSlave interrupted by the user, waiting for the current test to finish.",
        "Press Ctrl-C again to abort now.",
    ):
        print(message)
    # hand SIGINT back to whatever handled it before the test started
    signal(SIGINT, old_sigint_handler)


def sighup_handler(signum, frame):
    """Handle SIGHUP: record the request to flush finished logs."""
    global got_sighup
    # only a flag is set here; the main loop reacts to it later
    got_sighup = True
    print("SIGHUP: Will flush finished logs.")


def sigusr1_handler(signum, frame):
    """Handle SIGUSR1: record the request to restart.

    The SIGHUP flag is raised as well, so everything that reacts to SIGHUP
    also reacts to a restart request.
    """
    global got_sigusr1, got_sighup
    got_sigusr1 = got_sighup = True
    print("SIGUSR1: Will restart.")


class MasterIsBusy(Exception):
    """The master answered 'busy'; the request should be retried later."""

    def __init__(self):
        Exception.__init__(self, "Master is busy, retry later")


class MasterNotOK(Exception):
    """The master replied with something other than the expected 'ok'."""

    def __init__(self):
        Exception.__init__(self, "Master did not respond with 'ok'")


class MasterDidNotGreet(Exception):
    """The first line received from the master was not 'hello'."""

    def __init__(self):
        Exception.__init__(self, "Master did not start with 'hello'")


class MasterCommunicationFailed(Exception):
    """Reading from or writing to the master connection failed."""

    def __init__(self):
        Exception.__init__(self, "Communication with master failed")


class MasterIsCrazy(Exception):
    """The master sent a reply that does not fit the protocol."""

    def __init__(self):
        Exception.__init__(self, "Master said something unexpected")


class MasterCantRecycle(Exception):
    """The master refused the 'recycle' request."""

    def __init__(self):
        Exception.__init__(self, "Master has nothing to recycle")


class Slave:
    """Client side of the line-based protocol spoken with the piuparts master.

    The connection is an ``ssh`` child process; its stdin/stdout pipes carry
    one text line per protocol message.  Reservations handed out by the
    master are remembered as empty ``<package>_<version>.log`` files in the
    ``reserved`` directory, which is looked up relative to the current
    working directory.
    """

    def __init__(self):
        self._to_master = None
        self._from_master = None
        self._master_host = None
        self._master_user = None
        self._master_command = None
        self._section = None

    def _readline(self):
        """Read one line from the master.

        Returns the line including its trailing newline ("" on EOF).
        Raises MasterCommunicationFailed if the read fails with IOError.
        """
        try:
            line = self._from_master.readline()
        except IOError:
            raise MasterCommunicationFailed()
        logging.debug("<< " + str(line.rstrip()))
        return line

    def _writeline(self, *words):
        """Send ``words`` joined by single spaces as one line to the master.

        Raises MasterCommunicationFailed if writing or flushing fails.
        """
        line = " ".join(words)
        logging.debug(">> " + line)
        try:
            self._to_master.write(line + "\n")
            self._to_master.flush()
        except IOError:
            raise MasterCommunicationFailed()

    def set_master_host(self, host):
        """Set the host to connect to; a change drops the open connection."""
        logging.debug("Setting master host to %s" % host)
        if self._master_host != host:
            self.close()
            self._master_host = host

    def set_master_user(self, user):
        """Set the ssh login name; a change drops the open connection."""
        logging.debug("Setting master user to %s" % user)
        if self._master_user != user:
            self.close()
            self._master_user = user

    def set_master_command(self, cmd):
        """Set the remote command; a change drops the open connection."""
        logging.debug("Setting master command to %s" % cmd)
        if self._master_command != cmd:
            self.close()
            self._master_command = cmd

    def set_section(self, section):
        """Set the section name announced on the next connect_to_master()."""
        logging.debug("Setting section to %s" % section)
        self._section = section

    def connect_to_master(self):
        """Open the connection if necessary and select the current section.

        Raises MasterDidNotGreet, MasterIsBusy, MasterNotOK or
        MasterCommunicationFailed.
        """
        if not self._is_connected():
            self._initial_connect()
        self._select_section()

    def _is_connected(self):
        """Return a true value if both pipes to the master are set."""
        return self._to_master and self._from_master

    def _initial_connect(self):
        """Start the ssh process and wait for the master's 'hello'."""
        logging.info("Connecting to %s" % self._master_host)
        ssh_command = ["ssh", "-x"]
        if self._master_user:
            ssh_command.extend(["-l", self._master_user])
        ssh_command.append(self._master_host)
        # without an explicit command a placeholder is sent as remote command
        ssh_command.append(self._master_command or "command-is-set-in-authorized_keys")
        p = subprocess.Popen(
            ssh_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        self._to_master = p.stdin
        self._from_master = p.stdout
        line = self._readline()
        if line != "hello\n":
            raise MasterDidNotGreet()

    def _select_section(self):
        """Tell the master which section the following commands refer to."""
        self._writeline("section", self._section)
        line = self._readline()
        if line == "busy\n":
            raise MasterIsBusy()
        elif line != "ok\n":
            raise MasterNotOK()
        logging.debug("Connected to master")

    def close(self):
        """Close the connection to the master, if there is one.

        Never raises on I/O errors: closing a pipe whose peer is gone may
        fail (e.g. BrokenPipeError while flushing data left over from a
        failed write).  Such errors are logged and ignored, and the slave is
        always left in the disconnected state so that the next
        connect_to_master() starts from scratch.
        """
        if self._from_master is None and self._to_master is None:
            return
        logging.debug("Closing connection to master")
        pipes = (self._from_master, self._to_master)
        # forget the pipes first: whatever happens below, we are disconnected
        self._from_master = self._to_master = None
        for pipe in pipes:
            if pipe is None:
                continue
            try:
                pipe.close()
            except IOError as e:
                logging.debug("Ignoring error while closing connection: %s" % e)
        logging.info("Connection to master closed")

    @staticmethod
    def _split_log_filename(filename):
        """Return (package, version) encoded in a ``<package>_<version>.log`` path."""
        basename = os.path.basename(filename)
        package, rest = basename.split("_", 1)
        version = rest[: -len(".log")]
        return package, version

    def send_log(self, section, pass_or_fail, filename):
        """Upload the log file ``filename`` to the master.

        ``pass_or_fail`` is sent as the command word, followed by package
        and version taken from the file name.  Every log line is sent
        prefixed with a space and the log is terminated by a single ".".
        ``section`` is only used for the log message.
        Raises MasterNotOK if the master does not acknowledge with 'ok'.
        """
        logging.info("Sending log file %s/%s" % (section, filename))
        package, version = self._split_log_filename(filename)
        self._writeline(pass_or_fail, package, version)
        with open(filename, "r") as f:
            for line in f:
                if line.endswith("\n"):
                    line = line[:-1]
                self._writeline(" " + line)
        self._writeline(".")
        line = self._readline()
        if line != "ok\n":
            raise MasterNotOK()

    def get_status(self, section):
        """Ask the master for its status and log it.

        Raises MasterIsCrazy if the reply does not start with 'ok'.
        """
        self._writeline("status")
        line = self._readline()
        words = line.split()
        if words and words[0] == "ok":
            logging.info("Master " + section + " status: " + " ".join(words[1:]))
        else:
            raise MasterIsCrazy()

    def enable_recycling(self):
        """Send 'recycle'; raises MasterCantRecycle unless the reply is 'ok'."""
        self._writeline("recycle")
        line = self._readline()
        if line != "ok\n":
            raise MasterCantRecycle()

    def get_idle(self):
        """Return the integer the master sends in reply to 'idle'.

        Raises MasterIsCrazy if the reply is not ``ok <integer>``.
        """
        self._writeline("idle")
        line = self._readline()
        words = line.split()
        if len(words) == 2 and words[0] == "ok":
            try:
                return int(words[1])
            except ValueError:
                # "ok" followed by garbage is a protocol violation as well
                raise MasterIsCrazy()
        raise MasterIsCrazy()

    def reserve(self):
        """Ask the master to reserve one package for this slave.

        Returns True and remembers the reservation if the master answers
        ``ok <package> <version>``, False if it answers 'error'.
        Raises MasterIsCrazy for every other reply, including an 'ok'
        without package and version.
        """
        self._writeline("reserve")
        line = self._readline()
        words = line.split()
        if words and words[0] == "ok":
            if len(words) < 3:
                raise MasterIsCrazy()
            logging.info("Reserved for us: %s %s" % (words[1], words[2]))
            self.remember_reservation(words[1], words[2])
            return True
        elif words and words[0] == "error":
            logging.info("Master didn't reserve anything (more) for us")
            return False
        else:
            raise MasterIsCrazy()

    def unreserve(self, filename):
        """Give the reservation identified by the log file name back.

        Raises MasterNotOK if the master does not acknowledge with 'ok'.
        """
        package, version = self._split_log_filename(filename)
        logging.info("Unreserve: %s %s" % (package, version))
        self._writeline("unreserve", package, version)
        line = self._readline()
        if line != "ok\n":
            raise MasterNotOK()

    def _reserved_filename(self, name, version):
        """Return the relative path of the marker file for a reservation."""
        return os.path.join("reserved", "%s_%s.log" % (name, version))

    def remember_reservation(self, name, version):
        """Create the (empty) marker file for a reservation."""
        create_file(self._reserved_filename(name, version), "")

    def get_reserved(self):
        """Return a list of (name, version) for all reservation marker files."""
        vlist = []
        for basename in os.listdir("reserved"):
            if "_" in basename and basename.endswith(".log"):
                name, version = basename[: -len(".log")].split("_", 1)
                vlist.append((name, version))
        return vlist

    def forget_reserved(self, name, version):
        """Remove the marker file of a reservation; a missing file is ignored."""
        try:
            os.remove(self._reserved_filename(name, version))
        except os.error:
            pass


class Section:
    """One section of the piuparts configuration as processed by this slave.

    A section owns a working directory (named after the section) with the
    subdirectories ``new``, ``pass``, ``fail``, ``untestable`` and
    ``reserved``.  run() is the entry point: it talks to the master, tests
    the reserved packages and stores the resulting logs for submission.

    Four timestamps (seconds since the epoch) throttle the section:
    ``_error_wait_until``, ``_idle_wait_until``, ``_recycle_wait_until`` and
    ``_tarball_wait_until``.
    """

    def __init__(self, section, slave=None):
        """Read the configuration of ``section`` and prepare its directories.

        ``slave`` is the Slave connection to use; a new one is created if
        none is given.  Unless the section is disabled (max-reserved is 0)
        the base tarball is checked right away.
        """
        self._config = Config(section=section, defaults_section="global")
        self._config.read(CONFIG_FILE)
        self._distro_config = piupartslib.conf.DistroConfig(DISTRO_CONFIG_FILE, self._config["mirror"])
        self._error_wait_until = 0
        self._idle_wait_until = 0
        self._recycle_wait_until = 0
        self._tarball_wait_until = 0
        self._slave_directory = os.path.abspath(section)
        if not os.path.exists(self._slave_directory):
            os.makedirs(self._slave_directory)

        if self._config["debug"] in ["yes", "true"]:
            self._logger = logging.getLogger()
            self._logger.setLevel(logging.DEBUG)

        self._slave = slave or Slave()

        for rdir in ["new", "pass", "fail", "untestable", "reserved"]:
            rdir = os.path.join(self._slave_directory, rdir)
            if not os.path.exists(rdir):
                os.mkdir(rdir)

        if int(self._config["max-reserved"]) > 0:
            self._check_tarball()

    def _throttle_if_overloaded(self):
        """Sleep while the system load is above the configured maximum.

        Does nothing if an interrupt or SIGHUP is pending, if
        'slave-load-max' is unset or below 1.0, or if the 1-minute load
        average does not exceed it.  Otherwise the master connection is
        closed and the method sleeps (with growing random intervals) until
        the load has dropped to max(load_max - 1.0, 0.9) or a signal
        arrives.
        """
        global interrupted
        if interrupted or got_sighup:
            return
        if self._config["slave-load-max"] is None:
            return
        load_max = float(self._config["slave-load-max"])
        if load_max < 1.0:
            return
        if os.getloadavg()[0] <= load_max:
            return
        load_resume = max(load_max - 1.0, 0.9)
        secs = random.randrange(30, 90)
        self._slave.close()
        while True:
            load = os.getloadavg()[0]
            if load <= load_resume:
                break
            logging.info("Sleeping due to high load (%.2f)" % load)
            try:
                time.sleep(secs)
            except KeyboardInterrupt:
                interrupted = True
            if interrupted or got_sighup:
                break
            if secs < 300:
                secs += random.randrange(30, 90)

    def _connect_to_master(self, recycle=False):
        """Connect the slave to the master configured for this section.

        With ``recycle`` true, recycling is requested after connecting.
        Exceptions raised by the Slave methods are passed on to the caller.
        """
        self._slave.set_master_host(self._config["master-host"])
        self._slave.set_master_user(self._config["master-user"])
        self._slave.set_master_command(self._config["master-command"])
        self._slave.set_section(self._config.section)
        self._slave.connect_to_master()
        if recycle:
            self._slave.enable_recycling()

    def _get_tarball(self):
        """Return the path of the base tarball used by this section.

        The file name is 'chroot-tgz' if configured, otherwise it is
        obtained from the distro configuration; it is joined with
        'basetgz-directory'.
        """
        basetgz = self._config["chroot-tgz"] or self._distro_config.get_basetgz(
            self._config.get_start_distro(),
            self._config.get_arch(),
            merged_usr="--no-merged-usr" not in self._config["piuparts-flags"],
        )
        return os.path.join(self._config["basetgz-directory"], basetgz)

    def _check_tarball(self):
        """Create or re-create the base tarball if it is missing or too old.

        Nothing is done if 'max-tgz-age' is negative.  A missing tarball is
        always created.  An existing one is replaced once it is older than
        'max-tgz-age' (if that is positive), but not more often than every
        'min-tgz-retry-delay' seconds, judged by the age of
        ``<tarball>.log`` if that file exists.  ``_tarball_wait_until`` is
        set to the time of the next check.

        The work is done inside the section directory; the previous working
        directory is restored even if tarball creation raises.
        """
        if int(self._config["max-tgz-age"]) < 0:
            return

        oldcwd = os.getcwd()
        os.chdir(self._slave_directory)
        try:
            tgz = self._get_tarball()
            max_tgz_age = int(self._config["max-tgz-age"])
            min_tgz_retry_delay = int(self._config["min-tgz-retry-delay"])
            ttl = 0
            if max_tgz_age == 0:
                ttl = 86400
            needs_update = not os.path.exists(tgz)
            if not needs_update and max_tgz_age > 0:
                # tgz exists and age is limited, so check age
                now = time.time()
                age = now - os.path.getmtime(tgz)
                ttl = max_tgz_age - age
                logging.info("Check-replace %s: age=%d vs. max=%d" % (tgz, age, max_tgz_age))
                if ttl < 0:
                    # too old: rate-limit retries using the age of the last attempt's log
                    if os.path.exists(tgz + ".log"):
                        age = now - os.path.getmtime(tgz + ".log")
                    ttl = min_tgz_retry_delay - age
                    logging.info("Limit-replace %s: last-retry=%d vs. min=%d" % (tgz, age, min_tgz_retry_delay))
                    if ttl < 0:
                        needs_update = True
                        logging.info("%s too old.  Forcing re-creation" % tgz)
            if needs_update:
                self._slave.close()
                create_chroot(self._config, tgz, self._config.get_start_distro())
                ttl = min_tgz_retry_delay
            self._tarball_wait_until = time.time() + ttl
        finally:
            # never leave the process in the section directory by accident
            os.chdir(oldcwd)

    def _get_refchroot_metadata(self):
        """Return the path of the reference chroot metadata file, or None.

        None is returned unless 'chroot-meta-auto' is set.  If
        'chroot-meta-directory' is set as well, the file lives in a
        per-section subdirectory of it, which is created on demand.
        """
        if self._config["chroot-meta-auto"]:
            if self._config["chroot-meta-directory"]:
                path = os.path.join(self._config["chroot-meta-directory"], self._config.section)
                if not os.path.exists(path):
                    os.makedirs(path)
                return os.path.join(path, self._config["chroot-meta-auto"])
            return self._config["chroot-meta-auto"]
        return None

    def _check_refchroot_metadata(self):
        """Delete the reference chroot metadata file if it is older than 6 hours.

        Errors while inspecting or deleting the file are ignored.
        """
        refchroot_metadata = self._get_refchroot_metadata()
        if refchroot_metadata:
            if os.path.exists(refchroot_metadata):
                try:
                    age = time.time() - os.path.getmtime(refchroot_metadata)
                    if age > 6 * 3600:
                        os.unlink(refchroot_metadata)
                        logging.info("Deleting old %s" % refchroot_metadata)
                except OSError:
                    pass

    def _count_submittable_logs(self):
        """Return the number of ``*.log`` files waiting to be sent to the master.

        Finished logs (pass, fail, untestable) always count; after an
        interrupt the reserved and unfinished ones count too, because they
        have to be unreserved.
        """
        files = 0
        subdirs = ["pass", "fail", "untestable"]
        if interrupted:
            subdirs += ["reserved", "new"]
        for logdir in subdirs:
            for basename in os.listdir(os.path.join(self._slave_directory, logdir)):
                if basename.endswith(".log"):
                    files += 1
        return files

    def precedence(self):
        """Return the configured 'precedence' of this section as an int."""
        return int(self._config["precedence"])

    def sleep_until(self, recycle=False):
        """Return the time before which run() will not do anything.

        This is the later of the error wait time and the idle wait time
        (or the recycle wait time if ``recycle`` is true).
        """
        if recycle:
            return max(self._error_wait_until, self._recycle_wait_until)
        return max(self._error_wait_until, self._idle_wait_until)

    def run(self, do_processing=True, recycle=False):
        """Process this section once.

        The configuration is re-read, the base tarball is checked, logs are
        exchanged with the master and, if ``do_processing`` is true and no
        interrupt/SIGHUP is pending, the reserved packages are tested.
        With ``do_processing`` false only finished logs are flushed.

        Returns the number of packages processed (0 if nothing was tested).
        """
        if time.time() < self.sleep_until(recycle=recycle):
            return 0

        self._throttle_if_overloaded()

        self._config = Config(section=self._config.section, defaults_section="global")
        try:
            self._config.read(CONFIG_FILE)
        except MissingSection:
            logging.info("unknown section " + self._config.section)
            self._error_wait_until = time.time() + 3600
            return 0
        self._distro_config = piupartslib.conf.DistroConfig(DISTRO_CONFIG_FILE, self._config["mirror"])

        if interrupted or got_sighup:
            do_processing = False

        if do_processing and time.time() > self._tarball_wait_until:
            self._check_tarball()

        if self._config.get_distro() == "None":
            # section is for tarball creation only
            self._idle_wait_until = self._tarball_wait_until + 60
            self._recycle_wait_until = self._tarball_wait_until + 3600
            return 0

        # a signal may have arrived while the tarball was being checked
        if interrupted or got_sighup:
            do_processing = False

        if not do_processing and self._count_submittable_logs() == 0:
            return 0

        logging.info("-------------------------------------------")
        action = "Running"
        if recycle:
            action = "Recycling"
        if not do_processing:
            action = "Flushing"
        logging.info("%s section %s (precedence=%d)" % (action, self._config.section, self.precedence()))

        if int(self._config["max-reserved"]) == 0:
            logging.info("disabled")
            self._error_wait_until = time.time() + 12 * 3600
            return 0

        if not self._config.get_distro() and not self._config.get_distros():
            logging.error("neither 'distro' nor 'upgrade-test-distros' configured")
            self._error_wait_until = time.time() + 3600
            return 0

        # slave.lock makes sure only one slave works in this section directory
        with open(os.path.join(self._slave_directory, "slave.lock"), "w") as lock:
            oldcwd = os.getcwd()
            os.chdir(self._slave_directory)
            try:
                fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                logging.info("busy")
                self._error_wait_until = time.time() + 900
            else:
                if do_processing:
                    self._check_refchroot_metadata()
                if self._talk_to_master(fetch=do_processing, recycle=recycle, unreserve=interrupted):
                    if do_processing:
                        if not self._slave.get_reserved():
                            self._idle_wait_until = time.time() + int(self._config["idle-sleep"])
                            if recycle:
                                self._recycle_wait_until = self._idle_wait_until + 3600
                if do_processing and self._slave.get_reserved():
                    processed = self._process()
                    if got_sighup and self._slave.get_reserved():
                        # keep this section at the front of the round-robin runnable queue
                        self._idle_wait_until = 0
                        self._recycle_wait_until = 0
                    else:
                        # put this section at the end of the round-robin runnable queue
                        self._idle_wait_until = time.time()
                        self._recycle_wait_until = time.time()
                    return processed
            finally:
                os.chdir(oldcwd)
        return 0

    def _talk_to_master(self, fetch=False, unreserve=False, recycle=False):
        """Exchange logs and reservations with the master.

        Finished logs are always sent.  With ``unreserve`` true, reserved
        and unfinished packages are given back.  With ``fetch`` true (and
        nothing reserved yet) new packages are reserved, up to
        'max-reserved'.  ``recycle`` is passed on to _connect_to_master().

        Must be called with the section directory as working directory.

        Returns True if everything requested was done (or there was nothing
        to do).  Returns False if the connection or the conversation failed
        (the matching wait time is set) or if the master asked the slave to
        stay idle.
        """
        flush = self._count_submittable_logs() > 0
        fetch = fetch and not self._slave.get_reserved()
        if not flush and not fetch:
            return True

        try:
            self._connect_to_master(recycle=recycle)
        except KeyboardInterrupt:
            raise
        except MasterIsBusy:
            logging.error("master is busy")
            self._error_wait_until = time.time() + random.randrange(60, 180)
        except MasterCantRecycle:
            logging.error("master has nothing to recycle")
            self._recycle_wait_until = max(time.time(), self._idle_wait_until) + 3600
        except (
            MasterDidNotGreet,
            MasterIsCrazy,
            MasterCommunicationFailed,
            MasterNotOK,
        ):
            logging.error("connection to master failed")
            self._error_wait_until = time.time() + 900
            self._slave.close()
        else:
            try:
                for logdir in ["pass", "fail", "untestable"]:
                    for basename in os.listdir(logdir):
                        if basename.endswith(".log"):
                            fullname = os.path.join(logdir, basename)
                            self._slave.send_log(self._config.section, logdir, fullname)
                            os.remove(fullname)

                if unreserve:
                    for logdir in ["new", "reserved"]:
                        for basename in os.listdir(logdir):
                            if basename.endswith(".log"):
                                fullname = os.path.join(logdir, basename)
                                self._slave.unreserve(fullname)
                                os.remove(fullname)

                if fetch:
                    max_reserved = int(self._config["max-reserved"])
                    idle = self._slave.get_idle()
                    if idle > 0:
                        idle = min(idle, int(self._config["idle-sleep"]))
                        logging.info("idle (%d)" % idle)
                        if not recycle:
                            self._idle_wait_until = time.time() + idle
                        else:
                            self._recycle_wait_until = time.time() + idle
                        # report "not done" so the caller keeps the wait time set above
                        return False
                    while len(self._slave.get_reserved()) < max_reserved and self._slave.reserve():
                        pass
                    self._slave.get_status(self._config.section)
            except MasterNotOK:
                logging.error("master did not respond with 'ok'")
                self._error_wait_until = time.time() + 900
                self._slave.close()
            except (MasterIsCrazy, MasterCommunicationFailed):
                logging.error("communication with master failed")
                self._error_wait_until = time.time() + 900
                self._slave.close()
            else:
                return True
        return False

    def _process(self):
        """Test all packages currently reserved for this section.

        The Packages files of all involved distributions are loaded first
        (restricted to the reserved package names).  Testing stops early on
        interrupt/SIGHUP or if the base tarball is missing.  If
        'slave-flush-interval' is set, finished logs are sent to the master
        in between.  Afterwards the master is contacted once more.

        Returns the number of packages tested.
        """
        global interrupted
        last_flush = time.time()

        packagenames = {x[0] for x in self._slave.get_reserved()}
        packages_files = {}
        for distro in [self._config.get_distro()] + self._config.get_distros():
            if distro not in packages_files:
                try:
                    pf = piupartslib.packagesdb.PackagesFile()
                    pf.load_packages_urls(
                        self._distro_config.get_packages_urls(distro, self._config.get_area(), self._config.get_arch()),
                        packagenames,
                    )
                    packages_files[distro] = pf
                except IOError:
                    logging.error("failed to fetch packages file for %s" % distro)
                    self._error_wait_until = time.time() + 900
                    return 0
                except KeyboardInterrupt:
                    interrupted = True
        del packagenames

        test_count = 0
        self._check_tarball()
        if not os.path.exists(self._get_tarball()):
            self._error_wait_until = time.time() + 300
        self._check_refchroot_metadata()
        for package_name, version in self._slave.get_reserved():
            self._throttle_if_overloaded()
            if interrupted or got_sighup:
                break
            if int(self._config["slave-flush-interval"]):
                if time.time() - last_flush > int(self._config["slave-flush-interval"]):
                    last_flush += 300  # throttle retries
                    if self._talk_to_master():
                        last_flush = time.time()
            if not os.path.exists(self._get_tarball()):
                logging.error("Missing chroot-tgz %s" % self._get_tarball())
                break
            test_count += 1
            self._test_package(package_name, version, packages_files)
            self._slave.forget_reserved(package_name, version)
        self._talk_to_master(unreserve=interrupted)
        return test_count

    def _test_package(self, pname, pvers, packages_files):
        """Run piuparts on one package and file the log.

        pname, pvers   -- name and version of the package to test
        packages_files -- mapping from distribution name to the loaded
                          Packages file of that distribution

        The log is written to ``new/<pname>_<pvers>.log`` and finally moved
        to ``pass``, ``fail`` or ``untestable``.  Before piuparts is started
        the package is checked against the Packages files; if the requested
        version is not available (or an upgrade path would need a
        downgrade) a negative ``ret`` code is recorded, piuparts is not run
        and the log goes to ``untestable``.

        While the test runs, SIGINT is handled by sigint_handler(); the
        previous handler is restored at the end.
        """
        global old_sigint_handler
        old_sigint_handler = signal(SIGINT, sigint_handler)
        self._slave.close()

        logging.info("Testing package %s/%s %s" % (self._config.section, pname, pvers))

        output_name = log_name(pname, pvers)
        logging.debug("Opening log file %s" % output_name)
        new_name = os.path.join("new", output_name)
        output = open(new_name, "w")
        output.write(time.strftime("Start: %Y-%m-%d %H:%M:%S %Z\n", time.gmtime()))

        distupgrade = len(self._config.get_distros()) > 1

        # assemble the piuparts command line from the section configuration
        command = []
        if self._config["setarch"]:
            command.append("setarch")
            command.extend(self._config["setarch"].split())
        command.extend(self._config["piuparts-command"].split())
        if self._config["piuparts-flags"]:
            command.extend(self._config["piuparts-flags"].split())
        if "http_proxy" in os.environ:
            command.extend(["--proxy", os.environ["http_proxy"]])
        if self._config["mirror"]:
            mirror = self._config["mirror"]
            if self._config["components"]:
                mirror += " " + self._config["components"]
            command.extend(["--mirror", mirror])
        if self._config["tmpdir"]:
            command.extend(["--tmpdir", self._config["tmpdir"]])
        command.extend(["--arch", self._config.get_arch()])
        command.extend(["-b", self._get_tarball()])
        if not distupgrade:
            command.extend(["-d", self._config.get_distro()])
            command.append("--no-upgrade-test")
        else:
            for distro in self._config.get_distros():
                command.extend(["-d", distro])
        if self._config["keep-sources-list"] in ["yes", "true"]:
            command.append("--keep-sources-list")
        if distupgrade and self._config["chroot-meta-auto"]:
            refchroot_metadata = self._get_refchroot_metadata()
            # -S if the metadata file does not exist yet, -B if it does
            if not os.path.exists(refchroot_metadata):
                command.extend(["-S", refchroot_metadata])
            else:
                command.extend(["-B", refchroot_metadata])
        command.extend(["--apt", "%s=%s" % (pname, pvers)])

        subdir = "fail"
        ret = 0

        # check that the requested package/version is actually available
        if not distupgrade:
            distro = self._config.get_distro()
            if pname not in packages_files[distro]:
                output.write("Package %s not found in %s\n" % (pname, distro))
                ret = -10001
            else:
                package = packages_files[distro][pname]
                if pvers != package["Version"]:
                    output.write(
                        "Package %s %s not found in %s, %s is available\n" % (pname, pvers, distro, package["Version"])
                    )
                    ret = -10002
                output.write("\n")
                package.dump(output)
                output.write("\n")
        else:
            distros = self._config.get_distros()
            if distros:
                # the package must exist somewhere
                for distro in distros:
                    if pname in packages_files[distro]:
                        break
                else:
                    output.write("Package %s not found in any distribution\n" % pname)
                    ret = -10003

                # the package must have the correct version in the distupgrade target distro
                distro = distros[-1]
                if pname not in packages_files[distro]:
                    # the package may "disappear" in the distupgrade target distro
                    if pvers == "None":
                        pass
                    else:
                        output.write("Package %s not found in %s\n" % (pname, distro))
                        ret = -10004
                else:
                    package = packages_files[distro][pname]
                    if pvers != package["Version"]:
                        output.write(
                            "Package %s %s not found in %s, %s is available\n"
                            % (pname, pvers, distro, package["Version"])
                        )
                        ret = -10005

                for distro in distros:
                    output.write("\n[%s]\n" % distro)
                    if pname in packages_files[distro]:
                        packages_files[distro][pname].dump(output)
                output.write("\n")

                if ret == 0:
                    # versions must not decrease along the upgrade path
                    prev = "~"
                    for distro in distros:
                        if pname in packages_files[distro]:
                            v = packages_files[distro][pname]["Version"]
                            if not apt_pkg.version_compare(prev, v) <= 0:
                                output.write("Upgrade to %s requires downgrade: %s > %s\n" % (distro, prev, v))
                                ret = -10006
                            prev = v
            else:
                ret = -10010
        if ret != 0:
            subdir = "untestable"

        if ret == 0:
            output.write("Executing: %s\n" % command2string(command))
            ret, f = run_test_with_timeout(command, MAX_WAIT_TEST_RUN)
            # make sure the captured output ends with a newline
            if not f or f[-1] != "\n":
                f += "\n"
            output.write(f.replace("\033", "[ESC]"))
            lastline = f.split("\n")[-2]
            if ret < 0:
                output.write(" *** Process KILLED - exceed maximum run time ***\n")
            elif "piuparts run ends" not in lastline:
                # a missing end marker turns even exit code 0 into a failure
                ret += 1024
                output.write(" *** PIUPARTS OUTPUT INCOMPLETE ***\n")
            elif distupgrade and self._config["chroot-meta-auto"]:
                try:
                    refchroot_metadata = self._get_refchroot_metadata()
                    if "History of available packages does not match - reference chroot may be outdated" in f:
                        os.unlink(refchroot_metadata)
                        logging.info("Deleting outdated %s" % refchroot_metadata)
                    elif "Initial package selections do not match - ignoring loaded reference chroot state" in f:
                        os.unlink(refchroot_metadata)
                        logging.info("Deleting mismatching %s" % refchroot_metadata)
                except OSError:
                    pass

        output.write("\n")
        output.write("ret=%d\n" % ret)
        output.write(time.strftime("End: %Y-%m-%d %H:%M:%S %Z\n", time.gmtime()))
        output.close()
        if ret == 0:
            subdir = "pass"
        os.rename(new_name, os.path.join(subdir, output_name))
        logging.debug("Done with %s: %s (%d)" % (output_name, subdir, ret))
        signal(SIGINT, old_sigint_handler)


def log_name(package, version):
    """Return the log file name ``<package>_<version>.log``."""
    stem = "_".join((str(package), str(version)))
    return stem + ".log"


def command2string(command):
    """Join the arguments of command into one shell-quoted string.

    Every argument is passed through shlex.quote(), so pasting the result
    from a logfile into a shell gives a runnable command.
    """
    quoted_args = map(shlex.quote, command)
    return " ".join(quoted_args)


def run_test_with_timeout(cmd, maxwait, kill_all=True):
    """Run cmd in its own process group and collect its combined output.

    cmd is the argument list to execute; its stderr is merged into stdout.
    maxwait is the time limit in seconds, armed via alarm(); a value of 0
    or less disables the limit.  kill_all selects whether the direct
    children of the process get SIGKILLed as well when it is terminated.

    Returns a tuple (ret, output).  ret is the exit status of the process,
    negated if it is 124 or 137, or -1 if the time limit was exceeded, in
    which case output is the empty string.

    On KeyboardInterrupt the process is terminated and the exception is
    re-raised.
    """

    def terminate_subprocess(p, kill_all):
        """Stop p with escalating signals, then SIGKILL all collected pids."""
        pids = [p.pid]
        if kill_all:
            # Collect the direct children of p before signalling anything.
            ps = subprocess.Popen(
                ["ps", "--no-headers", "-o", "pid", "--ppid", "%d" % p.pid],
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
            ps_output, _ = ps.communicate()
            pids.extend(int(pid) for pid in ps_output.split())
        if p.poll() is None:
            print("Sending SIGINT...")
            try:
                # p leads its own process group (see preexec_fn below), so
                # this reaches the whole group.
                os.killpg(os.getpgid(p.pid), SIGINT)
            except OSError:
                pass
            # piuparts has 30 seconds to clean up after Ctrl-C
            for _ in range(60):
                time.sleep(0.5)
                if p.poll() is not None:
                    break
        if p.poll() is None:
            print("Sending SIGTERM...")
            p.terminate()
            # piuparts has 5 seconds to clean up after SIGTERM
            for _ in range(10):
                time.sleep(0.5)
                if p.poll() is not None:
                    break
        if p.poll() is None:
            print("Sending SIGKILL...")
            p.kill()
        # Finally make sure nothing recorded above is left running.
        for pid in pids:
            if pid > 0:
                try:
                    os.kill(pid, SIGKILL)
                    print("Killed %d" % pid)
                except OSError:
                    pass

    logging.debug("Executing: %s" % command2string(cmd))

    stdout = ""
    p = subprocess.Popen(
        cmd,
        preexec_fn=os.setpgrp,
        universal_newlines=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if maxwait > 0:
        # alarm_handler is defined elsewhere in this file; presumably it
        # raises Alarm, which is caught below.
        signal(SIGALRM, alarm_handler)
        alarm(maxwait)
    try:
        stdout, _ = p.communicate()
        alarm(0)
    except Alarm:
        terminate_subprocess(p, kill_all)
        return -1, stdout
    except KeyboardInterrupt:
        # Cancel the pending alarm first: left armed it could fire while the
        # process is being cleaned up (aborting the cleanup) or later on in
        # unrelated code.
        alarm(0)
        print("\nSlave interrupted by the user, cleaning up...")
        try:
            terminate_subprocess(p, kill_all)
        except KeyboardInterrupt:
            print("\nTerminating piuparts was interrupted... manual cleanup still neccessary.")
            raise
        raise

    ret = p.returncode
    if ret in [124, 137]:
        # process was terminated by the timeout command
        ret = -ret
    return ret, stdout


def create_chroot(config, tarball, distro):
    """Create the base tarball for distro by running piuparts.

    config supplies the settings read below ("setarch", "piuparts-command",
    "piuparts-flags", "mirror", "components", "tmpdir") and get_arch().
    tarball is the path of the tarball to create; piuparts is told to save
    to tarball + ".new", which is renamed to tarball once the run is over.
    distro is passed to piuparts via -d.

    The output of the run is written to tarball + ".log".  An exclusive,
    non-blocking lock on that log file ensures only one creation runs at a
    time; if the lock is already held, nothing is done.
    """
    command = []
    if config["setarch"]:
        command.append("setarch")
        command.extend(config["setarch"].split())
    command.extend(config["piuparts-command"].split())
    if config["piuparts-flags"]:
        command.extend(config["piuparts-flags"].split())
    if "http_proxy" in os.environ:
        command.extend(["--proxy", os.environ["http_proxy"]])
    if config["mirror"]:
        mirror = config["mirror"]
        if config["components"]:
            mirror += " " + config["components"]
        command.extend(["--mirror", mirror])
    if config["tmpdir"]:
        command.extend(["--tmpdir", config["tmpdir"]])
    command.extend(["--arch", config.get_arch()])
    command.extend(["-d", distro])
    command.extend(["-s", tarball + ".new"])
    command.extend(["--no-install-purge-test", "--no-upgrade-test"])
    command.extend(["--apt", "TARBALL"])  # dummy package name

    output_name = tarball + ".log"
    # Open in append mode: mode "w" would truncate the file on open, i.e.
    # before the lock is tried, wiping the log of a creation in progress.
    with open(output_name, "a") as output:
        try:
            fcntl.flock(output, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            logging.info("Creation of tarball %s already in progress." % tarball)
        else:
            # The lock is ours, so it is now safe to start the log afresh.
            output.seek(0)
            output.truncate()
            logging.info("Creating new tarball %s" % tarball)
            output.write(time.strftime("Start: %Y-%m-%d %H:%M:%S %Z\n\n", time.gmtime()))
            output.write("Executing: " + command2string(command) + "\n\n")
            logging.debug("Executing: " + command2string(command))
            try:
                p = subprocess.Popen(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    universal_newlines=True,
                )
                # Copy the output to the log file and the debug log as it
                # is produced.
                for line in p.stdout:
                    output.write(line)
                    logging.debug(">> " + line.rstrip())
                p.wait()
                output.write(time.strftime("\nEnd: %Y-%m-%d %H:%M:%S %Z\n", time.gmtime()))
                # The presence of the ".new" file decides success, the exit
                # status of the command is not looked at.
                if os.path.exists(tarball + ".new"):
                    os.rename(tarball + ".new", tarball)
                else:
                    logging.error("Tarball creation failed, see %s" % output_name)
            except IOError:
                output.write(time.strftime("\nFAIL: %Y-%m-%d %H:%M:%S %Z\n", time.gmtime()))
                logging.error("Tarball creation failed with IOError")

def create_file(filename, contents):
    """Write contents to filename, replacing whatever the file held before."""
    handle = open(filename, "w")
    try:
        handle.write(contents)
    finally:
        handle.close()


def main():
    """Run the slave: process the configured sections in an endless loop.

    The sections to work on are named on the command line or, if no
    argument is given, taken from the "global" section of CONFIG_FILE.
    Returns early (after logging an error) if none of them exists;
    otherwise the loop only ends through an exception (KeyboardInterrupt
    is raised once `interrupted` is set) or a re-exec after SIGUSR1.
    """
    # Log to stderr only (no log file).
    setup_logging(logging.INFO, None)
    # The handlers are defined elsewhere in this file; presumably they set
    # the got_sighup / got_sigusr1 flags checked in the loop below.
    signal(SIGHUP, sighup_handler)
    signal(SIGUSR1, sigusr1_handler)

    # For supporting multiple architectures and suites, we take command-line
    # argument(s) referring to section(s) in the configuration file.
    # If no argument is given, the "sections" entry from the "global" section
    # is used.
    section_names = []
    global_config = Config(section="global")
    global_config.read(CONFIG_FILE)
    if global_config["proxy"]:
        # create_chroot() picks the proxy up from the environment.
        os.environ["http_proxy"] = global_config["proxy"]
    if len(sys.argv) > 1:
        section_names = sys.argv[1:]
    else:
        section_names = global_config["sections"].split()
        section_names += global_config["basetgz-sections"].split()

    # One Slave object is shared by all sections.
    persistent_connection = Slave()
    sections = []
    for section_name in section_names:
        try:
            sections.append(Section(section_name, persistent_connection))
        except MissingSection:
            # ignore unknown sections
            pass

    if not sections:
        logging.error("no sections found")
        return

    # flush logs from previous run
    for section in sections:
        section.run(do_processing=False)

    while True:
        global got_sighup
        test_count = 0

        # Visit the sections ordered by precedence, then by the time they
        # sleep until.  Processing is only enabled as long as no earlier
        # section in this round reported any work done.
        for section in sorted(sections, key=lambda section: (section.precedence(), section.sleep_until())):
            test_count += section.run(do_processing=(test_count == 0))

        if got_sigusr1:
            # Replace this process with a fresh instance of the program.
            logging.info("Restarting...")
            os.execv(__file__, sys.argv)

        if test_count == 0 and got_sighup:
            # clear SIGHUP state after flushing all sections
            got_sighup = False
            continue

        if test_count == 0:
            # try to recycle old logs
            # round robin recycling of all sections is ensured by the recycle_wait_until timestamps
            idle_until = min([section.sleep_until() for section in sections])
            for section in sorted(sections, key=lambda section: section.sleep_until(recycle=True)):
                test_count += section.run(recycle=True)
                # Stop recycling once something was done and the earliest
                # section wake-up time has passed.
                if test_count > 0 and idle_until < time.time():
                    break

        if interrupted:
            raise KeyboardInterrupt

        if test_count == 0 and not got_sighup:
            # Nothing was done in this round: sleep until the first section
            # is due again, but no longer than "idle-sleep" seconds from now.
            now = time.time()
            sleep_until = min(
                [now + int(global_config["idle-sleep"])] + [section.sleep_until() for section in sections]
            )
            if sleep_until > now:
                # Never sleep for less than a minute.
                to_sleep = max(60, sleep_until - now)
                persistent_connection.close()
                logging.info("Nothing to do, sleeping for %d seconds." % to_sleep)
                time.sleep(to_sleep)


if __name__ == "__main__":
    # Script entry point: run the slave, turning Ctrl-C into a short
    # message and exit status 1 instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("")
        print("Slave interrupted by the user, exiting...")
        raise SystemExit(1)

# vi:set et ts=4 sw=4 :
