package org.apache.flink.runtime.checkpoint;

import java.util.Comparator;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.class */
/**
 * Decides which queued checkpoint trigger request, if any, should be executed next.
 *
 * <p>Requests are kept in a bounded, priority-ordered set (see
 * {@link #checkpointTriggerRequestsComparator()}). A request is only handed out when no trigger
 * is reported as in progress, the number of checkpoints being cleaned does not exceed the
 * concurrency limit, and either the number of pending checkpoints is below that limit or the
 * request is forced. Periodic, non-forced requests additionally have to respect the configured
 * minimum pause.
 *
 * <p>This class performs no synchronization of its own.
 *
 * <p>Note: the decompiler (JADX) reports that the six-argument constructor and the non-private
 * methods were package-private originally; they are kept {@code public} here so that existing
 * callers of this form keep compiling.
 */
public class CheckpointRequestDecider {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointRequestDecider.class);

    /** Time in queue (milliseconds) above which a handed-out request is logged. */
    private static final int LOG_TIME_IN_QUEUE_THRESHOLD_MS = 100;

    /** Queue capacity used by the six-argument constructor. */
    private static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;

    private final int maxConcurrentCheckpointAttempts;
    private final LongConsumer rescheduleTrigger;
    private final Clock clock;
    private final long minPauseBetweenCheckpoints;
    private final IntSupplier pendingCheckpointsSizeSupplier;
    private final IntSupplier numberOfCleaningCheckpointsSupplier;

    /** Pending requests, highest priority first. */
    private final NavigableSet<CheckpointCoordinator.CheckpointTriggerRequest> queuedRequests =
            new TreeSet<>(checkpointTriggerRequestsComparator());

    private final int maxQueuedRequests;

    /**
     * Creates a decider whose queue holds at most {@link #DEFAULT_MAX_QUEUED_REQUESTS} requests.
     *
     * @param maxConcurrentCheckpointAttempts concurrency limit; must be positive
     * @param rescheduleTrigger callback that receives the remaining delay (milliseconds) when a
     *     periodic request is rejected because of the minimum pause
     * @param clock source of the relative time used for the minimum-pause computation
     * @param minPauseBetweenCheckpoints minimum pause in milliseconds
     * @param pendingCheckpointsSizeSupplier supplies the current number of pending checkpoints
     * @param numberOfCleaningCheckpointsSupplier supplies the current number of checkpoints
     *     being cleaned
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument}, if
     *     {@code maxConcurrentCheckpointAttempts} is not positive
     */
    public CheckpointRequestDecider(
            int maxConcurrentCheckpointAttempts,
            LongConsumer rescheduleTrigger,
            Clock clock,
            long minPauseBetweenCheckpoints,
            IntSupplier pendingCheckpointsSizeSupplier,
            IntSupplier numberOfCleaningCheckpointsSupplier) {
        this(
                maxConcurrentCheckpointAttempts,
                rescheduleTrigger,
                clock,
                minPauseBetweenCheckpoints,
                pendingCheckpointsSizeSupplier,
                numberOfCleaningCheckpointsSupplier,
                DEFAULT_MAX_QUEUED_REQUESTS);
    }

    /**
     * Creates a decider with an explicit queue capacity.
     *
     * @param maxConcurrentCheckpointAttempts concurrency limit; must be positive
     * @param rescheduleTrigger callback that receives the remaining delay (milliseconds) when a
     *     periodic request is rejected because of the minimum pause
     * @param clock source of the relative time used for the minimum-pause computation
     * @param minPauseBetweenCheckpoints minimum pause in milliseconds
     * @param pendingCheckpointsSizeSupplier supplies the current number of pending checkpoints
     * @param numberOfCleaningCheckpointsSupplier supplies the current number of checkpoints
     *     being cleaned
     * @param maxQueuedRequests maximum number of requests kept in the queue; must be positive
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument}, if
     *     either limit is not positive
     */
    CheckpointRequestDecider(
            int maxConcurrentCheckpointAttempts,
            LongConsumer rescheduleTrigger,
            Clock clock,
            long minPauseBetweenCheckpoints,
            IntSupplier pendingCheckpointsSizeSupplier,
            IntSupplier numberOfCleaningCheckpointsSupplier,
            int maxQueuedRequests) {
        Preconditions.checkArgument(maxConcurrentCheckpointAttempts > 0);
        Preconditions.checkArgument(maxQueuedRequests > 0);
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
        this.rescheduleTrigger = rescheduleTrigger;
        this.clock = clock;
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
        this.pendingCheckpointsSizeSupplier = pendingCheckpointsSizeSupplier;
        this.numberOfCleaningCheckpointsSupplier = numberOfCleaningCheckpointsSupplier;
        this.maxQueuedRequests = maxQueuedRequests;
    }

    /**
     * Submits a new request and returns the request that should be executed now, if any.
     *
     * <p>If the queue is full and its lowest-priority entry is not periodic, the new request is
     * failed with {@code TOO_MANY_CHECKPOINT_REQUESTS} and nothing is returned. Otherwise the
     * new request is enqueued and, if that exceeds the capacity, the lowest-priority entry
     * (possibly the new request itself) is evicted and failed with the same reason.
     *
     * @param newRequest the request to enqueue
     * @param isTriggering when {@code true} nothing is handed out (presumably a trigger is
     *     already in progress; confirm against the caller)
     * @param lastCompletionMs reference time, on the same scale as
     *     {@code clock.relativeTimeMillis()}, from which the minimum pause is measured
     *     (presumably the completion time of the last checkpoint)
     * @return the request to execute, already removed from the queue, or empty
     */
    public Optional<CheckpointCoordinator.CheckpointTriggerRequest> chooseRequestToExecute(
            CheckpointCoordinator.CheckpointTriggerRequest newRequest,
            boolean isTriggering,
            long lastCompletionMs) {
        if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) {
            // Periodic requests sort after non-periodic ones within the same savepoint/force
            // class, so a non-periodic tail means there is no periodic entry to sacrifice:
            // keep what is queued and reject the newcomer.
            newRequest.completeExceptionally(
                    new CheckpointException(CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS));
            return Optional.empty();
        }

        // NOTE(review): TreeSet.add is a no-op when the comparator reports equality with an
        // existing element; in that (unlikely) case the new request is neither queued nor
        // completed here.
        queuedRequests.add(newRequest);
        if (queuedRequests.size() > maxQueuedRequests) {
            queuedRequests
                    .pollLast()
                    .completeExceptionally(
                            new CheckpointException(
                                    CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS));
        }

        Optional<CheckpointCoordinator.CheckpointTriggerRequest> chosen =
                chooseRequestToExecute(isTriggering, lastCompletionMs);
        chosen.ifPresent(CheckpointRequestDecider::logInQueueTime);
        return chosen;
    }

    /**
     * Returns the already-queued request that should be executed now, if any, without
     * submitting a new one.
     *
     * @param isTriggering when {@code true} nothing is handed out
     * @param lastCompletionMs reference time from which the minimum pause is measured; see
     *     {@link #chooseRequestToExecute(CheckpointCoordinator.CheckpointTriggerRequest, boolean, long)}
     * @return the request to execute, already removed from the queue, or empty
     */
    public Optional<CheckpointCoordinator.CheckpointTriggerRequest> chooseQueuedRequestToExecute(
            boolean isTriggering, long lastCompletionMs) {
        Optional<CheckpointCoordinator.CheckpointTriggerRequest> chosen =
                chooseRequestToExecute(isTriggering, lastCompletionMs);
        chosen.ifPresent(CheckpointRequestDecider::logInQueueTime);
        return chosen;
    }

    /**
     * Core selection logic shared by both public entry points.
     *
     * <p>Only the head of the queue is ever considered. A periodic, non-forced head that
     * arrives before the minimum pause has elapsed is removed, failed with
     * {@code MINIMUM_TIME_BETWEEN_CHECKPOINTS}, and the remaining delay is passed to the
     * reschedule callback.
     */
    private Optional<CheckpointCoordinator.CheckpointTriggerRequest> chooseRequestToExecute(
            boolean isTriggering, long lastCompletionMs) {
        if (isTriggering
                || queuedRequests.isEmpty()
                || numberOfCleaningCheckpointsSupplier.getAsInt() > maxConcurrentCheckpointAttempts) {
            return Optional.empty();
        }

        if (pendingCheckpointsSizeSupplier.getAsInt() >= maxConcurrentCheckpointAttempts) {
            // At the concurrency limit only a forced head request may proceed; it is removed
            // from the queue only if it passes the filter.
            return Optional.of(queuedRequests.first())
                    .filter(CheckpointCoordinator.CheckpointTriggerRequest::isForce)
                    .map(ignored -> queuedRequests.pollFirst());
        }

        CheckpointCoordinator.CheckpointTriggerRequest head = queuedRequests.first();
        if (!head.isForce() && head.isPeriodic) {
            long remainingDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
            if (remainingDelayMillis > 0) {
                queuedRequests
                        .pollFirst()
                        .completeExceptionally(
                                new CheckpointException(
                                        CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS));
                rescheduleTrigger.accept(remainingDelayMillis);
                return Optional.empty();
            }
        }

        return Optional.of(queuedRequests.pollFirst());
    }

    /**
     * Returns how many milliseconds of the minimum pause are still outstanding, measured from
     * {@code lastCompletionMs}; the result is zero or negative once the pause has elapsed.
     */
    private long nextTriggerDelayMillis(long lastCompletionMs) {
        return (lastCompletionMs - clock.relativeTimeMillis()) + minPauseBetweenCheckpoints;
    }

    /**
     * Returns a copy of the queued requests as a {@link PriorityQueue}.
     *
     * <p>The {@code PriorityQueue(SortedSet)} constructor is selected, so the copy uses the
     * same ordering as the internal queue. Changes to the copy do not affect this decider.
     *
     * @return a new priority queue containing the currently queued requests
     * @deprecated no replacement is documented here; {@link #getNumQueuedRequests()} covers the
     *     size-only use case.
     */
    @VisibleForTesting
    @Deprecated
    public PriorityQueue<CheckpointCoordinator.CheckpointTriggerRequest> getTriggerRequestQueue() {
        return new PriorityQueue<>(queuedRequests);
    }

    /**
     * Removes every queued request and completes each one exceptionally with the given cause.
     *
     * @param exception the exception passed to every removed request
     */
    public void abortAll(CheckpointException exception) {
        while (!queuedRequests.isEmpty()) {
            queuedRequests.pollFirst().completeExceptionally(exception);
        }
    }

    /**
     * Returns the number of requests currently waiting in the queue.
     *
     * @return the queue size
     */
    public int getNumQueuedRequests() {
        return queuedRequests.size();
    }

    /**
     * Builds the queue ordering. Earlier (higher priority) comes first:
     *
     * <ol>
     *   <li>savepoints before non-savepoints,
     *   <li>forced before non-forced,
     *   <li>non-periodic before periodic,
     *   <li>smaller timestamp before larger timestamp,
     *   <li>identity hash code as a final tie-breaker.
     * </ol>
     *
     * <p>NOTE(review): identity hash codes are not guaranteed to be unique, so two distinct
     * requests that tie on everything else can compare as equal.
     */
    private static Comparator<CheckpointCoordinator.CheckpointTriggerRequest>
            checkpointTriggerRequestsComparator() {
        return (left, right) -> {
            if (left.props.isSavepoint() != right.props.isSavepoint()) {
                return left.props.isSavepoint() ? -1 : 1;
            }
            if (left.isForce() != right.isForce()) {
                return left.isForce() ? -1 : 1;
            }
            if (left.isPeriodic != right.isPeriodic) {
                return left.isPeriodic ? 1 : -1;
            }
            if (left.timestamp != right.timestamp) {
                return Long.compare(left.timestamp, right.timestamp);
            }
            return Integer.compare(System.identityHashCode(left), System.identityHashCode(right));
        };
    }

    /**
     * Logs, at INFO level, how long the request waited when that exceeds
     * {@link #LOG_TIME_IN_QUEUE_THRESHOLD_MS}. The wait is the difference between
     * {@code System.currentTimeMillis()} and the request's timestamp.
     */
    private static void logInQueueTime(CheckpointCoordinator.CheckpointTriggerRequest request) {
        if (LOG.isInfoEnabled()) {
            long timeInQueueMillis = System.currentTimeMillis() - request.timestamp;
            if (timeInQueueMillis > LOG_TIME_IN_QUEUE_THRESHOLD_MS) {
                LOG.info("checkpoint request time in queue: {}", timeInQueueMillis);
            }
        }
    }
}
