package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.MutableVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.failure.DefaultFailureEnricherContext;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.JobStatusStore;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerUtils;
import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptive.Canceling;
import org.apache.flink.runtime.scheduler.adaptive.Created;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.Failing;
import org.apache.flink.runtime.scheduler.adaptive.Finished;
import org.apache.flink.runtime.scheduler.adaptive.Restarting;
import org.apache.flink.runtime.scheduler.adaptive.StopWithSavepoint;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceMinimalIncreaseRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.RescalingController;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.util.BoundedFIFOQueue;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.class */
public class AdaptiveScheduler implements SchedulerNG, Created.Context, WaitingForResources.Context, CreatingExecutionGraph.Context, Executing.Context, Restarting.Context, Failing.Context, Finished.Context, StopWithSavepoint.Context {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AdaptiveScheduler.class);
    // The job graph being scheduled; validated by assertPreconditions in the constructor.
    private final JobGraph jobGraph;
    // Parallelism store computed once at construction; used when building sparse archived
    // execution graphs (getArchivedExecutionGraph).
    private final VertexParallelismStore initialParallelismStore;
    // Slot pool: notifies newResourcesAvailable about new slots and receives the desired
    // resource requirements from declareDesiredResources.
    private final DeclarativeSlotPool declarativeSlotPool;
    private final long initializationTimestamp;
    // Passed to the failure enricher contexts and to the completed checkpoint store factory.
    private final Executor ioExecutor;
    private final ClassLoader userCodeClassLoader;
    // Checkpoint services; shut down in stopCheckpointServicesSafely, cleaner closed in closeAsync.
    private final CheckpointsCleaner checkpointsCleaner;
    private final CompletedCheckpointStore completedCheckpointStore;
    private final CheckpointIDCounter checkpointIdCounter;
    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
    private final ComponentMainThreadExecutor componentMainThreadExecutor;
    private final FatalErrorHandler fatalErrorHandler;
    // Enrichers used to attach labels to global and task failures.
    private final Collection<FailureEnricher> failureEnrichers;
    // Unmodifiable view: the externally supplied listener, the internal JobStatusStore and
    // any listeners added during metric registration in the constructor.
    private final Collection<JobStatusListener> jobStatusListeners;
    private final SlotAllocator slotAllocator;
    private final RescalingController rescalingController;
    private final Duration initialResourceAllocationTimeout;
    private final Duration resourceStabilizationTimeout;
    private final ExecutionGraphFactory executionGraphFactory;
    // Read from JobManagerOptions.SCHEDULER_MODE; REACTIVE forbids resource-requirement updates.
    private final SchedulerExecutionMode executionMode;
    private final DeploymentStateTimeMetrics deploymentTimeMetrics;
    // Bounded by WebOptions.MAX_EXCEPTION_HISTORY_SIZE; filled by archiveFailure.
    private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
    // Not final: replaced by updateJobResourceRequirements when new requirements apply.
    private JobGraphJobInformation jobInformation;
    private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
    // From JobManagerOptions.SLOT_IDLE_TIMEOUT (milliseconds).
    // NOTE(review): presumably consumed by checkIdleSlotTimeout, which is outside this view.
    private final Duration slotIdleTimeout;
    // Returned by getJobTerminationFuture; its value is the terminal status passed to the
    // checkpoint services on close.
    private final CompletableFuture<JobStatus> jobTerminationFuture = new CompletableFuture<>();
    // Current state of the scheduler's state machine; starts in Created.
    private State state = new Created(this, LOG);
    // NOTE(review): not referenced in the visible section; presumably guards state
    // transitions against re-entrance — confirm.
    private boolean isTransitioningState = false;
    // Exposed through the restarts gauge registered in the constructor.
    private int numRestarts = 0;
    private final MutableVertexAttemptNumberStore vertexAttemptNumberStore = new DefaultVertexAttemptNumberStore();
    // Initially an already-finished task; aborted in closeAsync.
    private BackgroundTask<ExecutionGraph> backgroundTask = BackgroundTask.finishedBackgroundTask();
    // Last resource requirements declared to the slot pool (see declareDesiredResources).
    private ResourceCounter desiredResources = ResourceCounter.empty();

    /**
     * Creates the adaptive scheduler for a streaming job.
     *
     * <p>The constructor validates the job graph and computes the initial vertex parallelism from
     * the execution mode. When {@code jobResourceRequirements} is present and applicable, it
     * overrides that parallelism. The constructor also registers for new-slot notifications on
     * the slot pool and registers the job metrics.
     *
     * @param jobGraph the job to schedule; must be a streaming job with pipelined exchanges
     * @param jobResourceRequirements optional initial per-vertex resource requirements
     * @param configuration cluster configuration (scheduler mode, metrics, history size, idle timeout)
     * @param declarativeSlotPool slot pool used to acquire resources
     * @param slotAllocator allocator that derives parallelism and slot assignments
     * @param executor I/O executor
     * @param classLoader user code class loader
     * @param checkpointsCleaner cleaner used when shutting down checkpoint services
     * @param checkpointRecoveryFactory factory for the checkpoint store and ID counter
     * @param duration initial resource allocation timeout
     * @param duration2 resource stabilization timeout
     * @param jobManagerJobMetricGroup metric group for job metrics
     * @param restartBackoffTimeStrategy restart backoff strategy
     * @param j initialization timestamp of the job
     * @param componentMainThreadExecutor main thread executor of the component
     * @param fatalErrorHandler handler for fatal errors
     * @param jobStatusListener external listener for job status changes; must not be null
     * @param collection failure enrichers used to label failures
     * @param executionGraphFactory factory for execution graphs
     * @throws JobExecutionException declared for failures while setting up the checkpoint
     *     services (NOTE(review): origin not visible here — presumably {@link SchedulerUtils})
     * @throws IllegalStateException if the job graph violates the scheduler's preconditions
     */
    public AdaptiveScheduler(JobGraph jobGraph, @Nullable JobResourceRequirements jobResourceRequirements, Configuration configuration, DeclarativeSlotPool declarativeSlotPool, SlotAllocator slotAllocator, Executor executor, ClassLoader classLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, Duration duration, Duration duration2, JobManagerJobMetricGroup jobManagerJobMetricGroup, RestartBackoffTimeStrategy restartBackoffTimeStrategy, long j, ComponentMainThreadExecutor componentMainThreadExecutor, FatalErrorHandler fatalErrorHandler, JobStatusListener jobStatusListener, Collection<FailureEnricher> collection, ExecutionGraphFactory executionGraphFactory) throws JobExecutionException {
        assertPreconditions(jobGraph);

        this.jobGraph = jobGraph;
        this.executionMode = configuration.get(JobManagerOptions.SCHEDULER_MODE);

        // Explicit resource requirements, if they apply, take precedence over the computed store.
        VertexParallelismStore vertexParallelismStore =
                computeVertexParallelismStore(jobGraph, this.executionMode);
        if (jobResourceRequirements != null) {
            vertexParallelismStore =
                    DefaultVertexParallelismStore.applyJobResourceRequirements(
                                    vertexParallelismStore, jobResourceRequirements)
                            .orElse(vertexParallelismStore);
        }
        this.initialParallelismStore = vertexParallelismStore;
        this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore);

        this.declarativeSlotPool = declarativeSlotPool;
        this.initializationTimestamp = j;
        this.ioExecutor = executor;
        this.userCodeClassLoader = classLoader;
        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
        this.fatalErrorHandler = fatalErrorHandler;
        this.checkpointsCleaner = checkpointsCleaner;
        this.completedCheckpointStore =
                SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
                        jobGraph, configuration, checkpointRecoveryFactory, executor, LOG);
        this.checkpointIdCounter =
                SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
                        jobGraph, checkpointRecoveryFactory);
        this.slotAllocator = slotAllocator;

        declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);

        this.componentMainThreadExecutor = componentMainThreadExecutor;
        this.rescalingController = new EnforceMinimalIncreaseRescalingController(configuration);
        this.initialResourceAllocationTimeout = duration;
        this.resourceStabilizationTimeout = duration2;
        this.executionGraphFactory = executionGraphFactory;

        // Job status listeners: the external listener, the internal status store, and whatever
        // the metric registration adds through the registrar callback below.
        final JobStatusStore jobStatusStore = new JobStatusStore(j);
        final List<JobStatusListener> tmpJobStatusListeners = new ArrayList<>();
        tmpJobStatusListeners.add(Preconditions.checkNotNull(jobStatusListener));
        tmpJobStatusListeners.add(jobStatusStore);

        final MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings =
                MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration);
        this.deploymentTimeMetrics =
                new DeploymentStateTimeMetrics(jobGraph.getJobType(), jobStatusMetricsSettings);

        SchedulerBase.registerJobMetrics(
                jobManagerJobMetricGroup,
                jobStatusStore,
                () -> (long) numRestarts,
                this.deploymentTimeMetrics,
                tmpJobStatusListeners::add,
                j,
                jobStatusMetricsSettings);

        this.jobStatusListeners = Collections.unmodifiableCollection(tmpJobStatusListeners);
        this.failureEnrichers = collection;
        this.exceptionHistory =
                new BoundedFIFOQueue<>(
                        configuration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
        this.slotIdleTimeout =
                Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT));
    }

    /**
     * Checks that the adaptive scheduler can run the job. The job must be a streaming job. Every
     * vertex must have a positive parallelism. Every input edge must consume a pipelined (or
     * pipelined-bounded) result.
     *
     * @param jobGraph job graph to validate
     * @throws IllegalStateException if any of the conditions is violated
     */
    private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException {
        final boolean isStreaming = jobGraph.getJobType() == JobType.STREAMING;
        Preconditions.checkState(isStreaming, "The adaptive scheduler only supports streaming jobs.");

        for (JobVertex vertex : jobGraph.getVertices()) {
            Preconditions.checkState(
                    vertex.getParallelism() > 0,
                    "The adaptive scheduler expects the parallelism being set for each JobVertex (violated JobVertex: %s).",
                    vertex.getID());

            for (JobEdge inputEdge : vertex.getInputs()) {
                Preconditions.checkState(
                        inputEdge.getSource().getResultType().isPipelinedOrPipelinedBoundedResultPartition(),
                        "The adaptive scheduler supports pipelined data exchanges (violated by %s -> %s).",
                        inputEdge.getSource().getProducer(),
                        inputEdge.getTarget().getID());
            }
        }
    }

    /**
     * Builds the vertex parallelism store used in reactive mode.
     *
     * <p>For each vertex, the max parallelism is the vertex's own value unless it is {@code -1}.
     * In that case {@code function} supplies it. If {@code z} is true, the parallelism is set to
     * that max; otherwise the vertex's configured parallelism is kept. Each entry rejects attempts
     * to lower the max parallelism below the computed value.
     *
     * @param iterable vertices to process
     * @param function supplies a default max parallelism for vertices without one
     * @param z whether to use the max parallelism as the parallelism
     * @return a store with one entry per vertex
     */
    @VisibleForTesting
    static VertexParallelismStore computeReactiveModeVertexParallelismStore(Iterable<JobVertex> iterable, Function<JobVertex, Integer> function, boolean z) {
        final DefaultVertexParallelismStore store = new DefaultVertexParallelismStore();
        for (JobVertex vertex : iterable) {
            final int maxParallelism =
                    vertex.getMaxParallelism() != -1
                            ? vertex.getMaxParallelism()
                            : function.apply(vertex).intValue();
            final int parallelism = z ? maxParallelism : vertex.getParallelism();
            store.setParallelismInfo(
                    vertex.getID(),
                    new DefaultVertexParallelismInfo(
                            parallelism,
                            maxParallelism,
                            proposedMax ->
                                    proposedMax.intValue() < maxParallelism
                                            ? Optional.of("Cannot lower max parallelism in Reactive mode.")
                                            : Optional.empty()));
        }
        return store;
    }

    /**
     * Computes the initial parallelism store. In reactive mode every vertex is scaled to its
     * (default) max parallelism; otherwise the store is derived from the job graph as-is.
     */
    private static VertexParallelismStore computeVertexParallelismStore(JobGraph jobGraph, SchedulerExecutionMode schedulerExecutionMode) {
        if (schedulerExecutionMode != SchedulerExecutionMode.REACTIVE) {
            return SchedulerBase.computeVertexParallelismStore(jobGraph);
        }
        return computeReactiveModeVertexParallelismStore(
                jobGraph.getVertices(), SchedulerBase::getDefaultMaxParallelism, true);
    }

    /**
     * Computes the parallelism store used for creating an execution graph. In reactive mode the
     * configured vertex parallelism is kept (not scaled up); {@code function} supplies default
     * max parallelisms in both modes.
     */
    @VisibleForTesting
    static VertexParallelismStore computeVertexParallelismStoreForExecution(JobGraph jobGraph, SchedulerExecutionMode schedulerExecutionMode, Function<JobVertex, Integer> function) {
        if (schedulerExecutionMode != SchedulerExecutionMode.REACTIVE) {
            return SchedulerBase.computeVertexParallelismStore(jobGraph.getVertices(), function);
        }
        return computeReactiveModeVertexParallelismStore(jobGraph.getVertices(), function, false);
    }

    /**
     * Slot-pool callback for newly available slots. Forwards the notification to the current
     * state if it is a {@link ResourceListener}; the slots themselves are not inspected here.
     */
    private void newResourcesAvailable(Collection<? extends PhysicalSlot> collection) {
        state.tryRun(
                ResourceListener.class,
                ResourceListener::onNewResourcesAvailable,
                "newResourcesAvailable");
    }

    /**
     * Starts scheduling after checking the idle slot timeout.
     *
     * @throws IllegalStateException if the scheduler is not in the {@link Created} state
     */
    @Override
    public void startScheduling() {
        checkIdleSlotTimeout();
        final Created created =
                state.as(Created.class)
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Can only start scheduling when being in Created state."));
        created.startScheduling();
    }

    /**
     * Shuts down the scheduler in the following steps:
     * <ol>
     *   <li>Suspends the current state, which must then be {@link Finished}.</li>
     *   <li>Aborts the background task.</li>
     *   <li>After the background task terminates, stops the checkpoint services on the main
     *       thread, using the job's terminal status.</li>
     *   <li>Closes the checkpoints cleaner.</li>
     * </ol>
     *
     * @return future of the combined shutdown, as composed by {@link FutureUtils#composeAfterwards}
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        LOG.debug("Closing the AdaptiveScheduler. Trying to suspend the current job execution.");

        state.suspend(new FlinkException("AdaptiveScheduler is being stopped."));
        Preconditions.checkState(
                state instanceof Finished,
                "Scheduler state should be finished after calling state.suspend.");

        backgroundTask.abort();

        final CompletableFuture<Void> checkpointServicesStopped =
                FutureUtils.runAfterwardsAsync(
                        backgroundTask.getTerminationFuture(),
                        () -> stopCheckpointServicesSafely(jobTerminationFuture.get()),
                        getMainThreadExecutor());

        return FutureUtils.composeAfterwards(
                checkpointServicesStopped, checkpointsCleaner::closeAsync);
    }

    /**
     * Shuts down the completed checkpoint store and the checkpoint ID counter. Both shutdowns are
     * always attempted. Failures are combined (the later one is suppressed into the first) and
     * logged as a warning instead of being thrown.
     *
     * @param jobStatus terminal status handed to both services
     */
    private void stopCheckpointServicesSafely(JobStatus jobStatus) {
        LOG.debug("Stopping the checkpoint services with state {}.", jobStatus);

        Exception failure = null;

        try {
            completedCheckpointStore.shutdown(jobStatus, checkpointsCleaner);
        } catch (Exception storeFailure) {
            failure = ExceptionUtils.firstOrSuppressed(storeFailure, failure);
        }

        try {
            checkpointIdCounter.shutdown(jobStatus).get();
        } catch (Exception counterFailure) {
            failure = ExceptionUtils.firstOrSuppressed(counterFailure, failure);
        }

        if (failure == null) {
            return;
        }
        LOG.warn("Failed to stop checkpoint services.", failure);
    }

    /** Asks the current state to cancel the job. */
    @Override
    public void cancel() {
        final State current = state;
        current.cancel();
    }

    /** Returns the future that is completed with the job's terminal status. */
    @Override
    public CompletableFuture<JobStatus> getJobTerminationFuture() {
        return jobTerminationFuture;
    }

    /**
     * Labels a global failure with the configured failure enrichers and passes it, together
     * with the (asynchronous) labels, to the current state.
     */
    @Override
    public void handleGlobalFailure(Throwable th) {
        final CompletableFuture<Map<String, String>> failureLabels =
                FailureEnricherUtils.labelFailure(
                        th,
                        DefaultFailureEnricherContext.forGlobalFailure(
                                jobInformation.getJobID(),
                                jobInformation.getName(),
                                jobManagerJobMetricGroup,
                                ioExecutor,
                                userCodeClassLoader),
                        getMainThreadExecutor(),
                        failureEnrichers);
        state.handleGlobalFailure(th, failureLabels);
    }

    /**
     * Computes failure labels for a task state transition. Only transitions to
     * {@link ExecutionState#FAILED} are labelled, and only if enrichers are configured;
     * otherwise {@link FailureEnricherUtils#EMPTY_FAILURE_LABELS} is returned.
     */
    private CompletableFuture<Map<String, String>> labelFailure(TaskExecutionStateTransition taskExecutionStateTransition) {
        final boolean shouldLabel =
                taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED
                        && !failureEnrichers.isEmpty();
        if (!shouldLabel) {
            return FailureEnricherUtils.EMPTY_FAILURE_LABELS;
        }

        final Throwable error = taskExecutionStateTransition.getError(userCodeClassLoader);
        return FailureEnricherUtils.labelFailure(
                error,
                DefaultFailureEnricherContext.forTaskFailure(
                        jobGraph.getJobID(),
                        jobGraph.getName(),
                        jobManagerJobMetricGroup,
                        ioExecutor,
                        userCodeClassLoader),
                getMainThreadExecutor(),
                failureEnrichers);
    }

    /**
     * Forwards a task state update to the current state if it holds an execution graph.
     *
     * @return the state's result, or {@code false} if the current state has no execution graph
     */
    @Override
    public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
        final Optional<Boolean> updated =
                state.tryCall(
                        StateWithExecutionGraph.class,
                        withGraph ->
                                withGraph.updateTaskExecutionState(
                                        taskExecutionStateTransition,
                                        labelFailure(taskExecutionStateTransition)),
                        "updateTaskExecutionState");
        return updated.orElse(false);
    }

    /**
     * Requests the next input split from the current execution graph.
     *
     * @throws IOException if the current state has no execution graph, or if the lookup fails
     */
    @Override
    public SerializedInputSplit requestNextInputSplit(JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) throws IOException {
        final Optional<SerializedInputSplit> nextSplit =
                state.tryCall(
                        StateWithExecutionGraph.class,
                        withGraph -> withGraph.requestNextInputSplit(jobVertexID, executionAttemptID),
                        "requestNextInputSplit");
        return nextSplit.orElseThrow(
                () -> new IOException("Scheduler is currently not executing the job."));
    }

    /**
     * Looks up the producer state of a result partition in the current execution graph.
     *
     * @throws PartitionProducerDisposedException if the current state has no execution graph
     *     (or if the lookup itself reports it)
     */
    @Override
    public ExecutionState requestPartitionState(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) throws PartitionProducerDisposedException {
        final Optional<ExecutionState> partitionState =
                state.tryCall(
                        StateWithExecutionGraph.class,
                        withGraph ->
                                withGraph.requestPartitionState(
                                        intermediateDataSetID, resultPartitionID),
                        "requestPartitionState");
        return partitionState.orElseThrow(
                () -> new PartitionProducerDisposedException(resultPartitionID));
    }

    /** Returns the current state's archived job together with a copy of the exception history. */
    @Override
    public ExecutionGraphInfo requestJob() {
        final ArchivedExecutionGraph archivedJob = state.getJob();
        return new ExecutionGraphInfo(archivedJob, exceptionHistory.toArrayList());
    }

    /** Returns the checkpoint statistics of the current state's archived job. */
    @Override
    public CheckpointStatsSnapshot requestCheckpointStats() {
        final ArchivedExecutionGraph archivedJob = state.getJob();
        return archivedJob.getCheckpointStatsSnapshot();
    }

    /** Appends a root failure to the bounded exception history. */
    @Override
    public void archiveFailure(RootExceptionHistoryEntry rootExceptionHistoryEntry) {
        exceptionHistory.add(rootExceptionHistoryEntry);
    }

    /** Returns the job status reported by the current state. */
    @Override
    public JobStatus requestJobStatus() {
        final State current = state;
        return current.getJobStatus();
    }

    /** Builds job details from the current state's archived job. */
    @Override
    public JobDetails requestJobDetails() {
        final ArchivedExecutionGraph archivedJob = state.getJob();
        return JobDetails.createDetailsForJob(archivedJob);
    }

    /**
     * Looks up the location of a registered KvState in the current execution graph.
     *
     * @throws UnknownKvStateLocation if the current state has no execution graph (or the state
     *     lookup reports the name as unknown)
     * @throws FlinkJobNotFoundException propagated from the execution-graph lookup
     */
    @Override
    public KvStateLocation requestKvStateLocation(JobID jobID, String str) throws UnknownKvStateLocation, FlinkJobNotFoundException {
        // Typed Optional: avoids the raw type and the unchecked cast of the previous version.
        final Optional<StateWithExecutionGraph> withGraph = state.as(StateWithExecutionGraph.class);
        if (withGraph.isPresent()) {
            return withGraph.get().requestKvStateLocation(jobID, str);
        }
        throw new UnknownKvStateLocation(str);
    }

    /** Forwards a KvState registration to the current state if it holds an execution graph. */
    @Override
    public void notifyKvStateRegistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str, KvStateID kvStateID, InetSocketAddress inetSocketAddress) throws FlinkJobNotFoundException {
        state.tryRun(
                StateWithExecutionGraph.class,
                withGraph ->
                        withGraph.notifyKvStateRegistered(
                                jobID,
                                jobVertexID,
                                keyGroupRange,
                                str,
                                kvStateID,
                                inetSocketAddress),
                "notifyKvStateRegistered");
    }

    /** Forwards a KvState deregistration to the current state if it holds an execution graph. */
    @Override
    public void notifyKvStateUnregistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str) throws FlinkJobNotFoundException {
        state.tryRun(
                StateWithExecutionGraph.class,
                withGraph ->
                        withGraph.notifyKvStateUnregistered(jobID, jobVertexID, keyGroupRange, str),
                "notifyKvStateUnregistered");
    }

    /** Forwards an accumulator snapshot to the current state if it holds an execution graph. */
    @Override
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        state.tryRun(
                StateWithExecutionGraph.class,
                withGraph -> withGraph.updateAccumulators(accumulatorSnapshot),
                "updateAccumulators");
    }

    /**
     * Triggers a savepoint through the current state if it holds an execution graph.
     *
     * @param str optional target directory
     * @param z whether the job should be cancelled after the savepoint
     * @param savepointFormatType savepoint format
     * @return the savepoint path future, or a future failed with a {@link CheckpointException} if
     *     the job is not executing
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(@Nullable String str, boolean z, SavepointFormatType savepointFormatType) {
        return state.tryCall(
                        StateWithExecutionGraph.class,
                        withGraph -> withGraph.triggerSavepoint(str, z, savepointFormatType),
                        "triggerSavepoint")
                // orElseGet: build the failed future (and its exception/stack trace) only when
                // the job is actually not executing.
                .orElseGet(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new CheckpointException(
                                                "The Flink job is currently not executing.",
                                                CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
    }

    /**
     * Triggers a checkpoint of the given type through the current state if it holds an execution
     * graph.
     *
     * @return the completed checkpoint future, or a future failed with a
     *     {@link CheckpointException} if the job is not executing
     */
    @Override
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) {
        return state.tryCall(
                        StateWithExecutionGraph.class,
                        withGraph -> withGraph.triggerCheckpoint(checkpointType),
                        "triggerCheckpoint")
                // Lazily create the failure only on the not-executing path.
                .orElseGet(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new CheckpointException(
                                                "The Flink job is currently not executing.",
                                                CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
    }

    /** Forwards a checkpoint acknowledgement to the current state if it holds an execution graph. */
    @Override
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
        state.tryRun(
                StateWithExecutionGraph.class,
                withGraph ->
                        withGraph.acknowledgeCheckpoint(
                                jobID, executionAttemptID, j, checkpointMetrics, taskStateSnapshot),
                "acknowledgeCheckpoint");
    }

    /** Forwards an end-of-data notification to the current state if it holds an execution graph. */
    @Override
    public void notifyEndOfData(ExecutionAttemptID executionAttemptID) {
        state.tryRun(
                StateWithExecutionGraph.class,
                withGraph -> withGraph.notifyEndOfData(executionAttemptID),
                "notifyEndOfData");
    }

    /**
     * Forwards checkpoint metrics to the current state if it holds an execution graph. The
     * {@code jobID} argument is not forwarded.
     */
    @Override
    public void reportCheckpointMetrics(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics) {
        state.tryRun(
                StateWithExecutionGraph.class,
                withGraph -> withGraph.reportCheckpointMetrics(executionAttemptID, j, checkpointMetrics),
                "reportCheckpointMetrics");
    }

    /** Forwards a checkpoint decline to the current state if it holds an execution graph. */
    @Override
    public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
        state.tryRun(
                StateWithExecutionGraph.class,
                withGraph -> withGraph.declineCheckpoint(declineCheckpoint),
                "declineCheckpoint");
    }

    /**
     * Stops the job with a savepoint. This is only possible in the {@link Executing} state.
     *
     * @param str optional target directory
     * @param z flag forwarded to {@code Executing#stopWithSavepoint}
     *     (NOTE(review): presumably "terminate"/drain — confirm)
     * @param savepointFormatType savepoint format
     * @return the savepoint path future, or a future failed with a {@link CheckpointException} if
     *     the scheduler is not executing
     */
    @Override
    public CompletableFuture<String> stopWithSavepoint(@Nullable String str, boolean z, SavepointFormatType savepointFormatType) {
        return state.tryCall(
                        Executing.class,
                        executing -> executing.stopWithSavepoint(str, z, savepointFormatType),
                        "stopWithSavepoint")
                // Lazily create the failure only on the not-executing path.
                .orElseGet(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new CheckpointException(
                                                "The Flink job is currently not executing.",
                                                CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
    }

    /**
     * Delivers an operator event to its coordinator in the current execution graph.
     *
     * @throws FlinkException a {@link TaskNotRunningException} if the current state has no
     *     execution graph, or a failure reported by the delivery itself
     */
    @Override
    public void deliverOperatorEventToCoordinator(ExecutionAttemptID executionAttemptID, OperatorID operatorID, OperatorEvent operatorEvent) throws FlinkException {
        final StateWithExecutionGraph withGraph =
                state.as(StateWithExecutionGraph.class)
                        .orElseThrow(
                                () ->
                                        new TaskNotRunningException(
                                                "Task is not known or in state running on the JobManager."));
        withGraph.deliverOperatorEventToCoordinator(executionAttemptID, operatorID, operatorEvent);
    }

    /**
     * Delivers a coordination request to the coordinator of the given operator.
     *
     * @return the coordinator's response future, or a future failed with a
     *     {@link FlinkException} if the current state has no execution graph
     * @throws FlinkException propagated from the delivery in the current state
     */
    @Override
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operatorID, CoordinationRequest coordinationRequest) throws FlinkException {
        // Typed call chain: no raw CompletableFuture cast / unchecked conversion.
        return state.tryCall(
                        StateWithExecutionGraph.class,
                        withGraph ->
                                withGraph.deliverCoordinationRequestToCoordinator(
                                        operatorID, coordinationRequest),
                        "deliverCoordinationRequestToCoordinator")
                .orElseGet(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new FlinkException(
                                                "Coordinator of operator "
                                                        + operatorID
                                                        + " does not exist")));
    }

    /**
     * Reports the current per-vertex requirements. The lower bound is the vertex's minimum
     * parallelism and the upper bound is its current parallelism.
     */
    @Override
    public JobResourceRequirements requestJobResourceRequirements() {
        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
            final JobVertexID vertexId = vertex.getJobVertexID();
            builder.setParallelismForJobVertex(
                    vertexId, vertex.getMinParallelism(), vertex.getParallelism());
        }
        return builder.build();
    }

    /**
     * Applies new resource requirements. If they change the parallelism store, the job
     * information is replaced, the desired resources are re-declared and the current state is
     * notified if it is a {@link ResourceListener}.
     *
     * @throws UnsupportedOperationException in reactive mode
     */
    @Override
    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {
        if (executionMode == SchedulerExecutionMode.REACTIVE) {
            throw new UnsupportedOperationException("Cannot change the parallelism of a job running in reactive mode.");
        }

        final Optional<VertexParallelismStore> updatedStore =
                DefaultVertexParallelismStore.applyJobResourceRequirements(
                        jobInformation.getVertexParallelismStore(), jobResourceRequirements);
        if (!updatedStore.isPresent()) {
            return;
        }

        jobInformation = new JobGraphJobInformation(jobGraph, updatedStore.get());
        declareDesiredResources();
        state.tryRun(
                ResourceListener.class,
                ResourceListener::onNewResourceRequirements,
                "Current state does not react to desired parallelism changes.");
    }

    /** Returns whether the slot pool's free slots cover the currently desired resources. */
    @Override
    public boolean hasDesiredResources() {
        final Collection<? extends SlotInfo> freeSlots =
                declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation();
        return hasDesiredResources(desiredResources, freeSlots);
    }

    /**
     * Checks whether the given slots satisfy the desired resources.
     *
     * <p>Each slot consumes one unit of its own resource profile if that profile is still
     * desired. Otherwise it consumes one unit of {@link ResourceProfile#UNKNOWN}. Iteration stops
     * early once nothing is outstanding.
     *
     * @param resourceCounter desired resources
     * @param collection available slots
     * @return true if no desired resources remain after consuming the slots
     */
    @VisibleForTesting
    static boolean hasDesiredResources(ResourceCounter resourceCounter, Collection<? extends SlotInfo> collection) {
        ResourceCounter outstanding = resourceCounter;
        for (Iterator<? extends SlotInfo> slots = collection.iterator();
                !outstanding.isEmpty() && slots.hasNext(); ) {
            final ResourceProfile profile = slots.next().getResourceProfile();
            final ResourceProfile consumed =
                    outstanding.containsResource(profile) ? profile : ResourceProfile.UNKNOWN;
            outstanding = outstanding.subtract(consumed, 1);
        }
        return outstanding.isEmpty();
    }

    /** Returns whether the slot allocator can determine some parallelism from all slots in the pool. */
    @Override
    public boolean hasSufficientResources() {
        final Collection<? extends SlotInfo> allSlots = declarativeSlotPool.getAllSlotsInformation();
        return slotAllocator.determineParallelism(jobInformation, allSlots).isPresent();
    }

    /**
     * Computes a scheduling plan from the free slots, taking the allocations of the previous
     * execution graph (if any) into account.
     *
     * @throws NoResourceAvailableException if no plan can be computed from the free slots
     */
    private JobSchedulingPlan determineParallelism(SlotAllocator slotAllocator, @Nullable ExecutionGraph executionGraph) throws NoResourceAvailableException {
        final Collection<? extends SlotInfo> freeSlots =
                declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation();
        final JobAllocationsInformation previousAllocations =
                JobAllocationsInformation.fromGraph(executionGraph);
        return slotAllocator
                .determineParallelismAndCalculateAssignment(jobInformation, freeSlots, previousAllocations)
                .orElseThrow(
                        () -> new NoResourceAvailableException("Not enough resources available for scheduling."));
    }

    /**
     * Builds a sparse {@link ArchivedExecutionGraph} carrying the job's vertices, checkpointing
     * settings and initial parallelism store, recorded with the given status and optional failure.
     *
     * <p>Implements the {@code Created}, {@code WaitingForResources} and {@code
     * CreatingExecutionGraph} context method.
     *
     * @param jobStatus the status to record
     * @param th the failure cause to record, may be {@code null}
     * @return the archived execution graph
     */
    @Override
    public ArchivedExecutionGraph getArchivedExecutionGraph(
            JobStatus jobStatus, @Nullable Throwable th) {
        return ArchivedExecutionGraph.createSparseArchivedExecutionGraphWithJobVertices(
                jobInformation.getJobID(),
                jobInformation.getName(),
                jobStatus,
                th,
                jobInformation.getCheckpointingSettings(),
                initializationTimestamp,
                jobGraph.getVertices(),
                initialParallelismStore);
    }

    /**
     * Re-declares the desired resources and transitions into {@code WaitingForResources}.
     *
     * <p>Implements {@code StateTransitions.ToWaitingForResources}.
     *
     * @param executionGraph the previous execution graph, may be {@code null}
     */
    @Override
    public void goToWaitingForResources(@Nullable ExecutionGraph executionGraph) {
        declareDesiredResources();
        final WaitingForResources.Factory waitingFactory =
                new WaitingForResources.Factory(
                        this,
                        LOG,
                        initialResourceAllocationTimeout,
                        resourceStabilizationTimeout,
                        executionGraph);
        transitionToState(waitingFactory);
    }

    /**
     * Recomputes the desired resources. If they differ from the currently declared ones, stores
     * them and pushes them to the declarative slot pool. Unchanged requirements are not
     * re-declared.
     */
    private void declareDesiredResources() {
        final ResourceCounter recomputed = calculateDesiredResources();
        if (!recomputed.equals(desiredResources)) {
            desiredResources = recomputed;
            declarativeSlotPool.setResourceRequirements(desiredResources);
        }
    }

    /**
     * Computes the slots required by the job's vertices, as reported by the slot allocator.
     *
     * @return the required resources
     */
    private ResourceCounter calculateDesiredResources() {
        return slotAllocator.calculateRequiredSlots(jobInformation.getVertices());
    }

    /**
     * Transitions into the {@code Executing} state.
     *
     * <p>Implements {@code StateTransitions.ToExecuting}.
     */
    @Override
    public void goToExecuting(
            ExecutionGraph executionGraph,
            ExecutionGraphHandler executionGraphHandler,
            OperatorCoordinatorHandler operatorCoordinatorHandler,
            List<ExceptionHistoryEntry> list) {
        final Executing.Factory executingFactory =
                new Executing.Factory(
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        LOG,
                        this,
                        userCodeClassLoader,
                        list);
        transitionToState(executingFactory);
    }

    /**
     * Transitions into the {@code Canceling} state.
     *
     * <p>Implements {@code StateTransitions.ToCancelling}.
     */
    @Override
    public void goToCanceling(
            ExecutionGraph executionGraph,
            ExecutionGraphHandler executionGraphHandler,
            OperatorCoordinatorHandler operatorCoordinatorHandler,
            List<ExceptionHistoryEntry> list) {
        final Canceling.Factory cancelingFactory =
                new Canceling.Factory(
                        this,
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        LOG,
                        userCodeClassLoader,
                        list);
        transitionToState(cancelingFactory);
    }

    /**
     * Transitions into the {@code Restarting} state.
     *
     * <p>Before the transition, each execution vertex's next attempt number (current attempt + 1)
     * is stored in the vertex attempt number store. After the transition, the restart counter is
     * incremented.
     *
     * <p>Implements {@code StateTransitions.ToRestarting}.
     *
     * @param duration the backoff duration handed to the {@code Restarting} state
     */
    @Override
    public void goToRestarting(
            ExecutionGraph executionGraph,
            ExecutionGraphHandler executionGraphHandler,
            OperatorCoordinatorHandler operatorCoordinatorHandler,
            Duration duration,
            List<ExceptionHistoryEntry> list) {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            final int nextAttemptNumber =
                    vertex.getCurrentExecutionAttempt().getAttemptNumber() + 1;
            vertexAttemptNumberStore.setAttemptCount(
                    vertex.getJobvertexId(), vertex.getParallelSubtaskIndex(), nextAttemptNumber);
        }

        final Restarting.Factory restartingFactory =
                new Restarting.Factory(
                        this,
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        LOG,
                        duration,
                        userCodeClassLoader,
                        list);
        transitionToState(restartingFactory);
        numRestarts++;
    }

    /**
     * Transitions into the {@code Failing} state with the given failure cause.
     *
     * <p>Implements {@code StateTransitions.ToFailing}.
     */
    @Override
    public void goToFailing(
            ExecutionGraph executionGraph,
            ExecutionGraphHandler executionGraphHandler,
            OperatorCoordinatorHandler operatorCoordinatorHandler,
            Throwable th,
            List<ExceptionHistoryEntry> list) {
        final Failing.Factory failingFactory =
                new Failing.Factory(
                        this,
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        LOG,
                        th,
                        userCodeClassLoader,
                        list);
        transitionToState(failingFactory);
    }

    /**
     * Transitions into the {@code StopWithSavepoint} state.
     *
     * <p>Implements {@code StateTransitions.ToStopWithSavepoint}.
     *
     * @return the new state's operation future
     */
    @Override
    public CompletableFuture<String> goToStopWithSavepoint(
            ExecutionGraph executionGraph,
            ExecutionGraphHandler executionGraphHandler,
            OperatorCoordinatorHandler operatorCoordinatorHandler,
            CheckpointScheduling checkpointScheduling,
            CompletableFuture<String> completableFuture,
            List<ExceptionHistoryEntry> list) {
        final StopWithSavepoint.Factory stopFactory =
                new StopWithSavepoint.Factory(
                        this,
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        checkpointScheduling,
                        LOG,
                        userCodeClassLoader,
                        completableFuture,
                        list);
        final StopWithSavepoint stopWithSavepoint =
                (StopWithSavepoint) transitionToState(stopFactory);
        return stopWithSavepoint.getOperationFuture();
    }

    /**
     * Transitions into the {@code Finished} state with the given archived graph.
     *
     * <p>Implements {@code StateTransitions.ToFinished}.
     */
    @Override
    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
        final Finished.Factory finishedFactory =
                new Finished.Factory(this, archivedExecutionGraph, LOG);
        transitionToState(finishedFactory);
    }

    /**
     * Starts creating a new execution graph with the available resources and transitions into
     * {@code CreatingExecutionGraph}.
     *
     * <p>The creation future is obtained before the transition and handed to the new state.
     *
     * <p>Implements {@code StateTransitions.ToCreatingExecutionGraph}.
     *
     * @param executionGraph the previous execution graph, may be {@code null}
     */
    @Override
    public void goToCreatingExecutionGraph(@Nullable ExecutionGraph executionGraph) {
        final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
                graphFuture = createExecutionGraphWithAvailableResourcesAsync(executionGraph);
        transitionToState(
                new CreatingExecutionGraph.Factory(this, graphFuture, LOG, executionGraph));
    }

    /**
     * Determines a scheduling plan from the free slots, then asynchronously creates and restores an
     * execution graph whose vertex parallelism follows that plan.
     *
     * <p>Max parallelism per vertex is taken from the initial parallelism store. Any exception
     * raised while preparing or launching the creation is returned as an exceptionally completed
     * future rather than thrown.
     *
     * @param previousExecutionGraph the previous execution graph, may be {@code null}
     * @return future of the created graph paired with the scheduling plan
     */
    private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
            createExecutionGraphWithAvailableResourcesAsync(
                    @Nullable ExecutionGraph previousExecutionGraph) {
        try {
            final JobSchedulingPlan schedulingPlan =
                    determineParallelism(slotAllocator, previousExecutionGraph);

            final JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
            for (JobVertex vertex : adjustedJobGraph.getVertices()) {
                vertex.setParallelism(
                        schedulingPlan.getVertexParallelism().getParallelism(vertex.getID()));
            }

            final VertexParallelismStore parallelismStore =
                    computeVertexParallelismStoreForExecution(
                            adjustedJobGraph,
                            executionMode,
                            jobVertex ->
                                    initialParallelismStore
                                            .getParallelismInfo(jobVertex.getID())
                                            .getMaxParallelism());

            return createExecutionGraphAndRestoreStateAsync(parallelismStore)
                    .thenApply(
                            graph ->
                                    CreatingExecutionGraph.ExecutionGraphWithVertexParallelism
                                            .create(graph, schedulingPlan));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    /**
     * Starts the execution graph on the main-thread executor, moves it to running, installs the
     * internal-failure listener, and then tries to reserve and assign slots according to the
     * graph's scheduling plan.
     *
     * <p>Implements {@code CreatingExecutionGraph.Context#tryToAssignSlots}.
     *
     * @return a success result wrapping the graph if slots were reserved, otherwise "not possible"
     */
    @Override
    public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
            CreatingExecutionGraph.ExecutionGraphWithVertexParallelism
                    executionGraphWithVertexParallelism) {
        final ExecutionGraph graph = executionGraphWithVertexParallelism.getExecutionGraph();
        graph.start(componentMainThreadExecutor);
        graph.transitionToRunning();
        graph.setInternalTaskFailuresListener(
                new UpdateSchedulerNgOnInternalFailuresListener(this));

        return slotAllocator
                .tryReserveResources(executionGraphWithVertexParallelism.getJobSchedulingPlan())
                .map(slots -> assignSlotsToExecutionGraph(graph, slots))
                .map(CreatingExecutionGraph.AssignmentResult::success)
                .orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible);
    }

    /**
     * Assigns each execution vertex its reserved slot.
     *
     * <p>Produced partitions are registered against the slot's task manager location first, and
     * that registration must already be done when the call returns.
     *
     * @param executionGraph graph whose vertices receive slots
     * @param reservedSlots slots reserved per execution vertex
     * @return the same execution graph
     */
    @Nonnull
    private ExecutionGraph assignSlotsToExecutionGraph(
            ExecutionGraph executionGraph, ReservedSlots reservedSlots) {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            final LogicalSlot assignedSlot = reservedSlots.getSlotFor(vertex.getID());
            final boolean registrationDone =
                    vertex.getCurrentExecutionAttempt()
                            .registerProducedPartitions(assignedSlot.getTaskManagerLocation())
                            .isDone();
            Preconditions.checkState(
                    registrationDone,
                    "Partition registration must be completed immediately for reactive mode");
            vertex.tryAssignResource(assignedSlot);
        }
        return executionGraph;
    }

    /**
     * Aborts the current background task and chains a new one on the I/O executor that creates and
     * restores an execution graph.
     *
     * @param vertexParallelismStore parallelism to create the graph with
     * @return the background result, switched onto the main-thread executor
     */
    private CompletableFuture<ExecutionGraph> createExecutionGraphAndRestoreStateAsync(
            VertexParallelismStore vertexParallelismStore) {
        backgroundTask.abort();
        backgroundTask =
                backgroundTask.runAfter(
                        () -> createExecutionGraphAndRestoreState(vertexParallelismStore),
                        ioExecutor);
        final ComponentMainThreadExecutor mainThread = getMainThreadExecutor();
        return FutureUtils.switchExecutor(backgroundTask.getResultFuture(), mainThread);
    }

    /**
     * Synchronously creates an execution graph from a fresh copy of the job graph and restores its
     * state via the execution graph factory.
     *
     * <p>Partition locations are required to be known ({@code MUST_BE_KNOWN}), and the
     * result-partition-type predicate always answers {@code false}.
     *
     * @param vertexParallelismStore parallelism to create the graph with
     * @return the created and restored execution graph
     * @throws Exception whatever the factory or the job-graph copy throws
     */
    @Nonnull
    private ExecutionGraph createExecutionGraphAndRestoreState(
            VertexParallelismStore vertexParallelismStore) throws Exception {
        return executionGraphFactory.createAndRestoreExecutionGraph(
                jobInformation.copyJobGraph(),
                completedCheckpointStore,
                checkpointsCleaner,
                checkpointIdCounter,
                TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
                initializationTimestamp,
                vertexAttemptNumberStore,
                vertexParallelismStore,
                deploymentTimeMetrics,
                partitionType -> false,
                LOG);
    }

    /**
     * Decides whether the job should be rescaled.
     *
     * <p>The parallelism achievable with all slots in the pool is compared against the graph's
     * current parallelism using the rescaling controller. If no parallelism can be determined, the
     * answer is {@code false}, and the current parallelism is not computed at all.
     *
     * <p>Implements {@code Executing.Context#shouldRescale}.
     */
    @Override
    public boolean shouldRescale(ExecutionGraph executionGraph) {
        return slotAllocator
                .determineParallelism(jobInformation, declarativeSlotPool.getAllSlotsInformation())
                .map(
                        achievable ->
                                rescalingController.shouldRescale(
                                        getCurrentParallelism(executionGraph), achievable))
                .orElse(false);
    }

    /**
     * Captures the parallelism each job vertex of the given graph currently runs with.
     *
     * @param executionGraph the graph to inspect
     * @return a {@link VertexParallelism} keyed by job vertex ID
     */
    private static VertexParallelism getCurrentParallelism(ExecutionGraph executionGraph) {
        // A typed collector replaces the former raw (Map) cast, so the compiler checks the
        // key/value types handed to VertexParallelism instead of hiding them behind a raw type.
        return new VertexParallelism(
                executionGraph.getAllVertices().values().stream()
                        .collect(
                                Collectors.toMap(
                                        vertex -> vertex.getJobVertexId(),
                                        vertex -> vertex.getParallelism())));
    }

    /**
     * Logs the job's terminal state, including the failure cause if one is recorded, and completes
     * the job termination future with that state.
     *
     * <p>Implements {@code Finished.Context#onFinished}.
     */
    @Override
    public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
        @Nullable
        final Throwable failureCause =
                archivedExecutionGraph.getFailureInfo() == null
                        ? null
                        : archivedExecutionGraph.getFailureInfo().getException();
        // The trailing Throwable has no placeholder on purpose. With SLF4J (assumed for LOG),
        // such an argument is logged as the event's exception.
        LOG.info(
                "Job {} reached terminal state {}.",
                archivedExecutionGraph.getJobID(),
                archivedExecutionGraph.getState(),
                failureCause);
        jobTerminationFuture.complete(archivedExecutionGraph.getState());
    }

    /**
     * Classifies a failure as restartable or not.
     *
     * <p>Unrecoverable errors are never restarted. Otherwise the failure is reported to the restart
     * backoff strategy, which decides whether a restart is allowed and how long to back off.
     *
     * <p>Implements the {@code Executing} and {@code StopWithSavepoint} context method.
     *
     * @param th the failure to classify
     * @return the resulting {@link FailureResult}
     */
    @Override
    public FailureResult howToHandleFailure(Throwable th) {
        if (ExecutionFailureHandler.isUnrecoverableError(th)) {
            return FailureResult.canNotRestart(
                    new JobException("The failure is not recoverable", th));
        }

        restartBackoffTimeStrategy.notifyFailure(th);
        if (!restartBackoffTimeStrategy.canRestart()) {
            return FailureResult.canNotRestart(
                    new JobException(
                            "Recovery is suppressed by " + restartBackoffTimeStrategy, th));
        }
        return FailureResult.canRestart(
                th, Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
    }

    /**
     * Returns the I/O executor. This scheduler also uses it to run background execution graph
     * creation.
     *
     * <p>Implements {@code CreatingExecutionGraph.Context#getIOExecutor}.
     */
    @Override
    public Executor getIOExecutor() {
        return ioExecutor;
    }

    /**
     * Returns the scheduler's main-thread executor.
     *
     * <p>Implements {@code StateWithExecutionGraph.Context#getMainThreadExecutor}.
     */
    @Override
    public ComponentMainThreadExecutor getMainThreadExecutor() {
        return componentMainThreadExecutor;
    }

    /**
     * Returns the job manager's metric group for this job.
     *
     * <p>Implements {@code CreatingExecutionGraph.Context#getMetricGroup}.
     */
    @Override
    public JobManagerJobMetricGroup getMetricGroup() {
        return jobManagerJobMetricGroup;
    }

    /**
     * Checks by identity whether the given state is the scheduler's current state.
     *
     * <p>Implements {@code StateWithExecutionGraph.Context#isState}.
     */
    @Override
    public boolean isState(State state) {
        return this.state == state;
    }

    /**
     * Runs the action only if the scheduler is still in the expected state. Otherwise the call is
     * logged at debug level and skipped. Any {@link Throwable} thrown by the action is forwarded to
     * the fatal error handler.
     *
     * <p>Implements {@code StateWithExecutionGraph.Context#runIfState}.
     */
    @Override
    public void runIfState(State state, Runnable runnable) {
        if (isState(state)) {
            try {
                runnable.run();
            } catch (Throwable failure) {
                fatalErrorHandler.onFatalError(failure);
            }
        } else {
            LOG.debug(
                    "Ignoring scheduled action because expected state {} is not the actual state {}.",
                    state,
                    this.state);
        }
    }

    /**
     * Schedules the action on the main-thread executor after the given delay. When the delay
     * elapses, the action runs only if the scheduler is still in the expected state.
     *
     * <p>Implements the {@code WaitingForResources}, {@code CreatingExecutionGraph}, {@code
     * Executing}, {@code Restarting} and {@code StopWithSavepoint} context method.
     *
     * @return the handle of the scheduled task
     */
    @Override
    public ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration duration) {
        final Runnable guardedAction = () -> runIfState(state, runnable);
        return componentMainThreadExecutor.schedule(
                guardedAction, duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Leaves the current state and enters the state produced by {@code stateFactory}.
     *
     * <p>Preconditions, checked before anything changes: no other transition is in progress, the
     * target state class differs from the current one, and the call runs on the main thread. If
     * the job status differs before and after the transition, all job status listeners are
     * notified with the new status and a single timestamp.
     *
     * @param stateFactory factory creating the target state
     * @param <T> type of the target state
     * @return the newly created state, which is now the scheduler's current state
     */
    @VisibleForTesting
    <T extends State> T transitionToState(StateFactory<T> stateFactory) {
        Preconditions.checkState(!this.isTransitioningState, "State transitions must not be triggered while another state transition is in progress.");
        Preconditions.checkState(this.state.getClass() != stateFactory.getStateClass(), "Attempted to transition into the very state the scheduler is already in.");
        this.componentMainThreadExecutor.assertRunningInMainThread();
        try {
            // Guards against re-entrant transitions triggered from onLeave() or state construction.
            this.isTransitioningState = true;
            LOG.debug("Transition from state {} to {}.", this.state.getClass().getSimpleName(), stateFactory.getStateClass().getSimpleName());
            // Status of the state being left; compared below to decide whether listeners fire.
            JobStatus jobStatus = this.state.getJobStatus();
            // The old state is left before the new one is created.
            this.state.onLeave(stateFactory.getStateClass());
            // Note: this local shadows the 'state' field.
            T state = stateFactory.getState();
            this.state = state;
            JobStatus jobStatus2 = this.state.getJobStatus();
            if (jobStatus != jobStatus2) {
                // One timestamp shared by all listener notifications.
                long currentTimeMillis = System.currentTimeMillis();
                this.jobStatusListeners.forEach(jobStatusListener -> {
                    jobStatusListener.jobStatusChanges(this.jobInformation.getJobID(), jobStatus2, currentTimeMillis);
                });
            }
            return state;
        } finally {
            // Always clear the flag, even if leaving, constructing or notifying throws.
            this.isTransitioningState = false;
        }
    }

    /**
     * Returns the scheduler's current state.
     *
     * @return the current state
     */
    @VisibleForTesting
    State getState() {
        return state;
    }

    /**
     * Periodic slot-idleness check, re-scheduled on the main thread every {@code slotIdleTimeout}.
     *
     * <ul>
     *   <li>Globally terminal job status: releases every slot held by the pool and stops
     *       re-scheduling.
     *   <li>Any other terminal status: stops re-scheduling without releasing slots.
     *   <li>Otherwise: releases idle slots and schedules the next check.
     * </ul>
     */
    private void checkIdleSlotTimeout() {
        final JobStatus jobStatus = getState().getJobStatus();

        if (jobStatus.isGloballyTerminalState()) {
            // releaseSlot() mutates the pool. The collection returned by getAllSlotsInformation()
            // may be a live view of it, so iterate over a snapshot to avoid a
            // ConcurrentModificationException.
            final List<SlotInfo> slotsToRelease =
                    new ArrayList<>(declarativeSlotPool.getAllSlotsInformation());
            for (SlotInfo slot : slotsToRelease) {
                declarativeSlotPool.releaseSlot(
                        slot.getAllocationId(),
                        new FlinkException(
                                "Returning slots to their owners, because the job has reached a globally terminal state."));
            }
            return;
        }

        if (jobStatus.isTerminalState()) {
            // Terminal but not globally terminal: stop checking, but keep the slots.
            return;
        }

        declarativeSlotPool.releaseIdleSlots(System.currentTimeMillis());
        getMainThreadExecutor()
                .schedule(
                        this::checkIdleSlotTimeout,
                        slotIdleTimeout.toMillis(),
                        TimeUnit.MILLISECONDS);
    }
}
