From 7fa6dd0115fb11c8231be84b8debaec93cc44bc1 Mon Sep 17 00:00:00 2001
From: Ryan Tucker
Date: Tue, 20 Sep 2011 22:24:56 -0400
Subject: [PATCH] Adapt to use a multiprocessing queue

This will let it GPG things in the background while S3ing things in the
foreground, hopefully helping throughput!
---
 BackupPC_archiveHost_s3 | 140 ++++++++++++++++++++++++++--------------
 1 file changed, 90 insertions(+), 50 deletions(-)

diff --git a/BackupPC_archiveHost_s3 b/BackupPC_archiveHost_s3
index 6302e07..1090b88 100755
--- a/BackupPC_archiveHost_s3
+++ b/BackupPC_archiveHost_s3
@@ -22,6 +22,7 @@
 import socket
 import sys
 import time
+from multiprocessing import Process, Queue, cpu_count
 from subprocess import *
 
 from boto.s3.connection import S3Connection
@@ -210,32 +211,71 @@ def open_s3(accesskey, sharedkey, host):
     try:
         bucket = conn.get_bucket(mybucketname)
     except boto.exception.S3ResponseError:
-        logging.info('Creating bucket %s' % mybucketname)
+        logger.info('Creating bucket %s' % mybucketname)
         bucket = conn.create_bucket(mybucketname)
         bucket.set_acl('private')
 
     return bucket
 
 def handle_progress(transmitted, pending):
-    logging.debug('%i of %i bytes transmitted (%.2f%%)' % (transmitted, pending, (transmitted/float(pending))*100))
+    logger.debug('%i of %i bytes transmitted (%.2f%%)' % (transmitted, pending, (transmitted/float(pending))*100))
 
-def send_file(bucket, filename, cmd, mesg):
+def send_file(bucket, filename):
     basefilename = os.path.basename(filename)
     if bucket.get_key(basefilename):
-        logging.error('Duplicate filename %s! I hope that is OK.' % basefilename)
-    k = MyKey(bucket)
+        logger.error('Duplicate filename %s! I hope that is OK.' % basefilename)
+    k = Key(bucket)
     k.key = basefilename
-    if cmd: k.set_metadata('backuppc-cmd', cmd)
-    if mesg: k.set_metadata('backuppc-mesg', mesg)
-    logging.info('Uploading %s...' % basefilename)
-
-    fd = SlowFile(name=filename, mode='rb')
-
+    logger.info('Uploading %s...' % basefilename)
+    fd = open(filename, 'rb')
     putHeaders = {'x-amz-storage-class': 'REDUCED_REDUNDANCY'}
     k.set_contents_from_file(fd, headers=putHeaders, cb=handle_progress)
     return k
 
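+# Post-processing pipeline: the main process queues each archive file for
+# encryption, one encryption_worker per CPU encrypts files and feeds the
+# send queue, and a single sending_worker uploads them to S3 while the
+# main process emits periodic checkpoints so BackupPC does not time out.
+# One 'STOP' sentinel per worker shuts the pipeline down cleanly.
+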
+def encryption_worker(in_q, out_q):
+    "Encrypts things from the in_q, puts them in the out_q"
+    start_time = time.time()
+    counter = 0
+    for filename, gpgkey, comppath in iter(in_q.get, 'STOP'):
+        counter += 1
+        cryptstart_time = time.time()
+        logger.info("Beginning encryption of %s.", filename)
+        result = encrypt_file(filename, gpgkey, comppath)
+        out_q.put(result)
+        logger.info("Finished encryption of %s in %i seconds.", filename, time.time()-cryptstart_time)
+    logger.info("Encryption worker dying after %i tasks in %i seconds.", counter, time.time()-start_time)
+    # Tell the sender this worker is done; the sender quits once it has
+    # heard from every encryption worker.
+    out_q.put('STOP')
+
+def sending_worker(in_q, gpg_workers, accesskey, sharedkey, host):
+    "Sends things from the in_q using the send_file method"
+    start_time = time.time()
+    counter = 0
+    stops_seen = 0
+    bucket = open_s3(accesskey, sharedkey, host)
+    while stops_seen < gpg_workers:
+        filename = in_q.get()
+        if filename == 'STOP':
+            # One 'STOP' arrives from each encryption worker; only quit
+            # once all of them have finished feeding the queue.
+            stops_seen += 1
+            continue
+        sending_start = time.time()
+        counter += 1
+        retry_count = 0
+        max_retries = 10
+
+        while retry_count <= max_retries:
+            try:
+                key = send_file(bucket, filename)
+                key.set_acl('private')
+                key.close()
+                retry_count = max_retries+1
+            except (boto.exception.S3ResponseError, socket.error), e:
+                retry_count += 1
+                sleeptime = 2**retry_count
+                logger.error('Encountered exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries)
+                time.sleep(sleeptime)
+
+        size = os.path.getsize(filename)
+        os.unlink(filename)
+        sending_seconds = time.time() - sending_start
+
+        bytespersecond = size / sending_seconds
+
+        logger.info("Finished sending of %s in %i seconds, transfer speed %i bytes/second.", filename, sending_seconds, bytespersecond)
+    logger.info("Sending worker dying after %i tasks in %i seconds.", counter, time.time()-start_time)
+
 if __name__ == '__main__':
     # Read in arguments
     if len(sys.argv) != 12:
@@ -260,9 +300,6 @@ if __name__ == '__main__':
         sys.stderr.write('Error: %s is not an executable program\n' % i)
         sys.exit(1)
 
-    # open s3 connection
-    bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
-
     beginning = time.time()
 
     mesg = "Writing archive for host %s, backup #%i" % (host, bkupNum)
@@ -326,51 +363,54 @@
         logger.error('%s died with exit code %i' % (cmd, returncode))
         sys.exit(1)
 
-    logging.info('Beginning post-processing of %i files from %s #%i' % (len(glob.glob(fileglob)), host, bkupNum))
+    logger.info('Beginning post-processing of %i files from %s #%i' % (len(glob.glob(fileglob)), host, bkupNum))
+
+    # Create queues for handling encryption and file transfers
+    gpg_queue = Queue()
+    send_queue = Queue()
 
     for i in sorted(glob.glob(fileglob)):
-        sending_start = time.time()
         if secrets.gpgsymmetrickey and not i.endswith('.gpg'):
-            sendfile = encrypt_file(i, secrets.gpgsymmetrickey, compPath)
+            logger.debug("Adding %s to gpg_queue", i)
+            gpg_queue.put([i, secrets.gpgsymmetrickey, compPath])
         else:
             # either encryption is off, or the file is already encrypted
-            sendfile = i
-        encrypt_seconds = time.time() - sending_start
+            logger.debug("Adding %s to send_queue", i)
+            send_queue.put(i)
 
-        # create some output so backuppc doesn't time out
-        sys.stdout.write("%s: Sending %s to S3...\n" % (time.strftime('%d-%H:%M:%S'), sendfile))
+    # Work out how many encryption workers to run, then put one 'STOP'
+    # sentinel on the GPG queue for each of them so they all shut down.
+    try:
+        process_count = cpu_count()
+    except NotImplementedError:
+        process_count = 1
+
+    for i in range(process_count):
+        gpg_queue.put('STOP')
+
+    # Start the handlers, then wait until everything is done
+    for i in range(process_count):
+        Process(target=encryption_worker, args=(gpg_queue, send_queue,)).start()
+
+    send_p = Process(target=sending_worker, args=(send_queue, process_count, secrets.accesskey, secrets.sharedkey, host))
+    send_p.start()
+
+    while send_p.is_alive():
+        # Generate some output so that BackupPC doesn't time out; leave
+        # 'mesg' alone, since it is reused when finalizing the backup.
+        checkpoint = "Checkpoint: %i task(s) in GPG queue, %i task(s) in S3 queue." % (gpg_queue.qsize(), send_queue.qsize())
+        logger.info(checkpoint)
+        sys.stdout.write(checkpoint + '\n')
         sys.stdout.flush()
+        send_p.join(300)
 
-        retry_count = 0
-        max_retries = 10
+    if not gpg_queue.empty():
+        raise Exception("GPG queue not empty")
+    if not send_queue.empty():
+        raise Exception("Send queue not empty")
 
-        while retry_count <= max_retries:
-            try:
-                key = send_file(bucket, sendfile, cmd, mesg)
-                key.set_acl('private')
-                key.close()
-                retry_count = max_retries+1
-            except (boto.exception.S3ResponseError, socket.error), e:
-                retry_count += 1
-                sleeptime = 2**retry_count
-                err = 'Encountered exception %s, retrying in %i seconds (%i/%i)' % (e, sleeptime, retry_count, max_retries)
-                logger.error(err)
-                sys.stdout.write(time.strftime('%d-%H:%M:%S') + ': ' + err + '\n')
-                sys.stdout.flush()
-                time.sleep(sleeptime)
-
-        size = os.path.getsize(sendfile)
-
-        os.unlink(sendfile)
-
-        sending_seconds = time.time() - sending_start
-
-        bytespersecond = size / (sending_seconds - encrypt_seconds)
-
-        sys.stdout.write('%s: File sent. Total time %i seconds, crypto time %i seconds, transfer speed %i bytes/second.\n' % (time.strftime('%d-%H:%M:%S'), sending_seconds, encrypt_seconds, bytespersecond))
-        sys.stdout.flush()
+    sys.stdout.write("Reached end of queue! Finalizing backup.\n")
+    sys.stdout.flush()
 
     # finalize the backup
+    bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
     key = MyKey(bucket)
     key.key = '%sCOMPLETE' % os.path.basename(filehead)
     key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg))