Skip to content

Commit

Permalink
Merge pull request #713 from hubmapconsortium/bulk_updating_of_entrie…
Browse files Browse the repository at this point in the history
…s_endpoints

New endpoints (PUT /datasets and PUT /uploads) to handle the bulk upd…
  • Loading branch information
yuanzhou authored Aug 12, 2024
2 parents 8b19402 + 26fea17 commit d165a7e
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 12 deletions.
46 changes: 46 additions & 0 deletions entity-api-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2532,6 +2532,52 @@ paths:

'500':
description: Internal error
'/datasets':
put:
      summary: 'Bulk updating of entities of type dataset. Only the following fields may be updated: assigned_to_group_name, ingest_task, status'
requestBody:
required: true
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/Dataset'
responses:
'202':
description: request is being processed
content:
application/json:
schema:
type: array
items:
type: string
description: The uuids of the entities being processed
'500':
description: Internal error
'/uploads':
put:
      summary: 'Bulk updating of entities of type upload. Only the following fields may be updated: assigned_to_group_name, ingest_task, status'
requestBody:
required: true
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/Upload'
responses:
'202':
description: request is being processed
content:
application/json:
schema:
type: array
items:
type: string
description: The uuids of the entities being processed
'500':
description: Internal error
'/samples/prov-info':
get:
summary: 'returns all provenance information for a each sample in a json format'
Expand Down
176 changes: 172 additions & 4 deletions src/app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import collections
import yaml
from typing import List
from typing import Callable, List, Optional
from datetime import datetime
from flask import Flask, g, jsonify, abort, request, Response, redirect, make_response
from neo4j.exceptions import TransactionError
import os
import re
import csv
import requests
from requests.adapters import HTTPAdapter, Retry
import threading
import urllib.request
from io import StringIO
# Don't confuse urllib (Python native library) with urllib3 (3rd-party library, requests also uses urllib3)
Expand Down Expand Up @@ -4212,6 +4213,173 @@ def multiple_components():
return jsonify(normalized_complete_entity_list)


def bulk_update_entities(
    entity_updates: dict,
    token: str,
    entity_api_url: str,
    total_tries: int = 3,
    throttle: float = 5,
    after_each_callback: Optional[Callable[[int], None]] = None,
) -> dict:
    """Bulk update entities in the entity-api via individual PUT calls.

    This function supports request throttling and retries.

    Parameters
    ----------
    entity_updates : dict
        The dictionary of entity updates. The key is the uuid and the value is
        the update dictionary.
    token : str
        The groups token for the request.
    entity_api_url : str
        The url of the entity-api.
    total_tries : int, optional
        The number of retries allowed for each update in addition to the
        initial request (urllib3 Retry semantics), by default 3.
    throttle : float, optional
        The time to wait between requests, and the backoff factor applied
        between retries, by default 5.
    after_each_callback : Callable[[int], None], optional
        A callback function to be called after each update, by default None.
        The index of the update is passed as a parameter to the callback.

    Returns
    -------
    dict
        The results of the bulk update. The key is the uuid of the entity. If
        successful, the value is a dictionary with "success" as True and
        "data" as the entity data. If failed, the value is a dictionary with
        "success" as False and "data" as the error message.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        SchemaConstants.HUBMAP_APP_HEADER: SchemaConstants.ENTITY_API_APP,
    }
    # Create a session with automatic retries on transient server errors.
    session = requests.Session()
    session.headers = headers
    retries = Retry(
        total=total_tries,
        backoff_factor=throttle,
        status_forcelist=[500, 502, 503, 504],
    )
    session.mount(entity_api_url, HTTPAdapter(max_retries=retries))

    results = {}
    with session as s:
        for idx, (uuid, payload) in enumerate(entity_updates.items()):
            try:
                # https://github.com/hubmapconsortium/entity-api/issues/698#issuecomment-2260799700
                # yuanzhou: When you iterate over the target uuids make individual PUT /entities/<uuid> calls.
                # The main reason we use the PUT call rather than direct neo4j query is because the entity update
                # needs to go through the schema trigger methods and generate corresponding values programmatically
                # before sending over to neo4j.
                # The PUT call returns the response immediately while the backend updating may be still going on.
                res = s.put(
                    f"{entity_api_url}/entities/{uuid}", json=payload, timeout=15
                )
                # Parse the body defensively: a proxy/gateway may return a
                # non-JSON (e.g. HTML) error page, and res.json() would raise.
                try:
                    body = res.json()
                except ValueError:
                    body = res.text
                if res.ok:
                    data = body
                else:
                    data = body.get("error") if isinstance(body, dict) else body
                results[uuid] = {"success": res.ok, "data": data}
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to update entity {uuid}: {e}")
                results[uuid] = {"success": False, "data": str(e)}

            if after_each_callback:
                after_each_callback(idx)

            # Throttle between updates (but not after the last one).
            if idx < len(entity_updates) - 1:
                time.sleep(throttle)

    logger.info(f"bulk_update_entities() results: {results}")
    return results


# For this call to work READ_ONLY_MODE = False in the app.cfg file.
def update_datasets_uploads(entity_updates: list, token: str, entity_api_url: str) -> None:
    # Re-key the updates by uuid; the uuid itself is not part of the payload.
    payload_by_uuid = {}
    for update in entity_updates:
        payload_by_uuid[update.pop("uuid")] = update

    # Send the dataset/upload updates to entity-api.
    results = bulk_update_entities(payload_by_uuid, token, entity_api_url)

    # Surface any per-entity failures in the log; nothing is returned.
    for uuid, outcome in results.items():
        if not outcome["success"]:
            logger.error(f"Failed to update entity {uuid}: {outcome['data']}")


# Whitelist of request-body fields accepted by the bulk PUT /datasets and
# PUT /uploads endpoints: 'uuid' identifies the entity, the rest are updatable.
ENTITY_BULK_UPDATE_FIELDS_ACCEPTED = ['uuid', 'status', 'ingest_task', 'assigned_to_group_name']


# New endpoints (PUT /datasets and PUT /uploads) to handle the bulk updating of entities see Issue: #698
# https://github.com/hubmapconsortium/entity-api/issues/698
#
# This is used by Data Ingest Board application for now.
#
# Shirey: With this use case we're not worried about a lot of concurrent calls to this endpoint (only one user,
# Brendan, will be ever using it). Just start a thread on request and loop through the Datasets/Uploads to change
# with a 5 second delay or so between them to allow some time for reindexing.
#
# Example call
# 1) pick Dataset entities to change by querying Neo4J...
# URL: http://18.205.215.12:7474/browser/
# query: MATCH (e:Dataset {entity_type: 'Dataset'}) RETURN e.uuid, e.status, e.ingest_task, e.assigned_to_group_name LIMIT 100
#
# curl --request PUT \
# --url ${ENTITY_API}/datasets \
# --header "Content-Type: application/json" \
# --header "Authorization: Bearer ${TOKEN}" \
# --header "X-Hubmap-Application: entity-api" \
# --data '[{"uuid":"f22a9ba97b79eefe6b152b4315e43c76", "status":"Error", "assigned_to_group_name":"TMC - Cal Tech"}, {"uuid":"e4b371ea3ed4c3ca77791b34b829803f", "status":"Error", "assigned_to_group_name":"TMC - Cal Tech"}]'
@app.route('/datasets', methods=['PUT'])
@app.route('/uploads', methods=['PUT'])
def entity_bulk_update():
    """Bulk update Dataset or Upload entities (see issue #698).

    Accepts a JSON array of objects, each containing a 'uuid' plus any of the
    fields in ENTITY_BULK_UPDATE_FIELDS_ACCEPTED. Validates the request, then
    kicks off a background thread that PUTs each update to entity-api, and
    immediately returns 202 with the list of uuids being processed.
    """
    entity_type: str = 'upload' if request.path == '/uploads' else 'dataset'

    validate_token_if_auth_header_exists(request)
    require_json(request)

    entities = request.get_json()
    if entities is None or not isinstance(entities, list) or len(entities) == 0:
        bad_request_error("Request body is either missing, "
                          "does not contain a list, or contains an empty list")

    user_token: str = get_user_token(request)
    for entity in entities:
        validate_user_update_privilege(entity, user_token)

    # Every entry must carry a unique, non-missing uuid.
    uuid_list = [e.get("uuid") for e in entities]
    if None in uuid_list:
        bad_request_error(f"All {entity_type}s must have a 'uuid' field")
    if len(set(uuid_list)) != len(uuid_list):
        bad_request_error(f"{entity_type}s must have unique 'uuid' fields")

    if not all(set(e.keys()).issubset(ENTITY_BULK_UPDATE_FIELDS_ACCEPTED) for e in entities):
        bad_request_error(
            f"Some {entity_type}s have invalid fields. Acceptable fields are: " +
            ", ".join(ENTITY_BULK_UPDATE_FIELDS_ACCEPTED)
        )

    uuids = set(uuid_list)
    try:
        fields = {"uuid", "entity_type"}
        db_entities = app_neo4j_queries.get_entities_by_uuid(neo4j_driver_instance, uuids, fields)
    except Exception as e:
        logger.error(f"Error while bulk updating {entity_type}s: {str(e)}")
        bad_request_error(str(e))

    # get_entities_by_uuid returns None when nothing matched; guard against
    # iterating None. Every requested uuid must exist with the right type.
    found = {e["uuid"] for e in (db_entities or []) if e["entity_type"].lower() == entity_type}
    diff = uuids.difference(found)
    if len(diff) > 0:
        bad_request_error(f"No {entity_type} found with the following uuids: {', '.join(diff)}")

    # Run the updates asynchronously; the response does not wait for them.
    entity_api_url = app.config["ENTITY_API_URL"].rstrip('/')
    thread_instance =\
        threading.Thread(target=update_datasets_uploads,
                         args=(entities, user_token, entity_api_url))
    thread_instance.start()

    return jsonify(list(uuids)), 202


####################################################################################################
## Internal Functions
Expand Down Expand Up @@ -4382,9 +4550,9 @@ def validate_user_update_privilege(entity, user_token):
abort(user_write_groups)

user_group_uuids = [d['uuid'] for d in user_write_groups]
if entity['group_uuid'] not in user_group_uuids and is_admin is False:
if entity.get('group_uuid') not in user_group_uuids and is_admin is False:
forbidden_error(f"User does not have write privileges for this entity. "
f"Reach out to the help desk ([email protected]) to request access to group: {entity['group_uuid']}.")
"Please reach out to the help desk ([email protected]) to request access.")


"""
Expand Down
47 changes: 47 additions & 0 deletions src/app_neo4j_queries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Iterable, List, Optional, Union
from neo4j.exceptions import TransactionError
import logging
import json
Expand Down Expand Up @@ -1286,3 +1287,49 @@ def uuids_all_exist(neo4j_driver, uuids:list):
if (expected_match_count == match_count): return True
raise Exception(f"For {expected_match_count} uuids, only found {match_count}"
f" exist as node identifiers in the Neo4j graph.")


def get_entities_by_uuid(neo4j_driver,
                         uuids: Union[str, Iterable],
                         fields: Union[dict, Iterable, None] = None) -> Optional[list]:
    """Get the entities from the neo4j database with the given uuids.
    Parameters
    ----------
    uuids : Union[str, Iterable]
        The uuid(s) of the entities to get.
    fields : Union[dict, Iterable, None], optional
        The fields to return for each entity. If None, all fields are returned.
        If a dict, the keys are the database fields to return and the values are the names to return them as.
        If a str, the single field to return.
        If any other iterable, the fields to return. Defaults to None.
    Returns
    -------
    Optional[List[neo4j.Record]]:
        The entity records with the given uuids, or None if no entities were found.
        The specified fields are returned for each entity.
    Raises
    ------
    ValueError
        If fields is not a dict, an iterable, or None.
    """
    if isinstance(uuids, str):
        uuids = [uuids]
    elif not isinstance(uuids, list):
        uuids = list(uuids)

    if fields is None or len(fields) == 0:
        return_stmt = 'e'
    elif isinstance(fields, dict):
        return_stmt = ', '.join([f'e.{field} AS {name}' for field, name in fields.items()])
    elif isinstance(fields, str):
        # A bare string is a single field name; without this branch it would
        # fall into the Iterable case and be split into individual characters.
        return_stmt = f'e.{fields} AS {fields}'
    elif isinstance(fields, Iterable):
        return_stmt = ', '.join([f'e.{field} AS {field}' for field in fields])
    else:
        raise ValueError("fields must be a dict or an iterable")

    # NOTE: field names are interpolated directly into the Cypher RETURN
    # clause; callers must never pass untrusted field names.
    with neo4j_driver.session() as session:
        query = "MATCH (e:Entity) WHERE e.uuid IN $uuids RETURN " + return_stmt
        records = session.run(query, uuids=uuids).fetch(len(uuids))
        # fetch() returns a (possibly empty) list; normalize empty to None.
        if not records:
            return None

        return records
1 change: 1 addition & 0 deletions src/schema/schema_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ class SchemaConstants(object):
MEMCACHED_TTL = 7200

INGEST_API_APP = 'ingest-api'
ENTITY_API_APP = 'entity-api'
COMPONENT_DATASET = 'component-dataset'
INGEST_PIPELINE_APP = 'ingest-pipeline'
HUBMAP_APP_HEADER = 'X-Hubmap-Application'
Expand Down
20 changes: 12 additions & 8 deletions src/schema/schema_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
The instance of Flask request passed in from application request
"""
def validate_application_header_before_entity_create(normalized_entity_type, request):
# A list of applications allowed to create this new entity
# Currently only ingest-api and ingest-pipeline are allowed
# to create or update Dataset and Upload
# A list of applications allowed to create this new entity or update Dataset and Upload
# Use lowercase for comparison
applications_allowed = [SchemaConstants.INGEST_API_APP, SchemaConstants.INGEST_PIPELINE_APP]
applications_allowed = [
SchemaConstants.INGEST_API_APP,
SchemaConstants.INGEST_PIPELINE_APP,
SchemaConstants.ENTITY_API_APP
]

_validate_application_header(applications_allowed, request.headers)

Expand Down Expand Up @@ -296,11 +298,13 @@ def collection_entities_are_existing_datasets(property_key, normalized_entity_ty
The json data in request body, already after the regular validations
"""
def validate_application_header_before_property_update(property_key, normalized_entity_type, request, existing_data_dict, new_data_dict):
# A list of applications allowed to update this property
# Currently only ingest-api and ingest-pipeline are allowed
# to update Dataset.status or Upload.status
# A list of applications allowed to update Dataset.status or Upload.status
# Use lowercase for comparison
applications_allowed = [SchemaConstants.INGEST_API_APP, SchemaConstants.INGEST_PIPELINE_APP]
applications_allowed = [
SchemaConstants.INGEST_API_APP,
SchemaConstants.INGEST_PIPELINE_APP,
SchemaConstants.ENTITY_API_APP
]

_validate_application_header(applications_allowed, request.headers)

Expand Down

0 comments on commit d165a7e

Please sign in to comment.