package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SavepointSnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.shaded.guava31.com.google.common.io.Closer;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.class */
public class StreamOperatorStateHandler {
    /** Logger for this class; {@code protected} so subclasses can log through it as well. */
    protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);

    /** Keyed state backend taken from the state context; {@code null} when the context provides none. */
    @Nullable
    private final CheckpointableKeyedStateBackend<?> keyedStateBackend;

    /** Registry the backends and raw state inputs are unregistered from before they are closed. */
    private final CloseableRegistry closeableRegistry;

    /** Keyed state store wrapping {@link #keyedStateBackend}; {@code null} exactly when that backend is {@code null}. */
    @Nullable
    private final DefaultKeyedStateStore keyedStateStore;

    /** Operator state backend taken from the state context. */
    private final OperatorStateBackend operatorStateBackend;

    /** State context this handler was created from; also supplies raw state inputs and the restored checkpoint id. */
    private final StreamOperatorStateContext context;

    /**
     * Callbacks through which {@link StreamOperatorStateHandler} lets an operator take part in
     * state initialization and snapshotting.
     */
    public interface CheckpointedStreamOperator {
        /**
         * Invoked by {@code initializeOperatorState} with a context that exposes the operator
         * state backend, the keyed state store and the raw keyed/operator state inputs.
         *
         * @param stateInitializationContext context to initialize the operator's state from
         * @throws Exception if the operator fails to initialize its state
         */
        void initializeState(StateInitializationContext stateInitializationContext) throws Exception;

        /**
         * Invoked by {@code snapshotState} before the managed operator/keyed state backends are
         * snapshotted.
         *
         * @param stateSnapshotContext context of the snapshot currently being taken
         * @throws Exception if the operator fails to snapshot its state
         */
        void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception;
    }

    public StreamOperatorStateHandler(StreamOperatorStateContext streamOperatorStateContext, ExecutionConfig executionConfig, CloseableRegistry closeableRegistry) {
        this.context = streamOperatorStateContext;
        this.operatorStateBackend = streamOperatorStateContext.operatorStateBackend();
        this.keyedStateBackend = streamOperatorStateContext.keyedStateBackend();
        this.closeableRegistry = closeableRegistry;
        if (this.keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(this.keyedStateBackend, executionConfig);
        } else {
            this.keyedStateStore = null;
        }
    }

    /**
     * Lets the given operator initialize its state from a freshly built
     * {@link StateInitializationContextImpl}.
     *
     * <p>The raw keyed and raw operator state inputs are released through
     * {@link #closeFromRegistry} once initialization has finished, whether it succeeded or threw.
     *
     * @param checkpointedStreamOperator operator whose {@code initializeState} is invoked
     * @throws Exception whatever the operator's {@code initializeState} throws
     */
    public void initializeOperatorState(CheckpointedStreamOperator checkpointedStreamOperator) throws Exception {
        final CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = this.context.rawKeyedStateInputs();
        final CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = this.context.rawOperatorStateInputs();
        try {
            final OptionalLong restoredId = this.context.getRestoredCheckpointId();
            // The initialization context takes a nullable boxed id: null means nothing was restored.
            final Long restoredCheckpointId = restoredId.isPresent() ? Long.valueOf(restoredId.getAsLong()) : null;
            checkpointedStreamOperator.initializeState(
                    new StateInitializationContextImpl(
                            restoredCheckpointId,
                            this.operatorStateBackend,
                            this.keyedStateStore,
                            keyedStateInputs,
                            operatorStateInputs));
        } finally {
            // Single cleanup path for success and failure; operator inputs are released first.
            closeFromRegistry(operatorStateInputs, this.closeableRegistry);
            closeFromRegistry(keyedStateInputs, this.closeableRegistry);
        }
    }

    /**
     * Unregisters {@code closeable} from {@code closeableRegistry} and, only if the registry reports
     * that it was registered, closes it via {@code IOUtils.closeQuietly}.
     *
     * @param closeable resource to release
     * @param closeableRegistry registry the resource is expected to be registered with
     */
    private static void closeFromRegistry(Closeable closeable, CloseableRegistry closeableRegistry) {
        final boolean wasRegistered = closeableRegistry.unregisterCloseable(closeable);
        if (!wasRegistered) {
            // Not (or no longer) tracked by the registry: leave the resource alone.
            return;
        }
        IOUtils.closeQuietly(closeable);
    }

    /**
     * Closes and disposes the state backends held by this handler.
     *
     * <p>Each backend is unregistered from the closeable registry and, if it was still registered,
     * scheduled to be closed. Every non-null backend is additionally scheduled for
     * {@code dispose()}. All scheduled actions run when the {@link Closer} itself is closed at the
     * end of the try-with-resources statement.
     *
     * <p>Using try-with-resources guarantees that the closer is closed on both the normal and the
     * exceptional path, and that a failure while closing is attached as a suppressed exception
     * instead of replacing an exception thrown from the body.
     *
     * @throws Exception if registering, closing or disposing a backend fails
     */
    public void dispose() throws Exception {
        try (Closer closer = Closer.create()) {
            if (this.closeableRegistry.unregisterCloseable(this.operatorStateBackend)) {
                closer.register(this.operatorStateBackend);
            }
            if (this.closeableRegistry.unregisterCloseable(this.keyedStateBackend)) {
                closer.register(this.keyedStateBackend);
            }
            if (this.operatorStateBackend != null) {
                closer.register(this.operatorStateBackend::dispose);
            }
            if (this.keyedStateBackend != null) {
                closer.register(this.keyedStateBackend::dispose);
            }
        }
    }

    /**
     * Takes a snapshot of the operator's state and returns the futures describing it.
     *
     * <p>Creates a new {@link OperatorSnapshotFutures} and a synchronous snapshot context (using
     * the keyed backend's key group range, or the empty range when there is no keyed backend) and
     * delegates to the package-private overload.
     *
     * @param checkpointedStreamOperator operator whose {@code snapshotState} hook is invoked
     * @param optional time service manager, if the operator has one
     * @param str operator name, used in the failure message
     * @param j checkpoint id
     * @param j2 second {@code long} forwarded to the backends' {@code snapshot} calls (presumably
     *     the checkpoint timestamp; confirm against callers)
     * @param checkpointOptions options of the checkpoint being taken
     * @param checkpointStreamFactory factory for the streams the snapshot is written to
     * @param z whether the operator writes custom raw keyed state
     * @return the futures of the snapshot in progress
     * @throws CheckpointException if any part of the snapshot fails
     */
    public OperatorSnapshotFutures snapshotState(CheckpointedStreamOperator checkpointedStreamOperator, Optional<InternalTimeServiceManager<?>> optional, String str, long j, long j2, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory, boolean z) throws CheckpointException {
        final KeyGroupRange keyGroupRange;
        if (this.keyedStateBackend == null) {
            keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        } else {
            keyGroupRange = this.keyedStateBackend.getKeyGroupRange();
        }

        final OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
        final StateSnapshotContextSynchronousImpl snapshotContext =
                new StateSnapshotContextSynchronousImpl(j, j2, checkpointStreamFactory, keyGroupRange, this.closeableRegistry);

        snapshotState(
                checkpointedStreamOperator,
                optional,
                str,
                j,
                j2,
                checkpointOptions,
                checkpointStreamFactory,
                snapshotInProgress,
                snapshotContext,
                z);
        return snapshotInProgress;
    }

    /**
     * Performs the snapshot into the supplied futures and snapshot context.
     *
     * <p>Steps, in order:
     *
     * <ol>
     *   <li>If a time service manager is present and the keyed backend is an
     *       {@link AbstractKeyedStateBackend} that requires legacy synchronous timer snapshots for
     *       this checkpoint type, timers are written to raw keyed state. This is rejected when the
     *       operator writes custom raw keyed state ({@code z}).
     *   <li>The operator's own {@code snapshotState} hook runs.
     *   <li>The raw keyed/operator state futures are taken from the snapshot context.
     *   <li>The operator state backend (if any) and keyed state backend (if any) are snapshotted;
     *       canonical savepoints go through {@link #prepareCanonicalSavepoint}.
     * </ol>
     *
     * <p>On any {@link Exception} the futures are cancelled, the snapshot context is closed
     * exceptionally (failures of either are attached as suppressed), and a
     * {@link CheckpointException} with reason {@code CHECKPOINT_DECLINED} is thrown.
     *
     * @param checkpointedStreamOperator operator whose {@code snapshotState} hook is invoked
     * @param optional time service manager, if the operator has one
     * @param str operator name, used in the failure message
     * @param j checkpoint id
     * @param j2 second {@code long} forwarded to the backends' {@code snapshot} calls (presumably
     *     the checkpoint timestamp; confirm against callers)
     * @param checkpointOptions options of the checkpoint being taken
     * @param checkpointStreamFactory factory for the streams the snapshot is written to
     * @param operatorSnapshotFutures holder that receives the resulting futures
     * @param stateSnapshotContextSynchronousImpl snapshot context handed to the operator
     * @param z whether the operator writes custom raw keyed state
     * @throws CheckpointException if any part of the snapshot fails
     */
    @VisibleForTesting
    void snapshotState(CheckpointedStreamOperator checkpointedStreamOperator, Optional<InternalTimeServiceManager<?>> optional, String str, long j, long j2, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory, OperatorSnapshotFutures operatorSnapshotFutures, StateSnapshotContextSynchronousImpl stateSnapshotContextSynchronousImpl, boolean z) throws CheckpointException {
        try {
            if (optional.isPresent()) {
                Preconditions.checkState(this.keyedStateBackend != null, "keyedStateBackend should be available with timeServiceManager");
                final InternalTimeServiceManager<?> timeServiceManager = optional.get();
                final boolean legacyTimerSnapshot =
                        this.keyedStateBackend instanceof AbstractKeyedStateBackend
                                && ((AbstractKeyedStateBackend<?>) this.keyedStateBackend)
                                        .requiresLegacySynchronousTimerSnapshots(checkpointOptions.getCheckpointType());
                if (legacyTimerSnapshot) {
                    Preconditions.checkState(!z, "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
                    timeServiceManager.snapshotToRawKeyedState(
                            stateSnapshotContextSynchronousImpl.getRawKeyedOperatorStateOutput(), str);
                }
            }

            checkpointedStreamOperator.snapshotState(stateSnapshotContextSynchronousImpl);

            operatorSnapshotFutures.setKeyedStateRawFuture(stateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture());
            operatorSnapshotFutures.setOperatorStateRawFuture(stateSnapshotContextSynchronousImpl.getOperatorStateStreamFuture());

            if (this.operatorStateBackend != null) {
                operatorSnapshotFutures.setOperatorStateManagedFuture(
                        this.operatorStateBackend.snapshot(j, j2, checkpointStreamFactory, checkpointOptions));
            }

            if (this.keyedStateBackend != null) {
                if (!isCanonicalSavepoint(checkpointOptions.getCheckpointType())) {
                    operatorSnapshotFutures.setKeyedStateManagedFuture(
                            this.keyedStateBackend.snapshot(j, j2, checkpointStreamFactory, checkpointOptions));
                } else {
                    final SnapshotStrategyRunner<KeyedStateHandle, ? extends FullSnapshotResources<?>> savepointRunner =
                            prepareCanonicalSavepoint(this.keyedStateBackend, this.closeableRegistry);
                    operatorSnapshotFutures.setKeyedStateManagedFuture(
                            savepointRunner.snapshot(j, j2, checkpointStreamFactory, checkpointOptions));
                }
            }
        } catch (Exception snapshotFailure) {
            // Best-effort cleanup: secondary failures must not hide the original cause.
            try {
                operatorSnapshotFutures.cancel();
            } catch (Exception cancelFailure) {
                snapshotFailure.addSuppressed(cancelFailure);
            }
            try {
                stateSnapshotContextSynchronousImpl.closeExceptionally();
            } catch (IOException closeFailure) {
                snapshotFailure.addSuppressed(closeFailure);
            }
            final String failureMessage = "Could not complete snapshot " + j + " for operator " + str + ".";
            throw new CheckpointException(failureMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotFailure);
        }
    }

    /**
     * Tells whether the given snapshot type is a savepoint in {@link SavepointFormatType#CANONICAL}
     * format.
     *
     * @param snapshotType snapshot type to inspect
     * @return {@code true} only for savepoints whose format type is {@code CANONICAL}
     */
    private boolean isCanonicalSavepoint(SnapshotType snapshotType) {
        if (!snapshotType.isSavepoint()) {
            return false;
        }
        // Only cast once isSavepoint() has confirmed the type, as the original expression did.
        final SavepointType savepointType = (SavepointType) snapshotType;
        return SavepointFormatType.CANONICAL == savepointType.getFormatType();
    }

    /**
     * Builds the {@link SnapshotStrategyRunner} used to write a canonical savepoint of the given
     * keyed state backend.
     *
     * <p>The backend's savepoint resources supply both the snapshot resources wrapped by the
     * {@link SavepointSnapshotStrategy} and the preferred execution type of the runner.
     *
     * @param checkpointableKeyedStateBackend backend to take the savepoint from
     * @param closeableRegistry registry handed to the created runner
     * @return runner named {@code "Asynchronous full Savepoint"}
     * @throws Exception if the backend fails to provide its savepoint resources
     */
    @Nonnull
    public static SnapshotStrategyRunner<KeyedStateHandle, ? extends FullSnapshotResources<?>> prepareCanonicalSavepoint(CheckpointableKeyedStateBackend<?> checkpointableKeyedStateBackend, CloseableRegistry closeableRegistry) throws Exception {
        final SavepointResources<?> savepointResources = checkpointableKeyedStateBackend.savepoint();

        // Wildcard-typed instead of raw, so the runner's type arguments are inferred without
        // unchecked conversion.
        final SavepointSnapshotStrategy<?> savepointStrategy =
                new SavepointSnapshotStrategy<>(savepointResources.getSnapshotResources());

        return new SnapshotStrategyRunner<>(
                "Asynchronous full Savepoint",
                savepointStrategy,
                closeableRegistry,
                savepointResources.getPreferredSnapshotExecutionType());
    }

    /**
     * Forwards a checkpoint-complete notification to the keyed state backend, provided it
     * implements {@link CheckpointListener}; otherwise does nothing.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if the backend's listener fails
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        if (!(this.keyedStateBackend instanceof CheckpointListener)) {
            return;
        }
        final CheckpointListener listener = (CheckpointListener) this.keyedStateBackend;
        listener.notifyCheckpointComplete(j);
    }

    /**
     * Forwards a checkpoint-aborted notification to the keyed state backend, provided it
     * implements {@link CheckpointListener}; otherwise does nothing.
     *
     * @param j id of the aborted checkpoint
     * @throws Exception if the backend's listener fails
     */
    public void notifyCheckpointAborted(long j) throws Exception {
        if (!(this.keyedStateBackend instanceof CheckpointListener)) {
            return;
        }
        final CheckpointListener listener = (CheckpointListener) this.keyedStateBackend;
        listener.notifyCheckpointAborted(j);
    }

    /**
     * Returns the keyed state backend viewed with the caller's key type.
     *
     * <p>The backend is stored with a wildcard key type, so the cast is unchecked: the caller is
     * responsible for choosing a {@code K} that matches the backend's actual key type.
     *
     * @param <K> key type the caller expects
     * @return the keyed state backend, or {@code null} if this handler has none
     */
    @SuppressWarnings("unchecked")
    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return (KeyedStateBackend<K>) this.keyedStateBackend;
    }

    /**
     * Returns the operator state backend this handler obtained from its state context.
     *
     * @return the operator state backend
     */
    public OperatorStateBackend getOperatorStateBackend() {
        final OperatorStateBackend backend = this.operatorStateBackend;
        return backend;
    }

    /**
     * Gets or creates the keyed state described by {@code stateDescriptor} from the keyed state
     * backend.
     *
     * @param typeSerializer serializer of the state's namespace
     * @param stateDescriptor descriptor of the requested state
     * @param <N> namespace type
     * @param <S> state type
     * @param <T> value type of the state descriptor
     * @return the state handed out by the keyed state backend
     * @throws IllegalStateException if this handler has no keyed state backend
     * @throws Exception if the backend fails to provide the state
     */
    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        if (this.keyedStateBackend == null) {
            throw new IllegalStateException("Cannot create partitioned state. The keyed state backend has not been set. This indicates that the operator is not partitioned/keyed.");
        }
        return this.keyedStateBackend.getOrCreateKeyedState(typeSerializer, stateDescriptor);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Looks up the partitioned state for the given namespace in the keyed state backend.
     *
     * @param n namespace the state is requested for
     * @param typeSerializer serializer of the namespace
     * @param stateDescriptor descriptor of the requested state
     * @param <S> state type
     * @param <N> namespace type
     * @return the state handed out by the keyed state backend
     * @throws RuntimeException if this handler has no keyed state backend
     * @throws Exception if the backend fails to provide the state
     */
    public <S extends State, N> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        if (this.keyedStateBackend == null) {
            throw new RuntimeException("Cannot create partitioned state. The keyed state backend has not been set. This indicates that the operator is not partitioned/keyed.");
        }
        return this.keyedStateBackend.getPartitionedState(n, typeSerializer, stateDescriptor);
    }

    /**
     * Sets the current key on the keyed state backend; does nothing when there is no keyed state
     * backend.
     *
     * @param obj key to make current; must be of the backend's actual key type, which cannot be
     *     checked at compile time here
     * @throws RuntimeException wrapping any exception raised while setting the key
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setCurrentKey(Object obj) {
        if (this.keyedStateBackend == null) {
            return;
        }
        try {
            // The backend's key type is hidden behind a wildcard, so an Object cannot be passed
            // through the parameterized type; go through the raw type instead.
            final CheckpointableKeyedStateBackend rawBackend = this.keyedStateBackend;
            rawBackend.setCurrentKey(obj);
        } catch (Exception e) {
            throw new RuntimeException("Exception occurred while setting the current key context.", e);
        }
    }

    /**
     * Returns the key currently set on the keyed state backend.
     *
     * @return the backend's current key
     * @throws UnsupportedOperationException if this handler has no keyed state backend
     */
    public Object getCurrentKey() {
        if (this.keyedStateBackend == null) {
            throw new UnsupportedOperationException("Key can only be retrieved on KeyedStream.");
        }
        return this.keyedStateBackend.getCurrentKey();
    }

    /**
     * Returns the keyed state store, if this handler has one.
     *
     * @return the keyed state store, or an empty {@link Optional} when there is no keyed state
     *     backend
     */
    public Optional<KeyedStateStore> getKeyedStateStore() {
        final KeyedStateStore store = this.keyedStateStore;
        return Optional.ofNullable(store);
    }
}
