# Source: osm/SO.git — common/plugins/rwcntmgrtasklet/rift/tasklets/rwcntmgrtasklet/rwcntmgrtasklet.py
# (commit 02714a50c3ec97c15cf26b0cc88b74a5e0f5fa88)
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import logging
20 import os
21 import shlex
22 import subprocess
23 import time
24 import uuid
25
26 import gi
27 gi.require_version('RwDts', '1.0')
28 gi.require_version('RwcalYang', '1.0')
29 gi.require_version('RwCal', '1.0')
30
31
32 from gi.repository import (
33 RwDts as rwdts,
34 RwcalYang,
35 )
36
37 import rift.rwcal.cloudsim.lvm as lvm
38 import rift.rwcal.cloudsim.lxc as lxc
39 import rift.tasklets
40 import rw_peas
41
42
class SaltConnectionTimeoutError(Exception):
    """Raised when a VM's salt minion does not connect to the salt
    master within the allowed timeout (see
    ResourceProvisioning.wait_vm_salt_connection)."""
    pass
45
46
class ContainerManager(rift.tasklets.Tasklet):
    """Tasklet that provisions LXC/LVM-backed container resources.

    On DTS INIT it (re)creates the LVM backing store and an initial set
    of container VMs, then waits for each VM's salt minion to connect
    before letting DTS advance.  All blocking provisioning work is run
    in an executor so the event loop is never stalled.
    """

    def __init__(self, *args, **kwargs):
        super(ContainerManager, self).__init__(*args, **kwargs)
        # Populated during init(); remain None until the DTS INIT state
        # callback has run.
        self.lvm = None
        self.resources = None
        self.dts_api = None

    def start(self):
        """Start the tasklet and register with DTS."""
        super(ContainerManager, self).start()
        self.log.info("Starting ContainerManager")
        self.log.setLevel(logging.DEBUG)
        # Hand the tasklet's log handle to the CAL plugin wrapper so it
        # can initialize its interface with the same logging context.
        ResourceProvisioning.log_hdl = self.log_hdl

        self.log.debug("Registering with dts")
        self._dts = rift.tasklets.DTS(
                self.tasklet_info,
                RwcalYang.get_schema(),
                self.loop,
                self.on_dts_state_change
                )

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def on_instance_started(self):
        self.log.debug("Got instance started callback")

    def stop(self):
        """Stop the tasklet and tear down any provisioned resources."""
        super(ContainerManager, self).stop()
        # Fix: stop() can be invoked before init() ever ran (e.g. a
        # failure during startup), in which case these attributes are
        # still the None placeholders set in __init__ — guard so we do
        # not raise AttributeError during shutdown.
        if self.resources is not None:
            self.resources.destroy()
        if self.lvm is not None:
            self.lvm.destroy()

    @asyncio.coroutine
    def init(self):
        """Provision the LVM backing store and the initial set of VMs."""
        # Create the LVM backing store with the 'rift' volume group
        self.lvm = LvmProvisioning()
        self.resources = ResourceProvisioning(self.loop, self.log)

        # Tear down any containers left over from a previous run
        yield from self.loop.run_in_executor(
                None,
                self.resources.destroy,
                )

        if "REUSE_LXC" not in os.environ:
            # Destroy the existing LVM backing store ...
            yield from self.loop.run_in_executor(
                    None,
                    self.lvm.destroy,
                    )

            # ... and recreate it from scratch
            yield from self.loop.run_in_executor(
                    None,
                    self.lvm.create,
                    )

        # Create an initial set of VMs
        yield from self.loop.run_in_executor(
                None,
                self.resources.create,
                )

        # Block until every VM's salt minion has connected
        yield from self.loop.run_in_executor(
                None,
                self.resources.wait_ready,
                )

    @asyncio.coroutine
    def run(self):
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """

        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self._dts.handle.set_state(next_state)
146
147
class LvmProvisioning(object):
    """
    This class represents LVM provisioning.

    The volume group name is parameterized (defaulting to the 'rift'
    group used historically) so the class can manage other groups
    without code changes.
    """

    def __init__(self, volume_group='rift'):
        """
        Arguments:
            volume_group - name of the LVM volume group to manage
        """
        self._volume_group = volume_group

    def create(self):
        """Creates an LVM backing store"""
        lvm.create(self._volume_group)

    def destroy(self):
        """Destroys the existing LVM backing store"""
        lvm.destroy(self._volume_group)
160
161
class ResourceProvisioning(object):
    """
    This is a placeholder class that is used to represent the provisioning of
    container resources.

    A single CAL (cloud abstraction layer) interface is shared by all
    instances via the ``cal_interface`` class attribute; ``log_hdl`` must
    be assigned (by the owning tasklet) before the first instantiation.
    """

    # Shared, lazily-initialized CAL plugin interface
    cal_interface = None
    # Log handle used to initialize the CAL interface; set externally
    log_hdl = None

    def __init__(self, loop, log):
        """
        Arguments:
            loop - the asyncio event loop of the owning tasklet
            log  - logger used for progress/debug messages
        """
        # Initialize the CAL interface if it has not already been initialized
        if ResourceProvisioning.cal_interface is None:
            plugin = rw_peas.PeasPlugin('rwcal_cloudsimproxy', 'RwCal-1.0')
            # The plugin must be invoked to load it; the returned engine,
            # info and extension objects are not needed afterwards.
            engine, info, extension = plugin()

            ResourceProvisioning.cal_interface = plugin.get_interface("Cloud")
            ResourceProvisioning.cal_interface.init(ResourceProvisioning.log_hdl)

        # Account describing how to reach the cloudsim proxy host
        self.account = RwcalYang.CloudAccount()
        self.account.account_type = "cloudsim_proxy"
        self.account.cloudsim_proxy.host = "192.168.122.1"

        self.log = log
        self.loop = loop
        # Number of VMs created by create()
        self.nvms = 1

        self._vms = []

    @property
    def cal(self):
        """The shared CAL interface."""
        return ResourceProvisioning.cal_interface

    def create(self):
        """Create all of the necessary resources"""

        rift_root = os.environ['RIFT_ROOT']
        image = self.create_image("%s/images/rift-root-latest.qcow2" % (rift_root))

        # Create a VM
        for index in range(self.nvms):
            self._vms.append(self.create_vm(image, index))

        # Start the VMs
        for vm in self._vms:
            self.cal.start_vm(self.account, vm.vm_id)

    def destroy(self):
        """Destroy all of the provided resources"""

        # Stop every container first, then destroy them in a second pass.
        for container in lxc.containers():
            lxc.stop(container)

        for container in lxc.containers():
            # Preserve the master container "rwm0" when reuse is requested
            if not ("REUSE_LXC" in os.environ and container == "rwm0"):
                lxc.destroy(container)

    def create_image(self, location):
        """Creates and returns a CAL image

        Arguments:
            location - filesystem path of the qcow2 image

        Returns:
            An image object with its ``id`` populated by the CAL
        """

        image = RwcalYang.ImageInfoItem()
        image.name = "rift-lxc-image"
        image.location = location
        image.disk_format = "qcow2"
        rc, image.id = self.cal.create_image(self.account, image)
        return image

    def create_network(self, network_name, subnet):
        """Creates and returns a CAL network

        Arguments:
            network_name - name to assign to the network
            subnet       - subnet specification for the network
        """

        network = RwcalYang.NetworkInfoItem(
                network_name=network_name,
                subnet=subnet,
                )
        rc, network.network_id = self.cal.create_network(self.account, network)
        return network

    def create_vm(self, image, index):
        """Returns a VM

        Arguments:
            image - the image used to create the VM
            index - an index used to label the VM

        Returns:
            A VM object

        """
        vm = RwcalYang.VMInfoItem()
        vm.vm_name = 'rift-s{}'.format(index + 1)
        vm.image_id = image.id
        vm.user_tags.node_id = str(uuid.uuid4())

        user_data_template_str = open(
                os.path.join(
                   os.environ['RIFT_INSTALL'],
                   'etc/userdata-template',
                   )
                ).read()

        # Get the interface ip address of the mgmt network
        # This is where the salt master is accessible on
        mgmt_interface_ip = "192.168.122.1"

        # Create salt-stack userdata
        vm.cloud_init.userdata = user_data_template_str.format(
                master_ip=mgmt_interface_ip,
                lxcname=vm.user_tags.node_id,
                )

        rc, vm.vm_id = self.cal.create_vm(self.account, vm)

        return vm

    def wait_vm_salt_connection(self, vm, timeout_secs=600):
        """ Wait for vm salt minion to reach up state with master

        Arguments:
            vm           - the VM whose minion to wait for
            timeout_secs - maximum number of seconds to wait

        Raises:
            SaltConnectionTimeoutError if the minion does not come up
            within timeout_secs.
        """

        vm_node_id = vm.user_tags.node_id
        start_time = time.time()
        self.log.debug("Waiting up to %s seconds for node id %s",
                       timeout_secs, vm_node_id)
        while (time.time() - start_time) < timeout_secs:
            try:
                stdout = subprocess.check_output(
                        shlex.split('salt %s test.ping' % vm_node_id),
                        universal_newlines=True,
                        )
            except subprocess.CalledProcessError:
                # Fix: pause before retrying so a fast-failing salt
                # command does not busy-spin for the entire timeout.
                time.sleep(1)
                continue

            up_minions = stdout.splitlines()
            for line in up_minions:
                if "True" in line:
                    return

            # Minion responded but is not up yet; pause before re-polling
            time.sleep(1)

        raise SaltConnectionTimeoutError(
            "Salt id %s did not enter UP state in %s seconds" % (
                vm_node_id, timeout_secs
                )
            )

    def wait_ready(self):
        """ Wait for all resources to become ready """

        self.log.info("Waiting for all VM's to make a salt minion connection")
        for i, vm in enumerate(self._vms):
            self.wait_vm_salt_connection(vm)
            self.log.debug(
                    "Node id %s came up (%s/%s)",
                    vm.user_tags.node_id, i + 1, len(self._vms)
                    )

    def create_port(self, network, vm, index):
        """Returns a port

        Arguments:
            network - a network object
            vm      - a VM object
            index   - an index to label the port

        Returns:
            Returns a port object

        """
        port = RwcalYang.PortInfoItem()
        port.port_name = "eth1"
        port.network_id = network.network_id
        port.vm_id = vm.vm_id

        rc, port.port_id = self.cal.create_port(self.account, port)
        return port