13518219792

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

机器学习分布式框架Ray

机器学习分布式框架Ray

作者:wedo实验君 2021-09-09 15:45:17

人工智能

机器学习

分布式 Ray是UC Berkeley RISELab新推出的高性能分布式执行框架,具有比Spark更优异的计算性能,而且部署和改造更简单,同时支持机器学习和深度学习的分布式训练,支持主流的深度学习框架。

创新互联建站专注于企业成都营销网站建设、网站重做改版、鹿城网站定制设计、自适应品牌网站建设、H5高端网站建设商城网站建设、集团公司官网建设、外贸营销网站建设、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为鹿城等各大城市提供网站开发制作服务。

 [[422916]]

Python中文社区 (ID:python-china)

1.什么是Ray

分布式计算框架大家一定都耳熟能详,诸如离线计算的Hadoop(map-reduce),spark, 流式计算的strom,Flink等。相对而言,这些计算框架都依赖于其他大数据组件,安装部署也相对复杂。

在python中,之前有分享过的Celery可以提供分布式的计算。今天和大家分享另外一个开源的分布式计算框架Ray。Ray是UC Berkeley RISELab新推出的高性能分布式执行框架,具有比Spark更优异的计算性能,而且部署和改造更简单,同时支持机器学习和深度学习的分布式训练,支持主流的深度学习框架(pytorch,tensorflow,keras等)。

2. Ray架构

Ray的架构参见最早发布的论文Ray: A Distributed Framework for Emerging AI Applications

由上图可知Ray主要包括:

Ray适用于任何分布式计算的任务,包括分布式训练。笔者最近是用在大量的时间序列预测模型训练和在线预测上。

Ray目前库支持超参数调优Ray tune, 梯度下降Ray SGD,推理服务RaySERVE, 分布式数据Dataset以及分布式增强学习RLlib。还有其他第三方库,如下所示:

3. 简单使用

3.1 安装部署 

  
 
 
 
  1. pip install --upgrade pip  
  2. # pip install ray  
  3. pip install ray == 1.6.0  
  4. # ImportError: cannot import name 'deep_mapping' from 'attr.validators'  
  5. # pip install attr == 19.1.0 

3.2 单机使用

  
 
 
 
  1. import time  
  2.   import ray  
  3.   ray.init(num_cpus = 4) # Specify this system has 4 CPUs.  
  4.   @ray.remote  
  5.   def do_some_work(x):  
  6.       time.sleep(1) # Replace this is with work you need to do.  
  7.       return x  
  8.   start = time.time()  
  9.   results = ray.get([do_some_work.remote(x) for x in range(4)])  
  10.   print("duration =", time.time() - start)  
  11.   print("results = ", results)   
  12.   # duration = 1.0107324123382568  
  13.   # results =  [0, 1, 2, 3] 

remote返回的对象的id 如ObjectRef(7f10737098927148ffffffff0100000001000000)。需要通过ray.get来获取实际的值, 需要注意的是ray.get是阻塞式的调用,不能[ray.get(do_some_work.remote(x)) for x in range(4)]

  
 
 
 
  1. @ray.remote  
  2.    def tiny_work(x):  
  3.        time.sleep(0.0001) # Replace this is with work you need to do.  
  4.        return x  
  5.    start = time.time()  
  6.    result_ids = [tiny_work.remote(x) for x in range(100000)]  
  7.    results = ray.get(result_ids)  
  8.    print("duration =", time.time() - start) 
  
 
 
 
  1. num = ray.put(10)  
  2. ray.get(num) 

          这个时候可以采用ray.wait()方法,ray.wait()返回执行完毕的和未执行完毕的任务结果,执行完成的结果可以继续后续的操作   

  
 
 
 
  1. import random  
  2.    @ray.remote  
  3.    def do_some_work(x):  
  4.        time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do.  
  5.        return   
  6.    def process_incremental(sum, result):  
  7.        time.sleep(1) # Replace this with some processing code.  
  8.        return sum + result  
  9.    start = time.time()  
  10.    result_ids = [do_some_work.remote(x) for x in range(4)]  
  11.    sum = 0  
  12.    while len(result_ids):  
  13.        done_id, result_ids = ray.wait(result_ids)  
  14.        sum = process_incremental(sum, ray.get(done_id[0])) 
  15.     print("duration =", time.time() - start, "\nresult = ", sum)  
  16.    # duration = 5.270821809768677   
  17.    # result =  6    

2.3 集群部署

Ray的架构遵循master-slave的模式。Head Node 可以认为是Master,其他的Node为worker。在集群部署时,Head Node需要首先启动ray start --head, 其他机器依次启动worker,注意需要指定head Node的地址确定关系,ray start --address 10.8.xx.3:6379。

关闭服务,需要每一台机器执行 ray.stop

 

  
 
 
 
  1. # To start a head node.  
  2. #ray start --head --num-cpus= --num-gpus=  
  3. ray start --head --node-ip-address 10.8.xx.3 --port=6379  
  4. # To start a non-head node.  
  5. # ray start --address=
     --num-cpus= --num-gpus=  
  6. ray start --address 10.8.xx.3:6379 --node-ip-address 10.8.xx.3 --num-cpus 10 --temp-dir={your temp path} 

  
 
 
 
  1. import ray  
  2. ray.init(10.8.xx.3:6379) 

3. 不同任务的例子

  
 
 
 
  1. import numpy as np  
  2. # Define two remote functions. Invocations of these functions create tasks  
  3. # that are executed remotely.  
  4. @ray.remote  
  5. def multiply(x, y):  
  6.     return np.dot(x, y)  
  7. @ray.remote  
  8. def zeros(size):  
  9.     return np.zeros(size)  
  10. # Start two tasks in parallel. These immediately return futures and the  
  11. # tasks are executed in the background.  
  12. x_id = zeros.remote((100, 100))  
  13. y_id = zeros.remote((100, 100))  
  14. # Start a third task. This will not be scheduled until the first two  
  15. # tasks have completed.  
  16. z_id = multiply.remote(x_id, y_id)  
  17. # Get the result. This will block until the third task completes.  
  18. z = ray.get(z_id)  
  19. print(z)     

  
 
 
 
  1. @ray.remote  
  2.  class Counter(object):  
  3.      def __init__(self):  
  4.          self.n = 0  
  5.      def increment(self):  
  6.          self.n += 1  
  7.      def read(self):  
  8.          return self.n  
  9.  counters = [Counter.remote() for i in range(4)]  
  10.  # 不断的执行可以每个counter计数不断增加  
  11.  [c.increment.remote() for c in counters]  
  12.  futures = [c.read.remote() for c in counters]  
  13.  print(ray.get(futures))  
  14.  # [1, 1, 1, 1]  
  15.  # [11, 11, 11, 11] 

         这里举一个简单的例子:   

  
 
 
 
  1. @ray.remote  
  2.    def map(obj, f):  
  3.        return f(obj)  
  4.    @ray.remote  
  5.    def sum_results(*elements):  
  6.        return np.sum(elements)  
  7.    items = list(range(100))  
  8.    map_func = lambda i : i*2  
  9.    remote_elements = [map.remote(i, map_func) for i in items]  
  10.    # simple reduce  
  11.    remote_final_sum = sum_results.remote(*remote_elements)  
  12.    result = ray.get(remote_final_sum) 
  13.    # tree reduce  
  14.    intermediate_results = [sum_results.remote(  
  15.        *remote_elements[i * 20: (i + 1) * 20]) for i in range(5)]  
  16.    remote_final_sum = sum_results.remote(*intermediate_results)  
  17.    result = ray.get(remote_final_sum)     

           参见 https://docs.ray.io/en/latest/using-ray-with-pytorch.html

4. 总结

本文分享了高效的Python分布式计算框架Ray,希望对你有帮助。总结如下:


新闻标题:机器学习分布式框架Ray
网站链接:http://cdbrznjsb.com/article/djghocp.html

其他资讯

让你的专属顾问为你服务