Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dynamic = [ "version" ]
dependencies = [
"aiohttp>=3.8.3",
"aioresponses>=0.7.6",
"aleph-cid>=0.1",
"aleph-message>=1.1.1",
"aleph-superfluid>=0.3",
"base58==2.1.1", # Needed now as default with _load_account changement
Expand Down
150 changes: 142 additions & 8 deletions src/aleph/sdk/client/authenticated_http.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import asyncio
import hashlib
import json
import logging
import ssl
import tempfile
import time
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Mapping, NoReturn, Optional, Tuple, Union

import aiohttp
import aleph_cid
from aleph_message.models import (
AggregateContent,
AggregateMessage,
Expand Down Expand Up @@ -382,9 +385,18 @@ async def create_store(
payment=payment,
)
elif storage_engine == StorageEnum.ipfs:
# We do not support authenticated upload for IPFS yet. Use the legacy method
# of uploading the file first then publishing the message using POST /messages.
file_hash = await self.ipfs_push_file(file_content=file_content)
# Compute the CID locally and upload the file and message all
# at once using authenticated upload.
return await self._upload_file_ipfs(
address=address,
file_content=file_content,
guess_mime_type=guess_mime_type,
ref=ref,
extra_fields=extra_fields,
channel=channel,
sync=sync,
payment=payment,
)
else:
raise ValueError(f"Unknown storage engine: '{storage_engine}'")

Expand Down Expand Up @@ -620,14 +632,18 @@ async def submit(
)
return message, message_status, response

async def _storage_push_file_with_message(
async def _push_file_with_message(
self,
url: str,
file_content: bytes,
store_content: StoreContent,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
sync: bool = False,
file_name: Optional[str] = None,
file_content_type: Optional[str] = None,
) -> Tuple[StoreMessage, MessageStatus]:
"""Push a file to the storage service."""
"""Sign a STORE message and upload it together with the file in a
single authenticated multipart request."""
data = aiohttp.FormData()

# Prepare the STORE message
Expand All @@ -646,9 +662,13 @@ async def _storage_push_file_with_message(
content_type="application/json",
)
# Add the file
data.add_field("file", BytesIO(file_content))
data.add_field(
"file",
BytesIO(file_content),
filename=file_name,
content_type=file_content_type,
)

url = "/api/v0/storage/add_file"
logger.debug(f"Posting file on {url}")

async with self.http_session.post(url, data=data) as resp:
Expand Down Expand Up @@ -685,7 +705,52 @@ async def _upload_file_native(
payment=payment,
**(extra_fields or {}),
)
message, _ = await self._storage_push_file_with_message(
message, _ = await self._push_file_with_message(
url="/api/v0/storage/add_file",
file_content=file_content,
store_content=store_content,
channel=channel,
sync=sync,
)

# Some nodes may not implement authenticated file upload yet. As we cannot detect
# this easily, broadcast the message a second time to ensure publication on older
# nodes.
_, status = await self._broadcast(message=message, sync=sync)
return message, status

async def _upload_file_ipfs(
self,
address: str,
file_content: bytes,
guess_mime_type: bool = False,
ref: Optional[str] = None,
extra_fields: Optional[dict] = None,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
sync: bool = False,
payment: Optional[Payment] = None,
) -> Tuple[StoreMessage, MessageStatus]:
"""Authenticated IPFS upload: compute the CID locally (kubo-default
CIDv0, matching what the node will assign), then upload the file and
the signed STORE message in a single request."""
file_hash = aleph_cid.compute_cid(file_content)
if magic and guess_mime_type:
mime_type = magic.from_buffer(file_content, mime=True)
else:
mime_type = None

store_content = StoreContent(
address=address,
ref=ref,
item_type=ItemType.ipfs,
item_hash=ItemHash(file_hash),
mime_type=mime_type, # type: ignore
time=time.time(),
payment=payment,
**(extra_fields or {}),
)
message, _ = await self._push_file_with_message(
url="/api/v0/ipfs/add_file",
file_content=file_content,
store_content=store_content,
channel=channel,
Expand All @@ -697,3 +762,72 @@ async def _upload_file_native(
# nodes.
_, status = await self._broadcast(message=message, sync=sync)
return message, status

async def create_store_folder(
    self,
    folder_path: Union[str, Path],
    address: Optional[str] = None,
    ref: Optional[str] = None,
    extra_fields: Optional[dict] = None,
    channel: Optional[str] = settings.DEFAULT_CHANNEL,
    sync: bool = False,
    payment: Optional[Payment] = None,
) -> Tuple[StoreMessage, MessageStatus]:
    """
    Upload a folder to IPFS as a UnixFS directory and create a STORE
    message for its root CID, in a single authenticated request.

    The folder is packed locally into a CARv1 file (with the same root
    CID that kubo would assign, CIDv1) and posted to
    /api/v0/ipfs/add_car together with the signed STORE message. The
    message is then broadcast a second time, and the status of that
    broadcast is the one returned.

    Requires a node exposing /api/v0/ipfs/add_car.

    :param folder_path: Path to the folder to upload
    :param address: Address to use or None to use the account address
    :param ref: A reference to a previous message
    :param extra_fields: Extra fields to add to the STORE message content
    :param channel: Channel to use
    :param sync: If true, waits for the message to be processed by the API server
    :param payment: Payment method used to pay for the storage
    :return: The signed STORE message and the status reported by the broadcast
    :raises ValueError: If `folder_path` is not a directory
    """
    folder_path = Path(folder_path)
    if not folder_path.is_dir():
        raise ValueError(f"Not a directory: {folder_path}")

    address = address or settings.ADDRESS_TO_USE or self.account.get_address()
    payment = payment or Payment(
        chain=Chain.ETH, type=PaymentType.hold, receiver=None
    )

    with tempfile.TemporaryDirectory() as tmp_dir:
        car_path = Path(tmp_dir) / "upload.car"
        # CAR packing reads the whole folder; run it off the event loop.
        root_cid = await asyncio.to_thread(
            aleph_cid.write_folder_car, folder_path, car_path
        )
        # The CAR is as large as the folder itself: load it in a worker
        # thread as well so the event loop is never blocked on disk I/O.
        car_content = await asyncio.to_thread(car_path.read_bytes)

    store_content = StoreContent(
        address=address,
        ref=ref,
        item_type=ItemType.ipfs,
        item_hash=ItemHash(root_cid),
        time=time.time(),
        payment=payment,
        **(extra_fields or {}),
    )
    # The status of the upload request is discarded, as in the file upload
    # paths: the status returned to the caller is the broadcast one below.
    message, _ = await self._push_file_with_message(
        url="/api/v0/ipfs/add_car",
        file_content=car_content,
        store_content=store_content,
        channel=channel,
        sync=sync,
        file_name="upload.car",
        file_content_type="application/vnd.ipld.car",
    )

    # Mirror the file upload paths: broadcast the message a second time to
    # ensure publication on nodes that do not process the metadata part.
    _, status = await self._broadcast(message=message, sync=sync)
    return message, status
122 changes: 122 additions & 0 deletions tests/unit/test_authenticated_ipfs_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Tests for authenticated IPFS uploads (client-side CID computation).

Golden CIDs come from the aleph-cid test suite, which pins them against real
kubo (v0.30.0): the same fixtures must produce the same CIDs here.
"""

import json
from pathlib import Path

import aleph_cid
import pytest
from aleph_message.models import ItemType, StoreMessage
from aleph_message.status import MessageStatus

from aleph.sdk.types import StorageEnum

# CIDv0 that kubo `ipfs add` assigns to an empty file; compared against
# aleph_cid.compute_cid(b"") in test_golden_cid_vector.
GOLDEN_EMPTY_FILE_V0 = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH"
# CIDv1 that kubo /api/v0/add?wrap-with-directory=true assigns to the tree
# built by fixture_nested_dir below; expected root CID of the folder upload.
GOLDEN_NESTED_DIR_V1 = "bafybeidcclyz24mrl4furbaf4ecb3ks52dbfer7r6dxqavd4wrqg7bp7lu"


def fixture_nested_dir(root: Path):
    """Populate the existing directory `root` with the nested fixture tree.

    Layout created: top.txt, sub/inner.txt and sub/deeper/leaf.txt.
    """
    sub_dir = root / "sub"
    deeper_dir = sub_dir / "deeper"
    layout = (
        (root / "top.txt", b"top\n"),
        (sub_dir / "inner.txt", b"inner\n"),
        (deeper_dir / "leaf.txt", b"leaf\n"),
    )
    for target, payload in layout:
        # `root` already exists; each nested directory is created right
        # before the single file it holds is written.
        if target.parent != root:
            target.parent.mkdir()
        target.write_bytes(payload)


def posted_form_fields(http_session, url: str) -> dict:
    """Return {field_name: (headers, value)} of the multipart form posted to `url`.

    Only the first recorded POST whose first positional argument equals `url`
    is inspected; an AssertionError is raised when there is none.
    """
    matching_posts = (
        recorded
        for recorded in http_session.post.call_args_list
        if recorded.args and recorded.args[0] == url
    )
    first_post = next(matching_posts, None)
    if first_post is None:
        raise AssertionError(f"No POST to {url}")

    fields = {}
    # Each form entry is a (type_options, headers, value) triple.
    for type_options, headers, value in first_post.kwargs["data"]._fields:
        fields[type_options["name"]] = (headers, value)
    return fields


def test_golden_cid_vector():
    """The CID computed locally for an empty file equals the pinned kubo CIDv0."""
    empty_file_cid = aleph_cid.compute_cid(b"")
    assert empty_file_cid == GOLDEN_EMPTY_FILE_V0


@pytest.mark.asyncio
async def test_create_store_ipfs_authenticated(mock_session_with_post_success):
    """create_store on the IPFS engine posts the file and signed message together."""
    payload = b"Test authenticated ipfs upload\n"
    local_cid = aleph_cid.compute_cid(payload)
    assert local_cid.startswith("Qm")

    async with mock_session_with_post_success as client:
        store_message, message_status = await client.create_store(
            file_content=payload,
            channel="TEST",
            storage_engine=StorageEnum.ipfs,
        )

    assert isinstance(store_message, StoreMessage)
    assert message_status == MessageStatus.PENDING
    store = store_message.content
    assert store.item_type == ItemType.ipfs
    assert store.item_hash == local_cid
    assert store_message.signature

    # The file and the signed message must travel in one multipart request.
    form = posted_form_fields(
        mock_session_with_post_success.http_session, "/api/v0/ipfs/add_file"
    )
    _, uploaded_file = form["file"]
    assert uploaded_file.read() == payload

    _, raw_metadata = form["metadata"]
    envelope = json.loads(raw_metadata)
    assert envelope["sync"] is False
    signed_message = envelope["message"]
    assert signed_message["signature"] == store_message.signature
    assert json.loads(signed_message["item_content"])["item_hash"] == local_cid


@pytest.mark.asyncio
async def test_create_store_folder(mock_session_with_post_success, tmp_path):
    """create_store_folder posts a CAR whose root is the golden directory CID."""
    site_dir = tmp_path / "site"
    site_dir.mkdir()
    fixture_nested_dir(site_dir)

    async with mock_session_with_post_success as client:
        store_message, message_status = await client.create_store_folder(
            folder_path=site_dir,
            channel="TEST",
        )

    assert isinstance(store_message, StoreMessage)
    assert message_status == MessageStatus.PENDING
    store = store_message.content
    assert store.item_type == ItemType.ipfs
    assert store.item_hash == GOLDEN_NESTED_DIR_V1
    assert store_message.signature

    form = posted_form_fields(
        mock_session_with_post_success.http_session, "/api/v0/ipfs/add_car"
    )
    _, raw_metadata = form["metadata"]
    signed_message = json.loads(raw_metadata)["message"]
    posted_content = json.loads(signed_message["item_content"])
    assert posted_content["item_hash"] == GOLDEN_NESTED_DIR_V1

    # The uploaded CAR must be a valid CARv1 whose root is the message CID.
    car_headers, car_stream = form["file"]
    assert car_headers.get("Content-Type") == "application/vnd.ipld.car"
    roundtrip_path = tmp_path / "roundtrip.car"
    roundtrip_path.write_bytes(car_stream.read())
    assert aleph_cid.read_carv1_root(roundtrip_path) == GOLDEN_NESTED_DIR_V1


@pytest.mark.asyncio
async def test_create_store_folder_rejects_files(
    mock_session_with_post_success, tmp_path
):
    """create_store_folder refuses a path that points to a regular file."""
    not_a_dir = tmp_path / "file.txt"
    not_a_dir.write_bytes(b"x")
    async with mock_session_with_post_success as session:
        # Match on the message so that only the directory guard satisfies the
        # test: any other ValueError raised further down the call (for
        # instance while building the message content) must not pass.
        with pytest.raises(ValueError, match="Not a directory"):
            await session.create_store_folder(folder_path=not_a_dir)
Loading