package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.SchedulingTopologyListener;
import org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.class */
public class VertexwiseSchedulingStrategy implements SchedulingStrategy, SchedulingTopologyListener {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) VertexwiseSchedulingStrategy.class);
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology schedulingTopology;
    private final Set<ExecutionVertexID> newVertices = new HashSet();
    private final Set<ExecutionVertexID> scheduledVertices = new HashSet();
    private final InputConsumableDecider inputConsumableDecider;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy$Factory.class */
    public static class Factory implements SchedulingStrategyFactory {
        private final InputConsumableDecider.Factory inputConsumableDeciderFactory;

        public Factory(InputConsumableDecider.Factory factory) {
            this.inputConsumableDeciderFactory = factory;
        }

        @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new VertexwiseSchedulingStrategy(schedulerOperations, schedulingTopology, this.inputConsumableDeciderFactory);
        }
    }

    public VertexwiseSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology, InputConsumableDecider.Factory factory) {
        this.schedulerOperations = (SchedulerOperations) Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = (SchedulingTopology) Preconditions.checkNotNull(schedulingTopology);
        Set<ExecutionVertexID> set = this.scheduledVertices;
        set.getClass();
        this.inputConsumableDecider = factory.createInstance(schedulingTopology, (v1) -> {
            return r3.contains(v1);
        });
        LOG.info("Using InputConsumableDecider {} for VertexwiseSchedulingStrategy.", this.inputConsumableDecider.getClass().getName());
        schedulingTopology.registerSchedulingTopologyListener(this);
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void startScheduling() {
        maybeScheduleVertices((Set) IterableUtils.toStream(this.schedulingTopology.getVertices()).filter(schedulingExecutionVertex -> {
            return schedulingExecutionVertex.getConsumedPartitionGroups().isEmpty();
        }).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet()));
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void restartTasks(Set<ExecutionVertexID> set) {
        this.scheduledVertices.removeAll(set);
        maybeScheduleVertices(set);
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onExecutionStateChange(ExecutionVertexID executionVertexID, ExecutionState executionState) {
        if (executionState == ExecutionState.FINISHED) {
            maybeScheduleVertices((Set) IterableUtils.toStream(this.schedulingTopology.getVertex(executionVertexID).getProducedResults()).map((v0) -> {
                return v0.getConsumerVertexGroups();
            }).flatMap((v0) -> {
                return v0.stream();
            }).filter(consumerVertexGroup -> {
                return this.inputConsumableDecider.isConsumableBasedOnFinishedProducers(consumerVertexGroup.getConsumedPartitionGroup());
            }).flatMap((v0) -> {
                return IterableUtils.toStream(v0);
            }).collect(Collectors.toSet()));
        }
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onPartitionConsumable(IntermediateResultPartitionID intermediateResultPartitionID) {
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulingTopologyListener
    public void notifySchedulingTopologyUpdated(SchedulingTopology schedulingTopology, List<ExecutionVertexID> list) {
        Preconditions.checkState(schedulingTopology == this.schedulingTopology);
        this.newVertices.addAll(list);
    }

    private void maybeScheduleVertices(Set<ExecutionVertexID> set) {
        Set<ExecutionVertexID> hashSet;
        if (this.newVertices.isEmpty()) {
            hashSet = set;
        } else {
            hashSet = new HashSet(set);
            hashSet.addAll(this.newVertices);
            this.newVertices.clear();
        }
        HashSet hashSet2 = new HashSet();
        Set<ExecutionVertexID> set2 = hashSet;
        while (true) {
            Set<ExecutionVertexID> set3 = set2;
            if (set3.isEmpty()) {
                scheduleVerticesOneByOne(hashSet2);
                this.scheduledVertices.addAll(hashSet2);
                return;
            }
            set2 = addToScheduleAndGetVertices(set3, hashSet2);
        }
    }

    private Set<ExecutionVertexID> addToScheduleAndGetVertices(Set<ExecutionVertexID> set, Set<ExecutionVertexID> set2) {
        HashSet hashSet = new HashSet();
        IdentityHashMap identityHashMap = new IdentityHashMap();
        Set newSetFromMap = Collections.newSetFromMap(new IdentityHashMap());
        for (ExecutionVertexID executionVertexID : set) {
            if (isVertexSchedulable(executionVertexID, identityHashMap, set2)) {
                set2.add(executionVertexID);
                for (ConsumerVertexGroup consumerVertexGroup : (Set) IterableUtils.toStream(this.schedulingTopology.getVertex(executionVertexID).getProducedResults()).map((v0) -> {
                    return v0.getConsumerVertexGroups();
                }).flatMap((v0) -> {
                    return v0.stream();
                }).filter(consumerVertexGroup2 -> {
                    return consumerVertexGroup2.getResultPartitionType().canBePipelinedConsumed();
                }).collect(Collectors.toSet())) {
                    if (!newSetFromMap.contains(consumerVertexGroup)) {
                        newSetFromMap.add(consumerVertexGroup);
                        hashSet.addAll((Collection) IterableUtils.toStream(consumerVertexGroup).collect(Collectors.toSet()));
                    }
                }
            }
        }
        return hashSet;
    }

    private boolean isVertexSchedulable(ExecutionVertexID executionVertexID, Map<ConsumedPartitionGroup, Boolean> map, Set<ExecutionVertexID> set) {
        return (set.contains(executionVertexID) || this.scheduledVertices.contains(executionVertexID) || !this.inputConsumableDecider.isInputConsumable(this.schedulingTopology.getVertex(executionVertexID), set, map)) ? false : true;
    }

    private void scheduleVerticesOneByOne(Set<ExecutionVertexID> set) {
        if (set.isEmpty()) {
            return;
        }
        SchedulingStrategyUtils.sortExecutionVerticesInTopologicalOrder(this.schedulingTopology, set).forEach(executionVertexID -> {
            this.schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(executionVertexID));
        });
    }
}
