Newer
Older
import apt
from apt.progress.base import OpProgress
import shutil
import subprocess
from typing import Dict, List, NoReturn
def service_active(service_name: str):
result = subprocess.run(
["systemctl", "is-active", service_name],
stdout=subprocess.PIPE,
encoding="utf-8",
)
return result.stdout == "active\n"
def all_values_set(dictionary: Dict[str, str]) -> bool:
return not any(v is None for v in dictionary.values())
def install_apt(packages: List, update: bool = False, progress=None) -> NoReturn:
cache = apt.cache.Cache()
if update:
cache.update()
cache.open()
for package in packages:
pkg = cache[package]
if not pkg.is_installed:
pkg.mark_install()
cache.commit(install_progress=progress)
def remove_apt(packages: List, update: bool = False, progress=None) -> NoReturn:
cache = apt.cache.Cache()
if update:
cache.update()
cache.open()
for package in packages:
pkg = cache[package]
cache.commit(install_progress=progress)
def upgrade_apt(update: bool = False, progress=None) -> NoReturn:
cache = apt.cache.Cache()
if update:
cache.update()
cache.open()
cache.upgrade(dist_upgrade=True)
cache.commit(install_progress=progress)
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def shell(command: str) -> NoReturn:
subprocess.run(command, shell=True).check_returncode()
def copy_files(origin: Dict[str, str], destination: Dict[str, str]) -> NoReturn:
for config, origin_path in origin.items():
destination_path = destination[config]
shutil.copy(origin_path, destination_path)
# Service functions
def _systemctl(action: str, service_name: str) -> NoReturn:
subprocess.run(["systemctl", action, service_name]).check_returncode()
def service_start(service_name: str) -> NoReturn:
_systemctl("start", service_name)
def service_restart(service_name: str) -> NoReturn:
_systemctl("restart", service_name)
def service_stop(service_name: str) -> NoReturn:
_systemctl("stop", service_name)
def service_enable(service_name: str) -> NoReturn:
_systemctl("enable", service_name)
def systemctl_daemon_reload():
subprocess.run(["systemctl", "daemon-reload"]).check_returncode()