package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.class */
public class HsFileDataManager implements Runnable, BufferRecycler {
    private static final Logger LOG;
    private final ScheduledExecutorService ioExecutor;
    private final int maxRequestedBuffers;
    private final Duration bufferRequestTimeout;
    private final BatchShuffleReadBufferPool bufferPool;
    private final Path dataFilePath;
    private final HsFileDataIndex dataIndex;
    private final HsSubpartitionFileReader.Factory fileReaderFactory;
    private final HybridShuffleConfiguration hybridShuffleConfiguration;

    @GuardedBy("lock")
    private boolean isRunning;

    @GuardedBy("lock")
    private volatile int numRequestedBuffers;

    @GuardedBy("lock")
    private volatile boolean isReleased;

    @GuardedBy("lock")
    private FileChannel dataFileChannel;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();

    @GuardedBy("lock")
    private final Set<HsSubpartitionFileReader> allReaders = new HashSet();

    public HsFileDataManager(BatchShuffleReadBufferPool batchShuffleReadBufferPool, ScheduledExecutorService scheduledExecutorService, HsFileDataIndex hsFileDataIndex, Path path, HsSubpartitionFileReader.Factory factory, HybridShuffleConfiguration hybridShuffleConfiguration) {
        this.fileReaderFactory = factory;
        this.hybridShuffleConfiguration = (HybridShuffleConfiguration) Preconditions.checkNotNull(hybridShuffleConfiguration);
        this.dataIndex = (HsFileDataIndex) Preconditions.checkNotNull(hsFileDataIndex);
        this.dataFilePath = (Path) Preconditions.checkNotNull(path);
        this.bufferPool = (BatchShuffleReadBufferPool) Preconditions.checkNotNull(batchShuffleReadBufferPool);
        this.ioExecutor = (ScheduledExecutorService) Preconditions.checkNotNull(scheduledExecutorService);
        this.maxRequestedBuffers = hybridShuffleConfiguration.getMaxRequestedBuffers();
        this.bufferRequestTimeout = (Duration) Preconditions.checkNotNull(hybridShuffleConfiguration.getBufferRequestTimeout());
    }

    public void setup() {
        this.bufferPool.initialize();
    }

    @Override // java.lang.Runnable
    public synchronized void run() {
        endCurrentRoundOfReading(tryRead());
    }

    public HsDataView registerNewConsumer(int i, HsConsumerId hsConsumerId, HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations) throws IOException {
        HsSubpartitionFileReader createFileReader;
        synchronized (this.lock) {
            Preconditions.checkState(!this.isReleased, "HsFileDataManager is already released.");
            lazyInitialize();
            createFileReader = this.fileReaderFactory.createFileReader(i, hsConsumerId, this.dataFileChannel, hsSubpartitionConsumerInternalOperations, this.dataIndex, this.hybridShuffleConfiguration.getMaxBuffersReadAhead(), this::releaseSubpartitionReader, this.headerBuf);
            this.allReaders.add(createFileReader);
            mayTriggerReading();
        }
        return createFileReader;
    }

    public void closeDataIndexAndDeleteShuffleFile() {
        this.dataIndex.close();
        IOUtils.deleteFileQuietly(this.dataFilePath);
    }

    public void releaseSubpartitionReader(HsSubpartitionFileReader hsSubpartitionFileReader) {
        synchronized (this.lock) {
            removeSubpartitionReaders(Collections.singleton(hsSubpartitionFileReader));
        }
    }

    public void release() {
        synchronized (this.lock) {
            if (this.isReleased) {
                return;
            }
            this.isReleased = true;
            ArrayList arrayList = new ArrayList(this.allReaders);
            mayNotifyReleased();
            failSubpartitionReaders(arrayList, new IllegalStateException("Result partition has been already released."));
            this.releaseFuture.thenRun(this::closeDataIndexAndDeleteShuffleFile);
        }
    }

    private int tryRead() {
        Queue<HsSubpartitionFileReader> prepareAndGetAvailableReaders = prepareAndGetAvailableReaders();
        if (prepareAndGetAvailableReaders.isEmpty()) {
            return 0;
        }
        try {
            Queue<MemorySegment> allocateBuffers = allocateBuffers();
            int size = allocateBuffers.size();
            if (size <= 0) {
                return 0;
            }
            readData(prepareAndGetAvailableReaders, allocateBuffers);
            int size2 = size - allocateBuffers.size();
            releaseBuffers(allocateBuffers);
            return size2;
        } catch (Exception e) {
            failSubpartitionReaders(prepareAndGetAvailableReaders, e);
            LOG.error("Failed to request buffers for data reading.", (Throwable) e);
            return 0;
        }
    }

    private Queue<MemorySegment> allocateBuffers() throws Exception {
        long bufferRequestTimeoutTime = getBufferRequestTimeoutTime();
        while (true) {
            List<MemorySegment> requestBuffers = this.bufferPool.requestBuffers();
            if (!requestBuffers.isEmpty()) {
                return new ArrayDeque(requestBuffers);
            }
            Preconditions.checkState(!this.isReleased, "Result partition has been already released.");
            if (System.currentTimeMillis() >= bufferRequestTimeoutTime) {
                long currentTimeMillis = System.currentTimeMillis();
                bufferRequestTimeoutTime = getBufferRequestTimeoutTime();
                if (currentTimeMillis >= currentTimeMillis) {
                    throw new TimeoutException(String.format("Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase '%s'.", TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
                }
            }
        }
    }

    private void mayTriggerReading() {
        synchronized (this.lock) {
            if (!this.isRunning && !this.allReaders.isEmpty() && this.numRequestedBuffers + this.bufferPool.getNumBuffersPerRequest() <= this.maxRequestedBuffers && this.numRequestedBuffers < this.bufferPool.getAverageBuffersPerRequester()) {
                this.isRunning = true;
                this.ioExecutor.execute(() -> {
                    try {
                        run();
                    } catch (Throwable th) {
                        FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), th);
                    }
                });
            }
        }
    }

    @GuardedBy("lock")
    private void mayNotifyReleased() {
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        if (this.isReleased && this.allReaders.isEmpty()) {
            this.releaseFuture.complete(null);
        }
    }

    private long getBufferRequestTimeoutTime() {
        return this.bufferPool.getLastBufferOperationTimestamp() + this.bufferRequestTimeout.toMillis();
    }

    private void releaseBuffers(Queue<MemorySegment> queue) {
        if (queue.isEmpty()) {
            return;
        }
        try {
            this.bufferPool.recycle(queue);
            queue.clear();
        } catch (Throwable th) {
            FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), th);
        }
    }

    private Queue<HsSubpartitionFileReader> prepareAndGetAvailableReaders() {
        synchronized (this.lock) {
            if (this.isReleased) {
                return new ArrayDeque();
            }
            Iterator<HsSubpartitionFileReader> it = this.allReaders.iterator();
            while (it.hasNext()) {
                it.next().prepareForScheduling();
            }
            return new PriorityQueue(this.allReaders);
        }
    }

    private void readData(Queue<HsSubpartitionFileReader> queue, Queue<MemorySegment> queue2) {
        while (!queue.isEmpty() && !queue2.isEmpty()) {
            HsSubpartitionFileReader poll = queue.poll();
            try {
                poll.readBuffers(queue2, this);
            } catch (IOException e) {
                failSubpartitionReaders(Collections.singletonList(poll), e);
                LOG.debug("Failed to read shuffle data.", (Throwable) e);
            }
        }
    }

    private void failSubpartitionReaders(Collection<HsSubpartitionFileReader> collection, Throwable th) {
        synchronized (this.lock) {
            removeSubpartitionReaders(collection);
        }
        Iterator<HsSubpartitionFileReader> it = collection.iterator();
        while (it.hasNext()) {
            it.next().fail(th);
        }
    }

    @GuardedBy("lock")
    private void removeSubpartitionReaders(Collection<HsSubpartitionFileReader> collection) {
        this.allReaders.removeAll(collection);
        if (this.allReaders.isEmpty()) {
            this.bufferPool.unregisterRequester(this);
            closeFileChannel();
        }
    }

    private void endCurrentRoundOfReading(int i) {
        synchronized (this.lock) {
            this.numRequestedBuffers += i;
            this.isRunning = false;
            mayNotifyReleased();
        }
        if (i == 0) {
            this.ioExecutor.schedule(this::mayTriggerReading, 5L, TimeUnit.MILLISECONDS);
        } else {
            mayTriggerReading();
        }
    }

    @GuardedBy("lock")
    private void lazyInitialize() throws IOException {
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        try {
            if (this.allReaders.isEmpty()) {
                this.dataFileChannel = openFileChannel(this.dataFilePath);
                this.bufferPool.registerRequester(this);
            }
        } catch (IOException e) {
            if (this.allReaders.isEmpty()) {
                this.bufferPool.unregisterRequester(this);
                closeFileChannel();
            }
            throw e;
        }
    }

    private FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    @GuardedBy("lock")
    private void closeFileChannel() {
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        IOUtils.closeQuietly(this.dataFileChannel);
        this.dataFileChannel = null;
    }

    @Override // org.apache.flink.runtime.io.network.buffer.BufferRecycler
    public void recycle(MemorySegment memorySegment) {
        synchronized (this.lock) {
            this.bufferPool.recycle(memorySegment);
            this.numRequestedBuffers--;
            mayTriggerReading();
        }
    }

    static {
        $assertionsDisabled = !HsFileDataManager.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) HsFileDataManager.class);
    }
}
