mutubot/main.py
2026-01-05 12:18:42 +01:00

205 lines
7.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import discord
import yaml
import requests
from datetime import date
import urllib.parse
from random import shuffle
import time
# To send discord messages (fucking async functions…)
import asyncio
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start()
from dotenv import load_dotenv
load_dotenv()
from email.message import EmailMessage
import smtplib, ssl
# Create a secure SSL context
ssl_context = ssl.create_default_context()
def load_guilds_data():
with open("guilds.yml", 'r') as stream:
try:
return yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
def send_mass_mail(guild, subject, content):
return send_mail(guild, guild['mailing'], subject, content)
def send_mail(guild, to, subject, content):
print(to, content)
msg = EmailMessage()
msg['Subject'] = subject
msg['From'] = guild['mail_from']
msg['To'] = to
msg.set_content(content)
with smtplib.SMTP_SSL(guild['smtp_host'], guild['smtp_port'], context=ssl_context) as server:
server.login(guild['smtp_username'], guild['smtp_pass'])
return server.send_message(msg)
return False
async def mail_message(message):
subject = message.content.split('\n')[0][:50]
# TODO add … if message is not complete on the subject
print('Discord message received: ', subject)
if subject.lower() == 'test':
print('Message de test détecté, il ne sera pas relayé')
await message.reply('Message de test détecté, il ne sera pas relayé')
else:
try:
send_mass_mail(guilds[message.guild.id], f'[Mutubot] Message discord de {message.author.display_name} : {subject}', f'{message.author.display_name}:\n{message.content}')
except Exception as e:
await message.reply('Le message semble ne pas sêtre envoyé à la mailing')
print('Message non envoyé à la mailing', e)
else:
await message.reply('Message envoyé à la mailing')
def req(url, data):
x = requests.post(url, headers={'Content-Type': 'application/x-www-form-urlencoded'}, data=data)
if x.status_code != 200:
print('ERROR', x)
print(x.content)
raise Exception('Request error ' + str(x.status_code))
# Load some data
TOKEN = os.getenv('DISCORD_TOKEN')
guilds = load_guilds_data()
reminder_channels = []
for i in guilds:
if 'reminder_channel' in guilds[i]:
reminder_channels.append(guilds[i]['reminder_channel'])
from html.parser import HTMLParser
class TokenFinder(HTMLParser):
def __init__(self, admin_url):
self.token = None
self.public_link = None
self.delete_links = []
self.public_links = []
self.admin_url = admin_url
super().__init__()
def handle_starttag(self, tag, attrs_tuple):
if tag != 'input' and tag != 'a':
return
attrs = dict(attrs_tuple)
if tag == 'input' and 'type' in attrs and attrs['type'] == 'hidden' and 'name' in attrs and attrs['name'] == 'control' and 'value' in attrs :
self.token = attrs['value']
elif tag == 'input' and 'id' in attrs and attrs['id'] == 'public-link' and 'value' in attrs :
self.public_link = attrs['value']
elif tag == 'a' and 'href' in attrs and attrs['href'].startswith(self.admin_url + '/action/delete_column/') :
self.delete_links.append(attrs['href'])
elif tag == 'a' and 'href' in attrs and self.public_link and attrs['href'].startswith(self.public_link + '/vote/') :
self.public_links.append(attrs['href'])
def scrap_framavote (admin_url):
# Parse html to find data
finder = TokenFinder(admin_url)
finder.feed(requests.get(admin_url).content.decode('UTF-8'))
return finder
def send_mails_vote(guild, finder):
shuffle(guild['members'])
for mail,link in zip(guild['members'], finder.public_links):
content = f"""
Ce mail remplace tous les précédents sil y en a !
Vous avez été convié·e à un vote anonyme. Voici le lien où voter :
{link}
"""
send_mail(guild, mail, '[Mutubot] Vous êtes convié·e à un vote anonyme', content)
time.sleep(2)
@scheduler.scheduled_job('cron', day=25)
def cleaner ():
for guild_id in guilds:
erase_framadate(guilds[guild_id]['framadate_week'])
erase_framadate(guilds[guild_id]['framadate_weekend'])
# TODO erase calc revenus ?
@scheduler.scheduled_job('cron', day=1)
def reminder ():
# TODO on dirait que ça ne senvoie quau premier serveur
for i in guilds:
print(f"reminding {i} : {guilds[i]['mailing']}")
message = generate_reminder_message(guilds[i])
send_mass_mail(guilds[i], '[Mutubot] La mutunion cest bientôt !', message)
channel = client.get_channel(guilds[i]['reminder_channel'])
asyncio.run_coroutine_threadsafe(channel.send(message), client.loop)
def generate_reminder_message (guild):
# If the 10 is weekend
sondage = guild['framadate_week']
if date.today().replace(day=10).weekday() > 4:
sondage = guild['framadate_weekend']
return f"""
Coucou !
Il est lheure de déclarer ses revenus :
<{guild['link_declaration']}>
Et dannoncer à quelle heure vous souhaitez faire la mutunion :
<{sondage}>
(Voter vide quand on ne peut pas cest rapide et utile !)
Bon début de mois :D
Le mutubot
"""
# Discord API
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
@client.event
async def on_ready():
print(f'{client.user} is connected to the following guild:\n')
for guild in client.guilds:
print(f'{guild.name} (id: {guild.id})')
@client.event
async def on_message(message):
if message.author == client.user:
return
if message.guild.id not in guilds:
return
if message.content.startswith('!randomvote '):
await message.reply('Vérifiez bien que le framadate a le bon nombre de ligne (une par votant·e) et de colonnes (une par option à voter) ? Répondez « Pamplemousse agrivoltaiste » pour confirmer que cest tout bon et lançer lenvoi des lien de vote')
return
elif message.content == 'Pamplemousse agrivoltaiste' and message.type == discord.MessageType.reply :
finder = scrap_framavote(guild['framavote'])
guild = guilds[message.guild.id]
nb_links = len(finder.public_links)
nb_membres = len(guild['members'])
if nb_links != nb_membres:
await message.reply("Il semblerait quil ny ait pas autant de lignes("+nb_links+") que de membres("+nb_membres+")…")
return
await message.reply("Ok je my met, patientez bien ça peut prendre plusieurs minutes")
send_mails_vote(guilds[message.guild.id], finder)
await message.reply("Cest fait ! Vérifiez que tout le monde a reçu les mails ; linformatique ce nest jamais bien déterministe… ")
return
elif message.channel.id in guilds[message.guild.id]['mailed_channels']:
await mail_message(message)
# Actually starts the bot
client.run(TOKEN)