Add options to ssh commands

This commit is contained in:
Ethanell 2020-03-09 08:59:12 +01:00
parent b182c359fc
commit b59a4acaa7
5 changed files with 47 additions and 28 deletions

11
main.py
View file

@ -12,7 +12,7 @@ from ip.ipv4 import ipv4
if not isfile("config.json"):
with open("config.json", "w") as config:
data = {"database": {"host": "", "user": "", "password": "", "name": ""},
"ssh": {"host": "", "port": 22, "user": "", "key": ""},
"ssh": {"host": "", "port": 22, "user": "", "key": "", "options": []},
"interface": {"default": ""},
"IPv6": {"template": ""}}
dump(data, config)
@ -29,6 +29,7 @@ SSH_HOST = conf["ssh"]["host"]
SSH_PORT = conf["ssh"]["port"]
SSH_USER = conf["ssh"]["user"]
SSH_KEY = conf["ssh"]["key"]
SSH_OPTIONS = conf["ssh"]["options"]
IPV6_TEMPLATE = conf["IPv6"]["template"]
@ -61,9 +62,9 @@ out = ipv4(args.prefix, ipl, macl)
# Insert the list
if not args.delete:
insert_whmcs_ipv4(out, args.interface, db, debug, args.verbose)
insert_router_ipv4(out, args.interface, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose)
insert_router_ipv6(out, IPV6_TEMPLATE, args.interface, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose)
insert_router_ipv4(out, args.interface, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose, SSH_OPTIONS)
insert_router_ipv6(out, IPV6_TEMPLATE, args.interface, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose, SSH_OPTIONS)
else:
remove_whmcs_ipv4(out, db, debug, args.verbose)
remove_router_ipv4(out, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose)
remove_router_ipv6(out, IPV6_TEMPLATE, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose)
remove_router_ipv4(out, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose, SSH_OPTIONS)
remove_router_ipv6(out, IPV6_TEMPLATE, SSH_HOST, SSH_PORT, SSH_USER, SSH_KEY, debug, args.verbose, SSH_OPTIONS)

View file

@ -13,7 +13,7 @@ rmac = re.compile(r"(?:[A-F]|[0-9]){1,3}:(?:[A-F]|[0-9]){1,3}:(?:[A-F]|[0-9]){1,
r"{1,3}:(?:[A-F]|[0-9]){1,3}")
def get_router_ipv4(host: str, port: int, user: str, key: str) -> [IPv4Address]:
def get_router_ipv4(host: str, port: int, user: str, key: str, ssh_options: list = []) -> [IPv4Address]:
"""
Gets IPv4 list of the router
@ -25,16 +25,18 @@ def get_router_ipv4(host: str, port: int, user: str, key: str) -> [IPv4Address]:
:type port: str
:param key: The SSH key of the router
:type key: str
:param ssh_options: SSH optionals arguments
:type ssh_options: list
:return: List of IPv4 in the router
:rtype: [IPv4Address]
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port), "/ip arp print"],
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port), "/ip arp print"],
stdout=PIPE).stdout.decode()
return [IPv4Address(i) for i in ripv4.findall(out)]
def get_router_ipv4_id(ipv4: IPv4Address, host: str, port: int, user: str, key: str) -> int:
def get_router_ipv4_id(ipv4: IPv4Address, host: str, port: int, user: str, key: str, ssh_options: list = []) -> int:
"""
Get IPv4 id of the router
@ -48,11 +50,13 @@ def get_router_ipv4_id(ipv4: IPv4Address, host: str, port: int, user: str, key:
:type port: str
:param key: The SSH key of the router
:type key: str
:param ssh_options: SSH optionals arguments
:type ssh_options: list
:return: List of IPv4 in the router
:rtype: int
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port), "/ip arp print"],
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port), "/ip arp print"],
stdout=PIPE).stdout.decode()
for i in ripv4_id.findall(out):
if i[1] == str(ipv4):
@ -60,7 +64,7 @@ def get_router_ipv4_id(ipv4: IPv4Address, host: str, port: int, user: str, key:
return -1
def get_router_ipv6(host: str, port: int, user: str, key: str) -> [IPv6Address]:
def get_router_ipv6(host: str, port: int, user: str, key: str, ssh_options: list = []) -> [IPv6Address]:
"""
Gets IPv6 list of the router
@ -72,16 +76,18 @@ def get_router_ipv6(host: str, port: int, user: str, key: str) -> [IPv6Address]:
:type port: str
:param key: The SSH key of the router
:type key: str
:param ssh_options: SSH optionals arguments
:type ssh_options: list
:return: List of IPv6 in the router
:rtype: [IPv6Address]
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port),
"/ipv6 route print"], stdout=PIPE).stdout.decode()
return [IPv6Address(i) for i in ripv6.findall(out)]
def get_router_ipv6_id(ipv6: str, host: str, port: int, user: str, key: str) -> int:
def get_router_ipv6_id(ipv6: str, host: str, port: int, user: str, key: str, ssh_options: list = []) -> int:
"""
Get IPv6 id of the router
@ -97,11 +103,13 @@ def get_router_ipv6_id(ipv6: str, host: str, port: int, user: str, key: str) ->
:type port: str
:param key: The SSH key of the router
:type key: str
:param ssh_options: SSH optionals arguments
:type ssh_options: list
:return: The id of the given IPv6
:rtype: int
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port),
"/ipv6 route print"], stdout=PIPE).stdout.decode()
for i in ripv6_id.findall(out):
if i[1] == ipv6[:-5]:
@ -109,7 +117,7 @@ def get_router_ipv6_id(ipv6: str, host: str, port: int, user: str, key: str) ->
return -1
def get_router_mac(host: str, port: int, user: str, key: str) -> [str]:
def get_router_mac(host: str, port: int, user: str, key: str, ssh_options: list = []) -> [str]:
"""
Gets MAC list of the router
@ -121,10 +129,12 @@ def get_router_mac(host: str, port: int, user: str, key: str) -> [str]:
:type port: str
:param key: The SSH key of the router
:type key: str
:param ssh_options: SSH optionals arguments
:type ssh_options: list
:return: List of MAC in the router
:rtype: [str]
"""
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port), "/ip arp print"],
out = run(["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port), "/ip arp print"],
stdout=PIPE).stdout.decode()
return rmac.findall(out)

View file

@ -5,7 +5,7 @@ from subprocess import run
def insert_router_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface: str, host: str, port: int, user: str,
key: str, debug: bool = False, verbose: bool = False):
key: str, debug: bool = False, verbose: bool = False, ssh_options: list = []):
"""
This function insert IPv4 on the router
@ -25,16 +25,18 @@ def insert_router_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
:param ssh_options: SSH optionals arguments
:type ssh_options: list
"""
ipl = get_router_ipv4(host, port, user, key)
macl = get_router_mac(host, port, user, key)
ipl = get_router_ipv4(host, port, user, key, ssh_options)
macl = get_router_mac(host, port, user, key, ssh_options)
print("Start insert IPv4 on router")
for i in insert:
if i[1]:
if (i[0] not in ipl) and (i[1] not in macl):
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port),
f"/ip arp add address={i[0]} mac-address={i[1]} interface={interface}"]
if not debug:
run(cmd)
@ -44,7 +46,7 @@ def insert_router_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface
def insert_router_ipv6(insert: [(IPv4Address, str, IPv4Address, int)], ipv6: str, interface: str, host: str, port: int, user: str,
key: str, debug: bool = False, verbose: bool = False):
key: str, debug: bool = False, verbose: bool = False, ssh_options: list = []):
"""
This function insert IPv6 on the router
@ -66,6 +68,8 @@ def insert_router_ipv6(insert: [(IPv4Address, str, IPv4Address, int)], ipv6: str
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
:param ssh_options: SSH optionals arguments
:type ssh_options: list
"""
print("Start insert IPv6 on router")
@ -73,7 +77,7 @@ def insert_router_ipv6(insert: [(IPv4Address, str, IPv4Address, int)], ipv6: str
if i[1]:
ip = ipv6.format(str(i[0]).split(".")[-1])
gateway = link_local(i[1])
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port),
f"/ipv6 route add dst-address={ip} gateway={gateway}%{interface}"]
if not debug:
run(cmd)

View file

@ -4,7 +4,7 @@ from subprocess import run
def remove_router_ipv4(remove: [(IPv4Address, str, IPv4Address, int)], host: str, port: int, user: str,
key: str, debug: bool = False, verbose: bool = False):
key: str, debug: bool = False, verbose: bool = False, ssh_options: list = []):
"""
This function remove IPv6 on the router
@ -22,13 +22,15 @@ def remove_router_ipv4(remove: [(IPv4Address, str, IPv4Address, int)], host: str
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
:param ssh_options: SSH optionals arguments
:type ssh_options: list
"""
print("Start remove IPv4 on router")
for i in remove:
id = get_router_ipv4_id(i[0], host, port, user, key)
id = get_router_ipv4_id(i[0], host, port, user, key, ssh_options)
if id != -1:
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port),
f"/ip arp remove {id}"]
if not debug:
run(cmd)
@ -40,7 +42,7 @@ def remove_router_ipv4(remove: [(IPv4Address, str, IPv4Address, int)], host: str
def remove_router_ipv6(remove: [(IPv4Address, str, IPv4Address, int)], ipv6: str, host: str, port: int, user: str,
key: str, debug: bool = False, verbose: bool = False):
key: str, debug: bool = False, verbose: bool = False, ssh_options: list = []):
"""
This function remove IPv6 on the router
@ -60,14 +62,16 @@ def remove_router_ipv6(remove: [(IPv4Address, str, IPv4Address, int)], ipv6: str
:type debug: bool
:param verbose: Print each command on router
:type verbose: bool
:param ssh_options: SSH optionals arguments
:type ssh_options: list
"""
print("Start remove IPv6 on router")
for i in remove:
ip = ipv6.format(str(i[0]).split(".")[-1])
id = get_router_ipv6_id(ip, host, port, user, key)
id = get_router_ipv6_id(ip, host, port, user, key, ssh_options)
if id != -1:
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no", f"{user}@{host}", "-p", str(port),
cmd = ["ssh", "-i", key, "-o", "StrictHostKeyChecking no"] + ssh_options + [f"{user}@{host}", "-p", str(port),
f"/ipv6 route remove {id}"]
if not debug:
run(cmd)

View file

@ -35,7 +35,7 @@ def insert_whmcs_ipv4(insert: [(IPv4Address, str, IPv4Address, int)], interface:
# For every IP to insert
for i in insert:
if i[1]:
cmd = f"INSERT INTO mg_proxmox_addon_ip (ip, type, mac_address, subnet_mask, cidr, sid gateway, tag) " \
cmd = f"INSERT INTO mg_proxmox_addon_ip (ip, type, mac_address, subnet_mask, cidr, sid, gateway, tag) " \
f"VALUES ('{i[0]}', 'IPv4', '{i[1]}', '{i[2]}', {i[3]}, 0, '{gateway}', {vlan})"
try:
cursor.execute(cmd)