Adapt to use a multiprocessing queue

This will let it GPG things in the background while S3ing things in the
foreground, hopefully helping throughput!
Ryan Tucker 2011-09-20 22:24:56 -04:00
parent 3af02240a4
commit 7fa6dd0115
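
For context, the pattern the diff below adopts is a small multiprocessing pipeline: the main process queues filenames, an encryption worker drains that queue and feeds a second queue, and a sending worker drains the second queue, with a 'STOP' sentinel shutting each stage down in turn. A minimal sketch of that pattern (not code from this commit; encrypt_stub and upload_stub are hypothetical stand-ins for the script's real encrypt_file and send_file):

from multiprocessing import Process, Queue
import time

def encrypt_stub(filename):
    """Hypothetical stand-in for the real GPG/encrypt_file step."""
    time.sleep(0.1)
    return filename + '.gpg'

def upload_stub(filename):
    """Hypothetical stand-in for the real S3 upload in send_file."""
    time.sleep(0.1)
    print('sent %s' % filename)

def encryption_worker(in_q, out_q):
    # Pull work until the 'STOP' sentinel arrives, then pass the sentinel
    # along so the downstream worker knows to shut down too.
    for filename in iter(in_q.get, 'STOP'):
        out_q.put(encrypt_stub(filename))
    out_q.put('STOP')

def sending_worker(in_q):
    for filename in iter(in_q.get, 'STOP'):
        upload_stub(filename)

if __name__ == '__main__':
    gpg_queue, send_queue = Queue(), Queue()
    for name in ('a.tar', 'b.tar', 'c.tar'):
        gpg_queue.put(name)
    gpg_queue.put('STOP')

    Process(target=encryption_worker, args=(gpg_queue, send_queue)).start()
    sender = Process(target=sending_worker, args=(send_queue,))
    sender.start()
    sender.join()  # encryption and upload overlap while both processes run

Because the two stages run in separate processes, encryption of the next archive can overlap with the upload of the previous one, which is where the hoped-for throughput gain comes from.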


@@ -22,6 +22,7 @@ import socket
 import sys
 import time
+from multiprocessing import Process, Queue, cpu_count
 from subprocess import *
 
 from boto.s3.connection import S3Connection
@@ -210,32 +211,71 @@ def open_s3(accesskey, sharedkey, host):
     try:
         bucket = conn.get_bucket(mybucketname)
     except boto.exception.S3ResponseError:
-        logging.info('Creating bucket %s' % mybucketname)
+        logger.info('Creating bucket %s' % mybucketname)
         bucket = conn.create_bucket(mybucketname)
     bucket.set_acl('private')
     return bucket
 
 def handle_progress(transmitted, pending):
-    logging.debug('%i of %i bytes transmitted (%.2f%%)' % (transmitted, pending, (transmitted/float(pending))*100))
+    logger.debug('%i of %i bytes transmitted (%.2f%%)' % (transmitted, pending, (transmitted/float(pending))*100))
 
-def send_file(bucket, filename, cmd, mesg):
+def send_file(bucket, filename):
     basefilename = os.path.basename(filename)
     if bucket.get_key(basefilename):
-        logging.error('Duplicate filename %s! I hope that is OK.' % basefilename)
+        logger.error('Duplicate filename %s! I hope that is OK.' % basefilename)
 
-    k = MyKey(bucket)
+    k = Key(bucket)
     k.key = basefilename
-    if cmd: k.set_metadata('backuppc-cmd', cmd)
-    if mesg: k.set_metadata('backuppc-mesg', mesg)
-    logging.info('Uploading %s...' % basefilename)
-    fd = SlowFile(name=filename, mode='rb')
+    logger.info('Uploading %s...' % basefilename)
+    fd = open(name=filename, mode='rb')
 
     putHeaders = {'x-amz-storage-class': 'REDUCED_REDUNDANCY'}
 
     k.set_contents_from_file(fd, headers=putHeaders, cb=handle_progress)
 
     return k
 
+def encryption_worker(in_q, out_q):
+    "Encrypts things from the in_q, puts them in the out_q"
+    start_time = time.time()
+    counter = 0
+    for filename, gpgkey, comppath in iter(in_q.get, 'STOP'):
+        counter += 1
+        cryptstart_time = time.time()
+        logger.info("Beginning encryption of %s.", filename)
+        result = encrypt_file(filename, gpgkey, comppath)
+        out_q.put(result)
+        logger.info("Finished encryption of %s in %i seconds.", filename, time.time()-cryptstart_time)
+    logger.info("Encryption worker dying after %i tasks in %i seconds.", counter, time.time()-start_time)
+    out_q.put('STOP')
+
+def sending_worker(in_q, accesskey, sharedkey, host):
+    "Sends things from the in_q using the send_file method"
+    start_time = time.time()
+    counter = 0
+    bucket = open_s3(accesskey, sharedkey, host)
+    for filename in iter(in_q.get, 'STOP'):
+        sending_start = time.time()
+        counter += 1
+        retry_count = 0
+        max_retries = 10
+        while retry_count <= max_retries:
+            try:
+                key = send_file(bucket, filename)
+                key.set_acl('private')
+                key.close()
+                retry_count = max_retries+1
+            except (boto.exception.S3ResponseError, socket.error), e:
+                retry_count += 1
+                sleeptime = 2**retry_count
+                logger.error('Encountered exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries)
+                time.sleep(sleeptime)
+
+        size = os.path.getsize(filename)
+        os.unlink(filename)
+        sending_seconds = time.time() - sending_start
+        bytespersecond = size / sending_seconds
+        logger.info("Finished sending of %s in %i seconds, transfer speed %i bytes/second.", filename, sending_seconds, bytespersecond)
 
 if __name__ == '__main__':
     # Read in arguments
     if len(sys.argv) != 12:
@@ -260,9 +300,6 @@ if __name__ == '__main__':
             sys.stderr.write('Error: %s is not an executable program\n' % i)
             sys.exit(1)
 
-    # open s3 connection
-    bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
-
     beginning = time.time()
 
     mesg = "Writing archive for host %s, backup #%i" % (host, bkupNum)
@@ -326,51 +363,54 @@ if __name__ == '__main__':
         logger.error('%s died with exit code %i' % (cmd, returncode))
         sys.exit(1)
 
-    logging.info('Beginning post-processing of %i files from %s #%i' % (len(glob.glob(fileglob)), host, bkupNum))
+    logger.info('Beginning post-processing of %i files from %s #%i' % (len(glob.glob(fileglob)), host, bkupNum))
+
+    # Create queues for handling encryption and file transfers
+    gpg_queue = Queue()
+    send_queue = Queue()
 
     for i in sorted(glob.glob(fileglob)):
-        sending_start = time.time()
         if secrets.gpgsymmetrickey and not i.endswith('.gpg'):
-            sendfile = encrypt_file(i, secrets.gpgsymmetrickey, compPath)
+            logger.debug("Adding %s to gpg_queue", i)
+            gpg_queue.put([i, secrets.gpgsymmetrickey, compPath])
         else:
             # either encryption is off, or the file is already encrypted
-            sendfile = i
-        encrypt_seconds = time.time() - sending_start
-
-        # create some output so backuppc doesn't time out
-        sys.stdout.write("%s: Sending %s to S3...\n" % (time.strftime('%d-%H:%M:%S'), sendfile))
+            logger.debug("Adding %s to send_queue", i)
+            send_queue.put(i)
+
+    # Put a STOP command at the end of the GPG queue.
+    gpg_queue.put('STOP')
+
+    # Start some handlers, wait until everything is done
+    try:
+        process_count = cpu_count()
+    except NotImplementedError:
+        process_count = 1
+
+    for i in range(process_count):
+        Process(target=encryption_worker, args=(gpg_queue, send_queue,)).start()
+
+    send_p = Process(target=sending_worker, args=(send_queue, secrets.accesskey, secrets.sharedkey, host))
+    send_p.start()
+
+    while send_p.is_alive():
+        # Generate some output so that BackupPC doesn't time out
+        mesg = "Checkpoint: %i task(s) in GPG queue, %i task(s) in S3 queue.\n" % (gpg_queue.qsize(), send_queue.qsize())
+        logger.info(mesg)
+        sys.stdout.write(mesg + '\n')
         sys.stdout.flush()
+        send_p.join(300)
 
-        retry_count = 0
-        max_retries = 10
+    if not gpg_queue.empty():
+        raise Exception("GPG queue not empty")
+    if not send_queue.empty():
+        raise Exception("Send queue not empty")
 
-        while retry_count <= max_retries:
-            try:
-                key = send_file(bucket, sendfile, cmd, mesg)
-                key.set_acl('private')
-                key.close()
-                retry_count = max_retries+1
-            except (boto.exception.S3ResponseError, socket.error), e:
-                retry_count += 1
-                sleeptime = 2**retry_count
-                err = 'Encountered exception %s, retrying in %i seconds (%i/%i)' % (e, sleeptime, retry_count, max_retries)
-                logger.error(err)
-                sys.stdout.write(time.strftime('%d-%H:%M:%S') + ': ' + err + '\n')
-                sys.stdout.flush()
-                time.sleep(sleeptime)
-
-        size = os.path.getsize(sendfile)
-        os.unlink(sendfile)
-        sending_seconds = time.time() - sending_start
-        bytespersecond = size / (sending_seconds - encrypt_seconds)
-
-        sys.stdout.write('%s: File sent. Total time %i seconds, crypto time %i seconds, transfer speed %i bytes/second.\n' % (time.strftime('%d-%H:%M:%S'), sending_seconds, encrypt_seconds, bytespersecond))
-        sys.stdout.flush()
+    sys.stdout.write("Reached end of queue! Finalizing backup.\n")
+    sys.stdout.flush()
 
     # finalize the backup
+    bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
     key = MyKey(bucket)
     key.key = '%sCOMPLETE' % os.path.basename(filehead)
     key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg))