#!/usr/bin/env python # Script to send mass email # # Copyright (C) 2003-2017 Tiziano Zito , Jakob Jordan # # This script is free software and comes without any warranty, to # the extent permitted by applicable law. You can redistribute it # and/or modify it under the terms of the Do What The Fuck You Want # To Public License, Version 2, as published by Sam Hocevar. # http://www.wtfpl.net # # Full license text: # # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE # Version 2, December 2004. # # Everyone is permitted to copy and distribute verbatim or modified # copies of this license document, and changing it is allowed as long # as the name is changed. # # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE # TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION # # 0. You just DO WHAT THE FUCK YOU WANT TO. import smtplib, getopt, sys, os, email, getpass import email.header import email.mime.text import tempfile from py.test import raises PROGNAME = os.path.basename(sys.argv[0]) USAGE = """Send mass mail Usage: %s [...] PARAMETER_FILE < BODY Options: -F FROM set the From: header for all messages. Must be ASCII. This argument is required -S SUBJECT set the Subject: header for all messages -B BCC set the Bcc: header for all messages. Must be ASCII -s SEPARATOR set field separator in parameter file, default: ";" -e ENCODING set PARAMETER_FILE *and* BODY character set encoding, default: "UTF-8". Note that if you fuck up this one, your email will be full of rubbish: You have been warned! -f fake run: don't really send emails, just print to standard output what would be done. Don't be scared if you can not read the body: it is base64 encoded UTF8 text -z SERVER the SMTP server to use. This argument is required -P PORT the SMTP port to use. -u SMTP user name. If not set, use anonymous SMTP connection -p SMTP password. If not set you will be prompted for one -h show this usage message Notes: The message body is read from standard input or typed in interactively (exit with ^D) and keywords are subsituted with values read from a parameter file. The first line of the parameter file defines the keywords. The keyword $EMAIL$ must always be present and contains a comma separated list of email addresses. Keep in mind shell escaping when setting headers with white spaces or special characters. Character set encodings are those supported by python. Examples: * Example of a parameter file: $NAME$; $SURNAME$; $EMAIL$ John; Smith; j@guys.com Anne; Joyce; a@girls.com * Example of body: Dear $NAME$ $SURNAME$, I think you are a great guy/girl! Cheers, My self. * Example usage: %s -F "Great Guy " -S "You are a great guy" -B "Not so great Guy " parameter-file < body """%(PROGNAME, PROGNAME) def error(s): sys.stderr.write(PROGNAME+': ') sys.stderr.write(s+'\n') sys.stderr.flush() sys.exit(-1) def parse_command_line_options(arguments): # parse options try: opts, args = getopt.getopt(arguments, "hfs:F:S:B:R:e:u:p:P:z:") except getopt.GetoptError, err: error(str(err)+USAGE) # set default options options = { 'sep': u';', 'fake': False, 'from': '', 'subject': '', 'bcc': '', 'encoding': 'utf-8', 'smtpuser': None, 'smtppassword': None, 'server': None, 'port': 0, 'in_reply_to': '', } for option, value in opts: if option == "-e": options['encoding'] = value if option == "-s": options['sep'] = value elif option == "-F": options['from'] = value elif option == "-S": options['subject'] = value elif option == "-B": options['bcc'] = value elif option == "-R": options['in_reply_to'] = value elif option == "-h": error(USAGE) elif option == "-f": options['fake'] = True elif option == "-u": options['smtpuser'] = value elif option == "-p": options['smtppassword'] = value elif option == "-P": options['port'] = int(value) elif option == "-z": options['server'] = value if len(args) == 0: error('You must specify a parameter file') if len(options['from']) == 0: error('You must set a from address with option -F') if options['server'] is None: error('You must set a SMTP server with option -z') if options['sep'] == ",": error('Separator can not be a comma') # get password if needed if options['smtpuser'] is not None and options['smtppassword'] is None: options['smtppassword'] = getpass.getpass('Enter password for %s: '%options['smtpuser']) # set filenames of parameter and mail body options['fn_parameters'] = args[0] return options def parse_parameter_file(options): pars_fh = open(options['fn_parameters'], 'rbU') pars = pars_fh.read() pars_fh.close() try: pars = unicode(pars, encoding=options['encoding']) except UnicodeDecodeError, e: error('Error decoding "'+options['fn_parameters']+'": '+str(e)) try: options['subject'] = unicode(options['subject'], encoding=options['encoding']) except UnicodeDecodeError, e: error('Error decoding SUBJECT: '+str(e)) try: options['from'] = unicode(options['from'], encoding=options['encoding']) except UnicodeDecodeError, e: error('Error decoding FROM: '+str(e)) if options['in_reply_to'] and not options['in_reply_to'].startswith('<'): options['in_reply_to'] = '<{}>'.format(options['in_reply_to']) # split lines pars = pars.splitlines() # get keywords from first line keywords_list = [key.strip() for key in pars[0].split(options['sep'])] # fail immediately if no EMAIL keyword is found if '$EMAIL$' not in keywords_list: error('No $EMAIL$ keyword found in %s'%options['fn_parameters']) # check that all keywords start and finish with a '$' character for key in keywords_list: if not key.startswith('$') or not key.endswith('$'): error(('Keyword "%s" malformed: should be $KEYWORD$'%key).encode(options['encoding'])) # gather all values email_count = 0 keywords = dict([(key, []) for key in keywords_list]) for count, line in enumerate(pars[1:]): # ignore empty lines if len(line) == 0: continue values = [key.strip() for key in line.split(options['sep'])] if len(values) != len(keywords_list): error(('Line %d in "%s" malformed: %d values found instead of' ' %d: %s'%(count+1,options['fn_parameters'],len(values),len(keywords_list),line)).encode(options['encoding'])) for i, key in enumerate(keywords_list): keywords[key].append(values[i]) email_count += 1 return keywords, email_count def create_email_bodies(options, keywords, email_count, body): try: body = unicode(body, encoding=options['encoding']) except UnicodeDecodeError, e: error('Error decoding email body: '+str(e)) # find keywords and substitute with values # we need to create email_count bodies msgs = {} for i in range(email_count): lbody = body for key in keywords: lbody = lbody.replace(key, keywords[key][i]) # Any single dollar left? That means that the body was malformed single_dollar_exists = lbody.count('$') != 2 * lbody.count('$$') if single_dollar_exists: raise ValueError('Malformed email body: unclosed placeholder or non-escaped dollar sign.') # Replace double dollars with single dollars lbody = lbody.replace('$$', '$') # encode body lbody = email.mime.text.MIMEText(lbody.encode(options['encoding']), 'plain', options['encoding']) msgs[keywords['$EMAIL$'][i]] = lbody return msgs def add_email_headers(options, msgs): # msgs is now a dictionary with {emailaddr:body} # we need to add the headers for emailaddr in msgs: msg = msgs[emailaddr] msg['To'] = str(emailaddr) msg['From'] = email.header.Header(options['from']) if options['subject']: msg['Subject'] = email.header.Header(options['subject'].encode(options['encoding']), options['encoding']) if options['in_reply_to']: msg['In-Reply-To'] = email.header.Header(options['in_reply_to']) msgs[emailaddr] = msg return None def send_messages(options, msgs): server = smtplib.SMTP(options['server'], port=options['port']) if options['smtpuser'] is not None: server.starttls() server.login(options['smtpuser'], options['smtppassword']) for emailaddr in msgs: print 'Sending email to:', emailaddr emails = [e.strip() for e in emailaddr.split(',')] if len(options['bcc']) > 0: emails.append(options['bcc']) if options['fake']: print msgs[emailaddr].as_string() else: try: out = server.sendmail(options['from'], emails, msgs[emailaddr].as_string()) except Exception, err: error(str(err)) if len(out) != 0: error(str(out)) server.close() def test_dummy(): pass def test_parse_parameter_file(): expected_keywords = {u'$VALUE$': [u'this is a test'], u'$EMAIL$': [u'testrecv@test']} with tempfile.NamedTemporaryFile() as f: f.write('$EMAIL$;$VALUE$\ntestrecv@test;this is a test') f.flush() cmd_options = [ '-F', 'testfrom@test', '-z', 'localhost', f.name, ] options = parse_command_line_options(cmd_options) keywords, email_count = parse_parameter_file(options) assert keywords == expected_keywords def test_local_sending(): parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531' email_body = 'Dear $NAME$,\nthis is a test: $VALUE$\nBest regards' email_to = 'testrecv@test.org' email_from = 'testfrom@test.org' email_subject = 'Test Subject' email_encoding = 'utf-8' expected_email = email.mime.text.MIMEText('Dear TestName,\nthis is a test: 531\nBest regards'.encode(email_encoding), 'plain', email_encoding) expected_email['To'] = email_to expected_email['From'] = email_from expected_email['Subject'] = email.header.Header(email_subject.encode(email_encoding), email_encoding) with tempfile.NamedTemporaryFile() as f: f.write(parameter_string) f.flush() cmd_options = [ '-F', email_from, '-S', email_subject, '-z', 'localhost', '-e', email_encoding, f.name ] options = parse_command_line_options(cmd_options) keywords, email_count = parse_parameter_file(options) msgs = create_email_bodies(options, keywords, email_count, email_body) add_email_headers(options, msgs) assert msgs['testrecv@test.org'].as_string() == expected_email.as_string() def test_malformed_body(): parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531' email_body = '$NAME$VALUE$' email_from = 'testfrom@test.org' email_subject = 'Test Subject' email_encoding = 'utf-8' with tempfile.NamedTemporaryFile() as f: f.write(parameter_string) f.flush() cmd_options = [ '-F', email_from, '-S', email_subject, '-z', 'localhost', '-e', email_encoding, f.name ] options = parse_command_line_options(cmd_options) keywords, email_count = parse_parameter_file(options) with raises(ValueError): msgs = create_email_bodies(options, keywords, email_count, email_body) def test_double_dollar(): parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531' email_body = 'Dear $NAME$,\nyou owe us 254$$' email_to = 'testrecv@test.org' email_from = 'testfrom@test.org' email_subject = 'Test Subject' email_encoding = 'utf-8' expected_email = email.mime.text.MIMEText('Dear TestName,\nyou owe us 254$'.encode(email_encoding), 'plain', email_encoding) expected_email['To'] = email_to expected_email['From'] = email_from expected_email['Subject'] = email.header.Header(email_subject.encode(email_encoding), email_encoding) with tempfile.NamedTemporaryFile() as f: f.write(parameter_string) f.flush() cmd_options = [ '-F', email_from, '-S', email_subject, '-z', 'localhost', '-e', email_encoding, f.name ] options = parse_command_line_options(cmd_options) keywords, email_count = parse_parameter_file(options) msgs = create_email_bodies(options, keywords, email_count, email_body) assert msgs['testrecv@test.org'].get_payload(decode=email_encoding) == expected_email.get_payload(decode=email_encoding) if __name__ == '__main__': options = parse_command_line_options(sys.argv[1:]) keywords, email_count = parse_parameter_file(options) msgs = create_email_bodies(options, keywords, email_count, sys.stdin.read()) add_email_headers(options, msgs) send_messages(options, msgs)