package com.ververica.cdc.runtime.operators.schema.coordinator;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@NotThreadSafe
/* loaded from: input_file:com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.class */
public class SchemaRegistryRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SchemaRegistryRequestHandler.class);
    private final MetadataApplier metadataApplier;
    private final SchemaManager schemaManager;
    private final Set<Integer> activeSinkWriters = new HashSet();
    private final Set<Integer> flushedSinkWriters = new HashSet();
    private final List<PendingSchemaChange> pendingSchemaChanges = new LinkedList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler$PendingSchemaChange.class */
    public static class PendingSchemaChange {
        private final SchemaChangeRequest changeRequest;
        private CompletableFuture<CoordinationResponse> responseFuture;
        private RequestStatus status = RequestStatus.PENDING;

        public PendingSchemaChange(SchemaChangeRequest schemaChangeRequest, CompletableFuture<CoordinationResponse> completableFuture) {
            this.changeRequest = schemaChangeRequest;
            this.responseFuture = completableFuture;
        }

        public SchemaChangeRequest getChangeRequest() {
            return this.changeRequest;
        }

        public CompletableFuture<CoordinationResponse> getResponseFuture() {
            return this.responseFuture;
        }

        public RequestStatus getStatus() {
            return this.status;
        }

        public void startToWaitForReleaseRequest() {
            if (!this.responseFuture.isDone()) {
                throw new IllegalStateException("Cannot start to wait for flush success before the SchemaChangeRequest is done.");
            }
            this.responseFuture = new CompletableFuture<>();
            this.status = RequestStatus.WAIT_RELEASE_REQUEST;
        }

        public void receiveReleaseRequest() {
            this.status = RequestStatus.RECEIVED_RELEASE_REQUEST;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler$RequestStatus.class */
    public enum RequestStatus {
        PENDING,
        WAIT_RELEASE_REQUEST,
        RECEIVED_RELEASE_REQUEST
    }

    public SchemaRegistryRequestHandler(MetadataApplier metadataApplier, SchemaManager schemaManager) {
        this.metadataApplier = metadataApplier;
        this.schemaManager = schemaManager;
    }

    private void applySchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
        LOG.debug("Apply schema change {} to table {}.", schemaChangeEvent, tableId);
        this.metadataApplier.applySchemaChange(schemaChangeEvent);
    }

    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(SchemaChangeRequest schemaChangeRequest) {
        if (!this.pendingSchemaChanges.isEmpty()) {
            LOG.info("There are already processing requests. Wait for processing.");
            CompletableFuture<CoordinationResponse> completableFuture = new CompletableFuture<>();
            this.pendingSchemaChanges.add(new PendingSchemaChange(schemaChangeRequest, completableFuture));
            return completableFuture;
        }
        LOG.info("Received schema change event request from table {}. Start to buffer requests for others.", schemaChangeRequest.getTableId().toString());
        if ((schemaChangeRequest.getSchemaChangeEvent() instanceof CreateTableEvent) && this.schemaManager.schemaExists(schemaChangeRequest.getTableId())) {
            return CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new SchemaChangeResponse(false)));
        }
        CompletableFuture<CoordinationResponse> completedFuture = CompletableFuture.completedFuture(CoordinationResponseUtils.wrap(new SchemaChangeResponse(true)));
        this.schemaManager.applySchemaChange(schemaChangeRequest.getSchemaChangeEvent());
        this.pendingSchemaChanges.add(new PendingSchemaChange(schemaChangeRequest, completedFuture));
        this.pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
        return completedFuture;
    }

    public CompletableFuture<CoordinationResponse> handleReleaseUpstreamRequest() {
        CompletableFuture<CoordinationResponse> responseFuture = this.pendingSchemaChanges.get(0).getResponseFuture();
        if (responseFuture.isDone()) {
            startNextSchemaChangeRequest();
        } else {
            this.pendingSchemaChanges.get(0).receiveReleaseRequest();
        }
        return responseFuture;
    }

    public void registerSinkWriter(int i) {
        LOG.info("Register sink subtask {}.", Integer.valueOf(i));
        this.activeSinkWriters.add(Integer.valueOf(i));
    }

    public void flushSuccess(TableId tableId, int i) {
        this.flushedSinkWriters.add(Integer.valueOf(i));
        if (this.flushedSinkWriters.equals(this.activeSinkWriters)) {
            LOG.info("All sink subtask have flushed for table {}. Start to apply schema change.", tableId.toString());
            PendingSchemaChange pendingSchemaChange = this.pendingSchemaChanges.get(0);
            applySchemaChange(tableId, pendingSchemaChange.getChangeRequest().getSchemaChangeEvent());
            pendingSchemaChange.getResponseFuture().complete(CoordinationResponseUtils.wrap(new ReleaseUpstreamResponse()));
            if (RequestStatus.RECEIVED_RELEASE_REQUEST.equals(pendingSchemaChange.getStatus())) {
                startNextSchemaChangeRequest();
            }
        }
    }

    private void startNextSchemaChangeRequest() {
        this.pendingSchemaChanges.remove(0);
        this.flushedSinkWriters.clear();
        while (!this.pendingSchemaChanges.isEmpty()) {
            PendingSchemaChange pendingSchemaChange = this.pendingSchemaChanges.get(0);
            SchemaChangeRequest schemaChangeRequest = pendingSchemaChange.changeRequest;
            if (!(schemaChangeRequest.getSchemaChangeEvent() instanceof CreateTableEvent) || !this.schemaManager.schemaExists(schemaChangeRequest.getTableId())) {
                this.schemaManager.applySchemaChange(schemaChangeRequest.getSchemaChangeEvent());
                pendingSchemaChange.getResponseFuture().complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(true)));
                pendingSchemaChange.startToWaitForReleaseRequest();
                return;
            }
            pendingSchemaChange.getResponseFuture().complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(false)));
            this.pendingSchemaChanges.remove(0);
        }
    }
}
