package org.apache.flink.runtime.security.token;

import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.security.token.DelegationTokenProvider;
import org.apache.flink.core.security.token.DelegationTokenReceiver;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link DelegationTokenManager} implementation.
 *
 * <p>On construction it loads every enabled {@link DelegationTokenProvider} through Java's
 * {@link ServiceLoader} and, when a {@link PluginManager} is supplied, through the plugin
 * mechanism as well. It then verifies that each provider has a matching receiver.
 *
 * <p>Once {@link #start(DelegationTokenManager.Listener)} has been called, tokens are obtained
 * from all providers, handed to the local receiver repository and to the listener, and the next
 * run is scheduled based on the earliest expiry reported by the providers. When a run fails, it
 * is retried after the configured back-off period.
 */
@Internal
/* loaded from: input_file:org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.class */
public class DefaultDelegationTokenManager implements DelegationTokenManager {
    private static final String PROVIDER_RECEIVER_INCONSISTENCY_ERROR = "There is an inconsistency between loaded delegation token providers and receivers. One must implement a DelegationTokenProvider and a DelegationTokenReceiver with the same service name and add them together to the classpath to make the system consistent. The mentioned classes are loaded with Java's service loader so the appropriate META-INF registration also needs to be created.";
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDelegationTokenManager.class);

    private final Configuration configuration;

    @Nullable
    private final PluginManager pluginManager;

    /** Fraction of the remaining token lifetime after which a renewal is attempted. */
    private final double tokensRenewalTimeRatio;

    /** Delay in milliseconds before a failed token update is retried. */
    private final long renewalRetryBackoffPeriod;

    /** Loaded and initialized providers, keyed by their service name. */
    @VisibleForTesting
    final Map<String, DelegationTokenProvider> delegationTokenProviders;

    private final DelegationTokenReceiverRepository delegationTokenReceiverRepository;

    @Nullable
    private final ScheduledExecutor scheduledExecutor;

    @Nullable
    private final ExecutorService ioExecutor;

    private final Object tokensUpdateFutureLock = new Object();

    @GuardedBy("tokensUpdateFutureLock")
    @Nullable
    private ScheduledFuture<?> tokensUpdateFuture;

    @Nullable
    private DelegationTokenManager.Listener listener;

    /**
     * Creates the manager and eagerly loads all enabled providers and receivers.
     *
     * @param configuration Flink configuration, must not be null
     * @param pluginManager optional plugin manager used as an additional provider source
     * @param scheduledExecutor executor used to schedule token updates; required by {@code start}
     * @param executorService executor on which token updates run; required by {@code start}
     * @throws IllegalStateException if the loaded providers and receivers do not match
     */
    public DefaultDelegationTokenManager(Configuration configuration, @Nullable PluginManager pluginManager, @Nullable ScheduledExecutor scheduledExecutor, @Nullable ExecutorService executorService) {
        this.configuration = Preconditions.checkNotNull(configuration, "Flink configuration must not be null");
        this.pluginManager = pluginManager;
        this.tokensRenewalTimeRatio = configuration.get(SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO);
        this.renewalRetryBackoffPeriod = configuration.get(SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
        // loadProviders() reads this.configuration and this.pluginManager, so it must run only
        // after both fields have been assigned (a field initializer would run too early).
        this.delegationTokenProviders = loadProviders();
        this.delegationTokenReceiverRepository = new DelegationTokenReceiverRepository(configuration, pluginManager);
        this.scheduledExecutor = scheduledExecutor;
        this.ioExecutor = executorService;

        checkProviderAndReceiverConsistency(this.delegationTokenProviders, this.delegationTokenReceiverRepository.delegationTokenReceivers);

        Set<String> warnings = new HashSet<>();
        checkSamePrefixedProviders(this.delegationTokenProviders, warnings);
        for (String warning : warnings) {
            LOG.warn(warning);
        }
    }

    /**
     * Discovers providers via {@link ServiceLoader} and the optional plugin manager, initializes
     * the enabled ones and returns them keyed by service name.
     *
     * @throws FlinkRuntimeException if a provider fails to initialize or a service name is
     *     implemented more than once
     */
    private Map<String, DelegationTokenProvider> loadProviders() {
        LOG.info("Loading delegation token providers");
        Map<String, DelegationTokenProvider> providers = new HashMap<>();
        Consumer<DelegationTokenProvider> loadProvider = provider -> {
            try {
                if (isProviderEnabled(this.configuration, provider.serviceName())) {
                    provider.init(this.configuration);
                    LOG.info("Delegation token provider {} loaded and initialized", provider.serviceName());
                    Preconditions.checkState(!providers.containsKey(provider.serviceName()), "Delegation token provider with service name " + provider.serviceName() + " has multiple implementations");
                    providers.put(provider.serviceName(), provider);
                } else {
                    LOG.info("Delegation token provider {} is disabled so not loaded", provider.serviceName());
                }
            } catch (Exception | NoClassDefFoundError e) {
                LOG.error("Failed to initialize delegation token provider {}", provider.serviceName(), e);
                throw new FlinkRuntimeException(e);
            }
        };
        ServiceLoader.load(DelegationTokenProvider.class).iterator().forEachRemaining(loadProvider);
        if (this.pluginManager != null) {
            this.pluginManager.load(DelegationTokenProvider.class).forEachRemaining(loadProvider);
        }
        LOG.info("Delegation token providers loaded successfully");
        return providers;
    }

    /**
     * Reads the provider-specific "enabled" option for the given service name.
     *
     * @param configuration Flink configuration
     * @param str service name of the provider
     * @return the value of the provider's enabled flag
     */
    public static boolean isProviderEnabled(Configuration configuration, String str) {
        return SecurityOptions.forProvider(configuration, str).getBoolean(SecurityOptions.DELEGATION_TOKEN_PROVIDER_ENABLED);
    }

    /** Returns whether a provider with the given service name has been loaded. */
    @VisibleForTesting
    boolean isProviderLoaded(String str) {
        return this.delegationTokenProviders.containsKey(str);
    }

    /** Returns whether the receiver repository reports a receiver for the given service name. */
    @VisibleForTesting
    boolean isReceiverLoaded(String str) {
        return this.delegationTokenReceiverRepository.isReceiverLoaded(str);
    }

    /**
     * Verifies that providers and receivers are registered for exactly the same service names.
     *
     * <p>The key sets are always compared, so maps of equal size with different service names
     * are rejected as well.
     *
     * @param providers loaded providers keyed by service name
     * @param receivers loaded receivers keyed by service name
     * @throws IllegalStateException if a provider has no receiver or a receiver has no provider
     */
    @VisibleForTesting
    static void checkProviderAndReceiverConsistency(Map<String, DelegationTokenProvider> providers, Map<String, DelegationTokenReceiver> receivers) {
        LOG.info("Checking provider and receiver instances consistency");

        Set<String> missingReceivers = new HashSet<>(providers.keySet());
        missingReceivers.removeAll(receivers.keySet());
        if (!missingReceivers.isEmpty()) {
            throw new IllegalStateException(PROVIDER_RECEIVER_INCONSISTENCY_ERROR + " Missing receivers: " + String.join(",", missingReceivers));
        }

        Set<String> missingProviders = new HashSet<>(receivers.keySet());
        missingProviders.removeAll(providers.keySet());
        if (!missingProviders.isEmpty()) {
            throw new IllegalStateException(PROVIDER_RECEIVER_INCONSISTENCY_ERROR + " Missing providers: " + String.join(",", missingProviders));
        }

        LOG.info("Provider and receiver instances are consistent");
    }

    /**
     * Collects a warning for every service-name prefix (the part before the first {@code '-'})
     * that is shared by more than one loaded provider.
     *
     * @param providers loaded providers keyed by service name
     * @param warnings output set that receives the warning messages
     */
    @VisibleForTesting
    static void checkSamePrefixedProviders(Map<String, DelegationTokenProvider> providers, Set<String> warnings) {
        Set<String> seenPrefixes = new HashSet<>();
        for (String serviceName : providers.keySet()) {
            // indexOf/substring instead of split("-")[0]: split returns an empty array for a
            // name that consists only of dashes, which would make the [0] access fail.
            int dashIndex = serviceName.indexOf('-');
            String prefix = dashIndex < 0 ? serviceName : serviceName.substring(0, dashIndex);
            if (!seenPrefixes.add(prefix)) {
                warnings.add(String.format("Multiple providers loaded with the same prefix: %s. This might lead to unintended consequences, please consider using only one of them.", prefix));
            }
        }
    }

    /**
     * Obtains tokens from all providers into the given container without notifying receivers
     * or scheduling a renewal.
     *
     * @param delegationTokenContainer container that receives the obtained tokens
     */
    @Override // org.apache.flink.runtime.security.token.DelegationTokenManager
    public void obtainDelegationTokens(DelegationTokenContainer delegationTokenContainer) throws Exception {
        LOG.info("Obtaining delegation tokens");
        obtainDelegationTokensAndGetNextRenewal(delegationTokenContainer);
        LOG.info("Delegation tokens obtained successfully");
    }

    /**
     * Obtains tokens from all providers and, if any were obtained, passes them to the receiver
     * repository. No renewal is scheduled.
     */
    @Override // org.apache.flink.runtime.security.token.DelegationTokenManager
    public void obtainDelegationTokens() throws Exception {
        LOG.info("Obtaining delegation tokens");
        DelegationTokenContainer container = new DelegationTokenContainer();
        obtainDelegationTokensAndGetNextRenewal(container);
        LOG.info("Delegation tokens obtained successfully");
        if (container.hasTokens()) {
            this.delegationTokenReceiverRepository.onNewTokensObtained(container);
        } else {
            LOG.warn("No tokens obtained so skipping notifications");
        }
    }

    /**
     * Asks every loaded provider for tokens, stores them in the container and returns the
     * smallest "valid until" value reported by any provider.
     *
     * @param delegationTokenContainer container that receives the obtained tokens
     * @return the smallest reported "valid until" value, or empty if no provider reported one
     * @throws FlinkRuntimeException if any provider fails
     */
    protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(DelegationTokenContainer delegationTokenContainer) {
        return this.delegationTokenProviders.values().stream()
                .map(provider -> obtainTokensFromProvider(provider, delegationTokenContainer))
                .flatMap(validUntil -> validUntil.map(Stream::of).orElseGet(Stream::empty))
                .min(Long::compare);
    }

    /** Obtains tokens from a single provider, if it requires them, and returns their expiry. */
    private Optional<Long> obtainTokensFromProvider(DelegationTokenProvider provider, DelegationTokenContainer container) {
        try {
            if (!provider.delegationTokensRequired()) {
                LOG.debug("Service {} does not need to obtain delegation token", provider.serviceName());
                return Optional.empty();
            }
            LOG.debug("Obtaining delegation token for service {}", provider.serviceName());
            DelegationTokenProvider.ObtainedDelegationTokens obtainedTokens = provider.obtainDelegationTokens();
            Preconditions.checkNotNull(obtainedTokens, "Obtained delegation tokens must not be null");
            container.addToken(provider.serviceName(), obtainedTokens.getTokens());
            Optional<Long> validUntil = obtainedTokens.getValidUntil();
            LOG.debug("Obtained delegation token for service {} successfully", provider.serviceName());
            return validUntil;
        } catch (Exception e) {
            LOG.error("Failed to obtain delegation token for provider {}", provider.serviceName(), e);
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Starts the periodic token update. The first update runs synchronously in the caller's
     * thread; subsequent ones are scheduled on the executors passed to the constructor.
     *
     * @param listener callback that receives the serialized tokens, must not be null
     * @throws NullPointerException if an executor or the listener is null
     * @throws IllegalStateException if the manager is already started
     */
    @Override // org.apache.flink.runtime.security.token.DelegationTokenManager
    public void start(DelegationTokenManager.Listener listener) throws Exception {
        Preconditions.checkNotNull(this.scheduledExecutor, "Scheduled executor must not be null");
        Preconditions.checkNotNull(this.ioExecutor, "IO executor must not be null");
        this.listener = Preconditions.checkNotNull(listener, "Listener must not be null");
        synchronized (this.tokensUpdateFutureLock) {
            Preconditions.checkState(this.tokensUpdateFuture == null, "Manager is already started");
        }
        startTokensUpdate();
    }

    /**
     * Runs one token update: obtains tokens, notifies the receivers and the listener, then
     * schedules the next update. Any failure other than an interruption schedules a retry after
     * the configured back-off period.
     */
    @VisibleForTesting
    void startTokensUpdate() {
        try {
            LOG.info("Starting tokens update task");
            DelegationTokenContainer container = new DelegationTokenContainer();
            Optional<Long> nextRenewal = obtainDelegationTokensAndGetNextRenewal(container);

            if (container.hasTokens()) {
                this.delegationTokenReceiverRepository.onNewTokensObtained(container);
                LOG.info("Notifying listener about new tokens");
                Preconditions.checkNotNull(this.listener, "Listener must not be null");
                this.listener.onNewTokensObtained(InstantiationUtil.serializeObject(container));
                LOG.info("Listener notified successfully");
            } else {
                LOG.warn("No tokens obtained so skipping notifications");
            }

            if (nextRenewal.isPresent()) {
                long renewalDelay = calculateRenewalDelay(Clock.systemDefaultZone(), nextRenewal.get());
                scheduleTokensUpdate(renewalDelay);
                LOG.info("Tokens update task started with {} ms delay", renewalDelay);
            } else {
                LOG.warn("Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor that owns this thread can observe it;
            // no retry is scheduled in this case.
            Thread.currentThread().interrupt();
            LOG.debug("Interrupted", e);
        } catch (Exception e) {
            scheduleTokensUpdate(this.renewalRetryBackoffPeriod);
            LOG.warn("Failed to update tokens, will try again in {} ms", this.renewalRetryBackoffPeriod, e);
        }
    }

    /** Schedules the next token update on the IO executor after the given delay. */
    private void scheduleTokensUpdate(long delayMillis) {
        synchronized (this.tokensUpdateFutureLock) {
            this.tokensUpdateFuture = this.scheduledExecutor.schedule(
                    () -> this.ioExecutor.execute(this::startTokensUpdate),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
        }
    }

    /** Cancels the pending token update, if there is one. */
    @VisibleForTesting
    void stopTokensUpdate() {
        synchronized (this.tokensUpdateFutureLock) {
            if (this.tokensUpdateFuture != null) {
                this.tokensUpdateFuture.cancel(true);
                this.tokensUpdateFuture = null;
            }
        }
    }

    /**
     * Computes the delay until the next renewal as the configured ratio of the time remaining
     * until the given renewal timestamp.
     *
     * @param clock clock providing the current time in milliseconds
     * @param j timestamp in milliseconds at which the tokens must be renewed at the latest
     * @return the rounded delay in milliseconds
     */
    @VisibleForTesting
    long calculateRenewalDelay(Clock clock, long j) {
        long now = clock.millis();
        long renewalDelay = Math.round(this.tokensRenewalTimeRatio * (j - now));
        LOG.debug("Calculated delay on renewal is {}, based on next renewal {} and the ratio {}, and current time {}", renewalDelay, j, this.tokensRenewalTimeRatio, now);
        return renewalDelay;
    }

    /** Stops the periodic token update by cancelling the pending update task. */
    @Override // org.apache.flink.runtime.security.token.DelegationTokenManager
    public void stop() {
        LOG.info("Stopping credential renewal");
        stopTokensUpdate();
        LOG.info("Stopped credential renewal");
    }
}
