Fix multiprocessing bugs, add unlink_worker

This is still under some live development, for sure :-)

Among other minor (yet significant) fixes, I've added another
worker process, unlink_worker, to handle unlinking. It verifies that
each file was transmitted properly (or at least that the uploaded key
has the right size and md5sum) before unlinking the local copy.
This commit is contained in:
Ryan Tucker 2011-09-22 10:43:28 -04:00
parent 3892037351
commit e4fd5ca8ac

View file

@ -118,40 +118,72 @@ def encryption_worker(in_q, out_q):
logger.info("Beginning encryption of %s.", filename) logger.info("Beginning encryption of %s.", filename)
result = encrypt_file(filename, gpgkey, comppath) result = encrypt_file(filename, gpgkey, comppath)
out_q.put(result) out_q.put(result)
logger.info("Finished encryption of %s in %i seconds.", filename, time.time()-cryptstart_time) logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time)
logger.info("Finished processing %i items in encryption queue in %i seconds.", counter, time.time()-start_time) logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time)
out_q.put('STOP')
def sending_worker(in_q, accesskey, sharedkey, host): def sending_worker(in_q, out_q, accesskey, sharedkey, host):
"Sends things from the in_q using the send_file method" "Sends things from the in_q using the send_file method"
start_time = time.time() start_time = time.time()
counter = 0 counter = 0
bucket = open_s3(accesskey, sharedkey, host)
for filename in iter(in_q.get, 'STOP'): for filename in iter(in_q.get, 'STOP'):
sending_start = time.time() sending_start = time.time()
counter += 1 counter += 1
retry_count = 0 retry_count = 0
max_retries = 10 max_retries = 10
done = False
while retry_count <= max_retries: while retry_count <= max_retries and not done:
try: try:
logger.info("sending_worker: sending %s", filename)
bucket = open_s3(accesskey, sharedkey, host)
key = send_file(bucket, filename) key = send_file(bucket, filename)
key.set_acl('private') key.set_acl('private')
key.close() key.close()
retry_count = max_retries+1 done = True
except (boto.exception.S3ResponseError, socket.error), e: except (boto.exception.S3ResponseError, socket.error), e:
retry_count += 1 retry_count += 1
sleeptime = 2**retry_count sleeptime = 2**retry_count
logger.error('Encountered exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries) logger.error('Encountered exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries)
time.sleep(sleeptime) time.sleep(sleeptime)
if not done:
# trip out
logger.error('sending_worker: could not upload %s in %i retries')
else:
size = os.path.getsize(filename) size = os.path.getsize(filename)
os.unlink(filename)
sending_seconds = time.time() - sending_start sending_seconds = time.time() - sending_start
bytespersecond = size / sending_seconds bytespersecond = size / sending_seconds
logger.debug("sending_worker: sent %s in %i seconds at %i bytes/second.", filename, sending_seconds, bytespersecond)
out_q.put(filename)
logger.info("Finished sending of %s in %i seconds, transfer speed %i bytes/second.", filename, sending_seconds, bytespersecond) logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
out_q.put('STOP')
def unlink_worker(in_q, accesskey, sharedkey, host):
start_time = time.time()
counter = 0
bucket = open_s3(accesskey, sharedkey, host)
for filename in iter(in_q.get, 'STOP'):
counter += 1
basefilename = os.path.basename(filename)
key = bucket.get_key(basefilename)
stat = os.stat(filename)
if key:
if key.size == stat[6]:
fp = open(filename)
local_md5 = hashlib.md5(fp.read())
fp.close()
if '"%s"' % local_md5.hexdigest() == key.etag:
logger.debug("unlink_worker: deleting %s", basefilename)
os.unlink(filename)
else:
logger.error("unlink_worker: md5sum for %s did not match: %s != %s", basefilename, '"%s"' % local_md5.hexdigest(), key.etag)
else:
logger.error("unlink_worker: size mismatch for %s: %i != %i", basefilename, stat[6], key.size)
else:
logger.error("unlink_worker: key does not exist: %s", basefilename)
logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time)
if __name__ == '__main__': if __name__ == '__main__':
# Read in arguments, verify that they match the BackupPC standard exactly # Read in arguments, verify that they match the BackupPC standard exactly
@ -249,6 +281,13 @@ if __name__ == '__main__':
# Create queues for handling encryption and file transfers # Create queues for handling encryption and file transfers
gpg_queue = Queue() gpg_queue = Queue()
send_queue = Queue() send_queue = Queue()
unlink_queue = Queue()
queues = {
'gpg_queue': gpg_queue,
'send_queue': send_queue,
'unlink_queue': unlink_queue,
}
# Pre-run to check for artifacts # Pre-run to check for artifacts
for i in glob.glob(fileglob): for i in glob.glob(fileglob):
@ -257,7 +296,7 @@ if __name__ == '__main__':
os.unlink(i + '.gpg') os.unlink(i + '.gpg')
# Run again to send files to the relevant queue # Run again to send files to the relevant queue
for i in glob.glob(fileglob): for i in sorted(glob.glob(fileglob)):
if secrets.gpgsymmetrickey and not i.endswith('.gpg'): if secrets.gpgsymmetrickey and not i.endswith('.gpg'):
# A tar file, unencrypted, needs encrypted. # A tar file, unencrypted, needs encrypted.
logger.debug("Adding %s to gpg_queue", i) logger.debug("Adding %s to gpg_queue", i)
@ -269,6 +308,7 @@ if __name__ == '__main__':
# Put a STOP command at the end of the GPG queue. # Put a STOP command at the end of the GPG queue.
gpg_queue.put('STOP') gpg_queue.put('STOP')
gpg_queue_closed = True
# Start some handlers, wait until everything is done # Start some handlers, wait until everything is done
try: try:
@ -276,30 +316,51 @@ if __name__ == '__main__':
except NotImplementedError: except NotImplementedError:
process_count = 1 process_count = 1
procs = []
for i in range(process_count): for i in range(process_count):
Process(target=encryption_worker, args=(gpg_queue, send_queue,)).start() p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue,))
p.start()
procs.append(p)
send_p = Process(target=sending_worker, args=(send_queue, secrets.accesskey, secrets.sharedkey, host)) send_p = Process(name="send_worker", target=sending_worker, args=(send_queue, unlink_queue, secrets.accesskey, secrets.sharedkey, host))
send_p.start() send_p.start()
procs.append(send_p)
while send_p.is_alive(): unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue, secrets.accesskey, secrets.sharedkey, host))
# Generate some output so that BackupPC doesn't time out unlink_p.start()
mesg = "Checkpoint: %i task(s) in GPG queue, %i task(s) in S3 queue.\n" % (gpg_queue.qsize(), send_queue.qsize()) procs.append(unlink_p)
logger.info(mesg)
sys.stdout.write(mesg + '\n')
sys.stdout.flush()
send_p.join(300)
if not gpg_queue.empty(): send_queue_closed = False
raise Exception("GPG queue not empty") unlink_queue_closed = False
if not send_queue.empty():
raise Exception("Send queue not empty")
logger.info("Finalizing backup.") for i in procs:
i.join()
crypto_running = 0
for j in procs:
if j.name.startswith("encryption_worker") and j.is_alive():
crypto_running += 1
if crypto_running == 0 and not send_queue_closed:
send_queue.put('STOP')
send_queue_closed = True
logger.debug("main: queuing stop sentinel for send_queue")
if not send_p.is_alive() and not unlink_queue_closed:
unlink_queue.put('STOP')
unlink_queue_closed = True
logger.debug("main: queuing stop sentinel for unlink_queue")
logger.debug("main: process terminated: %s", i.name)
for qname, q in queues.items():
if not q.empty():
logger.critical("main: queue %s not empty!", qname)
raise Exception("queue not empty: %s" % qname)
logger.debug("main: finalizing backup")
# finalize the backup # finalize the backup
bucket = open_s3(secrets.accesskey, secrets.sharedkey, host) bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
key = Key(bucket) key = Key(bucket)
key.key = '%sCOMPLETE' % os.path.basename(filehead) key.key = '%sCOMPLETE' % os.path.basename(filehead)
key.set_acl('private')
key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg)) key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg))
key.close() key.close()