1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_helm3_conn import K8sHelm3Connector
32 from n2vc.k8s_juju_conn import K8sJujuConnector
33
34 from osm_common.dbbase import DbException
35 from osm_common.fsbase import FsException
36
37 from n2vc.n2vc_juju_conn import N2VCJujuConnector
38 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
39
40 from osm_lcm.lcm_helm_conn import LCMHelmConn
41
42 from copy import copy, deepcopy
43 from http import HTTPStatus
44 from time import time
45 from uuid import uuid4
46
47 from random import randint
48
49 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
50
51
52 class NsLcm(LcmBase):
53 timeout_vca_on_error = 5 * 60 # Time from when a charm first reaches blocked/error status until it is marked as failed
54 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
55 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
56 timeout_charm_delete = 10 * 60
57 timeout_primitive = 30 * 60 # timeout for primitive execution
58 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
59
60 SUBOPERATION_STATUS_NOT_FOUND = -1
61 SUBOPERATION_STATUS_NEW = -2
62 SUBOPERATION_STATUS_SKIP = -3
63 task_name_deploy_vca = "Deploying VCA"
64
65 def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
66 """
67 Init: connect to database, filesystem storage and message bus
68 :param config: two-level dictionary with the configuration. The top level should contain 'timeout', 'ro_config' and 'VCA'
69 :return: None
70 """
71 super().__init__(
72 db=db,
73 msg=msg,
74 fs=fs,
75 logger=logging.getLogger('lcm.ns')
76 )
77
78 self.loop = loop
79 self.lcm_tasks = lcm_tasks
80 self.timeout = config["timeout"]
81 self.ro_config = config["ro_config"]
82 self.ng_ro = config["ro_config"].get("ng")
83 self.vca_config = config["VCA"].copy()
84
85 # create N2VC connector
86 self.n2vc = N2VCJujuConnector(
87 db=self.db,
88 fs=self.fs,
89 log=self.logger,
90 loop=self.loop,
91 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
92 username=self.vca_config.get('user', None),
93 vca_config=self.vca_config,
94 on_update_db=self._on_update_n2vc_db
95 )
96
97 self.conn_helm_ee = LCMHelmConn(
98 db=self.db,
99 fs=self.fs,
100 log=self.logger,
101 loop=self.loop,
102 url=None,
103 username=None,
104 vca_config=self.vca_config,
105 on_update_db=self._on_update_n2vc_db
106 )
107
108 self.k8sclusterhelm2 = K8sHelmConnector(
109 kubectl_command=self.vca_config.get("kubectlpath"),
110 helm_command=self.vca_config.get("helmpath"),
111 fs=self.fs,
112 log=self.logger,
113 db=self.db,
114 on_update_db=None,
115 )
116
117 self.k8sclusterhelm3 = K8sHelm3Connector(
118 kubectl_command=self.vca_config.get("kubectlpath"),
119 helm_command=self.vca_config.get("helm3path"),
120 fs=self.fs,
121 log=self.logger,
122 db=self.db,
123 on_update_db=None,
124 )
125
126 self.k8sclusterjuju = K8sJujuConnector(
127 kubectl_command=self.vca_config.get("kubectlpath"),
128 juju_command=self.vca_config.get("jujupath"),
129 fs=self.fs,
130 log=self.logger,
131 db=self.db,
132 loop=self.loop,
133 on_update_db=None,
134 vca_config=self.vca_config,
135 )
136
137 self.k8scluster_map = {
138 "helm-chart": self.k8sclusterhelm2,
139 "helm-chart-v3": self.k8sclusterhelm3,
140 "chart": self.k8sclusterhelm3,
141 "juju-bundle": self.k8sclusterjuju,
142 "juju": self.k8sclusterjuju,
143 }
144
145 self.vca_map = {
146 "lxc_proxy_charm": self.n2vc,
147 "native_charm": self.n2vc,
148 "k8s_proxy_charm": self.n2vc,
149 "helm": self.conn_helm_ee,
150 "helm-v3": self.conn_helm_ee
151 }
152
153 self.prometheus = prometheus
154
155 # create RO client
156 if self.ng_ro:
157 self.RO = NgRoClient(self.loop, **self.ro_config)
158 else:
159 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
160
161 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
162
163 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
164
165 try:
166 # TODO filter RO descriptor fields...
167
168 # write to database
169 db_dict = dict()
170 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
171 db_dict['deploymentStatus'] = ro_descriptor
172 self.update_db_2("nsrs", nsrs_id, db_dict)
173
174 except Exception as e:
175 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
176
177 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
178
179 # remove last dot from path (if exists)
180 if path.endswith('.'):
181 path = path[:-1]
182
183 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
184 # .format(table, filter, path, updated_data))
185
186 try:
187
188 nsr_id = filter.get('_id')
189
190 # read ns record from database
191 nsr = self.db.get_one(table='nsrs', q_filter=filter)
192 current_ns_status = nsr.get('nsState')
193
194 # get vca status for NS
195 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
196
197 # vcaStatus
198 db_dict = dict()
199 db_dict['vcaStatus'] = status_dict
200
201 # update configurationStatus for this VCA
202 try:
203 vca_index = int(path[path.rfind(".")+1:])
204
205 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
206 vca_status = vca_list[vca_index].get('status')
207
208 configuration_status_list = nsr.get('configurationStatus')
209 config_status = configuration_status_list[vca_index].get('status')
210
211 if config_status == 'BROKEN' and vca_status != 'failed':
212 db_dict['configurationStatus.{}.status'.format(vca_index)] = 'READY'
213 elif config_status != 'BROKEN' and vca_status == 'failed':
214 db_dict['configurationStatus.{}.status'.format(vca_index)] = 'BROKEN'
215 except Exception as e:
216 # do not update configurationStatus
217 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
218
219 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
220 # if nsState = 'DEGRADED' check if all is OK
221 is_degraded = False
222 if current_ns_status in ('READY', 'DEGRADED'):
223 error_description = ''
224 # check machines
225 if status_dict.get('machines'):
226 for machine_id in status_dict.get('machines'):
227 machine = status_dict.get('machines').get(machine_id)
228 # check machine agent-status
229 if machine.get('agent-status'):
230 s = machine.get('agent-status').get('status')
231 if s != 'started':
232 is_degraded = True
233 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
234 # check machine instance status
235 if machine.get('instance-status'):
236 s = machine.get('instance-status').get('status')
237 if s != 'running':
238 is_degraded = True
239 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
240 # check applications
241 if status_dict.get('applications'):
242 for app_id in status_dict.get('applications'):
243 app = status_dict.get('applications').get(app_id)
244 # check application status
245 if app.get('status'):
246 s = app.get('status').get('status')
247 if s != 'active':
248 is_degraded = True
249 error_description += 'application {} status={} ; '.format(app_id, s)
250
251 if error_description:
252 db_dict['errorDescription'] = error_description
253 if current_ns_status == 'READY' and is_degraded:
254 db_dict['nsState'] = 'DEGRADED'
255 if current_ns_status == 'DEGRADED' and not is_degraded:
256 db_dict['nsState'] = 'READY'
257
258 # write to database
259 self.update_db_2("nsrs", nsr_id, db_dict)
260
261 except (asyncio.CancelledError, asyncio.TimeoutError):
262 raise
263 except Exception as e:
264 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
265
266 @staticmethod
267 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
268 try:
269 env = Environment(undefined=StrictUndefined)
270 template = env.from_string(cloud_init_text)
271 return template.render(additional_params or {})
272 except UndefinedError as e:
273 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
274 "file, must be provided in the instantiation parameters inside the "
275 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
276 except (TemplateError, TemplateNotFound) as e:
277 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
278 format(vnfd_id, vdu_id, e))
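# Usage sketch (hypothetical values): rendering a cloud-init template with the per-VNF/VDU
# additional params. StrictUndefined makes any unresolved variable raise, which is turned
# into the LcmException above.
#
#   rendered = NsLcm._parse_cloud_init(
#       "#cloud-config\nhostname: {{ hostname }}\n",     # cloud-init-file content
#       {"hostname": "vnf1-mgmt"},                       # additionalParamsForVnf/Vdu
#       vnfd_id="hackfest_vnfd", vdu_id="mgmtVM")
#   # -> "#cloud-config\nhostname: vnf1-mgmt\n"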
279
280 def _get_cloud_init(self, vdu, vnfd):
281 try:
282 cloud_init_content = cloud_init_file = None
283 if vdu.get("cloud-init-file"):
284 base_folder = vnfd["_admin"]["storage"]
285 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
286 vdu["cloud-init-file"])
287 with self.fs.file_open(cloud_init_file, "r") as ci_file:
288 cloud_init_content = ci_file.read()
289 elif vdu.get("cloud-init"):
290 cloud_init_content = vdu["cloud-init"]
291
292 return cloud_init_content
293 except FsException as e:
294 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
295 format(vnfd["id"], vdu["id"], cloud_init_file, e))
296
297 def _get_osm_params(self, db_vnfr, vdu_id=None, vdu_count_index=0):
298 osm_params = {x.replace("-", "_"): db_vnfr[x] for x in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
299 if db_vnfr.get(x) is not None}
300 osm_params["ns_id"] = db_vnfr["nsr-id-ref"]
301 osm_params["vnf_id"] = db_vnfr["_id"]
302 osm_params["member_vnf_index"] = db_vnfr["member-vnf-index-ref"]
303 if db_vnfr.get("vdur"):
304 osm_params["vdu"] = {}
305 for vdur in db_vnfr["vdur"]:
306 vdu = {
307 "count_index": vdur["count-index"],
308 "vdu_id": vdur["vdu-id-ref"],
309 "interfaces": {}
310 }
311 if vdur.get("ip-address"):
312 vdu["ip_address"] = vdur["ip-address"]
313 for iface in vdur["interfaces"]:
314 vdu["interfaces"][iface["name"]] = \
315 {x.replace("-", "_"): iface[x] for x in ("mac-address", "ip-address", "vnf-vld-id", "name")
316 if iface.get(x) is not None}
317 vdu_id_index = "{}-{}".format(vdur["vdu-id-ref"], vdur["count-index"])
318 osm_params["vdu"][vdu_id_index] = vdu
319 if vdu_id:
320 osm_params["vdu_id"] = vdu_id
321 osm_params["count_index"] = vdu_count_index
322 return osm_params
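# Shape of the returned dict (a sketch with hypothetical ids, derived from the code above):
#
#   {
#       "ip_address": "10.0.0.10", "vim_account_id": "...", "vnfd_id": "...", "vnfd_ref": "...",
#       "ns_id": "<nsr id>", "vnf_id": "<vnfr _id>", "member_vnf_index": "1",
#       "vdu": {
#           "mgmtVM-0": {"count_index": 0, "vdu_id": "mgmtVM", "ip_address": "10.0.0.11",
#                        "interfaces": {"eth0": {"mac_address": "...", "ip_address": "10.0.0.11"}}},
#       },
#       # plus "vdu_id" and "count_index" when the vdu_id argument is given
#   }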
323
324 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
325 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
326 additional_params = vdur.get("additionalParams")
327 return self._format_additional_params(additional_params)
328
329 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
330 """
331 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
332 :param vnfd: input vnfd
333 :param new_id: overrides vnf id if provided
334 :param additionalParams: Instantiation params for VNFs provided
335 :param nsrId: Id of the NSR
336 :return: copy of vnfd
337 """
338 vnfd_RO = deepcopy(vnfd)
339 # remove unused by RO configuration, monitoring, scaling and internal keys
340 vnfd_RO.pop("_id", None)
341 vnfd_RO.pop("_admin", None)
342 vnfd_RO.pop("vnf-configuration", None)
343 vnfd_RO.pop("monitoring-param", None)
344 vnfd_RO.pop("scaling-group-descriptor", None)
345 vnfd_RO.pop("kdu", None)
346 vnfd_RO.pop("k8s-cluster", None)
347 if new_id:
348 vnfd_RO["id"] = new_id
349
350 # remove cloud-init and cloud-init-file; they are parsed with Jinja2 and passed to RO per instance in _ns_params_2_RO
351 for vdu in get_iterable(vnfd_RO, "vdu"):
352 vdu.pop("cloud-init-file", None)
353 vdu.pop("cloud-init", None)
354 return vnfd_RO
355
356 def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
357 """
358 Creates a RO ns descriptor from OSM ns_instantiate params
359 :param ns_params: OSM instantiate params
360 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
361 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
362 :return: The RO ns descriptor
363 """
364 vim_2_RO = {}
365 wim_2_RO = {}
366 # TODO feature 1417: Check that no instantiation is set over PDU
367 # check if PDU forces a concrete vim-network-id and add it
368 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
369
370 def vim_account_2_RO(vim_account):
371 if vim_account in vim_2_RO:
372 return vim_2_RO[vim_account]
373
374 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
375 if db_vim["_admin"]["operationalState"] != "ENABLED":
376 raise LcmException("VIM={} is not available. operationalState={}".format(
377 vim_account, db_vim["_admin"]["operationalState"]))
378 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
379 vim_2_RO[vim_account] = RO_vim_id
380 return RO_vim_id
381
382 def wim_account_2_RO(wim_account):
383 if isinstance(wim_account, str):
384 if wim_account in wim_2_RO:
385 return wim_2_RO[wim_account]
386
387 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
388 if db_wim["_admin"]["operationalState"] != "ENABLED":
389 raise LcmException("WIM={} is not available. operationalState={}".format(
390 wim_account, db_wim["_admin"]["operationalState"]))
391 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
392 wim_2_RO[wim_account] = RO_wim_id
393 return RO_wim_id
394 else:
395 return wim_account
396
397 def ip_profile_2_RO(ip_profile):
398 RO_ip_profile = deepcopy(ip_profile)
399 if "dns-server" in RO_ip_profile:
400 if isinstance(RO_ip_profile["dns-server"], list):
401 RO_ip_profile["dns-address"] = []
402 for ds in RO_ip_profile.pop("dns-server"):
403 RO_ip_profile["dns-address"].append(ds['address'])
404 else:
405 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
406 if RO_ip_profile.get("ip-version") == "ipv4":
407 RO_ip_profile["ip-version"] = "IPv4"
408 if RO_ip_profile.get("ip-version") == "ipv6":
409 RO_ip_profile["ip-version"] = "IPv6"
410 if "dhcp-params" in RO_ip_profile:
411 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
412 return RO_ip_profile
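# Example of the translation performed by ip_profile_2_RO (hypothetical values):
#
#   OSM:  {"ip-version": "ipv4", "subnet-address": "10.0.0.0/24",
#          "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
#   RO:   {"ip-version": "IPv4", "subnet-address": "10.0.0.0/24",
#          "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}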
413
414 if not ns_params:
415 return None
416 RO_ns_params = {
417 # "name": ns_params["nsName"],
418 # "description": ns_params.get("nsDescription"),
419 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
420 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
421 # "scenario": ns_params["nsdId"],
422 }
423 # set vim_account of each vnf if different from general vim_account.
424 # Get this information from <vnfr> database content, key vim-account-id
425 # Vim account can be set by placement_engine and it may be different from
426 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
427 for vnf_index, vnfr in db_vnfrs.items():
428 if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
429 populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
430
431 n2vc_key_list = n2vc_key_list or []
432 for vnfd_ref, vnfd in vnfd_dict.items():
433 vdu_needed_access = []
434 mgmt_cp = None
435 if vnfd.get("vnf-configuration"):
436 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
437 if ssh_required and vnfd.get("mgmt-interface"):
438 if vnfd["mgmt-interface"].get("vdu-id"):
439 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
440 elif vnfd["mgmt-interface"].get("cp"):
441 mgmt_cp = vnfd["mgmt-interface"]["cp"]
442
443 for vdu in vnfd.get("vdu", ()):
444 if vdu.get("vdu-configuration"):
445 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
446 if ssh_required:
447 vdu_needed_access.append(vdu["id"])
448 elif mgmt_cp:
449 for vdu_interface in vdu.get("interface"):
450 if vdu_interface.get("external-connection-point-ref") and \
451 vdu_interface["external-connection-point-ref"] == mgmt_cp:
452 vdu_needed_access.append(vdu["id"])
453 mgmt_cp = None
454 break
455
456 if vdu_needed_access:
457 for vnf_member in nsd.get("constituent-vnfd"):
458 if vnf_member["vnfd-id-ref"] != vnfd_ref:
459 continue
460 for vdu in vdu_needed_access:
461 populate_dict(RO_ns_params,
462 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
463 n2vc_key_list)
464 # cloud init
465 for vdu in get_iterable(vnfd, "vdu"):
466 cloud_init_text = self._get_cloud_init(vdu, vnfd)
467 if not cloud_init_text:
468 continue
469 for vnf_member in nsd.get("constituent-vnfd"):
470 if vnf_member["vnfd-id-ref"] != vnfd_ref:
471 continue
472 db_vnfr = db_vnfrs[vnf_member["member-vnf-index"]]
473 additional_params = self._get_vdu_additional_params(db_vnfr, vdu["id"]) or {}
474
475 cloud_init_list = []
476 for vdu_index in range(0, int(vdu.get("count", 1))):
477 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu["id"], vdu_index)
478 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, vnfd["id"],
479 vdu["id"]))
480 populate_dict(RO_ns_params,
481 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu["id"], "cloud_init"),
482 cloud_init_list)
483
484 if ns_params.get("vduImage"):
485 RO_ns_params["vduImage"] = ns_params["vduImage"]
486
487 if ns_params.get("ssh_keys"):
488 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
489 for vnf_params in get_iterable(ns_params, "vnf"):
490 for constituent_vnfd in nsd["constituent-vnfd"]:
491 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
492 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
493 break
494 else:
495 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
496 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
497
498 for vdu_params in get_iterable(vnf_params, "vdu"):
499 # TODO feature 1417: check that this VDU exist and it is not a PDU
500 if vdu_params.get("volume"):
501 for volume_params in vdu_params["volume"]:
502 if volume_params.get("vim-volume-id"):
503 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
504 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
505 volume_params["vim-volume-id"])
506 if vdu_params.get("interface"):
507 for interface_params in vdu_params["interface"]:
508 if interface_params.get("ip-address"):
509 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
510 vdu_params["id"], "interfaces", interface_params["name"],
511 "ip_address"),
512 interface_params["ip-address"])
513 if interface_params.get("mac-address"):
514 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
515 vdu_params["id"], "interfaces", interface_params["name"],
516 "mac_address"),
517 interface_params["mac-address"])
518 if interface_params.get("floating-ip-required"):
519 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
520 vdu_params["id"], "interfaces", interface_params["name"],
521 "floating-ip"),
522 interface_params["floating-ip-required"])
523
524 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
525 if internal_vld_params.get("vim-network-name"):
526 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
527 internal_vld_params["name"], "vim-network-name"),
528 internal_vld_params["vim-network-name"])
529 if internal_vld_params.get("vim-network-id"):
530 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
531 internal_vld_params["name"], "vim-network-id"),
532 internal_vld_params["vim-network-id"])
533 if internal_vld_params.get("ip-profile"):
534 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
535 internal_vld_params["name"], "ip-profile"),
536 ip_profile_2_RO(internal_vld_params["ip-profile"]))
537 if internal_vld_params.get("provider-network"):
538
539 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
540 internal_vld_params["name"], "provider-network"),
541 internal_vld_params["provider-network"].copy())
542
543 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
544 # look for interface
545 iface_found = False
546 for vdu_descriptor in vnf_descriptor["vdu"]:
547 for vdu_interface in vdu_descriptor["interface"]:
548 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
549 if icp_params.get("ip-address"):
550 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
551 vdu_descriptor["id"], "interfaces",
552 vdu_interface["name"], "ip_address"),
553 icp_params["ip-address"])
554
555 if icp_params.get("mac-address"):
556 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
557 vdu_descriptor["id"], "interfaces",
558 vdu_interface["name"], "mac_address"),
559 icp_params["mac-address"])
560 iface_found = True
561 break
562 if iface_found:
563 break
564 else:
565 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
566 "internal-vld:id-ref={} is not present at vnfd:internal-"
567 "connection-point".format(vnf_params["member-vnf-index"],
568 icp_params["id-ref"]))
569
570 for vld_params in get_iterable(ns_params, "vld"):
571 if "ip-profile" in vld_params:
572 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
573 ip_profile_2_RO(vld_params["ip-profile"]))
574
575 if vld_params.get("provider-network"):
576
577 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
578 vld_params["provider-network"].copy())
579
580 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
581 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
582 wim_account_2_RO(vld_params["wimAccountId"]))
583 if vld_params.get("vim-network-name"):
584 RO_vld_sites = []
585 if isinstance(vld_params["vim-network-name"], dict):
586 for vim_account, vim_net in vld_params["vim-network-name"].items():
587 RO_vld_sites.append({
588 "netmap-use": vim_net,
589 "datacenter": vim_account_2_RO(vim_account)
590 })
591 else: # isinstance str
592 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
593 if RO_vld_sites:
594 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
595
596 if vld_params.get("vim-network-id"):
597 RO_vld_sites = []
598 if isinstance(vld_params["vim-network-id"], dict):
599 for vim_account, vim_net in vld_params["vim-network-id"].items():
600 RO_vld_sites.append({
601 "netmap-use": vim_net,
602 "datacenter": vim_account_2_RO(vim_account)
603 })
604 else: # isinstance str
605 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
606 if RO_vld_sites:
607 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
608 if vld_params.get("ns-net"):
609 if isinstance(vld_params["ns-net"], dict):
610 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
611 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
612 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
613 if "vnfd-connection-point-ref" in vld_params:
614 for cp_params in vld_params["vnfd-connection-point-ref"]:
615 # look for interface
616 for constituent_vnfd in nsd["constituent-vnfd"]:
617 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
618 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
619 break
620 else:
621 raise LcmException(
622 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
623 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
624 match_cp = False
625 for vdu_descriptor in vnf_descriptor["vdu"]:
626 for interface_descriptor in vdu_descriptor["interface"]:
627 if interface_descriptor.get("external-connection-point-ref") == \
628 cp_params["vnfd-connection-point-ref"]:
629 match_cp = True
630 break
631 if match_cp:
632 break
633 else:
634 raise LcmException(
635 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
636 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
637 cp_params["member-vnf-index-ref"],
638 cp_params["vnfd-connection-point-ref"],
639 vnf_descriptor["id"]))
640 if cp_params.get("ip-address"):
641 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
642 vdu_descriptor["id"], "interfaces",
643 interface_descriptor["name"], "ip_address"),
644 cp_params["ip-address"])
645 if cp_params.get("mac-address"):
646 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
647 vdu_descriptor["id"], "interfaces",
648 interface_descriptor["name"], "mac_address"),
649 cp_params["mac-address"])
650 return RO_ns_params
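# Rough shape of the RO ns descriptor built above (a sketch; member indexes, vdu and
# network names are hypothetical):
#
#   {
#       "datacenter": "<RO vim id>", "wim_account": "<RO wim id or None>",
#       "vnfs": {"1": {"datacenter": "...",              # only if it differs from the ns-level one
#                      "vdus": {"mgmtVM": {"mgmt_keys": [...], "cloud_init": [...],
#                                          "interfaces": {"eth0": {"ip_address": "..."}}}},
#                      "networks": {"internal": {"vim-network-name": "...", "ip-profile": {...}}}}},
#       "networks": {"mgmtnet": {"ip-profile": {...}, "sites": [{"netmap-use": "..."}]}},
#       "cloud-config": {"key-pairs": [...]},            # only when ssh_keys are provided
#   }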
651
652 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
653 # make a copy so that the input dicts are not modified
654 vdu_create = copy(vdu_create)
655 vdu_delete = copy(vdu_delete)
656
657 vdurs = db_vnfr.get("vdur")
658 if vdurs is None:
659 vdurs = []
660 vdu_index = len(vdurs)
661 while vdu_index:
662 vdu_index -= 1
663 vdur = vdurs[vdu_index]
664 if vdur.get("pdu-type"):
665 continue
666 vdu_id_ref = vdur["vdu-id-ref"]
667 if vdu_create and vdu_create.get(vdu_id_ref):
668 vdur_copy = deepcopy(vdur)
669 vdur_copy["status"] = "BUILD"
670 vdur_copy["status-detailed"] = None
671 vdur_copy["ip_address"]: None
672 for iface in vdur_copy["interfaces"]:
673 iface["ip-address"] = None
674 iface["mac-address"] = None
675 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf # TODO ALF
676 for index in range(0, vdu_create[vdu_id_ref]):
677 vdur_copy["_id"] = str(uuid4())
678 vdur_copy["count-index"] += 1
679 vdurs.insert(vdu_index+1+index, vdur_copy)
680 self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
681 vdur_copy = deepcopy(vdur_copy)
682
683 del vdu_create[vdu_id_ref]
684 if vdu_delete and vdu_delete.get(vdu_id_ref):
685 del vdurs[vdu_index]
686 vdu_delete[vdu_id_ref] -= 1
687 if not vdu_delete[vdu_id_ref]:
688 del vdu_delete[vdu_id_ref]
689 # check all operations are done
690 if vdu_create:
691 raise LcmException("Error scaling OUT VNFR for {}. There is no existing vnfr. Scaled to 0?".format(
692 vdu_create))
693 if vdu_delete:
694 raise LcmException("Error scaling IN VNFR for {}. There is no existing vnfr. Scaled to 0?".format(
695 vdu_delete))
696
697 vnfr_update = {"vdur": vdurs}
698 db_vnfr["vdur"] = vdurs
699 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
700
701 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
702 """
703 Updates database nsr with the RO info for the created vld
704 :param ns_update_nsr: dictionary to be filled with the updated info
705 :param db_nsr: content of db_nsr. This is also modified
706 :param nsr_desc_RO: nsr descriptor from RO
707 :return: Nothing, LcmException is raised on errors
708 """
709
710 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
711 for net_RO in get_iterable(nsr_desc_RO, "nets"):
712 if vld["id"] != net_RO.get("ns_net_osm_id"):
713 continue
714 vld["vim-id"] = net_RO.get("vim_net_id")
715 vld["name"] = net_RO.get("vim_name")
716 vld["status"] = net_RO.get("status")
717 vld["status-detailed"] = net_RO.get("error_msg")
718 ns_update_nsr["vld.{}".format(vld_index)] = vld
719 break
720 else:
721 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
722
723 def set_vnfr_at_error(self, db_vnfrs, error_text):
724 try:
725 for db_vnfr in db_vnfrs.values():
726 vnfr_update = {"status": "ERROR"}
727 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
728 if "status" not in vdur:
729 vdur["status"] = "ERROR"
730 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
731 if error_text:
732 vdur["status-detailed"] = str(error_text)
733 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
734 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
735 except DbException as e:
736 self.logger.error("Cannot update vnf. {}".format(e))
737
738 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
739 """
740 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
741 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
742 :param nsr_desc_RO: nsr descriptor from RO
743 :return: Nothing, LcmException is raised on errors
744 """
745 for vnf_index, db_vnfr in db_vnfrs.items():
746 for vnf_RO in nsr_desc_RO["vnfs"]:
747 if vnf_RO["member_vnf_index"] != vnf_index:
748 continue
749 vnfr_update = {}
750 if vnf_RO.get("ip_address"):
751 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
752 elif not db_vnfr.get("ip-address"):
753 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
754 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
755
756 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
757 vdur_RO_count_index = 0
758 if vdur.get("pdu-type"):
759 continue
760 for vdur_RO in get_iterable(vnf_RO, "vms"):
761 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
762 continue
763 if vdur["count-index"] != vdur_RO_count_index:
764 vdur_RO_count_index += 1
765 continue
766 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
767 if vdur_RO.get("ip_address"):
768 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
769 else:
770 vdur["ip-address"] = None
771 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
772 vdur["name"] = vdur_RO.get("vim_name")
773 vdur["status"] = vdur_RO.get("status")
774 vdur["status-detailed"] = vdur_RO.get("error_msg")
775 for ifacer in get_iterable(vdur, "interfaces"):
776 for interface_RO in get_iterable(vdur_RO, "interfaces"):
777 if ifacer["name"] == interface_RO.get("internal_name"):
778 ifacer["ip-address"] = interface_RO.get("ip_address")
779 ifacer["mac-address"] = interface_RO.get("mac_address")
780 break
781 else:
782 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
783 "from VIM info"
784 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
785 vnfr_update["vdur.{}".format(vdu_index)] = vdur
786 break
787 else:
788 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
789 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
790
791 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
792 for net_RO in get_iterable(nsr_desc_RO, "nets"):
793 if vld["id"] != net_RO.get("vnf_net_osm_id"):
794 continue
795 vld["vim-id"] = net_RO.get("vim_net_id")
796 vld["name"] = net_RO.get("vim_name")
797 vld["status"] = net_RO.get("status")
798 vld["status-detailed"] = net_RO.get("error_msg")
799 vnfr_update["vld.{}".format(vld_index)] = vld
800 break
801 else:
802 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
803 vnf_index, vld["id"]))
804
805 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
806 break
807
808 else:
809 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
810
811 def _get_ns_config_info(self, nsr_id):
812 """
813 Generates a mapping between vnf,vdu elements and the N2VC id
814 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
815 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
816 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
817 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
818 """
819 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
820 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
821 mapping = {}
822 ns_config_info = {"osm-config-mapping": mapping}
823 for vca in vca_deployed_list:
824 if not vca["member-vnf-index"]:
825 continue
826 if not vca["vdu_id"]:
827 mapping[vca["member-vnf-index"]] = vca["application"]
828 else:
829 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
830 vca["application"]
831 return ns_config_info
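# Example of the returned mapping (hypothetical member indexes and application names):
#
#   {"osm-config-mapping": {
#       "1": "app-vnf-1",                # VNF-level charm for member-vnf-index 1
#       "2.dataVM.0": "app-vnf-2-vdu",   # VDU-level charm: <member-vnf-index>.<vdu_id>.<count>
#   }}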
832
833 @staticmethod
834 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
835 """
836 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
837 primitives such as verify-ssh-credentials, or config when needed
838 :param desc_primitive_list: information of the descriptor
839 :param vca_deployed: information about the deployed element, needed to know whether it relates to an NS, VNF or VDU
840 and whether it contains an ssh public key
841 :param ee_descriptor_id: execution environment descriptor id. It is the value of
842 XXX_configuration.execution-environment-list.INDEX.id; it can be None
843 :return: The modified list. It can be an empty list, but it is always a list
844 """
845
846 primitive_list = desc_primitive_list or []
847
848 # filter primitives by ee_id
849 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
850
851 # sort by 'seq'
852 if primitive_list:
853 primitive_list.sort(key=lambda val: int(val['seq']))
854
855 # look for primitive config, and get the position. None if not present
856 config_position = None
857 for index, primitive in enumerate(primitive_list):
858 if primitive["name"] == "config":
859 config_position = index
860 break
861
862 # for NS, always add a config primitive if not present (bug 874)
863 if not vca_deployed["member-vnf-index"] and config_position is None:
864 primitive_list.insert(0, {"name": "config", "parameter": []})
865 config_position = 0
866 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
867 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
868 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
869 return primitive_list
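# Behaviour sketch (hypothetical descriptor content): primitives are filtered by the
# execution-environment reference, sorted by 'seq', and for VNF/VDU elements with an
# ssh public key a verify-ssh-credentials primitive is inserted after config.
#
#   desc = [{"seq": "2", "name": "touch", "execution-environment-ref": "ee1", "parameter": []},
#           {"seq": "1", "name": "config", "execution-environment-ref": "ee1", "parameter": []}]
#   NsLcm._get_initial_config_primitive_list(desc, {"member-vnf-index": "1", "ssh-public-key": "..."}, "ee1")
#   # -> [config, verify-ssh-credentials, touch]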
870
871 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
872 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
873 nslcmop_id = db_nslcmop["_id"]
874 target = {
875 "name": db_nsr["name"],
876 "ns": {"vld": []},
877 "vnf": [],
878 "image": deepcopy(db_nsr["image"]),
879 "flavor": deepcopy(db_nsr["flavor"]),
880 "action_id": nslcmop_id,
881 }
882 for image in target["image"]:
883 image["vim_info"] = []
884 for flavor in target["flavor"]:
885 flavor["vim_info"] = []
886
887 ns_params = db_nslcmop.get("operationParams")
888 ssh_keys = []
889 if ns_params.get("ssh_keys"):
890 ssh_keys += ns_params.get("ssh_keys")
891 if n2vc_key_list:
892 ssh_keys += n2vc_key_list
893
894 cp2target = {}
895 for vld_index, vld in enumerate(nsd.get("vld")):
896 target_vld = {"id": vld["id"],
897 "name": vld["name"],
898 "mgmt-network": vld.get("mgmt-network", False),
899 "type": vld.get("type"),
900 "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
901 "vim_account_id": ns_params["vimAccountId"]}],
902 }
903 for cp in vld["vnfd-connection-point-ref"]:
904 cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
905 "nsrs:{}:vld.{}".format(nsr_id, vld_index)
906 target["ns"]["vld"].append(target_vld)
907 for vnfr in db_vnfrs.values():
908 vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
909 target_vnf = deepcopy(vnfr)
910 for vld in target_vnf.get("vld", ()):
911 # check if connected to a ns.vld
912 vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
913 cp.get("internal-vld-ref") == vld["id"]), None)
914 if vnf_cp:
915 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
916 if cp2target.get(ns_cp):
917 vld["target"] = cp2target[ns_cp]
918 vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
919 "vim_account_id": vnfr["vim-account-id"]}]
920
921 for vdur in target_vnf.get("vdur", ()):
922 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
923 vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
924 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
925
926 if ssh_keys:
927 if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
928 vdur["ssh-keys"] = ssh_keys
929 vdur["ssh-access-required"] = True
930 elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
931 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
932 vdur["ssh-keys"] = ssh_keys
933 vdur["ssh-access-required"] = True
934
935 # cloud-init
936 if vdud.get("cloud-init-file"):
937 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
938 elif vdud.get("cloud-init"):
939 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
940
941 # flavor
942 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
943 if not next((vi for vi in ns_flavor["vim_info"] if
944 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
945 ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
946 # image
947 ns_image = target["image"][int(vdur["ns-image-id"])]
948 if not next((vi for vi in ns_image["vim_info"] if
949 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
950 ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
951
952 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
953 target["vnf"].append(target_vnf)
954
955 desc = await self.RO.deploy(nsr_id, target)
956 action_id = desc["action_id"]
957 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
958
959 # Updating NSR
960 db_nsr_update = {
961 "_admin.deployed.RO.operational-status": "running",
962 "detailed-status": " ".join(stage)
963 }
964 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
965 self.update_db_2("nsrs", nsr_id, db_nsr_update)
966 self._write_op_status(nslcmop_id, stage)
967 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
968 return
969
970 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
971 detailed_status_old = None
972 db_nsr_update = {}
973 while time() <= start_time + timeout:
974 desc_status = await self.RO.status(nsr_id, action_id)
975 if desc_status["status"] == "FAILED":
976 raise NgRoException(desc_status["details"])
977 elif desc_status["status"] == "BUILD":
978 stage[2] = "VIM: ({})".format(desc_status["details"])
979 elif desc_status["status"] == "DONE":
980 stage[2] = "Deployed at VIM"
981 break
982 else:
983 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
984 if stage[2] != detailed_status_old:
985 detailed_status_old = stage[2]
986 db_nsr_update["detailed-status"] = " ".join(stage)
987 self.update_db_2("nsrs", nsr_id, db_nsr_update)
988 self._write_op_status(nslcmop_id, stage)
989 await asyncio.sleep(5, loop=self.loop)
990 else: # timeout_ns_deploy
991 raise NgRoException("Timeout waiting ns to deploy")
992
993 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
994 db_nsr_update = {}
995 failed_detail = []
996 action_id = None
997 start_deploy = time()
998 try:
999 target = {
1000 "ns": {"vld": []},
1001 "vnf": [],
1002 "image": [],
1003 "flavor": [],
1004 }
1005 desc = await self.RO.deploy(nsr_id, target)
1006 action_id = desc["action_id"]
1007 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1008 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1009 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
1010
1011 # wait until done
1012 delete_timeout = 20 * 60 # 20 minutes
1013 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
1014
1015 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1016 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1017 # delete the ns at RO
1018 await self.RO.delete(nsr_id)
1019 except Exception as e:
1020 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1021 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1022 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1023 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1024 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
1025 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1026 failed_detail.append("delete conflict: {}".format(e))
1027 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
1028 else:
1029 failed_detail.append("delete error: {}".format(e))
1030 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
1031
1032 if failed_detail:
1033 stage[2] = "Error deleting from VIM"
1034 else:
1035 stage[2] = "Deleted from VIM"
1036 db_nsr_update["detailed-status"] = " ".join(stage)
1037 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1038 self._write_op_status(nslcmop_id, stage)
1039
1040 if failed_detail:
1041 raise LcmException("; ".join(failed_detail))
1042 return
1043
1044 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
1045 n2vc_key_list, stage):
1046 """
1047 Instantiate at RO
1048 :param logging_text: prefix text to use for logging
1049 :param nsr_id: nsr identity
1050 :param nsd: database content of ns descriptor
1051 :param db_nsr: database content of ns record
1052 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1053 :param db_vnfrs:
1054 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1055 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1056 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1057 :return: None or exception
1058 """
1059 try:
1060 db_nsr_update = {}
1061 RO_descriptor_number = 0 # number of descriptors created at RO
1062 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
1063 nslcmop_id = db_nslcmop["_id"]
1064 start_deploy = time()
1065 ns_params = db_nslcmop.get("operationParams")
1066 if ns_params and ns_params.get("timeout_ns_deploy"):
1067 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1068 else:
1069 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1070
1071 # Check for and optionally request placement optimization. Database will be updated if placement activated
1072 stage[2] = "Waiting for Placement."
1073 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1074 # in case of placement, change ns_params['vimAccountId'] if it is not present at any vnfr
1075 for vnfr in db_vnfrs.values():
1076 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1077 break
1078 else:
1079 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1080
1081 if self.ng_ro:
1082 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
1083 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
1084 timeout_ns_deploy)
1085 # deploy RO
1086 # get vnfds, instantiate at RO
1087 for c_vnf in nsd.get("constituent-vnfd", ()):
1088 member_vnf_index = c_vnf["member-vnf-index"]
1089 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
1090 vnfd_ref = vnfd["id"]
1091
1092 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
1093 db_nsr_update["detailed-status"] = " ".join(stage)
1094 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1095 self._write_op_status(nslcmop_id, stage)
1096
1097 # self.logger.debug(logging_text + stage[2])
1098 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
1099 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
1100 RO_descriptor_number += 1
1101
1102 # look for the position at deployed.RO.vnfd; if not present it will be appended at the end
1103 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
1104 if vnf_deployed["member-vnf-index"] == member_vnf_index:
1105 break
1106 else:
1107 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
1108 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1109
1110 # look if present
1111 RO_update = {"member-vnf-index": member_vnf_index}
1112 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
1113 if vnfd_list:
1114 RO_update["id"] = vnfd_list[0]["uuid"]
1115 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1116 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
1117 else:
1118 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
1119 get("additionalParamsForVnf"), nsr_id)
1120 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
1121 RO_update["id"] = desc["uuid"]
1122 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1123 vnfd_ref, member_vnf_index, desc["uuid"]))
1124 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
1125 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
1126
1127 # create nsd at RO
1128 nsd_ref = nsd["id"]
1129
1130 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1131 db_nsr_update["detailed-status"] = " ".join(stage)
1132 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1133 self._write_op_status(nslcmop_id, stage)
1134
1135 # self.logger.debug(logging_text + stage[2])
1136 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
1137 RO_descriptor_number += 1
1138 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
1139 if nsd_list:
1140 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
1141 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
1142 nsd_ref, RO_nsd_uuid))
1143 else:
1144 nsd_RO = deepcopy(nsd)
1145 nsd_RO["id"] = RO_osm_nsd_id
1146 nsd_RO.pop("_id", None)
1147 nsd_RO.pop("_admin", None)
1148 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
1149 member_vnf_index = c_vnf["member-vnf-index"]
1150 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1151 for c_vld in nsd_RO.get("vld", ()):
1152 for cp in c_vld.get("vnfd-connection-point-ref", ()):
1153 member_vnf_index = cp["member-vnf-index-ref"]
1154 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1155
1156 desc = await self.RO.create("nsd", descriptor=nsd_RO)
1157 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1158 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
1159 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
1160 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1161
1162 # Create ns at RO
1163 stage[2] = "Creating ns at RO for nsd={}".format(nsd_ref)
1164 db_nsr_update["detailed-status"] = " ".join(stage)
1165 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1166 self._write_op_status(nslcmop_id, stage)
1167
1168 # if present use it unless in error status
1169 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
1170 if RO_nsr_id:
1171 try:
1172 stage[2] = "Looking for existing ns at RO"
1173 db_nsr_update["detailed-status"] = " ".join(stage)
1174 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1175 self._write_op_status(nslcmop_id, stage)
1176 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1177 desc = await self.RO.show("ns", RO_nsr_id)
1178
1179 except ROclient.ROClientException as e:
1180 if e.http_code != HTTPStatus.NOT_FOUND:
1181 raise
1182 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1183 if RO_nsr_id:
1184 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1185 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1186 if ns_status == "ERROR":
1187 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
1188 self.logger.debug(logging_text + stage[2])
1189 await self.RO.delete("ns", RO_nsr_id)
1190 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1191 if not RO_nsr_id:
1192 stage[2] = "Checking dependencies"
1193 db_nsr_update["detailed-status"] = " ".join(stage)
1194 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1195 self._write_op_status(nslcmop_id, stage)
1196 # self.logger.debug(logging_text + stage[2])
1197
1198 # check if the VIM is still being created; wait for any related pending tasks to finish
1199 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
1200 if task_dependency:
1201 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
1202 self.logger.debug(logging_text + stage[2])
1203 await asyncio.wait(task_dependency, timeout=3600)
1204 if ns_params.get("vnf"):
1205 for vnf in ns_params["vnf"]:
1206 if "vimAccountId" in vnf:
1207 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
1208 vnf["vimAccountId"])
1209 if task_dependency:
1210 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
1211 self.logger.debug(logging_text + stage[2])
1212 await asyncio.wait(task_dependency, timeout=3600)
1213
1214 stage[2] = "Checking instantiation parameters."
1215 RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
1216 stage[2] = "Deploying ns at VIM."
1217 db_nsr_update["detailed-status"] = " ".join(stage)
1218 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1219 self._write_op_status(nslcmop_id, stage)
1220
1221 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
1222 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
1223 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1224 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
1225 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
1226
1227 # wait until NS is ready
1228 stage[2] = "Waiting VIM to deploy ns."
1229 db_nsr_update["detailed-status"] = " ".join(stage)
1230 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1231 self._write_op_status(nslcmop_id, stage)
1232 detailed_status_old = None
1233 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1234
1235 old_desc = None
1236 while time() <= start_deploy + timeout_ns_deploy:
1237 desc = await self.RO.show("ns", RO_nsr_id)
1238
1239 # deploymentStatus
1240 if desc != old_desc:
1241 # desc has changed => update db
1242 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
1243 old_desc = desc
1244
1245 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1246 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1247 if ns_status == "ERROR":
1248 raise ROclient.ROClientException(ns_status_info)
1249 elif ns_status == "BUILD":
1250 stage[2] = "VIM: ({})".format(ns_status_info)
1251 elif ns_status == "ACTIVE":
1252 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1253 try:
1254 self.ns_update_vnfr(db_vnfrs, desc)
1255 break
1256 except LcmExceptionNoMgmtIP:
1257 pass
1258 else:
1259 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1260 if stage[2] != detailed_status_old:
1261 detailed_status_old = stage[2]
1262 db_nsr_update["detailed-status"] = " ".join(stage)
1263 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1264 self._write_op_status(nslcmop_id, stage)
1265 await asyncio.sleep(5, loop=self.loop)
1266 else: # timeout_ns_deploy
1267 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1268
1269 # Updating NSR
1270 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
1271
1272 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
1273 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1274 stage[2] = "Deployed at VIM"
1275 db_nsr_update["detailed-status"] = " ".join(stage)
1276 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1277 self._write_op_status(nslcmop_id, stage)
1278 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1279 # self.logger.debug(logging_text + "Deployed at VIM")
1280 except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
1281 stage[2] = "ERROR deploying at VIM"
1282 self.set_vnfr_at_error(db_vnfrs, str(e))
1283 raise
1284
1285 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1286 """
1287 Wait for kdu to be up, get ip address
1288 :param logging_text: prefix use for logging
1289 :param nsr_id:
1290 :param vnfr_id:
1291 :param kdu_name:
1292 :return: IP address
1293 """
1294
1295 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1296 nb_tries = 0
1297
1298 while nb_tries < 360:
1299 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1300 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
1301 if not kdur:
1302 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
1303 if kdur.get("status"):
1304 if kdur["status"] in ("READY", "ENABLED"):
1305 return kdur.get("ip-address")
1306 else:
1307 raise LcmException("target KDU={} is in error state".format(kdu_name))
1308
1309 await asyncio.sleep(10, loop=self.loop)
1310 nb_tries += 1
1311 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1312
1313 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
1314 """
1315 Wait for the ip address at RO and, optionally, insert a public key into the virtual machine
1316 :param logging_text: prefix use for logging
1317 :param nsr_id:
1318 :param vnfr_id:
1319 :param vdu_id:
1320 :param vdu_index:
1321 :param pub_key: public ssh key to inject, None to skip
1322 :param user: user to apply the public ssh key
1323 :return: IP address
1324 """
1325
1326 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1327 ro_nsr_id = None
1328 ip_address = None
1329 nb_tries = 0
1330 target_vdu_id = None
1331 ro_retries = 0
1332
1333 while True:
1334
1335 ro_retries += 1
1336 if ro_retries >= 360: # 1 hour
1337 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1338
1339 await asyncio.sleep(10, loop=self.loop)
1340
1341 # get ip address
1342 if not target_vdu_id:
1343 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1344
1345 if not vdu_id: # for the VNF case
1346 if db_vnfr.get("status") == "ERROR":
1347 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1348 ip_address = db_vnfr.get("ip-address")
1349 if not ip_address:
1350 continue
1351 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1352 else: # VDU case
1353 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1354 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1355
1356 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1357 vdur = db_vnfr["vdur"][0]
1358 if not vdur:
1359 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1360 vdu_index))
1361
1362 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
1363 ip_address = vdur.get("ip-address")
1364 if not ip_address:
1365 continue
1366 target_vdu_id = vdur["vdu-id-ref"]
1367 elif vdur.get("status") == "ERROR":
1368 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1369
1370 if not target_vdu_id:
1371 continue
1372
1373 # inject public key into machine
1374 if pub_key and user:
1375 # wait until NS is deployed at RO
1376 if not ro_nsr_id:
1377 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1378 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1379 if not ro_nsr_id:
1380 continue
1381
1382 # self.logger.debug(logging_text + "Inserting RO key")
1383 if vdur.get("pdu-type"):
1384 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1385 return ip_address
1386 try:
1387 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1388 if self.ng_ro:
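# NG-RO path: the key injection is requested as an "inject_ssh_key" action through RO.deploy()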
1389 target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
1390 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
1391 }
1392 await self.RO.deploy(nsr_id, target)
1393 else:
1394 result_dict = await self.RO.create_action(
1395 item="ns",
1396 item_id_name=ro_nsr_id,
1397 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1398 )
1399 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
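# the loop breaks at the first VM reporting vim_result == 200; the for/else raises if none did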
1400 if not result_dict or not isinstance(result_dict, dict):
1401 raise LcmException("Unknown response from RO when injecting key")
1402 for result in result_dict.values():
1403 if result.get("vim_result") == 200:
1404 break
1405 else:
1406 raise ROclient.ROClientException("error injecting key: {}".format(
1407 result.get("description")))
1408 break
1409 except NgRoException as e:
1410 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1411 except ROclient.ROClientException as e:
1412 if not nb_tries:
1413 self.logger.debug(logging_text + "error injecting key: {}. Retrying for up to {} seconds".
1414 format(e, 20*10))
1415 nb_tries += 1
1416 if nb_tries >= 20:
1417 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1418 else:
1419 break
1420
1421 return ip_address
1422
1423 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1424 """
1425 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs
1426 """
1427 my_vca = vca_deployed_list[vca_index]
1428 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1429 # vdu or kdu: no dependencies
1430 return
1431 timeout = 300
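# note: the counter is decremented once per 10-second sleep, so the effective wait is up to ~50 minutes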
1432 while timeout >= 0:
1433 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1434 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1435 configuration_status_list = db_nsr["configurationStatus"]
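# for/else: if the loop completes without break, no dependency is still pending and the wait is over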
1436 for index, vca_deployed in enumerate(configuration_status_list):
1437 if index == vca_index:
1438 # myself
1439 continue
1440 if not my_vca.get("member-vnf-index") or \
1441 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1442 internal_status = configuration_status_list[index].get("status")
1443 if internal_status == 'READY':
1444 continue
1445 elif internal_status == 'BROKEN':
1446 raise LcmException("Configuration aborted because dependent charm/s have failed")
1447 else:
1448 break
1449 else:
1450 # no dependencies, return
1451 return
1452 await asyncio.sleep(10)
1453 timeout -= 1
1454
1455 raise LcmException("Configuration aborted because dependent charm/s timed out")
1456
1457 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1458 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1459 ee_config_descriptor):
1460 nsr_id = db_nsr["_id"]
1461 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1462 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1463 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1464 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1465 db_dict = {
1466 'collection': 'nsrs',
1467 'filter': {'_id': nsr_id},
1468 'path': db_update_entry
1469 }
1470 step = ""
1471 try:
1472
1473 element_type = 'NS'
1474 element_under_configuration = nsr_id
1475
1476 vnfr_id = None
1477 if db_vnfr:
1478 vnfr_id = db_vnfr["_id"]
1479 osm_config["osm"]["vnf_id"] = vnfr_id
1480
1481 namespace = "{nsi}.{ns}".format(
1482 nsi=nsi_id if nsi_id else "",
1483 ns=nsr_id)
1484
1485 if vnfr_id:
1486 element_type = 'VNF'
1487 element_under_configuration = vnfr_id
1488 namespace += ".{}".format(vnfr_id)
1489 if vdu_id:
1490 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1491 element_type = 'VDU'
1492 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1493 osm_config["osm"]["vdu_id"] = vdu_id
1494 elif kdu_name:
1495 namespace += ".{}".format(kdu_name)
1496 element_type = 'KDU'
1497 element_under_configuration = kdu_name
1498 osm_config["osm"]["kdu_name"] = kdu_name
1499
1500 # Get artifact path
1501 artifact_path = "{}/{}/{}/{}".format(
1502 base_folder["folder"],
1503 base_folder["pkg-dir"],
1504 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1505 vca_name
1506 )
1507 # get initial_config_primitive_list that applies to this element
1508 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1509
1510 # add config if not present for NS charm
1511 ee_descriptor_id = ee_config_descriptor.get("id")
1512 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1513 vca_deployed, ee_descriptor_id)
1514
1515 # n2vc_redesign STEP 3.1
1516 # find old ee_id if exists
1517 ee_id = vca_deployed.get("ee_id")
1518
1519 vim_account_id = (
1520 deep_get(db_vnfr, ("vim-account-id",)) or
1521 deep_get(deploy_params, ("OSM", "vim_account_id"))
1522 )
1523 vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
1524 vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
1525 # create or register execution environment in VCA
1526 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1527
1528 self._write_configuration_status(
1529 nsr_id=nsr_id,
1530 vca_index=vca_index,
1531 status='CREATING',
1532 element_under_configuration=element_under_configuration,
1533 element_type=element_type
1534 )
1535
1536 step = "create execution environment"
1537 self.logger.debug(logging_text + step)
1538
1539 ee_id = None
1540 credentials = None
1541 if vca_type == "k8s_proxy_charm":
1542 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1543 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
1544 namespace=namespace,
1545 artifact_path=artifact_path,
1546 db_dict=db_dict,
1547 cloud_name=vca_k8s_cloud,
1548 credential_name=vca_k8s_cloud_credential,
1549 )
1550 else:
1551 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1552 namespace=namespace,
1553 reuse_ee_id=ee_id,
1554 db_dict=db_dict,
1555 config=osm_config,
1556 cloud_name=vca_cloud,
1557 credential_name=vca_cloud_credential,
1558 )
1559
1560 elif vca_type == "native_charm":
1561 step = "Waiting for VM to be up and getting IP address"
1562 self.logger.debug(logging_text + step)
1563 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1564 user=None, pub_key=None)
1565 credentials = {"hostname": rw_mgmt_ip}
1566 # get username
1567 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1568 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1569 # merged. Meanwhile, get the username from the initial-config-primitive
1570 if not username and initial_config_primitive_list:
1571 for config_primitive in initial_config_primitive_list:
1572 for param in config_primitive.get("parameter", ()):
1573 if param["name"] == "ssh-username":
1574 username = param["value"]
1575 break
1576 if not username:
1577 raise LcmException("Cannot determine the username either with 'initial-config-primitive' or with "
1578 "'config-access.ssh-access.default-user'")
1579 credentials["username"] = username
1580 # n2vc_redesign STEP 3.2
1581
1582 self._write_configuration_status(
1583 nsr_id=nsr_id,
1584 vca_index=vca_index,
1585 status='REGISTERING',
1586 element_under_configuration=element_under_configuration,
1587 element_type=element_type
1588 )
1589
1590 step = "register execution environment {}".format(credentials)
1591 self.logger.debug(logging_text + step)
1592 ee_id = await self.vca_map[vca_type].register_execution_environment(
1593 credentials=credentials,
1594 namespace=namespace,
1595 db_dict=db_dict,
1596 cloud_name=vca_cloud,
1597 credential_name=vca_cloud_credential,
1598 )
1599
1600 # for compatibility with MON/POL modules, they need the model and application name at the database
1601 # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1602 ee_id_parts = ee_id.split('.')
1603 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1604 if len(ee_id_parts) >= 2:
1605 model_name = ee_id_parts[0]
1606 application_name = ee_id_parts[1]
1607 db_nsr_update[db_update_entry + "model"] = model_name
1608 db_nsr_update[db_update_entry + "application"] = application_name
1609
1610 # n2vc_redesign STEP 3.3
1611 step = "Install configuration Software"
1612
1613 self._write_configuration_status(
1614 nsr_id=nsr_id,
1615 vca_index=vca_index,
1616 status='INSTALLING SW',
1617 element_under_configuration=element_under_configuration,
1618 element_type=element_type,
1619 other_update=db_nsr_update
1620 )
1621
1622 # TODO check if already done
1623 self.logger.debug(logging_text + step)
1624 config = None
1625 if vca_type == "native_charm":
1626 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1627 if config_primitive:
1628 config = self._map_primitive_params(
1629 config_primitive,
1630 {},
1631 deploy_params
1632 )
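# for LXC proxy charms the number of units follows the 'config-units' value of the NS, VNF or VDU
# element being configured; it defaults to 1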
1633 num_units = 1
1634 if vca_type == "lxc_proxy_charm":
1635 if element_type == "NS":
1636 num_units = db_nsr.get("config-units") or 1
1637 elif element_type == "VNF":
1638 num_units = db_vnfr.get("config-units") or 1
1639 elif element_type == "VDU":
1640 for v in db_vnfr["vdur"]:
1641 if vdu_id == v["vdu-id-ref"]:
1642 num_units = v.get("config-units") or 1
1643 break
1644 if vca_type != "k8s_proxy_charm":
1645 await self.vca_map[vca_type].install_configuration_sw(
1646 ee_id=ee_id,
1647 artifact_path=artifact_path,
1648 db_dict=db_dict,
1649 config=config,
1650 num_units=num_units,
1651 )
1652
1653 # write in db flag of configuration_sw already installed
1654 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1655
1656 # add relations for this VCA (wait for other peers related with this VCA)
1657 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1658 vca_index=vca_index, vca_type=vca_type)
1659
1660 # if SSH access is required, then get the execution environment SSH public key
1661 # for native charms we have already waited for the VM to be up
1662 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1663 pub_key = None
1664 user = None
1665 # self.logger.debug("get ssh key block")
1666 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1667 # self.logger.debug("ssh key needed")
1668 # Needed to inject an ssh key
1669 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1670 step = "Install configuration Software, getting public ssh key"
1671 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1672
1673 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1674 else:
1675 # self.logger.debug("no need to get ssh key")
1676 step = "Waiting for VM to be up and getting IP address"
1677 self.logger.debug(logging_text + step)
1678
1679 # n2vc_redesign STEP 5.1
1680 # wait for RO to report the ip-address and insert pub_key into the VM
1681 if vnfr_id:
1682 if kdu_name:
1683 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1684 else:
1685 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1686 vdu_index, user=user, pub_key=pub_key)
1687 else:
1688 rw_mgmt_ip = None # This is for a NS configuration
1689
1690 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1691
1692 # store rw_mgmt_ip in deploy params for later replacement
1693 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1694
1695 # n2vc_redesign STEP 6 Execute initial config primitive
1696 step = 'execute initial config primitive'
1697
1698 # wait for dependent primitives execution (NS -> VNF -> VDU)
1699 if initial_config_primitive_list:
1700 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1701
1702 # stage, in function of element type: vdu, kdu, vnf or ns
1703 my_vca = vca_deployed_list[vca_index]
1704 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1705 # VDU or KDU
1706 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1707 elif my_vca.get("member-vnf-index"):
1708 # VNF
1709 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1710 else:
1711 # NS
1712 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1713
1714 self._write_configuration_status(
1715 nsr_id=nsr_id,
1716 vca_index=vca_index,
1717 status='EXECUTING PRIMITIVE'
1718 )
1719
1720 self._write_op_status(
1721 op_id=nslcmop_id,
1722 stage=stage
1723 )
1724
1725 check_if_terminated_needed = True
1726 for initial_config_primitive in initial_config_primitive_list:
1727 # adding information on the vca_deployed if it is a NS execution environment
1728 if not vca_deployed["member-vnf-index"]:
1729 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1730 # TODO check if already done
1731 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1732
1733 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1734 self.logger.debug(logging_text + step)
1735 await self.vca_map[vca_type].exec_primitive(
1736 ee_id=ee_id,
1737 primitive_name=initial_config_primitive["name"],
1738 params_dict=primitive_params_,
1739 db_dict=db_dict
1740 )
1741 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
1742 if check_if_terminated_needed:
1743 if config_descriptor.get('terminate-config-primitive'):
1744 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1745 check_if_terminated_needed = False
1746
1747 # TODO register in database that primitive is done
1748
1749 # STEP 7 Configure metrics
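# metrics are only configured for helm-based execution environments; the resulting Prometheus job
# definitions are stored in this VCA entry of the nsr record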
1750 if vca_type == "helm" or vca_type == "helm-v3":
1751 prometheus_jobs = await self.add_prometheus_metrics(
1752 ee_id=ee_id,
1753 artifact_path=artifact_path,
1754 ee_config_descriptor=ee_config_descriptor,
1755 vnfr_id=vnfr_id,
1756 nsr_id=nsr_id,
1757 target_ip=rw_mgmt_ip,
1758 )
1759 if prometheus_jobs:
1760 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1761
1762 step = "instantiated at VCA"
1763 self.logger.debug(logging_text + step)
1764
1765 self._write_configuration_status(
1766 nsr_id=nsr_id,
1767 vca_index=vca_index,
1768 status='READY'
1769 )
1770
1771 except Exception as e: # TODO not use Exception but N2VC exception
1772 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1773 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1774 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1775 self._write_configuration_status(
1776 nsr_id=nsr_id,
1777 vca_index=vca_index,
1778 status='BROKEN'
1779 )
1780 raise LcmException("{} {}".format(step, e)) from e
1781
1782 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1783 error_description: str = None, error_detail: str = None, other_update: dict = None):
1784 """
1785 Update db_nsr fields.
1786 :param nsr_id:
1787 :param ns_state:
1788 :param current_operation:
1789 :param current_operation_id:
1790 :param error_description:
1791 :param error_detail:
1792 :param other_update: Other required changes at database if provided, will be cleared
1793 :return:
1794 """
1795 try:
1796 db_dict = other_update or {}
1797 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1798 db_dict["_admin.current-operation"] = current_operation_id
1799 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1800 db_dict["currentOperation"] = current_operation
1801 db_dict["currentOperationID"] = current_operation_id
1802 db_dict["errorDescription"] = error_description
1803 db_dict["errorDetail"] = error_detail
1804
1805 if ns_state:
1806 db_dict["nsState"] = ns_state
1807 self.update_db_2("nsrs", nsr_id, db_dict)
1808 except DbException as e:
1809 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1810
1811 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1812 operation_state: str = None, other_update: dict = None):
1813 try:
1814 db_dict = other_update or {}
1815 db_dict['queuePosition'] = queuePosition
1816 if isinstance(stage, list):
1817 db_dict['stage'] = stage[0]
1818 db_dict['detailed-status'] = " ".join(stage)
1819 elif stage is not None:
1820 db_dict['stage'] = str(stage)
1821
1822 if error_message is not None:
1823 db_dict['errorMessage'] = error_message
1824 if operation_state is not None:
1825 db_dict['operationState'] = operation_state
1826 db_dict["statusEnteredTime"] = time()
1827 self.update_db_2("nslcmops", op_id, db_dict)
1828 except DbException as e:
1829 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1830
1831 def _write_all_config_status(self, db_nsr: dict, status: str):
1832 try:
1833 nsr_id = db_nsr["_id"]
1834 # configurationStatus
1835 config_status = db_nsr.get('configurationStatus')
1836 if config_status:
1837 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1838 enumerate(config_status) if v}
1839 # update status
1840 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1841
1842 except DbException as e:
1843 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1844
1845 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1846 element_under_configuration: str = None, element_type: str = None,
1847 other_update: dict = None):
1848
1849 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1850 # .format(vca_index, status))
1851
1852 try:
1853 db_path = 'configurationStatus.{}.'.format(vca_index)
1854 db_dict = other_update or {}
1855 if status:
1856 db_dict[db_path + 'status'] = status
1857 if element_under_configuration:
1858 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1859 if element_type:
1860 db_dict[db_path + 'elementType'] = element_type
1861 self.update_db_2("nsrs", nsr_id, db_dict)
1862 except DbException as e:
1863 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1864 .format(status, nsr_id, vca_index, e))
1865
1866 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1867 """
1868 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
1869 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
1870 Database is used because the result can be obtained from a different LCM worker in case of HA.
1871 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1872 :param db_nslcmop: database content of nslcmop
1873 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1874 :return: True if some modification is done. Modifies database vnfrs and the db_vnfrs parameter with the
1875 computed 'vim-account-id'
1876 """
1877 modified = False
1878 nslcmop_id = db_nslcmop['_id']
1879 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1880 if placement_engine == "PLA":
1881 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1882 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1883 db_poll_interval = 5
1884 wait = db_poll_interval * 10
1885 pla_result = None
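# poll the nslcmop record every 5 seconds (up to ~50 seconds) until PLA writes its result at _admin.pla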
1886 while not pla_result and wait >= 0:
1887 await asyncio.sleep(db_poll_interval)
1888 wait -= db_poll_interval
1889 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1890 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1891
1892 if not pla_result:
1893 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1894
1895 for pla_vnf in pla_result['vnf']:
1896 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1897 if not pla_vnf.get('vimAccountId') or not vnfr:
1898 continue
1899 modified = True
1900 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1901 # Modifies db_vnfrs
1902 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1903 return modified
1904
1905 def update_nsrs_with_pla_result(self, params):
1906 try:
1907 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1908 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1909 except Exception as e:
1910 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1911
1912 async def instantiate(self, nsr_id, nslcmop_id):
1913 """
1914
1915 :param nsr_id: ns instance to deploy
1916 :param nslcmop_id: operation to run
1917 :return:
1918 """
1919
1920 # Try to lock HA task here
1921 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1922 if not task_is_locked_by_me:
1923 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1924 return
1925
1926 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1927 self.logger.debug(logging_text + "Enter")
1928
1929 # get all needed from database
1930
1931 # database nsrs record
1932 db_nsr = None
1933
1934 # database nslcmops record
1935 db_nslcmop = None
1936
1937 # update operation on nsrs
1938 db_nsr_update = {}
1939 # update operation on nslcmops
1940 db_nslcmop_update = {}
1941
1942 nslcmop_operation_state = None
1943 db_vnfrs = {} # vnf's info indexed by member-index
1944 # n2vc_info = {}
1945 tasks_dict_info = {} # from task to info text
1946 exc = None
1947 error_list = []
1948 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1949 # ^ stage, step, VIM progress
1950 try:
1951 # wait for any previous tasks in process
1952 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1953
1954 stage[1] = "Sync filesystem from database."
1955 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
1956
1957 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1958 stage[1] = "Reading from database."
1959 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1960 db_nsr_update["detailed-status"] = "creating"
1961 db_nsr_update["operational-status"] = "init"
1962 self._write_ns_status(
1963 nsr_id=nsr_id,
1964 ns_state="BUILDING",
1965 current_operation="INSTANTIATING",
1966 current_operation_id=nslcmop_id,
1967 other_update=db_nsr_update
1968 )
1969 self._write_op_status(
1970 op_id=nslcmop_id,
1971 stage=stage,
1972 queuePosition=0
1973 )
1974
1975 # read from db: operation
1976 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
1977 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1978 ns_params = db_nslcmop.get("operationParams")
1979 if ns_params and ns_params.get("timeout_ns_deploy"):
1980 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1981 else:
1982 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1983
1984 # read from db: ns
1985 stage[1] = "Getting nsr={} from db.".format(nsr_id)
1986 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1987 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
1988 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1989 db_nsr["nsd"] = nsd
1990 # nsr_name = db_nsr["name"] # TODO short-name??
1991
1992 # read from db: vnf's of this ns
1993 stage[1] = "Getting vnfrs from db."
1994 self.logger.debug(logging_text + stage[1])
1995 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1996
1997 # read from db: vnfd's for every vnf
1998 db_vnfds_ref = {} # every vnfd data indexed by vnfd name (vnfd-ref)
1999 db_vnfds = {} # every vnfd data indexed by vnfd id
2000 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
2001
2002 # for each vnf in ns, read vnfd
2003 for vnfr in db_vnfrs_list:
2004 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
2005 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
2006 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
2007
2008 # if we do not have this vnfd yet, read it from db
2009 if vnfd_id not in db_vnfds:
2010 # read from db
2011 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
2012 self.logger.debug(logging_text + stage[1])
2013 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2014
2015 # store vnfd
2016 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
2017 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
2018 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
2019
2020 # Get or generate the _admin.deployed.VCA list
2021 vca_deployed_list = None
2022 if db_nsr["_admin"].get("deployed"):
2023 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2024 if vca_deployed_list is None:
2025 vca_deployed_list = []
2026 configuration_status_list = []
2027 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2028 db_nsr_update["configurationStatus"] = configuration_status_list
2029 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2030 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2031 elif isinstance(vca_deployed_list, dict):
2032 # maintain backward compatibility. Change a dict to list at database
2033 vca_deployed_list = list(vca_deployed_list.values())
2034 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2035 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2036
2037 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
2038 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2039 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2040
2041 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2042 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2043 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2044 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
2045
2046 # n2vc_redesign STEP 2 Deploy Network Scenario
2047 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
2048 self._write_op_status(
2049 op_id=nslcmop_id,
2050 stage=stage
2051 )
2052
2053 stage[1] = "Deploying KDUs."
2054 # self.logger.debug(logging_text + "Before deploy_kdus")
2055 # Call deploy_kdus in case any "kdu" element exists in the descriptors
2056 await self.deploy_kdus(
2057 logging_text=logging_text,
2058 nsr_id=nsr_id,
2059 nslcmop_id=nslcmop_id,
2060 db_vnfrs=db_vnfrs,
2061 db_vnfds=db_vnfds,
2062 task_instantiation_info=tasks_dict_info,
2063 )
2064
2065 stage[1] = "Getting VCA public key."
2066 # n2vc_redesign STEP 1 Get VCA public ssh-key
2067 # feature 1429. Add n2vc public key to needed VMs
2068 n2vc_key = self.n2vc.get_public_key()
2069 n2vc_key_list = [n2vc_key]
2070 if self.vca_config.get("public_key"):
2071 n2vc_key_list.append(self.vca_config["public_key"])
2072
2073 stage[1] = "Deploying NS at VIM."
2074 task_ro = asyncio.ensure_future(
2075 self.instantiate_RO(
2076 logging_text=logging_text,
2077 nsr_id=nsr_id,
2078 nsd=nsd,
2079 db_nsr=db_nsr,
2080 db_nslcmop=db_nslcmop,
2081 db_vnfrs=db_vnfrs,
2082 db_vnfds_ref=db_vnfds_ref,
2083 n2vc_key_list=n2vc_key_list,
2084 stage=stage
2085 )
2086 )
2087 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2088 tasks_dict_info[task_ro] = "Deploying at VIM"
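# the RO deployment runs in parallel with the N2VC deployments below; all registered tasks are
# awaited together at the finally block via _wait_for_tasks()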
2089
2090 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2091 stage[1] = "Deploying Execution Environments."
2092 self.logger.debug(logging_text + stage[1])
2093
2094 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2095 # get_iterable() returns a value from a dict or empty tuple if key does not exist
2096 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
2097 vnfd_id = c_vnf["vnfd-id-ref"]
2098 vnfd = db_vnfds_ref[vnfd_id]
2099 member_vnf_index = str(c_vnf["member-vnf-index"])
2100 db_vnfr = db_vnfrs[member_vnf_index]
2101 base_folder = vnfd["_admin"]["storage"]
2102 vdu_id = None
2103 vdu_index = 0
2104 vdu_name = None
2105 kdu_name = None
2106
2107 # Get additional parameters
2108 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2109 if db_vnfr.get("additionalParamsForVnf"):
2110 deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()))
2111
2112 descriptor_config = vnfd.get("vnf-configuration")
2113 if descriptor_config:
2114 self._deploy_n2vc(
2115 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
2116 db_nsr=db_nsr,
2117 db_vnfr=db_vnfr,
2118 nslcmop_id=nslcmop_id,
2119 nsr_id=nsr_id,
2120 nsi_id=nsi_id,
2121 vnfd_id=vnfd_id,
2122 vdu_id=vdu_id,
2123 kdu_name=kdu_name,
2124 member_vnf_index=member_vnf_index,
2125 vdu_index=vdu_index,
2126 vdu_name=vdu_name,
2127 deploy_params=deploy_params,
2128 descriptor_config=descriptor_config,
2129 base_folder=base_folder,
2130 task_instantiation_info=tasks_dict_info,
2131 stage=stage
2132 )
2133
2134 # Deploy charms for each VDU that supports one.
2135 for vdud in get_iterable(vnfd, 'vdu'):
2136 vdu_id = vdud["id"]
2137 descriptor_config = vdud.get('vdu-configuration')
2138 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2139 if vdur.get("additionalParams"):
2140 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
2141 else:
2142 deploy_params_vdu = deploy_params
2143 deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
2144 if descriptor_config:
2145 vdu_name = None
2146 kdu_name = None
2147 for vdu_index in range(int(vdud.get("count", 1))):
2148 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2149 self._deploy_n2vc(
2150 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2151 member_vnf_index, vdu_id, vdu_index),
2152 db_nsr=db_nsr,
2153 db_vnfr=db_vnfr,
2154 nslcmop_id=nslcmop_id,
2155 nsr_id=nsr_id,
2156 nsi_id=nsi_id,
2157 vnfd_id=vnfd_id,
2158 vdu_id=vdu_id,
2159 kdu_name=kdu_name,
2160 member_vnf_index=member_vnf_index,
2161 vdu_index=vdu_index,
2162 vdu_name=vdu_name,
2163 deploy_params=deploy_params_vdu,
2164 descriptor_config=descriptor_config,
2165 base_folder=base_folder,
2166 task_instantiation_info=tasks_dict_info,
2167 stage=stage
2168 )
2169 for kdud in get_iterable(vnfd, 'kdu'):
2170 kdu_name = kdud["name"]
2171 descriptor_config = kdud.get('kdu-configuration')
2172 if descriptor_config:
2173 vdu_id = None
2174 vdu_index = 0
2175 vdu_name = None
2176 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
2177 deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)}
2178 if kdur.get("additionalParams"):
2179 deploy_params_kdu = self._format_additional_params(kdur["additionalParams"])
2180
2181 self._deploy_n2vc(
2182 logging_text=logging_text,
2183 db_nsr=db_nsr,
2184 db_vnfr=db_vnfr,
2185 nslcmop_id=nslcmop_id,
2186 nsr_id=nsr_id,
2187 nsi_id=nsi_id,
2188 vnfd_id=vnfd_id,
2189 vdu_id=vdu_id,
2190 kdu_name=kdu_name,
2191 member_vnf_index=member_vnf_index,
2192 vdu_index=vdu_index,
2193 vdu_name=vdu_name,
2194 deploy_params=deploy_params_kdu,
2195 descriptor_config=descriptor_config,
2196 base_folder=base_folder,
2197 task_instantiation_info=tasks_dict_info,
2198 stage=stage
2199 )
2200
2201 # Check if this NS has a charm configuration
2202 descriptor_config = nsd.get("ns-configuration")
2203 if descriptor_config and descriptor_config.get("juju"):
2204 vnfd_id = None
2205 db_vnfr = None
2206 member_vnf_index = None
2207 vdu_id = None
2208 kdu_name = None
2209 vdu_index = 0
2210 vdu_name = None
2211
2212 # Get additional parameters
2213 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2214 if db_nsr.get("additionalParamsForNs"):
2215 deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy()))
2216 base_folder = nsd["_admin"]["storage"]
2217 self._deploy_n2vc(
2218 logging_text=logging_text,
2219 db_nsr=db_nsr,
2220 db_vnfr=db_vnfr,
2221 nslcmop_id=nslcmop_id,
2222 nsr_id=nsr_id,
2223 nsi_id=nsi_id,
2224 vnfd_id=vnfd_id,
2225 vdu_id=vdu_id,
2226 kdu_name=kdu_name,
2227 member_vnf_index=member_vnf_index,
2228 vdu_index=vdu_index,
2229 vdu_name=vdu_name,
2230 deploy_params=deploy_params,
2231 descriptor_config=descriptor_config,
2232 base_folder=base_folder,
2233 task_instantiation_info=tasks_dict_info,
2234 stage=stage
2235 )
2236
2237 # the rest of the work is done at the finally block
2238
2239 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2240 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
2241 exc = e
2242 except asyncio.CancelledError:
2243 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2244 exc = "Operation was cancelled"
2245 except Exception as e:
2246 exc = traceback.format_exc()
2247 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2248 finally:
2249 if exc:
2250 error_list.append(str(exc))
2251 try:
2252 # wait for pending tasks
2253 if tasks_dict_info:
2254 stage[1] = "Waiting for instantiate pending tasks."
2255 self.logger.debug(logging_text + stage[1])
2256 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
2257 stage, nslcmop_id, nsr_id=nsr_id)
2258 stage[1] = stage[2] = ""
2259 except asyncio.CancelledError:
2260 error_list.append("Cancelled")
2261 # TODO cancel all tasks
2262 except Exception as exc:
2263 error_list.append(str(exc))
2264
2265 # update operation-status
2266 db_nsr_update["operational-status"] = "running"
2267 # let's begin with VCA 'configured' status (later we can change it)
2268 db_nsr_update["config-status"] = "configured"
2269 for task, task_name in tasks_dict_info.items():
2270 if not task.done() or task.cancelled() or task.exception():
2271 if task_name.startswith(self.task_name_deploy_vca):
2272 # A N2VC task is pending
2273 db_nsr_update["config-status"] = "failed"
2274 else:
2275 # RO or KDU task is pending
2276 db_nsr_update["operational-status"] = "failed"
2277
2278 # update status at database
2279 if error_list:
2280 error_detail = ". ".join(error_list)
2281 self.logger.error(logging_text + error_detail)
2282 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
2283 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
2284
2285 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2286 db_nslcmop_update["detailed-status"] = error_detail
2287 nslcmop_operation_state = "FAILED"
2288 ns_state = "BROKEN"
2289 else:
2290 error_detail = None
2291 error_description_nsr = error_description_nslcmop = None
2292 ns_state = "READY"
2293 db_nsr_update["detailed-status"] = "Done"
2294 db_nslcmop_update["detailed-status"] = "Done"
2295 nslcmop_operation_state = "COMPLETED"
2296
2297 if db_nsr:
2298 self._write_ns_status(
2299 nsr_id=nsr_id,
2300 ns_state=ns_state,
2301 current_operation="IDLE",
2302 current_operation_id=None,
2303 error_description=error_description_nsr,
2304 error_detail=error_detail,
2305 other_update=db_nsr_update
2306 )
2307 self._write_op_status(
2308 op_id=nslcmop_id,
2309 stage="",
2310 error_message=error_description_nslcmop,
2311 operation_state=nslcmop_operation_state,
2312 other_update=db_nslcmop_update,
2313 )
2314
2315 if nslcmop_operation_state:
2316 try:
2317 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2318 "operationState": nslcmop_operation_state},
2319 loop=self.loop)
2320 except Exception as e:
2321 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2322
2323 self.logger.debug(logging_text + "Exit")
2324 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2325
2326 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2327 timeout: int = 3600, vca_type: str = None) -> bool:
2328
2329 # steps:
2330 # 1. find all relations for this VCA
2331 # 2. wait for other peers related
2332 # 3. add relations
2333
2334 try:
2335 vca_type = vca_type or "lxc_proxy_charm"
2336
2337 # STEP 1: find all relations for this VCA
2338
2339 # read nsr record
2340 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2341 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2342
2343 # this VCA data
2344 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2345
2346 # read all ns-configuration relations
2347 ns_relations = list()
2348 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2349 if db_ns_relations:
2350 for r in db_ns_relations:
2351 # check if this VCA is in the relation
2352 if my_vca.get('member-vnf-index') in\
2353 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2354 ns_relations.append(r)
2355
2356 # read all vnf-configuration relations
2357 vnf_relations = list()
2358 db_vnfd_list = db_nsr.get('vnfd-id')
2359 if db_vnfd_list:
2360 for vnfd in db_vnfd_list:
2361 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2362 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2363 if db_vnf_relations:
2364 for r in db_vnf_relations:
2365 # check if this VCA is in the relation
2366 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2367 vnf_relations.append(r)
2368
2369 # if no relations, terminate
2370 if not ns_relations and not vnf_relations:
2371 self.logger.debug(logging_text + ' No relations')
2372 return True
2373
2374 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2375
2376 # add all relations
2377 start = time()
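# retry loop: every 5 seconds re-read the nsr and try to add each pending relation; a relation is
# added once both peers have their configuration software installed, and dropped if a peer is BROKEN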
2378 while True:
2379 # check timeout
2380 now = time()
2381 if now - start >= timeout:
2382 self.logger.error(logging_text + ' : timeout adding relations')
2383 return False
2384
2385 # reload nsr from database (we need an updated record: _admin.deployed.VCA)
2386 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2387
2388 # for each defined NS relation, find the VCA's related
2389 for r in ns_relations.copy():
2390 from_vca_ee_id = None
2391 to_vca_ee_id = None
2392 from_vca_endpoint = None
2393 to_vca_endpoint = None
2394 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2395 for vca in vca_list:
2396 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2397 and vca.get('config_sw_installed'):
2398 from_vca_ee_id = vca.get('ee_id')
2399 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2400 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2401 and vca.get('config_sw_installed'):
2402 to_vca_ee_id = vca.get('ee_id')
2403 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2404 if from_vca_ee_id and to_vca_ee_id:
2405 # add relation
2406 await self.vca_map[vca_type].add_relation(
2407 ee_id_1=from_vca_ee_id,
2408 ee_id_2=to_vca_ee_id,
2409 endpoint_1=from_vca_endpoint,
2410 endpoint_2=to_vca_endpoint)
2411 # remove entry from relations list
2412 ns_relations.remove(r)
2413 else:
2414 # check failed peers
2415 try:
2416 vca_status_list = db_nsr.get('configurationStatus')
2417 if vca_status_list:
2418 for i in range(len(vca_list)):
2419 vca = vca_list[i]
2420 vca_status = vca_status_list[i]
2421 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2422 if vca_status.get('status') == 'BROKEN':
2423 # peer broken: remove relation from list
2424 ns_relations.remove(r)
2425 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2426 if vca_status.get('status') == 'BROKEN':
2427 # peer broken: remove relation from list
2428 ns_relations.remove(r)
2429 except Exception:
2430 # ignore
2431 pass
2432
2433 # for each defined VNF relation, find the VCA's related
2434 for r in vnf_relations.copy():
2435 from_vca_ee_id = None
2436 to_vca_ee_id = None
2437 from_vca_endpoint = None
2438 to_vca_endpoint = None
2439 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2440 for vca in vca_list:
2441 key_to_check = "vdu_id"
2442 if vca.get("vdu_id") is None:
2443 key_to_check = "vnfd_id"
2444 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2445 from_vca_ee_id = vca.get('ee_id')
2446 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2447 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2448 to_vca_ee_id = vca.get('ee_id')
2449 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2450 if from_vca_ee_id and to_vca_ee_id:
2451 # add relation
2452 await self.vca_map[vca_type].add_relation(
2453 ee_id_1=from_vca_ee_id,
2454 ee_id_2=to_vca_ee_id,
2455 endpoint_1=from_vca_endpoint,
2456 endpoint_2=to_vca_endpoint)
2457 # remove entry from relations list
2458 vnf_relations.remove(r)
2459 else:
2460 # check failed peers
2461 try:
2462 vca_status_list = db_nsr.get('configurationStatus')
2463 if vca_status_list:
2464 for i in range(len(vca_list)):
2465 vca = vca_list[i]
2466 vca_status = vca_status_list[i]
2467 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2468 if vca_status.get('status') == 'BROKEN':
2469 # peer broken: remove relation from list
2470 vnf_relations.remove(r)
2471 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2472 if vca_status.get('status') == 'BROKEN':
2473 # peer broken: remove relation from list
2474 vnf_relations.remove(r)
2475 except Exception:
2476 # ignore
2477 pass
2478
2479 # wait for next try
2480 await asyncio.sleep(5.0)
2481
2482 if not ns_relations and not vnf_relations:
2483 self.logger.debug('Relations added')
2484 break
2485
2486 return True
2487
2488 except Exception as e:
2489 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2490 return False
2491
2492 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2493 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
2494
2495 try:
2496 k8sclustertype = k8s_instance_info["k8scluster-type"]
2497 # Instantiate kdu
2498 db_dict_install = {"collection": "nsrs",
2499 "filter": {"_id": nsr_id},
2500 "path": nsr_db_path}
2501
2502 kdu_instance = await self.k8scluster_map[k8sclustertype].install(
2503 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2504 kdu_model=k8s_instance_info["kdu-model"],
2505 atomic=True,
2506 params=k8params,
2507 db_dict=db_dict_install,
2508 timeout=timeout,
2509 kdu_name=k8s_instance_info["kdu-name"],
2510 namespace=k8s_instance_info["namespace"])
2511 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2512
2513 # Obtain services to obtain management service ip
2514 services = await self.k8scluster_map[k8sclustertype].get_services(
2515 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2516 kdu_instance=kdu_instance,
2517 namespace=k8s_instance_info["namespace"])
2518
2519 # Obtain management service info (if exists)
2520 vnfr_update_dict = {}
2521 if services:
2522 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2523 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2524 for mgmt_service in mgmt_services:
2525 for service in services:
2526 if service["name"].startswith(mgmt_service["name"]):
2527 # Mgmt service found, Obtain service ip
2528 ip = service.get("external_ip", service.get("cluster_ip"))
2529 if isinstance(ip, list) and len(ip) == 1:
2530 ip = ip[0]
2531
2532 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2533
2534 # Check if must update also mgmt ip at the vnf
2535 service_external_cp = mgmt_service.get("external-connection-point-ref")
2536 if service_external_cp:
2537 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2538 vnfr_update_dict["ip-address"] = ip
2539
2540 break
2541 else:
2542 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2543
2544 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2545 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2546
2547 kdu_config = kdud.get("kdu-configuration")
2548 if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
2549 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2550 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2551
2552 for initial_config_primitive in initial_config_primitive_list:
2553 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2554
2555 await asyncio.wait_for(
2556 self.k8scluster_map[k8sclustertype].exec_primitive(
2557 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2558 kdu_instance=kdu_instance,
2559 primitive_name=initial_config_primitive["name"],
2560 params=primitive_params_, db_dict={}),
2561 timeout=timeout)
2562
2563 except Exception as e:
2564 # Prepare update db with error and raise exception
2565 try:
2566 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2567 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2568 except Exception:
2569 # ignore to keep original exception
2570 pass
2571 # reraise original error
2572 raise
2573
2574 return kdu_instance
2575
2576 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2577 # Launch kdus if present in the descriptor
2578
2579 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2580
2581 async def _get_cluster_id(cluster_id, cluster_type):
2582 nonlocal k8scluster_id_2_uuic
2583 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2584 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2585
2586 # check if the K8s cluster is being created and wait for previous tasks in process to complete
2587 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2588 if task_dependency:
2589 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2590 self.logger.debug(logging_text + text)
2591 await asyncio.wait(task_dependency, timeout=3600)
2592
2593 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2594 if not db_k8scluster:
2595 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2596
2597 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2598 if not k8s_id:
2599 if cluster_type == "helm-chart-v3":
2600 try:
2601 # backward compatibility for existing clusters that have not been initialized for helm v3
2602 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2603 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2604 reuse_cluster_uuid=cluster_id)
2605 db_k8scluster_update = {}
2606 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2607 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2608 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2609 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2610 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2611 except Exception as e:
2612 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2613 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2614 cluster_type))
2615 else:
2616 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2617 format(cluster_id, cluster_type))
2618 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2619 return k8s_id
2620
2621 logging_text += "Deploy kdus: "
2622 step = ""
2623 try:
2624 db_nsr_update = {"_admin.deployed.K8s": []}
2625 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2626
2627 index = 0
2628 updated_cluster_list = []
2629 updated_v3_cluster_list = []
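# repositories are synchronized at most once per cluster and helm version within this operation;
# the updated_* lists track the clusters already processed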
2630
2631 for vnfr_data in db_vnfrs.values():
2632 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2633 # Step 0: Prepare and set parameters
2634 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2635 vnfd_id = vnfr_data.get('vnfd-id')
2636 kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
2637 namespace = kdur.get("k8s-namespace")
2638 if kdur.get("helm-chart"):
2639 kdumodel = kdur["helm-chart"]
2640 # Default version: helm3; if helm-version is v2, use the helm v2 connector
2641 k8sclustertype = "helm-chart-v3"
2642 self.logger.debug("kdur: {}".format(kdur))
2643 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2644 k8sclustertype = "helm-chart"
2645 elif kdur.get("juju-bundle"):
2646 kdumodel = kdur["juju-bundle"]
2647 k8sclustertype = "juju-bundle"
2648 else:
2649 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2650 "juju-bundle. Maybe an old NBI version is running".
2651 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2652 # check if kdumodel is a file and exists
2653 try:
2654 storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
2655 if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
2656 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
2657 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2658 kdumodel)
2659 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2660 kdumodel = self.fs.path + filename
2661 except (asyncio.TimeoutError, asyncio.CancelledError):
2662 raise
2663 except Exception: # it is not a file
2664 pass
2665
2666 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2667 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2668 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2669
2670 # Synchronize repos
2671 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2672 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2673 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2674 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2675 if del_repo_list or added_repo_dict:
2676 if k8sclustertype == "helm-chart":
2677 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2678 updated = {'_admin.helm_charts_added.' +
2679 item: name for item, name in added_repo_dict.items()}
2680 updated_cluster_list.append(cluster_uuid)
2681 elif k8sclustertype == "helm-chart-v3":
2682 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2683 updated = {'_admin.helm_charts_v3_added.' +
2684 item: name for item, name in added_repo_dict.items()}
2685 updated_v3_cluster_list.append(cluster_uuid)
2686 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2687 "'{}' to_delete: {}, to_add: {}".
2688 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2689 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2690
2691 # Instantiate kdu
2692 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2693 kdur["kdu-name"], k8s_cluster_id)
2694 k8s_instance_info = {"kdu-instance": None,
2695 "k8scluster-uuid": cluster_uuid,
2696 "k8scluster-type": k8sclustertype,
2697 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2698 "kdu-name": kdur["kdu-name"],
2699 "kdu-model": kdumodel,
2700 "namespace": namespace}
2701 db_path = "_admin.deployed.K8s.{}".format(index)
2702 db_nsr_update[db_path] = k8s_instance_info
2703 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2704
2705 task = asyncio.ensure_future(
2706 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, db_vnfds[vnfd_id],
2707 k8s_instance_info, k8params=desc_params, timeout=600))
2708 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2709 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2710
2711 index += 1
2712
2713 except (LcmException, asyncio.CancelledError):
2714 raise
2715 except Exception as e:
2716 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2717 if isinstance(e, (N2VCException, DbException)):
2718 self.logger.error(logging_text + msg)
2719 else:
2720 self.logger.critical(logging_text + msg, exc_info=True)
2721 raise LcmException(msg)
2722 finally:
2723 if db_nsr_update:
2724 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2725
2726 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2727 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2728 base_folder, task_instantiation_info, stage):
2729 # launch instantiate_N2VC in an asyncio task and register the task object
2730 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2731 # if not found, create one entry and update database
2732 # fill db_nsr._admin.deployed.VCA.<index>
2733
2734 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2735 if descriptor_config.get("juju"): # There is one execution environment of type juju
2736 ee_list = [descriptor_config]
2737 elif descriptor_config.get("execution-environment-list"):
2738 ee_list = descriptor_config.get("execution-environment-list")
2739 else: # other types such as script are not supported
2740 ee_list = []
2741
2742 for ee_item in ee_list:
2743 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2744 ee_item.get("helm-chart")))
2745 ee_descriptor_id = ee_item.get("id")
2746 if ee_item.get("juju"):
2747 vca_name = ee_item['juju'].get('charm')
2748 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2749 if ee_item['juju'].get('cloud') == "k8s":
2750 vca_type = "k8s_proxy_charm"
2751 elif ee_item['juju'].get('proxy') is False:
2752 vca_type = "native_charm"
2753 elif ee_item.get("helm-chart"):
2754 vca_name = ee_item['helm-chart']
2755 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
2756 vca_type = "helm"
2757 else:
2758 vca_type = "helm-v3"
2759 else:
2760 self.logger.debug(logging_text + "skipping configuration that is neither juju nor helm-chart")
2761 continue
2762
2763 vca_index = -1
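# for/else: reuse the deployed VCA entry that matches this element and ee_descriptor_id; if none
# matches (no break), the else branch creates a new entry and stores it at the database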
2764 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2765 if not vca_deployed:
2766 continue
2767 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2768 vca_deployed.get("vdu_id") == vdu_id and \
2769 vca_deployed.get("kdu_name") == kdu_name and \
2770 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2771 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2772 break
2773 else:
2774 # not found, create one.
2775 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2776 if vdu_id:
2777 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2778 elif kdu_name:
2779 target += "/kdu/{}".format(kdu_name)
2780 vca_deployed = {
2781 "target_element": target,
2782 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2783 "member-vnf-index": member_vnf_index,
2784 "vdu_id": vdu_id,
2785 "kdu_name": kdu_name,
2786 "vdu_count_index": vdu_index,
2787 "operational-status": "init", # TODO revise
2788 "detailed-status": "", # TODO revise
2789 "step": "initial-deploy", # TODO revise
2790 "vnfd_id": vnfd_id,
2791 "vdu_name": vdu_name,
2792 "type": vca_type,
2793 "ee_descriptor_id": ee_descriptor_id
2794 }
2795 vca_index += 1
2796
2797 # create VCA and configurationStatus in db
2798 db_dict = {
2799 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2800 "configurationStatus.{}".format(vca_index): dict()
2801 }
2802 self.update_db_2("nsrs", nsr_id, db_dict)
2803
2804 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2805
2806 # Launch task
2807 task_n2vc = asyncio.ensure_future(
2808 self.instantiate_N2VC(
2809 logging_text=logging_text,
2810 vca_index=vca_index,
2811 nsi_id=nsi_id,
2812 db_nsr=db_nsr,
2813 db_vnfr=db_vnfr,
2814 vdu_id=vdu_id,
2815 kdu_name=kdu_name,
2816 vdu_index=vdu_index,
2817 deploy_params=deploy_params,
2818 config_descriptor=descriptor_config,
2819 base_folder=base_folder,
2820 nslcmop_id=nslcmop_id,
2821 stage=stage,
2822 vca_type=vca_type,
2823 vca_name=vca_name,
2824 ee_config_descriptor=ee_item
2825 )
2826 )
2827 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2828 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2829 member_vnf_index or "", vdu_id or "")
2830
2831 @staticmethod
2832 def _get_terminate_config_primitive(primitive_list, vca_deployed):
2833 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2834 it get only those primitives for this execution envirom"""
2835
2836 primitive_list = primitive_list or []
2837 # filter primitives by ee_descriptor_id
2838 ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
2839 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
2840
2841 if primitive_list:
2842 primitive_list.sort(key=lambda val: int(val['seq']))
2843
2844 return primitive_list
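# Illustrative sketch (comments only, hypothetical data): for a vca_deployed record whose
# ee_descriptor_id is "config-ee", only primitives referencing that execution environment
# are kept, ordered by their 'seq' value:
#   primitives = [
#       {"name": "backup", "seq": "2", "execution-environment-ref": "config-ee"},
#       {"name": "stop", "seq": "1", "execution-environment-ref": "config-ee"},
#       {"name": "other", "seq": "1", "execution-environment-ref": "another-ee"},
#   ]
#   NsLcm._get_terminate_config_primitive(primitives, {"ee_descriptor_id": "config-ee"})
#   # -> [{"name": "stop", ...}, {"name": "backup", ...}]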
2845
2846 @staticmethod
2847 def _create_nslcmop(nsr_id, operation, params):
2848 """
2849 Creates an ns-lcm-op content to be stored in the database.
2850 :param nsr_id: internal id of the instance
2851 :param operation: instantiate, terminate, scale, action, ...
2852 :param params: user parameters for the operation
2853 :return: dictionary following SOL005 format
2854 """
2855 # Raise exception if invalid arguments
2856 if not (nsr_id and operation and params):
2857 raise LcmException(
2858 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2859 now = time()
2860 _id = str(uuid4())
2861 nslcmop = {
2862 "id": _id,
2863 "_id": _id,
2864 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2865 "operationState": "PROCESSING",
2866 "statusEnteredTime": now,
2867 "nsInstanceId": nsr_id,
2868 "lcmOperationType": operation,
2869 "startTime": now,
2870 "isAutomaticInvocation": False,
2871 "operationParams": params,
2872 "isCancelPending": False,
2873 "links": {
2874 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2875 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2876 }
2877 }
2878 return nslcmop
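# Illustrative sketch (comments only, hypothetical ids): the generated document follows the
# SOL005 ns-lcm-op-occ structure and is ready to be inserted into the "nslcmops" collection:
#   op = NsLcm._create_nslcmop("0123-nsr-id", "action",
#                              {"member_vnf_index": "1", "primitive": "touch", "primitive_params": {}})
#   # op["operationState"] == "PROCESSING", op["lcmOperationType"] == "action",
#   # op["_id"] is a fresh uuid4 and op["links"]["self"] points to /osm/nslcm/v1/ns_lcm_op_occs/<_id>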
2879
2880 def _format_additional_params(self, params):
2881 params = params or {}
2882 for key, value in params.items():
2883 if str(value).startswith("!!yaml "):
2884 params[key] = yaml.safe_load(value[7:])
2885 return params
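# Illustrative sketch (comments only, hypothetical keys): values prefixed with "!!yaml " are decoded in place:
#   self._format_additional_params({"replicas": "!!yaml 3", "flavor": "small"})
#   # -> {"replicas": 3, "flavor": "small"}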
2886
2887 def _get_terminate_primitive_params(self, seq, vnf_index):
2888 primitive = seq.get('name')
2889 primitive_params = {}
2890 params = {
2891 "member_vnf_index": vnf_index,
2892 "primitive": primitive,
2893 "primitive_params": primitive_params,
2894 }
2895 desc_params = {}
2896 return self._map_primitive_params(seq, params, desc_params)
2897
2898 # sub-operations
2899
2900 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2901 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2902 if op.get('operationState') == 'COMPLETED':
2903 # b. Skip sub-operation
2904 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2905 return self.SUBOPERATION_STATUS_SKIP
2906 else:
2907 # c. retry executing sub-operation
2908 # The sub-operation exists, and operationState != 'COMPLETED'
2909 # Update operationState = 'PROCESSING' to indicate a retry.
2910 operationState = 'PROCESSING'
2911 detailed_status = 'In progress'
2912 self._update_suboperation_status(
2913 db_nslcmop, op_index, operationState, detailed_status)
2914 # Return the sub-operation index
2915 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2916 # with arguments extracted from the sub-operation
2917 return op_index
2918
2919 # Find a sub-operation where all keys in a matching dictionary must match
2920 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2921 def _find_suboperation(self, db_nslcmop, match):
2922 if db_nslcmop and match:
2923 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2924 for i, op in enumerate(op_list):
2925 if all(op.get(k) == match[k] for k in match):
2926 return i
2927 return self.SUBOPERATION_STATUS_NOT_FOUND
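# Illustrative sketch (comments only, hypothetical values): all keys of 'match' must be equal in the
# stored sub-operation, e.g. to locate a pre-scale primitive that was already registered:
#   match = {"member_vnf_index": "1", "primitive": "touch", "lcmOperationType": "PRE-SCALE"}
#   op_index = self._find_suboperation(db_nslcmop, match)
#   # op_index is the position inside db_nslcmop["_admin"]["operations"],
#   # or SUBOPERATION_STATUS_NOT_FOUND (-1) when no sub-operation matches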
2928
2929 # Update status for a sub-operation given its index
2930 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2931 # Update DB for HA tasks
2932 q_filter = {'_id': db_nslcmop['_id']}
2933 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2934 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2935 self.db.set_one("nslcmops",
2936 q_filter=q_filter,
2937 update_dict=update_dict,
2938 fail_on_empty=False)
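# Illustrative sketch (comments only): for op_index=2 the dotted keys written to the database are
#   {"_admin.operations.2.operationState": "COMPLETED",
#    "_admin.operations.2.detailed-status": "Done"}
# so only that single list element is modified.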
2939
2940 # Add sub-operation, return the index of the added sub-operation
2941 # Optionally, set operationState, detailed-status, and operationType
2942 # Status and type are currently set for 'scale' sub-operations:
2943 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2944 # 'detailed-status' : status message
2945 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2946 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2947 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2948 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2949 RO_nsr_id=None, RO_scaling_info=None):
2950 if not db_nslcmop:
2951 return self.SUBOPERATION_STATUS_NOT_FOUND
2952 # Get the "_admin.operations" list, if it exists
2953 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2954 op_list = db_nslcmop_admin.get('operations')
2955 # Create or append to the "_admin.operations" list
2956 new_op = {'member_vnf_index': vnf_index,
2957 'vdu_id': vdu_id,
2958 'vdu_count_index': vdu_count_index,
2959 'primitive': primitive,
2960 'primitive_params': mapped_primitive_params}
2961 if operationState:
2962 new_op['operationState'] = operationState
2963 if detailed_status:
2964 new_op['detailed-status'] = detailed_status
2965 if operationType:
2966 new_op['lcmOperationType'] = operationType
2967 if RO_nsr_id:
2968 new_op['RO_nsr_id'] = RO_nsr_id
2969 if RO_scaling_info:
2970 new_op['RO_scaling_info'] = RO_scaling_info
2971 if not op_list:
2972 # No existing operations, create key 'operations' with current operation as first list element
2973 db_nslcmop_admin.update({'operations': [new_op]})
2974 op_list = db_nslcmop_admin.get('operations')
2975 else:
2976 # Existing operations, append operation to list
2977 op_list.append(new_op)
2978
2979 db_nslcmop_update = {'_admin.operations': op_list}
2980 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2981 op_index = len(op_list) - 1
2982 return op_index
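# Illustrative sketch (comments only, hypothetical values): registering a terminate primitive as a
# sub-operation appends an entry like the following to db_nslcmop["_admin"]["operations"]:
#   {"member_vnf_index": "1", "vdu_id": None, "vdu_count_index": None,
#    "primitive": "stop", "primitive_params": {...}}
# Scale sub-operations additionally carry "operationState", "detailed-status", "lcmOperationType"
# and, for RO scaling, "RO_nsr_id" / "RO_scaling_info"; the returned value is the new index.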
2983
2984 # Helper methods for scale() sub-operations
2985
2986 # pre-scale/post-scale:
2987 # Check for 3 different cases:
2988 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2989 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2990 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2991 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2992 operationType, RO_nsr_id=None, RO_scaling_info=None):
2993 # Find this sub-operation
2994 if RO_nsr_id and RO_scaling_info:
2995 operationType = 'SCALE-RO'
2996 match = {
2997 'member_vnf_index': vnf_index,
2998 'RO_nsr_id': RO_nsr_id,
2999 'RO_scaling_info': RO_scaling_info,
3000 }
3001 else:
3002 match = {
3003 'member_vnf_index': vnf_index,
3004 'primitive': vnf_config_primitive,
3005 'primitive_params': primitive_params,
3006 'lcmOperationType': operationType
3007 }
3008 op_index = self._find_suboperation(db_nslcmop, match)
3009 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3010 # a. New sub-operation
3011 # The sub-operation does not exist, add it.
3012 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3013 # The following parameters are set to None for all kinds of scaling:
3014 vdu_id = None
3015 vdu_count_index = None
3016 vdu_name = None
3017 if RO_nsr_id and RO_scaling_info:
3018 vnf_config_primitive = None
3019 primitive_params = None
3020 else:
3021 RO_nsr_id = None
3022 RO_scaling_info = None
3023 # Initial status for sub-operation
3024 operationState = 'PROCESSING'
3025 detailed_status = 'In progress'
3026 # Add sub-operation for pre/post-scaling (zero or more operations)
3027 self._add_suboperation(db_nslcmop,
3028 vnf_index,
3029 vdu_id,
3030 vdu_count_index,
3031 vdu_name,
3032 vnf_config_primitive,
3033 primitive_params,
3034 operationState,
3035 detailed_status,
3036 operationType,
3037 RO_nsr_id,
3038 RO_scaling_info)
3039 return self.SUBOPERATION_STATUS_NEW
3040 else:
3041 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3042 # or op_index (operationState != 'COMPLETED')
3043 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
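# Illustrative sketch (comments only): the three possible outcomes drive the caller in scale():
#   op_index = self._check_or_add_scale_suboperation(
#       db_nslcmop, vnf_index, "touch", primitive_params, "PRE-SCALE")
#   # SUBOPERATION_STATUS_NEW  (-2): first execution, the sub-operation was just added
#   # SUBOPERATION_STATUS_SKIP (-3): already COMPLETED in a previous run, skip it
#   # op_index >= 0: retry; re-read the registered parameters from that sub-operation
# When RO_nsr_id and RO_scaling_info are passed, the operationType is forced to 'SCALE-RO' and the
# match is done on the RO fields instead of the primitive name and parameters.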
3044
3045 # Function to return execution_environment id
3046
3047 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3048 # TODO vdu_index_count
3049 for vca in vca_deployed_list:
3050 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3051 return vca["ee_id"]
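# Illustrative sketch (comments only): returns the "ee_id" stored for the first VCA record that
# matches both member-vnf-index and vdu_id, e.g.
#   ee_id = self._get_ee_id("1", None, db_nsr["_admin"]["deployed"]["VCA"])
# and implicitly returns None when no deployed VCA matches.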
3052
3053 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
3054 vca_index, destroy_ee=True, exec_primitives=True):
3055 """
3056 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3057 :param logging_text:
3058 :param db_nslcmop:
3059 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3060 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3061 :param vca_index: index in the database _admin.deployed.VCA
3062 :param destroy_ee: False to not destroy the execution environment, because all of them will be destroyed at once
3063 :param exec_primitives: False to not execute terminate primitives, because the configuration is not completed or
3064 has not executed properly
3065 :return: None or exception
3066 """
3067
3068 self.logger.debug(
3069 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3070 vca_index, vca_deployed, config_descriptor, destroy_ee
3071 )
3072 )
3073
3074 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3075
3076 # execute terminate_primitives
3077 if exec_primitives:
3078 terminate_primitives = self._get_terminate_config_primitive(
3079 config_descriptor.get("terminate-config-primitive"), vca_deployed)
3080 vdu_id = vca_deployed.get("vdu_id")
3081 vdu_count_index = vca_deployed.get("vdu_count_index")
3082 vdu_name = vca_deployed.get("vdu_name")
3083 vnf_index = vca_deployed.get("member-vnf-index")
3084 if terminate_primitives and vca_deployed.get("needed_terminate"):
3085 for seq in terminate_primitives:
3086 # For each sequence in list, get primitive and call _ns_execute_primitive()
3087 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3088 vnf_index, seq.get("name"))
3089 self.logger.debug(logging_text + step)
3090 # Create the primitive for each sequence, i.e. "primitive": "touch"
3091 primitive = seq.get('name')
3092 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
3093
3094 # Add sub-operation
3095 self._add_suboperation(db_nslcmop,
3096 vnf_index,
3097 vdu_id,
3098 vdu_count_index,
3099 vdu_name,
3100 primitive,
3101 mapped_primitive_params)
3102 # Sub-operations: Call _ns_execute_primitive() instead of action()
3103 try:
3104 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
3105 mapped_primitive_params,
3106 vca_type=vca_type)
3107 except LcmException:
3108 # this happens when the VCA is not deployed. In this case there is nothing to terminate
3109 continue
3110 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
3111 if result not in result_ok:
3112 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
3113 "error {}".format(seq.get("name"), vnf_index, result_detail))
3114 # mark that this VCA does not need to be terminated
3115 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
3116 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
3117
3118 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3119 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3120
3121 if destroy_ee:
3122 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
3123
3124 async def _delete_all_N2VC(self, db_nsr: dict):
3125 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
3126 namespace = "." + db_nsr["_id"]
3127 try:
3128 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
3129 except N2VCNotFound: # already deleted. Skip
3130 pass
3131 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
3132
3133 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
3134 """
3135 Terminates a deployment from RO
3136 :param logging_text:
3137 :param nsr_deployed: db_nsr._admin.deployed
3138 :param nsr_id:
3139 :param nslcmop_id:
3140 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
3141 this method updates only index 2, but it writes the concatenated content of the list to the database
3142 :return:
3143 """
3144 db_nsr_update = {}
3145 failed_detail = []
3146 ro_nsr_id = ro_delete_action = None
3147 if nsr_deployed and nsr_deployed.get("RO"):
3148 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3149 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3150 try:
3151 if ro_nsr_id:
3152 stage[2] = "Deleting ns from VIM."
3153 db_nsr_update["detailed-status"] = " ".join(stage)
3154 self._write_op_status(nslcmop_id, stage)
3155 self.logger.debug(logging_text + stage[2])
3156 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3157 self._write_op_status(nslcmop_id, stage)
3158 desc = await self.RO.delete("ns", ro_nsr_id)
3159 ro_delete_action = desc["action_id"]
3160 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
3161 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3162 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3163 if ro_delete_action:
3164 # wait until NS is deleted from VIM
3165 stage[2] = "Waiting ns deleted from VIM."
3166 detailed_status_old = None
3167 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
3168 ro_delete_action))
3169 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3170 self._write_op_status(nslcmop_id, stage)
3171
3172 delete_timeout = 20 * 60 # 20 minutes
3173 while delete_timeout > 0:
3174 desc = await self.RO.show(
3175 "ns",
3176 item_id_name=ro_nsr_id,
3177 extra_item="action",
3178 extra_item_id=ro_delete_action)
3179
3180 # deploymentStatus
3181 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3182
3183 ns_status, ns_status_info = self.RO.check_action_status(desc)
3184 if ns_status == "ERROR":
3185 raise ROclient.ROClientException(ns_status_info)
3186 elif ns_status == "BUILD":
3187 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3188 elif ns_status == "ACTIVE":
3189 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3190 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3191 break
3192 else:
3193 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3194 if stage[2] != detailed_status_old:
3195 detailed_status_old = stage[2]
3196 db_nsr_update["detailed-status"] = " ".join(stage)
3197 self._write_op_status(nslcmop_id, stage)
3198 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3199 await asyncio.sleep(5, loop=self.loop)
3200 delete_timeout -= 5
3201 else: # delete_timeout <= 0:
3202 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
3203
3204 except Exception as e:
3205 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3206 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3207 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3208 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3209 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3210 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
3211 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3212 failed_detail.append("delete conflict: {}".format(e))
3213 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
3214 else:
3215 failed_detail.append("delete error: {}".format(e))
3216 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
3217
3218 # Delete nsd
3219 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3220 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3221 try:
3222 stage[2] = "Deleting nsd from RO."
3223 db_nsr_update["detailed-status"] = " ".join(stage)
3224 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3225 self._write_op_status(nslcmop_id, stage)
3226 await self.RO.delete("nsd", ro_nsd_id)
3227 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3228 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3229 except Exception as e:
3230 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3231 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3232 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3233 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3234 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3235 self.logger.debug(logging_text + failed_detail[-1])
3236 else:
3237 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3238 self.logger.error(logging_text + failed_detail[-1])
3239
3240 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3241 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3242 if not vnf_deployed or not vnf_deployed["id"]:
3243 continue
3244 try:
3245 ro_vnfd_id = vnf_deployed["id"]
3246 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3247 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3248 db_nsr_update["detailed-status"] = " ".join(stage)
3249 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3250 self._write_op_status(nslcmop_id, stage)
3251 await self.RO.delete("vnfd", ro_vnfd_id)
3252 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3253 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3254 except Exception as e:
3255 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3256 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3257 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3258 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3259 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3260 self.logger.debug(logging_text + failed_detail[-1])
3261 else:
3262 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3263 self.logger.error(logging_text + failed_detail[-1])
3264
3265 if failed_detail:
3266 stage[2] = "Error deleting from VIM"
3267 else:
3268 stage[2] = "Deleted from VIM"
3269 db_nsr_update["detailed-status"] = " ".join(stage)
3270 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3271 self._write_op_status(nslcmop_id, stage)
3272
3273 if failed_detail:
3274 raise LcmException("; ".join(failed_detail))
3275
3276 async def terminate(self, nsr_id, nslcmop_id):
3277 # Try to lock HA task here
3278 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3279 if not task_is_locked_by_me:
3280 return
3281
3282 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3283 self.logger.debug(logging_text + "Enter")
3284 timeout_ns_terminate = self.timeout_ns_terminate
3285 db_nsr = None
3286 db_nslcmop = None
3287 operation_params = None
3288 exc = None
3289 error_list = []  # accumulates all error messages of failed steps
3290 db_nslcmop_update = {}
3291 autoremove = False # autoremove after terminated
3292 tasks_dict_info = {}
3293 db_nsr_update = {}
3294 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3295 # ^ contains [stage, step, VIM-status]
3296 try:
3297 # wait for any previous tasks in process
3298 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3299
3300 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3301 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3302 operation_params = db_nslcmop.get("operationParams") or {}
3303 if operation_params.get("timeout_ns_terminate"):
3304 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3305 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3306 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3307
3308 db_nsr_update["operational-status"] = "terminating"
3309 db_nsr_update["config-status"] = "terminating"
3310 self._write_ns_status(
3311 nsr_id=nsr_id,
3312 ns_state="TERMINATING",
3313 current_operation="TERMINATING",
3314 current_operation_id=nslcmop_id,
3315 other_update=db_nsr_update
3316 )
3317 self._write_op_status(
3318 op_id=nslcmop_id,
3319 queuePosition=0,
3320 stage=stage
3321 )
3322 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3323 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3324 return
3325
3326 stage[1] = "Getting vnf descriptors from db."
3327 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3328 db_vnfds_from_id = {}
3329 db_vnfds_from_member_index = {}
3330 # Loop over VNFRs
3331 for vnfr in db_vnfrs_list:
3332 vnfd_id = vnfr["vnfd-id"]
3333 if vnfd_id not in db_vnfds_from_id:
3334 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3335 db_vnfds_from_id[vnfd_id] = vnfd
3336 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3337
3338 # Destroy individual execution environments when there are terminate primitives.
3339 # The rest of the EEs will be deleted at once
3340 # TODO - check before calling _destroy_N2VC
3341 # if not operation_params.get("skip_terminate_primitives"):#
3342 # or not vca.get("needed_terminate"):
3343 stage[0] = "Stage 2/3 execute terminating primitives."
3344 self.logger.debug(logging_text + stage[0])
3345 stage[1] = "Looking for execution environments that need terminate primitives."
3346 self.logger.debug(logging_text + stage[1])
3347 # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3348 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3349 config_descriptor = None
3350 if not vca or not vca.get("ee_id"):
3351 continue
3352 if not vca.get("member-vnf-index"):
3353 # ns
3354 config_descriptor = db_nsr.get("ns-configuration")
3355 elif vca.get("vdu_id"):
3356 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3357 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
3358 if vdud:
3359 config_descriptor = vdud.get("vdu-configuration")
3360 elif vca.get("kdu_name"):
3361 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3362 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
3363 if kdud:
3364 config_descriptor = kdud.get("kdu-configuration")
3365 else:
3366 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
3367 vca_type = vca.get("type")
3368 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3369 vca.get("needed_terminate"))
3370 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3371 # pending native charms
3372 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
3373 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3374 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3375 task = asyncio.ensure_future(
3376 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3377 destroy_ee, exec_terminate_primitives))
3378 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3379
3380 # wait for pending tasks of terminate primitives
3381 if tasks_dict_info:
3382 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3383 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3384 min(self.timeout_charm_delete, timeout_ns_terminate),
3385 stage, nslcmop_id)
3386 tasks_dict_info.clear()
3387 if error_list:
3388 return # raise LcmException("; ".join(error_list))
3389
3390 # remove All execution environments at once
3391 stage[0] = "Stage 3/3 delete all."
3392
3393 if nsr_deployed.get("VCA"):
3394 stage[1] = "Deleting all execution environments."
3395 self.logger.debug(logging_text + stage[1])
3396 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3397 timeout=self.timeout_charm_delete))
3398 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3399 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3400
3401 # Delete from k8scluster
3402 stage[1] = "Deleting KDUs."
3403 self.logger.debug(logging_text + stage[1])
3404 # print(nsr_deployed)
3405 for kdu in get_iterable(nsr_deployed, "K8s"):
3406 if not kdu or not kdu.get("kdu-instance"):
3407 continue
3408 kdu_instance = kdu.get("kdu-instance")
3409 if kdu.get("k8scluster-type") in self.k8scluster_map:
3410 task_delete_kdu_instance = asyncio.ensure_future(
3411 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3412 cluster_uuid=kdu.get("k8scluster-uuid"),
3413 kdu_instance=kdu_instance))
3414 else:
3415 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3416 format(kdu.get("k8scluster-type")))
3417 continue
3418 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3419
3420 # remove from RO
3421 stage[1] = "Deleting ns from VIM."
3422 if self.ng_ro:
3423 task_delete_ro = asyncio.ensure_future(
3424 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3425 else:
3426 task_delete_ro = asyncio.ensure_future(
3427 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3428 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3429
3430 # the rest of the cleanup is done in the finally block
3431
3432 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3433 self.logger.error(logging_text + "Exit Exception {}".format(e))
3434 exc = e
3435 except asyncio.CancelledError:
3436 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3437 exc = "Operation was cancelled"
3438 except Exception as e:
3439 exc = traceback.format_exc()
3440 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3441 finally:
3442 if exc:
3443 error_list.append(str(exc))
3444 try:
3445 # wait for pending tasks
3446 if tasks_dict_info:
3447 stage[1] = "Waiting for terminate pending tasks."
3448 self.logger.debug(logging_text + stage[1])
3449 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3450 stage, nslcmop_id)
3451 stage[1] = stage[2] = ""
3452 except asyncio.CancelledError:
3453 error_list.append("Cancelled")
3454 # TODO cancel all tasks
3455 except Exception as exc:
3456 error_list.append(str(exc))
3457 # update status at database
3458 if error_list:
3459 error_detail = "; ".join(error_list)
3460 # self.logger.error(logging_text + error_detail)
3461 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3462 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3463
3464 db_nsr_update["operational-status"] = "failed"
3465 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3466 db_nslcmop_update["detailed-status"] = error_detail
3467 nslcmop_operation_state = "FAILED"
3468 ns_state = "BROKEN"
3469 else:
3470 error_detail = None
3471 error_description_nsr = error_description_nslcmop = None
3472 ns_state = "NOT_INSTANTIATED"
3473 db_nsr_update["operational-status"] = "terminated"
3474 db_nsr_update["detailed-status"] = "Done"
3475 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3476 db_nslcmop_update["detailed-status"] = "Done"
3477 nslcmop_operation_state = "COMPLETED"
3478
3479 if db_nsr:
3480 self._write_ns_status(
3481 nsr_id=nsr_id,
3482 ns_state=ns_state,
3483 current_operation="IDLE",
3484 current_operation_id=None,
3485 error_description=error_description_nsr,
3486 error_detail=error_detail,
3487 other_update=db_nsr_update
3488 )
3489 self._write_op_status(
3490 op_id=nslcmop_id,
3491 stage="",
3492 error_message=error_description_nslcmop,
3493 operation_state=nslcmop_operation_state,
3494 other_update=db_nslcmop_update,
3495 )
3496 if ns_state == "NOT_INSTANTIATED":
3497 try:
3498 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3499 except DbException as e:
3500 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3501 format(nsr_id, e))
3502 if operation_params:
3503 autoremove = operation_params.get("autoremove", False)
3504 if nslcmop_operation_state:
3505 try:
3506 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3507 "operationState": nslcmop_operation_state,
3508 "autoremove": autoremove},
3509 loop=self.loop)
3510 except Exception as e:
3511 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3512
3513 self.logger.debug(logging_text + "Exit")
3514 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3515
3516 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3517 time_start = time()
3518 error_detail_list = []
3519 error_list = []
3520 pending_tasks = list(created_tasks_info.keys())
3521 num_tasks = len(pending_tasks)
3522 num_done = 0
3523 stage[1] = "{}/{}.".format(num_done, num_tasks)
3524 self._write_op_status(nslcmop_id, stage)
3525 while pending_tasks:
3526 new_error = None
3527 _timeout = timeout + time_start - time()
3528 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3529 return_when=asyncio.FIRST_COMPLETED)
3530 num_done += len(done)
3531 if not done: # Timeout
3532 for task in pending_tasks:
3533 new_error = created_tasks_info[task] + ": Timeout"
3534 error_detail_list.append(new_error)
3535 error_list.append(new_error)
3536 break
3537 for task in done:
3538 if task.cancelled():
3539 exc = "Cancelled"
3540 else:
3541 exc = task.exception()
3542 if exc:
3543 if isinstance(exc, asyncio.TimeoutError):
3544 exc = "Timeout"
3545 new_error = created_tasks_info[task] + ": {}".format(exc)
3546 error_list.append(created_tasks_info[task])
3547 error_detail_list.append(new_error)
3548 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3549 K8sException)):
3550 self.logger.error(logging_text + new_error)
3551 else:
3552 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3553 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
3554 else:
3555 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3556 stage[1] = "{}/{}.".format(num_done, num_tasks)
3557 if new_error:
3558 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3559 if nsr_id: # update also nsr
3560 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3561 "errorDetail": ". ".join(error_detail_list)})
3562 self._write_op_status(nslcmop_id, stage)
3563 return error_detail_list
3564
3565 @staticmethod
3566 def _map_primitive_params(primitive_desc, params, instantiation_params):
3567 """
3568 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
3569 parameter, the default-value is used. If it is enclosed in < >, the value is looked up in instantiation_params
3570 :param primitive_desc: portion of VNFD/NSD that describes primitive
3571 :param params: Params provided by user
3572 :param instantiation_params: Instantiation params provided by user
3573 :return: a dictionary with the calculated params
3574 """
3575 calculated_params = {}
3576 for parameter in primitive_desc.get("parameter", ()):
3577 param_name = parameter["name"]
3578 if param_name in params:
3579 calculated_params[param_name] = params[param_name]
3580 elif "default-value" in parameter or "value" in parameter:
3581 if "value" in parameter:
3582 calculated_params[param_name] = parameter["value"]
3583 else:
3584 calculated_params[param_name] = parameter["default-value"]
3585 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3586 and calculated_params[param_name].endswith(">"):
3587 if calculated_params[param_name][1:-1] in instantiation_params:
3588 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3589 else:
3590 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3591 format(calculated_params[param_name], primitive_desc["name"]))
3592 else:
3593 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3594 format(param_name, primitive_desc["name"]))
3595
3596 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3597 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3598 width=256)
3599 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3600 calculated_params[param_name] = calculated_params[param_name][7:]
3601 if parameter.get("data-type") == "INTEGER":
3602 try:
3603 calculated_params[param_name] = int(calculated_params[param_name])
3604 except ValueError: # error converting string to int
3605 raise LcmException(
3606 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3607 elif parameter.get("data-type") == "BOOLEAN":
3608 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3609
3610 # always add ns_config_info if the primitive name is config
3611 if primitive_desc["name"] == "config":
3612 if "ns_config_info" in instantiation_params:
3613 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3614 return calculated_params
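# Illustrative sketch (comments only, hypothetical descriptor): a default value enclosed in < > is
# resolved against the instantiation parameters:
#   primitive_desc = {"name": "touch",
#                     "parameter": [{"name": "filename", "default-value": "<touch_filename>"}]}
#   NsLcm._map_primitive_params(primitive_desc, {}, {"touch_filename": "/home/ubuntu/first-touch"})
#   # -> {"filename": "/home/ubuntu/first-touch"}
# Dict/list values are serialized to single-line YAML, and INTEGER/BOOLEAN data-types are coerced.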
3615
3616 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3617 ee_descriptor_id=None):
3618 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
3619 for vca in deployed_vca:
3620 if not vca:
3621 continue
3622 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3623 continue
3624 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3625 continue
3626 if kdu_name and kdu_name != vca["kdu_name"]:
3627 continue
3628 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3629 continue
3630 break
3631 else:
3632 # vca_deployed not found
3633 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3634 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3635 ee_descriptor_id))
3636
3637 # get ee_id
3638 ee_id = vca.get("ee_id")
3639 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3640 if not ee_id:
3641 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has no "
3642 "execution environment"
3643 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3644 return ee_id, vca_type
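# Illustrative sketch (comments only): locating the execution environment of a VNF-level charm:
#   ee_id, vca_type = self._look_for_deployed_vca(
#       nsr_deployed["VCA"], member_vnf_index="1", vdu_id=None, vdu_count_index=None)
# vca_type falls back to "lxc_proxy_charm" for records deployed before the "type" field existed,
# and an LcmException is raised when no record (or no ee_id) is found.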
3645
3646 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3647 retries_interval=30, timeout=None,
3648 vca_type=None, db_dict=None) -> (str, str):
3649 try:
3650 if primitive == "config":
3651 primitive_params = {"params": primitive_params}
3652
3653 vca_type = vca_type or "lxc_proxy_charm"
3654
3655 while retries >= 0:
3656 try:
3657 output = await asyncio.wait_for(
3658 self.vca_map[vca_type].exec_primitive(
3659 ee_id=ee_id,
3660 primitive_name=primitive,
3661 params_dict=primitive_params,
3662 progress_timeout=self.timeout_progress_primitive,
3663 total_timeout=self.timeout_primitive,
3664 db_dict=db_dict),
3665 timeout=timeout or self.timeout_primitive)
3666 # execution was OK
3667 break
3668 except asyncio.CancelledError:
3669 raise
3670 except Exception as e: # asyncio.TimeoutError
3671 if isinstance(e, asyncio.TimeoutError):
3672 e = "Timeout"
3673 retries -= 1
3674 if retries >= 0:
3675 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3676 # wait and retry
3677 await asyncio.sleep(retries_interval, loop=self.loop)
3678 else:
3679 return 'FAILED', str(e)
3680
3681 return 'COMPLETED', output
3682
3683 except (LcmException, asyncio.CancelledError):
3684 raise
3685 except Exception as e:
3686 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
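# Illustrative sketch (comments only): executing a primitive with retries; the result is a tuple of
# (state, detail) where state is 'COMPLETED', 'FAILED' (retries exhausted) or 'FAIL' (internal error):
#   result, detail = await self._ns_execute_primitive(
#       ee_id, "touch", {"filename": "/home/ubuntu/first-touch"},
#       retries=3, retries_interval=30, vca_type=vca_type)
# For the special "config" primitive the parameters are wrapped as {"params": primitive_params}.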
3687
3688 async def action(self, nsr_id, nslcmop_id):
3689
3690 # Try to lock HA task here
3691 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3692 if not task_is_locked_by_me:
3693 return
3694
3695 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3696 self.logger.debug(logging_text + "Enter")
3697 # get all needed from database
3698 db_nsr = None
3699 db_nslcmop = None
3700 db_nsr_update = {}
3701 db_nslcmop_update = {}
3702 nslcmop_operation_state = None
3703 error_description_nslcmop = None
3704 exc = None
3705 try:
3706 # wait for any previous tasks in process
3707 step = "Waiting for previous operations to terminate"
3708 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3709
3710 self._write_ns_status(
3711 nsr_id=nsr_id,
3712 ns_state=None,
3713 current_operation="RUNNING ACTION",
3714 current_operation_id=nslcmop_id
3715 )
3716
3717 step = "Getting information from database"
3718 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3719 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3720
3721 nsr_deployed = db_nsr["_admin"].get("deployed")
3722 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3723 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3724 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3725 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3726 primitive = db_nslcmop["operationParams"]["primitive"]
3727 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3728 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3729
3730 if vnf_index:
3731 step = "Getting vnfr from database"
3732 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3733 step = "Getting vnfd from database"
3734 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3735 else:
3736 step = "Getting nsd from database"
3737 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3738
3739 # for backward compatibility
3740 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3741 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3742 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3743 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3744
3745 # look for primitive
3746 config_primitive_desc = descriptor_configuration = None
3747 if vdu_id:
3748 for vdu in get_iterable(db_vnfd, "vdu"):
3749 if vdu_id == vdu["id"]:
3750 descriptor_configuration = vdu.get("vdu-configuration")
3751 break
3752 elif kdu_name:
3753 for kdu in get_iterable(db_vnfd, "kdu"):
3754 if kdu_name == kdu["name"]:
3755 descriptor_configuration = kdu.get("kdu-configuration")
3756 break
3757 elif vnf_index:
3758 descriptor_configuration = db_vnfd.get("vnf-configuration")
3759 else:
3760 descriptor_configuration = db_nsd.get("ns-configuration")
3761
3762 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3763 for config_primitive in descriptor_configuration["config-primitive"]:
3764 if config_primitive["name"] == primitive:
3765 config_primitive_desc = config_primitive
3766 break
3767
3768 if not config_primitive_desc:
3769 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3770 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3771 format(primitive))
3772 primitive_name = primitive
3773 ee_descriptor_id = None
3774 else:
3775 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3776 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3777
3778 if vnf_index:
3779 if vdu_id:
3780 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3781 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3782 elif kdu_name:
3783 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3784 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3785 else:
3786 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3787 else:
3788 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3789
3790 if kdu_name:
3791 kdu_action = not deep_get(kdu, ("kdu-configuration", "juju"))
3792
3793 # TODO check if ns is in a proper status
3794 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3795 # kdur and desc_params already set from before
3796 if primitive_params:
3797 desc_params.update(primitive_params)
3798 # TODO Check if we will need something at vnf level
3799 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3800 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3801 break
3802 else:
3803 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3804
3805 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3806 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3807 raise LcmException(msg)
3808
3809 db_dict = {"collection": "nsrs",
3810 "filter": {"_id": nsr_id},
3811 "path": "_admin.deployed.K8s.{}".format(index)}
3812 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3813 step = "Executing kdu {}".format(primitive_name)
3814 if primitive_name == "upgrade":
3815 if desc_params.get("kdu_model"):
3816 kdu_model = desc_params.get("kdu_model")
3817 del desc_params["kdu_model"]
3818 else:
3819 kdu_model = kdu.get("kdu-model")
3820 parts = kdu_model.split(sep=":")
3821 if len(parts) == 2:
3822 kdu_model = parts[0]
3823
3824 detailed_status = await asyncio.wait_for(
3825 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3826 cluster_uuid=kdu.get("k8scluster-uuid"),
3827 kdu_instance=kdu.get("kdu-instance"),
3828 atomic=True, kdu_model=kdu_model,
3829 params=desc_params, db_dict=db_dict,
3830 timeout=timeout_ns_action),
3831 timeout=timeout_ns_action + 10)
3832 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3833 elif primitive_name == "rollback":
3834 detailed_status = await asyncio.wait_for(
3835 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3836 cluster_uuid=kdu.get("k8scluster-uuid"),
3837 kdu_instance=kdu.get("kdu-instance"),
3838 db_dict=db_dict),
3839 timeout=timeout_ns_action)
3840 elif primitive_name == "status":
3841 detailed_status = await asyncio.wait_for(
3842 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3843 cluster_uuid=kdu.get("k8scluster-uuid"),
3844 kdu_instance=kdu.get("kdu-instance")),
3845 timeout=timeout_ns_action)
3846 else:
3847 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3848 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3849
3850 detailed_status = await asyncio.wait_for(
3851 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3852 cluster_uuid=kdu.get("k8scluster-uuid"),
3853 kdu_instance=kdu_instance,
3854 primitive_name=primitive_name,
3855 params=params, db_dict=db_dict,
3856 timeout=timeout_ns_action),
3857 timeout=timeout_ns_action)
3858
3859 if detailed_status:
3860 nslcmop_operation_state = 'COMPLETED'
3861 else:
3862 detailed_status = ''
3863 nslcmop_operation_state = 'FAILED'
3864 else:
3865 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3866 member_vnf_index=vnf_index,
3867 vdu_id=vdu_id,
3868 vdu_count_index=vdu_count_index,
3869 ee_descriptor_id=ee_descriptor_id)
3870 db_nslcmop_notif = {"collection": "nslcmops",
3871 "filter": {"_id": nslcmop_id},
3872 "path": "admin.VCA"}
3873 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3874 ee_id,
3875 primitive=primitive_name,
3876 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3877 timeout=timeout_ns_action,
3878 vca_type=vca_type,
3879 db_dict=db_nslcmop_notif)
3880
3881 db_nslcmop_update["detailed-status"] = detailed_status
3882 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3883 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3884 detailed_status))
3885 return # database update is called inside finally
3886
3887 except (DbException, LcmException, N2VCException, K8sException) as e:
3888 self.logger.error(logging_text + "Exit Exception {}".format(e))
3889 exc = e
3890 except asyncio.CancelledError:
3891 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3892 exc = "Operation was cancelled"
3893 except asyncio.TimeoutError:
3894 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3895 exc = "Timeout"
3896 except Exception as e:
3897 exc = traceback.format_exc()
3898 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3899 finally:
3900 if exc:
3901 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3902 "FAILED {}: {}".format(step, exc)
3903 nslcmop_operation_state = "FAILED"
3904 if db_nsr:
3905 self._write_ns_status(
3906 nsr_id=nsr_id,
3907 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3908 current_operation="IDLE",
3909 current_operation_id=None,
3910 # error_description=error_description_nsr,
3911 # error_detail=error_detail,
3912 other_update=db_nsr_update
3913 )
3914
3915 self._write_op_status(
3916 op_id=nslcmop_id,
3917 stage="",
3918 error_message=error_description_nslcmop,
3919 operation_state=nslcmop_operation_state,
3920 other_update=db_nslcmop_update,
3921 )
3922
3923 if nslcmop_operation_state:
3924 try:
3925 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3926 "operationState": nslcmop_operation_state},
3927 loop=self.loop)
3928 except Exception as e:
3929 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3930 self.logger.debug(logging_text + "Exit")
3931 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3932 return nslcmop_operation_state, detailed_status
3933
3934 async def scale(self, nsr_id, nslcmop_id):
3935
3936 # Try to lock HA task here
3937 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3938 if not task_is_locked_by_me:
3939 return
3940
3941 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3942 self.logger.debug(logging_text + "Enter")
3943 # get all needed from database
3944 db_nsr = None
3945 db_nslcmop = None
3946 db_nslcmop_update = {}
3947 nslcmop_operation_state = None
3948 db_nsr_update = {}
3949 exc = None
3950 # in case of error, indicates which part of the scale process failed, to set the nsr error status
3951 scale_process = None
3952 old_operational_status = ""
3953 old_config_status = ""
3954 try:
3955 # wait for any previous tasks in process
3956 step = "Waiting for previous operations to terminate"
3957 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3958
3959 self._write_ns_status(
3960 nsr_id=nsr_id,
3961 ns_state=None,
3962 current_operation="SCALING",
3963 current_operation_id=nslcmop_id
3964 )
3965
3966 step = "Getting nslcmop from database"
3967 self.logger.debug(step + " after having waited for previous tasks to be completed")
3968 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3969 step = "Getting nsr from database"
3970 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3971
3972 old_operational_status = db_nsr["operational-status"]
3973 old_config_status = db_nsr["config-status"]
3974 step = "Parsing scaling parameters"
3975 # self.logger.debug(step)
3976 db_nsr_update["operational-status"] = "scaling"
3977 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3978 nsr_deployed = db_nsr["_admin"].get("deployed")
3979
3980 #######
3981 nsr_deployed = db_nsr["_admin"].get("deployed")
3982 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3983 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3984 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3985 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3986 #######
3987
3988 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3989 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3990 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3991 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3992 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3993
3994 # for backward compatibility
3995 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3996 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3997 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3998 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3999
4000 step = "Getting vnfr from database"
4001 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
4002 step = "Getting vnfd from database"
4003 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4004
4005 step = "Getting scaling-group-descriptor"
4006 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
4007 if scaling_descriptor["name"] == scaling_group:
4008 break
4009 else:
4010 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
4011 "at vnfd:scaling-group-descriptor".format(scaling_group))
4012
4013 # cooldown_time = 0
4014 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
4015 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
4016 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
4017 # break
4018
4019 # TODO check if ns is in a proper status
4020 step = "Sending scale order to VIM"
4021 nb_scale_op = 0
4022 if not db_nsr["_admin"].get("scaling-group"):
4023 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
4024 admin_scale_index = 0
4025 else:
4026 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
4027 if admin_scale_info["name"] == scaling_group:
4028 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
4029 break
4030 else:  # not found: set index to one past the last element and add a new entry with the name
4031 admin_scale_index += 1
4032 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
4033 RO_scaling_info = []
4034 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
4035 if scaling_type == "SCALE_OUT":
4036 # count if max-instance-count is reached
4037 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
4038 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
4039 if nb_scale_op >= max_instance_count:
4040 raise LcmException("reached the limit of {} (max-instance-count) "
4041 "scaling-out operations for the "
4042 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4043
4044 nb_scale_op += 1
4045 vdu_scaling_info["scaling_direction"] = "OUT"
4046 vdu_scaling_info["vdu-create"] = {}
4047 for vdu_scale_info in scaling_descriptor["vdu"]:
4048 vdud = next(vdu for vdu in db_vnfd.get("vdu") if vdu["id"] == vdu_scale_info["vdu-id-ref"])
4049 vdu_index = len([x for x in db_vnfr.get("vdur", ())
4050 if x.get("vdu-id-ref") == vdu_scale_info["vdu-id-ref"] and
4051 x.get("member-vnf-index-ref") == vnf_index])
4052 cloud_init_text = self._get_cloud_init(vdud, db_vnfd)
4053 if cloud_init_text:
4054 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
4055 cloud_init_list = []
4056 for x in range(vdu_scale_info.get("count", 1)):
4057 if cloud_init_text:
4058 # TODO Information of its own ip is not available because db_vnfr is not updated.
4059 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu_scale_info["vdu-id-ref"],
4060 vdu_index + x)
4061 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params,
4062 db_vnfd["id"], vdud["id"]))
4063 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4064 "type": "create", "count": vdu_scale_info.get("count", 1)})
4065 if cloud_init_list:
4066 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
4067 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4068
4069 elif scaling_type == "SCALE_IN":
4070 # count if min-instance-count is reached
4071 min_instance_count = 0
4072 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
4073 min_instance_count = int(scaling_descriptor["min-instance-count"])
4074 if nb_scale_op <= min_instance_count:
4075 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
4076 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4077 nb_scale_op -= 1
4078 vdu_scaling_info["scaling_direction"] = "IN"
4079 vdu_scaling_info["vdu-delete"] = {}
4080 for vdu_scale_info in scaling_descriptor["vdu"]:
4081 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4082 "type": "delete", "count": vdu_scale_info.get("count", 1)})
4083 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4084
4085 # update vdu_scaling_info with the ip addresses of the VDUs to be deleted
4086 vdu_create = vdu_scaling_info.get("vdu-create")
4087 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
4088 if vdu_scaling_info["scaling_direction"] == "IN":
4089 for vdur in reversed(db_vnfr["vdur"]):
4090 if vdu_delete.get(vdur["vdu-id-ref"]):
4091 vdu_delete[vdur["vdu-id-ref"]] -= 1
4092 vdu_scaling_info["vdu"].append({
4093 "name": vdur["name"],
4094 "vdu_id": vdur["vdu-id-ref"],
4095 "interface": []
4096 })
4097 for interface in vdur["interfaces"]:
4098 vdu_scaling_info["vdu"][-1]["interface"].append({
4099 "name": interface["name"],
4100 "ip_address": interface["ip-address"],
4101 "mac_address": interface.get("mac-address"),
4102 })
4103 vdu_delete = vdu_scaling_info.pop("vdu-delete")
4104
4105 # PRE-SCALE BEGIN
4106 step = "Executing pre-scale vnf-config-primitive"
4107 if scaling_descriptor.get("scaling-config-action"):
4108 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4109 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
4110 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
4111 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4112 step = db_nslcmop_update["detailed-status"] = \
4113 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
4114
4115 # look for primitive
4116 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4117 if config_primitive["name"] == vnf_config_primitive:
4118 break
4119 else:
4120 raise LcmException(
4121 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4122 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4123 "primitive".format(scaling_group, vnf_config_primitive))
4124
4125 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4126 if db_vnfr.get("additionalParamsForVnf"):
4127 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4128
4129 scale_process = "VCA"
4130 db_nsr_update["config-status"] = "configuring pre-scaling"
4131 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4132
4133 # Pre-scale retry check: Check if this sub-operation has been executed before
4134 op_index = self._check_or_add_scale_suboperation(
4135 db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
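# _check_or_add_scale_suboperation returns SUBOPERATION_STATUS_SKIP when an identical
# sub-operation is found already completed, SUBOPERATION_STATUS_NEW when it has just been
# registered, or the index of a previously registered (not yet completed) sub-operation whose
# stored parameters must be reused. The same pattern is applied below for the RO and
# post-scale sub-operations.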
4136 if op_index == self.SUBOPERATION_STATUS_SKIP:
4137 # Skip sub-operation
4138 result = 'COMPLETED'
4139 result_detail = 'Done'
4140 self.logger.debug(logging_text +
4141 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4142 vnf_config_primitive, result, result_detail))
4143 else:
4144 if op_index == self.SUBOPERATION_STATUS_NEW:
4145 # New sub-operation: Get index of this sub-operation
4146 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4147 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4148 format(vnf_config_primitive))
4149 else:
4150 # retry: Get registered params for this existing sub-operation
4151 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4152 vnf_index = op.get('member_vnf_index')
4153 vnf_config_primitive = op.get('primitive')
4154 primitive_params = op.get('primitive_params')
4155 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4156 format(vnf_config_primitive))
4157 # Execute the primitive, either with new (first-time) or registered (retried) args
4158 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4159 primitive_name = config_primitive.get("execution-environment-primitive",
4160 vnf_config_primitive)
4161 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4162 member_vnf_index=vnf_index,
4163 vdu_id=None,
4164 vdu_count_index=None,
4165 ee_descriptor_id=ee_descriptor_id)
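# Roughly: _look_for_deployed_vca selects, among the entries of nsr_deployed["VCA"], the
# execution environment that matches this member-vnf-index (vdu_id/vdu_count_index are None
# because pre/post-scale primitives are VNF-level) and, when declared, the primitive's
# execution-environment-ref; it returns the ee_id and vca_type that _ns_execute_primitive needs.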
4166 result, result_detail = await self._ns_execute_primitive(
4167 ee_id, primitive_name, primitive_params, vca_type)
4168 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4169 vnf_config_primitive, result, result_detail))
4170 # Update operationState = COMPLETED | FAILED
4171 self._update_suboperation_status(
4172 db_nslcmop, op_index, result, result_detail)
4173
4174 if result == "FAILED":
4175 raise LcmException(result_detail)
4176 db_nsr_update["config-status"] = old_config_status
4177 scale_process = None
4178 # PRE-SCALE END
4179
4180 # SCALE RO - BEGIN
4181 # Should this block be skipped if 'RO_nsr_id' is None?
4182 # if (RO_nsr_id and RO_scaling_info):
4183 if RO_scaling_info:
4184 scale_process = "RO"
4185 # Scale RO retry check: Check if this sub-operation has been executed before
4186 op_index = self._check_or_add_scale_suboperation(
4187 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
4188 if op_index == self.SUBOPERATION_STATUS_SKIP:
4189 # Skip sub-operation
4190 result = 'COMPLETED'
4191 result_detail = 'Done'
4192 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
4193 result, result_detail))
4194 else:
4195 if op_index == self.SUBOPERATION_STATUS_NEW:
4196 # New sub-operation: Get index of this sub-operation
4197 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4198 self.logger.debug(logging_text + "New sub-operation RO")
4199 else:
4200 # retry: Get registered params for this existing sub-operation
4201 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4202 RO_nsr_id = op.get('RO_nsr_id')
4203 RO_scaling_info = op.get('RO_scaling_info')
4204 self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
4205 vnf_config_primitive))
4206
4207 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
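# The RO "vdu-scaling" action is asynchronous: create_action only launches it on the RO side
# and returns a descriptor whose "instance_action_id" is what the polling loop below tracks.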
4208 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
4209 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
4210 # wait until ready
4211 RO_nslcmop_id = RO_desc["instance_action_id"]
4212 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
4213
4214 RO_task_done = False
4215 step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
4216 detailed_status_old = None
4217 self.logger.debug(logging_text + step)
4218
4219 deployment_timeout = 1 * 3600 # One hour
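# Poll RO every 5 seconds for up to one hour: first wait for the vdu-scaling action to reach
# ACTIVE (check_action_status) and record the created/deleted VDUs in the VNFR (scale_vnfr),
# then keep polling check_ns_status until the ns is ACTIVE again and the VIM reports the
# management IP addresses (ns_update_vnfr), which is what breaks out of the loop.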
4220 while deployment_timeout > 0:
4221 if not RO_task_done:
4222 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
4223 extra_item_id=RO_nslcmop_id)
4224
4225 # deploymentStatus
4226 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4227
4228 ns_status, ns_status_info = self.RO.check_action_status(desc)
4229 if ns_status == "ERROR":
4230 raise ROclient.ROClientException(ns_status_info)
4231 elif ns_status == "BUILD":
4232 detailed_status = step + "; {}".format(ns_status_info)
4233 elif ns_status == "ACTIVE":
4234 RO_task_done = True
4235 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
4236 step = detailed_status = "Waiting for ns to be ready at RO. RO_id={}".format(RO_nsr_id)
4237 self.logger.debug(logging_text + step)
4238 else:
4239 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
4240 else:
4241 desc = await self.RO.show("ns", RO_nsr_id)
4242 ns_status, ns_status_info = self.RO.check_ns_status(desc)
4243 # deploymentStatus
4244 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4245
4246 if ns_status == "ERROR":
4247 raise ROclient.ROClientException(ns_status_info)
4248 elif ns_status == "BUILD":
4249 detailed_status = step + "; {}".format(ns_status_info)
4250 elif ns_status == "ACTIVE":
4251 step = detailed_status = \
4252 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4253 try:
4254 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4255 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
4256 break
4257 except LcmExceptionNoMgmtIP:
4258 pass
4259 else:
4260 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
4261 if detailed_status != detailed_status_old:
4262 self._update_suboperation_status(
4263 db_nslcmop, op_index, 'COMPLETED', detailed_status)
4264 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
4265 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
4266
4267 await asyncio.sleep(5, loop=self.loop)
4268 deployment_timeout -= 5
4269 if deployment_timeout <= 0:
4270 self._update_suboperation_status(
4271 db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
4272 raise ROclient.ROClientException("Timeout waiting for ns to be ready")
4273
4274 # update VDU_SCALING_INFO with the obtained ip_addresses
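# Interpretation of the existing logic, not documented behaviour: db_vnfr["vdur"] is walked in
# reverse on the assumption that newly created VDU records are appended at the end, so the
# entries collected here (and in the SCALE_IN case above) correspond to the most recently
# added/removed instances.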
4275 if vdu_scaling_info["scaling_direction"] == "OUT":
4276 for vdur in reversed(db_vnfr["vdur"]):
4277 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
4278 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
4279 vdu_scaling_info["vdu"].append({
4280 "name": vdur["name"],
4281 "vdu_id": vdur["vdu-id-ref"],
4282 "interface": []
4283 })
4284 for interface in vdur["interfaces"]:
4285 vdu_scaling_info["vdu"][-1]["interface"].append({
4286 "name": interface["name"],
4287 "ip_address": interface["ip-address"],
4288 "mac_address": interface.get("mac-address"),
4289 })
4290 del vdu_scaling_info["vdu-create"]
4291
4292 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
4293 # SCALE RO - END
4294
4295 scale_process = None
4296 if db_nsr_update:
4297 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4298
4299 # POST-SCALE BEGIN
4300 # execute primitive service POST-SCALING
4301 step = "Executing post-scale vnf-config-primitive"
4302 if scaling_descriptor.get("scaling-config-action"):
4303 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4304 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4305 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4306 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4307 step = db_nslcmop_update["detailed-status"] = \
4308 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4309
4310 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4311 if db_vnfr.get("additionalParamsForVnf"):
4312 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4313
4314 # look for primitive
4315 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4316 if config_primitive["name"] == vnf_config_primitive:
4317 break
4318 else:
4319 raise LcmException(
4320 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4321 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4322 "config-primitive".format(scaling_group, vnf_config_primitive))
4323 scale_process = "VCA"
4324 db_nsr_update["config-status"] = "configuring post-scaling"
4325 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4326
4327 # Post-scale retry check: Check if this sub-operation has been executed before
4328 op_index = self._check_or_add_scale_suboperation(
4329 db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4330 if op_index == self.SUBOPERATION_STATUS_SKIP:
4331 # Skip sub-operation
4332 result = 'COMPLETED'
4333 result_detail = 'Done'
4334 self.logger.debug(logging_text +
4335 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4336 format(vnf_config_primitive, result, result_detail))
4337 else:
4338 if op_index == self.SUBOPERATION_STATUS_NEW:
4339 # New sub-operation: Get index of this sub-operation
4340 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4341 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4342 format(vnf_config_primitive))
4343 else:
4344 # retry: Get registered params for this existing sub-operation
4345 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4346 vnf_index = op.get('member_vnf_index')
4347 vnf_config_primitive = op.get('primitive')
4348 primitive_params = op.get('primitive_params')
4349 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4350 format(vnf_config_primitive))
4351 # Execute the primitive, either with new (first-time) or registered (retried) args
4352 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4353 primitive_name = config_primitive.get("execution-environment-primitive",
4354 vnf_config_primitive)
4355 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4356 member_vnf_index=vnf_index,
4357 vdu_id=None,
4358 vdu_count_index=None,
4359 ee_descriptor_id=ee_descriptor_id)
4360 result, result_detail = await self._ns_execute_primitive(
4361 ee_id, primitive_name, primitive_params, vca_type)
4362 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4363 vnf_config_primitive, result, result_detail))
4364 # Update operationState = COMPLETED | FAILED
4365 self._update_suboperation_status(
4366 db_nslcmop, op_index, result, result_detail)
4367
4368 if result == "FAILED":
4369 raise LcmException(result_detail)
4370 db_nsr_update["config-status"] = old_config_status
4371 scale_process = None
4372 # POST-SCALE END
4373
4374 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4375 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4376 else old_operational_status
4377 db_nsr_update["config-status"] = old_config_status
4378 return
4379 except (ROclient.ROClientException, DbException, LcmException) as e:
4380 self.logger.error(logging_text + "Exit Exception {}".format(e))
4381 exc = e
4382 except asyncio.CancelledError:
4383 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4384 exc = "Operation was cancelled"
4385 except Exception as e:
4386 exc = traceback.format_exc()
4387 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4388 finally:
4389 self._write_ns_status(
4390 nsr_id=nsr_id,
4391 ns_state=None,
4392 current_operation="IDLE",
4393 current_operation_id=None
4394 )
4395 if exc:
4396 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4397 nslcmop_operation_state = "FAILED"
4398 if db_nsr:
4399 db_nsr_update["operational-status"] = old_operational_status
4400 db_nsr_update["config-status"] = old_config_status
4401 db_nsr_update["detailed-status"] = ""
4402 if scale_process:
4403 if "VCA" in scale_process:
4404 db_nsr_update["config-status"] = "failed"
4405 if "RO" in scale_process:
4406 db_nsr_update["operational-status"] = "failed"
4407 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4408 exc)
4409 else:
4410 error_description_nslcmop = None
4411 nslcmop_operation_state = "COMPLETED"
4412 db_nslcmop_update["detailed-status"] = "Done"
4413
4414 self._write_op_status(
4415 op_id=nslcmop_id,
4416 stage="",
4417 error_message=error_description_nslcmop,
4418 operation_state=nslcmop_operation_state,
4419 other_update=db_nslcmop_update,
4420 )
4421 if db_nsr:
4422 self._write_ns_status(
4423 nsr_id=nsr_id,
4424 ns_state=None,
4425 current_operation="IDLE",
4426 current_operation_id=None,
4427 other_update=db_nsr_update
4428 )
4429
4430 if nslcmop_operation_state:
4431 try:
4432 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
4433 "operationState": nslcmop_operation_state},
4434 loop=self.loop)
4435 # if cooldown_time:
4436 # await asyncio.sleep(cooldown_time, loop=self.loop)
4437 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4438 except Exception as e:
4439 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4440 self.logger.debug(logging_text + "Exit")
4441 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4442
4443 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
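"""
Register a Prometheus scrape job for this execution environment, provided the VNF package
ships a 'prometheus*.j2' job template among its artifacts.

:param ee_id: execution environment id in the form "<namespace>.<service>"
:param artifact_path: package path where the prometheus job template is searched for
:param ee_config_descriptor: descriptor section providing "metric-service"
:param vnfr_id: used (with dashes removed) as the base of the generated job_name
:param nsr_id: stored in every job as metadata
:param target_ip: value rendered as TARGET_IP in the job template
:return: list of job names added/updated in Prometheus, or None if no job was added
"""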
4444 if not self.prometheus:
4445 return
4446 # look for a file called 'prometheus*.j2' in the artifact folder and use it as the job template
4447 artifact_content = self.fs.dir_ls(artifact_path)
4448 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4449 if not job_file:
4450 return
4451 with self.fs.file_open((artifact_path, job_file), "r") as f:
4452 job_data = f.read()
4453
4454 # TODO get_service
4455 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4456 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4457 host_port = "80"
4458 vnfr_id = vnfr_id.replace("-", "")
4459 variables = {
4460 "JOB_NAME": vnfr_id,
4461 "TARGET_IP": target_ip,
4462 "EXPORTER_POD_IP": host_name,
4463 "EXPORTER_POD_PORT": host_port,
4464 }
4465 job_list = self.prometheus.parse_job(job_data, variables)
4466 # ensure job_name uses the vnfr_id, and add nsr_id as metadata
4467 for job in job_list:
4468 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4469 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4470 job["nsr_id"] = nsr_id
4471 job_dict = {jl["job_name"]: jl for jl in job_list}
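# self.prometheus.update is expected to be truthy only when the jobs were actually stored; in
# that case the job names are returned to the caller, otherwise the method falls through and
# implicitly returns None.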
4472 if await self.prometheus.update(job_dict):
4473 return list(job_dict.keys())
4474
4475 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4476 """
4477 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4478
4479 :param vim_account_id: VIM Account ID
4480
4481 :return: (cloud_name, cloud_credential)
4482 """
4483 config = self.get_vim_account_config(vim_account_id)
4484 return config.get("vca_cloud"), config.get("vca_cloud_credential")
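# Both values come from the optional "config" section of the VIM account and may be None when
# no VCA cloud was registered for this VIM, e.g. (hypothetical values)
#     {"vca_cloud": "openstack-cloud", "vca_cloud_credential": "openstack-cred"}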
4485
4486 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4487 """
4488 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4489
4490 :param vim_account_id: VIM Account ID
4491
4492 :return: (cloud_name, cloud_credential)
4493 """
4494 config = self.get_vim_account_config(vim_account_id)
4495 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
4496
4497 def get_vim_account_config(self, vim_account_id: str) -> dict:
4498 """
4499 Get VIM Account config from the OSM Database
4500
4501 :param vim_account_id: VIM Account ID
4502
4503 :return: Dictionary with the config of the vim account
4504 """
4505 vim_account = self.db.get_one(table="vim_accounts", q_filter={"_id": vim_account_id}, fail_on_empty=False)
4506 return vim_account.get("config", {}) if vim_account else {}