Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions pyatlan/client/aio/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
AppRemoveSchedule,
AppSubmit,
AppUpdate,
existing_slug_from_conflict,
is_duplicate_name_conflict,
)
from pyatlan.errors import ConflictError, ErrorCode
from pyatlan.errors import AtlanError, ErrorCode
from pyatlan.model.apps import AppInput
from pyatlan.model.app import (
AppDeleteResponse,
Expand Down Expand Up @@ -113,17 +115,22 @@ async def create(
endpoint, request_obj = AppCreate.prepare_request(request)
try:
raw = await self._client._call_api(endpoint, request_obj=request_obj)
except ConflictError as conflict:
return await self._reuse_on_conflict(name, conflict)
except AtlanError as exc:
if is_duplicate_name_conflict(exc):
return await self._reuse_on_conflict(name, exc)
raise
return AppCreate.process_response(raw)

async def _reuse_on_conflict(
self, name: str, conflict: ConflictError
) -> AppResponse:
async def _reuse_on_conflict(self, name: str, conflict: AtlanError) -> AppResponse:
"""Resolve a duplicate-name ``409`` to the existing workflow's slug (reuse).

A non-unique name can't be reused safely, so the conflict is re-raised.
Prefer the slug carried in the ``409`` body; fall back to a by-name lookup. A
non-unique name can't be reused safely, so the conflict is re-raised.
"""
slug = existing_slug_from_conflict(conflict)
if slug:
LOGGER.info("App workflow %r already exists; reusing slug %s", name, slug)
return AppResponse(slug=slug)
existing = [w for w in (await self.get_all(name=name)).workflows if w.slug]
if len(existing) == 1:
slug = existing[0].slug
Expand Down
24 changes: 16 additions & 8 deletions pyatlan/client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
AppRemoveSchedule,
AppSubmit,
AppUpdate,
existing_slug_from_conflict,
is_duplicate_name_conflict,
)
from pyatlan.errors import ConflictError, ErrorCode
from pyatlan.errors import AtlanError, ErrorCode
from pyatlan.model.apps import AppInput
from pyatlan.model.app import (
AppDeleteResponse,
Expand Down Expand Up @@ -142,18 +144,24 @@ def create(
endpoint, request_obj = AppCreate.prepare_request(request)
try:
raw = self._client._call_api(endpoint, request_obj=request_obj)
except ConflictError as conflict:
return self._reuse_on_conflict(name, conflict)
except AtlanError as exc:
if is_duplicate_name_conflict(exc):
return self._reuse_on_conflict(name, exc)
raise
return AppCreate.process_response(raw)

def _reuse_on_conflict(self, name: str, conflict: ConflictError) -> AppResponse:
def _reuse_on_conflict(self, name: str, conflict: AtlanError) -> AppResponse:
"""Resolve a duplicate-name ``409`` to the existing workflow's slug.

Heracles returns ``409`` (with the existing slug) when ``name`` already
exists. We look the workflow up by name and return it so callers don't have
to special-case re-runs. A non-unique name can't be reused safely, so the
original conflict is re-raised.
Heracles returns ``409`` with the existing slug in the body when ``name``
already exists; prefer that, and fall back to a by-name lookup. Either way we
return the existing workflow so callers don't special-case re-runs. A
non-unique name can't be reused safely, so the conflict is re-raised.
"""
slug = existing_slug_from_conflict(conflict)
if slug:
LOGGER.info("App workflow %r already exists; reusing slug %s", name, slug)
return AppResponse(slug=slug)
existing = [w for w in self.get_all(name=name).workflows if w.slug]
if len(existing) == 1:
slug = existing[0].slug
Expand Down
32 changes: 32 additions & 0 deletions pyatlan/client/common/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

from __future__ import annotations

import json
from typing import Any, Dict, Optional, Tuple

from pyatlan.errors import AtlanError

from pyatlan.client.constants import (
ADD_APP_SCHEDULE,
CANCEL_APP_RUN,
Expand Down Expand Up @@ -44,6 +47,35 @@
)


def is_duplicate_name_conflict(exc: AtlanError) -> bool:
"""True when ``exc`` is the server's ``409`` "workflow name already exists".

The create endpoint returns ``409`` with a body that has no ``code``/``status``
field, so the transport raises a plain :class:`AtlanError` (not the mapped
``ConflictError``) — detect it by the HTTP status carried on ``error_code``.
"""
return getattr(getattr(exc, "error_code", None), "http_error_code", None) == 409


def existing_slug_from_conflict(exc: AtlanError) -> Optional[str]:
"""Pull the existing workflow's ``slug`` out of a duplicate-name ``409`` body.

Heracles returns ``{"message": ..., "slug": ..., "version": ...}``; the raw body
is preserved on the error message. Returns ``None`` if no single slug is present
(e.g. the multi-match ``slugs[]`` form), so callers can fall back to a lookup.
"""
raw = getattr(getattr(exc, "error_code", None), "error_message", "") or ""
try:
start, end = raw.find("{"), raw.rfind("}")
if start != -1 and end > start:
slug = json.loads(raw[start : end + 1]).get("slug")
if isinstance(slug, str) and slug:
return slug
except (ValueError, TypeError):
pass
return None


class AppGetInfo:
"""``GET /v1/apps/{app_id}`` — describe an app (native-readiness, entrypoints)."""

Expand Down
38 changes: 30 additions & 8 deletions tests/unit/test_app_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
# Copyright 2026 Atlan Pte. Ltd.
"""Unit tests for the App workflow client — sync + async."""

import json
from types import SimpleNamespace
from typing import Optional
from unittest.mock import AsyncMock, Mock

import pytest

from pyatlan.client.aio.app import AsyncAppClient
from pyatlan.client.app import AppClient
from pyatlan.client.common import ApiCaller, AsyncApiCaller
from pyatlan.errors import ConflictError, ErrorCode
from pyatlan.errors import AtlanError
from pyatlan.model.app import (
AppDeleteResponse,
AppInfo,
Expand Down Expand Up @@ -133,14 +136,34 @@ def test_get_all_passes_name_filter(client, mock_api_caller):
}


def _conflict() -> ConflictError:
return ErrorCode.CONFLICT_PASSTHROUGH.exception_with_parameters(
409, "name already exists", ""
def _conflict(slug: Optional[str] = None) -> AtlanError:
"""A realistic duplicate-name 409 — the server body has no code/status field, so
the transport raises a plain AtlanError (NOT the mapped ConflictError). When
``slug`` is given it is embedded in the body, as Heracles actually returns it."""
body = {"message": "a workflow with that name already exists"}
if slug:
body["slug"] = slug
return AtlanError(
SimpleNamespace(
http_error_code=409,
error_id="ATLAN-PYTHON-409-000",
error_message=json.dumps(body),
user_action="resolve the conflict",
)
)


def test_create_reuses_existing_slug_on_conflict(client, mock_api_caller):
# First call (create) -> 409; second call (get_all by name) -> the existing one.
def test_create_reuses_slug_from_conflict_body(client, mock_api_caller):
# The 409 body carries the existing slug -> reuse it directly, no second call.
mock_api_caller._call_api.side_effect = [_conflict(slug="prod-crawler-9f")]
result = client.create(app_id="bigquery-crawler", name="prod-crawler", inputs={})
assert isinstance(result, AppResponse)
assert result.slug == "prod-crawler-9f"
assert mock_api_caller._call_api.call_count == 1 # no get_all needed


def test_create_reuses_slug_via_name_lookup(client, mock_api_caller):
# 409 body without a slug -> fall back to resolving by name.
mock_api_caller._call_api.side_effect = [
_conflict(),
{
Expand All @@ -152,7 +175,6 @@ def test_create_reuses_existing_slug_on_conflict(client, mock_api_caller):
result = client.create(app_id="bigquery-crawler", name="prod-crawler", inputs={})
assert isinstance(result, AppResponse)
assert result.slug == "prod-crawler-9f"
# the reuse lookup filtered by name
assert mock_api_caller._call_api.call_args.kwargs["query_params"] == {
"name": "prod-crawler"
}
Expand All @@ -163,7 +185,7 @@ def test_create_reraises_conflict_when_name_not_unique(client, mock_api_caller):
_conflict(),
{"workflows": [{"slug": "a-1", "name": "dup"}, {"slug": "a-2", "name": "dup"}]},
]
with pytest.raises(ConflictError):
with pytest.raises(AtlanError):
client.create(app_id="x", name="dup", inputs={})


Expand Down
Loading