From 871613a6ea730b063f5efc753ac0880c386ffa6a Mon Sep 17 00:00:00 2001
From: Ryan Tucker
Date: Thu, 22 Sep 2011 22:21:37 -0400
Subject: [PATCH] Various adjustments and bug fixes

Fix: force integers to strs in Popen arguments
Refactored verify_file out of unlink_worker
If a file already exists on S3 and is identical, don't re-upload
Add some delays, etc, to allow things to settle
Put a STOP in the gpg_queue for each process
---
 BackupPC_archiveHost_s3 | 116 +++++++++++++++++++++++-----------------
 1 file changed, 68 insertions(+), 48 deletions(-)

diff --git a/BackupPC_archiveHost_s3 b/BackupPC_archiveHost_s3
index 733574d..cfc1926 100755
--- a/BackupPC_archiveHost_s3
+++ b/BackupPC_archiveHost_s3
@@ -92,17 +92,34 @@ def open_s3(accesskey, sharedkey, host):
     return bucket
 
 def handle_progress(transmitted, pending):
-    logger.debug('send_file: %i of %i bytes transmitted (%.2f%%)' % (transmitted, pending, (transmitted/float(pending))*100))
+    logger.debug("send_file: %i of %i bytes transmitted (%.2f%%)", transmitted, pending, (transmitted/float(pending))*100)
+
+def verify_file(bucket, filename):
+    "Returns True if the file size and md5sum match, False otherwise"
+    basefilename = os.path.basename(filename)
+    key = bucket.get_key(basefilename)
+    stat = os.stat(filename)
+    if key:
+        if key.size == stat[6]:
+            fp = open(filename)
+            local_md5 = hashlib.md5(fp.read())
+            fp.close()
+            if '"%s"' % local_md5.hexdigest() == key.etag:
+                return True
+    return False
 
 def send_file(bucket, filename):
     basefilename = os.path.basename(filename)
-
     k = Key(bucket)
     k.key = basefilename
 
-    if k.exists():
-        logger.warning('send_file: %s already exists on S3, overwriting' % basefilename)
-    k.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True)
+    if k.exists():
+        if verify_file(bucket, filename):
+            logger.warning("send_file: %s already exists and is identical, not overwriting", basefilename)
+            return k
+        logger.warning("send_file: %s already exists on S3, overwriting", basefilename)
+
+    k.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True)
     return k
 
 def encryption_worker(in_q, out_q):
@@ -117,6 +134,7 @@
         out_q.put(result)
         logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time)
     logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time)
+    time.sleep(5) # settle
 
 def sending_worker(in_q, out_q, accesskey, sharedkey, host):
     "Sends things from the in_q using the send_file method"
@@ -145,7 +163,7 @@
 
         if not done:
             # trip out
-            logger.error('sending_worker: could not upload %s in %i retries')
+            logger.error('sending_worker: could not upload %s in %i retries', filename, retry_count)
         else:
             size = os.path.getsize(filename)
             sending_seconds = time.time() - sending_start
@@ -154,7 +172,7 @@
             out_q.put(filename)
 
     logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
-    out_q.put('STOP')
+    time.sleep(5) # settle
 
 def unlink_worker(in_q, accesskey, sharedkey, host):
     start_time = time.time()
@@ -162,25 +180,25 @@
     bucket = open_s3(accesskey, sharedkey, host)
     for filename in iter(in_q.get, 'STOP'):
         counter += 1
-        basefilename = os.path.basename(filename)
-        key = bucket.get_key(basefilename)
-        stat = os.stat(filename)
-        if key:
-            if key.size == stat[6]:
-                fp = open(filename)
-                local_md5 = hashlib.md5(fp.read())
-                fp.close()
-                if '"%s"' % local_md5.hexdigest() == key.etag:
-                    logger.debug("unlink_worker: deleting %s", basefilename)
-                    os.unlink(filename)
-                else:
-                    logger.error("unlink_worker: md5sum for %s did not match: %s != %s", basefilename, '"%s"' % local_md5.hexdigest(), key.etag)
+        retry_count = 0
+        max_retries = 3
+        done = False
+        while retry_count <= max_retries and not done:
+            if verify_file(bucket, filename):
+                logger.debug("unlink_worker: deleting %s", filename)
+                os.unlink(filename)
+                done = True
             else:
-                logger.error("unlink_worker: size mismatch for %s: %i != %i", basefilename, stat[6], key.size)
-        else:
-            logger.error("unlink_worker: key does not exist: %s", basefilename)
+                retry_count += 1
+                sleeptime = 2**retry_count
+                logger.error("unlink_worker: verify_file on %s returned false, retrying in %i seconds (%i/%i)", filename, sleeptime, retry_count, max_retries)
+                time.sleep(sleeptime)
+
+        if not done:
+            logger.error("unlink_worker: could not verify remote %s in %i retries", filename, retry_count)
 
     logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
+    time.sleep(5) # settle
 
 if __name__ == '__main__':
     # Read in arguments, verify that they match the BackupPC standard exactly
@@ -207,8 +225,16 @@ if __name__ == '__main__':
 
     beginning = time.time()
 
-    if share == '*':
-        share = '\*'
+    # Create queues for workers
+    gpg_queue = Queue()
+    send_queue = Queue()
+    unlink_queue = Queue()
+
+    queues = {
+        'gpg_queue': gpg_queue,
+        'send_queue': send_queue,
+        'unlink_queue': unlink_queue,
+    }
 
     # Is there already evidence of this having been done before?
     if glob.glob('%s/%s.*.tar.*' % (outLoc, host)):
@@ -239,7 +265,7 @@
 
     tarcmd = [tarCreate, '-t']
     tarcmd.extend(['-h', host])
-    tarcmd.extend(['-n', bkupNum])
+    tarcmd.extend(['-n', str(bkupNum)])
     tarcmd.extend(['-s', share])
     tarcmd.extend(['.'])
 
@@ -249,7 +275,7 @@
     if splitSize > 0 and is_exe(splitPath):
         filehead = outfile + '.'
         fileglob = filehead + '*'
-        splitcmd = [splitPath, '-b', splitSize, '-', filehead]
+        splitcmd = [splitPath, '-b', str(splitSize), '-', filehead]
         mesg += ', split into %i byte chunks' % splitSize
     else:
         fileglob = outfile
@@ -261,31 +287,21 @@
     logger.info("main: %s", mesg)
 
     logger.debug("main: executing tarcmd: %s > %s", ' '.join(tarcmd), outfile)
-    outfp = open(outfile, 'wb')
-    proc = Popen(tarcmd, preexec_fn=lambda : os.nice(10), stdout=outfile)
+    tarfp = open(outfile, 'wb')
+    proc = Popen(tarcmd, preexec_fn=lambda : os.nice(10), stdout=tarfp)
     proc.communicate()
-    outfp.close()
+    tarfp.close()
 
     if splitcmd:
         logger.debug("main: executing splitcmd: %s", ' '.join(splitcmd))
-        infp = open(outfile, 'rb')
-        proc = Popen(splitcmd, preexec_fn=lambda : os.nice(10), stdin=infp)
+        tarfp = open(outfile, 'rb')
+        proc = Popen(splitcmd, preexec_fn=lambda : os.nice(10), stdin=tarfp)
         proc.communicate()
-        infp.close()
+        tarfp.close()
+        os.unlink(outfile)
 
     logger.info("main: dumped %i files from %s #%i" % (len(glob.glob(fileglob)), host, bkupNum))
 
-    # Create queues for handling encryption and file transfers
-    gpg_queue = Queue()
-    send_queue = Queue()
-    unlink_queue = Queue()
-
-    queues = {
-        'gpg_queue': gpg_queue,
-        'send_queue': send_queue,
-        'unlink_queue': unlink_queue,
-    }
-
     # Pre-run to check for artifacts
     for i in glob.glob(fileglob):
         if not i.endswith('.gpg') and os.path.exists(i + '.gpg'):
@@ -303,10 +319,6 @@
             logger.debug("main: adding %s to send_queue", i)
             send_queue.put(i)
 
-    # Put a STOP command at the end of the GPG queue.
-    gpg_queue.put('STOP')
-    gpg_queue_closed = True
-
     # Start some handlers, wait until everything is done
     try:
         process_count = cpu_count()
@@ -331,6 +343,11 @@
     send_queue_closed = False
     unlink_queue_closed = False
 
+    # Put STOP command(s) at the end of the GPG queue.
+    gpg_queue_closed = True
+    for i in range(process_count):
+        gpg_queue.put('STOP')
+
    for i in procs:
        i.join()
        crypto_running = 0
@@ -348,9 +365,12 @@
         logger.debug("main: process terminated: %s", i.name)
 
     for qname, q in queues.items():
+        time.sleep(5) # settle
         if not q.empty():
             logger.critical("main: queue %s not empty!", qname)
             raise Exception("queue not empty: %s" % qname)
+        else:
+            logger.debug("main: queue %s is empty", qname)
 
     logger.debug("main: finalizing backup")
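
Note (editorial, not part of the patch): the new verify_file() relies on the fact that, for objects uploaded in a single PUT such as Key.set_contents_from_filename(), the S3 ETag is the hex MD5 of the object wrapped in double quotes, which is why the local digest is compared as '"%s"' % local_md5.hexdigest(). The sketch below restates that check on its own, using the same boto Key attributes the patch uses (get_key, size, etag); the helper name and the chunked read are illustrative additions, not code from this repository.

    import hashlib
    import os

    def object_matches_local(bucket, filename):
        "Return True if the S3 copy of filename has the same size and MD5."
        key = bucket.get_key(os.path.basename(filename))
        if key is None or key.size != os.path.getsize(filename):
            return False
        md5 = hashlib.md5()
        with open(filename, 'rb') as fp:
            # Hash in chunks so multi-gigabyte archives need not fit in memory;
            # the in-patch verify_file reads the whole file with fp.read().
            for chunk in iter(lambda: fp.read(1024 * 1024), b''):
                md5.update(chunk)
        return '"%s"' % md5.hexdigest() == key.etag

One caveat: for multipart uploads the ETag is not a plain MD5, so this comparison only holds for single-part uploads like the ones send_file performs.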
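
Note (editorial, not part of the patch): with process_count encryption workers all reading gpg_queue, a single 'STOP' sentinel would terminate only one of them, which is why the patch now queues one 'STOP' per process. A minimal standalone sketch of that sentinel pattern, with a hypothetical worker rather than code from this script:

    from multiprocessing import Process, Queue, cpu_count

    def worker(in_q):
        # Each worker exits after consuming exactly one 'STOP' sentinel.
        for item in iter(in_q.get, 'STOP'):
            pass  # process item here

    if __name__ == '__main__':
        q = Queue()
        n = cpu_count()
        procs = [Process(target=worker, args=(q,)) for _ in range(n)]
        for p in procs:
            p.start()
        for item in range(100):
            q.put(item)
        for _ in range(n):
            q.put('STOP')   # one sentinel per worker, as the patch now does
        for p in procs:
            p.join()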
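
Note (editorial, not part of the patch): the str() conversions and the stdout/stdin changes address the same class of bug: subprocess.Popen expects every element of the argument list to be a string, and its stdout/stdin parameters take file objects or descriptors, not path strings (the old stdout=outfile passed the filename itself). A small sketch of the corrected calling convention, with hypothetical paths and values:

    import os
    from subprocess import Popen

    tarCreate = '/usr/share/backuppc/bin/BackupPC_tarCreate'  # hypothetical path
    host, bkupNum, share = 'myhost', 42, '/'                   # hypothetical values
    outfile = '/tmp/myhost.42.tar'

    tarcmd = [tarCreate, '-t', '-h', host, '-n', str(bkupNum), '-s', share, '.']
    tarfp = open(outfile, 'wb')
    proc = Popen(tarcmd, preexec_fn=lambda: os.nice(10), stdout=tarfp)  # file object, not a filename
    proc.communicate()
    tarfp.close()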