package com.ververica.cdc.debezium.internal;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.internal.Handover;
import io.debezium.connector.SnapshotRecord;
import io.debezium.engine.ChangeEvent;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pulls batches of Debezium change events from a {@link Handover}, deserializes them with the
 * configured {@link DebeziumDeserializationSchema} and forwards the results to the Flink
 * {@link SourceFunction.SourceContext}.
 *
 * <p>Record emission and the update of the tracked {@link DebeziumOffset} both happen while
 * holding the source context's checkpoint lock, and {@link #snapshotCurrentState()} asserts
 * that its caller holds the same lock.
 *
 * <p>The metric fields ({@code messageTimestamp}, {@code processTime}, {@code fetchDelay},
 * {@code emitDelay}) are {@code volatile}; presumably the getters are called from a thread other
 * than the one running {@link #runFetchLoop()} - confirm against the caller.
 *
 * @param <T> type of the records produced by the deserialization schema
 */
@Internal
public class DebeziumChangeFetcher<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);

    /** Name of the struct field that carries the source metadata of a change event. */
    private static final String SOURCE_FIELD = "source";

    /** Name of the timestamp field read from the source metadata struct. */
    private static final String TIMESTAMP_FIELD = "ts_ms";

    private final SourceFunction.SourceContext<T> sourceContext;

    /** Lock obtained from {@link SourceFunction.SourceContext#getCheckpointLock()}. */
    private final Object checkpointLock;

    private final DebeziumDeserializationSchema<T> deserialization;

    /** Records whose topic starts with this prefix are treated as heartbeat events. */
    private final String heartbeatTopicPrefix;

    /** True until the first non-snapshot data record has been deserialized. */
    private boolean isInDbSnapshotPhase;

    private final Handover handover;

    private volatile boolean isRunning = true;

    /** Last {@code source.ts_ms} value seen on a record; 0 until one is seen. */
    private volatile long messageTimestamp = 0;

    /** Wall-clock time at which the last non-empty batch started processing; 0 initially. */
    private volatile long processTime = 0;

    private volatile long fetchDelay = 0;
    private volatile long emitDelay = 0;

    /** Number of records whose deserialization or emission threw. */
    private final AtomicLong numRecordInErrors = new AtomicLong(0);

    /** Buffer that receives the output of the deserialization schema before emission. */
    private final DebeziumCollector debeziumCollector = new DebeziumCollector();

    /** Latest source partition/offset, returned in serialized form by snapshotCurrentState. */
    private final DebeziumOffset debeziumOffset = new DebeziumOffset();

    private final DebeziumOffsetSerializer stateSerializer = DebeziumOffsetSerializer.INSTANCE;

    /** A {@link Collector} that simply queues every collected record in insertion order. */
    public class DebeziumCollector implements Collector<T> {
        private final Queue<T> records;

        private DebeziumCollector() {
            this.records = new ArrayDeque<>();
        }

        @Override
        public void collect(T t) {
            this.records.add(t);
        }

        @Override
        public void close() {
            // Nothing to release: the queue is drained by the enclosing fetcher.
        }
    }

    /**
     * Creates a fetcher.
     *
     * @param sourceContext context that receives the deserialized records; its checkpoint lock
     *     is captured here
     * @param deserialization schema that converts a {@link SourceRecord} into output records
     * @param isInDbSnapshotPhase whether the fetcher starts in the database snapshot phase
     * @param heartbeatTopicPrefix topic prefix that identifies heartbeat events
     * @param handover source of change-event batches
     */
    public DebeziumChangeFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            DebeziumDeserializationSchema<T> deserialization,
            boolean isInDbSnapshotPhase,
            String heartbeatTopicPrefix,
            Handover handover) {
        this.sourceContext = sourceContext;
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.deserialization = deserialization;
        this.isInDbSnapshotPhase = isInDbSnapshotPhase;
        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
        this.handover = handover;
    }

    /**
     * Serializes the currently tracked offset. The caller must hold the checkpoint lock (checked
     * with an {@code assert}).
     *
     * @return the serialized offset, or {@code null} when no source offset or source partition
     *     has been recorded yet
     * @throws Exception if the offset serializer fails
     */
    public byte[] snapshotCurrentState() throws Exception {
        assert Thread.holdsLock(this.checkpointLock);
        if (this.debeziumOffset.sourceOffset == null || this.debeziumOffset.sourcePartition == null) {
            return null;
        }
        return this.stateSerializer.serialize(this.debeziumOffset);
    }

    /**
     * Polls the handover and processes batches until {@link #close()} is called or the handover
     * is closed.
     *
     * <p>While in the snapshot phase, batches are processed with the checkpoint lock held for
     * the entire phase; afterwards the lock is only taken per record.
     *
     * @throws Exception if polling, deserialization or emission fails with anything other than
     *     {@link Handover.ClosedException} or {@link RetriableException}
     */
    public void runFetchLoop() throws Exception {
        try {
            if (this.isInDbSnapshotPhase) {
                // Poll the first batch before taking the lock so waiting for data does not
                // hold the checkpoint lock.
                List<ChangeEvent<SourceRecord, SourceRecord>> firstBatch = this.handover.pollNext();
                synchronized (this.checkpointLock) {
                    LOG.info("Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
                    handleBatch(firstBatch);
                    while (this.isRunning && this.isInDbSnapshotPhase) {
                        handleBatch(this.handover.pollNext());
                    }
                }
                LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
            }
            while (this.isRunning) {
                handleBatch(this.handover.pollNext());
            }
        } catch (Handover.ClosedException ignored) {
            // Deliberately ignored: the handover was closed, so the loop simply ends.
        } catch (RetriableException e) {
            LOG.info("Ignore the RetriableException, the underlying DebeziumEngine will restart automatically", e);
        }
    }

    /** Stops the fetch loop and closes the handover. */
    public void close() {
        this.isRunning = false;
        this.handover.close();
    }

    /**
     * @return difference between batch processing start time and the record's source timestamp
     *     for the last record seen, or 0 while in the snapshot phase
     */
    public long getFetchDelay() {
        return this.fetchDelay;
    }

    /**
     * @return difference between emission time and the record's source timestamp for the last
     *     emitted record, or 0 while in the snapshot phase
     */
    public long getEmitDelay() {
        return this.emitDelay;
    }

    /** @return milliseconds elapsed since the last non-empty batch started processing */
    public long getIdleTime() {
        return System.currentTimeMillis() - this.processTime;
    }

    /** @return number of records for which deserialization or emission threw */
    public long getNumRecordInErrors() {
        return this.numRecordInErrors.get();
    }

    /**
     * Processes one batch: heartbeat events only advance the tracked offset, every other record
     * is deserialized and emitted. Null or empty batches are ignored.
     */
    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> list) throws Exception {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        this.processTime = System.currentTimeMillis();
        for (ChangeEvent<SourceRecord, SourceRecord> event : list) {
            SourceRecord value = event.value();
            updateMessageTimestamp(value);
            this.fetchDelay = this.isInDbSnapshotPhase ? 0L : this.processTime - this.messageTimestamp;
            if (isHeartbeatEvent(value)) {
                // Heartbeats produce no output; they only move the offset forward.
                synchronized (this.checkpointLock) {
                    this.debeziumOffset.setSourcePartition(value.sourcePartition());
                    this.debeziumOffset.setSourceOffset(value.sourceOffset());
                }
                continue;
            }
            try {
                this.deserialization.deserialize(value, this.debeziumCollector);
                if (this.isInDbSnapshotPhase && !isSnapshotRecord(value)) {
                    LOG.debug("Snapshot phase finishes.");
                    this.isInDbSnapshotPhase = false;
                }
                emitRecordsUnderCheckpointLock(
                        this.debeziumCollector.records, value.sourcePartition(), value.sourceOffset());
            } catch (Throwable th) {
                // Count and log, then rethrow so the failure still reaches the caller.
                this.numRecordInErrors.incrementAndGet();
                LOG.error("Failed to deserialize record {}", value, th);
                throw th;
            }
        }
    }

    /**
     * Drains {@code queue} into the source context and then stores the given partition/offset,
     * all inside a single {@code synchronized (checkpointLock)} section so emission and offset
     * update are not separated by a state snapshot taken under the same lock.
     */
    private void emitRecordsUnderCheckpointLock(Queue<T> queue, Map<String, ?> map, Map<String, ?> map2) {
        synchronized (this.checkpointLock) {
            T polled;
            while ((polled = queue.poll()) != null) {
                this.emitDelay = this.isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - this.messageTimestamp;
                this.sourceContext.collect(polled);
            }
            // Queue is drained: record the offset once and leave the lock.
            this.debeziumOffset.setSourcePartition(map);
            this.debeziumOffset.setSourceOffset(map2);
        }
    }

    /**
     * Updates {@code messageTimestamp} from the record's {@code source.ts_ms} field. Leaves the
     * previous value untouched when the record has no value, no value schema, no {@code source}
     * field/struct, no {@code ts_ms} field, or a null {@code ts_ms}.
     */
    private void updateMessageTimestamp(SourceRecord sourceRecord) {
        Schema valueSchema = sourceRecord.valueSchema();
        Struct value = (Struct) sourceRecord.value();
        if (valueSchema == null || value == null || valueSchema.field(SOURCE_FIELD) == null) {
            return;
        }
        Struct source = value.getStruct(SOURCE_FIELD);
        if (source == null || source.schema().field(TIMESTAMP_FIELD) == null) {
            return;
        }
        Long tsMs = source.getInt64(TIMESTAMP_FIELD);
        if (tsMs != null) {
            this.messageTimestamp = tsMs;
        }
    }

    /** @return true when the record's topic is non-null and starts with the heartbeat prefix */
    private boolean isHeartbeatEvent(SourceRecord sourceRecord) {
        String topic = sourceRecord.topic();
        return topic != null && topic.startsWith(this.heartbeatTopicPrefix);
    }

    /**
     * @return true when the record has a value and {@link SnapshotRecord#fromSource} reports
     *     {@link SnapshotRecord#TRUE} for its {@code source} struct
     */
    private boolean isSnapshotRecord(SourceRecord sourceRecord) {
        Struct value = (Struct) sourceRecord.value();
        if (value == null) {
            return false;
        }
        return SnapshotRecord.TRUE == SnapshotRecord.fromSource(value.getStruct(SOURCE_FIELD));
    }
}
