Fix bug introduced in aae391fc:
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_helm3_conn import K8sHelm3Connector
32 from n2vc.k8s_juju_conn import K8sJujuConnector
33
34 from osm_common.dbbase import DbException
35 from osm_common.fsbase import FsException
36
37 from n2vc.n2vc_juju_conn import N2VCJujuConnector
38 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
39
40 from osm_lcm.lcm_helm_conn import LCMHelmConn
41
42 from copy import copy, deepcopy
43 from http import HTTPStatus
44 from time import time
45 from uuid import uuid4
46
47 from random import randint
48
49 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
50
51
class NsLcm(LcmBase):
    """Lifecycle manager for Network Services (NS): keeps nsr/vnfr database records in sync
    with RO and VCA (juju/helm) deployments."""

    # Time for charm from first time at blocked,error status to mark as failed
    timeout_vca_on_error = 5 * 60
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

    # sentinel results used when looking up sub-operations (negative: not a list index)
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
64
    def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database connection
        :param msg: message bus client
        :param fs: filesystem storage client
        :param lcm_tasks: task registry shared with the LCM main module
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :param loop: asyncio event loop shared by all connectors
        :param prometheus: optional prometheus helper (stored, used elsewhere)
        :return: None
        """
        super().__init__(
            db=db,
            msg=msg,
            fs=fs,
            logger=logging.getLogger('lcm.ns')
        )

        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        # 'ng' flag selects the new-generation RO client at the bottom of this method
        self.ng_ro = config["ro_config"].get("ng")
        self.vca_config = config["VCA"].copy()

        # create N2VC connector (juju-based VCA, used for proxy/native/k8s-proxy charms)
        self.n2vc = N2VCJujuConnector(
            db=self.db,
            fs=self.fs,
            log=self.logger,
            loop=self.loop,
            url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
            username=self.vca_config.get('user', None),
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db
        )

        # connector for helm-based execution environments (url/username resolved later, hence None)
        self.conn_helm_ee = LCMHelmConn(
            db=self.db,
            fs=self.fs,
            log=self.logger,
            loop=self.loop,
            url=None,
            username=None,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db
        )

        # k8s cluster connectors: helm v2, helm v3 and juju-bundle
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            loop=self.loop,
            on_update_db=None,
            vca_config=self.vca_config,
        )

        # map kdu/deployment type -> k8s connector
        # NOTE(review): "chart" maps to the helm v3 connector while "helm-chart" maps to
        # v2 -- confirm this asymmetry is intended
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # map charm / execution-environment type -> VCA connector
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee
        }

        self.prometheus = prometheus

        # create RO client
        if self.ng_ro:
            self.RO = NgRoClient(self.loop, **self.ro_config)
        else:
            self.RO = ROclient.ROClient(self.loop, **self.ro_config)
160
161 @staticmethod
162 def increment_ip_mac(ip_mac, vm_index=1):
163 if not isinstance(ip_mac, str):
164 return ip_mac
165 try:
166 # try with ipv4 look for last dot
167 i = ip_mac.rfind(".")
168 if i > 0:
169 i += 1
170 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
171 # try with ipv6 or mac look for last colon. Operate in hex
172 i = ip_mac.rfind(":")
173 if i > 0:
174 i += 1
175 # format in hex, len can be 2 for mac or 4 for ipv6
176 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
177 except Exception:
178 pass
179 return None
180
181 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
182
183 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
184
185 try:
186 # TODO filter RO descriptor fields...
187
188 # write to database
189 db_dict = dict()
190 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
191 db_dict['deploymentStatus'] = ro_descriptor
192 self.update_db_2("nsrs", nsrs_id, db_dict)
193
194 except Exception as e:
195 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
196
197 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
198
199 # remove last dot from path (if exists)
200 if path.endswith('.'):
201 path = path[:-1]
202
203 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
204 # .format(table, filter, path, updated_data))
205
206 try:
207
208 nsr_id = filter.get('_id')
209
210 # read ns record from database
211 nsr = self.db.get_one(table='nsrs', q_filter=filter)
212 current_ns_status = nsr.get('nsState')
213
214 # get vca status for NS
215 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
216
217 # vcaStatus
218 db_dict = dict()
219 db_dict['vcaStatus'] = status_dict
220
221 # update configurationStatus for this VCA
222 try:
223 vca_index = int(path[path.rfind(".")+1:])
224
225 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
226 vca_status = vca_list[vca_index].get('status')
227
228 configuration_status_list = nsr.get('configurationStatus')
229 config_status = configuration_status_list[vca_index].get('status')
230
231 if config_status == 'BROKEN' and vca_status != 'failed':
232 db_dict['configurationStatus'][vca_index] = 'READY'
233 elif config_status != 'BROKEN' and vca_status == 'failed':
234 db_dict['configurationStatus'][vca_index] = 'BROKEN'
235 except Exception as e:
236 # not update configurationStatus
237 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
238
239 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
240 # if nsState = 'DEGRADED' check if all is OK
241 is_degraded = False
242 if current_ns_status in ('READY', 'DEGRADED'):
243 error_description = ''
244 # check machines
245 if status_dict.get('machines'):
246 for machine_id in status_dict.get('machines'):
247 machine = status_dict.get('machines').get(machine_id)
248 # check machine agent-status
249 if machine.get('agent-status'):
250 s = machine.get('agent-status').get('status')
251 if s != 'started':
252 is_degraded = True
253 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
254 # check machine instance status
255 if machine.get('instance-status'):
256 s = machine.get('instance-status').get('status')
257 if s != 'running':
258 is_degraded = True
259 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
260 # check applications
261 if status_dict.get('applications'):
262 for app_id in status_dict.get('applications'):
263 app = status_dict.get('applications').get(app_id)
264 # check application status
265 if app.get('status'):
266 s = app.get('status').get('status')
267 if s != 'active':
268 is_degraded = True
269 error_description += 'application {} status={} ; '.format(app_id, s)
270
271 if error_description:
272 db_dict['errorDescription'] = error_description
273 if current_ns_status == 'READY' and is_degraded:
274 db_dict['nsState'] = 'DEGRADED'
275 if current_ns_status == 'DEGRADED' and not is_degraded:
276 db_dict['nsState'] = 'READY'
277
278 # write to database
279 self.update_db_2("nsrs", nsr_id, db_dict)
280
281 except (asyncio.CancelledError, asyncio.TimeoutError):
282 raise
283 except Exception as e:
284 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
285
286 @staticmethod
287 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
288 try:
289 env = Environment(undefined=StrictUndefined)
290 template = env.from_string(cloud_init_text)
291 return template.render(additional_params or {})
292 except UndefinedError as e:
293 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
294 "file, must be provided in the instantiation parameters inside the "
295 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
296 except (TemplateError, TemplateNotFound) as e:
297 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
298 format(vnfd_id, vdu_id, e))
299
300 def _get_cloud_init(self, vdu, vnfd):
301 try:
302 cloud_init_content = cloud_init_file = None
303 if vdu.get("cloud-init-file"):
304 base_folder = vnfd["_admin"]["storage"]
305 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
306 vdu["cloud-init-file"])
307 with self.fs.file_open(cloud_init_file, "r") as ci_file:
308 cloud_init_content = ci_file.read()
309 elif vdu.get("cloud-init"):
310 cloud_init_content = vdu["cloud-init"]
311
312 return cloud_init_content
313 except FsException as e:
314 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
315 format(vnfd["id"], vdu["id"], cloud_init_file, e))
316
317 def _get_osm_params(self, db_vnfr, vdu_id=None, vdu_count_index=0):
318 osm_params = {x.replace("-", "_"): db_vnfr[x] for x in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
319 if db_vnfr.get(x) is not None}
320 osm_params["ns_id"] = db_vnfr["nsr-id-ref"]
321 osm_params["vnf_id"] = db_vnfr["_id"]
322 osm_params["member_vnf_index"] = db_vnfr["member-vnf-index-ref"]
323 if db_vnfr.get("vdur"):
324 osm_params["vdu"] = {}
325 for vdur in db_vnfr["vdur"]:
326 vdu = {
327 "count_index": vdur["count-index"],
328 "vdu_id": vdur["vdu-id-ref"],
329 "interfaces": {}
330 }
331 if vdur.get("ip-address"):
332 vdu["ip_address"] = vdur["ip-address"]
333 for iface in vdur["interfaces"]:
334 vdu["interfaces"][iface["name"]] = \
335 {x.replace("-", "_"): iface[x] for x in ("mac-address", "ip-address", "vnf-vld-id", "name")
336 if iface.get(x) is not None}
337 vdu_id_index = "{}-{}".format(vdur["vdu-id-ref"], vdur["count-index"])
338 osm_params["vdu"][vdu_id_index] = vdu
339 if vdu_id:
340 osm_params["vdu_id"] = vdu_id
341 osm_params["count_index"] = vdu_count_index
342 return osm_params
343
344 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
345 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
346 additional_params = vdur.get("additionalParams")
347 return self._format_additional_params(additional_params)
348
349 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
350 """
351 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
352 :param vnfd: input vnfd
353 :param new_id: overrides vnf id if provided
354 :param additionalParams: Instantiation params for VNFs provided
355 :param nsrId: Id of the NSR
356 :return: copy of vnfd
357 """
358 vnfd_RO = deepcopy(vnfd)
359 # remove unused by RO configuration, monitoring, scaling and internal keys
360 vnfd_RO.pop("_id", None)
361 vnfd_RO.pop("_admin", None)
362 vnfd_RO.pop("vnf-configuration", None)
363 vnfd_RO.pop("monitoring-param", None)
364 vnfd_RO.pop("scaling-group-descriptor", None)
365 vnfd_RO.pop("kdu", None)
366 vnfd_RO.pop("k8s-cluster", None)
367 if new_id:
368 vnfd_RO["id"] = new_id
369
370 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
371 for vdu in get_iterable(vnfd_RO, "vdu"):
372 vdu.pop("cloud-init-file", None)
373 vdu.pop("cloud-init", None)
374 return vnfd_RO
375
376 @staticmethod
377 def ip_profile_2_RO(ip_profile):
378 RO_ip_profile = deepcopy(ip_profile)
379 if "dns-server" in RO_ip_profile:
380 if isinstance(RO_ip_profile["dns-server"], list):
381 RO_ip_profile["dns-address"] = []
382 for ds in RO_ip_profile.pop("dns-server"):
383 RO_ip_profile["dns-address"].append(ds['address'])
384 else:
385 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
386 if RO_ip_profile.get("ip-version") == "ipv4":
387 RO_ip_profile["ip-version"] = "IPv4"
388 if RO_ip_profile.get("ip-version") == "ipv6":
389 RO_ip_profile["ip-version"] = "IPv6"
390 if "dhcp-params" in RO_ip_profile:
391 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
392 return RO_ip_profile
393
    def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
        """
        Creates a RO ns descriptor from OSM ns_instantiate params
        :param ns_params: OSM instantiate params
        :param nsd: NS descriptor, used to resolve constituent-vnfd / member-vnf-index
        :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
        :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
        :param n2vc_key_list: ssh public keys injected into vdus that need management access
        :return: The RO ns descriptor
        """
        # caches so each vim/wim account is resolved against the database only once
        vim_2_RO = {}
        wim_2_RO = {}
        # TODO feature 1417: Check that no instantiation is set over PDU
        # check if PDU forces a concrete vim-network-id and add it
        # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

        def vim_account_2_RO(vim_account):
            # resolve (and cache) the RO datacenter id of an OSM vim account
            if vim_account in vim_2_RO:
                return vim_2_RO[vim_account]

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
            if db_vim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("VIM={} is not available. operationalState={}".format(
                    vim_account, db_vim["_admin"]["operationalState"]))
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            vim_2_RO[vim_account] = RO_vim_id
            return RO_vim_id

        def wim_account_2_RO(wim_account):
            # resolve (and cache) the RO account id of an OSM wim account; non-string
            # values (e.g. None) are passed through unchanged
            if isinstance(wim_account, str):
                if wim_account in wim_2_RO:
                    return wim_2_RO[wim_account]

                db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
                if db_wim["_admin"]["operationalState"] != "ENABLED":
                    raise LcmException("WIM={} is not available. operationalState={}".format(
                        wim_account, db_wim["_admin"]["operationalState"]))
                RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
                wim_2_RO[wim_account] = RO_wim_id
                return RO_wim_id
            else:
                return wim_account

        if not ns_params:
            return None
        RO_ns_params = {
            # "name": ns_params["nsName"],
            # "description": ns_params.get("nsDescription"),
            "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
            "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
            # "scenario": ns_params["nsdId"],
        }
        # set vim_account of each vnf if different from general vim_account.
        # Get this information from <vnfr> database content, key vim-account-id
        # Vim account can be set by placement_engine and it may be different from
        # the instantiate parameters (vnfs.member-vnf-index.datacenter).
        for vnf_index, vnfr in db_vnfrs.items():
            if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
                populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))

        n2vc_key_list = n2vc_key_list or []
        for vnfd_ref, vnfd in vnfd_dict.items():
            # determine which vdus need the VCA ssh keys injected (mgmt access)
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
                if ssh_required and vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                    if ssh_required:
                        vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # a vdu wired to the vnf mgmt connection point also gets the keys
                    for vdu_interface in vdu.get("interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                for vnf_member in nsd.get("constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)
            # cloud init
            for vdu in get_iterable(vnfd, "vdu"):
                cloud_init_text = self._get_cloud_init(vdu, vnfd)
                if not cloud_init_text:
                    continue
                for vnf_member in nsd.get("constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    db_vnfr = db_vnfrs[vnf_member["member-vnf-index"]]
                    additional_params = self._get_vdu_additional_params(db_vnfr, vdu["id"]) or {}

                    # one rendered cloud-init per vdu replica (the OSM block differs per replica)
                    cloud_init_list = []
                    for vdu_index in range(0, int(vdu.get("count", 1))):
                        additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu["id"], vdu_index)
                        cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, vnfd["id"],
                                                                      vdu["id"]))
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu["id"], "cloud_init"),
                                  cloud_init_list)

        if ns_params.get("vduImage"):
            RO_ns_params["vduImage"] = ns_params["vduImage"]

        if ns_params.get("ssh_keys"):
            RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
        # per-vnf instantiation parameters (volumes, fixed ip/mac, internal vlds)
        for vnf_params in get_iterable(ns_params, "vnf"):
            for constituent_vnfd in nsd["constituent-vnfd"]:
                if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                    vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                    break
            else:
                raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                                   "constituent-vnfd".format(vnf_params["member-vnf-index"]))

            for vdu_params in get_iterable(vnf_params, "vdu"):
                # TODO feature 1417: check that this VDU exist and it is not a PDU
                if vdu_params.get("volume"):
                    for volume_params in vdu_params["volume"]:
                        if volume_params.get("vim-volume-id"):
                            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                         vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                          volume_params["vim-volume-id"])
                if vdu_params.get("interface"):
                    for interface_params in vdu_params["interface"]:
                        if interface_params.get("ip-address"):
                            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                         vdu_params["id"], "interfaces", interface_params["name"],
                                                         "ip_address"),
                                          interface_params["ip-address"])
                        if interface_params.get("mac-address"):
                            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                         vdu_params["id"], "interfaces", interface_params["name"],
                                                         "mac_address"),
                                          interface_params["mac-address"])
                        if interface_params.get("floating-ip-required"):
                            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                         vdu_params["id"], "interfaces", interface_params["name"],
                                                         "floating-ip"),
                                          interface_params["floating-ip-required"])

            for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
                if internal_vld_params.get("vim-network-name"):
                    populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                                 internal_vld_params["name"], "vim-network-name"),
                                  internal_vld_params["vim-network-name"])
                if internal_vld_params.get("vim-network-id"):
                    populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                                 internal_vld_params["name"], "vim-network-id"),
                                  internal_vld_params["vim-network-id"])
                if internal_vld_params.get("ip-profile"):
                    populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                                 internal_vld_params["name"], "ip-profile"),
                                  self.ip_profile_2_RO(internal_vld_params["ip-profile"]))
                if internal_vld_params.get("provider-network"):

                    populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                                 internal_vld_params["name"], "provider-network"),
                                  internal_vld_params["provider-network"].copy())

                for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                    # look for interface
                    iface_found = False
                    for vdu_descriptor in vnf_descriptor["vdu"]:
                        for vdu_interface in vdu_descriptor["interface"]:
                            if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                                if icp_params.get("ip-address"):
                                    populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                                 vdu_descriptor["id"], "interfaces",
                                                                 vdu_interface["name"], "ip_address"),
                                                  icp_params["ip-address"])

                                if icp_params.get("mac-address"):
                                    populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                                 vdu_descriptor["id"], "interfaces",
                                                                 vdu_interface["name"], "mac_address"),
                                                  icp_params["mac-address"])
                                iface_found = True
                                break
                        if iface_found:
                            break
                    else:
                        # for/else: no vdu interface referenced this connection point
                        raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                           "internal-vld:id-ref={} is not present at vnfd:internal-"
                                           "connection-point".format(vnf_params["member-vnf-index"],
                                                                     icp_params["id-ref"]))

        # ns-level vld instantiation parameters
        for vld_params in get_iterable(ns_params, "vld"):
            if "ip-profile" in vld_params:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                              self.ip_profile_2_RO(vld_params["ip-profile"]))

            if vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                              vld_params["provider-network"].copy())

            if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
                # NOTE(review): the trailing comma below makes this statement build a
                # throwaway 1-tuple; harmless at runtime but probably a typo
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                              wim_account_2_RO(vld_params["wimAccountId"])),
            if vld_params.get("vim-network-name"):
                RO_vld_sites = []
                if isinstance(vld_params["vim-network-name"], dict):
                    # per-vim mapping {vim_account: network name}
                    for vim_account, vim_net in vld_params["vim-network-name"].items():
                        RO_vld_sites.append({
                            "netmap-use": vim_net,
                            "datacenter": vim_account_2_RO(vim_account)
                        })
                else:  # isinstance str
                    RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
                if RO_vld_sites:
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

            if vld_params.get("vim-network-id"):
                RO_vld_sites = []
                if isinstance(vld_params["vim-network-id"], dict):
                    # per-vim mapping {vim_account: network id}
                    for vim_account, vim_net in vld_params["vim-network-id"].items():
                        RO_vld_sites.append({
                            "netmap-use": vim_net,
                            "datacenter": vim_account_2_RO(vim_account)
                        })
                else:  # isinstance str
                    RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
                if RO_vld_sites:
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
            if vld_params.get("ns-net"):
                if isinstance(vld_params["ns-net"], dict):
                    for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                        RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                        populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
            if "vnfd-connection-point-ref" in vld_params:
                for cp_params in vld_params["vnfd-connection-point-ref"]:
                    # look for interface
                    for constituent_vnfd in nsd["constituent-vnfd"]:
                        if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                            vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                            break
                    else:
                        raise LcmException(
                            "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                            "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                    match_cp = False
                    for vdu_descriptor in vnf_descriptor["vdu"]:
                        for interface_descriptor in vdu_descriptor["interface"]:
                            if interface_descriptor.get("external-connection-point-ref") == \
                                    cp_params["vnfd-connection-point-ref"]:
                                match_cp = True
                                break
                        if match_cp:
                            break
                    else:
                        raise LcmException(
                            "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                            "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                                cp_params["member-vnf-index-ref"],
                                cp_params["vnfd-connection-point-ref"],
                                vnf_descriptor["id"]))
                    if cp_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                     vdu_descriptor["id"], "interfaces",
                                                     interface_descriptor["name"], "ip_address"),
                                      cp_params["ip-address"])
                    if cp_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                     vdu_descriptor["id"], "interfaces",
                                                     interface_descriptor["name"], "mac_address"),
                                      cp_params["mac-address"])
        return RO_ns_params
672
673 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
674
675 db_vdu_push_list = []
676 db_update = {"_admin.modified": time()}
677 if vdu_create:
678 for vdu_id, vdu_count in vdu_create.items():
679 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
680 if not vdur:
681 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
682 format(vdu_id))
683
684 for count in range(vdu_count):
685 vdur_copy = deepcopy(vdur)
686 vdur_copy["status"] = "BUILD"
687 vdur_copy["status-detailed"] = None
688 vdur_copy["ip-address"]: None
689 vdur_copy["_id"] = str(uuid4())
690 vdur_copy["count-index"] += count + 1
691 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
692 vdur_copy.pop("vim_info", None)
693 for iface in vdur_copy["interfaces"]:
694 if iface.get("fixed-ip"):
695 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
696 else:
697 iface.pop("ip-address", None)
698 if iface.get("fixed-mac"):
699 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
700 else:
701 iface.pop("mac-address", None)
702 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
703 db_vdu_push_list.append(vdur_copy)
704 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
705 if vdu_delete:
706 for vdu_id, vdu_count in vdu_delete.items():
707 if mark_delete:
708 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
709 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
710 else:
711 # it must be deleted one by one because common.db does not allow otherwise
712 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
713 for vdu in vdus_to_delete[:vdu_count]:
714 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
715 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
716 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
717 # modify passed dictionary db_vnfr
718 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
719 db_vnfr["vdur"] = db_vnfr_["vdur"]
720
721 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
722 """
723 Updates database nsr with the RO info for the created vld
724 :param ns_update_nsr: dictionary to be filled with the updated info
725 :param db_nsr: content of db_nsr. This is also modified
726 :param nsr_desc_RO: nsr descriptor from RO
727 :return: Nothing, LcmException is raised on errors
728 """
729
730 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
731 for net_RO in get_iterable(nsr_desc_RO, "nets"):
732 if vld["id"] != net_RO.get("ns_net_osm_id"):
733 continue
734 vld["vim-id"] = net_RO.get("vim_net_id")
735 vld["name"] = net_RO.get("vim_name")
736 vld["status"] = net_RO.get("status")
737 vld["status-detailed"] = net_RO.get("error_msg")
738 ns_update_nsr["vld.{}".format(vld_index)] = vld
739 break
740 else:
741 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
742
743 def set_vnfr_at_error(self, db_vnfrs, error_text):
744 try:
745 for db_vnfr in db_vnfrs.values():
746 vnfr_update = {"status": "ERROR"}
747 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
748 if "status" not in vdur:
749 vdur["status"] = "ERROR"
750 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
751 if error_text:
752 vdur["status-detailed"] = str(error_text)
753 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
754 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
755 except DbException as e:
756 self.logger.error("Cannot update vnf. {}".format(e))
757
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may report several addresses separated by ';' -> keep the first
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # PDUs are not deployed by RO; skip them
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        if vdur["count-index"] != vdur_RO_count_index:
                            # advance through the RO vms of this vdu until the wanted replica
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get("ip_address")
                                    ifacer["mac-address"] = interface_RO.get("mac_address")
                                    break
                            else:
                                # for/else: interface not reported by RO
                                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                                   "from VIM info"
                                                   .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                           "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                            vnf_index, vld["id"]))

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
830
831 def _get_ns_config_info(self, nsr_id):
832 """
833 Generates a mapping between vnf,vdu elements and the N2VC id
834 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
835 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
836 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
837 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
838 """
839 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
840 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
841 mapping = {}
842 ns_config_info = {"osm-config-mapping": mapping}
843 for vca in vca_deployed_list:
844 if not vca["member-vnf-index"]:
845 continue
846 if not vca["vdu_id"]:
847 mapping[vca["member-vnf-index"]] = vca["application"]
848 else:
849 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
850 vca["application"]
851 return ns_config_info
852
853 @staticmethod
854 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
855 """
856 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
857 primitives as verify-ssh-credentials, or config when needed
858 :param desc_primitive_list: information of the descriptor
859 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
860 this element contains a ssh public key
861 :param ee_descriptor_id: execution environment descriptor id. It is the value of
862 XXX_configuration.execution-environment-list.INDEX.id; it can be None
863 :return: The modified list. Can ba an empty list, but always a list
864 """
865
866 primitive_list = desc_primitive_list or []
867
868 # filter primitives by ee_id
869 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
870
871 # sort by 'seq'
872 if primitive_list:
873 primitive_list.sort(key=lambda val: int(val['seq']))
874
875 # look for primitive config, and get the position. None if not present
876 config_position = None
877 for index, primitive in enumerate(primitive_list):
878 if primitive["name"] == "config":
879 config_position = index
880 break
881
882 # for NS, add always a config primitive if not present (bug 874)
883 if not vca_deployed["member-vnf-index"] and config_position is None:
884 primitive_list.insert(0, {"name": "config", "parameter": []})
885 config_position = 0
886 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
887 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
888 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
889 return primitive_list
890
    async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                                 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
        """
        Deploy the NS at the new generation RO: build the 'target' dictionary with all the deployment
        information (ns vlds, vnfs with their vlds and vdus, images, flavors and cloud-init content),
        request the deployment and wait until it is completed.
        :param logging_text: prefix to use at logging
        :param nsr_id: ns record id
        :param nsd: ns descriptor content
        :param db_nsr: database content of the ns record
        :param db_nslcmop: database content of the current operation
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param db_vnfds_ref: database content of vnfds, indexed by id (not _id)
        :param n2vc_key_list: ssh public keys to be injected at management vdus via cloud-init
        :param stage: list with 3 items [general stage, tasks, vim_specific]; item 2 is written here
        :param start_deploy: epoch seconds when the deployment started, used to compute the timeout
        :param timeout_ns_deploy: maximum seconds to wait for the ns deployment
        :return: None. Exceptions are raised on errors
        """

        db_vims = {}  # cache of vim_accounts database content, indexed by vim account id

        def get_vim_account(vim_account_id):
            # return the vim_account database content, reading from database only the first time
            nonlocal db_vims
            if vim_account_id in db_vims:
                return db_vims[vim_account_id]
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
            db_vims[vim_account_id] = db_vim
            return db_vim

        # modify target_vld info with instantiation parameters
        def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
            if vld_params.get("ip-profile"):
                target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
            if vld_params.get("provider-network"):
                target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
                if "sdn-ports" in vld_params["provider-network"] and target_sdn:
                    target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
            if vld_params.get("wimAccountId"):
                target_wim = "wim:{}".format(vld_params["wimAccountId"])
                target_vld["vim_info"][target_wim] = {}
            for param in ("vim-network-name", "vim-network-id"):
                if vld_params.get(param):
                    if isinstance(vld_params[param], dict):
                        pass
                        # for vim_account, vim_net in vld_params[param].items():
                        # TODO populate vim_info RO_vld_sites.append({
                    else:  # isinstance str
                        target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
            # TODO if vld_params.get("ns-net"):

        nslcmop_id = db_nslcmop["_id"]
        target = {
            "name": db_nsr["name"],
            "ns": {"vld": []},
            "vnf": [],
            "image": deepcopy(db_nsr["image"]),
            "flavor": deepcopy(db_nsr["flavor"]),
            "action_id": nslcmop_id,
            "cloud_init_content": {},
        }
        for image in target["image"]:
            image["vim_info"] = {}
        for flavor in target["flavor"]:
            flavor["vim_info"] = {}

        if db_nslcmop.get("lcmOperationType") != "instantiate":
            # get parameters of instantiation:
            db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
                                                                   "lcmOperationType": "instantiate"})[-1]
            ns_params = db_nslcmop_instantiate.get("operationParams")
        else:
            ns_params = db_nslcmop.get("operationParams")
        ssh_keys = []
        if ns_params.get("ssh_keys"):
            ssh_keys += ns_params.get("ssh_keys")
        if n2vc_key_list:
            ssh_keys += n2vc_key_list

        cp2target = {}  # maps "member_vnf:<index>.<cp>" to its ns vld target "nsrs:<nsr_id>:vld.<index>"
        for vld_index, vld in enumerate(db_nsr.get("vld")):
            target_vim = "vim:{}".format(ns_params["vimAccountId"])
            target_vld = {
                "id": vld["id"],
                "name": vld["name"],
                "mgmt-network": vld.get("mgmt-network", False),
                "type": vld.get("type"),
                "vim_info": {
                    target_vim: {"vim-network-name": vld.get("vim-network-name")}
                }
            }
            # check if this network needs SDN assist
            target_sdn = None
            if vld.get("pci-interfaces"):
                db_vim = get_vim_account(ns_params["vimAccountId"])
                sdnc_id = db_vim["config"].get("sdn-controller")
                if sdnc_id:
                    sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
                    target_sdn = "sdn:{}".format(sdnc_id)
                    target_vld["vim_info"][target_sdn] = {
                        "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}

            nsd_vld = next(v for v in nsd["vld"] if v["id"] == vld["id"])
            for cp in nsd_vld["vnfd-connection-point-ref"]:
                cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
                    "nsrs:{}:vld.{}".format(nsr_id, vld_index)

            # check at nsd descriptor, if there is an ip-profile
            vld_params = {}
            if nsd_vld.get("ip-profile-ref"):
                ip_profile = next(ipp for ipp in nsd["ip-profiles"] if ipp["name"] == nsd_vld["ip-profile-ref"])
                vld_params["ip-profile"] = ip_profile["ip-profile-params"]
            # update vld_params with instantiation params
            vld_instantiation_params = next((v for v in get_iterable(ns_params, "vld")
                                             if v["name"] in (vld["name"], vld["id"])), None)
            if vld_instantiation_params:
                vld_params.update(vld_instantiation_params)
            parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn)
            target["ns"]["vld"].append(target_vld)
        for vnfr in db_vnfrs.values():
            vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
            vnf_params = next((v for v in get_iterable(ns_params, "vnf")
                               if v["member-vnf-index"] == vnfr["member-vnf-index-ref"]), None)
            target_vnf = deepcopy(vnfr)
            target_vim = "vim:{}".format(vnfr["vim-account-id"])
            for vld in target_vnf.get("vld", ()):
                # check if connected to a ns.vld, to fill target'
                vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
                               cp.get("internal-vld-ref") == vld["id"]), None)
                if vnf_cp:
                    ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
                    if cp2target.get(ns_cp):
                        vld["target"] = cp2target[ns_cp]
                vld["vim_info"] = {target_vim: {"vim-network-name": vld.get("vim-network-name")}}
                # check if this network needs SDN assist
                target_sdn = None
                if vld.get("pci-interfaces"):
                    db_vim = get_vim_account(vnfr["vim-account-id"])
                    sdnc_id = db_vim["config"].get("sdn-controller")
                    if sdnc_id:
                        sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
                        target_sdn = "sdn:{}".format(sdnc_id)
                        vld["vim_info"][target_sdn] = {
                            "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}

                # check at vnfd descriptor, if there is an ip-profile
                vld_params = {}
                vnfd_vld = next(v for v in vnfd["internal-vld"] if v["id"] == vld["id"])
                if vnfd_vld.get("ip-profile-ref"):
                    ip_profile = next(ipp for ipp in vnfd["ip-profiles"] if ipp["name"] == vnfd_vld["ip-profile-ref"])
                    vld_params["ip-profile"] = ip_profile["ip-profile-params"]
                # update vld_params with instantiation params
                if vnf_params:
                    vld_instantiation_params = next((v for v in get_iterable(vnf_params, "internal-vld")
                                                     if v["name"] == vld["id"]), None)
                    if vld_instantiation_params:
                        vld_params.update(vld_instantiation_params)
                parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)

            vdur_list = []
            for vdur in target_vnf.get("vdur", ()):
                if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
                    continue  # This vdu must not be created
                vdur["vim_info"] = {target_vim: {}}
                vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])

                # mark the vdu for ssh-key injection when the descriptor requires ssh access
                if ssh_keys:
                    if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
                        vdur["ssh-keys"] = ssh_keys
                        vdur["ssh-access-required"] = True
                    elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
                            any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
                        vdur["ssh-keys"] = ssh_keys
                        vdur["ssh-access-required"] = True

                # cloud-init
                if vdud.get("cloud-init-file"):
                    vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
                    # read file and put content at target.cloud_init_content. Avoid ng_ro to use shared package system
                    if vdur["cloud-init"] not in target["cloud_init_content"]:
                        base_folder = vnfd["_admin"]["storage"]
                        cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
                                                                       vdud.get("cloud-init-file"))
                        with self.fs.file_open(cloud_init_file, "r") as ci_file:
                            target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
                elif vdud.get("cloud-init"):
                    vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
                    # put content at target.cloud_init_content. Avoid ng_ro read vnfd descriptor
                    target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
                vdur["additionalParams"] = vdur.get("additionalParams") or {}
                deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
                deploy_params_vdu["OSM"] = self._get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
                vdur["additionalParams"] = deploy_params_vdu

                # flavor
                ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
                if target_vim not in ns_flavor["vim_info"]:
                    ns_flavor["vim_info"][target_vim] = {}
                # image
                ns_image = target["image"][int(vdur["ns-image-id"])]
                if target_vim not in ns_image["vim_info"]:
                    ns_image["vim_info"][target_vim] = {}

                vdur["vim_info"] = {target_vim: {}}
                # instantiation parameters
                # if vnf_params:
                #     vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
                #     vdud["id"]), None)
                vdur_list.append(vdur)
            target_vnf["vdur"] = vdur_list
            target["vnf"].append(target_vnf)

        desc = await self.RO.deploy(nsr_id, target)
        action_id = desc["action_id"]
        await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)

        # Updating NSR
        db_nsr_update = {
            "_admin.deployed.RO.operational-status": "running",
            "detailed-status": " ".join(stage)
        }
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
        return
1100
1101 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
1102 detailed_status_old = None
1103 db_nsr_update = {}
1104 start_time = start_time or time()
1105 while time() <= start_time + timeout:
1106 desc_status = await self.RO.status(nsr_id, action_id)
1107 if desc_status["status"] == "FAILED":
1108 raise NgRoException(desc_status["details"])
1109 elif desc_status["status"] == "BUILD":
1110 if stage:
1111 stage[2] = "VIM: ({})".format(desc_status["details"])
1112 elif desc_status["status"] == "DONE":
1113 if stage:
1114 stage[2] = "Deployed at VIM"
1115 break
1116 else:
1117 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
1118 if stage and nslcmop_id and stage[2] != detailed_status_old:
1119 detailed_status_old = stage[2]
1120 db_nsr_update["detailed-status"] = " ".join(stage)
1121 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1122 self._write_op_status(nslcmop_id, stage)
1123 await asyncio.sleep(5, loop=self.loop)
1124 else: # timeout_ns_deploy
1125 raise NgRoException("Timeout waiting ns to deploy")
1126
1127 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
1128 db_nsr_update = {}
1129 failed_detail = []
1130 action_id = None
1131 start_deploy = time()
1132 try:
1133 target = {
1134 "ns": {"vld": []},
1135 "vnf": [],
1136 "image": [],
1137 "flavor": [],
1138 "action_id": nslcmop_id
1139 }
1140 desc = await self.RO.deploy(nsr_id, target)
1141 action_id = desc["action_id"]
1142 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1143 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1144 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
1145
1146 # wait until done
1147 delete_timeout = 20 * 60 # 20 minutes
1148 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
1149
1150 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1151 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1152 # delete all nsr
1153 await self.RO.delete(nsr_id)
1154 except Exception as e:
1155 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1156 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1157 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1158 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1159 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
1160 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1161 failed_detail.append("delete conflict: {}".format(e))
1162 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
1163 else:
1164 failed_detail.append("delete error: {}".format(e))
1165 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
1166
1167 if failed_detail:
1168 stage[2] = "Error deleting from VIM"
1169 else:
1170 stage[2] = "Deleted from VIM"
1171 db_nsr_update["detailed-status"] = " ".join(stage)
1172 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1173 self._write_op_status(nslcmop_id, stage)
1174
1175 if failed_detail:
1176 raise LcmException("; ".join(failed_detail))
1177 return
1178
1179 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
1180 n2vc_key_list, stage):
1181 """
1182 Instantiate at RO
1183 :param logging_text: preffix text to use at logging
1184 :param nsr_id: nsr identity
1185 :param nsd: database content of ns descriptor
1186 :param db_nsr: database content of ns record
1187 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1188 :param db_vnfrs:
1189 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1190 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1191 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1192 :return: None or exception
1193 """
1194 try:
1195 db_nsr_update = {}
1196 RO_descriptor_number = 0 # number of descriptors created at RO
1197 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
1198 nslcmop_id = db_nslcmop["_id"]
1199 start_deploy = time()
1200 ns_params = db_nslcmop.get("operationParams")
1201 if ns_params and ns_params.get("timeout_ns_deploy"):
1202 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1203 else:
1204 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1205
1206 # Check for and optionally request placement optimization. Database will be updated if placement activated
1207 stage[2] = "Waiting for Placement."
1208 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1209 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1210 for vnfr in db_vnfrs.values():
1211 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1212 break
1213 else:
1214 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1215
1216 if self.ng_ro:
1217 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
1218 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
1219 timeout_ns_deploy)
1220 # deploy RO
1221 # get vnfds, instantiate at RO
1222 for c_vnf in nsd.get("constituent-vnfd", ()):
1223 member_vnf_index = c_vnf["member-vnf-index"]
1224 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
1225 vnfd_ref = vnfd["id"]
1226
1227 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
1228 db_nsr_update["detailed-status"] = " ".join(stage)
1229 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1230 self._write_op_status(nslcmop_id, stage)
1231
1232 # self.logger.debug(logging_text + stage[2])
1233 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
1234 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
1235 RO_descriptor_number += 1
1236
1237 # look position at deployed.RO.vnfd if not present it will be appended at the end
1238 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
1239 if vnf_deployed["member-vnf-index"] == member_vnf_index:
1240 break
1241 else:
1242 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
1243 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1244
1245 # look if present
1246 RO_update = {"member-vnf-index": member_vnf_index}
1247 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
1248 if vnfd_list:
1249 RO_update["id"] = vnfd_list[0]["uuid"]
1250 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1251 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
1252 else:
1253 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
1254 get("additionalParamsForVnf"), nsr_id)
1255 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
1256 RO_update["id"] = desc["uuid"]
1257 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1258 vnfd_ref, member_vnf_index, desc["uuid"]))
1259 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
1260 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
1261
1262 # create nsd at RO
1263 nsd_ref = nsd["id"]
1264
1265 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1266 db_nsr_update["detailed-status"] = " ".join(stage)
1267 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1268 self._write_op_status(nslcmop_id, stage)
1269
1270 # self.logger.debug(logging_text + stage[2])
1271 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
1272 RO_descriptor_number += 1
1273 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
1274 if nsd_list:
1275 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
1276 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
1277 nsd_ref, RO_nsd_uuid))
1278 else:
1279 nsd_RO = deepcopy(nsd)
1280 nsd_RO["id"] = RO_osm_nsd_id
1281 nsd_RO.pop("_id", None)
1282 nsd_RO.pop("_admin", None)
1283 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
1284 member_vnf_index = c_vnf["member-vnf-index"]
1285 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1286 for c_vld in nsd_RO.get("vld", ()):
1287 for cp in c_vld.get("vnfd-connection-point-ref", ()):
1288 member_vnf_index = cp["member-vnf-index-ref"]
1289 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1290
1291 desc = await self.RO.create("nsd", descriptor=nsd_RO)
1292 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1293 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
1294 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
1295 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1296
1297 # Crate ns at RO
1298 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1299 db_nsr_update["detailed-status"] = " ".join(stage)
1300 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1301 self._write_op_status(nslcmop_id, stage)
1302
1303 # if present use it unless in error status
1304 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
1305 if RO_nsr_id:
1306 try:
1307 stage[2] = "Looking for existing ns at RO"
1308 db_nsr_update["detailed-status"] = " ".join(stage)
1309 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1310 self._write_op_status(nslcmop_id, stage)
1311 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1312 desc = await self.RO.show("ns", RO_nsr_id)
1313
1314 except ROclient.ROClientException as e:
1315 if e.http_code != HTTPStatus.NOT_FOUND:
1316 raise
1317 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1318 if RO_nsr_id:
1319 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1320 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1321 if ns_status == "ERROR":
1322 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
1323 self.logger.debug(logging_text + stage[2])
1324 await self.RO.delete("ns", RO_nsr_id)
1325 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1326 if not RO_nsr_id:
1327 stage[2] = "Checking dependencies"
1328 db_nsr_update["detailed-status"] = " ".join(stage)
1329 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1330 self._write_op_status(nslcmop_id, stage)
1331 # self.logger.debug(logging_text + stage[2])
1332
1333 # check if VIM is creating and wait look if previous tasks in process
1334 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
1335 if task_dependency:
1336 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
1337 self.logger.debug(logging_text + stage[2])
1338 await asyncio.wait(task_dependency, timeout=3600)
1339 if ns_params.get("vnf"):
1340 for vnf in ns_params["vnf"]:
1341 if "vimAccountId" in vnf:
1342 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
1343 vnf["vimAccountId"])
1344 if task_dependency:
1345 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
1346 self.logger.debug(logging_text + stage[2])
1347 await asyncio.wait(task_dependency, timeout=3600)
1348
1349 stage[2] = "Checking instantiation parameters."
1350 RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
1351 stage[2] = "Deploying ns at VIM."
1352 db_nsr_update["detailed-status"] = " ".join(stage)
1353 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1354 self._write_op_status(nslcmop_id, stage)
1355
1356 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
1357 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
1358 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1359 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
1360 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
1361
1362 # wait until NS is ready
1363 stage[2] = "Waiting VIM to deploy ns."
1364 db_nsr_update["detailed-status"] = " ".join(stage)
1365 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1366 self._write_op_status(nslcmop_id, stage)
1367 detailed_status_old = None
1368 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1369
1370 old_desc = None
1371 while time() <= start_deploy + timeout_ns_deploy:
1372 desc = await self.RO.show("ns", RO_nsr_id)
1373
1374 # deploymentStatus
1375 if desc != old_desc:
1376 # desc has changed => update db
1377 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
1378 old_desc = desc
1379
1380 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1381 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1382 if ns_status == "ERROR":
1383 raise ROclient.ROClientException(ns_status_info)
1384 elif ns_status == "BUILD":
1385 stage[2] = "VIM: ({})".format(ns_status_info)
1386 elif ns_status == "ACTIVE":
1387 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1388 try:
1389 self.ns_update_vnfr(db_vnfrs, desc)
1390 break
1391 except LcmExceptionNoMgmtIP:
1392 pass
1393 else:
1394 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1395 if stage[2] != detailed_status_old:
1396 detailed_status_old = stage[2]
1397 db_nsr_update["detailed-status"] = " ".join(stage)
1398 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1399 self._write_op_status(nslcmop_id, stage)
1400 await asyncio.sleep(5, loop=self.loop)
1401 else: # timeout_ns_deploy
1402 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1403
1404 # Updating NSR
1405 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
1406
1407 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
1408 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1409 stage[2] = "Deployed at VIM"
1410 db_nsr_update["detailed-status"] = " ".join(stage)
1411 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1412 self._write_op_status(nslcmop_id, stage)
1413 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1414 # self.logger.debug(logging_text + "Deployed at VIM")
1415 except Exception as e:
1416 stage[2] = "ERROR deploying at VIM"
1417 self.set_vnfr_at_error(db_vnfrs, str(e))
1418 self.logger.error("Error deploying at VIM {}".format(e),
1419 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
1420 NgRoException)))
1421 raise
1422
1423 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1424 """
1425 Wait for kdu to be up, get ip address
1426 :param logging_text: prefix use for logging
1427 :param nsr_id:
1428 :param vnfr_id:
1429 :param kdu_name:
1430 :return: IP address
1431 """
1432
1433 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1434 nb_tries = 0
1435
1436 while nb_tries < 360:
1437 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1438 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
1439 if not kdur:
1440 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
1441 if kdur.get("status"):
1442 if kdur["status"] in ("READY", "ENABLED"):
1443 return kdur.get("ip-address")
1444 else:
1445 raise LcmException("target KDU={} is in error state".format(kdu_name))
1446
1447 await asyncio.sleep(10, loop=self.loop)
1448 nb_tries += 1
1449 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1450
1451 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
1452 """
1453 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1454 :param logging_text: prefix use for logging
1455 :param nsr_id:
1456 :param vnfr_id:
1457 :param vdu_id:
1458 :param vdu_index:
1459 :param pub_key: public ssh key to inject, None to skip
1460 :param user: user to apply the public ssh key
1461 :return: IP address
1462 """
1463
1464 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1465 ro_nsr_id = None
1466 ip_address = None
1467 nb_tries = 0
1468 target_vdu_id = None
1469 ro_retries = 0
1470
1471 while True:
1472
1473 ro_retries += 1
1474 if ro_retries >= 360: # 1 hour
1475 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1476
1477 await asyncio.sleep(10, loop=self.loop)
1478
1479 # get ip address
1480 if not target_vdu_id:
1481 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1482
1483 if not vdu_id: # for the VNF case
1484 if db_vnfr.get("status") == "ERROR":
1485 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1486 ip_address = db_vnfr.get("ip-address")
1487 if not ip_address:
1488 continue
1489 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1490 else: # VDU case
1491 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1492 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1493
1494 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1495 vdur = db_vnfr["vdur"][0]
1496 if not vdur:
1497 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1498 vdu_index))
1499 # New generation RO stores information at "vim_info"
1500 ng_ro_status = None
1501 if vdur.get("vim_info"):
1502 target_vim = next(t for t in vdur["vim_info"]) # there should be only one key
1503 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1504 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
1505 ip_address = vdur.get("ip-address")
1506 if not ip_address:
1507 continue
1508 target_vdu_id = vdur["vdu-id-ref"]
1509 elif vdur.get("status") == "ERROR" or vdur["vim_info"][target_vim].get("vim_status") == "ERROR":
1510 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1511
1512 if not target_vdu_id:
1513 continue
1514
1515 # inject public key into machine
1516 if pub_key and user:
1517 self.logger.debug(logging_text + "Inserting RO key")
1518 if vdur.get("pdu-type"):
1519 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1520 return ip_address
1521 try:
1522 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1523 if self.ng_ro:
1524 self.logger.debug(logging_text + "ALF lanzando orden")
1525 target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
1526 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1527 }
1528 desc = await self.RO.deploy(nsr_id, target)
1529 action_id = desc["action_id"]
1530 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1531 break
1532 else:
1533 # wait until NS is deployed at RO
1534 if not ro_nsr_id:
1535 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1536 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1537 if not ro_nsr_id:
1538 continue
1539 result_dict = await self.RO.create_action(
1540 item="ns",
1541 item_id_name=ro_nsr_id,
1542 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1543 )
1544 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1545 if not result_dict or not isinstance(result_dict, dict):
1546 raise LcmException("Unknown response from RO when injecting key")
1547 for result in result_dict.values():
1548 if result.get("vim_result") == 200:
1549 break
1550 else:
1551 raise ROclient.ROClientException("error injecting key: {}".format(
1552 result.get("description")))
1553 break
1554 except NgRoException as e:
1555 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1556 except ROclient.ROClientException as e:
1557 if not nb_tries:
1558 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1559 format(e, 20*10))
1560 nb_tries += 1
1561 if nb_tries >= 20:
1562 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1563 else:
1564 break
1565
1566 return ip_address
1567
1568 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1569 """
1570 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1571 """
1572 my_vca = vca_deployed_list[vca_index]
1573 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1574 # vdu or kdu: no dependencies
1575 return
1576 timeout = 300
1577 while timeout >= 0:
1578 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1579 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1580 configuration_status_list = db_nsr["configurationStatus"]
1581 for index, vca_deployed in enumerate(configuration_status_list):
1582 if index == vca_index:
1583 # myself
1584 continue
1585 if not my_vca.get("member-vnf-index") or \
1586 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1587 internal_status = configuration_status_list[index].get("status")
1588 if internal_status == 'READY':
1589 continue
1590 elif internal_status == 'BROKEN':
1591 raise LcmException("Configuration aborted because dependent charm/s has failed")
1592 else:
1593 break
1594 else:
1595 # no dependencies, return
1596 return
1597 await asyncio.sleep(10)
1598 timeout -= 1
1599
1600 raise LcmException("Configuration aborted because dependent charm/s timeout")
1601
    async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                               config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
                               ee_config_descriptor):
        """
        Create/register one VCA execution environment (charm or helm chart), install its software,
        wait for its target element to be reachable and run its initial (Day-1) config primitives.

        :param logging_text: prefix for log messages
        :param vca_index: position of this VCA inside _admin.deployed.VCA of the NS record
        :param nsi_id: network-slice instance id, or None (only used to build the VCA namespace)
        :param db_nsr: NS record content ("nsrs")
        :param db_vnfr: VNF record content, or None when the charm targets the whole NS
        :param vdu_id: VDU id when the charm targets a VDU, else None
        :param kdu_name: KDU name when the charm targets a KDU, else None
        :param vdu_index: VDU count-index, used for namespace/element naming
        :param config_descriptor: *-configuration section of the descriptor for this element
        :param deploy_params: parameters for primitives; "rw_mgmt_ip" (and "ns_config_info" for the
            NS charm) are added to this dict in place
        :param base_folder: descriptor storage info ("folder", "pkg-dir") used to locate the artifact
        :param nslcmop_id: current operation id, for progress reporting
        :param stage: 3-item progress list; stage[0] is updated in place
        :param vca_type: one of "native_charm", "lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"
        :param vca_name: charm/chart name used to build the artifact path
        :param ee_config_descriptor: execution-environment descriptor; its "id" selects the primitives
        :raises LcmException: wrapping any failure (the entry's configurationStatus is set to BROKEN first)
        """
        nsr_id = db_nsr["_id"]
        # dotted db path of this VCA entry, e.g. "_admin.deployed.VCA.3."
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        db_dict = {
            'collection': 'nsrs',
            'filter': {'_id': nsr_id},
            'path': db_update_entry
        }
        # "step" tracks the current action so the except clause can report where the failure happened
        step = ""
        try:

            # defaults: configuring the NS itself; narrowed to VNF/VDU/KDU below
            element_type = 'NS'
            element_under_configuration = nsr_id

            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id

            # namespace shape: "<nsi>.<ns>[.<vnf>[.<vdu>-<index> | .<kdu>]]"
            namespace = "{nsi}.{ns}".format(
                nsi=nsi_id if nsi_id else "",
                ns=nsr_id)

            if vnfr_id:
                element_type = 'VNF'
                element_under_configuration = vnfr_id
                namespace += ".{}".format(vnfr_id)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                    element_type = 'VDU'
                    element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}".format(kdu_name)
                    element_type = 'KDU'
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name

            # Get artifact path
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
                vca_name
            )
            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
                                                                                    vca_deployed, ee_descriptor_id)

            # n2vc_redesign STEP 3.1
            # find old ee_id if exists
            ee_id = vca_deployed.get("ee_id")

            # vim account: taken from the VNF record, or from deploy params for NS-level charms
            vim_account_id = (
                deep_get(db_vnfr, ("vim-account-id",)) or
                deep_get(deploy_params, ("OSM", "vim_account_id"))
            )
            vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
            vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
            # create or register execution environment in VCA
            if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status='CREATING',
                    element_under_configuration=element_under_configuration,
                    element_type=element_type
                )

                step = "create execution environment"
                self.logger.debug(logging_text + step)

                ee_id = None
                credentials = None
                if vca_type == "k8s_proxy_charm":
                    ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                        charm_name=artifact_path[artifact_path.rfind("/") + 1:],
                        namespace=namespace,
                        artifact_path=artifact_path,
                        db_dict=db_dict,
                        cloud_name=vca_k8s_cloud,
                        credential_name=vca_k8s_cloud_credential,
                    )
                else:
                    ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        cloud_name=vca_cloud,
                        credential_name=vca_cloud_credential,
                    )

            elif vca_type == "native_charm":
                # native charms run inside the VM itself: wait for the VM before registering it
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=None, pub_key=None)
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                # merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
                                       "'config-access.ssh-access.default-user'")
                credentials["username"] = username
                # n2vc_redesign STEP 3.2

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status='REGISTERING',
                    element_under_configuration=element_under_configuration,
                    element_type=element_type
                )

                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    cloud_name=vca_cloud,
                    credential_name=vca_cloud_credential,
                )

            # for compatibility with MON/POL modules, the need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            ee_id_parts = ee_id.split('.')
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name

            # n2vc_redesign STEP 3.3
            step = "Install configuration Software"

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='INSTALLING SW',
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                other_update=db_nsr_update
            )

            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive,
                        {},
                        deploy_params
                    )
            # lxc proxy charms may scale: take the element's "config-units" (default 1)
            num_units = 1
            if vca_type == "lxc_proxy_charm":
                if element_type == "NS":
                    num_units = db_nsr.get("config-units") or 1
                elif element_type == "VNF":
                    num_units = db_vnfr.get("config-units") or 1
                elif element_type == "VDU":
                    for v in db_vnfr["vdur"]:
                        if vdu_id == v["vdu-id-ref"]:
                            num_units = v.get("config-units") or 1
                            break
            if vca_type != "k8s_proxy_charm":
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=num_units,
                )

            # write in db flag of configuration_sw already installed
            self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

            # add relations for this VCA (wait for other peers related with this VCA)
            await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
                                          vca_index=vca_index, vca_type=vca_type)

            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                    step = "Install configuration Software, getting public ssh key"
                    # (double underscore is the n2vc method name as called here — do not "fix")
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                    step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)

                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
                    else:
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
                                                                         vdu_index, user=user, pub_key=pub_key)
                else:
                    rw_mgmt_ip = None  # This is for a NS configuration

                self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

            # n2vc_redesign STEP 6 Execute initial config primitive
            step = 'execute initial config primitive'

            # wait for dependent primitives execution (NS -> VNF -> VDU)
            if initial_config_primitive_list:
                await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

            # stage, in function of element type: vdu, kdu, vnf or ns
            my_vca = vca_deployed_list[vca_index]
            if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                # VDU or KDU
                stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
            elif my_vca.get("member-vnf-index"):
                # VNF
                stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
            else:
                # NS
                stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='EXECUTING PRIMITIVE'
            )

            self._write_op_status(
                op_id=nslcmop_id,
                stage=stage
            )

            check_if_terminated_needed = True
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

                step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
                self.logger.debug(logging_text + step)
                await self.vca_map[vca_type].exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict
                )
                # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                if check_if_terminated_needed:
                    if config_descriptor.get('terminate-config-primitive'):
                        self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                    check_if_terminated_needed = False

                # TODO register in database that primitive is done

            # STEP 7 Configure metrics
            if vca_type == "helm" or vca_type == "helm-v3":
                prometheus_jobs = await self.add_prometheus_metrics(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                )
                if prometheus_jobs:
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})

            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='READY'
            )

        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
                self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='BROKEN'
            )
            raise LcmException("{} {}".format(step, e)) from e
1925
1926 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1927 error_description: str = None, error_detail: str = None, other_update: dict = None):
1928 """
1929 Update db_nsr fields.
1930 :param nsr_id:
1931 :param ns_state:
1932 :param current_operation:
1933 :param current_operation_id:
1934 :param error_description:
1935 :param error_detail:
1936 :param other_update: Other required changes at database if provided, will be cleared
1937 :return:
1938 """
1939 try:
1940 db_dict = other_update or {}
1941 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1942 db_dict["_admin.current-operation"] = current_operation_id
1943 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1944 db_dict["currentOperation"] = current_operation
1945 db_dict["currentOperationID"] = current_operation_id
1946 db_dict["errorDescription"] = error_description
1947 db_dict["errorDetail"] = error_detail
1948
1949 if ns_state:
1950 db_dict["nsState"] = ns_state
1951 self.update_db_2("nsrs", nsr_id, db_dict)
1952 except DbException as e:
1953 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1954
1955 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1956 operation_state: str = None, other_update: dict = None):
1957 try:
1958 db_dict = other_update or {}
1959 db_dict['queuePosition'] = queuePosition
1960 if isinstance(stage, list):
1961 db_dict['stage'] = stage[0]
1962 db_dict['detailed-status'] = " ".join(stage)
1963 elif stage is not None:
1964 db_dict['stage'] = str(stage)
1965
1966 if error_message is not None:
1967 db_dict['errorMessage'] = error_message
1968 if operation_state is not None:
1969 db_dict['operationState'] = operation_state
1970 db_dict["statusEnteredTime"] = time()
1971 self.update_db_2("nslcmops", op_id, db_dict)
1972 except DbException as e:
1973 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1974
1975 def _write_all_config_status(self, db_nsr: dict, status: str):
1976 try:
1977 nsr_id = db_nsr["_id"]
1978 # configurationStatus
1979 config_status = db_nsr.get('configurationStatus')
1980 if config_status:
1981 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1982 enumerate(config_status) if v}
1983 # update status
1984 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1985
1986 except DbException as e:
1987 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1988
1989 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1990 element_under_configuration: str = None, element_type: str = None,
1991 other_update: dict = None):
1992
1993 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1994 # .format(vca_index, status))
1995
1996 try:
1997 db_path = 'configurationStatus.{}.'.format(vca_index)
1998 db_dict = other_update or {}
1999 if status:
2000 db_dict[db_path + 'status'] = status
2001 if element_under_configuration:
2002 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
2003 if element_type:
2004 db_dict[db_path + 'elementType'] = element_type
2005 self.update_db_2("nsrs", nsr_id, db_dict)
2006 except DbException as e:
2007 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
2008 .format(status, nsr_id, vca_index, e))
2009
2010 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2011 """
2012 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2013 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2014 Database is used because the result can be obtained from a different LCM worker in case of HA.
2015 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2016 :param db_nslcmop: database content of nslcmop
2017 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2018 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2019 computed 'vim-account-id'
2020 """
2021 modified = False
2022 nslcmop_id = db_nslcmop['_id']
2023 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
2024 if placement_engine == "PLA":
2025 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
2026 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
2027 db_poll_interval = 5
2028 wait = db_poll_interval * 10
2029 pla_result = None
2030 while not pla_result and wait >= 0:
2031 await asyncio.sleep(db_poll_interval)
2032 wait -= db_poll_interval
2033 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2034 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
2035
2036 if not pla_result:
2037 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
2038
2039 for pla_vnf in pla_result['vnf']:
2040 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
2041 if not pla_vnf.get('vimAccountId') or not vnfr:
2042 continue
2043 modified = True
2044 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
2045 # Modifies db_vnfrs
2046 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
2047 return modified
2048
2049 def update_nsrs_with_pla_result(self, params):
2050 try:
2051 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
2052 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
2053 except Exception as e:
2054 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
2055
2056 async def instantiate(self, nsr_id, nslcmop_id):
2057 """
2058
2059 :param nsr_id: ns instance to deploy
2060 :param nslcmop_id: operation to run
2061 :return:
2062 """
2063
2064 # Try to lock HA task here
2065 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2066 if not task_is_locked_by_me:
2067 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
2068 return
2069
2070 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2071 self.logger.debug(logging_text + "Enter")
2072
2073 # get all needed from database
2074
2075 # database nsrs record
2076 db_nsr = None
2077
2078 # database nslcmops record
2079 db_nslcmop = None
2080
2081 # update operation on nsrs
2082 db_nsr_update = {}
2083 # update operation on nslcmops
2084 db_nslcmop_update = {}
2085
2086 nslcmop_operation_state = None
2087 db_vnfrs = {} # vnf's info indexed by member-index
2088 # n2vc_info = {}
2089 tasks_dict_info = {} # from task to info text
2090 exc = None
2091 error_list = []
2092 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
2093 # ^ stage, step, VIM progress
2094 try:
2095 # wait for any previous tasks in process
2096 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2097
2098 stage[1] = "Sync filesystem from database."
2099 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
2100
2101 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2102 stage[1] = "Reading from database."
2103 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2104 db_nsr_update["detailed-status"] = "creating"
2105 db_nsr_update["operational-status"] = "init"
2106 self._write_ns_status(
2107 nsr_id=nsr_id,
2108 ns_state="BUILDING",
2109 current_operation="INSTANTIATING",
2110 current_operation_id=nslcmop_id,
2111 other_update=db_nsr_update
2112 )
2113 self._write_op_status(
2114 op_id=nslcmop_id,
2115 stage=stage,
2116 queuePosition=0
2117 )
2118
2119 # read from db: operation
2120 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2121 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2122 ns_params = db_nslcmop.get("operationParams")
2123 if ns_params and ns_params.get("timeout_ns_deploy"):
2124 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2125 else:
2126 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
2127
2128 # read from db: ns
2129 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2130 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2131 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2132 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2133 db_nsr["nsd"] = nsd
2134 # nsr_name = db_nsr["name"] # TODO short-name??
2135
2136 # read from db: vnf's of this ns
2137 stage[1] = "Getting vnfrs from db."
2138 self.logger.debug(logging_text + stage[1])
2139 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2140
2141 # read from db: vnfd's for every vnf
2142 db_vnfds_ref = {} # every vnfd data indexed by vnf name
2143 db_vnfds = {} # every vnfd data indexed by vnf id
2144 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
2145
2146 # for each vnf in ns, read vnfd
2147 for vnfr in db_vnfrs_list:
2148 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
2149 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
2150 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
2151
2152 # if we haven't this vnfd, read it from db
2153 if vnfd_id not in db_vnfds:
2154 # read from db
2155 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
2156 self.logger.debug(logging_text + stage[1])
2157 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2158
2159 # store vnfd
2160 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
2161 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
2162 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
2163
2164 # Get or generates the _admin.deployed.VCA list
2165 vca_deployed_list = None
2166 if db_nsr["_admin"].get("deployed"):
2167 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2168 if vca_deployed_list is None:
2169 vca_deployed_list = []
2170 configuration_status_list = []
2171 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2172 db_nsr_update["configurationStatus"] = configuration_status_list
2173 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2174 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2175 elif isinstance(vca_deployed_list, dict):
2176 # maintain backward compatibility. Change a dict to list at database
2177 vca_deployed_list = list(vca_deployed_list.values())
2178 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2179 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2180
2181 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
2182 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2183 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2184
2185 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2186 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2187 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2188 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
2189
2190 # n2vc_redesign STEP 2 Deploy Network Scenario
2191 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
2192 self._write_op_status(
2193 op_id=nslcmop_id,
2194 stage=stage
2195 )
2196
2197 stage[1] = "Deploying KDUs."
2198 # self.logger.debug(logging_text + "Before deploy_kdus")
2199 # Call to deploy_kdus in case exists the "vdu:kdu" param
2200 await self.deploy_kdus(
2201 logging_text=logging_text,
2202 nsr_id=nsr_id,
2203 nslcmop_id=nslcmop_id,
2204 db_vnfrs=db_vnfrs,
2205 db_vnfds=db_vnfds,
2206 task_instantiation_info=tasks_dict_info,
2207 )
2208
2209 stage[1] = "Getting VCA public key."
2210 # n2vc_redesign STEP 1 Get VCA public ssh-key
2211 # feature 1429. Add n2vc public key to needed VMs
2212 n2vc_key = self.n2vc.get_public_key()
2213 n2vc_key_list = [n2vc_key]
2214 if self.vca_config.get("public_key"):
2215 n2vc_key_list.append(self.vca_config["public_key"])
2216
2217 stage[1] = "Deploying NS at VIM."
2218 task_ro = asyncio.ensure_future(
2219 self.instantiate_RO(
2220 logging_text=logging_text,
2221 nsr_id=nsr_id,
2222 nsd=nsd,
2223 db_nsr=db_nsr,
2224 db_nslcmop=db_nslcmop,
2225 db_vnfrs=db_vnfrs,
2226 db_vnfds_ref=db_vnfds_ref,
2227 n2vc_key_list=n2vc_key_list,
2228 stage=stage
2229 )
2230 )
2231 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2232 tasks_dict_info[task_ro] = "Deploying at VIM"
2233
2234 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2235 stage[1] = "Deploying Execution Environments."
2236 self.logger.debug(logging_text + stage[1])
2237
2238 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2239 # get_iterable() returns a value from a dict or empty tuple if key does not exist
2240 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
2241 vnfd_id = c_vnf["vnfd-id-ref"]
2242 vnfd = db_vnfds_ref[vnfd_id]
2243 member_vnf_index = str(c_vnf["member-vnf-index"])
2244 db_vnfr = db_vnfrs[member_vnf_index]
2245 base_folder = vnfd["_admin"]["storage"]
2246 vdu_id = None
2247 vdu_index = 0
2248 vdu_name = None
2249 kdu_name = None
2250
2251 # Get additional parameters
2252 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2253 if db_vnfr.get("additionalParamsForVnf"):
2254 deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()))
2255
2256 descriptor_config = vnfd.get("vnf-configuration")
2257 if descriptor_config:
2258 self._deploy_n2vc(
2259 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
2260 db_nsr=db_nsr,
2261 db_vnfr=db_vnfr,
2262 nslcmop_id=nslcmop_id,
2263 nsr_id=nsr_id,
2264 nsi_id=nsi_id,
2265 vnfd_id=vnfd_id,
2266 vdu_id=vdu_id,
2267 kdu_name=kdu_name,
2268 member_vnf_index=member_vnf_index,
2269 vdu_index=vdu_index,
2270 vdu_name=vdu_name,
2271 deploy_params=deploy_params,
2272 descriptor_config=descriptor_config,
2273 base_folder=base_folder,
2274 task_instantiation_info=tasks_dict_info,
2275 stage=stage
2276 )
2277
2278 # Deploy charms for each VDU that supports one.
2279 for vdud in get_iterable(vnfd, 'vdu'):
2280 vdu_id = vdud["id"]
2281 descriptor_config = vdud.get('vdu-configuration')
2282 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2283 if vdur.get("additionalParams"):
2284 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
2285 else:
2286 deploy_params_vdu = deploy_params
2287 deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
2288 if descriptor_config:
2289 vdu_name = None
2290 kdu_name = None
2291 for vdu_index in range(int(vdud.get("count", 1))):
2292 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2293 self._deploy_n2vc(
2294 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2295 member_vnf_index, vdu_id, vdu_index),
2296 db_nsr=db_nsr,
2297 db_vnfr=db_vnfr,
2298 nslcmop_id=nslcmop_id,
2299 nsr_id=nsr_id,
2300 nsi_id=nsi_id,
2301 vnfd_id=vnfd_id,
2302 vdu_id=vdu_id,
2303 kdu_name=kdu_name,
2304 member_vnf_index=member_vnf_index,
2305 vdu_index=vdu_index,
2306 vdu_name=vdu_name,
2307 deploy_params=deploy_params_vdu,
2308 descriptor_config=descriptor_config,
2309 base_folder=base_folder,
2310 task_instantiation_info=tasks_dict_info,
2311 stage=stage
2312 )
2313 for kdud in get_iterable(vnfd, 'kdu'):
2314 kdu_name = kdud["name"]
2315 descriptor_config = kdud.get('kdu-configuration')
2316 if descriptor_config:
2317 vdu_id = None
2318 vdu_index = 0
2319 vdu_name = None
2320 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
2321 deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)}
2322 if kdur.get("additionalParams"):
2323 deploy_params_kdu = self._format_additional_params(kdur["additionalParams"])
2324
2325 self._deploy_n2vc(
2326 logging_text=logging_text,
2327 db_nsr=db_nsr,
2328 db_vnfr=db_vnfr,
2329 nslcmop_id=nslcmop_id,
2330 nsr_id=nsr_id,
2331 nsi_id=nsi_id,
2332 vnfd_id=vnfd_id,
2333 vdu_id=vdu_id,
2334 kdu_name=kdu_name,
2335 member_vnf_index=member_vnf_index,
2336 vdu_index=vdu_index,
2337 vdu_name=vdu_name,
2338 deploy_params=deploy_params_kdu,
2339 descriptor_config=descriptor_config,
2340 base_folder=base_folder,
2341 task_instantiation_info=tasks_dict_info,
2342 stage=stage
2343 )
2344
2345 # Check if this NS has a charm configuration
2346 descriptor_config = nsd.get("ns-configuration")
2347 if descriptor_config and descriptor_config.get("juju"):
2348 vnfd_id = None
2349 db_vnfr = None
2350 member_vnf_index = None
2351 vdu_id = None
2352 kdu_name = None
2353 vdu_index = 0
2354 vdu_name = None
2355
2356 # Get additional parameters
2357 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2358 if db_nsr.get("additionalParamsForNs"):
2359 deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy()))
2360 base_folder = nsd["_admin"]["storage"]
2361 self._deploy_n2vc(
2362 logging_text=logging_text,
2363 db_nsr=db_nsr,
2364 db_vnfr=db_vnfr,
2365 nslcmop_id=nslcmop_id,
2366 nsr_id=nsr_id,
2367 nsi_id=nsi_id,
2368 vnfd_id=vnfd_id,
2369 vdu_id=vdu_id,
2370 kdu_name=kdu_name,
2371 member_vnf_index=member_vnf_index,
2372 vdu_index=vdu_index,
2373 vdu_name=vdu_name,
2374 deploy_params=deploy_params,
2375 descriptor_config=descriptor_config,
2376 base_folder=base_folder,
2377 task_instantiation_info=tasks_dict_info,
2378 stage=stage
2379 )
2380
2381 # rest of staff will be done at finally
2382
2383 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2384 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
2385 exc = e
2386 except asyncio.CancelledError:
2387 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2388 exc = "Operation was cancelled"
2389 except Exception as e:
2390 exc = traceback.format_exc()
2391 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2392 finally:
2393 if exc:
2394 error_list.append(str(exc))
2395 try:
2396 # wait for pending tasks
2397 if tasks_dict_info:
2398 stage[1] = "Waiting for instantiate pending tasks."
2399 self.logger.debug(logging_text + stage[1])
2400 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
2401 stage, nslcmop_id, nsr_id=nsr_id)
2402 stage[1] = stage[2] = ""
2403 except asyncio.CancelledError:
2404 error_list.append("Cancelled")
2405 # TODO cancel all tasks
2406 except Exception as exc:
2407 error_list.append(str(exc))
2408
2409 # update operation-status
2410 db_nsr_update["operational-status"] = "running"
2411 # let's begin with VCA 'configured' status (later we can change it)
2412 db_nsr_update["config-status"] = "configured"
2413 for task, task_name in tasks_dict_info.items():
2414 if not task.done() or task.cancelled() or task.exception():
2415 if task_name.startswith(self.task_name_deploy_vca):
2416 # A N2VC task is pending
2417 db_nsr_update["config-status"] = "failed"
2418 else:
2419 # RO or KDU task is pending
2420 db_nsr_update["operational-status"] = "failed"
2421
2422 # update status at database
2423 if error_list:
2424 error_detail = ". ".join(error_list)
2425 self.logger.error(logging_text + error_detail)
2426 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
2427 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
2428
2429 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2430 db_nslcmop_update["detailed-status"] = error_detail
2431 nslcmop_operation_state = "FAILED"
2432 ns_state = "BROKEN"
2433 else:
2434 error_detail = None
2435 error_description_nsr = error_description_nslcmop = None
2436 ns_state = "READY"
2437 db_nsr_update["detailed-status"] = "Done"
2438 db_nslcmop_update["detailed-status"] = "Done"
2439 nslcmop_operation_state = "COMPLETED"
2440
2441 if db_nsr:
2442 self._write_ns_status(
2443 nsr_id=nsr_id,
2444 ns_state=ns_state,
2445 current_operation="IDLE",
2446 current_operation_id=None,
2447 error_description=error_description_nsr,
2448 error_detail=error_detail,
2449 other_update=db_nsr_update
2450 )
2451 self._write_op_status(
2452 op_id=nslcmop_id,
2453 stage="",
2454 error_message=error_description_nslcmop,
2455 operation_state=nslcmop_operation_state,
2456 other_update=db_nslcmop_update,
2457 )
2458
2459 if nslcmop_operation_state:
2460 try:
2461 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2462 "operationState": nslcmop_operation_state},
2463 loop=self.loop)
2464 except Exception as e:
2465 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2466
2467 self.logger.debug(logging_text + "Exit")
2468 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2469
2470 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2471 timeout: int = 3600, vca_type: str = None) -> bool:
2472
2473 # steps:
2474 # 1. find all relations for this VCA
2475 # 2. wait for other peers related
2476 # 3. add relations
2477
2478 try:
2479 vca_type = vca_type or "lxc_proxy_charm"
2480
2481 # STEP 1: find all relations for this VCA
2482
2483 # read nsr record
2484 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2485 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2486
2487 # this VCA data
2488 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2489
2490 # read all ns-configuration relations
2491 ns_relations = list()
2492 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2493 if db_ns_relations:
2494 for r in db_ns_relations:
2495 # check if this VCA is in the relation
2496 if my_vca.get('member-vnf-index') in\
2497 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2498 ns_relations.append(r)
2499
2500 # read all vnf-configuration relations
2501 vnf_relations = list()
2502 db_vnfd_list = db_nsr.get('vnfd-id')
2503 if db_vnfd_list:
2504 for vnfd in db_vnfd_list:
2505 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2506 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2507 if db_vnf_relations:
2508 for r in db_vnf_relations:
2509 # check if this VCA is in the relation
2510 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2511 vnf_relations.append(r)
2512
2513 # if no relations, terminate
2514 if not ns_relations and not vnf_relations:
2515 self.logger.debug(logging_text + ' No relations')
2516 return True
2517
2518 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2519
2520 # add all relations
2521 start = time()
2522 while True:
2523 # check timeout
2524 now = time()
2525 if now - start >= timeout:
2526 self.logger.error(logging_text + ' : timeout adding relations')
2527 return False
2528
2529 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2530 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2531
2532 # for each defined NS relation, find the VCA's related
2533 for r in ns_relations.copy():
2534 from_vca_ee_id = None
2535 to_vca_ee_id = None
2536 from_vca_endpoint = None
2537 to_vca_endpoint = None
2538 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2539 for vca in vca_list:
2540 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2541 and vca.get('config_sw_installed'):
2542 from_vca_ee_id = vca.get('ee_id')
2543 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2544 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2545 and vca.get('config_sw_installed'):
2546 to_vca_ee_id = vca.get('ee_id')
2547 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2548 if from_vca_ee_id and to_vca_ee_id:
2549 # add relation
2550 await self.vca_map[vca_type].add_relation(
2551 ee_id_1=from_vca_ee_id,
2552 ee_id_2=to_vca_ee_id,
2553 endpoint_1=from_vca_endpoint,
2554 endpoint_2=to_vca_endpoint)
2555 # remove entry from relations list
2556 ns_relations.remove(r)
2557 else:
2558 # check failed peers
2559 try:
2560 vca_status_list = db_nsr.get('configurationStatus')
2561 if vca_status_list:
2562 for i in range(len(vca_list)):
2563 vca = vca_list[i]
2564 vca_status = vca_status_list[i]
2565 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2566 if vca_status.get('status') == 'BROKEN':
2567 # peer broken: remove relation from list
2568 ns_relations.remove(r)
2569 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2570 if vca_status.get('status') == 'BROKEN':
2571 # peer broken: remove relation from list
2572 ns_relations.remove(r)
2573 except Exception:
2574 # ignore
2575 pass
2576
2577 # for each defined VNF relation, find the VCA's related
2578 for r in vnf_relations.copy():
2579 from_vca_ee_id = None
2580 to_vca_ee_id = None
2581 from_vca_endpoint = None
2582 to_vca_endpoint = None
2583 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2584 for vca in vca_list:
2585 key_to_check = "vdu_id"
2586 if vca.get("vdu_id") is None:
2587 key_to_check = "vnfd_id"
2588 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2589 from_vca_ee_id = vca.get('ee_id')
2590 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2591 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2592 to_vca_ee_id = vca.get('ee_id')
2593 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2594 if from_vca_ee_id and to_vca_ee_id:
2595 # add relation
2596 await self.vca_map[vca_type].add_relation(
2597 ee_id_1=from_vca_ee_id,
2598 ee_id_2=to_vca_ee_id,
2599 endpoint_1=from_vca_endpoint,
2600 endpoint_2=to_vca_endpoint)
2601 # remove entry from relations list
2602 vnf_relations.remove(r)
2603 else:
2604 # check failed peers
2605 try:
2606 vca_status_list = db_nsr.get('configurationStatus')
2607 if vca_status_list:
2608 for i in range(len(vca_list)):
2609 vca = vca_list[i]
2610 vca_status = vca_status_list[i]
2611 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2612 if vca_status.get('status') == 'BROKEN':
2613 # peer broken: remove relation from list
2614 vnf_relations.remove(r)
2615 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2616 if vca_status.get('status') == 'BROKEN':
2617 # peer broken: remove relation from list
2618 vnf_relations.remove(r)
2619 except Exception:
2620 # ignore
2621 pass
2622
2623 # wait for next try
2624 await asyncio.sleep(5.0)
2625
2626 if not ns_relations and not vnf_relations:
2627 self.logger.debug('Relations added')
2628 break
2629
2630 return True
2631
2632 except Exception as e:
2633 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2634 return False
2635
2636 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2637 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
2638
2639 try:
2640 k8sclustertype = k8s_instance_info["k8scluster-type"]
2641 # Instantiate kdu
2642 db_dict_install = {"collection": "nsrs",
2643 "filter": {"_id": nsr_id},
2644 "path": nsr_db_path}
2645
2646 kdu_instance = await self.k8scluster_map[k8sclustertype].install(
2647 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2648 kdu_model=k8s_instance_info["kdu-model"],
2649 atomic=True,
2650 params=k8params,
2651 db_dict=db_dict_install,
2652 timeout=timeout,
2653 kdu_name=k8s_instance_info["kdu-name"],
2654 namespace=k8s_instance_info["namespace"])
2655 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2656
2657 # Obtain services to obtain management service ip
2658 services = await self.k8scluster_map[k8sclustertype].get_services(
2659 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2660 kdu_instance=kdu_instance,
2661 namespace=k8s_instance_info["namespace"])
2662
2663 # Obtain management service info (if exists)
2664 vnfr_update_dict = {}
2665 if services:
2666 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2667 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2668 for mgmt_service in mgmt_services:
2669 for service in services:
2670 if service["name"].startswith(mgmt_service["name"]):
2671 # Mgmt service found, Obtain service ip
2672 ip = service.get("external_ip", service.get("cluster_ip"))
2673 if isinstance(ip, list) and len(ip) == 1:
2674 ip = ip[0]
2675
2676 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2677
2678 # Check if must update also mgmt ip at the vnf
2679 service_external_cp = mgmt_service.get("external-connection-point-ref")
2680 if service_external_cp:
2681 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2682 vnfr_update_dict["ip-address"] = ip
2683
2684 break
2685 else:
2686 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2687
2688 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2689 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2690
2691 kdu_config = kdud.get("kdu-configuration")
2692 if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
2693 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2694 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2695
2696 for initial_config_primitive in initial_config_primitive_list:
2697 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2698
2699 await asyncio.wait_for(
2700 self.k8scluster_map[k8sclustertype].exec_primitive(
2701 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2702 kdu_instance=kdu_instance,
2703 primitive_name=initial_config_primitive["name"],
2704 params=primitive_params_, db_dict={}),
2705 timeout=timeout)
2706
2707 except Exception as e:
2708 # Prepare update db with error and raise exception
2709 try:
2710 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2711 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2712 except Exception:
2713 # ignore to keep original exception
2714 pass
2715 # reraise original error
2716 raise
2717
2718 return kdu_instance
2719
    async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
        """Launch kdus if present in the descriptor.

        For each kdur: resolves (initializing when needed) the target k8s cluster, synchronizes
        the helm repos of each cluster once, stores the deployment info at
        <nsrs>._admin.deployed.K8s.<index> and spawns one _install_kdu asyncio task per KDU,
        registering it at lcm_tasks and annotating it at task_instantiation_info.

        :param logging_text: prefix for log entries
        :param nsr_id: nsr internal id
        :param nslcmop_id: operation internal id used to register the launched tasks
        :param db_vnfrs: dict member-vnf-index -> vnfr record content
        :param db_vnfds: dict vnfd id -> vnfd content
        :param task_instantiation_info: dict filled here with task -> human description
        :raise: LcmException on any unexpected error (LcmException/CancelledError re-raised as is)
        """

        # cache: cluster type -> {database cluster id -> k8s connector internal id}
        k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}

        async def _get_cluster_id(cluster_id, cluster_type):
            """Return (and cache) the k8s connector id of cluster_id for cluster_type, waiting
            for any ongoing k8scluster task and initializing helm v3 on old clusters."""
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
                                                                                   reuse_cluster_uuid=cluster_id)
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
                        db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
                        self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
                    except Exception as e:
                        self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
                        raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
                                                                                                       cluster_type))
                else:
                    raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
                                       format(cluster_id, cluster_type))
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = self._format_additional_params(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get('vnfd-id')
                    kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
                    namespace = kdur.get("k8s-namespace")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                           "juju-bundle. Maybe an old NBI version is running".
                                           format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                    # check if kdumodel is a file and exists
                    try:
                        storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
                        if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                             kdumodel)
                            if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos (only once per cluster and helm flavour)
                    if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
                            or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                                updated = {'_admin.helm_charts_added.' +
                                           item: name for item, name in added_repo_dict.items()}
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
                                updated = {'_admin.helm_charts_v3_added.' +
                                           item: name for item, name in added_repo_dict.items()}
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(logging_text + "repos synchronized on k8s cluster "
                                                             "'{}' to_delete: {}, to_add: {}".
                                              format(k8s_cluster_id, del_repo_list, added_repo_dict))
                            self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                              kdur["kdu-name"], k8s_cluster_id)
                    k8s_instance_info = {"kdu-instance": None,
                                         "k8scluster-uuid": cluster_uuid,
                                         "k8scluster-type": k8sclustertype,
                                         "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                         "kdu-name": kdur["kdu-name"],
                                         "kdu-model": kdumodel,
                                         "namespace": namespace}
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                    task = asyncio.ensure_future(
                        self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, db_vnfds[vnfd_id],
                                          k8s_instance_info, k8params=desc_params, timeout=600))
                    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                    task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            # persist whatever deployment info was gathered, even on error
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
2869
    def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                     kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                     base_folder, task_instantiation_info, stage):
        """Launch one instantiate_N2VC asyncio task per execution environment of descriptor_config.

        Looks for the charm entry at <nsrs>._admin.deployed.VCA; when this deployment is not found
        there, a new entry is created and persisted at database. Launched tasks are registered at
        lcm_tasks and annotated at task_instantiation_info.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
        if descriptor_config.get("juju"):  # There is one execution environment of type juju
            ee_list = [descriptor_config]
        elif descriptor_config.get("execution-environment-list"):
            ee_list = descriptor_config.get("execution-environment-list")
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
                                                                                            ee_item.get("helm-chart")))
            ee_descriptor_id = ee_item.get("id")
            # derive the vca type from the execution environment content
            if ee_item.get("juju"):
                vca_name = ee_item['juju'].get('charm')
                vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
                if ee_item['juju'].get('cloud') == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item['juju'].get('proxy') is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item['helm-chart']
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(logging_text + "skipping non juju neither charm configuration")
                continue

            # look for an existing VCA record matching this deployment; the for/else below
            # creates one (reusing vca_index past the end of the list) when none matches
            vca_index = -1
            for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
                if not vca_deployed:
                    continue
                if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                        vca_deployed.get("vdu_id") == vdu_id and \
                        vca_deployed.get("kdu_name") == kdu_name and \
                        vca_deployed.get("vdu_count_index", 0) == vdu_index and \
                        vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
                    break
            else:
                # not found, create one.
                target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict()
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
            task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or "")
2974
2975 @staticmethod
2976 def _get_terminate_config_primitive(primitive_list, vca_deployed):
2977 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2978 it get only those primitives for this execution envirom"""
2979
2980 primitive_list = primitive_list or []
2981 # filter primitives by ee_descriptor_id
2982 ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
2983 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
2984
2985 if primitive_list:
2986 primitive_list.sort(key=lambda val: int(val['seq']))
2987
2988 return primitive_list
2989
2990 @staticmethod
2991 def _create_nslcmop(nsr_id, operation, params):
2992 """
2993 Creates a ns-lcm-opp content to be stored at database.
2994 :param nsr_id: internal id of the instance
2995 :param operation: instantiate, terminate, scale, action, ...
2996 :param params: user parameters for the operation
2997 :return: dictionary following SOL005 format
2998 """
2999 # Raise exception if invalid arguments
3000 if not (nsr_id and operation and params):
3001 raise LcmException(
3002 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
3003 now = time()
3004 _id = str(uuid4())
3005 nslcmop = {
3006 "id": _id,
3007 "_id": _id,
3008 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3009 "operationState": "PROCESSING",
3010 "statusEnteredTime": now,
3011 "nsInstanceId": nsr_id,
3012 "lcmOperationType": operation,
3013 "startTime": now,
3014 "isAutomaticInvocation": False,
3015 "operationParams": params,
3016 "isCancelPending": False,
3017 "links": {
3018 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3019 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3020 }
3021 }
3022 return nslcmop
3023
3024 def _format_additional_params(self, params):
3025 params = params or {}
3026 for key, value in params.items():
3027 if str(value).startswith("!!yaml "):
3028 params[key] = yaml.safe_load(value[7:])
3029 return params
3030
3031 def _get_terminate_primitive_params(self, seq, vnf_index):
3032 primitive = seq.get('name')
3033 primitive_params = {}
3034 params = {
3035 "member_vnf_index": vnf_index,
3036 "primitive": primitive,
3037 "primitive_params": primitive_params,
3038 }
3039 desc_params = {}
3040 return self._map_primitive_params(seq, params, desc_params)
3041
3042 # sub-operations
3043
3044 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3045 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
3046 if op.get('operationState') == 'COMPLETED':
3047 # b. Skip sub-operation
3048 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3049 return self.SUBOPERATION_STATUS_SKIP
3050 else:
3051 # c. retry executing sub-operation
3052 # The sub-operation exists, and operationState != 'COMPLETED'
3053 # Update operationState = 'PROCESSING' to indicate a retry.
3054 operationState = 'PROCESSING'
3055 detailed_status = 'In progress'
3056 self._update_suboperation_status(
3057 db_nslcmop, op_index, operationState, detailed_status)
3058 # Return the sub-operation index
3059 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3060 # with arguments extracted from the sub-operation
3061 return op_index
3062
3063 # Find a sub-operation where all keys in a matching dictionary must match
3064 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3065 def _find_suboperation(self, db_nslcmop, match):
3066 if db_nslcmop and match:
3067 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
3068 for i, op in enumerate(op_list):
3069 if all(op.get(k) == match[k] for k in match):
3070 return i
3071 return self.SUBOPERATION_STATUS_NOT_FOUND
3072
3073 # Update status for a sub-operation given its index
3074 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
3075 # Update DB for HA tasks
3076 q_filter = {'_id': db_nslcmop['_id']}
3077 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
3078 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
3079 self.db.set_one("nslcmops",
3080 q_filter=q_filter,
3081 update_dict=update_dict,
3082 fail_on_empty=False)
3083
3084 # Add sub-operation, return the index of the added sub-operation
3085 # Optionally, set operationState, detailed-status, and operationType
3086 # Status and type are currently set for 'scale' sub-operations:
3087 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3088 # 'detailed-status' : status message
3089 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3090 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3091 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
3092 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
3093 RO_nsr_id=None, RO_scaling_info=None):
3094 if not db_nslcmop:
3095 return self.SUBOPERATION_STATUS_NOT_FOUND
3096 # Get the "_admin.operations" list, if it exists
3097 db_nslcmop_admin = db_nslcmop.get('_admin', {})
3098 op_list = db_nslcmop_admin.get('operations')
3099 # Create or append to the "_admin.operations" list
3100 new_op = {'member_vnf_index': vnf_index,
3101 'vdu_id': vdu_id,
3102 'vdu_count_index': vdu_count_index,
3103 'primitive': primitive,
3104 'primitive_params': mapped_primitive_params}
3105 if operationState:
3106 new_op['operationState'] = operationState
3107 if detailed_status:
3108 new_op['detailed-status'] = detailed_status
3109 if operationType:
3110 new_op['lcmOperationType'] = operationType
3111 if RO_nsr_id:
3112 new_op['RO_nsr_id'] = RO_nsr_id
3113 if RO_scaling_info:
3114 new_op['RO_scaling_info'] = RO_scaling_info
3115 if not op_list:
3116 # No existing operations, create key 'operations' with current operation as first list element
3117 db_nslcmop_admin.update({'operations': [new_op]})
3118 op_list = db_nslcmop_admin.get('operations')
3119 else:
3120 # Existing operations, append operation to list
3121 op_list.append(new_op)
3122
3123 db_nslcmop_update = {'_admin.operations': op_list}
3124 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
3125 op_index = len(op_list) - 1
3126 return op_index
3127
3128 # Helper methods for scale() sub-operations
3129
3130 # pre-scale/post-scale:
3131 # Check for 3 different cases:
3132 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3133 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3134 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3135 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
3136 operationType, RO_nsr_id=None, RO_scaling_info=None):
3137 # Find this sub-operation
3138 if RO_nsr_id and RO_scaling_info:
3139 operationType = 'SCALE-RO'
3140 match = {
3141 'member_vnf_index': vnf_index,
3142 'RO_nsr_id': RO_nsr_id,
3143 'RO_scaling_info': RO_scaling_info,
3144 }
3145 else:
3146 match = {
3147 'member_vnf_index': vnf_index,
3148 'primitive': vnf_config_primitive,
3149 'primitive_params': primitive_params,
3150 'lcmOperationType': operationType
3151 }
3152 op_index = self._find_suboperation(db_nslcmop, match)
3153 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3154 # a. New sub-operation
3155 # The sub-operation does not exist, add it.
3156 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3157 # The following parameters are set to None for all kind of scaling:
3158 vdu_id = None
3159 vdu_count_index = None
3160 vdu_name = None
3161 if RO_nsr_id and RO_scaling_info:
3162 vnf_config_primitive = None
3163 primitive_params = None
3164 else:
3165 RO_nsr_id = None
3166 RO_scaling_info = None
3167 # Initial status for sub-operation
3168 operationState = 'PROCESSING'
3169 detailed_status = 'In progress'
3170 # Add sub-operation for pre/post-scaling (zero or more operations)
3171 self._add_suboperation(db_nslcmop,
3172 vnf_index,
3173 vdu_id,
3174 vdu_count_index,
3175 vdu_name,
3176 vnf_config_primitive,
3177 primitive_params,
3178 operationState,
3179 detailed_status,
3180 operationType,
3181 RO_nsr_id,
3182 RO_scaling_info)
3183 return self.SUBOPERATION_STATUS_NEW
3184 else:
3185 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3186 # or op_index (operationState != 'COMPLETED')
3187 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3188
3189 # Function to return execution_environment id
3190
3191 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3192 # TODO vdu_index_count
3193 for vca in vca_deployed_list:
3194 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3195 return vca["ee_id"]
3196
    async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                           vca_index, destroy_ee=True, exec_primitives=True):
        """
        Execute the terminate primitives and destroy the execution environment (only if destroy_ee=True)
        :param logging_text: prefix prepended to every log message
        :param db_nslcmop: database content of the nslcmop; terminate sub-operations are recorded on it
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to not destroy it, because all of them will be destroyed at once later
        :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
            not executed properly
        :return: None or exception
        """

        self.logger.debug(
            logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default kept for backward compatibility with records created before "type" was stored
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = self._get_terminate_config_primitive(
                config_descriptor.get("terminate-config-primitive"), vca_deployed)
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name"))
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get('name')
                    mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                    # Add sub-operation
                    self._add_suboperation(db_nslcmop,
                                           vnf_index,
                                           vdu_id,
                                           vdu_count_index,
                                           vdu_name,
                                           primitive,
                                           mapped_primitive_params)
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                                 mapped_primitive_params,
                                                                                 vca_type=vca_type)
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                    if result not in result_ok:
                        raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                           "error {}".format(seq.get("name"), vnf_index, result_detail))
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
                self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

        # remove the prometheus jobs that were created for this VCA, if any
        if vca_deployed.get("prometheus_jobs") and self.prometheus:
            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
3267
3268 async def _delete_all_N2VC(self, db_nsr: dict):
3269 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
3270 namespace = "." + db_nsr["_id"]
3271 try:
3272 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
3273 except N2VCNotFound: # already deleted. Skip
3274 pass
3275 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
3276
    async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
        """
        Terminates a deployment from RO
        :param logging_text: prefix prepended to every log message
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id: ns instance id
        :param nslcmop_id: id of the operation triggering this termination
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None. Raises LcmException when something could not be deleted from RO
        """
        db_nsr_update = {}
        failed_detail = []
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                   ro_delete_action))
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action)

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write the status at database only when it actually changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

        # Delete nsd
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete the vnfds that were onboarded at RO for this NS
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id)
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                    elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                        failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
3419
    async def terminate(self, nsr_id, nslcmop_id):
        """
        Terminate a NS instance: execute terminate primitives, delete execution environments, KDUs
        and the RO/VIM deployment; finally write the operation/NS result at database and notify at
        kafka topic "ns".
        :param nsr_id: content of db_nsr "_id"
        :param nslcmop_id: content of db_nslcmop "_id" holding the termination parameters
        :return: None
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}
        db_nsr_update = {}
        stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update
            )
            self._write_op_status(
                op_id=nslcmop_id,
                queuePosition=0,
                stage=stage
            )
            # nothing to undo if the NS was never instantiated
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs, caching each vnfd so it is fetched from database only once
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])
            # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                # locate the configuration descriptor (ns/vnf/vdu/kdu level) this VCA belongs to
                config_descriptor = None
                if not vca or not vca.get("ee_id"):
                    continue
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                    if vdud:
                        config_descriptor = vdud.get("vdu-configuration")
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                    if kdud:
                        config_descriptor = kdud.get("kdu-configuration")
                else:
                    config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
                vca_type = vca.get("type")
                exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                             vca.get("needed_terminate"))
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
                                      destroy_ee, exec_terminate_primitives))
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
                error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                        min(self.timeout_charm_delete, timeout_ns_terminate),
                                                        stage, nslcmop_id)
                tasks_dict_info.clear()
                if error_list:
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                        timeout=self.timeout_charm_delete))
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance))
                else:
                    self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                      format(kdu.get("k8scluster-type")))
                    continue
                tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                             stage, nslcmop_id)
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
                error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )
            if ns_state == "NOT_INSTANTIATED":
                try:
                    self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
                except DbException as e:
                    self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
                                     format(nsr_id, e))
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                                 "operationState": nslcmop_operation_state,
                                                                 "autoremove": autoremove},
                                            loop=self.loop)
                except Exception as e:
                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3659
    async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
        """
        Wait for a set of asyncio tasks to finish, updating progress and errors at database.
        :param logging_text: prefix prepended to every log message
        :param created_tasks_info: dict with the asyncio task as key and a description text as value
        :param timeout: global timeout in seconds for the whole set of tasks
        :param stage: list of str; index 1 is updated with a "done/total" progress text plus error summary
        :param nslcmop_id: operation id, used to write progress at db_nslcmop
        :param nsr_id: if provided, error summaries are also annotated at the nsr database record
        :return: list with the detailed text of every error; empty if all tasks finished successfully
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining part of the global timeout for this iteration
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
                                                     return_when=asyncio.FIRST_COMPLETED)
            num_done += len(done)
            if not done:  # Timeout: annotate every still-pending task as timed out and stop waiting
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/expected exception types are logged as a one-liner; anything else with traceback
                    if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
                                        K8sException, NgRoException)):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
                        self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
                else:
                    self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
                                                      "errorDetail": ". ".join(error_detail_list)})
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
3708
3709 @staticmethod
3710 def _map_primitive_params(primitive_desc, params, instantiation_params):
3711 """
3712 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3713 The default-value is used. If it is between < > it look for a value at instantiation_params
3714 :param primitive_desc: portion of VNFD/NSD that describes primitive
3715 :param params: Params provided by user
3716 :param instantiation_params: Instantiation params provided by user
3717 :return: a dictionary with the calculated params
3718 """
3719 calculated_params = {}
3720 for parameter in primitive_desc.get("parameter", ()):
3721 param_name = parameter["name"]
3722 if param_name in params:
3723 calculated_params[param_name] = params[param_name]
3724 elif "default-value" in parameter or "value" in parameter:
3725 if "value" in parameter:
3726 calculated_params[param_name] = parameter["value"]
3727 else:
3728 calculated_params[param_name] = parameter["default-value"]
3729 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3730 and calculated_params[param_name].endswith(">"):
3731 if calculated_params[param_name][1:-1] in instantiation_params:
3732 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3733 else:
3734 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3735 format(calculated_params[param_name], primitive_desc["name"]))
3736 else:
3737 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3738 format(param_name, primitive_desc["name"]))
3739
3740 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3741 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3742 width=256)
3743 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3744 calculated_params[param_name] = calculated_params[param_name][7:]
3745 if parameter.get("data-type") == "INTEGER":
3746 try:
3747 calculated_params[param_name] = int(calculated_params[param_name])
3748 except ValueError: # error converting string to int
3749 raise LcmException(
3750 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3751 elif parameter.get("data-type") == "BOOLEAN":
3752 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3753
3754 # add always ns_config_info if primitive name is config
3755 if primitive_desc["name"] == "config":
3756 if "ns_config_info" in instantiation_params:
3757 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3758 return calculated_params
3759
3760 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3761 ee_descriptor_id=None):
3762 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3763 for vca in deployed_vca:
3764 if not vca:
3765 continue
3766 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3767 continue
3768 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3769 continue
3770 if kdu_name and kdu_name != vca["kdu_name"]:
3771 continue
3772 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3773 continue
3774 break
3775 else:
3776 # vca_deployed not found
3777 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3778 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3779 ee_descriptor_id))
3780
3781 # get ee_id
3782 ee_id = vca.get("ee_id")
3783 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3784 if not ee_id:
3785 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3786 "execution environment"
3787 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3788 return ee_id, vca_type
3789
    async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
                                    retries_interval=30, timeout=None,
                                    vca_type=None, db_dict=None) -> (str, str):
        """
        Execute a primitive (action) at a VCA execution environment, optionally retrying on error.
        :param ee_id: execution environment id where the primitive is executed
        :param primitive: primitive name; "config" gets its params wrapped as {"params": ...}
        :param primitive_params: dict with the primitive parameters
        :param retries: number of extra attempts after a failed execution
        :param retries_interval: seconds to sleep between attempts
        :param timeout: max seconds per attempt; defaults to self.timeout_primitive
        :param vca_type: key of self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: passed to the VCA connector so it can write execution progress at database
        :return: tuple (status, detail): ('COMPLETED', output) on success; ('FAILED', error-text)
            when all retries are exhausted
        """
        try:
            if primitive == "config":
                # the "config" primitive expects its parameters nested under "params"
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict),
                        timeout=timeout or self.timeout_primitive)
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        return 'FAILED', str(e)

            return 'COMPLETED', output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            # NOTE(review): this path returns 'FAIL' while the retries-exhausted path above returns
            # 'FAILED'; a caller that compares specifically against 'FAILED' would miss this value --
            # confirm whether the distinct literal is intended
            return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3831
    async def action(self, nsr_id, nslcmop_id):
        """Execute a primitive (day-2 action) over a deployed NS, VNF, VDU or KDU.

        Reads operation parameters from the "nslcmops" document, locates the
        primitive in the matching descriptor section
        ([ns|vnf|vdu|kdu]-configuration:config-primitive), then executes it
        either through the k8s connector (KDU case) or through the deployed VCA
        execution environment. The outcome is persisted to the database in the
        finally block and notified on kafka topic "ns", key "actioned".

        :param nsr_id: _id of the nsr (network service record) the action targets
        :param nslcmop_id: _id of the nslcmop (operation record) driving the action
        :return: None when the HA lock is not acquired; otherwise the tuple
            (nslcmop_operation_state, detailed_status) returned from the finally
            block
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nsr = None
        db_nslcmop = None
        db_nsr_update = {}
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        error_description_nslcmop = None
        exc = None
        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="RUNNING ACTION",
                current_operation_id=nslcmop_id
            )

            step = "Getting information from database"
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            nsr_deployed = db_nsr["_admin"].get("deployed")
            # target selectors: vnf_index/vdu_id/kdu_name decide the descriptor
            # section searched for the primitive below
            vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            kdu_name = db_nslcmop["operationParams"].get("kdu_name")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            primitive = db_nslcmop["operationParams"]["primitive"]
            primitive_params = db_nslcmop["operationParams"]["primitive_params"]
            timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

            if vnf_index:
                step = "Getting vnfr from database"
                db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
                step = "Getting vnfd from database"
                db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
            else:
                # NS-level action: the primitive comes from the nsd instead
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # for backward compatibility
            if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            # look for primitive
            config_primitive_desc = descriptor_configuration = None
            if vdu_id:
                for vdu in get_iterable(db_vnfd, "vdu"):
                    if vdu_id == vdu["id"]:
                        descriptor_configuration = vdu.get("vdu-configuration")
                        break
            elif kdu_name:
                # NOTE: the loop variable `kdu` stays bound after the break and is
                # reused further down for the kdu_action/juju check
                for kdu in get_iterable(db_vnfd, "kdu"):
                    if kdu_name == kdu["name"]:
                        descriptor_configuration = kdu.get("kdu-configuration")
                        break
            elif vnf_index:
                descriptor_configuration = db_vnfd.get("vnf-configuration")
            else:
                descriptor_configuration = db_nsd.get("ns-configuration")

            if descriptor_configuration and descriptor_configuration.get("config-primitive"):
                for config_primitive in descriptor_configuration["config-primitive"]:
                    if config_primitive["name"] == primitive:
                        config_primitive_desc = config_primitive
                        break

            if not config_primitive_desc:
                # "upgrade"/"rollback"/"status" are built-in kdu operations and
                # need no descriptor entry; anything else must be declared
                if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                    raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                       format(primitive))
                primitive_name = primitive
                ee_descriptor_id = None
            else:
                primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
                ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

            # gather the additionalParams declared at the targeted level
            if vnf_index:
                if vdu_id:
                    vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                    desc_params = self._format_additional_params(vdur.get("additionalParams"))
                elif kdu_name:
                    kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                    desc_params = self._format_additional_params(kdur.get("additionalParams"))
                else:
                    desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
            else:
                desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))

            if kdu_name:
                # kdu without juju-based kdu-configuration => execute via k8s connector
                kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False

            # TODO check if ns is in a proper status
            if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
                # kdur and desc_params already set from before
                if primitive_params:
                    desc_params.update(primitive_params)
                # TODO Check if we will need something at vnf level
                # locate the deployed K8s record; for-else raises when not found
                for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                    if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                        break
                else:
                    raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

                if kdu.get("k8scluster-type") not in self.k8scluster_map:
                    msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                    raise LcmException(msg)

                # db_dict tells the k8s connector where to report status updates
                db_dict = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}
                self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
                step = "Executing kdu {}".format(primitive_name)
                if primitive_name == "upgrade":
                    # the model may be overridden via the "kdu_model" param;
                    # a trailing ":version" suffix is stripped from the stored model
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.get("kdu_model")
                        del desc_params["kdu_model"]
                    else:
                        kdu_model = kdu.get("kdu-model")
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            atomic=True, kdu_model=kdu_model,
                            params=desc_params, db_dict=db_dict,
                            timeout=timeout_ns_action),
                        timeout=timeout_ns_action + 10)
                    self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
                elif primitive_name == "rollback":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            db_dict=db_dict),
                        timeout=timeout_ns_action)
                elif primitive_name == "status":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance")),
                        timeout=timeout_ns_action)
                else:
                    # generic kdu primitive executed through the k8s connector
                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                    params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            primitive_name=primitive_name,
                            params=params, db_dict=db_dict,
                            timeout=timeout_ns_action),
                        timeout=timeout_ns_action)

                # a truthy connector result counts as success
                if detailed_status:
                    nslcmop_operation_state = 'COMPLETED'
                else:
                    detailed_status = ''
                    nslcmop_operation_state = 'FAILED'
            else:
                # charm-based primitive: find the execution environment and run it
                ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                              member_vnf_index=vnf_index,
                                                              vdu_id=vdu_id,
                                                              vdu_count_index=vdu_count_index,
                                                              ee_descriptor_id=ee_descriptor_id)
                # NOTE(review): path "admin.VCA" lacks the leading underscore used
                # elsewhere ("_admin. ...") — confirm this is intended
                db_nslcmop_notif = {"collection": "nslcmops",
                                    "filter": {"_id": nslcmop_id},
                                    "path": "admin.VCA"}
                nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                    ee_id,
                    primitive=primitive_name,
                    primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                    timeout=timeout_ns_action,
                    vca_type=vca_type,
                    db_dict=db_nslcmop_notif)

            db_nslcmop_update["detailed-status"] = detailed_status
            error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
            self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                                   detailed_status))
            return  # database update is called inside finally

        except (DbException, LcmException, N2VCException, K8sException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except asyncio.TimeoutError:
            self.logger.error(logging_text + "Timeout while '{}'".format(step))
            exc = "Timeout"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        finally:
            # persist operation result and release the task, whatever happened above
            if exc:
                db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                    "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr["nsState"],   # TODO check if degraded. For the moment use previous status
                    current_operation="IDLE",
                    current_operation_id=None,
                    # error_description=error_description_nsr,
                    # error_detail=error_detail,
                    other_update=db_nsr_update
                )

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                            loop=self.loop)
                except Exception as e:
                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
            return nslcmop_operation_state, detailed_status
4077
4078 async def scale(self, nsr_id, nslcmop_id):
4079
4080 # Try to lock HA task here
4081 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
4082 if not task_is_locked_by_me:
4083 return
4084
4085 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
4086 stage = ['', '', '']
4087 # ^ stage, step, VIM progress
4088 self.logger.debug(logging_text + "Enter")
4089 # get all needed from database
4090 db_nsr = None
4091 db_nslcmop = None
4092 db_nslcmop_update = {}
4093 nslcmop_operation_state = None
4094 db_nsr_update = {}
4095 exc = None
4096 # in case of error, indicates what part of scale was failed to put nsr at error status
4097 scale_process = None
4098 old_operational_status = ""
4099 old_config_status = ""
4100 try:
4101 # wait for any previous tasks in process
4102 step = "Waiting for previous operations to terminate"
4103 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
4104
4105 self._write_ns_status(
4106 nsr_id=nsr_id,
4107 ns_state=None,
4108 current_operation="SCALING",
4109 current_operation_id=nslcmop_id
4110 )
4111
4112 step = "Getting nslcmop from database"
4113 self.logger.debug(step + " after having waited for previous tasks to be completed")
4114 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4115 step = "Getting nsr from database"
4116 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4117
4118 old_operational_status = db_nsr["operational-status"]
4119 old_config_status = db_nsr["config-status"]
4120 step = "Parsing scaling parameters"
4121 # self.logger.debug(step)
4122 db_nsr_update["operational-status"] = "scaling"
4123 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4124 nsr_deployed = db_nsr["_admin"].get("deployed")
4125
4126 #######
4127 nsr_deployed = db_nsr["_admin"].get("deployed")
4128 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4129 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4130 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4131 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
4132 #######
4133
4134 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
4135 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
4136 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
4137 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
4138 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
4139
4140 # for backward compatibility
4141 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4142 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4143 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4144 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4145
4146 step = "Getting vnfr from database"
4147 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
4148 step = "Getting vnfd from database"
4149 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4150
4151 step = "Getting scaling-group-descriptor"
4152 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
4153 if scaling_descriptor["name"] == scaling_group:
4154 break
4155 else:
4156 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
4157 "at vnfd:scaling-group-descriptor".format(scaling_group))
4158
4159 # cooldown_time = 0
4160 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
4161 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
4162 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
4163 # break
4164
4165 # TODO check if ns is in a proper status
4166 step = "Sending scale order to VIM"
4167 nb_scale_op = 0
4168 if not db_nsr["_admin"].get("scaling-group"):
4169 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
4170 admin_scale_index = 0
4171 else:
4172 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
4173 if admin_scale_info["name"] == scaling_group:
4174 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
4175 break
4176 else: # not found, set index one plus last element and add new entry with the name
4177 admin_scale_index += 1
4178 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
4179 RO_scaling_info = []
4180 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
4181 if scaling_type == "SCALE_OUT":
4182 # count if max-instance-count is reached
4183 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
4184 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
4185 if nb_scale_op >= max_instance_count:
4186 raise LcmException("reached the limit of {} (max-instance-count) "
4187 "scaling-out operations for the "
4188 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4189
4190 nb_scale_op += 1
4191 vdu_scaling_info["scaling_direction"] = "OUT"
4192 vdu_scaling_info["vdu-create"] = {}
4193 for vdu_scale_info in scaling_descriptor["vdu"]:
4194 vdud = next(vdu for vdu in db_vnfd.get("vdu") if vdu["id"] == vdu_scale_info["vdu-id-ref"])
4195 vdu_index = len([x for x in db_vnfr.get("vdur", ())
4196 if x.get("vdu-id-ref") == vdu_scale_info["vdu-id-ref"] and
4197 x.get("member-vnf-index-ref") == vnf_index])
4198 cloud_init_text = self._get_cloud_init(vdud, db_vnfd)
4199 if cloud_init_text:
4200 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
4201 cloud_init_list = []
4202 for x in range(vdu_scale_info.get("count", 1)):
4203 if cloud_init_text:
4204 # TODO Information of its own ip is not available because db_vnfr is not updated.
4205 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu_scale_info["vdu-id-ref"],
4206 vdu_index + x)
4207 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params,
4208 db_vnfd["id"], vdud["id"]))
4209 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4210 "type": "create", "count": vdu_scale_info.get("count", 1)})
4211 if cloud_init_list:
4212 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
4213 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4214
4215 elif scaling_type == "SCALE_IN":
4216 # count if min-instance-count is reached
4217 min_instance_count = 0
4218 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
4219 min_instance_count = int(scaling_descriptor["min-instance-count"])
4220 if nb_scale_op <= min_instance_count:
4221 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
4222 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4223 nb_scale_op -= 1
4224 vdu_scaling_info["scaling_direction"] = "IN"
4225 vdu_scaling_info["vdu-delete"] = {}
4226 for vdu_scale_info in scaling_descriptor["vdu"]:
4227 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4228 "type": "delete", "count": vdu_scale_info.get("count", 1)})
4229 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4230
4231 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
4232 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
4233 if vdu_scaling_info["scaling_direction"] == "IN":
4234 for vdur in reversed(db_vnfr["vdur"]):
4235 if vdu_delete.get(vdur["vdu-id-ref"]):
4236 vdu_delete[vdur["vdu-id-ref"]] -= 1
4237 vdu_scaling_info["vdu"].append({
4238 "name": vdur.get("name") or vdur.get("vdu-name"),
4239 "vdu_id": vdur["vdu-id-ref"],
4240 "interface": []
4241 })
4242 for interface in vdur["interfaces"]:
4243 vdu_scaling_info["vdu"][-1]["interface"].append({
4244 "name": interface["name"],
4245 "ip_address": interface["ip-address"],
4246 "mac_address": interface.get("mac-address"),
4247 })
4248 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
4249
4250 # PRE-SCALE BEGIN
4251 step = "Executing pre-scale vnf-config-primitive"
4252 if scaling_descriptor.get("scaling-config-action"):
4253 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4254 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
4255 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
4256 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4257 step = db_nslcmop_update["detailed-status"] = \
4258 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
4259
4260 # look for primitive
4261 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4262 if config_primitive["name"] == vnf_config_primitive:
4263 break
4264 else:
4265 raise LcmException(
4266 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4267 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4268 "primitive".format(scaling_group, vnf_config_primitive))
4269
4270 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4271 if db_vnfr.get("additionalParamsForVnf"):
4272 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4273
4274 scale_process = "VCA"
4275 db_nsr_update["config-status"] = "configuring pre-scaling"
4276 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4277
4278 # Pre-scale retry check: Check if this sub-operation has been executed before
4279 op_index = self._check_or_add_scale_suboperation(
4280 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
4281 if op_index == self.SUBOPERATION_STATUS_SKIP:
4282 # Skip sub-operation
4283 result = 'COMPLETED'
4284 result_detail = 'Done'
4285 self.logger.debug(logging_text +
4286 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4287 vnf_config_primitive, result, result_detail))
4288 else:
4289 if op_index == self.SUBOPERATION_STATUS_NEW:
4290 # New sub-operation: Get index of this sub-operation
4291 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4292 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4293 format(vnf_config_primitive))
4294 else:
4295 # retry: Get registered params for this existing sub-operation
4296 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4297 vnf_index = op.get('member_vnf_index')
4298 vnf_config_primitive = op.get('primitive')
4299 primitive_params = op.get('primitive_params')
4300 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4301 format(vnf_config_primitive))
4302 # Execute the primitive, either with new (first-time) or registered (reintent) args
4303 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4304 primitive_name = config_primitive.get("execution-environment-primitive",
4305 vnf_config_primitive)
4306 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4307 member_vnf_index=vnf_index,
4308 vdu_id=None,
4309 vdu_count_index=None,
4310 ee_descriptor_id=ee_descriptor_id)
4311 result, result_detail = await self._ns_execute_primitive(
4312 ee_id, primitive_name, primitive_params, vca_type)
4313 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4314 vnf_config_primitive, result, result_detail))
4315 # Update operationState = COMPLETED | FAILED
4316 self._update_suboperation_status(
4317 db_nslcmop, op_index, result, result_detail)
4318
4319 if result == "FAILED":
4320 raise LcmException(result_detail)
4321 db_nsr_update["config-status"] = old_config_status
4322 scale_process = None
4323 # PRE-SCALE END
4324
4325 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
4326 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
4327
4328 # SCALE RO - BEGIN
4329 if RO_scaling_info:
4330 scale_process = "RO"
4331 if self.ro_config.get("ng"):
4332 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
4333 else:
4334 await self._RO_scale(logging_text, RO_nsr_id, RO_scaling_info, db_nslcmop, db_vnfr,
4335 db_nslcmop_update, vdu_scaling_info)
4336 vdu_scaling_info.pop("vdu-create", None)
4337 vdu_scaling_info.pop("vdu-delete", None)
4338
4339 scale_process = None
4340 if db_nsr_update:
4341 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4342
4343 # POST-SCALE BEGIN
4344 # execute primitive service POST-SCALING
4345 step = "Executing post-scale vnf-config-primitive"
4346 if scaling_descriptor.get("scaling-config-action"):
4347 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4348 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4349 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4350 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4351 step = db_nslcmop_update["detailed-status"] = \
4352 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4353
4354 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4355 if db_vnfr.get("additionalParamsForVnf"):
4356 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4357
4358 # look for primitive
4359 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4360 if config_primitive["name"] == vnf_config_primitive:
4361 break
4362 else:
4363 raise LcmException(
4364 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4365 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4366 "config-primitive".format(scaling_group, vnf_config_primitive))
4367 scale_process = "VCA"
4368 db_nsr_update["config-status"] = "configuring post-scaling"
4369 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4370
4371 # Post-scale retry check: Check if this sub-operation has been executed before
4372 op_index = self._check_or_add_scale_suboperation(
4373 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4374 if op_index == self.SUBOPERATION_STATUS_SKIP:
4375 # Skip sub-operation
4376 result = 'COMPLETED'
4377 result_detail = 'Done'
4378 self.logger.debug(logging_text +
4379 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4380 format(vnf_config_primitive, result, result_detail))
4381 else:
4382 if op_index == self.SUBOPERATION_STATUS_NEW:
4383 # New sub-operation: Get index of this sub-operation
4384 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4385 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4386 format(vnf_config_primitive))
4387 else:
4388 # retry: Get registered params for this existing sub-operation
4389 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4390 vnf_index = op.get('member_vnf_index')
4391 vnf_config_primitive = op.get('primitive')
4392 primitive_params = op.get('primitive_params')
4393 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4394 format(vnf_config_primitive))
4395 # Execute the primitive, either with new (first-time) or registered (reintent) args
4396 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4397 primitive_name = config_primitive.get("execution-environment-primitive",
4398 vnf_config_primitive)
4399 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4400 member_vnf_index=vnf_index,
4401 vdu_id=None,
4402 vdu_count_index=None,
4403 ee_descriptor_id=ee_descriptor_id)
4404 result, result_detail = await self._ns_execute_primitive(
4405 ee_id, primitive_name, primitive_params, vca_type)
4406 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4407 vnf_config_primitive, result, result_detail))
4408 # Update operationState = COMPLETED | FAILED
4409 self._update_suboperation_status(
4410 db_nslcmop, op_index, result, result_detail)
4411
4412 if result == "FAILED":
4413 raise LcmException(result_detail)
4414 db_nsr_update["config-status"] = old_config_status
4415 scale_process = None
4416 # POST-SCALE END
4417
4418 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4419 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4420 else old_operational_status
4421 db_nsr_update["config-status"] = old_config_status
4422 return
4423 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4424 self.logger.error(logging_text + "Exit Exception {}".format(e))
4425 exc = e
4426 except asyncio.CancelledError:
4427 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4428 exc = "Operation was cancelled"
4429 except Exception as e:
4430 exc = traceback.format_exc()
4431 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4432 finally:
4433 self._write_ns_status(
4434 nsr_id=nsr_id,
4435 ns_state=None,
4436 current_operation="IDLE",
4437 current_operation_id=None
4438 )
4439 if exc:
4440 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4441 nslcmop_operation_state = "FAILED"
4442 if db_nsr:
4443 db_nsr_update["operational-status"] = old_operational_status
4444 db_nsr_update["config-status"] = old_config_status
4445 db_nsr_update["detailed-status"] = ""
4446 if scale_process:
4447 if "VCA" in scale_process:
4448 db_nsr_update["config-status"] = "failed"
4449 if "RO" in scale_process:
4450 db_nsr_update["operational-status"] = "failed"
4451 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4452 exc)
4453 else:
4454 error_description_nslcmop = None
4455 nslcmop_operation_state = "COMPLETED"
4456 db_nslcmop_update["detailed-status"] = "Done"
4457
4458 self._write_op_status(
4459 op_id=nslcmop_id,
4460 stage="",
4461 error_message=error_description_nslcmop,
4462 operation_state=nslcmop_operation_state,
4463 other_update=db_nslcmop_update,
4464 )
4465 if db_nsr:
4466 self._write_ns_status(
4467 nsr_id=nsr_id,
4468 ns_state=None,
4469 current_operation="IDLE",
4470 current_operation_id=None,
4471 other_update=db_nsr_update
4472 )
4473
4474 if nslcmop_operation_state:
4475 try:
4476 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
4477 "operationState": nslcmop_operation_state},
4478 loop=self.loop)
4479 # if cooldown_time:
4480 # await asyncio.sleep(cooldown_time, loop=self.loop)
4481 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4482 except Exception as e:
4483 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4484 self.logger.debug(logging_text + "Exit")
4485 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4486
4487 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
4488 nsr_id = db_nslcmop["nsInstanceId"]
4489 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4490 db_vnfrs = {}
4491
4492 # read from db: vnfd's for every vnf
4493 db_vnfds = {} # every vnfd data indexed by vnf id
4494 db_vnfds_ref = {} # every vnfd data indexed by vnfd id
4495 db_vnfds = {}
4496
4497 # for each vnf in ns, read vnfd
4498 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4499 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4500 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4501 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
4502 # if we haven't this vnfd, read it from db
4503 if vnfd_id not in db_vnfds:
4504 # read from db
4505 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4506 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
4507 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
4508 n2vc_key = self.n2vc.get_public_key()
4509 n2vc_key_list = [n2vc_key]
4510 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4511 mark_delete=True)
4512 # db_vnfr has been updated, update db_vnfrs to use it
4513 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4514 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4515 db_vnfds_ref, n2vc_key_list, stage=stage, start_deploy=time(),
4516 timeout_ns_deploy=self.timeout_ns_deploy)
4517 if vdu_scaling_info.get("vdu-delete"):
4518 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4519
4520 async def _RO_scale(self, logging_text, RO_nsr_id, RO_scaling_info, db_nslcmop, db_vnfr, db_nslcmop_update,
4521 vdu_scaling_info):
4522 nslcmop_id = db_nslcmop["_id"]
4523 nsr_id = db_nslcmop["nsInstanceId"]
4524 vdu_create = vdu_scaling_info.get("vdu-create")
4525 vdu_delete = vdu_scaling_info.get("vdu-delete")
4526 # Scale RO retry check: Check if this sub-operation has been executed before
4527 op_index = self._check_or_add_scale_suboperation(
4528 db_nslcmop, db_vnfr["member-vnf-index-ref"], None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
4529 if op_index == self.SUBOPERATION_STATUS_SKIP:
4530 # Skip sub-operation
4531 result = 'COMPLETED'
4532 result_detail = 'Done'
4533 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(result, result_detail))
4534 else:
4535 if op_index == self.SUBOPERATION_STATUS_NEW:
4536 # New sub-operation: Get index of this sub-operation
4537 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4538 self.logger.debug(logging_text + "New sub-operation RO")
4539 else:
4540 # retry: Get registered params for this existing sub-operation
4541 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4542 RO_nsr_id = op.get('RO_nsr_id')
4543 RO_scaling_info = op.get('RO_scaling_info')
4544 self.logger.debug(logging_text + "Sub-operation RO retry")
4545
4546 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
4547 # wait until ready
4548 RO_nslcmop_id = RO_desc["instance_action_id"]
4549 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
4550
4551 RO_task_done = False
4552 step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
4553 detailed_status_old = None
4554 self.logger.debug(logging_text + step)
4555
4556 deployment_timeout = 1 * 3600 # One hour
4557 while deployment_timeout > 0:
4558 if not RO_task_done:
4559 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
4560 extra_item_id=RO_nslcmop_id)
4561
4562 # deploymentStatus
4563 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4564
4565 ns_status, ns_status_info = self.RO.check_action_status(desc)
4566 if ns_status == "ERROR":
4567 raise ROclient.ROClientException(ns_status_info)
4568 elif ns_status == "BUILD":
4569 detailed_status = step + "; {}".format(ns_status_info)
4570 elif ns_status == "ACTIVE":
4571 RO_task_done = True
4572 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
4573 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
4574 self.logger.debug(logging_text + step)
4575 else:
4576 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
4577 else:
4578 desc = await self.RO.show("ns", RO_nsr_id)
4579 ns_status, ns_status_info = self.RO.check_ns_status(desc)
4580 # deploymentStatus
4581 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4582
4583 if ns_status == "ERROR":
4584 raise ROclient.ROClientException(ns_status_info)
4585 elif ns_status == "BUILD":
4586 detailed_status = step + "; {}".format(ns_status_info)
4587 elif ns_status == "ACTIVE":
4588 step = detailed_status = \
4589 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4590 try:
4591 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4592 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
4593 break
4594 except LcmExceptionNoMgmtIP:
4595 pass
4596 else:
4597 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
4598 if detailed_status != detailed_status_old:
4599 self._update_suboperation_status(
4600 db_nslcmop, op_index, 'COMPLETED', detailed_status)
4601 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
4602 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
4603
4604 await asyncio.sleep(5, loop=self.loop)
4605 deployment_timeout -= 5
4606 if deployment_timeout <= 0:
4607 self._update_suboperation_status(
4608 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
4609 raise ROclient.ROClientException("Timeout waiting ns to be ready")
4610
4611 # update VDU_SCALING_INFO with the obtained ip_addresses
4612 if vdu_scaling_info["scaling_direction"] == "OUT":
4613 for vdur in reversed(db_vnfr["vdur"]):
4614 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
4615 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
4616 vdu_scaling_info["vdu"].append({
4617 "name": vdur["name"] or vdur.get("vdu-name"),
4618 "vdu_id": vdur["vdu-id-ref"],
4619 "interface": []
4620 })
4621 for interface in vdur["interfaces"]:
4622 vdu_scaling_info["vdu"][-1]["interface"].append({
4623 "name": interface["name"],
4624 "ip_address": interface["ip-address"],
4625 "mac_address": interface.get("mac-address"),
4626 })
4627 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
4628
4629 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
4630 if not self.prometheus:
4631 return
4632 # look if exist a file called 'prometheus*.j2' and
4633 artifact_content = self.fs.dir_ls(artifact_path)
4634 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4635 if not job_file:
4636 return
4637 with self.fs.file_open((artifact_path, job_file), "r") as f:
4638 job_data = f.read()
4639
4640 # TODO get_service
4641 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4642 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4643 host_port = "80"
4644 vnfr_id = vnfr_id.replace("-", "")
4645 variables = {
4646 "JOB_NAME": vnfr_id,
4647 "TARGET_IP": target_ip,
4648 "EXPORTER_POD_IP": host_name,
4649 "EXPORTER_POD_PORT": host_port,
4650 }
4651 job_list = self.prometheus.parse_job(job_data, variables)
4652 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4653 for job in job_list:
4654 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4655 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4656 job["nsr_id"] = nsr_id
4657 job_dict = {jl["job_name"]: jl for jl in job_list}
4658 if await self.prometheus.update(job_dict):
4659 return list(job_dict.keys())
4660
4661 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4662 """
4663 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4664
4665 :param: vim_account_id: VIM Account ID
4666
4667 :return: (cloud_name, cloud_credential)
4668 """
4669 config = self.get_vim_account_config(vim_account_id)
4670 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4671
4672 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4673 """
4674 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4675
4676 :param: vim_account_id: VIM Account ID
4677
4678 :return: (cloud_name, cloud_credential)
4679 """
4680 config = self.get_vim_account_config(vim_account_id)
4681 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
4682
4683 def get_vim_account_config(self, vim_account_id: str) -> dict:
4684 """
4685 Get VIM Account config from the OSM Database
4686
4687 :param: vim_account_id: VIM Account ID
4688
4689 :return: Dictionary with the config of the vim account
4690 """
4691 vim_account = self.db.get_one(table="vim_accounts", q_filter={"_id": vim_account_id}, fail_on_empty=False)
4692 return vim_account.get("config", {}) if vim_account else {}