Commit 86ef36a9 authored by garciadav's avatar garciadav
Browse files

Add relations between enodeB and MagmaGW

parent 9f5ae0b0
Loading
Loading
Loading
Loading
+0 −21
Original line number Diff line number Diff line
register:
  description: "Register to AGW (EPC)."
  params:
    mme-addr:
      description: "MME address."
      type: string
      default: ""
    gtp-bind-addr:
      description: "GTP bind address"
      type: string
      default: ""
    s1c-bind-addr:
      description: "S1C bind address."
      type: string
      default: ""
  required:
    - mme-addr
    - gtp-bind-addr
    - s1c-bind-addr
unregister:
  description: "Unregister from AGW."
attach-ue:
  description: Attach User Emulator to enodeB
  params:
+3 −0
Original line number Diff line number Diff line
@@ -6,3 +6,6 @@ description: |
series:
  - xenial
  - bionic
requires:
  agw:
    interface: lte-vepc
 No newline at end of file
+42 −55
Original line number Diff line number Diff line
@@ -47,39 +47,42 @@ class EnodebCharm(CharmBase):
        # Register all of the events we want to observe
        self.framework.observe(self.on.config_changed, self.on_config_changed)
        self.framework.observe(self.on.install, self.on_install)
        self.framework.observe(self.on.start, self.on_start)
        self.framework.observe(self.on.upgrade_charm, self.on_upgrade_charm)
        self.framework.observe(self.on.register_action, self.on_register_action)

        self.framework.observe(self.on.attach_ue_action, self.on_attach_ue_action)
        self.framework.observe(self.on.unregister_action, self.on_unregister_action)
        self.framework.observe(self.on.detach_ue_action, self.on_detach_ue_action)
        self.framework.observe(self.on.remove_default_gw_action, self.on_remove_default_gw_action)

        self.framework.observe(self.on.agw_relation_changed, self.agw_relation_changed)

    def on_config_changed(self, event):
        """Handle changes in configuration"""
        unit = self.model.unit

    def on_start(self, event):
        """Called when the charm starts"""
        self.unit.status = ActiveStatus()

    def on_install(self, event):
        """Called when the charm is being installed"""
        unit = self.model.unit
        # Install your software and its dependencies
        unit.status = ActiveStatus()

    def on_upgrade_charm(self, event):
        """Upgrade the charm."""
        unit = self.model.unit
        # Mark the unit as under Maintenance.
        unit.status = MaintenanceStatus("Upgrading charm")
        self.on_install(event)
        # When maintenance is done, return to an Active state
        unit.status = ActiveStatus()

    def on_register_action(self, event):

    def agw_relation_changed(self, event):
        """Register to AGW (EPC)."""
        try:
            mme_addr = event.params["mme-addr"]
            gtp_bind_addr = event.params["gtp-bind-addr"]
            s1c_bind_addr = event.params["s1c-bind-addr"]
            log_file = "/tmp/{}.log".format(random.randint(1000, 100000))
        self.unit.status = MaintenanceStatus("Getting MagmaGW data from relation")
        relation = self.model.get_relation("agw")
        if relation is None:
            event.defer()
            return
        self._unregister()
        mme_addr = relation.data[event.unit].get("mme-addr", None)
        status = "Not registered"
        if mme_addr is not None:
            self.unit.status = MaintenanceStatus(f"Relation data {dict(relation.data[event.unit])}")
            gtp_bind_addr = self._get_bind_address()
            s1c_bind_addr = gtp_bind_addr
            command = " ".join(
                [
                    "/home/ubuntu/srsLTE/build/srsenb/src/srsenb",
@@ -97,35 +100,18 @@ class EnodebCharm(CharmBase):
                    "--rf.device_args='fail_on_disconnect=true,tx_port=tcp://*:2000,rx_port=tcp://localhost:2001,id=enb,base_srate=23.04e6'",
                ]
            )
            logger.debug("Register action: executing")
            process = self._run_daemon(command, log_file)
            logger.debug("Register action: executed")
            event.set_results(
                {"status": "ok", "pid": process.pid, "log-file": log_file}
            )
        except subprocess.CalledProcessError as e:
            event.fail("Command error: {}".format(e.output))
        except asyncio.TimeoutError as e:
            event.fail("Timeout error")
        except Exception as e:
            event.fail(e)
            self._run_daemon(command)
            status = "Registered"
        self.unit.status = ActiveStatus(status)

    def on_unregister_action(self, event):
        """Unregister action"""
        try:
            command = "sudo killall -s KILL srsenb"
            output = subprocess.check_output(command, shell=True)
            event.set_results(
                {
                    "status": "ok",
                    "message": "Unregistered successfully",
                    "output": output,
                }
            )
        except subprocess.CalledProcessError as e:
            event.fail("Command error: {}".format(e.output))
        except Exception as e:
            event.fail(e)
    def _get_bind_address(self):
        output = subprocess.run(["hostname", "-I"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return output.stdout.decode("utf-8").split(" ")[1]

    def _unregister(self):
        """Unregister"""
        self.unit.status = MaintenanceStatus("Unregistering")
        subprocess.run("sudo killall -s KILL srsenb", shell=True)

    def on_attach_ue_action(self, event):
        """Attach User Emulator to EnodeB."""
@@ -133,7 +119,6 @@ class EnodebCharm(CharmBase):
            usim_imsi = event.params["usim-imsi"]
            usim_k = event.params["usim-k"]
            usim_opc = event.params["usim-opc"]
            log_file = "/tmp/{}.log".format(random.randint(1000, 100000))
            command = " ".join(
                [
                    "sudo",
@@ -148,11 +133,9 @@ class EnodebCharm(CharmBase):
                    "/configzmq/ue.conf",
                ]
            )
            logger.debug("Attach UE action: executing")
            process = self._run_daemon(command, log_file)
            logger.debug("Attach UE action: executed")
            process = self._run_daemon(command)
            event.set_results(
                {"status": "ok", "pid": process.pid, "log-file": log_file}
                {"status": "ok", "pid": process.pid}
            )
        except subprocess.CalledProcessError as ex:
            event.fail(ex)
@@ -183,9 +166,13 @@ class EnodebCharm(CharmBase):
        except Exception as e:
            event.fail(e)

    def _run_daemon(self, cmd, stdout_file):
        with open(stdout_file, "wb") as f:
            return subprocess.Popen(cmd, shell=True, stdout=f)
    def _run_daemon(self, cmd):
        log_basename = random.randint(1000, 100000)
        stdout_file = f"/tmp/{log_basename}.stdout"
        stderr_file = f"/tmp/{log_basename}.stderr"
        with open(stdout_file, "wb") as o:
            with open(stderr_file, "wb") as e:
                return subprocess.Popen(cmd, shell=True, stdout=o, stderr=e)


if __name__ == "__main__":
+3 −0
Original line number Diff line number Diff line
@@ -10,3 +10,6 @@ series:
peers:
  proxypeer:
    interface: proxypeer
provides:
  agw:
    interface: lte-vepc
+49 −216
Original line number Diff line number Diff line
@@ -27,149 +27,79 @@ from ops.model import (
    WaitingStatus,
    ModelError,
)
import os
import subprocess
from proxy_cluster import ProxyCluster

from charms.osm.sshproxy import SSHProxy


class SSHKeysInitialized(EventBase):
    def __init__(self, handle, ssh_public_key, ssh_private_key):
        super().__init__(handle)
        self.ssh_public_key = ssh_public_key
        self.ssh_private_key = ssh_private_key

    def snapshot(self):
        return {
            "ssh_public_key": self.ssh_public_key,
            "ssh_private_key": self.ssh_private_key,
        }

    def restore(self, snapshot):
        self.ssh_public_key = snapshot["ssh_public_key"]
        self.ssh_private_key = snapshot["ssh_private_key"]


class ProxyClusterEvents(CharmEvents):
    ssh_keys_initialized = EventSource(SSHKeysInitialized)
from charms.osm.sshproxy import SSHProxyCharm
import subprocess


class SimpleHAProxyCharm(CharmBase):
class MagmaAGWProxyCharm(SSHProxyCharm):

    state = StoredState()
    on = ProxyClusterEvents()

    def __init__(self, framework, key):
        super().__init__(framework, key)

        # An example of setting charm state
        # that's persistent across events
        self.state.set_default(is_started=False)

        self.peers = ProxyCluster(self, "proxypeer")
        self.state.set_default(registered=False, ready=False)

        if not self.state.is_started:
            self.state.is_started = True

        # Register all of the events we want to observe
        # Listen to charm events
        self.framework.observe(self.on.config_changed, self.on_config_changed)
        self.framework.observe(self.on.install, self.on_install)
        self.framework.observe(self.on.start, self.on_start)
        self.framework.observe(self.on.upgrade_charm, self.on_upgrade_charm)

        # Charm actions (primitives)
        self.framework.observe(self.on.add_net_action, self.on_add_net_action)
        self.framework.observe(self.on.add_gw_action, self.on_add_gw_action)
        self.framework.observe(self.on.reset_id_action, self.on_reset_id_action)
        self.framework.observe(self.on.add_hosts_action, self.on_add_hosts_action)
        self.framework.observe(self.on.restart_magma_action, self.on_restart_magma_action)
        self.framework.observe(
            self.on.restart_magma_action, self.on_restart_magma_action
        )
        self.framework.observe(self.on.del_gw_action, self.on_del_gw_action)
        self.framework.observe(self.on.reset_id_action, self.on_reset_id_action)
        self.framework.observe(self.on.add_test_subscriber_action, self.on_add_test_subscriber_action)
        # OSM actions (primitives)
        self.framework.observe(self.on.start_action, self.on_start_action)
        self.framework.observe(self.on.stop_action, self.on_stop_action)
        self.framework.observe(self.on.restart_action, self.on_restart_action)
        self.framework.observe(self.on.reboot_action, self.on_reboot_action)
        self.framework.observe(self.on.upgrade_action, self.on_upgrade_action)
        # SSH Proxy actions (primitives)
        self.framework.observe(self.on.generate_ssh_key_action, self.on_generate_ssh_key_action)
        self.framework.observe(self.on.get_ssh_public_key_action, self.on_get_ssh_public_key_action)
        self.framework.observe(self.on.run_action, self.on_run_action)
        self.framework.observe(self.on.verify_ssh_credentials_action, self.on_verify_ssh_credentials_action)

        self.framework.observe(self.on.proxypeer_relation_changed, self.on_proxypeer_relation_changed)
        

    def get_ssh_proxy(self):
        """Get the SSHProxy instance"""
        proxy = SSHProxy(
            hostname=self.model.config["ssh-hostname"],
            username=self.model.config["ssh-username"],
            password=self.model.config["ssh-password"],
        self.framework.observe(
            self.on.add_test_subscriber_action, self.on_add_test_subscriber_action
        )
        return proxy

    def on_proxypeer_relation_changed(self, event):
        if self.peers.is_cluster_initialized:
            pubkey = self.peers.ssh_public_key
            privkey = self.peers.ssh_private_key
            SSHProxy.write_ssh_keys(public=pubkey, private=privkey)
            self.on_config_changed(event)
        else:
        # # Relation
        self.framework.observe(self.on.agw_relation_joined, self.agw_relation_joined)

    def agw_relation_joined(self, event):
        if self.unit.is_leader():
            if not self.state.ready:
                event.defer()
                return
            self.send_relation_data()

    def send_relation_data(self):
        relation = self.model.get_relation("agw")
        if relation is not None and self.state.mme_addr and self.state.magmagw:
            relation.data[self.unit]["mme-addr"] = self.state.mme_addr
            relation.data[self.unit]["magmagw"] = self.state.magmagw

    def on_config_changed(self, event):
        """Handle changes in configuration"""
        unit = self.model.unit

        # Unit should go into a waiting state until verify_ssh_credentials is successful
        unit.status = WaitingStatus("Waiting for SSH credentials")
        proxy = self.get_ssh_proxy()

        verified = proxy.verify_credentials()
        if verified:
            unit.status = ActiveStatus()
        else:
            unit.status = BlockedStatus("Invalid SSH credentials.")
        super().on_config_changed(event)

    def on_install(self, event):
        unit = self.model.unit
        unit.status = MaintenanceStatus("Installing all SW")
        SSHProxy.install()
        unit.status = ActiveStatus()
        """Called when the charm is being installed"""
        super().on_install(event)

    def on_start(self, event):
        """Called when the charm is being installed"""
        if not self.peers.is_joined:
        """Called when the charm is being started"""
        super().on_start(event)
        if not self.verify_credentials():
            event.defer()
            return

        unit = self.model.unit

        if not SSHProxy.has_ssh_key():
            unit.status = MaintenanceStatus("Generating SSH keys...")
            pubkey = None
            privkey = None
            if self.is_leader:
                if self.peers.is_cluster_initialized:
                    SSHProxy.write_ssh_keys(
                        public=self.peers.ssh_public_key,
                        private=self.peers.ssh_private_key,
                    )
                else:
                    SSHProxy.generate_ssh_key()
                    self.on.ssh_keys_initialized.emit(
                        SSHProxy.get_ssh_public_key(), SSHProxy.get_ssh_private_key()
                    )
                unit.status = ActiveStatus()
            else:
                unit.status = WaitingStatus("Waiting for leader to populate the keys")
        proxy = self.get_ssh_proxy()
        ips, _ = proxy.run("hostname -I")
        ip_list = ips.split(" ")
        self.state.mme_addr = ip_list[0]
        self.state.magmagw = ip_list[1]

    # Magma AGW Action implementation
    def on_add_net_action(self, event):
        """Add AGW Network if needed"""
        if self.is_leader:
        if self.unit.is_leader():
            orch_ip = event.params["orch_ip"]
            orch_net = event.params["orch_net"]
            proxy = self.get_ssh_proxy()
@@ -186,15 +116,17 @@ class SimpleHAProxyCharm(CharmBase):
                except subprocess.CalledProcessError:
                    attempt += 1
                    import time

                    time.sleep(5)
            event.set_results({"output": stdout, "stderr": stderr})
            self.state.registered = True
        else:
            event.fail("Unit is not leader")
            return

    def on_add_gw_action(self, event):
        """Self-register for the AGW"""
        if self.is_leader:
        if self.unit.is_leader():
            agw_id = event.params["agw_id"]
            agw_name = event.params["agw_name"]
            orch_ip = event.params["orch_ip"]
@@ -212,7 +144,7 @@ class SimpleHAProxyCharm(CharmBase):

    def on_reset_id_action(self, event):
        """Resets the hardware ID"""
        if self.is_leader:
        if self.unit.is_leader():
            proxy = self.get_ssh_proxy()
            stdout, stderr = proxy.run("sudo snowflake --force-new-key")
            event.set_results({"output": stdout, "stderr": stderr})
@@ -222,7 +154,7 @@ class SimpleHAProxyCharm(CharmBase):

    def on_add_hosts_action(self, event):
        """Add Orchestrator host in /etc/hosts"""
        if self.is_leader:
        if self.unit.is_leader():
            orch_ip = event.params["orch_ip"]
            orch_hosts = "ORCH_IP controller.magma.test\nORCH_IP bootstrapper-controller.magma.test\nORCH_IP state-controller.magma.test\nORCH_IP dispatcher-controller.magma.test\nORCH_IP logger-controller.magma.test\nORCH_IP streamer-controller.magma.test\n"
            orch_hosts = orch_hosts.replace("ORCH_IP", orch_ip)
@@ -237,17 +169,20 @@ class SimpleHAProxyCharm(CharmBase):

    def on_restart_magma_action(self, event):
        """Resets the hardware ID"""
        if self.is_leader:
        if self.unit.is_leader():
            proxy = self.get_ssh_proxy()
            stdout, stderr = proxy.run("sudo service magma@* restart")
            event.set_results({"output": stdout, "stderr": stderr})
            if self.state.registered:
                self.state.ready = True
                self.send_relation_data()
        else:
            event.fail("Unit is not leader")
            return

    def on_del_gw_action(self, event):
        """Deregister from AGW"""
        if self.is_leader:
        if self.unit.is_leader():
            agw_id = event.params["agw_id"]
            orch_ip = event.params["orch_ip"]
            orch_net = event.params["orch_net"]
@@ -264,7 +199,7 @@ class SimpleHAProxyCharm(CharmBase):

    def on_add_test_subscriber_action(self, event):
        """Adds test subscriber to Orc8r HSS"""
        if self.is_leader:
        if self.unit.is_leader():
            orch_ip = event.params["orch_ip"]
            orch_net = event.params["orch_net"]
            proxy = self.get_ssh_proxy()
@@ -278,108 +213,6 @@ class SimpleHAProxyCharm(CharmBase):
            event.fail("Unit is not leader")
            return

    def on_upgrade_charm(self, event):
        """Upgrade the charm."""
        unit = self.model.unit

        # Mark the unit as under Maintenance.
        unit.status = MaintenanceStatus("Upgrading charm")

        self.on_install(event)

        # When maintenance is done, return to an Active state
        unit.status = ActiveStatus()

    ###############
    # OSM methods #
    ###############
    def on_start_action(self, event):
        """Start the VNF service on the VM."""
        pass

    def on_stop_action(self, event):
        """Stop the VNF service on the VM."""
        pass

    def on_restart_action(self, event):
        """Restart the VNF service on the VM."""
        pass

    def on_reboot_action(self, event):
        """Reboot the VM."""
        if self.is_leader:
            proxy = self.get_ssh_proxy()
            stdout, stderr = proxy.run("sudo reboot")
            if len(stderr):
                event.fail(stderr)
        else:
            event.fail("Unit is not leader")
            return

    def on_upgrade_action(self, event):
        """Upgrade the VNF service on the VM."""
        pass

    #####################
    # SSH Proxy methods #
    #####################
    def on_generate_ssh_key_action(self, event):
        """Generate a new SSH keypair for this unit."""
        if self.is_leader:
            if not SSHProxy.generate_ssh_key():
                event.fail("Unable to generate ssh key")
        else:
            event.fail("Unit is not leader")
            return

    def on_get_ssh_public_key_action(self, event):
        """Get the SSH public key for this unit."""
        if self.is_leader:
            pubkey = SSHProxy.get_ssh_public_key()
            event.set_results({"pubkey": SSHProxy.get_ssh_public_key()})
        else:
            event.fail("Unit is not leader")
            return

    def on_run_action(self, event):
        """Run an arbitrary command on the remote host."""
        if self.is_leader:
            cmd = event.params["command"]
            proxy = self.get_ssh_proxy()
            stdout, stderr = proxy.run(cmd)
            event.set_results({"output": stdout})
            if len(stderr):
                event.fail(stderr)
        else:
            event.fail("Unit is not leader")
            return

    def on_verify_ssh_credentials_action(self, event):
        """Verify the SSH credentials for this unit."""
        if self.is_leader:
            proxy = self.get_ssh_proxy()

            verified = proxy.verify_credentials()
            if verified:
                print("Verified!")
                event.set_results({"verified": True})
            else:
                print("Verification failed!")
                event.set_results({"verified": False})
        else:
            event.fail("Unit is not leader")
            return

    @property
    def is_leader(self):
        # update the framework to include self.unit.is_leader()
        return self.model.unit.is_leader()


class LeadershipError(ModelError):
    def __init__(self):
        super().__init__("not leader")


if __name__ == "__main__":
    main(SimpleHAProxyCharm)
    main(MagmaAGWProxyCharm)
Loading