[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
63 class NsLcm(LcmBase):
64 timeout_vca_on_error = 5 * 60 # time from when a charm first reaches blocked/error status until it is marked as failed
65 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an NS
66 timeout_ns_terminate = 1800 # default global timeout for undeploying an NS
67 timeout_charm_delete = 10 * 60
68 timeout_primitive = 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
70
71 SUBOPERATION_STATUS_NOT_FOUND = -1
72 SUBOPERATION_STATUS_NEW = -2
73 SUBOPERATION_STATUS_SKIP = -3
74 task_name_deploy_vca = "Deploying VCA"
75
76 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
77 """
78 Init: connect to database, filesystem storage, and messaging
79 :param config: two-level dictionary with configuration. Top level should contain 'database', 'storage',
80 :return: None
81 """
82 super().__init__(
83 msg=msg,
84 logger=logging.getLogger('lcm.ns')
85 )
86
87 self.db = Database().instance.db
88 self.fs = Filesystem().instance.fs
89 self.loop = loop
90 self.lcm_tasks = lcm_tasks
91 self.timeout = config["timeout"]
92 self.ro_config = config["ro_config"]
93 self.ng_ro = config["ro_config"].get("ng")
94 self.vca_config = config["VCA"].copy()
95
96 # create N2VC connector
97 self.n2vc = N2VCJujuConnector(
98 log=self.logger,
99 loop=self.loop,
100 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
101 username=self.vca_config.get('user', None),
102 vca_config=self.vca_config,
103 on_update_db=self._on_update_n2vc_db,
104 fs=self.fs,
105 db=self.db
106 )
107
108 self.conn_helm_ee = LCMHelmConn(
109 log=self.logger,
110 loop=self.loop,
111 url=None,
112 username=None,
113 vca_config=self.vca_config,
114 on_update_db=self._on_update_n2vc_db
115 )
116
117 self.k8sclusterhelm2 = K8sHelmConnector(
118 kubectl_command=self.vca_config.get("kubectlpath"),
119 helm_command=self.vca_config.get("helmpath"),
120 log=self.logger,
121 on_update_db=None,
122 fs=self.fs,
123 db=self.db
124 )
125
126 self.k8sclusterhelm3 = K8sHelm3Connector(
127 kubectl_command=self.vca_config.get("kubectlpath"),
128 helm_command=self.vca_config.get("helm3path"),
129 fs=self.fs,
130 log=self.logger,
131 db=self.db,
132 on_update_db=None,
133 )
134
135 self.k8sclusterjuju = K8sJujuConnector(
136 kubectl_command=self.vca_config.get("kubectlpath"),
137 juju_command=self.vca_config.get("jujupath"),
138 log=self.logger,
139 loop=self.loop,
140 on_update_db=None,
141 vca_config=self.vca_config,
142 fs=self.fs,
143 db=self.db
144 )
145
146 self.k8scluster_map = {
147 "helm-chart": self.k8sclusterhelm2,
148 "helm-chart-v3": self.k8sclusterhelm3,
149 "chart": self.k8sclusterhelm3,
150 "juju-bundle": self.k8sclusterjuju,
151 "juju": self.k8sclusterjuju,
152 }
153
154 self.vca_map = {
155 "lxc_proxy_charm": self.n2vc,
156 "native_charm": self.n2vc,
157 "k8s_proxy_charm": self.n2vc,
158 "helm": self.conn_helm_ee,
159 "helm-v3": self.conn_helm_ee
160 }
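# Illustrative note (not in the original source): both maps dispatch on descriptor
# type strings, e.g. a charm of vca_type "native_charm" is handled by self.n2vc,
# while a KDU of type "helm-chart-v3" is handled by self.k8sclusterhelm3.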
161
162 self.prometheus = prometheus
163
164 # create RO client
165 self.RO = NgRoClient(self.loop, **self.ro_config)
166
167 @staticmethod
168 def increment_ip_mac(ip_mac, vm_index=1):
169 if not isinstance(ip_mac, str):
170 return ip_mac
171 try:
172 # try IPv4: look for the last dot
173 i = ip_mac.rfind(".")
174 if i > 0:
175 i += 1
176 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
177 # try IPv6 or MAC: look for the last colon and operate in hex
178 i = ip_mac.rfind(":")
179 if i > 0:
180 i += 1
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
183 except Exception:
184 pass
185 return None
186
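# Illustrative behaviour (comment added for clarity, not in the original source):
# with vm_index=2, increment_ip_mac("10.0.0.5") returns "10.0.0.7" (last IPv4 octet
# incremented) and increment_ip_mac("fa:16:3e:00:00:0a") returns "fa:16:3e:00:00:0c"
# (last hex group incremented, keeping its width). Non-string inputs are returned
# unchanged; unparsable strings yield None.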
187 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
188
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
190
191 try:
192 # TODO filter RO descriptor fields...
193
194 # write to database
195 db_dict = dict()
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict['deploymentStatus'] = ro_descriptor
198 self.update_db_2("nsrs", nsrs_id, db_dict)
199
200 except Exception as e:
201 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
202
203 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
204
205 # remove last dot from path (if exists)
206 if path.endswith('.'):
207 path = path[:-1]
208
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
211 try:
212
213 nsr_id = filter.get('_id')
214
215 # read ns record from database
216 nsr = self.db.get_one(table='nsrs', q_filter=filter)
217 current_ns_status = nsr.get('nsState')
218
219 # get vca status for NS
220 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
221
222 # vcaStatus
223 db_dict = dict()
224 db_dict['vcaStatus'] = status_dict
225
226 # update configurationStatus for this VCA
227 try:
228 vca_index = int(path[path.rfind(".")+1:])
229
230 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
231 vca_status = vca_list[vca_index].get('status')
232
233 configuration_status_list = nsr.get('configurationStatus')
234 config_status = configuration_status_list[vca_index].get('status')
235
236 if config_status == 'BROKEN' and vca_status != 'failed':
237 db_dict['configurationStatus'][vca_index] = 'READY'
238 elif config_status != 'BROKEN' and vca_status == 'failed':
239 db_dict['configurationStatus'][vca_index] = 'BROKEN'
240 except Exception as e:
241 # do not update configurationStatus
242 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
243
244 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
245 # if nsState = 'DEGRADED' check if all is OK
246 is_degraded = False
247 if current_ns_status in ('READY', 'DEGRADED'):
248 error_description = ''
249 # check machines
250 if status_dict.get('machines'):
251 for machine_id in status_dict.get('machines'):
252 machine = status_dict.get('machines').get(machine_id)
253 # check machine agent-status
254 if machine.get('agent-status'):
255 s = machine.get('agent-status').get('status')
256 if s != 'started':
257 is_degraded = True
258 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
259 # check machine instance status
260 if machine.get('instance-status'):
261 s = machine.get('instance-status').get('status')
262 if s != 'running':
263 is_degraded = True
264 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
265 # check applications
266 if status_dict.get('applications'):
267 for app_id in status_dict.get('applications'):
268 app = status_dict.get('applications').get(app_id)
269 # check application status
270 if app.get('status'):
271 s = app.get('status').get('status')
272 if s != 'active':
273 is_degraded = True
274 error_description += 'application {} status={} ; '.format(app_id, s)
275
276 if error_description:
277 db_dict['errorDescription'] = error_description
278 if current_ns_status == 'READY' and is_degraded:
279 db_dict['nsState'] = 'DEGRADED'
280 if current_ns_status == 'DEGRADED' and not is_degraded:
281 db_dict['nsState'] = 'READY'
282
283 # write to database
284 self.update_db_2("nsrs", nsr_id, db_dict)
285
286 except (asyncio.CancelledError, asyncio.TimeoutError):
287 raise
288 except Exception as e:
289 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
290
291 @staticmethod
292 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
293 try:
294 env = Environment(undefined=StrictUndefined)
295 template = env.from_string(cloud_init_text)
296 return template.render(additional_params or {})
297 except UndefinedError as e:
298 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
299 "file, must be provided in the instantiation parameters inside the "
300 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
301 except (TemplateError, TemplateNotFound) as e:
302 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
303 format(vnfd_id, vdu_id, e))
304
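# Minimal sketch (not part of the original module) of the Jinja2 behaviour that
# _parse_cloud_init relies on: with StrictUndefined, any variable missing from
# additional_params raises UndefinedError instead of silently rendering empty.
#
#   from jinja2 import Environment, StrictUndefined
#   env = Environment(undefined=StrictUndefined)
#   template = env.from_string("#cloud-config\nhostname: {{ vm_name }}")
#   template.render({"vm_name": "vdu-0"})   # -> "#cloud-config\nhostname: vdu-0"
#   template.render({})                      # raises jinja2.UndefinedError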
305 def _get_vdu_cloud_init_content(self, vdu, vnfd):
306 cloud_init_content = cloud_init_file = None
307 try:
308 if vdu.get("cloud-init-file"):
309 base_folder = vnfd["_admin"]["storage"]
310 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
311 vdu["cloud-init-file"])
312 with self.fs.file_open(cloud_init_file, "r") as ci_file:
313 cloud_init_content = ci_file.read()
314 elif vdu.get("cloud-init"):
315 cloud_init_content = vdu["cloud-init"]
316
317 return cloud_init_content
318 except FsException as e:
319 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
320 format(vnfd["id"], vdu["id"], cloud_init_file, e))
321
322 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
323 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
324 additional_params = vdur.get("additionalParams")
325 return parse_yaml_strings(additional_params)
326
327 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
328 """
329 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
330 :param vnfd: input vnfd
331 :param new_id: overrides vnf id if provided
332 :param additionalParams: Instantiation params provided for the VNF
333 :param nsrId: Id of the NSR
334 :return: copy of vnfd
335 """
336 vnfd_RO = deepcopy(vnfd)
337 # remove configuration, monitoring, scaling and internal keys that are not used by RO
338 vnfd_RO.pop("_id", None)
339 vnfd_RO.pop("_admin", None)
340 vnfd_RO.pop("monitoring-param", None)
341 vnfd_RO.pop("scaling-group-descriptor", None)
342 vnfd_RO.pop("kdu", None)
343 vnfd_RO.pop("k8s-cluster", None)
344 if new_id:
345 vnfd_RO["id"] = new_id
346
347 # cloud-init and cloud-init-file are handled separately, so remove them from the RO descriptor
348 for vdu in get_iterable(vnfd_RO, "vdu"):
349 vdu.pop("cloud-init-file", None)
350 vdu.pop("cloud-init", None)
351 return vnfd_RO
352
353 @staticmethod
354 def ip_profile_2_RO(ip_profile):
355 RO_ip_profile = deepcopy(ip_profile)
356 if "dns-server" in RO_ip_profile:
357 if isinstance(RO_ip_profile["dns-server"], list):
358 RO_ip_profile["dns-address"] = []
359 for ds in RO_ip_profile.pop("dns-server"):
360 RO_ip_profile["dns-address"].append(ds['address'])
361 else:
362 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
363 if RO_ip_profile.get("ip-version") == "ipv4":
364 RO_ip_profile["ip-version"] = "IPv4"
365 if RO_ip_profile.get("ip-version") == "ipv6":
366 RO_ip_profile["ip-version"] = "IPv6"
367 if "dhcp-params" in RO_ip_profile:
368 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
369 return RO_ip_profile
370
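# Illustrative mapping performed by ip_profile_2_RO (example values, not in the
# original source):
#   {"ip-version": "ipv4", "subnet-address": "10.0.1.0/24",
#    "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
# becomes
#   {"ip-version": "IPv4", "subnet-address": "10.0.1.0/24",
#    "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}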
371 def _get_ro_vim_id_for_vim_account(self, vim_account):
372 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
373 if db_vim["_admin"]["operationalState"] != "ENABLED":
374 raise LcmException("VIM={} is not available. operationalState={}".format(
375 vim_account, db_vim["_admin"]["operationalState"]))
376 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
377 return RO_vim_id
378
379 def get_ro_wim_id_for_wim_account(self, wim_account):
380 if isinstance(wim_account, str):
381 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
382 if db_wim["_admin"]["operationalState"] != "ENABLED":
383 raise LcmException("WIM={} is not available. operationalState={}".format(
384 wim_account, db_wim["_admin"]["operationalState"]))
385 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
386 return RO_wim_id
387 else:
388 return wim_account
389
390 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
391
392 db_vdu_push_list = []
393 db_update = {"_admin.modified": time()}
394 if vdu_create:
395 for vdu_id, vdu_count in vdu_create.items():
396 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
397 if not vdur:
398 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
399 format(vdu_id))
400
401 for count in range(vdu_count):
402 vdur_copy = deepcopy(vdur)
403 vdur_copy["status"] = "BUILD"
404 vdur_copy["status-detailed"] = None
405 vdur_copy["ip-address"]: None
406 vdur_copy["_id"] = str(uuid4())
407 vdur_copy["count-index"] += count + 1
408 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
409 vdur_copy.pop("vim_info", None)
410 for iface in vdur_copy["interfaces"]:
411 if iface.get("fixed-ip"):
412 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
413 else:
414 iface.pop("ip-address", None)
415 if iface.get("fixed-mac"):
416 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
417 else:
418 iface.pop("mac-address", None)
419 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
420 db_vdu_push_list.append(vdur_copy)
421 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
422 if vdu_delete:
423 for vdu_id, vdu_count in vdu_delete.items():
424 if mark_delete:
425 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
426 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
427 else:
428 # it must be deleted one by one because common.db does not allow otherwise
429 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
430 for vdu in vdus_to_delete[:vdu_count]:
431 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
432 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
433 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
434 # modify passed dictionary db_vnfr
435 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
436 db_vnfr["vdur"] = db_vnfr_["vdur"]
437
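# Usage sketch for scale_vnfr (illustrative vdu ids, not in the original source):
#   self.scale_vnfr(db_vnfr, vdu_create={"dataVM": 2})                     # add two replicas
#   self.scale_vnfr(db_vnfr, vdu_delete={"dataVM": 1}, mark_delete=True)   # mark one as DELETING
# vdu_create / vdu_delete map each vdu-id-ref to the number of instances to add or
# remove; new vdur entries are cloned from the last existing one with an
# incremented count-index.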
438 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
439 """
440 Updates database nsr with the RO info for the created vld
441 :param ns_update_nsr: dictionary to be filled with the updated info
442 :param db_nsr: content of db_nsr. This is also modified
443 :param nsr_desc_RO: nsr descriptor from RO
444 :return: Nothing, LcmException is raised on errors
445 """
446
447 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
448 for net_RO in get_iterable(nsr_desc_RO, "nets"):
449 if vld["id"] != net_RO.get("ns_net_osm_id"):
450 continue
451 vld["vim-id"] = net_RO.get("vim_net_id")
452 vld["name"] = net_RO.get("vim_name")
453 vld["status"] = net_RO.get("status")
454 vld["status-detailed"] = net_RO.get("error_msg")
455 ns_update_nsr["vld.{}".format(vld_index)] = vld
456 break
457 else:
458 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
459
460 def set_vnfr_at_error(self, db_vnfrs, error_text):
461 try:
462 for db_vnfr in db_vnfrs.values():
463 vnfr_update = {"status": "ERROR"}
464 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
465 if "status" not in vdur:
466 vdur["status"] = "ERROR"
467 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
468 if error_text:
469 vdur["status-detailed"] = str(error_text)
470 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
471 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
472 except DbException as e:
473 self.logger.error("Cannot update vnf. {}".format(e))
474
475 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
476 """
477 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
478 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
479 :param nsr_desc_RO: nsr descriptor from RO
480 :return: Nothing, LcmException is raised on errors
481 """
482 for vnf_index, db_vnfr in db_vnfrs.items():
483 for vnf_RO in nsr_desc_RO["vnfs"]:
484 if vnf_RO["member_vnf_index"] != vnf_index:
485 continue
486 vnfr_update = {}
487 if vnf_RO.get("ip_address"):
488 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
489 elif not db_vnfr.get("ip-address"):
490 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
491 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
492
493 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
494 vdur_RO_count_index = 0
495 if vdur.get("pdu-type"):
496 continue
497 for vdur_RO in get_iterable(vnf_RO, "vms"):
498 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
499 continue
500 if vdur["count-index"] != vdur_RO_count_index:
501 vdur_RO_count_index += 1
502 continue
503 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
504 if vdur_RO.get("ip_address"):
505 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
506 else:
507 vdur["ip-address"] = None
508 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
509 vdur["name"] = vdur_RO.get("vim_name")
510 vdur["status"] = vdur_RO.get("status")
511 vdur["status-detailed"] = vdur_RO.get("error_msg")
512 for ifacer in get_iterable(vdur, "interfaces"):
513 for interface_RO in get_iterable(vdur_RO, "interfaces"):
514 if ifacer["name"] == interface_RO.get("internal_name"):
515 ifacer["ip-address"] = interface_RO.get("ip_address")
516 ifacer["mac-address"] = interface_RO.get("mac_address")
517 break
518 else:
519 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
520 "from VIM info"
521 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
522 vnfr_update["vdur.{}".format(vdu_index)] = vdur
523 break
524 else:
525 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
526 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
527
528 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
529 for net_RO in get_iterable(nsr_desc_RO, "nets"):
530 if vld["id"] != net_RO.get("vnf_net_osm_id"):
531 continue
532 vld["vim-id"] = net_RO.get("vim_net_id")
533 vld["name"] = net_RO.get("vim_name")
534 vld["status"] = net_RO.get("status")
535 vld["status-detailed"] = net_RO.get("error_msg")
536 vnfr_update["vld.{}".format(vld_index)] = vld
537 break
538 else:
539 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
540 vnf_index, vld["id"]))
541
542 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
543 break
544
545 else:
546 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
547
548 def _get_ns_config_info(self, nsr_id):
549 """
550 Generates a mapping between vnf,vdu elements and the N2VC id
551 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
552 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
553 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
554 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
555 """
556 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
557 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
558 mapping = {}
559 ns_config_info = {"osm-config-mapping": mapping}
560 for vca in vca_deployed_list:
561 if not vca["member-vnf-index"]:
562 continue
563 if not vca["vdu_id"]:
564 mapping[vca["member-vnf-index"]] = vca["application"]
565 else:
566 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
567 vca["application"]
568 return ns_config_info
569
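# Example of the structure returned by _get_ns_config_info (values are illustrative):
#   {"osm-config-mapping": {
#       "1": "app-vnf-1",               # VNF-level charm for member-vnf-index "1"
#       "1.mgmtVM.0": "app-vnf-1-vdu"   # VDU-level charm, vdu "mgmtVM", replica 0
#   }}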
570 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
571 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
572
573 db_vims = {}
574
575 def get_vim_account(vim_account_id):
576 nonlocal db_vims
577 if vim_account_id in db_vims:
578 return db_vims[vim_account_id]
579 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
580 db_vims[vim_account_id] = db_vim
581 return db_vim
582
583 # modify target_vld info with instantiation parameters
584 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
585 if vld_params.get("ip-profile"):
586 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
587 if vld_params.get("provider-network"):
588 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
589 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
590 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
591 if vld_params.get("wimAccountId"):
592 target_wim = "wim:{}".format(vld_params["wimAccountId"])
593 target_vld["vim_info"][target_wim] = {}
594 for param in ("vim-network-name", "vim-network-id"):
595 if vld_params.get(param):
596 if isinstance(vld_params[param], dict):
597 for vim, vim_net in vld_params[param].items():
598 other_target_vim = "vim:" + vim
599 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
600 else: # isinstance str
601 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
602 if vld_params.get("common_id"):
603 target_vld["common_id"] = vld_params.get("common_id")
604
605 nslcmop_id = db_nslcmop["_id"]
606 target = {
607 "name": db_nsr["name"],
608 "ns": {"vld": []},
609 "vnf": [],
610 "image": deepcopy(db_nsr["image"]),
611 "flavor": deepcopy(db_nsr["flavor"]),
612 "action_id": nslcmop_id,
613 "cloud_init_content": {},
614 }
615 for image in target["image"]:
616 image["vim_info"] = {}
617 for flavor in target["flavor"]:
618 flavor["vim_info"] = {}
619
620 if db_nslcmop.get("lcmOperationType") != "instantiate":
621 # get parameters of instantiation:
622 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
623 "lcmOperationType": "instantiate"})[-1]
624 ns_params = db_nslcmop_instantiate.get("operationParams")
625 else:
626 ns_params = db_nslcmop.get("operationParams")
627 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
628 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
629
630 cp2target = {}
631 for vld_index, vld in enumerate(db_nsr.get("vld")):
632 target_vim = "vim:{}".format(ns_params["vimAccountId"])
633 target_vld = {
634 "id": vld["id"],
635 "name": vld["name"],
636 "mgmt-network": vld.get("mgmt-network", False),
637 "type": vld.get("type"),
638 "vim_info": {
639 target_vim: {
640 "vim_network_name": vld.get("vim-network-name"),
641 "vim_account_id": ns_params["vimAccountId"]
642 }
643 }
644 }
645 # check if this network needs SDN assist
646 if vld.get("pci-interfaces"):
647 db_vim = get_vim_account(ns_params["vimAccountId"])
648 sdnc_id = db_vim["config"].get("sdn-controller")
649 if sdnc_id:
650 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
651 target_sdn = "sdn:{}".format(sdnc_id)
652 target_vld["vim_info"][target_sdn] = {
653 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
654
655 nsd_vnf_profiles = get_vnf_profiles(nsd)
656 for nsd_vnf_profile in nsd_vnf_profiles:
657 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
658 if cp["virtual-link-profile-id"] == vld["id"]:
659 cp2target["member_vnf:{}.{}".format(
660 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
661 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
662 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
663
664 # check at nsd descriptor, if there is an ip-profile
665 vld_params = {}
666 virtual_link_profiles = get_virtual_link_profiles(nsd)
667
668 for vlp in virtual_link_profiles:
669 ip_profile = find_in_list(nsd["ip-profiles"],
670 lambda profile: profile["name"] == vlp["ip-profile-ref"])
671 vld_params["ip-profile"] = ip_profile["ip-profile-params"]
672 # update vld_params with instantiation params
673 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
674 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
675 if vld_instantiation_params:
676 vld_params.update(vld_instantiation_params)
677 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
678 target["ns"]["vld"].append(target_vld)
679
680 for vnfr in db_vnfrs.values():
681 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
682 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
683 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
684 target_vnf = deepcopy(vnfr)
685 target_vim = "vim:{}".format(vnfr["vim-account-id"])
686 for vld in target_vnf.get("vld", ()):
687 # check if connected to a ns.vld, to fill target
688 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
689 lambda cpd: cpd.get("id") == vld["id"])
690 if vnf_cp:
691 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
692 if cp2target.get(ns_cp):
693 vld["target"] = cp2target[ns_cp]
694
695 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
696 # check if this network needs SDN assist
697 target_sdn = None
698 if vld.get("pci-interfaces"):
699 db_vim = get_vim_account(vnfr["vim-account-id"])
700 sdnc_id = db_vim["config"].get("sdn-controller")
701 if sdnc_id:
702 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
703 target_sdn = "sdn:{}".format(sdnc_id)
704 vld["vim_info"][target_sdn] = {
705 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
706
707 # check at vnfd descriptor, if there is an ip-profile
708 vld_params = {}
709 vnfd_vlp = find_in_list(
710 get_virtual_link_profiles(vnfd),
711 lambda a_link_profile: a_link_profile["id"] == vld["id"]
712 )
713 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
714 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
715 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
716 ip_profile_dest_data = {}
717 if "ip-version" in ip_profile_source_data:
718 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
719 if "cidr" in ip_profile_source_data:
720 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
721 if "gateway-ip" in ip_profile_source_data:
722 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
723 if "dhcp-enabled" in ip_profile_source_data:
724 ip_profile_dest_data["dhcp-params"] = {
725 "enabled": ip_profile_source_data["dhcp-enabled"]
726 }
727
728 vld_params["ip-profile"] = ip_profile_dest_data
729 # update vld_params with instantiation params
730 if vnf_params:
731 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
732 lambda i_vld: i_vld["name"] == vld["id"])
733 if vld_instantiation_params:
734 vld_params.update(vld_instantiation_params)
735 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
736
737 vdur_list = []
738 for vdur in target_vnf.get("vdur", ()):
739 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
740 continue # This vdu must not be created
741 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
742
743 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
744
745 if ssh_keys_all:
746 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
747 vnf_configuration = get_configuration(vnfd, vnfd["id"])
748 if vdu_configuration and vdu_configuration.get("config-access") and \
749 vdu_configuration.get("config-access").get("ssh-access"):
750 vdur["ssh-keys"] = ssh_keys_all
751 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
752 elif vnf_configuration and vnf_configuration.get("config-access") and \
753 vnf_configuration.get("config-access").get("ssh-access") and \
754 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
755 vdur["ssh-keys"] = ssh_keys_all
756 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
757 elif ssh_keys_instantiation and \
758 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
759 vdur["ssh-keys"] = ssh_keys_instantiation
760
761 self.logger.debug("NS > vdur > {}".format(vdur))
762
763 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
764 # cloud-init
765 if vdud.get("cloud-init-file"):
766 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
767 # read file and put content at target.cloud_init_content, so ng_ro does not need to use the shared package system
768 if vdur["cloud-init"] not in target["cloud_init_content"]:
769 base_folder = vnfd["_admin"]["storage"]
770 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
771 vdud.get("cloud-init-file"))
772 with self.fs.file_open(cloud_init_file, "r") as ci_file:
773 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
774 elif vdud.get("cloud-init"):
775 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
776 # put content at target.cloud_init_content, so ng_ro does not need to read the vnfd descriptor
777 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
778 vdur["additionalParams"] = vdur.get("additionalParams") or {}
779 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
780 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
781 vdur["additionalParams"] = deploy_params_vdu
782
783 # flavor
784 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
785 if target_vim not in ns_flavor["vim_info"]:
786 ns_flavor["vim_info"][target_vim] = {}
787
788 # deal with images
789 # in case alternative images are provided, check whether one of them matches the
790 # vim_type of the target VIM and, if so, use that alternative image id instead
791 ns_image_id = int(vdur["ns-image-id"])
792 if vdur.get("alt-image-ids"):
793 db_vim = get_vim_account(vnfr["vim-account-id"])
794 vim_type = db_vim["vim_type"]
795 for alt_image_id in vdur.get("alt-image-ids"):
796 ns_alt_image = target["image"][int(alt_image_id)]
797 if vim_type == ns_alt_image.get("vim-type"):
798 # must use alternative image
799 self.logger.debug("use alternative image id: {}".format(alt_image_id))
800 ns_image_id = alt_image_id
801 vdur["ns-image-id"] = ns_image_id
802 break
803 ns_image = target["image"][int(ns_image_id)]
804 if target_vim not in ns_image["vim_info"]:
805 ns_image["vim_info"][target_vim] = {}
806
807 vdur["vim_info"] = {target_vim: {}}
808 # instantiation parameters
809 # if vnf_params:
810 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
811 # vdud["id"]), None)
812 vdur_list.append(vdur)
813 target_vnf["vdur"] = vdur_list
814 target["vnf"].append(target_vnf)
815
816 desc = await self.RO.deploy(nsr_id, target)
817 self.logger.debug("RO return > {}".format(desc))
818 action_id = desc["action_id"]
819 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
820
821 # Updating NSR
822 db_nsr_update = {
823 "_admin.deployed.RO.operational-status": "running",
824 "detailed-status": " ".join(stage)
825 }
826 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
827 self.update_db_2("nsrs", nsr_id, db_nsr_update)
828 self._write_op_status(nslcmop_id, stage)
829 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
830 return
831
832 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
833 detailed_status_old = None
834 db_nsr_update = {}
835 start_time = start_time or time()
836 while time() <= start_time + timeout:
837 desc_status = await self.RO.status(nsr_id, action_id)
838 self.logger.debug("Wait NG RO > {}".format(desc_status))
839 if desc_status["status"] == "FAILED":
840 raise NgRoException(desc_status["details"])
841 elif desc_status["status"] == "BUILD":
842 if stage:
843 stage[2] = "VIM: ({})".format(desc_status["details"])
844 elif desc_status["status"] == "DONE":
845 if stage:
846 stage[2] = "Deployed at VIM"
847 break
848 else:
849 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
850 if stage and nslcmop_id and stage[2] != detailed_status_old:
851 detailed_status_old = stage[2]
852 db_nsr_update["detailed-status"] = " ".join(stage)
853 self.update_db_2("nsrs", nsr_id, db_nsr_update)
854 self._write_op_status(nslcmop_id, stage)
855 await asyncio.sleep(15, loop=self.loop)
856 else: # timeout_ns_deploy
857 raise NgRoException("Timeout waiting ns to deploy")
858
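# Polling contract assumed by _wait_ng_ro: RO.status() returns a dict whose "status"
# field is one of "BUILD", "DONE" or "FAILED", plus a human-readable "details" field;
# the loop re-checks every 15 seconds until completion, failure, or timeout.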
859 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
860 db_nsr_update = {}
861 failed_detail = []
862 action_id = None
863 start_deploy = time()
864 try:
865 target = {
866 "ns": {"vld": []},
867 "vnf": [],
868 "image": [],
869 "flavor": [],
870 "action_id": nslcmop_id
871 }
872 desc = await self.RO.deploy(nsr_id, target)
873 action_id = desc["action_id"]
874 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
875 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
876 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
877
878 # wait until done
879 delete_timeout = 20 * 60 # 20 minutes
880 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
881
882 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
883 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
884 # delete all nsr
885 await self.RO.delete(nsr_id)
886 except Exception as e:
887 if isinstance(e, NgRoException) and e.http_code == 404: # not found
888 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
889 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
890 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
891 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
892 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
893 failed_detail.append("delete conflict: {}".format(e))
894 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
895 else:
896 failed_detail.append("delete error: {}".format(e))
897 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
898
899 if failed_detail:
900 stage[2] = "Error deleting from VIM"
901 else:
902 stage[2] = "Deleted from VIM"
903 db_nsr_update["detailed-status"] = " ".join(stage)
904 self.update_db_2("nsrs", nsr_id, db_nsr_update)
905 self._write_op_status(nslcmop_id, stage)
906
907 if failed_detail:
908 raise LcmException("; ".join(failed_detail))
909 return
910
911 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
912 n2vc_key_list, stage):
913 """
914 Instantiate at RO
915 :param logging_text: prefix text to use for logging
916 :param nsr_id: nsr identity
917 :param nsd: database content of ns descriptor
918 :param db_nsr: database content of ns record
919 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
920 :param db_vnfrs:
921 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
922 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
923 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
924 :return: None or exception
925 """
926 try:
927 start_deploy = time()
928 ns_params = db_nslcmop.get("operationParams")
929 if ns_params and ns_params.get("timeout_ns_deploy"):
930 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
931 else:
932 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
933
934 # Check for and optionally request placement optimization. Database will be updated if placement activated
935 stage[2] = "Waiting for Placement."
936 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
937 # in case of placement, change ns_params["vimAccountId"] if it is not present in any vnfr
938 for vnfr in db_vnfrs.values():
939 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
940 break
941 else:
942 ns_params["vimAccountId"] == vnfr["vim-account-id"]
943
944 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
945 db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
946 except Exception as e:
947 stage[2] = "ERROR deploying at VIM"
948 self.set_vnfr_at_error(db_vnfrs, str(e))
949 self.logger.error("Error deploying at VIM {}".format(e),
950 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
951 NgRoException)))
952 raise
953
954 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
955 """
956 Wait for kdu to be up, get ip address
957 :param logging_text: prefix use for logging
958 :param nsr_id:
959 :param vnfr_id:
960 :param kdu_name:
961 :return: IP address
962 """
963
964 # self.logger.debug(logging_text + "Starting wait_kdu_up")
965 nb_tries = 0
966
967 while nb_tries < 360:
968 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
969 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
970 if not kdur:
971 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
972 if kdur.get("status"):
973 if kdur["status"] in ("READY", "ENABLED"):
974 return kdur.get("ip-address")
975 else:
976 raise LcmException("target KDU={} is in error state".format(kdu_name))
977
978 await asyncio.sleep(10, loop=self.loop)
979 nb_tries += 1
980 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
981
982 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
983 """
984 Wait for ip address at RO and, optionally, insert public key in the virtual machine
985 :param logging_text: prefix use for logging
986 :param nsr_id:
987 :param vnfr_id:
988 :param vdu_id:
989 :param vdu_index:
990 :param pub_key: public ssh key to inject, None to skip
991 :param user: user to apply the public ssh key
992 :return: IP address
993 """
994
995 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
996 ro_nsr_id = None
997 ip_address = None
998 nb_tries = 0
999 target_vdu_id = None
1000 ro_retries = 0
1001
1002 while True:
1003
1004 ro_retries += 1
1005 if ro_retries >= 360: # 1 hour
1006 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1007
1008 await asyncio.sleep(10, loop=self.loop)
1009
1010 # get ip address
1011 if not target_vdu_id:
1012 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1013
1014 if not vdu_id: # for the VNF case
1015 if db_vnfr.get("status") == "ERROR":
1016 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1017 ip_address = db_vnfr.get("ip-address")
1018 if not ip_address:
1019 continue
1020 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1021 else: # VDU case
1022 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1023 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1024
1025 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1026 vdur = db_vnfr["vdur"][0]
1027 if not vdur:
1028 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1029 vdu_index))
1030 # New generation RO stores information at "vim_info"
1031 ng_ro_status = None
1032 target_vim = None
1033 if vdur.get("vim_info"):
1034 target_vim = next(t for t in vdur["vim_info"]) # there should be only one key
1035 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1036 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
1037 ip_address = vdur.get("ip-address")
1038 if not ip_address:
1039 continue
1040 target_vdu_id = vdur["vdu-id-ref"]
1041 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1042 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1043
1044 if not target_vdu_id:
1045 continue
1046
1047 # inject public key into machine
1048 if pub_key and user:
1049 self.logger.debug(logging_text + "Inserting RO key")
1050 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1051 if vdur.get("pdu-type"):
1052 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1053 return ip_address
1054 try:
1055 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1056 if self.ng_ro:
1057 target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
1058 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1059 }
1060 desc = await self.RO.deploy(nsr_id, target)
1061 action_id = desc["action_id"]
1062 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1063 break
1064 else:
1065 # wait until NS is deployed at RO
1066 if not ro_nsr_id:
1067 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1068 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1069 if not ro_nsr_id:
1070 continue
1071 result_dict = await self.RO.create_action(
1072 item="ns",
1073 item_id_name=ro_nsr_id,
1074 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1075 )
1076 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1077 if not result_dict or not isinstance(result_dict, dict):
1078 raise LcmException("Unknown response from RO when injecting key")
1079 for result in result_dict.values():
1080 if result.get("vim_result") == 200:
1081 break
1082 else:
1083 raise ROclient.ROClientException("error injecting key: {}".format(
1084 result.get("description")))
1085 break
1086 except NgRoException as e:
1087 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1088 except ROclient.ROClientException as e:
1089 if not nb_tries:
1090 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1091 format(e, 20*10))
1092 nb_tries += 1
1093 if nb_tries >= 20:
1094 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1095 else:
1096 break
1097
1098 return ip_address
1099
1100 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1101 """
1102 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1103 """
1104 my_vca = vca_deployed_list[vca_index]
1105 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1106 # vdu or kdu: no dependencies
1107 return
1108 timeout = 300
1109 while timeout >= 0:
1110 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1111 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1112 configuration_status_list = db_nsr["configurationStatus"]
1113 for index, vca_deployed in enumerate(configuration_status_list):
1114 if index == vca_index:
1115 # myself
1116 continue
1117 if not my_vca.get("member-vnf-index") or \
1118 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1119 internal_status = configuration_status_list[index].get("status")
1120 if internal_status == 'READY':
1121 continue
1122 elif internal_status == 'BROKEN':
1123 raise LcmException("Configuration aborted because dependent charm/s has failed")
1124 else:
1125 break
1126 else:
1127 # no dependencies, return
1128 return
1129 await asyncio.sleep(10)
1130 timeout -= 1
1131
1132 raise LcmException("Configuration aborted because dependent charm/s timeout")
1133
1134 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1135 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1136 ee_config_descriptor):
1137 nsr_id = db_nsr["_id"]
1138 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1139 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1140 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1141 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1142 db_dict = {
1143 'collection': 'nsrs',
1144 'filter': {'_id': nsr_id},
1145 'path': db_update_entry
1146 }
1147 step = ""
1148 try:
1149
1150 element_type = 'NS'
1151 element_under_configuration = nsr_id
1152
1153 vnfr_id = None
1154 if db_vnfr:
1155 vnfr_id = db_vnfr["_id"]
1156 osm_config["osm"]["vnf_id"] = vnfr_id
1157
1158 namespace = "{nsi}.{ns}".format(
1159 nsi=nsi_id if nsi_id else "",
1160 ns=nsr_id)
1161
1162 if vnfr_id:
1163 element_type = 'VNF'
1164 element_under_configuration = vnfr_id
1165 namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
1166 if vdu_id:
1167 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1168 element_type = 'VDU'
1169 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1170 osm_config["osm"]["vdu_id"] = vdu_id
1171 elif kdu_name:
1172 namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
1173 element_type = 'KDU'
1174 element_under_configuration = kdu_name
1175 osm_config["osm"]["kdu_name"] = kdu_name
1176
1177 # Get artifact path
1178 artifact_path = "{}/{}/{}/{}".format(
1179 base_folder["folder"],
1180 base_folder["pkg-dir"],
1181 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1182 vca_name
1183 )
1184
1185 self.logger.debug("Artifact path > {}".format(artifact_path))
1186
1187 # get initial_config_primitive_list that applies to this element
1188 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1189
1190 self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))
1191
1192 # add config if not present for NS charm
1193 ee_descriptor_id = ee_config_descriptor.get("id")
1194 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1195 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
1196 vca_deployed, ee_descriptor_id)
1197
1198 self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
1199 # n2vc_redesign STEP 3.1
1200 # find old ee_id if exists
1201 ee_id = vca_deployed.get("ee_id")
1202
1203 vim_account_id = (
1204 deep_get(db_vnfr, ("vim-account-id",)) or
1205 deep_get(deploy_params, ("OSM", "vim_account_id"))
1206 )
1207 vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
1208 vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
1209 # create or register execution environment in VCA
1210 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1211
1212 self._write_configuration_status(
1213 nsr_id=nsr_id,
1214 vca_index=vca_index,
1215 status='CREATING',
1216 element_under_configuration=element_under_configuration,
1217 element_type=element_type
1218 )
1219
1220 step = "create execution environment"
1221 self.logger.debug(logging_text + step)
1222
1223 ee_id = None
1224 credentials = None
1225 if vca_type == "k8s_proxy_charm":
1226 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1227 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
1228 namespace=namespace,
1229 artifact_path=artifact_path,
1230 db_dict=db_dict,
1231 cloud_name=vca_k8s_cloud,
1232 credential_name=vca_k8s_cloud_credential,
1233 )
1234 elif vca_type == "helm" or vca_type == "helm-v3":
1235 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1236 namespace=namespace,
1237 reuse_ee_id=ee_id,
1238 db_dict=db_dict,
1239 config=osm_config,
1240 artifact_path=artifact_path,
1241 vca_type=vca_type
1242 )
1243 else:
1244 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1245 namespace=namespace,
1246 reuse_ee_id=ee_id,
1247 db_dict=db_dict,
1248 cloud_name=vca_cloud,
1249 credential_name=vca_cloud_credential,
1250 )
1251
1252 elif vca_type == "native_charm":
1253 step = "Waiting to VM being up and getting IP address"
1254 self.logger.debug(logging_text + step)
1255 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1256 user=None, pub_key=None)
1257 credentials = {"hostname": rw_mgmt_ip}
1258 # get username
1259 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1260 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1261 # merged. Meanwhile, get the username from initial-config-primitive
1262 if not username and initial_config_primitive_list:
1263 for config_primitive in initial_config_primitive_list:
1264 for param in config_primitive.get("parameter", ()):
1265 if param["name"] == "ssh-username":
1266 username = param["value"]
1267 break
1268 if not username:
1269 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1270 "'config-access.ssh-access.default-user'")
1271 credentials["username"] = username
1272 # n2vc_redesign STEP 3.2
1273
1274 self._write_configuration_status(
1275 nsr_id=nsr_id,
1276 vca_index=vca_index,
1277 status='REGISTERING',
1278 element_under_configuration=element_under_configuration,
1279 element_type=element_type
1280 )
1281
1282 step = "register execution environment {}".format(credentials)
1283 self.logger.debug(logging_text + step)
1284 ee_id = await self.vca_map[vca_type].register_execution_environment(
1285 credentials=credentials,
1286 namespace=namespace,
1287 db_dict=db_dict,
1288 cloud_name=vca_cloud,
1289 credential_name=vca_cloud_credential,
1290 )
1291
1292 # for compatibility with MON/POL modules, they need the model and application name in the database
1293 # TODO ask MON/POL if we can stop assuming the format "model_name.application_name"
1294 ee_id_parts = ee_id.split('.')
1295 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1296 if len(ee_id_parts) >= 2:
1297 model_name = ee_id_parts[0]
1298 application_name = ee_id_parts[1]
1299 db_nsr_update[db_update_entry + "model"] = model_name
1300 db_nsr_update[db_update_entry + "application"] = application_name
1301
1302 # n2vc_redesign STEP 3.3
1303 step = "Install configuration Software"
1304
1305 self._write_configuration_status(
1306 nsr_id=nsr_id,
1307 vca_index=vca_index,
1308 status='INSTALLING SW',
1309 element_under_configuration=element_under_configuration,
1310 element_type=element_type,
1311 other_update=db_nsr_update
1312 )
1313
1314 # TODO check if already done
1315 self.logger.debug(logging_text + step)
1316 config = None
1317 if vca_type == "native_charm":
1318 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1319 if config_primitive:
1320 config = self._map_primitive_params(
1321 config_primitive,
1322 {},
1323 deploy_params
1324 )
1325 num_units = 1
1326 if vca_type == "lxc_proxy_charm":
1327 if element_type == "NS":
1328 num_units = db_nsr.get("config-units") or 1
1329 elif element_type == "VNF":
1330 num_units = db_vnfr.get("config-units") or 1
1331 elif element_type == "VDU":
1332 for v in db_vnfr["vdur"]:
1333 if vdu_id == v["vdu-id-ref"]:
1334 num_units = v.get("config-units") or 1
1335 break
1336 if vca_type != "k8s_proxy_charm":
1337 await self.vca_map[vca_type].install_configuration_sw(
1338 ee_id=ee_id,
1339 artifact_path=artifact_path,
1340 db_dict=db_dict,
1341 config=config,
1342 num_units=num_units,
1343 )
1344
1345 # write in db flag of configuration_sw already installed
1346 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1347
1348 # add relations for this VCA (wait for other peers related with this VCA)
1349 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1350 vca_index=vca_index, vca_type=vca_type)
1351
1352 # if SSH access is required, then get the execution environment SSH public key
1353 # if native charm, we have already waited for the VM to be up
1354 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1355 pub_key = None
1356 user = None
1357 # self.logger.debug("get ssh key block")
1358 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1359 # self.logger.debug("ssh key needed")
1360 # Needed to inject a ssh key
1361 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1362 step = "Install configuration Software, getting public ssh key"
1363 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1364
1365 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1366 else:
1367 # self.logger.debug("no need to get ssh key")
1368 step = "Waiting to VM being up and getting IP address"
1369 self.logger.debug(logging_text + step)
1370
1371 # n2vc_redesign STEP 5.1
1372 # wait for RO (ip-address) Insert pub_key into VM
1373 if vnfr_id:
1374 if kdu_name:
1375 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1376 else:
1377 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1378 vdu_index, user=user, pub_key=pub_key)
1379 else:
1380 rw_mgmt_ip = None # This is for a NS configuration
1381
1382 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1383
1384 # store rw_mgmt_ip in deploy params for later replacement
1385 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1386
1387 # n2vc_redesign STEP 6 Execute initial config primitive
1388 step = 'execute initial config primitive'
1389
1390 # wait for dependent primitives execution (NS -> VNF -> VDU)
1391 if initial_config_primitive_list:
1392 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1393
1394 # stage, in function of element type: vdu, kdu, vnf or ns
1395 my_vca = vca_deployed_list[vca_index]
1396 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1397 # VDU or KDU
1398 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1399 elif my_vca.get("member-vnf-index"):
1400 # VNF
1401 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1402 else:
1403 # NS
1404 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1405
1406 self._write_configuration_status(
1407 nsr_id=nsr_id,
1408 vca_index=vca_index,
1409 status='EXECUTING PRIMITIVE'
1410 )
1411
1412 self._write_op_status(
1413 op_id=nslcmop_id,
1414 stage=stage
1415 )
1416
1417 check_if_terminated_needed = True
1418 for initial_config_primitive in initial_config_primitive_list:
1419 # adding information on the vca_deployed if it is a NS execution environment
1420 if not vca_deployed["member-vnf-index"]:
1421 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1422 # TODO check if already done
1423 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1424
1425 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1426 self.logger.debug(logging_text + step)
1427 await self.vca_map[vca_type].exec_primitive(
1428 ee_id=ee_id,
1429 primitive_name=initial_config_primitive["name"],
1430 params_dict=primitive_params_,
1431 db_dict=db_dict
1432 )
1433 # Once a primitive has been executed, check and record in the db whether terminate primitives need to be executed later
1434 if check_if_terminated_needed:
1435 if config_descriptor.get('terminate-config-primitive'):
1436 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1437 check_if_terminated_needed = False
1438
1439 # TODO register in database that primitive is done
1440
1441 # STEP 7 Configure metrics
1442 if vca_type == "helm" or vca_type == "helm-v3":
1443 prometheus_jobs = await self.add_prometheus_metrics(
1444 ee_id=ee_id,
1445 artifact_path=artifact_path,
1446 ee_config_descriptor=ee_config_descriptor,
1447 vnfr_id=vnfr_id,
1448 nsr_id=nsr_id,
1449 target_ip=rw_mgmt_ip,
1450 )
1451 if prometheus_jobs:
1452 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1453
1454 step = "instantiated at VCA"
1455 self.logger.debug(logging_text + step)
1456
1457 self._write_configuration_status(
1458 nsr_id=nsr_id,
1459 vca_index=vca_index,
1460 status='READY'
1461 )
1462
1463 except Exception as e: # TODO not use Exception but N2VC exception
1464 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1465 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1466 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1467 self._write_configuration_status(
1468 nsr_id=nsr_id,
1469 vca_index=vca_index,
1470 status='BROKEN'
1471 )
1472 raise LcmException("{} {}".format(step, e)) from e
1473
1474 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1475 error_description: str = None, error_detail: str = None, other_update: dict = None):
1476 """
1477 Update db_nsr fields.
1478 :param nsr_id:
1479 :param ns_state:
1480 :param current_operation:
1481 :param current_operation_id:
1482 :param error_description:
1483 :param error_detail:
1484 :param other_update: Other required changes at database if provided, will be cleared
1485 :return:
1486 """
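# Illustrative sketch (not part of the original flow; identifiers are hypothetical examples): a call such as
#   self._write_ns_status(nsr_id, ns_state="BUILDING", current_operation="INSTANTIATING",
#                         current_operation_id="<nslcmop-id>")
# would push a db_dict similar to the following to the "nsrs" collection:
#   {"_admin.nslcmop": "<nslcmop-id>", "_admin.current-operation": "<nslcmop-id>",
#    "_admin.operation-type": "INSTANTIATING", "currentOperation": "INSTANTIATING",
#    "currentOperationID": "<nslcmop-id>", "errorDescription": None, "errorDetail": None,
#    "nsState": "BUILDING"}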
1487 try:
1488 db_dict = other_update or {}
1489 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1490 db_dict["_admin.current-operation"] = current_operation_id
1491 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1492 db_dict["currentOperation"] = current_operation
1493 db_dict["currentOperationID"] = current_operation_id
1494 db_dict["errorDescription"] = error_description
1495 db_dict["errorDetail"] = error_detail
1496
1497 if ns_state:
1498 db_dict["nsState"] = ns_state
1499 self.update_db_2("nsrs", nsr_id, db_dict)
1500 except DbException as e:
1501 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1502
1503 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1504 operation_state: str = None, other_update: dict = None):
1505 try:
1506 db_dict = other_update or {}
1507 db_dict['queuePosition'] = queuePosition
1508 if isinstance(stage, list):
1509 db_dict['stage'] = stage[0]
1510 db_dict['detailed-status'] = " ".join(stage)
1511 elif stage is not None:
1512 db_dict['stage'] = str(stage)
1513
1514 if error_message is not None:
1515 db_dict['errorMessage'] = error_message
1516 if operation_state is not None:
1517 db_dict['operationState'] = operation_state
1518 db_dict["statusEnteredTime"] = time()
1519 self.update_db_2("nslcmops", op_id, db_dict)
1520 except DbException as e:
1521 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1522
1523 def _write_all_config_status(self, db_nsr: dict, status: str):
1524 try:
1525 nsr_id = db_nsr["_id"]
1526 # configurationStatus
1527 config_status = db_nsr.get('configurationStatus')
1528 if config_status:
1529 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1530 enumerate(config_status) if v}
1531 # update status
1532 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1533
1534 except DbException as e:
1535 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1536
1537 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1538 element_under_configuration: str = None, element_type: str = None,
1539 other_update: dict = None):
1540
1541 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1542 # .format(vca_index, status))
1543
1544 try:
1545 db_path = 'configurationStatus.{}.'.format(vca_index)
1546 db_dict = other_update or {}
1547 if status:
1548 db_dict[db_path + 'status'] = status
1549 if element_under_configuration:
1550 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1551 if element_type:
1552 db_dict[db_path + 'elementType'] = element_type
1553 self.update_db_2("nsrs", nsr_id, db_dict)
1554 except DbException as e:
1555 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1556 .format(status, nsr_id, vca_index, e))
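# Illustrative sketch (values are hypothetical): a call such as
#   self._write_configuration_status(nsr_id, vca_index=2, status="READY",
#                                    element_under_configuration="vnf-1", element_type="VNF")
# results in an update of the nsr record with keys
#   {"configurationStatus.2.status": "READY",
#    "configurationStatus.2.elementUnderConfiguration": "vnf-1",
#    "configurationStatus.2.elementType": "VNF"}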
1557
1558 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1559 """
1560 Checks and computes the placement (the VIM account where to deploy). If it is decided by an external tool, it
1561 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
1562 Database is used because the result can be obtained from a different LCM worker in case of HA.
1563 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1564 :param db_nslcmop: database content of nslcmop
1565 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1566 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
1567 computed 'vim-account-id'
1568 """
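# Illustrative sketch (an assumption; field values are only examples): the placement result read back
# from nslcmops _admin.pla is expected to look like
#   {"vnf": [{"member-vnf-index": "1", "vimAccountId": "<vim-uuid>"},
#            {"member-vnf-index": "2", "vimAccountId": "<other-vim-uuid>"}]}
# so that each matching vnfr gets its "vim-account-id" overwritten below.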
1569 modified = False
1570 nslcmop_id = db_nslcmop['_id']
1571 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1572 if placement_engine == "PLA":
1573 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1574 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1575 db_poll_interval = 5
1576 wait = db_poll_interval * 10
1577 pla_result = None
1578 while not pla_result and wait >= 0:
1579 await asyncio.sleep(db_poll_interval)
1580 wait -= db_poll_interval
1581 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1582 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1583
1584 if not pla_result:
1585 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1586
1587 for pla_vnf in pla_result['vnf']:
1588 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1589 if not pla_vnf.get('vimAccountId') or not vnfr:
1590 continue
1591 modified = True
1592 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1593 # Modifies db_vnfrs
1594 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1595 return modified
1596
1597 def update_nsrs_with_pla_result(self, params):
1598 try:
1599 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1600 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1601 except Exception as e:
1602 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1603
1604 async def instantiate(self, nsr_id, nslcmop_id):
1605 """
1606
1607 :param nsr_id: ns instance to deploy
1608 :param nslcmop_id: operation to run
1609 :return:
1610 """
1611
1612 # Try to lock HA task here
1613 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1614 if not task_is_locked_by_me:
1615 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1616 return
1617
1618 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1619 self.logger.debug(logging_text + "Enter")
1620
1621 # get all needed from database
1622
1623 # database nsrs record
1624 db_nsr = None
1625
1626 # database nslcmops record
1627 db_nslcmop = None
1628
1629 # update operation on nsrs
1630 db_nsr_update = {}
1631 # update operation on nslcmops
1632 db_nslcmop_update = {}
1633
1634 nslcmop_operation_state = None
1635 db_vnfrs = {} # vnf's info indexed by member-index
1636 # n2vc_info = {}
1637 tasks_dict_info = {} # from task to info text
1638 exc = None
1639 error_list = []
1640 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1641 # ^ stage, step, VIM progress
1642 try:
1643 # wait for any previous tasks in process
1644 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1645
1646 stage[1] = "Sync filesystem from database."
1647 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
1648
1649 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1650 stage[1] = "Reading from database."
1651 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1652 db_nsr_update["detailed-status"] = "creating"
1653 db_nsr_update["operational-status"] = "init"
1654 self._write_ns_status(
1655 nsr_id=nsr_id,
1656 ns_state="BUILDING",
1657 current_operation="INSTANTIATING",
1658 current_operation_id=nslcmop_id,
1659 other_update=db_nsr_update
1660 )
1661 self._write_op_status(
1662 op_id=nslcmop_id,
1663 stage=stage,
1664 queuePosition=0
1665 )
1666
1667 # read from db: operation
1668 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
1669 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1670 ns_params = db_nslcmop.get("operationParams")
1671 if ns_params and ns_params.get("timeout_ns_deploy"):
1672 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1673 else:
1674 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1675
1676 # read from db: ns
1677 stage[1] = "Getting nsr={} from db.".format(nsr_id)
1678 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1679 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
1680 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1681 db_nsr["nsd"] = nsd
1682 # nsr_name = db_nsr["name"] # TODO short-name??
1683
1684 # read from db: vnf's of this ns
1685 stage[1] = "Getting vnfrs from db."
1686 self.logger.debug(logging_text + stage[1])
1687 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1688
1689 # read from db: vnfd's for every vnf
1690 db_vnfds = [] # every vnfd data
1691
1692 # for each vnf in ns, read vnfd
1693 for vnfr in db_vnfrs_list:
1694 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
1695 vnfd_id = vnfr["vnfd-id"]
1696 vnfd_ref = vnfr["vnfd-ref"]
1697
1698 # if we have not read this vnfd yet, read it from db
1699 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
1700 # read from db
1701 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
1702 self.logger.debug(logging_text + stage[1])
1703 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1704
1705 # store vnfd
1706 db_vnfds.append(vnfd)
1707
1708 # Get or generates the _admin.deployed.VCA list
1709 vca_deployed_list = None
1710 if db_nsr["_admin"].get("deployed"):
1711 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1712 if vca_deployed_list is None:
1713 vca_deployed_list = []
1714 configuration_status_list = []
1715 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1716 db_nsr_update["configurationStatus"] = configuration_status_list
1717 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1718 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1719 elif isinstance(vca_deployed_list, dict):
1720 # maintain backward compatibility. Change a dict to list at database
1721 vca_deployed_list = list(vca_deployed_list.values())
1722 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1723 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1724
1725 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1726 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1727 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1728
1729 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
1730 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1731 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1732 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
1733
1734 # n2vc_redesign STEP 2 Deploy Network Scenario
1735 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1736 self._write_op_status(
1737 op_id=nslcmop_id,
1738 stage=stage
1739 )
1740
1741 stage[1] = "Deploying KDUs."
1742 # self.logger.debug(logging_text + "Before deploy_kdus")
1743 # Call deploy_kdus in case the "vdu:kdu" param exists
1744 await self.deploy_kdus(
1745 logging_text=logging_text,
1746 nsr_id=nsr_id,
1747 nslcmop_id=nslcmop_id,
1748 db_vnfrs=db_vnfrs,
1749 db_vnfds=db_vnfds,
1750 task_instantiation_info=tasks_dict_info,
1751 )
1752
1753 stage[1] = "Getting VCA public key."
1754 # n2vc_redesign STEP 1 Get VCA public ssh-key
1755 # feature 1429. Add n2vc public key to needed VMs
1756 n2vc_key = self.n2vc.get_public_key()
1757 n2vc_key_list = [n2vc_key]
1758 if self.vca_config.get("public_key"):
1759 n2vc_key_list.append(self.vca_config["public_key"])
1760
1761 stage[1] = "Deploying NS at VIM."
1762 task_ro = asyncio.ensure_future(
1763 self.instantiate_RO(
1764 logging_text=logging_text,
1765 nsr_id=nsr_id,
1766 nsd=nsd,
1767 db_nsr=db_nsr,
1768 db_nslcmop=db_nslcmop,
1769 db_vnfrs=db_vnfrs,
1770 db_vnfds=db_vnfds,
1771 n2vc_key_list=n2vc_key_list,
1772 stage=stage
1773 )
1774 )
1775 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1776 tasks_dict_info[task_ro] = "Deploying at VIM"
1777
1778 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1779 stage[1] = "Deploying Execution Environments."
1780 self.logger.debug(logging_text + stage[1])
1781
1782 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1783 for vnf_profile in get_vnf_profiles(nsd):
1784 vnfd_id = vnf_profile["vnfd-id"]
1785 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
1786 member_vnf_index = str(vnf_profile["id"])
1787 db_vnfr = db_vnfrs[member_vnf_index]
1788 base_folder = vnfd["_admin"]["storage"]
1789 vdu_id = None
1790 vdu_index = 0
1791 vdu_name = None
1792 kdu_name = None
1793
1794 # Get additional parameters
1795 deploy_params = {"OSM": get_osm_params(db_vnfr)}
1796 if db_vnfr.get("additionalParamsForVnf"):
1797 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
1798
1799 descriptor_config = get_configuration(vnfd, vnfd["id"])
1800 if descriptor_config:
1801 self._deploy_n2vc(
1802 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1803 db_nsr=db_nsr,
1804 db_vnfr=db_vnfr,
1805 nslcmop_id=nslcmop_id,
1806 nsr_id=nsr_id,
1807 nsi_id=nsi_id,
1808 vnfd_id=vnfd_id,
1809 vdu_id=vdu_id,
1810 kdu_name=kdu_name,
1811 member_vnf_index=member_vnf_index,
1812 vdu_index=vdu_index,
1813 vdu_name=vdu_name,
1814 deploy_params=deploy_params,
1815 descriptor_config=descriptor_config,
1816 base_folder=base_folder,
1817 task_instantiation_info=tasks_dict_info,
1818 stage=stage
1819 )
1820
1821 # Deploy charms for each VDU that supports one.
1822 for vdud in get_vdu_list(vnfd):
1823 vdu_id = vdud["id"]
1824 descriptor_config = get_configuration(vnfd, vdu_id)
1825 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
1826
1827 if vdur.get("additionalParams"):
1828 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
1829 else:
1830 deploy_params_vdu = deploy_params
1831 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
1832 vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)
1833
1834 self.logger.debug("VDUD > {}".format(vdud))
1835 self.logger.debug("Descriptor config > {}".format(descriptor_config))
1836 if descriptor_config:
1837 vdu_name = None
1838 kdu_name = None
1839 for vdu_index in range(vdud_count):
1840 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1841 self._deploy_n2vc(
1842 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1843 member_vnf_index, vdu_id, vdu_index),
1844 db_nsr=db_nsr,
1845 db_vnfr=db_vnfr,
1846 nslcmop_id=nslcmop_id,
1847 nsr_id=nsr_id,
1848 nsi_id=nsi_id,
1849 vnfd_id=vnfd_id,
1850 vdu_id=vdu_id,
1851 kdu_name=kdu_name,
1852 member_vnf_index=member_vnf_index,
1853 vdu_index=vdu_index,
1854 vdu_name=vdu_name,
1855 deploy_params=deploy_params_vdu,
1856 descriptor_config=descriptor_config,
1857 base_folder=base_folder,
1858 task_instantiation_info=tasks_dict_info,
1859 stage=stage
1860 )
1861 for kdud in get_kdu_list(vnfd):
1862 kdu_name = kdud["name"]
1863 descriptor_config = get_configuration(vnfd, kdu_name)
1864 if descriptor_config:
1865 vdu_id = None
1866 vdu_index = 0
1867 vdu_name = None
1868 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
1869 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
1870 if kdur.get("additionalParams"):
1871 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
1872
1873 self._deploy_n2vc(
1874 logging_text=logging_text,
1875 db_nsr=db_nsr,
1876 db_vnfr=db_vnfr,
1877 nslcmop_id=nslcmop_id,
1878 nsr_id=nsr_id,
1879 nsi_id=nsi_id,
1880 vnfd_id=vnfd_id,
1881 vdu_id=vdu_id,
1882 kdu_name=kdu_name,
1883 member_vnf_index=member_vnf_index,
1884 vdu_index=vdu_index,
1885 vdu_name=vdu_name,
1886 deploy_params=deploy_params_kdu,
1887 descriptor_config=descriptor_config,
1888 base_folder=base_folder,
1889 task_instantiation_info=tasks_dict_info,
1890 stage=stage
1891 )
1892
1893 # Check if this NS has a charm configuration
1894 descriptor_config = nsd.get("ns-configuration")
1895 if descriptor_config and descriptor_config.get("juju"):
1896 vnfd_id = None
1897 db_vnfr = None
1898 member_vnf_index = None
1899 vdu_id = None
1900 kdu_name = None
1901 vdu_index = 0
1902 vdu_name = None
1903
1904 # Get additional parameters
1905 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
1906 if db_nsr.get("additionalParamsForNs"):
1907 deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
1908 base_folder = nsd["_admin"]["storage"]
1909 self._deploy_n2vc(
1910 logging_text=logging_text,
1911 db_nsr=db_nsr,
1912 db_vnfr=db_vnfr,
1913 nslcmop_id=nslcmop_id,
1914 nsr_id=nsr_id,
1915 nsi_id=nsi_id,
1916 vnfd_id=vnfd_id,
1917 vdu_id=vdu_id,
1918 kdu_name=kdu_name,
1919 member_vnf_index=member_vnf_index,
1920 vdu_index=vdu_index,
1921 vdu_name=vdu_name,
1922 deploy_params=deploy_params,
1923 descriptor_config=descriptor_config,
1924 base_folder=base_folder,
1925 task_instantiation_info=tasks_dict_info,
1926 stage=stage
1927 )
1928
1929 # the rest of the work will be done in the finally block
1930
1931 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1932 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1933 exc = e
1934 except asyncio.CancelledError:
1935 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1936 exc = "Operation was cancelled"
1937 except Exception as e:
1938 exc = traceback.format_exc()
1939 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1940 finally:
1941 if exc:
1942 error_list.append(str(exc))
1943 try:
1944 # wait for pending tasks
1945 if tasks_dict_info:
1946 stage[1] = "Waiting for instantiate pending tasks."
1947 self.logger.debug(logging_text + stage[1])
1948 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1949 stage, nslcmop_id, nsr_id=nsr_id)
1950 stage[1] = stage[2] = ""
1951 except asyncio.CancelledError:
1952 error_list.append("Cancelled")
1953 # TODO cancel all tasks
1954 except Exception as exc:
1955 error_list.append(str(exc))
1956
1957 # update operation-status
1958 db_nsr_update["operational-status"] = "running"
1959 # let's begin with VCA 'configured' status (later we can change it)
1960 db_nsr_update["config-status"] = "configured"
1961 for task, task_name in tasks_dict_info.items():
1962 if not task.done() or task.cancelled() or task.exception():
1963 if task_name.startswith(self.task_name_deploy_vca):
1964 # A N2VC task is pending
1965 db_nsr_update["config-status"] = "failed"
1966 else:
1967 # RO or KDU task is pending
1968 db_nsr_update["operational-status"] = "failed"
1969
1970 # update status at database
1971 if error_list:
1972 error_detail = ". ".join(error_list)
1973 self.logger.error(logging_text + error_detail)
1974 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
1975 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
1976
1977 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1978 db_nslcmop_update["detailed-status"] = error_detail
1979 nslcmop_operation_state = "FAILED"
1980 ns_state = "BROKEN"
1981 else:
1982 error_detail = None
1983 error_description_nsr = error_description_nslcmop = None
1984 ns_state = "READY"
1985 db_nsr_update["detailed-status"] = "Done"
1986 db_nslcmop_update["detailed-status"] = "Done"
1987 nslcmop_operation_state = "COMPLETED"
1988
1989 if db_nsr:
1990 self._write_ns_status(
1991 nsr_id=nsr_id,
1992 ns_state=ns_state,
1993 current_operation="IDLE",
1994 current_operation_id=None,
1995 error_description=error_description_nsr,
1996 error_detail=error_detail,
1997 other_update=db_nsr_update
1998 )
1999 self._write_op_status(
2000 op_id=nslcmop_id,
2001 stage="",
2002 error_message=error_description_nslcmop,
2003 operation_state=nslcmop_operation_state,
2004 other_update=db_nslcmop_update,
2005 )
2006
2007 if nslcmop_operation_state:
2008 try:
2009 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2010 "operationState": nslcmop_operation_state},
2011 loop=self.loop)
2012 except Exception as e:
2013 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2014
2015 self.logger.debug(logging_text + "Exit")
2016 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2017
2018 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2019 timeout: int = 3600, vca_type: str = None) -> bool:
2020
2021 # steps:
2022 # 1. find all relations for this VCA
2023 # 2. wait for other peers related
2024 # 3. add relations
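# Illustrative sketch (an assumption; names are only examples): each relation record taken from
# ns-configuration/vnf-configuration is expected to have the shape
#   {"name": "db-relation", "entities": [{"id": "vnf1", "endpoint": "mysql"},
#                                        {"id": "vnf2", "endpoint": "db"}]}
# where "id" is matched against member-vnf-index (NS level) or vdu_id/vnfd_id (VNF level).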
2025
2026 try:
2027 vca_type = vca_type or "lxc_proxy_charm"
2028
2029 # STEP 1: find all relations for this VCA
2030
2031 # read nsr record
2032 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2033 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2034
2035 # this VCA data
2036 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2037
2038 # read all ns-configuration relations
2039 ns_relations = list()
2040 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2041 if db_ns_relations:
2042 for r in db_ns_relations:
2043 # check if this VCA is in the relation
2044 if my_vca.get('member-vnf-index') in\
2045 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2046 ns_relations.append(r)
2047
2048 # read all vnf-configuration relations
2049 vnf_relations = list()
2050 db_vnfd_list = db_nsr.get('vnfd-id')
2051 if db_vnfd_list:
2052 for vnfd in db_vnfd_list:
2053 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2054 db_vnf_relations = get_configuration(db_vnfd, db_vnfd["id"]).get("relation", [])
2055 if db_vnf_relations:
2056 for r in db_vnf_relations:
2057 # check if this VCA is in the relation
2058 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2059 vnf_relations.append(r)
2060
2061 # if no relations, terminate
2062 if not ns_relations and not vnf_relations:
2063 self.logger.debug(logging_text + ' No relations')
2064 return True
2065
2066 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2067
2068 # add all relations
2069 start = time()
2070 while True:
2071 # check timeout
2072 now = time()
2073 if now - start >= timeout:
2074 self.logger.error(logging_text + ' : timeout adding relations')
2075 return False
2076
2077 # reload nsr from database (we need to update the record: _admin.deployed.VCA)
2078 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2079
2080 # for each defined NS relation, find the VCA's related
2081 for r in ns_relations.copy():
2082 from_vca_ee_id = None
2083 to_vca_ee_id = None
2084 from_vca_endpoint = None
2085 to_vca_endpoint = None
2086 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2087 for vca in vca_list:
2088 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2089 and vca.get('config_sw_installed'):
2090 from_vca_ee_id = vca.get('ee_id')
2091 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2092 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2093 and vca.get('config_sw_installed'):
2094 to_vca_ee_id = vca.get('ee_id')
2095 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2096 if from_vca_ee_id and to_vca_ee_id:
2097 # add relation
2098 await self.vca_map[vca_type].add_relation(
2099 ee_id_1=from_vca_ee_id,
2100 ee_id_2=to_vca_ee_id,
2101 endpoint_1=from_vca_endpoint,
2102 endpoint_2=to_vca_endpoint)
2103 # remove entry from relations list
2104 ns_relations.remove(r)
2105 else:
2106 # check failed peers
2107 try:
2108 vca_status_list = db_nsr.get('configurationStatus')
2109 if vca_status_list:
2110 for i in range(len(vca_list)):
2111 vca = vca_list[i]
2112 vca_status = vca_status_list[i]
2113 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2114 if vca_status.get('status') == 'BROKEN':
2115 # peer broken: remove relation from list
2116 ns_relations.remove(r)
2117 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2118 if vca_status.get('status') == 'BROKEN':
2119 # peer broken: remove relation from list
2120 ns_relations.remove(r)
2121 except Exception:
2122 # ignore
2123 pass
2124
2125 # for each defined VNF relation, find the VCA's related
2126 for r in vnf_relations.copy():
2127 from_vca_ee_id = None
2128 to_vca_ee_id = None
2129 from_vca_endpoint = None
2130 to_vca_endpoint = None
2131 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2132 for vca in vca_list:
2133 key_to_check = "vdu_id"
2134 if vca.get("vdu_id") is None:
2135 key_to_check = "vnfd_id"
2136 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2137 from_vca_ee_id = vca.get('ee_id')
2138 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2139 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2140 to_vca_ee_id = vca.get('ee_id')
2141 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2142 if from_vca_ee_id and to_vca_ee_id:
2143 # add relation
2144 await self.vca_map[vca_type].add_relation(
2145 ee_id_1=from_vca_ee_id,
2146 ee_id_2=to_vca_ee_id,
2147 endpoint_1=from_vca_endpoint,
2148 endpoint_2=to_vca_endpoint)
2149 # remove entry from relations list
2150 vnf_relations.remove(r)
2151 else:
2152 # check failed peers
2153 try:
2154 vca_status_list = db_nsr.get('configurationStatus')
2155 if vca_status_list:
2156 for i in range(len(vca_list)):
2157 vca = vca_list[i]
2158 vca_status = vca_status_list[i]
2159 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2160 if vca_status.get('status') == 'BROKEN':
2161 # peer broken: remove relation from list
2162 vnf_relations.remove(r)
2163 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2164 if vca_status.get('status') == 'BROKEN':
2165 # peer broken: remove relation from list
2166 vnf_relations.remove(r)
2167 except Exception:
2168 # ignore
2169 pass
2170
2171 # wait for next try
2172 await asyncio.sleep(5.0)
2173
2174 if not ns_relations and not vnf_relations:
2175 self.logger.debug('Relations added')
2176 break
2177
2178 return True
2179
2180 except Exception as e:
2181 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2182 return False
2183
2184 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2185 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
2186
2187 try:
2188 k8sclustertype = k8s_instance_info["k8scluster-type"]
2189 # Instantiate kdu
2190 db_dict_install = {"collection": "nsrs",
2191 "filter": {"_id": nsr_id},
2192 "path": nsr_db_path}
2193
2194 kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name(
2195 db_dict=db_dict_install,
2196 kdu_model=k8s_instance_info["kdu-model"],
2197 kdu_name=k8s_instance_info["kdu-name"],
2198 )
2199 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2200 await self.k8scluster_map[k8sclustertype].install(
2201 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2202 kdu_model=k8s_instance_info["kdu-model"],
2203 atomic=True,
2204 params=k8params,
2205 db_dict=db_dict_install,
2206 timeout=timeout,
2207 kdu_name=k8s_instance_info["kdu-name"],
2208 namespace=k8s_instance_info["namespace"],
2209 kdu_instance=kdu_instance,
2210 )
2211 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2212
2213 # Obtain services to obtain management service ip
2214 services = await self.k8scluster_map[k8sclustertype].get_services(
2215 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2216 kdu_instance=kdu_instance,
2217 namespace=k8s_instance_info["namespace"])
2218
2219 # Obtain management service info (if exists)
2220 vnfr_update_dict = {}
2221 if services:
2222 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2223 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2224 for mgmt_service in mgmt_services:
2225 for service in services:
2226 if service["name"].startswith(mgmt_service["name"]):
2227 # Mgmt service found, Obtain service ip
2228 ip = service.get("external_ip", service.get("cluster_ip"))
2229 if isinstance(ip, list) and len(ip) == 1:
2230 ip = ip[0]
2231
2232 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2233
2234 # Check whether the mgmt ip at the vnf must also be updated
2235 service_external_cp = mgmt_service.get("external-connection-point-ref")
2236 if service_external_cp:
2237 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2238 vnfr_update_dict["ip-address"] = ip
2239
2240 break
2241 else:
2242 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2243
2244 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2245 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2246
2247 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
2248 if kdu_config and kdu_config.get("initial-config-primitive") and \
2249 get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
2250 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2251 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2252
2253 for initial_config_primitive in initial_config_primitive_list:
2254 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2255
2256 await asyncio.wait_for(
2257 self.k8scluster_map[k8sclustertype].exec_primitive(
2258 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2259 kdu_instance=kdu_instance,
2260 primitive_name=initial_config_primitive["name"],
2261 params=primitive_params_, db_dict={}),
2262 timeout=timeout)
2263
2264 except Exception as e:
2265 # Prepare update db with error and raise exception
2266 try:
2267 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2268 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2269 except Exception:
2270 # ignore to keep original exception
2271 pass
2272 # reraise original error
2273 raise
2274
2275 return kdu_instance
2276
2277 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2278 # Launch kdus if present in the descriptor
2279
2280 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2281
2282 async def _get_cluster_id(cluster_id, cluster_type):
2283 nonlocal k8scluster_id_2_uuic
2284 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2285 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2286
2287 # check if the K8s cluster is being created; wait for any previous related tasks in process
2288 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2289 if task_dependency:
2290 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2291 self.logger.debug(logging_text + text)
2292 await asyncio.wait(task_dependency, timeout=3600)
2293
2294 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2295 if not db_k8scluster:
2296 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2297
2298 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2299 if not k8s_id:
2300 if cluster_type == "helm-chart-v3":
2301 try:
2302 # backward compatibility for existing clusters that have not been initialized for helm v3
2303 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2304 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2305 reuse_cluster_uuid=cluster_id)
2306 db_k8scluster_update = {}
2307 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2308 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2309 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2310 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2311 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2312 except Exception as e:
2313 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2314 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2315 cluster_type))
2316 else:
2317 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2318 format(cluster_id, cluster_type))
2319 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2320 return k8s_id
2321
2322 logging_text += "Deploy kdus: "
2323 step = ""
2324 try:
2325 db_nsr_update = {"_admin.deployed.K8s": []}
2326 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2327
2328 index = 0
2329 updated_cluster_list = []
2330 updated_v3_cluster_list = []
2331
2332 for vnfr_data in db_vnfrs.values():
2333 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2334 # Step 0: Prepare and set parameters
2335 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
2336 vnfd_id = vnfr_data.get('vnfd-id')
2337 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2338 kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
2339 namespace = kdur.get("k8s-namespace")
2340 if kdur.get("helm-chart"):
2341 kdumodel = kdur["helm-chart"]
2342 # Default version: helm3, if helm-version is v2 assign v2
2343 k8sclustertype = "helm-chart-v3"
2344 self.logger.debug("kdur: {}".format(kdur))
2345 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2346 k8sclustertype = "helm-chart"
2347 elif kdur.get("juju-bundle"):
2348 kdumodel = kdur["juju-bundle"]
2349 k8sclustertype = "juju-bundle"
2350 else:
2351 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2352 "juju-bundle. Maybe an old NBI version is running".
2353 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2354 # check if kdumodel is a file and exists
2355 try:
2356 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2357 storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
2358 if storage and storage.get('pkg-dir'):  # may not be present if the vnfd has no artifacts
2359 # path format: /vnfdid/pkg-dir/helm-charts|juju-bundles/kdumodel
2360 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2361 kdumodel)
2362 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2363 kdumodel = self.fs.path + filename
2364 except (asyncio.TimeoutError, asyncio.CancelledError):
2365 raise
2366 except Exception: # it is not a file
2367 pass
2368
2369 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2370 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2371 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2372
2373 # Synchronize repos
2374 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2375 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2376 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2377 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2378 if del_repo_list or added_repo_dict:
2379 if k8sclustertype == "helm-chart":
2380 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2381 updated = {'_admin.helm_charts_added.' +
2382 item: name for item, name in added_repo_dict.items()}
2383 updated_cluster_list.append(cluster_uuid)
2384 elif k8sclustertype == "helm-chart-v3":
2385 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2386 updated = {'_admin.helm_charts_v3_added.' +
2387 item: name for item, name in added_repo_dict.items()}
2388 updated_v3_cluster_list.append(cluster_uuid)
2389 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2390 "'{}' to_delete: {}, to_add: {}".
2391 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2392 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2393
2394 # Instantiate kdu
2395 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2396 kdur["kdu-name"], k8s_cluster_id)
2397 k8s_instance_info = {"kdu-instance": None,
2398 "k8scluster-uuid": cluster_uuid,
2399 "k8scluster-type": k8sclustertype,
2400 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2401 "kdu-name": kdur["kdu-name"],
2402 "kdu-model": kdumodel,
2403 "namespace": namespace}
2404 db_path = "_admin.deployed.K8s.{}".format(index)
2405 db_nsr_update[db_path] = k8s_instance_info
2406 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2407 vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
2408 task = asyncio.ensure_future(
2409 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
2410 k8s_instance_info, k8params=desc_params, timeout=600))
2411 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2412 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2413
2414 index += 1
2415
2416 except (LcmException, asyncio.CancelledError):
2417 raise
2418 except Exception as e:
2419 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2420 if isinstance(e, (N2VCException, DbException)):
2421 self.logger.error(logging_text + msg)
2422 else:
2423 self.logger.critical(logging_text + msg, exc_info=True)
2424 raise LcmException(msg)
2425 finally:
2426 if db_nsr_update:
2427 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2428
2429 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2430 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2431 base_folder, task_instantiation_info, stage):
2432 # launch instantiate_N2VC in an asyncio task and register the task object
2433 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2434 # if not found, create one entry and update database
2435 # fill db_nsr._admin.deployed.VCA.<index>
2436
2437 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2438 if "execution-environment-list" in descriptor_config:
2439 ee_list = descriptor_config.get("execution-environment-list", [])
2440 else:  # other types, such as scripts, are not supported
2441 ee_list = []
2442
2443 for ee_item in ee_list:
2444 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2445 ee_item.get("helm-chart")))
2446 ee_descriptor_id = ee_item.get("id")
2447 if ee_item.get("juju"):
2448 vca_name = ee_item['juju'].get('charm')
2449 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2450 if ee_item['juju'].get('cloud') == "k8s":
2451 vca_type = "k8s_proxy_charm"
2452 elif ee_item['juju'].get('proxy') is False:
2453 vca_type = "native_charm"
2454 elif ee_item.get("helm-chart"):
2455 vca_name = ee_item['helm-chart']
2456 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
2457 vca_type = "helm"
2458 else:
2459 vca_type = "helm-v3"
2460 else:
2461 self.logger.debug(logging_text + "skipping: configuration is neither juju nor helm-chart")
2462 continue
2463
2464 vca_index = -1
2465 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2466 if not vca_deployed:
2467 continue
2468 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2469 vca_deployed.get("vdu_id") == vdu_id and \
2470 vca_deployed.get("kdu_name") == kdu_name and \
2471 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2472 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2473 break
2474 else:
2475 # not found, create one.
2476 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2477 if vdu_id:
2478 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2479 elif kdu_name:
2480 target += "/kdu/{}".format(kdu_name)
2481 vca_deployed = {
2482 "target_element": target,
2483 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2484 "member-vnf-index": member_vnf_index,
2485 "vdu_id": vdu_id,
2486 "kdu_name": kdu_name,
2487 "vdu_count_index": vdu_index,
2488 "operational-status": "init", # TODO revise
2489 "detailed-status": "", # TODO revise
2490 "step": "initial-deploy", # TODO revise
2491 "vnfd_id": vnfd_id,
2492 "vdu_name": vdu_name,
2493 "type": vca_type,
2494 "ee_descriptor_id": ee_descriptor_id
2495 }
2496 vca_index += 1
2497
2498 # create VCA and configurationStatus in db
2499 db_dict = {
2500 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2501 "configurationStatus.{}".format(vca_index): dict()
2502 }
2503 self.update_db_2("nsrs", nsr_id, db_dict)
2504
2505 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2506
2507 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
2508 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
2509 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
2510
2511 # Launch task
2512 task_n2vc = asyncio.ensure_future(
2513 self.instantiate_N2VC(
2514 logging_text=logging_text,
2515 vca_index=vca_index,
2516 nsi_id=nsi_id,
2517 db_nsr=db_nsr,
2518 db_vnfr=db_vnfr,
2519 vdu_id=vdu_id,
2520 kdu_name=kdu_name,
2521 vdu_index=vdu_index,
2522 deploy_params=deploy_params,
2523 config_descriptor=descriptor_config,
2524 base_folder=base_folder,
2525 nslcmop_id=nslcmop_id,
2526 stage=stage,
2527 vca_type=vca_type,
2528 vca_name=vca_name,
2529 ee_config_descriptor=ee_item
2530 )
2531 )
2532 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2533 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2534 member_vnf_index or "", vdu_id or "")
2535
2536 @staticmethod
2537 def _create_nslcmop(nsr_id, operation, params):
2538 """
2539 Creates an nslcmop content to be stored at database.
2540 :param nsr_id: internal id of the instance
2541 :param operation: instantiate, terminate, scale, action, ...
2542 :param params: user parameters for the operation
2543 :return: dictionary following SOL005 format
2544 """
2545 # Raise exception if invalid arguments
2546 if not (nsr_id and operation and params):
2547 raise LcmException(
2548 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2549 now = time()
2550 _id = str(uuid4())
2551 nslcmop = {
2552 "id": _id,
2553 "_id": _id,
2554 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2555 "operationState": "PROCESSING",
2556 "statusEnteredTime": now,
2557 "nsInstanceId": nsr_id,
2558 "lcmOperationType": operation,
2559 "startTime": now,
2560 "isAutomaticInvocation": False,
2561 "operationParams": params,
2562 "isCancelPending": False,
2563 "links": {
2564 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2565 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2566 }
2567 }
2568 return nslcmop
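# Illustrative usage sketch (identifiers and parameters below are hypothetical):
#   nslcmop = NsLcm._create_nslcmop(nsr_id="<nsr-uuid>", operation="action",
#                                   params={"member_vnf_index": "1", "primitive": "touch",
#                                           "primitive_params": {}})
#   nslcmop["operationState"]  # -> "PROCESSING"
#   nslcmop["links"]["self"]   # -> "/osm/nslcm/v1/ns_lcm_op_occs/<generated uuid>"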
2569
2570 def _format_additional_params(self, params):
2571 params = params or {}
2572 for key, value in params.items():
2573 if str(value).startswith("!!yaml "):
2574 params[key] = yaml.safe_load(value[7:])
2575 return params
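# Illustrative sketch of the "!!yaml " convention handled above (values are hypothetical):
#   self._format_additional_params({"replicas": 3, "config": "!!yaml {a: 1, b: [x, y]}"})
#   # -> {"replicas": 3, "config": {"a": 1, "b": ["x", "y"]}}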
2576
2577 def _get_terminate_primitive_params(self, seq, vnf_index):
2578 primitive = seq.get('name')
2579 primitive_params = {}
2580 params = {
2581 "member_vnf_index": vnf_index,
2582 "primitive": primitive,
2583 "primitive_params": primitive_params,
2584 }
2585 desc_params = {}
2586 return self._map_primitive_params(seq, params, desc_params)
2587
2588 # sub-operations
2589
2590 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2591 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2592 if op.get('operationState') == 'COMPLETED':
2593 # b. Skip sub-operation
2594 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2595 return self.SUBOPERATION_STATUS_SKIP
2596 else:
2597 # c. retry executing sub-operation
2598 # The sub-operation exists, and operationState != 'COMPLETED'
2599 # Update operationState = 'PROCESSING' to indicate a retry.
2600 operationState = 'PROCESSING'
2601 detailed_status = 'In progress'
2602 self._update_suboperation_status(
2603 db_nslcmop, op_index, operationState, detailed_status)
2604 # Return the sub-operation index
2605 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2606 # with arguments extracted from the sub-operation
2607 return op_index
2608
2609 # Find a sub-operation where all keys in a matching dictionary must match
2610 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2611 def _find_suboperation(self, db_nslcmop, match):
2612 if db_nslcmop and match:
2613 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2614 for i, op in enumerate(op_list):
2615 if all(op.get(k) == match[k] for k in match):
2616 return i
2617 return self.SUBOPERATION_STATUS_NOT_FOUND
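# Illustrative sketch (keys mirror those stored by _add_suboperation; values are only examples):
#   op_index = self._find_suboperation(db_nslcmop, {"member_vnf_index": "1",
#                                                   "primitive": "touch",
#                                                   "lcmOperationType": "PRE-SCALE"})
#   # returns the index of the first matching entry in _admin.operations,
#   # or SUBOPERATION_STATUS_NOT_FOUND (-1) if there is no match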
2618
2619 # Update status for a sub-operation given its index
2620 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2621 # Update DB for HA tasks
2622 q_filter = {'_id': db_nslcmop['_id']}
2623 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2624 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2625 self.db.set_one("nslcmops",
2626 q_filter=q_filter,
2627 update_dict=update_dict,
2628 fail_on_empty=False)
2629
2630 # Add sub-operation, return the index of the added sub-operation
2631 # Optionally, set operationState, detailed-status, and operationType
2632 # Status and type are currently set for 'scale' sub-operations:
2633 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2634 # 'detailed-status' : status message
2635 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2636 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2637 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2638 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2639 RO_nsr_id=None, RO_scaling_info=None):
2640 if not db_nslcmop:
2641 return self.SUBOPERATION_STATUS_NOT_FOUND
2642 # Get the "_admin.operations" list, if it exists
2643 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2644 op_list = db_nslcmop_admin.get('operations')
2645 # Create or append to the "_admin.operations" list
2646 new_op = {'member_vnf_index': vnf_index,
2647 'vdu_id': vdu_id,
2648 'vdu_count_index': vdu_count_index,
2649 'primitive': primitive,
2650 'primitive_params': mapped_primitive_params}
2651 if operationState:
2652 new_op['operationState'] = operationState
2653 if detailed_status:
2654 new_op['detailed-status'] = detailed_status
2655 if operationType:
2656 new_op['lcmOperationType'] = operationType
2657 if RO_nsr_id:
2658 new_op['RO_nsr_id'] = RO_nsr_id
2659 if RO_scaling_info:
2660 new_op['RO_scaling_info'] = RO_scaling_info
2661 if not op_list:
2662 # No existing operations, create key 'operations' with current operation as first list element
2663 db_nslcmop_admin.update({'operations': [new_op]})
2664 op_list = db_nslcmop_admin.get('operations')
2665 else:
2666 # Existing operations, append operation to list
2667 op_list.append(new_op)
2668
2669 db_nslcmop_update = {'_admin.operations': op_list}
2670 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2671 op_index = len(op_list) - 1
2672 return op_index
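# Illustrative sketch of an entry appended to _admin.operations by the method above
# (values are hypothetical):
#   {"member_vnf_index": "1", "vdu_id": None, "vdu_count_index": None,
#    "primitive": "touch", "primitive_params": {"filename": "/home/ubuntu/first-touch"},
#    "operationState": "PROCESSING", "detailed-status": "In progress",
#    "lcmOperationType": "PRE-SCALE"}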
2673
2674 # Helper methods for scale() sub-operations
2675
2676 # pre-scale/post-scale:
2677 # Check for 3 different cases:
2678 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2679 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2680 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2681 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2682 operationType, RO_nsr_id=None, RO_scaling_info=None):
2683 # Find this sub-operation
2684 if RO_nsr_id and RO_scaling_info:
2685 operationType = 'SCALE-RO'
2686 match = {
2687 'member_vnf_index': vnf_index,
2688 'RO_nsr_id': RO_nsr_id,
2689 'RO_scaling_info': RO_scaling_info,
2690 }
2691 else:
2692 match = {
2693 'member_vnf_index': vnf_index,
2694 'primitive': vnf_config_primitive,
2695 'primitive_params': primitive_params,
2696 'lcmOperationType': operationType
2697 }
2698 op_index = self._find_suboperation(db_nslcmop, match)
2699 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2700 # a. New sub-operation
2701 # The sub-operation does not exist, add it.
2702 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2703 # The following parameters are set to None for all kind of scaling:
2704 vdu_id = None
2705 vdu_count_index = None
2706 vdu_name = None
2707 if RO_nsr_id and RO_scaling_info:
2708 vnf_config_primitive = None
2709 primitive_params = None
2710 else:
2711 RO_nsr_id = None
2712 RO_scaling_info = None
2713 # Initial status for sub-operation
2714 operationState = 'PROCESSING'
2715 detailed_status = 'In progress'
2716 # Add sub-operation for pre/post-scaling (zero or more operations)
2717 self._add_suboperation(db_nslcmop,
2718 vnf_index,
2719 vdu_id,
2720 vdu_count_index,
2721 vdu_name,
2722 vnf_config_primitive,
2723 primitive_params,
2724 operationState,
2725 detailed_status,
2726 operationType,
2727 RO_nsr_id,
2728 RO_scaling_info)
2729 return self.SUBOPERATION_STATUS_NEW
2730 else:
2731 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2732 # or op_index (operationState != 'COMPLETED')
2733 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
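# Illustrative sketch of how the result of the check above is typically consumed
# (caller code is hypothetical):
#   op_index = self._check_or_add_scale_suboperation(db_nslcmop, "1", "touch", params, "PRE-SCALE")
#   if op_index == self.SUBOPERATION_STATUS_NEW:
#       pass      # first execution: run the primitive as usual
#   elif op_index == self.SUBOPERATION_STATUS_SKIP:
#       pass      # already COMPLETED in a previous attempt: skip it
#   else:
#       pass      # retry: re-execute with the parameters stored at _admin.operations[op_index]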
2734
2735 # Function to return execution_environment id
2736
2737 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2738 # TODO vdu_index_count
2739 for vca in vca_deployed_list:
2740 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2741 return vca["ee_id"]
2742
2743 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
2744 vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
2745 """
2746 Execute the terminate primitives and, if destroy_ee=True, destroy the execution environment
2747 :param logging_text:
2748 :param db_nslcmop:
2749 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
2750 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2751 :param vca_index: index in the database _admin.deployed.VCA
2752 :param destroy_ee: False to skip destroying the execution environment, because all of them will be destroyed at once
2753 :param exec_primitives: False to skip executing terminate primitives, because the config is not completed or has
2754 not executed properly
2755 :param scaling_in: True destroys the application, False destroys the model
2756 :return: None or exception
2757 """
2758
2759 self.logger.debug(
2760 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2761 vca_index, vca_deployed, config_descriptor, destroy_ee
2762 )
2763 )
2764
2765 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
2766
2767 # execute terminate_primitives
2768 if exec_primitives:
2769 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
2770 config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
2771 vdu_id = vca_deployed.get("vdu_id")
2772 vdu_count_index = vca_deployed.get("vdu_count_index")
2773 vdu_name = vca_deployed.get("vdu_name")
2774 vnf_index = vca_deployed.get("member-vnf-index")
2775 if terminate_primitives and vca_deployed.get("needed_terminate"):
2776 for seq in terminate_primitives:
2777 # For each sequence in list, get primitive and call _ns_execute_primitive()
2778 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2779 vnf_index, seq.get("name"))
2780 self.logger.debug(logging_text + step)
2781 # Create the primitive for each sequence, i.e. "primitive": "touch"
2782 primitive = seq.get('name')
2783 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2784
2785 # Add sub-operation
2786 self._add_suboperation(db_nslcmop,
2787 vnf_index,
2788 vdu_id,
2789 vdu_count_index,
2790 vdu_name,
2791 primitive,
2792 mapped_primitive_params)
2793 # Sub-operations: Call _ns_execute_primitive() instead of action()
2794 try:
2795 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2796 mapped_primitive_params,
2797 vca_type=vca_type)
2798 except LcmException:
2799 # this happens when the VCA is not deployed. In this case there is no need to execute terminate primitives
2800 continue
2801 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2802 if result not in result_ok:
2803 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2804 "error {}".format(seq.get("name"), vnf_index, result_detail))
2805 # record that this VCA no longer needs to be terminated
2806 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2807 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2808
2809 if vca_deployed.get("prometheus_jobs") and self.prometheus:
2810 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
2811
2812 if destroy_ee:
2813 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
2814
2815 async def _delete_all_N2VC(self, db_nsr: dict):
2816 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2817 namespace = "." + db_nsr["_id"]
2818 try:
2819 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2820 except N2VCNotFound: # already deleted. Skip
2821 pass
2822 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2823
2824 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2825 """
2826 Terminates a deployment from RO
2827 :param logging_text:
2828 :param nsr_deployed: db_nsr._admin.deployed
2829 :param nsr_id:
2830 :param nslcmop_id:
2831 :param stage: list of strings with the content to write on db_nslcmop.detailed-status.
2832 this method will update only index 2, but it will write on database the concatenated content of the list
2833 :return:
2834 """
2835 db_nsr_update = {}
2836 failed_detail = []
2837 ro_nsr_id = ro_delete_action = None
2838 if nsr_deployed and nsr_deployed.get("RO"):
2839 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2840 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2841 try:
2842 if ro_nsr_id:
2843 stage[2] = "Deleting ns from VIM."
2844 db_nsr_update["detailed-status"] = " ".join(stage)
2845 self._write_op_status(nslcmop_id, stage)
2846 self.logger.debug(logging_text + stage[2])
2847 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2848 self._write_op_status(nslcmop_id, stage)
2849 desc = await self.RO.delete("ns", ro_nsr_id)
2850 ro_delete_action = desc["action_id"]
2851 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2852 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2853 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2854 if ro_delete_action:
2855 # wait until NS is deleted from VIM
2856 stage[2] = "Waiting for ns to be deleted from VIM."
2857 detailed_status_old = None
2858 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2859 ro_delete_action))
2860 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2861 self._write_op_status(nslcmop_id, stage)
2862
2863 delete_timeout = 20 * 60 # 20 minutes
2864 while delete_timeout > 0:
2865 desc = await self.RO.show(
2866 "ns",
2867 item_id_name=ro_nsr_id,
2868 extra_item="action",
2869 extra_item_id=ro_delete_action)
2870
2871 # deploymentStatus
2872 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2873
2874 ns_status, ns_status_info = self.RO.check_action_status(desc)
2875 if ns_status == "ERROR":
2876 raise ROclient.ROClientException(ns_status_info)
2877 elif ns_status == "BUILD":
2878 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2879 elif ns_status == "ACTIVE":
2880 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2881 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2882 break
2883 else:
2884 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2885 if stage[2] != detailed_status_old:
2886 detailed_status_old = stage[2]
2887 db_nsr_update["detailed-status"] = " ".join(stage)
2888 self._write_op_status(nslcmop_id, stage)
2889 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2890 await asyncio.sleep(5, loop=self.loop)
2891 delete_timeout -= 5
2892 else: # delete_timeout <= 0:
2893 raise ROclient.ROClientException("Timeout waiting for ns to be deleted from VIM")
2894
2895 except Exception as e:
2896 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2897 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2898 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2899 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2900 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2901 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
2902 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2903 failed_detail.append("delete conflict: {}".format(e))
2904 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
2905 else:
2906 failed_detail.append("delete error: {}".format(e))
2907 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
2908
2909 # Delete nsd
2910 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
2911 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
2912 try:
2913 stage[2] = "Deleting nsd from RO."
2914 db_nsr_update["detailed-status"] = " ".join(stage)
2915 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2916 self._write_op_status(nslcmop_id, stage)
2917 await self.RO.delete("nsd", ro_nsd_id)
2918 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
2919 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2920 except Exception as e:
2921 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2922 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2923 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
2924 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2925 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
2926 self.logger.debug(logging_text + failed_detail[-1])
2927 else:
2928 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
2929 self.logger.error(logging_text + failed_detail[-1])
2930
2931 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
2932 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2933 if not vnf_deployed or not vnf_deployed["id"]:
2934 continue
2935 try:
2936 ro_vnfd_id = vnf_deployed["id"]
2937 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2938 vnf_deployed["member-vnf-index"], ro_vnfd_id)
2939 db_nsr_update["detailed-status"] = " ".join(stage)
2940 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2941 self._write_op_status(nslcmop_id, stage)
2942 await self.RO.delete("vnfd", ro_vnfd_id)
2943 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
2944 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2945 except Exception as e:
2946 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2947 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2948 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
2949 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2950 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
2951 self.logger.debug(logging_text + failed_detail[-1])
2952 else:
2953 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
2954 self.logger.error(logging_text + failed_detail[-1])
2955
2956 if failed_detail:
2957 stage[2] = "Error deleting from VIM"
2958 else:
2959 stage[2] = "Deleted from VIM"
2960 db_nsr_update["detailed-status"] = " ".join(stage)
2961 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2962 self._write_op_status(nslcmop_id, stage)
2963
2964 if failed_detail:
2965 raise LcmException("; ".join(failed_detail))
2966
2967 async def terminate(self, nsr_id, nslcmop_id):
2968 # Try to lock HA task here
2969 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2970 if not task_is_locked_by_me:
2971 return
2972
2973 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2974 self.logger.debug(logging_text + "Enter")
2975 timeout_ns_terminate = self.timeout_ns_terminate
2976 db_nsr = None
2977 db_nslcmop = None
2978 operation_params = None
2979 exc = None
2980 error_list = [] # accumulates all error messages
2981 db_nslcmop_update = {}
2982 autoremove = False # autoremove after termination
2983 tasks_dict_info = {}
2984 db_nsr_update = {}
2985 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
2986 # ^ contains [stage, step, VIM-status]
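# Illustrative content of the stage list at some point of the termination (example values only):
#   ["Stage 2/3 execute terminating primitives.", "Deleting KDUs.", "Deleted from VIM"]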
2987 try:
2988 # wait for any previous tasks in process
2989 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2990
2991 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2992 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2993 operation_params = db_nslcmop.get("operationParams") or {}
2994 if operation_params.get("timeout_ns_terminate"):
2995 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
2996 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2997 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2998
2999 db_nsr_update["operational-status"] = "terminating"
3000 db_nsr_update["config-status"] = "terminating"
3001 self._write_ns_status(
3002 nsr_id=nsr_id,
3003 ns_state="TERMINATING",
3004 current_operation="TERMINATING",
3005 current_operation_id=nslcmop_id,
3006 other_update=db_nsr_update
3007 )
3008 self._write_op_status(
3009 op_id=nslcmop_id,
3010 queuePosition=0,
3011 stage=stage
3012 )
3013 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3014 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3015 return
3016
3017 stage[1] = "Getting vnf descriptors from db."
3018 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3019 db_vnfds_from_id = {}
3020 db_vnfds_from_member_index = {}
3021 # Loop over VNFRs
3022 for vnfr in db_vnfrs_list:
3023 vnfd_id = vnfr["vnfd-id"]
3024 if vnfd_id not in db_vnfds_from_id:
3025 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3026 db_vnfds_from_id[vnfd_id] = vnfd
3027 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3028
3029 # Destroy individual execution environments when there are terminating primitives.
3030 # The rest of the EEs will be deleted at once
3031 # TODO - check before calling _destroy_N2VC
3032 # if not operation_params.get("skip_terminate_primitives"):#
3033 # or not vca.get("needed_terminate"):
3034 stage[0] = "Stage 2/3 execute terminating primitives."
3035 self.logger.debug(logging_text + stage[0])
3036 stage[1] = "Looking for execution environments that need terminate primitives."
3037 self.logger.debug(logging_text + stage[1])
3038
3039 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3040 config_descriptor = None
3041 if not vca or not vca.get("ee_id"):
3042 continue
3043 if not vca.get("member-vnf-index"):
3044 # ns
3045 config_descriptor = db_nsr.get("ns-configuration")
3046 elif vca.get("vdu_id"):
3047 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3048 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3049 elif vca.get("kdu_name"):
3050 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3051 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3052 else:
3053 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3054 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3055 vca_type = vca.get("type")
3056 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3057 vca.get("needed_terminate"))
3058 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3059 # pending native charms
3060 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
3061 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3062 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3063 task = asyncio.ensure_future(
3064 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3065 destroy_ee, exec_terminate_primitives))
3066 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3067
3068 # wait for pending tasks of terminate primitives
3069 if tasks_dict_info:
3070 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3071 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3072 min(self.timeout_charm_delete, timeout_ns_terminate),
3073 stage, nslcmop_id)
3074 tasks_dict_info.clear()
3075 if error_list:
3076 return # raise LcmException("; ".join(error_list))
3077
3078 # remove all execution environments at once
3079 stage[0] = "Stage 3/3 delete all."
3080
3081 if nsr_deployed.get("VCA"):
3082 stage[1] = "Deleting all execution environments."
3083 self.logger.debug(logging_text + stage[1])
3084 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3085 timeout=self.timeout_charm_delete))
3086 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3087 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3088
3089 # Delete from k8scluster
3090 stage[1] = "Deleting KDUs."
3091 self.logger.debug(logging_text + stage[1])
3092 # print(nsr_deployed)
3093 for kdu in get_iterable(nsr_deployed, "K8s"):
3094 if not kdu or not kdu.get("kdu-instance"):
3095 continue
3096 kdu_instance = kdu.get("kdu-instance")
3097 if kdu.get("k8scluster-type") in self.k8scluster_map:
3098 task_delete_kdu_instance = asyncio.ensure_future(
3099 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3100 cluster_uuid=kdu.get("k8scluster-uuid"),
3101 kdu_instance=kdu_instance))
3102 else:
3103 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3104 format(kdu.get("k8scluster-type")))
3105 continue
3106 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3107
3108 # remove from RO
3109 stage[1] = "Deleting ns from VIM."
3110 if self.ng_ro:
3111 task_delete_ro = asyncio.ensure_future(
3112 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3113 else:
3114 task_delete_ro = asyncio.ensure_future(
3115 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3116 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3117
3118 # the rest of the steps will be done in the finally block
3119
3120 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3121 self.logger.error(logging_text + "Exit Exception {}".format(e))
3122 exc = e
3123 except asyncio.CancelledError:
3124 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3125 exc = "Operation was cancelled"
3126 except Exception as e:
3127 exc = traceback.format_exc()
3128 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3129 finally:
3130 if exc:
3131 error_list.append(str(exc))
3132 try:
3133 # wait for pending tasks
3134 if tasks_dict_info:
3135 stage[1] = "Waiting for terminate pending tasks."
3136 self.logger.debug(logging_text + stage[1])
3137 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3138 stage, nslcmop_id)
3139 stage[1] = stage[2] = ""
3140 except asyncio.CancelledError:
3141 error_list.append("Cancelled")
3142 # TODO cancel all tasks
3143 except Exception as exc:
3144 error_list.append(str(exc))
3145 # update status at database
3146 if error_list:
3147 error_detail = "; ".join(error_list)
3148 # self.logger.error(logging_text + error_detail)
3149 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3150 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3151
3152 db_nsr_update["operational-status"] = "failed"
3153 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3154 db_nslcmop_update["detailed-status"] = error_detail
3155 nslcmop_operation_state = "FAILED"
3156 ns_state = "BROKEN"
3157 else:
3158 error_detail = None
3159 error_description_nsr = error_description_nslcmop = None
3160 ns_state = "NOT_INSTANTIATED"
3161 db_nsr_update["operational-status"] = "terminated"
3162 db_nsr_update["detailed-status"] = "Done"
3163 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3164 db_nslcmop_update["detailed-status"] = "Done"
3165 nslcmop_operation_state = "COMPLETED"
3166
3167 if db_nsr:
3168 self._write_ns_status(
3169 nsr_id=nsr_id,
3170 ns_state=ns_state,
3171 current_operation="IDLE",
3172 current_operation_id=None,
3173 error_description=error_description_nsr,
3174 error_detail=error_detail,
3175 other_update=db_nsr_update
3176 )
3177 self._write_op_status(
3178 op_id=nslcmop_id,
3179 stage="",
3180 error_message=error_description_nslcmop,
3181 operation_state=nslcmop_operation_state,
3182 other_update=db_nslcmop_update,
3183 )
3184 if ns_state == "NOT_INSTANTIATED":
3185 try:
3186 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3187 except DbException as e:
3188 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3189 format(nsr_id, e))
3190 if operation_params:
3191 autoremove = operation_params.get("autoremove", False)
3192 if nslcmop_operation_state:
3193 try:
3194 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3195 "operationState": nslcmop_operation_state,
3196 "autoremove": autoremove},
3197 loop=self.loop)
3198 except Exception as e:
3199 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3200
3201 self.logger.debug(logging_text + "Exit")
3202 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3203
3204 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
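# created_tasks_info is expected to map each pending asyncio task to a short human-readable
# description used for error reporting, e.g. (illustrative values):
#   {<task>: "Terminating VCA <ee_id>", <task>: "Removing deployment from VIM"}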
3205 time_start = time()
3206 error_detail_list = []
3207 error_list = []
3208 pending_tasks = list(created_tasks_info.keys())
3209 num_tasks = len(pending_tasks)
3210 num_done = 0
3211 stage[1] = "{}/{}.".format(num_done, num_tasks)
3212 self._write_op_status(nslcmop_id, stage)
3213 while pending_tasks:
3214 new_error = None
3215 _timeout = timeout + time_start - time()
3216 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3217 return_when=asyncio.FIRST_COMPLETED)
3218 num_done += len(done)
3219 if not done: # Timeout
3220 for task in pending_tasks:
3221 new_error = created_tasks_info[task] + ": Timeout"
3222 error_detail_list.append(new_error)
3223 error_list.append(new_error)
3224 break
3225 for task in done:
3226 if task.cancelled():
3227 exc = "Cancelled"
3228 else:
3229 exc = task.exception()
3230 if exc:
3231 if isinstance(exc, asyncio.TimeoutError):
3232 exc = "Timeout"
3233 new_error = created_tasks_info[task] + ": {}".format(exc)
3234 error_list.append(created_tasks_info[task])
3235 error_detail_list.append(new_error)
3236 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3237 K8sException, NgRoException)):
3238 self.logger.error(logging_text + new_error)
3239 else:
3240 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3241 self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
3242 else:
3243 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3244 stage[1] = "{}/{}.".format(num_done, num_tasks)
3245 if new_error:
3246 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3247 if nsr_id: # update also nsr
3248 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3249 "errorDetail": ". ".join(error_detail_list)})
3250 self._write_op_status(nslcmop_id, stage)
3251 return error_detail_list
3252
3253 @staticmethod
3254 def _map_primitive_params(primitive_desc, params, instantiation_params):
3255 """
3256 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
3257 parameter, the default-value is used. If it is between < >, the value is looked up in instantiation_params
3258 :param primitive_desc: portion of VNFD/NSD that describes primitive
3259 :param params: Params provided by user
3260 :param instantiation_params: Instantiation params provided by user
3261 :return: a dictionary with the calculated params
3262 """
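# Illustrative mapping (hypothetical descriptor and values, not taken from any real VNFD):
#   primitive_desc = {"name": "touch", "parameter": [{"name": "filename", "default-value": "<touch_filename>"}]}
#   params = {}
#   instantiation_params = {"touch_filename": "/home/ubuntu/first-touch"}
# would return {"filename": "/home/ubuntu/first-touch"}: the "<...>" default-value is
# resolved against instantiation_params as described above.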
3263 calculated_params = {}
3264 for parameter in primitive_desc.get("parameter", ()):
3265 param_name = parameter["name"]
3266 if param_name in params:
3267 calculated_params[param_name] = params[param_name]
3268 elif "default-value" in parameter or "value" in parameter:
3269 if "value" in parameter:
3270 calculated_params[param_name] = parameter["value"]
3271 else:
3272 calculated_params[param_name] = parameter["default-value"]
3273 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3274 and calculated_params[param_name].endswith(">"):
3275 if calculated_params[param_name][1:-1] in instantiation_params:
3276 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3277 else:
3278 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3279 format(calculated_params[param_name], primitive_desc["name"]))
3280 else:
3281 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3282 format(param_name, primitive_desc["name"]))
3283
3284 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3285 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
3286 default_flow_style=True, width=256)
3287 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3288 calculated_params[param_name] = calculated_params[param_name][7:]
3289 if parameter.get("data-type") == "INTEGER":
3290 try:
3291 calculated_params[param_name] = int(calculated_params[param_name])
3292 except ValueError: # error converting string to int
3293 raise LcmException(
3294 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3295 elif parameter.get("data-type") == "BOOLEAN":
3296 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3297
3298 # always add ns_config_info if the primitive name is config
3299 if primitive_desc["name"] == "config":
3300 if "ns_config_info" in instantiation_params:
3301 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3302 return calculated_params
3303
3304 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3305 ee_descriptor_id=None):
3306 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
3307 for vca in deployed_vca:
3308 if not vca:
3309 continue
3310 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3311 continue
3312 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3313 continue
3314 if kdu_name and kdu_name != vca["kdu_name"]:
3315 continue
3316 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3317 continue
3318 break
3319 else:
3320 # vca_deployed not found
3321 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3322 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3323 ee_descriptor_id))
3324 # get ee_id
3325 ee_id = vca.get("ee_id")
3326 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3327 if not ee_id:
3328 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has no "
3329 "execution environment"
3330 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3331 return ee_id, vca_type
3332
3333 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
3334 timeout=None, vca_type=None, db_dict=None) -> (str, str):
3335 try:
3336 if primitive == "config":
3337 primitive_params = {"params": primitive_params}
3338
3339 vca_type = vca_type or "lxc_proxy_charm"
3340
3341 while retries >= 0:
3342 try:
3343 output = await asyncio.wait_for(
3344 self.vca_map[vca_type].exec_primitive(
3345 ee_id=ee_id,
3346 primitive_name=primitive,
3347 params_dict=primitive_params,
3348 progress_timeout=self.timeout_progress_primitive,
3349 total_timeout=self.timeout_primitive,
3350 db_dict=db_dict),
3351 timeout=timeout or self.timeout_primitive)
3352 # execution was OK
3353 break
3354 except asyncio.CancelledError:
3355 raise
3356 except Exception as e: # asyncio.TimeoutError
3357 if isinstance(e, asyncio.TimeoutError):
3358 e = "Timeout"
3359 retries -= 1
3360 if retries >= 0:
3361 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3362 # wait and retry
3363 await asyncio.sleep(retries_interval, loop=self.loop)
3364 else:
3365 return 'FAILED', str(e)
3366
3367 return 'COMPLETED', output
3368
3369 except (LcmException, asyncio.CancelledError):
3370 raise
3371 except Exception as e:
3372 return 'FAILED', 'Error executing action {}: {}'.format(primitive, e)
3373
3374 async def vca_status_refresh(self, nsr_id, nslcmop_id):
3375 """
3376 Updating the vca_status with latest juju information in nsrs record
3377 :param nsr_id: Id of the nsr
3378 :param nslcmop_id: Id of the nslcmop
3379 :return: None
3380 """
3381
3382 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
3383 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3384
3385 for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
3386 table, filter, path = "nsrs", {"_id": nsr_id}, "_admin.deployed.VCA.{}.".format(vca_index)
3387 await self._on_update_n2vc_db(table, filter, path, {})
3388
3389 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
3390 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
3391
3392 async def action(self, nsr_id, nslcmop_id):
3393 # Try to lock HA task here
3394 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3395 if not task_is_locked_by_me:
3396 return
3397
3398 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3399 self.logger.debug(logging_text + "Enter")
3400 # get all needed from database
3401 db_nsr = None
3402 db_nslcmop = None
3403 db_nsr_update = {}
3404 db_nslcmop_update = {}
3405 nslcmop_operation_state = None
3406 error_description_nslcmop = None
3407 exc = None
3408 try:
3409 # wait for any previous tasks in process
3410 step = "Waiting for previous operations to terminate"
3411 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3412
3413 self._write_ns_status(
3414 nsr_id=nsr_id,
3415 ns_state=None,
3416 current_operation="RUNNING ACTION",
3417 current_operation_id=nslcmop_id
3418 )
3419
3420 step = "Getting information from database"
3421 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3422 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3423
3424 nsr_deployed = db_nsr["_admin"].get("deployed")
3425 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3426 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3427 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3428 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3429 primitive = db_nslcmop["operationParams"]["primitive"]
3430 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3431 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3432
3433 if vnf_index:
3434 step = "Getting vnfr from database"
3435 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3436 step = "Getting vnfd from database"
3437 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3438 else:
3439 step = "Getting nsd from database"
3440 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3441
3442 # for backward compatibility
3443 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3444 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3445 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3446 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3447
3448 # look for primitive
3449 config_primitive_desc = descriptor_configuration = None
3450 if vdu_id:
3451 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
3452 elif kdu_name:
3453 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
3454 elif vnf_index:
3455 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
3456 else:
3457 descriptor_configuration = db_nsd.get("ns-configuration")
3458
3459 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3460 for config_primitive in descriptor_configuration["config-primitive"]:
3461 if config_primitive["name"] == primitive:
3462 config_primitive_desc = config_primitive
3463 break
3464
3465 if not config_primitive_desc:
3466 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3467 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3468 format(primitive))
3469 primitive_name = primitive
3470 ee_descriptor_id = None
3471 else:
3472 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3473 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3474
3475 if vnf_index:
3476 if vdu_id:
3477 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3478 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
3479 elif kdu_name:
3480 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3481 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3482 else:
3483 desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
3484 else:
3485 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
3486 if kdu_name and get_configuration(db_vnfd, kdu_name):
3487 kdu_configuration = get_configuration(db_vnfd, kdu_name)
3488 actions = set()
3489 for primitive in kdu_configuration.get("initial-config-primitive", []):
3490 actions.add(primitive["name"])
3491 for primitive in kdu_configuration.get("config-primitive", []):
3492 actions.add(primitive["name"])
3493 kdu_action = primitive_name in actions
3494
3495 # TODO check if ns is in a proper status
3496 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3497 # kdur and desc_params already set from before
3498 if primitive_params:
3499 desc_params.update(primitive_params)
3500 # TODO Check if we will need something at vnf level
3501 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3502 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3503 break
3504 else:
3505 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3506
3507 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3508 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3509 raise LcmException(msg)
3510
3511 db_dict = {"collection": "nsrs",
3512 "filter": {"_id": nsr_id},
3513 "path": "_admin.deployed.K8s.{}".format(index)}
3514 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3515 step = "Executing kdu {}".format(primitive_name)
3516 if primitive_name == "upgrade":
3517 if desc_params.get("kdu_model"):
3518 kdu_model = desc_params.get("kdu_model")
3519 del desc_params["kdu_model"]
3520 else:
3521 kdu_model = kdu.get("kdu-model")
3522 parts = kdu_model.split(sep=":")
3523 if len(parts) == 2:
3524 kdu_model = parts[0]
3525
3526 detailed_status = await asyncio.wait_for(
3527 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3528 cluster_uuid=kdu.get("k8scluster-uuid"),
3529 kdu_instance=kdu.get("kdu-instance"),
3530 atomic=True, kdu_model=kdu_model,
3531 params=desc_params, db_dict=db_dict,
3532 timeout=timeout_ns_action),
3533 timeout=timeout_ns_action + 10)
3534 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3535 elif primitive_name == "rollback":
3536 detailed_status = await asyncio.wait_for(
3537 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3538 cluster_uuid=kdu.get("k8scluster-uuid"),
3539 kdu_instance=kdu.get("kdu-instance"),
3540 db_dict=db_dict),
3541 timeout=timeout_ns_action)
3542 elif primitive_name == "status":
3543 detailed_status = await asyncio.wait_for(
3544 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3545 cluster_uuid=kdu.get("k8scluster-uuid"),
3546 kdu_instance=kdu.get("kdu-instance")),
3547 timeout=timeout_ns_action)
3548 else:
3549 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
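# e.g. (illustrative) a kdu-name "ldap" with nsr_id "0123-abcd" falls back to kdu_instance "ldap-0123-abcd"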
3550 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3551
3552 detailed_status = await asyncio.wait_for(
3553 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3554 cluster_uuid=kdu.get("k8scluster-uuid"),
3555 kdu_instance=kdu_instance,
3556 primitive_name=primitive_name,
3557 params=params, db_dict=db_dict,
3558 timeout=timeout_ns_action),
3559 timeout=timeout_ns_action)
3560
3561 if detailed_status:
3562 nslcmop_operation_state = 'COMPLETED'
3563 else:
3564 detailed_status = ''
3565 nslcmop_operation_state = 'FAILED'
3566 else:
3567 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
3568 vdu_id=vdu_id, vdu_count_index=vdu_count_index,
3569 ee_descriptor_id=ee_descriptor_id)
3570 db_nslcmop_notif = {"collection": "nslcmops",
3571 "filter": {"_id": nslcmop_id},
3572 "path": "admin.VCA"}
3573 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3574 ee_id,
3575 primitive=primitive_name,
3576 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3577 timeout=timeout_ns_action,
3578 vca_type=vca_type,
3579 db_dict=db_nslcmop_notif)
3580
3581 db_nslcmop_update["detailed-status"] = detailed_status
3582 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3583 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3584 detailed_status))
3585 return # database update is called inside finally
3586
3587 except (DbException, LcmException, N2VCException, K8sException) as e:
3588 self.logger.error(logging_text + "Exit Exception {}".format(e))
3589 exc = e
3590 except asyncio.CancelledError:
3591 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3592 exc = "Operation was cancelled"
3593 except asyncio.TimeoutError:
3594 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3595 exc = "Timeout"
3596 except Exception as e:
3597 exc = traceback.format_exc()
3598 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3599 finally:
3600 if exc:
3601 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3602 "FAILED {}: {}".format(step, exc)
3603 nslcmop_operation_state = "FAILED"
3604 if db_nsr:
3605 self._write_ns_status(
3606 nsr_id=nsr_id,
3607 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3608 current_operation="IDLE",
3609 current_operation_id=None,
3610 # error_description=error_description_nsr,
3611 # error_detail=error_detail,
3612 other_update=db_nsr_update
3613 )
3614
3615 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
3616 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
3617
3618 if nslcmop_operation_state:
3619 try:
3620 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3621 "operationState": nslcmop_operation_state},
3622 loop=self.loop)
3623 except Exception as e:
3624 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3625 self.logger.debug(logging_text + "Exit")
3626 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3627 return nslcmop_operation_state, detailed_status
3628
3629 async def scale(self, nsr_id, nslcmop_id):
3630 # Try to lock HA task here
3631 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3632 if not task_is_locked_by_me:
3633 return
3634
3635 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3636 stage = ['', '', '']
3637 tasks_dict_info = {}
3638 # ^ stage, step, VIM progress
3639 self.logger.debug(logging_text + "Enter")
3640 # get all needed from database
3641 db_nsr = None
3642 db_nslcmop_update = {}
3643 db_nsr_update = {}
3644 exc = None
3645 # in case of error, indicates which part of the scale operation failed, so the nsr can be set to error status
3646 scale_process = None
3647 old_operational_status = ""
3648 old_config_status = ""
3649 nsi_id = None
3650 try:
3651 # wait for any previous tasks in process
3652 step = "Waiting for previous operations to terminate"
3653 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3654 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3655 current_operation="SCALING", current_operation_id=nslcmop_id)
3656
3657 step = "Getting nslcmop from database"
3658 self.logger.debug(step + " after having waited for previous tasks to be completed")
3659 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3660
3661 step = "Getting nsr from database"
3662 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3663 old_operational_status = db_nsr["operational-status"]
3664 old_config_status = db_nsr["config-status"]
3665
3666 step = "Parsing scaling parameters"
3667 db_nsr_update["operational-status"] = "scaling"
3668 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3669 nsr_deployed = db_nsr["_admin"].get("deployed")
3670
3679 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3680 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3681 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3682 # for backward compatibility
3683 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3684 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3685 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3686 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3687
3688 step = "Getting vnfr from database"
3689 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3690
3691 step = "Getting vnfd from database"
3692 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3693
3694 base_folder = db_vnfd["_admin"]["storage"]
3695
3696 step = "Getting scaling-group-descriptor"
3697 scaling_descriptor = find_in_list(
3698 get_scaling_aspect(
3699 db_vnfd
3700 ),
3701 lambda scale_desc: scale_desc["name"] == scaling_group
3702 )
3703 if not scaling_descriptor:
3704 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3705 "at vnfd:scaling-group-descriptor".format(scaling_group))
3706
3707 step = "Sending scale order to VIM"
3708 # TODO check if ns is in a proper status
3709 nb_scale_op = 0
3710 if not db_nsr["_admin"].get("scaling-group"):
3711 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3712 admin_scale_index = 0
3713 else:
3714 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3715 if admin_scale_info["name"] == scaling_group:
3716 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3717 break
3718 else: # not found, set index one plus last element and add new entry with the name
3719 admin_scale_index += 1
3720 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3721 RO_scaling_info = []
3722 VCA_scaling_info = []
3723 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3724 if scaling_type == "SCALE_OUT":
3725 if "aspect-delta-details" not in scaling_descriptor:
3726 raise LcmException(
3727 "Aspect delta details not found in scaling descriptor {}".format(
3728 scaling_descriptor["name"]
3729 )
3730 )
3731 # count if max-instance-count is reached
3732 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3733
3734 vdu_scaling_info["scaling_direction"] = "OUT"
3735 vdu_scaling_info["vdu-create"] = {}
3736 for delta in deltas:
3737 for vdu_delta in delta["vdu-delta"]:
3738 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3739 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3740 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3741 if cloud_init_text:
3742 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3743 cloud_init_list = []
3744
3745 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3746 max_instance_count = 10
3747 if vdu_profile and "max-number-of-instances" in vdu_profile:
3748 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3749
3750 default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3751
3752 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3753
3754 if nb_scale_op + default_instance_num > max_instance_count:
3755 raise LcmException(
3756 "reached the limit of {} (max-instance-count) "
3757 "scaling-out operations for the "
3758 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3759 )
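# Worked example with hypothetical numbers: if default_instance_num is 1, max_instance_count is 3
# and previous scale-outs already account for nb_scale_op=2, adding one more instance in this delta
# makes nb_scale_op=3, so 3 + 1 > 3 and the exception above is raised.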
3760 for x in range(vdu_delta.get("number-of-instances", 1)):
3761 if cloud_init_text:
3762 # TODO The VDU's own IP address is not available here because db_vnfr has not been updated yet.
3763 additional_params["OSM"] = get_osm_params(
3764 db_vnfr,
3765 vdu_delta["id"],
3766 vdu_index + x
3767 )
3768 cloud_init_list.append(
3769 self._parse_cloud_init(
3770 cloud_init_text,
3771 additional_params,
3772 db_vnfd["id"],
3773 vdud["id"]
3774 )
3775 )
3776 VCA_scaling_info.append(
3777 {
3778 "osm_vdu_id": vdu_delta["id"],
3779 "member-vnf-index": vnf_index,
3780 "type": "create",
3781 "vdu_index": vdu_index + x
3782 }
3783 )
3784 RO_scaling_info.append(
3785 {
3786 "osm_vdu_id": vdu_delta["id"],
3787 "member-vnf-index": vnf_index,
3788 "type": "create",
3789 "count": vdu_delta.get("number-of-instances", 1)
3790 }
3791 )
3792 if cloud_init_list:
3793 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3794 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3795
3796 elif scaling_type == "SCALE_IN":
3797 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3798 min_instance_count = int(scaling_descriptor["min-instance-count"])
3799
3800 vdu_scaling_info["scaling_direction"] = "IN"
3801 vdu_scaling_info["vdu-delete"] = {}
3802 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3803 for delta in deltas:
3804 for vdu_delta in delta["vdu-delta"]:
3805 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3806 min_instance_count = 0
3807 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3808 if vdu_profile and "min-number-of-instances" in vdu_profile:
3809 min_instance_count = vdu_profile["min-number-of-instances"]
3810
3811 default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3812
3813 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3814 if nb_scale_op + default_instance_num < min_instance_count:
3815 raise LcmException(
3816 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3817 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3818 )
3819 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3820 "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
3821 "vdu_index": vdu_index - 1})
3822 for x in range(vdu_delta.get("number-of-instances", 1)):
3823 VCA_scaling_info.append(
3824 {
3825 "osm_vdu_id": vdu_delta["id"],
3826 "member-vnf-index": vnf_index,
3827 "type": "delete",
3828 "vdu_index": vdu_index - 1 - x
3829 }
3830 )
3831 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3832
3833 # update vdu_scaling_info with the IP addresses of the VDUs to be deleted
3834 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3835 if vdu_scaling_info["scaling_direction"] == "IN":
3836 for vdur in reversed(db_vnfr["vdur"]):
3837 if vdu_delete.get(vdur["vdu-id-ref"]):
3838 vdu_delete[vdur["vdu-id-ref"]] -= 1
3839 vdu_scaling_info["vdu"].append({
3840 "name": vdur.get("name") or vdur.get("vdu-name"),
3841 "vdu_id": vdur["vdu-id-ref"],
3842 "interface": []
3843 })
3844 for interface in vdur["interfaces"]:
3845 vdu_scaling_info["vdu"][-1]["interface"].append({
3846 "name": interface["name"],
3847 "ip_address": interface["ip-address"],
3848 "mac_address": interface.get("mac-address"),
3849 })
3850 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3851
3852 # PRE-SCALE BEGIN
3853 step = "Executing pre-scale vnf-config-primitive"
3854 if scaling_descriptor.get("scaling-config-action"):
3855 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3856 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3857 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3858 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3859 step = db_nslcmop_update["detailed-status"] = \
3860 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3861
3862 # look for primitive
3863 for config_primitive in (get_configuration(
3864 db_vnfd, db_vnfd["id"]
3865 ) or {}).get("config-primitive", ()):
3866 if config_primitive["name"] == vnf_config_primitive:
3867 break
3868 else:
3869 raise LcmException(
3870 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3871 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3872 "primitive".format(scaling_group, vnf_config_primitive))
3873
3874 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3875 if db_vnfr.get("additionalParamsForVnf"):
3876 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3877
3878 scale_process = "VCA"
3879 db_nsr_update["config-status"] = "configuring pre-scaling"
3880 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3881
3882 # Pre-scale retry check: Check if this sub-operation has been executed before
3883 op_index = self._check_or_add_scale_suboperation(
3884 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3885 if op_index == self.SUBOPERATION_STATUS_SKIP:
3886 # Skip sub-operation
3887 result = 'COMPLETED'
3888 result_detail = 'Done'
3889 self.logger.debug(logging_text +
3890 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3891 vnf_config_primitive, result, result_detail))
3892 else:
3893 if op_index == self.SUBOPERATION_STATUS_NEW:
3894 # New sub-operation: Get index of this sub-operation
3895 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3896 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3897 format(vnf_config_primitive))
3898 else:
3899 # retry: Get registered params for this existing sub-operation
3900 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3901 vnf_index = op.get('member_vnf_index')
3902 vnf_config_primitive = op.get('primitive')
3903 primitive_params = op.get('primitive_params')
3904 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3905 format(vnf_config_primitive))
3906 # Execute the primitive, either with new (first-time) or registered (retry) args
3907 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3908 primitive_name = config_primitive.get("execution-environment-primitive",
3909 vnf_config_primitive)
3910 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3911 member_vnf_index=vnf_index,
3912 vdu_id=None,
3913 vdu_count_index=None,
3914 ee_descriptor_id=ee_descriptor_id)
3915 result, result_detail = await self._ns_execute_primitive(
3916 ee_id, primitive_name, primitive_params, vca_type=vca_type)
3917 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3918 vnf_config_primitive, result, result_detail))
3919 # Update operationState = COMPLETED | FAILED
3920 self._update_suboperation_status(
3921 db_nslcmop, op_index, result, result_detail)
3922
3923 if result == "FAILED":
3924 raise LcmException(result_detail)
3925 db_nsr_update["config-status"] = old_config_status
3926 scale_process = None
3927 # PRE-SCALE END
3928
3929 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3930 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3931
3932 # SCALE-IN VCA - BEGIN
3933 if VCA_scaling_info:
3934 step = db_nslcmop_update["detailed-status"] = \
3935 "Deleting the execution environments"
3936 scale_process = "VCA"
3937 for vdu_info in VCA_scaling_info:
3938 if vdu_info["type"] == "delete":
3939 member_vnf_index = str(vdu_info["member-vnf-index"])
3940 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
3941 vdu_id = vdu_info["osm_vdu_id"]
3942 vdu_index = int(vdu_info["vdu_index"])
3943 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
3944 member_vnf_index, vdu_id, vdu_index)
3945 stage[2] = step = "Scaling in VCA"
3946 self._write_op_status(
3947 op_id=nslcmop_id,
3948 stage=stage
3949 )
3950 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
3951 config_update = db_nsr["configurationStatus"]
3952 for vca_index, vca in enumerate(vca_update):
3953 if vca and vca.get("ee_id") and vca["member-vnf-index"] == member_vnf_index and \
3954 vca["vdu_count_index"] == vdu_index:
3955 if vca.get("vdu_id"):
3956 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3957 elif vca.get("kdu_name"):
3958 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3959 else:
3960 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3961 operation_params = db_nslcmop.get("operationParams") or {}
3962 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3963 vca.get("needed_terminate"))
3964 task = asyncio.ensure_future(asyncio.wait_for(
3965 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
3966 vca_index, destroy_ee=True,
3967 exec_primitives=exec_terminate_primitives,
3968 scaling_in=True), timeout=self.timeout_charm_delete))
3969 # wait before next removal
3970 await asyncio.sleep(30)
3971 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3972 del vca_update[vca_index]
3973 del config_update[vca_index]
3974 # wait for pending tasks of terminate primitives
3975 if tasks_dict_info:
3976 self.logger.debug(logging_text +
3977 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3978 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3979 min(self.timeout_charm_delete,
3980 self.timeout_ns_terminate),
3981 stage, nslcmop_id)
3982 tasks_dict_info.clear()
3983 if error_list:
3984 raise LcmException("; ".join(error_list))
3985
3986 db_vca_and_config_update = {
3987 "_admin.deployed.VCA": vca_update,
3988 "configurationStatus": config_update
3989 }
3990 self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
3991 scale_process = None
3992 # SCALE-IN VCA - END
3993
3994 # SCALE RO - BEGIN
3995 if RO_scaling_info:
3996 scale_process = "RO"
3997 if self.ro_config.get("ng"):
3998 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
3999 vdu_scaling_info.pop("vdu-create", None)
4000 vdu_scaling_info.pop("vdu-delete", None)
4001
4002 scale_process = None
4003 if db_nsr_update:
4004 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4005 # SCALE RO - END
4006
4007 # SCALE-UP VCA - BEGIN
4008 if VCA_scaling_info:
4009 step = db_nslcmop_update["detailed-status"] = \
4010 "Creating new execution environments"
4011 scale_process = "VCA"
4012 for vdu_info in VCA_scaling_info:
4013 if vdu_info["type"] == "create":
4014 member_vnf_index = str(vdu_info["member-vnf-index"])
4015 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4016 vnfd_id = db_vnfr["vnfd-ref"]
4017 vdu_index = int(vdu_info["vdu_index"])
4018 deploy_params = {"OSM": get_osm_params(db_vnfr)}
4019 if db_vnfr.get("additionalParamsForVnf"):
4020 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
4021 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
4022 if descriptor_config:
4023 vdu_id = None
4024 vdu_name = None
4025 kdu_name = None
4026 self._deploy_n2vc(
4027 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
4028 db_nsr=db_nsr,
4029 db_vnfr=db_vnfr,
4030 nslcmop_id=nslcmop_id,
4031 nsr_id=nsr_id,
4032 nsi_id=nsi_id,
4033 vnfd_id=vnfd_id,
4034 vdu_id=vdu_id,
4035 kdu_name=kdu_name,
4036 member_vnf_index=member_vnf_index,
4037 vdu_index=vdu_index,
4038 vdu_name=vdu_name,
4039 deploy_params=deploy_params,
4040 descriptor_config=descriptor_config,
4041 base_folder=base_folder,
4042 task_instantiation_info=tasks_dict_info,
4043 stage=stage
4044 )
4045 vdu_id = vdu_info["osm_vdu_id"]
4046 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
4047 descriptor_config = get_configuration(db_vnfd, vdu_id)
4048 if vdur.get("additionalParams"):
4049 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
4050 else:
4051 deploy_params_vdu = deploy_params
4052 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
4053 if descriptor_config:
4054 vdu_name = None
4055 kdu_name = None
4056 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4057 member_vnf_index, vdu_id, vdu_index)
4058 stage[2] = step = "Scaling out VCA"
4059 self._write_op_status(
4060 op_id=nslcmop_id,
4061 stage=stage
4062 )
4063 self._deploy_n2vc(
4064 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4065 member_vnf_index, vdu_id, vdu_index),
4066 db_nsr=db_nsr,
4067 db_vnfr=db_vnfr,
4068 nslcmop_id=nslcmop_id,
4069 nsr_id=nsr_id,
4070 nsi_id=nsi_id,
4071 vnfd_id=vnfd_id,
4072 vdu_id=vdu_id,
4073 kdu_name=kdu_name,
4074 member_vnf_index=member_vnf_index,
4075 vdu_index=vdu_index,
4076 vdu_name=vdu_name,
4077 deploy_params=deploy_params_vdu,
4078 descriptor_config=descriptor_config,
4079 base_folder=base_folder,
4080 task_instantiation_info=tasks_dict_info,
4081 stage=stage
4082 )
4083 # TODO: scaling for kdu is not implemented yet.
4084 kdu_name = vdu_info["osm_vdu_id"]
4085 descriptor_config = get_configuration(db_vnfd, kdu_name)
4086 if descriptor_config:
4087 vdu_id = None
4089 vdu_name = None
4090 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
4091 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
4092 if kdur.get("additionalParams"):
4093 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
4094
4095 self._deploy_n2vc(
4096 logging_text=logging_text,
4097 db_nsr=db_nsr,
4098 db_vnfr=db_vnfr,
4099 nslcmop_id=nslcmop_id,
4100 nsr_id=nsr_id,
4101 nsi_id=nsi_id,
4102 vnfd_id=vnfd_id,
4103 vdu_id=vdu_id,
4104 kdu_name=kdu_name,
4105 member_vnf_index=member_vnf_index,
4106 vdu_index=vdu_index,
4107 vdu_name=vdu_name,
4108 deploy_params=deploy_params_kdu,
4109 descriptor_config=descriptor_config,
4110 base_folder=base_folder,
4111 task_instantiation_info=tasks_dict_info,
4112 stage=stage
4113 )
4114 # SCALE-UP VCA - END
4115 scale_process = None
4116
4117 # POST-SCALE BEGIN
4118 # execute primitive service POST-SCALING
4119 step = "Executing post-scale vnf-config-primitive"
4120 if scaling_descriptor.get("scaling-config-action"):
4121 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4122 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4123 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4124 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4125 step = db_nslcmop_update["detailed-status"] = \
4126 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4127
4128 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4129 if db_vnfr.get("additionalParamsForVnf"):
4130 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4131
4132 # look for primitive
4133 for config_primitive in (
4134 get_configuration(db_vnfd, db_vnfd["id"]) or {}
4135 ).get("config-primitive", ()):
4136 if config_primitive["name"] == vnf_config_primitive:
4137 break
4138 else:
4139 raise LcmException(
4140 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4141 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4142 "config-primitive".format(scaling_group, vnf_config_primitive))
4143 scale_process = "VCA"
4144 db_nsr_update["config-status"] = "configuring post-scaling"
4145 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4146
4147 # Post-scale retry check: Check if this sub-operation has been executed before
4148 op_index = self._check_or_add_scale_suboperation(
4149 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4150 if op_index == self.SUBOPERATION_STATUS_SKIP:
4151 # Skip sub-operation
4152 result = 'COMPLETED'
4153 result_detail = 'Done'
4154 self.logger.debug(logging_text +
4155 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4156 format(vnf_config_primitive, result, result_detail))
4157 else:
4158 if op_index == self.SUBOPERATION_STATUS_NEW:
4159 # New sub-operation: Get index of this sub-operation
4160 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4161 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4162 format(vnf_config_primitive))
4163 else:
4164 # retry: Get registered params for this existing sub-operation
4165 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4166 vnf_index = op.get('member_vnf_index')
4167 vnf_config_primitive = op.get('primitive')
4168 primitive_params = op.get('primitive_params')
4169 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4170 format(vnf_config_primitive))
4171 # Execute the primitive, either with new (first-time) or registered (retried) args
4172 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4173 primitive_name = config_primitive.get("execution-environment-primitive",
4174 vnf_config_primitive)
4175 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4176 member_vnf_index=vnf_index,
4177 vdu_id=None,
4178 vdu_count_index=None,
4179 ee_descriptor_id=ee_descriptor_id)
4180 result, result_detail = await self._ns_execute_primitive(
4181 ee_id, primitive_name, primitive_params, vca_type=vca_type)
4182 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4183 vnf_config_primitive, result, result_detail))
4184 # Update operationState = COMPLETED | FAILED
4185 self._update_suboperation_status(
4186 db_nslcmop, op_index, result, result_detail)
4187
4188 if result == "FAILED":
4189 raise LcmException(result_detail)
4190 db_nsr_update["config-status"] = old_config_status
4191 scale_process = None
4192 # POST-SCALE END
4193
4194 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4195 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4196 else old_operational_status
4197 db_nsr_update["config-status"] = old_config_status
4198 return
4199 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4200 self.logger.error(logging_text + "Exit Exception {}".format(e))
4201 exc = e
4202 except asyncio.CancelledError:
4203 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4204 exc = "Operation was cancelled"
4205 except Exception as e:
4206 exc = traceback.format_exc()
4207 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4208 finally:
4209 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
4210 if tasks_dict_info:
4211 stage[1] = "Waiting for pending instantiation tasks."
4212 self.logger.debug(logging_text + stage[1])
4213 exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
4214 stage, nslcmop_id, nsr_id=nsr_id)
4215 if exc:
4216 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4217 nslcmop_operation_state = "FAILED"
4218 if db_nsr:
4219 db_nsr_update["operational-status"] = old_operational_status
4220 db_nsr_update["config-status"] = old_config_status
4221 db_nsr_update["detailed-status"] = ""
4222 if scale_process:
4223 if "VCA" in scale_process:
4224 db_nsr_update["config-status"] = "failed"
4225 if "RO" in scale_process:
4226 db_nsr_update["operational-status"] = "failed"
4227 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4228 exc)
4229 else:
4230 error_description_nslcmop = None
4231 nslcmop_operation_state = "COMPLETED"
4232 db_nslcmop_update["detailed-status"] = "Done"
4233
4234 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4235 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4236 if db_nsr:
4237 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4238 current_operation_id=None, other_update=db_nsr_update)
4239
4240 if nslcmop_operation_state:
4241 try:
4242 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4243 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4244 except Exception as e:
4245 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4246 self.logger.debug(logging_text + "Exit")
4247 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4248
4249 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
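"""
Apply the scaling through the NG-RO: update the vnf record with the VDUs to
create/delete (marking deletions), request the change to the NG-RO and, once the
RO operation has finished, apply the marked deletions in the vnf record.

:param logging_text: prefix for log messages
:param db_nsr: ns record
:param db_nslcmop: ns lcm operation record being processed
:param db_vnfr: vnf record of the vnf being scaled
:param vdu_scaling_info: dict with optional "vdu-create" and "vdu-delete" entries
:param stage: list used to report progress
"""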
4250 nsr_id = db_nslcmop["nsInstanceId"]
4251 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4252 db_vnfrs = {}
4253
4254 # read from db: vnfds for every vnf
4255 db_vnfds = []
4256
4257 # for each vnf in ns, read vnfd
4258 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4259 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4260 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4261 # if we do not have this vnfd yet, read it from db
4262 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
4263 # read from db
4264 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4265 db_vnfds.append(vnfd)
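# the VCA (N2VC) public key is passed down to _instantiate_ng_ro so that it can be
# injected in the VDUs created by the scale-out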
4266 n2vc_key = self.n2vc.get_public_key()
4267 n2vc_key_list = [n2vc_key]
4268 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4269 mark_delete=True)
4270 # db_vnfr has been updated, update db_vnfrs to use it
4271 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4272 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4273 db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
4274 timeout_ns_deploy=self.timeout_ns_deploy)
4275 if vdu_scaling_info.get("vdu-delete"):
4276 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4277
4278 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
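"""
Render the 'prometheus*.j2' job template found in the execution environment artifact
and register the resulting scrape jobs in Prometheus.

:param ee_id: execution environment id, formatted as "namespace.service"
:param artifact_path: path of the artifact that may ship the template
:param ee_config_descriptor: execution environment descriptor, provides "metric-service"
:param vnfr_id: vnf record id, used as job name
:param nsr_id: ns record id, stored as job metadata
:param target_ip: IP address of the monitored target
:return: list of registered job names, or None when there is nothing to register
"""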
4279 if not self.prometheus:
4280 return
4281 # look for a file called 'prometheus*.j2'; it contains the Prometheus job template to render
4282 artifact_content = self.fs.dir_ls(artifact_path)
4283 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4284 if not job_file:
4285 return
4286 with self.fs.file_open((artifact_path, job_file), "r") as f:
4287 job_data = f.read()
4288
4289 # TODO get_service
4290 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4291 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4292 host_port = "80"
4293 vnfr_id = vnfr_id.replace("-", "")
4294 variables = {
4295 "JOB_NAME": vnfr_id,
4296 "TARGET_IP": target_ip,
4297 "EXPORTER_POD_IP": host_name,
4298 "EXPORTER_POD_PORT": host_port,
4299 }
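# purely illustrative example (not taken from any descriptor) of a prometheus*.j2
# template that would consume these variables:
#   - job_name: "{{ JOB_NAME }}"
#     static_configs:
#       - targets: ["{{ EXPORTER_POD_IP }}:{{ EXPORTER_POD_PORT }}"]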
4300 job_list = self.prometheus.parse_job(job_data, variables)
4301 # ensure job_name contains the vnfr_id and add nsr_id as metadata
4302 for job in job_list:
4303 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4304 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4305 job["nsr_id"] = nsr_id
4306 job_dict = {jl["job_name"]: jl for jl in job_list}
4307 if await self.prometheus.update(job_dict):
4308 return list(job_dict.keys())
4309
4310 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4311 """
4312 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4313
4314 :param vim_account_id: VIM Account ID
4315
4316 :return: (cloud_name, cloud_credential)
4317 """
4318 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4319 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4320
4321 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4322 """
4323 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4324
4325 :param vim_account_id: VIM Account ID
4326
4327 :return: (cloud_name, cloud_credential)
4328 """
4329 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4330 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
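# Illustrative (assumed) shape of the VIM account "config" section read by the two
# helpers above; the keys are the ones queried, the values are just examples:
#   config:
#     vca_cloud: lxd-cloud
#     vca_cloud_credential: lxd-cloud-credential
#     vca_k8s_cloud: k8s-cloud
#     vca_k8s_cloud_credential: k8s-cloud-credential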