package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SharedSlot.class */
/**
 * Shares a single {@link PhysicalSlot} among the {@link LogicalSlot}s it hands out.
 *
 * <p>On construction the instance assigns itself as the payload of the physical slot. Every
 * logical slot created via {@link #allocateLogicalSlot()} is tracked by its {@link SlotRequestId}
 * until it is handed back through {@link #returnLogicalSlot(LogicalSlot)} or the whole shared slot
 * is released via {@link #release(Throwable)}. When the last tracked logical slot is returned
 * while the shared slot is still {@link State#ALLOCATED}, the external release callback is run.
 */
class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
    private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);

    /** Identifier of the underlying physical slot request; only used for logging here. */
    private final SlotRequestId physicalSlotRequestId;

    /** The physical slot backing every logical slot handed out by this instance. */
    private final PhysicalSlot physicalSlot;

    /** Invoked when the last logical slot is returned while the state is still ALLOCATED. */
    private final Runnable externalReleaseCallback;

    /** Logical slots handed out and not yet returned, keyed by their own request id. */
    private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots = new HashMap<>();

    /** Forwarded to each created logical slot and reported by the payload interface. */
    private final boolean slotWillBeOccupiedIndefinitely;

    /** Current lifecycle state; see {@link State}. */
    private State state;

    /**
     * Lifecycle of a {@link SharedSlot}.
     *
     * <p>NOTE(review): the decompiler reported that this enum was originally {@code private};
     * the widened modifier is kept here so that accessibility does not change.
     */
    public enum State {
        /** Logical slots may be allocated and returned. */
        ALLOCATED,
        /** {@link SharedSlot#release(Throwable)} is in progress. */
        RELEASING,
        /** {@link SharedSlot#release(Throwable)} has completed. */
        RELEASED
    }

    /**
     * Creates a shared slot and assigns it as the payload of the given physical slot.
     *
     * @param physicalSlotRequestId request id of the physical slot, used in log messages
     * @param physicalSlot the physical slot to share; must accept this instance as its payload
     * @param slotWillBeOccupiedIndefinitely value passed on to every created logical slot and
     *     returned by {@link #willOccupySlotIndefinitely()}
     * @param externalReleaseCallback run when the last logical slot has been returned while the
     *     shared slot is still allocated
     * @throws IllegalStateException if the physical slot rejects the payload assignment
     */
    public SharedSlot(
            SlotRequestId physicalSlotRequestId,
            PhysicalSlot physicalSlot,
            boolean slotWillBeOccupiedIndefinitely,
            Runnable externalReleaseCallback) {
        this.physicalSlotRequestId = physicalSlotRequestId;
        this.physicalSlot = physicalSlot;
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.externalReleaseCallback = externalReleaseCallback;
        // Set the state before `this` is handed to the physical slot, so that the instance is
        // fully initialized by the time anything outside this constructor can reach it.
        this.state = State.ALLOCATED;
        Preconditions.checkState(
                physicalSlot.tryAssignPayload(this),
                "The provided slot (%s) was not free.",
                physicalSlot.getAllocationId());
    }

    /**
     * Creates a new logical slot on top of the shared physical slot and starts tracking it.
     *
     * @return the newly created logical slot
     * @throws IllegalStateException if the shared slot is not in state {@link State#ALLOCATED}
     */
    public LogicalSlot allocateLogicalSlot() {
        LOG.debug("Allocating logical slot from shared slot ({})", this.physicalSlotRequestId);
        Preconditions.checkState(
                this.state == State.ALLOCATED, "The shared slot has already been released.");
        // Each logical slot gets a fresh request id, which is also its key in the tracking map.
        final LogicalSlot logicalSlot =
                new SingleLogicalSlot(
                        new SlotRequestId(),
                        this.physicalSlot,
                        Locality.UNKNOWN,
                        this,
                        this.slotWillBeOccupiedIndefinitely);
        this.allocatedLogicalSlots.put(logicalSlot.getSlotRequestId(), logicalSlot);
        return logicalSlot;
    }

    /**
     * Stops tracking the given logical slot and, if it was the last one while the shared slot is
     * still allocated, runs the external release callback.
     *
     * @param logicalSlot the logical slot being handed back; must no longer be alive and must
     *     currently be tracked by this shared slot
     * @throws IllegalStateException if the shared slot is already {@link State#RELEASED}, if the
     *     logical slot is still alive, or if it is not tracked by this shared slot
     */
    @Override // org.apache.flink.runtime.jobmaster.SlotOwner
    public void returnLogicalSlot(LogicalSlot logicalSlot) {
        LOG.debug("Returning logical slot to shared slot ({})", this.physicalSlotRequestId);
        // RELEASING is deliberately allowed here: slots may be handed back while release() runs.
        Preconditions.checkState(
                this.state != State.RELEASED, "The shared slot has already been released.");
        Preconditions.checkState(!logicalSlot.isAlive(), "Returned logic slot must not be alive.");
        Preconditions.checkState(
                this.allocatedLogicalSlots.remove(logicalSlot.getSlotRequestId()) != null,
                "Trying to remove a logical slot request which has been either already removed or never created.");
        tryReleaseExternally();
    }

    /**
     * Runs the external release callback if the shared slot is still allocated and no logical
     * slots remain tracked. Does nothing in the RELEASING and RELEASED states.
     */
    private void tryReleaseExternally() {
        if (this.state == State.ALLOCATED && this.allocatedLogicalSlots.isEmpty()) {
            LOG.debug("Release shared slot externally ({})", this.physicalSlotRequestId);
            this.externalReleaseCallback.run();
        }
    }

    /**
     * Releases every tracked logical slot with the given cause and marks the shared slot as
     * released. The external release callback is not run from this path.
     *
     * @param cause the reason passed on to each logical slot's {@code releaseSlot}
     * @throws IllegalStateException if the shared slot is not in state {@link State#ALLOCATED}
     */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot.Payload
    public void release(Throwable cause) {
        LOG.debug("Release shared slot ({})", this.physicalSlotRequestId);
        Preconditions.checkState(
                this.state == State.ALLOCATED, "The shared slot has already been released.");
        // While RELEASING, tryReleaseExternally() is a no-op, so logical slots that are handed
        // back during the loop below cannot trigger the external release callback.
        this.state = State.RELEASING;
        // Iterate over a snapshot: if releasing a logical slot leads to returnLogicalSlot(...)
        // being called, the tracking map is modified while this loop is running.
        for (LogicalSlot logicalSlot : new ArrayList<>(this.allocatedLogicalSlots.values())) {
            logicalSlot.releaseSlot(cause);
        }
        this.allocatedLogicalSlots.clear();
        this.state = State.RELEASED;
    }

    /**
     * @return the flag supplied at construction time
     */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot.Payload
    public boolean willOccupySlotIndefinitely() {
        return this.slotWillBeOccupiedIndefinitely;
    }
}
