package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.class */
/**
 * {@link TaskManagerTracker} implementation that keeps, in plain in-memory maps, the registered
 * task managers, the pending (requested but not yet registered) task managers, the task managers
 * marked as unwanted, and every slot that is currently allocated or pending.
 *
 * <p>The aggregated registered and pending resources are maintained incrementally as task
 * managers are added and removed, so the corresponding getters are simple field reads.
 *
 * <p>The class performs no synchronization of its own.
 */
public class FineGrainedTaskManagerTracker implements TaskManagerTracker {
    private static final Logger LOG = LoggerFactory.getLogger(FineGrainedTaskManagerTracker.class);

    /** Sum of the total resources of all registered task managers. */
    private ResourceProfile totalRegisteredResource = ResourceProfile.ZERO;

    /** Sum of the total resources of all pending task managers. */
    private ResourceProfile totalPendingResource = ResourceProfile.ZERO;

    /** All allocated or pending slots, keyed by allocation id. */
    private final Map<AllocationID, FineGrainedTaskManagerSlot> slots = new HashMap<>();

    /** All registered task managers, keyed by instance id. */
    private final Map<InstanceID, FineGrainedTaskManagerRegistration> taskManagerRegistrations =
            new HashMap<>();

    /** Registered task managers that were marked as unwanted, with their worker spec. */
    private final Map<InstanceID, WorkerResourceSpec> unWantedTaskManagers = new HashMap<>();

    /** All pending task managers, keyed by their pending id. */
    private final Map<PendingTaskManagerId, PendingTaskManager> pendingTaskManagers =
            new HashMap<>();

    /**
     * Secondary index over {@link #pendingTaskManagers}, keyed by the (total resource profile,
     * default slot resource profile) pair. Must be kept in sync with {@link #pendingTaskManagers}.
     */
    private final Map<Tuple2<ResourceProfile, ResourceProfile>, Set<PendingTaskManager>>
            totalAndDefaultSlotProfilesToPendingTaskManagers = new HashMap<>();

    // ---------------------------------------------------------------------------------------------
    // Pending allocations
    // ---------------------------------------------------------------------------------------------

    /**
     * Drops the pending allocations of every pending task manager and then records the given ones.
     *
     * @param map new pending allocations per pending task manager; every key must refer to a
     *     pending task manager known to this tracker
     * @throws NullPointerException if {@code map} is null or contains an unknown pending task
     *     manager id
     */
    @Override
    public void replaceAllPendingAllocations(
            Map<PendingTaskManagerId, Map<JobID, ResourceCounter>> map) {
        Preconditions.checkNotNull(map);
        LOG.trace("Record the pending allocations {}.", map);
        // Clear first: pending task managers absent from the new map end up with no allocations.
        this.pendingTaskManagers.values().forEach(PendingTaskManager::clearAllPendingAllocations);
        map.forEach(
                (pendingTaskManagerId, allocations) ->
                        Preconditions.checkNotNull(
                                        this.pendingTaskManagers.get(pendingTaskManagerId))
                                .replaceAllPendingAllocations(allocations));
    }

    /**
     * Removes the pending allocations of the given job from every pending task manager.
     *
     * @param jobID the job whose pending allocations are dropped
     */
    @Override
    public void clearPendingAllocationsOfJob(JobID jobID) {
        LOG.info("Clear all pending allocations for job {}.", jobID);
        this.pendingTaskManagers
                .values()
                .forEach(
                        pendingTaskManager ->
                                pendingTaskManager.clearPendingAllocationsOfJob(jobID));
    }

    // ---------------------------------------------------------------------------------------------
    // Registered task managers
    // ---------------------------------------------------------------------------------------------

    /**
     * Registers a task manager and adds its total resource to the registered total.
     *
     * @param taskExecutorConnection connection to the task executor; supplies the instance id
     * @param totalResourceProfile total resource of the task manager
     * @param defaultSlotResourceProfile resource profile of a default slot of the task manager
     * @throws NullPointerException if any argument is null
     */
    @Override
    public void addTaskManager(
            TaskExecutorConnection taskExecutorConnection,
            ResourceProfile totalResourceProfile,
            ResourceProfile defaultSlotResourceProfile) {
        Preconditions.checkNotNull(taskExecutorConnection);
        Preconditions.checkNotNull(totalResourceProfile);
        Preconditions.checkNotNull(defaultSlotResourceProfile);
        LOG.debug(
                "Add task manager {} with total resource {} and default slot resource {}.",
                taskExecutorConnection.getInstanceID(),
                totalResourceProfile,
                defaultSlotResourceProfile);
        this.taskManagerRegistrations.put(
                taskExecutorConnection.getInstanceID(),
                new FineGrainedTaskManagerRegistration(
                        taskExecutorConnection, totalResourceProfile, defaultSlotResourceProfile));
        this.totalRegisteredResource = this.totalRegisteredResource.merge(totalResourceProfile);
    }

    /**
     * Unregisters a task manager, subtracts its total resource from the registered total, removes
     * it from the unwanted set and forgets all slots that were tracked on it.
     *
     * @param instanceID instance id of the task manager to remove
     * @throws NullPointerException if {@code instanceID} is null or not registered
     */
    @Override
    public void removeTaskManager(InstanceID instanceID) {
        Preconditions.checkNotNull(instanceID);
        this.unWantedTaskManagers.remove(instanceID);
        final FineGrainedTaskManagerRegistration removed =
                Preconditions.checkNotNull(this.taskManagerRegistrations.remove(instanceID));
        this.totalRegisteredResource =
                this.totalRegisteredResource.subtract(removed.getTotalResource());
        LOG.debug("Remove task manager {}.", instanceID);
        for (AllocationID allocationId : removed.getAllocatedSlots().keySet()) {
            this.slots.remove(allocationId);
        }
    }

    /**
     * Marks a registered task manager as unwanted, recording the worker resource spec derived
     * from its total and default slot resource. Unknown instance ids are only logged.
     *
     * @param instanceID instance id of the task manager to mark
     */
    @Override
    public void addUnWantedTaskManager(InstanceID instanceID) {
        final FineGrainedTaskManagerRegistration registration =
                this.taskManagerRegistrations.get(instanceID);
        if (registration == null) {
            LOG.debug("Unwanted task manager {} does not exists.", instanceID);
            return;
        }
        final int defaultNumSlots =
                SlotManagerUtils.calculateDefaultNumSlots(
                        registration.getTotalResource(),
                        registration.getDefaultSlotResourceProfile());
        this.unWantedTaskManagers.put(
                instanceID,
                WorkerResourceSpec.fromTotalResourceProfile(
                        registration.getTotalResource(), defaultNumSlots));
    }

    /**
     * Returns the task managers currently marked as unwanted.
     *
     * <p>Note: this is the tracker's live internal map, not a copy.
     *
     * @return unwanted task managers and their worker resource specs
     */
    @Override
    public Map<InstanceID, WorkerResourceSpec> getUnWantedTaskManager() {
        return this.unWantedTaskManagers;
    }

    // ---------------------------------------------------------------------------------------------
    // Pending task managers
    // ---------------------------------------------------------------------------------------------

    /**
     * Starts tracking a pending task manager and adds its total resource to the pending total.
     *
     * @param pendingTaskManager the pending task manager to track
     * @throws NullPointerException if {@code pendingTaskManager} is null
     */
    @Override
    public void addPendingTaskManager(PendingTaskManager pendingTaskManager) {
        Preconditions.checkNotNull(pendingTaskManager);
        LOG.debug("Add pending task manager {}.", pendingTaskManager);
        this.pendingTaskManagers.put(
                pendingTaskManager.getPendingTaskManagerId(), pendingTaskManager);
        this.totalPendingResource =
                this.totalPendingResource.merge(pendingTaskManager.getTotalResourceProfile());
        this.totalAndDefaultSlotProfilesToPendingTaskManagers
                .computeIfAbsent(
                        Tuple2.of(
                                pendingTaskManager.getTotalResourceProfile(),
                                pendingTaskManager.getDefaultSlotResourceProfile()),
                        ignored -> new HashSet<>())
                .add(pendingTaskManager);
    }

    /**
     * Stops tracking a pending task manager and subtracts its total resource from the pending
     * total.
     *
     * @param pendingTaskManagerId id of the pending task manager to remove
     * @return the pending slot allocation records of the removed pending task manager
     * @throws NullPointerException if the id is null or does not refer to a tracked pending task
     *     manager
     */
    @Override
    public Map<JobID, ResourceCounter> removePendingTaskManager(
            PendingTaskManagerId pendingTaskManagerId) {
        Preconditions.checkNotNull(pendingTaskManagerId);
        final PendingTaskManager removed =
                Preconditions.checkNotNull(this.pendingTaskManagers.remove(pendingTaskManagerId));
        this.totalPendingResource =
                this.totalPendingResource.subtract(removed.getTotalResourceProfile());
        LOG.debug("Remove pending task manager {}.", pendingTaskManagerId);
        this.totalAndDefaultSlotProfilesToPendingTaskManagers.compute(
                Tuple2.of(
                        removed.getTotalResourceProfile(), removed.getDefaultSlotResourceProfile()),
                (ignored, sameProfileManagers) -> {
                    Preconditions.checkNotNull(sameProfileManagers).remove(removed);
                    // Returning null drops the key so empty buckets do not accumulate.
                    return sameProfileManagers.isEmpty() ? null : sameProfileManagers;
                });
        return removed.getPendingSlotAllocationRecords();
    }

    // ---------------------------------------------------------------------------------------------
    // Slots
    // ---------------------------------------------------------------------------------------------

    /**
     * Returns the registered task managers that hold at least one tracked slot of the given job.
     *
     * @param jobID the job to look for
     * @return a new list of matching task managers
     */
    @Override
    public Collection<TaskManagerInfo> getTaskManagersWithAllocatedSlotsForJob(JobID jobID) {
        return this.taskManagerRegistrations.values().stream()
                .filter(
                        registration ->
                                registration.getAllocatedSlots().values().stream()
                                        .anyMatch(slot -> jobID.equals(slot.getJobId())))
                .collect(Collectors.toList());
    }

    /**
     * Applies a slot state change: {@code FREE} releases the slot, {@code ALLOCATED} registers it
     * or completes a pending allocation, {@code PENDING} registers it as pending. Any other state
     * is ignored.
     *
     * @param allocationID allocation id of the slot
     * @param jobID job the slot belongs to
     * @param instanceID task manager hosting the slot
     * @param resourceProfile resource profile of the slot
     * @param slotState the new state of the slot
     * @throws NullPointerException if any argument is null
     */
    @Override
    public void notifySlotStatus(
            AllocationID allocationID,
            JobID jobID,
            InstanceID instanceID,
            ResourceProfile resourceProfile,
            SlotState slotState) {
        Preconditions.checkNotNull(allocationID);
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(instanceID);
        Preconditions.checkNotNull(resourceProfile);
        Preconditions.checkNotNull(slotState);
        switch (slotState) {
            case FREE:
                freeSlot(instanceID, allocationID);
                break;
            case ALLOCATED:
                addAllocatedSlot(allocationID, jobID, instanceID, resourceProfile);
                break;
            case PENDING:
                addPendingSlot(allocationID, jobID, instanceID, resourceProfile);
                break;
            default:
                // No other state is handled here.
                break;
        }
    }

    /** Releases a tracked slot; both the task manager and the slot must be known. */
    private void freeSlot(InstanceID instanceID, AllocationID allocationID) {
        final FineGrainedTaskManagerRegistration registration =
                Preconditions.checkNotNull(this.taskManagerRegistrations.get(instanceID));
        Preconditions.checkNotNull(this.slots.remove(allocationID));
        LOG.debug("Free allocated slot with allocationId {}.", allocationID);
        registration.freeSlot(allocationID);
    }

    /**
     * Handles an {@code ALLOCATED} notification: an already tracked slot has its allocation
     * completed, an unknown one is registered directly as allocated.
     */
    private void addAllocatedSlot(
            AllocationID allocationID,
            JobID jobID,
            InstanceID instanceID,
            ResourceProfile resourceProfile) {
        final FineGrainedTaskManagerRegistration registration =
                Preconditions.checkNotNull(this.taskManagerRegistrations.get(instanceID));
        if (this.slots.containsKey(allocationID)) {
            LOG.debug("Complete slot allocation with allocationId {}.", allocationID);
            registration.notifyAllocationComplete(allocationID);
            return;
        }
        LOG.debug("Register new allocated slot with allocationId {}.", allocationID);
        final FineGrainedTaskManagerSlot slot =
                new FineGrainedTaskManagerSlot(
                        allocationID,
                        jobID,
                        resourceProfile,
                        registration.getTaskExecutorConnection(),
                        SlotState.ALLOCATED);
        this.slots.put(allocationID, slot);
        registration.notifyAllocation(allocationID, slot);
    }

    /** Registers a new slot in {@code PENDING} state; the allocation id must not be tracked yet. */
    private void addPendingSlot(
            AllocationID allocationID,
            JobID jobID,
            InstanceID instanceID,
            ResourceProfile resourceProfile) {
        Preconditions.checkState(!this.slots.containsKey(allocationID));
        final FineGrainedTaskManagerRegistration registration =
                Preconditions.checkNotNull(this.taskManagerRegistrations.get(instanceID));
        LOG.debug("Add pending slot with allocationId {}.", allocationID);
        final FineGrainedTaskManagerSlot slot =
                new FineGrainedTaskManagerSlot(
                        allocationID,
                        jobID,
                        resourceProfile,
                        registration.getTaskExecutorConnection(),
                        SlotState.PENDING);
        registration.notifyAllocation(allocationID, slot);
        this.slots.put(allocationID, slot);
    }

    // ---------------------------------------------------------------------------------------------
    // TaskManagerResourceInfoProvider
    // ---------------------------------------------------------------------------------------------

    /** Returns an unmodifiable view of all registered task managers. */
    @Override
    public Collection<? extends TaskManagerInfo> getRegisteredTaskManagers() {
        return Collections.unmodifiableCollection(this.taskManagerRegistrations.values());
    }

    /** Returns the registered task manager with the given instance id, if any. */
    @Override
    public Optional<TaskManagerInfo> getRegisteredTaskManager(InstanceID instanceID) {
        return Optional.ofNullable(this.taskManagerRegistrations.get(instanceID));
    }

    /** Returns the tracked (allocated or pending) slot with the given allocation id, if any. */
    @Override
    public Optional<TaskManagerSlotInformation> getAllocatedOrPendingSlot(
            AllocationID allocationID) {
        return Optional.ofNullable(this.slots.get(allocationID));
    }

    /** Returns an unmodifiable view of all pending task managers. */
    @Override
    public Collection<PendingTaskManager> getPendingTaskManagers() {
        return Collections.unmodifiableCollection(this.pendingTaskManagers.values());
    }

    /**
     * Returns an unmodifiable view of the pending task managers that have exactly the given total
     * and default slot resource profiles; empty if there are none.
     *
     * @param totalResourceProfile total resource profile to match
     * @param defaultSlotResourceProfile default slot resource profile to match
     */
    @Override
    public Collection<PendingTaskManager>
            getPendingTaskManagersByTotalAndDefaultSlotResourceProfile(
                    ResourceProfile totalResourceProfile,
                    ResourceProfile defaultSlotResourceProfile) {
        return Collections.unmodifiableCollection(
                this.totalAndDefaultSlotProfilesToPendingTaskManagers.getOrDefault(
                        Tuple2.of(totalResourceProfile, defaultSlotResourceProfile),
                        Collections.emptySet()));
    }

    // ---------------------------------------------------------------------------------------------
    // ClusterResourceStatisticsProvider
    // ---------------------------------------------------------------------------------------------

    /** Returns the sum of the default slot counts of all registered task managers. */
    @Override
    public int getNumberRegisteredSlots() {
        return this.taskManagerRegistrations.values().stream()
                .mapToInt(FineGrainedTaskManagerRegistration::getDefaultNumSlots)
                .sum();
    }

    /** Returns the default slot count of the given task manager, or 0 if it is not registered. */
    @Override
    public int getNumberRegisteredSlotsOf(InstanceID instanceID) {
        return Optional.ofNullable(this.taskManagerRegistrations.get(instanceID))
                .map(FineGrainedTaskManagerRegistration::getDefaultNumSlots)
                .orElse(0);
    }

    /** Returns the sum of {@link #getNumberFreeSlotsOf(InstanceID)} over all task managers. */
    @Override
    public int getNumberFreeSlots() {
        return this.taskManagerRegistrations.keySet().stream()
                .mapToInt(this::getNumberFreeSlotsOf)
                .sum();
    }

    /**
     * Returns the default slot count of the given task manager minus the number of slots tracked
     * on it, floored at 0; 0 if the task manager is not registered.
     */
    @Override
    public int getNumberFreeSlotsOf(InstanceID instanceID) {
        return Optional.ofNullable(this.taskManagerRegistrations.get(instanceID))
                .map(
                        registration ->
                                Math.max(
                                        registration.getDefaultNumSlots()
                                                - registration.getAllocatedSlots().size(),
                                        0))
                .orElse(0);
    }

    /** Returns the summed total resource of all registered task managers. */
    @Override
    public ResourceProfile getRegisteredResource() {
        return this.totalRegisteredResource;
    }

    /**
     * Returns the total resource of the given task manager, or {@link ResourceProfile#ZERO} if it
     * is not registered.
     */
    @Override
    public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
        return Optional.ofNullable(this.taskManagerRegistrations.get(instanceID))
                .map(FineGrainedTaskManagerRegistration::getTotalResource)
                .orElse(ResourceProfile.ZERO);
    }

    /** Returns the summed available resource of all registered task managers. */
    @Override
    public ResourceProfile getFreeResource() {
        return this.taskManagerRegistrations.values().stream()
                .map(FineGrainedTaskManagerRegistration::getAvailableResource)
                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
    }

    /**
     * Returns the available resource of the given task manager, or {@link ResourceProfile#ZERO}
     * if it is not registered.
     */
    @Override
    public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
        return Optional.ofNullable(this.taskManagerRegistrations.get(instanceID))
                .map(FineGrainedTaskManagerRegistration::getAvailableResource)
                .orElse(ResourceProfile.ZERO);
    }

    /** Returns the summed total resource of all pending task managers. */
    @Override
    public ResourceProfile getPendingResource() {
        return this.totalPendingResource;
    }

    /** Resets the tracker to its initial, empty state. */
    @Override
    public void clear() {
        this.slots.clear();
        this.taskManagerRegistrations.clear();
        this.totalRegisteredResource = ResourceProfile.ZERO;
        this.pendingTaskManagers.clear();
        // The profile index mirrors pendingTaskManagers; without clearing it, lookups by profile
        // would keep returning pending task managers that are no longer tracked.
        this.totalAndDefaultSlotProfilesToPendingTaskManagers.clear();
        this.totalPendingResource = ResourceProfile.ZERO;
        this.unWantedTaskManagers.clear();
    }
}
