1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
23 from osm_lcm
import ROclient
24 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
, deep_get
25 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
26 from n2vc
.k8s_juju_conn
import K8sJujuConnector
27 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
28 from n2vc
.exceptions
import K8sException
, N2VCException
29 from osm_common
.dbbase
import DbException
30 from copy
import deepcopy
33 __author__
= "Alfonso Tierno"
36 class VimLcm(LcmBase
):
37 # values that are encrypted at vim config because they are passwords
38 vim_config_encrypted
= {
39 "1.1": ("admin_password", "nsx_password", "vcenter_password"),
def __init__(self, msg, lcm_tasks, config):
    """
    Set up the VIM lifecycle handler.

    :param msg: messaging object, forwarded unchanged to LcmBase.__init__
    :param lcm_tasks: task registry stored on the instance; the other methods of this class
        call lock_HA/unlock_HA/waitfor_related_HA/remove on it
    :param config: dictionary with configuration. Only its 'RO' entry is read here
    :return: None
    """
    vim_logger = logging.getLogger("lcm.vim")
    self.logger = vim_logger
    self.lcm_tasks = lcm_tasks
    # stored untouched; it is later expanded as keyword arguments of ROclient.ROClient
    self.ro_config = config["RO"]

    # the base class receives the same logger object that is kept in self.logger
    super().__init__(msg, vim_logger)
61 async def create(self
, vim_content
, order_id
):
62 # HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
64 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
65 # Register 'create' task here for related future HA operations
66 op_id
= vim_content
.pop("op_id", None)
67 if not self
.lcm_tasks
.lock_HA("vim", "create", op_id
):
70 vim_id
= vim_content
["_id"]
71 logging_text
= "Task vim_create={} ".format(vim_id
)
72 self
.logger
.debug(logging_text
+ "Enter")
79 step
= "Getting vim-id='{}' from db".format(vim_id
)
80 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
81 if vim_content
.get("config") and vim_content
["config"].get(
84 step
= "Getting sdn-controller-id='{}' from db".format(
85 vim_content
["config"]["sdn-controller"]
87 db_sdn
= self
.db
.get_one(
88 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
91 # If the VIM account has an associated SDN account, also
92 # wait for any previous tasks in process for the SDN
93 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
97 and db_sdn
["_admin"].get("deployed")
98 and db_sdn
["_admin"]["deployed"].get("RO")
100 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
103 "sdn-controller={} is not available. Not deployed at RO".format(
104 vim_content
["config"]["sdn-controller"]
108 step
= "Creating vim at RO"
109 db_vim_update
["_admin.deployed.RO"] = None
110 db_vim_update
["_admin.detailed-status"] = step
111 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
112 RO
= ROclient
.ROClient(**self
.ro_config
)
113 vim_RO
= deepcopy(vim_content
)
114 vim_RO
.pop("_id", None)
115 vim_RO
.pop("_admin", None)
116 schema_version
= vim_RO
.pop("schema_version", None)
117 vim_RO
.pop("schema_type", None)
118 vim_RO
.pop("vim_tenant_name", None)
119 vim_RO
["type"] = vim_RO
.pop("vim_type")
120 vim_RO
.pop("vim_user", None)
121 vim_RO
.pop("vim_password", None)
123 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
124 desc
= await RO
.create("vim", descriptor
=vim_RO
)
125 RO_vim_id
= desc
["uuid"]
126 db_vim_update
["_admin.deployed.RO"] = RO_vim_id
128 logging_text
+ "VIM created at RO_vim_id={}".format(RO_vim_id
)
131 step
= "Creating vim_account at RO"
132 db_vim_update
["_admin.detailed-status"] = step
133 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
135 if vim_content
.get("vim_password"):
136 vim_content
["vim_password"] = self
.db
.decrypt(
137 vim_content
["vim_password"],
138 schema_version
=schema_version
,
142 "vim_tenant_name": vim_content
["vim_tenant_name"],
143 "vim_username": vim_content
["vim_user"],
144 "vim_password": vim_content
["vim_password"],
146 if vim_RO
.get("config"):
147 vim_account_RO
["config"] = vim_RO
["config"]
148 if "sdn-controller" in vim_account_RO
["config"]:
149 del vim_account_RO
["config"]["sdn-controller"]
150 if "sdn-port-mapping" in vim_account_RO
["config"]:
151 del vim_account_RO
["config"]["sdn-port-mapping"]
152 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
154 ) or self
.vim_config_encrypted
.get("default")
155 for p
in vim_config_encrypted_keys
:
156 if vim_account_RO
["config"].get(p
):
157 vim_account_RO
["config"][p
] = self
.db
.decrypt(
158 vim_account_RO
["config"][p
],
159 schema_version
=schema_version
,
163 desc
= await RO
.attach("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
164 db_vim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
165 db_vim_update
["_admin.operationalState"] = "ENABLED"
166 db_vim_update
["_admin.detailed-status"] = "Done"
167 # Mark the VIM 'create' HA task as successful
168 operation_state
= "COMPLETED"
169 operation_details
= "Done"
173 + "Exit Ok VIM account created at RO_vim_account_id={}".format(
179 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
180 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
182 except Exception as e
:
183 self
.logger
.critical(
184 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
189 db_vim_update
["_admin.operationalState"] = "ERROR"
190 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
193 # Mark the VIM 'create' HA task as erroneous
194 operation_state
= "FAILED"
195 operation_details
= "ERROR {}: {}".format(step
, exc
)
198 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
199 # Register the VIM 'create' HA task either
# successful or erroneous, or do nothing (if legacy NBI)
201 self
.lcm_tasks
.unlock_HA(
205 operationState
=operation_state
,
206 detailed_status
=operation_details
,
208 except DbException
as e
:
209 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
211 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
213 async def edit(self
, vim_content
, order_id
):
214 # HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
216 # In such a case, HA is not supported by NBI, and the HA check always returns True
217 op_id
= vim_content
.pop("op_id", None)
218 if not self
.lcm_tasks
.lock_HA("vim", "edit", op_id
):
221 vim_id
= vim_content
["_id"]
222 logging_text
= "Task vim_edit={} ".format(vim_id
)
223 self
.logger
.debug(logging_text
+ "Enter")
230 step
= "Getting vim-id='{}' from db".format(vim_id
)
232 # wait for any previous tasks in process
233 await self
.lcm_tasks
.waitfor_related_HA("vim", "edit", op_id
)
235 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
239 and db_vim
["_admin"].get("deployed")
240 and db_vim
["_admin"]["deployed"].get("RO")
242 if vim_content
.get("config") and vim_content
["config"].get(
245 step
= "Getting sdn-controller-id='{}' from db".format(
246 vim_content
["config"]["sdn-controller"]
248 db_sdn
= self
.db
.get_one(
249 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
252 # If the VIM account has an associated SDN account, also
253 # wait for any previous tasks in process for the SDN
254 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
258 and db_sdn
["_admin"].get("deployed")
259 and db_sdn
["_admin"]["deployed"].get("RO")
261 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
264 "sdn-controller={} is not available. Not deployed at RO".format(
265 vim_content
["config"]["sdn-controller"]
269 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
270 step
= "Editing vim at RO"
271 RO
= ROclient
.ROClient(**self
.ro_config
)
272 vim_RO
= deepcopy(vim_content
)
273 vim_RO
.pop("_id", None)
274 vim_RO
.pop("_admin", None)
275 schema_version
= vim_RO
.pop("schema_version", None)
276 vim_RO
.pop("schema_type", None)
277 vim_RO
.pop("vim_tenant_name", None)
278 if "vim_type" in vim_RO
:
279 vim_RO
["type"] = vim_RO
.pop("vim_type")
280 vim_RO
.pop("vim_user", None)
281 vim_RO
.pop("vim_password", None)
283 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
284 # TODO make a deep update of sdn-port-mapping
286 await RO
.edit("vim", RO_vim_id
, descriptor
=vim_RO
)
288 step
= "Editing vim-account at RO tenant"
290 if "config" in vim_content
:
291 if "sdn-controller" in vim_content
["config"]:
292 del vim_content
["config"]["sdn-controller"]
293 if "sdn-port-mapping" in vim_content
["config"]:
294 del vim_content
["config"]["sdn-port-mapping"]
295 if not vim_content
["config"]:
296 del vim_content
["config"]
297 if "vim_tenant_name" in vim_content
:
298 vim_account_RO
["vim_tenant_name"] = vim_content
["vim_tenant_name"]
299 if "vim_password" in vim_content
:
300 vim_account_RO
["vim_password"] = vim_content
["vim_password"]
301 if vim_content
.get("vim_password"):
302 vim_account_RO
["vim_password"] = self
.db
.decrypt(
303 vim_content
["vim_password"],
304 schema_version
=schema_version
,
307 if "config" in vim_content
:
308 vim_account_RO
["config"] = vim_content
["config"]
309 if vim_content
.get("config"):
310 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
312 ) or self
.vim_config_encrypted
.get("default")
313 for p
in vim_config_encrypted_keys
:
314 if vim_content
["config"].get(p
):
315 vim_account_RO
["config"][p
] = self
.db
.decrypt(
316 vim_content
["config"][p
],
317 schema_version
=schema_version
,
if "vim_user" in vim_content:
    # The new username has to travel in the account descriptor that is sent to RO
    # (the same "vim_username" key that create() puts in its descriptor). Writing it
    # back into vim_content meant the edited user never reached RO.
    vim_account_RO["vim_username"] = vim_content["vim_user"]
323 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
324 # vim_thread. RO will remove and relaunch a new thread for this vim_account
325 await RO
.edit("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
326 db_vim_update
["_admin.operationalState"] = "ENABLED"
327 # Mark the VIM 'edit' HA task as successful
328 operation_state
= "COMPLETED"
329 operation_details
= "Done"
331 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id={}".format(RO_vim_id
))
334 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
335 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
337 except Exception as e
:
338 self
.logger
.critical(
339 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
344 db_vim_update
["_admin.operationalState"] = "ERROR"
345 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
348 # Mark the VIM 'edit' HA task as erroneous
349 operation_state
= "FAILED"
350 operation_details
= "ERROR {}: {}".format(step
, exc
)
353 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
354 # Register the VIM 'edit' HA task either
# successful or erroneous, or do nothing (if legacy NBI)
356 self
.lcm_tasks
.unlock_HA(
360 operationState
=operation_state
,
361 detailed_status
=operation_details
,
363 except DbException
as e
:
364 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
366 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
368 async def delete(self
, vim_content
, order_id
):
369 # HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
371 # In such a case, HA is not supported by NBI, and the HA check always returns True
372 op_id
= vim_content
.pop("op_id", None)
373 if not self
.lcm_tasks
.lock_HA("vim", "delete", op_id
):
376 vim_id
= vim_content
["_id"]
377 logging_text
= "Task vim_delete={} ".format(vim_id
)
378 self
.logger
.debug(logging_text
+ "Enter")
383 step
= "Getting vim from db"
385 # wait for any previous tasks in process
386 await self
.lcm_tasks
.waitfor_related_HA("vim", "delete", op_id
)
387 if not self
.ro_config
.get("ng"):
388 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
391 and db_vim
["_admin"].get("deployed")
392 and db_vim
["_admin"]["deployed"].get("RO")
394 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
395 RO
= ROclient
.ROClient(**self
.ro_config
)
396 step
= "Detaching vim from RO tenant"
398 await RO
.detach("vim_account", RO_vim_id
)
399 except ROclient
.ROClientException
as e
:
400 if e
.http_code
== 404: # not found
403 + "RO_vim_id={} already detached".format(RO_vim_id
)
408 step
= "Deleting vim from RO"
410 await RO
.delete("vim", RO_vim_id
)
411 except ROclient
.ROClientException
as e
:
412 if e
.http_code
== 404: # not found
415 + "RO_vim_id={} already deleted".format(RO_vim_id
)
421 self
.logger
.debug(logging_text
+ "Nothing to remove at RO")
422 self
.db
.del_one("vim_accounts", {"_id": vim_id
})
424 self
.logger
.debug(logging_text
+ "Exit Ok")
427 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
428 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
430 except Exception as e
:
431 self
.logger
.critical(
432 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
436 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
438 db_vim_update
["_admin.operationalState"] = "ERROR"
439 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
442 # Mark the VIM 'delete' HA task as erroneous
443 operation_state
= "FAILED"
444 operation_details
= "ERROR {}: {}".format(step
, exc
)
445 self
.lcm_tasks
.unlock_HA(
449 operationState
=operation_state
,
450 detailed_status
=operation_details
,
453 if db_vim
and db_vim_update
:
454 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
# If the VIM 'delete' HA task was successful, the DB entry has been deleted,
456 # which means that there is nowhere to register this task, so do nothing here.
457 except DbException
as e
:
458 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
459 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
462 class WimLcm(LcmBase
):
463 # values that are encrypted at wim config because they are passwords
464 wim_config_encrypted
= ()
def __init__(self, msg, lcm_tasks, config):
    """
    Set up the WIM lifecycle handler.

    :param msg: messaging object, forwarded unchanged to LcmBase.__init__
    :param lcm_tasks: task registry stored on the instance; the other methods of this class
        call lock_HA/unlock_HA/waitfor_related_HA/remove on it
    :param config: dictionary with configuration. Only its 'RO' entry is read here
    :return: None
    """
    # NOTE(review): this is the same logger name that VimLcm uses ("lcm.vim"), while
    # SdnLcm has its own ("lcm.sdn"). Confirm whether "lcm.wim" was intended.
    wim_logger = logging.getLogger("lcm.vim")
    self.logger = wim_logger
    self.lcm_tasks = lcm_tasks
    # stored untouched; it is later expanded as keyword arguments of ROclient.ROClient
    self.ro_config = config["RO"]

    # the base class receives the same logger object that is kept in self.logger
    super().__init__(msg, wim_logger)
479 async def create(self
, wim_content
, order_id
):
480 # HA tasks and backward compatibility:
# If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
482 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
483 # Register 'create' task here for related future HA operations
484 op_id
= wim_content
.pop("op_id", None)
485 self
.lcm_tasks
.lock_HA("wim", "create", op_id
)
487 wim_id
= wim_content
["_id"]
488 logging_text
= "Task wim_create={} ".format(wim_id
)
489 self
.logger
.debug(logging_text
+ "Enter")
495 step
= "Getting wim-id='{}' from db".format(wim_id
)
496 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
497 db_wim_update
["_admin.deployed.RO"] = None
499 step
= "Creating wim at RO"
500 db_wim_update
["_admin.detailed-status"] = step
501 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
502 RO
= ROclient
.ROClient(**self
.ro_config
)
503 wim_RO
= deepcopy(wim_content
)
504 wim_RO
.pop("_id", None)
505 wim_RO
.pop("_admin", None)
506 schema_version
= wim_RO
.pop("schema_version", None)
507 wim_RO
.pop("schema_type", None)
508 wim_RO
.pop("wim_tenant_name", None)
509 wim_RO
["type"] = wim_RO
.pop("wim_type")
510 wim_RO
.pop("wim_user", None)
511 wim_RO
.pop("wim_password", None)
512 desc
= await RO
.create("wim", descriptor
=wim_RO
)
513 RO_wim_id
= desc
["uuid"]
514 db_wim_update
["_admin.deployed.RO"] = RO_wim_id
516 logging_text
+ "WIM created at RO_wim_id={}".format(RO_wim_id
)
519 step
= "Creating wim_account at RO"
520 db_wim_update
["_admin.detailed-status"] = step
521 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
523 if wim_content
.get("wim_password"):
524 wim_content
["wim_password"] = self
.db
.decrypt(
525 wim_content
["wim_password"],
526 schema_version
=schema_version
,
530 "name": wim_content
["name"],
531 "user": wim_content
["user"],
532 "password": wim_content
["password"],
534 if wim_RO
.get("config"):
535 wim_account_RO
["config"] = wim_RO
["config"]
536 if "wim_port_mapping" in wim_account_RO
["config"]:
537 del wim_account_RO
["config"]["wim_port_mapping"]
538 for p
in self
.wim_config_encrypted
:
539 if wim_account_RO
["config"].get(p
):
540 wim_account_RO
["config"][p
] = self
.db
.decrypt(
541 wim_account_RO
["config"][p
],
542 schema_version
=schema_version
,
546 desc
= await RO
.attach("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
547 db_wim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
548 db_wim_update
["_admin.operationalState"] = "ENABLED"
549 db_wim_update
["_admin.detailed-status"] = "Done"
550 # Mark the WIM 'create' HA task as successful
551 operation_state
= "COMPLETED"
552 operation_details
= "Done"
556 + "Exit Ok WIM account created at RO_wim_account_id={}".format(
562 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
563 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
565 except Exception as e
:
566 self
.logger
.critical(
567 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
572 db_wim_update
["_admin.operationalState"] = "ERROR"
573 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
576 # Mark the WIM 'create' HA task as erroneous
577 operation_state
= "FAILED"
578 operation_details
= "ERROR {}: {}".format(step
, exc
)
581 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
582 # Register the WIM 'create' HA task either
# successful or erroneous, or do nothing (if legacy NBI)
584 self
.lcm_tasks
.unlock_HA(
588 operationState
=operation_state
,
589 detailed_status
=operation_details
,
591 except DbException
as e
:
592 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
593 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
595 async def edit(self
, wim_content
, order_id
):
596 # HA tasks and backward compatibility:
# If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
598 # In such a case, HA is not supported by NBI, and the HA check always returns True
599 op_id
= wim_content
.pop("op_id", None)
600 if not self
.lcm_tasks
.lock_HA("wim", "edit", op_id
):
603 wim_id
= wim_content
["_id"]
604 logging_text
= "Task wim_edit={} ".format(wim_id
)
605 self
.logger
.debug(logging_text
+ "Enter")
611 step
= "Getting wim-id='{}' from db".format(wim_id
)
613 # wait for any previous tasks in process
614 await self
.lcm_tasks
.waitfor_related_HA("wim", "edit", op_id
)
616 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
620 and db_wim
["_admin"].get("deployed")
621 and db_wim
["_admin"]["deployed"].get("RO")
623 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
624 step
= "Editing wim at RO"
625 RO
= ROclient
.ROClient(**self
.ro_config
)
626 wim_RO
= deepcopy(wim_content
)
627 wim_RO
.pop("_id", None)
628 wim_RO
.pop("_admin", None)
629 schema_version
= wim_RO
.pop("schema_version", None)
630 wim_RO
.pop("schema_type", None)
631 wim_RO
.pop("wim_tenant_name", None)
632 if "wim_type" in wim_RO
:
633 wim_RO
["type"] = wim_RO
.pop("wim_type")
634 wim_RO
.pop("wim_user", None)
635 wim_RO
.pop("wim_password", None)
636 # TODO make a deep update of wim_port_mapping
638 await RO
.edit("wim", RO_wim_id
, descriptor
=wim_RO
)
640 step
= "Editing wim-account at RO tenant"
642 if "config" in wim_content
:
643 if "wim_port_mapping" in wim_content
["config"]:
644 del wim_content
["config"]["wim_port_mapping"]
645 if not wim_content
["config"]:
646 del wim_content
["config"]
647 if "wim_tenant_name" in wim_content
:
648 wim_account_RO
["wim_tenant_name"] = wim_content
["wim_tenant_name"]
649 if "wim_password" in wim_content
:
650 wim_account_RO
["wim_password"] = wim_content
["wim_password"]
651 if wim_content
.get("wim_password"):
652 wim_account_RO
["wim_password"] = self
.db
.decrypt(
653 wim_content
["wim_password"],
654 schema_version
=schema_version
,
657 if "config" in wim_content
:
658 wim_account_RO
["config"] = wim_content
["config"]
659 if wim_content
.get("config"):
660 for p
in self
.wim_config_encrypted
:
661 if wim_content
["config"].get(p
):
662 wim_account_RO
["config"][p
] = self
.db
.decrypt(
663 wim_content
["config"][p
],
664 schema_version
=schema_version
,
668 if "wim_user" in wim_content
:
669 wim_content
["wim_username"] = wim_content
["wim_user"]
670 # wim_account must be edited always even if empty in order to ensure changes are translated to RO
671 # wim_thread. RO will remove and relaunch a new thread for this wim_account
672 await RO
.edit("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
673 db_wim_update
["_admin.operationalState"] = "ENABLED"
674 # Mark the WIM 'edit' HA task as successful
675 operation_state
= "COMPLETED"
676 operation_details
= "Done"
678 self
.logger
.debug(logging_text
+ "Exit Ok RO_wim_id={}".format(RO_wim_id
))
681 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
682 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
684 except Exception as e
:
685 self
.logger
.critical(
686 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
691 db_wim_update
["_admin.operationalState"] = "ERROR"
692 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
695 # Mark the WIM 'edit' HA task as erroneous
696 operation_state
= "FAILED"
697 operation_details
= "ERROR {}: {}".format(step
, exc
)
700 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
701 # Register the WIM 'edit' HA task either
# successful or erroneous, or do nothing (if legacy NBI)
703 self
.lcm_tasks
.unlock_HA(
707 operationState
=operation_state
,
708 detailed_status
=operation_details
,
710 except DbException
as e
:
711 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
712 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
714 async def delete(self
, wim_content
, order_id
):
715 # HA tasks and backward compatibility:
# If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
717 # In such a case, HA is not supported by NBI, and the HA check always returns True
718 op_id
= wim_content
.pop("op_id", None)
719 if not self
.lcm_tasks
.lock_HA("wim", "delete", op_id
):
722 wim_id
= wim_content
["_id"]
723 logging_text
= "Task wim_delete={} ".format(wim_id
)
724 self
.logger
.debug(logging_text
+ "Enter")
729 step
= "Getting wim from db"
731 # wait for any previous tasks in process
732 await self
.lcm_tasks
.waitfor_related_HA("wim", "delete", op_id
)
734 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
737 and db_wim
["_admin"].get("deployed")
738 and db_wim
["_admin"]["deployed"].get("RO")
740 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
741 RO
= ROclient
.ROClient(**self
.ro_config
)
742 step
= "Detaching wim from RO tenant"
744 await RO
.detach("wim_account", RO_wim_id
)
745 except ROclient
.ROClientException
as e
:
746 if e
.http_code
== 404: # not found
749 + "RO_wim_id={} already detached".format(RO_wim_id
)
754 step
= "Deleting wim from RO"
756 await RO
.delete("wim", RO_wim_id
)
757 except ROclient
.ROClientException
as e
:
758 if e
.http_code
== 404: # not found
761 + "RO_wim_id={} already deleted".format(RO_wim_id
)
# Having nothing deployed at RO is a normal outcome here (the DB entry is still
# removed and the task ends with "Exit Ok"), so log it at debug level, as
# VimLcm.delete does for the same message.
self.logger.debug(logging_text + "Nothing to remove at RO")
768 self
.db
.del_one("wim_accounts", {"_id": wim_id
})
770 self
.logger
.debug(logging_text
+ "Exit Ok")
773 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
774 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
776 except Exception as e
:
777 self
.logger
.critical(
778 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
782 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
784 db_wim_update
["_admin.operationalState"] = "ERROR"
785 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
788 # Mark the WIM 'delete' HA task as erroneous
789 operation_state
= "FAILED"
790 operation_details
= "ERROR {}: {}".format(step
, exc
)
791 self
.lcm_tasks
.unlock_HA(
795 operationState
=operation_state
,
796 detailed_status
=operation_details
,
799 if db_wim
and db_wim_update
:
800 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
# If the WIM 'delete' HA task was successful, the DB entry has been deleted,
802 # which means that there is nowhere to register this task, so do nothing here.
803 except DbException
as e
:
804 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
805 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
808 class SdnLcm(LcmBase
):
def __init__(self, msg, lcm_tasks, config):
    """
    Set up the SDN controller lifecycle handler.

    :param msg: messaging object, forwarded unchanged to LcmBase.__init__
    :param lcm_tasks: task registry stored on the instance; the other methods of this class
        call lock_HA/unlock_HA/waitfor_related_HA/remove on it
    :param config: dictionary with configuration. Only its 'RO' entry is read here
    :return: None
    """
    sdn_logger = logging.getLogger("lcm.sdn")
    self.logger = sdn_logger
    self.lcm_tasks = lcm_tasks
    # stored untouched; it is later expanded as keyword arguments of ROclient.ROClient
    self.ro_config = config["RO"]

    # the base class receives the same logger object that is kept in self.logger
    super().__init__(msg, sdn_logger)
822 async def create(self
, sdn_content
, order_id
):
823 # HA tasks and backward compatibility:
# If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
825 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
826 # Register 'create' task here for related future HA operations
827 op_id
= sdn_content
.pop("op_id", None)
828 self
.lcm_tasks
.lock_HA("sdn", "create", op_id
)
830 sdn_id
= sdn_content
["_id"]
831 logging_text
= "Task sdn_create={} ".format(sdn_id
)
832 self
.logger
.debug(logging_text
+ "Enter")
839 step
= "Getting sdn from db"
840 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
841 db_sdn_update
["_admin.deployed.RO"] = None
843 step
= "Creating sdn at RO"
844 db_sdn_update
["_admin.detailed-status"] = step
845 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
847 RO
= ROclient
.ROClient(**self
.ro_config
)
848 sdn_RO
= deepcopy(sdn_content
)
849 sdn_RO
.pop("_id", None)
850 sdn_RO
.pop("_admin", None)
851 schema_version
= sdn_RO
.pop("schema_version", None)
852 sdn_RO
.pop("schema_type", None)
853 sdn_RO
.pop("description", None)
854 if sdn_RO
.get("password"):
855 sdn_RO
["password"] = self
.db
.decrypt(
856 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
859 desc
= await RO
.create("sdn", descriptor
=sdn_RO
)
860 RO_sdn_id
= desc
["uuid"]
861 db_sdn_update
["_admin.deployed.RO"] = RO_sdn_id
862 db_sdn_update
["_admin.operationalState"] = "ENABLED"
863 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
864 # Mark the SDN 'create' HA task as successful
865 operation_state
= "COMPLETED"
866 operation_details
= "Done"
869 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
870 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
872 except Exception as e
:
873 self
.logger
.critical(
874 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
879 db_sdn_update
["_admin.operationalState"] = "ERROR"
880 db_sdn_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
883 # Mark the SDN 'create' HA task as erroneous
884 operation_state
= "FAILED"
885 operation_details
= "ERROR {}: {}".format(step
, exc
)
887 if db_sdn
and db_sdn_update
:
888 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
889 # Register the SDN 'create' HA task either
# successful or erroneous, or do nothing (if legacy NBI)
891 self
.lcm_tasks
.unlock_HA(
895 operationState
=operation_state
,
896 detailed_status
=operation_details
,
898 except DbException
as e
:
899 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
900 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
902 async def edit(self
, sdn_content
, order_id
):
903 # HA tasks and backward compatibility:
# If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
905 # In such a case, HA is not supported by NBI, and the HA check always returns True
906 op_id
= sdn_content
.pop("op_id", None)
907 if not self
.lcm_tasks
.lock_HA("sdn", "edit", op_id
):
910 sdn_id
= sdn_content
["_id"]
911 logging_text
= "Task sdn_edit={} ".format(sdn_id
)
912 self
.logger
.debug(logging_text
+ "Enter")
917 step
= "Getting sdn from db"
919 # wait for any previous tasks in process
920 await self
.lcm_tasks
.waitfor_related_HA("sdn", "edit", op_id
)
922 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
926 and db_sdn
["_admin"].get("deployed")
927 and db_sdn
["_admin"]["deployed"].get("RO")
929 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
930 RO
= ROclient
.ROClient(**self
.ro_config
)
931 step
= "Editing sdn at RO"
932 sdn_RO
= deepcopy(sdn_content
)
933 sdn_RO
.pop("_id", None)
934 sdn_RO
.pop("_admin", None)
935 schema_version
= sdn_RO
.pop("schema_version", None)
936 sdn_RO
.pop("schema_type", None)
937 sdn_RO
.pop("description", None)
938 if sdn_RO
.get("password"):
939 sdn_RO
["password"] = self
.db
.decrypt(
940 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
943 await RO
.edit("sdn", RO_sdn_id
, descriptor
=sdn_RO
)
944 db_sdn_update
["_admin.operationalState"] = "ENABLED"
945 # Mark the SDN 'edit' HA task as successful
946 operation_state
= "COMPLETED"
947 operation_details
= "Done"
949 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
952 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
953 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
955 except Exception as e
:
956 self
.logger
.critical(
957 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
# Record the failure in db_sdn_update: that is the dict handed to update_db_2()
# further down. Writing these keys into db_sdn (the record read from the DB)
# never reached the database, so a failed edit left no ERROR state behind.
db_sdn_update["_admin.operationalState"] = "ERROR"
db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
964 # Mark the SDN 'edit' HA task as erroneous
965 operation_state
= "FAILED"
966 operation_details
= "ERROR {}: {}".format(step
, exc
)
969 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
970 # Register the SDN 'edit' HA task either
# successful or erroneous, or do nothing (if legacy NBI)
972 self
.lcm_tasks
.unlock_HA(
976 operationState
=operation_state
,
977 detailed_status
=operation_details
,
979 except DbException
as e
:
980 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
981 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
983 async def delete(self
, sdn_content
, order_id
):
984 # HA tasks and backward compatibility:
# If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
986 # In such a case, HA is not supported by NBI, and the HA check always returns True
987 op_id
= sdn_content
.pop("op_id", None)
988 if not self
.lcm_tasks
.lock_HA("sdn", "delete", op_id
):
991 sdn_id
= sdn_content
["_id"]
992 logging_text
= "Task sdn_delete={} ".format(sdn_id
)
993 self
.logger
.debug(logging_text
+ "Enter")
998 step
= "Getting sdn from db"
1000 # wait for any previous tasks in process
1001 await self
.lcm_tasks
.waitfor_related_HA("sdn", "delete", op_id
)
1003 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
1005 db_sdn
.get("_admin")
1006 and db_sdn
["_admin"].get("deployed")
1007 and db_sdn
["_admin"]["deployed"].get("RO")
1009 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
1010 RO
= ROclient
.ROClient(**self
.ro_config
)
1011 step
= "Deleting sdn from RO"
1013 await RO
.delete("sdn", RO_sdn_id
)
1014 except ROclient
.ROClientException
as e
:
1015 if e
.http_code
== 404: # not found
1018 + "RO_sdn_id={} already deleted".format(RO_sdn_id
)
1025 logging_text
+ "Skipping. There is not RO information at database"
1027 self
.db
.del_one("sdns", {"_id": sdn_id
})
1029 self
.logger
.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id
))
1032 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
1033 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1035 except Exception as e
:
1036 self
.logger
.critical(
1037 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1042 db_sdn
["_admin.operationalState"] = "ERROR"
1043 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
1044 # Mark the SDN 'delete' HA task as erroneous
1045 operation_state
= "FAILED"
1046 operation_details
= "ERROR {}: {}".format(step
, exc
)
1047 self
.lcm_tasks
.unlock_HA(
1051 operationState
=operation_state
,
1052 detailed_status
=operation_details
,
1055 if db_sdn
and db_sdn_update
:
1056 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
1057 # If the SDN 'delete' HA task was succesful, the DB entry has been deleted,
1058 # which means that there is nowhere to register this task, so do nothing here.
1059 except DbException
as e
:
1060 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1061 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
class K8sClusterLcm(LcmBase):
    """
    Life-cycle operations (create, edit, delete) over the K8s clusters stored at the
    "k8sclusters" database collection.
    """

    # Overall budget, in seconds, that create() grants to the connector init tasks
    timeout_create = 300

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: passed to LcmBase.__init__
        :param lcm_tasks: task registry used for the HA lock/unlock and to remove finished tasks
        :param config: two level dictionary with configuration. Only its 'VCA' entry is read here
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8scluster")
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        super().__init__(msg, self.logger)

        # NOTE(review): confirm the fs/log/db/on_update_db arguments of both connectors against
        # the n2vc release in use.
        self.helm3_k8scluster = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.juju_k8scluster = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        # deployment method name -> connector that serves it
        self.k8s_map = {
            "helm-chart-v3": self.helm3_k8scluster,
            "juju-bundle": self.juju_k8scluster,
        }

    async def create(self, k8scluster_content, order_id):
        """
        Initialize a K8s cluster with every enabled deployment method and store the result.

        One init_env() task is launched per enabled deployment method. The per-method outcome is
        written under "_admin.<method>" and the global one under "_admin.operationalState":
        "ENABLED" when all of them succeed, "DEGRADED" when only some do, "ERROR" otherwise.

        :param k8scluster_content: dictionary with the cluster data. "_id" is read and "op_id",
            when present, is popped from it and used for the HA lock
        :param order_id: passed to lcm_tasks.remove() to unregister this task once it finishes
        :return: None
        """
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "create", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        # Bound before the 'try' so that the 'finally' clause can always register the operation,
        # even when the failure happens before the database record has been read
        operation_state = "FAILED"
        operation_details = ""
        step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            self.db.encrypt_decrypt_fields(
                db_k8scluster.get("credentials"),
                "decrypt",
                ["password", "secret"],
                schema_version=db_k8scluster["schema_version"],
                salt=db_k8scluster["_id"],
            )
            k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
            pending_tasks = []
            task2name = {}
            init_target = deep_get(db_k8scluster, ("_admin", "init"))
            step = "Launching k8scluster init tasks"

            k8s_deploy_methods = db_k8scluster.get("deployment_methods", {})
            # for backwards compatibility and all-false case
            if not any(k8s_deploy_methods.values()):
                k8s_deploy_methods = {
                    "juju-bundle": True,
                    "helm-chart-v3": True,
                }
            # keep only the names of the methods whose flag is truthy
            deploy_methods = tuple(filter(k8s_deploy_methods.get, k8s_deploy_methods))

            for task_name in deploy_methods:
                if init_target and task_name not in init_target:
                    continue
                task = asyncio.ensure_future(
                    self.k8s_map[task_name].init_env(
                        k8s_credentials,
                        reuse_cluster_uuid=k8scluster_id,
                        vca_id=db_k8scluster.get("vca_id"),
                    )
                )
                pending_tasks.append(task)
                task2name[task] = task_name

            error_text_list = []
            tasks_name_ok = []
            reached_timeout = False
            now = time()

            while pending_tasks:
                _timeout = max(
                    1, self.timeout_create - (time() - now)
                )  # ensure not negative with max
                step = "Waiting for k8scluster init tasks"
                done, pending_tasks = await asyncio.wait(
                    pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
                )
                if not done:
                    # timeout. Set timeout is reached and process pending as if they had been finished
                    # NOTE(review): the timed-out tasks are not cancelled here
                    done = pending_tasks
                    pending_tasks = None
                    reached_timeout = True
                for task in done:
                    task_name = task2name[task]
                    if reached_timeout:
                        task_error = "Timeout"
                    elif task.cancelled():
                        task_error = "Cancelled"
                    else:
                        task_error = task.exception()

                    if task_error:
                        error_text_list.append(
                            "Failing init {}: {}".format(task_name, task_error)
                        )
                        db_k8scluster_update[
                            "_admin.{}.error_msg".format(task_name)
                        ] = str(task_error)
                        db_k8scluster_update["_admin.{}.id".format(task_name)] = None
                        db_k8scluster_update[
                            "_admin.{}.operationalState".format(task_name)
                        ] = "ERROR"
                        # No exception is being handled at this point, so the exception object
                        # itself is handed to the logger in order to get its traceback
                        unexpected = not isinstance(task_error, (N2VCException, str))
                        self.logger.error(
                            logging_text
                            + "{} init fail: {}".format(task_name, task_error),
                            exc_info=task_error if unexpected else False,
                        )
                    else:
                        k8s_id, uninstall_sw = task.result()
                        tasks_name_ok.append(task_name)
                        self.logger.debug(
                            logging_text
                            + "{} init success. id={} created={}".format(
                                task_name, k8s_id, uninstall_sw
                            )
                        )
                        db_k8scluster_update[
                            "_admin.{}.error_msg".format(task_name)
                        ] = None
                        db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id
                        db_k8scluster_update[
                            "_admin.{}.created".format(task_name)
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.{}.operationalState".format(task_name)
                        ] = "ENABLED"
                # update database
                step = "Updating database for " + task_name
                self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
            if tasks_name_ok:
                operation_details = "ready for " + ", ".join(tasks_name_ok)
                operation_state = "COMPLETED"
                db_k8scluster_update["_admin.operationalState"] = (
                    "ENABLED" if not error_text_list else "DEGRADED"
                )
                operation_details += "; " + ";".join(error_text_list)
            else:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                operation_state = "FAILED"
                operation_details = ";".join(error_text_list)
            db_k8scluster_update["_admin.detailed-status"] = operation_details
            self.logger.debug(logging_text + "Done. Result: " + operation_state)

        except Exception as e:
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster:
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8scluster and db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def edit(self, k8scluster_content, order_id):
        """
        Placeholder for the K8s cluster edition: the operation is registered as completed
        without changing anything.

        :param k8scluster_content: dictionary with the cluster data. "_id" is read and "op_id",
            when present, is popped from it and used for the HA lock
        :param order_id: passed to lcm_tasks.remove() to unregister this task once it finishes
        :return: None
        """
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "edit", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]

        logging_text = "Task k8scluster_edit={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        # TODO the implementation is pending and will be part of a new feature
        # It will support rotation of certificates, update of credentials and K8S API endpoint
        # At the moment the operation is set as completed

        operation_state = "COMPLETED"
        operation_details = "Not implemented"

        self.lcm_tasks.unlock_HA(
            "k8scluster",
            "edit",
            op_id,
            operationState=operation_state,
            detailed_status=operation_details,
        )
        self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def delete(self, k8scluster_content, order_id):
        """
        Reset the cluster on every connector that initialized it and delete its database record.

        :param k8scluster_content: dictionary with the cluster data. "_id" is read and "op_id",
            when present, is popped from it and used for the HA lock
        :param order_id: passed to lcm_tasks.remove() to unregister this task once it finishes
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "delete", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        step = "Getting k8scluster='{}' from db".format(k8scluster_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            k8s_h3c_id = deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "id"))
            k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))

            cluster_removed = True
            if k8s_jb_id:  # delete in reverse order of creation
                step = "Removing juju-bundle '{}'".format(k8s_jb_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "juju-bundle", "created"))
                    or False
                )
                cluster_removed = await self.juju_k8scluster.reset(
                    cluster_uuid=k8s_jb_id,
                    uninstall_sw=uninstall_sw,
                    vca_id=db_k8scluster.get("vca_id"),
                )
                db_k8scluster_update["_admin.juju-bundle.id"] = None
                db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED"

            if k8s_h3c_id:
                step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created"))
                    or False
                )
                cluster_removed = await self.helm3_k8scluster.reset(
                    cluster_uuid=k8s_h3c_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart-v3.id"] = None
                db_k8scluster_update[
                    "_admin.helm-chart-v3.operationalState"
                ] = "DISABLED"

            # Try to remove from cluster_inserted to clean old versions
            if k8s_h3c_id and cluster_removed:
                step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
                self.logger.debug(logging_text + step)
                db_k8srepo_list = self.db.get_list(
                    "k8srepos", {"_admin.cluster-inserted": k8s_h3c_id}
                )
                for k8srepo in db_k8srepo_list:
                    # best effort: a failure on one repo is logged and does not stop the deletion
                    try:
                        cluster_list = k8srepo["_admin"]["cluster-inserted"]
                        cluster_list.remove(k8s_h3c_id)
                        self.update_db_2(
                            "k8srepos",
                            k8srepo["_id"],
                            {"_admin.cluster-inserted": cluster_list},
                        )
                    except Exception as e:
                        self.logger.error("{}: {}".format(step, e))
            self.db.del_one("k8sclusters", {"_id": k8scluster_id})
            # The record is gone, so nothing is left to update. An empty dictionary (rather than
            # None) keeps the 'finally' clause safe if anything fails after this point.
            db_k8scluster_update = {}
            self.logger.debug(logging_text + "Done")

        except Exception as e:
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                # Mark the K8scluster 'delete' HA task as erroneous, whether or not the record
                # could be read: a failed deletion must never be reported as completed
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster:
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details
            else:
                operation_state = "COMPLETED"
                operation_details = "deleted"

            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
                # Register the K8scluster 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
class VcaLcm(LcmBase):
    """
    Life-cycle operations (create, edit, delete) over the VCAs stored at the "vca" database collection.
    """

    # Seconds granted to the VCA validation. NOTE(review): confirm the value
    timeout_create = 30

    # Edition keys that trigger a new validation of the VCA.
    # NOTE(review): confirm this list against the keys that NBI accepts on a VCA edition
    _vca_config_keys = (
        "cacert",
        "endpoints",
        "lxd-cloud",
        "lxd-credentials",
        "k8s-cloud",
        "k8s-credentials",
        "model-config",
        "user",
        "secret",
    )

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: passed to LcmBase.__init__
        :param lcm_tasks: task registry used for the HA lock/unlock and to remove finished tasks
        :param config: two level dictionary with configuration. It is not read by this class
        :return: None
        """

        self.logger = logging.getLogger("lcm.vca")
        self.lcm_tasks = lcm_tasks

        super().__init__(msg, self.logger)

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(log=self.logger, fs=self.fs, db=self.db)

    def _get_vca_by_id(self, vca_id: str) -> dict:
        """
        Read a VCA record from the database and decrypt its "secret" and "cacert" fields.

        :param vca_id: "_id" of the record at the "vca" collection
        :return: the record, with the fields decrypted in place
        """
        db_vca = self.db.get_one("vca", {"_id": vca_id})
        self.db.encrypt_decrypt_fields(
            db_vca,
            "decrypt",
            ["secret", "cacert"],
            schema_version=db_vca["schema_version"],
            salt=db_vca["_id"],
        )
        return db_vca

    async def _validate_vca(self, db_vca_id: str) -> None:
        """
        Validate the VCA through N2VC, waiting for self.timeout_create seconds at most.

        Any exception raised by the validation, or asyncio.TimeoutError when the time is
        exceeded, is propagated to the caller.

        :param db_vca_id: "_id" of the VCA to validate
        :return: None
        """
        # Awaiting wait_for() directly (instead of wrapping it in a separate task) also
        # propagates a cancellation of the caller to the validation itself
        await asyncio.wait_for(
            self.n2vc.validate_vca(db_vca_id),
            timeout=self.timeout_create,
        )

    def _is_vca_config_update(self, update_options) -> bool:
        """
        Tell whether an edition touches any key that requires validating the VCA again.

        :param update_options: dictionary with the edited content
        :return: True if any of the keys in _vca_config_keys is present, False otherwise
        """
        return any(word in update_options for word in self._vca_config_keys)

    async def create(self, vca_content, order_id):
        """
        Validate a new VCA and store the result at its "_admin" section.

        :param vca_content: dictionary with the VCA data. "_id" is read and "op_id", when present,
            is popped from it and used for the HA lock
        :param order_id: passed to lcm_tasks.remove() to unregister this task once it finishes
        :return: None
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "create", op_id):
            return

        vca_id = vca_content["_id"]
        logging_text = "Task vca_create={} ".format(vca_id)
        self.logger.debug(logging_text + "Enter")

        db_vca = None
        db_vca_update = {}
        operation_state = "FAILED"
        operation_details = ""
        try:
            self.logger.debug(logging_text + "Getting vca from db")
            db_vca = self._get_vca_by_id(vca_id)

            await self._validate_vca(db_vca["_id"])
            self.logger.debug(
                logging_text + "vca registered and validated successfully"
            )
            db_vca_update["_admin.operationalState"] = "ENABLED"
            db_vca_update["_admin.detailed-status"] = "Connectivity: ok"
            operation_details = "VCA validated"
            operation_state = "COMPLETED"

            self.logger.debug(
                logging_text + "Done. Result: {}".format(operation_state)
            )

        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(logging_text + error_msg)
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_state = "FAILED"
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    logging_text + "Cannot update database: {}".format(e)
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def edit(self, vca_content, order_id):
        """
        Register a VCA edition, validating the VCA again when its connection data changed.

        :param vca_content: dictionary with the edited data. "_id" is read and "op_id", when
            present, is popped from it and used for the HA lock
        :param order_id: passed to lcm_tasks.remove() to unregister this task once it finishes
        :return: None
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "edit", op_id):
            return

        vca_id = vca_content["_id"]
        logging_text = "Task vca_edit={} ".format(vca_id)
        self.logger.debug(logging_text + "Enter")

        db_vca = None
        db_vca_update = {}

        operation_state = "FAILED"
        operation_details = ""
        try:
            self.logger.debug(logging_text + "Getting vca from db")
            db_vca = self._get_vca_by_id(vca_id)
            if self._is_vca_config_update(vca_content):
                await self._validate_vca(db_vca["_id"])
                self.logger.debug(
                    logging_text + "vca registered and validated successfully"
                )
                db_vca_update["_admin.operationalState"] = "ENABLED"
                db_vca_update["_admin.detailed-status"] = "Connectivity: ok"

            operation_details = "Edited"
            operation_state = "COMPLETED"

            self.logger.debug(
                logging_text + "Done. Result: {}".format(operation_state)
            )

        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(logging_text + error_msg)
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_state = "FAILED"
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    logging_text + "Cannot update database: {}".format(e)
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def delete(self, vca_content, order_id):
        """
        Delete a VCA record from the database.

        :param vca_content: dictionary with the VCA data. "_id" is read and "op_id", when present,
            is popped from it and used for the HA lock
        :param order_id: passed to lcm_tasks.remove() to unregister this task once it finishes
        :return: None
        """
        # HA tasks and backward compatibility:
        # If "vca_content" does not include "op_id", we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
        # Register "delete" task here for related future HA operations
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "delete", op_id):
            return

        db_vca_update = {}
        vca_id = vca_content["_id"]
        logging_text = "Task vca_delete={} ".format(vca_id)

        operation_state = "FAILED"
        operation_details = ""

        try:
            self.logger.debug(logging_text + "Deleting vca from db")
            self.db.del_one("vca", {"_id": vca_id})
            # the record is gone: nothing to update
            db_vca_update = None
            operation_details = "deleted"
            operation_state = "COMPLETED"

            self.logger.debug(
                logging_text + "Done. Result: {}".format(operation_state)
            )
        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(logging_text + error_msg)
            # A fresh dictionary: 'db_vca_update' may already be None at this point
            db_vca_update = {
                "_admin.operationalState": "ERROR",
                "_admin.detailed-status": error_msg,
            }
            operation_state = "FAILED"
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    logging_text + "Cannot update database: {}".format(e)
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)
class K8sRepoLcm(LcmBase):
    """
    Life-cycle operations (create, delete) over the K8s repositories stored at the "k8srepos"
    database collection.
    """

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: passed to LcmBase.__init__
        :param lcm_tasks: task registry used for the HA lock/unlock and to remove finished tasks
        :param config: two level dictionary with configuration. Only its 'VCA' entry is read here
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8srepo")
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        super().__init__(msg, self.logger)

        # NOTE(review): K8sClusterLcm builds this same connector class with the 'helm3path'
        # setting; confirm that 'helmpath' is the intended key here.
        # NOTE(review): confirm the fs/log/db/on_update_db arguments against the n2vc release in use.
        self.k8srepo = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

    def _log_exit_exception(self, logging_text, e):
        """Log a failed operation, adding the traceback only for unexpected exception types."""
        expected = (
            LcmException,
            DbException,
            K8sException,
            N2VCException,
            asyncio.CancelledError,
        )
        self.logger.error(
            logging_text + "Exit Exception {}".format(e),
            exc_info=not isinstance(e, expected),
        )

    async def create(self, k8srepo_content, order_id):
        """
        Mark a K8s repository as ENABLED once its database record has been read.

        :param k8srepo_content: dictionary with the repository data. "_id" is read and "op_id",
            when present, is popped from it and used for the HA lock
        :param order_id: passed to lcm_tasks.remove() to unregister this task once it finishes
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations

        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "create", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            db_k8srepo_update["_admin.operationalState"] = "ENABLED"
        except Exception as e:
            self._log_exit_exception(logging_text, e)
            exc = e
        finally:
            if exc:
                # Mark the K8srepo 'create' HA task as erroneous, whether or not the record
                # could be read: a failure must never be registered as completed
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8srepo:
                    db_k8srepo_update["_admin.operationalState"] = "ERROR"
                    db_k8srepo_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)

    async def delete(self, k8srepo_content, order_id):
        """
        Register the deletion of a K8s repository and remove its database record.

        :param k8srepo_content: dictionary with the repository data. "_id" is read and "op_id",
            when present, is popped from it and used for the HA lock
        :param order_id: passed to lcm_tasks.remove() to unregister this task once it finishes
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "delete", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}

        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})

        except Exception as e:
            self._log_exit_exception(logging_text, e)
            exc = e
        finally:
            if exc:
                # Mark the K8srepo 'delete' HA task as erroneous, whether or not the record
                # could be read: a failure must never be registered as completed
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8srepo:
                    db_k8srepo_update["_admin.operationalState"] = "ERROR"
                    db_k8srepo_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
                # The record is removed last, once the operation has been registered on it
                self.db.del_one("k8srepos", {"_id": k8srepo_id})
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)