adding rate limiting and retry logic
This commit is contained in:
parent
07777a8628
commit
dd0a141d5f
1 changed files with 120 additions and 8 deletions
|
@ -12,7 +12,9 @@
|
|||
# sharedkey = 'amazon aws shared key'
|
||||
# gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it'
|
||||
|
||||
import base64
|
||||
import glob
|
||||
import md5
|
||||
import os
|
||||
import secrets
|
||||
import sys
|
||||
|
@ -35,6 +37,103 @@ loghandler.setFormatter(logformatter)
|
|||
logger.addHandler(loghandler)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
# classes
|
||||
class MyKey(Key):
|
||||
def _compute_md5(self, fp):
|
||||
"""
|
||||
@type fp: file
|
||||
@param fp: File pointer to the file to MD5 hash
|
||||
|
||||
@rtype: string
|
||||
@return: MD5 Hash of the file in fp
|
||||
"""
|
||||
m = md5.new()
|
||||
s = fp.fastread(self.BufferSize)
|
||||
while s:
|
||||
m.update(s)
|
||||
s = fp.fastread(self.BufferSize)
|
||||
self.md5 = m.hexdigest()
|
||||
self.base64md5 = base64.encodestring(m.digest())
|
||||
if self.base64md5[-1] == '\n':
|
||||
self.base64md5 = self.base64md5[0:-1]
|
||||
self.size = fp.tell()
|
||||
fp.seek(0)
|
||||
|
||||
class SlowFile:
|
||||
def __init__(self, name, mode='rb', speed=19200):
|
||||
"""Open a rate-limited filehandle. Supports read for now.
|
||||
name, mode as normal; speed in bytes per second"""
|
||||
|
||||
self.name = name
|
||||
self.mode = mode
|
||||
self.speed = speed
|
||||
|
||||
self.fd = open(self.name, self.mode)
|
||||
|
||||
self.closed = False
|
||||
self.encoding = None
|
||||
|
||||
self.delayfactor = 1
|
||||
self.lastblocktargettime = None
|
||||
|
||||
self.lastdebug = 0
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
return self.fd.close()
|
||||
|
||||
def fastread(self, len=1024):
|
||||
return self.fd.read(len)
|
||||
|
||||
def flush(self):
|
||||
return self.fd.flush()
|
||||
|
||||
def fileno(self):
|
||||
return self.fd.fileno()
|
||||
|
||||
def isatty(self):
|
||||
return self.fd.isatty()
|
||||
|
||||
def tell(self):
|
||||
return self.fd.tell()
|
||||
|
||||
def read(self, len=1024):
|
||||
if not self.fd:
|
||||
raise IOError
|
||||
|
||||
starttime = time.time()
|
||||
|
||||
if starttime < self.lastblocktargettime:
|
||||
# we're early
|
||||
sleepfor = self.lastblocktargettime - starttime
|
||||
time.sleep(sleepfor)
|
||||
if self.delayfactor > 1:
|
||||
self.delayfactor -= 0.02
|
||||
else:
|
||||
# we're late; something got slow.
|
||||
self.delayfactor += 0.01
|
||||
|
||||
if self.delayfactor < 1:
|
||||
self.delayfactor = 1
|
||||
if self.delayfactor > 5:
|
||||
self.delayfactor = 5
|
||||
|
||||
targetspeed = self.speed/float(self.delayfactor)
|
||||
self.lastblocktargettime = time.time() + len/float(targetspeed)
|
||||
|
||||
if time.time() > self.lastdebug+60:
|
||||
logger.debug('Target %i bytes/second (%i kilobits/second), delay factor %.2f, block len %i' % (targetspeed, targetspeed*8/1024, self.delayfactor, len))
|
||||
self.lastdebug = time.time()
|
||||
|
||||
return self.fd.read(len)
|
||||
|
||||
def seek(self, offset, mode=0):
|
||||
self.delayfactor = 1
|
||||
return self.fd.seek(offset, mode)
|
||||
|
||||
def write(self, data):
|
||||
raise NotImplementedError
|
||||
|
||||
# functions
|
||||
def is_exe(fpath):
|
||||
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
|
||||
|
@ -89,11 +188,13 @@ def handle_progress(transmitted, pending):
|
|||
def send_file(bucket, filename):
|
||||
if bucket.get_key(filename):
|
||||
logging.error('Duplicate filename %s! I hope that is OK.' % filename)
|
||||
k = Key(bucket)
|
||||
k = MyKey(bucket)
|
||||
k.key = os.path.basename(filename)
|
||||
logging.info('Uploading %s...' % os.path.basename(filename))
|
||||
|
||||
k.set_contents_from_filename(filename, cb=handle_progress)
|
||||
fd = SlowFile(name=filename, mode='rb')
|
||||
|
||||
k.set_contents_from_file(fd, cb=handle_progress)
|
||||
|
||||
return k
|
||||
|
||||
|
@ -154,7 +255,7 @@ if returncode != 0:
|
|||
logger.error('%s died with exit code %i' % (cmd, returncode))
|
||||
sys.exit(1)
|
||||
|
||||
logging.info('Beginning post-processing of %s #%i' % (host, bkupNum))
|
||||
logging.info('Beginning post-processing of %i files from %s #%i' % (len(glob.glob(fileglob)), host, bkupNum))
|
||||
|
||||
for i in sorted(glob.glob(fileglob)):
|
||||
if secrets.gpgsymmetrickey:
|
||||
|
@ -162,11 +263,22 @@ for i in sorted(glob.glob(fileglob)):
|
|||
else:
|
||||
sendfile = i
|
||||
|
||||
key = send_file(bucket, sendfile)
|
||||
key.set_metadata('backuppc_cmd', cmd)
|
||||
key.set_metadata('backuppc_mesg', mesg)
|
||||
key.set_acl('private')
|
||||
key.close()
|
||||
retry_count = 0
|
||||
max_retries = 10
|
||||
|
||||
while retry_count <= max_retries:
|
||||
try:
|
||||
key = send_file(bucket, sendfile)
|
||||
key.set_metadata('backuppc_cmd', cmd)
|
||||
key.set_metadata('backuppc_mesg', mesg)
|
||||
key.set_acl('private')
|
||||
key.close()
|
||||
retry_count = max_retries+1
|
||||
except boto.exception.S3ResponseError as e:
|
||||
retry_count += 1
|
||||
sleeptime = 2**retry_count
|
||||
log.error('Encountered S3 exception %s, retrying in %i seconds (%i/%i)' % (e, sleeptime, retry_count, max_retries))
|
||||
time.sleep(sleeptime)
|
||||
|
||||
os.unlink(sendfile)
|
||||
|
||||
|
|
Loading…
Reference in a new issue