diff --git a/pyproject.toml b/pyproject.toml index 13ee595d..cdd5c9fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dynamic = [ "version" ] dependencies = [ "aiohttp>=3.8.3", "aioresponses>=0.7.6", + "aleph-cid>=0.1", "aleph-message>=1.1.1", "aleph-superfluid>=0.3", "base58==2.1.1", # Needed now as default with _load_account changement diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index 11aa08f0..0c27bf4b 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -1,13 +1,16 @@ +import asyncio import hashlib import json import logging import ssl +import tempfile import time from io import BytesIO from pathlib import Path from typing import Any, Dict, Mapping, NoReturn, Optional, Tuple, Union import aiohttp +import aleph_cid from aleph_message.models import ( AggregateContent, AggregateMessage, @@ -382,9 +385,18 @@ async def create_store( payment=payment, ) elif storage_engine == StorageEnum.ipfs: - # We do not support authenticated upload for IPFS yet. Use the legacy method - # of uploading the file first then publishing the message using POST /messages. - file_hash = await self.ipfs_push_file(file_content=file_content) + # Compute the CID locally and upload the file and message all + # at once using authenticated upload. + return await self._upload_file_ipfs( + address=address, + file_content=file_content, + guess_mime_type=guess_mime_type, + ref=ref, + extra_fields=extra_fields, + channel=channel, + sync=sync, + payment=payment, + ) else: raise ValueError(f"Unknown storage engine: '{storage_engine}'") @@ -620,14 +632,18 @@ async def submit( ) return message, message_status, response - async def _storage_push_file_with_message( + async def _push_file_with_message( self, + url: str, file_content: bytes, store_content: StoreContent, channel: Optional[str] = settings.DEFAULT_CHANNEL, sync: bool = False, + file_name: Optional[str] = None, + file_content_type: Optional[str] = None, ) -> Tuple[StoreMessage, MessageStatus]: - """Push a file to the storage service.""" + """Sign a STORE message and upload it together with the file in a + single authenticated multipart request.""" data = aiohttp.FormData() # Prepare the STORE message @@ -646,9 +662,13 @@ async def _storage_push_file_with_message( content_type="application/json", ) # Add the file - data.add_field("file", BytesIO(file_content)) + data.add_field( + "file", + BytesIO(file_content), + filename=file_name, + content_type=file_content_type, + ) - url = "/api/v0/storage/add_file" logger.debug(f"Posting file on {url}") async with self.http_session.post(url, data=data) as resp: @@ -685,7 +705,52 @@ async def _upload_file_native( payment=payment, **(extra_fields or {}), ) - message, _ = await self._storage_push_file_with_message( + message, _ = await self._push_file_with_message( + url="/api/v0/storage/add_file", + file_content=file_content, + store_content=store_content, + channel=channel, + sync=sync, + ) + + # Some nodes may not implement authenticated file upload yet. As we cannot detect + # this easily, broadcast the message a second time to ensure publication on older + # nodes. + _, status = await self._broadcast(message=message, sync=sync) + return message, status + + async def _upload_file_ipfs( + self, + address: str, + file_content: bytes, + guess_mime_type: bool = False, + ref: Optional[str] = None, + extra_fields: Optional[dict] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, + sync: bool = False, + payment: Optional[Payment] = None, + ) -> Tuple[StoreMessage, MessageStatus]: + """Authenticated IPFS upload: compute the CID locally (kubo-default + CIDv0, matching what the node will assign), then upload the file and + the signed STORE message in a single request.""" + file_hash = aleph_cid.compute_cid(file_content) + if magic and guess_mime_type: + mime_type = magic.from_buffer(file_content, mime=True) + else: + mime_type = None + + store_content = StoreContent( + address=address, + ref=ref, + item_type=ItemType.ipfs, + item_hash=ItemHash(file_hash), + mime_type=mime_type, # type: ignore + time=time.time(), + payment=payment, + **(extra_fields or {}), + ) + message, _ = await self._push_file_with_message( + url="/api/v0/ipfs/add_file", file_content=file_content, store_content=store_content, channel=channel, @@ -697,3 +762,72 @@ async def _upload_file_native( # nodes. _, status = await self._broadcast(message=message, sync=sync) return message, status + + async def create_store_folder( + self, + folder_path: Union[str, Path], + address: Optional[str] = None, + ref: Optional[str] = None, + extra_fields: Optional[dict] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, + sync: bool = False, + payment: Optional[Payment] = None, + ) -> Tuple[StoreMessage, MessageStatus]: + """ + Upload a folder to IPFS as a UnixFS directory and create a STORE + message for its root CID, in a single authenticated request. + + The folder is packed locally into a CARv1 file (with the same root + CID that kubo would assign, CIDv1) and posted to + /api/v0/ipfs/add_car together with the signed STORE message. + + Requires a node exposing /api/v0/ipfs/add_car. + + :param folder_path: Path to the folder to upload + :param address: Address to use or None to use the account address + :param ref: A reference to a previous message + :param extra_fields: Extra fields to add to the STORE message content + :param channel: Channel to use + :param sync: If true, waits for the message to be processed by the API server + :param payment: Payment method used to pay for the storage + """ + folder_path = Path(folder_path) + if not folder_path.is_dir(): + raise ValueError(f"Not a directory: {folder_path}") + + address = address or settings.ADDRESS_TO_USE or self.account.get_address() + payment = payment or Payment( + chain=Chain.ETH, type=PaymentType.hold, receiver=None + ) + + with tempfile.TemporaryDirectory() as tmp_dir: + car_path = Path(tmp_dir) / "upload.car" + # CAR packing reads the whole folder; run it off the event loop. + root_cid = await asyncio.to_thread( + aleph_cid.write_folder_car, folder_path, car_path + ) + car_content = car_path.read_bytes() + + store_content = StoreContent( + address=address, + ref=ref, + item_type=ItemType.ipfs, + item_hash=ItemHash(root_cid), + time=time.time(), + payment=payment, + **(extra_fields or {}), + ) + message, status = await self._push_file_with_message( + url="/api/v0/ipfs/add_car", + file_content=car_content, + store_content=store_content, + channel=channel, + sync=sync, + file_name="upload.car", + file_content_type="application/vnd.ipld.car", + ) + + # Mirror the file upload paths: broadcast the message a second time to + # ensure publication on nodes that do not process the metadata part. + _, status = await self._broadcast(message=message, sync=sync) + return message, status diff --git a/tests/unit/test_authenticated_ipfs_upload.py b/tests/unit/test_authenticated_ipfs_upload.py new file mode 100644 index 00000000..a7ea5a1e --- /dev/null +++ b/tests/unit/test_authenticated_ipfs_upload.py @@ -0,0 +1,122 @@ +"""Tests for authenticated IPFS uploads (client-side CID computation). + +Golden CIDs come from the aleph-cid test suite, which pins them against real +kubo (v0.30.0): the same fixtures must produce the same CIDs here. +""" + +import json +from pathlib import Path + +import aleph_cid +import pytest +from aleph_message.models import ItemType, StoreMessage +from aleph_message.status import MessageStatus + +from aleph.sdk.types import StorageEnum + +# kubo `ipfs add` of an empty file (CIDv0) +GOLDEN_EMPTY_FILE_V0 = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH" +# kubo /api/v0/add?wrap-with-directory=true of the nested fixture below (CIDv1) +GOLDEN_NESTED_DIR_V1 = "bafybeidcclyz24mrl4furbaf4ecb3ks52dbfer7r6dxqavd4wrqg7bp7lu" + + +def fixture_nested_dir(root: Path): + (root / "top.txt").write_bytes(b"top\n") + sub = root / "sub" + sub.mkdir() + (sub / "inner.txt").write_bytes(b"inner\n") + deeper = sub / "deeper" + deeper.mkdir() + (deeper / "leaf.txt").write_bytes(b"leaf\n") + + +def posted_form_fields(http_session, url: str) -> dict: + """Return {field_name: (headers, value)} of the multipart form posted to `url`.""" + for call in http_session.post.call_args_list: + if call.args and call.args[0] == url: + form = call.kwargs["data"] + return { + type_options["name"]: (headers, value) + for type_options, headers, value in form._fields + } + raise AssertionError(f"No POST to {url}") + + +def test_golden_cid_vector(): + assert aleph_cid.compute_cid(b"") == GOLDEN_EMPTY_FILE_V0 + + +@pytest.mark.asyncio +async def test_create_store_ipfs_authenticated(mock_session_with_post_success): + content = b"Test authenticated ipfs upload\n" + expected_cid = aleph_cid.compute_cid(content) + assert expected_cid.startswith("Qm") + + async with mock_session_with_post_success as session: + message, status = await session.create_store( + file_content=content, + channel="TEST", + storage_engine=StorageEnum.ipfs, + ) + + assert isinstance(message, StoreMessage) + assert status == MessageStatus.PENDING + assert message.content.item_type == ItemType.ipfs + assert message.content.item_hash == expected_cid + assert message.signature + + # The file and the signed message must travel in one multipart request. + fields = posted_form_fields( + mock_session_with_post_success.http_session, "/api/v0/ipfs/add_file" + ) + assert fields["file"][1].read() == content + metadata = json.loads(fields["metadata"][1]) + assert metadata["sync"] is False + assert metadata["message"]["signature"] == message.signature + assert json.loads(metadata["message"]["item_content"])["item_hash"] == expected_cid + + +@pytest.mark.asyncio +async def test_create_store_folder(mock_session_with_post_success, tmp_path): + folder = tmp_path / "site" + folder.mkdir() + fixture_nested_dir(folder) + + async with mock_session_with_post_success as session: + message, status = await session.create_store_folder( + folder_path=folder, + channel="TEST", + ) + + assert isinstance(message, StoreMessage) + assert status == MessageStatus.PENDING + assert message.content.item_type == ItemType.ipfs + assert message.content.item_hash == GOLDEN_NESTED_DIR_V1 + assert message.signature + + fields = posted_form_fields( + mock_session_with_post_success.http_session, "/api/v0/ipfs/add_car" + ) + metadata = json.loads(fields["metadata"][1]) + assert ( + json.loads(metadata["message"]["item_content"])["item_hash"] + == GOLDEN_NESTED_DIR_V1 + ) + + # The uploaded CAR must be a valid CARv1 whose root is the message CID. + car_headers, car_value = fields["file"] + assert car_headers.get("Content-Type") == "application/vnd.ipld.car" + car_file = tmp_path / "roundtrip.car" + car_file.write_bytes(car_value.read()) + assert aleph_cid.read_carv1_root(car_file) == GOLDEN_NESTED_DIR_V1 + + +@pytest.mark.asyncio +async def test_create_store_folder_rejects_files( + mock_session_with_post_success, tmp_path +): + not_a_dir = tmp_path / "file.txt" + not_a_dir.write_bytes(b"x") + async with mock_session_with_post_success as session: + with pytest.raises(ValueError): + await session.create_store_folder(folder_path=not_a_dir)