// AbstractDynamoDBQuery.java
/*
* Copyright © 2018 spring-data-dynamodb (https://github.com/prasanna0586/spring-data-dynamodb)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.socialsignin.spring.data.dynamodb.repository.query;
import org.socialsignin.spring.data.dynamodb.core.DynamoDBOperations;
import org.socialsignin.spring.data.dynamodb.domain.UnpagedPageImpl;
import org.socialsignin.spring.data.dynamodb.exception.BatchDeleteException;
import org.socialsignin.spring.data.dynamodb.query.Query;
import org.socialsignin.spring.data.dynamodb.utils.ExceptionHandler;
import org.springframework.data.domain.*;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.Parameters;
import org.springframework.data.repository.query.ParametersParameterAccessor;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteResult;
import java.util.*;
/**
* Abstract base class for DynamoDB repository queries that handles execution of queries
* and manages different execution strategies (collection, paged, sliced, single entity, delete, etc.).
* @param <T> the entity type
* @param <ID> the ID type of the entity
* @author Prasanna Kumar Ramachandran
*/
public abstract class AbstractDynamoDBQuery<T, ID> implements RepositoryQuery, ExceptionHandler {
/**
* DynamoDB operations instance used to execute queries against DynamoDB.
* Within this class it is used by {@code DeleteExecution} for the batch delete
* and for extracting the items that were not processed.
*/
protected final DynamoDBOperations dynamoDBOperations;
/**
* Metadata of the repository method backing this query. It drives the choice of
* execution strategy in {@link #getExecution()} and supplies the scan / scan-count
* flags copied onto every created query.
*/
private final DynamoDBQueryMethod<T, ID> method;
/**
 * Creates a query bound to the given repository method.
 * Both arguments are stored as-is; no validation is performed here.
 *
 * @param dynamoDBOperations the operations facade the executions use to talk to DynamoDB
 * @param method the metadata of the repository method this query implements
 */
public AbstractDynamoDBQuery(DynamoDBOperations dynamoDBOperations, DynamoDBQueryMethod<T, ID> method) {
    this.method = method;
    this.dynamoDBOperations = dynamoDBOperations;
}
/**
 * Picks the execution strategy for the backing repository method.
 * <p>
 * The checks are evaluated in this order, and the first match wins:
 * <ol>
 * <li>collection query without single-entity restriction: {@code CollectionExecution}</li>
 * <li>slice query without single-entity restriction: {@code SlicedExecution}</li>
 * <li>page query without single-entity restriction: {@code PagedExecution}</li>
 * <li>modifying query: rejected with {@link UnsupportedOperationException}</li>
 * <li>single-entity restriction: {@code SingleEntityLimitedExecution}</li>
 * <li>delete query: {@code DeleteExecution}</li>
 * <li>anything else: {@code SingleEntityExecution}</li>
 * </ol>
 * NOTE(review): {@link #isDeleteQuery()} is consulted only after the collection/slice/page
 * checks, so a delete method for which {@code method.isCollectionQuery()} is also true
 * would be handled by {@code CollectionExecution} - confirm whether that combination can occur.
 *
 * @return a new execution instance matching the method's result type
 * @throws UnsupportedOperationException if the method is a modifying query
 */
@NonNull
protected QueryExecution<T, ID> getExecution() {
    if (method.isCollectionQuery() && !isSingleEntityResultsRestriction()) {
        return new CollectionExecution();
    }
    if (method.isSliceQuery() && !isSingleEntityResultsRestriction()) {
        return new SlicedExecution(method.getParameters());
    }
    if (method.isPageQuery() && !isSingleEntityResultsRestriction()) {
        return new PagedExecution(method.getParameters());
    }
    if (method.isModifyingQuery()) {
        throw new UnsupportedOperationException("Modifying queries not yet supported");
    }
    if (isSingleEntityResultsRestriction()) {
        return new SingleEntityLimitedExecution();
    }
    if (isDeleteQuery()) {
        return new DeleteExecution();
    }
    return new SingleEntityExecution();
}
/**
* Creates a Query object for retrieving entities based on the provided parameter values.
* This method is implemented by subclasses to provide specific query creation logic.
* <p>
* It is invoked through {@link #doCreateQueryWithPermissions(Object[])}, which calls
* {@code setScanEnabled} on the returned query straight away, so implementations
* must not return {@code null}.
* @param values the parameter values passed to the query method
* @return a Query object for retrieving entities of type T
*/
protected abstract Query<T> doCreateQuery(Object[] values);
/**
* Creates a Query object for counting entities based on the provided parameter values.
* This method is implemented by subclasses to provide specific count query creation logic.
* <p>
* It is invoked through {@link #doCreateCountQueryWithPermissions(Object[], boolean)}, which calls
* {@code setScanCountEnabled} on the returned query straight away, so implementations
* must not return {@code null}.
* @param values the parameter values passed to the query method
* @param pageQuery true when called by the paged execution to obtain the total for a page;
* false when called by the single-entity executions for a count query
* @return a Query object that returns the count as a Long
*/
protected abstract Query<Long> doCreateCountQuery(Object[] values, boolean pageQuery);
/**
* Determines if this is a count query that should return the number of entities
* matching the query criteria.
* <p>
* Consulted by the single-entity executions: when true they run the count query
* and return its single result instead of loading entities.
* @return true if this is a count query, false otherwise
*/
protected abstract boolean isCountQuery();
/**
* Determines if this is an exists query that checks for the presence of entities
* matching the query criteria.
* <p>
* Consulted by {@code SingleEntityExecution} only, and only after {@link #isCountQuery()}
* returned false: when true the execution answers whether the result list is non-empty.
* @return true if this is an exists query, false otherwise
*/
protected abstract boolean isExistsQuery();
/**
* Determines if this is a delete query that should delete entities matching
* the query criteria.
* <p>
* {@link #getExecution()} consults this flag last, after the collection, slice, page,
* modifying and single-entity-restriction checks; when true it selects {@code DeleteExecution}.
* @return true if this is a delete query, false otherwise
*/
protected abstract boolean isDeleteQuery();
/**
* Returns the maximum number of results to retrieve from the query, if applicable.
* This is used to limit the result set size.
* <p>
* The collection, paged and sliced executions read this value to cap the number of
* elements they copy out of the result list; the paged execution additionally caps
* the reported total with it.
* @return the maximum number of results as an Integer, or null if no limit is applied
*/
@Nullable
protected abstract Integer getResultsRestrictionIfApplicable();
/**
* Determines if the query results should be restricted to a single entity.
* <p>
* When true, {@link #getExecution()} bypasses the collection, slice and page executions and
* (unless the method is a modifying query, which is rejected) selects
* {@code SingleEntityLimitedExecution}, which returns the first element of the result list
* or {@code null} when there is none.
* @return true if results should be restricted to a single entity, false otherwise
*/
protected abstract boolean isSingleEntityResultsRestriction();
/**
 * Builds the entity query via {@link #doCreateQuery(Object[])} and copies the method's
 * scan-enabled flag onto it. The flag is always applied, whether it is true or false.
 *
 * @param values the parameter values passed to the query method
 * @return the query returned by {@code doCreateQuery}, with its scan-enabled flag set
 */
@NonNull
protected Query<T> doCreateQueryWithPermissions(Object[] values) {
    final Query<T> entityQuery = doCreateQuery(values);
    final boolean scanAllowed = method.isScanEnabled();
    entityQuery.setScanEnabled(scanAllowed);
    return entityQuery;
}
/**
 * Builds the count query via {@link #doCreateCountQuery(Object[], boolean)} and copies the
 * method's scan-count-enabled flag onto it. The flag is always applied, whether it is true or false.
 *
 * @param values the parameter values passed to the query method
 * @param pageQuery true if the count is needed to compute a page total, false otherwise
 * @return the query returned by {@code doCreateCountQuery}, with its scan-count flag set
 */
@NonNull
protected Query<Long> doCreateCountQueryWithPermissions(Object[] values, boolean pageQuery) {
    final Query<Long> countingQuery = doCreateCountQuery(values, pageQuery);
    final boolean scanCountAllowed = method.isScanCountEnabled();
    countingQuery.setScanCountEnabled(scanCountAllowed);
    return countingQuery;
}
/**
* Interface that defines the execution strategy for DynamoDB queries.
* Implementations handle different return types such as collections, pages, slices, single entities, and deletes.
* An instance is obtained from {@code getExecution()} and invoked once per repository method call
* by {@code AbstractDynamoDBQuery.execute(Object[])}.
* @param <T> the entity type
* @param <ID> the ID type of the entity
*/
protected interface QueryExecution<T, ID> {
/**
* Executes a query and returns the result.
* @param query the AbstractDynamoDBQuery instance executing the query; implementations use it
* to create the entity and count queries
* @param values the parameter values passed to the query method
* @return the query result, which can be null, a single entity, a collection, a page, a slice, or a count
*/
@Nullable
Object execute(AbstractDynamoDBQuery<T, ID> query, Object[] values);
}
/**
 * Executes the {@link AbstractDynamoDBQuery} to return a {@link List} of entities,
 * optionally capped by {@link #getResultsRestrictionIfApplicable()}.
 * <p>
 * The result is never {@code null}: a {@code null} result list from the query is
 * translated to an empty list, with or without a restriction.
 */
class CollectionExecution implements QueryExecution<T, ID> {

    /**
     * Runs the entity query and returns its matches.
     *
     * @param dynamoDBQuery the query used to create the entity query
     * @param values the parameter values passed to the query method
     * @return the query's own result list when no restriction applies, otherwise a new
     *         list holding at most the restricted number of leading elements
     */
    @NonNull
    @Override
    public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) {
        Query<T> query = dynamoDBQuery.doCreateQueryWithPermissions(values);
        // Read the restriction once so the decision and the cap cannot disagree.
        Integer maxResults = getResultsRestrictionIfApplicable();
        List<T> resultList = query.getResultList();
        if (resultList == null) {
            return Collections.emptyList();
        }
        if (maxResults == null) {
            return resultList;
        }
        return restrictMaxResults(resultList.iterator(), maxResults);
    }

    /**
     * Copies at most {@code maxResults} elements from the iterator into a new list.
     * The size check comes first so the iterator is not probed once the cap is reached.
     */
    @NonNull
    private List<T> restrictMaxResults(@NonNull Iterator<T> iterator, int maxResults) {
        List<T> restricted = new ArrayList<>();
        while (restricted.size() < maxResults && iterator.hasNext()) {
            restricted.add(iterator.next());
        }
        return restricted;
    }
}
/**
 * Executes the {@link AbstractDynamoDBQuery} to return a {@link org.springframework.data.domain.Page} of entities.
 * <p>
 * The page content is obtained by iterating the query's result list: the elements before the
 * requested offset are skipped, then up to one page of elements is copied. When a results
 * restriction applies it caps the whole result, so a page never contains elements
 * positioned at or beyond the restriction, and the reported total is capped as well.
 */
class PagedExecution implements QueryExecution<T, ID> {

    private final Parameters<?, ?> parameters;

    /**
     * @param parameters the parameter metadata of the query method, used to locate the {@link Pageable} argument
     */
    public PagedExecution(Parameters<?, ?> parameters) {
        this.parameters = parameters;
    }

    /**
     * Runs the entity query and wraps the requested part of its result in a page.
     *
     * @param dynamoDBQuery the query used to create the entity and count queries
     * @param values the parameter values passed to the query method
     * @return the requested {@link Page}; never {@code null}
     */
    @NonNull
    @Override
    public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, @NonNull Object[] values) {
        ParameterAccessor accessor = new ParametersParameterAccessor(parameters, values);
        Pageable pageable = accessor.getPageable();
        Query<T> query = dynamoDBQuery.doCreateQueryWithPermissions(values);
        List<T> results = query.getResultList();
        if (results == null) {
            results = Collections.emptyList();
        }
        return createPage(results, pageable, dynamoDBQuery, values);
    }

    /** Builds the page for {@code pageable} out of the full result list, which might be a lazy list. */
    @NonNull
    private Page<T> createPage(@NonNull List<T> allResults, @NonNull Pageable pageable,
            @NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) {
        if (pageable.isUnpaged()) {
            // Unpaged: hand the whole (possibly lazy) list over as the single page.
            return new UnpagedPageImpl<>(allResults, countMatches(dynamoDBQuery, values));
        }

        Integer maxResults = getResultsRestrictionIfApplicable();
        long offset = pageable.getOffset();
        Iterator<T> iterator = allResults.iterator();

        if (offset > 0) {
            long skipped = skipResults(iterator, offset);
            if (skipped < offset) {
                // The request lies beyond the result set. The iterator is exhausted, so the number
                // of skipped elements is the number of matches; no count query is needed.
                long exhaustedTotal = maxResults != null ? Math.min(skipped, maxResults) : skipped;
                return new PageImpl<>(Collections.emptyList(), pageable, exhaustedTotal);
            }
        }

        long total = countMatches(dynamoDBQuery, values);
        long remaining = Long.MAX_VALUE;
        if (maxResults != null) {
            total = Math.min(total, maxResults);
            // The restriction limits the overall result, so only what is left after the offset may be read.
            remaining = Math.max(0L, maxResults - offset);
        }
        List<T> content = readResults(iterator, Math.min(pageable.getPageSize(), remaining));
        return new PageImpl<>(content, pageable, total);
    }

    /** Runs the count query for a page request; a {@code null} count is treated as 0. */
    private long countMatches(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) {
        Long counted = dynamoDBQuery.doCreateCountQueryWithPermissions(values, true).getSingleResult();
        return counted != null ? counted : 0L;
    }

    /** Advances the iterator by up to {@code toSkip} elements and returns how many were actually skipped. */
    private long skipResults(@NonNull Iterator<T> iterator, long toSkip) {
        long skipped = 0;
        while (skipped < toSkip && iterator.hasNext()) {
            iterator.next();
            skipped++;
        }
        return skipped;
    }

    /** Copies up to {@code limit} elements from the iterator into a new list. */
    @NonNull
    private List<T> readResults(@NonNull Iterator<T> iterator, long limit) {
        List<T> read = new ArrayList<>();
        while (read.size() < limit && iterator.hasNext()) {
            read.add(iterator.next());
        }
        return read;
    }
}
/**
 * Executes the {@link AbstractDynamoDBQuery} to return a {@link org.springframework.data.domain.Slice} of entities.
 * <p>
 * The slice content is obtained by iterating the query's result list: the elements before the
 * requested offset are skipped, then up to one page of elements is copied. When a results
 * restriction applies it caps the whole result, so a slice never contains elements
 * positioned at or beyond the restriction and never reports a next slice past it.
 * An unpaged request is answered with a single slice and no next slice.
 */
class SlicedExecution implements QueryExecution<T, ID> {

    private final Parameters<?, ?> parameters;

    /**
     * @param parameters the parameter metadata of the query method, used to locate the {@link Pageable} argument
     */
    public SlicedExecution(Parameters<?, ?> parameters) {
        this.parameters = parameters;
    }

    /**
     * Runs the entity query and wraps the requested part of its result in a slice.
     *
     * @param dynamoDBQuery the query used to create the entity query
     * @param values the parameter values passed to the query method
     * @return the requested {@link Slice}; never {@code null}
     */
    @NonNull
    @Override
    public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, @NonNull Object[] values) {
        ParameterAccessor accessor = new ParametersParameterAccessor(parameters, values);
        Pageable pageable = accessor.getPageable();
        Query<T> query = dynamoDBQuery.doCreateQueryWithPermissions(values);
        List<T> results = query.getResultList();
        if (results == null) {
            results = Collections.emptyList();
        }
        return createSlice(results, pageable);
    }

    /** Builds the slice for {@code pageable} out of the full result list, which might be a lazy list. */
    @NonNull
    private Slice<T> createSlice(@NonNull List<T> allResults, @NonNull Pageable pageable) {
        Integer maxResults = getResultsRestrictionIfApplicable();

        if (pageable.isUnpaged()) {
            // Offset and page size are not available on an unpaged request; return everything
            // (bounded by the restriction, if any) as one slice without a successor.
            List<T> everything = maxResults == null
                    ? allResults
                    : readResults(allResults.iterator(), maxResults);
            return new SliceImpl<>(everything);
        }

        long offset = pageable.getOffset();
        if (maxResults != null && offset >= maxResults) {
            // The whole slice lies beyond the restriction, so there is nothing to iterate.
            return new SliceImpl<>(new ArrayList<>(), pageable, false);
        }

        Iterator<T> iterator = allResults.iterator();
        if (offset > 0 && skipResults(iterator, offset) < offset) {
            // The request lies beyond the result set.
            return new SliceImpl<>(new ArrayList<>(), pageable, false);
        }

        long remaining = maxResults == null ? Long.MAX_VALUE : maxResults - offset;
        List<T> content = readResults(iterator, Math.min(pageable.getPageSize(), remaining));

        // Once the restriction is exhausted there is no next slice, and the iterator need not be probed.
        boolean restrictionReached = maxResults != null && offset + content.size() >= maxResults;
        boolean hasMoreResults = !restrictionReached && iterator.hasNext();
        return new SliceImpl<>(content, pageable, hasMoreResults);
    }

    /** Advances the iterator by up to {@code toSkip} elements and returns how many were actually skipped. */
    private long skipResults(@NonNull Iterator<T> iterator, long toSkip) {
        long skipped = 0;
        while (skipped < toSkip && iterator.hasNext()) {
            iterator.next();
            skipped++;
        }
        return skipped;
    }

    /** Copies up to {@code limit} elements from the iterator into a new list. */
    @NonNull
    private List<T> readResults(@NonNull Iterator<T> iterator, long limit) {
        List<T> read = new ArrayList<>();
        while (read.size() < limit && iterator.hasNext()) {
            read.add(iterator.next());
        }
        return read;
    }
}
/**
 * Executes the {@link AbstractDynamoDBQuery} as a delete: loads the matching entities,
 * batch-deletes them and returns the deleted entities.
 */
class DeleteExecution implements QueryExecution<T, ID> {

    /**
     * Loads the entities matching the query and deletes them in a batch.
     *
     * @param dynamoDBQuery the query used to create the entity query
     * @param values the parameter values passed to the query method
     * @return an empty list when nothing matched, otherwise the list of matched entities
     *         once none of them is reported as unprocessed
     * @throws BatchDeleteException if some entities are reported as unprocessed after the batch delete
     */
    @NonNull
    @Override
    public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) throws BatchDeleteException {
        Query<T> lookup = dynamoDBQuery.doCreateQueryWithPermissions(values);
        List<T> matches = lookup.getResultList();
        if (matches == null || matches.isEmpty()) {
            return Collections.emptyList();
        }

        // Bucket the entities by runtime class; the buckets are handed to the extraction step below.
        Map<Class<?>, List<Object>> matchesByType = new HashMap<>();
        for (T match : matches) {
            Class<?> type = match.getClass();
            List<Object> bucket = matchesByType.get(type);
            if (bucket == null) {
                bucket = new ArrayList<>();
                matchesByType.put(type, bucket);
            }
            bucket.add(match);
        }

        List<BatchWriteResult> writeResults = dynamoDBOperations.batchDelete(matches);

        // Extract unprocessed entities using SDK v2 extraction
        List<Object> leftovers = dynamoDBOperations.extractUnprocessedDeleteItems(writeResults, matchesByType);
        if (!leftovers.isEmpty()) {
            // Report the entities that were actually left unprocessed.
            throw repackageToException(
                    leftovers,
                    0, // SDK v2 client handles retries internally
                    null, // No exception, just unprocessed items
                    BatchDeleteException.class);
        }

        // Every entity was deleted.
        return matches;
    }
}
/**
 * Executes the {@link AbstractDynamoDBQuery} for methods with a single-valued result:
 * a count, an existence check or a single entity.
 */
class SingleEntityExecution implements QueryExecution<T, ID> {

    /**
     * @param dynamoDBQuery the query used to create the entity or count query
     * @param values the parameter values passed to the query method
     * @return the count query's single result for count queries, a boolean telling whether the
     *         result list is non-empty for exists queries, otherwise the entity query's single result
     */
    @Override
    public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) {
        if (isCountQuery()) {
            Query<Long> counting = dynamoDBQuery.doCreateCountQueryWithPermissions(values, false);
            return counting.getSingleResult();
        }
        if (!isExistsQuery()) {
            Query<T> lookup = dynamoDBQuery.doCreateQueryWithPermissions(values);
            return lookup.getSingleResult();
        }
        // Exists query: any match at all is enough.
        List<T> matches = dynamoDBQuery.doCreateQueryWithPermissions(values).getResultList();
        if (matches == null) {
            return false;
        }
        return !matches.isEmpty();
    }
}
/**
 * Executes the {@link AbstractDynamoDBQuery} for methods whose result is restricted to a
 * single entity: the first element of the result list is returned.
 */
class SingleEntityLimitedExecution implements QueryExecution<T, ID> {

    /**
     * @param dynamoDBQuery the query used to create the entity or count query
     * @param values the parameter values passed to the query method
     * @return the count query's single result for count queries, otherwise the first element
     *         of the result list, or {@code null} when the list is {@code null} or empty
     */
    @Nullable
    @Override
    public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) {
        if (isCountQuery()) {
            Query<Long> counting = dynamoDBQuery.doCreateCountQueryWithPermissions(values, false);
            return counting.getSingleResult();
        }
        List<T> matches = dynamoDBQuery.doCreateQueryWithPermissions(values).getResultList();
        if (matches == null || matches.isEmpty()) {
            return null;
        }
        return matches.getFirst();
    }
}
/**
 * Executes the query using the execution strategy selected by {@link #getExecution()}.
 * A new strategy instance is obtained on every call and is handed this query together
 * with the invocation arguments.
 *
 * @param parameters the parameter values passed to the query method
 * @return the query result, whose type depends on the selected execution; may be {@code null}
 * @throws UnsupportedOperationException if the backing method is a modifying query
 */
@Nullable
@Override
public Object execute(@NonNull Object[] parameters) {
    return getExecution().execute(this, parameters);
}
/**
 * Returns the query method metadata this query was constructed with.
 *
 * @return the {@link DynamoDBQueryMethod} passed to the constructor
 */
@NonNull
@Override
public DynamoDBQueryMethod<T, ID> getQueryMethod() {
    return method;
}
}