1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
import asyncio
import logging
import logging.handlers
from copy import deepcopy

from n2vc.exceptions import K8sException, N2VCException
from n2vc.k8s_helm3_conn import K8sHelm3Connector
from n2vc.k8s_helm_conn import K8sHelmConnector
from n2vc.k8s_juju_conn import K8sJujuConnector
from n2vc.n2vc_juju_conn import N2VCJujuConnector
from osm_common.dbbase import DbException
from osm_lcm import ROclient
from osm_lcm.lcm_utils import LcmException, LcmBase, deep_get
34 __author__
= "Alfonso Tierno"
class VimLcm(LcmBase):
    """Propagate VIM account create/edit/delete operations to RO.

    Each coroutine reads the ``vim_accounts`` record, calls the RO client and
    writes progress/result fields (``_admin.*``) back to the database.
    """

    # values that are encrypted at vim config because they are passwords
    # NOTE(review): keyed by schema_version; "default" is used when the
    # record's schema_version has no entry - confirm the "default" key list.
    vim_config_encrypted = {
        "1.1": ("admin_password", "nsx_password", "vcenter_password"),
        "default": (
            "admin_password",
            "nsx_password",
            "vcenter_password",
            "vrops_password",
        ),
    }

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object, handed to LcmBase.__init__
        :param lcm_tasks: task registry used for HA locking and task removal
        :param config: two level dictionary with configuration. Only the 'RO' entry is read here
        :param loop: event loop handed to every ROClient created by this class
        :return: None
        """

        self.logger = logging.getLogger("lcm.vim")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["RO"]

        super().__init__(msg, self.logger)

    async def create(self, vim_content, order_id):
        """
        Create the VIM and its vim_account at RO and record the result in the database
        :param vim_content: VIM account content; must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "create", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_create={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        RO_sdn_id = None
        try:
            step = "Getting vim-id='{}' from db".format(vim_id)
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            if vim_content.get("config") and vim_content["config"].get(
                "sdn-controller"
            ):
                step = "Getting sdn-controller-id='{}' from db".format(
                    vim_content["config"]["sdn-controller"]
                )
                db_sdn = self.db.get_one(
                    "sdns", {"_id": vim_content["config"]["sdn-controller"]}
                )

                # If the VIM account has an associated SDN account, also
                # wait for any previous tasks in process for the SDN
                await self.lcm_tasks.waitfor_related_HA("sdn", "ANY", db_sdn["_id"])

                if (
                    db_sdn.get("_admin")
                    and db_sdn["_admin"].get("deployed")
                    and db_sdn["_admin"]["deployed"].get("RO")
                ):
                    RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                else:
                    raise LcmException(
                        "sdn-controller={} is not available. Not deployed at RO".format(
                            vim_content["config"]["sdn-controller"]
                        )
                    )

            step = "Creating vim at RO"
            db_vim_update["_admin.deployed.RO"] = None
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # The "vim" descriptor sent to RO is the content without
            # bookkeeping keys and without per-tenant credentials.
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            schema_version = vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if RO_sdn_id:
                vim_RO["config"]["sdn-controller"] = RO_sdn_id
            desc = await RO.create("vim", descriptor=vim_RO)
            RO_vim_id = desc["uuid"]
            db_vim_update["_admin.deployed.RO"] = RO_vim_id
            self.logger.debug(
                logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id)
            )

            step = "Creating vim_account at RO"
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)

            if vim_content.get("vim_password"):
                vim_content["vim_password"] = self.db.decrypt(
                    vim_content["vim_password"],
                    schema_version=schema_version,
                    salt=vim_id,
                )
            vim_account_RO = {
                "vim_tenant_name": vim_content["vim_tenant_name"],
                "vim_username": vim_content["vim_user"],
                "vim_password": vim_content["vim_password"],
            }
            if vim_RO.get("config"):
                vim_account_RO["config"] = vim_RO["config"]
                if "sdn-controller" in vim_account_RO["config"]:
                    del vim_account_RO["config"]["sdn-controller"]
                if "sdn-port-mapping" in vim_account_RO["config"]:
                    del vim_account_RO["config"]["sdn-port-mapping"]
                vim_config_encrypted_keys = self.vim_config_encrypted.get(
                    schema_version
                ) or self.vim_config_encrypted.get("default")
                for p in vim_config_encrypted_keys:
                    if vim_account_RO["config"].get(p):
                        vim_account_RO["config"][p] = self.db.decrypt(
                            vim_account_RO["config"][p],
                            schema_version=schema_version,
                            salt=vim_id,
                        )

            desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_vim_update["_admin.operationalState"] = "ENABLED"
            db_vim_update["_admin.detailed-status"] = "Done"
            # Mark the VIM 'create' HA task as successful
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(
                logging_text
                + "Exit Ok VIM account created at RO_vim_account_id={}".format(
                    desc["uuid"]
                )
            )
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the VIM 'create' HA task as erroneous. This is set for
                # every failure (also when the record could not be read) so
                # that unlock_HA below never sees an unbound variable.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_vim:
                    db_vim_update["_admin.operationalState"] = "ERROR"
                    db_vim_update["_admin.detailed-status"] = operation_details
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "vim",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def edit(self, vim_content, order_id):
        """
        Apply an edition of a VIM account to the VIM and vim_account deployed at RO
        :param vim_content: edited fields; must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "edit", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_edit={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        exc = None
        RO_sdn_id = None
        RO_vim_id = None
        db_vim_update = {}
        # Defaults for the path where the VIM has no RO deployment: nothing is
        # sent to RO and the method logs "Exit Ok", so report it as completed
        # instead of leaving these names unbound for unlock_HA.
        operation_state = "COMPLETED"
        operation_details = "Done"
        step = "Getting vim-id='{}' from db".format(vim_id)
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("vim", "edit", op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})

            if (
                db_vim.get("_admin")
                and db_vim["_admin"].get("deployed")
                and db_vim["_admin"]["deployed"].get("RO")
            ):
                if vim_content.get("config") and vim_content["config"].get(
                    "sdn-controller"
                ):
                    step = "Getting sdn-controller-id='{}' from db".format(
                        vim_content["config"]["sdn-controller"]
                    )
                    db_sdn = self.db.get_one(
                        "sdns", {"_id": vim_content["config"]["sdn-controller"]}
                    )

                    # If the VIM account has an associated SDN account, also
                    # wait for any previous tasks in process for the SDN
                    await self.lcm_tasks.waitfor_related_HA("sdn", "ANY", db_sdn["_id"])

                    if (
                        db_sdn.get("_admin")
                        and db_sdn["_admin"].get("deployed")
                        and db_sdn["_admin"]["deployed"].get("RO")
                    ):
                        RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                    else:
                        raise LcmException(
                            "sdn-controller={} is not available. Not deployed at RO".format(
                                vim_content["config"]["sdn-controller"]
                            )
                        )

                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                step = "Editing vim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                vim_RO = deepcopy(vim_content)
                vim_RO.pop("_id", None)
                vim_RO.pop("_admin", None)
                schema_version = vim_RO.pop("schema_version", None)
                vim_RO.pop("schema_type", None)
                vim_RO.pop("vim_tenant_name", None)
                if "vim_type" in vim_RO:
                    vim_RO["type"] = vim_RO.pop("vim_type")
                vim_RO.pop("vim_user", None)
                vim_RO.pop("vim_password", None)
                if RO_sdn_id:
                    vim_RO["config"]["sdn-controller"] = RO_sdn_id
                # TODO make a deep update of sdn-port-mapping
                if vim_RO:
                    await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

                step = "Editing vim-account at RO tenant"
                vim_account_RO = {}
                if "config" in vim_content:
                    if "sdn-controller" in vim_content["config"]:
                        del vim_content["config"]["sdn-controller"]
                    if "sdn-port-mapping" in vim_content["config"]:
                        del vim_content["config"]["sdn-port-mapping"]
                    if not vim_content["config"]:
                        del vim_content["config"]
                if "vim_tenant_name" in vim_content:
                    vim_account_RO["vim_tenant_name"] = vim_content["vim_tenant_name"]
                if "vim_password" in vim_content:
                    vim_account_RO["vim_password"] = vim_content["vim_password"]
                if vim_content.get("vim_password"):
                    vim_account_RO["vim_password"] = self.db.decrypt(
                        vim_content["vim_password"],
                        schema_version=schema_version,
                        salt=vim_id,
                    )
                if "config" in vim_content:
                    vim_account_RO["config"] = vim_content["config"]
                if vim_content.get("config"):
                    vim_config_encrypted_keys = self.vim_config_encrypted.get(
                        schema_version
                    ) or self.vim_config_encrypted.get("default")
                    for p in vim_config_encrypted_keys:
                        if vim_content["config"].get(p):
                            vim_account_RO["config"][p] = self.db.decrypt(
                                vim_content["config"][p],
                                schema_version=schema_version,
                                salt=vim_id,
                            )

                if "vim_user" in vim_content:
                    # The user must go into the descriptor sent to RO (same
                    # "vim_username" key that create() uses); storing it in
                    # vim_content had no effect.
                    vim_account_RO["vim_username"] = vim_content["vim_user"]
                # vim_account must be edited always even if empty in order to ensure changes are translated to RO
                # vim_thread. RO will remove and relaunch a new thread for this vim_account
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
                db_vim_update["_admin.operationalState"] = "ENABLED"
                # Mark the VIM 'edit' HA task as successful
                operation_state = "COMPLETED"
                operation_details = "Done"

            self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the VIM 'edit' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_vim:
                    db_vim_update["_admin.operationalState"] = "ERROR"
                    db_vim_update["_admin.detailed-status"] = operation_details
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "vim",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def delete(self, vim_content, order_id):
        """
        Detach and delete the VIM at RO (unless RO config has 'ng' set) and delete the database record
        :param vim_content: must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "delete", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_delete={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        step = "Getting vim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("vim", "delete", op_id)
            if not self.ro_config.get("ng"):
                db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
                if (
                    db_vim.get("_admin")
                    and db_vim["_admin"].get("deployed")
                    and db_vim["_admin"]["deployed"].get("RO")
                ):
                    RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                    RO = ROclient.ROClient(self.loop, **self.ro_config)
                    step = "Detaching vim from RO tenant"
                    try:
                        await RO.detach("vim_account", RO_vim_id)
                    except ROclient.ROClientException as e:
                        if e.http_code == 404:  # not found
                            self.logger.debug(
                                logging_text
                                + "RO_vim_id={} already detached".format(RO_vim_id)
                            )
                        else:
                            raise

                    step = "Deleting vim from RO"
                    try:
                        await RO.delete("vim", RO_vim_id)
                    except ROclient.ROClientException as e:
                        if e.http_code == 404:  # not found
                            self.logger.debug(
                                logging_text
                                + "RO_vim_id={} already deleted".format(RO_vim_id)
                            )
                        else:
                            raise
                else:
                    # nothing to delete
                    self.logger.debug(logging_text + "Nothing to remove at RO")
            self.db.del_one("vim_accounts", {"_id": vim_id})
            db_vim = None  # record is gone: do not try to update it in 'finally'
            self.logger.debug(logging_text + "Exit Ok")
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            self.lcm_tasks.remove("vim_account", vim_id, order_id)
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(
                    step, exc
                )
                # Mark the VIM 'delete' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.unlock_HA(
                    "vim",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            try:
                if db_vim and db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # If the VIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("vim_account", vim_id, order_id)
class WimLcm(LcmBase):
    """Propagate WIM account create/edit/delete operations to RO.

    Each coroutine reads the ``wim_accounts`` record, calls the RO client and
    writes progress/result fields (``_admin.*``) back to the database.
    """

    # values that are encrypted at wim config because they are passwords
    wim_config_encrypted = ()

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object, handed to LcmBase.__init__
        :param lcm_tasks: task registry used for HA locking and task removal
        :param config: two level dictionary with configuration. Only the 'RO' entry is read here
        :param loop: event loop handed to every ROClient created by this class
        :return: None
        """

        # NOTE(review): this logger is named "lcm.vim", same as VimLcm - looks
        # like a copy/paste leftover; confirm before renaming to "lcm.wim".
        self.logger = logging.getLogger("lcm.vim")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["RO"]

        super().__init__(msg, self.logger)

    async def create(self, wim_content, order_id):
        """
        Create the WIM and its wim_account at RO and record the result in the database
        :param wim_content: WIM account content; must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = wim_content.pop("op_id", None)
        # Honour the lock result like VimLcm.create and the other WIM
        # operations do, instead of carrying on when the lock is refused.
        if not self.lcm_tasks.lock_HA("wim", "create", op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_create={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        try:
            step = "Getting wim-id='{}' from db".format(wim_id)
            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            db_wim_update["_admin.deployed.RO"] = None

            step = "Creating wim at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # The "wim" descriptor sent to RO is the content without
            # bookkeeping keys and without per-tenant credentials.
            wim_RO = deepcopy(wim_content)
            wim_RO.pop("_id", None)
            wim_RO.pop("_admin", None)
            schema_version = wim_RO.pop("schema_version", None)
            wim_RO.pop("schema_type", None)
            wim_RO.pop("wim_tenant_name", None)
            wim_RO["type"] = wim_RO.pop("wim_type")
            wim_RO.pop("wim_user", None)
            wim_RO.pop("wim_password", None)
            desc = await RO.create("wim", descriptor=wim_RO)
            RO_wim_id = desc["uuid"]
            db_wim_update["_admin.deployed.RO"] = RO_wim_id
            self.logger.debug(
                logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id)
            )

            step = "Creating wim_account at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)

            # NOTE(review): 'wim_password' is decrypted here, but the account
            # descriptor below is built from 'name'/'user'/'password' -
            # confirm which keys NBI actually stores for WIM accounts.
            if wim_content.get("wim_password"):
                wim_content["wim_password"] = self.db.decrypt(
                    wim_content["wim_password"],
                    schema_version=schema_version,
                    salt=wim_id,
                )
            wim_account_RO = {
                "name": wim_content["name"],
                "user": wim_content["user"],
                "password": wim_content["password"],
            }
            if wim_RO.get("config"):
                wim_account_RO["config"] = wim_RO["config"]
                if "wim_port_mapping" in wim_account_RO["config"]:
                    del wim_account_RO["config"]["wim_port_mapping"]
                for p in self.wim_config_encrypted:
                    if wim_account_RO["config"].get(p):
                        wim_account_RO["config"][p] = self.db.decrypt(
                            wim_account_RO["config"][p],
                            schema_version=schema_version,
                            salt=wim_id,
                        )

            desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
            db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_wim_update["_admin.operationalState"] = "ENABLED"
            db_wim_update["_admin.detailed-status"] = "Done"
            # Mark the WIM 'create' HA task as successful
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(
                logging_text
                + "Exit Ok WIM account created at RO_wim_account_id={}".format(
                    desc["uuid"]
                )
            )
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the WIM 'create' HA task as erroneous. This is set for
                # every failure (also when the record could not be read) so
                # that unlock_HA below never sees an unbound variable.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_wim:
                    db_wim_update["_admin.operationalState"] = "ERROR"
                    db_wim_update["_admin.detailed-status"] = operation_details
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "wim",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def edit(self, wim_content, order_id):
        """
        Apply an edition of a WIM account to the WIM and wim_account deployed at RO
        :param wim_content: edited fields; must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("wim", "edit", op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_edit={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        exc = None
        RO_wim_id = None
        db_wim_update = {}
        # Defaults for the path where the WIM has no RO deployment: nothing is
        # sent to RO and the method logs "Exit Ok", so report it as completed
        # instead of leaving these names unbound for unlock_HA.
        operation_state = "COMPLETED"
        operation_details = "Done"
        step = "Getting wim-id='{}' from db".format(wim_id)
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("wim", "edit", op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})

            if (
                db_wim.get("_admin")
                and db_wim["_admin"].get("deployed")
                and db_wim["_admin"]["deployed"].get("RO")
            ):

                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                step = "Editing wim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                wim_RO = deepcopy(wim_content)
                wim_RO.pop("_id", None)
                wim_RO.pop("_admin", None)
                schema_version = wim_RO.pop("schema_version", None)
                wim_RO.pop("schema_type", None)
                wim_RO.pop("wim_tenant_name", None)
                if "wim_type" in wim_RO:
                    wim_RO["type"] = wim_RO.pop("wim_type")
                wim_RO.pop("wim_user", None)
                wim_RO.pop("wim_password", None)
                # TODO make a deep update of wim_port_mapping
                if wim_RO:
                    await RO.edit("wim", RO_wim_id, descriptor=wim_RO)

                step = "Editing wim-account at RO tenant"
                wim_account_RO = {}
                if "config" in wim_content:
                    if "wim_port_mapping" in wim_content["config"]:
                        del wim_content["config"]["wim_port_mapping"]
                    if not wim_content["config"]:
                        del wim_content["config"]
                if "wim_tenant_name" in wim_content:
                    wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
                if "wim_password" in wim_content:
                    wim_account_RO["wim_password"] = wim_content["wim_password"]
                if wim_content.get("wim_password"):
                    wim_account_RO["wim_password"] = self.db.decrypt(
                        wim_content["wim_password"],
                        schema_version=schema_version,
                        salt=wim_id,
                    )
                if "config" in wim_content:
                    wim_account_RO["config"] = wim_content["config"]
                if wim_content.get("config"):
                    for p in self.wim_config_encrypted:
                        if wim_content["config"].get(p):
                            wim_account_RO["config"][p] = self.db.decrypt(
                                wim_content["config"][p],
                                schema_version=schema_version,
                                salt=wim_id,
                            )

                # NOTE(review): this stores the user in wim_content, which is
                # not read again, so the value never reaches RO. It probably
                # belongs in wim_account_RO - confirm the key RO expects
                # (create() uses "user") before changing the payload.
                if "wim_user" in wim_content:
                    wim_content["wim_username"] = wim_content["wim_user"]
                # wim_account must be edited always even if empty in order to ensure changes are translated to RO
                # wim_thread. RO will remove and relaunch a new thread for this wim_account
                await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
                db_wim_update["_admin.operationalState"] = "ENABLED"
                # Mark the WIM 'edit' HA task as successful
                operation_state = "COMPLETED"
                operation_details = "Done"

            self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the WIM 'edit' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_wim:
                    db_wim_update["_admin.operationalState"] = "ERROR"
                    db_wim_update["_admin.detailed-status"] = operation_details
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "wim",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def delete(self, wim_content, order_id):
        """
        Detach and delete the WIM at RO and delete the database record
        :param wim_content: must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("wim", "delete", op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_delete={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        step = "Getting wim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("wim", "delete", op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            if (
                db_wim.get("_admin")
                and db_wim["_admin"].get("deployed")
                and db_wim["_admin"]["deployed"].get("RO")
            ):
                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Detaching wim from RO tenant"
                try:
                    await RO.detach("wim_account", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(
                            logging_text
                            + "RO_wim_id={} already detached".format(RO_wim_id)
                        )
                    else:
                        raise

                step = "Deleting wim from RO"
                try:
                    await RO.delete("wim", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(
                            logging_text
                            + "RO_wim_id={} already deleted".format(RO_wim_id)
                        )
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nothing to remove at RO")
            self.db.del_one("wim_accounts", {"_id": wim_id})
            db_wim = None  # record is gone: do not try to update it in 'finally'
            self.logger.debug(logging_text + "Exit Ok")
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            self.lcm_tasks.remove("wim_account", wim_id, order_id)
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(
                    step, exc
                )
                # Mark the WIM 'delete' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.unlock_HA(
                    "wim",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            try:
                if db_wim and db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # If the WIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)
class SdnLcm(LcmBase):
    """Propagate SDN controller create/edit/delete operations to RO.

    Each coroutine reads the ``sdns`` record, calls the RO client and writes
    progress/result fields (``_admin.*``) back to the database.
    """

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object, handed to LcmBase.__init__
        :param lcm_tasks: task registry used for HA locking and task removal
        :param config: two level dictionary with configuration. Only the 'RO' entry is read here
        :param loop: event loop handed to every ROClient created by this class
        :return: None
        """

        self.logger = logging.getLogger("lcm.sdn")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["RO"]

        super().__init__(msg, self.logger)

    async def create(self, sdn_content, order_id):
        """
        Create the SDN controller at RO and record the result in the database
        :param sdn_content: SDN content; must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = sdn_content.pop("op_id", None)
        # Honour the lock result like the other SDN operations do, instead of
        # carrying on when the lock is refused.
        if not self.lcm_tasks.lock_HA("sdn", "create", op_id):
            return

        sdn_id = sdn_content["_id"]
        logging_text = "Task sdn_create={} ".format(sdn_id)
        self.logger.debug(logging_text + "Enter")

        db_sdn = None
        db_sdn_update = {}
        RO_sdn_id = None
        exc = None
        try:
            step = "Getting sdn from db"
            db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
            db_sdn_update["_admin.deployed.RO"] = None

            step = "Creating sdn at RO"
            db_sdn_update["_admin.detailed-status"] = step
            self.update_db_2("sdns", sdn_id, db_sdn_update)

            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # The descriptor sent to RO is the content without bookkeeping
            # keys, with the password decrypted.
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            schema_version = sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO.get("password"):
                sdn_RO["password"] = self.db.decrypt(
                    sdn_RO["password"], schema_version=schema_version, salt=sdn_id
                )

            desc = await RO.create("sdn", descriptor=sdn_RO)
            RO_sdn_id = desc["uuid"]
            db_sdn_update["_admin.deployed.RO"] = RO_sdn_id
            db_sdn_update["_admin.operationalState"] = "ENABLED"
            self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
            # Mark the SDN 'create' HA task as successful
            operation_state = "COMPLETED"
            operation_details = "Done"
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the SDN 'create' HA task as erroneous. This is set for
                # every failure (also when the record could not be read) so
                # that unlock_HA below never sees an unbound variable.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_sdn:
                    db_sdn_update["_admin.operationalState"] = "ERROR"
                    db_sdn_update["_admin.detailed-status"] = operation_details
            try:
                if db_sdn and db_sdn_update:
                    self.update_db_2("sdns", sdn_id, db_sdn_update)
                # Register the SDN 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "sdn",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("sdn", sdn_id, order_id)

    async def edit(self, sdn_content, order_id):
        """
        Apply an edition of an SDN controller to the one deployed at RO
        :param sdn_content: edited fields; must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = sdn_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("sdn", "edit", op_id):
            return

        sdn_id = sdn_content["_id"]
        logging_text = "Task sdn_edit={} ".format(sdn_id)
        self.logger.debug(logging_text + "Enter")

        db_sdn = None
        db_sdn_update = {}
        exc = None
        RO_sdn_id = None
        # Defaults for the path where the SDN has no RO deployment: nothing is
        # sent to RO and the method logs "Exit Ok", so report it as completed
        # instead of leaving these names unbound for unlock_HA.
        operation_state = "COMPLETED"
        operation_details = "Done"
        step = "Getting sdn from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("sdn", "edit", op_id)

            db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
            if (
                db_sdn.get("_admin")
                and db_sdn["_admin"].get("deployed")
                and db_sdn["_admin"]["deployed"].get("RO")
            ):
                RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Editing sdn at RO"
                sdn_RO = deepcopy(sdn_content)
                sdn_RO.pop("_id", None)
                sdn_RO.pop("_admin", None)
                schema_version = sdn_RO.pop("schema_version", None)
                sdn_RO.pop("schema_type", None)
                sdn_RO.pop("description", None)
                if sdn_RO.get("password"):
                    sdn_RO["password"] = self.db.decrypt(
                        sdn_RO["password"], schema_version=schema_version, salt=sdn_id
                    )
                if sdn_RO:
                    await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
                db_sdn_update["_admin.operationalState"] = "ENABLED"
                # Mark the SDN 'edit' HA task as successful
                operation_state = "COMPLETED"
                operation_details = "Done"

            self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the SDN 'edit' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_sdn:
                    # The error status goes into the pending update, not into
                    # the fetched record, otherwise it is never written.
                    db_sdn_update["_admin.operationalState"] = "ERROR"
                    db_sdn_update["_admin.detailed-status"] = operation_details
            try:
                if db_sdn_update:
                    self.update_db_2("sdns", sdn_id, db_sdn_update)
                # Register the SDN 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "sdn",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("sdn", sdn_id, order_id)

    async def delete(self, sdn_content, order_id):
        """
        Delete the SDN controller at RO and delete the database record
        :param sdn_content: must contain '_id', may contain 'op_id' (popped here)
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored in the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = sdn_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("sdn", "delete", op_id):
            return

        sdn_id = sdn_content["_id"]
        logging_text = "Task sdn_delete={} ".format(sdn_id)
        self.logger.debug(logging_text + "Enter")

        db_sdn = None
        db_sdn_update = {}
        exc = None
        step = "Getting sdn from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("sdn", "delete", op_id)

            db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
            if (
                db_sdn.get("_admin")
                and db_sdn["_admin"].get("deployed")
                and db_sdn["_admin"]["deployed"].get("RO")
            ):
                RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Deleting sdn from RO"
                try:
                    await RO.delete("sdn", RO_sdn_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(
                            logging_text
                            + "RO_sdn_id={} already deleted".format(RO_sdn_id)
                        )
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(
                    logging_text + "Skipping. There is not RO information at database"
                )
            self.db.del_one("sdns", {"_id": sdn_id})
            db_sdn = None  # record is gone: do not try to update it in 'finally'
            self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc and db_sdn:
                # The error status goes into the pending update, not into the
                # fetched record, otherwise the update below is skipped.
                db_sdn_update["_admin.operationalState"] = "ERROR"
                db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(
                    step, exc
                )
                # Mark the SDN 'delete' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.unlock_HA(
                    "sdn",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            try:
                if db_sdn and db_sdn_update:
                    self.update_db_2("sdns", sdn_id, db_sdn_update)
                # If the SDN 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("sdn", sdn_id, order_id)
class K8sClusterLcm(LcmBase):
    """Life-cycle handler for K8s cluster registrations: create, edit and delete."""

    # Overall time budget, in seconds, for the connector init tasks started by create()
    timeout_create = 300

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Set up logging, the base class and one connector per K8s deployment method.

        :param msg: message bus object, handed to the base class initializer
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration. Only the 'VCA' section is read here
        :param loop: asyncio event loop
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8scluster")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        super().__init__(msg, self.logger)

        # NOTE(review): the connector arguments after the command paths sit on lines
        # elided from the extracted listing; confirm them against the repository.
        self.helm2_k8scluster = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        self.helm3_k8scluster = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.juju_k8scluster = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        # Deployment method name -> connector in charge of it
        self.k8s_map = {
            "helm-chart": self.helm2_k8scluster,
            "helm-chart-v3": self.helm3_k8scluster,
            "juju-bundle": self.juju_k8scluster,
        }

    async def create(self, k8scluster_content, order_id):
        """
        Initialize a K8s cluster for every enabled deployment method and record the outcome.

        One init task per deployment method is launched. Results are stored under
        '_admin.<method>' of the 'k8sclusters' record as tasks finish. The cluster ends
        as ENABLED (all ok), DEGRADED (some ok) or ERROR (none ok).

        :param k8scluster_content: dictionary describing the cluster. '_id' is read and 'op_id' (if present) is popped
        :param order_id: identifier used to remove this task from the task registry at the end
        :return: None
        """

        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "create", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        try:
            step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            # NOTE(review): the second argument is on an elided line; "decrypt" assumed
            self.db.encrypt_decrypt_fields(
                db_k8scluster.get("credentials"),
                "decrypt",
                ["password", "secret"],
                schema_version=db_k8scluster["schema_version"],
                salt=db_k8scluster["_id"],
            )
            k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
            pending_tasks = []
            task2name = {}
            init_target = deep_get(db_k8scluster, ("_admin", "init"))
            step = "Launching k8scluster init tasks"

            k8s_deploy_methods = db_k8scluster.get("deployment_methods", {})
            # for backwards compatibility and all-false case
            if not any(k8s_deploy_methods.values()):
                k8s_deploy_methods = {
                    "helm-chart": True,
                    "juju-bundle": True,
                    "helm-chart-v3": True,
                }
            # Keep only the methods whose flag is truthy
            deploy_methods = tuple(filter(k8s_deploy_methods.get, k8s_deploy_methods))

            for task_name in deploy_methods:
                if init_target and task_name not in init_target:
                    continue
                task = asyncio.ensure_future(
                    self.k8s_map[task_name].init_env(
                        k8s_credentials,
                        reuse_cluster_uuid=k8scluster_id,
                        vca_id=db_k8scluster.get("vca_id"),
                    )
                )
                pending_tasks.append(task)
                task2name[task] = task_name

            error_text_list = []
            tasks_name_ok = []
            reached_timeout = False
            now = time()

            while pending_tasks:
                _timeout = max(
                    1, self.timeout_create - (time() - now)
                )  # ensure not negative with max
                step = "Waiting for k8scluster init tasks"
                done, pending_tasks = await asyncio.wait(
                    pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
                )
                if not done:
                    # Timeout reached: process the pending tasks as if they had finished
                    done = pending_tasks
                    pending_tasks = None
                    reached_timeout = True
                for task in done:
                    task_name = task2name[task]
                    # 'exc' holds either a marker string or the task's exception (None if ok)
                    if reached_timeout:
                        exc = "Timeout"
                    elif task.cancelled():
                        exc = "Cancelled"
                    else:
                        exc = task.exception()

                    if exc:
                        error_text_list.append(
                            "Failing init {}: {}".format(task_name, exc)
                        )
                        db_k8scluster_update[
                            "_admin.{}.error_msg".format(task_name)
                        ] = str(exc)
                        db_k8scluster_update["_admin.{}.id".format(task_name)] = None
                        db_k8scluster_update[
                            "_admin.{}.operationalState".format(task_name)
                        ] = "ERROR"
                        self.logger.error(
                            logging_text + "{} init fail: {}".format(task_name, exc),
                            exc_info=not isinstance(exc, (N2VCException, str)),
                        )
                    else:
                        k8s_id, uninstall_sw = task.result()
                        tasks_name_ok.append(task_name)
                        self.logger.debug(
                            logging_text
                            + "{} init success. id={} created={}".format(
                                task_name, k8s_id, uninstall_sw
                            )
                        )
                        db_k8scluster_update[
                            "_admin.{}.error_msg".format(task_name)
                        ] = None
                        db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id
                        db_k8scluster_update[
                            "_admin.{}.created".format(task_name)
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.{}.operationalState".format(task_name)
                        ] = "ENABLED"
                # update database
                step = "Updating database for " + task_name
                self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
            if tasks_name_ok:
                operation_details = "ready for " + ", ".join(tasks_name_ok)
                operation_state = "COMPLETED"
                db_k8scluster_update["_admin.operationalState"] = (
                    "ENABLED" if not error_text_list else "DEGRADED"
                )
                operation_details += "; " + ";".join(error_text_list)
            else:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                operation_state = "FAILED"
                operation_details = ";".join(error_text_list)
            db_k8scluster_update["_admin.detailed-status"] = operation_details
            self.logger.debug(logging_text + "Done. Result: " + operation_state)
            # Per-task errors are already recorded; do not treat them as a global failure
            exc = None

        except Exception as e:
            # NOTE(review): only asyncio.CancelledError is visible in the extracted
            # listing; the other four entries are on elided lines — confirm.
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                # Fix: always define the operation result on failure. Previously it was
                # only set when the DB record had been read, so a failure while fetching
                # the record left both names unbound and this 'finally' raised.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster:
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8scluster and db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def edit(self, k8scluster_content, order_id):
        """
        Placeholder for editing a K8s cluster: the operation is registered as completed.

        :param k8scluster_content: dictionary describing the cluster. '_id' is read and 'op_id' (if present) is popped
        :param order_id: identifier used to remove this task from the task registry at the end
        :return: None
        """

        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "edit", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_edit={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        # TODO the implementation is pending and will be part of a new feature
        # It will support rotation of certificates, update of credentials and K8S API endpoint
        # At the moment the operation is set as completed

        operation_state = "COMPLETED"
        operation_details = "Not implemented"

        self.lcm_tasks.unlock_HA(
            "k8scluster",
            "edit",
            op_id,
            operationState=operation_state,
            detailed_status=operation_details,
        )
        self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def delete(self, k8scluster_content, order_id):
        """
        Reset the cluster for every deployment method that was initialized and remove its record.

        :param k8scluster_content: dictionary describing the cluster. '_id' is read and 'op_id' (if present) is popped
        :param order_id: identifier used to remove this task from the task registry at the end
        :return: None
        """

        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "delete", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        try:
            step = "Getting k8scluster='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id"))
            k8s_h3c_id = deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "id"))
            k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))

            cluster_removed = True
            if k8s_jb_id:  # delete in reverse order of creation
                step = "Removing juju-bundle '{}'".format(k8s_jb_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "juju-bundle", "created"))
                    or False
                )
                cluster_removed = await self.juju_k8scluster.reset(
                    cluster_uuid=k8s_jb_id,
                    uninstall_sw=uninstall_sw,
                    vca_id=db_k8scluster.get("vca_id"),
                )
                db_k8scluster_update["_admin.juju-bundle.id"] = None
                db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED"

            if k8s_hc_id:
                step = "Removing helm-chart '{}'".format(k8s_hc_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart", "created"))
                    or False
                )
                cluster_removed = await self.helm2_k8scluster.reset(
                    cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart.id"] = None
                db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED"

            if k8s_h3c_id:
                # Fix: report the helm v3 id here (the helm v2 id was used by mistake)
                step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created"))
                    or False
                )
                cluster_removed = await self.helm3_k8scluster.reset(
                    cluster_uuid=k8s_h3c_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart-v3.id"] = None
                db_k8scluster_update[
                    "_admin.helm-chart-v3.operationalState"
                ] = "DISABLED"

            # Try to remove from cluster_inserted to clean old versions
            if k8s_hc_id and cluster_removed:
                step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
                self.logger.debug(logging_text + step)
                db_k8srepo_list = self.db.get_list(
                    "k8srepos", {"_admin.cluster-inserted": k8s_hc_id}
                )
                for k8srepo in db_k8srepo_list:
                    try:
                        cluster_list = k8srepo["_admin"]["cluster-inserted"]
                        cluster_list.remove(k8s_hc_id)
                        self.update_db_2(
                            "k8srepos",
                            k8srepo["_id"],
                            {"_admin.cluster-inserted": cluster_list},
                        )
                    except Exception as e:
                        # Best effort: a repo that cannot be cleaned does not abort the deletion
                        self.logger.error("{}: {}".format(step, e))
                # NOTE(review): the nesting of the next three statements cannot be read
                # from the extracted listing; they are assumed to belong to this branch.
                self.db.del_one("k8sclusters", {"_id": k8scluster_id})
                db_k8scluster_update = None
                self.logger.debug(logging_text + "Done")

        except Exception as e:
            # NOTE(review): only asyncio.CancelledError is visible in the extracted
            # listing; the other four entries are on elided lines — confirm.
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc and db_k8scluster:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(
                    step, exc
                )
                # Mark the K8scluster 'delete' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
            else:
                operation_state = "COMPLETED"
                operation_details = "deleted"

            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
                # Register the K8scluster 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
class VcaLcm(LcmBase):
    """Life-cycle handler for VCA registrations: create, edit and delete."""

    # NOTE(review): this attribute is read by _validate_vca() but its definition is
    # on a line elided from the extracted listing; confirm the value.
    timeout_create = 30

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Set up logging, the base class and the N2VC connector used to validate VCAs.

        :param msg: message bus object, handed to the base class initializer
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration (not read here)
        :param loop: asyncio event loop
        :return: None
        """

        self.logger = logging.getLogger("lcm.vca")
        self.loop = loop
        self.lcm_tasks = lcm_tasks

        super().__init__(msg, self.logger)

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(
            log=self.logger, loop=self.loop, fs=self.fs, db=self.db
        )

    def _get_vca_by_id(self, vca_id: str) -> dict:
        """Read the 'vca' record with the given id and run its protected fields through the DB decryption helper."""
        vca_record = self.db.get_one("vca", {"_id": vca_id})
        # NOTE(review): the first two arguments and 'salt' are on elided lines — confirm
        self.db.encrypt_decrypt_fields(
            vca_record,
            "decrypt",
            ["secret", "cacert"],
            schema_version=vca_record["schema_version"],
            salt=vca_record["_id"],
        )
        return vca_record

    async def _validate_vca(self, db_vca_id: str) -> None:
        """Validate the VCA through N2VC within 'timeout_create' seconds, re-raising any failure."""
        bounded_validation = asyncio.wait_for(
            self.n2vc.validate_vca(db_vca_id),
            timeout=self.timeout_create,
        )
        task = asyncio.ensure_future(bounded_validation)
        await asyncio.wait([task], return_when=asyncio.FIRST_COMPLETED)
        if task.exception():
            raise task.exception()

    def _is_vca_config_update(self, update_options) -> bool:
        """Tell whether the edit touches any option that requires validating the VCA again."""
        # NOTE(review): the key list is on lines elided from the extracted listing;
        # confirm it against the repository.
        config_words = [
            "cacert",
            "endpoints",
            "lxd-cloud",
            "lxd-credentials",
            "k8s-cloud",
            "k8s-credentials",
            "model-config",
            "user",
            "secret",
        ]
        return any(word in update_options.keys() for word in config_words)

    async def create(self, vca_content, order_id):
        """
        Validate a newly registered VCA and store the result in its record.

        :param vca_content: dictionary describing the VCA. '_id' is read and 'op_id' (if present) is popped
        :param order_id: identifier used to remove this task from the task registry at the end
        :return: None
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "create", op_id):
            return

        vca_id = vca_content["_id"]
        task_text = "Task vca_create={} {}".format
        self.logger.debug(task_text(vca_id, "Enter"))

        db_vca = None
        db_vca_update = {}

        # Pessimistic default: only a fully successful run flips it to COMPLETED
        operation_state = "FAILED"
        operation_details = ""
        try:
            self.logger.debug(task_text(vca_id, "Getting vca from db"))
            db_vca = self._get_vca_by_id(vca_id)

            await self._validate_vca(db_vca["_id"])
            self.logger.debug(
                task_text(vca_id, "vca registered and validated successfully")
            )
            db_vca_update["_admin.operationalState"] = "ENABLED"
            db_vca_update["_admin.detailed-status"] = "Connectivity: ok"
            operation_details = "VCA validated"
            operation_state = "COMPLETED"

            self.logger.debug(
                task_text(vca_id, "Done. Result: {}".format(operation_state))
            )

        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(task_text(vca_id, error_msg))
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    task_text(vca_id, "Cannot update database: {}".format(e))
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def edit(self, vca_content, order_id):
        """
        Apply an edit to a VCA, validating it again only when connection options changed.

        :param vca_content: dictionary with the edited fields. '_id' is read and 'op_id' (if present) is popped
        :param order_id: identifier used to remove this task from the task registry at the end
        :return: None
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "edit", op_id):
            return

        vca_id = vca_content["_id"]
        task_text = "Task vca_edit={} {}".format
        self.logger.debug(task_text(vca_id, "Enter"))

        db_vca = None
        db_vca_update = {}

        operation_state = "FAILED"
        operation_details = ""
        try:
            self.logger.debug(task_text(vca_id, "Getting vca from db"))
            db_vca = self._get_vca_by_id(vca_id)
            needs_validation = self._is_vca_config_update(vca_content)
            if needs_validation:
                await self._validate_vca(db_vca["_id"])
                self.logger.debug(
                    task_text(vca_id, "vca registered and validated successfully")
                )
                db_vca_update["_admin.operationalState"] = "ENABLED"
                db_vca_update["_admin.detailed-status"] = "Connectivity: ok"

            operation_details = "Edited"
            operation_state = "COMPLETED"

            self.logger.debug(
                task_text(vca_id, "Done. Result: {}".format(operation_state))
            )

        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(task_text(vca_id, error_msg))
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_state = "FAILED"
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    task_text(vca_id, "Cannot update database: {}".format(e))
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def delete(self, vca_content, order_id):
        """
        Remove a VCA record from the database.

        :param vca_content: dictionary describing the VCA. '_id' is read and 'op_id' (if present) is popped
        :param order_id: identifier used to remove this task from the task registry at the end
        :return: None
        """

        # HA tasks and backward compatibility:
        # If "vca_content" does not include "op_id", we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
        # Register "delete" task here for related future HA operations
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "delete", op_id):
            return

        db_vca_update = {}
        vca_id = vca_content["_id"]
        task_text = "Task vca_delete={} {}".format

        operation_state = "FAILED"
        operation_details = ""

        try:
            self.logger.debug(task_text(vca_id, "Deleting vca from db"))
            self.db.del_one("vca", {"_id": vca_id})
            # The record no longer exists, so there is nothing left to update
            db_vca_update = None
            operation_details = "deleted"
            operation_state = "COMPLETED"

            self.logger.debug(
                task_text(vca_id, "Done. Result: {}".format(operation_state))
            )
        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(task_text(vca_id, error_msg))
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    task_text(vca_id, "Cannot update database: {}".format(e))
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)
class K8sRepoLcm(LcmBase):
    """Life-cycle handler for K8s repository registrations: create and delete."""

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Set up logging, the base class and the helm connector kept for repositories.

        :param msg: message bus object, handed to the base class initializer
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration. Only the 'VCA' section is read here
        :param loop: asyncio event loop
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8srepo")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        super().__init__(msg, self.logger)

        # NOTE(review): the connector arguments after the command paths sit on lines
        # elided from the extracted listing; confirm them against the repository.
        self.k8srepo = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

    async def create(self, k8srepo_content, order_id):
        """
        Mark a K8s repository record as ENABLED once it can be read from the database.

        :param k8srepo_content: dictionary describing the repository. '_id' is read and 'op_id' (if present) is popped
        :param order_id: identifier used to remove this task from the task registry at the end
        :return: None
        """

        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations

        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "create", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        try:
            step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            db_k8srepo_update["_admin.operationalState"] = "ENABLED"
        except Exception as e:
            # Known error types are logged without a traceback.
            # NOTE(review): only asyncio.CancelledError is visible in the extracted
            # listing; the other four entries are on elided lines — confirm.
            expected_errors = (
                LcmException,
                DbException,
                K8sException,
                N2VCException,
                asyncio.CancelledError,
            )
            self.logger.error(
                logging_text + "Exit Exception {}".format(e),
                exc_info=not isinstance(e, expected_errors),
            )
            exc = e
        finally:
            if exc and db_k8srepo:
                failure_text = "ERROR {}: {}".format(step, exc)
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = failure_text
                # Mark the K8srepo 'create' HA task as erroneous
                operation_state = "FAILED"
                operation_details = failure_text
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)

    async def delete(self, k8srepo_content, order_id):
        """
        Register the result of a K8s repository deletion and remove its record.

        :param k8srepo_content: dictionary describing the repository. '_id' is read and 'op_id' (if present) is popped
        :param order_id: identifier used to remove this task from the task registry at the end
        :return: None
        """

        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "delete", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None

        operation_state = "COMPLETED"
        operation_details = ""
        try:
            step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})

        except Exception as e:
            # Known error types are logged without a traceback.
            # NOTE(review): only asyncio.CancelledError is visible in the extracted
            # listing; the other four entries are on elided lines — confirm.
            expected_errors = (
                LcmException,
                DbException,
                K8sException,
                N2VCException,
                asyncio.CancelledError,
            )
            self.logger.error(
                logging_text + "Exit Exception {}".format(e),
                exc_info=not isinstance(e, expected_errors),
            )
            exc = e
        finally:
            if exc and db_k8srepo:
                failure_text = "ERROR {}: {}".format(step, exc)
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = failure_text
                # Mark the K8srepo 'delete' HA task as erroneous
                operation_state = "FAILED"
                operation_details = failure_text
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
                # The record is removed only after the operation has been registered
                self.db.del_one("k8srepos", {"_id": k8srepo_id})
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)