# -*- coding: utf-8 -*-
import argparse
import functools
import logging
import os
import subprocess
import sys
from typing import List, Optional
import mincepy
import cmd2.constants
import cmd2.plugin
import cmd2.utils
import pyos
from pyos import db
from pyos import os as pos
from pyos import version
from . import constants
from . import utils
_LOGGER = logging.getLogger(__name__)
__all__ = 'PyosShell', 'mod'
def mod() -> str:
    """Get the message of the day string.

    The pyOS logo (``version.LOGO``) forms a left-hand column and a short block of
    information (documentation link, mincePy version, pyOS version) forms a right-hand
    column; the two are joined row by row with ``' | '``.

    :return: the rows joined by newlines; the returned string ends with two newline
        characters
    """
    banner_lines = version.LOGO.split('\n')
    second_column = [
        '',
        'Documentation: https://pyos.readthedocs.io/',
        '',
        '',
        f'Powered by mincePy (v{mincepy.__version__})',  # pylint: disable=no-member
        f'Version {version.__version__}',
    ]

    # Pad whichever column is shorter. zip() stops at the shorter input, so without
    # this the trailing information rows would be silently dropped if the logo ever
    # had fewer lines than the information block.
    num_rows = max(len(banner_lines), len(second_column))
    banner_lines.extend([''] * (num_rows - len(banner_lines)))
    second_column.extend([''] * (num_rows - len(second_column)))

    # The logo column is left-aligned and padded to the width of its longest line.
    # The format string does not depend on the row, so it is built only once.
    max_line_length = max(map(len, banner_lines))
    fmt_string = f'{{:<{max_line_length}}} | {{}}'

    message = [fmt_string.format(banner, info) for banner, info in zip(banner_lines, second_column)]
    message.append('\n')
    return '\n'.join(message)
[docs]class PyosShell(cmd2.Cmd):
"""The pyOS shell

A ``cmd2.Cmd`` application. On construction it registers the command sets returned
by ``utils.plugins_get_commands()``, switches on ``default_to_shell`` and installs a
command finalisation hook that refreshes the prompt. The prompt shows the current
pyOS working directory, or ``[not connected]$`` when no historian is available.
"""
def __init__(self,
             startup_script='',
             startup_commands: Optional[List[str]] = None,
             skip_intro=False):
    """Create the shell.

    :param startup_script: path of the startup script handed to cmd2; when empty, the
        startup file inside the application directory is used instead
    :param startup_commands: additional commands appended to the shell's
        ``_startup_commands``
    :param skip_intro: when true, the message of the day is not installed as the intro
    """
    history_file = os.path.join(utils.get_app_dir(), constants.HISTORY_FILE)
    if not startup_script:
        # Fall back to the startup file kept in the application directory
        startup_script = os.path.join(utils.get_app_dir(), constants.STARTUP_FILE)

    cmd_options = {
        'startup_script': startup_script,
        'allow_cli_args': False,
        'use_ipython': True,
        'persistent_history_file': history_file,
    }
    super().__init__(**cmd_options)

    if startup_commands:
        self._startup_commands.extend(startup_commands)

    # Make the commands contributed by plugins available in this shell
    for command_set in utils.plugins_get_commands():
        self.register_command_set(command_set)

    self.default_to_shell = True

    if not skip_intro:
        self.intro = mod()

    # Show the right prompt from the start and keep it current via the finalisation hook
    self._update_prompt()
    self.register_cmdfinalization_hook(self.command_finalise)
@classmethod
def execute(cls, *cmd, **kwarg):
    """Run the passed commands and return.

    A shell is created with the given commands as its startup commands, followed by
    ``quit``, and its command loop is then run.

    :param cmd: the command strings to run, in order
    :param kwarg: keyword arguments forwarded to the constructor; ``skip_intro``
        defaults to ``True`` unless given, and ``startup_commands`` is always replaced
        by the commands passed here
    """
    init_args = {'skip_intro': True, **kwarg}
    init_args['startup_commands'] = [*cmd, 'quit']
    # Instantiate through ``cls`` rather than a hard-coded class name so that a
    # subclass calling execute() runs an instance of itself
    app = cls(**init_args)
    app.cmdloop()
def command_finalise(
        self, data: cmd2.plugin.CommandFinalizationData) -> cmd2.plugin.CommandFinalizationData:
    """Command finalisation hook: refresh the prompt and hand ``data`` back untouched.

    :param data: the finalisation data passed in by cmd2
    :return: the same ``data`` object that was passed in
    """
    self._update_prompt()
    return data
def _update_prompt(self):
    """Set the prompt according to whether a historian is available.

    With a historian the prompt is the pyOS working directory followed by ``$ ``,
    otherwise it is ``[not connected]$ ``.
    """
    try:
        connected = db.get_historian() is not None
    except RuntimeError:
        # Happens when there is a global historian but pyos.db.init() hasn't been called
        connected = False

    # The working directory is only queried when there is a historian
    self.prompt = f'{pos.getcwd()}$ ' if connected else '[not connected]$ '
def _redirect_output(self, statement: cmd2.Statement) -> cmd2.utils.RedirectionSavedState:
    """Set up output redirection for ``statement``.

    Statements that pipe their output are handled here: every part of the pipe is run
    as a shell command through a ``utils.Piper``. Everything else is delegated to cmd2.

    :param statement: the parsed statement about to be executed
    :return: the saved state needed to undo the redirection afterwards
    """
    if not statement.pipe_to:
        # Nothing is being piped, cmd2's own implementation deals with it
        return super()._redirect_output(statement)

    # Snapshot the stream state before any of the streams are replaced
    saved_state = self._create_redirection_save()
    _LOGGER.debug(
        'Attempting piped command:\n'
        '%s\n'
        'stdin=%s, stdout=%s', statement.pipe_to, saved_state.saved_sys_stdin,
        saved_state.saved_self_stdout)

    # One callable per stage of the pipe. Each part is stripped before parsing because
    # whitespace (especially at the beginning) confuses parsing to a statement.
    stages = [
        functools.partial(self.onecmd, self._input_line_to_statement(segment.strip()))
        for segment in statement.pipe_to.split(cmd2.constants.REDIRECTION_PIPE)
    ]

    piper = utils.Piper(stages, out_stream=self.stdout)
    self.stdout = piper.out_redirector
    self.stdin = piper.in_redirector
    saved_state.redirecting = True

    try:
        self._cur_pipe_proc_reader = piper
        self._redirecting = True
        piper.start()
    except Exception:
        # Shut down the full pipe and put the default streams back before re-raising
        piper.shutdown(wait=True)
        self._restore_output(statement, saved_state)
        raise

    return saved_state
# Preserve quotes since we are passing these strings to the shell
@cmd2.with_argparser(cmd2.Cmd.shell_parser, preserve_quotes=True)
def do_shell(self, args: argparse.Namespace) -> None:
    """Execute a command as if at the OS prompt"""
    # The command line is the command followed by its arguments, with ~ expanded in place
    command_tokens = [args.command, *args.command_args]
    cmd2.utils.expand_user_in_tokens(command_tokens)
    expanded_command = ' '.join(command_tokens)

    # Prevent KeyboardInterrupts while in the shell process. The shell process will
    # still receive the SIGINT since it is in the same process group as us.
    with self.sigint_protection:
        # Standard input is only passed on to the child while redirecting
        child_stdin = sys.stdin if self._redirecting else None
        # For any stream that is a StdSim, a pipe is used so its output can be captured
        if isinstance(self.stdout, cmd2.utils.StdSim):
            child_stdout = subprocess.PIPE
        else:
            child_stdout = self.stdout
        if isinstance(sys.stderr, cmd2.utils.StdSim):
            child_stderr = subprocess.PIPE
        else:
            child_stderr = sys.stderr

        _LOGGER.debug('Starting shell command: %s. Capturing stdin: %s', expanded_command,
                      self._redirecting)

        with subprocess.Popen(expanded_command,
                              stdin=child_stdin,
                              stdout=child_stdout,
                              stderr=child_stderr,
                              shell=True) as proc:
            cmd2.utils.ProcReader(proc, self.stdout, sys.stderr).wait()
            # Save the return code of the application for use in a pyscript
            self.last_result = proc.returncode
# Only include the do_ipy() method if IPython is available on the system
if cmd2.cmd2.ipython_available: # pragma: no cover
[docs] @cmd2.with_argparser(cmd2.cmd2.Cmd.ipython_parser)
def do_ipy(self, _: argparse.Namespace) -> Optional[bool]:
"""
Enter an interactive IPython shell
:return: True if running of commands should stop
"""
# Imported inside the method, so it is only needed when the command is actually run
from cmd2.py_bridge import PyBridge
# noinspection PyUnusedLocal
def load_ipy(cmd2_app: PyosShell, py_bridge: PyBridge):
"""
Embed an IPython shell in an environment that is restricted to only the variables in this function
:param cmd2_app: instance of the cmd2 app
:param py_bridge: a PyBridge
"""
# NOTE(review): the parameter names `cmd2_app` and `py_bridge` also appear inside the
# exec'd strings below, so renaming a parameter means updating those strings as well
# Create a variable pointing to py_bridge and name it using the value of py_bridge_name
exec(f'{cmd2_app.py_bridge_name} = py_bridge') # pylint: disable=exec-used
# Add self variable pointing to cmd2_app, if allowed
if cmd2_app.self_in_py:
exec('self = cmd2_app') # pylint: disable=exec-used
# Delete these names from the environment so IPython can't use them
del cmd2_app
del py_bridge
# The shell is embedded only after the names above have been deleted
# NOTE(review): user_module=pyos.psh presumably makes the pyos.psh namespace available
# inside the IPython session - confirm against the IPython embed documentation
cmd2.cmd2.embed(
banner1=('Entering an pyOS IPython shell. Type quit or <Ctrl>-d to exit.\n'
'Run Python code from external files with: run filename.py\n'),
user_module=pyos.psh,
exit_msg=f'Leaving IPython, back to {sys.argv[0]}')
# Refuse to start an interactive shell from within a pyscript
if self.in_pyscript():
self.perror('Recursively entering interactive Python shells is not allowed')
else:
try:
# Flag that a Python shell is active for the duration of the session
self._in_py = True
new_py_bridge = PyBridge(self)
load_ipy(self, new_py_bridge)
# The bridge's stop flag becomes this command's return value
return new_py_bridge.stop
finally:
# Always clear the flag, even if the embedded shell raised
self._in_py = False
return None
def _create_redirection_save(self):
    """Snapshot the current streams and the members related to redirection.

    :return: a ``utils.RedirectionSavedState`` built from the shell's and ``sys``'s
        stdout and stdin, the current pipe process reader and the redirecting flag
    """
    snapshot = (
        self.stdout,
        sys.stdout,
        self.stdin,
        sys.stdin,
        self._cur_pipe_proc_reader,
        self._redirecting,
    )
    return utils.RedirectionSavedState(*snapshot)
def _restore_output(self, statement: cmd2.Statement,
saved_redir_state: utils.RedirectionSavedState):
"""Handles restoring state after output redirection

Depending on the saved state this either delegates to cmd2's implementation or
waits for the current pipe reader and puts back the saved stdout/stdin streams
of both the shell and ``sys``.

:param statement: Statement object which contains the parsed input from the user
:param saved_redir_state: contains information needed to restore state data
"""
if saved_redir_state.redirecting:
# If we redirected output to the clipboard
# (an output redirection was requested but no target to write to was given)
if statement.output and not statement.output_to:
# Fallback to default behaviour
super()._restore_output(statement, saved_redir_state)
else:
# Check if we need to wait for the process being piped
if self._cur_pipe_proc_reader is not None:
self._cur_pipe_proc_reader.wait()
# Put back the streams that were captured before the redirection was set up
self.stdout = saved_redir_state.saved_self_stdout
sys.stdout = saved_redir_state.saved_sys_stdout
self.stdin = saved_redir_state.saved_self_stdin
sys.stdin = saved_redir_state.saved_sys_stdin
# Put back the pipe reader and redirecting flag that were active before
self._cur_pipe_proc_reader = saved_redir_state.saved_pipe_proc_reader
self._redirecting = saved_redir_state.saved_redirecting
else:
super()._restore_output(statement, saved_redir_state)
# Only the pyOS saved-state type is asked for saved_sys_stdin; presumably the state
# objects created by cmd2 itself do not carry that attribute - TODO confirm
# NOTE(review): confirm whether this check is meant to run only when not redirecting
if isinstance(saved_redir_state, utils.RedirectionSavedState):
sys.stdin = saved_redir_state.saved_sys_stdin
if __name__ == '__main__':
    # Run the interactive shell; whatever cmdloop() returns is handed to sys.exit()
    shell = PyosShell()
    sys.exit(shell.cmdloop())