Use vca_config and loop options in K8sJujuConnector
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_juju_conn import K8sJujuConnector
32
33 from osm_common.dbbase import DbException
34 from osm_common.fsbase import FsException
35
36 from n2vc.n2vc_juju_conn import N2VCJujuConnector
37 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
38
39 from osm_lcm.lcm_helm_conn import LCMHelmConn
40
41 from copy import copy, deepcopy
42 from http import HTTPStatus
43 from time import time
44 from uuid import uuid4
45
46 from random import randint
47
48 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
49
50
class N2VCJujuConnectorLCM(N2VCJujuConnector):
    """
    N2VCJujuConnector variant used by LCM. Both overridden methods admit an extra 'vca_type' argument and
    give a special treatment to the "k8s_proxy_charm" type; any other type is delegated to the parent class.
    """

    async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
                                           progress_timeout: float = None, total_timeout: float = None,
                                           config: dict = None, artifact_path: str = None,
                                           vca_type: str = None) -> (str, dict):
        """
        Creates an execution environment. Compared with the parent it admits 'artifact_path' and 'vca_type'
        :param namespace: passed through to the parent or to install_k8s_proxy_charm
        :param db_dict: passed through to the parent or to install_k8s_proxy_charm
        :param reuse_ee_id: only forwarded to the parent implementation
        :param progress_timeout: only forwarded to the parent implementation
        :param total_timeout: only forwarded to the parent implementation
        :param config: accepted but not forwarded anywhere by this method
        :param artifact_path: charm path; its last "/" separated component is used as the charm name
        :param vca_type: "k8s_proxy_charm" installs the charm directly; other values use the parent method
        :return: (ee_id, None) for "k8s_proxy_charm"; otherwise whatever the parent returns
        """
        if vca_type != "k8s_proxy_charm":
            return await super().create_execution_environment(
                namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id,
                progress_timeout=progress_timeout, total_timeout=total_timeout)

        # the charm name is whatever follows the last "/" (the whole path when there is no "/")
        charm_name = artifact_path.rsplit("/", 1)[-1]
        ee_id = await self.install_k8s_proxy_charm(
            charm_name=charm_name,
            namespace=namespace,
            artifact_path=artifact_path,
            db_dict=db_dict)
        return ee_id, None

    async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
                                       progress_timeout: float = None, total_timeout: float = None,
                                       config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
        """
        Installs the configuration software. It does nothing (returns None) for "k8s_proxy_charm";
        for any other vca_type the call is forwarded to the parent class without the 'vca_type' argument
        :return: None for "k8s_proxy_charm"; otherwise the result of the parent method
        """
        if vca_type != "k8s_proxy_charm":
            return await super().install_configuration_sw(
                ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout,
                total_timeout=total_timeout, config=config, num_units=num_units)
        return
78
79
class NsLcm(LcmBase):
    """
    Network service operations of LCM. The methods visible in this part of the file translate instantiation
    parameters into RO parameters and keep the "nsrs"/"vnfrs" database records updated with RO and VCA information.
    """
    # Default timeouts. NOTE(review): values look like seconds (e.g. 2 * 3600) - confirm against the users of them
    timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

    # Negative markers for sub-operation lookups. They are not used in this part of the file;
    # presumably returned instead of a valid (non negative) sub-operation index - TODO confirm
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    # Task name constant; presumably used to label the VCA deployment tasks - verify against its users
    task_name_deploy_vca = "Deploying VCA"
92
def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
    """
    Init: stores the provided database, storage and messaging objects and creates the VCA, K8s and RO connectors
    :param db: database object, passed to the parent class and to the connectors
    :param msg: messaging object, passed to the parent class
    :param fs: filesystem storage object, passed to the parent class and to the connectors
    :param lcm_tasks: stored at self.lcm_tasks
    :param config: two level dictionary with configuration. The keys read here are "timeout", "ro_config"
        (its "ng" entry selects the RO client class) and "VCA" (a shallow copy of it is kept at self.vca_config)
    :param loop: asyncio event loop handed over to the connectors and to the RO client
    :param prometheus: optional object stored at self.prometheus
    :return: None
    """
    super().__init__(db=db, msg=msg, fs=fs, logger=logging.getLogger('lcm.ns'))

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    self.ng_ro = config["ro_config"].get("ng")
    self.vca_config = config["VCA"].copy()

    # keyword arguments shared by the two execution environment connectors
    ee_connector_args = {
        "db": self.db,
        "fs": self.fs,
        "log": self.logger,
        "loop": self.loop,
        "vca_config": self.vca_config,
        "on_update_db": self._on_update_n2vc_db,
    }

    # create N2VC connector
    juju_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    self.n2vc = N2VCJujuConnectorLCM(
        url=juju_url,
        username=self.vca_config.get('user', None),
        **ee_connector_args
    )

    self.conn_helm_ee = LCMHelmConn(url=None, username=None, **ee_connector_args)

    # K8s cluster connectors
    self.k8sclusterhelm = K8sHelmConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        helm_command=self.vca_config.get("helmpath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        juju_command=self.vca_config.get("jujupath"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        loop=self.loop,
        on_update_db=None,
        vca_config=self.vca_config,
    )

    # lookup tables from the deployment type name to the connector that serves it
    helm_cluster, juju_cluster = self.k8sclusterhelm, self.k8sclusterjuju
    self.k8scluster_map = {
        "helm-chart": helm_cluster,
        "chart": helm_cluster,
        "juju-bundle": juju_cluster,
        "juju": juju_cluster,
    }
    self.vca_map = {
        "lxc_proxy_charm": self.n2vc,
        "native_charm": self.n2vc,
        "k8s_proxy_charm": self.n2vc,
        "helm": self.conn_helm_ee
    }

    self.prometheus = prometheus

    # create RO client
    ro_client_class = NgRoClient if self.ng_ro else ROclient.ROClient
    self.RO = ro_client_class(self.loop, **self.ro_config)
177
178 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
179
180 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
181
182 try:
183 # TODO filter RO descriptor fields...
184
185 # write to database
186 db_dict = dict()
187 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
188 db_dict['deploymentStatus'] = ro_descriptor
189 self.update_db_2("nsrs", nsrs_id, db_dict)
190
191 except Exception as e:
192 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
193
194 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
195
196 # remove last dot from path (if exists)
197 if path.endswith('.'):
198 path = path[:-1]
199
200 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
201 # .format(table, filter, path, updated_data))
202
203 try:
204
205 nsr_id = filter.get('_id')
206
207 # read ns record from database
208 nsr = self.db.get_one(table='nsrs', q_filter=filter)
209 current_ns_status = nsr.get('nsState')
210
211 # get vca status for NS
212 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
213
214 # vcaStatus
215 db_dict = dict()
216 db_dict['vcaStatus'] = status_dict
217
218 # update configurationStatus for this VCA
219 try:
220 vca_index = int(path[path.rfind(".")+1:])
221
222 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
223 vca_status = vca_list[vca_index].get('status')
224
225 configuration_status_list = nsr.get('configurationStatus')
226 config_status = configuration_status_list[vca_index].get('status')
227
228 if config_status == 'BROKEN' and vca_status != 'failed':
229 db_dict['configurationStatus'][vca_index] = 'READY'
230 elif config_status != 'BROKEN' and vca_status == 'failed':
231 db_dict['configurationStatus'][vca_index] = 'BROKEN'
232 except Exception as e:
233 # not update configurationStatus
234 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
235
236 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
237 # if nsState = 'DEGRADED' check if all is OK
238 is_degraded = False
239 if current_ns_status in ('READY', 'DEGRADED'):
240 error_description = ''
241 # check machines
242 if status_dict.get('machines'):
243 for machine_id in status_dict.get('machines'):
244 machine = status_dict.get('machines').get(machine_id)
245 # check machine agent-status
246 if machine.get('agent-status'):
247 s = machine.get('agent-status').get('status')
248 if s != 'started':
249 is_degraded = True
250 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
251 # check machine instance status
252 if machine.get('instance-status'):
253 s = machine.get('instance-status').get('status')
254 if s != 'running':
255 is_degraded = True
256 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
257 # check applications
258 if status_dict.get('applications'):
259 for app_id in status_dict.get('applications'):
260 app = status_dict.get('applications').get(app_id)
261 # check application status
262 if app.get('status'):
263 s = app.get('status').get('status')
264 if s != 'active':
265 is_degraded = True
266 error_description += 'application {} status={} ; '.format(app_id, s)
267
268 if error_description:
269 db_dict['errorDescription'] = error_description
270 if current_ns_status == 'READY' and is_degraded:
271 db_dict['nsState'] = 'DEGRADED'
272 if current_ns_status == 'DEGRADED' and not is_degraded:
273 db_dict['nsState'] = 'READY'
274
275 # write to database
276 self.update_db_2("nsrs", nsr_id, db_dict)
277
278 except (asyncio.CancelledError, asyncio.TimeoutError):
279 raise
280 except Exception as e:
281 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
282
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Renders the cloud-init content as a Jinja2 template where undefined variables are an error
    :param cloud_init_text: template text
    :param additional_params: variables for the rendering. None or empty is treated as no variables
    :param vnfd_id: only used to compose the error messages
    :param vdu_id: only used to compose the error messages
    :return: the rendered text. LcmException is raised when a variable is missing or the template is wrong
    """
    try:
        strict_env = Environment(undefined=StrictUndefined)
        return strict_env.from_string(cloud_init_text).render(additional_params or {})
    except UndefinedError as e:
        # must be caught before the more generic TemplateError
        missing_variable_text = ("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                 "file, must be provided in the instantiation parameters inside the "
                                 "'additionalParamsForVnf/Vdu' block")
        raise LcmException(missing_variable_text.format(e, vnfd_id, vdu_id))
    except (TemplateError, TemplateNotFound) as e:
        wrong_template_text = "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}"
        raise LcmException(wrong_template_text.format(vnfd_id, vdu_id, e))
296
297 def _get_cloud_init(self, vdu, vnfd):
298 try:
299 cloud_init_content = cloud_init_file = None
300 if vdu.get("cloud-init-file"):
301 base_folder = vnfd["_admin"]["storage"]
302 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
303 vdu["cloud-init-file"])
304 with self.fs.file_open(cloud_init_file, "r") as ci_file:
305 cloud_init_content = ci_file.read()
306 elif vdu.get("cloud-init"):
307 cloud_init_content = vdu["cloud-init"]
308
309 return cloud_init_content
310 except FsException as e:
311 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
312 format(vnfd["id"], vdu["id"], cloud_init_file, e))
313
314 def _get_osm_params(self, db_vnfr, vdu_id=None, vdu_count_index=0):
315 osm_params = {x.replace("-", "_"): db_vnfr[x] for x in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
316 if db_vnfr.get(x) is not None}
317 osm_params["ns_id"] = db_vnfr["nsr-id-ref"]
318 osm_params["vnf_id"] = db_vnfr["_id"]
319 osm_params["member_vnf_index"] = db_vnfr["member-vnf-index-ref"]
320 if db_vnfr.get("vdur"):
321 osm_params["vdu"] = {}
322 for vdur in db_vnfr["vdur"]:
323 vdu = {
324 "count_index": vdur["count-index"],
325 "vdu_id": vdur["vdu-id-ref"],
326 "interfaces": {}
327 }
328 if vdur.get("ip-address"):
329 vdu["ip_address"] = vdur["ip-address"]
330 for iface in vdur["interfaces"]:
331 vdu["interfaces"][iface["name"]] = \
332 {x.replace("-", "_"): iface[x] for x in ("mac-address", "ip-address", "vnf-vld-id", "name")
333 if iface.get(x) is not None}
334 vdu_id_index = "{}-{}".format(vdur["vdu-id-ref"], vdur["count-index"])
335 osm_params["vdu"][vdu_id_index] = vdu
336 if vdu_id:
337 osm_params["vdu_id"] = vdu_id
338 osm_params["count_index"] = vdu_count_index
339 return osm_params
340
341 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
342 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
343 additional_params = vdur.get("additionalParams")
344 return self._format_additional_params(additional_params)
345
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The input is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not used by this method
    :param nsrId: Id of the NSR. Not used by this method
    :return: deep copy of vnfd without the keys that RO does not use
    """
    # configuration, monitoring, scaling, K8s and internal keys are not used by RO
    keys_unused_by_ro = ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor",
                         "kdu", "k8s-cluster")
    vnfd_RO = deepcopy(vnfd)
    for unused_key in keys_unused_by_ro:
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are removed from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        for cloud_init_key in ("cloud-init-file", "cloud-init"):
            vdu.pop(cloud_init_key, None)
    return vnfd_RO
372
def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list relates each member-vnf-index with its vnfd-id-ref
    :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
    :param n2vc_key_list: keys stored as 'mgmt_keys' of the vdus that require ssh access. None is treated as []
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM/WIM account is
        not ENABLED or if an instantiation parameter references something not present at the descriptors
    """
    # caches of the accounts already translated: OSM account id -> RO id
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the id that RO uses, reading the database on the first request
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # same for wim accounts. Only str values are translated; any other value is returned as it is
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of the ip-profile with the key names and values changed to the ones used for RO
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }
    # set vim_account of each vnf if different from general vim_account.
    # Get this information from <vnfr> database content, key vim-account-id
    # Vim account can be set by placement_engine and it may be different from
    # the instantiate parameters (vnfs.member-vnf-index.datacenter).
    for vnf_index, vnfr in db_vnfrs.items():
        if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
            populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))

    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        # ids of the vdus of this vnfd where the keys of n2vc_key_list must be injected
        vdu_needed_access = []
        # connection point used for vnf management, pending to be matched with a vdu interface
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # NOTE(review): a vdu without "interface" makes this loop fail, as vdu.get() returns None
                for vdu_interface in vdu.get("interface"):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        # only the first vdu connected to the management cp gets the keys
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)
        # cloud init
        for vdu in get_iterable(vnfd, "vdu"):
            cloud_init_text = self._get_cloud_init(vdu, vnfd)
            if not cloud_init_text:
                continue
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                db_vnfr = db_vnfrs[vnf_member["member-vnf-index"]]
                additional_params = self._get_vdu_additional_params(db_vnfr, vdu["id"]) or {}

                # one rendered cloud-init per vdu replica, because the "OSM" params contain the replica index
                cloud_init_list = []
                for vdu_index in range(0, int(vdu.get("count", 1))):
                    additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu["id"], vdu_index)
                    cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, vnfd["id"],
                                                                  vdu["id"]))
                populate_dict(RO_ns_params,
                              ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu["id"], "cloud_init"),
                              cloud_init_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    # instantiation parameters provided per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        # look for the vnfd of this member-vnf-index. The 'else' is executed when no constituent-vnfd matches
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    # the flag propagates the inner 'break' to this loop, so the 'else' below is skipped
                    if iface_found:
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # instantiation parameters provided per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            # NOTE(review): the trailing comma only wraps the call result in a discarded tuple; it has no effect
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"])),
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        # NOTE(review): this writes the same "sites" path as the vim-network-name case above;
        # how both combine when both are provided depends on populate_dict - confirm
        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                # after a match, vdu_descriptor and interface_descriptor keep the matching elements
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
668
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, both at db_vnfr (modified in place) and at the "vnfrs" database
    :param db_vnfr: vnfr database content
    :param vdu_create: dictionary {vdu-id-ref: number of replicas to add}. The provided object is not modified
    :param vdu_delete: dictionary {vdu-id-ref: number of replicas to remove}. The provided object is not modified
    :return: None. LcmException is raised if some requested vdu has no existing vdur to copy or to remove
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk the list backwards, so that inserting or deleting does not affect the entries pending to visit
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # new replicas are copies of this vdur without the runtime information
            vdur_copy = deepcopy(vdur)
            vdur_copy["status"] = "BUILD"
            vdur_copy["status-detailed"] = None
            # this was an annotation over the "ip_address" key (a no-op), that left the IP of the source vdu
            vdur_copy["ip-address"] = None
            for iface in vdur_copy["interfaces"]:
                iface["ip-address"] = None
                iface["mac-address"] = None
                iface.pop("mgmt_vnf", None)  # only first vdu can be managment of vnf   # TODO ALF
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur_copy)
                self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
                # next replica starts from a fresh copy, so that the inserted one is not modified
                vdur_copy = deepcopy(vdur_copy)

            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each pending request is reported with its own message
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
717
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Fills the vld entries of the nsr with the information that RO reports for the created networks
    :param ns_update_nsr: dictionary to be filled with the updated info, using "vld.<index>" keys
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its 'nets' are matched by 'ns_net_osm_id'
    :return: Nothing, LcmException is raised when a vld is not found at the RO info
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that corresponds to this vld, if any
        matching_nets = (net for net in get_iterable(nsr_desc_RO, "nets") if vld["id"] == net.get("ns_net_osm_id"))
        net_RO = next(matching_nets, None)
        if net_RO is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld.update({
            "vim-id": net_RO.get("vim_net_id"),
            "name": net_RO.get("vim_name"),
            "status": net_RO.get("status"),
            "status-detailed": net_RO.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(vld_index)] = vld
739
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Sets the status of every vnfr to "ERROR" at database. The vdur entries that have no "status" yet are also
    marked as "ERROR", both in the provided content and at database
    :param db_vnfrs: dictionary whose values are the vnfr database contents
    :param error_text: if not empty, it is stored as "status-detailed" of every vdur marked by this method
    :return: None. A DbException is logged as an error and not propagated
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # store the same detail in memory and at database
                        # (the database was getting the literal "ERROR" instead of the error text)
                        status_detailed = str(error_text)
                        vdur["status-detailed"] = status_detailed
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
754
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when a vnf with vdur
        has no ip address, neither at the RO info nor at the vnfr
    """
    # every search below is a for/else: the 'else' is only executed when the loop ends without a 'break',
    # that is, when the element is not found at the RO info
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO may provide several addresses separated by ";". Only the first one is stored
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms with this vdu id skipped so far; used to locate the replica by count-index
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # vnf internal networks are matched by the 'vnf_net_osm_id' of the RO nets
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            # a single database write per vnfr, with all the changes collected above
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
827
def _get_ns_config_info(self, nsr_id):
    """
    Builds the mapping between the vnf/vdu elements of a ns and the application stored for each of them
    :param nsr_id: id of the nsr whose database entry _admin.deployed.VCA is read
    :return: a dictionary {"osm-config-mapping": {...}} where each key of the inner dictionary is:
        "<member-vnf-index>" for a vnf configuration, or
        "<member-vnf-index>.<vdu_id>.<vdu_count_index>" for a vdu configuration
        and the value is the "application" of that VCA entry. Entries without member-vnf-index are skipped
    """
    nsr_record = self.db.get_one("nsrs", {"_id": nsr_id})
    osm_mapping = {}
    for entry in nsr_record["_admin"]["deployed"]["VCA"]:
        vnf_index = entry["member-vnf-index"]
        if not vnf_index:
            # entry not bound to any vnf: it is not part of the mapping
            continue
        vdu_id = entry["vdu_id"]
        application = entry["application"]
        if vdu_id:
            key = "{}.{}.{}".format(vnf_index, vdu_id, entry["vdu_count_index"])
        else:
            key = vnf_index
        osm_mapping[key] = application
    return {"osm-config-mapping": osm_mapping}
849
@staticmethod
def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
    """
    Produces the list of initial-config-primitive to run for one execution environment, starting from the list
    of the descriptor and adding the internal primitives 'config' and 'verify-ssh-credentials' when they apply
    :param desc_primitive_list: initial-config-primitive list of the descriptor; it can be None or empty
    :param vca_deployed: deployed VCA information. Its 'member-vnf-index' tells if it is related to a NS (empty)
        or to a VNF/VDU, and 'ssh-public-key' tells if the element contains a ssh public key
    :param ee_descriptor_id: execution environment descriptor id. It is the value of
        XXX_configuration.execution-environment-list.INDEX.id; it can be None
    :return: A new list, that can be empty, with the selected primitives sorted by 'seq'
    """
    # keep only the primitives of this execution environment, ordered by 'seq'
    selected = sorted(
        (item for item in (desc_primitive_list or []) if item.get("execution-environment-ref") == ee_descriptor_id),
        key=lambda item: int(item["seq"]))

    # position of the first 'config' primitive; None if not present
    config_at = next((position for position, item in enumerate(selected) if item["name"] == "config"), None)

    if not vca_deployed["member-vnf-index"]:
        # for NS, add always a config primitive if not present (bug 874)
        if config_at is None:
            selected.insert(0, {"name": "config", "parameter": []})
    elif config_at is not None and vca_deployed.get("ssh-public-key"):
        # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
        selected.insert(config_at + 1, {"name": "verify-ssh-credentials", "parameter": []})
    return selected
887
async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                             n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
    """
    Builds the deployment target for the new generation RO, requests the deployment and waits until it is done
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record. Its "name", "image" and "flavor" are copied into the target
    :param db_nslcmop: database content of ns operation. Its "operationParams" must contain "vimAccountId"
    :param db_vnfrs: dictionary whose values are the vnfr database contents
    :param db_vnfds_ref: database content of vnfds, indexed by the "vnfd-ref" of the vnfrs
    :param n2vc_key_list: ssh-public-key list; it is appended to the "ssh_keys" of the operation parameters
    :param stage: list with 3 items: [general stage, tasks, vim_specific]
    :param start_deploy: time, as returned by time(), from which the deployment timeout is counted
    :param timeout_ns_deploy: maximum number of seconds to wait for the deployment, counted from start_deploy
    :return: None. LcmException is raised if a vdur references a vdu that is not present at its vnfd
    """
    nslcmop_id = db_nslcmop["_id"]
    target = {
        "name": db_nsr["name"],
        "ns": {"vld": []},
        "vnf": [],
        "image": deepcopy(db_nsr["image"]),
        "flavor": deepcopy(db_nsr["flavor"]),
        "action_id": nslcmop_id,
    }
    # vim_info of images and flavors is filled below with the vim accounts of the vdus that use them
    for image in target["image"]:
        image["vim_info"] = []
    for flavor in target["flavor"]:
        flavor["vim_info"] = []

    ns_params = db_nslcmop.get("operationParams")
    ssh_keys = []
    if ns_params.get("ssh_keys"):
        ssh_keys += ns_params.get("ssh_keys")
    if n2vc_key_list:
        ssh_keys += n2vc_key_list

    # map "member_vnf:<member-vnf-index>.<connection-point>" to the ns vld where it is connected
    cp2target = {}
    for vld_index, vld in enumerate(nsd.get("vld") or ()):
        target_vld = {"id": vld["id"],
                      "name": vld["name"],
                      "mgmt-network": vld.get("mgmt-network", False),
                      "type": vld.get("type"),
                      "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
                                    "vim_account_id": ns_params["vimAccountId"]}],
                      }
        for cp in vld["vnfd-connection-point-ref"]:
            cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
                "nsrs:{}:vld.{}".format(nsr_id, vld_index)
        target["ns"]["vld"].append(target_vld)

    for vnfr in db_vnfrs.values():
        vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
        target_vnf = deepcopy(vnfr)
        for vld in target_vnf.get("vld", ()):
            # check if connected to a ns.vld
            vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
                           cp.get("internal-vld-ref") == vld["id"]), None)
            if vnf_cp:
                ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
                if cp2target.get(ns_cp):
                    vld["target"] = cp2target[ns_cp]
            vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
                                "vim_account_id": vnfr["vim-account-id"]}]

        for vdur in target_vnf.get("vdur", ()):
            vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
            # a bare next() would leak StopIteration, that inside a coroutine becomes a RuntimeError
            vdud_index, vdud = next((k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"]),
                                    (None, None))
            if vdud is None:
                raise LcmException("vdu-id-ref='{}' of vnfr member_vnf_index='{}' not found at vnfd='{}'".format(
                    vdur["vdu-id-ref"], vnfr.get("member-vnf-index-ref"), vnfr["vnfd-ref"]))
            # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf")  # TODO additional params for VDU

            if ssh_keys:
                if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
                    vdur["ssh-keys"] = ssh_keys
                    vdur["ssh-access-required"] = True
                elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
                        any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
                    vdur["ssh-keys"] = ssh_keys
                    vdur["ssh-access-required"] = True

            # cloud-init
            if vdud.get("cloud-init-file"):
                vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
            elif vdud.get("cloud-init"):
                vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)

            # flavor
            ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
            if not next((vi for vi in ns_flavor["vim_info"] if
                         vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
                ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
            # image
            ns_image = target["image"][int(vdur["ns-image-id"])]
            if not next((vi for vi in ns_image["vim_info"] if
                         vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
                ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
        target["vnf"].append(target_vnf)

    desc = await self.RO.deploy(nsr_id, target)
    action_id = desc["action_id"]
    # _wait_ng_ro is a bound method: 'self' must not be passed again
    await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)

    # Updating NSR
    db_nsr_update = {
        "_admin.deployed.RO.operational-status": "running",
        "detailed-status": " ".join(stage)
    }
    # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)
    self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
    return
986
async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
    """
    Polls the status of an action at the new generation RO every 5 seconds until it is done
    :param nsr_id: nsr identity
    :param action_id: identity of the RO action to wait for
    :param nslcmop_id: ns operation identity, used to write the operation status
    :param start_time: time, as returned by time(), from which the timeout is counted
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific.
        When the action is done it is set to "Deployed at VIM" without writing it at database
    :return: None. NgRoException is raised if the action fails, reports an unknown status or on timeout
    """
    detailed_status_old = None
    db_nsr_update = {}
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            stage[2] = "Deployed at VIM"
            break
        else:
            # an explicit raise is used because an assert is removed when running with -O
            raise NgRoException("ROclient.check_ns_status returns unknown {}".format(desc_status["status"]))
        # write at database only when the vim status text changes
        if stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # the 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
        await asyncio.sleep(5)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
1009
async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Removes a ns from the new generation RO: deploys an empty target, waits for it and deletes the nsr at RO
    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: nsr identity
    :param nslcmop_id: ns operation identity, used to write the operation status
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None. LcmException is raised if the deletion fails. A not found (404) answer from RO is considered
        as already deleted
    """
    db_nsr_update = {}
    failed_detail = []
    action_id = None
    start_deploy = time()
    try:
        # an empty target makes RO remove everything that was deployed for this ns
        target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
        }
        desc = await self.RO.deploy(nsr_id, target)
        action_id = desc["action_id"]
        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))

        # wait until done
        delete_timeout = 20 * 60   # 20 minutes
        # _wait_ng_ro is a bound method: passing 'self' again raised a TypeError that was reported as delete error
        await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)

        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as e:
        # best effort: any error is collected so that the database is updated before raising
        if isinstance(e, NgRoException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
        elif isinstance(e, NgRoException) and e.http_code == 409:   # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
    return
1060
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                         n2vc_key_list, stage):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception. On ROClientException, LcmException, DbException or NgRoException the vnfrs are
        set at error before raising it again
    """
    try:
        db_nsr_update = {}
        RO_descriptor_number = 0   # number of descriptors created at RO
        vnf_index_2_RO_id = {}    # map between vnfd/nsd id to the id used at RO
        nslcmop_id = db_nslcmop["_id"]
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            vnfr_vim_accounts = [vnfr["vim-account-id"] for vnfr in db_vnfrs.values()]
            if vnfr_vim_accounts and ns_params["vimAccountId"] not in vnfr_vim_accounts:
                # this was a '==' comparison without effect; the vim account of the last vnfr is assigned
                ns_params["vimAccountId"] = vnfr_vim_accounts[-1]

        if self.ng_ro:
            return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
                                                 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
                                                 timeout_ns_deploy)
        # deploy RO
        # get vnfds, instantiate at RO
        for c_vnf in nsd.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
            vnfd_ref = vnfd["id"]

            stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            # self.logger.debug(logging_text + stage[2])
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
            vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
            RO_descriptor_number += 1

            # look position at deployed.RO.vnfd if not present it will be appended at the end
            for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
                if vnf_deployed["member-vnf-index"] == member_vnf_index:
                    break
            else:
                index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
                db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

            # look if present
            RO_update = {"member-vnf-index": member_vnf_index}
            vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                RO_update["id"] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd='{}'  member_vnf_index='{}' exists at RO. Using RO_id={}".
                                  format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
                                       get("additionalParamsForVnf"), nsr_id)
                desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
                RO_update["id"] = desc["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                    vnfd_ref, member_vnf_index, desc["uuid"]))
            db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update

        # create nsd at RO
        nsd_ref = nsd["id"]

        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # self.logger.debug(logging_text + stage[2])
        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
        RO_descriptor_number += 1
        nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_ref, RO_nsd_uuid))
        else:
            # the copy sent to RO references the vnfds by the identifiers used at RO
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO.get("constituent-vnfd", ()):
                member_vnf_index = c_vnf["member-vnf-index"]
                c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
            for c_vld in nsd_RO.get("vld", ()):
                for cp in c_vld.get("vnfd-connection-point-ref", ()):
                    member_vnf_index = cp["member-vnf-index-ref"]
                    cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

            desc = await self.RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # if present use it unless in error status
        RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
        if RO_nsr_id:
            try:
                stage[2] = "Looking for existing ns at RO"
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
                desc = await self.RO.show("ns", RO_nsr_id)

            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + stage[2])
                    await self.RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            stage[2] = "Checking dependencies"
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            # self.logger.debug(logging_text + stage[2])

            # check if VIM is creating and wait  look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
            if task_dependency:
                stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
                self.logger.debug(logging_text + stage[2])
                await asyncio.wait(task_dependency, timeout=3600)
            if ns_params.get("vnf"):
                for vnf in ns_params["vnf"]:
                    if "vimAccountId" in vnf:
                        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                    vnf["vimAccountId"])
                        if task_dependency:
                            stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
                            self.logger.debug(logging_text + stage[2])
                            await asyncio.wait(task_dependency, timeout=3600)

            stage[2] = "Checking instantiation parameters."
            RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
            stage[2] = "Deploying ns at VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))

        # wait until NS is ready
        stage[2] = "Waiting VIM to deploy ns."
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        detailed_status_old = None
        self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))

        old_desc = None
        while time() <= start_deploy + timeout_ns_deploy:
            desc = await self.RO.show("ns", RO_nsr_id)

            # deploymentStatus
            if desc != old_desc:
                # desc has changed => update db
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
                old_desc = desc

            ns_status, ns_status_info = self.RO.check_ns_status(desc)
            db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                stage[2] = "VIM: ({})".format(ns_status_info)
            elif ns_status == "ACTIVE":
                stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
                try:
                    self.ns_update_vnfr(db_vnfrs, desc)
                    break
                except LcmExceptionNoMgmtIP:
                    # management ip address not available yet: keep on polling
                    pass
            else:
                # an explicit raise is used because an assert is removed when running with -O
                raise ROclient.ROClientException(
                    "ROclient.check_ns_status returns unknown {}".format(ns_status))
            if stage[2] != detailed_status_old:
                detailed_status_old = stage[2]
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
            # the 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
            await asyncio.sleep(5)
        else:  # timeout_ns_deploy
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        # Updating NSR
        self.ns_update_nsr(db_nsr_update, db_nsr, desc)

        db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        stage[2] = "Deployed at VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
        # self.logger.debug(logging_text + "Deployed at VIM")
    except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        raise
1301
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    The vnfr is read from database up to 360 times, sleeping 10 seconds between reads, until the kdur
    has a status
    :param logging_text: prefix use for logging (not used at present)
    :param nsr_id: not used at present
    :param vnfr_id: identity of the vnfr that contains the kdu
    :param kdu_name: name of the kdu to wait for
    :return: IP address stored at the kdur, once its status is READY or ENABLED
    :raises LcmException: if the kdur is not found, it gets any other status or the number of tries is exhausted
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    for _ in range(360):
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException("target KDU={} is in error state".format(kdu_name))

        # the 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
        await asyncio.sleep(10)
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1329
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    The database is polled every 10 seconds, up to one hour. The key is only injected when both pub_key and
    user are provided
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity
    :param vnfr_id: vnfr identity
    :param vdu_id: vdu-id-ref of the target vdu. If empty the target is the vdu with the ip address of the vnf
    :param vdu_index: count-index of the target vdu
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: on timeout, if the target is in error state or not found, or if the key injection fails
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # the 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                if self.ng_ro:
                    target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
                              "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
                              }
                    await self.RO.deploy(nsr_id, target)
                else:
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException("Unknown response from RO when injecting key")
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException("error injecting key: {}".format(
                                result.get("description")))
                # key injected: leave the polling loop
                break
            except NgRoException as e:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
            except ROclient.ROClientException as e:
                # the injection is tried again at next iteration, up to 20 times
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            break

    return ip_address
1439
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Blocks until the VCA deployments this one depends on are READY. A NS level VCA depends on all the others;
    a VNF level VCA depends on those with its same member-vnf-index; VDU and KDU ones have no dependencies
    :param nsr_id: nsr identity, used to read 'configurationStatus' from database
    :param vca_deployed_list: list of deployed VCA; only the item at vca_index is consulted
    :param vca_index: position of this VCA at vca_deployed_list and at 'configurationStatus'
    :return: None. LcmException is raised if a dependency is BROKEN or after 301 checks done every 10 seconds
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    for _attempt in range(301):
        nsr_record = self.db.get_one("nsrs", {"_id": nsr_id})
        # value not used afterwards; the lookup is kept so that a record without this path fails here
        vca_deployed_list = nsr_record["_admin"]["deployed"]["VCA"]
        status_entries = nsr_record["configurationStatus"]

        dependency_pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            own_member_index = own_vca.get("member-vnf-index")
            if own_member_index and entry.get("member-vnf-index") != own_member_index:
                # entry of a different vnf: it is not a dependency
                continue
            entry_status = entry.get("status")
            if entry_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            if entry_status != 'READY':
                dependency_pending = True
                break

        if not dependency_pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1473
1474 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1475 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1476 ee_config_descriptor):
1477 nsr_id = db_nsr["_id"]
1478 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1479 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1480 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1481 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1482 db_dict = {
1483 'collection': 'nsrs',
1484 'filter': {'_id': nsr_id},
1485 'path': db_update_entry
1486 }
1487 step = ""
1488 try:
1489
1490 element_type = 'NS'
1491 element_under_configuration = nsr_id
1492
1493 vnfr_id = None
1494 if db_vnfr:
1495 vnfr_id = db_vnfr["_id"]
1496 osm_config["osm"]["vnf_id"] = vnfr_id
1497
1498 namespace = "{nsi}.{ns}".format(
1499 nsi=nsi_id if nsi_id else "",
1500 ns=nsr_id)
1501
1502 if vnfr_id:
1503 element_type = 'VNF'
1504 element_under_configuration = vnfr_id
1505 namespace += ".{}".format(vnfr_id)
1506 if vdu_id:
1507 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1508 element_type = 'VDU'
1509 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1510 osm_config["osm"]["vdu_id"] = vdu_id
1511 elif kdu_name:
1512 namespace += ".{}".format(kdu_name)
1513 element_type = 'KDU'
1514 element_under_configuration = kdu_name
1515 osm_config["osm"]["kdu_name"] = kdu_name
1516
1517 # Get artifact path
1518 artifact_path = "{}/{}/{}/{}".format(
1519 base_folder["folder"],
1520 base_folder["pkg-dir"],
1521 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1522 vca_name
1523 )
1524 # get initial_config_primitive_list that applies to this element
1525 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1526
1527 # add config if not present for NS charm
1528 ee_descriptor_id = ee_config_descriptor.get("id")
1529 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1530 vca_deployed, ee_descriptor_id)
1531
1532 # n2vc_redesign STEP 3.1
1533 # find old ee_id if exists
1534 ee_id = vca_deployed.get("ee_id")
1535
1536 # create or register execution environment in VCA
1537 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1538
1539 self._write_configuration_status(
1540 nsr_id=nsr_id,
1541 vca_index=vca_index,
1542 status='CREATING',
1543 element_under_configuration=element_under_configuration,
1544 element_type=element_type
1545 )
1546
1547 step = "create execution environment"
1548 self.logger.debug(logging_text + step)
1549 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1550 namespace=namespace,
1551 reuse_ee_id=ee_id,
1552 db_dict=db_dict,
1553 config=osm_config,
1554 artifact_path=artifact_path,
1555 vca_type=vca_type)
1556
1557 elif vca_type == "native_charm":
1558 step = "Waiting to VM being up and getting IP address"
1559 self.logger.debug(logging_text + step)
1560 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1561 user=None, pub_key=None)
1562 credentials = {"hostname": rw_mgmt_ip}
1563 # get username
1564 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1565 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1566 # merged. Meanwhile let's get username from initial-config-primitive
1567 if not username and initial_config_primitive_list:
1568 for config_primitive in initial_config_primitive_list:
1569 for param in config_primitive.get("parameter", ()):
1570 if param["name"] == "ssh-username":
1571 username = param["value"]
1572 break
1573 if not username:
1574 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1575 "'config-access.ssh-access.default-user'")
1576 credentials["username"] = username
1577 # n2vc_redesign STEP 3.2
1578
1579 self._write_configuration_status(
1580 nsr_id=nsr_id,
1581 vca_index=vca_index,
1582 status='REGISTERING',
1583 element_under_configuration=element_under_configuration,
1584 element_type=element_type
1585 )
1586
1587 step = "register execution environment {}".format(credentials)
1588 self.logger.debug(logging_text + step)
1589 ee_id = await self.vca_map[vca_type].register_execution_environment(
1590 credentials=credentials, namespace=namespace, db_dict=db_dict)
1591
1592 # for compatibility with MON/POL modules, they need the model and application name at database
1593 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1594 ee_id_parts = ee_id.split('.')
1595 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1596 if len(ee_id_parts) >= 2:
1597 model_name = ee_id_parts[0]
1598 application_name = ee_id_parts[1]
1599 db_nsr_update[db_update_entry + "model"] = model_name
1600 db_nsr_update[db_update_entry + "application"] = application_name
1601
1602 # n2vc_redesign STEP 3.3
1603 step = "Install configuration Software"
1604
1605 self._write_configuration_status(
1606 nsr_id=nsr_id,
1607 vca_index=vca_index,
1608 status='INSTALLING SW',
1609 element_under_configuration=element_under_configuration,
1610 element_type=element_type,
1611 other_update=db_nsr_update
1612 )
1613
1614 # TODO check if already done
1615 self.logger.debug(logging_text + step)
1616 config = None
1617 if vca_type == "native_charm":
1618 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1619 if config_primitive:
1620 config = self._map_primitive_params(
1621 config_primitive,
1622 {},
1623 deploy_params
1624 )
1625 num_units = 1
1626 if vca_type == "lxc_proxy_charm":
1627 if element_type == "NS":
1628 num_units = db_nsr.get("config-units") or 1
1629 elif element_type == "VNF":
1630 num_units = db_vnfr.get("config-units") or 1
1631 elif element_type == "VDU":
1632 for v in db_vnfr["vdur"]:
1633 if vdu_id == v["vdu-id-ref"]:
1634 num_units = v.get("config-units") or 1
1635 break
1636
1637 await self.vca_map[vca_type].install_configuration_sw(
1638 ee_id=ee_id,
1639 artifact_path=artifact_path,
1640 db_dict=db_dict,
1641 config=config,
1642 num_units=num_units,
1643 vca_type=vca_type
1644 )
1645
1646 # write in db flag of configuration_sw already installed
1647 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1648
1649 # add relations for this VCA (wait for other peers related with this VCA)
1650 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1651 vca_index=vca_index, vca_type=vca_type)
1652
1653 # if SSH access is required, then get execution environment SSH public
1654 # if native charm we have waited already to VM be UP
1655 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
1656 pub_key = None
1657 user = None
1658 # self.logger.debug("get ssh key block")
1659 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1660 # self.logger.debug("ssh key needed")
1661 # Needed to inject a ssh key
1662 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1663 step = "Install configuration Software, getting public ssh key"
1664 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1665
1666 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1667 else:
1668 # self.logger.debug("no need to get ssh key")
1669 step = "Waiting to VM being up and getting IP address"
1670 self.logger.debug(logging_text + step)
1671
1672 # n2vc_redesign STEP 5.1
1673 # wait for RO (ip-address) Insert pub_key into VM
1674 if vnfr_id:
1675 if kdu_name:
1676 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1677 else:
1678 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1679 vdu_index, user=user, pub_key=pub_key)
1680 else:
1681 rw_mgmt_ip = None # This is for a NS configuration
1682
1683 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1684
1685 # store rw_mgmt_ip in deploy params for later replacement
1686 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1687
1688 # n2vc_redesign STEP 6 Execute initial config primitive
1689 step = 'execute initial config primitive'
1690
1691 # wait for dependent primitives execution (NS -> VNF -> VDU)
1692 if initial_config_primitive_list:
1693 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1694
1695 # stage, in function of element type: vdu, kdu, vnf or ns
1696 my_vca = vca_deployed_list[vca_index]
1697 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1698 # VDU or KDU
1699 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1700 elif my_vca.get("member-vnf-index"):
1701 # VNF
1702 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1703 else:
1704 # NS
1705 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1706
1707 self._write_configuration_status(
1708 nsr_id=nsr_id,
1709 vca_index=vca_index,
1710 status='EXECUTING PRIMITIVE'
1711 )
1712
1713 self._write_op_status(
1714 op_id=nslcmop_id,
1715 stage=stage
1716 )
1717
1718 check_if_terminated_needed = True
1719 for initial_config_primitive in initial_config_primitive_list:
1720 # adding information on the vca_deployed if it is a NS execution environment
1721 if not vca_deployed["member-vnf-index"]:
1722 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1723 # TODO check if already done
1724 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1725
1726 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1727 self.logger.debug(logging_text + step)
1728 await self.vca_map[vca_type].exec_primitive(
1729 ee_id=ee_id,
1730 primitive_name=initial_config_primitive["name"],
1731 params_dict=primitive_params_,
1732 db_dict=db_dict
1733 )
1734 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1735 if check_if_terminated_needed:
1736 if config_descriptor.get('terminate-config-primitive'):
1737 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1738 check_if_terminated_needed = False
1739
1740 # TODO register in database that primitive is done
1741
1742 # STEP 7 Configure metrics
1743 if vca_type == "helm":
1744 prometheus_jobs = await self.add_prometheus_metrics(
1745 ee_id=ee_id,
1746 artifact_path=artifact_path,
1747 ee_config_descriptor=ee_config_descriptor,
1748 vnfr_id=vnfr_id,
1749 nsr_id=nsr_id,
1750 target_ip=rw_mgmt_ip,
1751 )
1752 if prometheus_jobs:
1753 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1754
1755 step = "instantiated at VCA"
1756 self.logger.debug(logging_text + step)
1757
1758 self._write_configuration_status(
1759 nsr_id=nsr_id,
1760 vca_index=vca_index,
1761 status='READY'
1762 )
1763
1764 except Exception as e: # TODO not use Exception but N2VC exception
1765 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1766 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1767 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1768 self._write_configuration_status(
1769 nsr_id=nsr_id,
1770 vca_index=vca_index,
1771 status='BROKEN'
1772 )
1773 raise LcmException("{} {}".format(step, e)) from e
1774
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Write the current-operation and error fields of one "nsrs" record.

    The fields are added to other_update itself when it is a non-empty dict (the caller's dict is extended
    in place, not copied) and the resulting dict is handed to self.update_db_2.
    :param nsr_id: _id of the nsrs record to update
    :param ns_state: value for "nsState"; the field is left untouched when this is empty/None
    :param current_operation: value for "currentOperation". "_admin.operation-type" receives the same value,
        except for "IDLE", which is stored as None
    :param current_operation_id: stored at "currentOperationID", "_admin.current-operation" and
        "_admin.nslcmop" (the latter for backward compatibility)
    :param error_description: value for "errorDescription" (written even when None)
    :param error_detail: value for "errorDetail" (written even when None)
    :param other_update: other required changes at database, sent in the same update
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        db_dict = other_update or {}
        db_dict.update({
            "_admin.nslcmop": current_operation_id,  # for backward compatibility
            "_admin.current-operation": current_operation_id,
            "_admin.operation-type": current_operation if current_operation != "IDLE" else None,
            "currentOperation": current_operation,
            "currentOperationID": current_operation_id,
            "errorDescription": error_description,
            "errorDetail": error_detail,
        })

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1803
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Write progress fields of one "nslcmops" record.

    The fields are added to other_update itself when it is a non-empty dict (extended in place, not copied)
    and the resulting dict is handed to self.update_db_2.
    :param op_id: _id of the nslcmops record to update
    :param stage: if a list, its first item is stored at "stage" and all items joined by a blank at
        "detailed-status"; any other non-None value is stored as str() at "stage"; None writes nothing
    :param error_message: value for "errorMessage"; not written when None
    :param queuePosition: value for "queuePosition" (always written)
    :param operation_state: value for "operationState"; when provided, "statusEnteredTime" is also set to the
        current time()
    :param other_update: other required changes at database, sent in the same update
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1823
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Set the same status on every non-empty entry of the "configurationStatus" list of a NS record.

    :param db_nsr: nsrs record content; its "_id" and "configurationStatus" entries are read
    :param status: value written at "configurationStatus.<index>.status" for each truthy list item
    :return: None. Nothing is written when the record has no (or an empty) "configurationStatus".
        A DbException is logged as a warning and not propagated
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # falsy entries of the list are skipped, but they still consume their index
            db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
                             enumerate(config_status) if v}
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1837
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None,
                                other_update: dict = None):
    """
    Write the "configurationStatus.<vca_index>" fields of one "nsrs" record.

    The fields are added to other_update itself when it is a non-empty dict (extended in place, not copied)
    and the resulting dict is handed to self.update_db_2.
    :param nsr_id: _id of the nsrs record to update
    :param vca_index: position inside the "configurationStatus" list
    :param status: value for "...status"; not written when empty/None
    :param element_under_configuration: value for "...elementUnderConfiguration"; not written when empty/None
    :param element_type: value for "...elementType"; not written when empty/None
    :param other_update: other required changes at database, sent in the same update
    :return: None. A DbException is logged as a warning and not propagated
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + 'status'] = status
        if element_under_configuration:
            db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
        if element_type:
            db_dict[db_path + 'elementType'] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1858
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy) of the VNFs.

    Only acts when operationParams.placement-engine of the operation is "PLA". In that case a "get_placement"
    request is written on the "pla" topic and the nslcmops record is re-read from database every 5 seconds
    (at most 11 reads) until its _admin.pla entry is filled. Database is used because the result can be
    obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'
    :raises LcmException: when no placement result is found after the last read
    """
    changed = False
    nslcmop_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # no external placement requested
        return changed

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    # poll the database until the result shows up or the time budget is exhausted
    poll_seconds = 5
    remaining = poll_seconds * 10
    placement = None
    while remaining >= 0 and not placement:
        await asyncio.sleep(poll_seconds)
        remaining -= poll_seconds
        fresh_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(fresh_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    for placed_vnf in placement['vnf']:
        vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        vim_account = placed_vnf.get('vimAccountId')
        # entries without vim account, or for an unknown vnf, are ignored
        if vim_account and vnfr:
            changed = True
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": vim_account})
            # keep the in-memory record in sync with database
            vnfr["vim-account-id"] = vim_account
    return changed
1897
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result into the "_admin.pla" field of its nslcmops record.

    :param params: dict whose 'placement' entry is the content to store; the record to update is the one
        named by params['placement']['nslcmopId']
    :return: None. Any exception is logged as a warning and not propagated
    """
    # bound before the try so that the except handler can always format it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported method
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1904
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Run the instantiate operation nslcmop_id over the NS record nsr_id.

    Visible flow: take the HA lock for the operation (return immediately if another worker owns it), read the
    nslcmops/nsrs/nsds/vnfrs/vnfds records, await deploy_kdus, launch instantiate_RO as a background task and
    call _deploy_n2vc for every VNF, VDU, KDU and NS that declares a configuration. The 'finally' part waits
    for the tasks collected in tasks_dict_info, writes the resulting state to the nsrs and nslcmops records
    and sends an "instantiated" message on the "ns" topic.

    Exceptions raised inside the main try block are not propagated: they are logged and turned into entries
    of error_list, which decides between the READY/COMPLETED and the BROKEN/FAILED outcome.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}     # vnf's info indexed by member-index
    # n2vc_info = {}
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        stage[1] = "Sync filesystem from database."
        self.fs.sync()  # TODO, make use of partial sync, only for the needed packages

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage,
            queuePosition=0
        )

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        ns_params = db_nslcmop.get("operationParams")
        # a timeout given in the operation parameters wins over the configured one
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds_ref = {}     # every vnfd data indexed by vnf name
        db_vnfds = {}         # every vnfd data indexed by vnf id
        db_vnfds_index = {}   # every vnfd data indexed by vnf member-index

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr   # vnf's dict indexed by member-index: '1', '2', etc
            vnfd_id = vnfr["vnfd-id"]                       # vnfd uuid for this vnf
            vnfd_ref = vnfr["vnfd-ref"]                     # vnfd name for this vnf

            # if we haven't this vnfd, read it from db
            if vnfd_id not in db_vnfds:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds_ref[vnfd_ref] = vnfd     # vnfd's indexed by name
                db_vnfds[vnfd_id] = vnfd          # vnfd's indexed by id
            db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id]  # vnfd's indexed by member-index

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        # make sure _admin.deployed.RO.vnfd is a list, both in memory and at database
        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        stage[1] = "Deploying NS at VIM."
        # instantiate_RO runs as a background task; it is awaited together with the rest at 'finally'
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds_ref=db_vnfds_ref,
                n2vc_key_list=n2vc_key_list,
                stage=stage
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        # get_iterable() returns a value from a dict or empty tuple if key does not exist
        for c_vnf in get_iterable(nsd, "constituent-vnfd"):
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnfd = db_vnfds_ref[vnfd_id]
            member_vnf_index = str(c_vnf["member-vnf-index"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()))

            # VNF level configuration
            descriptor_config = vnfd.get("vnf-configuration")
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_iterable(vnfd, 'vdu'):
                vdu_id = vdud["id"]
                descriptor_config = vdud.get('vdu-configuration')
                # NOTE(review): vdur is None when no vdur entry references this vdu id; the .get() below
                # would then raise AttributeError - confirm that every vdu always has a vdur
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur.get("additionalParams"):
                    deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
                else:
                    # NOTE(review): same dict object as the VNF level deploy_params, so the "OSM" assignment
                    # below also replaces deploy_params["OSM"] - confirm this sharing is intended
                    deploy_params_vdu = deploy_params
                deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    # one _deploy_n2vc call per vdu instance ("count" at descriptor, 1 by default)
                    for vdu_index in range(int(vdud.get("count", 1))):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage
                        )
            # KDU level configuration
            for kdud in get_iterable(vnfd, 'kdu'):
                kdu_name = kdud["name"]
                descriptor_config = kdud.get('kdu-configuration')
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    # next() without default: raises StopIteration when no kdur has this kdu-name
                    kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
                    deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        # NOTE(review): this replaces the dict built just above, so the "OSM" entry is not
                        # added by this code when additionalParams is present - confirm intended
                        deploy_params_kdu = self._format_additional_params(kdur["additionalParams"])

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            # (db_vnfr is None here, so _get_osm_params is called with None)
            deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy()))
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage
            )

        # the rest of the work will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
        exc = e
    except asyncio.CancelledError:
        # the cancellation is recorded as an error text and not re-raised
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        # unexpected error: keep the whole traceback as the error text
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
                                                         stage, nslcmop_id, nsr_id=nsr_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as exc:
            # note: Python unbinds the name 'exc' when this handler ends; it is not read again below
            error_list.append(str(exc))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])

            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # the nsrs record is only written if it could be read before; the operation is always written
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        # notify the result; a failure of the notification is only logged
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2318
2319 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2320 timeout: int = 3600, vca_type: str = None) -> bool:
2321
2322 # steps:
2323 # 1. find all relations for this VCA
2324 # 2. wait for other peers related
2325 # 3. add relations
2326
2327 try:
2328 vca_type = vca_type or "lxc_proxy_charm"
2329
2330 # STEP 1: find all relations for this VCA
2331
2332 # read nsr record
2333 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2334 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2335
2336 # this VCA data
2337 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2338
2339 # read all ns-configuration relations
2340 ns_relations = list()
2341 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2342 if db_ns_relations:
2343 for r in db_ns_relations:
2344 # check if this VCA is in the relation
2345 if my_vca.get('member-vnf-index') in\
2346 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2347 ns_relations.append(r)
2348
2349 # read all vnf-configuration relations
2350 vnf_relations = list()
2351 db_vnfd_list = db_nsr.get('vnfd-id')
2352 if db_vnfd_list:
2353 for vnfd in db_vnfd_list:
2354 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2355 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2356 if db_vnf_relations:
2357 for r in db_vnf_relations:
2358 # check if this VCA is in the relation
2359 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2360 vnf_relations.append(r)
2361
2362 # if no relations, terminate
2363 if not ns_relations and not vnf_relations:
2364 self.logger.debug(logging_text + ' No relations')
2365 return True
2366
2367 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2368
2369 # add all relations
2370 start = time()
2371 while True:
2372 # check timeout
2373 now = time()
2374 if now - start >= timeout:
2375 self.logger.error(logging_text + ' : timeout adding relations')
2376 return False
2377
2378 # reload nsr from database (we need to update record: _admin.deployed.VCA)
2379 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2380
2381 # for each defined NS relation, find the VCA's related
2382 for r in ns_relations.copy():
2383 from_vca_ee_id = None
2384 to_vca_ee_id = None
2385 from_vca_endpoint = None
2386 to_vca_endpoint = None
2387 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2388 for vca in vca_list:
2389 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2390 and vca.get('config_sw_installed'):
2391 from_vca_ee_id = vca.get('ee_id')
2392 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2393 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2394 and vca.get('config_sw_installed'):
2395 to_vca_ee_id = vca.get('ee_id')
2396 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2397 if from_vca_ee_id and to_vca_ee_id:
2398 # add relation
2399 await self.vca_map[vca_type].add_relation(
2400 ee_id_1=from_vca_ee_id,
2401 ee_id_2=to_vca_ee_id,
2402 endpoint_1=from_vca_endpoint,
2403 endpoint_2=to_vca_endpoint)
2404 # remove entry from relations list
2405 ns_relations.remove(r)
2406 else:
2407 # check failed peers
2408 try:
2409 vca_status_list = db_nsr.get('configurationStatus')
2410 if vca_status_list:
2411 for i in range(len(vca_list)):
2412 vca = vca_list[i]
2413 vca_status = vca_status_list[i]
2414 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2415 if vca_status.get('status') == 'BROKEN':
2416 # peer broken: remove relation from list
2417 ns_relations.remove(r)
2418 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2419 if vca_status.get('status') == 'BROKEN':
2420 # peer broken: remove relation from list
2421 ns_relations.remove(r)
2422 except Exception:
2423 # ignore
2424 pass
2425
2426 # for each defined VNF relation, find the VCA's related
2427 for r in vnf_relations.copy():
2428 from_vca_ee_id = None
2429 to_vca_ee_id = None
2430 from_vca_endpoint = None
2431 to_vca_endpoint = None
2432 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2433 for vca in vca_list:
2434 key_to_check = "vdu_id"
2435 if vca.get("vdu_id") is None:
2436 key_to_check = "vnfd_id"
2437 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2438 from_vca_ee_id = vca.get('ee_id')
2439 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2440 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2441 to_vca_ee_id = vca.get('ee_id')
2442 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2443 if from_vca_ee_id and to_vca_ee_id:
2444 # add relation
2445 await self.vca_map[vca_type].add_relation(
2446 ee_id_1=from_vca_ee_id,
2447 ee_id_2=to_vca_ee_id,
2448 endpoint_1=from_vca_endpoint,
2449 endpoint_2=to_vca_endpoint)
2450 # remove entry from relations list
2451 vnf_relations.remove(r)
2452 else:
2453 # check failed peers
2454 try:
2455 vca_status_list = db_nsr.get('configurationStatus')
2456 if vca_status_list:
2457 for i in range(len(vca_list)):
2458 vca = vca_list[i]
2459 vca_status = vca_status_list[i]
2460 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2461 if vca_status.get('status') == 'BROKEN':
2462 # peer broken: remove relation from list
2463 vnf_relations.remove(r)
2464 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2465 if vca_status.get('status') == 'BROKEN':
2466 # peer broken: remove relation from list
2467 vnf_relations.remove(r)
2468 except Exception:
2469 # ignore
2470 pass
2471
2472 # wait for next try
2473 await asyncio.sleep(5.0)
2474
2475 if not ns_relations and not vnf_relations:
2476 self.logger.debug('Relations added')
2477 break
2478
2479 return True
2480
2481 except Exception as e:
2482 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2483 return False
2484
2485 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2486 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
2487
2488 try:
2489 k8sclustertype = k8s_instance_info["k8scluster-type"]
2490 # Instantiate kdu
2491 db_dict_install = {"collection": "nsrs",
2492 "filter": {"_id": nsr_id},
2493 "path": nsr_db_path}
2494
2495 kdu_instance = await self.k8scluster_map[k8sclustertype].install(
2496 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2497 kdu_model=k8s_instance_info["kdu-model"],
2498 atomic=True,
2499 params=k8params,
2500 db_dict=db_dict_install,
2501 timeout=timeout,
2502 kdu_name=k8s_instance_info["kdu-name"],
2503 namespace=k8s_instance_info["namespace"])
2504 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2505
2506 # Obtain services to obtain management service ip
2507 services = await self.k8scluster_map[k8sclustertype].get_services(
2508 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2509 kdu_instance=kdu_instance,
2510 namespace=k8s_instance_info["namespace"])
2511
2512 # Obtain management service info (if exists)
2513 vnfr_update_dict = {}
2514 if services:
2515 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2516 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2517 for mgmt_service in mgmt_services:
2518 for service in services:
2519 if service["name"].startswith(mgmt_service["name"]):
2520 # Mgmt service found, Obtain service ip
2521 ip = service.get("external_ip", service.get("cluster_ip"))
2522 if isinstance(ip, list) and len(ip) == 1:
2523 ip = ip[0]
2524
2525 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2526
2527 # Check if must update also mgmt ip at the vnf
2528 service_external_cp = mgmt_service.get("external-connection-point-ref")
2529 if service_external_cp:
2530 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2531 vnfr_update_dict["ip-address"] = ip
2532
2533 break
2534 else:
2535 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2536
2537 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2538 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2539
2540 kdu_config = kdud.get("kdu-configuration")
2541 if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
2542 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2543 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2544
2545 for initial_config_primitive in initial_config_primitive_list:
2546 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2547
2548 await asyncio.wait_for(
2549 self.k8scluster_map[k8sclustertype].exec_primitive(
2550 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2551 kdu_instance=kdu_instance,
2552 primitive_name=initial_config_primitive["name"],
2553 params=primitive_params_, db_dict={}),
2554 timeout=timeout)
2555
2556 except Exception as e:
2557 # Prepare update db with error and raise exception
2558 try:
2559 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2560 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2561 except Exception:
2562 # ignore to keep original exception
2563 pass
2564 # reraise original error
2565 raise
2566
2567 return kdu_instance
2568
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch one installation task per KDU found at the vnfrs.

    For every "kdur" entry it resolves the KDU model and target cluster, synchronizes the helm repos once per
    cluster, stores the deployment info at nsrs "_admin.deployed.K8s.<index>" and schedules _install_kdu as an
    asyncio task registered at self.lcm_tasks.

    :param logging_text: prefix for the log messages
    :param nsr_id: _id of the nsrs record
    :param nslcmop_id: operation id used to register the launched tasks
    :param db_vnfrs: dictionary whose values are the vnfr contents
    :param db_vnfds: dictionary of VNF descriptors indexed by the vnfr "vnfd-id"
    :param task_instantiation_info: dictionary filled here with {task: description} of every launched task
    :return: None. Raises LcmException on error (asyncio.CancelledError is propagated untouched)
    """
    # Launch kdus if present in the descriptor

    # cache of cluster id -> cluster uuid, one dictionary per cluster type
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    async def _get_cluster_id(cluster_id, cluster_type):
        nonlocal k8scluster_id_2_uuic
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        updated_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                # next() with a default: a bare StopIteration would end as a meaningless
                # "Exception StopIteration while <step>" message
                kdud = next((item for item in db_vnfds[vnfd_id]["kdu"] if item["name"] == kdur["kdu-name"]), None)
                if kdud is None:
                    raise LcmException("kdu descriptor '{}' not found at vnfd '{}'".format(kdur["kdu-name"],
                                                                                          vnfd_id))
                namespace = kdur.get("k8s-namespace")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos
                if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
                    if del_repo_list or added_repo_dict:
                        unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                        updated = {'_admin.helm_charts_added.' +
                                   item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                          "to_add: {}".format(k8s_cluster_id, del_repo_list,
                                                              added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                    # remember the cluster so its repos are synchronized only once in this call
                    updated_cluster_list.append(cluster_uuid)

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel,
                                     "namespace": namespace}
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                task = asyncio.ensure_future(
                    self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, db_vnfds[vnfd_id],
                                      k8s_instance_info, k8params=desc_params, timeout=600))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        # keep the original exception as explicit cause
        raise LcmException(msg) from e
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2689
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Launch one instantiate_N2VC asyncio task per execution environment found at descriptor_config.

    :param logging_text: prefix for the log messages
    :param db_nsr: nsrs content. Its "_admin.deployed.VCA" list is read, and appended when a new entry is created
    :param db_vnfr: passed through to instantiate_N2VC
    :param nslcmop_id: operation id used to register the launched tasks
    :param nsr_id: _id of the nsrs record updated when a new VCA entry is created
    :param nsi_id: passed through to instantiate_N2VC
    :param vnfd_id: stored at the new VCA entry
    :param vdu_id: used, with kdu_name, member_vnf_index and vdu_index, to look for an existing VCA entry
    :param kdu_name: see vdu_id
    :param member_vnf_index: see vdu_id. When falsy, the target element is "ns"
    :param vdu_index: see vdu_id
    :param vdu_name: stored at the new VCA entry
    :param deploy_params: passed through to instantiate_N2VC
    :param descriptor_config: configuration descriptor; either contains "juju" or "execution-environment-list"
    :param base_folder: passed through to instantiate_N2VC
    :param task_instantiation_info: dictionary filled here with {task: description} of every launched task
    :param stage: passed through to instantiate_N2VC
    :return: None
    """
    # launch instantiate_N2VC in a asyncio task and register task object
    # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
    # if not found, create one entry and update database
    # fill db_nsr._admin.deployed.VCA.<index>

    self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
    if descriptor_config.get("juju"):   # There is one execution environment of type juju
        ee_list = [descriptor_config]
    elif descriptor_config.get("execution-environment-list"):
        ee_list = descriptor_config.get("execution-environment-list")
    else:  # other types as script are not supported
        ee_list = []

    for ee_item in ee_list:
        self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
                                                                                        ee_item.get("helm-chart")))
        ee_descriptor_id = ee_item.get("id")
        # decide the VCA type: a juju item is a proxy charm when it declares a charm, unless cloud "k8s" or
        # proxy False say otherwise; a helm-chart item is of type "helm"
        if ee_item.get("juju"):
            vca_name = ee_item['juju'].get('charm')
            vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
            if ee_item['juju'].get('cloud') == "k8s":
                vca_type = "k8s_proxy_charm"
            elif ee_item['juju'].get('proxy') is False:
                vca_type = "native_charm"
        elif ee_item.get("helm-chart"):
            vca_name = ee_item['helm-chart']
            vca_type = "helm"
        else:
            self.logger.debug(logging_text + "skipping non juju neither charm configuration")
            continue

        # look for an already existing entry for this element. vca_index starts at -1 so that, when the loop
        # ends without a match, vca_index + 1 is the position of the entry to be appended
        vca_index = -1
        for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            if not vca_deployed:
                continue
            if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                    vca_deployed.get("vdu_id") == vdu_id and \
                    vca_deployed.get("kdu_name") == kdu_name and \
                    vca_deployed.get("vdu_count_index", 0) == vdu_index and \
                    vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
                break
        else:
            # not found, create one.
            target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
            if vdu_id:
                target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                target += "/kdu/{}".format(kdu_name)
            vca_deployed = {
                "target_element": target,
                # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                "member-vnf-index": member_vnf_index,
                "vdu_id": vdu_id,
                "kdu_name": kdu_name,
                "vdu_count_index": vdu_index,
                "operational-status": "init",  # TODO revise
                "detailed-status": "",  # TODO revise
                "step": "initial-deploy",   # TODO revise
                "vnfd_id": vnfd_id,
                "vdu_name": vdu_name,
                "type": vca_type,
                "ee_descriptor_id": ee_descriptor_id
            }
            vca_index += 1

            # create VCA and configurationStatus in db
            db_dict = {
                "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                "configurationStatus.{}".format(vca_index): dict()
            }
            self.update_db_2("nsrs", nsr_id, db_dict)

            # keep the in-memory record aligned with the database
            db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

        # Launch task
        task_n2vc = asyncio.ensure_future(
            self.instantiate_N2VC(
                logging_text=logging_text,
                vca_index=vca_index,
                nsi_id=nsi_id,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                vdu_index=vdu_index,
                deploy_params=deploy_params,
                config_descriptor=descriptor_config,
                base_folder=base_folder,
                nslcmop_id=nslcmop_id,
                stage=stage,
                vca_type=vca_type,
                vca_name=vca_name,
                ee_config_descriptor=ee_item
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
        task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
            member_vnf_index or "", vdu_id or "")
2791
2792 @staticmethod
2793 def _get_terminate_config_primitive(primitive_list, vca_deployed):
2794 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2795 it get only those primitives for this execution envirom"""
2796
2797 primitive_list = primitive_list or []
2798 # filter primitives by ee_descriptor_id
2799 ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
2800 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
2801
2802 if primitive_list:
2803 primitive_list.sort(key=lambda val: int(val['seq']))
2804
2805 return primitive_list
2806
2807 @staticmethod
2808 def _create_nslcmop(nsr_id, operation, params):
2809 """
2810 Creates a ns-lcm-opp content to be stored at database.
2811 :param nsr_id: internal id of the instance
2812 :param operation: instantiate, terminate, scale, action, ...
2813 :param params: user parameters for the operation
2814 :return: dictionary following SOL005 format
2815 """
2816 # Raise exception if invalid arguments
2817 if not (nsr_id and operation and params):
2818 raise LcmException(
2819 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2820 now = time()
2821 _id = str(uuid4())
2822 nslcmop = {
2823 "id": _id,
2824 "_id": _id,
2825 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2826 "operationState": "PROCESSING",
2827 "statusEnteredTime": now,
2828 "nsInstanceId": nsr_id,
2829 "lcmOperationType": operation,
2830 "startTime": now,
2831 "isAutomaticInvocation": False,
2832 "operationParams": params,
2833 "isCancelPending": False,
2834 "links": {
2835 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2836 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2837 }
2838 }
2839 return nslcmop
2840
2841 def _format_additional_params(self, params):
2842 params = params or {}
2843 for key, value in params.items():
2844 if str(value).startswith("!!yaml "):
2845 params[key] = yaml.safe_load(value[7:])
2846 return params
2847
2848 def _get_terminate_primitive_params(self, seq, vnf_index):
2849 primitive = seq.get('name')
2850 primitive_params = {}
2851 params = {
2852 "member_vnf_index": vnf_index,
2853 "primitive": primitive,
2854 "primitive_params": primitive_params,
2855 }
2856 desc_params = {}
2857 return self._map_primitive_params(seq, params, desc_params)
2858
2859 # sub-operations
2860
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or executed again.

    :param db_nslcmop: nslcmops content, with the sub-operations at "_admin.operations"
    :param op_index: index of the sub-operation inside that list
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is 'COMPLETED' (nothing will be executed);
        otherwise op_index, after marking the sub-operation as 'PROCESSING' to indicate a retry
    """
    sub_operations = deep_get(db_nslcmop, ('_admin', 'operations'), [])
    if sub_operations[op_index].get('operationState') != 'COMPLETED':
        # retry: the caller re-executes it with the arguments stored at the sub-operation
        self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
        return op_index
    return self.SUBOPERATION_STATUS_SKIP
2879
2880 # Find a sub-operation where all keys in a matching dictionary must match
2881 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2882 def _find_suboperation(self, db_nslcmop, match):
2883 if db_nslcmop and match:
2884 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2885 for i, op in enumerate(op_list):
2886 if all(op.get(k) == match[k] for k in match):
2887 return i
2888 return self.SUBOPERATION_STATUS_NOT_FOUND
2889
2890 # Update status for a sub-operation given its index
2891 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2892 # Update DB for HA tasks
2893 q_filter = {'_id': db_nslcmop['_id']}
2894 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2895 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2896 self.db.set_one("nslcmops",
2897 q_filter=q_filter,
2898 update_dict=update_dict,
2899 fail_on_empty=False)
2900
2901 # Add sub-operation, return the index of the added sub-operation
2902 # Optionally, set operationState, detailed-status, and operationType
2903 # Status and type are currently set for 'scale' sub-operations:
2904 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2905 # 'detailed-status' : status message
2906 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2907 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2908 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2909 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2910 RO_nsr_id=None, RO_scaling_info=None):
2911 if not db_nslcmop:
2912 return self.SUBOPERATION_STATUS_NOT_FOUND
2913 # Get the "_admin.operations" list, if it exists
2914 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2915 op_list = db_nslcmop_admin.get('operations')
2916 # Create or append to the "_admin.operations" list
2917 new_op = {'member_vnf_index': vnf_index,
2918 'vdu_id': vdu_id,
2919 'vdu_count_index': vdu_count_index,
2920 'primitive': primitive,
2921 'primitive_params': mapped_primitive_params}
2922 if operationState:
2923 new_op['operationState'] = operationState
2924 if detailed_status:
2925 new_op['detailed-status'] = detailed_status
2926 if operationType:
2927 new_op['lcmOperationType'] = operationType
2928 if RO_nsr_id:
2929 new_op['RO_nsr_id'] = RO_nsr_id
2930 if RO_scaling_info:
2931 new_op['RO_scaling_info'] = RO_scaling_info
2932 if not op_list:
2933 # No existing operations, create key 'operations' with current operation as first list element
2934 db_nslcmop_admin.update({'operations': [new_op]})
2935 op_list = db_nslcmop_admin.get('operations')
2936 else:
2937 # Existing operations, append operation to list
2938 op_list.append(new_op)
2939
2940 db_nslcmop_update = {'_admin.operations': op_list}
2941 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2942 op_index = len(op_list) - 1
2943 return op_index
2944
2945 # Helper methods for scale() sub-operations
2946
2947 # pre-scale/post-scale:
2948 # Check for 3 different cases:
2949 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2950 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2951 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2952 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2953 operationType, RO_nsr_id=None, RO_scaling_info=None):
2954 # Find this sub-operation
2955 if RO_nsr_id and RO_scaling_info:
2956 operationType = 'SCALE-RO'
2957 match = {
2958 'member_vnf_index': vnf_index,
2959 'RO_nsr_id': RO_nsr_id,
2960 'RO_scaling_info': RO_scaling_info,
2961 }
2962 else:
2963 match = {
2964 'member_vnf_index': vnf_index,
2965 'primitive': vnf_config_primitive,
2966 'primitive_params': primitive_params,
2967 'lcmOperationType': operationType
2968 }
2969 op_index = self._find_suboperation(db_nslcmop, match)
2970 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2971 # a. New sub-operation
2972 # The sub-operation does not exist, add it.
2973 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2974 # The following parameters are set to None for all kind of scaling:
2975 vdu_id = None
2976 vdu_count_index = None
2977 vdu_name = None
2978 if RO_nsr_id and RO_scaling_info:
2979 vnf_config_primitive = None
2980 primitive_params = None
2981 else:
2982 RO_nsr_id = None
2983 RO_scaling_info = None
2984 # Initial status for sub-operation
2985 operationState = 'PROCESSING'
2986 detailed_status = 'In progress'
2987 # Add sub-operation for pre/post-scaling (zero or more operations)
2988 self._add_suboperation(db_nslcmop,
2989 vnf_index,
2990 vdu_id,
2991 vdu_count_index,
2992 vdu_name,
2993 vnf_config_primitive,
2994 primitive_params,
2995 operationState,
2996 detailed_status,
2997 operationType,
2998 RO_nsr_id,
2999 RO_scaling_info)
3000 return self.SUBOPERATION_STATUS_NEW
3001 else:
3002 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3003 # or op_index (operationState != 'COMPLETED')
3004 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3005
3006 # Function to return execution_environment id
3007
3008 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3009 # TODO vdu_index_count
3010 for vca in vca_deployed_list:
3011 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3012 return vca["ee_id"]
3013
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True):
    """
    Execute the terminate primitives (if exec_primitives=True), remove the prometheus jobs of the VCA and
    destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix for the log messages
    :param db_nslcmop: nslcmops content, where the executed primitives are registered as sub-operations
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
        not executed properly
    :return: None or exception
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # entries without "type" are handled as lxc proxy charms
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = self._get_terminate_config_primitive(
            config_descriptor.get("terminate-config-primitive"), vca_deployed)
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name"))
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop,
                                       vnf_index,
                                       vdu_id,
                                       vdu_count_index,
                                       vdu_name,
                                       primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                             mapped_primitive_params,
                                                                             vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                if result not in result_ok:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    # remove the prometheus jobs of this VCA, if any and if prometheus is available
    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
3084
3085 async def _delete_all_N2VC(self, db_nsr: dict):
3086 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
3087 namespace = "." + db_nsr["_id"]
3088 try:
3089 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
3090 except N2VCNotFound: # already deleted. Skip
3091 pass
3092 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
3093
3094 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
3095 """
3096 Terminates a deployment from RO
3097 :param logging_text:
3098 :param nsr_deployed: db_nsr._admin.deployed
3099 :param nsr_id:
3100 :param nslcmop_id:
3101 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3102 this method will update only the index 2, but it will write on database the concatenated content of the list
3103 :return:
3104 """
3105 db_nsr_update = {}
3106 failed_detail = []
3107 ro_nsr_id = ro_delete_action = None
3108 if nsr_deployed and nsr_deployed.get("RO"):
3109 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3110 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3111 try:
3112 if ro_nsr_id:
3113 stage[2] = "Deleting ns from VIM."
3114 db_nsr_update["detailed-status"] = " ".join(stage)
3115 self._write_op_status(nslcmop_id, stage)
3116 self.logger.debug(logging_text + stage[2])
3117 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3118 self._write_op_status(nslcmop_id, stage)
3119 desc = await self.RO.delete("ns", ro_nsr_id)
3120 ro_delete_action = desc["action_id"]
3121 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
3122 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3123 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3124 if ro_delete_action:
3125 # wait until NS is deleted from VIM
3126 stage[2] = "Waiting ns deleted from VIM."
3127 detailed_status_old = None
3128 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
3129 ro_delete_action))
3130 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3131 self._write_op_status(nslcmop_id, stage)
3132
3133 delete_timeout = 20 * 60 # 20 minutes
3134 while delete_timeout > 0:
3135 desc = await self.RO.show(
3136 "ns",
3137 item_id_name=ro_nsr_id,
3138 extra_item="action",
3139 extra_item_id=ro_delete_action)
3140
3141 # deploymentStatus
3142 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3143
3144 ns_status, ns_status_info = self.RO.check_action_status(desc)
3145 if ns_status == "ERROR":
3146 raise ROclient.ROClientException(ns_status_info)
3147 elif ns_status == "BUILD":
3148 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3149 elif ns_status == "ACTIVE":
3150 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3151 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3152 break
3153 else:
3154 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3155 if stage[2] != detailed_status_old:
3156 detailed_status_old = stage[2]
3157 db_nsr_update["detailed-status"] = " ".join(stage)
3158 self._write_op_status(nslcmop_id, stage)
3159 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3160 await asyncio.sleep(5, loop=self.loop)
3161 delete_timeout -= 5
3162 else: # delete_timeout <= 0:
3163 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
3164
3165 except Exception as e:
3166 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3167 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3168 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3169 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3170 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3171 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
3172 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3173 failed_detail.append("delete conflict: {}".format(e))
3174 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
3175 else:
3176 failed_detail.append("delete error: {}".format(e))
3177 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
3178
3179 # Delete nsd
3180 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3181 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3182 try:
3183 stage[2] = "Deleting nsd from RO."
3184 db_nsr_update["detailed-status"] = " ".join(stage)
3185 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3186 self._write_op_status(nslcmop_id, stage)
3187 await self.RO.delete("nsd", ro_nsd_id)
3188 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3189 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3190 except Exception as e:
3191 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3192 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3193 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3194 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3195 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3196 self.logger.debug(logging_text + failed_detail[-1])
3197 else:
3198 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3199 self.logger.error(logging_text + failed_detail[-1])
3200
3201 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3202 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3203 if not vnf_deployed or not vnf_deployed["id"]:
3204 continue
3205 try:
3206 ro_vnfd_id = vnf_deployed["id"]
3207 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3208 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3209 db_nsr_update["detailed-status"] = " ".join(stage)
3210 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3211 self._write_op_status(nslcmop_id, stage)
3212 await self.RO.delete("vnfd", ro_vnfd_id)
3213 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3214 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3215 except Exception as e:
3216 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3217 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3218 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3219 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3220 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3221 self.logger.debug(logging_text + failed_detail[-1])
3222 else:
3223 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3224 self.logger.error(logging_text + failed_detail[-1])
3225
3226 if failed_detail:
3227 stage[2] = "Error deleting from VIM"
3228 else:
3229 stage[2] = "Deleted from VIM"
3230 db_nsr_update["detailed-status"] = " ".join(stage)
3231 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3232 self._write_op_status(nslcmop_id, stage)
3233
3234 if failed_detail:
3235 raise LcmException("; ".join(failed_detail))
3236
async def terminate(self, nsr_id, nslcmop_id):
    """
    Run the terminate operation 'nslcmop_id' over the network service record 'nsr_id'.

    The progress is reported through 'stage', a list [stage, step, VIM-status], in three stages:
      1. Read nslcmop, nsr, vnfrs and vnfds from database and write the TERMINATING status.
      2. Launch one 'destroy_N2VC' task per deployed VCA that has an 'ee_id' and wait for all of them.
         If any of them reports an error, the method goes directly to the final status update.
      3. Launch the tasks that delete all the execution environments, uninstall every deployed KDU and
         remove the deployment from RO. These tasks are awaited inside the 'finally' clause.
    The 'finally' clause writes the final status at the nsr and nslcmop records, sends the "ns"/"terminated"
    message through 'self.msg' and removes the task from 'self.lcm_tasks'.

    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record that describes this operation
    :return: None. Exceptions raised inside the 'try' block are caught and annotated at 'error_list'
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        # nothing is done (not even the final status update) when the lock is not obtained
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate  # can be overridden by operationParams
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}  # maps each launched task to the text used to report it
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        # work over a copy, so that the content of db_nsr is not modified
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to remove; the 'finally' clause still writes the final status
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}  # cache of vnfds indexed by _id, to read each one only once
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])
        # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            config_descriptor = None
            if not vca or not vca.get("ee_id"):
                continue
            # select the configuration descriptor of the element (ns, vdu, kdu or vnf) this VCA belongs to
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                if vdud:
                    config_descriptor = vdud.get("vdu-configuration")
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                if kdud:
                    config_descriptor = kdud.get("kdu-configuration")
            else:
                config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
            vca_type = vca.get("type")
            exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                         vca.get("needed_terminate"))
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = True if vca_type in ("helm", "native_charm") else False
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
                                  destroy_ee, exec_terminate_primitives))
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
            error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                    min(self.timeout_charm_delete, timeout_ns_terminate),
                                                    stage, nslcmop_id)
            # these tasks are already processed; they must not be awaited again at 'finally'
            tasks_dict_info.clear()
            if error_list:
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                    timeout=self.timeout_charm_delete))
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                # an unknown cluster type is logged and skipped; it is not annotated as an error
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        # unexpected exception: the traceback is stored as the error text
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as exc:
            error_list.append(str(exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # db_nsr is None when the nsr could not be read; in that case only the nslcmop is updated
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
            except DbException as e:
                # a failure updating the vnfrs is only logged
                self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
                                 format(nsr_id, e))
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3476
3477 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3478 time_start = time()
3479 error_detail_list = []
3480 error_list = []
3481 pending_tasks = list(created_tasks_info.keys())
3482 num_tasks = len(pending_tasks)
3483 num_done = 0
3484 stage[1] = "{}/{}.".format(num_done, num_tasks)
3485 self._write_op_status(nslcmop_id, stage)
3486 while pending_tasks:
3487 new_error = None
3488 _timeout = timeout + time_start - time()
3489 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3490 return_when=asyncio.FIRST_COMPLETED)
3491 num_done += len(done)
3492 if not done: # Timeout
3493 for task in pending_tasks:
3494 new_error = created_tasks_info[task] + ": Timeout"
3495 error_detail_list.append(new_error)
3496 error_list.append(new_error)
3497 break
3498 for task in done:
3499 if task.cancelled():
3500 exc = "Cancelled"
3501 else:
3502 exc = task.exception()
3503 if exc:
3504 if isinstance(exc, asyncio.TimeoutError):
3505 exc = "Timeout"
3506 new_error = created_tasks_info[task] + ": {}".format(exc)
3507 error_list.append(created_tasks_info[task])
3508 error_detail_list.append(new_error)
3509 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3510 K8sException)):
3511 self.logger.error(logging_text + new_error)
3512 else:
3513 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3514 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
3515 else:
3516 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3517 stage[1] = "{}/{}.".format(num_done, num_tasks)
3518 if new_error:
3519 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3520 if nsr_id: # update also nsr
3521 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3522 "errorDetail": ". ".join(error_detail_list)})
3523 self._write_op_status(nslcmop_id, stage)
3524 return error_detail_list
3525
3526 def _map_primitive_params(self, primitive_desc, params, instantiation_params):
3527 """
3528 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3529 The default-value is used. If it is between < > it look for a value at instantiation_params
3530 :param primitive_desc: portion of VNFD/NSD that describes primitive
3531 :param params: Params provided by user
3532 :param instantiation_params: Instantiation params provided by user
3533 :return: a dictionary with the calculated params
3534 """
3535 calculated_params = {}
3536 for parameter in primitive_desc.get("parameter", ()):
3537 param_name = parameter["name"]
3538 if param_name in params:
3539 calculated_params[param_name] = params[param_name]
3540 elif "default-value" in parameter or "value" in parameter:
3541 if "value" in parameter:
3542 calculated_params[param_name] = parameter["value"]
3543 else:
3544 calculated_params[param_name] = parameter["default-value"]
3545 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3546 and calculated_params[param_name].endswith(">"):
3547 if calculated_params[param_name][1:-1] in instantiation_params:
3548 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3549 else:
3550 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3551 format(calculated_params[param_name], primitive_desc["name"]))
3552 else:
3553 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3554 format(param_name, primitive_desc["name"]))
3555
3556 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3557 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3558 width=256)
3559 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3560 calculated_params[param_name] = calculated_params[param_name][7:]
3561 if parameter.get("data-type") == "INTEGER":
3562 try:
3563 calculated_params[param_name] = int(calculated_params[param_name])
3564 except ValueError: # error converting string to int
3565 raise LcmException(
3566 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3567 elif parameter.get("data-type") == "BOOLEAN":
3568 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3569
3570 # add always ns_config_info if primitive name is config
3571 if primitive_desc["name"] == "config":
3572 if "ns_config_info" in instantiation_params:
3573 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3574 calculated_params["VCA"] = self.vca_config
3575 return calculated_params
3576
3577 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3578 ee_descriptor_id=None):
3579 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3580 for vca in deployed_vca:
3581 if not vca:
3582 continue
3583 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3584 continue
3585 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3586 continue
3587 if kdu_name and kdu_name != vca["kdu_name"]:
3588 continue
3589 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3590 continue
3591 break
3592 else:
3593 # vca_deployed not found
3594 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3595 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3596 ee_descriptor_id))
3597
3598 # get ee_id
3599 ee_id = vca.get("ee_id")
3600 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3601 if not ee_id:
3602 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3603 "execution environment"
3604 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3605 return ee_id, vca_type
3606
3607 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3608 retries_interval=30, timeout=None,
3609 vca_type=None, db_dict=None) -> (str, str):
3610 try:
3611 if primitive == "config":
3612 primitive_params = {"params": primitive_params}
3613
3614 vca_type = vca_type or "lxc_proxy_charm"
3615
3616 while retries >= 0:
3617 try:
3618 output = await asyncio.wait_for(
3619 self.vca_map[vca_type].exec_primitive(
3620 ee_id=ee_id,
3621 primitive_name=primitive,
3622 params_dict=primitive_params,
3623 progress_timeout=self.timeout_progress_primitive,
3624 total_timeout=self.timeout_primitive,
3625 db_dict=db_dict),
3626 timeout=timeout or self.timeout_primitive)
3627 # execution was OK
3628 break
3629 except asyncio.CancelledError:
3630 raise
3631 except Exception as e: # asyncio.TimeoutError
3632 if isinstance(e, asyncio.TimeoutError):
3633 e = "Timeout"
3634 retries -= 1
3635 if retries >= 0:
3636 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3637 # wait and retry
3638 await asyncio.sleep(retries_interval, loop=self.loop)
3639 else:
3640 return 'FAILED', str(e)
3641
3642 return 'COMPLETED', output
3643
3644 except (LcmException, asyncio.CancelledError):
3645 raise
3646 except Exception as e:
3647 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3648
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by the operation 'nslcmop_id' over the network service 'nsr_id'.

    The target (ns, vnf, vdu or kdu) and the primitive are read from "operationParams" of the nslcmop record.
    The primitives "upgrade", "rollback" and "status" of a KDU, and any primitive of a KDU whose descriptor has
    not "kdu-configuration:juju", are executed with the connector of self.k8scluster_map. The rest are executed
    at the deployed execution environment with self._ns_execute_primitive.
    The 'finally' clause writes the result at the nsr and nslcmop records, sends the "ns"/"actioned" message
    through self.msg and removes the task from self.lcm_tasks.

    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record that describes this operation
    :return: None if the HA lock is not obtained; otherwise the tuple (operation state, detailed status).
        Exceptions raised while processing are not propagated; they produce a "FAILED" operation state
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    detailed_status = None  # always bound, as it is returned from the 'finally' clause
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        kdud = None  # kdu descriptor that matches kdu_name. None if not found or not looked for
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    descriptor_configuration = vdu.get("vdu-configuration")
                    break
        elif kdu_name:
            for kdu in get_iterable(db_vnfd, "kdu"):
                if kdu_name == kdu["name"]:
                    kdud = kdu
                    descriptor_configuration = kdu.get("kdu-configuration")
                    break
        elif vnf_index:
            descriptor_configuration = db_vnfd.get("vnf-configuration")
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get("config-primitive"):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # "upgrade", "rollback" and "status" of a kdu do not need to be at the descriptor
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                   format(primitive))
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
            ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if not vdur:
                    # fail with a clear error instead of an AttributeError over None
                    raise LcmException("vdu_id '{}' not found at vnfr of member_vnf_index '{}'".format(
                        vdu_id, vnf_index))
                desc_params = self._format_additional_params(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                if not kdur:
                    raise LcmException("kdu_name '{}' not found at vnfr of member_vnf_index '{}'".format(
                        kdu_name, vnf_index))
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
            else:
                desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))

        if kdu_name:
            # use the matched descriptor (kdud), not the loop variable, that can be unbound or a different kdu
            kdu_action = not deep_get(kdud, ("kdu-configuration", "juju"))

        # TODO check if ns is in a proper status
        if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            # from here 'kdu' is the deployed record, not the descriptor
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                raise LcmException(msg)

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    # when the stored model is "<name>:<something>", only the name is used
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance")),
                    timeout=timeout_ns_action)
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action)

            # an empty result of the connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                          member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id,
                                                          vdu_count_index=vdu_count_index,
                                                          ee_descriptor_id=ee_descriptor_id)
            db_nslcmop_notif = {"collection": "nslcmops",
                                "filter": {"_id": nslcmop_id},
                                "path": "admin.VCA"}
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_nslcmop_notif)

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                               detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        # unexpected exception: the traceback is stored as the error text
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return overrides the one of the 'try' block
        return nslcmop_operation_state, detailed_status
3894
async def scale(self, nsr_id, nslcmop_id):
    """Execute a scale-in / scale-out operation over one VNF of a network service.

    The sequence is:
      1. acquire the HA lock for the operation (return immediately if another worker owns it);
      2. read nslcmop, nsr, vnfr and vnfd from the database and locate the scaling-group-descriptor;
      3. compute the VDUs to create or delete (``RO_scaling_info``) and the ``VDU_SCALE_INFO`` primitive parameter;
      4. run the matching "pre-scale-*" scaling-config-action primitives;
      5. send the "vdu-scaling" action to RO and poll every 5 seconds (up to one hour) until it is ACTIVE;
      6. run the matching "post-scale-*" scaling-config-action primitives;
      7. in any case (``finally``) write operation and ns status, publish the "scaled" message and remove the task.

    Every VCA / RO step is registered as a sub-operation of the nslcmop so that a retried operation can skip or
    re-run it with the parameters stored the first time.

    :param nsr_id: database '_id' of the network service record (nsrs)
    :param nslcmop_id: database '_id' of the operation (nslcmops) that holds the 'scaleVnfData' parameters
    :return: None. The outcome is written to the database and notified by the "scaled" message.
        Exceptions are caught here and converted into a FAILED operation state.
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    # bound here because the RO retry log below reads it even when no pre-scale action has been executed
    vnf_config_primitive = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # previous values are kept to restore them at the end of the operation
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        # self.logger.debug(step)
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # nb_scale_op: number of scale-out operations already applied to this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                vdud = next(vdu for vdu in db_vnfd.get("vdu") if vdu["id"] == vdu_scale_info["vdu-id-ref"])
                # index of the first new instance: number of vdur entries already present for this vdu
                vdu_index = len([x for x in db_vnfr.get("vdur", ())
                                 if x.get("vdu-id-ref") == vdu_scale_info["vdu-id-ref"] and
                                 x.get("member-vnf-index-ref") == vnf_index])
                cloud_init_text = self._get_cloud_init(vdud, db_vnfd)
                if cloud_init_text:
                    additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
                cloud_init_list = []
                for x in range(vdu_scale_info.get("count", 1)):
                    if cloud_init_text:
                        # TODO Information of its own ip is not available because db_vnfr is not updated.
                        additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu_scale_info["vdu-id-ref"],
                                                                        vdu_index + x)
                        cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params,
                                                                      db_vnfd["id"], vdud["id"]))
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                if cloud_init_list:
                    RO_scaling_info[-1]["cloud_init"] = cloud_init_list
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work on a copy: the counters are decremented while the last vdur entries are selected
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # restore the original (not decremented) counters, removing them from the primitive parameter
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale retry check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second positional argument whereas the
                    # 'SCALE-RO' call below does not; confirm against _check_or_add_scale_suboperation signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # retry:  Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (retried) args
                        ee_descriptor_id = config_primitive.get("execution-environment-ref")
                        primitive_name = config_primitive.get("execution-environment-primitive",
                                                              vnf_config_primitive)
                        ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                                      member_vnf_index=vnf_index,
                                                                      vdu_id=None,
                                                                      vdu_count_index=None,
                                                                      ee_descriptor_id=ee_descriptor_id)
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id, primitive_name, primitive_params, vca_type)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO retry check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # retry:  Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
                        vnf_config_primitive))

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                # two polling phases: first the RO action itself, then (RO_task_done) the ns status
                RO_task_done = False
                step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    else:
                        desc = await self.RO.show("ns", RO_nsr_id)
                        ns_status, ns_status_info = self.RO.check_ns_status(desc)
                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            try:
                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not reported yet: keep polling
                                pass
                        else:
                            assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    await asyncio.sleep(5, loop=self.loop)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same 4-argument form as every other _update_suboperation_status call in this method;
                    # the extra nslcmop_id formerly passed here would hide the timeout behind a TypeError
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
                            "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
                            "config-primitive".format(scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale retry check: Check if this sub-operation has been executed before
                    # NOTE(review): same positional-argument doubt as in the PRE-SCALE call (nslcmop_id is passed
                    # here but not in the 'SCALE-RO' call); confirm against the helper signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # retry:  Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (retried) args
                        ee_descriptor_id = config_primitive.get("execution-environment-ref")
                        primitive_name = config_primitive.get("execution-environment-primitive",
                                                              vnf_config_primitive)
                        ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                                      member_vnf_index=vnf_index,
                                                                      vdu_id=None,
                                                                      vdu_count_index=None,
                                                                      ee_descriptor_id=ee_descriptor_id)
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id, primitive_name, primitive_params, vca_type)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        # runs on success (after the 'return' above) as well as on any failure
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                if scale_process:
                    # the failure happened in the middle of a VCA or RO step: mark that part as failed
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update
            )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time, loop=self.loop)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                # best effort: a notification failure must not change the operation result
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4403
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
    """Register the prometheus jobs described by a 'prometheus*.j2' template found in the artifact folder.

    :param ee_id: execution environment id; the text after the first "." is used as service name
    :param artifact_path: folder (in self.fs) where the template is looked for
    :param ee_config_descriptor: dictionary that must contain the key "metric-service"
    :param vnfr_id: vnfr identifier; used without hyphens as job name
    :param nsr_id: ns record identifier, added to every job as "nsr_id"
    :param target_ip: value given to the template as TARGET_IP
    :return: list of job names when self.prometheus.update() returns a true value. None if there is no
        prometheus client, no template is found or the update returns a false value.
    """
    if not self.prometheus:
        return

    # look for the first file called 'prometheus*.j2'
    job_file = None
    for file_name in self.fs.dir_ls(artifact_path):
        if file_name.startswith("prometheus") and file_name.endswith(".j2"):
            job_file = file_name
            break
    if not job_file:
        return

    with self.fs.file_open((artifact_path, job_file), "r") as template:
        job_data = template.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": "80",
    }
    job_list = self.prometheus.parse_job(job_data, variables)

    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        name_uses_vnfr_id = isinstance(job.get("job_name"), str) and vnfr_id in job["job_name"]
        if not name_uses_vnfr_id:
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id

    # index the jobs by their (now final) name
    job_dict = {}
    for job in job_list:
        job_dict[job["job_name"]] = job

    updated = await self.prometheus.update(job_dict)
    if not updated:
        return None
    return list(job_dict)