package com.atguigu.tms.realtime.app.ods;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.util.CreateEnvUtil;
import com.atguigu.tms.realtime.util.DateFormatUtil;
import com.atguigu.tms.realtime.util.KafkaUtil;
import com.ibm.icu.text.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.audit.AuditConstants;

/* loaded from: input_file:com/atguigu/tms/realtime/app/ods/OdsApp.class */
/**
 * ODS-layer streaming job: reads change events from two MySQL sources, normalizes the
 * event timestamp, and writes the surviving events to the {@code tms_ods} Kafka topic.
 *
 * <p>The events carry {@code after}, {@code op} and {@code ts_ms} fields, which looks like
 * the Debezium change-event layout — NOTE(review): confirm against
 * {@code CreateEnvUtil.getJSONSchemaMysqlSource}.
 */
public class OdsApp {
    /** Value of the {@code op} field that marks a delete event; such events are dropped. */
    private static final String OP_DELETE = "d";

    /** Name of the output field that replaces {@code ts_ms}. */
    private static final String TS_FIELD = "ts";

    /**
     * Builds the two ingestion pipelines ("dim" and "dwd") on one environment and runs them.
     *
     * @param strArr command-line arguments, forwarded to the environment, source and sink factories
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] strArr) throws Exception {
        StreamExecutionEnvironment streamEnv = CreateEnvUtil.getStreamEnv(strArr);
        streamEnv.setParallelism(1);
        sinkToKafka("dim", "6020", "ods_dim_source", streamEnv, strArr);
        sinkToKafka("dwd", "6030", "ods_dwd_source", streamEnv, strArr);
        streamEnv.execute();
    }

    /**
     * Wires one MySQL source through filtering/timestamp normalization into the
     * {@code tms_ods} Kafka topic.
     *
     * <p>Per record: events without an {@code after} image and events whose {@code op} is
     * {@code "d"} are discarded; {@code ts_ms} is replaced by {@code ts}. When the
     * {@code mock_date} argument is present, {@code ts} is rebuilt from that date plus the
     * time-of-day portion of the original timestamp. Records that fail JSON parsing are
     * skipped after printing the stack trace.
     *
     * @param str first argument for {@code CreateEnvUtil.getJSONSchemaMysqlSource}
     *     ({@code main} passes "dim" / "dwd")
     * @param str2 second argument for {@code CreateEnvUtil.getJSONSchemaMysqlSource}
     *     ({@code main} passes "6020" / "6030"; presumably a CDC server id — TODO confirm)
     * @param str3 name given to the Flink source operator
     * @param streamExecutionEnvironment environment the pipeline is attached to
     * @param strArr command-line arguments; read for {@code mock_date} and forwarded to the
     *     source and sink factories
     */
    public static void sinkToKafka(String str, String str2, String str3, StreamExecutionEnvironment streamExecutionEnvironment, String[] strArr) {
        DataStreamSource<String> source = streamExecutionEnvironment
                .fromSource(CreateEnvUtil.getJSONSchemaMysqlSource(str, str2, strArr), WatermarkStrategy.noWatermarks(), str3)
                .setParallelism(1);
        // Null when the job is started without a mock_date argument.
        final String mockDate = ParameterTool.fromArgs(strArr).get("mock_date");

        source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String rawEvent, Collector<String> collector) throws Exception {
                try {
                    JSONObject event = JSON.parseObject(rawEvent);
                    // Guard against empty input, for which parseObject can yield null.
                    if (event == null || event.getJSONObject("after") == null) {
                        return;
                    }
                    // Constant-first equals: a record without "op" is treated as a non-delete
                    // instead of failing the task with a NullPointerException.
                    if (OP_DELETE.equals(event.getString("op"))) {
                        return;
                    }

                    Long tsMs = event.getLong("ts_ms");
                    if (mockDate != null) {
                        // substring(10) keeps everything after the first 10 characters of the
                        // formatted timestamp; presumably the format is "yyyy-MM-dd HH:mm:ss",
                        // leaving " HH:mm:ss" to append to the mock date — TODO confirm in
                        // DateFormatUtil.
                        String timeOfDay = DateFormatUtil.toYmdHms(tsMs).substring(10);
                        event.put(TS_FIELD, DateFormatUtil.toTs(mockDate + timeOfDay));
                    } else {
                        event.put(TS_FIELD, tsMs);
                    }
                    event.remove("ts_ms");
                    collector.collect(event.toJSONString());
                } catch (JSONException e) {
                    // Best effort: a malformed record is reported and skipped so that it does
                    // not stop the stream.
                    e.printStackTrace();
                }
            }
        }).setParallelism(1).keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String eventJson) {
                // "after" is always present here: the flatMap above only emits such events.
                return JSON.parseObject(eventJson).getJSONObject("after").getString("id");
            }
        }).addSink(KafkaUtil.getKafkaProducer("tms_ods", strArr));
    }
}
