# devops-stages/stage-build.sh
#
-FROM ubuntu:20.04
+FROM ubuntu:22.04
ARG APT_PROXY
RUN if [ ! -z $APT_PROXY ] ; then \
python3 \
python3-all \
python3-dev \
- python3-setuptools
-
-RUN python3 -m easy_install pip==21.3.1
-RUN pip install tox==3.24.5
+ python3-setuptools \
+ python3-pip \
+ tox
timeout_large = 120
timeout_short = 30
- def __init__(self, loop, uri, **kwargs):
- self.loop = loop
+ def __init__(self, uri, **kwargs):
self.uri = uri
self.username = kwargs.get("username")
except asyncio.TimeoutError:
raise ROClientException("Timeout", http_code=504)
except Exception as e:
+ self.logger.critical(
+ "Got invalid version text: '{}'; causing exception {}".format(
+ response_text, str(e)
+ )
+ )
raise ROClientException(
"Got invalid version text: '{}'; causing exception {}".format(
response_text, e
raise ROClientException("Invalid item {}".format(item))
if item == "tenant":
all_tenants = None
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
content = await self._list_item(
session,
self.client_to_RO[item],
elif item == "vim_account":
all_tenants = False
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
content = await self._get_item(
session,
self.client_to_RO[item],
if item in ("tenant", "vim", "wim"):
all_tenants = None
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
result = await self._del_item(
session,
self.client_to_RO[item],
create_desc = self._create_envelop(item, desc)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
_all_tenants = all_tenants
if item == "vim":
_all_tenants = True
create_desc = self._create_envelop(item, desc)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
outdata = await self._create_item(
session,
self.client_to_RO[item],
# create_desc = self._create_envelop(item, desc)
create_desc = desc
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
_all_tenants = all_tenants
if item == "vim":
_all_tenants = True
)
create_desc = self._create_envelop(item, desc)
payload_req = yaml.safe_dump(create_desc)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
# check that exist
item_id = await self._get_item_uuid(
session, self.client_to_RO[item], item_id_name, all_tenants=True
async def detach(self, item, item_id_name=None):
# TODO replace the code with delete_item(vim_account,...)
try:
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
# check that exist
item_id = await self._get_item_uuid(
session, self.client_to_RO[item], item_id_name, all_tenants=False
raise ROClientException(str(content), http_code=mano_response.status)
else:
raise ROClientException("Unknown value for action '{}".format(str(action)))
-
-
-if __name__ == "__main__":
- RO_URL = "http://localhost:9090/openmano"
- TEST_TENANT = "myTenant"
- TEST_VIM1 = "myvim"
- TEST_URL1 = "https://localhost:5000/v1"
- TEST_TYPE1 = "openstack"
- TEST_CONFIG1 = {"use_floating_ip": True}
- TEST_VIM2 = "myvim2"
- TEST_URL2 = "https://localhost:5000/v2"
- TEST_TYPE2 = "openvim"
- TEST_CONFIG2 = {"config2": "config2", "config3": True}
-
- streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s"
- logging.basicConfig(format=streamformat)
- logger = logging.getLogger("ROClient")
-
- tenant_id = None
- vim_id = False
- loop = asyncio.get_event_loop()
- myClient = ROClient(uri=RO_URL, loop=loop, loglevel="DEBUG")
- try:
- # test tenant
- content = loop.run_until_complete(myClient.get_list("tenant"))
- print("tenants", content)
- content = loop.run_until_complete(myClient.create("tenant", name=TEST_TENANT))
- tenant_id = True
- content = loop.run_until_complete(myClient.show("tenant", TEST_TENANT))
- print("tenant", TEST_TENANT, content)
- content = loop.run_until_complete(
- myClient.edit("tenant", TEST_TENANT, description="another description")
- )
- content = loop.run_until_complete(myClient.show("tenant", TEST_TENANT))
- print("tenant edited", TEST_TENANT, content)
- myClient["tenant"] = TEST_TENANT
-
- # test VIM
- content = loop.run_until_complete(
- myClient.create(
- "vim",
- name=TEST_VIM1,
- type=TEST_TYPE1,
- vim_url=TEST_URL1,
- config=TEST_CONFIG1,
- )
- )
- vim_id = True
- content = loop.run_until_complete(myClient.get_list("vim"))
- print("vim", content)
- content = loop.run_until_complete(myClient.show("vim", TEST_VIM1))
- print("vim", TEST_VIM1, content)
- content = loop.run_until_complete(
- myClient.edit(
- "vim",
- TEST_VIM1,
- description="another description",
- name=TEST_VIM2,
- type=TEST_TYPE2,
- vim_url=TEST_URL2,
- config=TEST_CONFIG2,
- )
- )
- content = loop.run_until_complete(myClient.show("vim", TEST_VIM2))
- print("vim edited", TEST_VIM2, content)
-
- # test VIM_ACCOUNT
- content = loop.run_until_complete(
- myClient.attach_datacenter(
- TEST_VIM2,
- vim_username="user",
- vim_password="pass",
- vim_tenant_name="vimtenant1",
- config=TEST_CONFIG1,
- )
- )
- vim_id = True
- content = loop.run_until_complete(myClient.get_list("vim_account"))
- print("vim_account", content)
- content = loop.run_until_complete(myClient.show("vim_account", TEST_VIM2))
- print("vim_account", TEST_VIM2, content)
- content = loop.run_until_complete(
- myClient.edit(
- "vim_account",
- TEST_VIM2,
- vim_username="user2",
- vim_password="pass2",
- vim_tenant_name="vimtenant2",
- config=TEST_CONFIG2,
- )
- )
- content = loop.run_until_complete(myClient.show("vim_account", TEST_VIM2))
- print("vim_account edited", TEST_VIM2, content)
-
- myClient["vim"] = TEST_VIM2
-
- except Exception as e:
- logger.error("Error {}".format(e), exc_info=True)
-
- for item in (
- ("vim_account", TEST_VIM1),
- ("vim", TEST_VIM1),
- ("vim_account", TEST_VIM2),
- ("vim", TEST_VIM2),
- ("tenant", TEST_TENANT),
- ):
- try:
- content = loop.run_until_complete(myClient.delete(item[0], item[1]))
- print("{} {} deleted; {}".format(item[0], item[1], content))
- except Exception as e:
- if e.http_code == 404:
- print("{} {} not present or already deleted".format(item[0], item[1]))
- else:
- logger.error("Error {}".format(e), exc_info=True)
-
- loop.close()
main_config = LcmCfg()
- def __init__(self, config_file, loop=None):
+ def __init__(self, config_file):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
# TODO: check if lcm_hc.py is necessary
self.health_check_file = get_health_check_file(self.main_config.to_dict())
- self.loop = loop or asyncio.get_event_loop()
self.ns = (
self.netslice
) = (
# copy message configuration in order to remove 'group_id' for msg_admin
config_message = self.main_config.message.to_dict()
- config_message["loop"] = self.loop
+ config_message["loop"] = asyncio.get_event_loop()
if config_message["driver"] == "local":
self.msg = msglocal.MsgLocal()
self.msg.connect(config_message)
# try new RO, if fail old RO
try:
self.main_config.RO.uri = ro_uri + "ro"
- ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
+ ro_server = NgRoClient(**self.main_config.RO.to_dict())
ro_version = await ro_server.get_version()
self.main_config.RO.ng = True
except Exception:
self.main_config.RO.uri = ro_uri + "openmano"
- ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
+ ro_server = ROClient(**self.main_config.RO.to_dict())
ro_version = await ro_server.get_version()
self.main_config.RO.ng = False
if versiontuple(ro_version) < versiontuple(min_RO_version):
"worker_id": self.worker_id,
"version": lcm_version,
},
- self.loop,
)
# time between pings are low when it is not received and at starting
wait_time = (
if not self.pings_not_received:
kafka_has_received = True
self.pings_not_received += 1
- await asyncio.sleep(wait_time, loop=self.loop)
+ await asyncio.sleep(wait_time)
if self.pings_not_received > 10:
raise LcmException("It is not receiving pings from Kafka bus")
consecutive_errors = 0
"Task kafka_read retrying after Exception {}".format(e)
)
wait_time = 2 if not first_start else 5
- await asyncio.sleep(wait_time, loop=self.loop)
+ await asyncio.sleep(wait_time)
def kafka_read_callback(self, topic, command, params):
order_id = 1
sys.stdout.flush()
return
elif command == "test":
- asyncio.Task(self.test(params), loop=self.loop)
+ asyncio.Task(self.test(params))
return
if topic == "admin":
topics_admin = ("admin",)
await asyncio.gather(
self.msg.aioread(
- topics, self.loop, self.kafka_read_callback, from_beginning=True
+ topics, self.kafka_read_callback, from_beginning=True
),
self.msg_admin.aioread(
topics_admin,
- self.loop,
self.kafka_read_callback,
group_id=False,
),
"Task kafka_read retrying after Exception {}".format(e)
)
wait_time = 2 if not self.first_start else 5
- await asyncio.sleep(wait_time, loop=self.loop)
+ await asyncio.sleep(wait_time)
# self.logger.debug("Task kafka_read terminating")
self.logger.debug("Task kafka_read exit")
+ async def kafka_read_ping(self):
+ await asyncio.gather(self.kafka_read(), self.kafka_ping())
+
def start(self):
# check RO version
- self.loop.run_until_complete(self.check_RO_version())
+ asyncio.run(self.check_RO_version())
- self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
+ self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
# TODO: modify the rest of classes to use the LcmCfg object instead of dicts
self.netslice = netslice.NetsliceLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
- )
- self.vim = vim_sdn.VimLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
- )
- self.wim = vim_sdn.WimLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
- )
- self.sdn = vim_sdn.SdnLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
)
+ self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+ self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+ self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
self.k8scluster = vim_sdn.K8sClusterLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
- )
- self.vca = vim_sdn.VcaLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
)
+ self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
self.k8srepo = vim_sdn.K8sRepoLcm(
- self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
)
- self.loop.run_until_complete(
- asyncio.gather(self.kafka_read(), self.kafka_ping())
- )
+ asyncio.run(self.kafka_read_ping())
# TODO
# self.logger.debug("Terminating cancelling creation tasks")
# timeout = 200
# while self.is_pending_tasks():
# self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
- # await asyncio.sleep(2, loop=self.loop)
+ # await asyncio.sleep(2)
# timeout -= 2
# if not timeout:
# self.lcm_tasks.cancel("ALL", "ALL")
- self.loop.close()
- self.loop = None
if self.db:
self.db.db_disconnect()
if self.msg:
ctx.load_verify_locations(trusted)
ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
ctx.set_alpn_protocols(["h2"])
- try:
- ctx.set_npn_protocols(["h2"])
- except NotImplementedError:
- pass
return ctx
def __init__(
self,
log: object = None,
- loop: object = None,
vca_config: VcaConfig = None,
on_update_db=None,
):
# parent class constructor
N2VCConnector.__init__(
- self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs
+ self, log=log, on_update_db=on_update_db, db=self.db, fs=self.fs
)
self.vca_config = vca_config
class NetsliceLcm(LcmBase):
- def __init__(self, msg, lcm_tasks, config, loop, ns):
+ def __init__(self, msg, lcm_tasks, config, ns):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
# logging
self.logger = logging.getLogger("lcm.netslice")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ns = ns
self.ro_config = config["RO"]
db_nsilcmop_update = {}
nsilcmop_operation_state = None
vim_2_RO = {}
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
nsi_vld_instantiationi_params = {}
def ip_profile_2_RO(ip_profile):
break
# TODO: future improvement due to synchronism -> await asyncio.wait(vca_task_list, timeout=300)
- await asyncio.sleep(5, loop=self.loop)
+ await asyncio.sleep(5)
else: # timeout_nsi_deploy reached:
raise LcmException("Timeout waiting nsi to be ready.")
db_nsilcmop = None
db_nsir_update = {"_admin.nsilcmop": nsilcmop_id}
db_nsilcmop_update = {}
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
nsir_deployed = None
failed_detail = [] # annotates all failed error messages
nsilcmop_operation_state = None
)
break
- await asyncio.sleep(5, loop=self.loop)
+ await asyncio.sleep(5)
termination_timeout -= 5
if termination_timeout <= 0:
"operationState": nsilcmop_operation_state,
"autoremove": autoremove,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
timeout_large = 120
timeout_short = 30
- def __init__(self, loop, uri, **kwargs):
- self.loop = loop
+ def __init__(self, uri, **kwargs):
self.endpoint_url = uri
if not self.endpoint_url.endswith("/"):
self.endpoint_url += "/"
payload_req = yaml.safe_dump(target)
url = "{}/ns/v1/deploy/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
# timeout = aiohttp.ClientTimeout(total=self.timeout_large)
async with session.post(
payload_req = yaml.safe_dump(target)
url = "{}/ns/v1/migrate/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
# timeout = aiohttp.ClientTimeout(total=self.timeout_large)
async with session.post(
url = "{}/ns/v1/{operation_type}/{nsr_id}".format(
self.endpoint_url, operation_type=operation_type, nsr_id=nsr_id
)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
# timeout = aiohttp.ClientTimeout(total=self.timeout_large)
async with session.post(
url = "{}/ns/v1/deploy/{nsr_id}/{action_id}".format(
self.endpoint_url, nsr_id=nsr_id, action_id=action_id
)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("GET %s", url)
# timeout = aiohttp.ClientTimeout(total=self.timeout_short)
async with session.get(url, headers=self.headers_req) as response:
async def delete(self, nsr_id):
try:
url = "{}/ns/v1/deploy/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("DELETE %s", url)
# timeout = aiohttp.ClientTimeout(total=self.timeout_short)
async with session.delete(url, headers=self.headers_req) as response:
"""
try:
response_text = ""
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
url = "{}/version".format(self.endpoint_url)
self.logger.debug("RO GET %s", url)
# timeout = aiohttp.ClientTimeout(total=self.timeout_short)
payload_req = yaml.safe_dump(target)
url = "{}/ns/v1/recreate/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
async with session.post(
url, headers=self.headers_req, data=payload_req
url = "{}/ns/v1/recreate/{nsr_id}/{action_id}".format(
self.endpoint_url, nsr_id=nsr_id, action_id=action_id
)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("GET %s", url)
async with session.get(url, headers=self.headers_req) as response:
response_text = await response.read()
url = "{}/ns/v1/verticalscale/{nsr_id}".format(
self.endpoint_url, nsr_id=nsr_id
)
- async with aiohttp.ClientSession(loop=self.loop) as session:
+ async with aiohttp.ClientSession() as session:
self.logger.debug("NG-RO POST %s %s", url, payload_req)
async with session.post(
url, headers=self.headers_req, data=payload_req
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, msg, lcm_tasks, config: LcmCfg, loop):
+ def __init__(self, msg, lcm_tasks, config: LcmCfg):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.db = Database().instance.db
self.fs = Filesystem().instance.fs
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.timeout = config.timeout
self.ro_config = config.RO
# create N2VC connector
self.n2vc = N2VCJujuConnector(
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_n2vc_db,
fs=self.fs,
db=self.db,
self.conn_helm_ee = LCMHelmConn(
log=self.logger,
- loop=self.loop,
vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db,
)
kubectl_command=self.vca_config.kubectlpath,
juju_command=self.vca_config.jujupath,
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_k8s_db,
fs=self.fs,
db=self.db,
}
# create RO client
- self.RO = NgRoClient(self.loop, **self.ro_config.to_dict())
+ self.RO = NgRoClient(**self.ro_config.to_dict())
self.op_status_map = {
"instantiation": self.RO.status,
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
"target KDU={} is in error state".format(kdu_name)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
nb_tries += 1
raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
"Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
# get ip address
if not target_vdu_id:
self.logger.debug(
logging_text + "Invoke and wait for placement optimization"
)
- await self.msg.aiowrite(
- "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
- )
+ await self.msg.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id})
db_poll_interval = 5
wait = db_poll_interval * 10
pla_result = None
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
"operationState": nslcmop_operation_state,
"autoremove": autoremove,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
)
)
# wait and retry
- await asyncio.sleep(retries_interval, loop=self.loop)
+ await asyncio.sleep(retries_interval)
else:
if isinstance(e, asyncio.TimeoutError):
e = N2VCException(
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
and member_vnf_index
):
msg.update({"vnf_member_index": member_vnf_index})
- await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
+ await self.msg.aiowrite("ns", change_type, msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "scaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
kdur_name = kdur.get("name")
break
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
else:
if vdu_id and vdu_index is not None:
raise LcmException(
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "migrated", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "healed", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
if operational_status_ro != "healing":
break
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "verticalscaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
lcm_helm_conn.K8sHelm3Connector
)
vca_config = VcaConfig(vca_config)
- self.helm_conn = LCMHelmConn(
- loop=self.loop, vca_config=vca_config, log=self.logger
- )
+ self.helm_conn = LCMHelmConn(vca_config=vca_config, log=self.logger)
@asynctest.fail_on(active_handles=True)
async def test_create_execution_environment(self):
("active", "Ready!"),
):
# call callback after some time
- asyncio.sleep(5, loop=self.loop)
+ asyncio.sleep(5)
callback(model_name, application_name, status, message, *callback_args)
@staticmethod
ns.LCMHelmConn = asynctest.MagicMock(ns.LCMHelmConn)
def create_nslcm_class(self):
- self.my_ns = ns.NsLcm(self.msg, self.lcm_tasks, lcm_config, self.loop)
+ self.my_ns = ns.NsLcm(self.msg, self.lcm_tasks, lcm_config)
self.my_ns.fs = self.fs
self.my_ns.db = self.db
self.my_ns._wait_dependent_n2vc = asynctest.CoroutineMock()
def mock_ro(self):
if not getenv("OSMLCMTEST_RO_NOMOCK"):
- self.my_ns.RO = asynctest.Mock(
- NgRoClient(self.loop, **lcm_config.RO.to_dict())
- )
+ self.my_ns.RO = asynctest.Mock(NgRoClient(**lcm_config.RO.to_dict()))
# TODO first time should be empty list, following should return a dict
# self.my_ns.RO.get_list = asynctest.CoroutineMock(self.my_ns.RO.get_list, return_value=[])
self.my_ns.RO.deploy = asynctest.CoroutineMock(
# "description": "done"}})
self.my_ns.RO.delete = asynctest.CoroutineMock(self.my_ns.RO.delete)
- # @asynctest.fail_on(active_handles=True) # all async tasks must be completed
- # async def test_instantiate(self):
- # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
- # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
- # # print("Test instantiate started")
-
- # # delete deployed information of database
- # if not getenv("OSMLCMTEST_DB_NOMOCK"):
- # if self.db.get_list("nsrs")[0]["_admin"].get("deployed"):
- # del self.db.get_list("nsrs")[0]["_admin"]["deployed"]
- # for db_vnfr in self.db.get_list("vnfrs"):
- # db_vnfr.pop("ip_address", None)
- # for db_vdur in db_vnfr["vdur"]:
- # db_vdur.pop("ip_address", None)
- # db_vdur.pop("mac_address", None)
- # if getenv("OSMLCMTEST_RO_VIMID"):
- # self.db.get_list("vim_accounts")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
- # if getenv("OSMLCMTEST_RO_VIMID"):
- # self.db.get_list("nsrs")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
-
- # await self.my_ns.instantiate(nsr_id, nslcmop_id)
-
- # self.msg.aiowrite.assert_called_once_with("ns", "instantiated",
- # {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
- # "operationState": "COMPLETED"},
- # loop=self.loop)
- # self.lcm_tasks.lock_HA.assert_called_once_with('ns', 'nslcmops', nslcmop_id)
- # if not getenv("OSMLCMTEST_LOGGING_NOMOCK"):
- # self.assertTrue(self.my_ns.logger.debug.called, "Debug method not called")
- # self.my_ns.logger.error.assert_not_called()
- # self.my_ns.logger.exception().assert_not_called()
-
- # if not getenv("OSMLCMTEST_DB_NOMOCK"):
- # self.assertTrue(self.db.set_one.called, "db.set_one not called")
- # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- # db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
- # self.assertEqual(db_nsr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
- # for vnfr in db_vnfrs_list:
- # self.assertEqual(vnfr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
-
- # if not getenv("OSMLCMTEST_VCA_NOMOCK"):
- # # check intial-primitives called
- # self.assertTrue(self.my_ns.n2vc.exec_primitive.called,
- # "Exec primitive not called for initial config primitive")
- # for _call in self.my_ns.n2vc.exec_primitive.call_args_list:
- # self.assertIn(_call[1]["primitive_name"], ("config", "touch"),
- # "called exec primitive with a primitive different than config or touch")
-
- # # TODO add more checks of called methods
- # # TODO add a terminate
-
- # async def test_instantiate_ee_list(self):
- # # Using modern IM where configuration is in the new format of execution_environment_list
- # ee_descriptor_id = "charm_simple"
- # non_used_initial_primitive = {
- # "name": "not_to_be_called",
- # "seq": 3,
- # "execution-environment-ref": "not_used_ee"
- # }
- # ee_list = [
- # {
- # "id": ee_descriptor_id,
- # "juju": {"charm": "simple"},
-
- # },
- # ]
-
- # self.db.set_one(
- # "vnfds",
- # q_filter={"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"},
- # update_dict={"vnf-configuration.0.execution-environment-list": ee_list,
- # "vnf-configuration.0.initial-config-primitive.0.execution-environment-ref": ee_descriptor_id,
- # "vnf-configuration.0.initial-config-primitive.1.execution-environment-ref": ee_descriptor_id,
- # "vnf-configuration.0.initial-config-primitive.2": non_used_initial_primitive,
- # "vnf-configuration.0.config-primitive.0.execution-environment-ref": ee_descriptor_id,
- # "vnf-configuration.0.config-primitive.0.execution-environment-primitive": "touch_charm",
- # },
- # unset={"vnf-configuration.juju": None})
- # await self.test_instantiate()
- # # this will check that the initial-congig-primitive 'not_to_be_called' is not called
-
class TestMyNS(TestBaseNS):
@asynctest.fail_on(active_handles=True)
self.msg = Mock(msgbase.MsgBase())
self.lcm_tasks = Mock()
self.config = {"database": {"driver": "mongo"}}
- self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+ self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config)
self.vca_lcm.db = Mock()
self.vca_lcm.fs = Mock()
"kubectlpath": "/usr/bin/kubectl",
}
}
- self.k8scluster_lcm = K8sClusterLcm(
- self.msg, self.lcm_tasks, self.vca_config, self.loop
- )
+ self.k8scluster_lcm = K8sClusterLcm(self.msg, self.lcm_tasks, self.vca_config)
self.k8scluster_lcm.db = Mock()
self.k8scluster_lcm.fs = Mock()
),
}
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vim")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
db_vim_update["_admin.deployed.RO"] = None
db_vim_update["_admin.detailed-status"] = step
self.update_db_2("vim_accounts", vim_id, db_vim_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
vim_RO = deepcopy(vim_content)
vim_RO.pop("_id", None)
vim_RO.pop("_admin", None)
RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
step = "Editing vim at RO"
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
vim_RO = deepcopy(vim_content)
vim_RO.pop("_id", None)
vim_RO.pop("_admin", None)
and db_vim["_admin"]["deployed"].get("RO")
):
RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Detaching vim from RO tenant"
try:
await RO.detach("vim_account", RO_vim_id)
# values that are encrypted at wim config because they are passwords
wim_config_encrypted = ()
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vim")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
step = "Creating wim at RO"
db_wim_update["_admin.detailed-status"] = step
self.update_db_2("wim_accounts", wim_id, db_wim_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
wim_RO = deepcopy(wim_content)
wim_RO.pop("_id", None)
wim_RO.pop("_admin", None)
):
RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
step = "Editing wim at RO"
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
wim_RO = deepcopy(wim_content)
wim_RO.pop("_id", None)
wim_RO.pop("_admin", None)
and db_wim["_admin"]["deployed"].get("RO")
):
RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Detaching wim from RO tenant"
try:
await RO.detach("wim_account", RO_wim_id)
class SdnLcm(LcmBase):
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.sdn")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
db_sdn_update["_admin.detailed-status"] = step
self.update_db_2("sdns", sdn_id, db_sdn_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
sdn_RO = deepcopy(sdn_content)
sdn_RO.pop("_id", None)
sdn_RO.pop("_admin", None)
and db_sdn["_admin"]["deployed"].get("RO")
):
RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Editing sdn at RO"
sdn_RO = deepcopy(sdn_content)
sdn_RO.pop("_id", None)
and db_sdn["_admin"]["deployed"].get("RO")
):
RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Deleting sdn from RO"
try:
await RO.delete("sdn", RO_sdn_id)
class K8sClusterLcm(LcmBase):
timeout_create = 300
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.k8scluster")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.vca_config = config["VCA"]
kubectl_command=self.vca_config.get("kubectlpath"),
juju_command=self.vca_config.get("jujupath"),
log=self.logger,
- loop=self.loop,
on_update_db=None,
db=self.db,
fs=self.fs,
class VcaLcm(LcmBase):
timeout_create = 30
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vca")
- self.loop = loop
self.lcm_tasks = lcm_tasks
super().__init__(msg, self.logger)
# create N2VC connector
- self.n2vc = N2VCJujuConnector(
- log=self.logger, loop=self.loop, fs=self.fs, db=self.db
- )
+ self.n2vc = N2VCJujuConnector(log=self.logger, fs=self.fs, db=self.db)
def _get_vca_by_id(self, vca_id: str) -> dict:
db_vca = self.db.get_one("vca", {"_id": vca_id})
class K8sRepoLcm(LcmBase):
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.k8srepo")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.vca_config = config["VCA"]
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# google-auth
-certifi==2022.12.7
+certifi==2023.5.7
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# kubernetes
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# cryptography
# pynacl
-charset-normalizer==2.1.1
+charset-normalizer==3.1.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# requests
-cryptography==39.0.0
+cryptography==40.0.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# paramiko
dataclasses==0.6
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-google-auth==2.16.0
+dnspython==2.3.0
+ # via
+ # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
+ # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+ # pymongo
+google-auth==2.17.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# kubernetes
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# aiokafka
-kubernetes==25.3.0
+kubernetes==26.1.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# juju
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# juju
# theblues
-motor==1.3.1
+motor==3.1.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-mypy-extensions==0.4.3
+mypy-extensions==1.0.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# typing-inspect
# requests-oauthlib
osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@master
# via -r requirements-dev.in
-packaging==23.0
+packaging==23.1
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# aiokafka
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# macaroonbakery
-pyasn1==0.4.8
+pyasn1==0.5.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# juju
# pyasn1-modules
# rsa
-pyasn1-modules==0.2.8
+pyasn1-modules==0.3.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# google-auth
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# macaroonbakery
-pymongo==3.13.0
+pymongo==4.3.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# kubernetes
-pytz==2022.7.1
+pytz==2023.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# pyrfc3339
# juju
# jujubundlelib
# kubernetes
-requests==2.28.2
+requests==2.30.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# kubernetes
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# juju
-toposort==1.9
+toposort==1.10
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# juju
-typing-extensions==4.4.0
+typing-extensions==4.5.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# typing-inspect
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# juju
-urllib3==1.26.14
+urllib3==2.0.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# kubernetes
# requests
-websocket-client==1.5.0
+websocket-client==1.5.1
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# kubernetes
-websockets==7.0
+websockets==11.0.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
# juju
#######################################################################################
asynctest==0.13.0
# via -r requirements-test.in
-coverage==7.1.0
+coverage==7.2.5
# via -r requirements-test.in
-mock==5.0.1
+mock==5.0.2
# via -r requirements-test.in
-nose2==0.12.0
+nose2==0.13.0
# via -r requirements-test.in
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-aiohttp==3.8.3
+aiohttp==3.8.4
# via -r requirements.in
aiosignal==1.3.1
# via aiohttp
# via
# -r requirements.in
# aiohttp
-attrs==22.2.0
+attrs==23.1.0
# via
# aiohttp
# glom
-boltons==21.0.0
+boltons==23.0.0
# via
# face
# glom
-charset-normalizer==2.1.1
+charset-normalizer==3.1.0
# via aiohttp
checksumdir==1.2.0
# via -r requirements.in
# via
# aiohttp
# aiosignal
-glom==23.1.1
+glom==23.3.0
# via config-man
-grpcio==1.51.1
+grpcio==1.54.2
# via grpcio-tools
grpcio-tools==1.48.1
# via -r requirements.in
-grpclib==0.4.3
+grpclib==0.4.4
# via -r requirements.in
h2==4.1.0
# via grpclib
# via
# -r requirements.in
# grpcio-tools
-pydantic==1.10.4
+pydantic==1.10.7
# via -r requirements.in
pyyaml==5.4.1
# via -r requirements.in
-typing-extensions==4.4.0
+typing-extensions==4.5.0
# via pydantic
-yarl==1.8.2
+yarl==1.9.2
# via aiohttp
# The following packages are considered to be unsafe in a requirements file:
[testenv]
usedevelop = True
-basepython = python3.8
+basepython = python3.10
setenv = VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE = 1
deps = -r{toxinidir}/requirements.txt