335 lines
11 KiB
Python
Executable file
335 lines
11 KiB
Python
Executable file
#!/usr/bin/python -W ignore::DeprecationWarning
|
|
# A BackupPC script to archive a host's files to Amazon S3.
|
|
#
|
|
# Point $Conf{ArchiveClientCmd} at me.
|
|
# Requires python-boto
|
|
#
|
|
# Usage: BackupPC_archiveHost tarCreatePath splitPath parPath host bkupNum \
|
|
# compPath fileExt splitSize outLoc parFile share
|
|
#
|
|
# Create secrets.py (importable by this script) such that it has:
# accesskey = 'Amazon AWS access key ID'
# sharedkey = 'Amazon AWS secret access key'
# gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it'
#   (set gpgsymmetrickey = '' to upload without gpg encryption)
|
|
|
|
import base64
|
|
import glob
|
|
import md5
|
|
import os
|
|
import secrets
|
|
import sys
|
|
import time
|
|
|
|
from subprocess import *
|
|
|
|
from boto.s3.connection import S3Connection
|
|
from boto.s3.key import Key
|
|
import boto.exception
|
|
|
|
import logging
|
|
import logging.handlers
|
|
|
|
# Route all records (DEBUG and above) from the root logger to the local
# syslog socket under the DAEMON facility, prefixed with the source filename.
logger = logging.getLogger('')
logformatter = logging.Formatter('%(filename)s: %(levelname)s: %(message)s')
loghandler = logging.handlers.SysLogHandler(
    '/dev/log',
    facility=logging.handlers.SysLogHandler.LOG_DAEMON)
loghandler.setFormatter(logformatter)
logger.setLevel(logging.DEBUG)
logger.addHandler(loghandler)
|
|
|
|
# classes
|
|
class MyKey(Key):
    """boto Key whose MD5 pre-computation is not rate limited.

    Overrides boto's Key._compute_md5 (presumably invoked by
    set_contents_from_file to hash the data before uploading -- confirm
    against the installed boto version).  Hashing only touches local disk,
    so when the file object offers an unthrottled ``fastread`` (SlowFile
    does) that is used instead of its throttled ``read``.
    """

    def _compute_md5(self, fp):
        """
        Hash the contents of fp from its current position, then rewind it.

        @type fp: file-like object
        @param fp: File pointer to the file to MD5 hash.  If it has a
                   ``fastread`` method it is used; otherwise plain ``read``
                   (e.g. for an in-memory StringIO).

        @rtype: None
        @return: nothing; sets self.md5 (hex digest), self.base64md5
                 (base64 digest, no trailing newline) and self.size (the
                 position of fp after reading), then seeks fp back to 0.
        """
        import hashlib

        # Prefer the unthrottled reader when available; fall back to read()
        # so ordinary file objects do not fail with AttributeError.
        reader = getattr(fp, 'fastread', fp.read)

        m = hashlib.md5()
        s = reader(self.BufferSize)
        while s:
            m.update(s)
            s = reader(self.BufferSize)
        self.md5 = m.hexdigest()
        # b64encode never appends a newline (encodestring did), so there is
        # nothing to strip.
        self.base64md5 = base64.b64encode(m.digest())
        self.size = fp.tell()
        fp.seek(0)
|
|
|
class SlowFile:
    """Rate-limited, read-only wrapper around a regular file.

    read() sleeps between calls so data is handed out at roughly ``speed``
    bytes per second.  When a read arrives after its scheduled time (the
    consumer fell behind, e.g. congestion) the effective rate is divided by
    a growing ``delayfactor`` (capped at MAX_DELAYFACTOR).  Every
    DEBUG_INTERVAL seconds the penalty is halved, a status line is logged
    and the optional speed semaphore file is re-read.  fastread() reads
    without any throttling.
    """

    # Optional file holding a single integer: the new maximum speed in
    # kilobits per second.
    DEFAULT_SPEEDFILE = ('/home/rtucker/Dropbox/Semaphores/'
                         'BackupPC_archiveHost_s3.maxspeed')

    # Bounds for the congestion back-off divisor.
    MIN_DELAYFACTOR = 1
    MAX_DELAYFACTOR = 20

    # Seconds between status logging / penalty decay / semaphore checks.
    DEBUG_INTERVAL = 300

    def __init__(self, name, mode='rb', speed=19200, speedfile=None):
        """Open a rate-limited filehandle. Supports read for now.

        name, mode as for open(); speed in bytes per second.  speedfile is
        the path of the optional max-speed semaphore file; None selects
        DEFAULT_SPEEDFILE.
        """
        self.name = name
        self.mode = mode
        self.speed = speed
        if speedfile is None:
            speedfile = self.DEFAULT_SPEEDFILE
        self.speedfile = speedfile

        self.fd = open(self.name, self.mode)

        # file-object-like attributes
        self.closed = False
        self.encoding = None

        # divisor applied to self.speed; grows when reads arrive late
        self.delayfactor = 1
        # time until which the previously handed-out block is "in flight";
        # the next read() sleeps until then (0 = no read yet)
        self.lastblocktargettime = 0

        # time of the last periodic status/semaphore check (0 = never)
        self.lastdebug = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        self.closed = True
        return self.fd.close()

    def fastread(self, len=1024):
        """Read up to len bytes immediately, bypassing the rate limit."""
        return self.fd.read(len)

    def flush(self):
        return self.fd.flush()

    def fileno(self):
        return self.fd.fileno()

    def isatty(self):
        return self.fd.isatty()

    def tell(self):
        return self.fd.tell()

    def read(self, len=1024):
        """Read up to ``len`` bytes, sleeping first if ahead of schedule.

        The parameter keeps the name ``len`` (shadowing the builtin) for
        compatibility with existing keyword callers.

        Raises IOError if the file has been closed.
        """
        if self.closed:
            raise IOError('read from closed SlowFile %r' % self.name)

        starttime = time.time()

        if self.lastblocktargettime == 0:
            # first time through: no previous block to wait for
            pass
        elif starttime < self.lastblocktargettime:
            # we're early
            time.sleep(self.lastblocktargettime - starttime)
        else:
            # we're late; something got slow.  Back off, but never past the
            # cap (otherwise the divisor can grow without bound between
            # periodic checks and throughput collapses).
            self.delayfactor = min(self.delayfactor + 0.5,
                                   self.MAX_DELAYFACTOR)
            logger.debug('%.2f seconds late (congestion?); setting delayfactor to %.2f' % (starttime - self.lastblocktargettime, self.delayfactor))

        targetspeed = self.speed / float(self.delayfactor)
        self.lastblocktargettime = time.time() + len / float(targetspeed)

        if time.time() > self.lastdebug + self.DEBUG_INTERVAL:
            self._periodic_update(targetspeed, len)

        return self.fd.read(len)

    def _periodic_update(self, targetspeed, blocklen):
        """Decay the back-off penalty, log status and re-read the speed file."""
        if self.delayfactor > 1:
            # reduce the penalty a bit
            self.delayfactor = self.delayfactor / 2.0
        self.delayfactor = max(self.MIN_DELAYFACTOR,
                               min(self.delayfactor, self.MAX_DELAYFACTOR))

        logger.debug('Target %i bytes/second (%i kilobits/second), delay factor %.2f, block len %i' % (targetspeed, targetspeed*8/1024, self.delayfactor, blocklen))

        try:
            with open(self.speedfile, 'r') as semaphore:
                newkbits = int(semaphore.readline())
        except IOError:
            logger.debug('No Semaphore file for new speed')
        except ValueError:
            logger.debug('Semaphore file invalid')
        else:
            newspeed = int((newkbits / float(8)) * 1024)
            if newspeed <= 0:
                # a zero/negative rate would make read() divide by zero
                logger.debug('Semaphore file invalid')
            elif newspeed != self.speed:
                self.delayfactor = 1
                self.speed = newspeed
                logger.debug('Setting new speed! %i bytes/second' % newspeed)

        self.lastdebug = time.time()

    def seek(self, offset, mode=0):
        """Seek the underlying file and forget any accumulated back-off."""
        self.delayfactor = 1
        return self.fd.seek(offset, mode)

    def write(self, data):
        raise NotImplementedError
|
|
|
|
# functions
|
|
def is_exe(fpath):
    """Return True if fpath is an existing regular file we may execute.

    os.access(X_OK) alone is also true for searchable directories, so the
    path must additionally be a file (symlinks are followed).
    """
    return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
|
|
|
|
def encrypt_file(filename, key, compress='/bin/cat'):
    """Symmetrically encrypt filename with gpg into filename + '.gpg'.

    compress is the path of BackupPC's compression program.  Its basename
    selects gpg's --compress-algo: cat -> none, gzip -> ZLIB,
    bzip2 -> BZIP2, anything else -> none.  The passphrase ``key`` is
    written to gpg's stdin (--passphrase-fd 0) rather than placed on the
    command line.

    On success the plaintext file is deleted and the encrypted file's name
    is returned.  Raises Exception if gpg exits non-zero or leaves no
    output; in that case the plaintext file is kept.
    """
    compressmap = {'cat': 'none', 'gzip': 'ZLIB', 'bzip2': 'BZIP2'}
    compress_algo = compressmap.get(os.path.basename(compress), 'none')
    outname = filename + '.gpg'

    # --yes: this function deletes the plaintext only after gpg succeeds, so
    # a .gpg already sitting next to it is presumably a leftover from an
    # interrupted run.  Without --yes, batch-mode gpg refuses to overwrite it.
    cmd = ['/usr/bin/gpg', '--batch', '--no-tty', '--yes']
    cmd.extend(['--compress-algo', compress_algo])
    cmd.extend(['--output', outname])
    cmd.extend(['--passphrase-fd', '0'])
    cmd.extend(['--symmetric', filename])

    if is_exe(cmd[0]):
        logging.info('Encrypting %s (compression: %s)' % (filename, compress_algo))
        logging.debug(repr(cmd))
    else:
        logging.error('%s is not an executable file!' % cmd[0])

    proc = Popen(cmd, stdin=PIPE, stdout=PIPE)
    proc.communicate(key)

    # Never trust (or delete the plaintext for) output from a failed run.
    if proc.returncode != 0:
        logging.error('gpg exited with status %i while encrypting %s' % (proc.returncode, filename))
        raise Exception('gpg failed (status %i) on %s' % (proc.returncode, filename))

    if not os.path.exists(outname):
        logging.error('%s.gpg does not exist' % filename)
        raise Exception('%s.gpg does not exist' % filename)

    oldfilesize = os.path.getsize(filename)
    newfilesize = os.path.getsize(outname)
    if oldfilesize:
        compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100
    else:
        # empty input: avoid ZeroDivisionError
        compressed = 0.0
    logging.info('%s shrunk by %.2f%% (%i -> %i bytes)' % (filename, compressed, oldfilesize, newfilesize))
    os.unlink(filename)
    return outname
|
|
|
|
def open_s3(accesskey, sharedkey, host, is_secure=False):
    """Return the per-host backup bucket, creating it if it does not exist.

    The bucket name is '<accesskey>-bkup-<host>', lowercased.  A newly
    created bucket gets the 'private' canned ACL.

    is_secure is passed to S3Connection.  It defaults to False (plain
    HTTP), which preserves the original behaviour.
    NOTE(review): with gpg encryption disabled the archive travels in
    clear; consider is_secure=True.

    A lookup error that is not "not found" (HTTP 404), e.g. access denied,
    is re-raised instead of being masked by an attempt to create the bucket.
    """
    conn = S3Connection(accesskey, sharedkey, is_secure=is_secure)
    mybucketname = (accesskey + '-bkup-' + host).lower()
    try:
        bucket = conn.get_bucket(mybucketname)
    except boto.exception.S3ResponseError as e:
        # If the error carries no status, assume "missing" as before.
        if getattr(e, 'status', 404) != 404:
            raise
        logging.info('Creating bucket %s' % mybucketname)
        bucket = conn.create_bucket(mybucketname)
        bucket.set_acl('private')
    return bucket
|
|
|
|
def handle_progress(transmitted, pending):
    """Upload progress callback: log the bytes sent so far.

    Despite its name, ``pending`` is treated as the total byte count (the
    percentage is transmitted / pending).  A zero total (empty upload) is
    reported as 100% instead of raising ZeroDivisionError.
    """
    if pending:
        percent = (transmitted / float(pending)) * 100
    else:
        percent = 100.0
    logging.debug('%i of %i bytes transmitted (%.2f%%)' % (transmitted, pending, percent))
|
|
|
|
def send_file(bucket, filename):
    """Upload filename to bucket under its basename, rate-limited via SlowFile.

    If a key with that name already exists, this only logs an error and
    uploads anyway.  Progress is reported through handle_progress.
    Returns the MyKey that was written.  The local file handle is closed
    whether the upload succeeds or raises.
    """
    basefilename = os.path.basename(filename)
    if bucket.get_key(basefilename):
        logging.error('Duplicate filename %s! I hope that is OK.' % basefilename)
    k = MyKey(bucket)
    k.key = basefilename
    logging.info('Uploading %s...' % basefilename)

    fd = SlowFile(name=filename, mode='rb')
    try:
        k.set_contents_from_file(fd, cb=handle_progress)
    finally:
        # Callers retry uploads; don't leak a descriptor per attempt.
        fd.close()

    return k
|
|
|
|
# Read in arguments
if len(sys.argv) != 12:
    sys.stderr.write("Usage: %s tarCreatePath splitPath parPath host bkupNum "
                     "compPath fileExt splitSize outLoc parFile share\n" % sys.argv[0])
    sys.exit(1)
else:
    tarCreate = sys.argv[1]
    splitPath = sys.argv[2]
    parPath = sys.argv[3]
    host = sys.argv[4]
    bkupNum = int(sys.argv[5])
    compPath = sys.argv[6]
    fileExt = sys.argv[7]
    splitSize = int(sys.argv[8])
    outLoc = sys.argv[9]
    parfile = sys.argv[10]
    share = sys.argv[11]

# Every helper program that was configured (non-empty path) must be an
# executable file.  Compare by value: 'is' on a string literal tests
# identity, not equality.
for program in [tarCreate, compPath, splitPath, parPath]:
    if program != '' and not is_exe(program):
        sys.stderr.write('Error: %s is not an executable program\n' % program)
        sys.exit(1)
|
|
|
|
# Connect to S3 first; the archive's start time is taken afterwards.
bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)

beginning = time.time()

# One-line summary of this run, assembled from the applicable options.
mesg_parts = ["Writing archive for host %s, backup #%i" % (host, bkupNum)]
if splitSize > 0 and is_exe(splitPath):
    mesg_parts.append('split into %i byte chunks' % splitSize)
if secrets.gpgsymmetrickey:
    mesg_parts.append('encrypted with secret key')
mesg = ', '.join(mesg_parts)

# Log it, and echo a timestamped copy to stdout for BackupPC.
logging.info(mesg)
sys.stdout.write('%s: %s\n' % (time.strftime('%d-%H:%M:%S'), mesg))
sys.stdout.flush()
|
|
|
|
# Prepare the pipeline
# NOTE(review): host, share and outLoc are interpolated into a shell command
# without quoting; a value containing shell metacharacters would break (or
# subvert) the pipeline.
if share == '*':
    # stop the shell from glob-expanding a bare '*' share name
    share = '\\*'

cmd = '%s -t -h %s -n %i -s %s . ' % (tarCreate, host, bkupNum, share)

if splitSize > 0 and is_exe(splitPath):
    filehead = '%s/%s.%i.tar.' % (outLoc, host, bkupNum)
    fileglob = filehead + '*'
    cmd += '| %s -b %i - %s' % (splitPath, splitSize, filehead)
else:
    fileglob = '%s/%s.%i.tar' % (outLoc, host, bkupNum)
    cmd += '> %s' % fileglob
    filehead = fileglob + '.'

# is there already evidence of this having been done before?
if glob.glob(fileglob):
    logging.info('Evidence of failed execution run prior! Finishing it.')
else:
    logging.debug('Executing %s' % cmd)

    returncode = os.system(cmd)

    if returncode != 0:
        # On POSIX os.system returns a wait() status, not the exit code.
        exitcode = returncode
        if hasattr(os, 'WIFEXITED') and os.WIFEXITED(returncode):
            exitcode = os.WEXITSTATUS(returncode)
        logger.error('%s died with exit code %i' % (cmd, exitcode))
        sys.exit(1)
|
|
|
|
logging.info('Beginning post-processing of %i files from %s #%i' % (len(glob.glob(fileglob)), host, bkupNum))

for i in sorted(glob.glob(fileglob)):
    if not os.path.exists(i):
        # When resuming, both 'x' and 'x.gpg' may have been listed.
        # Encrypting and sending 'x' already uploaded and removed 'x.gpg'.
        logging.info('%s already processed; skipping' % i)
        continue

    sending_start = time.time()
    if secrets.gpgsymmetrickey and not i.endswith('.gpg'):
        sendfile = encrypt_file(i, secrets.gpgsymmetrickey, compPath)
    else:
        # either encryption is off, or the file is already encrypted
        sendfile = i
    encrypt_seconds = time.time() - sending_start

    # create some output so backuppc doesn't time out
    sys.stdout.write("%s: Sending %s to S3...\n" % (time.strftime('%d-%H:%M:%S'), sendfile))
    sys.stdout.flush()

    # Up to 1 + max_retries attempts with exponential back-off in between.
    retry_count = 0
    max_retries = 10
    sent = False

    while not sent:
        try:
            key = send_file(bucket, sendfile)
            key.set_metadata('backuppc_cmd', cmd)
            key.set_metadata('backuppc_mesg', mesg)
            key.set_acl('private')
            key.update_metadata()
            key.close()
            sent = True
        except boto.exception.S3ResponseError as e:
            retry_count += 1
            if retry_count > max_retries:
                # Keep the local file so a later run can finish the upload.
                logger.error('Encountered S3 exception %s; giving up on %s after %i retries' % (e, sendfile, max_retries))
                sys.exit(1)
            sleeptime = 2**retry_count
            logger.error('Encountered S3 exception %s, retrying in %i seconds (%i/%i)' % (e, sleeptime, retry_count, max_retries))
            time.sleep(sleeptime)

    # Only reached after a successful upload: now the local copy can go.
    size = os.path.getsize(sendfile)

    os.unlink(sendfile)

    sending_seconds = time.time() - sending_start

    # guard against a zero-length interval on tiny files / coarse clocks
    transfer_seconds = max(sending_seconds - encrypt_seconds, 1e-6)
    bytespersecond = size / transfer_seconds

    sys.stdout.write('%s: File sent. Total time %i seconds, crypto time %i seconds, transfer speed %i bytes/second.\n' % (time.strftime('%d-%H:%M:%S'), sending_seconds, encrypt_seconds, bytespersecond))
    sys.stdout.flush()
|
|
|
|
# finalize the backup: store a marker object holding the start time, end
# time and run summary.
# A plain boto Key is used here.  MyKey exists only to hash SlowFile
# uploads without throttling, and its hashing override may expect a
# SlowFile-style fastread() that an in-memory string upload lacks.
# NOTE(review): unlike the chunk keys (uploaded under their basename), this
# key name embeds the local outLoc path via filehead.  It is left unchanged
# because anything that looks for the marker depends on this exact name.
key = Key(bucket)
key.key = '%sCOMPLETE' % filehead
key.set_contents_from_string('%s %s "%s"' % (beginning, time.time(), mesg))
key.close()
|
|
|
|
|