Merge branch 'v1.1'
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_core.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import uuid
18 import collections
19 import asyncio
20 import concurrent.futures
21
22 import gi
23 gi.require_version('RwDts', '1.0')
24 gi.require_version('RwYang', '1.0')
25 gi.require_version('RwResourceMgrYang', '1.0')
26 gi.require_version('RwLaunchpadYang', '1.0')
27 gi.require_version('RwcalYang', '1.0')
28 from gi.repository import (
29 RwDts as rwdts,
30 RwYang,
31 RwResourceMgrYang,
32 RwLaunchpadYang,
33 RwcalYang,
34 )
35
36 from gi.repository.RwTypes import RwStatus
37
38 class ResMgrCALNotPresent(Exception):
39 pass
40
41 class ResMgrCloudAccountNotFound(Exception):
42 pass
43
44 class ResMgrCloudAccountExists(Exception):
45 pass
46
47 class ResMgrCloudAccountInUse(Exception):
48 pass
49
50 class ResMgrDuplicatePool(Exception):
51 pass
52
53 class ResMgrPoolNotAvailable(Exception):
54 pass
55
56 class ResMgrPoolOperationFailed(Exception):
57 pass
58
59 class ResMgrDuplicateEventId(Exception):
60 pass
61
62 class ResMgrUnknownEventId(Exception):
63 pass
64
65 class ResMgrUnknownResourceId(Exception):
66 pass
67
68 class ResMgrResourceIdBusy(Exception):
69 pass
70
71 class ResMgrResourceIdNotAllocated(Exception):
72 pass
73
74 class ResMgrNoResourcesAvailable(Exception):
75 pass
76
77 class ResMgrResourcesInitFailed(Exception):
78 pass
79
80 class ResMgrCALOperationFailure(Exception):
81 pass
82
83
84
85 class ResourceMgrCALHandler(object):
86 def __init__(self, loop, executor, log, log_hdl, account):
87 self._log = log
88 self._loop = loop
89 self._executor = executor
90 self._account = account.cal_account_msg
91 self._rwcal = account.cal
92 if account.account_type == 'aws':
93 self._subnets = ["172.31.97.0/24", "172.31.98.0/24", "172.31.99.0/24", "172.31.100.0/24", "172.31.101.0/24"]
94 else:
95 self._subnets = ["11.0.0.0/24",
96 "12.0.0.0/24",
97 "13.0.0.0/24",
98 "14.0.0.0/24",
99 "15.0.0.0/24",
100 "16.0.0.0/24",
101 "17.0.0.0/24",
102 "18.0.0.0/24",
103 "19.0.0.0/24",
104 "20.0.0.0/24",
105 "21.0.0.0/24",
106 "22.0.0.0/24",]
107 self._subnet_ptr = 0
108
109 def _select_link_subnet(self):
110 subnet = self._subnets[self._subnet_ptr]
111 self._subnet_ptr += 1
112 if self._subnet_ptr == len(self._subnets):
113 self._subnet_ptr = 0
114 return subnet
115
116 @asyncio.coroutine
117 def create_virtual_network(self, req_params):
118 #rc, rsp = self._rwcal.get_virtual_link_list(self._account)
119 self._log.debug("Calling get_virtual_link_list API")
120 rc, rsp = yield from self._loop.run_in_executor(self._executor,
121 self._rwcal.get_virtual_link_list,
122 self._account)
123
124 assert rc == RwStatus.SUCCESS
125
126 links = [vlink for vlink in rsp.virtual_link_info_list if vlink.name == req_params.name]
127 if links:
128 self._log.debug("Found existing virtual-network with matching name in cloud. Reusing the virtual-network with id: %s" %(links[0].virtual_link_id))
129 return ('precreated', links[0].virtual_link_id)
130 elif req_params.vim_network_name:
131 self._log.error("Virtual-network-allocate operation failed for cloud account: %s Vim Network with name %s does not pre-exist",
132 self._account.name, req_params.vim_network_name)
133 raise ResMgrCALOperationFailure("Virtual-network allocate operation failed for cloud account: %s Vim Network with name %s does not pre-exist"
134 %(self._account.name, req_params.vim_network_name))
135
136 params = RwcalYang.VirtualLinkReqParams()
137 params.from_dict(req_params.as_dict())
138 params.subnet = self._select_link_subnet()
139 #rc, rs = self._rwcal.create_virtual_link(self._account, params)
140 self._log.debug("Calling create_virtual_link API with params: %s" %(str(req_params)))
141 rc, rs = yield from self._loop.run_in_executor(self._executor,
142 self._rwcal.create_virtual_link,
143 self._account,
144 params)
145 if rc.status != RwStatus.SUCCESS:
146 self._log.error("Virtual-network-allocate operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
147 self._account.name, rc.error_msg, rc.traceback)
148 raise ResMgrCALOperationFailure("Virtual-network allocate operation failed for cloud account: %s (%s)"
149 %(self._account.name, rc.error_msg))
150
151 return ('dynamic',rs)
152
153 @asyncio.coroutine
154 def delete_virtual_network(self, network_id):
155 #rc = self._rwcal.delete_virtual_link(self._account, network_id)
156 self._log.debug("Calling delete_virtual_link API with id: %s" %(network_id))
157 rc = yield from self._loop.run_in_executor(self._executor,
158 self._rwcal.delete_virtual_link,
159 self._account,
160 network_id)
161 if rc != RwStatus.SUCCESS:
162 self._log.error("Virtual-network-release operation failed for cloud account: %s. ResourceID: %s",
163 self._account.name,
164 network_id)
165 raise ResMgrCALOperationFailure("Virtual-network release operation failed for cloud account: %s. ResourceId: %s" %(self._account.name, network_id))
166
167 @asyncio.coroutine
168 def get_virtual_network_info(self, network_id):
169 #rc, rs = self._rwcal.get_virtual_link(self._account, network_id)
170 self._log.debug("Calling get_virtual_link_info API with id: %s" %(network_id))
171 rc, rs = yield from self._loop.run_in_executor(self._executor,
172 self._rwcal.get_virtual_link,
173 self._account,
174 network_id)
175 if rc != RwStatus.SUCCESS:
176 self._log.error("Virtual-network-info operation failed for cloud account: %s. ResourceID: %s",
177 self._account.name,
178 network_id)
179 raise ResMgrCALOperationFailure("Virtual-network-info operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, network_id))
180 return rs
181
182 @asyncio.coroutine
183 def create_virtual_compute(self, req_params):
184 #rc, rsp = self._rwcal.get_vdu_list(self._account)
185 self._log.debug("Calling get_vdu_list API")
186
187 rc, rsp = yield from self._loop.run_in_executor(self._executor,
188 self._rwcal.get_vdu_list,
189 self._account)
190 assert rc == RwStatus.SUCCESS
191 vdus = [vm for vm in rsp.vdu_info_list if vm.name == req_params.name]
192 if vdus:
193 self._log.debug("Found existing virtual-compute with matching name in cloud. Reusing the virtual-compute element with id: %s" %(vdus[0].vdu_id))
194 return vdus[0].vdu_id
195
196 params = RwcalYang.VDUInitParams()
197 params.from_dict(req_params.as_dict())
198
199 if 'image_name' in req_params:
200 image_checksum = req_params.image_checksum if req_params.has_field("image_checksum") else None
201 params.image_id = yield from self.get_image_id_from_image_info(req_params.image_name, image_checksum)
202
203 #rc, rs = self._rwcal.create_vdu(self._account, params)
204 self._log.debug("Calling create_vdu API with params %s" %(str(params)))
205 rc, rs = yield from self._loop.run_in_executor(self._executor,
206 self._rwcal.create_vdu,
207 self._account,
208 params)
209
210 if rc.status != RwStatus.SUCCESS:
211 self._log.error("Virtual-compute-create operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
212 self._account.name, rc.error_msg, rc.traceback)
213 raise ResMgrCALOperationFailure("Virtual-compute-create operation failed for cloud account: %s (%s)"
214 %(self._account.name, rc.error_msg))
215
216 return rs
217
218 @asyncio.coroutine
219 def modify_virtual_compute(self, req_params):
220 #rc = self._rwcal.modify_vdu(self._account, req_params)
221 self._log.debug("Calling modify_vdu API with params: %s" %(str(req_params)))
222 rc = yield from self._loop.run_in_executor(self._executor,
223 self._rwcal.modify_vdu,
224 self._account,
225 req_params)
226 if rc != RwStatus.SUCCESS:
227 self._log.error("Virtual-compute-modify operation failed for cloud account: %s", self._account.name)
228 raise ResMgrCALOperationFailure("Virtual-compute-modify operation failed for cloud account: %s" %(self._account.name))
229
230 @asyncio.coroutine
231 def delete_virtual_compute(self, compute_id):
232 #rc = self._rwcal.delete_vdu(self._account, compute_id)
233 self._log.debug("Calling delete_vdu API with id: %s" %(compute_id))
234 rc = yield from self._loop.run_in_executor(self._executor,
235 self._rwcal.delete_vdu,
236 self._account,
237 compute_id)
238 if rc != RwStatus.SUCCESS:
239 self._log.error("Virtual-compute-release operation failed for cloud account: %s. ResourceID: %s",
240 self._account.name,
241 compute_id)
242 raise ResMgrCALOperationFailure("Virtual-compute-release operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, compute_id))
243
244 @asyncio.coroutine
245 def get_virtual_compute_info(self, compute_id):
246 #rc, rs = self._rwcal.get_vdu(self._account, compute_id)
247 self._log.debug("Calling get_vdu API with id: %s" %(compute_id))
248 rc, rs = yield from self._loop.run_in_executor(self._executor,
249 self._rwcal.get_vdu,
250 self._account,
251 compute_id)
252 if rc != RwStatus.SUCCESS:
253 self._log.error("Virtual-compute-info operation failed for cloud account: %s. ResourceID: %s",
254 self._account.name,
255 compute_id)
256 raise ResMgrCALOperationFailure("Virtual-compute-info operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, compute_id))
257 return rs
258
259 @asyncio.coroutine
260 def get_compute_flavor_info_list(self):
261 #rc, rs = self._rwcal.get_flavor_list(self._account)
262 self._log.debug("Calling get_flavor_list API")
263 rc, rs = yield from self._loop.run_in_executor(self._executor,
264 self._rwcal.get_flavor_list,
265 self._account)
266 if rc != RwStatus.SUCCESS:
267 self._log.error("Get-flavor-info-list operation failed for cloud account: %s",
268 self._account.name)
269 raise ResMgrCALOperationFailure("Get-flavor-info-list operation failed for cloud account: %s" %(self._account.name))
270 return rs.flavorinfo_list
271
272 @asyncio.coroutine
273 def create_compute_flavor(self, request):
274 flavor = RwcalYang.FlavorInfoItem()
275 flavor.name = str(uuid.uuid4())
276 epa_types = ['vm_flavor', 'guest_epa', 'host_epa', 'host_aggregate']
277 epa_dict = {k: v for k, v in request.as_dict().items() if k in epa_types}
278 flavor.from_dict(epa_dict)
279
280 self._log.info("Creating flavor: %s", flavor)
281 #rc, rs = self._rwcal.create_flavor(self._account, flavor)
282 self._log.debug("Calling create_flavor API")
283 rc, rs = yield from self._loop.run_in_executor(self._executor,
284 self._rwcal.create_flavor,
285 self._account,
286 flavor)
287 if rc != RwStatus.SUCCESS:
288 self._log.error("Create-flavor operation failed for cloud account: %s",
289 self._account.name)
290 raise ResMgrCALOperationFailure("Create-flavor operation failed for cloud account: %s" %(self._account.name))
291 return rs
292
293 @asyncio.coroutine
294 def get_image_info_list(self):
295 #rc, rs = self._rwcal.get_image_list(self._account)
296 self._log.debug("Calling get_image_list API")
297 rc, rs = yield from self._loop.run_in_executor(self._executor,
298 self._rwcal.get_image_list,
299 self._account)
300 if rc != RwStatus.SUCCESS:
301 self._log.error("Get-image-info-list operation failed for cloud account: %s",
302 self._account.name)
303 raise ResMgrCALOperationFailure("Get-image-info-list operation failed for cloud account: %s" %(self._account.name))
304 return rs.imageinfo_list
305
306 @asyncio.coroutine
307 def get_image_id_from_image_info(self, image_name, image_checksum=None):
308 self._log.debug("Looking up image id for image name %s and checksum %s on cloud account: %s",
309 image_name, image_checksum, self._account.name
310 )
311
312 image_list = yield from self.get_image_info_list()
313 matching_images = [i for i in image_list if i.name == image_name]
314
315 # If the image checksum was filled in then further filter the images by the checksum
316 if image_checksum is not None:
317 matching_images = [i for i in matching_images if i.checksum == image_checksum]
318 else:
319 self._log.warning("Image checksum not provided. Lookup using image name (%s) only.",
320 image_name)
321
322 if len(matching_images) == 0:
323 raise ResMgrCALOperationFailure("Could not find image name {} (using checksum: {}) for cloud account: {}".format(
324 image_name, image_checksum, self._account.name
325 ))
326
327 elif len(matching_images) > 1:
328 unique_checksums = {i.checksum for i in matching_images}
329 if len(unique_checksums) > 1:
330 msg = ("Too many images with different checksums matched "
331 "image name of %s for cloud account: %s" % (image_name, self._account.name))
332 raise ResMgrCALOperationFailure(msg)
333
334 return matching_images[0].id
335
336 @asyncio.coroutine
337 def get_image_info(self, image_id):
338 #rc, rs = self._rwcal.get_image(self._account, image_id)
339 self._log.debug("Calling get_image API for id: %s" %(image_id))
340 rc, rs = yield from self._loop.run_in_executor(self._executor,
341 self._rwcal.get_image,
342 self._account,
343 image_id)
344 if rc != RwStatus.SUCCESS:
345 self._log.error("Get-image-info-list operation failed for cloud account: %s",
346 self._account.name)
347 raise ResMgrCALOperationFailure("Get-image-info operation failed for cloud account: %s" %(self._account.name))
348 return rs.imageinfo_list
349
350 def dynamic_flavor_supported(self):
351 return getattr(self._account, self._account.account_type).dynamic_flavor_support
352
353
354 class Resource(object):
355 def __init__(self, resource_id, resource_type, request):
356 self._id = resource_id
357 self._type = resource_type
358 self._request = request
359
360 @property
361 def resource_id(self):
362 return self._id
363
364 @property
365 def resource_type(self):
366 return self._type
367
368 @property
369 def request(self):
370 return self._request
371
372 def cleanup(self):
373 pass
374
375
376 class ComputeResource(Resource):
377 pass
378
379
380 class NetworkResource(Resource):
381 pass
382
383
384 class ResourcePoolInfo(object):
385 def __init__(self, name, pool_type, resource_type, max_size):
386 self.name = name
387 self.pool_type = pool_type
388 self.resource_type = resource_type
389 self.max_size = max_size
390
391 @classmethod
392 def from_dict(cls, pool_dict):
393 return cls(
394 pool_dict["name"],
395 pool_dict["pool_type"],
396 pool_dict["resource_type"],
397 pool_dict["max_size"],
398 )
399
400
401 class ResourcePool(object):
402 def __init__(self, log, loop, pool_info, resource_class, cal):
403 self._log = log
404 self._loop = loop
405 self._name = pool_info.name
406 self._pool_type = pool_info.pool_type
407 self._resource_type = pool_info.resource_type
408 self._cal = cal
409 self._resource_class = resource_class
410
411 self._max_size = pool_info.max_size
412
413 self._status = 'unlocked'
414 ### A Dictionary of all the resources in this pool, keyed by CAL resource-id
415 self._all_resources = {}
416 ### A List of free resources in this pool
417 self._free_resources = []
418 ### A Dictionary of all the allocated resources in this pool, keyed by CAL resource-id
419 self._allocated_resources = {}
420
421 @property
422 def name(self):
423 return self._name
424
425 @property
426 def cal(self):
427 """ This instance's ResourceMgrCALHandler """
428 return self._cal
429
430 @property
431 def pool_type(self):
432 return self._pool_type
433
434 @property
435 def resource_type(self):
436 return self._resource_type
437
438 @property
439 def max_size(self):
440 return self._max_size
441
442 @property
443 def status(self):
444 return self._status
445
446 def in_use(self):
447 if len(self._allocated_resources) != 0:
448 return True
449 else:
450 return False
451
452 def update_cal_handler(self, cal):
453 if self.in_use():
454 raise ResMgrPoolOperationFailed(
455 "Cannot update CAL plugin for in use pool"
456 )
457
458 self._cal = cal
459
460 def lock_pool(self):
461 self._log.info("Locking the pool :%s", self.name)
462 self._status = 'locked'
463
464 def unlock_pool(self):
465 self._log.info("Unlocking the pool :%s", self.name)
466 self._status = 'unlocked'
467
468 def add_resource(self, resource_info):
469 self._log.info("Adding static resource to Pool: %s, Resource-id: %s Resource-Type: %s",
470 self.name,
471 resource_info.resource_id,
472 self.resource_type)
473
474 ### Add static resources to pool
475 resource = self._resource_class(resource_info.resource_id, 'static')
476 assert resource.resource_id == resource_info.resource_id
477 self._all_resources[resource.resource_id] = resource
478 self._free_resources.append(resource)
479
480 def delete_resource(self, resource_id):
481 if resource_id not in self._all_resources:
482 self._log.error("Resource Id: %s not present in pool: %s. Delete operation failed", resource_id, self.name)
483 raise ResMgrUnknownResourceId("Resource Id: %s requested for release is not found" %(resource_id))
484
485 if resource_id in self._allocated_resources:
486 self._log.error("Resource Id: %s in use. Delete operation failed", resource_id)
487 raise ResMgrResourceIdBusy("Resource Id: %s requested for release is in use" %(resource_id))
488
489 self._log.info("Deleting resource: %s from pool: %s, Resource-Type",
490 resource_id,
491 self.name,
492 self.resource_type)
493
494 resource = self._all_resources.pop(resource_id)
495 self._free_resources.remove(resource)
496 resource.cleanup()
497 del resource
498
499 @asyncio.coroutine
500 def read_resource_info(self, resource_id):
501 if resource_id not in self._all_resources:
502 self._log.error("Resource Id: %s not present in pool: %s. Read operation failed", resource_id, self.name)
503 raise ResMgrUnknownResourceId("Resource Id: %s requested for read is not found" %(resource_id))
504
505 if resource_id not in self._allocated_resources:
506 self._log.error("Resource Id: %s not in use. Read operation failed", resource_id)
507 raise ResMgrResourceIdNotAllocated("Resource Id: %s not in use. Read operation failed" %(resource_id))
508
509 resource = self._allocated_resources[resource_id]
510 resource_info = yield from self.get_resource_info(resource)
511 return resource_info
512
513 def get_pool_info(self):
514 info = RwResourceMgrYang.ResourceRecordInfo()
515 self._log.info("Providing info for pool: %s", self.name)
516 info.name = self.name
517 if self.pool_type:
518 info.pool_type = self.pool_type
519 if self.resource_type:
520 info.resource_type = self.resource_type
521 if self.status:
522 info.pool_status = self.status
523
524 info.total_resources = len(self._all_resources)
525 info.free_resources = len(self._free_resources)
526 info.allocated_resources = len(self._allocated_resources)
527 return info
528
529 def cleanup(self):
530 for _, v in self._all_resources.items():
531 v.cleanup()
532
533 @asyncio.coroutine
534 def _allocate_static_resource(self, request, resource_type):
535 unit_type = {'compute': 'VDU', 'network':'VirtualLink'}
536 match_found = False
537 resource = None
538 self._log.info("Doing resource match from pool :%s", self._free_resources)
539 for resource in self._free_resources:
540 resource_info = yield from self.get_resource_info(resource)
541 self._log.info("Attempting to match %s-requirements for %s: %s with resource-id :%s",
542 resource_type, unit_type[resource_type],request.name, resource.resource_id)
543 if self.match_epa_params(resource_info, request):
544 if self.match_image_params(resource_info, request):
545 match_found = True
546 self._log.info("%s-requirements matched for %s: %s with resource-id :%s",
547 resource_type, unit_type[resource_type],request.name, resource.resource_id)
548 yield from self.initialize_resource_in_cal(resource, request)
549 break
550
551 if not match_found:
552 self._log.error("No match found for %s-requirements for %s: %s in pool: %s. %s instantiation failed",
553 resource_type,
554 unit_type[resource_type],
555 request.name,
556 self.name,
557 unit_type[resource_type])
558 return None
559 else:
560 ### Move resource from free-list into allocated-list
561 self._log.info("Allocating the static resource with resource-id: %s for %s: %s",
562 resource.resource_id,
563 unit_type[resource_type],request.name)
564 self._free_resources.remove(resource)
565 self._allocated_resources[resource.resource_id] = resource
566
567 return resource
568
569 @asyncio.coroutine
570 def allocate_resource(self, request):
571 resource = yield from self.allocate_resource_in_cal(request)
572 resource_info = yield from self.get_resource_info(resource)
573 return resource.resource_id, resource_info
574
575 @asyncio.coroutine
576 def release_resource(self, resource_id):
577 self._log.debug("Releasing resource_id %s in pool %s", resource_id, self.name)
578 if resource_id not in self._allocated_resources:
579 self._log.error("Failed to release a resource with resource-id: %s in pool: %s. Resource not known",
580 resource_id,
581 self.name)
582 raise ResMgrUnknownResourceId("Failed to release resource with resource-id: %s. Unknown resource-id" %(resource_id))
583
584 ### Get resource object
585 resource = self._allocated_resources.pop(resource_id)
586 yield from self.uninitialize_resource_in_cal(resource)
587 yield from self.release_cal_resource(resource)
588
589
590 class NetworkPool(ResourcePool):
591 def __init__(self, log, loop, pool_info, cal):
592 super(NetworkPool, self).__init__(log, loop, pool_info, NetworkResource, cal)
593
594 @asyncio.coroutine
595 def allocate_resource_in_cal(self, request):
596 resource = None
597 if self.pool_type == 'static':
598 self._log.info("Attempting network resource allocation from static pool: %s", self.name)
599 ### Attempt resource allocation from static pool
600 resource = yield from self._allocate_static_resource(request, 'network')
601 elif self.pool_type == 'dynamic':
602 ### Attempt resource allocation from dynamic pool
603 self._log.info("Attempting network resource allocation from dynamic pool: %s", self.name)
604 if len(self._free_resources) != 0:
605 self._log.info("Dynamic pool: %s has %d static resources, Attempting resource allocation from static resources",
606 self.name, len(self._free_resources))
607 resource = yield from self._allocate_static_resource(request, 'network')
608 if resource is None:
609 self._log.info("Could not resource from static resources. Going for dynamic resource allocation")
610 ## Not static resource available. Attempt dynamic resource from pool
611 resource = yield from self.allocate_dynamic_resource(request)
612 if resource is None:
613 raise ResMgrNoResourcesAvailable("No matching resource available for allocation from pool: %s" %(self.name))
614 return resource
615
616 @asyncio.coroutine
617 def allocate_dynamic_resource(self, request):
618 resource_type, resource_id = yield from self._cal.create_virtual_network(request)
619 if resource_id in self._all_resources:
620 self._log.error("Resource with id %s name %s of type %s is already used", resource_id, request.name, resource_type)
621 raise ResMgrNoResourcesAvailable("Resource with name %s of type network is already used" %(resource_id))
622 resource = self._resource_class(resource_id, resource_type, request)
623 self._all_resources[resource_id] = resource
624 self._allocated_resources[resource_id] = resource
625 self._log.info("Successfully allocated virtual-network resource from CAL with resource-id: %s", resource_id)
626 return resource
627
628 @asyncio.coroutine
629 def release_cal_resource(self, resource):
630 if resource.resource_type == 'dynamic':
631 self._log.debug("Deleting virtual network with network_id: %s", resource.resource_id)
632 yield from self._cal.delete_virtual_network(resource.resource_id)
633 self._all_resources.pop(resource.resource_id)
634 self._log.info("Successfully released virtual-network resource in CAL with resource-id: %s", resource.resource_id)
635 elif resource.resource_type == 'precreated':
636 self._all_resources.pop(resource.resource_id)
637 self._log.info("Successfully removed precreated virtual-network resource from allocated list: %s", resource.resource_id)
638 else:
639 self._log.info("Successfully released virtual-network resource with resource-id: %s into available-list", resource.resource_id)
640 self._free_resources.append(resource)
641
642 @asyncio.coroutine
643 def get_resource_info(self, resource):
644 info = yield from self._cal.get_virtual_network_info(resource.resource_id)
645 self._log.info("Successfully retrieved virtual-network information from CAL with resource-id: %s. Info: %s",
646 resource.resource_id, str(info))
647 response = RwResourceMgrYang.VirtualLinkEventData_ResourceInfo()
648 response.from_dict(info.as_dict())
649 response.pool_name = self.name
650 response.resource_state = 'active'
651 return response
652
653 @asyncio.coroutine
654 def get_info_by_id(self, resource_id):
655 info = yield from self._cal.get_virtual_network_info(resource_id)
656 self._log.info("Successfully retrieved virtual-network information from CAL with resource-id: %s. Info: %s",
657 resource_id, str(info))
658 return info
659
660 def match_image_params(self, resource_info, request_params):
661 return True
662
663 def match_epa_params(self, resource_info, request_params):
664 if not hasattr(request_params, 'provider_network'):
665 ### Its a match if nothing is requested
666 return True
667 else:
668 required = getattr(request_params, 'provider_network')
669
670 if not hasattr(resource_info, 'provider_network'):
671 ### Its no match
672 return False
673 else:
674 available = getattr(resource_info, 'provider_network')
675
676 self._log.debug("Matching Network EPA params. Required: %s, Available: %s", required, available)
677
678 if required.has_field('name') and required.name!= available.name:
679 self._log.debug("Provider Network mismatch. Required: %s, Available: %s",
680 required.name,
681 available.name)
682 return False
683
684 self._log.debug("Matching EPA params physical network name")
685
686 if required.has_field('physical_network') and required.physical_network != available.physical_network:
687 self._log.debug("Physical Network mismatch. Required: %s, Available: %s",
688 required.physical_network,
689 available.physical_network)
690 return False
691
692 self._log.debug("Matching EPA params overlay type")
693 if required.has_field('overlay_type') and required.overlay_type != available.overlay_type:
694 self._log.debug("Overlay type mismatch. Required: %s, Available: %s",
695 required.overlay_type,
696 available.overlay_type)
697 return False
698
699 self._log.debug("Matching EPA params SegmentationID")
700 if required.has_field('segmentation_id') and required.segmentation_id != available.segmentation_id:
701 self._log.debug("Segmentation-Id mismatch. Required: %s, Available: %s",
702 required.segmentation_id,
703 available.segmentation_id)
704 return False
705 return True
706
707 @asyncio.coroutine
708 def initialize_resource_in_cal(self, resource, request):
709 pass
710
711 @asyncio.coroutine
712 def uninitialize_resource_in_cal(self, resource):
713 pass
714
715
716 class ComputePool(ResourcePool):
717 def __init__(self, log, loop, pool_info, cal):
718 super(ComputePool, self).__init__(log, loop, pool_info, ComputeResource, cal)
719
720 @asyncio.coroutine
721 def allocate_resource_in_cal(self, request):
722 resource = None
723 if self.pool_type == 'static':
724 self._log.info("Attempting compute resource allocation from static pool: %s", self.name)
725 ### Attempt resource allocation from static pool
726 resource = yield from self._allocate_static_resource(request, 'compute')
727 elif self.pool_type == 'dynamic':
728 ### Attempt resource allocation from dynamic pool
729 self._log.info("Attempting compute resource allocation from dynamic pool: %s", self.name)
730 if len(self._free_resources) != 0:
731 self._log.info("Dynamic pool: %s has %d static resources, Attempting resource allocation from static resources",
732 len(self._free_resources),
733 self.name)
734 resource = yield from self._allocate_static_resource(request, 'compute')
735 if resource is None:
736 self._log.info("Attempting for dynamic resource allocation")
737 resource = yield from self.allocate_dynamic_resource(request)
738 if resource is None:
739 raise ResMgrNoResourcesAvailable("No matching resource available for allocation from pool: %s" %(self.name))
740
741 requested_params = RwcalYang.VDUInitParams()
742 requested_params.from_dict(request.as_dict())
743 resource.requested_params = requested_params
744 return resource
745
746 @asyncio.coroutine
747 def allocate_dynamic_resource(self, request):
748 #request.flavor_id = yield from self.select_resource_flavor(request)
749 resource_id = yield from self._cal.create_virtual_compute(request)
750 resource = self._resource_class(resource_id, 'dynamic', request)
751 self._all_resources[resource_id] = resource
752 self._allocated_resources[resource_id] = resource
753 self._log.info("Successfully allocated virtual-compute resource from CAL with resource-id: %s", resource_id)
754 return resource
755
756 @asyncio.coroutine
757 def release_cal_resource(self, resource):
758 if hasattr(resource, 'requested_params'):
759 delattr(resource, 'requested_params')
760 if resource.resource_type == 'dynamic':
761 yield from self._cal.delete_virtual_compute(resource.resource_id)
762 self._all_resources.pop(resource.resource_id)
763 self._log.info("Successfully released virtual-compute resource in CAL with resource-id: %s", resource.resource_id)
764 else:
765 self._log.info("Successfully released virtual-compute resource with resource-id: %s into available-list", resource.resource_id)
766 self._free_resources.append(resource)
767
768 @asyncio.coroutine
769 def get_resource_info(self, resource):
770 info = yield from self._cal.get_virtual_compute_info(resource.resource_id)
771
772 self._log.info("Successfully retrieved virtual-compute information from CAL with resource-id: %s. Info: %s",
773 resource.resource_id, str(info))
774 response = RwResourceMgrYang.VDUEventData_ResourceInfo()
775 response.from_dict(info.as_dict())
776 response.pool_name = self.name
777 response.resource_state = self._get_resource_state(info, resource.requested_params)
778 return response
779
780 @asyncio.coroutine
781 def get_info_by_id(self, resource_id):
782 info = yield from self._cal.get_virtual_compute_info(resource_id)
783 self._log.info("Successfully retrieved virtual-compute information from CAL with resource-id: %s. Info: %s",
784 resource_id, str(info))
785 return info
786
787 def _get_resource_state(self, resource_info, requested_params):
788
789
790 def conn_pts_len_equal():
791 # if explicit mgmt network is defined then the allocated ports might
792 # one more than the expected.
793 allocated_ports = len(resource_info.connection_points)
794 requested_ports = len(requested_params.connection_points)
795
796 if not requested_params.mgmt_network:
797 allocated_ports -= 1
798
799 return allocated_ports == requested_ports
800
801 if resource_info.state == 'failed':
802 self._log.error("<Compute-Resource: %s> Reached failed state.",
803 resource_info.name)
804 return 'failed'
805
806 if resource_info.state != 'active':
807 self._log.info("<Compute-Resource: %s> Not reached active state.",
808 resource_info.name)
809 return 'pending'
810
811 if not resource_info.has_field('management_ip') or resource_info.management_ip == '':
812 self._log.info("<Compute-Resource: %s> Management IP not assigned.",
813 resource_info.name)
814 return 'pending'
815
816 if (requested_params.has_field('allocate_public_address')) and (requested_params.allocate_public_address == True):
817 if not resource_info.has_field('public_ip'):
818 self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for public ip, %s",
819 resource_info.name, requested_params)
820 return 'pending'
821
822 if not conn_pts_len_equal():
823 self._log.warning("<Compute-Resource: %s> Waiting for requested number of ports to be assigned to virtual-compute, requested: %d, assigned: %d",
824 resource_info.name,
825 len(requested_params.connection_points),
826 len(resource_info.connection_points))
827 return 'pending'
828
829 #not_active = [c for c in resource_info.connection_points
830 # if c.state != 'active']
831
832 #if not_active:
833 # self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for connection_points , %s",
834 # resource_info.name, resource_info)
835 # return 'pending'
836
837 ## Find the connection_points which are in active state but does not have IP address
838 no_address = [c for c in resource_info.connection_points
839 if (c.state == 'active') and (not c.has_field('ip_address'))]
840
841 if no_address:
842 self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for connection_points , %s",
843 resource_info.name, resource_info)
844 return 'pending'
845
846 return 'active'
847
848 @asyncio.coroutine
849 def select_resource_flavor(self, request):
850 flavors = yield from self._cal.get_compute_flavor_info_list()
851 self._log.debug("Received %d flavor information from RW.CAL", len(flavors))
852 flavor_id = None
853 match_found = False
854 for flv in flavors:
855 self._log.info("Attempting to match compute requirement for VDU: %s with flavor %s",
856 request.name, flv)
857 if self.match_epa_params(flv, request):
858 self._log.info("Flavor match found for compute requirements for VDU: %s with flavor name: %s, flavor-id: %s",
859 request.name, flv.name, flv.id)
860 match_found = True
861 flavor_id = flv.id
862 break
863
864 if not match_found:
865 ### Check if CAL account allows dynamic flavor creation
866 if self._cal.dynamic_flavor_supported():
867 self._log.info("Attempting to create a new flavor for required compute-requirement for VDU: %s", request.name)
868 flavor_id = yield from self._cal.create_compute_flavor(request)
869 else:
870 ### No match with existing flavors and CAL does not support dynamic flavor creation
871 self._log.error("Unable to create flavor for compute requirement for VDU: %s. VDU instantiation failed", request.name)
872 raise ResMgrNoResourcesAvailable("No resource available with matching EPA attributes")
873 else:
874 ### Found flavor
875 self._log.info("Found flavor with id: %s for compute requirement for VDU: %s",
876 flavor_id, request.name)
877 return flavor_id
878
879 def _match_vm_flavor(self, required, available):
880 self._log.info("Matching VM Flavor attributes")
881 if available.vcpu_count != required.vcpu_count:
882 self._log.debug("VCPU requirement mismatch. Required: %d, Available: %d",
883 required.vcpu_count,
884 available.vcpu_count)
885 return False
886 if available.memory_mb != required.memory_mb:
887 self._log.debug("Memory requirement mismatch. Required: %d MB, Available: %d MB",
888 required.memory_mb,
889 available.memory_mb)
890 return False
891 if available.storage_gb != required.storage_gb:
892 self._log.debug("Storage requirement mismatch. Required: %d GB, Available: %d GB",
893 required.storage_gb,
894 available.storage_gb)
895 return False
896 self._log.debug("VM Flavor match found")
897 return True
898
899 def _match_guest_epa(self, required, available):
900 self._log.info("Matching Guest EPA attributes")
901 if required.has_field('pcie_device'):
902 self._log.debug("Matching pcie_device")
903 if available.has_field('pcie_device') == False:
904 self._log.debug("Matching pcie_device failed. Not available in flavor")
905 return False
906 else:
907 for dev in required.pcie_device:
908 if not [ d for d in available.pcie_device
909 if ((d.device_id == dev.device_id) and (d.count == dev.count)) ]:
910 self._log.debug("Matching pcie_device failed. Required: %s, Available: %s", required.pcie_device, available.pcie_device)
911 return False
912 elif available.has_field('pcie_device'):
913 self._log.debug("Rejecting available flavor because pcie_device not required but available")
914 return False
915
916
917 if required.has_field('mempage_size'):
918 self._log.debug("Matching mempage_size")
919 if available.has_field('mempage_size') == False:
920 self._log.debug("Matching mempage_size failed. Not available in flavor")
921 return False
922 else:
923 if required.mempage_size != available.mempage_size:
924 self._log.debug("Matching mempage_size failed. Required: %s, Available: %s", required.mempage_size, available.mempage_size)
925 return False
926 elif available.has_field('mempage_size'):
927 self._log.debug("Rejecting available flavor because mempage_size not required but available")
928 return False
929
930 if required.has_field('cpu_pinning_policy'):
931 self._log.debug("Matching cpu_pinning_policy")
932 if required.cpu_pinning_policy != 'ANY':
933 if available.has_field('cpu_pinning_policy') == False:
934 self._log.debug("Matching cpu_pinning_policy failed. Not available in flavor")
935 return False
936 else:
937 if required.cpu_pinning_policy != available.cpu_pinning_policy:
938 self._log.debug("Matching cpu_pinning_policy failed. Required: %s, Available: %s", required.cpu_pinning_policy, available.cpu_pinning_policy)
939 return False
940 elif available.has_field('cpu_pinning_policy'):
941 self._log.debug("Rejecting available flavor because cpu_pinning_policy not required but available")
942 return False
943
944 if required.has_field('cpu_thread_pinning_policy'):
945 self._log.debug("Matching cpu_thread_pinning_policy")
946 if available.has_field('cpu_thread_pinning_policy') == False:
947 self._log.debug("Matching cpu_thread_pinning_policy failed. Not available in flavor")
948 return False
949 else:
950 if required.cpu_thread_pinning_policy != available.cpu_thread_pinning_policy:
951 self._log.debug("Matching cpu_thread_pinning_policy failed. Required: %s, Available: %s", required.cpu_thread_pinning_policy, available.cpu_thread_pinning_policy)
952 return False
953 elif available.has_field('cpu_thread_pinning_policy'):
954 self._log.debug("Rejecting available flavor because cpu_thread_pinning_policy not required but available")
955 return False
956
957 if required.has_field('trusted_execution'):
958 self._log.debug("Matching trusted_execution")
959 if required.trusted_execution == True:
960 if available.has_field('trusted_execution') == False:
961 self._log.debug("Matching trusted_execution failed. Not available in flavor")
962 return False
963 else:
964 if required.trusted_execution != available.trusted_execution:
965 self._log.debug("Matching trusted_execution failed. Required: %s, Available: %s", required.trusted_execution, available.trusted_execution)
966 return False
967 elif available.has_field('trusted_execution'):
968 self._log.debug("Rejecting available flavor because trusted_execution not required but available")
969 return False
970
971 if required.has_field('numa_node_policy'):
972 self._log.debug("Matching numa_node_policy")
973 if available.has_field('numa_node_policy') == False:
974 self._log.debug("Matching numa_node_policy failed. Not available in flavor")
975 return False
976 else:
977 if required.numa_node_policy.has_field('node_cnt'):
978 self._log.debug("Matching numa_node_policy node_cnt")
979 if available.numa_node_policy.has_field('node_cnt') == False:
980 self._log.debug("Matching numa_node_policy node_cnt failed. Not available in flavor")
981 return False
982 else:
983 if required.numa_node_policy.node_cnt != available.numa_node_policy.node_cnt:
984 self._log.debug("Matching numa_node_policy node_cnt failed. Required: %s, Available: %s",required.numa_node_policy.node_cnt, available.numa_node_policy.node_cnt)
985 return False
986 elif available.numa_node_policy.has_field('node_cnt'):
987 self._log.debug("Rejecting available flavor because numa node count not required but available")
988 return False
989
990 if required.numa_node_policy.has_field('mem_policy'):
991 self._log.debug("Matching numa_node_policy mem_policy")
992 if available.numa_node_policy.has_field('mem_policy') == False:
993 self._log.debug("Matching numa_node_policy mem_policy failed. Not available in flavor")
994 return False
995 else:
996 if required.numa_node_policy.mem_policy != available.numa_node_policy.mem_policy:
997 self._log.debug("Matching numa_node_policy mem_policy failed. Required: %s, Available: %s", required.numa_node_policy.mem_policy, available.numa_node_policy.mem_policy)
998 return False
999 elif available.numa_node_policy.has_field('mem_policy'):
1000 self._log.debug("Rejecting available flavor because num node mem_policy not required but available")
1001 return False
1002
1003 if required.numa_node_policy.has_field('node'):
1004 self._log.debug("Matching numa_node_policy nodes configuration")
1005 if available.numa_node_policy.has_field('node') == False:
1006 self._log.debug("Matching numa_node_policy nodes configuration failed. Not available in flavor")
1007 return False
1008 for required_node in required.numa_node_policy.node:
1009 self._log.debug("Matching numa_node_policy nodes configuration for node %s", required_node)
1010 numa_match = False
1011 for available_node in available.numa_node_policy.node:
1012 if required_node.id != available_node.id:
1013 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1014 continue
1015 if required_node.vcpu != available_node.vcpu:
1016 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1017 continue
1018 if required_node.memory_mb != available_node.memory_mb:
1019 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1020 continue
1021 numa_match = True
1022 if numa_match == False:
1023 return False
1024 elif available.numa_node_policy.has_field('node'):
1025 self._log.debug("Rejecting available flavor because numa nodes not required but available")
1026 return False
1027 elif available.has_field('numa_node_policy'):
1028 self._log.debug("Rejecting available flavor because numa_node_policy not required but available")
1029 return False
1030 self._log.info("Successful match for Guest EPA attributes")
1031 return True
1032
1033 def _match_vswitch_epa(self, required, available):
1034 self._log.debug("VSwitch EPA match found")
1035 return True
1036
1037 def _match_hypervisor_epa(self, required, available):
1038 self._log.debug("Hypervisor EPA match found")
1039 return True
1040
1041 def _match_host_epa(self, required, available):
1042 self._log.info("Matching Host EPA attributes")
1043 if required.has_field('cpu_model'):
1044 self._log.debug("Matching CPU model")
1045 if available.has_field('cpu_model') == False:
1046 self._log.debug("Matching CPU model failed. Not available in flavor")
1047 return False
1048 else:
1049 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1050 if required.cpu_model.replace('PREFER', 'REQUIRE') != available.cpu_model:
1051 self._log.debug("Matching CPU model failed. Required: %s, Available: %s", required.cpu_model, available.cpu_model)
1052 return False
1053 elif available.has_field('cpu_model'):
1054 self._log.debug("Rejecting available flavor because cpu_model not required but available")
1055 return False
1056
1057 if required.has_field('cpu_arch'):
1058 self._log.debug("Matching CPU architecture")
1059 if available.has_field('cpu_arch') == False:
1060 self._log.debug("Matching CPU architecture failed. Not available in flavor")
1061 return False
1062 else:
1063 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1064 if required.cpu_arch.replace('PREFER', 'REQUIRE') != available.cpu_arch:
1065 self._log.debug("Matching CPU architecture failed. Required: %s, Available: %s", required.cpu_arch, available.cpu_arch)
1066 return False
1067 elif available.has_field('cpu_arch'):
1068 self._log.debug("Rejecting available flavor because cpu_arch not required but available")
1069 return False
1070
1071 if required.has_field('cpu_vendor'):
1072 self._log.debug("Matching CPU vendor")
1073 if available.has_field('cpu_vendor') == False:
1074 self._log.debug("Matching CPU vendor failed. Not available in flavor")
1075 return False
1076 else:
1077 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1078 if required.cpu_vendor.replace('PREFER', 'REQUIRE') != available.cpu_vendor:
1079 self._log.debug("Matching CPU vendor failed. Required: %s, Available: %s", required.cpu_vendor, available.cpu_vendor)
1080 return False
1081 elif available.has_field('cpu_vendor'):
1082 self._log.debug("Rejecting available flavor because cpu_vendor not required but available")
1083 return False
1084
1085 if required.has_field('cpu_socket_count'):
1086 self._log.debug("Matching CPU socket count")
1087 if available.has_field('cpu_socket_count') == False:
1088 self._log.debug("Matching CPU socket count failed. Not available in flavor")
1089 return False
1090 else:
1091 if required.cpu_socket_count != available.cpu_socket_count:
1092 self._log.debug("Matching CPU socket count failed. Required: %s, Available: %s", required.cpu_socket_count, available.cpu_socket_count)
1093 return False
1094 elif available.has_field('cpu_socket_count'):
1095 self._log.debug("Rejecting available flavor because cpu_socket_count not required but available")
1096 return False
1097
1098 if required.has_field('cpu_core_count'):
1099 self._log.debug("Matching CPU core count")
1100 if available.has_field('cpu_core_count') == False:
1101 self._log.debug("Matching CPU core count failed. Not available in flavor")
1102 return False
1103 else:
1104 if required.cpu_core_count != available.cpu_core_count:
1105 self._log.debug("Matching CPU core count failed. Required: %s, Available: %s", required.cpu_core_count, available.cpu_core_count)
1106 return False
1107 elif available.has_field('cpu_core_count'):
1108 self._log.debug("Rejecting available flavor because cpu_core_count not required but available")
1109 return False
1110
1111 if required.has_field('cpu_core_thread_count'):
1112 self._log.debug("Matching CPU core thread count")
1113 if available.has_field('cpu_core_thread_count') == False:
1114 self._log.debug("Matching CPU core thread count failed. Not available in flavor")
1115 return False
1116 else:
1117 if required.cpu_core_thread_count != available.cpu_core_thread_count:
1118 self._log.debug("Matching CPU core thread count failed. Required: %s, Available: %s", required.cpu_core_thread_count, available.cpu_core_thread_count)
1119 return False
1120 elif available.has_field('cpu_core_thread_count'):
1121 self._log.debug("Rejecting available flavor because cpu_core_thread_count not required but available")
1122 return False
1123
1124 if required.has_field('cpu_feature'):
1125 self._log.debug("Matching CPU feature list")
1126 if available.has_field('cpu_feature') == False:
1127 self._log.debug("Matching CPU feature list failed. Not available in flavor")
1128 return False
1129 else:
1130 for feature in required.cpu_feature:
1131 if feature not in available.cpu_feature:
1132 self._log.debug("Matching CPU feature list failed. Required feature: %s is not present. Available features: %s", feature, available.cpu_feature)
1133 return False
1134 elif available.has_field('cpu_feature'):
1135 self._log.debug("Rejecting available flavor because cpu_feature not required but available")
1136 return False
1137 self._log.info("Successful match for Host EPA attributes")
1138 return True
1139
1140
1141 def _match_placement_group_inputs(self, required, available):
1142 self._log.info("Matching Host aggregate attributes")
1143
1144 if not required and not available:
1145 # Host aggregate not required and not available => success
1146 self._log.info("Successful match for Host Aggregate attributes")
1147 return True
1148 if required and available:
1149 # Host aggregate requested and available => Do a match and decide
1150 xx = [ x.as_dict() for x in required ]
1151 yy = [ y.as_dict() for y in available ]
1152 for i in xx:
1153 if i not in yy:
1154 self._log.debug("Rejecting available flavor because host Aggregate mismatch. Required: %s, Available: %s ", required, available)
1155 return False
1156 self._log.info("Successful match for Host Aggregate attributes")
1157 return True
1158 else:
1159 # Either of following conditions => Failure
1160 # - Host aggregate required but not available
1161 # - Host aggregate not required but available
1162 self._log.debug("Rejecting available flavor because host Aggregate mismatch. Required: %s, Available: %s ", required, available)
1163 return False
1164
1165
1166 def match_image_params(self, resource_info, request_params):
1167 return True
1168
1169 def match_epa_params(self, resource_info, request_params):
1170 result = self._match_vm_flavor(getattr(request_params, 'vm_flavor'),
1171 getattr(resource_info, 'vm_flavor'))
1172 if result == False:
1173 self._log.debug("VM Flavor mismatched")
1174 return False
1175
1176 result = self._match_guest_epa(getattr(request_params, 'guest_epa'),
1177 getattr(resource_info, 'guest_epa'))
1178 if result == False:
1179 self._log.debug("Guest EPA mismatched")
1180 return False
1181
1182 result = self._match_vswitch_epa(getattr(request_params, 'vswitch_epa'),
1183 getattr(resource_info, 'vswitch_epa'))
1184 if result == False:
1185 self._log.debug("Vswitch EPA mismatched")
1186 return False
1187
1188 result = self._match_hypervisor_epa(getattr(request_params, 'hypervisor_epa'),
1189 getattr(resource_info, 'hypervisor_epa'))
1190 if result == False:
1191 self._log.debug("Hypervisor EPA mismatched")
1192 return False
1193
1194 result = self._match_host_epa(getattr(request_params, 'host_epa'),
1195 getattr(resource_info, 'host_epa'))
1196 if result == False:
1197 self._log.debug("Host EPA mismatched")
1198 return False
1199
1200 result = self._match_placement_group_inputs(getattr(request_params, 'host_aggregate'),
1201 getattr(resource_info, 'host_aggregate'))
1202
1203 if result == False:
1204 self._log.debug("Host Aggregate mismatched")
1205 return False
1206
1207 return True
1208
1209 @asyncio.coroutine
1210 def initialize_resource_in_cal(self, resource, request):
1211 self._log.info("Initializing the compute-resource with id: %s in RW.CAL", resource.resource_id)
1212 modify_params = RwcalYang.VDUModifyParams()
1213 modify_params.vdu_id = resource.resource_id
1214 modify_params.image_id = request.image_id
1215
1216 for c_point in request.connection_points:
1217 self._log.debug("Adding connection point for VDU: %s to virtual-compute with id: %s Connection point Name: %s",
1218 request.name,resource.resource_id,c_point.name)
1219 point = modify_params.connection_points_add.add()
1220 point.name = c_point.name
1221 point.virtual_link_id = c_point.virtual_link_id
1222 yield from self._cal.modify_virtual_compute(modify_params)
1223
1224 @asyncio.coroutine
1225 def uninitialize_resource_in_cal(self, resource):
1226 self._log.info("Un-initializing the compute-resource with id: %s in RW.CAL", resource.resource_id)
1227 modify_params = RwcalYang.VDUModifyParams()
1228 modify_params.vdu_id = resource.resource_id
1229 resource_info = yield from self.get_resource_info(resource)
1230 for c_point in resource_info.connection_points:
1231 self._log.debug("Removing connection point: %s from VDU: %s ",
1232 c_point.name,resource_info.name)
1233 point = modify_params.connection_points_remove.add()
1234 point.connection_point_id = c_point.connection_point_id
1235 yield from self._cal.modify_virtual_compute(modify_params)
1236
1237
1238 class ResourceMgrCore(object):
1239 def __init__(self, dts, log, log_hdl, loop, parent):
1240 self._log = log
1241 self._log_hdl = log_hdl
1242 self._dts = dts
1243 self._loop = loop
1244 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
1245 self._parent = parent
1246 self._cloud_cals = {}
1247 # Dictionary of pool objects keyed by name
1248 self._cloud_pool_table = {}
1249 # Dictionary of tuples (resource_id, cloud_account_name, pool_name) keyed by event_id
1250 self._resource_table = {}
1251 self._pool_class = {'compute': ComputePool,
1252 'network': NetworkPool}
1253
1254 def _get_cloud_pool_table(self, cloud_account_name):
1255 if cloud_account_name not in self._cloud_pool_table:
1256 msg = "Cloud account %s not found" % cloud_account_name
1257 self._log.error(msg)
1258 raise ResMgrCloudAccountNotFound(msg)
1259
1260 return self._cloud_pool_table[cloud_account_name]
1261
1262 def _get_cloud_cal_plugin(self, cloud_account_name):
1263 if cloud_account_name not in self._cloud_cals:
1264 msg = "Cloud account %s not found" % cloud_account_name
1265 self._log.error(msg)
1266 raise ResMgrCloudAccountNotFound(msg)
1267
1268 return self._cloud_cals[cloud_account_name]
1269
1270 def _add_default_cloud_pools(self, cloud_account_name):
1271 self._log.debug("Adding default compute and network pools for cloud account %s",
1272 cloud_account_name)
1273 default_pools = [
1274 {
1275 'name': '____default_compute_pool',
1276 'resource_type': 'compute',
1277 'pool_type': 'dynamic',
1278 'max_size': 128,
1279 },
1280 {
1281 'name': '____default_network_pool',
1282 'resource_type': 'network',
1283 'pool_type': 'dynamic',
1284 'max_size': 128,
1285 },
1286 ]
1287
1288 for pool_dict in default_pools:
1289 pool_info = ResourcePoolInfo.from_dict(pool_dict)
1290 self._log.info("Applying configuration for cloud account %s pool: %s",
1291 cloud_account_name, pool_info.name)
1292
1293 self.add_resource_pool(cloud_account_name, pool_info)
1294 self.unlock_resource_pool(cloud_account_name, pool_info.name)
1295
1296 def get_cloud_account_names(self):
1297 """ Returns a list of configured cloud account names """
1298 return self._cloud_cals.keys()
1299
1300 def add_cloud_account(self, account):
1301 self._log.debug("Received CAL account. Account Name: %s, Account Type: %s",
1302 account.name, account.account_type)
1303
1304 ### Add cal handler to all the pools
1305 if account.name in self._cloud_cals:
1306 raise ResMgrCloudAccountExists("Cloud account already exists in res mgr: %s",
1307 account.name)
1308
1309 self._cloud_pool_table[account.name] = {}
1310
1311 cal = ResourceMgrCALHandler(self._loop, self._executor, self._log, self._log_hdl, account)
1312 self._cloud_cals[account.name] = cal
1313
1314 self._add_default_cloud_pools(account.name)
1315
1316 def update_cloud_account(self, account):
1317 raise NotImplementedError("Update cloud account not implemented")
1318
1319 def delete_cloud_account(self, account_name, dry_run=False):
1320 cloud_pool_table = self._get_cloud_pool_table(account_name)
1321 for pool in cloud_pool_table.values():
1322 if pool.in_use():
1323 raise ResMgrCloudAccountInUse("Cannot delete cloud which is currently in use")
1324
1325 # If dry_run is specified, do not actually delete the cloud account
1326 if dry_run:
1327 return
1328
1329 for pool in list(cloud_pool_table):
1330 self.delete_resource_pool(account_name, pool)
1331
1332 del self._cloud_pool_table[account_name]
1333 del self._cloud_cals[account_name]
1334
1335 def add_resource_pool(self, cloud_account_name, pool_info):
1336 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1337 if pool_info.name in cloud_pool_table:
1338 raise ResMgrDuplicatePool("Pool with name: %s already exists", pool_info.name)
1339
1340 cloud_cal = self._get_cloud_cal_plugin(cloud_account_name)
1341 pool = self._pool_class[pool_info.resource_type](self._log, self._loop, pool_info, cloud_cal)
1342
1343 cloud_pool_table[pool_info.name] = pool
1344
1345 def delete_resource_pool(self, cloud_account_name, pool_name):
1346 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1347 if pool_name not in cloud_pool_table:
1348 self._log.error("Pool: %s not found for deletion", pool_name)
1349 return
1350 pool = cloud_pool_table[pool_name]
1351
1352 if pool.in_use():
1353 # Can't delete a pool in use
1354 self._log.error("Pool: %s in use. Can not delete in-use pool", pool.name)
1355 return
1356
1357 pool.cleanup()
1358 del cloud_pool_table[pool_name]
1359 self._log.info("Resource Pool: %s successfully deleted", pool_name)
1360
1361 def modify_resource_pool(self, cloud_account_name, pool):
1362 pass
1363
1364 def lock_resource_pool(self, cloud_account_name, pool_name):
1365 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1366 if pool_name not in cloud_pool_table:
1367 self._log.info("Pool: %s is not available for lock operation")
1368 return
1369
1370 pool = cloud_pool_table[pool_name]
1371 pool.lock_pool()
1372
1373 def unlock_resource_pool(self, cloud_account_name, pool_name):
1374 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1375 if pool_name not in cloud_pool_table:
1376 self._log.info("Pool: %s is not available for unlock operation")
1377 return
1378
1379 pool = cloud_pool_table[pool_name]
1380 pool.unlock_pool()
1381
1382 def get_resource_pool_info(self, cloud_account_name, pool_name):
1383 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1384 if pool_name in cloud_pool_table:
1385 pool = cloud_pool_table[pool_name]
1386 return pool.get_pool_info()
1387 else:
1388 return None
1389
1390 def get_resource_pool_list(self, cloud_account_name):
1391 return [v for _, v in self._get_cloud_pool_table(cloud_account_name).items()]
1392
1393 def _select_resource_pools(self, cloud_account_name, resource_type):
1394 pools = [pool for pool in self.get_resource_pool_list(cloud_account_name) if pool.resource_type == resource_type and pool.status == 'unlocked']
1395 if not pools:
1396 raise ResMgrPoolNotAvailable("No %s pool found for resource allocation", resource_type)
1397
1398 return pools[0]
1399
1400 @asyncio.coroutine
1401 def allocate_virtual_resource(self, event_id, cloud_account_name, request, resource_type):
1402 ### Check if event_id is unique or already in use
1403 if event_id in self._resource_table:
1404 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1405 self._log.warning("Requested event-id :%s for resource-allocation already active with pool: %s",
1406 event_id, pool_name)
1407 # If resource-type matches then return the same resource
1408 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1409 pool = cloud_pool_table[pool_name]
1410 if pool.resource_type == resource_type:
1411
1412 info = yield from pool.read_resource_info(r_id)
1413 return info
1414 else:
1415 self._log.error("Event-id conflict. Duplicate event-id: %s", event_id)
1416 raise ResMgrDuplicateEventId("Requested event-id :%s already active with pool: %s" %(event_id, pool_name))
1417
1418 ### All-OK, lets go ahead with resource allocation
1419 pool = self._select_resource_pools(cloud_account_name, resource_type)
1420 self._log.info("Selected pool %s for resource allocation", pool.name)
1421
1422 r_id, r_info = yield from pool.allocate_resource(request)
1423
1424 self._resource_table[event_id] = (r_id, cloud_account_name, pool.name)
1425 return r_info
1426
1427 @asyncio.coroutine
1428 def reallocate_virtual_resource(self, event_id, cloud_account_name, request, resource_type, resource):
1429 ### Check if event_id is unique or already in use
1430 if event_id in self._resource_table:
1431 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1432 self._log.warning("Requested event-id :%s for resource-allocation already active with pool: %s",
1433 event_id, pool_name)
1434 # If resource-type matches then return the same resource
1435 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1436 pool = cloud_pool_table[pool_name]
1437 if pool.resource_type == resource_type:
1438 info = yield from pool.read_resource_info(r_id)
1439 return info
1440 else:
1441 self._log.error("Event-id conflict. Duplicate event-id: %s", event_id)
1442 raise ResMgrDuplicateEventId("Requested event-id :%s already active with pool: %s" %(event_id, pool_name))
1443
1444 r_info = None
1445 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1446 pool = cloud_pool_table[resource.pool_name]
1447 if pool.resource_type == resource_type:
1448 if resource_type == 'network':
1449 r_id = resource.virtual_link_id
1450 r_info = yield from pool.get_info_by_id(resource.virtual_link_id)
1451 elif resource_type == 'compute':
1452 r_id = resource.vdu_id
1453 r_info = yield from pool.get_info_by_id(resource.vdu_id)
1454
1455 if r_info is None:
1456 r_id, r_info = yield from pool.allocate_resource(request)
1457 self._resource_table[event_id] = (r_id, cloud_account_name, resource.pool_name)
1458 return r_info
1459
1460 self._resource_table[event_id] = (r_id, cloud_account_name, resource.pool_name)
1461 new_resource = pool._resource_class(r_id, 'dynamic', request)
1462 if resource_type == 'compute':
1463 requested_params = RwcalYang.VDUInitParams()
1464 requested_params.from_dict(request.as_dict())
1465 new_resource.requested_params = requested_params
1466 pool._all_resources[r_id] = new_resource
1467 pool._allocated_resources[r_id] = new_resource
1468 return r_info
1469
1470 @asyncio.coroutine
1471 def release_virtual_resource(self, event_id, resource_type):
1472 ### Check if event_id exists
1473 if event_id not in self._resource_table:
1474 self._log.error("Received resource-release-request with unknown Event-id :%s", event_id)
1475 raise ResMgrUnknownEventId("Received resource-release-request with unknown Event-id :%s" %(event_id))
1476
1477 ## All-OK, lets proceed with resource release
1478 r_id, cloud_account_name, pool_name = self._resource_table.pop(event_id)
1479 self._log.debug("Attempting to release virtual resource id %s from pool %s",
1480 r_id, pool_name)
1481
1482 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1483 pool = cloud_pool_table[pool_name]
1484 yield from pool.release_resource(r_id)
1485
1486 @asyncio.coroutine
1487 def read_virtual_resource(self, event_id, resource_type):
1488 ### Check if event_id exists
1489 if event_id not in self._resource_table:
1490 self._log.error("Received resource-read-request with unknown Event-id :%s", event_id)
1491 raise ResMgrUnknownEventId("Received resource-read-request with unknown Event-id :%s" %(event_id))
1492
1493 ## All-OK, lets proceed
1494 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1495 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1496 pool = cloud_pool_table[pool_name]
1497 info = yield from pool.read_resource_info(r_id)
1498 return info