7779479de2bf446c2f45cbd0e0b18bbad1f3de9c
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / jujuconf.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import os
19 import re
20 import tempfile
21 import time
22 import yaml
23
24 import rift.mano.utils.juju_api as juju
25 from . import riftcm_config_plugin
26
27
28 # Charm service name accepts only a to z and -.
def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
    """Build a Juju-safe service name for a VNF.

    Charm service names accept only the characters ``a``-``z`` and ``-``,
    so the ``<nsr>-<vnfr>-<index>`` triple is sanitised character by
    character:

    * ASCII digits are mapped onto letters (``0`` -> ``a`` ... ``9`` -> ``j``),
    * ASCII letters are lower-cased,
    * every other character becomes ``-``.

    Only ASCII is tested on purpose: ``str.isdigit()`` and ``str.isalpha()``
    also accept non-ASCII characters (e.g. superscript digits, accented
    letters), which either make ``int()`` raise or leak characters that are
    not valid in a service name.

    Args:
        nsr_name: Name of the network service record.
        vnfr_short_name: Short name of the VNF record.
        member_vnf_index: Index of the VNF within the network service.

    Returns:
        The sanitised service name.
    """
    name = "{}-{}-{}".format(nsr_name, vnfr_short_name, member_vnf_index)
    chars = []
    for c in name:
        if '0' <= c <= '9':
            chars.append(chr(ord('a') + int(c)))
        elif 'a' <= c <= 'z' or 'A' <= c <= 'Z':
            chars.append(c.lower())
        else:
            chars.append('-')
    return ''.join(chars)
39
40
41 class JujuConfigPlugin(riftcm_config_plugin.RiftCMConfigPluginBase):
42 """
43 Juju implementation of the riftcm_config_plugin.RiftCMConfigPluginBase
44 """
def __init__(self, dts, log, loop, account):
    """Initialise the plugin from a config-agent account.

    Args:
        dts, log, loop: Passed straight through to the base class
            constructor.
        account: Config-agent account; its ``name`` and the ``juju``
            sub-record (``ip_address``, ``port``, ``user``, ``secret``)
            are read here.

    Raises:
        KeyError: If ``RIFT_INSTALL`` or ``RIFT_ARTIFACTS`` is not set in
            the environment.
    """
    riftcm_config_plugin.RiftCMConfigPluginBase.__init__(
        self, dts, log, loop, account)

    self._name = account.name
    self._type = 'juju'

    # Connection details of the Juju controller.
    juju_account = account.juju
    self._ip_address = juju_account.ip_address
    self._port = juju_account.port
    self._user = juju_account.user
    self._secret = juju_account.secret

    environ = os.environ
    self._rift_install_dir = environ['RIFT_INSTALL']
    self._rift_artif_dir = environ['RIFT_ARTIFACTS']

    ############################################################
    # This is wrongfully overloaded with 'juju' private data.  #
    # Really need to separate agent_vnfr from juju vnfr data.  #
    # Currently, this holds agent_vnfr, which has actual vnfr, #
    # then this juju overloads actual vnfr with its own        #
    # dictionary elements (WRONG!!!)                           #
    self._juju_vnfs = {}
    ############################################################

    # Service name -> {action name -> asyncio task}
    self._tasks = {}
    self._api = juju.JujuApi(log, loop,
                             self._ip_address, self._port,
                             self._user, self._secret)
69
@property
def name(self):
    """Name of the config-agent account this plugin was created from."""
    return self._name
73
@property
def agent_type(self):
    """Config-agent type string stored at construction time."""
    return self._type
77
@property
def api(self):
    """The Juju API wrapper object created in the constructor."""
    return self._api
81
@property
def agent_data(self):
    """Describe this agent as a plain dictionary.

    Returns:
        dict with the keys ``type``, ``name``, ``host``, ``port``,
        ``user`` and ``secret``.
    """
    return {
        'type': self.agent_type,
        'name': self.name,
        'host': self._ip_address,
        'port': self._port,
        'user': self._user,
        'secret': self._secret,
    }
92
def vnfr(self, vnfr_id):
    """Return the ``vnfr`` attribute of the record stored for ``vnfr_id``.

    Returns:
        The stored ``vnfr`` object, or ``None`` (after logging an error)
        when the lookup raises ``KeyError``.
    """
    try:
        return self._juju_vnfs[vnfr_id].vnfr
    except KeyError:
        self._log.error("jujuCA: Did not find VNFR %s in juju plugin", vnfr_id)

    return None
101
def get_service_name(self, vnfr_id):
    """Return the Juju service name recorded for ``vnfr_id``.

    Returns:
        The ``vnf_juju_name`` entry of the VNFR data, or ``None`` when the
        VNFR is unknown/empty or carries no such entry.
    """
    record = self.vnfr(vnfr_id)
    if not record or 'vnf_juju_name' not in record:
        return None
    return record['vnf_juju_name']
107
def juju_log(self, level, name, log_str, *args):
    """Log ``log_str`` with a ``jujuCA`` prefix at the given level.

    Args:
        level: Name of the logger method to call (looked up with
            ``getattr`` on ``self._log``).
        name: Optional name placed in parentheses in the prefix; omitted
            from the prefix when ``None``.
        log_str: Message / format string.
        *args: Forwarded unchanged to the logger method.
    """
    if name is None:
        message = 'jujuCA: {}'.format(log_str)
    else:
        message = 'jujuCA:({}) {}'.format(name, log_str)

    emit = getattr(self._log, level)
    emit(message, *args)
114
115 # TBD: Do a better, similar to config manager
def xlate(self, tag, tags):
    """Translate a ``<...>`` placeholder into its value from ``tags``.

    Only the exact placeholder ``<rw_mgmt_ip>`` is substituted (with
    ``tags['rw_mgmt_ip']``); any other value, including other
    placeholders, is returned unchanged.

    Args:
        tag: Value to translate; ``None`` is passed through.
        tags: Mapping of tag names to values.

    Returns:
        The substituted value, or ``tag`` itself when no substitution
        applies or the tag has no value in ``tags``.
    """
    if tag is None:
        return tag

    if not re.search('<.*>', tag):
        return tag

    self._log.debug("jujuCA: Xlate value %s", tag)
    if tag != '<rw_mgmt_ip>':
        return tag

    try:
        return tags['rw_mgmt_ip']
    except KeyError as e:
        self._log.info("jujuCA: Did not get a value for tag %s, e=%s",
                       tag, e)
    return tag
130
@asyncio.coroutine
def notify_create_vlr(self, agent_nsr, agent_vnfr, vld, vlr):
    """Notification that a virtual link record was created.

    Nothing is done for virtual links in this plugin; always returns True.
    """
    return True
137
@asyncio.coroutine
def notify_create_vnfr(self, agent_nsr, agent_vnfr):
    """Notification that a VNF record was created; deploy its charm.

    If the VNF configuration has a ``juju`` section, the Juju bookkeeping
    entries are added to ``agent_vnfr.vnfr`` and a ``deploy_service`` task
    is scheduled on the event loop (stored under
    ``self._tasks[<service>]['deploy']``).

    Args:
        agent_nsr: Agent NSR; ``name`` and ``id`` are read.
        agent_vnfr: Agent VNFR; ``name``, ``member_vnf_index``, ``vnfr``
            and ``vnfr_msg`` are read, and ``vnfr`` is updated in place.

    Returns:
        True if a charm deployment was scheduled, False if the VNF has no
        juju configuration or an error occurred while reading it.
    """
    self._log.debug("jujuCA: create vnfr nsr=%s vnfr=%s",
                    agent_nsr.name, agent_vnfr.name)
    self._log.debug("jujuCA: Config = %s",
                    agent_vnfr.vnf_configuration)
    try:
        vnf_config = agent_vnfr.vnfr_msg.vnf_configuration
        self._log.debug("jujuCA: vnf_configuration = %s", vnf_config)
        if not vnf_config.has_field('juju'):
            return False
        charm = vnf_config.juju.charm
        self._log.debug("jujuCA: charm = %s", charm)
    except Exception as e:
        # Loggers have no ``Error`` method; the previous spelling raised
        # AttributeError from inside this handler.
        self._log.error("jujuCA: vnf_configuration error for vnfr {}: {}".
                        format(agent_vnfr.name, e))
        return False

    # Prepare unique name for this VNF
    vnf_unique_name = get_vnf_unique_name(agent_nsr.name,
                                          agent_vnfr.name,
                                          agent_vnfr.member_vnf_index)
    if vnf_unique_name in self._tasks:
        # Only a warning: the deployment below is still (re)scheduled.
        self._log.warn("jujuCA: Service %s already deployed",
                       vnf_unique_name)

    vnfr_dict = agent_vnfr.vnfr
    vnfr_dict.update({'vnf_juju_name': vnf_unique_name,
                      'charm': charm,
                      'nsr_id': agent_nsr.id,
                      'member_vnf_index': agent_vnfr.member_vnf_index,
                      'tags': {},
                      'active': False,
                      'config': vnf_config,
                      'vnfr_name': agent_vnfr.name})
    self._log.debug("jujuCA: Charm %s for vnf %s to be deployed as %s",
                    charm, agent_vnfr.name, vnf_unique_name)

    # Find the charm directory
    try:
        path = os.path.join(self._rift_artif_dir,
                            'launchpad/libs',
                            agent_vnfr.vnfr_msg.vnfd.id,
                            'charms/trusty',
                            charm)
        self._log.debug("jujuCA: Charm dir is {}".format(path))
        if not os.path.isdir(path):
            self._log.error("jujuCA: Did not find the charm directory at {}".
                            format(path))
            # deploy_service is then called with path=None
            path = None
    except Exception as e:
        self._log.exception(e)
        return False

    service_tasks = self._tasks.setdefault(vnf_unique_name, {})
    service_tasks['deploy'] = self.loop.create_task(
        self.api.deploy_service(charm, vnf_unique_name, path=path))

    self._log.debug("jujuCA: Deploying service %s",
                    vnf_unique_name)

    return True
207
@asyncio.coroutine
def notify_instantiate_vnfr(self, agent_nsr, agent_vnfr):
    """Notification that a VNF record is being instantiated.

    No Juju action is taken at this point; always returns True.
    """
    return True
214
@asyncio.coroutine
def notify_instantiate_vlr(self, agent_nsr, agent_vnfr, vlr):
    """Notification that a virtual link record is being instantiated.

    No Juju action is taken at this point; always returns True.
    """
    return True
221
@asyncio.coroutine
def notify_terminate_nsr(self, agent_nsr, agent_vnfr):
    """Notification that the network service is being terminated.

    No Juju action is taken at this point; always returns True.
    """
    return True
228
@asyncio.coroutine
def notify_terminate_vnfr(self, agent_nsr, agent_vnfr):
    """Notification that a VNF record is being terminated.

    Schedules ``destroy_service`` for the VNF's Juju service (task stored
    under ``self._tasks[<service>]['destroy']``) and drops the VNFR from
    the set of VNFRs managed by this plugin.

    Errors are logged, never raised.

    Returns:
        Always True.
    """
    self._log.debug("jujuCA: Terminate VNFr {}, current vnfrs={}".
                    format(agent_vnfr.name, self._juju_vnfs))
    try:
        vnfr = agent_vnfr.vnfr
        service = vnfr['vnf_juju_name']

        self._log.debug("jujuCA: Terminating VNFr %s, %s",
                        agent_vnfr.name, service)
        # setdefault: a service with no task entry yet must still be
        # destroyed; indexing directly raised KeyError and silently
        # skipped the destroy.
        service_tasks = self._tasks.setdefault(service, {})
        service_tasks['destroy'] = self.loop.create_task(
            self.api.destroy_service(service)
        )

        del self._juju_vnfs[agent_vnfr.id]
        self._log.debug("jujuCA: current vnfrs={}".
                        format(self._juju_vnfs))
        # The task entries are kept on purpose: they hold the reference
        # to the destroy task that was just scheduled.
    except KeyError as e:
        self._log.debug("jujuCA: Termiating charm service for VNFr {}, e={}".
                        format(agent_vnfr.name, e))
    except Exception as e:
        self._log.error("jujuCA: Exception terminating charm service for VNFR {}: {}".
                        format(agent_vnfr.name, e))

    return True
263
@asyncio.coroutine
def notify_terminate_vlr(self, agent_nsr, agent_vnfr, vlr):
    """Notification that a virtual link is being terminated.

    No Juju action is taken at this point; always returns True.
    """
    return True
270
def check_task_status(self, service, action):
    """Report whether the task recorded for ``service``/``action`` is done.

    Returns:
        False while the task is still running; True when it finished
        without an exception, and also True when no such task is recorded
        (the KeyError is only logged).

    Raises:
        Exception: When the finished task carries an exception; it is
            wrapped in a plain ``Exception`` and re-raised after logging.
    """
    try:
        task = self._tasks[service][action]

        if not task.done():
            self.log.debug("jujuCA: task {}, {} not done".
                           format(service, action))
            return False

        self.log.debug("jujuCA: Task for %s, %s done" % (service, action))
        failure = task.exception()
        if failure:
            self.log.error("jujuCA: Error in task for {} and {} : {}".
                           format(service, action, failure))
            raise Exception(failure)

        outcome = task.result()
        if outcome:
            self.log.debug("jujuCA: Task for {} and {}, returned {}".
                           format(service, action, outcome))
        return True
    except KeyError as e:
        # Unknown task: logged, then reported as done below.
        self.log.error("jujuCA: KeyError for task for {} and {}: {}".
                       format(service, action, e))
    except Exception as e:
        self.log.error("jujuCA: Error for task for {} and {}: {}".
                       format(service, action, e))
        raise

    return True
299
@asyncio.coroutine
def vnf_config_primitive(self, nsr_id, vnfr_id, primitive, output):
    """Execute a service primitive on the Juju service of a VNF.

    The primitive named ``primitive.name`` is looked up in the VNF's
    ``service_primitive`` list.  A primitive called ``config`` is applied
    through ``api.apply_config``; any other primitive is run as a Juju
    action through ``api.execute_action``.

    Args:
        nsr_id: NSR id (used for logging only).
        vnfr_id: Id of the VNFR whose service is targeted.
        primitive: Requested primitive; ``name`` and ``parameter`` are read.
        output: Result object; ``execution_status``, ``execution_id`` and
            ``execution_error_details`` are filled in.

    Returns:
        None.  If the VNFR is unknown, ``output`` is left untouched.
    """
    self._log.debug("jujuCA: VNF config primititve {} for nsr {}, vnfr_id {}".
                    format(primitive, nsr_id, vnfr_id))
    try:
        vnfr = self._juju_vnfs[vnfr_id].vnfr
    except KeyError:
        self._log.error("jujuCA: Did not find VNFR %s in juju plugin",
                        vnfr_id)
        return

    # Pessimistic defaults; overwritten below on success.
    output.execution_status = "failed"
    output.execution_id = ''
    output.execution_error_details = ''

    try:
        service = vnfr['vnf_juju_name']
        vnf_config = vnfr['config']
        self._log.debug("VNF config %s", vnf_config)
        configs = vnf_config.service_primitive
        for config in configs:
            if config.name != primitive.name:
                continue

            self._log.debug("jujuCA: Found the config primitive %s",
                            config.name)
            params = {}
            for parameter in primitive.parameter:
                if not parameter.value:
                    continue

                val = self.xlate(parameter.value, vnfr['tags'])
                # TBD do validation of the parameters
                data_type = 'string'
                found = False
                for ca_param in config.parameter:
                    if ca_param.name == parameter.name:
                        data_type = ca_param.data_type
                        found = True
                        break

                if data_type == 'integer':
                    # Convert the translated value, not the raw one, so
                    # a substituted tag is not thrown away.
                    val = int(val)
                if not found:
                    self._log.warn("jujuCA: Did not find parameter {} for {}".
                                   format(parameter, config.name))
                params[parameter.name] = val

            if config.name == 'config':
                output.execution_id = 'config'
                if params:
                    self._log.debug("jujuCA: applying config with params {} for service {}".
                                    format(params, service))

                    rc = yield from self.api.apply_config(params, service=service, wait=False)

                    if rc:
                        # Mark as pending and check later for the status
                        output.execution_status = "pending"
                        self._log.debug("jujuCA: applied config {} on {}".
                                        format(params, service))
                    else:
                        output.execution_status = 'failed'
                        output.execution_error_details = \
                            'Failed to apply config: {}'.format(params)
                        self._log.error("jujuCA: Error applying config {} on service {}".
                                        format(params, service))
                else:
                    self._log.warn("jujuCA: Did not find valid parameters for config : {}".
                                   format(primitive.parameter))
                    output.execution_status = "completed"
            else:
                self._log.debug("jujuCA: Execute action {} on service {} with params {}".
                                format(config.name, service, params))

                resp = yield from self.api.execute_action(config.name,
                                                          params,
                                                          service=service)

                if resp:
                    if 'error' in resp:
                        output.execution_error_details = resp['error']['Message']
                    else:
                        output.execution_id = resp['action']['tag']
                        output.execution_status = resp['status']
                        if output.execution_status == 'failed':
                            output.execution_error_details = resp['message']
                    self._log.debug("jujuCA: execute action {} on service {} returned {}".
                                    format(config.name, service, output.execution_status))
                else:
                    self._log.error("jujuCA: error executing action {} for {} with {}".
                                    format(config.name, service, params))
                    output.execution_id = ''
                    output.execution_status = 'failed'
                    output.execution_error_details = "Failed to queue the action"

            # Only the first primitive with a matching name is executed.
            break

    except KeyError as e:
        self._log.info("VNF %s does not have config primititves, e=%s", vnfr_id, e)
394
@asyncio.coroutine
def apply_config(self, agent_nsr, agent_vnfr, config, rpc_ip):
    """Notification on configuration of an NSR.

    Intentionally a no-op in this plugin; returns None.
    """
    return None
399
@asyncio.coroutine
def apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
    """
    ###### TBD - This really does not belong here. Looks more like NS level script ####
    ###### apply_config should be called for a particular VNF only here ###############

    Hook: Runs the user defined script. Feeds all the necessary data
    for the script through a yaml file.

    The yaml file is a temporary file that is NOT removed here, because
    the script may still be running when this coroutine returns.

    Args:
        agent_nsr: Agent NSR; ``vnfr_ids`` and ``id`` are read.
        agent_vnfrs: Not used by this implementation.
        rpc_ip: The RPC input; ``as_dict()`` and ``user_defined_script``
            are read.

    Returns:
        Tuple ``(task, err)``: an asyncio task waiting for the script
        process to exit, and the bytes read from the script's stderr.

    Raises:
        KeyError: If one of the NSR's VNFR ids is not managed by this
            plugin.
    """
    # Local import: shlex is only needed to quote the command line below.
    import shlex

    def get_meta(agent_nsr):
        """Collect per-VNFR unit names, initial params and member indexes."""
        unit_names, initial_params, vnfr_index_map = {}, {}, {}

        for vnfr_id in agent_nsr.vnfr_ids:
            juju_vnf = self._juju_vnfs[vnfr_id].vnfr

            # Vnfr -> index ref
            vnfr_index_map[vnfr_id] = juju_vnf['member_vnf_index']

            # Unit name
            unit_names[vnfr_id] = juju_vnf['vnf_juju_name']

            # Flatten the data for simplicity
            param_data = {}
            self._log.debug("Juju Config:%s", juju_vnf['config'])
            for primitive in juju_vnf['config'].initial_config_primitive:
                for parameter in primitive.parameter:
                    value = self.xlate(parameter.value, juju_vnf['tags'])
                    param_data[parameter.name] = value

            initial_params[vnfr_id] = param_data

        return unit_names, initial_params, vnfr_index_map

    unit_names, init_data, vnfr_index_map = get_meta(agent_nsr)

    # The data consists of 5 sections
    # 1. Account data
    # 2. The input passed.
    # 3. Juju unit names (keyed by vnfr ID).
    # 4. Initial config data (keyed by vnfr ID).
    # 5. Member VNF index (keyed by vnfr ID).
    data = dict()
    data['config_agent'] = dict(
        name=self._name,
        host=self._ip_address,
        port=self._port,
        user=self._user,
        secret=self._secret
    )
    data["rpc_ip"] = rpc_ip.as_dict()
    data["unit_names"] = unit_names
    data["init_config"] = init_data
    data["vnfr_index_map"] = vnfr_index_map

    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(yaml.dump(data, default_flow_style=True)
                       .encode("UTF-8"))

    self._log.debug("jujuCA: Creating a temp file: {} with input data".format(
        tmp_file.name))

    # Get the full path to the script
    script_name = rpc_ip.user_defined_script
    if os.path.isabs(script_name):
        # The script has full path, use as is
        script = script_name
    else:
        script = os.path.join(self._rift_artif_dir, 'launchpad/libs',
                              agent_nsr.id, 'scripts', script_name)
        self._log.debug("jujuCA: Checking for script in %s", script)
        if not os.path.exists(script):
            # Fall back to the scripts shipped with the installation.
            script = os.path.join(self._rift_install_dir, 'usr/bin',
                                  script_name)

    # Run the resolved script path (previously the bare script name was
    # used and the path lookup above had no effect).  Both parts are
    # quoted because the script name comes from RPC input.
    cmd = "{} {}".format(shlex.quote(script), shlex.quote(tmp_file.name))
    self._log.debug("jujuCA: Running the CMD: {}".format(cmd))

    coro = asyncio.create_subprocess_shell(cmd, loop=self._loop,
                                           stderr=asyncio.subprocess.PIPE)
    process = yield from coro
    err = yield from process.stderr.read()
    task = self._loop.create_task(process.wait())

    return task, err
491
@asyncio.coroutine
def apply_initial_config(self, agent_nsr, agent_vnfr):
    """
    Apply the initial configuration
    Expect config directives mostly, not actions
    Actions in initial config may not work based on charm design

    Steps: check that the service is up, apply the parameters of every
    initial primitive named ``config`` in a single ``apply_config`` call,
    run every other initial primitive as a Juju action, then poll until
    none of those actions is reported as pending.

    Args:
        agent_nsr: Not used by this implementation.
        agent_vnfr: Agent VNFR; ``vnfr``, ``vnfr_msg`` and ``id`` are read
            and ``vnfr['tags']`` may be updated with the management IP.

    Returns:
        True when the configuration and all actions went through, False
        when the service is not up yet or any step failed.

    Raises:
        KeyError: If ``agent_vnfr.vnfr`` has no ``vnf_juju_name`` entry.
    """
    vnfr = agent_vnfr.vnfr
    service = vnfr['vnf_juju_name']

    rc = yield from self.api.is_service_up(service=service)
    if not rc:
        return False

    action_ids = []
    try:
        vnf_cat = agent_vnfr.vnfr_msg
        if vnf_cat and vnf_cat.mgmt_interface.ip_address:
            vnfr['tags'].update({'rw_mgmt_ip': vnf_cat.mgmt_interface.ip_address})
            self._log.debug("jujuCA:(%s) tags: %s", service, vnfr['tags'])

        config = {}
        try:
            for primitive in vnfr['config'].initial_config_primitive:
                self._log.debug("jujuCA:(%s) Initial config primitive %s",
                                service, primitive)
                if primitive.name == 'config':
                    for param in primitive.parameter:
                        # NOTE(review): parameters are skipped entirely
                        # while no tags are known -- confirm intended.
                        if vnfr['tags']:
                            val = self.xlate(param.value, vnfr['tags'])
                            config.update({param.name: val})
        except KeyError as e:
            self._log.exception("jujuCA:(%s) Initial config error(%s): config=%s",
                                service, str(e), config)
            return False

        if config:
            self.juju_log('info', service,
                          "Applying Initial config:%s",
                          config)

            rc = yield from self.api.apply_config(config, service=service)
            if rc is False:
                self._log.error("Service {} is in error state".format(service))
                return False

        # Apply any actions specified as part of initial config
        for primitive in vnfr['config'].initial_config_primitive:
            if primitive.name == 'config':
                continue

            self._log.debug("jujuCA:(%s) Initial config action primitive %s",
                            service, primitive)
            action = primitive.name
            params = {}
            for param in primitive.parameter:
                params[param.name] = self.xlate(param.value, vnfr['tags'])

            self._log.info("jujuCA:(%s) Action %s with params %s",
                           service, action, params)

            resp = yield from self.api.execute_action(action, params,
                                                      service=service)
            if 'error' in resp:
                self._log.error("Applying initial config on {} failed for {} with {}: {}".
                                format(service, action, params, resp))
                return False

            action_ids.append(resp['action']['tag'])

    except KeyError as e:
        self._log.info("Juju config agent(%s): VNFR %s not managed by Juju",
                       service, agent_vnfr.id)
        return False
    except Exception as e:
        # The message previously mixed '%s' and '{}' placeholders, which
        # left a literal '%s' in the output and dropped the error text.
        self._log.exception("jujuCA:({}) Exception juju apply_initial_config for VNFR {}: {}".
                            format(service, agent_vnfr.id, e))
        return False

    # Check if all actions completed
    pending = True
    while pending:
        pending = False
        for act in action_ids:
            resp = yield from self.api.get_action_status(act)
            if 'error' in resp:
                self._log.error("Initial config failed: {}".format(resp))
                return False

            if resp['status'] == 'failed':
                self._log.error("Initial config action failed: {}".format(resp))
                return False

            if resp['status'] == 'pending':
                pending = True

        if pending:
            # Pause between polling rounds instead of querying the
            # action status in a tight loop.
            yield from asyncio.sleep(1)

    return True
590
def add_vnfr_managed(self, agent_vnfr):
    """Register ``agent_vnfr`` as managed by this plugin.

    A VNFR whose id is already registered is left untouched.
    """
    vnfr_id = agent_vnfr.id
    if vnfr_id in self._juju_vnfs:
        return

    self._log.info("juju config agent: add vnfr={}/{}".
                   format(agent_vnfr.name, vnfr_id))
    self._juju_vnfs[vnfr_id] = agent_vnfr
596
def is_vnfr_managed(self, vnfr_id):
    """Tell whether ``vnfr_id`` is registered with this plugin.

    Returns:
        True if the id is a key of the managed-VNFR table, False
        otherwise (including when the membership test itself raises,
        which is logged at debug level).
    """
    managed = False
    try:
        if vnfr_id in self._juju_vnfs:
            managed = True
    except Exception as e:
        self._log.debug("jujuCA: Is VNFR {} managed: {}".
                        format(vnfr_id, e))
    return managed
605
@asyncio.coroutine
def is_configured(self, vnfr_id):
    """Tell whether the Juju service of ``vnfr_id`` is active.

    The result of the service query is cached in the VNFR data under the
    ``active`` key; once that entry is truthy the service is not queried
    again.

    Returns:
        True if ``active`` is already set; otherwise the result of
        ``api.is_service_active``.  False when the VNFR is unknown or any
        error occurs (errors are logged, not raised).
    """
    try:
        vnfr = self._juju_vnfs[vnfr_id].vnfr
        if vnfr['active']:
            return True

        service = vnfr['vnf_juju_name']
        # ``yield from`` was missing here.  The other public api calls in
        # this class are driven with ``yield from``, so this one is
        # assumed to be a coroutine as well -- TODO confirm against the
        # Juju API wrapper.
        resp = yield from self.api.is_service_active(service=service)
        # Cache in the vnfr dict that is read above; the agent VNFR object
        # itself is not the place where 'active' is looked up.
        vnfr['active'] = resp
        self._log.debug("jujuCA: Service state for {} is {}".
                        format(service, resp))
        return resp

    except KeyError:
        self._log.debug("jujuCA: VNFR id {} not found in config agent".
                        format(vnfr_id))
        return False
    except Exception as e:
        self._log.error("jujuCA: VNFR id {} is_configured: {}".
                        format(vnfr_id, e))
        return False
630
@asyncio.coroutine
def get_config_status(self, agent_nsr, agent_vnfr):
    """Get the configuration status for the VNF.

    Returns:
        ``'unknown'`` when the VNFR data has no ``vnf_juju_name`` entry,
        ``'error'`` / ``'configured'`` when the service status is
        ``'error'`` / ``'active'``, and ``'configuring'`` in every other
        case (deploy task not finished, other status, or an error while
        querying the status).

    Raises:
        Exception: Propagated from ``check_task_status`` when the deploy
            task failed.
    """
    try:
        service = agent_vnfr.vnfr['vnf_juju_name']
    except KeyError:
        # This VNF is not managed by Juju
        return 'unknown'

    if not self.check_task_status(service, 'deploy'):
        return 'configuring'

    try:
        status = yield from self.api.get_service_status(service=service)
        self._log.debug("jujuCA: Get service %s status? %s", service, status)

        if status == 'error':
            return 'error'
        if status == 'active':
            return 'configured'
    except KeyError:
        self._log.error("jujuCA: Check unknown service %s status", service)
    except Exception as e:
        self._log.error("jujuCA: Caught exception when checking for service is active: %s", e)
        self._log.exception(e)

    return 'configuring'
663
def get_action_status(self, execution_id):
    '''Get the action status for an execution ID

    *** Make sure this is NOT a asyncio coroutine function ***

    Returns whatever ``api._get_action_status`` returns; any exception is
    logged and then re-raised.
    '''
    try:
        self._log.debug("jujuCA: Get action status for {}".format(execution_id))
        status = self.api._get_action_status(execution_id)
        self._log.debug("jujuCA: Action status: {}".format(status))
    except Exception as e:
        self._log.error("jujuCA: Error fetching execution status for %s",
                        execution_id)
        self._log.exception(e)
        raise e

    return status
679
def get_service_status(self, vnfr_id):
    '''Get the service status, used by job status handle

    Make sure this is NOT a coroutine

    Returns None (after logging an error) when no service name is known
    for ``vnfr_id``; otherwise blocks for 3 seconds and returns the result
    of ``api._get_service_status``.
    '''
    service = self.get_service_name(vnfr_id)
    if service is not None:
        # Delay for 3 seconds before checking as config apply takes a
        # few seconds to transfer to the service
        time.sleep(3)
        return self.api._get_service_status(service=service)

    self._log.error("jujuCA: VNFR {} not managed by this Juju agent".
                    format(vnfr_id))
    return None