From 77ca1f2c23052b136d9c5a35629604eb45d6d25c Mon Sep 17 00:00:00 2001 From: Tristan Cacqueray Date: Sep 06 2019 14:50:41 +0000 Subject: Refactor exec_pod procedure to cleanly handle exceptions --- diff --git a/k1s/api.py b/k1s/api.py index 0aabf1e..7974fca 100644 --- a/k1s/api.py +++ b/k1s/api.py @@ -19,7 +19,7 @@ import os import select import subprocess import time -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union import cherrypy @@ -51,6 +51,71 @@ def delete_pod(name: str) -> None: subprocess.Popen(Podman + ["kill", "k1s-" + name]) +def exec_pod( + name: str, stdin: bool, + args: Union[str, List[str]], + spdy: SPDYHandler) -> int: + exec_command = ["exec"] + if stdin: + exec_command.append("-i") + exec_command.append("k1s-" + name) + if isinstance(args, list): + exec_command.extend(args) + else: + exec_command.append(args) + + log.debug("Running %s", args) + proc = subprocess.Popen( + Podman + exec_command, + bufsize=0, start_new_session=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + stdin=subprocess.PIPE if stdin else None) + rc = -1 + try: + for f in (proc.stdout, proc.stderr, spdy.sock): + # Make proc output non blocking + fd = f.fileno() + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + idle_time = time.monotonic() + while True: + process_active = False + r, w, x = select.select( + [proc.stdout, proc.stderr, spdy.sock], [], [], 1) + # print("Select yield", r) + if time.monotonic() - idle_time > 3600: + log.error("ERROR: process stalled") + break + for reader in r: + if reader == spdy.sock: + # Assume streamId is always stdin + _, flag, data = spdy.readDataFrame() + if flag == 1: + # This is the end + proc.stdin.close() + else: + proc.stdin.write(data) + proc.stdin.flush() + else: + + idle_time = time.monotonic() + if reader == proc.stdout: + output = "stdout" + else: + output = "stderr" + data = reader.read() + if data: + process_active = True + spdy.sendFrame(spdy.streams[output], data) + if not process_active and proc.poll() is not None: + rc = proc.poll() + break + finally: + proc.terminate() + return rc + + def create_pod(name: str, namespace: str, image: str) -> Pod: log.info("Creating pod %s with %s", name, image) create_args = [ @@ -107,67 +172,19 @@ class ExecHandler(SPDYHandler): self.streams[name] = streamId # print("Got all the streams!", self.streams) - execCommand = ["exec"] - if self.args.get('stdin'): - execCommand.append("-i") - execCommand.append("k1s-" + self.args['pod']) - if isinstance(self.args['command'], list): - execCommand.extend(self.args['command']) - else: - execCommand.append(self.args['command']) - - self.proc = subprocess.Popen( - Podman + execCommand, - bufsize=0, start_new_session=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - stdin=subprocess.PIPE if self.args.get('stdin') else None) - for f in (self.proc.stdout, self.proc.stderr, self.sock): - # Make proc output non blocking - fd = f.fileno() - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - rc = -1 - idle_time = time.monotonic() - while True: - process_active = False - r, w, x = select.select( - [self.proc.stdout, self.proc.stderr, self.sock], [], [], 1) - # print("Select yield", r) - if time.monotonic() - idle_time > 3600: - log.error("ERROR: process stalled") - break - for reader in r: - if reader == self.sock: - # Assume streamId is always stdin - _, flag, data = self.readDataFrame() - if flag == 1: - # This is the end - self.proc.stdin.close() - else: - self.proc.stdin.write(data) - self.proc.stdin.flush() - else: - - idle_time = time.monotonic() - if reader == self.proc.stdout: - output = "stdout" - else: - output = "stderr" - data = reader.read() - if data: - process_active = True - self.sendFrame(self.streams[output], data) - if not process_active and self.proc.poll() is not None: - rc = self.proc.poll() - break + try: + rc = exec_pod( + self.args['pod'], self.args.get('stdin', False), + self.args['command'], self) + except Exception: + log.exception("execution failed") + rc = 1 self.sendFrame(self.streams['error'], json.dumps({ 'kind': 'Status', 'Status': 'Failure' if rc else 'Success', 'code': rc}).encode('ascii')) self.sock.close() - self.proc.terminate() # print(self.addr, "over and out")