Shortcuts

hfai.client.remote

GlobalSession

远程运行的Session

SessionConfig

配置Session,本地还是远程,远程用哪个分组

class hfai.client.remote.GlobalSession(main_globals, modules, session_config=None)[source]

远程运行的Session

在主进程构造一个 session,用于管理用户开发的 modules,并且,比如设置 modules 中的 global values 逻辑上和 multiprocess.pool 有点接近,可以以此类别,帮助理解 详细配置见 SessionConfig

Examples:

# demo.py
foo = 'bar'
def func1(param1, param2):
    print(foo, param1, param2)
    return f'{foo}_{param1}_{param2}'
# main.ipynb
# 构造 session 来运行
import demo as demo2
session = GlobalSession(globals(), (demo2, ), session_config=SessionConfig())
with session.modules_globals():
    foo = 1
session.apply(demo2.func1, (1, ), {'param2': 'b'}) # 使用 foo = 1 且 demo 修改了会 自动 reload
session.apply(demo2.func1, (1, ), {'param2': 'b'}, local=False) # 远程运行

# 在 local + inline 运行模式中,等价与:
importlib.reload(demo2)
session.update_values(demo2)
demo2.func1('a', 'b')
apply(func, args=(), kwargs={}, local=None, inline=None, group=None, blocking=True, stdout=True, renew_global_var=False, packaging_func=None)[source]

运行注册了的 module 中的 func 函数,运行 module.func(*args, **kwargs)

根据配置有以下三种核心模式(直接模式)与 packaging 特殊模式(建议使用 package 接口):

  1. local = True and inline = True:

# 相当于下面的代码
importlib.reload(module)
session.update_values(module)
pool.apply(module.func, args, kwargs)
  1. local = True and inline = False

# 注意,会带来 pkl 的开销,建议在使用 remote 之前 inline False 跑一跑
# 一般而言,这个模式能跑的,remote 模式也能跑
# inline = False 可以通过,async_result.stdout() 来打印输出
python remote_call.py --module module.py --func function
  1. local = False and inline = False:

# 提交任务远程运行
hfai python remote_call.py --module module.py --func function -- --nodes 1
  1. packaging 模式: packaging_options:

# 把多进程任务打包到一个任务上运行
# pool.map(func, iterable=args)
# pool.starmap(func, iterable=args)
Parameters
  • func – 模块中的函数,输入中需要带上 module.,如 demo.func1 这样

  • args – func 的 args 参数

  • kwargs – func 的 kwargs 参数

  • local – 本地运行;不指定的话,会使用 session 构造中传入的 session_config

  • inline – local 下的 inline 模式, 不建议使用;不指定的话,会使用 session 构造中传入的 session_config

  • group – 远程运行到哪个分组的机器上

  • blocking – True 同步接口,False 异步接口,返回 AsyncResult

  • stdout – 是否在 outline 任务打印 stdout,apply 接口先不要用

  • renew_global_var – 在调用函数的时候,自动更新 global var

  • packaging_func – None, 表示不起用 outline 时候使用,此时,args 是一个 iterable,会启动一个 pool 来运行 func 把多个任务打包提交上去跑

Returns

blocking: func(*args, **kwargs) 的结果;not blocking,返回 AsyncResult

apply_async(func, args=(), kwargs={}, local=None, inline=None, group=None, renew_global_var=False, stdout=True)[source]

一个异步的接口,返回一个 AsyncResult;

load_global_vars(saved_global_values_path)[source]

从指定路径中加载上次保存的 global var

Parameters

saved_global_values_path

Returns

map(func, iterable=None, local=None, inline=None, group=None, blocking=True, stdout=False, star=False)[source]

多进程迭代运行,并且返回输出 list, 类比 pool.map, pool.starmap

注意事项:
  1. 远程运行如果没有资源会一个个排队

  2. args_list 和 kwargs_list 不能同时存在

Parameters
  • func – 需要引用的函数

  • iterable – [(1, 2), (3, 4)] -> start = True: func(1, 2), func(3, 4); star = False: func((1, 2), ), func((3, 4), )

  • local – 本地运行

  • inline

  • group – 远程运行到哪个分组的机器上

  • blocking – 是否放后台运行

  • stdout – 要不要 stdout 输出内容,默认 False 防止 map 太多打爆;outline 会把 stdout pkl 下来方便日后查看

  • star – True 的时候相当于 pool 的 starmap

Returns

blocking True 返回对应的输出;False 返回List[AsyncResult]

map_async(func, iterable=None, local=None, group=None)[source]

map 的异步调用

Returns

List[Async_result]

modules_globals()[source]

记录,并且更新 modules 中的 global_var(若有)

注意:

  1. 对于 [引用] 的变量,在 modules_globals 记录之后,可以直接赋值,会进行更新
    with session.modules_globals():
        a = []
    a.append(1)    # module.a -> [1]
    a = [2, 3, 4]  # module.a -> [2, 3, 4]
    
  2. 对于 [赋值] 的变量,如 int、str,在 modules_globals 记录之后,直接赋值不会进行更新,

    2.1 在同一个notebook cell中需要 reload 或者 with 重新更新;

    2.2 不同 cell 之间,我们注册了 auto reload 机制,会在 cell 运行之前设置变量、更新代码

    2.3 建议在赋值完成之后,启动一个新的 cell 运行,这样也比较清晰

    # session = GlobalSession(auto_reload_interval=2) # 构造的时候输入也可以
    # session.auto_reload(2)  # 显示调用,一般不用,在构造的时候可以传入,让他进行自动 reload
    
    with session.modules_globals():
        a = 1
    # 同一个 cell
    a = 2
    print(module.a) # -> 1, 赋值的不会变;
    session.reload() # 或者直接调用
    print(module.a) # -> 2, reload 之后就变了
    
    #[] cell 0
    # a = 3
    #[] cell 1
    print(module.a) # -> 3, 新启动 cell 会进行 auto reload
    
    # 另外重新记录的话,会立刻改变
    with session.modules_globals():
        a = 3      # module.a -> 3
    

Examples:

# main.ipynb
# 构造 session 来运行
import demo as demo2
demo2.foo = 123
session = GlobalSession(globals(), (demo2, ), session_config=SessionConfig())
with session.modules_globals():
    foo = 1  # 如果 demo2 中定义了 doo,会更新demo2 中的 foo 为 1
package(func, iterable=None, local=None, group=None, blocking=True, stdout=False, star=False)[source]

将多进程任务打包成一个任务运行,主要适用于我们想要提交远程任务的时候,方便在一个机器上启动一个多进程任务,而不是启动一堆任务

实现上调用的是 pool.map/pool.starmap,所以使用 iterable 作为接口名字,不是 inline 模式

session.package(demo.func, [0, 1, 2], star=False)

# 等同于
# demo_p.py
def package_func():
    pool = multiprocessing.Pool(process_limit)
    return pool.map(demo.func, [(0, 1), (2, 3)])
    # return pool.starmap(demo.func, [(0, 1), (2, 3)])  若 star = True
session.apply(demo_p.package_func(), [0, 1, 2], local=False)
Parameters
  • func – 需要引用的函数

  • iterable – [(1, 2), (3, 4)] -> star = True: func(1, 2), func(3, 4) else func((1, 2)), func((3, 4))

  • local – 本地运行

  • group – 远程运行到哪个分组的机器上

  • blocking – 是否放后台运行

  • stdout – 要不要 stdout 输出内容,默认 False 防止进程太多打爆;outline 会把 stdout pkl 下来方便日后查看

  • star – 是否使用 starmap 来处理,默认使用 map

Returns

blocking True 返回对应的输出;False 返回List[AsyncResult]

package_async(func, iterable=None, local=None, group=None, stdout=False)[source]

starpackage, 类比 pool.starmap

iterable: [(1, 2), (3, 4)] -> func(1, 2), func(3, 4)

save_global_vars()[source]

手动触发保存 global var,这样不用每次调用的时候都存一遍了 :return:

set_process_limit(process_limit=0)[source]

动态调整,限制运行时候的 process pool 进程数量,注意,已经启动的 process 不会被关闭

Parameters

process_limit – 0,表示不限制,将使用 cpu 的 process 数量

Returns

starmap(func, iterable=None, local=None, inline=None, group=None, blocking=True, stdout=False)[source]

iterable: [(1, 2), (3, 4)] -> start = True: func(1, 2), func(3, 4)

Returns

List[Async_result]

starmap_async(func, iterable=None, local=None, inline=None, group=None, stdout=False)[source]

iterable: [(1, 2), (3, 4)] -> start = True: func(1, 2), func(3, 4)

Returns

List[Async_result]

starpackage(func, iterable=None, local=None, group=None, stdout=False)[source]

starpackage 的异步调用,返回 async result

Parameters
  • func

  • iterable

  • local

  • group

  • stdout

Returns

starpackage_async(func, iterable=None, local=None, group=None, stdout=False)[source]

package 的异步调用,返回 async result

Parameters
  • func

  • iterable

  • local

  • group

  • stdout

Returns

class hfai.client.remote.SessionConfig(local=True, inline=True, group='jd_dev_alpha', nb_auto_reload=True, process_limit=0)[source]

配置Session,本地还是远程,远程用哪个分组

Parameters
  • local – 本地运行, 默认 True

  • inline – 在本地运行时候,不使用 python module.py 的模式,默认 True

  • group – 远程运行时候提交的分组, 默认 jd_dev_alpha

  • nb_auto_reload – 在运行 cell 之前自动 reload module

  • process_limit – 后台运行的子进程数量, 0 表示和 cpu_count 一样