2024-heraklion-testing-debu.../hands_on_solutions/massmail_solution/massmail
2024-08-26 13:54:13 +02:00

409 lines
14 KiB
Python

#!/usr/bin/env python
# Script to send mass email
#
# Copyright (C) 2003-2017 Tiziano Zito <opossumnano@gmail.com>, Jakob Jordan <jakobjordan@posteo.de>
#
# This script is free software and comes without any warranty, to
# the extent permitted by applicable law. You can redistribute it
# and/or modify it under the terms of the Do What The Fuck You Want
# To Public License, Version 2, as published by Sam Hocevar.
# http://www.wtfpl.net
#
# Full license text:
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004.
#
# Everyone is permitted to copy and distribute verbatim or modified
# copies of this license document, and changing it is allowed as long
# as the name is changed.
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
#
# 0. You just DO WHAT THE FUCK YOU WANT TO.
import smtplib, getopt, sys, os, email, getpass
import email.header
import email.mime.text
import tempfile
from py.test import raises
PROGNAME = os.path.basename(sys.argv[0])
USAGE = """Send mass mail
Usage:
%s [...] PARAMETER_FILE < BODY
Options:
-F FROM set the From: header for all messages.
Must be ASCII. This argument is required
-S SUBJECT set the Subject: header for all messages
-B BCC set the Bcc: header for all messages.
Must be ASCII
-s SEPARATOR set field separator in parameter file,
default: ";"
-e ENCODING set PARAMETER_FILE *and* BODY character set
encoding, default: "UTF-8". Note that if you fuck
up this one, your email will be full of rubbish:
You have been warned!
-f fake run: don't really send emails, just print to
standard output what would be done. Don't be scared
if you can not read the body: it is base64 encoded
UTF8 text
-z SERVER the SMTP server to use. This argument is required
-P PORT the SMTP port to use.
-u SMTP user name. If not set, use anonymous SMTP
connection
-p SMTP password. If not set you will be prompted for one
-h show this usage message
Notes:
The message body is read from standard input or
typed in interactively (exit with ^D) and keywords are subsituted with values
read from a parameter file. The first line of the parameter file defines
the keywords. The keyword $EMAIL$ must always be present and contains a comma
separated list of email addresses.
Keep in mind shell escaping when setting headers with white spaces or special
characters.
Character set encodings are those supported by python.
Examples:
* Example of a parameter file:
$NAME$; $SURNAME$; $EMAIL$
John; Smith; j@guys.com
Anne; Joyce; a@girls.com
* Example of body:
Dear $NAME$ $SURNAME$,
I think you are a great guy/girl!
Cheers,
My self.
* Example usage:
%s -F "Great Guy <gg@guys.com>" -S "You are a great guy" -B "Not so great Guy <ngg@guys.com>" parameter-file < body
"""%(PROGNAME, PROGNAME)
def error(s):
sys.stderr.write(PROGNAME+': ')
sys.stderr.write(s+'\n')
sys.stderr.flush()
sys.exit(-1)
def parse_command_line_options(arguments):
# parse options
try:
opts, args = getopt.getopt(arguments, "hfs:F:S:B:R:e:u:p:P:z:")
except getopt.GetoptError, err:
error(str(err)+USAGE)
# set default options
options = {
'sep': u';',
'fake': False,
'from': '',
'subject': '',
'bcc': '',
'encoding': 'utf-8',
'smtpuser': None,
'smtppassword': None,
'server': None,
'port': 0,
'in_reply_to': '',
}
for option, value in opts:
if option == "-e":
options['encoding'] = value
if option == "-s":
options['sep'] = value
elif option == "-F":
options['from'] = value
elif option == "-S":
options['subject'] = value
elif option == "-B":
options['bcc'] = value
elif option == "-R":
options['in_reply_to'] = value
elif option == "-h":
error(USAGE)
elif option == "-f":
options['fake'] = True
elif option == "-u":
options['smtpuser'] = value
elif option == "-p":
options['smtppassword'] = value
elif option == "-P":
options['port'] = int(value)
elif option == "-z":
options['server'] = value
if len(args) == 0:
error('You must specify a parameter file')
if len(options['from']) == 0:
error('You must set a from address with option -F')
if options['server'] is None:
error('You must set a SMTP server with option -z')
if options['sep'] == ",":
error('Separator can not be a comma')
# get password if needed
if options['smtpuser'] is not None and options['smtppassword'] is None:
options['smtppassword'] = getpass.getpass('Enter password for %s: '%options['smtpuser'])
# set filenames of parameter and mail body
options['fn_parameters'] = args[0]
return options
def parse_parameter_file(options):
pars_fh = open(options['fn_parameters'], 'rbU')
pars = pars_fh.read()
pars_fh.close()
try:
pars = unicode(pars, encoding=options['encoding'])
except UnicodeDecodeError, e:
error('Error decoding "'+options['fn_parameters']+'": '+str(e))
try:
options['subject'] = unicode(options['subject'], encoding=options['encoding'])
except UnicodeDecodeError, e:
error('Error decoding SUBJECT: '+str(e))
try:
options['from'] = unicode(options['from'], encoding=options['encoding'])
except UnicodeDecodeError, e:
error('Error decoding FROM: '+str(e))
if options['in_reply_to'] and not options['in_reply_to'].startswith('<'):
options['in_reply_to'] = '<{}>'.format(options['in_reply_to'])
# split lines
pars = pars.splitlines()
# get keywords from first line
keywords_list = [key.strip() for key in pars[0].split(options['sep'])]
# fail immediately if no EMAIL keyword is found
if '$EMAIL$' not in keywords_list:
error('No $EMAIL$ keyword found in %s'%options['fn_parameters'])
# check that all keywords start and finish with a '$' character
for key in keywords_list:
if not key.startswith('$') or not key.endswith('$'):
error(('Keyword "%s" malformed: should be $KEYWORD$'%key).encode(options['encoding']))
# gather all values
email_count = 0
keywords = dict([(key, []) for key in keywords_list])
for count, line in enumerate(pars[1:]):
# ignore empty lines
if len(line) == 0:
continue
values = [key.strip() for key in line.split(options['sep'])]
if len(values) != len(keywords_list):
error(('Line %d in "%s" malformed: %d values found instead of'
' %d: %s'%(count+1,options['fn_parameters'],len(values),len(keywords_list),line)).encode(options['encoding']))
for i, key in enumerate(keywords_list):
keywords[key].append(values[i])
email_count += 1
return keywords, email_count
def create_email_bodies(options, keywords, email_count, body):
try:
body = unicode(body, encoding=options['encoding'])
except UnicodeDecodeError, e:
error('Error decoding email body: '+str(e))
# find keywords and substitute with values
# we need to create email_count bodies
msgs = {}
for i in range(email_count):
lbody = body
for key in keywords:
lbody = lbody.replace(key, keywords[key][i])
# Any single dollar left? That means that the body was malformed
single_dollar_exists = lbody.count('$') != 2 * lbody.count('$$')
if single_dollar_exists:
raise ValueError('Malformed email body: unclosed placeholder or non-escaped dollar sign.')
# Replace double dollars with single dollars
lbody = lbody.replace('$$', '$')
# encode body
lbody = email.mime.text.MIMEText(lbody.encode(options['encoding']), 'plain', options['encoding'])
msgs[keywords['$EMAIL$'][i]] = lbody
return msgs
def add_email_headers(options, msgs):
# msgs is now a dictionary with {emailaddr:body}
# we need to add the headers
for emailaddr in msgs:
msg = msgs[emailaddr]
msg['To'] = str(emailaddr)
msg['From'] = email.header.Header(options['from'])
if options['subject']:
msg['Subject'] = email.header.Header(options['subject'].encode(options['encoding']), options['encoding'])
if options['in_reply_to']:
msg['In-Reply-To'] = email.header.Header(options['in_reply_to'])
msgs[emailaddr] = msg
return None
def send_messages(options, msgs):
server = smtplib.SMTP(options['server'], port=options['port'])
if options['smtpuser'] is not None:
server.starttls()
server.login(options['smtpuser'], options['smtppassword'])
for emailaddr in msgs:
print 'Sending email to:', emailaddr
emails = [e.strip() for e in emailaddr.split(',')]
if len(options['bcc']) > 0:
emails.append(options['bcc'])
if options['fake']:
print msgs[emailaddr].as_string()
else:
try:
out = server.sendmail(options['from'], emails, msgs[emailaddr].as_string())
except Exception, err:
error(str(err))
if len(out) != 0:
error(str(out))
server.close()
def test_dummy():
pass
def test_parse_parameter_file():
expected_keywords = {u'$VALUE$': [u'this is a test'], u'$EMAIL$': [u'testrecv@test']}
with tempfile.NamedTemporaryFile() as f:
f.write('$EMAIL$;$VALUE$\ntestrecv@test;this is a test')
f.flush()
cmd_options = [
'-F', 'testfrom@test',
'-z', 'localhost',
f.name,
]
options = parse_command_line_options(cmd_options)
keywords, email_count = parse_parameter_file(options)
assert keywords == expected_keywords
def test_local_sending():
parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531'
email_body = 'Dear $NAME$,\nthis is a test: $VALUE$\nBest regards'
email_to = 'testrecv@test.org'
email_from = 'testfrom@test.org'
email_subject = 'Test Subject'
email_encoding = 'utf-8'
expected_email = email.mime.text.MIMEText('Dear TestName,\nthis is a test: 531\nBest regards'.encode(email_encoding), 'plain', email_encoding)
expected_email['To'] = email_to
expected_email['From'] = email_from
expected_email['Subject'] = email.header.Header(email_subject.encode(email_encoding), email_encoding)
with tempfile.NamedTemporaryFile() as f:
f.write(parameter_string)
f.flush()
cmd_options = [
'-F', email_from,
'-S', email_subject,
'-z', 'localhost',
'-e', email_encoding,
f.name
]
options = parse_command_line_options(cmd_options)
keywords, email_count = parse_parameter_file(options)
msgs = create_email_bodies(options, keywords, email_count, email_body)
add_email_headers(options, msgs)
assert msgs['testrecv@test.org'].as_string() == expected_email.as_string()
def test_malformed_body():
parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531'
email_body = '$NAME$VALUE$'
email_from = 'testfrom@test.org'
email_subject = 'Test Subject'
email_encoding = 'utf-8'
with tempfile.NamedTemporaryFile() as f:
f.write(parameter_string)
f.flush()
cmd_options = [
'-F', email_from,
'-S', email_subject,
'-z', 'localhost',
'-e', email_encoding,
f.name
]
options = parse_command_line_options(cmd_options)
keywords, email_count = parse_parameter_file(options)
with raises(ValueError):
msgs = create_email_bodies(options, keywords, email_count, email_body)
def test_double_dollar():
parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531'
email_body = 'Dear $NAME$,\nyou owe us 254$$'
email_to = 'testrecv@test.org'
email_from = 'testfrom@test.org'
email_subject = 'Test Subject'
email_encoding = 'utf-8'
expected_email = email.mime.text.MIMEText('Dear TestName,\nyou owe us 254$'.encode(email_encoding), 'plain', email_encoding)
expected_email['To'] = email_to
expected_email['From'] = email_from
expected_email['Subject'] = email.header.Header(email_subject.encode(email_encoding), email_encoding)
with tempfile.NamedTemporaryFile() as f:
f.write(parameter_string)
f.flush()
cmd_options = [
'-F', email_from,
'-S', email_subject,
'-z', 'localhost',
'-e', email_encoding,
f.name
]
options = parse_command_line_options(cmd_options)
keywords, email_count = parse_parameter_file(options)
msgs = create_email_bodies(options, keywords, email_count, email_body)
assert msgs['testrecv@test.org'].get_payload(decode=email_encoding) == expected_email.get_payload(decode=email_encoding)
if __name__ == '__main__':
options = parse_command_line_options(sys.argv[1:])
keywords, email_count = parse_parameter_file(options)
msgs = create_email_bodies(options, keywords, email_count, sys.stdin.read())
add_email_headers(options, msgs)
send_messages(options, msgs)