From e4fd5ca8ac183b59732fc050cd437cb922c65b56 Mon Sep 17 00:00:00 2001 From: Ryan Tucker Date: Thu, 22 Sep 2011 10:43:28 -0400 Subject: [PATCH] Fix multiprocessing bugs, add unlink_worker This is still under some live development, for sure :-) Among other minor (yet significant) fixes, I've added another worker process to handle unlinking. This verifies that the file was transmitted properly (or at least with the right md5sum) before unlinking it. --- BackupPC_archiveHost_s3 | 115 ++++++++++++++++++++++++++++++---------- 1 file changed, 88 insertions(+), 27 deletions(-) diff --git a/BackupPC_archiveHost_s3 b/BackupPC_archiveHost_s3 index 54de70e..1b427d9 100755 --- a/BackupPC_archiveHost_s3 +++ b/BackupPC_archiveHost_s3 @@ -118,40 +118,72 @@ def encryption_worker(in_q, out_q): logger.info("Beginning encryption of %s.", filename) result = encrypt_file(filename, gpgkey, comppath) out_q.put(result) - logger.info("Finished encryption of %s in %i seconds.", filename, time.time()-cryptstart_time) - logger.info("Finished processing %i items in encryption queue in %i seconds.", counter, time.time()-start_time) - out_q.put('STOP') + logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time) + logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time) -def sending_worker(in_q, accesskey, sharedkey, host): +def sending_worker(in_q, out_q, accesskey, sharedkey, host): "Sends things from the in_q using the send_file method" start_time = time.time() counter = 0 - bucket = open_s3(accesskey, sharedkey, host) for filename in iter(in_q.get, 'STOP'): sending_start = time.time() counter += 1 retry_count = 0 max_retries = 10 + done = False - while retry_count <= max_retries: + while retry_count <= max_retries and not done: try: + logger.info("sending_worker: sending %s", filename) + bucket = open_s3(accesskey, sharedkey, host) key = send_file(bucket, filename) key.set_acl('private') key.close() - retry_count = max_retries+1 + done = True except (boto.exception.S3ResponseError, socket.error), e: retry_count += 1 sleeptime = 2**retry_count logger.error('Encountered exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries) time.sleep(sleeptime) - size = os.path.getsize(filename) - os.unlink(filename) - sending_seconds = time.time() - sending_start + if not done: + # trip out + logger.error('sending_worker: could not upload %s in %i retries') + else: + size = os.path.getsize(filename) + sending_seconds = time.time() - sending_start + bytespersecond = size / sending_seconds + logger.debug("sending_worker: sent %s in %i seconds at %i bytes/second.", filename, sending_seconds, bytespersecond) + out_q.put(filename) - bytespersecond = size / sending_seconds + logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time) + out_q.put('STOP') - logger.info("Finished sending of %s in %i seconds, transfer speed %i bytes/second.", filename, sending_seconds, bytespersecond) +def unlink_worker(in_q, accesskey, sharedkey, host): + start_time = time.time() + counter = 0 + bucket = open_s3(accesskey, sharedkey, host) + for filename in iter(in_q.get, 'STOP'): + counter += 1 + basefilename = os.path.basename(filename) + key = bucket.get_key(basefilename) + stat = os.stat(filename) + if key: + if key.size == stat[6]: + fp = open(filename) + local_md5 = hashlib.md5(fp.read()) + fp.close() + if '"%s"' % local_md5.hexdigest() == key.etag: + logger.debug("unlink_worker: deleting %s", basefilename) + os.unlink(filename) + else: + logger.error("unlink_worker: md5sum for %s did not match: %s != %s", basefilename, '"%s"' % local_md5.hexdigest(), key.etag) + else: + logger.error("unlink_worker: size mismatch for %s: %i != %i", basefilename, stat[6], key.size) + else: + logger.error("unlink_worker: key does not exist: %s", basefilename) + + logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time) if __name__ == '__main__': # Read in arguments, verify that they match the BackupPC standard exactly @@ -249,6 +281,13 @@ if __name__ == '__main__': # Create queues for handling encryption and file transfers gpg_queue = Queue() send_queue = Queue() + unlink_queue = Queue() + + queues = { + 'gpg_queue': gpg_queue, + 'send_queue': send_queue, + 'unlink_queue': unlink_queue, + } # Pre-run to check for artifacts for i in glob.glob(fileglob): @@ -257,7 +296,7 @@ if __name__ == '__main__': os.unlink(i + '.gpg') # Run again to send files to the relevant queue - for i in glob.glob(fileglob): + for i in sorted(glob.glob(fileglob)): if secrets.gpgsymmetrickey and not i.endswith('.gpg'): # A tar file, unencrypted, needs encrypted. logger.debug("Adding %s to gpg_queue", i) @@ -269,6 +308,7 @@ if __name__ == '__main__': # Put a STOP command at the end of the GPG queue. gpg_queue.put('STOP') + gpg_queue_closed = True # Start some handlers, wait until everything is done try: @@ -276,30 +316,51 @@ if __name__ == '__main__': except NotImplementedError: process_count = 1 + procs = [] + for i in range(process_count): - Process(target=encryption_worker, args=(gpg_queue, send_queue,)).start() + p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue,)) + p.start() + procs.append(p) - send_p = Process(target=sending_worker, args=(send_queue, secrets.accesskey, secrets.sharedkey, host)) + send_p = Process(name="send_worker", target=sending_worker, args=(send_queue, unlink_queue, secrets.accesskey, secrets.sharedkey, host)) send_p.start() + procs.append(send_p) - while send_p.is_alive(): - # Generate some output so that BackupPC doesn't time out - mesg = "Checkpoint: %i task(s) in GPG queue, %i task(s) in S3 queue.\n" % (gpg_queue.qsize(), send_queue.qsize()) - logger.info(mesg) - sys.stdout.write(mesg + '\n') - sys.stdout.flush() - send_p.join(300) + unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue, secrets.accesskey, secrets.sharedkey, host)) + unlink_p.start() + procs.append(unlink_p) - if not gpg_queue.empty(): - raise Exception("GPG queue not empty") - if not send_queue.empty(): - raise Exception("Send queue not empty") + send_queue_closed = False + unlink_queue_closed = False - logger.info("Finalizing backup.") + for i in procs: + i.join() + crypto_running = 0 + for j in procs: + if j.name.startswith("encryption_worker") and j.is_alive(): + crypto_running += 1 + if crypto_running == 0 and not send_queue_closed: + send_queue.put('STOP') + send_queue_closed = True + logger.debug("main: queuing stop sentinel for send_queue") + if not send_p.is_alive() and not unlink_queue_closed: + unlink_queue.put('STOP') + unlink_queue_closed = True + logger.debug("main: queuing stop sentinel for unlink_queue") + logger.debug("main: process terminated: %s", i.name) + + for qname, q in queues.items(): + if not q.empty(): + logger.critical("main: queue %s not empty!", qname) + raise Exception("queue not empty: %s" % qname) + + logger.debug("main: finalizing backup") # finalize the backup bucket = open_s3(secrets.accesskey, secrets.sharedkey, host) key = Key(bucket) key.key = '%sCOMPLETE' % os.path.basename(filehead) + key.set_acl('private') key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg)) key.close()