package org.apache.flink.table.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.io.ChannelWithMeta;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/sort/BufferedKVExternalSorter.class */
public class BufferedKVExternalSorter {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BufferedKVExternalSorter.class);
    private final NormalizedKeyComputer nKeyComputer;
    private final RecordComparator comparator;
    private final BinaryRowDataSerializer keySerializer;
    private final BinaryRowDataSerializer valueSerializer;
    private final BinaryKVExternalMerger merger;
    private final IOManager ioManager;
    private final int maxNumFileHandles;
    private final FileIOChannel.Enumerator enumerator;
    private final SpillChannelManager channelManager;
    private final int pageSize;
    private long numSpillFiles;
    private long spillInBytes;
    private long spillInCompressedBytes;
    private final boolean compressionEnabled;
    private final BlockCompressionFactory compressionCodecFactory;
    private final int compressionBlockSize;
    private volatile boolean closed = false;
    private final List<ChannelWithMeta> channelIDs = new ArrayList();
    private final IndexedSorter sorter = new QuickSort();

    public BufferedKVExternalSorter(IOManager iOManager, BinaryRowDataSerializer binaryRowDataSerializer, BinaryRowDataSerializer binaryRowDataSerializer2, NormalizedKeyComputer normalizedKeyComputer, RecordComparator recordComparator, int i, int i2, boolean z, int i3) {
        this.keySerializer = binaryRowDataSerializer;
        this.valueSerializer = binaryRowDataSerializer2;
        this.nKeyComputer = normalizedKeyComputer;
        this.comparator = recordComparator;
        this.pageSize = i;
        this.maxNumFileHandles = i2;
        this.compressionEnabled = z;
        this.compressionCodecFactory = this.compressionEnabled ? BlockCompressionFactory.createBlockCompressionFactory(BlockCompressionFactory.CompressionFactoryName.LZ4.toString()) : null;
        this.compressionBlockSize = i3;
        this.ioManager = iOManager;
        this.enumerator = this.ioManager.createChannelEnumerator();
        this.channelManager = new SpillChannelManager();
        this.merger = new BinaryKVExternalMerger(iOManager, i, i2, this.channelManager, binaryRowDataSerializer, binaryRowDataSerializer2, recordComparator, z, this.compressionCodecFactory, i3);
    }

    public MutableObjectIterator<Tuple2<BinaryRowData, BinaryRowData>> getKVIterator() throws IOException {
        List<ChannelWithMeta> list;
        List<ChannelWithMeta> list2 = this.channelIDs;
        while (true) {
            list = list2;
            if (this.closed || list.size() <= this.maxNumFileHandles) {
                break;
            }
            list2 = this.merger.mergeChannelList(list);
        }
        ArrayList arrayList = new ArrayList();
        BinaryMergeIterator<Tuple2<BinaryRowData, BinaryRowData>> mergingIterator = this.merger.getMergingIterator(list, arrayList);
        this.channelManager.addOpenChannels(arrayList);
        return mergingIterator;
    }

    public void sortAndSpill(ArrayList<MemorySegment> arrayList, long j, MemorySegmentPool memorySegmentPool) throws IOException {
        BinaryKVInMemorySortBuffer createBuffer = BinaryKVInMemorySortBuffer.createBuffer(this.nKeyComputer, this.keySerializer, this.valueSerializer, this.comparator, arrayList, j, memorySegmentPool);
        this.sorter.sort(createBuffer);
        FileIOChannel.ID next = this.enumerator.next();
        this.channelManager.addChannel(next);
        AbstractChannelWriterOutputView abstractChannelWriterOutputView = null;
        try {
            this.numSpillFiles++;
            abstractChannelWriterOutputView = FileChannelUtil.createOutputView(this.ioManager, next, this.compressionEnabled, this.compressionCodecFactory, this.compressionBlockSize, this.pageSize);
            createBuffer.writeToOutput(abstractChannelWriterOutputView);
            this.spillInBytes += abstractChannelWriterOutputView.getNumBytes();
            this.spillInCompressedBytes += abstractChannelWriterOutputView.getNumCompressedBytes();
            int close = abstractChannelWriterOutputView.close();
            int blockCount = abstractChannelWriterOutputView.getBlockCount();
            LOG.info("here spill the {}th kv external buffer data with {} bytes and {} compressed bytes", Long.valueOf(this.numSpillFiles), Long.valueOf(this.spillInBytes), Long.valueOf(this.spillInCompressedBytes));
            this.channelIDs.add(new ChannelWithMeta(next, blockCount, close));
        } catch (IOException e) {
            if (abstractChannelWriterOutputView != null) {
                abstractChannelWriterOutputView.close();
                abstractChannelWriterOutputView.getChannel().deleteChannel();
            }
            throw e;
        }
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.merger.close();
        this.channelManager.close();
    }
}
