# Commit 6360655bf8: The "except foo as e" form, used on line 350, doesn't
# work with older versions of Python; this change should allow it to work
# with Python 2.x in general.  Thanks to Aaron Ciarlotta for the heads up!
# (File metadata: 376 lines, 12 KiB, Python, executable file)
#!/usr/bin/python -W ignore::DeprecationWarning
|
|
# A BackupPC script to archive a host's files to Amazon S3.
|
|
#
|
|
# Point $Conf{ArchiveClientCmd} at me.
|
|
# Requires python-boto
|
|
#
|
|
# Usage: BackupPC_archiveHost tarCreatePath splitPath parPath host bkupNum \
|
|
# compPath fileExt splitSize outLoc parFile share
|
|
#
|
|
# Create secrets.py such that it has:
|
|
# accesskey = 'amazon aws access key'
|
|
# sharedkey = 'amazon aws shared key'
|
|
# gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it'
|
|
|
|
import base64
import glob
import hashlib
import logging
import logging.handlers
import md5
import os
import socket
import sys
import time

from subprocess import *

import boto.exception
from boto.s3.connection import S3Connection
from boto.s3.key import Key

import secrets
|
logger = logging.getLogger('')
|
|
loghandler = logging.handlers.SysLogHandler('/dev/log',
|
|
facility=logging.handlers.SysLogHandler.LOG_DAEMON)
|
|
logformatter = logging.Formatter('%(filename)s: %(levelname)s: %(message)s')
|
|
loghandler.setFormatter(logformatter)
|
|
logger.addHandler(loghandler)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
# classes
|
|
class MyKey(Key):
|
|
def _compute_md5(self, fp):
|
|
"""
|
|
@type fp: file
|
|
@param fp: File pointer to the file to MD5 hash
|
|
|
|
@rtype: string
|
|
@return: MD5 Hash of the file in fp
|
|
"""
|
|
m = md5.new()
|
|
try:
|
|
s = fp.fastread(self.BufferSize)
|
|
except AttributeError:
|
|
s = fp.read(self.BufferSize)
|
|
while s:
|
|
m.update(s)
|
|
try:
|
|
s = fp.fastread(self.BufferSize)
|
|
except AttributeError:
|
|
s = fp.read(self.BufferSize)
|
|
self.md5 = m.hexdigest()
|
|
self.base64md5 = base64.encodestring(m.digest())
|
|
if self.base64md5[-1] == '\n':
|
|
self.base64md5 = self.base64md5[0:-1]
|
|
self.size = fp.tell()
|
|
fp.seek(0)
|
|
|
|
class SlowFile:
|
|
def __init__(self, name, mode='rb', speed=19200):
|
|
"""Open a rate-limited filehandle. Supports read for now.
|
|
name, mode as normal; speed in bytes per second"""
|
|
|
|
self.name = name
|
|
self.mode = mode
|
|
self.speed = speed
|
|
|
|
self.fd = open(self.name, self.mode)
|
|
|
|
self.closed = False
|
|
self.encoding = None
|
|
|
|
self.delayfactor = 1
|
|
self.lastblocktargettime = 0
|
|
|
|
self.lastdebug = 0
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
return self.fd.close()
|
|
|
|
def fastread(self, len=1024):
|
|
return self.fd.read(len)
|
|
|
|
def flush(self):
|
|
return self.fd.flush()
|
|
|
|
def fileno(self):
|
|
return self.fd.fileno()
|
|
|
|
def isatty(self):
|
|
return self.fd.isatty()
|
|
|
|
def tell(self):
|
|
return self.fd.tell()
|
|
|
|
def read(self, len=1024):
|
|
if not self.fd:
|
|
raise IOError
|
|
|
|
starttime = time.time()
|
|
|
|
if self.lastblocktargettime == 0:
|
|
# first time through
|
|
pass
|
|
elif starttime < self.lastblocktargettime:
|
|
# we're early
|
|
sleepfor = self.lastblocktargettime - starttime
|
|
time.sleep(sleepfor)
|
|
else:
|
|
# we're late; something got slow.
|
|
lateness = starttime - self.lastblocktargettime
|
|
if lateness > 0.2:
|
|
self.delayfactor += min(0.5, lateness)
|
|
logger.debug('%.2f seconds late (congestion?); setting delayfactor to %.2f' % (lateness, self.delayfactor))
|
|
|
|
targetspeed = self.speed/float(self.delayfactor)
|
|
self.lastblocktargettime = time.time() + len/float(targetspeed)
|
|
|
|
if time.time() > self.lastdebug+60:
|
|
if self.delayfactor > 1:
|
|
# reduce the penalty a bit
|
|
self.delayfactor -= 0.2
|
|
targetspeed = self.speed/float(self.delayfactor)
|
|
self.lastblocktargettime = time.time() + len/float(targetspeed)
|
|
|
|
if self.delayfactor < 1:
|
|
self.delayfactor = 1
|
|
if self.delayfactor > 20:
|
|
self.delayfactor = 20
|
|
|
|
logger.debug('Target %i bytes/second (%i kilobits/second), delay factor %.2f, block len %i' % (targetspeed, targetspeed*8/1024, self.delayfactor, len))
|
|
|
|
try:
|
|
newkbits = int(open('/home/rtucker/Dropbox/Semaphores/BackupPC_archiveHost_s3.maxspeed', 'r').readline())
|
|
newspeed = int((newkbits/float(8))*1024)
|
|
if newspeed != self.speed:
|
|
self.delayfactor = 1
|
|
self.speed = newspeed
|
|
logger.debug('Setting new speed! %i bytes/second' % newspeed)
|
|
self.lastblocktargettime = 0
|
|
except IOError:
|
|
logger.debug('No Semaphore file for new speed')
|
|
except ValueError:
|
|
logger.debug('Semaphore file invalid')
|
|
|
|
self.lastdebug = time.time()
|
|
|
|
return self.fd.read(len)
|
|
|
|
def seek(self, offset, mode=0):
|
|
self.delayfactor = 1
|
|
return self.fd.seek(offset, mode)
|
|
|
|
def write(self, data):
|
|
raise NotImplementedError
|
|
|
|
# functions
|
|
def is_exe(fpath):
|
|
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
|
|
|
|
def encrypt_file(filename, key, compress='/bin/cat'):
|
|
compressmap = {'cat': 'none', 'gzip': 'ZLIB', 'bzip2': 'BZIP2'}
|
|
if os.path.basename(compress) in compressmap.keys():
|
|
compress_algo = compressmap[os.path.basename(compress)]
|
|
else:
|
|
compress_algo = 'none'
|
|
|
|
cmd = ['/usr/bin/gpg', '--batch', '--no-tty']
|
|
cmd.extend(['--compress-algo', compress_algo])
|
|
cmd.extend(['--output', '%s.gpg' % filename])
|
|
cmd.extend(['--passphrase-fd', '0'])
|
|
cmd.extend(['--symmetric', filename])
|
|
|
|
if is_exe(cmd[0]):
|
|
logging.info('Encrypting %s (compression: %s)' % (filename, compress_algo))
|
|
logging.debug(`cmd`)
|
|
else:
|
|
logging.error('%s is not an executable file!' % cmd[0])
|
|
|
|
proc = Popen(cmd, stdin=PIPE, stdout=PIPE)
|
|
proc.communicate(key)
|
|
|
|
if os.path.exists(filename + '.gpg'):
|
|
oldfilesize = os.path.getsize(filename)
|
|
newfilesize = os.path.getsize(filename + '.gpg')
|
|
compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100
|
|
logging.info('%s shrunk by %.2f%% (%i -> %i bytes)' % (filename, compressed, oldfilesize, newfilesize))
|
|
os.unlink(filename)
|
|
return filename + '.gpg'
|
|
else:
|
|
logging.error('%s.gpg does not exist' % filename)
|
|
raise Exception
|
|
|
|
def open_s3(accesskey, sharedkey, host):
|
|
conn = S3Connection(accesskey, sharedkey, is_secure=False)
|
|
mybucketname = (accesskey + '-bkup-' + host).lower()
|
|
try:
|
|
bucket = conn.get_bucket(mybucketname)
|
|
except boto.exception.S3ResponseError:
|
|
logging.info('Creating bucket %s' % mybucketname)
|
|
bucket = conn.create_bucket(mybucketname)
|
|
bucket.set_acl('private')
|
|
return bucket
|
|
|
|
def handle_progress(transmitted, pending):
|
|
logging.debug('%i of %i bytes transmitted (%.2f%%)' % (transmitted, pending, (transmitted/float(pending))*100))
|
|
|
|
def send_file(bucket, filename, cmd, mesg):
|
|
basefilename = os.path.basename(filename)
|
|
if bucket.get_key(basefilename):
|
|
logging.error('Duplicate filename %s! I hope that is OK.' % basefilename)
|
|
k = MyKey(bucket)
|
|
k.key = basefilename
|
|
if cmd: k.set_metadata('backuppc-cmd', cmd)
|
|
if mesg: k.set_metadata('backuppc-mesg', mesg)
|
|
logging.info('Uploading %s...' % basefilename)
|
|
|
|
fd = SlowFile(name=filename, mode='rb')
|
|
|
|
putHeaders = {'x-amz-storage-class': 'REDUCED_REDUNDANCY'}
|
|
|
|
k.set_contents_from_file(fd, headers=putHeaders, cb=handle_progress)
|
|
|
|
return k
|
|
|
|
# Read in arguments
|
|
if len(sys.argv) != 12:
|
|
sys.stderr.write("Usage: %s tarCreatePath splitPath parPath host bkupNum \
|
|
compPath fileExt splitSize outLoc parFile share\n" % sys.argv[0])
|
|
sys.exit(1)
|
|
else:
|
|
tarCreate = sys.argv[1]
|
|
splitPath = sys.argv[2]
|
|
parPath = sys.argv[3]
|
|
host = sys.argv[4]
|
|
bkupNum = int(sys.argv[5])
|
|
compPath = sys.argv[6]
|
|
fileExt = sys.argv[7]
|
|
splitSize = int(sys.argv[8])
|
|
outLoc = sys.argv[9]
|
|
parfile = sys.argv[10]
|
|
share = sys.argv[11]
|
|
|
|
for i in [tarCreate, compPath, splitPath, parPath]:
|
|
if i is not '' and not is_exe(i):
|
|
sys.stderr.write('Error: %s is not an executable program\n' % i)
|
|
sys.exit(1)
|
|
|
|
# open s3 connection
|
|
bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)
|
|
|
|
beginning = time.time()
|
|
|
|
mesg = "Writing archive for host %s, backup #%i" % (host, bkupNum)
|
|
if splitSize > 0 and is_exe(splitPath):
|
|
mesg += ', split into %i byte chunks' % splitSize
|
|
if secrets.gpgsymmetrickey:
|
|
mesg += ', encrypted with secret key'
|
|
|
|
logging.info(mesg)
|
|
sys.stdout.write(time.strftime('%d-%H:%M:%S') + ": " + mesg + '\n')
|
|
sys.stdout.flush()
|
|
|
|
# Prepare the pipeline
|
|
if share == '*':
|
|
share = '\*'
|
|
|
|
cmd = '%s -t -h %s -n %i -s %s . ' % (tarCreate, host, bkupNum, share)
|
|
|
|
if splitSize > 0 and is_exe(splitPath):
|
|
filehead = '%s/%s.%i.tar.' % (outLoc, host, bkupNum)
|
|
fileglob = filehead + '*'
|
|
cmd += '| %s -b %i - %s' % (splitPath, splitSize, filehead)
|
|
else:
|
|
fileglob = '%s/%s.%i.tar' % (outLoc, host, bkupNum)
|
|
cmd += '> %s' % fileglob
|
|
filehead = fileglob + '.'
|
|
|
|
# is there already evidence of this having been done before?
|
|
if glob.glob('%s/%s.*.tar.*' % (outLoc, host)):
|
|
logging.info('Evidence of failed execution run prior! Finishing it.')
|
|
somefile = os.path.basename(glob.glob('%s/%s.*.tar.*' % (outLoc, host))[0])
|
|
keyparts = somefile.split('.')
|
|
encrypted = split = tarred = final = False
|
|
if keyparts[-1] == 'gpg':
|
|
keyparts.pop()
|
|
if keyparts[-1] != 'tar' and len(keyparts[-1]) is 2:
|
|
keyparts.pop()
|
|
if keyparts[-1] == 'tar':
|
|
keyparts.pop()
|
|
|
|
bkupNum = int(keyparts.pop())
|
|
|
|
filehead = '%s/%s.%i.tar.' % (outLoc, host, bkupNum)
|
|
fileglob = filehead + '*'
|
|
|
|
mesg = "Continuing upload for host %s, backup #%i" % (host, bkupNum)
|
|
if splitSize > 0 and is_exe(splitPath):
|
|
mesg += ', split into %i byte chunks' % splitSize
|
|
if secrets.gpgsymmetrickey:
|
|
mesg += ', encrypted with secret key'
|
|
|
|
logging.info(mesg)
|
|
sys.stdout.write(time.strftime('%d-%H:%M:%S') + ": " + mesg + '\n')
|
|
sys.stdout.flush()
|
|
else:
|
|
logging.debug('Executing %s' % cmd)
|
|
|
|
returncode = os.system(cmd)
|
|
|
|
if returncode != 0:
|
|
logger.error('%s died with exit code %i' % (cmd, returncode))
|
|
sys.exit(1)
|
|
|
|
logging.info('Beginning post-processing of %i files from %s #%i' % (len(glob.glob(fileglob)), host, bkupNum))
|
|
|
|
for i in sorted(glob.glob(fileglob)):
|
|
sending_start = time.time()
|
|
if secrets.gpgsymmetrickey and not i.endswith('.gpg'):
|
|
sendfile = encrypt_file(i, secrets.gpgsymmetrickey, compPath)
|
|
else:
|
|
# either encryption is off, or the file is already encrypted
|
|
sendfile = i
|
|
encrypt_seconds = time.time() - sending_start
|
|
|
|
# create some output so backuppc doesn't time out
|
|
sys.stdout.write("%s: Sending %s to S3...\n" % (time.strftime('%d-%H:%M:%S'), sendfile))
|
|
sys.stdout.flush()
|
|
|
|
retry_count = 0
|
|
max_retries = 10
|
|
|
|
while retry_count <= max_retries:
|
|
try:
|
|
key = send_file(bucket, sendfile, cmd, mesg)
|
|
key.set_acl('private')
|
|
key.close()
|
|
retry_count = max_retries+1
|
|
except (boto.exception.S3ResponseError, socket.error), e:
|
|
retry_count += 1
|
|
sleeptime = 2**retry_count
|
|
err = 'Encountered exception %s, retrying in %i seconds (%i/%i)' % (e, sleeptime, retry_count, max_retries)
|
|
logger.error(err)
|
|
sys.stdout.write(time.strftime('%d-%H:%M:%S') + ': ' + err + '\n')
|
|
sys.stdout.flush()
|
|
time.sleep(sleeptime)
|
|
|
|
size = os.path.getsize(sendfile)
|
|
|
|
os.unlink(sendfile)
|
|
|
|
sending_seconds = time.time() - sending_start
|
|
|
|
bytespersecond = size / (sending_seconds - encrypt_seconds)
|
|
|
|
sys.stdout.write('%s: File sent. Total time %i seconds, crypto time %i seconds, transfer speed %i bytes/second.\n' % (time.strftime('%d-%H:%M:%S'), sending_seconds, encrypt_seconds, bytespersecond))
|
|
sys.stdout.flush()
|
|
|
|
# finalize the backup
|
|
key = MyKey(bucket)
|
|
key.key = '%sCOMPLETE' % os.path.basename(filehead)
|
|
key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg))
|
|
key.close()
|
|
|
|
|