#!/usr/bin/env python3
# Copyright 2017 The Australian National University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import yaml
import logging
import crayons
import json
from pathlib import Path
from types import SimpleNamespace

from mubench import SUITE_DIR, CALLBACKS_DIR
from mubench.conf import settings
from mubench.util import expandenv, dictify, run_in_subproc, ExecutionFailure
from mubench.util import add_path_to_ld_library_path
from mubench.lang import get_lang, Language
from mubench.models.result import Result
from mubench.models import cbconf

logger = logging.getLogger(__name__)


def task_print(task_name, message):
    logger.info("[{}] {}".format(
        crayons.yellow(task_name),
        message
    ))


class TaskSet:
    def __init__(self, name, benchmark, iterations, callback,
                 runnerwrap=None, **kwds):
        self.name = name
        self.benchmark = benchmark
        self.iterations = iterations
        self.callback = callback
        self.runnerwrap = runnerwrap
        self.output_dir = kwds['output_dir']
        self.resfile = kwds['resfile']
        self.tasks = []
        self.comparison = kwds['comparisons']

        # environ
        self.env = os.environ.copy()  # base on os.environ
        self.env.update(getattr(settings, 'ENVIRON', {}))  # local settings
        self.env['MUBENCH_TASKSET_NAME'] = name  # taskset name
        ts_env = kwds['env']  # taskset definitions
        for v in ts_env:
            # first expand the environs (based on what's been defined so far)
            ts_env[v] = expandenv(ts_env[v], self.env)
        self.env.update(ts_env)

        # configure callback
        conf_func = getattr(cbconf, 'configure_cb_%(name)s' % callback)
        conf_func(callback, self.env)

        # expand environs in benchmark args and callback param and paths
        def get_expanded_list(l):
            return list(map(lambda a: expandenv(str(a), self.env), l))

        benchmark['args'] = get_expanded_list(benchmark['args'])
        callback['param'] = expandenv(callback['param'], self.env)
        callback['include_dirs'] = get_expanded_list(callback['include_dirs'])
        callback['library_dirs'] = get_expanded_list(callback['library_dirs'])
        callback['extra_srcs'] = get_expanded_list(callback['extra_srcs'])

        # compiled callback shared library
        libext = '.dylib' if sys.platform == 'darwin' else '.so'
        add_path_to_ld_library_path(str(self.output_dir), self.env)
        self.callback['dylib'] = self.output_dir / \
            ('libcb_%(name)s' % self.callback + libext)
        for d in self.callback['library_dirs']:
            add_path_to_ld_library_path(d, self.env)
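
    # Illustrative sketch of the taskset mapping that from_config_dict()
    # expects. The keys mirror the checks below; the benchmark, language and
    # concrete values shown are hypothetical:
    #
    #   iterations: 20
    #   benchmark:
    #     name: nbody          # must exist under SUITE_DIR
    #     args: [10000]
    #   callback:
    #     name: clock          # compiled from CALLBACKS_DIR/cb_clock.c
    #   environ:
    #     CC: clang
    #   tasks:
    #     c_native:
    #       source: nbody.c    # resolved against SUITE_DIR/<benchmark name>
    #       language:
    #         name: c
    #   compare:
    #     - [c_native, some_other_task]
    #
    # Optional top-level keys 'outdir' and 'recfile' control where the output
    # directory and the JSON record file go.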

    @staticmethod
    def from_config_dict(name, conf_d, conf_dir=None):
        # output directory
        output_dir = Path(conf_d.get('outdir', str(conf_dir)))

        # check iterations
        assert 'iterations' in conf_d, 'iterations not defined'

        # check benchmark name
        assert 'benchmark' in conf_d, 'benchmark not defined'
        assert (SUITE_DIR / conf_d['benchmark']['name']).exists(), \
            "benchmark %(name)s not found" % conf_d['benchmark']
        conf_d['benchmark'].setdefault('args', [])

        # check record file
        resfile = Path(conf_d.get('recfile', '%(name)s.json' % locals()))
        if not resfile.is_absolute():
            resfile = output_dir / resfile
        if not resfile.parent.exists():
            resfile.parent.mkdir(parents=True)

        # check callback
        assert 'callback' in conf_d, 'callback not defined'
        d = dictify(conf_d['callback'])
        if 'param' not in d or d['param'] is None:
            d['param'] = ""  # default to ""
        if 'include_dirs' not in d or d['include_dirs'] is None:
            d['include_dirs'] = []  # default to []
        if 'library_dirs' not in d or d['library_dirs'] is None:
            d['library_dirs'] = []  # default to []
        if 'libraries' not in d or d['libraries'] is None:
            d['libraries'] = []  # default to []
        if 'extra_srcs' not in d or d['extra_srcs'] is None:
            d['extra_srcs'] = []  # default to []
        if 'flags' not in d or d['flags'] is None:
            d['flags'] = []  # default to []
        conf_d['callback'] = d

        # add comparisons
        comparisons = []
        if 'compare' in conf_d:
            for cmp in conf_d["compare"]:
                comparisons.append(SimpleNamespace(op1=cmp[0], op2=cmp[1]))

        ts = TaskSet(name, conf_d['benchmark'], conf_d['iterations'],
                     conf_d['callback'],
                     output_dir=output_dir, resfile=resfile,
                     env=conf_d.get('environ', {}),
                     comparisons=comparisons)

        # add tasks
        for task_name, task_conf in conf_d['tasks'].items():
            try:
                ts.tasks.append(Task(ts, task_name, **task_conf))
            except Exception as e:
                task_print(task_name,
                           crayons.red('parsing configuration failed.'))
                logger.critical(crayons.red(str(e)))
                # ts.tasks.append(Task(ts, task_name, **task_conf))
        return ts
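
    # Shape of the record that run() writes to self.resfile, inferred from
    # the code below (values are illustrative):
    #
    #   {
    #     "name": "<taskset name>",
    #     "iterations": 20,
    #     "benchmark": {"name": "...", "args": [...]},
    #     "callback": {"name": "...", "dylib": "...", ...},
    #     "results": {
    #       "<task name>": [
    #         {"stdout": "...", "stderr": "...", "t_proc": 1.23},
    #         ...
    #       ]
    #     }
    #   }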

    def run(self, skipcomp_l):
        # compile callback into shared library first
        self.compile_callback()

        # compile first
        targets = {}
        for task in self.tasks:
            if not task.lang_cls.compiled:  # interpreted
                targets[task] = task.srcfile
            else:  # need compilation
                if task.name in skipcomp_l:
                    # skip compilation -> assume default target
                    targets[task] = task.get_default_target()
                else:
                    task_print(task.name, 'compiling...')
                    try:
                        target = task.compile()
                        task_print(task.name, 'target %s generated' % target)
                        targets[task] = target
                    except ExecutionFailure as e:
                        task_print(task.name, crayons.red('FAILED'))
                        logger.critical(crayons.red(str(e)))
                        errlog_file = self.output_dir / (task.name + '.log')
                        e.dump(errlog_file)
                        task_print(task.name, crayons.red(
                            'error output written to %s' % errlog_file))

        # run
        data = {t: [] for t in targets}  # only run tasks that have a target

        # Generating record
        for i in range(self.iterations):
            logger.info("Running iteration %d..." % i)
            keys = list(data.keys())
            for task in keys:
                target = targets[task]
                try:
                    res, t_proc = task.run(target)
                    task.add_datapoint(res.stdout, res.stderr, t_proc)
                    data[task].append({
                        'stdout': str(res.stdout, encoding='utf-8'),
                        'stderr': str(res.stderr, encoding='utf-8'),
                        't_proc': t_proc
                    })
                except ExecutionFailure as e:
                    task_print(task.name, crayons.red('FAILED'))
                    logger.critical(crayons.red(str(e)))
                    errlog_file = self.output_dir / (task.name + '.log')
                    e.dump(errlog_file)
                    task_print(task.name, crayons.red(
                        'error output written to %s' % errlog_file))
                    del data[task]

        # convert into string for dumping
        self.callback['dylib'] = str(self.callback['dylib'])

        record = {
            'name': self.name,
            'iterations': self.iterations,
            'benchmark': self.benchmark,
            'callback': self.callback,
            'results': {t.name: data[t] for t in data},
        }

        # save to result file
        with self.resfile.open('w') as fp:
            json.dump(record, fp, indent=2, separators=(', ', ': '))

        # TODO: restructure this
        for task in data:
            task.aggregate_datapoint()
        self.results = {task.name: task.get_result() for task in data}
        return self.results

    # TODO: debug
    def compile_callback(self):
        cmd = []
        cc = self.env.get('CC', 'clang')
        cmd.append(cc)
        cmd.extend(['-shared', '-fPIC'])
        # include_dirs
        cmd.extend(map(lambda s: '-I' + s, self.callback['include_dirs']))
        # library_dirs
        cmd.extend(map(lambda s: '-L' + s, self.callback['library_dirs']))
        # libraries
        cmd.extend(map(lambda s: '-l' + s, self.callback['libraries']))
        # flags
        cmd.extend(self.callback['flags'])
        # output
        cmd.extend(['-o', self.callback['dylib']])
        # source
        cmd.append(CALLBACKS_DIR / ('cb_%(name)s.c' % self.callback))
        cmd.extend(self.callback['extra_srcs'])
        run_in_subproc(cmd, self.env)


class Task:
    """
    A single benchmark-performance measurement; corresponds to an entry
    under the ``tasks`` mapping of a taskset in the YAML configuration file.
    """

    def __init__(self, taskset, name, **conf):
        self.taskset = taskset
        self.name = name

        self.env = taskset.env.copy()  # based on taskset environ
        self.env['MUBENCH_TASK_NAME'] = name
        task_env = conf.get('environ', {})
        for v in task_env:
            task_env[v] = expandenv(task_env[v], self.env)
        self.env.update(task_env)

        self.output_dir = taskset.output_dir

        # benchmark
        self.benchmark = taskset.benchmark

        # callback
        self.callback = taskset.callback

        # check source
        assert 'source' in conf, 'source not defined'
        src = SUITE_DIR / self.benchmark['name'] / conf['source']
        assert src.exists(), "source file %(src)s not found" % locals()
        conf['source'] = src
        self.srcfile = src

        # language
        lang_d = dictify(conf.get('language', {}))
        assert 'name' in lang_d, 'language not defined'
        self.lang_cls = get_lang(lang_d['name'])
        self.lang = self.lang_cls.check_lang(lang_d)

        # set defaults for others
        self.compiler = self.lang_cls.check_compiler(conf.get('compiler', {}),
                                                     self.lang, self)
        self.runner = self.lang_cls.check_runner(conf.get('runner', {}),
                                                 self.lang, self)

        self.config = conf

        self.data_callback = []
        self.data_t_proc = []

    def compile(self):
        if self.lang_cls.compiled:
            return self.lang_cls.compile(self)

    def run(self, target):
        res = self.lang_cls.run(target, self)
        return res

    def get_default_target(self):
        return self.lang_cls.get_default_target(self)

    # TODO: maybe refactor this.
    # Results and data should not be part of a taskset/task,
    # but rather the Result should be *about* a TaskSet
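    # Note (inferred from the parsing in add_datapoint below): the 'clock'
    # callback is expected to print the measured duration as the last
    # whitespace-separated token on stdout, which is parsed as a float and
    # recorded as the datapoint; the t_proc timing reported by the runner is
    # recorded alongside it.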
    def add_datapoint(self, stdout, stderr, t_proc):
        if self.callback['name'] == 'clock':
            # run_in_subproc hands back raw bytes (see TaskSet.run, which
            # decodes them before dumping), so decode before parsing
            if isinstance(stdout, bytes):
                stdout = stdout.decode('utf-8')
            try:
                dp = float(stdout.split()[-1])
            except (IndexError, ValueError) as e:
                raise RuntimeError(
                    "Cannot extract duration from the last token of stdout"
                ) from e
        else:
            msg = ("'%(name)s' callback output processing not implemented"
                   % self.callback)
            raise NotImplementedError(msg)
        self.data_callback.append(dp)
        self.data_t_proc.append(float(t_proc))

    def aggregate_datapoint(self):
        self.result_callback = Result(
            self.data_callback,
            "{}:{} callback".format(self.taskset.name, self.name))
        self.result_t_proc = Result(
            self.data_t_proc,
            "{}:{} t_proc".format(self.taskset.name, self.name))

    def get_result(self):
        return SimpleNamespace(callback=self.result_callback,
                               t_proc=self.result_t_proc)

    def __str__(self):
        return self.name


def load_yaml(yaml_s, run_dir):
    # the configuration is plain data, so the safe loader is sufficient
    config_d = yaml.safe_load(yaml_s)
    tasksets = []
    for name, ts_conf_d in config_d.items():
        tasksets.append(TaskSet.from_config_dict(name, ts_conf_d, run_dir))
    return tasksets


def load_file(config_file):
    with open(config_file) as fp:
        return load_yaml(fp.read(), Path(config_file).parent)
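

# Minimal usage sketch (illustrative only): load a configuration file and run
# every taskset defined in it. The command-line handling here is hypothetical
# and not part of mubench's own entry points; the results are the per-task
# SimpleNamespace objects returned by TaskSet.run().
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Run mubench tasksets')
    parser.add_argument('config', help='path to a YAML configuration file')
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)
    for taskset in load_file(args.config):
        # an empty skip list means every compiled task gets compiled afresh
        results = taskset.run(skipcomp_l=[])
        for task_name, result in results.items():
            print(task_name, result.callback, result.t_proc)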