Skip to content

[docs][examples] Add parallel LLM quickstart and examples for multi-action fan-out#830

Merged
wenjin272 merged 10 commits into
apache:mainfrom
Ryan-Nightwish:fix/parallel-llm-example
Jun 16, 2026
Merged

[docs][examples] Add parallel LLM quickstart and examples for multi-action fan-out#830
wenjin272 merged 10 commits into
apache:mainfrom
Ryan-Nightwish:fix/parallel-llm-example

Conversation

@Ryan-Nightwish

Copy link
Copy Markdown
Contributor

Linked issue: #822

Purpose of change

Flink Agents supports parallel LLM invocations via multi-action fan-out, but no existing quickstart or example demonstrates this capability. This PR adds a complete "Parallel Sentiment Analysis" example (Java + Python) and a quickstart doc page covering the end-to-end workflow.

New files:

  • docs/content/docs/get-started/quickstart/parallel_llm.md — quickstart page with code walkthrough (both Python and Java tabs), deployment instructions, and constraints for multi-action fan-out
  • examples/.../ParallelChatRequestExample.java — Java entry point
  • examples/.../agents/ParallelChatAgent.java — Java agent that fans out three ChatRequestEvent events (one per sentiment dimension) in parallel, collects responses via sensory memory, and aggregates with a final LLM call
  • python/.../parallel_chat_request_example.py — Python entry point
  • python/.../agents/parallel_chat_agent.py — Python equivalent of the Java agent

Modified files:

  • examples/.../agents/CustomTypesAndResources.java — add shared types (SentimentRequest, SentimentKeySelector, AspectResponse, SummaryResponse) following project convention
  • python/.../agents/custom_types_and_resources.py — add shared types (AspectResponse, SummaryResponse)

Tests

  • Verified Java example runs successfully (JDK 21, parallel execution)
  • Verified Python example runs successfully (conda + Python 3.11)
  • Both produce correct output: taste:positive, service:negative, price:not_mentioned

API

No public API changes.

Documentation

  • doc-needed
  • doc-not-needed
  • doc-included

Ryan-Nightwish and others added 3 commits June 9, 2026 18:12
…on fan-out

Add a new quickstart page and runnable examples (Java + Python) demonstrating
parallel LLM invocations via multi-action fan-out. The example analyzes a
restaurant review by fanning out sentiment judgments across three dimensions
in parallel and aggregating the results with a final LLM call.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…LM doc

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…h project conventions

- Move custom types (AspectResponse, SummaryResponse, SentimentRequest,
  SentimentKeySelector) from agent classes into shared CustomTypesAndResources
  (Java) and custom_types_and_resources.py (Python) to follow project convention
- Simplify OutputEvent construction: use Map/dict instead of Row
- Remove debug System.out.println / print statements from agent handlers
- Java: use private fields + getters for SentimentRequest (matching POJO style)
- Java: remove wall-time measurement and banner prints from Example main()
- Python: rewrite example from from_table to from_datastream with
  env.from_collection(dict), matching workflow_single_agent_example style
- Python: remove ParallelChatKeySelector (replaced by inline lambda)
- Update parallel_llm.md doc code snippets to reflect the refactored code

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added doc-included Your PR already contains the necessary documentation updates. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue. labels Jun 10, 2026
Remove redundant same-package imports in ParallelChatAgent.java and unused
import in ParallelChatRequestExample.java. Use fully-qualified
CustomTypesAndResources.XxxType references with proper line breaks to
satisfy the project line-width constraint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Ryan-Nightwish

Ryan-Nightwish commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

Hi @wenjin272 @GreatEugenius , I have added examples and docs about parallel LLM invocations via multi-action fan-out, but I am not sure if it meets the requirements. PTAL orz.

@GreatEugenius GreatEugenius left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Overall, it looks good to me; I only left a few minor comments.

Comment thread docs/content/docs/get-started/quickstart/parallel_llm.md Outdated
Comment thread docs/content/docs/get-started/quickstart/parallel_llm.md Outdated
Comment thread docs/content/docs/get-started/quickstart/parallel_llm.md

@weiqingy weiqingy left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking this on — a parallel multi-action fan-out is a useful pattern to have an example for, and the JDK-21 Continuation vs. sequential-fallback split is documented accurately against what RunnerContext.durableExecuteAsync and the supportAsync() guard actually do, which is non-obvious framework behavior to get right. A few questions inline, mostly around the fan-out collection logic and Java/Python parity.

Comment thread python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py Outdated
Comment thread python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py Outdated
Comment thread docs/content/docs/get-started/quickstart/parallel_llm.md Outdated
@wenjin272

Copy link
Copy Markdown
Contributor

Thanks for the PR.

On the implementation details, the comments from @weiqingy and @GreatEugenius already cover my feedback. One point I'd like to emphasize, which both reviewers also raised: the code examples in the docs rely on many helper functions and global variables that aren't shown, which makes the examples hard for users to follow.

Ryan-Nightwish and others added 2 commits June 12, 2026 12:19
- Fix aspect correlation: pre-build all ChatRequestEvents and record a
  {request_id → aspect} map at dispatch time; look up the dispatched
  aspect via ChatResponseEvent.request_id instead of relying on the
  model to echo back the correct dimension label
- Inline short helpers (_save_row/_load_row, parseResponse/saveRow in
  Java) directly into action methods with brief explanatory comments
- Clean up code style: indentation, blank lines, zip strict=True,
  remove dead code, align AGGREGATE_SYSTEM_PROMPT formatting
- Update "Create the Agent" doc section to match current code: expose
  prompts and key helpers inline, reference full source for omitted
  supporting functions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Ryan-Nightwish

Copy link
Copy Markdown
Contributor Author

Hi @GreatEugenius @weiqingy @wenjin272, thanks again for your review. PR is updated based on your comments, most of the feedback has been addressed. A few notes on the non-trivial changes:

  • Aspect correlation fix: I now pre-build all three ChatRequestEvents before dispatching, record a {request_id → aspect} map at that point, and look up the dispatched aspect via ChatResponseEvent.request_id in the response handler — so the mapping is fully under our control regardless of what the model returns.
  • Helper functions: Simple plumbing helpers are inlined into the action methods. More meaningful helpers are kept as named module-level / private static functions, with the doc linking to the full source file for their implementations.

@weiqingy

Copy link
Copy Markdown
Collaborator

Verified on the new revision — all six of my comments are resolved, including the request_id correlation fix (traced it through the framework: the response's request_id always matches the dispatched ChatRequestEvent.id, so the keying is solid). LGTM.

@wenjin272 wenjin272 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Ryan-Nightwish, just one comment

body = (
f"Original: {row['text']}\n"
+ "Judgments: "
+ " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The definition of ASPECTS is provided on the Java example but not on the Python side. I believe adding it to the code would improve readability.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, added ASPECTS and N_ASPECTS to the Python code block. Thanks :)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

@wenjin272 wenjin272 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. PTAL @pltbkd @xintongsong

@wenjin272 wenjin272 added priority/blocker Indicates the PR or issue that should block the release until it gets resolved. and removed priority/major Default priority of the PR or issue. labels Jun 15, 2026
- Replace fan-out loop in request_aspect_judgments with a single
  SentimentInputEvent dispatch; handle_taste_input and
  handle_service_input each independently subscribe to that event
  and dispatch their own ChatRequestEvent — better reflecting the
  event-driven, observer-style programming model of Flink Agents
- Drop the price dimension (taste + service suffice to demonstrate
  parallelism) and update AGGREGATE_SYSTEM_PROMPT accordingly
- Add event flow comments to ParallelChatAgent class docstring /
  Javadoc for at-a-glance understanding of the parallel topology
- Update parallel_llm.md: Overview, Create the Agent code snippets,
  and Constraints hint to match the new structure

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Ryan-Nightwish

Copy link
Copy Markdown
Contributor Author

Hi @xintongsong , based on your suggestion and without changing anything required by previous reviewers, I've made the following updates:

  • Switched to a broadcast event model for parallel dispatch. Replace fan-out loop in request_aspect_judgments with a single SentimentInputEvent dispatch as an intermediate broadcast: request_aspect_judgments emits just one event, and each aspect handler independently subscribes to it and issues its own ChatRequestEvent.
  • Removed the price dimension. Two parallel branches (taste + service) are sufficient to demonstrate the pattern, and the shorter example is easier to follow.
  • Added event flow comments to the ParallelChatAgent class docstring (Python) and Javadoc (Java) for at-a-glance understanding of the parallel topology.
  • Updated parallel_llm.md accordingly: Overview description, Create the Agent code snippets, and the Constraints hint.

Please take a look — happy to adjust if there's anything else you'd like changed.

@Ryan-Nightwish Ryan-Nightwish force-pushed the fix/parallel-llm-example branch from d5e607e to 1737d0d Compare June 15, 2026 10:01
Replace the single "res" key (a JSON-serialized dict in Python / a
nested Map in Java) with path-based individual keys in the MemoryObject
API: "id" and "text" stored as primitives, "aspect_map.{request_id}"
and "sentiments.{aspect}" stored as flat strings using dot-path notation.

Benefits:
- Eliminates all json.dumps / json.loads calls in Python (the
  Pemja JVM boundary is crossed only with primitive types)
- Removes the read-modify-write-whole-blob pattern; each handler
  writes only the single key it owns
- Drops _init_row / initRow and _all_aspects_received / allAspectsReceived
  helpers; updates _build_summarize_request and _build_output_event
  signatures to take typed arguments directly
- Syncs the parallel_llm.md quickstart doc to the new code

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Ryan-Nightwish

Ryan-Nightwish commented Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

Hi @xintongsong @wenjin272 , I made the following additional changes:

  • Simplified sensory memory access: Replaced the single "res" key (which packed all state into one JSON-serialized blob in Python, or one nested Map in Java) with path-based individual keys supported by the MemoryObject API. Each field now has its own key: primitives "id" and "text" are stored directly. This removes all json.dumps / json.loads calls from the Python side and eliminates the unchecked Map casts from the Java side.
  • Removed now-redundant helpers: _init_row / initRow, _all_aspects_received / allAspectsReceived are inlined or dropped; _build_summarize_request and _build_output_event now take typed arguments directly instead of receiving the whole row dict / Map.
  • Updated docs: The parallel_llm.md quickstart code blocks and Constraints hint are updated to reflect the new memory access pattern.
    Please take a look and let me know if there is anything else to adjust.

@wenjin272 wenjin272 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

# Primitive types (int, str) cross the Pemja JVM boundary without serialization.
ctx.sensory_memory.set("id", payload["id"])
ctx.sensory_memory.set("text", payload["text"])
ctx.send_event(SentimentInputEvent(input_id=payload["id"], text=payload["text"]))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just use the general Event here:

ctx.send_event(Event(type="SentimentInputEvent",
                             attributes={"input_id": payload["id"],
                                         "text": payload["text"]}))

Of course, SentimentInputEvent can be declared as a constant string.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, please check it out.

…ring constant

Replace the custom SentimentInputEvent subclass with a plain Event constructed
inline, and declare SENTIMENT_INPUT_EVENT_TYPE as a module/class-level string
constant. Update @action/@action annotations and all dispatch/handler sites in
both Python and Java accordingly. Sync the doc code snippets and description.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@wenjin272 wenjin272 merged commit f7af693 into apache:main Jun 16, 2026
25 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-included Your PR already contains the necessary documentation updates. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/blocker Indicates the PR or issue that should block the release until it gets resolved.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants