Implement get_service and get_services methods for K8sJujuConnector
[osm/N2VC.git] / n2vc / provisioner.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 import os
16 import re
17 import shlex
18 from subprocess import CalledProcessError
19 import tempfile
20 import time
21 import uuid
22
23 from juju.client import client
24 import n2vc.exceptions
25 import paramiko
26 import asyncio
27
28 arches = [
29 [re.compile(r"amd64|x86_64"), "amd64"],
30 [re.compile(r"i?[3-9]86"), "i386"],
31 [re.compile(r"(arm$)|(armv.*)"), "armhf"],
32 [re.compile(r"aarch64"), "arm64"],
33 [re.compile(r"ppc64|ppc64el|ppc64le"), "ppc64el"],
34 [re.compile(r"s390x?"), "s390x"],
35 ]
36
37
38 def normalize_arch(rawArch):
39 """Normalize the architecture string."""
40 for arch in arches:
41 if arch[0].match(rawArch):
42 return arch[1]
43
44
45 DETECTION_SCRIPT = """#!/bin/bash
46 set -e
47 os_id=$(grep '^ID=' /etc/os-release | tr -d '"' | cut -d= -f2)
48 if [ "$os_id" = 'centos' ]; then
49 os_version=$(grep '^VERSION_ID=' /etc/os-release | tr -d '"' | cut -d= -f2)
50 echo "centos$os_version"
51 else
52 lsb_release -cs
53 fi
54 uname -m
55 grep MemTotal /proc/meminfo
56 cat /proc/cpuinfo
57 """
58
59 INITIALIZE_UBUNTU_SCRIPT = """set -e
60 (id ubuntu &> /dev/null) || useradd -m ubuntu -s /bin/bash
61 umask 0077
62 temp=$(mktemp)
63 echo 'ubuntu ALL=(ALL) NOPASSWD:ALL' > $temp
64 install -m 0440 $temp /etc/sudoers.d/90-juju-ubuntu
65 rm $temp
66 su ubuntu -c 'install -D -m 0600 /dev/null ~/.ssh/authorized_keys'
67 export authorized_keys="{}"
68 if [ ! -z "$authorized_keys" ]; then
69 su ubuntu -c 'echo $authorized_keys >> ~/.ssh/authorized_keys'
70 fi
71 """
72
73 IPTABLES_SCRIPT = """#!/bin/bash
74 set -e
75 [ -v `which netfilter-persistent` ] && apt update \
76 && DEBIAN_FRONTEND=noninteractive apt-get install -yqq iptables-persistent
77 iptables -t nat -A OUTPUT -p tcp -d {} -j DNAT --to-destination {}
78 netfilter-persistent save
79 """
80
81
82 class AsyncSSHProvisioner:
83 """Provision a manually created machine via SSH."""
84
85 user = ""
86 host = ""
87 private_key_path = ""
88
89 def __init__(self, user, host, private_key_path, log=None):
90 self.host = host
91 self.user = user
92 self.private_key_path = private_key_path
93 self.log = log if log else logging.getLogger(__name__)
94
95 async def _scp(self, source_file, destination_file):
96 """Execute an scp command. Requires a fully qualified source and
97 destination.
98
99 :param str source_file: Path to the source file
100 :param str destination_file: Path to the destination file
101 """
102 cmd = [
103 "scp",
104 "-i",
105 os.path.expanduser(self.private_key_path),
106 "-o",
107 "StrictHostKeyChecking=no",
108 "-q",
109 "-B",
110 ]
111 destination = "{}@{}:{}".format(self.user, self.host, destination_file)
112 cmd.extend([source_file, destination])
113 process = await asyncio.create_subprocess_exec(*cmd)
114 await process.wait()
115 if process.returncode != 0:
116 raise CalledProcessError(returncode=process.returncode, cmd=cmd)
117
118 async def _ssh(self, command):
119 """Run a command remotely via SSH.
120
121 :param str command: The command to execute
122 :return: tuple: The stdout and stderr of the command execution
123 :raises: :class:`CalledProcessError` if the command fails
124 """
125
126 destination = "{}@{}".format(self.user, self.host)
127 cmd = [
128 "ssh",
129 "-i",
130 os.path.expanduser(self.private_key_path),
131 "-o",
132 "StrictHostKeyChecking=no",
133 "-q",
134 destination,
135 ]
136 cmd.extend([command])
137 process = await asyncio.create_subprocess_exec(
138 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
139 )
140 stdout, stderr = await process.communicate()
141
142 if process.returncode != 0:
143 output = stderr.decode("utf-8").strip()
144 raise CalledProcessError(
145 returncode=process.returncode, cmd=cmd, output=output
146 )
147 return (stdout.decode("utf-8").strip(), stderr.decode("utf-8").strip())
148
149 async def _init_ubuntu_user(self):
150 """Initialize the ubuntu user.
151
152 :return: bool: If the initialization was successful
153 :raises: :class:`CalledProcessError` if the _ssh command fails
154 """
155 retry = 10
156 attempts = 0
157 delay = 15
158 while attempts <= retry:
159 try:
160 attempts += 1
161 # Attempt to establish a SSH connection
162 stdout, stderr = await self._ssh("sudo -n true")
163 break
164 except CalledProcessError as e:
165 self.log.debug(
166 "Waiting for VM to boot, sleeping {} seconds".format(delay)
167 )
168 if attempts > retry:
169 raise e
170 else:
171 await asyncio.sleep(delay)
172 # Slowly back off the retry
173 delay += 15
174
175 # Infer the public key
176 public_key = None
177 public_key_path = "{}.pub".format(self.private_key_path)
178
179 if not os.path.exists(public_key_path):
180 raise FileNotFoundError(
181 "Public key '{}' doesn't exist.".format(public_key_path)
182 )
183
184 with open(public_key_path, "r") as f:
185 public_key = f.readline()
186
187 script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)
188
189 stdout, stderr = await self._run_configure_script(script)
190
191 return True
192
193 async def _detect_hardware_and_os(self):
194 """Detect the target hardware capabilities and OS series.
195
196 :return: str: A raw string containing OS and hardware information.
197 """
198
199 info = {
200 "series": "",
201 "arch": "",
202 "cpu-cores": "",
203 "mem": "",
204 }
205
206 stdout, stderr = await self._run_configure_script(DETECTION_SCRIPT)
207
208 lines = stdout.split("\n")
209 info["series"] = lines[0].strip()
210 info["arch"] = normalize_arch(lines[1].strip())
211
212 memKb = re.split(r"\s+", lines[2])[1]
213
214 # Convert megabytes -> kilobytes
215 info["mem"] = round(int(memKb) / 1024)
216
217 # Detect available CPUs
218 recorded = {}
219 for line in lines[3:]:
220 physical_id = ""
221 print(line)
222
223 if line.find("physical id") == 0:
224 physical_id = line.split(":")[1].strip()
225 elif line.find("cpu cores") == 0:
226 cores = line.split(":")[1].strip()
227
228 if physical_id not in recorded.keys():
229 info["cpu-cores"] += cores
230 recorded[physical_id] = True
231
232 return info
233
234 async def provision_machine(self):
235 """Perform the initial provisioning of the target machine.
236
237 :return: bool: The client.AddMachineParams
238 """
239 params = client.AddMachineParams()
240
241 if await self._init_ubuntu_user():
242 hw = await self._detect_hardware_and_os()
243 params.series = hw["series"]
244 params.instance_id = "manual:{}".format(self.host)
245 params.nonce = "manual:{}:{}".format(
246 self.host, str(uuid.uuid4()),
247 ) # a nop for Juju w/manual machines
248 params.hardware_characteristics = {
249 "arch": hw["arch"],
250 "mem": int(hw["mem"]),
251 "cpu-cores": int(hw["cpu-cores"]),
252 }
253 params.addresses = [{"value": self.host, "type": "ipv4", "scope": "public"}]
254
255 return params
256
257 async def install_agent(self, connection, nonce, machine_id, api):
258 """
259 :param object connection: Connection to Juju API
260 :param str nonce: The nonce machine specification
261 :param str machine_id: The id assigned to the machine
262 :param str api: IP of the API_PROXY
263
264 :return: bool: If the initialization was successful
265 """
266 # The path where the Juju agent should be installed.
267 data_dir = "/var/lib/juju"
268
269 # Disabling this prevents `apt-get update` from running initially, so
270 # charms will fail to deploy
271 disable_package_commands = False
272
273 client_facade = client.ClientFacade.from_connection(connection)
274 results = await client_facade.ProvisioningScript(
275 data_dir=data_dir,
276 disable_package_commands=disable_package_commands,
277 machine_id=machine_id,
278 nonce=nonce,
279 )
280
281 """Get the IP of the controller
282
283 Parse the provisioning script, looking for the first apiaddress.
284
285 Example:
286 apiaddresses:
287 - 10.195.8.2:17070
288 - 127.0.0.1:17070
289 - '[::1]:17070'
290 """
291 m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
292 apiaddress = m.group(1)
293
294 """Add IP Table rule
295
296 In order to route the traffic to the private ip of the Juju controller
297 we use a DNAT rule to tell the machine that the destination for the
298 private address is the public address of the machine where the Juju
299 controller is running in LXD. That machine will have a complimentary
300 iptables rule, routing traffic to the appropriate LXD container.
301 """
302
303 script = IPTABLES_SCRIPT.format(apiaddress, api)
304
305 # Run this in a retry loop, because dpkg may be running and cause the
306 # script to fail.
307 retry = 10
308 attempts = 0
309 delay = 15
310
311 while attempts <= retry:
312 try:
313 attempts += 1
314 stdout, stderr = await self._run_configure_script(script)
315 break
316 except Exception as e:
317 self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
318 if attempts > retry:
319 raise e
320 else:
321 await asyncio.sleep(delay)
322 # Slowly back off the retry
323 delay += 15
324
325 # self.log.debug("Running configure script")
326 await self._run_configure_script(results.script)
327 # self.log.debug("Configure script finished")
328
329 async def _run_configure_script(self, script, root=True):
330 """Run the script to install the Juju agent on the target machine.
331
332 :param str script: The script to be executed
333 """
334 _, tmpFile = tempfile.mkstemp()
335 with open(tmpFile, "w") as f:
336 f.write(script)
337 f.close()
338
339 # copy the local copy of the script to the remote machine
340 await self._scp(tmpFile, tmpFile)
341
342 # run the provisioning script
343 return await self._ssh(
344 "{} /bin/bash {}".format("sudo" if root else "", tmpFile)
345 )
346
347
348 class SSHProvisioner:
349 """Provision a manually created machine via SSH."""
350
351 def __init__(self, user, host, private_key_path, log=None):
352
353 self.host = host
354 self.user = user
355 self.private_key_path = private_key_path
356
357 if log:
358 self.log = log
359 else:
360 self.log = logging.getLogger(__name__)
361
362 def _get_ssh_client(self, host=None, user=None, private_key_path=None):
363 """Return a connected Paramiko ssh object.
364
365 :param str host: The host to connect to.
366 :param str user: The user to connect as.
367 :param str key: The private key to authenticate with.
368
369 :return: object: A paramiko.SSHClient
370 :raises: :class:`paramiko.ssh_exception.SSHException` if the
371 connection failed
372 """
373
374 if not host:
375 host = self.host
376
377 if not user:
378 user = self.user
379
380 if not private_key_path:
381 private_key_path = self.private_key_path
382
383 ssh = paramiko.SSHClient()
384 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
385
386 pkey = None
387
388 # Read the private key into a paramiko.RSAKey
389 if os.path.exists(private_key_path):
390 with open(private_key_path, "r") as f:
391 pkey = paramiko.RSAKey.from_private_key(f)
392
393 #######################################################################
394 # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where #
395 # the server may not send the SSH_MSG_USERAUTH_BANNER message except #
396 # when responding to an auth_none request. For example, paramiko will #
397 # attempt to use password authentication when a password is set, but #
398 # the server could deny that, instead requesting keyboard-interactive.#
399 # The hack to workaround this is to attempt a reconnect, which will #
400 # receive the right banner, and authentication can proceed. See the #
401 # following for more info: #
402 # https://github.com/paramiko/paramiko/issues/432 #
403 # https://github.com/paramiko/paramiko/pull/438 #
404 #######################################################################
405
406 retry = 10
407 attempts = 0
408 delay = 15
409 while attempts <= retry:
410 try:
411 attempts += 1
412
413 # Attempt to establish a SSH connection
414 ssh.connect(
415 host,
416 port=22,
417 username=user,
418 pkey=pkey,
419 # allow_agent=False,
420 # look_for_keys=False,
421 )
422 break
423 except paramiko.ssh_exception.SSHException as e:
424 if "Error reading SSH protocol banner" == str(e):
425 # Once more, with feeling
426 ssh.connect(host, port=22, username=user, pkey=pkey)
427 else:
428 # Reraise the original exception
429 self.log.debug("Unhandled exception caught: {}".format(e))
430 raise e
431 except Exception as e:
432 if "Unable to connect to port" in str(e):
433 self.log.debug(
434 "Waiting for VM to boot, sleeping {} seconds".format(delay)
435 )
436 if attempts > retry:
437 raise e
438 else:
439 time.sleep(delay)
440 # Slowly back off the retry
441 delay += 15
442 else:
443 self.log.debug(e)
444 raise e
445 return ssh
446
447 def _run_command(self, ssh, cmd, pty=True):
448 """Run a command remotely via SSH.
449
450 :param object ssh: The SSHClient
451 :param str cmd: The command to execute
452 :param list cmd: The `shlex.split` command to execute
453 :param bool pty: Whether to allocate a pty
454
455 :return: tuple: The stdout and stderr of the command execution
456 :raises: :class:`CalledProcessError` if the command fails
457 """
458
459 if isinstance(cmd, str):
460 cmd = shlex.split(cmd)
461
462 if type(cmd) is not list:
463 cmd = [cmd]
464
465 cmds = " ".join(cmd)
466 _, stdout, stderr = ssh.exec_command(cmds, get_pty=pty)
467 retcode = stdout.channel.recv_exit_status()
468
469 if retcode > 0:
470 output = stderr.read().strip()
471 raise CalledProcessError(returncode=retcode, cmd=cmd, output=output)
472 return (
473 stdout.read().decode("utf-8").strip(),
474 stderr.read().decode("utf-8").strip(),
475 )
476
477 def _init_ubuntu_user(self):
478 """Initialize the ubuntu user.
479
480 :return: bool: If the initialization was successful
481 :raises: :class:`paramiko.ssh_exception.AuthenticationException`
482 if the authentication fails
483 """
484 ssh = None
485 try:
486 # Run w/o allocating a pty, so we fail if sudo prompts for a passwd
487 ssh = self._get_ssh_client()
488 self._run_command(ssh, "sudo -n true", pty=False)
489 except paramiko.ssh_exception.AuthenticationException:
490 raise n2vc.exceptions.AuthenticationFailed(self.user)
491 except paramiko.ssh_exception.NoValidConnectionsError:
492 raise n2vc.exceptions.NoRouteToHost(self.host)
493 finally:
494 if ssh:
495 ssh.close()
496
497 # Infer the public key
498 public_key_path = "{}.pub".format(self.private_key_path)
499
500 if not os.path.exists(public_key_path):
501 raise FileNotFoundError(
502 "Public key '{}' doesn't exist.".format(public_key_path)
503 )
504
505 with open(public_key_path, "r") as f:
506 public_key = f.readline()
507
508 script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)
509
510 try:
511 ssh = self._get_ssh_client()
512
513 self._run_command(
514 ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True
515 )
516 except paramiko.ssh_exception.AuthenticationException as e:
517 raise e
518 finally:
519 ssh.close()
520
521 return True
522
523 def _detect_hardware_and_os(self, ssh):
524 """Detect the target hardware capabilities and OS series.
525
526 :param object ssh: The SSHClient
527 :return: str: A raw string containing OS and hardware information.
528 """
529
530 info = {
531 "series": "",
532 "arch": "",
533 "cpu-cores": "",
534 "mem": "",
535 }
536
537 stdout, _ = self._run_command(
538 ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True,
539 )
540
541 lines = stdout.split("\n")
542
543 # Remove extraneous line if DNS resolution of hostname famils
544 # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out
545 if "unable to resolve host" in lines[0]:
546 lines = lines[1:]
547
548 info["series"] = lines[0].strip()
549 info["arch"] = normalize_arch(lines[1].strip())
550
551 memKb = re.split(r"\s+", lines[2])[1]
552
553 # Convert megabytes -> kilobytes
554 info["mem"] = round(int(memKb) / 1024)
555
556 # Detect available CPUs
557 recorded = {}
558 for line in lines[3:]:
559 physical_id = ""
560
561 if line.find("physical id") == 0:
562 physical_id = line.split(":")[1].strip()
563 elif line.find("cpu cores") == 0:
564 cores = line.split(":")[1].strip()
565
566 if physical_id not in recorded.keys():
567 info["cpu-cores"] += cores
568 recorded[physical_id] = True
569
570 return info
571
572 def provision_machine(self):
573 """Perform the initial provisioning of the target machine.
574
575 :return: bool: The client.AddMachineParams
576 :raises: :class:`paramiko.ssh_exception.AuthenticationException`
577 if the upload fails
578 """
579 params = client.AddMachineParams()
580
581 if self._init_ubuntu_user():
582 try:
583 ssh = self._get_ssh_client()
584
585 hw = self._detect_hardware_and_os(ssh)
586 params.series = hw["series"]
587 params.instance_id = "manual:{}".format(self.host)
588 params.nonce = "manual:{}:{}".format(
589 self.host, str(uuid.uuid4()),
590 ) # a nop for Juju w/manual machines
591 params.hardware_characteristics = {
592 "arch": hw["arch"],
593 "mem": int(hw["mem"]),
594 "cpu-cores": int(hw["cpu-cores"]),
595 }
596 params.addresses = [
597 {"value": self.host, "type": "ipv4", "scope": "public"}
598 ]
599
600 except paramiko.ssh_exception.AuthenticationException as e:
601 raise e
602 finally:
603 ssh.close()
604
605 return params
606
607 async def install_agent(self, connection, nonce, machine_id, api):
608 """
609 :param object connection: Connection to Juju API
610 :param str nonce: The nonce machine specification
611 :param str machine_id: The id assigned to the machine
612
613 :return: bool: If the initialization was successful
614 """
615 # The path where the Juju agent should be installed.
616 data_dir = "/var/lib/juju"
617
618 # Disabling this prevents `apt-get update` from running initially, so
619 # charms will fail to deploy
620 disable_package_commands = False
621
622 client_facade = client.ClientFacade.from_connection(connection)
623 results = await client_facade.ProvisioningScript(
624 data_dir=data_dir,
625 disable_package_commands=disable_package_commands,
626 machine_id=machine_id,
627 nonce=nonce,
628 )
629
630 """Get the IP of the controller
631
632 Parse the provisioning script, looking for the first apiaddress.
633
634 Example:
635 apiaddresses:
636 - 10.195.8.2:17070
637 - 127.0.0.1:17070
638 - '[::1]:17070'
639 """
640 m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
641 apiaddress = m.group(1)
642
643 """Add IP Table rule
644
645 In order to route the traffic to the private ip of the Juju controller
646 we use a DNAT rule to tell the machine that the destination for the
647 private address is the public address of the machine where the Juju
648 controller is running in LXD. That machine will have a complimentary
649 iptables rule, routing traffic to the appropriate LXD container.
650 """
651
652 script = IPTABLES_SCRIPT.format(apiaddress, api)
653
654 # Run this in a retry loop, because dpkg may be running and cause the
655 # script to fail.
656 retry = 10
657 attempts = 0
658 delay = 15
659
660 while attempts <= retry:
661 try:
662 attempts += 1
663
664 self._run_configure_script(script)
665 break
666 except Exception as e:
667 self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
668 if attempts > retry:
669 raise e
670 else:
671 time.sleep(delay)
672 # Slowly back off the retry
673 delay += 15
674
675 # self.log.debug("Running configure script")
676 self._run_configure_script(results.script)
677 # self.log.debug("Configure script finished")
678
679 def _run_configure_script(self, script: str):
680 """Run the script to install the Juju agent on the target machine.
681
682 :param str script: The script returned by the ProvisioningScript API
683 :raises: :class:`paramiko.ssh_exception.AuthenticationException`
684 if the upload fails
685 """
686 _, tmpFile = tempfile.mkstemp()
687 with open(tmpFile, "w") as f:
688 f.write(script)
689 try:
690 # get ssh client
691 ssh = self._get_ssh_client(user="ubuntu",)
692
693 # copy the local copy of the script to the remote machine
694 sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
695 sftp.put(
696 tmpFile, tmpFile,
697 )
698
699 # run the provisioning script
700 self._run_command(
701 ssh, "sudo /bin/bash {}".format(tmpFile),
702 )
703
704 except paramiko.ssh_exception.AuthenticationException as e:
705 raise e
706 finally:
707 os.remove(tmpFile)
708 ssh.close()