package com.ververica.cdc.runtime.operators.sink;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.ChangeEvent;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

@Internal
/* loaded from: input_file:com/ververica/cdc/runtime/operators/sink/DataSinkWriterOperator.class */
public class DataSinkWriterOperator<CommT> extends AbstractStreamOperator<CommittableMessage<CommT>> implements OneInputStreamOperator<Event, CommittableMessage<CommT>>, BoundedOneInput {
    private SchemaEvolutionClient schemaEvolutionClient;
    private final OperatorID schemaOperatorID;
    private final Sink<Event> sink;
    private final ProcessingTimeService processingTimeService;
    private final MailboxExecutor mailboxExecutor;
    private Object flinkWriterOperator;
    private SinkWriter<Event> copySinkWriter;
    private final Set<TableId> processedTableIds = new HashSet();

    public DataSinkWriterOperator(Sink<Event> sink, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, OperatorID operatorID) {
        this.sink = sink;
        this.processingTimeService = processingTimeService;
        this.mailboxExecutor = mailboxExecutor;
        this.schemaOperatorID = operatorID;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.SetupableStreamOperator
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<CommittableMessage<CommT>>> output) {
        super.setup(streamTask, streamConfig, output);
        this.flinkWriterOperator = createFlinkWriterOperator();
        ((AbstractStreamOperator) getFlinkWriterOperator()).setup(streamTask, streamConfig, output);
        this.schemaEvolutionClient = new SchemaEvolutionClient(streamTask.getEnvironment().getOperatorCoordinatorEventGateway(), this.schemaOperatorID);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        ((AbstractStreamOperator) getFlinkWriterOperator()).open();
        this.copySinkWriter = (SinkWriter) getFieldValue("sinkWriter");
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
        ((AbstractStreamOperator) getFlinkWriterOperator()).initializeState(stateInitializationContext);
    }

    @Override // org.apache.flink.streaming.api.operators.Input
    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event value = streamRecord.getValue();
        if (value instanceof FlushEvent) {
            handleFlushEvent((FlushEvent) value);
            return;
        }
        if (value instanceof CreateTableEvent) {
            this.processedTableIds.add(((CreateTableEvent) value).tableId());
            ((OneInputStreamOperator) getFlinkWriterOperator()).processElement(streamRecord);
            return;
        }
        ChangeEvent changeEvent = (ChangeEvent) value;
        if (!this.processedTableIds.contains(changeEvent.tableId())) {
            emitLatestSchema(changeEvent.tableId());
            this.processedTableIds.add(changeEvent.tableId());
        }
        this.processedTableIds.add(changeEvent.tableId());
        ((OneInputStreamOperator) getFlinkWriterOperator()).processElement(streamRecord);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        ((AbstractStreamOperator) getFlinkWriterOperator()).prepareSnapshotPreBarrier(j);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        ((OneInputStreamOperator) getFlinkWriterOperator()).close();
    }

    @Override // org.apache.flink.streaming.api.operators.BoundedOneInput
    public void endInput() throws Exception {
        ((BoundedOneInput) getFlinkWriterOperator()).endInput();
    }

    private void handleFlushEvent(FlushEvent flushEvent) throws Exception {
        this.copySinkWriter.flush(false);
        this.schemaEvolutionClient.notifyFlushSuccess(getRuntimeContext().getIndexOfThisSubtask(), flushEvent.getTableId());
    }

    private void emitLatestSchema(TableId tableId) throws Exception {
        Optional<Schema> latestSchema = this.schemaEvolutionClient.getLatestSchema(tableId);
        if (!latestSchema.isPresent()) {
            throw new RuntimeException("Could not find schema message from SchemaRegistry for " + tableId);
        }
        ((OneInputStreamOperator) getFlinkWriterOperator()).processElement(new StreamRecord(new CreateTableEvent(tableId, latestSchema.get())));
        this.processedTableIds.add(tableId);
    }

    private Object createFlinkWriterOperator() {
        try {
            Constructor<?> declaredConstructor = getRuntimeContext().getUserCodeClassLoader().loadClass("org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator").getDeclaredConstructor(Sink.class, ProcessingTimeService.class, MailboxExecutor.class);
            declaredConstructor.setAccessible(true);
            return declaredConstructor.newInstance(this.sink, this.processingTimeService, this.mailboxExecutor);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e);
        }
    }

    private <T> T getFieldValue(String str) throws IllegalAccessException {
        Class<?> cls = this.flinkWriterOperator.getClass();
        while (true) {
            Class<?> cls2 = cls;
            if (cls2 == null) {
                throw new RuntimeException("failed to get sinkWriter");
            }
            try {
                Field declaredField = cls2.getDeclaredField(str);
                declaredField.setAccessible(true);
                return (T) declaredField.get(this.flinkWriterOperator);
            } catch (NoSuchFieldException e) {
                cls = cls2.getSuperclass();
            }
        }
    }

    private <T> T getFlinkWriterOperator() {
        return (T) this.flinkWriterOperator;
    }
}
