package org.apache.flink.streaming.api.functions.source;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.xbill.DNS.TTL;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.class */
public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private final long start;
    private final long end;
    private volatile boolean isRunning = true;
    private transient Deque<Long> valuesToEmit;
    private transient ListState<Long> checkpointedState;

    public StatefulSequenceSource(long j, long j2) {
        this.start = j;
        this.end = j2;
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized.");
        this.checkpointedState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("stateful-sequence-source-state", LongSerializer.INSTANCE));
        this.valuesToEmit = new ArrayDeque();
        if (functionInitializationContext.isRestored()) {
            Iterator it = this.checkpointedState.get().iterator();
            while (it.hasNext()) {
                this.valuesToEmit.add((Long) it.next());
            }
            return;
        }
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        long j = this.start + indexOfThisSubtask;
        long abs = Math.abs((this.end - this.start) + 1);
        int safeDivide = safeDivide(abs, numberOfParallelSubtasks);
        int i = abs % ((long) numberOfParallelSubtasks) > ((long) indexOfThisSubtask) ? safeDivide + 1 : safeDivide;
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= i) {
                return;
            }
            this.valuesToEmit.add(Long.valueOf((j3 * numberOfParallelSubtasks) + j));
            j2 = j3 + 1;
        }
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
        while (this.isRunning && !this.valuesToEmit.isEmpty()) {
            synchronized (sourceContext.getCheckpointLock()) {
                sourceContext.collect(this.valuesToEmit.poll());
            }
        }
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
        this.isRunning = false;
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState(this.checkpointedState != null, "The " + getClass().getSimpleName() + " state has not been properly initialized.");
        this.checkpointedState.clear();
        Iterator<Long> it = this.valuesToEmit.iterator();
        while (it.hasNext()) {
            this.checkpointedState.add(it.next());
        }
    }

    private static int safeDivide(long j, long j2) {
        Preconditions.checkArgument(j2 > 0);
        Preconditions.checkArgument(j >= 0);
        Preconditions.checkArgument(j <= TTL.MAX_VALUE * j2);
        return (int) (j / j2);
    }
}
