package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.class */
public class TaskExecutorStateChangelogStoragesManager {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TaskExecutorStateChangelogStoragesManager.class);
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final Map<JobID, Optional<StateChangelogStorage<?>>> changelogStoragesByJobId = new HashMap();

    @GuardedBy("lock")
    private final Map<JobID, StateChangelogStorageView<ChangelogStateHandleStreamImpl>> changelogStorageViewsByJobId = new HashMap();

    @GuardedBy("lock")
    private boolean closed = false;
    private final Thread shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);

    @Nullable
    public StateChangelogStorage<?> stateChangelogStorageForJob(@Nonnull JobID jobID, Configuration configuration, TaskManagerJobMetricGroup taskManagerJobMetricGroup, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        StateChangelogStorage<?> orElse;
        synchronized (this.lock) {
            if (this.closed) {
                throw new IllegalStateException("TaskExecutorStateChangelogStoragesManager is already closed and cannot register a new StateChangelogStorage.");
            }
            Optional<StateChangelogStorage<?>> optional = this.changelogStoragesByJobId.get(jobID);
            if (optional == null) {
                StateChangelogStorage<?> load = StateChangelogStorageLoader.load(jobID, configuration, taskManagerJobMetricGroup, localRecoveryConfig);
                optional = Optional.ofNullable(load);
                this.changelogStoragesByJobId.put(jobID, optional);
                if (load != null) {
                    LOG.debug("Registered new state changelog storage for job {} : {}.", jobID, load);
                } else {
                    LOG.info("Try to registered new state changelog storage for job {}, but result is null.", jobID);
                }
            } else if (optional.isPresent()) {
                LOG.debug("Found existing state changelog storage for job {}: {}.", jobID, optional.get());
            } else {
                LOG.debug("Found a previously loaded NULL state changelog storage for job {}.", jobID);
            }
            orElse = optional.orElse(null);
        }
        return orElse;
    }

    private void releaseStateChangelogStorageForJob(@Nonnull JobID jobID) {
        LOG.debug("Releasing state changelog storage under job id {}.", jobID);
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            Optional<StateChangelogStorage<?>> remove = this.changelogStoragesByJobId.remove(jobID);
            if (remove != null) {
                remove.ifPresent((v1) -> {
                    doRelease(v1);
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v19, types: [org.slf4j.Logger] */
    @Nullable
    public StateChangelogStorageView<?> stateChangelogStorageViewForJob(@Nonnull JobID jobID, Configuration configuration, ChangelogStateHandle changelogStateHandle) throws IOException {
        StateChangelogStorageView stateChangelogStorageView;
        if (this.closed) {
            throw new IllegalStateException("TaskExecutorStateChangelogStoragesManager is already closed and cannot register a new StateChangelogStorageView.");
        }
        synchronized (this.lock) {
            StateChangelogStorageView stateChangelogStorageView2 = this.changelogStorageViewsByJobId.get(jobID);
            if (stateChangelogStorageView2 == null) {
                StateChangelogStorageView loadFromStateHandle = StateChangelogStorageLoader.loadFromStateHandle(configuration, changelogStateHandle);
                stateChangelogStorageView2 = loadFromStateHandle;
                this.changelogStorageViewsByJobId.put(jobID, stateChangelogStorageView2);
                LOG.debug("Registered new state changelog storage view for job {} : {}.", jobID, loadFromStateHandle);
            } else {
                LOG.debug("Found existing state changelog storage view for job {}: {}.", jobID, stateChangelogStorageView2);
            }
            stateChangelogStorageView = stateChangelogStorageView2;
        }
        return stateChangelogStorageView;
    }

    private void releaseStateChangelogStorageViewForJob(@Nonnull JobID jobID) {
        LOG.debug("Releasing state changelog storage view under job id {}.", jobID);
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            StateChangelogStorageView<ChangelogStateHandleStreamImpl> remove = this.changelogStorageViewsByJobId.remove(jobID);
            if (remove != null) {
                doRelease(remove);
            }
        }
    }

    public void releaseResourcesForJob(@Nonnull JobID jobID) {
        releaseStateChangelogStorageForJob(jobID);
        releaseStateChangelogStorageViewForJob(jobID);
    }

    public void shutdown() {
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            HashMap hashMap = new HashMap(this.changelogStoragesByJobId);
            HashMap hashMap2 = new HashMap(this.changelogStorageViewsByJobId);
            this.changelogStoragesByJobId.clear();
            this.changelogStorageViewsByJobId.clear();
            ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), LOG);
            LOG.info("Shutting down TaskExecutorStateChangelogStoragesManager.");
            Iterator it = hashMap.entrySet().iterator();
            while (it.hasNext()) {
                ((Optional) ((Map.Entry) it.next()).getValue()).ifPresent((v1) -> {
                    doRelease(v1);
                });
            }
            Iterator it2 = hashMap2.entrySet().iterator();
            while (it2.hasNext()) {
                doRelease((StateChangelogStorageView) ((Map.Entry) it2.next()).getValue());
            }
        }
    }

    private void doRelease(StateChangelogStorageView<?> stateChangelogStorageView) {
        if (stateChangelogStorageView != null) {
            try {
                stateChangelogStorageView.close();
            } catch (Exception e) {
                LOG.warn("Exception while disposing state changelog storage {}.", stateChangelogStorageView, e);
            }
        }
    }

    @VisibleForTesting
    @Nullable
    public Optional<StateChangelogStorage<?>> getChangelogStoragesByJobId(JobID jobID) {
        Optional<StateChangelogStorage<?>> optional;
        synchronized (this.lock) {
            optional = this.changelogStoragesByJobId.get(jobID);
        }
        return optional;
    }
}
