Source code for twtb.logic.telegram.subscribe

"""Module for subscribing to all channels in the database."""
import asyncio
import typing as t

import telethon.tl.functions.channels
import telethon.tl.types
from loguru import logger

from twtb.logic.shared.db import Database


[docs]async def run_periodical_subscribing(client: telethon.TelegramClient) -> None: """Run periodical subscribing to all channels in the database. Args: client: Telethon's client object. Must not be bot. """ logger.trace("Starting periodical subscribing...") while True: asyncio.create_task(subscribe_to_all_channels(client)) await asyncio.sleep(60)
[docs]async def subscribe_to_all_channels(client: telethon.TelegramClient) -> None: """Subscribe to all channels in the database. Args: client: Telethon's client. Must not be bot. """ logger.trace("Subscribing to all channels...") channels = await Database().get_all_channels() if len(channels) == 0: logger.warning("No channels were added yet!") return channels_info = await get_info_about_channels(client, channels) for channel in map(lambda channel: channel.username, filter(lambda channel: channel.left, channels_info)): logger.trace(f"Subscribing to {channel}...") await client(telethon.tl.functions.channels.JoinChannelRequest(channel)) # subscribe await client( telethon.tl.functions.account.UpdateNotifySettingsRequest( peer=channel, settings=telethon.tl.types.InputPeerNotifySettings(mute_until=2**31 - 1) ) ) # disable notifications await client.edit_folder(channel, 1) # move to archive
[docs]async def get_info_about_channels( client: telethon.TelegramClient, channels: t.Set[str] ) -> t.Set[telethon.tl.types.TypeChat]: """Get info about channels from Telegram. Args: client: Telethon's client. channels: A list of channels to get info about. Returns: Channels' info. """ try: peer_dialogs = await client(telethon.tl.functions.messages.GetPeerDialogsRequest(peers=channels)) except TypeError as exception: if isinstance(exception.__context__, telethon.errors.rpcerrorlist.UsernameNotOccupiedError): channel_name = exception.__context__.request.username logger.info(f"Added channel {channel_name!r} does not exist!") if channel_name in channels and await Database().delete_channel(channel_name): channels.discard(channel_name) else: logger.error(f"Channel (that doesn't exist) is not in the database! {channel_name=}") return await get_info_about_channels(client, channels) raise return t.cast(t.Set[telethon.tl.types.TypeChat], peer_dialogs.chats)