一些背景

假设您有一个数据分析批处理作业, 它在专用计算机上每小时运行一次。随着周的推移, 您会发现输入越来越大, 运行时间越长, 就会慢慢接近一个小时标记。您担心随后的处决可能会开始 ‘ 相互运行 ‘, 导致您的业务管线行为不端。或者, 您可能在 SLA 下为给定时间约束中的一批信息提供结果, 并且随着批次大小在生产中慢慢增加, 您正接近分配的最大时间。

这听起来像你可能有一个流问题!但是-你说-分析管道的其他部分是由其他团队拥有的, 让所有在船上迁移到流式架构的人都需要时间和大量的努力。在发生的时候, 你的管道的特定部分可能会被完全堵塞。沃拉鲁虽然最初是为流和事件数据设计的, 但也可以用来可靠地并行化许多通常不被认为是流式的工作负载, 几乎没有什么努力。

让我们让我们的熊猫走得更快!我们将使用特定的群集来并行化批处理作业, 并通过¾在一台计算机上减少其运行时。群集将由一台机器上的几个沃拉鲁工人组成, 在完成作业后可以关闭。

随着这个结构的到位, 我们可以很容易地横向扩展到多台机器, 如果需要的话。这意味着我们可以在自己的后院中推出一小部分流式架构, 并在时间到了将堆栈的其他部分移动到事件流的世界时准备好一个故事。

现有管线

# file: old_pipeline.py

df = pd.read_csv(infile, index_col=0, dtype=unicode, engine='python')
fancy_ml_black_box.classify_df(df)
df.to_csv(outfile, header=False)

瓶颈在于 fancy_ml_black_box.classify_df 。这个函数运行一个分类器, 由我们的数据分析员编写, 在每行的熊猫 dataframe。由于对特定行进行分类的结果与对任何其他行的分类无关, 所以它似乎是并行化的好候选。

花式黑匣子 Classifer 的注记

如果您查看分类器源代码, 您会发现它调用dataframe. 应用一个相当无意义的计算。我们已经选择了一些东西, 燃烧 CPU 周期, 以模拟昂贵的机器学习分类过程, 并展示了从并行化的好处。

以下是我们如何与沃拉鲁:

    ab = wallaroo.ApplicationBuilder("Parallel Pandas Classifier with Wallaroo")
    ab.new_pipeline("Classifier",
                    wallaroo.TCPSourceConfig(in_host, in_port, decode))
    ab.to_stateful(batch_rows, RowBuffer, "CSV rows + global header state")
    ab.to_parallel(classify)
    ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encode))

我们的想法是使用我们的 TCP 源摄取 csv 行, 将它们分批到小 dataframes, 并并行运行分类算法。

我们将保留管线的输入和输出格式, 保持与上游和下游系统的兼容性, 但希望通过利用服务器上的所有内核, 可以看到显著的速度增长。以下是不同大小的输入文件的运行时间:

输入大小 所采取的时间 (AWS c5.4xlarge)
1000行 3.7s
1万行 555
10万行 5m 53s
100万行 58m 21s

这些数字清楚地表明, 我们正在处理线性运行时复杂度的算法-执行任务所用的时间与输入的大小线性相关。我们可以估计, 如果数据的速率超过270行/秒, 我们的管线就会陷入麻烦。

这意味着, 如果每小时工作输入开始接近100万行, 新的作业可能会开始 “运行到” 尚未完成的旧作业。

熊猫与沃拉鲁的并行化

让我们来看看我们是否可以通过在这台机器上拆分所有可用的 CPU 内核 (8) 中的所有工作来提高这些数字。首先, 我们需要一些脚手架来设置沃拉鲁的输入和输出。

Three process architecture: send.py sends data, wallaroo processes it, and sends to data_receiver

步骤 1: 将 CSV 文件发送到沃拉鲁

我们将使用 Python 脚本读取输入 csv 文件中的所有行, 并将它们发送到我们的沃拉鲁 TCP 源。我们需要对每一行进行, 以便在沃拉鲁源中正确解码它们:

try:
   with open(filename, 'rb') as f:
     for line in f.readlines():
       line = line.strip()
       sock.sendall(struct.pack(">I",len(line))+line)

finally:
   sock.sendall(struct.pack(">I",len(EOT))+EOT)
   print('Done sending {}'.format(filename))
   sock.close()

sock.sendall(struct.pack(">I",len(line))+line)方法: 将行的长度编码为4字节、大端字节整数 ( I ), 然后在 TCP 套接字下发送该整数和整行文本。

finally 子句中, 我们还编码和发送一个单一的ASCII EOT字节, 信号, 这是我们的输入结束。

此 TCP 输入由解码器接收:

@wallaroo.decoder(header_length=4, length_fmt=">I")
def decode(bs):
    if bs == "\x04":
        return EndOfInput()
    else:
        return bs

正如您所看到的, 如果我们的数据是 EOT 字节 ( \x04 ), 我们将创建一个对象, 使 “输入的末尾” 含义显式。否则, 我们将把数据看作是。

步骤 2: 对 CSV 行进行批处理

管道中的下一步是将输入行批处理为100块。

@wallaroo.state_computation(name='Batch rows of csv, emit DataFrames')
def batch_rows(row, row_buffer):
    return (row_buffer.update_with(row), True)

RowBuffer状态对象将采取它所看到的第一行, 并将其保存在内部作为 header 。然后它将接受传入行, 直到它存储了一定数量 (我们的应用程序中有100行)。.update_with(row)如果添加了该方法, None row 但缓冲区中仍有空间, 则返回如果更新填充缓冲区, 它将在内部为零, 并发出 BatchedRows 两个字段的对象: a headerrows

关于序列化效率的一个注记

为什么要进行批处理, 当我们可以简单地将 CSV 文件中的每个条目作为单行 dataframe 发送到我们的分类器?答案是: 速度。在沃拉鲁中计算步骤之间的每次数据传输都可能需要对导线上的数据进行编码和解码, 而 dataframe 对象的创建也不是没有成本的。

步骤 3: 并行分类小型 Dataframes

这是管道的一部分, 我们可以将沃拉鲁的内置分配机制归结于我们的问题:

@wallaroo.computation(name="Classify")
def classify(batched_rows):
    df = build_dataframe(batched_rows)
    fancy_ml_black_box.classify_df(df)
    return df

在将对象转换为 dataframe 的过程中, 还有一些按摩 BatchedRows :

def build_dataframe(br):
    buf = StringIO(br.header + "\n" + ("\n".join(br.rows)))
    return pd.read_csv(buf, index_col=0, dtype=unicode, engine='python')

本质上, 我们粘附 BatchedRows.headerBatchedRows.rows 模拟一个独立的 csv 文件, 然后我们传递到 pandas.read_csv StringIO缓冲区的形式。现在, 我们可以将结果丰富的 dataframe 传递给 fancy_ml_black_box.classify_df() 函数。

上述所有工作 (包括将数据封送到 dataframe 中) 都是并行发生的, 而群集中的每个沃拉鲁工作者都得到了不同的实例 BufferedRows

步骤 4: 将编码回文件

dataframe 输出 classify() , 上面, 得到序列化和框架的 encode 步骤。现在, 您应该有点熟悉整个项目中使用的简单 TCP 框架:

def encode(df):
    s = dataframe_to_csv(df)
    return struct.pack('>I',len(s)) + s

将帮助器函数 dataframe_to_csv 定义为:

def dataframe_to_csv(df):
    buf = StringIO()
    df.to_csv(buf, header=False)
    s = buf.getvalue().strip()
    buf.close()
    return s

此表示法由沃拉鲁工具读取 data_receiver , 它被告知侦听 --framed 数据:

nohup data_receiver  \
      --framed --listen "$LEADER":"$SINK_PORT" \
      --ponynopin \
      > "$OUTPUT" 2>&1 &

这是很好的, 因为这是它会得到的。输出将被写入到由环境变量指定的文件中 OUTPUT

对运行时的影响

首先, 让我们验证新代码是否与旧代码生成了相同的输出:

$ /usr/bin/time make run-old INPUT=input/1000.csv
./old_pipeline.py input/1000.csv "output/old_1000.csv"
3.85user 0.47system 0:03.70elapsed 116%CPU (0avgtext+0avgdata 54260maxresident)k
176inputs+288outputs (0major+17423minor)pagefaults 0swaps

$ /usr/bin/time make run-new N_WORKERS=1 INPUT=input/1000.csv
INPUT=input/1000.csv OUTPUT="output/new_1000.csv" N_WORKERS=1 ./run_machida.sh
(..)
4.48user 0.90system 0:04.13elapsed 130%CPU (0avgtext+0avgdata 63808maxresident)k
0inputs+352outputs (0major+989180minor)pagefaults 0swaps

$ diff output/new_1000.csv output/old_1000.csv
$ echo $?
0

耶!结果是匹配的, 运行时只会慢1秒, 这并不那么糟糕, 考虑到我们正在启动3个单独的进程 (发件人、沃拉鲁和接收器), 并通过网络发送所有数据两次。首先, 1万行文件:

原始代码 1 工作者 4 名工人 8 名工人
555 595 405 11s

现在, 使用10万行文件:

原始代码 1 工作者 4 名工人 8 名工人
5m48s 6m28s 3m16s 1m41s

并与百万行文件:

原始代码 1 工作者 4 名工人 8 名工人
58m21s 1h03m46s 32m12s 16m33s

你为什么不测试两个工人?

由于 Python 执行模型的单线程约束, 沃拉鲁群集中的初始值设定项通常会在将工作发送到群集的其余部分之前, 积极地承担其在并行工作负载中的份额。

这意味着在两个工人上运行并行作业不会产生速度效益。我们建议运行至少四名员工集群, 以利用沃拉鲁的扩展能力。

正如您所看到的 (通过克隆这个示例项目来验证自己), 我们能够将百万行的处理时间减少到十六分钟。此外, 如果输入数据集对于我们的单机八工作者群集来说太大, 我们可以很容易地添加更多的机器并利用额外的并行性, 而不会在我们的沃拉鲁应用程序中更改单行代码。

这给了我们相当大的能力来抵御日益增加的负载的风暴, 而我们为整个系统设计了一个更成熟的流体系结构

接下来呢?

希望, 我已经提出了以上的情况下, 沃拉鲁可以作为一种特殊的方法, 以适应您现有的基于熊猫的分析管道, 以处理增加负荷。下一次, 我将向您展示如何沃拉鲁群集按需进行旋转, 以处理那些在一台机器上无法容纳的真正巨大的工作。

将分析管道放在流式框架中不仅可以扩展数据科学的规模, 还能为实时的洞察力提供机会。一旦你准备好进入一个真正的 evented 模型, 你所要做的就是把你的数据直接发送到沃拉鲁, 完全绕过 CSV 阶段。实际的沃拉鲁管线不需要改变!有了一点点的预先投资, 你已经解开了广泛的可能性, productionize 你的 Python 分析代码。

Comments are closed.