From dca2dbdad891c51419b722d6cb6c8ba30410910e Mon Sep 17 00:00:00 2001
From: flifloo
Date: Fri, 8 Apr 2022 18:20:55 +0200
Subject: [PATCH] Rewrite a lot of BackupPC_archiveHost_s3 to b2

---
 .gitignore                 |   4 +-
 BackupPC_archiveHost_b2.py | 616 +++++++++++++++++++++++++++++++++++++
 BackupPC_archiveHost_s3    | 407 ------------------------
 Dockerfile                 |   3 +
 README.markdown            |  17 +-
 requirements.txt           |   3 +
 secrets.py.orig            |   7 +-
 7 files changed, 633 insertions(+), 424 deletions(-)
 create mode 100755 BackupPC_archiveHost_b2.py
 delete mode 100755 BackupPC_archiveHost_s3
 create mode 100644 Dockerfile
 create mode 100644 requirements.txt

diff --git a/.gitignore b/.gitignore
index a6e9ced..e9d6025 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
-secrets.py
+/.idea/
+/secrets.py
 *.pyc
+/.venv/
diff --git a/BackupPC_archiveHost_b2.py b/BackupPC_archiveHost_b2.py
new file mode 100755
index 0000000..fb6e154
--- /dev/null
+++ b/BackupPC_archiveHost_b2.py
@@ -0,0 +1,616 @@
+#!/usr/bin/python3
+# A BackupPC script to archive a host's files to Backblaze B2.
+#
+# Point $Conf{ArchiveClientCmd} at me.
+# See requirements.txt for dependencies.
+#
+# Usage: BackupPC_archiveHost_b2.py tarCreatePath splitPath parPath host bkupNum \
+#        compPath fileExt splitSize outLoc parFile share
+#
+# Create secrets.py such that it has:
+# access_key = "Backblaze B2 application key id"
+# shared_key = "Backblaze B2 application key"
+# gpg_symmetric_key = "gpg symmetric key -- make it good, but do not lose it"
+#
+# Copyright (c) 2009-2011 Ryan S. Tucker
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
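+#
+# Example $Conf{ArchiveClientCmd} entry for config.pl (the install path below is
+# only an illustration, and the substitution variable names follow BackupPC's
+# stock ArchiveClientCmd -- double-check both against your own config.pl):
+#
+#   $Conf{ArchiveClientCmd} = '/usr/local/bin/BackupPC_archiveHost_b2.py'
+#       . ' $tarCreatePath $splitpath $parpath $host $backupnumber'
+#       . ' $compression $compext $splitsize $archiveloc $parfile *';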
+
+from argparse import ArgumentParser
+from hashlib import md5, sha1
+from logging import getLogger, Formatter, StreamHandler, DEBUG, WARNING, INFO
+from logging.handlers import RotatingFileHandler
+from multiprocessing import Process, Queue, cpu_count
+from os import access, X_OK, nice
+from pathlib import Path
+from re import search, escape
+from subprocess import Popen
+from sys import stdout
+from time import time, sleep
+from typing import Dict
+
+from b2sdk.v2 import InMemoryAccountInfo, B2Api, Bucket, AbstractProgressListener
+from b2sdk.v2.exception import B2Error, NonExistentBucket
+from gnupg import GPG
+from math import ceil
+from progress.bar import ChargingBar
+
+import secrets
+
+
+logger = getLogger(__name__)
+logger_formatter = Formatter("%(asctime)s: %(levelname)s: %(message)s")
+file_handler = RotatingFileHandler("/tmp/archive_run.log")
+file_handler.setFormatter(logger_formatter)
+logger.addHandler(file_handler)
+console_handler = StreamHandler(stdout)
+console_handler.setFormatter(logger_formatter)
+logger.addHandler(console_handler)
+logger.setLevel(INFO)
+
+b2_info = InMemoryAccountInfo()
+b2_api = B2Api(b2_info)
+
+Path("/tmp/gnupg").mkdir(parents=True, exist_ok=True)
+gpg = GPG(gpgbinary="/usr/bin/gpg2", gnupghome="/tmp/gnupg")
+
+
+class ProgressListener(AbstractProgressListener):
+    """
+    Progress listener for B2 transfers, rendered as a progress bar
+    """
+    total_byte_count = 0
+    total_send_bytes = 0
+    bar = None
+    enable = True
+
+    def __init__(self, filename: str):
+        """
+        :param filename: The name of the file whose progress is tracked
+        """
+        super().__init__()
+        self.filename = filename
+        self.bar = ChargingBar(filename)
+
+    def set_total_bytes(self, total_byte_count: int):
+        """
+        Set the total byte count
+        :param total_byte_count: The total count of bytes
+        """
+        self.total_byte_count = total_byte_count
+        self.bar.max = self.total_byte_count
+        self.enable = True
+
+    def bytes_completed(self, byte_count: int):
+        """
+        Report transfer progress
+        :param byte_count: The number of bytes transferred since the last update
+        """
+        if not self.enable:
+            return
+
+        self.total_send_bytes += byte_count
+        self.bar.next(byte_count)
+        if self.total_send_bytes >= self.total_byte_count:
+            self.bar.finish()
+            self.enable = False
+
+
+def sizeof_fmt(num: int, suffix="B") -> str:
+    """
+    Format a size as a human-readable string
+    :param num: The size
+    :param suffix: The input size unit
+    :return: The size as a human-readable string
+    """
+    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
+        if abs(num) < 1024.0:
+            return f"{num:3.1f}{unit}{suffix}"
+        num /= 1024.0
+    return f"{num:.1f}Yi{suffix}"
+
+
+def exec_path(path: str) -> Path:
+    """
+    Check if the given string is a valid path and an executable
+    :param path: The executable path
+    :return: The path object if valid
+    :raise ValueError: If the path does not exist or is not executable
+    """
+    path = Path(path)
+    if not path.exists() or not is_exe(path):
+        raise ValueError("Path not executable")
+    else:
+        return path
+
+
+def dir_path(path: str) -> Path:
+    """
+    Check if the given string is a valid path and a directory
+    :param path: The directory path
+    :return: The path object if valid
+    :raise ValueError: If the path does not exist or is not a directory
+    """
+    path = Path(path)
+    if not path.exists() or not path.is_dir():
+        raise ValueError("Path not a directory")
+    else:
+        return path
+
+
+def positive_int(i: str) -> int:
+    """
+    Parse a string as a strictly positive integer
+    :param i: The string to parse
+    :return: The parsed integer
+    :raise ValueError: If the value is not greater than 0
+    """
+    i = int(i)
+    if i <= 0:
+        raise ValueError("Should be greater than 0")
+    return i
+
+
+def is_exe(fpath: Path) -> bool:
+    """
+    Check if a file is executable
+    :param fpath: The path to the file to check
+    :return: True if the file is executable, False otherwise
+    """
+    return fpath.exists() and access(fpath, X_OK)
+
+
+def encrypt_file(filename: Path, key: str, compress: Path = Path("/bin/cat")) -> Path:
+    """
+    Encrypt a file with a key and compress it, returning the path to the new file
+    :param filename: The path to the file to encrypt
+    :param key: The key for encryption
+    :param compress: The compression binary used
+    :return: The path to the encrypted file
+    """
+    compress_map = {"cat": "none", "gzip": "ZLIB", "bzip2": "BZIP2"}
+    encrypt_output = Path(f"{filename}.gpg")
+
+    if compress.name in compress_map.keys():
+        compress_algo = compress_map[compress.name]
+    else:
+        compress_algo = "none"
+
+    logger.debug(f"encrypt_file: encrypting {filename} (compression: {compress_algo})")
+    with open(filename, "rb") as f:
+        gpg.encrypt_file(file=f, output=encrypt_output, recipients=None, symmetric=True, passphrase=key,
+                         armor=False, extra_args=["--compress-algo", compress_algo])
+
+    if encrypt_output.exists() and encrypt_output.stat().st_size:
+        old_filesize = filename.stat().st_size
+        new_filesize = encrypt_output.stat().st_size
+
+        compressed = ((old_filesize - new_filesize) / float(old_filesize)) * 100
+        logger.debug(f"encrypt_file: {filename} {'shrunk' if old_filesize > new_filesize else 'grew'} by "
+                     f"{compressed:.2f}% ({sizeof_fmt(old_filesize)} -> {sizeof_fmt(new_filesize)})")
+        return encrypt_output
+    else:
+        raise RuntimeError(f"output file does not exist: {encrypt_output}")
+
+
+def open_b2(access_key: str, shared_key: str, host: str) -> Bucket:
+    """
+    Get the B2 bucket for a host
+    :param access_key: The application key id
+    :param shared_key: The application key
+    :param host: The host name to generate the bucket name
+    :return: The host B2 bucket
+    """
+    b2_api.authorize_account("production", access_key, shared_key)
+    my_bucket_name = f"{access_key}-bckpc-{host.replace('.', '-')}".lower()
+    lifecycle_rules = [{
+        'daysFromHidingToDeleting': 1,
+        'daysFromUploadingToHiding': None,
+        'fileNamePrefix': ''
+    }]
+
+    try:
+        bucket = b2_api.get_bucket_by_name(my_bucket_name)
+    except NonExistentBucket:
+        logger.info(f"open_b2: creating new bucket {my_bucket_name}")
+        # noinspection PyTypeChecker
+        bucket = b2_api.create_bucket(my_bucket_name, "allPrivate", lifecycle_rules=lifecycle_rules)
+    return bucket
+
+
+def get_file_hash(file: Path, algo: str = "md5") -> str:
+    """
+    Get the hash of a file
+    :param file: The path to the file
+    :param algo: The hash algorithm (sha1/md5)
+    :return: The hash string
+    """
+    if algo.lower() == "md5":
+        file_hash = md5()
+    elif algo.lower() == "sha1":
+        file_hash = sha1()
+    else:
+        raise ValueError("Invalid algo")
+
+    with open(file, "rb") as fp:
+        while True:
+            data = fp.read(65536)
+            if not data:
+                break
+            file_hash.update(data)
+
+    return file_hash.hexdigest()
+
+
+def verify_file(bucket: Bucket, filename: Path, base_filename: str) -> bool:
+    """
+    Check if a local file is the same as a file in the bucket
+    :param bucket: The target bucket
+    :param filename: The path to the file to check
+    :param base_filename: The filename inside the bucket
+    :return: True if the file size and hash match, False otherwise
+    """
+    file_stat = filename.stat()
+    info = next(bucket.list_file_versions(base_filename), None)
+
+    if not info or info.size != file_stat.st_size:
+        return False
+    elif info.content_md5 and info.content_md5 != "none":
+        remote_hash = info.content_md5
+        algo = "md5"
+    elif info.content_sha1 and info.content_sha1 != "none":
+        remote_hash = info.content_sha1
+        algo = "sha1"
+    else:
+        logger.error(f"verify_file: {base_filename}: no remote hash")
+        return False
+
+    local_hash = get_file_hash(filename, algo)
+
+    logger.debug(f'verify_file: {base_filename}: local {algo} "{local_hash}", remote {remote_hash}')
+    return local_hash == remote_hash
+
+
+def send_file(bucket: Bucket, filename: Path):
+    """
+    Send a file to a bucket
+    :param bucket: The target bucket
+    :param filename: The path to the file to upload
+    """
+    base_filename = filename.name
+
+    versions = list(bucket.list_file_versions(base_filename))
+    if versions:
+        if verify_file(bucket, filename, base_filename):
+            logger.warning(f"send_file: {base_filename} already exists and is identical, not overwriting")
+            return
+        else:
+            logger.warning(f"send_file: {base_filename} already exists on B2, overwriting")
+
+        for v in versions:
+            v.delete()
+
+    file_hash = get_file_hash(filename, "sha1")
+
+    bucket.upload_local_file(str(filename), base_filename, progress_listener=ProgressListener(base_filename),
+                             sha1_sum=file_hash)
+
+
+def encryption_worker(queues: Dict[str, Queue]):
+    """
+    Encrypt files from the gpg queue, passing results to the send queue
+    :param queues: Dictionary of queues
+    """
+    start_time = time()
+    counter = 0
+    for filename, gpg_key, comp_path in iter(queues["gpg_queue"].get, "STOP"):
+        counter += 1
+        crypt_start_time = time()
+        logger.info(f"encryption_worker: encrypting {filename}")
+        result = encrypt_file(filename, gpg_key, comp_path)
+        queues["send_queue"].put(result)
+        queues["unlink_queue"].put(filename)
+        logger.debug(f"encryption_worker: encrypted {filename} in {time() - crypt_start_time:.2f} seconds")
+    logger.debug(f"encryption_worker: queue is empty, terminating after {counter} items in {time() - start_time:.2f} "
+                 f"seconds")
+    sleep(5)  # settle
+
+
+def sending_worker(queues: Dict[str, Queue], access_key: str, shared_key: str, host: str):
+    """
+    Send files from the send queue to B2
+    :param queues: Dictionary of queues
+    :param access_key: B2 access key
+    :param shared_key: B2 shared key
+    :param host: The host name
+    """
+    start_time = time()
+    counter = 0
+    for filename in iter(queues["send_queue"].get, "STOP"):
+        sending_start = time()
+        counter += 1
+        retry_count = 0
+        max_retries = 10
+        done = False
+
+        while retry_count <= max_retries and not done:
+            try:
+                logger.info(f"sending_worker: sending {filename}")
+                bucket = open_b2(access_key, shared_key, host)
+                send_file(bucket, filename)
+                done = True
+            except B2Error as e:
+                retry_count += 1
+                sleep_time = 2 ** retry_count
+                logger.error(f"sending_worker: exception {e}, retrying in {sleep_time} seconds ({retry_count}/"
+                             f"{max_retries})")
+                logger.exception(e)
+                sleep(sleep_time)
+
+        if not done:
+            # trip out
+            logger.error(f"sending_worker: could not upload {filename} in {retry_count} retries")
+        else:
+            size = filename.stat().st_size
+            sending_seconds = time() - sending_start
+            bytes_per_second = size / sending_seconds
+            logger.debug(f"sending_worker: sent {filename} in {sending_seconds} seconds at "
+                         f"{sizeof_fmt(bytes_per_second)}/second.")
+            queues["unlink_queue"].put(filename)
+
+    logger.debug(f"sending_worker: queue is empty, terminating after {counter} items in {time() - start_time} seconds")
+    sleep(5)  # settle
+
+
+def unlink_worker(queues: Dict[str, Queue]):
+    """
+    Delete files from the unlink queue
+    :param queues: Dictionary of queues
+    """
+    start_time = time()
+    counter = 0
+    for filename in iter(queues["unlink_queue"].get, "STOP"):
+        counter += 1
+        logger.debug(f"unlink_worker: deleting {filename}")
+        try:
+            filename.unlink()
+        except FileNotFoundError as e:
+            logger.warning(f"unlink_worker: caught exception: {e}")
+
+    logger.debug(f"unlink_worker: queue is empty, terminating after {counter} items in {time() - start_time} seconds")
+    sleep(5)  # settle
+
+
+def workers(queues: Dict[str, Queue], host: str, out_loc: Path, bkup_num: int,
+            beginning: float, msg: str, process_count: int = None):
+    """
+    Manage workers for archiving
+    :param queues: Dictionary of queues
+    :param host: Host name
+    :param out_loc: The temporary location path
+    :param bkup_num: The backup number to archive
+    :param beginning: The start time, written to the COMPLETE file
+    :param msg: Log message written to the COMPLETE file
+    :param process_count: Number of parallel workers
+    """
+    # Start some handlers, wait until everything is done
+    if not process_count:
+        try:
+            process_count = cpu_count()
+        except NotImplementedError:
+            process_count = 1
+
+    encryption_procs = []
+    for ep in range(process_count):
+        p = Process(name=f"encryption_worker_{ep}", target=encryption_worker,
+                    args=(queues,))
+        queues["gpg_queue"].put("STOP")
+        p.start()
+        encryption_procs.append(p)
+
+    send_procs = []
+    for sp in range(ceil(process_count/2)):
+        p = Process(name=f"send_worker_{sp}", target=sending_worker,
+                    args=(queues, secrets.access_key, secrets.shared_key, host))
+        p.start()
+        send_procs.append(p)
+
+    unlink_procs = []
+    for up in range(ceil(process_count/4)):
+        p = Process(name=f"unlink_worker_{up}", target=unlink_worker, args=(queues,))
+        p.start()
+        unlink_procs.append(p)
+
+    send_queue_closed = False
+    unlink_queue_closed = False
+
+    for ep in encryption_procs:
+        # wait for each process to terminate in turn
+        ep.join()
+        logger.debug(f"workers: process terminated: {ep.name}")
+
+        if not next(filter(lambda e: e.is_alive(), encryption_procs), None) and not send_queue_closed:
+            # crypto is done, close up the send queue
+            logger.debug("workers: queuing final file")
+            final_file = out_loc / f"{host}.{bkup_num}.tar.COMPLETE"
+            with open(final_file, "w") as fp:
+                fp.write(f'{beginning} {time()} "{msg}"')
+            queues["send_queue"].put(final_file)
+
+            logger.debug("workers: queuing stop sentinel for send_queue")
+            for _ in send_procs:
+                queues["send_queue"].put("STOP")
+            send_queue_closed = True
+
+    if send_queue_closed:
+        for sp in send_procs:
+            sp.join()
+            logger.debug(f"workers: process terminated: {sp.name}")
+
+            if not next(filter(lambda s: s.is_alive(), send_procs), None) and not unlink_queue_closed:
+                # sending is done, close up the unlink queue
+                logger.debug("workers: queuing stop sentinel for unlink_queue")
+                for _ in unlink_procs:
+                    queues["unlink_queue"].put("STOP")
+                unlink_queue_closed = True
+
+    if unlink_queue_closed:
+        for up in unlink_procs:
+            up.join()
+            logger.debug(f"workers: process terminated: {up.name}")
+
+    for qname, q in queues.items():
+        sleep(5)  # settle
+        if not q.empty():
+            logger.critical(f"workers: queue {qname} not empty!")
+            raise Exception(f"queue not empty: {qname}")
+        else:
+            logger.debug(f"workers: queue {qname} is empty")
+
+
+def archive(tar_create: Path, split_path: Path, par_path: Path, host: str, bkup_num: int, comp_path: Path,
+            file_ext: str, split_size: int, out_loc: Path, par_file: int, share: str, jobs: int = None):
+    """
+    Archive a host to B2 storage
+    :param tar_create: The path to the tar binary
+    :param split_path: The path to the split binary
+    :param par_path: The path to the parity binary (not used)
+    :param host: The host name
+    :param bkup_num: The backup number to archive
+    :param comp_path: The compression binary
+    :param file_ext: The extension assigned to the compression type (not used)
+    :param split_size: The archive split size
+    :param out_loc: The temporary location path
+    :param par_file: The amount of parity data to create (not used)
+    :param share: Backup share to archive
+    :param jobs: Number of processes to run in parallel (defaults to the CPU count)
+    """
+    beginning = time()
+
+    # Create queues for workers
+    queues = {
+        "gpg_queue": Queue(),
+        "send_queue": Queue(),
+        "unlink_queue": Queue(),
+    }
+
+    g = list(out_loc.glob(f"{host}.*.tar.*"))
+    file_glob = ""
+    # Is there already evidence of this having been done before?
+    if g:
+        logger.warning("main: finishing previous incomplete run")
+        some_file = g[0].name
+        r = search(rf"{escape(host)}\.-?([0-9]+)\.tar\..*", some_file)
+        bkup_num = int(r.groups()[0])
+
+        file_glob = ".*"
+
+        msg = f"Continuing upload for host {host}, backup #{bkup_num}"
+        if split_size > 0:
+            msg += f", split into {split_size} byte chunks"
+        if secrets.gpg_symmetric_key:
+            msg += ", encrypted with secret key"
+        logger.info(f"main: {msg}")
+    else:
+        msg = f"Writing archive for host {host}, backup #{bkup_num}"
+
+    tar_cmd = [str(tar_create), "-t"]
+    tar_cmd.extend(["-h", host])
+    tar_cmd.extend(["-n", str(bkup_num)])
+    tar_cmd.extend(["-s", share])
+    tar_cmd.extend(["."])
+
+    split_cmd = None
+    outfile = out_loc / f"{host}.{bkup_num}.tar"
+
+    if split_size > 0:
+        file_glob = ".*"
+        split_cmd = [str(split_path), "-b", str(split_size), "-", str(out_loc / f"{host}.{bkup_num}.tar.")]
+        msg += f", split into {split_size} byte chunks"
+
+    if secrets.gpg_symmetric_key:
+        msg += ", encrypted with secret key"
+
+    logger.info(f"main: {msg}")
+    logger.debug(f"main: executing tar_cmd: {' '.join(tar_cmd)} > {outfile}")
+
+    tar_fp = open(outfile, "wb")
+    proc = Popen(tar_cmd, preexec_fn=lambda: nice(10), stdout=tar_fp)
+    proc.communicate()
+    tar_fp.close()
+
+    if split_cmd:
+        logger.debug(f"main: executing split_cmd: {' '.join(split_cmd)}")
+        tar_fp = open(outfile, "rb")
+        proc = Popen(split_cmd, preexec_fn=lambda: nice(10), stdin=tar_fp)
+        proc.communicate()
+        tar_fp.close()
+        queues["unlink_queue"].put(outfile)
+
+    file_glob = list(out_loc.glob(f"{host}.{bkup_num}.tar{file_glob}"))
+
+    logger.info(f"main: dumped {len(file_glob)} files from {host} #{bkup_num}")
+
+    # Pre-run to check for artifacts
+    for i in file_glob:
+        gpg_file = i.with_suffix(i.suffix + ".gpg")
+        if not i.name.endswith(".gpg") and gpg_file.exists():
+            logger.warning(f"main: orphaned GPG file being deleted: {gpg_file}")
+            gpg_file.unlink()
+
+    # Run again to send files to the relevant queue
+    for i in sorted(file_glob):
+        if (secrets.gpg_symmetric_key
+                and not i.name.endswith(".gpg")
+                and not i.name.endswith(".COMPLETE")):
+            # A tar file, unencrypted, needs to be encrypted.
+            logger.debug(f"main: adding {i} to gpg_queue")
+            queues["gpg_queue"].put([i, secrets.gpg_symmetric_key, comp_path])
+        else:
+            # either encryption is off, or the file is already encrypted
+            logger.debug(f"main: adding {i} to send_queue")
+            queues["send_queue"].put(i)
+
+    workers(queues, host, out_loc, bkup_num, beginning, msg, jobs)
+    logger.info(f"main: completed run after {time() - beginning} seconds")
+
+
+def main():
+    # Read in arguments, verify that they match the BackupPC standard exactly
+    parser = ArgumentParser(description="Archive a BackupPC host into B2")
+    parser.add_argument("tarCreatePath", type=exec_path, help="Path to the tar binary")
+    parser.add_argument("splitPath", type=exec_path, help="Path to the split binary")
+    parser.add_argument("parPath", type=exec_path, help="The path to the parity binary (not used)")
+    parser.add_argument("host", type=str, help="Host name to back up")
+    parser.add_argument("bkupNum", type=int, help="Backup number to archive")
+    parser.add_argument("compPath", type=exec_path, help="Compression binary")
+    parser.add_argument("fileExt", type=str, help="The extension assigned to the compression type (not used)")
+    parser.add_argument("splitSize", type=int, help="Archive split size")
+    parser.add_argument("outLoc", type=dir_path, help="Temporary location path")
+    parser.add_argument("parFile", type=int, help="The amount of parity data to create (not used)")
+    parser.add_argument("share", type=str, help="Backup share to archive")
+    parser.add_argument("-v", "--verbose", action="store_const", dest="loglevel", const=INFO, default=WARNING,
+                        help="Set log to info level")
+    parser.add_argument("-d", "--debug", action="store_const", dest="loglevel", const=DEBUG,
+                        help="Set log to debug level")
+    parser.add_argument("-j", "--jobs", type=positive_int, dest="jobs", default=None,
+                        help="Number of processes to run in parallel, defaults to the number of cores in the system")
+    args = parser.parse_args()
+
+    logger.setLevel(args.loglevel)
+
+    archive(args.tarCreatePath, args.splitPath, args.parPath, args.host, args.bkupNum, args.compPath, args.fileExt,
+            args.splitSize, args.outLoc, args.parFile, args.share, args.jobs)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/BackupPC_archiveHost_s3 b/BackupPC_archiveHost_s3 deleted file mode 100755 index ce91aeb..0000000 --- a/BackupPC_archiveHost_s3 +++ /dev/null @@ -1,407 +0,0 @@ -#!/usr/bin/python -W ignore::DeprecationWarning -# A BackupPC script to archive a host's files to Amazon S3. -# -# Point $Conf{ArchiveClientCmd} at me. -# Requires python-boto -# -# Usage: BackupPC_archiveHost tarCreatePath splitPath parPath host bkupNum \ -# compPath fileExt splitSize outLoc parFile share -# -# Create secrets.py such that it has: -# accesskey = 'amazon aws access key' -# sharedkey = 'amazon aws shared key' -# gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it' -# -# Copyright (c) 2009-2011 Ryan S. Tucker -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. 
-# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -import glob -import hashlib -import os -import socket -import sys -import time - -from multiprocessing import Process, Queue, cpu_count -from subprocess import * - -from boto.s3.connection import S3Connection -from boto.s3.key import Key -import boto.exception - -import logging -import logging.handlers - -import secrets - -logger = logging.getLogger(__name__) - -sysloghandler = logging.handlers.SysLogHandler('/dev/log', - facility=logging.handlers.SysLogHandler.LOG_DAEMON) -syslogformatter = logging.Formatter('%(filename)s: %(levelname)s: %(message)s') -sysloghandler.setFormatter(syslogformatter) -logger.addHandler(sysloghandler) - -consolehandler = logging.StreamHandler(sys.stdout) -consoleformatter = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') -consolehandler.setFormatter(consoleformatter) -logger.addHandler(consolehandler) - -logger.setLevel(logging.DEBUG) - -class VerifyError(Exception): - pass - -def is_exe(fpath): - return os.path.exists(fpath) and os.access(fpath, os.X_OK) - -def encrypt_file(filename, key, compress='/bin/cat'): - compressmap = {'cat': 'none', 'gzip': 'ZLIB', 'bzip2': 'BZIP2'} - if os.path.basename(compress) in compressmap.keys(): - compress_algo = compressmap[os.path.basename(compress)] - else: - compress_algo = 'none' - - cmd = ['/usr/bin/gpg', '--batch', '--no-tty'] - cmd.extend(['--compress-algo', compress_algo]) - cmd.extend(['--output', '%s.gpg' % filename]) - cmd.extend(['--passphrase-fd', '0']) - cmd.extend(['--symmetric', filename]) - - if is_exe(cmd[0]): - logger.debug('encrypt_file: encrypting %s (compression: %s)' % (filename, compress_algo)) - else: - raise RuntimeError('%s is not an executable file!' 
% cmd[0]) - - proc = Popen(cmd, preexec_fn=lambda : os.nice(10), stdin=PIPE, stdout=PIPE) - proc.communicate(key) - - if os.path.exists(filename + '.gpg'): - oldfilesize = os.path.getsize(filename) - newfilesize = os.path.getsize(filename + '.gpg') - compressed = ((oldfilesize - newfilesize) / float(oldfilesize)) * 100 - logger.debug('encrypt_file: %s %s by %.2f%% (%i -> %i bytes)' % (filename, 'shrunk' if oldfilesize>newfilesize else 'grew', compressed, oldfilesize, newfilesize)) - return filename + '.gpg' - else: - raise RuntimeError('output file does not exist: %s.gpg' % filename) - -def open_s3(accesskey, sharedkey, host): - conn = S3Connection(accesskey, sharedkey, is_secure=True) - mybucketname = (accesskey + '-bkup-' + host).lower() - try: - bucket = conn.get_bucket(mybucketname) - except boto.exception.S3ResponseError: - logger.info('open_s3: creating new bucket %s' % mybucketname) - bucket = conn.create_bucket(mybucketname) - bucket.set_acl('private') - return bucket - -def handle_progress(transmitted, pending): - logger.debug("send_file: %i of %i bytes transmitted (%.2f%%)", transmitted, pending, (transmitted/float(pending))*100) - -def verify_file(bucket, filename): - "Returns True if the file size and md5sum match, False otherwise" - basefilename = os.path.basename(filename) - key = bucket.get_key(basefilename) - stat = os.stat(filename) - if key: - if key.size == stat[6]: - fp = open(filename) - local_md5 = hashlib.md5(fp.read()) - fp.close() - logger.debug('verify_file: %s: local md5 "%s", etag %s', filename, local_md5.hexdigest(), key.etag) - if '"%s"' % local_md5.hexdigest() == key.etag: - return True - return False - -def send_file(bucket, filename): - basefilename = os.path.basename(filename) - k = Key(bucket) - k.key = basefilename - - if k.exists(): - if verify_file(bucket, filename): - logger.warning("send_file: %s already exists and is identical, not overwriting", basefilename) - return k - logger.warning("send_file: %s already exists on S3, overwriting", basefilename) - - k.set_contents_from_filename(filename, cb=handle_progress, reduced_redundancy=True) - - logger.debug("send_file: %s sent, verifying fidelity", filename) - if not verify_file(bucket, filename): - raise VerifyError("verify failed") - return k - -def encryption_worker(in_q, out_q, unlink_q): - "Encrypts things from the in_q, puts them in the out_q" - start_time = time.time() - counter = 0 - for filename, gpgkey, comppath in iter(in_q.get, 'STOP'): - counter += 1 - cryptstart_time = time.time() - logger.info("encryption_worker: encrypting %s", filename) - result = encrypt_file(filename, gpgkey, comppath) - out_q.put(result) - unlink_q.put(filename) - logger.debug("encryption_worker: encrypted %s in %i seconds", filename, time.time()-cryptstart_time) - logger.debug("encryption_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time()-start_time) - time.sleep(5) # settle - -def sending_worker(in_q, out_q, accesskey, sharedkey, host): - "Sends things from the in_q using the send_file method" - start_time = time.time() - counter = 0 - for filename in iter(in_q.get, 'STOP'): - sending_start = time.time() - counter += 1 - retry_count = 0 - max_retries = 10 - done = False - - while retry_count <= max_retries and not done: - try: - logger.info("sending_worker: sending %s", filename) - bucket = open_s3(accesskey, sharedkey, host) - key = send_file(bucket, filename) - key.set_acl('private') - key.close() - done = True - except (boto.exception.S3ResponseError, socket.error, 
VerifyError), e: - retry_count += 1 - sleeptime = 2**retry_count - logger.error('sending_worker: exception %s, retrying in %i seconds (%i/%i)', e, sleeptime, retry_count, max_retries) - time.sleep(sleeptime) - - if not done: - # trip out - logger.error('sending_worker: could not upload %s in %i retries', filename, retry_count) - else: - size = os.path.getsize(filename) - sending_seconds = time.time() - sending_start - bytespersecond = size / sending_seconds - logger.debug("sending_worker: sent %s in %i seconds at %i bytes/second.", filename, sending_seconds, bytespersecond) - out_q.put(filename) - - logger.debug("sending_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time) - time.sleep(5) # settle - -def unlink_worker(in_q): - start_time = time.time() - counter = 0 - for filename in iter(in_q.get, 'STOP'): - counter += 1 - logger.debug("unlink_worker: deleting %s", filename) - try: - os.unlink(filename) - except OSError, e: - logger.warning("unlink_worker: caught exception: %s", e) - - logger.debug("unlink_worker: queue is empty, terminating after %i items in %i seconds", counter, time.time() - start_time) - time.sleep(5) # settle - -if __name__ == '__main__': - # Read in arguments, verify that they match the BackupPC standard exactly - if len(sys.argv) != 12: - sys.stderr.write("Usage: %s tarCreatePath splitPath parPath host bkupNum compPath fileExt splitSize outLoc parFile share\n" % sys.argv[0]) - sys.exit(1) - else: - tarCreate = sys.argv[1] - splitPath = sys.argv[2] - parPath = sys.argv[3] - host = sys.argv[4] - bkupNum = int(sys.argv[5]) - compPath = sys.argv[6] - fileExt = sys.argv[7] - splitSize = int(sys.argv[8]) - outLoc = sys.argv[9] - parfile = sys.argv[10] - share = sys.argv[11] - - for i in [tarCreate, compPath, splitPath, parPath]: - if i is not '' and not is_exe(i): - sys.stderr.write('Error: %s is not an executable program\n' % i) - sys.exit(1) - - beginning = time.time() - - # Create queues for workers - gpg_queue = Queue() - send_queue = Queue() - unlink_queue = Queue() - - queues = { - 'gpg_queue': gpg_queue, - 'send_queue': send_queue, - 'unlink_queue': unlink_queue, - } - - # Is there already evidence of this having been done before? - if glob.glob('%s/%s.*.tar.*' % (outLoc, host)): - logger.warning('main: finishing previous incomplete run') - somefile = os.path.basename(glob.glob('%s/%s.*.tar.*' % (outLoc, host))[0]) - keyparts = somefile.split('.') - encrypted = split = tarred = final = False - if keyparts[-1] == 'gpg': - keyparts.pop() - if keyparts[-1] != 'tar' and len(keyparts[-1]) is 2: - keyparts.pop() - if keyparts[-1] == 'tar': - keyparts.pop() - - bkupNum = int(keyparts.pop()) - - filehead = '%s/%s.%i.tar.' % (outLoc, host, bkupNum) - fileglob = filehead + '*' - - mesg = "Continuing upload for host %s, backup #%i" % (host, bkupNum) - if splitSize > 0 and is_exe(splitPath): - mesg += ', split into %i byte chunks' % splitSize - if secrets.gpgsymmetrickey: - mesg += ', encrypted with secret key' - logger.info("main: %s", mesg) - else: - mesg = "Writing archive for host %s, backup #%i" % (host, bkupNum) - - tarcmd = [tarCreate, '-t'] - tarcmd.extend(['-h', host]) - tarcmd.extend(['-n', str(bkupNum)]) - tarcmd.extend(['-s', share]) - tarcmd.extend(['.']) - - splitcmd = None - outfile = '%s/%s.%i.tar' % (outLoc, host, bkupNum) - - if splitSize > 0 and is_exe(splitPath): - filehead = outfile + '.' 
- fileglob = filehead + '*' - splitcmd = [splitPath, '-b', str(splitSize), '-', filehead] - mesg += ', split into %i byte chunks' % splitSize - else: - fileglob = outfile - filehead = fileglob + '.' - - if secrets.gpgsymmetrickey: - mesg += ', encrypted with secret key' - - logger.info("main: %s", mesg) - logger.debug("main: executing tarcmd: %s > %s", ' '.join(tarcmd), outfile) - - tarfp = open(outfile, 'wb') - proc = Popen(tarcmd, preexec_fn=lambda : os.nice(10), stdout=tarfp) - proc.communicate() - tarfp.close() - - if splitcmd: - logger.debug("main: executing splitcmd: %s", ' '.join(splitcmd)) - tarfp = open(outfile, 'rb') - proc = Popen(splitcmd, preexec_fn=lambda : os.nice(10), stdin=tarfp) - proc.communicate() - tarfp.close() - unlink_queue.put(outfile) - - logger.info("main: dumped %i files from %s #%i" % (len(glob.glob(fileglob)), host, bkupNum)) - - # Pre-run to check for artifacts - for i in glob.glob(fileglob): - if not i.endswith('.gpg') and os.path.exists(i + '.gpg'): - logger.warning("main: orphaned GPG file being deleted: %s", i + '.gpg') - os.unlink(i + '.gpg') - - # Run again to send files to the relevant queue - for i in sorted(glob.glob(fileglob)): - if (secrets.gpgsymmetrickey - and not i.endswith('.gpg') - and not i.endswith('.COMPLETE')): - # A tar file, unencrypted, needs encrypted. - logger.debug("main: adding %s to gpg_queue", i) - gpg_queue.put([i, secrets.gpgsymmetrickey, compPath]) - else: - # either encryption is off, or the file is already encrypted - logger.debug("main: adding %s to send_queue", i) - send_queue.put(i) - - # Start some handlers, wait until everything is done - try: - process_count = cpu_count() - except NotImplementedError: - process_count = 1 - - procs = [] - - for i in range(process_count): - p = Process(name="encryption_worker_%i" % i, target=encryption_worker, args=(gpg_queue, send_queue, unlink_queue)) - p.start() - procs.append(p) - - send_p = Process(name="send_worker", target=sending_worker, args=(send_queue, unlink_queue, secrets.accesskey, secrets.sharedkey, host)) - send_p.start() - procs.append(send_p) - - unlink_p = Process(name="unlink_worker", target=unlink_worker, args=(unlink_queue,)) - unlink_p.start() - procs.append(unlink_p) - - send_queue_closed = False - unlink_queue_closed = False - - # Put STOP command(s) at the end of the GPG queue. 
- gpg_queue_closed = True - for i in range(process_count): - gpg_queue.put('STOP') - - for i in procs: - # wait for each process to terminate in turn - i.join() - logger.debug("main: process terminated: %s", i.name) - - # count how many crypto processes are still running - crypto_running = 0 - for j in procs: - if j.name.startswith("encryption_worker") and j.is_alive(): - crypto_running += 1 - - if crypto_running == 0 and not send_queue_closed: - # crypto is done, close up the send queue - logger.debug("main: queuing final file") - finalfile = '%sCOMPLETE' % filehead - fp = open(finalfile, 'w') - fp.write('%s %s "%s"' % (beginning, time.time(), mesg)) - fp.close() - send_queue.put(finalfile) - - logger.debug("main: queuing stop sentinel for send_queue") - send_queue.put('STOP') - send_queue_closed = True - - if not send_p.is_alive() and not unlink_queue_closed: - # sending is done, close up the unlink queue - logger.debug("main: queuing stop sentinel for unlink_queue") - unlink_queue.put('STOP') - unlink_queue_closed = True - - for qname, q in queues.items(): - time.sleep(5) # settle - if not q.empty(): - logger.critical("main: queue %s not empty!", qname) - raise Exception("queue not empty: %s" % qname) - else: - logger.debug("main: queue %s is empty", qname) - - logger.info("main: completed run after %i seconds", (time.time() - beginning)) diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..01eec3e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,3 @@ +FROM adferrand/backuppc +RUN apk add py3-pip gnupg +RUN pip3 install b2sdk python-gnupg progress diff --git a/README.markdown b/README.markdown index 00fd174..4b8b0f4 100644 --- a/README.markdown +++ b/README.markdown @@ -1,30 +1,23 @@ -BackupPC_archiveHost_s3 +BackupPC archive host B2 ======================= +> Forked from [rtucker/backuppc-archive-s3](https://github.com/rtucker/backuppc-archive-s3) This is a Python script that acts as an interface between [BackupPC](http://backuppc.sourceforge.net/) and -[Amazon S3](http://aws.amazon.com/s3/). It uses BackupPC's +[Backblaze B2](https://www.backblaze.com/b2/cloud-storage.html). It uses BackupPC's [archive function](http://backuppc.sourceforge.net/faq/BackupPC.html#archive_functions) to extract a tarball and split it into chunks, like the normal archive function. Then, the chunks are encrypted using gpg and transmitted to -S3 using [Boto](https://github.com/boto/boto). +B2 using the [B2 Python SDK](https://github.com/Backblaze/b2-sdk-python). Installation ------------ -I wrote this script some years ago, and can't remember how to get it going. -But, here's going to be my best guess :-) - ### Install the prerequisites -> You will need Python, [Boto](https://github.com/boto/boto), and a +> You will need Python 3.9, the Python dependencies from requirements.txt, and a > working BackupPC installation. -> Note: Python 2.6+ and Boto 2.0+ are required for recent changes, which -> include multiprocessing support. I may make these optional later on, -> but until then, tag stable-20110610 is what was running before I decided -> to mess with things!
- ### Download and install this script > Something like this seems like a good idea: diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..feb33f9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +python-gnupg==0.4.8 +b2sdk==1.14.1 +progress==1.6 diff --git a/secrets.py.orig b/secrets.py.orig index bbcea5b..a23b832 100644 --- a/secrets.py.orig +++ b/secrets.py.orig @@ -1,5 +1,4 @@ # Create secrets.py such that it has: -# accesskey = 'amazon aws access key' -# sharedkey = 'amazon aws shared key' -# gpgsymmetrickey = 'gpg symmetric key -- make it good, but do not lose it' -# speedfile = 'path to a file that has a max upload speed in kbits/sec' +# access_key = "Backblaze B2 application key id" +# shared_key = "Backblaze B2 application key" +# gpg_symmetric_key = "gpg symmetric key -- make it good, but do not lose it"
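+#
+# For example (placeholder values for illustration only -- substitute your own
+# B2 application key pair and a strong GPG passphrase):
+# access_key = "0001234567890ab0000000001"
+# shared_key = "K000AbCdEfGhIjKlMnOpQrStUvWx"
+# gpg_symmetric_key = "a long, random passphrase that you will not lose"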