return return_data, None, True
+ def get_action_viminfo(
+ self, session, indata, version, nsr_id, action_id, *args, **kwargs
+ ):
+ self.logger.debug(
+ "ns.action_viminfo version={} nsr_id={}, action_id={} indata={}".format(
+ version, nsr_id, action_id, indata
+ )
+ )
+
+ ro_tasks = self.db.get_list("ro_tasks", {"tasks.action_id": action_id})
+ vim_info_list = []
+ for task in ro_tasks:
+ if task.get("vim_info"):
+ vim_info_list.append(task.get("vim_info"))
+
+ data = {
+ "nrs_id": nsr_id,
+ "action_id": action_id,
+ "vim_info_list": vim_info_list,
+ }
+ return data, None, True
+
def recreate_status(
self, session, indata, version, nsr_id, action_id, *args, **kwargs
):
)
raise NsException(e)
+ def get_console_task(
+ self,
+ vdu_id,
+ vnf_id,
+ vdu_index,
+ action_id,
+ nsr_id,
+ task_index,
+ target_vim,
+ extra_dict,
+ ):
+ self._assign_vim(target_vim)
+ target_record = "vnfrs:{}:vdur.{}.vim_info.{}".format(
+ vnf_id, vdu_index, target_vim
+ )
+ target_record_id = "vnfrs:{}:vdur.{}".format(vnf_id, vdu_id)
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ task = Ns._create_task(
+ deployment_info=deployment_info,
+ target_id=target_vim,
+ item="console",
+ action="EXEC",
+ target_record=target_record,
+ target_record_id=target_record_id,
+ extra_dict=extra_dict,
+ )
+ return task
+
+ def prepare_get_console(
+ self, session, action_dict, version, nsr_id, *args, **kwargs
+ ):
+ self.logger.debug(
+ f"prepare_get_console enter, session: {session}, "
+ f"action_dict: {action_dict}, nsr_id: {nsr_id}"
+ )
+ task_index = 0
+ extra_dict = {}
+ now = time()
+ action_id = action_dict.get("action_id", str(uuid4()))
+ step = ""
+ logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
+ self.logger.debug(logging_text + "Enter")
+
+ action = list(action_dict.keys())[0]
+ task_dict = action_dict.get(action)
+ vim_vm_id = action_dict.get(action).get("vim_vm_id")
+
+ db_new_tasks = []
+ try:
+ step = "lock the operation & do task creation"
+ with self.write_lock:
+ extra_dict["params"] = {
+ "vim_vm_id": vim_vm_id,
+ "action": action,
+ }
+ task = self.get_console_task(
+ task_dict["vdu_id"],
+ task_dict["vnf_id"],
+ task_dict["vdu_index"],
+ action_id,
+ nsr_id,
+ task_index,
+ task_dict["target_vim"],
+ extra_dict,
+ )
+ db_new_tasks.append(task)
+ step = "upload Task to db"
+ self.upload_all_tasks(
+ db_new_tasks=db_new_tasks,
+ now=now,
+ )
+ self.logger.debug(
+ logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
+ )
+ return (
+ {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
+ action_id,
+ True,
+ )
+ except Exception as e:
+ if isinstance(e, (DbException, NsException)):
+ self.logger.error(
+ logging_text + "Exit Exception while '{}': {}".format(step, e)
+ )
+ else:
+ e = traceback_format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception while '{}': {}".format(step, e),
+ exc_info=True,
+ )
+ raise NsException(e)
+
def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
nsrs = self.db.get_list("nsrs", {})
return_data = []
return "FAILED", ro_vim_item_update, db_task_update
+class VimInteractionConsoleVdu(VimInteractionBase):
+ def exec(self, ro_task, task_index, task_depends):
+ self.logger.debug("Execute getconsole")
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ db_task_update = {"retries": 0}
+ target_vim = self.my_vims[ro_task["target_id"]]
+
+ self.logger.debug(f"Execute getconsole task: {task}")
+ try:
+ vim_vm_id = ""
+ if task.get("params"):
+ vim_vm_id = task["params"].get("vim_vm_id")
+ console_data = target_vim.get_vminstance_console(vim_vm_id)
+ self.logger.debug(f"Execute getconsole task result: {console_data}")
+ ro_vim_item_update = {"vim_id": vim_vm_id, "vim_console_data": console_data}
+ self.logger.debug(
+ "task={} {} getconsole done".format(task_id, ro_task["target_id"])
+ )
+ return "DONE", ro_vim_item_update, db_task_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.error(
+ "task={} vim={} VM Migration:"
+ " {}".format(task_id, ro_task["target_id"], e)
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "vim_message": str(e),
+ }
+
+ return "FAILED", ro_vim_item_update, db_task_update
+
+
class VimInteractionSdnNet(VimInteractionBase):
@staticmethod
def _match_pci(port_pci, mapping):
"update": VimInteractionUpdateVdu(
self.db, self.my_vims, self.db_vims, self.logger
),
+ "console": VimInteractionConsoleVdu(
+ self.db, self.my_vims, self.db_vims, self.logger
+ ),
"affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
self.db, self.my_vims, self.db_vims, self.logger
),
/<nsrs_id> O O O
/<action_id> O
/cancel O
-
"""
valid_query_string = ("ADMIN", "SET_PROJECT", "FORCE", "PUBLIC")
"ROLE_PERMISSION": "stop:id:",
},
},
+ "console": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "console:",
+ "<ID>": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "console:id:",
+ },
+ },
"deploy": {
"METHODS": ("GET",),
"ROLE_PERMISSION": "deploy:",
"METHODS": ("POST",),
"ROLE_PERMISSION": "deploy:id:id:cancel",
},
+ "viminfo": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "deploy:id:id:viminfo:",
+ },
},
},
},
"deploy:id:post": self.ns.deploy,
"deploy:id:delete": self.ns.delete,
"deploy:id:id:get": self.ns.status,
+ "deploy:id:id:viminfo:get": self.ns.get_action_viminfo,
"deploy:id:id:cancel:post": self.ns.cancel,
"rebuild:id:post": self.ns.rebuild_start_stop,
"start:id:post": self.ns.rebuild_start_stop,
"stop:id:post": self.ns.rebuild_start_stop,
+ "console:id:post": self.ns.prepare_get_console,
"recreate:id:post": self.ns.recreate,
"recreate:id:id:get": self.ns.recreate_status,
"migrate:id:post": self.ns.migrate,
Utility class with helper methods to deal with vcenter
"""
import logging
+from queue import Empty, Queue
+import ssl
+import threading
import time
from osm_ro_plugin import vimconn
+from pyVim.connect import Disconnect, SmartConnect
from pyVmomi import vim
import requests
)
else:
self.logger.debug("ISO File updated successfully")
+
+
+class VCenterSessionPool:
+ """
+ Utility class to manage sessions using a pool
+ """
+
+ def __init__(
+ self,
+ host,
+ user,
+ password,
+ port=443,
+ pool_size=5,
+ ssl_context=None,
+ log_level=None,
+ ):
+ self._host = host
+ self._user = user
+ self._password = password
+ self._port = port
+ self._max_pool_size = pool_size
+ self._ssl_context = ssl_context
+ if not self._ssl_context:
+ self._ssl_context = ssl._create_unverified_context()
+
+ self.pool = Queue(maxsize=pool_size) # Limit the queue size
+ self.lock = threading.Lock()
+ self.live_sessions = 0
+
+ self.logger = logging.getLogger("ro.vim.vcenter.util")
+ if log_level:
+ self.logger.setLevel(getattr(logging, log_level))
+
+ def _connect(self):
+ try:
+ si = SmartConnect(
+ host=self._host,
+ user=self._user,
+ pwd=self._password,
+ port=self._port,
+ sslContext=self._ssl_context,
+ )
+ self.logger.debug("Created a new vCenter session")
+ return si
+ except vim.fault.InvalidLogin as e:
+ raise vimconn.VimConnAuthException(
+ f"Invalid login accesing vcenter: {str(e)}"
+ )
+ except Exception as e:
+ raise vimconn.VimConnConnectionException(
+ f"Invalid login accesing vcenter: {str(e)}"
+ )
+
+ def _is_session_alive(self, si):
+ if si is None:
+ return False
+ try:
+ alive = si.content.sessionManager.currentSession is not None
+ return alive
+ except Exception as e:
+ self.logger.info(f"Session check failed: {e}, must recreate session")
+ return False
+
+ def get_session(self, timeout=5):
+ try:
+ si = self.pool.get_nowait()
+ self.logger.debug("Reusing session from pool.")
+ except Empty:
+ with self.lock:
+ if self.live_sessions < self._max_pool_size:
+ si = self._connect()
+ self.live_sessions += 1
+ self.logger.debug(f"Live sessions count: {self.live_sessions}")
+ else:
+ self.logger.info(
+ "Pool is full. Waiting for an available session..."
+ )
+ si = self.pool.get(timeout=timeout)
+
+ if not self._is_session_alive(si):
+ self.logger.warning("Dead session detected. Replacing...")
+ try:
+ Disconnect(si)
+ except Exception as e:
+ self.logger.debug(f"Error during disconnect: {e}")
+ with self.lock:
+ self.live_sessions -= 1
+ self.logger.debug(f"Live sessions count: {self.live_sessions}")
+ return self.get_session(timeout=timeout)
+
+ return si
+
+ def return_session(self, si):
+ if self._is_session_alive(si):
+ self.logger.debug("Returning session to pool.")
+ self.pool.put(si)
+ else:
+ self.logger.debug(
+ "Session is dead on return. Dropping and decrementing count."
+ )
+ try:
+ Disconnect(si)
+ except Exception as e:
+ self.logger.debug(f"Error during disconnect: {e}")
+ with self.lock:
+ self.live_sessions -= 1
+ self.logger.info(f"Live sessions count: {self.live_sessions}")
+
+ def close_all(self):
+ self.logger.info("Closing all sessions in pool...")
+ while not self.pool.empty():
+ si = self.pool.get_nowait()
+ try:
+ Disconnect(si)
+ self.logger.debug("Session disconnected.")
+ except Exception as e:
+ self.logger.warning(f"Error closing session: {e}")
+ with self.lock:
+ self.live_sessions = 0
+ self.logger.info("All sessions closed. Pool is clean.")
self.logger.warning("WARN : Instance is not in Active state")
return None
+ def get_vm_clone_session_ticket(self, session, vm):
+ """
+ Obtain a clone session ticket for the indicated vm
+ """
+ ticket = session.content.sessionManager.AcquireCloneTicket()
+ return ticket
+
def unattach_volumes(self, session, vm, volumes):
"""
Unattach the indicated volumes, volumes includes the volume_path quoted
from osm_rovim_vcenter.vcenter_ipmanager import VCenterIpManager\r
from osm_rovim_vcenter.vcenter_network import VCenterNetworkUtil\r
from osm_rovim_vcenter.vcenter_util import VCenterFileUploader\r
+from osm_rovim_vcenter.vcenter_util import VCenterSessionPool\r
from osm_rovim_vcenter.vcenter_vms import VCenterVmsOps\r
from osm_rovim_vcenter.vcenter_vms import VCenterVmsUtil\r
from osm_rovim_vcenter.vim_helper import CloudInitHelper\r
-from pyVim.connect import Disconnect, SmartConnect\r
from pyVmomi import vim\r
import yaml\r
\r
nsx_verify_ssl=self.nsx_verify_ssl,\r
dhcp_configure_always=self.dhcp_configure_always,\r
)\r
+ self.vc_session_pool = VCenterSessionPool(\r
+ self.vcenter_hostname,\r
+ self.user,\r
+ self.passwd,\r
+ self.vcenter_port,\r
+ ssl_context=self.ssl_context,\r
+ log_level=log_level,\r
+ )\r
\r
def check_vim_connectivity(self):\r
self.logger.debug("Check vim connectivity")\r
finally:\r
self._disconnect_si(session)\r
\r
- def get_vminstance_console(self, vm_id, console_type="vnc"):\r
+ @handle_connector_exceptions\r
+ def get_vminstance_console(self, vm_id, console_type="vmrc"):\r
"""\r
Get a console for the virtual machine\r
Params:\r
self.logger.debug(\r
"Get vm instance console, vm_id: %s, console_type: %s", vm_id, console_type\r
)\r
- raise vimconn.VimConnNotImplemented(\r
- "get instance console is not supported in vcenter"\r
- )\r
+ # Check allowed consolo type\r
+ console_types = "vmrc"\r
+ if console_type not in console_types:\r
+ raise vimconn.VimConnException(\r
+ "console type '{}' not allowed".format(console_type),\r
+ http_code=vimconn.HTTP_Bad_Request,\r
+ )\r
+ VMRC_URL_FORMAT = "vmrc://clone:{ticket}@{vcenter_host}/?moid={vm_moid}"\r
+\r
+ session = self._get_vcenter_instance()\r
+ try:\r
+ # Get vm\r
+ vm = self.vcvms_util.get_vm_by_uuid(session, vm_id)\r
+\r
+ # Get session ticket\r
+ ticket = self.vcvms_util.get_vm_clone_session_ticket(session, vm)\r
+\r
+ # Build the URL\r
+ console_url = VMRC_URL_FORMAT.format(\r
+ ticket=ticket, vcenter_host=self.vcenter_hostname, vm_moid=vm._moId\r
+ )\r
+\r
+ console_dict = {"console_type": console_type, "url": console_url}\r
+ self.logger.debug("Obtained console_dict: %s", console_dict)\r
+ return console_dict\r
+ finally:\r
+ self._disconnect_si(session)\r
\r
@handle_connector_exceptions\r
def new_network(\r
self.vcenter_port,\r
self.user,\r
)\r
- si = SmartConnect(\r
- host=self.vcenter_hostname,\r
- user=self.user,\r
- pwd=self.passwd,\r
- port=self.vcenter_port,\r
- sslContext=self.ssl_context,\r
- )\r
- return si\r
+ return self.vc_session_pool.get_session()\r
\r
def _disconnect_si(self, server_instance):\r
- Disconnect(server_instance)\r
+ self.logger.debug("Disconnect session")\r
+ self.vc_session_pool.return_session(server_instance)\r
\r
def _get_vcenter_content(self, server_instance):\r
return server_instance.RetrieveContent()\r