From 7e92bf9d6d2ad6406220490280125fe290a503a4 Mon Sep 17 00:00:00 2001 From: Marc <7050295+marcleblanc2@users.noreply.github.com> Date: Tue, 12 Mar 2024 02:12:28 +0300 Subject: [PATCH] Subversion batch commit processing (#23) * Added batch commit processing to SVN repo conversion * Improved error handling * Truncated long outputs * Switching customer environment to Docker images with "main" branch tag instead of "latest" --- repo-converter/build/run.py | 236 +++++++++++++++++++---------- repo-converter/docker-compose.yaml | 2 +- 2 files changed, 159 insertions(+), 79 deletions(-) diff --git a/repo-converter/build/run.py b/repo-converter/build/run.py index 4da7eb6..991e4b7 100644 --- a/repo-converter/build/run.py +++ b/repo-converter/build/run.py @@ -89,6 +89,7 @@ import signal # https://docs.python.org/3/library/signal.html import subprocess # https://docs.python.org/3/library/subprocess.html import sys # https://docs.python.org/3/library/sys.html +import textwrap # https://docs.python.org/3/library/textwrap.html import time # https://docs.python.org/3/library/time.html # Third party libraries # psutil requires adding gcc to the Docker image build, which adds about 4 minutes to the build time, and doubled the size of the image @@ -98,6 +99,7 @@ # Global variables +child_processes_list_dict = {} environment_variables_dict = {} repos_dict = {} script_name = os.path.basename(__file__) @@ -106,17 +108,13 @@ def register_signal_handler(): - logging.debug(f"Registering signal handler") - try: signal.signal(signal.SIGINT, signal_handler) except Exception as exception: - logging.error(f"Registering signal handler failed: {exception}") - - logging.debug(f"Registering signal handler succeeded") + logging.error(f"Registering signal handler failed with exception: {type(exception)}, {exception.args}, {exception}") def signal_handler(incoming_signal, frame): @@ -195,9 +193,9 @@ def parse_repos_to_convert_file_into_repos_dict(): logging.error(f"repos-to-convert.yaml file not found at {environment_variables_dict['REPOS_TO_CONVERT']}") sys.exit(1) - except (AttributeError, yaml.scanner.ScannerError) as e: + except (AttributeError, yaml.scanner.ScannerError) as exception: - logging.error(f"Invalid YAML file format in {environment_variables_dict['REPOS_TO_CONVERT']}, please check the structure matches the format in the README.md. Exception: {type(e)}, {e.args}, {e}") + logging.error(f"Invalid YAML file format in {environment_variables_dict['REPOS_TO_CONVERT']}, please check the structure matches the format in the README.md. 
Exception: {type(exception)}, {exception.args}, {exception}") sys.exit(2) @@ -239,24 +237,27 @@ def clone_svn_repos(): repo_path = str(environment_variables_dict['SRC_SERVE_ROOT']+"/"+code_host_name+"/"+git_org_name+"/"+git_repo_name) ## Define common command args - arg_svn_non_interactive = [ "--non-interactive" ] # Do not prompt, just fail if the command doesn't work, only used for direct `svn` command - arg_svn_username = [ "--username", username ] - arg_svn_password = [ "--password", password ] # Only used for direct `svn` command - # arg_svn_echo_password = [ "echo", f"\"{password}\"", "|" ] # Used for git svn commands # Breaks getting the correct process exit code + arg_svn_non_interactive = [ "--non-interactive" ] # Do not prompt, just fail if the command doesn't work, only used for direct `svn` command + arg_svn_username = [ "--username", username ] + arg_svn_password = [ "--password", password ] # Only used for direct `svn` commands arg_svn_echo_password = None - arg_svn_repo_code_root = [ svn_repo_code_root ] - arg_git_cfg = [ "git", "-C", repo_path, "config" ] - arg_git_svn = [ "git", "-C", repo_path, "svn" ] + arg_svn_repo_code_root = [ svn_repo_code_root ] + arg_git = [ "git", "-C", repo_path ] + arg_git_cfg = arg_git + [ "config" ] + arg_git_svn = arg_git + [ "svn" ] + arg_batch_end_revision = [ "repo-converter.batch-end-revision" ] ## Define commands cmd_run_svn_info = [ "svn", "info" ] + arg_svn_repo_code_root + arg_svn_non_interactive - cmd_run_svn_log = [ "svn", "log", "--xml" ] + arg_svn_repo_code_root + arg_svn_non_interactive + cmd_run_svn_log = [ "svn", "log", "--xml", "--with-no-revprops" ] + arg_svn_repo_code_root + arg_svn_non_interactive cmd_cfg_git_default_branch = arg_git_cfg + [ "--global", "init.defaultBranch", git_default_branch ] # Possibility of collisions if multiple of these are run overlapping, make sure it's quick between reading and using this cmd_run_git_svn_init = arg_git_svn + [ "init" ] + arg_svn_repo_code_root cmd_cfg_git_bare_clone = arg_git_cfg + [ "core.bare", "true" ] cmd_cfg_git_authors_file = arg_git_cfg + [ "svn.authorsfile", authors_file_path ] cmd_cfg_git_authors_prog = arg_git_cfg + [ "svn.authorsProg", authors_prog_path ] cmd_run_git_svn_fetch = arg_git_svn + [ "fetch" ] + cmd_cfg_git_get_batch_end_revision = arg_git_cfg + [ "--get" ] + arg_batch_end_revision + cmd_cfg_git_set_batch_end_revision = arg_git_cfg + arg_batch_end_revision ## Modify commands based on config parameters if username: @@ -270,9 +271,6 @@ def clone_svn_repos(): cmd_run_svn_info += arg_svn_password cmd_run_svn_log += arg_svn_password - # Used to check if this command is already running in another process, without the password - cmd_run_git_svn_fetch_without_password = ' '.join(cmd_run_git_svn_fetch) - # States # Create: # State: @@ -305,23 +303,32 @@ def clone_svn_repos(): ## Check if we're in the Running state # Check if a fetch process is currently running for this repo + cmd_run_git_svn_fetch_string = ' '.join(cmd_run_git_svn_fetch) + try: ps_command = ["ps", "-e", "--format", "%a"] - completed_ps_command = subprocess_run(ps_command) + running_processes = subprocess_run(ps_command) - if cmd_run_git_svn_fetch_without_password in completed_ps_command: + logging.debug(f"running_processes: {running_processes}") + running_processes_string = " ".join(running_processes) - logging.info(f"Fetching process already running for {repo_key}") + if cmd_run_git_svn_fetch_string in running_processes_string: + + logging.info(f"Fetching process {cmd_run_git_svn_fetch_string} 
already running for {repo_key}") continue - except Exception as e: - logging.warning(f"Failed to check if {cmd_run_git_svn_fetch_without_password} is already running, will try to start it. Exception: {type(e)}, {e.args}, {e}") + else: + logging.info(f"Fetching process {cmd_run_git_svn_fetch_string} not found for {repo_key}, will start it") + + except Exception as exception: + logging.warning(f"Failed to check if {cmd_run_git_svn_fetch_string} is already running, will try to start it. Exception: {type(exception)}, {exception.args}, {exception}") ## Check if we're in the Update state # Check if the git repo already exists and has the correct settings in the config file + # TODO: Replace this with a git config --get svn.url repo_git_config_file_path = repo_path + "/.git/config" if os.path.exists(repo_git_config_file_path): @@ -358,37 +365,20 @@ def clone_svn_repos(): # Capture the output so we know the max revision in this repo's history svn_info = subprocess_run(cmd_run_svn_info, password, arg_svn_echo_password) - if repo_state == "create": - - logging.info(f"Didn't find a repo on disk for {repo_key}, creating it") - - # # If the user didn't provide a batch size, try and determine one from repo stats - # if not fetch_batch_size and not repo_total_revisions: - - # # Get the rev number for the last rev this repo was changed from the svn info output - # # Default to not specifying a --revision - # if "Last Changed Rev:" in svn_info: - - # last_changed_rev = int(svn_info.split("Last Changed Rev: ")[1].split(" ")[0]) - # logging.debug(f"Last Changed Rev for {repo_key}: {last_changed_rev}") + # TODO: Check if the latest rev number from git log -1 is the same as the max rev number from svn info + # If yes, we're up to date, continue to the next repo, instead of forking the git svn process to do the same check - # cmd_run_svn_log += ["--revision", "BASE:"+str(last_changed_rev)] + # Get the latest revision number for this repo + # if "Last Changed Rev:" in svn_info: - # # Get the number of revisions in this repo's history, to know how many batches to fetch in the initial clone - # # Note this could be a slow process - # svn_log = subprocess_run(cmd_run_svn_log, password) + # last_changed_rev = int(svn_info.split("Last Changed Rev: ")[1].split(" ")[0]) + # logging.debug(f"Last Changed Rev for {repo_key}: {last_changed_rev}") - # repo_rev_count = int(svn_info.split("Revision: ")[1].split(" ")[0]) - # if repo_rev_count < 10000: - # fetch_batch_size = last_changed_rev - # else: - # fetch_batch_size = f"BASE:{last_changed_rev}" - # # TODO: Find a way to set batch size for initial fetch vs update fetches - # if fetch_batch_size and not fetch_batch_size == "HEAD": - # cmd_run_git_svn_fetch += ["--revision", fetch_batch_size] + if repo_state == "create": + logging.info(f"Didn't find a repo on disk for {repo_key}, creating it") # Create the repo path if it doesn't exist if not os.path.exists(repo_path): @@ -418,7 +408,6 @@ def clone_svn_repos(): subprocess_run(cmd_cfg_git_bare_clone) - ## Back to steps we do for both Create and Update states, so users can update the below parameters without having to restart the clone from scratch # Configure the authors file, if provided @@ -443,12 +432,71 @@ def clone_svn_repos(): else: logging.warning(f".gitignore file not found at {git_ignore_file_path}, skipping") - # Start a fetch - logging.info(f"Fetching SVN repo {repo_key} with {cmd_run_git_svn_fetch_without_password}") - # Retaining a reference to this process prevents the process from getting cleaned up 
automagically - # But, removing all references to it doesn't seem to help it - # So, need to keep a list and clean it up manually + # If the user has configured a batch size + if fetch_batch_size: + + try: + + batch_start_revision = None + batch_end_revision = None + + # Get the revision number to start with + if repo_state == "update": + + # Try to retrieve repo-converter.batch-end-revision from git config + # previous_batch_end_revision = git config --get repo-converter.batch-end-revision + # Need to fail gracefully + previous_batch_end_revision = subprocess_run(cmd_cfg_git_get_batch_end_revision) + logging.debug(f"previous_batch_end_revision: {previous_batch_end_revision}") + + if previous_batch_end_revision: + + batch_start_revision = int(" ".join(previous_batch_end_revision)) + 1 + logging.debug(f"batch_start_revision: {batch_start_revision}") + + if repo_state == "create" or batch_start_revision == None: + + # If this is a new repo, get the first changed revision number for this repo from the svn server log + cmd_run_svn_log_batch_start_revision = cmd_run_svn_log + ["--limit", "1", "--revision", "1:HEAD"] + svn_log_batch_start_revision = subprocess_run(cmd_run_svn_log_batch_start_revision, password, arg_svn_echo_password) + batch_start_revision = int(" ".join(svn_log_batch_start_revision).split("revision=\"")[1].split("\"")[0]) + + if batch_start_revision: + + # Get the batch size'th revision number for the rev to end this batch range + cmd_run_svn_log_batch_end_revision = cmd_run_svn_log + ["--limit", str(fetch_batch_size), "--revision", f"{batch_start_revision}:HEAD"] + cmd_run_svn_log_batch_end_revision_output = subprocess_run(cmd_run_svn_log_batch_end_revision, password, arg_svn_echo_password) + + try: + + # While we're at it, update the batch starting rev to the first real rev number after the previous end rev +1 + batch_start_revision = int(" ".join(cmd_run_svn_log_batch_end_revision_output).split("revision=\"")[1].split("\"")[0]) + + # Reverse the output so we can get the last revision number + cmd_run_svn_log_batch_end_revision_output.reverse() + batch_end_revision = int(" ".join(cmd_run_svn_log_batch_end_revision_output).split("revision=\"")[1].split("\"")[0]) + + except IndexError as exception: + logging.debug(f"Repo {repo_key} likely already up to date; running the fetch without the batch size limit.") + + if batch_start_revision and batch_end_revision: + + # If we were successful getting both starting and ending revision numbers, then use them + cmd_run_git_svn_fetch += ["--revision", f"{batch_start_revision}:{batch_end_revision}"] + + # Store the ending revision number, hoping that this batch completes successfully, as these revs won't be retried + cmd_cfg_git_set_batch_end_revision.append(str(batch_end_revision)) + subprocess_run(cmd_cfg_git_set_batch_end_revision) + + except Exception as exception: + + # Log a warning if this fails, and run the fetch without the --revision arg + logging.warning(f"Failed to get batch start or end revision for repo {repo_key} and batch size {fetch_batch_size}; running the fetch without the batch size limit; exception: {type(exception)}, {exception.args}, {exception}") + + # Start the fetch + cmd_run_git_svn_fetch_string_may_have_batch_range = ' '.join(cmd_run_git_svn_fetch) + logging.info(f"Fetching SVN repo {repo_key} with {cmd_run_git_svn_fetch_string_may_have_batch_range}") multiprocessing.Process(target=subprocess_run, name=f"subprocess_run({cmd_run_git_svn_fetch})", args=(cmd_run_git_svn_fetch, password, password)).start() @@ -485,8 
+533,7 @@ def subprocess_run(args, password=None, echo_password=None): try: - logging.debug(f"Starting subprocess: {args_without_password_string}") - + # Create the process object and start it subprocess_to_run = psutil.Popen( args = args, stdin = subprocess.PIPE, @@ -495,26 +542,60 @@ def subprocess_run(args, password=None, echo_password=None): text = True, ) + logging.debug(f"pid {subprocess_to_run.pid} started: {args_without_password_string}; {subprocess_to_run}") + + # Add it to the dict so we can use the commands to identify the PID later + child_processes_list_dict[subprocess_to_run.pid] = args_without_password_string + # If password is provided to this function, feed it into the subprocess' stdin pipe + # communicate() also waits for the process to finish if echo_password: subprocess_stdout_and_stderr = subprocess_to_run.communicate(password) + else: subprocess_stdout_and_stderr = subprocess_to_run.communicate() - subprocess_stdout_and_stderr_without_password = redact_password_from_list(subprocess_stdout_and_stderr[0].splitlines(), password) + # By now, the subprocess should be finished + process_clock_time_seconds = time.time() - subprocess_to_run.create_time() + process_clock_time_formatted = time.strftime("%H:%M:%S", time.localtime(process_clock_time_seconds)) + + # Capture the output in a list + subprocess_stdout_and_stderr_list = subprocess_stdout_and_stderr[0].splitlines() + + # If the output is longer than max_output_characters, it's probably just a list of all files converted, so truncate it + max_output_characters = 10000 + max_output_line_length = 1000 + max_output_lines = 20 + + if len(str(subprocess_stdout_and_stderr_list)) > max_output_characters: + + # If the output list is longer than max_output_lines lines, truncate it + subprocess_stdout_and_stderr_list = subprocess_stdout_and_stderr_list[-max_output_lines:] + subprocess_stdout_and_stderr_list.append(f"...OUTPUT TRUNCATED TO {max_output_lines} LINES") + + # Truncate really long lines + for i in range(len(subprocess_stdout_and_stderr_list)): + if len(subprocess_stdout_and_stderr_list[i]) > max_output_line_length: + subprocess_stdout_and_stderr_list[i] = textwrap.shorten(subprocess_stdout_and_stderr_list[i], width=max_output_line_length, placeholder=f"...LINE TRUNCATED TO {max_output_line_length} CHARACTERS") + + # Redact passwords for logging + subprocess_stdout_and_stderr_without_password = redact_password_from_list(subprocess_stdout_and_stderr_list, password) + + # If the process exited successfully if subprocess_to_run.returncode == 0: subprocess_stdout_without_password = subprocess_stdout_and_stderr_without_password - logging.debug(f"Subprocess {subprocess_to_run} succeeded with stdout: {subprocess_stdout_without_password}") + logging.debug(f"pid {subprocess_to_run.pid} succeeded after {process_clock_time_formatted}: {args_without_password_string}; {subprocess_to_run} with stdout: {subprocess_stdout_without_password}") else: subprocess_stderr_without_password = subprocess_stdout_and_stderr_without_password - logging.error(f"Subprocess {subprocess_to_run} failed with stderr: {subprocess_stderr_without_password}") + logging.error(f"pid {subprocess_to_run.pid} failed after {process_clock_time_formatted}: {args_without_password_string}; {subprocess_to_run} with stderr: {subprocess_stderr_without_password}") + + except subprocess.CalledProcessError as exception: - except subprocess.CalledProcessError as error: - logging.error(f"Subprocess {subprocess_to_run} raised exception: {error}") + logging.error(f"pid 
{subprocess_to_run.pid} raised exception after {process_clock_time_formatted}: {args_without_password_string}; {subprocess_to_run} with exception: {type(exception)}, {exception.args}, {exception}") if subprocess_stderr_without_password: @@ -540,11 +621,11 @@ def subprocess_run(args, password=None, echo_password=None): if subprocess_run(["rm", "-f", lock_file_path]): logging.info(f"Successfully deleted {lock_file_path}") - except subprocess.CalledProcessError as error: - logging.error(f"Failed to rm -f lockfile at {lock_file_path} with error: {error}") + except subprocess.CalledProcessError as exception: + logging.error(f"Failed to rm -f lockfile at {lock_file_path} with exception: {type(exception)}, {exception.args}, {exception}") - except ValueError as error: - logging.error(f"Failed to find git execution path in command args while trying to delete {lock_file_path} with error: {error}") + except ValueError as exception: + logging.error(f"Failed to find git execution path in command args while trying to delete {lock_file_path} with exception: {type(exception)}, {exception.args}, {exception}") return subprocess_stdout_without_password @@ -575,14 +656,7 @@ def clone_git_repos(): def status_update_and_cleanup_zombie_processes(): - - # subprocess_to_run = psutil.Popen( - # args = args, - # stdin = subprocess.PIPE, - # stdout = subprocess.PIPE, - # stderr = subprocess.STDOUT, - # text = True, - # ) + # The current approach should return the same list of processes as just ps -ef, but may be more flexible # Get the current process ID, should be 1 in Docker os_this_pid = os.getpid() @@ -621,28 +695,34 @@ def status_update_and_cleanup_zombie_processes(): try: + # Check if this PID is one we've stored a command for + process_command = child_processes_list_dict.get(process_pid_to_wait_for, "") + # Create an instance of a Process object for the PID number # Raises psutil.NoSuchProcess if the PID has already finished process_to_wait_for = psutil.Process(process_pid_to_wait_for) + process_clock_time_seconds = time.time() - process_to_wait_for.create_time() + process_clock_time_formatted = time.strftime("%H:%M:%S", time.localtime(process_clock_time_seconds)) + # This rarely fires, ex. 
if cleaning up processes at the beginning of a script execution and the process finished during the interval if process_to_wait_for.status() == psutil.STATUS_ZOMBIE: - logging.debug(f"Subprocess {process_to_wait_for} is a zombie") + logging.debug(f"pid {process_pid_to_wait_for} is a zombie after {process_clock_time_formatted}: {process_command} {process_to_wait_for}") # Wait a short period, and capture the return status # Raises psutil.TimeoutExpired if the process is busy executing longer than the wait time return_status = process_to_wait_for.wait(0.1) - logging.debug(f"Subprocess {process_pid_to_wait_for} finished now with return status: {return_status}") + logging.debug(f"pid {process_pid_to_wait_for} finished now after {process_clock_time_formatted}: {process_command} {process_to_wait_for} with return status: {return_status}") except psutil.NoSuchProcess as exception: - logging.debug(f"Subprocess {process_pid_to_wait_for} already finished") + logging.debug(f"pid {process_pid_to_wait_for} already finished after {process_clock_time_formatted}: {process_command} {process_to_wait_for}") except psutil.TimeoutExpired as exception: - logging.debug(f"Subprocess {process_pid_to_wait_for} is still running at this moment") + logging.debug(f"pid {process_pid_to_wait_for} is still running after {process_clock_time_formatted}: {process_command} {process_to_wait_for}") except Exception as exception: - logging.debug(f"Subprocess {process_pid_to_wait_for} raised exception while waiting: {exception}") + logging.debug(f"pid {process_pid_to_wait_for} raised exception while waiting after {process_clock_time_formatted}: {process_command} {process_to_wait_for} exception: {type(exception)}, {exception.args}, {exception}") def main(): diff --git a/repo-converter/docker-compose.yaml b/repo-converter/docker-compose.yaml index 3fb0a7c..b347883 100644 --- a/repo-converter/docker-compose.yaml +++ b/repo-converter/docker-compose.yaml @@ -9,7 +9,7 @@ services: repo-converter: container_name: repo-converter - image: ghcr.io/sourcegraph/implementation-bridge-repo-converter:latest + image: ghcr.io/sourcegraph/implementation-bridge-repo-converter:main volumes: - ../config/repos-to-convert.yaml:/sourcegraph/repos-to-convert.yaml:ro - ../config/toprc:/root/.config/procps/toprc
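
The core change in this patch is the batch-range logic: before each `git svn fetch`, run.py reads the last completed batch's end revision from the repo's git config key repo-converter.batch-end-revision, asks the SVN server (via `svn log --xml`) for the next batch-size worth of revision numbers, fetches only that `--revision start:end` range, and records the new end revision so the next cycle resumes after it. The sketch below is a minimal, standalone illustration of that flow, not the patch's code: REPO_PATH, SVN_URL, and BATCH_SIZE are placeholder values, the helper names (run, svn_log_revisions, next_batch_range) are mine, credentials and the --non-interactive/--username/--password plumbing from run.py are omitted, and the XML is parsed with ElementTree rather than the revision="..." string slicing the patch uses. The idea is the same: take the first and last revision of the batch, fetch that range, then persist the end revision.

import subprocess
import xml.etree.ElementTree as ET

REPO_PATH = "/sourcegraph/src-serve-root/svn.example.com/org/repo"  # placeholder path
SVN_URL = "https://svn.example.com/svn/repo/trunk"                  # placeholder URL
BATCH_SIZE = 100                                                    # placeholder batch size

def run(args):
    # Run a command and return its stdout; errors are tolerated so callers can treat "" as "not found"
    return subprocess.run(args, capture_output=True, text=True, check=False).stdout

def svn_log_revisions(start, limit):
    # Revision numbers of up to `limit` commits, oldest first, starting at `start`
    xml_text = run(["svn", "log", "--xml", "--non-interactive",
                    "--limit", str(limit), "--revision", f"{start}:HEAD", SVN_URL])
    if not xml_text.strip():
        return []
    return [int(entry.get("revision")) for entry in ET.fromstring(xml_text).findall("logentry")]

def next_batch_range():
    # Resume after the last completed batch if one was recorded in the repo's git config
    recorded = run(["git", "-C", REPO_PATH, "config", "--get",
                    "repo-converter.batch-end-revision"]).strip()
    start = int(recorded) + 1 if recorded else 1
    revisions = svn_log_revisions(start, BATCH_SIZE)
    if not revisions:
        return None  # already up to date; the caller can fall back to an unbounded fetch
    return revisions[0], revisions[-1]

batch = next_batch_range()
if batch:
    start_rev, end_rev = batch
    run(["git", "-C", REPO_PATH, "svn", "fetch", "--revision", f"{start_rev}:{end_rev}"])
    # Record the end of this batch so the next scheduled run picks up where it left off
    run(["git", "-C", REPO_PATH, "config", "repo-converter.batch-end-revision", str(end_rev)])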
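
The "Truncated long outputs" bullet corresponds to the new trimming in subprocess_run(): when the combined stdout/stderr of a child process exceeds roughly 10,000 characters (typically just the list of files touched by each converted revision), only the last 20 lines are kept, and any line longer than 1,000 characters is shortened with textwrap.shorten before the password redaction and logging steps. A minimal sketch of that approach follows; the thresholds mirror the patch, but the helper name trim_output is mine and not a function in run.py.

import textwrap

MAX_OUTPUT_CHARACTERS = 10000
MAX_OUTPUT_LINES = 20
MAX_LINE_LENGTH = 1000

def trim_output(lines):
    # Keep only the tail of very long output, then clamp any unusually long lines before logging
    if len(str(lines)) > MAX_OUTPUT_CHARACTERS:
        lines = lines[-MAX_OUTPUT_LINES:]
        lines.append(f"...OUTPUT TRUNCATED TO {MAX_OUTPUT_LINES} LINES")
        lines = [
            textwrap.shorten(line, width=MAX_LINE_LENGTH,
                             placeholder=f"...LINE TRUNCATED TO {MAX_LINE_LENGTH} CHARACTERS")
            if len(line) > MAX_LINE_LENGTH else line
            for line in lines
        ]
    return lines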
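
The zombie-cleanup changes build on the new child_processes_list_dict, which maps each spawned PID to its redacted command string so that status_update_and_cleanup_zombie_processes() can log something more useful than a bare PID, along with the child's elapsed wall-clock time. The following is a minimal, standalone sketch of that reaping loop under a couple of stated assumptions: child_commands stands in for the patch's child_processes_list_dict, the elapsed time is formatted with time.gmtime (a duration, rather than the localtime call the patch uses), and only the psutil calls that run.py itself relies on (children(), create_time(), status(), wait()) appear here.

import logging
import time
import psutil

child_commands = {}  # pid -> redacted command string, recorded when each subprocess is started

def reap_children():
    for child in psutil.Process().children(recursive=True):
        command = child_commands.get(child.pid, "")
        try:
            elapsed = time.strftime("%H:%M:%S", time.gmtime(time.time() - child.create_time()))
            if child.status() == psutil.STATUS_ZOMBIE:
                # wait() reaps the zombie; the short timeout keeps the loop from blocking on live children
                status = child.wait(0.1)
                logging.debug(f"pid {child.pid} reaped after {elapsed}: {command} with return status {status}")
        except psutil.NoSuchProcess:
            logging.debug(f"pid {child.pid} already finished: {command}")
        except psutil.TimeoutExpired:
            logging.debug(f"pid {child.pid} still running after {elapsed}: {command}")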