Fix bug 1050: Make provisioner asynchronous (change 59/8759/21)
author David Garcia <david.garcia@canonical.com>
Mon, 6 Apr 2020 10:42:26 +0000 (12:42 +0200)
committer garciadav <david.garcia@canonical.com>
Tue, 5 May 2020 19:46:36 +0000 (21:46 +0200)
Change-Id: I19a47abbea81deff64698a16715f5fd7c9e6e8e4
Signed-off-by: David Garcia <david.garcia@canonical.com>
n2vc/n2vc_juju_conn.py
n2vc/provisioner.py

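For context, here is a minimal sketch of how the now-asynchronous provisioner is driven from an event loop. The standalone provision() wrapper, the asyncio.run() call and the example host values are illustrative assumptions only; inside N2VCJujuConnector the coroutine is simply awaited, as the diff below shows.

    import asyncio

    from n2vc.provisioner import AsyncSSHProvisioner

    async def provision(hostname, username, private_key_path):
        # The provisioner methods are now coroutines, so they must be
        # awaited from a running event loop instead of called directly.
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
        )
        return await provisioner.provision_machine()

    # Hypothetical driver with example values; N2VC normally runs this
    # inside its own event loop.
    params = asyncio.run(provision("10.0.0.10", "ubuntu", "~/.ssh/id_rsa"))
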
diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py
index 0696e20..0e10337 100644
@@ -47,7 +47,7 @@ from n2vc.exceptions import (
 from n2vc.juju_observer import JujuModelObserver
 from n2vc.n2vc_conn import N2VCConnector
 from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
-from n2vc.provisioner import SSHProvisioner
+from n2vc.provisioner import AsyncSSHProvisioner
 
 
 class N2VCJujuConnector(N2VCConnector):
@@ -1009,7 +1009,7 @@ class N2VCJujuConnector(N2VCConnector):
         # TODO check if machine is already provisioned
         machine_list = await model.get_machines()
 
-        provisioner = SSHProvisioner(
+        provisioner = AsyncSSHProvisioner(
             host=hostname,
             user=username,
             private_key_path=private_key_path,
@@ -1018,7 +1018,7 @@ class N2VCJujuConnector(N2VCConnector):
 
         params = None
         try:
-            params = provisioner.provision_machine()
+            params = await provisioner.provision_machine()
         except Exception as ex:
             msg = "Exception provisioning machine: {}".format(ex)
             self.log.error(msg)
@@ -1335,8 +1335,8 @@ class N2VCJujuConnector(N2VCConnector):
                 if self.apt_mirror:
                     config_dict["apt-mirror"] = self.apt_mirror
                 if not self.enable_os_upgrade:
-                    config_dict['enable-os-refresh-update'] = False
-                    config_dict['enable-os-upgrade'] = False
+                    config_dict["enable-os-refresh-update"] = False
+                    config_dict["enable-os-upgrade"] = False
                 if self.cloud in self.BUILT_IN_CLOUDS:
                     model = await self.controller.add_model(
                         model_name=model_name,
@@ -1348,9 +1348,9 @@ class N2VCJujuConnector(N2VCConnector):
                         model_name=model_name,
                         config=config_dict,
                         cloud_name=self.cloud,
-                        credential_name="admin"
+                        credential_name="admin",
                     )
-                self.log.info('New model created, name={}'.format(model_name))
+                self.log.info("New model created, name={}".format(model_name))
             else:
                 self.log.debug(
                     "Model already exists in juju. Getting model {}".format(model_name)
diff --git a/n2vc/provisioner.py b/n2vc/provisioner.py
index 5107242..a2fe13e 100644
@@ -23,7 +23,7 @@ import uuid
 from juju.client import client
 import n2vc.exceptions
 import paramiko
-
+import asyncio
 
 arches = [
     [re.compile(r"amd64|x86_64"), "amd64"],
@@ -79,6 +79,272 @@ netfilter-persistent save
 """
 
 
+class AsyncSSHProvisioner:
+    """Asynchronously provision a manually created machine via SSH."""
+
+    user = ""
+    host = ""
+    private_key_path = ""
+
+    def __init__(self, user, host, private_key_path, log=None):
+        self.host = host
+        self.user = user
+        self.private_key_path = private_key_path
+        self.log = log if log else logging.getLogger(__name__)
+
+    async def _scp(self, source_file, destination_file):
+        """Execute an scp command. Requires a fully qualified source and
+        destination.
+
+        :param str source_file: Path to the source file
+        :param str destination_file: Path to the destination file
+        """
+        cmd = [
+            "scp",
+            "-i",
+            os.path.expanduser(self.private_key_path),
+            "-o",
+            "StrictHostKeyChecking=no",
+            "-q",
+            "-B",
+        ]
+        destination = "{}@{}:{}".format(self.user, self.host, destination_file)
+        cmd.extend([source_file, destination])
+        process = await asyncio.create_subprocess_exec(*cmd)
+        await process.wait()
+        if process.returncode != 0:
+            raise CalledProcessError(returncode=process.returncode, cmd=cmd)
+
+    async def _ssh(self, command):
+        """Run a command remotely via SSH.
+
+        :param str command: The command to execute
+        :return: tuple: The stdout and stderr of the command execution
+        :raises: :class:`CalledProcessError` if the command fails
+        """
+
+        destination = "{}@{}".format(self.user, self.host)
+        cmd = [
+            "ssh",
+            "-i",
+            os.path.expanduser(self.private_key_path),
+            "-o",
+            "StrictHostKeyChecking=no",
+            "-q",
+            destination,
+        ]
+        cmd.extend([command])
+        process = await asyncio.create_subprocess_exec(
+            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+        )
+        stdout, stderr = await process.communicate()
+
+        if process.returncode != 0:
+            output = stderr.decode("utf-8").strip()
+            raise CalledProcessError(
+                returncode=process.returncode, cmd=cmd, output=output
+            )
+        return (stdout.decode("utf-8").strip(), stderr.decode("utf-8").strip())
+
+    async def _init_ubuntu_user(self):
+        """Initialize the ubuntu user.
+
+        :return: bool: Whether the initialization was successful
+        :raises: :class:`CalledProcessError` if the _ssh command fails
+        """
+        retry = 10
+        attempts = 0
+        delay = 15
+        while attempts <= retry:
+            try:
+                attempts += 1
+                # Attempt to establish an SSH connection
+                stdout, stderr = await self._ssh("sudo -n true")
+                break
+            except CalledProcessError as e:
+                self.log.debug(
+                    "Waiting for VM to boot, sleeping {} seconds".format(delay)
+                )
+                if attempts > retry:
+                    raise e
+                else:
+                    await asyncio.sleep(delay)
+                    # Slowly back off the retry
+                    delay += 15
+
+        # Infer the public key
+        public_key = None
+        public_key_path = "{}.pub".format(self.private_key_path)
+
+        if not os.path.exists(public_key_path):
+            raise FileNotFoundError(
+                "Public key '{}' doesn't exist.".format(public_key_path)
+            )
+
+        with open(public_key_path, "r") as f:
+            public_key = f.readline()
+
+        script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)
+
+        stdout, stderr = await self._run_configure_script(script)
+
+        return True
+
+    async def _detect_hardware_and_os(self):
+        """Detect the target hardware capabilities and OS series.
+
+        :return: dict: The detected OS series, architecture, memory (MB) and CPU core count.
+        """
+
+        info = {
+            "series": "",
+            "arch": "",
+            "cpu-cores": "",
+            "mem": "",
+        }
+
+        stdout, stderr = await self._run_configure_script(DETECTION_SCRIPT)
+
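+        # For illustration, the output parsed below is expected to look
+        # roughly like this (the exact content depends on DETECTION_SCRIPT,
+        # defined earlier in this module):
+        #   bionic
+        #   x86_64
+        #   MemTotal:       16314356 kB
+        #   ...contents of /proc/cpuinfo ("physical id", "cpu cores", ...)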
+        lines = stdout.split("\n")
+        info["series"] = lines[0].strip()
+        info["arch"] = normalize_arch(lines[1].strip())
+
+        memKb = re.split(r"\s+", lines[2])[1]
+
+        # Convert kilobytes -> megabytes
+        info["mem"] = round(int(memKb) / 1024)
+
+        # Detect available physical CPU cores, counting each physical id
+        # only once so that hyperthreaded siblings are not double-counted
+        recorded = {}
+        physical_id = ""
+        cpu_cores = 0
+
+        for line in lines[3:]:
+            if line.find("physical id") == 0:
+                physical_id = line.split(":")[1].strip()
+            elif line.find("cpu cores") == 0:
+                cores = line.split(":")[1].strip()
+
+                if physical_id not in recorded.keys():
+                    cpu_cores += int(cores)
+                    recorded[physical_id] = True
+
+        info["cpu-cores"] = cpu_cores
+
+        return info
+
+    async def provision_machine(self):
+        """Perform the initial provisioning of the target machine.
+
+        :return: client.AddMachineParams: The parameters used to add the machine to the model
+        """
+        params = client.AddMachineParams()
+
+        if await self._init_ubuntu_user():
+            hw = await self._detect_hardware_and_os()
+            params.series = hw["series"]
+            params.instance_id = "manual:{}".format(self.host)
+            params.nonce = "manual:{}:{}".format(
+                self.host, str(uuid.uuid4()),  # a nop for Juju w/manual machines
+            )
+            params.hardware_characteristics = {
+                "arch": hw["arch"],
+                "mem": int(hw["mem"]),
+                "cpu-cores": int(hw["cpu-cores"]),
+            }
+            params.addresses = [{"value": self.host, "type": "ipv4", "scope": "public"}]
+
+        return params
+
+    async def install_agent(self, connection, nonce, machine_id, api):
+        """Install the Juju agent on the target machine.
+
+        :param object connection: Connection to Juju API
+        :param str nonce: The nonce machine specification
+        :param str machine_id: The id assigned to the machine
+        :param str api: IP of the API_PROXY
+
+        :raises: :class:`CalledProcessError` if the configuration scripts fail
+        """
+        # The path where the Juju agent should be installed.
+        data_dir = "/var/lib/juju"
+
+        # Leave package commands enabled; disabling them would prevent
+        # `apt-get update` from running initially, and charms would then
+        # fail to deploy
+        disable_package_commands = False
+
+        client_facade = client.ClientFacade.from_connection(connection)
+        results = await client_facade.ProvisioningScript(
+            data_dir=data_dir,
+            disable_package_commands=disable_package_commands,
+            machine_id=machine_id,
+            nonce=nonce,
+        )
+
+        """Get the IP of the controller
+
+        Parse the provisioning script, looking for the first apiaddress.
+
+        Example:
+            apiaddresses:
+            - 10.195.8.2:17070
+            - 127.0.0.1:17070
+            - '[::1]:17070'
+        """
+        m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
+        apiaddress = m.group(1)
+
+        """Add an iptables DNAT rule
+
+        In order to route traffic to the private IP of the Juju controller,
+        we add a DNAT rule telling this machine that the destination for the
+        controller's private address is the public address of the machine
+        where the Juju controller is running in LXD. That machine has a
+        complementary iptables rule routing the traffic to the appropriate
+        LXD container.
+        """
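+        # For illustration only: with IPTABLES_SCRIPT (defined earlier in
+        # this module) the resulting rule is expected to look roughly like
+        #   iptables -t nat -A OUTPUT -d <controller-private-ip> \
+        #       -j DNAT --to-destination <api-proxy-ip>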
+
+        script = IPTABLES_SCRIPT.format(apiaddress, api)
+
+        # Run this in a retry loop, because dpkg may be running and cause the
+        # script to fail.
+        retry = 10
+        attempts = 0
+        delay = 15
+
+        while attempts <= retry:
+            try:
+                attempts += 1
+                stdout, stderr = await self._run_configure_script(script)
+                break
+            except Exception as e:
+                self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
+                if attempts > retry:
+                    raise e
+                else:
+                    await asyncio.sleep(delay)
+                    # Slowly back off the retry
+                    delay += 15
+
+        # self.log.debug("Running configure script")
+        await self._run_configure_script(results.script)
+        # self.log.debug("Configure script finished")
+
+    async def _run_configure_script(self, script, root=True):
+        """Run the script to install the Juju agent on the target machine.
+
+        :param str script: The script to be executed
+        :param bool root: Whether to run the script with sudo
+        """
+        fd, tmpFile = tempfile.mkstemp()
+        os.close(fd)
+        with open(tmpFile, "w") as f:
+            f.write(script)
+
+        # copy the local copy of the script to the remote machine
+        await self._scp(tmpFile, tmpFile)
+
+        # run the provisioning script
+        return await self._ssh(
+            "{} /bin/bash {}".format("sudo" if root else "", tmpFile)
+        )
+
+
 class SSHProvisioner:
     """Provision a manually created machine via SSH."""