1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from n2vc
.k8s_helm_conn
import K8sHelmConnector
31 from n2vc
.k8s_juju_conn
import K8sJujuConnector
33 from osm_common
.dbbase
import DbException
34 from osm_common
.fsbase
import FsException
36 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
37 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
39 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
41 from copy
import copy
, deepcopy
42 from http
import HTTPStatus
44 from uuid
import uuid4
46 from random
import randint
48 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
51 class N2VCJujuConnectorLCM(N2VCJujuConnector
):
53 async def create_execution_environment(self
, namespace
: str, db_dict
: dict, reuse_ee_id
: str = None,
54 progress_timeout
: float = None, total_timeout
: float = None,
55 config
: dict = None, artifact_path
: str = None,
56 vca_type
: str = None) -> (str, dict):
57 # admit two new parameters, artifact_path and vca_type
58 if vca_type
== "k8s_proxy_charm":
59 ee_id
= await self
.install_k8s_proxy_charm(
60 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
62 artifact_path
=artifact_path
,
66 return await super().create_execution_environment(
67 namespace
=namespace
, db_dict
=db_dict
, reuse_ee_id
=reuse_ee_id
,
68 progress_timeout
=progress_timeout
, total_timeout
=total_timeout
)
70 async def install_configuration_sw(self
, ee_id
: str, artifact_path
: str, db_dict
: dict,
71 progress_timeout
: float = None, total_timeout
: float = None,
72 config
: dict = None, num_units
: int = 1, vca_type
: str = "lxc_proxy_charm"):
73 if vca_type
== "k8s_proxy_charm":
75 return await super().install_configuration_sw(
76 ee_id
=ee_id
, artifact_path
=artifact_path
, db_dict
=db_dict
, progress_timeout
=progress_timeout
,
77 total_timeout
=total_timeout
, config
=config
, num_units
=num_units
)
81 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
82 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
83 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
84 timeout_charm_delete
= 10 * 60
85 timeout_primitive
= 30 * 60 # timeout for primitive execution
86 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
88 SUBOPERATION_STATUS_NOT_FOUND
= -1
89 SUBOPERATION_STATUS_NEW
= -2
90 SUBOPERATION_STATUS_SKIP
= -3
91 task_name_deploy_vca
= "Deploying VCA"
93 def __init__(self
, db
, msg
, fs
, lcm_tasks
, config
, loop
, prometheus
=None):
95 Init, Connect to database, filesystem storage, and messaging
96 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
103 logger
=logging
.getLogger('lcm.ns')
107 self
.lcm_tasks
= lcm_tasks
108 self
.timeout
= config
["timeout"]
109 self
.ro_config
= config
["ro_config"]
110 self
.ng_ro
= config
["ro_config"].get("ng")
111 self
.vca_config
= config
["VCA"].copy()
113 # create N2VC connector
114 self
.n2vc
= N2VCJujuConnectorLCM(
119 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
120 username
=self
.vca_config
.get('user', None),
121 vca_config
=self
.vca_config
,
122 on_update_db
=self
._on
_update
_n
2vc
_db
125 self
.conn_helm_ee
= LCMHelmConn(
132 vca_config
=self
.vca_config
,
133 on_update_db
=self
._on
_update
_n
2vc
_db
136 self
.k8sclusterhelm
= K8sHelmConnector(
137 kubectl_command
=self
.vca_config
.get("kubectlpath"),
138 helm_command
=self
.vca_config
.get("helmpath"),
145 self
.k8sclusterjuju
= K8sJujuConnector(
146 kubectl_command
=self
.vca_config
.get("kubectlpath"),
147 juju_command
=self
.vca_config
.get("jujupath"),
154 self
.k8scluster_map
= {
155 "helm-chart": self
.k8sclusterhelm
,
156 "chart": self
.k8sclusterhelm
,
157 "juju-bundle": self
.k8sclusterjuju
,
158 "juju": self
.k8sclusterjuju
,
162 "lxc_proxy_charm": self
.n2vc
,
163 "native_charm": self
.n2vc
,
164 "k8s_proxy_charm": self
.n2vc
,
165 "helm": self
.conn_helm_ee
168 self
.prometheus
= prometheus
172 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
174 self
.RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
176 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
178 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
181 # TODO filter RO descriptor fields...
185 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
186 db_dict
['deploymentStatus'] = ro_descriptor
187 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
189 except Exception as e
:
190 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
192 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
194 # remove last dot from path (if exists)
195 if path
.endswith('.'):
198 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
199 # .format(table, filter, path, updated_data))
202 nsr_id
= filter.get('_id')
204 # read ns record from database
205 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
206 current_ns_status
= nsr
.get('nsState')
208 # get vca status for NS
209 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
213 db_dict
['vcaStatus'] = status_dict
215 # update configurationStatus for this VCA
217 vca_index
= int(path
[path
.rfind(".")+1:])
219 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
220 vca_status
= vca_list
[vca_index
].get('status')
222 configuration_status_list
= nsr
.get('configurationStatus')
223 config_status
= configuration_status_list
[vca_index
].get('status')
225 if config_status
== 'BROKEN' and vca_status
!= 'failed':
226 db_dict
['configurationStatus'][vca_index
] = 'READY'
227 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
228 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
229 except Exception as e
:
230 # not update configurationStatus
231 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
233 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
234 # if nsState = 'DEGRADED' check if all is OK
236 if current_ns_status
in ('READY', 'DEGRADED'):
237 error_description
= ''
239 if status_dict
.get('machines'):
240 for machine_id
in status_dict
.get('machines'):
241 machine
= status_dict
.get('machines').get(machine_id
)
242 # check machine agent-status
243 if machine
.get('agent-status'):
244 s
= machine
.get('agent-status').get('status')
247 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
248 # check machine instance status
249 if machine
.get('instance-status'):
250 s
= machine
.get('instance-status').get('status')
253 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
255 if status_dict
.get('applications'):
256 for app_id
in status_dict
.get('applications'):
257 app
= status_dict
.get('applications').get(app_id
)
258 # check application status
259 if app
.get('status'):
260 s
= app
.get('status').get('status')
263 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
265 if error_description
:
266 db_dict
['errorDescription'] = error_description
267 if current_ns_status
== 'READY' and is_degraded
:
268 db_dict
['nsState'] = 'DEGRADED'
269 if current_ns_status
== 'DEGRADED' and not is_degraded
:
270 db_dict
['nsState'] = 'READY'
273 self
.update_db_2("nsrs", nsr_id
, db_dict
)
275 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
277 except Exception as e
:
278 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
281 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
283 env
= Environment(undefined
=StrictUndefined
)
284 template
= env
.from_string(cloud_init_text
)
285 return template
.render(additional_params
or {})
286 except UndefinedError
as e
:
287 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
288 "file, must be provided in the instantiation parameters inside the "
289 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
))
290 except (TemplateError
, TemplateNotFound
) as e
:
291 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
292 format(vnfd_id
, vdu_id
, e
))
294 def _get_cloud_init(self
, vdu
, vnfd
):
296 cloud_init_content
= cloud_init_file
= None
297 if vdu
.get("cloud-init-file"):
298 base_folder
= vnfd
["_admin"]["storage"]
299 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
300 vdu
["cloud-init-file"])
301 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
302 cloud_init_content
= ci_file
.read()
303 elif vdu
.get("cloud-init"):
304 cloud_init_content
= vdu
["cloud-init"]
306 return cloud_init_content
307 except FsException
as e
:
308 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
309 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
311 def _get_osm_params(self
, db_vnfr
, vdu_id
=None, vdu_count_index
=0):
312 osm_params
= {x
.replace("-", "_"): db_vnfr
[x
] for x
in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
313 if db_vnfr
.get(x
) is not None}
314 osm_params
["ns_id"] = db_vnfr
["nsr-id-ref"]
315 osm_params
["vnf_id"] = db_vnfr
["_id"]
316 osm_params
["member_vnf_index"] = db_vnfr
["member-vnf-index-ref"]
317 if db_vnfr
.get("vdur"):
318 osm_params
["vdu"] = {}
319 for vdur
in db_vnfr
["vdur"]:
321 "count_index": vdur
["count-index"],
322 "vdu_id": vdur
["vdu-id-ref"],
325 if vdur
.get("ip-address"):
326 vdu
["ip_address"] = vdur
["ip-address"]
327 for iface
in vdur
["interfaces"]:
328 vdu
["interfaces"][iface
["name"]] = \
329 {x
.replace("-", "_"): iface
[x
] for x
in ("mac-address", "ip-address", "vnf-vld-id", "name")
330 if iface
.get(x
) is not None}
331 vdu_id_index
= "{}-{}".format(vdur
["vdu-id-ref"], vdur
["count-index"])
332 osm_params
["vdu"][vdu_id_index
] = vdu
334 osm_params
["vdu_id"] = vdu_id
335 osm_params
["count_index"] = vdu_count_index
338 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
339 vdur
= next(vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"])
340 additional_params
= vdur
.get("additionalParams")
341 return self
._format
_additional
_params
(additional_params
)
343 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
345 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
346 :param vnfd: input vnfd
347 :param new_id: overrides vnf id if provided
348 :param additionalParams: Instantiation params for VNFs provided
349 :param nsrId: Id of the NSR
350 :return: copy of vnfd
352 vnfd_RO
= deepcopy(vnfd
)
353 # remove unused by RO configuration, monitoring, scaling and internal keys
354 vnfd_RO
.pop("_id", None)
355 vnfd_RO
.pop("_admin", None)
356 vnfd_RO
.pop("vnf-configuration", None)
357 vnfd_RO
.pop("monitoring-param", None)
358 vnfd_RO
.pop("scaling-group-descriptor", None)
359 vnfd_RO
.pop("kdu", None)
360 vnfd_RO
.pop("k8s-cluster", None)
362 vnfd_RO
["id"] = new_id
364 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
365 for vdu
in get_iterable(vnfd_RO
, "vdu"):
366 vdu
.pop("cloud-init-file", None)
367 vdu
.pop("cloud-init", None)
370 def _ns_params_2_RO(self
, ns_params
, nsd
, vnfd_dict
, db_vnfrs
, n2vc_key_list
):
372 Creates a RO ns descriptor from OSM ns_instantiate params
373 :param ns_params: OSM instantiate params
374 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
375 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
376 :return: The RO ns descriptor
380 # TODO feature 1417: Check that no instantiation is set over PDU
381 # check if PDU forces a concrete vim-network-id and add it
382 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
384 def vim_account_2_RO(vim_account
):
385 if vim_account
in vim_2_RO
:
386 return vim_2_RO
[vim_account
]
388 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
389 if db_vim
["_admin"]["operationalState"] != "ENABLED":
390 raise LcmException("VIM={} is not available. operationalState={}".format(
391 vim_account
, db_vim
["_admin"]["operationalState"]))
392 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
393 vim_2_RO
[vim_account
] = RO_vim_id
396 def wim_account_2_RO(wim_account
):
397 if isinstance(wim_account
, str):
398 if wim_account
in wim_2_RO
:
399 return wim_2_RO
[wim_account
]
401 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
402 if db_wim
["_admin"]["operationalState"] != "ENABLED":
403 raise LcmException("WIM={} is not available. operationalState={}".format(
404 wim_account
, db_wim
["_admin"]["operationalState"]))
405 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
406 wim_2_RO
[wim_account
] = RO_wim_id
411 def ip_profile_2_RO(ip_profile
):
412 RO_ip_profile
= deepcopy((ip_profile
))
413 if "dns-server" in RO_ip_profile
:
414 if isinstance(RO_ip_profile
["dns-server"], list):
415 RO_ip_profile
["dns-address"] = []
416 for ds
in RO_ip_profile
.pop("dns-server"):
417 RO_ip_profile
["dns-address"].append(ds
['address'])
419 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
420 if RO_ip_profile
.get("ip-version") == "ipv4":
421 RO_ip_profile
["ip-version"] = "IPv4"
422 if RO_ip_profile
.get("ip-version") == "ipv6":
423 RO_ip_profile
["ip-version"] = "IPv6"
424 if "dhcp-params" in RO_ip_profile
:
425 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
431 # "name": ns_params["nsName"],
432 # "description": ns_params.get("nsDescription"),
433 "datacenter": vim_account_2_RO(ns_params
["vimAccountId"]),
434 "wim_account": wim_account_2_RO(ns_params
.get("wimAccountId")),
435 # "scenario": ns_params["nsdId"],
437 # set vim_account of each vnf if different from general vim_account.
438 # Get this information from <vnfr> database content, key vim-account-id
439 # Vim account can be set by placement_engine and it may be different from
440 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
441 for vnf_index
, vnfr
in db_vnfrs
.items():
442 if vnfr
.get("vim-account-id") and vnfr
["vim-account-id"] != ns_params
["vimAccountId"]:
443 populate_dict(RO_ns_params
, ("vnfs", vnf_index
, "datacenter"), vim_account_2_RO(vnfr
["vim-account-id"]))
445 n2vc_key_list
= n2vc_key_list
or []
446 for vnfd_ref
, vnfd
in vnfd_dict
.items():
447 vdu_needed_access
= []
449 if vnfd
.get("vnf-configuration"):
450 ssh_required
= deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required"))
451 if ssh_required
and vnfd
.get("mgmt-interface"):
452 if vnfd
["mgmt-interface"].get("vdu-id"):
453 vdu_needed_access
.append(vnfd
["mgmt-interface"]["vdu-id"])
454 elif vnfd
["mgmt-interface"].get("cp"):
455 mgmt_cp
= vnfd
["mgmt-interface"]["cp"]
457 for vdu
in vnfd
.get("vdu", ()):
458 if vdu
.get("vdu-configuration"):
459 ssh_required
= deep_get(vdu
, ("vdu-configuration", "config-access", "ssh-access", "required"))
461 vdu_needed_access
.append(vdu
["id"])
463 for vdu_interface
in vdu
.get("interface"):
464 if vdu_interface
.get("external-connection-point-ref") and \
465 vdu_interface
["external-connection-point-ref"] == mgmt_cp
:
466 vdu_needed_access
.append(vdu
["id"])
470 if vdu_needed_access
:
471 for vnf_member
in nsd
.get("constituent-vnfd"):
472 if vnf_member
["vnfd-id-ref"] != vnfd_ref
:
474 for vdu
in vdu_needed_access
:
475 populate_dict(RO_ns_params
,
476 ("vnfs", vnf_member
["member-vnf-index"], "vdus", vdu
, "mgmt_keys"),
479 for vdu
in get_iterable(vnfd
, "vdu"):
480 cloud_init_text
= self
._get
_cloud
_init
(vdu
, vnfd
)
481 if not cloud_init_text
:
483 for vnf_member
in nsd
.get("constituent-vnfd"):
484 if vnf_member
["vnfd-id-ref"] != vnfd_ref
:
486 db_vnfr
= db_vnfrs
[vnf_member
["member-vnf-index"]]
487 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdu
["id"]) or {}
490 for vdu_index
in range(0, int(vdu
.get("count", 1))):
491 additional_params
["OSM"] = self
._get
_osm
_params
(db_vnfr
, vdu
["id"], vdu_index
)
492 cloud_init_list
.append(self
._parse
_cloud
_init
(cloud_init_text
, additional_params
, vnfd
["id"],
494 populate_dict(RO_ns_params
,
495 ("vnfs", vnf_member
["member-vnf-index"], "vdus", vdu
["id"], "cloud_init"),
498 if ns_params
.get("vduImage"):
499 RO_ns_params
["vduImage"] = ns_params
["vduImage"]
501 if ns_params
.get("ssh_keys"):
502 RO_ns_params
["cloud-config"] = {"key-pairs": ns_params
["ssh_keys"]}
503 for vnf_params
in get_iterable(ns_params
, "vnf"):
504 for constituent_vnfd
in nsd
["constituent-vnfd"]:
505 if constituent_vnfd
["member-vnf-index"] == vnf_params
["member-vnf-index"]:
506 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
509 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
510 "constituent-vnfd".format(vnf_params
["member-vnf-index"]))
512 for vdu_params
in get_iterable(vnf_params
, "vdu"):
513 # TODO feature 1417: check that this VDU exist and it is not a PDU
514 if vdu_params
.get("volume"):
515 for volume_params
in vdu_params
["volume"]:
516 if volume_params
.get("vim-volume-id"):
517 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
518 vdu_params
["id"], "devices", volume_params
["name"], "vim_id"),
519 volume_params
["vim-volume-id"])
520 if vdu_params
.get("interface"):
521 for interface_params
in vdu_params
["interface"]:
522 if interface_params
.get("ip-address"):
523 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
524 vdu_params
["id"], "interfaces", interface_params
["name"],
526 interface_params
["ip-address"])
527 if interface_params
.get("mac-address"):
528 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
529 vdu_params
["id"], "interfaces", interface_params
["name"],
531 interface_params
["mac-address"])
532 if interface_params
.get("floating-ip-required"):
533 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
534 vdu_params
["id"], "interfaces", interface_params
["name"],
536 interface_params
["floating-ip-required"])
538 for internal_vld_params
in get_iterable(vnf_params
, "internal-vld"):
539 if internal_vld_params
.get("vim-network-name"):
540 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
541 internal_vld_params
["name"], "vim-network-name"),
542 internal_vld_params
["vim-network-name"])
543 if internal_vld_params
.get("vim-network-id"):
544 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
545 internal_vld_params
["name"], "vim-network-id"),
546 internal_vld_params
["vim-network-id"])
547 if internal_vld_params
.get("ip-profile"):
548 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
549 internal_vld_params
["name"], "ip-profile"),
550 ip_profile_2_RO(internal_vld_params
["ip-profile"]))
551 if internal_vld_params
.get("provider-network"):
553 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
554 internal_vld_params
["name"], "provider-network"),
555 internal_vld_params
["provider-network"].copy())
557 for icp_params
in get_iterable(internal_vld_params
, "internal-connection-point"):
560 for vdu_descriptor
in vnf_descriptor
["vdu"]:
561 for vdu_interface
in vdu_descriptor
["interface"]:
562 if vdu_interface
.get("internal-connection-point-ref") == icp_params
["id-ref"]:
563 if icp_params
.get("ip-address"):
564 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
565 vdu_descriptor
["id"], "interfaces",
566 vdu_interface
["name"], "ip_address"),
567 icp_params
["ip-address"])
569 if icp_params
.get("mac-address"):
570 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
571 vdu_descriptor
["id"], "interfaces",
572 vdu_interface
["name"], "mac_address"),
573 icp_params
["mac-address"])
579 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
580 "internal-vld:id-ref={} is not present at vnfd:internal-"
581 "connection-point".format(vnf_params
["member-vnf-index"],
582 icp_params
["id-ref"]))
584 for vld_params
in get_iterable(ns_params
, "vld"):
585 if "ip-profile" in vld_params
:
586 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "ip-profile"),
587 ip_profile_2_RO(vld_params
["ip-profile"]))
589 if vld_params
.get("provider-network"):
591 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "provider-network"),
592 vld_params
["provider-network"].copy())
594 if "wimAccountId" in vld_params
and vld_params
["wimAccountId"] is not None:
595 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "wim_account"),
596 wim_account_2_RO(vld_params
["wimAccountId"])),
597 if vld_params
.get("vim-network-name"):
599 if isinstance(vld_params
["vim-network-name"], dict):
600 for vim_account
, vim_net
in vld_params
["vim-network-name"].items():
601 RO_vld_sites
.append({
602 "netmap-use": vim_net
,
603 "datacenter": vim_account_2_RO(vim_account
)
605 else: # isinstance str
606 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-name"]})
608 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
610 if vld_params
.get("vim-network-id"):
612 if isinstance(vld_params
["vim-network-id"], dict):
613 for vim_account
, vim_net
in vld_params
["vim-network-id"].items():
614 RO_vld_sites
.append({
615 "netmap-use": vim_net
,
616 "datacenter": vim_account_2_RO(vim_account
)
618 else: # isinstance str
619 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-id"]})
621 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
622 if vld_params
.get("ns-net"):
623 if isinstance(vld_params
["ns-net"], dict):
624 for vld_id
, instance_scenario_id
in vld_params
["ns-net"].items():
625 RO_vld_ns_net
= {"instance_scenario_id": instance_scenario_id
, "osm_id": vld_id
}
626 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "use-network"), RO_vld_ns_net
)
627 if "vnfd-connection-point-ref" in vld_params
:
628 for cp_params
in vld_params
["vnfd-connection-point-ref"]:
630 for constituent_vnfd
in nsd
["constituent-vnfd"]:
631 if constituent_vnfd
["member-vnf-index"] == cp_params
["member-vnf-index-ref"]:
632 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
636 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
637 "is not present at nsd:constituent-vnfd".format(cp_params
["member-vnf-index-ref"]))
639 for vdu_descriptor
in vnf_descriptor
["vdu"]:
640 for interface_descriptor
in vdu_descriptor
["interface"]:
641 if interface_descriptor
.get("external-connection-point-ref") == \
642 cp_params
["vnfd-connection-point-ref"]:
649 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
650 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
651 cp_params
["member-vnf-index-ref"],
652 cp_params
["vnfd-connection-point-ref"],
653 vnf_descriptor
["id"]))
654 if cp_params
.get("ip-address"):
655 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
656 vdu_descriptor
["id"], "interfaces",
657 interface_descriptor
["name"], "ip_address"),
658 cp_params
["ip-address"])
659 if cp_params
.get("mac-address"):
660 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
661 vdu_descriptor
["id"], "interfaces",
662 interface_descriptor
["name"], "mac_address"),
663 cp_params
["mac-address"])
666 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None):
667 # make a copy to do not change
668 vdu_create
= copy(vdu_create
)
669 vdu_delete
= copy(vdu_delete
)
671 vdurs
= db_vnfr
.get("vdur")
674 vdu_index
= len(vdurs
)
677 vdur
= vdurs
[vdu_index
]
678 if vdur
.get("pdu-type"):
680 vdu_id_ref
= vdur
["vdu-id-ref"]
681 if vdu_create
and vdu_create
.get(vdu_id_ref
):
682 vdur_copy
= deepcopy(vdur
)
683 vdur_copy
["status"] = "BUILD"
684 vdur_copy
["status-detailed"] = None
685 vdur_copy
["ip_address"]: None
686 for iface
in vdur_copy
["interfaces"]:
687 iface
["ip-address"] = None
688 iface
["mac-address"] = None
689 iface
.pop("mgmt_vnf", None) # only first vdu can be managment of vnf # TODO ALF
690 for index
in range(0, vdu_create
[vdu_id_ref
]):
691 vdur_copy
["_id"] = str(uuid4())
692 vdur_copy
["count-index"] += 1
693 vdurs
.insert(vdu_index
+1+index
, vdur_copy
)
694 self
.logger
.debug("scale out, adding vdu={}".format(vdur_copy
))
695 vdur_copy
= deepcopy(vdur_copy
)
697 del vdu_create
[vdu_id_ref
]
698 if vdu_delete
and vdu_delete
.get(vdu_id_ref
):
700 vdu_delete
[vdu_id_ref
] -= 1
701 if not vdu_delete
[vdu_id_ref
]:
702 del vdu_delete
[vdu_id_ref
]
703 # check all operations are done
704 if vdu_create
or vdu_delete
:
705 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
708 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
711 vnfr_update
= {"vdur": vdurs
}
712 db_vnfr
["vdur"] = vdurs
713 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
715 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
717 Updates database nsr with the RO info for the created vld
718 :param ns_update_nsr: dictionary to be filled with the updated info
719 :param db_nsr: content of db_nsr. This is also modified
720 :param nsr_desc_RO: nsr descriptor from RO
721 :return: Nothing, LcmException is raised on errors
724 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
725 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
726 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
728 vld
["vim-id"] = net_RO
.get("vim_net_id")
729 vld
["name"] = net_RO
.get("vim_name")
730 vld
["status"] = net_RO
.get("status")
731 vld
["status-detailed"] = net_RO
.get("error_msg")
732 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
735 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
737 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
739 for db_vnfr
in db_vnfrs
.values():
740 vnfr_update
= {"status": "ERROR"}
741 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
742 if "status" not in vdur
:
743 vdur
["status"] = "ERROR"
744 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
746 vdur
["status-detailed"] = str(error_text
)
747 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
748 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
749 except DbException
as e
:
750 self
.logger
.error("Cannot update vnf. {}".format(e
))
752 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
754 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
755 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
756 :param nsr_desc_RO: nsr descriptor from RO
757 :return: Nothing, LcmException is raised on errors
759 for vnf_index
, db_vnfr
in db_vnfrs
.items():
760 for vnf_RO
in nsr_desc_RO
["vnfs"]:
761 if vnf_RO
["member_vnf_index"] != vnf_index
:
764 if vnf_RO
.get("ip_address"):
765 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
766 elif not db_vnfr
.get("ip-address"):
767 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
768 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
770 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
771 vdur_RO_count_index
= 0
772 if vdur
.get("pdu-type"):
774 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
775 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
777 if vdur
["count-index"] != vdur_RO_count_index
:
778 vdur_RO_count_index
+= 1
780 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
781 if vdur_RO
.get("ip_address"):
782 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
784 vdur
["ip-address"] = None
785 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
786 vdur
["name"] = vdur_RO
.get("vim_name")
787 vdur
["status"] = vdur_RO
.get("status")
788 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
789 for ifacer
in get_iterable(vdur
, "interfaces"):
790 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
791 if ifacer
["name"] == interface_RO
.get("internal_name"):
792 ifacer
["ip-address"] = interface_RO
.get("ip_address")
793 ifacer
["mac-address"] = interface_RO
.get("mac_address")
796 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
798 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
799 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
802 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
803 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
805 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
806 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
807 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
809 vld
["vim-id"] = net_RO
.get("vim_net_id")
810 vld
["name"] = net_RO
.get("vim_name")
811 vld
["status"] = net_RO
.get("status")
812 vld
["status-detailed"] = net_RO
.get("error_msg")
813 vnfr_update
["vld.{}".format(vld_index
)] = vld
816 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
817 vnf_index
, vld
["id"]))
819 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
823 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
825 def _get_ns_config_info(self
, nsr_id
):
827 Generates a mapping between vnf,vdu elements and the N2VC id
828 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
829 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
830 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
831 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
833 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
834 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
836 ns_config_info
= {"osm-config-mapping": mapping
}
837 for vca
in vca_deployed_list
:
838 if not vca
["member-vnf-index"]:
840 if not vca
["vdu_id"]:
841 mapping
[vca
["member-vnf-index"]] = vca
["application"]
843 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
845 return ns_config_info
848 def _get_initial_config_primitive_list(desc_primitive_list
, vca_deployed
, ee_descriptor_id
):
850 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
851 primitives as verify-ssh-credentials, or config when needed
852 :param desc_primitive_list: information of the descriptor
853 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
854 this element contains a ssh public key
855 :param ee_descriptor_id: execution environment descriptor id. It is the value of
856 XXX_configuration.execution-environment-list.INDEX.id; it can be None
857 :return: The modified list. Can ba an empty list, but always a list
860 primitive_list
= desc_primitive_list
or []
862 # filter primitives by ee_id
863 primitive_list
= [p
for p
in primitive_list
if p
.get("execution-environment-ref") == ee_descriptor_id
]
867 primitive_list
.sort(key
=lambda val
: int(val
['seq']))
869 # look for primitive config, and get the position. None if not present
870 config_position
= None
871 for index
, primitive
in enumerate(primitive_list
):
872 if primitive
["name"] == "config":
873 config_position
= index
876 # for NS, add always a config primitive if not present (bug 874)
877 if not vca_deployed
["member-vnf-index"] and config_position
is None:
878 primitive_list
.insert(0, {"name": "config", "parameter": []})
880 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
881 if vca_deployed
["member-vnf-index"] and config_position
is not None and vca_deployed
.get("ssh-public-key"):
882 primitive_list
.insert(config_position
+ 1, {"name": "verify-ssh-credentials", "parameter": []})
883 return primitive_list
885 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds_ref
,
886 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
887 nslcmop_id
= db_nslcmop
["_id"]
889 "name": db_nsr
["name"],
892 "image": deepcopy(db_nsr
["image"]),
893 "flavor": deepcopy(db_nsr
["flavor"]),
894 "action_id": nslcmop_id
,
896 for image
in target
["image"]:
897 image
["vim_info"] = []
898 for flavor
in target
["flavor"]:
899 flavor
["vim_info"] = []
901 ns_params
= db_nslcmop
.get("operationParams")
903 if ns_params
.get("ssh_keys"):
904 ssh_keys
+= ns_params
.get("ssh_keys")
906 ssh_keys
+= n2vc_key_list
909 for vld_index
, vld
in enumerate(nsd
.get("vld")):
910 target_vld
= {"id": vld
["id"],
912 "mgmt-network": vld
.get("mgmt-network", False),
913 "type": vld
.get("type"),
914 "vim_info": [{"vim-network-name": vld
.get("vim-network-name"),
915 "vim_account_id": ns_params
["vimAccountId"]}],
917 for cp
in vld
["vnfd-connection-point-ref"]:
918 cp2target
["member_vnf:{}.{}".format(cp
["member-vnf-index-ref"], cp
["vnfd-connection-point-ref"])] = \
919 "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
920 target
["ns"]["vld"].append(target_vld
)
921 for vnfr
in db_vnfrs
.values():
922 vnfd
= db_vnfds_ref
[vnfr
["vnfd-ref"]]
923 target_vnf
= deepcopy(vnfr
)
924 for vld
in target_vnf
.get("vld", ()):
925 # check if connected to a ns.vld
926 vnf_cp
= next((cp
for cp
in vnfd
.get("connection-point", ()) if
927 cp
.get("internal-vld-ref") == vld
["id"]), None)
929 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
930 if cp2target
.get(ns_cp
):
931 vld
["target"] = cp2target
[ns_cp
]
932 vld
["vim_info"] = [{"vim-network-name": vld
.get("vim-network-name"),
933 "vim_account_id": vnfr
["vim-account-id"]}]
935 for vdur
in target_vnf
.get("vdur", ()):
936 vdur
["vim_info"] = [{"vim_account_id": vnfr
["vim-account-id"]}]
937 vdud_index
, vdud
= next(k
for k
in enumerate(vnfd
["vdu"]) if k
[1]["id"] == vdur
["vdu-id-ref"])
938 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
941 if deep_get(vdud
, ("vdu-configuration", "config-access", "ssh-access", "required")):
942 vdur
["ssh-keys"] = ssh_keys
943 vdur
["ssh-access-required"] = True
944 elif deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
945 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
946 vdur
["ssh-keys"] = ssh_keys
947 vdur
["ssh-access-required"] = True
950 if vdud
.get("cloud-init-file"):
951 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
952 elif vdud
.get("cloud-init"):
953 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], vdud_index
)
956 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
957 if not next((vi
for vi
in ns_flavor
["vim_info"] if
958 vi
and vi
.get("vim_account_id") == vnfr
["vim-account-id"]), None):
959 ns_flavor
["vim_info"].append({"vim_account_id": vnfr
["vim-account-id"]})
961 ns_image
= target
["image"][int(vdur
["ns-image-id"])]
962 if not next((vi
for vi
in ns_image
["vim_info"] if
963 vi
and vi
.get("vim_account_id") == vnfr
["vim-account-id"]), None):
964 ns_image
["vim_info"].append({"vim_account_id": vnfr
["vim-account-id"]})
966 vdur
["vim_info"] = [{"vim_account_id": vnfr
["vim-account-id"]}]
967 target
["vnf"].append(target_vnf
)
969 desc
= await self
.RO
.deploy(nsr_id
, target
)
970 action_id
= desc
["action_id"]
971 await self
._wait
_ng
_ro
(self
, nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
975 "_admin.deployed.RO.operational-status": "running",
976 "detailed-status": " ".join(stage
)
978 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
979 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
980 self
._write
_op
_status
(nslcmop_id
, stage
)
981 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
984 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
, start_time
, timeout
, stage
):
985 detailed_status_old
= None
987 while time() <= start_time
+ timeout
:
988 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
989 if desc_status
["status"] == "FAILED":
990 raise NgRoException(desc_status
["details"])
991 elif desc_status
["status"] == "BUILD":
992 stage
[2] = "VIM: ({})".format(desc_status
["details"])
993 elif desc_status
["status"] == "DONE":
994 stage
[2] = "Deployed at VIM"
997 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
998 if stage
[2] != detailed_status_old
:
999 detailed_status_old
= stage
[2]
1000 db_nsr_update
["detailed-status"] = " ".join(stage
)
1001 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1002 self
._write
_op
_status
(nslcmop_id
, stage
)
1003 await asyncio
.sleep(5, loop
=self
.loop
)
1004 else: # timeout_ns_deploy
1005 raise NgRoException("Timeout waiting ns to deploy")
1007 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
1011 start_deploy
= time()
1019 desc
= await self
.RO
.deploy(nsr_id
, target
)
1020 action_id
= desc
["action_id"]
1021 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1022 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1023 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
1026 delete_timeout
= 20 * 60 # 20 minutes
1027 await self
._wait
_ng
_ro
(self
, nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
1029 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1030 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1032 await self
.RO
.delete(nsr_id
)
1033 except Exception as e
:
1034 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1035 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1036 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1037 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1038 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
1039 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1040 failed_detail
.append("delete conflict: {}".format(e
))
1041 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
1043 failed_detail
.append("delete error: {}".format(e
))
1044 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
1047 stage
[2] = "Error deleting from VIM"
1049 stage
[2] = "Deleted from VIM"
1050 db_nsr_update
["detailed-status"] = " ".join(stage
)
1051 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1052 self
._write
_op
_status
(nslcmop_id
, stage
)
1055 raise LcmException("; ".join(failed_detail
))
1058 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds_ref
,
1059 n2vc_key_list
, stage
):
1062 :param logging_text: preffix text to use at logging
1063 :param nsr_id: nsr identity
1064 :param nsd: database content of ns descriptor
1065 :param db_nsr: database content of ns record
1066 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1068 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1069 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1070 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1071 :return: None or exception
1075 RO_descriptor_number
= 0 # number of descriptors created at RO
1076 vnf_index_2_RO_id
= {} # map between vnfd/nsd id to the id used at RO
1077 nslcmop_id
= db_nslcmop
["_id"]
1078 start_deploy
= time()
1079 ns_params
= db_nslcmop
.get("operationParams")
1080 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1081 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1083 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1085 # Check for and optionally request placement optimization. Database will be updated if placement activated
1086 stage
[2] = "Waiting for Placement."
1087 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1088 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1089 for vnfr
in db_vnfrs
.values():
1090 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1093 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1096 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
1097 db_vnfds_ref
, n2vc_key_list
, stage
, start_deploy
,
1100 # get vnfds, instantiate at RO
1101 for c_vnf
in nsd
.get("constituent-vnfd", ()):
1102 member_vnf_index
= c_vnf
["member-vnf-index"]
1103 vnfd
= db_vnfds_ref
[c_vnf
['vnfd-id-ref']]
1104 vnfd_ref
= vnfd
["id"]
1106 stage
[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref
, member_vnf_index
)
1107 db_nsr_update
["detailed-status"] = " ".join(stage
)
1108 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1109 self
._write
_op
_status
(nslcmop_id
, stage
)
1111 # self.logger.debug(logging_text + stage[2])
1112 vnfd_id_RO
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, member_vnf_index
[:23])
1113 vnf_index_2_RO_id
[member_vnf_index
] = vnfd_id_RO
1114 RO_descriptor_number
+= 1
1116 # look position at deployed.RO.vnfd if not present it will be appended at the end
1117 for index
, vnf_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["RO"]["vnfd"]):
1118 if vnf_deployed
["member-vnf-index"] == member_vnf_index
:
1121 index
= len(db_nsr
["_admin"]["deployed"]["RO"]["vnfd"])
1122 db_nsr
["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1125 RO_update
= {"member-vnf-index": member_vnf_index
}
1126 vnfd_list
= await self
.RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
1128 RO_update
["id"] = vnfd_list
[0]["uuid"]
1129 self
.logger
.debug(logging_text
+ "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1130 format(vnfd_ref
, member_vnf_index
, vnfd_list
[0]["uuid"]))
1132 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
, db_vnfrs
[c_vnf
["member-vnf-index"]].
1133 get("additionalParamsForVnf"), nsr_id
)
1134 desc
= await self
.RO
.create("vnfd", descriptor
=vnfd_RO
)
1135 RO_update
["id"] = desc
["uuid"]
1136 self
.logger
.debug(logging_text
+ "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1137 vnfd_ref
, member_vnf_index
, desc
["uuid"]))
1138 db_nsr_update
["_admin.deployed.RO.vnfd.{}".format(index
)] = RO_update
1139 db_nsr
["_admin"]["deployed"]["RO"]["vnfd"][index
] = RO_update
1144 stage
[2] = "Creating nsd={} at RO".format(nsd_ref
)
1145 db_nsr_update
["detailed-status"] = " ".join(stage
)
1146 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1147 self
._write
_op
_status
(nslcmop_id
, stage
)
1149 # self.logger.debug(logging_text + stage[2])
1150 RO_osm_nsd_id
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, nsd_ref
[:23])
1151 RO_descriptor_number
+= 1
1152 nsd_list
= await self
.RO
.get_list("nsd", filter_by
={"osm_id": RO_osm_nsd_id
})
1154 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= nsd_list
[0]["uuid"]
1155 self
.logger
.debug(logging_text
+ "nsd={} exists at RO. Using RO_id={}".format(
1156 nsd_ref
, RO_nsd_uuid
))
1158 nsd_RO
= deepcopy(nsd
)
1159 nsd_RO
["id"] = RO_osm_nsd_id
1160 nsd_RO
.pop("_id", None)
1161 nsd_RO
.pop("_admin", None)
1162 for c_vnf
in nsd_RO
.get("constituent-vnfd", ()):
1163 member_vnf_index
= c_vnf
["member-vnf-index"]
1164 c_vnf
["vnfd-id-ref"] = vnf_index_2_RO_id
[member_vnf_index
]
1165 for c_vld
in nsd_RO
.get("vld", ()):
1166 for cp
in c_vld
.get("vnfd-connection-point-ref", ()):
1167 member_vnf_index
= cp
["member-vnf-index-ref"]
1168 cp
["vnfd-id-ref"] = vnf_index_2_RO_id
[member_vnf_index
]
1170 desc
= await self
.RO
.create("nsd", descriptor
=nsd_RO
)
1171 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1172 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= desc
["uuid"]
1173 self
.logger
.debug(logging_text
+ "nsd={} created at RO. RO_id={}".format(nsd_ref
, RO_nsd_uuid
))
1174 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1177 stage
[2] = "Creating nsd={} at RO".format(nsd_ref
)
1178 db_nsr_update
["detailed-status"] = " ".join(stage
)
1179 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1180 self
._write
_op
_status
(nslcmop_id
, stage
)
1182 # if present use it unless in error status
1183 RO_nsr_id
= deep_get(db_nsr
, ("_admin", "deployed", "RO", "nsr_id"))
1186 stage
[2] = "Looking for existing ns at RO"
1187 db_nsr_update
["detailed-status"] = " ".join(stage
)
1188 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1189 self
._write
_op
_status
(nslcmop_id
, stage
)
1190 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1191 desc
= await self
.RO
.show("ns", RO_nsr_id
)
1193 except ROclient
.ROClientException
as e
:
1194 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
1196 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1198 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
1199 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
1200 if ns_status
== "ERROR":
1201 stage
[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id
)
1202 self
.logger
.debug(logging_text
+ stage
[2])
1203 await self
.RO
.delete("ns", RO_nsr_id
)
1204 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1206 stage
[2] = "Checking dependencies"
1207 db_nsr_update
["detailed-status"] = " ".join(stage
)
1208 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1209 self
._write
_op
_status
(nslcmop_id
, stage
)
1210 # self.logger.debug(logging_text + stage[2])
1212 # check if VIM is creating and wait look if previous tasks in process
1213 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("vim_account", ns_params
["vimAccountId"])
1215 stage
[2] = "Waiting for related tasks '{}' to be completed".format(task_name
)
1216 self
.logger
.debug(logging_text
+ stage
[2])
1217 await asyncio
.wait(task_dependency
, timeout
=3600)
1218 if ns_params
.get("vnf"):
1219 for vnf
in ns_params
["vnf"]:
1220 if "vimAccountId" in vnf
:
1221 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("vim_account",
1222 vnf
["vimAccountId"])
1224 stage
[2] = "Waiting for related tasks '{}' to be completed.".format(task_name
)
1225 self
.logger
.debug(logging_text
+ stage
[2])
1226 await asyncio
.wait(task_dependency
, timeout
=3600)
1228 stage
[2] = "Checking instantiation parameters."
1229 RO_ns_params
= self
._ns
_params
_2_RO
(ns_params
, nsd
, db_vnfds_ref
, db_vnfrs
, n2vc_key_list
)
1230 stage
[2] = "Deploying ns at VIM."
1231 db_nsr_update
["detailed-status"] = " ".join(stage
)
1232 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1233 self
._write
_op
_status
(nslcmop_id
, stage
)
1235 desc
= await self
.RO
.create("ns", descriptor
=RO_ns_params
, name
=db_nsr
["name"], scenario
=RO_nsd_uuid
)
1236 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = desc
["uuid"]
1237 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1238 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "BUILD"
1239 self
.logger
.debug(logging_text
+ "ns created at RO. RO_id={}".format(desc
["uuid"]))
1241 # wait until NS is ready
1242 stage
[2] = "Waiting VIM to deploy ns."
1243 db_nsr_update
["detailed-status"] = " ".join(stage
)
1244 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1245 self
._write
_op
_status
(nslcmop_id
, stage
)
1246 detailed_status_old
= None
1247 self
.logger
.debug(logging_text
+ stage
[2] + " RO_ns_id={}".format(RO_nsr_id
))
1250 while time() <= start_deploy
+ timeout_ns_deploy
:
1251 desc
= await self
.RO
.show("ns", RO_nsr_id
)
1254 if desc
!= old_desc
:
1255 # desc has changed => update db
1256 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
1259 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
1260 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
1261 if ns_status
== "ERROR":
1262 raise ROclient
.ROClientException(ns_status_info
)
1263 elif ns_status
== "BUILD":
1264 stage
[2] = "VIM: ({})".format(ns_status_info
)
1265 elif ns_status
== "ACTIVE":
1266 stage
[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1268 self
.ns_update_vnfr(db_vnfrs
, desc
)
1270 except LcmExceptionNoMgmtIP
:
1273 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
1274 if stage
[2] != detailed_status_old
:
1275 detailed_status_old
= stage
[2]
1276 db_nsr_update
["detailed-status"] = " ".join(stage
)
1277 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1278 self
._write
_op
_status
(nslcmop_id
, stage
)
1279 await asyncio
.sleep(5, loop
=self
.loop
)
1280 else: # timeout_ns_deploy
1281 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
1284 self
.ns_update_nsr(db_nsr_update
, db_nsr
, desc
)
1286 db_nsr_update
["_admin.deployed.RO.operational-status"] = "running"
1287 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1288 stage
[2] = "Deployed at VIM"
1289 db_nsr_update
["detailed-status"] = " ".join(stage
)
1290 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1291 self
._write
_op
_status
(nslcmop_id
, stage
)
1292 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1293 # self.logger.debug(logging_text + "Deployed at VIM")
1294 except (ROclient
.ROClientException
, LcmException
, DbException
, NgRoException
) as e
:
1295 stage
[2] = "ERROR deploying at VIM"
1296 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1299 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1301 Wait for kdu to be up, get ip address
1302 :param logging_text: prefix use for logging
1309 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1312 while nb_tries
< 360:
1313 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1314 kdur
= next((x
for x
in get_iterable(db_vnfr
, "kdur") if x
.get("kdu-name") == kdu_name
), None)
1316 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
))
1317 if kdur
.get("status"):
1318 if kdur
["status"] in ("READY", "ENABLED"):
1319 return kdur
.get("ip-address")
1321 raise LcmException("target KDU={} is in error state".format(kdu_name
))
1323 await asyncio
.sleep(10, loop
=self
.loop
)
1325 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1327 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
1329 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1330 :param logging_text: prefix use for logging
1335 :param pub_key: public ssh key to inject, None to skip
1336 :param user: user to apply the public ssh key
1340 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1344 target_vdu_id
= None
1350 if ro_retries
>= 360: # 1 hour
1351 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1353 await asyncio
.sleep(10, loop
=self
.loop
)
1356 if not target_vdu_id
:
1357 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1359 if not vdu_id
: # for the VNF case
1360 if db_vnfr
.get("status") == "ERROR":
1361 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1362 ip_address
= db_vnfr
.get("ip-address")
1365 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1367 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1368 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1370 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1371 vdur
= db_vnfr
["vdur"][0]
1373 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1376 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE":
1377 ip_address
= vdur
.get("ip-address")
1380 target_vdu_id
= vdur
["vdu-id-ref"]
1381 elif vdur
.get("status") == "ERROR":
1382 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1384 if not target_vdu_id
:
1387 # inject public key into machine
1388 if pub_key
and user
:
1389 # wait until NS is deployed at RO
1391 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1392 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1396 # self.logger.debug(logging_text + "Inserting RO key")
1397 if vdur
.get("pdu-type"):
1398 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1401 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1403 target
= {"action": "inject_ssh_key", "key": pub_key
, "user": user
,
1404 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdu_id
}]}],
1406 await self
.RO
.deploy(nsr_id
, target
)
1408 result_dict
= await self
.RO
.create_action(
1410 item_id_name
=ro_nsr_id
,
1411 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1413 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1414 if not result_dict
or not isinstance(result_dict
, dict):
1415 raise LcmException("Unknown response from RO when injecting key")
1416 for result
in result_dict
.values():
1417 if result
.get("vim_result") == 200:
1420 raise ROclient
.ROClientException("error injecting key: {}".format(
1421 result
.get("description")))
1423 except NgRoException
as e
:
1424 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1425 except ROclient
.ROClientException
as e
:
1427 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1431 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1437 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1439 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1441 my_vca
= vca_deployed_list
[vca_index
]
1442 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1443 # vdu or kdu: no dependencies
1447 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1448 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1449 configuration_status_list
= db_nsr
["configurationStatus"]
1450 for index
, vca_deployed
in enumerate(configuration_status_list
):
1451 if index
== vca_index
:
1454 if not my_vca
.get("member-vnf-index") or \
1455 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1456 internal_status
= configuration_status_list
[index
].get("status")
1457 if internal_status
== 'READY':
1459 elif internal_status
== 'BROKEN':
1460 raise LcmException("Configuration aborted because dependent charm/s has failed")
1464 # no dependencies, return
1466 await asyncio
.sleep(10)
1469 raise LcmException("Configuration aborted because dependent charm/s timeout")
1471 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1472 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1473 ee_config_descriptor
):
1474 nsr_id
= db_nsr
["_id"]
1475 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1476 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1477 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1478 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1480 'collection': 'nsrs',
1481 'filter': {'_id': nsr_id
},
1482 'path': db_update_entry
1488 element_under_configuration
= nsr_id
1492 vnfr_id
= db_vnfr
["_id"]
1493 osm_config
["osm"]["vnf_id"] = vnfr_id
1495 namespace
= "{nsi}.{ns}".format(
1496 nsi
=nsi_id
if nsi_id
else "",
1500 element_type
= 'VNF'
1501 element_under_configuration
= vnfr_id
1502 namespace
+= ".{}".format(vnfr_id
)
1504 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1505 element_type
= 'VDU'
1506 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1507 osm_config
["osm"]["vdu_id"] = vdu_id
1509 namespace
+= ".{}".format(kdu_name
)
1510 element_type
= 'KDU'
1511 element_under_configuration
= kdu_name
1512 osm_config
["osm"]["kdu_name"] = kdu_name
1515 artifact_path
= "{}/{}/{}/{}".format(
1516 base_folder
["folder"],
1517 base_folder
["pkg-dir"],
1518 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1521 # get initial_config_primitive_list that applies to this element
1522 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1524 # add config if not present for NS charm
1525 ee_descriptor_id
= ee_config_descriptor
.get("id")
1526 initial_config_primitive_list
= self
._get
_initial
_config
_primitive
_list
(initial_config_primitive_list
,
1527 vca_deployed
, ee_descriptor_id
)
1529 # n2vc_redesign STEP 3.1
1530 # find old ee_id if exists
1531 ee_id
= vca_deployed
.get("ee_id")
1533 # create or register execution environment in VCA
1534 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1536 self
._write
_configuration
_status
(
1538 vca_index
=vca_index
,
1540 element_under_configuration
=element_under_configuration
,
1541 element_type
=element_type
1544 step
= "create execution environment"
1545 self
.logger
.debug(logging_text
+ step
)
1546 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1547 namespace
=namespace
,
1551 artifact_path
=artifact_path
,
1554 elif vca_type
== "native_charm":
1555 step
= "Waiting to VM being up and getting IP address"
1556 self
.logger
.debug(logging_text
+ step
)
1557 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1558 user
=None, pub_key
=None)
1559 credentials
= {"hostname": rw_mgmt_ip
}
1561 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1562 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1563 # merged. Meanwhile let's get username from initial-config-primitive
1564 if not username
and initial_config_primitive_list
:
1565 for config_primitive
in initial_config_primitive_list
:
1566 for param
in config_primitive
.get("parameter", ()):
1567 if param
["name"] == "ssh-username":
1568 username
= param
["value"]
1571 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1572 "'config-access.ssh-access.default-user'")
1573 credentials
["username"] = username
1574 # n2vc_redesign STEP 3.2
1576 self
._write
_configuration
_status
(
1578 vca_index
=vca_index
,
1579 status
='REGISTERING',
1580 element_under_configuration
=element_under_configuration
,
1581 element_type
=element_type
1584 step
= "register execution environment {}".format(credentials
)
1585 self
.logger
.debug(logging_text
+ step
)
1586 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1587 credentials
=credentials
, namespace
=namespace
, db_dict
=db_dict
)
1589 # for compatibility with MON/POL modules, the need model and application name at database
1590 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1591 ee_id_parts
= ee_id
.split('.')
1592 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1593 if len(ee_id_parts
) >= 2:
1594 model_name
= ee_id_parts
[0]
1595 application_name
= ee_id_parts
[1]
1596 db_nsr_update
[db_update_entry
+ "model"] = model_name
1597 db_nsr_update
[db_update_entry
+ "application"] = application_name
1599 # n2vc_redesign STEP 3.3
1600 step
= "Install configuration Software"
1602 self
._write
_configuration
_status
(
1604 vca_index
=vca_index
,
1605 status
='INSTALLING SW',
1606 element_under_configuration
=element_under_configuration
,
1607 element_type
=element_type
,
1608 other_update
=db_nsr_update
1611 # TODO check if already done
1612 self
.logger
.debug(logging_text
+ step
)
1614 if vca_type
== "native_charm":
1615 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1616 if config_primitive
:
1617 config
= self
._map
_primitive
_params
(
1623 if vca_type
== "lxc_proxy_charm":
1624 if element_type
== "NS":
1625 num_units
= db_nsr
.get("config-units") or 1
1626 elif element_type
== "VNF":
1627 num_units
= db_vnfr
.get("config-units") or 1
1628 elif element_type
== "VDU":
1629 for v
in db_vnfr
["vdur"]:
1630 if vdu_id
== v
["vdu-id-ref"]:
1631 num_units
= v
.get("config-units") or 1
1634 await self
.vca_map
[vca_type
].install_configuration_sw(
1636 artifact_path
=artifact_path
,
1639 num_units
=num_units
,
1643 # write in db flag of configuration_sw already installed
1644 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1646 # add relations for this VCA (wait for other peers related with this VCA)
1647 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1648 vca_index
=vca_index
, vca_type
=vca_type
)
1650 # if SSH access is required, then get execution environment SSH public
1651 # if native charm we have waited already to VM be UP
1652 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
1655 # self.logger.debug("get ssh key block")
1656 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1657 # self.logger.debug("ssh key needed")
1658 # Needed to inject a ssh key
1659 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1660 step
= "Install configuration Software, getting public ssh key"
1661 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1663 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1665 # self.logger.debug("no need to get ssh key")
1666 step
= "Waiting to VM being up and getting IP address"
1667 self
.logger
.debug(logging_text
+ step
)
1669 # n2vc_redesign STEP 5.1
1670 # wait for RO (ip-address) Insert pub_key into VM
1673 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1675 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1676 vdu_index
, user
=user
, pub_key
=pub_key
)
1678 rw_mgmt_ip
= None # This is for a NS configuration
1680 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1682 # store rw_mgmt_ip in deploy params for later replacement
1683 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1685 # n2vc_redesign STEP 6 Execute initial config primitive
1686 step
= 'execute initial config primitive'
1688 # wait for dependent primitives execution (NS -> VNF -> VDU)
1689 if initial_config_primitive_list
:
1690 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1692 # stage, in function of element type: vdu, kdu, vnf or ns
1693 my_vca
= vca_deployed_list
[vca_index
]
1694 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1696 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1697 elif my_vca
.get("member-vnf-index"):
1699 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1702 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1704 self
._write
_configuration
_status
(
1706 vca_index
=vca_index
,
1707 status
='EXECUTING PRIMITIVE'
1710 self
._write
_op
_status
(
1715 check_if_terminated_needed
= True
1716 for initial_config_primitive
in initial_config_primitive_list
:
1717 # adding information on the vca_deployed if it is a NS execution environment
1718 if not vca_deployed
["member-vnf-index"]:
1719 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1720 # TODO check if already done
1721 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1723 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1724 self
.logger
.debug(logging_text
+ step
)
1725 await self
.vca_map
[vca_type
].exec_primitive(
1727 primitive_name
=initial_config_primitive
["name"],
1728 params_dict
=primitive_params_
,
1731 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1732 if check_if_terminated_needed
:
1733 if config_descriptor
.get('terminate-config-primitive'):
1734 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1735 check_if_terminated_needed
= False
1737 # TODO register in database that primitive is done
1739 # STEP 7 Configure metrics
1740 if vca_type
== "helm":
1741 prometheus_jobs
= await self
.add_prometheus_metrics(
1743 artifact_path
=artifact_path
,
1744 ee_config_descriptor
=ee_config_descriptor
,
1747 target_ip
=rw_mgmt_ip
,
1750 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1752 step
= "instantiated at VCA"
1753 self
.logger
.debug(logging_text
+ step
)
1755 self
._write
_configuration
_status
(
1757 vca_index
=vca_index
,
1761 except Exception as e
: # TODO not use Exception but N2VC exception
1762 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1763 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1764 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1765 self
._write
_configuration
_status
(
1767 vca_index
=vca_index
,
1770 raise LcmException("{} {}".format(step
, e
)) from e
1772 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1773 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1775 Update db_nsr fields.
1778 :param current_operation:
1779 :param current_operation_id:
1780 :param error_description:
1781 :param error_detail:
1782 :param other_update: Other required changes at database if provided, will be cleared
1786 db_dict
= other_update
or {}
1787 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1788 db_dict
["_admin.current-operation"] = current_operation_id
1789 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1790 db_dict
["currentOperation"] = current_operation
1791 db_dict
["currentOperationID"] = current_operation_id
1792 db_dict
["errorDescription"] = error_description
1793 db_dict
["errorDetail"] = error_detail
1796 db_dict
["nsState"] = ns_state
1797 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1798 except DbException
as e
:
1799 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1801 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1802 operation_state
: str = None, other_update
: dict = None):
1804 db_dict
= other_update
or {}
1805 db_dict
['queuePosition'] = queuePosition
1806 if isinstance(stage
, list):
1807 db_dict
['stage'] = stage
[0]
1808 db_dict
['detailed-status'] = " ".join(stage
)
1809 elif stage
is not None:
1810 db_dict
['stage'] = str(stage
)
1812 if error_message
is not None:
1813 db_dict
['errorMessage'] = error_message
1814 if operation_state
is not None:
1815 db_dict
['operationState'] = operation_state
1816 db_dict
["statusEnteredTime"] = time()
1817 self
.update_db_2("nslcmops", op_id
, db_dict
)
1818 except DbException
as e
:
1819 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1821 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1823 nsr_id
= db_nsr
["_id"]
1824 # configurationStatus
1825 config_status
= db_nsr
.get('configurationStatus')
1827 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1828 enumerate(config_status
) if v
}
1830 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1832 except DbException
as e
:
1833 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1835 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1836 element_under_configuration
: str = None, element_type
: str = None,
1837 other_update
: dict = None):
1839 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1840 # .format(vca_index, status))
1843 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1844 db_dict
= other_update
or {}
1846 db_dict
[db_path
+ 'status'] = status
1847 if element_under_configuration
:
1848 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1850 db_dict
[db_path
+ 'elementType'] = element_type
1851 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1852 except DbException
as e
:
1853 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1854 .format(status
, nsr_id
, vca_index
, e
))
1856 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
1858 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1859 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1860 Database is used because the result can be obtained from a different LCM worker in case of HA.
1861 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1862 :param db_nslcmop: database content of nslcmop
1863 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1864 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1865 computed 'vim-account-id'
1868 nslcmop_id
= db_nslcmop
['_id']
1869 placement_engine
= deep_get(db_nslcmop
, ('operationParams', 'placement-engine'))
1870 if placement_engine
== "PLA":
1871 self
.logger
.debug(logging_text
+ "Invoke and wait for placement optimization")
1872 await self
.msg
.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id
}, loop
=self
.loop
)
1873 db_poll_interval
= 5
1874 wait
= db_poll_interval
* 10
1876 while not pla_result
and wait
>= 0:
1877 await asyncio
.sleep(db_poll_interval
)
1878 wait
-= db_poll_interval
1879 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1880 pla_result
= deep_get(db_nslcmop
, ('_admin', 'pla'))
1883 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id
))
1885 for pla_vnf
in pla_result
['vnf']:
1886 vnfr
= db_vnfrs
.get(pla_vnf
['member-vnf-index'])
1887 if not pla_vnf
.get('vimAccountId') or not vnfr
:
1890 self
.db
.set_one("vnfrs", {"_id": vnfr
["_id"]}, {"vim-account-id": pla_vnf
['vimAccountId']})
1892 vnfr
["vim-account-id"] = pla_vnf
['vimAccountId']
1895 def update_nsrs_with_pla_result(self
, params
):
1897 nslcmop_id
= deep_get(params
, ('placement', 'nslcmopId'))
1898 self
.update_db_2("nslcmops", nslcmop_id
, {"_admin.pla": params
.get('placement')})
1899 except Exception as e
:
1900 self
.logger
.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id
, e
))
1902 async def instantiate(self
, nsr_id
, nslcmop_id
):
1905 :param nsr_id: ns instance to deploy
1906 :param nslcmop_id: operation to run
1910 # Try to lock HA task here
1911 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1912 if not task_is_locked_by_me
:
1913 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1916 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1917 self
.logger
.debug(logging_text
+ "Enter")
1919 # get all needed from database
1921 # database nsrs record
1924 # database nslcmops record
1927 # update operation on nsrs
1929 # update operation on nslcmops
1930 db_nslcmop_update
= {}
1932 nslcmop_operation_state
= None
1933 db_vnfrs
= {} # vnf's info indexed by member-index
1935 tasks_dict_info
= {} # from task to info text
1938 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1939 # ^ stage, step, VIM progress
1941 # wait for any previous tasks in process
1942 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1944 stage
[1] = "Sync filesystem from database."
1945 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1947 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1948 stage
[1] = "Reading from database."
1949 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1950 db_nsr_update
["detailed-status"] = "creating"
1951 db_nsr_update
["operational-status"] = "init"
1952 self
._write
_ns
_status
(
1954 ns_state
="BUILDING",
1955 current_operation
="INSTANTIATING",
1956 current_operation_id
=nslcmop_id
,
1957 other_update
=db_nsr_update
1959 self
._write
_op
_status
(
1965 # read from db: operation
1966 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1967 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1968 ns_params
= db_nslcmop
.get("operationParams")
1969 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1970 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1972 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1975 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1976 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1977 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1978 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1980 # nsr_name = db_nsr["name"] # TODO short-name??
1982 # read from db: vnf's of this ns
1983 stage
[1] = "Getting vnfrs from db."
1984 self
.logger
.debug(logging_text
+ stage
[1])
1985 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1987 # read from db: vnfd's for every vnf
1988 db_vnfds_ref
= {} # every vnfd data indexed by vnf name
1989 db_vnfds
= {} # every vnfd data indexed by vnf id
1990 db_vnfds_index
= {} # every vnfd data indexed by vnf member-index
1992 # for each vnf in ns, read vnfd
1993 for vnfr
in db_vnfrs_list
:
1994 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
# vnf's dict indexed by member-index: '1', '2', etc
1995 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
1996 vnfd_ref
= vnfr
["vnfd-ref"] # vnfd name for this vnf
1998 # if we haven't this vnfd, read it from db
1999 if vnfd_id
not in db_vnfds
:
2001 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
2002 self
.logger
.debug(logging_text
+ stage
[1])
2003 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2006 db_vnfds_ref
[vnfd_ref
] = vnfd
# vnfd's indexed by name
2007 db_vnfds
[vnfd_id
] = vnfd
# vnfd's indexed by id
2008 db_vnfds_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds
[vnfd_id
] # vnfd's indexed by member-index
2010 # Get or generates the _admin.deployed.VCA list
2011 vca_deployed_list
= None
2012 if db_nsr
["_admin"].get("deployed"):
2013 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2014 if vca_deployed_list
is None:
2015 vca_deployed_list
= []
2016 configuration_status_list
= []
2017 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2018 db_nsr_update
["configurationStatus"] = configuration_status_list
2019 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2020 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2021 elif isinstance(vca_deployed_list
, dict):
2022 # maintain backward compatibility. Change a dict to list at database
2023 vca_deployed_list
= list(vca_deployed_list
.values())
2024 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2025 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2027 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
2028 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2029 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2031 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2032 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2033 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2034 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
2036 # n2vc_redesign STEP 2 Deploy Network Scenario
2037 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
2038 self
._write
_op
_status
(
2043 stage
[1] = "Deploying KDUs."
2044 # self.logger.debug(logging_text + "Before deploy_kdus")
2045 # Call to deploy_kdus in case exists the "vdu:kdu" param
2046 await self
.deploy_kdus(
2047 logging_text
=logging_text
,
2049 nslcmop_id
=nslcmop_id
,
2052 task_instantiation_info
=tasks_dict_info
,
2055 stage
[1] = "Getting VCA public key."
2056 # n2vc_redesign STEP 1 Get VCA public ssh-key
2057 # feature 1429. Add n2vc public key to needed VMs
2058 n2vc_key
= self
.n2vc
.get_public_key()
2059 n2vc_key_list
= [n2vc_key
]
2060 if self
.vca_config
.get("public_key"):
2061 n2vc_key_list
.append(self
.vca_config
["public_key"])
2063 stage
[1] = "Deploying NS at VIM."
2064 task_ro
= asyncio
.ensure_future(
2065 self
.instantiate_RO(
2066 logging_text
=logging_text
,
2070 db_nslcmop
=db_nslcmop
,
2072 db_vnfds_ref
=db_vnfds_ref
,
2073 n2vc_key_list
=n2vc_key_list
,
2077 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2078 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2080 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2081 stage
[1] = "Deploying Execution Environments."
2082 self
.logger
.debug(logging_text
+ stage
[1])
2084 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2085 # get_iterable() returns a value from a dict or empty tuple if key does not exist
2086 for c_vnf
in get_iterable(nsd
, "constituent-vnfd"):
2087 vnfd_id
= c_vnf
["vnfd-id-ref"]
2088 vnfd
= db_vnfds_ref
[vnfd_id
]
2089 member_vnf_index
= str(c_vnf
["member-vnf-index"])
2090 db_vnfr
= db_vnfrs
[member_vnf_index
]
2091 base_folder
= vnfd
["_admin"]["storage"]
2097 # Get additional parameters
2098 deploy_params
= {"OSM": self
._get
_osm
_params
(db_vnfr
)}
2099 if db_vnfr
.get("additionalParamsForVnf"):
2100 deploy_params
.update(self
._format
_additional
_params
(db_vnfr
["additionalParamsForVnf"].copy()))
2102 descriptor_config
= vnfd
.get("vnf-configuration")
2103 if descriptor_config
:
2105 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
2108 nslcmop_id
=nslcmop_id
,
2114 member_vnf_index
=member_vnf_index
,
2115 vdu_index
=vdu_index
,
2117 deploy_params
=deploy_params
,
2118 descriptor_config
=descriptor_config
,
2119 base_folder
=base_folder
,
2120 task_instantiation_info
=tasks_dict_info
,
2124 # Deploy charms for each VDU that supports one.
2125 for vdud
in get_iterable(vnfd
, 'vdu'):
2127 descriptor_config
= vdud
.get('vdu-configuration')
2128 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
2129 if vdur
.get("additionalParams"):
2130 deploy_params_vdu
= self
._format
_additional
_params
(vdur
["additionalParams"])
2132 deploy_params_vdu
= deploy_params
2133 deploy_params_vdu
["OSM"] = self
._get
_osm
_params
(db_vnfr
, vdu_id
, vdu_count_index
=0)
2134 if descriptor_config
:
2137 for vdu_index
in range(int(vdud
.get("count", 1))):
2138 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2140 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2141 member_vnf_index
, vdu_id
, vdu_index
),
2144 nslcmop_id
=nslcmop_id
,
2150 member_vnf_index
=member_vnf_index
,
2151 vdu_index
=vdu_index
,
2153 deploy_params
=deploy_params_vdu
,
2154 descriptor_config
=descriptor_config
,
2155 base_folder
=base_folder
,
2156 task_instantiation_info
=tasks_dict_info
,
2159 for kdud
in get_iterable(vnfd
, 'kdu'):
2160 kdu_name
= kdud
["name"]
2161 descriptor_config
= kdud
.get('kdu-configuration')
2162 if descriptor_config
:
2166 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
2167 deploy_params_kdu
= {"OSM": self
._get
_osm
_params
(db_vnfr
)}
2168 if kdur
.get("additionalParams"):
2169 deploy_params_kdu
= self
._format
_additional
_params
(kdur
["additionalParams"])
2172 logging_text
=logging_text
,
2175 nslcmop_id
=nslcmop_id
,
2181 member_vnf_index
=member_vnf_index
,
2182 vdu_index
=vdu_index
,
2184 deploy_params
=deploy_params_kdu
,
2185 descriptor_config
=descriptor_config
,
2186 base_folder
=base_folder
,
2187 task_instantiation_info
=tasks_dict_info
,
2191 # Check if this NS has a charm configuration
2192 descriptor_config
= nsd
.get("ns-configuration")
2193 if descriptor_config
and descriptor_config
.get("juju"):
2196 member_vnf_index
= None
2202 # Get additional parameters
2203 deploy_params
= {"OSM": self
._get
_osm
_params
(db_vnfr
)}
2204 if db_nsr
.get("additionalParamsForNs"):
2205 deploy_params
.update(self
._format
_additional
_params
(db_nsr
["additionalParamsForNs"].copy()))
2206 base_folder
= nsd
["_admin"]["storage"]
2208 logging_text
=logging_text
,
2211 nslcmop_id
=nslcmop_id
,
2217 member_vnf_index
=member_vnf_index
,
2218 vdu_index
=vdu_index
,
2220 deploy_params
=deploy_params
,
2221 descriptor_config
=descriptor_config
,
2222 base_folder
=base_folder
,
2223 task_instantiation_info
=tasks_dict_info
,
2227 # rest of staff will be done at finally
2229 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
2230 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
2232 except asyncio
.CancelledError
:
2233 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
2234 exc
= "Operation was cancelled"
2235 except Exception as e
:
2236 exc
= traceback
.format_exc()
2237 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
2240 error_list
.append(str(exc
))
2242 # wait for pending tasks
2244 stage
[1] = "Waiting for instantiate pending tasks."
2245 self
.logger
.debug(logging_text
+ stage
[1])
2246 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
2247 stage
, nslcmop_id
, nsr_id
=nsr_id
)
2248 stage
[1] = stage
[2] = ""
2249 except asyncio
.CancelledError
:
2250 error_list
.append("Cancelled")
2251 # TODO cancel all tasks
2252 except Exception as exc
:
2253 error_list
.append(str(exc
))
2255 # update operation-status
2256 db_nsr_update
["operational-status"] = "running"
2257 # let's begin with VCA 'configured' status (later we can change it)
2258 db_nsr_update
["config-status"] = "configured"
2259 for task
, task_name
in tasks_dict_info
.items():
2260 if not task
.done() or task
.cancelled() or task
.exception():
2261 if task_name
.startswith(self
.task_name_deploy_vca
):
2262 # A N2VC task is pending
2263 db_nsr_update
["config-status"] = "failed"
2265 # RO or KDU task is pending
2266 db_nsr_update
["operational-status"] = "failed"
2268 # update status at database
2270 error_detail
= ". ".join(error_list
)
2271 self
.logger
.error(logging_text
+ error_detail
)
2272 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
2273 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
2275 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
2276 db_nslcmop_update
["detailed-status"] = error_detail
2277 nslcmop_operation_state
= "FAILED"
2281 error_description_nsr
= error_description_nslcmop
= None
2283 db_nsr_update
["detailed-status"] = "Done"
2284 db_nslcmop_update
["detailed-status"] = "Done"
2285 nslcmop_operation_state
= "COMPLETED"
2288 self
._write
_ns
_status
(
2291 current_operation
="IDLE",
2292 current_operation_id
=None,
2293 error_description
=error_description_nsr
,
2294 error_detail
=error_detail
,
2295 other_update
=db_nsr_update
2297 self
._write
_op
_status
(
2300 error_message
=error_description_nslcmop
,
2301 operation_state
=nslcmop_operation_state
,
2302 other_update
=db_nslcmop_update
,
2305 if nslcmop_operation_state
:
2307 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2308 "operationState": nslcmop_operation_state
},
2310 except Exception as e
:
2311 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2313 self
.logger
.debug(logging_text
+ "Exit")
2314 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2316 async def _add_vca_relations(self
, logging_text
, nsr_id
, vca_index
: int,
2317 timeout
: int = 3600, vca_type
: str = None) -> bool:
2320 # 1. find all relations for this VCA
2321 # 2. wait for other peers related
2325 vca_type
= vca_type
or "lxc_proxy_charm"
2327 # STEP 1: find all relations for this VCA
2330 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2331 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2334 my_vca
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))[vca_index
]
2336 # read all ns-configuration relations
2337 ns_relations
= list()
2338 db_ns_relations
= deep_get(nsd
, ('ns-configuration', 'relation'))
2340 for r
in db_ns_relations
:
2341 # check if this VCA is in the relation
2342 if my_vca
.get('member-vnf-index') in\
2343 (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2344 ns_relations
.append(r
)
2346 # read all vnf-configuration relations
2347 vnf_relations
= list()
2348 db_vnfd_list
= db_nsr
.get('vnfd-id')
2350 for vnfd
in db_vnfd_list
:
2351 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2352 db_vnf_relations
= deep_get(db_vnfd
, ('vnf-configuration', 'relation'))
2353 if db_vnf_relations
:
2354 for r
in db_vnf_relations
:
2355 # check if this VCA is in the relation
2356 if my_vca
.get('vdu_id') in (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2357 vnf_relations
.append(r
)
2359 # if no relations, terminate
2360 if not ns_relations
and not vnf_relations
:
2361 self
.logger
.debug(logging_text
+ ' No relations')
2364 self
.logger
.debug(logging_text
+ ' adding relations\n {}\n {}'.format(ns_relations
, vnf_relations
))
2371 if now
- start
>= timeout
:
2372 self
.logger
.error(logging_text
+ ' : timeout adding relations')
2375 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2376 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2378 # for each defined NS relation, find the VCA's related
2379 for r
in ns_relations
.copy():
2380 from_vca_ee_id
= None
2382 from_vca_endpoint
= None
2383 to_vca_endpoint
= None
2384 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2385 for vca
in vca_list
:
2386 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id') \
2387 and vca
.get('config_sw_installed'):
2388 from_vca_ee_id
= vca
.get('ee_id')
2389 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2390 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id') \
2391 and vca
.get('config_sw_installed'):
2392 to_vca_ee_id
= vca
.get('ee_id')
2393 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2394 if from_vca_ee_id
and to_vca_ee_id
:
2396 await self
.vca_map
[vca_type
].add_relation(
2397 ee_id_1
=from_vca_ee_id
,
2398 ee_id_2
=to_vca_ee_id
,
2399 endpoint_1
=from_vca_endpoint
,
2400 endpoint_2
=to_vca_endpoint
)
2401 # remove entry from relations list
2402 ns_relations
.remove(r
)
2404 # check failed peers
2406 vca_status_list
= db_nsr
.get('configurationStatus')
2408 for i
in range(len(vca_list
)):
2410 vca_status
= vca_status_list
[i
]
2411 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id'):
2412 if vca_status
.get('status') == 'BROKEN':
2413 # peer broken: remove relation from list
2414 ns_relations
.remove(r
)
2415 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id'):
2416 if vca_status
.get('status') == 'BROKEN':
2417 # peer broken: remove relation from list
2418 ns_relations
.remove(r
)
2423 # for each defined VNF relation, find the VCA's related
2424 for r
in vnf_relations
.copy():
2425 from_vca_ee_id
= None
2427 from_vca_endpoint
= None
2428 to_vca_endpoint
= None
2429 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2430 for vca
in vca_list
:
2431 key_to_check
= "vdu_id"
2432 if vca
.get("vdu_id") is None:
2433 key_to_check
= "vnfd_id"
2434 if vca
.get(key_to_check
) == r
.get('entities')[0].get('id') and vca
.get('config_sw_installed'):
2435 from_vca_ee_id
= vca
.get('ee_id')
2436 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2437 if vca
.get(key_to_check
) == r
.get('entities')[1].get('id') and vca
.get('config_sw_installed'):
2438 to_vca_ee_id
= vca
.get('ee_id')
2439 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2440 if from_vca_ee_id
and to_vca_ee_id
:
2442 await self
.vca_map
[vca_type
].add_relation(
2443 ee_id_1
=from_vca_ee_id
,
2444 ee_id_2
=to_vca_ee_id
,
2445 endpoint_1
=from_vca_endpoint
,
2446 endpoint_2
=to_vca_endpoint
)
2447 # remove entry from relations list
2448 vnf_relations
.remove(r
)
2450 # check failed peers
2452 vca_status_list
= db_nsr
.get('configurationStatus')
2454 for i
in range(len(vca_list
)):
2456 vca_status
= vca_status_list
[i
]
2457 if vca
.get('vdu_id') == r
.get('entities')[0].get('id'):
2458 if vca_status
.get('status') == 'BROKEN':
2459 # peer broken: remove relation from list
2460 vnf_relations
.remove(r
)
2461 if vca
.get('vdu_id') == r
.get('entities')[1].get('id'):
2462 if vca_status
.get('status') == 'BROKEN':
2463 # peer broken: remove relation from list
2464 vnf_relations
.remove(r
)
2470 await asyncio
.sleep(5.0)
2472 if not ns_relations
and not vnf_relations
:
2473 self
.logger
.debug('Relations added')
2478 except Exception as e
:
2479 self
.logger
.warn(logging_text
+ ' ERROR adding relations: {}'.format(e
))
2482 async def _install_kdu(self
, nsr_id
: str, nsr_db_path
: str, vnfr_data
: dict, kdu_index
: int, kdud
: dict,
2483 vnfd
: dict, k8s_instance_info
: dict, k8params
: dict = None, timeout
: int = 600):
2486 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2488 db_dict_install
= {"collection": "nsrs",
2489 "filter": {"_id": nsr_id
},
2490 "path": nsr_db_path
}
2492 kdu_instance
= await self
.k8scluster_map
[k8sclustertype
].install(
2493 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2494 kdu_model
=k8s_instance_info
["kdu-model"],
2497 db_dict
=db_dict_install
,
2499 kdu_name
=k8s_instance_info
["kdu-name"],
2500 namespace
=k8s_instance_info
["namespace"])
2501 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2503 # Obtain services to obtain management service ip
2504 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2505 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2506 kdu_instance
=kdu_instance
,
2507 namespace
=k8s_instance_info
["namespace"])
2509 # Obtain management service info (if exists)
2510 vnfr_update_dict
= {}
2512 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2513 mgmt_services
= [service
for service
in kdud
.get("service", []) if service
.get("mgmt-service")]
2514 for mgmt_service
in mgmt_services
:
2515 for service
in services
:
2516 if service
["name"].startswith(mgmt_service
["name"]):
2517 # Mgmt service found, Obtain service ip
2518 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2519 if isinstance(ip
, list) and len(ip
) == 1:
2522 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2524 # Check if must update also mgmt ip at the vnf
2525 service_external_cp
= mgmt_service
.get("external-connection-point-ref")
2526 if service_external_cp
:
2527 if deep_get(vnfd
, ("mgmt-interface", "cp")) == service_external_cp
:
2528 vnfr_update_dict
["ip-address"] = ip
2532 self
.logger
.warn("Mgmt service name: {} not found".format(mgmt_service
["name"]))
2534 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
2535 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
2537 kdu_config
= kdud
.get("kdu-configuration")
2538 if kdu_config
and kdu_config
.get("initial-config-primitive") and kdu_config
.get("juju") is None:
2539 initial_config_primitive_list
= kdu_config
.get("initial-config-primitive")
2540 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
2542 for initial_config_primitive
in initial_config_primitive_list
:
2543 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, {})
2545 await asyncio
.wait_for(
2546 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
2547 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2548 kdu_instance
=kdu_instance
,
2549 primitive_name
=initial_config_primitive
["name"],
2550 params
=primitive_params_
, db_dict
={}),
2553 except Exception as e
:
2554 # Prepare update db with error and raise exception
2556 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)})
2557 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), {"kdur.{}.status".format(kdu_index
): "ERROR"})
2559 # ignore to keep original exception
2561 # reraise original error
2566 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2567 # Launch kdus if present in the descriptor
2569 k8scluster_id_2_uuic
= {"helm-chart": {}, "juju-bundle": {}}
2571 async def _get_cluster_id(cluster_id
, cluster_type
):
2572 nonlocal k8scluster_id_2_uuic
2573 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2574 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2576 # check if K8scluster is creating and wait look if previous tasks in process
2577 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("k8scluster", cluster_id
)
2579 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name
, cluster_id
)
2580 self
.logger
.debug(logging_text
+ text
)
2581 await asyncio
.wait(task_dependency
, timeout
=3600)
2583 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2584 if not db_k8scluster
:
2585 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2587 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2589 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id
,
2591 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2594 logging_text
+= "Deploy kdus: "
2597 db_nsr_update
= {"_admin.deployed.K8s": []}
2598 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2601 updated_cluster_list
= []
2603 for vnfr_data
in db_vnfrs
.values():
2604 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
2605 # Step 0: Prepare and set parameters
2606 desc_params
= self
._format
_additional
_params
(kdur
.get("additionalParams"))
2607 vnfd_id
= vnfr_data
.get('vnfd-id')
2608 kdud
= next(kdud
for kdud
in db_vnfds
[vnfd_id
]["kdu"] if kdud
["name"] == kdur
["kdu-name"])
2609 namespace
= kdur
.get("k8s-namespace")
2610 if kdur
.get("helm-chart"):
2611 kdumodel
= kdur
["helm-chart"]
2612 k8sclustertype
= "helm-chart"
2613 elif kdur
.get("juju-bundle"):
2614 kdumodel
= kdur
["juju-bundle"]
2615 k8sclustertype
= "juju-bundle"
2617 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2618 "juju-bundle. Maybe an old NBI version is running".
2619 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2620 # check if kdumodel is a file and exists
2622 storage
= deep_get(db_vnfds
.get(vnfd_id
), ('_admin', 'storage'))
2623 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2624 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2625 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2627 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2628 kdumodel
= self
.fs
.path
+ filename
2629 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2631 except Exception: # it is not a file
2634 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2635 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2636 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2639 if k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
:
2640 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2641 self
.k8sclusterhelm
.synchronize_repos(cluster_uuid
=cluster_uuid
))
2642 if del_repo_list
or added_repo_dict
:
2643 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2644 updated
= {'_admin.helm_charts_added.' +
2645 item
: name
for item
, name
in added_repo_dict
.items()}
2646 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster '{}' to_delete: {}, "
2647 "to_add: {}".format(k8s_cluster_id
, del_repo_list
,
2649 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2650 updated_cluster_list
.append(cluster_uuid
)
2653 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2654 kdur
["kdu-name"], k8s_cluster_id
)
2655 k8s_instance_info
= {"kdu-instance": None,
2656 "k8scluster-uuid": cluster_uuid
,
2657 "k8scluster-type": k8sclustertype
,
2658 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2659 "kdu-name": kdur
["kdu-name"],
2660 "kdu-model": kdumodel
,
2661 "namespace": namespace
}
2662 db_path
= "_admin.deployed.K8s.{}".format(index
)
2663 db_nsr_update
[db_path
] = k8s_instance_info
2664 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2666 task
= asyncio
.ensure_future(
2667 self
._install
_kdu
(nsr_id
, db_path
, vnfr_data
, kdu_index
, kdud
, db_vnfds
[vnfd_id
],
2668 k8s_instance_info
, k8params
=desc_params
, timeout
=600))
2669 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2670 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2674 except (LcmException
, asyncio
.CancelledError
):
2676 except Exception as e
:
2677 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2678 if isinstance(e
, (N2VCException
, DbException
)):
2679 self
.logger
.error(logging_text
+ msg
)
2681 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2682 raise LcmException(msg
)
2685 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2687 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2688 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2689 base_folder
, task_instantiation_info
, stage
):
2690 # launch instantiate_N2VC in a asyncio task and register task object
2691 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2692 # if not found, create one entry and update database
2693 # fill db_nsr._admin.deployed.VCA.<index>
2695 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2696 if descriptor_config
.get("juju"): # There is one execution envioronment of type juju
2697 ee_list
= [descriptor_config
]
2698 elif descriptor_config
.get("execution-environment-list"):
2699 ee_list
= descriptor_config
.get("execution-environment-list")
2700 else: # other types as script are not supported
2703 for ee_item
in ee_list
:
2704 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2705 ee_item
.get("helm-chart")))
2706 ee_descriptor_id
= ee_item
.get("id")
2707 if ee_item
.get("juju"):
2708 vca_name
= ee_item
['juju'].get('charm')
2709 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2710 if ee_item
['juju'].get('cloud') == "k8s":
2711 vca_type
= "k8s_proxy_charm"
2712 elif ee_item
['juju'].get('proxy') is False:
2713 vca_type
= "native_charm"
2714 elif ee_item
.get("helm-chart"):
2715 vca_name
= ee_item
['helm-chart']
2718 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2722 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2723 if not vca_deployed
:
2725 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2726 vca_deployed
.get("vdu_id") == vdu_id
and \
2727 vca_deployed
.get("kdu_name") == kdu_name
and \
2728 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2729 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2732 # not found, create one.
2733 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2735 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2737 target
+= "/kdu/{}".format(kdu_name
)
2739 "target_element": target
,
2740 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2741 "member-vnf-index": member_vnf_index
,
2743 "kdu_name": kdu_name
,
2744 "vdu_count_index": vdu_index
,
2745 "operational-status": "init", # TODO revise
2746 "detailed-status": "", # TODO revise
2747 "step": "initial-deploy", # TODO revise
2749 "vdu_name": vdu_name
,
2751 "ee_descriptor_id": ee_descriptor_id
2755 # create VCA and configurationStatus in db
2757 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2758 "configurationStatus.{}".format(vca_index
): dict()
2760 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2762 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2765 task_n2vc
= asyncio
.ensure_future(
2766 self
.instantiate_N2VC(
2767 logging_text
=logging_text
,
2768 vca_index
=vca_index
,
2774 vdu_index
=vdu_index
,
2775 deploy_params
=deploy_params
,
2776 config_descriptor
=descriptor_config
,
2777 base_folder
=base_folder
,
2778 nslcmop_id
=nslcmop_id
,
2782 ee_config_descriptor
=ee_item
2785 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2786 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2787 member_vnf_index
or "", vdu_id
or "")
2790 def _get_terminate_config_primitive(primitive_list
, vca_deployed
):
2791 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2792 it get only those primitives for this execution envirom"""
2794 primitive_list
= primitive_list
or []
2795 # filter primitives by ee_descriptor_id
2796 ee_descriptor_id
= vca_deployed
.get("ee_descriptor_id")
2797 primitive_list
= [p
for p
in primitive_list
if p
.get("execution-environment-ref") == ee_descriptor_id
]
2800 primitive_list
.sort(key
=lambda val
: int(val
['seq']))
2802 return primitive_list
2805 def _create_nslcmop(nsr_id
, operation
, params
):
2807 Creates a ns-lcm-opp content to be stored at database.
2808 :param nsr_id: internal id of the instance
2809 :param operation: instantiate, terminate, scale, action, ...
2810 :param params: user parameters for the operation
2811 :return: dictionary following SOL005 format
2813 # Raise exception if invalid arguments
2814 if not (nsr_id
and operation
and params
):
2816 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2822 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2823 "operationState": "PROCESSING",
2824 "statusEnteredTime": now
,
2825 "nsInstanceId": nsr_id
,
2826 "lcmOperationType": operation
,
2828 "isAutomaticInvocation": False,
2829 "operationParams": params
,
2830 "isCancelPending": False,
2832 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2833 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2838 def _format_additional_params(self
, params
):
2839 params
= params
or {}
2840 for key
, value
in params
.items():
2841 if str(value
).startswith("!!yaml "):
2842 params
[key
] = yaml
.safe_load(value
[7:])
2845 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2846 primitive
= seq
.get('name')
2847 primitive_params
= {}
2849 "member_vnf_index": vnf_index
,
2850 "primitive": primitive
,
2851 "primitive_params": primitive_params
,
2854 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
2858 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
2859 op
= deep_get(db_nslcmop
, ('_admin', 'operations'), [])[op_index
]
2860 if op
.get('operationState') == 'COMPLETED':
2861 # b. Skip sub-operation
2862 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2863 return self
.SUBOPERATION_STATUS_SKIP
2865 # c. retry executing sub-operation
2866 # The sub-operation exists, and operationState != 'COMPLETED'
2867 # Update operationState = 'PROCESSING' to indicate a retry.
2868 operationState
= 'PROCESSING'
2869 detailed_status
= 'In progress'
2870 self
._update
_suboperation
_status
(
2871 db_nslcmop
, op_index
, operationState
, detailed_status
)
2872 # Return the sub-operation index
2873 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2874 # with arguments extracted from the sub-operation
2877 # Find a sub-operation where all keys in a matching dictionary must match
2878 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2879 def _find_suboperation(self
, db_nslcmop
, match
):
2880 if db_nslcmop
and match
:
2881 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2882 for i
, op
in enumerate(op_list
):
2883 if all(op
.get(k
) == match
[k
] for k
in match
):
2885 return self
.SUBOPERATION_STATUS_NOT_FOUND
2887 # Update status for a sub-operation given its index
2888 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2889 # Update DB for HA tasks
2890 q_filter
= {'_id': db_nslcmop
['_id']}
2891 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2892 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2893 self
.db
.set_one("nslcmops",
2895 update_dict
=update_dict
,
2896 fail_on_empty
=False)
2898 # Add sub-operation, return the index of the added sub-operation
2899 # Optionally, set operationState, detailed-status, and operationType
2900 # Status and type are currently set for 'scale' sub-operations:
2901 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2902 # 'detailed-status' : status message
2903 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2904 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2905 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2906 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2907 RO_nsr_id
=None, RO_scaling_info
=None):
2909 return self
.SUBOPERATION_STATUS_NOT_FOUND
2910 # Get the "_admin.operations" list, if it exists
2911 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2912 op_list
= db_nslcmop_admin
.get('operations')
2913 # Create or append to the "_admin.operations" list
2914 new_op
= {'member_vnf_index': vnf_index
,
2916 'vdu_count_index': vdu_count_index
,
2917 'primitive': primitive
,
2918 'primitive_params': mapped_primitive_params
}
2920 new_op
['operationState'] = operationState
2922 new_op
['detailed-status'] = detailed_status
2924 new_op
['lcmOperationType'] = operationType
2926 new_op
['RO_nsr_id'] = RO_nsr_id
2928 new_op
['RO_scaling_info'] = RO_scaling_info
2930 # No existing operations, create key 'operations' with current operation as first list element
2931 db_nslcmop_admin
.update({'operations': [new_op
]})
2932 op_list
= db_nslcmop_admin
.get('operations')
2934 # Existing operations, append operation to list
2935 op_list
.append(new_op
)
2937 db_nslcmop_update
= {'_admin.operations': op_list
}
2938 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2939 op_index
= len(op_list
) - 1
2942 # Helper methods for scale() sub-operations
2944 # pre-scale/post-scale:
2945 # Check for 3 different cases:
2946 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2947 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2948 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2949 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2950 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2951 # Find this sub-operation
2952 if RO_nsr_id
and RO_scaling_info
:
2953 operationType
= 'SCALE-RO'
2955 'member_vnf_index': vnf_index
,
2956 'RO_nsr_id': RO_nsr_id
,
2957 'RO_scaling_info': RO_scaling_info
,
2961 'member_vnf_index': vnf_index
,
2962 'primitive': vnf_config_primitive
,
2963 'primitive_params': primitive_params
,
2964 'lcmOperationType': operationType
2966 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2967 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2968 # a. New sub-operation
2969 # The sub-operation does not exist, add it.
2970 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2971 # The following parameters are set to None for all kind of scaling:
2973 vdu_count_index
= None
2975 if RO_nsr_id
and RO_scaling_info
:
2976 vnf_config_primitive
= None
2977 primitive_params
= None
2980 RO_scaling_info
= None
2981 # Initial status for sub-operation
2982 operationState
= 'PROCESSING'
2983 detailed_status
= 'In progress'
2984 # Add sub-operation for pre/post-scaling (zero or more operations)
2985 self
._add
_suboperation
(db_nslcmop
,
2990 vnf_config_primitive
,
2997 return self
.SUBOPERATION_STATUS_NEW
2999 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3000 # or op_index (operationState != 'COMPLETED')
3001 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3003 # Function to return execution_environment id
3005 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3006 # TODO vdu_index_count
3007 for vca
in vca_deployed_list
:
3008 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3011 async def destroy_N2VC(self
, logging_text
, db_nslcmop
, vca_deployed
, config_descriptor
,
3012 vca_index
, destroy_ee
=True, exec_primitives
=True):
3014 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3015 :param logging_text:
3017 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3018 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3019 :param vca_index: index in the database _admin.deployed.VCA
3020 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3021 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3022 not executed properly
3023 :return: None or exception
3027 logging_text
+ " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3028 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3032 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3034 # execute terminate_primitives
3036 terminate_primitives
= self
._get
_terminate
_config
_primitive
(
3037 config_descriptor
.get("terminate-config-primitive"), vca_deployed
)
3038 vdu_id
= vca_deployed
.get("vdu_id")
3039 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3040 vdu_name
= vca_deployed
.get("vdu_name")
3041 vnf_index
= vca_deployed
.get("member-vnf-index")
3042 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3043 for seq
in terminate_primitives
:
3044 # For each sequence in list, get primitive and call _ns_execute_primitive()
3045 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
3046 vnf_index
, seq
.get("name"))
3047 self
.logger
.debug(logging_text
+ step
)
3048 # Create the primitive for each sequence, i.e. "primitive": "touch"
3049 primitive
= seq
.get('name')
3050 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(seq
, vnf_index
)
3053 self
._add
_suboperation
(db_nslcmop
,
3059 mapped_primitive_params
)
3060 # Sub-operations: Call _ns_execute_primitive() instead of action()
3062 result
, result_detail
= await self
._ns
_execute
_primitive
(vca_deployed
["ee_id"], primitive
,
3063 mapped_primitive_params
,
3065 except LcmException
:
3066 # this happens when VCA is not deployed. In this case it is not needed to terminate
3068 result_ok
= ['COMPLETED', 'PARTIALLY_COMPLETED']
3069 if result
not in result_ok
:
3070 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
3071 "error {}".format(seq
.get("name"), vnf_index
, result_detail
))
3072 # set that this VCA do not need terminated
3073 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(vca_index
)
3074 self
.update_db_2("nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False})
3076 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
3077 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
3080 await self
.vca_map
[vca_type
].delete_execution_environment(vca_deployed
["ee_id"])
3082 async def _delete_all_N2VC(self
, db_nsr
: dict):
3083 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
3084 namespace
= "." + db_nsr
["_id"]
3086 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
3087 except N2VCNotFound
: # already deleted. Skip
3089 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
3091 async def _terminate_RO(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
3093 Terminates a deployment from RO
3094 :param logging_text:
3095 :param nsr_deployed: db_nsr._admin.deployed
3098 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3099 this method will update only the index 2, but it will write on database the concatenated content of the list
3104 ro_nsr_id
= ro_delete_action
= None
3105 if nsr_deployed
and nsr_deployed
.get("RO"):
3106 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
3107 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
3110 stage
[2] = "Deleting ns from VIM."
3111 db_nsr_update
["detailed-status"] = " ".join(stage
)
3112 self
._write
_op
_status
(nslcmop_id
, stage
)
3113 self
.logger
.debug(logging_text
+ stage
[2])
3114 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3115 self
._write
_op
_status
(nslcmop_id
, stage
)
3116 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
3117 ro_delete_action
= desc
["action_id"]
3118 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
3119 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3120 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3121 if ro_delete_action
:
3122 # wait until NS is deleted from VIM
3123 stage
[2] = "Waiting ns deleted from VIM."
3124 detailed_status_old
= None
3125 self
.logger
.debug(logging_text
+ stage
[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id
,
3127 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3128 self
._write
_op
_status
(nslcmop_id
, stage
)
3130 delete_timeout
= 20 * 60 # 20 minutes
3131 while delete_timeout
> 0:
3132 desc
= await self
.RO
.show(
3134 item_id_name
=ro_nsr_id
,
3135 extra_item
="action",
3136 extra_item_id
=ro_delete_action
)
3139 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
3141 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
3142 if ns_status
== "ERROR":
3143 raise ROclient
.ROClientException(ns_status_info
)
3144 elif ns_status
== "BUILD":
3145 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
3146 elif ns_status
== "ACTIVE":
3147 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3148 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3151 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
3152 if stage
[2] != detailed_status_old
:
3153 detailed_status_old
= stage
[2]
3154 db_nsr_update
["detailed-status"] = " ".join(stage
)
3155 self
._write
_op
_status
(nslcmop_id
, stage
)
3156 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3157 await asyncio
.sleep(5, loop
=self
.loop
)
3159 else: # delete_timeout <= 0:
3160 raise ROclient
.ROClientException("Timeout waiting ns deleted from VIM")
3162 except Exception as e
:
3163 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3164 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
3165 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3166 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3167 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3168 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
))
3169 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
3170 failed_detail
.append("delete conflict: {}".format(e
))
3171 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
))
3173 failed_detail
.append("delete error: {}".format(e
))
3174 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
))
3177 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
3178 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
3180 stage
[2] = "Deleting nsd from RO."
3181 db_nsr_update
["detailed-status"] = " ".join(stage
)
3182 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3183 self
._write
_op
_status
(nslcmop_id
, stage
)
3184 await self
.RO
.delete("nsd", ro_nsd_id
)
3185 self
.logger
.debug(logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
))
3186 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
3187 except Exception as e
:
3188 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
3189 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
3190 self
.logger
.debug(logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
))
3191 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
3192 failed_detail
.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
))
3193 self
.logger
.debug(logging_text
+ failed_detail
[-1])
3195 failed_detail
.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
))
3196 self
.logger
.error(logging_text
+ failed_detail
[-1])
3198 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
3199 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
3200 if not vnf_deployed
or not vnf_deployed
["id"]:
3203 ro_vnfd_id
= vnf_deployed
["id"]
3204 stage
[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3205 vnf_deployed
["member-vnf-index"], ro_vnfd_id
)
3206 db_nsr_update
["detailed-status"] = " ".join(stage
)
3207 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3208 self
._write
_op
_status
(nslcmop_id
, stage
)
3209 await self
.RO
.delete("vnfd", ro_vnfd_id
)
3210 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
))
3211 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
3212 except Exception as e
:
3213 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
3214 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
3215 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
))
3216 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
3217 failed_detail
.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
))
3218 self
.logger
.debug(logging_text
+ failed_detail
[-1])
3220 failed_detail
.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
))
3221 self
.logger
.error(logging_text
+ failed_detail
[-1])
3224 stage
[2] = "Error deleting from VIM"
3226 stage
[2] = "Deleted from VIM"
3227 db_nsr_update
["detailed-status"] = " ".join(stage
)
3228 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3229 self
._write
_op
_status
(nslcmop_id
, stage
)
3232 raise LcmException("; ".join(failed_detail
))
3234 async def terminate(self
, nsr_id
, nslcmop_id
):
3235 # Try to lock HA task here
3236 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3237 if not task_is_locked_by_me
:
3240 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
3241 self
.logger
.debug(logging_text
+ "Enter")
3242 timeout_ns_terminate
= self
.timeout_ns_terminate
3245 operation_params
= None
3247 error_list
= [] # annotates all failed error messages
3248 db_nslcmop_update
= {}
3249 autoremove
= False # autoremove after terminated
3250 tasks_dict_info
= {}
3252 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3253 # ^ contains [stage, step, VIM-status]
3255 # wait for any previous tasks in process
3256 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
3258 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
3259 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3260 operation_params
= db_nslcmop
.get("operationParams") or {}
3261 if operation_params
.get("timeout_ns_terminate"):
3262 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
3263 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
3264 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3266 db_nsr_update
["operational-status"] = "terminating"
3267 db_nsr_update
["config-status"] = "terminating"
3268 self
._write
_ns
_status
(
3270 ns_state
="TERMINATING",
3271 current_operation
="TERMINATING",
3272 current_operation_id
=nslcmop_id
,
3273 other_update
=db_nsr_update
3275 self
._write
_op
_status
(
3280 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
3281 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
3284 stage
[1] = "Getting vnf descriptors from db."
3285 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
3286 db_vnfds_from_id
= {}
3287 db_vnfds_from_member_index
= {}
3289 for vnfr
in db_vnfrs_list
:
3290 vnfd_id
= vnfr
["vnfd-id"]
3291 if vnfd_id
not in db_vnfds_from_id
:
3292 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3293 db_vnfds_from_id
[vnfd_id
] = vnfd
3294 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3296 # Destroy individual execution environments when there are terminating primitives.
3297 # Rest of EE will be deleted at once
3298 # TODO - check before calling _destroy_N2VC
3299 # if not operation_params.get("skip_terminate_primitives"):#
3300 # or not vca.get("needed_terminate"):
3301 stage
[0] = "Stage 2/3 execute terminating primitives."
3302 self
.logger
.debug(logging_text
+ stage
[0])
3303 stage
[1] = "Looking execution environment that needs terminate."
3304 self
.logger
.debug(logging_text
+ stage
[1])
3305 # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3306 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3307 config_descriptor
= None
3308 if not vca
or not vca
.get("ee_id"):
3310 if not vca
.get("member-vnf-index"):
3312 config_descriptor
= db_nsr
.get("ns-configuration")
3313 elif vca
.get("vdu_id"):
3314 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3315 vdud
= next((vdu
for vdu
in db_vnfd
.get("vdu", ()) if vdu
["id"] == vca
.get("vdu_id")), None)
3317 config_descriptor
= vdud
.get("vdu-configuration")
3318 elif vca
.get("kdu_name"):
3319 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3320 kdud
= next((kdu
for kdu
in db_vnfd
.get("kdu", ()) if kdu
["name"] == vca
.get("kdu_name")), None)
3322 config_descriptor
= kdud
.get("kdu-configuration")
3324 config_descriptor
= db_vnfds_from_member_index
[vca
["member-vnf-index"]].get("vnf-configuration")
3325 vca_type
= vca
.get("type")
3326 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3327 vca
.get("needed_terminate"))
3328 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3329 # pending native charms
3330 destroy_ee
= True if vca_type
in ("helm", "native_charm") else False
3331 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3332 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3333 task
= asyncio
.ensure_future(
3334 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
, vca_index
,
3335 destroy_ee
, exec_terminate_primitives
))
3336 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3338 # wait for pending tasks of terminate primitives
3340 self
.logger
.debug(logging_text
+ 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3341 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3342 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3344 tasks_dict_info
.clear()
3346 return # raise LcmException("; ".join(error_list))
3348 # remove All execution environments at once
3349 stage
[0] = "Stage 3/3 delete all."
3351 if nsr_deployed
.get("VCA"):
3352 stage
[1] = "Deleting all execution environments."
3353 self
.logger
.debug(logging_text
+ stage
[1])
3354 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3355 timeout
=self
.timeout_charm_delete
))
3356 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3357 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3359 # Delete from k8scluster
3360 stage
[1] = "Deleting KDUs."
3361 self
.logger
.debug(logging_text
+ stage
[1])
3362 # print(nsr_deployed)
3363 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3364 if not kdu
or not kdu
.get("kdu-instance"):
3366 kdu_instance
= kdu
.get("kdu-instance")
3367 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3368 task_delete_kdu_instance
= asyncio
.ensure_future(
3369 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3370 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3371 kdu_instance
=kdu_instance
))
3373 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3374 format(kdu
.get("k8scluster-type")))
3376 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3379 stage
[1] = "Deleting ns from VIM."
3381 task_delete_ro
= asyncio
.ensure_future(
3382 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3384 task_delete_ro
= asyncio
.ensure_future(
3385 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3386 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3388 # rest of staff will be done at finally
3390 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3391 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3393 except asyncio
.CancelledError
:
3394 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3395 exc
= "Operation was cancelled"
3396 except Exception as e
:
3397 exc
= traceback
.format_exc()
3398 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3401 error_list
.append(str(exc
))
3403 # wait for pending tasks
3405 stage
[1] = "Waiting for terminate pending tasks."
3406 self
.logger
.debug(logging_text
+ stage
[1])
3407 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3409 stage
[1] = stage
[2] = ""
3410 except asyncio
.CancelledError
:
3411 error_list
.append("Cancelled")
3412 # TODO cancell all tasks
3413 except Exception as exc
:
3414 error_list
.append(str(exc
))
3415 # update status at database
3417 error_detail
= "; ".join(error_list
)
3418 # self.logger.error(logging_text + error_detail)
3419 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
3420 error_description_nsr
= 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id
, stage
[0])
3422 db_nsr_update
["operational-status"] = "failed"
3423 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3424 db_nslcmop_update
["detailed-status"] = error_detail
3425 nslcmop_operation_state
= "FAILED"
3429 error_description_nsr
= error_description_nslcmop
= None
3430 ns_state
= "NOT_INSTANTIATED"
3431 db_nsr_update
["operational-status"] = "terminated"
3432 db_nsr_update
["detailed-status"] = "Done"
3433 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3434 db_nslcmop_update
["detailed-status"] = "Done"
3435 nslcmop_operation_state
= "COMPLETED"
3438 self
._write
_ns
_status
(
3441 current_operation
="IDLE",
3442 current_operation_id
=None,
3443 error_description
=error_description_nsr
,
3444 error_detail
=error_detail
,
3445 other_update
=db_nsr_update
3447 self
._write
_op
_status
(
3450 error_message
=error_description_nslcmop
,
3451 operation_state
=nslcmop_operation_state
,
3452 other_update
=db_nslcmop_update
,
3454 if ns_state
== "NOT_INSTANTIATED":
3456 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "NOT_INSTANTIATED"})
3457 except DbException
as e
:
3458 self
.logger
.warn(logging_text
+ 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3460 if operation_params
:
3461 autoremove
= operation_params
.get("autoremove", False)
3462 if nslcmop_operation_state
:
3464 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3465 "operationState": nslcmop_operation_state
,
3466 "autoremove": autoremove
},
3468 except Exception as e
:
3469 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3471 self
.logger
.debug(logging_text
+ "Exit")
3472 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3474 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3476 error_detail_list
= []
3478 pending_tasks
= list(created_tasks_info
.keys())
3479 num_tasks
= len(pending_tasks
)
3481 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3482 self
._write
_op
_status
(nslcmop_id
, stage
)
3483 while pending_tasks
:
3485 _timeout
= timeout
+ time_start
- time()
3486 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3487 return_when
=asyncio
.FIRST_COMPLETED
)
3488 num_done
+= len(done
)
3489 if not done
: # Timeout
3490 for task
in pending_tasks
:
3491 new_error
= created_tasks_info
[task
] + ": Timeout"
3492 error_detail_list
.append(new_error
)
3493 error_list
.append(new_error
)
3496 if task
.cancelled():
3499 exc
= task
.exception()
3501 if isinstance(exc
, asyncio
.TimeoutError
):
3503 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3504 error_list
.append(created_tasks_info
[task
])
3505 error_detail_list
.append(new_error
)
3506 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3508 self
.logger
.error(logging_text
+ new_error
)
3510 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3511 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + exc_traceback
)
3513 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3514 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3516 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3517 if nsr_id
: # update also nsr
3518 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3519 "errorDetail": ". ".join(error_detail_list
)})
3520 self
._write
_op
_status
(nslcmop_id
, stage
)
3521 return error_detail_list
3524 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3526 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3527 The default-value is used. If it is between < > it look for a value at instantiation_params
3528 :param primitive_desc: portion of VNFD/NSD that describes primitive
3529 :param params: Params provided by user
3530 :param instantiation_params: Instantiation params provided by user
3531 :return: a dictionary with the calculated params
3533 calculated_params
= {}
3534 for parameter
in primitive_desc
.get("parameter", ()):
3535 param_name
= parameter
["name"]
3536 if param_name
in params
:
3537 calculated_params
[param_name
] = params
[param_name
]
3538 elif "default-value" in parameter
or "value" in parameter
:
3539 if "value" in parameter
:
3540 calculated_params
[param_name
] = parameter
["value"]
3542 calculated_params
[param_name
] = parameter
["default-value"]
3543 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3544 and calculated_params
[param_name
].endswith(">"):
3545 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3546 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3548 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3549 format(calculated_params
[param_name
], primitive_desc
["name"]))
3551 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3552 format(param_name
, primitive_desc
["name"]))
3554 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3555 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
], default_flow_style
=True,
3557 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3558 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3559 if parameter
.get("data-type") == "INTEGER":
3561 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3562 except ValueError: # error converting string to int
3564 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3565 elif parameter
.get("data-type") == "BOOLEAN":
3566 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3568 # add always ns_config_info if primitive name is config
3569 if primitive_desc
["name"] == "config":
3570 if "ns_config_info" in instantiation_params
:
3571 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3572 return calculated_params
3574 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3575 ee_descriptor_id
=None):
3576 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3577 for vca
in deployed_vca
:
3580 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3582 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3584 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3586 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3590 # vca_deployed not found
3591 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3592 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3596 ee_id
= vca
.get("ee_id")
3597 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3599 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3600 "execution environment"
3601 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3602 return ee_id
, vca_type
3604 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0,
3605 retries_interval
=30, timeout
=None,
3606 vca_type
=None, db_dict
=None) -> (str, str):
3608 if primitive
== "config":
3609 primitive_params
= {"params": primitive_params
}
3611 vca_type
= vca_type
or "lxc_proxy_charm"
3615 output
= await asyncio
.wait_for(
3616 self
.vca_map
[vca_type
].exec_primitive(
3618 primitive_name
=primitive
,
3619 params_dict
=primitive_params
,
3620 progress_timeout
=self
.timeout_progress_primitive
,
3621 total_timeout
=self
.timeout_primitive
,
3623 timeout
=timeout
or self
.timeout_primitive
)
3626 except asyncio
.CancelledError
:
3628 except Exception as e
: # asyncio.TimeoutError
3629 if isinstance(e
, asyncio
.TimeoutError
):
3633 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3635 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3637 return 'FAILED', str(e
)
3639 return 'COMPLETED', output
3641 except (LcmException
, asyncio
.CancelledError
):
3643 except Exception as e
:
3644 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
3646 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
3648 Updating the vca_status with latest juju information in nsrs record
3649 :param: nsr_id: Id of the nsr
3650 :param: nslcmop_id: Id of the nslcmop
3654 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
3655 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3657 for vca_index
, _
in enumerate(db_nsr
['_admin']['deployed']['VCA']):
3658 table
, filter, path
= "nsrs", {"_id": nsr_id
}, "_admin.deployed.VCA.{}.".format(vca_index
)
3659 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
3661 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
3662 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
3664 async def action(self
, nsr_id
, nslcmop_id
):
3666 # Try to lock HA task here
3667 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3668 if not task_is_locked_by_me
:
3671 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3672 self
.logger
.debug(logging_text
+ "Enter")
3673 # get all needed from database
3677 db_nslcmop_update
= {}
3678 nslcmop_operation_state
= None
3679 error_description_nslcmop
= None
3682 # wait for any previous tasks in process
3683 step
= "Waiting for previous operations to terminate"
3684 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3686 self
._write
_ns
_status
(
3689 current_operation
="RUNNING ACTION",
3690 current_operation_id
=nslcmop_id
3693 step
= "Getting information from database"
3694 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3695 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3697 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3698 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3699 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3700 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3701 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3702 primitive
= db_nslcmop
["operationParams"]["primitive"]
3703 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3704 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3707 step
= "Getting vnfr from database"
3708 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3709 step
= "Getting vnfd from database"
3710 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3712 step
= "Getting nsd from database"
3713 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3715 # for backward compatibility
3716 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3717 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3718 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3719 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3721 # look for primitive
3722 config_primitive_desc
= descriptor_configuration
= None
3724 for vdu
in get_iterable(db_vnfd
, "vdu"):
3725 if vdu_id
== vdu
["id"]:
3726 descriptor_configuration
= vdu
.get("vdu-configuration")
3729 for kdu
in get_iterable(db_vnfd
, "kdu"):
3730 if kdu_name
== kdu
["name"]:
3731 descriptor_configuration
= kdu
.get("kdu-configuration")
3734 descriptor_configuration
= db_vnfd
.get("vnf-configuration")
3736 descriptor_configuration
= db_nsd
.get("ns-configuration")
3738 if descriptor_configuration
and descriptor_configuration
.get("config-primitive"):
3739 for config_primitive
in descriptor_configuration
["config-primitive"]:
3740 if config_primitive
["name"] == primitive
:
3741 config_primitive_desc
= config_primitive
3744 if not config_primitive_desc
:
3745 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3746 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3748 primitive_name
= primitive
3749 ee_descriptor_id
= None
3751 primitive_name
= config_primitive_desc
.get("execution-environment-primitive", primitive
)
3752 ee_descriptor_id
= config_primitive_desc
.get("execution-environment-ref")
3756 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3757 desc_params
= self
._format
_additional
_params
(vdur
.get("additionalParams"))
3759 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3760 desc_params
= self
._format
_additional
_params
(kdur
.get("additionalParams"))
3762 desc_params
= self
._format
_additional
_params
(db_vnfr
.get("additionalParamsForVnf"))
3764 desc_params
= self
._format
_additional
_params
(db_nsr
.get("additionalParamsForNs"))
3767 kdu_action
= True if not deep_get(kdu
, ("kdu-configuration", "juju")) else False
3769 # TODO check if ns is in a proper status
3770 if kdu_name
and (primitive_name
in ("upgrade", "rollback", "status") or kdu_action
):
3771 # kdur and desc_params already set from before
3772 if primitive_params
:
3773 desc_params
.update(primitive_params
)
3774 # TODO Check if we will need something at vnf level
3775 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3776 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3779 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3781 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3782 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3783 raise LcmException(msg
)
3785 db_dict
= {"collection": "nsrs",
3786 "filter": {"_id": nsr_id
},
3787 "path": "_admin.deployed.K8s.{}".format(index
)}
3788 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
))
3789 step
= "Executing kdu {}".format(primitive_name
)
3790 if primitive_name
== "upgrade":
3791 if desc_params
.get("kdu_model"):
3792 kdu_model
= desc_params
.get("kdu_model")
3793 del desc_params
["kdu_model"]
3795 kdu_model
= kdu
.get("kdu-model")
3796 parts
= kdu_model
.split(sep
=":")
3798 kdu_model
= parts
[0]
3800 detailed_status
= await asyncio
.wait_for(
3801 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3802 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3803 kdu_instance
=kdu
.get("kdu-instance"),
3804 atomic
=True, kdu_model
=kdu_model
,
3805 params
=desc_params
, db_dict
=db_dict
,
3806 timeout
=timeout_ns_action
),
3807 timeout
=timeout_ns_action
+ 10)
3808 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3809 elif primitive_name
== "rollback":
3810 detailed_status
= await asyncio
.wait_for(
3811 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3812 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3813 kdu_instance
=kdu
.get("kdu-instance"),
3815 timeout
=timeout_ns_action
)
3816 elif primitive_name
== "status":
3817 detailed_status
= await asyncio
.wait_for(
3818 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3819 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3820 kdu_instance
=kdu
.get("kdu-instance")),
3821 timeout
=timeout_ns_action
)
3823 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3824 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3826 detailed_status
= await asyncio
.wait_for(
3827 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3828 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3829 kdu_instance
=kdu_instance
,
3830 primitive_name
=primitive_name
,
3831 params
=params
, db_dict
=db_dict
,
3832 timeout
=timeout_ns_action
),
3833 timeout
=timeout_ns_action
)
3836 nslcmop_operation_state
= 'COMPLETED'
3838 detailed_status
= ''
3839 nslcmop_operation_state
= 'FAILED'
3841 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3842 member_vnf_index
=vnf_index
,
3844 vdu_count_index
=vdu_count_index
,
3845 ee_descriptor_id
=ee_descriptor_id
)
3846 db_nslcmop_notif
= {"collection": "nslcmops",
3847 "filter": {"_id": nslcmop_id
},
3848 "path": "admin.VCA"}
3849 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3851 primitive
=primitive_name
,
3852 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3853 timeout
=timeout_ns_action
,
3855 db_dict
=db_nslcmop_notif
)
3857 db_nslcmop_update
["detailed-status"] = detailed_status
3858 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3859 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3861 return # database update is called inside finally
3863 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3864 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3866 except asyncio
.CancelledError
:
3867 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3868 exc
= "Operation was cancelled"
3869 except asyncio
.TimeoutError
:
3870 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3872 except Exception as e
:
3873 exc
= traceback
.format_exc()
3874 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3877 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3878 "FAILED {}: {}".format(step
, exc
)
3879 nslcmop_operation_state
= "FAILED"
3881 self
._write
_ns
_status
(
3883 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3884 current_operation
="IDLE",
3885 current_operation_id
=None,
3886 # error_description=error_description_nsr,
3887 # error_detail=error_detail,
3888 other_update
=db_nsr_update
3891 self
._write
_op
_status
(
3894 error_message
=error_description_nslcmop
,
3895 operation_state
=nslcmop_operation_state
,
3896 other_update
=db_nslcmop_update
,
3899 if nslcmop_operation_state
:
3901 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3902 "operationState": nslcmop_operation_state
},
3904 except Exception as e
:
3905 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3906 self
.logger
.debug(logging_text
+ "Exit")
3907 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3908 return nslcmop_operation_state
, detailed_status
3910 async def scale(self
, nsr_id
, nslcmop_id
):
3912 # Try to lock HA task here
3913 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3914 if not task_is_locked_by_me
:
3917 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3918 self
.logger
.debug(logging_text
+ "Enter")
3919 # get all needed from database
3922 db_nslcmop_update
= {}
3923 nslcmop_operation_state
= None
3926 # in case of error, indicates what part of scale was failed to put nsr at error status
3927 scale_process
= None
3928 old_operational_status
= ""
3929 old_config_status
= ""
3931 # wait for any previous tasks in process
3932 step
= "Waiting for previous operations to terminate"
3933 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3935 self
._write
_ns
_status
(
3938 current_operation
="SCALING",
3939 current_operation_id
=nslcmop_id
3942 step
= "Getting nslcmop from database"
3943 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3944 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3945 step
= "Getting nsr from database"
3946 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3948 old_operational_status
= db_nsr
["operational-status"]
3949 old_config_status
= db_nsr
["config-status"]
3950 step
= "Parsing scaling parameters"
3951 # self.logger.debug(step)
3952 db_nsr_update
["operational-status"] = "scaling"
3953 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3954 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3957 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3958 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3959 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3960 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3961 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3964 RO_nsr_id
= nsr_deployed
["RO"]["nsr_id"]
3965 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3966 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3967 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3968 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3970 # for backward compatibility
3971 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3972 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3973 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3974 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3976 step
= "Getting vnfr from database"
3977 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3978 step
= "Getting vnfd from database"
3979 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3981 step
= "Getting scaling-group-descriptor"
3982 for scaling_descriptor
in db_vnfd
["scaling-group-descriptor"]:
3983 if scaling_descriptor
["name"] == scaling_group
:
3986 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3987 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3990 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3991 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3992 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3995 # TODO check if ns is in a proper status
3996 step
= "Sending scale order to VIM"
3998 if not db_nsr
["_admin"].get("scaling-group"):
3999 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
4000 admin_scale_index
= 0
4002 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
4003 if admin_scale_info
["name"] == scaling_group
:
4004 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
4006 else: # not found, set index one plus last element and add new entry with the name
4007 admin_scale_index
+= 1
4008 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
4009 RO_scaling_info
= []
4010 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
4011 if scaling_type
== "SCALE_OUT":
4012 # count if max-instance-count is reached
4013 max_instance_count
= scaling_descriptor
.get("max-instance-count", 10)
4014 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
4015 if nb_scale_op
>= max_instance_count
:
4016 raise LcmException("reached the limit of {} (max-instance-count) "
4017 "scaling-out operations for the "
4018 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
4021 vdu_scaling_info
["scaling_direction"] = "OUT"
4022 vdu_scaling_info
["vdu-create"] = {}
4023 for vdu_scale_info
in scaling_descriptor
["vdu"]:
4024 vdud
= next(vdu
for vdu
in db_vnfd
.get("vdu") if vdu
["id"] == vdu_scale_info
["vdu-id-ref"])
4025 vdu_index
= len([x
for x
in db_vnfr
.get("vdur", ())
4026 if x
.get("vdu-id-ref") == vdu_scale_info
["vdu-id-ref"] and
4027 x
.get("member-vnf-index-ref") == vnf_index
])
4028 cloud_init_text
= self
._get
_cloud
_init
(vdud
, db_vnfd
)
4030 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
4031 cloud_init_list
= []
4032 for x
in range(vdu_scale_info
.get("count", 1)):
4034 # TODO Information of its own ip is not available because db_vnfr is not updated.
4035 additional_params
["OSM"] = self
._get
_osm
_params
(db_vnfr
, vdu_scale_info
["vdu-id-ref"],
4037 cloud_init_list
.append(self
._parse
_cloud
_init
(cloud_init_text
, additional_params
,
4038 db_vnfd
["id"], vdud
["id"]))
4039 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
4040 "type": "create", "count": vdu_scale_info
.get("count", 1)})
4042 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
4043 vdu_scaling_info
["vdu-create"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
4045 elif scaling_type
== "SCALE_IN":
4046 # count if min-instance-count is reached
4047 min_instance_count
= 0
4048 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
4049 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
4050 if nb_scale_op
<= min_instance_count
:
4051 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
4052 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
4054 vdu_scaling_info
["scaling_direction"] = "IN"
4055 vdu_scaling_info
["vdu-delete"] = {}
4056 for vdu_scale_info
in scaling_descriptor
["vdu"]:
4057 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
4058 "type": "delete", "count": vdu_scale_info
.get("count", 1)})
4059 vdu_scaling_info
["vdu-delete"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
4061 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
4062 vdu_create
= vdu_scaling_info
.get("vdu-create")
4063 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
4064 if vdu_scaling_info
["scaling_direction"] == "IN":
4065 for vdur
in reversed(db_vnfr
["vdur"]):
4066 if vdu_delete
.get(vdur
["vdu-id-ref"]):
4067 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
4068 vdu_scaling_info
["vdu"].append({
4069 "name": vdur
["name"],
4070 "vdu_id": vdur
["vdu-id-ref"],
4073 for interface
in vdur
["interfaces"]:
4074 vdu_scaling_info
["vdu"][-1]["interface"].append({
4075 "name": interface
["name"],
4076 "ip_address": interface
["ip-address"],
4077 "mac_address": interface
.get("mac-address"),
4079 vdu_delete
= vdu_scaling_info
.pop("vdu-delete")
4082 step
= "Executing pre-scale vnf-config-primitive"
4083 if scaling_descriptor
.get("scaling-config-action"):
4084 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4085 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
4086 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
4087 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4088 step
= db_nslcmop_update
["detailed-status"] = \
4089 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4091 # look for primitive
4092 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
4093 if config_primitive
["name"] == vnf_config_primitive
:
4097 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4098 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4099 "primitive".format(scaling_group
, vnf_config_primitive
))
4101 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4102 if db_vnfr
.get("additionalParamsForVnf"):
4103 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4105 scale_process
= "VCA"
4106 db_nsr_update
["config-status"] = "configuring pre-scaling"
4107 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4109 # Pre-scale retry check: Check if this sub-operation has been executed before
4110 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4111 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
4112 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4113 # Skip sub-operation
4114 result
= 'COMPLETED'
4115 result_detail
= 'Done'
4116 self
.logger
.debug(logging_text
+
4117 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4118 vnf_config_primitive
, result
, result_detail
))
4120 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4121 # New sub-operation: Get index of this sub-operation
4122 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4123 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4124 format(vnf_config_primitive
))
4126 # retry: Get registered params for this existing sub-operation
4127 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4128 vnf_index
= op
.get('member_vnf_index')
4129 vnf_config_primitive
= op
.get('primitive')
4130 primitive_params
= op
.get('primitive_params')
4131 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4132 format(vnf_config_primitive
))
4133 # Execute the primitive, either with new (first-time) or registered (reintent) args
4134 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4135 primitive_name
= config_primitive
.get("execution-environment-primitive",
4136 vnf_config_primitive
)
4137 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4138 member_vnf_index
=vnf_index
,
4140 vdu_count_index
=None,
4141 ee_descriptor_id
=ee_descriptor_id
)
4142 result
, result_detail
= await self
._ns
_execute
_primitive
(
4143 ee_id
, primitive_name
, primitive_params
, vca_type
)
4144 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4145 vnf_config_primitive
, result
, result_detail
))
4146 # Update operationState = COMPLETED | FAILED
4147 self
._update
_suboperation
_status
(
4148 db_nslcmop
, op_index
, result
, result_detail
)
4150 if result
== "FAILED":
4151 raise LcmException(result_detail
)
4152 db_nsr_update
["config-status"] = old_config_status
4153 scale_process
= None
4157 # Should this block be skipped if 'RO_nsr_id' == None ?
4158 # if (RO_nsr_id and RO_scaling_info):
4160 scale_process
= "RO"
4161 # Scale RO retry check: Check if this sub-operation has been executed before
4162 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4163 db_nslcmop
, vnf_index
, None, None, 'SCALE-RO', RO_nsr_id
, RO_scaling_info
)
4164 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4165 # Skip sub-operation
4166 result
= 'COMPLETED'
4167 result_detail
= 'Done'
4168 self
.logger
.debug(logging_text
+ "Skipped sub-operation RO, result {} {}".format(
4169 result
, result_detail
))
4171 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4172 # New sub-operation: Get index of this sub-operation
4173 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4174 self
.logger
.debug(logging_text
+ "New sub-operation RO")
4176 # retry: Get registered params for this existing sub-operation
4177 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4178 RO_nsr_id
= op
.get('RO_nsr_id')
4179 RO_scaling_info
= op
.get('RO_scaling_info')
4180 self
.logger
.debug(logging_text
+ "Sub-operation RO retry for primitive {}".format(
4181 vnf_config_primitive
))
4183 RO_desc
= await self
.RO
.create_action("ns", RO_nsr_id
, {"vdu-scaling": RO_scaling_info
})
4184 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
4185 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
4187 RO_nslcmop_id
= RO_desc
["instance_action_id"]
4188 db_nslcmop_update
["_admin.deploy.RO"] = RO_nslcmop_id
4190 RO_task_done
= False
4191 step
= detailed_status
= "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id
)
4192 detailed_status_old
= None
4193 self
.logger
.debug(logging_text
+ step
)
4195 deployment_timeout
= 1 * 3600 # One hour
4196 while deployment_timeout
> 0:
4197 if not RO_task_done
:
4198 desc
= await self
.RO
.show("ns", item_id_name
=RO_nsr_id
, extra_item
="action",
4199 extra_item_id
=RO_nslcmop_id
)
4202 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4204 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4205 if ns_status
== "ERROR":
4206 raise ROclient
.ROClientException(ns_status_info
)
4207 elif ns_status
== "BUILD":
4208 detailed_status
= step
+ "; {}".format(ns_status_info
)
4209 elif ns_status
== "ACTIVE":
4211 self
.scale_vnfr(db_vnfr
, vdu_create
=vdu_create
, vdu_delete
=vdu_delete
)
4212 step
= detailed_status
= "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id
)
4213 self
.logger
.debug(logging_text
+ step
)
4215 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
4217 desc
= await self
.RO
.show("ns", RO_nsr_id
)
4218 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
4220 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4222 if ns_status
== "ERROR":
4223 raise ROclient
.ROClientException(ns_status_info
)
4224 elif ns_status
== "BUILD":
4225 detailed_status
= step
+ "; {}".format(ns_status_info
)
4226 elif ns_status
== "ACTIVE":
4227 step
= detailed_status
= \
4228 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4230 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4231 self
.ns_update_vnfr({db_vnfr
["member-vnf-index-ref"]: db_vnfr
}, desc
)
4233 except LcmExceptionNoMgmtIP
:
4236 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
4237 if detailed_status
!= detailed_status_old
:
4238 self
._update
_suboperation
_status
(
4239 db_nslcmop
, op_index
, 'COMPLETED', detailed_status
)
4240 detailed_status_old
= db_nslcmop_update
["detailed-status"] = detailed_status
4241 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
4243 await asyncio
.sleep(5, loop
=self
.loop
)
4244 deployment_timeout
-= 5
4245 if deployment_timeout
<= 0:
4246 self
._update
_suboperation
_status
(
4247 db_nslcmop
, nslcmop_id
, op_index
, 'FAILED', "Timeout when waiting for ns to get ready")
4248 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
4250 # update VDU_SCALING_INFO with the obtained ip_addresses
4251 if vdu_scaling_info
["scaling_direction"] == "OUT":
4252 for vdur
in reversed(db_vnfr
["vdur"]):
4253 if vdu_scaling_info
["vdu-create"].get(vdur
["vdu-id-ref"]):
4254 vdu_scaling_info
["vdu-create"][vdur
["vdu-id-ref"]] -= 1
4255 vdu_scaling_info
["vdu"].append({
4256 "name": vdur
["name"],
4257 "vdu_id": vdur
["vdu-id-ref"],
4260 for interface
in vdur
["interfaces"]:
4261 vdu_scaling_info
["vdu"][-1]["interface"].append({
4262 "name": interface
["name"],
4263 "ip_address": interface
["ip-address"],
4264 "mac_address": interface
.get("mac-address"),
4266 del vdu_scaling_info
["vdu-create"]
4268 self
._update
_suboperation
_status
(db_nslcmop
, op_index
, 'COMPLETED', 'Done')
4271 scale_process
= None
4273 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4276 # execute primitive service POST-SCALING
4277 step
= "Executing post-scale vnf-config-primitive"
4278 if scaling_descriptor
.get("scaling-config-action"):
4279 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4280 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
4281 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
4282 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4283 step
= db_nslcmop_update
["detailed-status"] = \
4284 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4286 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4287 if db_vnfr
.get("additionalParamsForVnf"):
4288 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4290 # look for primitive
4291 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
4292 if config_primitive
["name"] == vnf_config_primitive
:
4296 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4297 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4298 "config-primitive".format(scaling_group
, vnf_config_primitive
))
4299 scale_process
= "VCA"
4300 db_nsr_update
["config-status"] = "configuring post-scaling"
4301 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4303 # Post-scale retry check: Check if this sub-operation has been executed before
4304 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4305 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
4306 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4307 # Skip sub-operation
4308 result
= 'COMPLETED'
4309 result_detail
= 'Done'
4310 self
.logger
.debug(logging_text
+
4311 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4312 format(vnf_config_primitive
, result
, result_detail
))
4314 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4315 # New sub-operation: Get index of this sub-operation
4316 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4317 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4318 format(vnf_config_primitive
))
4320 # retry: Get registered params for this existing sub-operation
4321 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4322 vnf_index
= op
.get('member_vnf_index')
4323 vnf_config_primitive
= op
.get('primitive')
4324 primitive_params
= op
.get('primitive_params')
4325 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4326 format(vnf_config_primitive
))
4327 # Execute the primitive, either with new (first-time) or registered (reintent) args
4328 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4329 primitive_name
= config_primitive
.get("execution-environment-primitive",
4330 vnf_config_primitive
)
4331 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4332 member_vnf_index
=vnf_index
,
4334 vdu_count_index
=None,
4335 ee_descriptor_id
=ee_descriptor_id
)
4336 result
, result_detail
= await self
._ns
_execute
_primitive
(
4337 ee_id
, primitive_name
, primitive_params
, vca_type
)
4338 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4339 vnf_config_primitive
, result
, result_detail
))
4340 # Update operationState = COMPLETED | FAILED
4341 self
._update
_suboperation
_status
(
4342 db_nslcmop
, op_index
, result
, result_detail
)
4344 if result
== "FAILED":
4345 raise LcmException(result_detail
)
4346 db_nsr_update
["config-status"] = old_config_status
4347 scale_process
= None
4350 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4351 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
4352 else old_operational_status
4353 db_nsr_update
["config-status"] = old_config_status
4355 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
4356 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4358 except asyncio
.CancelledError
:
4359 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
4360 exc
= "Operation was cancelled"
4361 except Exception as e
:
4362 exc
= traceback
.format_exc()
4363 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
4365 self
._write
_ns
_status
(
4368 current_operation
="IDLE",
4369 current_operation_id
=None
4372 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4373 nslcmop_operation_state
= "FAILED"
4375 db_nsr_update
["operational-status"] = old_operational_status
4376 db_nsr_update
["config-status"] = old_config_status
4377 db_nsr_update
["detailed-status"] = ""
4379 if "VCA" in scale_process
:
4380 db_nsr_update
["config-status"] = "failed"
4381 if "RO" in scale_process
:
4382 db_nsr_update
["operational-status"] = "failed"
4383 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4386 error_description_nslcmop
= None
4387 nslcmop_operation_state
= "COMPLETED"
4388 db_nslcmop_update
["detailed-status"] = "Done"
4390 self
._write
_op
_status
(
4393 error_message
=error_description_nslcmop
,
4394 operation_state
=nslcmop_operation_state
,
4395 other_update
=db_nslcmop_update
,
4398 self
._write
_ns
_status
(
4401 current_operation
="IDLE",
4402 current_operation_id
=None,
4403 other_update
=db_nsr_update
4406 if nslcmop_operation_state
:
4408 await self
.msg
.aiowrite("ns", "scaled", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
4409 "operationState": nslcmop_operation_state
},
4412 # await asyncio.sleep(cooldown_time, loop=self.loop)
4413 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4414 except Exception as e
:
4415 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4416 self
.logger
.debug(logging_text
+ "Exit")
4417 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
4419 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4420 if not self
.prometheus
:
4422 # look if exist a file called 'prometheus*.j2' and
4423 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4424 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4427 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4431 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4432 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4434 vnfr_id
= vnfr_id
.replace("-", "")
4436 "JOB_NAME": vnfr_id
,
4437 "TARGET_IP": target_ip
,
4438 "EXPORTER_POD_IP": host_name
,
4439 "EXPORTER_POD_PORT": host_port
,
4441 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4442 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4443 for job
in job_list
:
4444 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4445 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4446 job
["nsr_id"] = nsr_id
4447 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4448 if await self
.prometheus
.update(job_dict
):
4449 return list(job_dict
.keys())