diff --git a/BackupPC_archiveHost_s3 b/BackupPC_archiveHost_s3
index 54de70e..1b427d9 100755
--- a/BackupPC_archiveHost_s3
+++ b/BackupPC_archiveHost_s3
@@ -118,40 +118,72 @@ def encryption_worker(in_q, out_q):
         logger.info("Beginning encryption of %s.", filename)
         result = encrypt_file(filename, gpgkey, comppath)
         out_q.put(result)
-        logger.info("Finished encryption of %s in %i seconds.", filename, time.time()-cryptstart_time)
-    logger.info("Finished processing %i items in encryption queue in %i seconds.", counter, time.time()-start_time)
-    out_q.put('STOP')
+        logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time)
+    logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time)
 
-def sending_worker(in_q, accesskey, sharedkey, host):
+def sending_worker(in_q, out_q, accesskey, sharedkey, host):
     "Sends things from the in_q using the send_file method"
     start_time = time.time()
     counter = 0
-    bucket = open_s3(accesskey, sharedkey, host)
     for filename in iter(in_q.get, 'STOP'):
         sending_start = time.time()
         counter += 1
         retry_count = 0
         max_retries = 10
+        done = False
 
-        while retry_count <= max_retries:
+        while retry_count <= max_retries and not done:
             try:
+                logger.info("sending_worker: sending %s", filename)
+                bucket = open_s3(accesskey, sharedkey, host)
                 key = send_file(bucket, filename)
                 key.set_acl('private')
                 key.close()
-                retry_count = max_retries+1
+                done = True
             except (boto.exception.S3ResponseError, socket.error), e:
                 retry_count += 1
                 sleeptime = 2**retry_count
                 logger.error('Encountered exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries)
                 time.sleep(sleeptime)
 
-        size = os.path.getsize(filename)
-        os.unlink(filename)
-        sending_seconds = time.time() - sending_start
+        if not done:
+            # exhausted all retries; leave the file on disk and do not queue it for deletion
+            logger.error('sending_worker: could not upload %s in %i retries', filename, max_retries)
+        else:
+            size = os.path.getsize(filename)
+            sending_seconds = time.time() - sending_start
+            bytespersecond = size / sending_seconds
+            logger.debug("sending_worker: sent %s in %i seconds at %i bytes/second.", filename, sending_seconds, bytespersecond)
+            out_q.put(filename)
 
-        bytespersecond = size / sending_seconds
+    logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
+    out_q.put('STOP')
 
-        logger.info("Finished sending of %s in %i seconds, transfer speed %i bytes/second.", filename, sending_seconds, bytespersecond)
+def unlink_worker(in_q, accesskey, sharedkey, host):
+    start_time = time.time()
+    counter = 0
+    bucket = open_s3(accesskey, sharedkey, host)
+    for filename in iter(in_q.get, 'STOP'):
+        counter += 1
+        basefilename = os.path.basename(filename)
+        key = bucket.get_key(basefilename)
+        stat = os.stat(filename)
+        if key:
+            if key.size == stat[6]:
+                fp = open(filename)
+                local_md5 = hashlib.md5(fp.read())
+                fp.close()
+                if '"%s"' % local_md5.hexdigest() == key.etag:
+                    logger.debug("unlink_worker: deleting %s", basefilename)
+                    os.unlink(filename)
+                else:
+                    logger.error("unlink_worker: md5sum for %s did not match: %s != %s", basefilename, '"%s"' % local_md5.hexdigest(), key.etag)
+            else:
+                logger.error("unlink_worker: size mismatch for %s: %i != %i", basefilename, stat[6], key.size)
+        else:
+            logger.error("unlink_worker: key does not exist: %s", basefilename)
+
+    logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
 
 if __name__ == '__main__':
     # Read in arguments, verify that they match the BackupPC standard exactly
@@ -249,6 +281,13 @@ if __name__ == '__main__':
     # Create queues for handling encryption and file transfers
     gpg_queue = Queue()
     send_queue = Queue()
+    unlink_queue = Queue()
+
+    queues = {
+        'gpg_queue': gpg_queue,
+        'send_queue': send_queue,
+        'unlink_queue': unlink_queue,
+    }
 
     # Pre-run to check for artifacts
     for i in glob.glob(fileglob):
@@ -257,7 +296,7 @@ if __name__ == '__main__':
             os.unlink(i + '.gpg')
 
     # Run again to send files to the relevant queue
-    for i in glob.glob(fileglob):
+    for i in sorted(glob.glob(fileglob)):
         if secrets.gpgsymmetrickey and not i.endswith('.gpg'):
             # A tar file, unencrypted, needs encrypted.
             logger.debug("Adding %s to gpg_queue", i)
@@ -269,6 +308,7 @@ if __name__ == '__main__':
 
     # Put a STOP command at the end of the GPG queue.
     gpg_queue.put('STOP')
+    gpg_queue_closed = True
 
     # Start some handlers, wait until everything is done
    try:
@@ -276,30 +316,51 @@ if __name__ == '__main__':
     except NotImplementedError:
         process_count = 1
 
+    procs = []
+
     for i in range(process_count):
-        Process(target=encryption_worker, args=(gpg_queue, send_queue,)).start()
+        p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue,))
+        p.start()
+        procs.append(p)
 
-    send_p = Process(target=sending_worker, args=(send_queue, secrets.accesskey, secrets.sharedkey, host))
+    send_p = Process(name="send_worker", target=sending_worker, args=(send_queue, unlink_queue, secrets.accesskey, secrets.sharedkey, host))
     send_p.start()
+    procs.append(send_p)
 
-    while send_p.is_alive():
-        # Generate some output so that BackupPC doesn't time out
-        mesg = "Checkpoint: %i task(s) in GPG queue, %i task(s) in S3 queue.\n" % (gpg_queue.qsize(), send_queue.qsize())
-        logger.info(mesg)
-        sys.stdout.write(mesg + '\n')
-        sys.stdout.flush()
-        send_p.join(300)
+    unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue, secrets.accesskey, secrets.sharedkey, host))
+    unlink_p.start()
+    procs.append(unlink_p)
 
-    if not gpg_queue.empty():
-        raise Exception("GPG queue not empty")
-    if not send_queue.empty():
-        raise Exception("Send queue not empty")
+    send_queue_closed = False
+    unlink_queue_closed = False
 
-    logger.info("Finalizing backup.")
+    for i in procs:
+        i.join()
+        crypto_running = 0
+        for j in procs:
+            if j.name.startswith("encryption_worker") and j.is_alive():
+                crypto_running += 1
+        if crypto_running == 0 and not send_queue_closed:
+            send_queue.put('STOP')
+            send_queue_closed = True
+            logger.debug("main: queuing stop sentinel for send_queue")
+        if not send_p.is_alive() and not unlink_queue_closed:
+            unlink_queue.put('STOP')
+            unlink_queue_closed = True
+            logger.debug("main: queuing stop sentinel for unlink_queue")
+        logger.debug("main: process terminated: %s", i.name)
+
+    for qname, q in queues.items():
+        if not q.empty():
+            logger.critical("main: queue %s not empty!", qname)
+            raise Exception("queue not empty: %s" % qname)
+
+    logger.debug("main: finalizing backup")
 
     # finalize the backup
     bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
     key = Key(bucket)
     key.key = '%sCOMPLETE' % os.path.basename(filehead)
+    key.set_acl('private')
     key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg))
    key.close()
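For reviewers unfamiliar with the sentinel pattern this patch relies on, the sketch below shows the shape of the pipeline it wires up: encryption workers feed a single sender over one queue, the sender feeds a single unlink worker over another, and the parent joins each stage before pushing 'STOP' into the next queue. This is a minimal, hypothetical illustration, not code from BackupPC_archiveHost_s3; the names stage and sink and the two-item workload are invented for the example, and the patch itself sequences its sentinels slightly differently (the parent checks is_alive() while joining, and gpg_queue receives a single 'STOP').

    from multiprocessing import Process, Queue, cpu_count

    def stage(in_q, out_q, label):
        # Drain in_q until the 'STOP' sentinel, handing results to the next stage.
        for item in iter(in_q.get, 'STOP'):
            out_q.put('%s(%s)' % (label, item))

    def sink(in_q):
        for item in iter(in_q.get, 'STOP'):
            print('done: %s' % item)

    if __name__ == '__main__':
        gpg_q, send_q, unlink_q = Queue(), Queue(), Queue()
        encrypters = [Process(target=stage, args=(gpg_q, send_q, 'encrypt'))
                      for _ in range(cpu_count())]
        sender = Process(target=stage, args=(send_q, unlink_q, 'send'))
        unlinker = Process(target=sink, args=(unlink_q,))
        for p in encrypters + [sender, unlinker]:
            p.start()

        for item in ['a.tar', 'b.tar']:
            gpg_q.put(item)
        for _ in encrypters:      # one sentinel per consumer of gpg_q
            gpg_q.put('STOP')
        for p in encrypters:
            p.join()
        send_q.put('STOP')        # every encrypter has exited, close the send stage
        sender.join()
        unlink_q.put('STOP')      # the sender has exited, close the unlink stage
        unlinker.join()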