基于 openYuanrong 实现蒙特卡罗方法
蒙特卡罗(Monte Carlo)方法,或称计算机随机模拟方法,是一种基于“随机数”的计算方法,它是高性能计算 HPC 常见的算法之一。一个简单的用法是计算圆周率 π,主要思想是:在边长为 R 的正方形内画一个内切圆(半径为 R/2),然后在正方形内随机打点,则点落在圆内的概率为圆面积除以正方形面积,即 π(R/2)²/R² = π/4。据此,用落在圆内的点数除以总点数来估计该概率,再乘以 4,即可得到 π 的估计值:π ≈ 4 × 圆内点数 / 总点数。打的点越多,估算的精度越高。
本示例将向您展示如何使用 openYuanrong 开发一个简单的 HPC 服务,实现动态任务并行,它包含以下内容:
如何使用无状态函数并行多个任务。
如何使用有状态函数获取并保存并行任务的执行状态。
方案介绍
我们使用 Python 语言,通过定义一个无状态函数来实现打点任务,一个有状态函数来统计任务的运行状态。代码可以运行在单台主机上,也可直接扩展到集群上,以配置更大的任务量提升计算精度。
准备工作
参考“在主机上部署”一节,完成 openYuanrong 的部署。
实现流程
定义有状态函数统计打点任务的状态
我们通过 @yr.instance 装饰类 TaskSummary 来定义一个有状态函数,用以统计打点任务的运行状态。打点任务通过调用它的 report_status 方法上报任务状态,主程序通过调用它的 get_task_status 方法获取所有任务的状态,该方法返回(已开始任务数, 已完成任务数)。该类的实例会在远程进程上运行,其成员变量 task_started_num 等在运行中一直保持状态。持有该实例的句柄即可调用它的方法,改变成员变量的状态。
class TaskStatus(Enum):
    """Lifecycle states a dart-throwing task reports to TaskSummary."""

    # Not sent by monte_carlo_pi; TaskSummary.report_status ignores it.
    PENDING = -1
    # Reported by monte_carlo_pi before it starts sampling points.
    STARTED = 0
    # Reported by monte_carlo_pi after it finishes sampling points.
    COMPLETED = 1
@yr.instance
class TaskSummary:
    """Stateful function that tracks the progress of the dart-throwing tasks.

    An instance of this class runs in a remote process, and its counters
    persist between method calls. Tasks push status updates into it, and the
    driver polls it.
    """

    def __init__(self, total_task_num: int):
        """Create the tracker.

        Args:
            total_task_num: Number of tasks the driver intends to launch.
                Stored for reference only; no method here reads it.
        """
        self.total_task_num = total_task_num
        # Number of tasks that have reported TaskStatus.STARTED.
        self.task_started_num = 0
        # Number of tasks that have reported TaskStatus.COMPLETED.
        self.task_completed_num = 0

    def report_status(self, task_status: TaskStatus) -> None:
        """Record a status update sent by a task.

        STARTED and COMPLETED bump their respective counters; any other
        status (e.g. PENDING) is ignored.
        """
        if task_status == TaskStatus.STARTED:
            self.task_started_num += 1
        elif task_status == TaskStatus.COMPLETED:
            self.task_completed_num += 1

    def get_task_status(self) -> tuple:
        """Return the pair ``(started_count, completed_count)``."""
        return self.task_started_num, self.task_completed_num
定义无状态函数处理打点任务
我们通过 @yr.invoke 装饰普通函数 monte_carlo_pi 来定义一个无状态函数,用以处理打点任务,统计落入圆中的点的个数。函数实例会在远程进程上异步运行,通过有状态函数 TaskSummary 的句柄调用 report_status 方法上报任务状态。
@yr.invoke
def monte_carlo_pi(total_points: int, task_summary: yr.decorator.instance_proxy.InstanceProxy) -> int:
    """Sample random points in the square [-1, 1] x [-1, 1] and count hits.

    Reports TaskStatus.STARTED before sampling and TaskStatus.COMPLETED
    after sampling to the given TaskSummary instance.

    Args:
        total_points: Number of random points to sample.
        task_summary: Handle to the TaskSummary stateful instance.

    Returns:
        The number of sampled points inside or on the unit circle.
    """
    # Report that this task has started.
    task_summary.report_status.invoke(TaskStatus.STARTED)
    # The loop below runs total_points times; bind the sampler once instead
    # of looking up random.uniform on every iteration.
    uniform = random.uniform
    circle_points = 0
    for _ in range(total_points):
        rand_x = uniform(-1, 1)
        rand_y = uniform(-1, 1)
        # Squared distance to the origin; comparing against 1 avoids a sqrt.
        dist_sq = rand_x**2 + rand_y**2
        if dist_sq <= 1:
            circle_points += 1
    # Report that this task has finished.
    task_summary.report_status.invoke(TaskStatus.COMPLETED)
    return circle_points
定义主流程
主流程中通过 yr.init() 初始化 openYuanrong 运行上下文,您可以根据自己的集群规模调整任务的数量。
我们通过 yr.InvokeOptions 指定了无状态函数任务运行所需的资源量(cpu=1000 表示 1 核,CPU 单位为 1/1000 核),以便更好地观察它们的并行状态。使用 monte_carlo_pi.options(opt).invoke(POINTS_PER_TASK, task_summary) 启动任务,在 while 循环中查询任务的运行状态,直到所有任务全部完成。
最后,汇总各任务返回的打点数据计算 π,并调用 yr.finalize() 来清理上下文。
import yr
import time
import random
from enum import Enum
if __name__ == '__main__':
    # Tune these to the size of your cluster.
    TASKS_NUM = 15
    POINTS_PER_TASK = 10000000
    TOTAL_POINTS = TASKS_NUM * POINTS_PER_TASK

    yr.init()
    try:
        # Create the stateful instance that tracks task progress. Its
        # constructor takes the number of tasks, not points per task.
        task_summary = TaskSummary.invoke(TASKS_NUM)
        # Launch all tasks in parallel. The resource request is optional;
        # cpu=1000 asks for one core per task (cpu is in 1/1000-core units),
        # which makes the degree of parallelism easy to observe.
        opt = yr.InvokeOptions(cpu=1000, memory=1000)
        results = [
            monte_carlo_pi.options(opt).invoke(POINTS_PER_TASK, task_summary)
            for _ in range(TASKS_NUM)
        ]
        # Poll task progress until every task has reported completion.
        # NOTE(review): if a task raises before reporting COMPLETED,
        # completed_num never reaches TASKS_NUM and this loop never exits.
        while True:
            started_num, completed_num = yr.get(task_summary.get_task_status.invoke())
            print("Total tasks:", TASKS_NUM, "Started:", started_num, "Completed", completed_num)
            if completed_num == TASKS_NUM:
                break
            time.sleep(1)
        # pi ~= 4 * (points inside the circle) / (all points).
        circle_points = sum(yr.get(results))
        pi = (circle_points * 4) / TOTAL_POINTS
        print(f"π is: {pi}")
    finally:
        # Release the openYuanrong context even if something above raised.
        yr.finalize()
运行程序
完整代码
import yr
import time
import random
from enum import Enum
class TaskStatus(Enum):
    """Lifecycle states a dart-throwing task reports to TaskSummary."""

    # Not sent by monte_carlo_pi; TaskSummary.report_status ignores it.
    PENDING = -1
    # Reported by monte_carlo_pi before it starts sampling points.
    STARTED = 0
    # Reported by monte_carlo_pi after it finishes sampling points.
    COMPLETED = 1
@yr.invoke
def monte_carlo_pi(total_points: int, task_summary: yr.decorator.instance_proxy.InstanceProxy) -> int:
    """Sample random points in the square [-1, 1] x [-1, 1] and count hits.

    Reports TaskStatus.STARTED before sampling and TaskStatus.COMPLETED
    after sampling to the given TaskSummary instance.

    Args:
        total_points: Number of random points to sample.
        task_summary: Handle to the TaskSummary stateful instance.

    Returns:
        The number of sampled points inside or on the unit circle.
    """
    # Report that this task has started.
    task_summary.report_status.invoke(TaskStatus.STARTED)
    # The loop below runs total_points times; bind the sampler once instead
    # of looking up random.uniform on every iteration.
    uniform = random.uniform
    circle_points = 0
    for _ in range(total_points):
        rand_x = uniform(-1, 1)
        rand_y = uniform(-1, 1)
        # Squared distance to the origin; comparing against 1 avoids a sqrt.
        dist_sq = rand_x**2 + rand_y**2
        if dist_sq <= 1:
            circle_points += 1
    # Report that this task has finished.
    task_summary.report_status.invoke(TaskStatus.COMPLETED)
    return circle_points
@yr.instance
class TaskSummary:
    """Stateful function that tracks the progress of the dart-throwing tasks.

    An instance of this class runs in a remote process, and its counters
    persist between method calls. Tasks push status updates into it, and the
    driver polls it.
    """

    def __init__(self, total_task_num: int):
        """Create the tracker.

        Args:
            total_task_num: Number of tasks the driver intends to launch.
                Stored for reference only; no method here reads it.
        """
        self.total_task_num = total_task_num
        # Number of tasks that have reported TaskStatus.STARTED.
        self.task_started_num = 0
        # Number of tasks that have reported TaskStatus.COMPLETED.
        self.task_completed_num = 0

    def report_status(self, task_status: TaskStatus) -> None:
        """Record a status update sent by a task.

        STARTED and COMPLETED bump their respective counters; any other
        status (e.g. PENDING) is ignored.
        """
        if task_status == TaskStatus.STARTED:
            self.task_started_num += 1
        elif task_status == TaskStatus.COMPLETED:
            self.task_completed_num += 1

    def get_task_status(self) -> tuple:
        """Return the pair ``(started_count, completed_count)``."""
        return self.task_started_num, self.task_completed_num
if __name__ == '__main__':
    # Tune these to the size of your cluster.
    TASKS_NUM = 15
    POINTS_PER_TASK = 10000000
    TOTAL_POINTS = TASKS_NUM * POINTS_PER_TASK

    yr.init()
    try:
        # Create the stateful instance that tracks task progress. Its
        # constructor takes the number of tasks, not points per task.
        task_summary = TaskSummary.invoke(TASKS_NUM)
        # Launch all tasks in parallel. The resource request is optional;
        # cpu=1000 asks for one core per task (cpu is in 1/1000-core units),
        # which makes the degree of parallelism easy to observe.
        opt = yr.InvokeOptions(cpu=1000, memory=1000)
        results = [
            monte_carlo_pi.options(opt).invoke(POINTS_PER_TASK, task_summary)
            for _ in range(TASKS_NUM)
        ]
        # Poll task progress until every task has reported completion.
        # NOTE(review): if a task raises before reporting COMPLETED,
        # completed_num never reaches TASKS_NUM and this loop never exits.
        while True:
            started_num, completed_num = yr.get(task_summary.get_task_status.invoke())
            print("Total tasks:", TASKS_NUM, "Started:", started_num, "Completed", completed_num)
            if completed_num == TASKS_NUM:
                break
            time.sleep(1)
        # pi ~= 4 * (points inside the circle) / (all points).
        circle_points = sum(yr.get(results))
        pi = (circle_points * 4) / TOTAL_POINTS
        print(f"π is: {pi}")
    finally:
        # Release the openYuanrong context even if something above raised.
        yr.finalize()
在一个单节点 CPU 资源为 9000(单位:1/1000 核,即 9 核)的 openYuanrong 环境上运行。由于每个任务申请 1 核(cpu=1000),同一时刻最多只有 9 个任务在运行,其余任务需等待前面的任务完成后才会开始。输出如下:
Total tasks: 15 Started: 8 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 9 Completed 0
Total tasks: 15 Started: 10 Completed 1
Total tasks: 15 Started: 13 Completed 4
Total tasks: 15 Started: 14 Completed 5
Total tasks: 15 Started: 15 Completed 7
Total tasks: 15 Started: 15 Completed 9
Total tasks: 15 Started: 15 Completed 9
Total tasks: 15 Started: 15 Completed 9
Total tasks: 15 Started: 15 Completed 9
Total tasks: 15 Started: 15 Completed 9
Total tasks: 15 Started: 15 Completed 9
Total tasks: 15 Started: 15 Completed 9
Total tasks: 15 Started: 15 Completed 9
Total tasks: 15 Started: 15 Completed 11
Total tasks: 15 Started: 15 Completed 13
Total tasks: 15 Started: 15 Completed 13
Total tasks: 15 Started: 15 Completed 14
Total tasks: 15 Started: 15 Completed 14
Total tasks: 15 Started: 15 Completed 15
π is: 3.14155216