# Web app
# Example Webapp
# This is code for the tutorial in README.md
from typing import Any, List, MutableMapping
from aiohttp.web import Application
from mode import Service
from mode.threads import ServiceThread
from mode.utils.objects import cached_property
class User:
    """Placeholder user type for the tutorial."""

    pass  # implement yourself
def remove_expired_users(d):
    """Placeholder for evicting expired users from the mapping ``d``.

    Currently it only announces that it ran; the eviction logic itself is
    left for the reader to write.
    """
    # implement yourself
    print("REMOVING EXPIRED USERS")
    return None
async def run_websocket_server():
    """Placeholder coroutine for starting the websocket server.

    Currently it only announces the start and returns ``None``; the real
    server is left for the reader to write.
    """
    # implement yourself
    print("STARTING WEBSOCKET SERVER")
    return None
class Websockets(Service):
    """Service that owns the websocket server for the application.

    Args:
        port: Port the websocket server is meant to use (default 8081).
        **kwargs: Passed through unchanged to ``Service.__init__``.
    """

    def __init__(self, port: int = 8081, **kwargs: Any) -> None:
        # Honor the caller-supplied port; a hard-coded 8081 used to be stored
        # here, silently discarding the argument.
        self.port = port
        # Set by on_start(); stays None until the service has been started.
        self._server = None
        super().__init__(**kwargs)

    async def on_start(self) -> None:
        """Start the websocket server and keep a reference to it."""
        # NOTE(review): run_websocket_server() takes no arguments, so
        # self.port is stored but not forwarded here.
        self._server = await run_websocket_server()

    async def on_stop(self) -> None:
        """Close the websocket server if one was started."""
        if self._server is not None:
            self._server.close()
class Webserver(ServiceThread):
    """aiohttp web application run as a ``ServiceThread``.

    The listening server is created in on_start() and torn down step by
    step in on_thread_stop().
    """

    def __init__(self, port: int = 8000, bind: str = None, **kwargs: Any) -> None:
        # Address settings consumed later by on_start().
        self.bind = bind
        self.port = port
        # The application exists from construction on; the handler and the
        # server are only created once the service starts.
        self._app = Application()
        self._srv = None
        self._handler = None
        super().__init__(**kwargs)

    async def on_start(self) -> None:
        """Create the request handler and start serving on bind/port."""
        self._handler = self._app.make_handler()
        # self.loop is the event loop in this thread
        # self.parent_loop is the loop that created this thread.
        self._srv = await self.loop.create_server(
            self._handler,
            self.bind,
            self.port,
        )
        self.log.info("Serving on port %s", self.port)

    async def on_thread_stop(self) -> None:
        """Shut the aiohttp server down in stages."""
        # on_thread_stop() executes in the thread.
        # on_stop() executes in parent thread.
        # quite a few steps required to stop the aiohttp server:
        log = self.log

        # 1. Stop accepting connections and wait for the server to close.
        server = self._srv
        if server is not None:
            log.info("Closing server")
            server.close()
            log.info("Waiting for server to close handle")
            await server.wait_closed()

        # 2. Shut down the web application.
        application = self._app
        if application is not None:
            log.info("Shutting down web application")
            await application.shutdown()

        # 3. Shut down the request handler, passing a timeout of 60.0.
        request_handler = self._handler
        if request_handler is not None:
            log.info("Waiting for handler to shut down")
            await request_handler.shutdown(60.0)

        # 4. Final application cleanup (attribute re-read, as in each step).
        application = self._app
        if application is not None:
            log.info("Cleanup")
            await application.cleanup()
class UserCache(Service):
    """Service keeping an in-memory mapping of user id to ``User``."""

    # Maps user id -> User; populated on demand by lookup().
    _cache: MutableMapping[str, User]

    def __post_init__(self):
        """Start out with an empty cache."""
        self._cache = {}

    async def lookup(self, user_id: str) -> User:
        """Return the user for ``user_id``, fetching and caching it on a miss."""
        try:
            return self._cache[user_id]
        except KeyError:
            # Cache miss: fetch the user, remember it, then hand it back.
            fetched = await User.objects.get(user_id)
            self._cache[user_id] = fetched
            return fetched

    @Service.timer(10)  # execute every 10 seconds.
    async def _remove_expired(self):
        """Timer callback that hands the cache to remove_expired_users()."""
        remove_expired_users(self._cache)
class App(Service):
    """Top-level service combining the websocket server, web server and user cache."""

    def __init__(
        self,
        web_port: int = 8000,
        web_bind: str = None,
        websocket_port: int = 8001,
        **kwargs: Any
    ) -> None:
        # Only the settings are stored here; the child services themselves
        # are built on first access by the cached properties below.
        self.websocket_port = websocket_port
        self.web_bind = web_bind
        self.web_port = web_port
        super().__init__(**kwargs)

    def on_init_dependencies(self) -> List:
        """Return the services this app depends on, in declaration order."""
        dependencies = [
            self.websockets,
            self.webserver,
            self.user_cache,
        ]
        return dependencies

    async def on_start(self) -> None:
        """Render the beacon graph to ``image.png`` when the app starts."""
        import io

        import pydot

        # Use the root beacon when there is one, otherwise our own beacon.
        root = self.beacon.root
        beacon = root if root else self.beacon

        dot_source = io.StringIO()
        beacon.as_graph().to_dot(dot_source)

        # The unpacking requires exactly one graph to be parsed from the dot data.
        [graph] = pydot.graph_from_dot_data(dot_source.getvalue())

        print("WRITING GRAPH TO image.png")
        with open("image.png", "wb") as image_file:
            png_bytes = graph.create_png()
            image_file.write(png_bytes)

    @cached_property
    def websockets(self) -> Websockets:
        """Websocket service sharing this app's loop and beacon."""
        service = Websockets(
            port=self.websocket_port,
            loop=self.loop,
            beacon=self.beacon,
        )
        return service

    @cached_property
    def webserver(self) -> Webserver:
        """Web server service sharing this app's loop and beacon."""
        service = Webserver(
            port=self.web_port,
            bind=self.web_bind,
            loop=self.loop,
            beacon=self.beacon,
        )
        return service

    @cached_property
    def user_cache(self) -> UserCache:
        """User cache service sharing this app's loop and beacon."""
        service = UserCache(loop=self.loop, beacon=self.beacon)
        return service
# Module-level application instance.
app = App()

if __name__ == "__main__":
    # The worker is only needed when this file is run as a script.
    from mode.worker import Worker

    worker = Worker(app, loglevel="info", daemon=True)
    worker.execute_from_commandline()