// AbstractDynamoDBQuery.java

/*
 * Copyright © 2018 spring-data-dynamodb (https://github.com/prasanna0586/spring-data-dynamodb)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.socialsignin.spring.data.dynamodb.repository.query;

import org.socialsignin.spring.data.dynamodb.core.DynamoDBOperations;
import org.socialsignin.spring.data.dynamodb.domain.UnpagedPageImpl;
import org.socialsignin.spring.data.dynamodb.exception.BatchDeleteException;
import org.socialsignin.spring.data.dynamodb.query.Query;
import org.socialsignin.spring.data.dynamodb.utils.ExceptionHandler;
import org.springframework.data.domain.*;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.Parameters;
import org.springframework.data.repository.query.ParametersParameterAccessor;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteResult;

import java.util.*;

/**
 * Abstract base class for DynamoDB repository queries that handles execution of queries
 * and manages different execution strategies (collection, paged, sliced, single entity, delete, etc.).
 * @param <T> the entity type
 * @param <ID> the ID type of the entity
 * @author Prasanna Kumar Ramachandran
 */
public abstract class AbstractDynamoDBQuery<T, ID> implements RepositoryQuery, ExceptionHandler {

    /**
     * DynamoDB operations instance used to execute queries against DynamoDB.
     */
    protected final DynamoDBOperations dynamoDBOperations;
    private final DynamoDBQueryMethod<T, ID> method;

    /**
     * Creates the query base, remembering the operations facade and the metadata of the
     * repository method this query backs.
     * @param dynamoDBOperations the DynamoDB operations instance the executions run against
     * @param method the query method metadata consulted when choosing and configuring an execution
     */
    public AbstractDynamoDBQuery(DynamoDBOperations dynamoDBOperations, DynamoDBQueryMethod<T, ID> method) {
        this.method = method;
        this.dynamoDBOperations = dynamoDBOperations;
    }

    /**
     * Picks the execution strategy for this query. The checks run in a fixed order and the
     * first match wins:
     * <ol>
     * <li>collection, slice and page return types (each only when the results are not
     *     restricted to a single entity),</li>
     * <li>modifying queries, which are rejected,</li>
     * <li>single-entity restricted queries,</li>
     * <li>delete queries,</li>
     * <li>everything else, handled as a plain single-entity query.</li>
     * </ol>
     * @return the execution matching the query method; never {@code null}
     * @throws UnsupportedOperationException if the query method is a modifying query
     */
    @NonNull
    protected QueryExecution<T, ID> getExecution() {
        if (method.isCollectionQuery() && !isSingleEntityResultsRestriction()) {
            return new CollectionExecution();
        }
        if (method.isSliceQuery() && !isSingleEntityResultsRestriction()) {
            return new SlicedExecution(method.getParameters());
        }
        if (method.isPageQuery() && !isSingleEntityResultsRestriction()) {
            return new PagedExecution(method.getParameters());
        }
        if (method.isModifyingQuery()) {
            throw new UnsupportedOperationException("Modifying queries not yet supported");
        }
        if (isSingleEntityResultsRestriction()) {
            return new SingleEntityLimitedExecution();
        }
        return isDeleteQuery() ? new DeleteExecution() : new SingleEntityExecution();
    }

    /**
     * Creates the entity query for one invocation of the repository method.
     * Called by {@link #doCreateQueryWithPermissions(Object[])}, which afterwards applies the
     * query method's scan-enabled flag to the returned query.
     * @param values the parameter values passed to the query method
     * @return a Query object for retrieving entities of type T
     */
    protected abstract Query<T> doCreateQuery(Object[] values);

    /**
     * Creates the count query for one invocation of the repository method.
     * Called by {@link #doCreateCountQueryWithPermissions(Object[], boolean)}, which afterwards
     * applies the query method's scan-count-enabled flag to the returned query.
     * @param values the parameter values passed to the query method
     * @param pageQuery {@code true} when the count supplies the total for a page (as passed by the
     *        paged execution), {@code false} when the count itself is the result (as passed by the
     *        single-entity executions)
     * @return a Query object that returns the count as a Long
     */
    protected abstract Query<Long> doCreateCountQuery(Object[] values, boolean pageQuery);

    /**
     * Tells whether the query method asks for a count. When {@code true}, the single-entity
     * executions return the single result of the count query instead of an entity.
     * @return true if this is a count query, false otherwise
     */
    protected abstract boolean isCountQuery();

    /**
     * Tells whether the query method asks for an existence check. When {@code true} (and the
     * query is not a count query), the single-entity execution returns whether the entity
     * query produced at least one result.
     * @return true if this is an exists query, false otherwise
     */
    protected abstract boolean isExistsQuery();

    /**
     * Tells whether the query method deletes the entities it matches. {@link #getExecution()}
     * consults this only after the collection, slice, page, modifying and single-entity
     * restriction checks have not matched.
     * @return true if this is a delete query, false otherwise
     */
    protected abstract boolean isDeleteQuery();

    /**
     * Returns the maximum number of results the query may produce, if a restriction applies.
     * The collection, paged and sliced executions use it to truncate what they read from the
     * result list.
     * @return the maximum number of results as an Integer, or null if no limit is applied
     */
    @Nullable
    protected abstract Integer getResultsRestrictionIfApplicable();

    /**
     * Tells whether the results are restricted to a single entity. When {@code true},
     * {@link #getExecution()} bypasses the collection, slice and page executions and (unless the
     * query is a modifying query) uses the execution that returns the first element of the
     * result list, or {@code null} when there is none.
     * @return true if results should be restricted to a single entity, false otherwise
     */
    protected abstract boolean isSingleEntityResultsRestriction();

    /**
     * Builds the entity query via {@link #doCreateQuery(Object[])} and stamps it with the
     * scan-enabled flag taken from the query method.
     * @param values the parameter values passed to the query method
     * @return the entity query with its scan-enabled flag set
     */
    @NonNull
    protected Query<T> doCreateQueryWithPermissions(Object[] values) {
        final Query<T> entityQuery = doCreateQuery(values);
        final boolean scanAllowed = method.isScanEnabled();
        entityQuery.setScanEnabled(scanAllowed);
        return entityQuery;
    }

    /**
     * Builds the count query via {@link #doCreateCountQuery(Object[], boolean)} and stamps it
     * with the scan-count-enabled flag taken from the query method.
     * @param values the parameter values passed to the query method
     * @param pageQuery true if the count supplies the total for pagination, false otherwise
     * @return the count query with its scan-count-enabled flag set
     */
    @NonNull
    protected Query<Long> doCreateCountQueryWithPermissions(Object[] values, boolean pageQuery) {
        final Query<Long> countingQuery = doCreateCountQuery(values, pageQuery);
        final boolean scanCountAllowed = method.isScanCountEnabled();
        countingQuery.setScanCountEnabled(scanCountAllowed);
        return countingQuery;
    }

    /**
     * Strategy for running a DynamoDB query and shaping its result.
     * The implementations in this class cover collections, pages, slices, single entities
     * (plain and limited) and deletes; {@link AbstractDynamoDBQuery#getExecution()} selects one
     * per invocation.
     * @param <T> the entity type
     * @param <ID> the ID type of the entity
     */
    protected interface QueryExecution<T, ID> {
        /**
         * Executes a query and returns the result.
         * @param query the AbstractDynamoDBQuery instance executing the query; used to create the
         *        entity and count queries
         * @param values the parameter values passed to the query method
         * @return the query result, which can be null, a single entity, a collection, a page, a slice, or a count
         */
        @Nullable
        Object execute(AbstractDynamoDBQuery<T, ID> query, Object[] values);
    }

    /**
     * Executes the {@link AbstractDynamoDBQuery} to return a {@link List} of entities,
     * truncated to the results restriction when one applies.
     */
    class CollectionExecution implements QueryExecution<T, ID> {

        /**
         * Runs the entity query and returns its results.
         * @param dynamoDBQuery the query being executed; used to create the entity query
         * @param values the parameter values passed to the query method
         * @return the query's result list when no restriction applies, a new list holding at most
         *         the restricted number of leading results otherwise, or an empty list when the
         *         query yields no result list; never {@code null}
         */
        @NonNull
        @Override
        public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) {
            Query<T> query = dynamoDBQuery.doCreateQueryWithPermissions(values);
            // Resolve the restriction once so the branch decision and the truncation agree.
            Integer maxResults = getResultsRestrictionIfApplicable();

            List<T> resultList = query.getResultList();
            if (resultList == null) {
                // Same answer on both paths: callers get an empty collection, never null.
                return Collections.emptyList();
            }
            if (maxResults == null) {
                return resultList;
            }
            return takeAtMost(resultList.iterator(), maxResults);
        }

        /**
         * Copies up to {@code maxResults} elements from the iterator into a new list.
         * The bound is checked before {@code hasNext()} so the iterator is not advanced
         * once enough elements have been read.
         */
        @NonNull
        private List<T> takeAtMost(@NonNull Iterator<T> iterator, int maxResults) {
            List<T> limited = new ArrayList<>();
            while (limited.size() < maxResults && iterator.hasNext()) {
                limited.add(iterator.next());
            }
            return limited;
        }

    }

    /**
     * Executes the {@link AbstractDynamoDBQuery} to return a {@link org.springframework.data.domain.Page} of entities.
     * <p>
     * The page content is cut out of the query's result list by skipping {@code offset} elements and
     * reading up to one page; the total comes from a separate count query. A results restriction caps
     * the overall result set, so paging happens within the restricted results.
     */
    class PagedExecution implements QueryExecution<T, ID> {

        private final Parameters<?, ?> parameters;

        /**
         * @param parameters the query method's parameter metadata, used to find the {@link Pageable}
         *        among the invocation arguments
         */
        public PagedExecution(Parameters<?, ?> parameters) {
            this.parameters = parameters;
        }

        /**
         * Advances the iterator by up to {@code resultsToScan} elements, discarding them.
         * @return the number of elements actually skipped (less than requested if the iterator ran out)
         */
        private long scanThroughResults(@NonNull Iterator<T> iterator, long resultsToScan) {
            long processed = 0;
            // Bound first: do not touch the iterator again once enough was skipped.
            while (processed < resultsToScan && iterator.hasNext()) {
                iterator.next();
                processed++;
            }
            return processed;
        }

        /**
         * Reads one page from an iterator that is already positioned at {@code offset}.
         * @param iterator the result iterator, positioned at the first element of the page
         * @param offset the number of elements that precede the page in the overall result
         * @param pageSize the requested page size
         * @return at most {@code pageSize} elements, and never elements beyond the results restriction
         */
        @NonNull
        private List<T> readPageOfResultsRestrictMaxResultsIfNecessary(@NonNull Iterator<T> iterator, long offset,
                                                                       int pageSize) {
            Integer maxResults = getResultsRestrictionIfApplicable();
            long toProcess = pageSize;
            if (maxResults != null) {
                // The restriction caps the whole result set, so only what is left of it
                // after the offset may still be read (possibly nothing).
                toProcess = Math.min(toProcess, Math.max(0L, maxResults - offset));
            }
            List<T> resultsPage = new ArrayList<>();
            while (resultsPage.size() < toProcess && iterator.hasNext()) {
                resultsPage.add(iterator.next());
            }
            return resultsPage;
        }

        /**
         * Runs the entity query and wraps the requested part of its results in a page.
         * @param dynamoDBQuery the query being executed; used to create the entity and count queries
         * @param values the parameter values passed to the query method
         * @return the resulting {@link Page}; never {@code null}
         */
        @NonNull
        @Override
        public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, @NonNull Object[] values) {

            ParameterAccessor accessor = new ParametersParameterAccessor(parameters, values);
            Pageable pageable = accessor.getPageable();
            Query<T> query = dynamoDBQuery.doCreateQueryWithPermissions(values);

            List<T> results = query.getResultList();
            if (results == null) {
                results = Collections.emptyList();
            }
            return createPage(results, pageable, dynamoDBQuery, values);
        }

        /**
         * Builds the page for {@code pageable} out of the full result list.
         * @param allResults the query's complete result list (may be a lazy list)
         * @param pageable the requested page, possibly unpaged
         * @param dynamoDBQuery the query being executed; used to create the count query
         * @param values the parameter values passed to the query method
         * @return the requested page
         */
        @NonNull
        private Page<T> createPage(@NonNull List<T> allResults, @NonNull Pageable pageable, @NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery,
                                   Object[] values) {

            // Get the result = this list might be a lazy list
            Iterator<T> iterator = allResults.iterator();

            // Check if the pageable request is 'beyond' the result set
            if (!pageable.isUnpaged() && pageable.getOffset() > 0) {
                long processedCount = scanThroughResults(iterator, pageable.getOffset());
                if (processedCount < pageable.getOffset()) {
                    // No count query is issued on this path; the empty page carries no pageable.
                    return new PageImpl<>(Collections.emptyList());
                }
            }

            // Then Count the result set size
            Query<Long> countQuery = dynamoDBQuery.doCreateCountQueryWithPermissions(values, true);
            Long countResult = countQuery.getSingleResult();
            long count = countResult != null ? countResult : 0L;

            if (pageable.isUnpaged()) {
                // Treat the whole (lazy) list as the result page; the results restriction
                // is not applied on this path.
                return new UnpagedPageImpl<>(allResults, count);
            }

            // The total can never exceed the results restriction
            Integer maxResults = getResultsRestrictionIfApplicable();
            if (maxResults != null) {
                count = Math.min(count, maxResults);
            }

            List<T> results = readPageOfResultsRestrictMaxResultsIfNecessary(iterator, pageable.getOffset(),
                    pageable.getPageSize());
            return new PageImpl<>(results, pageable, count);
        }
    }

    /**
     * Executes the {@link AbstractDynamoDBQuery} to return a {@link Slice} of entities.
     * <p>
     * The slice content is cut out of the query's result list by skipping {@code offset} elements and
     * reading up to one page. A results restriction caps the overall result set, so slicing happens
     * within the restricted results. An unpaged request yields a single slice with all (restricted)
     * results.
     */
    class SlicedExecution implements QueryExecution<T, ID> {

        private final Parameters<?, ?> parameters;

        /**
         * @param parameters the query method's parameter metadata, used to find the {@link Pageable}
         *        among the invocation arguments
         */
        public SlicedExecution(Parameters<?, ?> parameters) {
            this.parameters = parameters;
        }

        /**
         * Advances the iterator by up to {@code resultsToScan} elements, discarding them.
         * @return the number of elements actually skipped (less than requested if the iterator ran out)
         */
        private long scanThroughResults(@NonNull Iterator<T> iterator, long resultsToScan) {
            long processed = 0;
            // Bound first: do not touch the iterator again once enough was skipped.
            while (processed < resultsToScan && iterator.hasNext()) {
                iterator.next();
                processed++;
            }
            return processed;
        }

        /**
         * Copies up to {@code limit} elements from the iterator into a new list.
         */
        @NonNull
        private List<T> readResults(@NonNull Iterator<T> iterator, long limit) {
            List<T> collected = new ArrayList<>();
            while (collected.size() < limit && iterator.hasNext()) {
                collected.add(iterator.next());
            }
            return collected;
        }

        /**
         * Runs the entity query and wraps the requested part of its results in a slice.
         * @param dynamoDBQuery the query being executed; used to create the entity query
         * @param values the parameter values passed to the query method
         * @return the resulting {@link Slice}; never {@code null}
         */
        @NonNull
        @Override
        public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, @NonNull Object[] values) {

            ParameterAccessor accessor = new ParametersParameterAccessor(parameters, values);
            Pageable pageable = accessor.getPageable();
            Query<T> query = dynamoDBQuery.doCreateQueryWithPermissions(values);
            List<T> results = query.getResultList();
            if (results == null) {
                results = Collections.emptyList();
            }
            return createSlice(results, pageable);
        }

        /**
         * Builds the slice for {@code pageable} out of the full result list.
         * @param allResults the query's complete result list (may be a lazy list)
         * @param pageable the requested slice, possibly unpaged
         * @return the requested slice
         */
        @NonNull
        private Slice<T> createSlice(@NonNull List<T> allResults, @NonNull Pageable pageable) {

            Integer maxResults = getResultsRestrictionIfApplicable();

            if (pageable.isUnpaged()) {
                // An unpaged request has no offset or page size to read (those accessors throw),
                // so hand back everything in one slice, still honouring the restriction.
                if (maxResults == null) {
                    return new SliceImpl<>(allResults);
                }
                return new SliceImpl<>(readResults(allResults.iterator(), maxResults));
            }

            Iterator<T> iterator = allResults.iterator();
            long offset = pageable.getOffset();
            if (offset > 0) {
                long processedCount = scanThroughResults(iterator, offset);
                if (processedCount < offset) {
                    return new SliceImpl<>(new ArrayList<>());
                }
            }

            // The restriction caps the whole result set, so only what is left of it
            // after the offset may still be read (possibly nothing).
            long toProcess = pageable.getPageSize();
            if (maxResults != null) {
                toProcess = Math.min(toProcess, Math.max(0L, maxResults - offset));
            }
            List<T> results = readResults(iterator, toProcess);

            // There is a next slice only if the restriction leaves room for more
            // and the underlying results actually continue.
            boolean restrictionReached = maxResults != null && offset + results.size() >= maxResults;
            boolean hasMoreResults = !restrictionReached && iterator.hasNext();
            return new SliceImpl<>(results, pageable, hasMoreResults);
        }
    }

    /**
     * Executes the {@link AbstractDynamoDBQuery} as a delete: the matching entities are looked up
     * first and then removed with a batch delete.
     */
    class DeleteExecution implements QueryExecution<T, ID> {

        /**
         * Deletes every entity matched by the query.
         * @param dynamoDBQuery the query being executed; used to create the entity query
         * @param values the parameter values passed to the query method
         * @return the list of matched entities once all of them were deleted, or an empty list
         *         when nothing matched
         * @throws BatchDeleteException if the batch delete reports unprocessed entities
         */
        @NonNull
        @Override
        public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) throws BatchDeleteException {
            Query<T> lookup = dynamoDBQuery.doCreateQueryWithPermissions(values);
            List<T> matches = lookup.getResultList();

            boolean nothingToDelete = matches == null || matches.isEmpty();
            if (nothingToDelete) {
                return Collections.emptyList();
            }

            // Index the matches by their runtime class; the extraction step below needs it
            Map<Class<?>, List<Object>> matchesByType = new HashMap<>();
            for (T match : matches) {
                List<Object> sameType = matchesByType.computeIfAbsent(match.getClass(), type -> new ArrayList<>());
                sameType.add(match);
            }

            List<BatchWriteResult> writeResults = dynamoDBOperations.batchDelete(matches);

            // Ask the operations facade which entities the batch left unprocessed
            List<Object> leftovers = dynamoDBOperations.extractUnprocessedDeleteItems(writeResults, matchesByType);

            if (!leftovers.isEmpty()) {
                // Report the entities that could not be deleted
                throw repackageToException(
                        leftovers,
                        0, // SDK v2 client handles retries internally
                        null, // No exception, just unprocessed items
                        BatchDeleteException.class);
            }

            // Everything was deleted
            return matches;
        }
    }

    /**
     * Executes the {@link AbstractDynamoDBQuery} for a scalar outcome: a count, an existence
     * flag, or the single matching entity.
     */
    class SingleEntityExecution implements QueryExecution<T, ID> {

        /**
         * @param dynamoDBQuery the query being executed; used to create the entity or count query
         * @param values the parameter values passed to the query method
         * @return the count for count queries, a {@link Boolean} for exists queries, otherwise
         *         the single result of the entity query
         */
        @Override
        public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) {
            if (isCountQuery()) {
                Query<Long> counter = dynamoDBQuery.doCreateCountQueryWithPermissions(values, false);
                return counter.getSingleResult();
            }

            if (isExistsQuery()) {
                List<T> matches = dynamoDBQuery.doCreateQueryWithPermissions(values).getResultList();
                boolean nothingFound = matches == null || matches.isEmpty();
                return !nothingFound;
            }

            Query<T> entityQuery = dynamoDBQuery.doCreateQueryWithPermissions(values);
            return entityQuery.getSingleResult();
        }
    }

    /**
     * Executes the {@link AbstractDynamoDBQuery} when the results are restricted to a single
     * entity: returns the first match, or the count for count queries.
     */
    class SingleEntityLimitedExecution implements QueryExecution<T, ID> {

        /**
         * @param dynamoDBQuery the query being executed; used to create the entity or count query
         * @param values the parameter values passed to the query method
         * @return the count for count queries; otherwise the first element of the result list,
         *         or {@code null} when there is no result
         */
        @Nullable
        @Override
        public Object execute(@NonNull AbstractDynamoDBQuery<T, ID> dynamoDBQuery, Object[] values) {
            if (!isCountQuery()) {
                Query<T> entityQuery = dynamoDBQuery.doCreateQueryWithPermissions(values);
                List<T> candidates = entityQuery.getResultList();
                if (candidates == null || candidates.isEmpty()) {
                    return null;
                }
                return candidates.getFirst();
            }

            Query<Long> counter = dynamoDBQuery.doCreateCountQueryWithPermissions(values, false);
            return counter.getSingleResult();
        }
    }

    /**
     * Executes the query using the execution strategy chosen by {@link #getExecution()} and
     * hands this query plus the invocation arguments to it.
     * @param parameters the parameter values passed to the query method
     * @return the query result, whose type depends on the chosen execution; may be {@code null}
     *         (for example when a single-entity restricted query matches nothing)
     * @throws UnsupportedOperationException if the query method is a modifying query
     */
    @Nullable
    @Override
    public Object execute(@NonNull Object[] parameters) {
        return getExecution().execute(this, parameters);
    }

    /**
     * Returns the metadata of the repository method this query was created for.
     * @return the query method passed to the constructor
     */
    @NonNull
    @Override
    public DynamoDBQueryMethod<T, ID> getQueryMethod() {
        return method;
    }

}