Telethon Snippets.bak
128 subscribers
1 link
You can find useful snippets for telethon here.
If you want to add any, make a submission in @TelethonChat.
Download Telegram
Unless otherwise specified, the following applies to all snippets:

from telethon import TelegramClient, sync, errors
from telethon.tl import functions as f, types as t
client = TelegramClient(...).start() # successfully logged in
👍1
Delete all migrated chats from your dialogs

for d in client.iter_dialogs():
if d.is_group and not d.is_channel:
if d.entity.migrated_to:
client(f.messages.DeleteHistoryRequest(d.input_entity.chat_id, client.get_messages(d.input_entity)[0].id, False))

#delete #migrated
👍1
Counts the amount of photos above/below a given size threshold

threshold = 75 * 1024 # 75kb
below = 0
above = 0
chat = 'mychat'
filter = t.InputMessagesFilterPhotos

for m in client.iter_messages(chat, filter=filter):
if m.photo.sizes[-1].size < threshold:
below += 1
else:
above += 1
print('Below: {}\nAbove: {}\nTotal: {}'.format(below, above, below + above))

#count #media #photos
👍1
Reply to a message with "haste" to upload to hastebin and edit your message to the URL
Note: does not handle failure of http request.

import requests

@client.on(events.NewMessage(outgoing=True, pattern="(?i)^haste$"))
def haste(e):
if e.is_reply and e.get_reply_message().message:
text = e.get_reply_message().text
e.edit("[Haste link to the above](https://hastebin.com/{}) ".format(requests.post("https://hastebin.com/documents", data=text).json()['key']))


#hastebin #haste
👍1
Block, report and clear all new PMs. Use with caution

whitelist = {d.id for d in client.get_dialogs() if d.id > 0}

@client.on(events.NewMessage)
async def handler(event):
if not event.is_private or event.chat_id in whitelist:
return
who = await event.get_input_chat()
await client(f.messages.ReportSpamRequest(who))
await client(f.contacts.BlockRequest(who))
await client(f.messages.DeleteHistoryRequest(who, 0))

#block #PM #report
👍1
Get % of people who participated in the last 5000 messages

group = 'somegroup'
people = {}
total = client.get_participants(group, limit=0).total
for m in client.iter_messages(group, limit=5000):
if not m.sender.bot:
people[m.sender.id] = m.sender.first_name
print(len(people)/total)


#analytics #stats #participation
👍1
Get the usage count of the top 50 words in the chat

class custom(dict):
def __missing__(self, key): return 0

words = custom()
progress = event.reply("Processed 0 messages")
total = 0
for msg in client.iter_messages(group):
total += 1
if total % 200 == 0:
progress.edit("Processed {} messages".format(total))
if msg.text:
for word in msg.text.split():
words[word.lower()] += 1
global freq
freq = sorted(words, key=words.get, reverse=True)
out = ""
for i in range(51):
out += "{}. {}:{}\n".format(i+1, words[freq[i]], freq[i])

progress.edit(out, parse_mode=None)


#analytics #stats #words
👍1
Helper function for determining how long ago a user was online.
Sample usage: within_6_days = [x for x in client.iter_participants(group) if online_within(x, 6)]

import datetime

def online_within(participant, days):
status = participant.status
if isinstance(status, t.UserStatusOnline):
return True

last_seen = status.was_online if isinstance(status, t.UserStatusOffline) else None

if last_seen:
now = datetime.datetime.now(tz=datetime.timezone.utc)
diff = now - last_seen
return diff <= datetime.timedelta(days=days)

if isinstance(status, t.UserStatusRecently) and days >= 1 \
or isinstance(status, t.UserStatusLastWeek) and days >= 7 \
or isinstance(status, t.UserStatusLastMonth) and days >= 30:
return True

return False


#filter #status #user
1
Example on how to send a file multiple times with only one upload:

file = client.upload_file(path_to_photo)
#send as photo
client.send_file(chat, file)
#send as doc
client.send_file(chat, file, force_document=True)

#file #upload #cache
Note that this only works in master branch at time of writing. Expect implementation in v1.6
🔥3🥰1
Count amount of stickers installed and print some stats

sets = client(functions.messages.GetAllStickersRequest(0)).sets
scnt = sum(x.count for x in sets)
print(f'{scnt} stickers across {len(sets)} sets (average of {scnt / len(sets):.2f} stickers per pack)')


#stickers #stats
🔥1
Not a telethon snippet but very useful nonetheless:
Log to a rotating file. Blood/sweat/tears were shed in the discovery of this

logger = logging.getLogger()
level = logging.INFO
logger.setLevel(level)
# 5*5MB log files max:
h = logging.handlers.RotatingFileHandler('/path/to/log/file', encoding='utf-8', maxBytes=5 * 1024 * 1024, backupCount=5)
# example of format: 2019-04-05 20:28:45,944 INFO: blah
h.setFormatter(logging.Formatter("%(asctime)s\t%(levelname)s:\t%(message)s"))
h.setLevel(level)
logger.addHandler(h)

#logging
😁1
Print out all the people that have unicode in their names, along with what their name would look like without it

for member in client.iter_participants(some_chat):
x = member.first_name
y = member.first_name.encode().decode('ascii', 'ignore')
if x != y:
print(f"{x}\t|\t{y}")


#members #ascii #unicode
👍1🔥1
Print out all the channels you created and left

with client.takeout() as takeout:
channels = [c for c in takeout(f.channels.GetLeftChannelsRequest(0)).chats if c.creator]
for channel in channels:
print(channel.title)
#do other stuff with channel if you want


#channels #takeout #left
👍1
You know what it does

from random import shuffle
members = client.get_participants(channel, limit=None)
shuffle(members)
for i in range(members.total//2):
client(f.channels.EditBannedRequest(channel, members[i], t.ChannelBannedRights(0, view_messages=True)))


#purple #guy #meme
👍1
Remove all deleted accounts

from time import time
from datetime import datetime

# ban for a day, to remove from banned list automatically
rights = t.ChatBannedRights(datetime.fromtimestamp(time()+86000), view_messages=True)
ban = f.channels.EditBannedRequest
requests = []
for m in client.iter_participants(somechat):
if m.deleted:
requests.append(ban(somechat, m, rights))

client(requests)

#clean #ban #deleted
Delete all messages from now until the replied one (inclusive)

@client.on(events.NewMessage(outgoing=True, pattern="!purge", func=lambda e: e.is_reply))
def purge(event):
reply = event.get_reply_message()
temp = [event.id]
for msg in client.get_messages(event.input_chat, min_id=reply.id-1, max_id=event.id):
temp.append(msg.id)
client.delete_messages(event.input_chat, temp)


#delete #purge
👍1
Note that I'm still not using async so you'll need to adapt any code that doesn't work. Should be simple though
Event filter decorator for whether a user or bot


import functools

def filter_user_type(kind, _cache={}):
if kind == 'user':
bot = False
elif kind == 'bot':
bot = True
else:
raise TypeError('user type must be either "user" or "bot"')

def decorator(func):
@functools.wraps(func)
async def wrapped(event, *args, **kwargs):
if not event.is_private:
return await func(event, *args, **kwargs)
if event.sender_id not in _cache:
_cache[event.sender_id] = (await event.get_sender()).bot
if _cache[event.sender_id] == bot:
return await func(event, *args, **kwargs)
return wrapped
return decorator

Usage:

@client.on(events.NewMessage)
@filter_user_type('bot')
async def foo(e):
print('found a bot!')
Reply to a message with !fixreply to re-send your latest message as a reply to it

@client.on(events.NewMessage(outgoing=True, pattern="^!fixreply", func=lambda e: e.reply_to_msg_id))
def fixreply(event):
try:
oldmsg = client.get_messages(event.input_chat, limit=2, from_user=client.iss.one)[1]
if oldmsg.media and not isinstance(oldmsg.media, t.iss.onessageMediaWebPage):
client.send_message(event.input_chat, oldmsg, reply_to=event.reply_to_msg_id)
event.delete()
else:
event.edit(oldmsg.text)
except Exception:
event.delete()
else:
oldmsg.delete()

#fixreply
🔥2