# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
31 from osm_lcm
.data_utils
.vnfd
import get_vdu_list
, get_vdu_profile
, \
32 get_ee_sorted_initial_config_primitive_list
, get_ee_sorted_terminate_config_primitive_list
, \
33 get_kdu_list
, get_virtual_link_profiles
, get_vdu
, get_configuration
, \
34 get_vdu_index
, get_scaling_aspect
, get_number_of_instances
35 from osm_lcm
.data_utils
.list_utils
import find_in_list
36 from osm_lcm
.data_utils
.vnfr
import get_osm_params
37 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
38 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
39 from n2vc
.k8s_helm_conn
import K8sHelmConnector
40 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
41 from n2vc
.k8s_juju_conn
import K8sJujuConnector
43 from osm_common
.dbbase
import DbException
44 from osm_common
.fsbase
import FsException
46 from osm_lcm
.data_utils
.database
.database
import Database
47 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
49 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
50 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
52 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
54 from copy
import copy
, deepcopy
56 from uuid
import uuid4
58 from random
import randint
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"

# Default timeouts (presumably seconds, given the "N * 60" spelling -- confirm against the callers)
timeout_vca_on_error = 60 * 5  # time a charm may stay at blocked/error status before it is marked as failed
timeout_ns_deploy = 3600 * 2  # default global timeout for deploying a ns
timeout_ns_terminate = 1800  # default global timeout for undeploying a ns
timeout_charm_delete = 60 * 10
timeout_primitive = 60 * 30  # timeout for a primitive execution
timeout_progress_primitive = 60 * 10  # timeout for some progress in a primitive execution

# Markers for sub-operation lookups; their exact meaning is defined by the code that returns them
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
76 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 logger
=logging
.getLogger('lcm.ns')
87 self
.db
= Database().instance
.db
88 self
.fs
= Filesystem().instance
.fs
90 self
.lcm_tasks
= lcm_tasks
91 self
.timeout
= config
["timeout"]
92 self
.ro_config
= config
["ro_config"]
93 self
.ng_ro
= config
["ro_config"].get("ng")
94 self
.vca_config
= config
["VCA"].copy()
96 # create N2VC connector
97 self
.n2vc
= N2VCJujuConnector(
100 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
101 username
=self
.vca_config
.get('user', None),
102 vca_config
=self
.vca_config
,
103 on_update_db
=self
._on
_update
_n
2vc
_db
,
108 self
.conn_helm_ee
= LCMHelmConn(
113 vca_config
=self
.vca_config
,
114 on_update_db
=self
._on
_update
_n
2vc
_db
117 self
.k8sclusterhelm2
= K8sHelmConnector(
118 kubectl_command
=self
.vca_config
.get("kubectlpath"),
119 helm_command
=self
.vca_config
.get("helmpath"),
126 self
.k8sclusterhelm3
= K8sHelm3Connector(
127 kubectl_command
=self
.vca_config
.get("kubectlpath"),
128 helm_command
=self
.vca_config
.get("helm3path"),
135 self
.k8sclusterjuju
= K8sJujuConnector(
136 kubectl_command
=self
.vca_config
.get("kubectlpath"),
137 juju_command
=self
.vca_config
.get("jujupath"),
141 vca_config
=self
.vca_config
,
146 self
.k8scluster_map
= {
147 "helm-chart": self
.k8sclusterhelm2
,
148 "helm-chart-v3": self
.k8sclusterhelm3
,
149 "chart": self
.k8sclusterhelm3
,
150 "juju-bundle": self
.k8sclusterjuju
,
151 "juju": self
.k8sclusterjuju
,
155 "lxc_proxy_charm": self
.n2vc
,
156 "native_charm": self
.n2vc
,
157 "k8s_proxy_charm": self
.n2vc
,
158 "helm": self
.conn_helm_ee
,
159 "helm-v3": self
.conn_helm_ee
162 self
.prometheus
= prometheus
165 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Add an offset to the last field of an address given as text.

    An address containing a dot is treated as IPv4 (decimal last field); otherwise the text after the last
    colon is treated as the hexadecimal last field of an IPv6 or MAC address.

    :param ip_mac: address text. Any value that is not a string is returned untouched
    :param vm_index: amount added to the last field
    :return: the new address text, or None when the last field cannot be parsed or no separator is found
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4: the last field follows the last dot
        last_dot = ip_mac.rfind(".")
        if last_dot > 0:
            prefix, last_field = ip_mac[:last_dot + 1], ip_mac[last_dot + 1:]
            return "{}{}".format(prefix, int(last_field) + vm_index)
        # try with ipv6 or mac: the last field follows the last colon and is hexadecimal
        last_colon = ip_mac.rfind(":")
        if last_colon > 0:
            prefix, last_field = ip_mac[:last_colon + 1], ip_mac[last_colon + 1:]
            # keep the field width zero padded: it can be 2 for mac or 4 for ipv6
            return "{}{:0{}x}".format(prefix, int(last_field, 16) + vm_index, len(last_field))
    except Exception:
        pass
    return None
187 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
192 # TODO filter RO descriptor fields...
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict
['deploymentStatus'] = ro_descriptor
198 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
200 except Exception as e
:
201 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
203 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
205 # remove last dot from path (if exists)
206 if path
.endswith('.'):
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
214 nsr_id
= filter.get('_id')
216 # read ns record from database
217 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
218 current_ns_status
= nsr
.get('nsState')
220 # get vca status for NS
221 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
225 db_dict
['vcaStatus'] = status_dict
227 # update configurationStatus for this VCA
229 vca_index
= int(path
[path
.rfind(".")+1:])
231 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
232 vca_status
= vca_list
[vca_index
].get('status')
234 configuration_status_list
= nsr
.get('configurationStatus')
235 config_status
= configuration_status_list
[vca_index
].get('status')
237 if config_status
== 'BROKEN' and vca_status
!= 'failed':
238 db_dict
['configurationStatus'][vca_index
] = 'READY'
239 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
240 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
241 except Exception as e
:
242 # not update configurationStatus
243 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
248 if current_ns_status
in ('READY', 'DEGRADED'):
249 error_description
= ''
251 if status_dict
.get('machines'):
252 for machine_id
in status_dict
.get('machines'):
253 machine
= status_dict
.get('machines').get(machine_id
)
254 # check machine agent-status
255 if machine
.get('agent-status'):
256 s
= machine
.get('agent-status').get('status')
259 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
260 # check machine instance status
261 if machine
.get('instance-status'):
262 s
= machine
.get('instance-status').get('status')
265 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
267 if status_dict
.get('applications'):
268 for app_id
in status_dict
.get('applications'):
269 app
= status_dict
.get('applications').get(app_id
)
270 # check application status
271 if app
.get('status'):
272 s
= app
.get('status').get('status')
275 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
277 if error_description
:
278 db_dict
['errorDescription'] = error_description
279 if current_ns_status
== 'READY' and is_degraded
:
280 db_dict
['nsState'] = 'DEGRADED'
281 if current_ns_status
== 'DEGRADED' and not is_degraded
:
282 db_dict
['nsState'] = 'READY'
285 self
.update_db_2("nsrs", nsr_id
, db_dict
)
287 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
289 except Exception as e
:
290 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template.

    The environment uses StrictUndefined, so a variable missing from additional_params is an error.

    :param cloud_init_text: template content
    :param additional_params: dictionary with the variables for the template. None is treated as empty
    :param vnfd_id: vnfd id, only used to compose the error text
    :param vdu_id: vdu id, only used to compose the error text
    :return: the rendered text
    :raises LcmException: when a variable is not provided or the template cannot be parsed
    """
    try:
        jinja_env = Environment(undefined=StrictUndefined)
        variables = additional_params or {}
        return jinja_env.from_string(cloud_init_text).render(variables)
    except UndefinedError as e:
        # must be caught before TemplateError, which is handled with a more generic message
        missing_variable_text = (
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block"
        )
        raise LcmException(missing_variable_text.format(e, vnfd_id, vdu_id))
    except (TemplateError, TemplateNotFound) as e:
        parse_error_text = "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}"
        raise LcmException(parse_error_text.format(vnfd_id, vdu_id, e))
306 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
307 cloud_init_content
= cloud_init_file
= None
309 if vdu
.get("cloud-init-file"):
310 base_folder
= vnfd
["_admin"]["storage"]
311 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
312 vdu
["cloud-init-file"])
313 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
314 cloud_init_content
= ci_file
.read()
315 elif vdu
.get("cloud-init"):
316 cloud_init_content
= vdu
["cloud-init"]
318 return cloud_init_content
319 except FsException
as e
:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Get the 'additionalParams' of the first vdur of a vnfr that references the given vdu.

    :param db_vnfr: vnfr content; its 'vdur' list is searched
    :param vdu_id: value to look for at the 'vdu-id-ref' of each vdur
    :return: what parse_yaml_strings returns for the 'additionalParams' of that vdur
    :raises StopIteration: when no vdur references vdu_id
    """
    matching_vdurs = (item for item in db_vnfr.get("vdur") if vdu_id == item["vdu-id-ref"])
    vdur = next(matching_vdurs)
    return parse_yaml_strings(vdur.get("additionalParams"))
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided (not used by the visible code)
    :param nsrId: Id of the NSR (not used by the visible code)
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove what RO does not use: configuration, monitoring, scaling and internal keys
    for unused_key in ("_id", "_admin", "monitoring-param", "scaling-group-descriptor", "kdu", "k8s-cluster"):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are dropped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        for cloud_init_key in ("cloud-init-file", "cloud-init"):
            vdu.pop(cloud_init_key, None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """
    Translate an OSM ip-profile into the key names and values used by RO.

    'dns-server' becomes 'dns-address' (a list of entries is reduced to the list of their 'address'),
    'ip-version' ipv4/ipv6 becomes IPv4/IPv6 and 'dhcp-params' becomes 'dhcp'.

    :param ip_profile: OSM ip-profile dictionary. It is not modified
    :return: a new dictionary with the RO naming
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_servers = RO_ip_profile.pop("dns-server")
        if isinstance(dns_servers, list):
            RO_ip_profile["dns-address"] = [server['address'] for server in dns_servers]
        else:
            RO_ip_profile["dns-address"] = dns_servers
    for osm_version, ro_version in (("ipv4", "IPv4"), ("ipv6", "IPv6")):
        if RO_ip_profile.get("ip-version") == osm_version:
            RO_ip_profile["ip-version"] = ro_version
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
372 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
373 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
374 if db_vim
["_admin"]["operationalState"] != "ENABLED":
375 raise LcmException("VIM={} is not available. operationalState={}".format(
376 vim_account
, db_vim
["_admin"]["operationalState"]))
377 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Obtain the identifier that RO uses for a wim account.

    :param wim_account: _id of the record at table 'wim_accounts'. A value that is not a string
        is returned as is, without any database access
    :return: content of '_admin.deployed.RO-account' of that record, or wim_account itself if not a string
    :raises LcmException: when the '_admin.operationalState' of the record is not ENABLED
    """
    if not isinstance(wim_account, str):
        return wim_account
    wim_admin = self.db.get_one("wim_accounts", {"_id": wim_account})["_admin"]
    operational_state = wim_admin["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException("WIM={} is not available. operationalState={}".format(
            wim_account, operational_state))
    return wim_admin["deployed"]["RO-account"]
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """
    Add or remove vdur entries of a vnfr at the database, for scaling out or in.

    Scale out: for each vdu, the last existing vdur that references it is used as a template; every
    copy gets a new '_id', the following 'count-index' and status BUILD, and loses its addresses
    (fixed ip/mac addresses of the interfaces are incremented instead of removed).
    Scale in: the last vdur entries that reference the vdu are either marked as DELETING or pulled
    from the database one by one.

    :param db_vnfr: vnfr content. Its 'vdur' is replaced at the end by what the database contains
    :param vdu_create: dictionary {vdu-id: number of instances to add}
    :param vdu_delete: dictionary {vdu-id: number of instances to remove}
    :param mark_delete: if True the instances are only marked with status DELETING, not removed
    :return: None
    :raises LcmException: when scaling out a vdu that has no vdur to be used as template
    """
    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}

    for vdu_id, vdu_count in (vdu_create or {}).items():
        vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
        if not vdur:
            raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
                               format(vdu_id))

        for count in range(vdu_count):
            vdur_copy = deepcopy(vdur)
            vdur_copy["status"] = "BUILD"
            vdur_copy["status-detailed"] = None
            # it was the annotation 'vdur_copy["ip-address"]: None', that assigns nothing, so the
            # new instance inherited the address of the template
            vdur_copy["ip-address"] = None
            vdur_copy["_id"] = str(uuid4())
            vdur_copy["count-index"] += count + 1
            vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
            vdur_copy.pop("vim_info", None)
            for iface in vdur_copy["interfaces"]:
                if iface.get("fixed-ip"):
                    iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count + 1)
                else:
                    iface.pop("ip-address", None)
                if iface.get("fixed-mac"):
                    iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count + 1)
                else:
                    iface.pop("mac-address", None)
                iface.pop("mgmt_vnf", None)  # only first vdu can be management of vnf
            db_vdu_push_list.append(vdur_copy)

    for vdu_id, vdu_count in (vdu_delete or {}).items():
        if mark_delete:
            indexes_to_delete = [index for index, vdur in enumerate(db_vnfr["vdur"])
                                 if vdur["vdu-id-ref"] == vdu_id]
            db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
        else:
            # it must be deleted one by one because common.db does not allow otherwise
            vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
            for vdu in vdus_to_delete[:vdu_count]:
                self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})

    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info, using keys 'vld.<index>'
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its 'nets' are matched by 'ns_net_osm_id'
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        ro_nets = get_iterable(nsr_desc_RO, "nets")
        net_RO = next((net for net in ro_nets if vld["id"] == net.get("ns_net_osm_id")), None)
        if net_RO is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
        vld.update({
            "vim-id": net_RO.get("vim_net_id"),
            "name": net_RO.get("vim_name"),
            "status": net_RO.get("status"),
            "status-detailed": net_RO.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(vld_index)] = vld
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Mark every vnfr as ERROR, at the database and at the passed content.

    A vdur that already has a 'status' is left untouched; the others get status ERROR and, when
    error_text is provided, that text as 'status-detailed'.
    Database errors are logged, not propagated.

    :param db_vnfrs: dictionary whose values are the vnfr contents. This is also modified
    :param error_text: description of the error; may be empty
    :return: None
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" in vdur:
                    continue
                vdur["status"] = "ERROR"
                vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                if error_text:
                    # the database must receive the same text as the in-memory record; it was
                    # getting the literal "ERROR" instead of the error description
                    status_detailed = str(error_text)
                    vdur["status-detailed"] = status_detailed
                    vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its 'vnfs' and 'nets' are used
    :return: Nothing, LcmException is raised on errors
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        vnf_RO = next((item for item in nsr_desc_RO["vnfs"] if item["member_vnf_index"] == vnf_index), None)
        if vnf_RO is None:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))

        vnfr_update = {}
        if vnf_RO.get("ip_address"):
            # RO may report several addresses separated by ';'. The first one is taken
            db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
        elif not db_vnfr.get("ip-address") and db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
            raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

        for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
            if vdur.get("pdu-type"):
                continue
            # RO reports one vm per instance; the n-th vm with the same vdu_osm_id is the count-index n
            same_vdu_seen = 0
            for vdur_RO in get_iterable(vnf_RO, "vms"):
                if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                    continue
                if vdur["count-index"] != same_vdu_seen:
                    same_vdu_seen += 1
                    continue
                vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0] if vdur_RO.get("ip_address") else None
                vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                vdur["name"] = vdur_RO.get("vim_name")
                vdur["status"] = vdur_RO.get("status")
                vdur["status-detailed"] = vdur_RO.get("error_msg")
                for ifacer in get_iterable(vdur, "interfaces"):
                    interface_RO = next((item for item in get_iterable(vdur_RO, "interfaces")
                                         if ifacer["name"] == item.get("internal_name")), None)
                    if interface_RO is None:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                           "from VIM info"
                                           .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    ifacer["ip-address"] = interface_RO.get("ip_address")
                    ifacer["mac-address"] = interface_RO.get("mac_address")
                vnfr_update["vdur.{}".format(vdu_index)] = vdur
                break
            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                   "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

        for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
            net_RO = next((item for item in get_iterable(nsr_desc_RO, "nets")
                           if vld["id"] == item.get("vnf_net_osm_id")), None)
            if net_RO is None:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                    vnf_index, vld["id"]))
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            vnfr_update["vld.{}".format(vld_index)] = vld

        self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
549 def _get_ns_config_info(self
, nsr_id
):
551 Generates a mapping between vnf,vdu elements and the N2VC id
552 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
553 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
554 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
555 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
557 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
558 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
560 ns_config_info
= {"osm-config-mapping": mapping
}
561 for vca
in vca_deployed_list
:
562 if not vca
["member-vnf-index"]:
564 if not vca
["vdu_id"]:
565 mapping
[vca
["member-vnf-index"]] = vca
["application"]
567 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
569 return ns_config_info
571 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
572 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
576 def get_vim_account(vim_account_id
):
578 if vim_account_id
in db_vims
:
579 return db_vims
[vim_account_id
]
580 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
581 db_vims
[vim_account_id
] = db_vim
584 # modify target_vld info with instantiation parameters
def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
    """
    Modify target_vld info with the instantiation parameters of a vld.

    :param target_vim: key of target_vld["vim_info"] for the main vim; that entry must already exist
    :param target_vld: vld content to be filled. Its "vim_info" dictionary is modified
    :param vld_params: parameters for this vld. The keys read are 'ip-profile', 'provider-network',
        'wimAccountId', 'vim-network-name', 'vim-network-id' and 'common_id'
    :param target_sdn: key of target_vld["vim_info"] for the sdn, or None. It gets the 'sdn-ports'
    :return: None
    """
    main_vim_info = target_vld["vim_info"][target_vim]
    if vld_params.get("ip-profile"):
        main_vim_info["ip_profile"] = vld_params["ip-profile"]
    if vld_params.get("provider-network"):
        main_vim_info["provider_network"] = vld_params["provider-network"]
        if "sdn-ports" in vld_params["provider-network"] and target_sdn:
            target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
    if vld_params.get("wimAccountId"):
        target_wim = "wim:{}".format(vld_params["wimAccountId"])
        target_vld["vim_info"][target_wim] = {}
    for param in ("vim-network-name", "vim-network-id"):
        if not vld_params.get(param):
            continue
        vim_info_key = param.replace("-", "_")
        if isinstance(vld_params[param], dict):
            # a dictionary contains {vim: network}. It must be iterated with items(); iterating the
            # dictionary itself yields only the keys, which cannot be unpacked into (vim, vim_net)
            for vim, vim_net in vld_params[param].items():
                other_target_vim = "vim:" + vim
                populate_dict(target_vld["vim_info"], (other_target_vim, vim_info_key), vim_net)
        else:  # isinstance str
            main_vim_info[vim_info_key] = vld_params[param]
    if vld_params.get("common_id"):
        target_vld["common_id"] = vld_params.get("common_id")
606 nslcmop_id
= db_nslcmop
["_id"]
608 "name": db_nsr
["name"],
611 "image": deepcopy(db_nsr
["image"]),
612 "flavor": deepcopy(db_nsr
["flavor"]),
613 "action_id": nslcmop_id
,
614 "cloud_init_content": {},
616 for image
in target
["image"]:
617 image
["vim_info"] = {}
618 for flavor
in target
["flavor"]:
619 flavor
["vim_info"] = {}
621 if db_nslcmop
.get("lcmOperationType") != "instantiate":
622 # get parameters of instantiation:
623 db_nslcmop_instantiate
= self
.db
.get_list("nslcmops", {"nsInstanceId": db_nslcmop
["nsInstanceId"],
624 "lcmOperationType": "instantiate"})[-1]
625 ns_params
= db_nslcmop_instantiate
.get("operationParams")
627 ns_params
= db_nslcmop
.get("operationParams")
628 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
629 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
632 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
633 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
637 "mgmt-network": vld
.get("mgmt-network", False),
638 "type": vld
.get("type"),
641 "vim_network_name": vld
.get("vim-network-name"),
642 "vim_account_id": ns_params
["vimAccountId"]
646 # check if this network needs SDN assist
647 if vld
.get("pci-interfaces"):
648 db_vim
= VimAccountDB
.get_vim_account_with_id(target_vld
["vim_info"][0]["vim_account_id"])
649 sdnc_id
= db_vim
["config"].get("sdn-controller")
651 target_vld
["vim_info"].append({"sdnc_id": sdnc_id
})
653 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
654 for nsd_vnf_profile
in nsd_vnf_profiles
:
655 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
656 if cp
["virtual-link-profile-id"] == vld
["id"]:
657 cp2target
["member_vnf:{}.{}".format(
658 cp
["constituent-cpd-id"][0]["constituent-base-element-id"],
659 cp
["constituent-cpd-id"][0]["constituent-cpd-id"]
660 )] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
662 # check at nsd descriptor, if there is an ip-profile
664 virtual_link_profiles
= get_virtual_link_profiles(nsd
)
666 for vlp
in virtual_link_profiles
:
667 ip_profile
= find_in_list(nsd
["ip-profiles"],
668 lambda profile
: profile
["name"] == vlp
["ip-profile-ref"])
669 vld_params
["ip-profile"] = ip_profile
["ip-profile-params"]
670 # update vld_params with instantiation params
671 vld_instantiation_params
= find_in_list(get_iterable(ns_params
, "vld"),
672 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]))
673 if vld_instantiation_params
:
674 vld_params
.update(vld_instantiation_params
)
675 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
676 target
["ns"]["vld"].append(target_vld
)
678 for vnfr
in db_vnfrs
.values():
679 vnfd
= find_in_list(db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"])
680 vnf_params
= find_in_list(get_iterable(ns_params
, "vnf"),
681 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"])
682 target_vnf
= deepcopy(vnfr
)
683 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
684 for vld
in target_vnf
.get("vld", ()):
685 # check if connected to a ns.vld, to fill target'
686 vnf_cp
= find_in_list(vnfd
.get("int-virtual-link-desc", ()),
687 lambda cpd
: cpd
.get("id") == vld
["id"])
689 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
690 if cp2target
.get(ns_cp
):
691 vld
["target"] = cp2target
[ns_cp
]
693 vld
["vim_info"] = {target_vim
: {"vim_network_name": vld
.get("vim-network-name")}}
694 # check if this network needs SDN assist
696 if vld
.get("pci-interfaces"):
697 db_vim
= get_vim_account(vnfr
["vim-account-id"])
698 sdnc_id
= db_vim
["config"].get("sdn-controller")
700 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
701 target_sdn
= "sdn:{}".format(sdnc_id
)
702 vld
["vim_info"][target_sdn
] = {
703 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
705 # check at vnfd descriptor, if there is an ip-profile
707 vnfd_vlp
= find_in_list(
708 get_virtual_link_profiles(vnfd
),
709 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"]
711 if vnfd_vlp
and vnfd_vlp
.get("virtual-link-protocol-data") and \
712 vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
713 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
714 ip_profile_dest_data
= {}
715 if "ip-version" in ip_profile_source_data
:
716 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
717 if "cidr" in ip_profile_source_data
:
718 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
719 if "gateway-ip" in ip_profile_source_data
:
720 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
721 if "dhcp-enabled" in ip_profile_source_data
:
722 ip_profile_dest_data
["dhcp-params"] = {
723 "enabled": ip_profile_source_data
["dhcp-enabled"]
726 vld_params
["ip-profile"] = ip_profile_dest_data
727 # update vld_params with instantiation params
729 vld_instantiation_params
= find_in_list(get_iterable(vnf_params
, "internal-vld"),
730 lambda i_vld
: i_vld
["name"] == vld
["id"])
731 if vld_instantiation_params
:
732 vld_params
.update(vld_instantiation_params
)
733 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
736 for vdur
in target_vnf
.get("vdur", ()):
737 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
738 continue # This vdu must not be created
739 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
741 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
744 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
745 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
746 if vdu_configuration
and vdu_configuration
.get("config-access") and \
747 vdu_configuration
.get("config-access").get("ssh-access"):
748 vdur
["ssh-keys"] = ssh_keys_all
749 vdur
["ssh-access-required"] = vdu_configuration
["config-access"]["ssh-access"]["required"]
750 elif vnf_configuration
and vnf_configuration
.get("config-access") and \
751 vnf_configuration
.get("config-access").get("ssh-access") and \
752 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
753 vdur
["ssh-keys"] = ssh_keys_all
754 vdur
["ssh-access-required"] = vnf_configuration
["config-access"]["ssh-access"]["required"]
755 elif ssh_keys_instantiation
and \
756 find_in_list(vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")):
757 vdur
["ssh-keys"] = ssh_keys_instantiation
759 self
.logger
.debug("NS > vdur > {}".format(vdur
))
761 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
763 if vdud
.get("cloud-init-file"):
764 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
765 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
766 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
767 base_folder
= vnfd
["_admin"]["storage"]
768 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
769 vdud
.get("cloud-init-file"))
770 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
771 target
["cloud_init_content"][vdur
["cloud-init"]] = ci_file
.read()
772 elif vdud
.get("cloud-init"):
773 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"]))
774 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
775 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
["cloud-init"]
776 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
777 deploy_params_vdu
= self
._format
_additional
_params
(vdur
.get("additionalParams") or {})
778 deploy_params_vdu
["OSM"] = get_osm_params(vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"])
779 vdur
["additionalParams"] = deploy_params_vdu
782 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
783 if target_vim
not in ns_flavor
["vim_info"]:
784 ns_flavor
["vim_info"][target_vim
] = {}
787 # in case alternative images are provided we must check if they should be applied
788 # for the vim_type, modify the vim_type taking into account
789 ns_image_id
= int(vdur
["ns-image-id"])
790 if vdur
.get("alt-image-ids"):
791 db_vim
= get_vim_account(vnfr
["vim-account-id"])
792 vim_type
= db_vim
["vim_type"]
793 for alt_image_id
in vdur
.get("alt-image-ids"):
794 ns_alt_image
= target
["image"][int(alt_image_id
)]
795 if vim_type
== ns_alt_image
.get("vim-type"):
796 # must use alternative image
797 self
.logger
.debug("use alternative image id: {}".format(alt_image_id
))
798 ns_image_id
= alt_image_id
799 vdur
["ns-image-id"] = ns_image_id
801 ns_image
= target
["image"][int(ns_image_id
)]
802 if target_vim
not in ns_image
["vim_info"]:
803 ns_image
["vim_info"][target_vim
] = {}
805 vdur
["vim_info"] = {target_vim
: {}}
806 # instantiation parameters
808 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
810 vdur_list
.append(vdur
)
811 target_vnf
["vdur"] = vdur_list
812 target
["vnf"].append(target_vnf
)
814 desc
= await self
.RO
.deploy(nsr_id
, target
)
815 self
.logger
.debug("RO return > {}".format(desc
))
816 action_id
= desc
["action_id"]
817 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
821 "_admin.deployed.RO.operational-status": "running",
822 "detailed-status": " ".join(stage
)
824 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
825 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
826 self
._write
_op
_status
(nslcmop_id
, stage
)
827 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
    """
    Poll NG-RO for the status of an action until it is done, it fails or the timeout expires.

    :param nsr_id: ns record identity, passed to self.RO.status
    :param action_id: RO action identity to wait for
    :param nslcmop_id: operation identity. Progress is written at database only if this and stage are provided
    :param start_time: epoch seconds taken as the origin of the timeout. Current time if not provided
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. If provided, vim_specific is overwritten
    :return: None, once RO reports status "DONE"
    :raises NgRoException: if RO reports status "FAILED" or the timeout expires
    :raises AssertionError: if RO reports an unknown status
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        status = desc_status["status"]
        if status == "FAILED":
            raise NgRoException(desc_status["details"])
        elif status == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif status == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # raised explicitly: an "assert False" statement is removed when python runs with -O
            raise AssertionError("ROclient.check_ns_status returns unknown {}".format(status))
        # write progress at database only when the vim_specific text has changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # no "loop" argument: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
857 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
861 start_deploy
= time()
868 "action_id": nslcmop_id
870 desc
= await self
.RO
.deploy(nsr_id
, target
)
871 action_id
= desc
["action_id"]
872 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
873 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
874 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
877 delete_timeout
= 20 * 60 # 20 minutes
878 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
880 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
881 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
883 await self
.RO
.delete(nsr_id
)
884 except Exception as e
:
885 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
886 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
887 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
888 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
889 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
890 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
891 failed_detail
.append("delete conflict: {}".format(e
))
892 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
894 failed_detail
.append("delete error: {}".format(e
))
895 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
898 stage
[2] = "Error deleting from VIM"
900 stage
[2] = "Deleted from VIM"
901 db_nsr_update
["detailed-status"] = " ".join(stage
)
902 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
903 self
._write
_op
_status
(nslcmop_id
, stage
)
906 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
                         n2vc_key_list, stage):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        # the timeout provided at the operation takes precedence over the configured one
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            vnfr = None
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # no vnfr uses the ns vim account: take the one of the last vnfr.
                # This must be an assignment; a "==" comparison here has no effect
                if vnfr is not None:
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
                                             db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is logged only for exceptions that are not of the known types
        self.logger.error("Error deploying at VIM {}".format(e),
                          exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
                                                      NgRoException)))
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity (not used by this method)
    :param vnfr_id: _id of the vnfr that is read from database at every try
    :param kdu_name: "kdu-name" of the kdur entry to look for inside the vnfr
    :return: content of "ip-address" of the kdur once its status is "READY" or "ENABLED"
    :raises LcmException: if the kdur is not found, if it has any other status, or after 360 tries without status
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException("target KDU={} is in error state".format(kdu_name))

        # no status yet: try again later.
        # No "loop" argument: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
980 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
982 Wait for ip addres at RO, and optionally, insert public key in virtual machine
983 :param logging_text: prefix use for logging
988 :param pub_key: public ssh key to inject, None to skip
989 :param user: user to apply the public ssh key
993 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1003 if ro_retries
>= 360: # 1 hour
1004 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1006 await asyncio
.sleep(10, loop
=self
.loop
)
1009 if not target_vdu_id
:
1010 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1012 if not vdu_id
: # for the VNF case
1013 if db_vnfr
.get("status") == "ERROR":
1014 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1015 ip_address
= db_vnfr
.get("ip-address")
1018 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1020 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1021 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1023 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1024 vdur
= db_vnfr
["vdur"][0]
1026 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1028 # New generation RO stores information at "vim_info"
1031 if vdur
.get("vim_info"):
1032 target_vim
= next(t
for t
in vdur
["vim_info"]) # there should be only one key
1033 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1034 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE" or ng_ro_status
== "ACTIVE":
1035 ip_address
= vdur
.get("ip-address")
1038 target_vdu_id
= vdur
["vdu-id-ref"]
1039 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1040 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1042 if not target_vdu_id
:
1045 # inject public key into machine
1046 if pub_key
and user
:
1047 self
.logger
.debug(logging_text
+ "Inserting RO key")
1048 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1049 if vdur
.get("pdu-type"):
1050 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1053 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1055 target
= {"action": {"action": "inject_ssh_key", "key": pub_key
, "user": user
},
1056 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1058 desc
= await self
.RO
.deploy(nsr_id
, target
)
1059 action_id
= desc
["action_id"]
1060 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1063 # wait until NS is deployed at RO
1065 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1066 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1069 result_dict
= await self
.RO
.create_action(
1071 item_id_name
=ro_nsr_id
,
1072 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1074 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1075 if not result_dict
or not isinstance(result_dict
, dict):
1076 raise LcmException("Unknown response from RO when injecting key")
1077 for result
in result_dict
.values():
1078 if result
.get("vim_result") == 200:
1081 raise ROclient
.ROClientException("error injecting key: {}".format(
1082 result
.get("description")))
1084 except NgRoException
as e
:
1085 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1086 except ROclient
.ROClientException
as e
:
1088 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1092 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1098 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1100 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1102 my_vca
= vca_deployed_list
[vca_index
]
1103 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1104 # vdu or kdu: no dependencies
1108 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1109 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1110 configuration_status_list
= db_nsr
["configurationStatus"]
1111 for index
, vca_deployed
in enumerate(configuration_status_list
):
1112 if index
== vca_index
:
1115 if not my_vca
.get("member-vnf-index") or \
1116 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1117 internal_status
= configuration_status_list
[index
].get("status")
1118 if internal_status
== 'READY':
1120 elif internal_status
== 'BROKEN':
1121 raise LcmException("Configuration aborted because dependent charm/s has failed")
1125 # no dependencies, return
1127 await asyncio
.sleep(10)
1130 raise LcmException("Configuration aborted because dependent charm/s timeout")
1132 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1133 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1134 ee_config_descriptor
):
1135 nsr_id
= db_nsr
["_id"]
1136 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1137 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1138 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1139 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1141 'collection': 'nsrs',
1142 'filter': {'_id': nsr_id
},
1143 'path': db_update_entry
1149 element_under_configuration
= nsr_id
1153 vnfr_id
= db_vnfr
["_id"]
1154 osm_config
["osm"]["vnf_id"] = vnfr_id
1156 namespace
= "{nsi}.{ns}".format(
1157 nsi
=nsi_id
if nsi_id
else "",
1161 element_type
= 'VNF'
1162 element_under_configuration
= vnfr_id
1163 namespace
+= ".{}".format(vnfr_id
)
1165 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1166 element_type
= 'VDU'
1167 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1168 osm_config
["osm"]["vdu_id"] = vdu_id
1170 namespace
+= ".{}".format(kdu_name
)
1171 element_type
= 'KDU'
1172 element_under_configuration
= kdu_name
1173 osm_config
["osm"]["kdu_name"] = kdu_name
1176 artifact_path
= "{}/{}/{}/{}".format(
1177 base_folder
["folder"],
1178 base_folder
["pkg-dir"],
1179 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1183 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1185 # get initial_config_primitive_list that applies to this element
1186 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1188 self
.logger
.debug("Initial config primitive list > {}".format(initial_config_primitive_list
))
1190 # add config if not present for NS charm
1191 ee_descriptor_id
= ee_config_descriptor
.get("id")
1192 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1193 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list
,
1194 vca_deployed
, ee_descriptor_id
)
1196 self
.logger
.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list
))
1197 # n2vc_redesign STEP 3.1
1198 # find old ee_id if exists
1199 ee_id
= vca_deployed
.get("ee_id")
1202 deep_get(db_vnfr
, ("vim-account-id",)) or
1203 deep_get(deploy_params
, ("OSM", "vim_account_id"))
1205 vca_cloud
, vca_cloud_credential
= self
.get_vca_cloud_and_credentials(vim_account_id
)
1206 vca_k8s_cloud
, vca_k8s_cloud_credential
= self
.get_vca_k8s_cloud_and_credentials(vim_account_id
)
1207 # create or register execution environment in VCA
1208 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1210 self
._write
_configuration
_status
(
1212 vca_index
=vca_index
,
1214 element_under_configuration
=element_under_configuration
,
1215 element_type
=element_type
1218 step
= "create execution environment"
1219 self
.logger
.debug(logging_text
+ step
)
1223 if vca_type
== "k8s_proxy_charm":
1224 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1225 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1226 namespace
=namespace
,
1227 artifact_path
=artifact_path
,
1229 cloud_name
=vca_k8s_cloud
,
1230 credential_name
=vca_k8s_cloud_credential
,
1232 elif vca_type
== "helm" or vca_type
== "helm-v3":
1233 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1234 namespace
=namespace
,
1238 artifact_path
=artifact_path
,
1242 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1243 namespace
=namespace
,
1246 cloud_name
=vca_cloud
,
1247 credential_name
=vca_cloud_credential
,
1250 elif vca_type
== "native_charm":
1251 step
= "Waiting to VM being up and getting IP address"
1252 self
.logger
.debug(logging_text
+ step
)
1253 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1254 user
=None, pub_key
=None)
1255 credentials
= {"hostname": rw_mgmt_ip
}
1257 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1258 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1259 # merged. Meanwhile let's get username from initial-config-primitive
1260 if not username
and initial_config_primitive_list
:
1261 for config_primitive
in initial_config_primitive_list
:
1262 for param
in config_primitive
.get("parameter", ()):
1263 if param
["name"] == "ssh-username":
1264 username
= param
["value"]
1267 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1268 "'config-access.ssh-access.default-user'")
1269 credentials
["username"] = username
1270 # n2vc_redesign STEP 3.2
1272 self
._write
_configuration
_status
(
1274 vca_index
=vca_index
,
1275 status
='REGISTERING',
1276 element_under_configuration
=element_under_configuration
,
1277 element_type
=element_type
1280 step
= "register execution environment {}".format(credentials
)
1281 self
.logger
.debug(logging_text
+ step
)
1282 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1283 credentials
=credentials
,
1284 namespace
=namespace
,
1286 cloud_name
=vca_cloud
,
1287 credential_name
=vca_cloud_credential
,
1290 # for compatibility with MON/POL modules, the need model and application name at database
1291 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1292 ee_id_parts
= ee_id
.split('.')
1293 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1294 if len(ee_id_parts
) >= 2:
1295 model_name
= ee_id_parts
[0]
1296 application_name
= ee_id_parts
[1]
1297 db_nsr_update
[db_update_entry
+ "model"] = model_name
1298 db_nsr_update
[db_update_entry
+ "application"] = application_name
1300 # n2vc_redesign STEP 3.3
1301 step
= "Install configuration Software"
1303 self
._write
_configuration
_status
(
1305 vca_index
=vca_index
,
1306 status
='INSTALLING SW',
1307 element_under_configuration
=element_under_configuration
,
1308 element_type
=element_type
,
1309 other_update
=db_nsr_update
1312 # TODO check if already done
1313 self
.logger
.debug(logging_text
+ step
)
1315 if vca_type
== "native_charm":
1316 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1317 if config_primitive
:
1318 config
= self
._map
_primitive
_params
(
1324 if vca_type
== "lxc_proxy_charm":
1325 if element_type
== "NS":
1326 num_units
= db_nsr
.get("config-units") or 1
1327 elif element_type
== "VNF":
1328 num_units
= db_vnfr
.get("config-units") or 1
1329 elif element_type
== "VDU":
1330 for v
in db_vnfr
["vdur"]:
1331 if vdu_id
== v
["vdu-id-ref"]:
1332 num_units
= v
.get("config-units") or 1
1334 if vca_type
!= "k8s_proxy_charm":
1335 await self
.vca_map
[vca_type
].install_configuration_sw(
1337 artifact_path
=artifact_path
,
1340 num_units
=num_units
,
1343 # write in db flag of configuration_sw already installed
1344 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1346 # add relations for this VCA (wait for other peers related with this VCA)
1347 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1348 vca_index
=vca_index
, vca_type
=vca_type
)
1350 # if SSH access is required, then get execution environment SSH public
1351 # if native charm we have waited already to VM be UP
1352 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1355 # self.logger.debug("get ssh key block")
1356 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1357 # self.logger.debug("ssh key needed")
1358 # Needed to inject a ssh key
1359 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1360 step
= "Install configuration Software, getting public ssh key"
1361 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1363 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1365 # self.logger.debug("no need to get ssh key")
1366 step
= "Waiting to VM being up and getting IP address"
1367 self
.logger
.debug(logging_text
+ step
)
1369 # n2vc_redesign STEP 5.1
1370 # wait for RO (ip-address) Insert pub_key into VM
1373 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1375 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1376 vdu_index
, user
=user
, pub_key
=pub_key
)
1378 rw_mgmt_ip
= None # This is for a NS configuration
1380 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1382 # store rw_mgmt_ip in deploy params for later replacement
1383 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1385 # n2vc_redesign STEP 6 Execute initial config primitive
1386 step
= 'execute initial config primitive'
1388 # wait for dependent primitives execution (NS -> VNF -> VDU)
1389 if initial_config_primitive_list
:
1390 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1392 # stage, in function of element type: vdu, kdu, vnf or ns
1393 my_vca
= vca_deployed_list
[vca_index
]
1394 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1396 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1397 elif my_vca
.get("member-vnf-index"):
1399 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1402 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1404 self
._write
_configuration
_status
(
1406 vca_index
=vca_index
,
1407 status
='EXECUTING PRIMITIVE'
1410 self
._write
_op
_status
(
1415 check_if_terminated_needed
= True
1416 for initial_config_primitive
in initial_config_primitive_list
:
1417 # adding information on the vca_deployed if it is a NS execution environment
1418 if not vca_deployed
["member-vnf-index"]:
1419 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1420 # TODO check if already done
1421 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1423 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1424 self
.logger
.debug(logging_text
+ step
)
1425 await self
.vca_map
[vca_type
].exec_primitive(
1427 primitive_name
=initial_config_primitive
["name"],
1428 params_dict
=primitive_params_
,
1431 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1432 if check_if_terminated_needed
:
1433 if config_descriptor
.get('terminate-config-primitive'):
1434 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1435 check_if_terminated_needed
= False
1437 # TODO register in database that primitive is done
1439 # STEP 7 Configure metrics
1440 if vca_type
== "helm" or vca_type
== "helm-v3":
1441 prometheus_jobs
= await self
.add_prometheus_metrics(
1443 artifact_path
=artifact_path
,
1444 ee_config_descriptor
=ee_config_descriptor
,
1447 target_ip
=rw_mgmt_ip
,
1450 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1452 step
= "instantiated at VCA"
1453 self
.logger
.debug(logging_text
+ step
)
1455 self
._write
_configuration
_status
(
1457 vca_index
=vca_index
,
1461 except Exception as e
: # TODO not use Exception but N2VC exception
1462 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1463 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1464 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1465 self
._write
_configuration
_status
(
1467 vca_index
=vca_index
,
1470 raise LcmException("{} {}".format(step
, e
)) from e
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Update db_nsr fields.
    :param nsr_id: _id of the "nsrs" record to update
    :param ns_state: value for "nsState". This field is not written if empty or None
    :param current_operation: value for "currentOperation". "IDLE" sets "_admin.operation-type" to None
    :param current_operation_id: value for "currentOperationID" and "_admin.current-operation"
    :param error_description: value for "errorDescription"
    :param error_detail: value for "errorDetail"
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        # the same dictionary provided by the caller is completed and sent to database
        db_dict = other_update or {}
        db_dict["_admin.nslcmop"] = current_operation_id   # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Update the status fields of an operation at the "nslcmops" collection.
    :param op_id: _id of the "nslcmops" record to update
    :param stage: if a list, item 0 is written at "stage" and all items joined by spaces at "detailed-status".
        Any other value that is not None is written at "stage" converted to string
    :param error_message: value for "errorMessage". Not written if None
    :param queuePosition: value for "queuePosition"
    :param operation_state: value for "operationState". Not written if None. When written, "statusEnteredTime"
        is set to the current time
    :param other_update: other changes to write at database. This same dictionary is completed and sent
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Write the same status at every non empty entry of "configurationStatus" of a ns record.
    :param db_nsr: database content of ns record. Its "_id" and "configurationStatus" are read
    :param status: value to write at "configurationStatus.<index>.status"
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # empty entries (None, {}) are skipped, but they still count for the index
            db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
                             enumerate(config_status) if v}
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None,
                                other_update: dict = None):
    """
    Update one entry of "configurationStatus" of a ns record.
    :param nsr_id: _id of the "nsrs" record to update
    :param vca_index: index of the entry inside "configurationStatus"
    :param status: value for "status" of the entry. Not written if empty or None
    :param element_under_configuration: value for "elementUnderConfiguration". Not written if empty or None
    :param element_type: value for "elementType". Not written if empty or None
    :param other_update: other changes to write at database. This same dictionary is completed and sent
    :return: None. A DbException is logged as a warning and not propagated
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + 'status'] = status
        if element_under_configuration:
            db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
        if element_type:
            db_dict[db_path + 'elementType'] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Compute the placement (vim account where to deploy) when the operation asks for the "PLA" engine.
    The request is sent via kafka and the answer is polled at database (nslcmops _admin.pla), because it can be
    written by a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'
    :raises LcmException: if no placement result is found at database after the polling period
    """
    nslcmop_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # placement not requested: nothing is changed
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    # poll the operation record until the placement result appears
    poll_interval = 5
    remaining = poll_interval * 10
    placement = None
    while not placement and remaining >= 0:
        await asyncio.sleep(poll_interval)
        remaining -= poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for placed_vnf in placement['vnf']:
        vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        if placed_vnf.get('vimAccountId') and vnfr:
            changed = True
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": placed_vnf['vimAccountId']})
            # keep the in-memory copy aligned with database
            vnfr["vim-account-id"] = placed_vnf['vimAccountId']
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store the received placement result at "_admin.pla" of the nslcmops record it refers to.
    :param params: received content. 'placement' is stored, and its 'nslcmopId' identifies the record
    :return: None. Any exception is logged as a warning and not propagated
    """
    # defined before the "try" so that the exception handler can always reference it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1602 async def instantiate(self
, nsr_id
, nslcmop_id
):
1605 :param nsr_id: ns instance to deploy
1606 :param nslcmop_id: operation to run
1610 # Try to lock HA task here
1611 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1612 if not task_is_locked_by_me
:
1613 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1616 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1617 self
.logger
.debug(logging_text
+ "Enter")
1619 # get all needed from database
1621 # database nsrs record
1624 # database nslcmops record
1627 # update operation on nsrs
1629 # update operation on nslcmops
1630 db_nslcmop_update
= {}
1632 nslcmop_operation_state
= None
1633 db_vnfrs
= {} # vnf's info indexed by member-index
1635 tasks_dict_info
= {} # from task to info text
1638 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1639 # ^ stage, step, VIM progress
1641 # wait for any previous tasks in process
1642 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1644 stage
[1] = "Sync filesystem from database."
1645 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1647 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1648 stage
[1] = "Reading from database."
1649 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1650 db_nsr_update
["detailed-status"] = "creating"
1651 db_nsr_update
["operational-status"] = "init"
1652 self
._write
_ns
_status
(
1654 ns_state
="BUILDING",
1655 current_operation
="INSTANTIATING",
1656 current_operation_id
=nslcmop_id
,
1657 other_update
=db_nsr_update
1659 self
._write
_op
_status
(
1665 # read from db: operation
1666 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1667 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1668 ns_params
= db_nslcmop
.get("operationParams")
1669 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1670 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1672 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1675 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1676 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1677 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1678 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1680 # nsr_name = db_nsr["name"] # TODO short-name??
1682 # read from db: vnf's of this ns
1683 stage
[1] = "Getting vnfrs from db."
1684 self
.logger
.debug(logging_text
+ stage
[1])
1685 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1687 # read from db: vnfd's for every vnf
1688 db_vnfds
= [] # every vnfd data
1690 # for each vnf in ns, read vnfd
1691 for vnfr
in db_vnfrs_list
:
1692 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
1693 vnfd_id
= vnfr
["vnfd-id"]
1694 vnfd_ref
= vnfr
["vnfd-ref"]
1696 # if we haven't this vnfd, read it from db
1697 if vnfd_id
not in db_vnfds
:
1699 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
1700 self
.logger
.debug(logging_text
+ stage
[1])
1701 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1704 db_vnfds
.append(vnfd
)
1706 # Get or generates the _admin.deployed.VCA list
1707 vca_deployed_list
= None
1708 if db_nsr
["_admin"].get("deployed"):
1709 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1710 if vca_deployed_list
is None:
1711 vca_deployed_list
= []
1712 configuration_status_list
= []
1713 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1714 db_nsr_update
["configurationStatus"] = configuration_status_list
1715 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1716 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1717 elif isinstance(vca_deployed_list
, dict):
1718 # maintain backward compatibility. Change a dict to list at database
1719 vca_deployed_list
= list(vca_deployed_list
.values())
1720 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1721 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1723 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1724 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1725 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1727 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1728 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1729 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1730 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
1732 # n2vc_redesign STEP 2 Deploy Network Scenario
1733 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1734 self
._write
_op
_status
(
1739 stage
[1] = "Deploying KDUs."
1740 # self.logger.debug(logging_text + "Before deploy_kdus")
1741 # Call to deploy_kdus in case exists the "vdu:kdu" param
1742 await self
.deploy_kdus(
1743 logging_text
=logging_text
,
1745 nslcmop_id
=nslcmop_id
,
1748 task_instantiation_info
=tasks_dict_info
,
1751 stage
[1] = "Getting VCA public key."
1752 # n2vc_redesign STEP 1 Get VCA public ssh-key
1753 # feature 1429. Add n2vc public key to needed VMs
1754 n2vc_key
= self
.n2vc
.get_public_key()
1755 n2vc_key_list
= [n2vc_key
]
1756 if self
.vca_config
.get("public_key"):
1757 n2vc_key_list
.append(self
.vca_config
["public_key"])
1759 stage
[1] = "Deploying NS at VIM."
1760 task_ro
= asyncio
.ensure_future(
1761 self
.instantiate_RO(
1762 logging_text
=logging_text
,
1766 db_nslcmop
=db_nslcmop
,
1769 n2vc_key_list
=n2vc_key_list
,
1773 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1774 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1776 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1777 stage
[1] = "Deploying Execution Environments."
1778 self
.logger
.debug(logging_text
+ stage
[1])
1780 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1781 for vnf_profile
in get_vnf_profiles(nsd
):
1782 vnfd_id
= vnf_profile
["vnfd-id"]
1783 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
1784 member_vnf_index
= str(vnf_profile
["id"])
1785 db_vnfr
= db_vnfrs
[member_vnf_index
]
1786 base_folder
= vnfd
["_admin"]["storage"]
1792 # Get additional parameters
1793 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1794 if db_vnfr
.get("additionalParamsForVnf"):
1795 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
1797 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
1798 if descriptor_config
:
1800 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1803 nslcmop_id
=nslcmop_id
,
1809 member_vnf_index
=member_vnf_index
,
1810 vdu_index
=vdu_index
,
1812 deploy_params
=deploy_params
,
1813 descriptor_config
=descriptor_config
,
1814 base_folder
=base_folder
,
1815 task_instantiation_info
=tasks_dict_info
,
1819 # Deploy charms for each VDU that supports one.
1820 for vdud
in get_vdu_list(vnfd
):
1822 descriptor_config
= get_configuration(vnfd
, vdu_id
)
1823 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
1825 if vdur
.get("additionalParams"):
1826 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
1828 deploy_params_vdu
= deploy_params
1829 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=0)
1830 vdud_count
= get_vdu_profile(vnfd
, vdu_id
).get("max-number-of-instances", 1)
1832 self
.logger
.debug("VDUD > {}".format(vdud
))
1833 self
.logger
.debug("Descriptor config > {}".format(descriptor_config
))
1834 if descriptor_config
:
1837 for vdu_index
in range(vdud_count
):
1838 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1840 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1841 member_vnf_index
, vdu_id
, vdu_index
),
1844 nslcmop_id
=nslcmop_id
,
1850 member_vnf_index
=member_vnf_index
,
1851 vdu_index
=vdu_index
,
1853 deploy_params
=deploy_params_vdu
,
1854 descriptor_config
=descriptor_config
,
1855 base_folder
=base_folder
,
1856 task_instantiation_info
=tasks_dict_info
,
1859 for kdud
in get_kdu_list(vnfd
):
1860 kdu_name
= kdud
["name"]
1861 descriptor_config
= get_configuration(vnfd
, kdu_name
)
1862 if descriptor_config
:
1866 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
1867 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
1868 if kdur
.get("additionalParams"):
1869 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
1872 logging_text
=logging_text
,
1875 nslcmop_id
=nslcmop_id
,
1881 member_vnf_index
=member_vnf_index
,
1882 vdu_index
=vdu_index
,
1884 deploy_params
=deploy_params_kdu
,
1885 descriptor_config
=descriptor_config
,
1886 base_folder
=base_folder
,
1887 task_instantiation_info
=tasks_dict_info
,
1891 # Check if this NS has a charm configuration
1892 descriptor_config
= nsd
.get("ns-configuration")
1893 if descriptor_config
and descriptor_config
.get("juju"):
1896 member_vnf_index
= None
1902 # Get additional parameters
1903 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
1904 if db_nsr
.get("additionalParamsForNs"):
1905 deploy_params
.update(parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy()))
1906 base_folder
= nsd
["_admin"]["storage"]
1908 logging_text
=logging_text
,
1911 nslcmop_id
=nslcmop_id
,
1917 member_vnf_index
=member_vnf_index
,
1918 vdu_index
=vdu_index
,
1920 deploy_params
=deploy_params
,
1921 descriptor_config
=descriptor_config
,
1922 base_folder
=base_folder
,
1923 task_instantiation_info
=tasks_dict_info
,
1927 # rest of staff will be done at finally
1929 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
1930 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
1932 except asyncio
.CancelledError
:
1933 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
1934 exc
= "Operation was cancelled"
1935 except Exception as e
:
1936 exc
= traceback
.format_exc()
1937 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
1940 error_list
.append(str(exc
))
1942 # wait for pending tasks
1944 stage
[1] = "Waiting for instantiate pending tasks."
1945 self
.logger
.debug(logging_text
+ stage
[1])
1946 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
1947 stage
, nslcmop_id
, nsr_id
=nsr_id
)
1948 stage
[1] = stage
[2] = ""
1949 except asyncio
.CancelledError
:
1950 error_list
.append("Cancelled")
1951 # TODO cancel all tasks
1952 except Exception as exc
:
1953 error_list
.append(str(exc
))
1955 # update operation-status
1956 db_nsr_update
["operational-status"] = "running"
1957 # let's begin with VCA 'configured' status (later we can change it)
1958 db_nsr_update
["config-status"] = "configured"
1959 for task
, task_name
in tasks_dict_info
.items():
1960 if not task
.done() or task
.cancelled() or task
.exception():
1961 if task_name
.startswith(self
.task_name_deploy_vca
):
1962 # A N2VC task is pending
1963 db_nsr_update
["config-status"] = "failed"
1965 # RO or KDU task is pending
1966 db_nsr_update
["operational-status"] = "failed"
1968 # update status at database
1970 error_detail
= ". ".join(error_list
)
1971 self
.logger
.error(logging_text
+ error_detail
)
1972 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
1973 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
1975 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
1976 db_nslcmop_update
["detailed-status"] = error_detail
1977 nslcmop_operation_state
= "FAILED"
1981 error_description_nsr
= error_description_nslcmop
= None
1983 db_nsr_update
["detailed-status"] = "Done"
1984 db_nslcmop_update
["detailed-status"] = "Done"
1985 nslcmop_operation_state
= "COMPLETED"
1988 self
._write
_ns
_status
(
1991 current_operation
="IDLE",
1992 current_operation_id
=None,
1993 error_description
=error_description_nsr
,
1994 error_detail
=error_detail
,
1995 other_update
=db_nsr_update
1997 self
._write
_op
_status
(
2000 error_message
=error_description_nslcmop
,
2001 operation_state
=nslcmop_operation_state
,
2002 other_update
=db_nslcmop_update
,
2005 if nslcmop_operation_state
:
2007 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2008 "operationState": nslcmop_operation_state
},
2010 except Exception as e
:
2011 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2013 self
.logger
.debug(logging_text
+ "Exit")
2014 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
                             timeout: int = 3600, vca_type: str = None) -> bool:
    """
    Add the relations in which one deployed VCA takes part.

    Relations are read from the 'ns-configuration' of the NS descriptor and from the
    configuration of every VNFD listed in the NS record. The NS record is re-read on every
    iteration: a relation is added once both peers report 'config_sw_installed', and it is
    given up when one of its peers has 'BROKEN' configuration status.

    :param logging_text: prefix for every log message
    :param nsr_id: _id of the NS record
    :param vca_index: position of this VCA at nsr._admin.deployed.VCA
    :param timeout: maximum number of seconds to keep waiting for the peers
    :param vca_type: key of self.vca_map used to add the relation; "lxc_proxy_charm" if not provided
    :return: True if no relation is left pending; False on timeout or on any exception
    """
    # local import: a monotonic clock is not affected by wall-clock adjustments
    from time import monotonic

    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations

    def peer_ids(relation):
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    def ns_peer_key(vca):
        return vca.get('member-vnf-index')

    def vnf_peer_key(vca):
        # fall back to vnfd_id when the VCA has no vdu_id
        if vca.get('vdu_id') is None:
            return vca.get('vnfd_id')
        return vca.get('vdu_id')

    async def relate_ready_peers(pending, vca_list, vca_status_list, peer_key):
        # adds every pending relation whose two peers are installed; drops those with a broken peer
        for relation in pending.copy():
            entities = relation.get('entities')
            from_id, to_id = peer_ids(relation)
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                if not vca or not vca.get('config_sw_installed'):
                    continue
                if peer_key(vca) == from_id:
                    from_vca_ee_id = vca.get('ee_id')
                if peer_key(vca) == to_id:
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.vca_map[vca_type].add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=entities[0].get('endpoint'),
                    endpoint_2=entities[1].get('endpoint'))
                # remove entry from relations list
                pending.remove(relation)
                continue

            # check failed peers. The relation is removed only once, even if both peers are broken
            peer_broken = any(
                isinstance(vca, dict) and isinstance(vca_status, dict)
                and vca_status.get('status') == 'BROKEN'
                and peer_key(vca) in (from_id, to_id)
                for vca, vca_status in zip(vca_list, vca_status_list))
            if peer_broken:
                pending.remove(relation)

    try:
        vca_type = vca_type or "lxc_proxy_charm"

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations where this VCA is one of the peers
        db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation')) or []
        ns_relations = [r for r in db_ns_relations if my_vca.get('member-vnf-index') in peer_ids(r)]

        # read all vnf-configuration relations where this VCA is one of the peers
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or []:
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            # a vnfd may have no configuration at all
            vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"]) or {}
            for r in vnf_configuration.get("relation") or []:
                if my_vca.get('vdu_id') in peer_ids(r):
                    vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # add all relations
        start = monotonic()
        while True:
            # check timeout
            if monotonic() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need to update record: _admin.deloyed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA')) or []
            vca_status_list = db_nsr.get('configurationStatus') or []

            # for each defined NS relation, find the VCA's related
            await relate_ready_peers(ns_relations, vca_list, vca_status_list, ns_peer_key)
            # for each defined VNF relation, find the VCA's related
            await relate_ready_peers(vnf_relations, vca_list, vca_status_list, vnf_peer_key)

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                return True

            # wait for next try
            await asyncio.sleep(5.0)

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                       vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
    """
    Install one KDU in its k8s cluster and record the outcome at database.

    After the installation, the services of the KDU are stored at the vnfr; for every service
    declared as 'mgmt-service' at the descriptor its ip is stored as the kdur ip-address (and as
    the vnf ip-address when its external connection point is the vnfd mgmt-interface cp).
    Finally the initial-config-primitives are executed ordered by 'seq', unless the
    kdu-configuration contains 'juju'.

    :param nsr_id: _id of the NS record
    :param nsr_db_path: path inside the nsr where the information of this KDU is stored
    :param vnfr_data: vnf record that contains the kdur
    :param kdu_index: position of the kdur inside vnfr_data
    :param kdud: KDU descriptor
    :param vnfd: VNF descriptor
    :param k8s_instance_info: dictionary with "k8scluster-type", "k8scluster-uuid", "kdu-model",
        "kdu-name" and "namespace"
    :param k8params: parameters passed to the installation
    :param timeout: seconds allowed for the installation and for each initial primitive
    :return: name of the KDU instance
    :raises: the original exception, after writing the error at nsr and vnfr
    """
    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        k8s_connector = self.k8scluster_map[k8sclustertype]
        # Instantiate kdu
        db_dict_install = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": nsr_db_path}

        kdu_instance = k8s_connector.generate_kdu_instance_name(
            db_dict=db_dict_install,
            kdu_model=k8s_instance_info["kdu-model"],
        )
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
        await k8s_connector.install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
        )
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

        # Obtain services to obtain management service ip
        services = await k8s_connector.get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"])

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get("external-connection-point-ref")
                        if service_external_cp and deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                            vnfr_update_dict["ip-address"] = ip

                        break
                else:
                    self.logger.warning("Mgmt service name: {} not found".format(mgmt_service["name"]))

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = kdud.get("kdu-configuration")
        if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
            # sorted() works on a copy, so the descriptor received from the caller is not reordered
            initial_config_primitive_list = sorted(kdu_config.get("initial-config-primitive"),
                                                   key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                await asyncio.wait_for(
                    k8s_connector.exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_, db_dict={}),
                    timeout=timeout)

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
            self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch, as asyncio tasks, the installation of every KDU present at the vnf records.

    For each kdur: the model (helm-chart or juju-bundle) is resolved, the helm repos of the
    target cluster are synchronized, the entry nsr._admin.deployed.K8s.<index> is written and
    a task running _install_kdu is registered at self.lcm_tasks and at task_instantiation_info.

    :param logging_text: prefix for every log message
    :param nsr_id: _id of the NS record
    :param nslcmop_id: _id of the operation, used to register the tasks
    :param db_vnfrs: dictionary whose values are the vnf records of this NS
    :param db_vnfds: list of vnf descriptors
    :param task_instantiation_info: dictionary task -> description, filled with the launched tasks
    :raises LcmException: on any error other than a cancellation
    """
    # Launch kdus if present in the descriptor

    # cluster_type -> {k8scluster _id -> id of the cluster for that type}
    k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
    # database path prefix where the repos of each helm type are recorded at k8sclusters
    repo_db_prefix = {"helm-chart": '_admin.helm_charts_added.',
                      "helm-chart-v3": '_admin.helm_charts_v3_added.'}
    # clusters already synchronized during this call, per helm type
    synchronized_clusters = {"helm-chart": set(), "helm-chart-v3": set()}

    async def _get_cluster_id(cluster_id, cluster_type):
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            if cluster_type != "helm-chart-v3":
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
                                   format(cluster_id, cluster_type))
            try:
                # backward compatibility for existing clusters that have not been initialized for helm v3
                k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
                k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
                                                                           reuse_cluster_uuid=cluster_id)
                db_k8scluster_update = {
                    "_admin.helm-chart-v3.error_msg": None,
                    "_admin.helm-chart-v3.id": k8s_id,
                    "_admin.helm-chart-v3.created": uninstall_sw,
                    "_admin.helm-chart-v3.operationalState": "ENABLED",
                }
                self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
            except Exception as e:
                self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(
                    cluster_id, cluster_type)) from e
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    # bound before the try so that the 'finally' clause can always read it
    db_nsr_update = {"_admin.deployed.K8s": []}
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
                if not vnfd_with_id:
                    raise LcmException("vnfd '{}' of vnf '{}' not found".format(
                        vnfd_id, vnfr_data["member-vnf-index-ref"]))
                # an explicit default avoids a StopIteration escaping from the coroutine
                kdud = next((kdud for kdud in vnfd_with_id.get("kdu") or ()
                             if kdud["name"] == kdur["kdu-name"]), None)
                if kdud is None:
                    raise LcmException("kdu '{}' not found at vnfd '{}'".format(kdur["kdu-name"], vnfd_id))
                namespace = kdur.get("k8s-namespace")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    # Default version: helm3, if helm-version is v2 assign v2
                    self.logger.debug("kdur: {}".format(kdur))
                    k8sclustertype = "helm-chart" if kdur.get("helm-version") == "v2" else "helm-chart-v3"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos, once per cluster and helm type
                if k8sclustertype in synchronized_clusters \
                        and cluster_uuid not in synchronized_clusters[k8sclustertype]:
                    del_repo_list, added_repo_dict = await self.k8scluster_map[k8sclustertype].synchronize_repos(
                        cluster_uuid=cluster_uuid)
                    synchronized_clusters[k8sclustertype].add(cluster_uuid)
                    if del_repo_list or added_repo_dict:
                        prefix = repo_db_prefix[k8sclustertype]
                        unset = {prefix + item: None for item in del_repo_list}
                        updated = {prefix + item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster "
                                                         "'{}' to_delete: {}, to_add: {}".
                                          format(k8s_cluster_id, del_repo_list, added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel,
                                     "namespace": namespace}
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                task = asyncio.ensure_future(
                    self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
                                      k8s_instance_info, k8params=desc_params, timeout=600))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg) from e
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2425 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2426 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2427 base_folder
, task_instantiation_info
, stage
):
2428 # launch instantiate_N2VC in a asyncio task and register task object
2429 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2430 # if not found, create one entry and update database
2431 # fill db_nsr._admin.deployed.VCA.<index>
2433 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2434 if descriptor_config
.get("juju"): # There is one execution envioronment of type juju
2435 ee_list
= [descriptor_config
]
2436 elif descriptor_config
.get("execution-environment-list"):
2437 ee_list
= descriptor_config
.get("execution-environment-list")
2438 else: # other types as script are not supported
2441 for ee_item
in ee_list
:
2442 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2443 ee_item
.get("helm-chart")))
2444 ee_descriptor_id
= ee_item
.get("id")
2445 if ee_item
.get("juju"):
2446 vca_name
= ee_item
['juju'].get('charm')
2447 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2448 if ee_item
['juju'].get('cloud') == "k8s":
2449 vca_type
= "k8s_proxy_charm"
2450 elif ee_item
['juju'].get('proxy') is False:
2451 vca_type
= "native_charm"
2452 elif ee_item
.get("helm-chart"):
2453 vca_name
= ee_item
['helm-chart']
2454 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
2457 vca_type
= "helm-v3"
2459 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2463 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2464 if not vca_deployed
:
2466 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2467 vca_deployed
.get("vdu_id") == vdu_id
and \
2468 vca_deployed
.get("kdu_name") == kdu_name
and \
2469 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2470 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2473 # not found, create one.
2474 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2476 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2478 target
+= "/kdu/{}".format(kdu_name
)
2480 "target_element": target
,
2481 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2482 "member-vnf-index": member_vnf_index
,
2484 "kdu_name": kdu_name
,
2485 "vdu_count_index": vdu_index
,
2486 "operational-status": "init", # TODO revise
2487 "detailed-status": "", # TODO revise
2488 "step": "initial-deploy", # TODO revise
2490 "vdu_name": vdu_name
,
2492 "ee_descriptor_id": ee_descriptor_id
2496 # create VCA and configurationStatus in db
2498 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2499 "configurationStatus.{}".format(vca_index
): dict()
2501 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2503 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2505 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
2506 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
2507 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
2510 task_n2vc
= asyncio
.ensure_future(
2511 self
.instantiate_N2VC(
2512 logging_text
=logging_text
,
2513 vca_index
=vca_index
,
2519 vdu_index
=vdu_index
,
2520 deploy_params
=deploy_params
,
2521 config_descriptor
=descriptor_config
,
2522 base_folder
=base_folder
,
2523 nslcmop_id
=nslcmop_id
,
2527 ee_config_descriptor
=ee_item
2530 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2531 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2532 member_vnf_index
or "", vdu_id
or "")
2535 def _create_nslcmop(nsr_id
, operation
, params
):
2537 Creates a ns-lcm-opp content to be stored at database.
2538 :param nsr_id: internal id of the instance
2539 :param operation: instantiate, terminate, scale, action, ...
2540 :param params: user parameters for the operation
2541 :return: dictionary following SOL005 format
2543 # Raise exception if invalid arguments
2544 if not (nsr_id
and operation
and params
):
2546 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2552 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2553 "operationState": "PROCESSING",
2554 "statusEnteredTime": now
,
2555 "nsInstanceId": nsr_id
,
2556 "lcmOperationType": operation
,
2558 "isAutomaticInvocation": False,
2559 "operationParams": params
,
2560 "isCancelPending": False,
2562 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2563 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2568 def _format_additional_params(self
, params
):
2569 params
= params
or {}
2570 for key
, value
in params
.items():
2571 if str(value
).startswith("!!yaml "):
2572 params
[key
] = yaml
.safe_load(value
[7:])
2575 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2576 primitive
= seq
.get('name')
2577 primitive_params
= {}
2579 "member_vnf_index": vnf_index
,
2580 "primitive": primitive
,
2581 "primitive_params": primitive_params
,
2584 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or executed again.
    :param db_nslcmop: operation record containing the '_admin.operations' list
    :param op_index: index of the sub-operation inside that list
    :return: self.SUBOPERATION_STATUS_SKIP if the sub-operation is 'COMPLETED'; otherwise
        op_index, after setting the sub-operation as 'PROCESSING' at database
    """
    op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
    if op.get('operationState') == 'COMPLETED':
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
2607 # Find a sub-operation where all keys in a matching dictionary must match
2608 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2609 def _find_suboperation(self
, db_nslcmop
, match
):
2610 if db_nslcmop
and match
:
2611 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2612 for i
, op
in enumerate(op_list
):
2613 if all(op
.get(k
) == match
[k
] for k
in match
):
2615 return self
.SUBOPERATION_STATUS_NOT_FOUND
2617 # Update status for a sub-operation given its index
2618 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2619 # Update DB for HA tasks
2620 q_filter
= {'_id': db_nslcmop
['_id']}
2621 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2622 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2623 self
.db
.set_one("nslcmops",
2625 update_dict
=update_dict
,
2626 fail_on_empty
=False)
2628 # Add sub-operation, return the index of the added sub-operation
2629 # Optionally, set operationState, detailed-status, and operationType
2630 # Status and type are currently set for 'scale' sub-operations:
2631 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2632 # 'detailed-status' : status message
2633 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2634 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2635 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2636 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2637 RO_nsr_id
=None, RO_scaling_info
=None):
2639 return self
.SUBOPERATION_STATUS_NOT_FOUND
2640 # Get the "_admin.operations" list, if it exists
2641 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2642 op_list
= db_nslcmop_admin
.get('operations')
2643 # Create or append to the "_admin.operations" list
2644 new_op
= {'member_vnf_index': vnf_index
,
2646 'vdu_count_index': vdu_count_index
,
2647 'primitive': primitive
,
2648 'primitive_params': mapped_primitive_params
}
2650 new_op
['operationState'] = operationState
2652 new_op
['detailed-status'] = detailed_status
2654 new_op
['lcmOperationType'] = operationType
2656 new_op
['RO_nsr_id'] = RO_nsr_id
2658 new_op
['RO_scaling_info'] = RO_scaling_info
2660 # No existing operations, create key 'operations' with current operation as first list element
2661 db_nslcmop_admin
.update({'operations': [new_op
]})
2662 op_list
= db_nslcmop_admin
.get('operations')
2664 # Existing operations, append operation to list
2665 op_list
.append(new_op
)
2667 db_nslcmop_update
= {'_admin.operations': op_list
}
2668 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2669 op_index
= len(op_list
) - 1
2672 # Helper methods for scale() sub-operations
2674 # pre-scale/post-scale:
2675 # Check for 3 different cases:
2676 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2677 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2678 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2679 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2680 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2681 # Find this sub-operation
2682 if RO_nsr_id
and RO_scaling_info
:
2683 operationType
= 'SCALE-RO'
2685 'member_vnf_index': vnf_index
,
2686 'RO_nsr_id': RO_nsr_id
,
2687 'RO_scaling_info': RO_scaling_info
,
2691 'member_vnf_index': vnf_index
,
2692 'primitive': vnf_config_primitive
,
2693 'primitive_params': primitive_params
,
2694 'lcmOperationType': operationType
2696 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2697 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2698 # a. New sub-operation
2699 # The sub-operation does not exist, add it.
2700 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2701 # The following parameters are set to None for all kind of scaling:
2703 vdu_count_index
= None
2705 if RO_nsr_id
and RO_scaling_info
:
2706 vnf_config_primitive
= None
2707 primitive_params
= None
2710 RO_scaling_info
= None
2711 # Initial status for sub-operation
2712 operationState
= 'PROCESSING'
2713 detailed_status
= 'In progress'
2714 # Add sub-operation for pre/post-scaling (zero or more operations)
2715 self
._add
_suboperation
(db_nslcmop
,
2720 vnf_config_primitive
,
2727 return self
.SUBOPERATION_STATUS_NEW
2729 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2730 # or op_index (operationState != 'COMPLETED')
2731 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2733 # Function to return execution_environment id
2735 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2736 # TODO vdu_index_count
2737 for vca
in vca_deployed_list
:
2738 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop record; a sub-operation is added to it for every executed primitive
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
        not executed properly
    :return: None or exception
    """
    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name"))
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop,
                                       vnf_index,
                                       vca_deployed.get("vdu_id"),
                                       vca_deployed.get("vdu_count_index"),
                                       vca_deployed.get("vdu_name"),
                                       primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"], primitive, mapped_primitive_params, vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                if result not in ('COMPLETED', 'PARTIALLY_COMPLETED'):
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # set that this VCA do not need terminated
            self.update_db_2(
                "nsrs",
                db_nslcmop["nsInstanceId"],
                {"_admin.deployed.VCA.{}.needed_terminate".format(vca_index): False},
            )

    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
2812 async def _delete_all_N2VC(self
, db_nsr
: dict):
2813 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
2814 namespace
= "." + db_nsr
["_id"]
2816 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
2817 except N2VCNotFound
: # already deleted. Skip
2819 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr record updated at the "nsrs" collection
    :param nslcmop_id: id of the operation whose status is written
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException, with all the failures joined by "; ", if any deletion fails
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                               ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                # write at database only when the status text changes
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10 and the
                # running loop is used by default
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds. Skipped when a previous deletion has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service. It runs in three stages: preparation, execution of terminate primitives, and
    deletion of execution environments, KDUs and the deployment at VIM. The final status is always written at
    database and a "terminated" message is sent at the end
    :param nsr_id: id of the nsr record to terminate
    :param nslcmop_id: id of the nslcmop record of this terminate operation
    :return: None. Errors are not raised; they are annotated at the nsr and nslcmop records
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to terminate. The 'finally' clause still writes the final status
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            config_descriptor = None
            if not vca or not vca.get("ee_id"):
                continue
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                         vca.get("needed_terminate"))
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
                                  destroy_ee, exec_terminate_primitives))
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
            error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                    min(self.timeout_charm_delete, timeout_ns_terminate),
                                                    stage, nslcmop_id)
            tasks_dict_info.clear()
            if error_list:
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                    timeout=self.timeout_charm_delete))
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of stuff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as exc:
            error_list.append(str(exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
            except DbException as e:
                # Logger.warn is a deprecated alias of Logger.warning
                self.logger.warning(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
                                    format(nsr_id, e))
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3201 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3203 error_detail_list
= []
3205 pending_tasks
= list(created_tasks_info
.keys())
3206 num_tasks
= len(pending_tasks
)
3208 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3209 self
._write
_op
_status
(nslcmop_id
, stage
)
3210 while pending_tasks
:
3212 _timeout
= timeout
+ time_start
- time()
3213 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3214 return_when
=asyncio
.FIRST_COMPLETED
)
3215 num_done
+= len(done
)
3216 if not done
: # Timeout
3217 for task
in pending_tasks
:
3218 new_error
= created_tasks_info
[task
] + ": Timeout"
3219 error_detail_list
.append(new_error
)
3220 error_list
.append(new_error
)
3223 if task
.cancelled():
3226 exc
= task
.exception()
3228 if isinstance(exc
, asyncio
.TimeoutError
):
3230 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3231 error_list
.append(created_tasks_info
[task
])
3232 error_detail_list
.append(new_error
)
3233 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3234 K8sException
, NgRoException
)):
3235 self
.logger
.error(logging_text
+ new_error
)
3237 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3238 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + " " + exc_traceback
)
3240 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3241 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3243 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3244 if nsr_id
: # update also nsr
3245 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3246 "errorDetail": ". ".join(error_detail_list
)})
3247 self
._write
_op
_status
(nslcmop_id
, stage
)
3248 return error_detail_list
3251 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3253 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3254 The default-value is used. If it is between < > it look for a value at instantiation_params
3255 :param primitive_desc: portion of VNFD/NSD that describes primitive
3256 :param params: Params provided by user
3257 :param instantiation_params: Instantiation params provided by user
3258 :return: a dictionary with the calculated params
3260 calculated_params
= {}
3261 for parameter
in primitive_desc
.get("parameter", ()):
3262 param_name
= parameter
["name"]
3263 if param_name
in params
:
3264 calculated_params
[param_name
] = params
[param_name
]
3265 elif "default-value" in parameter
or "value" in parameter
:
3266 if "value" in parameter
:
3267 calculated_params
[param_name
] = parameter
["value"]
3269 calculated_params
[param_name
] = parameter
["default-value"]
3270 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3271 and calculated_params
[param_name
].endswith(">"):
3272 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3273 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3275 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3276 format(calculated_params
[param_name
], primitive_desc
["name"]))
3278 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3279 format(param_name
, primitive_desc
["name"]))
3281 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3282 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
],
3283 default_flow_style
=True, width
=256)
3284 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3285 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3286 if parameter
.get("data-type") == "INTEGER":
3288 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3289 except ValueError: # error converting string to int
3291 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3292 elif parameter
.get("data-type") == "BOOLEAN":
3293 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3295 # add always ns_config_info if primitive name is config
3296 if primitive_desc
["name"] == "config":
3297 if "ns_config_info" in instantiation_params
:
3298 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3299 return calculated_params
3301 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3302 ee_descriptor_id
=None):
3303 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3304 for vca
in deployed_vca
:
3307 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3309 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3311 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3313 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3317 # vca_deployed not found
3318 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3319 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3322 ee_id
= vca
.get("ee_id")
3323 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3325 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3326 "execution environment"
3327 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3328 return ee_id
, vca_type
3330 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0, retries_interval
=30,
3331 timeout
=None, vca_type
=None, db_dict
=None) -> (str, str):
3333 if primitive
== "config":
3334 primitive_params
= {"params": primitive_params
}
3336 vca_type
= vca_type
or "lxc_proxy_charm"
3340 output
= await asyncio
.wait_for(
3341 self
.vca_map
[vca_type
].exec_primitive(
3343 primitive_name
=primitive
,
3344 params_dict
=primitive_params
,
3345 progress_timeout
=self
.timeout_progress_primitive
,
3346 total_timeout
=self
.timeout_primitive
,
3348 timeout
=timeout
or self
.timeout_primitive
)
3351 except asyncio
.CancelledError
:
3353 except Exception as e
: # asyncio.TimeoutError
3354 if isinstance(e
, asyncio
.TimeoutError
):
3358 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3360 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3362 return 'FAILED', str(e
)
3364 return 'COMPLETED', output
3366 except (LcmException
, asyncio
.CancelledError
):
3368 except Exception as e
:
3369 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an "action" operation over a NS, VNF, VDU or KDU

    The operation parameters are read from the "nslcmops" record. KDU primitives ("upgrade",
    "rollback", "status" or any primitive declared in the KDU configuration) are sent to the
    connector registered in self.k8scluster_map; any other primitive is executed through
    self._ns_execute_primitive over the deployed VCA.
    Exceptions are not propagated: they are converted into a "FAILED" operation state and the
    ns/operation status is always written back in the finally clause.

    :param: nsr_id: _id of the "nsrs" record
    :param: nslcmop_id: _id of the "nslcmops" record that describes the action

    :return: None if the HA lock is not obtained; otherwise (nslcmop_operation_state, detailed_status)
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    # bound up-front because the finally clause returns it whatever happens inside the try
    detailed_status = ""
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get("config-primitive"):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only the built-in KDU operations may be requested without a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                   format(primitive))
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
            ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    # report a readable error instead of failing on None.get()
                    raise LcmException("vdu '{}' not found at vnfr of vnf '{}'".format(vdu_id, vnf_index))
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                if kdur is None:
                    raise LcmException("kdu '{}' not found at vnfr of vnf '{}'".format(kdu_name, vnf_index))
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        # True when the requested primitive is declared in the KDU configuration
        kdu_action = False
        if kdu_name:
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            if kdu_configuration:
                actions = set()
                # dedicated loop variable: "primitive" keeps holding the requested primitive name
                for kdu_primitive in kdu_configuration.get("initial-config-primitive", []):
                    actions.add(kdu_primitive["name"])
                for kdu_primitive in kdu_configuration.get("config-primitive", []):
                    actions.add(kdu_primitive["name"])
                kdu_action = primitive_name in actions

        # TODO check if ns is in a proper status
        if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                raise LcmException(msg)

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    # the model supplied as parameter is not forwarded to the connector as a param
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance")),
                    timeout=timeout_ns_action)
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action)

            # an empty answer from the connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id, vdu_count_index=vdu_count_index,
                                                          ee_descriptor_id=ee_descriptor_id)
            db_nslcmop_notif = {"collection": "nslcmops",
                                "filter": {"_id": nslcmop_id},
                                "path": "admin.VCA"}
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_nslcmop_notif)

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                               detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
                              operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return (inside finally) is the value callers receive, also after the bare return above
        return nslcmop_operation_state, detailed_status
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute a scale (SCALE_OUT / SCALE_IN) operation over one scaling-group-descriptor of a VNF

    Steps, all driven by the "nslcmops" record:
      - compute the VDUs to create/delete from the aspect-delta-details of the scaling descriptor
      - run the pre-scale scaling-config-action primitives
      - ask RO to apply the change (only when self.ro_config has "ng" set)
      - run the post-scale scaling-config-action primitives
    Exceptions are not propagated: they are converted into a "FAILED" operation state and the
    ns/operation status is always written back in the finally clause.

    :param: nsr_id: _id of the "nsrs" record
    :param: nslcmop_id: _id of the "nslcmops" record that describes the scaling

    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    stage = ['', '', '']
    # ^ stage, step, VIM progress
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
        self._write_ns_status(nsr_id=nsr_id, ns_state=None,
                              current_operation="SCALING", current_operation_id=nslcmop_id)

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})

        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})

        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        scaling_descriptor = find_in_list(
            get_scaling_aspect(
                db_vnfd
            ),
            lambda scale_desc: scale_desc["name"] == scaling_group
        )
        if not scaling_descriptor:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        step = "Sending scale order to VIM"
        # TODO check if ns is in a proper status
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            if "aspect-delta-details" not in scaling_descriptor:
                raise LcmException(
                    "Aspect delta details not fount in scaling descriptor {}".format(
                        scaling_descriptor["name"]
                    )
                )
            # count if max-instance-count is reached
            deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]

            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for delta in deltas:
                for vdu_delta in delta["vdu-delta"]:
                    vdud = get_vdu(db_vnfd, vdu_delta["id"])
                    vdu_index = get_vdu_index(db_vnfr, vdu_delta["id"])
                    cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
                    if cloud_init_text:
                        additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
                    cloud_init_list = []

                    vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                    max_instance_count = 10
                    if vdu_profile and "max-number-of-instances" in vdu_profile:
                        max_instance_count = vdu_profile.get("max-number-of-instances", 10)

                    deafult_instance_num = get_number_of_instances(db_vnfd, vdud["id"])

                    nb_scale_op += vdu_delta.get("number-of-instances", 1)

                    if nb_scale_op + deafult_instance_num > max_instance_count:
                        raise LcmException(
                            "reached the limit of {} (max-instance-count) "
                            "scaling-out operations for the "
                            "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
                        )
                    for x in range(vdu_delta.get("number-of-instances", 1)):
                        if cloud_init_text:
                            # TODO Information of its own ip is not available because db_vnfr is not updated.
                            additional_params["OSM"] = get_osm_params(
                                db_vnfr,
                                vdu_delta["id"],
                                vdu_index + x
                            )
                            cloud_init_list.append(
                                self._parse_cloud_init(
                                    cloud_init_text,
                                    additional_params,
                                    db_vnfd["id"],
                                    vdud["id"]
                                )
                            )
                    RO_scaling_info.append(
                        {
                            "osm_vdu_id": vdu_delta["id"],
                            "member-vnf-index": vnf_index,
                            "type": "create",
                            "count": vdu_delta.get("number-of-instances", 1)
                        }
                    )
                    if cloud_init_list:
                        RO_scaling_info[-1]["cloud_init"] = cloud_init_list
                    vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)

        elif scaling_type == "SCALE_IN":
            # NOTE: this descriptor-level value is replaced per VDU inside the loop below
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])

            # same validation than SCALE_OUT, instead of failing when subscripting None
            if "aspect-delta-details" not in scaling_descriptor:
                raise LcmException(
                    "Aspect delta details not fount in scaling descriptor {}".format(
                        scaling_descriptor["name"]
                    )
                )

            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
            for delta in deltas:
                for vdu_delta in delta["vdu-delta"]:
                    min_instance_count = 0
                    vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                    if vdu_profile and "min-number-of-instances" in vdu_profile:
                        min_instance_count = vdu_profile["min-number-of-instances"]

                    deafult_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])

                    nb_scale_op -= vdu_delta.get("number-of-instances", 1)
                    if nb_scale_op + deafult_instance_num < min_instance_count:
                        raise LcmException(
                            "reached the limit of {} (min-instance-count) scaling-in operations for the "
                            "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
                        )
                    RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
                                            "type": "delete", "count": vdu_delta.get("number-of-instances", 1)})
                    vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        # work over a copy: the counters are decremented while "vdu-delete" must stay intact for RO
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur.get("name") or vdur.get("vdu-name"),
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in (get_configuration(
                        db_vnfd, db_vnfd["id"]
                    ) or {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale retry check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        ee_descriptor_id = config_primitive.get("execution-environment-ref")
                        primitive_name = config_primitive.get("execution-environment-primitive",
                                                              vnf_config_primitive)
                        ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                                      member_vnf_index=vnf_index,
                                                                      vdu_id=None,
                                                                      vdu_count_index=None,
                                                                      ee_descriptor_id=ee_descriptor_id)
                        # vca_type by keyword so that it cannot land on another positional parameter
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id, primitive_name, primitive_params, vca_type=vca_type)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                    # Update operationState = COMPLETED | FAILED
                    self._update_suboperation_status(
                        db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
        db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()

        # SCALE RO - BEGIN
        if RO_scaling_info:
            scale_process = "RO"
            if self.ro_config.get("ng"):
                await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
        # these entries are removed before the info is handed to the post-scale primitives
        vdu_scaling_info.pop("vdu-create", None)
        vdu_scaling_info.pop("vdu-delete", None)

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    # same lookup than pre-scale; the "vnf-configuration" key is kept only as fallback
                    vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"]) or \
                        db_vnfd.get("vnf-configuration") or {}
                    for config_primitive in vnf_configuration.get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
                            "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
                            "config-primitive".format(scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale retry check: Check if this sub-operation has been executed before
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        ee_descriptor_id = config_primitive.get("execution-environment-ref")
                        primitive_name = config_primitive.get("execution-environment-primitive",
                                                              vnf_config_primitive)
                        ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                                      member_vnf_index=vnf_index,
                                                                      vdu_id=None,
                                                                      vdu_count_index=None,
                                                                      ee_descriptor_id=ee_descriptor_id)
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id, primitive_name, primitive_params, vca_type=vca_type)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                    # Update operationState = COMPLETED | FAILED
                    self._update_suboperation_status(
                        db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
        if exc:
            db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # scale_process tells which phase was running when the error happened
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
                              operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
        if db_nsr:
            self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
                                  current_operation_id=None, other_update=db_nsr_update)

        if nslcmop_operation_state:
            try:
                msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
                await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
    """
    Apply a scaling over NG-RO: update the vnfr with the VDUs to create/delete and re-deploy the NS

    :param: logging_text: prefix used for the log messages, forwarded to _instantiate_ng_ro
    :param: db_nsr: content of the "nsrs" record
    :param: db_nslcmop: content of the "nslcmops" record; "nsInstanceId" is read from it
    :param: db_vnfr: content of the "vnfrs" record of the VNF being scaled
    :param: vdu_scaling_info: dictionary with the optional keys "vdu-create" and "vdu-delete"
    :param: stage: list forwarded to _instantiate_ng_ro
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}

    # read from db: vnfd's for every vnf
    db_vnfds = []
    # vnfd uuids already loaded. vnfr["vnfd-id"] is the "_id" used at the query below, so the
    # de-duplication is done over it and not over the descriptor "id"
    loaded_vnfd_ids = set()

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db
        if vnfd_id not in loaded_vnfd_ids:
            # read from db
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            db_vnfds.append(vnfd)
            loaded_vnfd_ids.add(vnfd_id)
    n2vc_key = self.n2vc.get_public_key()
    n2vc_key_list = [n2vc_key]
    self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
                    mark_delete=True)
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
                                  db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
                                  timeout_ns_deploy=self.timeout_ns_deploy)
    # second call with mark_delete=False once the deployment has finished
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
    """
    Register at prometheus the jobs described by the 'prometheus*.j2' template of an artifact

    :param: ee_id: execution environment id; the text after the first "." is used as service name
    :param: artifact_path: folder (at self.fs) where the template is looked for
    :param: ee_config_descriptor: dictionary that must contain "metric-service"
    :param: vnfr_id: vnfr id; with the "-" removed it is used as job name
    :param: nsr_id: stored at every job under the key "nsr_id"
    :param: target_ip: value for the template variable TARGET_IP

    :return: list of job names if self.prometheus.update reports success. None if there is not
        prometheus, there is not template file or the update is not successful
    """
    if not self.prometheus:
        return
    # look if exist a file called 'prometheus*.j2' and use it as the job template
    artifact_content = self.fs.dir_ls(artifact_path)
    job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
    if not job_file:
        return
    with self.fs.file_open((artifact_path, job_file), "r") as f:
        job_data = f.read()

    # TODO get_service
    _, _, service = ee_id.partition(".")  # remove prefix "namespace."
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = self.prometheus.parse_job(job_data, variables)

    # names that are kept as they are; generated names must not collide with them nor among
    # themselves, otherwise job_dict below would silently lose jobs
    used_names = {
        job["job_name"] for job in job_list
        if isinstance(job.get("job_name"), str) and vnfr_id in job["job_name"]
    }
    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
            base_name = vnfr_id + "_" + str(randint(1, 10000))
            job_name = base_name
            attempt = 0
            # a counter suffix (instead of drawing again) guarantees that the loop finishes
            while job_name in used_names:
                attempt += 1
                job_name = "{}_{}".format(base_name, attempt)
            used_names.add(job_name)
            job["job_name"] = job_name
        job["nsr_id"] = nsr_id
    job_dict = {jl["job_name"]: jl for jl in job_list}
    if await self.prometheus.update(job_dict):
        return list(job_dict.keys())
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Obtain the VCA cloud name and the VCA cloud credential stored at the "config" of a VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential). Taken from the keys "vca_cloud" and
        "vca_cloud_credential"; an absent key is returned as None
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Obtain the VCA K8s cloud name and the VCA K8s cloud credential stored at the "config" of a VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential). Taken from the keys "vca_k8s_cloud" and
        "vca_k8s_cloud_credential"; an absent key is returned as None
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential