3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 import concurrent
.futures
27 gi
.require_version('RwVnfrYang', '1.0')
28 gi
.require_version('RwMon', '1.0')
29 from gi
.repository
import (
class VdurMissingVimIdError(Exception):
    """Raised when a VDUR cannot be monitored because it has no VIM ID."""

    def __init__(self, vdur_id):
        """Build the error for the VDUR identified by ``vdur_id``."""
        # The message previously read "is has no VIM ID"; the stray "is" was
        # a typo.
        super().__init__("VDUR:{} has no VIM ID".format(vdur_id))
class VdurAlreadyRegisteredError(Exception):
    """Raised when a VDUR ID is added to the monitor a second time."""

    def __init__(self, vdur_id):
        """Build the error for the VDUR identified by ``vdur_id``."""
        message = "VDUR:{} is already registered".format(vdur_id)
        super().__init__(message)
class AccountInUseError(Exception):
    """Raised when a cloud account is removed while a VNFR still uses it."""
class UnknownAccountError(Exception):
    """Raised when a cloud account name is not known to the monitor."""
class AccountAlreadyRegisteredError(Exception):
    """Raised when a cloud account name is registered a second time.

    Attributes:
        account_name - the name of the account that was already registered

    """

    def __init__(self, account_name):
        """Build the error for the account called ``account_name``."""
        # The formatted message used to be built and then discarded: the bare
        # account name was handed to Exception instead of the message.
        msg = "'{}' already registered".format(account_name)
        super().__init__(msg)

        # Keep the raw name reachable for callers that want it.
        self.account_name = account_name
class PluginUnavailableError(Exception):
    """Raised when no usable NFVI plugin can be created for a cloud account."""
class PluginNotSupportedError(PluginUnavailableError):
    """Raised when a plugin name has no registered plugin factory."""
class AlarmCreateError(Exception):
    """Raised when the data source reports a failure while creating an alarm."""

    def __init__(self):
        """Build the error with its fixed message."""
        message = "failed to create alarm"
        super().__init__(message)
class AlarmDestroyError(Exception):
    """Raised when the data source reports a failure while destroying an alarm."""

    def __init__(self):
        """Build the error with its fixed message."""
        message = "failed to destroy alarm"
        super().__init__(message)
class PluginFactory(metaclass=abc.ABCMeta):
    """Abstract base class for factories that create NFVI monitoring plugins.

    A concrete factory sets PLUGIN_NAME to the name it is registered under
    and may set FALLBACKS to the names of the factories that should be tried,
    in order, when it cannot create a plugin.

    The class previously set ``__metaclass__ = abc.ABCMeta``, which is Python 2
    syntax and is ignored by Python 3; the metaclass is now really applied, so
    a subclass must implement create() before it can be instantiated.
    """

    # A factory that declares no fallbacks has none.
    FALLBACKS = ()

    @abc.abstractmethod
    def create(self, cloud_account, plugin_name=None):
        """Create a plugin for the given cloud account

        Arguments:
            cloud_account - the cloud account the plugin will be used with
            plugin_name   - optional; kept for backward compatibility, the
                            concrete factories in this file only take
                            cloud_account

        """

    @property
    def name(self):
        """The name this factory is registered under (PLUGIN_NAME)"""
        return self.__class__.PLUGIN_NAME

    @property
    def fallbacks(self):
        """A new list of the factory names to fall back on (FALLBACKS)"""
        # A copy is returned so callers can consume the list without touching
        # the class attribute.
        return list(self.__class__.FALLBACKS)
class MonascaPluginFactory(PluginFactory):
    """Factory registered as "monasca".

    create() always raises PluginUnavailableError, so a registration that asks
    for this plugin moves on to the "ceilometer" fallback.
    """

    PLUGIN_NAME = "monasca"
    FALLBACKS = ["ceilometer"]

    def create(self, cloud_account):
        """Always raises PluginUnavailableError"""
        raise PluginUnavailableError()
class CeilometerPluginFactory(PluginFactory):
    """Factory registered as "ceilometer"; falls back to "unavailable"."""

    PLUGIN_NAME = "ceilometer"
    FALLBACKS = ["unavailable"]

    def create(self, cloud_account):
        """Load the rwmon_ceilometer plugin and return its Monitoring interface

        Arguments:
            cloud_account - the cloud account the plugin will be used with

        Raises:
            A PluginUnavailableError is raised if the plugin reports that NFVI
            metrics are not available for the cloud account.

        """
        peas_plugin = rw_peas.PeasPlugin("rwmon_ceilometer", 'RwMon-1.0')
        monitoring = peas_plugin.get_interface("Monitoring")

        # Check that the plugin is available on the platform associated with
        # the cloud account
        _, is_available = monitoring.nfvi_metrics_available(cloud_account)
        if is_available:
            return monitoring

        raise PluginUnavailableError()
class UnavailablePluginFactory(PluginFactory):
    """Factory registered as "unavailable"; creates a stand-in plugin."""

    PLUGIN_NAME = "unavailable"

    class UnavailablePlugin(object):
        """Stand-in plugin handed out by UnavailablePluginFactory"""

        def nfvi_metrics_available(self, cloud_account):
            """Report on NFVI metrics availability for the cloud account"""
            # NOTE(review): the body of this method is not visible in the
            # listing. Callers unpack the result as `_, available`, so a pair
            # is returned; (None, False) is presumed -- confirm.
            return None, False

    def create(self, cloud_account):
        """Return a new UnavailablePlugin; the cloud account is not used"""
        plugin_class = UnavailablePluginFactory.UnavailablePlugin
        return plugin_class()
class MockPluginFactory(PluginFactory):
    """Factory for the rwmon_mock plugin; falls back to "unavailable"."""

    # NOTE(review): the PLUGIN_NAME line is not visible in the listing. "mock"
    # is presumed because Monitor.add_cloud_account registers accounts with
    # the plugin name "mock" -- confirm.
    PLUGIN_NAME = "mock"
    FALLBACKS = ["unavailable"]

    def create(self, cloud_account):
        """Load the rwmon_mock plugin and return its Monitoring interface

        Arguments:
            cloud_account - the cloud account the plugin will be used with

        Raises:
            A PluginUnavailableError is raised if the plugin reports that NFVI
            metrics are not available for the cloud account.

        """
        peas_plugin = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')
        monitoring = peas_plugin.get_interface("Monitoring")

        # Check that the plugin is available on the platform associated with
        # the cloud account
        _, is_available = monitoring.nfvi_metrics_available(cloud_account)
        if is_available:
            return monitoring

        raise PluginUnavailableError()
class NfviMetricsPluginManager(object):
    """Keeps track of which NFVI plugin serves which cloud account.

    Plugins are created through registered PluginFactory objects. When a
    factory cannot create a plugin, the factories named in its fallbacks are
    tried in turn.
    """

    def __init__(self, log):
        """Create the manager and register the built-in plugin factories

        Arguments:
            log - the logger used by this object

        """
        self._plugins = dict()
        self._log = log
        self._factories = dict()

        self.register_plugin_factory(MockPluginFactory())
        self.register_plugin_factory(CeilometerPluginFactory())
        self.register_plugin_factory(MonascaPluginFactory())
        self.register_plugin_factory(UnavailablePluginFactory())

    @property
    def log(self):
        """The logger used by this object"""
        return self._log

    def register_plugin_factory(self, factory):
        """Make a factory available under its name

        A factory registered later under the same name replaces the earlier
        one.
        """
        self._factories[factory.name] = factory

    def plugin(self, account_name):
        """Return the plugin registered for the named cloud account

        Raises:
            A KeyError is raised if no plugin is registered for the account.

        """
        return self._plugins[account_name]

    def register(self, cloud_account, plugin_name):
        """Create a plugin for a cloud account and remember it

        Arguments:
            cloud_account - the cloud account to create a plugin for
            plugin_name   - the name of the preferred plugin factory

        Raises:
            An AccountAlreadyRegisteredError is raised if the account already
            has a plugin. A PluginNotSupportedError is raised if plugin_name
            has no registered factory. A PluginUnavailableError is raised if
            neither the preferred factory nor any of its fallbacks can create
            a plugin.

        """
        # Check to see if the cloud account has already been registered
        if cloud_account.name in self._plugins:
            raise AccountAlreadyRegisteredError(cloud_account.name)

        if plugin_name not in self._factories:
            raise PluginNotSupportedError(plugin_name)

        # Create a plugin from one of the factories, walking the fallback
        # chain breadth-first.
        pending = collections.deque([plugin_name])
        tried = set()

        while pending:
            name = pending.popleft()

            # A cyclic fallback chain would otherwise never terminate.
            if name in tried:
                continue
            tried.add(name)

            # A fallback naming an unregistered factory used to surface as a
            # bare KeyError; skip it and keep trying the remaining candidates.
            factory = self._factories.get(name)
            if factory is None:
                self.log.warning("no plugin factory named {}".format(name))
                continue

            try:
                plugin = factory.create(cloud_account)
            except PluginUnavailableError:
                self.log.warning("plugin for {} unavailable".format(name))
                pending.extend(factory.fallbacks)
            else:
                self._plugins[cloud_account.name] = plugin
                return

        raise PluginUnavailableError()

    def unregister(self, account_name):
        """Forget the plugin of the named account; unknown names are ignored"""
        self._plugins.pop(account_name, None)
202 class NfviMetrics(object):
204 The NfviMetrics class contains the logic to retrieve NFVI metrics for a
205 particular VDUR. Of particular importance is that this object caches the
206 metrics until the data become stale so that it does not create excessive
207 load upon the underlying data-source.
210 # The sample interval defines the maximum time (secs) that metrics will be
211 # cached for. This duration should coincide with the sampling interval used
212 # by the underlying data-source to capture metrics.
215 # The maximum time (secs) an instance will wait for a request to the data
216 # source to be completed
219 def __init__(self
, log
, loop
, account
, plugin
, vdur
):
220 """Creates an instance of NfviMetrics
223 log, loop - the logger and the asyncio event loop used by this instance
224 account - a CloudAccount instance
225 plugin - an NFVI plugin
226 vdur - a VDUR instance
231 self
._account
= account
232 self
._plugin
= plugin
234 self
._metrics
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
236 self
._vim
_id
= vdur
.vim_id
237 self
._updating
= None
241 """The logger used by NfviMetrics"""
246 """The current asyncio loop"""
251 """The VDUR that these metrics are associated with"""
255 """Return the NFVI metrics for this VDUR
257 This function will immediately return the current, known NFVI metrics
258 for the associated VDUR. It will also, if the data are stale, schedule
259 a call to the data-source to retrieve new data.
262 if self
.should_update():
263 self
._updating
= self
.loop
.create_task(self
.update())
def should_update(self):
    """Return a boolean indicating whether an update should be performed

    An update is due when the cached sample is older than
    NfviMetrics.SAMPLE_INTERVAL seconds and no earlier update task is still
    running.
    """
    in_flight = not (self._updating is None or self._updating.done())
    stale = time.time() > self._timestamp + NfviMetrics.SAMPLE_INTERVAL

    return stale and not in_flight
276 """Update the NFVI metrics for the associated VDUR
278 This coroutine will request new metrics from the data-source and update
284 # Make the request to the plugin in a separate thread and do
285 # not exceed the timeout
286 _
, metrics
= yield from asyncio
.wait_for(
287 self
.loop
.run_in_executor(
289 self
._plugin
.nfvi_metrics
,
293 timeout
=NfviMetrics
.TIMEOUT
,
297 except asyncio
.TimeoutError
:
298 msg
= "timeout on request for nfvi metrics (vim-id = {})"
299 self
.log
.warning(msg
.format(self
._vim
_id
))
302 except Exception as e
:
303 self
.log
.exception(e
)
307 # Create uninitialized metric structure
308 vdu_metrics
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
311 vdu_metrics
.vcpu
.total
= self
.vdur
.vm_flavor
.vcpu_count
312 vdu_metrics
.vcpu
.utilization
= metrics
.vcpu
.utilization
315 vdu_metrics
.memory
.used
= metrics
.memory
.used
316 vdu_metrics
.memory
.total
= self
.vdur
.vm_flavor
.memory_mb
317 vdu_metrics
.memory
.utilization
= 100 * vdu_metrics
.memory
.used
/ vdu_metrics
.memory
.total
321 vdu_metrics
.storage
.used
= metrics
.storage
.used
322 if self
.vdur
.has_field('volumes'):
323 for volume
in self
.vdur
.volumes
:
324 if vdu_metrics
.storage
.total
is None:
325 vdu_metrics
.storage
.total
= 1e9
* volume
.size
327 vdu_metrics
.storage
.total
+= (1e9
* volume
.size
)
329 vdu_metrics
.storage
.total
= 1e9
* self
.vdur
.vm_flavor
.storage_gb
330 utilization
= 100 * vdu_metrics
.storage
.used
/ vdu_metrics
.storage
.total
331 if utilization
> 100:
333 vdu_metrics
.storage
.utilization
= utilization
334 except ZeroDivisionError:
335 vdu_metrics
.storage
.utilization
= 0
338 vdu_metrics
.network
.incoming
.packets
= metrics
.network
.incoming
.packets
339 vdu_metrics
.network
.incoming
.packet_rate
= metrics
.network
.incoming
.packet_rate
340 vdu_metrics
.network
.incoming
.bytes
= metrics
.network
.incoming
.bytes
341 vdu_metrics
.network
.incoming
.byte_rate
= metrics
.network
.incoming
.byte_rate
344 vdu_metrics
.network
.outgoing
.packets
= metrics
.network
.outgoing
.packets
345 vdu_metrics
.network
.outgoing
.packet_rate
= metrics
.network
.outgoing
.packet_rate
346 vdu_metrics
.network
.outgoing
.bytes
= metrics
.network
.outgoing
.bytes
347 vdu_metrics
.network
.outgoing
.byte_rate
= metrics
.network
.outgoing
.byte_rate
350 vdu_metrics
.external_ports
.total
= len(self
.vdur
.external_interface
)
353 vdu_metrics
.internal_ports
.total
= len(self
.vdur
.internal_interface
)
355 self
._metrics
= vdu_metrics
357 except Exception as e
:
358 self
.log
.exception(e
)
361 # Regardless of the result of the query, we want to make sure that
362 # we do not poll the data source until another sample duration has
364 self
._timestamp
= time
.time()
class NfviMetricsCache(object):
    """Holds one NfviMetrics object per VIM ID.

    The cache also keeps the VDUR ID to VIM ID mapping, and its inverse, for
    every entry it holds.
    """

    def __init__(self, log, loop, plugin_manager):
        """Create an empty cache

        Arguments:
            log            - the logger handed to each NfviMetrics object
            loop           - the event loop handed to each NfviMetrics object
            plugin_manager - used to look up the plugin of a cloud account

        """
        self._log = log
        self._loop = loop
        self._plugin_manager = plugin_manager

        # VIM ID -> NfviMetrics
        self._nfvi_metrics = {}

        # VIM ID -> VDUR ID, and the inverse
        self._vim_to_vdur = {}
        self._vdur_to_vim = {}

    def create_entry(self, account, vdur):
        """Create the NfviMetrics object for a VDUR and index it by VIM ID"""
        vim_id = vdur.vim_id
        plugin = self._plugin_manager.plugin(account.name)

        self._nfvi_metrics[vim_id] = NfviMetrics(
                self._log,
                self._loop,
                account,
                plugin,
                vdur,
                )

        self._vim_to_vdur[vim_id] = vdur.id
        self._vdur_to_vim[vdur.id] = vim_id

    def destroy_entry(self, vdur_id):
        """Drop the metrics and both ID mappings of a VDUR

        Raises:
            A KeyError is raised if the VDUR ID is not in the cache.

        """
        vim_id = self._vdur_to_vim[vdur_id]

        self._nfvi_metrics.pop(vim_id)
        self._vdur_to_vim.pop(vdur_id)
        self._vim_to_vdur.pop(vim_id)

    def retrieve(self, vim_id):
        """Return the result of retrieve() on the entry stored for a VIM ID"""
        entry = self._nfvi_metrics[vim_id]
        return entry.retrieve()

    def to_vim_id(self, vdur_id):
        """Return the VIM ID recorded for a VDUR ID"""
        return self._vdur_to_vim[vdur_id]

    def to_vdur_id(self, vim_id):
        """Return the VDUR ID recorded for a VIM ID"""
        return self._vim_to_vdur[vim_id]

    def contains_vdur_id(self, vdur_id):
        """Return True if the VDUR ID has an entry in the cache"""
        return vdur_id in self._vdur_to_vim

    def contains_vim_id(self, vim_id):
        """Return True if the VIM ID has an entry in the cache"""
        return vim_id in self._vim_to_vdur
408 class NfviInterface(object):
410 The NfviInterface serves as an interface for communicating with the
411 underlying infrastructure, i.e. retrieving metrics for VDURs that have been
412 registered with it and managing alarms.
414 The NfviInterface should only need to be invoked using a cloud account and
415 optionally a VIM ID; It should not need to handle mapping from VDUR ID to
419 def __init__(self
, loop
, log
, plugin_manager
, cache
):
420 """Creates an NfviInterface instance
425 plugin_manager - an instance of NfviMetricsPluginManager
426 cache - an instance of NfviMetricsCache
429 self
._executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=16)
430 self
._plugin
_manager
= plugin_manager
437 """The event loop used by this NfviInterface"""
442 """The event log used by this NfviInterface"""
447 """The list of metrics contained in this NfviInterface"""
448 return list(self
._cache
._nfvi
_metrics
.values())
def nfvi_metrics_available(self, account):
    """Ask the plugin of a cloud account whether NFVI metrics are available

    Arguments:
        account - a cloud account object; its name selects the plugin

    Returns:
        the second element of the pair returned by the plugin

    """
    plugin = self._plugin_manager.plugin(account.name)
    result = plugin.nfvi_metrics_available(account)

    # The plugin answers with a pair; only the availability element is used.
    _, available = result

    # NOTE(review): the return line is not visible in the listing; returning
    # the availability element is presumed from how Monitor uses this method.
    return available
def retrieve(self, vdur_id):
    """Returns the NFVI metrics for the specified VDUR

    Note, a VDUR must be registered with a NfviInterface before
    metrics can be retrieved for it.

    Arguments:
        vdur_id - the ID of the VDUR whose metrics should be retrieved

    Returns:
        whatever the cache's retrieve() returns for the VIM ID of the VDUR

    """
    vim_id = self._cache.to_vim_id(vdur_id)
    return self._cache.retrieve(vim_id)
471 def alarm_create(self
, account
, vim_id
, alarm
, timeout
=5):
472 """Create a new alarm
475 account - a CloudAccount instance
476 vim_id - the VM to associate with this alarm
477 alarm - an alarm structure
478 timeout - the request timeout (sec)
481 If the data source does not respond in a timely manner, an
482 asyncio.TimeoutError will be raised.
485 plugin
= self
._plugin
_manager
.plugin(account
.name
)
486 status
= yield from asyncio
.wait_for(
487 self
.loop
.run_in_executor(
489 plugin
.do_alarm_create
,
498 if status
== RwTypes
.RwStatus
.FAILURE
:
499 raise AlarmCreateError()
502 def alarm_destroy(self
, account
, alarm_id
, timeout
=5):
503 """Destroy an existing alarm
506 account - a CloudAccount instance
507 alarm_id - the identifier of the alarm to destroy
508 timeout - the request timeout (sec)
511 If the data source does not respond in a timely manner, an
512 asyncio.TimeoutError will be raised.
515 plugin
= self
._plugin
_manager
.plugin(account
.name
)
516 status
= yield from asyncio
.wait_for(
517 self
.loop
.run_in_executor(
519 plugin
.do_alarm_delete
,
527 if status
== RwTypes
.RwStatus
.FAILURE
:
528 raise AlarmDestroyError()
class InstanceConfiguration(object):
    """
    The InstanceConfiguration class represents configuration information that
    affects the behavior of the monitor. Essentially this class should contain
    no functional behavior but serve as a convenient way to share data amongst
    the components of the monitoring system.
    """

    def __init__(self):
        """Create a configuration with every setting unset (None)"""
        self.polling_period = None
        self.max_polling_frequency = None
        self.min_cache_lifetime = None

        # Read by Monitor.create_alarm: when it is not None, alarm webhooks
        # pointing at this address are added.
        self.public_ip = None
class Monitor(object):
    """
    The Monitor class is intended to act as a unifying interface for the
    different sub-systems that are used to monitor the NFVI.
    """

    def __init__(self, loop, log, config):
        """Create a Monitor object

        Arguments:
            loop   - the event loop used by this object
            log    - the logger used by this object
            config - an instance of InstanceConfiguration

        """
        self._loop = loop
        self._log = log

        self._cloud_accounts = dict()
        self._nfvi_plugins = NfviMetricsPluginManager(log)
        self._cache = NfviMetricsCache(log, loop, self._nfvi_plugins)
        self._nfvi_interface = NfviInterface(loop, log, self._nfvi_plugins, self._cache)
        self._config = config

        # VNFR ID -> VNFR
        self._vnfrs = dict()

        # VNFR ID -> set of the VDUR IDs registered on its behalf
        self._vnfr_to_vdurs = collections.defaultdict(set)

        # VDUR ID -> list of (account name, alarm ID) pairs
        self._alarms = collections.defaultdict(list)

    @property
    def loop(self):
        """The event loop used by this object"""
        return self._loop

    @property
    def log(self):
        """The event log used by this object"""
        return self._log

    @property
    def cache(self):
        """The NFVI metrics cache"""
        return self._cache

    @property
    def metrics(self):
        """The list of metrics contained in this Monitor"""
        return self._nfvi_interface.metrics

    def nfvi_metrics_available(self, account):
        """Returns a boolean indicating whether NFVI metrics are available

        Arguments:
            account - the name of the cloud account to check

        Returns:
            a boolean indicating availability of NFVI metrics

        """
        # An account that was never added cannot provide metrics.
        if account not in self._cloud_accounts:
            return False

        cloud_account = self._cloud_accounts[account]
        return self._nfvi_interface.nfvi_metrics_available(cloud_account)

    def add_cloud_account(self, account):
        """Add a cloud account to the monitor

        Arguments:
            account - a cloud account object

        Raises:
            If the cloud account has already been added to the monitor, an
            AccountAlreadyRegisteredError is raised.

        """
        if account.name in self._cloud_accounts:
            raise AccountAlreadyRegisteredError(account.name)

        self._cloud_accounts[account.name] = account

        # OpenStack accounts ask for the "monasca" plugin; every other account
        # type asks for the "mock" plugin.
        if account.account_type == "openstack":
            self.register_cloud_account(account, "monasca")
        else:
            self.register_cloud_account(account, "mock")

    def remove_cloud_account(self, account_name):
        """Remove a cloud account from the monitor

        Arguments:
            account_name - removes the cloud account that has this name

        Raises:
            If the specified cloud account cannot be found, an
            UnknownAccountError is raised. If a VNFR still references the
            account, an AccountInUseError is raised.

        """
        if account_name not in self._cloud_accounts:
            raise UnknownAccountError()

        # Make sure that there are no VNFRs associated with this account
        if any(vnfr.cloud_account == account_name for vnfr in self._vnfrs.values()):
            raise AccountInUseError()

        del self._cloud_accounts[account_name]
        self._nfvi_plugins.unregister(account_name)

    def get_cloud_account(self, account_name):
        """Returns a cloud account by name

        Arguments:
            account_name - the name of the account to return

        Raises:
            An UnknownAccountError is raised if there is no account object
            associated with the provided name

        Returns:
            A cloud account object

        """
        if account_name not in self._cloud_accounts:
            raise UnknownAccountError()

        return self._cloud_accounts[account_name]

    def register_cloud_account(self, account, plugin_name):
        """Register a cloud account with an NFVI plugin

        Note that a cloud account can only be registered for one plugin at a
        time.

        Arguments:
            account     - the cloud account to associate with the plugin
            plugin_name - the name of the plugin to use

        """
        self._nfvi_plugins.register(account, plugin_name)

    def _register_vdurs(self, account, vnfr):
        """Register every VDUR of a VNFR that can be registered"""
        for vdur in vnfr.vdur:
            try:
                self.add_vdur(account, vdur)
                self._vnfr_to_vdurs[vnfr.id].add(vdur.id)
            except (VdurMissingVimIdError, VdurAlreadyRegisteredError):
                # Best effort: a VDUR without a VIM ID, or one that is already
                # registered, is skipped.
                pass

    def add_vnfr(self, vnfr):
        """Add a VNFR to the monitor

        Arguments:
            vnfr - a VNFR object

        Raises:
            An UnknownAccountError is raised if the account name contained in
            the VNFR does not reference a cloud account that has been added to
            the monitor.

        """
        if vnfr.cloud_account not in self._cloud_accounts:
            raise UnknownAccountError()

        account = self._cloud_accounts[vnfr.cloud_account]

        self._register_vdurs(account, vnfr)
        self._vnfrs[vnfr.id] = vnfr

    def update_vnfr(self, vnfr):
        """Updates the VNFR information in the monitor

        VDURs of the VNFR that can now be registered are registered. The VNFR
        object stored by add_vnfr is not replaced.

        Arguments:
            vnfr - a VNFR object

        Raises:
            An UnknownAccountError is raised if the account name contained in
            the VNFR does not reference a cloud account that has been added to
            the monitor.

        """
        if vnfr.cloud_account not in self._cloud_accounts:
            raise UnknownAccountError()

        account = self._cloud_accounts[vnfr.cloud_account]

        self._register_vdurs(account, vnfr)

    def remove_vnfr(self, vnfr_id):
        """Remove a VNFR from the monitor

        Arguments:
            vnfr_id - the ID of the VNFR to remove

        """
        vdur_ids = self._vnfr_to_vdurs[vnfr_id]

        for vdur_id in vdur_ids:
            self.remove_vdur(vdur_id)

        del self._vnfrs[vnfr_id]
        del self._vnfr_to_vdurs[vnfr_id]

    def add_vdur(self, account, vdur):
        """Adds a VDUR to the monitor

        Adding a VDUR to the monitor will automatically create a NFVI metrics
        object that is associated with the VDUR so that the monitor can
        provide the NFVI metrics associated with the VDUR.

        Arguments:
            account - the cloud account associated with the VNFR that contains
                      the VDUR
            vdur    - a VDUR object

        Raises:
            A VdurMissingVimIdError is raised if the provided VDUR does not
            contain a VIM ID. A VdurAlreadyRegisteredError is raised if the ID
            associated with the VDUR has already been registered.

        """
        if not vdur.vim_id:
            raise VdurMissingVimIdError(vdur.id)

        if self.is_registered_vdur(vdur.id):
            raise VdurAlreadyRegisteredError(vdur.id)

        self.cache.create_entry(account, vdur)

    def remove_vdur(self, vdur_id):
        """Removes a VDUR from the monitor

        Arguments:
            vdur_id - the ID of the VDUR to remove

        """
        self.cache.destroy_entry(vdur_id)

        # Schedule any alarms associated with the VDUR for destruction
        for account_name, alarm_id in self._alarms.pop(vdur_id, []):
            self.loop.create_task(self.destroy_alarm(account_name, alarm_id))

    def list_vdur(self, vnfr_id):
        """Returns a list of VDURs

        Arguments:
            vnfr_id - the identifier of the VNFR that contains the VDURs

        Returns:
            the VDURs of the stored VNFR

        """
        return self._vnfrs[vnfr_id].vdur

    def is_registered_vnfr(self, vnfr_id):
        """Returns True if the VNFR is registered with the monitor

        Arguments:
            vnfr_id - the ID of the VNFR to check

        Returns:
            True if the VNFR is registered and False otherwise.

        """
        return vnfr_id in self._vnfrs

    def is_registered_vdur(self, vdur_id):
        """Returns True if the VDUR is registered with the monitor

        Arguments:
            vdur_id - the ID of the VDUR to check

        Returns:
            True if the VDUR is registered and False otherwise.

        """
        return self.cache.contains_vdur_id(vdur_id)

    def retrieve_nfvi_metrics(self, vdur_id):
        """Retrieves the NFVI metrics associated with a VDUR

        Arguments:
            vdur_id - the ID of the VDUR whose metrics are to be retrieved

        Returns:
            NFVI metrics for a VDUR

        """
        return self._nfvi_interface.retrieve(vdur_id)

    @asyncio.coroutine
    def create_alarm(self, account_name, vdur_id, alarm):
        """Create a new alarm

        This function creates an alarm and augments the provided endpoints
        with endpoints to the launchpad if the launchpad has a public IP. The
        added endpoints are of the form,

            http://{host}:4568/{platform}/{vdur_id}/{action}

        where the 'action' is one of 'ok', 'alarm', or 'insufficient_data'. The
        messages that are pushed to the launchpad are not defined by RIFT so
        the platform that an alarm is sent from is part of the endpoint.

        Arguments:
            account_name - the name of the account to use to create the alarm
            vdur_id      - the identifier of the VDUR to associate with the
                           alarm. It is looked up in the metrics cache to find
                           the VIM ID.
            alarm        - the alarm data

        """
        account = self.get_cloud_account(account_name)
        vim_id = self.cache.to_vim_id(vdur_id)

        # If the launchpad has a public IP, augment the action webhooks to
        # include the launchpad so that the alarms can be broadcast as event
        # notifications.
        if self._config.public_ip is not None:
            url = "http://{host}:4568/{platform}/{vdur_id}".format(
                    host=self._config.public_ip,
                    platform=account.account_type,
                    vdur_id=vdur_id,
                    )
            alarm.actions.ok.add().url = url + "/ok"
            alarm.actions.alarm.add().url = url + "/alarm"
            # The insufficient-data webhook used to be appended to the 'alarm'
            # action list (copy-paste slip), so it fired on the alarm state.
            # NOTE(review): assumes the alarm schema exposes an
            # `insufficient_data` action list next to `ok` and `alarm` --
            # confirm against the RwMon yang model.
            alarm.actions.insufficient_data.add().url = url + "/insufficient_data"

        yield from self._nfvi_interface.alarm_create(account, vim_id, alarm)

        # Associate the VDUR ID with the alarm ID
        self._alarms[vdur_id].append((account_name, alarm.alarm_id))

    @asyncio.coroutine
    def destroy_alarm(self, account_name, alarm_id):
        """Destroy an existing alarm

        Arguments:
            account_name - the name of the account that owns the alarm
            alarm_id     - the identifier of the alarm to destroy

        """
        account = self.get_cloud_account(account_name)
        yield from self._nfvi_interface.alarm_destroy(account, alarm_id)