Add filebase pinning client #76

Merged · 3 commits · Jan 9, 2024
7 changes: 6 additions & 1 deletion pyproject.toml
@@ -43,7 +43,12 @@ ignored-modules=["milagro_bls_binding"]

[tool.flake8]
max-line-length = 100
select = ["E121"]
extend-ignore = [
    "E203", # whitespace before ':'
]
exclude = [
    "*/__init__.py",
]

[tool.isort]
profile = "black"
101 changes: 91 additions & 10 deletions sw_utils/ipfs.py
@@ -36,6 +36,21 @@ async def remove(self, ipfs_hash: str) -> None:
        raise NotImplementedError


class BasePinClient(ABC):
    """
    Allows re-pinning an existing CID.
    https://ipfs.github.io/pinning-services-api-spec/
    """

    @abstractmethod
    async def pin(self, ipfs_hash: str) -> str:
        raise NotImplementedError

    @abstractmethod
    async def remove(self, ipfs_hash: str) -> None:
        raise NotImplementedError
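
For reference, a minimal in-memory implementation of this interface could look as follows; InMemoryPinClient is a hypothetical test double, not part of this PR:

class InMemoryPinClient(BasePinClient):
    """Hypothetical test double: records pinned CIDs in a set."""

    def __init__(self) -> None:
        self.pinned: set[str] = set()

    async def pin(self, ipfs_hash: str) -> str:
        self.pinned.add(ipfs_hash)
        return ipfs_hash

    async def remove(self, ipfs_hash: str) -> None:
        self.pinned.discard(ipfs_hash)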


class IpfsUploadClient(BaseUploadClient):
    def __init__(
        self,
@@ -203,12 +218,67 @@ async def _upload(self, form_data: aiohttp.FormData) -> str:
        return _strip_ipfs_prefix(ipfs_id)


class FilebasePinClient(BasePinClient):
    """
    https://docs.filebase.com/api-documentation/ipfs-pinning-service-api
    """

    base_url = 'https://api.filebase.io/v1/ipfs/'

    def __init__(self, bucket: str, api_token: str, timeout: int = IPFS_DEFAULT_TIMEOUT):
        self.bucket = bucket
        self.api_token = api_token
        self.timeout = timeout

    async def pin(self, ipfs_hash: str) -> str:
        data = {
            'cid': ipfs_hash,
        }
        response = await self._call('POST', 'pins', data=data)
        cid = response['pin']['cid']
        if cid != ipfs_hash:
            raise ValueError(f'cid {cid} is not equal to ipfs_hash {ipfs_hash}')
        return cid

    async def remove(self, ipfs_hash: str) -> None:
        pin_results = await self._call('GET', 'pins', data={'cid': ipfs_hash})

        # Filebase returns the same request_id when pinning the same cid twice
        request_id = pin_results['results'][0]['requestid']

        await self._call('DELETE', f'pins/{request_id}')

    async def _call(self, http_method: str, endpoint: str, data: dict | None = None) -> dict:
        url = urljoin(self.base_url, endpoint)
        logger.debug('%s %s', http_method, url)

        # User and bucket are determined by token
        headers = {'Authorization': f'Bearer {self.api_token}'}

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(self.timeout)) as session:
            session_method = getattr(session, http_method.lower())
            async with session_method(url, json=data, headers=headers) as response:
                response.raise_for_status()
                return await response.json()
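
A hedged usage sketch of the new client (bucket, token, and CID are placeholder values):

import asyncio

async def main() -> None:
    # Hypothetical credentials; Filebase derives user and bucket from the token
    client = FilebasePinClient(bucket='my-bucket', api_token='<FILEBASE_API_TOKEN>')
    cid = await client.pin('QmPlaceholderCid')  # re-pin an already uploaded CID
    await client.remove(cid)

asyncio.run(main())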


class IpfsMultiUploadClient(BaseUploadClient):
    def __init__(self, clients: list[BaseUploadClient], retry_timeout: int = 120):
        if len(clients) == 0:
            raise ValueError('Invalid number of clients')
        self.clients = clients
        self.quorum = (len(clients) // 2) + 1
    def __init__(self, clients: list[BaseUploadClient | BasePinClient], retry_timeout: int = 120):
        self.upload_clients = []
        self.pin_clients = []

        for client in clients:
            if isinstance(client, BaseUploadClient):
                self.upload_clients.append(client)
            elif isinstance(client, BasePinClient):
                self.pin_clients.append(client)
            else:
                logger.warning('Unexpected client type %s', type(client))

        if len(self.upload_clients) == 0:
            raise ValueError('Invalid number of upload clients')

        # Strict majority of upload clients; pin clients do not affect the quorum
        self.quorum = (len(self.upload_clients) // 2) + 1
        self.retry_timeout = retry_timeout

    async def upload_bytes(self, data: bytes) -> str:
@@ -224,8 +294,13 @@ def custom_before_log(retry_state: 'RetryCallState') -> None:
        return await retry_decorator(self._upload_bytes_all_clients)(data)

    async def _upload_bytes_all_clients(self, data: bytes) -> str:
        coros = [client.upload_bytes(data) for client in self.clients]
        return await self._upload(coros)
        coros = [client.upload_bytes(data) for client in self.upload_clients]
        ipfs_hash = await self._upload(coros)

        if self.pin_clients:
            await asyncio.gather(*(pin_client.pin(ipfs_hash) for pin_client in self.pin_clients))

        return ipfs_hash

    async def upload_json(self, data: dict | list) -> str:
        if not data:
@@ -240,8 +315,13 @@ def custom_before_log(retry_state: 'RetryCallState') -> None:
        return await retry_decorator(self._upload_json_all_clients)(data)

    async def _upload_json_all_clients(self, data: dict | list) -> str:
        coros = [client.upload_json(data) for client in self.clients]
        return await self._upload(coros)
        coros = [client.upload_json(data) for client in self.upload_clients]
        ipfs_hash = await self._upload(coros)

        if self.pin_clients:
            await asyncio.gather(*(pin_client.pin(ipfs_hash) for pin_client in self.pin_clients))

        return ipfs_hash

    async def _upload(self, coros: list) -> str:
        result = await asyncio.gather(*coros, return_exceptions=True)
@@ -271,8 +351,9 @@ async def _upload(self, coros: list) -> str:
    async def remove(self, ipfs_hash: str) -> None:
        if not ipfs_hash:
            raise ValueError('Empty IPFS hash provided')
        clients: list = self.upload_clients + self.pin_clients
        result = await asyncio.gather(
            *[client.remove(ipfs_hash) for client in self.clients], return_exceptions=True
            *[client.remove(ipfs_hash) for client in clients], return_exceptions=True
        )
        for value in result:
            if isinstance(value, BaseException):
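
With this change, pin services can be combined with regular upload endpoints: the quorum is computed over upload clients only, and each successfully uploaded hash is afterwards re-pinned on every pin client. A hedged sketch; the IpfsUploadClient argument is an assumption, since its signature is collapsed in this diff:

async def publish(data: dict) -> str:
    multi_client = IpfsMultiUploadClient(
        clients=[
            IpfsUploadClient('http://localhost:5001'),  # assumed endpoint argument
            FilebasePinClient(bucket='my-bucket', api_token='<TOKEN>'),
        ]
    )
    # Upload goes to the upload clients (quorum over them only);
    # the resulting hash is then re-pinned on every pin client.
    return await multi_client.upload_json(data)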
2 changes: 1 addition & 1 deletion sw_utils/ssz/hashable_structure.py
@@ -50,7 +50,7 @@ def update_element_in_chunk(original_chunk: Hash32, index: int, element: bytes)
    chunk_size = len(original_chunk)

    if element_size == 0:
        raise ValueError(f'Element size is zero')
        raise ValueError('Element size is zero')
    if chunk_size % element_size != 0:
        raise ValueError(f'Element size is not a divisor of chunk size: {element_size}')
    if not 0 <= index < chunk_size // element_size:
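
For context, this helper treats the chunk as a sequence of equally sized elements and replaces the one at the given index. A hedged behavior sketch, assuming Hash32 from the module's imports:

chunk = Hash32(b'\x00' * 32)
updated = update_element_in_chunk(chunk, index=1, element=b'\xff' * 4)
assert updated[4:8] == b'\xff' * 4  # the second 4-byte element was replaced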