Source code for fairops.repositories.figshare

import hashlib
import json
import os
import re
from fairops.utils.decorators import private

import requests
from requests.exceptions import HTTPError
from tqdm import tqdm


# TODO: Implement ABC
class FigshareClient:
    """
    A client for interacting with the Figshare API to manage projects,
    articles, and file uploads/downloads.
    """

    def __init__(self, api_token: str):
        """
        Initialize the Figshare client with an API token.

        Args:
            api_token (str): The Figshare API token for authentication.

        Raises:
            ValueError: If ``api_token`` is None or empty.
        """
        # An empty token would only fail later with an opaque HTTP error,
        # so reject it up front together with None.
        if not api_token:
            raise ValueError("figshare API token must be set")

        self.api_token = api_token
        self.base_url = "https://api.figshare.com/v2"
        self.headers = {"Authorization": f"token {self.api_token}"}
        self.chunk_size = 10485760  # 10MB

    @private
    def _issue_request(self, method: str, url: str, data: dict = None, binary: bool = False, stream: bool = None):
        """
        Make an authenticated request to the Figshare API.

        Args:
            method (str): HTTP method (GET, POST, PUT, etc.).
            url (str): API endpoint URL.
            data (dict, optional): Request payload data. JSON-encoded unless
                ``binary`` is True, in which case it is sent as-is.
            binary (bool, optional): Set to True for binary file uploads.
            stream (bool, optional): Set to True for streaming responses.

        Returns:
            dict or bytes or requests.Response: Decoded JSON response data,
            the raw response body if it is not valid JSON, or the raw
            response object if streamed.

        Raises:
            requests.exceptions.HTTPError: If the server answers with an
                error status. The error and response body are printed first.
        """
        if data is not None and not binary:
            data = json.dumps(data)

        response = requests.request(
            method,
            url,
            headers=self.headers,
            data=data,
            stream=stream
        )

        try:
            response.raise_for_status()
        except HTTPError as error:
            # HTTPError has no ``message`` attribute on Python 3; format the
            # exception itself so the original error is not masked by an
            # AttributeError raised inside this handler.
            print('Caught an HTTPError: {}'.format(error))
            print('Body:\n', response.content)
            raise

        if stream:
            # The caller consumes (and is responsible for closing) the body.
            return response

        try:
            return json.loads(response.content)
        except ValueError:
            # Not JSON (e.g. an empty body): hand back the raw bytes.
            return response.content

    def download_files_by_id(self, article_id: int, output_path: str, private=False) -> str:
        """
        Download all files associated with an article.

        The authenticated (account) endpoint is tried first; if that fails,
        the public endpoint is tried.

        Args:
            article_id (int): The Figshare article ID.
            output_path (str): Local directory to save downloaded files.
            private (bool, optional): Currently unused; kept so existing
                callers that pass it keep working.

        Returns:
            str: Path to the downloaded files (``output_path/<article_id>``).

        Raises:
            Exception: If the file list cannot be fetched from either the
                account endpoint or the public endpoint.
        """
        output_path = os.path.join(output_path, str(article_id))
        os.makedirs(output_path, exist_ok=True)

        try:
            files = self._issue_request(
                "GET",
                f"{self.base_url}/account/articles/{article_id}/files"
            )
        except Exception:
            try:
                files = self._issue_request(
                    "GET",
                    f"{self.base_url}/articles/{article_id}/files"
                )
            except Exception as err:
                raise Exception("DOI not found or insufficent permissions") from err

        for file in files:
            file_name = file["name"]
            # The name comes from the remote API; keep only its final path
            # component so it cannot escape ``output_path``.
            full_path = os.path.join(output_path, os.path.basename(file_name))

            file_data = self._issue_request(
                "GET",
                file["download_url"],
                stream=True
            )
            total_size = int(file_data.headers.get("content-length", 0))

            # Entering the response as a context manager guarantees the
            # underlying connection is released even if writing fails.
            with file_data, open(full_path, "wb") as f, tqdm(
                total=total_size,
                unit="B",
                unit_scale=True,
                desc=file_name
            ) as progress_bar:
                for chunk in file_data.iter_content(chunk_size=8192):
                    f.write(chunk)
                    progress_bar.update(len(chunk))

        return output_path

    def download_files_by_doi(self, doi: str, output_path: str) -> str:
        """
        Download files using a Figshare DOI.

        The article ID is taken from the first ``figshare.<digits>`` match
        in the DOI.

        Args:
            doi (str): The DOI of the Figshare article.
            output_path (str): Local directory to save downloaded files.

        Returns:
            str: Path to the downloaded files, or None if no article ID
            could be extracted from the DOI.
        """
        match = re.search(r"figshare\.(\d+)", doi)
        if match is None:
            print("Article not found")
            return None

        # The regex captures text; convert so the ID matches the ``int``
        # contract of ``download_files_by_id``.
        article_id = int(match.group(1))
        return self.download_files_by_id(article_id, output_path)

    def delete_project(self, project_id: int):
        """
        Delete a project from the authenticated account.

        Args:
            project_id (int): The Figshare project ID.

        Returns:
            int: The ID of the project that was deleted.
        """
        url = f"{self.base_url}/account/projects/{project_id}"
        self._issue_request("DELETE", url)
        return project_id

    def create_project(self, title: str, description: str) -> int:
        """
        Create a new project on Figshare.

        Args:
            title (str): The title of the project.
            description (str): A description of the project.

        Returns:
            int: The newly created project ID.
        """
        url = f"{self.base_url}/account/projects"
        data = {"title": title, "description": description}

        project = self._issue_request(
            "POST",
            url,
            data=data
        )

        return project["entity_id"]

    def create_article_in_project(self, project_id: int, title: str) -> int:
        """
        Create a new article within a Figshare project.

        Args:
            project_id (int): The Figshare project ID.
            title (str): The title of the article.

        Returns:
            int: The newly created article ID.
        """
        url = f"{self.base_url}/account/projects/{project_id}/articles"
        data = {"title": title}

        response = self._issue_request("POST", url, data=data)
        return response["entity_id"]

    def delete_article(self, article_id: int):
        """
        Delete an article from the authenticated account.

        Args:
            article_id (int): The Figshare article ID.

        Returns:
            int: The ID of the article that was deleted.
        """
        url = f"{self.base_url}/account/articles/{article_id}"
        self._issue_request("DELETE", url)
        return article_id

    @private
    def _get_file_check_data(self, file_name: str):
        """
        Calculate the MD5 checksum and file size.

        The file is read in ``self.chunk_size`` pieces so large files are
        never loaded into memory at once.

        Args:
            file_name (str): The file path.

        Returns:
            tuple: (MD5 hash as a hex string, file size in bytes).
        """
        md5 = hashlib.md5()
        size = 0

        with open(file_name, 'rb') as fin:
            # ``read`` returns b"" at end of file, which ends the iteration.
            for data in iter(lambda: fin.read(self.chunk_size), b""):
                size += len(data)
                md5.update(data)

        return md5.hexdigest(), size

    @private
    def _initiate_new_upload(self, article_id: int, file_name: str):
        """
        Initiate a new file upload.

        Registers the file (name, MD5, size) on the article, then fetches
        the created file record from the returned ``location`` URL.

        Args:
            article_id (int): The ID of the article where the file will be uploaded.
            file_name (str): The local file path.

        Returns:
            dict: File upload details.
        """
        endpoint = f'{self.base_url}/account/articles/{article_id}/files'

        md5, size = self._get_file_check_data(file_name)
        data = {
            'name': os.path.basename(file_name),
            'md5': md5,
            'size': size
        }

        result = self._issue_request('POST', endpoint, data=data)
        result = self._issue_request('GET', result['location'])

        return result

    @private
    def _complete_upload(self, article_id: int, file_id: int):
        """
        Complete an upload after all parts have been uploaded.

        Args:
            article_id (int): The article ID.
            file_id (int): The file ID.
        """
        self._issue_request(
            "POST",
            f'{self.base_url}/account/articles/{article_id}/files/{file_id}'
        )

    @private
    def _upload_part(self, file_info: dict, stream, part: dict):
        """
        Upload a part of a file.

        Args:
            file_info (dict): File upload details.
            stream (file object): Opened file stream.
            part (dict): Part metadata including start and end offsets.
        """
        udata = file_info.copy()
        udata.update(part)
        url = f'{udata["upload_url"]}/{udata["partNo"]}'

        # Offsets are inclusive on both ends, hence the +1 on the length.
        stream.seek(part['startOffset'])
        data = stream.read(part['endOffset'] - part['startOffset'] + 1)

        self._issue_request('PUT', url, data=data, binary=True)

    @private
    def _upload_parts(self, data_file: str, file_info: dict, parent_pbar):
        """
        Upload a file in chunks to Figshare.

        The part layout is fetched from the file's ``upload_url``; each part
        is then uploaded in the order returned.

        Args:
            data_file (str): Local file path.
            file_info (dict): File upload details.
            parent_pbar (tqdm): Parent progress bar; advanced by one once
                every part of this file has been uploaded.
        """
        result = self._issue_request('GET', file_info["upload_url"])
        file_size = os.path.getsize(data_file)

        with open(data_file, 'rb') as fin, tqdm(
            total=file_size,
            desc="  ↳ Uploading parts for file",
            unit="B",
            leave=False
        ) as parts_pbar:
            for part in result['parts']:
                self._upload_part(file_info, fin, part)
                # Part boundaries are chosen by the upload service, not by
                # ``self.chunk_size``, so report the size from the part's
                # own (inclusive) offsets.
                parts_pbar.update(part['endOffset'] - part['startOffset'] + 1)

        parent_pbar.update(1)

    def upload_files_to_project(self, project_id: int, title: str, file_paths: list):
        """
        Upload multiple files to a Figshare project.

        A new article is created in the project and every file is uploaded
        to that article.

        Args:
            project_id (int): The Figshare project ID.
            title (str): The article title.
            file_paths (list): List of file paths to upload.

        Returns:
            dict: ``project_id``, ``article_id`` and the ``url`` of the
            article's edit page.
        """
        article_id = self.create_article_in_project(project_id, title)

        with tqdm(
            total=len(file_paths),
            desc="Uploading files",
            unit="file"
        ) as files_pbar:
            for file_path in file_paths:
                file_info = self._initiate_new_upload(article_id, file_path)
                self._upload_parts(file_path, file_info, files_pbar)
                self._complete_upload(article_id, file_info['id'])

        result = {
            "project_id": project_id,
            "article_id": article_id,
            "url": f"https://figshare.com/account/items/{article_id}/edit"
        }

        return result