Skip to content

Commit

Permalink
fix: fixed all tests and added documentation
Browse files Browse the repository at this point in the history
Signed-off-by: Alwin Zomotor <[email protected]>
  • Loading branch information
CRAlwin committed Sep 18, 2024
1 parent 9eb7718 commit 5d05a94
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 487 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 - for information on the respective copyright owner
* Copyright (c) 2024 - for information on the respective copyright owner
* see the NOTICE file and/or the repository https://github.com/carbynestack/castor.
*
* SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -62,12 +62,6 @@ public DefaultTuplesDownloadService(
@Override
public <T extends Tuple<T, F>, F extends Field> byte[] getTupleList(
Class<T> tupleCls, F field, long count, UUID requestId) {
// Was passiert, wenn Castor sehr Langsam ist? Hat das Auswirkungen auf die
// Ephemeral-Execution-Zeit?
// try {
// TimeUnit.MILLISECONDS.sleep(10);
// System.out.println("Slept for 500 millisecs");
// }catch(Exception ignored){System.out.println("Didn't sleep");}
TupleType tupleType = TupleType.findTupleType(tupleCls, field);
String reservationId = requestId + "_" + tupleType;
Reservation reservation;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
/*
* Copyright (c) 2021 - for information on the respective copyright owner
* Copyright (c) 2024 - for information on the respective copyright owner
* see the NOTICE file and/or the repository https://github.com/carbynestack/castor.
*
* SPDX-License-Identifier: Apache-2.0
*/

package io.carbynestack.castor.service.persistence.cache;

import io.carbynestack.castor.client.download.CastorInterVcpClient;
import io.carbynestack.castor.common.entities.Reservation;
import io.carbynestack.castor.common.entities.ReservationElement;
import io.carbynestack.castor.common.entities.TupleType;
Expand All @@ -27,15 +26,10 @@
@Slf4j
@AllArgsConstructor
public final class CreateReservationSupplier implements Supplier<Reservation> {
public static final String INSUFFICIENT_TUPLES_EXCEPTION_MSG =
"Insufficient Tuples of type %s available (%s out of %s).";
public static final String SHARING_RESERVATION_FAILED_EXCEPTION_MSG =
"Sharing reservation with slave services failed.";
public static final String FAILED_RESERVE_TUPLES_EXCEPTION_MSG = "Failed to reserve the tuples.";
public static final String FAILED_FETCH_AVAILABLE_FRAGMENT_EXCEPTION_MSG =
"Unable to locate available tuples.";
final CastorInterVcpClient castorInterVcpClient;
final ReservationCachingService reservationCache;
final TupleChunkFragmentStorageService fragmentStorageService;
final String reservationId;
final TupleType tupleType;
Expand All @@ -59,6 +53,10 @@ public Reservation get() {
}

/**
* Composes elements of the reservation by first trying to get as many fragments with the 'initial
* fragment size' as possible and then with fragments that deviate from this. The fragments with
* 'round' size are stored after the fragments with 'non-round' fragments size'
*
* @throws CastorServiceException if not enough tuples of the given type are available
* @throws CastorServiceException if reserving the requested amount of tuples failed, although
* there are enough tuples available
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 - for information on the respective copyright owner
* Copyright (c) 2024 - for information on the respective copyright owner
* see the NOTICE file and/or the repository https://github.com/carbynestack/castor.
*
* SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -132,14 +132,6 @@ public void keepAndApplyReservation(Reservation reservation) throws InterruptedE
log.debug("persisting reservation {}", reservation);
ValueOperations<String, Object> ops = redisTemplate.opsForValue();
if (ops.get(cachePrefix + reservation.getReservationId()) == null) {
// TransactionSynchronizationManager.registerSynchronization(new
// TransactionSynchronization() {
// @Override
// public void afterCommit(){
// log.debug("put in database at {}", cachePrefix + reservation.getReservationId());
//
// }
// });
log.debug("Apply reservation {}", reservation);
storeReservationInDB(reservation);
ops.set(cachePrefix + reservation.getReservationId(), reservation);
Expand All @@ -153,6 +145,10 @@ public void keepAndApplyReservation(Reservation reservation) throws InterruptedE
}

/**
* Maps the reservation elements to tuplefragments and saves them accordingly in batches to the
* RDBMS. This function assumes that all 'non-round' fragments are saved before 'round' fragments
* for faster processing.
*
* @param reservation The reserved fragments
* @throws InterruptedException : If the fragments can not be reserved
*/
Expand All @@ -169,6 +165,8 @@ public void storeReservationInDB(@NotNull Reservation reservation) throws Interr
try {

TupleChunkFragmentEntity fragment = null;
// the timeout functionality is needed because the fragments might not be unlocked at that
// point in time
int singleRetryMicrosecs = 6000;
while (fragment == null) {
fragment =
Expand Down Expand Up @@ -317,8 +315,6 @@ public Reservation createReservation(String reservationId, TupleType tupleType,
.get()
.runAsNewTransaction(
new CreateReservationSupplier(
castorInterVcpClientOptional.get(),
this,
tupleChunkFragmentStorageService,
reservationId,
tupleType,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 - for information on the respective copyright owner
* Copyright (c) 2024 - for information on the respective copyright owner
* see the NOTICE file and/or the repository https://github.com/carbynestack/castor.
*
* SPDX-License-Identifier: Apache-2.0
Expand All @@ -22,7 +22,6 @@
import org.springframework.data.jpa.repository.QueryHints;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

public interface TupleChunkFragmentRepository
Expand Down Expand Up @@ -61,51 +60,6 @@ public interface TupleChunkFragmentRepository
Optional<TupleChunkFragmentEntity> findAvailableFragmentForTupleChunkContainingIndex(
@Param("tupleChunkId") UUID tupleChunkId, @Param("startIndex") long startIndex);

@Lock(LockModeType.PESSIMISTIC_WRITE)
@QueryHints({@QueryHint(name = "javax.persistence.lock.timeout", value = "3000")})
@Query(
value =
"SELECT f FROM "
+ CLASS_NAME
+ " f "
+ "WHERE "
+ TUPLE_CHUNK_ID_FIELD
+ "=:tupleChunkId "
+ "AND "
+ START_INDEX_FIELD
+ "<=:startIndex "
+ "AND "
+ END_INDEX_FIELD
+ ">:startIndex"
+ " ORDER BY "
+ START_INDEX_FIELD
+ " DESC")
Optional<TupleChunkFragmentEntity> mockFindAvailableFragmentForTupleChunkContainingIndex(
@Param("tupleChunkId") UUID tupleChunkId, @Param("startIndex") long startIndex);

@Transactional(isolation = Isolation.SERIALIZABLE)
@QueryHints({@QueryHint(name = "javax.persistence.lock.timeout", value = "3000")})
@Query(
value = "SELECT f FROM " + CLASS_NAME + " f " + "WHERE " + TUPLE_TYPE_FIELD + "=:tupleType ")
ArrayList<TupleChunkFragmentEntity> findAvailableFragmentsForTupleType(
@Param("tupleType") TupleType type);

@Transactional
@Modifying
@Query(
value =
"CREATE INDEX IF NOT EXISTS "
+ TUPLECHUNK_STARTINDEX_INDEX_NAME
+ " ON "
+ TABLE_NAME
+ "("
+ TUPLE_CHUNK_ID_COLUMN
+ ", "
+ START_INDEX_COLUMN
+ ")",
nativeQuery = true)
void addIndexChunkIdAndStartIndex();

@Transactional(readOnly = true)
@Query(
value =
Expand Down Expand Up @@ -188,33 +142,6 @@ ArrayList<TupleChunkFragmentEntity> retrieveAndReserveRoundFragmentsByType(
@Param("amount") int amount,
@Param("reservationId") String reservationId);

@Transactional
@Query(
value =
" UPDATE "
+ TABLE_NAME
+ " SET "
+ RESERVATION_ID_COLUMN
+ " = :reservationId"
+ " WHERE "
+ FRAGMENT_ID_COLUMN
+ " IN (SELECT "
+ FRAGMENT_ID_COLUMN
+ " FROM "
+ TABLE_NAME
+ " WHERE "
+ TUPLE_TYPE_COLUMN
+ " = :tupleType AND "
+ RESERVATION_ID_COLUMN
+ " is NULL AND "
+ IS_ROUND_COLUMN
+ " FOR UPDATE SKIP LOCKED LIMIT :amount) RETURNING *",
nativeQuery = true)
ArrayList<TupleChunkFragmentEntity> mockRetrieveAndReserveRoundFragmentsByType(
@Param("tupleType") String tupleType,
@Param("amount") int amount,
@Param("reservationId") String reservationId);

@Transactional
@Modifying
@Query(
Expand All @@ -240,38 +167,6 @@ int reserveRoundFragmentsByIndices(
@Param("reservationId") String reservationId,
@Param("tupleChunkId") UUID tupleChunkId);

@Transactional
@Modifying
@Query(
value =
"UPDATE "
+ TABLE_NAME
+ " SET "
+ RESERVATION_ID_COLUMN
+ " = :reservationId WHERE "
+ START_INDEX_COLUMN
+ " IN :indices AND "
+ IS_ROUND_COLUMN
+ " AND "
+ TUPLE_CHUNK_ID_COLUMN
+ " = :tupleChunkId",
nativeQuery = true)
int mockReserveRoundFragmentsByIndices(
@Param("indices") ArrayList<Long> indices,
@Param("reservationId") String reservationId,
@Param("tupleChunkId") UUID tupleChunkId);

@Transactional
@Query(
value =
"SELECT COUNT(*) FROM "
+ TABLE_NAME
+ " WHERE "
+ RESERVATION_ID_COLUMN
+ " = :reservationId ",
nativeQuery = true)
int mockGetAllByReservationId(@Param("reservationId") String reservationId);

@Transactional
@Query(
value =
Expand Down Expand Up @@ -316,22 +211,6 @@ Optional<TupleChunkFragmentEntity> retrieveSinglePartialFragmentPreferSmall(
Optional<TupleChunkFragmentEntity> retrieveSinglePartialFragmentPreferBig(
@Param("tupleType") String tupleType);

@Transactional
@Query(
value =
"SELECT * FROM "
+ TABLE_NAME
+ " WHERE "
+ TUPLE_TYPE_COLUMN
+ " = :tupleType AND "
+ RESERVATION_ID_COLUMN
+ " is NULL ORDER BY "
+ IS_ROUND_COLUMN
+ " FOR UPDATE SKIP LOCKED LIMIT 1",
nativeQuery = true)
Optional<TupleChunkFragmentEntity> mockRetrieveSinglePartialFragment(
@Param("tupleType") String tupleType);

@Transactional
@Query(
value =
Expand Down Expand Up @@ -359,26 +238,6 @@ ArrayList<TupleChunkFragmentEntity> test(
@Param("amount") int amount,
@Param("reservationId") String reservationId);

@Transactional
@Query(
value =
"DELETE FROM "
+ TABLE_NAME
+ " WHERE "
+ FRAGMENT_ID_COLUMN
+ " IN (SELECT "
+ FRAGMENT_ID_COLUMN
+ " FROM "
+ TABLE_NAME
+ " WHERE "
+ RESERVATION_ID_COLUMN
+ " = :reservationId ORDER BY "
+ IS_ROUND_COLUMN
+ " FOR UPDATE NOWAIT) RETURNING *",
nativeQuery = true)
ArrayList<TupleChunkFragmentEntity> lockAndRetrieveReservedTuplesForConsumption(
@Param("reservationId") String reservationId);

@Transactional
@Query(
value =
Expand All @@ -403,57 +262,6 @@ String lockFirstFragmentReturningReservationId(
nativeQuery = true)
int lockRemainingTuplesWithoutRetrieving(@Param("reservationId") String reservationId);

@Transactional
@Query(
value =
"SELECT "
+ FRAGMENT_ID_COLUMN
+ " FROM "
+ TABLE_NAME
+ " WHERE "
+ TUPLE_CHUNK_ID_COLUMN
+ " = :tupleChunkId AND "
+ START_INDEX_COLUMN
+ " = :startIdx FOR UPDATE",
nativeQuery = true)
Optional<Integer> lockFirstRow(
@Param("tupleChunkId") UUID tupleChunkId, @Param("startIdx") long startIdx);

@Transactional
@Query(
value =
"DELETE FROM "
+ TABLE_NAME
+ " WHERE "
+ TUPLE_CHUNK_ID_COLUMN
+ " = :tupleChunkId AND "
+ START_INDEX_COLUMN
+ " IN :startIndices RETURNING "
+ START_INDEX_COLUMN,
nativeQuery = true)
ArrayList<Integer> deleteByChunkAndStartIndex(
@Param("tupleChunkId") UUID tupleChunkId,
@Param("startIndices") ArrayList<Integer> startIndexes);

@Transactional
@Modifying
@Query(
value =
"ALTER TABLE "
+ TABLE_NAME
+ " ADD CONSTRAINT no_conflict_unq UNIQUE("
+ TUPLE_CHUNK_ID_COLUMN
+ ", "
+ START_INDEX_COLUMN
+ ")",
nativeQuery = true)
void addUniqueConstraint();

@Transactional
@Modifying
@Query(value = "ALTER ROLE :pgUserName SET lock_timeout = :ms", nativeQuery = true)
void setUserLevelLockTimeout(@Param("pgUserName") String pgUsername, @Param("ms") int ms);

@Transactional
void deleteAllByReservationId(String reservationId);

Expand Down
Loading

0 comments on commit 5d05a94

Please sign in to comment.