1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 import logging
.handlers
22 from osm_lcm
import ROclient
23 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
, deep_get
24 from n2vc
.k8s_helm_conn
import K8sHelmConnector
25 from n2vc
.k8s_juju_conn
import K8sJujuConnector
26 from n2vc
.exceptions
import K8sException
, N2VCException
27 from osm_common
.dbbase
import DbException
28 from copy
import deepcopy
30 __author__
= "Alfonso Tierno"
class VimLcm(LcmBase):
    """Mirror VIM account create/edit/delete orders into RO.

    Every public coroutine follows the same outline: take the HA lock for the
    operation, read the account from the 'vim_accounts' collection, call RO
    through ``ROclient.ROClient``, write progress and the final state under
    '_admin' of the database record, register the HA result and finally
    remove the task from ``lcm_tasks``.
    """

    # values that are encrypted at vim config because they are passwords
    vim_config_encrypted = {"1.1": ("admin_password", "nsx_password", "vcenter_password"),
                            "default": ("admin_password", "nsx_password", "vcenter_password", "vrops_password")}

    def __init__(self, db, msg, fs, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database connector, handed to LcmBase
        :param msg: message bus connector, handed to LcmBase
        :param fs: filesystem storage connector, handed to LcmBase
        :param lcm_tasks: task registry used for the HA lock/register calls and for removing finished tasks
        :param config: dictionary with configuration; its 'ro_config' entry is expanded as keyword
            arguments of ROclient.ROClient
        :param loop: passed as first argument to ROclient.ROClient
        :return: None
        """

        self.logger = logging.getLogger('lcm.vim')
        # every method builds its RO client with self.loop
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["ro_config"]

        super().__init__(db, msg, fs, self.logger)

    @staticmethod
    def _deployed_RO_id(db_record):
        """Return db_record["_admin"]["deployed"]["RO"], or None if any level is missing or empty."""
        admin = db_record.get("_admin")
        if admin and admin.get("deployed") and admin["deployed"].get("RO"):
            return admin["deployed"]["RO"]
        return None

    async def _get_RO_sdn_id(self, sdn_controller_id):
        """Return the RO identifier of the SDN controller referenced by a VIM account.

        :param sdn_controller_id: '_id' of the record at the 'sdns' collection
        :return: value stored at '_admin.deployed.RO' of that record
        :raises LcmException: when the SDN controller has no RO identifier stored
        """
        db_sdn = self.db.get_one("sdns", {"_id": sdn_controller_id})

        # If the VIM account has an associated SDN account, also
        # wait for any previous tasks in process for the SDN
        await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])

        RO_sdn_id = self._deployed_RO_id(db_sdn)
        if not RO_sdn_id:
            raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
                sdn_controller_id))
        return RO_sdn_id

    def _decrypt_config_passwords(self, config, schema_version, salt):
        """Decrypt, in place, the password entries of a VIM 'config' dictionary.

        The keys to decrypt are taken from 'vim_config_encrypted' for the given
        schema_version, falling back to its "default" entry.
        """
        encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
            self.vim_config_encrypted.get("default")
        for key in encrypted_keys:
            if config.get(key):
                config[key] = self.db.decrypt(config[key], schema_version=schema_version, salt=salt)

    async def create(self, vim_content, order_id):
        """Create the VIM and its vim_account at RO and record the result in the database.

        :param vim_content: VIM account descriptor; must contain '_id', 'vim_type', 'vim_tenant_name' and
            'vim_user'. 'op_id' is removed from it and 'vim_password' is replaced by its decrypted value
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored at '_admin' of the database record, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'create', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_create={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        RO_sdn_id = None
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting vim-id='{}' from db".format(vim_id)
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
                step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
                RO_sdn_id = await self._get_RO_sdn_id(vim_content["config"]["sdn-controller"])

            step = "Creating vim at RO"
            db_vim_update["_admin.deployed.RO"] = None
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)

            # The descriptor sent to RO is a copy without OSM-only and account-level fields
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            schema_version = vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if RO_sdn_id:
                # RO must receive its own SDN identifier, not the OSM one
                vim_RO["config"]["sdn-controller"] = RO_sdn_id
            desc = await RO.create("vim", descriptor=vim_RO)
            RO_vim_id = desc["uuid"]
            db_vim_update["_admin.deployed.RO"] = RO_vim_id
            self.logger.debug(logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id))

            step = "Creating vim_account at RO"
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)

            if vim_content.get("vim_password"):
                vim_content["vim_password"] = self.db.decrypt(vim_content["vim_password"],
                                                              schema_version=schema_version,
                                                              salt=vim_id)
            vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
                              "vim_username": vim_content["vim_user"],
                              "vim_password": vim_content["vim_password"]
                              }
            if vim_RO.get("config"):
                # Same dictionary object as vim_RO["config"]; the VIM itself was already created above
                vim_account_RO["config"] = vim_RO["config"]
                vim_account_RO["config"].pop("sdn-controller", None)
                vim_account_RO["config"].pop("sdn-port-mapping", None)
                self._decrypt_config_passwords(vim_account_RO["config"], schema_version, vim_id)

            desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_vim_update["_admin.operationalState"] = "ENABLED"
            db_vim_update["_admin.detailed-status"] = "Done"
            # Mark the VIM 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok VIM account created at RO_vim_account_id={}".format(desc["uuid"]))

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('vim', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def edit(self, vim_content, order_id):
        """Apply the changes of a VIM account to the VIM and vim_account already deployed at RO.

        Nothing is sent to RO when the database record has no '_admin.deployed.RO' identifier.

        :param vim_content: changed fields plus '_id'. 'op_id' is removed from it; 'sdn-controller' and
            'sdn-port-mapping' are removed from its 'config', whose password entries are decrypted in place
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored at '_admin' of the database record, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'edit', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_edit={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        exc = None
        RO_sdn_id = None
        RO_vim_id = None
        db_vim_update = {}
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting vim-id='{}' from db".format(vim_id)
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('vim', 'edit', op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})

            if self._deployed_RO_id(db_vim):
                if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
                    step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
                    RO_sdn_id = await self._get_RO_sdn_id(vim_content["config"]["sdn-controller"])

                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                step = "Editing vim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                vim_RO = deepcopy(vim_content)
                vim_RO.pop("_id", None)
                vim_RO.pop("_admin", None)
                schema_version = vim_RO.pop("schema_version", None)
                vim_RO.pop("schema_type", None)
                vim_RO.pop("vim_tenant_name", None)
                if "vim_type" in vim_RO:
                    vim_RO["type"] = vim_RO.pop("vim_type")
                vim_RO.pop("vim_user", None)
                vim_RO.pop("vim_password", None)
                if RO_sdn_id:
                    vim_RO["config"]["sdn-controller"] = RO_sdn_id
                # TODO make a deep update of sdn-port-mapping
                if vim_RO:
                    await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

                step = "Editing vim-account at RO tenant"
                vim_account_RO = {}
                if "config" in vim_content:
                    vim_content["config"].pop("sdn-controller", None)
                    vim_content["config"].pop("sdn-port-mapping", None)
                    if not vim_content["config"]:
                        del vim_content["config"]
                if "vim_tenant_name" in vim_content:
                    vim_account_RO["vim_tenant_name"] = vim_content["vim_tenant_name"]
                if "vim_password" in vim_content:
                    vim_account_RO["vim_password"] = vim_content["vim_password"]
                if vim_content.get("vim_password"):
                    vim_account_RO["vim_password"] = self.db.decrypt(vim_content["vim_password"],
                                                                     schema_version=schema_version,
                                                                     salt=vim_id)
                if "config" in vim_content:
                    vim_account_RO["config"] = vim_content["config"]
                if vim_content.get("config"):
                    # vim_account_RO["config"] is this same dictionary, so decrypting in place is enough
                    self._decrypt_config_passwords(vim_content["config"], schema_version, vim_id)

                if "vim_user" in vim_content:
                    # The account descriptor, not vim_content, is what is sent to RO; 'vim_username' is
                    # the key used for the same value when the account is created
                    vim_account_RO["vim_username"] = vim_content["vim_user"]
                # vim_account must be edited always even if empty in order to ensure changes are translated to RO
                # vim_thread. RO will remove and relaunch a new thread for this vim_account
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
                db_vim_update["_admin.operationalState"] = "ENABLED"
                # Mark the VIM 'edit' HA task as successful
                operationState_HA = 'COMPLETED'
                detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'edit' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('vim', 'edit', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def delete(self, vim_content, order_id):
        """Detach and delete the VIM at RO (when deployed) and remove its database record.

        An HTTP 404 answer from RO on detach or delete is treated as "already done".

        :param vim_content: dictionary with at least '_id'; 'op_id' is removed from it
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored at '_admin' of the database record, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'delete', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_delete={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting vim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('vim', 'delete', op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            if self._deployed_RO_id(db_vim):
                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Detaching vim from RO tenant"
                try:
                    await RO.detach("vim_account", RO_vim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                    else:
                        raise

                step = "Deleting vim from RO"
                try:
                    await RO.delete("vim", RO_vim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nothing to remove at RO")
            self.db.del_one("vim_accounts", {"_id": vim_id})
            # The record no longer exists: make sure the 'finally' block does not try to update it
            db_vim = None
            self.logger.debug(logging_text + "Exit Ok")

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.register_HA('vim', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            try:
                if db_vim and db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # If the VIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # The task is removed once, after the outcome has been stored
            self.lcm_tasks.remove("vim_account", vim_id, order_id)
class WimLcm(LcmBase):
    """Mirror WIM account create/edit/delete orders into RO.

    Every public coroutine takes the HA lock for the operation, reads the
    account from the 'wim_accounts' collection, calls RO through
    ``ROclient.ROClient``, writes progress and the final state under '_admin'
    of the database record, registers the HA result and finally removes the
    task from ``lcm_tasks``.
    """

    # values that are encrypted at wim config because they are passwords
    wim_config_encrypted = ()

    def __init__(self, db, msg, fs, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database connector, handed to LcmBase
        :param msg: message bus connector, handed to LcmBase
        :param fs: filesystem storage connector, handed to LcmBase
        :param lcm_tasks: task registry used for the HA lock/register calls and for removing finished tasks
        :param config: dictionary with configuration; its 'ro_config' entry is expanded as keyword
            arguments of ROclient.ROClient
        :param loop: passed as first argument to ROclient.ROClient
        :return: None
        """

        # NOTE(review): the logger name is 'lcm.vim', the same one VimLcm uses - confirm it is intended
        self.logger = logging.getLogger('lcm.vim')
        # every method builds its RO client with self.loop
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["ro_config"]

        super().__init__(db, msg, fs, self.logger)

    async def create(self, wim_content, order_id):
        """Create the WIM and its wim_account at RO and record the result in the database.

        :param wim_content: WIM account descriptor; must contain '_id', 'wim_type', 'name', 'user' and
            'password'. 'op_id' is removed from it
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored at '_admin' of the database record, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = wim_content.pop('op_id', None)
        # Honour the lock result, as edit() and delete() do
        if not self.lcm_tasks.lock_HA('wim', 'create', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_create={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting wim-id='{}' from db".format(wim_id)
            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            db_wim_update["_admin.deployed.RO"] = None

            step = "Creating wim at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)

            # The descriptor sent to RO is a copy without OSM-only and account-level fields
            wim_RO = deepcopy(wim_content)
            wim_RO.pop("_id", None)
            wim_RO.pop("_admin", None)
            schema_version = wim_RO.pop("schema_version", None)
            wim_RO.pop("schema_type", None)
            wim_RO.pop("wim_tenant_name", None)
            wim_RO["type"] = wim_RO.pop("wim_type")
            wim_RO.pop("wim_user", None)
            wim_RO.pop("wim_password", None)
            desc = await RO.create("wim", descriptor=wim_RO)
            RO_wim_id = desc["uuid"]
            db_wim_update["_admin.deployed.RO"] = RO_wim_id
            self.logger.debug(logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id))

            step = "Creating wim_account at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)

            # NOTE(review): 'wim_password' is decrypted here but the account below is built from
            # 'password' - confirm which key carries the encrypted value
            if wim_content.get("wim_password"):
                wim_content["wim_password"] = self.db.decrypt(wim_content["wim_password"],
                                                              schema_version=schema_version,
                                                              salt=wim_id)
            wim_account_RO = {"name": wim_content["name"],
                              "user": wim_content["user"],
                              "password": wim_content["password"]
                              }
            if wim_RO.get("config"):
                # Same dictionary object as wim_RO["config"]; the WIM itself was already created above
                wim_account_RO["config"] = wim_RO["config"]
                wim_account_RO["config"].pop("wim_port_mapping", None)
                for p in self.wim_config_encrypted:
                    if wim_account_RO["config"].get(p):
                        wim_account_RO["config"][p] = self.db.decrypt(wim_account_RO["config"][p],
                                                                      schema_version=schema_version,
                                                                      salt=wim_id)

            desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
            db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_wim_update["_admin.operationalState"] = "ENABLED"
            db_wim_update["_admin.detailed-status"] = "Done"
            # Mark the WIM 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok WIM account created at RO_wim_account_id={}".format(desc["uuid"]))

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('wim', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def edit(self, wim_content, order_id):
        """Apply the changes of a WIM account to the WIM and wim_account already deployed at RO.

        Nothing is sent to RO when the database record has no '_admin.deployed.RO' identifier.

        :param wim_content: changed fields plus '_id'. 'op_id' is removed from it and 'wim_port_mapping' is
            removed from its 'config'
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored at '_admin' of the database record, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('wim', 'edit', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_edit={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        exc = None
        RO_wim_id = None
        db_wim_update = {}
        step = "Getting wim-id='{}' from db".format(wim_id)
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('wim', 'edit', op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})

            if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):

                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                step = "Editing wim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                wim_RO = deepcopy(wim_content)
                wim_RO.pop("_id", None)
                wim_RO.pop("_admin", None)
                schema_version = wim_RO.pop("schema_version", None)
                wim_RO.pop("schema_type", None)
                wim_RO.pop("wim_tenant_name", None)
                if "wim_type" in wim_RO:
                    wim_RO["type"] = wim_RO.pop("wim_type")
                wim_RO.pop("wim_user", None)
                wim_RO.pop("wim_password", None)
                # TODO make a deep update of wim_port_mapping
                if wim_RO:
                    await RO.edit("wim", RO_wim_id, descriptor=wim_RO)

                step = "Editing wim-account at RO tenant"
                wim_account_RO = {}
                if "config" in wim_content:
                    wim_content["config"].pop("wim_port_mapping", None)
                    if not wim_content["config"]:
                        del wim_content["config"]
                if "wim_tenant_name" in wim_content:
                    wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
                if "wim_password" in wim_content:
                    wim_account_RO["wim_password"] = wim_content["wim_password"]
                if wim_content.get("wim_password"):
                    wim_account_RO["wim_password"] = self.db.decrypt(wim_content["wim_password"],
                                                                     schema_version=schema_version,
                                                                     salt=wim_id)
                if "config" in wim_content:
                    wim_account_RO["config"] = wim_content["config"]
                if wim_content.get("config"):
                    for p in self.wim_config_encrypted:
                        if wim_content["config"].get(p):
                            wim_account_RO["config"][p] = self.db.decrypt(wim_content["config"][p],
                                                                          schema_version=schema_version,
                                                                          salt=wim_id)

                if "wim_user" in wim_content:
                    # NOTE(review): this value is stored in wim_content and never read afterwards, so a
                    # changed user is not sent to RO. Left as is because the key RO expects for WIM
                    # accounts cannot be told from this file - confirm and move it to wim_account_RO
                    wim_content["wim_username"] = wim_content["wim_user"]
                # wim_account must be edited always even if empty in order to ensure changes are translated to RO
                # wim_thread. RO will remove and relaunch a new thread for this wim_account
                await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
                db_wim_update["_admin.operationalState"] = "ENABLED"
                # Mark the WIM 'edit' HA task as successful
                operationState_HA = 'COMPLETED'
                detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'edit' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('wim', 'edit', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def delete(self, wim_content, order_id):
        """Detach and delete the WIM at RO (when deployed) and remove its database record.

        An HTTP 404 answer from RO on detach or delete is treated as "already done".

        :param wim_content: dictionary with at least '_id'; 'op_id' is removed from it
        :param order_id: identifier used to remove this task from lcm_tasks when finished
        :return: None. Errors are logged and stored at '_admin' of the database record, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('wim', 'delete', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_delete={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        step = "Getting wim from db"
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('wim', 'delete', op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Detaching wim from RO tenant"
                try:
                    await RO.detach("wim_account", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_wim_id={} already detached".format(RO_wim_id))
                    else:
                        raise

                step = "Deleting wim from RO"
                try:
                    await RO.delete("wim", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_wim_id={} already deleted".format(RO_wim_id))
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nohing to remove at RO")
            self.db.del_one("wim_accounts", {"_id": wim_id})
            # The record no longer exists: make sure the 'finally' block does not try to update it
            db_wim = None
            self.logger.debug(logging_text + "Exit Ok")

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.register_HA('wim', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            try:
                if db_wim and db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # If the WIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # The task is removed once, after the outcome has been stored
            self.lcm_tasks.remove("wim_account", wim_id, order_id)
685 class SdnLcm(LcmBase
):
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param db: database connector, handed to LcmBase
    :param msg: message bus connector, handed to LcmBase
    :param fs: filesystem storage connector, handed to LcmBase
    :param lcm_tasks: task registry used for the HA lock/register calls and for removing finished tasks
    :param config: dictionary with configuration; its 'ro_config' entry is expanded as keyword
        arguments of ROclient.ROClient
    :param loop: passed as first argument to ROclient.ROClient
    :return: None
    """

    self.logger = logging.getLogger('lcm.sdn')
    # create(), edit() and delete() build their RO client with self.loop
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.ro_config = config["ro_config"]

    super().__init__(db, msg, fs, self.logger)
async def create(self, sdn_content, order_id):
    """Create the SDN controller at RO and record the result in the database.

    :param sdn_content: SDN controller descriptor; must contain '_id'. 'op_id' is removed from it
    :param order_id: identifier used to remove this task from lcm_tasks when finished
    :return: None. Errors are logged and stored at '_admin' of the database record, not raised
    """

    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'create' task here for related future HA operations
    op_id = sdn_content.pop('op_id', None)
    # Honour the lock result, as edit() and delete() do
    if not self.lcm_tasks.lock_HA('sdn', 'create', op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    RO_sdn_id = None
    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    try:
        step = "Getting sdn from db"
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        db_sdn_update["_admin.deployed.RO"] = None

        step = "Creating sdn at RO"
        db_sdn_update["_admin.detailed-status"] = step
        self.update_db_2("sdns", sdn_id, db_sdn_update)

        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # The descriptor sent to RO is a copy without the OSM-only fields
        sdn_RO = deepcopy(sdn_content)
        sdn_RO.pop("_id", None)
        sdn_RO.pop("_admin", None)
        schema_version = sdn_RO.pop("schema_version", None)
        sdn_RO.pop("schema_type", None)
        sdn_RO.pop("description", None)
        if sdn_RO.get("password"):
            sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)

        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn_update["_admin.deployed.RO"] = RO_sdn_id
        db_sdn_update["_admin.operationalState"] = "ENABLED"
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        # Mark the SDN 'create' HA task as successful
        operationState_HA = 'COMPLETED'
        detailed_status_HA = 'Done'

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn_update["_admin.operationalState"] = "ERROR"
            db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
            # Mark the SDN 'create' HA task as erroneous
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
        try:
            if db_sdn and db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # Register the SDN 'create' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.register_HA('sdn', 'create', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
async def edit(self, sdn_content, order_id):
    """Apply the changes of an SDN controller to the one already deployed at RO.

    Nothing is sent to RO when the database record has no '_admin.deployed.RO' identifier.

    :param sdn_content: changed fields plus '_id'. 'op_id' is removed from it
    :param order_id: identifier used to remove this task from lcm_tasks when finished
    :return: None. Errors are logged and stored at '_admin' of the database record, not raised
    """

    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, and the HA check always returns True
    op_id = sdn_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('sdn', 'edit', op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    step = "Getting sdn from db"
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('sdn', 'edit', op_id)

        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        RO_sdn_id = None
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            # The descriptor sent to RO is a copy without the OSM-only fields
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            schema_version = sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO.get("password"):
                sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn_update["_admin.operationalState"] = "ENABLED"
            # Mark the SDN 'edit' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # The error goes into db_sdn_update, the dictionary written to the database below;
            # setting it on the fetched record db_sdn would leave the stored state untouched
            db_sdn_update["_admin.operationalState"] = "ERROR"
            db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
            # Mark the SDN 'edit' HA task as erroneous
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
        try:
            if db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # Register the SDN 'edit' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.register_HA('sdn', 'edit', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
async def delete(self, sdn_content, order_id):
    """
    Delete an SDN controller: remove it from RO when the database holds a RO id for it, then remove the
    document from the "sdns" collection.
    :param sdn_content: sdn document received with the order. Must contain "_id"; an optional 'op_id' is
        popped from it and used for the HA bookkeeping
    :param order_id: order identifier, used to remove this task from lcm_tasks when it finishes
    :return: None
    """

    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, and the HA check always returns True
    op_id = sdn_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('sdn', 'delete', op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    step = "Getting sdn from db"
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('sdn', 'delete', op_id)

        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        RO_sdn_id = deep_get(db_sdn, ("_admin", "deployed", "RO"))
        if RO_sdn_id:
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code != 404:
                    raise
                # not found at RO: treat it as already deleted and go on
                self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        # the document is gone: nothing must be updated nor registered at the 'finally' clause
        db_sdn = None
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # Store the error at the update dictionary (not at the fetched document),
            # because only 'db_sdn_update' is written to the database below
            db_sdn_update["_admin.operationalState"] = "ERROR"
            db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
            # Mark the SDN 'delete' HA task as erroneous
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
            self.lcm_tasks.register_HA('sdn', 'delete', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        try:
            if db_sdn and db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # If the SDN 'delete' HA task was successful, the DB entry has been deleted,
            # which means that there is nowhere to register this task, so do nothing here.
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
915 class K8sClusterLcm(LcmBase
):
917 def __init__(self
, db
, msg
, fs
, lcm_tasks
, config
, loop
):
919 Init, Connect to database, filesystem storage, and messaging
920 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
924 self
.logger
= logging
.getLogger('lcm.k8scluster')
926 self
.lcm_tasks
= lcm_tasks
927 self
.vca_config
= config
["VCA"]
931 self
.helm_k8scluster
= K8sHelmConnector(
932 kubectl_command
=self
.vca_config
.get("kubectlpath"),
933 helm_command
=self
.vca_config
.get("helmpath"),
940 self
.juju_k8scluster
= K8sJujuConnector(
941 kubectl_command
=self
.vca_config
.get("kubectlpath"),
942 juju_command
=self
.vca_config
.get("jujupath"),
949 super().__init
__(db
, msg
, fs
, self
.logger
)
async def create(self, k8scluster_content, order_id):
    """
    Initialize a k8s cluster for helm-chart and/or juju-bundle and store the outcome at the "k8sclusters"
    collection. Which environments are initialized is taken from "_admin.init" of the database document;
    when it is empty both are tried. The cluster is set as "ENABLED" if at least one of them succeeds.
    :param k8scluster_content: k8scluster document received with the order. Must contain "_id"; an optional
        'op_id' is popped from it and used for the HA bookkeeping
    :param order_id: order identifier, used to remove this task from lcm_tasks when it finishes
    :return: None
    """

    # HA tasks and backward compatibility:
    # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'create' task here for related future HA operations
    op_id = k8scluster_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('k8scluster', 'create', op_id):
        return

    k8scluster_id = k8scluster_content["_id"]
    logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
    self.logger.debug(logging_text + "Enter")

    db_k8scluster = None
    db_k8scluster_update = {}

    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
    try:
        self.logger.debug(logging_text + step)
        db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
        self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'],
                                       schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"])
        k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
        error_text_list = []
        init_target = deep_get(db_k8scluster, ("_admin", "init"))

        # Both ids are read by the final check, so they are bound here, before the optional sections;
        # otherwise selecting only one environment at 'init_target' leaves the other name unbound
        k8s_hc_id = None
        k8s_jb_id = None

        # helm-chart
        if not init_target or "helm-chart" in init_target:
            try:
                k8s_hc_id, uninstall_sw = await self.helm_k8scluster.init_env(k8s_credentials,
                                                                              reuse_cluster_uuid=k8scluster_id)
                db_k8scluster_update["_admin.helm-chart.id"] = k8s_hc_id
                db_k8scluster_update["_admin.helm-chart.created"] = uninstall_sw
            except Exception as e:
                error_text_list.append("Failing init helm-chart: {}".format(e))
                db_k8scluster_update["_admin.helm-chart.error_msg"] = str(e)
                # the traceback is only logged for exceptions other than K8sException
                self.logger.error(logging_text + "Failing init helm-chart: {}".format(e),
                                  exc_info=not isinstance(e, K8sException))

        # Juju/k8s cluster
        if not init_target or "juju-bundle" in init_target:
            try:
                k8s_jb_id, uninstall_sw = await self.juju_k8scluster.init_env(
                    k8s_credentials,
                    reuse_cluster_uuid=k8scluster_id
                )
                db_k8scluster_update["_admin.juju-bundle.id"] = k8s_jb_id
                db_k8scluster_update["_admin.juju-bundle.created"] = uninstall_sw
            except Exception as e:
                error_text_list.append("Failing init juju-bundle: {}".format(e))
                db_k8scluster_update["_admin.juju-bundle.error_msg"] = str(e)
                # the traceback is only logged for exceptions other than N2VCException
                self.logger.error(logging_text + "Failing init juju-bundle: {}".format(e),
                                  exc_info=not isinstance(e, N2VCException))

        # mark as an error if both helm-chart and juju-bundle have been failed
        if k8s_hc_id or k8s_jb_id:
            self.logger.debug(logging_text + "successfully created")
            db_k8scluster_update["_admin.operationalState"] = "ENABLED"
        else:
            self.logger.debug(logging_text + "created with errors")
            db_k8scluster_update["_admin.operationalState"] = "ERROR"
            db_k8scluster_update["_admin.detailed-status"] = ";".join(error_text_list)

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc:
            # Mark the k8scluster 'create' HA task as erroneous, also when the document could not be read
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
            if db_k8scluster:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
        try:
            if db_k8scluster_update:
                self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

            # Register the K8scluster 'create' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.register_HA('k8scluster', 'create', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
async def delete(self, k8scluster_content, order_id):
    """
    Delete a k8s cluster: reset its juju-bundle and helm-chart environments (in this order) when the
    database holds an id for them, remove the helm-chart id from the "_admin.cluster-inserted" list of
    the k8srepos that reference it, and remove the document from the "k8sclusters" collection.
    :param k8scluster_content: k8scluster document received with the order. Must contain "_id"; an optional
        'op_id' is popped from it and used for the HA bookkeeping
    :param order_id: order identifier, used to remove this task from lcm_tasks when it finishes
    :return: None
    """

    # HA tasks and backward compatibility:
    # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'delete' task here for related future HA operations
    op_id = k8scluster_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('k8scluster', 'delete', op_id):
        return

    k8scluster_id = k8scluster_content["_id"]
    logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
    self.logger.debug(logging_text + "Enter")

    db_k8scluster = None
    db_k8scluster_update = {}
    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    step = "Getting k8scluster='{}' from db".format(k8scluster_id)
    try:
        self.logger.debug(logging_text + step)
        db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
        k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id"))
        k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))

        cluster_removed = True
        if k8s_jb_id:  # delete in reverse order of creation
            step = "Removing juju-bundle '{}'".format(k8s_jb_id)
            # each environment uses its own "created" flag; the juju-bundle one was previously
            # taken from the helm-chart entry
            uninstall_sw = deep_get(db_k8scluster, ("_admin", "juju-bundle", "created")) or False
            cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw)
            db_k8scluster_update["_admin.juju-bundle.id"] = None

        if k8s_hc_id:
            step = "Removing helm-chart '{}'".format(k8s_hc_id)
            uninstall_sw = deep_get(db_k8scluster, ("_admin", "helm-chart", "created")) or False
            cluster_removed = await self.helm_k8scluster.reset(cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw)
            db_k8scluster_update["_admin.helm-chart.id"] = None

        # Try to remove from cluster_inserted to clean old versions
        if k8s_hc_id and cluster_removed:
            step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8srepo_list = self.db.get_list("k8srepos", {"_admin.cluster-inserted": k8s_hc_id})
            for k8srepo in db_k8srepo_list:
                # best effort: a failure on one repo is logged and the rest are still processed
                try:
                    cluster_list = k8srepo["_admin"]["cluster-inserted"]
                    cluster_list.remove(k8s_hc_id)
                    self.update_db_2("k8srepos", k8srepo["_id"], {"_admin.cluster-inserted": cluster_list})
                except Exception as e:
                    self.logger.error("{}: {}".format(step, e))
        self.db.del_one("k8sclusters", {"_id": k8scluster_id})
        # the document is gone: discard the pending changes so that nothing is written at 'finally'
        db_k8scluster_update = {}
        self.logger.debug(logging_text + "Done")

    except Exception as e:
        if isinstance(e, (LcmException, DbException, K8sException, N2VCException)):
            self.logger.error(logging_text + "Exit Exception {}".format(e))
        else:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc:
            # Mark the k8scluster 'delete' HA task as erroneous, also when the document could not be read
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
            if db_k8scluster:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
        try:
            if db_k8scluster_update:
                self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
            # Register the K8scluster 'delete' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.register_HA('k8scluster', 'delete', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
1129 class K8sRepoLcm(LcmBase
):
1131 def __init__(self
, db
, msg
, fs
, lcm_tasks
, config
, loop
):
1133 Init, Connect to database, filesystem storage, and messaging
1134 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1138 self
.logger
= logging
.getLogger('lcm.k8srepo')
1140 self
.lcm_tasks
= lcm_tasks
1141 self
.vca_config
= config
["VCA"]
1145 self
.k8srepo
= K8sHelmConnector(
1146 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1147 helm_command
=self
.vca_config
.get("helmpath"),
1154 super().__init
__(db
, msg
, fs
, self
.logger
)
async def create(self, k8srepo_content, order_id):
    """
    Create a k8s repository: check that its document can be read from the "k8srepos" collection and set
    it as "ENABLED".
    :param k8srepo_content: k8srepo document received with the order. Its "_id" is read; an optional
        'op_id' is popped from it and used for the HA bookkeeping
    :param order_id: order identifier, used to remove this task from lcm_tasks when it finishes
    :return: None
    """

    # HA tasks and backward compatibility:
    # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'create' task here for related future HA operations

    op_id = k8srepo_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('k8srepo', 'create', op_id):
        return

    k8srepo_id = k8srepo_content.get("_id")
    logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
    self.logger.debug(logging_text + "Enter")

    db_k8srepo = None
    db_k8srepo_update = {}
    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
    try:
        self.logger.debug(logging_text + step)
        db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
        db_k8srepo_update["_admin.operationalState"] = "ENABLED"
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc:
            # Mark the k8srepo 'create' HA task as erroneous. This does not depend on 'db_k8srepo':
            # it is only bound when the read succeeded, so requiring it made this branch unreachable
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
            if db_k8srepo:
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
        try:
            if db_k8srepo_update:
                self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
            # Register the K8srepo 'create' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.register_HA('k8srepo', 'create', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)
async def delete(self, k8srepo_content, order_id):
    """
    Delete a k8s repository: read its document from the "k8srepos" collection, register the HA task and
    remove the document from the collection.
    :param k8srepo_content: k8srepo document received with the order. Its "_id" is read; an optional
        'op_id' is popped from it and used for the HA bookkeeping
    :param order_id: order identifier, used to remove this task from lcm_tasks when it finishes
    :return: None
    """

    # HA tasks and backward compatibility:
    # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'delete' task here for related future HA operations
    op_id = k8srepo_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('k8srepo', 'delete', op_id):
        return

    k8srepo_id = k8srepo_content.get("_id")
    logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
    self.logger.debug(logging_text + "Enter")

    db_k8srepo = None
    db_k8srepo_update = {}

    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
    try:
        self.logger.debug(logging_text + step)
        db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc:
            # Mark the k8srepo 'delete' HA task as erroneous. This does not depend on 'db_k8srepo':
            # it is only bound when the read succeeded, so requiring it made this branch unreachable
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
            if db_k8srepo:
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
        try:
            if db_k8srepo_update:
                self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
            # Register the K8srepo 'delete' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.register_HA('k8srepo', 'delete', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
            # the HA task is registered before the document is removed
            self.db.del_one("k8srepos", {"_id": k8srepo_id})
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)