Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
63 class NsLcm(LcmBase):
64 timeout_vca_on_error = 5 * 60 # Time from when a charm first reaches blocked/error status until it is marked as failed
65 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
66 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
67 timeout_charm_delete = 10 * 60
68 timeout_primitive = 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
70
71 SUBOPERATION_STATUS_NOT_FOUND = -1
72 SUBOPERATION_STATUS_NEW = -2
73 SUBOPERATION_STATUS_SKIP = -3
74 task_name_deploy_vca = "Deploying VCA"
75
76 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
77 """
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
80 :return: None
81 """
82 super().__init__(
83 msg=msg,
84 logger=logging.getLogger('lcm.ns')
85 )
86
87 self.db = Database().instance.db
88 self.fs = Filesystem().instance.fs
89 self.loop = loop
90 self.lcm_tasks = lcm_tasks
91 self.timeout = config["timeout"]
92 self.ro_config = config["ro_config"]
93 self.ng_ro = config["ro_config"].get("ng")
94 self.vca_config = config["VCA"].copy()
95
96 # create N2VC connector
97 self.n2vc = N2VCJujuConnector(
98 log=self.logger,
99 loop=self.loop,
100 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
101 username=self.vca_config.get('user', None),
102 vca_config=self.vca_config,
103 on_update_db=self._on_update_n2vc_db,
104 fs=self.fs,
105 db=self.db
106 )
107
108 self.conn_helm_ee = LCMHelmConn(
109 log=self.logger,
110 loop=self.loop,
111 url=None,
112 username=None,
113 vca_config=self.vca_config,
114 on_update_db=self._on_update_n2vc_db
115 )
116
117 self.k8sclusterhelm2 = K8sHelmConnector(
118 kubectl_command=self.vca_config.get("kubectlpath"),
119 helm_command=self.vca_config.get("helmpath"),
120 log=self.logger,
121 on_update_db=None,
122 fs=self.fs,
123 db=self.db
124 )
125
126 self.k8sclusterhelm3 = K8sHelm3Connector(
127 kubectl_command=self.vca_config.get("kubectlpath"),
128 helm_command=self.vca_config.get("helm3path"),
129 fs=self.fs,
130 log=self.logger,
131 db=self.db,
132 on_update_db=None,
133 )
134
135 self.k8sclusterjuju = K8sJujuConnector(
136 kubectl_command=self.vca_config.get("kubectlpath"),
137 juju_command=self.vca_config.get("jujupath"),
138 log=self.logger,
139 loop=self.loop,
140 on_update_db=None,
141 vca_config=self.vca_config,
142 fs=self.fs,
143 db=self.db
144 )
145
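# Descriptive note (added): the two maps below route work by type string taken from the descriptor.
# k8scluster_map selects the K8s connector for a KDU ("helm-chart" -> Helm v2; "helm-chart-v3" and
# "chart" -> Helm v3; "juju-bundle" and "juju" -> Juju), while vca_map selects the execution
# environment backend ("*_charm" types -> N2VC Juju connector; "helm"/"helm-v3" -> LCM helm EE connector).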
146 self.k8scluster_map = {
147 "helm-chart": self.k8sclusterhelm2,
148 "helm-chart-v3": self.k8sclusterhelm3,
149 "chart": self.k8sclusterhelm3,
150 "juju-bundle": self.k8sclusterjuju,
151 "juju": self.k8sclusterjuju,
152 }
153
154 self.vca_map = {
155 "lxc_proxy_charm": self.n2vc,
156 "native_charm": self.n2vc,
157 "k8s_proxy_charm": self.n2vc,
158 "helm": self.conn_helm_ee,
159 "helm-v3": self.conn_helm_ee
160 }
161
162 self.prometheus = prometheus
163
164 # create RO client
165 self.RO = NgRoClient(self.loop, **self.ro_config)
166
167 @staticmethod
168 def increment_ip_mac(ip_mac, vm_index=1):
169 if not isinstance(ip_mac, str):
170 return ip_mac
171 try:
172 # try with ipv4: look for the last dot
173 i = ip_mac.rfind(".")
174 if i > 0:
175 i += 1
176 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
177 # try with ipv6 or mac: look for the last colon and operate in hex
178 i = ip_mac.rfind(":")
179 if i > 0:
180 i += 1
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
183 except Exception:
184 pass
185 return None
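# Illustrative usage (hypothetical values, not part of the original code):
#   NsLcm.increment_ip_mac("10.0.0.5", vm_index=2)    -> "10.0.0.7"
#   NsLcm.increment_ip_mac("fa:16:3e:00:00:0a", 1)    -> "fa:16:3e:00:00:0b"   (hex arithmetic)
#   Non-string inputs are returned unchanged; strings that cannot be parsed return None.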
186
187 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
188
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
190
191 try:
192 # TODO filter RO descriptor fields...
193
194 # write to database
195 db_dict = dict()
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict['deploymentStatus'] = ro_descriptor
198 self.update_db_2("nsrs", nsrs_id, db_dict)
199
200 except Exception as e:
201 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
202
203 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
204
205 # remove last dot from path (if exists)
206 if path.endswith('.'):
207 path = path[:-1]
208
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
211 try:
212
213 nsr_id = filter.get('_id')
214
215 # read ns record from database
216 nsr = self.db.get_one(table='nsrs', q_filter=filter)
217 current_ns_status = nsr.get('nsState')
218
219 # get vca status for NS
220 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
221
222 # vcaStatus
223 db_dict = dict()
224 db_dict['vcaStatus'] = status_dict
225 await self.n2vc.update_vca_status(db_dict['vcaStatus'])
226
227 # update configurationStatus for this VCA
228 try:
229 vca_index = int(path[path.rfind(".")+1:])
230
231 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
232 vca_status = vca_list[vca_index].get('status')
233
234 configuration_status_list = nsr.get('configurationStatus')
235 config_status = configuration_status_list[vca_index].get('status')
236
237 if config_status == 'BROKEN' and vca_status != 'failed':
238 db_dict['configurationStatus'][vca_index] = 'READY'
239 elif config_status != 'BROKEN' and vca_status == 'failed':
240 db_dict['configurationStatus'][vca_index] = 'BROKEN'
241 except Exception as e:
242 # do not update configurationStatus
243 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
244
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
247 is_degraded = False
248 if current_ns_status in ('READY', 'DEGRADED'):
249 error_description = ''
250 # check machines
251 if status_dict.get('machines'):
252 for machine_id in status_dict.get('machines'):
253 machine = status_dict.get('machines').get(machine_id)
254 # check machine agent-status
255 if machine.get('agent-status'):
256 s = machine.get('agent-status').get('status')
257 if s != 'started':
258 is_degraded = True
259 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
260 # check machine instance status
261 if machine.get('instance-status'):
262 s = machine.get('instance-status').get('status')
263 if s != 'running':
264 is_degraded = True
265 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
266 # check applications
267 if status_dict.get('applications'):
268 for app_id in status_dict.get('applications'):
269 app = status_dict.get('applications').get(app_id)
270 # check application status
271 if app.get('status'):
272 s = app.get('status').get('status')
273 if s != 'active':
274 is_degraded = True
275 error_description += 'application {} status={} ; '.format(app_id, s)
276
277 if error_description:
278 db_dict['errorDescription'] = error_description
279 if current_ns_status == 'READY' and is_degraded:
280 db_dict['nsState'] = 'DEGRADED'
281 if current_ns_status == 'DEGRADED' and not is_degraded:
282 db_dict['nsState'] = 'READY'
283
284 # write to database
285 self.update_db_2("nsrs", nsr_id, db_dict)
286
287 except (asyncio.CancelledError, asyncio.TimeoutError):
288 raise
289 except Exception as e:
290 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
291
292 @staticmethod
293 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
294 try:
295 env = Environment(undefined=StrictUndefined)
296 template = env.from_string(cloud_init_text)
297 return template.render(additional_params or {})
298 except UndefinedError as e:
299 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
300 "file, must be provided in the instantiation parameters inside the "
301 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
302 except (TemplateError, TemplateNotFound) as e:
303 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
304 format(vnfd_id, vdu_id, e))
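# Minimal sketch of what _parse_cloud_init does (hypothetical template and params):
#   _parse_cloud_init("hostname: {{ vm_name }}", {"vm_name": "vnf1-vdu1"}, "myvnfd", "myvdu")
#     -> "hostname: vnf1-vdu1"
#   Because StrictUndefined is used, any template variable missing from additional_params raises
#   LcmException pointing at the 'additionalParamsForVnf/Vdu' instantiation block.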
305
306 def _get_vdu_cloud_init_content(self, vdu, vnfd):
307 cloud_init_content = cloud_init_file = None
308 try:
309 if vdu.get("cloud-init-file"):
310 base_folder = vnfd["_admin"]["storage"]
311 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
312 vdu["cloud-init-file"])
313 with self.fs.file_open(cloud_init_file, "r") as ci_file:
314 cloud_init_content = ci_file.read()
315 elif vdu.get("cloud-init"):
316 cloud_init_content = vdu["cloud-init"]
317
318 return cloud_init_content
319 except FsException as e:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd["id"], vdu["id"], cloud_init_file, e))
322
323 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
324 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
325 additional_params = vdur.get("additionalParams")
326 return parse_yaml_strings(additional_params)
327
328 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
329 """
330 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
331 :param vnfd: input vnfd
332 :param new_id: overrides vnf id if provided
333 :param additionalParams: Instantiation params for VNFs provided
334 :param nsrId: Id of the NSR
335 :return: copy of vnfd
336 """
337 vnfd_RO = deepcopy(vnfd)
338 # remove configuration, monitoring, scaling and internal keys not used by RO
339 vnfd_RO.pop("_id", None)
340 vnfd_RO.pop("_admin", None)
341 vnfd_RO.pop("monitoring-param", None)
342 vnfd_RO.pop("scaling-group-descriptor", None)
343 vnfd_RO.pop("kdu", None)
344 vnfd_RO.pop("k8s-cluster", None)
345 if new_id:
346 vnfd_RO["id"] = new_id
347
348 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
349 for vdu in get_iterable(vnfd_RO, "vdu"):
350 vdu.pop("cloud-init-file", None)
351 vdu.pop("cloud-init", None)
352 return vnfd_RO
353
354 @staticmethod
355 def ip_profile_2_RO(ip_profile):
356 RO_ip_profile = deepcopy(ip_profile)
357 if "dns-server" in RO_ip_profile:
358 if isinstance(RO_ip_profile["dns-server"], list):
359 RO_ip_profile["dns-address"] = []
360 for ds in RO_ip_profile.pop("dns-server"):
361 RO_ip_profile["dns-address"].append(ds['address'])
362 else:
363 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
364 if RO_ip_profile.get("ip-version") == "ipv4":
365 RO_ip_profile["ip-version"] = "IPv4"
366 if RO_ip_profile.get("ip-version") == "ipv6":
367 RO_ip_profile["ip-version"] = "IPv6"
368 if "dhcp-params" in RO_ip_profile:
369 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
370 return RO_ip_profile
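# Illustrative transformation (hypothetical ip-profile, not taken from the original file):
#   in : {"ip-version": "ipv4", "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
#   out: {"ip-version": "IPv4", "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}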
371
372 def _get_ro_vim_id_for_vim_account(self, vim_account):
373 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
374 if db_vim["_admin"]["operationalState"] != "ENABLED":
375 raise LcmException("VIM={} is not available. operationalState={}".format(
376 vim_account, db_vim["_admin"]["operationalState"]))
377 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
378 return RO_vim_id
379
380 def get_ro_wim_id_for_wim_account(self, wim_account):
381 if isinstance(wim_account, str):
382 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
383 if db_wim["_admin"]["operationalState"] != "ENABLED":
384 raise LcmException("WIM={} is not available. operationalState={}".format(
385 wim_account, db_wim["_admin"]["operationalState"]))
386 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
387 return RO_wim_id
388 else:
389 return wim_account
390
391 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
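# Descriptive note (added): clones the newest matching vdur for each vdu in vdu_create (adjusting
# count-index and fixed ip/mac addresses), and for vdu_delete either marks the last vdu_count vdur
# as "DELETING" (mark_delete=True) or pulls them from the vnfr one by one; finally it refreshes
# db_vnfr["vdur"] from the database so the caller sees the updated list.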
392
393 db_vdu_push_list = []
394 db_update = {"_admin.modified": time()}
395 if vdu_create:
396 for vdu_id, vdu_count in vdu_create.items():
397 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
398 if not vdur:
399 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
400 format(vdu_id))
401
402 for count in range(vdu_count):
403 vdur_copy = deepcopy(vdur)
404 vdur_copy["status"] = "BUILD"
405 vdur_copy["status-detailed"] = None
406 vdur_copy["ip-address"]: None
407 vdur_copy["_id"] = str(uuid4())
408 vdur_copy["count-index"] += count + 1
409 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
410 vdur_copy.pop("vim_info", None)
411 for iface in vdur_copy["interfaces"]:
412 if iface.get("fixed-ip"):
413 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
414 else:
415 iface.pop("ip-address", None)
416 if iface.get("fixed-mac"):
417 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
418 else:
419 iface.pop("mac-address", None)
420 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
421 db_vdu_push_list.append(vdur_copy)
422 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
423 if vdu_delete:
424 for vdu_id, vdu_count in vdu_delete.items():
425 if mark_delete:
426 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
427 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
428 else:
429 # it must be deleted one by one because common.db does not allow otherwise
430 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
431 for vdu in vdus_to_delete[:vdu_count]:
432 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
433 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
434 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
435 # modify passed dictionary db_vnfr
436 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
437 db_vnfr["vdur"] = db_vnfr_["vdur"]
438
439 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
440 """
441 Updates database nsr with the RO info for the created vld
442 :param ns_update_nsr: dictionary to be filled with the updated info
443 :param db_nsr: content of db_nsr. This is also modified
444 :param nsr_desc_RO: nsr descriptor from RO
445 :return: Nothing, LcmException is raised on errors
446 """
447
448 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
449 for net_RO in get_iterable(nsr_desc_RO, "nets"):
450 if vld["id"] != net_RO.get("ns_net_osm_id"):
451 continue
452 vld["vim-id"] = net_RO.get("vim_net_id")
453 vld["name"] = net_RO.get("vim_name")
454 vld["status"] = net_RO.get("status")
455 vld["status-detailed"] = net_RO.get("error_msg")
456 ns_update_nsr["vld.{}".format(vld_index)] = vld
457 break
458 else:
459 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
460
461 def set_vnfr_at_error(self, db_vnfrs, error_text):
462 try:
463 for db_vnfr in db_vnfrs.values():
464 vnfr_update = {"status": "ERROR"}
465 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
466 if "status" not in vdur:
467 vdur["status"] = "ERROR"
468 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
469 if error_text:
470 vdur["status-detailed"] = str(error_text)
471 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
472 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
473 except DbException as e:
474 self.logger.error("Cannot update vnf. {}".format(e))
475
476 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
477 """
478 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
479 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
480 :param nsr_desc_RO: nsr descriptor from RO
481 :return: Nothing, LcmException is raised on errors
482 """
483 for vnf_index, db_vnfr in db_vnfrs.items():
484 for vnf_RO in nsr_desc_RO["vnfs"]:
485 if vnf_RO["member_vnf_index"] != vnf_index:
486 continue
487 vnfr_update = {}
488 if vnf_RO.get("ip_address"):
489 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
490 elif not db_vnfr.get("ip-address"):
491 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
492 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
493
494 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
495 vdur_RO_count_index = 0
496 if vdur.get("pdu-type"):
497 continue
498 for vdur_RO in get_iterable(vnf_RO, "vms"):
499 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
500 continue
501 if vdur["count-index"] != vdur_RO_count_index:
502 vdur_RO_count_index += 1
503 continue
504 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
505 if vdur_RO.get("ip_address"):
506 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
507 else:
508 vdur["ip-address"] = None
509 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
510 vdur["name"] = vdur_RO.get("vim_name")
511 vdur["status"] = vdur_RO.get("status")
512 vdur["status-detailed"] = vdur_RO.get("error_msg")
513 for ifacer in get_iterable(vdur, "interfaces"):
514 for interface_RO in get_iterable(vdur_RO, "interfaces"):
515 if ifacer["name"] == interface_RO.get("internal_name"):
516 ifacer["ip-address"] = interface_RO.get("ip_address")
517 ifacer["mac-address"] = interface_RO.get("mac_address")
518 break
519 else:
520 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
521 "from VIM info"
522 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
523 vnfr_update["vdur.{}".format(vdu_index)] = vdur
524 break
525 else:
526 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
527 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
528
529 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
530 for net_RO in get_iterable(nsr_desc_RO, "nets"):
531 if vld["id"] != net_RO.get("vnf_net_osm_id"):
532 continue
533 vld["vim-id"] = net_RO.get("vim_net_id")
534 vld["name"] = net_RO.get("vim_name")
535 vld["status"] = net_RO.get("status")
536 vld["status-detailed"] = net_RO.get("error_msg")
537 vnfr_update["vld.{}".format(vld_index)] = vld
538 break
539 else:
540 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
541 vnf_index, vld["id"]))
542
543 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
544 break
545
546 else:
547 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
548
549 def _get_ns_config_info(self, nsr_id):
550 """
551 Generates a mapping between vnf,vdu elements and the N2VC id
552 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
553 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
554 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
555 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
556 """
557 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
558 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
559 mapping = {}
560 ns_config_info = {"osm-config-mapping": mapping}
561 for vca in vca_deployed_list:
562 if not vca["member-vnf-index"]:
563 continue
564 if not vca["vdu_id"]:
565 mapping[vca["member-vnf-index"]] = vca["application"]
566 else:
567 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
568 vca["application"]
569 return ns_config_info
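# Example of the returned mapping (hypothetical application names):
#   {"osm-config-mapping": {"1": "app-vnf-1",                      # vnf-level charm
#                           "2.mgmtVM.0": "app-vnf-2-vdu-mgmt"}}   # vdu-level charm, replica 0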
570
571 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
572 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
573
574 db_vims = {}
575
576 def get_vim_account(vim_account_id):
577 nonlocal db_vims
578 if vim_account_id in db_vims:
579 return db_vims[vim_account_id]
580 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
581 db_vims[vim_account_id] = db_vim
582 return db_vim
583
584 # modify target_vld info with instantiation parameters
585 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
586 if vld_params.get("ip-profile"):
587 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
588 if vld_params.get("provider-network"):
589 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
590 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
591 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
592 if vld_params.get("wimAccountId"):
593 target_wim = "wim:{}".format(vld_params["wimAccountId"])
594 target_vld["vim_info"][target_wim] = {}
595 for param in ("vim-network-name", "vim-network-id"):
596 if vld_params.get(param):
597 if isinstance(vld_params[param], dict):
598 for vim, vim_net in vld_params[param].items():
599 other_target_vim = "vim:" + vim
600 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
601 else: # isinstance str
602 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
603 if vld_params.get("common_id"):
604 target_vld["common_id"] = vld_params.get("common_id")
605
606 nslcmop_id = db_nslcmop["_id"]
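# Descriptive note (added): the "target" built below is the deployment request handed to NG-RO:
# ns-level vlds, one entry per vnf (with its vdur/vld records), plus the image/flavor lists and
# inlined cloud-init content, each element annotated with per-VIM "vim_info".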
607 target = {
608 "name": db_nsr["name"],
609 "ns": {"vld": []},
610 "vnf": [],
611 "image": deepcopy(db_nsr["image"]),
612 "flavor": deepcopy(db_nsr["flavor"]),
613 "action_id": nslcmop_id,
614 "cloud_init_content": {},
615 }
616 for image in target["image"]:
617 image["vim_info"] = {}
618 for flavor in target["flavor"]:
619 flavor["vim_info"] = {}
620
621 if db_nslcmop.get("lcmOperationType") != "instantiate":
622 # get parameters of instantiation:
623 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
624 "lcmOperationType": "instantiate"})[-1]
625 ns_params = db_nslcmop_instantiate.get("operationParams")
626 else:
627 ns_params = db_nslcmop.get("operationParams")
628 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
629 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
630
631 cp2target = {}
632 for vld_index, vld in enumerate(db_nsr.get("vld")):
633 target_vim = "vim:{}".format(ns_params["vimAccountId"])
634 target_vld = {
635 "id": vld["id"],
636 "name": vld["name"],
637 "mgmt-network": vld.get("mgmt-network", False),
638 "type": vld.get("type"),
639 "vim_info": {
640 target_vim: {
641 "vim_network_name": vld.get("vim-network-name"),
642 "vim_account_id": ns_params["vimAccountId"]
643 }
644 }
645 }
646 # check if this network needs SDN assist
647 if vld.get("pci-interfaces"):
648 db_vim = get_vim_account(ns_params["vimAccountId"])
649 sdnc_id = db_vim["config"].get("sdn-controller")
650 if sdnc_id:
651 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
652 target_sdn = "sdn:{}".format(sdnc_id)
653 target_vld["vim_info"][target_sdn] = {
654 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
655
656 nsd_vnf_profiles = get_vnf_profiles(nsd)
657 for nsd_vnf_profile in nsd_vnf_profiles:
658 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
659 if cp["virtual-link-profile-id"] == vld["id"]:
660 cp2target["member_vnf:{}.{}".format(
661 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
662 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
663 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
664
665 # check at nsd descriptor, if there is an ip-profile
666 vld_params = {}
667 virtual_link_profiles = get_virtual_link_profiles(nsd)
668
669 for vlp in virtual_link_profiles:
670 ip_profile = find_in_list(nsd["ip-profiles"],
671 lambda profile: profile["name"] == vlp["ip-profile-ref"])
672 vld_params["ip-profile"] = ip_profile["ip-profile-params"]
673 # update vld_params with instantiation params
674 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
675 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
676 if vld_instantiation_params:
677 vld_params.update(vld_instantiation_params)
678 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
679 target["ns"]["vld"].append(target_vld)
680
681 for vnfr in db_vnfrs.values():
682 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
683 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
684 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
685 target_vnf = deepcopy(vnfr)
686 target_vim = "vim:{}".format(vnfr["vim-account-id"])
687 for vld in target_vnf.get("vld", ()):
688 # check if connected to a ns.vld, to fill target
689 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
690 lambda cpd: cpd.get("id") == vld["id"])
691 if vnf_cp:
692 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
693 if cp2target.get(ns_cp):
694 vld["target"] = cp2target[ns_cp]
695
696 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
697 # check if this network needs SDN assist
698 target_sdn = None
699 if vld.get("pci-interfaces"):
700 db_vim = get_vim_account(vnfr["vim-account-id"])
701 sdnc_id = db_vim["config"].get("sdn-controller")
702 if sdnc_id:
703 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
704 target_sdn = "sdn:{}".format(sdnc_id)
705 vld["vim_info"][target_sdn] = {
706 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
707
708 # check at vnfd descriptor, if there is an ip-profile
709 vld_params = {}
710 vnfd_vlp = find_in_list(
711 get_virtual_link_profiles(vnfd),
712 lambda a_link_profile: a_link_profile["id"] == vld["id"]
713 )
714 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
715 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
716 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
717 ip_profile_dest_data = {}
718 if "ip-version" in ip_profile_source_data:
719 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
720 if "cidr" in ip_profile_source_data:
721 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
722 if "gateway-ip" in ip_profile_source_data:
723 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
724 if "dhcp-enabled" in ip_profile_source_data:
725 ip_profile_dest_data["dhcp-params"] = {
726 "enabled": ip_profile_source_data["dhcp-enabled"]
727 }
728
729 vld_params["ip-profile"] = ip_profile_dest_data
730 # update vld_params with instantiation params
731 if vnf_params:
732 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
733 lambda i_vld: i_vld["name"] == vld["id"])
734 if vld_instantiation_params:
735 vld_params.update(vld_instantiation_params)
736 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
737
738 vdur_list = []
739 for vdur in target_vnf.get("vdur", ()):
740 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
741 continue # This vdu must not be created
742 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
743
744 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
745
746 if ssh_keys_all:
747 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
748 vnf_configuration = get_configuration(vnfd, vnfd["id"])
749 if vdu_configuration and vdu_configuration.get("config-access") and \
750 vdu_configuration.get("config-access").get("ssh-access"):
751 vdur["ssh-keys"] = ssh_keys_all
752 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
753 elif vnf_configuration and vnf_configuration.get("config-access") and \
754 vnf_configuration.get("config-access").get("ssh-access") and \
755 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
756 vdur["ssh-keys"] = ssh_keys_all
757 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
758 elif ssh_keys_instantiation and \
759 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
760 vdur["ssh-keys"] = ssh_keys_instantiation
761
762 self.logger.debug("NS > vdur > {}".format(vdur))
763
764 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
765 # cloud-init
766 if vdud.get("cloud-init-file"):
767 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
768 # read file and put content at target.cloud_init_content. Avoid ng_ro having to use the shared package system
769 if vdur["cloud-init"] not in target["cloud_init_content"]:
770 base_folder = vnfd["_admin"]["storage"]
771 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
772 vdud.get("cloud-init-file"))
773 with self.fs.file_open(cloud_init_file, "r") as ci_file:
774 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
775 elif vdud.get("cloud-init"):
776 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
777 # put content at target.cloud_init_content. Avoid ng_ro having to read the vnfd descriptor
778 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
779 vdur["additionalParams"] = vdur.get("additionalParams") or {}
780 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
781 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
782 vdur["additionalParams"] = deploy_params_vdu
783
784 # flavor
785 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
786 if target_vim not in ns_flavor["vim_info"]:
787 ns_flavor["vim_info"][target_vim] = {}
788
789 # deal with images
790 # in case alternative images are provided we must check if they should be applied
791 # for the vim_type, modify the vim_type taking into account
792 ns_image_id = int(vdur["ns-image-id"])
793 if vdur.get("alt-image-ids"):
794 db_vim = get_vim_account(vnfr["vim-account-id"])
795 vim_type = db_vim["vim_type"]
796 for alt_image_id in vdur.get("alt-image-ids"):
797 ns_alt_image = target["image"][int(alt_image_id)]
798 if vim_type == ns_alt_image.get("vim-type"):
799 # must use alternative image
800 self.logger.debug("use alternative image id: {}".format(alt_image_id))
801 ns_image_id = alt_image_id
802 vdur["ns-image-id"] = ns_image_id
803 break
804 ns_image = target["image"][int(ns_image_id)]
805 if target_vim not in ns_image["vim_info"]:
806 ns_image["vim_info"][target_vim] = {}
807
808 vdur["vim_info"] = {target_vim: {}}
809 # instantiation parameters
810 # if vnf_params:
811 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
812 # vdud["id"]), None)
813 vdur_list.append(vdur)
814 target_vnf["vdur"] = vdur_list
815 target["vnf"].append(target_vnf)
816
817 desc = await self.RO.deploy(nsr_id, target)
818 self.logger.debug("RO return > {}".format(desc))
819 action_id = desc["action_id"]
820 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
821
822 # Updating NSR
823 db_nsr_update = {
824 "_admin.deployed.RO.operational-status": "running",
825 "detailed-status": " ".join(stage)
826 }
827 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
828 self.update_db_2("nsrs", nsr_id, db_nsr_update)
829 self._write_op_status(nslcmop_id, stage)
830 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
831 return
832
833 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
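# Polls NG-RO every 15 seconds until the action reports DONE (raising on FAILED or timeout),
# mirroring progress into the nsr "detailed-status" field when stage and nslcmop_id are provided.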
834 detailed_status_old = None
835 db_nsr_update = {}
836 start_time = start_time or time()
837 while time() <= start_time + timeout:
838 desc_status = await self.RO.status(nsr_id, action_id)
839 self.logger.debug("Wait NG RO > {}".format(desc_status))
840 if desc_status["status"] == "FAILED":
841 raise NgRoException(desc_status["details"])
842 elif desc_status["status"] == "BUILD":
843 if stage:
844 stage[2] = "VIM: ({})".format(desc_status["details"])
845 elif desc_status["status"] == "DONE":
846 if stage:
847 stage[2] = "Deployed at VIM"
848 break
849 else:
850 assert False, "RO.status returns unknown {}".format(desc_status["status"])
851 if stage and nslcmop_id and stage[2] != detailed_status_old:
852 detailed_status_old = stage[2]
853 db_nsr_update["detailed-status"] = " ".join(stage)
854 self.update_db_2("nsrs", nsr_id, db_nsr_update)
855 self._write_op_status(nslcmop_id, stage)
856 await asyncio.sleep(15, loop=self.loop)
857 else: # timeout_ns_deploy
858 raise NgRoException("Timeout waiting ns to deploy")
859
860 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
861 db_nsr_update = {}
862 failed_detail = []
863 action_id = None
864 start_deploy = time()
865 try:
866 target = {
867 "ns": {"vld": []},
868 "vnf": [],
869 "image": [],
870 "flavor": [],
871 "action_id": nslcmop_id
872 }
873 desc = await self.RO.deploy(nsr_id, target)
874 action_id = desc["action_id"]
875 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
876 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
877 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
878
879 # wait until done
880 delete_timeout = 20 * 60 # 20 minutes
881 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
882
883 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
884 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
885 # delete all nsr
886 await self.RO.delete(nsr_id)
887 except Exception as e:
888 if isinstance(e, NgRoException) and e.http_code == 404: # not found
889 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
890 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
891 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
892 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
893 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
894 failed_detail.append("delete conflict: {}".format(e))
895 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
896 else:
897 failed_detail.append("delete error: {}".format(e))
898 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
899
900 if failed_detail:
901 stage[2] = "Error deleting from VIM"
902 else:
903 stage[2] = "Deleted from VIM"
904 db_nsr_update["detailed-status"] = " ".join(stage)
905 self.update_db_2("nsrs", nsr_id, db_nsr_update)
906 self._write_op_status(nslcmop_id, stage)
907
908 if failed_detail:
909 raise LcmException("; ".join(failed_detail))
910 return
911
912 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
913 n2vc_key_list, stage):
914 """
915 Instantiate at RO
916 :param logging_text: prefix text to use at logging
917 :param nsr_id: nsr identity
918 :param nsd: database content of ns descriptor
919 :param db_nsr: database content of ns record
920 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
921 :param db_vnfrs:
922 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
923 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
924 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
925 :return: None or exception
926 """
927 try:
928 start_deploy = time()
929 ns_params = db_nslcmop.get("operationParams")
930 if ns_params and ns_params.get("timeout_ns_deploy"):
931 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
932 else:
933 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
934
935 # Check for and optionally request placement optimization. Database will be updated if placement activated
936 stage[2] = "Waiting for Placement."
937 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
938 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
939 for vnfr in db_vnfrs.values():
940 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
941 break
942 else:
943 ns_params["vimAccountId"] == vnfr["vim-account-id"]
944
945 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
946 db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
947 except Exception as e:
948 stage[2] = "ERROR deploying at VIM"
949 self.set_vnfr_at_error(db_vnfrs, str(e))
950 self.logger.error("Error deploying at VIM {}".format(e),
951 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
952 NgRoException)))
953 raise
954
955 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
956 """
957 Wait for kdu to be up, get ip address
958 :param logging_text: prefix use for logging
959 :param nsr_id:
960 :param vnfr_id:
961 :param kdu_name:
962 :return: IP address
963 """
964
965 # self.logger.debug(logging_text + "Starting wait_kdu_up")
966 nb_tries = 0
967
968 while nb_tries < 360:
969 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
970 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
971 if not kdur:
972 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
973 if kdur.get("status"):
974 if kdur["status"] in ("READY", "ENABLED"):
975 return kdur.get("ip-address")
976 else:
977 raise LcmException("target KDU={} is in error state".format(kdu_name))
978
979 await asyncio.sleep(10, loop=self.loop)
980 nb_tries += 1
981 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
982
983 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
984 """
985 Wait for ip address at RO, and optionally insert a public key in the virtual machine
986 :param logging_text: prefix use for logging
987 :param nsr_id:
988 :param vnfr_id:
989 :param vdu_id:
990 :param vdu_index:
991 :param pub_key: public ssh key to inject, None to skip
992 :param user: user to apply the public ssh key
993 :return: IP address
994 """
995
996 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
997 ro_nsr_id = None
998 ip_address = None
999 nb_tries = 0
1000 target_vdu_id = None
1001 ro_retries = 0
1002
1003 while True:
1004
1005 ro_retries += 1
1006 if ro_retries >= 360: # 1 hour
1007 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1008
1009 await asyncio.sleep(10, loop=self.loop)
1010
1011 # get ip address
1012 if not target_vdu_id:
1013 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1014
1015 if not vdu_id: # for the VNF case
1016 if db_vnfr.get("status") == "ERROR":
1017 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1018 ip_address = db_vnfr.get("ip-address")
1019 if not ip_address:
1020 continue
1021 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1022 else: # VDU case
1023 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1024 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1025
1026 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1027 vdur = db_vnfr["vdur"][0]
1028 if not vdur:
1029 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1030 vdu_index))
1031 # New generation RO stores information at "vim_info"
1032 ng_ro_status = None
1033 target_vim = None
1034 if vdur.get("vim_info"):
1035 target_vim = next(t for t in vdur["vim_info"]) # there should be only one key
1036 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1037 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
1038 ip_address = vdur.get("ip-address")
1039 if not ip_address:
1040 continue
1041 target_vdu_id = vdur["vdu-id-ref"]
1042 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1043 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1044
1045 if not target_vdu_id:
1046 continue
1047
1048 # inject public key into machine
1049 if pub_key and user:
1050 self.logger.debug(logging_text + "Inserting RO key")
1051 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1052 if vdur.get("pdu-type"):
1053 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1054 return ip_address
1055 try:
1056 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1057 if self.ng_ro:
1058 target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
1059 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1060 }
1061 desc = await self.RO.deploy(nsr_id, target)
1062 action_id = desc["action_id"]
1063 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1064 break
1065 else:
1066 # wait until NS is deployed at RO
1067 if not ro_nsr_id:
1068 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1069 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1070 if not ro_nsr_id:
1071 continue
1072 result_dict = await self.RO.create_action(
1073 item="ns",
1074 item_id_name=ro_nsr_id,
1075 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1076 )
1077 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1078 if not result_dict or not isinstance(result_dict, dict):
1079 raise LcmException("Unknown response from RO when injecting key")
1080 for result in result_dict.values():
1081 if result.get("vim_result") == 200:
1082 break
1083 else:
1084 raise ROclient.ROClientException("error injecting key: {}".format(
1085 result.get("description")))
1086 break
1087 except NgRoException as e:
1088 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1089 except ROclient.ROClientException as e:
1090 if not nb_tries:
1091 self.logger.debug(logging_text + "error injecting key: {}. Retrying for up to {} seconds".
1092 format(e, 20*10))
1093 nb_tries += 1
1094 if nb_tries >= 20:
1095 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1096 else:
1097 break
1098
1099 return ip_address
1100
1101 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1102 """
1103 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1104 """
1105 my_vca = vca_deployed_list[vca_index]
1106 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1107 # vdu or kdu: no dependencies
1108 return
1109 timeout = 300
1110 while timeout >= 0:
1111 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1112 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1113 configuration_status_list = db_nsr["configurationStatus"]
1114 for index, vca_deployed in enumerate(configuration_status_list):
1115 if index == vca_index:
1116 # myself
1117 continue
1118 if not my_vca.get("member-vnf-index") or \
1119 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1120 internal_status = configuration_status_list[index].get("status")
1121 if internal_status == 'READY':
1122 continue
1123 elif internal_status == 'BROKEN':
1124 raise LcmException("Configuration aborted because dependent charm/s has failed")
1125 else:
1126 break
1127 else:
1128 # no dependencies, return
1129 return
1130 await asyncio.sleep(10)
1131 timeout -= 1
1132
1133 raise LcmException("Configuration aborted because dependent charm/s timeout")
1134
1135 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1136 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1137 ee_config_descriptor):
1138 nsr_id = db_nsr["_id"]
1139 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1140 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1141 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1142 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1143 db_dict = {
1144 'collection': 'nsrs',
1145 'filter': {'_id': nsr_id},
1146 'path': db_update_entry
1147 }
1148 step = ""
1149 try:
1150
1151 element_type = 'NS'
1152 element_under_configuration = nsr_id
1153
1154 vnfr_id = None
1155 if db_vnfr:
1156 vnfr_id = db_vnfr["_id"]
1157 osm_config["osm"]["vnf_id"] = vnfr_id
1158
1159 namespace = "{nsi}.{ns}".format(
1160 nsi=nsi_id if nsi_id else "",
1161 ns=nsr_id)
1162
1163 if vnfr_id:
1164 element_type = 'VNF'
1165 element_under_configuration = vnfr_id
1166 namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
1167 if vdu_id:
1168 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1169 element_type = 'VDU'
1170 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1171 osm_config["osm"]["vdu_id"] = vdu_id
1172 elif kdu_name:
1173 namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
1174 element_type = 'KDU'
1175 element_under_configuration = kdu_name
1176 osm_config["osm"]["kdu_name"] = kdu_name
1177
1178 # Get artifact path
1179 artifact_path = "{}/{}/{}/{}".format(
1180 base_folder["folder"],
1181 base_folder["pkg-dir"],
1182 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1183 vca_name
1184 )
1185
1186 self.logger.debug("Artifact path > {}".format(artifact_path))
1187
1188 # get initial_config_primitive_list that applies to this element
1189 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1190
1191 self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))
1192
1193 # add config if not present for NS charm
1194 ee_descriptor_id = ee_config_descriptor.get("id")
1195 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1196 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
1197 vca_deployed, ee_descriptor_id)
1198
1199 self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
1200 # n2vc_redesign STEP 3.1
1201 # find old ee_id if exists
1202 ee_id = vca_deployed.get("ee_id")
1203
1204 vim_account_id = (
1205 deep_get(db_vnfr, ("vim-account-id",)) or
1206 deep_get(deploy_params, ("OSM", "vim_account_id"))
1207 )
1208 vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
1209 vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
1210 # create or register execution environment in VCA
1211 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1212
1213 self._write_configuration_status(
1214 nsr_id=nsr_id,
1215 vca_index=vca_index,
1216 status='CREATING',
1217 element_under_configuration=element_under_configuration,
1218 element_type=element_type
1219 )
1220
1221 step = "create execution environment"
1222 self.logger.debug(logging_text + step)
1223
1224 ee_id = None
1225 credentials = None
1226 if vca_type == "k8s_proxy_charm":
1227 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1228 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
1229 namespace=namespace,
1230 artifact_path=artifact_path,
1231 db_dict=db_dict,
1232 cloud_name=vca_k8s_cloud,
1233 credential_name=vca_k8s_cloud_credential,
1234 )
1235 elif vca_type == "helm" or vca_type == "helm-v3":
1236 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1237 namespace=namespace,
1238 reuse_ee_id=ee_id,
1239 db_dict=db_dict,
1240 config=osm_config,
1241 artifact_path=artifact_path,
1242 vca_type=vca_type
1243 )
1244 else:
1245 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1246 namespace=namespace,
1247 reuse_ee_id=ee_id,
1248 db_dict=db_dict,
1249 cloud_name=vca_cloud,
1250 credential_name=vca_cloud_credential,
1251 )
1252
1253 elif vca_type == "native_charm":
1254 step = "Waiting to VM being up and getting IP address"
1255 self.logger.debug(logging_text + step)
1256 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1257 user=None, pub_key=None)
1258 credentials = {"hostname": rw_mgmt_ip}
1259 # get username
1260 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1261 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user are
1262 # merged. Meanwhile, get the username from initial-config-primitive
1263 if not username and initial_config_primitive_list:
1264 for config_primitive in initial_config_primitive_list:
1265 for param in config_primitive.get("parameter", ()):
1266 if param["name"] == "ssh-username":
1267 username = param["value"]
1268 break
1269 if not username:
1270 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1271 "'config-access.ssh-access.default-user'")
1272 credentials["username"] = username
1273 # n2vc_redesign STEP 3.2
1274
1275 self._write_configuration_status(
1276 nsr_id=nsr_id,
1277 vca_index=vca_index,
1278 status='REGISTERING',
1279 element_under_configuration=element_under_configuration,
1280 element_type=element_type
1281 )
1282
1283 step = "register execution environment {}".format(credentials)
1284 self.logger.debug(logging_text + step)
1285 ee_id = await self.vca_map[vca_type].register_execution_environment(
1286 credentials=credentials,
1287 namespace=namespace,
1288 db_dict=db_dict,
1289 cloud_name=vca_cloud,
1290 credential_name=vca_cloud_credential,
1291 )
1292
1293 # for compatibility with MON/POL modules, they need the model and application name in the database
1294 # TODO ask MON/POL whether assuming the format "model_name.application_name" is still needed
1295 ee_id_parts = ee_id.split('.')
1296 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1297 if len(ee_id_parts) >= 2:
1298 model_name = ee_id_parts[0]
1299 application_name = ee_id_parts[1]
1300 db_nsr_update[db_update_entry + "model"] = model_name
1301 db_nsr_update[db_update_entry + "application"] = application_name
1302
1303 # n2vc_redesign STEP 3.3
1304 step = "Install configuration Software"
1305
1306 self._write_configuration_status(
1307 nsr_id=nsr_id,
1308 vca_index=vca_index,
1309 status='INSTALLING SW',
1310 element_under_configuration=element_under_configuration,
1311 element_type=element_type,
1312 other_update=db_nsr_update
1313 )
1314
1315 # TODO check if already done
1316 self.logger.debug(logging_text + step)
1317 config = None
1318 if vca_type == "native_charm":
1319 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1320 if config_primitive:
1321 config = self._map_primitive_params(
1322 config_primitive,
1323 {},
1324 deploy_params
1325 )
1326 num_units = 1
1327 if vca_type == "lxc_proxy_charm":
1328 if element_type == "NS":
1329 num_units = db_nsr.get("config-units") or 1
1330 elif element_type == "VNF":
1331 num_units = db_vnfr.get("config-units") or 1
1332 elif element_type == "VDU":
1333 for v in db_vnfr["vdur"]:
1334 if vdu_id == v["vdu-id-ref"]:
1335 num_units = v.get("config-units") or 1
1336 break
1337 if vca_type != "k8s_proxy_charm":
1338 await self.vca_map[vca_type].install_configuration_sw(
1339 ee_id=ee_id,
1340 artifact_path=artifact_path,
1341 db_dict=db_dict,
1342 config=config,
1343 num_units=num_units,
1344 )
1345
1346 # write in db flag of configuration_sw already installed
1347 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1348
1349 # add relations for this VCA (wait for other peers related with this VCA)
1350 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1351 vca_index=vca_index, vca_type=vca_type)
1352
1353 # if SSH access is required, then get the execution environment SSH public key
1354 # if native charm, we have already waited for the VM to be UP
1355 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1356 pub_key = None
1357 user = None
1358 # self.logger.debug("get ssh key block")
1359 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1360 # self.logger.debug("ssh key needed")
1361 # Needed to inject a ssh key
1362 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1363 step = "Install configuration Software, getting public ssh key"
1364 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1365
1366 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1367 else:
1368 # self.logger.debug("no need to get ssh key")
1369 step = "Waiting to VM being up and getting IP address"
1370 self.logger.debug(logging_text + step)
1371
1372 # n2vc_redesign STEP 5.1
1373 # wait for RO (ip-address) Insert pub_key into VM
1374 if vnfr_id:
1375 if kdu_name:
1376 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1377 else:
1378 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1379 vdu_index, user=user, pub_key=pub_key)
1380 else:
1381 rw_mgmt_ip = None # This is for a NS configuration
1382
1383 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1384
1385 # store rw_mgmt_ip in deploy params for later replacement
1386 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1387
1388 # n2vc_redesign STEP 6 Execute initial config primitive
1389 step = 'execute initial config primitive'
1390
1391 # wait for dependent primitives execution (NS -> VNF -> VDU)
1392 if initial_config_primitive_list:
1393 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1394
1395 # stage, depending on the element type: vdu, kdu, vnf or ns
1396 my_vca = vca_deployed_list[vca_index]
1397 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1398 # VDU or KDU
1399 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1400 elif my_vca.get("member-vnf-index"):
1401 # VNF
1402 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1403 else:
1404 # NS
1405 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1406
1407 self._write_configuration_status(
1408 nsr_id=nsr_id,
1409 vca_index=vca_index,
1410 status='EXECUTING PRIMITIVE'
1411 )
1412
1413 self._write_op_status(
1414 op_id=nslcmop_id,
1415 stage=stage
1416 )
1417
1418 check_if_terminated_needed = True
1419 for initial_config_primitive in initial_config_primitive_list:
1420 # add information to the vca_deployed if it is an NS execution environment
1421 if not vca_deployed["member-vnf-index"]:
1422 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1423 # TODO check if already done
1424 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1425
1426 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1427 self.logger.debug(logging_text + step)
1428 await self.vca_map[vca_type].exec_primitive(
1429 ee_id=ee_id,
1430 primitive_name=initial_config_primitive["name"],
1431 params_dict=primitive_params_,
1432 db_dict=db_dict
1433 )
1434 # Once a primitive has been executed, check and record in the db whether terminate primitives will be needed
1435 if check_if_terminated_needed:
1436 if config_descriptor.get('terminate-config-primitive'):
1437 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1438 check_if_terminated_needed = False
1439
1440 # TODO register in database that primitive is done
1441
1442 # STEP 7 Configure metrics
1443 if vca_type == "helm" or vca_type == "helm-v3":
1444 prometheus_jobs = await self.add_prometheus_metrics(
1445 ee_id=ee_id,
1446 artifact_path=artifact_path,
1447 ee_config_descriptor=ee_config_descriptor,
1448 vnfr_id=vnfr_id,
1449 nsr_id=nsr_id,
1450 target_ip=rw_mgmt_ip,
1451 )
1452 if prometheus_jobs:
1453 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1454
1455 step = "instantiated at VCA"
1456 self.logger.debug(logging_text + step)
1457
1458 self._write_configuration_status(
1459 nsr_id=nsr_id,
1460 vca_index=vca_index,
1461 status='READY'
1462 )
1463
1464 except Exception as e: # TODO not use Exception but N2VC exception
1465 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1466 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1467 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1468 self._write_configuration_status(
1469 nsr_id=nsr_id,
1470 vca_index=vca_index,
1471 status='BROKEN'
1472 )
1473 raise LcmException("{} {}".format(step, e)) from e
1474
1475 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1476 error_description: str = None, error_detail: str = None, other_update: dict = None):
1477 """
1478 Update db_nsr fields.
1479 :param nsr_id:
1480 :param ns_state:
1481 :param current_operation:
1482 :param current_operation_id:
1483 :param error_description:
1484 :param error_detail:
1485 :param other_update: Other required changes at database if provided, will be cleared
1486 :return:
1487 """
1488 try:
1489 db_dict = other_update or {}
1490 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1491 db_dict["_admin.current-operation"] = current_operation_id
1492 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1493 db_dict["currentOperation"] = current_operation
1494 db_dict["currentOperationID"] = current_operation_id
1495 db_dict["errorDescription"] = error_description
1496 db_dict["errorDetail"] = error_detail
1497
1498 if ns_state:
1499 db_dict["nsState"] = ns_state
1500 self.update_db_2("nsrs", nsr_id, db_dict)
1501 except DbException as e:
1502 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1503
1504 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1505 operation_state: str = None, other_update: dict = None):
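"""
Update the nslcmops record with the progress of an operation.
:param op_id: nslcmop identifier
:param stage: list of stage/step strings; stage[0] is stored as 'stage' and the space-joined list as 'detailed-status'
:param error_message: written to 'errorMessage' if provided
:param queuePosition: position of the operation in the processing queue
:param operation_state: written to 'operationState' together with 'statusEnteredTime' if provided
:param other_update: other database changes to apply in the same update, if provided
"""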
1506 try:
1507 db_dict = other_update or {}
1508 db_dict['queuePosition'] = queuePosition
1509 if isinstance(stage, list):
1510 db_dict['stage'] = stage[0]
1511 db_dict['detailed-status'] = " ".join(stage)
1512 elif stage is not None:
1513 db_dict['stage'] = str(stage)
1514
1515 if error_message is not None:
1516 db_dict['errorMessage'] = error_message
1517 if operation_state is not None:
1518 db_dict['operationState'] = operation_state
1519 db_dict["statusEnteredTime"] = time()
1520 self.update_db_2("nslcmops", op_id, db_dict)
1521 except DbException as e:
1522 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1523
1524 def _write_all_config_status(self, db_nsr: dict, status: str):
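"""Set the given status on every configurationStatus entry of the NS record db_nsr."""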
1525 try:
1526 nsr_id = db_nsr["_id"]
1527 # configurationStatus
1528 config_status = db_nsr.get('configurationStatus')
1529 if config_status:
1530 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1531 enumerate(config_status) if v}
1532 # update status
1533 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1534
1535 except DbException as e:
1536 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1537
1538 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1539 element_under_configuration: str = None, element_type: str = None,
1540 other_update: dict = None):
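"""
Update the configurationStatus entry at position vca_index of the nsrs record.
Only the fields that are provided (status, element under configuration, element type) are written.
"""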
1541
1542 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1543 # .format(vca_index, status))
1544
1545 try:
1546 db_path = 'configurationStatus.{}.'.format(vca_index)
1547 db_dict = other_update or {}
1548 if status:
1549 db_dict[db_path + 'status'] = status
1550 if element_under_configuration:
1551 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1552 if element_type:
1553 db_dict[db_path + 'elementType'] = element_type
1554 self.update_db_2("nsrs", nsr_id, db_dict)
1555 except DbException as e:
1556 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1557 .format(status, nsr_id, vca_index, e))
1558
1559 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1560 """
1561 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
1562 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
1563 Database is used because the result can be obtained from a different LCM worker in case of HA.
1564 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1565 :param db_nslcmop: database content of nslcmop
1566 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1567 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
1568 computed 'vim-account-id'
1569 """
1570 modified = False
1571 nslcmop_id = db_nslcmop['_id']
1572 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1573 if placement_engine == "PLA":
1574 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1575 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1576 db_poll_interval = 5
1577 wait = db_poll_interval * 10
1578 pla_result = None
1579 while not pla_result and wait >= 0:
1580 await asyncio.sleep(db_poll_interval)
1581 wait -= db_poll_interval
1582 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1583 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1584
1585 if not pla_result:
1586 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1587
1588 for pla_vnf in pla_result['vnf']:
1589 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1590 if not pla_vnf.get('vimAccountId') or not vnfr:
1591 continue
1592 modified = True
1593 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1594 # Modifies db_vnfrs
1595 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1596 return modified
1597
1598 def update_nsrs_with_pla_result(self, params):
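"""Store the placement result received from PLA at nslcmops _admin.pla, so that _do_placement can pick it up."""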
1599 try:
1600 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1601 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1602 except Exception as e:
1603 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1604
1605 async def instantiate(self, nsr_id, nslcmop_id):
1606 """
1607
1608 :param nsr_id: ns instance to deploy
1609 :param nslcmop_id: operation to run
1610 :return:
1611 """
1612
1613 # Try to lock HA task here
1614 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1615 if not task_is_locked_by_me:
1616 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1617 return
1618
1619 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1620 self.logger.debug(logging_text + "Enter")
1621
1622 # get all needed from database
1623
1624 # database nsrs record
1625 db_nsr = None
1626
1627 # database nslcmops record
1628 db_nslcmop = None
1629
1630 # update operation on nsrs
1631 db_nsr_update = {}
1632 # update operation on nslcmops
1633 db_nslcmop_update = {}
1634
1635 nslcmop_operation_state = None
1636 db_vnfrs = {} # vnf's info indexed by member-index
1637 # n2vc_info = {}
1638 tasks_dict_info = {} # from task to info text
1639 exc = None
1640 error_list = []
1641 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1642 # ^ stage, step, VIM progress
1643 try:
1644 # wait for any previous tasks in process
1645 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1646
1647 stage[1] = "Sync filesystem from database."
1648 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
1649
1650 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1651 stage[1] = "Reading from database."
1652 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1653 db_nsr_update["detailed-status"] = "creating"
1654 db_nsr_update["operational-status"] = "init"
1655 self._write_ns_status(
1656 nsr_id=nsr_id,
1657 ns_state="BUILDING",
1658 current_operation="INSTANTIATING",
1659 current_operation_id=nslcmop_id,
1660 other_update=db_nsr_update
1661 )
1662 self._write_op_status(
1663 op_id=nslcmop_id,
1664 stage=stage,
1665 queuePosition=0
1666 )
1667
1668 # read from db: operation
1669 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
1670 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1671 ns_params = db_nslcmop.get("operationParams")
1672 if ns_params and ns_params.get("timeout_ns_deploy"):
1673 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1674 else:
1675 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1676
1677 # read from db: ns
1678 stage[1] = "Getting nsr={} from db.".format(nsr_id)
1679 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1680 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
1681 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1682 db_nsr["nsd"] = nsd
1683 # nsr_name = db_nsr["name"] # TODO short-name??
1684
1685 # read from db: vnf's of this ns
1686 stage[1] = "Getting vnfrs from db."
1687 self.logger.debug(logging_text + stage[1])
1688 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1689
1690 # read from db: vnfd's for every vnf
1691 db_vnfds = [] # every vnfd data
1692
1693 # for each vnf in ns, read vnfd
1694 for vnfr in db_vnfrs_list:
1695 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
1696 vnfd_id = vnfr["vnfd-id"]
1697 vnfd_ref = vnfr["vnfd-ref"]
1698
1699 # if we do not have this vnfd yet, read it from db
1700 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
1701 # read from db
1702 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
1703 self.logger.debug(logging_text + stage[1])
1704 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1705
1706 # store vnfd
1707 db_vnfds.append(vnfd)
1708
1709 # Get or generates the _admin.deployed.VCA list
1710 vca_deployed_list = None
1711 if db_nsr["_admin"].get("deployed"):
1712 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1713 if vca_deployed_list is None:
1714 vca_deployed_list = []
1715 configuration_status_list = []
1716 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1717 db_nsr_update["configurationStatus"] = configuration_status_list
1718 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1719 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1720 elif isinstance(vca_deployed_list, dict):
1721 # maintain backward compatibility. Change a dict to list at database
1722 vca_deployed_list = list(vca_deployed_list.values())
1723 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1724 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1725
1726 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1727 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1728 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1729
1730 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
1731 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1732 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1733 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
1734
1735 # n2vc_redesign STEP 2 Deploy Network Scenario
1736 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1737 self._write_op_status(
1738 op_id=nslcmop_id,
1739 stage=stage
1740 )
1741
1742 stage[1] = "Deploying KDUs."
1743 # self.logger.debug(logging_text + "Before deploy_kdus")
1744 # Call deploy_kdus in case the "vdu:kdu" param exists
1745 await self.deploy_kdus(
1746 logging_text=logging_text,
1747 nsr_id=nsr_id,
1748 nslcmop_id=nslcmop_id,
1749 db_vnfrs=db_vnfrs,
1750 db_vnfds=db_vnfds,
1751 task_instantiation_info=tasks_dict_info,
1752 )
1753
1754 stage[1] = "Getting VCA public key."
1755 # n2vc_redesign STEP 1 Get VCA public ssh-key
1756 # feature 1429. Add n2vc public key to needed VMs
1757 n2vc_key = self.n2vc.get_public_key()
1758 n2vc_key_list = [n2vc_key]
1759 if self.vca_config.get("public_key"):
1760 n2vc_key_list.append(self.vca_config["public_key"])
1761
1762 stage[1] = "Deploying NS at VIM."
1763 task_ro = asyncio.ensure_future(
1764 self.instantiate_RO(
1765 logging_text=logging_text,
1766 nsr_id=nsr_id,
1767 nsd=nsd,
1768 db_nsr=db_nsr,
1769 db_nslcmop=db_nslcmop,
1770 db_vnfrs=db_vnfrs,
1771 db_vnfds=db_vnfds,
1772 n2vc_key_list=n2vc_key_list,
1773 stage=stage
1774 )
1775 )
1776 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1777 tasks_dict_info[task_ro] = "Deploying at VIM"
1778
1779 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1780 stage[1] = "Deploying Execution Environments."
1781 self.logger.debug(logging_text + stage[1])
1782
1783 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1784 for vnf_profile in get_vnf_profiles(nsd):
1785 vnfd_id = vnf_profile["vnfd-id"]
1786 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
1787 member_vnf_index = str(vnf_profile["id"])
1788 db_vnfr = db_vnfrs[member_vnf_index]
1789 base_folder = vnfd["_admin"]["storage"]
1790 vdu_id = None
1791 vdu_index = 0
1792 vdu_name = None
1793 kdu_name = None
1794
1795 # Get additional parameters
1796 deploy_params = {"OSM": get_osm_params(db_vnfr)}
1797 if db_vnfr.get("additionalParamsForVnf"):
1798 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
1799
1800 descriptor_config = get_configuration(vnfd, vnfd["id"])
1801 if descriptor_config:
1802 self._deploy_n2vc(
1803 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1804 db_nsr=db_nsr,
1805 db_vnfr=db_vnfr,
1806 nslcmop_id=nslcmop_id,
1807 nsr_id=nsr_id,
1808 nsi_id=nsi_id,
1809 vnfd_id=vnfd_id,
1810 vdu_id=vdu_id,
1811 kdu_name=kdu_name,
1812 member_vnf_index=member_vnf_index,
1813 vdu_index=vdu_index,
1814 vdu_name=vdu_name,
1815 deploy_params=deploy_params,
1816 descriptor_config=descriptor_config,
1817 base_folder=base_folder,
1818 task_instantiation_info=tasks_dict_info,
1819 stage=stage
1820 )
1821
1822 # Deploy charms for each VDU that supports one.
1823 for vdud in get_vdu_list(vnfd):
1824 vdu_id = vdud["id"]
1825 descriptor_config = get_configuration(vnfd, vdu_id)
1826 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
1827
1828 if vdur.get("additionalParams"):
1829 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
1830 else:
1831 deploy_params_vdu = deploy_params
1832 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
1833 vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)
1834
1835 self.logger.debug("VDUD > {}".format(vdud))
1836 self.logger.debug("Descriptor config > {}".format(descriptor_config))
1837 if descriptor_config:
1838 vdu_name = None
1839 kdu_name = None
1840 for vdu_index in range(vdud_count):
1841 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1842 self._deploy_n2vc(
1843 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1844 member_vnf_index, vdu_id, vdu_index),
1845 db_nsr=db_nsr,
1846 db_vnfr=db_vnfr,
1847 nslcmop_id=nslcmop_id,
1848 nsr_id=nsr_id,
1849 nsi_id=nsi_id,
1850 vnfd_id=vnfd_id,
1851 vdu_id=vdu_id,
1852 kdu_name=kdu_name,
1853 member_vnf_index=member_vnf_index,
1854 vdu_index=vdu_index,
1855 vdu_name=vdu_name,
1856 deploy_params=deploy_params_vdu,
1857 descriptor_config=descriptor_config,
1858 base_folder=base_folder,
1859 task_instantiation_info=tasks_dict_info,
1860 stage=stage
1861 )
1862 for kdud in get_kdu_list(vnfd):
1863 kdu_name = kdud["name"]
1864 descriptor_config = get_configuration(vnfd, kdu_name)
1865 if descriptor_config:
1866 vdu_id = None
1867 vdu_index = 0
1868 vdu_name = None
1869 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
1870 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
1871 if kdur.get("additionalParams"):
1872 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
1873
1874 self._deploy_n2vc(
1875 logging_text=logging_text,
1876 db_nsr=db_nsr,
1877 db_vnfr=db_vnfr,
1878 nslcmop_id=nslcmop_id,
1879 nsr_id=nsr_id,
1880 nsi_id=nsi_id,
1881 vnfd_id=vnfd_id,
1882 vdu_id=vdu_id,
1883 kdu_name=kdu_name,
1884 member_vnf_index=member_vnf_index,
1885 vdu_index=vdu_index,
1886 vdu_name=vdu_name,
1887 deploy_params=deploy_params_kdu,
1888 descriptor_config=descriptor_config,
1889 base_folder=base_folder,
1890 task_instantiation_info=tasks_dict_info,
1891 stage=stage
1892 )
1893
1894 # Check if this NS has a charm configuration
1895 descriptor_config = nsd.get("ns-configuration")
1896 if descriptor_config and descriptor_config.get("juju"):
1897 vnfd_id = None
1898 db_vnfr = None
1899 member_vnf_index = None
1900 vdu_id = None
1901 kdu_name = None
1902 vdu_index = 0
1903 vdu_name = None
1904
1905 # Get additional parameters
1906 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
1907 if db_nsr.get("additionalParamsForNs"):
1908 deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
1909 base_folder = nsd["_admin"]["storage"]
1910 self._deploy_n2vc(
1911 logging_text=logging_text,
1912 db_nsr=db_nsr,
1913 db_vnfr=db_vnfr,
1914 nslcmop_id=nslcmop_id,
1915 nsr_id=nsr_id,
1916 nsi_id=nsi_id,
1917 vnfd_id=vnfd_id,
1918 vdu_id=vdu_id,
1919 kdu_name=kdu_name,
1920 member_vnf_index=member_vnf_index,
1921 vdu_index=vdu_index,
1922 vdu_name=vdu_name,
1923 deploy_params=deploy_params,
1924 descriptor_config=descriptor_config,
1925 base_folder=base_folder,
1926 task_instantiation_info=tasks_dict_info,
1927 stage=stage
1928 )
1929
1930 # the rest of the work will be done in the finally block
1931
1932 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1933 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1934 exc = e
1935 except asyncio.CancelledError:
1936 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1937 exc = "Operation was cancelled"
1938 except Exception as e:
1939 exc = traceback.format_exc()
1940 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1941 finally:
1942 if exc:
1943 error_list.append(str(exc))
1944 try:
1945 # wait for pending tasks
1946 if tasks_dict_info:
1947 stage[1] = "Waiting for instantiate pending tasks."
1948 self.logger.debug(logging_text + stage[1])
1949 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1950 stage, nslcmop_id, nsr_id=nsr_id)
1951 stage[1] = stage[2] = ""
1952 except asyncio.CancelledError:
1953 error_list.append("Cancelled")
1954 # TODO cancel all tasks
1955 except Exception as exc:
1956 error_list.append(str(exc))
1957
1958 # update operation-status
1959 db_nsr_update["operational-status"] = "running"
1960 # let's begin with VCA 'configured' status (later we can change it)
1961 db_nsr_update["config-status"] = "configured"
1962 for task, task_name in tasks_dict_info.items():
1963 if not task.done() or task.cancelled() or task.exception():
1964 if task_name.startswith(self.task_name_deploy_vca):
1965 # A N2VC task is pending
1966 db_nsr_update["config-status"] = "failed"
1967 else:
1968 # RO or KDU task is pending
1969 db_nsr_update["operational-status"] = "failed"
1970
1971 # update status at database
1972 if error_list:
1973 error_detail = ". ".join(error_list)
1974 self.logger.error(logging_text + error_detail)
1975 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
1976 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
1977
1978 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1979 db_nslcmop_update["detailed-status"] = error_detail
1980 nslcmop_operation_state = "FAILED"
1981 ns_state = "BROKEN"
1982 else:
1983 error_detail = None
1984 error_description_nsr = error_description_nslcmop = None
1985 ns_state = "READY"
1986 db_nsr_update["detailed-status"] = "Done"
1987 db_nslcmop_update["detailed-status"] = "Done"
1988 nslcmop_operation_state = "COMPLETED"
1989
1990 if db_nsr:
1991 self._write_ns_status(
1992 nsr_id=nsr_id,
1993 ns_state=ns_state,
1994 current_operation="IDLE",
1995 current_operation_id=None,
1996 error_description=error_description_nsr,
1997 error_detail=error_detail,
1998 other_update=db_nsr_update
1999 )
2000 self._write_op_status(
2001 op_id=nslcmop_id,
2002 stage="",
2003 error_message=error_description_nslcmop,
2004 operation_state=nslcmop_operation_state,
2005 other_update=db_nslcmop_update,
2006 )
2007
2008 if nslcmop_operation_state:
2009 try:
2010 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2011 "operationState": nslcmop_operation_state},
2012 loop=self.loop)
2013 except Exception as e:
2014 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2015
2016 self.logger.debug(logging_text + "Exit")
2017 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2018
2019 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2020 timeout: int = 3600, vca_type: str = None) -> bool:
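"""
Add the relations defined for this VCA, waiting until the related peers are deployed.
:return: True if all relations were added (or none were defined), False on timeout or error
"""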
2021
2022 # steps:
2023 # 1. find all relations for this VCA
2024 # 2. wait for the related peers to be deployed
2025 # 3. add relations
2026
2027 try:
2028 vca_type = vca_type or "lxc_proxy_charm"
2029
2030 # STEP 1: find all relations for this VCA
2031
2032 # read nsr record
2033 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2034 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2035
2036 # this VCA data
2037 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2038
2039 # read all ns-configuration relations
2040 ns_relations = list()
2041 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2042 if db_ns_relations:
2043 for r in db_ns_relations:
2044 # check if this VCA is in the relation
2045 if my_vca.get('member-vnf-index') in\
2046 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2047 ns_relations.append(r)
2048
2049 # read all vnf-configuration relations
2050 vnf_relations = list()
2051 db_vnfd_list = db_nsr.get('vnfd-id')
2052 if db_vnfd_list:
2053 for vnfd in db_vnfd_list:
2054 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2055 db_vnf_relations = get_configuration(db_vnfd, db_vnfd["id"]).get("relation", [])
2056 if db_vnf_relations:
2057 for r in db_vnf_relations:
2058 # check if this VCA is in the relation
2059 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2060 vnf_relations.append(r)
2061
2062 # if no relations, terminate
2063 if not ns_relations and not vnf_relations:
2064 self.logger.debug(logging_text + ' No relations')
2065 return True
2066
2067 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2068
2069 # add all relations
2070 start = time()
2071 while True:
2072 # check timeout
2073 now = time()
2074 if now - start >= timeout:
2075 self.logger.error(logging_text + ' : timeout adding relations')
2076 return False
2077
2078 # reload nsr from database (we need to update record: _admin.deployed.VCA)
2079 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2080
2081 # for each defined NS relation, find the related VCAs
2082 for r in ns_relations.copy():
2083 from_vca_ee_id = None
2084 to_vca_ee_id = None
2085 from_vca_endpoint = None
2086 to_vca_endpoint = None
2087 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2088 for vca in vca_list:
2089 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2090 and vca.get('config_sw_installed'):
2091 from_vca_ee_id = vca.get('ee_id')
2092 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2093 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2094 and vca.get('config_sw_installed'):
2095 to_vca_ee_id = vca.get('ee_id')
2096 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2097 if from_vca_ee_id and to_vca_ee_id:
2098 # add relation
2099 await self.vca_map[vca_type].add_relation(
2100 ee_id_1=from_vca_ee_id,
2101 ee_id_2=to_vca_ee_id,
2102 endpoint_1=from_vca_endpoint,
2103 endpoint_2=to_vca_endpoint)
2104 # remove entry from relations list
2105 ns_relations.remove(r)
2106 else:
2107 # check failed peers
2108 try:
2109 vca_status_list = db_nsr.get('configurationStatus')
2110 if vca_status_list:
2111 for i in range(len(vca_list)):
2112 vca = vca_list[i]
2113 vca_status = vca_status_list[i]
2114 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2115 if vca_status.get('status') == 'BROKEN':
2116 # peer broken: remove relation from list
2117 ns_relations.remove(r)
2118 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2119 if vca_status.get('status') == 'BROKEN':
2120 # peer broken: remove relation from list
2121 ns_relations.remove(r)
2122 except Exception:
2123 # ignore
2124 pass
2125
2126 # for each defined VNF relation, find the related VCAs
2127 for r in vnf_relations.copy():
2128 from_vca_ee_id = None
2129 to_vca_ee_id = None
2130 from_vca_endpoint = None
2131 to_vca_endpoint = None
2132 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2133 for vca in vca_list:
2134 key_to_check = "vdu_id"
2135 if vca.get("vdu_id") is None:
2136 key_to_check = "vnfd_id"
2137 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2138 from_vca_ee_id = vca.get('ee_id')
2139 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2140 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2141 to_vca_ee_id = vca.get('ee_id')
2142 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2143 if from_vca_ee_id and to_vca_ee_id:
2144 # add relation
2145 await self.vca_map[vca_type].add_relation(
2146 ee_id_1=from_vca_ee_id,
2147 ee_id_2=to_vca_ee_id,
2148 endpoint_1=from_vca_endpoint,
2149 endpoint_2=to_vca_endpoint)
2150 # remove entry from relations list
2151 vnf_relations.remove(r)
2152 else:
2153 # check failed peers
2154 try:
2155 vca_status_list = db_nsr.get('configurationStatus')
2156 if vca_status_list:
2157 for i in range(len(vca_list)):
2158 vca = vca_list[i]
2159 vca_status = vca_status_list[i]
2160 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2161 if vca_status.get('status') == 'BROKEN':
2162 # peer broken: remove relation from list
2163 vnf_relations.remove(r)
2164 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2165 if vca_status.get('status') == 'BROKEN':
2166 # peer broken: remove relation from list
2167 vnf_relations.remove(r)
2168 except Exception:
2169 # ignore
2170 pass
2171
2172 # wait for next try
2173 await asyncio.sleep(5.0)
2174
2175 if not ns_relations and not vnf_relations:
2176 self.logger.debug('Relations added')
2177 break
2178
2179 return True
2180
2181 except Exception as e:
2182 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2183 return False
2184
2185 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2186 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
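"""
Instantiate a single KDU on its K8s cluster, record the resulting services and management IP
in the vnfrs record, and run the KDU initial-config-primitives when no juju execution
environment is associated with the KDU. Returns the generated kdu_instance name.
"""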
2187
2188 try:
2189 k8sclustertype = k8s_instance_info["k8scluster-type"]
2190 # Instantiate kdu
2191 db_dict_install = {"collection": "nsrs",
2192 "filter": {"_id": nsr_id},
2193 "path": nsr_db_path}
2194
2195 kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name(
2196 db_dict=db_dict_install,
2197 kdu_model=k8s_instance_info["kdu-model"],
2198 kdu_name=k8s_instance_info["kdu-name"],
2199 )
2200 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2201 await self.k8scluster_map[k8sclustertype].install(
2202 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2203 kdu_model=k8s_instance_info["kdu-model"],
2204 atomic=True,
2205 params=k8params,
2206 db_dict=db_dict_install,
2207 timeout=timeout,
2208 kdu_name=k8s_instance_info["kdu-name"],
2209 namespace=k8s_instance_info["namespace"],
2210 kdu_instance=kdu_instance,
2211 )
2212 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2213
2214 # Obtain services to obtain management service ip
2215 services = await self.k8scluster_map[k8sclustertype].get_services(
2216 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2217 kdu_instance=kdu_instance,
2218 namespace=k8s_instance_info["namespace"])
2219
2220 # Obtain management service info (if exists)
2221 vnfr_update_dict = {}
2222 if services:
2223 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2224 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2225 for mgmt_service in mgmt_services:
2226 for service in services:
2227 if service["name"].startswith(mgmt_service["name"]):
2228 # Mgmt service found, Obtain service ip
2229 ip = service.get("external_ip", service.get("cluster_ip"))
2230 if isinstance(ip, list) and len(ip) == 1:
2231 ip = ip[0]
2232
2233 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2234
2235 # Check if must update also mgmt ip at the vnf
2236 service_external_cp = mgmt_service.get("external-connection-point-ref")
2237 if service_external_cp:
2238 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2239 vnfr_update_dict["ip-address"] = ip
2240
2241 break
2242 else:
2243 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2244
2245 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2246 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2247
2248 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
2249 if kdu_config and kdu_config.get("initial-config-primitive") and \
2250 get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
2251 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2252 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2253
2254 for initial_config_primitive in initial_config_primitive_list:
2255 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2256
2257 await asyncio.wait_for(
2258 self.k8scluster_map[k8sclustertype].exec_primitive(
2259 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2260 kdu_instance=kdu_instance,
2261 primitive_name=initial_config_primitive["name"],
2262 params=primitive_params_, db_dict={}),
2263 timeout=timeout)
2264
2265 except Exception as e:
2266 # Prepare update db with error and raise exception
2267 try:
2268 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2269 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2270 except Exception:
2271 # ignore to keep original exception
2272 pass
2273 # reraise original error
2274 raise
2275
2276 return kdu_instance
2277
2278 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
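"""
Launch one _install_kdu task per KDU found in the vnfrs, synchronizing helm repos when needed,
and register each task in task_instantiation_info.
"""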
2279 # Launch kdus if present in the descriptor
2280
2281 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2282
2283 async def _get_cluster_id(cluster_id, cluster_type):
2284 nonlocal k8scluster_id_2_uuic
2285 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2286 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2287
2288 # check if the K8s cluster is being created and wait for any previous related tasks in process
2289 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2290 if task_dependency:
2291 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2292 self.logger.debug(logging_text + text)
2293 await asyncio.wait(task_dependency, timeout=3600)
2294
2295 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2296 if not db_k8scluster:
2297 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2298
2299 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2300 if not k8s_id:
2301 if cluster_type == "helm-chart-v3":
2302 try:
2303 # backward compatibility for existing clusters that have not been initialized for helm v3
2304 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2305 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2306 reuse_cluster_uuid=cluster_id)
2307 db_k8scluster_update = {}
2308 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2309 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2310 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2311 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2312 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2313 except Exception as e:
2314 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2315 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2316 cluster_type))
2317 else:
2318 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2319 format(cluster_id, cluster_type))
2320 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2321 return k8s_id
2322
2323 logging_text += "Deploy kdus: "
2324 step = ""
2325 try:
2326 db_nsr_update = {"_admin.deployed.K8s": []}
2327 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2328
2329 index = 0
2330 updated_cluster_list = []
2331 updated_v3_cluster_list = []
2332
2333 for vnfr_data in db_vnfrs.values():
2334 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2335 # Step 0: Prepare and set parameters
2336 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
2337 vnfd_id = vnfr_data.get('vnfd-id')
2338 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2339 kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
2340 namespace = kdur.get("k8s-namespace")
2341 if kdur.get("helm-chart"):
2342 kdumodel = kdur["helm-chart"]
2343 # Default version: helm3, if helm-version is v2 assign v2
2344 k8sclustertype = "helm-chart-v3"
2345 self.logger.debug("kdur: {}".format(kdur))
2346 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2347 k8sclustertype = "helm-chart"
2348 elif kdur.get("juju-bundle"):
2349 kdumodel = kdur["juju-bundle"]
2350 k8sclustertype = "juju-bundle"
2351 else:
2352 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2353 "juju-bundle. Maybe an old NBI version is running".
2354 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2355 # check if kdumodel is a file and exists
2356 try:
2357 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2358 storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
2359 if storage and storage.get('pkg-dir'): # may not be present if the vnfd has no artifacts
2360 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
2361 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2362 kdumodel)
2363 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2364 kdumodel = self.fs.path + filename
2365 except (asyncio.TimeoutError, asyncio.CancelledError):
2366 raise
2367 except Exception: # it is not a file
2368 pass
2369
2370 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2371 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2372 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2373
2374 # Synchronize repos
2375 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2376 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2377 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2378 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2379 if del_repo_list or added_repo_dict:
2380 if k8sclustertype == "helm-chart":
2381 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2382 updated = {'_admin.helm_charts_added.' +
2383 item: name for item, name in added_repo_dict.items()}
2384 updated_cluster_list.append(cluster_uuid)
2385 elif k8sclustertype == "helm-chart-v3":
2386 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2387 updated = {'_admin.helm_charts_v3_added.' +
2388 item: name for item, name in added_repo_dict.items()}
2389 updated_v3_cluster_list.append(cluster_uuid)
2390 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2391 "'{}' to_delete: {}, to_add: {}".
2392 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2393 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2394
2395 # Instantiate kdu
2396 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2397 kdur["kdu-name"], k8s_cluster_id)
2398 k8s_instance_info = {"kdu-instance": None,
2399 "k8scluster-uuid": cluster_uuid,
2400 "k8scluster-type": k8sclustertype,
2401 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2402 "kdu-name": kdur["kdu-name"],
2403 "kdu-model": kdumodel,
2404 "namespace": namespace}
2405 db_path = "_admin.deployed.K8s.{}".format(index)
2406 db_nsr_update[db_path] = k8s_instance_info
2407 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2408 vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
2409 task = asyncio.ensure_future(
2410 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
2411 k8s_instance_info, k8params=desc_params, timeout=600))
2412 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2413 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2414
2415 index += 1
2416
2417 except (LcmException, asyncio.CancelledError):
2418 raise
2419 except Exception as e:
2420 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2421 if isinstance(e, (N2VCException, DbException)):
2422 self.logger.error(logging_text + msg)
2423 else:
2424 self.logger.critical(logging_text + msg, exc_info=True)
2425 raise LcmException(msg)
2426 finally:
2427 if db_nsr_update:
2428 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2429
2430 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2431 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2432 base_folder, task_instantiation_info, stage):
2433 # launch instantiate_N2VC in an asyncio task and register the task object
2434 # Look up where the information of this charm is stored in the database <nsrs>._admin.deployed.VCA
2435 # if not found, create one entry and update database
2436 # fill db_nsr._admin.deployed.VCA.<index>
2437
2438 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2439 if "execution-environment-list" in descriptor_config:
2440 ee_list = descriptor_config.get("execution-environment-list", [])
2441 else: # other types, such as scripts, are not supported
2442 ee_list = []
2443
2444 for ee_item in ee_list:
2445 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2446 ee_item.get("helm-chart")))
2447 ee_descriptor_id = ee_item.get("id")
2448 if ee_item.get("juju"):
2449 vca_name = ee_item['juju'].get('charm')
2450 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2451 if ee_item['juju'].get('cloud') == "k8s":
2452 vca_type = "k8s_proxy_charm"
2453 elif ee_item['juju'].get('proxy') is False:
2454 vca_type = "native_charm"
2455 elif ee_item.get("helm-chart"):
2456 vca_name = ee_item['helm-chart']
2457 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
2458 vca_type = "helm"
2459 else:
2460 vca_type = "helm-v3"
2461 else:
2462 self.logger.debug(logging_text + "skipping non juju neither charm configuration")
2463 continue
2464
2465 vca_index = -1
2466 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2467 if not vca_deployed:
2468 continue
2469 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2470 vca_deployed.get("vdu_id") == vdu_id and \
2471 vca_deployed.get("kdu_name") == kdu_name and \
2472 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2473 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2474 break
2475 else:
2476 # not found, create one.
2477 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2478 if vdu_id:
2479 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2480 elif kdu_name:
2481 target += "/kdu/{}".format(kdu_name)
2482 vca_deployed = {
2483 "target_element": target,
2484 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2485 "member-vnf-index": member_vnf_index,
2486 "vdu_id": vdu_id,
2487 "kdu_name": kdu_name,
2488 "vdu_count_index": vdu_index,
2489 "operational-status": "init", # TODO revise
2490 "detailed-status": "", # TODO revise
2491 "step": "initial-deploy", # TODO revise
2492 "vnfd_id": vnfd_id,
2493 "vdu_name": vdu_name,
2494 "type": vca_type,
2495 "ee_descriptor_id": ee_descriptor_id
2496 }
2497 vca_index += 1
2498
2499 # create VCA and configurationStatus in db
2500 db_dict = {
2501 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2502 "configurationStatus.{}".format(vca_index): dict()
2503 }
2504 self.update_db_2("nsrs", nsr_id, db_dict)
2505
2506 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2507
2508 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
2509 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
2510 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
2511
2512 # Launch task
2513 task_n2vc = asyncio.ensure_future(
2514 self.instantiate_N2VC(
2515 logging_text=logging_text,
2516 vca_index=vca_index,
2517 nsi_id=nsi_id,
2518 db_nsr=db_nsr,
2519 db_vnfr=db_vnfr,
2520 vdu_id=vdu_id,
2521 kdu_name=kdu_name,
2522 vdu_index=vdu_index,
2523 deploy_params=deploy_params,
2524 config_descriptor=descriptor_config,
2525 base_folder=base_folder,
2526 nslcmop_id=nslcmop_id,
2527 stage=stage,
2528 vca_type=vca_type,
2529 vca_name=vca_name,
2530 ee_config_descriptor=ee_item
2531 )
2532 )
2533 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2534 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2535 member_vnf_index or "", vdu_id or "")
2536
2537 @staticmethod
2538 def _create_nslcmop(nsr_id, operation, params):
2539 """
2540 Creates an ns-lcm-op content to be stored at database.
2541 :param nsr_id: internal id of the instance
2542 :param operation: instantiate, terminate, scale, action, ...
2543 :param params: user parameters for the operation
2544 :return: dictionary following SOL005 format
2545 """
2546 # Raise exception if invalid arguments
2547 if not (nsr_id and operation and params):
2548 raise LcmException(
2549 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2550 now = time()
2551 _id = str(uuid4())
2552 nslcmop = {
2553 "id": _id,
2554 "_id": _id,
2555 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2556 "operationState": "PROCESSING",
2557 "statusEnteredTime": now,
2558 "nsInstanceId": nsr_id,
2559 "lcmOperationType": operation,
2560 "startTime": now,
2561 "isAutomaticInvocation": False,
2562 "operationParams": params,
2563 "isCancelPending": False,
2564 "links": {
2565 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2566 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2567 }
2568 }
2569 return nslcmop
2570
2571 def _format_additional_params(self, params):
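"""
Parse additional parameters whose string value starts with the '!!yaml ' prefix.
For example, {"a": "!!yaml [1, 2]"} becomes {"a": [1, 2]}.
"""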
2572 params = params or {}
2573 for key, value in params.items():
2574 if str(value).startswith("!!yaml "):
2575 params[key] = yaml.safe_load(value[7:])
2576 return params
2577
2578 def _get_terminate_primitive_params(self, seq, vnf_index):
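"""Build the parameters for the terminate primitive 'seq' of the given vnf member index, mapped through _map_primitive_params."""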
2579 primitive = seq.get('name')
2580 primitive_params = {}
2581 params = {
2582 "member_vnf_index": vnf_index,
2583 "primitive": primitive,
2584 "primitive_params": primitive_params,
2585 }
2586 desc_params = {}
2587 return self._map_primitive_params(seq, params, desc_params)
2588
2589 # sub-operations
2590
2591 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2592 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2593 if op.get('operationState') == 'COMPLETED':
2594 # b. Skip sub-operation
2595 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2596 return self.SUBOPERATION_STATUS_SKIP
2597 else:
2598 # c. retry executing sub-operation
2599 # The sub-operation exists, and operationState != 'COMPLETED'
2600 # Update operationState = 'PROCESSING' to indicate a retry.
2601 operationState = 'PROCESSING'
2602 detailed_status = 'In progress'
2603 self._update_suboperation_status(
2604 db_nslcmop, op_index, operationState, detailed_status)
2605 # Return the sub-operation index
2606 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2607 # with arguments extracted from the sub-operation
2608 return op_index
2609
2610 # Find a sub-operation where all keys in a matching dictionary must match
2611 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2612 def _find_suboperation(self, db_nslcmop, match):
2613 if db_nslcmop and match:
2614 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2615 for i, op in enumerate(op_list):
2616 if all(op.get(k) == match[k] for k in match):
2617 return i
2618 return self.SUBOPERATION_STATUS_NOT_FOUND
2619
2620 # Update status for a sub-operation given its index
2621 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2622 # Update DB for HA tasks
2623 q_filter = {'_id': db_nslcmop['_id']}
2624 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2625 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2626 self.db.set_one("nslcmops",
2627 q_filter=q_filter,
2628 update_dict=update_dict,
2629 fail_on_empty=False)
2630
2631 # Add sub-operation, return the index of the added sub-operation
2632 # Optionally, set operationState, detailed-status, and operationType
2633 # Status and type are currently set for 'scale' sub-operations:
2634 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2635 # 'detailed-status' : status message
2636 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2637 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2638 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2639 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2640 RO_nsr_id=None, RO_scaling_info=None):
2641 if not db_nslcmop:
2642 return self.SUBOPERATION_STATUS_NOT_FOUND
2643 # Get the "_admin.operations" list, if it exists
2644 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2645 op_list = db_nslcmop_admin.get('operations')
2646 # Create or append to the "_admin.operations" list
2647 new_op = {'member_vnf_index': vnf_index,
2648 'vdu_id': vdu_id,
2649 'vdu_count_index': vdu_count_index,
2650 'primitive': primitive,
2651 'primitive_params': mapped_primitive_params}
2652 if operationState:
2653 new_op['operationState'] = operationState
2654 if detailed_status:
2655 new_op['detailed-status'] = detailed_status
2656 if operationType:
2657 new_op['lcmOperationType'] = operationType
2658 if RO_nsr_id:
2659 new_op['RO_nsr_id'] = RO_nsr_id
2660 if RO_scaling_info:
2661 new_op['RO_scaling_info'] = RO_scaling_info
2662 if not op_list:
2663 # No existing operations, create key 'operations' with current operation as first list element
2664 db_nslcmop_admin.update({'operations': [new_op]})
2665 op_list = db_nslcmop_admin.get('operations')
2666 else:
2667 # Existing operations, append operation to list
2668 op_list.append(new_op)
2669
2670 db_nslcmop_update = {'_admin.operations': op_list}
2671 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2672 op_index = len(op_list) - 1
2673 return op_index
2674
2675 # Helper methods for scale() sub-operations
2676
2677 # pre-scale/post-scale:
2678 # Check for 3 different cases:
2679 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2680 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2681 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2682 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2683 operationType, RO_nsr_id=None, RO_scaling_info=None):
2684 # Find this sub-operation
2685 if RO_nsr_id and RO_scaling_info:
2686 operationType = 'SCALE-RO'
2687 match = {
2688 'member_vnf_index': vnf_index,
2689 'RO_nsr_id': RO_nsr_id,
2690 'RO_scaling_info': RO_scaling_info,
2691 }
2692 else:
2693 match = {
2694 'member_vnf_index': vnf_index,
2695 'primitive': vnf_config_primitive,
2696 'primitive_params': primitive_params,
2697 'lcmOperationType': operationType
2698 }
2699 op_index = self._find_suboperation(db_nslcmop, match)
2700 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2701 # a. New sub-operation
2702 # The sub-operation does not exist, add it.
2703 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2704 # The following parameters are set to None for all kind of scaling:
2705 vdu_id = None
2706 vdu_count_index = None
2707 vdu_name = None
2708 if RO_nsr_id and RO_scaling_info:
2709 vnf_config_primitive = None
2710 primitive_params = None
2711 else:
2712 RO_nsr_id = None
2713 RO_scaling_info = None
2714 # Initial status for sub-operation
2715 operationState = 'PROCESSING'
2716 detailed_status = 'In progress'
2717 # Add sub-operation for pre/post-scaling (zero or more operations)
2718 self._add_suboperation(db_nslcmop,
2719 vnf_index,
2720 vdu_id,
2721 vdu_count_index,
2722 vdu_name,
2723 vnf_config_primitive,
2724 primitive_params,
2725 operationState,
2726 detailed_status,
2727 operationType,
2728 RO_nsr_id,
2729 RO_scaling_info)
2730 return self.SUBOPERATION_STATUS_NEW
2731 else:
2732 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2733 # or op_index (operationState != 'COMPLETED')
2734 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2735
2736 # Function to return execution_environment id
2737
2738 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2739 # TODO vdu_index_count
2740 for vca in vca_deployed_list:
2741 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2742 return vca["ee_id"]
2743
2744 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
2745 vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
2746 """
2747 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
2748 :param logging_text:
2749 :param db_nslcmop:
2750 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
2751 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2752 :param vca_index: index in the database _admin.deployed.VCA
2753 :param destroy_ee: False to skip destroying it here, because all of them will be destroyed at once later
2754 :param exec_primitives: False to skip executing terminate primitives, because the configuration was not completed
2755 or did not execute properly
2756 :param scaling_in: True destroys the application, False destroys the model
2757 :return: None or exception
2758 """
2759
2760 self.logger.debug(
2761 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2762 vca_index, vca_deployed, config_descriptor, destroy_ee
2763 )
2764 )
2765
2766 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
2767
2768 # execute terminate_primitives
2769 if exec_primitives:
2770 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
2771 config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
2772 vdu_id = vca_deployed.get("vdu_id")
2773 vdu_count_index = vca_deployed.get("vdu_count_index")
2774 vdu_name = vca_deployed.get("vdu_name")
2775 vnf_index = vca_deployed.get("member-vnf-index")
2776 if terminate_primitives and vca_deployed.get("needed_terminate"):
2777 for seq in terminate_primitives:
2778 # For each sequence in list, get primitive and call _ns_execute_primitive()
2779 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2780 vnf_index, seq.get("name"))
2781 self.logger.debug(logging_text + step)
2782 # Create the primitive for each sequence, e.g. "primitive": "touch"
2783 primitive = seq.get('name')
2784 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2785
2786 # Add sub-operation
2787 self._add_suboperation(db_nslcmop,
2788 vnf_index,
2789 vdu_id,
2790 vdu_count_index,
2791 vdu_name,
2792 primitive,
2793 mapped_primitive_params)
2794 # Sub-operations: Call _ns_execute_primitive() instead of action()
2795 try:
2796 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2797 mapped_primitive_params,
2798 vca_type=vca_type)
2799 except LcmException:
2800 # this happens when the VCA is not deployed. In this case there is nothing to terminate
2801 continue
2802 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2803 if result not in result_ok:
2804 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2805 "error {}".format(seq.get("name"), vnf_index, result_detail))
2806 # mark that this VCA does not need to be terminated
2807 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2808 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2809
2810 if vca_deployed.get("prometheus_jobs") and self.prometheus:
2811 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
2812
2813 if destroy_ee:
2814 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
2815
2816 async def _delete_all_N2VC(self, db_nsr: dict):
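"""Set all configurationStatus entries to TERMINATING, delete the whole N2VC namespace of this NS and set them to DELETED."""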
2817 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2818 namespace = "." + db_nsr["_id"]
2819 try:
2820 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2821 except N2VCNotFound: # already deleted. Skip
2822 pass
2823 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2824
2825 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2826 """
2827 Terminates a deployment from RO
2828 :param logging_text:
2829 :param nsr_deployed: db_nsr._admin.deployed
2830 :param nsr_id:
2831 :param nslcmop_id:
2832 :param stage: list of strings with the content to write on db_nslcmop.detailed-status.
2833 this method will update only index 2, but it will write the concatenated content of the list to the database
2834 :return:
2835 """
2836 db_nsr_update = {}
2837 failed_detail = []
2838 ro_nsr_id = ro_delete_action = None
2839 if nsr_deployed and nsr_deployed.get("RO"):
2840 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2841 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2842 try:
2843 if ro_nsr_id:
2844 stage[2] = "Deleting ns from VIM."
2845 db_nsr_update["detailed-status"] = " ".join(stage)
2846 self._write_op_status(nslcmop_id, stage)
2847 self.logger.debug(logging_text + stage[2])
2848 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2849 self._write_op_status(nslcmop_id, stage)
2850 desc = await self.RO.delete("ns", ro_nsr_id)
2851 ro_delete_action = desc["action_id"]
2852 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2853 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2854 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2855 if ro_delete_action:
2856 # wait until NS is deleted from VIM
2857 stage[2] = "Waiting ns deleted from VIM."
2858 detailed_status_old = None
2859 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2860 ro_delete_action))
2861 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2862 self._write_op_status(nslcmop_id, stage)
2863
2864 delete_timeout = 20 * 60 # 20 minutes
2865 while delete_timeout > 0:
2866 desc = await self.RO.show(
2867 "ns",
2868 item_id_name=ro_nsr_id,
2869 extra_item="action",
2870 extra_item_id=ro_delete_action)
2871
2872 # deploymentStatus
2873 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2874
2875 ns_status, ns_status_info = self.RO.check_action_status(desc)
2876 if ns_status == "ERROR":
2877 raise ROclient.ROClientException(ns_status_info)
2878 elif ns_status == "BUILD":
2879 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2880 elif ns_status == "ACTIVE":
2881 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2882 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2883 break
2884 else:
2885 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2886 if stage[2] != detailed_status_old:
2887 detailed_status_old = stage[2]
2888 db_nsr_update["detailed-status"] = " ".join(stage)
2889 self._write_op_status(nslcmop_id, stage)
2890 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2891 await asyncio.sleep(5, loop=self.loop)
2892 delete_timeout -= 5
2893 else: # delete_timeout <= 0:
2894 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2895
2896 except Exception as e:
2897 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2898 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2899 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2900 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2901 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2902 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
2903 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2904 failed_detail.append("delete conflict: {}".format(e))
2905 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
2906 else:
2907 failed_detail.append("delete error: {}".format(e))
2908 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
2909
2910 # Delete nsd
2911 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
2912 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
2913 try:
2914 stage[2] = "Deleting nsd from RO."
2915 db_nsr_update["detailed-status"] = " ".join(stage)
2916 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2917 self._write_op_status(nslcmop_id, stage)
2918 await self.RO.delete("nsd", ro_nsd_id)
2919 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
2920 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2921 except Exception as e:
2922 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2923 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2924 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
2925 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2926 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
2927 self.logger.debug(logging_text + failed_detail[-1])
2928 else:
2929 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
2930 self.logger.error(logging_text + failed_detail[-1])
2931
2932 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
2933 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2934 if not vnf_deployed or not vnf_deployed["id"]:
2935 continue
2936 try:
2937 ro_vnfd_id = vnf_deployed["id"]
2938 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2939 vnf_deployed["member-vnf-index"], ro_vnfd_id)
2940 db_nsr_update["detailed-status"] = " ".join(stage)
2941 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2942 self._write_op_status(nslcmop_id, stage)
2943 await self.RO.delete("vnfd", ro_vnfd_id)
2944 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
2945 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2946 except Exception as e:
2947 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2948 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2949 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
2950 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2951 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
2952 self.logger.debug(logging_text + failed_detail[-1])
2953 else:
2954 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
2955 self.logger.error(logging_text + failed_detail[-1])
2956
2957 if failed_detail:
2958 stage[2] = "Error deleting from VIM"
2959 else:
2960 stage[2] = "Deleted from VIM"
2961 db_nsr_update["detailed-status"] = " ".join(stage)
2962 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2963 self._write_op_status(nslcmop_id, stage)
2964
2965 if failed_detail:
2966 raise LcmException("; ".join(failed_detail))
2967
2968 async def terminate(self, nsr_id, nslcmop_id):
2969 # Try to lock HA task here
2970 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2971 if not task_is_locked_by_me:
2972 return
2973
2974 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2975 self.logger.debug(logging_text + "Enter")
2976 timeout_ns_terminate = self.timeout_ns_terminate
2977 db_nsr = None
2978 db_nslcmop = None
2979 operation_params = None
2980 exc = None
2981 error_list = []   # accumulates all error messages
2982 db_nslcmop_update = {}
2983 autoremove = False  # autoremove after termination
2984 tasks_dict_info = {}
2985 db_nsr_update = {}
2986 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
2987 # ^ contains [stage, step, VIM-status]
2988 try:
2989 # wait for any previous tasks in process
2990 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2991
2992 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2993 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2994 operation_params = db_nslcmop.get("operationParams") or {}
2995 if operation_params.get("timeout_ns_terminate"):
2996 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
2997 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2998 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2999
3000 db_nsr_update["operational-status"] = "terminating"
3001 db_nsr_update["config-status"] = "terminating"
3002 self._write_ns_status(
3003 nsr_id=nsr_id,
3004 ns_state="TERMINATING",
3005 current_operation="TERMINATING",
3006 current_operation_id=nslcmop_id,
3007 other_update=db_nsr_update
3008 )
3009 self._write_op_status(
3010 op_id=nslcmop_id,
3011 queuePosition=0,
3012 stage=stage
3013 )
3014 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3015 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3016 return
3017
3018 stage[1] = "Getting vnf descriptors from db."
3019 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3020 db_vnfds_from_id = {}
3021 db_vnfds_from_member_index = {}
3022 # Loop over VNFRs
3023 for vnfr in db_vnfrs_list:
3024 vnfd_id = vnfr["vnfd-id"]
3025 if vnfd_id not in db_vnfds_from_id:
3026 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3027 db_vnfds_from_id[vnfd_id] = vnfd
3028 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3029
3030 # Destroy individual execution environments when there are terminating primitives.
3031 # The rest of the EEs will be deleted all at once
3032 # TODO - check before calling _destroy_N2VC
3033 # if not operation_params.get("skip_terminate_primitives"):#
3034 # or not vca.get("needed_terminate"):
3035 stage[0] = "Stage 2/3 execute terminating primitives."
3036 self.logger.debug(logging_text + stage[0])
3037 stage[1] = "Looking execution environment that needs terminate."
3038 self.logger.debug(logging_text + stage[1])
3039
3040 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3041 config_descriptor = None
3042 if not vca or not vca.get("ee_id"):
3043 continue
3044 if not vca.get("member-vnf-index"):
3045 # ns
3046 config_descriptor = db_nsr.get("ns-configuration")
3047 elif vca.get("vdu_id"):
3048 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3049 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3050 elif vca.get("kdu_name"):
3051 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3052 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3053 else:
3054 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3055 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3056 vca_type = vca.get("type")
3057 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3058 vca.get("needed_terminate"))
3059 # For helm we must destroy the EE. Also for native_charm, as the juju model cannot be deleted while there
3060 # are pending native charms
3061 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
3062 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3063 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3064 task = asyncio.ensure_future(
3065 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3066 destroy_ee, exec_terminate_primitives))
3067 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3068
3069 # wait for pending tasks of terminate primitives
3070 if tasks_dict_info:
3071 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3072 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3073 min(self.timeout_charm_delete, timeout_ns_terminate),
3074 stage, nslcmop_id)
3075 tasks_dict_info.clear()
3076 if error_list:
3077 return # raise LcmException("; ".join(error_list))
3078
3079 # remove all execution environments at once
3080 stage[0] = "Stage 3/3 delete all."
3081
3082 if nsr_deployed.get("VCA"):
3083 stage[1] = "Deleting all execution environments."
3084 self.logger.debug(logging_text + stage[1])
3085 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3086 timeout=self.timeout_charm_delete))
3087 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3088 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3089
3090 # Delete from k8scluster
3091 stage[1] = "Deleting KDUs."
3092 self.logger.debug(logging_text + stage[1])
3093 # print(nsr_deployed)
3094 for kdu in get_iterable(nsr_deployed, "K8s"):
3095 if not kdu or not kdu.get("kdu-instance"):
3096 continue
3097 kdu_instance = kdu.get("kdu-instance")
3098 if kdu.get("k8scluster-type") in self.k8scluster_map:
3099 task_delete_kdu_instance = asyncio.ensure_future(
3100 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3101 cluster_uuid=kdu.get("k8scluster-uuid"),
3102 kdu_instance=kdu_instance))
3103 else:
3104 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3105 format(kdu.get("k8scluster-type")))
3106 continue
3107 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
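# Editorial sketch of a K8s deployment record as consumed above (hypothetical values):
#   {"kdu-name": "ldap", "member-vnf-index": "1", "kdu-instance": "stable-openldap-0001",
#    "k8scluster-uuid": "<cluster-uuid>", "k8scluster-type": "helm-chart-v3",
#    "kdu-model": "stable/openldap:1.2.3"}
# k8scluster-type must be a key of self.k8scluster_map, otherwise the KDU is skipped with an error log.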
3108
3109 # remove from RO
3110 stage[1] = "Deleting ns from VIM."
3111 if self.ng_ro:
3112 task_delete_ro = asyncio.ensure_future(
3113 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3114 else:
3115 task_delete_ro = asyncio.ensure_future(
3116 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3117 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3118
3119 # the rest of the work is done in the finally block
3120
3121 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3122 self.logger.error(logging_text + "Exit Exception {}".format(e))
3123 exc = e
3124 except asyncio.CancelledError:
3125 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3126 exc = "Operation was cancelled"
3127 except Exception as e:
3128 exc = traceback.format_exc()
3129 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3130 finally:
3131 if exc:
3132 error_list.append(str(exc))
3133 try:
3134 # wait for pending tasks
3135 if tasks_dict_info:
3136 stage[1] = "Waiting for terminate pending tasks."
3137 self.logger.debug(logging_text + stage[1])
3138 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3139 stage, nslcmop_id)
3140 stage[1] = stage[2] = ""
3141 except asyncio.CancelledError:
3142 error_list.append("Cancelled")
3143 # TODO cancel all tasks
3144 except Exception as exc:
3145 error_list.append(str(exc))
3146 # update status at database
3147 if error_list:
3148 error_detail = "; ".join(error_list)
3149 # self.logger.error(logging_text + error_detail)
3150 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3151 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3152
3153 db_nsr_update["operational-status"] = "failed"
3154 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3155 db_nslcmop_update["detailed-status"] = error_detail
3156 nslcmop_operation_state = "FAILED"
3157 ns_state = "BROKEN"
3158 else:
3159 error_detail = None
3160 error_description_nsr = error_description_nslcmop = None
3161 ns_state = "NOT_INSTANTIATED"
3162 db_nsr_update["operational-status"] = "terminated"
3163 db_nsr_update["detailed-status"] = "Done"
3164 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3165 db_nslcmop_update["detailed-status"] = "Done"
3166 nslcmop_operation_state = "COMPLETED"
3167
3168 if db_nsr:
3169 self._write_ns_status(
3170 nsr_id=nsr_id,
3171 ns_state=ns_state,
3172 current_operation="IDLE",
3173 current_operation_id=None,
3174 error_description=error_description_nsr,
3175 error_detail=error_detail,
3176 other_update=db_nsr_update
3177 )
3178 self._write_op_status(
3179 op_id=nslcmop_id,
3180 stage="",
3181 error_message=error_description_nslcmop,
3182 operation_state=nslcmop_operation_state,
3183 other_update=db_nslcmop_update,
3184 )
3185 if ns_state == "NOT_INSTANTIATED":
3186 try:
3187 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3188 except DbException as e:
3189 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3190 format(nsr_id, e))
3191 if operation_params:
3192 autoremove = operation_params.get("autoremove", False)
3193 if nslcmop_operation_state:
3194 try:
3195 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3196 "operationState": nslcmop_operation_state,
3197 "autoremove": autoremove},
3198 loop=self.loop)
3199 except Exception as e:
3200 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3201
3202 self.logger.debug(logging_text + "Exit")
3203 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3204
3205 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3206 time_start = time()
3207 error_detail_list = []
3208 error_list = []
3209 pending_tasks = list(created_tasks_info.keys())
3210 num_tasks = len(pending_tasks)
3211 num_done = 0
3212 stage[1] = "{}/{}.".format(num_done, num_tasks)
3213 self._write_op_status(nslcmop_id, stage)
3214 while pending_tasks:
3215 new_error = None
3216 _timeout = timeout + time_start - time()
3217 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3218 return_when=asyncio.FIRST_COMPLETED)
3219 num_done += len(done)
3220 if not done: # Timeout
3221 for task in pending_tasks:
3222 new_error = created_tasks_info[task] + ": Timeout"
3223 error_detail_list.append(new_error)
3224 error_list.append(new_error)
3225 break
3226 for task in done:
3227 if task.cancelled():
3228 exc = "Cancelled"
3229 else:
3230 exc = task.exception()
3231 if exc:
3232 if isinstance(exc, asyncio.TimeoutError):
3233 exc = "Timeout"
3234 new_error = created_tasks_info[task] + ": {}".format(exc)
3235 error_list.append(created_tasks_info[task])
3236 error_detail_list.append(new_error)
3237 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3238 K8sException, NgRoException)):
3239 self.logger.error(logging_text + new_error)
3240 else:
3241 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3242 self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
3243 else:
3244 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3245 stage[1] = "{}/{}.".format(num_done, num_tasks)
3246 if new_error:
3247 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3248 if nsr_id: # update also nsr
3249 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3250 "errorDetail": ". ".join(error_detail_list)})
3251 self._write_op_status(nslcmop_id, stage)
3252 return error_detail_list
3253
3254 @staticmethod
3255 def _map_primitive_params(primitive_desc, params, instantiation_params):
3256 """
3257 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
3258 parameter, the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
3259 :param primitive_desc: portion of VNFD/NSD that describes primitive
3260 :param params: Params provided by user
3261 :param instantiation_params: Instantiation params provided by user
3262 :return: a dictionary with the calculated params
3263 """
3264 calculated_params = {}
3265 for parameter in primitive_desc.get("parameter", ()):
3266 param_name = parameter["name"]
3267 if param_name in params:
3268 calculated_params[param_name] = params[param_name]
3269 elif "default-value" in parameter or "value" in parameter:
3270 if "value" in parameter:
3271 calculated_params[param_name] = parameter["value"]
3272 else:
3273 calculated_params[param_name] = parameter["default-value"]
3274 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3275 and calculated_params[param_name].endswith(">"):
3276 if calculated_params[param_name][1:-1] in instantiation_params:
3277 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3278 else:
3279 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3280 format(calculated_params[param_name], primitive_desc["name"]))
3281 else:
3282 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3283 format(param_name, primitive_desc["name"]))
3284
3285 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3286 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
3287 default_flow_style=True, width=256)
3288 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3289 calculated_params[param_name] = calculated_params[param_name][7:]
3290 if parameter.get("data-type") == "INTEGER":
3291 try:
3292 calculated_params[param_name] = int(calculated_params[param_name])
3293 except ValueError: # error converting string to int
3294 raise LcmException(
3295 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3296 elif parameter.get("data-type") == "BOOLEAN":
3297 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3298
3299 # always add ns_config_info if the primitive name is config
3300 if primitive_desc["name"] == "config":
3301 if "ns_config_info" in instantiation_params:
3302 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3303 return calculated_params
3304
3305 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3306 ee_descriptor_id=None):
3307 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
3308 for vca in deployed_vca:
3309 if not vca:
3310 continue
3311 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3312 continue
3313 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3314 continue
3315 if kdu_name and kdu_name != vca["kdu_name"]:
3316 continue
3317 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3318 continue
3319 break
3320 else:
3321 # vca_deployed not found
3322 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3323 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3324 ee_descriptor_id))
3325 # get ee_id
3326 ee_id = vca.get("ee_id")
3327 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3328 if not ee_id:
3329 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3330 "execution environment"
3331 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3332 return ee_id, vca_type
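# Editorial usage sketch: action() and the scale pre/post primitives resolve the target EE like
#   ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
#                                                 vdu_id=None, vdu_count_index=None,
#                                                 ee_descriptor_id=ee_descriptor_id)
# and an LcmException is raised when no deployed VCA record matches all the given filters.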
3333
3334 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
3335 timeout=None, vca_type=None, db_dict=None) -> (str, str):
3336 try:
3337 if primitive == "config":
3338 primitive_params = {"params": primitive_params}
3339
3340 vca_type = vca_type or "lxc_proxy_charm"
3341
3342 while retries >= 0:
3343 try:
3344 output = await asyncio.wait_for(
3345 self.vca_map[vca_type].exec_primitive(
3346 ee_id=ee_id,
3347 primitive_name=primitive,
3348 params_dict=primitive_params,
3349 progress_timeout=self.timeout_progress_primitive,
3350 total_timeout=self.timeout_primitive,
3351 db_dict=db_dict),
3352 timeout=timeout or self.timeout_primitive)
3353 # execution was OK
3354 break
3355 except asyncio.CancelledError:
3356 raise
3357 except Exception as e: # asyncio.TimeoutError
3358 if isinstance(e, asyncio.TimeoutError):
3359 e = "Timeout"
3360 retries -= 1
3361 if retries >= 0:
3362 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3363 # wait and retry
3364 await asyncio.sleep(retries_interval, loop=self.loop)
3365 else:
3366 return 'FAILED', str(e)
3367
3368 return 'COMPLETED', output
3369
3370 except (LcmException, asyncio.CancelledError):
3371 raise
3372 except Exception as e:
3373 return 'FAILED', 'Error executing action {}: {}'.format(primitive, e)
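# Editorial usage sketch (hypothetical parameter values): callers such as action() invoke it as
#   result, detail = await self._ns_execute_primitive(
#       ee_id, primitive=primitive_name, primitive_params={"filename": "/home/ubuntu/first-touch"},
#       timeout=timeout_ns_action, vca_type=vca_type)
# where result is 'COMPLETED' or 'FAILED' and detail carries the charm output or the error text.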
3374
3375 async def vca_status_refresh(self, nsr_id, nslcmop_id):
3376 """
3377 Updates the VCA status in the nsrs record with the latest Juju information
3378 :param nsr_id: id of the nsr
3379 :param nslcmop_id: id of the nslcmop
3380 :return: None
3381 """
3382
3383 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
3384 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3385
3386 for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
3387 table, filter, path = "nsrs", {"_id": nsr_id}, "_admin.deployed.VCA.{}.".format(vca_index)
3388 await self._on_update_n2vc_db(table, filter, path, {})
3389
3390 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
3391 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
3392
3393 async def action(self, nsr_id, nslcmop_id):
3394 # Try to lock HA task here
3395 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3396 if not task_is_locked_by_me:
3397 return
3398
3399 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3400 self.logger.debug(logging_text + "Enter")
3401 # get all needed from database
3402 db_nsr = None
3403 db_nslcmop = None
3404 db_nsr_update = {}
3405 db_nslcmop_update = {}
3406 nslcmop_operation_state = None
3407 error_description_nslcmop = None
3408 exc = None
3409 try:
3410 # wait for any previous tasks in process
3411 step = "Waiting for previous operations to terminate"
3412 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3413
3414 self._write_ns_status(
3415 nsr_id=nsr_id,
3416 ns_state=None,
3417 current_operation="RUNNING ACTION",
3418 current_operation_id=nslcmop_id
3419 )
3420
3421 step = "Getting information from database"
3422 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3423 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3424
3425 nsr_deployed = db_nsr["_admin"].get("deployed")
3426 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3427 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3428 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3429 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3430 primitive = db_nslcmop["operationParams"]["primitive"]
3431 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3432 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3433
3434 if vnf_index:
3435 step = "Getting vnfr from database"
3436 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3437 step = "Getting vnfd from database"
3438 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3439 else:
3440 step = "Getting nsd from database"
3441 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3442
3443 # for backward compatibility
3444 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3445 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3446 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3447 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3448
3449 # look for primitive
3450 config_primitive_desc = descriptor_configuration = None
3451 if vdu_id:
3452 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
3453 elif kdu_name:
3454 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
3455 elif vnf_index:
3456 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
3457 else:
3458 descriptor_configuration = db_nsd.get("ns-configuration")
3459
3460 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3461 for config_primitive in descriptor_configuration["config-primitive"]:
3462 if config_primitive["name"] == primitive:
3463 config_primitive_desc = config_primitive
3464 break
3465
3466 if not config_primitive_desc:
3467 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3468 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3469 format(primitive))
3470 primitive_name = primitive
3471 ee_descriptor_id = None
3472 else:
3473 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3474 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3475
3476 if vnf_index:
3477 if vdu_id:
3478 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3479 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
3480 elif kdu_name:
3481 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3482 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3483 else:
3484 desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
3485 else:
3486 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
3487 if kdu_name and get_configuration(db_vnfd, kdu_name):
3488 kdu_configuration = get_configuration(db_vnfd, kdu_name)
3489 actions = set()
3490 for primitive in kdu_configuration.get("initial-config-primitive", []):
3491 actions.add(primitive["name"])
3492 for primitive in kdu_configuration.get("config-primitive", []):
3493 actions.add(primitive["name"])
3494 kdu_action = True if primitive_name in actions else False
3495
3496 # TODO check if ns is in a proper status
3497 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3498 # kdur and desc_params already set from before
3499 if primitive_params:
3500 desc_params.update(primitive_params)
3501 # TODO Check if we will need something at vnf level
3502 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3503 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3504 break
3505 else:
3506 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3507
3508 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3509 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3510 raise LcmException(msg)
3511
3512 db_dict = {"collection": "nsrs",
3513 "filter": {"_id": nsr_id},
3514 "path": "_admin.deployed.K8s.{}".format(index)}
3515 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3516 step = "Executing kdu {}".format(primitive_name)
3517 if primitive_name == "upgrade":
3518 if desc_params.get("kdu_model"):
3519 kdu_model = desc_params.get("kdu_model")
3520 del desc_params["kdu_model"]
3521 else:
3522 kdu_model = kdu.get("kdu-model")
3523 parts = kdu_model.split(sep=":")
3524 if len(parts) == 2:
3525 kdu_model = parts[0]
3526
3527 detailed_status = await asyncio.wait_for(
3528 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3529 cluster_uuid=kdu.get("k8scluster-uuid"),
3530 kdu_instance=kdu.get("kdu-instance"),
3531 atomic=True, kdu_model=kdu_model,
3532 params=desc_params, db_dict=db_dict,
3533 timeout=timeout_ns_action),
3534 timeout=timeout_ns_action + 10)
3535 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3536 elif primitive_name == "rollback":
3537 detailed_status = await asyncio.wait_for(
3538 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3539 cluster_uuid=kdu.get("k8scluster-uuid"),
3540 kdu_instance=kdu.get("kdu-instance"),
3541 db_dict=db_dict),
3542 timeout=timeout_ns_action)
3543 elif primitive_name == "status":
3544 detailed_status = await asyncio.wait_for(
3545 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3546 cluster_uuid=kdu.get("k8scluster-uuid"),
3547 kdu_instance=kdu.get("kdu-instance")),
3548 timeout=timeout_ns_action)
3549 else:
3550 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3551 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3552
3553 detailed_status = await asyncio.wait_for(
3554 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3555 cluster_uuid=kdu.get("k8scluster-uuid"),
3556 kdu_instance=kdu_instance,
3557 primitive_name=primitive_name,
3558 params=params, db_dict=db_dict,
3559 timeout=timeout_ns_action),
3560 timeout=timeout_ns_action)
3561
3562 if detailed_status:
3563 nslcmop_operation_state = 'COMPLETED'
3564 else:
3565 detailed_status = ''
3566 nslcmop_operation_state = 'FAILED'
3567 else:
3568 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
3569 vdu_id=vdu_id, vdu_count_index=vdu_count_index,
3570 ee_descriptor_id=ee_descriptor_id)
3571 db_nslcmop_notif = {"collection": "nslcmops",
3572 "filter": {"_id": nslcmop_id},
3573 "path": "admin.VCA"}
3574 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3575 ee_id,
3576 primitive=primitive_name,
3577 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3578 timeout=timeout_ns_action,
3579 vca_type=vca_type,
3580 db_dict=db_nslcmop_notif)
3581
3582 db_nslcmop_update["detailed-status"] = detailed_status
3583 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3584 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3585 detailed_status))
3586 return # database update is called inside finally
3587
3588 except (DbException, LcmException, N2VCException, K8sException) as e:
3589 self.logger.error(logging_text + "Exit Exception {}".format(e))
3590 exc = e
3591 except asyncio.CancelledError:
3592 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3593 exc = "Operation was cancelled"
3594 except asyncio.TimeoutError:
3595 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3596 exc = "Timeout"
3597 except Exception as e:
3598 exc = traceback.format_exc()
3599 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3600 finally:
3601 if exc:
3602 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3603 "FAILED {}: {}".format(step, exc)
3604 nslcmop_operation_state = "FAILED"
3605 if db_nsr:
3606 self._write_ns_status(
3607 nsr_id=nsr_id,
3608 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3609 current_operation="IDLE",
3610 current_operation_id=None,
3611 # error_description=error_description_nsr,
3612 # error_detail=error_detail,
3613 other_update=db_nsr_update
3614 )
3615
3616 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
3617 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
3618
3619 if nslcmop_operation_state:
3620 try:
3621 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3622 "operationState": nslcmop_operation_state},
3623 loop=self.loop)
3624 except Exception as e:
3625 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3626 self.logger.debug(logging_text + "Exit")
3627 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3628 return nslcmop_operation_state, detailed_status
3629
3630 async def scale(self, nsr_id, nslcmop_id):
3631 # Try to lock HA task here
3632 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3633 if not task_is_locked_by_me:
3634 return
3635
3636 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3637 stage = ['', '', '']
3638 tasks_dict_info = {}
3639 # stage contains [stage, step, VIM progress]
3640 self.logger.debug(logging_text + "Enter")
3641 # get all needed from database
3642 db_nsr = None
3643 db_nslcmop_update = {}
3644 db_nsr_update = {}
3645 exc = None
3646 # in case of error, indicates which part of the scale operation failed, to set nsr at error status
3647 scale_process = None
3648 old_operational_status = ""
3649 old_config_status = ""
3650 nsi_id = None
3651 try:
3652 # wait for any previous tasks in process
3653 step = "Waiting for previous operations to terminate"
3654 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3655 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3656 current_operation="SCALING", current_operation_id=nslcmop_id)
3657
3658 step = "Getting nslcmop from database"
3659 self.logger.debug(step + " after having waited for previous tasks to be completed")
3660 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3661
3662 step = "Getting nsr from database"
3663 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3664 old_operational_status = db_nsr["operational-status"]
3665 old_config_status = db_nsr["config-status"]
3666
3667 step = "Parsing scaling parameters"
3668 db_nsr_update["operational-status"] = "scaling"
3669 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3670 nsr_deployed = db_nsr["_admin"].get("deployed")
3671
3672 #######
3673 nsr_deployed = db_nsr["_admin"].get("deployed")
3674 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3675 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3676 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3677 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3678 #######
3679
3680 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3681 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3682 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3683 # for backward compatibility
3684 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3685 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3686 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3687 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3688
3689 step = "Getting vnfr from database"
3690 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3691
3692 step = "Getting vnfd from database"
3693 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3694
3695 base_folder = db_vnfd["_admin"]["storage"]
3696
3697 step = "Getting scaling-group-descriptor"
3698 scaling_descriptor = find_in_list(
3699 get_scaling_aspect(
3700 db_vnfd
3701 ),
3702 lambda scale_desc: scale_desc["name"] == scaling_group
3703 )
3704 if not scaling_descriptor:
3705 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3706 "at vnfd:scaling-group-descriptor".format(scaling_group))
3707
3708 step = "Sending scale order to VIM"
3709 # TODO check if ns is in a proper status
3710 nb_scale_op = 0
3711 if not db_nsr["_admin"].get("scaling-group"):
3712 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3713 admin_scale_index = 0
3714 else:
3715 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3716 if admin_scale_info["name"] == scaling_group:
3717 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3718 break
3719 else: # not found, set index one plus last element and add new entry with the name
3720 admin_scale_index += 1
3721 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3722 RO_scaling_info = []
3723 VCA_scaling_info = []
3724 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3725 if scaling_type == "SCALE_OUT":
3726 if "aspect-delta-details" not in scaling_descriptor:
3727 raise LcmException(
3728 "Aspect delta details not fount in scaling descriptor {}".format(
3729 scaling_descriptor["name"]
3730 )
3731 )
3732 # check whether max-instance-count is reached
3733 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3734
3735 vdu_scaling_info["scaling_direction"] = "OUT"
3736 vdu_scaling_info["vdu-create"] = {}
3737 for delta in deltas:
3738 for vdu_delta in delta["vdu-delta"]:
3739 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3740 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3741 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3742 if cloud_init_text:
3743 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3744 cloud_init_list = []
3745
3746 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3747 max_instance_count = 10
3748 if vdu_profile and "max-number-of-instances" in vdu_profile:
3749 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3750
3751 default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3752
3753 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3754
3755 if nb_scale_op + default_instance_num > max_instance_count:
3756 raise LcmException(
3757 "reached the limit of {} (max-instance-count) "
3758 "scaling-out operations for the "
3759 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3760 )
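# Editorial note (illustrative numbers): with default_instance_num = 1 and a vdu-profile
# "max-number-of-instances" of 10, nb_scale_op can accumulate up to 9 across successive scale-out
# operations before this check raises; nb_scale_op is persisted in _admin.scaling-group[].nb-scale-op.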
3761 for x in range(vdu_delta.get("number-of-instances", 1)):
3762 if cloud_init_text:
3763 # TODO Information of its own ip is not available because db_vnfr is not updated.
3764 additional_params["OSM"] = get_osm_params(
3765 db_vnfr,
3766 vdu_delta["id"],
3767 vdu_index + x
3768 )
3769 cloud_init_list.append(
3770 self._parse_cloud_init(
3771 cloud_init_text,
3772 additional_params,
3773 db_vnfd["id"],
3774 vdud["id"]
3775 )
3776 )
3777 VCA_scaling_info.append(
3778 {
3779 "osm_vdu_id": vdu_delta["id"],
3780 "member-vnf-index": vnf_index,
3781 "type": "create",
3782 "vdu_index": vdu_index + x
3783 }
3784 )
3785 RO_scaling_info.append(
3786 {
3787 "osm_vdu_id": vdu_delta["id"],
3788 "member-vnf-index": vnf_index,
3789 "type": "create",
3790 "count": vdu_delta.get("number-of-instances", 1)
3791 }
3792 )
3793 if cloud_init_list:
3794 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3795 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3796
3797 elif scaling_type == "SCALE_IN":
3798 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3799 min_instance_count = int(scaling_descriptor["min-instance-count"])
3800
3801 vdu_scaling_info["scaling_direction"] = "IN"
3802 vdu_scaling_info["vdu-delete"] = {}
3803 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3804 for delta in deltas:
3805 for vdu_delta in delta["vdu-delta"]:
3806 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3807 min_instance_count = 0
3808 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3809 if vdu_profile and "min-number-of-instances" in vdu_profile:
3810 min_instance_count = vdu_profile["min-number-of-instances"]
3811
3812 default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3813
3814 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3815 if nb_scale_op + default_instance_num < min_instance_count:
3816 raise LcmException(
3817 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3818 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3819 )
3820 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3821 "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
3822 "vdu_index": vdu_index - 1})
3823 for x in range(vdu_delta.get("number-of-instances", 1)):
3824 VCA_scaling_info.append(
3825 {
3826 "osm_vdu_id": vdu_delta["id"],
3827 "member-vnf-index": vnf_index,
3828 "type": "delete",
3829 "vdu_index": vdu_index - 1 - x
3830 }
3831 )
3832 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3833
3834 # update vdu_scaling_info with the interfaces and ip addresses of the VDUs to delete
3835 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3836 if vdu_scaling_info["scaling_direction"] == "IN":
3837 for vdur in reversed(db_vnfr["vdur"]):
3838 if vdu_delete.get(vdur["vdu-id-ref"]):
3839 vdu_delete[vdur["vdu-id-ref"]] -= 1
3840 vdu_scaling_info["vdu"].append({
3841 "name": vdur.get("name") or vdur.get("vdu-name"),
3842 "vdu_id": vdur["vdu-id-ref"],
3843 "interface": []
3844 })
3845 for interface in vdur["interfaces"]:
3846 vdu_scaling_info["vdu"][-1]["interface"].append({
3847 "name": interface["name"],
3848 "ip_address": interface["ip-address"],
3849 "mac_address": interface.get("mac-address"),
3850 })
3851 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3852
3853 # PRE-SCALE BEGIN
3854 step = "Executing pre-scale vnf-config-primitive"
3855 if scaling_descriptor.get("scaling-config-action"):
3856 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3857 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3858 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3859 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3860 step = db_nslcmop_update["detailed-status"] = \
3861 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3862
3863 # look for primitive
3864 for config_primitive in (get_configuration(
3865 db_vnfd, db_vnfd["id"]
3866 ) or {}).get("config-primitive", ()):
3867 if config_primitive["name"] == vnf_config_primitive:
3868 break
3869 else:
3870 raise LcmException(
3871 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3872 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3873 "primitive".format(scaling_group, vnf_config_primitive))
3874
3875 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3876 if db_vnfr.get("additionalParamsForVnf"):
3877 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3878
3879 scale_process = "VCA"
3880 db_nsr_update["config-status"] = "configuring pre-scaling"
3881 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3882
3883 # Pre-scale retry check: Check if this sub-operation has been executed before
3884 op_index = self._check_or_add_scale_suboperation(
3885 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3886 if op_index == self.SUBOPERATION_STATUS_SKIP:
3887 # Skip sub-operation
3888 result = 'COMPLETED'
3889 result_detail = 'Done'
3890 self.logger.debug(logging_text +
3891 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3892 vnf_config_primitive, result, result_detail))
3893 else:
3894 if op_index == self.SUBOPERATION_STATUS_NEW:
3895 # New sub-operation: Get index of this sub-operation
3896 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3897 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3898 format(vnf_config_primitive))
3899 else:
3900 # retry: Get registered params for this existing sub-operation
3901 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3902 vnf_index = op.get('member_vnf_index')
3903 vnf_config_primitive = op.get('primitive')
3904 primitive_params = op.get('primitive_params')
3905 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3906 format(vnf_config_primitive))
3907 # Execute the primitive, either with new (first-time) or registered (retry) args
3908 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3909 primitive_name = config_primitive.get("execution-environment-primitive",
3910 vnf_config_primitive)
3911 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3912 member_vnf_index=vnf_index,
3913 vdu_id=None,
3914 vdu_count_index=None,
3915 ee_descriptor_id=ee_descriptor_id)
3916 result, result_detail = await self._ns_execute_primitive(
3917 ee_id, primitive_name, primitive_params, vca_type=vca_type)
3918 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3919 vnf_config_primitive, result, result_detail))
3920 # Update operationState = COMPLETED | FAILED
3921 self._update_suboperation_status(
3922 db_nslcmop, op_index, result, result_detail)
3923
3924 if result == "FAILED":
3925 raise LcmException(result_detail)
3926 db_nsr_update["config-status"] = old_config_status
3927 scale_process = None
3928 # PRE-SCALE END
3929
3930 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3931 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3932
3933 # SCALE-IN VCA - BEGIN
3934 if VCA_scaling_info:
3935 step = db_nslcmop_update["detailed-status"] = \
3936 "Deleting the execution environments"
3937 scale_process = "VCA"
3938 for vdu_info in VCA_scaling_info:
3939 if vdu_info["type"] == "delete":
3940 member_vnf_index = str(vdu_info["member-vnf-index"])
3941 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
3942 vdu_id = vdu_info["osm_vdu_id"]
3943 vdu_index = int(vdu_info["vdu_index"])
3944 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
3945 member_vnf_index, vdu_id, vdu_index)
3946 stage[2] = step = "Scaling in VCA"
3947 self._write_op_status(
3948 op_id=nslcmop_id,
3949 stage=stage
3950 )
3951 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
3952 config_update = db_nsr["configurationStatus"]
3953 for vca_index, vca in enumerate(vca_update):
3954 if (vca and vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \
3955 vca["vdu_count_index"] == vdu_index:
3956 if vca.get("vdu_id"):
3957 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3958 elif vca.get("kdu_name"):
3959 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3960 else:
3961 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3962 operation_params = db_nslcmop.get("operationParams") or {}
3963 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3964 vca.get("needed_terminate"))
3965 task = asyncio.ensure_future(asyncio.wait_for(
3966 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
3967 vca_index, destroy_ee=True,
3968 exec_primitives=exec_terminate_primitives,
3969 scaling_in=True), timeout=self.timeout_charm_delete))
3970 # wait before next removal
3971 await asyncio.sleep(30)
3972 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3973 del vca_update[vca_index]
3974 del config_update[vca_index]
3975 # wait for pending tasks of terminate primitives
3976 if tasks_dict_info:
3977 self.logger.debug(logging_text +
3978 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3979 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3980 min(self.timeout_charm_delete,
3981 self.timeout_ns_terminate),
3982 stage, nslcmop_id)
3983 tasks_dict_info.clear()
3984 if error_list:
3985 raise LcmException("; ".join(error_list))
3986
3987 db_vca_and_config_update = {
3988 "_admin.deployed.VCA": vca_update,
3989 "configurationStatus": config_update
3990 }
3991 self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
3992 scale_process = None
3993 # SCALE-IN VCA - END
3994
3995 # SCALE RO - BEGIN
3996 if RO_scaling_info:
3997 scale_process = "RO"
3998 if self.ro_config.get("ng"):
3999 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
4000 vdu_scaling_info.pop("vdu-create", None)
4001 vdu_scaling_info.pop("vdu-delete", None)
4002
4003 scale_process = None
4004 if db_nsr_update:
4005 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4006 # SCALE RO - END
4007
4008 # SCALE-UP VCA - BEGIN
4009 if VCA_scaling_info:
4010 step = db_nslcmop_update["detailed-status"] = \
4011 "Creating new execution environments"
4012 scale_process = "VCA"
4013 for vdu_info in VCA_scaling_info:
4014 if vdu_info["type"] == "create":
4015 member_vnf_index = str(vdu_info["member-vnf-index"])
4016 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4017 vnfd_id = db_vnfr["vnfd-ref"]
4018 vdu_index = int(vdu_info["vdu_index"])
4019 deploy_params = {"OSM": get_osm_params(db_vnfr)}
4020 if db_vnfr.get("additionalParamsForVnf"):
4021 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
4022 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
4023 if descriptor_config:
4024 vdu_id = None
4025 vdu_name = None
4026 kdu_name = None
4027 self._deploy_n2vc(
4028 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
4029 db_nsr=db_nsr,
4030 db_vnfr=db_vnfr,
4031 nslcmop_id=nslcmop_id,
4032 nsr_id=nsr_id,
4033 nsi_id=nsi_id,
4034 vnfd_id=vnfd_id,
4035 vdu_id=vdu_id,
4036 kdu_name=kdu_name,
4037 member_vnf_index=member_vnf_index,
4038 vdu_index=vdu_index,
4039 vdu_name=vdu_name,
4040 deploy_params=deploy_params,
4041 descriptor_config=descriptor_config,
4042 base_folder=base_folder,
4043 task_instantiation_info=tasks_dict_info,
4044 stage=stage
4045 )
4046 vdu_id = vdu_info["osm_vdu_id"]
4047 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
4048 descriptor_config = get_configuration(db_vnfd, vdu_id)
4049 if vdur.get("additionalParams"):
4050 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
4051 else:
4052 deploy_params_vdu = deploy_params
4053 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
4054 if descriptor_config:
4055 vdu_name = None
4056 kdu_name = None
4057 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4058 member_vnf_index, vdu_id, vdu_index)
4059 stage[2] = step = "Scaling out VCA"
4060 self._write_op_status(
4061 op_id=nslcmop_id,
4062 stage=stage
4063 )
4064 self._deploy_n2vc(
4065 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4066 member_vnf_index, vdu_id, vdu_index),
4067 db_nsr=db_nsr,
4068 db_vnfr=db_vnfr,
4069 nslcmop_id=nslcmop_id,
4070 nsr_id=nsr_id,
4071 nsi_id=nsi_id,
4072 vnfd_id=vnfd_id,
4073 vdu_id=vdu_id,
4074 kdu_name=kdu_name,
4075 member_vnf_index=member_vnf_index,
4076 vdu_index=vdu_index,
4077 vdu_name=vdu_name,
4078 deploy_params=deploy_params_vdu,
4079 descriptor_config=descriptor_config,
4080 base_folder=base_folder,
4081 task_instantiation_info=tasks_dict_info,
4082 stage=stage
4083 )
4084 # TODO: scaling for kdu is not implemented yet.
4085 kdu_name = vdu_info["osm_vdu_id"]
4086 descriptor_config = get_configuration(db_vnfd, kdu_name)
4087 if descriptor_config:
4088 vdu_id = None
4089 vdu_index = vdu_index
4090 vdu_name = None
4091 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
4092 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
4093 if kdur.get("additionalParams"):
4094 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
4095
4096 self._deploy_n2vc(
4097 logging_text=logging_text,
4098 db_nsr=db_nsr,
4099 db_vnfr=db_vnfr,
4100 nslcmop_id=nslcmop_id,
4101 nsr_id=nsr_id,
4102 nsi_id=nsi_id,
4103 vnfd_id=vnfd_id,
4104 vdu_id=vdu_id,
4105 kdu_name=kdu_name,
4106 member_vnf_index=member_vnf_index,
4107 vdu_index=vdu_index,
4108 vdu_name=vdu_name,
4109 deploy_params=deploy_params_kdu,
4110 descriptor_config=descriptor_config,
4111 base_folder=base_folder,
4112 task_instantiation_info=tasks_dict_info,
4113 stage=stage
4114 )
4115 # SCALE-UP VCA - END
4116 scale_process = None
4117
4118 # POST-SCALE BEGIN
4119 # execute primitive service POST-SCALING
4120 step = "Executing post-scale vnf-config-primitive"
4121 if scaling_descriptor.get("scaling-config-action"):
4122 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4123 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4124 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4125 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4126 step = db_nslcmop_update["detailed-status"] = \
4127 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4128
4129 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4130 if db_vnfr.get("additionalParamsForVnf"):
4131 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4132
4133 # look for primitive
4134 for config_primitive in (
4135 get_configuration(db_vnfd, db_vnfd["id"]) or {}
4136 ).get("config-primitive", ()):
4137 if config_primitive["name"] == vnf_config_primitive:
4138 break
4139 else:
4140 raise LcmException(
4141 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4142 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4143 "config-primitive".format(scaling_group, vnf_config_primitive))
4144 scale_process = "VCA"
4145 db_nsr_update["config-status"] = "configuring post-scaling"
4146 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4147
4148 # Post-scale retry check: Check if this sub-operation has been executed before
4149 op_index = self._check_or_add_scale_suboperation(
4150 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4151 if op_index == self.SUBOPERATION_STATUS_SKIP:
4152 # Skip sub-operation
4153 result = 'COMPLETED'
4154 result_detail = 'Done'
4155 self.logger.debug(logging_text +
4156 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4157 format(vnf_config_primitive, result, result_detail))
4158 else:
4159 if op_index == self.SUBOPERATION_STATUS_NEW:
4160 # New sub-operation: Get index of this sub-operation
4161 op_index = len(db_nslcmop.get('_admin', {}).get('operations', [])) - 1
4162 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4163 format(vnf_config_primitive))
4164 else:
4165 # retry: Get registered params for this existing sub-operation
4166 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4167 vnf_index = op.get('member_vnf_index')
4168 vnf_config_primitive = op.get('primitive')
4169 primitive_params = op.get('primitive_params')
4170 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4171 format(vnf_config_primitive))
4172 # Execute the primitive, either with new (first-time) or registered (retry) args
4173 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4174 primitive_name = config_primitive.get("execution-environment-primitive",
4175 vnf_config_primitive)
4176 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4177 member_vnf_index=vnf_index,
4178 vdu_id=None,
4179 vdu_count_index=None,
4180 ee_descriptor_id=ee_descriptor_id)
4181 result, result_detail = await self._ns_execute_primitive(
4182 ee_id, primitive_name, primitive_params, vca_type=vca_type)
4183 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4184 vnf_config_primitive, result, result_detail))
4185 # Update operationState = COMPLETED | FAILED
4186 self._update_suboperation_status(
4187 db_nslcmop, op_index, result, result_detail)
4188
4189 if result == "FAILED":
4190 raise LcmException(result_detail)
4191 db_nsr_update["config-status"] = old_config_status
4192 scale_process = None
4193 # POST-SCALE END
4194
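# scaling completed: clear the transient detailed-status, restore the previous config status,
# and set operational-status back to its previous value (or to "running" if it was "failed")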
4195 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4196 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4197 else old_operational_status
4198 db_nsr_update["config-status"] = old_config_status
4199 return
4200 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4201 self.logger.error(logging_text + "Exit Exception {}".format(e))
4202 exc = e
4203 except asyncio.CancelledError:
4204 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4205 exc = "Operation was cancelled"
4206 except Exception as e:
4207 exc = traceback.format_exc()
4208 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4209 finally:
4210 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
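# whatever the outcome, wait for any VCA deployment tasks launched by this scale operation
# before computing the final operation state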
4211 if tasks_dict_info:
4212 stage[1] = "Waiting for instantiate pending tasks."
4213 self.logger.debug(logging_text + stage[1])
4214 exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
4215 stage, nslcmop_id, nsr_id=nsr_id)
4216 if exc:
4217 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4218 nslcmop_operation_state = "FAILED"
4219 if db_nsr:
4220 db_nsr_update["operational-status"] = old_operational_status
4221 db_nsr_update["config-status"] = old_config_status
4222 db_nsr_update["detailed-status"] = ""
4223 if scale_process:
4224 if "VCA" in scale_process:
4225 db_nsr_update["config-status"] = "failed"
4226 if "RO" in scale_process:
4227 db_nsr_update["operational-status"] = "failed"
4228 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4229 exc)
4230 else:
4231 error_description_nslcmop = None
4232 nslcmop_operation_state = "COMPLETED"
4233 db_nslcmop_update["detailed-status"] = "Done"
4234
4235 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4236 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4237 if db_nsr:
4238 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4239 current_operation_id=None, other_update=db_nsr_update)
4240
4241 if nslcmop_operation_state:
4242 try:
4243 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4244 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4245 except Exception as e:
4246 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4247 self.logger.debug(logging_text + "Exit")
4248 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4249
4250 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
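"""
Scale the ns at RO (NG-RO) level: read the nsd and the vnfd/vnfr of every vnf in the ns,
mark the vdur to create/delete in db_vnfr, run _instantiate_ng_ro with the updated records
and, when vdus are being removed, drop their vdur entries once the RO action has finished.
"""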
4251 nsr_id = db_nslcmop["nsInstanceId"]
4252 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4253 db_vnfrs = {}
4254
4255 # read from db: vnfds for every vnf in this ns
4256 db_vnfds = []
4257
4258 # for each vnf in ns, read vnfd
4259 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4260 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4261 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4262 # if we do not have this vnfd yet, read it from db
4263 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
4264 # read from db
4265 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4266 db_vnfds.append(vnfd)
4267 n2vc_key = self.n2vc.get_public_key()
4268 n2vc_key_list = [n2vc_key]
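# the VCA public key is propagated to the RO so that newly created VDUs authorize it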
4269 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4270 mark_delete=True)
4271 # db_vnfr has been updated, update db_vnfrs to use it
4272 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4273 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4274 db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
4275 timeout_ns_deploy=self.timeout_ns_deploy)
4276 if vdu_scaling_info.get("vdu-delete"):
4277 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4278
4279 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
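"""
If a 'prometheus*.j2' job template is packaged with the charm artifacts, render it with the
target and exporter data of this execution environment and register the resulting scrape
jobs in Prometheus. Returns the list of registered job names, or None when Prometheus is not
configured or no template is found.
"""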
4280 if not self.prometheus:
4281 return
4282 # look for a file called 'prometheus*.j2' in the artifact path; if none is found there is nothing to register
4283 artifact_content = self.fs.dir_ls(artifact_path)
4284 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4285 if not job_file:
4286 return
4287 with self.fs.file_open((artifact_path, job_file), "r") as f:
4288 job_data = f.read()
4289
4290 # TODO get_service
4291 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4292 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4293 host_port = "80"
4294 vnfr_id = vnfr_id.replace("-", "")
4295 variables = {
4296 "JOB_NAME": vnfr_id,
4297 "TARGET_IP": target_ip,
4298 "EXPORTER_POD_IP": host_name,
4299 "EXPORTER_POD_PORT": host_port,
4300 }
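# parse_job renders the Jinja2 template with these variables and returns the resulting scrape-job definitions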
4301 job_list = self.prometheus.parse_job(job_data, variables)
4302 # ensure each job_name contains the vnfr_id, and add the nsr_id as metadata
4303 for job in job_list:
4304 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4305 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4306 job["nsr_id"] = nsr_id
4307 job_dict = {jl["job_name"]: jl for jl in job_list}
4308 if await self.prometheus.update(job_dict):
4309 return list(job_dict.keys())
4310
4311 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4312 """
4313 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4314
4315 :param: vim_account_id: VIM Account ID
4316
4317 :return: (cloud_name, cloud_credential)
4318 """
4319 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4320 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4321
4322 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4323 """
4324 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4325
4326 :param: vim_account_id: VIM Account ID
4327
4328 :return: (cloud_name, cloud_credential)
4329 """
4330 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4331 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")