From: David Garcia Date: Mon, 6 Apr 2020 10:42:26 +0000 (+0200) Subject: Fix bug 1050: Make provisioner asynchronous X-Git-Tag: v7.1.0rc1~2 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=019363e955266e48f0bc8dd8a9beddedea4f584b;p=osm%2FN2VC.git Fix bug 1050: Make provisioner asynchronous Change-Id: I19a47abbea81deff64698a16715f5fd7c9e6e8e4 Signed-off-by: David Garcia --- diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py index 0696e20..0e10337 100644 --- a/n2vc/n2vc_juju_conn.py +++ b/n2vc/n2vc_juju_conn.py @@ -47,7 +47,7 @@ from n2vc.exceptions import ( from n2vc.juju_observer import JujuModelObserver from n2vc.n2vc_conn import N2VCConnector from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml -from n2vc.provisioner import SSHProvisioner +from n2vc.provisioner import AsyncSSHProvisioner class N2VCJujuConnector(N2VCConnector): @@ -1009,7 +1009,7 @@ class N2VCJujuConnector(N2VCConnector): # TODO check if machine is already provisioned machine_list = await model.get_machines() - provisioner = SSHProvisioner( + provisioner = AsyncSSHProvisioner( host=hostname, user=username, private_key_path=private_key_path, @@ -1018,7 +1018,7 @@ class N2VCJujuConnector(N2VCConnector): params = None try: - params = provisioner.provision_machine() + params = await provisioner.provision_machine() except Exception as ex: msg = "Exception provisioning machine: {}".format(ex) self.log.error(msg) @@ -1335,8 +1335,8 @@ class N2VCJujuConnector(N2VCConnector): if self.apt_mirror: config_dict["apt-mirror"] = self.apt_mirror if not self.enable_os_upgrade: - config_dict['enable-os-refresh-update'] = False - config_dict['enable-os-upgrade'] = False + config_dict["enable-os-refresh-update"] = False + config_dict["enable-os-upgrade"] = False if self.cloud in self.BUILT_IN_CLOUDS: model = await self.controller.add_model( model_name=model_name, @@ -1348,9 +1348,9 @@ class N2VCJujuConnector(N2VCConnector): model_name=model_name, config=config_dict, cloud_name=self.cloud, - credential_name="admin" + credential_name="admin", ) - self.log.info('New model created, name={}'.format(model_name)) + self.log.info("New model created, name={}".format(model_name)) else: self.log.debug( "Model already exists in juju. Getting model {}".format(model_name) diff --git a/n2vc/provisioner.py b/n2vc/provisioner.py index 5107242..a2fe13e 100644 --- a/n2vc/provisioner.py +++ b/n2vc/provisioner.py @@ -23,7 +23,7 @@ import uuid from juju.client import client import n2vc.exceptions import paramiko - +import asyncio arches = [ [re.compile(r"amd64|x86_64"), "amd64"], @@ -79,6 +79,272 @@ netfilter-persistent save """ +class AsyncSSHProvisioner: + """Provision a manually created machine via SSH.""" + + user = "" + host = "" + private_key_path = "" + + def __init__(self, user, host, private_key_path, log=None): + self.host = host + self.user = user + self.private_key_path = private_key_path + self.log = log if log else logging.getLogger(__name__) + + async def _scp(self, source_file, destination_file): + """Execute an scp command. Requires a fully qualified source and + destination. + + :param str source_file: Path to the source file + :param str destination_file: Path to the destination file + """ + cmd = [ + "scp", + "-i", + os.path.expanduser(self.private_key_path), + "-o", + "StrictHostKeyChecking=no", + "-q", + "-B", + ] + destination = "{}@{}:{}".format(self.user, self.host, destination_file) + cmd.extend([source_file, destination]) + process = await asyncio.create_subprocess_exec(*cmd) + await process.wait() + if process.returncode != 0: + raise CalledProcessError(returncode=process.returncode, cmd=cmd) + + async def _ssh(self, command): + """Run a command remotely via SSH. + + :param str command: The command to execute + :return: tuple: The stdout and stderr of the command execution + :raises: :class:`CalledProcessError` if the command fails + """ + + destination = "{}@{}".format(self.user, self.host) + cmd = [ + "ssh", + "-i", + os.path.expanduser(self.private_key_path), + "-o", + "StrictHostKeyChecking=no", + "-q", + destination, + ] + cmd.extend([command]) + process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + if process.returncode != 0: + output = stderr.decode("utf-8").strip() + raise CalledProcessError( + returncode=process.returncode, cmd=cmd, output=output + ) + return (stdout.decode("utf-8").strip(), stderr.decode("utf-8").strip()) + + async def _init_ubuntu_user(self): + """Initialize the ubuntu user. + + :return: bool: If the initialization was successful + :raises: :class:`CalledProcessError` if the _ssh command fails + """ + retry = 10 + attempts = 0 + delay = 15 + while attempts <= retry: + try: + attempts += 1 + # Attempt to establish a SSH connection + stdout, stderr = await self._ssh("sudo -n true") + break + except CalledProcessError as e: + self.log.debug( + "Waiting for VM to boot, sleeping {} seconds".format(delay) + ) + if attempts > retry: + raise e + else: + await asyncio.sleep(delay) + # Slowly back off the retry + delay += 15 + + # Infer the public key + public_key = None + public_key_path = "{}.pub".format(self.private_key_path) + + if not os.path.exists(public_key_path): + raise FileNotFoundError( + "Public key '{}' doesn't exist.".format(public_key_path) + ) + + with open(public_key_path, "r") as f: + public_key = f.readline() + + script = INITIALIZE_UBUNTU_SCRIPT.format(public_key) + + stdout, stderr = await self._run_configure_script(script) + + return True + + async def _detect_hardware_and_os(self): + """Detect the target hardware capabilities and OS series. + + :return: str: A raw string containing OS and hardware information. + """ + + info = { + "series": "", + "arch": "", + "cpu-cores": "", + "mem": "", + } + + stdout, stderr = await self._run_configure_script(DETECTION_SCRIPT) + + lines = stdout.split("\n") + info["series"] = lines[0].strip() + info["arch"] = normalize_arch(lines[1].strip()) + + memKb = re.split(r"\s+", lines[2])[1] + + # Convert megabytes -> kilobytes + info["mem"] = round(int(memKb) / 1024) + + # Detect available CPUs + recorded = {} + for line in lines[3:]: + physical_id = "" + print(line) + + if line.find("physical id") == 0: + physical_id = line.split(":")[1].strip() + elif line.find("cpu cores") == 0: + cores = line.split(":")[1].strip() + + if physical_id not in recorded.keys(): + info["cpu-cores"] += cores + recorded[physical_id] = True + + return info + + async def provision_machine(self): + """Perform the initial provisioning of the target machine. + + :return: bool: The client.AddMachineParams + """ + params = client.AddMachineParams() + + if await self._init_ubuntu_user(): + hw = await self._detect_hardware_and_os() + params.series = hw["series"] + params.instance_id = "manual:{}".format(self.host) + params.nonce = "manual:{}:{}".format( + self.host, str(uuid.uuid4()), # a nop for Juju w/manual machines + ) + params.hardware_characteristics = { + "arch": hw["arch"], + "mem": int(hw["mem"]), + "cpu-cores": int(hw["cpu-cores"]), + } + params.addresses = [{"value": self.host, "type": "ipv4", "scope": "public"}] + + return params + + async def install_agent(self, connection, nonce, machine_id, api): + """ + :param object connection: Connection to Juju API + :param str nonce: The nonce machine specification + :param str machine_id: The id assigned to the machine + :param str api: IP of the API_PROXY + + :return: bool: If the initialization was successful + """ + # The path where the Juju agent should be installed. + data_dir = "/var/lib/juju" + + # Disabling this prevents `apt-get update` from running initially, so + # charms will fail to deploy + disable_package_commands = False + + client_facade = client.ClientFacade.from_connection(connection) + results = await client_facade.ProvisioningScript( + data_dir=data_dir, + disable_package_commands=disable_package_commands, + machine_id=machine_id, + nonce=nonce, + ) + + """Get the IP of the controller + + Parse the provisioning script, looking for the first apiaddress. + + Example: + apiaddresses: + - 10.195.8.2:17070 + - 127.0.0.1:17070 + - '[::1]:17070' + """ + m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script) + apiaddress = m.group(1) + + """Add IP Table rule + + In order to route the traffic to the private ip of the Juju controller + we use a DNAT rule to tell the machine that the destination for the + private address is the public address of the machine where the Juju + controller is running in LXD. That machine will have a complimentary + iptables rule, routing traffic to the appropriate LXD container. + """ + + script = IPTABLES_SCRIPT.format(apiaddress, api) + + # Run this in a retry loop, because dpkg may be running and cause the + # script to fail. + retry = 10 + attempts = 0 + delay = 15 + + while attempts <= retry: + try: + attempts += 1 + stdout, stderr = await self._run_configure_script(script) + break + except Exception as e: + self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay)) + if attempts > retry: + raise e + else: + await asyncio.sleep(delay) + # Slowly back off the retry + delay += 15 + + # self.log.debug("Running configure script") + await self._run_configure_script(results.script) + # self.log.debug("Configure script finished") + + async def _run_configure_script(self, script, root=True): + """Run the script to install the Juju agent on the target machine. + + :param str script: The script to be executed + """ + _, tmpFile = tempfile.mkstemp() + with open(tmpFile, "w") as f: + f.write(script) + f.close() + + # copy the local copy of the script to the remote machine + await self._scp(tmpFile, tmpFile) + + # run the provisioning script + return await self._ssh( + "{} /bin/bash {}".format("sudo" if root else "", tmpFile) + ) + + class SSHProvisioner: """Provision a manually created machine via SSH."""