package org.apache.flink.table.planner.connectors;

import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rex.RexBuilder;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ExternalCatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.operations.CollectModifyOperation;
import org.apache.flink.table.operations.ExternalModifyOperation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.TypeConversions;
import org.codehaus.plexus.util.SelectorUtils;

/**
 * Static helpers that convert sink-related modify operations into a {@link LogicalSink} relational
 * node, validating the query schema against the sink schema and applying sink abilities
 * (overwrite, metadata writing) along the way.
 */
@Internal
public final class DynamicSinkUtils {

    /**
     * Converts a {@link CollectModifyOperation} into a {@link RelNode} that writes into a freshly
     * created {@link CollectDynamicSink}.
     *
     * <p>As a side effect, the select result provider and the consumed data type of the given
     * operation are set.
     *
     * @param flinkRelBuilder builder used to assemble the relational tree
     * @param relNode the query whose result is collected
     * @param collectModifyOperation the operation describing the collect sink
     * @param readableConfig configuration providing time zone, batch size, socket timeout and cast
     *     behaviour
     * @param classLoader class loader handed to the collect sink
     * @return the sink node wrapping the (possibly casted) input
     */
    public static RelNode convertCollectToRel(FlinkRelBuilder flinkRelBuilder, RelNode relNode, CollectModifyOperation collectModifyOperation, ReadableConfig readableConfig, ClassLoader classLoader) {
        DataTypeFactory dataTypeFactory = ShortcutUtils.unwrapContext(flinkRelBuilder).getCatalogManager().getDataTypeFactory();
        ResolvedSchema childSchema = collectModifyOperation.getChild().getResolvedSchema();
        // Only names and data types of the child schema are carried over; every column is physical.
        ResolvedSchema schema = ResolvedSchema.physical(childSchema.getColumnNames(), childSchema.getColumnDataTypes());
        ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(new ExternalCatalogTable(Schema.newBuilder().fromResolvedSchema(schema).build()), schema);
        ContextResolvedTable contextResolvedTable = ContextResolvedTable.anonymous("collect", catalogTable);

        DataType consumedDataType = fixCollectDataType(dataTypeFactory, schema);

        // The option's default value is mapped to the JVM's system default zone.
        String zone = readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
        ZoneId zoneId = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) ? ZoneId.systemDefault() : ZoneId.of(zone);

        CollectDynamicSink tableSink = new CollectDynamicSink(
                contextResolvedTable.getIdentifier(),
                consumedDataType,
                readableConfig.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
                readableConfig.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT),
                classLoader,
                zoneId,
                readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled());
        collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
        collectModifyOperation.setConsumedDataType(consumedDataType);
        return convertSinkToRel(flinkRelBuilder, relNode, Collections.emptyMap(), contextResolvedTable, Collections.emptyMap(), false, tableSink);
    }

    /**
     * Converts an {@link ExternalModifyOperation} into a {@link RelNode} that writes into an
     * {@link ExternalDynamicSink}, without dynamic options, static partitions or overwrite.
     *
     * @param flinkRelBuilder builder used to assemble the relational tree
     * @param relNode the query to be written
     * @param externalModifyOperation the operation describing the external sink
     * @return the sink node wrapping the (possibly casted) input
     */
    public static RelNode convertExternalToRel(FlinkRelBuilder flinkRelBuilder, RelNode relNode, ExternalModifyOperation externalModifyOperation) {
        DynamicTableSink tableSink = new ExternalDynamicSink(externalModifyOperation.getChangelogMode().orElse(null), externalModifyOperation.getPhysicalDataType());
        return convertSinkToRel(flinkRelBuilder, relNode, Collections.emptyMap(), externalModifyOperation.getContextResolvedTable(), Collections.emptyMap(), false, tableSink);
    }

    /**
     * Converts a {@link SinkModifyOperation} into a {@link RelNode} that writes into the given
     * {@link DynamicTableSink}, using the operation's dynamic options, static partitions and
     * overwrite flag.
     *
     * @param flinkRelBuilder builder used to assemble the relational tree
     * @param relNode the query to be written
     * @param sinkModifyOperation the operation describing the target table
     * @param dynamicTableSink the sink to write into
     * @return the sink node wrapping the (possibly casted and projected) input
     */
    public static RelNode convertSinkToRel(FlinkRelBuilder flinkRelBuilder, RelNode relNode, SinkModifyOperation sinkModifyOperation, DynamicTableSink dynamicTableSink) {
        return convertSinkToRel(flinkRelBuilder, relNode, sinkModifyOperation.getDynamicOptions(), sinkModifyOperation.getContextResolvedTable(), sinkModifyOperation.getStaticPartitions(), sinkModifyOperation.isOverwrite(), dynamicTableSink);
    }

    /**
     * Shared implementation behind the public conversion methods: validates and prepares the
     * sink, applies the collected ability specs to it, aligns the query schema with the sink
     * schema, optionally projects metadata columns and finally creates the {@link LogicalSink}.
     */
    private static RelNode convertSinkToRel(FlinkRelBuilder flinkRelBuilder, RelNode relNode, Map<String, String> dynamicOptions, ContextResolvedTable contextResolvedTable, Map<String, String> staticPartitions, boolean isOverwrite, DynamicTableSink dynamicTableSink) {
        DataTypeFactory dataTypeFactory = ShortcutUtils.unwrapContext(flinkRelBuilder).getCatalogManager().getDataTypeFactory();
        FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(flinkRelBuilder);
        ResolvedSchema schema = contextResolvedTable.getResolvedSchema();
        String tableDebugName = contextResolvedTable.getIdentifier().asSummaryString();

        // 1. Validate the sink and collect the ability specs that need to be applied to it.
        List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>();
        prepareDynamicSink(tableDebugName, staticPartitions, isOverwrite, dynamicTableSink, (ResolvedCatalogTable) contextResolvedTable.getResolvedTable(), sinkAbilitySpecs);
        for (SinkAbilitySpec spec : sinkAbilitySpecs) {
            spec.apply(dynamicTableSink);
        }

        // 2. Validate the query schema against the sink schema and add implicit casts if needed.
        flinkRelBuilder.push(validateSchemaAndApplyImplicitCast(relNode, schema, tableDebugName, dataTypeFactory, typeFactory));

        // 3. Reorder/cast metadata columns only when the schema declares persisted ones.
        if (!extractPersistedMetadataColumns(schema).isEmpty()) {
            pushMetadataProjection(flinkRelBuilder, typeFactory, schema, dynamicTableSink);
        }

        // 4. Forward dynamic options as an "OPTIONS" hint.
        List<RelHint> hints = new ArrayList<>();
        if (!dynamicOptions.isEmpty()) {
            hints.add(RelHint.builder("OPTIONS").hintOptions(dynamicOptions).build());
        }
        return LogicalSink.create(flinkRelBuilder.build(), hints, contextResolvedTable, dynamicTableSink, staticPartitions, sinkAbilitySpecs.toArray(new SinkAbilitySpec[0]));
    }

    /**
     * Checks that the row type of the query matches the sink row type of the given schema and
     * wraps the query in a cast when at least one column cannot avoid a cast.
     *
     * @param relNode the query to validate
     * @param resolvedSchema the schema of the sink table
     * @param tableDebugName table name used in error messages
     * @param dataTypeFactory factory used while normalizing the sink data type
     * @param flinkTypeFactory factory used to build the target row type of the cast
     * @return {@code relNode} itself, or a cast on top of it
     * @throws ValidationException if the number of columns differs or a query column cannot be
     *     implicitly cast to the corresponding sink column
     */
    public static RelNode validateSchemaAndApplyImplicitCast(RelNode relNode, ResolvedSchema resolvedSchema, String tableDebugName, DataTypeFactory dataTypeFactory, FlinkTypeFactory flinkTypeFactory) {
        List<RowType.RowField> queryFields = FlinkTypeFactory.toLogicalRowType(relNode.getRowType()).getFields();
        RowType sinkType = (RowType) fixSinkDataType(dataTypeFactory, resolvedSchema.toSinkRowDataType()).getLogicalType();
        List<RowType.RowField> sinkFields = sinkType.getFields();

        if (queryFields.size() != sinkFields.size()) {
            throw createSchemaMismatchException("Different number of columns.", tableDebugName, queryFields, sinkFields);
        }

        boolean requiresCasting = false;
        for (int i = 0; i < sinkFields.size(); i++) {
            LogicalType queryColumnType = queryFields.get(i).getType();
            LogicalType sinkColumnType = sinkFields.get(i).getType();
            if (!LogicalTypeCasts.supportsImplicitCast(queryColumnType, sinkColumnType)) {
                throw createSchemaMismatchException(String.format("Incompatible types for sink column '%s' at position %s.", sinkFields.get(i).getName(), i), tableDebugName, queryFields, sinkFields);
            }
            if (!LogicalTypeCasts.supportsAvoidingCast(queryColumnType, sinkColumnType)) {
                requiresCasting = true;
            }
        }

        if (!requiresCasting) {
            return relNode;
        }
        return RelOptUtil.createCastRel(relNode, flinkTypeFactory.buildRelNodeRowType(sinkType), true);
    }

    /**
     * Derives the data type consumed by the collect sink from the schema's source row data type,
     * after applying the legacy-RAW and legacy-to-non-legacy transformations.
     */
    private static DataType fixCollectDataType(DataTypeFactory dataTypeFactory, ResolvedSchema resolvedSchema) {
        DataType transformed = DataTypeUtils.transform(dataTypeFactory, resolvedSchema.toSourceRowDataType(), TypeTransformations.legacyRawToTypeInfoRaw(), TypeTransformations.legacyToNonLegacy());
        return TypeConversions.fromLogicalToDataType(transformed.getLogicalType());
    }

    /**
     * Pushes a projection onto the builder that emits all physical columns followed by the
     * required metadata columns, each metadata column cast to the type the sink declares for its
     * metadata key. Metadata fields are named after their metadata key.
     */
    private static void pushMetadataProjection(FlinkRelBuilder flinkRelBuilder, FlinkTypeFactory flinkTypeFactory, ResolvedSchema resolvedSchema, DynamicTableSink dynamicTableSink) {
        RexBuilder rexBuilder = flinkRelBuilder.getRexBuilder();
        List<Column> columns = resolvedSchema.getColumns();
        List<Integer> physicalColumns = extractPhysicalColumns(resolvedSchema);

        // metadata key -> position of the persisted metadata column in the schema
        Map<String, Integer> keyToColumnIndex = extractPersistedMetadataColumns(resolvedSchema).stream()
                .collect(Collectors.toMap(pos -> metadataKeyOf((Column.MetadataColumn) columns.get(pos)), Function.identity()));

        // Schema positions of the metadata columns, in the order of the required metadata columns.
        List<Integer> metadataColumns = createRequiredMetadataColumns(resolvedSchema, dynamicTableSink).stream()
                .map(DynamicSinkUtils::metadataKeyOf)
                .map(keyToColumnIndex::get)
                .collect(Collectors.toList());

        List<String> fieldNames = Stream.concat(
                        physicalColumns.stream().map(columns::get).map(Column::getName),
                        metadataColumns.stream()
                                .map(columns::get)
                                .map(Column.MetadataColumn.class::cast)
                                .map(DynamicSinkUtils::metadataKeyOf))
                .collect(Collectors.toList());

        Map<String, DataType> metadataMap = extractMetadataMap(dynamicTableSink);

        flinkRelBuilder.projectNamed(
                Stream.concat(
                                physicalColumns.stream().map(pos -> flinkRelBuilder.field(adjustByVirtualColumns(columns, pos))),
                                metadataColumns.stream().map(pos -> {
                                    Column.MetadataColumn metadataColumn = (Column.MetadataColumn) columns.get(pos);
                                    LogicalType expectedType = metadataMap.get(metadataKeyOf(metadataColumn)).getLogicalType();
                                    return rexBuilder.makeAbstractCast(
                                            flinkTypeFactory.createFieldTypeFromLogicalType(expectedType),
                                            flinkRelBuilder.field(adjustByVirtualColumns(columns, pos)));
                                }))
                        .collect(Collectors.toList()),
                fieldNames,
                true);
    }

    /**
     * Runs all sink validations (partitioning, overwrite, metadata) and appends the resulting
     * ability specs to {@code sinkAbilitySpecs}.
     */
    private static void prepareDynamicSink(String tableDebugName, Map<String, String> staticPartitions, boolean isOverwrite, DynamicTableSink dynamicTableSink, ResolvedCatalogTable resolvedCatalogTable, List<SinkAbilitySpec> sinkAbilitySpecs) {
        validatePartitioning(tableDebugName, staticPartitions, dynamicTableSink, resolvedCatalogTable.getPartitionKeys());
        validateAndApplyOverwrite(tableDebugName, isOverwrite, dynamicTableSink, sinkAbilitySpecs);
        validateAndApplyMetadata(tableDebugName, dynamicTableSink, resolvedCatalogTable.getResolvedSchema(), sinkAbilitySpecs);
    }

    /**
     * Returns the persisted metadata columns of the schema whose metadata key is offered by the
     * sink, in the iteration order of the sink's writable metadata keys.
     */
    private static List<Column.MetadataColumn> createRequiredMetadataColumns(ResolvedSchema resolvedSchema, DynamicTableSink dynamicTableSink) {
        List<Column> columns = resolvedSchema.getColumns();
        Map<String, Column.MetadataColumn> keyToColumn = new HashMap<>();
        for (Integer pos : extractPersistedMetadataColumns(resolvedSchema)) {
            Column.MetadataColumn metadataColumn = (Column.MetadataColumn) columns.get(pos);
            keyToColumn.put(metadataKeyOf(metadataColumn), metadataColumn);
        }
        return extractMetadataMap(dynamicTableSink).keySet().stream()
                .filter(keyToColumn::containsKey)
                .map(keyToColumn::get)
                .collect(Collectors.toList());
    }

    /** Returns the explicit metadata key of the column, falling back to the column name. */
    private static String metadataKeyOf(Column.MetadataColumn metadataColumn) {
        return metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
    }

    /**
     * Builds the exception reported when query and sink schema do not match, listing both
     * schemas as {@code [name: type, ...]}.
     */
    private static ValidationException createSchemaMismatchException(String cause, String tableDebugName, List<RowType.RowField> queryFields, List<RowType.RowField> sinkFields) {
        return new ValidationException(String.format("Column types of query result and sink for '%s' do not match.\nCause: %s\n\nQuery schema: %s\nSink schema:  %s", tableDebugName, cause, describeFields(queryFields), describeFields(sinkFields)));
    }

    /** Renders the fields as {@code [name: type, name: type]} using each type's summary string. */
    private static String describeFields(List<RowType.RowField> fields) {
        return fields.stream()
                .map(field -> field.getName() + ": " + field.getType().asSummaryString())
                .collect(Collectors.joining(", ", "[", "]"));
    }

    /**
     * Normalizes the sink data type: applies the legacy-RAW and legacy-to-non-legacy
     * transformations and makes the type nullable.
     */
    private static DataType fixSinkDataType(DataTypeFactory dataTypeFactory, DataType dataType) {
        return DataTypeUtils.transform(dataTypeFactory, dataType, TypeTransformations.legacyRawToTypeInfoRaw(), TypeTransformations.legacyToNonLegacy(), TypeTransformations.toNullable());
    }

    /**
     * Verifies that a partitioned table is backed by a sink implementing
     * {@link SupportsPartitioning} and that every static partition column is a partition key.
     *
     * @throws TableException if the table has partition keys but the sink lacks the ability
     * @throws ValidationException if a static partition column is not among the partition keys
     */
    private static void validatePartitioning(String tableDebugName, Map<String, String> staticPartitions, DynamicTableSink dynamicTableSink, List<String> partitionKeys) {
        if (!partitionKeys.isEmpty() && !(dynamicTableSink instanceof SupportsPartitioning)) {
            throw new TableException(String.format("Table '%s' is a partitioned table, but the underlying %s doesn't implement the %s interface.", tableDebugName, DynamicTableSink.class.getSimpleName(), SupportsPartitioning.class.getSimpleName()));
        }
        for (String partitionColumn : staticPartitions.keySet()) {
            if (!partitionKeys.contains(partitionColumn)) {
                throw new ValidationException(String.format("Static partition column '%s' should be in the partition keys list %s for table '%s'.", partitionColumn, partitionKeys, tableDebugName));
            }
        }
    }

    /**
     * When overwrite is requested, verifies that the sink implements {@link SupportsOverwrite}
     * and registers an {@link OverwriteSpec}; does nothing otherwise.
     *
     * @throws ValidationException if overwrite is requested but the sink lacks the ability
     */
    private static void validateAndApplyOverwrite(String tableDebugName, boolean isOverwrite, DynamicTableSink dynamicTableSink, List<SinkAbilitySpec> sinkAbilitySpecs) {
        if (!isOverwrite) {
            return;
        }
        if (!(dynamicTableSink instanceof SupportsOverwrite)) {
            throw new ValidationException(String.format("INSERT OVERWRITE requires that the underlying %s of table '%s' implements the %s interface.", DynamicTableSink.class.getSimpleName(), tableDebugName, SupportsOverwrite.class.getSimpleName()));
        }
        sinkAbilitySpecs.add(new OverwriteSpec(true));
    }

    /** Returns the schema positions of all physical columns, in ascending order. */
    private static List<Integer> extractPhysicalColumns(ResolvedSchema resolvedSchema) {
        List<Column> columns = resolvedSchema.getColumns();
        return IntStream.range(0, resolvedSchema.getColumnCount())
                .filter(pos -> columns.get(pos).isPhysical())
                .boxed()
                .collect(Collectors.toList());
    }

    /** Returns the schema positions of all persisted metadata columns, in ascending order. */
    private static List<Integer> extractPersistedMetadataColumns(ResolvedSchema resolvedSchema) {
        List<Column> columns = resolvedSchema.getColumns();
        return IntStream.range(0, resolvedSchema.getColumnCount())
                .filter(pos -> {
                    Column column = columns.get(pos);
                    return column instanceof Column.MetadataColumn && column.isPersisted();
                })
                .boxed()
                .collect(Collectors.toList());
    }

    /**
     * Translates a schema position into a position that skips non-persisted columns, i.e.
     * subtracts the number of non-persisted columns located before {@code pos}.
     */
    private static int adjustByVirtualColumns(List<Column> columns, int pos) {
        long nonPersistedBefore = IntStream.range(0, pos)
                .filter(i -> !columns.get(i).isPersisted())
                .count();
        return pos - (int) nonPersistedBefore;
    }

    /**
     * Returns the sink's writable metadata, or an empty map when the sink does not implement
     * {@link SupportsWritingMetadata}.
     */
    private static Map<String, DataType> extractMetadataMap(DynamicTableSink dynamicTableSink) {
        if (dynamicTableSink instanceof SupportsWritingMetadata) {
            return ((SupportsWritingMetadata) dynamicTableSink).listWritableMetadata();
        }
        return Collections.emptyMap();
    }

    /**
     * Validates the persisted metadata columns of the schema against the sink's writable
     * metadata and registers a {@link WritingMetadataSpec}. Does nothing if the schema declares
     * no persisted metadata column.
     *
     * @throws ValidationException if the sink does not implement {@link SupportsWritingMetadata},
     *     if a metadata key is unknown to the sink, or if a column type cannot be explicitly cast
     *     to the sink's metadata type
     */
    private static void validateAndApplyMetadata(String tableDebugName, DynamicTableSink dynamicTableSink, ResolvedSchema resolvedSchema, List<SinkAbilitySpec> sinkAbilitySpecs) {
        List<Column> columns = resolvedSchema.getColumns();
        List<Integer> metadataColumns = extractPersistedMetadataColumns(resolvedSchema);
        if (metadataColumns.isEmpty()) {
            return;
        }
        if (!(dynamicTableSink instanceof SupportsWritingMetadata)) {
            throw new ValidationException(String.format("Table '%s' declares persistable metadata columns, but the underlying %s doesn't implement the %s interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword.", tableDebugName, DynamicTableSink.class.getSimpleName(), SupportsWritingMetadata.class.getSimpleName()));
        }

        Map<String, DataType> writableMetadata = ((SupportsWritingMetadata) dynamicTableSink).listWritableMetadata();
        for (Integer pos : metadataColumns) {
            Column.MetadataColumn metadataColumn = (Column.MetadataColumn) columns.get(pos);
            String metadataKey = metadataKeyOf(metadataColumn);
            LogicalType declaredType = metadataColumn.getDataType().getLogicalType();
            DataType expectedDataType = writableMetadata.get(metadataKey);
            if (expectedDataType == null) {
                throw new ValidationException(String.format("Invalid metadata key '%s' in column '%s' of table '%s'. The %s class '%s' supports the following metadata keys for writing:\n%s", metadataKey, metadataColumn.getName(), tableDebugName, DynamicTableSink.class.getSimpleName(), dynamicTableSink.getClass().getName(), String.join("\n", writableMetadata.keySet())));
            }
            if (LogicalTypeCasts.supportsExplicitCast(declaredType, expectedDataType.getLogicalType())) {
                continue;
            }
            // The message mentions the metadata key only when it differs from the column name.
            if (!metadataKey.equals(metadataColumn.getName())) {
                throw new ValidationException(String.format("Invalid data type for metadata column '%s' with metadata key '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable to metadata type '%s'.", metadataColumn.getName(), metadataKey, tableDebugName, declaredType, expectedDataType.getLogicalType()));
            }
            throw new ValidationException(String.format("Invalid data type for metadata column '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable to metadata type '%s'.", metadataColumn.getName(), tableDebugName, declaredType, expectedDataType.getLogicalType()));
        }

        List<String> metadataKeys = createRequiredMetadataColumns(resolvedSchema, dynamicTableSink).stream()
                .map(DynamicSinkUtils::metadataKeyOf)
                .collect(Collectors.toList());
        sinkAbilitySpecs.add(new WritingMetadataSpec(metadataKeys, createConsumedType(resolvedSchema, dynamicTableSink)));
    }

    /**
     * Builds the non-nullable row type consumed by the sink: all physical columns with their
     * declared types, followed by the required metadata columns typed as declared by the sink.
     */
    private static RowType createConsumedType(ResolvedSchema resolvedSchema, DynamicTableSink dynamicTableSink) {
        Map<String, DataType> metadataMap = extractMetadataMap(dynamicTableSink);

        Stream<RowType.RowField> physicalFields = resolvedSchema.getColumns().stream()
                .filter(Column::isPhysical)
                .map(column -> new RowType.RowField(column.getName(), column.getDataType().getLogicalType()));
        Stream<RowType.RowField> metadataFields = createRequiredMetadataColumns(resolvedSchema, dynamicTableSink).stream()
                .map(column -> new RowType.RowField(column.getName(), metadataMap.get(metadataKeyOf(column)).getLogicalType()));

        return new RowType(false, Stream.concat(physicalFields, metadataFields).collect(Collectors.toList()));
    }

    /** Not instantiable: this class only hosts static helpers. */
    private DynamicSinkUtils() {
    }
}
