== "persistent-storage:persistent-storage"
):
for vdu_volume in vdu_instantiation_volumes_list:
-
if (
vdu_volume["vim-volume-id"]
and root_disk["id"] == vdu_volume["name"]
):
-
persistent_root_disk[vsd["id"]] = {
"vim_volume_id": vdu_volume["vim-volume-id"],
"image_id": vdu.get("sw-image-desc"),
return persistent_root_disk
else:
-
if root_disk.get("size-of-storage"):
persistent_root_disk[vsd["id"]] = {
"image_id": vdu.get("sw-image-desc"),
and disk["id"] not in persistent_root_disk.keys()
):
for vdu_volume in vdu_instantiation_volumes_list:
-
if vdu_volume["vim-volume-id"] and disk["id"] == vdu_volume["name"]:
-
persistent_disk[disk["id"]] = {
"vim_volume_id": vdu_volume["vim-volume-id"],
}
net_list (list): Net list of VDU
"""
for iface_index, interface in enumerate(target_vdu["interfaces"]):
-
net_text = Ns._check_vld_information_of_interfaces(
interface, ns_preffix, vnf_preffix
)
True if i.get("position") is not None else False
for i in target_vdu["interfaces"]
):
-
Ns._sort_vdu_interfaces(target_vdu)
# If the position info is provided for some interfaces but not all of them, the interfaces
# which has specific position numbers will be placed and others' positions will not be taken care.
else:
-
Ns._partially_locate_vdu_interfaces(target_vdu)
# If the position info is not provided for the interfaces, interfaces will be attached
)
if vdu_instantiation_volumes_list:
-
# Find the root volumes and add to the disk_list
persistent_root_disk = Ns.find_persistent_root_volumes(
vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
vim_details = yaml.safe_load(f"{vim_details_text}")
for iface_index, interface in enumerate(existing_vdu["interfaces"]):
-
if "port-security-enabled" in interface:
interface["port_security"] = interface.pop("port-security-enabled")
task_index += 1
break
else:
-
for vdu_index, vdu in enumerate(db_vnfr["vdur"]):
extra_dict["params"] = {
"vim_vm_id": vdu["vim-id"],
return self.new(ro_task, task_create_index, None)
def new(self, ro_task, task_index, task_depends):
-
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
target_vim = self.my_vims[ro_task["target_id"]]
)
if task["action"] == "DELETE":
- (new_status, db_vim_info_update,) = self._delete_task(
+ (
+ new_status,
+ db_vim_info_update,
+ ) = self._delete_task(
ro_task, task_index, task_depends, db_ro_task_update
)
new_status = (
else:
refresh_at = ro_task["vim_info"]["refresh_at"]
if refresh_at and refresh_at != -1 and now > refresh_at:
- (new_status, db_vim_info_update,) = self.item2class[
+ (
+ new_status,
+ db_vim_info_update,
+ ) = self.item2class[
task["item"]
].refresh(ro_task)
_update_refresh(new_status)
}
def _format_in(self, kwargs):
-
error_text = ""
try:
indata = None
self,
epa_params,
):
-
target_flavor = {}
indata = {
"vnf": [
self,
epa_params,
):
-
target_flavor = {
"no-target-flavor": "here",
}
self,
epa_params,
):
-
expected_result = {
"find_params": {
"flavor_data": {
self,
epa_params,
):
-
expected_result = {
"find_params": {
"flavor_data": {
self,
epa_params,
):
-
expected_result = {
"find_params": {
"flavor_data": {
self,
epa_params,
):
-
kwargs = {
"db": db,
}
self,
epa_params,
):
-
expected_result = {
"find_params": {
"flavor_data": {
self,
epa_params,
):
-
kwargs = {
"db": db,
}
self, mock_volume_keeping_required
):
"""Find persistent ordinary volume, volume id is not persistent_root_disk dict,
- vim-volume-id is given as instantiation parameter but disk id is not matching."""
+ vim-volume-id is given as instantiation parameter but disk id is not matching.
+ """
mock_volume_keeping_required.return_value = True
vdu_instantiation_volumes_list = [
{
class LockRenew:
-
renew_list = []
# ^ static method, common for all RO. Time ordered list of dictionaries with information of locks that needs to
# be renewed. The time order is achieved as it is appended at the end
@patch("osm_rosdn_juniper_contrail.sdn_api.UnderlayApi")
def test_juniper_contrail_sdn_with_ssl_cert(self, mock_underlay_api):
-
config = {
"ca_cert": "/path/to/certfile",
"project": "test_project",
else:
for index, subnet in enumerate(net_list):
-
net_intr = self.conn_vpc.create_network_interface(
subnet_id=subnet.get("net_id"),
groups=None,
class vimconnector(vimconn.VimConnector):
-
# Translate azure provisioning state to OSM provision state
# The first three ones are the transitional status once a user initiated action has been requested
# Once the operation is complete, it will transition into the states Succeeded or Failed
self._format_vimconn_exception(e)
def _build_os_profile(self, vm_name, cloud_config, image_id):
-
# initial os_profile
os_profile = {"computer_name": vm_name}
self._format_vimconn_exception(e)
def delete_inuse_nic(self, nic_name):
-
# Obtain nic data
nic_data = self.conn_vnet.network_interfaces.get(self.resource_group, nic_name)
# TODO - check if there is a public ip to delete and delete it
if network_interfaces:
-
# Deallocate the vm
async_vm_deallocate = (
self.conn_compute.virtual_machines.begin_deallocate(
class vimconnector(vimconn.VimConnector):
-
# Translate Google Cloud provisioning state to OSM provision state
# The first three ones are the transitional status once a user initiated action has been requested
# Once the operation is complete, it will transition into the states Succeeded or Failed
self.logger.debug("create network name %s, ip_profile %s", net_name, ip_profile)
try:
-
self.logger.debug("creating network_name: {}".format(net_name))
network = "projects/{}/global/networks/default".format(self.project)
)
try:
-
self.logger.debug("creating subnet_name: {}".format(subnet_name))
subnetwork_body = {
)
try:
-
if self.reload_client:
self._reload_connection()
self.logger.debug("Deleting network: {}".format(str(net_id)))
try:
-
net_name = self._get_resource_name_from_resource_id(net_id)
# Check associated VMs
return new_flavor.id
except nvExceptions.Conflict as e:
-
if change_name_if_used and retry < max_retries:
continue
# For VF
elif net["type"] == "VF" or net["type"] == "SR-IOV":
-
port_dict["binding:vnic_type"] = "direct"
# VIO specific Changes
key_id = "vim_volume_id" if "vim_volume_id" in disk.keys() else "vim_id"
if disk.get(key_id):
-
block_device_mapping["vd" + chr(base_disk_index)] = disk[key_id]
existing_vim_volumes.append({"id": disk[key_id]})
key_id = "vim_volume_id" if "vim_volume_id" in disk.keys() else "vim_id"
if disk.get(key_id):
-
# Use existing persistent volume
block_device_mapping["vd" + chr(base_disk_index)] = disk[key_id]
existing_vim_volumes.append({"id": disk[key_id]})
# In case of RO in HA there can be conflicts, two RO trying to assign same floating IP, so retry
# several times
while not assigned:
-
free_floating_ip = self._get_free_floating_ip(
server, floating_network
)
self.neutron.update_port(port[0], port_update)
except Exception:
-
raise vimconn.VimConnException(
"It was not possible to disable port security for port {}".format(
port[0]
k_id (str): Port id in the VIM
"""
try:
-
port_dict = self.neutron.list_ports()
existing_ports = [port["id"] for port in port_dict["ports"] if port_dict]
self.neutron.delete_port(k_id)
except Exception as e:
-
self.logger.error("Error deleting port: {}: {}".format(type(e).__name__, e))
def _delete_volumes_by_id_wth_cinder(
k_item, k_id = self._get_item_name_id(k)
if k_item == "volume":
-
unavailable_vol = self._delete_volumes_by_id_wth_cinder(
k, k_id, volumes_to_hold, created_items
)
keep_waiting = True
elif k_item == "floating_ip":
-
self._delete_floating_ip_by_id(k, k_id, created_items)
except Exception as e:
def action_vminstance(self, vm_id, action_dict, created_items={}):
"""Send and action over a VM instance from VIM
- Returns None or the console dict if the action was successfully sent to the VIM"""
+ Returns None or the console dict if the action was successfully sent to the VIM
+ """
self.logger.debug("Action over VM '%s': %s", vm_id, str(action_dict))
try:
def get_network(self, net_id):
"""Method obtains network details of net_id VIM network
- Return a dict with the fields at filter_dict (see get_network_list) plus some VIM specific>}, ...]"""
+ Return a dict with the fields at filter_dict (see get_network_list) plus some VIM specific>}, ...]
+ """
try:
_, vdc = self.get_vdc_details()
vdc_id = vdc.get("id").split(":")[3]
class TestGCPOperations:
-
gcp_conn = None
time_id = datetime.today().strftime("%Y%m%d%H%M%S")
vim_id = "gcp-test-" + time_id
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+---
+fixes:
+ - |
+ Reformat files according to new black validation.