# -*- coding: utf-8 -*- import os import six import sys import time import traceback from django.conf import settings from django.core.management.base import BaseCommand, CommandError from django.db.backends import utils from django.utils.six import PY3 from django.utils.datastructures import OrderedSet from django_extensions.management.shells import import_objects from django_extensions.management.utils import signalcommand def use_vi_mode(): editor = os.environ.get('EDITOR') if not editor: return False editor = os.path.basename(editor) return editor.startswith('vi') or editor.endswith('vim') class Command(BaseCommand): help = "Like the 'shell' command but autoloads the models of all installed Django apps." extra_args = None tests_mode = False def add_arguments(self, parser): super(Command, self).add_arguments(parser) parser.add_argument( '--plain', action='store_true', dest='plain', default=False, help='Tells Django to use plain Python, not BPython nor IPython.' ) parser.add_argument( '--bpython', action='store_true', dest='bpython', default=False, help='Tells Django to use BPython, not IPython.' ) parser.add_argument( '--ptpython', action='store_true', dest='ptpython', default=False, help='Tells Django to use PTPython, not IPython.' ) parser.add_argument( '--ptipython', action='store_true', dest='ptipython', default=False, help='Tells Django to use PT-IPython, not IPython.' ) parser.add_argument( '--ipython', action='store_true', dest='ipython', default=False, help='Tells Django to use IPython, not BPython.' ) parser.add_argument( '--notebook', action='store_true', dest='notebook', default=False, help='Tells Django to use IPython Notebook.' ) parser.add_argument( '--kernel', action='store_true', dest='kernel', default=False, help='Tells Django to start an IPython Kernel.' ) parser.add_argument( '--connection-file', action='store', dest='connection_file', help='Specifies the connection file to use if using the --kernel option' ) parser.add_argument( '--no-startup', action='store_true', dest='no_startup', default=False, help='When using plain Python, ignore the PYTHONSTARTUP environment variable and ~/.pythonrc.py script.' ) parser.add_argument( '--use-pythonrc', action='store_true', dest='use_pythonrc', default=False, help='When using plain Python, load the PYTHONSTARTUP environment variable and ~/.pythonrc.py script.' ) parser.add_argument( '--print-sql', action='store_true', default=False, help="Print SQL queries as they're executed" ) parser.add_argument( '--dont-load', action='append', dest='dont_load', default=[], help='Ignore autoloading of some apps/models. Can be used several times.' ) parser.add_argument( '--quiet-load', action='store_true', default=False, dest='quiet_load', help='Do not display loaded models messages' ) parser.add_argument( '--vi', action='store_true', default=use_vi_mode(), dest='vi_mode', help='Load Vi key bindings (for --ptpython and --ptipython)' ) parser.add_argument( '--no-browser', action='store_true', default=False, dest='no_browser', help='Don\'t open the notebook in a browser after startup.' ) def run_from_argv(self, argv): if '--' in argv[2:]: idx = argv.index('--') self.extra_args = argv[idx + 1:] argv = argv[:idx] return super(Command, self).run_from_argv(argv) def get_ipython_arguments(self, options): if self.extra_args: return self.extra_args ipython_args = 'IPYTHON_ARGUMENTS' arguments = getattr(settings, ipython_args, []) if not arguments: arguments = os.environ.get(ipython_args, '').split() return arguments def get_notebook_arguments(self, options): if self.extra_args: return self.extra_args notebook_args = 'NOTEBOOK_ARGUMENTS' arguments = getattr(settings, notebook_args, []) if not arguments: arguments = os.environ.get(notebook_args, '').split() return arguments def get_imported_objects(self, options): imported_objects = import_objects(options, self.style) if self.tests_mode: # save imported objects so we can run tests against it later self.tests_imported_objects = imported_objects return imported_objects def get_kernel(self, options): try: from IPython import release if release.version_info[0] < 2: print(self.style.ERROR("--kernel requires at least IPython version 2.0")) return from IPython import start_kernel except ImportError: return traceback.format_exc() def run_kernel(): imported_objects = self.get_imported_objects(options) kwargs = dict( argv=[], user_ns=imported_objects, ) connection_file = options['connection_file'] if connection_file: kwargs['connection_file'] = connection_file start_kernel(**kwargs) return run_kernel def get_notebook(self, options): from IPython import release try: from notebook.notebookapp import NotebookApp except ImportError: try: from IPython.html.notebookapp import NotebookApp except ImportError: if release.version_info[0] >= 3: raise try: from IPython.frontend.html.notebook import notebookapp NotebookApp = notebookapp.NotebookApp except ImportError: return traceback.format_exc() no_browser = options['no_browser'] def install_kernel_spec(app, display_name, ipython_arguments): """install an IPython >= 3.0 kernelspec that loads django extensions""" ksm = app.kernel_spec_manager try_spec_names = getattr(settings, 'NOTEBOOK_KERNEL_SPEC_NAMES', [ 'python3' if PY3 else 'python2', 'python', ]) if isinstance(try_spec_names, six.string_types): try_spec_names = [try_spec_names] ks = None for spec_name in try_spec_names: try: ks = ksm.get_kernel_spec(spec_name) break except Exception: continue if not ks: raise CommandError("No notebook (Python) kernel specs found") ks.argv.extend(ipython_arguments) ks.display_name = display_name manage_py_dir, manage_py = os.path.split(os.path.realpath(sys.argv[0])) if manage_py == 'manage.py' and os.path.isdir(manage_py_dir) and manage_py_dir != os.getcwd(): pythonpath = ks.env.get('PYTHONPATH', os.environ.get('PYTHONPATH', '')) pythonpath = pythonpath.split(os.pathsep) if manage_py_dir not in pythonpath: pythonpath.append(manage_py_dir) ks.env['PYTHONPATH'] = os.pathsep.join(filter(None, pythonpath)) kernel_dir = os.path.join(ksm.user_kernel_dir, 'django_extensions') if not os.path.exists(kernel_dir): os.makedirs(kernel_dir) with open(os.path.join(kernel_dir, 'kernel.json'), 'w') as f: f.write(ks.to_json()) def run_notebook(): app = NotebookApp.instance() # Treat IPYTHON_ARGUMENTS from settings ipython_arguments = self.get_ipython_arguments(options) if 'django_extensions.management.notebook_extension' not in ipython_arguments: ipython_arguments.extend(['--ext', 'django_extensions.management.notebook_extension']) # Treat NOTEBOOK_ARGUMENTS from settings notebook_arguments = self.get_notebook_arguments(options) if no_browser and '--no-browser' not in notebook_arguments: notebook_arguments.append('--no-browser') if '--notebook-dir' not in notebook_arguments: notebook_arguments.extend(['--notebook-dir', '.']) # IPython < 3 passes through kernel args from notebook CLI if release.version_info[0] < 3: notebook_arguments.extend(ipython_arguments) app.initialize(notebook_arguments) # IPython >= 3 uses kernelspecs to specify kernel CLI args if release.version_info[0] >= 3: display_name = getattr(settings, 'IPYTHON_KERNEL_DISPLAY_NAME', "Django Shell-Plus") install_kernel_spec(app, display_name, ipython_arguments) app.start() return run_notebook def get_plain(self, options): # Using normal Python shell import code imported_objects = self.get_imported_objects(options) try: # Try activating rlcompleter, because it's handy. import readline except ImportError: pass else: # We don't have to wrap the following import in a 'try', because # we already know 'readline' was imported successfully. import rlcompleter readline.set_completer(rlcompleter.Completer(imported_objects).complete) # Enable tab completion on systems using libedit (e.g. macOS). # These lines are copied from Lib/site.py on Python 3.4. readline_doc = getattr(readline, '__doc__', '') if readline_doc is not None and 'libedit' in readline_doc: readline.parse_and_bind("bind ^I rl_complete") else: readline.parse_and_bind("tab:complete") use_pythonrc = options['use_pythonrc'] no_startup = options['no_startup'] # We want to honor both $PYTHONSTARTUP and .pythonrc.py, so follow system # conventions and get $PYTHONSTARTUP first then .pythonrc.py. if use_pythonrc or not no_startup: for pythonrc in OrderedSet([os.environ.get("PYTHONSTARTUP"), os.path.expanduser('~/.pythonrc.py')]): if not pythonrc: continue if not os.path.isfile(pythonrc): continue with open(pythonrc) as handle: pythonrc_code = handle.read() # Match the behavior of the cpython shell where an error in # PYTHONSTARTUP prints an exception and continues. try: exec(compile(pythonrc_code, pythonrc, 'exec'), imported_objects) except Exception: traceback.print_exc() if self.tests_mode: raise def run_plain(): code.interact(local=imported_objects) return run_plain def get_bpython(self, options): try: from bpython import embed except ImportError: return traceback.format_exc() def run_bpython(): imported_objects = self.get_imported_objects(options) kwargs = {} if self.extra_args: kwargs['args'] = self.extra_args embed(imported_objects, **kwargs) return run_bpython def get_ipython(self, options): try: from IPython import start_ipython def run_ipython(): imported_objects = self.get_imported_objects(options) ipython_arguments = self.get_ipython_arguments(options) start_ipython(argv=ipython_arguments, user_ns=imported_objects) return run_ipython except ImportError: str_exc = traceback.format_exc() # IPython < 0.11 # Explicitly pass an empty list as arguments, because otherwise # IPython would use sys.argv from this script. # Notebook not supported for IPython < 0.11. try: from IPython.Shell import IPShell except ImportError: return str_exc + "\n" + traceback.format_exc() def run_ipython(): imported_objects = self.get_imported_objects(options) shell = IPShell(argv=[], user_ns=imported_objects) shell.mainloop() return run_ipython def get_ptpython(self, options): try: from ptpython.repl import embed, run_config except ImportError: tb = traceback.format_exc() try: # prompt_toolkit < v0.27 from prompt_toolkit.contrib.repl import embed, run_config except ImportError: return tb def run_ptpython(): imported_objects = self.get_imported_objects(options) history_filename = os.path.expanduser('~/.ptpython_history') embed(globals=imported_objects, history_filename=history_filename, vi_mode=options['vi_mode'], configure=run_config) return run_ptpython def get_ptipython(self, options): try: from ptpython.repl import run_config from ptpython.ipython import embed except ImportError: tb = traceback.format_exc() try: # prompt_toolkit < v0.27 from prompt_toolkit.contrib.repl import run_config from prompt_toolkit.contrib.ipython import embed except ImportError: return tb def run_ptipython(): imported_objects = self.get_imported_objects(options) history_filename = os.path.expanduser('~/.ptpython_history') embed(user_ns=imported_objects, history_filename=history_filename, vi_mode=options['vi_mode'], configure=run_config) return run_ptipython def set_application_name(self, options): """Set the application_name on PostgreSQL connection Use the fallback_application_name to let the user override it with PGAPPNAME env variable http://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS # noqa """ supported_backends = ['django.db.backends.postgresql', 'django.db.backends.postgresql_psycopg2'] opt_name = 'fallback_application_name' default_app_name = 'django_shell' app_name = default_app_name dbs = getattr(settings, 'DATABASES', []) # lookup over all the databases entry for db in dbs.keys(): if dbs[db]['ENGINE'] in supported_backends: try: options = dbs[db]['OPTIONS'] except KeyError: options = {} # dot not override a defined value if opt_name in options.keys(): app_name = dbs[db]['OPTIONS'][opt_name] else: dbs[db].setdefault('OPTIONS', {}).update({opt_name: default_app_name}) app_name = default_app_name return app_name @signalcommand def handle(self, *args, **options): use_kernel = options['kernel'] use_notebook = options['notebook'] use_ipython = options['ipython'] use_bpython = options['bpython'] use_plain = options['plain'] use_ptpython = options['ptpython'] use_ptipython = options['ptipython'] verbosity = options["verbosity"] print_sql = getattr(settings, 'SHELL_PLUS_PRINT_SQL', False) truncate = getattr(settings, 'SHELL_PLUS_PRINT_SQL_TRUNCATE', 1000) if options["print_sql"] or print_sql: # Code from http://gist.github.com/118990 try: import sqlparse sqlparse_format_kwargs_defaults = dict( reindent_aligned=True, truncate_strings=500, ) sqlparse_format_kwargs = getattr(settings, 'SHELL_PLUS_SQLPARSE_FORMAT_KWARGS', sqlparse_format_kwargs_defaults) except ImportError: sqlparse = None try: import pygments.lexers import pygments.formatters pygments_formatter = getattr(settings, 'SHELL_PLUS_PYGMENTS_FORMATTER', pygments.formatters.TerminalFormatter) pygments_formatter_kwargs = getattr(settings, 'SHELL_PLUS_PYGMENTS_FORMATTER_KWARGS', {}) except ImportError: pygments = None class PrintQueryWrapper(utils.CursorDebugWrapper): def execute(self, sql, params=()): starttime = time.time() try: return utils.CursorWrapper.execute(self, sql, params) finally: execution_time = time.time() - starttime raw_sql = self.db.ops.last_executed_query(self.cursor, sql, params) if sqlparse: raw_sql = raw_sql[:truncate] raw_sql = sqlparse.format(raw_sql, **sqlparse_format_kwargs) if pygments: raw_sql = pygments.highlight( raw_sql, pygments.lexers.get_lexer_by_name("sql"), pygments_formatter(**pygments_formatter_kwargs), ) print(raw_sql) print("") print('Execution time: %.6fs [Database: %s]' % (execution_time, self.db.alias)) print("") utils.CursorDebugWrapper = PrintQueryWrapper shells = ( ('ptipython', self.get_ptipython), ('ptpython', self.get_ptpython), ('bpython', self.get_bpython), ('ipython', self.get_ipython), ('plain', self.get_plain), ) SETTINGS_SHELL_PLUS = getattr(settings, 'SHELL_PLUS', None) shell = None shell_name = "any" self.set_application_name(options) if use_kernel: shell = self.get_kernel(options) shell_name = "IPython Kernel" elif use_notebook: shell = self.get_notebook(options) shell_name = "IPython Notebook" elif use_plain: shell = self.get_plain(options) shell_name = "plain" elif use_ipython: shell = self.get_ipython(options) shell_name = "IPython" elif use_bpython: shell = self.get_bpython(options) shell_name = "BPython" elif use_ptpython: shell = self.get_ptpython(options) shell_name = "ptpython" elif use_ptipython: shell = self.get_ptipython(options) shell_name = "ptipython" elif SETTINGS_SHELL_PLUS: shell_name = SETTINGS_SHELL_PLUS shell = dict(shells)[shell_name](options) else: for shell_name, func in shells: if verbosity > 2: print(self.style.NOTICE("Trying shell: %s" % shell_name)) shell = func(options) if callable(shell): if verbosity > 1: print(self.style.NOTICE("Using shell: %s" % shell_name)) break if not callable(shell): if shell: print(shell) print(self.style.ERROR("Could not load %s interactive Python environment." % shell_name)) return if self.tests_mode: return 130 shell()