1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
23 from osm_lcm
import ROclient
24 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
, deep_get
25 from n2vc
.k8s_helm_conn
import K8sHelmConnector
26 from n2vc
.k8s_juju_conn
import K8sJujuConnector
27 from n2vc
.exceptions
import K8sException
, N2VCException
28 from osm_common
.dbbase
import DbException
29 from copy
import deepcopy
31 __author__
= "Alfonso Tierno"
class VimLcm(LcmBase):
    """
    Handles the 'create', 'edit' and 'delete' tasks of VIM accounts, replicating each
    operation at RO through ROclient and writing the outcome to the "vim_accounts" collection.
    """

    # values that are encrypted at vim config because they are passwords
    vim_config_encrypted = {
        "1.1": ("admin_password", "nsx_password", "vcenter_password"),
        "default": ("admin_password", "nsx_password", "vcenter_password", "vrops_password"),
    }

    def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
        """
        Keep the collaborators needed by the tasks and initialise the LcmBase part.
        :param db: database connector; used here through get_one, del_one and decrypt
        :param msg: message connector, forwarded to LcmBase
        :param fs: filesystem storage connector, forwarded to LcmBase
        :param lcm_tasks: task registry; used for the HA lock/wait/register calls and to remove finished tasks
        :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
        :param loop: loop object handed to ROclient.ROClient (presumably the asyncio event loop)
        :return: None
        """

        self.logger = logging.getLogger('lcm.vim')
        # every task builds its RO client with self.loop, so it must be stored here
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = ro_config

        super().__init__(db, msg, fs, self.logger)

    async def create(self, vim_content, order_id):
        """
        Create the VIM at RO and attach a vim_account to it.
        :param vim_content: dictionary with the VIM account; must contain "_id", "vim_type", "vim_tenant_name",
            "vim_user" and "vim_password"; may contain 'op_id', "config" and "schema_version"
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Exceptions are logged and stored in the database, they are not propagated
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'create', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_create={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        RO_sdn_id = None
        operationState_HA = ''
        detailed_status_HA = ''
        # 'step' is assigned before the try so that the error report can always format it
        step = "Getting vim-id='{}' from db".format(vim_id)
        try:
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
                step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
                db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})

                # If the VIM account has an associated SDN account, also
                # wait for any previous tasks in process for the SDN
                await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])

                if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
                    RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                else:
                    raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
                        vim_content["config"]["sdn-controller"]))

            step = "Creating vim at RO"
            db_vim_update["_admin.deployed.RO"] = None
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # work on a copy: the account fields are still needed below for the vim_account
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            schema_version = vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if RO_sdn_id:
                # RO_sdn_id is only set when vim_content["config"] exists
                vim_RO["config"]["sdn-controller"] = RO_sdn_id
            desc = await RO.create("vim", descriptor=vim_RO)
            RO_vim_id = desc["uuid"]
            db_vim_update["_admin.deployed.RO"] = RO_vim_id
            self.logger.debug(logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id))

            step = "Creating vim_account at RO"
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)

            if vim_content.get("vim_password"):
                vim_content["vim_password"] = self.db.decrypt(vim_content["vim_password"],
                                                              schema_version=schema_version,
                                                              salt=vim_id)
            vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
                              "vim_username": vim_content["vim_user"],
                              "vim_password": vim_content["vim_password"]
                              }
            if vim_RO.get("config"):
                vim_account_RO["config"] = vim_RO["config"]
                if "sdn-controller" in vim_account_RO["config"]:
                    del vim_account_RO["config"]["sdn-controller"]
                if "sdn-port-mapping" in vim_account_RO["config"]:
                    del vim_account_RO["config"]["sdn-port-mapping"]
                vim_config_encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
                    self.vim_config_encrypted.get("default")
                for p in vim_config_encrypted_keys:
                    if vim_account_RO["config"].get(p):
                        vim_account_RO["config"][p] = self.db.decrypt(vim_account_RO["config"][p],
                                                                      schema_version=schema_version,
                                                                      salt=vim_id)

            desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_vim_update["_admin.operationalState"] = "ENABLED"
            db_vim_update["_admin.detailed-status"] = "Done"
            # Mark the VIM 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok VIM account created at RO_vim_account_id={}".format(desc["uuid"]))

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('vim', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def edit(self, vim_content, order_id):
        """
        Edit at RO a VIM that was previously deployed there, and then its vim_account.
        Nothing is sent to RO when the stored account has no "_admin.deployed.RO" value.
        :param vim_content: dictionary with "_id" and the fields to change; may contain 'op_id'
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Exceptions are logged and stored in the database, they are not propagated
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'edit', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_edit={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        exc = None
        RO_sdn_id = None
        RO_vim_id = None
        db_vim_update = {}
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting vim-id='{}' from db".format(vim_id)
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('vim', 'edit', op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})

            if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
                if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
                    step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
                    db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})

                    # If the VIM account has an associated SDN account, also
                    # wait for any previous tasks in process for the SDN
                    await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])

                    if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and \
                            db_sdn["_admin"]["deployed"].get("RO"):
                        RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                    else:
                        raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
                            vim_content["config"]["sdn-controller"]))

                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                step = "Editing vim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                vim_RO = deepcopy(vim_content)
                vim_RO.pop("_id", None)
                vim_RO.pop("_admin", None)
                schema_version = vim_RO.pop("schema_version", None)
                vim_RO.pop("schema_type", None)
                vim_RO.pop("vim_tenant_name", None)
                if "vim_type" in vim_RO:
                    vim_RO["type"] = vim_RO.pop("vim_type")
                vim_RO.pop("vim_user", None)
                vim_RO.pop("vim_password", None)
                if RO_sdn_id:
                    vim_RO["config"]["sdn-controller"] = RO_sdn_id
                # TODO make a deep update of sdn-port-mapping
                if vim_RO:
                    await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

                step = "Editing vim-account at RO tenant"
                vim_account_RO = {}
                if "config" in vim_content:
                    if "sdn-controller" in vim_content["config"]:
                        del vim_content["config"]["sdn-controller"]
                    if "sdn-port-mapping" in vim_content["config"]:
                        del vim_content["config"]["sdn-port-mapping"]
                    if not vim_content["config"]:
                        del vim_content["config"]
                if "vim_tenant_name" in vim_content:
                    vim_account_RO["vim_tenant_name"] = vim_content["vim_tenant_name"]
                if "vim_password" in vim_content:
                    vim_account_RO["vim_password"] = vim_content["vim_password"]
                    if vim_content.get("vim_password"):
                        vim_account_RO["vim_password"] = self.db.decrypt(vim_content["vim_password"],
                                                                         schema_version=schema_version,
                                                                         salt=vim_id)
                if "config" in vim_content:
                    vim_account_RO["config"] = vim_content["config"]
                    if vim_content.get("config"):
                        vim_config_encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
                            self.vim_config_encrypted.get("default")
                        for p in vim_config_encrypted_keys:
                            if vim_content["config"].get(p):
                                vim_account_RO["config"][p] = self.db.decrypt(vim_content["config"][p],
                                                                              schema_version=schema_version,
                                                                              salt=vim_id)

                if "vim_user" in vim_content:
                    # the new user must travel in the RO descriptor; storing it back into
                    # vim_content (as was done before) never reached RO
                    vim_account_RO["vim_username"] = vim_content["vim_user"]
                # vim_account must be edited always even if empty in order to ensure changes are translated to RO
                # vim_thread. RO will remove and relaunch a new thread for this vim_account
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
                db_vim_update["_admin.operationalState"] = "ENABLED"
                # Mark the VIM 'edit' HA task as successful
                operationState_HA = 'COMPLETED'
                detailed_status_HA = 'Done'

                self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'edit' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('vim', 'edit', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def delete(self, vim_content, order_id):
        """
        Detach and delete the VIM at RO (a 404 answer from RO is treated as already done)
        and then remove the account from the "vim_accounts" collection.
        :param vim_content: dictionary with "_id"; may contain 'op_id'
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Exceptions are logged and stored in the database, they are not propagated
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'delete', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_delete={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting vim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('vim', 'delete', op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Detaching vim from RO tenant"
                try:
                    await RO.detach("vim_account", RO_vim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                    else:
                        raise

                step = "Deleting vim from RO"
                try:
                    await RO.delete("vim", RO_vim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nothing to remove at RO")
            self.db.del_one("vim_accounts", {"_id": vim_id})
            # the entry no longer exists, so the final block must not try to update it
            db_vim = None
            self.logger.debug(logging_text + "Exit Ok")

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.register_HA('vim', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            try:
                if db_vim and db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # If the VIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # removed once, at the very end, as 'create' and 'edit' do
            self.lcm_tasks.remove("vim_account", vim_id, order_id)
class WimLcm(LcmBase):
    """
    Handles the 'create', 'edit' and 'delete' tasks of WIM accounts, replicating each
    operation at RO through ROclient and writing the outcome to the "wim_accounts" collection.
    """

    # values that are encrypted at wim config because they are passwords
    wim_config_encrypted = ()

    def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
        """
        Keep the collaborators needed by the tasks and initialise the LcmBase part.
        :param db: database connector; used here through get_one, del_one and decrypt
        :param msg: message connector, forwarded to LcmBase
        :param fs: filesystem storage connector, forwarded to LcmBase
        :param lcm_tasks: task registry; used for the HA lock/wait/register calls and to remove finished tasks
        :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
        :param loop: loop object handed to ROclient.ROClient (presumably the asyncio event loop)
        :return: None
        """

        # NOTE(review): the logger name is 'lcm.vim', the same one VimLcm uses; kept as found - confirm
        self.logger = logging.getLogger('lcm.vim')
        # every task builds its RO client with self.loop, so it must be stored here
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = ro_config

        super().__init__(db, msg, fs, self.logger)

    async def create(self, wim_content, order_id):
        """
        Create the WIM at RO and attach a wim_account to it.
        :param wim_content: dictionary with the WIM account; must contain "_id", "wim_type", "name", "user" and
            "password"; may contain 'op_id', "config" and "schema_version"
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Exceptions are logged and stored in the database, they are not propagated
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = wim_content.pop('op_id', None)
        # honour the lock result, as every other handler of this file does
        if not self.lcm_tasks.lock_HA('wim', 'create', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_create={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        # 'step' is assigned before the try so that the error report can always format it
        step = "Getting wim-id='{}' from db".format(wim_id)
        try:
            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            db_wim_update["_admin.deployed.RO"] = None

            step = "Creating wim at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # work on a copy: the account fields are still needed below for the wim_account
            wim_RO = deepcopy(wim_content)
            wim_RO.pop("_id", None)
            wim_RO.pop("_admin", None)
            schema_version = wim_RO.pop("schema_version", None)
            wim_RO.pop("schema_type", None)
            wim_RO.pop("wim_tenant_name", None)
            wim_RO["type"] = wim_RO.pop("wim_type")
            wim_RO.pop("wim_user", None)
            wim_RO.pop("wim_password", None)
            desc = await RO.create("wim", descriptor=wim_RO)
            RO_wim_id = desc["uuid"]
            db_wim_update["_admin.deployed.RO"] = RO_wim_id
            self.logger.debug(logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id))

            step = "Creating wim_account at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)

            if wim_content.get("wim_password"):
                wim_content["wim_password"] = self.db.decrypt(wim_content["wim_password"],
                                                              schema_version=schema_version,
                                                              salt=wim_id)
            # NOTE(review): "wim_password" is decrypted above but the descriptor sends "password"
            # untouched; kept as found - confirm which key the account really carries
            wim_account_RO = {"name": wim_content["name"],
                              "user": wim_content["user"],
                              "password": wim_content["password"]
                              }
            if wim_RO.get("config"):
                wim_account_RO["config"] = wim_RO["config"]
                if "wim_port_mapping" in wim_account_RO["config"]:
                    del wim_account_RO["config"]["wim_port_mapping"]
                for p in self.wim_config_encrypted:
                    if wim_account_RO["config"].get(p):
                        wim_account_RO["config"][p] = self.db.decrypt(wim_account_RO["config"][p],
                                                                      schema_version=schema_version,
                                                                      salt=wim_id)

            desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
            db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_wim_update["_admin.operationalState"] = "ENABLED"
            db_wim_update["_admin.detailed-status"] = "Done"
            # Mark the WIM 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok WIM account created at RO_wim_account_id={}".format(desc["uuid"]))

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('wim', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def edit(self, wim_content, order_id):
        """
        Edit at RO a WIM that was previously deployed there, and then its wim_account.
        Nothing is sent to RO when the stored account has no "_admin.deployed.RO" value.
        :param wim_content: dictionary with "_id" and the fields to change; may contain 'op_id'
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Exceptions are logged and stored in the database, they are not propagated
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('wim', 'edit', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_edit={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        exc = None
        RO_wim_id = None
        db_wim_update = {}
        step = "Getting wim-id='{}' from db".format(wim_id)
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('wim', 'edit', op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})

            if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):

                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                step = "Editing wim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                wim_RO = deepcopy(wim_content)
                wim_RO.pop("_id", None)
                wim_RO.pop("_admin", None)
                schema_version = wim_RO.pop("schema_version", None)
                wim_RO.pop("schema_type", None)
                wim_RO.pop("wim_tenant_name", None)
                if "wim_type" in wim_RO:
                    wim_RO["type"] = wim_RO.pop("wim_type")
                wim_RO.pop("wim_user", None)
                wim_RO.pop("wim_password", None)
                # TODO make a deep update of wim_port_mapping
                if wim_RO:
                    await RO.edit("wim", RO_wim_id, descriptor=wim_RO)

                step = "Editing wim-account at RO tenant"
                wim_account_RO = {}
                if "config" in wim_content:
                    if "wim_port_mapping" in wim_content["config"]:
                        del wim_content["config"]["wim_port_mapping"]
                    if not wim_content["config"]:
                        del wim_content["config"]
                if "wim_tenant_name" in wim_content:
                    wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
                if "wim_password" in wim_content:
                    wim_account_RO["wim_password"] = wim_content["wim_password"]
                    if wim_content.get("wim_password"):
                        wim_account_RO["wim_password"] = self.db.decrypt(wim_content["wim_password"],
                                                                         schema_version=schema_version,
                                                                         salt=wim_id)
                if "config" in wim_content:
                    wim_account_RO["config"] = wim_content["config"]
                    if wim_content.get("config"):
                        for p in self.wim_config_encrypted:
                            if wim_content["config"].get(p):
                                wim_account_RO["config"][p] = self.db.decrypt(wim_content["config"][p],
                                                                              schema_version=schema_version,
                                                                              salt=wim_id)

                if "wim_user" in wim_content:
                    # the new user must travel in the RO descriptor; storing it back into
                    # wim_content (as was done before) never reached RO
                    wim_account_RO["wim_username"] = wim_content["wim_user"]
                # wim_account must be edited always even if empty in order to ensure changes are translated to RO
                # wim_thread. RO will remove and relaunch a new thread for this wim_account
                await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
                db_wim_update["_admin.operationalState"] = "ENABLED"
                # Mark the WIM 'edit' HA task as successful
                operationState_HA = 'COMPLETED'
                detailed_status_HA = 'Done'

                self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'edit' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('wim', 'edit', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def delete(self, wim_content, order_id):
        """
        Detach and delete the WIM at RO (a 404 answer from RO is treated as already done)
        and then remove the account from the "wim_accounts" collection.
        :param wim_content: dictionary with "_id"; may contain 'op_id'
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Exceptions are logged and stored in the database, they are not propagated
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('wim', 'delete', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_delete={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        step = "Getting wim from db"
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('wim', 'delete', op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Detaching wim from RO tenant"
                try:
                    await RO.detach("wim_account", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_wim_id={} already detached".format(RO_wim_id))
                    else:
                        raise

                step = "Deleting wim from RO"
                try:
                    await RO.delete("wim", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_wim_id={} already deleted".format(RO_wim_id))
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nohing to remove at RO")
            self.db.del_one("wim_accounts", {"_id": wim_id})
            # the entry no longer exists, so the final block must not try to update it
            db_wim = None
            self.logger.debug(logging_text + "Exit Ok")

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.register_HA('wim', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            try:
                if db_wim and db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # If the WIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # removed once, at the very end, as 'create' and 'edit' do
            self.lcm_tasks.remove("wim_account", wim_id, order_id)
690 class SdnLcm(LcmBase
):
def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
    """
    Keep the collaborators needed by the tasks and initialise the LcmBase part.
    :param db: database connector, forwarded to LcmBase
    :param msg: message connector, forwarded to LcmBase
    :param fs: filesystem storage connector, forwarded to LcmBase
    :param lcm_tasks: task registry, stored as self.lcm_tasks
    :param ro_config: RO client configuration, stored as self.ro_config
    :param loop: loop object, stored as self.loop (presumably the asyncio event loop)
    :return: None
    """

    self.logger = logging.getLogger('lcm.sdn')
    # the tasks of this class read self.loop to build their RO client, so it must be stored here
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.ro_config = ro_config

    super().__init__(db, msg, fs, self.logger)
async def create(self, sdn_content, order_id):
    """
    Create the SDN controller at RO and store the RO identifier in the "sdns" collection.
    :param sdn_content: dictionary with the SDN controller; must contain "_id"; may contain 'op_id',
        "password" and "schema_version"
    :param order_id: identifier passed to lcm_tasks.remove when the task finishes
    :return: None. Exceptions are logged and stored in the database, they are not propagated
    """

    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'create' task here for related future HA operations
    op_id = sdn_content.pop('op_id', None)
    # honour the lock result, as the 'edit' and 'delete' handlers do
    if not self.lcm_tasks.lock_HA('sdn', 'create', op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    RO_sdn_id = None
    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    # 'step' is assigned before the try so that the error report can always format it
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        db_sdn_update["_admin.deployed.RO"] = None

        step = "Creating sdn at RO"
        db_sdn_update["_admin.detailed-status"] = step
        self.update_db_2("sdns", sdn_id, db_sdn_update)

        RO = ROclient.ROClient(self.loop, **self.ro_config)
        # work on a copy so that the fields removed for RO stay available in sdn_content
        sdn_RO = deepcopy(sdn_content)
        sdn_RO.pop("_id", None)
        sdn_RO.pop("_admin", None)
        schema_version = sdn_RO.pop("schema_version", None)
        sdn_RO.pop("schema_type", None)
        sdn_RO.pop("description", None)
        if sdn_RO.get("password"):
            sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)

        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn_update["_admin.deployed.RO"] = RO_sdn_id
        db_sdn_update["_admin.operationalState"] = "ENABLED"
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        # Mark the SDN 'create' HA task as successful
        operationState_HA = 'COMPLETED'
        detailed_status_HA = 'Done'

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn_update["_admin.operationalState"] = "ERROR"
            db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
            # Mark the SDN 'create' HA task as erroneous
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
        try:
            if db_sdn and db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # Register the SDN 'create' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.register_HA('sdn', 'create', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
async def edit(self, sdn_content, order_id):
    """
    Edit at RO an SDN controller that was previously deployed there.
    Nothing is sent to RO when the stored entry has no "_admin.deployed.RO" value.
    :param sdn_content: dictionary with "_id" and the fields to change; may contain 'op_id'
    :param order_id: identifier passed to lcm_tasks.remove when the task finishes
    :return: None. Exceptions are logged and stored in the database, they are not propagated
    """

    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, and the HA check always returns True
    op_id = sdn_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('sdn', 'edit', op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    exc = None
    operationState_HA = ''
    detailed_status_HA = ''
    step = "Getting sdn from db"
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('sdn', 'edit', op_id)

        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})

        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            schema_version = sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO.get("password"):
                sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn_update["_admin.operationalState"] = "ENABLED"
            # Mark the SDN 'edit' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # the error goes into the pending update (db_sdn_update); writing it into db_sdn,
            # the record just read, meant the ERROR state was never persisted
            db_sdn_update["_admin.operationalState"] = "ERROR"
            db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
            # Mark the SDN 'edit' HA task as erroneous
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
        try:
            if db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # Register the SDN 'edit' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.register_HA('sdn', 'edit', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
async def delete(self, sdn_content, order_id):
    """
    Delete an SDN controller: first at RO (only when the database entry holds a deployed RO id), then the
    entry of the "sdns" collection itself.
    :param sdn_content: dictionary with at least "_id"; an optional 'op_id' is popped and used for the HA calls
    :param order_id: identifier handed back to lcm_tasks.remove() once the task has finished
    :return: None. Errors are logged and, when the entry still exists, stored at the database entry
    """
    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, and the HA check always returns True
    op_id = sdn_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('sdn', 'delete', op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    exc = None
    step = "Getting sdn from db"
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('sdn', 'delete', op_id)

        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        # the entry is gone: make sure that the 'finally' clause does not try to update it
        db_sdn = None
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # The error state must go to 'db_sdn_update' (what is written back to the database).
            # Writing it into 'db_sdn', the document just read, left the update empty and the error unrecorded.
            db_sdn_update["_admin.operationalState"] = "ERROR"
            db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
            # Mark the SDN 'delete' HA task as erroneous
            operationState_HA = 'FAILED'
            detailed_status_HA = "ERROR {}: {}".format(step, exc)
            self.lcm_tasks.register_HA('sdn', 'delete', op_id,
                                       operationState=operationState_HA,
                                       detailed_status=detailed_status_HA)
        try:
            if db_sdn and db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # If the SDN 'delete' HA task was successful, the DB entry has been deleted,
            # which means that there is nowhere to register this task, so do nothing here.
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
class K8sClusterLcm(LcmBase):
    """LCM tasks for K8s clusters: set up and reset their helm-chart and juju-bundle environments."""

    def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database handler; also handed to both K8s connectors
        :param msg: messaging handler, passed to LcmBase
        :param fs: filesystem storage handler; also handed to both K8s connectors
        :param lcm_tasks: task registry used for the HA lock/register calls and for removing finished tasks
        :param vca_config: dictionary read for the keys 'kubectlpath', 'helmpath' and 'jujupath'
        :param loop: event loop, stored as self.loop
        :return: None
        """

        self.logger = logging.getLogger('lcm.k8scluster')
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.vca_config = vca_config
        self.fs = fs
        self.db = db

        self.helm_k8scluster = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None
        )

        self.juju_k8scluster = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None
        )

        super().__init__(db, msg, fs, self.logger)

    def _log_failed_tasks(self, logging_text, done):
        """Log the exception of every finished task that failed, so that failures are not silently dropped."""
        for task in done:
            if task.cancelled():
                continue
            task_exc = task.exception()
            if task_exc:
                self.logger.error(logging_text + "Task failed: {}".format(task_exc))

    async def create(self, k8scluster_content, order_id):
        """
        Initialize the helm-chart and the juju-bundle environments of a k8s cluster and add the known helm-chart
        repos to it. The cluster ends as "ENABLED" when at least one of both environments could be initialized.
        :param k8scluster_content: dictionary with at least "_id"; an optional 'op_id' is popped for the HA calls
        :param order_id: identifier handed back to lcm_tasks.remove() once the task has finished
        :return: None. The result is stored at the "k8sclusters" database entry
        """
        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = k8scluster_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('k8scluster', 'create', op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        k8scluster_content.pop("op_id", None)
        logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}

        exc = None
        # NOTE(review): on success the HA task is registered with these empty values, while the sdn 'edit'
        # handler of this file registers 'COMPLETED'/'Done' - confirm whether this is intended
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'],
                                           schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"])
            k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
            error_text_list = []

            # helm-chart
            k8s_hc_id = None
            try:
                k8s_hc_id, uninstall_sw = await self.helm_k8scluster.init_env(k8s_credentials)
                db_k8scluster_update["_admin.helm-chart.id"] = k8s_hc_id
                db_k8scluster_update["_admin.helm-chart.created"] = uninstall_sw
            except Exception as e:
                error_text_list.append("Failing init helm-chart: {}".format(e))
                db_k8scluster_update["_admin.helm-chart.error_msg"] = str(e)
                if isinstance(e, K8sException):
                    self.logger.error(logging_text + "Failing init helm-chart: {}".format(e))
                else:
                    # unexpected kind of error: log the traceback too
                    self.logger.error(logging_text + "Failing init helm-chart: {}".format(e), exc_info=True)

            # Juju/k8s cluster
            k8s_jb_id = None
            try:
                k8s_jb_id, uninstall_sw = await self.juju_k8scluster.init_env(k8s_credentials)
                db_k8scluster_update["_admin.juju-bundle.id"] = k8s_jb_id
                db_k8scluster_update["_admin.juju-bundle.created"] = uninstall_sw
            except Exception as e:
                error_text_list.append("Failing init juju-bundle: {}".format(e))
                db_k8scluster_update["_admin.juju-bundle.error_msg"] = str(e)
                if isinstance(e, N2VCException):
                    self.logger.error(logging_text + "Failing init juju-bundle: {}".format(e))
                else:
                    # unexpected kind of error: log the traceback too
                    self.logger.error(logging_text + "Failing init juju-bundle: {}".format(e), exc_info=True)

            step = "Getting the list of repos"
            if k8s_hc_id:
                self.logger.debug(logging_text + step)
                task_list = []
                db_k8srepo_list = self.db.get_list("k8srepos", {"type": "helm-chart"})
                for repo in db_k8srepo_list:
                    step = "Adding repo {} to cluster: {}".format(repo["name"], k8s_hc_id)
                    self.logger.debug(logging_text + step)
                    task = asyncio.ensure_future(self.helm_k8scluster.repo_add(cluster_uuid=k8s_hc_id,
                                                                               name=repo["name"], url=repo["url"],
                                                                               repo_type="chart"))
                    task_list.append(task)
                    repo_k8scluster_list = deep_get(repo, ("_admin", "cluster-inserted")) or []
                    repo_k8scluster_list.append(k8s_hc_id)
                    self.update_db_2("k8srepos", repo["_id"], {"_admin.cluster-inserted": repo_k8scluster_list})

                if task_list:
                    self.logger.debug(logging_text + 'Waiting for terminate tasks of repo_add')
                    done, pending = await asyncio.wait(task_list, timeout=3600)
                    # retrieve the task exceptions; otherwise a failing repo_add goes unnoticed
                    self._log_failed_tasks(logging_text, done)
                    if pending:
                        self.logger.error(logging_text + 'There are pending tasks: {}'.format(pending))

            # mark as an error if both helm-chart and juju-bundle have been failed
            if k8s_hc_id or k8s_jb_id:
                db_k8scluster_update["_admin.operationalState"] = "ENABLED"
            else:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = ";".join(error_text_list)

        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_k8scluster:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)

                # Mark the k8scluster 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

                # Register the K8scluster 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('k8scluster', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # same topic name as the one used at lock_HA()/register_HA()
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def delete(self, k8scluster_content, order_id):
        """
        Reset the helm-chart and juju-bundle environments of a k8s cluster, remove the cluster from the
        "cluster-inserted" list of the k8srepos that hold it and delete its "k8sclusters" database entry.
        :param k8scluster_content: dictionary with at least "_id"; an optional 'op_id' is popped for the HA calls
        :param order_id: identifier handed back to lcm_tasks.remove() once the task has finished
        :return: None. On error the state is stored at the "k8sclusters" database entry
        """
        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8scluster_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('k8scluster', 'delete', op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        k8scluster_content.pop("op_id", None)
        logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting k8scluster='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id"))
            k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))

            # Stays True when there is nothing to reset, so that a cluster where no environment
            # could be initialized can be deleted as well
            cluster_removed = True
            if k8s_hc_id:
                step = "Removing helm-chart '{}'".format(k8s_hc_id)
                uninstall_sw = deep_get(db_k8scluster, ("_admin", "helm-chart", "created")) or False
                helm_removed = await self.helm_k8scluster.reset(cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw)
                cluster_removed = bool(helm_removed)

            if k8s_jb_id:
                step = "Removing juju-bundle '{}'".format(k8s_jb_id)
                # use the flag stored by create() for the juju-bundle, not the helm-chart one
                uninstall_sw = deep_get(db_k8scluster, ("_admin", "juju-bundle", "created")) or False
                juju_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw)
                # a failed helm-chart reset must not be hidden by a successful juju-bundle reset
                cluster_removed = cluster_removed and bool(juju_removed)

            if not cluster_removed:
                raise LcmException("An error happened during the reset of the k8s cluster '{}'".format(k8scluster_id))

            if k8s_hc_id:
                step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
                self.logger.debug(logging_text + step)
                db_k8srepo_list = self.db.get_list("k8srepos", {"_admin.cluster-inserted": k8s_hc_id})
                for k8srepo in db_k8srepo_list:
                    # best effort: a repo that cannot be updated must not block the deletion of the cluster
                    try:
                        cluster_list = k8srepo["_admin"]["cluster-inserted"]
                        cluster_list.remove(k8s_hc_id)
                        self.update_db_2("k8srepos", k8srepo["_id"], {"_admin.cluster-inserted": cluster_list})
                    except Exception as e:
                        self.logger.error("{}: {}".format(step, e))

            step = "Removing k8scluster='{}' from db".format(k8scluster_id)
            self.db.del_one("k8sclusters", {"_id": k8scluster_id})

        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_k8scluster:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the k8scluster 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
                # Register the K8scluster 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('k8scluster', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # same topic name as the one used at lock_HA()/register_HA()
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
class K8sRepoLcm(LcmBase):
    """LCM tasks for K8s repos: add them to / remove them from the clusters that have a helm-chart id."""

    def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database handler; also handed to the helm connector
        :param msg: messaging handler, passed to LcmBase
        :param fs: filesystem storage handler; also handed to the helm connector
        :param lcm_tasks: task registry used for the HA lock/register calls and for removing finished tasks
        :param vca_config: dictionary read for the keys 'kubectlpath' and 'helmpath'
        :param loop: event loop, stored as self.loop
        :return: None
        """

        self.logger = logging.getLogger('lcm.k8srepo')
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.vca_config = vca_config
        self.fs = fs
        self.db = db

        self.k8srepo = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None
        )

        super().__init__(db, msg, fs, self.logger)

    def _log_failed_tasks(self, logging_text, done):
        """Log the exception of every finished task that failed, so that failures are not silently dropped."""
        for task in done:
            if task.cancelled():
                continue
            task_exc = task.exception()
            if task_exc:
                self.logger.error(logging_text + "Task failed: {}".format(task_exc))

    async def create(self, k8srepo_content, order_id):
        """
        Add a k8s repo to every k8s cluster of the database that has a helm-chart id and store the list of
        those clusters at "_admin.cluster-inserted" of the repo.
        :param k8srepo_content: dictionary with "_id"; an optional 'op_id' is popped and used for the HA calls
        :param order_id: identifier handed back to lcm_tasks.remove() once the task has finished
        :return: None. The result is stored at the "k8srepos" database entry
        """
        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations

        op_id = k8srepo_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('k8srepo', 'create', op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            step = "Getting k8scluster_list from db"
            self.logger.debug(logging_text + step)
            db_k8scluster_list = self.db.get_list("k8sclusters", {})
            db_k8srepo_update["_admin.cluster-inserted"] = []
            task_list = []
            for k8scluster in db_k8scluster_list:
                hc_id = deep_get(k8scluster, ("_admin", "helm-chart", "id"))
                if hc_id:
                    step = "Adding repo to cluster: {}".format(hc_id)
                    self.logger.debug(logging_text + step)
                    task = asyncio.ensure_future(self.k8srepo.repo_add(cluster_uuid=hc_id,
                                                                       name=db_k8srepo["name"], url=db_k8srepo["url"],
                                                                       repo_type="chart"))
                    task_list.append(task)
                    db_k8srepo_update["_admin.cluster-inserted"].append(hc_id)

            pending = None
            if task_list:
                self.logger.debug('Waiting for terminate pending tasks...')
                done, pending = await asyncio.wait(task_list, timeout=3600)
                # retrieve the task exceptions; otherwise a failing repo_add goes unnoticed
                self._log_failed_tasks(logging_text, done)
                if not pending:
                    self.logger.debug('All tasks finished...')
                else:
                    self.logger.info('There are pending tasks: {}'.format(pending))
            db_k8srepo_update["_admin.operationalState"] = "ENABLED"
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_k8srepo:
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the k8srepo 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('k8srepo', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)

    async def delete(self, k8srepo_content, order_id):
        """
        Remove a k8s repo from every k8s cluster of the database that has a helm-chart id and delete its
        "k8srepos" database entry.
        :param k8srepo_content: dictionary with "_id"; an optional 'op_id' is popped and used for the HA calls
        :param order_id: identifier handed back to lcm_tasks.remove() once the task has finished
        :return: None. On error the state is stored at the "k8srepos" database entry
        """
        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8srepo_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('k8srepo', 'delete', op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            step = "Getting k8scluster_list from db"
            self.logger.debug(logging_text + step)
            db_k8scluster_list = self.db.get_list("k8sclusters", {})

            task_list = []
            for k8scluster in db_k8scluster_list:
                hc_id = deep_get(k8scluster, ("_admin", "helm-chart", "id"))
                if hc_id:
                    task = asyncio.ensure_future(self.k8srepo.repo_remove(cluster_uuid=hc_id,
                                                                          name=db_k8srepo["name"]))
                    task_list.append(task)

            pending = None
            if task_list:
                self.logger.debug('Waiting for terminate pending tasks...')
                done, pending = await asyncio.wait(task_list, timeout=3600)
                # retrieve the task exceptions; otherwise a failing repo_remove goes unnoticed
                self._log_failed_tasks(logging_text, done)
                if not pending:
                    self.logger.debug('All tasks finished...')
                else:
                    self.logger.info('There are pending tasks: {}'.format(pending))
            self.db.del_one("k8srepos", {"_id": k8srepo_id})

        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_k8srepo:
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the k8srepo 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('k8srepo', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)