408 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			408 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| # Script to send mass email
 | |
| #
 | |
| # Copyright (C) 2003-2017 Tiziano Zito <opossumnano@gmail.com>, Jakob Jordan <jakobjordan@posteo.de>
 | |
| #
 | |
| # This script is free software and comes without any warranty, to
 | |
| # the extent permitted by applicable law. You can redistribute it
 | |
| # and/or modify it under the terms of the Do What The Fuck You Want
 | |
| # To Public License, Version 2, as published by Sam Hocevar.
 | |
| # http://www.wtfpl.net
 | |
| # 
 | |
| # Full license text:
 | |
| # 
 | |
| # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
 | |
| # Version 2, December 2004.
 | |
| #
 | |
| # Everyone is permitted to copy and distribute verbatim or modified
 | |
| # copies of this license document, and changing it is allowed as long
 | |
| # as the name is changed.
 | |
| #
 | |
| # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
 | |
| # TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
 | |
| #
 | |
| # 0. You just DO WHAT THE FUCK YOU WANT TO.
 | |
| 
 | |
| import smtplib, getopt, sys, os, email, getpass
 | |
| import email.header
 | |
| import email.mime.text
 | |
| import tempfile
 | |
| 
 | |
| from py.test import raises
 | |
| 
 | |
| 
 | |
| PROGNAME = os.path.basename(sys.argv[0])
 | |
| USAGE = """Send mass mail
 | |
| Usage:
 | |
|   %s [...] PARAMETER_FILE < BODY
 | |
| 
 | |
| Options:
 | |
|   -F FROM           set the From: header for all messages.
 | |
|                     Must be ASCII. This argument is required
 | |
| 
 | |
|   -S SUBJECT        set the Subject: header for all messages
 | |
| 
 | |
|   -B BCC            set the Bcc: header for all messages.
 | |
|                     Must be ASCII
 | |
| 
 | |
|   -s SEPARATOR      set field separator in parameter file,
 | |
|                     default: ";"
 | |
| 
 | |
|   -e ENCODING       set PARAMETER_FILE *and* BODY character set
 | |
|                     encoding, default: "UTF-8". Note that if you fuck
 | |
|                     up this one, your email will be full of rubbish:
 | |
|                     You have been warned!
 | |
| 
 | |
|   -f                fake run: don't really send emails, just print to
 | |
|                     standard output what would be done. Don't be scared
 | |
|                     if you can not read the body: it is base64 encoded
 | |
|                     UTF8 text
 | |
| 
 | |
|   -z SERVER         the SMTP server to use. This argument is required
 | |
| 
 | |
|   -P PORT           the SMTP port to use.
 | |
| 
 | |
|   -u                SMTP user name. If not set, use anonymous SMTP 
 | |
|                     connection
 | |
| 
 | |
|   -p                SMTP password. If not set you will be prompted for one
 | |
| 
 | |
|   -h                show this usage message
 | |
| 
 | |
| Notes:
 | |
|   The message body is read from standard input or
 | |
|   typed in interactively (exit with ^D) and keywords are subsituted with values
 | |
|   read from a parameter file. The first line of the parameter file defines
 | |
|   the keywords. The keyword $EMAIL$ must always be present and contains a comma
 | |
|   separated list of email addresses.
 | |
|   Keep in mind shell escaping when setting headers with white spaces or special
 | |
|   characters.
 | |
|   Character set encodings are those supported by python.
 | |
| 
 | |
| Examples:
 | |
| 
 | |
| * Example of a parameter file:
 | |
| 
 | |
| $NAME$; $SURNAME$; $EMAIL$
 | |
| John; Smith; j@guys.com
 | |
| Anne; Joyce; a@girls.com
 | |
| 
 | |
| * Example of body:
 | |
| 
 | |
| Dear $NAME$ $SURNAME$,
 | |
| 
 | |
| I think you are a great guy/girl!
 | |
| 
 | |
| Cheers,
 | |
| 
 | |
| My self.
 | |
| 
 | |
| * Example usage:
 | |
| 
 | |
| %s  -F "Great Guy <gg@guys.com>" -S "You are a great guy" -B "Not so great Guy <ngg@guys.com>" parameter-file < body
 | |
|  
 | |
| """%(PROGNAME, PROGNAME)
 | |
| 
 | |
| def error(s):
 | |
|     sys.stderr.write(PROGNAME+': ')
 | |
|     sys.stderr.write(s+'\n')
 | |
|     sys.stderr.flush()
 | |
|     sys.exit(-1)
 | |
| 
 | |
| def parse_command_line_options(arguments):
 | |
|     # parse options
 | |
|     try:
 | |
|         opts, args = getopt.getopt(arguments, "hfs:F:S:B:R:e:u:p:P:z:")
 | |
|     except getopt.GetoptError, err:
 | |
|         error(str(err)+USAGE)
 | |
| 
 | |
|     # set default options
 | |
|     options = {
 | |
|         'sep': u';',
 | |
|         'fake': False,
 | |
|         'from': '',
 | |
|         'subject': '',
 | |
|         'bcc': '',
 | |
|         'encoding': 'utf-8',
 | |
|         'smtpuser': None,
 | |
|         'smtppassword': None,
 | |
|         'server': None,
 | |
|         'port': 0,
 | |
|         'in_reply_to': '',
 | |
|     }
 | |
| 
 | |
|     for option, value in opts:
 | |
|         if option == "-e":
 | |
|             options['encoding'] = value
 | |
|         if option == "-s":
 | |
|             options['sep'] = value
 | |
|         elif option == "-F":
 | |
|             options['from'] = value
 | |
|         elif option == "-S":
 | |
|             options['subject'] = value
 | |
|         elif option == "-B":
 | |
|             options['bcc'] = value
 | |
|         elif option == "-R":
 | |
|             options['in_reply_to'] = value
 | |
|         elif option == "-h":
 | |
|             error(USAGE)
 | |
|         elif option == "-f":
 | |
|             options['fake'] = True
 | |
|         elif option == "-u":
 | |
|             options['smtpuser'] = value
 | |
|         elif option == "-p":
 | |
|             options['smtppassword'] = value
 | |
|         elif option == "-P":
 | |
|             options['port'] = int(value)
 | |
|         elif option == "-z":
 | |
|             options['server'] = value
 | |
| 
 | |
|     if len(args) == 0:
 | |
|         error('You must specify a parameter file')
 | |
| 
 | |
|     if len(options['from']) == 0:
 | |
|         error('You must set a from address with option -F')
 | |
| 
 | |
|     if options['server'] is None:
 | |
|         error('You must set a SMTP server with option -z')
 | |
| 
 | |
|     if options['sep'] == ",":
 | |
|         error('Separator can not be a comma')
 | |
| 
 | |
|     # get password if needed
 | |
|     if options['smtpuser'] is not None and options['smtppassword'] is None:
 | |
|         options['smtppassword'] = getpass.getpass('Enter password for %s: '%options['smtpuser'])
 | |
| 
 | |
|     # set filenames of parameter and mail body
 | |
|     options['fn_parameters'] = args[0]
 | |
| 
 | |
|     return options
 | |
| 
 | |
| def parse_parameter_file(options):
 | |
|     pars_fh = open(options['fn_parameters'], 'rbU')
 | |
|     pars = pars_fh.read()
 | |
|     pars_fh.close()
 | |
| 
 | |
|     try:
 | |
|         pars = unicode(pars, encoding=options['encoding'])
 | |
|     except UnicodeDecodeError, e:
 | |
|         error('Error decoding "'+options['fn_parameters']+'": '+str(e))
 | |
| 
 | |
|     try:
 | |
|         options['subject'] = unicode(options['subject'], encoding=options['encoding'])
 | |
|     except UnicodeDecodeError, e:
 | |
|         error('Error decoding SUBJECT: '+str(e))
 | |
| 
 | |
|     try:
 | |
|         options['from'] = unicode(options['from'], encoding=options['encoding'])
 | |
|     except UnicodeDecodeError, e:
 | |
|         error('Error decoding FROM: '+str(e))
 | |
| 
 | |
|     if options['in_reply_to'] and not options['in_reply_to'].startswith('<'):
 | |
|         options['in_reply_to'] = '<{}>'.format(options['in_reply_to'])
 | |
| 
 | |
|     # split lines
 | |
|     pars = pars.splitlines()
 | |
| 
 | |
|     # get keywords from first line
 | |
|     keywords_list = [key.strip() for key in pars[0].split(options['sep'])]
 | |
| 
 | |
|     # fail immediately if no EMAIL keyword is found
 | |
|     if '$EMAIL$' not in keywords_list:
 | |
|         error('No $EMAIL$ keyword found in %s'%options['fn_parameters'])
 | |
| 
 | |
|     # check that all keywords start and finish with a '$' character
 | |
|     for key in keywords_list:
 | |
|         if not key.startswith('$') or not key.endswith('$'):
 | |
|             error(('Keyword "%s" malformed: should be $KEYWORD$'%key).encode(options['encoding']))
 | |
| 
 | |
|     # gather all values
 | |
|     email_count = 0
 | |
|     keywords = dict([(key, []) for key in keywords_list])
 | |
|     for count, line in enumerate(pars[1:]):
 | |
|         # ignore empty lines
 | |
|         if len(line) == 0:
 | |
|             continue
 | |
|         values = [key.strip() for key in line.split(options['sep'])]
 | |
|         if len(values) != len(keywords_list):
 | |
|             error(('Line %d in "%s" malformed: %d values found instead of'
 | |
|                    ' %d: %s'%(count+1,options['fn_parameters'],len(values),len(keywords_list),line)).encode(options['encoding']))
 | |
|         for i, key in enumerate(keywords_list):
 | |
|             keywords[key].append(values[i])
 | |
|         email_count += 1
 | |
| 
 | |
|     return keywords, email_count
 | |
| 
 | |
| def create_email_bodies(options, keywords, email_count, body):
 | |
|     try:
 | |
|         body = unicode(body, encoding=options['encoding'])
 | |
|     except UnicodeDecodeError, e:
 | |
|         error('Error decoding email body: '+str(e))
 | |
| 
 | |
|     # find keywords and substitute with values
 | |
|     # we need to create email_count bodies
 | |
|     msgs = {}
 | |
| 
 | |
|     for i in range(email_count):
 | |
|         lbody = body
 | |
|         for key in keywords:
 | |
|             lbody = lbody.replace(key, keywords[key][i])
 | |
| 
 | |
|         # Any single dollar left? That means that the body was malformed
 | |
|         single_dollar_exists = lbody.count('$') != 2 * lbody.count('$$')
 | |
|         if single_dollar_exists:
 | |
|             raise ValueError('Malformed email body: unclosed placeholder or non-escaped dollar sign.')
 | |
| 
 | |
|         # Replace double dollars with single dollars
 | |
|         lbody = lbody.replace('$$', '$')
 | |
| 
 | |
|         # encode body
 | |
|         lbody = email.mime.text.MIMEText(lbody.encode(options['encoding']), 'plain', options['encoding'])
 | |
|         msgs[keywords['$EMAIL$'][i]] = lbody
 | |
| 
 | |
|     return msgs
 | |
| 
 | |
| def add_email_headers(options, msgs):
 | |
|     # msgs is now a dictionary with {emailaddr:body}
 | |
|     # we need to add the headers
 | |
| 
 | |
|     for emailaddr in msgs:
 | |
|         msg = msgs[emailaddr]
 | |
|         msg['To'] = str(emailaddr)
 | |
|         msg['From'] = email.header.Header(options['from'])
 | |
|         if options['subject']:
 | |
|             msg['Subject'] = email.header.Header(options['subject'].encode(options['encoding']), options['encoding'])
 | |
|         if options['in_reply_to']:
 | |
|             msg['In-Reply-To'] = email.header.Header(options['in_reply_to'])
 | |
|         msgs[emailaddr] = msg
 | |
| 
 | |
|     return None
 | |
| 
 | |
| def send_messages(options, msgs):
 | |
|     server = smtplib.SMTP(options['server'], port=options['port'])
 | |
| 
 | |
|     if options['smtpuser'] is not None:
 | |
|         server.starttls()
 | |
|         server.login(options['smtpuser'], options['smtppassword'])
 | |
| 
 | |
|     for emailaddr in msgs:
 | |
|         print 'Sending email to:', emailaddr
 | |
|         emails = [e.strip() for e in emailaddr.split(',')]
 | |
|         if len(options['bcc']) > 0:
 | |
|             emails.append(options['bcc'])
 | |
|         if options['fake']:
 | |
|             print msgs[emailaddr].as_string()
 | |
|         else:
 | |
|             try:
 | |
|                 out = server.sendmail(options['from'], emails, msgs[emailaddr].as_string())
 | |
|             except Exception, err:
 | |
|                 error(str(err))
 | |
| 
 | |
|             if len(out) != 0:
 | |
|                 error(str(out))
 | |
| 
 | |
|     server.close()
 | |
| 
 | |
| def test_dummy():
 | |
|     pass
 | |
| 
 | |
| def test_parse_parameter_file():
 | |
|     expected_keywords = {u'$VALUE$': [u'this is a test'], u'$EMAIL$': [u'testrecv@test']}
 | |
|     with tempfile.NamedTemporaryFile() as f:
 | |
|         f.write('$EMAIL$;$VALUE$\ntestrecv@test;this is a test')
 | |
|         f.flush()
 | |
|         cmd_options = [
 | |
|             '-F', 'testfrom@test',
 | |
|             '-z', 'localhost',
 | |
|             f.name,
 | |
|         ]
 | |
|         options = parse_command_line_options(cmd_options)
 | |
|         keywords, email_count = parse_parameter_file(options)
 | |
|     assert keywords == expected_keywords
 | |
| 
 | |
| def test_local_sending():
 | |
|     parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531'
 | |
|     email_body = 'Dear $NAME$,\nthis is a test: $VALUE$\nBest regards'
 | |
|     email_to = 'testrecv@test.org'
 | |
|     email_from = 'testfrom@test.org'
 | |
|     email_subject = 'Test Subject'
 | |
|     email_encoding = 'utf-8'
 | |
| 
 | |
|     expected_email = email.mime.text.MIMEText('Dear TestName,\nthis is a test: 531\nBest regards'.encode(email_encoding), 'plain', email_encoding)
 | |
|     expected_email['To'] = email_to
 | |
|     expected_email['From'] = email_from
 | |
|     expected_email['Subject'] = email.header.Header(email_subject.encode(email_encoding), email_encoding)
 | |
| 
 | |
|     with tempfile.NamedTemporaryFile() as f:
 | |
|         f.write(parameter_string)
 | |
|         f.flush()
 | |
|         cmd_options = [
 | |
|             '-F', email_from,
 | |
|             '-S', email_subject,
 | |
|             '-z', 'localhost',
 | |
|             '-e', email_encoding,
 | |
|             f.name
 | |
|         ]
 | |
|         options = parse_command_line_options(cmd_options)
 | |
|         keywords, email_count = parse_parameter_file(options)
 | |
|         msgs = create_email_bodies(options, keywords, email_count, email_body)
 | |
|         add_email_headers(options, msgs)
 | |
|         assert msgs['testrecv@test.org'].as_string() == expected_email.as_string()
 | |
|        
 | |
| def test_malformed_body():
 | |
|     parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531'
 | |
|     email_body = '$NAME$VALUE$'
 | |
|     email_from = 'testfrom@test.org'
 | |
|     email_subject = 'Test Subject'
 | |
|     email_encoding = 'utf-8'
 | |
| 
 | |
|     with tempfile.NamedTemporaryFile() as f:
 | |
|         f.write(parameter_string)
 | |
|         f.flush()
 | |
|         cmd_options = [
 | |
|             '-F', email_from,
 | |
|             '-S', email_subject,
 | |
|             '-z', 'localhost',
 | |
|             '-e', email_encoding,
 | |
|             f.name
 | |
|         ]
 | |
|         options = parse_command_line_options(cmd_options)
 | |
|         keywords, email_count = parse_parameter_file(options)
 | |
|         with raises(ValueError):
 | |
|             msgs = create_email_bodies(options, keywords, email_count, email_body)
 | |
| 
 | |
| def test_double_dollar():
 | |
|     parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531'
 | |
|     email_body = 'Dear $NAME$,\nyou owe us 254$$'
 | |
|     email_to = 'testrecv@test.org'
 | |
|     email_from = 'testfrom@test.org'
 | |
|     email_subject = 'Test Subject'
 | |
|     email_encoding = 'utf-8'
 | |
| 
 | |
|     expected_email = email.mime.text.MIMEText('Dear TestName,\nyou owe us 254$'.encode(email_encoding), 'plain', email_encoding)
 | |
|     expected_email['To'] = email_to
 | |
|     expected_email['From'] = email_from
 | |
|     expected_email['Subject'] = email.header.Header(email_subject.encode(email_encoding), email_encoding)
 | |
| 
 | |
|     with tempfile.NamedTemporaryFile() as f:
 | |
|         f.write(parameter_string)
 | |
|         f.flush()
 | |
|         cmd_options = [
 | |
|             '-F', email_from,
 | |
|             '-S', email_subject,
 | |
|             '-z', 'localhost',
 | |
|             '-e', email_encoding,
 | |
|             f.name
 | |
|         ]
 | |
|         options = parse_command_line_options(cmd_options)
 | |
|         keywords, email_count = parse_parameter_file(options)
 | |
|         msgs = create_email_bodies(options, keywords, email_count, email_body)
 | |
|         assert msgs['testrecv@test.org'].get_payload(decode=email_encoding) == expected_email.get_payload(decode=email_encoding)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     options = parse_command_line_options(sys.argv[1:])
 | |
|     keywords, email_count = parse_parameter_file(options)
 | |
|     msgs = create_email_bodies(options, keywords, email_count, sys.stdin.read())
 | |
|     add_email_headers(options, msgs)
 | |
|     send_messages(options, msgs)
 |