Store k8s controller information in Mongo, and remove controller attribute from K8sJu...
[osm/N2VC.git] / n2vc / provisioner.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 import os
16 import re
17 import shlex
18 from subprocess import CalledProcessError
19 import tempfile
20 import time
21 import uuid
22
23 from juju.client import client
24 import n2vc.exceptions
25 import paramiko
26 import asyncio
27
# Ordered (pattern, architecture) pairs consumed by normalize_arch(). Each
# pattern is applied with ``Pattern.match``, i.e. anchored at the start of the
# raw machine string (the ``uname -m`` line printed by DETECTION_SCRIPT), and
# the first pattern that matches determines the normalized name.
arches = [
    [re.compile(r"amd64|x86_64"), "amd64"],
    [re.compile(r"i?[3-9]86"), "i386"],
    [re.compile(r"(arm$)|(armv.*)"), "armhf"],
    [re.compile(r"aarch64"), "arm64"],
    [re.compile(r"ppc64|ppc64el|ppc64le"), "ppc64el"],
    [re.compile(r"s390x?"), "s390x"],
]
36
37
def normalize_arch(rawArch):
    """Translate a raw machine name (as printed by ``uname -m``) into a
    normalized architecture name such as ``"amd64"`` or ``"arm64"``.

    The patterns in ``arches`` are tried in order, each anchored at the start
    of ``rawArch``; the first one that matches decides the result.

    :param str rawArch: Raw machine hardware name, e.g. ``"x86_64"``
    :return: str or None: The normalized architecture, or None when no
        pattern matches
    """
    candidates = (name for pattern, name in arches if pattern.match(rawArch))
    return next(candidates, None)
43
44
# Remote bash script whose stdout is parsed by ``_detect_hardware_and_os``:
#   line 1: OS series -- "centos<VERSION_ID>" on CentOS, otherwise the
#           output of ``lsb_release -cs``
#   line 2: raw machine architecture (``uname -m``), fed to normalize_arch()
#   line 3: the MemTotal line of /proc/meminfo
#   rest:   the full contents of /proc/cpuinfo
# ``set -e`` aborts the script (non-zero exit) if any command fails.
DETECTION_SCRIPT = """#!/bin/bash
set -e
os_id=$(grep '^ID=' /etc/os-release | tr -d '"' | cut -d= -f2)
if [ "$os_id" = 'centos' ]; then
os_version=$(grep '^VERSION_ID=' /etc/os-release | tr -d '"' | cut -d= -f2)
echo "centos$os_version"
else
lsb_release -cs
fi
uname -m
grep MemTotal /proc/meminfo
cat /proc/cpuinfo
"""
58
# Remote script (executed through sudo by both provisioners) that:
#   * creates the "ubuntu" user if it does not exist,
#   * grants it passwordless sudo via /etc/sudoers.d/90-juju-ubuntu,
#   * creates ~ubuntu/.ssh/authorized_keys (mode 0600) if missing, and
#   * appends the public key substituted into the "{}" placeholder.
# The template is rendered with str.format(public_key), so any literal braces
# added here must be doubled. The key is appended unconditionally whenever it
# is non-empty, so re-running the script can leave duplicate entries.
INITIALIZE_UBUNTU_SCRIPT = """set -e
(id ubuntu &> /dev/null) || useradd -m ubuntu -s /bin/bash
umask 0077
temp=$(mktemp)
echo 'ubuntu ALL=(ALL) NOPASSWD:ALL' > $temp
install -m 0440 $temp /etc/sudoers.d/90-juju-ubuntu
rm $temp
su ubuntu -c '[ -f ~/.ssh/authorized_keys ] || install -D -m 0600 /dev/null ~/.ssh/authorized_keys'
export authorized_keys="{}"
if [ ! -z "$authorized_keys" ]; then
su ubuntu -c 'echo $authorized_keys >> ~/.ssh/authorized_keys'
fi
"""
72
# Remote script rendered as IPTABLES_SCRIPT.format(<controller api address>,
# <proxy address>). It installs iptables-persistent when netfilter-persistent
# is not on PATH (in effect the ``[ -v ... ]`` test is only true when `which`
# prints nothing). It then adds a nat/OUTPUT DNAT rule that redirects TCP
# traffic for the first address to the second, and persists the rules.
# NOTE: the trailing backslash below is inside a non-raw Python string, so
# Python itself joins the two lines before bash ever sees the script.
IPTABLES_SCRIPT = """#!/bin/bash
set -e
[ -v `which netfilter-persistent` ] && apt update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -yqq iptables-persistent
iptables -t nat -A OUTPUT -p tcp -d {} -j DNAT --to-destination {}
netfilter-persistent save
"""
80
81
class AsyncSSHProvisioner:
    """Provision a manually created machine via SSH.

    This implementation shells out to the local ``ssh`` and ``scp`` binaries
    through asyncio subprocesses, authenticating with ``private_key_path``.
    """

    user = ""
    host = ""
    private_key_path = ""

    def __init__(self, user, host, private_key_path, log=None):
        """
        :param str user: The remote user to connect as
        :param str host: The address of the machine to provision
        :param str private_key_path: Path to the SSH private key. The matching
            public key is expected at the same path with a ``.pub`` suffix.
        :param log: Optional logger; defaults to this module's logger
        """
        self.host = host
        self.user = user
        self.private_key_path = private_key_path
        self.log = log if log else logging.getLogger(__name__)

    async def _scp(self, source_file, destination_file):
        """Copy a local file to the remote machine with ``scp``.

        Requires fully qualified source and destination paths.

        :param str source_file: Path to the local source file
        :param str destination_file: Path to the destination file on the
            remote machine
        :raises: :class:`CalledProcessError` if ``scp`` exits non-zero
        """
        cmd = [
            "scp",
            "-i",
            os.path.expanduser(self.private_key_path),
            # Don't stop to ask about unknown host keys.
            "-o",
            "StrictHostKeyChecking=no",
            "-q",
            # Batch mode: never prompt for passwords or passphrases.
            "-B",
        ]
        destination = "{}@{}:{}".format(self.user, self.host, destination_file)
        cmd.extend([source_file, destination])
        process = await asyncio.create_subprocess_exec(*cmd)
        await process.wait()
        if process.returncode != 0:
            raise CalledProcessError(returncode=process.returncode, cmd=cmd)

    async def _ssh(self, command):
        """Run a command remotely via SSH.

        :param str command: The command to execute
        :return: tuple: The stripped, UTF-8 decoded stdout and stderr
        :raises: :class:`CalledProcessError` if ``ssh`` exits non-zero; its
            ``output`` holds the remote stderr
        """
        destination = "{}@{}".format(self.user, self.host)
        cmd = [
            "ssh",
            "-i",
            os.path.expanduser(self.private_key_path),
            "-o",
            "StrictHostKeyChecking=no",
            "-q",
            destination,
            command,
        ]
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            output = stderr.decode("utf-8").strip()
            raise CalledProcessError(
                returncode=process.returncode, cmd=cmd, output=output
            )
        return (stdout.decode("utf-8").strip(), stderr.decode("utf-8").strip())

    async def _init_ubuntu_user(self):
        """Wait for SSH access, then create and authorize the ``ubuntu`` user.

        Polls ``sudo -n true`` over SSH (up to 11 attempts, with a linearly
        growing delay) until the machine accepts the connection and allows
        passwordless sudo. Then runs INITIALIZE_UBUNTU_SCRIPT with the public
        key stored next to ``private_key_path``.

        :return: bool: True once the initialization script has run
        :raises: :class:`CalledProcessError` if SSH still fails after all
            retries, or if the initialization script fails
        :raises: FileNotFoundError if the ``.pub`` key file does not exist
        """
        retry = 10
        attempts = 0
        delay = 15
        while attempts <= retry:
            attempts += 1
            try:
                # Attempt to establish a SSH connection
                await self._ssh("sudo -n true")
                break
            except CalledProcessError:
                self.log.debug(
                    "Waiting for VM to boot, sleeping {} seconds".format(delay)
                )
                if attempts > retry:
                    raise
                await asyncio.sleep(delay)
                # Slowly back off the retry
                delay += 15

        # Infer the public key. Expand "~" exactly like _ssh/_scp do for the
        # private key, otherwise a "~/..." key path is never found.
        public_key_path = "{}.pub".format(os.path.expanduser(self.private_key_path))

        if not os.path.exists(public_key_path):
            raise FileNotFoundError(
                "Public key '{}' doesn't exist.".format(public_key_path)
            )

        with open(public_key_path, "r") as f:
            public_key = f.readline()

        script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)

        await self._run_configure_script(script)

        return True

    @staticmethod
    def _count_cpu_cores(cpuinfo_lines):
        """Count CPU cores from the lines of ``/proc/cpuinfo``.

        ``cpu cores`` is counted once per distinct ``physical id``, so a
        processor entry repeated for every logical CPU of the same socket is
        not double counted. When no ``cpu cores`` field is present, the number
        of ``processor`` entries is returned instead (0 if there are none).

        :param list cpuinfo_lines: Lines of ``/proc/cpuinfo``
        :return: int: The detected number of cores
        """
        cores_per_socket = {}
        physical_id = ""
        processors = 0
        for line in cpuinfo_lines:
            key, sep, value = line.partition(":")
            if not sep:
                continue
            key = key.strip()
            value = value.strip()
            if key == "processor":
                # A new processor entry starts; forget the previous socket.
                processors += 1
                physical_id = ""
            elif key == "physical id":
                physical_id = value
            elif key == "cpu cores":
                cores_per_socket.setdefault(physical_id, int(value))
        if cores_per_socket:
            return sum(cores_per_socket.values())
        return processors

    async def _detect_hardware_and_os(self):
        """Detect the target hardware capabilities and OS series.

        Runs DETECTION_SCRIPT on the target and parses its output.

        :return: dict: ``series`` (str), ``arch`` (normalized architecture),
            ``cpu-cores`` (int) and ``mem`` (total memory in megabytes, int)
        """
        stdout, _ = await self._run_configure_script(DETECTION_SCRIPT)
        lines = stdout.splitlines()

        # Third line is the MemTotal line of /proc/meminfo, in kilobytes;
        # convert kilobytes -> megabytes.
        mem_kb = int(re.split(r"\s+", lines[2])[1])

        return {
            "series": lines[0].strip(),
            "arch": normalize_arch(lines[1].strip()),
            # Everything after MemTotal is the content of /proc/cpuinfo.
            "cpu-cores": self._count_cpu_cores(lines[3:]),
            "mem": round(mem_kb / 1024),
        }

    async def provision_machine(self):
        """Perform the initial provisioning of the target machine.

        Initializes the ``ubuntu`` user and detects the machine's hardware.

        :return: client.AddMachineParams: series, a ``manual:<host>`` instance
            id and nonce, hardware characteristics and the host address
        """
        params = client.AddMachineParams()

        if await self._init_ubuntu_user():
            hw = await self._detect_hardware_and_os()
            params.series = hw["series"]
            params.instance_id = "manual:{}".format(self.host)
            params.nonce = "manual:{}:{}".format(
                self.host, str(uuid.uuid4()),
            )  # a nop for Juju w/manual machines
            params.hardware_characteristics = {
                "arch": hw["arch"],
                "mem": int(hw["mem"]),
                "cpu-cores": int(hw["cpu-cores"]),
            }
            params.addresses = [{"value": self.host, "type": "ipv4", "scope": "public"}]

        return params

    async def install_agent(self, connection, nonce, machine_id, proxy=None):
        """Install the Juju agent on the target machine.

        Requests the provisioning script from the controller and runs it on the
        machine. If ``proxy`` is given, an iptables DNAT rule that redirects
        the controller's API address to ``proxy`` is installed first.

        :param object connection: Connection to Juju API
        :param str nonce: The nonce machine specification
        :param str machine_id: The id assigned to the machine
        :param str proxy: IP of the API_PROXY
        :raises: ValueError if ``proxy`` is set but the provisioning script
            does not start its apiaddresses list with an IPv4 address
        """
        # The path where the Juju agent should be installed.
        data_dir = "/var/lib/juju"

        # Disabling this prevents `apt-get update` from running initially, so
        # charms will fail to deploy
        disable_package_commands = False

        client_facade = client.ClientFacade.from_connection(connection)
        results = await client_facade.ProvisioningScript(
            data_dir=data_dir,
            disable_package_commands=disable_package_commands,
            machine_id=machine_id,
            nonce=nonce,
        )

        if proxy:
            # Get the IP of the controller: the first apiaddress listed in the
            # provisioning script, which must be IPv4. Example:
            #   apiaddresses:
            #   - 10.195.8.2:17070
            #   - 127.0.0.1:17070
            #   - '[::1]:17070'
            m = re.search(
                r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script
            )
            if m is None:
                raise ValueError(
                    "Could not find the controller API address in the "
                    "provisioning script"
                )
            apiaddress = m.group(1)

            # Add IP Table rule: to route traffic to the private ip of the Juju
            # controller, a DNAT rule tells this machine that the destination
            # for the private address is the public address of the machine
            # where the controller runs in LXD. That machine has a
            # complementary iptables rule routing traffic to the LXD container.
            script = IPTABLES_SCRIPT.format(apiaddress, proxy)

            # Run this in a retry loop, because dpkg may be running and cause
            # the script to fail.
            retry = 10
            attempts = 0
            delay = 15
            while attempts <= retry:
                attempts += 1
                try:
                    await self._run_configure_script(script)
                    break
                except Exception:
                    self.log.debug(
                        "Waiting for dpkg, sleeping {} seconds".format(delay)
                    )
                    if attempts > retry:
                        raise
                    await asyncio.sleep(delay)
                    # Slowly back off the retry
                    delay += 15

        await self._run_configure_script(results.script)

    async def _run_configure_script(self, script, root=True):
        """Copy ``script`` to the target machine and run it with bash.

        The script is written to a local temporary file, copied to the same
        path on the remote machine and executed there. The local temporary
        file is always removed afterwards.

        :param str script: The script to be executed
        :param bool root: Run the script through ``sudo`` when True
        :return: tuple: The stdout and stderr of the script execution
        :raises: :class:`CalledProcessError` if the copy or execution fails
        """
        # mkstemp returns an already-open descriptor; write through it so the
        # descriptor is closed instead of leaked.
        fd, tmp_file = tempfile.mkstemp()
        try:
            with os.fdopen(fd, "w") as f:
                f.write(script)

            # copy the local copy of the script to the remote machine
            await self._scp(tmp_file, tmp_file)

            # run the provisioning script
            return await self._ssh(
                "{} /bin/bash {}".format("sudo" if root else "", tmp_file)
            )
        finally:
            os.remove(tmp_file)
347
348
class SSHProvisioner:
    """Provision a manually created machine via SSH.

    This implementation uses paramiko for the SSH connection, SFTP uploads and
    remote command execution.
    """

    def __init__(self, user, host, private_key_path, log=None):
        """
        :param str user: The remote user to connect as
        :param str host: The address of the machine to provision
        :param str private_key_path: Path to the RSA private key. The matching
            public key is expected at the same path with a ``.pub`` suffix.
        :param log: Optional logger; defaults to this module's logger
        """
        self.host = host
        self.user = user
        self.private_key_path = private_key_path
        self.log = log if log else logging.getLogger(__name__)

    def _get_ssh_client(self, host=None, user=None, private_key_path=None):
        """Return a connected Paramiko ssh object.

        Connection attempts are retried (up to 11 attempts, with a linearly
        growing delay) while the port is unreachable.

        :param str host: The host to connect to (defaults to ``self.host``).
        :param str user: The user to connect as (defaults to ``self.user``).
        :param str private_key_path: The RSA private key to authenticate with
            (defaults to ``self.private_key_path``); ``~`` is expanded.

        :return: object: A paramiko.SSHClient
        :raises: :class:`paramiko.ssh_exception.SSHException` if the
            connection failed
        """
        host = host or self.host
        user = user or self.user
        private_key_path = os.path.expanduser(
            private_key_path or self.private_key_path
        )

        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        pkey = None

        # Read the private key into a paramiko.RSAKey
        if os.path.exists(private_key_path):
            with open(private_key_path, "r") as f:
                pkey = paramiko.RSAKey.from_private_key(f)

        #######################################################################
        # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where #
        # the server may not send the SSH_MSG_USERAUTH_BANNER message except #
        # when responding to an auth_none request. For example, paramiko will #
        # attempt to use password authentication when a password is set, but #
        # the server could deny that, instead requesting keyboard-interactive.#
        # The hack to workaround this is to attempt a reconnect, which will #
        # receive the right banner, and authentication can proceed. See the #
        # following for more info: #
        # https://github.com/paramiko/paramiko/issues/432 #
        # https://github.com/paramiko/paramiko/pull/438 #
        #######################################################################

        retry = 10
        attempts = 0
        delay = 15
        try:
            while attempts <= retry:
                attempts += 1
                try:
                    # Attempt to establish a SSH connection
                    ssh.connect(host, port=22, username=user, pkey=pkey)
                    break
                except paramiko.ssh_exception.SSHException as e:
                    # paramiko may append the underlying cause to this message,
                    # so compare the prefix rather than the whole string.
                    if not str(e).startswith("Error reading SSH protocol banner"):
                        # Reraise the original exception
                        self.log.debug("Unhandled exception caught: {}".format(e))
                        raise
                    # Once more, with feeling. Stop looping once it succeeds so
                    # the established connection is not replaced by another.
                    ssh.connect(host, port=22, username=user, pkey=pkey)
                    break
                except Exception as e:
                    if "Unable to connect to port" not in str(e):
                        self.log.debug(e)
                        raise
                    self.log.debug(
                        "Waiting for VM to boot, sleeping {} seconds".format(delay)
                    )
                    if attempts > retry:
                        raise
                    time.sleep(delay)
                    # Slowly back off the retry
                    delay += 15
        except Exception:
            # Don't leak a half-open client/transport when giving up.
            ssh.close()
            raise
        return ssh

    def _run_command(self, ssh, cmd, pty=True):
        """Run a command remotely via SSH.

        :param object ssh: The connected paramiko.SSHClient
        :param cmd: The command to execute, either a string (split with
            ``shlex.split``) or a list of arguments joined with spaces
        :param bool pty: Whether to allocate a pty. With a pty, output the
            remote command writes to stderr typically arrives on stdout.

        :return: tuple: The stripped stdout and stderr of the command execution
        :raises: :class:`CalledProcessError` if the command exits with a
            positive status; ``output`` holds the stderr text, or the stdout
            text when stderr is empty (as is usual with a pty)
        """
        if isinstance(cmd, str):
            cmd = shlex.split(cmd)

        if not isinstance(cmd, list):
            cmd = [cmd]

        _, stdout, stderr = ssh.exec_command(" ".join(cmd), get_pty=pty)

        # Drain the output before waiting for the exit status: paramiko's
        # recv_exit_status() can hang when the remote output exceeds the
        # channel window and nothing is reading it.
        out = stdout.read().decode("utf-8", errors="replace").strip()
        err = stderr.read().decode("utf-8", errors="replace").strip()
        retcode = stdout.channel.recv_exit_status()

        if retcode > 0:
            raise CalledProcessError(returncode=retcode, cmd=cmd, output=err or out)
        return (out, err)

    def _init_ubuntu_user(self):
        """Initialize the ubuntu user.

        Checks that passwordless sudo works, then runs INITIALIZE_UBUNTU_SCRIPT
        with the public key stored next to ``private_key_path``.

        :return: bool: True once the initialization script has run
        :raises: :class:`n2vc.exceptions.AuthenticationFailed` if the initial
            authentication fails
        :raises: :class:`n2vc.exceptions.NoRouteToHost` if the host cannot be
            reached
        :raises: FileNotFoundError if the ``.pub`` key file does not exist
        """
        ssh = None
        try:
            # Run w/o allocating a pty, so we fail if sudo prompts for a passwd
            ssh = self._get_ssh_client()
            self._run_command(ssh, "sudo -n true", pty=False)
        except paramiko.ssh_exception.AuthenticationException:
            raise n2vc.exceptions.AuthenticationFailed(self.user)
        except paramiko.ssh_exception.NoValidConnectionsError:
            raise n2vc.exceptions.NoRouteToHost(self.host)
        finally:
            if ssh:
                ssh.close()

        # Infer the public key ("~" expanded, as for the private key).
        public_key_path = "{}.pub".format(os.path.expanduser(self.private_key_path))

        if not os.path.exists(public_key_path):
            raise FileNotFoundError(
                "Public key '{}' doesn't exist.".format(public_key_path)
            )

        with open(public_key_path, "r") as f:
            public_key = f.readline()

        script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)

        ssh = self._get_ssh_client()
        try:
            self._run_command(
                ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True
            )
        finally:
            ssh.close()

        return True

    @staticmethod
    def _count_cpu_cores(cpuinfo_lines):
        """Count CPU cores from the lines of ``/proc/cpuinfo``.

        ``cpu cores`` is counted once per distinct ``physical id``, so a
        processor entry repeated for every logical CPU of the same socket is
        not double counted. When no ``cpu cores`` field is present, the number
        of ``processor`` entries is returned instead (0 if there are none).

        :param list cpuinfo_lines: Lines of ``/proc/cpuinfo``
        :return: int: The detected number of cores
        """
        cores_per_socket = {}
        physical_id = ""
        processors = 0
        for line in cpuinfo_lines:
            key, sep, value = line.partition(":")
            if not sep:
                continue
            key = key.strip()
            value = value.strip()
            if key == "processor":
                # A new processor entry starts; forget the previous socket.
                processors += 1
                physical_id = ""
            elif key == "physical id":
                physical_id = value
            elif key == "cpu cores":
                cores_per_socket.setdefault(physical_id, int(value))
        if cores_per_socket:
            return sum(cores_per_socket.values())
        return processors

    def _detect_hardware_and_os(self, ssh):
        """Detect the target hardware capabilities and OS series.

        Runs DETECTION_SCRIPT through sudo on the target and parses its output.

        :param object ssh: The SSHClient
        :return: dict: ``series`` (str), ``arch`` (normalized architecture),
            ``cpu-cores`` (int) and ``mem`` (total memory in megabytes, int)
        """
        stdout, _ = self._run_command(
            ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True,
        )

        # splitlines() also copes with CRLF line endings, which pty output
        # may use.
        lines = stdout.splitlines()

        # Remove extraneous line if DNS resolution of hostname fails
        # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out
        if "unable to resolve host" in lines[0]:
            lines = lines[1:]

        # Third line is the MemTotal line of /proc/meminfo, in kilobytes;
        # convert kilobytes -> megabytes.
        mem_kb = int(re.split(r"\s+", lines[2])[1])

        return {
            "series": lines[0].strip(),
            "arch": normalize_arch(lines[1].strip()),
            # Everything after MemTotal is the content of /proc/cpuinfo.
            "cpu-cores": self._count_cpu_cores(lines[3:]),
            "mem": round(mem_kb / 1024),
        }

    def provision_machine(self):
        """Perform the initial provisioning of the target machine.

        Initializes the ``ubuntu`` user and detects the machine's hardware.

        :return: client.AddMachineParams: series, a ``manual:<host>`` instance
            id and nonce, hardware characteristics and the host address
        :raises: :class:`paramiko.ssh_exception.AuthenticationException`
            if the upload fails
        """
        params = client.AddMachineParams()

        if self._init_ubuntu_user():
            # Connect outside the try so a failed connection surfaces its own
            # error instead of an UnboundLocalError from the finally clause.
            ssh = self._get_ssh_client()
            try:
                hw = self._detect_hardware_and_os(ssh)
            finally:
                ssh.close()

            params.series = hw["series"]
            params.instance_id = "manual:{}".format(self.host)
            params.nonce = "manual:{}:{}".format(
                self.host, str(uuid.uuid4()),
            )  # a nop for Juju w/manual machines
            params.hardware_characteristics = {
                "arch": hw["arch"],
                "mem": int(hw["mem"]),
                "cpu-cores": int(hw["cpu-cores"]),
            }
            params.addresses = [
                {"value": self.host, "type": "ipv4", "scope": "public"}
            ]

        return params

    async def install_agent(self, connection, nonce, machine_id, api):
        """Install the Juju agent on the target machine.

        Installs an iptables DNAT rule redirecting the controller's API address
        to ``api``, then runs the controller's provisioning script.

        :param object connection: Connection to Juju API
        :param str nonce: The nonce machine specification
        :param str machine_id: The id assigned to the machine
        :param str api: Address that traffic for the controller's API address
            is redirected to

        :raises: ValueError if the provisioning script does not start its
            apiaddresses list with an IPv4 address
        """
        # The path where the Juju agent should be installed.
        data_dir = "/var/lib/juju"

        # Disabling this prevents `apt-get update` from running initially, so
        # charms will fail to deploy
        disable_package_commands = False

        client_facade = client.ClientFacade.from_connection(connection)
        results = await client_facade.ProvisioningScript(
            data_dir=data_dir,
            disable_package_commands=disable_package_commands,
            machine_id=machine_id,
            nonce=nonce,
        )

        # Get the IP of the controller: the first apiaddress listed in the
        # provisioning script, which must be IPv4. Example:
        #   apiaddresses:
        #   - 10.195.8.2:17070
        #   - 127.0.0.1:17070
        #   - '[::1]:17070'
        m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
        if m is None:
            raise ValueError(
                "Could not find the controller API address in the "
                "provisioning script"
            )
        apiaddress = m.group(1)

        # Add IP Table rule: to route traffic to the private ip of the Juju
        # controller, a DNAT rule tells this machine that the destination for
        # the private address is the public address of the machine where the
        # controller runs in LXD. That machine has a complementary iptables
        # rule routing traffic to the appropriate LXD container.
        script = IPTABLES_SCRIPT.format(apiaddress, api)

        # Run this in a retry loop, because dpkg may be running and cause the
        # script to fail.
        retry = 10
        attempts = 0
        delay = 15

        while attempts <= retry:
            attempts += 1
            try:
                self._run_configure_script(script)
                break
            except Exception:
                self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
                if attempts > retry:
                    raise
                # Yield to the event loop instead of blocking it.
                await asyncio.sleep(delay)
                # Slowly back off the retry
                delay += 15

        self._run_configure_script(results.script)

    def _run_configure_script(self, script: str):
        """Run a script on the target machine as root.

        The script is written to a local temporary file, uploaded over SFTP (as
        the ``ubuntu`` user) to the same path on the remote machine and run
        with ``sudo /bin/bash``. The local temporary file is always removed.

        :param str script: The script to run, e.g. the one returned by the
            ProvisioningScript API
        :raises: :class:`paramiko.ssh_exception.AuthenticationException`
            if the upload fails
        :raises: :class:`CalledProcessError` if the script fails
        """
        # mkstemp returns an already-open descriptor; write through it so the
        # descriptor is closed instead of leaked.
        fd, tmp_file = tempfile.mkstemp()
        ssh = None
        try:
            with os.fdopen(fd, "w") as f:
                f.write(script)

            # get ssh client
            ssh = self._get_ssh_client(user="ubuntu")

            # copy the local copy of the script to the remote machine
            sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
            try:
                sftp.put(tmp_file, tmp_file)
            finally:
                sftp.close()

            # run the provisioning script
            self._run_command(ssh, "sudo /bin/bash {}".format(tmp_file))
        finally:
            os.remove(tmp_file)
            if ssh is not None:
                ssh.close()