package io.debezium.connector.mysql.legacy;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.HaltingPredicate;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.legacy.ChainedReader;
import io.debezium.connector.mysql.legacy.MySqlConnectorTask;
import io.debezium.connector.mysql.legacy.Reader;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mysql/legacy/ParallelSnapshotReader.class */
/**
 * A {@link Reader} that drives two readers side by side: a {@link BinlogReader} for the
 * previously captured ("old") tables, and a {@link ChainedReader} that first runs a
 * {@link SnapshotReader} and then a second {@link BinlogReader} for the "new" tables.
 *
 * <p>Each binlog reader is given a {@link ParallelHaltingPredicate}; the two predicates share a
 * pair of flags so that each one stops accepting records once both readers have flagged
 * themselves as being near the end of the binlog.
 */
public class ParallelSnapshotReader implements Reader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSnapshotReader.class);
    private final BinlogReader oldTablesReader;
    private final BinlogReader newTablesBinlogReader;
    private final ChainedReader newTablesReader;
    private final AtomicReference<MySqlPartition> partition;
    private final AtomicBoolean running;
    private final AtomicBoolean completed;
    private final AtomicReference<Consumer<MySqlPartition>> uponCompletion;
    private final MySqlConnectorTask.ServerIdGenerator serverIdGenerator;

    /**
     * A {@link HaltingPredicate} that rejects records once both this reader and the other reader
     * have been flagged as "near the end".
     *
     * <p>A reader flags itself as near the end the first time it sees a record whose
     * {@code ts_sec} source-offset value is no older than the configured minimum halting
     * duration, measured against the current wall-clock time. The flag is never cleared by this
     * class once it has been set.
     */
    static class ParallelHaltingPredicate implements HaltingPredicate {
        private static final Logger LOGGER = LoggerFactory.getLogger(ParallelHaltingPredicate.class);
        private static final Duration DEFAULT_MIN_HALTING_DURATION = Duration.ofMinutes(5);
        // The references are assigned exactly once; the AtomicBooleans themselves carry the
        // cross-thread visibility, so the fields are final rather than volatile.
        private final AtomicBoolean thisReaderNearEnd;
        private final AtomicBoolean otherReaderNearEnd;
        private final Duration minHaltingDuration;

        /**
         * Creates a predicate that uses the default minimum halting duration of five minutes.
         *
         * @param thisReaderNearEnd flag this predicate sets once its own reader is near the end
         * @param otherReaderNearEnd flag set by the other reader's predicate
         */
        ParallelHaltingPredicate(AtomicBoolean thisReaderNearEnd, AtomicBoolean otherReaderNearEnd) {
            this(thisReaderNearEnd, otherReaderNearEnd, DEFAULT_MIN_HALTING_DURATION);
        }

        /**
         * Creates a predicate with an explicit minimum halting duration.
         *
         * @param thisReaderNearEnd flag this predicate sets once its own reader is near the end
         * @param otherReaderNearEnd flag set by the other reader's predicate
         * @param minHaltingDuration maximum age of a record for the reader to count as near the end
         */
        ParallelHaltingPredicate(AtomicBoolean thisReaderNearEnd, AtomicBoolean otherReaderNearEnd, Duration minHaltingDuration) {
            this.otherReaderNearEnd = otherReaderNearEnd;
            this.thisReaderNearEnd = thisReaderNearEnd;
            this.minHaltingDuration = minHaltingDuration;
        }

        /**
         * Decides whether the given record should still be accepted.
         *
         * @param sourceRecord the record to inspect; its source offset may lack {@code ts_sec}
         * @return {@code false} once both readers are flagged as near the end, {@code true} otherwise
         */
        @Override // io.debezium.connector.mysql.HaltingPredicate
        public boolean accepts(SourceRecord sourceRecord) {
            if (!this.thisReaderNearEnd.get()) {
                Long timestampSeconds = (Long) sourceRecord.sourceOffset().get("ts_sec");
                // Records without a timestamp in their offset cannot move this reader "near end".
                if (timestampSeconds != null) {
                    Duration age = Duration.between(Instant.ofEpochSecond(timestampSeconds), Instant.now());
                    if (age.compareTo(this.minHaltingDuration) <= 0) {
                        LOGGER.debug("Parallel halting predicate: this reader near end");
                        this.thisReaderNearEnd.set(true);
                    }
                }
            }
            return !(this.thisReaderNearEnd.get() && this.otherReaderNearEnd.get());
        }
    }

    /**
     * Creates the parallel reader, building both underlying readers.
     *
     * <p>The old-tables binlog reader uses the supplied task context. The new-tables readers use a
     * freshly created {@link MySqlTaskContext}, built from {@code configuration}, {@code filters}
     * and the supplied context's current source offset, which is started here.
     *
     * @param configuration configuration used to build the context for the new-tables readers
     * @param mySqlTaskContext context used by the old-tables binlog reader
     * @param filters filters used to build the context for the new-tables readers
     * @param serverIdGenerator source of the server ids handed to each binlog reader
     */
    public ParallelSnapshotReader(Configuration configuration, MySqlTaskContext mySqlTaskContext, Filters filters, MySqlConnectorTask.ServerIdGenerator serverIdGenerator) {
        this.partition = new AtomicReference<>();
        this.running = new AtomicBoolean(false);
        this.completed = new AtomicBoolean(false);
        this.uponCompletion = new AtomicReference<>();
        this.serverIdGenerator = serverIdGenerator;
        // The two predicates share the same pair of flags, with the roles swapped.
        AtomicBoolean oldTablesReaderNearEnd = new AtomicBoolean(false);
        AtomicBoolean newTablesReaderNearEnd = new AtomicBoolean(false);
        ParallelHaltingPredicate oldTablesPredicate = new ParallelHaltingPredicate(oldTablesReaderNearEnd, newTablesReaderNearEnd);
        ParallelHaltingPredicate newTablesPredicate = new ParallelHaltingPredicate(newTablesReaderNearEnd, oldTablesReaderNearEnd);
        this.oldTablesReader = new BinlogReader("oldBinlog", mySqlTaskContext, oldTablesPredicate, serverIdGenerator.getNextServerId());
        MySqlTaskContext newTablesContext = new MySqlTaskContext(configuration, filters, mySqlTaskContext.source().offset());
        newTablesContext.start();
        SnapshotReader newTablesSnapshotReader = new SnapshotReader("newSnapshot", newTablesContext);
        this.newTablesBinlogReader = new BinlogReader("newBinlog", newTablesContext, newTablesPredicate, serverIdGenerator.getNextServerId());
        this.newTablesReader = new ChainedReader.Builder().addReader(newTablesSnapshotReader).addReader(this.newTablesBinlogReader).build();
    }

    /**
     * Creates the parallel reader from pre-built readers.
     *
     * <p>No server id generator is available on instances created this way, so
     * {@link #createReconcilingBinlogReader(BinlogReader)} will fail with a
     * {@link NullPointerException} on them.
     *
     * @param oldTablesBinlogReader binlog reader for the old tables
     * @param newTablesSnapshotReader snapshot reader for the new tables
     * @param newTablesBinlogReader binlog reader for the new tables, chained after the snapshot reader
     */
    ParallelSnapshotReader(BinlogReader oldTablesBinlogReader, SnapshotReader newTablesSnapshotReader, BinlogReader newTablesBinlogReader) {
        this.partition = new AtomicReference<>();
        this.running = new AtomicBoolean(false);
        this.completed = new AtomicBoolean(false);
        this.uponCompletion = new AtomicReference<>();
        this.oldTablesReader = oldTablesBinlogReader;
        this.newTablesBinlogReader = newTablesBinlogReader;
        this.newTablesReader = new ChainedReader.Builder().addReader(newTablesSnapshotReader).addReader(newTablesBinlogReader).build();
        this.serverIdGenerator = null;
    }

    /**
     * Builds a {@link ReconcilingBinlogReader} from this reader's two binlog readers, the given
     * reader, and the next server id from the generator.
     *
     * @param binlogReader the reader passed as the third argument to the reconciling reader
     * @return the new reconciling reader
     */
    public ReconcilingBinlogReader createReconcilingBinlogReader(BinlogReader binlogReader) {
        return new ReconcilingBinlogReader(this.oldTablesReader, this.newTablesBinlogReader, binlogReader, this.serverIdGenerator.getNextServerId());
    }

    /**
     * Registers the callback invoked once, with the current partition, when both underlying
     * readers have nothing more to return. A later registration replaces an earlier one.
     *
     * @param consumer the completion callback
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public void uponCompletion(Consumer<MySqlPartition> consumer) {
        this.uponCompletion.set(consumer);
    }

    /** Initializes the old-tables reader and then the new-tables reader chain. */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public void initialize() {
        this.oldTablesReader.initialize();
        this.newTablesReader.initialize();
    }

    /**
     * Starts both underlying readers for the given partition. Does nothing if this reader is
     * already running.
     *
     * @param mySqlPartition the partition to read; remembered for the completion callback
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public void start(MySqlPartition mySqlPartition) {
        if (this.running.compareAndSet(false, true)) {
            this.partition.set(mySqlPartition);
            this.oldTablesReader.start(mySqlPartition);
            this.newTablesReader.start(mySqlPartition);
        }
    }

    /**
     * Stops both underlying readers and shuts down the task context of each binlog reader.
     * Does nothing if this reader is not running. A failure while stopping one reader is logged
     * and does not prevent the attempt to stop the other.
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            stopReaderAndContext(this.oldTablesReader, this.oldTablesReader);
            // The new-tables chain runs on its own context (the one owned by the new-tables
            // binlog reader), which must be shut down here rather than the old-tables context.
            // NOTE(review): createReconcilingBinlogReader hands both binlog readers to another
            // reader; confirm it does not rely on this context staying open after stop().
            stopReaderAndContext(this.newTablesReader, this.newTablesBinlogReader);
        }
    }

    /**
     * Stops {@code reader} and then shuts down the task context held by {@code contextOwner},
     * logging (rather than propagating) anything thrown along the way.
     */
    private static void stopReaderAndContext(Reader reader, BinlogReader contextOwner) {
        try {
            LOGGER.info("Stopping the {} reader", reader.name());
            reader.stop();
            contextOwner.context.shutdown();
        } catch (Throwable t) {
            // Passing the throwable last makes SLF4J log its stack trace with the message.
            LOGGER.error("Unexpected error stopping the {} reader", reader.name(), t);
        }
    }

    /**
     * Reports whether this reader is running.
     *
     * @return {@code RUNNING} between a successful {@link #start} and {@link #stop},
     *         {@code STOPPED} otherwise
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public Reader.State state() {
        return this.running.get() ? Reader.State.RUNNING : Reader.State.STOPPED;
    }

    /**
     * Polls both underlying readers and merges their results.
     *
     * <p>The old-tables reader is polled only while it reports itself as running. When both
     * sides yield {@code null}, this reader completes: it stops itself and fires the completion
     * callback.
     *
     * @return the combined records, with old-tables records first, or {@code null} if neither
     *         reader returned a list
     * @throws InterruptedException if an underlying poll is interrupted
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> allRecords = this.oldTablesReader.isRunning() ? this.oldTablesReader.poll() : null;
        List<SourceRecord> newTablesRecords = this.newTablesReader.poll();
        if (newTablesRecords != null) {
            if (allRecords == null) {
                allRecords = newTablesRecords;
            } else {
                allRecords.addAll(newTablesRecords);
            }
        } else if (allRecords == null) {
            // Neither reader produced a list: treat the parallel phase as finished.
            completeSuccessfully(this.partition.get());
        }
        return allRecords;
    }

    /**
     * Marks this reader as completed (at most once), stops it, and invokes the registered
     * completion callback, if any, with the given partition. The callback is cleared so it
     * cannot fire twice.
     */
    private void completeSuccessfully(MySqlPartition mySqlPartition) {
        if (this.completed.compareAndSet(false, true)) {
            stop();
            Consumer<MySqlPartition> callback = this.uponCompletion.getAndSet(null);
            if (callback != null) {
                callback.accept(mySqlPartition);
            }
        }
    }

    /**
     * Returns the fixed name of this reader.
     *
     * @return {@code "parallelSnapshotReader"}
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public String name() {
        return "parallelSnapshotReader";
    }
}
