3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
27 gi
.require_version('RwDts', '1.0')
28 gi
.require_version('RwcalYang', '1.0')
29 gi
.require_version('RwCal', '1.0')
32 from gi
.repository
import (
37 import rift
.rwcal
.cloudsim
.lvm
as lvm
38 import rift
.rwcal
.cloudsim
.lxc
as lxc
class SaltConnectionTimeoutError(Exception):
    """Raised when a salt id does not enter the UP state within the timeout."""
47 class ContainerManager(rift
.tasklets
.Tasklet
):
def __init__(self, *args, **kwargs):
    """Initialize the tasklet.

    Arguments:
        *args    - positional arguments, forwarded unchanged to the parent
                   class initializer
        **kwargs - keyword arguments, forwarded unchanged to the parent
                   class initializer

    """
    super(ContainerManager, self).__init__(*args, **kwargs)
55 super(ContainerManager
, self
).start()
56 self
.log
.info("Starting ContainerManager")
57 self
.log
.setLevel(logging
.DEBUG
)
58 ResourceProvisioning
.log_hdl
= self
.log_hdl
60 self
.log
.debug("Registering with dts")
61 self
._dts
= rift
.tasklets
.DTS(
63 RwcalYang
.get_schema(),
65 self
.on_dts_state_change
68 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
def on_instance_started(self):
    """Log (at debug level) that the instance-started callback was received."""
    self.log.debug("Got instance started callback")
74 super(ContainerManager
, self
).stop()
75 self
.resources
.destroy()
80 # Create the LVM backing store with the 'rift' volume group
81 self
.lvm
= LvmProvisioning()
82 self
.resources
= ResourceProvisioning(self
.loop
, self
.log
)
# Destroy any previously provisioned resources
85 yield from self
.loop
.run_in_executor(
87 self
.resources
.destroy
,
90 if "REUSE_LXC" not in os
.environ
:
91 # Create lvm partition
92 yield from self
.loop
.run_in_executor(
97 # Create lvm partition
98 yield from self
.loop
.run_in_executor(
103 # Create an initial set of VMs
104 yield from self
.loop
.run_in_executor(
106 self
.resources
.create
,
109 yield from self
.loop
.run_in_executor(
111 self
.resources
.wait_ready
,
119 def on_dts_state_change(self
, state
):
120 """Take action according to current dts state to transition
121 application into the corresponding application state
124 state - current dts state
128 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
129 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
133 rwdts
.State
.INIT
: self
.init
,
134 rwdts
.State
.RUN
: self
.run
,
137 # Transition application to next state
138 handler
= handlers
.get(state
, None)
139 if handler
is not None:
142 # Transition dts to next state
143 next_state
= switch
.get(state
, None)
144 if next_state
is not None:
145 self
._dts
.handle
.set_state(next_state
)
148 class LvmProvisioning(object):
150 This class represents LVM provisioning.
154 """Creates an LVM backing store"""
158 """Destroys the existing LVM backing store"""
162 class ResourceProvisioning(object):
164 This is a placeholder class that is used to represent the provisioning of
171 def __init__(self
, loop
, log
):
172 # Initialize the CAL interface if it has not already been initialized
173 if ResourceProvisioning
.cal_interface
is None:
174 plugin
= rw_peas
.PeasPlugin('rwcal_cloudsimproxy', 'RwCal-1.0')
175 engine
, info
, extension
= plugin()
177 ResourceProvisioning
.cal_interface
= plugin
.get_interface("Cloud")
178 ResourceProvisioning
.cal_interface
.init(ResourceProvisioning
.log_hdl
)
180 self
.account
= RwcalYang
.CloudAccount()
181 self
.account
.account_type
= "cloudsim_proxy"
182 self
.account
.cloudsim_proxy
.host
= "192.168.122.1"
192 return ResourceProvisioning
.cal_interface
195 """Create all of the necessary resources"""
197 rift_root
= os
.environ
['RIFT_ROOT']
198 image
= self
.create_image("%s/images/rift-root-latest.qcow2" % (rift_root
))
201 for index
in range(self
.nvms
):
202 self
._vms
.append(self
.create_vm(image
, index
))
206 self
.cal
.start_vm(self
.account
, vm
.vm_id
)
209 """Destroy all of the provided resources"""
211 for container
in lxc
.containers():
214 for container
in lxc
.containers():
215 if not ("REUSE_LXC" in os
.environ
and container
== "rwm0"):
216 lxc
.destroy(container
)
def create_image(self, location):
    """Creates and returns a CAL image

    Arguments:
        location - value assigned to the image's `location` field

    Returns:
        The image object, with its `id` set from the CAL create_image call

    """
    image = RwcalYang.ImageInfoItem()
    image.name = "rift-lxc-image"
    image.location = location
    image.disk_format = "qcow2"

    # The CAL call returns a pair; the first element (rc) is not inspected
    # here, only the second is stored as the image id.
    rc, image.id = self.cal.create_image(self.account, image)

    # Hand the populated image back so callers can read image.id.
    return image
228 def create_network(self
, network_name
, subnet
):
229 """Creates and returns a CAL network"""
231 network
= RwcalYang
.NetworkInfoItem(
232 network_name
=network_name
,
235 rc
, network
.network_id
= self
.cal
.create_network(self
.account
, network
)
238 def create_vm(self
, image
, index
):
242 image - the image used to create the VM
243 index - an index used to label the VM
249 vm
= RwcalYang
.VMInfoItem()
250 vm
.vm_name
= 'rift-s{}'.format(index
+ 1)
251 vm
.image_id
= image
.id
252 vm
.user_tags
.node_id
= str(uuid
.uuid4())
254 user_data_template_str
= open(
256 os
.environ
['RIFT_INSTALL'],
257 'etc/userdata-template',
261 # Get the interface ip address of the mgmt network
262 # This is where the salt master is accessible on
263 mgmt_interface_ip
= "192.168.122.1"
265 # Create salt-stack userdata
266 vm
.cloud_init
.userdata
= user_data_template_str
.format(
267 master_ip
=mgmt_interface_ip
,
268 lxcname
=vm
.user_tags
.node_id
,
271 rc
, vm
.vm_id
= self
.cal
.create_vm(self
.account
, vm
)
275 def wait_vm_salt_connection(self
, vm
, timeout_secs
=600):
276 """ Wait for vm salt minion to reach up state with master """
278 vm_node_id
= vm
.user_tags
.node_id
279 start_time
= time
.time()
280 self
.log
.debug("Waiting up to %s seconds for node id %s",
281 timeout_secs
, vm_node_id
)
282 while (time
.time() - start_time
) < timeout_secs
:
284 stdout
= subprocess
.check_output(
285 shlex
.split('salt %s test.ping' % vm_node_id
),
286 universal_newlines
=True,
288 except subprocess
.CalledProcessError
:
291 up_minions
= stdout
.splitlines()
292 for line
in up_minions
:
296 raise SaltConnectionTimeoutError(
297 "Salt id %s did not enter UP state in %s seconds" % (
298 vm_node_id
, timeout_secs
302 def wait_ready(self
):
303 """ Wait for all resources to become ready """
305 self
.log
.info("Waiting for all VM's to make a salt minion connection")
306 for i
, vm
in enumerate(self
._vms
):
307 self
.wait_vm_salt_connection(vm
)
309 "Node id %s came up (%s/%s)",
310 vm
.user_tags
.node_id
, i
+ 1, len(self
._vms
)
313 def create_port(self
, network
, vm
, index
):
317 network - a network object
319 index - an index to label the port
322 Returns a port object
325 port
= RwcalYang
.PortInfoItem()
326 port
.port_name
= "eth1"
327 port
.network_id
= network
.network_id
328 port
.vm_id
= vm
.vm_id
330 rc
, port
.port_id
= self
.cal
.create_port(self
.account
, port
)