contact-mailer/main.py
Adrian Amaglio 6828c5684c v2
2020-04-30 00:08:00 +02:00

324 lines
11 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hmac
import html # for sanitization
import os
import random, string
import re
import secrets
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import bottle
import pymongo
from dotenv import load_dotenv

request = bottle.request
response = bottle.response
redirect = bottle.redirect
# Load settings from the ".env" file located next to this script.
# os.path.join supplies the path separator: dirname() returns the directory
# without a trailing slash, so plain concatenation pointed at "<dir>.env"
# instead of "<dir>/.env".
load_dotenv(os.path.join(os.path.dirname(__file__), '.env'))
# Alphabet and length used when generating user and form tokens
token_chars = string.ascii_lowercase+string.ascii_uppercase+string.digits
token_len = 50
class MissingParameterException(Exception):
    """Raised when a required argument or environment variable is missing."""
# The Bottle application object, bound to both `application` and `app`.
# The route decorators in this file use `app`.
application = bottle.Bottle()
app = application
# Optional settings: each one falls back to a default when the environment
# variable is not set.

# Address and port to listen on
listen_address = os.environ.get('LISTEN_ADDRESS', '0.0.0.0')
listen_port = os.environ.get('LISTEN_PORT', 8080)
# Mail subject settings
mail_default_subject = os.environ.get('MAIL_DEFAULT_SUBJECT', 'Nouveau message')
mail_subject_prefix = os.environ.get('MAIL_SUBJECT_PREFIX', '[Contact]')
# Redirect targets
success_redirect_default = os.environ.get('SUCCESS_REDIRECT_DEFAULT', '/success')
failure_redirect_default = os.environ.get('FAILURE_REDIRECT_DEFAULT', '/fail')
def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        MissingParameterException: if the variable is not set.
    """
    try:
        return os.environ[name]
    except KeyError:
        raise MissingParameterException(
            "Environment variable {} is missing".format(name)) from None


# Get SMTP infos from env (all mandatory)
smtp_server_address = _require_env('SMTP_SERVER_ADDRESS')
smtp_server_port = _require_env('SMTP_SERVER_PORT')
smtp_server_username = _require_env('SMTP_SERVER_USERNAME')
smtp_server_password = _require_env('SMTP_SERVER_PASSWORD')
smtp_server_sender = _require_env('SMTP_SERVER_SENDER')
# Get mongodb connection (host is mandatory, port and database name are optional)
mongodb_host = _require_env('MONGODB_HOST')
mongodb_port = os.environ.get('MONGODB_PORT', '27017')
mongodb_dbname = os.environ.get('MONGODB_DBNAME', 'contact_mailer')
# Security: exactly the string 'true' enables a mode; SMTP_SSL takes
# precedence over SMTP_STARTTLS when both are enabled.
if os.environ.get('SMTP_SSL') == 'true':
    security = 'ssl'
elif os.environ.get('SMTP_STARTTLS') == 'true':
    security = 'starttls'
else:
    raise MissingParameterException('No security env var (SMTP_SSL or SMTP_STARTTLS) have been defined. (Expected true or false)')
# Password protecting the admin endpoints
admin_password = _require_env('ADMIN_PASSWORD')
# Open the MongoDB connection and select the application's database
mongodb_uri = "mongodb://{host}:{port}/".format(host=mongodb_host, port=mongodb_port)
mongodb_client = pymongo.MongoClient(mongodb_uri)
mongodb_database = mongodb_client[mongodb_dbname]
# Form template placeholder: {{name}} or {{name|default}}.
# Group 1 is the field name, group 2 the optional "|default" part (pipe included).
# Raw string so the regex escapes are not treated as (invalid) string escapes.
form_regex = r'\{\{(\w+)(\|\w+)?\}\}'
def _render_form_template(template, values):
    """Replace every {{name}} / {{name|default}} placeholder of *template* by values[name]."""
    # Substituting directly (instead of going through str.format) keeps any
    # other brace found in the template from being interpreted.
    return re.sub(form_regex, lambda match: values[match.group(1)], template)


@app.post('/submit')
@app.post('/submit/')
def submission():
    """Handle a contact form submission and mail it to the form's address.

    Reads the POST fields ``token`` (token of a stored form) and ``mail``
    (sender address, used as Reply-To), plus the fields referenced by the
    form's subject and content templates.

    Returns a plain-text message. ``response.status`` is set to 400 when the
    request is invalid and to 500 when the mail could not be sent.
    """
    token = request.forms.getunicode('token')
    if token is None:
        response.status = 400
        return "Le jeton d'autentification est requis"
    from_address = request.forms.getunicode('mail')
    if from_address is None:
        response.status = 400
        return 'Le mail est requis'
    # The address ends up in a mail header: refuse line breaks
    if '\r' in from_address or '\n' in from_address:
        response.status = 400
        return 'Le mail est invalide'

    form = mongodb_database['forms'].find_one({'token': token})
    if form is None:
        response.status = 400
        return "L'authentification a échouée"

    try:
        subject_fields = fill_fields(request, get_fields(form['subject']))
        content_fields = fill_fields(request, get_fields(form['content']))
    except MissingParameterException as e:
        # A missing field is a client error, like the other validation failures
        response.status = 400
        return str(e)

    subject = _render_form_template(form['subject'], subject_fields)
    # The subject is a single-line header: collapse any line break coming
    # from submitted values
    subject = ' '.join(subject.splitlines())
    content = _render_form_template(form['content'], content_fields)

    try:
        sent = send_mail(from_address, form['mail'], subject, content)
    except (smtplib.SMTPException, OSError) as e:
        print('Message could not be sent: ' + str(e))
        sent = False
    if not sent:
        response.status = 500
        return "Le mail n'a pas pu être envoyé."

    # Redirection
    #redirect(success_redirect_default)
    return 'Mail envoyé !'
def get_fields(string, pattern=None):
    """Parse a template and return its placeholders with their default values.

    Args:
        string: the template text, containing ``{{name}}`` or
            ``{{name|default}}`` placeholders.
        pattern: regular expression used to find placeholders; it must have
            two groups (name, optional "|default"). Defaults to the
            module-level ``form_regex``.

    Returns:
        A dict mapping each field name to its default value, or to ``None``
        when the field has no default (i.e. it is mandatory). When a name
        appears several times, the last occurrence wins.
    """
    if pattern is None:
        pattern = form_regex
    result = {}
    for name, default in re.findall(pattern, string):
        # findall yields '' for an absent optional group, and the captured
        # default still carries its leading '|': normalise both cases.
        result[name] = default[1:] if default else None
    return result
def fill_fields(request, fields):
    """Fill *fields* with the values submitted in *request*.

    Args:
        request: object whose ``forms`` attribute holds the submitted form
            fields (supports ``in`` and ``getunicode``).
        fields: dict mapping field names to their default value, ``None``
            meaning the field is mandatory. It is updated in place.

    Returns:
        The same *fields* dict, where every submitted field holds its
        HTML-escaped value and the others keep their default.

    Raises:
        MissingParameterException: if a mandatory field was not submitted.
    """
    for name, default in fields.items():
        # getunicode, like the rest of the file, so that non-ASCII input is
        # decoded properly; it returns None when the value cannot be decoded,
        # which is handled like a field that was not submitted.
        value = request.forms.getunicode(name) if name in request.forms else None
        if value is not None:
            fields[name] = html.escape(value)
        elif default is None:
            raise MissingParameterException("Le champs {} est obligatoire".format(name))
    return fields
def send_mail(from_address, to, subject, content):
    """Build a plain-text message and send it through the configured SMTP server.

    Args:
        from_address: address of the person who filled the form; set as the
            Reply-To header (the From header is always the configured sender).
        to: recipient address.
        subject: mail subject.
        content: mail body, sent as UTF-8 plain text.

    Returns:
        True when the message was accepted for every recipient, False when
        the SMTP exchange failed or a recipient was refused.

    Raises:
        ValueError: if the configured security mode is neither 'ssl' nor
            'starttls'.
    """
    msg = MIMEMultipart()
    msg['From'] = smtp_server_sender
    msg.add_header('reply-to', from_address)
    msg['To'] = to
    msg['Subject'] = subject
    msg.attach(MIMEText(content, 'plain', "utf-8"))
    payload = msg.as_string()

    try:
        # SMTP preambles
        if security == 'ssl':
            smtp = smtplib.SMTP_SSL(smtp_server_address, smtp_server_port)
        elif security == 'starttls':
            smtp = smtplib.SMTP(smtp_server_address, smtp_server_port)
        else:
            raise ValueError('Unsupported SMTP security mode: ' + str(security))
        # The context manager closes the connection even when a step fails
        with smtp:
            smtp.ehlo()
            if security == 'starttls':
                smtp.starttls()
                smtp.ehlo()
            # SMTP connection
            smtp.login(smtp_server_username, smtp_server_password)
            refused = smtp.sendmail(smtp_server_sender, to, payload)
    except (smtplib.SMTPException, OSError) as e:
        # Callers only test the boolean result: report the failure that way
        print('Message could not be sent: ' + str(e))
        return False

    if refused:
        print('Message was not send to ' + str(refused))
        return False
    return True
@app.post('/form')
@app.post('/form/')
def create_form():
    """Register a new form for an existing user and return the form token.

    Reads the POST fields ``subject`` (optional when a default subject is
    configured), ``content``, ``mail`` and ``token`` (the user's token).

    Returns a plain-text message containing the new form token, or an error
    message with ``response.status`` set to 400.
    """
    # Getting subject template
    if 'subject' in request.forms:
        subject = request.forms.getunicode('subject')
    elif mail_default_subject != '':
        subject = mail_default_subject
    else:
        response.status = 400
        return 'Le champs « sujet » est requis'

    # Getting mail content
    if 'content' not in request.forms:
        response.status = 400
        return 'Le champs « contenu » est requis'
    content = request.forms.getunicode('content')

    # Getting the address the form will be mailed to
    if 'mail' not in request.forms:
        response.status = 400
        return 'Le champs « adresse » est requis'
    mail = request.forms.getunicode('mail')

    # Getting auth token
    if 'token' not in request.forms:
        response.status = 400
        return "Le jeton d'autentification n'a pas été envoyé"
    user_token = request.forms.getunicode('token')

    user = mongodb_database['users'].find_one({'token': user_token})
    if user is None:
        response.status = 400
        return "L'authentification a échouée"

    # TODO limit the insertion rate
    # The form token is a credential: draw it from a cryptographically secure
    # source, each character chosen independently.
    form_token = ''.join(secrets.choice(token_chars) for _ in range(token_len))
    mongodb_database['forms'].insert_one({
        'mail': mail,
        'content': content,
        'subject': subject,
        'user_id': user['_id'],
        'token': form_token,
    })
    return 'Créé : ' + form_token
##################################################### Admin ############################################$
@app.post('/admin/list')
@app.post('/admin/list/')
def admin_list():
    """Render the "list.tpl" template for an authenticated admin.

    Reads the POST field ``admin_pass``; when it is absent or wrong, sets
    ``response.status`` to 400 and returns an error message.
    """
    supplied = request.forms.getunicode('admin_pass')
    # Constant-time comparison, on bytes so that non-ASCII passwords work
    if supplied is None or not hmac.compare_digest(
            supplied.encode('utf-8'), admin_password.encode('utf-8')):
        response.status = 400
        return 'Le champs « admin_pass » est requis'
    return bottle.template("list.tpl", mongodb_database=mongodb_database)
@app.put('/user/<username>')
@app.put('/user/<username>/')
def create_user(username):
    """Create the user *username* with a freshly generated token.

    Requires the ``admin_pass`` form field; when it is absent or wrong, sets
    ``response.status`` to 400. Returns a plain-text message, including when
    the user already exists (no user is created in that case).
    """
    supplied = request.forms.getunicode('admin_pass')
    # Constant-time comparison, on bytes so that non-ASCII passwords work
    if supplied is None or not hmac.compare_digest(
            supplied.encode('utf-8'), admin_password.encode('utf-8')):
        response.status = 400
        return 'Le champs « admin_pass » est requis'

    if mongodb_database['users'].find_one({'username': username}) is not None:
        return "L'utilisateur existe déjà"

    mongodb_database['users'].insert_one({
        'username': username,
        # The token is a credential: draw it from a cryptographically secure
        # source, each character chosen independently.
        'token': ''.join(secrets.choice(token_chars) for _ in range(token_len)),
    })
    return 'Créé : ' + username
@app.delete('/user/<username>')
@app.delete('/user/<username>/')
def delete_user(username):
    """Delete the user *username*.

    Requires the ``admin_pass`` form field. Sets ``response.status`` to 400
    when the password is absent or wrong, or when the user does not exist.
    Returns a plain-text message.
    """
    supplied = request.forms.getunicode('admin_pass')
    # Constant-time comparison, on bytes so that non-ASCII passwords work
    if supplied is None or not hmac.compare_digest(
            supplied.encode('utf-8'), admin_password.encode('utf-8')):
        response.status = 400
        return 'Le champs « admin_pass » est requis'

    # Delete directly and look at the outcome rather than querying first
    result = mongodb_database['users'].delete_one({
        'username': username,
    })
    if result.deleted_count == 0:
        response.status = 400
        return "L'utilisateur n'existe pas"
    return 'Supprimé ' + username
@app.delete('/form/<token>')
@app.delete('/form/<token>/')
def delete_form(token):
    """Delete the form identified by *token*.

    The request is allowed when the ``admin_pass`` form field matches the
    admin password, or when the ``token`` form field is the token of the user
    owning the form. Sets ``response.status`` to 400 when the form does not
    exist or the caller is not allowed to delete it. Returns a plain-text
    message.
    """
    # If admin or form owner
    supplied = request.forms.getunicode('admin_pass')
    # Constant-time comparison, on bytes so that non-ASCII passwords work
    is_admin = supplied is not None and hmac.compare_digest(
        supplied.encode('utf-8'), admin_password.encode('utf-8'))

    owner = None
    user_token = request.forms.getunicode('token')
    if user_token is not None:
        owner = mongodb_database['users'].find_one({'token': user_token})

    # Actually delete
    form = mongodb_database['forms'].find_one({'token': token})
    if form is None:
        response.status = 400
        return "Le token n'est pas valide"

    if is_admin or (owner is not None and form['user_id'] == owner['_id']):
        mongodb_database['forms'].delete_one({
            'token': token,
        })
        return 'Supprimé ' + token

    response.status = 400
    return "Vous n'avez pas les droits pour supprimer ce formulaire"
class StripPathMiddleware(object):
    '''
    WSGI middleware removing the trailing slashes of the request path
    before handing the request to the wrapped application.
    '''
    def __init__(self, a):
        # a: the wrapped WSGI application
        self.a = a

    def __call__(self, e, h):
        # e: WSGI environ dict, h: start_response callable.
        # A server may omit PATH_INFO when it is empty, hence the default.
        e['PATH_INFO'] = e.get('PATH_INFO', '').rstrip('/')
        return self.a(e, h)
if __name__ == '__main__':
    # Script entry point: serve the app wrapped in the slash-stripping middleware.
    # NOTE(review): debug=True is hard-coded here; confirm this entry point is
    # only meant for development.
    wrapped_app = StripPathMiddleware(app)
    bottle.run(app=wrapped_app, host=listen_address, port=listen_port, debug=True)