blob: 2285f216ca7e8a466cbdd14fbb02cfbac45085c8 [file] [log] [blame]
# -*- coding: utf-8 -*-
##
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
"""
This module contains unit tests for the GCP VIM connector
Run this directly with python2 or python3.
"""
from datetime import datetime
import json
import logging
from osm_rovim_gcp.vimconn_gcp import vimconnector
__author__ = "Sergio G.R."
__date__ = "$05-nov-2021 12:00:00$"
class TestGCPOperations:
    """Manual test scenarios for the GCP VIM connector.

    The scenarios are driven by the ``__main__`` section of this file:
    :meth:`setUp` must be called first with the test environment values, then
    any of the ``test_*`` methods can be run.
    """

    # setUp() needs explicit arguments, so a test runner that automatically
    # collects "Test*" classes could only run the scenarios with gcp_conn
    # still set to None. Opt out of that automatic collection.
    __test__ = False

    gcp_conn = None
    # Computed once, when the class is created: every resource name below
    # shares the same timestamp suffix.
    time_id = datetime.today().strftime("%Y%m%d%H%M%S")
    vim_id = "gcp-test-" + time_id
    vim_name = vim_id
    vm_name = "gcp-test-vm-" + time_id
    net_name = "gcp-test-net-" + time_id
    cloud_config = None
    # Class-level default only; setUp() replaces it with a per-instance dict
    config = {}
    credentials_file = None
    image_id = None
    image_connector_id = None
    flavor_id = None

    def setUp(
        self,
        project_name,
        region_name,
        credentials_file,
        image_id,
        image_connector_id,
        flavor_id,
    ):
        """Store the test parameters and instantiate the VIM connector.

        :param project_name: stored as "project_name" in the connector config
            and used as tenant id and tenant name
        :param region_name: stored as "region_name" in the connector config
        :param credentials_file: path of the JSON file with the credentials
        :param image_id: image used by the VM scenarios
        :param image_connector_id: image name looked up through the connector
        :param flavor_id: flavor used by the VM scenarios
        :raises Exception: if the credentials file does not contain valid JSON
        """
        # Build a fresh dict so the class-level default is never mutated
        self.config = {
            "project_name": project_name,
            "region_name": region_name,
        }
        try:
            with open(credentials_file) as file:
                self.config["credentials"] = json.load(file)
        except ValueError:
            # The "credentials" key is not set when loading fails, so the
            # message must refer to the file path instead
            raise Exception(
                "Not possible to read credentials JSON file %s" % credentials_file
            )
        self.credentials_file = credentials_file
        self.image_id = image_id
        self.image_connector_id = image_connector_id
        self.flavor_id = flavor_id
        # instantiate dummy VIM connector so we can test it
        self.gcp_conn = vimconnector(
            uuid=self.vim_id,
            name=self.vim_name,
            tenant_id=project_name,
            tenant_name=project_name,
            url=None,
            url_admin=None,
            user=None,
            passwd=None,
            log_level=None,
            config=self.config,
        )

    def _delete_networks(self, networks):
        """Delete every network in *networks* (values returned by new_network)."""
        for network in networks:
            self.gcp_conn.delete_network(network[0])

    def _check_vminstance(self, image_id, flavor_id, net_list):
        """Create a started VM with the given parameters and delete it again."""
        vm_id = self.gcp_conn.new_vminstance(
            name=self.vm_name,
            description="testvm",
            start=True,
            image_id=image_id,
            flavor_id=flavor_id,
            net_list=net_list,
            cloud_config=self.cloud_config,
        )
        self.gcp_conn.delete_vminstance(vm_id[0])

    def test_networks(self):
        """Create two networks with different subnets and delete them."""
        networks = []
        try:
            networks.append(
                self.gcp_conn.new_network(
                    self.net_name, None, {"subnet_address": "10.0.0.0/25"}
                )
            )
            networks.append(
                self.gcp_conn.new_network(
                    self.net_name, None, {"subnet_address": "10.9.0.0/25"}
                )
            )
        finally:
            # Remove whatever was created, even if the second creation failed
            self._delete_networks(networks)

    def test_vminstances_default(self):
        """Create and delete a VM attached to the "default" network."""
        self._check_vminstance(
            image_id=self.image_id,
            flavor_id=self.flavor_id,
            net_list=[{"name": "default", "use": "mgmt"}],
        )

    def test_vminstances_2_nets(self):
        """Create and delete a VM attached to two freshly created networks."""
        networks = []
        try:
            networks.append(
                self.gcp_conn.new_network(
                    self.net_name, None, {"subnet_address": "10.0.0.0/25"}
                )
            )
            networks.append(
                self.gcp_conn.new_network(
                    self.net_name, None, {"subnet_address": "10.9.0.0/25"}
                )
            )
            self._check_vminstance(
                image_id=self.image_id,
                flavor_id=self.flavor_id,
                net_list=[
                    {"net_id": networks[0][0], "use": "mgmt"},
                    {"net_id": networks[1][0], "use": "internal"},
                ],
            )
        finally:
            # Do not leave test networks behind when a step above fails
            self._delete_networks(networks)

    def test_vminstances_image_connector_id(self):
        """Create and delete a VM using an image looked up by name."""
        images = self.gcp_conn.get_image_list({"name": self.image_connector_id})
        if not images:
            # Fail with an explicit message rather than a bare IndexError
            raise Exception("No image found with name %s" % self.image_connector_id)
        self._check_vminstance(
            image_id=images[0].get("id"),
            flavor_id=self.flavor_id,
            net_list=[{"name": "default", "use": "mgmt"}],
        )

    def test_vminstances_flavor(self):
        """Create and delete a VM using a flavor resolved from flavor data."""
        machine_type = self.gcp_conn.get_flavor_id_from_data(
            {
                "disk": 10,
                "ram": 2048,
                "vcpus": 1,
                "extended": {"mempage-size": "LARGE", "numas": [{"threads": 1}]},
            }
        )
        self._check_vminstance(
            image_id=self.image_id,
            flavor_id=machine_type,
            net_list=[{"name": "default", "use": "mgmt"}],
        )
def parse_gcp_env(lines):
    """Parse the ``VAR=value`` lines of a GCP test environment file.

    Blank lines and lines starting with "#" are skipped. Only the first "="
    of a line separates the variable name from its value.

    :param lines: iterable of text lines (for instance an open file)
    :return: dict mapping every variable name found to its value
    :raises ValueError: if a line with content has no "=" separator
    """
    settings = {}
    for raw_line in lines:
        line = raw_line.rstrip("\r\n")
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=" themselves
        var, separator, value = line.partition("=")
        if not separator:
            raise ValueError("Missing '=' in line: %s" % line)
        settings[var] = value
    return settings


if __name__ == "__main__":
    # Setting logging parameters:
    log_format = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(funcName)s(): %(message)s"
    log_formatter = logging.Formatter(log_format, datefmt="%Y-%m-%dT%H:%M:%S")
    handler = logging.StreamHandler()
    handler.setFormatter(log_formatter)
    logger = logging.getLogger("ro.vim.gcp")
    logger.setLevel(level=logging.DEBUG)
    logger.addHandler(handler)

    # Setting relevant values for the tests from environment file
    gcp_env_file = "gcp.env"
    try:
        with open(gcp_env_file) as f:
            gcp_env = parse_gcp_env(f)
    except ValueError:
        raise Exception("Wrong format of GCP test environment file")

    # dict.get() yields None for a variable missing from the file, so the
    # check below reports it instead of failing with a NameError
    project_name = gcp_env.get("GCP_PROJECT")
    region_name = gcp_env.get("GCP_REGION")
    credentials_file = gcp_env.get("GCP_CREDENTIALS")
    image_id = gcp_env.get("GCP_IMAGE")
    image_connector_id = gcp_env.get("GCP_IMAGE_CONNECTOR_ID")
    flavor_id = gcp_env.get("GCP_FLAVOR")

    if (
        project_name is None
        or region_name is None
        or credentials_file is None
        or image_id is None
        or image_connector_id is None
        or flavor_id is None
    ):
        raise Exception(
            "GCP test environment file must include at least GCP_PROJECT, GCP_REGION, "
            "GCP_CREDENTIALS, GCP_IMAGE, GCP_IMAGE_CONNECTOR_ID and GCP_FLAVOR"
        )

    test_gcp = TestGCPOperations()
    test_gcp.setUp(
        project_name,
        region_name,
        credentials_file,
        image_id,
        image_connector_id,
        flavor_id,
    )
    test_gcp.test_networks()
    test_gcp.test_vminstances_default()
    test_gcp.test_vminstances_2_nets()
    test_gcp.test_vminstances_image_connector_id()
    test_gcp.test_vminstances_flavor()