contact-mailer/main.py

356 lines
12 KiB
Python
Raw Normal View History

2020-04-29 22:07:36 +00:00
import hmac  # constant-time comparison of secrets
import html  # for sanitization
import os  # for environ vars
import random  # for tokens
import re  # to match our template system
import secrets  # for unguessable tokens
import smtplib
import string  # for tokens
import sys  # to print to stderr
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import bottle
import pymongo  # database
from dotenv import load_dotenv

request = bottle.request
response = bottle.response
redirect = bottle.redirect
2020-05-01 06:24:21 +00:00
##################################################### Bottle stuff ############################################$
2020-04-29 22:07:36 +00:00
class MissingParameterException(Exception):
    """Raised when a required environment variable or form field is missing."""
2020-05-01 06:24:21 +00:00
class StripPathMiddleware(object):
    """
    WSGI middleware that removes trailing slashes from the request path
    before handing the request over to the wrapped application.
    """

    def __init__(self, a):
        # `a` is the wrapped WSGI callable.
        self.a = a

    def __call__(self, e, h):
        # `e` is the WSGI environ dict, `h` is passed through untouched.
        stripped_path = e['PATH_INFO'].rstrip('/')
        e['PATH_INFO'] = stripped_path
        wrapped_app = self.a
        return wrapped_app(e, h)
2020-04-29 22:07:36 +00:00
app = application = bottle.Bottle()

##################################################### Configuration ############################################$


def _require_env(name):
    """Return the value of the environment variable `name`.

    Raises MissingParameterException when the variable is not set.
    """
    if name not in os.environ:
        raise MissingParameterException("Environment variable {} is missing".format(name))
    return os.environ[name]


# Load settings from the .env file located in the same directory as this
# module. The path is joined properly: plain string concatenation would turn
# '/srv/app' + '.env' into '/srv/app.env'.
load_dotenv(os.path.join(os.path.dirname(os.path.abspath(__file__)), '.env'))

# Alphabet and length of the random tokens handed out to users and forms
token_chars = string.ascii_letters + string.digits
token_len = 50

# Get address and port from env
listen_address = os.environ.get('LISTEN_ADDRESS', '0.0.0.0')
# Environment values are strings: convert so the port is always an int
listen_port = int(os.environ.get('LISTEN_PORT', 8080))

# Get mail related information from env
mail_default_subject = os.environ.get('MAIL_DEFAULT_SUBJECT', 'Nouveau message')
mail_subject_prefix = os.environ.get('MAIL_SUBJECT_PREFIX', '[Contact]')

# Redirect info
success_redirect_default = os.environ.get('SUCCESS_REDIRECT_DEFAULT', '/success')
failure_redirect_default = os.environ.get('FAILURE_REDIRECT_DEFAULT', '/fail')

# Get SMTP info from env (all mandatory)
smtp_server_address = _require_env('SMTP_SERVER_ADDRESS')
smtp_server_port = _require_env('SMTP_SERVER_PORT')
smtp_server_username = _require_env('SMTP_SERVER_USERNAME')
smtp_server_password = _require_env('SMTP_SERVER_PASSWORD')
smtp_server_sender = _require_env('SMTP_SERVER_SENDER')

# Get mongodb connection
mongodb_host = _require_env('MONGODB_HOST')
mongodb_port = os.environ.get('MONGODB_PORT', '27017')
mongodb_dbname = os.environ.get('MONGODB_DBNAME', 'contact_mailer')

# Security: SMTP_SSL takes precedence over SMTP_STARTTLS; one of them must be 'true'
if os.environ.get('SMTP_SSL') == 'true':
    security = 'ssl'
elif os.environ.get('SMTP_STARTTLS') == 'true':
    security = 'starttls'
else:
    raise MissingParameterException('No security env var (SMTP_SSL or SMTP_STARTTLS) have been defined. (Expected true or false)')

admin_password = _require_env('ADMIN_PASSWORD')

# mongodb initialization
mongodb_client = pymongo.MongoClient("mongodb://{}:{}/".format(mongodb_host, mongodb_port))
mongodb_database = mongodb_client[mongodb_dbname]

# form template regex: group 1 is the field name, group 2 the optional
# "|default" part (pipe included), e.g. {{name}} or {{name|anonymous}}
form_regex = r'\{\{(\w+)(\|\w+)?\}\}'
2020-04-30 16:46:01 +00:00
@app.post('/fail')
def fail():
    """Failure route: log the failure on stderr and answer 'failed'."""
    # A leftover `2/0` used to raise ZeroDivisionError here, which made the
    # log line and the response below unreachable.
    print('lol, failed', file=sys.stderr)
    return 'failed'
2020-04-29 22:07:36 +00:00
@app.post('/submit')
def submission():
    """Handle a contact form submission and send the matching mail.

    Expected form data:
      - token: token of the stored form to use (required)
      - mail: address of the sender, used as reply-to (optional)
      - any field referenced by the stored subject/content templates

    Answers 400 when the token is missing or unknown, or when a mandatory
    template field is missing, and 500 when the mail could not be sent.
    On success, returns a small HTML confirmation.
    """
    # The token identifies which stored form (recipient + templates) to use
    if 'token' not in request.forms:
        response.status = 400
        return 'Le jeton dautentification est requis'
    token = request.forms.getunicode('token')

    # The sender address is optional; an empty string means "none given"
    from_address = request.forms.getunicode('mail') if 'mail' in request.forms else ''

    form = mongodb_database['forms'].find_one({'token': token})
    if form is None:
        response.status = 400
        return 'Le formulaire est introuvable'

    try:
        subject_fields = fill_fields(request, get_fields(form['subject']))
        content_fields = fill_fields(request, get_fields(form['content']))
    except MissingParameterException as e:
        # A missing field is a client error, like the other validation failures
        response.status = 400
        return str(e)

    # Substitute each {{field}} / {{field|default}} marker directly. Going
    # through str.format() would choke on any other brace in the template and
    # on numeric field names.
    subject = re.sub(form_regex, lambda m: str(subject_fields[m.group(1)]), form['subject'])
    content = re.sub(form_regex, lambda m: str(content_fields[m.group(1)]), form['content'])

    if not send_mail(from_address, form['mail'], subject, content):
        response.status = 500
        return 'Le mail na pas pu être envoyé.'

    # Offer a link back to the page the form was posted from, when known.
    # The header value comes from the client, so escape it before it lands
    # inside an HTML attribute.
    origin = request.headers.get('origin')
    reply = '<p>Mail envoyé !</p>'
    if origin:
        reply += '<p>Retour au <a href="{}">formulaire de contact</a></p>'.format(html.escape(origin, quote=True))
    return reply
2020-04-29 22:07:36 +00:00
2020-04-30 20:38:33 +00:00
##################################################### Helpers ############################################$
2020-04-29 22:07:36 +00:00
def get_fields(string):
    """Parse a template and return its fields with their default values.

    Template markers look like {{name}} or {{name|default}}.

    Returns a dict mapping each field name to its default value (without the
    leading pipe), or to None when the field has no default and is therefore
    mandatory. When a field appears several times, the last occurrence wins.
    """
    result = {}
    for match in re.finditer(form_regex, string):
        # group(2) is None when the "|default" part is absent. re.findall
        # would report '' instead, which made every field look optional.
        default = match.group(2)
        result[match.group(1)] = default[1:] if default is not None else None
    return result
def fill_fields(request, fields):
    """Fill `fields` with the values submitted in `request.forms`.

    `fields` maps field names to a default value, or to None when the field
    is mandatory. Submitted values are HTML-escaped; fields that were not
    submitted keep their default.

    The dict is updated in place and also returned.

    Raises MissingParameterException when a mandatory field (default None)
    is absent from the request.
    """
    for field in fields:
        if field in request.forms:
            # getunicode() is what the rest of the file uses to read form values
            fields[field] = html.escape(request.forms.getunicode(field))
        elif fields[field] is None:
            raise MissingParameterException("Le champs {} est obligatoire".format(field))
    return fields
def send_mail(from_address, to, subject, content):
    """Build a plain-text mail and send it through the configured SMTP server.

    Parameters:
      from_address: address put in the reply-to header; skipped when empty
      to: recipient address
      subject: subject line
      content: plain-text body (sent as UTF-8)

    Returns True when the server accepted the message for the recipient,
    False when it was refused or when the SMTP exchange failed. The reason is
    printed on stderr.
    """
    msg = MIMEMultipart()
    msg['From'] = smtp_server_sender
    # The sender address is optional: do not emit an empty reply-to header
    if from_address:
        msg.add_header('reply-to', from_address)
    msg['To'] = to
    msg['Subject'] = subject
    msg.attach(MIMEText(content, 'plain', "utf-8"))

    # 'ssl' connects over TLS right away; 'starttls' connects in clear and
    # upgrades the connection below
    if security == 'ssl':
        smtp_class = smtplib.SMTP_SSL
    else:
        smtp_class = smtplib.SMTP

    try:
        # The context manager closes the connection even when a step fails
        with smtp_class(smtp_server_address, smtp_server_port) as smtp:
            smtp.ehlo()
            if security == 'starttls':
                smtp.starttls()
                smtp.ehlo()
            smtp.login(smtp_server_username, smtp_server_password)
            refused = smtp.sendmail(smtp_server_sender, to, msg.as_string())
    except (smtplib.SMTPException, OSError) as error:
        # Connection, authentication and delivery errors all mean "not sent";
        # report them through the return value the caller already checks
        print('Message could not be sent: ' + str(error), file=sys.stderr)
        return False

    if refused:
        print('Message was not send to ' + str(refused), file=sys.stderr)
        return False
    return True
2020-04-30 20:38:33 +00:00
def login(request):
    """
    Identify the caller from the submitted form data.

    Returns a dict with a `_privilege` key:
      0    = admin (the `admin_pass` field matches the admin password)
      1    = logged in (the `token` field matches a stored user); in that
             case the dict is the user document itself
      1000 = guest (anonymous)
    """
    if 'admin_pass' in request.forms:
        submitted = request.forms.getunicode('admin_pass')
        # Constant-time comparison, so response timing does not leak how much
        # of the password was right. Bytes are compared because
        # compare_digest() rejects non-ASCII str.
        if submitted is not None and hmac.compare_digest(
                submitted.encode('utf-8'), admin_password.encode('utf-8')):
            return {'_privilege': 0}
    if 'token' in request.forms:
        token = request.forms.getunicode('token')
        user = mongodb_database['users'].find_one({'token': token})
        if user is not None:
            user['_privilege'] = 1
            return user
    return {'_privilege': 1000}  # anonymous
##################################################### Forms ############################################$
2020-04-29 22:07:36 +00:00
@app.post('/form')
def create_form():
    """Create a contact form and return its token.

    Expected form data:
      - subject: subject template (falls back to the default subject)
      - content: body template (required)
      - mail: address the submissions are sent to (required)
      - admin_pass or token: credentials, see login()

    Answers 400 when a required field is missing or when the caller is
    neither admin nor a known user.
    """
    # Getting subject template
    if 'subject' in request.forms:
        subject = request.forms.getunicode('subject')
    elif mail_default_subject != '':
        subject = mail_default_subject
    else:
        response.status = 400
        return 'Le champs « sujet » est requis'

    # Getting mail content
    if 'content' not in request.forms:
        response.status = 400
        return 'Le champs « contenu » est requis'
    content = request.forms.getunicode('content')

    # Getting recipient address
    if 'mail' not in request.forms:
        response.status = 400
        return 'Le champs « adresse » est requis'
    mail = request.forms.getunicode('mail')

    user = login(request)
    if user['_privilege'] > 1:
        response.status = 400
        return 'Privilèges insufisants'

    # TODO limit the insertion rate
    # The token is the only secret protecting the form: draw it from a
    # cryptographic source, with repeated characters allowed.
    token = ''.join(secrets.choice(token_chars) for _ in range(token_len))
    mongodb_database['forms'].insert_one({
        'mail': mail,
        'content': content,
        'subject': subject,
        # The admin is not a stored user and has no '_id'
        'user_id': user.get('_id'),
        'token': token,
    })
    return 'Créé : ' + token
2020-04-30 20:38:33 +00:00
@app.post('/form/list')
def list_forms():
    """Render the forms the caller may see: all of them for the admin, only
    the caller's own for a logged-in user. Anyone else gets a 400."""
    caller = login(request)
    privilege = caller['_privilege']
    if privilege not in (0, 1):
        response.status = 400
        return 'Privilèges insufisants'
    # An empty query matches every form; otherwise restrict to the owner
    query = {} if privilege == 0 else {'user_id': caller['_id']}
    visible_forms = mongodb_database['forms'].find(query)
    return bottle.template("list.tpl", data=visible_forms)
2020-04-29 22:07:36 +00:00
2020-04-30 20:38:33 +00:00
@app.delete('/form/<token>')
def delete_form(token):
    """Delete the form identified by `token`.

    Allowed for the admin and for the user owning the form; every refusal
    and an unknown token are answered with a 400.
    """
    caller = login(request)
    if caller['_privilege'] > 1:
        response.status = 400
        return 'Privilèges insufisants'

    # Look the form up first: ownership has to be checked before deleting
    forms = mongodb_database['forms']
    try:
        target = forms.find({'token': token})[0]
    except IndexError:
        response.status = 400
        return 'Le token nest pas valide'

    is_admin = caller['_privilege'] == 0
    if not is_admin and target['user_id'] != caller['_id']:
        response.status = 400
        return 'Privilèges insufisants'

    forms.delete_one({'token': token})
    return 'Supprimé ' + token
##################################################### Users ############################################$
@app.post('/user/list')
def list_users():
    """Render the list of all users. Admin only; anyone else gets a 400."""
    caller = login(request)
    if caller['_privilege'] <= 0:
        all_users = mongodb_database['users'].find()
        return bottle.template("list.tpl", data=all_users)
    response.status = 400
    return 'Privilèges insufisants'
2020-04-29 22:07:36 +00:00
@app.put('/user/<username>')
def create_user(username):
    """Create the user `username` with a fresh random token.

    Admin only: anyone else gets a 400. When the user already exists nothing
    is inserted and a message says so.
    """
    user = login(request)
    if user['_privilege'] > 0:
        response.status = 400
        return 'Privilèges insufisants'

    users = mongodb_database['users']
    if users.find_one({'username': username}) is not None:
        return 'Lutilisateur existe déjà'

    users.insert_one({
        'username': username,
        # The token is the user's credential: draw it from a cryptographic
        # source, with repeated characters allowed
        'token': ''.join(secrets.choice(token_chars) for _ in range(token_len)),
    })
    return 'Créé : ' + username
2020-04-29 22:07:36 +00:00
@app.delete('/user/<username>')
def delete_user(username):
    """Delete the user `username`. Admin only; a refusal or an unknown user
    is answered with a 400."""
    caller = login(request)
    if caller['_privilege'] > 0:
        response.status = 400
        return 'Privilèges insufisants'

    users = mongodb_database['users']
    selector = {'username': username}
    # Indexing the cursor raises IndexError when no user matches
    try:
        users.find(selector)[0]
    except IndexError:
        response.status = 400
        return 'Lutilisateur nexiste pas'

    users.delete_one(selector)
    return 'Supprimé ' + username
if __name__ != '__main__':
    # Imported as a module: expose the wrapped application as `prod_app`
    prod_app = StripPathMiddleware(app)
else:
    # Executed as a script: run bottle's server in debug mode
    bottle.run(
        app=StripPathMiddleware(app),
        host=listen_address,
        port=listen_port,
        debug=True,
    )