[docs][examples] Add parallel LLM quickstart and examples for multi-action fan-out #830
…on fan-out
Add a new quickstart page and runnable examples (Java + Python) demonstrating parallel LLM invocations via multi-action fan-out. The example analyzes a restaurant review by fanning out sentiment judgments across three dimensions in parallel and aggregating the results with a final LLM call.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…LM doc
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…h project conventions
- Move custom types (AspectResponse, SummaryResponse, SentimentRequest, SentimentKeySelector) from agent classes into shared CustomTypesAndResources (Java) and custom_types_and_resources.py (Python) to follow project convention
- Simplify OutputEvent construction: use Map/dict instead of Row
- Remove debug System.out.println / print statements from agent handlers
- Java: use private fields + getters for SentimentRequest (matching POJO style)
- Java: remove wall-time measurement and banner prints from Example main()
- Python: rewrite example from from_table to from_datastream with env.from_collection(dict), matching workflow_single_agent_example style
- Python: remove ParallelChatKeySelector (replaced by inline lambda)
- Update parallel_llm.md doc code snippets to reflect the refactored code
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove redundant same-package imports in ParallelChatAgent.java and unused import in ParallelChatRequestExample.java. Use fully-qualified CustomTypesAndResources.XxxType references with proper line breaks to satisfy the project line-width constraint.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Hi @wenjin272 @GreatEugenius, I have added examples and docs about parallel LLM invocations via multi-action fan-out, but I am not sure if it meets the requirements. PTAL orz.
GreatEugenius left a comment:
Thanks for the PR. Overall, it looks good to me; I only left a few minor comments.
weiqingy left a comment:
Thanks for taking this on — a parallel multi-action fan-out is a useful pattern to have an example for, and the JDK-21 Continuation vs. sequential-fallback split is documented accurately against what RunnerContext.durableExecuteAsync and the supportAsync() guard actually do, which is non-obvious framework behavior to get right. A few questions inline, mostly around the fan-out collection logic and Java/Python parity.
Thanks for the PR. On the implementation details, the comments from @weiqingy and @GreatEugenius already cover my feedback. One point I'd like to emphasize, which both reviewers also raised: the code examples in the docs rely on many helper functions and global variables that aren't shown, which makes the examples hard for users to follow.
- Fix aspect correlation: pre-build all ChatRequestEvents and record a
{request_id → aspect} map at dispatch time; look up the dispatched
aspect via ChatResponseEvent.request_id instead of relying on the
model to echo back the correct dimension label
- Inline short helpers (_save_row/_load_row, parseResponse/saveRow in
Java) directly into action methods with brief explanatory comments
- Clean up code style: indentation, blank lines, zip strict=True,
remove dead code, align AGGREGATE_SYSTEM_PROMPT formatting
- Update "Create the Agent" doc section to match current code: expose
prompts and key helpers inline, reference full source for omitted
supporting functions
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Hi @GreatEugenius @weiqingy @wenjin272, thanks again for your review. The PR is updated based on your comments, and most of the feedback has been addressed. A few notes on the non-trivial changes:
Verified on the new revision — all six of my comments are resolved, including the |
wenjin272 left a comment:
Thanks @Ryan-Nightwish, just one comment
    body = (
        f"Original: {row['text']}\n"
        + "Judgments: "
        + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS)
The definition of ASPECTS is provided in the Java example but not in the Python one. I believe adding it to the code would improve readability.
Got it, added ASPECTS and N_ASPECTS to the Python code block. Thanks :)
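For readers following along, a self-contained sketch of what such definitions might look like next to the snippet under review. The aspect values and the build_summary_body helper name are assumptions for illustration, not the PR's actual code.

```python
# Assumed values; the PR's Python module defines its own ASPECTS / N_ASPECTS.
ASPECTS = ("taste", "service", "price")
N_ASPECTS = len(ASPECTS)


def build_summary_body(row: dict, sentiments: dict) -> str:
    """Build the aggregation prompt body once every aspect has a judgment."""
    missing = [a for a in ASPECTS if a not in sentiments]
    if missing:
        raise ValueError(f"missing judgments for: {missing}")
    return (
        f"Original: {row['text']}\n"
        + "Judgments: "
        + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS)
    )
```

Iterating over ASPECTS rather than over the sentiments dict keeps the output order stable regardless of which response arrived first.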
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
wenjin272 left a comment:
LGTM. PTAL @pltbkd @xintongsong
- Replace fan-out loop in request_aspect_judgments with a single SentimentInputEvent dispatch; handle_taste_input and handle_service_input each independently subscribe to that event and dispatch their own ChatRequestEvent, better reflecting the event-driven, observer-style programming model of Flink Agents
- Drop the price dimension (taste + service suffice to demonstrate parallelism) and update AGGREGATE_SYSTEM_PROMPT accordingly
- Add event flow comments to ParallelChatAgent class docstring / Javadoc for at-a-glance understanding of the parallel topology
- Update parallel_llm.md: Overview, Create the Agent code snippets, and Constraints hint to match the new structure
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
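The observer-style topology this commit describes (one input event, two independent subscribers, one collector) can be pictured with a toy dispatcher. This is only a sketch of the event flow: MiniBus, JUDGMENT, and the hard-coded sentiments are hypothetical, nothing here is Flink Agents API, and the toy bus runs handlers sequentially, so it shows the wiring rather than the parallelism.

```python
from collections import defaultdict, deque

SENTIMENT_INPUT = "SentimentInputEvent"
JUDGMENT = "JudgmentEvent"  # hypothetical event type for this sketch


class MiniBus:
    """Toy event bus: handlers subscribe by event type string."""

    def __init__(self):
        self._handlers = defaultdict(list)
        self._queue = deque()
        self.sentiments = {}
        self.result = None

    def on(self, event_type):
        def register(fn):
            self._handlers[event_type].append(fn)
            return fn
        return register

    def send(self, event_type, **attrs):
        self._queue.append((event_type, attrs))

    def run(self):
        # Deliver each queued event to every handler subscribed to its type.
        while self._queue:
            event_type, attrs = self._queue.popleft()
            for handler in self._handlers[event_type]:
                handler(attrs)


bus = MiniBus()


@bus.on(SENTIMENT_INPUT)
def handle_taste_input(attrs):
    # Placeholder for dispatching an LLM request about taste.
    bus.send(JUDGMENT, aspect="taste", sentiment="positive")


@bus.on(SENTIMENT_INPUT)
def handle_service_input(attrs):
    # Placeholder for dispatching an LLM request about service.
    bus.send(JUDGMENT, aspect="service", sentiment="negative")


@bus.on(JUDGMENT)
def collect(attrs):
    bus.sentiments[attrs["aspect"]] = attrs["sentiment"]
    if len(bus.sentiments) == 2:  # both branches have reported
        bus.result = dict(bus.sentiments)


bus.send(SENTIMENT_INPUT, input_id=1, text="Great food, slow service.")
bus.run()
```

Adding a third dimension in this style means registering one more subscriber and raising the collector's threshold, with no change to the code that emits the input event.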
Hi @xintongsong, based on your suggestion and without changing anything required by previous reviewers, I've made the following updates:
Please take a look; happy to adjust if there's anything else you'd like changed.
d5e607e to 1737d0d
Replace the single "res" key (a JSON-serialized dict in Python / a
nested Map in Java) with path-based individual keys in the MemoryObject
API: "id" and "text" stored as primitives, "aspect_map.{request_id}"
and "sentiments.{aspect}" stored as flat strings using dot-path notation.
Benefits:
- Eliminates all json.dumps / json.loads calls in Python (the
Pemja JVM boundary is crossed only with primitive types)
- Removes the read-modify-write-whole-blob pattern; each handler
writes only the single key it owns
- Drops _init_row / initRow and _all_aspects_received / allAspectsReceived
helpers; updates _build_summarize_request and _build_output_event
signatures to take typed arguments directly
- Syncs the parallel_llm.md quickstart doc to the new code
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
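To make the contrast concrete, here is a rough sketch of the two storage styles using a plain dict-backed stand-in. FakeMemory and the helper names are hypothetical, and the stand-in treats a dot-path simply as a flat string key, which may differ from how the real MemoryObject resolves paths.

```python
import json


class FakeMemory:
    """Dict-backed stand-in for a key-value memory; not the MemoryObject API."""

    def __init__(self):
        self._data = {}

    def set(self, path, value):
        self._data[path] = value

    def get(self, path):
        return self._data.get(path)


def record_blob(mem, aspect, sentiment):
    """Old style: read, modify, and rewrite one JSON blob under "res"."""
    row = json.loads(mem.get("res") or "{}")
    row.setdefault("sentiments", {})[aspect] = sentiment
    mem.set("res", json.dumps(row))


def record_flat(mem, aspect, sentiment):
    """New style: each handler writes only the single key it owns."""
    mem.set(f"sentiments.{aspect}", sentiment)


def collected(mem, aspects):
    """Gather whichever per-aspect keys have been written so far."""
    found = {}
    for aspect in aspects:
        value = mem.get(f"sentiments.{aspect}")
        if value is not None:
            found[aspect] = value
    return found
```

With flat keys, a handler never has to deserialize state written by another handler, which is the property the commit message credits for removing the json.dumps / json.loads calls.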
Hi @xintongsong @wenjin272, I made the following additional changes:
    # Primitive types (int, str) cross the Pemja JVM boundary without serialization.
    ctx.sensory_memory.set("id", payload["id"])
    ctx.sensory_memory.set("text", payload["text"])
    ctx.send_event(SentimentInputEvent(input_id=payload["id"], text=payload["text"]))
I think we can just use the general Event here:
ctx.send_event(Event(type="SentimentInputEvent",
attributes={"input_id": payload["id"],
"text": payload["text"]}))
Of course, SentimentInputEvent can be declared as a constant string.
Done, please check it out.
…ring constant
Replace the custom SentimentInputEvent subclass with a plain Event constructed inline, and declare SENTIMENT_INPUT_EVENT_TYPE as a module/class-level string constant. Update @action/@Action annotations and all dispatch/handler sites in both Python and Java accordingly. Sync the doc code snippets and description.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Linked issue: #822
Purpose of change
Flink Agents supports parallel LLM invocations via multi-action fan-out, but no existing quickstart or example demonstrates this capability. This PR adds a complete "Parallel Sentiment Analysis" example (Java + Python) and a quickstart doc page covering the end-to-end workflow.
New files:
- docs/content/docs/get-started/quickstart/parallel_llm.md: quickstart page with code walkthrough (both Python and Java tabs), deployment instructions, and constraints for multi-action fan-out
- examples/.../ParallelChatRequestExample.java: Java entry point
- examples/.../agents/ParallelChatAgent.java: Java agent that fans out three ChatRequestEvent events (one per sentiment dimension) in parallel, collects responses via sensory memory, and aggregates with a final LLM call
- python/.../parallel_chat_request_example.py: Python entry point
- python/.../agents/parallel_chat_agent.py: Python equivalent of the Java agent
Modified files:
- examples/.../agents/CustomTypesAndResources.java: add shared types (SentimentRequest, SentimentKeySelector, AspectResponse, SummaryResponse) following project convention
- python/.../agents/custom_types_and_resources.py: add shared types (AspectResponse, SummaryResponse)
Tests
- taste:positive, service:negative, price:not_mentioned
API
No public API changes.
Documentation
- doc-needed
- doc-not-needed
- doc-included