package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.class */
public class CheckpointedInputGate implements PullingAsyncDataInput<BufferOrEvent>, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CheckpointedInputGate.class);
    private final CheckpointBarrierHandler barrierHandler;
    private final UpstreamRecoveryTracker upstreamRecoveryTracker;
    private final InputGate inputGate;
    private final MailboxExecutor mailboxExecutor;
    private boolean isFinished;

    public CheckpointedInputGate(InputGate inputGate, CheckpointBarrierHandler checkpointBarrierHandler, MailboxExecutor mailboxExecutor) {
        this(inputGate, checkpointBarrierHandler, mailboxExecutor, UpstreamRecoveryTracker.NO_OP);
    }

    public CheckpointedInputGate(InputGate inputGate, CheckpointBarrierHandler checkpointBarrierHandler, MailboxExecutor mailboxExecutor, UpstreamRecoveryTracker upstreamRecoveryTracker) {
        this.inputGate = inputGate;
        this.barrierHandler = checkpointBarrierHandler;
        this.mailboxExecutor = mailboxExecutor;
        this.upstreamRecoveryTracker = upstreamRecoveryTracker;
        waitForPriorityEvents(inputGate, mailboxExecutor);
    }

    private void processPriorityEvents() throws IOException, InterruptedException {
        boolean isDone = this.inputGate.getPriorityEventAvailableFuture().isDone();
        while (isDone) {
            Optional<BufferOrEvent> pollNext = pollNext();
            if (!pollNext.isPresent()) {
                break;
            }
            BufferOrEvent bufferOrEvent = pollNext.get();
            Preconditions.checkState(bufferOrEvent.hasPriority(), "Should only poll priority events");
            isDone = bufferOrEvent.morePriorityEvents();
        }
        waitForPriorityEvents(this.inputGate, this.mailboxExecutor);
    }

    private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor mailboxExecutor) {
        FutureUtils.assertNoException(inputGate.getPriorityEventAvailableFuture().thenRun(() -> {
            try {
                mailboxExecutor.execute(this::processPriorityEvents, "process priority event @ gate %s", inputGate);
            } catch (RejectedExecutionException e) {
                LOG.debug("Ignored RejectedExecutionException in CheckpointedInputGate.waitForPriorityEvents");
            }
        }));
    }

    @Override // org.apache.flink.runtime.io.AvailabilityProvider
    public CompletableFuture<?> getAvailableFuture() {
        return this.inputGate.getAvailableFuture();
    }

    @Override // org.apache.flink.runtime.io.PullingAsyncDataInput
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        Optional<BufferOrEvent> pollNext = this.inputGate.pollNext();
        if (!pollNext.isPresent()) {
            return handleEmptyBuffer();
        }
        BufferOrEvent bufferOrEvent = pollNext.get();
        if (bufferOrEvent.isEvent()) {
            return handleEvent(bufferOrEvent);
        }
        if (bufferOrEvent.isBuffer()) {
            this.barrierHandler.addProcessedBytes(bufferOrEvent.getBuffer().getSize());
        }
        return pollNext;
    }

    private Optional<BufferOrEvent> handleEvent(BufferOrEvent bufferOrEvent) throws IOException {
        Class<?> cls = bufferOrEvent.getEvent().getClass();
        if (cls == CheckpointBarrier.class) {
            this.barrierHandler.processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelInfo(), false);
        } else if (cls == CancelCheckpointMarker.class) {
            this.barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelInfo());
        } else if (cls == EndOfData.class) {
            this.inputGate.acknowledgeAllRecordsProcessed(bufferOrEvent.getChannelInfo());
        } else if (cls == EndOfPartitionEvent.class) {
            this.barrierHandler.processEndOfPartition(bufferOrEvent.getChannelInfo());
        } else if (cls == EventAnnouncement.class) {
            EventAnnouncement eventAnnouncement = (EventAnnouncement) bufferOrEvent.getEvent();
            AbstractEvent announcedEvent = eventAnnouncement.getAnnouncedEvent();
            Preconditions.checkState(announcedEvent instanceof CheckpointBarrier, "Only CheckpointBarrier announcement are currently supported, but found [%s]", announcedEvent);
            this.barrierHandler.processBarrierAnnouncement((CheckpointBarrier) announcedEvent, eventAnnouncement.getSequenceNumber(), bufferOrEvent.getChannelInfo());
        } else if (bufferOrEvent.getEvent().getClass() == EndOfChannelStateEvent.class) {
            this.upstreamRecoveryTracker.handleEndOfRecovery(bufferOrEvent.getChannelInfo());
        }
        return Optional.of(bufferOrEvent);
    }

    public CompletableFuture<Void> getAllBarriersReceivedFuture(long j) {
        return this.barrierHandler.getAllBarriersReceivedFuture(j);
    }

    private Optional<BufferOrEvent> handleEmptyBuffer() {
        if (this.inputGate.isFinished()) {
            this.isFinished = true;
        }
        return Optional.empty();
    }

    @Override // org.apache.flink.runtime.io.PullingAsyncDataInput
    public boolean isFinished() {
        return this.isFinished;
    }

    @Override // org.apache.flink.runtime.io.PullingAsyncDataInput
    public PullingAsyncDataInput.EndOfDataStatus hasReceivedEndOfData() {
        return this.inputGate.hasReceivedEndOfData();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.barrierHandler.close();
    }

    @VisibleForTesting
    long getLatestCheckpointId() {
        return this.barrierHandler.getLatestCheckpointId();
    }

    @VisibleForTesting
    long getAlignmentDurationNanos() {
        return this.barrierHandler.getAlignmentDurationNanos();
    }

    @VisibleForTesting
    long getCheckpointStartDelayNanos() {
        return this.barrierHandler.getCheckpointStartDelayNanos();
    }

    public int getNumberOfInputChannels() {
        return this.inputGate.getNumberOfInputChannels();
    }

    public String toString() {
        return this.barrierHandler.toString();
    }

    public InputChannel getChannel(int i) {
        return this.inputGate.getChannel(i);
    }

    public List<InputChannelInfo> getChannelInfos() {
        return this.inputGate.getChannelInfos();
    }

    public boolean allChannelsRecovered() {
        return this.upstreamRecoveryTracker.allChannelsRecovered();
    }

    @VisibleForTesting
    CheckpointBarrierHandler getCheckpointBarrierHandler() {
        return this.barrierHandler;
    }
}
