#!/usr/bin/python -W ignore::DeprecationWarning
#
# A BackupPC script to archive a host's files to Amazon S3.
#
# Point $Conf{ArchiveClientCmd} at me.
# Requires python-boto
#
# Usage: BackupPC_archiveHost tarCreatePath splitPath parPath host bkupNum \
#            compPath fileExt splitSize outLoc parFile share
#
# Create secrets.py such that it has:
#   accesskey = 'amazon aws access key'
#   sharedkey = 'amazon aws shared key'
#   gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it'
#   speedfile = '/path/to/a/speed-limit-file'

import base64
import glob
import md5
import os
import secrets
import socket
import sys
import time

from subprocess import Popen, PIPE

from boto.s3.connection import S3Connection
from boto.s3.key import Key
import boto.exception

import logging
import logging.handlers

logger = logging.getLogger('')
loghandler = logging.handlers.SysLogHandler(
    '/dev/log', facility=logging.handlers.SysLogHandler.LOG_DAEMON)
logformatter = logging.Formatter('%(filename)s: %(levelname)s: %(message)s')
loghandler.setFormatter(logformatter)
logger.addHandler(loghandler)
logger.setLevel(logging.DEBUG)


# classes

class MyKey(Key):
    def _compute_md5(self, fp):
        """
        @type fp: file
        @param fp: File pointer to the file to MD5 hash

        @rtype: string
        @return: MD5 Hash of the file in fp
        """
        m = md5.new()
        # Prefer the unthrottled fastread() a SlowFile provides; fall back
        # to plain read() for ordinary file objects.
        try:
            s = fp.fastread(self.BufferSize)
        except AttributeError:
            s = fp.read(self.BufferSize)
        while s:
            m.update(s)
            try:
                s = fp.fastread(self.BufferSize)
            except AttributeError:
                s = fp.read(self.BufferSize)
        self.md5 = m.hexdigest()
        self.base64md5 = base64.encodestring(m.digest())
        if self.base64md5[-1] == '\n':
            self.base64md5 = self.base64md5[0:-1]
        self.size = fp.tell()
        fp.seek(0)


class SlowFile:
    def __init__(self, name, mode='rb', speed=19200):
        """Open a rate-limited filehandle.  Supports read for now.

        name, mode as normal; speed in bytes per second."""
        self.name = name
        self.mode = mode
        self.speed = speed
        self.fd = open(self.name, self.mode)
        self.closed = False
        self.encoding = None
        self.delayfactor = 1
        self.lastblocktargettime = 0
        self.lastdebug = 0

    def close(self):
        self.closed = True
        return self.fd.close()

    def fastread(self, size=1024):
        """Read without rate limiting (used for checksumming)."""
        return self.fd.read(size)

    def flush(self):
        return self.fd.flush()

    def fileno(self):
        return self.fd.fileno()

    def isatty(self):
        return self.fd.isatty()

    def tell(self):
        return self.fd.tell()

    def read(self, size=1024):
        if not self.fd:
            raise IOError

        starttime = time.time()

        if self.lastblocktargettime == 0:
            # first time through
            pass
        elif starttime < self.lastblocktargettime:
            # we're early
            sleepfor = self.lastblocktargettime - starttime
            time.sleep(sleepfor)
        else:
            # we're late; something got slow
            lateness = starttime - self.lastblocktargettime
            if lateness > 0.2:
                self.delayfactor += min(0.5, lateness)
                logger.debug('%.2f seconds late (congestion?); '
                             'setting delayfactor to %.2f'
                             % (lateness, self.delayfactor))

        targetspeed = self.speed / float(self.delayfactor)
        self.lastblocktargettime = time.time() + size / float(targetspeed)

        if time.time() > self.lastdebug + 60:
            if self.delayfactor > 1:
                # reduce the penalty a bit
                self.delayfactor -= 0.2
                targetspeed = self.speed / float(self.delayfactor)
                self.lastblocktargettime = time.time() + size / float(targetspeed)
            if self.delayfactor < 1:
                self.delayfactor = 1
            if self.delayfactor > 20:
                self.delayfactor = 20

            logger.debug('Target %i bytes/second (%i kilobits/second), '
                         'delay factor %.2f, block size %i'
                         % (targetspeed, targetspeed * 8 / 1024,
                            self.delayfactor, size))

            # Re-read the speed-limit semaphore file (kilobits/second) so the
            # rate can be adjusted while an upload is in flight.
            if hasattr(secrets, 'speedfile'):
                try:
                    newkbits = int(open(secrets.speedfile, 'r').readline())
                    newspeed = int((newkbits / float(8)) * 1024)
                    if newspeed != self.speed:
                        self.delayfactor = 1
                        self.speed = newspeed
                        logger.debug('Setting new speed! %i bytes/second'
                                     % newspeed)
                        self.lastblocktargettime = 0
                except IOError:
                    logger.debug('IO Error opening semaphore file')
                except ValueError:
                    logger.debug('Semaphore file invalid')

            self.lastdebug = time.time()

        return self.fd.read(size)

    def seek(self, offset, mode=0):
        self.delayfactor = 1
        return self.fd.seek(offset, mode)

    def write(self, data):
        raise NotImplementedError
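# A minimal standalone sketch of SlowFile (the path and rate here are
# illustrative, not part of the BackupPC flow):
#
#   sf = SlowFile('/tmp/example.tar', 'rb', speed=19200)  # ~150 kbit/s
#   while True:
#       block = sf.read(1024)  # sleeps between blocks to hold ~19200 B/s
#       if not block:
#           break
#   sf.close()
#
# At 19200 bytes/second a 1024-byte block is budgeted 1024/19200 = 0.053
# seconds; read() sleeps off whatever remains of the previous block's budget.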
# functions

def is_exe(fpath):
    return os.path.exists(fpath) and os.access(fpath, os.X_OK)


def encrypt_file(filename, key, compress='/bin/cat'):
    # Ask gpg for a compression algorithm matching whatever compressor
    # BackupPC used; anything unrecognized means no compression.
    compressmap = {'cat': 'none', 'gzip': 'ZLIB', 'bzip2': 'BZIP2'}
    if os.path.basename(compress) in compressmap.keys():
        compress_algo = compressmap[os.path.basename(compress)]
    else:
        compress_algo = 'none'

    cmd = ['/usr/bin/gpg', '--batch', '--no-tty']
    cmd.extend(['--compress-algo', compress_algo])
    cmd.extend(['--output', '%s.gpg' % filename])
    cmd.extend(['--passphrase-fd', '0'])
    cmd.extend(['--symmetric', filename])

    if is_exe(cmd[0]):
        logger.info('Encrypting %s (compression: %s)'
                    % (filename, compress_algo))
        logger.debug(repr(cmd))
    else:
        logger.error('%s is not an executable file!' % cmd[0])

    proc = Popen(cmd, stdin=PIPE, stdout=PIPE)
    proc.communicate(key)

    if os.path.exists(filename + '.gpg'):
        oldfilesize = os.path.getsize(filename)
        newfilesize = os.path.getsize(filename + '.gpg')
        compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100
        logger.info('%s shrunk by %.2f%% (%i -> %i bytes)'
                    % (filename, compressed, oldfilesize, newfilesize))
        os.unlink(filename)
        return filename + '.gpg'
    else:
        logger.error('%s.gpg does not exist' % filename)
        raise Exception('%s.gpg does not exist' % filename)
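# Sketch of encrypt_file() on its own (filename and passphrase are
# illustrative); with compress='/bin/gzip' the archive is gpg-compressed
# with ZLIB:
#
#   newname = encrypt_file('/tmp/web1.0.tar.aa', 'my-passphrase', '/bin/gzip')
#   # newname == '/tmp/web1.0.tar.aa.gpg'; the plaintext chunk is unlinked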
def open_s3(accesskey, sharedkey, host):
    conn = S3Connection(accesskey, sharedkey, is_secure=False)
    mybucketname = (accesskey + '-bkup-' + host).lower()
    try:
        bucket = conn.get_bucket(mybucketname)
    except boto.exception.S3ResponseError:
        logger.info('Creating bucket %s' % mybucketname)
        bucket = conn.create_bucket(mybucketname)
    bucket.set_acl('private')
    return bucket


def handle_progress(transmitted, total):
    logger.debug('%i of %i bytes transmitted (%.2f%%)'
                 % (transmitted, total, (transmitted / float(total)) * 100))


def send_file(bucket, filename, cmd, mesg):
    basefilename = os.path.basename(filename)
    if bucket.get_key(basefilename):
        logger.error('Duplicate filename %s! I hope that is OK.'
                     % basefilename)

    k = MyKey(bucket)
    k.key = basefilename
    if cmd:
        k.set_metadata('backuppc-cmd', cmd)
    if mesg:
        k.set_metadata('backuppc-mesg', mesg)

    logger.info('Uploading %s...' % basefilename)
    fd = SlowFile(name=filename, mode='rb')

    putHeaders = {'x-amz-storage-class': 'REDUCED_REDUNDANCY'}
    k.set_contents_from_file(fd, headers=putHeaders, cb=handle_progress)
    return k
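# Sketch of the upload helpers in isolation (credentials and host are
# illustrative; for accesskey 'AKIAEXAMPLE' and host 'web1', open_s3 derives
# the bucket name 'akiaexample-bkup-web1'):
#
#   bucket = open_s3('AKIAEXAMPLE', 'shared-secret', 'web1')
#   key = send_file(bucket, '/tmp/web1.0.tar.aa.gpg', None, 'test upload')
#   key.set_acl('private')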
if __name__ == '__main__':
    # Read in arguments
    if len(sys.argv) != 12:
        sys.stderr.write("Usage: %s tarCreatePath splitPath parPath host "
                         "bkupNum compPath fileExt splitSize outLoc parFile "
                         "share\n" % sys.argv[0])
        sys.exit(1)
    else:
        tarCreate = sys.argv[1]
        splitPath = sys.argv[2]
        parPath = sys.argv[3]
        host = sys.argv[4]
        bkupNum = int(sys.argv[5])
        compPath = sys.argv[6]
        fileExt = sys.argv[7]
        splitSize = int(sys.argv[8])
        outLoc = sys.argv[9]
        parfile = sys.argv[10]
        share = sys.argv[11]

    for i in [tarCreate, compPath, splitPath, parPath]:
        if i != '' and not is_exe(i):
            sys.stderr.write('Error: %s is not an executable program\n' % i)
            sys.exit(1)

    # open s3 connection
    bucket = open_s3(secrets.accesskey, secrets.sharedkey, host)

    beginning = time.time()

    mesg = "Writing archive for host %s, backup #%i" % (host, bkupNum)
    if splitSize > 0 and is_exe(splitPath):
        mesg += ', split into %i byte chunks' % splitSize
    if secrets.gpgsymmetrickey:
        mesg += ', encrypted with secret key'

    logger.info(mesg)
    sys.stdout.write(time.strftime('%d-%H:%M:%S') + ": " + mesg + '\n')
    sys.stdout.flush()

    # Prepare the pipeline
    if share == '*':
        share = '\*'
    cmd = '%s -t -h %s -n %i -s %s . ' % (tarCreate, host, bkupNum, share)

    if splitSize > 0 and is_exe(splitPath):
        filehead = '%s/%s.%i.tar.' % (outLoc, host, bkupNum)
        fileglob = filehead + '*'
        cmd += '| %s -b %i - %s' % (splitPath, splitSize, filehead)
    else:
        fileglob = '%s/%s.%i.tar' % (outLoc, host, bkupNum)
        cmd += '> %s' % fileglob
        filehead = fileglob + '.'

    # is there already evidence of this having been done before?
    if glob.glob('%s/%s.*.tar.*' % (outLoc, host)):
        logger.info('Evidence of failed execution run prior! Finishing it.')
        somefile = os.path.basename(glob.glob('%s/%s.*.tar.*'
                                              % (outLoc, host))[0])
        # Filenames look like host.bkupNum.tar[.chunk][.gpg]; peel suffixes
        # off the right end until the backup number is exposed.
        keyparts = somefile.split('.')
        if keyparts[-1] == 'gpg':
            keyparts.pop()
        if keyparts[-1] != 'tar' and len(keyparts[-1]) == 2:
            keyparts.pop()
        if keyparts[-1] == 'tar':
            keyparts.pop()
        bkupNum = int(keyparts.pop())
        filehead = '%s/%s.%i.tar.' % (outLoc, host, bkupNum)
        fileglob = filehead + '*'

        mesg = "Continuing upload for host %s, backup #%i" % (host, bkupNum)
        if splitSize > 0 and is_exe(splitPath):
            mesg += ', split into %i byte chunks' % splitSize
        if secrets.gpgsymmetrickey:
            mesg += ', encrypted with secret key'

        logger.info(mesg)
        sys.stdout.write(time.strftime('%d-%H:%M:%S') + ": " + mesg + '\n')
        sys.stdout.flush()
    else:
        logger.debug('Executing %s' % cmd)
        returncode = os.system(cmd)

        if returncode != 0:
            logger.error('%s died with exit code %i' % (cmd, returncode))
            sys.exit(1)

    logger.info('Beginning post-processing of %i files from %s #%i'
                % (len(glob.glob(fileglob)), host, bkupNum))

    for i in sorted(glob.glob(fileglob)):
        sending_start = time.time()
        if secrets.gpgsymmetrickey and not i.endswith('.gpg'):
            sendfile = encrypt_file(i, secrets.gpgsymmetrickey, compPath)
        else:
            # either encryption is off, or the file is already encrypted
            sendfile = i
        encrypt_seconds = time.time() - sending_start

        # create some output so backuppc doesn't time out
        sys.stdout.write("%s: Sending %s to S3...\n"
                         % (time.strftime('%d-%H:%M:%S'), sendfile))
        sys.stdout.flush()

        # Retry with exponential backoff; break out on success.
        retry_count = 0
        max_retries = 10
        while retry_count <= max_retries:
            try:
                key = send_file(bucket, sendfile, cmd, mesg)
                key.set_acl('private')
                key.close()
                break
            except (boto.exception.S3ResponseError, socket.error), e:
                retry_count += 1
                sleeptime = 2 ** retry_count
                err = ('Encountered exception %s, retrying in %i seconds '
                       '(%i/%i)' % (e, sleeptime, retry_count, max_retries))
                logger.error(err)
                sys.stdout.write(time.strftime('%d-%H:%M:%S')
                                 + ': ' + err + '\n')
                sys.stdout.flush()
                time.sleep(sleeptime)
        else:
            # All retries failed; keep the local file so a later run can
            # resume, rather than deleting an archive that never uploaded.
            logger.error('Gave up on %s after %i retries'
                         % (sendfile, max_retries))
            sys.exit(1)

        size = os.path.getsize(sendfile)
        os.unlink(sendfile)

        sending_seconds = time.time() - sending_start
        bytespersecond = size / (sending_seconds - encrypt_seconds)

        sys.stdout.write('%s: File sent. Total time %i seconds, '
                         'crypto time %i seconds, transfer speed '
                         '%i bytes/second.\n'
                         % (time.strftime('%d-%H:%M:%S'), sending_seconds,
                            encrypt_seconds, bytespersecond))
        sys.stdout.flush()

    # finalize the backup with a marker object recording start/end times
    key = MyKey(bucket)
    key.key = '%sCOMPLETE' % os.path.basename(filehead)
    key.set_contents_from_string('%s %s "%s"'
                                 % (beginning, time.time(), mesg))
    key.close()
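# Example BackupPC hookup (a sketch; the substitution variables follow the
# stock $Conf{ArchiveClientCmd} and may differ by BackupPC version):
#
#   $Conf{ArchiveClientCmd} = '/usr/local/bin/BackupPC_archiveHost'
#       . ' $tarCreatePath $splitpath $parpath $host $backupnumber'
#       . ' $compression $compext $splitsize $archiveloc $parfile *';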