Source code for twtb.logic.telegram.button_handlers

"""Module for handlers for buttons in :func:`/start command <twtb.logic.telegram.on_message._start_command>`."""
import abc
import asyncio
import typing as t

from loguru import logger

from twtb.logic.shared.db import Database
from twtb.logic.shared.db.channels_info import (
from import ButtonData

class ButtonHandler(abc.ABC):
    """Abstract class for button handlers.

    A concrete handler sets :attr:`handles` to the button it serves and
    implements :meth:`_handle`. :meth:`get_handler` then picks the subclass
    whose :attr:`handles` equals the pressed button.
    """

    # Button served by this handler; ``get_handler`` compares against it.
    handles: ButtonData

    async def handle(self, event: t.Any) -> None:
        """Handle button.

        Logs which handler is running and delegates to :meth:`_handle`.

        Args:
            event: Telethon's event.
        """
        # NOTE(review): the annotation of ``event`` and the first placeholder of
        # this log message were lost from the published listing. ``t.Any`` and
        # ``self.handles`` are stand-ins -- confirm against the repository.
        logger.trace(f"Handling {self.handles} button with {type(self).__name__} from {event.sender_id}.")
        return await self._handle(event)

    # NOTE(review): decorators were missing from the listing. ``abstractmethod``
    # is assumed because the body is a bare stub on an ABC -- confirm.
    @abc.abstractmethod
    async def _handle(self, event: t.Any) -> None:
        """Do the actual work for the button; implemented by subclasses.

        Args:
            event: Telethon's event.
        """

    @classmethod
    def get_handler(cls, button_data: ButtonData) -> "ButtonHandler":
        """Get handler for button.

        Only direct subclasses are searched, because ``__subclasses__()`` does
        not descend into subclasses of subclasses.

        Args:
            button_data: Button data.

        Returns:
            Handler for button.

        Raises:
            NotImplementedError: If no subclass declares ``button_data`` in
                its ``handles`` attribute.
        """
        for possible_handler in cls.__subclasses__():
            if possible_handler.handles == button_data:
                return possible_handler()

        raise NotImplementedError(f"Button handler for {button_data!r} is not implemented")
class SubscribeToWordButtonHandler(ButtonHandler):
    """Handler for ``Subscribe to word`` button."""

    handles = ButtonData.SUBSCRIBE_TO_WORD

    async def _handle(self, event: t.Any) -> None:
        """Ask the user for a word and subscribe them to it.

        Args:
            event: Telethon's event.

        Raises:
            asyncio.TimeoutError: Re-raised after notifying the user.
            ValueError: Re-raised after notifying the user.
        """
        try:
            # NOTE(review): the conversation target was lost from the published
            # listing; ``event.chat_id`` is assumed -- confirm against the repo.
            async with event.client.conversation(event.chat_id) as conversation:
                await conversation.send_message("What word do you want to subscribe to?")
                word = (await conversation.get_response()).text
        except (asyncio.TimeoutError, ValueError):
            logger.trace(f"User {event.sender_id} took too long to respond")
            await event.respond("You took too long to respond :(")
            raise

        database = Database()
        is_subscribed = await database.subscribe_user(event.sender_id, word)
        await event.respond("Done!" if is_subscribed else "You are already subscribed to this word!")
class UnsubscribeFromWordButtonHandler(ButtonHandler):
    """Handler for ``Unsubscribe from word`` button."""

    handles = ButtonData.UNSUBSCRIBE_FROM_WORD

    async def _handle(self, event: t.Any) -> None:
        """Ask the user for a word and remove their subscription to it.

        Args:
            event: Telethon's event.

        Raises:
            asyncio.TimeoutError: Re-raised after notifying the user.
            ValueError: Re-raised after notifying the user.
        """
        try:
            # NOTE(review): the conversation target was lost from the published
            # listing; ``event.chat_id`` is assumed -- confirm against the repo.
            async with event.client.conversation(event.chat_id) as conversation:
                await conversation.send_message("What word do you want to unsubscribe from?")
                word = (await conversation.get_response()).text
        except (asyncio.TimeoutError, ValueError):
            logger.trace(f"User {event.sender_id} took too long to respond")
            await event.respond("You took too long to respond :(")
            raise

        database = Database()
        is_removed = await database.unsubscribe_user(event.sender_id, word)
        await event.respond("Done!" if is_removed else "You are not subscribed to this word!")

        # Build the outcome text first so the log call stays a plain f-string.
        if is_removed:
            outcome = f"is unsubscribed from the word {word!r}"
        else:
            outcome = f"was not subscribed to the word {word!r}, but tried to unsubscribe."
        logger.trace(f"User {event.sender_id} {outcome}")
class ListMySubscribesButtonHandler(ButtonHandler):
    """Handler for ``List my subscribes`` button."""

    handles = ButtonData.LIST_MY_SUBSCRIBES

    async def _handle(self, event: t.Any) -> None:
        """Reply with the words the sender is subscribed to, one per line.

        Args:
            event: Telethon's event.
        """
        database = Database()
        subscribes = await database.get_user_words(event.sender_id)

        if not subscribes:
            await event.respond("You are not subscribed to anything yet!")
            return

        await event.respond("Your subscribes:\n" + "\n".join(subscribes))
class ListKnownChannelsButtonHandler(ButtonHandler):
    """Handler for ``List known channels`` button."""

    handles = ButtonData.LIST_KNOWN_CHANNELS

    async def _handle(self, event: t.Any) -> None:
        """Reply with every stored channel as ``title (@username)``, one per line.

        Args:
            event: Telethon's event.
        """
        database = Database()
        channels = await database.get_all_channels()

        if not channels:
            logger.warning("No channels were added yet!")
            await event.respond("No channels were added yet!")
            return

        human_friendly_names: t.List[str] = []
        for channel in channels:
            # Each channel is resolved through ``channels_info`` using the
            # event's client before its title and username are read.
            entity = await database.channels_info.get_and_save(channel, event.client)
            human_friendly_names.append(f"{entity.title} (@{entity.username})")

        await event.respond("Known channels:\n" + "\n".join(human_friendly_names))
class AddChannelButtonHandler(ButtonHandler):
    """Handler for ``Add channel`` button."""

    handles = ButtonData.ADD_CHANNEL

    async def _handle(self, event: t.Any) -> None:
        """Ask the user for a channel, resolve it and store it.

        Args:
            event: Telethon's event.

        Raises:
            asyncio.TimeoutError: Re-raised after notifying the user.
            ValueError: Re-raised after notifying the user.
        """
        try:
            # NOTE(review): the conversation target was lost from the published
            # listing; ``event.chat_id`` is assumed -- confirm against the repo.
            async with event.client.conversation(event.chat_id) as conversation:
                await conversation.send_message("What channel do you want to add?")
                channel = (await conversation.get_response()).text
        except (asyncio.TimeoutError, ValueError):
            logger.trace(f"User {event.sender_id} took too long to respond")
            await event.respond("You took too long to respond :(")
            raise

        database = Database()
        try:
            entity = await database.channels_info.get_and_save(channel, event.client)
        except ChannelNotFoundOrIsInvalidError:
            # ``event.sender`` can be ``None`` in Telethon, so log the always
            # available ``sender_id`` instead of ``event.sender.username``.
            logger.trace(f"Channel {channel!r} that gave {event.sender_id} was not found.")
            await event.respond("Channel not found!")
            return
        except ProvidedIdIsNotAChannelError as exception:
            logger.trace(
                f"Channel {exception.channel_name!r} that gave {event.sender_id} is not a channel, "
                f"but is {exception.actual_type_name}."
            )
            await event.respond("This is not a channel!")
            return

        is_added = await database.add_channel(entity.username)
        await event.respond("Done!" if is_added else "This channel is already added!")