"Failed create a new network {}".format(net_name)
)
- def get_vcd_network_list(self):
- """Method available organization for a logged in tenant
-
- Returns:
- The return vca object that letter can be used to connect to vcloud direct as admin
- """
-
- self.logger.debug(
- "get_vcd_network_list(): retrieving network list for vcd {}".format(
- self.tenant_name
- )
- )
-
- if not self.tenant_name:
- raise vimconn.VimConnConnectionException("Tenant name is empty.")
-
- _, vdc = self.get_vdc_details()
- if vdc is None:
- raise vimconn.VimConnConnectionException(
- "Can't retrieve information for a VDC {}".format(self.tenant_name)
- )
-
- vdc_uuid = vdc.get("id").split(":")[3]
- if self.client._session:
- headers = {
- "Accept": "application/*+xml;version=" + API_VERSION,
- "x-vcloud-authorization": self.client._session.headers[
- "x-vcloud-authorization"
- ],
- }
- response = self.perform_request(
- req_type="GET", url=vdc.get("href"), headers=headers
- )
-
- if response.status_code != 200:
- self.logger.error("Failed to get vdc content")
- raise vimconn.VimConnNotFoundException("Failed to get vdc content")
- else:
- content = XmlElementTree.fromstring(response.text)
-
- network_list = []
- try:
- for item in content:
- if item.tag.split("}")[-1] == "AvailableNetworks":
- for net in item:
- response = self.perform_request(
- req_type="GET", url=net.get("href"), headers=headers
- )
-
- if response.status_code != 200:
- self.logger.error("Failed to get network content")
- raise vimconn.VimConnNotFoundException(
- "Failed to get network content"
- )
- else:
- net_details = XmlElementTree.fromstring(response.text)
-
- filter_dict = {}
- net_uuid = net_details.get("id").split(":")
-
- if len(net_uuid) != 4:
- continue
- else:
- net_uuid = net_uuid[3]
- # create dict entry
- self.logger.debug(
- "get_vcd_network_list(): Adding network {} "
- "to a list vcd id {} network {}".format(
- net_uuid, vdc_uuid, net_details.get("name")
- )
- )
- filter_dict["name"] = net_details.get("name")
- filter_dict["id"] = net_uuid
-
- if [
- i.text
- for i in net_details
- if i.tag.split("}")[-1] == "IsShared"
- ][0] == "true":
- shared = True
- else:
- shared = False
-
- filter_dict["shared"] = shared
- filter_dict["tenant_id"] = vdc_uuid
-
- if int(net_details.get("status")) == 1:
- filter_dict["admin_state_up"] = True
- else:
- filter_dict["admin_state_up"] = False
-
- filter_dict["status"] = "ACTIVE"
- filter_dict["type"] = "bridge"
- network_list.append(filter_dict)
- self.logger.debug(
- "get_vcd_network_list adding entry {}".format(
- filter_dict
- )
- )
- except Exception:
- self.logger.debug("Error in get_vcd_network_list", exc_info=True)
- pass
-
- self.logger.debug("get_vcd_network_list returning {}".format(network_list))
-
- return network_list
-
def get_network_list(self, filter_dict={}):
"""Obtain tenant networks of VIM
Filter_dict can be:
def get_network(self, net_id):
"""Method obtains network details of net_id VIM network
- Return a dict with the fields at filter_dict (see get_network_list) plus some VIM specific>}, ...]"""
+ Return a dict with the fields at filter_dict (see get_network_list) plus some VIM specific>}, ...]
+ """
try:
_, vdc = self.get_vdc_details()
vdc_id = vdc.get("id").split(":")[3]
:param created_items: dictionary with extra items to be deleted. provided by method new_network
Returns the network identifier or raises an exception upon error or when network is not found
"""
-
- # ############# Stub code for SRIOV #################
- # dvport_group = self.get_dvport_group(net_id)
- # if dvport_group:
- # #delete portgroup
- # status = self.destroy_dvport_group(net_id)
- # if status:
- # # Remove vlanID from persistent info
- # if net_id in self.persistent_info["used_vlanIDs"]:
- # del self.persistent_info["used_vlanIDs"][net_id]
- #
- # return net_id
-
vcd_network = self.get_vcd_network(network_uuid=net_id)
if vcd_network is not None and vcd_network:
if self.delete_network_action(network_uuid=net_id):
"Exception occured while retriving catalog items {}".format(exp)
)
- def get_vappid(self, vdc=None, vapp_name=None):
- """Method takes vdc object and vApp name and returns vapp uuid or None
-
- Args:
- vdc: The VDC object.
- vapp_name: is application vappp name identifier
-
- Returns:
- The return vApp name otherwise None
- """
- if vdc is None or vapp_name is None:
- return None
-
- # UUID has following format https://host/api/vApp/vapp-30da58a3-e7c7-4d09-8f68-d4c8201169cf
- try:
- refs = [
- ref
- for ref in vdc.ResourceEntities.ResourceEntity
- if ref.name == vapp_name
- and ref.type_ == "application/vnd.vmware.vcloud.vApp+xml"
- ]
-
- if len(refs) == 1:
- return refs[0].href.split("vapp")[1][1:]
- except Exception as e:
- self.logger.exception(e)
- return False
-
- return None
-
- def check_vapp(self, vdc=None, vapp_uuid=None):
- """Method Method returns True or False if vapp deployed in vCloud director
-
- Args:
- vca: Connector to VCA
- vdc: The VDC object.
- vappid: vappid is application identifier
-
- Returns:
- The return True if vApp deployed
- :param vdc:
- :param vapp_uuid:
- """
- try:
- refs = [
- ref
- for ref in vdc.ResourceEntities.ResourceEntity
- if ref.type_ == "application/vnd.vmware.vcloud.vApp+xml"
- ]
-
- for ref in refs:
- vappid = ref.href.split("vapp")[1][1:]
- # find vapp with respected vapp uuid
-
- if vappid == vapp_uuid:
- return True
- except Exception as e:
- self.logger.exception(e)
-
- return False
-
- return False
-
def get_namebyvappid(self, vapp_uuid=None):
"""Method returns vApp name from vCD and lookup done by vapp_id.
"The upload iso task failed with status {}".format(result.get("status"))
)
- def get_vcd_availibility_zones(self, respool_href, headers):
- """Method to find presence of av zone is VIM resource pool
-
- Args:
- respool_href - resource pool href
- headers - header information
-
- Returns:
- vcd_az - list of azone present in vCD
- """
- vcd_az = []
- url = respool_href
- resp = self.perform_request(req_type="GET", url=respool_href, headers=headers)
-
- if resp.status_code != requests.codes.ok:
- self.logger.debug(
- "REST API call {} failed. Return status code {}".format(
- url, resp.status_code
- )
- )
- else:
- # Get the href to hostGroups and find provided hostGroup is present in it
- resp_xml = XmlElementTree.fromstring(resp.content)
- for child in resp_xml:
- if "VMWProviderVdcResourcePool" in child.tag:
- for schild in child:
- if "Link" in schild.tag:
- if (
- schild.attrib.get("type")
- == "application/vnd.vmware.admin.vmwHostGroupsType+xml"
- ):
- hostGroup = schild.attrib.get("href")
- hg_resp = self.perform_request(
- req_type="GET", url=hostGroup, headers=headers
- )
-
- if hg_resp.status_code != requests.codes.ok:
- self.logger.debug(
- "REST API call {} failed. Return status code {}".format(
- hostGroup, hg_resp.status_code
- )
- )
- else:
- hg_resp_xml = XmlElementTree.fromstring(
- hg_resp.content
- )
- for hostGroup in hg_resp_xml:
- if "HostGroup" in hostGroup.tag:
- # append host group name to the list
- vcd_az.append(hostGroup.attrib.get("name"))
-
- return vcd_az
-
def set_availability_zones(self):
"""
Set vim availability zone
raise vimconn.VimConnException(msg)
- # #
- # #
- # # based on current discussion
- # #
- # #
- # # server:
- # created: '2016-09-08T11:51:58'
- # description: simple-instance.linux1.1
- # flavor: ddc6776e-75a9-11e6-ad5f-0800273e724c
- # hostId: e836c036-74e7-11e6-b249-0800273e724c
- # image: dde30fe6-75a9-11e6-ad5f-0800273e724c
- # status: ACTIVE
- # error_msg:
- # interfaces: …
- #
def get_vminstance(self, vim_vm_uuid=None):
"""Returns the VM instance information from VIM"""
self.logger.debug("Client requesting vm instance {} ".format(vim_vm_uuid))
return vm_dict
- def delete_vminstance(self, vm__vim_uuid, created_items=None):
+ def delete_vminstance(self, vm_id, created_items=None, volumes_to_hold=None):
"""Method poweroff and remove VM instance from vcloud director network.
Args:
- vm__vim_uuid: VM UUID
+ vm_id: VM UUID
Returns:
Returns the instance identifier
"""
- self.logger.debug(
- "Client requesting delete vm instance {} ".format(vm__vim_uuid)
- )
+ self.logger.debug("Client requesting delete vm instance {} ".format(vm_id))
_, vdc = self.get_vdc_details()
vdc_obj = VDC(self.client, href=vdc.get("href"))
)
try:
- vapp_name = self.get_namebyvappid(vm__vim_uuid)
+ vapp_name = self.get_namebyvappid(vm_id)
if vapp_name is None:
self.logger.debug(
"delete_vminstance(): Failed to get vm by given {} vm uuid".format(
- vm__vim_uuid
+ vm_id
)
)
return (
-1,
"delete_vminstance(): Failed to get vm by given {} vm uuid".format(
- vm__vim_uuid
+ vm_id
),
)
- self.logger.info(
- "Deleting vApp {} and UUID {}".format(vapp_name, vm__vim_uuid)
- )
+ self.logger.info("Deleting vApp {} and UUID {}".format(vapp_name, vm_id))
vapp_resource = vdc_obj.get_vapp(vapp_name)
vapp = VApp(self.client, resource=vapp_resource)
if not powered_off:
self.logger.debug(
"delete_vminstance(): Failed to power off VM instance {} ".format(
- vm__vim_uuid
+ vm_id
)
)
else:
self.logger.info(
"delete_vminstance(): Powered off VM instance {} ".format(
- vm__vim_uuid
+ vm_id
)
)
if not vapp:
self.logger.debug(
"delete_vminstance(): Failed to get vm by given {} vm uuid".format(
- vm__vim_uuid
+ vm_id
)
)
return (
-1,
"delete_vminstance(): Failed to get vm by given {} vm uuid".format(
- vm__vim_uuid
+ vm_id
),
)
if not undeployed:
self.logger.debug(
"delete_vminstance(): Failed to undeploy vApp {} ".format(
- vm__vim_uuid
+ vm_id
)
)
if not vapp:
self.logger.debug(
"delete_vminstance(): Failed to get vm by given {} vm uuid".format(
- vm__vim_uuid
+ vm_id
)
)
return (
-1,
"delete_vminstance(): Failed to get vm by given {} vm uuid".format(
- vm__vim_uuid
+ vm_id
),
)
if result is None:
self.logger.debug(
- "delete_vminstance(): Failed delete uuid {} ".format(
- vm__vim_uuid
- )
+ "delete_vminstance(): Failed delete uuid {} ".format(vm_id)
)
else:
self.logger.info(
- "Deleted vm instance {} sccessfully".format(vm__vim_uuid)
+ "Deleted vm instance {} successfully".format(vm_id)
)
config_drive_catalog_name, config_drive_catalog_id = (
- "cfg_drv-" + vm__vim_uuid,
+ "cfg_drv-" + vm_id,
None,
)
catalog_list = self.get_image_list()
)
self.delete_image(config_drive_catalog_id)
- return vm__vim_uuid
+ return vm_id
except Exception:
self.logger.debug(traceback.format_exc())
raise vimconn.VimConnException(
- "delete_vminstance(): Failed delete vm instance {}".format(vm__vim_uuid)
+ "delete_vminstance(): Failed delete vm instance {}".format(vm_id)
)
def refresh_vms_status(self, vm_list):
exc_info=True,
)
- def action_vminstance(self, vm__vim_uuid=None, action_dict=None, created_items={}):
+ def action_vminstance(self, vm_id=None, action_dict=None, created_items={}):
"""Send and action over a VM instance from VIM
Returns the vm_id if the action was successfully sent to the VIM"""
self.logger.debug(
- "Received action for vm {} and action dict {}".format(
- vm__vim_uuid, action_dict
- )
+ "Received action for vm {} and action dict {}".format(vm_id, action_dict)
)
- if vm__vim_uuid is None or action_dict is None:
+ if vm_id is None or action_dict is None:
raise vimconn.VimConnException("Invalid request. VM id or action is None.")
_, vdc = self.get_vdc_details()
)
)
- vapp_name = self.get_namebyvappid(vm__vim_uuid)
+ vapp_name = self.get_namebyvappid(vm_id)
if vapp_name is None:
self.logger.debug(
"action_vminstance(): Failed to get vm by given {} vm uuid".format(
- vm__vim_uuid
+ vm_id
)
)
raise vimconn.VimConnException(
- "Failed to get vm by given {} vm uuid".format(vm__vim_uuid)
+ "Failed to get vm by given {} vm uuid".format(vm_id)
)
else:
self.logger.info(
- "Action_vminstance vApp {} and UUID {}".format(vapp_name, vm__vim_uuid)
+ "Action_vminstance vApp {} and UUID {}".format(vapp_name, vm_id)
)
try:
self.logger.info(
"action_vminstance: Power on vApp: {}".format(vapp_name)
)
- poweron_task = self.power_on_vapp(vm__vim_uuid, vapp_name)
+ poweron_task = self.power_on_vapp(vm_id, vapp_name)
result = self.client.get_task_monitor().wait_for_success(
task=poweron_task
)
self.instance_actions_result("pause", result, vapp_name)
elif "resume" in action_dict:
self.logger.info("action_vminstance: resume vApp: {}".format(vapp_name))
- poweron_task = self.power_on_vapp(vm__vim_uuid, vapp_name)
+ poweron_task = self.power_on_vapp(vm_id, vapp_name)
result = self.client.get_task_monitor().wait_for_success(
task=poweron_task
)
)
)
- return vm__vim_uuid
+ return vm_id
except Exception as exp:
self.logger.debug("action_vminstance: Failed with Exception {}".format(exp))
return console_dict
- # NOT USED METHODS in current version
-
- def host_vim2gui(self, host, server_dict):
- """Transform host dictionary from VIM format to GUI format,
- and append to the server_dict
- """
- raise vimconn.VimConnNotImplemented("Should have implemented this")
-
def get_hosts_info(self):
"""Get the information of deployed hosts
Returns the hosts content"""
Returns the hosts content"""
raise vimconn.VimConnNotImplemented("Should have implemented this")
- def get_processor_rankings(self):
- """Get the processor rankings in the VIM database"""
- raise vimconn.VimConnNotImplemented("Should have implemented this")
-
- def new_host(self, host_data):
- """Adds a new host to VIM"""
- """Returns status code of the VIM response"""
- raise vimconn.VimConnNotImplemented("Should have implemented this")
-
- def new_external_port(self, port_data):
- """Adds a external port to VIM"""
- """Returns the port identifier"""
- raise vimconn.VimConnNotImplemented("Should have implemented this")
-
- def new_external_network(self, net_name, net_type):
- """Adds a external network to VIM (shared)"""
- """Returns the network identifier"""
- raise vimconn.VimConnNotImplemented("Should have implemented this")
-
- def connect_port_network(self, port_id, network_id, admin=False):
- """Connects a external port to a network"""
- """Returns status code of the VIM response"""
- raise vimconn.VimConnNotImplemented("Should have implemented this")
-
- def new_vminstancefromJSON(self, vm_data):
- """Adds a VM instance to VIM"""
- """Returns the instance identifier"""
- raise vimconn.VimConnNotImplemented("Should have implemented this")
-
def get_network_name_by_id(self, network_uuid=None):
"""Method gets vcloud director network named based on supplied uuid.
return None
- def get_vapp_list(self, vdc_name=None):
- """
- Method retrieves vApp list deployed vCloud director and returns a dictionary
- contains a list of all vapp deployed for queried VDC.
- The key for a dictionary is vApp UUID
-
-
- Args:
- vca - is active VCA connection.
- vdc_name - is a vdc name that will be used to query vms action
-
- Returns:
- The return dictionary and key for each entry vapp UUID
- """
- vapp_dict = {}
-
- if vdc_name is None:
- return vapp_dict
-
- content = self.vms_view_action(vdc_name=vdc_name)
- try:
- vm_list_xmlroot = XmlElementTree.fromstring(content)
- for vm_xml in vm_list_xmlroot:
- if vm_xml.tag.split("}")[1] == "VMRecord":
- if vm_xml.attrib["isVAppTemplate"] == "true":
- rawuuid = vm_xml.attrib["container"].split("/")[-1:]
- if "vappTemplate-" in rawuuid[0]:
- # vm in format vappTemplate-e63d40e7-4ff5-4c6d-851f-96c1e4da86a5 we remove
- # vm and use raw UUID as key
- vapp_dict[rawuuid[0][13:]] = vm_xml.attrib
- except Exception:
- pass
-
- return vapp_dict
-
- def get_vm_list(self, vdc_name=None):
- """
- Method retrieves VM's list deployed vCloud director. It returns a dictionary
- contains a list of all VM's deployed for queried VDC.
- The key for a dictionary is VM UUID
-
-
- Args:
- vca - is active VCA connection.
- vdc_name - is a vdc name that will be used to query vms action
-
- Returns:
- The return dictionary and key for each entry vapp UUID
- """
- vm_dict = {}
-
- if vdc_name is None:
- return vm_dict
-
- content = self.vms_view_action(vdc_name=vdc_name)
- try:
- vm_list_xmlroot = XmlElementTree.fromstring(content)
- for vm_xml in vm_list_xmlroot:
- if vm_xml.tag.split("}")[1] == "VMRecord":
- if vm_xml.attrib["isVAppTemplate"] == "false":
- rawuuid = vm_xml.attrib["href"].split("/")[-1:]
- if "vm-" in rawuuid[0]:
- # vm in format vm-e63d40e7-4ff5-4c6d-851f-96c1e4da86a5 we remove
- # vm and use raw UUID as key
- vm_dict[rawuuid[0][3:]] = vm_xml.attrib
- except Exception:
- pass
-
- return vm_dict
-
def get_vapp(self, vdc_name=None, vapp_name=None, isuuid=False):
"""
Method retrieves VM deployed vCloud director. It returns VM attribute as dictionary
return None
- def create_vdc_rest(self, vdc_name=None):
+ def get_vapp_details_rest(self, vapp_uuid=None, need_admin_access=False):
"""
- Method create network in vCloud director
+ Method retrieve vapp detail from vCloud director
Args:
- vdc_name - vdc name to be created
+ vapp_uuid - is vapp identifier.
+
Returns:
- The return response
+ The return network uuid or return None
"""
- self.logger.info("Creating new vdc {}".format(vdc_name))
- vca = self.connect_as_admin()
+ parsed_respond = {}
+ vca = None
+
+ if need_admin_access:
+ vca = self.connect_as_admin()
+ else:
+ vca = self.client
if not vca:
raise vimconn.VimConnConnectionException("Failed to connect vCD")
-
- if vdc_name is None:
- return None
-
- url_list = [self.url, "/api/admin/org/", self.org_uuid]
- vm_list_rest_call = "".join(url_list)
-
- if vca._session:
- headers = {
- "Accept": "application/*+xml;version=" + API_VERSION,
- "x-vcloud-authorization": self.client._session.headers[
- "x-vcloud-authorization"
- ],
- }
- response = self.perform_request(
- req_type="GET", url=vm_list_rest_call, headers=headers
- )
- provider_vdc_ref = None
- add_vdc_rest_url = None
- # available_networks = None
-
- if response.status_code != requests.codes.ok:
- self.logger.debug(
- "REST API call {} failed. Return status code {}".format(
- vm_list_rest_call, response.status_code
- )
- )
-
- return None
- else:
- try:
- vm_list_xmlroot = XmlElementTree.fromstring(response.text)
- for child in vm_list_xmlroot:
- # application/vnd.vmware.admin.providervdc+xml
- if child.tag.split("}")[1] == "Link":
- if (
- child.attrib.get("type")
- == "application/vnd.vmware.admin.createVdcParams+xml"
- and child.attrib.get("rel") == "add"
- ):
- add_vdc_rest_url = child.attrib.get("href")
- except Exception:
- self.logger.debug(
- "Failed parse respond for rest api call {}".format(
- vm_list_rest_call
- )
- )
- self.logger.debug("Respond body {}".format(response.text))
-
- return None
-
- response = self.get_provider_rest(vca=vca)
- try:
- vm_list_xmlroot = XmlElementTree.fromstring(response)
- for child in vm_list_xmlroot:
- if child.tag.split("}")[1] == "ProviderVdcReferences":
- for sub_child in child:
- provider_vdc_ref = sub_child.attrib.get("href")
- except Exception:
- self.logger.debug(
- "Failed parse respond for rest api call {}".format(
- vm_list_rest_call
- )
- )
- self.logger.debug("Respond body {}".format(response))
-
- return None
-
- if add_vdc_rest_url is not None and provider_vdc_ref is not None:
- data = """ <CreateVdcParams name="{0:s}" xmlns="http://www.vmware.com/vcloud/v1.5"><Description>{1:s}</Description>
- <AllocationModel>ReservationPool</AllocationModel>
- <ComputeCapacity><Cpu><Units>MHz</Units><Allocated>2048</Allocated><Limit>2048</Limit></Cpu>
- <Memory><Units>MB</Units><Allocated>2048</Allocated><Limit>2048</Limit></Memory>
- </ComputeCapacity><NicQuota>0</NicQuota><NetworkQuota>100</NetworkQuota>
- <VdcStorageProfile><Enabled>true</Enabled><Units>MB</Units><Limit>20480</Limit><Default>true</Default></VdcStorageProfile>
- <ProviderVdcReference
- name="Main Provider"
- href="{2:s}" />
- <UsesFastProvisioning>true</UsesFastProvisioning></CreateVdcParams>""".format(
- escape(vdc_name), escape(vdc_name), provider_vdc_ref
- )
- headers[
- "Content-Type"
- ] = "application/vnd.vmware.admin.createVdcParams+xml"
- response = self.perform_request(
- req_type="POST",
- url=add_vdc_rest_url,
- headers=headers,
- data=data,
- )
-
- # if we all ok we respond with content otherwise by default None
- if response.status_code == 201:
- return response.text
-
- return None
-
- def get_vapp_details_rest(self, vapp_uuid=None, need_admin_access=False):
- """
- Method retrieve vapp detail from vCloud director
-
- Args:
- vapp_uuid - is vapp identifier.
-
- Returns:
- The return network uuid or return None
- """
- parsed_respond = {}
- vca = None
-
- if need_admin_access:
- vca = self.connect_as_admin()
- else:
- vca = self.client
-
- if not vca:
- raise vimconn.VimConnConnectionException("Failed to connect vCD")
- if vapp_uuid is None:
- return None
+ if vapp_uuid is None:
+ return None
url_list = [self.url, "/api/vApp/vapp-", vapp_uuid]
get_vapp_restcall = "".join(url_list)
return parsed_respond
- def acquire_console(self, vm_uuid=None):
- if vm_uuid is None:
- return None
-
- if self.client._session:
- headers = {
- "Accept": "application/*+xml;version=" + API_VERSION,
- "x-vcloud-authorization": self.client._session.headers[
- "x-vcloud-authorization"
- ],
- }
- vm_dict = self.get_vapp_details_rest(vapp_uuid=vm_uuid)
- console_dict = vm_dict["acquireTicket"]
- console_rest_call = console_dict["href"]
-
- response = self.perform_request(
- req_type="POST", url=console_rest_call, headers=headers
- )
-
- if response.status_code == 403:
- response = self.retry_rest("POST", console_rest_call)
-
- if response.status_code == requests.codes.ok:
- return response.text
-
- return None
-
def modify_vm_disk(self, vapp_uuid, flavor_disk):
"""
Method retrieve vm disk details
"affinity".format(exp)
)
- def cloud_init(self, vapp, cloud_config):
- """
- Method to inject ssh-key
- vapp - vapp object
- cloud_config a dictionary with:
- 'key-pairs': (optional) list of strings with the public key to be inserted to the default user
- 'users': (optional) list of users to be inserted, each item is a dict with:
- 'name': (mandatory) user name,
- 'key-pairs': (optional) list of strings with the public key to be inserted to the user
- 'user-data': (optional) can be a string with the text script to be passed directly to cloud-init,
- or a list of strings, each one contains a script to be passed, usually with a MIMEmultipart file
- 'config-files': (optional). List of files to be transferred. Each item is a dict with:
- 'dest': (mandatory) string with the destination absolute path
- 'encoding': (optional, by default text). Can be one of:
- 'b64', 'base64', 'gz', 'gz+b64', 'gz+base64', 'gzip+b64', 'gzip+base64'
- 'content' (mandatory): string with the content of the file
- 'permissions': (optional) string with file permissions, typically octal notation '0644'
- 'owner': (optional) file owner, string with the format 'owner:group'
- 'boot-data-drive': boolean to indicate if user-data must be passed using a boot drive (hard disk
- """
- try:
- if not isinstance(cloud_config, dict):
- raise Exception(
- "cloud_init : parameter cloud_config is not a dictionary"
- )
- else:
- key_pairs = []
- userdata = []
-
- if "key-pairs" in cloud_config:
- key_pairs = cloud_config["key-pairs"]
-
- if "users" in cloud_config:
- userdata = cloud_config["users"]
-
- self.logger.debug("cloud_init : Guest os customization started..")
- customize_script = self.format_script(
- key_pairs=key_pairs, users_list=userdata
- )
- customize_script = customize_script.replace("&", "&")
- self.guest_customization(vapp, customize_script)
- except Exception as exp:
- self.logger.error(
- "cloud_init : exception occurred while injecting " "ssh-key"
- )
-
- raise vimconn.VimConnException(
- "cloud_init : Error {} failed to inject " "ssh-key".format(exp)
- )
-
- def format_script(self, key_pairs=[], users_list=[]):
- bash_script = """#!/bin/sh
-echo performing customization tasks with param $1 at `date "+DATE: %Y-%m-%d - TIME: %H:%M:%S"`>> /root/customization.log
-if [ "$1" = "precustomization" ];then
- echo performing precustomization tasks on `date "+DATE: %Y-%m-%d - TIME: %H:%M:%S"` >> /root/customization.log
-"""
-
- keys = "\n".join(key_pairs)
- if keys:
- keys_data = """
- if [ ! -d /root/.ssh ];then
- mkdir /root/.ssh
- chown root:root /root/.ssh
- chmod 700 /root/.ssh
- touch /root/.ssh/authorized_keys
- chown root:root /root/.ssh/authorized_keys
- chmod 600 /root/.ssh/authorized_keys
- # make centos with selinux happy
- which restorecon && restorecon -Rv /root/.ssh
- else
- touch /root/.ssh/authorized_keys
- chown root:root /root/.ssh/authorized_keys
- chmod 600 /root/.ssh/authorized_keys
- fi
- echo '{key}' >> /root/.ssh/authorized_keys
- """.format(
- key=keys
- )
-
- bash_script += keys_data
-
- for user in users_list:
- if "name" in user:
- user_name = user["name"]
-
- if "key-pairs" in user:
- user_keys = "\n".join(user["key-pairs"])
- else:
- user_keys = None
-
- add_user_name = """
- useradd -d /home/{user_name} -m -g users -s /bin/bash {user_name}
- """.format(
- user_name=user_name
- )
-
- bash_script += add_user_name
-
- if user_keys:
- user_keys_data = """
- mkdir /home/{user_name}/.ssh
- chown {user_name}:{user_name} /home/{user_name}/.ssh
- chmod 700 /home/{user_name}/.ssh
- touch /home/{user_name}/.ssh/authorized_keys
- chown {user_name}:{user_name} /home/{user_name}/.ssh/authorized_keys
- chmod 600 /home/{user_name}/.ssh/authorized_keys
- # make centos with selinux happy
- which restorecon && restorecon -Rv /home/{user_name}/.ssh
- echo '{user_key}' >> /home/{user_name}/.ssh/authorized_keys
- """.format(
- user_name=user_name, user_key=user_keys
- )
- bash_script += user_keys_data
-
- return bash_script + "\n\tfi"
-
- def guest_customization(self, vapp, customize_script):
- """
- Method to customize guest os
- vapp - Vapp object
- customize_script - Customize script to be run at first boot of VM.
- """
- for vm in vapp.get_all_vms():
- vm_id = vm.get("id").split(":")[-1]
- vm_name = vm.get("name")
- vm_name = vm_name.replace("_", "-")
-
- vm_customization_url = (
- "{}/api/vApp/vm-{}/guestCustomizationSection/".format(self.url, vm_id)
- )
- headers = {
- "Accept": "application/*+xml;version=" + API_VERSION,
- "x-vcloud-authorization": self.client._session.headers[
- "x-vcloud-authorization"
- ],
- }
-
- headers[
- "Content-Type"
- ] = "application/vnd.vmware.vcloud.guestCustomizationSection+xml"
-
- data = """<GuestCustomizationSection
- xmlns="http://www.vmware.com/vcloud/v1.5"
- xmlns:ovf="http://schemas.dmtf.org/ovf/envelope/1"
- ovf:required="false" href="{}"
- type="application/vnd.vmware.vcloud.guestCustomizationSection+xml">
- <ovf:Info>Specifies Guest OS Customization Settings</ovf:Info>
- <Enabled>true</Enabled>
- <ChangeSid>false</ChangeSid>
- <VirtualMachineId>{}</VirtualMachineId>
- <JoinDomainEnabled>false</JoinDomainEnabled>
- <UseOrgSettings>false</UseOrgSettings>
- <AdminPasswordEnabled>false</AdminPasswordEnabled>
- <AdminPasswordAuto>true</AdminPasswordAuto>
- <AdminAutoLogonEnabled>false</AdminAutoLogonEnabled>
- <AdminAutoLogonCount>0</AdminAutoLogonCount>
- <ResetPasswordRequired>false</ResetPasswordRequired>
- <CustomizationScript>{}</CustomizationScript>
- <ComputerName>{}</ComputerName>
- <Link href="{}"
- type="application/vnd.vmware.vcloud.guestCustomizationSection+xml" rel="edit"/>
- </GuestCustomizationSection>
- """.format(
- vm_customization_url,
- vm_id,
- customize_script,
- vm_name,
- vm_customization_url,
- )
-
- response = self.perform_request(
- req_type="PUT", url=vm_customization_url, headers=headers, data=data
- )
- if response.status_code == 202:
- guest_task = self.get_task_from_response(response.text)
- self.client.get_task_monitor().wait_for_success(task=guest_task)
- self.logger.info(
- "guest_customization : customized guest os task "
- "completed for VM {}".format(vm_name)
- )
- else:
- self.logger.error(
- "guest_customization : task for customized guest os"
- "failed for VM {}".format(vm_name)
- )
-
- raise vimconn.VimConnException(
- "guest_customization : failed to perform"
- "guest os customization on VM {}".format(vm_name)
- )
-
def add_new_disk(self, vapp_uuid, disk_size):
"""
Method to create an empty vm disk
elif exp_type == "NotFound":
raise vimconn.VimConnNotFoundException(message=msg)
- def add_sriov(self, vapp_uuid, sriov_nets, vmname_andid):
- """
- Method to attach SRIOV adapters to VM
-
- Args:
- vapp_uuid - uuid of vApp/VM
- sriov_nets - SRIOV devices infromation as specified in VNFD (flavor)
- vmname_andid - vmname
-
- Returns:
- The status of add SRIOV adapter task , vm object and
- vcenter_conect object
- """
- vm_obj = None
- vcenter_conect, content = self.get_vcenter_content()
- vm_moref_id = self.get_vm_moref_id(vapp_uuid)
-
- if vm_moref_id:
- try:
- no_of_sriov_devices = len(sriov_nets)
- if no_of_sriov_devices > 0:
- # Get VM and its host
- host_obj, vm_obj = self.get_vm_obj(content, vm_moref_id)
- self.logger.info(
- "VM {} is currently on host {}".format(vm_obj, host_obj)
- )
-
- if host_obj and vm_obj:
- # get SRIOV devies from host on which vapp is currently installed
- avilable_sriov_devices = self.get_sriov_devices(
- host_obj,
- no_of_sriov_devices,
- )
-
- if len(avilable_sriov_devices) == 0:
- # find other hosts with active pci devices
- (
- new_host_obj,
- avilable_sriov_devices,
- ) = self.get_host_and_sriov_devices(
- content,
- no_of_sriov_devices,
- )
-
- if (
- new_host_obj is not None
- and len(avilable_sriov_devices) > 0
- ):
- # Migrate vm to the host where SRIOV devices are available
- self.logger.info(
- "Relocate VM {} on new host {}".format(
- vm_obj, new_host_obj
- )
- )
- task = self.relocate_vm(new_host_obj, vm_obj)
-
- if task is not None:
- result = self.wait_for_vcenter_task(
- task, vcenter_conect
- )
- self.logger.info(
- "Migrate VM status: {}".format(result)
- )
- host_obj = new_host_obj
- else:
- self.logger.info(
- "Fail to migrate VM : {}".format(result)
- )
-
- raise vimconn.VimConnNotFoundException(
- "Fail to migrate VM : {} to host {}".format(
- vmname_andid, new_host_obj
- )
- )
-
- if (
- host_obj is not None
- and avilable_sriov_devices is not None
- and len(avilable_sriov_devices) > 0
- ):
- # Add SRIOV devices one by one
- for sriov_net in sriov_nets:
- network_name = sriov_net.get("net_id")
- self.create_dvPort_group(network_name)
-
- if (
- sriov_net.get("type") == "VF"
- or sriov_net.get("type") == "SR-IOV"
- ):
- # add vlan ID ,Modify portgroup for vlan ID
- self.configure_vlanID(
- content, vcenter_conect, network_name
- )
-
- task = self.add_sriov_to_vm(
- content,
- vm_obj,
- host_obj,
- network_name,
- avilable_sriov_devices[0],
- )
-
- if task:
- status = self.wait_for_vcenter_task(
- task, vcenter_conect
- )
-
- if status:
- self.logger.info(
- "Added SRIOV {} to VM {}".format(
- no_of_sriov_devices, str(vm_obj)
- )
- )
- else:
- self.logger.error(
- "Fail to add SRIOV {} to VM {}".format(
- no_of_sriov_devices, str(vm_obj)
- )
- )
-
- raise vimconn.VimConnUnexpectedResponse(
- "Fail to add SRIOV adapter in VM {}".format(
- str(vm_obj)
- )
- )
-
- return True, vm_obj, vcenter_conect
- else:
- self.logger.error(
- "Currently there is no host with"
- " {} number of avaialble SRIOV "
- "VFs required for VM {}".format(
- no_of_sriov_devices, vmname_andid
- )
- )
-
- raise vimconn.VimConnNotFoundException(
- "Currently there is no host with {} "
- "number of avaialble SRIOV devices required for VM {}".format(
- no_of_sriov_devices, vmname_andid
- )
- )
- else:
- self.logger.debug(
- "No infromation about SRIOV devices {} ", sriov_nets
- )
- except vmodl.MethodFault as error:
- self.logger.error("Error occurred while adding SRIOV {} ", error)
-
- return None, vm_obj, vcenter_conect
-
def get_sriov_devices(self, host, no_of_vfs):
"""
Method to get the details of SRIOV devices on given host
return sriovInfo
- def get_host_and_sriov_devices(self, content, no_of_vfs):
- """
- Method to get the details of SRIOV devices infromation on all hosts
-
- Args:
- content - vSphere host object
- no_of_vfs - number of pci VFs needed on host
-
- Returns:
- array of SRIOV devices and host object
- """
- host_obj = None
- sriov_device_objs = None
-
- try:
- if content:
- container = content.viewManager.CreateContainerView(
- content.rootFolder, [vim.HostSystem], True
- )
-
- for host in container.view:
- devices = self.get_sriov_devices(host, no_of_vfs)
-
- if devices:
- host_obj = host
- sriov_device_objs = devices
- break
- except Exception as exp:
- self.logger.error(
- "Error {} occurred while finding SRIOV devices on host: {}".format(
- exp, host_obj
- )
- )
-
- return host_obj, sriov_device_objs
-
- def add_sriov_to_vm(self, content, vm_obj, host_obj, network_name, sriov_device):
- """
- Method to add SRIOV adapter to vm
-
- Args:
- host_obj - vSphere host object
- vm_obj - vSphere vm object
- content - vCenter content object
- network_name - name of distributed virtaul portgroup
- sriov_device - SRIOV device info
-
- Returns:
- task object
- """
- devices = []
- vnic_label = "sriov nic"
-
- try:
- dvs_portgr = self.get_dvport_group(network_name)
- network_name = dvs_portgr.name
- nic = vim.vm.device.VirtualDeviceSpec()
- # VM device
- nic.operation = vim.vm.device.VirtualDeviceSpec.Operation.add
- nic.device = vim.vm.device.VirtualSriovEthernetCard()
- nic.device.addressType = "assigned"
- # nic.device.key = 13016
- nic.device.deviceInfo = vim.Description()
- nic.device.deviceInfo.label = vnic_label
- nic.device.deviceInfo.summary = network_name
- nic.device.backing = vim.vm.device.VirtualEthernetCard.NetworkBackingInfo()
-
- nic.device.backing.network = self.get_obj(
- content, [vim.Network], network_name
- )
- nic.device.backing.deviceName = network_name
- nic.device.backing.useAutoDetect = False
- nic.device.connectable = vim.vm.device.VirtualDevice.ConnectInfo()
- nic.device.connectable.startConnected = True
- nic.device.connectable.allowGuestControl = True
-
- nic.device.sriovBacking = (
- vim.vm.device.VirtualSriovEthernetCard.SriovBackingInfo()
- )
- nic.device.sriovBacking.physicalFunctionBacking = (
- vim.vm.device.VirtualPCIPassthrough.DeviceBackingInfo()
- )
- nic.device.sriovBacking.physicalFunctionBacking.id = sriov_device.id
-
- devices.append(nic)
- vmconf = vim.vm.ConfigSpec(deviceChange=devices)
- task = vm_obj.ReconfigVM_Task(vmconf)
-
- return task
- except Exception as exp:
- self.logger.error(
- "Error {} occurred while adding SRIOV adapter in VM: {}".format(
- exp, vm_obj
- )
- )
-
- return None
-
- def create_dvPort_group(self, network_name):
- """
- Method to create disributed virtual portgroup
-
- Args:
- network_name - name of network/portgroup
-
- Returns:
- portgroup key
- """
- try:
- new_network_name = [network_name, "-", str(uuid.uuid4())]
- network_name = "".join(new_network_name)
- vcenter_conect, content = self.get_vcenter_content()
-
- dv_switch = self.get_obj(
- content, [vim.DistributedVirtualSwitch], self.dvs_name
- )
-
- if dv_switch:
- dv_pg_spec = vim.dvs.DistributedVirtualPortgroup.ConfigSpec()
- dv_pg_spec.name = network_name
-
- dv_pg_spec.type = (
- vim.dvs.DistributedVirtualPortgroup.PortgroupType.earlyBinding
- )
- dv_pg_spec.defaultPortConfig = (
- vim.dvs.VmwareDistributedVirtualSwitch.VmwarePortConfigPolicy()
- )
- dv_pg_spec.defaultPortConfig.securityPolicy = (
- vim.dvs.VmwareDistributedVirtualSwitch.SecurityPolicy()
- )
- dv_pg_spec.defaultPortConfig.securityPolicy.allowPromiscuous = (
- vim.BoolPolicy(value=False)
- )
- dv_pg_spec.defaultPortConfig.securityPolicy.forgedTransmits = (
- vim.BoolPolicy(value=False)
- )
- dv_pg_spec.defaultPortConfig.securityPolicy.macChanges = vim.BoolPolicy(
- value=False
- )
-
- task = dv_switch.AddDVPortgroup_Task([dv_pg_spec])
- self.wait_for_vcenter_task(task, vcenter_conect)
-
- dvPort_group = self.get_obj(
- content, [vim.dvs.DistributedVirtualPortgroup], network_name
- )
-
- if dvPort_group:
- self.logger.info(
- "Created disributed virtaul port group: {}".format(dvPort_group)
- )
- return dvPort_group.key
- else:
- self.logger.debug(
- "No disributed virtual switch found with name {}".format(
- network_name
- )
- )
-
- except Exception as exp:
- self.logger.error(
- "Error occurred while creating disributed virtaul port group {}"
- " : {}".format(network_name, exp)
- )
-
- return None
-
def reconfig_portgroup(self, content, dvPort_group_name, config_info={}):
"""
Method to reconfigure disributed virtual portgroup
return None
- def destroy_dvport_group(self, dvPort_group_name):
- """
- Method to destroy disributed virtual portgroup
-
- Args:
- network_name - name of network/portgroup
-
- Returns:
- True if portgroup successfully got deleted else false
- """
- vcenter_conect, _ = self.get_vcenter_content()
-
- try:
- status = None
- dvPort_group = self.get_dvport_group(dvPort_group_name)
-
- if dvPort_group:
- task = dvPort_group.Destroy_Task()
- status = self.wait_for_vcenter_task(task, vcenter_conect)
-
- return status
- except vmodl.MethodFault as exp:
- self.logger.error(
- "Caught vmodl fault {} while deleting disributed virtaul port group {}".format(
- exp, dvPort_group_name
- )
- )
-
- return None
-
def get_dvport_group(self, dvPort_group_name):
"""
Method to get disributed virtual portgroup
return vlanId
- def configure_vlanID(self, content, vcenter_conect, dvPort_group_name):
- """
- Method to configure vlanID in disributed virtual portgroup vlanID
-
- Args:
- network_name - name of network/portgroup
-
- Returns:
- None
- """
- vlanID = self.get_vlanID_from_dvs_portgr(dvPort_group_name)
-
- if vlanID == 0:
- # configure vlanID
- vlanID = self.genrate_vlanID(dvPort_group_name)
- config = {"vlanID": vlanID}
- task = self.reconfig_portgroup(
- content, dvPort_group_name, config_info=config
- )
-
- if task:
- status = self.wait_for_vcenter_task(task, vcenter_conect)
-
- if status:
- self.logger.info(
- "Reconfigured Port group {} for vlan ID {}".format(
- dvPort_group_name, vlanID
- )
- )
- else:
- self.logger.error(
- "Fail reconfigure portgroup {} for vlanID{}".format(
- dvPort_group_name, vlanID
- )
- )
-
- def genrate_vlanID(self, network_name):
- """
- Method to get unused vlanID
- Args:
- network_name - name of network/portgroup
- Returns:
- vlanID
- """
- vlan_id = None
- used_ids = []
-
- if self.config.get("vlanID_range") is None:
- raise vimconn.VimConnConflictException(
- "You must provide a 'vlanID_range' "
- "at config value before creating sriov network with vlan tag"
- )
-
- if "used_vlanIDs" not in self.persistent_info:
- self.persistent_info["used_vlanIDs"] = {}
- else:
- used_ids = list(self.persistent_info["used_vlanIDs"].values())
-
- for vlanID_range in self.config.get("vlanID_range"):
- start_vlanid, end_vlanid = vlanID_range.split("-")
-
- if start_vlanid > end_vlanid:
- raise vimconn.VimConnConflictException(
- "Invalid vlan ID range {}".format(vlanID_range)
- )
-
- for vid in range(int(start_vlanid), int(end_vlanid) + 1):
- if vid not in used_ids:
- vlan_id = vid
- self.persistent_info["used_vlanIDs"][network_name] = vlan_id
- return vlan_id
-
- if vlan_id is None:
- raise vimconn.VimConnConflictException("All Vlan IDs are in use")
-
- def get_obj(self, content, vimtype, name):
- """
- Get the vsphere object associated with a given text name
- """
- obj = None
- container = content.viewManager.CreateContainerView(
- content.rootFolder, vimtype, True
- )
-
- for item in container.view:
- if item.name == name:
- obj = item
- break
-
- return obj
-
def insert_media_to_vm(self, vapp, image_id):
"""
Method to insert media CD-ROM (ISO image) from catalog to vm.