RIFT OSM R1 Initial Submission
[osm/SO.git] / common / plugins / rwcntmgrtasklet / rift / tasklets / rwcntmgrtasklet / rwcntmgrtasklet.py
diff --git a/common/plugins/rwcntmgrtasklet/rift/tasklets/rwcntmgrtasklet/rwcntmgrtasklet.py b/common/plugins/rwcntmgrtasklet/rift/tasklets/rwcntmgrtasklet/rwcntmgrtasklet.py
new file mode 100755 (executable)
index 0000000..02714a5
--- /dev/null
@@ -0,0 +1,331 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import logging
+import os
+import shlex
+import subprocess
+import time
+import uuid
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwcalYang', '1.0')
+gi.require_version('RwCal', '1.0')
+
+
+from gi.repository import (
+    RwDts as rwdts,
+    RwcalYang,
+)
+
+import rift.rwcal.cloudsim.lvm as lvm
+import rift.rwcal.cloudsim.lxc as lxc
+import rift.tasklets
+import rw_peas
+
+
+class SaltConnectionTimeoutError(Exception):
+    pass
+
+
+class ContainerManager(rift.tasklets.Tasklet):
+    def __init__(self, *args, **kwargs):
+        super(ContainerManager, self).__init__(*args, **kwargs)
+        self.lvm = None
+        self.resources = None
+        self.dts_api = None
+
+    def start(self):
+        super(ContainerManager, self).start()
+        self.log.info("Starting ContainerManager")
+        self.log.setLevel(logging.DEBUG)
+        ResourceProvisioning.log_hdl = self.log_hdl
+
+        self.log.debug("Registering with dts")
+        self._dts = rift.tasklets.DTS(
+                self.tasklet_info,
+                RwcalYang.get_schema(),
+                self.loop,
+                self.on_dts_state_change
+                )
+
+        self.log.debug("Created DTS Api GI Object: %s", self._dts)
+
+    def on_instance_started(self):
+        self.log.debug("Got instance started callback")
+
+    def stop(self):
+        super(ContainerManager, self).stop()
+        self.resources.destroy()
+        self.lvm.destroy()
+
+    @asyncio.coroutine
+    def init(self):
+        # Create the LVM backing store with the 'rift' volume group
+        self.lvm = LvmProvisioning()
+        self.resources = ResourceProvisioning(self.loop, self.log)
+
+        # Create lvm partition
+        yield from self.loop.run_in_executor(
+                None,
+                self.resources.destroy,
+                )
+
+        if "REUSE_LXC" not in os.environ:
+            # Create lvm partition
+            yield from self.loop.run_in_executor(
+                    None,
+                    self.lvm.destroy,
+                    )
+
+            # Create lvm partition
+            yield from self.loop.run_in_executor(
+                    None,
+                    self.lvm.create,
+                    )
+
+        # Create an initial set of VMs
+        yield from self.loop.run_in_executor(
+                None,
+                self.resources.create,
+                )
+
+        yield from self.loop.run_in_executor(
+                None,
+                self.resources.wait_ready,
+                )
+
+    @asyncio.coroutine
+    def run(self):
+        pass
+
+    @asyncio.coroutine
+    def on_dts_state_change(self, state):
+        """Take action according to current dts state to transition
+        application into the corresponding application state
+
+        Arguments
+            state - current dts state
+        """
+
+        switch = {
+            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+            rwdts.State.CONFIG: rwdts.State.RUN,
+        }
+
+        handlers = {
+            rwdts.State.INIT: self.init,
+            rwdts.State.RUN: self.run,
+        }
+
+        # Transition application to next state
+        handler = handlers.get(state, None)
+        if handler is not None:
+            yield from handler()
+
+        # Transition dts to next state
+        next_state = switch.get(state, None)
+        if next_state is not None:
+            self._dts.handle.set_state(next_state)
+
+
+class LvmProvisioning(object):
+    """
+    This class represents LVM provisioning.
+    """
+
+    def create(self):
+        """Creates an LVM backing store"""
+        lvm.create('rift')
+
+    def destroy(self):
+        """Destroys the existing LVM backing store"""
+        lvm.destroy('rift')
+
+
+class ResourceProvisioning(object):
+    """
+    This is a placeholder class that is used to represent the provisioning of
+    container resources.
+    """
+
+    cal_interface = None
+    log_hdl = None
+
+    def __init__(self, loop, log):
+        # Initialize the CAL interface if it has not already been initialized
+        if ResourceProvisioning.cal_interface is None:
+            plugin = rw_peas.PeasPlugin('rwcal_cloudsimproxy', 'RwCal-1.0')
+            engine, info, extension = plugin()
+
+            ResourceProvisioning.cal_interface = plugin.get_interface("Cloud")
+            ResourceProvisioning.cal_interface.init(ResourceProvisioning.log_hdl)
+
+        self.account = RwcalYang.CloudAccount()
+        self.account.account_type = "cloudsim_proxy"
+        self.account.cloudsim_proxy.host = "192.168.122.1"
+
+        self.log = log
+        self.loop = loop
+        self.nvms = 1
+
+        self._vms = []
+
+    @property
+    def cal(self):
+        return ResourceProvisioning.cal_interface
+
+    def create(self):
+        """Create all of the necessary resources"""
+
+        rift_root = os.environ['RIFT_ROOT']
+        image = self.create_image("%s/images/rift-root-latest.qcow2" % (rift_root))
+
+        # Create a VM
+        for index in range(self.nvms):
+            self._vms.append(self.create_vm(image, index))
+
+        # Start the VMs
+        for vm in self._vms:
+            self.cal.start_vm(self.account, vm.vm_id)
+
+    def destroy(self):
+        """Destroy all of the provided resources"""
+
+        for container in lxc.containers():
+            lxc.stop(container)
+
+        for container in lxc.containers():
+            if not ("REUSE_LXC" in os.environ and container == "rwm0"):
+                lxc.destroy(container)
+
+    def create_image(self, location):
+        """Creates and returns a CAL image"""
+
+        image = RwcalYang.ImageInfoItem()
+        image.name = "rift-lxc-image"
+        image.location = location
+        image.disk_format = "qcow2"
+        rc, image.id = self.cal.create_image(self.account, image)
+        return image
+
+    def create_network(self, network_name, subnet):
+        """Creates and returns a CAL network"""
+
+        network = RwcalYang.NetworkInfoItem(
+                network_name=network_name,
+                subnet=subnet,
+                )
+        rc, network.network_id = self.cal.create_network(self.account, network)
+        return network
+
+    def create_vm(self, image, index):
+        """Returns a VM
+
+        Arguments:
+            image - the image used to create the VM
+            index - an index used to label the VM
+
+        Returns:
+            A VM object
+
+        """
+        vm = RwcalYang.VMInfoItem()
+        vm.vm_name = 'rift-s{}'.format(index + 1)
+        vm.image_id = image.id
+        vm.user_tags.node_id = str(uuid.uuid4())
+
+        user_data_template_str = open(
+                    os.path.join(
+                        os.environ['RIFT_INSTALL'],
+                        'etc/userdata-template',
+                        )
+                    ).read()
+
+        # Get the interface ip address of the mgmt network
+        # This is where the salt master is accessible on
+        mgmt_interface_ip = "192.168.122.1"
+
+        # Create salt-stack userdata
+        vm.cloud_init.userdata = user_data_template_str.format(
+                master_ip=mgmt_interface_ip,
+                lxcname=vm.user_tags.node_id,
+                )
+
+        rc, vm.vm_id = self.cal.create_vm(self.account, vm)
+
+        return vm
+
+    def wait_vm_salt_connection(self, vm, timeout_secs=600):
+        """ Wait for vm salt minion to reach up state with master """
+
+        vm_node_id = vm.user_tags.node_id
+        start_time = time.time()
+        self.log.debug("Waiting up to %s seconds for node id %s",
+                       timeout_secs, vm_node_id)
+        while (time.time() - start_time) < timeout_secs:
+            try:
+                stdout = subprocess.check_output(
+                        shlex.split('salt %s test.ping' % vm_node_id),
+                        universal_newlines=True,
+                        )
+            except subprocess.CalledProcessError:
+                continue
+
+            up_minions = stdout.splitlines()
+            for line in up_minions:
+                if "True" in line:
+                    return
+
+        raise SaltConnectionTimeoutError(
+                "Salt id %s did not enter UP state in %s seconds" % (
+                    vm_node_id, timeout_secs
+                    )
+                )
+
+    def wait_ready(self):
+        """ Wait for all resources to become ready """
+
+        self.log.info("Waiting for all VM's to make a salt minion connection")
+        for i, vm in enumerate(self._vms):
+            self.wait_vm_salt_connection(vm)
+            self.log.debug(
+                "Node id %s came up (%s/%s)",
+                vm.user_tags.node_id, i + 1, len(self._vms)
+                )
+
+    def create_port(self, network, vm, index):
+        """Returns a port
+
+        Arguments:
+            network - a network object
+            vm      - a VM object
+            index   - an index to label the port
+
+        Returns:
+            Returns a port object
+
+        """
+        port = RwcalYang.PortInfoItem()
+        port.port_name = "eth1"
+        port.network_id = network.network_id
+        port.vm_id = vm.vm_id
+
+        rc, port.port_id = self.cal.create_port(self.account, port)
+        return port