minor tweaks to exception handling and logging
This commit is contained in:
parent
871613a6ea
commit
e8f7ce4cf9
1 changed files with 26 additions and 14 deletions
|
@ -46,6 +46,9 @@ logger.addHandler(consolehandler)
|
||||||
|
|
||||||
logger.setLevel(logging.DEBUG)
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
class VerifyError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
def is_exe(fpath):
|
def is_exe(fpath):
|
||||||
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
|
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
|
||||||
|
|
||||||
|
@ -104,6 +107,7 @@ def verify_file(bucket, filename):
|
||||||
fp = open(filename)
|
fp = open(filename)
|
||||||
local_md5 = hashlib.md5(fp.read())
|
local_md5 = hashlib.md5(fp.read())
|
||||||
fp.close()
|
fp.close()
|
||||||
|
logger.debug("verify_file: %s: local md5 %s, etag %s", filename, local_md5.hexdigest(), key.etag)
|
||||||
if '"%s"' % local_md5.hexdigest() == key.etag:
|
if '"%s"' % local_md5.hexdigest() == key.etag:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
@ -120,6 +124,10 @@ def send_file(bucket, filename):
|
||||||
logger.warning("send_file: %s already exists on S3, overwriting", basefilename)
|
logger.warning("send_file: %s already exists on S3, overwriting", basefilename)
|
||||||
|
|
||||||
k.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True)
|
k.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True)
|
||||||
|
|
||||||
|
logger.debug("send_file: %s sent, verifying fidelity", filename)
|
||||||
|
if not verify_file(bucket, filename):
|
||||||
|
raise VerifyError("verify failed")
|
||||||
return k
|
return k
|
||||||
|
|
||||||
def encryption_worker(in_q, out_q):
|
def encryption_worker(in_q, out_q):
|
||||||
|
@ -155,7 +163,7 @@ def sending_worker(in_q, out_q, accesskey, sharedkey, host):
|
||||||
key.set_acl('private')
|
key.set_acl('private')
|
||||||
key.close()
|
key.close()
|
||||||
done = True
|
done = True
|
||||||
except (boto.exception.S3ResponseError, socket.error), e:
|
except (boto.exception.S3ResponseError, socket.error, VerifyError), e:
|
||||||
retry_count += 1
|
retry_count += 1
|
||||||
sleeptime = 2**retry_count
|
sleeptime = 2**retry_count
|
||||||
logger.error('sending_worker: exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries)
|
logger.error('sending_worker: exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries)
|
||||||
|
@ -349,20 +357,34 @@ if __name__ == '__main__':
|
||||||
gpg_queue.put('STOP')
|
gpg_queue.put('STOP')
|
||||||
|
|
||||||
for i in procs:
|
for i in procs:
|
||||||
|
# wait for each process to terminate in turn
|
||||||
i.join()
|
i.join()
|
||||||
|
logger.debug("main: process terminated: %s", i.name)
|
||||||
|
|
||||||
|
# count how many crypto processes are still running
|
||||||
crypto_running = 0
|
crypto_running = 0
|
||||||
for j in procs:
|
for j in procs:
|
||||||
if j.name.startswith("encryption_worker") and j.is_alive():
|
if j.name.startswith("encryption_worker") and j.is_alive():
|
||||||
crypto_running += 1
|
crypto_running += 1
|
||||||
|
|
||||||
if crypto_running == 0 and not send_queue_closed:
|
if crypto_running == 0 and not send_queue_closed:
|
||||||
|
# crypto is done, close up the send queue
|
||||||
|
logger.debug("main: queuing final file")
|
||||||
|
finalfile = '%sCOMPLETE' % filehead
|
||||||
|
fp = open(finalfile, 'w')
|
||||||
|
fp.write('%s %s "%s"' % (beginning, time.time(), mesg))
|
||||||
|
fp.close()
|
||||||
|
send_queue.put(finalfile)
|
||||||
|
|
||||||
|
logger.debug("main: queuing stop sentinel for send_queue")
|
||||||
send_queue.put('STOP')
|
send_queue.put('STOP')
|
||||||
send_queue_closed = True
|
send_queue_closed = True
|
||||||
logger.debug("main: queuing stop sentinel for send_queue")
|
|
||||||
if not send_p.is_alive() and not unlink_queue_closed:
|
if not send_p.is_alive() and not unlink_queue_closed:
|
||||||
|
# sending is done, close up the unlink queue
|
||||||
|
logger.debug("main: queuing stop sentinel for unlink_queue")
|
||||||
unlink_queue.put('STOP')
|
unlink_queue.put('STOP')
|
||||||
unlink_queue_closed = True
|
unlink_queue_closed = True
|
||||||
logger.debug("main: queuing stop sentinel for unlink_queue")
|
|
||||||
logger.debug("main: process terminated: %s", i.name)
|
|
||||||
|
|
||||||
for qname, q in queues.items():
|
for qname, q in queues.items():
|
||||||
time.sleep(5) # settle
|
time.sleep(5) # settle
|
||||||
|
@ -372,14 +394,4 @@ if __name__ == '__main__':
|
||||||
else:
|
else:
|
||||||
logger.debug("main: queue %s is empty", qname)
|
logger.debug("main: queue %s is empty", qname)
|
||||||
|
|
||||||
logger.debug("main: finalizing backup")
|
|
||||||
|
|
||||||
# finalize the backup
|
|
||||||
bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
|
|
||||||
key = Key(bucket)
|
|
||||||
key.key = '%sCOMPLETE' % os.path.basename(filehead)
|
|
||||||
key.set_acl('private')
|
|
||||||
key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg))
|
|
||||||
key.close()
|
|
||||||
|
|
||||||
logger.info("main: completed run after %i seconds", (time.time() - beginning))
|
logger.info("main: completed run after %i seconds", (time.time() - beginning))
|
||||||
|
|
Loading…
Reference in a new issue