package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.class */
public class TieredStorageNettyServiceImpl implements TieredStorageNettyService {
    private final Map<TieredStoragePartitionId, List<NettyServiceProducer>> registeredServiceProducers = new ConcurrentHashMap();
    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, List<NettyConnectionReaderRegistration>>> nettyConnectionReaderRegistrations = new HashMap();
    private final TieredStorageResourceRegistry resourceRegistry;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl$NettyConnectionReaderRegistration.class */
    public static class NettyConnectionReaderRegistration {
        private int channelIndex;

        @Nullable
        private Supplier<InputChannel> channelSupplier;

        @Nullable
        private CompletableFuture<NettyConnectionReader> readerFuture;

        private NettyConnectionReaderRegistration() {
            this.channelIndex = -1;
        }

        public boolean trySetChannel(int i, Supplier<InputChannel> supplier) {
            if (isChannelSet()) {
                return false;
            }
            Preconditions.checkArgument(i >= 0);
            this.channelIndex = i;
            this.channelSupplier = (Supplier) Preconditions.checkNotNull(supplier);
            tryCreateNettyConnectionReader();
            return true;
        }

        public Optional<CompletableFuture<NettyConnectionReader>> trySetConsumer() {
            if (isReaderSet()) {
                tryCreateNettyConnectionReader();
                return Optional.empty();
            }
            this.readerFuture = new CompletableFuture<>();
            return Optional.of(this.readerFuture);
        }

        void tryCreateNettyConnectionReader() {
            if (isChannelSet() && isReaderSet()) {
                this.readerFuture.complete(new NettyConnectionReaderImpl(this.channelSupplier));
            }
        }

        private boolean isChannelSet() {
            return this.channelIndex >= 0;
        }

        private boolean isReaderSet() {
            return this.readerFuture != null;
        }
    }

    public TieredStorageNettyServiceImpl(TieredStorageResourceRegistry tieredStorageResourceRegistry) {
        this.resourceRegistry = tieredStorageResourceRegistry;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService
    public void registerProducer(TieredStoragePartitionId tieredStoragePartitionId, NettyServiceProducer nettyServiceProducer) {
        this.registeredServiceProducers.computeIfAbsent(tieredStoragePartitionId, tieredStoragePartitionId2 -> {
            this.resourceRegistry.registerResource(tieredStoragePartitionId, () -> {
                this.registeredServiceProducers.remove(tieredStoragePartitionId);
            });
            return new ArrayList();
        }).add(nettyServiceProducer);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService
    public CompletableFuture<NettyConnectionReader> registerConsumer(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId) {
        List<NettyConnectionReaderRegistration> readerRegistration = getReaderRegistration(tieredStoragePartitionId, tieredStorageSubpartitionId);
        Iterator<NettyConnectionReaderRegistration> it = readerRegistration.iterator();
        while (it.hasNext()) {
            Optional<CompletableFuture<NettyConnectionReader>> trySetConsumer = it.next().trySetConsumer();
            if (trySetConsumer.isPresent()) {
                return trySetConsumer.get();
            }
        }
        NettyConnectionReaderRegistration nettyConnectionReaderRegistration = new NettyConnectionReaderRegistration();
        readerRegistration.add(nettyConnectionReaderRegistration);
        return nettyConnectionReaderRegistration.trySetConsumer().get();
    }

    public ResultSubpartitionView createResultSubpartitionView(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, BufferAvailabilityListener bufferAvailabilityListener) {
        List<NettyServiceProducer> list = this.registeredServiceProducers.get(tieredStoragePartitionId);
        if (list == null) {
            return new TieredStorageResultSubpartitionView(bufferAvailabilityListener, new ArrayList(), new ArrayList(), new ArrayList());
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (NettyServiceProducer nettyServiceProducer : list) {
            NettyPayloadManager nettyPayloadManager = new NettyPayloadManager();
            NettyConnectionWriterImpl nettyConnectionWriterImpl = new NettyConnectionWriterImpl(nettyPayloadManager, bufferAvailabilityListener);
            nettyServiceProducer.connectionEstablished(tieredStorageSubpartitionId, nettyConnectionWriterImpl);
            arrayList2.add(nettyConnectionWriterImpl.getNettyConnectionId());
            arrayList.add(nettyPayloadManager);
        }
        return new TieredStorageResultSubpartitionView(bufferAvailabilityListener, arrayList, arrayList2, this.registeredServiceProducers.get(tieredStoragePartitionId));
    }

    public void setupInputChannels(List<TieredStorageConsumerSpec> list, List<Supplier<InputChannel>> list2) {
        Preconditions.checkState(list.size() == list2.size());
        for (int i = 0; i < list.size(); i++) {
            setupInputChannel(i, list.get(i).getPartitionId(), list.get(i).getSubpartitionId(), list2.get(i));
        }
    }

    private void setupInputChannel(int i, TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, Supplier<InputChannel> supplier) {
        List<NettyConnectionReaderRegistration> readerRegistration = getReaderRegistration(tieredStoragePartitionId, tieredStorageSubpartitionId);
        boolean z = false;
        Iterator<NettyConnectionReaderRegistration> it = readerRegistration.iterator();
        while (it.hasNext()) {
            if (it.next().trySetChannel(i, supplier)) {
                z = true;
            }
        }
        if (z) {
            removeRegistration(tieredStoragePartitionId, tieredStorageSubpartitionId);
            return;
        }
        NettyConnectionReaderRegistration nettyConnectionReaderRegistration = new NettyConnectionReaderRegistration();
        nettyConnectionReaderRegistration.trySetChannel(i, supplier);
        readerRegistration.add(nettyConnectionReaderRegistration);
    }

    private void removeRegistration(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId) {
        Map<TieredStorageSubpartitionId, List<NettyConnectionReaderRegistration>> map = this.nettyConnectionReaderRegistrations.get(tieredStoragePartitionId);
        map.remove(tieredStorageSubpartitionId);
        if (map.isEmpty()) {
            this.nettyConnectionReaderRegistrations.remove(tieredStoragePartitionId);
        }
    }

    private List<NettyConnectionReaderRegistration> getReaderRegistration(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId) {
        return this.nettyConnectionReaderRegistrations.computeIfAbsent(tieredStoragePartitionId, tieredStoragePartitionId2 -> {
            return new HashMap();
        }).computeIfAbsent(tieredStorageSubpartitionId, tieredStorageSubpartitionId2 -> {
            return new ArrayList();
        });
    }
}
