Code Coverage

Cobertura Coverage Report > n2vc >

provisioner.py

Trend

Classes100%
 
Lines27%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
provisioner.py
100%
1/1
27%
78/284
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
provisioner.py
27%
78/284
N/A

Source

n2vc/provisioner.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14 1 import logging
15 1 import os
16 1 import re
17 1 import shlex
18 1 from subprocess import CalledProcessError
19 1 import tempfile
20 1 import time
21 1 import uuid
22
23 1 from juju.client import client
24 1 import n2vc.exceptions
25 1 import paramiko
26 1 import asyncio
27
28 1 arches = [
29     [re.compile(r"amd64|x86_64"), "amd64"],
30     [re.compile(r"i?[3-9]86"), "i386"],
31     [re.compile(r"(arm$)|(armv.*)"), "armhf"],
32     [re.compile(r"aarch64"), "arm64"],
33     [re.compile(r"ppc64|ppc64el|ppc64le"), "ppc64el"],
34     [re.compile(r"s390x?"), "s390x"],
35 ]
36
37
38 1 def normalize_arch(rawArch):
39     """Normalize the architecture string."""
40 0     for arch in arches:
41 0         if arch[0].match(rawArch):
42 0             return arch[1]
43
44
45 1 DETECTION_SCRIPT = """#!/bin/bash
46 set -e
47 os_id=$(grep '^ID=' /etc/os-release | tr -d '"' | cut -d= -f2)
48 if [ "$os_id" = 'centos' ]; then
49   os_version=$(grep '^VERSION_ID=' /etc/os-release | tr -d '"' | cut -d= -f2)
50   echo "centos$os_version"
51 else
52   lsb_release -cs
53 fi
54 uname -m
55 grep MemTotal /proc/meminfo
56 cat /proc/cpuinfo
57 """
58
59 1 INITIALIZE_UBUNTU_SCRIPT = """set -e
60 (id ubuntu &> /dev/null) || useradd -m ubuntu -s /bin/bash
61 umask 0077
62 temp=$(mktemp)
63 echo 'ubuntu ALL=(ALL) NOPASSWD:ALL' > $temp
64 install -m 0440 $temp /etc/sudoers.d/90-juju-ubuntu
65 rm $temp
66 su ubuntu -c '[ -f ~/.ssh/authorized_keys ] || install -D -m 0600 /dev/null ~/.ssh/authorized_keys'
67 export authorized_keys="{}"
68 if [ ! -z "$authorized_keys" ]; then
69     su ubuntu -c 'echo $authorized_keys >> ~/.ssh/authorized_keys'
70 fi
71 """
72
73 1 IPTABLES_SCRIPT = """#!/bin/bash
74 set -e
75 [ -v `which netfilter-persistent` ] && apt update \
76     && DEBIAN_FRONTEND=noninteractive apt-get install -yqq iptables-persistent
77 iptables -t nat -A OUTPUT -p tcp -d {} -j DNAT --to-destination {}
78 netfilter-persistent save
79 """
80
81
82 1 class AsyncSSHProvisioner:
83     """Provision a manually created machine via SSH."""
84
85 1     user = ""
86 1     host = ""
87 1     private_key_path = ""
88
89 1     def __init__(self, user, host, private_key_path, log=None):
90 0         self.host = host
91 0         self.user = user
92 0         self.private_key_path = private_key_path
93 0         self.log = log if log else logging.getLogger(__name__)
94
95 1     async def _scp(self, source_file, destination_file):
96         """Execute an scp command. Requires a fully qualified source and
97         destination.
98
99         :param str source_file: Path to the source file
100         :param str destination_file: Path to the destination file
101         """
102 0         cmd = [
103             "scp",
104             "-i",
105             os.path.expanduser(self.private_key_path),
106             "-o",
107             "StrictHostKeyChecking=no",
108             "-q",
109             "-B",
110         ]
111 0         destination = "{}@{}:{}".format(self.user, self.host, destination_file)
112 0         cmd.extend([source_file, destination])
113 0         process = await asyncio.create_subprocess_exec(*cmd)
114 0         await process.wait()
115 0         if process.returncode != 0:
116 0             raise CalledProcessError(returncode=process.returncode, cmd=cmd)
117
118 1     async def _ssh(self, command):
119         """Run a command remotely via SSH.
120
121         :param str command: The command to execute
122         :return: tuple: The stdout and stderr of the command execution
123         :raises: :class:`CalledProcessError` if the command fails
124         """
125
126 0         destination = "{}@{}".format(self.user, self.host)
127 0         cmd = [
128             "ssh",
129             "-i",
130             os.path.expanduser(self.private_key_path),
131             "-o",
132             "StrictHostKeyChecking=no",
133             "-q",
134             destination,
135         ]
136 0         cmd.extend([command])
137 0         process = await asyncio.create_subprocess_exec(
138             *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
139         )
140 0         stdout, stderr = await process.communicate()
141
142 0         if process.returncode != 0:
143 0             output = stderr.decode("utf-8").strip()
144 0             raise CalledProcessError(
145                 returncode=process.returncode, cmd=cmd, output=output
146             )
147 0         return (stdout.decode("utf-8").strip(), stderr.decode("utf-8").strip())
148
149 1     async def _init_ubuntu_user(self):
150         """Initialize the ubuntu user.
151
152         :return: bool: If the initialization was successful
153         :raises: :class:`CalledProcessError` if the _ssh command fails
154         """
155 0         retry = 10
156 0         attempts = 0
157 0         delay = 15
158 0         while attempts <= retry:
159 0             try:
160 0                 attempts += 1
161                 # Attempt to establish a SSH connection
162 0                 stdout, stderr = await self._ssh("sudo -n true")
163 0                 break
164 0             except CalledProcessError as e:
165 0                 self.log.debug(
166                     "Waiting for VM to boot, sleeping {} seconds".format(delay)
167                 )
168 0                 if attempts > retry:
169 0                     raise e
170                 else:
171 0                     await asyncio.sleep(delay)
172                     # Slowly back off the retry
173 0                     delay += 15
174
175         # Infer the public key
176 0         public_key = None
177 0         public_key_path = "{}.pub".format(self.private_key_path)
178
179 0         if not os.path.exists(public_key_path):
180 0             raise FileNotFoundError(
181                 "Public key '{}' doesn't exist.".format(public_key_path)
182             )
183
184 0         with open(public_key_path, "r") as f:
185 0             public_key = f.readline()
186
187 0         script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)
188
189 0         stdout, stderr = await self._run_configure_script(script)
190
191 0         return True
192
193 1     async def _detect_hardware_and_os(self):
194         """Detect the target hardware capabilities and OS series.
195
196         :return: str: A raw string containing OS and hardware information.
197         """
198
199 0         info = {
200             "series": "",
201             "arch": "",
202             "cpu-cores": "",
203             "mem": "",
204         }
205
206 0         stdout, stderr = await self._run_configure_script(DETECTION_SCRIPT)
207
208 0         lines = stdout.split("\n")
209 0         info["series"] = lines[0].strip()
210 0         info["arch"] = normalize_arch(lines[1].strip())
211
212 0         memKb = re.split(r"\s+", lines[2])[1]
213
214         # Convert megabytes -> kilobytes
215 0         info["mem"] = round(int(memKb) / 1024)
216
217         # Detect available CPUs
218 0         recorded = {}
219 0         for line in lines[3:]:
220 0             physical_id = ""
221 0             print(line)
222
223 0             if line.find("physical id") == 0:
224 0                 physical_id = line.split(":")[1].strip()
225 0             elif line.find("cpu cores") == 0:
226 0                 cores = line.split(":")[1].strip()
227
228 0                 if physical_id not in recorded.keys():
229 0                     info["cpu-cores"] += cores
230 0                     recorded[physical_id] = True
231
232 0         return info
233
234 1     async def provision_machine(self):
235         """Perform the initial provisioning of the target machine.
236
237         :return: bool: The client.AddMachineParams
238         """
239 0         params = client.AddMachineParams()
240
241 0         if await self._init_ubuntu_user():
242 0             hw = await self._detect_hardware_and_os()
243 0             params.series = hw["series"]
244 0             params.instance_id = "manual:{}".format(self.host)
245 0             params.nonce = "manual:{}:{}".format(
246                 self.host, str(uuid.uuid4()),
247             )  # a nop for Juju w/manual machines
248 0             params.hardware_characteristics = {
249                 "arch": hw["arch"],
250                 "mem": int(hw["mem"]),
251                 "cpu-cores": int(hw["cpu-cores"]),
252             }
253 0             params.addresses = [{"value": self.host, "type": "ipv4", "scope": "public"}]
254
255 0         return params
256
257 1     async def install_agent(self, connection, nonce, machine_id, proxy=None):
258         """
259         :param object connection: Connection to Juju API
260         :param str nonce: The nonce machine specification
261         :param str machine_id: The id assigned to the machine
262         :param str proxy: IP of the API_PROXY
263
264         :return: bool: If the initialization was successful
265         """
266         # The path where the Juju agent should be installed.
267 0         data_dir = "/var/lib/juju"
268
269         # Disabling this prevents `apt-get update` from running initially, so
270         # charms will fail to deploy
271 0         disable_package_commands = False
272
273 0         client_facade = client.ClientFacade.from_connection(connection)
274 0         results = await client_facade.ProvisioningScript(
275             data_dir=data_dir,
276             disable_package_commands=disable_package_commands,
277             machine_id=machine_id,
278             nonce=nonce,
279         )
280
281         """Get the IP of the controller
282
283         Parse the provisioning script, looking for the first apiaddress.
284
285         Example:
286             apiaddresses:
287             - 10.195.8.2:17070
288             - 127.0.0.1:17070
289             - '[::1]:17070'
290         """
291 0         if proxy:
292 0             m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
293 0             apiaddress = m.group(1)
294
295             """Add IP Table rule
296
297             In order to route the traffic to the private ip of the Juju controller
298             we use a DNAT rule to tell the machine that the destination for the
299             private address is the public address of the machine where the Juju
300             controller is running in LXD. That machine will have a complimentary
301             iptables rule, routing traffic to the appropriate LXD container.
302             """
303
304 0             script = IPTABLES_SCRIPT.format(apiaddress, proxy)
305
306             # Run this in a retry loop, because dpkg may be running and cause the
307             # script to fail.
308 0             retry = 10
309 0             attempts = 0
310 0             delay = 15
311
312 0             while attempts <= retry:
313 0                 try:
314 0                     attempts += 1
315 0                     stdout, stderr = await self._run_configure_script(script)
316 0                     break
317 0                 except Exception as e:
318 0                     self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
319 0                     if attempts > retry:
320 0                         raise e
321                     else:
322 0                         await asyncio.sleep(delay)
323                         # Slowly back off the retry
324 0                         delay += 15
325
326         # self.log.debug("Running configure script")
327 0         await self._run_configure_script(results.script)
328         # self.log.debug("Configure script finished")
329
330 1     async def _run_configure_script(self, script, root=True):
331         """Run the script to install the Juju agent on the target machine.
332
333         :param str script: The script to be executed
334         """
335 0         _, tmpFile = tempfile.mkstemp()
336 0         with open(tmpFile, "w") as f:
337 0             f.write(script)
338 0             f.close()
339
340         # copy the local copy of the script to the remote machine
341 0         await self._scp(tmpFile, tmpFile)
342
343         # run the provisioning script
344 0         return await self._ssh(
345             "{} /bin/bash {}".format("sudo" if root else "", tmpFile)
346         )
347
348
349 1 class SSHProvisioner:
350     """Provision a manually created machine via SSH."""
351
352 1     def __init__(self, user, host, private_key_path, log=None):
353
354 1         self.host = host
355 1         self.user = user
356 1         self.private_key_path = private_key_path
357
358 1         if log:
359 0             self.log = log
360         else:
361 1             self.log = logging.getLogger(__name__)
362
363 1     def _get_ssh_client(self, host=None, user=None, private_key_path=None):
364         """Return a connected Paramiko ssh object.
365
366         :param str host: The host to connect to.
367         :param str user: The user to connect as.
368         :param str key: The private key to authenticate with.
369
370         :return: object: A paramiko.SSHClient
371         :raises: :class:`paramiko.ssh_exception.SSHException` if the
372             connection failed
373         """
374
375 1         if not host:
376 1             host = self.host
377
378 1         if not user:
379 1             user = self.user
380
381 1         if not private_key_path:
382 1             private_key_path = self.private_key_path
383
384 1         ssh = paramiko.SSHClient()
385 1         ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
386
387 1         pkey = None
388
389         # Read the private key into a paramiko.RSAKey
390 1         if os.path.exists(private_key_path):
391 1             with open(private_key_path, "r") as f:
392 1                 pkey = paramiko.RSAKey.from_private_key(f)
393
394         #######################################################################
395         # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where #
396         # the server may not send the SSH_MSG_USERAUTH_BANNER message except  #
397         # when responding to an auth_none request. For example, paramiko will #
398         # attempt to use password authentication when a password is set, but  #
399         # the server could deny that, instead requesting keyboard-interactive.#
400         # The hack to workaround this is to attempt a reconnect, which will   #
401         # receive the right banner, and authentication can proceed. See the   #
402         # following for more info:                                            #
403         # https://github.com/paramiko/paramiko/issues/432                     #
404         # https://github.com/paramiko/paramiko/pull/438                       #
405         #######################################################################
406
407 1         retry = 10
408 1         attempts = 0
409 1         delay = 15
410 1         while attempts <= retry:
411 1             try:
412 1                 attempts += 1
413
414                 # Attempt to establish a SSH connection
415 1                 ssh.connect(
416                     host,
417                     port=22,
418                     username=user,
419                     pkey=pkey,
420                     # allow_agent=False,
421                     # look_for_keys=False,
422                 )
423 1                 break
424 1             except paramiko.ssh_exception.SSHException as e:
425 1                 if "Error reading SSH protocol banner" == str(e):
426                     # Once more, with feeling
427 1                     ssh.connect(host, port=22, username=user, pkey=pkey)
428                 else:
429                     # Reraise the original exception
430 1                     self.log.debug("Unhandled exception caught: {}".format(e))
431 1                     raise e
432 1             except Exception as e:
433 1                 if "Unable to connect to port" in str(e):
434 1                     self.log.debug(
435                         "Waiting for VM to boot, sleeping {} seconds".format(delay)
436                     )
437 1                     if attempts > retry:
438 1                         raise e
439                     else:
440 1                         time.sleep(delay)
441                         # Slowly back off the retry
442 1                         delay += 15
443                 else:
444 1                     self.log.debug(e)
445 1                     raise e
446 1         return ssh
447
448 1     def _run_command(self, ssh, cmd, pty=True):
449         """Run a command remotely via SSH.
450
451         :param object ssh: The SSHClient
452         :param str cmd: The command to execute
453         :param list cmd: The `shlex.split` command to execute
454         :param bool pty: Whether to allocate a pty
455
456         :return: tuple: The stdout and stderr of the command execution
457         :raises: :class:`CalledProcessError` if the command fails
458         """
459
460 0         if isinstance(cmd, str):
461 0             cmd = shlex.split(cmd)
462
463 0         if type(cmd) is not list:
464 0             cmd = [cmd]
465
466 0         cmds = " ".join(cmd)
467 0         _, stdout, stderr = ssh.exec_command(cmds, get_pty=pty)
468 0         retcode = stdout.channel.recv_exit_status()
469
470 0         if retcode > 0:
471 0             output = stderr.read().strip()
472 0             raise CalledProcessError(returncode=retcode, cmd=cmd, output=output)
473 0         return (
474             stdout.read().decode("utf-8").strip(),
475             stderr.read().decode("utf-8").strip(),
476         )
477
478 1     def _init_ubuntu_user(self):
479         """Initialize the ubuntu user.
480
481         :return: bool: If the initialization was successful
482         :raises: :class:`paramiko.ssh_exception.AuthenticationException`
483             if the authentication fails
484         """
485 0         ssh = None
486 0         try:
487             # Run w/o allocating a pty, so we fail if sudo prompts for a passwd
488 0             ssh = self._get_ssh_client()
489 0             self._run_command(ssh, "sudo -n true", pty=False)
490 0         except paramiko.ssh_exception.AuthenticationException:
491 0             raise n2vc.exceptions.AuthenticationFailed(self.user)
492 0         except paramiko.ssh_exception.NoValidConnectionsError:
493 0             raise n2vc.exceptions.NoRouteToHost(self.host)
494         finally:
495 0             if ssh:
496 0                 ssh.close()
497
498         # Infer the public key
499 0         public_key_path = "{}.pub".format(self.private_key_path)
500
501 0         if not os.path.exists(public_key_path):
502 0             raise FileNotFoundError(
503                 "Public key '{}' doesn't exist.".format(public_key_path)
504             )
505
506 0         with open(public_key_path, "r") as f:
507 0             public_key = f.readline()
508
509 0         script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)
510
511 0         try:
512 0             ssh = self._get_ssh_client()
513
514 0             self._run_command(
515                 ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True
516             )
517 0         except paramiko.ssh_exception.AuthenticationException as e:
518 0             raise e
519         finally:
520 0             ssh.close()
521
522 0         return True
523
524 1     def _detect_hardware_and_os(self, ssh):
525         """Detect the target hardware capabilities and OS series.
526
527         :param object ssh: The SSHClient
528         :return: str: A raw string containing OS and hardware information.
529         """
530
531 0         info = {
532             "series": "",
533             "arch": "",
534             "cpu-cores": "",
535             "mem": "",
536         }
537
538 0         stdout, _ = self._run_command(
539             ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True,
540         )
541
542 0         lines = stdout.split("\n")
543
544         # Remove extraneous line if DNS resolution of hostname famils
545         # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out
546 0         if "unable to resolve host" in lines[0]:
547 0             lines = lines[1:]
548
549 0         info["series"] = lines[0].strip()
550 0         info["arch"] = normalize_arch(lines[1].strip())
551
552 0         memKb = re.split(r"\s+", lines[2])[1]
553
554         # Convert megabytes -> kilobytes
555 0         info["mem"] = round(int(memKb) / 1024)
556
557         # Detect available CPUs
558 0         recorded = {}
559 0         for line in lines[3:]:
560 0             physical_id = ""
561
562 0             if line.find("physical id") == 0:
563 0                 physical_id = line.split(":")[1].strip()
564 0             elif line.find("cpu cores") == 0:
565 0                 cores = line.split(":")[1].strip()
566
567 0                 if physical_id not in recorded.keys():
568 0                     info["cpu-cores"] += cores
569 0                     recorded[physical_id] = True
570
571 0         return info
572
573 1     def provision_machine(self):
574         """Perform the initial provisioning of the target machine.
575
576         :return: bool: The client.AddMachineParams
577         :raises: :class:`paramiko.ssh_exception.AuthenticationException`
578             if the upload fails
579         """
580 0         params = client.AddMachineParams()
581
582 0         if self._init_ubuntu_user():
583 0             try:
584 0                 ssh = self._get_ssh_client()
585
586 0                 hw = self._detect_hardware_and_os(ssh)
587 0                 params.series = hw["series"]
588 0                 params.instance_id = "manual:{}".format(self.host)
589 0                 params.nonce = "manual:{}:{}".format(
590                     self.host, str(uuid.uuid4()),
591                 )  # a nop for Juju w/manual machines
592 0                 params.hardware_characteristics = {
593                     "arch": hw["arch"],
594                     "mem": int(hw["mem"]),
595                     "cpu-cores": int(hw["cpu-cores"]),
596                 }
597 0                 params.addresses = [
598                     {"value": self.host, "type": "ipv4", "scope": "public"}
599                 ]
600
601 0             except paramiko.ssh_exception.AuthenticationException as e:
602 0                 raise e
603             finally:
604 0                 ssh.close()
605
606 0         return params
607
608 1     async def install_agent(self, connection, nonce, machine_id, api):
609         """
610         :param object connection: Connection to Juju API
611         :param str nonce: The nonce machine specification
612         :param str machine_id: The id assigned to the machine
613
614         :return: bool: If the initialization was successful
615         """
616         # The path where the Juju agent should be installed.
617 0         data_dir = "/var/lib/juju"
618
619         # Disabling this prevents `apt-get update` from running initially, so
620         # charms will fail to deploy
621 0         disable_package_commands = False
622
623 0         client_facade = client.ClientFacade.from_connection(connection)
624 0         results = await client_facade.ProvisioningScript(
625             data_dir=data_dir,
626             disable_package_commands=disable_package_commands,
627             machine_id=machine_id,
628             nonce=nonce,
629         )
630
631         """Get the IP of the controller
632
633         Parse the provisioning script, looking for the first apiaddress.
634
635         Example:
636             apiaddresses:
637             - 10.195.8.2:17070
638             - 127.0.0.1:17070
639             - '[::1]:17070'
640         """
641 0         m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
642 0         apiaddress = m.group(1)
643
644         """Add IP Table rule
645
646         In order to route the traffic to the private ip of the Juju controller
647         we use a DNAT rule to tell the machine that the destination for the
648         private address is the public address of the machine where the Juju
649         controller is running in LXD. That machine will have a complimentary
650         iptables rule, routing traffic to the appropriate LXD container.
651         """
652
653 0         script = IPTABLES_SCRIPT.format(apiaddress, api)
654
655         # Run this in a retry loop, because dpkg may be running and cause the
656         # script to fail.
657 0         retry = 10
658 0         attempts = 0
659 0         delay = 15
660
661 0         while attempts <= retry:
662 0             try:
663 0                 attempts += 1
664
665 0                 self._run_configure_script(script)
666 0                 break
667 0             except Exception as e:
668 0                 self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
669 0                 if attempts > retry:
670 0                     raise e
671                 else:
672 0                     time.sleep(delay)
673                     # Slowly back off the retry
674 0                     delay += 15
675
676         # self.log.debug("Running configure script")
677 0         self._run_configure_script(results.script)
678         # self.log.debug("Configure script finished")
679
680 1     def _run_configure_script(self, script: str):
681         """Run the script to install the Juju agent on the target machine.
682
683         :param str script: The script returned by the ProvisioningScript API
684         :raises: :class:`paramiko.ssh_exception.AuthenticationException`
685             if the upload fails
686         """
687 0         _, tmpFile = tempfile.mkstemp()
688 0         with open(tmpFile, "w") as f:
689 0             f.write(script)
690 0         try:
691             # get ssh client
692 0             ssh = self._get_ssh_client(user="ubuntu",)
693
694             # copy the local copy of the script to the remote machine
695 0             sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
696 0             sftp.put(
697                 tmpFile, tmpFile,
698             )
699
700             # run the provisioning script
701 0             self._run_command(
702                 ssh, "sudo /bin/bash {}".format(tmpFile),
703             )
704
705 0         except paramiko.ssh_exception.AuthenticationException as e:
706 0             raise e
707         finally:
708 0             os.remove(tmpFile)
709 0             ssh.close()