package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.FutureUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.class */
/**
 * Static helpers that trigger checkpoints on operator coordinators and acknowledge the resulting
 * state snapshots to a {@link PendingCheckpoint}.
 */
public final class OperatorCoordinatorCheckpoints {

    /**
     * Holder for the snapshots produced by all coordinators that were triggered together.
     *
     * <p>The wrapped collection is stored and handed out as-is (no defensive copy).
     */
    // Decompiler note: this type was originally package-private.
    public static final class AllCoordinatorSnapshots {
        private final Collection<CoordinatorSnapshot> snapshots;

        AllCoordinatorSnapshots(Collection<CoordinatorSnapshot> snapshots) {
            this.snapshots = snapshots;
        }

        /** Returns the wrapped snapshots for iteration. */
        public Iterable<CoordinatorSnapshot> snapshots() {
            return snapshots;
        }
    }

    /** Pairs a coordinator (identified by its {@link OperatorInfo}) with its snapshotted state. */
    // Decompiler note: this type was originally package-private.
    public static final class CoordinatorSnapshot {
        final OperatorInfo coordinator;
        final ByteStreamStateHandle state;

        CoordinatorSnapshot(OperatorInfo coordinator, ByteStreamStateHandle state) {
            this.coordinator = coordinator;
            this.state = state;
        }
    }

    OperatorCoordinatorCheckpoints() {
    }

    /**
     * Triggers the checkpoint of a single coordinator.
     *
     * <p>A fresh future for the serialized state is handed to {@code
     * coordinatorContext.checkpointCoordinator(checkpointId, future)}. Once that future
     * completes, its bytes are wrapped in a {@link ByteStreamStateHandle} whose name is the
     * string form of the coordinator's operator ID.
     *
     * @param coordinatorContext the coordinator to checkpoint
     * @param checkpointId the ID passed on to the coordinator
     * @return a future holding the coordinator together with its state handle
     * @throws Exception declared by the signature; presumably propagated from {@code
     *     checkpointCoordinator} (its declaration is not visible here)
     */
    public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(
            OperatorCoordinatorCheckpointContext coordinatorContext, long checkpointId)
            throws Exception {
        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        coordinatorContext.checkpointCoordinator(checkpointId, checkpointFuture);
        return checkpointFuture.thenApply(
                stateBytes ->
                        new CoordinatorSnapshot(
                                coordinatorContext,
                                new ByteStreamStateHandle(
                                        coordinatorContext.operatorId().toString(), stateBytes)));
    }

    /**
     * Triggers the checkpoint of every given coordinator and combines the individual futures
     * into one.
     *
     * @param coordinators the coordinators to checkpoint
     * @param checkpointId the ID passed on to every coordinator
     * @return a future holding all snapshots, produced via {@code FutureUtils.combineAll}
     * @throws Exception if triggering any single coordinator throws
     */
    public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCheckpoints(
            Collection<OperatorCoordinatorCheckpointContext> coordinators, long checkpointId)
            throws Exception {
        // Fully typed (not a raw ArrayList) so that combineAll keeps the element type and the
        // AllCoordinatorSnapshots constructor reference below type-checks without unchecked
        // conversions.
        Collection<CompletableFuture<CoordinatorSnapshot>> snapshotFutures =
                new ArrayList<>(coordinators.size());
        for (OperatorCoordinatorCheckpointContext coordinator : coordinators) {
            snapshotFutures.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
        }
        return FutureUtils.combineAll(snapshotFutures).thenApply(AllCoordinatorSnapshots::new);
    }

    /**
     * Triggers all coordinator checkpoints for the pending checkpoint's ID and, once all
     * snapshots are available, acknowledges them to the pending checkpoint.
     *
     * <p>The acknowledgement step is scheduled through {@code acknowledgeExecutor}. A failure
     * while acknowledging is wrapped in a {@link CompletionException} and thereby completes the
     * returned future exceptionally.
     *
     * @param coordinators the coordinators to checkpoint
     * @param checkpoint the pending checkpoint that supplies the ID and receives the states
     * @param acknowledgeExecutor executor used for the acknowledgement step
     * @return a future that completes after the acknowledgement step has run
     * @throws Exception if triggering the coordinators throws synchronously
     */
    public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpoints(
            Collection<OperatorCoordinatorCheckpointContext> coordinators,
            PendingCheckpoint checkpoint,
            Executor acknowledgeExecutor)
            throws Exception {
        return triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointID())
                .thenAcceptAsync(
                        allSnapshots -> {
                            try {
                                acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
                            } catch (Exception e) {
                                throw new CompletionException(e);
                            }
                        },
                        acknowledgeExecutor);
    }

    /**
     * Same as {@link #triggerAndAcknowledgeAllCoordinatorCheckpoints}, except that any exception
     * thrown synchronously while triggering is rethrown wrapped in an unchecked {@link
     * CompletionException}.
     *
     * @param coordinators the coordinators to checkpoint
     * @param checkpoint the pending checkpoint that supplies the ID and receives the states
     * @param acknowledgeExecutor executor used for the acknowledgement step
     * @return a future that completes after the acknowledgement step has run
     * @throws CompletionException wrapping any exception thrown while triggering
     */
    public static CompletableFuture<Void>
            triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                    Collection<OperatorCoordinatorCheckpointContext> coordinators,
                    PendingCheckpoint checkpoint,
                    Executor acknowledgeExecutor)
                    throws CompletionException {
        try {
            return triggerAndAcknowledgeAllCoordinatorCheckpoints(
                    coordinators, checkpoint, acknowledgeExecutor);
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }

    /**
     * Acknowledges each snapshot to the pending checkpoint, stopping at the first one that is
     * not acknowledged with {@code SUCCESS}.
     *
     * @param checkpoint the pending checkpoint receiving the coordinator states
     * @param snapshots the coordinator snapshots to acknowledge
     * @throws CheckpointException on the first non-successful acknowledgement. If the checkpoint
     *     is disposed and reports a failure cause, that cause is attached and the reason is
     *     {@code IO_EXCEPTION} when the cause chain contains an {@link IOException}; in all
     *     other cases the reason is {@code TRIGGER_CHECKPOINT_FAILURE}.
     */
    private static void acknowledgeAllCoordinators(
            PendingCheckpoint checkpoint, Collection<CoordinatorSnapshot> snapshots)
            throws CheckpointException {
        for (CoordinatorSnapshot snapshot : snapshots) {
            PendingCheckpoint.TaskAcknowledgeResult result =
                    checkpoint.acknowledgeCoordinatorState(snapshot.coordinator, snapshot.state);
            if (result == PendingCheckpoint.TaskAcknowledgeResult.SUCCESS) {
                continue;
            }

            String errorMessage = "Coordinator state not acknowledged successfully: " + result;
            // The failure cause is only consulted when the checkpoint has been disposed.
            CheckpointException failureCause =
                    checkpoint.isDisposed() ? checkpoint.getFailureCause() : null;
            if (failureCause == null) {
                throw new CheckpointException(
                        errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
            }

            CheckpointFailureReason reason =
                    ExceptionUtils.findThrowable(failureCause, IOException.class).isPresent()
                            ? CheckpointFailureReason.IO_EXCEPTION
                            : CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE;
            throw new CheckpointException(errorMessage, reason, failureCause);
        }
    }
}
