from __future__ import absolute_import from datetime import timedelta import pytest import logging import re import mock from tornado import gen from tornado.ioloop import PeriodicCallback, IOLoop from tornado.httpclient import HTTPError from tornado.httpserver import HTTPServer import bokeh.server.server as server from bokeh.application import Application from bokeh.application.handlers import Handler from bokeh.model import Model from bokeh.core.properties import List, String from bokeh.client import pull_session from bokeh.server.server import BaseServer, Server from bokeh.server.tornado import BokehTornado from bokeh.util.session_id import check_session_id_signature from .utils import ManagedServerLoop, url, ws_url, http_get, websocket_open logging.basicConfig(level=logging.DEBUG) @gen.coroutine def async_value(value): yield gen.moment # this ensures we actually return to the loop raise gen.Return(value) class HookListModel(Model): hooks = List(String) class HookTestHandler(Handler): def __init__(self): super(HookTestHandler, self).__init__() self.load_count = 0 self.unload_count = 0 self.session_creation_async_value = 0 self.hooks = [] self.server_periodic_remover = None self.session_periodic_remover = None def modify_document(self, doc): # this checks that the session created hook has run # and session destroyed has not. assert self.session_creation_async_value == 3 doc.title = "Modified" doc.roots[0].hooks.append("modify") self.hooks.append("modify") def on_server_loaded(self, server_context): assert len(server_context.sessions) == 0 self.load_count += 1 self.hooks.append("server_loaded") server_context.add_next_tick_callback(self.on_next_tick_server) server_context.add_timeout_callback(self.on_timeout_server, 2) periodic_cb_id = server_context.add_periodic_callback(self.on_periodic_server, 3) def remover(): server_context.remove_periodic_callback(periodic_cb_id) self.server_periodic_remover = remover def on_server_unloaded(self, server_context): self.unload_count += 1 self.hooks.append("server_unloaded") # important to test that this can be async @gen.coroutine def on_session_created(self, session_context): @gen.coroutine def setup_document(doc): # session creation hook is allowed to init the document # before any modify_document() handlers kick in from bokeh.document import DEFAULT_TITLE hook_list = HookListModel() assert doc.title == DEFAULT_TITLE assert len(doc.roots) == 0 hook_list.hooks.append("session_created") doc.add_root(hook_list) self.session_creation_async_value = yield async_value(1) self.session_creation_async_value = yield async_value(2) self.session_creation_async_value = yield async_value(3) yield session_context.with_locked_document(setup_document) server_context = session_context.server_context server_context.add_next_tick_callback(self.on_next_tick_session) server_context.add_timeout_callback(self.on_timeout_session, 2) periodic_cb_id = server_context.add_periodic_callback(self.on_periodic_session, 3) def remover(): server_context.remove_periodic_callback(periodic_cb_id) self.session_periodic_remover = remover self.hooks.append("session_created") # this has to be async too @gen.coroutine def on_session_destroyed(self, session_context): # this should be no-op'd, because the session is already destroyed @gen.coroutine def shutdown_document(doc): doc.roots[0].hooks.append("session_destroyed") self.session_creation_async_value = yield async_value(4) self.session_creation_async_value = yield async_value(5) self.session_creation_async_value = yield async_value(6) yield session_context.with_locked_document(shutdown_document) self.hooks.append("session_destroyed") def on_next_tick_server(self): self.hooks.append("next_tick_server") def on_timeout_server(self): self.hooks.append("timeout_server") def on_periodic_server(self): self.hooks.append("periodic_server") self.server_periodic_remover() def on_next_tick_session(self): self.hooks.append("next_tick_session") def on_timeout_session(self): self.hooks.append("timeout_session") def on_periodic_session(self): self.hooks.append("periodic_session") self.session_periodic_remover() def test__lifecycle_hooks(): application = Application() handler = HookTestHandler() application.add(handler) with ManagedServerLoop(application, check_unused_sessions_milliseconds=30) as server: # wait for server callbacks to run before we mix in the # session, this keeps the test deterministic def check_done(): if len(handler.hooks) == 4: server.io_loop.stop() server_load_checker = PeriodicCallback(check_done, 1) server_load_checker.start() server.io_loop.start() server_load_checker.stop() # now we create a session client_session = pull_session(session_id='test__lifecycle_hooks', url=url(server), io_loop=server.io_loop) client_doc = client_session.document assert len(client_doc.roots) == 1 server_session = server.get_session('/', client_session.id) server_doc = server_session.document assert len(server_doc.roots) == 1 # we have to capture these here for examination later, since after # the session is closed, doc.roots will be emptied client_hook_list = client_doc.roots[0] server_hook_list = server_doc.roots[0] client_session.close() # expire the session quickly rather than after the # usual timeout server_session.request_expiration() def on_done(): server.io_loop.stop() server.io_loop.call_later(0.1, on_done) server.io_loop.start() assert handler.hooks == ["server_loaded", "next_tick_server", "timeout_server", "periodic_server", "session_created", "next_tick_session", "modify", "timeout_session", "periodic_session", "session_destroyed", "server_unloaded"] assert handler.load_count == 1 assert handler.unload_count == 1 # this is 3 instead of 6 because locked callbacks on destroyed sessions # are turned into no-ops assert handler.session_creation_async_value == 3 assert client_doc.title == "Modified" assert server_doc.title == "Modified" # only the handler sees the event that adds "session_destroyed" since # the session is shut down at that point. assert client_hook_list.hooks == ["session_created", "modify"] assert server_hook_list.hooks == ["session_created", "modify"] def test_prefix(): application = Application() with ManagedServerLoop(application) as server: assert server.prefix == "" with ManagedServerLoop(application, prefix="foo") as server: assert server.prefix == "foo" def test_get_sessions(): application = Application() with ManagedServerLoop(application) as server: server_sessions = server.get_sessions('/') assert len(server_sessions) == 0 http_get(server.io_loop, url(server)) server_sessions = server.get_sessions('/') assert len(server_sessions) == 1 http_get(server.io_loop, url(server)) server_sessions = server.get_sessions('/') assert len(server_sessions) == 2 server_sessions = server.get_sessions() assert len(server_sessions) == 2 with pytest.raises(ValueError): server.get_sessions("/foo") with ManagedServerLoop({"/foo": application, "/bar": application}) as server: http_get(server.io_loop, url(server) + "foo") server_sessions = server.get_sessions('/foo') assert len(server_sessions) == 1 server_sessions = server.get_sessions('/bar') assert len(server_sessions) == 0 server_sessions = server.get_sessions() assert len(server_sessions) == 1 http_get(server.io_loop, url(server) + "foo") server_sessions = server.get_sessions('/foo') assert len(server_sessions) == 2 server_sessions = server.get_sessions('/bar') assert len(server_sessions) == 0 server_sessions = server.get_sessions() assert len(server_sessions) == 2 http_get(server.io_loop, url(server) + "bar") server_sessions = server.get_sessions('/foo') assert len(server_sessions) == 2 server_sessions = server.get_sessions('/bar') assert len(server_sessions) == 1 server_sessions = server.get_sessions() assert len(server_sessions) == 3 def test__request_in_session_context(): application = Application() with ManagedServerLoop(application) as server: response = http_get(server.io_loop, url(server) + "?foo=10") html = response.body sessionid = extract_sessionid_from_json(html) server_session = server.get_session('/', sessionid) server_doc = server_session.document session_context = server_doc.session_context # do we have a request assert session_context.request is not None def test__request_in_session_context_has_arguments(): application = Application() with ManagedServerLoop(application) as server: response = http_get(server.io_loop, url(server) + "?foo=10") html = response.body sessionid = extract_sessionid_from_json(html) server_session = server.get_session('/', sessionid) server_doc = server_session.document session_context = server_doc.session_context # test if we can get the argument from the request assert session_context.request.arguments['foo'] == [b'10'] def test__no_request_arguments_in_session_context(): application = Application() with ManagedServerLoop(application) as server: response = http_get(server.io_loop, url(server)) html = response.body sessionid = extract_sessionid_from_json(html) server_session = server.get_session('/', sessionid) server_doc = server_session.document session_context = server_doc.session_context # if we do not pass any arguments to the url, the request arguments # should be empty assert len(session_context.request.arguments) == 0 # examples: # "sessionid" : "NzlNoPfEYJahnPljE34xI0a5RSTaU1Aq1Cx5" # 'sessionid':'NzlNoPfEYJahnPljE34xI0a5RSTaU1Aq1Cx5' sessionid_in_json = re.compile("""["']sessionid["'] *: *["']([^"]+)["']""") def extract_sessionid_from_json(html): from six import string_types if not isinstance(html, string_types): import codecs html = codecs.decode(html, 'utf-8') match = sessionid_in_json.search(html) return match.group(1) # examples: # "sessionid" : "NzlNoPfEYJahnPljE34xI0a5RSTaU1Aq1Cx5" # 'sessionid':'NzlNoPfEYJahnPljE34xI0a5RSTaU1Aq1Cx5' use_for_title_in_json = re.compile("""["']use_for_title["'] *: *(false|true)""") def extract_use_for_title_from_json(html): from six import string_types if not isinstance(html, string_types): import codecs html = codecs.decode(html, 'utf-8') match = use_for_title_in_json.search(html) return match.group(1) def autoload_url(server): return url(server) + \ "autoload.js?bokeh-protocol-version=1.0&bokeh-autoload-element=foo" def resource_files_requested(response, requested=True): from six import string_types if not isinstance(response, string_types): import codecs response = codecs.decode(response, 'utf-8') for file in [ 'static/css/bokeh.min.css', 'static/css/bokeh-widgets.min.css', 'static/js/bokeh.min.js', 'static/js/bokeh-widgets.min.js']: if requested: assert file in response else: assert file not in response def test_use_xheaders(): application = Application() with ManagedServerLoop(application, use_xheaders=True) as server: assert server._http.xheaders == True @pytest.mark.parametrize("querystring,requested", [ ("", True), ("&resources=default", True), ("&resources=whatever", True), ("&resources=none", False), ]) @pytest.mark.unit def test__resource_files_requested(querystring, requested): """ Checks if the loading of resource files is requested by the autoload.js response based on the value of the "resources" parameter. """ application = Application() with ManagedServerLoop(application) as server: response = http_get(server.io_loop, autoload_url(server) + querystring) resource_files_requested(response.body, requested=requested) def test__autocreate_session_autoload(): application = Application() with ManagedServerLoop(application) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) response = http_get(server.io_loop, autoload_url(server)) js = response.body sessionid = extract_sessionid_from_json(js) sessions = server.get_sessions('/') assert 1 == len(sessions) assert sessionid == sessions[0].id def test__no_set_title_autoload(): application = Application() with ManagedServerLoop(application) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) response = http_get(server.io_loop, autoload_url(server)) js = response.body use_for_title = extract_use_for_title_from_json(js) assert use_for_title == "false" def test__autocreate_session_doc(): application = Application() with ManagedServerLoop(application) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) response = http_get(server.io_loop, url(server)) html = response.body sessionid = extract_sessionid_from_json(html) sessions = server.get_sessions('/') assert 1 == len(sessions) assert sessionid == sessions[0].id def test__no_autocreate_session_websocket(): application = Application() with ManagedServerLoop(application) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) websocket_open(server.io_loop, ws_url(server) + "?bokeh-protocol-version=1.0") sessions = server.get_sessions('/') assert 0 == len(sessions) def test__use_provided_session_autoload(): application = Application() with ManagedServerLoop(application) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) expected = 'foo' response = http_get(server.io_loop, autoload_url(server) + "&bokeh-session-id=" + expected) js = response.body sessionid = extract_sessionid_from_json(js) assert expected == sessionid sessions = server.get_sessions('/') assert 1 == len(sessions) assert expected == sessions[0].id def test__use_provided_session_doc(): application = Application() with ManagedServerLoop(application) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) expected = 'foo' response = http_get(server.io_loop, url(server) + "?bokeh-session-id=" + expected) html = response.body sessionid = extract_sessionid_from_json(html) assert expected == sessionid sessions = server.get_sessions('/') assert 1 == len(sessions) assert expected == sessions[0].id def test__use_provided_session_websocket(): application = Application() with ManagedServerLoop(application) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) expected = 'foo' url = ws_url(server) + \ "?bokeh-protocol-version=1.0" + \ "&bokeh-session-id=" + expected websocket_open(server.io_loop, url) sessions = server.get_sessions('/') assert 1 == len(sessions) assert expected == sessions[0].id def test__autocreate_signed_session_autoload(): application = Application() with ManagedServerLoop(application, sign_sessions=True, secret_key='foo') as server: sessions = server.get_sessions('/') assert 0 == len(sessions) response = http_get(server.io_loop, autoload_url(server)) js = response.body sessionid = extract_sessionid_from_json(js) sessions = server.get_sessions('/') assert 1 == len(sessions) assert sessionid == sessions[0].id assert check_session_id_signature(sessionid, signed=True, secret_key='foo') def test__autocreate_signed_session_doc(): application = Application() with ManagedServerLoop(application, sign_sessions=True, secret_key='foo') as server: sessions = server.get_sessions('/') assert 0 == len(sessions) response = http_get(server.io_loop, url(server)) html = response.body sessionid = extract_sessionid_from_json(html) sessions = server.get_sessions('/') assert 1 == len(sessions) assert sessionid == sessions[0].id assert check_session_id_signature(sessionid, signed=True, secret_key='foo') def test__reject_unsigned_session_autoload(): application = Application() with ManagedServerLoop(application, sign_sessions=True, secret_key='bar') as server: sessions = server.get_sessions('/') assert 0 == len(sessions) expected = 'foo' with (pytest.raises(HTTPError)) as info: http_get(server.io_loop, autoload_url(server) + "&bokeh-session-id=" + expected) assert 'Invalid session ID' in repr(info.value) sessions = server.get_sessions('/') assert 0 == len(sessions) def test__reject_unsigned_session_doc(): application = Application() with ManagedServerLoop(application, sign_sessions=True, secret_key='bar') as server: sessions = server.get_sessions('/') assert 0 == len(sessions) expected = 'foo' with (pytest.raises(HTTPError)) as info: http_get(server.io_loop, url(server) + "?bokeh-session-id=" + expected) assert 'Invalid session ID' in repr(info.value) sessions = server.get_sessions('/') assert 0 == len(sessions) def test__reject_unsigned_session_websocket(): application = Application() with ManagedServerLoop(application, sign_sessions=True, secret_key='bar') as server: sessions = server.get_sessions('/') assert 0 == len(sessions) expected = 'foo' url = ws_url(server) + \ "?bokeh-protocol-version=1.0" + \ "&bokeh-session-id=" + expected websocket_open(server.io_loop, url) sessions = server.get_sessions('/') assert 0 == len(sessions) def test__no_generate_session_autoload(): application = Application() with ManagedServerLoop(application, generate_session_ids=False) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) with (pytest.raises(HTTPError)) as info: http_get(server.io_loop, autoload_url(server)) assert 'No bokeh-session-id provided' in repr(info.value) sessions = server.get_sessions('/') assert 0 == len(sessions) def test__no_generate_session_doc(): application = Application() with ManagedServerLoop(application, generate_session_ids=False) as server: sessions = server.get_sessions('/') assert 0 == len(sessions) with (pytest.raises(HTTPError)) as info: http_get(server.io_loop, url(server)) assert 'No bokeh-session-id provided' in repr(info.value) sessions = server.get_sessions('/') assert 0 == len(sessions) def test__server_multiple_processes(): # Can't use an ioloop in this test with mock.patch('tornado.httpserver.HTTPServer.add_sockets'): with mock.patch('tornado.process.fork_processes') as tornado_fp: application = Application() server.Server(application, num_procs=3, port=0) tornado_fp.assert_called_with(3) def test__existing_ioloop_with_multiple_processes_exception(): application = Application() ioloop_instance = IOLoop.instance() ; ioloop_instance # silence flake8 with pytest.raises(RuntimeError): with ManagedServerLoop(application, num_procs=3): pass def test__actual_port_number(): application = Application() with ManagedServerLoop(application, port=0) as server: port = server.port assert port > 0 http_get(server.io_loop, url(server)) def test__ioloop_not_forcibly_stopped(): # Issue #5494 application = Application() loop = IOLoop() loop.make_current() server = Server(application, io_loop=loop) server.start() result = [] def f(): server.unlisten() server.stop() # If server.stop() were to stop the Tornado IO loop, # g() wouldn't be called and `result` would remain empty. loop.add_timeout(timedelta(seconds=0.01), g) def g(): result.append(None) loop.stop() loop.add_callback(f) loop.start() assert result == [None] # This test just maintains basic creation and setup, detailed functionality # is exercised by Server tests above def test_base_server(): app = BokehTornado(Application()) httpserver = HTTPServer(app) httpserver.start() loop = IOLoop() loop.make_current() server = BaseServer(loop, app, httpserver) server.start() assert server.io_loop == loop assert server._tornado.io_loop == loop httpserver.stop() server.stop() server.io_loop.close() def test_server_applications_callable_arg(): def modify_doc(doc): doc.title = "Hello, world!" with ManagedServerLoop(modify_doc, port=0) as server: http_get(server.io_loop, url(server)) session = server.get_sessions('/')[0] assert session.document.title == "Hello, world!" with ManagedServerLoop({"/foo": modify_doc}, port=0) as server: http_get(server.io_loop, url(server) + "foo") session = server.get_sessions('/foo')[0] assert session.document.title == "Hello, world!"