Skip to content

Commit

Permalink
Add filebase pinning client (#76)
Browse files Browse the repository at this point in the history
* Add filebase pinning client

* Fix IpfsMultiUploadClient.remove

* Restore flake8
  • Loading branch information
evgeny-stakewise authored Jan 9, 2024
1 parent 73f7956 commit 5e6b2b0
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 12 deletions.
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ ignored-modules=["milagro_bls_binding"]

[tool.flake8]
max-line-length = 100
select = ["E121"]
extend-ignore = [
"E203", # whitespace before ':'
]
exclude = [
"*/__init__.py",
]

[tool.isort]
profile = "black"
Expand Down
101 changes: 91 additions & 10 deletions sw_utils/ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ async def remove(self, ipfs_hash: str) -> None:
raise NotImplementedError


class BasePinClient(ABC):
"""
Allows to re-pin existing CID.
https://ipfs.github.io/pinning-services-api-spec/
"""

@abstractmethod
async def pin(self, ipfs_hash: str) -> str:
raise NotImplementedError

@abstractmethod
async def remove(self, ipfs_hash: str) -> None:
raise NotImplementedError


class IpfsUploadClient(BaseUploadClient):
def __init__(
self,
Expand Down Expand Up @@ -203,12 +218,67 @@ async def _upload(self, form_data: aiohttp.FormData) -> str:
return _strip_ipfs_prefix(ipfs_id)


class FilebasePinClient(BasePinClient):
"""
https://docs.filebase.com/api-documentation/ipfs-pinning-service-api
"""

base_url = 'https://api.filebase.io/v1/ipfs/'

def __init__(self, bucket: str, api_token: str, timeout: int = IPFS_DEFAULT_TIMEOUT):
self.bucket = bucket
self.api_token = api_token
self.timeout = timeout

async def pin(self, ipfs_hash: str) -> str:
data = {
'cid': ipfs_hash,
}
response = await self._call('POST', 'pins', data=data)
cid = response['pin']['cid']
if cid != ipfs_hash:
raise ValueError(f'cid {cid} is not equal to ipfs_hash {ipfs_hash}')
return cid

async def remove(self, ipfs_hash: str) -> None:
pin_results = await self._call('GET', 'pins', data={'cid': ipfs_hash})

# Filebase returns the same request_id when pinning the same cid twice
request_id = pin_results['results'][0]['requestid']

await self._call('DELETE', f'pins/{request_id}')

async def _call(self, http_method: str, endpoint: str, data: dict | None = None) -> dict:
url = urljoin(self.base_url, endpoint)
logger.debug('%s %s', http_method, url)

# User and bucket are determined by token
headers = {'Authorization': f'Bearer {self.api_token}'}

async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(self.timeout)) as session:
session_method = getattr(session, http_method.lower())
async with session_method(url, json=data, headers=headers) as response:
response.raise_for_status()
return await response.json()


class IpfsMultiUploadClient(BaseUploadClient):
def __init__(self, clients: list[BaseUploadClient], retry_timeout: int = 120):
if len(clients) == 0:
raise ValueError('Invalid number of clients')
self.clients = clients
self.quorum = (len(clients) // 2) + 1
def __init__(self, clients: list[BaseUploadClient | BasePinClient], retry_timeout: int = 120):
self.upload_clients = []
self.pin_clients = []

for client in clients:
if isinstance(client, BaseUploadClient):
self.upload_clients.append(client)
elif isinstance(client, BasePinClient):
self.pin_clients.append(client)
else:
logger.warning('Unexpected client type %s', type(client))

if len(self.upload_clients) == 0:
raise ValueError('Invalid number of upload clients')

self.quorum = (len(self.upload_clients) // 2) + 1
self.retry_timeout = retry_timeout

async def upload_bytes(self, data: bytes) -> str:
Expand All @@ -224,8 +294,13 @@ def custom_before_log(retry_state: 'RetryCallState') -> None:
return await retry_decorator(self._upload_bytes_all_clients)(data)

async def _upload_bytes_all_clients(self, data: bytes) -> str:
coros = [client.upload_bytes(data) for client in self.clients]
return await self._upload(coros)
coros = [client.upload_bytes(data) for client in self.upload_clients]
ipfs_hash = await self._upload(coros)

if self.pin_clients:
await asyncio.gather(*(pin_client.pin(ipfs_hash) for pin_client in self.pin_clients))

return ipfs_hash

async def upload_json(self, data: dict | list) -> str:
if not data:
Expand All @@ -240,8 +315,13 @@ def custom_before_log(retry_state: 'RetryCallState') -> None:
return await retry_decorator(self._upload_json_all_clients)(data)

async def _upload_json_all_clients(self, data: dict | list) -> str:
coros = [client.upload_json(data) for client in self.clients]
return await self._upload(coros)
coros = [client.upload_json(data) for client in self.upload_clients]
ipfs_hash = await self._upload(coros)

if self.pin_clients:
await asyncio.gather(*(pin_client.pin(ipfs_hash) for pin_client in self.pin_clients))

return ipfs_hash

async def _upload(self, coros: list) -> str:
result = await asyncio.gather(*coros, return_exceptions=True)
Expand Down Expand Up @@ -271,8 +351,9 @@ async def _upload(self, coros: list) -> str:
async def remove(self, ipfs_hash: str) -> None:
if not ipfs_hash:
raise ValueError('Empty IPFS hash provided')
clients: list = self.upload_clients + self.pin_clients
result = await asyncio.gather(
*[client.remove(ipfs_hash) for client in self.clients], return_exceptions=True
*[client.remove(ipfs_hash) for client in clients], return_exceptions=True
)
for value in result:
if isinstance(value, BaseException):
Expand Down
2 changes: 1 addition & 1 deletion sw_utils/ssz/hashable_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def update_element_in_chunk(original_chunk: Hash32, index: int, element: bytes)
chunk_size = len(original_chunk)

if element_size == 0:
raise ValueError(f'Element size is zero')
raise ValueError('Element size is zero')
if chunk_size % element_size != 0:
raise ValueError(f'Element size is not a divisor of chunk size: {element_size}')
if not 0 <= index < chunk_size // element_size:
Expand Down

0 comments on commit 5e6b2b0

Please sign in to comment.