-
Notifications
You must be signed in to change notification settings - Fork 155
Expand file tree
/
Copy pathray_pickle_expr.py
More file actions
86 lines (67 loc) · 2.82 KB
/
ray_pickle_expr.py
File metadata and controls
86 lines (67 loc) · 2.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Distribute DataFusion expressions to Ray actors.
For background — the shipped-expression model, what travels inline vs
by name, portability requirements, and the security threat model —
see ``docs/source/user-guide/distributing-work.rst``.
Build an expression in the driver, ship it to a pool of Ray actors, and
have each actor evaluate it against its own slice of data. Python UDFs
travel with the shipped expression — no actor-side registration needed.
Prerequisites:
pip install ray
Run:
python examples/ray_pickle_expr.py
"""
import pyarrow as pa
import ray
from datafusion import Expr, SessionContext, col, lit, udf
def _build_double_udf():
"""Return the demo UDF used by the driver."""
return udf(
lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
[pa.int64()],
pa.int64(),
volatility="immutable",
name="double",
)
@ray.remote
class DataFusionWorker:
"""A Ray actor with a private :class:`SessionContext`."""
def __init__(self) -> None:
self._ctx = SessionContext()
def evaluate(self, expr: Expr, batch_pylist: list[int]) -> list[int]:
"""Run the expression against an in-memory batch."""
# `expr` arrived here via Ray's automatic argument serialization;
# the Python UDF inside it was reconstructed from the bytes — no
# pre-registration on this actor required.
df = self._ctx.from_pydict({"a": batch_pylist})
out = df.with_column("result", expr).select("result")
return out.to_pydict()["result"]
def main() -> None:
ray.init(ignore_reinit_error=True)
expr = _build_double_udf()(col("a")) + lit(1)
workers = [DataFusionWorker.remote() for _ in range(2)]
batches = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]
futures = [
workers[i % len(workers)].evaluate.remote(expr, batch)
for i, batch in enumerate(batches)
]
for batch, result in zip(batches, ray.get(futures), strict=True):
print(f"input {batch} -> {result}")
ray.shutdown()
if __name__ == "__main__":
main()