From d331654069fe683ad12967830c43b0824ad14c45 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Apr 10 2024 09:20:23 +0000 Subject: PR#4012: rmtree: use fork Merges #4012 https://pagure.io/koji/pull-request/4012 Fixes: #3755 https://pagure.io/koji/issue/3755 koji-gc occasionally gets Directory not empty fault on delete Fixes: #2481 https://pagure.io/koji/issue/2481 koji.util.rmtree() is not multi-process/thread safe --- diff --git a/koji/util.py b/koji/util.py index 0f70750..31c32ba 100644 --- a/koji/util.py +++ b/koji/util.py @@ -25,6 +25,7 @@ import calendar import datetime import errno import hashlib +import json import logging import os import os.path @@ -34,6 +35,7 @@ import shutil import stat import struct import sys +import tempfile import time import warnings from fnmatch import fnmatch @@ -433,8 +435,135 @@ class _RetryRmtree(Exception): # We raise this exception only when it makes sense for rmtree to retry from the top -def rmtree(path, logger=None): - """Delete a directory tree without crossing fs boundaries""" +def rmtree(path, logger=None, background=False): + """Delete a directory tree without crossing fs boundaries + + :param str path: the directory to remove + :param Logger logger: Logger object + :param bool background: if True, runs in the background returning a cleanup function + :return: None or [pid, check_function] + + In the background case, caller is responsible for waiting on pid and should call + the returned check function once it finishes. + """ + # we use the fake logger to avoid issues with logging locks while forking + fd, logfile = tempfile.mkstemp(suffix='.jsonl') + os.close(fd) + pid = os.fork() + + if not pid: + # child process + try: + status = 1 + with SimpleProxyLogger(logfile) as mylogger: + try: + _rmtree_nofork(path, logger=mylogger) + except Exception as e: + mylogger.error('rmtree failed: %s' % e) + raise + status = 0 + finally: + # diediedie + os._exit(status) + # not reached + + # parent process + logger = logger or logging.getLogger('koji') + + def _rmtree_check(): + if not background: + # caller will wait in background case + _pid, status = os.waitpid(pid, 0) + try: + SimpleProxyLogger.send(logfile, logger) + except Exception as err: + logger.error("Failed to get rmtree logs -- %s" % err) + try: + os.unlink(logfile) + except Exception: + pass + if not background: + # caller should check status in background case + if not isSuccess(status): + raise koji.GenericError(parseStatus(status, "rmtree process")) + if os.path.exists(path): + raise koji.GenericError("Failed to remove directory: %s" % path) + + if background: + return pid, _rmtree_check + else: + return _rmtree_check() + + +class SimpleProxyLogger(object): + """Save log messages to a file and log them later""" + + DEBUG = logging.DEBUG + INFO = logging.INFO + WARNING = logging.WARNING + ERROR = logging.ERROR + + def __init__(self, filename): + self.outfile = koji._open_text_file(filename, mode='wt') + + # so we can use as a context manager + def __enter__(self): + return self + + def __exit__(self, _type, value, traceback): + self.outfile.close() + # don't eat exceptions + return False + + def log(self, level, msg, *args, **kwargs): + # jsonl output + data = [level, msg, args, kwargs] + try: + line = json.dumps(data, indent=None) + except Exception: + try: + data = [logging.ERROR, "Unable to log: %s" % data, (), {}] + line = json.dumps(data, indent=None) + except Exception: + line = '[40, "Invalid log data", [], {}]' + try: + self.outfile.write(line) + self.outfile.write('\n') + except Exception: + pass + + def info(self, msg, *args, **kwargs): + self.log(self.INFO, msg, *args, **kwargs) + + def warning(self, msg, *args, **kwargs): + self.log(self.WARNING, msg, *args, **kwargs) + + def error(self, msg, *args, **kwargs): + self.log(self.ERROR, msg, *args, **kwargs) + + def debug(self, msg, *args, **kwargs): + self.log(self.DEBUG, msg, *args, **kwargs) + + @staticmethod + def send(filename, logger): + with koji._open_text_file(filename, mode='rt') as fo: + for line in fo: + try: + level, msg, args, kwargs = json.loads(line) + except Exception: + level = logging.ERROR + msg = "Bad log data: %r" + args = (line,) + kwargs = {} + logger.log(level, msg, *args, **kwargs) + + +def _rmtree_nofork(path, logger=None): + """Delete a directory tree without crossing fs boundaries + + This function is not thread safe because it relies on chdir to avoid + forming long paths. + """ # implemented to avoid forming long paths # see: https://pagure.io/koji/issue/201 logger = logger or logging.getLogger('koji') @@ -447,12 +576,18 @@ def rmtree(path, logger=None): raise koji.GenericError("Not a directory: %s" % path) dev = st.st_dev cwd = os.getcwd() + abspath = os.path.abspath(path) try: # retry loop while True: try: os.chdir(path) + new_cwd = os.getcwd() + # make sure we're where we think we are + if not os.path.samefile(new_cwd, abspath): + raise koji.GenericError('chdir to %s resulted in different cwd %s', + path, new_cwd) except OSError as e: if e.errno in (errno.ENOENT, errno.ESTALE): # likely racing with another rmtree @@ -461,7 +596,7 @@ def rmtree(path, logger=None): return raise try: - _rmtree(dev, logger) + _rmtree(dev, new_cwd, logger) except _RetryRmtree as e: # reset and retry os.chdir(cwd) @@ -479,37 +614,45 @@ def rmtree(path, logger=None): raise -def _rmtree(dev, logger): +def _rmtree(dev, cwd, logger): """Remove all contents of CWD""" # This implementation avoids forming long paths and recursion. Otherwise # we will have errors with very deep directory trees. # - to avoid forming long paths we change directory as we go # - to avoid recursion we maintain our own stack dirstack = [] - # Each entry in dirstack is a list of subdirs for that level + # Each entry in dirstack contains data for a level of directory traversal + # - path + # - subdirs # As we descend into the tree, we append a new entry to dirstack # When we ascend back up after removal, we pop them off + while True: - dirs = _stripcwd(dev, logger) + dirs = _stripcwd(dev, cwd, logger) # if cwd has no subdirs, walk back up until we find some while not dirs and dirstack: + _assert_cwd(cwd) try: os.chdir('..') except OSError as e: + _assert_cwd(cwd) if e.errno in (errno.ENOENT, errno.ESTALE): # likely in a race with another rmtree # however, we cannot proceed from here, so we return to the top raise _RetryRmtree(str(e)) raise dirs = dirstack.pop() + cwd = os.path.dirname(cwd) - # now that we've ascended back up by one, the first dir entry is + # now that we've ascended back up by one, the last dir entry is # one we've just cleared, so we should remove it empty_dir = dirs.pop() + _assert_cwd(cwd) try: os.rmdir(empty_dir) except OSError as e: + _assert_cwd(cwd) # If this happens, either something else is writing to the dir, # or there is a bug in our code. # For now, we ignore this and proceed, but we'll still fail at @@ -524,9 +667,11 @@ def _rmtree(dev, logger): # otherwise we descend into the next subdir subdir = dirs[-1] # note: we do not pop here because we need to remember to remove subdir later + _assert_cwd(cwd) try: os.chdir(subdir) except OSError as e: + _assert_cwd(cwd) if e.errno == errno.ENOENT: # likely in a race with another rmtree # we'll ignore this and continue @@ -535,12 +680,26 @@ def _rmtree(dev, logger): logger.warning("Subdir disappeared during rmtree %s: %s" % (subdir, e)) continue # with dirstack unchanged raise + cwd = os.path.join(cwd, subdir) dirstack.append(dirs) -def _stripcwd(dev, logger): +def _assert_cwd(cwd): + try: + actual = os.getcwd() + except OSError as e: + if e.errno == errno.ENOENT: + # subsequent calls should fail with better handling + return + raise + if cwd != actual: + raise koji.GenericError('CWD changed unexpectedly: %s -> %s' % (cwd, actual)) + + +def _stripcwd(dev, cwd, logger): """Unlink all files in cwd and return list of subdirs""" dirs = [] + _assert_cwd(cwd) try: fdirs = os.listdir('.') except OSError as e: @@ -553,6 +712,7 @@ def _stripcwd(dev, logger): try: st = os.lstat(fn) except OSError as e: + _assert_cwd(cwd) if e.errno == errno.ENOENT: continue raise @@ -562,6 +722,7 @@ def _stripcwd(dev, logger): if stat.S_ISDIR(st.st_mode): dirs.append(fn) else: + _assert_cwd(cwd) try: os.unlink(fn) except OSError: diff --git a/tests/test_lib/test_utils.py b/tests/test_lib/test_utils.py index ce4a5e3..7552797 100644 --- a/tests/test_lib/test_utils.py +++ b/tests/test_lib/test_utils.py @@ -3,8 +3,10 @@ from __future__ import absolute_import import calendar import errno import locale +import logging from unittest.case import TestCase import mock +import multiprocessing import optparse import os import resource @@ -12,6 +14,7 @@ import time import six import shutil import tempfile +import threading import unittest import requests_mock @@ -1235,227 +1238,384 @@ class MavenUtilTestCase(unittest.TestCase): class TestRmtree(unittest.TestCase): + + def setUp(self): + # none of these tests should actually do anything with the fs + # however, just in case, we set up a tempdir and restore cwd + self.tempdir = tempfile.mkdtemp() + self.dirname = '%s/some-dir' % self.tempdir + os.mkdir(self.dirname) + self.savecwd = os.getcwd() + + self.chdir = mock.patch('os.chdir').start() + self.rmdir = mock.patch('os.rmdir').start() + self.unlink = mock.patch('os.unlink').start() + self.lstat = mock.patch('os.lstat').start() + self.listdir = mock.patch('os.listdir').start() + self.getcwd = mock.patch('os.getcwd').start() + self.isdir = mock.patch('stat.S_ISDIR').start() + self.samefile = mock.patch('os.path.samefile').start() + self._assert_cwd = mock.patch('koji.util._assert_cwd').start() + + def tearDown(self): + mock.patch.stopall() + os.chdir(self.savecwd) + shutil.rmtree(self.tempdir) + @patch('koji.util._rmtree') - @patch('os.rmdir') - @patch('os.chdir') - @patch('os.getcwd') - @patch('stat.S_ISDIR') - @patch('os.lstat') - def test_rmtree_file(self, lstat, isdir, getcwd, chdir, rmdir, _rmtree): - """ Tests that the koji.util.rmtree function raises error when the + def test_rmtree_file(self, _rmtree): + """ Tests that the koji.util._rmtree_nofork function raises error when the path parameter is not a directory. """ stat = mock.MagicMock() stat.st_dev = 'dev' - lstat.return_value = stat - isdir.return_value = False - getcwd.return_value = 'cwd' + self.lstat.return_value = stat + self.isdir.return_value = False + self.getcwd.return_value = 'cwd' with self.assertRaises(koji.GenericError): - koji.util.rmtree('/mnt/folder/some_file') + koji.util._rmtree_nofork(self.dirname) _rmtree.assert_not_called() - rmdir.assert_not_called() + self.rmdir.assert_not_called() @patch('koji.util._rmtree') - @patch('os.rmdir') - @patch('os.chdir') - @patch('os.getcwd') - @patch('stat.S_ISDIR') - @patch('os.lstat') - def test_rmtree_directory(self, lstat, isdir, getcwd, chdir, rmdir, _rmtree): - """ Tests that the koji.util.rmtree function returns nothing when the path is a directory. + def test_rmtree_directory(self, _rmtree): + """ Tests that the koji.util._rmtree_nofork function returns nothing when the path is a directory. """ stat = mock.MagicMock() stat.st_dev = 'dev' - lstat.return_value = stat - isdir.return_value = True - getcwd.return_value = 'cwd' - path = '/mnt/folder' + self.lstat.return_value = stat + self.isdir.return_value = True + path = self.dirname + self.getcwd.return_value = path logger = mock.MagicMock() - self.assertEqual(koji.util.rmtree(path, logger), None) - chdir.assert_called_with('cwd') - _rmtree.assert_called_once_with('dev', logger) - rmdir.assert_called_once_with(path) + result = koji.util._rmtree_nofork(path, logger) + self.assertEqual(result, None) + self.chdir.assert_called_with(path) + _rmtree.assert_called_once_with('dev', path, logger) + self.rmdir.assert_called_once_with(path) - @patch('koji.util._rmtree') - @patch('os.rmdir') - @patch('os.chdir') - @patch('os.getcwd') - @patch('stat.S_ISDIR') - @patch('os.lstat') - def test_rmtree_directory_scrub_failure(self, lstat, isdir, getcwd, chdir, rmdir, _rmtree): - """ Tests that the koji.util.rmtree function returns a GeneralException + @patch('koji.util._stripcwd') + def test_rmtree_directory_stripcwd_failure(self, stripcwd): + """ Tests that the koji.util._rmtree_nofork function returns a GeneralException when the scrub of the files in the directory fails. """ stat = mock.MagicMock() stat.st_dev = 'dev' - lstat.return_value = stat - isdir.return_value = True - getcwd.return_value = 'cwd' - path = '/mnt/folder' + self.lstat.return_value = stat + self.isdir.return_value = True + self.getcwd.return_value = 'cwd' + stripcwd.side_effect = OSError('xyz') + logger = mock.MagicMock() + + with self.assertRaises(OSError): + koji.util._rmtree('dev', 'cwd', logger) + + @patch('koji.util._rmtree') + def test_rmtree_call_failure(self, _rmtree): + """ Tests that the koji.util._rmtree_nofork function returns a GeneralException + when the underlying _rmtree call fails + """ + stat = mock.MagicMock() + stat.st_dev = 'dev' + self.lstat.return_value = stat + self.isdir.return_value = True + self.getcwd.return_value = 'cwd' + path = self.dirname _rmtree.side_effect = OSError('xyz') with self.assertRaises(OSError): - koji.util.rmtree(path) + koji.util._rmtree_nofork(path) + + @patch('koji.util._rmtree') + def test_rmtree_getcwd_mismatch(self, _rmtree): + """ Tests that the koji.util._rmtree_nofork function returns a GeneralException + when getcwd disagrees with initial chdir + """ + stat = mock.MagicMock() + stat.st_dev = 'dev' + self.lstat.return_value = stat + self.isdir.return_value = True + self.getcwd.return_value = 'cwd' + path = self.dirname + self.samefile.return_value = False + + with self.assertRaises(koji.GenericError): + koji.util._rmtree_nofork(path) - @patch('os.chdir') - @patch('os.rmdir') @patch('koji.util._stripcwd') - def test_rmtree_internal_empty(self, stripcwd, rmdir, chdir): + def test_rmtree_internal_empty(self, stripcwd): dev = 'dev' stripcwd.return_value = [] logger = mock.MagicMock() - koji.util._rmtree(dev, logger) + koji.util._rmtree(dev, self.dirname, logger) - stripcwd.assert_called_once_with(dev, logger) - rmdir.assert_not_called() - chdir.assert_not_called() + stripcwd.assert_called_once_with(dev, self.dirname, logger) + self.rmdir.assert_not_called() + self.chdir.assert_not_called() - @patch('os.chdir') - @patch('os.rmdir') @patch('koji.util._stripcwd') - def test_rmtree_internal_dirs(self, stripcwd, rmdir, chdir): + def test_rmtree_internal_dirs(self, stripcwd): dev = 'dev' stripcwd.side_effect = (['a', 'b'], [], []) logger = mock.MagicMock() + path = self.dirname - koji.util._rmtree(dev, logger) + koji.util._rmtree(dev, path, logger) - stripcwd.assert_has_calls([call(dev, logger), call(dev, logger), call(dev, logger)]) - rmdir.assert_has_calls([call('b'), call('a')]) - chdir.assert_has_calls([call('b'), call('..'), call('a'), call('..')]) + stripcwd.assert_has_calls([call(dev, path, logger), + call(dev, path + '/b', logger), + call(dev, path + '/a', logger)]) + self.rmdir.assert_has_calls([call('b'), call('a')]) + self.chdir.assert_has_calls([call('b'), call('..'), call('a'), call('..')]) - @patch('os.chdir') - @patch('os.rmdir') @patch('koji.util._stripcwd') - def test_rmtree_internal_fail(self, stripcwd, rmdir, chdir): + def test_rmtree_internal_fail(self, stripcwd): dev = 'dev' stripcwd.side_effect = (['a', 'b'], [], []) - rmdir.side_effect = OSError() + self.rmdir.side_effect = OSError() logger = mock.MagicMock() + path = self.dirname # don't fail on anything - koji.util._rmtree(dev, logger) + koji.util._rmtree(dev, path, logger) - stripcwd.assert_has_calls([call(dev, logger), call(dev, logger), call(dev, logger)]) - rmdir.assert_has_calls([call('b'), call('a')]) - chdir.assert_has_calls([call('b'), call('..'), call('a'), call('..')]) + stripcwd.assert_has_calls([call(dev, path, logger), + call(dev, path + '/b', logger), + call(dev, path + '/a', logger)]) + self.rmdir.assert_has_calls([call('b'), call('a')]) + self.chdir.assert_has_calls([call('b'), call('..'), call('a'), call('..')]) - @patch('os.listdir') - @patch('os.lstat') - @patch('stat.S_ISDIR') - @patch('os.unlink') - def test_stripcwd_empty(self, unlink, isdir, lstat, listdir): + def test_stripcwd_empty(self): # simple empty directory dev = 'dev' - listdir.return_value = [] + self.listdir.return_value = [] logger = mock.MagicMock() - koji.util._stripcwd(dev, logger) + koji.util._stripcwd(dev, self.dirname, logger) - listdir.assert_called_once_with('.') - unlink.assert_not_called() - isdir.assert_not_called() - lstat.assert_not_called() + self.listdir.assert_called_once_with('.') + self.unlink.assert_not_called() + self.isdir.assert_not_called() + self.lstat.assert_not_called() - @patch('os.listdir') - @patch('os.lstat') - @patch('stat.S_ISDIR') - @patch('os.unlink') - def test_stripcwd_all(self, unlink, isdir, lstat, listdir): + def test_stripcwd_all(self): # test valid file + dir dev = 'dev' - listdir.return_value = ['a', 'b'] + self.listdir.return_value = ['a', 'b'] st = mock.MagicMock() st.st_dev = dev st.st_mode = 'mode' - lstat.return_value = st - isdir.side_effect = [True, False] + self.lstat.return_value = st + self.isdir.side_effect = [True, False] logger = mock.MagicMock() - koji.util._stripcwd(dev, logger) + koji.util._stripcwd(dev, self.dirname, logger) - listdir.assert_called_once_with('.') - unlink.assert_called_once_with('b') - isdir.assert_has_calls([call('mode'), call('mode')]) - lstat.assert_has_calls([call('a'), call('b')]) + self.listdir.assert_called_once_with('.') + self.unlink.assert_called_once_with('b') + self.isdir.assert_has_calls([call('mode'), call('mode')]) + self.lstat.assert_has_calls([call('a'), call('b')]) - @patch('os.listdir') - @patch('os.lstat') - @patch('stat.S_ISDIR') - @patch('os.unlink') - def test_stripcwd_diffdev(self, unlink, isdir, lstat, listdir): + def test_stripcwd_diffdev(self): # ignore files on different devices dev = 'dev' - listdir.return_value = ['a', 'b'] + self.listdir.return_value = ['a', 'b'] st1 = mock.MagicMock() st1.st_dev = dev st1.st_mode = 'mode' st2 = mock.MagicMock() st2.st_dev = 'other_dev' st2.st_mode = 'mode' - lstat.side_effect = [st1, st2] - isdir.side_effect = [True, False] + self.lstat.side_effect = [st1, st2] + self.isdir.side_effect = [True, False] logger = mock.MagicMock() - koji.util._stripcwd(dev, logger) + koji.util._stripcwd(dev, self.dirname, logger) - listdir.assert_called_once_with('.') - unlink.assert_not_called() - isdir.assert_called_once_with('mode') - lstat.assert_has_calls([call('a'), call('b')]) + self.listdir.assert_called_once_with('.') + self.unlink.assert_not_called() + self.isdir.assert_called_once_with('mode') + self.lstat.assert_has_calls([call('a'), call('b')]) - @patch('os.listdir') - @patch('os.lstat') - @patch('stat.S_ISDIR') - @patch('os.unlink') - def test_stripcwd_fails(self, unlink, isdir, lstat, listdir): + def test_stripcwd_fails(self): # ignore all unlink errors dev = 'dev' - listdir.return_value = ['a', 'b'] + self.listdir.return_value = ['a', 'b'] st = mock.MagicMock() st.st_dev = dev st.st_mode = 'mode' - lstat.return_value = st - isdir.side_effect = [True, False] - unlink.side_effect = OSError() + self.lstat.return_value = st + self.isdir.side_effect = [True, False] + self.unlink.side_effect = OSError() logger = mock.MagicMock() - koji.util._stripcwd(dev, logger) + koji.util._stripcwd(dev, self.dirname, logger) - listdir.assert_called_once_with('.') - unlink.assert_called_once_with('b') - isdir.assert_has_calls([call('mode'), call('mode')]) - lstat.assert_has_calls([call('a'), call('b')]) + self.listdir.assert_called_once_with('.') + self.unlink.assert_called_once_with('b') + self.isdir.assert_has_calls([call('mode'), call('mode')]) + self.lstat.assert_has_calls([call('a'), call('b')]) - @patch('os.listdir') - @patch('os.lstat') - @patch('stat.S_ISDIR') - @patch('os.unlink') - def test_stripcwd_stat_fail(self, unlink, isdir, lstat, listdir): + def test_stripcwd_stat_fail(self): # something else deletes a file in the middle of _stripcwd() dev = 'dev' - listdir.return_value = ['will-not-exist.txt'] - lstat.side_effect = OSError(errno.ENOENT, 'No such file or directory') + self.listdir.return_value = ['will-not-exist.txt'] + self.lstat.side_effect = OSError(errno.ENOENT, 'No such file or directory') logger = mock.MagicMock() - koji.util._stripcwd(dev, logger) + koji.util._stripcwd(dev, self.dirname, logger) + + self.listdir.assert_called_once_with('.') + self.lstat.assert_called_once_with('will-not-exist.txt') + self.unlink.assert_not_called() + self.isdir.assert_not_called() + + @mock.patch('koji.util._rmtree_nofork') + @mock.patch('os.fork') + @mock.patch('os._exit') + def test_rmtree_child(self, _exit, fork, rmtree_nofork): + fork.return_value = 0 + path = "/SOME_PATH" + logger = "LOGGER" + + class Exited(Exception): + pass + + _exit.side_effect = Exited + # using exception to simulate os._exit in the test + with self.assertRaises(Exited): + koji.util.rmtree(path, logger) + fork.assert_called_once() + rmtree_nofork.assert_called_once() + self.assertEqual(rmtree_nofork.mock_calls[0].args[0], path) + _exit.assert_called_once() + + @mock.patch('koji.util._rmtree_nofork') + @mock.patch('os.fork') + @mock.patch('os.waitpid') + @mock.patch('os._exit') + def test_rmtree_child_fails(self, _exit, waitpid, fork, rmtree_nofork): + fork.return_value = 0 + path = "/SOME_PATH" + logger = "LOGGER" + + class Failed(Exception): + pass + + rmtree_nofork.side_effect = Failed() + # the exception should be re-raised + with self.assertRaises(Failed): + koji.util.rmtree(path, logger) + fork.assert_called_once() + rmtree_nofork.assert_called_once() + self.assertEqual(rmtree_nofork.mock_calls[0].args[0], path) + _exit.assert_called_once() + waitpid.assert_not_called + + @mock.patch('koji.util._rmtree_nofork') + @mock.patch('os.fork') + @mock.patch('os.waitpid') + @mock.patch('os._exit') + def test_rmtree_parent(self, _exit, waitpid, fork, rmtree_nofork): + pid = 137 + fork.return_value = pid + waitpid.return_value = pid, 0 + path = "/SOME_PATH" + logger = "LOGGER" + koji.util.rmtree(path, logger) + fork.assert_called_once() + rmtree_nofork.assert_not_called() + _exit.assert_not_called() + + @mock.patch('koji.util.SimpleProxyLogger.send') + @mock.patch('koji.util._rmtree_nofork') + @mock.patch('os.fork') + @mock.patch('os.unlink') + @mock.patch('os.waitpid') + @mock.patch('os._exit') + def test_rmtree_parent_logfail(self, _exit, waitpid, unlink, fork, rmtree_nofork, logsend): + pid = 137 + fork.return_value = pid + waitpid.return_value = pid, 0 + path = "/SOME_PATH" + logger = mock.MagicMock() + + class Failed(Exception): + pass + + logsend.side_effect = Failed('hello') + koji.util.rmtree(path, logger) + logsend.assert_called_once() + logger.error.assert_called_once() + if not logger.error.mock_calls[0].args[0].startswith('Failed to get rmtree logs'): + raise Exception('Wrong log message') + fork.assert_called_once() + rmtree_nofork.assert_not_called() + _exit.assert_not_called() - listdir.assert_called_once_with('.') - lstat.assert_called_once_with('will-not-exist.txt') - unlink.assert_not_called() - isdir.assert_not_called() + +class TestAssertCWD(unittest.TestCase): + + def setUp(self): + self.getcwd = mock.patch('os.getcwd').start() + + def tearDown(self): + mock.patch.stopall() + + def test_assert_cwd(self): + self.getcwd.return_value = '/mydir' + koji.util._assert_cwd('/mydir') + with self.assertRaises(koji.GenericError): + koji.util._assert_cwd('/wrongdir') + + @mock.patch('os.getcwd') + def test_assert_cwd_call_fails(self, getcwd): + exc = Exception('hello') + getcwd.side_effect = exc + with self.assertRaises(Exception) as e: + koji.util._assert_cwd('/test') + # should re-raise same exception + self.assertEqual(e, exc) + + exc = OSError() + exc.errno = errno.ENOENT + getcwd.side_effect = exc + # should ignore + koji.util._assert_cwd('/test') class TestRmtree2(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() + self.savecwd = os.getcwd() + # rmtree calls chdir, so save and restore cwd in case of a bug def tearDown(self): shutil.rmtree(self.tempdir) + os.chdir(self.savecwd) + + def test_rmtree_missing(self): + # should not error if already removed + dirname = '%s/NOSUCHDIR' % self.tempdir + koji.util.rmtree(dirname) + + dirname = '%s/NOSUCHDIR/NOSUCHDIR' % self.tempdir + koji.util.rmtree(dirname) + + def test_rmtree_notadir(self): + # should error if not a directory + fname = '%s/hello.txt' % self.tempdir + with open(fname, 'wt') as fo: + fo.write('hello\n') + with self.assertRaises(koji.GenericError): + koji.util.rmtree(fname) + + if not os.path.exists(fname): + raise Exception('deleted: %s', fname) def test_rmtree_parallel_chdir_down_failure(self): dirname = '%s/some-dir/' % self.tempdir @@ -1473,12 +1633,85 @@ class TestRmtree2(unittest.TestCase): mock_data['removed'] = True return os_chdir(*a, **kw) with mock.patch('os.chdir', new=my_chdir): - koji.util.rmtree(dirname) + koji.util._rmtree_nofork(dirname) if not mock_data['removed']: raise Exception('mocked call not working') if os.path.exists(dirname): raise Exception('test directory not removed') + def test_rmtree_relative(self): + relpath = 'some-dir-95628' + path = "%s/%s" % (self.tempdir, relpath) + os.makedirs('%s/a/b/c/d/e/f/g/h/i/j/k' % path) + + oldcwd = os.getcwd() + os.chdir(self.tempdir) + try: + koji.util._rmtree_nofork(relpath) + finally: + os.chdir(oldcwd) + + if os.path.exists(path): + raise Exception('test directory not removed') + + def test_rmtree_dev_change(self): + dirname = '%s/some-dir/' % self.tempdir + os.makedirs('%s/a/b/c/d/e/f/g/h/i/j/k' % dirname) + doomed = [ + '%s/a/b/c/d/e/f/DOOMED' % dirname, + '%s/a/b/c/d/e/DOOMED' % dirname, + '%s/a/b/DOOMED' % dirname, + ] + safe = [ + '%s/a/b/c/d/e/f/g/SAFE' % dirname, + '%s/a/b/c/d/e/f/g/h/SAFE' % dirname, + '%s/a/b/c/d/e/f/g/h/i/SAFE' % dirname, + '%s/a/b/c/d/e/f/g/h/i/j/SAFE' % dirname, + ] + for fn in doomed + safe: + with open(fn, 'wt') as fo: + fo.write('hello') + + os_lstat = os.lstat + pingfile = self.tempdir + '/ping' + + def my_lstat(path, **kw): + # report different dev mid-tree + ret = os_lstat(path, **kw) + if path.endswith('g'): + # path might be absolute or relative + with open(pingfile, 'wt') as fo: + fo.write('ping') + ret = mock.MagicMock(wraps=ret) + ret.st_dev = "NEWDEV" + return ret + + with mock.patch('os.lstat', new=my_lstat): + with self.assertRaises(koji.GenericError): + koji.util.rmtree(dirname) + if not os.path.exists(pingfile): + raise Exception('mocked call not working') + for fn in doomed: + if os.path.exists(fn): + raise Exception('not deleted: %s', fn) + for fn in safe: + if not os.path.exists(fn): + raise Exception('deleted: %s', fn) + if not os.path.exists(dirname): + raise Exception('deleted: %s', dirname) + + def test_rmtree_complex(self): + dirname = '%s/some-dir/' % self.tempdir + # For this test, we make a complex tree to remove + for i in range(8): + for j in range(8): + for k in range(8): + os.makedirs('%s/a/%s/c/d/%s/e/f/%s/g/h' % (dirname, i, j, k)) + + koji.util.rmtree(dirname) + if os.path.exists(dirname): + raise Exception('test directory not removed') + def test_rmtree_parallel_chdir_down_complex(self): dirname = '%s/some-dir/' % self.tempdir # For this test, we make a complex tree to remove @@ -1499,7 +1732,7 @@ class TestRmtree2(unittest.TestCase): mock_data['removed'] = True return os_chdir(path) with mock.patch('os.chdir', new=my_chdir): - koji.util.rmtree(dirname) + koji.util._rmtree_nofork(dirname) if not mock_data['removed']: raise Exception('mocked call not working') if os.path.exists(dirname): @@ -1525,7 +1758,7 @@ class TestRmtree2(unittest.TestCase): raise e return os_chdir(path) with mock.patch('os.chdir', new=my_chdir): - koji.util.rmtree(dirname) + koji.util._rmtree_nofork(dirname) if not mock_data['removed']: raise Exception('mocked call not working') if os.path.exists(dirname): @@ -1551,7 +1784,7 @@ class TestRmtree2(unittest.TestCase): raise e return os_listdir(*a, **kw) with mock.patch('os.listdir', new=my_listdir): - koji.util.rmtree(dirname) + koji.util._rmtree_nofork(dirname) if not mock_data['removed']: raise Exception('mocked call not working') if os.path.exists(dirname): @@ -1576,10 +1809,171 @@ class TestRmtree2(unittest.TestCase): return ret # does not contain extra_file with mock.patch('os.listdir', new=my_listdir): with self.assertRaises(OSError): - koji.util.rmtree(dirname) + koji.util._rmtree_nofork(dirname) if not mock_data.get('ping'): raise Exception('mocked call not working') + def test_rmtree_threading(self): + # multiple complex trees to be deleted in parallel threads + dirs = [] + for n in range(10): + dirname = '%s/some-dir-%s/' % (self.tempdir, n) + dirs.append(dirname) + for i in range(8): + for j in range(8): + for k in range(8): + os.makedirs('%s/a/%s/c/d/%s/e/f/%s/g/h' % (dirname, i, j, k)) + + sync = threading.Event() + def do_rmtree(dirname): + sync.wait() + koji.util.rmtree(dirname) + + threads = [] + for d in dirs: + thread = threading.Thread(target=do_rmtree, args=(d,)) + thread.start() + threads.append(thread) + sync.set() + for thread in threads: + thread.join() + + for dirname in dirs: + if os.path.exists(dirname): + raise Exception('test directory not removed') + + def test_rmtree_race_thread(self): + # parallel threads deleting the same complex tree + dirname = '%s/some-dir/' % (self.tempdir) + for i in range(8): + for j in range(8): + for k in range(8): + os.makedirs('%s/a/%s/c/d/%s/e/f/%s/g/h' % (dirname, i, j, k)) + + sync = threading.Event() + def do_rmtree(dirname): + sync.wait() + koji.util.rmtree(dirname) + + threads = [] + for n in range(3): + thread = threading.Thread(target=do_rmtree, args=(dirname,)) + thread.start() + threads.append(thread) + sync.set() + for thread in threads: + thread.join() + + if os.path.exists(dirname): + raise Exception('test directory not removed') + + def test_rmtree_race_process(self): + # parallel threads deleting the same complex tree + dirname = '%s/some-dir/' % (self.tempdir) + for i in range(8): + for j in range(8): + for k in range(8): + os.makedirs('%s/a/%s/c/d/%s/e/f/%s/g/h' % (dirname, i, j, k)) + + sync = multiprocessing.Event() + def do_rmtree(dirname): + sync.wait() + koji.util.rmtree(dirname) + + procs = [] + for n in range(3): + proc = multiprocessing.Process(target=do_rmtree, args=(dirname,)) + proc.start() + procs.append(proc) + sync.set() + for proc in procs: + proc.join() + + if os.path.exists(dirname): + raise Exception('test directory not removed') + + def test_rmtree_deep_subdir(self): + # create a deep subdir + dirname = '%s/some-dir/' % (self.tempdir) + MAX_PATH = os.pathconf(dirname, 'PC_PATH_MAX') + subname = "deep_path_directory_%05i_______________________________________________________" + limit = MAX_PATH // (len(subname % 123) + 1) + # two segments each 2/3 of the limit, so each below, but together above + seglen = limit * 2 // 3 + segment = '/'.join([subname % n for n in range(seglen)]) + path1 = os.path.join(dirname, segment) + os.makedirs(path1) + cwd = os.getcwd() + os.chdir(path1) + os.makedirs(segment) + os.chdir(cwd) + + koji.util.rmtree(dirname) + + if os.path.exists(dirname): + raise Exception('test directory not removed') + + +class TestProxyLogger(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_proxy_logger(self): + logfile = self.tempdir + '/log.jsonl' + with koji.util.SimpleProxyLogger(logfile) as proxy: + proxy.info('hello world') + proxy.warning('hmm -- %s', ['data']) + proxy.error('We have a problem -- %r', {'a': 1}) + proxy.debug('yadayadayada') + + logger = mock.MagicMock() + koji.util.SimpleProxyLogger.send(logfile, logger) + logger.log.assert_has_calls([ + call(20, 'hello world'), + call(30, 'hmm -- %s', ['data']), + call(40, 'We have a problem -- %r', {'a': 1}), + call(10, 'yadayadayada')]) + + def test_proxy_logger_bad_data(self): + logfile = self.tempdir + '/log.jsonl' + with koji.util.SimpleProxyLogger(logfile) as proxy: + # non-json-encodable + proxy.info('bad - %s', Exception()) + logger = mock.MagicMock() + koji.util.SimpleProxyLogger.send(logfile, logger) + logger.log.assert_called_once() + self.assertEqual(logger.log.mock_calls[0].args[0], logging.ERROR) + if not logger.log.mock_calls[0].args[1].startswith('Unable to log'): + raise Exception('Wrong error message') + + def test_proxy_logger_bad_line(self): + logfile = self.tempdir + '/log.jsonl' + with open(logfile, 'wt') as fo: + fo.write('INVALID_JSON()') + logger = mock.MagicMock() + koji.util.SimpleProxyLogger.send(logfile, logger) + logger.log.assert_called_once() + self.assertEqual(logger.log.mock_calls[0].args[0], logging.ERROR) + if not logger.log.mock_calls[0].args[1].startswith('Bad log data: '): + raise Exception('Wrong error message') + + def test_proxy_logger_repr_fail(self): + class BadValue: + def __repr__(self): + raise ValueError('no') + strfail = BadValue() + + logfile = self.tempdir + '/log.jsonl' + with koji.util.SimpleProxyLogger(logfile) as proxy: + proxy.info('bad - %s', strfail) + logger = mock.MagicMock() + koji.util.SimpleProxyLogger.send(logfile, logger) + logger.log.assert_called_once_with(logging.ERROR, 'Invalid log data') + class TestMoveAndSymlink(unittest.TestCase): @mock.patch('koji.ensuredir') diff --git a/util/kojira b/util/kojira index 3032bea..1a258d7 100755 --- a/util/kojira +++ b/util/kojira @@ -285,53 +285,6 @@ class ManagedRepo(object): return self.state == koji.REPO_PROBLEM -class LoggingLockManager(object): - """A context manager that acquires all the logging locks""" - - # This lock business is a workaround for https://pagure.io/koji/issue/2714 - - def __enter__(self): - self.acquire_locks() - return self - - def __exit__(self, exc_type, exc_val, traceback): - # we want to free the locks regardless of what happend inside the with statement - self.release_locks() - return False # don't eat exceptions - - def acquire_locks(self): - # init - self.locks = [] - self.module_lock = False - toplogger = logging.getLogger('koji') - - # module level lock - if hasattr(logging, '_acquireLock'): - logging._acquireLock() - self.module_lock = True - - # also each handler can have its own lock - # in kojira, the we should only have handlers attached to the koji logger - self.locks = [h.lock for h in toplogger.handlers if h.lock] - for lock in self.locks: - lock.acquire() - - def release_locks(self): - # Only parent process should have locked locks in 3.9+, child ones will - # be reinitilized to free state - # In older pythons, state could be random (and no module_lock) - if self.module_lock: - try: - logging._releaseLock() - except RuntimeError: - pass - for lock in self.locks: - try: - lock.release() - except RuntimeError: - pass - - class RepoManager(object): def __init__(self, options, session): @@ -363,8 +316,9 @@ class RepoManager(object): len(self.repos), len(self.delete_pids)) for tag_id, task_id in self.tasks.items(): self.logger.debug("Tracking task %s for tag %s", task_id, tag_id) - for pid, desc in self.delete_pids.items(): - self.logger.debug("Delete job %s: %r", pid, desc) + for pid in self.delete_pids: + path = self.delete_pids[pid][0] + self.logger.debug("Delete job %s: %r", pid, path) def rmtree(self, path): """Spawn (or queue) and rmtree job""" @@ -375,18 +329,23 @@ class RepoManager(object): def checkQueue(self): finished = [pid for pid in self.delete_pids if self.waitPid(pid)] for pid in finished: - path = self.delete_pids[pid] - self.logger.info("Completed rmtree job for %s", path) + path, check_func = self.delete_pids[pid] del self.delete_pids[pid] + try: + check_func() + except Exception as e: + self.logger.error("Failed rmtree job for %s: %s", path, e) + continue + self.logger.info("Completed rmtree job for %s", path) while self.delete_queue and len(self.delete_pids) <= self.options.max_delete_processes: path = self.delete_queue.pop(0) - pid = self._rmtree(path) + pid, check_func = rmtree(path, background=True) # koji.util.rmtree self.logger.info("Started rmtree (pid %i) for %s", pid, path) - self.delete_pids[pid] = path + self.delete_pids[pid] = (path, check_func) def waitPid(self, pid): # XXX - can we unify with TaskManager? - prefix = "pid %i (%s)" % (pid, self.delete_pids.get(pid)) + prefix = "pid %i (%s)" % (pid, self.delete_pids.get(pid)[0]) try: (childpid, status) = os.waitpid(pid, os.WNOHANG) except OSError as e: @@ -401,24 +360,6 @@ class RepoManager(object): return True return False - def _rmtree(self, path): - with LoggingLockManager(): - pid = os.fork() - if pid: - return pid - # no return - try: - status = 1 - self.session._forget() - try: - rmtree(path) - status = 0 - except BaseException: - logger.error(''.join(traceback.format_exception(*sys.exc_info()))) - logging.shutdown() - finally: - os._exit(status) - def killChildren(self): # XXX - unify with TaskManager? sig = signal.SIGTERM