contact-mailer/main.py
Adrian Amaglio 849f81fabd bit more doc
2020-05-01 16:00:10 +02:00

371 lines
12 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import bottle
request = bottle.request
response = bottle.response
redirect = bottle.redirect
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os # for environ vars
import sys # to print ot stderr
import re # to match our template system
import pymongo # database
from dotenv import load_dotenv
import random, string # for tokens
import html # for sanitization
##################################################### Bottle stuff ############################################$
# The exception that is thrown when an argument is missing
class MissingParameterException (Exception):
pass
class StripPathMiddleware(object):
'''
Get that slash out of the request
'''
def __init__(self, a):
self.a = a
def __call__(self, e, h):
e['PATH_INFO'] = e['PATH_INFO'].rstrip('/')
return self.a(e, h)
app = application = bottle.Bottle(catchall=False)
##################################################### Configuration ############################################$
def get_env(var, default=None):
"""var is an env var name, default is the value to return if var does not exist. If no default and no value, an exception is raised."""
if var in os.environ:
return os.environ[var]
elif default is not None:
return default
else:
raise MissingParameterException("Environment variable {} is missing".format(var))
# Token generation
token_chars = string.ascii_lowercase+string.ascii_uppercase+string.digits
token_len = 50
# form template regex
form_regex = '\{\{(\w+)(\|\w+)?\}\}'
# Load file from .env file.
load_dotenv(os.path.dirname(__file__) + '.env')
# Get address and port from env
listen_address = get_env('LISTEN_ADDRESS', '0.0.0.0')
listen_port = get_env('LISTEN_PORT', 8080)
# Get SMTP infos from env
smtp_server_address = get_env('SMTP_SERVER_ADDRESS')
smtp_server_port = get_env('SMTP_SERVER_PORT')
smtp_server_username = get_env('SMTP_SERVER_USERNAME')
smtp_server_password = get_env('SMTP_SERVER_PASSWORD')
smtp_server_sender = get_env('SMTP_SERVER_SENDER')
# Get mongodb connection
mongodb_host = get_env('MONGODB_HOST')
mongodb_port = get_env('MONGODB_PORT', '27017')
mongodb_dbname = get_env('MONGODB_DBNAME', 'contact_mailer')
# Security
admin_password = get_env('ADMIN_PASSWORD')
if 'SMTP_SSL' in os.environ and os.environ['SMTP_SSL'] == 'true':
security = 'ssl'
elif 'SMTP_STARTTLS' in os.environ and os.onviron['SMTP_STARTTLS'] == 'true':
security = 'starttls'
else:
raise MissingParameterException('No security env var (SMTP_SSL or SMTP_STARTTLS) have been defined. (Expected true or false)')
# mongodb initialization
mongodb_client = pymongo.MongoClient("mongodb://{}:{}/".format(mongodb_host, mongodb_port), connect=False, serverSelectionTimeoutMS=10000, connectTimeoutMS=10000)
mongodb_database = mongodb_client[mongodb_dbname]
##################################################### main route: mail submission ############################################$
@app.post('/submit')
def submission ():
# Getting subject
if 'token' in request.forms:
token = request.forms.getunicode('token')
else:
response.status = 400
return 'Le jeton dautentification est requis'
if 'mail' in request.forms:
from_address = request.forms.getunicode('mail')
else:
#response.status = 400
#return 'Le mail est requis'
from_address = ''
try:
form = mongodb_database['forms'].find({'token': token})[0]
except IndexError as e:
response.status = 400
return 'Le formulaire est introuvable'
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
try:
subject_fields = fill_fields(request, get_fields(form['subject']))
content_fields = fill_fields(request, get_fields(form['content']))
except MissingParameterException as e:
response.status = 404
return str(e)
subject = re.sub(form_regex, r'{\1}', form['subject']).format(**subject_fields)
content = re.sub(form_regex, r'{\1}', form['content']).format(**content_fields)
try:
if not send_mail(from_address, form['mail'], subject, content):
response.status = 500
return 'Le mail na pas pu être envoyé.'
except SMTPDataError as e:
response.status = 500
return 'Le mail a été refusé.'
# Redirection
#redirect(success_redirect_default)
origin = request.headers.get('origin')
return '<p>Mail envoyé !</p>' + ('<p>Retour au <a href="{}">formulaire de contact</a></p>'.format(origin) if origin else '')
##################################################### Helpers ############################################$
def get_fields (string):
""" Parse the string looking for template elements and create an array with template to fill and their default values. None if mandatory. """
result = {}
for match in re.findall(form_regex, string):
result[match[0]] = match[1]
return result
def fill_fields(request, fields):
"""Look for fields in request and fill fields dict with values or let default ones. If the value is required, throw exception."""
for field in fields:
if field in request.forms:
fields[field] = html.escape(request.forms[field])
elif fields[field] == None:
raise MissingParameterException("Le champs {} est obligatoire".format(field))
return fields
def send_mail(from_address, to, subject, content):
"""Actually connect to smtp server, build a message object and send it as a mail"""
msg = MIMEMultipart()
msg['From'] = smtp_server_sender
msg.add_header('reply-to', from_address)
msg['To'] = to
msg['Subject'] = subject
msg.attach(MIMEText(content, 'plain', "utf-8"))
# SMTP preambles
if security == 'ssl':
smtp = smtplib.SMTP_SSL(smtp_server_address, smtp_server_port)
elif security == 'starttls':
smtp = smtplib.SMTP(smtp_server_address, smtp_server_port)
smtp.ehlo()
if security == 'starttls':
smtp.starttls()
smtp.ehlo()
# SMTP connection
smtp.login(smtp_server_username, smtp_server_password)
refused = smtp.sendmail(smtp_server_sender, to, msg.as_string())
smtp.close()
if refused:
print('Message was not send to ' + str(refused))
return False
return True
def login(request):
"""
Check if user is admin or simple user. Return a disct with _privilege key. dict is also a user if _privilege == 1
Privileges : 0=admin 1=loggedIn 1000=guest
"""
if 'admin_pass' in request.forms and request.forms['admin_pass'] == admin_password:
return {'_privilege':0}
if 'token' in request.forms:
token = request.forms.getunicode('token')
try:
user = mongodb_database['users'].find({'token': token})[0]
user['_privilege'] = 1
return user
except IndexError as e:
pass
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
return {'_privilege': 1000} # anonymous
##################################################### Forms ############################################$
@app.post('/form')
def create_form ():
# Getting subject template
if 'subject' in request.forms:
subject = request.forms.getunicode('subject')
elif mail_default_subject != '':
subject = mail_default_subject
else:
response.status = 400
return 'Le champs « sujet » est requis'
# Getting mail content
if 'content' in request.forms:
content = request.forms.getunicode('content')
else:
response.status = 400
return 'Le champs « contenu » est requis'
# Getting from address
if 'mail' in request.forms:
mail = request.forms.getunicode('mail')
else:
response.status = 400
return 'Le champs « adresse » est requis'
user = login(request)
if user['_privilege'] > 1:
response.status = 400
return 'Privilèges insufisants'
# TODO limit the insertion rate
token = ''.join(random.sample(token_chars, token_len))
try:
inserted = mongodb_database['forms'].insert_one({
'mail': mail,
'content': content,
'subject': subject,
'user_id': user['_id'],
'token': token,
})
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
return 'Créé : ' + token
@app.post('/form/list')
def list_forms ():
try:
user = login(request)
if user['_privilege'] == 0:
filt = {}
elif user['_privilege'] == 1:
filt = {'user_id': user['_id']}
else:
response.status = 400
return 'Privilèges insufisants'
data = mongodb_database['forms'].find(filt)
return bottle.template("list.tpl", data=data)
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
@app.delete('/form/<token>')
def delete_form(token):
# If admin or form owner
user = login(request)
if user['_privilege'] > 1:
response.status = 400
return 'Privilèges insufisants'
# Actually delete
try:
form = mongodb_database['forms'].find({'token':token })[0]
except IndexError as e:
response.status = 400
return 'Le token nest pas valide'
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
if user['_privilege'] == 0 or (form['user_id'] == user['_id']):
try:
mongodb_database['forms'].delete_one({
'token': token,
})
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
return 'Supprimé ' + token
response.status = 400
return 'Privilèges insufisants'
##################################################### Users ############################################$
@app.post('/user/list')
def list_users ():
user = login(request)
if user['_privilege'] > 0:
response.status = 400
return 'Privilèges insufisants'
try:
data = mongodb_database['users'].find()
return bottle.template("list.tpl", data=data)
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
@app.put('/user/<username>')
def create_user (username):
user = login(request)
if user['_privilege'] > 0:
response.status = 400
return 'Privilèges insufisants'
try:
mongodb_database['users'].find({'username': username})[0]
return 'Lutilisateur existe déjà'
except IndexError as e:
try:
inserted = mongodb_database['users'].insert_one({
'username': username,
'token': ''.join(random.sample(token_chars, token_len))
})
return 'Créé : ' + username
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
@app.delete('/user/<username>')
def delete_user (username):
user = login(request)
if user['_privilege'] > 0:
response.status = 400
return 'Privilèges insufisants'
try:
mongodb_database['users'].find({'username': username})[0]
mongodb_database['users'].delete_one({
'username': username,
})
return 'Supprimé ' + username
except IndexError as e:
response.status = 400
return 'Lutilisateur nexiste pas'
except pymongo.errors.ServerSelectionTimeoutError as e:
response.status = 500
return 'La base de donnée nest pas accessible'
##################################################### app startup ############################################$
if __name__ == '__main__':
bottle.run(app=StripPathMiddleware(app), host=listen_address, port=listen_port, debug=True)
else:
prod_app = StripPathMiddleware(app)