package org.apache.flink.table.planner.plan.rules.logical;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceWatermarkSpec;
import org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.watermark.WatermarkParams;

/* loaded from: input_file:org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.class */
public abstract class PushWatermarkIntoTableSourceScanRuleBase extends RelOptRule {
    public PushWatermarkIntoTableSourceScanRuleBase(RelOptRuleOperand relOptRuleOperand, String str) {
        super(relOptRuleOperand, str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkLogicalTableSourceScan getNewScan(FlinkLogicalWatermarkAssigner flinkLogicalWatermarkAssigner, RexNode rexNode, FlinkLogicalTableSourceScan flinkLogicalTableSourceScan, TableConfig tableConfig, boolean z) {
        SourceAbilitySpec sourceAbilitySpec;
        TableSourceTable tableSourceTable = (TableSourceTable) flinkLogicalTableSourceScan.getTable().unwrap(TableSourceTable.class);
        DynamicTableSource copy = tableSourceTable.tableSource().copy();
        boolean z2 = (copy instanceof SupportsSourceWatermark) && hasSourceWatermarkDeclaration(rexNode);
        RelDataType rowType = z ? flinkLogicalWatermarkAssigner.getRowType() : flinkLogicalTableSourceScan.getRowType();
        RowType rowType2 = (RowType) FlinkTypeFactory.toLogicalType(rowType);
        SourceAbilityContext from = SourceAbilityContext.from(flinkLogicalTableSourceScan);
        if (z2) {
            SourceWatermarkSpec sourceWatermarkSpec = new SourceWatermarkSpec(true, rowType2);
            sourceWatermarkSpec.apply(copy, from);
            sourceAbilitySpec = sourceWatermarkSpec;
        } else {
            Duration duration = (Duration) tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
            long millis = (duration.isZero() || duration.isNegative()) ? -1L : duration.toMillis();
            Configuration configuration = (Configuration) flinkLogicalTableSourceScan.getHints().stream().filter(relHint -> {
                return relHint.hintName.equalsIgnoreCase("OPTIONS");
            }).findFirst().map(relHint2 -> {
                return Configuration.fromMap(relHint2.kvOptions);
            }).orElseGet(Configuration::new);
            Optional of = Optional.of(flinkLogicalTableSourceScan.getTable());
            Class<TableSourceTable> cls = TableSourceTable.class;
            TableSourceTable.class.getClass();
            WatermarkPushDownSpec watermarkPushDownSpec = new WatermarkPushDownSpec(rexNode, millis, rowType2, parseWatermarkParams(configuration, (Configuration) of.filter((v1) -> {
                return r1.isInstance(v1);
            }).map(relOptTable -> {
                return Configuration.fromMap(((TableSourceTable) relOptTable).contextResolvedTable().getResolvedTable().getOptions());
            }).orElseGet(Configuration::new)));
            watermarkPushDownSpec.apply(copy, from);
            sourceAbilitySpec = watermarkPushDownSpec;
        }
        return FlinkLogicalTableSourceScan.create(flinkLogicalTableSourceScan.getCluster(), flinkLogicalTableSourceScan.getHints(), tableSourceTable.copy(copy, rowType, new SourceAbilitySpec[]{sourceAbilitySpec}));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean supportsWatermarkPushDown(FlinkLogicalTableSourceScan flinkLogicalTableSourceScan) {
        TableSourceTable tableSourceTable = (TableSourceTable) flinkLogicalTableSourceScan.getTable().unwrap(TableSourceTable.class);
        if (tableSourceTable == null) {
            return false;
        }
        DynamicTableSource tableSource = tableSourceTable.tableSource();
        return (tableSource instanceof SupportsWatermarkPushDown) || ((tableSource instanceof SupportsSourceWatermark) && hasSourceWatermarkDeclaration(tableSourceTable));
    }

    private boolean hasSourceWatermarkDeclaration(TableSourceTable tableSourceTable) {
        List<WatermarkSpec> watermarkSpecs = tableSourceTable.contextResolvedTable().getResolvedSchema().getWatermarkSpecs();
        return watermarkSpecs.size() == 1 && ShortcutUtils.unwrapFunctionDefinition(watermarkSpecs.get(0).getWatermarkExpression()) == BuiltInFunctionDefinitions.SOURCE_WATERMARK;
    }

    private boolean hasSourceWatermarkDeclaration(RexNode rexNode) {
        return ShortcutUtils.unwrapFunctionDefinition(rexNode) == BuiltInFunctionDefinitions.SOURCE_WATERMARK;
    }

    private WatermarkParams parseWatermarkParams(Configuration configuration, Configuration configuration2) {
        WatermarkParams.WatermarkParamsBuilder builder = WatermarkParams.builder();
        Optional options = getOptions(FactoryUtil.WATERMARK_EMIT_STRATEGY, configuration, configuration2);
        builder.getClass();
        options.ifPresent(builder::emitStrategy);
        Optional options2 = getOptions(FactoryUtil.WATERMARK_ALIGNMENT_GROUP, configuration, configuration2);
        builder.getClass();
        options2.ifPresent(builder::alignGroupName);
        Optional options3 = getOptions(FactoryUtil.WATERMARK_ALIGNMENT_MAX_DRIFT, configuration, configuration2);
        builder.getClass();
        options3.ifPresent(builder::alignMaxDrift);
        Optional options4 = getOptions(FactoryUtil.WATERMARK_ALIGNMENT_UPDATE_INTERVAL, configuration, configuration2);
        builder.getClass();
        options4.ifPresent(builder::alignUpdateInterval);
        getOptions(FactoryUtil.SOURCE_IDLE_TIMEOUT, configuration, configuration2).ifPresent(duration -> {
            builder.sourceIdleTimeout(duration.toMillis());
        });
        return builder.build();
    }

    private <T> Optional<T> getOptions(ConfigOption<T> configOption, Configuration configuration, Configuration configuration2) {
        Optional<T> optional = configuration.getOptional(configOption);
        return optional.isPresent() ? optional : configuration2.getOptional(configOption);
    }
}
