Various adjustments and bug fixes

Fix: force integers to strs in Popen arguments

Refactored verify_file out of unlink_worker
If a file already exists on S3 and is identical, don't re-upload
Add some delays, etc, to allow things to settle
Put a STOP in the gpg_queue for each process
This commit is contained in:
Ryan Tucker 2011-09-22 22:21:37 -04:00
parent 7bda017245
commit 871613a6ea

View file

@ -92,17 +92,34 @@ def open_s3(accesskey, sharedkey, host):
return bucket
def handle_progress(transmitted, pending):
logger.debug('send_file: %i of %i bytes transmitted (%.2f%%)' % (transmitted, pending, (transmitted/float(pending))*100))
logger.debug("send_file: %i of %i bytes transmitted (%.2f%%)", transmitted, pending, (transmitted/float(pending))*100)
def verify_file(bucket, filename):
"Returns True if the file size and md5sum match, False otherwise"
basefilename = os.path.basename(filename)
key = bucket.get_key(basefilename)
stat = os.stat(filename)
if key:
if key.size == stat[6]:
fp = open(filename)
local_md5 = hashlib.md5(fp.read())
fp.close()
if '"%s"' % local_md5.hexdigest() == key.etag:
return True
return False
def send_file(bucket, filename):
basefilename = os.path.basename(filename)
k = Key(bucket)
k.key = basefilename
if k.exists():
logger.warning('send_file: %s already exists on S3, overwriting' % basefilename)
k.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True)
if k.exists():
if verify_file(bucket, filename):
logger.warning("send_file: %s already exists and is identical, not overwriting", basefilename)
return k
logger.warning("send_file: %s already exists on S3, overwriting", basefilename)
k.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True)
return k
def encryption_worker(in_q, out_q):
@ -117,6 +134,7 @@ def encryption_worker(in_q, out_q):
out_q.put(result)
logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time)
logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time)
time.sleep(5) # settle
def sending_worker(in_q, out_q, accesskey, sharedkey, host):
"Sends things from the in_q using the send_file method"
@ -145,7 +163,7 @@ def sending_worker(in_q, out_q, accesskey, sharedkey, host):
if not done:
# trip out
logger.error('sending_worker: could not upload %s in %i retries')
logger.error('sending_worker: could not upload %s in %i retries', filename, retry_count)
else:
size = os.path.getsize(filename)
sending_seconds = time.time() - sending_start
@ -154,7 +172,7 @@ def sending_worker(in_q, out_q, accesskey, sharedkey, host):
out_q.put(filename)
logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
out_q.put('STOP')
time.sleep(5) # settle
def unlink_worker(in_q, accesskey, sharedkey, host):
start_time = time.time()
@ -162,25 +180,25 @@ def unlink_worker(in_q, accesskey, sharedkey, host):
bucket = open_s3(accesskey, sharedkey, host)
for filename in iter(in_q.get, 'STOP'):
counter += 1
basefilename = os.path.basename(filename)
key = bucket.get_key(basefilename)
stat = os.stat(filename)
if key:
if key.size == stat[6]:
fp = open(filename)
local_md5 = hashlib.md5(fp.read())
fp.close()
if '"%s"' % local_md5.hexdigest() == key.etag:
logger.debug("unlink_worker: deleting %s", basefilename)
os.unlink(filename)
else:
logger.error("unlink_worker: md5sum for %s did not match: %s != %s", basefilename, '"%s"' % local_md5.hexdigest(), key.etag)
retry_count = 0
max_retries = 3
done = False
while retry_count <= max_retries and not done:
if verify_file(bucket, filename):
logger.debug("unlink_worker: deleting %s", filename)
os.unlink(filename)
done = True
else:
logger.error("unlink_worker: size mismatch for %s: %i != %i", basefilename, stat[6], key.size)
else:
logger.error("unlink_worker: key does not exist: %s", basefilename)
retry_count += 1
sleeptime = 2**retry_count
logger.error("unlink_worker: verify_file on %s returned false, retrying in %i seconds (%i/%i)", filename, sleeptime, retry_count, max_retries)
time.sleep(sleeptime)
if not done:
logger.error("unlink_worker: could not verify remote %s in %i retries", filename, retry_count)
logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
time.sleep(5) # settle
if __name__ == '__main__':
# Read in arguments, verify that they match the BackupPC standard exactly
@ -207,8 +225,16 @@ if __name__ == '__main__':
beginning = time.time()
if share == '*':
share = '\*'
# Create queues for workers
gpg_queue = Queue()
send_queue = Queue()
unlink_queue = Queue()
queues = {
'gpg_queue': gpg_queue,
'send_queue': send_queue,
'unlink_queue': unlink_queue,
}
# Is there already evidence of this having been done before?
if glob.glob('%s/%s.*.tar.*' % (outLoc, host)):
@ -239,7 +265,7 @@ if __name__ == '__main__':
tarcmd = [tarCreate, '-t']
tarcmd.extend(['-h', host])
tarcmd.extend(['-n', bkupNum])
tarcmd.extend(['-n', str(bkupNum)])
tarcmd.extend(['-s', share])
tarcmd.extend(['.'])
@ -249,7 +275,7 @@ if __name__ == '__main__':
if splitSize > 0 and is_exe(splitPath):
filehead = outfile + '.'
fileglob = filehead + '*'
splitcmd = [splitPath, '-b', splitSize, '-', filehead]
splitcmd = [splitPath, '-b', str(splitSize), '-', filehead]
mesg += ', split into %i byte chunks' % splitSize
else:
fileglob = outfile
@ -261,31 +287,21 @@ if __name__ == '__main__':
logger.info("main: %s", mesg)
logger.debug("main: executing tarcmd: %s > %s", ' '.join(tarcmd), outfile)
outfp = open(outfile, 'wb')
proc = Popen(tarcmd, preexec_fn=lambda : os.nice(10), stdout=outfile)
tarfp = open(outfile, 'wb')
proc = Popen(tarcmd, preexec_fn=lambda : os.nice(10), stdout=tarfp)
proc.communicate()
outfp.close()
tarfp.close()
if splitcmd:
logger.debug("main: executing splitcmd: %s", ' '.join(splitcmd))
infp = open(outfile, 'rb')
proc = Popen(splitcmd, preexec_fn=lambda : os.nice(10), stdin=infp)
tarfp = open(outfile, 'rb')
proc = Popen(splitcmd, preexec_fn=lambda : os.nice(10), stdin=tarfp)
proc.communicate()
infp.close()
tarfp.close()
os.unlink(outfile)
logger.info("main: dumped %i files from %s #%i" % (len(glob.glob(fileglob)), host, bkupNum))
# Create queues for handling encryption and file transfers
gpg_queue = Queue()
send_queue = Queue()
unlink_queue = Queue()
queues = {
'gpg_queue': gpg_queue,
'send_queue': send_queue,
'unlink_queue': unlink_queue,
}
# Pre-run to check for artifacts
for i in glob.glob(fileglob):
if not i.endswith('.gpg') and os.path.exists(i + '.gpg'):
@ -303,10 +319,6 @@ if __name__ == '__main__':
logger.debug("main: adding %s to send_queue", i)
send_queue.put(i)
# Put a STOP command at the end of the GPG queue.
gpg_queue.put('STOP')
gpg_queue_closed = True
# Start some handlers, wait until everything is done
try:
process_count = cpu_count()
@ -331,6 +343,11 @@ if __name__ == '__main__':
send_queue_closed = False
unlink_queue_closed = False
# Put STOP command(s) at the end of the GPG queue.
gpg_queue_closed = True
for i in range(process_count):
gpg_queue.put('STOP')
for i in procs:
i.join()
crypto_running = 0
@ -348,9 +365,12 @@ if __name__ == '__main__':
logger.debug("main: process terminated: %s", i.name)
for qname, q in queues.items():
time.sleep(5) # settle
if not q.empty():
logger.critical("main: queue %s not empty!", qname)
raise Exception("queue not empty: %s" % qname)
else:
logger.debug("main: queue %s is empty", qname)
logger.debug("main: finalizing backup")