1
0
Fork 0
mirror of https://github.com/retspen/webvirtcloud synced 2025-07-31 12:41:08 +00:00

lint with black python. convert f style strings to old one. some small fixes

This commit is contained in:
catborise 2020-11-05 12:34:31 +03:00
parent c20c353a40
commit 508e3609be
54 changed files with 2123 additions and 1824 deletions

View file

@ -1,11 +1,15 @@
from libvirt import (
VIR_NETWORK_SECTION_IP_DHCP_HOST,
VIR_NETWORK_UPDATE_AFFECT_CONFIG,
VIR_NETWORK_UPDATE_AFFECT_LIVE,
VIR_NETWORK_UPDATE_COMMAND_ADD_LAST,
VIR_NETWORK_UPDATE_COMMAND_DELETE,
VIR_NETWORK_UPDATE_COMMAND_MODIFY, libvirtError)
from lxml import etree
from libvirt import libvirtError
from libvirt import VIR_NETWORK_SECTION_IP_DHCP_HOST
from libvirt import VIR_NETWORK_UPDATE_COMMAND_ADD_LAST, VIR_NETWORK_UPDATE_COMMAND_DELETE, VIR_NETWORK_UPDATE_COMMAND_MODIFY
from libvirt import VIR_NETWORK_UPDATE_AFFECT_LIVE, VIR_NETWORK_UPDATE_AFFECT_CONFIG
from vrtManager import util
from vrtManager.IPy import IP
from vrtManager.connection import wvmConnect
from vrtManager.IPy import IP
def network_size(subnet, dhcp=None):
"""
@ -17,7 +21,7 @@ def network_size(subnet, dhcp=None):
if addr.version() == 4:
dhcp_pool = [addr[2].strCompressed(), addr[addr.len() - 2].strCompressed()]
if addr.version() == 6:
mask = mask.lstrip('/') if '/' in mask else mask
mask = mask.lstrip("/") if "/" in mask else mask
dhcp_pool = [IP(addr[0].strCompressed() + hex(256)), IP(addr[0].strCompressed() + hex(512 - 1))]
if dhcp:
return gateway, mask, dhcp_pool
@ -26,7 +30,6 @@ def network_size(subnet, dhcp=None):
class wvmNetworks(wvmConnect):
def get_networks_info(self):
get_networks = self.get_networks()
networks = []
@ -39,44 +42,55 @@ class wvmNetworks(wvmConnect):
net_bridge = util.get_xml_path(net.XMLDesc(0), "/network/forward/interface/@dev")
net_forward = util.get_xml_path(net.XMLDesc(0), "/network/forward/@mode")
networks.append({'name': network, 'status': net_status,
'device': net_bridge, 'forward': net_forward})
networks.append({"name": network, "status": net_status, "device": net_bridge, "forward": net_forward})
return networks
def define_network(self, xml):
self.wvm.networkDefineXML(xml)
def create_network(self, name, forward,
ipv4, gateway, mask, dhcp4,
ipv6, gateway6, prefix6, dhcp6,
bridge, openvswitch, fixed=False):
def create_network(
self,
name,
forward,
ipv4,
gateway,
mask,
dhcp4,
ipv6,
gateway6,
prefix6,
dhcp6,
bridge,
openvswitch,
fixed=False,
):
xml = f"""
<network>
<name>{name}</name>"""
if forward in ['nat', 'route', 'bridge']:
if forward in ["nat", "route", "bridge"]:
xml += f"""<forward mode='{forward}'/>"""
if forward == 'macvtap':
if forward == "macvtap":
xml += f"""<forward mode='bridge'>
<interface dev='{bridge}'/>
</forward>"""
else:
xml += """<bridge """
if forward in ['nat', 'route', 'none']:
if forward in ["nat", "route", "none"]:
xml += """stp='on' delay='0'"""
if forward == 'bridge':
if forward == "bridge":
xml += f"""name='{bridge}'"""
xml += """/>"""
if openvswitch is True:
xml += """<virtualport type='openvswitch'/>"""
if forward not in ['bridge', 'macvtap']:
if forward not in ["bridge", "macvtap"]:
if ipv4:
xml += f"""<ip address='{gateway}' netmask='{mask}'>"""
if dhcp4:
xml += f"""<dhcp>
<range start='{dhcp4[0]}' end='{dhcp4[1]}' />"""
if fixed:
fist_oct = int(dhcp4[0].strip().split('.')[3])
last_oct = int(dhcp4[1].strip().split('.')[3])
fist_oct = int(dhcp4[0].strip().split(".")[3])
last_oct = int(dhcp4[1].strip().split(".")[3])
for ip in range(fist_oct, last_oct + 1):
xml += f"""<host mac='{util.randomMAC()}' ip='{gateway[:-2]}.{ip}' />"""
xml += """</dhcp>"""
@ -144,23 +158,23 @@ class wvmNetwork(wvmConnect):
if util.get_xml_path(xml, "/network/ip") is None:
return ip_networks
tree = etree.fromstring(xml)
ips = tree.findall('.ip')
ips = tree.findall(".ip")
for ip in ips:
address_str = ip.get('address')
netmask_str = ip.get('netmask')
prefix = ip.get('prefix')
family = ip.get('family', 'ipv4')
base = 32 if family == 'ipv4' else 128
address_str = ip.get("address")
netmask_str = ip.get("netmask")
prefix = ip.get("prefix")
family = ip.get("family", "ipv4")
base = 32 if family == "ipv4" else 128
if prefix:
prefix = int(prefix)
binstr = ((prefix * "1") + ((base - prefix) * "0"))
binstr = (prefix * "1") + ((base - prefix) * "0")
netmask_str = str(IP(int(binstr, base=2)))
if netmask_str:
netmask = IP(netmask_str)
gateway = IP(address_str)
network = IP(gateway.int() & netmask.int())
netmask_str = netmask_str if family == 'ipv4' else str(prefix)
netmask_str = netmask_str if family == "ipv4" else str(prefix)
ret = IP(str(network) + "/" + netmask_str)
else:
ret = IP(str(address_str))
@ -178,12 +192,12 @@ class wvmNetwork(wvmConnect):
forward_dev = util.get_xml_path(xml, "/network/forward/@dev")
return [fw, forward_dev]
def get_dhcp_range(self, family='ipv4'):
def get_dhcp_range(self, family="ipv4"):
xml = self._XMLDesc(0)
if family == 'ipv4':
if family == "ipv4":
dhcpstart = util.get_xml_path(xml, "/network/ip[not(@family='ipv6')]/dhcp/range[1]/@start")
dhcpend = util.get_xml_path(xml, "/network/ip[not(@family='ipv6')]/dhcp/range[1]/@end")
if family == 'ipv6':
if family == "ipv6":
dhcpstart = util.get_xml_path(xml, "/network/ip[@family='ipv6']/dhcp/range[1]/@start")
dhcpend = util.get_xml_path(xml, "/network/ip[@family='ipv6']/dhcp/range[1]/@end")
@ -192,13 +206,13 @@ class wvmNetwork(wvmConnect):
return [IP(dhcpstart), IP(dhcpend)]
def get_dhcp_range_start(self, family='ipv4'):
def get_dhcp_range_start(self, family="ipv4"):
dhcp = self.get_dhcp_range(family)
if not dhcp:
return None
return dhcp[0]
def get_dhcp_range_end(self, family='ipv4'):
def get_dhcp_range_end(self, family="ipv4"):
dhcp = self.get_dhcp_range(family)
if not dhcp:
return None
@ -211,74 +225,77 @@ class wvmNetwork(wvmConnect):
return True
return bool(util.get_xml_path(xml, "/network/ip/dhcp/bootp/@file"))
def get_dhcp_host_addr(self, family='ipv4'):
def get_dhcp_host_addr(self, family="ipv4"):
result = list()
tree = etree.fromstring(self._XMLDesc(0))
for ipdhcp in tree.findall("./ip"):
if family == 'ipv4':
if ipdhcp.get('family') is None:
hosts = ipdhcp.findall('./dhcp/host')
if family == "ipv4":
if ipdhcp.get("family") is None:
hosts = ipdhcp.findall("./dhcp/host")
for host in hosts:
host_ip = host.get('ip')
mac = host.get('mac')
name = host.get('name', '')
result.append({'ip': host_ip, 'mac': mac, 'name': name})
host_ip = host.get("ip")
mac = host.get("mac")
name = host.get("name", "")
result.append({"ip": host_ip, "mac": mac, "name": name})
return result
else:
continue
if family == 'ipv6':
if family == "ipv6":
hosts = tree.xpath("./ip[@family='ipv6']/dhcp/host")
for host in hosts:
host_ip = host.get('ip')
host_id = host.get('id')
name = host.get('name', '')
result.append({'ip': host_ip, 'id': host_id, 'name': name})
host_ip = host.get("ip")
host_id = host.get("id")
name = host.get("name", "")
result.append({"ip": host_ip, "id": host_id, "name": name})
return result
def modify_dhcp_range(self, range_start, range_end, family='ipv4'):
def modify_dhcp_range(self, range_start, range_end, family="ipv4"):
if not self.is_active():
tree = etree.fromstring(self._XMLDesc(0))
if family == 'ipv4':
if family == "ipv4":
dhcp_range = tree.xpath("./ip[not(@family='ipv6')]/dhcp/range")
if family == 'ipv6':
if family == "ipv6":
dhcp_range = tree.xpath("./ip[@family='ipv6']/dhcp/range")
dhcp_range[0].set('start', range_start)
dhcp_range[0].set('end', range_end)
dhcp_range[0].set("start", range_start)
dhcp_range[0].set("end", range_end)
self.wvm.networkDefineXML(etree.tostring(tree).decode())
def delete_fixed_address(self, ip, family='ipv4'):
def delete_fixed_address(self, ip, family="ipv4"):
tree = etree.fromstring(self._XMLDesc(0))
if family == 'ipv4':
if family == "ipv4":
hosts = tree.xpath("/network/ip[not(@family='ipv6')]/dhcp/host")
parent_index = self.parent_count - 2
if family == 'ipv6':
if family == "ipv6":
hosts = tree.xpath("/network/ip[@family='ipv6']/dhcp/host")
parent_index = self.parent_count - 1
for h in hosts:
if h.get('ip') == ip:
if family == 'ipv4':
new_xml = '<host mac="{}" name="{}" ip="{}"/>'.format(h.get('mac'), h.get('name'), ip)
if family == 'ipv6':
new_xml = '<host id="{}" name="{}" ip="{}"/>'.format(h.get('id'), h.get('name'), ip)
if h.get("ip") == ip:
if family == "ipv4":
new_xml = '<host mac="{}" name="{}" ip="{}"/>'.format(h.get("mac"), h.get("name"), ip)
if family == "ipv6":
new_xml = '<host id="{}" name="{}" ip="{}"/>'.format(h.get("id"), h.get("name"), ip)
self.update(VIR_NETWORK_UPDATE_COMMAND_DELETE, VIR_NETWORK_SECTION_IP_DHCP_HOST,
new_xml,
parent_index,
VIR_NETWORK_UPDATE_AFFECT_LIVE | VIR_NETWORK_UPDATE_AFFECT_CONFIG)
self.update(
VIR_NETWORK_UPDATE_COMMAND_DELETE,
VIR_NETWORK_SECTION_IP_DHCP_HOST,
new_xml,
parent_index,
VIR_NETWORK_UPDATE_AFFECT_LIVE | VIR_NETWORK_UPDATE_AFFECT_CONFIG,
)
break
def modify_fixed_address(self, name, address, mac_duid, family='ipv4'):
def modify_fixed_address(self, name, address, mac_duid, family="ipv4"):
tree = etree.fromstring(self._XMLDesc(0))
if family == 'ipv4':
new_xml = '<host mac="{}" {} ip="{}"/>'.format(mac_duid, 'name="' + name + '"' if name else '', IP(address))
if family == "ipv4":
new_xml = '<host mac="{}" {} ip="{}"/>'.format(mac_duid, 'name="' + name + '"' if name else "", IP(address))
hosts = tree.xpath("./ip[not(@family='ipv6')]/dhcp/host")
compare_var = 'mac'
compare_var = "mac"
parent_index = self.parent_count - 2
if family == 'ipv6':
new_xml = '<host id="{}" {} ip="{}"/>'.format(mac_duid, 'name="' + name + '"' if name else '', IP(address))
if family == "ipv6":
new_xml = '<host id="{}" {} ip="{}"/>'.format(mac_duid, 'name="' + name + '"' if name else "", IP(address))
hosts = tree.xpath("./ip[@family='ipv6']/dhcp/host")
compare_var = 'id'
compare_var = "id"
parent_index = self.parent_count - 1
new_host_xml = etree.fromstring(new_xml)
@ -288,17 +305,25 @@ class wvmNetwork(wvmConnect):
host = h
break
if host is None:
self.update(VIR_NETWORK_UPDATE_COMMAND_ADD_LAST, VIR_NETWORK_SECTION_IP_DHCP_HOST, new_xml,
parent_index,
VIR_NETWORK_UPDATE_AFFECT_LIVE | VIR_NETWORK_UPDATE_AFFECT_CONFIG)
self.update(
VIR_NETWORK_UPDATE_COMMAND_ADD_LAST,
VIR_NETWORK_SECTION_IP_DHCP_HOST,
new_xml,
parent_index,
VIR_NETWORK_UPDATE_AFFECT_LIVE | VIR_NETWORK_UPDATE_AFFECT_CONFIG,
)
else:
# change the host
if host.get('name') == new_host_xml.get('name') and host.get('ip') == new_host_xml.get('ip'):
if host.get("name") == new_host_xml.get("name") and host.get("ip") == new_host_xml.get("ip"):
return False
else:
self.update(VIR_NETWORK_UPDATE_COMMAND_MODIFY, VIR_NETWORK_SECTION_IP_DHCP_HOST, new_xml,
parent_index,
VIR_NETWORK_UPDATE_AFFECT_LIVE | VIR_NETWORK_UPDATE_AFFECT_CONFIG)
self.update(
VIR_NETWORK_UPDATE_COMMAND_MODIFY,
VIR_NETWORK_SECTION_IP_DHCP_HOST,
new_xml,
parent_index,
VIR_NETWORK_UPDATE_AFFECT_LIVE | VIR_NETWORK_UPDATE_AFFECT_CONFIG,
)
def get_qos(self):
qos_values = dict()
@ -307,19 +332,19 @@ class wvmNetwork(wvmConnect):
if qos:
qos = qos[0]
in_qos = qos.find('inbound')
in_qos = qos.find("inbound")
if in_qos is not None:
in_av = in_qos.get('average')
in_peak = in_qos.get('peak')
in_burst = in_qos.get('burst')
qos_values['inbound'] = {'average': in_av, 'peak': in_peak, 'burst': in_burst}
in_av = in_qos.get("average")
in_peak = in_qos.get("peak")
in_burst = in_qos.get("burst")
qos_values["inbound"] = {"average": in_av, "peak": in_peak, "burst": in_burst}
out_qos = qos.find('outbound')
out_qos = qos.find("outbound")
if out_qos is not None:
out_av = out_qos.get('average')
out_peak = out_qos.get('peak')
out_burst = out_qos.get('burst')
qos_values['outbound'] = {'average': out_av, 'peak': out_peak, 'burst': out_burst}
out_av = out_qos.get("average")
out_peak = out_qos.get("peak")
out_burst = out_qos.get("burst")
qos_values["outbound"] = {"average": out_av, "peak": out_peak, "burst": out_burst}
return qos_values
def set_qos(self, direction, average, peak, burst):
@ -328,7 +353,7 @@ class wvmNetwork(wvmConnect):
elif direction == "outbound":
xml = f"<outbound average='{average}' peak='{peak}' burst='{burst}'/>"
else:
raise Exception('Direction must be inbound or outbound')
raise Exception("Direction must be inbound or outbound")
tree = etree.fromstring(self._XMLDesc(0))
@ -363,7 +388,7 @@ class wvmNetwork(wvmConnect):
self.leases = self.net.DHCPLeases()
except Exception as e:
self.leases = []
raise f"Error getting {self} DHCP leases: {e}"
raise "Error getting {} DHCP leases: {}".format(self, e)
def get_dhcp_leases(self):
if self.leases is None: