Add delete option, support of IPv6 and some fix

This commit is contained in:
Ethanell 2020-03-01 17:48:44 +01:00
parent b463117300
commit 97b2e07b81
8 changed files with 312 additions and 22 deletions

View file

@ -37,9 +37,9 @@ def ipv4(prefix: str, ipl: [IPv4Address] = None, macl: [str] = None) -> [(IPv4Ad
mac = None
if ip not in ipl:
ipl.append(ip)
mac = str(RandMac("00:00:00:00:00:00", True))
mac = str(RandMac("00:00:00:00:00:00", True)).replace("'", "")
while mac in macl:
mac = str(RandMac("00:00:00:00:00:00", True))
mac = str(RandMac("00:00:00:00:00:00", True)).replace("'", "")
macl.append(mac)
out.append((ip, mac, subnet_mask, cidr))

23
ip/ipv6.py Normal file
View file

@ -0,0 +1,23 @@
def link_local(mac: str) -> str:
"""
Convert MAC to IPv6 Link-local address
:param mac: MAC address
:type mac: str
:return: IPv6 Link-local address
:rtype: str
"""
# only accept MACs separated by a colon
parts = mac.split(":")
# modify parts to match IPv6 value
parts.insert(3, "ff")
parts.insert(4, "fe")
parts[0] = "%x" % (int(parts[0], 16) ^ 2)
# format output
ipv6_parts = []
for i in range(0, len(parts), 2):
ipv6_parts.append("".join(parts[i:i+2]))
ipv6 = "fe80::%s" % (":".join(ipv6_parts))
return ipv6

27
main.py
View file

@ -4,12 +4,17 @@ from os.path import isfile
from json import dump, load
from whmcs.get_whmcs import get_whmcs_ipv4, get_whmcs_mac
from whmcs.insert_whmcs import insert_whmcs_ipv4
from router.insert_router import insert_router_ipv4
from whmcs.remove_whmcs import remove_whmcs_ipv4
from router.insert_router import insert_router_ipv4, insert_router_ipv6
from router.remove_router import remove_router_ipv4, remove_router_ipv6
from ip.ipv4 import ipv4
if not isfile("config.json"):
with open("config.json", "w") as config:
data = {"database": {"host": "", "user": "", "password": "", "name": ""}, "ssh": {"host": "", "port": 22, "user": "", "key": ""}, "interface": {"default": ""}}
data = {"database": {"host": "", "user": "", "password": "", "name": ""},
"ssh": {"host": "", "port": 22, "user": "", "key": ""},
"interface": {"default": ""},
"IPv6": {"template": ""}}
dump(data, config)
print("Config file created, please fill it")
exit()
@ -25,18 +30,24 @@ SSH_PORT = conf["ssh"]["port"]
SSH_USER = conf["ssh"]["user"]
SSH_KEY = conf["ssh"]["key"]
IPV6_TEMPLATE = conf["IPv6"]["template"]
DEFAULT_INTERFACE = conf["interface"]["default"]
pars = ArgumentParser()
pars.add_argument("interface", help="Interface of IPs")
pars.add_argument("prefix", help="IPs prefix")
pars.add_argument("-d", "--debug", help="Any consequence", action="store_true")
pars.add_argument("-d", "--debug", help="Any consequence and verbose", action="store_true")
pars.add_argument("-v", "--verbose", help="More output", action="store_true")
pars.add_argument("--delete", help="Delete IPv4 and v6", action="store_true")
args = pars.parse_args()
debug = False
if args.debug:
debug = True
print("DEBUG MOD ACTICATED !")
if args.verbose:
print("Verbose enabled")
# DB connection
db = connect(DB_HOST, DB_USER, DB_PASS, DB_NAME)
@ -48,5 +59,11 @@ macl = get_whmcs_mac(db)
out = ipv4(args.prefix, ipl, macl)
# Insert the list
insert_whmcs_ipv4(out, args.interface, db, debug)
insert_router_ipv4(out, args.interface, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug)
if not args.delete:
insert_whmcs_ipv4(out, args.interface, db, debug, args.verbose)
insert_router_ipv4(out, args.interface, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose)
insert_router_ipv6(out, IPV6_TEMPLATE, args.interface, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose)
else:
remove_whmcs_ipv4(out, db, debug, args.verbose)
remove_router_ipv4(out, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose)
remove_router_ipv6(out, IPV6_TEMPLATE, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose)

View file

@ -1,10 +1,16 @@
from subprocess import run, PIPE
from ipaddress import IPv4Address
from ipaddress import IPv4Address, IPv6Address
import re
rip = re.compile(r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}") # IP regex
# IPv4 regex
ripv4 = re.compile(r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}")
ripv4_id = re.compile(r" *([0-9]+).* ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})")
# IPv6 regex
ripv6 = re.compile(r" *[0-9]+ +.+ ([a-f0-9]{1,4}:[a-f0-9]{1,4}:[a-f0-9]{1,4}:[a-f0-9]{1,4})::\/")
ripv6_id = re.compile(r" *([0-9]+) +.+ ([a-f0-9]{1,4}:[a-f0-9]{1,4}:[a-f0-9]{1,4}:[a-f0-9]{1,4})::\/")
# MAC regex
rmac = re.compile(r"(?:[A-F]|[0-9]){1,3}:(?:[A-F]|[0-9]){1,3}:(?:[A-F]|[0-9]){1,3}:(?:[A-F]|[0-9]){1,3}:(?:[A-F]|[0-9])"
r"{1,3}:(?:[A-F]|[0-9]){1,3}") # MAC regex
r"{1,3}:(?:[A-F]|[0-9]){1,3}")
def get_router_ipv4(host: str, port: int, user: str, key: str) -> [IPv4Address]:
@ -25,7 +31,82 @@ def get_router_ipv4(host: str, port: int, user: str, key: str) -> [IPv4Address]:
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port), "/ip arp print"],
stdout=PIPE).stdout.decode()
return [IPv4Address(i) for i in rip.findall(out)]
return [IPv4Address(i) for i in ripv4.findall(out)]
def get_router_ipv4_id(ipv4: IPv4Address, host: str, port: int, user: str, key: str) -> int:
"""
Get IPv4 id of the router
:param ipv4: The IPv4 to search
:type ipv4: IPv4Address
:param host: The SSH host of the router
:type host: str
:param port: The SSH port of the router
:type port: int
:param user: The SSH user of the router
:type port: str
:param key: The SSH key of the router
:type key: str
:return: List of IPv4 in the router
:rtype: int
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port), "/ip arp print"],
stdout=PIPE).stdout.decode()
for i in ripv4_id.findall(out):
if i[1] == str(ipv4):
return i[0]
return -1
def get_router_ipv6(host: str, port: int, user: str, key: str) -> [IPv6Address]:
"""
Gets IPv6 list of the router
:param host: The SSH host of the router
:type host: str
:param port: The SSH port of the router
:type port: int
:param user: The SSH user of the router
:type port: str
:param key: The SSH key of the router
:type key: str
:return: List of IPv6 in the router
:rtype: [IPv6Address]
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
"/ipv6 route print"], stdout=PIPE).stdout.decode()
return [IPv6Address(i) for i in ripv6.findall(out)]
def get_router_ipv6_id(ipv6: str, host: str, port: int, user: str, key: str) -> int:
"""
Get IPv6 id of the router
Return -1 if not found
:param ipv6: The IPv6 to search
:type ipv6: str
:param host: The SSH host of the router
:type host: str
:param port: The SSH port of the router
:type port: int
:param user: The SSH user of the router
:type port: str
:param key: The SSH key of the router
:type key: str
:return: The id of the given IPv6
:rtype: int
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
"/ipv6 route print"], stdout=PIPE).stdout.decode()
for i in ripv6_id.findall(out):
if i[1] == ipv6[:-5]:
return i[0]
return -1
def get_router_mac(host: str, port: int, user: str, key: str) -> [str]:
@ -44,6 +125,6 @@ def get_router_mac(host: str, port: int, user: str, key: str) -> [str]:
:rtype: [str]
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", port, "/ip arp print"],
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port), "/ip arp print"],
stdout=PIPE).stdout.decode()
return rmac.findall(out)

View file

@ -1,10 +1,11 @@
from ipaddress import IPv4Address
from router.get_router import get_router_ipv4
from router.get_router import get_router_ipv4, get_router_mac
from ip.ipv6 import link_local
from subprocess import run
def insert_router_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface: str, host: str, port: int, user: str,
key: str, debug: bool = False):
key: str, debug: bool = False, verbose: bool = False):
"""
This function insert IPv4 on the router
@ -20,19 +21,62 @@ def insert_router_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface
:type port: str
:param key: The SSH key of the router
:type key: str
:param debug: Disable commit on database
:param debug: Disable command on router
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
"""
ipl = get_router_ipv4(host, port, user, key)
macl = get_router_mac(host, port, user, key)
print("Start insert on router")
print("Start insert IPv4 on router")
for i in insert:
if i[1]:
if ((i[0] not in ipl) or not (ipl[ipl.find(i[0]):5].replace(" ", ""))) and (i[1] not in ipl):
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port), f"/ip arp add address={i[0]} mac-address={i[1]} interface={interface}".replace("'", "")]
if (i[0] not in ipl) and (i[1] not in macl):
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
f"/ip arp add address={i[0]} mac-address={i[1]} interface={interface}"]
if not debug:
run(cmd)
else:
if debug or verbose:
print(cmd)
print("Insert on router done")
print("Insert IPv4 on router done")
def insert_router_ipv6(insert: [(IPv4Address, str, IPv4Address, int)], ipv6: str, interface: str, host: str, port: int, user: str,
key: str, debug: bool = False, verbose: bool = False):
"""
This function insert IPv6 on the router
:param insert: The list of IPs, MACs to insert
:type insert: [(IPv4Address, str, IPv4Address, int)]
:param ipv6: The IPV6 template
:type ipv6: str
:param interface: The interface of IPs
:type interface: str
:param host: The SSH host of the router
:type host: str
:param port: The SSH port of the router
:type port: int
:param user: The SSH user of the router
:type port: str
:param key: The SSH key of the router
:type key: str
:param debug: Disable commit on database
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
"""
print("Start insert IPv6 on router")
for i in insert:
if i[1]:
ip = ipv6.format(str(i[0]).split(".")[-1])
gateway = link_local(i[1])
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
f"/ipv6 route add dst-address={ip} gateway={gateway}%{interface}"]
if not debug:
run(cmd)
if debug or verbose:
print(cmd)
print("Insert IPv6 on router done")

78
router/remove_router.py Normal file
View file

@ -0,0 +1,78 @@
from ipaddress import IPv4Address
from router.get_router import get_router_ipv4_id, get_router_ipv6_id
from subprocess import run
def remove_router_ipv4(remove: [(IPv4Address, str, IPv4Address, int)], host: str, port: int, user: str,
key: str, debug: bool = False, verbose: bool = False):
"""
This function remove IPv6 on the router
:param remove: The list of IPs, MACs to remove
:type remove: [(IPv4Address, str, IPv4Address, int)]
:param host: The SSH host of the router
:type host: str
:param port: The SSH port of the router
:type port: int
:param user: The SSH user of the router
:type port: str
:param key: The SSH key of the router
:type key: str
:param debug: Disable commit on database
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
"""
print("Start remove IPv4 on router")
for i in remove:
id = get_router_ipv4_id(i[0], host, port, user, key)
if id != -1:
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
f"/ip arp remove {id}"]
if not debug:
run(cmd)
if debug or verbose:
print(cmd)
elif debug or verbose:
print(f"IPv4:{i[0]} not found")
print("Remove IPv4 on router done")
def remove_router_ipv6(remove: [(IPv4Address, str, IPv4Address, int)], ipv6: str, host: str, port: int, user: str,
key: str, debug: bool = False, verbose: bool = False):
"""
This function remove IPv6 on the router
:param remove: The list of IPs, MACs to remove
:type remove: [(IPv4Address, str, IPv4Address, int)]
:param ipv6: The IPV6 template
:type ipv6: str
:param host: The SSH host of the router
:type host: str
:param port: The SSH port of the router
:type port: int
:param user: The SSH user of the router
:type port: str
:param key: The SSH key of the router
:type key: str
:param debug: Disable commit on database
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
"""
print("Start remove IPv6 on router")
for i in remove:
ip = ipv6.format(str(i[0]).split(".")[-1])
id = get_router_ipv6_id(ip, host, port, user, key)
if id != -1:
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
f"/ipv6 route remove {id}"]
if not debug:
run(cmd)
if debug or verbose:
print(cmd)
elif debug or verbose:
print(f"IPv6:{ip} not found")
print("Remove IPv6 on router done")

View file

@ -3,7 +3,8 @@ from sys import stderr
from pymysql import Connect
def insert_whmcs_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface: str, db: Connect, debug: bool = False):
def insert_whmcs_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface: str, db: Connect, debug: bool = False,
verbose: bool = False):
"""
This function insert given IPs and MACs to WHMCS
@ -15,6 +16,8 @@ def insert_whmcs_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface:
:type db: pymysql.Connect
:param debug: Disable commit on database
:type debug: bool
:param verbose: Print actions on database
:type verbose: bool
"""
cursor = db.cursor()
# Get gateway
@ -33,12 +36,14 @@ def insert_whmcs_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface:
for i in insert:
if i[1]:
cmd = f"INSERT INTO mg_proxmox_addon_ip (ip, type, mac_address, subnet_mask, cidr, gateway, tag) " \
f"VALUES ('{i[0]}', 'IPv4', {i[1]}, '{i[2]}', {i[3]}, '{gateway}', {vlan})"
f"VALUES ('{i[0]}', 'IPv4', '{i[1]}', '{i[2]}', {i[3]}, '{gateway}', {vlan})"
try:
cursor.execute(cmd)
except Exception as e:
print(cmd, file=stderr)
raise e
if debug or verbose:
print(cmd)
cursor.close()

42
whmcs/remove_whmcs.py Normal file
View file

@ -0,0 +1,42 @@
from ipaddress import IPv4Address
from sys import stderr
from pymysql import Connect
def remove_whmcs_ipv4(remove: [(IPv4Address, str, IPv4Address, int)], db: Connect, debug: bool = False, verbose: bool = False):
"""
This function remove IPv6 on the router
:param remove: The list of IPs, MACs to remove
:type remove: [(IPv4Address, str, IPv4Address, int)]
:param db: The database connection of WHMCS
:type db: pymysql.Connect
:param debug: Disable commit on database
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
"""
cursor = db.cursor()
for i in remove:
cmd = f"DELETE FROM mg_proxmox_addon_ip WHERE ip = '{i[0]}'"
try:
cursor.execute(cmd)
except Exception as e:
print(cmd, file=stderr)
raise e
if debug or verbose:
print(cmd)
cursor.close()
# Commit to the DB
if not debug:
try:
print("Commit to DB...")
db.commit()
except Exception as e:
raise e
else:
print("Commited to DB")