log = logging.getLogger(__name__)
-API_VERSION = '27.0'
+API_VERSION = "27.0"
class VMwareCollector(BaseVimCollector):
- def __init__(self, config: Config, vim_account_id: str):
+ def __init__(self, config: Config, vim_account_id: str, vim_session: object):
super().__init__(config, vim_account_id)
self.common_db = CommonDbClient(config)
vim_account = self.get_vim_account(vim_account_id)
- self.vcloud_site = vim_account['vim_url']
- self.admin_username = vim_account['admin_username']
- self.admin_password = vim_account['admin_password']
- self.vrops = vROPS_Helper(vrops_site=vim_account['vrops_site'],
- vrops_user=vim_account['vrops_user'],
- vrops_password=vim_account['vrops_password'])
+ self.vcloud_site = vim_account["vim_url"]
+ self.admin_username = vim_account["admin_username"]
+ self.admin_password = vim_account["admin_password"]
+ self.vrops = vROPS_Helper(
+ vrops_site=vim_account["vrops_site"],
+ vrops_user=vim_account["vrops_user"],
+ vrops_password=vim_account["vrops_password"],
+ )
def connect_as_admin(self):
- """ Method connect as pvdc admin user to vCloud director.
- There are certain action that can be done only by provider vdc admin user.
- Organization creation / provider network creation etc.
+ """Method connect as pvdc admin user to vCloud director.
+ There are certain action that can be done only by provider vdc admin user.
+ Organization creation / provider network creation etc.
- Returns:
- The return client object that letter can be used to connect to vcloud direct as admin for provider vdc
+ Returns:
+ The return client object that letter can be used to connect to vcloud direct as admin for provider vdc
"""
log.debug("Logging into vCD org as admin.")
host = self.vcloud_site
admin_user = self.admin_username
admin_passwd = self.admin_password
- org = 'System'
+ org = "System"
client = Client(host, verify_ssl_certs=False)
client.set_highest_supported_version()
- client.set_credentials(BasicLoginCredentials(admin_user, org,
- admin_passwd))
+ client.set_credentials(BasicLoginCredentials(admin_user, org, admin_passwd))
return client
except Exception as e:
- log.error("Can't connect to a vCloud director as: {} with exception {}".format(admin_user, e))
+ log.error(
+ "Can't connect to a vCloud director as: {} with exception {}".format(
+ admin_user, e
+ )
+ )
def get_vim_account(self, vim_account_id: str):
"""
- Method to get VIM account details by its ID
- arg - VIM ID
- return - dict with vim account details
+ Method to get VIM account details by its ID
+ arg - VIM ID
+ return - dict with vim account details
"""
vim_account = {}
vim_account_info = self.common_db.get_vim_account(vim_account_id)
- vim_account['vim_url'] = vim_account_info['vim_url']
+ vim_account["vim_url"] = vim_account_info["vim_url"]
- vim_config = vim_account_info['config']
- vim_account['admin_username'] = vim_config['admin_username']
- vim_account['admin_password'] = self.common_db.decrypt_vim_password(vim_config['admin_password'],
- vim_account_info['schema_version'],
- vim_account_id)
- vim_account['vrops_site'] = vim_config['vrops_site']
- vim_account['vrops_user'] = vim_config['vrops_user']
- vim_account['vrops_password'] = self.common_db.decrypt_vim_password(vim_config['vrops_password'],
- vim_account_info['schema_version'],
- vim_account_id)
+ vim_config = vim_account_info["config"]
+ vim_account["admin_username"] = vim_config["admin_username"]
+ vim_account["admin_password"] = vim_config["admin_password"]
+ vim_account["vrops_site"] = vim_config["vrops_site"]
+ vim_account["vrops_user"] = vim_config["vrops_user"]
+ vim_account["vrops_password"] = vim_config["vrops_password"]
return vim_account
def get_vm_moref_id(self, vapp_uuid):
"""
- Method to get the moref_id of given VM
- arg - vapp_uuid
- return - VM mored_id
+ Method to get the moref_id of given VM
+ arg - vapp_uuid
+ return - VM mored_id
"""
vm_moref_id = None
try:
if vm_details and "vm_vcenter_info" in vm_details:
vm_moref_id = vm_details["vm_vcenter_info"].get("vm_moref_id", None)
- log.debug("Found vm_moref_id: {} for vApp UUID: {}".format(vm_moref_id, vapp_uuid))
+ log.debug(
+ "Found vm_moref_id: {} for vApp UUID: {}".format(
+ vm_moref_id, vapp_uuid
+ )
+ )
else:
- log.error("Failed to find vm_moref_id from vApp UUID: {}".format(vapp_uuid))
+ log.error(
+ "Failed to find vm_moref_id from vApp UUID: {}".format(
+ vapp_uuid
+ )
+ )
except Exception as exp:
- log.warning("Error occurred while getting VM moref ID for VM: {}\n{}".format(exp, traceback.format_exc()))
+ log.warning(
+ "Error occurred while getting VM moref ID for VM: {}\n{}".format(
+ exp, traceback.format_exc()
+ )
+ )
return vm_moref_id
log.error("Failed to connect to vCD")
return parsed_respond
- url_list = [self.vcloud_site, '/api/vApp/vapp-', vapp_uuid]
- get_vapp_restcall = ''.join(url_list)
+ url_list = [self.vcloud_site, "/api/vApp/vapp-", vapp_uuid]
+ get_vapp_restcall = "".join(url_list)
if vca._session:
- headers = {'Accept': 'application/*+xml;version=' + API_VERSION,
- 'x-vcloud-authorization': vca._session.headers['x-vcloud-authorization']}
- response = requests.get(get_vapp_restcall,
- headers=headers,
- verify=False)
+ headers = {
+ "Accept": "application/*+xml;version=" + API_VERSION,
+ "x-vcloud-authorization": vca._session.headers[
+ "x-vcloud-authorization"
+ ],
+ }
+ response = requests.get(get_vapp_restcall, headers=headers, verify=False)
if response.status_code != 200:
- log.error("REST API call {} failed. Return status code {}".format(get_vapp_restcall,
- response.content))
+ log.error(
+ "REST API call {} failed. Return status code {}".format(
+ get_vapp_restcall, response.content
+ )
+ )
return parsed_respond
try:
xmlroot_respond = XmlElementTree.fromstring(response.content)
- namespaces = {'vm': 'http://www.vmware.com/vcloud/v1.5',
- "vmext": "http://www.vmware.com/vcloud/extension/v1.5",
- "xmlns": "http://www.vmware.com/vcloud/v1.5"}
+ namespaces = {
+ "vm": "http://www.vmware.com/vcloud/v1.5",
+ "vmext": "http://www.vmware.com/vcloud/extension/v1.5",
+ "xmlns": "http://www.vmware.com/vcloud/v1.5",
+ }
# parse children section for other attrib
- children_section = xmlroot_respond.find('vm:Children/', namespaces)
+ children_section = xmlroot_respond.find("vm:Children/", namespaces)
if children_section is not None:
- vCloud_extension_section = children_section.find('xmlns:VCloudExtension', namespaces)
+ vCloud_extension_section = children_section.find(
+ "xmlns:VCloudExtension", namespaces
+ )
if vCloud_extension_section is not None:
vm_vcenter_info = {}
- vim_info = vCloud_extension_section.find('vmext:VmVimInfo', namespaces)
- vmext = vim_info.find('vmext:VmVimObjectRef', namespaces)
+ vim_info = vCloud_extension_section.find(
+ "vmext:VmVimInfo", namespaces
+ )
+ vmext = vim_info.find("vmext:VmVimObjectRef", namespaces)
if vmext is not None:
- vm_vcenter_info["vm_moref_id"] = vmext.find('vmext:MoRef', namespaces).text
+ vm_vcenter_info["vm_moref_id"] = vmext.find(
+ "vmext:MoRef", namespaces
+ ).text
parsed_respond["vm_vcenter_info"] = vm_vcenter_info
except Exception as exp:
- log.warning("Error occurred for getting vApp details: {}\n{}".format(exp,
- traceback.format_exc()))
+ log.warning(
+ "Error occurred for getting vApp details: {}\n{}".format(
+ exp, traceback.format_exc()
+ )
+ )
return parsed_respond
def collect(self, vnfr: dict):
- vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
+ vnfd = self.common_db.get_vnfd(vnfr["vnfd-id"])
vdu_mappings = {}
# Populate extra tags for metrics
- nsr_id = vnfr['nsr-id-ref']
+ nsr_id = vnfr["nsr-id-ref"]
tags = {}
- tags['ns_name'] = self.common_db.get_nsr(nsr_id)['name']
- if vnfr['_admin']['projects_read']:
- tags['project_id'] = vnfr['_admin']['projects_read'][0]
+ tags["ns_name"] = self.common_db.get_nsr(nsr_id)["name"]
+ if vnfr["_admin"]["projects_read"]:
+ tags["project_id"] = vnfr["_admin"]["projects_read"][0]
else:
- tags['project_id'] = None
+ tags["project_id"] = ""
# Fetch the list of all known resources from vROPS.
resource_list = self.vrops.get_vm_resource_list_from_vrops()
- for vdur in vnfr['vdur']:
+ for vdur in vnfr["vdur"]:
# This avoids errors when vdur records have not been completely filled
- if 'name' not in vdur:
+ if "name" not in vdur:
continue
- vdu = next(
- filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
- )
+ vdu = next(filter(lambda vdu: vdu["id"] == vdur["vdu-id-ref"], vnfd["vdu"]))
- if 'monitoring-param' not in vdu:
+ if "monitoring-parameter" not in vdu:
continue
- resource_uuid = vdur['vim-id']
+ resource_uuid = vdur["vim-id"]
# Find vm_moref_id from vApp uuid in vCD
vim_id = self.get_vm_moref_id(resource_uuid)
if vim_id is None:
- log.debug("Failed to find vROPS ID for vApp in vCD: {}".format(resource_uuid))
+ log.debug(
+ "Failed to find vROPS ID for vApp in vCD: {}".format(resource_uuid)
+ )
continue
- vdu_mappings[vim_id] = {'name': vdur['name']}
+ vdu_mappings[vim_id] = {"name": vdur["name"]}
# Map the vROPS instance id to the vim-id so we can look it up.
for resource in resource_list:
- for resourceIdentifier in resource['resourceKey']['resourceIdentifiers']:
- if resourceIdentifier['identifierType']['name'] == 'VMEntityObjectID':
- if resourceIdentifier['value'] != vim_id:
+ for resourceIdentifier in resource["resourceKey"][
+ "resourceIdentifiers"
+ ]:
+ if (
+ resourceIdentifier["identifierType"]["name"]
+ == "VMEntityObjectID"
+ ):
+ if resourceIdentifier["value"] != vim_id:
continue
- vdu_mappings[vim_id]['vrops_id'] = resource['identifier']
+ vdu_mappings[vim_id]["vrops_id"] = resource["identifier"]
if len(vdu_mappings) != 0:
- return self.vrops.get_metrics(vdu_mappings=vdu_mappings,
- monitoring_params=vdu['monitoring-param'],
- vnfr=vnfr,
- tags=tags
- )
+ return self.vrops.get_metrics(
+ vdu_mappings=vdu_mappings,
+ monitoring_params=vdu["monitoring-parameter"],
+ vnfr=vnfr,
+ tags=tags,
+ )
else:
return []