#!/usr/bin/python -W ignore::DeprecationWarning
# A BackupPC script to archive a host's files to Amazon S3.
#
# Point $Conf{ArchiveClientCmd} at me.
# Requires python-boto
#
# Usage: BackupPC_archiveHost tarCreatePath splitPath parPath host bkupNum \
#            compPath fileExt splitSize outLoc parFile share
#
# Create secrets.py such that it has:
#   accesskey = 'amazon aws access key'
#   sharedkey = 'amazon aws shared key'
#   gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it'
#
# Copyright (c) 2009-2011 Ryan S. Tucker
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import glob
import hashlib
import os
import socket
import sys
import time

from multiprocessing import Process, Queue, cpu_count
from subprocess import Popen, PIPE

from boto.s3.connection import S3Connection
from boto.s3.key import Key
import boto.exception

import logging
import logging.handlers

import secrets

logger = logging.getLogger(__name__)

sysloghandler = logging.handlers.SysLogHandler(
    '/dev/log', facility=logging.handlers.SysLogHandler.LOG_DAEMON)
syslogformatter = logging.Formatter('%(filename)s: %(levelname)s: %(message)s')
sysloghandler.setFormatter(syslogformatter)
logger.addHandler(sysloghandler)

consolehandler = logging.StreamHandler(sys.stdout)
consoleformatter = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
consolehandler.setFormatter(consoleformatter)
logger.addHandler(consolehandler)

logger.setLevel(logging.DEBUG)


class VerifyError(Exception):
    pass


def is_exe(fpath):
    return os.path.exists(fpath) and os.access(fpath, os.X_OK)


def encrypt_file(filename, key, compress='/bin/cat'):
    "Symmetrically encrypts filename with gpg, returning the new filename."
    # Pick the gpg compression algorithm that corresponds to the configured
    # BackupPC compression command.
    compressmap = {'cat': 'none', 'gzip': 'ZLIB', 'bzip2': 'BZIP2'}
    if os.path.basename(compress) in compressmap:
        compress_algo = compressmap[os.path.basename(compress)]
    else:
        compress_algo = 'none'

    cmd = ['/usr/bin/gpg', '--batch', '--no-tty']
    cmd.extend(['--compress-algo', compress_algo])
    cmd.extend(['--output', '%s.gpg' % filename])
    cmd.extend(['--passphrase-fd', '0'])
    cmd.extend(['--symmetric', filename])

    if is_exe(cmd[0]):
        logger.debug('encrypt_file: encrypting %s (compression: %s)',
                     filename, compress_algo)
    else:
        raise RuntimeError('%s is not an executable file!' % cmd[0])
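
# A minimal usage sketch for encrypt_file (the path is hypothetical; the
# workers below pass BackupPC's compression helper as `compress`):
#
#     encrypted = encrypt_file('/var/tmp/myhost.0.tar.aa',
#                              secrets.gpgsymmetrickey,
#                              compress='/bin/gzip')
#     # -> '/var/tmp/myhost.0.tar.aa.gpg', ZLIB-compressed by gpg
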
    # gpg reads the passphrase from fd 0 (stdin), per --passphrase-fd above.
    proc = Popen(cmd, preexec_fn=lambda: os.nice(10), stdin=PIPE, stdout=PIPE)
    proc.communicate(key)

    if os.path.exists(filename + '.gpg'):
        oldfilesize = os.path.getsize(filename)
        newfilesize = os.path.getsize(filename + '.gpg')
        compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100
        logger.debug('encrypt_file: %s %s by %.2f%% (%i -> %i bytes)',
                     filename,
                     'shrunk' if oldfilesize > newfilesize else 'grew',
                     abs(compressed), oldfilesize, newfilesize)
        return filename + '.gpg'
    else:
        raise RuntimeError('output file does not exist: %s.gpg' % filename)


def open_s3(accesskey, sharedkey, host):
    conn = S3Connection(accesskey, sharedkey, is_secure=True)
    mybucketname = (accesskey + '-bkup-' + host).lower()
    try:
        bucket = conn.get_bucket(mybucketname)
    except boto.exception.S3ResponseError:
        logger.info('open_s3: creating new bucket %s', mybucketname)
        bucket = conn.create_bucket(mybucketname)
    bucket.set_acl('private')
    return bucket


def handle_progress(transmitted, pending):
    logger.debug('send_file: %i of %i bytes transmitted (%.2f%%)',
                 transmitted, pending, (transmitted / float(pending)) * 100)


def verify_file(bucket, filename):
    "Returns True if the file size and md5sum match, False otherwise"
    basefilename = os.path.basename(filename)
    key = bucket.get_key(basefilename)
    stat = os.stat(filename)
    if key:
        if key.size == stat.st_size:
            fp = open(filename, 'rb')
            local_md5 = hashlib.md5(fp.read())
            fp.close()
            logger.debug('verify_file: %s: local md5 "%s", etag %s',
                         filename, local_md5.hexdigest(), key.etag)
            # For single-part uploads like these, the S3 etag is the
            # object's MD5 digest, quoted.
            if '"%s"' % local_md5.hexdigest() == key.etag:
                return True
    return False


def send_file(bucket, filename):
    basefilename = os.path.basename(filename)
    k = Key(bucket)
    k.key = basefilename

    if k.exists():
        if verify_file(bucket, filename):
            logger.warning('send_file: %s already exists and is identical, '
                           'not overwriting', basefilename)
            return k
        logger.warning('send_file: %s already exists on S3, overwriting',
                       basefilename)

    k.set_contents_from_filename(filename, cb=handle_progress,
                                 reduced_redundancy=True)

    logger.debug('send_file: %s sent, verifying fidelity', filename)
    if not verify_file(bucket, filename):
        raise VerifyError('verify failed')
    return k


def encryption_worker(in_q, out_q, unlink_q):
    "Encrypts things from the in_q, puts them in the out_q"
    start_time = time.time()
    counter = 0
    for filename, gpgkey, comppath in iter(in_q.get, 'STOP'):
        counter += 1
        cryptstart_time = time.time()
        logger.info('encryption_worker: encrypting %s', filename)
        result = encrypt_file(filename, gpgkey, comppath)
        out_q.put(result)
        unlink_q.put(filename)
        logger.debug('encryption_worker: encrypted %s in %i seconds',
                     filename, time.time() - cryptstart_time)
    logger.debug('encryption_worker: queue is empty, terminating after '
                 '%i items in %i seconds', counter, time.time() - start_time)
    time.sleep(5)  # settle


def sending_worker(in_q, out_q, accesskey, sharedkey, host):
    "Sends things from the in_q using the send_file method"
    start_time = time.time()
    counter = 0
    for filename in iter(in_q.get, 'STOP'):
        sending_start = time.time()
        counter += 1
        retry_count = 0
        max_retries = 10
        done = False

        while retry_count <= max_retries and not done:
            try:
                logger.info('sending_worker: sending %s', filename)
                bucket = open_s3(accesskey, sharedkey, host)
                key = send_file(bucket, filename)
                key.set_acl('private')
                key.close()
                done = True
            except (boto.exception.S3ResponseError, socket.error,
                    VerifyError) as e:
                retry_count += 1
                sleeptime = 2 ** retry_count
                logger.error('sending_worker: exception %s, retrying in '
                             '%i seconds (%i/%i)', e, sleeptime,
                             retry_count, max_retries)
                time.sleep(sleeptime)

        if not done:
            # trip out
            logger.error('sending_worker: could not upload %s in %i retries',
                         filename, retry_count)
        else:
            size = os.path.getsize(filename)
            sending_seconds = time.time() - sending_start
            bytespersecond = size / sending_seconds
            logger.debug('sending_worker: sent %s in %i seconds at '
                         '%i bytes/second.', filename, sending_seconds,
                         bytespersecond)
            out_q.put(filename)

    logger.debug('sending_worker: queue is empty, terminating after '
                 '%i items in %i seconds', counter, time.time() - start_time)
    time.sleep(5)  # settle
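
# Note on the retry schedule above: each failure sleeps 2**retry_count
# seconds, so the delays grow 2, 4, 8, ... seconds between attempts
# (exponential backoff). With max_retries = 10, a persistently failing
# upload gets the initial attempt plus ten retries before being abandoned.
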

def unlink_worker(in_q):
    "Unlinks things from the in_q"
    start_time = time.time()
    counter = 0
    for filename in iter(in_q.get, 'STOP'):
        counter += 1
        logger.debug('unlink_worker: deleting %s', filename)
        try:
            os.unlink(filename)
        except OSError as e:
            logger.warning('unlink_worker: caught exception: %s', e)
    logger.debug('unlink_worker: queue is empty, terminating after '
                 '%i items in %i seconds', counter, time.time() - start_time)
    time.sleep(5)  # settle
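
# Overall pipeline, as wired up in __main__ below:
#
#   tarCreate -> (split) -> gpg_queue -> encryption_worker(s) -> send_queue
#             -> sending_worker -> S3, and unlink_queue -> unlink_worker
#                                      deletes local files as they finish
#
# Each worker consumes its queue via iter(queue.get, 'STOP'); __main__
# pushes a 'STOP' sentinel per worker to shut the pipeline down in order.
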

if __name__ == '__main__':
    # Read in arguments and verify that they match the BackupPC standard
    # exactly.
    if len(sys.argv) != 12:
        sys.stderr.write('Usage: %s tarCreatePath splitPath parPath host '
                         'bkupNum compPath fileExt splitSize outLoc parFile '
                         'share\n' % sys.argv[0])
        sys.exit(1)
    else:
        tarCreate = sys.argv[1]
        splitPath = sys.argv[2]
        parPath = sys.argv[3]
        host = sys.argv[4]
        bkupNum = int(sys.argv[5])
        compPath = sys.argv[6]
        fileExt = sys.argv[7]
        splitSize = int(sys.argv[8])
        outLoc = sys.argv[9]
        parfile = sys.argv[10]
        share = sys.argv[11]

    for i in [tarCreate, compPath, splitPath, parPath]:
        if i != '' and not is_exe(i):
            sys.stderr.write('Error: %s is not an executable program\n' % i)
            sys.exit(1)

    beginning = time.time()

    # Create queues for the workers.
    gpg_queue = Queue()
    send_queue = Queue()
    unlink_queue = Queue()

    queues = {
        'gpg_queue': gpg_queue,
        'send_queue': send_queue,
        'unlink_queue': unlink_queue,
    }

    # Is there already evidence of this having been done before?
    if glob.glob('%s/%s.*.tar.*' % (outLoc, host)):
        logger.warning('main: finishing previous incomplete run')
        somefile = os.path.basename(
            glob.glob('%s/%s.*.tar.*' % (outLoc, host))[0])
        keyparts = somefile.split('.')

        encrypted = split = tarred = final = False

        if keyparts[-1] == 'gpg':
            keyparts.pop()
        # A two-character extension that is not 'tar' is a split-chunk
        # suffix ('aa', 'ab', ...).
        if keyparts[-1] != 'tar' and len(keyparts[-1]) == 2:
            keyparts.pop()
        if keyparts[-1] == 'tar':
            keyparts.pop()

        bkupNum = int(keyparts.pop())

        filehead = '%s/%s.%i.tar.' % (outLoc, host, bkupNum)
        fileglob = filehead + '*'

        mesg = 'Continuing upload for host %s, backup #%i' % (host, bkupNum)
        if splitSize > 0 and is_exe(splitPath):
            mesg += ', split into %i byte chunks' % splitSize
        if secrets.gpgsymmetrickey:
            mesg += ', encrypted with secret key'
        logger.info('main: %s', mesg)
    else:
        mesg = 'Writing archive for host %s, backup #%i' % (host, bkupNum)

        tarcmd = [tarCreate, '-t']
        tarcmd.extend(['-h', host])
        tarcmd.extend(['-n', str(bkupNum)])
        tarcmd.extend(['-s', share])
        tarcmd.extend(['.'])

        splitcmd = None
        outfile = '%s/%s.%i.tar' % (outLoc, host, bkupNum)

        if splitSize > 0 and is_exe(splitPath):
            filehead = outfile + '.'
            fileglob = filehead + '*'
            splitcmd = [splitPath, '-b', str(splitSize), '-', filehead]
            mesg += ', split into %i byte chunks' % splitSize
        else:
            fileglob = outfile
            filehead = fileglob + '.'

        if secrets.gpgsymmetrickey:
            mesg += ', encrypted with secret key'

        logger.info('main: %s', mesg)

        logger.debug('main: executing tarcmd: %s > %s',
                     ' '.join(tarcmd), outfile)
        tarfp = open(outfile, 'wb')
        proc = Popen(tarcmd, preexec_fn=lambda: os.nice(10), stdout=tarfp)
        proc.communicate()
        tarfp.close()

        if splitcmd:
            logger.debug('main: executing splitcmd: %s', ' '.join(splitcmd))
            tarfp = open(outfile, 'rb')
            proc = Popen(splitcmd, preexec_fn=lambda: os.nice(10),
                         stdin=tarfp)
            proc.communicate()
            tarfp.close()
            unlink_queue.put(outfile)

    logger.info('main: dumped %i files from %s #%i',
                len(glob.glob(fileglob)), host, bkupNum)

    # Pre-run to check for artifacts: delete leftover GPG files whose
    # plaintext source still exists, since they may be incomplete.
    for i in glob.glob(fileglob):
        if not i.endswith('.gpg') and os.path.exists(i + '.gpg'):
            logger.warning('main: orphaned GPG file being deleted: %s',
                           i + '.gpg')
            os.unlink(i + '.gpg')

    # Run again to send files to the relevant queue.
    for i in sorted(glob.glob(fileglob)):
        if (secrets.gpgsymmetrickey
                and not i.endswith('.gpg')
                and not i.endswith('.COMPLETE')):
            # A tar file, unencrypted, needs to be encrypted.
            logger.debug('main: adding %s to gpg_queue', i)
            gpg_queue.put([i, secrets.gpgsymmetrickey, compPath])
        else:
            # Either encryption is off, or the file is already encrypted.
            logger.debug('main: adding %s to send_queue', i)
            send_queue.put(i)

    # Start some handlers and wait until everything is done.
    try:
        process_count = cpu_count()
    except NotImplementedError:
        process_count = 1

    procs = []

    for i in range(process_count):
        p = Process(name='encryption_worker_%i' % i,
                    target=encryption_worker,
                    args=(gpg_queue, send_queue, unlink_queue))
        p.start()
        procs.append(p)

    send_p = Process(name='send_worker', target=sending_worker,
                     args=(send_queue, unlink_queue, secrets.accesskey,
                           secrets.sharedkey, host))
    send_p.start()
    procs.append(send_p)

    unlink_p = Process(name='unlink_worker', target=unlink_worker,
                       args=(unlink_queue,))
    unlink_p.start()
    procs.append(unlink_p)

    send_queue_closed = False
    unlink_queue_closed = False

    # Put STOP command(s) at the end of the GPG queue.
    gpg_queue_closed = True
    for i in range(process_count):
        gpg_queue.put('STOP')

    for i in procs:
        # Wait for each process to terminate in turn.
        i.join()
        logger.debug('main: process terminated: %s', i.name)

        # Count how many crypto processes are still running.
        crypto_running = 0
        for j in procs:
            if j.name.startswith('encryption_worker') and j.is_alive():
                crypto_running += 1

        if crypto_running == 0 and not send_queue_closed:
            # Crypto is done: queue the completion marker, then close up
            # the send queue.
            logger.debug('main: queuing final file')
            finalfile = '%sCOMPLETE' % filehead
            fp = open(finalfile, 'w')
            fp.write('%s %s "%s"' % (beginning, time.time(), mesg))
            fp.close()
            send_queue.put(finalfile)

            logger.debug('main: queuing stop sentinel for send_queue')
            send_queue.put('STOP')
            send_queue_closed = True

        if not send_p.is_alive() and not unlink_queue_closed:
            # Sending is done: close up the unlink queue.
            logger.debug('main: queuing stop sentinel for unlink_queue')
            unlink_queue.put('STOP')
            unlink_queue_closed = True

    for qname, q in queues.items():
        time.sleep(5)  # settle
        if not q.empty():
            logger.critical('main: queue %s not empty!', qname)
            raise Exception('queue not empty: %s' % qname)
        else:
            logger.debug('main: queue %s is empty', qname)

    logger.info('main: completed run after %i seconds',
                time.time() - beginning)
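
# Example BackupPC hookup (a sketch; the substitution variables below are
# BackupPC's standard archive variables -- check your config.pl, and adjust
# the install path to wherever this script lives):
#
#   $Conf{ArchiveClientCmd} = '/usr/local/bin/BackupPC_archiveHost'
#       . ' $tarCreatePath $splitpath $parpath $host $backupnumber'
#       . ' $compression $compext $splitsize $archiveloc $parfile *';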