# Mirror of https://github.com/retspen/webvirtcloud
# Synced 2025-01-24 06:05:20 +00:00
# 196 lines, 5.1 KiB, Python
import random
|
|
import secrets
|
|
import re
|
|
import string
|
|
|
|
import libvirt
|
|
import lxml.etree as etree
|
|
|
|
|
|
def is_kvm_available(xml):
    """Tell whether the XML document declares a domain of type 'kvm'.

    The XML text is handed to get_xml_path() together with the XPath
    expression ``//domain/@type='kvm'``; the outcome of that lookup is
    compared against zero and the comparison result is returned.
    """
    return get_xml_path(xml, "//domain/@type='kvm'") > 0
|
|
|
|
|
|
def randomMAC():
    """Build a random MAC address inside the 52:54:00 (qemu) prefix.

    Returns a lowercase, colon-separated string of six two-digit hex
    octets; the first three are fixed, the last three are random.
    """
    prefix = [0x52, 0x54, 0x00]  # qemu MAC
    suffix = [random.randint(0x00, 0xFF) for _ in range(3)]
    return ":".join("%02x" % octet for octet in prefix + suffix)
|
|
|
|
|
|
def randomUUID():
    """Produce a random UUID string in 8-4-4-4-12 lowercase hex form.

    Sixteen random octets are drawn from the ``secrets`` module, then the
    high nibble of octet 6 is forced to 4 and the two top bits of octet 8
    are forced to binary 10 before formatting.
    """
    octets = [secrets.randbelow(256) for _ in range(16)]
    octets[6] = (octets[6] & 0x0F) | 0x40
    octets[8] = (octets[8] & 0x3F) | 0x80
    digits = "".join("%02x" % octet for octet in octets)
    groups = (digits[:8], digits[8:12], digits[12:16], digits[16:20], digits[20:])
    return "-".join(groups)
|
|
|
|
|
|
def randomPasswd(length=12, alphabet=string.ascii_letters + string.digits):
    """Return a random password of ``length`` characters.

    Each character is picked independently from ``alphabet`` with
    ``secrets.choice``; the default alphabet is ASCII letters plus digits.
    """
    picks = (secrets.choice(alphabet) for _ in range(length))
    return "".join(picks)
|
|
|
|
|
|
def get_max_vcpus(conn, guest_type=None):
    """Return the maximum vcpu count the connection reports for a guest type.

    @param conn: libvirt connection to poll for max possible vcpus
    @param guest_type: optional guest type (kvm, etc.); when omitted the
        value of conn.getType() is used
    @return: the result of conn.getMaxVcpus() for the lower-cased guest
        type, or 32 when that call raises libvirt.libvirtError
    """
    # The check must be on the argument itself: the builtin ``type`` is
    # never None, so testing it would leave guest_type unset.
    if guest_type is None:
        guest_type = conn.getType()
    try:
        return conn.getMaxVcpus(guest_type.lower())
    except libvirt.libvirtError:
        return 32
|
|
|
|
|
|
def xml_escape(xml_str):
    """Replaces chars ' " < > & with xml safe counterparts.

    Returns None unchanged when ``xml_str`` is None; otherwise returns a
    new string in which each special character is replaced by its
    predefined XML entity.
    """
    if xml_str is None:
        return None

    # The ampersand has to be handled first, otherwise the ampersands
    # introduced by the other entities would be escaped a second time.
    replacements = (
        ("&", "&amp;"),
        ("'", "&apos;"),
        ('"', "&quot;"),
        ("<", "&lt;"),
        (">", "&gt;"),
    )
    for char, entity in replacements:
        xml_str = xml_str.replace(char, entity)
    return xml_str
|
|
|
|
|
|
def compareMAC(p, q):
    """Order two colon-separated MAC address strings.

    Returns 1, -1 or 0. When both addresses have the same number of
    fields they are compared field by field as hexadecimal numbers;
    otherwise the raw strings are compared and 1 or -1 is returned.
    """
    left = p.split(":")
    right = q.split(":")

    if len(left) != len(right):
        # Field counts differ: fall back to plain string ordering.
        return 1 if p > q else -1

    for a, b in zip(left, right):
        delta = int(a, 0x10) - int(b, 0x10)
        if delta:
            return 1 if delta > 0 else -1
    return 0
|
|
|
|
|
|
def get_xml_path(xml, path=None, func=None):
    """
    Parse ``xml`` and return either the value found at the XPath ``path``
    (looked up through get_xpath) or, when no path is given, whatever
    ``func`` returns when called with the parsed document as its only
    argument.

    Raises ValueError when neither ``path`` nor ``func`` is supplied; the
    XML is parsed before that check is made.
    """
    root = etree.fromstring(xml)
    if path:
        return get_xpath(root, path)
    if func:
        return func(root)
    raise ValueError("'path' or 'func' is required.")
|
|
|
|
|
|
def get_xpath(doc, path):
    """Evaluate ``path`` with ``doc.xpath`` and reduce the result to one value.

    - a non-list result (including None) is returned as is;
    - an empty list yields None;
    - otherwise only the first item counts: its ``text`` attribute is
      returned when it has one, else the item itself.
    """
    matches = doc.xpath(path)
    if not isinstance(matches, list):
        return matches
    if not matches:
        return None

    first = matches[0]
    if hasattr(first, "text"):
        return first.text
    return first
|
|
|
|
|
|
def pretty_mem(val):
    """Format a memory amount as a "GB" or "MB" string.

    ``val`` is converted with int() and divided by 1024 to obtain MB
    (by 1024 * 1024 for GB). Amounts above 10 * 1024 * 1024 are shown in
    GB with two decimals, everything else in MB with no decimals.
    """
    amount = int(val)
    if amount <= (10 * 1024 * 1024):
        return "%2.0f MB" % (amount / 1024.0)
    return "%2.2f GB" % (amount / (1024.0 * 1024.0))
|
|
|
|
|
|
def pretty_bytes(val):
    """Format a byte count as a "GB" or "MB" string with two decimals.

    ``val`` is converted with int(). Counts above 1024 ** 3 are shown in
    GB, everything else (including exactly 1024 ** 3) in MB.
    """
    count = int(val)
    if count <= (1024 * 1024 * 1024):
        return "%2.2f MB" % (count / (1024.0 * 1024.0))
    return "%2.2f GB" % (count / (1024.0 * 1024.0 * 1024.0))
|
|
|
|
|
|
def validate_uuid(val):
    """Validate a UUID string and return it in hyphenated form.

    Accepts either xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx or 32 hex digits
    without hyphens (upper or lower case). The hyphenated form is returned
    unchanged; the bare form is returned with hyphens inserted.

    Raises ValueError when ``val`` is not a string or matches neither form.
    """
    if not isinstance(val, str):
        raise ValueError("UUID must be a string.")

    # fullmatch is used instead of match + "$": "$" also matches just
    # before a trailing newline, which let "<uuid>\n" through unchanged.
    if re.fullmatch(r"[a-fA-F0-9]{8}[-]([a-fA-F0-9]{4}[-]){3}[a-fA-F0-9]{12}", val):
        return val

    if re.fullmatch(r"[a-fA-F0-9]{32}", val):
        # UUID had no dashes, so add them in
        return val[0:8] + "-" + val[8:12] + "-" + val[12:16] + "-" + val[16:20] + "-" + val[20:32]

    raise ValueError(
        "UUID must be a 32-digit hexadecimal number. It may take "
        "the form xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx or may "
        "omit hyphens altogether."
    )
|
|
|
|
|
|
def validate_macaddr(val):
    """Check that ``val`` is a well-formed MAC address.

    None is accepted and ignored. Any other value must be a string made
    of six colon-separated groups of one or two hex digits. Nothing is
    returned on success.

    Raises ValueError when ``val`` is not a string or has the wrong format.
    """
    if val is None:
        return

    if not isinstance(val, str):
        raise ValueError("MAC address must be a string.")

    # fullmatch rather than match + "$": "$" would also accept a value
    # that ends with a newline character.
    if re.fullmatch(r"([0-9a-fA-F]{1,2}:){5}[0-9a-fA-F]{1,2}", val) is None:
        raise ValueError("MAC address must be of the format AA:BB:CC:DD:EE:FF, was {}".format(val))
|
|
|
|
|
|
# Regular-expression patterns for UEFI binary names, keyed by the
# architecture they are associated with. This information is only used to
# do things automagically for the user; it shouldn't validate anything the
# user explicitly enters.
UEFI_ARCH_PATTERNS = {
    "i686": [
        r".*ovmf-ia32.*",  # fedora, gerd's firmware repo
    ],
    "x86_64": [
        r".*OVMF_CODE\.fd",  # RHEL
        r".*ovmf-x64/OVMF.*\.fd",  # gerd's firmware repo
        r".*ovmf-x86_64-.*",  # SUSE
        r".*ovmf.*",
        r".*OVMF.*",  # generic attempt at a catchall
    ],
    "aarch64": [
        r".*AAVMF_CODE\.fd",  # RHEL
        r".*aarch64/QEMU_EFI.*",  # gerd's firmware repo
        r".*aarch64.*",  # generic attempt at a catchall
    ],
    "armv7l": [
        r".*arm/QEMU_EFI.*",  # fedora, gerd's firmware repo
    ],
}
|
|
|
|
|
|
def vol_dev_type(vol_bus):
    """Return the two-letter device prefix that goes with a disk bus name.

    "ide" -> "hd", "fdc" -> "fd", "virtio" -> "vd", "scsi"/"sata" -> "sd".
    Any other bus name yields None.
    """
    prefix_by_bus = (
        (("ide",), "hd"),
        (("fdc",), "fd"),
        (("virtio",), "vd"),
        (("scsi", "sata"), "sd"),
    )
    for buses, prefix in prefix_by_bus:
        if vol_bus in buses:
            return prefix
    return None
|
|
|