1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
23 from osm_lcm
import ROclient
24 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
, deep_get
25 from n2vc
.k8s_helm_conn
import K8sHelmConnector
26 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
27 from n2vc
.k8s_juju_conn
import K8sJujuConnector
28 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
29 from n2vc
.exceptions
import K8sException
, N2VCException
30 from osm_common
.dbbase
import DbException
31 from copy
import deepcopy
34 __author__
= "Alfonso Tierno"
37 class VimLcm(LcmBase
):
38 # values that are encrypted at vim config because they are passwords
39 vim_config_encrypted
= {
40 "1.1": ("admin_password", "nsx_password", "vcenter_password"),
49 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
51 Init, Connect to database, filesystem storage, and messaging
52 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
56 self
.logger
= logging
.getLogger("lcm.vim")
58 self
.lcm_tasks
= lcm_tasks
59 self
.ro_config
= config
["RO"]
61 super().__init
__(msg
, self
.logger
)
63 async def create(self
, vim_content
, order_id
):
64 # HA tasks and backward compatibility:
65 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
66 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
67 # Register 'create' task here for related future HA operations
68 op_id
= vim_content
.pop("op_id", None)
69 if not self
.lcm_tasks
.lock_HA("vim", "create", op_id
):
72 vim_id
= vim_content
["_id"]
73 logging_text
= "Task vim_create={} ".format(vim_id
)
74 self
.logger
.debug(logging_text
+ "Enter")
81 step
= "Getting vim-id='{}' from db".format(vim_id
)
82 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
83 if vim_content
.get("config") and vim_content
["config"].get(
86 step
= "Getting sdn-controller-id='{}' from db".format(
87 vim_content
["config"]["sdn-controller"]
89 db_sdn
= self
.db
.get_one(
90 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
93 # If the VIM account has an associated SDN account, also
94 # wait for any previous tasks in process for the SDN
95 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
99 and db_sdn
["_admin"].get("deployed")
100 and db_sdn
["_admin"]["deployed"].get("RO")
102 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
105 "sdn-controller={} is not available. Not deployed at RO".format(
106 vim_content
["config"]["sdn-controller"]
110 step
= "Creating vim at RO"
111 db_vim_update
["_admin.deployed.RO"] = None
112 db_vim_update
["_admin.detailed-status"] = step
113 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
114 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
115 vim_RO
= deepcopy(vim_content
)
116 vim_RO
.pop("_id", None)
117 vim_RO
.pop("_admin", None)
118 schema_version
= vim_RO
.pop("schema_version", None)
119 vim_RO
.pop("schema_type", None)
120 vim_RO
.pop("vim_tenant_name", None)
121 vim_RO
["type"] = vim_RO
.pop("vim_type")
122 vim_RO
.pop("vim_user", None)
123 vim_RO
.pop("vim_password", None)
125 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
126 desc
= await RO
.create("vim", descriptor
=vim_RO
)
127 RO_vim_id
= desc
["uuid"]
128 db_vim_update
["_admin.deployed.RO"] = RO_vim_id
130 logging_text
+ "VIM created at RO_vim_id={}".format(RO_vim_id
)
133 step
= "Creating vim_account at RO"
134 db_vim_update
["_admin.detailed-status"] = step
135 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
137 if vim_content
.get("vim_password"):
138 vim_content
["vim_password"] = self
.db
.decrypt(
139 vim_content
["vim_password"],
140 schema_version
=schema_version
,
144 "vim_tenant_name": vim_content
["vim_tenant_name"],
145 "vim_username": vim_content
["vim_user"],
146 "vim_password": vim_content
["vim_password"],
148 if vim_RO
.get("config"):
149 vim_account_RO
["config"] = vim_RO
["config"]
150 if "sdn-controller" in vim_account_RO
["config"]:
151 del vim_account_RO
["config"]["sdn-controller"]
152 if "sdn-port-mapping" in vim_account_RO
["config"]:
153 del vim_account_RO
["config"]["sdn-port-mapping"]
154 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
156 ) or self
.vim_config_encrypted
.get("default")
157 for p
in vim_config_encrypted_keys
:
158 if vim_account_RO
["config"].get(p
):
159 vim_account_RO
["config"][p
] = self
.db
.decrypt(
160 vim_account_RO
["config"][p
],
161 schema_version
=schema_version
,
165 desc
= await RO
.attach("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
166 db_vim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
167 db_vim_update
["_admin.operationalState"] = "ENABLED"
168 db_vim_update
["_admin.detailed-status"] = "Done"
169 # Mark the VIM 'create' HA task as successful
170 operation_state
= "COMPLETED"
171 operation_details
= "Done"
175 + "Exit Ok VIM account created at RO_vim_account_id={}".format(
181 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
182 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
184 except Exception as e
:
185 self
.logger
.critical(
186 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
191 db_vim_update
["_admin.operationalState"] = "ERROR"
192 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
195 # Mark the VIM 'create' HA task as erroneous
196 operation_state
= "FAILED"
197 operation_details
= "ERROR {}: {}".format(step
, exc
)
200 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
201 # Register the VIM 'create' HA task either
202 # succesful or erroneous, or do nothing (if legacy NBI)
203 self
.lcm_tasks
.unlock_HA(
207 operationState
=operation_state
,
208 detailed_status
=operation_details
,
210 except DbException
as e
:
211 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
213 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
215 async def edit(self
, vim_content
, order_id
):
216 # HA tasks and backward compatibility:
217 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
218 # In such a case, HA is not supported by NBI, and the HA check always returns True
219 op_id
= vim_content
.pop("op_id", None)
220 if not self
.lcm_tasks
.lock_HA("vim", "edit", op_id
):
223 vim_id
= vim_content
["_id"]
224 logging_text
= "Task vim_edit={} ".format(vim_id
)
225 self
.logger
.debug(logging_text
+ "Enter")
232 step
= "Getting vim-id='{}' from db".format(vim_id
)
234 # wait for any previous tasks in process
235 await self
.lcm_tasks
.waitfor_related_HA("vim", "edit", op_id
)
237 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
241 and db_vim
["_admin"].get("deployed")
242 and db_vim
["_admin"]["deployed"].get("RO")
244 if vim_content
.get("config") and vim_content
["config"].get(
247 step
= "Getting sdn-controller-id='{}' from db".format(
248 vim_content
["config"]["sdn-controller"]
250 db_sdn
= self
.db
.get_one(
251 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
254 # If the VIM account has an associated SDN account, also
255 # wait for any previous tasks in process for the SDN
256 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
260 and db_sdn
["_admin"].get("deployed")
261 and db_sdn
["_admin"]["deployed"].get("RO")
263 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
266 "sdn-controller={} is not available. Not deployed at RO".format(
267 vim_content
["config"]["sdn-controller"]
271 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
272 step
= "Editing vim at RO"
273 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
274 vim_RO
= deepcopy(vim_content
)
275 vim_RO
.pop("_id", None)
276 vim_RO
.pop("_admin", None)
277 schema_version
= vim_RO
.pop("schema_version", None)
278 vim_RO
.pop("schema_type", None)
279 vim_RO
.pop("vim_tenant_name", None)
280 if "vim_type" in vim_RO
:
281 vim_RO
["type"] = vim_RO
.pop("vim_type")
282 vim_RO
.pop("vim_user", None)
283 vim_RO
.pop("vim_password", None)
285 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
286 # TODO make a deep update of sdn-port-mapping
288 await RO
.edit("vim", RO_vim_id
, descriptor
=vim_RO
)
290 step
= "Editing vim-account at RO tenant"
292 if "config" in vim_content
:
293 if "sdn-controller" in vim_content
["config"]:
294 del vim_content
["config"]["sdn-controller"]
295 if "sdn-port-mapping" in vim_content
["config"]:
296 del vim_content
["config"]["sdn-port-mapping"]
297 if not vim_content
["config"]:
298 del vim_content
["config"]
299 if "vim_tenant_name" in vim_content
:
300 vim_account_RO
["vim_tenant_name"] = vim_content
["vim_tenant_name"]
301 if "vim_password" in vim_content
:
302 vim_account_RO
["vim_password"] = vim_content
["vim_password"]
303 if vim_content
.get("vim_password"):
304 vim_account_RO
["vim_password"] = self
.db
.decrypt(
305 vim_content
["vim_password"],
306 schema_version
=schema_version
,
309 if "config" in vim_content
:
310 vim_account_RO
["config"] = vim_content
["config"]
311 if vim_content
.get("config"):
312 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
314 ) or self
.vim_config_encrypted
.get("default")
315 for p
in vim_config_encrypted_keys
:
316 if vim_content
["config"].get(p
):
317 vim_account_RO
["config"][p
] = self
.db
.decrypt(
318 vim_content
["config"][p
],
319 schema_version
=schema_version
,
323 if "vim_user" in vim_content
:
324 vim_content
["vim_username"] = vim_content
["vim_user"]
325 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
326 # vim_thread. RO will remove and relaunch a new thread for this vim_account
327 await RO
.edit("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
328 db_vim_update
["_admin.operationalState"] = "ENABLED"
329 # Mark the VIM 'edit' HA task as successful
330 operation_state
= "COMPLETED"
331 operation_details
= "Done"
333 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id={}".format(RO_vim_id
))
336 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
337 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
339 except Exception as e
:
340 self
.logger
.critical(
341 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
346 db_vim_update
["_admin.operationalState"] = "ERROR"
347 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
350 # Mark the VIM 'edit' HA task as erroneous
351 operation_state
= "FAILED"
352 operation_details
= "ERROR {}: {}".format(step
, exc
)
355 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
356 # Register the VIM 'edit' HA task either
357 # succesful or erroneous, or do nothing (if legacy NBI)
358 self
.lcm_tasks
.unlock_HA(
362 operationState
=operation_state
,
363 detailed_status
=operation_details
,
365 except DbException
as e
:
366 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
368 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
370 async def delete(self
, vim_content
, order_id
):
371 # HA tasks and backward compatibility:
372 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
373 # In such a case, HA is not supported by NBI, and the HA check always returns True
374 op_id
= vim_content
.pop("op_id", None)
375 if not self
.lcm_tasks
.lock_HA("vim", "delete", op_id
):
378 vim_id
= vim_content
["_id"]
379 logging_text
= "Task vim_delete={} ".format(vim_id
)
380 self
.logger
.debug(logging_text
+ "Enter")
385 step
= "Getting vim from db"
387 # wait for any previous tasks in process
388 await self
.lcm_tasks
.waitfor_related_HA("vim", "delete", op_id
)
389 if not self
.ro_config
.get("ng"):
390 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
393 and db_vim
["_admin"].get("deployed")
394 and db_vim
["_admin"]["deployed"].get("RO")
396 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
397 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
398 step
= "Detaching vim from RO tenant"
400 await RO
.detach("vim_account", RO_vim_id
)
401 except ROclient
.ROClientException
as e
:
402 if e
.http_code
== 404: # not found
405 + "RO_vim_id={} already detached".format(RO_vim_id
)
410 step
= "Deleting vim from RO"
412 await RO
.delete("vim", RO_vim_id
)
413 except ROclient
.ROClientException
as e
:
414 if e
.http_code
== 404: # not found
417 + "RO_vim_id={} already deleted".format(RO_vim_id
)
423 self
.logger
.debug(logging_text
+ "Nothing to remove at RO")
424 self
.db
.del_one("vim_accounts", {"_id": vim_id
})
426 self
.logger
.debug(logging_text
+ "Exit Ok")
429 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
430 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
432 except Exception as e
:
433 self
.logger
.critical(
434 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
438 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
440 db_vim_update
["_admin.operationalState"] = "ERROR"
441 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
444 # Mark the VIM 'delete' HA task as erroneous
445 operation_state
= "FAILED"
446 operation_details
= "ERROR {}: {}".format(step
, exc
)
447 self
.lcm_tasks
.unlock_HA(
451 operationState
=operation_state
,
452 detailed_status
=operation_details
,
455 if db_vim
and db_vim_update
:
456 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
457 # If the VIM 'delete' HA task was succesful, the DB entry has been deleted,
458 # which means that there is nowhere to register this task, so do nothing here.
459 except DbException
as e
:
460 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
461 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
464 class WimLcm(LcmBase
):
465 # values that are encrypted at wim config because they are passwords
466 wim_config_encrypted
= ()
468 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
470 Init, Connect to database, filesystem storage, and messaging
471 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
475 self
.logger
= logging
.getLogger("lcm.vim")
477 self
.lcm_tasks
= lcm_tasks
478 self
.ro_config
= config
["RO"]
480 super().__init
__(msg
, self
.logger
)
482 async def create(self
, wim_content
, order_id
):
483 # HA tasks and backward compatibility:
484 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
485 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
486 # Register 'create' task here for related future HA operations
487 op_id
= wim_content
.pop("op_id", None)
488 self
.lcm_tasks
.lock_HA("wim", "create", op_id
)
490 wim_id
= wim_content
["_id"]
491 logging_text
= "Task wim_create={} ".format(wim_id
)
492 self
.logger
.debug(logging_text
+ "Enter")
498 step
= "Getting wim-id='{}' from db".format(wim_id
)
499 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
500 db_wim_update
["_admin.deployed.RO"] = None
502 step
= "Creating wim at RO"
503 db_wim_update
["_admin.detailed-status"] = step
504 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
505 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
506 wim_RO
= deepcopy(wim_content
)
507 wim_RO
.pop("_id", None)
508 wim_RO
.pop("_admin", None)
509 schema_version
= wim_RO
.pop("schema_version", None)
510 wim_RO
.pop("schema_type", None)
511 wim_RO
.pop("wim_tenant_name", None)
512 wim_RO
["type"] = wim_RO
.pop("wim_type")
513 wim_RO
.pop("wim_user", None)
514 wim_RO
.pop("wim_password", None)
515 desc
= await RO
.create("wim", descriptor
=wim_RO
)
516 RO_wim_id
= desc
["uuid"]
517 db_wim_update
["_admin.deployed.RO"] = RO_wim_id
519 logging_text
+ "WIM created at RO_wim_id={}".format(RO_wim_id
)
522 step
= "Creating wim_account at RO"
523 db_wim_update
["_admin.detailed-status"] = step
524 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
526 if wim_content
.get("wim_password"):
527 wim_content
["wim_password"] = self
.db
.decrypt(
528 wim_content
["wim_password"],
529 schema_version
=schema_version
,
533 "name": wim_content
["name"],
534 "user": wim_content
["user"],
535 "password": wim_content
["password"],
537 if wim_RO
.get("config"):
538 wim_account_RO
["config"] = wim_RO
["config"]
539 if "wim_port_mapping" in wim_account_RO
["config"]:
540 del wim_account_RO
["config"]["wim_port_mapping"]
541 for p
in self
.wim_config_encrypted
:
542 if wim_account_RO
["config"].get(p
):
543 wim_account_RO
["config"][p
] = self
.db
.decrypt(
544 wim_account_RO
["config"][p
],
545 schema_version
=schema_version
,
549 desc
= await RO
.attach("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
550 db_wim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
551 db_wim_update
["_admin.operationalState"] = "ENABLED"
552 db_wim_update
["_admin.detailed-status"] = "Done"
553 # Mark the WIM 'create' HA task as successful
554 operation_state
= "COMPLETED"
555 operation_details
= "Done"
559 + "Exit Ok WIM account created at RO_wim_account_id={}".format(
565 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
566 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
568 except Exception as e
:
569 self
.logger
.critical(
570 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
575 db_wim_update
["_admin.operationalState"] = "ERROR"
576 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
579 # Mark the WIM 'create' HA task as erroneous
580 operation_state
= "FAILED"
581 operation_details
= "ERROR {}: {}".format(step
, exc
)
584 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
585 # Register the WIM 'create' HA task either
586 # succesful or erroneous, or do nothing (if legacy NBI)
587 self
.lcm_tasks
.unlock_HA(
591 operationState
=operation_state
,
592 detailed_status
=operation_details
,
594 except DbException
as e
:
595 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
596 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
598 async def edit(self
, wim_content
, order_id
):
599 # HA tasks and backward compatibility:
600 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
601 # In such a case, HA is not supported by NBI, and the HA check always returns True
602 op_id
= wim_content
.pop("op_id", None)
603 if not self
.lcm_tasks
.lock_HA("wim", "edit", op_id
):
606 wim_id
= wim_content
["_id"]
607 logging_text
= "Task wim_edit={} ".format(wim_id
)
608 self
.logger
.debug(logging_text
+ "Enter")
614 step
= "Getting wim-id='{}' from db".format(wim_id
)
616 # wait for any previous tasks in process
617 await self
.lcm_tasks
.waitfor_related_HA("wim", "edit", op_id
)
619 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
623 and db_wim
["_admin"].get("deployed")
624 and db_wim
["_admin"]["deployed"].get("RO")
626 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
627 step
= "Editing wim at RO"
628 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
629 wim_RO
= deepcopy(wim_content
)
630 wim_RO
.pop("_id", None)
631 wim_RO
.pop("_admin", None)
632 schema_version
= wim_RO
.pop("schema_version", None)
633 wim_RO
.pop("schema_type", None)
634 wim_RO
.pop("wim_tenant_name", None)
635 if "wim_type" in wim_RO
:
636 wim_RO
["type"] = wim_RO
.pop("wim_type")
637 wim_RO
.pop("wim_user", None)
638 wim_RO
.pop("wim_password", None)
639 # TODO make a deep update of wim_port_mapping
641 await RO
.edit("wim", RO_wim_id
, descriptor
=wim_RO
)
643 step
= "Editing wim-account at RO tenant"
645 if "config" in wim_content
:
646 if "wim_port_mapping" in wim_content
["config"]:
647 del wim_content
["config"]["wim_port_mapping"]
648 if not wim_content
["config"]:
649 del wim_content
["config"]
650 if "wim_tenant_name" in wim_content
:
651 wim_account_RO
["wim_tenant_name"] = wim_content
["wim_tenant_name"]
652 if "wim_password" in wim_content
:
653 wim_account_RO
["wim_password"] = wim_content
["wim_password"]
654 if wim_content
.get("wim_password"):
655 wim_account_RO
["wim_password"] = self
.db
.decrypt(
656 wim_content
["wim_password"],
657 schema_version
=schema_version
,
660 if "config" in wim_content
:
661 wim_account_RO
["config"] = wim_content
["config"]
662 if wim_content
.get("config"):
663 for p
in self
.wim_config_encrypted
:
664 if wim_content
["config"].get(p
):
665 wim_account_RO
["config"][p
] = self
.db
.decrypt(
666 wim_content
["config"][p
],
667 schema_version
=schema_version
,
671 if "wim_user" in wim_content
:
672 wim_content
["wim_username"] = wim_content
["wim_user"]
673 # wim_account must be edited always even if empty in order to ensure changes are translated to RO
674 # wim_thread. RO will remove and relaunch a new thread for this wim_account
675 await RO
.edit("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
676 db_wim_update
["_admin.operationalState"] = "ENABLED"
677 # Mark the WIM 'edit' HA task as successful
678 operation_state
= "COMPLETED"
679 operation_details
= "Done"
681 self
.logger
.debug(logging_text
+ "Exit Ok RO_wim_id={}".format(RO_wim_id
))
684 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
685 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
687 except Exception as e
:
688 self
.logger
.critical(
689 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
694 db_wim_update
["_admin.operationalState"] = "ERROR"
695 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
698 # Mark the WIM 'edit' HA task as erroneous
699 operation_state
= "FAILED"
700 operation_details
= "ERROR {}: {}".format(step
, exc
)
703 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
704 # Register the WIM 'edit' HA task either
705 # succesful or erroneous, or do nothing (if legacy NBI)
706 self
.lcm_tasks
.unlock_HA(
710 operationState
=operation_state
,
711 detailed_status
=operation_details
,
713 except DbException
as e
:
714 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
715 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
717 async def delete(self
, wim_content
, order_id
):
718 # HA tasks and backward compatibility:
719 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
720 # In such a case, HA is not supported by NBI, and the HA check always returns True
721 op_id
= wim_content
.pop("op_id", None)
722 if not self
.lcm_tasks
.lock_HA("wim", "delete", op_id
):
725 wim_id
= wim_content
["_id"]
726 logging_text
= "Task wim_delete={} ".format(wim_id
)
727 self
.logger
.debug(logging_text
+ "Enter")
732 step
= "Getting wim from db"
734 # wait for any previous tasks in process
735 await self
.lcm_tasks
.waitfor_related_HA("wim", "delete", op_id
)
737 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
740 and db_wim
["_admin"].get("deployed")
741 and db_wim
["_admin"]["deployed"].get("RO")
743 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
744 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
745 step
= "Detaching wim from RO tenant"
747 await RO
.detach("wim_account", RO_wim_id
)
748 except ROclient
.ROClientException
as e
:
749 if e
.http_code
== 404: # not found
752 + "RO_wim_id={} already detached".format(RO_wim_id
)
757 step
= "Deleting wim from RO"
759 await RO
.delete("wim", RO_wim_id
)
760 except ROclient
.ROClientException
as e
:
761 if e
.http_code
== 404: # not found
764 + "RO_wim_id={} already deleted".format(RO_wim_id
)
770 self
.logger
.error(logging_text
+ "Nothing to remove at RO")
771 self
.db
.del_one("wim_accounts", {"_id": wim_id
})
773 self
.logger
.debug(logging_text
+ "Exit Ok")
776 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
777 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
779 except Exception as e
:
780 self
.logger
.critical(
781 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
785 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
787 db_wim_update
["_admin.operationalState"] = "ERROR"
788 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
791 # Mark the WIM 'delete' HA task as erroneous
792 operation_state
= "FAILED"
793 operation_details
= "ERROR {}: {}".format(step
, exc
)
794 self
.lcm_tasks
.unlock_HA(
798 operationState
=operation_state
,
799 detailed_status
=operation_details
,
802 if db_wim
and db_wim_update
:
803 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
804 # If the WIM 'delete' HA task was succesful, the DB entry has been deleted,
805 # which means that there is nowhere to register this task, so do nothing here.
806 except DbException
as e
:
807 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
808 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
811 class SdnLcm(LcmBase
):
812 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
814 Init, Connect to database, filesystem storage, and messaging
815 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
819 self
.logger
= logging
.getLogger("lcm.sdn")
821 self
.lcm_tasks
= lcm_tasks
822 self
.ro_config
= config
["RO"]
824 super().__init
__(msg
, self
.logger
)
826 async def create(self
, sdn_content
, order_id
):
827 # HA tasks and backward compatibility:
828 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
829 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
830 # Register 'create' task here for related future HA operations
831 op_id
= sdn_content
.pop("op_id", None)
832 self
.lcm_tasks
.lock_HA("sdn", "create", op_id
)
834 sdn_id
= sdn_content
["_id"]
835 logging_text
= "Task sdn_create={} ".format(sdn_id
)
836 self
.logger
.debug(logging_text
+ "Enter")
843 step
= "Getting sdn from db"
844 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
845 db_sdn_update
["_admin.deployed.RO"] = None
847 step
= "Creating sdn at RO"
848 db_sdn_update
["_admin.detailed-status"] = step
849 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
851 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
852 sdn_RO
= deepcopy(sdn_content
)
853 sdn_RO
.pop("_id", None)
854 sdn_RO
.pop("_admin", None)
855 schema_version
= sdn_RO
.pop("schema_version", None)
856 sdn_RO
.pop("schema_type", None)
857 sdn_RO
.pop("description", None)
858 if sdn_RO
.get("password"):
859 sdn_RO
["password"] = self
.db
.decrypt(
860 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
863 desc
= await RO
.create("sdn", descriptor
=sdn_RO
)
864 RO_sdn_id
= desc
["uuid"]
865 db_sdn_update
["_admin.deployed.RO"] = RO_sdn_id
866 db_sdn_update
["_admin.operationalState"] = "ENABLED"
867 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
868 # Mark the SDN 'create' HA task as successful
869 operation_state
= "COMPLETED"
870 operation_details
= "Done"
873 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
874 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
876 except Exception as e
:
877 self
.logger
.critical(
878 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
883 db_sdn_update
["_admin.operationalState"] = "ERROR"
884 db_sdn_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
887 # Mark the SDN 'create' HA task as erroneous
888 operation_state
= "FAILED"
889 operation_details
= "ERROR {}: {}".format(step
, exc
)
891 if db_sdn
and db_sdn_update
:
892 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
893 # Register the SDN 'create' HA task either
894 # succesful or erroneous, or do nothing (if legacy NBI)
895 self
.lcm_tasks
.unlock_HA(
899 operationState
=operation_state
,
900 detailed_status
=operation_details
,
902 except DbException
as e
:
903 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
904 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
906 async def edit(self
, sdn_content
, order_id
):
907 # HA tasks and backward compatibility:
908 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
909 # In such a case, HA is not supported by NBI, and the HA check always returns True
910 op_id
= sdn_content
.pop("op_id", None)
911 if not self
.lcm_tasks
.lock_HA("sdn", "edit", op_id
):
914 sdn_id
= sdn_content
["_id"]
915 logging_text
= "Task sdn_edit={} ".format(sdn_id
)
916 self
.logger
.debug(logging_text
+ "Enter")
921 step
= "Getting sdn from db"
923 # wait for any previous tasks in process
924 await self
.lcm_tasks
.waitfor_related_HA("sdn", "edit", op_id
)
926 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
930 and db_sdn
["_admin"].get("deployed")
931 and db_sdn
["_admin"]["deployed"].get("RO")
933 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
934 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
935 step
= "Editing sdn at RO"
936 sdn_RO
= deepcopy(sdn_content
)
937 sdn_RO
.pop("_id", None)
938 sdn_RO
.pop("_admin", None)
939 schema_version
= sdn_RO
.pop("schema_version", None)
940 sdn_RO
.pop("schema_type", None)
941 sdn_RO
.pop("description", None)
942 if sdn_RO
.get("password"):
943 sdn_RO
["password"] = self
.db
.decrypt(
944 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
947 await RO
.edit("sdn", RO_sdn_id
, descriptor
=sdn_RO
)
948 db_sdn_update
["_admin.operationalState"] = "ENABLED"
949 # Mark the SDN 'edit' HA task as successful
950 operation_state
= "COMPLETED"
951 operation_details
= "Done"
953 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
956 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
957 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
959 except Exception as e
:
960 self
.logger
.critical(
961 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
966 db_sdn
["_admin.operationalState"] = "ERROR"
967 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
968 # Mark the SDN 'edit' HA task as erroneous
969 operation_state
= "FAILED"
970 operation_details
= "ERROR {}: {}".format(step
, exc
)
973 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
974 # Register the SDN 'edit' HA task either
975 # succesful or erroneous, or do nothing (if legacy NBI)
976 self
.lcm_tasks
.unlock_HA(
980 operationState
=operation_state
,
981 detailed_status
=operation_details
,
983 except DbException
as e
:
984 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
985 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
987 async def delete(self
, sdn_content
, order_id
):
988 # HA tasks and backward compatibility:
989 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
990 # In such a case, HA is not supported by NBI, and the HA check always returns True
991 op_id
= sdn_content
.pop("op_id", None)
992 if not self
.lcm_tasks
.lock_HA("sdn", "delete", op_id
):
995 sdn_id
= sdn_content
["_id"]
996 logging_text
= "Task sdn_delete={} ".format(sdn_id
)
997 self
.logger
.debug(logging_text
+ "Enter")
1002 step
= "Getting sdn from db"
1004 # wait for any previous tasks in process
1005 await self
.lcm_tasks
.waitfor_related_HA("sdn", "delete", op_id
)
1007 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
1009 db_sdn
.get("_admin")
1010 and db_sdn
["_admin"].get("deployed")
1011 and db_sdn
["_admin"]["deployed"].get("RO")
1013 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
1014 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
1015 step
= "Deleting sdn from RO"
1017 await RO
.delete("sdn", RO_sdn_id
)
1018 except ROclient
.ROClientException
as e
:
1019 if e
.http_code
== 404: # not found
1022 + "RO_sdn_id={} already deleted".format(RO_sdn_id
)
1029 logging_text
+ "Skipping. There is not RO information at database"
1031 self
.db
.del_one("sdns", {"_id": sdn_id
})
1033 self
.logger
.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id
))
1036 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
1037 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1039 except Exception as e
:
1040 self
.logger
.critical(
1041 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1046 db_sdn
["_admin.operationalState"] = "ERROR"
1047 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
1048 # Mark the SDN 'delete' HA task as erroneous
1049 operation_state
= "FAILED"
1050 operation_details
= "ERROR {}: {}".format(step
, exc
)
1051 self
.lcm_tasks
.unlock_HA(
1055 operationState
=operation_state
,
1056 detailed_status
=operation_details
,
1059 if db_sdn
and db_sdn_update
:
1060 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
1061 # If the SDN 'delete' HA task was succesful, the DB entry has been deleted,
1062 # which means that there is nowhere to register this task, so do nothing here.
1063 except DbException
as e
:
1064 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1065 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
1068 class K8sClusterLcm(LcmBase
):
1069 timeout_create
= 300
1071 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1073 Init, Connect to database, filesystem storage, and messaging
1074 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1078 self
.logger
= logging
.getLogger("lcm.k8scluster")
1080 self
.lcm_tasks
= lcm_tasks
1081 self
.vca_config
= config
["VCA"]
1083 super().__init
__(msg
, self
.logger
)
1085 self
.helm2_k8scluster
= K8sHelmConnector(
1086 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1087 helm_command
=self
.vca_config
.get("helmpath"),
1094 self
.helm3_k8scluster
= K8sHelm3Connector(
1095 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1096 helm_command
=self
.vca_config
.get("helm3path"),
1103 self
.juju_k8scluster
= K8sJujuConnector(
1104 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1105 juju_command
=self
.vca_config
.get("jujupath"),
1114 "helm-chart": self
.helm2_k8scluster
,
1115 "helm-chart-v3": self
.helm3_k8scluster
,
1116 "juju-bundle": self
.juju_k8scluster
,
1119 async def create(self
, k8scluster_content
, order_id
):
1120 op_id
= k8scluster_content
.pop("op_id", None)
1121 if not self
.lcm_tasks
.lock_HA("k8scluster", "create", op_id
):
1124 k8scluster_id
= k8scluster_content
["_id"]
1125 logging_text
= "Task k8scluster_create={} ".format(k8scluster_id
)
1126 self
.logger
.debug(logging_text
+ "Enter")
1128 db_k8scluster
= None
1129 db_k8scluster_update
= {}
1132 step
= "Getting k8scluster-id='{}' from db".format(k8scluster_id
)
1133 self
.logger
.debug(logging_text
+ step
)
1134 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": k8scluster_id
})
1135 self
.db
.encrypt_decrypt_fields(
1136 db_k8scluster
.get("credentials"),
1138 ["password", "secret"],
1139 schema_version
=db_k8scluster
["schema_version"],
1140 salt
=db_k8scluster
["_id"],
1142 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
1145 init_target
= deep_get(db_k8scluster
, ("_admin", "init"))
1146 step
= "Launching k8scluster init tasks"
1148 k8s_deploy_methods
= db_k8scluster
.get("deployment_methods", {})
1149 # for backwards compatibility and all-false case
1150 if not any(k8s_deploy_methods
.values()):
1151 k8s_deploy_methods
= {
1153 "juju-bundle": True,
1154 "helm-chart-v3": True,
1156 deploy_methods
= tuple(filter(k8s_deploy_methods
.get
, k8s_deploy_methods
))
1158 for task_name
in deploy_methods
:
1159 if init_target
and task_name
not in init_target
:
1161 task
= asyncio
.ensure_future(
1162 self
.k8s_map
[task_name
].init_env(
1164 reuse_cluster_uuid
=k8scluster_id
,
1165 vca_id
=db_k8scluster
.get("vca_id"),
1168 pending_tasks
.append(task
)
1169 task2name
[task
] = task_name
1171 error_text_list
= []
1173 reached_timeout
= False
1176 while pending_tasks
:
1178 1, self
.timeout_create
- (time() - now
)
1179 ) # ensure not negative with max
1180 step
= "Waiting for k8scluster init tasks"
1181 done
, pending_tasks
= await asyncio
.wait(
1182 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
1185 # timeout. Set timeout is reached and process pending as if they hase been finished
1186 done
= pending_tasks
1187 pending_tasks
= None
1188 reached_timeout
= True
1190 task_name
= task2name
[task
]
1193 elif task
.cancelled():
1196 exc
= task
.exception()
1199 error_text_list
.append(
1200 "Failing init {}: {}".format(task_name
, exc
)
1202 db_k8scluster_update
[
1203 "_admin.{}.error_msg".format(task_name
)
1205 db_k8scluster_update
["_admin.{}.id".format(task_name
)] = None
1206 db_k8scluster_update
[
1207 "_admin.{}.operationalState".format(task_name
)
1210 logging_text
+ "{} init fail: {}".format(task_name
, exc
),
1211 exc_info
=not isinstance(exc
, (N2VCException
, str)),
1214 k8s_id
, uninstall_sw
= task
.result()
1215 tasks_name_ok
.append(task_name
)
1218 + "{} init success. id={} created={}".format(
1219 task_name
, k8s_id
, uninstall_sw
1222 db_k8scluster_update
[
1223 "_admin.{}.error_msg".format(task_name
)
1225 db_k8scluster_update
["_admin.{}.id".format(task_name
)] = k8s_id
1226 db_k8scluster_update
[
1227 "_admin.{}.created".format(task_name
)
1229 db_k8scluster_update
[
1230 "_admin.{}.operationalState".format(task_name
)
1233 step
= "Updating database for " + task_name
1234 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1236 operation_details
= "ready for " + ", ".join(tasks_name_ok
)
1237 operation_state
= "COMPLETED"
1238 db_k8scluster_update
["_admin.operationalState"] = (
1239 "ENABLED" if not error_text_list
else "DEGRADED"
1241 operation_details
+= "; " + ";".join(error_text_list
)
1243 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1244 operation_state
= "FAILED"
1245 operation_details
= ";".join(error_text_list
)
1246 db_k8scluster_update
["_admin.detailed-status"] = operation_details
1247 self
.logger
.debug(logging_text
+ "Done. Result: " + operation_state
)
1250 except Exception as e
:
1258 asyncio
.CancelledError
,
1261 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1263 self
.logger
.critical(
1264 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1268 if exc
and db_k8scluster
:
1269 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1270 db_k8scluster_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1273 operation_state
= "FAILED"
1274 operation_details
= "ERROR {}: {}".format(step
, exc
)
1276 if db_k8scluster
and db_k8scluster_update
:
1277 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1279 # Register the operation and unlock
1280 self
.lcm_tasks
.unlock_HA(
1284 operationState
=operation_state
,
1285 detailed_status
=operation_details
,
1287 except DbException
as e
:
1288 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1289 self
.lcm_tasks
.remove("k8scluster", k8scluster_id
, order_id
)
1291 async def edit(self
, k8scluster_content
, order_id
):
1292 op_id
= k8scluster_content
.pop("op_id", None)
1293 if not self
.lcm_tasks
.lock_HA("k8scluster", "edit", op_id
):
1296 k8scluster_id
= k8scluster_content
["_id"]
1298 logging_text
= "Task k8scluster_edit={} ".format(k8scluster_id
)
1299 self
.logger
.debug(logging_text
+ "Enter")
1301 # TODO the implementation is pending and will be part of a new feature
1302 # It will support rotation of certificates, update of credentials and K8S API endpoint
1303 # At the moment the operation is set as completed
1305 operation_state
= "COMPLETED"
1306 operation_details
= "Not implemented"
1308 self
.lcm_tasks
.unlock_HA(
1312 operationState
=operation_state
,
1313 detailed_status
=operation_details
,
1315 self
.lcm_tasks
.remove("k8scluster", k8scluster_id
, order_id
)
1317 async def delete(self
, k8scluster_content
, order_id
):
1318 # HA tasks and backward compatibility:
1319 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1320 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1321 # Register 'delete' task here for related future HA operations
1322 op_id
= k8scluster_content
.pop("op_id", None)
1323 if not self
.lcm_tasks
.lock_HA("k8scluster", "delete", op_id
):
1326 k8scluster_id
= k8scluster_content
["_id"]
1327 logging_text
= "Task k8scluster_delete={} ".format(k8scluster_id
)
1328 self
.logger
.debug(logging_text
+ "Enter")
1330 db_k8scluster
= None
1331 db_k8scluster_update
= {}
1334 step
= "Getting k8scluster='{}' from db".format(k8scluster_id
)
1335 self
.logger
.debug(logging_text
+ step
)
1336 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": k8scluster_id
})
1337 k8s_hc_id
= deep_get(db_k8scluster
, ("_admin", "helm-chart", "id"))
1338 k8s_h3c_id
= deep_get(db_k8scluster
, ("_admin", "helm-chart-v3", "id"))
1339 k8s_jb_id
= deep_get(db_k8scluster
, ("_admin", "juju-bundle", "id"))
1341 cluster_removed
= True
1342 if k8s_jb_id
: # delete in reverse order of creation
1343 step
= "Removing juju-bundle '{}'".format(k8s_jb_id
)
1345 deep_get(db_k8scluster
, ("_admin", "juju-bundle", "created"))
1348 cluster_removed
= await self
.juju_k8scluster
.reset(
1349 cluster_uuid
=k8s_jb_id
,
1350 uninstall_sw
=uninstall_sw
,
1351 vca_id
=db_k8scluster
.get("vca_id"),
1353 db_k8scluster_update
["_admin.juju-bundle.id"] = None
1354 db_k8scluster_update
["_admin.juju-bundle.operationalState"] = "DISABLED"
1357 step
= "Removing helm-chart '{}'".format(k8s_hc_id
)
1359 deep_get(db_k8scluster
, ("_admin", "helm-chart", "created"))
1362 cluster_removed
= await self
.helm2_k8scluster
.reset(
1363 cluster_uuid
=k8s_hc_id
, uninstall_sw
=uninstall_sw
1365 db_k8scluster_update
["_admin.helm-chart.id"] = None
1366 db_k8scluster_update
["_admin.helm-chart.operationalState"] = "DISABLED"
1369 step
= "Removing helm-chart-v3 '{}'".format(k8s_h3c_id
)
1371 deep_get(db_k8scluster
, ("_admin", "helm-chart-v3", "created"))
1374 cluster_removed
= await self
.helm3_k8scluster
.reset(
1375 cluster_uuid
=k8s_h3c_id
, uninstall_sw
=uninstall_sw
1377 db_k8scluster_update
["_admin.helm-chart-v3.id"] = None
1378 db_k8scluster_update
[
1379 "_admin.helm-chart-v3.operationalState"
1382 # Try to remove from cluster_inserted to clean old versions
1383 if k8s_hc_id
and cluster_removed
:
1384 step
= "Removing k8scluster='{}' from k8srepos".format(k8scluster_id
)
1385 self
.logger
.debug(logging_text
+ step
)
1386 db_k8srepo_list
= self
.db
.get_list(
1387 "k8srepos", {"_admin.cluster-inserted": k8s_hc_id
}
1389 for k8srepo
in db_k8srepo_list
:
1391 cluster_list
= k8srepo
["_admin"]["cluster-inserted"]
1392 cluster_list
.remove(k8s_hc_id
)
1396 {"_admin.cluster-inserted": cluster_list
},
1398 except Exception as e
:
1399 self
.logger
.error("{}: {}".format(step
, e
))
1400 self
.db
.del_one("k8sclusters", {"_id": k8scluster_id
})
1401 db_k8scluster_update
= None
1402 self
.logger
.debug(logging_text
+ "Done")
1404 except Exception as e
:
1412 asyncio
.CancelledError
,
1415 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1417 self
.logger
.critical(
1418 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1422 if exc
and db_k8scluster
:
1423 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1424 db_k8scluster_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1427 # Mark the WIM 'create' HA task as erroneous
1428 operation_state
= "FAILED"
1429 operation_details
= "ERROR {}: {}".format(step
, exc
)
1431 operation_state
= "COMPLETED"
1432 operation_details
= "deleted"
1435 if db_k8scluster_update
:
1436 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1437 # Register the K8scluster 'delete' HA task either
1438 # succesful or erroneous, or do nothing (if legacy NBI)
1439 self
.lcm_tasks
.unlock_HA(
1443 operationState
=operation_state
,
1444 detailed_status
=operation_details
,
1446 except DbException
as e
:
1447 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1448 self
.lcm_tasks
.remove("k8scluster", k8scluster_id
, order_id
)
1451 class VcaLcm(LcmBase
):
1454 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1456 Init, Connect to database, filesystem storage, and messaging
1457 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1461 self
.logger
= logging
.getLogger("lcm.vca")
1463 self
.lcm_tasks
= lcm_tasks
1465 super().__init
__(msg
, self
.logger
)
1467 # create N2VC connector
1468 self
.n2vc
= N2VCJujuConnector(
1469 log
=self
.logger
, loop
=self
.loop
, fs
=self
.fs
, db
=self
.db
1472 def _get_vca_by_id(self
, vca_id
: str) -> dict:
1473 db_vca
= self
.db
.get_one("vca", {"_id": vca_id
})
1474 self
.db
.encrypt_decrypt_fields(
1477 ["secret", "cacert"],
1478 schema_version
=db_vca
["schema_version"],
1483 async def _validate_vca(self
, db_vca_id
: str) -> None:
1484 task
= asyncio
.ensure_future(
1486 self
.n2vc
.validate_vca(db_vca_id
),
1487 timeout
=self
.timeout_create
,
1490 await asyncio
.wait([task
], return_when
=asyncio
.FIRST_COMPLETED
)
1491 if task
.exception():
1492 raise task
.exception()
1494 def _is_vca_config_update(self
, update_options
) -> bool:
1496 word
in update_options
.keys()
1510 async def create(self
, vca_content
, order_id
):
1511 op_id
= vca_content
.pop("op_id", None)
1512 if not self
.lcm_tasks
.lock_HA("vca", "create", op_id
):
1515 vca_id
= vca_content
["_id"]
1516 self
.logger
.debug("Task vca_create={} {}".format(vca_id
, "Enter"))
1520 operation_state
= "FAILED"
1521 operation_details
= ""
1524 "Task vca_create={} {}".format(vca_id
, "Getting vca from db")
1526 db_vca
= self
._get
_vca
_by
_id
(vca_id
)
1528 await self
._validate
_vca
(db_vca
["_id"])
1530 "Task vca_create={} {}".format(
1531 vca_id
, "vca registered and validated successfully"
1534 db_vca_update
["_admin.operationalState"] = "ENABLED"
1535 db_vca_update
["_admin.detailed-status"] = "Connectivity: ok"
1536 operation_details
= "VCA validated"
1537 operation_state
= "COMPLETED"
1540 "Task vca_create={} {}".format(
1541 vca_id
, "Done. Result: {}".format(operation_state
)
1545 except Exception as e
:
1546 error_msg
= "Failed with exception: {}".format(e
)
1547 self
.logger
.error("Task vca_create={} {}".format(vca_id
, error_msg
))
1548 db_vca_update
["_admin.operationalState"] = "ERROR"
1549 db_vca_update
["_admin.detailed-status"] = error_msg
1550 operation_details
= error_msg
1553 self
.update_db_2("vca", vca_id
, db_vca_update
)
1555 # Register the operation and unlock
1556 self
.lcm_tasks
.unlock_HA(
1560 operationState
=operation_state
,
1561 detailed_status
=operation_details
,
1563 except DbException
as e
:
1565 "Task vca_create={} {}".format(
1566 vca_id
, "Cannot update database: {}".format(e
)
1569 self
.lcm_tasks
.remove("vca", vca_id
, order_id
)
1571 async def edit(self
, vca_content
, order_id
):
1572 op_id
= vca_content
.pop("op_id", None)
1573 if not self
.lcm_tasks
.lock_HA("vca", "edit", op_id
):
1576 vca_id
= vca_content
["_id"]
1577 self
.logger
.debug("Task vca_edit={} {}".format(vca_id
, "Enter"))
1582 operation_state
= "FAILED"
1583 operation_details
= ""
1586 "Task vca_edit={} {}".format(vca_id
, "Getting vca from db")
1588 db_vca
= self
._get
_vca
_by
_id
(vca_id
)
1589 if self
._is
_vca
_config
_update
(vca_content
):
1590 await self
._validate
_vca
(db_vca
["_id"])
1592 "Task vca_edit={} {}".format(
1593 vca_id
, "vca registered and validated successfully"
1596 db_vca_update
["_admin.operationalState"] = "ENABLED"
1597 db_vca_update
["_admin.detailed-status"] = "Connectivity: ok"
1599 operation_details
= "Edited"
1600 operation_state
= "COMPLETED"
1603 "Task vca_edit={} {}".format(
1604 vca_id
, "Done. Result: {}".format(operation_state
)
1608 except Exception as e
:
1609 error_msg
= "Failed with exception: {}".format(e
)
1610 self
.logger
.error("Task vca_edit={} {}".format(vca_id
, error_msg
))
1611 db_vca_update
["_admin.operationalState"] = "ERROR"
1612 db_vca_update
["_admin.detailed-status"] = error_msg
1613 operation_state
= "FAILED"
1614 operation_details
= error_msg
1617 self
.update_db_2("vca", vca_id
, db_vca_update
)
1619 # Register the operation and unlock
1620 self
.lcm_tasks
.unlock_HA(
1624 operationState
=operation_state
,
1625 detailed_status
=operation_details
,
1627 except DbException
as e
:
1629 "Task vca_edit={} {}".format(
1630 vca_id
, "Cannot update database: {}".format(e
)
1633 self
.lcm_tasks
.remove("vca", vca_id
, order_id
)
1635 async def delete(self
, vca_content
, order_id
):
1636 # HA tasks and backward compatibility:
1637 # If "vim_content" does not include "op_id", we a running a legacy NBI version.
1638 # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
1639 # Register "delete" task here for related future HA operations
1640 op_id
= vca_content
.pop("op_id", None)
1641 if not self
.lcm_tasks
.lock_HA("vca", "delete", op_id
):
1645 vca_id
= vca_content
["_id"]
1647 operation_state
= "FAILED"
1648 operation_details
= ""
1652 "Task vca_delete={} {}".format(vca_id
, "Deleting vca from db")
1654 self
.db
.del_one("vca", {"_id": vca_id
})
1655 db_vca_update
= None
1656 operation_details
= "deleted"
1657 operation_state
= "COMPLETED"
1660 "Task vca_delete={} {}".format(
1661 vca_id
, "Done. Result: {}".format(operation_state
)
1664 except Exception as e
:
1665 error_msg
= "Failed with exception: {}".format(e
)
1666 self
.logger
.error("Task vca_delete={} {}".format(vca_id
, error_msg
))
1667 db_vca_update
["_admin.operationalState"] = "ERROR"
1668 db_vca_update
["_admin.detailed-status"] = error_msg
1669 operation_details
= error_msg
1672 self
.update_db_2("vca", vca_id
, db_vca_update
)
1673 self
.lcm_tasks
.unlock_HA(
1677 operationState
=operation_state
,
1678 detailed_status
=operation_details
,
1680 except DbException
as e
:
1682 "Task vca_delete={} {}".format(
1683 vca_id
, "Cannot update database: {}".format(e
)
1686 self
.lcm_tasks
.remove("vca", vca_id
, order_id
)
1689 class K8sRepoLcm(LcmBase
):
1690 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1692 Init, Connect to database, filesystem storage, and messaging
1693 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1697 self
.logger
= logging
.getLogger("lcm.k8srepo")
1699 self
.lcm_tasks
= lcm_tasks
1700 self
.vca_config
= config
["VCA"]
1702 super().__init
__(msg
, self
.logger
)
1704 self
.k8srepo
= K8sHelmConnector(
1705 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1706 helm_command
=self
.vca_config
.get("helmpath"),
1713 async def create(self
, k8srepo_content
, order_id
):
1714 # HA tasks and backward compatibility:
1715 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1716 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1717 # Register 'create' task here for related future HA operations
1719 op_id
= k8srepo_content
.pop("op_id", None)
1720 if not self
.lcm_tasks
.lock_HA("k8srepo", "create", op_id
):
1723 k8srepo_id
= k8srepo_content
.get("_id")
1724 logging_text
= "Task k8srepo_create={} ".format(k8srepo_id
)
1725 self
.logger
.debug(logging_text
+ "Enter")
1728 db_k8srepo_update
= {}
1730 operation_state
= "COMPLETED"
1731 operation_details
= ""
1733 step
= "Getting k8srepo-id='{}' from db".format(k8srepo_id
)
1734 self
.logger
.debug(logging_text
+ step
)
1735 db_k8srepo
= self
.db
.get_one("k8srepos", {"_id": k8srepo_id
})
1736 db_k8srepo_update
["_admin.operationalState"] = "ENABLED"
1737 except Exception as e
:
1739 logging_text
+ "Exit Exception {}".format(e
),
1740 exc_info
=not isinstance(
1747 asyncio
.CancelledError
,
1753 if exc
and db_k8srepo
:
1754 db_k8srepo_update
["_admin.operationalState"] = "ERROR"
1755 db_k8srepo_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1758 # Mark the WIM 'create' HA task as erroneous
1759 operation_state
= "FAILED"
1760 operation_details
= "ERROR {}: {}".format(step
, exc
)
1762 if db_k8srepo_update
:
1763 self
.update_db_2("k8srepos", k8srepo_id
, db_k8srepo_update
)
1764 # Register the K8srepo 'create' HA task either
1765 # succesful or erroneous, or do nothing (if legacy NBI)
1766 self
.lcm_tasks
.unlock_HA(
1770 operationState
=operation_state
,
1771 detailed_status
=operation_details
,
1773 except DbException
as e
:
1774 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1775 self
.lcm_tasks
.remove("k8srepo", k8srepo_id
, order_id
)
1777 async def delete(self
, k8srepo_content
, order_id
):
1778 # HA tasks and backward compatibility:
1779 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1780 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1781 # Register 'delete' task here for related future HA operations
1782 op_id
= k8srepo_content
.pop("op_id", None)
1783 if not self
.lcm_tasks
.lock_HA("k8srepo", "delete", op_id
):
1786 k8srepo_id
= k8srepo_content
.get("_id")
1787 logging_text
= "Task k8srepo_delete={} ".format(k8srepo_id
)
1788 self
.logger
.debug(logging_text
+ "Enter")
1791 db_k8srepo_update
= {}
1794 operation_state
= "COMPLETED"
1795 operation_details
= ""
1797 step
= "Getting k8srepo-id='{}' from db".format(k8srepo_id
)
1798 self
.logger
.debug(logging_text
+ step
)
1799 db_k8srepo
= self
.db
.get_one("k8srepos", {"_id": k8srepo_id
})
1801 except Exception as e
:
1803 logging_text
+ "Exit Exception {}".format(e
),
1804 exc_info
=not isinstance(
1811 asyncio
.CancelledError
,
1817 if exc
and db_k8srepo
:
1818 db_k8srepo_update
["_admin.operationalState"] = "ERROR"
1819 db_k8srepo_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1822 # Mark the WIM 'create' HA task as erroneous
1823 operation_state
= "FAILED"
1824 operation_details
= "ERROR {}: {}".format(step
, exc
)
1826 if db_k8srepo_update
:
1827 self
.update_db_2("k8srepos", k8srepo_id
, db_k8srepo_update
)
1828 # Register the K8srepo 'delete' HA task either
1829 # succesful or erroneous, or do nothing (if legacy NBI)
1830 self
.lcm_tasks
.unlock_HA(
1834 operationState
=operation_state
,
1835 detailed_status
=operation_details
,
1837 self
.db
.del_one("k8srepos", {"_id": k8srepo_id
})
1838 except DbException
as e
:
1839 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1840 self
.lcm_tasks
.remove("k8srepo", k8srepo_id
, order_id
)