diff --git a/BackupPC_archiveHost_s3 b/BackupPC_archiveHost_s3 index cfc1926..4872417 100755 --- a/BackupPC_archiveHost_s3 +++ b/BackupPC_archiveHost_s3 @@ -46,6 +46,9 @@ logger.addHandler(consolehandler) logger.setLevel(logging.DEBUG) +class VerifyError(Exception): + pass + def is_exe(fpath): return os.path.exists(fpath) and os.access(fpath, os.X_OK) @@ -104,6 +107,7 @@ def verify_file(bucket, filename): fp = open(filename) local_md5 = hashlib.md5(fp.read()) fp.close() + logger.debug("verify_file: %s: local md5 %s, etag %s", filename, local_md5.hexdigest(), key.etag) if '"%s"' % local_md5.hexdigest() == key.etag: return True return False @@ -120,6 +124,10 @@ def send_file(bucket, filename): logger.warning("send_file: %s already exists on S3, overwriting", basefilename) k.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True) + + logger.debug("send_file: %s sent, verifying fidelity", filename) + if not verify_file(bucket, filename): + raise VerifyError("verify failed") return k def encryption_worker(in_q, out_q): @@ -155,7 +163,7 @@ def sending_worker(in_q, out_q, accesskey, sharedkey, host): key.set_acl('private') key.close() done = True - except (boto.exception.S3ResponseError, socket.error), e: + except (boto.exception.S3ResponseError, socket.error, VerifyError), e: retry_count += 1 sleeptime = 2**retry_count logger.error('sending_worker: exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries) @@ -349,20 +357,34 @@ if __name__ == '__main__': gpg_queue.put('STOP') for i in procs: + # wait for each process to terminate in turn i.join() + logger.debug("main: process terminated: %s", i.name) + + # count how many crypto processes are still running crypto_running = 0 for j in procs: if j.name.startswith("encryption_worker") and j.is_alive(): crypto_running += 1 + if crypto_running == 0 and not send_queue_closed: + # crypto is done, close up the send queue + logger.debug("main: queuing final file") + finalfile = '%sCOMPLETE' % filehead + fp = open(finalfile, 'w') + fp.write('%s %s "%s"' % (beginning, time.time(), mesg)) + fp.close() + send_queue.put(finalfile) + + logger.debug("main: queuing stop sentinel for send_queue") send_queue.put('STOP') send_queue_closed = True - logger.debug("main: queuing stop sentinel for send_queue") + if not send_p.is_alive() and not unlink_queue_closed: + # sending is done, close up the unlink queue + logger.debug("main: queuing stop sentinel for unlink_queue") unlink_queue.put('STOP') unlink_queue_closed = True - logger.debug("main: queuing stop sentinel for unlink_queue") - logger.debug("main: process terminated: %s", i.name) for qname, q in queues.items(): time.sleep(5) # settle @@ -372,14 +394,4 @@ if __name__ == '__main__': else: logger.debug("main: queue %s is empty", qname) - logger.debug("main: finalizing backup") - - # finalize the backup - bucket = open_s3(secrets.accesskey, secrets.sharedkey, host) - key = Key(bucket) - key.key = '%sCOMPLETE' % os.path.basename(filehead) - key.set_acl('private') - key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg)) - key.close() - logger.info("main: completed run after %i seconds", (time.time() - beginning))