Move unlinks to unlink_worker; handle OSErrors

This commit is contained in:
Ryan Tucker 2011-09-24 18:27:42 -04:00
parent e74e89ede1
commit d0ea15beff

View file

@ -78,7 +78,6 @@ def encrypt_file(filename, key, compress='/bin/cat'):
newfilesize = os.path.getsize(filename + '.gpg') newfilesize = os.path.getsize(filename + '.gpg')
compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100 compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100
logger.debug('encrypt_file: %s %s by %.2f%% (%i -> %i bytes)' % (filename, 'shrunk' if oldfilesize>newfilesize else 'grew', compressed, oldfilesize, newfilesize)) logger.debug('encrypt_file: %s %s by %.2f%% (%i -> %i bytes)' % (filename, 'shrunk' if oldfilesize>newfilesize else 'grew', compressed, oldfilesize, newfilesize))
os.unlink(filename)
return filename + '.gpg' return filename + '.gpg'
else: else:
raise RuntimeError('output file does not exist: %s.gpg' % filename) raise RuntimeError('output file does not exist: %s.gpg' % filename)
@ -107,7 +106,7 @@ def verify_file(bucket, filename):
fp = open(filename) fp = open(filename)
local_md5 = hashlib.md5(fp.read()) local_md5 = hashlib.md5(fp.read())
fp.close() fp.close()
logger.debug("verify_file: %s: local md5 %s, etag %s", filename, local_md5.hexdigest(), key.etag) logger.debug('verify_file: %s: local md5 "%s", etag %s', filename, local_md5.hexdigest(), key.etag)
if '"%s"' % local_md5.hexdigest() == key.etag: if '"%s"' % local_md5.hexdigest() == key.etag:
return True return True
return False return False
@ -130,7 +129,7 @@ def send_file(bucket, filename):
raise VerifyError("verify failed") raise VerifyError("verify failed")
return k return k
def encryption_worker(in_q, out_q): def encryption_worker(in_q, out_q, unlink_q):
"Encrypts things from the in_q, puts them in the out_q" "Encrypts things from the in_q, puts them in the out_q"
start_time = time.time() start_time = time.time()
counter = 0 counter = 0
@ -140,6 +139,7 @@ def encryption_worker(in_q, out_q):
logger.info("encryption_worker: encrypting %s", filename) logger.info("encryption_worker: encrypting %s", filename)
result = encrypt_file(filename, gpgkey, comppath) result = encrypt_file(filename, gpgkey, comppath)
out_q.put(result) out_q.put(result)
unlink_q.put(filename)
logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time) logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time)
logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time) logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time)
time.sleep(5) # settle time.sleep(5) # settle
@ -182,14 +182,16 @@ def sending_worker(in_q, out_q, accesskey, sharedkey, host):
logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time) logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
time.sleep(5) # settle time.sleep(5) # settle
def unlink_worker(in_q, accesskey, sharedkey, host): def unlink_worker(in_q):
start_time = time.time() start_time = time.time()
counter = 0 counter = 0
bucket = open_s3(accesskey, sharedkey, host)
for filename in iter(in_q.get, 'STOP'): for filename in iter(in_q.get, 'STOP'):
counter += 1 counter += 1
logger.debug("unlink_worker: deleting %s", filename) logger.debug("unlink_worker: deleting %s", filename)
os.unlink(filename) try:
os.unlink(filename)
except OSError, e:
logger.warning("unlink_worker: caught exception: %s", e)
logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time) logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
time.sleep(5) # settle time.sleep(5) # settle
@ -292,7 +294,7 @@ if __name__ == '__main__':
proc = Popen(splitcmd, preexec_fn=lambda : os.nice(10), stdin=tarfp) proc = Popen(splitcmd, preexec_fn=lambda : os.nice(10), stdin=tarfp)
proc.communicate() proc.communicate()
tarfp.close() tarfp.close()
os.unlink(outfile) unlink_queue.put(outfile)
logger.info("main: dumped %i files from %s #%i" % (len(glob.glob(fileglob)), host, bkupNum)) logger.info("main: dumped %i files from %s #%i" % (len(glob.glob(fileglob)), host, bkupNum))
@ -322,7 +324,7 @@ if __name__ == '__main__':
procs = [] procs = []
for i in range(process_count): for i in range(process_count):
p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue,)) p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue, unlink_queue))
p.start() p.start()
procs.append(p) procs.append(p)
@ -330,7 +332,7 @@ if __name__ == '__main__':
send_p.start() send_p.start()
procs.append(send_p) procs.append(send_p)
unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue, secrets.accesskey, secrets.sharedkey, host)) unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue))
unlink_p.start() unlink_p.start()
procs.append(unlink_p) procs.append(unlink_p)