Create VL with appropriate type after launchpad restart
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_core.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import uuid
18 import collections
19 import asyncio
20 import concurrent.futures
21
22 import gi
23 gi.require_version('RwDts', '1.0')
24 gi.require_version('RwYang', '1.0')
25 gi.require_version('RwResourceMgrYang', '1.0')
26 gi.require_version('RwLaunchpadYang', '1.0')
27 gi.require_version('RwcalYang', '1.0')
28 from gi.repository import (
29 RwDts as rwdts,
30 RwYang,
31 RwResourceMgrYang,
32 RwLaunchpadYang,
33 RwcalYang,
34 )
35
36 from gi.repository.RwTypes import RwStatus
37
38 class ResMgrCALNotPresent(Exception):
39 pass
40
41 class ResMgrCloudAccountNotFound(Exception):
42 pass
43
44 class ResMgrCloudAccountExists(Exception):
45 pass
46
47 class ResMgrCloudAccountInUse(Exception):
48 pass
49
50 class ResMgrDuplicatePool(Exception):
51 pass
52
53 class ResMgrPoolNotAvailable(Exception):
54 pass
55
56 class ResMgrPoolOperationFailed(Exception):
57 pass
58
59 class ResMgrDuplicateEventId(Exception):
60 pass
61
62 class ResMgrUnknownEventId(Exception):
63 pass
64
65 class ResMgrUnknownResourceId(Exception):
66 pass
67
68 class ResMgrResourceIdBusy(Exception):
69 pass
70
71 class ResMgrResourceIdNotAllocated(Exception):
72 pass
73
74 class ResMgrNoResourcesAvailable(Exception):
75 pass
76
77 class ResMgrResourcesInitFailed(Exception):
78 pass
79
80 class ResMgrCALOperationFailure(Exception):
81 pass
82
83
84
85 class ResourceMgrCALHandler(object):
86 def __init__(self, loop, executor, log, log_hdl, account):
87 self._log = log
88 self._loop = loop
89 self._executor = executor
90 self._account = account.cal_account_msg
91 self._rwcal = account.cal
92 if account.account_type == 'aws':
93 self._subnets = ["172.31.97.0/24", "172.31.98.0/24", "172.31.99.0/24", "172.31.100.0/24", "172.31.101.0/24"]
94 else:
95 self._subnets = ["11.0.0.0/24",
96 "12.0.0.0/24",
97 "13.0.0.0/24",
98 "14.0.0.0/24",
99 "15.0.0.0/24",
100 "16.0.0.0/24",
101 "17.0.0.0/24",
102 "18.0.0.0/24",
103 "19.0.0.0/24",
104 "20.0.0.0/24",
105 "21.0.0.0/24",
106 "22.0.0.0/24",]
107 self._subnet_ptr = 0
108
109 def _select_link_subnet(self):
110 subnet = self._subnets[self._subnet_ptr]
111 self._subnet_ptr += 1
112 if self._subnet_ptr == len(self._subnets):
113 self._subnet_ptr = 0
114 return subnet
115
116 @asyncio.coroutine
117 def create_virtual_network(self, req_params):
118 #rc, rsp = self._rwcal.get_virtual_link_list(self._account)
119 self._log.debug("Calling get_virtual_link_list API")
120 rc, rsp = yield from self._loop.run_in_executor(self._executor,
121 self._rwcal.get_virtual_link_list,
122 self._account)
123
124 assert rc == RwStatus.SUCCESS
125
126 links = [vlink for vlink in rsp.virtual_link_info_list if vlink.name == req_params.name]
127 if links:
128 self._log.debug("Found existing virtual-network with matching name in cloud. Reusing the virtual-network with id: %s" %(links[0].virtual_link_id))
129 if req_params.vim_network_name:
130 resource_type = 'precreated'
131 else:
132 # This is case of realloc
133 resource_type = 'dynamic'
134 return (resource_type, links[0].virtual_link_id)
135 elif req_params.vim_network_name:
136 self._log.error("Virtual-network-allocate operation failed for cloud account: %s Vim Network with name %s does not pre-exist",
137 self._account.name, req_params.vim_network_name)
138 raise ResMgrCALOperationFailure("Virtual-network allocate operation failed for cloud account: %s Vim Network with name %s does not pre-exist"
139 %(self._account.name, req_params.vim_network_name))
140
141 params = RwcalYang.VirtualLinkReqParams()
142 params.from_dict(req_params.as_dict())
143 params.subnet = self._select_link_subnet()
144 #rc, rs = self._rwcal.create_virtual_link(self._account, params)
145 self._log.debug("Calling create_virtual_link API with params: %s" %(str(req_params)))
146 rc, rs = yield from self._loop.run_in_executor(self._executor,
147 self._rwcal.create_virtual_link,
148 self._account,
149 params)
150 if rc.status != RwStatus.SUCCESS:
151 self._log.error("Virtual-network-allocate operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
152 self._account.name, rc.error_msg, rc.traceback)
153 raise ResMgrCALOperationFailure("Virtual-network allocate operation failed for cloud account: %s (%s)"
154 %(self._account.name, rc.error_msg))
155
156 return ('dynamic',rs)
157
158 @asyncio.coroutine
159 def delete_virtual_network(self, network_id):
160 #rc = self._rwcal.delete_virtual_link(self._account, network_id)
161 self._log.debug("Calling delete_virtual_link API with id: %s" %(network_id))
162 rc = yield from self._loop.run_in_executor(self._executor,
163 self._rwcal.delete_virtual_link,
164 self._account,
165 network_id)
166 if rc != RwStatus.SUCCESS:
167 self._log.error("Virtual-network-release operation failed for cloud account: %s. ResourceID: %s",
168 self._account.name,
169 network_id)
170 raise ResMgrCALOperationFailure("Virtual-network release operation failed for cloud account: %s. ResourceId: %s" %(self._account.name, network_id))
171
172 @asyncio.coroutine
173 def get_virtual_network_info(self, network_id):
174 #rc, rs = self._rwcal.get_virtual_link(self._account, network_id)
175 self._log.debug("Calling get_virtual_link_info API with id: %s" %(network_id))
176 rc, rs = yield from self._loop.run_in_executor(self._executor,
177 self._rwcal.get_virtual_link,
178 self._account,
179 network_id)
180 if rc != RwStatus.SUCCESS:
181 self._log.error("Virtual-network-info operation failed for cloud account: %s. ResourceID: %s",
182 self._account.name,
183 network_id)
184 raise ResMgrCALOperationFailure("Virtual-network-info operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, network_id))
185 return rs
186
187 @asyncio.coroutine
188 def create_virtual_compute(self, req_params):
189 #rc, rsp = self._rwcal.get_vdu_list(self._account)
190 self._log.debug("Calling get_vdu_list API")
191
192 rc, rsp = yield from self._loop.run_in_executor(self._executor,
193 self._rwcal.get_vdu_list,
194 self._account)
195 assert rc == RwStatus.SUCCESS
196 vdus = [vm for vm in rsp.vdu_info_list if vm.name == req_params.name]
197 if vdus:
198 self._log.debug("Found existing virtual-compute with matching name in cloud. Reusing the virtual-compute element with id: %s" %(vdus[0].vdu_id))
199 return vdus[0].vdu_id
200
201 params = RwcalYang.VDUInitParams()
202 params.from_dict(req_params.as_dict())
203
204 if 'image_name' in req_params:
205 image_checksum = req_params.image_checksum if req_params.has_field("image_checksum") else None
206 params.image_id = yield from self.get_image_id_from_image_info(req_params.image_name, image_checksum)
207
208 #rc, rs = self._rwcal.create_vdu(self._account, params)
209 self._log.debug("Calling create_vdu API with params %s" %(str(params)))
210 rc, rs = yield from self._loop.run_in_executor(self._executor,
211 self._rwcal.create_vdu,
212 self._account,
213 params)
214
215 if rc.status != RwStatus.SUCCESS:
216 self._log.error("Virtual-compute-create operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
217 self._account.name, rc.error_msg, rc.traceback)
218 raise ResMgrCALOperationFailure("Virtual-compute-create operation failed for cloud account: %s (%s)"
219 %(self._account.name, rc.error_msg))
220
221 return rs
222
223 @asyncio.coroutine
224 def modify_virtual_compute(self, req_params):
225 #rc = self._rwcal.modify_vdu(self._account, req_params)
226 self._log.debug("Calling modify_vdu API with params: %s" %(str(req_params)))
227 rc = yield from self._loop.run_in_executor(self._executor,
228 self._rwcal.modify_vdu,
229 self._account,
230 req_params)
231 if rc != RwStatus.SUCCESS:
232 self._log.error("Virtual-compute-modify operation failed for cloud account: %s", self._account.name)
233 raise ResMgrCALOperationFailure("Virtual-compute-modify operation failed for cloud account: %s" %(self._account.name))
234
235 @asyncio.coroutine
236 def delete_virtual_compute(self, compute_id):
237 #rc = self._rwcal.delete_vdu(self._account, compute_id)
238 self._log.debug("Calling delete_vdu API with id: %s" %(compute_id))
239 rc = yield from self._loop.run_in_executor(self._executor,
240 self._rwcal.delete_vdu,
241 self._account,
242 compute_id)
243 if rc != RwStatus.SUCCESS:
244 self._log.error("Virtual-compute-release operation failed for cloud account: %s. ResourceID: %s",
245 self._account.name,
246 compute_id)
247 raise ResMgrCALOperationFailure("Virtual-compute-release operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, compute_id))
248
249 @asyncio.coroutine
250 def get_virtual_compute_info(self, compute_id):
251 #rc, rs = self._rwcal.get_vdu(self._account, compute_id)
252 self._log.debug("Calling get_vdu API with id: %s" %(compute_id))
253 rc, rs = yield from self._loop.run_in_executor(self._executor,
254 self._rwcal.get_vdu,
255 self._account,
256 compute_id)
257 if rc != RwStatus.SUCCESS:
258 self._log.error("Virtual-compute-info operation failed for cloud account: %s. ResourceID: %s",
259 self._account.name,
260 compute_id)
261 raise ResMgrCALOperationFailure("Virtual-compute-info operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, compute_id))
262 return rs
263
264 @asyncio.coroutine
265 def get_compute_flavor_info_list(self):
266 #rc, rs = self._rwcal.get_flavor_list(self._account)
267 self._log.debug("Calling get_flavor_list API")
268 rc, rs = yield from self._loop.run_in_executor(self._executor,
269 self._rwcal.get_flavor_list,
270 self._account)
271 if rc != RwStatus.SUCCESS:
272 self._log.error("Get-flavor-info-list operation failed for cloud account: %s",
273 self._account.name)
274 raise ResMgrCALOperationFailure("Get-flavor-info-list operation failed for cloud account: %s" %(self._account.name))
275 return rs.flavorinfo_list
276
277 @asyncio.coroutine
278 def create_compute_flavor(self, request):
279 flavor = RwcalYang.FlavorInfoItem()
280 flavor.name = str(uuid.uuid4())
281 epa_types = ['vm_flavor', 'guest_epa', 'host_epa', 'host_aggregate']
282 epa_dict = {k: v for k, v in request.as_dict().items() if k in epa_types}
283 flavor.from_dict(epa_dict)
284
285 self._log.info("Creating flavor: %s", flavor)
286 #rc, rs = self._rwcal.create_flavor(self._account, flavor)
287 self._log.debug("Calling create_flavor API")
288 rc, rs = yield from self._loop.run_in_executor(self._executor,
289 self._rwcal.create_flavor,
290 self._account,
291 flavor)
292 if rc != RwStatus.SUCCESS:
293 self._log.error("Create-flavor operation failed for cloud account: %s",
294 self._account.name)
295 raise ResMgrCALOperationFailure("Create-flavor operation failed for cloud account: %s" %(self._account.name))
296 return rs
297
298 @asyncio.coroutine
299 def get_image_info_list(self):
300 #rc, rs = self._rwcal.get_image_list(self._account)
301 self._log.debug("Calling get_image_list API")
302 rc, rs = yield from self._loop.run_in_executor(self._executor,
303 self._rwcal.get_image_list,
304 self._account)
305 if rc != RwStatus.SUCCESS:
306 self._log.error("Get-image-info-list operation failed for cloud account: %s",
307 self._account.name)
308 raise ResMgrCALOperationFailure("Get-image-info-list operation failed for cloud account: %s" %(self._account.name))
309 return rs.imageinfo_list
310
311 @asyncio.coroutine
312 def get_image_id_from_image_info(self, image_name, image_checksum=None):
313 self._log.debug("Looking up image id for image name %s and checksum %s on cloud account: %s",
314 image_name, image_checksum, self._account.name
315 )
316
317 image_list = yield from self.get_image_info_list()
318 matching_images = [i for i in image_list if i.name == image_name]
319
320 # If the image checksum was filled in then further filter the images by the checksum
321 if image_checksum is not None:
322 matching_images = [i for i in matching_images if i.checksum == image_checksum]
323 else:
324 self._log.warning("Image checksum not provided. Lookup using image name (%s) only.",
325 image_name)
326
327 if len(matching_images) == 0:
328 raise ResMgrCALOperationFailure("Could not find image name {} (using checksum: {}) for cloud account: {}".format(
329 image_name, image_checksum, self._account.name
330 ))
331
332 elif len(matching_images) > 1:
333 unique_checksums = {i.checksum for i in matching_images}
334 if len(unique_checksums) > 1:
335 msg = ("Too many images with different checksums matched "
336 "image name of %s for cloud account: %s" % (image_name, self._account.name))
337 raise ResMgrCALOperationFailure(msg)
338
339 return matching_images[0].id
340
341 @asyncio.coroutine
342 def get_image_info(self, image_id):
343 #rc, rs = self._rwcal.get_image(self._account, image_id)
344 self._log.debug("Calling get_image API for id: %s" %(image_id))
345 rc, rs = yield from self._loop.run_in_executor(self._executor,
346 self._rwcal.get_image,
347 self._account,
348 image_id)
349 if rc != RwStatus.SUCCESS:
350 self._log.error("Get-image-info-list operation failed for cloud account: %s",
351 self._account.name)
352 raise ResMgrCALOperationFailure("Get-image-info operation failed for cloud account: %s" %(self._account.name))
353 return rs.imageinfo_list
354
355 def dynamic_flavor_supported(self):
356 return getattr(self._account, self._account.account_type).dynamic_flavor_support
357
358
359 class Resource(object):
360 def __init__(self, resource_id, resource_type, request):
361 self._id = resource_id
362 self._type = resource_type
363 self._request = request
364
365 @property
366 def resource_id(self):
367 return self._id
368
369 @property
370 def resource_type(self):
371 return self._type
372
373 @property
374 def request(self):
375 return self._request
376
377 def cleanup(self):
378 pass
379
380
381 class ComputeResource(Resource):
382 pass
383
384
385 class NetworkResource(Resource):
386 pass
387
388
389 class ResourcePoolInfo(object):
390 def __init__(self, name, pool_type, resource_type, max_size):
391 self.name = name
392 self.pool_type = pool_type
393 self.resource_type = resource_type
394 self.max_size = max_size
395
396 @classmethod
397 def from_dict(cls, pool_dict):
398 return cls(
399 pool_dict["name"],
400 pool_dict["pool_type"],
401 pool_dict["resource_type"],
402 pool_dict["max_size"],
403 )
404
405
406 class ResourcePool(object):
407 def __init__(self, log, loop, pool_info, resource_class, cal):
408 self._log = log
409 self._loop = loop
410 self._name = pool_info.name
411 self._pool_type = pool_info.pool_type
412 self._resource_type = pool_info.resource_type
413 self._cal = cal
414 self._resource_class = resource_class
415
416 self._max_size = pool_info.max_size
417
418 self._status = 'unlocked'
419 ### A Dictionary of all the resources in this pool, keyed by CAL resource-id
420 self._all_resources = {}
421 ### A List of free resources in this pool
422 self._free_resources = []
423 ### A Dictionary of all the allocated resources in this pool, keyed by CAL resource-id
424 self._allocated_resources = {}
425
426 @property
427 def name(self):
428 return self._name
429
430 @property
431 def cal(self):
432 """ This instance's ResourceMgrCALHandler """
433 return self._cal
434
435 @property
436 def pool_type(self):
437 return self._pool_type
438
439 @property
440 def resource_type(self):
441 return self._resource_type
442
443 @property
444 def max_size(self):
445 return self._max_size
446
447 @property
448 def status(self):
449 return self._status
450
451 def in_use(self):
452 if len(self._allocated_resources) != 0:
453 return True
454 else:
455 return False
456
457 def update_cal_handler(self, cal):
458 if self.in_use():
459 raise ResMgrPoolOperationFailed(
460 "Cannot update CAL plugin for in use pool"
461 )
462
463 self._cal = cal
464
465 def lock_pool(self):
466 self._log.info("Locking the pool :%s", self.name)
467 self._status = 'locked'
468
469 def unlock_pool(self):
470 self._log.info("Unlocking the pool :%s", self.name)
471 self._status = 'unlocked'
472
473 def add_resource(self, resource_info):
474 self._log.info("Adding static resource to Pool: %s, Resource-id: %s Resource-Type: %s",
475 self.name,
476 resource_info.resource_id,
477 self.resource_type)
478
479 ### Add static resources to pool
480 resource = self._resource_class(resource_info.resource_id, 'static')
481 assert resource.resource_id == resource_info.resource_id
482 self._all_resources[resource.resource_id] = resource
483 self._free_resources.append(resource)
484
485 def delete_resource(self, resource_id):
486 if resource_id not in self._all_resources:
487 self._log.error("Resource Id: %s not present in pool: %s. Delete operation failed", resource_id, self.name)
488 raise ResMgrUnknownResourceId("Resource Id: %s requested for release is not found" %(resource_id))
489
490 if resource_id in self._allocated_resources:
491 self._log.error("Resource Id: %s in use. Delete operation failed", resource_id)
492 raise ResMgrResourceIdBusy("Resource Id: %s requested for release is in use" %(resource_id))
493
494 self._log.info("Deleting resource: %s from pool: %s, Resource-Type",
495 resource_id,
496 self.name,
497 self.resource_type)
498
499 resource = self._all_resources.pop(resource_id)
500 self._free_resources.remove(resource)
501 resource.cleanup()
502 del resource
503
504 @asyncio.coroutine
505 def read_resource_info(self, resource_id):
506 if resource_id not in self._all_resources:
507 self._log.error("Resource Id: %s not present in pool: %s. Read operation failed", resource_id, self.name)
508 raise ResMgrUnknownResourceId("Resource Id: %s requested for read is not found" %(resource_id))
509
510 if resource_id not in self._allocated_resources:
511 self._log.error("Resource Id: %s not in use. Read operation failed", resource_id)
512 raise ResMgrResourceIdNotAllocated("Resource Id: %s not in use. Read operation failed" %(resource_id))
513
514 resource = self._allocated_resources[resource_id]
515 resource_info = yield from self.get_resource_info(resource)
516 return resource_info
517
518 def get_pool_info(self):
519 info = RwResourceMgrYang.ResourceRecordInfo()
520 self._log.info("Providing info for pool: %s", self.name)
521 info.name = self.name
522 if self.pool_type:
523 info.pool_type = self.pool_type
524 if self.resource_type:
525 info.resource_type = self.resource_type
526 if self.status:
527 info.pool_status = self.status
528
529 info.total_resources = len(self._all_resources)
530 info.free_resources = len(self._free_resources)
531 info.allocated_resources = len(self._allocated_resources)
532 return info
533
534 def cleanup(self):
535 for _, v in self._all_resources.items():
536 v.cleanup()
537
538 @asyncio.coroutine
539 def _allocate_static_resource(self, request, resource_type):
540 unit_type = {'compute': 'VDU', 'network':'VirtualLink'}
541 match_found = False
542 resource = None
543 self._log.info("Doing resource match from pool :%s", self._free_resources)
544 for resource in self._free_resources:
545 resource_info = yield from self.get_resource_info(resource)
546 self._log.info("Attempting to match %s-requirements for %s: %s with resource-id :%s",
547 resource_type, unit_type[resource_type],request.name, resource.resource_id)
548 if self.match_epa_params(resource_info, request):
549 if self.match_image_params(resource_info, request):
550 match_found = True
551 self._log.info("%s-requirements matched for %s: %s with resource-id :%s",
552 resource_type, unit_type[resource_type],request.name, resource.resource_id)
553 yield from self.initialize_resource_in_cal(resource, request)
554 break
555
556 if not match_found:
557 self._log.error("No match found for %s-requirements for %s: %s in pool: %s. %s instantiation failed",
558 resource_type,
559 unit_type[resource_type],
560 request.name,
561 self.name,
562 unit_type[resource_type])
563 return None
564 else:
565 ### Move resource from free-list into allocated-list
566 self._log.info("Allocating the static resource with resource-id: %s for %s: %s",
567 resource.resource_id,
568 unit_type[resource_type],request.name)
569 self._free_resources.remove(resource)
570 self._allocated_resources[resource.resource_id] = resource
571
572 return resource
573
574 @asyncio.coroutine
575 def allocate_resource(self, request):
576 resource = yield from self.allocate_resource_in_cal(request)
577 resource_info = yield from self.get_resource_info(resource)
578 return resource.resource_id, resource_info
579
580 @asyncio.coroutine
581 def release_resource(self, resource_id):
582 self._log.debug("Releasing resource_id %s in pool %s", resource_id, self.name)
583 if resource_id not in self._allocated_resources:
584 self._log.error("Failed to release a resource with resource-id: %s in pool: %s. Resource not known",
585 resource_id,
586 self.name)
587 raise ResMgrUnknownResourceId("Failed to release resource with resource-id: %s. Unknown resource-id" %(resource_id))
588
589 ### Get resource object
590 resource = self._allocated_resources.pop(resource_id)
591 yield from self.uninitialize_resource_in_cal(resource)
592 yield from self.release_cal_resource(resource)
593
594
595 class NetworkPool(ResourcePool):
596 def __init__(self, log, loop, pool_info, cal):
597 super(NetworkPool, self).__init__(log, loop, pool_info, NetworkResource, cal)
598
599 @asyncio.coroutine
600 def allocate_resource_in_cal(self, request):
601 resource = None
602 if self.pool_type == 'static':
603 self._log.info("Attempting network resource allocation from static pool: %s", self.name)
604 ### Attempt resource allocation from static pool
605 resource = yield from self._allocate_static_resource(request, 'network')
606 elif self.pool_type == 'dynamic':
607 ### Attempt resource allocation from dynamic pool
608 self._log.info("Attempting network resource allocation from dynamic pool: %s", self.name)
609 if len(self._free_resources) != 0:
610 self._log.info("Dynamic pool: %s has %d static resources, Attempting resource allocation from static resources",
611 self.name, len(self._free_resources))
612 resource = yield from self._allocate_static_resource(request, 'network')
613 if resource is None:
614 self._log.info("Could not resource from static resources. Going for dynamic resource allocation")
615 ## Not static resource available. Attempt dynamic resource from pool
616 resource = yield from self.allocate_dynamic_resource(request)
617 if resource is None:
618 raise ResMgrNoResourcesAvailable("No matching resource available for allocation from pool: %s" %(self.name))
619 return resource
620
621 @asyncio.coroutine
622 def allocate_dynamic_resource(self, request):
623 resource_type, resource_id = yield from self._cal.create_virtual_network(request)
624 if resource_id in self._all_resources:
625 self._log.error("Resource with id %s name %s of type %s is already used", resource_id, request.name, resource_type)
626 raise ResMgrNoResourcesAvailable("Resource with name %s of type network is already used" %(resource_id))
627 resource = self._resource_class(resource_id, resource_type, request)
628 self._all_resources[resource_id] = resource
629 self._allocated_resources[resource_id] = resource
630 self._log.info("Successfully allocated virtual-network resource from CAL with resource-id: %s resource type %s", resource_id, resource_type)
631 return resource
632
633 @asyncio.coroutine
634 def release_cal_resource(self, resource):
635 if resource.resource_type == 'dynamic':
636 self._log.debug("Deleting virtual network with network_id: %s", resource.resource_id)
637 yield from self._cal.delete_virtual_network(resource.resource_id)
638 self._all_resources.pop(resource.resource_id)
639 self._log.info("Successfully released virtual-network resource in CAL with resource-id: %s", resource.resource_id)
640 elif resource.resource_type == 'precreated':
641 self._all_resources.pop(resource.resource_id)
642 self._log.info("Successfully removed precreated virtual-network resource from allocated list: %s", resource.resource_id)
643 else:
644 self._log.info("Successfully released virtual-network resource with resource-id: %s into available-list", resource.resource_id)
645 self._free_resources.append(resource)
646
647 @asyncio.coroutine
648 def get_resource_info(self, resource):
649 info = yield from self._cal.get_virtual_network_info(resource.resource_id)
650 self._log.info("Successfully retrieved virtual-network information from CAL with resource-id: %s. Info: %s",
651 resource.resource_id, str(info))
652 response = RwResourceMgrYang.VirtualLinkEventData_ResourceInfo()
653 response.from_dict(info.as_dict())
654 response.pool_name = self.name
655 response.resource_state = 'active'
656 return response
657
658 @asyncio.coroutine
659 def get_info_by_id(self, resource_id):
660 info = yield from self._cal.get_virtual_network_info(resource_id)
661 self._log.info("Successfully retrieved virtual-network information from CAL with resource-id: %s. Info: %s",
662 resource_id, str(info))
663 return info
664
665 def match_image_params(self, resource_info, request_params):
666 return True
667
668 def match_epa_params(self, resource_info, request_params):
669 if not hasattr(request_params, 'provider_network'):
670 ### Its a match if nothing is requested
671 return True
672 else:
673 required = getattr(request_params, 'provider_network')
674
675 if not hasattr(resource_info, 'provider_network'):
676 ### Its no match
677 return False
678 else:
679 available = getattr(resource_info, 'provider_network')
680
681 self._log.debug("Matching Network EPA params. Required: %s, Available: %s", required, available)
682
683 if required.has_field('name') and required.name!= available.name:
684 self._log.debug("Provider Network mismatch. Required: %s, Available: %s",
685 required.name,
686 available.name)
687 return False
688
689 self._log.debug("Matching EPA params physical network name")
690
691 if required.has_field('physical_network') and required.physical_network != available.physical_network:
692 self._log.debug("Physical Network mismatch. Required: %s, Available: %s",
693 required.physical_network,
694 available.physical_network)
695 return False
696
697 self._log.debug("Matching EPA params overlay type")
698 if required.has_field('overlay_type') and required.overlay_type != available.overlay_type:
699 self._log.debug("Overlay type mismatch. Required: %s, Available: %s",
700 required.overlay_type,
701 available.overlay_type)
702 return False
703
704 self._log.debug("Matching EPA params SegmentationID")
705 if required.has_field('segmentation_id') and required.segmentation_id != available.segmentation_id:
706 self._log.debug("Segmentation-Id mismatch. Required: %s, Available: %s",
707 required.segmentation_id,
708 available.segmentation_id)
709 return False
710 return True
711
712 @asyncio.coroutine
713 def initialize_resource_in_cal(self, resource, request):
714 pass
715
716 @asyncio.coroutine
717 def uninitialize_resource_in_cal(self, resource):
718 pass
719
720
721 class ComputePool(ResourcePool):
722 def __init__(self, log, loop, pool_info, cal):
723 super(ComputePool, self).__init__(log, loop, pool_info, ComputeResource, cal)
724
725 @asyncio.coroutine
726 def allocate_resource_in_cal(self, request):
727 resource = None
728 if self.pool_type == 'static':
729 self._log.info("Attempting compute resource allocation from static pool: %s", self.name)
730 ### Attempt resource allocation from static pool
731 resource = yield from self._allocate_static_resource(request, 'compute')
732 elif self.pool_type == 'dynamic':
733 ### Attempt resource allocation from dynamic pool
734 self._log.info("Attempting compute resource allocation from dynamic pool: %s", self.name)
735 if len(self._free_resources) != 0:
736 self._log.info("Dynamic pool: %s has %d static resources, Attempting resource allocation from static resources",
737 len(self._free_resources),
738 self.name)
739 resource = yield from self._allocate_static_resource(request, 'compute')
740 if resource is None:
741 self._log.info("Attempting for dynamic resource allocation")
742 resource = yield from self.allocate_dynamic_resource(request)
743 if resource is None:
744 raise ResMgrNoResourcesAvailable("No matching resource available for allocation from pool: %s" %(self.name))
745
746 requested_params = RwcalYang.VDUInitParams()
747 requested_params.from_dict(request.as_dict())
748 resource.requested_params = requested_params
749 return resource
750
751 @asyncio.coroutine
752 def allocate_dynamic_resource(self, request):
753 #request.flavor_id = yield from self.select_resource_flavor(request)
754 resource_id = yield from self._cal.create_virtual_compute(request)
755 resource = self._resource_class(resource_id, 'dynamic', request)
756 self._all_resources[resource_id] = resource
757 self._allocated_resources[resource_id] = resource
758 self._log.info("Successfully allocated virtual-compute resource from CAL with resource-id: %s", resource_id)
759 return resource
760
761 @asyncio.coroutine
762 def release_cal_resource(self, resource):
763 if hasattr(resource, 'requested_params'):
764 delattr(resource, 'requested_params')
765 if resource.resource_type == 'dynamic':
766 yield from self._cal.delete_virtual_compute(resource.resource_id)
767 self._all_resources.pop(resource.resource_id)
768 self._log.info("Successfully released virtual-compute resource in CAL with resource-id: %s", resource.resource_id)
769 else:
770 self._log.info("Successfully released virtual-compute resource with resource-id: %s into available-list", resource.resource_id)
771 self._free_resources.append(resource)
772
773 @asyncio.coroutine
774 def get_resource_info(self, resource):
775 info = yield from self._cal.get_virtual_compute_info(resource.resource_id)
776
777 self._log.info("Successfully retrieved virtual-compute information from CAL with resource-id: %s. Info: %s",
778 resource.resource_id, str(info))
779 response = RwResourceMgrYang.VDUEventData_ResourceInfo()
780 response.from_dict(info.as_dict())
781 response.pool_name = self.name
782 response.resource_state = self._get_resource_state(info, resource.requested_params)
783 return response
784
785 @asyncio.coroutine
786 def get_info_by_id(self, resource_id):
787 info = yield from self._cal.get_virtual_compute_info(resource_id)
788 self._log.info("Successfully retrieved virtual-compute information from CAL with resource-id: %s. Info: %s",
789 resource_id, str(info))
790 return info
791
792 def _get_resource_state(self, resource_info, requested_params):
793
794
795 def conn_pts_len_equal():
796 # if explicit mgmt network is defined then the allocated ports might
797 # one more than the expected.
798 allocated_ports = len(resource_info.connection_points)
799 requested_ports = len(requested_params.connection_points)
800
801 if not requested_params.mgmt_network:
802 allocated_ports -= 1
803
804 return allocated_ports == requested_ports
805
806 if resource_info.state == 'failed':
807 self._log.error("<Compute-Resource: %s> Reached failed state.",
808 resource_info.name)
809 return 'failed'
810
811 if resource_info.state != 'active':
812 self._log.info("<Compute-Resource: %s> Not reached active state.",
813 resource_info.name)
814 return 'pending'
815
816 if not resource_info.has_field('management_ip') or resource_info.management_ip == '':
817 self._log.info("<Compute-Resource: %s> Management IP not assigned.",
818 resource_info.name)
819 return 'pending'
820
821 if (requested_params.has_field('allocate_public_address')) and (requested_params.allocate_public_address == True):
822 if not resource_info.has_field('public_ip'):
823 self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for public ip, %s",
824 resource_info.name, requested_params)
825 return 'pending'
826
827 if not conn_pts_len_equal():
828 self._log.warning("<Compute-Resource: %s> Waiting for requested number of ports to be assigned to virtual-compute, requested: %d, assigned: %d",
829 resource_info.name,
830 len(requested_params.connection_points),
831 len(resource_info.connection_points))
832 return 'pending'
833
834 #not_active = [c for c in resource_info.connection_points
835 # if c.state != 'active']
836
837 #if not_active:
838 # self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for connection_points , %s",
839 # resource_info.name, resource_info)
840 # return 'pending'
841
842 ## Find the connection_points which are in active state but does not have IP address
843 no_address = [c for c in resource_info.connection_points
844 if (c.state == 'active') and (not c.has_field('ip_address'))]
845
846 if no_address:
847 self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for connection_points , %s",
848 resource_info.name, resource_info)
849 return 'pending'
850
851 return 'active'
852
853 @asyncio.coroutine
854 def select_resource_flavor(self, request):
855 flavors = yield from self._cal.get_compute_flavor_info_list()
856 self._log.debug("Received %d flavor information from RW.CAL", len(flavors))
857 flavor_id = None
858 match_found = False
859 for flv in flavors:
860 self._log.info("Attempting to match compute requirement for VDU: %s with flavor %s",
861 request.name, flv)
862 if self.match_epa_params(flv, request):
863 self._log.info("Flavor match found for compute requirements for VDU: %s with flavor name: %s, flavor-id: %s",
864 request.name, flv.name, flv.id)
865 match_found = True
866 flavor_id = flv.id
867 break
868
869 if not match_found:
870 ### Check if CAL account allows dynamic flavor creation
871 if self._cal.dynamic_flavor_supported():
872 self._log.info("Attempting to create a new flavor for required compute-requirement for VDU: %s", request.name)
873 flavor_id = yield from self._cal.create_compute_flavor(request)
874 else:
875 ### No match with existing flavors and CAL does not support dynamic flavor creation
876 self._log.error("Unable to create flavor for compute requirement for VDU: %s. VDU instantiation failed", request.name)
877 raise ResMgrNoResourcesAvailable("No resource available with matching EPA attributes")
878 else:
879 ### Found flavor
880 self._log.info("Found flavor with id: %s for compute requirement for VDU: %s",
881 flavor_id, request.name)
882 return flavor_id
883
884 def _match_vm_flavor(self, required, available):
885 self._log.info("Matching VM Flavor attributes")
886 if available.vcpu_count != required.vcpu_count:
887 self._log.debug("VCPU requirement mismatch. Required: %d, Available: %d",
888 required.vcpu_count,
889 available.vcpu_count)
890 return False
891 if available.memory_mb != required.memory_mb:
892 self._log.debug("Memory requirement mismatch. Required: %d MB, Available: %d MB",
893 required.memory_mb,
894 available.memory_mb)
895 return False
896 if available.storage_gb != required.storage_gb:
897 self._log.debug("Storage requirement mismatch. Required: %d GB, Available: %d GB",
898 required.storage_gb,
899 available.storage_gb)
900 return False
901 self._log.debug("VM Flavor match found")
902 return True
903
904 def _match_guest_epa(self, required, available):
905 self._log.info("Matching Guest EPA attributes")
906 if required.has_field('pcie_device'):
907 self._log.debug("Matching pcie_device")
908 if available.has_field('pcie_device') == False:
909 self._log.debug("Matching pcie_device failed. Not available in flavor")
910 return False
911 else:
912 for dev in required.pcie_device:
913 if not [ d for d in available.pcie_device
914 if ((d.device_id == dev.device_id) and (d.count == dev.count)) ]:
915 self._log.debug("Matching pcie_device failed. Required: %s, Available: %s", required.pcie_device, available.pcie_device)
916 return False
917 elif available.has_field('pcie_device'):
918 self._log.debug("Rejecting available flavor because pcie_device not required but available")
919 return False
920
921
922 if required.has_field('mempage_size'):
923 self._log.debug("Matching mempage_size")
924 if available.has_field('mempage_size') == False:
925 self._log.debug("Matching mempage_size failed. Not available in flavor")
926 return False
927 else:
928 if required.mempage_size != available.mempage_size:
929 self._log.debug("Matching mempage_size failed. Required: %s, Available: %s", required.mempage_size, available.mempage_size)
930 return False
931 elif available.has_field('mempage_size'):
932 self._log.debug("Rejecting available flavor because mempage_size not required but available")
933 return False
934
935 if required.has_field('cpu_pinning_policy'):
936 self._log.debug("Matching cpu_pinning_policy")
937 if required.cpu_pinning_policy != 'ANY':
938 if available.has_field('cpu_pinning_policy') == False:
939 self._log.debug("Matching cpu_pinning_policy failed. Not available in flavor")
940 return False
941 else:
942 if required.cpu_pinning_policy != available.cpu_pinning_policy:
943 self._log.debug("Matching cpu_pinning_policy failed. Required: %s, Available: %s", required.cpu_pinning_policy, available.cpu_pinning_policy)
944 return False
945 elif available.has_field('cpu_pinning_policy'):
946 self._log.debug("Rejecting available flavor because cpu_pinning_policy not required but available")
947 return False
948
949 if required.has_field('cpu_thread_pinning_policy'):
950 self._log.debug("Matching cpu_thread_pinning_policy")
951 if available.has_field('cpu_thread_pinning_policy') == False:
952 self._log.debug("Matching cpu_thread_pinning_policy failed. Not available in flavor")
953 return False
954 else:
955 if required.cpu_thread_pinning_policy != available.cpu_thread_pinning_policy:
956 self._log.debug("Matching cpu_thread_pinning_policy failed. Required: %s, Available: %s", required.cpu_thread_pinning_policy, available.cpu_thread_pinning_policy)
957 return False
958 elif available.has_field('cpu_thread_pinning_policy'):
959 self._log.debug("Rejecting available flavor because cpu_thread_pinning_policy not required but available")
960 return False
961
962 if required.has_field('trusted_execution'):
963 self._log.debug("Matching trusted_execution")
964 if required.trusted_execution == True:
965 if available.has_field('trusted_execution') == False:
966 self._log.debug("Matching trusted_execution failed. Not available in flavor")
967 return False
968 else:
969 if required.trusted_execution != available.trusted_execution:
970 self._log.debug("Matching trusted_execution failed. Required: %s, Available: %s", required.trusted_execution, available.trusted_execution)
971 return False
972 elif available.has_field('trusted_execution'):
973 self._log.debug("Rejecting available flavor because trusted_execution not required but available")
974 return False
975
976 if required.has_field('numa_node_policy'):
977 self._log.debug("Matching numa_node_policy")
978 if available.has_field('numa_node_policy') == False:
979 self._log.debug("Matching numa_node_policy failed. Not available in flavor")
980 return False
981 else:
982 if required.numa_node_policy.has_field('node_cnt'):
983 self._log.debug("Matching numa_node_policy node_cnt")
984 if available.numa_node_policy.has_field('node_cnt') == False:
985 self._log.debug("Matching numa_node_policy node_cnt failed. Not available in flavor")
986 return False
987 else:
988 if required.numa_node_policy.node_cnt != available.numa_node_policy.node_cnt:
989 self._log.debug("Matching numa_node_policy node_cnt failed. Required: %s, Available: %s",required.numa_node_policy.node_cnt, available.numa_node_policy.node_cnt)
990 return False
991 elif available.numa_node_policy.has_field('node_cnt'):
992 self._log.debug("Rejecting available flavor because numa node count not required but available")
993 return False
994
995 if required.numa_node_policy.has_field('mem_policy'):
996 self._log.debug("Matching numa_node_policy mem_policy")
997 if available.numa_node_policy.has_field('mem_policy') == False:
998 self._log.debug("Matching numa_node_policy mem_policy failed. Not available in flavor")
999 return False
1000 else:
1001 if required.numa_node_policy.mem_policy != available.numa_node_policy.mem_policy:
1002 self._log.debug("Matching numa_node_policy mem_policy failed. Required: %s, Available: %s", required.numa_node_policy.mem_policy, available.numa_node_policy.mem_policy)
1003 return False
1004 elif available.numa_node_policy.has_field('mem_policy'):
1005 self._log.debug("Rejecting available flavor because num node mem_policy not required but available")
1006 return False
1007
1008 if required.numa_node_policy.has_field('node'):
1009 self._log.debug("Matching numa_node_policy nodes configuration")
1010 if available.numa_node_policy.has_field('node') == False:
1011 self._log.debug("Matching numa_node_policy nodes configuration failed. Not available in flavor")
1012 return False
1013 for required_node in required.numa_node_policy.node:
1014 self._log.debug("Matching numa_node_policy nodes configuration for node %s", required_node)
1015 numa_match = False
1016 for available_node in available.numa_node_policy.node:
1017 if required_node.id != available_node.id:
1018 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1019 continue
1020 if required_node.vcpu != available_node.vcpu:
1021 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1022 continue
1023 if required_node.memory_mb != available_node.memory_mb:
1024 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1025 continue
1026 numa_match = True
1027 if numa_match == False:
1028 return False
1029 elif available.numa_node_policy.has_field('node'):
1030 self._log.debug("Rejecting available flavor because numa nodes not required but available")
1031 return False
1032 elif available.has_field('numa_node_policy'):
1033 self._log.debug("Rejecting available flavor because numa_node_policy not required but available")
1034 return False
1035 self._log.info("Successful match for Guest EPA attributes")
1036 return True
1037
1038 def _match_vswitch_epa(self, required, available):
1039 self._log.debug("VSwitch EPA match found")
1040 return True
1041
1042 def _match_hypervisor_epa(self, required, available):
1043 self._log.debug("Hypervisor EPA match found")
1044 return True
1045
1046 def _match_host_epa(self, required, available):
1047 self._log.info("Matching Host EPA attributes")
1048 if required.has_field('cpu_model'):
1049 self._log.debug("Matching CPU model")
1050 if available.has_field('cpu_model') == False:
1051 self._log.debug("Matching CPU model failed. Not available in flavor")
1052 return False
1053 else:
1054 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1055 if required.cpu_model.replace('PREFER', 'REQUIRE') != available.cpu_model:
1056 self._log.debug("Matching CPU model failed. Required: %s, Available: %s", required.cpu_model, available.cpu_model)
1057 return False
1058 elif available.has_field('cpu_model'):
1059 self._log.debug("Rejecting available flavor because cpu_model not required but available")
1060 return False
1061
1062 if required.has_field('cpu_arch'):
1063 self._log.debug("Matching CPU architecture")
1064 if available.has_field('cpu_arch') == False:
1065 self._log.debug("Matching CPU architecture failed. Not available in flavor")
1066 return False
1067 else:
1068 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1069 if required.cpu_arch.replace('PREFER', 'REQUIRE') != available.cpu_arch:
1070 self._log.debug("Matching CPU architecture failed. Required: %s, Available: %s", required.cpu_arch, available.cpu_arch)
1071 return False
1072 elif available.has_field('cpu_arch'):
1073 self._log.debug("Rejecting available flavor because cpu_arch not required but available")
1074 return False
1075
1076 if required.has_field('cpu_vendor'):
1077 self._log.debug("Matching CPU vendor")
1078 if available.has_field('cpu_vendor') == False:
1079 self._log.debug("Matching CPU vendor failed. Not available in flavor")
1080 return False
1081 else:
1082 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1083 if required.cpu_vendor.replace('PREFER', 'REQUIRE') != available.cpu_vendor:
1084 self._log.debug("Matching CPU vendor failed. Required: %s, Available: %s", required.cpu_vendor, available.cpu_vendor)
1085 return False
1086 elif available.has_field('cpu_vendor'):
1087 self._log.debug("Rejecting available flavor because cpu_vendor not required but available")
1088 return False
1089
1090 if required.has_field('cpu_socket_count'):
1091 self._log.debug("Matching CPU socket count")
1092 if available.has_field('cpu_socket_count') == False:
1093 self._log.debug("Matching CPU socket count failed. Not available in flavor")
1094 return False
1095 else:
1096 if required.cpu_socket_count != available.cpu_socket_count:
1097 self._log.debug("Matching CPU socket count failed. Required: %s, Available: %s", required.cpu_socket_count, available.cpu_socket_count)
1098 return False
1099 elif available.has_field('cpu_socket_count'):
1100 self._log.debug("Rejecting available flavor because cpu_socket_count not required but available")
1101 return False
1102
1103 if required.has_field('cpu_core_count'):
1104 self._log.debug("Matching CPU core count")
1105 if available.has_field('cpu_core_count') == False:
1106 self._log.debug("Matching CPU core count failed. Not available in flavor")
1107 return False
1108 else:
1109 if required.cpu_core_count != available.cpu_core_count:
1110 self._log.debug("Matching CPU core count failed. Required: %s, Available: %s", required.cpu_core_count, available.cpu_core_count)
1111 return False
1112 elif available.has_field('cpu_core_count'):
1113 self._log.debug("Rejecting available flavor because cpu_core_count not required but available")
1114 return False
1115
1116 if required.has_field('cpu_core_thread_count'):
1117 self._log.debug("Matching CPU core thread count")
1118 if available.has_field('cpu_core_thread_count') == False:
1119 self._log.debug("Matching CPU core thread count failed. Not available in flavor")
1120 return False
1121 else:
1122 if required.cpu_core_thread_count != available.cpu_core_thread_count:
1123 self._log.debug("Matching CPU core thread count failed. Required: %s, Available: %s", required.cpu_core_thread_count, available.cpu_core_thread_count)
1124 return False
1125 elif available.has_field('cpu_core_thread_count'):
1126 self._log.debug("Rejecting available flavor because cpu_core_thread_count not required but available")
1127 return False
1128
1129 if required.has_field('cpu_feature'):
1130 self._log.debug("Matching CPU feature list")
1131 if available.has_field('cpu_feature') == False:
1132 self._log.debug("Matching CPU feature list failed. Not available in flavor")
1133 return False
1134 else:
1135 for feature in required.cpu_feature:
1136 if feature not in available.cpu_feature:
1137 self._log.debug("Matching CPU feature list failed. Required feature: %s is not present. Available features: %s", feature, available.cpu_feature)
1138 return False
1139 elif available.has_field('cpu_feature'):
1140 self._log.debug("Rejecting available flavor because cpu_feature not required but available")
1141 return False
1142 self._log.info("Successful match for Host EPA attributes")
1143 return True
1144
1145
1146 def _match_placement_group_inputs(self, required, available):
1147 self._log.info("Matching Host aggregate attributes")
1148
1149 if not required and not available:
1150 # Host aggregate not required and not available => success
1151 self._log.info("Successful match for Host Aggregate attributes")
1152 return True
1153 if required and available:
1154 # Host aggregate requested and available => Do a match and decide
1155 xx = [ x.as_dict() for x in required ]
1156 yy = [ y.as_dict() for y in available ]
1157 for i in xx:
1158 if i not in yy:
1159 self._log.debug("Rejecting available flavor because host Aggregate mismatch. Required: %s, Available: %s ", required, available)
1160 return False
1161 self._log.info("Successful match for Host Aggregate attributes")
1162 return True
1163 else:
1164 # Either of following conditions => Failure
1165 # - Host aggregate required but not available
1166 # - Host aggregate not required but available
1167 self._log.debug("Rejecting available flavor because host Aggregate mismatch. Required: %s, Available: %s ", required, available)
1168 return False
1169
1170
1171 def match_image_params(self, resource_info, request_params):
1172 return True
1173
1174 def match_epa_params(self, resource_info, request_params):
1175 result = self._match_vm_flavor(getattr(request_params, 'vm_flavor'),
1176 getattr(resource_info, 'vm_flavor'))
1177 if result == False:
1178 self._log.debug("VM Flavor mismatched")
1179 return False
1180
1181 result = self._match_guest_epa(getattr(request_params, 'guest_epa'),
1182 getattr(resource_info, 'guest_epa'))
1183 if result == False:
1184 self._log.debug("Guest EPA mismatched")
1185 return False
1186
1187 result = self._match_vswitch_epa(getattr(request_params, 'vswitch_epa'),
1188 getattr(resource_info, 'vswitch_epa'))
1189 if result == False:
1190 self._log.debug("Vswitch EPA mismatched")
1191 return False
1192
1193 result = self._match_hypervisor_epa(getattr(request_params, 'hypervisor_epa'),
1194 getattr(resource_info, 'hypervisor_epa'))
1195 if result == False:
1196 self._log.debug("Hypervisor EPA mismatched")
1197 return False
1198
1199 result = self._match_host_epa(getattr(request_params, 'host_epa'),
1200 getattr(resource_info, 'host_epa'))
1201 if result == False:
1202 self._log.debug("Host EPA mismatched")
1203 return False
1204
1205 result = self._match_placement_group_inputs(getattr(request_params, 'host_aggregate'),
1206 getattr(resource_info, 'host_aggregate'))
1207
1208 if result == False:
1209 self._log.debug("Host Aggregate mismatched")
1210 return False
1211
1212 return True
1213
1214 @asyncio.coroutine
1215 def initialize_resource_in_cal(self, resource, request):
1216 self._log.info("Initializing the compute-resource with id: %s in RW.CAL", resource.resource_id)
1217 modify_params = RwcalYang.VDUModifyParams()
1218 modify_params.vdu_id = resource.resource_id
1219 modify_params.image_id = request.image_id
1220
1221 for c_point in request.connection_points:
1222 self._log.debug("Adding connection point for VDU: %s to virtual-compute with id: %s Connection point Name: %s",
1223 request.name,resource.resource_id,c_point.name)
1224 point = modify_params.connection_points_add.add()
1225 point.name = c_point.name
1226 point.virtual_link_id = c_point.virtual_link_id
1227 yield from self._cal.modify_virtual_compute(modify_params)
1228
1229 @asyncio.coroutine
1230 def uninitialize_resource_in_cal(self, resource):
1231 self._log.info("Un-initializing the compute-resource with id: %s in RW.CAL", resource.resource_id)
1232 modify_params = RwcalYang.VDUModifyParams()
1233 modify_params.vdu_id = resource.resource_id
1234 resource_info = yield from self.get_resource_info(resource)
1235 for c_point in resource_info.connection_points:
1236 self._log.debug("Removing connection point: %s from VDU: %s ",
1237 c_point.name,resource_info.name)
1238 point = modify_params.connection_points_remove.add()
1239 point.connection_point_id = c_point.connection_point_id
1240 yield from self._cal.modify_virtual_compute(modify_params)
1241
1242
1243 class ResourceMgrCore(object):
1244 def __init__(self, dts, log, log_hdl, loop, parent):
1245 self._log = log
1246 self._log_hdl = log_hdl
1247 self._dts = dts
1248 self._loop = loop
1249 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
1250 self._parent = parent
1251 self._cloud_cals = {}
1252 # Dictionary of pool objects keyed by name
1253 self._cloud_pool_table = {}
1254 # Dictionary of tuples (resource_id, cloud_account_name, pool_name) keyed by event_id
1255 self._resource_table = {}
1256 self._pool_class = {'compute': ComputePool,
1257 'network': NetworkPool}
1258
1259 def _get_cloud_pool_table(self, cloud_account_name):
1260 if cloud_account_name not in self._cloud_pool_table:
1261 msg = "Cloud account %s not found" % cloud_account_name
1262 self._log.error(msg)
1263 raise ResMgrCloudAccountNotFound(msg)
1264
1265 return self._cloud_pool_table[cloud_account_name]
1266
1267 def _get_cloud_cal_plugin(self, cloud_account_name):
1268 if cloud_account_name not in self._cloud_cals:
1269 msg = "Cloud account %s not found" % cloud_account_name
1270 self._log.error(msg)
1271 raise ResMgrCloudAccountNotFound(msg)
1272
1273 return self._cloud_cals[cloud_account_name]
1274
1275 def _add_default_cloud_pools(self, cloud_account_name):
1276 self._log.debug("Adding default compute and network pools for cloud account %s",
1277 cloud_account_name)
1278 default_pools = [
1279 {
1280 'name': '____default_compute_pool',
1281 'resource_type': 'compute',
1282 'pool_type': 'dynamic',
1283 'max_size': 128,
1284 },
1285 {
1286 'name': '____default_network_pool',
1287 'resource_type': 'network',
1288 'pool_type': 'dynamic',
1289 'max_size': 128,
1290 },
1291 ]
1292
1293 for pool_dict in default_pools:
1294 pool_info = ResourcePoolInfo.from_dict(pool_dict)
1295 self._log.info("Applying configuration for cloud account %s pool: %s",
1296 cloud_account_name, pool_info.name)
1297
1298 self.add_resource_pool(cloud_account_name, pool_info)
1299 self.unlock_resource_pool(cloud_account_name, pool_info.name)
1300
1301 def get_cloud_account_names(self):
1302 """ Returns a list of configured cloud account names """
1303 return self._cloud_cals.keys()
1304
1305 def add_cloud_account(self, account):
1306 self._log.debug("Received CAL account. Account Name: %s, Account Type: %s",
1307 account.name, account.account_type)
1308
1309 ### Add cal handler to all the pools
1310 if account.name in self._cloud_cals:
1311 raise ResMgrCloudAccountExists("Cloud account already exists in res mgr: %s",
1312 account.name)
1313
1314 self._cloud_pool_table[account.name] = {}
1315
1316 cal = ResourceMgrCALHandler(self._loop, self._executor, self._log, self._log_hdl, account)
1317 self._cloud_cals[account.name] = cal
1318
1319 self._add_default_cloud_pools(account.name)
1320
1321 def update_cloud_account(self, account):
1322 raise NotImplementedError("Update cloud account not implemented")
1323
1324 def delete_cloud_account(self, account_name, dry_run=False):
1325 cloud_pool_table = self._get_cloud_pool_table(account_name)
1326 for pool in cloud_pool_table.values():
1327 if pool.in_use():
1328 raise ResMgrCloudAccountInUse("Cannot delete cloud which is currently in use")
1329
1330 # If dry_run is specified, do not actually delete the cloud account
1331 if dry_run:
1332 return
1333
1334 for pool in list(cloud_pool_table):
1335 self.delete_resource_pool(account_name, pool)
1336
1337 del self._cloud_pool_table[account_name]
1338 del self._cloud_cals[account_name]
1339
1340 def add_resource_pool(self, cloud_account_name, pool_info):
1341 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1342 if pool_info.name in cloud_pool_table:
1343 raise ResMgrDuplicatePool("Pool with name: %s already exists", pool_info.name)
1344
1345 cloud_cal = self._get_cloud_cal_plugin(cloud_account_name)
1346 pool = self._pool_class[pool_info.resource_type](self._log, self._loop, pool_info, cloud_cal)
1347
1348 cloud_pool_table[pool_info.name] = pool
1349
1350 def delete_resource_pool(self, cloud_account_name, pool_name):
1351 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1352 if pool_name not in cloud_pool_table:
1353 self._log.error("Pool: %s not found for deletion", pool_name)
1354 return
1355 pool = cloud_pool_table[pool_name]
1356
1357 if pool.in_use():
1358 # Can't delete a pool in use
1359 self._log.error("Pool: %s in use. Can not delete in-use pool", pool.name)
1360 return
1361
1362 pool.cleanup()
1363 del cloud_pool_table[pool_name]
1364 self._log.info("Resource Pool: %s successfully deleted", pool_name)
1365
1366 def modify_resource_pool(self, cloud_account_name, pool):
1367 pass
1368
1369 def lock_resource_pool(self, cloud_account_name, pool_name):
1370 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1371 if pool_name not in cloud_pool_table:
1372 self._log.info("Pool: %s is not available for lock operation")
1373 return
1374
1375 pool = cloud_pool_table[pool_name]
1376 pool.lock_pool()
1377
1378 def unlock_resource_pool(self, cloud_account_name, pool_name):
1379 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1380 if pool_name not in cloud_pool_table:
1381 self._log.info("Pool: %s is not available for unlock operation")
1382 return
1383
1384 pool = cloud_pool_table[pool_name]
1385 pool.unlock_pool()
1386
1387 def get_resource_pool_info(self, cloud_account_name, pool_name):
1388 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1389 if pool_name in cloud_pool_table:
1390 pool = cloud_pool_table[pool_name]
1391 return pool.get_pool_info()
1392 else:
1393 return None
1394
1395 def get_resource_pool_list(self, cloud_account_name):
1396 return [v for _, v in self._get_cloud_pool_table(cloud_account_name).items()]
1397
1398 def _select_resource_pools(self, cloud_account_name, resource_type):
1399 pools = [pool for pool in self.get_resource_pool_list(cloud_account_name) if pool.resource_type == resource_type and pool.status == 'unlocked']
1400 if not pools:
1401 raise ResMgrPoolNotAvailable("No %s pool found for resource allocation", resource_type)
1402
1403 return pools[0]
1404
1405 @asyncio.coroutine
1406 def allocate_virtual_resource(self, event_id, cloud_account_name, request, resource_type):
1407 ### Check if event_id is unique or already in use
1408 if event_id in self._resource_table:
1409 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1410 self._log.warning("Requested event-id :%s for resource-allocation already active with pool: %s",
1411 event_id, pool_name)
1412 # If resource-type matches then return the same resource
1413 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1414 pool = cloud_pool_table[pool_name]
1415 if pool.resource_type == resource_type:
1416
1417 info = yield from pool.read_resource_info(r_id)
1418 return info
1419 else:
1420 self._log.error("Event-id conflict. Duplicate event-id: %s", event_id)
1421 raise ResMgrDuplicateEventId("Requested event-id :%s already active with pool: %s" %(event_id, pool_name))
1422
1423 ### All-OK, lets go ahead with resource allocation
1424 pool = self._select_resource_pools(cloud_account_name, resource_type)
1425 self._log.info("Selected pool %s for resource allocation", pool.name)
1426
1427 r_id, r_info = yield from pool.allocate_resource(request)
1428
1429 self._resource_table[event_id] = (r_id, cloud_account_name, pool.name)
1430 return r_info
1431
1432 @asyncio.coroutine
1433 def reallocate_virtual_resource(self, event_id, cloud_account_name, request, resource_type, resource):
1434 ### Check if event_id is unique or already in use
1435 if event_id in self._resource_table:
1436 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1437 self._log.warning("Requested event-id :%s for resource-allocation already active with pool: %s",
1438 event_id, pool_name)
1439 # If resource-type matches then return the same resource
1440 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1441 pool = cloud_pool_table[pool_name]
1442 if pool.resource_type == resource_type:
1443 info = yield from pool.read_resource_info(r_id)
1444 return info
1445 else:
1446 self._log.error("Event-id conflict. Duplicate event-id: %s", event_id)
1447 raise ResMgrDuplicateEventId("Requested event-id :%s already active with pool: %s" %(event_id, pool_name))
1448
1449 self._log.debug("Re-allocate virtual resource. resource type %s", resource_type)
1450 r_info = None
1451 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1452 pool = cloud_pool_table[resource.pool_name]
1453 if pool.resource_type == resource_type:
1454 if resource_type == 'network':
1455 r_id = resource.virtual_link_id
1456 r_info = yield from pool.get_info_by_id(resource.virtual_link_id)
1457 elif resource_type == 'compute':
1458 r_id = resource.vdu_id
1459 r_info = yield from pool.get_info_by_id(resource.vdu_id)
1460
1461 if r_info is None:
1462 r_id, r_info = yield from pool.allocate_resource(request)
1463 self._resource_table[event_id] = (r_id, cloud_account_name, resource.pool_name)
1464 return r_info
1465
1466 self._resource_table[event_id] = (r_id, cloud_account_name, resource.pool_name)
1467 new_resource = pool._resource_class(r_id, 'dynamic', request)
1468 if resource_type == 'compute':
1469 requested_params = RwcalYang.VDUInitParams()
1470 requested_params.from_dict(request.as_dict())
1471 new_resource.requested_params = requested_params
1472 pool._all_resources[r_id] = new_resource
1473 pool._allocated_resources[r_id] = new_resource
1474 return r_info
1475
1476 @asyncio.coroutine
1477 def release_virtual_resource(self, event_id, resource_type):
1478 ### Check if event_id exists
1479 if event_id not in self._resource_table:
1480 self._log.error("Received resource-release-request with unknown Event-id :%s", event_id)
1481 raise ResMgrUnknownEventId("Received resource-release-request with unknown Event-id :%s" %(event_id))
1482
1483 ## All-OK, lets proceed with resource release
1484 r_id, cloud_account_name, pool_name = self._resource_table.pop(event_id)
1485 self._log.debug("Attempting to release virtual resource id %s from pool %s",
1486 r_id, pool_name)
1487
1488 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1489 pool = cloud_pool_table[pool_name]
1490 yield from pool.release_resource(r_id)
1491
1492 @asyncio.coroutine
1493 def read_virtual_resource(self, event_id, resource_type):
1494 ### Check if event_id exists
1495 if event_id not in self._resource_table:
1496 self._log.error("Received resource-read-request with unknown Event-id :%s", event_id)
1497 raise ResMgrUnknownEventId("Received resource-read-request with unknown Event-id :%s" %(event_id))
1498
1499 ## All-OK, lets proceed
1500 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1501 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1502 pool = cloud_pool_table[pool_name]
1503 info = yield from pool.read_resource_info(r_id)
1504 return info