package org.apache.flink.runtime.leaderretrieval;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LeaderRetrievalDriver} that reads leader information from a ZooKeeper node.
 *
 * <p>The driver keeps a {@link TreeCache} on the connection information path derived from the
 * path handed to the constructor. Whenever the cache reports a change, and whenever the
 * ZooKeeper connection is re-established, the node content is decoded and forwarded to the
 * {@link LeaderRetrievalEventHandler}. An empty or missing node, as well as a lost (and,
 * depending on the {@link LeaderInformationClearancePolicy}, a suspended) connection, is
 * reported as {@link LeaderInformation#empty()}.
 */
public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalDriver.class);

    private final CuratorFramework client;
    private final TreeCache cache;
    private final String connectionInformationPath;

    /** Forwards every ZooKeeper connection state change to {@link #handleStateChange}. */
    private final ConnectionStateListener connectionStateListener =
            (unusedClient, newState) -> handleStateChange(newState);

    private final LeaderRetrievalEventHandler leaderRetrievalEventHandler;
    private final LeaderInformationClearancePolicy leaderInformationClearancePolicy;
    private final FatalErrorHandler fatalErrorHandler;

    /** Set once construction has finished; cleared by {@link #close()}. */
    private volatile boolean running;

    /** Decides on which connection state the known leader information is cleared. */
    public enum LeaderInformationClearancePolicy {
        /** Clear the leader information as soon as the connection is suspended (and when lost). */
        ON_SUSPENDED_CONNECTION,
        /** Clear the leader information only once the connection is lost. */
        ON_LOST_CONNECTION
    }

    /**
     * Creates the driver, starts the cache on the leader node and registers the connection state
     * listener.
     *
     * @param curatorFramework client used to talk to ZooKeeper; must not be null
     * @param str path from which the connection information path is generated
     * @param leaderRetrievalEventHandler receiver of leader updates; must not be null
     * @param leaderInformationClearancePolicy when to clear the leader information; any value
     *     other than {@code ON_SUSPENDED_CONNECTION} (including null) clears only on a lost
     *     connection
     * @param fatalErrorHandler receiver of errors raised while handling node changes; must not be
     *     null
     * @throws Exception if the cache cannot be created or started
     */
    public ZooKeeperLeaderRetrievalDriver(CuratorFramework curatorFramework, String str, LeaderRetrievalEventHandler leaderRetrievalEventHandler, LeaderInformationClearancePolicy leaderInformationClearancePolicy, FatalErrorHandler fatalErrorHandler) throws Exception {
        // Validate all mandatory collaborators before any ZooKeeper resource is created.
        this.client = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "CuratorFramework client");
        this.leaderRetrievalEventHandler = (LeaderRetrievalEventHandler) Preconditions.checkNotNull(leaderRetrievalEventHandler);
        this.fatalErrorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
        this.leaderInformationClearancePolicy = leaderInformationClearancePolicy;

        this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(str);
        this.cache = ZooKeeperUtils.createTreeCache(curatorFramework, this.connectionInformationPath, this::retrieveLeaderInformationFromZooKeeper);

        this.cache.start();
        curatorFramework.getConnectionStateListenable().addListener(this.connectionStateListener);
        this.running = true;
    }

    /**
     * Unregisters the connection state listener and closes the cache. Subsequent calls are
     * no-ops.
     *
     * @throws Exception if closing the cache fails
     */
    @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver
    public void close() throws Exception {
        if (!this.running) {
            return;
        }
        this.running = false;
        LOG.info("Closing {}.", this);
        this.client.getConnectionStateListenable().removeListener(this.connectionStateListener);
        this.cache.close();
    }

    /**
     * Reads the current content of the leader node from the cache and notifies the event handler.
     *
     * <p>A missing node or an empty payload is reported as "no leader". Any failure is handed to
     * the fatal error handler; the interrupt flag is restored if the failure was an interruption.
     */
    private void retrieveLeaderInformationFromZooKeeper() {
        try {
            LOG.debug("Leader node has changed.");

            final ChildData childData = this.cache.getCurrentData(this.connectionInformationPath);
            final byte[] data = childData == null ? null : childData.getData();
            if (data == null || data.length == 0) {
                notifyNoLeader();
                return;
            }

            // The payload must be consumed in the order it was written: the leader address
            // (writeUTF) comes first, followed by the serialized session id (writeObject).
            // NOTE(review): write order is defined by the leader election side
            // (ZooKeeperUtils); confirm it stays in sync with this reader.
            final String leaderAddress;
            final UUID leaderSessionId;
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                leaderAddress = in.readUTF();
                leaderSessionId = (UUID) in.readObject();
            }

            this.leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.known(leaderSessionId, leaderAddress));
        } catch (Exception e) {
            this.fatalErrorHandler.onFatalError(new LeaderRetrievalException("Could not handle node changed event.", e));
            ExceptionUtils.checkInterrupted(e);
        }
    }

    /**
     * Reacts to a change of the ZooKeeper connection state.
     *
     * @param connectionState the new connection state reported by Curator
     */
    private void handleStateChange(ConnectionState connectionState) {
        switch (connectionState) {
            case CONNECTED:
                LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
                break;
            case SUSPENDED:
                LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");
                if (this.leaderInformationClearancePolicy == LeaderInformationClearancePolicy.ON_SUSPENDED_CONNECTION) {
                    notifyNoLeader();
                }
                break;
            case RECONNECTED:
                LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
                onReconnectedConnectionState();
                break;
            case LOST:
                // A lost connection always clears the leader, independent of the policy.
                LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from ZooKeeper.");
                notifyNoLeader();
                break;
            default:
                // Remaining states require no action.
                break;
        }
    }

    /** Tells the event handler that no leader is currently known. */
    private void notifyNoLeader() {
        this.leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.empty());
    }

    /** Re-reads the leader node after the connection has been re-established. */
    private void onReconnectedConnectionState() {
        retrieveLeaderInformationFromZooKeeper();
    }

    @Override
    public String toString() {
        return "ZookeeperLeaderRetrievalDriver{connectionInformationPath='" + this.connectionInformationPath + "'}";
    }

    /** Returns the ZooKeeper path of the node that holds the leader connection information. */
    @VisibleForTesting
    public String getConnectionInformationPath() {
        return this.connectionInformationPath;
    }
}
