
Source code for hfai.autotune.autotune

from hfai.client import create_experiment_v2
from io import StringIO
from tabulate import tabulate
import yaml
import json
import copy
import os

def asyncmain(jobList):
    import asyncio
    loop = asyncio.new_event_loop()
    # asyncio.wait() no longer accepts bare coroutines (removed in Python
    # 3.11), so wrap each one in a Task bound to this loop first.
    tasks = [loop.create_task(job) for job in jobList]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

async def createJob(config):
    exp = await create_experiment_v2(config)
    return exp

def args2name(args:dict) -> str:
    name = ""
    for key in args:
        name += f"_{key}_{args[key]}"
    return name
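
# Example: args2name({"lr": 0.1, "bs": 256}) returns "_lr_0.1_bs_256";
# report() uses this suffix to build per-job log file names such as
# "log_lr_0.1_bs_256.json".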

def checkEnv() -> bool:
    return "AUTOTUNE_LOG_DIR" in os.environ

def set_debug_mode():
    '''Set autotune to debug mode.

    In debug mode the training job runs locally. When multiple parameter
    combinations are given, only one of them is run, for debugging.

    Examples:
        >>> hfai.autotune.set_debug_mode()
        >>> hfai.autotune.run(config, args, log_dir)  # the job now runs locally instead of on the cluster
    '''
    os.environ["AUTOTUNE_DEBUG"] = "True"
    print("Autotune is set to debug mode, job will run locally")

def runLocal(cfg: dict):
    '''Run a single tuning job locally.

    Uses hfai python/bash to launch a local debug run and test whether the
    job runs correctly.
    '''
    # Export the environment variables from the config file
    bashscript = f'cd {cfg["spec"]["workspace"]};'
    for envname in cfg["spec"]["environments"]:
        envvalue = cfg["spec"]["environments"][envname]
        bashscript += f"export {envname}={envvalue};"
    bashscript += "export WORLD_SIZE=1;"
    # Build the launch command from the entrypoint type
    entrytype = os.path.splitext(cfg['spec']['entrypoint'])[-1]
    if entrytype == ".py":
        bashscript += f"hfai python {cfg['spec']['entrypoint']}"
    elif entrytype == ".sh":
        bashscript += f"hfai bash {cfg['spec']['entrypoint']}"
    else:
        raise AssertionError(f"Experiment entrypoint file type: {entrytype} is not valid, should be .py/.sh")
    if "parameters" in cfg["spec"]:
        bashscript += f" {cfg['spec']['parameters']}"
    ec = os.system(bashscript)
    assert ec == 0

def run(config: str, tunable_args: list, log_dir: str):
    '''Launch training jobs with the given hyperparameter combinations.

    Args:
        config (str): path to a yaml file, or a yaml string, holding the
            training configuration for everything except the tunable
            parameters; for the format see the `hfai config file example
            <http://10.2.201.15:8081/preview/api/client.html#hfai.client.create_experiment_v2>`_ .
        tunable_args (list): list of all hyperparameter combinations to try;
            e.g. [{"lr": 0.1, "bs": 256}] launches one training job with this
            parameter set.
        log_dir (str): path of the log directory where training results are
            recorded.

    Returns:
        None

    Examples:
        >>> args = [{"lr": 0.1, "bs": 256}, {"lr": 1e-3, "bs": 64}]
        >>> hfai.autotune.run(config="autotunetest.yaml", tunable_args=args, log_dir="autotune_test_log")
    '''
    if isinstance(config, str):
        config_file = os.path.expanduser(config)
        if os.path.exists(config_file):
            with open(config_file) as f:
                cfg = yaml.load(f, Loader=yaml.FullLoader)
        else:
            cfg = yaml.load(StringIO(config), Loader=yaml.FullLoader)
    else:
        raise TypeError("Unknown input config type")
    if "environments" not in cfg["spec"] or cfg["spec"]["environments"] is None:
        cfg["spec"]["environments"] = dict()
    # Pass the parameters through environment variables and create one job
    # per combination
    expList = []
    cfg["spec"]["environments"]["AUTOTUNE_LOG_DIR"] = log_dir
    for args in tunable_args:
        _cfg = copy.deepcopy(cfg)
        _cfg["name"] += "_autotune" + args2name(args)
        for argname in args:
            _cfg["spec"]["environments"][f"AUTOTUNE_ARGS_{argname}"] = args[argname]
        if "AUTOTUNE_DEBUG" in os.environ:
            print(f"One debug job is started with the following params:\n{args}")
            runLocal(_cfg)
            return
        expList.append(createJob(yaml.dump(_cfg)))
    asyncmain(expList)
    print(f"{len(tunable_args)} jobs are started")
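
# A minimal config sketch (assumed shape, inferred only from the keys this
# module reads -- name, spec.workspace, spec.entrypoint, spec.environments,
# spec.parameters; see create_experiment_v2 for the authoritative schema):
#
#     name: autotunetest
#     spec:
#       workspace: /path/to/workspace   # hypothetical path
#       entrypoint: train.py            # must end in .py or .sh
#       environments:
#         DATA_DIR: /path/to/data       # hypothetical variable
#       parameters: --epochs 10         # optional, appended to the launch command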

def get_args() -> dict:
    '''Fetch the hyperparameters inside the training code.

    Returns:
        argsdata (dict): dict of the tunable parameter names and the values
            in use; keys are parameter names, values are parameter values.

    Examples:
        >>> args = hfai.autotune.get_args()
        >>> lr = args["lr"]
        >>> batch_size = args["bs"]
    '''
    if not checkEnv():
        raise EnvironmentError("Autotune environment not found.")
    prefix = "AUTOTUNE_ARGS_"
    # str.strip(prefix) would remove any of the prefix's characters from both
    # ends of the variable name, mangling it; slice the prefix off instead.
    args = [(env[len(prefix):], os.environ[env])
            for env in os.environ if env.startswith(prefix)]
    argsdata = dict(args)
    return argsdata
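
# Usage note: the values come from os.environ, so they are always strings;
# cast them in the training code, e.g.
#
#     args = get_args()            # {"lr": "0.1", "bs": "256"}
#     lr, bs = float(args["lr"]), int(args["bs"])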

def report(metrics: dict):
    '''Report training results from inside the training code.

    Args:
        metrics (dict): dict of the training metrics; keys are metric names,
            values are the corresponding results.

    Returns:
        None

    Examples:
        >>> metrics = get_evaluation_result()  # fetch the training results as a dict
        >>> metrics
        {"prec":0.85, "recall":0.78, "loss":0.13}
        >>> hfai.autotune.report(metrics)
    '''
    if not checkEnv():
        raise EnvironmentError("Autotune environment not found.")
    log_dir = os.environ["AUTOTUNE_LOG_DIR"]
    args = args2name(get_args())
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    with open(f"{log_dir}/log{args}.json", "w") as log_file:
        json.dump(metrics, log_file)
    print(f"Training result is recorded in {log_dir}/log{args}.json")
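
# Sketch: with AUTOTUNE_LOG_DIR=autotune_test_log and args {"lr": "0.1"},
# report({"prec": 0.85}) writes {"prec": 0.85} to
# autotune_test_log/log_lr_0.1.json, which show_result() later reads back.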

def printResult(results: list):
    # Pretty-print the (log name, metrics) pairs as a grid table
    if results == []:
        return
    tabledata = [["Log Name"] + [metric for metric in results[0][1]]]
    for line in results:
        tabledata.append([line[0]] + [data for data in line[1].values()])
    print(tabulate(tabledata, headers='firstrow', tablefmt="fancy_grid"))

def show_result(log_dir: str, metric: str, sort_op="max", mode="all") -> list:
    '''Summarize the training results after training finishes.

    Collects the final metrics of the models trained with the different
    parameter sets, returns the best or all results, and prints them as a
    table.

    Args:
        log_dir (str): name of the log directory holding the results to summarize
        metric (str): metric used to rank the results
        sort_op (str): sort order, descending ("max") by default; can be set
            to ascending ("min")
        mode (str): "all" (default) returns all results, sorted; "best"
            returns only the best result

    Returns:
        results (list): the sorted list of training results

    Examples:
        >>> result = hfai.autotune.show_result("train_log", "prec")
        +---------------------------------+--------+---------+
        | Log Name                        |   Prec |  Recall |
        +=================================+========+=========+
        | log_lr_0.001_batchsize_256.json |   0.82 |    0.85 |
        ......
    '''
    results = []
    for log_file in os.listdir(log_dir):
        with open(os.path.join(log_dir, log_file), "r") as f:
            result = json.load(f)
        results.append((log_file, result))
    if sort_op == "max":
        reverse = True
    elif sort_op == "min":
        reverse = False
    else:
        raise ValueError(f"Expected sort_op \"max\" or \"min\", but got: {sort_op}.")
    results.sort(key=lambda x: float(x[1][metric]), reverse=reverse)
    if mode == "best":
        results = [results[0]]
    elif mode == "all":
        pass
    else:
        raise ValueError(f"Expected mode \"best\" or \"all\", but got: {mode}.")
    printResult(results)
    return results
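
# ----------------------------------------------------------------------
# Usage sketch (not part of the module): end-to-end flow, assuming a
# hypothetical "autotunetest.yaml" in the shape sketched above and a
# hypothetical entrypoint train.py.
#
# Driver side -- launch a small grid of jobs and summarize the results:
#
#     import itertools
#     import hfai.autotune
#
#     grid = [{"lr": lr, "bs": bs}
#             for lr, bs in itertools.product([1e-1, 1e-3], [64, 256])]
#     hfai.autotune.set_debug_mode()      # optional: run one combo locally
#     hfai.autotune.run(config="autotunetest.yaml",
#                       tunable_args=grid,
#                       log_dir="autotune_test_log")
#     hfai.autotune.show_result("autotune_test_log", metric="prec")
#
# Training side (inside train.py) -- read the injected hyperparameters,
# train, then report the final metrics:
#
#     args = hfai.autotune.get_args()     # values are strings
#     lr, bs = float(args["lr"]), int(args["bs"])
#     ...                                 # training loop
#     hfai.autotune.report({"prec": 0.85, "recall": 0.78, "loss": 0.13})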