[PATCH 1/2] sb/execute: Port the rtemstoolkit performance fixes for python3

chrisj at rtems.org chrisj at rtems.org
Tue Dec 18 04:25:05 UTC 2018


From: Chris Johns <chrisj at rtems.org>

Close #3664.
---
 source-builder/sb/execute.py | 332 +++++++++++++++++++++++++++++------
 1 file changed, 278 insertions(+), 54 deletions(-)

diff --git a/source-builder/sb/execute.py b/source-builder/sb/execute.py
index 12d8114..0c25163 100755
--- a/source-builder/sb/execute.py
+++ b/source-builder/sb/execute.py
@@ -1,6 +1,6 @@
 #
 # RTEMS Tools Project (http://www.rtems.org/)
-# Copyright 2010-2016 Chris Johns (chrisj at rtems.org)
+# Copyright 2010-2017 Chris Johns (chrisj at rtems.org)
 # All rights reserved.
 #
 # This file is part of the RTEMS Tools package in 'rtems-tools'.
@@ -8,7 +8,7 @@
 # Permission to use, copy, modify, and/or distribute this software for any
 # purpose with or without fee is hereby granted, provided that the above
 # copyright notice and this permission notice appear in all copies.
-#
+ #
 # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
@@ -16,6 +16,7 @@
 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+#
 
 #
 # Execute commands or scripts.
@@ -26,15 +27,21 @@
 from __future__ import print_function
 
 import functools
+import io
 import os
 import re
 import sys
 import subprocess
 import threading
+import time
+import traceback
 
 import error
 import log
 
+# Trace exceptions
+trace_threads = False
+
 # Redefine the PIPE from subprocess
 PIPE = subprocess.PIPE
 
@@ -86,75 +93,240 @@ def arg_subst_str(command, subst):
     def add(x, y): return x + ' ' + str(y)
     return functools.reduce(add, cmd, '')
 
-class execute:
-    """Execute commands or scripts. The 'output' is a funtion
-    that handles the output from the process."""
-    def __init__(self, output = None, error_prefix = '', verbose = False):
+class execute(object):
+    """Execute commands or scripts. The 'output' is a funtion that handles the
+    output from the process. The 'input' is a function that blocks and returns
+    data to be written to stdin"""
+    def __init__(self, output = None, input = None, cleanup = None,
+                 error_prefix = '', verbose = False):
+        self.lock = threading.Lock()
         self.output = output
+        self.input = input
+        self.cleanup = cleanup
         self.error_prefix = error_prefix
         self.verbose = verbose
         self.shell_exe = None
         self.shell_commands = False
         self.path = None
         self.environment = None
-
-    def capture(self, proc, timeout = None):
-        """Create 2 threads to read stdout and stderr and send to the
-        output handler. Based on the 'communicate' code in the subprocess
-        module."""
-        def _readthread(fh, out, prefix = ''):
+        self.outputting = False
+        self.timing_out = False
+        self.proc = None
+
+    def capture(self, proc, command = 'pipe', timeout = None):
+        """Create 3 threads to read stdout and stderr and send to the output handler
+        and call an input handler is provided. Based on the 'communicate' code
+        in the subprocess module."""
+        def _writethread(exe, fh, input):
+            """Call the input handler and write it to the stdin. The input handler should
+            block and return None or False if this thread is to exit and True if this
+            is a timeout check."""
+            if trace_threads:
+                print('execute:_writethread: start')
+            encoding = True
+            try:
+                tmp = bytes('temp', sys.stdin.encoding)
+            except:
+                encoding = False
+            try:
+                while True:
+                    if trace_threads:
+                        print('execute:_writethread: call input', input)
+                    lines = input()
+                    if type(lines) == str or type(lines) == bytes:
+                        try:
+                            if encoding:
+                                lines = bytes(lines, sys.stdin.encoding)
+                            fh.write(lines)
+                            fh.flush()
+                        except:
+                            break
+                    if lines == None or \
+                       lines == False or \
+                       (lines == True and fh.closed):
+                        break
+            except:
+                if trace_threads:
+                    print('execute:_writethread: exception')
+                    print(traceback.format_exc())
+                pass
+            try:
+                fh.close()
+            except:
+                pass
+            if trace_threads:
+                print('execute:_writethread: finished')
+
+        def _readthread(exe, fh, out, prefix = ''):
             """Read from a file handle and write to the output handler
             until the file closes."""
-            count = 0
-            while True:
-                line = fh.readline()
-                # str and bytes are the same type in Python2
-                if type(line) is not str and type(line) is bytes:
-                    line = line.decode(sys.stdout.encoding)
-                count += 1
-                if len(line) == 0:
-                    break
+            def _output_line(line, exe, prefix, out, count):
+                #exe.lock.acquire()
+                #exe.outputting = True
+                #exe.lock.release()
                 if out:
                     out(prefix + line)
                 else:
                     log.output(prefix + line)
                     if count > 10:
                         log.flush()
-                        count = 0
 
-        def _timerthread(proc, timer):
-            """Timer thread calls the timer handler if one
-            is present once a second. The user provides a handler
-            and returns False to kill the process or True continue."""
-            while True:
+            if trace_threads:
+                print('execute:_readthread: start')
+            count = 0
+            line = ''
+            try:
+                while True:
+                    #
+                    # The io module file handling return up to the size passed
+                    # in to the read call. The io handle has the default
+                    # buffering size. On any error assume the handle has gone
+                    # and the process is shutting down.
+                    #
+                    try:
+                        data = fh.read1(4096)
+                    except:
+                        data = ''
+                    if len(data) == 0:
+                        if len(line) > 0:
+                            _output_line(line + '\n', exe, prefix, out, count)
+                        break
+                    # str and bytes are the same type in Python2
+                    if type(data) is not str and type(data) is bytes:
+                        data = data.decode(sys.stdout.encoding)
+                    last_ch = data[-1]
+                    sd = (line + data).split('\n')
+                    if last_ch != '\n':
+                        line = sd[-1]
+                    else:
+                        line = ''
+                    sd = sd[:-1]
+                    if len(sd) > 0:
+                        for l in sd:
+                            _output_line(l + '\n', exe, prefix, out, count)
+                            count += 1
+                        if count > 10:
+                            count -= 10
+            except:
+                raise
+                if trace_threads:
+                    print('execute:_readthread: exception')
+                    print(traceback.format_exc())
+                pass
+            try:
+                fh.close()
+            except:
+                pass
+            if len(line):
+                _output_line(line, exe, prefix, out, 100)
+            if trace_threads:
+                print('execute:_readthread: finished')
+
+        def _timerthread(exe, interval, function):
+            """Timer thread is used to timeout a process if no output is
+            produced for the timeout interval."""
+            count = interval
+            while exe.timing_out:
                 time.sleep(1)
-                if not timer(proc):
-                    proc.stdout.close()
-                    proc.stderr.close()
+                if count > 0:
+                    count -= 1
+                exe.lock.acquire()
+                if exe.outputting:
+                    count = interval
+                    exe.outputting = False
+                exe.lock.release()
+                if count == 0:
+                    try:
+                        proc.kill()
+                    except:
+                        pass
+                    else:
+                        function()
+                    break
+
+        name = os.path.basename(command[0])
+
+        stdin_thread = None
+        stdout_thread = None
+        stderr_thread = None
+        timeout_thread = None
 
         if proc.stdout:
             stdout_thread = threading.Thread(target = _readthread,
-                                             args = (proc.stdout,
+                                             name = '_stdout[%s]' % (name),
+                                             args = (self,
+                                                     io.open(proc.stdout.fileno(),
+                                                             mode = 'rb',
+                                                             closefd = False),
                                                      self.output,
                                                      ''))
-            stdout_thread.setDaemon(True)
+            stdout_thread.daemon = True
             stdout_thread.start()
         if proc.stderr:
             stderr_thread = threading.Thread(target = _readthread,
-                                             args = (proc.stderr,
+                                             name = '_stderr[%s]' % (name),
+                                             args = (self,
+                                                     io.open(proc.stderr.fileno(),
+                                                             mode = 'rb',
+                                                             closefd = False),
                                                      self.output,
                                                      self.error_prefix))
-            stderr_thread.setDaemon(True)
+            stderr_thread.daemon = True
             stderr_thread.start()
-        if proc.stdout:
-            stdout_thread.join()
-        if proc.stderr:
-            stderr_thread.join()
-        return proc.wait()
+        if self.input and proc.stdin:
+            stdin_thread = threading.Thread(target = _writethread,
+                                            name = '_stdin[%s]' % (name),
+                                            args = (self,
+                                                    proc.stdin,
+                                                    self.input))
+            stdin_thread.daemon = True
+            stdin_thread.start()
+        if timeout:
+            self.timing_out = True
+            timeout_thread = threading.Thread(target = _timerthread,
+                                              name = '_timeout[%s]' % (name),
+                                              args = (self,
+                                                      timeout[0],
+                                                      timeout[1]))
+            timeout_thread.daemon = True
+            timeout_thread.start()
+        try:
+            self.lock.acquire()
+            try:
+                self.proc = proc
+            except:
+                raise
+            finally:
+                self.lock.release()
+            exitcode = proc.wait()
+        except:
+            proc.kill()
+            raise
+        finally:
+            self.lock.acquire()
+            try:
+                self.proc = None
+            except:
+                raise
+            finally:
+                self.lock.release()
+            if self.cleanup:
+                self.cleanup(proc)
+            if timeout_thread:
+                self.timing_out = False
+                timeout_thread.join(10)
+            if stdin_thread:
+                stdin_thread.join(2)
+            if stdout_thread:
+                stdout_thread.join(2)
+            if stderr_thread:
+                stderr_thread.join(2)
+        return exitcode
 
     def open(self, command, capture = True, shell = False,
              cwd = None, env = None,
-             stdin = None, stdout = None, stderr = None):
+             stdin = None, stdout = None, stderr = None,
+             timeout = None):
         """Open a command with arguments. Provide the arguments as a list or
         a string."""
         if self.verbose:
@@ -166,9 +338,13 @@ class execute:
             if shell:
                 what = 'shell'
             log.output(what + ': ' + s)
+        if self.output is None:
+            raise error.general('capture needs an output handler')
         if shell and self.shell_exe:
             command = arg_list(command)
             command[:0] = self.shell_exe
+        if not stdin and self.input:
+            stdin = subprocess.PIPE
         if not stdout:
             stdout = subprocess.PIPE
         if not stderr:
@@ -191,10 +367,13 @@ class execute:
             proc = subprocess.Popen(command, shell = shell,
                                     cwd = cwd, env = env,
                                     stdin = stdin, stdout = stdout,
-                                    stderr = stderr)
+                                    stderr = stderr,
+                                    close_fds = False)
             if not capture:
                 return (0, proc)
-            exit_code = self.capture(proc)
+            if self.output is None:
+                raise error.general('capture needs an output handler')
+            exit_code = self.capture(proc, command, timeout)
             if self.verbose:
                 log.output('exit: ' + str(exit_code))
         except OSError as ose:
@@ -204,23 +383,26 @@ class execute:
         return (exit_code, proc)
 
     def spawn(self, command, capture = True, cwd = None, env = None,
-              stdin = None, stdout = None, stderr = None):
+              stdin = None, stdout = None, stderr = None,
+              timeout = None):
         """Spawn a command with arguments. Provide the arguments as a list or
         a string."""
         return self.open(command, capture, False, cwd, env,
-                         stdin, stdout, stderr)
+                         stdin, stdout, stderr, timeout)
 
     def shell(self, command, capture = True, cwd = None, env = None,
-              stdin = None, stdout = None, stderr = None):
+              stdin = None, stdout = None, stderr = None,
+              timeout = None):
         """Execute a command within a shell context. The command can contain
         argumments. The shell is specific to the operating system. For example
         it is cmd.exe on Windows XP."""
         return self.open(command, capture, True, cwd, env,
-                         stdin, stdout, stderr)
+                         stdin, stdout, stderr, timeout)
 
     def command(self, command, args = None, capture = True, shell = False,
                 cwd = None, env = None,
-                stdin = None, stdout = None, stderr = None):
+                stdin = None, stdout = None, stderr = None,
+                timeout = None):
         """Run the command with the args. The args can be a list
         or a string."""
         if args and not type(args) is list:
@@ -230,18 +412,21 @@ class execute:
             cmd.extend(args)
         return self.open(cmd, capture = capture, shell = shell,
                          cwd = cwd, env = env,
-                         stdin = stdin, stdout = stdout, stderr = stderr)
+                         stdin = stdin, stdout = stdout, stderr = stderr,
+                         timeout = timeout)
 
     def command_subst(self, command, substs, capture = True, shell = False,
                       cwd = None, env = None,
-                      stdin = None, stdout = None, stderr = None):
+                      stdin = None, stdout = None, stderr = None,
+                      timeout = None):
         """Run the command from the config data with the
         option format string subsituted with the subst variables."""
         args = arg_subst(command, substs)
         return self.command(args[0], args[1:], capture = capture,
                             shell = shell or self.shell_commands,
                             cwd = cwd, env = env,
-                            stdin = stdin, stdout = stdout, stderr = stderr)
+                            stdin = stdin, stdout = stdout, stderr = stderr,
+                            timeout = timeout)
 
     def set_shell(self, execute):
         """Set the shell to execute when issuing a shell command."""
@@ -275,6 +460,37 @@ class execute:
         self.environment = environment
         return old_environment
 
+    def kill(self):
+        self.lock.acquire()
+        try:
+            if self.proc is not None:
+                self.proc.kill()
+        except:
+            raise
+        finally:
+            self.lock.release()
+
+    def terminate(self):
+        self.lock.acquire()
+        try:
+            if self.proc is not None:
+                self.proc.terminate()
+        except:
+            raise
+        finally:
+            self.lock.release()
+
+    def send_signal(self, signal):
+        self.lock.acquire()
+        try:
+            if self.proc is not None:
+                print("sending sig")
+                self.proc.send_signal(signal)
+        except:
+            raise
+        finally:
+            self.lock.release()
+
 class capture_execution(execute):
     """Capture all output as a string and return it."""
 
@@ -303,13 +519,14 @@ class capture_execution(execute):
                          verbose = verbose)
 
     def open(self, command, capture = True, shell = False, cwd = None, env = None,
-             stdin = None, stdout = None, stderr = None):
+             stdin = None, stdout = None, stderr = None, timeout = None):
         if not capture:
             raise error.general('output capture must be true; leave as default')
         #self.snapper.get_and_clear()
         exit_code, proc = execute.open(self, command, capture = True, shell = shell,
                                        cwd = cwd, env = env,
-                                       stdin = stdin, stdout = stdout, stderr = stderr)
+                                       stdin = stdin, stdout = stdout, stderr = stderr,
+                                       timeout = timeout)
         return (exit_code, proc, self.snapper.get_and_clear())
 
     def set_output(self, output):
@@ -333,11 +550,18 @@ if __name__ == "__main__":
         if ec == 0:
             print('piping input into ' + commands['pipe'][0] + ': ' + \
                   commands['pipe'][2])
-            proc.stdin.write(bytes(commands['pipe'][2], sys.stdin.encoding))
+            try:
+                out = bytes(commands['pipe'][2], sys.stdin.encoding)
+            except:
+                out = commands['pipe'][2]
+            proc.stdin.write(out)
             proc.stdin.close()
             e.capture(proc)
             del proc
 
+    def capture_output(text):
+        print(text, end = '')
+
     cmd_shell_test = 'if "%OS%" == "Windows_NT" (echo It is WinNT) else echo Is is not WinNT'
     sh_shell_test = 'x="me"; if [ $x = "me" ]; then echo "It was me"; else "It was him"; fi'
 
@@ -363,7 +587,7 @@ if __name__ == "__main__":
     print(arg_subst(['nothing', 'xx-%0-yyy', '%1', '%2-something'],
                     ['subst0', 'subst1', 'subst2']))
 
-    e = execute(error_prefix = 'ERR: ', verbose = True)
+    e = execute(error_prefix = 'ERR: ', output = capture_output, verbose = True)
     if sys.platform == "win32":
         run_tests(e, commands['windows'], False)
         if os.path.exists('c:\\msys\\1.0\\bin\\sh.exe'):
-- 
2.19.1



More information about the devel mailing list