package com.ververica.cdc.connectors.mysql.source.reader;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils;
import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.class */
/**
 * Source reader that consumes {@link MySqlSplit}s (snapshot splits and binlog splits).
 *
 * <p>Besides delegating the actual reading to the base class, this reader keeps three pieces of
 * bookkeeping that are all included in every checkpoint taken by {@link #snapshotState(long)}:
 *
 * <ul>
 *   <li>snapshot splits that finished reading but have not been acknowledged by the coordinator;
 *   <li>binlog splits whose finished-snapshot-split metadata is still being collected;
 *   <li>at most one suspended binlog split waiting to be woken up again.
 * </ul>
 *
 * @param <T> the type of records emitted by the record emitter
 */
public class MySqlSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<SourceRecords, T, MySqlSplit, MySqlSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);

    private final MySqlSourceConfig sourceConfig;
    /** Finished snapshot splits, keyed by split id, not yet acknowledged by the coordinator. */
    private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
    /** Binlog splits, keyed by split id, whose metadata is not completely collected yet. */
    private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
    private final int subtaskId;
    private final MySqlSourceReaderContext mySqlSourceReaderContext;
    /** The suspended binlog split, or {@code null} when no binlog split is suspended. */
    private MySqlBinlogSplit suspendedBinlogSplit;

    /**
     * Creates the reader with a single-thread fetcher manager built on the given split reader
     * supplier.
     *
     * @param futureCompletingBlockingQueue queue handed to both the base class and the fetcher manager
     * @param supplier supplies the split reader used by the fetcher manager; must not be null
     * @param recordEmitter emitter passed through to the base class
     * @param configuration configuration passed through to the base class
     * @param mySqlSourceReaderContext wrapper providing the Flink reader context and the
     *     stop-binlog-reader flag
     * @param mySqlSourceConfig source configuration used for schema discovery and meta group sizing
     */
    public MySqlSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> futureCompletingBlockingQueue, Supplier<MySqlSplitReader> supplier, RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter, Configuration configuration, MySqlSourceReaderContext mySqlSourceReaderContext, MySqlSourceConfig mySqlSourceConfig) {
        super(
                futureCompletingBlockingQueue,
                new SingleThreadFetcherManager<>(futureCompletingBlockingQueue, supplier::get),
                recordEmitter,
                configuration,
                mySqlSourceReaderContext.getSourceReaderContext());
        this.sourceConfig = mySqlSourceConfig;
        this.finishedUnackedSplits = new HashMap<>();
        this.uncompletedBinlogSplits = new HashMap<>();
        this.subtaskId = mySqlSourceReaderContext.getSourceReaderContext().getIndexOfSubtask();
        this.mySqlSourceReaderContext = mySqlSourceReaderContext;
        this.suspendedBinlogSplit = null;
    }

    /** Requests a split from the coordinator when no split is currently assigned. */
    @Override
    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }

    /**
     * Creates the mutable state wrapper matching the kind of the given split.
     *
     * @param mySqlSplit the newly assigned split
     * @return a snapshot split state for snapshot splits, a binlog split state otherwise
     */
    @Override
    public MySqlSplitState initializedState(MySqlSplit mySqlSplit) {
        if (mySqlSplit.isSnapshotSplit()) {
            return new MySqlSnapshotSplitState(mySqlSplit.asSnapshotSplit());
        }
        return new MySqlBinlogSplitState(mySqlSplit.asBinlogSplit());
    }

    /**
     * Returns the splits to checkpoint: the base class' splits (minus those already tracked as
     * finished-but-unacknowledged), followed by the finished unacknowledged snapshot splits, the
     * uncompleted binlog splits and, if present, the suspended binlog split.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the list of splits making up this reader's state
     */
    @Override
    public List<MySqlSplit> snapshotState(long checkpointId) {
        // Collect into an explicit ArrayList: the result is extended below and
        // Collectors.toList() does not guarantee a mutable list.
        List<MySqlSplit> stateSplits =
                super.snapshotState(checkpointId).stream()
                        .filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
                        .collect(Collectors.toCollection(ArrayList::new));

        stateSplits.addAll(finishedUnackedSplits.values());
        stateSplits.addAll(uncompletedBinlogSplits.values());
        if (suspendedBinlogSplit != null) {
            stateSplits.add(suspendedBinlogSplit);
        }

        logCurrentBinlogOffsets(stateSplits, checkpointId);
        return stateSplits;
    }

    /**
     * Handles splits reported as finished by the base class.
     *
     * <p>A finished binlog split is turned into a suspended split and acknowledged to the
     * coordinator; a finished snapshot split is remembered until the coordinator acknowledges it.
     * A new split is requested only when no binlog split was among the finished ones.
     *
     * @param finishedSplitIds finished split states keyed by split id
     */
    @Override
    protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
        boolean requestNextSplit = true;
        for (MySqlSplitState splitState : finishedSplitIds.values()) {
            MySqlSplit split = splitState.toMySqlSplit();
            if (!split.isBinlogSplit()) {
                finishedUnackedSplits.put(split.splitId(), split.asSnapshotSplit());
                continue;
            }
            LOG.info("binlog split reader suspended due to newly added table, offset {}", splitState.asBinlogSplitState().getStartingOffset());
            mySqlSourceReaderContext.resetStopBinlogSplitReader();
            suspendedBinlogSplit = MySqlBinlogSplit.toSuspendedBinlogSplit(split.asBinlogSplit());
            context.sendSourceEventToCoordinator(new SuspendBinlogReaderAckEvent());
            requestNextSplit = false;
        }

        reportFinishedSnapshotSplitsIfNeed();
        if (requestNextSplit) {
            context.sendSplitRequest();
        }
    }

    /**
     * Sorts newly assigned splits into the reader's bookkeeping and forwards the readable ones to
     * the base class.
     *
     * <ul>
     *   <li>finished snapshot splits are only tracked for (re-)reporting;
     *   <li>unfinished snapshot splits are forwarded for reading;
     *   <li>a suspended binlog split is stored as the suspended split;
     *   <li>a completed binlog split gets its table schemas discovered and is forwarded;
     *   <li>an uncompleted binlog split is stored and its missing metadata is requested.
     * </ul>
     *
     * @param splits the splits assigned to this reader
     */
    @Override
    public void addSplits(List<MySqlSplit> splits) {
        List<MySqlSplit> unfinishedSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            LOG.info("Add Split: {}", split);
            if (split.isSnapshotSplit()) {
                MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (snapshotSplit.isSnapshotReadFinished()) {
                    finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                } else {
                    unfinishedSplits.add(split);
                }
                continue;
            }

            MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
            if (binlogSplit.isSuspended()) {
                suspendedBinlogSplit = binlogSplit;
            } else if (binlogSplit.isCompletedSplit()) {
                uncompletedBinlogSplits.remove(binlogSplit.splitId());
                unfinishedSplits.add(discoverTableSchemasForBinlogSplit(binlogSplit));
            } else {
                uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
                requestBinlogSplitMetaIfNeeded(binlogSplit);
            }
        }

        reportFinishedSnapshotSplitsIfNeed();
        if (!unfinishedSplits.isEmpty()) {
            super.addSplits(unfinishedSplits);
        }
    }

    /**
     * Fills the table schemas of the given binlog split unless it already carries some.
     *
     * @param mySqlBinlogSplit the binlog split to complete
     * @return the same split if it already has table schemas, otherwise a split filled with the
     *     discovered schemas
     * @throws FlinkRuntimeException if a {@link SQLException} occurs while using or closing the
     *     connection
     */
    private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit mySqlBinlogSplit) {
        String splitId = mySqlBinlogSplit.splitId();
        if (!mySqlBinlogSplit.getTableSchemas().isEmpty()) {
            LOG.warn("The binlog split {} has table schemas yet, skip the table schema discovery", mySqlBinlogSplit);
            return mySqlBinlogSplit;
        }

        // try-with-resources guarantees the connection is closed even when discovery fails.
        try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
            Map<TableId, TableChanges.TableChange> tableSchemas = TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
            LOG.info("The table schema discovery for binlog split {} success", splitId);
            return MySqlBinlogSplit.fillTableSchemas(mySqlBinlogSplit, tableSchemas);
        } catch (SQLException e) {
            LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Dispatches events sent by the coordinator; unknown events are passed to the base class.
     *
     * @param sourceEvent the event received from the coordinator
     */
    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
            LOG.debug("The subtask {} receives ack event for {} from enumerator.", subtaskId, ackEvent.getFinishedSplits());
            // Acknowledged splits no longer need to be reported or checkpointed.
            for (String splitId : ackEvent.getFinishedSplits()) {
                finishedUnackedSplits.remove(splitId);
            }
        } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            LOG.debug("The subtask {} receives request to report finished snapshot splits.", subtaskId);
            reportFinishedSnapshotSplitsIfNeed();
        } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
            BinlogSplitMetaEvent metaEvent = (BinlogSplitMetaEvent) sourceEvent;
            LOG.debug("The subtask {} receives binlog meta with group id {}.", subtaskId, metaEvent.getMetaGroupId());
            fillMetaDataForBinlogSplit(metaEvent);
        } else if (sourceEvent instanceof SuspendBinlogReaderEvent) {
            mySqlSourceReaderContext.setStopBinlogSplitReader();
        } else if (sourceEvent instanceof WakeupReaderEvent) {
            WakeupReaderEvent wakeupEvent = (WakeupReaderEvent) sourceEvent;
            if (wakeupEvent.getTarget() == WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER) {
                context.sendSplitRequest();
            } else if (suspendedBinlogSplit != null) {
                // For any other target: ask for the latest finished-splits size, which is needed
                // to resume the suspended binlog split (see the branch below).
                context.sendSourceEventToCoordinator(new LatestFinishedSplitsSizeRequestEvent());
            }
        } else if (sourceEvent instanceof LatestFinishedSplitsSizeEvent) {
            if (suspendedBinlogSplit != null) {
                int latestFinishedSplitsSize = ((LatestFinishedSplitsSizeEvent) sourceEvent).getLatestFinishedSplitsSize();
                MySqlBinlogSplit resumedSplit = MySqlBinlogSplit.toNormalBinlogSplit(suspendedBinlogSplit, latestFinishedSplitsSize);
                // Clear the suspended split before re-adding it as a normal binlog split.
                suspendedBinlogSplit = null;
                addSplits(Collections.singletonList(resumedSplit));
            }
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    /**
     * Reports the high watermark of every finished-but-unacknowledged snapshot split to the
     * coordinator; does nothing when there is none.
     */
    // The map stays raw because the watermark (offset) type is not imported in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void reportFinishedSnapshotSplitsIfNeed() {
        if (finishedUnackedSplits.isEmpty()) {
            return;
        }
        Map finishedOffsets = new HashMap();
        for (MySqlSnapshotSplit snapshotSplit : finishedUnackedSplits.values()) {
            finishedOffsets.put(snapshotSplit.splitId(), snapshotSplit.getHighWatermark());
        }
        context.sendSourceEventToCoordinator(new FinishedSnapshotSplitsReportEvent(finishedOffsets));
        LOG.debug("The subtask {} reports offsets of finished snapshot splits {}.", subtaskId, finishedOffsets);
    }

    /**
     * Either re-adds the binlog split when its metadata is complete, or requests the next
     * metadata group from the coordinator.
     *
     * @param mySqlBinlogSplit the binlog split to check
     */
    private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit mySqlBinlogSplit) {
        String splitId = mySqlBinlogSplit.splitId();
        if (mySqlBinlogSplit.isCompletedSplit()) {
            LOG.info("The meta of binlog split {} has been collected success", splitId);
            addSplits(Collections.singletonList(mySqlBinlogSplit));
            return;
        }
        int nextMetaGroupId = ChunkUtils.getNextMetaGroupId(mySqlBinlogSplit.getFinishedSnapshotSplitInfos().size(), sourceConfig.getSplitMetaGroupSize());
        context.sendSourceEventToCoordinator(new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId));
    }

    /**
     * Appends the received metadata group to the matching uncompleted binlog split when it is the
     * expected next group, then checks whether more metadata has to be requested.
     *
     * <p>Events for unknown splits are ignored; out-of-order groups are not applied but still
     * trigger a new request for the expected group.
     *
     * @param binlogSplitMetaEvent the metadata event received from the coordinator
     */
    private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent binlogSplitMetaEvent) {
        MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(binlogSplitMetaEvent.getSplitId());
        if (binlogSplit == null) {
            LOG.warn("Received binlog meta event for split {}, but the uncompleted split map does not contain it", binlogSplitMetaEvent.getSplitId());
            return;
        }

        int receivedMetaGroupId = binlogSplitMetaEvent.getMetaGroupId();
        int expectedMetaGroupId = ChunkUtils.getNextMetaGroupId(binlogSplit.getFinishedSnapshotSplitInfos().size(), sourceConfig.getSplitMetaGroupSize());
        if (receivedMetaGroupId == expectedMetaGroupId) {
            List<FinishedSnapshotSplitInfo> metaDataGroup =
                    binlogSplitMetaEvent.getMetaGroup().stream()
                            .map(FinishedSnapshotSplitInfo::deserialize)
                            .collect(Collectors.toList());
            uncompletedBinlogSplits.put(binlogSplit.splitId(), MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
            // Log the group id itself (not the group size) so the message matches its wording.
            LOG.info("Fill meta data of group {} to binlog split, {} finished split infos appended", receivedMetaGroupId, metaDataGroup.size());
        } else {
            LOG.warn("Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it", binlogSplitMetaEvent.getSplitId(), receivedMetaGroupId, expectedMetaGroupId);
        }
        requestBinlogSplitMetaIfNeeded(uncompletedBinlogSplits.get(binlogSplit.splitId()));
    }

    /**
     * Logs, at INFO level, the starting offset of every binlog split contained in the given list.
     *
     * @param splits the splits being checkpointed
     * @param checkpointId id of the checkpoint being taken
     */
    private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
        if (!LOG.isInfoEnabled()) {
            return;
        }
        for (MySqlSplit split : splits) {
            // Skip snapshot splits instead of aborting: binlog splits are appended after the
            // snapshot splits in snapshotState and must not be missed.
            if (!split.isBinlogSplit()) {
                continue;
            }
            LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, split.asBinlogSplit().getStartingOffset());
        }
    }

    /**
     * Converts a split state back into its split.
     *
     * @param splitId id of the split (unused, the state already carries it)
     * @param mySqlSplitState the state to convert
     * @return the split represented by the state
     */
    @Override
    public MySqlSplit toSplitType(String splitId, MySqlSplitState mySqlSplitState) {
        return mySqlSplitState.toMySqlSplit();
    }
}
