package org.apache.flink.runtime.heartbeat;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatMonitor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl.class */
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HeartbeatMonitorImpl.class);
    private final ResourceID resourceID;
    private final HeartbeatTarget<O> heartbeatTarget;
    private final ScheduledExecutor scheduledExecutor;
    private final HeartbeatListener<?, ?> heartbeatListener;
    private final long heartbeatTimeoutIntervalMs;
    private final int failedRpcRequestsUntilUnreachable;
    private volatile ScheduledFuture<?> futureTimeout;
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final AtomicInteger numberFailedRpcRequestsSinceLastSuccess = new AtomicInteger(0);
    private volatile long lastHeartbeat;

    /* loaded from: input_file:org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl$Factory.class */
    static class Factory<O> implements HeartbeatMonitor.Factory<O> {
        @Override // org.apache.flink.runtime.heartbeat.HeartbeatMonitor.Factory
        public HeartbeatMonitor<O> createHeartbeatMonitor(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget, ScheduledExecutor scheduledExecutor, HeartbeatListener<?, O> heartbeatListener, long j, int i) {
            return new HeartbeatMonitorImpl(resourceID, heartbeatTarget, scheduledExecutor, heartbeatListener, j, i);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl$State.class */
    public enum State {
        RUNNING,
        TIMEOUT,
        UNREACHABLE,
        CANCELED
    }

    HeartbeatMonitorImpl(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget, ScheduledExecutor scheduledExecutor, HeartbeatListener<?, O> heartbeatListener, long j, int i) {
        this.resourceID = (ResourceID) Preconditions.checkNotNull(resourceID);
        this.heartbeatTarget = (HeartbeatTarget) Preconditions.checkNotNull(heartbeatTarget);
        this.scheduledExecutor = (ScheduledExecutor) Preconditions.checkNotNull(scheduledExecutor);
        this.heartbeatListener = (HeartbeatListener) Preconditions.checkNotNull(heartbeatListener);
        Preconditions.checkArgument(j > 0, "The heartbeat timeout interval has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = j;
        Preconditions.checkArgument(i > 0 || i == -1, "The number of failed heartbeat RPC requests has to be larger than 0 or -1 (deactivated).");
        this.failedRpcRequestsUntilUnreachable = i;
        this.lastHeartbeat = 0L;
        resetHeartbeatTimeout(j);
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatMonitor
    public HeartbeatTarget<O> getHeartbeatTarget() {
        return this.heartbeatTarget;
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatMonitor
    public ResourceID getHeartbeatTargetId() {
        return this.resourceID;
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatMonitor
    public long getLastHeartbeat() {
        return this.lastHeartbeat;
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatMonitor
    public void reportHeartbeatRpcFailure() {
        int incrementAndGet = this.numberFailedRpcRequestsSinceLastSuccess.incrementAndGet();
        if (isHeartbeatRpcFailureDetectionEnabled() && incrementAndGet >= this.failedRpcRequestsUntilUnreachable && this.state.compareAndSet(State.RUNNING, State.UNREACHABLE)) {
            LOG.debug("Mark heartbeat target {} as unreachable because {} consecutive heartbeat RPCs have failed.", this.resourceID, Integer.valueOf(incrementAndGet));
            cancelTimeout();
            this.heartbeatListener.notifyTargetUnreachable(this.resourceID);
        }
    }

    private boolean isHeartbeatRpcFailureDetectionEnabled() {
        return this.failedRpcRequestsUntilUnreachable > 0;
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatMonitor
    public void reportHeartbeatRpcSuccess() {
        this.numberFailedRpcRequestsSinceLastSuccess.set(0);
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatMonitor
    public void reportHeartbeat() {
        this.lastHeartbeat = System.currentTimeMillis();
        resetHeartbeatTimeout(this.heartbeatTimeoutIntervalMs);
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatMonitor
    public void cancel() {
        if (this.state.compareAndSet(State.RUNNING, State.CANCELED)) {
            cancelTimeout();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            this.heartbeatListener.notifyHeartbeatTimeout(this.resourceID);
        }
    }

    public boolean isCanceled() {
        return this.state.get() == State.CANCELED;
    }

    void resetHeartbeatTimeout(long j) {
        if (this.state.get() == State.RUNNING) {
            cancelTimeout();
            this.futureTimeout = this.scheduledExecutor.schedule(this, j, TimeUnit.MILLISECONDS);
            if (this.state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }

    private void cancelTimeout() {
        if (this.futureTimeout != null) {
            this.futureTimeout.cancel(true);
        }
    }
}
