From d0ea15beffe07c23e72408a3e2feb42342e2820c Mon Sep 17 00:00:00 2001 From: Ryan Tucker Date: Sat, 24 Sep 2011 18:27:42 -0400 Subject: [PATCH] Move unlinks to unlink_worker; handle OSErrors --- BackupPC_archiveHost_s3 | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/BackupPC_archiveHost_s3 b/BackupPC_archiveHost_s3 index cd081f4..be2febd 100755 --- a/BackupPC_archiveHost_s3 +++ b/BackupPC_archiveHost_s3 @@ -78,7 +78,6 @@ def encrypt_file(filename, key, compress='/bin/cat'): newfilesize = os.path.getsize(filename + '.gpg') compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100 logger.debug('encrypt_file: %s %s by %.2f%% (%i -> %i bytes)' % (filename, 'shrunk' if oldfilesize>newfilesize else 'grew', compressed, oldfilesize, newfilesize)) - os.unlink(filename) return filename + '.gpg' else: raise RuntimeError('output file does not exist: %s.gpg' % filename) @@ -107,7 +106,7 @@ def verify_file(bucket, filename): fp = open(filename) local_md5 = hashlib.md5(fp.read()) fp.close() - logger.debug("verify_file: %s: local md5 %s, etag %s", filename, local_md5.hexdigest(), key.etag) + logger.debug('verify_file: %s: local md5 "%s", etag %s', filename, local_md5.hexdigest(), key.etag) if '"%s"' % local_md5.hexdigest() == key.etag: return True return False @@ -130,7 +129,7 @@ def send_file(bucket, filename): raise VerifyError("verify failed") return k -def encryption_worker(in_q, out_q): +def encryption_worker(in_q, out_q, unlink_q): "Encrypts things from the in_q, puts them in the out_q" start_time = time.time() counter = 0 @@ -140,6 +139,7 @@ def encryption_worker(in_q, out_q): logger.info("encryption_worker: encrypting %s", filename) result = encrypt_file(filename, gpgkey, comppath) out_q.put(result) + unlink_q.put(filename) logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time) logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time) time.sleep(5) # settle @@ -182,14 +182,16 @@ def sending_worker(in_q, out_q, accesskey, sharedkey, host): logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time) time.sleep(5) # settle -def unlink_worker(in_q, accesskey, sharedkey, host): +def unlink_worker(in_q): start_time = time.time() counter = 0 - bucket = open_s3(accesskey, sharedkey, host) for filename in iter(in_q.get, 'STOP'): counter += 1 logger.debug("unlink_worker: deleting %s", filename) - os.unlink(filename) + try: + os.unlink(filename) + except OSError, e: + logger.warning("unlink_worker: caught exception: %s", e) logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time) time.sleep(5) # settle @@ -292,7 +294,7 @@ if __name__ == '__main__': proc = Popen(splitcmd, preexec_fn=lambda : os.nice(10), stdin=tarfp) proc.communicate() tarfp.close() - os.unlink(outfile) + unlink_queue.put(outfile) logger.info("main: dumped %i files from %s #%i" % (len(glob.glob(fileglob)), host, bkupNum)) @@ -322,7 +324,7 @@ if __name__ == '__main__': procs = [] for i in range(process_count): - p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue,)) + p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue, unlink_queue)) p.start() procs.append(p) @@ -330,7 +332,7 @@ if __name__ == '__main__': send_p.start() procs.append(send_p) - unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue, secrets.accesskey, secrets.sharedkey, host)) + unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue)) unlink_p.start() procs.append(unlink_p)