Remove SlowFile class

This functionality is best provided by the upstream router and/or
underlying system.
This commit is contained in:
Ryan Tucker 2011-09-20 22:26:19 -04:00
parent 7fa6dd0115
commit a888eab0c3

View file

@ -40,135 +40,6 @@ loghandler.setFormatter(logformatter)
# Attach the handler configured above and emit everything from DEBUG up.
logger.addHandler(loghandler)
logger.setLevel(logging.DEBUG)
# classes
class MyKey(Key):
    """boto Key subclass whose MD5 pass prefers a rate-limit-free read.

    Overrides _compute_md5 so that file objects exposing fastread()
    (e.g. a throttled wrapper) are hashed at full speed.
    """

    def _compute_md5(self, fp):
        """
        @type fp: file
        @param fp: File pointer to the file to MD5 hash
        @rtype: string
        @return: MD5 Hash of the file in fp

        Side effects: sets self.md5, self.base64md5 and self.size,
        and rewinds fp to the start.
        """
        # hashlib replaces the deprecated md5 module; identical
        # update()/digest()/hexdigest() API (available since Python 2.5).
        import hashlib

        def _next_chunk(stream):
            # Prefer an unthrottled fastread() when the object offers one;
            # fall back to the normal read() otherwise.
            try:
                return stream.fastread(self.BufferSize)
            except AttributeError:
                return stream.read(self.BufferSize)

        m = hashlib.md5()
        s = _next_chunk(fp)
        while s:
            m.update(s)
            s = _next_chunk(fp)
        self.md5 = m.hexdigest()
        self.base64md5 = base64.encodestring(m.digest())
        # encodestring() appends a trailing newline; strip it.
        if self.base64md5[-1] == '\n':
            self.base64md5 = self.base64md5[0:-1]
        self.size = fp.tell()
        fp.seek(0)
class SlowFile:
    """Read-only, file-like wrapper that rate-limits read() to a target
    byte rate, adapting to congestion and to an optional external
    "semaphore" file (secrets.speedfile) holding a speed in kilobits/s.

    write() is not supported and raises NotImplementedError.
    """

    def __init__(self, name, mode='rb', speed=19200):
        """Open a rate-limited filehandle. Supports read for now.

        name, mode as normal; speed in bytes per second.
        """
        self.name = name
        self.mode = mode
        self.speed = speed          # target bytes/second
        self.fd = open(self.name, self.mode)
        self.closed = False
        self.encoding = None
        # Multiplier >= 1 applied to slow reads further after congestion.
        self.delayfactor = 1
        # Wall-clock time the next read() should not start before; 0 means
        # "first read, no pacing yet".
        self.lastblocktargettime = 0
        # Last time the once-a-minute debug/speed-refresh branch ran.
        self.lastdebug = 0

    def close(self):
        """Mark the wrapper closed and close the underlying file."""
        self.closed = True
        return self.fd.close()

    def fastread(self, len=1024):
        """Read without any rate limiting (used for e.g. checksumming)."""
        return self.fd.read(len)

    def flush(self):
        return self.fd.flush()

    def fileno(self):
        return self.fd.fileno()

    def isatty(self):
        return self.fd.isatty()

    def tell(self):
        return self.fd.tell()

    def read(self, len=1024):
        """Read up to len bytes, sleeping as needed to hold the target rate.

        Every ~60 seconds: relaxes the congestion penalty a bit, clamps
        delayfactor to [1, 20], logs the current target, and re-reads
        secrets.speedfile (kilobits/second) if it exists.
        """
        # NOTE: `len` shadows the builtin, kept for interface compatibility.
        if not self.fd:
            raise IOError
        starttime = time.time()
        if self.lastblocktargettime == 0:
            # first time through
            pass
        elif starttime < self.lastblocktargettime:
            # we're early
            sleepfor = self.lastblocktargettime - starttime
            time.sleep(sleepfor)
        else:
            # we're late; something got slow.
            lateness = starttime - self.lastblocktargettime
            if lateness > 0.2:
                self.delayfactor += min(0.5, lateness)
                logger.debug('%.2f seconds late (congestion?); setting delayfactor to %.2f' % (lateness, self.delayfactor))
        targetspeed = self.speed/float(self.delayfactor)
        self.lastblocktargettime = time.time() + len/float(targetspeed)
        if time.time() > self.lastdebug+60:
            if self.delayfactor > 1:
                # reduce the penalty a bit
                self.delayfactor -= 0.2
                targetspeed = self.speed/float(self.delayfactor)
                self.lastblocktargettime = time.time() + len/float(targetspeed)
            if self.delayfactor < 1:
                self.delayfactor = 1
            if self.delayfactor > 20:
                self.delayfactor = 20
            logger.debug('Target %i bytes/second (%i kilobits/second), delay factor %.2f, block len %i' % (targetspeed, targetspeed*8/1024, self.delayfactor, len))
            if hasattr(secrets, 'speedfile'):
                try:
                    # `with` fixes a file-handle leak in the original, which
                    # never closed the semaphore file it opened here.
                    with open(secrets.speedfile, 'r') as semaphore:
                        newkbits = int(semaphore.readline())
                    # Convert kilobits/second -> bytes/second.
                    newspeed = int((newkbits/float(8))*1024)
                    if newspeed != self.speed:
                        self.delayfactor = 1
                        self.speed = newspeed
                        logger.debug('Setting new speed! %i bytes/second' % newspeed)
                        self.lastblocktargettime = 0
                except IOError:
                    logger.debug('IO Error opening semaphore file')
                except ValueError:
                    logger.debug('Semaphore file invalid')
            self.lastdebug = time.time()
        return self.fd.read(len)

    def seek(self, offset, mode=0):
        """Seek the underlying file; resets any congestion penalty."""
        self.delayfactor = 1
        return self.fd.seek(offset, mode)

    def write(self, data):
        raise NotImplementedError
# functions
def is_exe(fpath):
    """Return True if fpath names an existing path executable by this user."""
    path_exists = os.path.exists(fpath)
    user_can_execute = os.access(fpath, os.X_OK)
    return path_exists and user_can_execute
@ -411,7 +282,7 @@ if __name__ == '__main__':
# finalize the backup
# Re-open the S3 bucket and upload a "<filehead>COMPLETE" sentinel object
# whose body records the start time, the completion time, and a message.
bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
# NOTE(review): the two assignments below appear to be the removed (-) and
# added (+) lines of a diff rendered together; only the second one takes
# effect at runtime -- confirm against the actual post-commit source.
key = MyKey(bucket)
key = Key(bucket)
key.key = '%sCOMPLETE' % os.path.basename(filehead)
key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg))
key.close()