package org.apache.flink.runtime.scheduler.adapter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.EdgeManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.failover.flip1.SchedulingPipelinedRegionComputeUtil;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalEdge;
import org.apache.flink.runtime.jobgraph.topology.LogicalPipelinedRegion;
import org.apache.flink.runtime.jobgraph.topology.LogicalVertex;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.scheduler.SchedulingTopologyListener;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.class */
/**
 * {@link SchedulingTopology} implementation that is populated from a {@link DefaultExecutionGraph}.
 *
 * <p>The topology keeps scheduling-level views ({@link DefaultExecutionVertex}, {@link
 * DefaultResultPartition}, {@link DefaultSchedulingPipelinedRegion}) of the execution vertices it
 * has been notified about, and forwards every update to the registered {@link
 * SchedulingTopologyListener}s.
 *
 * <p>Instances are created through {@link #fromExecutionGraph(DefaultExecutionGraph)} and extended
 * afterwards through {@link #notifyExecutionGraphUpdated(DefaultExecutionGraph, List)}.
 */
public class DefaultExecutionTopology implements SchedulingTopology {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionTopology.class);

    /** Number of nanoseconds in one millisecond; used to report the region build time in ms. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final EdgeManager edgeManager;

    /** Supplies the ids of all execution vertices in the order used for {@link #getVertices()}. */
    private final Supplier<List<ExecutionVertexID>> sortedExecutionVertexIds;

    private final Map<JobVertexID, DefaultLogicalPipelinedRegion> logicalPipelinedRegionsByJobVertexId;
    private final List<SchedulingTopologyListener> schedulingTopologyListeners = new ArrayList<>();
    private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById = new HashMap<>();
    private final List<DefaultExecutionVertex> executionVerticesList = new ArrayList<>();
    private final Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById = new HashMap<>();
    private final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> pipelinedRegionsByVertex = new HashMap<>();
    private final List<DefaultSchedulingPipelinedRegion> pipelinedRegions = new ArrayList<>();

    private DefaultExecutionTopology(
            Supplier<List<ExecutionVertexID>> supplier,
            EdgeManager edgeManager,
            Map<JobVertexID, DefaultLogicalPipelinedRegion> map) {
        this.sortedExecutionVertexIds = Preconditions.checkNotNull(supplier);
        this.edgeManager = Preconditions.checkNotNull(edgeManager);
        this.logicalPipelinedRegionsByJobVertexId = Preconditions.checkNotNull(map);
    }

    /** Returns a read-only view of all scheduling vertices currently known to this topology. */
    @Override
    public Iterable<DefaultExecutionVertex> getVertices() {
        return Collections.unmodifiableList(executionVerticesList);
    }

    /**
     * Looks up the scheduling vertex with the given id.
     *
     * @throws IllegalArgumentException if no vertex with that id is known
     */
    @Override
    public DefaultExecutionVertex getVertex(ExecutionVertexID executionVertexID) {
        final DefaultExecutionVertex vertex = executionVerticesById.get(executionVertexID);
        if (vertex == null) {
            throw new IllegalArgumentException("can not find vertex: " + executionVertexID);
        }
        return vertex;
    }

    /**
     * Looks up the scheduling result partition with the given id.
     *
     * @throws IllegalArgumentException if no partition with that id is known
     */
    @Override
    public DefaultResultPartition getResultPartition(IntermediateResultPartitionID intermediateResultPartitionID) {
        final DefaultResultPartition partition = resultPartitionsById.get(intermediateResultPartitionID);
        if (partition == null) {
            throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionID);
        }
        return partition;
    }

    /** Registers a listener that is called after every {@link #notifyExecutionGraphUpdated} run. */
    @Override
    public void registerSchedulingTopologyListener(SchedulingTopologyListener schedulingTopologyListener) {
        Preconditions.checkNotNull(schedulingTopologyListener);
        schedulingTopologyListeners.add(schedulingTopologyListener);
    }

    /** Returns a read-only view of all pipelined regions built so far. */
    @Override
    public Iterable<? extends SchedulingPipelinedRegion> getAllPipelinedRegions() {
        return Collections.unmodifiableCollection(pipelinedRegions);
    }

    /**
     * Returns the pipelined region that contains the given vertex.
     *
     * @throws IllegalArgumentException if the vertex does not belong to any known region
     */
    @Override
    public DefaultSchedulingPipelinedRegion getPipelinedRegionOfVertex(ExecutionVertexID executionVertexID) {
        final DefaultSchedulingPipelinedRegion region = pipelinedRegionsByVertex.get(executionVertexID);
        if (region == null) {
            throw new IllegalArgumentException("Unknown execution vertex " + executionVertexID);
        }
        return region;
    }

    public EdgeManager getEdgeManager() {
        return edgeManager;
    }

    /**
     * Computes the logical pipelined regions of the job and indexes them by the id of every job
     * vertex they contain.
     */
    private static Map<JobVertexID, DefaultLogicalPipelinedRegion> computeLogicalPipelinedRegionsByJobVertexId(
            ExecutionGraph executionGraph) {
        final Iterable<? extends LogicalPipelinedRegion> logicalRegions =
                DefaultLogicalTopology.fromTopologicallySortedJobVertices(
                                IterableUtils.toStream(executionGraph.getVerticesTopologically())
                                        .map(ExecutionJobVertex::getJobVertex)
                                        .collect(Collectors.toList()))
                        .getAllPipelinedRegions();

        final Map<JobVertexID, DefaultLogicalPipelinedRegion> regionsByJobVertexId = new HashMap<>();
        for (LogicalPipelinedRegion logicalRegion : logicalRegions) {
            final DefaultLogicalPipelinedRegion region = (DefaultLogicalPipelinedRegion) logicalRegion;
            for (LogicalVertex vertex : region.getVertices()) {
                regionsByJobVertexId.put(vertex.getId(), region);
            }
        }
        return regionsByJobVertexId;
    }

    /**
     * Extends this topology with the execution vertices of the given job vertices and notifies all
     * registered listeners.
     *
     * @param defaultExecutionGraph the execution graph the job vertices belong to; must not be null
     * @param list the job vertices whose task vertices are to be added
     * @throws IllegalStateException if one of the given job vertices consumes a
     *     must-be-pipelined-consumed input whose producer is not part of {@code list}, or if
     *     co-located vertices end up in different pipelined regions
     */
    public void notifyExecutionGraphUpdated(DefaultExecutionGraph defaultExecutionGraph, List<ExecutionJobVertex> list) {
        Preconditions.checkNotNull(defaultExecutionGraph, "execution graph can not be null");

        final Set<JobVertexID> newJobVertexIds =
                list.stream().map(ExecutionJobVertex::getJobVertexId).collect(Collectors.toSet());

        // Every input that must be consumed in a pipelined fashion has to be produced by one of
        // the job vertices handed in with this very call.
        list.stream()
                .map(ExecutionJobVertex::getJobVertex)
                .flatMap(jobVertex -> jobVertex.getInputs().stream())
                .map(jobEdge -> jobEdge.getSource())
                .filter(dataSet -> dataSet.getResultType().mustBePipelinedConsumed())
                .map(dataSet -> dataSet.getProducer())
                .map(producer -> producer.getID())
                .forEach(producerId -> Preconditions.checkState(newJobVertexIds.contains(producerId)));

        final Iterable<ExecutionVertex> newExecutionVertices =
                list.stream()
                        .flatMap(jobVertex -> Stream.of(jobVertex.getTaskVertices()))
                        .collect(Collectors.toList());

        generateNewExecutionVerticesAndResultPartitions(newExecutionVertices);
        generateNewPipelinedRegions(newExecutionVertices);
        ensureCoLocatedVerticesInSameRegion(pipelinedRegions, defaultExecutionGraph);
        notifySchedulingTopologyUpdated(newExecutionVertices);
    }

    /** Tells every registered listener which execution vertices have just been added. */
    private void notifySchedulingTopologyUpdated(Iterable<ExecutionVertex> newExecutionVertices) {
        final List<ExecutionVertexID> newVertexIds =
                IterableUtils.toStream(newExecutionVertices)
                        .map(ExecutionVertex::getID)
                        .collect(Collectors.toList());
        for (SchedulingTopologyListener listener : schedulingTopologyListeners) {
            listener.notifySchedulingTopologyUpdated(this, newVertexIds);
        }
    }

    /**
     * Creates a topology for the given execution graph and populates it with all job vertices of
     * the graph that are already initialized.
     *
     * @param defaultExecutionGraph the graph to adapt; must not be null
     * @return the populated topology
     */
    public static DefaultExecutionTopology fromExecutionGraph(DefaultExecutionGraph defaultExecutionGraph) {
        Preconditions.checkNotNull(defaultExecutionGraph, "execution graph can not be null");

        final DefaultExecutionTopology topology =
                new DefaultExecutionTopology(
                        // Evaluated lazily on every update so that it always reflects the graph's
                        // current set of execution vertices.
                        () ->
                                IterableUtils.toStream(defaultExecutionGraph.getAllExecutionVertices())
                                        .map(ExecutionVertex::getID)
                                        .collect(Collectors.toList()),
                        defaultExecutionGraph.getEdgeManager(),
                        computeLogicalPipelinedRegionsByJobVertexId(defaultExecutionGraph));

        topology.notifyExecutionGraphUpdated(
                defaultExecutionGraph,
                IterableUtils.toStream(defaultExecutionGraph.getVerticesTopologically())
                        .filter(ExecutionJobVertex::isInitialized)
                        .collect(Collectors.toList()));
        return topology;
    }

    /**
     * Creates the scheduling vertex and the produced scheduling partitions for every given
     * execution vertex, then rebuilds the ordered vertex list from the id supplier.
     */
    private void generateNewExecutionVerticesAndResultPartitions(Iterable<ExecutionVertex> newExecutionVertices) {
        for (ExecutionVertex executionVertex : newExecutionVertices) {
            final List<DefaultResultPartition> producedPartitions =
                    generateProducedSchedulingResultPartition(
                            executionVertex.getProducedPartitions(),
                            edgeManager::getConsumerVertexGroupsForPartition);
            for (DefaultResultPartition producedPartition : producedPartitions) {
                resultPartitionsById.put(producedPartition.getId(), producedPartition);
            }

            final DefaultExecutionVertex schedulingVertex =
                    generateSchedulingExecutionVertex(
                            executionVertex,
                            producedPartitions,
                            edgeManager.getConsumedPartitionGroupsForVertex(executionVertex.getID()),
                            resultPartitionsById::get);
            executionVerticesById.put(schedulingVertex.getId(), schedulingVertex);
        }

        // Rebuild the list from scratch so that it follows the order given by the supplier.
        executionVerticesList.clear();
        for (ExecutionVertexID vertexId : sortedExecutionVertexIds.get()) {
            executionVerticesList.add(executionVerticesById.get(vertexId));
        }
    }

    /**
     * Wraps every produced {@link IntermediateResultPartition} into a {@link
     * DefaultResultPartition} whose state and consumer information are resolved lazily.
     */
    private static List<DefaultResultPartition> generateProducedSchedulingResultPartition(
            Map<IntermediateResultPartitionID, IntermediateResultPartition> producedPartitions,
            Function<IntermediateResultPartitionID, List<ConsumerVertexGroup>> consumerVertexGroupsRetriever) {
        final List<DefaultResultPartition> schedulingPartitions = new ArrayList<>(producedPartitions.size());
        for (IntermediateResultPartition partition : producedPartitions.values()) {
            schedulingPartitions.add(
                    new DefaultResultPartition(
                            partition.getPartitionId(),
                            partition.getIntermediateResult().getId(),
                            partition.getResultType(),
                            () ->
                                    partition.hasDataAllProduced()
                                            ? ResultPartitionState.ALL_DATA_PRODUCED
                                            : ResultPartitionState.CREATED,
                            () -> consumerVertexGroupsRetriever.apply(partition.getPartitionId()),
                            partition::getConsumedPartitionGroups));
        }
        return schedulingPartitions;
    }

    /**
     * Creates the scheduling vertex for the given execution vertex and registers it as the producer
     * of all its produced partitions.
     */
    private static DefaultExecutionVertex generateSchedulingExecutionVertex(
            ExecutionVertex executionVertex,
            List<DefaultResultPartition> producedPartitions,
            List<ConsumedPartitionGroup> consumedPartitionGroups,
            Function<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionRetriever) {
        final DefaultExecutionVertex schedulingVertex =
                new DefaultExecutionVertex(
                        executionVertex.getID(),
                        producedPartitions,
                        executionVertex::getExecutionState,
                        consumedPartitionGroups,
                        resultPartitionRetriever);
        for (DefaultResultPartition producedPartition : producedPartitions) {
            producedPartition.setProducer(schedulingVertex);
        }
        return schedulingVertex;
    }

    /**
     * Builds the scheduling pipelined regions for the given (already registered) execution vertices
     * and records them in {@link #pipelinedRegions} and {@link #pipelinedRegionsByVertex}.
     */
    private void generateNewPipelinedRegions(Iterable<ExecutionVertex> newExecutionVertices) {
        final Iterable<DefaultExecutionVertex> newSchedulingVertices =
                IterableUtils.toStream(newExecutionVertices)
                        .map(ExecutionVertex::getID)
                        .map(executionVerticesById::get)
                        .collect(Collectors.toList());

        // Group the new vertices by the logical region of their job vertex. Logical regions are
        // compared by identity.
        final Map<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> verticesByLogicalRegion =
                new IdentityHashMap<>();
        for (DefaultExecutionVertex schedulingVertex : newSchedulingVertices) {
            verticesByLogicalRegion
                    .computeIfAbsent(
                            logicalPipelinedRegionsByJobVertexId.get(schedulingVertex.getId().getJobVertexId()),
                            ignored -> new ArrayList<>())
                    .add(schedulingVertex);
        }

        final long buildRegionsStartNanos = System.nanoTime();

        // Identity semantics: two regions holding equal vertices are still distinct regions.
        final Set<Set<SchedulingExecutionVertex>> rawPipelinedRegions =
                Collections.newSetFromMap(new IdentityHashMap<>());
        for (Map.Entry<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> entry :
                verticesByLogicalRegion.entrySet()) {
            final List<DefaultExecutionVertex> regionVertices = entry.getValue();
            if (containsIntraRegionAllToAllEdge(entry.getKey())) {
                // An all-to-all edge inside the logical region: all of its vertices are put into
                // one single scheduling region.
                rawPipelinedRegions.add(new HashSet<>(regionVertices));
            } else {
                rawPipelinedRegions.addAll(
                        SchedulingPipelinedRegionComputeUtil.computePipelinedRegions(
                                regionVertices, executionVerticesById::get, resultPartitionsById::get));
            }
        }

        for (Set<? extends SchedulingExecutionVertex> rawRegion : rawPipelinedRegions) {
            // The raw regions are expected to hold the DefaultExecutionVertex instances that were
            // handed in above; the conversion itself cannot be checked at runtime.
            @SuppressWarnings("unchecked")
            final Set<DefaultExecutionVertex> typedRegion = (Set<DefaultExecutionVertex>) rawRegion;

            final DefaultSchedulingPipelinedRegion pipelinedRegion =
                    new DefaultSchedulingPipelinedRegion(typedRegion, resultPartitionsById::get);
            pipelinedRegions.add(pipelinedRegion);
            for (SchedulingExecutionVertex regionVertex : rawRegion) {
                pipelinedRegionsByVertex.put(regionVertex.getId(), pipelinedRegion);
            }
        }

        final long buildRegionsDurationMillis = (System.nanoTime() - buildRegionsStartNanos) / NANOS_PER_MILLI;
        LOG.info(
                "Built {} new pipelined regions in {} ms, total {} pipelined regions currently.",
                rawPipelinedRegions.size(),
                buildRegionsDurationMillis,
                pipelinedRegions.size());
    }

    /**
     * Tells whether the given logical region contains an {@link DistributionPattern#ALL_TO_ALL}
     * edge whose producer vertex also belongs to that region.
     */
    private static boolean containsIntraRegionAllToAllEdge(DefaultLogicalPipelinedRegion logicalPipelinedRegion) {
        for (LogicalVertex vertex : logicalPipelinedRegion.getVertices()) {
            for (LogicalEdge inputEdge : vertex.getInputs()) {
                if (inputEdge.getDistributionPattern() == DistributionPattern.ALL_TO_ALL
                        && logicalPipelinedRegion.contains(inputEdge.getProducerVertexId())) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Verifies that all vertices sharing a co-location constraint belong to the same pipelined
     * region.
     *
     * @throws IllegalStateException if a constraint spans more than one region
     */
    private static void ensureCoLocatedVerticesInSameRegion(
            List<DefaultSchedulingPipelinedRegion> pipelinedRegions, ExecutionGraph executionGraph) {
        final Map<CoLocationConstraint, DefaultSchedulingPipelinedRegion> regionByConstraint = new HashMap<>();
        for (DefaultSchedulingPipelinedRegion region : pipelinedRegions) {
            for (SchedulingExecutionVertex vertex : region.getVertices()) {
                final CoLocationConstraint constraint = getCoLocationConstraint(vertex.getId(), executionGraph);
                if (constraint == null) {
                    continue;
                }
                final DefaultSchedulingPipelinedRegion regionSeenBefore = regionByConstraint.get(constraint);
                Preconditions.checkState(
                        regionSeenBefore == null || regionSeenBefore == region,
                        "co-located tasks must be in the same pipelined region");
                regionByConstraint.putIfAbsent(constraint, region);
            }
        }
    }

    /**
     * Returns the co-location constraint of the given execution vertex, or {@code null} if its job
     * vertex has no co-location group.
     */
    private static CoLocationConstraint getCoLocationConstraint(
            ExecutionVertexID executionVertexID, ExecutionGraph executionGraph) {
        final CoLocationGroup coLocationGroup =
                Objects.requireNonNull(executionGraph.getJobVertex(executionVertexID.getJobVertexId()))
                        .getCoLocationGroup();
        if (coLocationGroup == null) {
            return null;
        }
        return coLocationGroup.getLocationConstraint(executionVertexID.getSubtaskIndex());
    }
}
