package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerClient;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.shuffle.ShuffleUtils;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
import org.apache.flink.runtime.throughput.BufferDebloater;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.class */
public class SingleInputGateFactory {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SingleInputGateFactory.class);

    @Nonnull
    protected final ResourceID taskExecutorResourceId;
    protected final int partitionRequestInitialBackoff;
    protected final int partitionRequestMaxBackoff;

    @Nonnull
    protected final ConnectionManager connectionManager;

    @Nonnull
    protected final ResultPartitionManager partitionManager;

    @Nonnull
    protected final TaskEventPublisher taskEventPublisher;

    @Nonnull
    protected final NetworkBufferPool networkBufferPool;
    private final Optional<Integer> maxRequiredBuffersPerGate;
    protected final int configuredNetworkBuffersPerChannel;
    private final int floatingNetworkBuffersPerGate;
    private final boolean batchShuffleCompressionEnabled;
    private final String compressionCodec;
    private final int networkBufferSize;
    private final BufferDebloatConfiguration debloatConfiguration;

    @Nullable
    private final TieredStorageConfiguration tieredStorageConfiguration;

    @Nullable
    private final TieredStorageNettyServiceImpl tieredStorageNettyService;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory$ChannelStatistics.class */
    public static class ChannelStatistics {
        int numLocalChannels;
        int numRemoteChannels;
        int numUnknownChannels;

        protected ChannelStatistics() {
        }

        public String toString() {
            return String.format("local: %s, remote: %s, unknown: %s", Integer.valueOf(this.numLocalChannels), Integer.valueOf(this.numRemoteChannels), Integer.valueOf(this.numUnknownChannels));
        }
    }

    public SingleInputGateFactory(@Nonnull ResourceID resourceID, @Nonnull NettyShuffleEnvironmentConfiguration nettyShuffleEnvironmentConfiguration, @Nonnull ConnectionManager connectionManager, @Nonnull ResultPartitionManager resultPartitionManager, @Nonnull TaskEventPublisher taskEventPublisher, @Nonnull NetworkBufferPool networkBufferPool, @Nullable TieredStorageConfiguration tieredStorageConfiguration, @Nullable TieredStorageNettyServiceImpl tieredStorageNettyServiceImpl) {
        this.taskExecutorResourceId = resourceID;
        this.partitionRequestInitialBackoff = nettyShuffleEnvironmentConfiguration.partitionRequestInitialBackoff();
        this.partitionRequestMaxBackoff = nettyShuffleEnvironmentConfiguration.partitionRequestMaxBackoff();
        this.maxRequiredBuffersPerGate = nettyShuffleEnvironmentConfiguration.maxRequiredBuffersPerGate();
        this.configuredNetworkBuffersPerChannel = NettyShuffleUtils.getNetworkBuffersPerInputChannel(nettyShuffleEnvironmentConfiguration.networkBuffersPerChannel());
        this.floatingNetworkBuffersPerGate = nettyShuffleEnvironmentConfiguration.floatingNetworkBuffersPerGate();
        this.batchShuffleCompressionEnabled = nettyShuffleEnvironmentConfiguration.isBatchShuffleCompressionEnabled();
        this.compressionCodec = nettyShuffleEnvironmentConfiguration.getCompressionCodec();
        this.networkBufferSize = nettyShuffleEnvironmentConfiguration.networkBufferSize();
        this.connectionManager = connectionManager;
        this.partitionManager = resultPartitionManager;
        this.taskEventPublisher = taskEventPublisher;
        this.networkBufferPool = networkBufferPool;
        this.debloatConfiguration = nettyShuffleEnvironmentConfiguration.getDebloatConfiguration();
        this.tieredStorageConfiguration = tieredStorageConfiguration;
        this.tieredStorageNettyService = tieredStorageNettyServiceImpl;
    }

    public SingleInputGate create(@Nonnull ShuffleIOOwnerContext shuffleIOOwnerContext, int i, @Nonnull InputGateDeploymentDescriptor inputGateDeploymentDescriptor, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics inputChannelMetrics) {
        GateBuffersSpec createGateBuffersSpec = InputGateSpecUtils.createGateBuffersSpec(this.maxRequiredBuffersPerGate, this.configuredNetworkBuffersPerChannel, this.floatingNetworkBuffersPerGate, inputGateDeploymentDescriptor.getConsumedPartitionType(), calculateNumChannels(inputGateDeploymentDescriptor.getShuffleDescriptors().length, inputGateDeploymentDescriptor.getConsumedSubpartitionIndexRange()), this.tieredStorageConfiguration != null);
        SupplierWithException<BufferPool, IOException> createBufferPoolFactory = createBufferPoolFactory(this.networkBufferPool, createGateBuffersSpec.getRequiredFloatingBuffers(), createGateBuffersSpec.getTotalFloatingBuffers());
        BufferDecompressor bufferDecompressor = null;
        if (inputGateDeploymentDescriptor.getConsumedPartitionType().supportCompression() && this.batchShuffleCompressionEnabled) {
            bufferDecompressor = new BufferDecompressor(this.networkBufferSize, this.compressionCodec);
        }
        String ownerName = shuffleIOOwnerContext.getOwnerName();
        MetricGroup inputGroup = shuffleIOOwnerContext.getInputGroup();
        IndexRange consumedSubpartitionIndexRange = inputGateDeploymentDescriptor.getConsumedSubpartitionIndexRange();
        TieredStorageConsumerClient tieredStorageConsumerClient = null;
        ArrayList arrayList = null;
        if (this.tieredStorageConfiguration != null) {
            ShuffleDescriptor[] shuffleDescriptors = inputGateDeploymentDescriptor.getShuffleDescriptors();
            arrayList = new ArrayList();
            for (ShuffleDescriptor shuffleDescriptor : shuffleDescriptors) {
                TieredStoragePartitionId convertId = TieredStorageIdMappingUtils.convertId(shuffleDescriptor.getResultPartitionID());
                for (int startIndex = consumedSubpartitionIndexRange.getStartIndex(); startIndex <= consumedSubpartitionIndexRange.getEndIndex(); startIndex++) {
                    arrayList.add(new TieredStorageConsumerSpec(convertId, new TieredStorageSubpartitionId(startIndex)));
                }
            }
            tieredStorageConsumerClient = new TieredStorageConsumerClient(this.tieredStorageConfiguration.getTierFactories(), arrayList, this.tieredStorageNettyService);
        }
        SingleInputGate singleInputGate = new SingleInputGate(ownerName, i, inputGateDeploymentDescriptor.getConsumedResultId(), inputGateDeploymentDescriptor.getConsumedPartitionType(), consumedSubpartitionIndexRange, calculateNumChannels(inputGateDeploymentDescriptor.getShuffleDescriptors().length, consumedSubpartitionIndexRange), partitionProducerStateProvider, createBufferPoolFactory, bufferDecompressor, this.networkBufferPool, this.networkBufferSize, new ThroughputCalculator(SystemClock.getInstance()), maybeCreateBufferDebloater(ownerName, i, inputGroup.addGroup(i)), tieredStorageConsumerClient, this.tieredStorageNettyService, arrayList);
        createInputChannels(ownerName, inputGateDeploymentDescriptor, singleInputGate, consumedSubpartitionIndexRange, createGateBuffersSpec, inputChannelMetrics);
        return singleInputGate;
    }

    private BufferDebloater maybeCreateBufferDebloater(String str, int i, MetricGroup metricGroup) {
        if (!this.debloatConfiguration.isEnabled()) {
            return null;
        }
        BufferDebloater bufferDebloater = new BufferDebloater(str, i, this.debloatConfiguration.getTargetTotalBufferSize().toMillis(), this.debloatConfiguration.getMaxBufferSize(), this.debloatConfiguration.getMinBufferSize(), this.debloatConfiguration.getBufferDebloatThresholdPercentages(), this.debloatConfiguration.getNumberOfSamples());
        metricGroup.gauge(MetricNames.ESTIMATED_TIME_TO_CONSUME_BUFFERS, (String) () -> {
            return Long.valueOf(bufferDebloater.getLastEstimatedTimeToConsumeBuffers().toMillis());
        });
        bufferDebloater.getClass();
        metricGroup.gauge(MetricNames.DEBLOATED_BUFFER_SIZE, (String) bufferDebloater::getLastBufferSize);
        return bufferDebloater;
    }

    private void createInputChannels(String str, InputGateDeploymentDescriptor inputGateDeploymentDescriptor, SingleInputGate singleInputGate, IndexRange indexRange, GateBuffersSpec gateBuffersSpec, InputChannelMetrics inputChannelMetrics) {
        ShuffleDescriptor[] shuffleDescriptors = inputGateDeploymentDescriptor.getShuffleDescriptors();
        InputChannel[] inputChannelArr = new InputChannel[calculateNumChannels(shuffleDescriptors.length, indexRange)];
        ChannelStatistics channelStatistics = new ChannelStatistics();
        int i = 0;
        for (ShuffleDescriptor shuffleDescriptor : shuffleDescriptors) {
            for (int startIndex = indexRange.getStartIndex(); startIndex <= indexRange.getEndIndex(); startIndex++) {
                inputChannelArr[i] = createInputChannel(singleInputGate, i, gateBuffersSpec.getEffectiveExclusiveBuffersPerChannel(), shuffleDescriptor, startIndex, channelStatistics, inputChannelMetrics);
                i++;
            }
        }
        singleInputGate.setInputChannels(inputChannelArr);
        LOG.debug("{}: Created {} input channels ({}).", str, Integer.valueOf(inputChannelArr.length), channelStatistics);
    }

    private InputChannel createInputChannel(SingleInputGate singleInputGate, int i, int i2, ShuffleDescriptor shuffleDescriptor, int i3, ChannelStatistics channelStatistics, InputChannelMetrics inputChannelMetrics) {
        return (InputChannel) ShuffleUtils.applyWithShuffleTypeCheck(NettyShuffleDescriptor.class, shuffleDescriptor, unknownShuffleDescriptor -> {
            channelStatistics.numUnknownChannels++;
            return new UnknownInputChannel(singleInputGate, i, unknownShuffleDescriptor.getResultPartitionID(), i3, this.partitionManager, this.taskEventPublisher, this.connectionManager, this.partitionRequestInitialBackoff, this.partitionRequestMaxBackoff, i2, inputChannelMetrics);
        }, nettyShuffleDescriptor -> {
            return createKnownInputChannel(singleInputGate, i, i2, nettyShuffleDescriptor, i3, channelStatistics, inputChannelMetrics);
        });
    }

    private static int calculateNumChannels(int i, IndexRange indexRange) {
        return MathUtils.checkedDownCast(i * indexRange.size());
    }

    @VisibleForTesting
    protected InputChannel createKnownInputChannel(SingleInputGate singleInputGate, int i, int i2, NettyShuffleDescriptor nettyShuffleDescriptor, int i3, ChannelStatistics channelStatistics, InputChannelMetrics inputChannelMetrics) {
        ResultPartitionID resultPartitionID = nettyShuffleDescriptor.getResultPartitionID();
        if (nettyShuffleDescriptor.isLocalTo(this.taskExecutorResourceId)) {
            channelStatistics.numLocalChannels++;
            return new LocalRecoveredInputChannel(singleInputGate, i, resultPartitionID, i3, this.partitionManager, this.taskEventPublisher, this.partitionRequestInitialBackoff, this.partitionRequestMaxBackoff, i2, inputChannelMetrics);
        }
        channelStatistics.numRemoteChannels++;
        return new RemoteRecoveredInputChannel(singleInputGate, i, resultPartitionID, i3, nettyShuffleDescriptor.getConnectionId(), this.connectionManager, this.partitionRequestInitialBackoff, this.partitionRequestMaxBackoff, i2, inputChannelMetrics);
    }

    @VisibleForTesting
    static SupplierWithException<BufferPool, IOException> createBufferPoolFactory(BufferPoolFactory bufferPoolFactory, int i, int i2) {
        Pair of = Pair.of(Integer.valueOf(i), Integer.valueOf(i2));
        return () -> {
            return bufferPoolFactory.createBufferPool(((Integer) of.getLeft()).intValue(), ((Integer) of.getRight()).intValue());
        };
    }
}
