package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/state/SharedStateRegistryImpl.class */
public class SharedStateRegistryImpl implements SharedStateRegistry {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SharedStateRegistryImpl.class);
    /**
     * Registered shared state, keyed by registry key. This map instance is also the monitor that
     * the {@code synchronized} blocks in this class lock on.
     */
    private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates;
    /**
     * For each checkpoint passed to {@code registerAllAfterRestored}, keyed by checkpoint ID: the
     * sharing-files strategy taken from its restored properties, or empty if it has none.
     */
    private final Map<Long, Optional<SnapshotType.SharingFilesStrategy>> restoredCheckpointSharingStrategies;
    /** Set to {@code true} on construction and to {@code false} by {@code close()}; checked on registration. */
    private boolean open;
    /** Executor that runs the {@code AsyncDisposalRunnable}s created by {@code scheduleAsyncDelete}. */
    private final Executor asyncDisposalExecutor;
    /**
     * Largest ID among checkpoints restored with a mode other than {@code CLAIM}; starts at -1.
     * {@code unregisterUnusedState} only schedules deletion of state created by a checkpoint with
     * a higher ID than this.
     */
    private long highestNotClaimedCheckpointID;

    /**
     * Runnable that discards exactly one state object. A failure during discard is logged at WARN
     * level and not rethrown.
     */
    public static final class AsyncDisposalRunnable implements Runnable {
        private final StateObject toDispose;

        /**
         * @param stateObject the state object to discard when this runnable executes; must not be
         *     {@code null}
         */
        public AsyncDisposalRunnable(StateObject stateObject) {
            Preconditions.checkNotNull(stateObject);
            this.toDispose = stateObject;
        }

        /** Discards the wrapped state object, logging (not propagating) any exception. */
        @Override
        public void run() {
            final StateObject target = this.toDispose;
            try {
                target.discardState();
            } catch (Exception e) {
                LOG.warn("A problem occurred during asynchronous disposal of a shared state object: {}", target, e);
            }
        }
    }

    /**
     * A {@link StreamStateHandle} that carries only a {@link StateHandleID}. Discarding it does
     * nothing, and every accessor that would expose size, content or a physical ID throws {@link
     * UnsupportedOperationException}. Equality, hash code and string form are derived from the
     * wrapped ID alone.
     */
    public static class EmptyDiscardStateObjectForRegister implements StreamStateHandle {
        private static final long serialVersionUID = 1;

        private final StateHandleID stateHandleID;

        /** @param stateHandleID the ID that identifies this handle */
        public EmptyDiscardStateObjectForRegister(StateHandleID stateHandleID) {
            this.stateHandleID = stateHandleID;
        }

        /** Intentionally a no-op. */
        @Override
        public void discardState() throws Exception {
            // nothing to discard
        }

        /** Not supported by this handle. */
        @Override
        public long getStateSize() {
            throw new UnsupportedOperationException("Should not call here.");
        }

        /** Not supported by this handle. */
        @Override
        public FSDataInputStream openInputStream() throws IOException {
            throw new UnsupportedOperationException("Should not call here.");
        }

        /** Not supported by this handle. */
        @Override
        public Optional<byte[]> asBytesIfInMemory() {
            throw new UnsupportedOperationException("Should not call here.");
        }

        /** Not supported by this handle. */
        @Override
        public PhysicalStateHandleID getStreamStateHandleID() {
            throw new UnsupportedOperationException("Should not call here.");
        }

        /** Two instances are equal when they are of the same class and wrap equal IDs. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj != null && obj.getClass() == getClass()) {
                EmptyDiscardStateObjectForRegister other = (EmptyDiscardStateObjectForRegister) obj;
                return Objects.equals(this.stateHandleID, other.stateHandleID);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.stateHandleID);
        }

        @Override
        public String toString() {
            return "EmptyDiscardStateObject{" + this.stateHandleID + '}';
        }
    }

    /**
     * Bookkeeping record for one registered piece of shared state: the handle itself, the
     * checkpoint that first registered it, the latest checkpoint known to use it, and two flags
     * maintained by the enclosing registry.
     */
    public static final class SharedStateEntry {
        /** Current handle for this entry; the registry may replace it on re-registration. */
        StreamStateHandle stateHandle;
        /** ID of the checkpoint passed in when the entry was created. */
        private final long createdByCheckpointID;
        /** Highest checkpoint ID seen for this entry so far; never decreases. */
        private long lastUsedCheckpointID;
        /** Set by the registry's {@code checkpointCompleted}. */
        private boolean confirmed;
        /** Set via {@link #preventDiscardingCreatedCheckpoint()}; initially {@code false}. */
        private boolean preventDiscardingCreatedCheckpoint;

        /**
         * @param streamStateHandle the handle to track
         * @param j ID of the registering checkpoint; used as both creator and last user
         */
        SharedStateEntry(StreamStateHandle streamStateHandle, long j) {
            this.stateHandle = streamStateHandle;
            this.createdByCheckpointID = j;
            this.lastUsedCheckpointID = j;
        }

        @Override
        public String toString() {
            return "SharedStateEntry{stateHandle="
                    + this.stateHandle
                    + ", createdByCheckpointID="
                    + this.createdByCheckpointID
                    + ", lastUsedCheckpointID="
                    + this.lastUsedCheckpointID
                    + '}';
        }

        /** Raises the last-used checkpoint ID to {@code j} if {@code j} is larger. */
        public void advanceLastUsingCheckpointID(long j) {
            if (j > this.lastUsedCheckpointID) {
                this.lastUsedCheckpointID = j;
            }
        }

        /** Marks this entry so that it prevents discarding of the checkpoint that created it. */
        public void preventDiscardingCreatedCheckpoint() {
            this.preventDiscardingCreatedCheckpoint = true;
        }
    }

    /** Creates a registry whose disposal executor is {@code Executors.directExecutor()}. */
    public SharedStateRegistryImpl() {
        this(Executors.directExecutor());
    }

    /**
     * Creates an open registry with no registered state and no restored checkpoints.
     *
     * @param executor executor used to run state disposal; must not be {@code null}
     */
    public SharedStateRegistryImpl(Executor executor) {
        Preconditions.checkNotNull(executor);
        this.asyncDisposalExecutor = executor;
        this.registeredStates = new HashMap<>();
        this.restoredCheckpointSharingStrategies = new HashMap<>();
        // -1 means no checkpoint restored in a non-CLAIM mode has been seen yet.
        this.highestNotClaimedCheckpointID = -1L;
        this.open = true;
    }

    /**
     * Registers a reference to shared state under the given key for checkpoint {@code j}.
     *
     * <p>If the key is unknown, a new entry is created; the handle must not be a placeholder in
     * that case. If the key is known and the given handle differs from the stored one, exactly one
     * of the two is scheduled for deletion: the given handle when the entry is already confirmed
     * or the given handle is a placeholder, otherwise the previously stored handle (which is then
     * replaced by the given one). For a known key the entry's last-used checkpoint is advanced
     * and, if {@code z} is set, the entry is marked as preventing discard of its creating
     * checkpoint.
     *
     * @param sharedStateRegistryKey key under which the state is registered
     * @param streamStateHandle the handle being registered; must not be {@code null}
     * @param j ID of the checkpoint that uses the state
     * @param z whether to mark an already-known entry as preventing discard of its creator
     * @return the handle stored in the entry after registration
     * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when the
     *     registry is closed or a placeholder is registered under an unknown key
     */
    @Override // org.apache.flink.runtime.state.SharedStateRegistry
    public StreamStateHandle registerReference(SharedStateRegistryKey sharedStateRegistryKey, StreamStateHandle streamStateHandle, long j, boolean z) {
        Preconditions.checkNotNull(streamStateHandle);
        final long checkpointId = j;
        final boolean preventDiscard = z;

        StreamStateHandle handleToDelete = null;
        SharedStateEntry entry;
        synchronized (this.registeredStates) {
            Preconditions.checkState(this.open, "Attempt to register state to closed SharedStateRegistry.");
            entry = this.registeredStates.get(sharedStateRegistryKey);
            if (entry != null) {
                final boolean sameHandle = Objects.equals(streamStateHandle, entry.stateHandle);
                if (!sameHandle) {
                    if (!entry.confirmed && !isPlaceholder(streamStateHandle)) {
                        // Unconfirmed entry and a real new handle: keep the new one, drop the old.
                        handleToDelete = entry.stateHandle;
                        entry.stateHandle = streamStateHandle;
                    } else {
                        // Entry already confirmed, or the new handle is a placeholder: keep the old one.
                        handleToDelete = streamStateHandle;
                    }
                    LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to be an unnecessary copy of existing state {} and will be dropped.", sharedStateRegistryKey, streamStateHandle, entry.stateHandle);
                }
                LOG.trace("Updating last checkpoint for {} from {} to {}", sharedStateRegistryKey, entry.lastUsedCheckpointID, checkpointId);
                entry.advanceLastUsingCheckpointID(checkpointId);
                if (preventDiscard) {
                    entry.preventDiscardingCreatedCheckpoint();
                }
            } else {
                Preconditions.checkState(!isPlaceholder(streamStateHandle), "Attempt to reference unknown state: " + sharedStateRegistryKey);
                entry = new SharedStateEntry(streamStateHandle, checkpointId);
                this.registeredStates.put(sharedStateRegistryKey, entry);
                LOG.trace("Registered new shared state {} under key {}.", entry, sharedStateRegistryKey);
            }
        }
        // Deletion is scheduled only after the monitor has been released.
        scheduleAsyncDelete(handleToDelete);
        return entry.stateHandle;
    }

    /**
     * Removes every entry whose last-used checkpoint ID is lower than {@code j} and schedules
     * deletion of the removed state.
     *
     * <p>A removed entry's handle is only scheduled for deletion if the entry was created by a
     * checkpoint with an ID higher than {@code highestNotClaimedCheckpointID}; otherwise it is
     * merely dropped from the registry. Entries that stay registered contribute the ID of their
     * creating checkpoint to the result when {@code preventsDiscardingCreatedCheckpoint} holds for
     * them.
     *
     * @param j lowest checkpoint ID that is still retained
     * @return IDs of checkpoints that created still-registered entries which prevent discarding
     *     of their creating checkpoint
     */
    @Override // org.apache.flink.runtime.state.SharedStateRegistry
    public Set<Long> unregisterUnusedState(long j) {
        LOG.debug("Discard state created before checkpoint {} and not used afterwards", j);
        final Set<Long> checkpointsInUse = new HashSet<>();
        final ArrayList<StreamStateHandle> subsumed = new ArrayList<>();
        // Collect under the monitor, dispose afterwards so deletion never runs while holding it.
        synchronized (this.registeredStates) {
            Iterator<SharedStateEntry> it = this.registeredStates.values().iterator();
            while (it.hasNext()) {
                SharedStateEntry entry = it.next();
                if (entry.lastUsedCheckpointID < j) {
                    if (entry.createdByCheckpointID > this.highestNotClaimedCheckpointID) {
                        subsumed.add(entry.stateHandle);
                    }
                    it.remove();
                } else if (preventsDiscardingCreatedCheckpoint(entry)) {
                    checkpointsInUse.add(entry.createdByCheckpointID);
                }
            }
        }
        LOG.trace("Discard {} state asynchronously", subsumed.size());
        for (StreamStateHandle handle : subsumed) {
            scheduleAsyncDelete(handle);
        }
        return checkpointsInUse;
    }

    /**
     * Asks each given composite state handle to register its shared state with this registry for
     * checkpoint {@code j}. The whole pass runs while holding the registry monitor.
     *
     * @param iterable the handles to register; {@code null} is accepted and does nothing
     * @param j ID of the checkpoint the handles belong to
     */
    @Override // org.apache.flink.runtime.state.SharedStateRegistry
    public void registerAll(Iterable<? extends CompositeStateHandle> iterable, long j) {
        if (iterable != null) {
            synchronized (this.registeredStates) {
                for (CompositeStateHandle handle : iterable) {
                    handle.registerSharedStates(this, j);
                }
            }
        }
    }

    /**
     * Registers all operator states of a restored checkpoint and records how that checkpoint
     * shares files.
     *
     * <p>The sharing-files strategy is taken from the checkpoint's restored properties, if any,
     * and stored under the checkpoint ID. For every restore mode other than {@link
     * RestoreMode#CLAIM}, {@code highestNotClaimedCheckpointID} is raised to at least this
     * checkpoint's ID, which keeps {@code unregisterUnusedState} from scheduling deletion of state
     * created by it or by earlier checkpoints.
     *
     * <p>The bookkeeping writes are done under the registry monitor, because {@code
     * unregisterUnusedState} reads both fields while holding that monitor.
     *
     * @param completedCheckpoint the restored checkpoint
     * @param restoreMode the mode the checkpoint was restored with
     */
    @Override // org.apache.flink.runtime.state.SharedStateRegistry
    public void registerAllAfterRestored(CompletedCheckpoint completedCheckpoint, RestoreMode restoreMode) {
        final long checkpointId = completedCheckpoint.getCheckpointID();
        registerAll(completedCheckpoint.getOperatorStates().values(), checkpointId);
        final Optional<SnapshotType.SharingFilesStrategy> sharingStrategy =
                completedCheckpoint
                        .getRestoredProperties()
                        .map(checkpointProperties -> checkpointProperties.getCheckpointType().getSharingFilesStrategy());
        synchronized (this.registeredStates) {
            this.restoredCheckpointSharingStrategies.put(checkpointId, sharingStrategy);
            if (restoreMode != RestoreMode.CLAIM) {
                this.highestNotClaimedCheckpointID = Math.max(this.highestNotClaimedCheckpointID, checkpointId);
            }
        }
    }

    /**
     * Marks as confirmed every registered entry whose last-used checkpoint ID equals {@code j}.
     *
     * <p>The iteration runs under the registry monitor, like every other access to {@code
     * registeredStates}: the map is a plain {@code HashMap} that {@code registerReference} and
     * {@code unregisterUnusedState} modify, and {@code registerReference} reads the {@code
     * confirmed} flag while holding that monitor.
     *
     * @param j ID of the completed checkpoint
     */
    @Override // org.apache.flink.runtime.state.SharedStateRegistry
    public void checkpointCompleted(long j) {
        synchronized (this.registeredStates) {
            for (SharedStateEntry sharedStateEntry : this.registeredStates.values()) {
                if (sharedStateEntry.lastUsedCheckpointID == j) {
                    sharedStateEntry.confirmed = true;
                }
            }
        }
    }

    /** Renders the registered states; the map is read while holding the registry monitor. */
    @Override
    public String toString() {
        synchronized (this.registeredStates) {
            return "SharedStateRegistry{registeredStates=" + this.registeredStates + '}';
        }
    }

    /**
     * Hands the given handle to the disposal executor for discarding. {@code null} and placeholder
     * handles are ignored. If the executor rejects the task, disposal runs on the calling thread
     * instead.
     *
     * @param streamStateHandle the handle to discard; may be {@code null}
     */
    private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
        if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
            LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
            final AsyncDisposalRunnable disposal = new AsyncDisposalRunnable(streamStateHandle);
            try {
                this.asyncDisposalExecutor.execute(disposal);
            } catch (RejectedExecutionException rejected) {
                // The executor no longer accepts work: dispose synchronously rather than skip it.
                disposal.run();
            }
        }
    }

    /**
     * Tells whether the given handle is a {@code PlaceholderStreamStateHandle}.
     *
     * @param streamStateHandle the handle to test; {@code null} yields {@code false}
     * @return {@code true} if the handle is an instance of {@code PlaceholderStreamStateHandle}
     */
    private boolean isPlaceholder(StreamStateHandle streamStateHandle) {
        return streamStateHandle instanceof PlaceholderStreamStateHandle;
    }

    /**
     * Marks the registry as closed. After this call {@code registerReference} fails its
     * open-state check. Already registered entries are left untouched by this method.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // The flag is written under the same monitor that registerReference holds when reading it.
        synchronized (this.registeredStates) {
            this.open = false;
        }
    }

    /**
     * Decides whether the given entry keeps the checkpoint that created it from being discarded.
     *
     * <p>That is the case when either (a) the entry has been explicitly marked as preventing
     * discard and its creating checkpoint is one of the restored checkpoints, or (b) the creating
     * checkpoint is a restored checkpoint whose recorded sharing-files strategy is {@code
     * NO_SHARING}.
     *
     * @param sharedStateEntry the entry to inspect
     * @return {@code true} if the creating checkpoint must be kept
     */
    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry sharedStateEntry) {
        final Long creatorId = sharedStateEntry.createdByCheckpointID;
        if (sharedStateEntry.preventDiscardingCreatedCheckpoint
                && this.restoredCheckpointSharingStrategies.containsKey(creatorId)) {
            return true;
        }
        return this.restoredCheckpointSharingStrategies
                .getOrDefault(creatorId, Optional.empty())
                .filter(strategy -> strategy == SnapshotType.SharingFilesStrategy.NO_SHARING)
                .isPresent();
    }
}
