Added support for helm v3
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_helm3_conn import K8sHelm3Connector
32 from n2vc.k8s_juju_conn import K8sJujuConnector
33
34 from osm_common.dbbase import DbException
35 from osm_common.fsbase import FsException
36
37 from n2vc.n2vc_juju_conn import N2VCJujuConnector
38 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
39
40 from osm_lcm.lcm_helm_conn import LCMHelmConn
41
42 from copy import copy, deepcopy
43 from http import HTTPStatus
44 from time import time
45 from uuid import uuid4
46
47 from random import randint
48
49 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
50
51
52 class N2VCJujuConnectorLCM(N2VCJujuConnector):
53
54 async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
55 progress_timeout: float = None, total_timeout: float = None,
56 config: dict = None, artifact_path: str = None,
57 vca_type: str = None) -> (str, dict):
58 # accepts two additional parameters: artifact_path and vca_type
59 if vca_type == "k8s_proxy_charm":
60 ee_id = await self.install_k8s_proxy_charm(
61 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
62 namespace=namespace,
63 artifact_path=artifact_path,
64 db_dict=db_dict)
65 return ee_id, None
66 else:
67 return await super().create_execution_environment(
68 namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id,
69 progress_timeout=progress_timeout, total_timeout=total_timeout)
70
71 async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
72 progress_timeout: float = None, total_timeout: float = None,
73 config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
74 if vca_type == "k8s_proxy_charm":
75 return
76 return await super().install_configuration_sw(
77 ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout,
78 total_timeout=total_timeout, config=config, num_units=num_units)
79
80
81 class NsLcm(LcmBase):
82 timeout_vca_on_error = 5 * 60 # Time from when a charm first reports blocked/error status until it is marked as failed
83 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
84 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
85 timeout_charm_delete = 10 * 60
86 timeout_primitive = 30 * 60 # timeout for primitive execution
87 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
88
89 SUBOPERATION_STATUS_NOT_FOUND = -1
90 SUBOPERATION_STATUS_NEW = -2
91 SUBOPERATION_STATUS_SKIP = -3
92 task_name_deploy_vca = "Deploying VCA"
93
94 def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
95 """
96 Init, Connect to database, filesystem storage, and messaging
97 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
98 :return: None
99 """
100 super().__init__(
101 db=db,
102 msg=msg,
103 fs=fs,
104 logger=logging.getLogger('lcm.ns')
105 )
106
107 self.loop = loop
108 self.lcm_tasks = lcm_tasks
109 self.timeout = config["timeout"]
110 self.ro_config = config["ro_config"]
111 self.ng_ro = config["ro_config"].get("ng")
112 self.vca_config = config["VCA"].copy()
113
114 # create N2VC connector
115 self.n2vc = N2VCJujuConnectorLCM(
116 db=self.db,
117 fs=self.fs,
118 log=self.logger,
119 loop=self.loop,
120 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
121 username=self.vca_config.get('user', None),
122 vca_config=self.vca_config,
123 on_update_db=self._on_update_n2vc_db
124 )
125
126 self.conn_helm_ee = LCMHelmConn(
127 db=self.db,
128 fs=self.fs,
129 log=self.logger,
130 loop=self.loop,
131 url=None,
132 username=None,
133 vca_config=self.vca_config,
134 on_update_db=self._on_update_n2vc_db
135 )
136
137 self.k8sclusterhelm2 = K8sHelmConnector(
138 kubectl_command=self.vca_config.get("kubectlpath"),
139 helm_command=self.vca_config.get("helmpath"),
140 fs=self.fs,
141 log=self.logger,
142 db=self.db,
143 on_update_db=None,
144 )
145
146 self.k8sclusterhelm3 = K8sHelm3Connector(
147 kubectl_command=self.vca_config.get("kubectlpath"),
148 helm_command=self.vca_config.get("helm3path"),
149 fs=self.fs,
150 log=self.logger,
151 db=self.db,
152 on_update_db=None,
153 )
154
155 self.k8sclusterjuju = K8sJujuConnector(
156 kubectl_command=self.vca_config.get("kubectlpath"),
157 juju_command=self.vca_config.get("jujupath"),
158 fs=self.fs,
159 log=self.logger,
160 db=self.db,
161 loop=self.loop,
162 on_update_db=None,
163 vca_config=self.vca_config,
164 )
165
166 self.k8scluster_map = {
167 "helm-chart": self.k8sclusterhelm2,
168 "helm-chart-v3": self.k8sclusterhelm3,
169 "chart": self.k8sclusterhelm3,
170 "juju-bundle": self.k8sclusterjuju,
171 "juju": self.k8sclusterjuju,
172 }
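# deploy-method routing: "helm-chart" keeps using the Helm 2 connector, while
# "helm-chart-v3" (and the short alias "chart") selects the new Helm 3 connector;
# "juju-bundle" and "juju" both map to the Juju connector.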
173
174 self.vca_map = {
175 "lxc_proxy_charm": self.n2vc,
176 "native_charm": self.n2vc,
177 "k8s_proxy_charm": self.n2vc,
178 "helm": self.conn_helm_ee,
179 "helm-v3": self.conn_helm_ee
180 }
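# vca-type routing: the three charm flavours share the Juju-based N2VC connector,
# while both "helm" and "helm-v3" execution environments are handled by LCMHelmConn.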
181
182 self.prometheus = prometheus
183
184 # create RO client
185 if self.ng_ro:
186 self.RO = NgRoClient(self.loop, **self.ro_config)
187 else:
188 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
189
190 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
191
192 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
193
194 try:
195 # TODO filter RO descriptor fields...
196
197 # write to database
198 db_dict = dict()
199 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
200 db_dict['deploymentStatus'] = ro_descriptor
201 self.update_db_2("nsrs", nsrs_id, db_dict)
202
203 except Exception as e:
204 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
205
206 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
207
208 # remove last dot from path (if exists)
209 if path.endswith('.'):
210 path = path[:-1]
211
212 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
213 # .format(table, filter, path, updated_data))
214
215 try:
216
217 nsr_id = filter.get('_id')
218
219 # read ns record from database
220 nsr = self.db.get_one(table='nsrs', q_filter=filter)
221 current_ns_status = nsr.get('nsState')
222
223 # get vca status for NS
224 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
225
226 # vcaStatus
227 db_dict = dict()
228 db_dict['vcaStatus'] = status_dict
229
230 # update configurationStatus for this VCA
231 try:
232 vca_index = int(path[path.rfind(".")+1:])
233
234 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
235 vca_status = vca_list[vca_index].get('status')
236
237 configuration_status_list = nsr.get('configurationStatus')
238 config_status = configuration_status_list[vca_index].get('status')
239
240 if config_status == 'BROKEN' and vca_status != 'failed':
241 db_dict['configurationStatus'][vca_index] = 'READY'
242 elif config_status != 'BROKEN' and vca_status == 'failed':
243 db_dict['configurationStatus'][vca_index] = 'BROKEN'
244 except Exception as e:
245 # not update configurationStatus
246 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
247
248 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
249 # if nsState = 'DEGRADED' check if all is OK
250 is_degraded = False
251 if current_ns_status in ('READY', 'DEGRADED'):
252 error_description = ''
253 # check machines
254 if status_dict.get('machines'):
255 for machine_id in status_dict.get('machines'):
256 machine = status_dict.get('machines').get(machine_id)
257 # check machine agent-status
258 if machine.get('agent-status'):
259 s = machine.get('agent-status').get('status')
260 if s != 'started':
261 is_degraded = True
262 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
263 # check machine instance status
264 if machine.get('instance-status'):
265 s = machine.get('instance-status').get('status')
266 if s != 'running':
267 is_degraded = True
268 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
269 # check applications
270 if status_dict.get('applications'):
271 for app_id in status_dict.get('applications'):
272 app = status_dict.get('applications').get(app_id)
273 # check application status
274 if app.get('status'):
275 s = app.get('status').get('status')
276 if s != 'active':
277 is_degraded = True
278 error_description += 'application {} status={} ; '.format(app_id, s)
279
280 if error_description:
281 db_dict['errorDescription'] = error_description
282 if current_ns_status == 'READY' and is_degraded:
283 db_dict['nsState'] = 'DEGRADED'
284 if current_ns_status == 'DEGRADED' and not is_degraded:
285 db_dict['nsState'] = 'READY'
286
287 # write to database
288 self.update_db_2("nsrs", nsr_id, db_dict)
289
290 except (asyncio.CancelledError, asyncio.TimeoutError):
291 raise
292 except Exception as e:
293 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
294
295 @staticmethod
296 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
297 try:
298 env = Environment(undefined=StrictUndefined)
299 template = env.from_string(cloud_init_text)
300 return template.render(additional_params or {})
301 except UndefinedError as e:
302 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
303 "file, must be provided in the instantiation parameters inside the "
304 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
305 except (TemplateError, TemplateNotFound) as e:
306 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
307 format(vnfd_id, vdu_id, e))
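# Illustrative sketch (hypothetical values, not from any descriptor): a cloud-init
# template such as "password: {{ password }}" rendered with
# additional_params={"password": "osm4u"} yields "password: osm4u"; any variable
# missing from additional_params raises UndefinedError, reported above as LcmException.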
308
309 def _get_cloud_init(self, vdu, vnfd):
310 try:
311 cloud_init_content = cloud_init_file = None
312 if vdu.get("cloud-init-file"):
313 base_folder = vnfd["_admin"]["storage"]
314 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
315 vdu["cloud-init-file"])
316 with self.fs.file_open(cloud_init_file, "r") as ci_file:
317 cloud_init_content = ci_file.read()
318 elif vdu.get("cloud-init"):
319 cloud_init_content = vdu["cloud-init"]
320
321 return cloud_init_content
322 except FsException as e:
323 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
324 format(vnfd["id"], vdu["id"], cloud_init_file, e))
325
326 def _get_osm_params(self, db_vnfr, vdu_id=None, vdu_count_index=0):
327 osm_params = {x.replace("-", "_"): db_vnfr[x] for x in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
328 if db_vnfr.get(x) is not None}
329 osm_params["ns_id"] = db_vnfr["nsr-id-ref"]
330 osm_params["vnf_id"] = db_vnfr["_id"]
331 osm_params["member_vnf_index"] = db_vnfr["member-vnf-index-ref"]
332 if db_vnfr.get("vdur"):
333 osm_params["vdu"] = {}
334 for vdur in db_vnfr["vdur"]:
335 vdu = {
336 "count_index": vdur["count-index"],
337 "vdu_id": vdur["vdu-id-ref"],
338 "interfaces": {}
339 }
340 if vdur.get("ip-address"):
341 vdu["ip_address"] = vdur["ip-address"]
342 for iface in vdur["interfaces"]:
343 vdu["interfaces"][iface["name"]] = \
344 {x.replace("-", "_"): iface[x] for x in ("mac-address", "ip-address", "vnf-vld-id", "name")
345 if iface.get(x) is not None}
346 vdu_id_index = "{}-{}".format(vdur["vdu-id-ref"], vdur["count-index"])
347 osm_params["vdu"][vdu_id_index] = vdu
348 if vdu_id:
349 osm_params["vdu_id"] = vdu_id
350 osm_params["count_index"] = vdu_count_index
351 return osm_params
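# Illustrative shape of the returned dict (hypothetical ids):
#   {"ns_id": "<nsr-id>", "vnf_id": "<vnfr-id>", "member_vnf_index": "1",
#    "vdu": {"mgmtVM-0": {"count_index": 0, "vdu_id": "mgmtVM",
#                         "ip_address": "10.0.0.10",
#                         "interfaces": {"eth0": {"ip_address": "10.0.0.10"}}}},
#    "vdu_id": "mgmtVM", "count_index": 0}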
352
353 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
354 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
355 additional_params = vdur.get("additionalParams")
356 return self._format_additional_params(additional_params)
357
358 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
359 """
360 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
361 :param vnfd: input vnfd
362 :param new_id: overrides vnf id if provided
363 :param additionalParams: Instantiation params for VNFs provided
364 :param nsrId: Id of the NSR
365 :return: copy of vnfd
366 """
367 vnfd_RO = deepcopy(vnfd)
368 # remove keys not used by RO: configuration, monitoring, scaling and internal keys
369 vnfd_RO.pop("_id", None)
370 vnfd_RO.pop("_admin", None)
371 vnfd_RO.pop("vnf-configuration", None)
372 vnfd_RO.pop("monitoring-param", None)
373 vnfd_RO.pop("scaling-group-descriptor", None)
374 vnfd_RO.pop("kdu", None)
375 vnfd_RO.pop("k8s-cluster", None)
376 if new_id:
377 vnfd_RO["id"] = new_id
378
379 # drop cloud-init and cloud-init-file; they are parsed with Jinja2 and passed to RO separately
380 for vdu in get_iterable(vnfd_RO, "vdu"):
381 vdu.pop("cloud-init-file", None)
382 vdu.pop("cloud-init", None)
383 return vnfd_RO
384
385 def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
386 """
387 Creates a RO ns descriptor from OSM ns_instantiate params
388 :param ns_params: OSM instantiate params
389 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
390 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
391 :return: The RO ns descriptor
392 """
393 vim_2_RO = {}
394 wim_2_RO = {}
395 # TODO feature 1417: Check that no instantiation is set over PDU
396 # check if PDU forces a concrete vim-network-id and add it
397 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
398
399 def vim_account_2_RO(vim_account):
400 if vim_account in vim_2_RO:
401 return vim_2_RO[vim_account]
402
403 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
404 if db_vim["_admin"]["operationalState"] != "ENABLED":
405 raise LcmException("VIM={} is not available. operationalState={}".format(
406 vim_account, db_vim["_admin"]["operationalState"]))
407 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
408 vim_2_RO[vim_account] = RO_vim_id
409 return RO_vim_id
410
411 def wim_account_2_RO(wim_account):
412 if isinstance(wim_account, str):
413 if wim_account in wim_2_RO:
414 return wim_2_RO[wim_account]
415
416 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
417 if db_wim["_admin"]["operationalState"] != "ENABLED":
418 raise LcmException("WIM={} is not available. operationalState={}".format(
419 wim_account, db_wim["_admin"]["operationalState"]))
420 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
421 wim_2_RO[wim_account] = RO_wim_id
422 return RO_wim_id
423 else:
424 return wim_account
425
426 def ip_profile_2_RO(ip_profile):
427 RO_ip_profile = deepcopy(ip_profile)
428 if "dns-server" in RO_ip_profile:
429 if isinstance(RO_ip_profile["dns-server"], list):
430 RO_ip_profile["dns-address"] = []
431 for ds in RO_ip_profile.pop("dns-server"):
432 RO_ip_profile["dns-address"].append(ds['address'])
433 else:
434 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
435 if RO_ip_profile.get("ip-version") == "ipv4":
436 RO_ip_profile["ip-version"] = "IPv4"
437 if RO_ip_profile.get("ip-version") == "ipv6":
438 RO_ip_profile["ip-version"] = "IPv6"
439 if "dhcp-params" in RO_ip_profile:
440 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
441 return RO_ip_profile
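# Illustrative translation (hypothetical values):
#   {"ip-version": "ipv4", "dns-server": [{"address": "8.8.8.8"}],
#    "dhcp-params": {"enabled": True}}
# becomes
#   {"ip-version": "IPv4", "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}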
442
443 if not ns_params:
444 return None
445 RO_ns_params = {
446 # "name": ns_params["nsName"],
447 # "description": ns_params.get("nsDescription"),
448 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
449 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
450 # "scenario": ns_params["nsdId"],
451 }
452 # set vim_account of each vnf if different from general vim_account.
453 # Get this information from <vnfr> database content, key vim-account-id
454 # Vim account can be set by placement_engine and it may be different from
455 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
456 for vnf_index, vnfr in db_vnfrs.items():
457 if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
458 populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
459
460 n2vc_key_list = n2vc_key_list or []
461 for vnfd_ref, vnfd in vnfd_dict.items():
462 vdu_needed_access = []
463 mgmt_cp = None
464 if vnfd.get("vnf-configuration"):
465 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
466 if ssh_required and vnfd.get("mgmt-interface"):
467 if vnfd["mgmt-interface"].get("vdu-id"):
468 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
469 elif vnfd["mgmt-interface"].get("cp"):
470 mgmt_cp = vnfd["mgmt-interface"]["cp"]
471
472 for vdu in vnfd.get("vdu", ()):
473 if vdu.get("vdu-configuration"):
474 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
475 if ssh_required:
476 vdu_needed_access.append(vdu["id"])
477 elif mgmt_cp:
478 for vdu_interface in vdu.get("interface"):
479 if vdu_interface.get("external-connection-point-ref") and \
480 vdu_interface["external-connection-point-ref"] == mgmt_cp:
481 vdu_needed_access.append(vdu["id"])
482 mgmt_cp = None
483 break
484
485 if vdu_needed_access:
486 for vnf_member in nsd.get("constituent-vnfd"):
487 if vnf_member["vnfd-id-ref"] != vnfd_ref:
488 continue
489 for vdu in vdu_needed_access:
490 populate_dict(RO_ns_params,
491 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
492 n2vc_key_list)
493 # cloud init
494 for vdu in get_iterable(vnfd, "vdu"):
495 cloud_init_text = self._get_cloud_init(vdu, vnfd)
496 if not cloud_init_text:
497 continue
498 for vnf_member in nsd.get("constituent-vnfd"):
499 if vnf_member["vnfd-id-ref"] != vnfd_ref:
500 continue
501 db_vnfr = db_vnfrs[vnf_member["member-vnf-index"]]
502 additional_params = self._get_vdu_additional_params(db_vnfr, vdu["id"]) or {}
503
504 cloud_init_list = []
505 for vdu_index in range(0, int(vdu.get("count", 1))):
506 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu["id"], vdu_index)
507 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, vnfd["id"],
508 vdu["id"]))
509 populate_dict(RO_ns_params,
510 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu["id"], "cloud_init"),
511 cloud_init_list)
512
513 if ns_params.get("vduImage"):
514 RO_ns_params["vduImage"] = ns_params["vduImage"]
515
516 if ns_params.get("ssh_keys"):
517 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
518 for vnf_params in get_iterable(ns_params, "vnf"):
519 for constituent_vnfd in nsd["constituent-vnfd"]:
520 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
521 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
522 break
523 else:
524 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
525 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
526
527 for vdu_params in get_iterable(vnf_params, "vdu"):
528 # TODO feature 1417: check that this VDU exist and it is not a PDU
529 if vdu_params.get("volume"):
530 for volume_params in vdu_params["volume"]:
531 if volume_params.get("vim-volume-id"):
532 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
533 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
534 volume_params["vim-volume-id"])
535 if vdu_params.get("interface"):
536 for interface_params in vdu_params["interface"]:
537 if interface_params.get("ip-address"):
538 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
539 vdu_params["id"], "interfaces", interface_params["name"],
540 "ip_address"),
541 interface_params["ip-address"])
542 if interface_params.get("mac-address"):
543 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
544 vdu_params["id"], "interfaces", interface_params["name"],
545 "mac_address"),
546 interface_params["mac-address"])
547 if interface_params.get("floating-ip-required"):
548 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
549 vdu_params["id"], "interfaces", interface_params["name"],
550 "floating-ip"),
551 interface_params["floating-ip-required"])
552
553 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
554 if internal_vld_params.get("vim-network-name"):
555 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
556 internal_vld_params["name"], "vim-network-name"),
557 internal_vld_params["vim-network-name"])
558 if internal_vld_params.get("vim-network-id"):
559 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
560 internal_vld_params["name"], "vim-network-id"),
561 internal_vld_params["vim-network-id"])
562 if internal_vld_params.get("ip-profile"):
563 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
564 internal_vld_params["name"], "ip-profile"),
565 ip_profile_2_RO(internal_vld_params["ip-profile"]))
566 if internal_vld_params.get("provider-network"):
567
568 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
569 internal_vld_params["name"], "provider-network"),
570 internal_vld_params["provider-network"].copy())
571
572 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
573 # look for interface
574 iface_found = False
575 for vdu_descriptor in vnf_descriptor["vdu"]:
576 for vdu_interface in vdu_descriptor["interface"]:
577 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
578 if icp_params.get("ip-address"):
579 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
580 vdu_descriptor["id"], "interfaces",
581 vdu_interface["name"], "ip_address"),
582 icp_params["ip-address"])
583
584 if icp_params.get("mac-address"):
585 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
586 vdu_descriptor["id"], "interfaces",
587 vdu_interface["name"], "mac_address"),
588 icp_params["mac-address"])
589 iface_found = True
590 break
591 if iface_found:
592 break
593 else:
594 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
595 "internal-vld:id-ref={} is not present at vnfd:internal-"
596 "connection-point".format(vnf_params["member-vnf-index"],
597 icp_params["id-ref"]))
598
599 for vld_params in get_iterable(ns_params, "vld"):
600 if "ip-profile" in vld_params:
601 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
602 ip_profile_2_RO(vld_params["ip-profile"]))
603
604 if vld_params.get("provider-network"):
605
606 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
607 vld_params["provider-network"].copy())
608
609 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
610 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
611 wim_account_2_RO(vld_params["wimAccountId"]))
612 if vld_params.get("vim-network-name"):
613 RO_vld_sites = []
614 if isinstance(vld_params["vim-network-name"], dict):
615 for vim_account, vim_net in vld_params["vim-network-name"].items():
616 RO_vld_sites.append({
617 "netmap-use": vim_net,
618 "datacenter": vim_account_2_RO(vim_account)
619 })
620 else: # isinstance str
621 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
622 if RO_vld_sites:
623 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
624
625 if vld_params.get("vim-network-id"):
626 RO_vld_sites = []
627 if isinstance(vld_params["vim-network-id"], dict):
628 for vim_account, vim_net in vld_params["vim-network-id"].items():
629 RO_vld_sites.append({
630 "netmap-use": vim_net,
631 "datacenter": vim_account_2_RO(vim_account)
632 })
633 else: # isinstance str
634 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
635 if RO_vld_sites:
636 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
637 if vld_params.get("ns-net"):
638 if isinstance(vld_params["ns-net"], dict):
639 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
640 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
641 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
642 if "vnfd-connection-point-ref" in vld_params:
643 for cp_params in vld_params["vnfd-connection-point-ref"]:
644 # look for interface
645 for constituent_vnfd in nsd["constituent-vnfd"]:
646 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
647 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
648 break
649 else:
650 raise LcmException(
651 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
652 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
653 match_cp = False
654 for vdu_descriptor in vnf_descriptor["vdu"]:
655 for interface_descriptor in vdu_descriptor["interface"]:
656 if interface_descriptor.get("external-connection-point-ref") == \
657 cp_params["vnfd-connection-point-ref"]:
658 match_cp = True
659 break
660 if match_cp:
661 break
662 else:
663 raise LcmException(
664 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
665 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
666 cp_params["member-vnf-index-ref"],
667 cp_params["vnfd-connection-point-ref"],
668 vnf_descriptor["id"]))
669 if cp_params.get("ip-address"):
670 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
671 vdu_descriptor["id"], "interfaces",
672 interface_descriptor["name"], "ip_address"),
673 cp_params["ip-address"])
674 if cp_params.get("mac-address"):
675 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
676 vdu_descriptor["id"], "interfaces",
677 interface_descriptor["name"], "mac_address"),
678 cp_params["mac-address"])
679 return RO_ns_params
680
681 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
682 # make a copy so the originals are not modified
683 vdu_create = copy(vdu_create)
684 vdu_delete = copy(vdu_delete)
685
686 vdurs = db_vnfr.get("vdur")
687 if vdurs is None:
688 vdurs = []
689 vdu_index = len(vdurs)
690 while vdu_index:
691 vdu_index -= 1
692 vdur = vdurs[vdu_index]
693 if vdur.get("pdu-type"):
694 continue
695 vdu_id_ref = vdur["vdu-id-ref"]
696 if vdu_create and vdu_create.get(vdu_id_ref):
697 vdur_copy = deepcopy(vdur)
698 vdur_copy["status"] = "BUILD"
699 vdur_copy["status-detailed"] = None
700 vdur_copy["ip-address"] = None
701 for iface in vdur_copy["interfaces"]:
702 iface["ip-address"] = None
703 iface["mac-address"] = None
704 iface.pop("mgmt_vnf", None) # only the first vdu can be the management vdu of the vnf # TODO ALF
705 for index in range(0, vdu_create[vdu_id_ref]):
706 vdur_copy["_id"] = str(uuid4())
707 vdur_copy["count-index"] += 1
708 vdurs.insert(vdu_index+1+index, vdur_copy)
709 self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
710 vdur_copy = deepcopy(vdur_copy)
711
712 del vdu_create[vdu_id_ref]
713 if vdu_delete and vdu_delete.get(vdu_id_ref):
714 del vdurs[vdu_index]
715 vdu_delete[vdu_id_ref] -= 1
716 if not vdu_delete[vdu_id_ref]:
717 del vdu_delete[vdu_id_ref]
718 # check all operations are done
719 if vdu_create:
720 raise LcmException("Error scaling OUT VNFR for {}. There is no existing vdur. Scaled to 0?".format(
721 vdu_create))
722 if vdu_delete:
723 raise LcmException("Error scaling IN VNFR for {}. There is no existing vdur. Scaled to 0?".format(
724 vdu_delete))
725
726 vnfr_update = {"vdur": vdurs}
727 db_vnfr["vdur"] = vdurs
728 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
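# Usage sketch (hypothetical vdu name and counts): scale_vnfr(db_vnfr,
# vdu_create={"dataVM": 2}) clones the existing "dataVM" vdur twice with status
# "BUILD" and increasing count-index, while vdu_delete={"dataVM": 1} removes the
# last "dataVM" replica before the vnfr record is written back to the database.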
729
730 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
731 """
732 Updates database nsr with the RO info for the created vld
733 :param ns_update_nsr: dictionary to be filled with the updated info
734 :param db_nsr: content of db_nsr. This is also modified
735 :param nsr_desc_RO: nsr descriptor from RO
736 :return: Nothing, LcmException is raised on errors
737 """
738
739 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
740 for net_RO in get_iterable(nsr_desc_RO, "nets"):
741 if vld["id"] != net_RO.get("ns_net_osm_id"):
742 continue
743 vld["vim-id"] = net_RO.get("vim_net_id")
744 vld["name"] = net_RO.get("vim_name")
745 vld["status"] = net_RO.get("status")
746 vld["status-detailed"] = net_RO.get("error_msg")
747 ns_update_nsr["vld.{}".format(vld_index)] = vld
748 break
749 else:
750 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
751
752 def set_vnfr_at_error(self, db_vnfrs, error_text):
753 try:
754 for db_vnfr in db_vnfrs.values():
755 vnfr_update = {"status": "ERROR"}
756 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
757 if "status" not in vdur:
758 vdur["status"] = "ERROR"
759 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
760 if error_text:
761 vdur["status-detailed"] = str(error_text)
762 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
763 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
764 except DbException as e:
765 self.logger.error("Cannot update vnf. {}".format(e))
766
767 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
768 """
769 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
770 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
771 :param nsr_desc_RO: nsr descriptor from RO
772 :return: Nothing, LcmException is raised on errors
773 """
774 for vnf_index, db_vnfr in db_vnfrs.items():
775 for vnf_RO in nsr_desc_RO["vnfs"]:
776 if vnf_RO["member_vnf_index"] != vnf_index:
777 continue
778 vnfr_update = {}
779 if vnf_RO.get("ip_address"):
780 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
781 elif not db_vnfr.get("ip-address"):
782 if db_vnfr.get("vdur"): # if there are no VDUs, the lack of ip_address is expected
783 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
784
785 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
786 vdur_RO_count_index = 0
787 if vdur.get("pdu-type"):
788 continue
789 for vdur_RO in get_iterable(vnf_RO, "vms"):
790 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
791 continue
792 if vdur["count-index"] != vdur_RO_count_index:
793 vdur_RO_count_index += 1
794 continue
795 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
796 if vdur_RO.get("ip_address"):
797 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
798 else:
799 vdur["ip-address"] = None
800 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
801 vdur["name"] = vdur_RO.get("vim_name")
802 vdur["status"] = vdur_RO.get("status")
803 vdur["status-detailed"] = vdur_RO.get("error_msg")
804 for ifacer in get_iterable(vdur, "interfaces"):
805 for interface_RO in get_iterable(vdur_RO, "interfaces"):
806 if ifacer["name"] == interface_RO.get("internal_name"):
807 ifacer["ip-address"] = interface_RO.get("ip_address")
808 ifacer["mac-address"] = interface_RO.get("mac_address")
809 break
810 else:
811 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
812 "from VIM info"
813 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
814 vnfr_update["vdur.{}".format(vdu_index)] = vdur
815 break
816 else:
817 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
818 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
819
820 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
821 for net_RO in get_iterable(nsr_desc_RO, "nets"):
822 if vld["id"] != net_RO.get("vnf_net_osm_id"):
823 continue
824 vld["vim-id"] = net_RO.get("vim_net_id")
825 vld["name"] = net_RO.get("vim_name")
826 vld["status"] = net_RO.get("status")
827 vld["status-detailed"] = net_RO.get("error_msg")
828 vnfr_update["vld.{}".format(vld_index)] = vld
829 break
830 else:
831 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
832 vnf_index, vld["id"]))
833
834 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
835 break
836
837 else:
838 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
839
840 def _get_ns_config_info(self, nsr_id):
841 """
842 Generates a mapping between vnf,vdu elements and the N2VC id
843 :param nsr_id: id of the nsr; its latest _admin.deployed.VCA list is read from the database
844 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
845 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
846 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
847 """
848 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
849 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
850 mapping = {}
851 ns_config_info = {"osm-config-mapping": mapping}
852 for vca in vca_deployed_list:
853 if not vca["member-vnf-index"]:
854 continue
855 if not vca["vdu_id"]:
856 mapping[vca["member-vnf-index"]] = vca["application"]
857 else:
858 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
859 vca["application"]
860 return ns_config_info
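# Illustrative result (hypothetical application names), following the docstring above:
#   {"osm-config-mapping": {"1": "app-vnf-1",
#                           "2.mgmtVM.0": "app-vnf-2-vdu-mgmtvm-0"}}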
861
862 @staticmethod
863 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
864 """
865 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
866 primitives such as verify-ssh-credentials, or config when needed
867 :param desc_primitive_list: information of the descriptor
868 :param vca_deployed: information of the deployment, needed to know whether it is related to an NS, VNF or VDU
869 and whether this element contains an ssh public key
870 :param ee_descriptor_id: execution environment descriptor id. It is the value of
871 XXX_configuration.execution-environment-list.INDEX.id; it can be None
872 :return: The modified list. Can be an empty list, but always a list
873 """
874
875 primitive_list = desc_primitive_list or []
876
877 # filter primitives by ee_id
878 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
879
880 # sort by 'seq'
881 if primitive_list:
882 primitive_list.sort(key=lambda val: int(val['seq']))
883
884 # look for primitive config, and get the position. None if not present
885 config_position = None
886 for index, primitive in enumerate(primitive_list):
887 if primitive["name"] == "config":
888 config_position = index
889 break
890
891 # for NS, always add a config primitive if not present (bug 874)
892 if not vca_deployed["member-vnf-index"] and config_position is None:
893 primitive_list.insert(0, {"name": "config", "parameter": []})
894 config_position = 0
895 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
896 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
897 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
898 return primitive_list
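# Illustrative outcome (hypothetical descriptor): for an NS-level VCA with no
# descriptor primitives the result is [{"name": "config", "parameter": []}]; for a
# VNF-level VCA that exposes an ssh-public-key and declares a "config" primitive,
# {"name": "verify-ssh-credentials", "parameter": []} is inserted right after it.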
899
900 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
901 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
902 nslcmop_id = db_nslcmop["_id"]
903 target = {
904 "name": db_nsr["name"],
905 "ns": {"vld": []},
906 "vnf": [],
907 "image": deepcopy(db_nsr["image"]),
908 "flavor": deepcopy(db_nsr["flavor"]),
909 "action_id": nslcmop_id,
910 }
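# NG-RO deployment target skeleton: ns-level vlds, per-vnf records, and the image and
# flavor lists copied from the nsr; per-VIM "vim_info" entries are filled in below.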
911 for image in target["image"]:
912 image["vim_info"] = []
913 for flavor in target["flavor"]:
914 flavor["vim_info"] = []
915
916 ns_params = db_nslcmop.get("operationParams")
917 ssh_keys = []
918 if ns_params.get("ssh_keys"):
919 ssh_keys += ns_params.get("ssh_keys")
920 if n2vc_key_list:
921 ssh_keys += n2vc_key_list
922
923 cp2target = {}
924 for vld_index, vld in enumerate(nsd.get("vld")):
925 target_vld = {"id": vld["id"],
926 "name": vld["name"],
927 "mgmt-network": vld.get("mgmt-network", False),
928 "type": vld.get("type"),
929 "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
930 "vim_account_id": ns_params["vimAccountId"]}],
931 }
932 for cp in vld["vnfd-connection-point-ref"]:
933 cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
934 "nsrs:{}:vld.{}".format(nsr_id, vld_index)
935 target["ns"]["vld"].append(target_vld)
936 for vnfr in db_vnfrs.values():
937 vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
938 target_vnf = deepcopy(vnfr)
939 for vld in target_vnf.get("vld", ()):
940 # check if connected to a ns.vld
941 vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
942 cp.get("internal-vld-ref") == vld["id"]), None)
943 if vnf_cp:
944 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
945 if cp2target.get(ns_cp):
946 vld["target"] = cp2target[ns_cp]
947 vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
948 "vim_account_id": vnfr["vim-account-id"]}]
949
950 for vdur in target_vnf.get("vdur", ()):
951 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
952 vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
953 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
954
955 if ssh_keys:
956 if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
957 vdur["ssh-keys"] = ssh_keys
958 vdur["ssh-access-required"] = True
959 elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
960 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
961 vdur["ssh-keys"] = ssh_keys
962 vdur["ssh-access-required"] = True
963
964 # cloud-init
965 if vdud.get("cloud-init-file"):
966 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
967 elif vdud.get("cloud-init"):
968 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
969
970 # flavor
971 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
972 if not next((vi for vi in ns_flavor["vim_info"] if
973 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
974 ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
975 # image
976 ns_image = target["image"][int(vdur["ns-image-id"])]
977 if not next((vi for vi in ns_image["vim_info"] if
978 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
979 ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
980
981 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
982 target["vnf"].append(target_vnf)
983
984 desc = await self.RO.deploy(nsr_id, target)
985 action_id = desc["action_id"]
986 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
987
988 # Updating NSR
989 db_nsr_update = {
990 "_admin.deployed.RO.operational-status": "running",
991 "detailed-status": " ".join(stage)
992 }
993 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
994 self.update_db_2("nsrs", nsr_id, db_nsr_update)
995 self._write_op_status(nslcmop_id, stage)
996 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
997 return
998
999 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
1000 detailed_status_old = None
1001 db_nsr_update = {}
1002 while time() <= start_time + timeout:
1003 desc_status = await self.RO.status(nsr_id, action_id)
1004 if desc_status["status"] == "FAILED":
1005 raise NgRoException(desc_status["details"])
1006 elif desc_status["status"] == "BUILD":
1007 stage[2] = "VIM: ({})".format(desc_status["details"])
1008 elif desc_status["status"] == "DONE":
1009 stage[2] = "Deployed at VIM"
1010 break
1011 else:
1012 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
1013 if stage[2] != detailed_status_old:
1014 detailed_status_old = stage[2]
1015 db_nsr_update["detailed-status"] = " ".join(stage)
1016 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1017 self._write_op_status(nslcmop_id, stage)
1018 await asyncio.sleep(5, loop=self.loop)
1019 else: # timeout_ns_deploy
1020 raise NgRoException("Timeout waiting for ns to deploy")
1021
1022 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
1023 db_nsr_update = {}
1024 failed_detail = []
1025 action_id = None
1026 start_deploy = time()
1027 try:
1028 target = {
1029 "ns": {"vld": []},
1030 "vnf": [],
1031 "image": [],
1032 "flavor": [],
1033 }
1034 desc = await self.RO.deploy(nsr_id, target)
1035 action_id = desc["action_id"]
1036 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1037 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1038 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
1039
1040 # wait until done
1041 delete_timeout = 20 * 60 # 20 minutes
1042 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
1043
1044 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1045 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1046 # delete all nsr
1047 await self.RO.delete(nsr_id)
1048 except Exception as e:
1049 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1050 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1051 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1052 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1053 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
1054 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1055 failed_detail.append("delete conflict: {}".format(e))
1056 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
1057 else:
1058 failed_detail.append("delete error: {}".format(e))
1059 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
1060
1061 if failed_detail:
1062 stage[2] = "Error deleting from VIM"
1063 else:
1064 stage[2] = "Deleted from VIM"
1065 db_nsr_update["detailed-status"] = " ".join(stage)
1066 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1067 self._write_op_status(nslcmop_id, stage)
1068
1069 if failed_detail:
1070 raise LcmException("; ".join(failed_detail))
1071 return
1072
1073 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
1074 n2vc_key_list, stage):
1075 """
1076 Instantiate at RO
1077 :param logging_text: prefix text to use for logging
1078 :param nsr_id: nsr identity
1079 :param nsd: database content of ns descriptor
1080 :param db_nsr: database content of ns record
1081 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1082 :param db_vnfrs:
1083 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1084 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1085 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1086 :return: None or exception
1087 """
1088 try:
1089 db_nsr_update = {}
1090 RO_descriptor_number = 0 # number of descriptors created at RO
1091 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
1092 nslcmop_id = db_nslcmop["_id"]
1093 start_deploy = time()
1094 ns_params = db_nslcmop.get("operationParams")
1095 if ns_params and ns_params.get("timeout_ns_deploy"):
1096 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1097 else:
1098 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1099
1100 # Check for and optionally request placement optimization. Database will be updated if placement activated
1101 stage[2] = "Waiting for Placement."
1102 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1103 # in case placement changed the vim accounts, set ns_params["vimAccountId"] if it no longer matches any vnfr
1104 for vnfr in db_vnfrs.values():
1105 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1106 break
1107 else:
1108 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1109
1110 if self.ng_ro:
1111 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
1112 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
1113 timeout_ns_deploy)
1114 # deploy RO
1115 # get vnfds, instantiate at RO
1116 for c_vnf in nsd.get("constituent-vnfd", ()):
1117 member_vnf_index = c_vnf["member-vnf-index"]
1118 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
1119 vnfd_ref = vnfd["id"]
1120
1121 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
1122 db_nsr_update["detailed-status"] = " ".join(stage)
1123 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1124 self._write_op_status(nslcmop_id, stage)
1125
1126 # self.logger.debug(logging_text + stage[2])
1127 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
1128 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
1129 RO_descriptor_number += 1
1130
1131 # look for its position at deployed.RO.vnfd; if not present it will be appended at the end
1132 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
1133 if vnf_deployed["member-vnf-index"] == member_vnf_index:
1134 break
1135 else:
1136 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
1137 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1138
1139 # look if present
1140 RO_update = {"member-vnf-index": member_vnf_index}
1141 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
1142 if vnfd_list:
1143 RO_update["id"] = vnfd_list[0]["uuid"]
1144 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1145 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
1146 else:
1147 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
1148 get("additionalParamsForVnf"), nsr_id)
1149 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
1150 RO_update["id"] = desc["uuid"]
1151 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1152 vnfd_ref, member_vnf_index, desc["uuid"]))
1153 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
1154 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
1155
1156 # create nsd at RO
1157 nsd_ref = nsd["id"]
1158
1159 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1160 db_nsr_update["detailed-status"] = " ".join(stage)
1161 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1162 self._write_op_status(nslcmop_id, stage)
1163
1164 # self.logger.debug(logging_text + stage[2])
1165 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
1166 RO_descriptor_number += 1
1167 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
1168 if nsd_list:
1169 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
1170 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
1171 nsd_ref, RO_nsd_uuid))
1172 else:
1173 nsd_RO = deepcopy(nsd)
1174 nsd_RO["id"] = RO_osm_nsd_id
1175 nsd_RO.pop("_id", None)
1176 nsd_RO.pop("_admin", None)
1177 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
1178 member_vnf_index = c_vnf["member-vnf-index"]
1179 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1180 for c_vld in nsd_RO.get("vld", ()):
1181 for cp in c_vld.get("vnfd-connection-point-ref", ()):
1182 member_vnf_index = cp["member-vnf-index-ref"]
1183 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1184
1185 desc = await self.RO.create("nsd", descriptor=nsd_RO)
1186 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1187 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
1188 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
1189 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1190
1191 # Create ns at RO
1192 stage[2] = "Creating ns at RO from nsd={}".format(nsd_ref)
1193 db_nsr_update["detailed-status"] = " ".join(stage)
1194 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1195 self._write_op_status(nslcmop_id, stage)
1196
1197 # if present use it unless in error status
1198 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
1199 if RO_nsr_id:
1200 try:
1201 stage[2] = "Looking for existing ns at RO"
1202 db_nsr_update["detailed-status"] = " ".join(stage)
1203 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1204 self._write_op_status(nslcmop_id, stage)
1205 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1206 desc = await self.RO.show("ns", RO_nsr_id)
1207
1208 except ROclient.ROClientException as e:
1209 if e.http_code != HTTPStatus.NOT_FOUND:
1210 raise
1211 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1212 if RO_nsr_id:
1213 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1214 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1215 if ns_status == "ERROR":
1216 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
1217 self.logger.debug(logging_text + stage[2])
1218 await self.RO.delete("ns", RO_nsr_id)
1219 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1220 if not RO_nsr_id:
1221 stage[2] = "Checking dependencies"
1222 db_nsr_update["detailed-status"] = " ".join(stage)
1223 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1224 self._write_op_status(nslcmop_id, stage)
1225 # self.logger.debug(logging_text + stage[2])
1226
1227 # check if the VIM is still being created and wait if previous related tasks are in process
1228 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
1229 if task_dependency:
1230 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
1231 self.logger.debug(logging_text + stage[2])
1232 await asyncio.wait(task_dependency, timeout=3600)
1233 if ns_params.get("vnf"):
1234 for vnf in ns_params["vnf"]:
1235 if "vimAccountId" in vnf:
1236 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
1237 vnf["vimAccountId"])
1238 if task_dependency:
1239 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
1240 self.logger.debug(logging_text + stage[2])
1241 await asyncio.wait(task_dependency, timeout=3600)
1242
1243 stage[2] = "Checking instantiation parameters."
1244 RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
1245 stage[2] = "Deploying ns at VIM."
1246 db_nsr_update["detailed-status"] = " ".join(stage)
1247 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1248 self._write_op_status(nslcmop_id, stage)
1249
1250 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
1251 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
1252 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1253 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
1254 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
1255
1256 # wait until NS is ready
1257 stage[2] = "Waiting for VIM to deploy ns."
1258 db_nsr_update["detailed-status"] = " ".join(stage)
1259 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1260 self._write_op_status(nslcmop_id, stage)
1261 detailed_status_old = None
1262 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1263
1264 old_desc = None
1265 while time() <= start_deploy + timeout_ns_deploy:
1266 desc = await self.RO.show("ns", RO_nsr_id)
1267
1268 # deploymentStatus
1269 if desc != old_desc:
1270 # desc has changed => update db
1271 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
1272 old_desc = desc
1273
1274 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1275 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1276 if ns_status == "ERROR":
1277 raise ROclient.ROClientException(ns_status_info)
1278 elif ns_status == "BUILD":
1279 stage[2] = "VIM: ({})".format(ns_status_info)
1280 elif ns_status == "ACTIVE":
1281 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1282 try:
1283 self.ns_update_vnfr(db_vnfrs, desc)
1284 break
1285 except LcmExceptionNoMgmtIP:
1286 pass
1287 else:
1288 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1289 if stage[2] != detailed_status_old:
1290 detailed_status_old = stage[2]
1291 db_nsr_update["detailed-status"] = " ".join(stage)
1292 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1293 self._write_op_status(nslcmop_id, stage)
1294 await asyncio.sleep(5, loop=self.loop)
1295 else: # timeout_ns_deploy
1296 raise ROclient.ROClientException("Timeout waiting for ns to be ready")
1297
1298 # Updating NSR
1299 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
1300
1301 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
1302 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1303 stage[2] = "Deployed at VIM"
1304 db_nsr_update["detailed-status"] = " ".join(stage)
1305 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1306 self._write_op_status(nslcmop_id, stage)
1307 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1308 # self.logger.debug(logging_text + "Deployed at VIM")
1309 except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
1310 stage[2] = "ERROR deploying at VIM"
1311 self.set_vnfr_at_error(db_vnfrs, str(e))
1312 raise
1313
1314 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1315 """
1316 Wait for kdu to be up, get ip address
1317 :param logging_text: prefix use for logging
1318 :param nsr_id:
1319 :param vnfr_id:
1320 :param kdu_name:
1321 :return: IP address
1322 """
1323
1324 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1325 nb_tries = 0
1326
1327 while nb_tries < 360:
1328 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1329 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
1330 if not kdur:
1331 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
1332 if kdur.get("status"):
1333 if kdur["status"] in ("READY", "ENABLED"):
1334 return kdur.get("ip-address")
1335 else:
1336 raise LcmException("target KDU={} is in error state".format(kdu_name))
1337
1338 await asyncio.sleep(10, loop=self.loop)
1339 nb_tries += 1
1340 raise LcmException("Timeout waiting for KDU={} to be instantiated".format(kdu_name))
1341
1342 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
1343 """
1344 Wait for the IP address at RO and, optionally, insert a public key in the virtual machine
1345 :param logging_text: prefix use for logging
1346 :param nsr_id:
1347 :param vnfr_id:
1348 :param vdu_id:
1349 :param vdu_index:
1350 :param pub_key: public ssh key to inject, None to skip
1351 :param user: user to apply the public ssh key
1352 :return: IP address
1353 """
1354
1355 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1356 ro_nsr_id = None
1357 ip_address = None
1358 nb_tries = 0
1359 target_vdu_id = None
1360 ro_retries = 0
1361
1362 while True:
1363
1364 ro_retries += 1
1365 if ro_retries >= 360: # 1 hour
1366 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1367
1368 await asyncio.sleep(10, loop=self.loop)
1369
1370 # get ip address
1371 if not target_vdu_id:
1372 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1373
1374 if not vdu_id: # for the VNF case
1375 if db_vnfr.get("status") == "ERROR":
1376 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1377 ip_address = db_vnfr.get("ip-address")
1378 if not ip_address:
1379 continue
1380 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1381 else: # VDU case
1382 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1383 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1384
1385 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1386 vdur = db_vnfr["vdur"][0]
1387 if not vdur:
1388 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1389 vdu_index))
1390
1391 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
1392 ip_address = vdur.get("ip-address")
1393 if not ip_address:
1394 continue
1395 target_vdu_id = vdur["vdu-id-ref"]
1396 elif vdur.get("status") == "ERROR":
1397 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1398
1399 if not target_vdu_id:
1400 continue
1401
1402 # inject public key into machine
1403 if pub_key and user:
1404 # wait until NS is deployed at RO
1405 if not ro_nsr_id:
1406 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1407 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1408 if not ro_nsr_id:
1409 continue
1410
1411 # self.logger.debug(logging_text + "Inserting RO key")
1412 if vdur.get("pdu-type"):
1413 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1414 return ip_address
1415 try:
1416 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1417 if self.ng_ro:
1418 target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
1419 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
1420 }
1421 await self.RO.deploy(nsr_id, target)
1422 else:
1423 result_dict = await self.RO.create_action(
1424 item="ns",
1425 item_id_name=ro_nsr_id,
1426 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1427 )
1428 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1429 if not result_dict or not isinstance(result_dict, dict):
1430 raise LcmException("Unknown response from RO when injecting key")
1431 for result in result_dict.values():
1432 if result.get("vim_result") == 200:
1433 break
1434 else:
1435 raise ROclient.ROClientException("error injecting key: {}".format(
1436 result.get("description")))
1437 break
1438 except NgRoException as e:
1439 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1440 except ROclient.ROClientException as e:
1441 if not nb_tries:
1442 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1443 format(e, 20*10))
1444 nb_tries += 1
1445 if nb_tries >= 20:
1446 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1447 else:
1448 break
1449
1450 return ip_address
1451
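# --- Illustrative sketch (not part of the class): the two RO payload shapes built by
# wait_vm_up_insert_key_ro above. All identifier values are placeholders; only the dictionary
# structure mirrors the method.

EXAMPLE_PUB_KEY = "ssh-rsa AAAA... osm@lcm"      # placeholder key
EXAMPLE_USER = "ubuntu"                          # placeholder user
EXAMPLE_VNFR_ID = "11111111-2222-3333-4444-555555555555"
EXAMPLE_VDU_ID = "mgmtVM"

# NG-RO path: RO.deploy(nsr_id, target) with an "inject_ssh_key" action
example_ng_ro_target = {
    "action": "inject_ssh_key",
    "key": EXAMPLE_PUB_KEY,
    "user": EXAMPLE_USER,
    "vnf": [{"_id": EXAMPLE_VNFR_ID, "vdur": [{"id": EXAMPLE_VDU_ID}]}],
}

# classic RO path: RO.create_action(item="ns", ...) with a descriptor addressed to concrete VMs
example_ro_vm_id = "1-mgmtVM"  # "<member-vnf-index>-<vdu-id-ref>"
example_ro_descriptor = {"add_public_key": EXAMPLE_PUB_KEY, "vms": [example_ro_vm_id], "user": EXAMPLE_USER}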
1452 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1453 """
1454 Wait until dependent VCA deployments have finished. The NS waits for its VNFs and VDUs; VNFs wait for their VDUs
1455 """
1456 my_vca = vca_deployed_list[vca_index]
1457 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1458 # vdu or kdu: no dependencies
1459 return
1460 timeout = 300
1461 while timeout >= 0:
1462 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1463 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1464 configuration_status_list = db_nsr["configurationStatus"]
1465 for index, vca_deployed in enumerate(configuration_status_list):
1466 if index == vca_index:
1467 # myself
1468 continue
1469 if not my_vca.get("member-vnf-index") or \
1470 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1471 internal_status = configuration_status_list[index].get("status")
1472 if internal_status == 'READY':
1473 continue
1474 elif internal_status == 'BROKEN':
1475 raise LcmException("Configuration aborted because dependent charm/s has failed")
1476 else:
1477 break
1478 else:
1479 # no dependencies, return
1480 return
1481 await asyncio.sleep(10)
1482 timeout -= 1
1483
1484 raise LcmException("Configuration aborted because dependent charm/s timeout")
1485
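# --- Illustrative sketch (not part of the class): the readiness check performed by
# _wait_dependent_n2vc above, reduced to a pure function. It omits the member-vnf-index filter and
# the database re-read; dependents_ready is a hypothetical helper name, not an OSM API.


def dependents_ready(configuration_status_list, my_index):
    # True when every other entry is READY, False while some are still in progress,
    # RuntimeError if any dependency is BROKEN.
    for index, entry in enumerate(configuration_status_list):
        if index == my_index:
            continue
        status = entry.get("status")
        if status == "READY":
            continue
        if status == "BROKEN":
            raise RuntimeError("dependent charm failed")
        return False
    return True

# usage sketch: dependents_ready([{"status": "READY"}, {"status": "READY"}], my_index=0) -> True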
1486 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1487 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1488 ee_config_descriptor):
1489 nsr_id = db_nsr["_id"]
1490 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1491 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1492 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1493 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1494 db_dict = {
1495 'collection': 'nsrs',
1496 'filter': {'_id': nsr_id},
1497 'path': db_update_entry
1498 }
1499 step = ""
1500 try:
1501
1502 element_type = 'NS'
1503 element_under_configuration = nsr_id
1504
1505 vnfr_id = None
1506 if db_vnfr:
1507 vnfr_id = db_vnfr["_id"]
1508 osm_config["osm"]["vnf_id"] = vnfr_id
1509
1510 namespace = "{nsi}.{ns}".format(
1511 nsi=nsi_id if nsi_id else "",
1512 ns=nsr_id)
1513
1514 if vnfr_id:
1515 element_type = 'VNF'
1516 element_under_configuration = vnfr_id
1517 namespace += ".{}".format(vnfr_id)
1518 if vdu_id:
1519 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1520 element_type = 'VDU'
1521 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1522 osm_config["osm"]["vdu_id"] = vdu_id
1523 elif kdu_name:
1524 namespace += ".{}".format(kdu_name)
1525 element_type = 'KDU'
1526 element_under_configuration = kdu_name
1527 osm_config["osm"]["kdu_name"] = kdu_name
1528
1529 # Get artifact path
1530 artifact_path = "{}/{}/{}/{}".format(
1531 base_folder["folder"],
1532 base_folder["pkg-dir"],
1533 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1534 vca_name
1535 )
1536 # get initial_config_primitive_list that applies to this element
1537 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1538
1539 # add config if not present for NS charm
1540 ee_descriptor_id = ee_config_descriptor.get("id")
1541 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1542 vca_deployed, ee_descriptor_id)
1543
1544 # n2vc_redesign STEP 3.1
1545 # find old ee_id if exists
1546 ee_id = vca_deployed.get("ee_id")
1547
1548 # create or register execution environment in VCA
1549 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1550
1551 self._write_configuration_status(
1552 nsr_id=nsr_id,
1553 vca_index=vca_index,
1554 status='CREATING',
1555 element_under_configuration=element_under_configuration,
1556 element_type=element_type
1557 )
1558
1559 step = "create execution environment"
1560 self.logger.debug(logging_text + step)
1561 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1562 namespace=namespace,
1563 reuse_ee_id=ee_id,
1564 db_dict=db_dict,
1565 config=osm_config,
1566 artifact_path=artifact_path,
1567 vca_type=vca_type)
1568
1569 elif vca_type == "native_charm":
1570 step = "Waiting to VM being up and getting IP address"
1571 self.logger.debug(logging_text + step)
1572 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1573 user=None, pub_key=None)
1574 credentials = {"hostname": rw_mgmt_ip}
1575 # get username
1576 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1577 # TODO remove this when the changes on IM regarding config-access:ssh-access:default-user are
1578 # merged. Meanwhile, get the username from the initial-config-primitive
1579 if not username and initial_config_primitive_list:
1580 for config_primitive in initial_config_primitive_list:
1581 for param in config_primitive.get("parameter", ()):
1582 if param["name"] == "ssh-username":
1583 username = param["value"]
1584 break
1585 if not username:
1586 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1587 "'config-access.ssh-access.default-user'")
1588 credentials["username"] = username
1589 # n2vc_redesign STEP 3.2
1590
1591 self._write_configuration_status(
1592 nsr_id=nsr_id,
1593 vca_index=vca_index,
1594 status='REGISTERING',
1595 element_under_configuration=element_under_configuration,
1596 element_type=element_type
1597 )
1598
1599 step = "register execution environment {}".format(credentials)
1600 self.logger.debug(logging_text + step)
1601 ee_id = await self.vca_map[vca_type].register_execution_environment(
1602 credentials=credentials, namespace=namespace, db_dict=db_dict)
1603
1604 # for compatibility with MON/POL modules, which need the model and application name at database
1605 # TODO ask MON/POL whether they still need to assume the format "model_name.application_name"
1606 ee_id_parts = ee_id.split('.')
1607 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1608 if len(ee_id_parts) >= 2:
1609 model_name = ee_id_parts[0]
1610 application_name = ee_id_parts[1]
1611 db_nsr_update[db_update_entry + "model"] = model_name
1612 db_nsr_update[db_update_entry + "application"] = application_name
1613
1614 # n2vc_redesign STEP 3.3
1615 step = "Install configuration Software"
1616
1617 self._write_configuration_status(
1618 nsr_id=nsr_id,
1619 vca_index=vca_index,
1620 status='INSTALLING SW',
1621 element_under_configuration=element_under_configuration,
1622 element_type=element_type,
1623 other_update=db_nsr_update
1624 )
1625
1626 # TODO check if already done
1627 self.logger.debug(logging_text + step)
1628 config = None
1629 if vca_type == "native_charm":
1630 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1631 if config_primitive:
1632 config = self._map_primitive_params(
1633 config_primitive,
1634 {},
1635 deploy_params
1636 )
1637 num_units = 1
1638 if vca_type == "lxc_proxy_charm":
1639 if element_type == "NS":
1640 num_units = db_nsr.get("config-units") or 1
1641 elif element_type == "VNF":
1642 num_units = db_vnfr.get("config-units") or 1
1643 elif element_type == "VDU":
1644 for v in db_vnfr["vdur"]:
1645 if vdu_id == v["vdu-id-ref"]:
1646 num_units = v.get("config-units") or 1
1647 break
1648
1649 await self.vca_map[vca_type].install_configuration_sw(
1650 ee_id=ee_id,
1651 artifact_path=artifact_path,
1652 db_dict=db_dict,
1653 config=config,
1654 num_units=num_units,
1655 vca_type=vca_type
1656 )
1657
1658 # write in db flag of configuration_sw already installed
1659 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1660
1661 # add relations for this VCA (wait for other peers related with this VCA)
1662 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1663 vca_index=vca_index, vca_type=vca_type)
1664
1665 # if SSH access is required, then get the execution environment SSH public key
1666 # for native charms we have already waited for the VM to be up
1667 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1668 pub_key = None
1669 user = None
1670 # self.logger.debug("get ssh key block")
1671 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1672 # self.logger.debug("ssh key needed")
1673 # Needed to inject a ssh key
1674 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1675 step = "Install configuration Software, getting public ssh key"
1676 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1677
1678 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1679 else:
1680 # self.logger.debug("no need to get ssh key")
1681 step = "Waiting to VM being up and getting IP address"
1682 self.logger.debug(logging_text + step)
1683
1684 # n2vc_redesign STEP 5.1
1685 # wait for RO to report the ip-address and insert pub_key into the VM
1686 if vnfr_id:
1687 if kdu_name:
1688 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1689 else:
1690 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1691 vdu_index, user=user, pub_key=pub_key)
1692 else:
1693 rw_mgmt_ip = None # This is for a NS configuration
1694
1695 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1696
1697 # store rw_mgmt_ip in deploy params for later replacement
1698 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1699
1700 # n2vc_redesign STEP 6 Execute initial config primitive
1701 step = 'execute initial config primitive'
1702
1703 # wait for dependent primitives execution (NS -> VNF -> VDU)
1704 if initial_config_primitive_list:
1705 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1706
1707 # stage, in function of element type: vdu, kdu, vnf or ns
1708 my_vca = vca_deployed_list[vca_index]
1709 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1710 # VDU or KDU
1711 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1712 elif my_vca.get("member-vnf-index"):
1713 # VNF
1714 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1715 else:
1716 # NS
1717 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1718
1719 self._write_configuration_status(
1720 nsr_id=nsr_id,
1721 vca_index=vca_index,
1722 status='EXECUTING PRIMITIVE'
1723 )
1724
1725 self._write_op_status(
1726 op_id=nslcmop_id,
1727 stage=stage
1728 )
1729
1730 check_if_terminated_needed = True
1731 for initial_config_primitive in initial_config_primitive_list:
1732 # adding information on the vca_deployed if it is a NS execution environment
1733 if not vca_deployed["member-vnf-index"]:
1734 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1735 # TODO check if already done
1736 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1737
1738 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1739 self.logger.debug(logging_text + step)
1740 await self.vca_map[vca_type].exec_primitive(
1741 ee_id=ee_id,
1742 primitive_name=initial_config_primitive["name"],
1743 params_dict=primitive_params_,
1744 db_dict=db_dict
1745 )
1746 # Once some primitive has been executed, check and record at db whether terminate primitives need to run
1747 if check_if_terminated_needed:
1748 if config_descriptor.get('terminate-config-primitive'):
1749 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1750 check_if_terminated_needed = False
1751
1752 # TODO register in database that primitive is done
1753
1754 # STEP 7 Configure metrics
1755 if vca_type == "helm" or vca_type == "helm-v3":
1756 prometheus_jobs = await self.add_prometheus_metrics(
1757 ee_id=ee_id,
1758 artifact_path=artifact_path,
1759 ee_config_descriptor=ee_config_descriptor,
1760 vnfr_id=vnfr_id,
1761 nsr_id=nsr_id,
1762 target_ip=rw_mgmt_ip,
1763 )
1764 if prometheus_jobs:
1765 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1766
1767 step = "instantiated at VCA"
1768 self.logger.debug(logging_text + step)
1769
1770 self._write_configuration_status(
1771 nsr_id=nsr_id,
1772 vca_index=vca_index,
1773 status='READY'
1774 )
1775
1776 except Exception as e: # TODO not use Exception but N2VC exception
1777 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1778 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1779 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1780 self._write_configuration_status(
1781 nsr_id=nsr_id,
1782 vca_index=vca_index,
1783 status='BROKEN'
1784 )
1785 raise LcmException("{} {}".format(step, e)) from e
1786
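# --- Illustrative sketch (not part of the class): how instantiate_N2VC above composes the VCA
# namespace "<nsi>.<ns>[.<vnf>[.<vdu>-<index> | .<kdu>]]". build_vca_namespace is a hypothetical
# helper name, not an OSM API.


def build_vca_namespace(nsr_id, nsi_id=None, vnfr_id=None, vdu_id=None, vdu_index=0, kdu_name=None):
    namespace = "{}.{}".format(nsi_id if nsi_id else "", nsr_id)
    if vnfr_id:
        namespace += ".{}".format(vnfr_id)
        if vdu_id:
            namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
        elif kdu_name:
            namespace += ".{}".format(kdu_name)
    return namespace

# usage sketch: build_vca_namespace("ns-uuid", vnfr_id="vnf-uuid", vdu_id="mgmtVM") -> ".ns-uuid.vnf-uuid.mgmtVM-0"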
1787 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1788 error_description: str = None, error_detail: str = None, other_update: dict = None):
1789 """
1790 Update db_nsr fields.
1791 :param nsr_id:
1792 :param ns_state:
1793 :param current_operation:
1794 :param current_operation_id:
1795 :param error_description:
1796 :param error_detail:
1797 :param other_update: Other required changes at database if provided, will be cleared
1798 :return:
1799 """
1800 try:
1801 db_dict = other_update or {}
1802 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1803 db_dict["_admin.current-operation"] = current_operation_id
1804 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1805 db_dict["currentOperation"] = current_operation
1806 db_dict["currentOperationID"] = current_operation_id
1807 db_dict["errorDescription"] = error_description
1808 db_dict["errorDetail"] = error_detail
1809
1810 if ns_state:
1811 db_dict["nsState"] = ns_state
1812 self.update_db_2("nsrs", nsr_id, db_dict)
1813 except DbException as e:
1814 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1815
1816 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1817 operation_state: str = None, other_update: dict = None):
1818 try:
1819 db_dict = other_update or {}
1820 db_dict['queuePosition'] = queuePosition
1821 if isinstance(stage, list):
1822 db_dict['stage'] = stage[0]
1823 db_dict['detailed-status'] = " ".join(stage)
1824 elif stage is not None:
1825 db_dict['stage'] = str(stage)
1826
1827 if error_message is not None:
1828 db_dict['errorMessage'] = error_message
1829 if operation_state is not None:
1830 db_dict['operationState'] = operation_state
1831 db_dict["statusEnteredTime"] = time()
1832 self.update_db_2("nslcmops", op_id, db_dict)
1833 except DbException as e:
1834 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1835
1836 def _write_all_config_status(self, db_nsr: dict, status: str):
1837 try:
1838 nsr_id = db_nsr["_id"]
1839 # configurationStatus
1840 config_status = db_nsr.get('configurationStatus')
1841 if config_status:
1842 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1843 enumerate(config_status) if v}
1844 # update status
1845 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1846
1847 except DbException as e:
1848 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1849
1850 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1851 element_under_configuration: str = None, element_type: str = None,
1852 other_update: dict = None):
1853
1854 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1855 # .format(vca_index, status))
1856
1857 try:
1858 db_path = 'configurationStatus.{}.'.format(vca_index)
1859 db_dict = other_update or {}
1860 if status:
1861 db_dict[db_path + 'status'] = status
1862 if element_under_configuration:
1863 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1864 if element_type:
1865 db_dict[db_path + 'elementType'] = element_type
1866 self.update_db_2("nsrs", nsr_id, db_dict)
1867 except DbException as e:
1868 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1869 .format(status, nsr_id, vca_index, e))
1870
1871 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1872 """
1873 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
1874 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
1875 Database is used because the result can be obtained from a different LCM worker in case of HA.
1876 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1877 :param db_nslcmop: database content of nslcmop
1878 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1879 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1880 computed 'vim-account-id'
1881 """
1882 modified = False
1883 nslcmop_id = db_nslcmop['_id']
1884 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1885 if placement_engine == "PLA":
1886 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1887 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1888 db_poll_interval = 5
1889 wait = db_poll_interval * 10
1890 pla_result = None
1891 while not pla_result and wait >= 0:
1892 await asyncio.sleep(db_poll_interval)
1893 wait -= db_poll_interval
1894 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1895 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1896
1897 if not pla_result:
1898 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1899
1900 for pla_vnf in pla_result['vnf']:
1901 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1902 if not pla_vnf.get('vimAccountId') or not vnfr:
1903 continue
1904 modified = True
1905 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1906 # Modifies db_vnfrs
1907 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1908 return modified
1909
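# --- Illustrative sketch (not part of the class): the poll-the-database pattern used by
# _do_placement above while waiting for the PLA answer under nslcmops _admin.pla.
# wait_for_pla_result/read_nslcmop are hypothetical names; asyncio is imported at module level.


async def wait_for_pla_result(read_nslcmop, poll_interval=5, max_wait=50):
    # read_nslcmop: zero-argument callable returning the current nslcmop record (a dict)
    waited = 0
    while waited <= max_wait:
        await asyncio.sleep(poll_interval)
        waited += poll_interval
        record = read_nslcmop() or {}
        pla_result = (record.get("_admin") or {}).get("pla")
        if pla_result:
            return pla_result
    raise TimeoutError("placement result not written in time")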
1910 def update_nsrs_with_pla_result(self, params):
1911 try:
1912 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1913 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1914 except Exception as e:
1915 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1916
1917 async def instantiate(self, nsr_id, nslcmop_id):
1918 """
1919
1920 :param nsr_id: ns instance to deploy
1921 :param nslcmop_id: operation to run
1922 :return:
1923 """
1924
1925 # Try to lock HA task here
1926 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1927 if not task_is_locked_by_me:
1928 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1929 return
1930
1931 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1932 self.logger.debug(logging_text + "Enter")
1933
1934 # get all needed from database
1935
1936 # database nsrs record
1937 db_nsr = None
1938
1939 # database nslcmops record
1940 db_nslcmop = None
1941
1942 # update operation on nsrs
1943 db_nsr_update = {}
1944 # update operation on nslcmops
1945 db_nslcmop_update = {}
1946
1947 nslcmop_operation_state = None
1948 db_vnfrs = {} # vnf's info indexed by member-index
1949 # n2vc_info = {}
1950 tasks_dict_info = {} # from task to info text
1951 exc = None
1952 error_list = []
1953 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1954 # ^ stage, step, VIM progress
1955 try:
1956 # wait for any previous tasks in process
1957 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1958
1959 stage[1] = "Sync filesystem from database."
1960 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
1961
1962 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1963 stage[1] = "Reading from database."
1964 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1965 db_nsr_update["detailed-status"] = "creating"
1966 db_nsr_update["operational-status"] = "init"
1967 self._write_ns_status(
1968 nsr_id=nsr_id,
1969 ns_state="BUILDING",
1970 current_operation="INSTANTIATING",
1971 current_operation_id=nslcmop_id,
1972 other_update=db_nsr_update
1973 )
1974 self._write_op_status(
1975 op_id=nslcmop_id,
1976 stage=stage,
1977 queuePosition=0
1978 )
1979
1980 # read from db: operation
1981 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
1982 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1983 ns_params = db_nslcmop.get("operationParams")
1984 if ns_params and ns_params.get("timeout_ns_deploy"):
1985 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1986 else:
1987 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1988
1989 # read from db: ns
1990 stage[1] = "Getting nsr={} from db.".format(nsr_id)
1991 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1992 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
1993 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1994 db_nsr["nsd"] = nsd
1995 # nsr_name = db_nsr["name"] # TODO short-name??
1996
1997 # read from db: vnf's of this ns
1998 stage[1] = "Getting vnfrs from db."
1999 self.logger.debug(logging_text + stage[1])
2000 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2001
2002 # read from db: vnfd's for every vnf
2003 db_vnfds_ref = {} # every vnfd data indexed by vnf name
2004 db_vnfds = {} # every vnfd data indexed by vnf id
2005 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
2006
2007 # for each vnf in ns, read vnfd
2008 for vnfr in db_vnfrs_list:
2009 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
2010 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
2011 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
2012
2013 # if we do not have this vnfd yet, read it from db
2014 if vnfd_id not in db_vnfds:
2015 # read from db
2016 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
2017 self.logger.debug(logging_text + stage[1])
2018 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2019
2020 # store vnfd
2021 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
2022 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
2023 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
2024
2025 # Get or generate the _admin.deployed.VCA list
2026 vca_deployed_list = None
2027 if db_nsr["_admin"].get("deployed"):
2028 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2029 if vca_deployed_list is None:
2030 vca_deployed_list = []
2031 configuration_status_list = []
2032 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2033 db_nsr_update["configurationStatus"] = configuration_status_list
2034 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2035 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2036 elif isinstance(vca_deployed_list, dict):
2037 # maintain backward compatibility. Change a dict to list at database
2038 vca_deployed_list = list(vca_deployed_list.values())
2039 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2040 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2041
2042 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
2043 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2044 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2045
2046 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2047 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2048 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2049 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
2050
2051 # n2vc_redesign STEP 2 Deploy Network Scenario
2052 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
2053 self._write_op_status(
2054 op_id=nslcmop_id,
2055 stage=stage
2056 )
2057
2058 stage[1] = "Deploying KDUs."
2059 # self.logger.debug(logging_text + "Before deploy_kdus")
2060 # Call deploy_kdus in case the "vdu:kdu" param exists
2061 await self.deploy_kdus(
2062 logging_text=logging_text,
2063 nsr_id=nsr_id,
2064 nslcmop_id=nslcmop_id,
2065 db_vnfrs=db_vnfrs,
2066 db_vnfds=db_vnfds,
2067 task_instantiation_info=tasks_dict_info,
2068 )
2069
2070 stage[1] = "Getting VCA public key."
2071 # n2vc_redesign STEP 1 Get VCA public ssh-key
2072 # feature 1429. Add n2vc public key to needed VMs
2073 n2vc_key = self.n2vc.get_public_key()
2074 n2vc_key_list = [n2vc_key]
2075 if self.vca_config.get("public_key"):
2076 n2vc_key_list.append(self.vca_config["public_key"])
2077
2078 stage[1] = "Deploying NS at VIM."
2079 task_ro = asyncio.ensure_future(
2080 self.instantiate_RO(
2081 logging_text=logging_text,
2082 nsr_id=nsr_id,
2083 nsd=nsd,
2084 db_nsr=db_nsr,
2085 db_nslcmop=db_nslcmop,
2086 db_vnfrs=db_vnfrs,
2087 db_vnfds_ref=db_vnfds_ref,
2088 n2vc_key_list=n2vc_key_list,
2089 stage=stage
2090 )
2091 )
2092 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2093 tasks_dict_info[task_ro] = "Deploying at VIM"
2094
2095 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2096 stage[1] = "Deploying Execution Environments."
2097 self.logger.debug(logging_text + stage[1])
2098
2099 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2100 # get_iterable() returns a value from a dict or empty tuple if key does not exist
2101 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
2102 vnfd_id = c_vnf["vnfd-id-ref"]
2103 vnfd = db_vnfds_ref[vnfd_id]
2104 member_vnf_index = str(c_vnf["member-vnf-index"])
2105 db_vnfr = db_vnfrs[member_vnf_index]
2106 base_folder = vnfd["_admin"]["storage"]
2107 vdu_id = None
2108 vdu_index = 0
2109 vdu_name = None
2110 kdu_name = None
2111
2112 # Get additional parameters
2113 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2114 if db_vnfr.get("additionalParamsForVnf"):
2115 deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()))
2116
2117 descriptor_config = vnfd.get("vnf-configuration")
2118 if descriptor_config:
2119 self._deploy_n2vc(
2120 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
2121 db_nsr=db_nsr,
2122 db_vnfr=db_vnfr,
2123 nslcmop_id=nslcmop_id,
2124 nsr_id=nsr_id,
2125 nsi_id=nsi_id,
2126 vnfd_id=vnfd_id,
2127 vdu_id=vdu_id,
2128 kdu_name=kdu_name,
2129 member_vnf_index=member_vnf_index,
2130 vdu_index=vdu_index,
2131 vdu_name=vdu_name,
2132 deploy_params=deploy_params,
2133 descriptor_config=descriptor_config,
2134 base_folder=base_folder,
2135 task_instantiation_info=tasks_dict_info,
2136 stage=stage
2137 )
2138
2139 # Deploy charms for each VDU that supports one.
2140 for vdud in get_iterable(vnfd, 'vdu'):
2141 vdu_id = vdud["id"]
2142 descriptor_config = vdud.get('vdu-configuration')
2143 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2144 if vdur.get("additionalParams"):
2145 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
2146 else:
2147 deploy_params_vdu = deploy_params
2148 deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
2149 if descriptor_config:
2150 vdu_name = None
2151 kdu_name = None
2152 for vdu_index in range(int(vdud.get("count", 1))):
2153 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2154 self._deploy_n2vc(
2155 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2156 member_vnf_index, vdu_id, vdu_index),
2157 db_nsr=db_nsr,
2158 db_vnfr=db_vnfr,
2159 nslcmop_id=nslcmop_id,
2160 nsr_id=nsr_id,
2161 nsi_id=nsi_id,
2162 vnfd_id=vnfd_id,
2163 vdu_id=vdu_id,
2164 kdu_name=kdu_name,
2165 member_vnf_index=member_vnf_index,
2166 vdu_index=vdu_index,
2167 vdu_name=vdu_name,
2168 deploy_params=deploy_params_vdu,
2169 descriptor_config=descriptor_config,
2170 base_folder=base_folder,
2171 task_instantiation_info=tasks_dict_info,
2172 stage=stage
2173 )
2174 for kdud in get_iterable(vnfd, 'kdu'):
2175 kdu_name = kdud["name"]
2176 descriptor_config = kdud.get('kdu-configuration')
2177 if descriptor_config:
2178 vdu_id = None
2179 vdu_index = 0
2180 vdu_name = None
2181 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
2182 deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)}
2183 if kdur.get("additionalParams"):
2184 deploy_params_kdu = self._format_additional_params(kdur["additionalParams"])
2185
2186 self._deploy_n2vc(
2187 logging_text=logging_text,
2188 db_nsr=db_nsr,
2189 db_vnfr=db_vnfr,
2190 nslcmop_id=nslcmop_id,
2191 nsr_id=nsr_id,
2192 nsi_id=nsi_id,
2193 vnfd_id=vnfd_id,
2194 vdu_id=vdu_id,
2195 kdu_name=kdu_name,
2196 member_vnf_index=member_vnf_index,
2197 vdu_index=vdu_index,
2198 vdu_name=vdu_name,
2199 deploy_params=deploy_params_kdu,
2200 descriptor_config=descriptor_config,
2201 base_folder=base_folder,
2202 task_instantiation_info=tasks_dict_info,
2203 stage=stage
2204 )
2205
2206 # Check if this NS has a charm configuration
2207 descriptor_config = nsd.get("ns-configuration")
2208 if descriptor_config and descriptor_config.get("juju"):
2209 vnfd_id = None
2210 db_vnfr = None
2211 member_vnf_index = None
2212 vdu_id = None
2213 kdu_name = None
2214 vdu_index = 0
2215 vdu_name = None
2216
2217 # Get additional parameters
2218 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2219 if db_nsr.get("additionalParamsForNs"):
2220 deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy()))
2221 base_folder = nsd["_admin"]["storage"]
2222 self._deploy_n2vc(
2223 logging_text=logging_text,
2224 db_nsr=db_nsr,
2225 db_vnfr=db_vnfr,
2226 nslcmop_id=nslcmop_id,
2227 nsr_id=nsr_id,
2228 nsi_id=nsi_id,
2229 vnfd_id=vnfd_id,
2230 vdu_id=vdu_id,
2231 kdu_name=kdu_name,
2232 member_vnf_index=member_vnf_index,
2233 vdu_index=vdu_index,
2234 vdu_name=vdu_name,
2235 deploy_params=deploy_params,
2236 descriptor_config=descriptor_config,
2237 base_folder=base_folder,
2238 task_instantiation_info=tasks_dict_info,
2239 stage=stage
2240 )
2241
2242 # the rest of the work is done in the finally block
2243
2244 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2245 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
2246 exc = e
2247 except asyncio.CancelledError:
2248 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2249 exc = "Operation was cancelled"
2250 except Exception as e:
2251 exc = traceback.format_exc()
2252 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2253 finally:
2254 if exc:
2255 error_list.append(str(exc))
2256 try:
2257 # wait for pending tasks
2258 if tasks_dict_info:
2259 stage[1] = "Waiting for instantiate pending tasks."
2260 self.logger.debug(logging_text + stage[1])
2261 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
2262 stage, nslcmop_id, nsr_id=nsr_id)
2263 stage[1] = stage[2] = ""
2264 except asyncio.CancelledError:
2265 error_list.append("Cancelled")
2266 # TODO cancel all tasks
2267 except Exception as exc:
2268 error_list.append(str(exc))
2269
2270 # update operation-status
2271 db_nsr_update["operational-status"] = "running"
2272 # let's begin with VCA 'configured' status (later we can change it)
2273 db_nsr_update["config-status"] = "configured"
2274 for task, task_name in tasks_dict_info.items():
2275 if not task.done() or task.cancelled() or task.exception():
2276 if task_name.startswith(self.task_name_deploy_vca):
2277 # A N2VC task is pending
2278 db_nsr_update["config-status"] = "failed"
2279 else:
2280 # RO or KDU task is pending
2281 db_nsr_update["operational-status"] = "failed"
2282
2283 # update status at database
2284 if error_list:
2285 error_detail = ". ".join(error_list)
2286 self.logger.error(logging_text + error_detail)
2287 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
2288 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
2289
2290 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2291 db_nslcmop_update["detailed-status"] = error_detail
2292 nslcmop_operation_state = "FAILED"
2293 ns_state = "BROKEN"
2294 else:
2295 error_detail = None
2296 error_description_nsr = error_description_nslcmop = None
2297 ns_state = "READY"
2298 db_nsr_update["detailed-status"] = "Done"
2299 db_nslcmop_update["detailed-status"] = "Done"
2300 nslcmop_operation_state = "COMPLETED"
2301
2302 if db_nsr:
2303 self._write_ns_status(
2304 nsr_id=nsr_id,
2305 ns_state=ns_state,
2306 current_operation="IDLE",
2307 current_operation_id=None,
2308 error_description=error_description_nsr,
2309 error_detail=error_detail,
2310 other_update=db_nsr_update
2311 )
2312 self._write_op_status(
2313 op_id=nslcmop_id,
2314 stage="",
2315 error_message=error_description_nslcmop,
2316 operation_state=nslcmop_operation_state,
2317 other_update=db_nslcmop_update,
2318 )
2319
2320 if nslcmop_operation_state:
2321 try:
2322 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2323 "operationState": nslcmop_operation_state},
2324 loop=self.loop)
2325 except Exception as e:
2326 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2327
2328 self.logger.debug(logging_text + "Exit")
2329 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2330
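# --- Illustrative sketch (not part of the class): the three-element "stage" list used throughout
# instantiate() above, [stage, step, VIM progress], which is flattened into "detailed-status".
# The example values are placeholders.

example_stage = [
    "Stage 2/5: deployment of KDUs, VMs and execution environments.",  # overall stage
    "Deploying NS at VIM.",                                            # current step
    "",                                                                # VIM progress text, filled while polling RO
]
example_detailed_status = " ".join(example_stage)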
2331 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2332 timeout: int = 3600, vca_type: str = None) -> bool:
2333
2334 # steps:
2335 # 1. find all relations for this VCA
2336 # 2. wait for other peers related
2337 # 3. add relations
2338
2339 try:
2340 vca_type = vca_type or "lxc_proxy_charm"
2341
2342 # STEP 1: find all relations for this VCA
2343
2344 # read nsr record
2345 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2346 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2347
2348 # this VCA data
2349 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2350
2351 # read all ns-configuration relations
2352 ns_relations = list()
2353 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2354 if db_ns_relations:
2355 for r in db_ns_relations:
2356 # check if this VCA is in the relation
2357 if my_vca.get('member-vnf-index') in\
2358 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2359 ns_relations.append(r)
2360
2361 # read all vnf-configuration relations
2362 vnf_relations = list()
2363 db_vnfd_list = db_nsr.get('vnfd-id')
2364 if db_vnfd_list:
2365 for vnfd in db_vnfd_list:
2366 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2367 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2368 if db_vnf_relations:
2369 for r in db_vnf_relations:
2370 # check if this VCA is in the relation
2371 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2372 vnf_relations.append(r)
2373
2374 # if no relations, terminate
2375 if not ns_relations and not vnf_relations:
2376 self.logger.debug(logging_text + ' No relations')
2377 return True
2378
2379 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2380
2381 # add all relations
2382 start = time()
2383 while True:
2384 # check timeout
2385 now = time()
2386 if now - start >= timeout:
2387 self.logger.error(logging_text + ' : timeout adding relations')
2388 return False
2389
2390 # reload nsr from database (we need to update record: _admin.deployed.VCA)
2391 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2392
2393 # for each defined NS relation, find the VCA's related
2394 for r in ns_relations.copy():
2395 from_vca_ee_id = None
2396 to_vca_ee_id = None
2397 from_vca_endpoint = None
2398 to_vca_endpoint = None
2399 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2400 for vca in vca_list:
2401 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2402 and vca.get('config_sw_installed'):
2403 from_vca_ee_id = vca.get('ee_id')
2404 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2405 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2406 and vca.get('config_sw_installed'):
2407 to_vca_ee_id = vca.get('ee_id')
2408 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2409 if from_vca_ee_id and to_vca_ee_id:
2410 # add relation
2411 await self.vca_map[vca_type].add_relation(
2412 ee_id_1=from_vca_ee_id,
2413 ee_id_2=to_vca_ee_id,
2414 endpoint_1=from_vca_endpoint,
2415 endpoint_2=to_vca_endpoint)
2416 # remove entry from relations list
2417 ns_relations.remove(r)
2418 else:
2419 # check failed peers
2420 try:
2421 vca_status_list = db_nsr.get('configurationStatus')
2422 if vca_status_list:
2423 for i in range(len(vca_list)):
2424 vca = vca_list[i]
2425 vca_status = vca_status_list[i]
2426 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2427 if vca_status.get('status') == 'BROKEN':
2428 # peer broken: remove relation from list
2429 ns_relations.remove(r)
2430 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2431 if vca_status.get('status') == 'BROKEN':
2432 # peer broken: remove relation from list
2433 ns_relations.remove(r)
2434 except Exception:
2435 # ignore
2436 pass
2437
2438 # for each defined VNF relation, find the VCA's related
2439 for r in vnf_relations.copy():
2440 from_vca_ee_id = None
2441 to_vca_ee_id = None
2442 from_vca_endpoint = None
2443 to_vca_endpoint = None
2444 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2445 for vca in vca_list:
2446 key_to_check = "vdu_id"
2447 if vca.get("vdu_id") is None:
2448 key_to_check = "vnfd_id"
2449 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2450 from_vca_ee_id = vca.get('ee_id')
2451 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2452 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2453 to_vca_ee_id = vca.get('ee_id')
2454 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2455 if from_vca_ee_id and to_vca_ee_id:
2456 # add relation
2457 await self.vca_map[vca_type].add_relation(
2458 ee_id_1=from_vca_ee_id,
2459 ee_id_2=to_vca_ee_id,
2460 endpoint_1=from_vca_endpoint,
2461 endpoint_2=to_vca_endpoint)
2462 # remove entry from relations list
2463 vnf_relations.remove(r)
2464 else:
2465 # check failed peers
2466 try:
2467 vca_status_list = db_nsr.get('configurationStatus')
2468 if vca_status_list:
2469 for i in range(len(vca_list)):
2470 vca = vca_list[i]
2471 vca_status = vca_status_list[i]
2472 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2473 if vca_status.get('status') == 'BROKEN':
2474 # peer broken: remove relation from list
2475 vnf_relations.remove(r)
2476 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2477 if vca_status.get('status') == 'BROKEN':
2478 # peer broken: remove relation from list
2479 vnf_relations.remove(r)
2480 except Exception:
2481 # ignore
2482 pass
2483
2484 # wait for next try
2485 await asyncio.sleep(5.0)
2486
2487 if not ns_relations and not vnf_relations:
2488 self.logger.debug('Relations added')
2489 break
2490
2491 return True
2492
2493 except Exception as e:
2494 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2495 return False
2496
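# --- Illustrative sketch (not part of the class): how _add_vca_relations above matches one
# descriptor relation against the deployed VCA list. It returns the two execution environments and
# endpoints once both peers have their configuration software installed, or None meanwhile.
# match_relation_endpoints is a hypothetical helper name, not an OSM API.


def match_relation_endpoints(relation, vca_list, key="member-vnf-index"):
    # relation: {"entities": [{"id": ..., "endpoint": ...}, {"id": ..., "endpoint": ...}]}
    ends = [None, None]
    for vca in vca_list:
        if not vca.get("config_sw_installed"):
            continue
        for i, entity in enumerate(relation["entities"]):
            if vca.get(key) == entity.get("id"):
                ends[i] = (vca.get("ee_id"), entity.get("endpoint"))
    if ends[0] and ends[1]:
        return ends[0][0], ends[0][1], ends[1][0], ends[1][1]
    return None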
2497 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2498 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
2499
2500 try:
2501 k8sclustertype = k8s_instance_info["k8scluster-type"]
2502 # Instantiate kdu
2503 db_dict_install = {"collection": "nsrs",
2504 "filter": {"_id": nsr_id},
2505 "path": nsr_db_path}
2506
2507 kdu_instance = await self.k8scluster_map[k8sclustertype].install(
2508 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2509 kdu_model=k8s_instance_info["kdu-model"],
2510 atomic=True,
2511 params=k8params,
2512 db_dict=db_dict_install,
2513 timeout=timeout,
2514 kdu_name=k8s_instance_info["kdu-name"],
2515 namespace=k8s_instance_info["namespace"])
2516 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2517
2518 # Obtain services to obtain management service ip
2519 services = await self.k8scluster_map[k8sclustertype].get_services(
2520 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2521 kdu_instance=kdu_instance,
2522 namespace=k8s_instance_info["namespace"])
2523
2524 # Obtain management service info (if exists)
2525 vnfr_update_dict = {}
2526 if services:
2527 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2528 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2529 for mgmt_service in mgmt_services:
2530 for service in services:
2531 if service["name"].startswith(mgmt_service["name"]):
2532 # Mgmt service found, Obtain service ip
2533 ip = service.get("external_ip", service.get("cluster_ip"))
2534 if isinstance(ip, list) and len(ip) == 1:
2535 ip = ip[0]
2536
2537 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2538
2539 # Check whether the mgmt ip at the vnf must also be updated
2540 service_external_cp = mgmt_service.get("external-connection-point-ref")
2541 if service_external_cp:
2542 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2543 vnfr_update_dict["ip-address"] = ip
2544
2545 break
2546 else:
2547 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2548
2549 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2550 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2551
2552 kdu_config = kdud.get("kdu-configuration")
2553 if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
2554 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2555 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2556
2557 for initial_config_primitive in initial_config_primitive_list:
2558 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2559
2560 await asyncio.wait_for(
2561 self.k8scluster_map[k8sclustertype].exec_primitive(
2562 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2563 kdu_instance=kdu_instance,
2564 primitive_name=initial_config_primitive["name"],
2565 params=primitive_params_, db_dict={}),
2566 timeout=timeout)
2567
2568 except Exception as e:
2569 # Prepare update db with error and raise exception
2570 try:
2571 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2572 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2573 except Exception:
2574 # ignore to keep original exception
2575 pass
2576 # reraise original error
2577 raise
2578
2579 return kdu_instance
2580
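# --- Illustrative sketch (not part of the class): how _install_kdu above derives a management IP
# from the services returned by the K8s connector (prefix match on the mgmt service name,
# "external_ip" preferred over "cluster_ip"). find_mgmt_ip is a hypothetical helper name.


def find_mgmt_ip(services, mgmt_service_name):
    for service in services:
        if service["name"].startswith(mgmt_service_name):
            ip = service.get("external_ip", service.get("cluster_ip"))
            if isinstance(ip, list) and len(ip) == 1:
                ip = ip[0]
            return ip
    return None

# usage sketch: find_mgmt_ip([{"name": "mgmt-svc", "external_ip": ["10.0.0.5"]}], "mgmt") -> "10.0.0.5"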
2581 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2582 # Launch kdus if present in the descriptor
2583
2584 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2585
2586 async def _get_cluster_id(cluster_id, cluster_type):
2587 nonlocal k8scluster_id_2_uuic
2588 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2589 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2590
2591 # check if the K8s cluster is still being created and wait for any previous related tasks in process
2592 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2593 if task_dependency:
2594 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2595 self.logger.debug(logging_text + text)
2596 await asyncio.wait(task_dependency, timeout=3600)
2597
2598 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2599 if not db_k8scluster:
2600 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2601
2602 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2603 if not k8s_id:
2604 if cluster_type == "helm-chart-v3":
2605 try:
2606 # backward compatibility for existing clusters that have not been initialized for helm v3
2607 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2608 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2609 reuse_cluster_uuid=cluster_id)
2610 db_k8scluster_update = {}
2611 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2612 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2613 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2614 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2615 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2616 except Exception as e:
2617 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2618 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2619 cluster_type))
2620 else:
2621 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2622 format(cluster_id, cluster_type))
2623 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2624 return k8s_id
2625
2626 logging_text += "Deploy kdus: "
2627 step = ""
2628 try:
2629 db_nsr_update = {"_admin.deployed.K8s": []}
2630 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2631
2632 index = 0
2633 updated_cluster_list = []
2634 updated_v3_cluster_list = []
2635
2636 for vnfr_data in db_vnfrs.values():
2637 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2638 # Step 0: Prepare and set parameters
2639 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2640 vnfd_id = vnfr_data.get('vnfd-id')
2641 kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
2642 namespace = kdur.get("k8s-namespace")
2643 if kdur.get("helm-chart"):
2644 kdumodel = kdur["helm-chart"]
2645 # Default version: helm3, if helm-version is v2 assign v2
2646 k8sclustertype = "helm-chart-v3"
2647 self.logger.debug("kdur: {}".format(kdur))
2648 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2649 k8sclustertype = "helm-chart"
2650 elif kdur.get("juju-bundle"):
2651 kdumodel = kdur["juju-bundle"]
2652 k8sclustertype = "juju-bundle"
2653 else:
2654 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2655 "juju-bundle. Maybe an old NBI version is running".
2656 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2657 # check whether kdumodel refers to a package file that exists
2658 try:
2659 storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
2660 if storage and storage.get('pkg-dir'): # may not be present if the vnfd has no artifacts
2661 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
2662 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2663 kdumodel)
2664 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2665 kdumodel = self.fs.path + filename
2666 except (asyncio.TimeoutError, asyncio.CancelledError):
2667 raise
2668 except Exception: # it is not a file
2669 pass
2670
2671 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2672 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2673 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2674
2675 # Synchronize repos
2676 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2677 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2678 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2679 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2680 if del_repo_list or added_repo_dict:
2681 if k8sclustertype == "helm-chart":
2682 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2683 updated = {'_admin.helm_charts_added.' +
2684 item: name for item, name in added_repo_dict.items()}
2685 updated_cluster_list.append(cluster_uuid)
2686 elif k8sclustertype == "helm-chart-v3":
2687 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2688 updated = {'_admin.helm_charts_v3_added.' +
2689 item: name for item, name in added_repo_dict.items()}
2690 updated_v3_cluster_list.append(cluster_uuid)
2691 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2692 "'{}' to_delete: {}, to_add: {}".
2693 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2694 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2695
2696 # Instantiate kdu
2697 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2698 kdur["kdu-name"], k8s_cluster_id)
2699 k8s_instance_info = {"kdu-instance": None,
2700 "k8scluster-uuid": cluster_uuid,
2701 "k8scluster-type": k8sclustertype,
2702 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2703 "kdu-name": kdur["kdu-name"],
2704 "kdu-model": kdumodel,
2705 "namespace": namespace}
2706 db_path = "_admin.deployed.K8s.{}".format(index)
2707 db_nsr_update[db_path] = k8s_instance_info
2708 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2709
2710 task = asyncio.ensure_future(
2711 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, db_vnfds[vnfd_id],
2712 k8s_instance_info, k8params=desc_params, timeout=600))
2713 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2714 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2715
2716 index += 1
2717
2718 except (LcmException, asyncio.CancelledError):
2719 raise
2720 except Exception as e:
2721 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2722 if isinstance(e, (N2VCException, DbException)):
2723 self.logger.error(logging_text + msg)
2724 else:
2725 self.logger.critical(logging_text + msg, exc_info=True)
2726 raise LcmException(msg)
2727 finally:
2728 if db_nsr_update:
2729 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2730
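# --- Illustrative sketch (not part of the class): the kdu-type selection done by deploy_kdus above.
# Helm charts default to the helm v3 connector unless helm-version is explicitly "v2";
# pick_k8s_cluster_type is a hypothetical helper name, not an OSM API.


def pick_k8s_cluster_type(kdur):
    if kdur.get("helm-chart"):
        return "helm-chart" if kdur.get("helm-version") == "v2" else "helm-chart-v3"
    if kdur.get("juju-bundle"):
        return "juju-bundle"
    raise ValueError("kdu is neither helm-chart nor juju-bundle")

# usage sketch:
# pick_k8s_cluster_type({"helm-chart": "stable/mychart"}) -> "helm-chart-v3"
# pick_k8s_cluster_type({"helm-chart": "stable/mychart", "helm-version": "v2"}) -> "helm-chart"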
2731 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2732 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2733 base_folder, task_instantiation_info, stage):
2734 # launch instantiate_N2VC in an asyncio task and register the task object
2735 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2736 # if not found, create one entry and update database
2737 # fill db_nsr._admin.deployed.VCA.<index>
2738
2739 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2740 if descriptor_config.get("juju"): # There is one execution envioronment of type juju
2741 ee_list = [descriptor_config]
2742 elif descriptor_config.get("execution-environment-list"):
2743 ee_list = descriptor_config.get("execution-environment-list")
2744 else: # other types such as script are not supported
2745 ee_list = []
2746
2747 for ee_item in ee_list:
2748 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2749 ee_item.get("helm-chart")))
2750 ee_descriptor_id = ee_item.get("id")
2751 if ee_item.get("juju"):
2752 vca_name = ee_item['juju'].get('charm')
2753 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2754 if ee_item['juju'].get('cloud') == "k8s":
2755 vca_type = "k8s_proxy_charm"
2756 elif ee_item['juju'].get('proxy') is False:
2757 vca_type = "native_charm"
2758 elif ee_item.get("helm-chart"):
2759 vca_name = ee_item['helm-chart']
2760 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
2761 vca_type = "helm"
2762 else:
2763 vca_type = "helm-v3"
2764 else:
2765 self.logger.debug(logging_text + "skipping non juju neither charm configuration")
2766 continue
2767
2768 vca_index = -1
2769 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2770 if not vca_deployed:
2771 continue
2772 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2773 vca_deployed.get("vdu_id") == vdu_id and \
2774 vca_deployed.get("kdu_name") == kdu_name and \
2775 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2776 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2777 break
2778 else:
2779 # not found, create one.
2780 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2781 if vdu_id:
2782 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2783 elif kdu_name:
2784 target += "/kdu/{}".format(kdu_name)
2785 vca_deployed = {
2786 "target_element": target,
2787 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2788 "member-vnf-index": member_vnf_index,
2789 "vdu_id": vdu_id,
2790 "kdu_name": kdu_name,
2791 "vdu_count_index": vdu_index,
2792 "operational-status": "init", # TODO revise
2793 "detailed-status": "", # TODO revise
2794 "step": "initial-deploy", # TODO revise
2795 "vnfd_id": vnfd_id,
2796 "vdu_name": vdu_name,
2797 "type": vca_type,
2798 "ee_descriptor_id": ee_descriptor_id
2799 }
2800 vca_index += 1
2801
2802 # create VCA and configurationStatus in db
2803 db_dict = {
2804 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2805 "configurationStatus.{}".format(vca_index): dict()
2806 }
2807 self.update_db_2("nsrs", nsr_id, db_dict)
2808
2809 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2810
2811 # Launch task
2812 task_n2vc = asyncio.ensure_future(
2813 self.instantiate_N2VC(
2814 logging_text=logging_text,
2815 vca_index=vca_index,
2816 nsi_id=nsi_id,
2817 db_nsr=db_nsr,
2818 db_vnfr=db_vnfr,
2819 vdu_id=vdu_id,
2820 kdu_name=kdu_name,
2821 vdu_index=vdu_index,
2822 deploy_params=deploy_params,
2823 config_descriptor=descriptor_config,
2824 base_folder=base_folder,
2825 nslcmop_id=nslcmop_id,
2826 stage=stage,
2827 vca_type=vca_type,
2828 vca_name=vca_name,
2829 ee_config_descriptor=ee_item
2830 )
2831 )
2832 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2833 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2834 member_vnf_index or "", vdu_id or "")
2835
2836 @staticmethod
2837 def _get_terminate_config_primitive(primitive_list, vca_deployed):
2838 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2839 it get only those primitives for this execution envirom"""
2840
2841 primitive_list = primitive_list or []
2842 # filter primitives by ee_descriptor_id
2843 ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
2844 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
2845
2846 if primitive_list:
2847 primitive_list.sort(key=lambda val: int(val['seq']))
2848
2849 return primitive_list
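# Illustrative input for _get_terminate_config_primitive (hypothetical descriptor values): with
# vca_deployed["ee_descriptor_id"] == "config-ee", a list such as
#   [{"seq": 2, "name": "cleanup", "execution-environment-ref": "config-ee"},
#    {"seq": 1, "name": "backup", "execution-environment-ref": "config-ee"},
#    {"seq": 1, "name": "other", "execution-environment-ref": "metrics-ee"}]
# is filtered to the two "config-ee" entries and returned sorted by seq: backup, then cleanup.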
2850
2851 @staticmethod
2852 def _create_nslcmop(nsr_id, operation, params):
2853 """
2854 Creates an nslcmop content to be stored in the database.
2855 :param nsr_id: internal id of the instance
2856 :param operation: instantiate, terminate, scale, action, ...
2857 :param params: user parameters for the operation
2858 :return: dictionary following SOL005 format
2859 """
2860 # Raise exception if invalid arguments
2861 if not (nsr_id and operation and params):
2862 raise LcmException(
2863 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2864 now = time()
2865 _id = str(uuid4())
2866 nslcmop = {
2867 "id": _id,
2868 "_id": _id,
2869 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2870 "operationState": "PROCESSING",
2871 "statusEnteredTime": now,
2872 "nsInstanceId": nsr_id,
2873 "lcmOperationType": operation,
2874 "startTime": now,
2875 "isAutomaticInvocation": False,
2876 "operationParams": params,
2877 "isCancelPending": False,
2878 "links": {
2879 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2880 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2881 }
2882 }
2883 return nslcmop
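# Illustrative result (hypothetical arguments): _create_nslcmop(nsr_id, "action", {"primitive": "touch"})
# returns a dict whose operationState is "PROCESSING" and whose "links" point at
# /osm/nslcm/v1/ns_lcm_op_occs/<newly generated uuid> and /osm/nslcm/v1/ns_instances/<nsr_id>.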
2884
2885 def _format_additional_params(self, params):
2886 params = params or {}
2887 for key, value in params.items():
2888 if str(value).startswith("!!yaml "):
2889 params[key] = yaml.safe_load(value[7:])
2890 return params
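# Example of the "!!yaml " convention handled above (illustrative values):
#   {"vld_config": "!!yaml {vim-network-name: mgmt}"} becomes {"vld_config": {"vim-network-name": "mgmt"}},
#   while plain values such as {"replicas": "3"} are returned unchanged.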
2891
2892 def _get_terminate_primitive_params(self, seq, vnf_index):
2893 primitive = seq.get('name')
2894 primitive_params = {}
2895 params = {
2896 "member_vnf_index": vnf_index,
2897 "primitive": primitive,
2898 "primitive_params": primitive_params,
2899 }
2900 desc_params = {}
2901 return self._map_primitive_params(seq, params, desc_params)
2902
2903 # sub-operations
2904
2905 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2906 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2907 if op.get('operationState') == 'COMPLETED':
2908 # b. Skip sub-operation
2909 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2910 return self.SUBOPERATION_STATUS_SKIP
2911 else:
2912 # c. retry executing sub-operation
2913 # The sub-operation exists, and operationState != 'COMPLETED'
2914 # Update operationState = 'PROCESSING' to indicate a retry.
2915 operationState = 'PROCESSING'
2916 detailed_status = 'In progress'
2917 self._update_suboperation_status(
2918 db_nslcmop, op_index, operationState, detailed_status)
2919 # Return the sub-operation index
2920 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2921 # with arguments extracted from the sub-operation
2922 return op_index
2923
2924 # Find a sub-operation where all keys in a matching dictionary must match
2925 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2926 def _find_suboperation(self, db_nslcmop, match):
2927 if db_nslcmop and match:
2928 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2929 for i, op in enumerate(op_list):
2930 if all(op.get(k) == match[k] for k in match):
2931 return i
2932 return self.SUBOPERATION_STATUS_NOT_FOUND
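# Illustrative match dictionary (hypothetical values): {"member_vnf_index": "1", "primitive": "touch",
# "lcmOperationType": "PRE-SCALE"} returns the index of the first sub-operation whose fields all match,
# or SUBOPERATION_STATUS_NOT_FOUND otherwise.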
2933
2934 # Update status for a sub-operation given its index
2935 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2936 # Update DB for HA tasks
2937 q_filter = {'_id': db_nslcmop['_id']}
2938 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2939 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2940 self.db.set_one("nslcmops",
2941 q_filter=q_filter,
2942 update_dict=update_dict,
2943 fail_on_empty=False)
2944
2945 # Add sub-operation, return the index of the added sub-operation
2946 # Optionally, set operationState, detailed-status, and operationType
2947 # Status and type are currently set for 'scale' sub-operations:
2948 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2949 # 'detailed-status' : status message
2950 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2951 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2952 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2953 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2954 RO_nsr_id=None, RO_scaling_info=None):
2955 if not db_nslcmop:
2956 return self.SUBOPERATION_STATUS_NOT_FOUND
2957 # Get the "_admin.operations" list, if it exists
2958 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2959 op_list = db_nslcmop_admin.get('operations')
2960 # Create or append to the "_admin.operations" list
2961 new_op = {'member_vnf_index': vnf_index,
2962 'vdu_id': vdu_id,
2963 'vdu_count_index': vdu_count_index,
2964 'primitive': primitive,
2965 'primitive_params': mapped_primitive_params}
2966 if operationState:
2967 new_op['operationState'] = operationState
2968 if detailed_status:
2969 new_op['detailed-status'] = detailed_status
2970 if operationType:
2971 new_op['lcmOperationType'] = operationType
2972 if RO_nsr_id:
2973 new_op['RO_nsr_id'] = RO_nsr_id
2974 if RO_scaling_info:
2975 new_op['RO_scaling_info'] = RO_scaling_info
2976 if not op_list:
2977 # No existing operations, create key 'operations' with current operation as first list element
2978 db_nslcmop_admin.update({'operations': [new_op]})
2979 op_list = db_nslcmop_admin.get('operations')
2980 else:
2981 # Existing operations, append operation to list
2982 op_list.append(new_op)
2983
2984 db_nslcmop_update = {'_admin.operations': op_list}
2985 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2986 op_index = len(op_list) - 1
2987 return op_index
2988
2989 # Helper methods for scale() sub-operations
2990
2991 # pre-scale/post-scale:
2992 # Check for 3 different cases:
2993 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2994 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2995 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2996 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2997 operationType, RO_nsr_id=None, RO_scaling_info=None):
2998 # Find this sub-operation
2999 if RO_nsr_id and RO_scaling_info:
3000 operationType = 'SCALE-RO'
3001 match = {
3002 'member_vnf_index': vnf_index,
3003 'RO_nsr_id': RO_nsr_id,
3004 'RO_scaling_info': RO_scaling_info,
3005 }
3006 else:
3007 match = {
3008 'member_vnf_index': vnf_index,
3009 'primitive': vnf_config_primitive,
3010 'primitive_params': primitive_params,
3011 'lcmOperationType': operationType
3012 }
3013 op_index = self._find_suboperation(db_nslcmop, match)
3014 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3015 # a. New sub-operation
3016 # The sub-operation does not exist, add it.
3017 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3018 # The following parameters are set to None for all kinds of scaling:
3019 vdu_id = None
3020 vdu_count_index = None
3021 vdu_name = None
3022 if RO_nsr_id and RO_scaling_info:
3023 vnf_config_primitive = None
3024 primitive_params = None
3025 else:
3026 RO_nsr_id = None
3027 RO_scaling_info = None
3028 # Initial status for sub-operation
3029 operationState = 'PROCESSING'
3030 detailed_status = 'In progress'
3031 # Add sub-operation for pre/post-scaling (zero or more operations)
3032 self._add_suboperation(db_nslcmop,
3033 vnf_index,
3034 vdu_id,
3035 vdu_count_index,
3036 vdu_name,
3037 vnf_config_primitive,
3038 primitive_params,
3039 operationState,
3040 detailed_status,
3041 operationType,
3042 RO_nsr_id,
3043 RO_scaling_info)
3044 return self.SUBOPERATION_STATUS_NEW
3045 else:
3046 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3047 # or op_index (operationState != 'COMPLETED')
3048 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
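# Illustrative call (hypothetical arguments):
#   op_index = self._check_or_add_scale_suboperation(db_nslcmop, "1", "touch", {"file": "/tmp/x"}, 'PRE-SCALE')
# SUBOPERATION_STATUS_NEW means execute normally, SUBOPERATION_STATUS_SKIP means it already COMPLETED,
# and any other value is the index of the registered sub-operation to retry.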
3049
3050 # Function to return execution_environment id
3051
3052 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3053 # TODO vdu_index_count
3054 for vca in vca_deployed_list:
3055 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3056 return vca["ee_id"]
3057
3058 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
3059 vca_index, destroy_ee=True, exec_primitives=True):
3060 """
3061 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3062 :param logging_text:
3063 :param db_nslcmop:
3064 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3065 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3066 :param vca_index: index in the database _admin.deployed.VCA
3067 :param destroy_ee: False to skip destroying it here, because all execution environments will be destroyed at once later
3068 :param exec_primitives: False to skip executing terminate primitives, because the configuration was not completed
3069 or did not execute properly
3070 :return: None or exception
3071 """
3072
3073 self.logger.debug(
3074 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3075 vca_index, vca_deployed, config_descriptor, destroy_ee
3076 )
3077 )
3078
3079 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3080
3081 # execute terminate_primitives
3082 if exec_primitives:
3083 terminate_primitives = self._get_terminate_config_primitive(
3084 config_descriptor.get("terminate-config-primitive"), vca_deployed)
3085 vdu_id = vca_deployed.get("vdu_id")
3086 vdu_count_index = vca_deployed.get("vdu_count_index")
3087 vdu_name = vca_deployed.get("vdu_name")
3088 vnf_index = vca_deployed.get("member-vnf-index")
3089 if terminate_primitives and vca_deployed.get("needed_terminate"):
3090 for seq in terminate_primitives:
3091 # For each sequence in list, get primitive and call _ns_execute_primitive()
3092 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3093 vnf_index, seq.get("name"))
3094 self.logger.debug(logging_text + step)
3095 # Create the primitive for each sequence, i.e. "primitive": "touch"
3096 primitive = seq.get('name')
3097 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
3098
3099 # Add sub-operation
3100 self._add_suboperation(db_nslcmop,
3101 vnf_index,
3102 vdu_id,
3103 vdu_count_index,
3104 vdu_name,
3105 primitive,
3106 mapped_primitive_params)
3107 # Sub-operations: Call _ns_execute_primitive() instead of action()
3108 try:
3109 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
3110 mapped_primitive_params,
3111 vca_type=vca_type)
3112 except LcmException:
3113 # this happens when the VCA is not deployed. In that case there is nothing to terminate
3114 continue
3115 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
3116 if result not in result_ok:
3117 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
3118 "error {}".format(seq.get("name"), vnf_index, result_detail))
3119 # mark that this VCA no longer needs to be terminated
3120 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
3121 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
3122
3123 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3124 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3125
3126 if destroy_ee:
3127 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
3128
3129 async def _delete_all_N2VC(self, db_nsr: dict):
3130 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
3131 namespace = "." + db_nsr["_id"]
3132 try:
3133 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
3134 except N2VCNotFound: # already deleted. Skip
3135 pass
3136 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
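# The namespace passed above is "." + the ns instance _id (e.g. ".a1b2c3", illustrative), so all execution
# environments created under that ns are removed in a single delete_namespace call.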
3137
3138 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
3139 """
3140 Terminates a deployment from RO
3141 :param logging_text:
3142 :param nsr_deployed: db_nsr._admin.deployed
3143 :param nsr_id:
3144 :param nslcmop_id:
3145 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3146 this method updates only index 2 of the list, but it writes the concatenated content of the whole list to the database
3147 :return:
3148 """
3149 db_nsr_update = {}
3150 failed_detail = []
3151 ro_nsr_id = ro_delete_action = None
3152 if nsr_deployed and nsr_deployed.get("RO"):
3153 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3154 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3155 try:
3156 if ro_nsr_id:
3157 stage[2] = "Deleting ns from VIM."
3158 db_nsr_update["detailed-status"] = " ".join(stage)
3159 self._write_op_status(nslcmop_id, stage)
3160 self.logger.debug(logging_text + stage[2])
3161 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3162 self._write_op_status(nslcmop_id, stage)
3163 desc = await self.RO.delete("ns", ro_nsr_id)
3164 ro_delete_action = desc["action_id"]
3165 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
3166 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3167 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3168 if ro_delete_action:
3169 # wait until NS is deleted from VIM
3170 stage[2] = "Waiting ns deleted from VIM."
3171 detailed_status_old = None
3172 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
3173 ro_delete_action))
3174 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3175 self._write_op_status(nslcmop_id, stage)
3176
3177 delete_timeout = 20 * 60 # 20 minutes
3178 while delete_timeout > 0:
3179 desc = await self.RO.show(
3180 "ns",
3181 item_id_name=ro_nsr_id,
3182 extra_item="action",
3183 extra_item_id=ro_delete_action)
3184
3185 # deploymentStatus
3186 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3187
3188 ns_status, ns_status_info = self.RO.check_action_status(desc)
3189 if ns_status == "ERROR":
3190 raise ROclient.ROClientException(ns_status_info)
3191 elif ns_status == "BUILD":
3192 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3193 elif ns_status == "ACTIVE":
3194 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3195 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3196 break
3197 else:
3198 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3199 if stage[2] != detailed_status_old:
3200 detailed_status_old = stage[2]
3201 db_nsr_update["detailed-status"] = " ".join(stage)
3202 self._write_op_status(nslcmop_id, stage)
3203 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3204 await asyncio.sleep(5, loop=self.loop)
3205 delete_timeout -= 5
3206 else: # delete_timeout <= 0:
3207 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
3208
3209 except Exception as e:
3210 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3211 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3212 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3213 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3214 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3215 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
3216 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3217 failed_detail.append("delete conflict: {}".format(e))
3218 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
3219 else:
3220 failed_detail.append("delete error: {}".format(e))
3221 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
3222
3223 # Delete nsd
3224 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3225 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3226 try:
3227 stage[2] = "Deleting nsd from RO."
3228 db_nsr_update["detailed-status"] = " ".join(stage)
3229 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3230 self._write_op_status(nslcmop_id, stage)
3231 await self.RO.delete("nsd", ro_nsd_id)
3232 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3233 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3234 except Exception as e:
3235 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3236 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3237 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3238 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3239 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3240 self.logger.debug(logging_text + failed_detail[-1])
3241 else:
3242 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3243 self.logger.error(logging_text + failed_detail[-1])
3244
3245 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3246 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3247 if not vnf_deployed or not vnf_deployed["id"]:
3248 continue
3249 try:
3250 ro_vnfd_id = vnf_deployed["id"]
3251 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3252 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3253 db_nsr_update["detailed-status"] = " ".join(stage)
3254 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3255 self._write_op_status(nslcmop_id, stage)
3256 await self.RO.delete("vnfd", ro_vnfd_id)
3257 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3258 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3259 except Exception as e:
3260 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3261 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3262 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3263 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3264 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3265 self.logger.debug(logging_text + failed_detail[-1])
3266 else:
3267 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3268 self.logger.error(logging_text + failed_detail[-1])
3269
3270 if failed_detail:
3271 stage[2] = "Error deleting from VIM"
3272 else:
3273 stage[2] = "Deleted from VIM"
3274 db_nsr_update["detailed-status"] = " ".join(stage)
3275 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3276 self._write_op_status(nslcmop_id, stage)
3277
3278 if failed_detail:
3279 raise LcmException("; ".join(failed_detail))
3280
3281 async def terminate(self, nsr_id, nslcmop_id):
3282 # Try to lock HA task here
3283 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3284 if not task_is_locked_by_me:
3285 return
3286
3287 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3288 self.logger.debug(logging_text + "Enter")
3289 timeout_ns_terminate = self.timeout_ns_terminate
3290 db_nsr = None
3291 db_nslcmop = None
3292 operation_params = None
3293 exc = None
3294 error_list = [] # annotates all failed error messages
3295 db_nslcmop_update = {}
3296 autoremove = False # autoremove after terminated
3297 tasks_dict_info = {}
3298 db_nsr_update = {}
3299 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3300 # ^ contains [stage, step, VIM-status]
3301 try:
3302 # wait for any previous tasks in process
3303 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3304
3305 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3306 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3307 operation_params = db_nslcmop.get("operationParams") or {}
3308 if operation_params.get("timeout_ns_terminate"):
3309 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3310 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3311 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3312
3313 db_nsr_update["operational-status"] = "terminating"
3314 db_nsr_update["config-status"] = "terminating"
3315 self._write_ns_status(
3316 nsr_id=nsr_id,
3317 ns_state="TERMINATING",
3318 current_operation="TERMINATING",
3319 current_operation_id=nslcmop_id,
3320 other_update=db_nsr_update
3321 )
3322 self._write_op_status(
3323 op_id=nslcmop_id,
3324 queuePosition=0,
3325 stage=stage
3326 )
3327 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3328 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3329 return
3330
3331 stage[1] = "Getting vnf descriptors from db."
3332 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3333 db_vnfds_from_id = {}
3334 db_vnfds_from_member_index = {}
3335 # Loop over VNFRs
3336 for vnfr in db_vnfrs_list:
3337 vnfd_id = vnfr["vnfd-id"]
3338 if vnfd_id not in db_vnfds_from_id:
3339 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3340 db_vnfds_from_id[vnfd_id] = vnfd
3341 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3342
3343 # Destroy individual execution environments when there are terminate primitives.
3344 # The rest of the execution environments will be deleted at once
3345 # TODO - check before calling _destroy_N2VC
3346 # if not operation_params.get("skip_terminate_primitives"):#
3347 # or not vca.get("needed_terminate"):
3348 stage[0] = "Stage 2/3 execute terminating primitives."
3349 self.logger.debug(logging_text + stage[0])
3350 stage[1] = "Looking execution environment that needs terminate."
3351 self.logger.debug(logging_text + stage[1])
3352 # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3353 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3354 config_descriptor = None
3355 if not vca or not vca.get("ee_id"):
3356 continue
3357 if not vca.get("member-vnf-index"):
3358 # ns
3359 config_descriptor = db_nsr.get("ns-configuration")
3360 elif vca.get("vdu_id"):
3361 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3362 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
3363 if vdud:
3364 config_descriptor = vdud.get("vdu-configuration")
3365 elif vca.get("kdu_name"):
3366 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3367 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
3368 if kdud:
3369 config_descriptor = kdud.get("kdu-configuration")
3370 else:
3371 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
3372 vca_type = vca.get("type")
3373 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3374 vca.get("needed_terminate"))
3375 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3376 # pending native charms
3377 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
3378 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3379 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3380 task = asyncio.ensure_future(
3381 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3382 destroy_ee, exec_terminate_primitives))
3383 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3384
3385 # wait for pending tasks of terminate primitives
3386 if tasks_dict_info:
3387 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3388 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3389 min(self.timeout_charm_delete, timeout_ns_terminate),
3390 stage, nslcmop_id)
3391 tasks_dict_info.clear()
3392 if error_list:
3393 return # raise LcmException("; ".join(error_list))
3394
3395 # remove all execution environments at once
3396 stage[0] = "Stage 3/3 delete all."
3397
3398 if nsr_deployed.get("VCA"):
3399 stage[1] = "Deleting all execution environments."
3400 self.logger.debug(logging_text + stage[1])
3401 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3402 timeout=self.timeout_charm_delete))
3403 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3404 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3405
3406 # Delete from k8scluster
3407 stage[1] = "Deleting KDUs."
3408 self.logger.debug(logging_text + stage[1])
3409 # print(nsr_deployed)
3410 for kdu in get_iterable(nsr_deployed, "K8s"):
3411 if not kdu or not kdu.get("kdu-instance"):
3412 continue
3413 kdu_instance = kdu.get("kdu-instance")
3414 if kdu.get("k8scluster-type") in self.k8scluster_map:
3415 task_delete_kdu_instance = asyncio.ensure_future(
3416 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3417 cluster_uuid=kdu.get("k8scluster-uuid"),
3418 kdu_instance=kdu_instance))
3419 else:
3420 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3421 format(kdu.get("k8scluster-type")))
3422 continue
3423 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3424
3425 # remove from RO
3426 stage[1] = "Deleting ns from VIM."
3427 if self.ng_ro:
3428 task_delete_ro = asyncio.ensure_future(
3429 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3430 else:
3431 task_delete_ro = asyncio.ensure_future(
3432 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3433 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3434
3435 # the rest of the work will be done in the finally block
3436
3437 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3438 self.logger.error(logging_text + "Exit Exception {}".format(e))
3439 exc = e
3440 except asyncio.CancelledError:
3441 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3442 exc = "Operation was cancelled"
3443 except Exception as e:
3444 exc = traceback.format_exc()
3445 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3446 finally:
3447 if exc:
3448 error_list.append(str(exc))
3449 try:
3450 # wait for pending tasks
3451 if tasks_dict_info:
3452 stage[1] = "Waiting for terminate pending tasks."
3453 self.logger.debug(logging_text + stage[1])
3454 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3455 stage, nslcmop_id)
3456 stage[1] = stage[2] = ""
3457 except asyncio.CancelledError:
3458 error_list.append("Cancelled")
3459 # TODO cancel all tasks
3460 except Exception as exc:
3461 error_list.append(str(exc))
3462 # update status at database
3463 if error_list:
3464 error_detail = "; ".join(error_list)
3465 # self.logger.error(logging_text + error_detail)
3466 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3467 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3468
3469 db_nsr_update["operational-status"] = "failed"
3470 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3471 db_nslcmop_update["detailed-status"] = error_detail
3472 nslcmop_operation_state = "FAILED"
3473 ns_state = "BROKEN"
3474 else:
3475 error_detail = None
3476 error_description_nsr = error_description_nslcmop = None
3477 ns_state = "NOT_INSTANTIATED"
3478 db_nsr_update["operational-status"] = "terminated"
3479 db_nsr_update["detailed-status"] = "Done"
3480 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3481 db_nslcmop_update["detailed-status"] = "Done"
3482 nslcmop_operation_state = "COMPLETED"
3483
3484 if db_nsr:
3485 self._write_ns_status(
3486 nsr_id=nsr_id,
3487 ns_state=ns_state,
3488 current_operation="IDLE",
3489 current_operation_id=None,
3490 error_description=error_description_nsr,
3491 error_detail=error_detail,
3492 other_update=db_nsr_update
3493 )
3494 self._write_op_status(
3495 op_id=nslcmop_id,
3496 stage="",
3497 error_message=error_description_nslcmop,
3498 operation_state=nslcmop_operation_state,
3499 other_update=db_nslcmop_update,
3500 )
3501 if ns_state == "NOT_INSTANTIATED":
3502 try:
3503 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3504 except DbException as e:
3505 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3506 format(nsr_id, e))
3507 if operation_params:
3508 autoremove = operation_params.get("autoremove", False)
3509 if nslcmop_operation_state:
3510 try:
3511 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3512 "operationState": nslcmop_operation_state,
3513 "autoremove": autoremove},
3514 loop=self.loop)
3515 except Exception as e:
3516 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3517
3518 self.logger.debug(logging_text + "Exit")
3519 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3520
3521 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3522 time_start = time()
3523 error_detail_list = []
3524 error_list = []
3525 pending_tasks = list(created_tasks_info.keys())
3526 num_tasks = len(pending_tasks)
3527 num_done = 0
3528 stage[1] = "{}/{}.".format(num_done, num_tasks)
3529 self._write_op_status(nslcmop_id, stage)
3530 while pending_tasks:
3531 new_error = None
3532 _timeout = timeout + time_start - time()
3533 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3534 return_when=asyncio.FIRST_COMPLETED)
3535 num_done += len(done)
3536 if not done: # Timeout
3537 for task in pending_tasks:
3538 new_error = created_tasks_info[task] + ": Timeout"
3539 error_detail_list.append(new_error)
3540 error_list.append(new_error)
3541 break
3542 for task in done:
3543 if task.cancelled():
3544 exc = "Cancelled"
3545 else:
3546 exc = task.exception()
3547 if exc:
3548 if isinstance(exc, asyncio.TimeoutError):
3549 exc = "Timeout"
3550 new_error = created_tasks_info[task] + ": {}".format(exc)
3551 error_list.append(created_tasks_info[task])
3552 error_detail_list.append(new_error)
3553 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3554 K8sException)):
3555 self.logger.error(logging_text + new_error)
3556 else:
3557 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3558 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
3559 else:
3560 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3561 stage[1] = "{}/{}.".format(num_done, num_tasks)
3562 if new_error:
3563 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3564 if nsr_id: # update also nsr
3565 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3566 "errorDetail": ". ".join(error_detail_list)})
3567 self._write_op_status(nslcmop_id, stage)
3568 return error_detail_list
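# Illustrative progress written to stage[1] by this helper: "2/5." while tasks complete, extended with
# " Errors: <detail>. <detail>." when any task fails or times out.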
3569
3570 @staticmethod
3571 def _map_primitive_params(primitive_desc, params, instantiation_params):
3572 """
3573 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
3574 parameter, the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
3575 :param primitive_desc: portion of VNFD/NSD that describes primitive
3576 :param params: Params provided by user
3577 :param instantiation_params: Instantiation params provided by user
3578 :return: a dictionary with the calculated params
3579 """
3580 calculated_params = {}
3581 for parameter in primitive_desc.get("parameter", ()):
3582 param_name = parameter["name"]
3583 if param_name in params:
3584 calculated_params[param_name] = params[param_name]
3585 elif "default-value" in parameter or "value" in parameter:
3586 if "value" in parameter:
3587 calculated_params[param_name] = parameter["value"]
3588 else:
3589 calculated_params[param_name] = parameter["default-value"]
3590 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3591 and calculated_params[param_name].endswith(">"):
3592 if calculated_params[param_name][1:-1] in instantiation_params:
3593 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3594 else:
3595 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3596 format(calculated_params[param_name], primitive_desc["name"]))
3597 else:
3598 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3599 format(param_name, primitive_desc["name"]))
3600
3601 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3602 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3603 width=256)
3604 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3605 calculated_params[param_name] = calculated_params[param_name][7:]
3606 if parameter.get("data-type") == "INTEGER":
3607 try:
3608 calculated_params[param_name] = int(calculated_params[param_name])
3609 except ValueError: # error converting string to int
3610 raise LcmException(
3611 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3612 elif parameter.get("data-type") == "BOOLEAN":
3613 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3614
3615 # always add ns_config_info if the primitive name is config
3616 if primitive_desc["name"] == "config":
3617 if "ns_config_info" in instantiation_params:
3618 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3619 return calculated_params
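# Worked example for _map_primitive_params (hypothetical descriptor values):
#   primitive_desc = {"name": "touch", "parameter": [{"name": "filename", "default-value": "<touch_filename>"}]}
#   params = {}, instantiation_params = {"touch_filename": "/home/ubuntu/first-touch"}
# returns {"filename": "/home/ubuntu/first-touch"}, because the <...> default is resolved against
# instantiation_params; dict/list values would be dumped as single-line yaml strings.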
3620
3621 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3622 ee_descriptor_id=None):
3623 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
3624 for vca in deployed_vca:
3625 if not vca:
3626 continue
3627 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3628 continue
3629 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3630 continue
3631 if kdu_name and kdu_name != vca["kdu_name"]:
3632 continue
3633 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3634 continue
3635 break
3636 else:
3637 # vca_deployed not found
3638 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3639 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3640 ee_descriptor_id))
3641
3642 # get ee_id
3643 ee_id = vca.get("ee_id")
3644 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3645 if not ee_id:
3646 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3647 "execution environment"
3648 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3649 return ee_id, vca_type
3650
3651 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3652 retries_interval=30, timeout=None,
3653 vca_type=None, db_dict=None) -> (str, str):
3654 try:
3655 if primitive == "config":
3656 primitive_params = {"params": primitive_params}
3657
3658 vca_type = vca_type or "lxc_proxy_charm"
3659
3660 while retries >= 0:
3661 try:
3662 output = await asyncio.wait_for(
3663 self.vca_map[vca_type].exec_primitive(
3664 ee_id=ee_id,
3665 primitive_name=primitive,
3666 params_dict=primitive_params,
3667 progress_timeout=self.timeout_progress_primitive,
3668 total_timeout=self.timeout_primitive,
3669 db_dict=db_dict),
3670 timeout=timeout or self.timeout_primitive)
3671 # execution was OK
3672 break
3673 except asyncio.CancelledError:
3674 raise
3675 except Exception as e: # asyncio.TimeoutError
3676 if isinstance(e, asyncio.TimeoutError):
3677 e = "Timeout"
3678 retries -= 1
3679 if retries >= 0:
3680 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3681 # wait and retry
3682 await asyncio.sleep(retries_interval, loop=self.loop)
3683 else:
3684 return 'FAILED', str(e)
3685
3686 return 'COMPLETED', output
3687
3688 except (LcmException, asyncio.CancelledError):
3689 raise
3690 except Exception as e:
3691 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
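# Illustrative usage (hypothetical values): await self._ns_execute_primitive(ee_id, "touch",
# {"filename": "/tmp/x"}, retries=3, retries_interval=10, vca_type="lxc_proxy_charm") retries the action up
# to 3 extra times and returns a ('COMPLETED', output) or ('FAILED', error_message) tuple.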
3692
3693 async def action(self, nsr_id, nslcmop_id):
3694
3695 # Try to lock HA task here
3696 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3697 if not task_is_locked_by_me:
3698 return
3699
3700 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3701 self.logger.debug(logging_text + "Enter")
3702 # get all needed from database
3703 db_nsr = None
3704 db_nslcmop = None
3705 db_nsr_update = {}
3706 db_nslcmop_update = {}
3707 nslcmop_operation_state = None
3708 error_description_nslcmop = None
3709 exc = None
3710 try:
3711 # wait for any previous tasks in process
3712 step = "Waiting for previous operations to terminate"
3713 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3714
3715 self._write_ns_status(
3716 nsr_id=nsr_id,
3717 ns_state=None,
3718 current_operation="RUNNING ACTION",
3719 current_operation_id=nslcmop_id
3720 )
3721
3722 step = "Getting information from database"
3723 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3724 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3725
3726 nsr_deployed = db_nsr["_admin"].get("deployed")
3727 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3728 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3729 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3730 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3731 primitive = db_nslcmop["operationParams"]["primitive"]
3732 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3733 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3734
3735 if vnf_index:
3736 step = "Getting vnfr from database"
3737 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3738 step = "Getting vnfd from database"
3739 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3740 else:
3741 step = "Getting nsd from database"
3742 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3743
3744 # for backward compatibility
3745 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3746 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3747 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3748 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3749
3750 # look for primitive
3751 config_primitive_desc = descriptor_configuration = None
3752 if vdu_id:
3753 for vdu in get_iterable(db_vnfd, "vdu"):
3754 if vdu_id == vdu["id"]:
3755 descriptor_configuration = vdu.get("vdu-configuration")
3756 break
3757 elif kdu_name:
3758 for kdu in get_iterable(db_vnfd, "kdu"):
3759 if kdu_name == kdu["name"]:
3760 descriptor_configuration = kdu.get("kdu-configuration")
3761 break
3762 elif vnf_index:
3763 descriptor_configuration = db_vnfd.get("vnf-configuration")
3764 else:
3765 descriptor_configuration = db_nsd.get("ns-configuration")
3766
3767 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3768 for config_primitive in descriptor_configuration["config-primitive"]:
3769 if config_primitive["name"] == primitive:
3770 config_primitive_desc = config_primitive
3771 break
3772
3773 if not config_primitive_desc:
3774 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3775 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3776 format(primitive))
3777 primitive_name = primitive
3778 ee_descriptor_id = None
3779 else:
3780 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3781 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3782
3783 if vnf_index:
3784 if vdu_id:
3785 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3786 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3787 elif kdu_name:
3788 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3789 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3790 else:
3791 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3792 else:
3793 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3794
3795 if kdu_name:
3796 kdu_action = not deep_get(kdu, ("kdu-configuration", "juju"))
3797
3798 # TODO check if ns is in a proper status
3799 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3800 # kdur and desc_params already set from before
3801 if primitive_params:
3802 desc_params.update(primitive_params)
3803 # TODO Check if we will need something at vnf level
3804 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3805 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3806 break
3807 else:
3808 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3809
3810 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3811 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3812 raise LcmException(msg)
3813
3814 db_dict = {"collection": "nsrs",
3815 "filter": {"_id": nsr_id},
3816 "path": "_admin.deployed.K8s.{}".format(index)}
3817 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3818 step = "Executing kdu {}".format(primitive_name)
3819 if primitive_name == "upgrade":
3820 if desc_params.get("kdu_model"):
3821 kdu_model = desc_params.get("kdu_model")
3822 del desc_params["kdu_model"]
3823 else:
3824 kdu_model = kdu.get("kdu-model")
3825 parts = kdu_model.split(sep=":")
3826 if len(parts) == 2:
3827 kdu_model = parts[0]
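# e.g. an illustrative kdu-model "stable/openldap:1.2.3" becomes "stable/openldap"; the version after ':' is dropped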
3828
3829 detailed_status = await asyncio.wait_for(
3830 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3831 cluster_uuid=kdu.get("k8scluster-uuid"),
3832 kdu_instance=kdu.get("kdu-instance"),
3833 atomic=True, kdu_model=kdu_model,
3834 params=desc_params, db_dict=db_dict,
3835 timeout=timeout_ns_action),
3836 timeout=timeout_ns_action + 10)
3837 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3838 elif primitive_name == "rollback":
3839 detailed_status = await asyncio.wait_for(
3840 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3841 cluster_uuid=kdu.get("k8scluster-uuid"),
3842 kdu_instance=kdu.get("kdu-instance"),
3843 db_dict=db_dict),
3844 timeout=timeout_ns_action)
3845 elif primitive_name == "status":
3846 detailed_status = await asyncio.wait_for(
3847 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3848 cluster_uuid=kdu.get("k8scluster-uuid"),
3849 kdu_instance=kdu.get("kdu-instance")),
3850 timeout=timeout_ns_action)
3851 else:
3852 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3853 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3854
3855 detailed_status = await asyncio.wait_for(
3856 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3857 cluster_uuid=kdu.get("k8scluster-uuid"),
3858 kdu_instance=kdu_instance,
3859 primitive_name=primitive_name,
3860 params=params, db_dict=db_dict,
3861 timeout=timeout_ns_action),
3862 timeout=timeout_ns_action)
3863
3864 if detailed_status:
3865 nslcmop_operation_state = 'COMPLETED'
3866 else:
3867 detailed_status = ''
3868 nslcmop_operation_state = 'FAILED'
3869 else:
3870 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3871 member_vnf_index=vnf_index,
3872 vdu_id=vdu_id,
3873 vdu_count_index=vdu_count_index,
3874 ee_descriptor_id=ee_descriptor_id)
3875 db_nslcmop_notif = {"collection": "nslcmops",
3876 "filter": {"_id": nslcmop_id},
3877 "path": "admin.VCA"}
3878 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3879 ee_id,
3880 primitive=primitive_name,
3881 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3882 timeout=timeout_ns_action,
3883 vca_type=vca_type,
3884 db_dict=db_nslcmop_notif)
3885
3886 db_nslcmop_update["detailed-status"] = detailed_status
3887 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3888 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3889 detailed_status))
3890 return # database update is called inside finally
3891
3892 except (DbException, LcmException, N2VCException, K8sException) as e:
3893 self.logger.error(logging_text + "Exit Exception {}".format(e))
3894 exc = e
3895 except asyncio.CancelledError:
3896 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3897 exc = "Operation was cancelled"
3898 except asyncio.TimeoutError:
3899 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3900 exc = "Timeout"
3901 except Exception as e:
3902 exc = traceback.format_exc()
3903 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3904 finally:
3905 if exc:
3906 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3907 "FAILED {}: {}".format(step, exc)
3908 nslcmop_operation_state = "FAILED"
3909 if db_nsr:
3910 self._write_ns_status(
3911 nsr_id=nsr_id,
3912 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3913 current_operation="IDLE",
3914 current_operation_id=None,
3915 # error_description=error_description_nsr,
3916 # error_detail=error_detail,
3917 other_update=db_nsr_update
3918 )
3919
3920 self._write_op_status(
3921 op_id=nslcmop_id,
3922 stage="",
3923 error_message=error_description_nslcmop,
3924 operation_state=nslcmop_operation_state,
3925 other_update=db_nslcmop_update,
3926 )
3927
3928 if nslcmop_operation_state:
3929 try:
3930 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3931 "operationState": nslcmop_operation_state},
3932 loop=self.loop)
3933 except Exception as e:
3934 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3935 self.logger.debug(logging_text + "Exit")
3936 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3937 return nslcmop_operation_state, detailed_status
3938
3939 async def scale(self, nsr_id, nslcmop_id):
3940
3941 # Try to lock HA task here
3942 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3943 if not task_is_locked_by_me:
3944 return
3945
3946 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3947 self.logger.debug(logging_text + "Enter")
3948 # get all needed from database
3949 db_nsr = None
3950 db_nslcmop = None
3951 db_nslcmop_update = {}
3952 nslcmop_operation_state = None
3953 db_nsr_update = {}
3954 exc = None
3955 # in case of error, indicates which part of the scale failed, so the nsr can be put in error status
3956 scale_process = None
3957 old_operational_status = ""
3958 old_config_status = ""
3959 try:
3960 # wait for any previous tasks in process
3961 step = "Waiting for previous operations to terminate"
3962 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3963
3964 self._write_ns_status(
3965 nsr_id=nsr_id,
3966 ns_state=None,
3967 current_operation="SCALING",
3968 current_operation_id=nslcmop_id
3969 )
3970
3971 step = "Getting nslcmop from database"
3972 self.logger.debug(step + " after having waited for previous tasks to be completed")
3973 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3974 step = "Getting nsr from database"
3975 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3976
3977 old_operational_status = db_nsr["operational-status"]
3978 old_config_status = db_nsr["config-status"]
3979 step = "Parsing scaling parameters"
3980 # self.logger.debug(step)
3981 db_nsr_update["operational-status"] = "scaling"
3982 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3983 nsr_deployed = db_nsr["_admin"].get("deployed")
3984
3985 #######
3986 nsr_deployed = db_nsr["_admin"].get("deployed")
3987 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3988 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3989 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3990 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3991 #######
3992
3993 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3994 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3995 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3996 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3997 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3998
3999 # for backward compatibility
4000 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4001 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4002 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4003 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4004
4005 step = "Getting vnfr from database"
4006 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
4007 step = "Getting vnfd from database"
4008 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4009
4010 step = "Getting scaling-group-descriptor"
4011 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
4012 if scaling_descriptor["name"] == scaling_group:
4013 break
4014 else:
4015 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
4016 "at vnfd:scaling-group-descriptor".format(scaling_group))
4017
4018 # cooldown_time = 0
4019 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
4020 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
4021 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
4022 # break
4023
4024 # TODO check if ns is in a proper status
4025 step = "Sending scale order to VIM"
4026 nb_scale_op = 0
4027 if not db_nsr["_admin"].get("scaling-group"):
4028 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
4029 admin_scale_index = 0
4030 else:
4031 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
4032 if admin_scale_info["name"] == scaling_group:
4033 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
4034 break
4035 else: # not found: set index to one past the last element and add a new entry with the name
4036 admin_scale_index += 1
4037 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
4038 RO_scaling_info = []
4039 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
4040 if scaling_type == "SCALE_OUT":
4041 # count if max-instance-count is reached
4042 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
4043 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
4044 if nb_scale_op >= max_instance_count:
4045 raise LcmException("reached the limit of {} (max-instance-count) "
4046 "scaling-out operations for the "
4047 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4048
4049 nb_scale_op += 1
4050 vdu_scaling_info["scaling_direction"] = "OUT"
4051 vdu_scaling_info["vdu-create"] = {}
4052 for vdu_scale_info in scaling_descriptor["vdu"]:
4053 vdud = next(vdu for vdu in db_vnfd.get("vdu") if vdu["id"] == vdu_scale_info["vdu-id-ref"])
4054 vdu_index = len([x for x in db_vnfr.get("vdur", ())
4055 if x.get("vdu-id-ref") == vdu_scale_info["vdu-id-ref"] and
4056 x.get("member-vnf-index-ref") == vnf_index])
4057 cloud_init_text = self._get_cloud_init(vdud, db_vnfd)
4058 if cloud_init_text:
4059 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
4060 cloud_init_list = []
4061 for x in range(vdu_scale_info.get("count", 1)):
4062 if cloud_init_text:
4063 # TODO Information of its own ip is not available because db_vnfr is not updated.
4064 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu_scale_info["vdu-id-ref"],
4065 vdu_index + x)
4066 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params,
4067 db_vnfd["id"], vdud["id"]))
4068 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4069 "type": "create", "count": vdu_scale_info.get("count", 1)})
4070 if cloud_init_list:
4071 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
4072 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4073
4074 elif scaling_type == "SCALE_IN":
4075 # count if min-instance-count is reached
4076 min_instance_count = 0
4077 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
4078 min_instance_count = int(scaling_descriptor["min-instance-count"])
4079 if nb_scale_op <= min_instance_count:
4080 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
4081 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4082 nb_scale_op -= 1
4083 vdu_scaling_info["scaling_direction"] = "IN"
4084 vdu_scaling_info["vdu-delete"] = {}
4085 for vdu_scale_info in scaling_descriptor["vdu"]:
4086 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4087 "type": "delete", "count": vdu_scale_info.get("count", 1)})
4088 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4089
4090 # update vdu_scaling_info with the name and ip addresses of the VDUs that are going to be deleted
4091 vdu_create = vdu_scaling_info.get("vdu-create")
4092 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
4093 if vdu_scaling_info["scaling_direction"] == "IN":
4094 for vdur in reversed(db_vnfr["vdur"]):
4095 if vdu_delete.get(vdur["vdu-id-ref"]):
4096 vdu_delete[vdur["vdu-id-ref"]] -= 1
4097 vdu_scaling_info["vdu"].append({
4098 "name": vdur["name"],
4099 "vdu_id": vdur["vdu-id-ref"],
4100 "interface": []
4101 })
4102 for interface in vdur["interfaces"]:
4103 vdu_scaling_info["vdu"][-1]["interface"].append({
4104 "name": interface["name"],
4105 "ip_address": interface["ip-address"],
4106 "mac_address": interface.get("mac-address"),
4107 })
4108 vdu_delete = vdu_scaling_info.pop("vdu-delete")
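# Illustrative shape of vdu_scaling_info at this point (hypothetical values):
#   {"scaling_group_name": "scale-vm", "scaling_direction": "IN",
#    "vdu": [{"name": "vnf1-dataVM-1", "vdu_id": "dataVM",
#             "interface": [{"name": "eth0", "ip_address": "10.0.0.5", "mac_address": "fa:16:3e:00:00:01"}]}]}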
4109
4110 # PRE-SCALE BEGIN
4111 step = "Executing pre-scale vnf-config-primitive"
4112 if scaling_descriptor.get("scaling-config-action"):
4113 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4114 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
4115 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
4116 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4117 step = db_nslcmop_update["detailed-status"] = \
4118 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
4119
4120 # look for primitive
4121 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4122 if config_primitive["name"] == vnf_config_primitive:
4123 break
4124 else:
4125 raise LcmException(
4126 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4127 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4128 "primitive".format(scaling_group, vnf_config_primitive))
4129
4130 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4131 if db_vnfr.get("additionalParamsForVnf"):
4132 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4133
4134 scale_process = "VCA"
4135 db_nsr_update["config-status"] = "configuring pre-scaling"
4136 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4137
4138 # Pre-scale retry check: Check if this sub-operation has been executed before
4139 op_index = self._check_or_add_scale_suboperation(
4140 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
4141 if op_index == self.SUBOPERATION_STATUS_SKIP:
4142 # Skip sub-operation
4143 result = 'COMPLETED'
4144 result_detail = 'Done'
4145 self.logger.debug(logging_text +
4146 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4147 vnf_config_primitive, result, result_detail))
4148 else:
4149 if op_index == self.SUBOPERATION_STATUS_NEW:
4150 # New sub-operation: Get index of this sub-operation
4151 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4152 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4153 format(vnf_config_primitive))
4154 else:
4155 # retry: Get registered params for this existing sub-operation
4156 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4157 vnf_index = op.get('member_vnf_index')
4158 vnf_config_primitive = op.get('primitive')
4159 primitive_params = op.get('primitive_params')
4160 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4161 format(vnf_config_primitive))
4162 # Execute the primitive, either with new (first-time) or previously registered (retry) args
4163 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4164 primitive_name = config_primitive.get("execution-environment-primitive",
4165 vnf_config_primitive)
4166 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4167 member_vnf_index=vnf_index,
4168 vdu_id=None,
4169 vdu_count_index=None,
4170 ee_descriptor_id=ee_descriptor_id)
4171 result, result_detail = await self._ns_execute_primitive(
4172 ee_id, primitive_name, primitive_params, vca_type)
4173 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4174 vnf_config_primitive, result, result_detail))
4175 # Update operationState = COMPLETED | FAILED
4176 self._update_suboperation_status(
4177 db_nslcmop, op_index, result, result_detail)
4178
4179 if result == "FAILED":
4180 raise LcmException(result_detail)
4181 db_nsr_update["config-status"] = old_config_status
4182 scale_process = None
4183 # PRE-SCALE END
4184
4185 # SCALE RO - BEGIN
4186 # Should this block be skipped if 'RO_nsr_id' == None ?
4187 # if (RO_nsr_id and RO_scaling_info):
4188 if RO_scaling_info:
4189 scale_process = "RO"
4190 # Scale RO retry check: Check if this sub-operation has been executed before
4191 op_index = self._check_or_add_scale_suboperation(
4192 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
4193 if op_index == self.SUBOPERATION_STATUS_SKIP:
4194 # Skip sub-operation
4195 result = 'COMPLETED'
4196 result_detail = 'Done'
4197 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
4198 result, result_detail))
4199 else:
4200 if op_index == self.SUBOPERATION_STATUS_NEW:
4201 # New sub-operation: Get index of this sub-operation
4202 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4203 self.logger.debug(logging_text + "New sub-operation RO")
4204 else:
4205 # retry: Get registered params for this existing sub-operation
4206 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4207 RO_nsr_id = op.get('RO_nsr_id')
4208 RO_scaling_info = op.get('RO_scaling_info')
4209 self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
4210 vnf_config_primitive))
4211
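# Request the scaling from RO as a "vdu-scaling" action on the existing ns instance and keep the
# returned instance_action_id so that the operation can be polled below.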
4212 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
4213 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
4214 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
4215 # wait until ready
4216 RO_nslcmop_id = RO_desc["instance_action_id"]
4217 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
4218
4219 RO_task_done = False
4220 step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
4221 detailed_status_old = None
4222 self.logger.debug(logging_text + step)
4223
4224 deployment_timeout = 1 * 3600 # One hour
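# Poll RO in two phases: first wait for the scaling action to reach ACTIVE (and then update the
# VNFR VDU records via scale_vnfr), afterwards wait for the ns itself to report the management IP
# addresses of the new VDUs and refresh the VNFR via ns_update_vnfr.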
4225 while deployment_timeout > 0:
4226 if not RO_task_done:
4227 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
4228 extra_item_id=RO_nslcmop_id)
4229
4230 # deploymentStatus
4231 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4232
4233 ns_status, ns_status_info = self.RO.check_action_status(desc)
4234 if ns_status == "ERROR":
4235 raise ROclient.ROClientException(ns_status_info)
4236 elif ns_status == "BUILD":
4237 detailed_status = step + "; {}".format(ns_status_info)
4238 elif ns_status == "ACTIVE":
4239 RO_task_done = True
4240 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
4241 step = detailed_status = "Waiting for ns to be ready at RO. RO_id={}".format(RO_nsr_id)
4242 self.logger.debug(logging_text + step)
4243 else:
4244 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
4245 else:
4246 desc = await self.RO.show("ns", RO_nsr_id)
4247 ns_status, ns_status_info = self.RO.check_ns_status(desc)
4248 # deploymentStatus
4249 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4250
4251 if ns_status == "ERROR":
4252 raise ROclient.ROClientException(ns_status_info)
4253 elif ns_status == "BUILD":
4254 detailed_status = step + "; {}".format(ns_status_info)
4255 elif ns_status == "ACTIVE":
4256 step = detailed_status = \
4257 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4258 try:
4259 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4260 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
4261 break
4262 except LcmExceptionNoMgmtIP:
4263 pass
4264 else:
4265 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
4266 if detailed_status != detailed_status_old:
4267 self._update_suboperation_status(
4268 db_nslcmop, op_index, 'COMPLETED', detailed_status)
4269 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
4270 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
4271
4272 await asyncio.sleep(5, loop=self.loop)
4273 deployment_timeout -= 5
4274 if deployment_timeout <= 0:
4275 self._update_suboperation_status(
4276 db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
4277 raise ROclient.ROClientException("Timeout waiting for ns to be ready")
4278
4279 # update VDU_SCALING_INFO with the obtained ip_addresses
4280 if vdu_scaling_info["scaling_direction"] == "OUT":
4281 for vdur in reversed(db_vnfr["vdur"]):
4282 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
4283 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
4284 vdu_scaling_info["vdu"].append({
4285 "name": vdur["name"],
4286 "vdu_id": vdur["vdu-id-ref"],
4287 "interface": []
4288 })
4289 for interface in vdur["interfaces"]:
4290 vdu_scaling_info["vdu"][-1]["interface"].append({
4291 "name": interface["name"],
4292 "ip_address": interface["ip-address"],
4293 "mac_address": interface.get("mac-address"),
4294 })
4295 del vdu_scaling_info["vdu-create"]
4296
4297 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
4298 # SCALE RO - END
4299
4300 scale_process = None
4301 if db_nsr_update:
4302 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4303
4304 # POST-SCALE BEGIN
4305 # execute primitive service POST-SCALING
4306 step = "Executing post-scale vnf-config-primitive"
4307 if scaling_descriptor.get("scaling-config-action"):
4308 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4309 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4310 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4311 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4312 step = db_nslcmop_update["detailed-status"] = \
4313 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4314
4315 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4316 if db_vnfr.get("additionalParamsForVnf"):
4317 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4318
4319 # look for primitive
4320 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4321 if config_primitive["name"] == vnf_config_primitive:
4322 break
4323 else:
4324 raise LcmException(
4325 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4326 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4327 "config-primitive".format(scaling_group, vnf_config_primitive))
4328 scale_process = "VCA"
4329 db_nsr_update["config-status"] = "configuring post-scaling"
4330 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4331
4332 # Post-scale retry check: Check if this sub-operation has been executed before
4333 op_index = self._check_or_add_scale_suboperation(
4334 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4335 if op_index == self.SUBOPERATION_STATUS_SKIP:
4336 # Skip sub-operation
4337 result = 'COMPLETED'
4338 result_detail = 'Done'
4339 self.logger.debug(logging_text +
4340 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4341 format(vnf_config_primitive, result, result_detail))
4342 else:
4343 if op_index == self.SUBOPERATION_STATUS_NEW:
4344 # New sub-operation: Get index of this sub-operation
4345 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4346 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4347 format(vnf_config_primitive))
4348 else:
4349 # retry: Get registered params for this existing sub-operation
4350 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4351 vnf_index = op.get('member_vnf_index')
4352 vnf_config_primitive = op.get('primitive')
4353 primitive_params = op.get('primitive_params')
4354 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4355 format(vnf_config_primitive))
4356 # Execute the primitive, either with new (first-time) or previously registered (retry) args
4357 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4358 primitive_name = config_primitive.get("execution-environment-primitive",
4359 vnf_config_primitive)
4360 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4361 member_vnf_index=vnf_index,
4362 vdu_id=None,
4363 vdu_count_index=None,
4364 ee_descriptor_id=ee_descriptor_id)
4365 result, result_detail = await self._ns_execute_primitive(
4366 ee_id, primitive_name, primitive_params, vca_type)
4367 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4368 vnf_config_primitive, result, result_detail))
4369 # Update operationState = COMPLETED | FAILED
4370 self._update_suboperation_status(
4371 db_nslcmop, op_index, result, result_detail)
4372
4373 if result == "FAILED":
4374 raise LcmException(result_detail)
4375 db_nsr_update["config-status"] = old_config_status
4376 scale_process = None
4377 # POST-SCALE END
4378
4379 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4380 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4381 else old_operational_status
4382 db_nsr_update["config-status"] = old_config_status
4383 return
4384 except (ROclient.ROClientException, DbException, LcmException) as e:
4385 self.logger.error(logging_text + "Exit Exception {}".format(e))
4386 exc = e
4387 except asyncio.CancelledError:
4388 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4389 exc = "Operation was cancelled"
4390 except Exception as e:
4391 exc = traceback.format_exc()
4392 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4393 finally:
4394 self._write_ns_status(
4395 nsr_id=nsr_id,
4396 ns_state=None,
4397 current_operation="IDLE",
4398 current_operation_id=None
4399 )
4400 if exc:
4401 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4402 nslcmop_operation_state = "FAILED"
4403 if db_nsr:
4404 db_nsr_update["operational-status"] = old_operational_status
4405 db_nsr_update["config-status"] = old_config_status
4406 db_nsr_update["detailed-status"] = ""
4407 if scale_process:
4408 if "VCA" in scale_process:
4409 db_nsr_update["config-status"] = "failed"
4410 if "RO" in scale_process:
4411 db_nsr_update["operational-status"] = "failed"
4412 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4413 exc)
4414 else:
4415 error_description_nslcmop = None
4416 nslcmop_operation_state = "COMPLETED"
4417 db_nslcmop_update["detailed-status"] = "Done"
4418
4419 self._write_op_status(
4420 op_id=nslcmop_id,
4421 stage="",
4422 error_message=error_description_nslcmop,
4423 operation_state=nslcmop_operation_state,
4424 other_update=db_nslcmop_update,
4425 )
4426 if db_nsr:
4427 self._write_ns_status(
4428 nsr_id=nsr_id,
4429 ns_state=None,
4430 current_operation="IDLE",
4431 current_operation_id=None,
4432 other_update=db_nsr_update
4433 )
4434
4435 if nslcmop_operation_state:
4436 try:
4437 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
4438 "operationState": nslcmop_operation_state},
4439 loop=self.loop)
4440 # if cooldown_time:
4441 # await asyncio.sleep(cooldown_time, loop=self.loop)
4442 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4443 except Exception as e:
4444 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4445 self.logger.debug(logging_text + "Exit")
4446 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4447
4448 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
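# Render the prometheus job template shipped with the VNF package ('prometheus*.j2') using the
# execution environment service as exporter endpoint, and register the resulting scrape jobs
# through self.prometheus. Returns the list of job names added when the update succeeds.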
4449 if not self.prometheus:
4450 return
4451 # look for a file called 'prometheus*.j2' and use it as the scrape-job template
4452 artifact_content = self.fs.dir_ls(artifact_path)
4453 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4454 if not job_file:
4455 return
4456 with self.fs.file_open((artifact_path, job_file), "r") as f:
4457 job_data = f.read()
4458
4459 # TODO get_service
4460 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4461 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4462 host_port = "80"
4463 vnfr_id = vnfr_id.replace("-", "")
4464 variables = {
4465 "JOB_NAME": vnfr_id,
4466 "TARGET_IP": target_ip,
4467 "EXPORTER_POD_IP": host_name,
4468 "EXPORTER_POD_PORT": host_port,
4469 }
4470 job_list = self.prometheus.parse_job(job_data, variables)
4471 # ensure job_name uses the vnfr_id, and add the nsr_id as metadata
4472 for job in job_list:
4473 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4474 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4475 job["nsr_id"] = nsr_id
4476 job_dict = {jl["job_name"]: jl for jl in job_list}
4477 if await self.prometheus.update(job_dict):
4478 return list(job_dict.keys())
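# For reference, a minimal sketch of what a 'prometheus*.j2' job template in the VNF package could
# look like, assuming plain substitution of the variables built above (JOB_NAME, TARGET_IP,
# EXPORTER_POD_IP, EXPORTER_POD_PORT); the actual template is provided by the package and may
# differ:
#
#   - job_name: "{{ JOB_NAME }}"
#     static_configs:
#       - targets: ["{{ EXPORTER_POD_IP }}:{{ EXPORTER_POD_PORT }}"]
#         labels:
#           target_ip: "{{ TARGET_IP }}"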