package io.debezium.connector.mysql.legacy;

import io.debezium.connector.mysql.HaltingPredicate;
import io.debezium.connector.mysql.legacy.Reader;
import io.debezium.document.Document;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mysql/legacy/ReconcilingBinlogReader.class */
/**
 * A {@link Reader} that takes two stopped {@link BinlogReader}s, works out which of them
 * has the further-advanced last offset (the "leading" reader), and then runs an inner
 * {@link BinlogReader} on the other (the "lagging") reader's context, bounded by an
 * {@link OffsetLimitPredicate} built from the leading reader's last offset.
 *
 * <p>When the inner reader's {@link #poll()} returns {@code null}, this reader stops the inner
 * reader, configures the unified reader's source position and runs the registered
 * completion callback, if any.
 */
public class ReconcilingBinlogReader implements Reader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReconcilingBinlogReader.class);

    private final BinlogReader binlogReaderA;
    private final BinlogReader binlogReaderB;
    private final BinlogReader unifiedReader;

    /** Inner reader created by {@link #start()}; {@code null} until then. */
    private BinlogReader reconcilingReader;

    /** {@code null} until {@link #determineLeadingReader()} has run; then {@code true} if reader A leads. */
    private Boolean aReaderLeading = null;

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final long serverId;

    /**
     * Halting predicate handed to the inner reconciling reader. It compares each record's
     * source offset against the leading reader's final offset.
     */
    static class OffsetLimitPredicate implements HaltingPredicate {
        private final Document leadingReaderFinalOffsetDocument;
        private final Predicate<String> gtidFilter;

        /**
         * @param leadingReaderFinalOffset the last offset of the leading reader
         * @param gtidFilter the GTID source filter passed through to the position comparison
         */
        OffsetLimitPredicate(Map<String, ?> leadingReaderFinalOffset, Predicate<String> gtidFilter) {
            this.leadingReaderFinalOffsetDocument = SourceInfo.createDocumentFromOffset(leadingReaderFinalOffset);
            this.gtidFilter = gtidFilter;
        }

        /**
         * @return {@code false} once {@code SourceInfo.isPositionAtOrBefore} reports that the leading
         *         reader's final offset is at or before the record's offset; {@code true} otherwise
         */
        @Override // io.debezium.connector.mysql.HaltingPredicate
        public boolean accepts(SourceRecord sourceRecord) {
            Document recordOffsetDocument = SourceInfo.createDocumentFromOffset(sourceRecord.sourceOffset());
            return !SourceInfo.isPositionAtOrBefore(this.leadingReaderFinalOffsetDocument, recordOffsetDocument, this.gtidFilter);
        }
    }

    /**
     * @param binlogReaderA the first source reader (its tables are logged as the "old tables")
     * @param binlogReaderB the second source reader (its tables are logged as the "new tables")
     * @param unifiedReader the reader whose context is positioned once reconciliation completes
     * @param serverId the server id passed to the inner reconciling reader
     */
    public ReconcilingBinlogReader(BinlogReader binlogReaderA, BinlogReader binlogReaderB, BinlogReader unifiedReader, long serverId) {
        this.binlogReaderA = binlogReaderA;
        this.binlogReaderB = binlogReaderB;
        this.unifiedReader = unifiedReader;
        this.serverId = serverId;
    }

    @Override // io.debezium.connector.mysql.legacy.Reader
    public String name() {
        return "reconcilingBinlogReader";
    }

    /**
     * @return {@code RUNNING} while the running flag is set; otherwise {@code STOPPED} if the
     *         completed flag is set, else {@code STOPPING}
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public Reader.State state() {
        if (this.running.get()) {
            return Reader.State.RUNNING;
        }
        return this.completed.get() ? Reader.State.STOPPED : Reader.State.STOPPING;
    }

    /** Registers the callback to run once reconciliation completes, replacing any previous one. */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public void uponCompletion(Runnable runnable) {
        this.uponCompletion.set(runnable);
    }

    /**
     * Determines the leading reader, then creates and starts the inner reader on the lagging
     * reader's context. Does nothing if this reader is already running.
     *
     * @throws IllegalStateException if the leading reader cannot be determined
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.completed.set(false);
            determineLeadingReader();
            MySqlTaskContext laggingReaderContext = getLaggingReader().context;
            OffsetLimitPredicate offsetLimit = new OffsetLimitPredicate(getLeadingReader().getLastOffset(), laggingReaderContext.gtidSourceFilter());
            this.reconcilingReader = new BinlogReader("innerReconcilingReader", laggingReaderContext, offsetLimit, this.serverId);
            this.reconcilingReader.start();
        }
    }

    /**
     * Stops the inner reader and shuts down its context on a best-effort basis: any failure is
     * logged (with its cause) rather than propagated. Does nothing if this reader is not running.
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            BinlogReader reader = this.reconcilingReader;
            if (reader == null) {
                // start() flips the running flag before the inner reader exists; if it failed
                // in between, there is nothing to stop and nothing to dereference.
                return;
            }
            try {
                LOGGER.info("Stopping the {} reader", reader.name());
                reader.stop();
                reader.context.shutdown();
            } catch (Throwable th) {
                // Pass the throwable as the trailing argument so the cause and stack trace are logged.
                LOGGER.error("Unexpected error stopping the {} reader", reader.name(), th);
            }
        }
    }

    /**
     * Polls the inner reader. A {@code null} result is treated as the end of reconciliation and
     * triggers completion handling before being returned to the caller.
     */
    @Override // io.debezium.connector.mysql.legacy.Reader
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = this.reconcilingReader.poll();
        if (records == null) {
            completeSuccessfully();
        }
        return records;
    }

    /** Runs the completion sequence at most once per start: stop, position the unified reader, run the callback. */
    private void completeSuccessfully() {
        if (this.completed.compareAndSet(false, true)) {
            stop();
            setupUnifiedReader();
            LOGGER.info("Completed reconciliation of parallel readers.");
            // getAndSet(null) ensures the callback is taken and run only once.
            Runnable completionCallback = this.uponCompletion.getAndSet(null);
            if (completionCallback != null) {
                completionCallback.run();
            }
        }
    }

    /**
     * Loads history into the unified reader's context from the leading reader's source, applies
     * the filter data from its config, and sets its completed GTID set and binlog start point
     * from the inner reader's last offset, falling back to the leading reader's last offset
     * when the inner reader has none.
     */
    private void setupUnifiedReader() {
        this.unifiedReader.context.loadHistory(getLeadingReader().context.source());
        this.unifiedReader.context.source().setFilterDataFromConfig(this.unifiedReader.context.config());
        // Read the inner reader's offset once so the null check and the use see the same value.
        Map<String, ?> reconciledOffset = this.reconcilingReader.getLastOffset();
        Map<String, ?> finalOffset = reconciledOffset != null ? reconciledOffset : getLeadingReader().getLastOffset();
        this.unifiedReader.context.source().setCompletedGtidSet((String) finalOffset.get("gtids"));
        // Go through Number so any numeric representation of the position is accepted.
        long binlogPosition = ((Number) finalOffset.get("pos")).longValue();
        this.unifiedReader.context.source().setBinlogStartPoint((String) finalOffset.get("file"), binlogPosition);
    }

    /**
     * Decides which of the two source readers is leading, based on their last offsets.
     *
     * @throws IllegalStateException if either source reader is not {@code STOPPED}, or if
     *         neither has a last offset
     */
    private void determineLeadingReader() {
        Map<String, ?> offsetA = this.binlogReaderA.getLastOffset();
        Map<String, ?> offsetB = this.binlogReaderB.getLastOffset();
        boolean aNotStopped = this.binlogReaderA.state() != Reader.State.STOPPED;
        boolean bNotStopped = this.binlogReaderB.state() != Reader.State.STOPPED;
        if ((offsetA == null && offsetB == null) || aNotStopped || bNotStopped) {
            throw new IllegalStateException("Cannot determine leading reader until both source readers have completed.");
        }
        if (offsetA == null) {
            this.aReaderLeading = false;
        } else if (offsetB == null) {
            this.aReaderLeading = true;
        } else {
            // A leads when B's last position is at or before A's last position.
            Document documentA = SourceInfo.createDocumentFromOffset(offsetA);
            Document documentB = SourceInfo.createDocumentFromOffset(offsetB);
            this.aReaderLeading = SourceInfo.isPositionAtOrBefore(documentB, documentA, this.binlogReaderA.context.gtidSourceFilter());
        }
        if (this.aReaderLeading) {
            LOGGER.info("old tables leading; reading only from new tables");
        } else {
            LOGGER.info("new tables leading; reading only from old tables");
        }
    }

    /**
     * @return the source reader determined to be leading
     * @throws IllegalStateException if the leading reader has not been determined yet
     */
    BinlogReader getLeadingReader() {
        checkLaggingLeadingInfo();
        return this.aReaderLeading ? this.binlogReaderA : this.binlogReaderB;
    }

    /**
     * @return the source reader that is not the leading one
     * @throws IllegalStateException if the leading reader has not been determined yet
     */
    BinlogReader getLaggingReader() {
        checkLaggingLeadingInfo();
        return this.aReaderLeading ? this.binlogReaderB : this.binlogReaderA;
    }

    /** Fails fast when leading/lagging information is requested before it has been computed. */
    private void checkLaggingLeadingInfo() {
        if (this.aReaderLeading == null) {
            throw new IllegalStateException("Cannot return leading or lagging readers until this reader has started.");
        }
    }
}
