1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
23 from osm_lcm
import ROclient
24 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
, deep_get
25 from n2vc
.k8s_helm_conn
import K8sHelmConnector
26 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
27 from n2vc
.k8s_juju_conn
import K8sJujuConnector
28 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
29 from n2vc
.exceptions
import K8sException
, N2VCException
30 from osm_common
.dbbase
import DbException
31 from copy
import deepcopy
34 __author__
= "Alfonso Tierno"
37 class VimLcm(LcmBase
):
38 # values that are encrypted at vim config because they are passwords
39 vim_config_encrypted
= {
40 "1.1": ("admin_password", "nsx_password", "vcenter_password"),
49 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
51 Init, Connect to database, filesystem storage, and messaging
52 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
56 self
.logger
= logging
.getLogger("lcm.vim")
58 self
.lcm_tasks
= lcm_tasks
59 self
.ro_config
= config
["RO"]
61 super().__init
__(msg
, self
.logger
)
63 async def create(self
, vim_content
, order_id
):
65 # HA tasks and backward compatibility:
66 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
67 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
68 # Register 'create' task here for related future HA operations
69 op_id
= vim_content
.pop("op_id", None)
70 if not self
.lcm_tasks
.lock_HA("vim", "create", op_id
):
73 vim_id
= vim_content
["_id"]
74 logging_text
= "Task vim_create={} ".format(vim_id
)
75 self
.logger
.debug(logging_text
+ "Enter")
82 step
= "Getting vim-id='{}' from db".format(vim_id
)
83 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
84 if vim_content
.get("config") and vim_content
["config"].get(
87 step
= "Getting sdn-controller-id='{}' from db".format(
88 vim_content
["config"]["sdn-controller"]
90 db_sdn
= self
.db
.get_one(
91 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
94 # If the VIM account has an associated SDN account, also
95 # wait for any previous tasks in process for the SDN
96 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
100 and db_sdn
["_admin"].get("deployed")
101 and db_sdn
["_admin"]["deployed"].get("RO")
103 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
106 "sdn-controller={} is not available. Not deployed at RO".format(
107 vim_content
["config"]["sdn-controller"]
111 step
= "Creating vim at RO"
112 db_vim_update
["_admin.deployed.RO"] = None
113 db_vim_update
["_admin.detailed-status"] = step
114 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
115 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
116 vim_RO
= deepcopy(vim_content
)
117 vim_RO
.pop("_id", None)
118 vim_RO
.pop("_admin", None)
119 schema_version
= vim_RO
.pop("schema_version", None)
120 vim_RO
.pop("schema_type", None)
121 vim_RO
.pop("vim_tenant_name", None)
122 vim_RO
["type"] = vim_RO
.pop("vim_type")
123 vim_RO
.pop("vim_user", None)
124 vim_RO
.pop("vim_password", None)
126 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
127 desc
= await RO
.create("vim", descriptor
=vim_RO
)
128 RO_vim_id
= desc
["uuid"]
129 db_vim_update
["_admin.deployed.RO"] = RO_vim_id
131 logging_text
+ "VIM created at RO_vim_id={}".format(RO_vim_id
)
134 step
= "Creating vim_account at RO"
135 db_vim_update
["_admin.detailed-status"] = step
136 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
138 if vim_content
.get("vim_password"):
139 vim_content
["vim_password"] = self
.db
.decrypt(
140 vim_content
["vim_password"],
141 schema_version
=schema_version
,
145 "vim_tenant_name": vim_content
["vim_tenant_name"],
146 "vim_username": vim_content
["vim_user"],
147 "vim_password": vim_content
["vim_password"],
149 if vim_RO
.get("config"):
150 vim_account_RO
["config"] = vim_RO
["config"]
151 if "sdn-controller" in vim_account_RO
["config"]:
152 del vim_account_RO
["config"]["sdn-controller"]
153 if "sdn-port-mapping" in vim_account_RO
["config"]:
154 del vim_account_RO
["config"]["sdn-port-mapping"]
155 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
157 ) or self
.vim_config_encrypted
.get("default")
158 for p
in vim_config_encrypted_keys
:
159 if vim_account_RO
["config"].get(p
):
160 vim_account_RO
["config"][p
] = self
.db
.decrypt(
161 vim_account_RO
["config"][p
],
162 schema_version
=schema_version
,
166 desc
= await RO
.attach("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
167 db_vim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
168 db_vim_update
["_admin.operationalState"] = "ENABLED"
169 db_vim_update
["_admin.detailed-status"] = "Done"
170 # Mark the VIM 'create' HA task as successful
171 operation_state
= "COMPLETED"
172 operation_details
= "Done"
176 + "Exit Ok VIM account created at RO_vim_account_id={}".format(
182 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
183 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
185 except Exception as e
:
186 self
.logger
.critical(
187 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
192 db_vim_update
["_admin.operationalState"] = "ERROR"
193 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
196 # Mark the VIM 'create' HA task as erroneous
197 operation_state
= "FAILED"
198 operation_details
= "ERROR {}: {}".format(step
, exc
)
201 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
202 # Register the VIM 'create' HA task either
203 # succesful or erroneous, or do nothing (if legacy NBI)
204 self
.lcm_tasks
.unlock_HA(
208 operationState
=operation_state
,
209 detailed_status
=operation_details
,
211 except DbException
as e
:
212 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
214 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
216 async def edit(self
, vim_content
, order_id
):
218 # HA tasks and backward compatibility:
219 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
220 # In such a case, HA is not supported by NBI, and the HA check always returns True
221 op_id
= vim_content
.pop("op_id", None)
222 if not self
.lcm_tasks
.lock_HA("vim", "edit", op_id
):
225 vim_id
= vim_content
["_id"]
226 logging_text
= "Task vim_edit={} ".format(vim_id
)
227 self
.logger
.debug(logging_text
+ "Enter")
234 step
= "Getting vim-id='{}' from db".format(vim_id
)
236 # wait for any previous tasks in process
237 await self
.lcm_tasks
.waitfor_related_HA("vim", "edit", op_id
)
239 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
243 and db_vim
["_admin"].get("deployed")
244 and db_vim
["_admin"]["deployed"].get("RO")
246 if vim_content
.get("config") and vim_content
["config"].get(
249 step
= "Getting sdn-controller-id='{}' from db".format(
250 vim_content
["config"]["sdn-controller"]
252 db_sdn
= self
.db
.get_one(
253 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
256 # If the VIM account has an associated SDN account, also
257 # wait for any previous tasks in process for the SDN
258 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
262 and db_sdn
["_admin"].get("deployed")
263 and db_sdn
["_admin"]["deployed"].get("RO")
265 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
268 "sdn-controller={} is not available. Not deployed at RO".format(
269 vim_content
["config"]["sdn-controller"]
273 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
274 step
= "Editing vim at RO"
275 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
276 vim_RO
= deepcopy(vim_content
)
277 vim_RO
.pop("_id", None)
278 vim_RO
.pop("_admin", None)
279 schema_version
= vim_RO
.pop("schema_version", None)
280 vim_RO
.pop("schema_type", None)
281 vim_RO
.pop("vim_tenant_name", None)
282 if "vim_type" in vim_RO
:
283 vim_RO
["type"] = vim_RO
.pop("vim_type")
284 vim_RO
.pop("vim_user", None)
285 vim_RO
.pop("vim_password", None)
287 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
288 # TODO make a deep update of sdn-port-mapping
290 await RO
.edit("vim", RO_vim_id
, descriptor
=vim_RO
)
292 step
= "Editing vim-account at RO tenant"
294 if "config" in vim_content
:
295 if "sdn-controller" in vim_content
["config"]:
296 del vim_content
["config"]["sdn-controller"]
297 if "sdn-port-mapping" in vim_content
["config"]:
298 del vim_content
["config"]["sdn-port-mapping"]
299 if not vim_content
["config"]:
300 del vim_content
["config"]
301 if "vim_tenant_name" in vim_content
:
302 vim_account_RO
["vim_tenant_name"] = vim_content
["vim_tenant_name"]
303 if "vim_password" in vim_content
:
304 vim_account_RO
["vim_password"] = vim_content
["vim_password"]
305 if vim_content
.get("vim_password"):
306 vim_account_RO
["vim_password"] = self
.db
.decrypt(
307 vim_content
["vim_password"],
308 schema_version
=schema_version
,
311 if "config" in vim_content
:
312 vim_account_RO
["config"] = vim_content
["config"]
313 if vim_content
.get("config"):
314 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
316 ) or self
.vim_config_encrypted
.get("default")
317 for p
in vim_config_encrypted_keys
:
318 if vim_content
["config"].get(p
):
319 vim_account_RO
["config"][p
] = self
.db
.decrypt(
320 vim_content
["config"][p
],
321 schema_version
=schema_version
,
325 if "vim_user" in vim_content
:
326 vim_content
["vim_username"] = vim_content
["vim_user"]
327 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
328 # vim_thread. RO will remove and relaunch a new thread for this vim_account
329 await RO
.edit("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
330 db_vim_update
["_admin.operationalState"] = "ENABLED"
331 # Mark the VIM 'edit' HA task as successful
332 operation_state
= "COMPLETED"
333 operation_details
= "Done"
335 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id={}".format(RO_vim_id
))
338 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
339 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
341 except Exception as e
:
342 self
.logger
.critical(
343 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
348 db_vim_update
["_admin.operationalState"] = "ERROR"
349 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
352 # Mark the VIM 'edit' HA task as erroneous
353 operation_state
= "FAILED"
354 operation_details
= "ERROR {}: {}".format(step
, exc
)
357 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
358 # Register the VIM 'edit' HA task either
359 # succesful or erroneous, or do nothing (if legacy NBI)
360 self
.lcm_tasks
.unlock_HA(
364 operationState
=operation_state
,
365 detailed_status
=operation_details
,
367 except DbException
as e
:
368 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
370 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
372 async def delete(self
, vim_content
, order_id
):
374 # HA tasks and backward compatibility:
375 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
376 # In such a case, HA is not supported by NBI, and the HA check always returns True
377 op_id
= vim_content
.pop("op_id", None)
378 if not self
.lcm_tasks
.lock_HA("vim", "delete", op_id
):
381 vim_id
= vim_content
["_id"]
382 logging_text
= "Task vim_delete={} ".format(vim_id
)
383 self
.logger
.debug(logging_text
+ "Enter")
388 step
= "Getting vim from db"
390 # wait for any previous tasks in process
391 await self
.lcm_tasks
.waitfor_related_HA("vim", "delete", op_id
)
392 if not self
.ro_config
.get("ng"):
393 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
396 and db_vim
["_admin"].get("deployed")
397 and db_vim
["_admin"]["deployed"].get("RO")
399 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
400 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
401 step
= "Detaching vim from RO tenant"
403 await RO
.detach("vim_account", RO_vim_id
)
404 except ROclient
.ROClientException
as e
:
405 if e
.http_code
== 404: # not found
408 + "RO_vim_id={} already detached".format(RO_vim_id
)
413 step
= "Deleting vim from RO"
415 await RO
.delete("vim", RO_vim_id
)
416 except ROclient
.ROClientException
as e
:
417 if e
.http_code
== 404: # not found
420 + "RO_vim_id={} already deleted".format(RO_vim_id
)
426 self
.logger
.debug(logging_text
+ "Nothing to remove at RO")
427 self
.db
.del_one("vim_accounts", {"_id": vim_id
})
429 self
.logger
.debug(logging_text
+ "Exit Ok")
432 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
433 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
435 except Exception as e
:
436 self
.logger
.critical(
437 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
441 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
443 db_vim_update
["_admin.operationalState"] = "ERROR"
444 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
447 # Mark the VIM 'delete' HA task as erroneous
448 operation_state
= "FAILED"
449 operation_details
= "ERROR {}: {}".format(step
, exc
)
450 self
.lcm_tasks
.unlock_HA(
454 operationState
=operation_state
,
455 detailed_status
=operation_details
,
458 if db_vim
and db_vim_update
:
459 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
460 # If the VIM 'delete' HA task was succesful, the DB entry has been deleted,
461 # which means that there is nowhere to register this task, so do nothing here.
462 except DbException
as e
:
463 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
464 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
467 class WimLcm(LcmBase
):
468 # values that are encrypted at wim config because they are passwords
469 wim_config_encrypted
= ()
471 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
473 Init, Connect to database, filesystem storage, and messaging
474 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
478 self
.logger
= logging
.getLogger("lcm.vim")
480 self
.lcm_tasks
= lcm_tasks
481 self
.ro_config
= config
["RO"]
483 super().__init
__(msg
, self
.logger
)
485 async def create(self
, wim_content
, order_id
):
487 # HA tasks and backward compatibility:
488 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
489 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
490 # Register 'create' task here for related future HA operations
491 op_id
= wim_content
.pop("op_id", None)
492 self
.lcm_tasks
.lock_HA("wim", "create", op_id
)
494 wim_id
= wim_content
["_id"]
495 logging_text
= "Task wim_create={} ".format(wim_id
)
496 self
.logger
.debug(logging_text
+ "Enter")
502 step
= "Getting wim-id='{}' from db".format(wim_id
)
503 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
504 db_wim_update
["_admin.deployed.RO"] = None
506 step
= "Creating wim at RO"
507 db_wim_update
["_admin.detailed-status"] = step
508 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
509 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
510 wim_RO
= deepcopy(wim_content
)
511 wim_RO
.pop("_id", None)
512 wim_RO
.pop("_admin", None)
513 schema_version
= wim_RO
.pop("schema_version", None)
514 wim_RO
.pop("schema_type", None)
515 wim_RO
.pop("wim_tenant_name", None)
516 wim_RO
["type"] = wim_RO
.pop("wim_type")
517 wim_RO
.pop("wim_user", None)
518 wim_RO
.pop("wim_password", None)
519 desc
= await RO
.create("wim", descriptor
=wim_RO
)
520 RO_wim_id
= desc
["uuid"]
521 db_wim_update
["_admin.deployed.RO"] = RO_wim_id
523 logging_text
+ "WIM created at RO_wim_id={}".format(RO_wim_id
)
526 step
= "Creating wim_account at RO"
527 db_wim_update
["_admin.detailed-status"] = step
528 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
530 if wim_content
.get("wim_password"):
531 wim_content
["wim_password"] = self
.db
.decrypt(
532 wim_content
["wim_password"],
533 schema_version
=schema_version
,
537 "name": wim_content
["name"],
538 "user": wim_content
["user"],
539 "password": wim_content
["password"],
541 if wim_RO
.get("config"):
542 wim_account_RO
["config"] = wim_RO
["config"]
543 if "wim_port_mapping" in wim_account_RO
["config"]:
544 del wim_account_RO
["config"]["wim_port_mapping"]
545 for p
in self
.wim_config_encrypted
:
546 if wim_account_RO
["config"].get(p
):
547 wim_account_RO
["config"][p
] = self
.db
.decrypt(
548 wim_account_RO
["config"][p
],
549 schema_version
=schema_version
,
553 desc
= await RO
.attach("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
554 db_wim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
555 db_wim_update
["_admin.operationalState"] = "ENABLED"
556 db_wim_update
["_admin.detailed-status"] = "Done"
557 # Mark the WIM 'create' HA task as successful
558 operation_state
= "COMPLETED"
559 operation_details
= "Done"
563 + "Exit Ok WIM account created at RO_wim_account_id={}".format(
569 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
570 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
572 except Exception as e
:
573 self
.logger
.critical(
574 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
579 db_wim_update
["_admin.operationalState"] = "ERROR"
580 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
583 # Mark the WIM 'create' HA task as erroneous
584 operation_state
= "FAILED"
585 operation_details
= "ERROR {}: {}".format(step
, exc
)
588 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
589 # Register the WIM 'create' HA task either
590 # succesful or erroneous, or do nothing (if legacy NBI)
591 self
.lcm_tasks
.unlock_HA(
595 operationState
=operation_state
,
596 detailed_status
=operation_details
,
598 except DbException
as e
:
599 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
600 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
602 async def edit(self
, wim_content
, order_id
):
604 # HA tasks and backward compatibility:
605 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
606 # In such a case, HA is not supported by NBI, and the HA check always returns True
607 op_id
= wim_content
.pop("op_id", None)
608 if not self
.lcm_tasks
.lock_HA("wim", "edit", op_id
):
611 wim_id
= wim_content
["_id"]
612 logging_text
= "Task wim_edit={} ".format(wim_id
)
613 self
.logger
.debug(logging_text
+ "Enter")
619 step
= "Getting wim-id='{}' from db".format(wim_id
)
621 # wait for any previous tasks in process
622 await self
.lcm_tasks
.waitfor_related_HA("wim", "edit", op_id
)
624 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
628 and db_wim
["_admin"].get("deployed")
629 and db_wim
["_admin"]["deployed"].get("RO")
632 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
633 step
= "Editing wim at RO"
634 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
635 wim_RO
= deepcopy(wim_content
)
636 wim_RO
.pop("_id", None)
637 wim_RO
.pop("_admin", None)
638 schema_version
= wim_RO
.pop("schema_version", None)
639 wim_RO
.pop("schema_type", None)
640 wim_RO
.pop("wim_tenant_name", None)
641 if "wim_type" in wim_RO
:
642 wim_RO
["type"] = wim_RO
.pop("wim_type")
643 wim_RO
.pop("wim_user", None)
644 wim_RO
.pop("wim_password", None)
645 # TODO make a deep update of wim_port_mapping
647 await RO
.edit("wim", RO_wim_id
, descriptor
=wim_RO
)
649 step
= "Editing wim-account at RO tenant"
651 if "config" in wim_content
:
652 if "wim_port_mapping" in wim_content
["config"]:
653 del wim_content
["config"]["wim_port_mapping"]
654 if not wim_content
["config"]:
655 del wim_content
["config"]
656 if "wim_tenant_name" in wim_content
:
657 wim_account_RO
["wim_tenant_name"] = wim_content
["wim_tenant_name"]
658 if "wim_password" in wim_content
:
659 wim_account_RO
["wim_password"] = wim_content
["wim_password"]
660 if wim_content
.get("wim_password"):
661 wim_account_RO
["wim_password"] = self
.db
.decrypt(
662 wim_content
["wim_password"],
663 schema_version
=schema_version
,
666 if "config" in wim_content
:
667 wim_account_RO
["config"] = wim_content
["config"]
668 if wim_content
.get("config"):
669 for p
in self
.wim_config_encrypted
:
670 if wim_content
["config"].get(p
):
671 wim_account_RO
["config"][p
] = self
.db
.decrypt(
672 wim_content
["config"][p
],
673 schema_version
=schema_version
,
677 if "wim_user" in wim_content
:
678 wim_content
["wim_username"] = wim_content
["wim_user"]
679 # wim_account must be edited always even if empty in order to ensure changes are translated to RO
680 # wim_thread. RO will remove and relaunch a new thread for this wim_account
681 await RO
.edit("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
682 db_wim_update
["_admin.operationalState"] = "ENABLED"
683 # Mark the WIM 'edit' HA task as successful
684 operation_state
= "COMPLETED"
685 operation_details
= "Done"
687 self
.logger
.debug(logging_text
+ "Exit Ok RO_wim_id={}".format(RO_wim_id
))
690 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
691 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
693 except Exception as e
:
694 self
.logger
.critical(
695 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
700 db_wim_update
["_admin.operationalState"] = "ERROR"
701 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
704 # Mark the WIM 'edit' HA task as erroneous
705 operation_state
= "FAILED"
706 operation_details
= "ERROR {}: {}".format(step
, exc
)
709 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
710 # Register the WIM 'edit' HA task either
711 # succesful or erroneous, or do nothing (if legacy NBI)
712 self
.lcm_tasks
.unlock_HA(
716 operationState
=operation_state
,
717 detailed_status
=operation_details
,
719 except DbException
as e
:
720 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
721 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
723 async def delete(self
, wim_content
, order_id
):
725 # HA tasks and backward compatibility:
726 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
727 # In such a case, HA is not supported by NBI, and the HA check always returns True
728 op_id
= wim_content
.pop("op_id", None)
729 if not self
.lcm_tasks
.lock_HA("wim", "delete", op_id
):
732 wim_id
= wim_content
["_id"]
733 logging_text
= "Task wim_delete={} ".format(wim_id
)
734 self
.logger
.debug(logging_text
+ "Enter")
739 step
= "Getting wim from db"
741 # wait for any previous tasks in process
742 await self
.lcm_tasks
.waitfor_related_HA("wim", "delete", op_id
)
744 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
747 and db_wim
["_admin"].get("deployed")
748 and db_wim
["_admin"]["deployed"].get("RO")
750 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
751 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
752 step
= "Detaching wim from RO tenant"
754 await RO
.detach("wim_account", RO_wim_id
)
755 except ROclient
.ROClientException
as e
:
756 if e
.http_code
== 404: # not found
759 + "RO_wim_id={} already detached".format(RO_wim_id
)
764 step
= "Deleting wim from RO"
766 await RO
.delete("wim", RO_wim_id
)
767 except ROclient
.ROClientException
as e
:
768 if e
.http_code
== 404: # not found
771 + "RO_wim_id={} already deleted".format(RO_wim_id
)
777 self
.logger
.error(logging_text
+ "Nothing to remove at RO")
778 self
.db
.del_one("wim_accounts", {"_id": wim_id
})
780 self
.logger
.debug(logging_text
+ "Exit Ok")
783 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
784 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
786 except Exception as e
:
787 self
.logger
.critical(
788 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
792 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
794 db_wim_update
["_admin.operationalState"] = "ERROR"
795 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
798 # Mark the WIM 'delete' HA task as erroneous
799 operation_state
= "FAILED"
800 operation_details
= "ERROR {}: {}".format(step
, exc
)
801 self
.lcm_tasks
.unlock_HA(
805 operationState
=operation_state
,
806 detailed_status
=operation_details
,
809 if db_wim
and db_wim_update
:
810 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
811 # If the WIM 'delete' HA task was succesful, the DB entry has been deleted,
812 # which means that there is nowhere to register this task, so do nothing here.
813 except DbException
as e
:
814 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
815 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
818 class SdnLcm(LcmBase
):
819 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
821 Init, Connect to database, filesystem storage, and messaging
822 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
826 self
.logger
= logging
.getLogger("lcm.sdn")
828 self
.lcm_tasks
= lcm_tasks
829 self
.ro_config
= config
["RO"]
831 super().__init
__(msg
, self
.logger
)
833 async def create(self
, sdn_content
, order_id
):
835 # HA tasks and backward compatibility:
836 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
837 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
838 # Register 'create' task here for related future HA operations
839 op_id
= sdn_content
.pop("op_id", None)
840 self
.lcm_tasks
.lock_HA("sdn", "create", op_id
)
842 sdn_id
= sdn_content
["_id"]
843 logging_text
= "Task sdn_create={} ".format(sdn_id
)
844 self
.logger
.debug(logging_text
+ "Enter")
851 step
= "Getting sdn from db"
852 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
853 db_sdn_update
["_admin.deployed.RO"] = None
855 step
= "Creating sdn at RO"
856 db_sdn_update
["_admin.detailed-status"] = step
857 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
859 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
860 sdn_RO
= deepcopy(sdn_content
)
861 sdn_RO
.pop("_id", None)
862 sdn_RO
.pop("_admin", None)
863 schema_version
= sdn_RO
.pop("schema_version", None)
864 sdn_RO
.pop("schema_type", None)
865 sdn_RO
.pop("description", None)
866 if sdn_RO
.get("password"):
867 sdn_RO
["password"] = self
.db
.decrypt(
868 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
871 desc
= await RO
.create("sdn", descriptor
=sdn_RO
)
872 RO_sdn_id
= desc
["uuid"]
873 db_sdn_update
["_admin.deployed.RO"] = RO_sdn_id
874 db_sdn_update
["_admin.operationalState"] = "ENABLED"
875 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
876 # Mark the SDN 'create' HA task as successful
877 operation_state
= "COMPLETED"
878 operation_details
= "Done"
881 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
882 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
884 except Exception as e
:
885 self
.logger
.critical(
886 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
891 db_sdn_update
["_admin.operationalState"] = "ERROR"
892 db_sdn_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
895 # Mark the SDN 'create' HA task as erroneous
896 operation_state
= "FAILED"
897 operation_details
= "ERROR {}: {}".format(step
, exc
)
899 if db_sdn
and db_sdn_update
:
900 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
901 # Register the SDN 'create' HA task either
902 # succesful or erroneous, or do nothing (if legacy NBI)
903 self
.lcm_tasks
.unlock_HA(
907 operationState
=operation_state
,
908 detailed_status
=operation_details
,
910 except DbException
as e
:
911 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
912 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
914 async def edit(self
, sdn_content
, order_id
):
916 # HA tasks and backward compatibility:
917 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
918 # In such a case, HA is not supported by NBI, and the HA check always returns True
919 op_id
= sdn_content
.pop("op_id", None)
920 if not self
.lcm_tasks
.lock_HA("sdn", "edit", op_id
):
923 sdn_id
= sdn_content
["_id"]
924 logging_text
= "Task sdn_edit={} ".format(sdn_id
)
925 self
.logger
.debug(logging_text
+ "Enter")
930 step
= "Getting sdn from db"
932 # wait for any previous tasks in process
933 await self
.lcm_tasks
.waitfor_related_HA("sdn", "edit", op_id
)
935 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
939 and db_sdn
["_admin"].get("deployed")
940 and db_sdn
["_admin"]["deployed"].get("RO")
942 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
943 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
944 step
= "Editing sdn at RO"
945 sdn_RO
= deepcopy(sdn_content
)
946 sdn_RO
.pop("_id", None)
947 sdn_RO
.pop("_admin", None)
948 schema_version
= sdn_RO
.pop("schema_version", None)
949 sdn_RO
.pop("schema_type", None)
950 sdn_RO
.pop("description", None)
951 if sdn_RO
.get("password"):
952 sdn_RO
["password"] = self
.db
.decrypt(
953 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
956 await RO
.edit("sdn", RO_sdn_id
, descriptor
=sdn_RO
)
957 db_sdn_update
["_admin.operationalState"] = "ENABLED"
958 # Mark the SDN 'edit' HA task as successful
959 operation_state
= "COMPLETED"
960 operation_details
= "Done"
962 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
965 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
966 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
968 except Exception as e
:
969 self
.logger
.critical(
970 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
975 db_sdn
["_admin.operationalState"] = "ERROR"
976 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
977 # Mark the SDN 'edit' HA task as erroneous
978 operation_state
= "FAILED"
979 operation_details
= "ERROR {}: {}".format(step
, exc
)
982 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
983 # Register the SDN 'edit' HA task either
984 # succesful or erroneous, or do nothing (if legacy NBI)
985 self
.lcm_tasks
.unlock_HA(
989 operationState
=operation_state
,
990 detailed_status
=operation_details
,
992 except DbException
as e
:
993 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
994 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
996 async def delete(self
, sdn_content
, order_id
):
998 # HA tasks and backward compatibility:
999 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1000 # In such a case, HA is not supported by NBI, and the HA check always returns True
1001 op_id
= sdn_content
.pop("op_id", None)
1002 if not self
.lcm_tasks
.lock_HA("sdn", "delete", op_id
):
1005 sdn_id
= sdn_content
["_id"]
1006 logging_text
= "Task sdn_delete={} ".format(sdn_id
)
1007 self
.logger
.debug(logging_text
+ "Enter")
1012 step
= "Getting sdn from db"
1014 # wait for any previous tasks in process
1015 await self
.lcm_tasks
.waitfor_related_HA("sdn", "delete", op_id
)
1017 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
1019 db_sdn
.get("_admin")
1020 and db_sdn
["_admin"].get("deployed")
1021 and db_sdn
["_admin"]["deployed"].get("RO")
1023 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
1024 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
1025 step
= "Deleting sdn from RO"
1027 await RO
.delete("sdn", RO_sdn_id
)
1028 except ROclient
.ROClientException
as e
:
1029 if e
.http_code
== 404: # not found
1032 + "RO_sdn_id={} already deleted".format(RO_sdn_id
)
1039 logging_text
+ "Skipping. There is not RO information at database"
1041 self
.db
.del_one("sdns", {"_id": sdn_id
})
1043 self
.logger
.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id
))
1046 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
1047 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1049 except Exception as e
:
1050 self
.logger
.critical(
1051 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1056 db_sdn
["_admin.operationalState"] = "ERROR"
1057 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
1058 # Mark the SDN 'delete' HA task as erroneous
1059 operation_state
= "FAILED"
1060 operation_details
= "ERROR {}: {}".format(step
, exc
)
1061 self
.lcm_tasks
.unlock_HA(
1065 operationState
=operation_state
,
1066 detailed_status
=operation_details
,
1069 if db_sdn
and db_sdn_update
:
1070 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
1071 # If the SDN 'delete' HA task was succesful, the DB entry has been deleted,
1072 # which means that there is nowhere to register this task, so do nothing here.
1073 except DbException
as e
:
1074 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1075 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
1078 class K8sClusterLcm(LcmBase
):
1079 timeout_create
= 300
1081 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1083 Init, Connect to database, filesystem storage, and messaging
1084 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1088 self
.logger
= logging
.getLogger("lcm.k8scluster")
1090 self
.lcm_tasks
= lcm_tasks
1091 self
.vca_config
= config
["VCA"]
1093 super().__init
__(msg
, self
.logger
)
1095 self
.helm2_k8scluster
= K8sHelmConnector(
1096 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1097 helm_command
=self
.vca_config
.get("helmpath"),
1104 self
.helm3_k8scluster
= K8sHelm3Connector(
1105 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1106 helm_command
=self
.vca_config
.get("helm3path"),
1113 self
.juju_k8scluster
= K8sJujuConnector(
1114 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1115 juju_command
=self
.vca_config
.get("jujupath"),
1124 "helm-chart": self
.helm2_k8scluster
,
1125 "helm-chart-v3": self
.helm3_k8scluster
,
1126 "juju-bundle": self
.juju_k8scluster
,
1129 async def create(self
, k8scluster_content
, order_id
):
1131 op_id
= k8scluster_content
.pop("op_id", None)
1132 if not self
.lcm_tasks
.lock_HA("k8scluster", "create", op_id
):
1135 k8scluster_id
= k8scluster_content
["_id"]
1136 logging_text
= "Task k8scluster_create={} ".format(k8scluster_id
)
1137 self
.logger
.debug(logging_text
+ "Enter")
1139 db_k8scluster
= None
1140 db_k8scluster_update
= {}
1143 step
= "Getting k8scluster-id='{}' from db".format(k8scluster_id
)
1144 self
.logger
.debug(logging_text
+ step
)
1145 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": k8scluster_id
})
1146 self
.db
.encrypt_decrypt_fields(
1147 db_k8scluster
.get("credentials"),
1149 ["password", "secret"],
1150 schema_version
=db_k8scluster
["schema_version"],
1151 salt
=db_k8scluster
["_id"],
1153 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
1156 init_target
= deep_get(db_k8scluster
, ("_admin", "init"))
1157 step
= "Launching k8scluster init tasks"
1159 k8s_deploy_methods
= db_k8scluster
.get("deployment_methods", {})
1160 # for backwards compatibility and all-false case
1161 if not any(k8s_deploy_methods
.values()):
1162 k8s_deploy_methods
= {
1164 "juju-bundle": True,
1165 "helm-chart-v3": True,
1167 deploy_methods
= tuple(filter(k8s_deploy_methods
.get
, k8s_deploy_methods
))
1169 for task_name
in deploy_methods
:
1170 if init_target
and task_name
not in init_target
:
1172 task
= asyncio
.ensure_future(
1173 self
.k8s_map
[task_name
].init_env(
1175 reuse_cluster_uuid
=k8scluster_id
,
1176 vca_id
=db_k8scluster
.get("vca_id"),
1179 pending_tasks
.append(task
)
1180 task2name
[task
] = task_name
1182 error_text_list
= []
1184 reached_timeout
= False
1187 while pending_tasks
:
1189 1, self
.timeout_create
- (time() - now
)
1190 ) # ensure not negative with max
1191 step
= "Waiting for k8scluster init tasks"
1192 done
, pending_tasks
= await asyncio
.wait(
1193 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
1196 # timeout. Set timeout is reached and process pending as if they hase been finished
1197 done
= pending_tasks
1198 pending_tasks
= None
1199 reached_timeout
= True
1201 task_name
= task2name
[task
]
1204 elif task
.cancelled():
1207 exc
= task
.exception()
1210 error_text_list
.append(
1211 "Failing init {}: {}".format(task_name
, exc
)
1213 db_k8scluster_update
[
1214 "_admin.{}.error_msg".format(task_name
)
1216 db_k8scluster_update
["_admin.{}.id".format(task_name
)] = None
1217 db_k8scluster_update
[
1218 "_admin.{}.operationalState".format(task_name
)
1221 logging_text
+ "{} init fail: {}".format(task_name
, exc
),
1222 exc_info
=not isinstance(exc
, (N2VCException
, str)),
1225 k8s_id
, uninstall_sw
= task
.result()
1226 tasks_name_ok
.append(task_name
)
1229 + "{} init success. id={} created={}".format(
1230 task_name
, k8s_id
, uninstall_sw
1233 db_k8scluster_update
[
1234 "_admin.{}.error_msg".format(task_name
)
1236 db_k8scluster_update
["_admin.{}.id".format(task_name
)] = k8s_id
1237 db_k8scluster_update
[
1238 "_admin.{}.created".format(task_name
)
1240 db_k8scluster_update
[
1241 "_admin.{}.operationalState".format(task_name
)
1244 step
= "Updating database for " + task_name
1245 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1247 operation_details
= "ready for " + ", ".join(tasks_name_ok
)
1248 operation_state
= "COMPLETED"
1249 db_k8scluster_update
["_admin.operationalState"] = (
1250 "ENABLED" if not error_text_list
else "DEGRADED"
1252 operation_details
+= "; " + ";".join(error_text_list
)
1254 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1255 operation_state
= "FAILED"
1256 operation_details
= ";".join(error_text_list
)
1257 db_k8scluster_update
["_admin.detailed-status"] = operation_details
1258 self
.logger
.debug(logging_text
+ "Done. Result: " + operation_state
)
1261 except Exception as e
:
1269 asyncio
.CancelledError
,
1272 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1274 self
.logger
.critical(
1275 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1279 if exc
and db_k8scluster
:
1280 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1281 db_k8scluster_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1284 operation_state
= "FAILED"
1285 operation_details
= "ERROR {}: {}".format(step
, exc
)
1287 if db_k8scluster
and db_k8scluster_update
:
1288 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1290 # Register the operation and unlock
1291 self
.lcm_tasks
.unlock_HA(
1295 operationState
=operation_state
,
1296 detailed_status
=operation_details
,
1298 except DbException
as e
:
1299 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1300 self
.lcm_tasks
.remove("k8scluster", k8scluster_id
, order_id
)
1302 async def delete(self
, k8scluster_content
, order_id
):
1304 # HA tasks and backward compatibility:
1305 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1306 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1307 # Register 'delete' task here for related future HA operations
1308 op_id
= k8scluster_content
.pop("op_id", None)
1309 if not self
.lcm_tasks
.lock_HA("k8scluster", "delete", op_id
):
1312 k8scluster_id
= k8scluster_content
["_id"]
1313 logging_text
= "Task k8scluster_delete={} ".format(k8scluster_id
)
1314 self
.logger
.debug(logging_text
+ "Enter")
1316 db_k8scluster
= None
1317 db_k8scluster_update
= {}
1320 step
= "Getting k8scluster='{}' from db".format(k8scluster_id
)
1321 self
.logger
.debug(logging_text
+ step
)
1322 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": k8scluster_id
})
1323 k8s_hc_id
= deep_get(db_k8scluster
, ("_admin", "helm-chart", "id"))
1324 k8s_h3c_id
= deep_get(db_k8scluster
, ("_admin", "helm-chart-v3", "id"))
1325 k8s_jb_id
= deep_get(db_k8scluster
, ("_admin", "juju-bundle", "id"))
1327 cluster_removed
= True
1328 if k8s_jb_id
: # delete in reverse order of creation
1329 step
= "Removing juju-bundle '{}'".format(k8s_jb_id
)
1331 deep_get(db_k8scluster
, ("_admin", "juju-bundle", "created"))
1334 cluster_removed
= await self
.juju_k8scluster
.reset(
1335 cluster_uuid
=k8s_jb_id
,
1336 uninstall_sw
=uninstall_sw
,
1337 vca_id
=db_k8scluster
.get("vca_id"),
1339 db_k8scluster_update
["_admin.juju-bundle.id"] = None
1340 db_k8scluster_update
["_admin.juju-bundle.operationalState"] = "DISABLED"
1343 step
= "Removing helm-chart '{}'".format(k8s_hc_id
)
1345 deep_get(db_k8scluster
, ("_admin", "helm-chart", "created"))
1348 cluster_removed
= await self
.helm2_k8scluster
.reset(
1349 cluster_uuid
=k8s_hc_id
, uninstall_sw
=uninstall_sw
1351 db_k8scluster_update
["_admin.helm-chart.id"] = None
1352 db_k8scluster_update
["_admin.helm-chart.operationalState"] = "DISABLED"
1355 step
= "Removing helm-chart-v3 '{}'".format(k8s_hc_id
)
1357 deep_get(db_k8scluster
, ("_admin", "helm-chart-v3", "created"))
1360 cluster_removed
= await self
.helm3_k8scluster
.reset(
1361 cluster_uuid
=k8s_h3c_id
, uninstall_sw
=uninstall_sw
1363 db_k8scluster_update
["_admin.helm-chart-v3.id"] = None
1364 db_k8scluster_update
[
1365 "_admin.helm-chart-v3.operationalState"
1368 # Try to remove from cluster_inserted to clean old versions
1369 if k8s_hc_id
and cluster_removed
:
1370 step
= "Removing k8scluster='{}' from k8srepos".format(k8scluster_id
)
1371 self
.logger
.debug(logging_text
+ step
)
1372 db_k8srepo_list
= self
.db
.get_list(
1373 "k8srepos", {"_admin.cluster-inserted": k8s_hc_id
}
1375 for k8srepo
in db_k8srepo_list
:
1377 cluster_list
= k8srepo
["_admin"]["cluster-inserted"]
1378 cluster_list
.remove(k8s_hc_id
)
1382 {"_admin.cluster-inserted": cluster_list
},
1384 except Exception as e
:
1385 self
.logger
.error("{}: {}".format(step
, e
))
1386 self
.db
.del_one("k8sclusters", {"_id": k8scluster_id
})
1387 db_k8scluster_update
= None
1388 self
.logger
.debug(logging_text
+ "Done")
1390 except Exception as e
:
1398 asyncio
.CancelledError
,
1401 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1403 self
.logger
.critical(
1404 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1408 if exc
and db_k8scluster
:
1409 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1410 db_k8scluster_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1413 # Mark the WIM 'create' HA task as erroneous
1414 operation_state
= "FAILED"
1415 operation_details
= "ERROR {}: {}".format(step
, exc
)
1417 operation_state
= "COMPLETED"
1418 operation_details
= "deleted"
1421 if db_k8scluster_update
:
1422 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1423 # Register the K8scluster 'delete' HA task either
1424 # succesful or erroneous, or do nothing (if legacy NBI)
1425 self
.lcm_tasks
.unlock_HA(
1429 operationState
=operation_state
,
1430 detailed_status
=operation_details
,
1432 except DbException
as e
:
1433 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1434 self
.lcm_tasks
.remove("k8scluster", k8scluster_id
, order_id
)
1437 class VcaLcm(LcmBase
):
1440 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1442 Init, Connect to database, filesystem storage, and messaging
1443 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1447 self
.logger
= logging
.getLogger("lcm.vca")
1449 self
.lcm_tasks
= lcm_tasks
1451 super().__init
__(msg
, self
.logger
)
1453 # create N2VC connector
1454 self
.n2vc
= N2VCJujuConnector(
1455 log
=self
.logger
, loop
=self
.loop
, fs
=self
.fs
, db
=self
.db
1458 def _get_vca_by_id(self
, vca_id
: str) -> dict:
1459 db_vca
= self
.db
.get_one("vca", {"_id": vca_id
})
1460 self
.db
.encrypt_decrypt_fields(
1463 ["secret", "cacert"],
1464 schema_version
=db_vca
["schema_version"],
1469 async def create(self
, vca_content
, order_id
):
1470 op_id
= vca_content
.pop("op_id", None)
1471 if not self
.lcm_tasks
.lock_HA("vca", "create", op_id
):
1474 vca_id
= vca_content
["_id"]
1475 self
.logger
.debug("Task vca_create={} {}".format(vca_id
, "Enter"))
1479 operation_state
= "FAILED"
1480 operation_details
= ""
1483 "Task vca_create={} {}".format(vca_id
, "Getting vca from db")
1485 db_vca
= self
._get
_vca
_by
_id
(vca_id
)
1487 task
= asyncio
.ensure_future(
1489 self
.n2vc
.validate_vca(db_vca
["_id"]),
1490 timeout
=self
.timeout_create
,
1494 await asyncio
.wait([task
], return_when
=asyncio
.FIRST_COMPLETED
)
1495 if task
.exception():
1496 raise task
.exception()
1498 "Task vca_create={} {}".format(
1499 vca_id
, "vca registered and validated successfully"
1502 db_vca_update
["_admin.operationalState"] = "ENABLED"
1503 db_vca_update
["_admin.detailed-status"] = "Connectivity: ok"
1504 operation_details
= "VCA validated"
1505 operation_state
= "COMPLETED"
1508 "Task vca_create={} {}".format(
1509 vca_id
, "Done. Result: {}".format(operation_state
)
1513 except Exception as e
:
1514 error_msg
= "Failed with exception: {}".format(e
)
1515 self
.logger
.error("Task vca_create={} {}".format(vca_id
, error_msg
))
1516 db_vca_update
["_admin.operationalState"] = "ERROR"
1517 db_vca_update
["_admin.detailed-status"] = error_msg
1518 operation_details
= error_msg
1521 self
.update_db_2("vca", vca_id
, db_vca_update
)
1523 # Register the operation and unlock
1524 self
.lcm_tasks
.unlock_HA(
1528 operationState
=operation_state
,
1529 detailed_status
=operation_details
,
1531 except DbException
as e
:
1533 "Task vca_create={} {}".format(
1534 vca_id
, "Cannot update database: {}".format(e
)
1537 self
.lcm_tasks
.remove("vca", vca_id
, order_id
)
1539 async def delete(self
, vca_content
, order_id
):
1541 # HA tasks and backward compatibility:
1542 # If "vim_content" does not include "op_id", we a running a legacy NBI version.
1543 # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
1544 # Register "delete" task here for related future HA operations
1545 op_id
= vca_content
.pop("op_id", None)
1546 if not self
.lcm_tasks
.lock_HA("vca", "delete", op_id
):
1550 vca_id
= vca_content
["_id"]
1552 operation_state
= "FAILED"
1553 operation_details
= ""
1557 "Task vca_delete={} {}".format(vca_id
, "Deleting vca from db")
1559 self
.db
.del_one("vca", {"_id": vca_id
})
1560 db_vca_update
= None
1561 operation_details
= "deleted"
1562 operation_state
= "COMPLETED"
1565 "Task vca_delete={} {}".format(
1566 vca_id
, "Done. Result: {}".format(operation_state
)
1569 except Exception as e
:
1570 error_msg
= "Failed with exception: {}".format(e
)
1571 self
.logger
.error("Task vca_delete={} {}".format(vca_id
, error_msg
))
1572 db_vca_update
["_admin.operationalState"] = "ERROR"
1573 db_vca_update
["_admin.detailed-status"] = error_msg
1574 operation_details
= error_msg
1577 self
.update_db_2("vca", vca_id
, db_vca_update
)
1578 self
.lcm_tasks
.unlock_HA(
1582 operationState
=operation_state
,
1583 detailed_status
=operation_details
,
1585 except DbException
as e
:
1587 "Task vca_delete={} {}".format(
1588 vca_id
, "Cannot update database: {}".format(e
)
1591 self
.lcm_tasks
.remove("vca", vca_id
, order_id
)
1594 class K8sRepoLcm(LcmBase
):
1595 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1597 Init, Connect to database, filesystem storage, and messaging
1598 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1602 self
.logger
= logging
.getLogger("lcm.k8srepo")
1604 self
.lcm_tasks
= lcm_tasks
1605 self
.vca_config
= config
["VCA"]
1607 super().__init
__(msg
, self
.logger
)
1609 self
.k8srepo
= K8sHelmConnector(
1610 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1611 helm_command
=self
.vca_config
.get("helmpath"),
1618 async def create(self
, k8srepo_content
, order_id
):
1620 # HA tasks and backward compatibility:
1621 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1622 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1623 # Register 'create' task here for related future HA operations
1625 op_id
= k8srepo_content
.pop("op_id", None)
1626 if not self
.lcm_tasks
.lock_HA("k8srepo", "create", op_id
):
1629 k8srepo_id
= k8srepo_content
.get("_id")
1630 logging_text
= "Task k8srepo_create={} ".format(k8srepo_id
)
1631 self
.logger
.debug(logging_text
+ "Enter")
1634 db_k8srepo_update
= {}
1636 operation_state
= "COMPLETED"
1637 operation_details
= ""
1639 step
= "Getting k8srepo-id='{}' from db".format(k8srepo_id
)
1640 self
.logger
.debug(logging_text
+ step
)
1641 db_k8srepo
= self
.db
.get_one("k8srepos", {"_id": k8srepo_id
})
1642 db_k8srepo_update
["_admin.operationalState"] = "ENABLED"
1643 except Exception as e
:
1645 logging_text
+ "Exit Exception {}".format(e
),
1646 exc_info
=not isinstance(
1653 asyncio
.CancelledError
,
1659 if exc
and db_k8srepo
:
1660 db_k8srepo_update
["_admin.operationalState"] = "ERROR"
1661 db_k8srepo_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1664 # Mark the WIM 'create' HA task as erroneous
1665 operation_state
= "FAILED"
1666 operation_details
= "ERROR {}: {}".format(step
, exc
)
1668 if db_k8srepo_update
:
1669 self
.update_db_2("k8srepos", k8srepo_id
, db_k8srepo_update
)
1670 # Register the K8srepo 'create' HA task either
1671 # succesful or erroneous, or do nothing (if legacy NBI)
1672 self
.lcm_tasks
.unlock_HA(
1676 operationState
=operation_state
,
1677 detailed_status
=operation_details
,
1679 except DbException
as e
:
1680 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1681 self
.lcm_tasks
.remove("k8srepo", k8srepo_id
, order_id
)
1683 async def delete(self
, k8srepo_content
, order_id
):
1685 # HA tasks and backward compatibility:
1686 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1687 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1688 # Register 'delete' task here for related future HA operations
1689 op_id
= k8srepo_content
.pop("op_id", None)
1690 if not self
.lcm_tasks
.lock_HA("k8srepo", "delete", op_id
):
1693 k8srepo_id
= k8srepo_content
.get("_id")
1694 logging_text
= "Task k8srepo_delete={} ".format(k8srepo_id
)
1695 self
.logger
.debug(logging_text
+ "Enter")
1698 db_k8srepo_update
= {}
1701 operation_state
= "COMPLETED"
1702 operation_details
= ""
1704 step
= "Getting k8srepo-id='{}' from db".format(k8srepo_id
)
1705 self
.logger
.debug(logging_text
+ step
)
1706 db_k8srepo
= self
.db
.get_one("k8srepos", {"_id": k8srepo_id
})
1708 except Exception as e
:
1710 logging_text
+ "Exit Exception {}".format(e
),
1711 exc_info
=not isinstance(
1718 asyncio
.CancelledError
,
1724 if exc
and db_k8srepo
:
1725 db_k8srepo_update
["_admin.operationalState"] = "ERROR"
1726 db_k8srepo_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1729 # Mark the WIM 'create' HA task as erroneous
1730 operation_state
= "FAILED"
1731 operation_details
= "ERROR {}: {}".format(step
, exc
)
1733 if db_k8srepo_update
:
1734 self
.update_db_2("k8srepos", k8srepo_id
, db_k8srepo_update
)
1735 # Register the K8srepo 'delete' HA task either
1736 # succesful or erroneous, or do nothing (if legacy NBI)
1737 self
.lcm_tasks
.unlock_HA(
1741 operationState
=operation_state
,
1742 detailed_status
=operation_details
,
1744 self
.db
.del_one("k8srepos", {"_id": k8srepo_id
})
1745 except DbException
as e
:
1746 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1747 self
.lcm_tasks
.remove("k8srepo", k8srepo_id
, order_id
)