From dd0a141d5fae8002851d7b2db091a52c43365bd2 Mon Sep 17 00:00:00 2001 From: Ryan Tucker Date: Tue, 29 Dec 2009 14:41:50 -0500 Subject: [PATCH] adding rate limiting and retry logic --- BackupPC_archiveHost_s3 | 128 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 120 insertions(+), 8 deletions(-) diff --git a/BackupPC_archiveHost_s3 b/BackupPC_archiveHost_s3 index edb59a1..1b41493 100755 --- a/BackupPC_archiveHost_s3 +++ b/BackupPC_archiveHost_s3 @@ -12,7 +12,9 @@ # sharedkey = 'amazon aws shared key' # gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it' +import base64 import glob +import md5 import os import secrets import sys @@ -35,6 +37,103 @@ loghandler.setFormatter(logformatter) logger.addHandler(loghandler) logger.setLevel(logging.DEBUG) +# classes +class MyKey(Key): + def _compute_md5(self, fp): + """ + @type fp: file + @param fp: File pointer to the file to MD5 hash + + @rtype: string + @return: MD5 Hash of the file in fp + """ + m = md5.new() + s = fp.fastread(self.BufferSize) + while s: + m.update(s) + s = fp.fastread(self.BufferSize) + self.md5 = m.hexdigest() + self.base64md5 = base64.encodestring(m.digest()) + if self.base64md5[-1] == '\n': + self.base64md5 = self.base64md5[0:-1] + self.size = fp.tell() + fp.seek(0) + +class SlowFile: + def __init__(self, name, mode='rb', speed=19200): + """Open a rate-limited filehandle. Supports read for now. + name, mode as normal; speed in bytes per second""" + + self.name = name + self.mode = mode + self.speed = speed + + self.fd = open(self.name, self.mode) + + self.closed = False + self.encoding = None + + self.delayfactor = 1 + self.lastblocktargettime = None + + self.lastdebug = 0 + + def close(self): + self.closed = True + return self.fd.close() + + def fastread(self, len=1024): + return self.fd.read(len) + + def flush(self): + return self.fd.flush() + + def fileno(self): + return self.fd.fileno() + + def isatty(self): + return self.fd.isatty() + + def tell(self): + return self.fd.tell() + + def read(self, len=1024): + if not self.fd: + raise IOError + + starttime = time.time() + + if starttime < self.lastblocktargettime: + # we're early + sleepfor = self.lastblocktargettime - starttime + time.sleep(sleepfor) + if self.delayfactor > 1: + self.delayfactor -= 0.02 + else: + # we're late; something got slow. + self.delayfactor += 0.01 + + if self.delayfactor < 1: + self.delayfactor = 1 + if self.delayfactor > 5: + self.delayfactor = 5 + + targetspeed = self.speed/float(self.delayfactor) + self.lastblocktargettime = time.time() + len/float(targetspeed) + + if time.time() > self.lastdebug+60: + logger.debug('Target %i bytes/second (%i kilobits/second), delay factor %.2f, block len %i' % (targetspeed, targetspeed*8/1024, self.delayfactor, len)) + self.lastdebug = time.time() + + return self.fd.read(len) + + def seek(self, offset, mode=0): + self.delayfactor = 1 + return self.fd.seek(offset, mode) + + def write(self, data): + raise NotImplementedError + # functions def is_exe(fpath): return os.path.exists(fpath) and os.access(fpath, os.X_OK) @@ -89,11 +188,13 @@ def handle_progress(transmitted, pending): def send_file(bucket, filename): if bucket.get_key(filename): logging.error('Duplicate filename %s! I hope that is OK.' % filename) - k = Key(bucket) + k = MyKey(bucket) k.key = os.path.basename(filename) logging.info('Uploading %s...' % os.path.basename(filename)) - k.set_contents_from_filename(filename, cb=handle_progress) + fd = SlowFile(name=filename, mode='rb') + + k.set_contents_from_file(fd, cb=handle_progress) return k @@ -154,7 +255,7 @@ if returncode != 0: logger.error('%s died with exit code %i' % (cmd, returncode)) sys.exit(1) -logging.info('Beginning post-processing of %s #%i' % (host, bkupNum)) +logging.info('Beginning post-processing of %i files from %s #%i' % (len(glob.glob(fileglob)), host, bkupNum)) for i in sorted(glob.glob(fileglob)): if secrets.gpgsymmetrickey: @@ -162,11 +263,22 @@ for i in sorted(glob.glob(fileglob)): else: sendfile = i - key = send_file(bucket, sendfile) - key.set_metadata('backuppc_cmd', cmd) - key.set_metadata('backuppc_mesg', mesg) - key.set_acl('private') - key.close() + retry_count = 0 + max_retries = 10 + + while retry_count <= max_retries: + try: + key = send_file(bucket, sendfile) + key.set_metadata('backuppc_cmd', cmd) + key.set_metadata('backuppc_mesg', mesg) + key.set_acl('private') + key.close() + retry_count = max_retries+1 + except boto.exception.S3ResponseError as e: + retry_count += 1 + sleeptime = 2**retry_count + log.error('Encountered S3 exception %s, retrying in %i seconds (%i/%i)' % (e, sleeptime, retry_count, max_retries)) + time.sleep(sleeptime) os.unlink(sendfile)