package org.apache.flink.runtime.checkpoint;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointFailureManager.class */
public class CheckpointFailureManager {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CheckpointFailureManager.class);
    public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
    public static final String EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE = "Exceeded checkpoint tolerable failure threshold.";
    private static final int UNKNOWN_CHECKPOINT_ID = -1;
    private final int tolerableCpFailureNumber;
    private final FailJobCallback failureCallback;
    private final AtomicInteger continuousFailureCounter;
    private final Set<Long> countedCheckpointIds;
    private long lastSucceededCheckpointId = Long.MIN_VALUE;

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointFailureManager$FailJobCallback.class */
    public interface FailJobCallback {
        void failJob(Throwable th);

        void failJobDueToTaskFailure(Throwable th, ExecutionAttemptID executionAttemptID);
    }

    public CheckpointFailureManager(int i, FailJobCallback failJobCallback) {
        Preconditions.checkArgument(i >= 0, "The tolerable checkpoint failure number is illegal, it must be greater than or equal to 0 .");
        this.tolerableCpFailureNumber = i;
        this.continuousFailureCounter = new AtomicInteger(0);
        this.failureCallback = (FailJobCallback) Preconditions.checkNotNull(failJobCallback);
        this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
    }

    public void handleCheckpointException(@Nullable PendingCheckpoint pendingCheckpoint, CheckpointProperties checkpointProperties, CheckpointException checkpointException, @Nullable ExecutionAttemptID executionAttemptID, JobID jobID, @Nullable PendingCheckpointStats pendingCheckpointStats, CheckpointStatsTracker checkpointStatsTracker) {
        long checkpointID = pendingCheckpoint == null ? -1L : pendingCheckpoint.getCheckpointID();
        updateStatsAfterCheckpointFailed(pendingCheckpointStats, checkpointStatsTracker, checkpointException);
        if (CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING.equals(checkpointException.getCheckpointFailureReason())) {
            LOG.info("Failed to trigger checkpoint for job {} since {}.", jobID, checkpointException.getMessage());
        } else {
            Logger logger = LOG;
            Object[] objArr = new Object[4];
            objArr[0] = checkpointID == -1 ? "UNKNOWN_CHECKPOINT_ID" : Long.valueOf(checkpointID);
            objArr[1] = jobID;
            objArr[2] = Integer.valueOf(this.continuousFailureCounter.get());
            objArr[3] = checkpointException;
            logger.warn("Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)", objArr);
        }
        if (isJobManagerFailure(checkpointException, executionAttemptID)) {
            handleJobLevelCheckpointException(checkpointProperties, checkpointException, checkpointID);
        } else {
            handleTaskLevelCheckpointException((PendingCheckpoint) Preconditions.checkNotNull(pendingCheckpoint), checkpointException, (ExecutionAttemptID) Preconditions.checkNotNull(executionAttemptID));
        }
    }

    private void updateStatsAfterCheckpointFailed(@Nullable PendingCheckpointStats pendingCheckpointStats, CheckpointStatsTracker checkpointStatsTracker, CheckpointException checkpointException) {
        if (pendingCheckpointStats != null) {
            checkpointStatsTracker.reportFailedCheckpoint(pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), checkpointException));
        } else {
            checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
        }
    }

    private boolean isJobManagerFailure(CheckpointException checkpointException, @Nullable ExecutionAttemptID executionAttemptID) {
        return isPreFlightFailure(checkpointException) || executionAttemptID == null;
    }

    void handleJobLevelCheckpointException(CheckpointProperties checkpointProperties, CheckpointException checkpointException, long j) {
        if (checkpointProperties.isSavepoint()) {
            return;
        }
        FailJobCallback failJobCallback = this.failureCallback;
        failJobCallback.getClass();
        checkFailureAgainstCounter(checkpointException, j, (v1) -> {
            r3.failJob(v1);
        });
    }

    void handleTaskLevelCheckpointException(PendingCheckpoint pendingCheckpoint, CheckpointException checkpointException, ExecutionAttemptID executionAttemptID) {
        CheckpointProperties props = pendingCheckpoint.getProps();
        if (props.isSavepoint() && props.isSynchronous()) {
            this.failureCallback.failJob(checkpointException);
        } else {
            checkFailureAgainstCounter(checkpointException, pendingCheckpoint.getCheckpointID(), flinkRuntimeException -> {
                this.failureCallback.failJobDueToTaskFailure(flinkRuntimeException, executionAttemptID);
            });
        }
    }

    private void checkFailureAgainstCounter(CheckpointException checkpointException, long j, Consumer<FlinkRuntimeException> consumer) {
        if (j == -1 || j > this.lastSucceededCheckpointId) {
            checkFailureCounter(checkpointException, j);
            if (this.continuousFailureCounter.get() > this.tolerableCpFailureNumber) {
                clearCount();
                consumer.accept(new FlinkRuntimeException(String.format("%s The latest checkpoint failed due to %s, view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.", EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, checkpointException.getCheckpointFailureReason().message())));
            }
        }
    }

    public void checkFailureCounter(CheckpointException checkpointException, long j) {
        if (this.tolerableCpFailureNumber == Integer.MAX_VALUE) {
            return;
        }
        CheckpointFailureReason checkpointFailureReason = checkpointException.getCheckpointFailureReason();
        switch (checkpointFailureReason) {
            case PERIODIC_SCHEDULER_SHUTDOWN:
            case TOO_MANY_CHECKPOINT_REQUESTS:
            case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
            case NOT_ALL_REQUIRED_TASKS_RUNNING:
            case CHECKPOINT_SUBSUMED:
            case CHECKPOINT_COORDINATOR_SUSPEND:
            case CHECKPOINT_COORDINATOR_SHUTDOWN:
            case CHANNEL_STATE_SHARED_STREAM_EXCEPTION:
            case JOB_FAILOVER_REGION:
            case CHECKPOINT_DECLINED_TASK_NOT_READY:
            case CHECKPOINT_DECLINED_TASK_CLOSING:
            case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
            case CHECKPOINT_DECLINED_SUBSUMED:
            case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
            case TASK_FAILURE:
            case TASK_CHECKPOINT_FAILURE:
            case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
            case TRIGGER_CHECKPOINT_FAILURE:
                return;
            case IO_EXCEPTION:
            case CHECKPOINT_ASYNC_EXCEPTION:
            case CHECKPOINT_DECLINED:
            case CHECKPOINT_EXPIRED:
            case FINALIZE_CHECKPOINT_FAILURE:
                if (j == -1 || this.countedCheckpointIds.add(Long.valueOf(j))) {
                    this.continuousFailureCounter.incrementAndGet();
                    return;
                }
                return;
            default:
                throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + checkpointFailureReason.name());
        }
    }

    public void handleCheckpointSuccess(long j) {
        if (j > this.lastSucceededCheckpointId) {
            this.lastSucceededCheckpointId = j;
            clearCount();
        }
    }

    private void clearCount() {
        this.continuousFailureCounter.set(0);
        this.countedCheckpointIds.clear();
    }

    private static boolean isPreFlightFailure(Throwable th) {
        return ((Boolean) ExceptionUtils.findThrowable(th, CheckpointException.class).map((v0) -> {
            return v0.getCheckpointFailureReason();
        }).map((v0) -> {
            return v0.isPreFlight();
        }).orElse(false)).booleanValue();
    }
}
