1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
31 from osm_lcm
.data_utils
.vnfd
import get_vnf_configuration
, get_vdu_list
, get_vdu_profile
, \
32 get_ee_sorted_initial_config_primitive_list
, get_ee_sorted_terminate_config_primitive_list
, \
33 get_kdu_list
, get_virtual_link_profiles
, get_vdu
, get_vdu_configuration
, get_kdu_configuration
, \
34 get_vdu_index
, get_scaling_aspect
, get_number_of_instances
35 from osm_lcm
.data_utils
.list_utils
import find_in_list
36 from osm_lcm
.data_utils
.vnfr
import get_osm_params
37 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
38 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
39 from n2vc
.k8s_helm_conn
import K8sHelmConnector
40 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
41 from n2vc
.k8s_juju_conn
import K8sJujuConnector
43 from osm_common
.dbbase
import DbException
44 from osm_common
.fsbase
import FsException
46 from osm_lcm
.data_utils
.database
.database
import Database
47 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
49 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
50 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
52 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
54 from copy
import copy
, deepcopy
56 from uuid
import uuid4
58 from random
import randint
60 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
64 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
65 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
66 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
67 timeout_charm_delete
= 10 * 60
68 timeout_primitive
= 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
71 SUBOPERATION_STATUS_NOT_FOUND
= -1
72 SUBOPERATION_STATUS_NEW
= -2
73 SUBOPERATION_STATUS_SKIP
= -3
74 task_name_deploy_vca
= "Deploying VCA"
76 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 logger
=logging
.getLogger('lcm.ns')
87 self
.db
= Database().instance
.db
88 self
.fs
= Filesystem().instance
.fs
90 self
.lcm_tasks
= lcm_tasks
91 self
.timeout
= config
["timeout"]
92 self
.ro_config
= config
["ro_config"]
93 self
.ng_ro
= config
["ro_config"].get("ng")
94 self
.vca_config
= config
["VCA"].copy()
96 # create N2VC connector
97 self
.n2vc
= N2VCJujuConnector(
100 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
101 username
=self
.vca_config
.get('user', None),
102 vca_config
=self
.vca_config
,
103 on_update_db
=self
._on
_update
_n
2vc
_db
,
108 self
.conn_helm_ee
= LCMHelmConn(
113 vca_config
=self
.vca_config
,
114 on_update_db
=self
._on
_update
_n
2vc
_db
117 self
.k8sclusterhelm2
= K8sHelmConnector(
118 kubectl_command
=self
.vca_config
.get("kubectlpath"),
119 helm_command
=self
.vca_config
.get("helmpath"),
126 self
.k8sclusterhelm3
= K8sHelm3Connector(
127 kubectl_command
=self
.vca_config
.get("kubectlpath"),
128 helm_command
=self
.vca_config
.get("helm3path"),
135 self
.k8sclusterjuju
= K8sJujuConnector(
136 kubectl_command
=self
.vca_config
.get("kubectlpath"),
137 juju_command
=self
.vca_config
.get("jujupath"),
141 vca_config
=self
.vca_config
,
146 self
.k8scluster_map
= {
147 "helm-chart": self
.k8sclusterhelm2
,
148 "helm-chart-v3": self
.k8sclusterhelm3
,
149 "chart": self
.k8sclusterhelm3
,
150 "juju-bundle": self
.k8sclusterjuju
,
151 "juju": self
.k8sclusterjuju
,
155 "lxc_proxy_charm": self
.n2vc
,
156 "native_charm": self
.n2vc
,
157 "k8s_proxy_charm": self
.n2vc
,
158 "helm": self
.conn_helm_ee
,
159 "helm-v3": self
.conn_helm_ee
162 self
.prometheus
= prometheus
165 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
168 def increment_ip_mac(ip_mac
, vm_index
=1):
169 if not isinstance(ip_mac
, str):
172 # try with ipv4 look for last dot
173 i
= ip_mac
.rfind(".")
176 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
177 # try with ipv6 or mac look for last colon. Operate in hex
178 i
= ip_mac
.rfind(":")
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
)
187 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
192 # TODO filter RO descriptor fields...
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict
['deploymentStatus'] = ro_descriptor
198 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
200 except Exception as e
:
201 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
203 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
205 # remove last dot from path (if exists)
206 if path
.endswith('.'):
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
214 nsr_id
= filter.get('_id')
216 # read ns record from database
217 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
218 current_ns_status
= nsr
.get('nsState')
220 # get vca status for NS
221 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
225 db_dict
['vcaStatus'] = status_dict
227 # update configurationStatus for this VCA
229 vca_index
= int(path
[path
.rfind(".")+1:])
231 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
232 vca_status
= vca_list
[vca_index
].get('status')
234 configuration_status_list
= nsr
.get('configurationStatus')
235 config_status
= configuration_status_list
[vca_index
].get('status')
237 if config_status
== 'BROKEN' and vca_status
!= 'failed':
238 db_dict
['configurationStatus'][vca_index
] = 'READY'
239 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
240 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
241 except Exception as e
:
242 # not update configurationStatus
243 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
248 if current_ns_status
in ('READY', 'DEGRADED'):
249 error_description
= ''
251 if status_dict
.get('machines'):
252 for machine_id
in status_dict
.get('machines'):
253 machine
= status_dict
.get('machines').get(machine_id
)
254 # check machine agent-status
255 if machine
.get('agent-status'):
256 s
= machine
.get('agent-status').get('status')
259 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
260 # check machine instance status
261 if machine
.get('instance-status'):
262 s
= machine
.get('instance-status').get('status')
265 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
267 if status_dict
.get('applications'):
268 for app_id
in status_dict
.get('applications'):
269 app
= status_dict
.get('applications').get(app_id
)
270 # check application status
271 if app
.get('status'):
272 s
= app
.get('status').get('status')
275 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
277 if error_description
:
278 db_dict
['errorDescription'] = error_description
279 if current_ns_status
== 'READY' and is_degraded
:
280 db_dict
['nsState'] = 'DEGRADED'
281 if current_ns_status
== 'DEGRADED' and not is_degraded
:
282 db_dict
['nsState'] = 'READY'
285 self
.update_db_2("nsrs", nsr_id
, db_dict
)
287 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
289 except Exception as e
:
290 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
293 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
295 env
= Environment(undefined
=StrictUndefined
)
296 template
= env
.from_string(cloud_init_text
)
297 return template
.render(additional_params
or {})
298 except UndefinedError
as e
:
299 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
300 "file, must be provided in the instantiation parameters inside the "
301 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
))
302 except (TemplateError
, TemplateNotFound
) as e
:
303 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
304 format(vnfd_id
, vdu_id
, e
))
306 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
307 cloud_init_content
= cloud_init_file
= None
309 if vdu
.get("cloud-init-file"):
310 base_folder
= vnfd
["_admin"]["storage"]
311 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
312 vdu
["cloud-init-file"])
313 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
314 cloud_init_content
= ci_file
.read()
315 elif vdu
.get("cloud-init"):
316 cloud_init_content
= vdu
["cloud-init"]
318 return cloud_init_content
319 except FsException
as e
:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
323 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
324 vdur
= next(vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"])
325 additional_params
= vdur
.get("additionalParams")
326 return parse_yaml_strings(additional_params
)
328 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
330 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
331 :param vnfd: input vnfd
332 :param new_id: overrides vnf id if provided
333 :param additionalParams: Instantiation params for VNFs provided
334 :param nsrId: Id of the NSR
335 :return: copy of vnfd
337 vnfd_RO
= deepcopy(vnfd
)
338 # remove unused by RO configuration, monitoring, scaling and internal keys
339 vnfd_RO
.pop("_id", None)
340 vnfd_RO
.pop("_admin", None)
341 vnfd_RO
.pop("vnf-configuration", None)
342 vnfd_RO
.pop("monitoring-param", None)
343 vnfd_RO
.pop("scaling-group-descriptor", None)
344 vnfd_RO
.pop("kdu", None)
345 vnfd_RO
.pop("k8s-cluster", None)
347 vnfd_RO
["id"] = new_id
349 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
350 for vdu
in get_iterable(vnfd_RO
, "vdu"):
351 vdu
.pop("cloud-init-file", None)
352 vdu
.pop("cloud-init", None)
356 def ip_profile_2_RO(ip_profile
):
357 RO_ip_profile
= deepcopy(ip_profile
)
358 if "dns-server" in RO_ip_profile
:
359 if isinstance(RO_ip_profile
["dns-server"], list):
360 RO_ip_profile
["dns-address"] = []
361 for ds
in RO_ip_profile
.pop("dns-server"):
362 RO_ip_profile
["dns-address"].append(ds
['address'])
364 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
365 if RO_ip_profile
.get("ip-version") == "ipv4":
366 RO_ip_profile
["ip-version"] = "IPv4"
367 if RO_ip_profile
.get("ip-version") == "ipv6":
368 RO_ip_profile
["ip-version"] = "IPv6"
369 if "dhcp-params" in RO_ip_profile
:
370 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
373 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
374 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
375 if db_vim
["_admin"]["operationalState"] != "ENABLED":
376 raise LcmException("VIM={} is not available. operationalState={}".format(
377 vim_account
, db_vim
["_admin"]["operationalState"]))
378 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
381 def get_ro_wim_id_for_wim_account(self
, wim_account
):
382 if isinstance(wim_account
, str):
383 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
384 if db_wim
["_admin"]["operationalState"] != "ENABLED":
385 raise LcmException("WIM={} is not available. operationalState={}".format(
386 wim_account
, db_wim
["_admin"]["operationalState"]))
387 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
392 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
394 db_vdu_push_list
= []
395 db_update
= {"_admin.modified": time()}
397 for vdu_id
, vdu_count
in vdu_create
.items():
398 vdur
= next((vdur
for vdur
in reversed(db_vnfr
["vdur"]) if vdur
["vdu-id-ref"] == vdu_id
), None)
400 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
403 for count
in range(vdu_count
):
404 vdur_copy
= deepcopy(vdur
)
405 vdur_copy
["status"] = "BUILD"
406 vdur_copy
["status-detailed"] = None
407 vdur_copy
["ip-address"]: None
408 vdur_copy
["_id"] = str(uuid4())
409 vdur_copy
["count-index"] += count
+ 1
410 vdur_copy
["id"] = "{}-{}".format(vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"])
411 vdur_copy
.pop("vim_info", None)
412 for iface
in vdur_copy
["interfaces"]:
413 if iface
.get("fixed-ip"):
414 iface
["ip-address"] = self
.increment_ip_mac(iface
["ip-address"], count
+1)
416 iface
.pop("ip-address", None)
417 if iface
.get("fixed-mac"):
418 iface
["mac-address"] = self
.increment_ip_mac(iface
["mac-address"], count
+1)
420 iface
.pop("mac-address", None)
421 iface
.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
422 db_vdu_push_list
.append(vdur_copy
)
423 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
425 for vdu_id
, vdu_count
in vdu_delete
.items():
427 indexes_to_delete
= [iv
[0] for iv
in enumerate(db_vnfr
["vdur"]) if iv
[1]["vdu-id-ref"] == vdu_id
]
428 db_update
.update({"vdur.{}.status".format(i
): "DELETING" for i
in indexes_to_delete
[-vdu_count
:]})
430 # it must be deleted one by one because common.db does not allow otherwise
431 vdus_to_delete
= [v
for v
in reversed(db_vnfr
["vdur"]) if v
["vdu-id-ref"] == vdu_id
]
432 for vdu
in vdus_to_delete
[:vdu_count
]:
433 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, None, pull
={"vdur": {"_id": vdu
["_id"]}})
434 db_push
= {"vdur": db_vdu_push_list
} if db_vdu_push_list
else None
435 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
436 # modify passed dictionary db_vnfr
437 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
438 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
440 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
442 Updates database nsr with the RO info for the created vld
443 :param ns_update_nsr: dictionary to be filled with the updated info
444 :param db_nsr: content of db_nsr. This is also modified
445 :param nsr_desc_RO: nsr descriptor from RO
446 :return: Nothing, LcmException is raised on errors
449 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
450 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
451 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
453 vld
["vim-id"] = net_RO
.get("vim_net_id")
454 vld
["name"] = net_RO
.get("vim_name")
455 vld
["status"] = net_RO
.get("status")
456 vld
["status-detailed"] = net_RO
.get("error_msg")
457 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
460 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
462 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
464 for db_vnfr
in db_vnfrs
.values():
465 vnfr_update
= {"status": "ERROR"}
466 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
467 if "status" not in vdur
:
468 vdur
["status"] = "ERROR"
469 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
471 vdur
["status-detailed"] = str(error_text
)
472 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
473 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
474 except DbException
as e
:
475 self
.logger
.error("Cannot update vnf. {}".format(e
))
477 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
479 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
480 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
481 :param nsr_desc_RO: nsr descriptor from RO
482 :return: Nothing, LcmException is raised on errors
484 for vnf_index
, db_vnfr
in db_vnfrs
.items():
485 for vnf_RO
in nsr_desc_RO
["vnfs"]:
486 if vnf_RO
["member_vnf_index"] != vnf_index
:
489 if vnf_RO
.get("ip_address"):
490 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
491 elif not db_vnfr
.get("ip-address"):
492 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
493 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
495 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
496 vdur_RO_count_index
= 0
497 if vdur
.get("pdu-type"):
499 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
500 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
502 if vdur
["count-index"] != vdur_RO_count_index
:
503 vdur_RO_count_index
+= 1
505 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
506 if vdur_RO
.get("ip_address"):
507 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
509 vdur
["ip-address"] = None
510 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
511 vdur
["name"] = vdur_RO
.get("vim_name")
512 vdur
["status"] = vdur_RO
.get("status")
513 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
514 for ifacer
in get_iterable(vdur
, "interfaces"):
515 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
516 if ifacer
["name"] == interface_RO
.get("internal_name"):
517 ifacer
["ip-address"] = interface_RO
.get("ip_address")
518 ifacer
["mac-address"] = interface_RO
.get("mac_address")
521 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
523 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
524 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
527 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
528 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
530 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
531 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
532 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
534 vld
["vim-id"] = net_RO
.get("vim_net_id")
535 vld
["name"] = net_RO
.get("vim_name")
536 vld
["status"] = net_RO
.get("status")
537 vld
["status-detailed"] = net_RO
.get("error_msg")
538 vnfr_update
["vld.{}".format(vld_index
)] = vld
541 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
542 vnf_index
, vld
["id"]))
544 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
548 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
550 def _get_ns_config_info(self
, nsr_id
):
552 Generates a mapping between vnf,vdu elements and the N2VC id
553 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
554 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
555 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
556 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
558 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
559 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
561 ns_config_info
= {"osm-config-mapping": mapping
}
562 for vca
in vca_deployed_list
:
563 if not vca
["member-vnf-index"]:
565 if not vca
["vdu_id"]:
566 mapping
[vca
["member-vnf-index"]] = vca
["application"]
568 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
570 return ns_config_info
572 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
573 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
577 def get_vim_account(vim_account_id
):
579 if vim_account_id
in db_vims
:
580 return db_vims
[vim_account_id
]
581 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
582 db_vims
[vim_account_id
] = db_vim
585 # modify target_vld info with instantiation parameters
586 def parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, target_sdn
):
587 if vld_params
.get("ip-profile"):
588 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
["ip-profile"]
589 if vld_params
.get("provider-network"):
590 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
["provider-network"]
591 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
592 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
["provider-network"]["sdn-ports"]
593 if vld_params
.get("wimAccountId"):
594 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
595 target_vld
["vim_info"][target_wim
] = {}
596 for param
in ("vim-network-name", "vim-network-id"):
597 if vld_params
.get(param
):
598 if isinstance(vld_params
[param
], dict):
599 for vim
, vim_net
in vld_params
[param
]:
600 other_target_vim
= "vim:" + vim
601 populate_dict(target_vld
["vim_info"], (other_target_vim
, param
.replace("-", "_")), vim_net
)
602 else: # isinstance str
603 target_vld
["vim_info"][target_vim
][param
.replace("-", "_")] = vld_params
[param
]
604 if vld_params
.get("common_id"):
605 target_vld
["common_id"] = vld_params
.get("common_id")
607 nslcmop_id
= db_nslcmop
["_id"]
609 "name": db_nsr
["name"],
612 "image": deepcopy(db_nsr
["image"]),
613 "flavor": deepcopy(db_nsr
["flavor"]),
614 "action_id": nslcmop_id
,
615 "cloud_init_content": {},
617 for image
in target
["image"]:
618 image
["vim_info"] = {}
619 for flavor
in target
["flavor"]:
620 flavor
["vim_info"] = {}
622 if db_nslcmop
.get("lcmOperationType") != "instantiate":
623 # get parameters of instantiation:
624 db_nslcmop_instantiate
= self
.db
.get_list("nslcmops", {"nsInstanceId": db_nslcmop
["nsInstanceId"],
625 "lcmOperationType": "instantiate"})[-1]
626 ns_params
= db_nslcmop_instantiate
.get("operationParams")
628 ns_params
= db_nslcmop
.get("operationParams")
629 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
630 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
633 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
634 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
638 "mgmt-network": vld
.get("mgmt-network", False),
639 "type": vld
.get("type"),
642 "vim_network_name": vld
.get("vim-network-name"),
643 "vim_account_id": ns_params
["vimAccountId"]
647 # check if this network needs SDN assist
648 if vld
.get("pci-interfaces"):
649 db_vim
= VimAccountDB
.get_vim_account_with_id(target_vld
["vim_info"][0]["vim_account_id"])
650 sdnc_id
= db_vim
["config"].get("sdn-controller")
652 target_vld
["vim_info"].append({"sdnc_id": sdnc_id
})
654 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
655 for nsd_vnf_profile
in nsd_vnf_profiles
:
656 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
657 if cp
["virtual-link-profile-id"] == vld
["id"]:
658 cp2target
["member_vnf:{}.{}".format(
659 cp
["constituent-cpd-id"][0]["constituent-base-element-id"],
660 cp
["constituent-cpd-id"][0]["constituent-cpd-id"]
661 )] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
663 # check at nsd descriptor, if there is an ip-profile
665 virtual_link_profiles
= get_virtual_link_profiles(nsd
)
667 for vlp
in virtual_link_profiles
:
668 ip_profile
= find_in_list(nsd
["ip-profiles"],
669 lambda profile
: profile
["name"] == vlp
["ip-profile-ref"])
670 vld_params
["ip-profile"] = ip_profile
["ip-profile-params"]
671 # update vld_params with instantiation params
672 vld_instantiation_params
= find_in_list(get_iterable(ns_params
, "vld"),
673 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]))
674 if vld_instantiation_params
:
675 vld_params
.update(vld_instantiation_params
)
676 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
677 target
["ns"]["vld"].append(target_vld
)
679 for vnfr
in db_vnfrs
.values():
680 vnfd
= find_in_list(db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"])
681 vnf_params
= find_in_list(get_iterable(ns_params
, "vnf"),
682 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"])
683 target_vnf
= deepcopy(vnfr
)
684 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
685 for vld
in target_vnf
.get("vld", ()):
686 # check if connected to a ns.vld, to fill target'
687 vnf_cp
= find_in_list(vnfd
.get("int-virtual-link-desc", ()),
688 lambda cpd
: cpd
.get("id") == vld
["id"])
690 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
691 if cp2target
.get(ns_cp
):
692 vld
["target"] = cp2target
[ns_cp
]
694 vld
["vim_info"] = {target_vim
: {"vim_network_name": vld
.get("vim-network-name")}}
695 # check if this network needs SDN assist
697 if vld
.get("pci-interfaces"):
698 db_vim
= get_vim_account(vnfr
["vim-account-id"])
699 sdnc_id
= db_vim
["config"].get("sdn-controller")
701 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
702 target_sdn
= "sdn:{}".format(sdnc_id
)
703 vld
["vim_info"][target_sdn
] = {
704 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
706 # check at vnfd descriptor, if there is an ip-profile
708 vnfd_vlp
= find_in_list(
709 get_virtual_link_profiles(vnfd
),
710 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"]
712 if vnfd_vlp
and vnfd_vlp
.get("virtual-link-protocol-data") and \
713 vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
714 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
715 ip_profile_dest_data
= {}
716 if "ip-version" in ip_profile_source_data
:
717 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
718 if "cidr" in ip_profile_source_data
:
719 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
720 if "gateway-ip" in ip_profile_source_data
:
721 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
722 if "dhcp-enabled" in ip_profile_source_data
:
723 ip_profile_dest_data
["dhcp-params"] = {
724 "enabled": ip_profile_source_data
["dhcp-enabled"]
727 vld_params
["ip-profile"] = ip_profile_dest_data
728 # update vld_params with instantiation params
730 vld_instantiation_params
= find_in_list(get_iterable(vnf_params
, "internal-vld"),
731 lambda i_vld
: i_vld
["name"] == vld
["id"])
732 if vld_instantiation_params
:
733 vld_params
.update(vld_instantiation_params
)
734 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
737 for vdur
in target_vnf
.get("vdur", ()):
738 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
739 continue # This vdu must not be created
740 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
742 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
745 vdu_configuration
= get_vdu_configuration(vnfd
, vdur
["vdu-id-ref"])
746 vnf_configuration
= get_vnf_configuration(vnfd
)
747 if vdu_configuration
and vdu_configuration
.get("config-access") and \
748 vdu_configuration
.get("config-access").get("ssh-access"):
749 vdur
["ssh-keys"] = ssh_keys_all
750 vdur
["ssh-access-required"] = vdu_configuration
["config-access"]["ssh-access"]["required"]
751 elif vnf_configuration
and vnf_configuration
.get("config-access") and \
752 vnf_configuration
.get("config-access").get("ssh-access") and \
753 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
754 vdur
["ssh-keys"] = ssh_keys_all
755 vdur
["ssh-access-required"] = vnf_configuration
["config-access"]["ssh-access"]["required"]
756 elif ssh_keys_instantiation
and \
757 find_in_list(vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")):
758 vdur
["ssh-keys"] = ssh_keys_instantiation
760 self
.logger
.debug("NS > vdur > {}".format(vdur
))
762 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
764 if vdud
.get("cloud-init-file"):
765 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
766 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
767 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
768 base_folder
= vnfd
["_admin"]["storage"]
769 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
770 vdud
.get("cloud-init-file"))
771 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
772 target
["cloud_init_content"][vdur
["cloud-init"]] = ci_file
.read()
773 elif vdud
.get("cloud-init"):
774 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"]))
775 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
776 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
["cloud-init"]
777 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
778 deploy_params_vdu
= self
._format
_additional
_params
(vdur
.get("additionalParams") or {})
779 deploy_params_vdu
["OSM"] = get_osm_params(vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"])
780 vdur
["additionalParams"] = deploy_params_vdu
783 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
784 if target_vim
not in ns_flavor
["vim_info"]:
785 ns_flavor
["vim_info"][target_vim
] = {}
788 # in case alternative images are provided we must check if they should be applied
789 # for the vim_type, modify the vim_type taking into account
790 ns_image_id
= int(vdur
["ns-image-id"])
791 if vdur
.get("alt-image-ids"):
792 db_vim
= get_vim_account(vnfr
["vim-account-id"])
793 vim_type
= db_vim
["vim_type"]
794 for alt_image_id
in vdur
.get("alt-image-ids"):
795 ns_alt_image
= target
["image"][int(alt_image_id
)]
796 if vim_type
== ns_alt_image
.get("vim-type"):
797 # must use alternative image
798 self
.logger
.debug("use alternative image id: {}".format(alt_image_id
))
799 ns_image_id
= alt_image_id
800 vdur
["ns-image-id"] = ns_image_id
802 ns_image
= target
["image"][int(ns_image_id
)]
803 if target_vim
not in ns_image
["vim_info"]:
804 ns_image
["vim_info"][target_vim
] = {}
806 vdur
["vim_info"] = {target_vim
: {}}
807 # instantiation parameters
809 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
811 vdur_list
.append(vdur
)
812 target_vnf
["vdur"] = vdur_list
813 target
["vnf"].append(target_vnf
)
815 desc
= await self
.RO
.deploy(nsr_id
, target
)
816 self
.logger
.debug("RO return > {}".format(desc
))
817 action_id
= desc
["action_id"]
818 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
822 "_admin.deployed.RO.operational-status": "running",
823 "detailed-status": " ".join(stage
)
825 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
826 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
827 self
._write
_op
_status
(nslcmop_id
, stage
)
828 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
831 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
=None, start_time
=None, timeout
=600, stage
=None):
832 detailed_status_old
= None
834 start_time
= start_time
or time()
835 while time() <= start_time
+ timeout
:
836 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
837 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
838 if desc_status
["status"] == "FAILED":
839 raise NgRoException(desc_status
["details"])
840 elif desc_status
["status"] == "BUILD":
842 stage
[2] = "VIM: ({})".format(desc_status
["details"])
843 elif desc_status
["status"] == "DONE":
845 stage
[2] = "Deployed at VIM"
848 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
849 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
850 detailed_status_old
= stage
[2]
851 db_nsr_update
["detailed-status"] = " ".join(stage
)
852 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
853 self
._write
_op
_status
(nslcmop_id
, stage
)
854 await asyncio
.sleep(15, loop
=self
.loop
)
855 else: # timeout_ns_deploy
856 raise NgRoException("Timeout waiting ns to deploy")
858 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
862 start_deploy
= time()
869 "action_id": nslcmop_id
871 desc
= await self
.RO
.deploy(nsr_id
, target
)
872 action_id
= desc
["action_id"]
873 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
874 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
875 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
878 delete_timeout
= 20 * 60 # 20 minutes
879 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
881 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
882 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
884 await self
.RO
.delete(nsr_id
)
885 except Exception as e
:
886 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
887 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
888 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
889 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
890 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
891 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
892 failed_detail
.append("delete conflict: {}".format(e
))
893 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
895 failed_detail
.append("delete error: {}".format(e
))
896 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
899 stage
[2] = "Error deleting from VIM"
901 stage
[2] = "Deleted from VIM"
902 db_nsr_update
["detailed-status"] = " ".join(stage
)
903 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
904 self
._write
_op
_status
(nslcmop_id
, stage
)
907 raise LcmException("; ".join(failed_detail
))
910 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
911 n2vc_key_list
, stage
):
914 :param logging_text: preffix text to use at logging
915 :param nsr_id: nsr identity
916 :param nsd: database content of ns descriptor
917 :param db_nsr: database content of ns record
918 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
920 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
921 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
922 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
923 :return: None or exception
926 start_deploy
= time()
927 ns_params
= db_nslcmop
.get("operationParams")
928 if ns_params
and ns_params
.get("timeout_ns_deploy"):
929 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
931 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
933 # Check for and optionally request placement optimization. Database will be updated if placement activated
934 stage
[2] = "Waiting for Placement."
935 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
936 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
937 for vnfr
in db_vnfrs
.values():
938 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
941 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
943 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
944 db_vnfds
, n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
)
945 except Exception as e
:
946 stage
[2] = "ERROR deploying at VIM"
947 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
948 self
.logger
.error("Error deploying at VIM {}".format(e
),
949 exc_info
=not isinstance(e
, (ROclient
.ROClientException
, LcmException
, DbException
,
953 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
955 Wait for kdu to be up, get ip address
956 :param logging_text: prefix use for logging
963 # self.logger.debug(logging_text + "Starting wait_kdu_up")
966 while nb_tries
< 360:
967 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
968 kdur
= next((x
for x
in get_iterable(db_vnfr
, "kdur") if x
.get("kdu-name") == kdu_name
), None)
970 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
))
971 if kdur
.get("status"):
972 if kdur
["status"] in ("READY", "ENABLED"):
973 return kdur
.get("ip-address")
975 raise LcmException("target KDU={} is in error state".format(kdu_name
))
977 await asyncio
.sleep(10, loop
=self
.loop
)
979 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
981 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
983 Wait for ip addres at RO, and optionally, insert public key in virtual machine
984 :param logging_text: prefix use for logging
989 :param pub_key: public ssh key to inject, None to skip
990 :param user: user to apply the public ssh key
994 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1004 if ro_retries
>= 360: # 1 hour
1005 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1007 await asyncio
.sleep(10, loop
=self
.loop
)
1010 if not target_vdu_id
:
1011 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1013 if not vdu_id
: # for the VNF case
1014 if db_vnfr
.get("status") == "ERROR":
1015 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1016 ip_address
= db_vnfr
.get("ip-address")
1019 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1021 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1022 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1024 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1025 vdur
= db_vnfr
["vdur"][0]
1027 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1029 # New generation RO stores information at "vim_info"
1032 if vdur
.get("vim_info"):
1033 target_vim
= next(t
for t
in vdur
["vim_info"]) # there should be only one key
1034 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1035 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE" or ng_ro_status
== "ACTIVE":
1036 ip_address
= vdur
.get("ip-address")
1039 target_vdu_id
= vdur
["vdu-id-ref"]
1040 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1041 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1043 if not target_vdu_id
:
1046 # inject public key into machine
1047 if pub_key
and user
:
1048 self
.logger
.debug(logging_text
+ "Inserting RO key")
1049 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1050 if vdur
.get("pdu-type"):
1051 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1054 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1056 target
= {"action": {"action": "inject_ssh_key", "key": pub_key
, "user": user
},
1057 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1059 desc
= await self
.RO
.deploy(nsr_id
, target
)
1060 action_id
= desc
["action_id"]
1061 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1064 # wait until NS is deployed at RO
1066 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1067 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1070 result_dict
= await self
.RO
.create_action(
1072 item_id_name
=ro_nsr_id
,
1073 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1075 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1076 if not result_dict
or not isinstance(result_dict
, dict):
1077 raise LcmException("Unknown response from RO when injecting key")
1078 for result
in result_dict
.values():
1079 if result
.get("vim_result") == 200:
1082 raise ROclient
.ROClientException("error injecting key: {}".format(
1083 result
.get("description")))
1085 except NgRoException
as e
:
1086 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1087 except ROclient
.ROClientException
as e
:
1089 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1093 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1099 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1101 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1103 my_vca
= vca_deployed_list
[vca_index
]
1104 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1105 # vdu or kdu: no dependencies
1109 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1110 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1111 configuration_status_list
= db_nsr
["configurationStatus"]
1112 for index
, vca_deployed
in enumerate(configuration_status_list
):
1113 if index
== vca_index
:
1116 if not my_vca
.get("member-vnf-index") or \
1117 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1118 internal_status
= configuration_status_list
[index
].get("status")
1119 if internal_status
== 'READY':
1121 elif internal_status
== 'BROKEN':
1122 raise LcmException("Configuration aborted because dependent charm/s has failed")
1126 # no dependencies, return
1128 await asyncio
.sleep(10)
1131 raise LcmException("Configuration aborted because dependent charm/s timeout")
1133 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1134 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1135 ee_config_descriptor
):
1136 nsr_id
= db_nsr
["_id"]
1137 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1138 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1139 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1140 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1142 'collection': 'nsrs',
1143 'filter': {'_id': nsr_id
},
1144 'path': db_update_entry
1150 element_under_configuration
= nsr_id
1154 vnfr_id
= db_vnfr
["_id"]
1155 osm_config
["osm"]["vnf_id"] = vnfr_id
1157 namespace
= "{nsi}.{ns}".format(
1158 nsi
=nsi_id
if nsi_id
else "",
1162 element_type
= 'VNF'
1163 element_under_configuration
= vnfr_id
1164 namespace
+= ".{}".format(vnfr_id
)
1166 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1167 element_type
= 'VDU'
1168 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1169 osm_config
["osm"]["vdu_id"] = vdu_id
1171 namespace
+= ".{}".format(kdu_name
)
1172 element_type
= 'KDU'
1173 element_under_configuration
= kdu_name
1174 osm_config
["osm"]["kdu_name"] = kdu_name
1177 artifact_path
= "{}/{}/{}/{}".format(
1178 base_folder
["folder"],
1179 base_folder
["pkg-dir"],
1180 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1184 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1186 # get initial_config_primitive_list that applies to this element
1187 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1189 self
.logger
.debug("Initial config primitive list > {}".format(initial_config_primitive_list
))
1191 # add config if not present for NS charm
1192 ee_descriptor_id
= ee_config_descriptor
.get("id")
1193 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1194 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list
,
1195 vca_deployed
, ee_descriptor_id
)
1197 self
.logger
.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list
))
1198 # n2vc_redesign STEP 3.1
1199 # find old ee_id if exists
1200 ee_id
= vca_deployed
.get("ee_id")
1203 deep_get(db_vnfr
, ("vim-account-id",)) or
1204 deep_get(deploy_params
, ("OSM", "vim_account_id"))
1206 vca_cloud
, vca_cloud_credential
= self
.get_vca_cloud_and_credentials(vim_account_id
)
1207 vca_k8s_cloud
, vca_k8s_cloud_credential
= self
.get_vca_k8s_cloud_and_credentials(vim_account_id
)
1208 # create or register execution environment in VCA
1209 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1211 self
._write
_configuration
_status
(
1213 vca_index
=vca_index
,
1215 element_under_configuration
=element_under_configuration
,
1216 element_type
=element_type
1219 step
= "create execution environment"
1220 self
.logger
.debug(logging_text
+ step
)
1224 if vca_type
== "k8s_proxy_charm":
1225 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1226 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1227 namespace
=namespace
,
1228 artifact_path
=artifact_path
,
1230 cloud_name
=vca_k8s_cloud
,
1231 credential_name
=vca_k8s_cloud_credential
,
1233 elif vca_type
== "helm" or vca_type
== "helm-v3":
1234 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1235 namespace
=namespace
,
1239 artifact_path
=artifact_path
,
1243 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1244 namespace
=namespace
,
1247 cloud_name
=vca_cloud
,
1248 credential_name
=vca_cloud_credential
,
1251 elif vca_type
== "native_charm":
1252 step
= "Waiting to VM being up and getting IP address"
1253 self
.logger
.debug(logging_text
+ step
)
1254 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1255 user
=None, pub_key
=None)
1256 credentials
= {"hostname": rw_mgmt_ip
}
1258 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1259 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1260 # merged. Meanwhile let's get username from initial-config-primitive
1261 if not username
and initial_config_primitive_list
:
1262 for config_primitive
in initial_config_primitive_list
:
1263 for param
in config_primitive
.get("parameter", ()):
1264 if param
["name"] == "ssh-username":
1265 username
= param
["value"]
1268 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1269 "'config-access.ssh-access.default-user'")
1270 credentials
["username"] = username
1271 # n2vc_redesign STEP 3.2
1273 self
._write
_configuration
_status
(
1275 vca_index
=vca_index
,
1276 status
='REGISTERING',
1277 element_under_configuration
=element_under_configuration
,
1278 element_type
=element_type
1281 step
= "register execution environment {}".format(credentials
)
1282 self
.logger
.debug(logging_text
+ step
)
1283 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1284 credentials
=credentials
,
1285 namespace
=namespace
,
1287 cloud_name
=vca_cloud
,
1288 credential_name
=vca_cloud_credential
,
1291 # for compatibility with MON/POL modules, the need model and application name at database
1292 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1293 ee_id_parts
= ee_id
.split('.')
1294 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1295 if len(ee_id_parts
) >= 2:
1296 model_name
= ee_id_parts
[0]
1297 application_name
= ee_id_parts
[1]
1298 db_nsr_update
[db_update_entry
+ "model"] = model_name
1299 db_nsr_update
[db_update_entry
+ "application"] = application_name
1301 # n2vc_redesign STEP 3.3
1302 step
= "Install configuration Software"
1304 self
._write
_configuration
_status
(
1306 vca_index
=vca_index
,
1307 status
='INSTALLING SW',
1308 element_under_configuration
=element_under_configuration
,
1309 element_type
=element_type
,
1310 other_update
=db_nsr_update
1313 # TODO check if already done
1314 self
.logger
.debug(logging_text
+ step
)
1316 if vca_type
== "native_charm":
1317 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1318 if config_primitive
:
1319 config
= self
._map
_primitive
_params
(
1325 if vca_type
== "lxc_proxy_charm":
1326 if element_type
== "NS":
1327 num_units
= db_nsr
.get("config-units") or 1
1328 elif element_type
== "VNF":
1329 num_units
= db_vnfr
.get("config-units") or 1
1330 elif element_type
== "VDU":
1331 for v
in db_vnfr
["vdur"]:
1332 if vdu_id
== v
["vdu-id-ref"]:
1333 num_units
= v
.get("config-units") or 1
1335 if vca_type
!= "k8s_proxy_charm":
1336 await self
.vca_map
[vca_type
].install_configuration_sw(
1338 artifact_path
=artifact_path
,
1341 num_units
=num_units
,
1344 # write in db flag of configuration_sw already installed
1345 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1347 # add relations for this VCA (wait for other peers related with this VCA)
1348 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1349 vca_index
=vca_index
, vca_type
=vca_type
)
1351 # if SSH access is required, then get execution environment SSH public
1352 # if native charm we have waited already to VM be UP
1353 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1356 # self.logger.debug("get ssh key block")
1357 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1358 # self.logger.debug("ssh key needed")
1359 # Needed to inject a ssh key
1360 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1361 step
= "Install configuration Software, getting public ssh key"
1362 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1364 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1366 # self.logger.debug("no need to get ssh key")
1367 step
= "Waiting to VM being up and getting IP address"
1368 self
.logger
.debug(logging_text
+ step
)
1370 # n2vc_redesign STEP 5.1
1371 # wait for RO (ip-address) Insert pub_key into VM
1374 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1376 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1377 vdu_index
, user
=user
, pub_key
=pub_key
)
1379 rw_mgmt_ip
= None # This is for a NS configuration
1381 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1383 # store rw_mgmt_ip in deploy params for later replacement
1384 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1386 # n2vc_redesign STEP 6 Execute initial config primitive
1387 step
= 'execute initial config primitive'
1389 # wait for dependent primitives execution (NS -> VNF -> VDU)
1390 if initial_config_primitive_list
:
1391 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1393 # stage, in function of element type: vdu, kdu, vnf or ns
1394 my_vca
= vca_deployed_list
[vca_index
]
1395 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1397 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1398 elif my_vca
.get("member-vnf-index"):
1400 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1403 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1405 self
._write
_configuration
_status
(
1407 vca_index
=vca_index
,
1408 status
='EXECUTING PRIMITIVE'
1411 self
._write
_op
_status
(
1416 check_if_terminated_needed
= True
1417 for initial_config_primitive
in initial_config_primitive_list
:
1418 # adding information on the vca_deployed if it is a NS execution environment
1419 if not vca_deployed
["member-vnf-index"]:
1420 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1421 # TODO check if already done
1422 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1424 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1425 self
.logger
.debug(logging_text
+ step
)
1426 await self
.vca_map
[vca_type
].exec_primitive(
1428 primitive_name
=initial_config_primitive
["name"],
1429 params_dict
=primitive_params_
,
1432 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1433 if check_if_terminated_needed
:
1434 if config_descriptor
.get('terminate-config-primitive'):
1435 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1436 check_if_terminated_needed
= False
1438 # TODO register in database that primitive is done
1440 # STEP 7 Configure metrics
1441 if vca_type
== "helm" or vca_type
== "helm-v3":
1442 prometheus_jobs
= await self
.add_prometheus_metrics(
1444 artifact_path
=artifact_path
,
1445 ee_config_descriptor
=ee_config_descriptor
,
1448 target_ip
=rw_mgmt_ip
,
1451 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1453 step
= "instantiated at VCA"
1454 self
.logger
.debug(logging_text
+ step
)
1456 self
._write
_configuration
_status
(
1458 vca_index
=vca_index
,
1462 except Exception as e
: # TODO not use Exception but N2VC exception
1463 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1464 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1465 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1466 self
._write
_configuration
_status
(
1468 vca_index
=vca_index
,
1471 raise LcmException("{} {}".format(step
, e
)) from e
1473 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1474 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1476 Update db_nsr fields.
1479 :param current_operation:
1480 :param current_operation_id:
1481 :param error_description:
1482 :param error_detail:
1483 :param other_update: Other required changes at database if provided, will be cleared
1487 db_dict
= other_update
or {}
1488 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1489 db_dict
["_admin.current-operation"] = current_operation_id
1490 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1491 db_dict
["currentOperation"] = current_operation
1492 db_dict
["currentOperationID"] = current_operation_id
1493 db_dict
["errorDescription"] = error_description
1494 db_dict
["errorDetail"] = error_detail
1497 db_dict
["nsState"] = ns_state
1498 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1499 except DbException
as e
:
1500 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1502 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1503 operation_state
: str = None, other_update
: dict = None):
1505 db_dict
= other_update
or {}
1506 db_dict
['queuePosition'] = queuePosition
1507 if isinstance(stage
, list):
1508 db_dict
['stage'] = stage
[0]
1509 db_dict
['detailed-status'] = " ".join(stage
)
1510 elif stage
is not None:
1511 db_dict
['stage'] = str(stage
)
1513 if error_message
is not None:
1514 db_dict
['errorMessage'] = error_message
1515 if operation_state
is not None:
1516 db_dict
['operationState'] = operation_state
1517 db_dict
["statusEnteredTime"] = time()
1518 self
.update_db_2("nslcmops", op_id
, db_dict
)
1519 except DbException
as e
:
1520 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1522 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1524 nsr_id
= db_nsr
["_id"]
1525 # configurationStatus
1526 config_status
= db_nsr
.get('configurationStatus')
1528 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1529 enumerate(config_status
) if v
}
1531 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1533 except DbException
as e
:
1534 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1536 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1537 element_under_configuration
: str = None, element_type
: str = None,
1538 other_update
: dict = None):
1540 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1541 # .format(vca_index, status))
1544 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1545 db_dict
= other_update
or {}
1547 db_dict
[db_path
+ 'status'] = status
1548 if element_under_configuration
:
1549 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1551 db_dict
[db_path
+ 'elementType'] = element_type
1552 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1553 except DbException
as e
:
1554 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1555 .format(status
, nsr_id
, vca_index
, e
))
1557 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
1559 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1560 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1561 Database is used because the result can be obtained from a different LCM worker in case of HA.
1562 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1563 :param db_nslcmop: database content of nslcmop
1564 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1565 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1566 computed 'vim-account-id'
1569 nslcmop_id
= db_nslcmop
['_id']
1570 placement_engine
= deep_get(db_nslcmop
, ('operationParams', 'placement-engine'))
1571 if placement_engine
== "PLA":
1572 self
.logger
.debug(logging_text
+ "Invoke and wait for placement optimization")
1573 await self
.msg
.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id
}, loop
=self
.loop
)
1574 db_poll_interval
= 5
1575 wait
= db_poll_interval
* 10
1577 while not pla_result
and wait
>= 0:
1578 await asyncio
.sleep(db_poll_interval
)
1579 wait
-= db_poll_interval
1580 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1581 pla_result
= deep_get(db_nslcmop
, ('_admin', 'pla'))
1584 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id
))
1586 for pla_vnf
in pla_result
['vnf']:
1587 vnfr
= db_vnfrs
.get(pla_vnf
['member-vnf-index'])
1588 if not pla_vnf
.get('vimAccountId') or not vnfr
:
1591 self
.db
.set_one("vnfrs", {"_id": vnfr
["_id"]}, {"vim-account-id": pla_vnf
['vimAccountId']})
1593 vnfr
["vim-account-id"] = pla_vnf
['vimAccountId']
1596 def update_nsrs_with_pla_result(self
, params
):
1598 nslcmop_id
= deep_get(params
, ('placement', 'nslcmopId'))
1599 self
.update_db_2("nslcmops", nslcmop_id
, {"_admin.pla": params
.get('placement')})
1600 except Exception as e
:
1601 self
.logger
.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id
, e
))
1603 async def instantiate(self
, nsr_id
, nslcmop_id
):
1606 :param nsr_id: ns instance to deploy
1607 :param nslcmop_id: operation to run
1611 # Try to lock HA task here
1612 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1613 if not task_is_locked_by_me
:
1614 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1617 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1618 self
.logger
.debug(logging_text
+ "Enter")
1620 # get all needed from database
1622 # database nsrs record
1625 # database nslcmops record
1628 # update operation on nsrs
1630 # update operation on nslcmops
1631 db_nslcmop_update
= {}
1633 nslcmop_operation_state
= None
1634 db_vnfrs
= {} # vnf's info indexed by member-index
1636 tasks_dict_info
= {} # from task to info text
1639 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1640 # ^ stage, step, VIM progress
1642 # wait for any previous tasks in process
1643 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1645 stage
[1] = "Sync filesystem from database."
1646 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1648 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1649 stage
[1] = "Reading from database."
1650 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1651 db_nsr_update
["detailed-status"] = "creating"
1652 db_nsr_update
["operational-status"] = "init"
1653 self
._write
_ns
_status
(
1655 ns_state
="BUILDING",
1656 current_operation
="INSTANTIATING",
1657 current_operation_id
=nslcmop_id
,
1658 other_update
=db_nsr_update
1660 self
._write
_op
_status
(
1666 # read from db: operation
1667 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1668 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1669 ns_params
= db_nslcmop
.get("operationParams")
1670 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1671 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1673 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1676 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1677 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1678 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1679 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1681 # nsr_name = db_nsr["name"] # TODO short-name??
1683 # read from db: vnf's of this ns
1684 stage
[1] = "Getting vnfrs from db."
1685 self
.logger
.debug(logging_text
+ stage
[1])
1686 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1688 # read from db: vnfd's for every vnf
1689 db_vnfds
= [] # every vnfd data
1691 # for each vnf in ns, read vnfd
1692 for vnfr
in db_vnfrs_list
:
1693 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
1694 vnfd_id
= vnfr
["vnfd-id"]
1695 vnfd_ref
= vnfr
["vnfd-ref"]
1697 # if we haven't this vnfd, read it from db
1698 if vnfd_id
not in db_vnfds
:
1700 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
1701 self
.logger
.debug(logging_text
+ stage
[1])
1702 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1705 db_vnfds
.append(vnfd
)
1707 # Get or generates the _admin.deployed.VCA list
1708 vca_deployed_list
= None
1709 if db_nsr
["_admin"].get("deployed"):
1710 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1711 if vca_deployed_list
is None:
1712 vca_deployed_list
= []
1713 configuration_status_list
= []
1714 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1715 db_nsr_update
["configurationStatus"] = configuration_status_list
1716 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1717 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1718 elif isinstance(vca_deployed_list
, dict):
1719 # maintain backward compatibility. Change a dict to list at database
1720 vca_deployed_list
= list(vca_deployed_list
.values())
1721 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1722 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1724 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1725 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1726 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1728 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1729 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1730 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1731 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
1733 # n2vc_redesign STEP 2 Deploy Network Scenario
1734 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1735 self
._write
_op
_status
(
1740 stage
[1] = "Deploying KDUs."
1741 # self.logger.debug(logging_text + "Before deploy_kdus")
1742 # Call to deploy_kdus in case exists the "vdu:kdu" param
1743 await self
.deploy_kdus(
1744 logging_text
=logging_text
,
1746 nslcmop_id
=nslcmop_id
,
1749 task_instantiation_info
=tasks_dict_info
,
1752 stage
[1] = "Getting VCA public key."
1753 # n2vc_redesign STEP 1 Get VCA public ssh-key
1754 # feature 1429. Add n2vc public key to needed VMs
1755 n2vc_key
= self
.n2vc
.get_public_key()
1756 n2vc_key_list
= [n2vc_key
]
1757 if self
.vca_config
.get("public_key"):
1758 n2vc_key_list
.append(self
.vca_config
["public_key"])
1760 stage
[1] = "Deploying NS at VIM."
1761 task_ro
= asyncio
.ensure_future(
1762 self
.instantiate_RO(
1763 logging_text
=logging_text
,
1767 db_nslcmop
=db_nslcmop
,
1770 n2vc_key_list
=n2vc_key_list
,
1774 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1775 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1777 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1778 stage
[1] = "Deploying Execution Environments."
1779 self
.logger
.debug(logging_text
+ stage
[1])
1781 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1782 for vnf_profile
in get_vnf_profiles(nsd
):
1783 vnfd_id
= vnf_profile
["vnfd-id"]
1784 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
1785 member_vnf_index
= str(vnf_profile
["id"])
1786 db_vnfr
= db_vnfrs
[member_vnf_index
]
1787 base_folder
= vnfd
["_admin"]["storage"]
1793 # Get additional parameters
1794 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1795 if db_vnfr
.get("additionalParamsForVnf"):
1796 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
1798 descriptor_config
= get_vnf_configuration(vnfd
)
1799 if descriptor_config
:
1801 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1804 nslcmop_id
=nslcmop_id
,
1810 member_vnf_index
=member_vnf_index
,
1811 vdu_index
=vdu_index
,
1813 deploy_params
=deploy_params
,
1814 descriptor_config
=descriptor_config
,
1815 base_folder
=base_folder
,
1816 task_instantiation_info
=tasks_dict_info
,
1820 # Deploy charms for each VDU that supports one.
1821 for vdud
in get_vdu_list(vnfd
):
1823 descriptor_config
= get_vdu_configuration(vnfd
, vdu_id
)
1824 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
1826 if vdur
.get("additionalParams"):
1827 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
1829 deploy_params_vdu
= deploy_params
1830 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=0)
1831 vdud_count
= get_vdu_profile(vnfd
, vdu_id
).get("max-number-of-instances", 1)
1833 self
.logger
.debug("VDUD > {}".format(vdud
))
1834 self
.logger
.debug("Descriptor config > {}".format(descriptor_config
))
1835 if descriptor_config
:
1838 for vdu_index
in range(vdud_count
):
1839 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1841 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1842 member_vnf_index
, vdu_id
, vdu_index
),
1845 nslcmop_id
=nslcmop_id
,
1851 member_vnf_index
=member_vnf_index
,
1852 vdu_index
=vdu_index
,
1854 deploy_params
=deploy_params_vdu
,
1855 descriptor_config
=descriptor_config
,
1856 base_folder
=base_folder
,
1857 task_instantiation_info
=tasks_dict_info
,
1860 for kdud
in get_kdu_list(vnfd
):
1861 kdu_name
= kdud
["name"]
1862 descriptor_config
= kdud
.get('kdu-configuration')
1863 if descriptor_config
:
1867 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
1868 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
1869 if kdur
.get("additionalParams"):
1870 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
1873 logging_text
=logging_text
,
1876 nslcmop_id
=nslcmop_id
,
1882 member_vnf_index
=member_vnf_index
,
1883 vdu_index
=vdu_index
,
1885 deploy_params
=deploy_params_kdu
,
1886 descriptor_config
=descriptor_config
,
1887 base_folder
=base_folder
,
1888 task_instantiation_info
=tasks_dict_info
,
1892 # Check if this NS has a charm configuration
1893 descriptor_config
= nsd
.get("ns-configuration")
1894 if descriptor_config
and descriptor_config
.get("juju"):
1897 member_vnf_index
= None
1903 # Get additional parameters
1904 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
1905 if db_nsr
.get("additionalParamsForNs"):
1906 deploy_params
.update(parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy()))
1907 base_folder
= nsd
["_admin"]["storage"]
1909 logging_text
=logging_text
,
1912 nslcmop_id
=nslcmop_id
,
1918 member_vnf_index
=member_vnf_index
,
1919 vdu_index
=vdu_index
,
1921 deploy_params
=deploy_params
,
1922 descriptor_config
=descriptor_config
,
1923 base_folder
=base_folder
,
1924 task_instantiation_info
=tasks_dict_info
,
1928 # rest of staff will be done at finally
1930 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
1931 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
1933 except asyncio
.CancelledError
:
1934 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
1935 exc
= "Operation was cancelled"
1936 except Exception as e
:
1937 exc
= traceback
.format_exc()
1938 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
1941 error_list
.append(str(exc
))
1943 # wait for pending tasks
1945 stage
[1] = "Waiting for instantiate pending tasks."
1946 self
.logger
.debug(logging_text
+ stage
[1])
1947 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
1948 stage
, nslcmop_id
, nsr_id
=nsr_id
)
1949 stage
[1] = stage
[2] = ""
1950 except asyncio
.CancelledError
:
1951 error_list
.append("Cancelled")
1952 # TODO cancel all tasks
1953 except Exception as exc
:
1954 error_list
.append(str(exc
))
1956 # update operation-status
1957 db_nsr_update
["operational-status"] = "running"
1958 # let's begin with VCA 'configured' status (later we can change it)
1959 db_nsr_update
["config-status"] = "configured"
1960 for task
, task_name
in tasks_dict_info
.items():
1961 if not task
.done() or task
.cancelled() or task
.exception():
1962 if task_name
.startswith(self
.task_name_deploy_vca
):
1963 # A N2VC task is pending
1964 db_nsr_update
["config-status"] = "failed"
1966 # RO or KDU task is pending
1967 db_nsr_update
["operational-status"] = "failed"
1969 # update status at database
1971 error_detail
= ". ".join(error_list
)
1972 self
.logger
.error(logging_text
+ error_detail
)
1973 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
1974 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
1976 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
1977 db_nslcmop_update
["detailed-status"] = error_detail
1978 nslcmop_operation_state
= "FAILED"
1982 error_description_nsr
= error_description_nslcmop
= None
1984 db_nsr_update
["detailed-status"] = "Done"
1985 db_nslcmop_update
["detailed-status"] = "Done"
1986 nslcmop_operation_state
= "COMPLETED"
1989 self
._write
_ns
_status
(
1992 current_operation
="IDLE",
1993 current_operation_id
=None,
1994 error_description
=error_description_nsr
,
1995 error_detail
=error_detail
,
1996 other_update
=db_nsr_update
1998 self
._write
_op
_status
(
2001 error_message
=error_description_nslcmop
,
2002 operation_state
=nslcmop_operation_state
,
2003 other_update
=db_nslcmop_update
,
2006 if nslcmop_operation_state
:
2008 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2009 "operationState": nslcmop_operation_state
},
2011 except Exception as e
:
2012 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2014 self
.logger
.debug(logging_text
+ "Exit")
2015 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2017 async def _add_vca_relations(self
, logging_text
, nsr_id
, vca_index
: int,
2018 timeout
: int = 3600, vca_type
: str = None) -> bool:
2021 # 1. find all relations for this VCA
2022 # 2. wait for other peers related
2026 vca_type
= vca_type
or "lxc_proxy_charm"
2028 # STEP 1: find all relations for this VCA
2031 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2032 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2035 my_vca
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))[vca_index
]
2037 # read all ns-configuration relations
2038 ns_relations
= list()
2039 db_ns_relations
= deep_get(nsd
, ('ns-configuration', 'relation'))
2041 for r
in db_ns_relations
:
2042 # check if this VCA is in the relation
2043 if my_vca
.get('member-vnf-index') in\
2044 (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2045 ns_relations
.append(r
)
2047 # read all vnf-configuration relations
2048 vnf_relations
= list()
2049 db_vnfd_list
= db_nsr
.get('vnfd-id')
2051 for vnfd
in db_vnfd_list
:
2052 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2053 db_vnf_relations
= deep_get(db_vnfd
, ('vnf-configuration', 'relation'))
2054 if db_vnf_relations
:
2055 for r
in db_vnf_relations
:
2056 # check if this VCA is in the relation
2057 if my_vca
.get('vdu_id') in (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2058 vnf_relations
.append(r
)
2060 # if no relations, terminate
2061 if not ns_relations
and not vnf_relations
:
2062 self
.logger
.debug(logging_text
+ ' No relations')
2065 self
.logger
.debug(logging_text
+ ' adding relations\n {}\n {}'.format(ns_relations
, vnf_relations
))
2072 if now
- start
>= timeout
:
2073 self
.logger
.error(logging_text
+ ' : timeout adding relations')
2076 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2077 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2079 # for each defined NS relation, find the VCA's related
2080 for r
in ns_relations
.copy():
2081 from_vca_ee_id
= None
2083 from_vca_endpoint
= None
2084 to_vca_endpoint
= None
2085 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2086 for vca
in vca_list
:
2087 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id') \
2088 and vca
.get('config_sw_installed'):
2089 from_vca_ee_id
= vca
.get('ee_id')
2090 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2091 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id') \
2092 and vca
.get('config_sw_installed'):
2093 to_vca_ee_id
= vca
.get('ee_id')
2094 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2095 if from_vca_ee_id
and to_vca_ee_id
:
2097 await self
.vca_map
[vca_type
].add_relation(
2098 ee_id_1
=from_vca_ee_id
,
2099 ee_id_2
=to_vca_ee_id
,
2100 endpoint_1
=from_vca_endpoint
,
2101 endpoint_2
=to_vca_endpoint
)
2102 # remove entry from relations list
2103 ns_relations
.remove(r
)
2105 # check failed peers
2107 vca_status_list
= db_nsr
.get('configurationStatus')
2109 for i
in range(len(vca_list
)):
2111 vca_status
= vca_status_list
[i
]
2112 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id'):
2113 if vca_status
.get('status') == 'BROKEN':
2114 # peer broken: remove relation from list
2115 ns_relations
.remove(r
)
2116 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id'):
2117 if vca_status
.get('status') == 'BROKEN':
2118 # peer broken: remove relation from list
2119 ns_relations
.remove(r
)
2124 # for each defined VNF relation, find the VCA's related
2125 for r
in vnf_relations
.copy():
2126 from_vca_ee_id
= None
2128 from_vca_endpoint
= None
2129 to_vca_endpoint
= None
2130 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2131 for vca
in vca_list
:
2132 key_to_check
= "vdu_id"
2133 if vca
.get("vdu_id") is None:
2134 key_to_check
= "vnfd_id"
2135 if vca
.get(key_to_check
) == r
.get('entities')[0].get('id') and vca
.get('config_sw_installed'):
2136 from_vca_ee_id
= vca
.get('ee_id')
2137 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2138 if vca
.get(key_to_check
) == r
.get('entities')[1].get('id') and vca
.get('config_sw_installed'):
2139 to_vca_ee_id
= vca
.get('ee_id')
2140 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2141 if from_vca_ee_id
and to_vca_ee_id
:
2143 await self
.vca_map
[vca_type
].add_relation(
2144 ee_id_1
=from_vca_ee_id
,
2145 ee_id_2
=to_vca_ee_id
,
2146 endpoint_1
=from_vca_endpoint
,
2147 endpoint_2
=to_vca_endpoint
)
2148 # remove entry from relations list
2149 vnf_relations
.remove(r
)
2151 # check failed peers
2153 vca_status_list
= db_nsr
.get('configurationStatus')
2155 for i
in range(len(vca_list
)):
2157 vca_status
= vca_status_list
[i
]
2158 if vca
.get('vdu_id') == r
.get('entities')[0].get('id'):
2159 if vca_status
.get('status') == 'BROKEN':
2160 # peer broken: remove relation from list
2161 vnf_relations
.remove(r
)
2162 if vca
.get('vdu_id') == r
.get('entities')[1].get('id'):
2163 if vca_status
.get('status') == 'BROKEN':
2164 # peer broken: remove relation from list
2165 vnf_relations
.remove(r
)
2171 await asyncio
.sleep(5.0)
2173 if not ns_relations
and not vnf_relations
:
2174 self
.logger
.debug('Relations added')
2179 except Exception as e
:
2180 self
.logger
.warn(logging_text
+ ' ERROR adding relations: {}'.format(e
))
2183 async def _install_kdu(self
, nsr_id
: str, nsr_db_path
: str, vnfr_data
: dict, kdu_index
: int, kdud
: dict,
2184 vnfd
: dict, k8s_instance_info
: dict, k8params
: dict = None, timeout
: int = 600):
2187 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2189 db_dict_install
= {"collection": "nsrs",
2190 "filter": {"_id": nsr_id
},
2191 "path": nsr_db_path
}
2193 kdu_instance
= self
.k8scluster_map
[k8sclustertype
].generate_kdu_instance_name(
2194 db_dict
=db_dict_install
,
2195 kdu_model
=k8s_instance_info
["kdu-model"],
2197 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2198 await self
.k8scluster_map
[k8sclustertype
].install(
2199 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2200 kdu_model
=k8s_instance_info
["kdu-model"],
2203 db_dict
=db_dict_install
,
2205 kdu_name
=k8s_instance_info
["kdu-name"],
2206 namespace
=k8s_instance_info
["namespace"],
2207 kdu_instance
=kdu_instance
,
2209 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2211 # Obtain services to obtain management service ip
2212 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2213 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2214 kdu_instance
=kdu_instance
,
2215 namespace
=k8s_instance_info
["namespace"])
2217 # Obtain management service info (if exists)
2218 vnfr_update_dict
= {}
2220 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2221 mgmt_services
= [service
for service
in kdud
.get("service", []) if service
.get("mgmt-service")]
2222 for mgmt_service
in mgmt_services
:
2223 for service
in services
:
2224 if service
["name"].startswith(mgmt_service
["name"]):
2225 # Mgmt service found, Obtain service ip
2226 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2227 if isinstance(ip
, list) and len(ip
) == 1:
2230 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2232 # Check if must update also mgmt ip at the vnf
2233 service_external_cp
= mgmt_service
.get("external-connection-point-ref")
2234 if service_external_cp
:
2235 if deep_get(vnfd
, ("mgmt-interface", "cp")) == service_external_cp
:
2236 vnfr_update_dict
["ip-address"] = ip
2240 self
.logger
.warn("Mgmt service name: {} not found".format(mgmt_service
["name"]))
2242 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
2243 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
2245 kdu_config
= kdud
.get("kdu-configuration")
2246 if kdu_config
and kdu_config
.get("initial-config-primitive") and kdu_config
.get("juju") is None:
2247 initial_config_primitive_list
= kdu_config
.get("initial-config-primitive")
2248 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
2250 for initial_config_primitive
in initial_config_primitive_list
:
2251 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, {})
2253 await asyncio
.wait_for(
2254 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
2255 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2256 kdu_instance
=kdu_instance
,
2257 primitive_name
=initial_config_primitive
["name"],
2258 params
=primitive_params_
, db_dict
={}),
2261 except Exception as e
:
2262 # Prepare update db with error and raise exception
2264 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)})
2265 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), {"kdur.{}.status".format(kdu_index
): "ERROR"})
2267 # ignore to keep original exception
2269 # reraise original error
2274 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2275 # Launch kdus if present in the descriptor
2277 k8scluster_id_2_uuic
= {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2279 async def _get_cluster_id(cluster_id
, cluster_type
):
2280 nonlocal k8scluster_id_2_uuic
2281 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2282 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2284 # check if K8scluster is creating and wait look if previous tasks in process
2285 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("k8scluster", cluster_id
)
2287 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name
, cluster_id
)
2288 self
.logger
.debug(logging_text
+ text
)
2289 await asyncio
.wait(task_dependency
, timeout
=3600)
2291 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2292 if not db_k8scluster
:
2293 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2295 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2297 if cluster_type
== "helm-chart-v3":
2299 # backward compatibility for existing clusters that have not been initialized for helm v3
2300 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
2301 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(k8s_credentials
,
2302 reuse_cluster_uuid
=cluster_id
)
2303 db_k8scluster_update
= {}
2304 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
2305 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
2306 db_k8scluster_update
["_admin.helm-chart-v3.created"] = uninstall_sw
2307 db_k8scluster_update
["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2308 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
2309 except Exception as e
:
2310 self
.logger
.error(logging_text
+ "error initializing helm-v3 cluster: {}".format(str(e
)))
2311 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id
,
2314 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2315 format(cluster_id
, cluster_type
))
2316 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2319 logging_text
+= "Deploy kdus: "
2322 db_nsr_update
= {"_admin.deployed.K8s": []}
2323 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2326 updated_cluster_list
= []
2327 updated_v3_cluster_list
= []
2329 for vnfr_data
in db_vnfrs
.values():
2330 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
2331 # Step 0: Prepare and set parameters
2332 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
2333 vnfd_id
= vnfr_data
.get('vnfd-id')
2334 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
)
2335 kdud
= next(kdud
for kdud
in vnfd_with_id
["kdu"] if kdud
["name"] == kdur
["kdu-name"])
2336 namespace
= kdur
.get("k8s-namespace")
2337 if kdur
.get("helm-chart"):
2338 kdumodel
= kdur
["helm-chart"]
2339 # Default version: helm3, if helm-version is v2 assign v2
2340 k8sclustertype
= "helm-chart-v3"
2341 self
.logger
.debug("kdur: {}".format(kdur
))
2342 if kdur
.get("helm-version") and kdur
.get("helm-version") == "v2":
2343 k8sclustertype
= "helm-chart"
2344 elif kdur
.get("juju-bundle"):
2345 kdumodel
= kdur
["juju-bundle"]
2346 k8sclustertype
= "juju-bundle"
2348 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2349 "juju-bundle. Maybe an old NBI version is running".
2350 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2351 # check if kdumodel is a file and exists
2353 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
)
2354 storage
= deep_get(vnfd_with_id
, ('_admin', 'storage'))
2355 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2356 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2357 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2359 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2360 kdumodel
= self
.fs
.path
+ filename
2361 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2363 except Exception: # it is not a file
2366 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2367 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2368 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2371 if (k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
)\
2372 or (k8sclustertype
== "helm-chart-v3" and cluster_uuid
not in updated_v3_cluster_list
):
2373 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2374 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(cluster_uuid
=cluster_uuid
))
2375 if del_repo_list
or added_repo_dict
:
2376 if k8sclustertype
== "helm-chart":
2377 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2378 updated
= {'_admin.helm_charts_added.' +
2379 item
: name
for item
, name
in added_repo_dict
.items()}
2380 updated_cluster_list
.append(cluster_uuid
)
2381 elif k8sclustertype
== "helm-chart-v3":
2382 unset
= {'_admin.helm_charts_v3_added.' + item
: None for item
in del_repo_list
}
2383 updated
= {'_admin.helm_charts_v3_added.' +
2384 item
: name
for item
, name
in added_repo_dict
.items()}
2385 updated_v3_cluster_list
.append(cluster_uuid
)
2386 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster "
2387 "'{}' to_delete: {}, to_add: {}".
2388 format(k8s_cluster_id
, del_repo_list
, added_repo_dict
))
2389 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2392 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2393 kdur
["kdu-name"], k8s_cluster_id
)
2394 k8s_instance_info
= {"kdu-instance": None,
2395 "k8scluster-uuid": cluster_uuid
,
2396 "k8scluster-type": k8sclustertype
,
2397 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2398 "kdu-name": kdur
["kdu-name"],
2399 "kdu-model": kdumodel
,
2400 "namespace": namespace
}
2401 db_path
= "_admin.deployed.K8s.{}".format(index
)
2402 db_nsr_update
[db_path
] = k8s_instance_info
2403 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2404 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
)
2405 task
= asyncio
.ensure_future(
2406 self
._install
_kdu
(nsr_id
, db_path
, vnfr_data
, kdu_index
, kdud
, vnfd_with_id
,
2407 k8s_instance_info
, k8params
=desc_params
, timeout
=600))
2408 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2409 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2413 except (LcmException
, asyncio
.CancelledError
):
2415 except Exception as e
:
2416 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2417 if isinstance(e
, (N2VCException
, DbException
)):
2418 self
.logger
.error(logging_text
+ msg
)
2420 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2421 raise LcmException(msg
)
2424 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2426 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2427 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2428 base_folder
, task_instantiation_info
, stage
):
2429 # launch instantiate_N2VC in a asyncio task and register task object
2430 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2431 # if not found, create one entry and update database
2432 # fill db_nsr._admin.deployed.VCA.<index>
2434 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2435 if descriptor_config
.get("juju"): # There is one execution envioronment of type juju
2436 ee_list
= [descriptor_config
]
2437 elif descriptor_config
.get("execution-environment-list"):
2438 ee_list
= descriptor_config
.get("execution-environment-list")
2439 else: # other types as script are not supported
2442 for ee_item
in ee_list
:
2443 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2444 ee_item
.get("helm-chart")))
2445 ee_descriptor_id
= ee_item
.get("id")
2446 if ee_item
.get("juju"):
2447 vca_name
= ee_item
['juju'].get('charm')
2448 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2449 if ee_item
['juju'].get('cloud') == "k8s":
2450 vca_type
= "k8s_proxy_charm"
2451 elif ee_item
['juju'].get('proxy') is False:
2452 vca_type
= "native_charm"
2453 elif ee_item
.get("helm-chart"):
2454 vca_name
= ee_item
['helm-chart']
2455 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
2458 vca_type
= "helm-v3"
2460 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2464 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2465 if not vca_deployed
:
2467 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2468 vca_deployed
.get("vdu_id") == vdu_id
and \
2469 vca_deployed
.get("kdu_name") == kdu_name
and \
2470 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2471 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2474 # not found, create one.
2475 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2477 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2479 target
+= "/kdu/{}".format(kdu_name
)
2481 "target_element": target
,
2482 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2483 "member-vnf-index": member_vnf_index
,
2485 "kdu_name": kdu_name
,
2486 "vdu_count_index": vdu_index
,
2487 "operational-status": "init", # TODO revise
2488 "detailed-status": "", # TODO revise
2489 "step": "initial-deploy", # TODO revise
2491 "vdu_name": vdu_name
,
2493 "ee_descriptor_id": ee_descriptor_id
2497 # create VCA and configurationStatus in db
2499 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2500 "configurationStatus.{}".format(vca_index
): dict()
2502 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2504 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2506 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
2507 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
2508 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
2511 task_n2vc
= asyncio
.ensure_future(
2512 self
.instantiate_N2VC(
2513 logging_text
=logging_text
,
2514 vca_index
=vca_index
,
2520 vdu_index
=vdu_index
,
2521 deploy_params
=deploy_params
,
2522 config_descriptor
=descriptor_config
,
2523 base_folder
=base_folder
,
2524 nslcmop_id
=nslcmop_id
,
2528 ee_config_descriptor
=ee_item
2531 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2532 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2533 member_vnf_index
or "", vdu_id
or "")
2536 def _create_nslcmop(nsr_id
, operation
, params
):
2538 Creates a ns-lcm-opp content to be stored at database.
2539 :param nsr_id: internal id of the instance
2540 :param operation: instantiate, terminate, scale, action, ...
2541 :param params: user parameters for the operation
2542 :return: dictionary following SOL005 format
2544 # Raise exception if invalid arguments
2545 if not (nsr_id
and operation
and params
):
2547 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2553 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2554 "operationState": "PROCESSING",
2555 "statusEnteredTime": now
,
2556 "nsInstanceId": nsr_id
,
2557 "lcmOperationType": operation
,
2559 "isAutomaticInvocation": False,
2560 "operationParams": params
,
2561 "isCancelPending": False,
2563 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2564 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2569 def _format_additional_params(self
, params
):
2570 params
= params
or {}
2571 for key
, value
in params
.items():
2572 if str(value
).startswith("!!yaml "):
2573 params
[key
] = yaml
.safe_load(value
[7:])
2576 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2577 primitive
= seq
.get('name')
2578 primitive_params
= {}
2580 "member_vnf_index": vnf_index
,
2581 "primitive": primitive
,
2582 "primitive_params": primitive_params
,
2585 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
2589 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
2590 op
= deep_get(db_nslcmop
, ('_admin', 'operations'), [])[op_index
]
2591 if op
.get('operationState') == 'COMPLETED':
2592 # b. Skip sub-operation
2593 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2594 return self
.SUBOPERATION_STATUS_SKIP
2596 # c. retry executing sub-operation
2597 # The sub-operation exists, and operationState != 'COMPLETED'
2598 # Update operationState = 'PROCESSING' to indicate a retry.
2599 operationState
= 'PROCESSING'
2600 detailed_status
= 'In progress'
2601 self
._update
_suboperation
_status
(
2602 db_nslcmop
, op_index
, operationState
, detailed_status
)
2603 # Return the sub-operation index
2604 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2605 # with arguments extracted from the sub-operation
2608 # Find a sub-operation where all keys in a matching dictionary must match
2609 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2610 def _find_suboperation(self
, db_nslcmop
, match
):
2611 if db_nslcmop
and match
:
2612 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2613 for i
, op
in enumerate(op_list
):
2614 if all(op
.get(k
) == match
[k
] for k
in match
):
2616 return self
.SUBOPERATION_STATUS_NOT_FOUND
2618 # Update status for a sub-operation given its index
2619 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2620 # Update DB for HA tasks
2621 q_filter
= {'_id': db_nslcmop
['_id']}
2622 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2623 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2624 self
.db
.set_one("nslcmops",
2626 update_dict
=update_dict
,
2627 fail_on_empty
=False)
2629 # Add sub-operation, return the index of the added sub-operation
2630 # Optionally, set operationState, detailed-status, and operationType
2631 # Status and type are currently set for 'scale' sub-operations:
2632 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2633 # 'detailed-status' : status message
2634 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2635 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2636 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2637 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2638 RO_nsr_id
=None, RO_scaling_info
=None):
2640 return self
.SUBOPERATION_STATUS_NOT_FOUND
2641 # Get the "_admin.operations" list, if it exists
2642 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2643 op_list
= db_nslcmop_admin
.get('operations')
2644 # Create or append to the "_admin.operations" list
2645 new_op
= {'member_vnf_index': vnf_index
,
2647 'vdu_count_index': vdu_count_index
,
2648 'primitive': primitive
,
2649 'primitive_params': mapped_primitive_params
}
2651 new_op
['operationState'] = operationState
2653 new_op
['detailed-status'] = detailed_status
2655 new_op
['lcmOperationType'] = operationType
2657 new_op
['RO_nsr_id'] = RO_nsr_id
2659 new_op
['RO_scaling_info'] = RO_scaling_info
2661 # No existing operations, create key 'operations' with current operation as first list element
2662 db_nslcmop_admin
.update({'operations': [new_op
]})
2663 op_list
= db_nslcmop_admin
.get('operations')
2665 # Existing operations, append operation to list
2666 op_list
.append(new_op
)
2668 db_nslcmop_update
= {'_admin.operations': op_list
}
2669 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2670 op_index
= len(op_list
) - 1
2673 # Helper methods for scale() sub-operations
2675 # pre-scale/post-scale:
2676 # Check for 3 different cases:
2677 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2678 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2679 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2680 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2681 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2682 # Find this sub-operation
2683 if RO_nsr_id
and RO_scaling_info
:
2684 operationType
= 'SCALE-RO'
2686 'member_vnf_index': vnf_index
,
2687 'RO_nsr_id': RO_nsr_id
,
2688 'RO_scaling_info': RO_scaling_info
,
2692 'member_vnf_index': vnf_index
,
2693 'primitive': vnf_config_primitive
,
2694 'primitive_params': primitive_params
,
2695 'lcmOperationType': operationType
2697 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2698 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2699 # a. New sub-operation
2700 # The sub-operation does not exist, add it.
2701 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2702 # The following parameters are set to None for all kind of scaling:
2704 vdu_count_index
= None
2706 if RO_nsr_id
and RO_scaling_info
:
2707 vnf_config_primitive
= None
2708 primitive_params
= None
2711 RO_scaling_info
= None
2712 # Initial status for sub-operation
2713 operationState
= 'PROCESSING'
2714 detailed_status
= 'In progress'
2715 # Add sub-operation for pre/post-scaling (zero or more operations)
2716 self
._add
_suboperation
(db_nslcmop
,
2721 vnf_config_primitive
,
2728 return self
.SUBOPERATION_STATUS_NEW
2730 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2731 # or op_index (operationState != 'COMPLETED')
2732 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2734 # Function to return execution_environment id
2736 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2737 # TODO vdu_index_count
2738 for vca
in vca_deployed_list
:
2739 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
2742 async def destroy_N2VC(self
, logging_text
, db_nslcmop
, vca_deployed
, config_descriptor
,
2743 vca_index
, destroy_ee
=True, exec_primitives
=True):
2745 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2746 :param logging_text:
2748 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2749 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2750 :param vca_index: index in the database _admin.deployed.VCA
2751 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2752 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
2753 not executed properly
2754 :return: None or exception
2758 logging_text
+ " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2759 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
2763 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
2765 # execute terminate_primitives
2767 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
2768 config_descriptor
.get("terminate-config-primitive"), vca_deployed
.get("ee_descriptor_id"))
2769 vdu_id
= vca_deployed
.get("vdu_id")
2770 vdu_count_index
= vca_deployed
.get("vdu_count_index")
2771 vdu_name
= vca_deployed
.get("vdu_name")
2772 vnf_index
= vca_deployed
.get("member-vnf-index")
2773 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
2774 for seq
in terminate_primitives
:
2775 # For each sequence in list, get primitive and call _ns_execute_primitive()
2776 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
2777 vnf_index
, seq
.get("name"))
2778 self
.logger
.debug(logging_text
+ step
)
2779 # Create the primitive for each sequence, i.e. "primitive": "touch"
2780 primitive
= seq
.get('name')
2781 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(seq
, vnf_index
)
2784 self
._add
_suboperation
(db_nslcmop
,
2790 mapped_primitive_params
)
2791 # Sub-operations: Call _ns_execute_primitive() instead of action()
2793 result
, result_detail
= await self
._ns
_execute
_primitive
(vca_deployed
["ee_id"], primitive
,
2794 mapped_primitive_params
,
2796 except LcmException
:
2797 # this happens when VCA is not deployed. In this case it is not needed to terminate
2799 result_ok
= ['COMPLETED', 'PARTIALLY_COMPLETED']
2800 if result
not in result_ok
:
2801 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2802 "error {}".format(seq
.get("name"), vnf_index
, result_detail
))
2803 # set that this VCA do not need terminated
2804 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(vca_index
)
2805 self
.update_db_2("nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False})
2807 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
2808 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
2811 await self
.vca_map
[vca_type
].delete_execution_environment(vca_deployed
["ee_id"])
2813 async def _delete_all_N2VC(self
, db_nsr
: dict):
2814 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
2815 namespace
= "." + db_nsr
["_id"]
2817 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
2818 except N2VCNotFound
: # already deleted. Skip
2820 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
2822 async def _terminate_RO(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
2824 Terminates a deployment from RO
2825 :param logging_text:
2826 :param nsr_deployed: db_nsr._admin.deployed
2829 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2830 this method will update only the index 2, but it will write on database the concatenated content of the list
2835 ro_nsr_id
= ro_delete_action
= None
2836 if nsr_deployed
and nsr_deployed
.get("RO"):
2837 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
2838 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
2841 stage
[2] = "Deleting ns from VIM."
2842 db_nsr_update
["detailed-status"] = " ".join(stage
)
2843 self
._write
_op
_status
(nslcmop_id
, stage
)
2844 self
.logger
.debug(logging_text
+ stage
[2])
2845 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2846 self
._write
_op
_status
(nslcmop_id
, stage
)
2847 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
2848 ro_delete_action
= desc
["action_id"]
2849 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2850 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2851 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2852 if ro_delete_action
:
2853 # wait until NS is deleted from VIM
2854 stage
[2] = "Waiting ns deleted from VIM."
2855 detailed_status_old
= None
2856 self
.logger
.debug(logging_text
+ stage
[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id
,
2858 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2859 self
._write
_op
_status
(nslcmop_id
, stage
)
2861 delete_timeout
= 20 * 60 # 20 minutes
2862 while delete_timeout
> 0:
2863 desc
= await self
.RO
.show(
2865 item_id_name
=ro_nsr_id
,
2866 extra_item
="action",
2867 extra_item_id
=ro_delete_action
)
2870 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
2872 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
2873 if ns_status
== "ERROR":
2874 raise ROclient
.ROClientException(ns_status_info
)
2875 elif ns_status
== "BUILD":
2876 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
2877 elif ns_status
== "ACTIVE":
2878 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2879 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2882 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
2883 if stage
[2] != detailed_status_old
:
2884 detailed_status_old
= stage
[2]
2885 db_nsr_update
["detailed-status"] = " ".join(stage
)
2886 self
._write
_op
_status
(nslcmop_id
, stage
)
2887 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2888 await asyncio
.sleep(5, loop
=self
.loop
)
2890 else: # delete_timeout <= 0:
2891 raise ROclient
.ROClientException("Timeout waiting ns deleted from VIM")
2893 except Exception as e
:
2894 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2895 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2896 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2897 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2898 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2899 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
))
2900 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2901 failed_detail
.append("delete conflict: {}".format(e
))
2902 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
))
2904 failed_detail
.append("delete error: {}".format(e
))
2905 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
))
2908 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
2909 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
2911 stage
[2] = "Deleting nsd from RO."
2912 db_nsr_update
["detailed-status"] = " ".join(stage
)
2913 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2914 self
._write
_op
_status
(nslcmop_id
, stage
)
2915 await self
.RO
.delete("nsd", ro_nsd_id
)
2916 self
.logger
.debug(logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
))
2917 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2918 except Exception as e
:
2919 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2920 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2921 self
.logger
.debug(logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
))
2922 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2923 failed_detail
.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
))
2924 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2926 failed_detail
.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
))
2927 self
.logger
.error(logging_text
+ failed_detail
[-1])
2929 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
2930 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
2931 if not vnf_deployed
or not vnf_deployed
["id"]:
2934 ro_vnfd_id
= vnf_deployed
["id"]
2935 stage
[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2936 vnf_deployed
["member-vnf-index"], ro_vnfd_id
)
2937 db_nsr_update
["detailed-status"] = " ".join(stage
)
2938 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2939 self
._write
_op
_status
(nslcmop_id
, stage
)
2940 await self
.RO
.delete("vnfd", ro_vnfd_id
)
2941 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
))
2942 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2943 except Exception as e
:
2944 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2945 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2946 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
))
2947 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2948 failed_detail
.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
))
2949 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2951 failed_detail
.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
))
2952 self
.logger
.error(logging_text
+ failed_detail
[-1])
2955 stage
[2] = "Error deleting from VIM"
2957 stage
[2] = "Deleted from VIM"
2958 db_nsr_update
["detailed-status"] = " ".join(stage
)
2959 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2960 self
._write
_op
_status
(nslcmop_id
, stage
)
2963 raise LcmException("; ".join(failed_detail
))
2965 async def terminate(self
, nsr_id
, nslcmop_id
):
2966 # Try to lock HA task here
2967 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
2968 if not task_is_locked_by_me
:
2971 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
2972 self
.logger
.debug(logging_text
+ "Enter")
2973 timeout_ns_terminate
= self
.timeout_ns_terminate
2976 operation_params
= None
2978 error_list
= [] # annotates all failed error messages
2979 db_nslcmop_update
= {}
2980 autoremove
= False # autoremove after terminated
2981 tasks_dict_info
= {}
2983 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
2984 # ^ contains [stage, step, VIM-status]
2986 # wait for any previous tasks in process
2987 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
2989 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2990 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2991 operation_params
= db_nslcmop
.get("operationParams") or {}
2992 if operation_params
.get("timeout_ns_terminate"):
2993 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
2994 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2995 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2997 db_nsr_update
["operational-status"] = "terminating"
2998 db_nsr_update
["config-status"] = "terminating"
2999 self
._write
_ns
_status
(
3001 ns_state
="TERMINATING",
3002 current_operation
="TERMINATING",
3003 current_operation_id
=nslcmop_id
,
3004 other_update
=db_nsr_update
3006 self
._write
_op
_status
(
3011 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
3012 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
3015 stage
[1] = "Getting vnf descriptors from db."
3016 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
3017 db_vnfds_from_id
= {}
3018 db_vnfds_from_member_index
= {}
3020 for vnfr
in db_vnfrs_list
:
3021 vnfd_id
= vnfr
["vnfd-id"]
3022 if vnfd_id
not in db_vnfds_from_id
:
3023 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3024 db_vnfds_from_id
[vnfd_id
] = vnfd
3025 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3027 # Destroy individual execution environments when there are terminating primitives.
3028 # Rest of EE will be deleted at once
3029 # TODO - check before calling _destroy_N2VC
3030 # if not operation_params.get("skip_terminate_primitives"):#
3031 # or not vca.get("needed_terminate"):
3032 stage
[0] = "Stage 2/3 execute terminating primitives."
3033 self
.logger
.debug(logging_text
+ stage
[0])
3034 stage
[1] = "Looking execution environment that needs terminate."
3035 self
.logger
.debug(logging_text
+ stage
[1])
3037 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3038 config_descriptor
= None
3039 if not vca
or not vca
.get("ee_id"):
3041 if not vca
.get("member-vnf-index"):
3043 config_descriptor
= db_nsr
.get("ns-configuration")
3044 elif vca
.get("vdu_id"):
3045 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3046 vdud
= next((vdu
for vdu
in db_vnfd
.get("vdu", ()) if vdu
["id"] == vca
.get("vdu_id")), None)
3048 config_descriptor
= vdud
.get("vdu-configuration")
3049 elif vca
.get("kdu_name"):
3050 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3051 kdud
= next((kdu
for kdu
in db_vnfd
.get("kdu", ()) if kdu
["name"] == vca
.get("kdu_name")), None)
3053 config_descriptor
= kdud
.get("kdu-configuration")
3055 config_descriptor
= db_vnfds_from_member_index
[vca
["member-vnf-index"]].get("vnf-configuration")
3056 vca_type
= vca
.get("type")
3057 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3058 vca
.get("needed_terminate"))
3059 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3060 # pending native charms
3061 destroy_ee
= True if vca_type
in ("helm", "helm-v3", "native_charm") else False
3062 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3063 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3064 task
= asyncio
.ensure_future(
3065 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
, vca_index
,
3066 destroy_ee
, exec_terminate_primitives
))
3067 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3069 # wait for pending tasks of terminate primitives
3071 self
.logger
.debug(logging_text
+ 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3072 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3073 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3075 tasks_dict_info
.clear()
3077 return # raise LcmException("; ".join(error_list))
3079 # remove All execution environments at once
3080 stage
[0] = "Stage 3/3 delete all."
3082 if nsr_deployed
.get("VCA"):
3083 stage
[1] = "Deleting all execution environments."
3084 self
.logger
.debug(logging_text
+ stage
[1])
3085 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3086 timeout
=self
.timeout_charm_delete
))
3087 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3088 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3090 # Delete from k8scluster
3091 stage
[1] = "Deleting KDUs."
3092 self
.logger
.debug(logging_text
+ stage
[1])
3093 # print(nsr_deployed)
3094 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3095 if not kdu
or not kdu
.get("kdu-instance"):
3097 kdu_instance
= kdu
.get("kdu-instance")
3098 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3099 task_delete_kdu_instance
= asyncio
.ensure_future(
3100 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3101 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3102 kdu_instance
=kdu_instance
))
3104 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3105 format(kdu
.get("k8scluster-type")))
3107 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3110 stage
[1] = "Deleting ns from VIM."
3112 task_delete_ro
= asyncio
.ensure_future(
3113 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3115 task_delete_ro
= asyncio
.ensure_future(
3116 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3117 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3119 # rest of staff will be done at finally
3121 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3122 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3124 except asyncio
.CancelledError
:
3125 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3126 exc
= "Operation was cancelled"
3127 except Exception as e
:
3128 exc
= traceback
.format_exc()
3129 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3132 error_list
.append(str(exc
))
3134 # wait for pending tasks
3136 stage
[1] = "Waiting for terminate pending tasks."
3137 self
.logger
.debug(logging_text
+ stage
[1])
3138 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3140 stage
[1] = stage
[2] = ""
3141 except asyncio
.CancelledError
:
3142 error_list
.append("Cancelled")
3143 # TODO cancell all tasks
3144 except Exception as exc
:
3145 error_list
.append(str(exc
))
3146 # update status at database
3148 error_detail
= "; ".join(error_list
)
3149 # self.logger.error(logging_text + error_detail)
3150 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
3151 error_description_nsr
= 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id
, stage
[0])
3153 db_nsr_update
["operational-status"] = "failed"
3154 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3155 db_nslcmop_update
["detailed-status"] = error_detail
3156 nslcmop_operation_state
= "FAILED"
3160 error_description_nsr
= error_description_nslcmop
= None
3161 ns_state
= "NOT_INSTANTIATED"
3162 db_nsr_update
["operational-status"] = "terminated"
3163 db_nsr_update
["detailed-status"] = "Done"
3164 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3165 db_nslcmop_update
["detailed-status"] = "Done"
3166 nslcmop_operation_state
= "COMPLETED"
3169 self
._write
_ns
_status
(
3172 current_operation
="IDLE",
3173 current_operation_id
=None,
3174 error_description
=error_description_nsr
,
3175 error_detail
=error_detail
,
3176 other_update
=db_nsr_update
3178 self
._write
_op
_status
(
3181 error_message
=error_description_nslcmop
,
3182 operation_state
=nslcmop_operation_state
,
3183 other_update
=db_nslcmop_update
,
3185 if ns_state
== "NOT_INSTANTIATED":
3187 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "NOT_INSTANTIATED"})
3188 except DbException
as e
:
3189 self
.logger
.warn(logging_text
+ 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3191 if operation_params
:
3192 autoremove
= operation_params
.get("autoremove", False)
3193 if nslcmop_operation_state
:
3195 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3196 "operationState": nslcmop_operation_state
,
3197 "autoremove": autoremove
},
3199 except Exception as e
:
3200 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3202 self
.logger
.debug(logging_text
+ "Exit")
3203 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3205 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3207 error_detail_list
= []
3209 pending_tasks
= list(created_tasks_info
.keys())
3210 num_tasks
= len(pending_tasks
)
3212 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3213 self
._write
_op
_status
(nslcmop_id
, stage
)
3214 while pending_tasks
:
3216 _timeout
= timeout
+ time_start
- time()
3217 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3218 return_when
=asyncio
.FIRST_COMPLETED
)
3219 num_done
+= len(done
)
3220 if not done
: # Timeout
3221 for task
in pending_tasks
:
3222 new_error
= created_tasks_info
[task
] + ": Timeout"
3223 error_detail_list
.append(new_error
)
3224 error_list
.append(new_error
)
3227 if task
.cancelled():
3230 exc
= task
.exception()
3232 if isinstance(exc
, asyncio
.TimeoutError
):
3234 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3235 error_list
.append(created_tasks_info
[task
])
3236 error_detail_list
.append(new_error
)
3237 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3238 K8sException
, NgRoException
)):
3239 self
.logger
.error(logging_text
+ new_error
)
3241 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3242 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + " " + exc_traceback
)
3244 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3245 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3247 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3248 if nsr_id
: # update also nsr
3249 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3250 "errorDetail": ". ".join(error_detail_list
)})
3251 self
._write
_op
_status
(nslcmop_id
, stage
)
3252 return error_detail_list
3255 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3257 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3258 The default-value is used. If it is between < > it look for a value at instantiation_params
3259 :param primitive_desc: portion of VNFD/NSD that describes primitive
3260 :param params: Params provided by user
3261 :param instantiation_params: Instantiation params provided by user
3262 :return: a dictionary with the calculated params
3264 calculated_params
= {}
3265 for parameter
in primitive_desc
.get("parameter", ()):
3266 param_name
= parameter
["name"]
3267 if param_name
in params
:
3268 calculated_params
[param_name
] = params
[param_name
]
3269 elif "default-value" in parameter
or "value" in parameter
:
3270 if "value" in parameter
:
3271 calculated_params
[param_name
] = parameter
["value"]
3273 calculated_params
[param_name
] = parameter
["default-value"]
3274 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3275 and calculated_params
[param_name
].endswith(">"):
3276 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3277 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3279 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3280 format(calculated_params
[param_name
], primitive_desc
["name"]))
3282 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3283 format(param_name
, primitive_desc
["name"]))
3285 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3286 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
],
3287 default_flow_style
=True, width
=256)
3288 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3289 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3290 if parameter
.get("data-type") == "INTEGER":
3292 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3293 except ValueError: # error converting string to int
3295 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3296 elif parameter
.get("data-type") == "BOOLEAN":
3297 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3299 # add always ns_config_info if primitive name is config
3300 if primitive_desc
["name"] == "config":
3301 if "ns_config_info" in instantiation_params
:
3302 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3303 return calculated_params
3305 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3306 ee_descriptor_id
=None):
3307 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3308 for vca
in deployed_vca
:
3311 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3313 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3315 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3317 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3321 # vca_deployed not found
3322 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3323 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3326 ee_id
= vca
.get("ee_id")
3327 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3329 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3330 "execution environment"
3331 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3332 return ee_id
, vca_type
3334 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0, retries_interval
=30,
3335 timeout
=None, vca_type
=None, db_dict
=None) -> (str, str):
3337 if primitive
== "config":
3338 primitive_params
= {"params": primitive_params
}
3340 vca_type
= vca_type
or "lxc_proxy_charm"
3344 output
= await asyncio
.wait_for(
3345 self
.vca_map
[vca_type
].exec_primitive(
3347 primitive_name
=primitive
,
3348 params_dict
=primitive_params
,
3349 progress_timeout
=self
.timeout_progress_primitive
,
3350 total_timeout
=self
.timeout_primitive
,
3352 timeout
=timeout
or self
.timeout_primitive
)
3355 except asyncio
.CancelledError
:
3357 except Exception as e
: # asyncio.TimeoutError
3358 if isinstance(e
, asyncio
.TimeoutError
):
3362 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3364 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3366 return 'FAILED', str(e
)
3368 return 'COMPLETED', output
3370 except (LcmException
, asyncio
.CancelledError
):
3372 except Exception as e
:
3373 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
3375 async def action(self
, nsr_id
, nslcmop_id
):
3376 # Try to lock HA task here
3377 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3378 if not task_is_locked_by_me
:
3381 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3382 self
.logger
.debug(logging_text
+ "Enter")
3383 # get all needed from database
3387 db_nslcmop_update
= {}
3388 nslcmop_operation_state
= None
3389 error_description_nslcmop
= None
3392 # wait for any previous tasks in process
3393 step
= "Waiting for previous operations to terminate"
3394 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3396 self
._write
_ns
_status
(
3399 current_operation
="RUNNING ACTION",
3400 current_operation_id
=nslcmop_id
3403 step
= "Getting information from database"
3404 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3405 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3407 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3408 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3409 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3410 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3411 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3412 primitive
= db_nslcmop
["operationParams"]["primitive"]
3413 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3414 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3417 step
= "Getting vnfr from database"
3418 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3419 step
= "Getting vnfd from database"
3420 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3422 step
= "Getting nsd from database"
3423 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3425 # for backward compatibility
3426 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3427 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3428 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3429 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3431 # look for primitive
3432 config_primitive_desc
= descriptor_configuration
= None
3434 descriptor_configuration
= get_vdu_configuration(db_vnfd
, vdu_id
)
3436 descriptor_configuration
= get_kdu_configuration(db_vnfd
, kdu_name
)
3438 descriptor_configuration
= get_vnf_configuration(db_vnfd
)
3440 descriptor_configuration
= db_nsd
.get("ns-configuration")
3442 if descriptor_configuration
and descriptor_configuration
.get("config-primitive"):
3443 for config_primitive
in descriptor_configuration
["config-primitive"]:
3444 if config_primitive
["name"] == primitive
:
3445 config_primitive_desc
= config_primitive
3448 if not config_primitive_desc
:
3449 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3450 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3452 primitive_name
= primitive
3453 ee_descriptor_id
= None
3455 primitive_name
= config_primitive_desc
.get("execution-environment-primitive", primitive
)
3456 ee_descriptor_id
= config_primitive_desc
.get("execution-environment-ref")
3460 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3461 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
3463 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3464 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3466 desc_params
= parse_yaml_strings(db_vnfr
.get("additionalParamsForVnf"))
3468 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
3469 if kdu_name
and get_kdu_configuration(db_vnfd
, kdu_name
):
3470 kdu_configuration
= get_kdu_configuration(db_vnfd
, kdu_name
)
3472 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
3473 actions
.add(primitive
["name"])
3474 for primitive
in kdu_configuration
.get("config-primitive", []):
3475 actions
.add(primitive
["name"])
3476 kdu_action
= True if primitive_name
in actions
else False
3478 # TODO check if ns is in a proper status
3479 if kdu_name
and (primitive_name
in ("upgrade", "rollback", "status") or kdu_action
):
3480 # kdur and desc_params already set from before
3481 if primitive_params
:
3482 desc_params
.update(primitive_params
)
3483 # TODO Check if we will need something at vnf level
3484 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3485 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3488 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3490 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3491 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3492 raise LcmException(msg
)
3494 db_dict
= {"collection": "nsrs",
3495 "filter": {"_id": nsr_id
},
3496 "path": "_admin.deployed.K8s.{}".format(index
)}
3497 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
))
3498 step
= "Executing kdu {}".format(primitive_name
)
3499 if primitive_name
== "upgrade":
3500 if desc_params
.get("kdu_model"):
3501 kdu_model
= desc_params
.get("kdu_model")
3502 del desc_params
["kdu_model"]
3504 kdu_model
= kdu
.get("kdu-model")
3505 parts
= kdu_model
.split(sep
=":")
3507 kdu_model
= parts
[0]
3509 detailed_status
= await asyncio
.wait_for(
3510 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3511 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3512 kdu_instance
=kdu
.get("kdu-instance"),
3513 atomic
=True, kdu_model
=kdu_model
,
3514 params
=desc_params
, db_dict
=db_dict
,
3515 timeout
=timeout_ns_action
),
3516 timeout
=timeout_ns_action
+ 10)
3517 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3518 elif primitive_name
== "rollback":
3519 detailed_status
= await asyncio
.wait_for(
3520 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3521 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3522 kdu_instance
=kdu
.get("kdu-instance"),
3524 timeout
=timeout_ns_action
)
3525 elif primitive_name
== "status":
3526 detailed_status
= await asyncio
.wait_for(
3527 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3528 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3529 kdu_instance
=kdu
.get("kdu-instance")),
3530 timeout
=timeout_ns_action
)
3532 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3533 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3535 detailed_status
= await asyncio
.wait_for(
3536 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3537 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3538 kdu_instance
=kdu_instance
,
3539 primitive_name
=primitive_name
,
3540 params
=params
, db_dict
=db_dict
,
3541 timeout
=timeout_ns_action
),
3542 timeout
=timeout_ns_action
)
3545 nslcmop_operation_state
= 'COMPLETED'
3547 detailed_status
= ''
3548 nslcmop_operation_state
= 'FAILED'
3550 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"], member_vnf_index
=vnf_index
,
3551 vdu_id
=vdu_id
, vdu_count_index
=vdu_count_index
,
3552 ee_descriptor_id
=ee_descriptor_id
)
3553 db_nslcmop_notif
= {"collection": "nslcmops",
3554 "filter": {"_id": nslcmop_id
},
3555 "path": "admin.VCA"}
3556 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3558 primitive
=primitive_name
,
3559 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3560 timeout
=timeout_ns_action
,
3562 db_dict
=db_nslcmop_notif
)
3564 db_nslcmop_update
["detailed-status"] = detailed_status
3565 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3566 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3568 return # database update is called inside finally
3570 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3571 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3573 except asyncio
.CancelledError
:
3574 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3575 exc
= "Operation was cancelled"
3576 except asyncio
.TimeoutError
:
3577 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3579 except Exception as e
:
3580 exc
= traceback
.format_exc()
3581 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3584 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3585 "FAILED {}: {}".format(step
, exc
)
3586 nslcmop_operation_state
= "FAILED"
3588 self
._write
_ns
_status
(
3590 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3591 current_operation
="IDLE",
3592 current_operation_id
=None,
3593 # error_description=error_description_nsr,
3594 # error_detail=error_detail,
3595 other_update
=db_nsr_update
3598 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
3599 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
3601 if nslcmop_operation_state
:
3603 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3604 "operationState": nslcmop_operation_state
},
3606 except Exception as e
:
3607 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3608 self
.logger
.debug(logging_text
+ "Exit")
3609 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3610 return nslcmop_operation_state
, detailed_status
3612 async def scale(self
, nsr_id
, nslcmop_id
):
3613 # Try to lock HA task here
3614 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3615 if not task_is_locked_by_me
:
3618 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3619 stage
= ['', '', '']
3620 # ^ stage, step, VIM progress
3621 self
.logger
.debug(logging_text
+ "Enter")
3622 # get all needed from database
3624 db_nslcmop_update
= {}
3627 # in case of error, indicates what part of scale was failed to put nsr at error status
3628 scale_process
= None
3629 old_operational_status
= ""
3630 old_config_status
= ""
3632 # wait for any previous tasks in process
3633 step
= "Waiting for previous operations to terminate"
3634 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3635 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None,
3636 current_operation
="SCALING", current_operation_id
=nslcmop_id
)
3638 step
= "Getting nslcmop from database"
3639 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3640 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3642 step
= "Getting nsr from database"
3643 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3644 old_operational_status
= db_nsr
["operational-status"]
3645 old_config_status
= db_nsr
["config-status"]
3647 step
= "Parsing scaling parameters"
3648 db_nsr_update
["operational-status"] = "scaling"
3649 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3650 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3653 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3654 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3655 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3656 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3657 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3660 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3661 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3662 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3663 # for backward compatibility
3664 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3665 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3666 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3667 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3669 step
= "Getting vnfr from database"
3670 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3672 step
= "Getting vnfd from database"
3673 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3675 step
= "Getting scaling-group-descriptor"
3676 scaling_descriptor
= find_in_list(
3680 lambda scale_desc
: scale_desc
["name"] == scaling_group
3682 if not scaling_descriptor
:
3683 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3684 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3686 step
= "Sending scale order to VIM"
3687 # TODO check if ns is in a proper status
3689 if not db_nsr
["_admin"].get("scaling-group"):
3690 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
3691 admin_scale_index
= 0
3693 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
3694 if admin_scale_info
["name"] == scaling_group
:
3695 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
3697 else: # not found, set index one plus last element and add new entry with the name
3698 admin_scale_index
+= 1
3699 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
3700 RO_scaling_info
= []
3701 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
3702 if scaling_type
== "SCALE_OUT":
3703 if "aspect-delta-details" not in scaling_descriptor
:
3705 "Aspect delta details not fount in scaling descriptor {}".format(
3706 scaling_descriptor
["name"]
3709 # count if max-instance-count is reached
3710 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3712 vdu_scaling_info
["scaling_direction"] = "OUT"
3713 vdu_scaling_info
["vdu-create"] = {}
3714 for delta
in deltas
:
3715 for vdu_delta
in delta
["vdu-delta"]:
3716 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
3717 vdu_index
= get_vdu_index(db_vnfr
, vdu_delta
["id"])
3718 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
3720 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
3721 cloud_init_list
= []
3723 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3724 max_instance_count
= 10
3725 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
3726 max_instance_count
= vdu_profile
.get("max-number-of-instances", 10)
3728 deafult_instance_num
= get_number_of_instances(db_vnfd
, vdud
["id"])
3730 nb_scale_op
+= vdu_delta
.get("number-of-instances", 1)
3732 if nb_scale_op
+ deafult_instance_num
> max_instance_count
:
3734 "reached the limit of {} (max-instance-count) "
3735 "scaling-out operations for the "
3736 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3738 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3740 # TODO Information of its own ip is not available because db_vnfr is not updated.
3741 additional_params
["OSM"] = get_osm_params(
3746 cloud_init_list
.append(
3747 self
._parse
_cloud
_init
(
3754 RO_scaling_info
.append(
3756 "osm_vdu_id": vdu_delta
["id"],
3757 "member-vnf-index": vnf_index
,
3759 "count": vdu_delta
.get("number-of-instances", 1)
3763 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
3764 vdu_scaling_info
["vdu-create"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3766 elif scaling_type
== "SCALE_IN":
3767 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
3768 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
3770 vdu_scaling_info
["scaling_direction"] = "IN"
3771 vdu_scaling_info
["vdu-delete"] = {}
3772 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3773 for delta
in deltas
:
3774 for vdu_delta
in delta
["vdu-delta"]:
3775 min_instance_count
= 0
3776 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3777 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
3778 min_instance_count
= vdu_profile
["min-number-of-instances"]
3780 deafult_instance_num
= get_number_of_instances(db_vnfd
, vdu_delta
["id"])
3782 nb_scale_op
-= vdu_delta
.get("number-of-instances", 1)
3783 if nb_scale_op
+ deafult_instance_num
< min_instance_count
:
3785 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3786 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3788 RO_scaling_info
.append({"osm_vdu_id": vdu_delta
["id"], "member-vnf-index": vnf_index
,
3789 "type": "delete", "count": vdu_delta
.get("number-of-instances", 1)})
3790 vdu_scaling_info
["vdu-delete"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3792 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3793 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
3794 if vdu_scaling_info
["scaling_direction"] == "IN":
3795 for vdur
in reversed(db_vnfr
["vdur"]):
3796 if vdu_delete
.get(vdur
["vdu-id-ref"]):
3797 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
3798 vdu_scaling_info
["vdu"].append({
3799 "name": vdur
.get("name") or vdur
.get("vdu-name"),
3800 "vdu_id": vdur
["vdu-id-ref"],
3803 for interface
in vdur
["interfaces"]:
3804 vdu_scaling_info
["vdu"][-1]["interface"].append({
3805 "name": interface
["name"],
3806 "ip_address": interface
["ip-address"],
3807 "mac_address": interface
.get("mac-address"),
3809 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3812 step
= "Executing pre-scale vnf-config-primitive"
3813 if scaling_descriptor
.get("scaling-config-action"):
3814 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3815 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
3816 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
3817 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3818 step
= db_nslcmop_update
["detailed-status"] = \
3819 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3821 # look for primitive
3822 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
3823 if config_primitive
["name"] == vnf_config_primitive
:
3827 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3828 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3829 "primitive".format(scaling_group
, vnf_config_primitive
))
3831 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3832 if db_vnfr
.get("additionalParamsForVnf"):
3833 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3835 scale_process
= "VCA"
3836 db_nsr_update
["config-status"] = "configuring pre-scaling"
3837 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3839 # Pre-scale retry check: Check if this sub-operation has been executed before
3840 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3841 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
3842 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3843 # Skip sub-operation
3844 result
= 'COMPLETED'
3845 result_detail
= 'Done'
3846 self
.logger
.debug(logging_text
+
3847 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3848 vnf_config_primitive
, result
, result_detail
))
3850 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3851 # New sub-operation: Get index of this sub-operation
3852 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3853 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3854 format(vnf_config_primitive
))
3856 # retry: Get registered params for this existing sub-operation
3857 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3858 vnf_index
= op
.get('member_vnf_index')
3859 vnf_config_primitive
= op
.get('primitive')
3860 primitive_params
= op
.get('primitive_params')
3861 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3862 format(vnf_config_primitive
))
3863 # Execute the primitive, either with new (first-time) or registered (reintent) args
3864 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3865 primitive_name
= config_primitive
.get("execution-environment-primitive",
3866 vnf_config_primitive
)
3867 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3868 member_vnf_index
=vnf_index
,
3870 vdu_count_index
=None,
3871 ee_descriptor_id
=ee_descriptor_id
)
3872 result
, result_detail
= await self
._ns
_execute
_primitive
(
3873 ee_id
, primitive_name
, primitive_params
, vca_type
)
3874 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3875 vnf_config_primitive
, result
, result_detail
))
3876 # Update operationState = COMPLETED | FAILED
3877 self
._update
_suboperation
_status
(
3878 db_nslcmop
, op_index
, result
, result_detail
)
3880 if result
== "FAILED":
3881 raise LcmException(result_detail
)
3882 db_nsr_update
["config-status"] = old_config_status
3883 scale_process
= None
3886 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
3887 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
3891 scale_process
= "RO"
3892 if self
.ro_config
.get("ng"):
3893 await self
._scale
_ng
_ro
(logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
)
3894 vdu_scaling_info
.pop("vdu-create", None)
3895 vdu_scaling_info
.pop("vdu-delete", None)
3897 scale_process
= None
3899 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3902 # execute primitive service POST-SCALING
3903 step
= "Executing post-scale vnf-config-primitive"
3904 if scaling_descriptor
.get("scaling-config-action"):
3905 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3906 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
3907 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
3908 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3909 step
= db_nslcmop_update
["detailed-status"] = \
3910 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3912 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3913 if db_vnfr
.get("additionalParamsForVnf"):
3914 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3916 # look for primitive
3917 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
3918 if config_primitive
["name"] == vnf_config_primitive
:
3922 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
3923 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
3924 "config-primitive".format(scaling_group
, vnf_config_primitive
))
3925 scale_process
= "VCA"
3926 db_nsr_update
["config-status"] = "configuring post-scaling"
3927 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3929 # Post-scale retry check: Check if this sub-operation has been executed before
3930 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3931 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
3932 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3933 # Skip sub-operation
3934 result
= 'COMPLETED'
3935 result_detail
= 'Done'
3936 self
.logger
.debug(logging_text
+
3937 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3938 format(vnf_config_primitive
, result
, result_detail
))
3940 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3941 # New sub-operation: Get index of this sub-operation
3942 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3943 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3944 format(vnf_config_primitive
))
3946 # retry: Get registered params for this existing sub-operation
3947 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3948 vnf_index
= op
.get('member_vnf_index')
3949 vnf_config_primitive
= op
.get('primitive')
3950 primitive_params
= op
.get('primitive_params')
3951 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3952 format(vnf_config_primitive
))
3953 # Execute the primitive, either with new (first-time) or registered (reintent) args
3954 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3955 primitive_name
= config_primitive
.get("execution-environment-primitive",
3956 vnf_config_primitive
)
3957 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3958 member_vnf_index
=vnf_index
,
3960 vdu_count_index
=None,
3961 ee_descriptor_id
=ee_descriptor_id
)
3962 result
, result_detail
= await self
._ns
_execute
_primitive
(
3963 ee_id
, primitive_name
, primitive_params
, vca_type
)
3964 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3965 vnf_config_primitive
, result
, result_detail
))
3966 # Update operationState = COMPLETED | FAILED
3967 self
._update
_suboperation
_status
(
3968 db_nslcmop
, op_index
, result
, result_detail
)
3970 if result
== "FAILED":
3971 raise LcmException(result_detail
)
3972 db_nsr_update
["config-status"] = old_config_status
3973 scale_process
= None
3976 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3977 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
3978 else old_operational_status
3979 db_nsr_update
["config-status"] = old_config_status
3981 except (ROclient
.ROClientException
, DbException
, LcmException
, NgRoException
) as e
:
3982 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3984 except asyncio
.CancelledError
:
3985 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3986 exc
= "Operation was cancelled"
3987 except Exception as e
:
3988 exc
= traceback
.format_exc()
3989 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3991 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE", current_operation_id
=None)
3993 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
3994 nslcmop_operation_state
= "FAILED"
3996 db_nsr_update
["operational-status"] = old_operational_status
3997 db_nsr_update
["config-status"] = old_config_status
3998 db_nsr_update
["detailed-status"] = ""
4000 if "VCA" in scale_process
:
4001 db_nsr_update
["config-status"] = "failed"
4002 if "RO" in scale_process
:
4003 db_nsr_update
["operational-status"] = "failed"
4004 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4007 error_description_nslcmop
= None
4008 nslcmop_operation_state
= "COMPLETED"
4009 db_nslcmop_update
["detailed-status"] = "Done"
4011 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
4012 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
4014 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE",
4015 current_operation_id
=None, other_update
=db_nsr_update
)
4017 if nslcmop_operation_state
:
4019 msg
= {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
, "operationState": nslcmop_operation_state
}
4020 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
4021 except Exception as e
:
4022 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4023 self
.logger
.debug(logging_text
+ "Exit")
4024 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
4026 async def _scale_ng_ro(self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
):
4027 nsr_id
= db_nslcmop
["nsInstanceId"]
4028 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4031 # read from db: vnfd's for every vnf
4034 # for each vnf in ns, read vnfd
4035 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
4036 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
4037 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
4038 # if we haven't this vnfd, read it from db
4039 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
4041 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4042 db_vnfds
.append(vnfd
)
4043 n2vc_key
= self
.n2vc
.get_public_key()
4044 n2vc_key_list
= [n2vc_key
]
4045 self
.scale_vnfr(db_vnfr
, vdu_scaling_info
.get("vdu-create"), vdu_scaling_info
.get("vdu-delete"),
4047 # db_vnfr has been updated, update db_vnfrs to use it
4048 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
4049 await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, db_nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
4050 db_vnfds
, n2vc_key_list
, stage
=stage
, start_deploy
=time(),
4051 timeout_ns_deploy
=self
.timeout_ns_deploy
)
4052 if vdu_scaling_info
.get("vdu-delete"):
4053 self
.scale_vnfr(db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False)
4055 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4056 if not self
.prometheus
:
4058 # look if exist a file called 'prometheus*.j2' and
4059 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4060 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4063 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4067 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4068 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4070 vnfr_id
= vnfr_id
.replace("-", "")
4072 "JOB_NAME": vnfr_id
,
4073 "TARGET_IP": target_ip
,
4074 "EXPORTER_POD_IP": host_name
,
4075 "EXPORTER_POD_PORT": host_port
,
4077 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4078 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4079 for job
in job_list
:
4080 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4081 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4082 job
["nsr_id"] = nsr_id
4083 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4084 if await self
.prometheus
.update(job_dict
):
4085 return list(job_dict
.keys())
4087 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4089 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4091 :param: vim_account_id: VIM Account ID
4093 :return: (cloud_name, cloud_credential)
4095 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4096 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
4098 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4100 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4102 :param: vim_account_id: VIM Account ID
4104 :return: (cloud_name, cloud_credential)
4106 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4107 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")