Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport 6460 to 7.6 #6538

Open
wants to merge 9 commits into
base: rel_7_6
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
type: perf
issue: 6538
title: "In HAPI FHIR 8.0.0, transaction processing has been significantly improved thanks
to ticket [#6460](https://github.com/hapifhir/hapi-fhir/pull/6460). This enhancement
has been partially backported to the 7.6.x release line in order to provide partial improvement
prior to the release of HAPI FHIR 8.0.0."
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
type: perf
issue: 6538
title: "In HAPI FHIR 8.0.0, validation processing has been significantly improved thanks
to ticket [#6508](https://github.com/hapifhir/hapi-fhir/pull/6508). This enhancement
has been partially backported to the 7.6.x release line in order to provide partial improvement
prior to the release of HAPI FHIR 8.0.0."
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,24 @@
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.util.FhirTerser;
import ca.uhn.fhir.util.ResourceReferenceInfo;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.TaskChunker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.MultimapBuilder;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.FlushModeType;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.PersistenceContextType;
import jakarta.persistence.PersistenceException;
Expand Down Expand Up @@ -83,6 +86,7 @@ public class TransactionProcessor extends BaseTransactionProcessor {

public static final Pattern SINGLE_PARAMETER_MATCH_URL_PATTERN = Pattern.compile("^[^?]+[?][a-z0-9-]+=[^&,]+$");
private static final Logger ourLog = LoggerFactory.getLogger(TransactionProcessor.class);
public static final int CONDITIONAL_URL_FETCH_CHUNK_SIZE = 100;

@Autowired
private ApplicationContext myApplicationContext;
Expand All @@ -108,9 +112,6 @@ public class TransactionProcessor extends BaseTransactionProcessor {
@Autowired
private MatchUrlService myMatchUrlService;

@Autowired
private IRequestPartitionHelperSvc myRequestPartitionSvc;

public void setEntityManagerForUnitTest(EntityManager theEntityManager) {
myEntityManager = theEntityManager;
}
Expand Down Expand Up @@ -146,25 +147,67 @@ protected EntriesToProcessMap doTransactionWriteOperations(
List<IBase> theEntries,
StopWatch theTransactionStopWatch) {

ITransactionProcessorVersionAdapter<?, ?> versionAdapter = getVersionAdapter();
RequestPartitionId requestPartitionId =
super.determineRequestPartitionIdForWriteEntries(theRequest, theEntries);

if (requestPartitionId != null) {
preFetch(theTransactionDetails, theEntries, versionAdapter, requestPartitionId);
/*
* We temporarily set the flush mode for the duration of the DB transaction
* from the default of AUTO to the temporary value of COMMIT here. We do this
* because in AUTO mode, if any SQL SELECTs are required during the
* processing of an individual transaction entry, the server will flush the
* pending INSERTs/UPDATEs to the database before executing the SELECT.
* This hurts performance since we don't get the benefit of batching those
* write operations as much as possible. The tradeoff here is that we
* could theoretically have transaction operations which try to read
* data previously written in the same transaction, and they won't see it.
* This shouldn't actually be an issue anyhow - we pre-fetch conditional
* URLs and reference targets at the start of the transaction. But this
* tradeoff still feels worth it, since the most common use of transactions
* is for fast writing of data.
*
* Note that it's probably not necessary to reset it back, it should
* automatically go back to the default value after the transaction but
* we reset it just to be safe.
*/
FlushModeType flushMode = FlushModeType.COMMIT;
for (IBase entry : theEntries) {
IBaseResource res = myVersionAdapter.getResource(entry);
if (res != null) {
String type = myFhirContext.getResourceType(res);
// These types write additional tables during the entity write
if ("ValueSet".equals(type)
|| "CodeSystem".equals(type)
|| "Subscription".equals(type)
|| "ConceptMap".equals(type)) {
flushMode = FlushModeType.AUTO;
break;
}
}
}

return super.doTransactionWriteOperations(
theRequest,
theActionName,
theTransactionDetails,
theAllIds,
theIdSubstitutions,
theIdToPersistedOutcome,
theResponse,
theOriginalRequestOrder,
theEntries,
theTransactionStopWatch);
FlushModeType initialFlushMode = myEntityManager.getFlushMode();
try {
myEntityManager.setFlushMode(flushMode);

ITransactionProcessorVersionAdapter<?, ?> versionAdapter = getVersionAdapter();
RequestPartitionId requestPartitionId =
super.determineRequestPartitionIdForWriteEntries(theRequest, theEntries);

if (requestPartitionId != null) {
preFetch(theTransactionDetails, theEntries, versionAdapter, requestPartitionId);
}

return super.doTransactionWriteOperations(
theRequest,
theActionName,
theTransactionDetails,
theAllIds,
theIdSubstitutions,
theIdToPersistedOutcome,
theResponse,
theOriginalRequestOrder,
theEntries,
theTransactionStopWatch);
} finally {
myEntityManager.setFlushMode(initialFlushMode);
}
}

private void preFetch(
Expand Down Expand Up @@ -199,24 +242,60 @@ private void preFetchResourcesById(
RequestPartitionId theRequestPartitionId,
Set<String> foundIds,
List<Long> idsToPreFetch) {
List<IIdType> idsToPreResolve = new ArrayList<>();

FhirTerser terser = myFhirContext.newTerser();

Set<IIdType> idsToPreResolve = new HashSet<>();
Set<IIdType> idsToPreResolveIdOnly = new HashSet<>();

for (IBase nextEntry : theEntries) {
IBaseResource resource = theVersionAdapter.getResource(nextEntry);
if (resource != null) {
String verb = theVersionAdapter.getEntryRequestVerb(myFhirContext, nextEntry);

/*
* Pre-fetch any resources that are potentially being directly updated by ID
*/
if ("PUT".equals(verb) || "PATCH".equals(verb)) {
String requestUrl = theVersionAdapter.getEntryRequestUrl(nextEntry);
if (countMatches(requestUrl, '/') == 1 && countMatches(requestUrl, '?') == 0) {
if (countMatches(requestUrl, '?') == 0) {
IIdType id = myFhirContext.getVersion().newIdType();
id.setValue(requestUrl);
idsToPreResolve.add(id);
IIdType unqualifiedVersionless = id.toUnqualifiedVersionless();
idsToPreResolve.add(unqualifiedVersionless);
}
}

/*
* Pre-fetch any resources that are referred to directly by ID (don't replace
* the TRUE flag with FALSE in case we're updating a resource but also
* pointing to that resource elsewhere in the bundle)
*/
if ("PUT".equals(verb) || "POST".equals(verb)) {
for (ResourceReferenceInfo referenceInfo : terser.getAllResourceReferences(resource)) {
IIdType reference = referenceInfo.getResourceReference().getReferenceElement();
if (reference != null
&& !reference.isLocal()
&& !reference.isUuid()
&& reference.hasResourceType()
&& reference.hasIdPart()
&& !reference.getValue().contains("?")) {
idsToPreResolveIdOnly.add(reference.toUnqualifiedVersionless());
}
}
}
}
}
List<JpaPid> outcome =
myIdHelperService.resolveResourcePersistentIdsWithCache(theRequestPartitionId, idsToPreResolve).stream()
.collect(Collectors.toList());

idsToPreResolveIdOnly.removeAll(idsToPreResolve);

/*
* Pre-resolve any IDs that we'll need to prefetch entirely (this applies to
* resources that are being updated by the transaction itself, so we need to
* load everything about the resource)
*/
List<JpaPid> outcome = myIdHelperService.resolveResourcePersistentIdsWithCache(
theRequestPartitionId, List.copyOf(idsToPreResolve));
for (JpaPid next : outcome) {
foundIds.add(
next.getAssociatedResourceId().toUnqualifiedVersionless().getValue());
Expand All @@ -231,6 +310,42 @@ private void preFetchResourcesById(
theTransactionDetails.addResolvedResourceId(next.toUnqualifiedVersionless(), null);
}
}

/*
* Pre-resolve any IDs that are references to other resources outside of the
* transaction bundle. These ones don't need to be fully loaded since we're not
* updating them, we just need to verify that they exist and aren't deleted
* so that we know we can link against them. Doing this here means that
* we're not resolving them one-by-one.
*/
ListMultimap<String, String> typeToIds =
MultimapBuilder.hashKeys().arrayListValues().build();
for (IIdType next : idsToPreResolveIdOnly) {
typeToIds.put(next.getResourceType(), next.getIdPart());
}
for (String resourceType : typeToIds.keySet()) {
List<String> ids = typeToIds.get(resourceType);
try {
Map<String, JpaPid> resolvedIds =
myIdHelperService.resolveResourcePersistentIds(theRequestPartitionId, resourceType, ids, true);
for (Map.Entry<String, JpaPid> entries : resolvedIds.entrySet()) {
IIdType id = myFhirContext.getVersion().newIdType();
id.setValue(resourceType + "/" + entries.getKey());
JpaPid pid = entries.getValue();
pid.setAssociatedResourceId(id);
theTransactionDetails.addResolvedResourceId(id, pid);
}
} catch (ResourceNotFoundException e) {
ourLog.debug("Ignoring not found exception, it will be surfaced later in the processing", e);
}
}
}

@Override
protected void handleVerbChangeInTransactionWriteOperations() {
super.handleVerbChangeInTransactionWriteOperations();

myEntityManager.flush();
}

private void preFetchConditionalUrls(
Expand Down Expand Up @@ -274,10 +389,10 @@ private void preFetchConditionalUrls(
}
}

new QueryChunker<MatchUrlToResolve>()
new TaskChunker<MatchUrlToResolve>()
.chunk(
searchParameterMapsToResolve,
100,
CONDITIONAL_URL_FETCH_CHUNK_SIZE,
map -> preFetchSearchParameterMaps(
theTransactionDetails, theRequestPartitionId, map, idsToPreFetch));
}
Expand Down
Loading
Loading