package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.class */
public abstract class RecoveredInputChannel extends InputChannel implements ChannelStateHolder {
    private static final Logger LOG;
    private final ArrayDeque<Buffer> receivedBuffers;
    private final CompletableFuture<?> stateConsumedFuture;
    protected final BufferManager bufferManager;

    @GuardedBy("receivedBuffers")
    private boolean isReleased;
    protected ChannelStateWriter channelStateWriter;
    private int sequenceNumber;
    protected final int networkBuffersPerChannel;
    private boolean exclusiveBuffersAssigned;
    private long lastStoppedCheckpointId;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecoveredInputChannel(SingleInputGate singleInputGate, int i, ResultPartitionID resultPartitionID, int i2, int i3, int i4, Counter counter, Counter counter2, int i5) {
        super(singleInputGate, i, resultPartitionID, i2, i3, i4, counter, counter2);
        this.receivedBuffers = new ArrayDeque<>();
        this.stateConsumedFuture = new CompletableFuture<>();
        this.sequenceNumber = Integer.MIN_VALUE;
        this.lastStoppedCheckpointId = -1L;
        this.bufferManager = new BufferManager(singleInputGate.getMemorySegmentProvider(), this, 0);
        this.networkBuffersPerChannel = i5;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ChannelStateHolder
    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
        Preconditions.checkState(this.channelStateWriter == null, "Already initialized");
        this.channelStateWriter = (ChannelStateWriter) Preconditions.checkNotNull(channelStateWriter);
    }

    public final InputChannel toInputChannel() throws IOException {
        Preconditions.checkState(this.stateConsumedFuture.isDone(), "recovered state is not fully consumed");
        InputChannel inputChannelInternal = toInputChannelInternal();
        inputChannelInternal.checkpointStopped(this.lastStoppedCheckpointId);
        return inputChannelInternal;
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void checkpointStopped(long j) {
        this.lastStoppedCheckpointId = j;
    }

    protected abstract InputChannel toInputChannelInternal() throws IOException;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<?> getStateConsumedFuture() {
        return this.stateConsumedFuture;
    }

    public void onRecoveredStateBuffer(Buffer buffer) {
        boolean isEmpty;
        boolean z = true;
        NetworkActionsLogger.traceRecover("InputChannelRecoveredStateHandler#recover", buffer, this.inputGate.getOwningTaskName(), this.channelInfo);
        try {
            synchronized (this.receivedBuffers) {
                if (this.isReleased) {
                    isEmpty = false;
                } else {
                    isEmpty = this.receivedBuffers.isEmpty();
                    this.receivedBuffers.add(buffer);
                    z = false;
                }
            }
            if (isEmpty) {
                notifyChannelNonEmpty();
            }
        } finally {
            if (z) {
                buffer.recycleBuffer();
            }
        }
    }

    public void finishReadRecoveredState() throws IOException {
        onRecoveredStateBuffer(EventSerializer.toBuffer(EndOfChannelStateEvent.INSTANCE, false));
        this.bufferManager.releaseFloatingBuffers();
        LOG.debug("{}/{} finished recovering input.", this.inputGate.getOwningTaskName(), this.channelInfo);
    }

    @Nullable
    private InputChannel.BufferAndAvailability getNextRecoveredStateBuffer() throws IOException {
        Buffer poll;
        Buffer.DataType peekDataTypeUnsafe;
        synchronized (this.receivedBuffers) {
            Preconditions.checkState(!this.isReleased, "Trying to read from released RecoveredInputChannel");
            poll = this.receivedBuffers.poll();
            peekDataTypeUnsafe = peekDataTypeUnsafe();
        }
        if (poll == null) {
            return null;
        }
        if (isEndOfChannelStateEvent(poll)) {
            this.stateConsumedFuture.complete(null);
            return null;
        }
        int i = this.sequenceNumber;
        this.sequenceNumber = i + 1;
        return new InputChannel.BufferAndAvailability(poll, peekDataTypeUnsafe, 0, i);
    }

    private boolean isEndOfChannelStateEvent(Buffer buffer) throws IOException {
        if (buffer.isBuffer()) {
            return false;
        }
        AbstractEvent fromBuffer = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
        buffer.setReaderIndex(0);
        return fromBuffer.getClass() == EndOfChannelStateEvent.class;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        checkError();
        return Optional.ofNullable(getNextRecoveredStateBuffer());
    }

    private Buffer.DataType peekDataTypeUnsafe() {
        if (!$assertionsDisabled && !Thread.holdsLock(this.receivedBuffers)) {
            throw new AssertionError();
        }
        Buffer peek = this.receivedBuffers.peek();
        return peek != null ? peek.getDataType() : Buffer.DataType.NONE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public int getBuffersInUseCount() {
        int size;
        synchronized (this.receivedBuffers) {
            size = this.receivedBuffers.size();
        }
        return size;
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void resumeConsumption() {
        throw new UnsupportedOperationException("RecoveredInputChannel should never be blocked.");
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void acknowledgeAllRecordsProcessed() throws IOException {
        throw new UnsupportedOperationException("RecoveredInputChannel should not need acknowledge all records processed.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public final void requestSubpartition() {
        throw new UnsupportedOperationException("RecoveredInputChannel should never request partition.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void sendTaskEvent(TaskEvent taskEvent) {
        throw new UnsupportedOperationException("RecoveredInputChannel should never send any task events.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public boolean isReleased() {
        boolean z;
        synchronized (this.receivedBuffers) {
            z = this.isReleased;
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void releaseAllResources() throws IOException {
        ArrayDeque<Buffer> arrayDeque = new ArrayDeque<>();
        boolean z = false;
        synchronized (this.receivedBuffers) {
            if (!this.isReleased) {
                this.isReleased = true;
                z = true;
                arrayDeque.addAll(this.receivedBuffers);
                this.receivedBuffers.clear();
            }
        }
        if (z) {
            this.bufferManager.releaseAllBuffers(arrayDeque);
        }
    }

    @VisibleForTesting
    protected int getNumberOfQueuedBuffers() {
        int size;
        synchronized (this.receivedBuffers) {
            size = this.receivedBuffers.size();
        }
        return size;
    }

    public Buffer requestBufferBlocking() throws InterruptedException, IOException {
        if (!this.exclusiveBuffersAssigned) {
            this.bufferManager.requestExclusiveBuffers(this.networkBuffersPerChannel);
            this.exclusiveBuffersAssigned = true;
        }
        return this.bufferManager.requestBufferBlocking();
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void checkpointStarted(CheckpointBarrier checkpointBarrier) throws CheckpointException {
        throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void announceBufferSize(int i) {
    }

    static {
        $assertionsDisabled = !RecoveredInputChannel.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) RecoveredInputChannel.class);
    }
}
