package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.class */
public class Buckets<IN, BucketID> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Buckets.class);
    private final Path basePath;
    private final BucketFactory<IN, BucketID> bucketFactory;
    private final BucketAssigner<IN, BucketID> bucketAssigner;
    private final BucketWriter<IN, BucketID> bucketWriter;
    private final RollingPolicy<IN, BucketID> rollingPolicy;
    private final int subtaskIndex;
    private final OutputFileConfig outputFileConfig;

    @Nullable
    private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;

    @Nullable
    private FileLifeCycleListener<BucketID> fileLifeCycleListener;
    private final BucketStateSerializer<BucketID> bucketStateSerializer;
    private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets = new HashMap();
    private final BucketerContext bucketerContext = new BucketerContext();
    private long maxPartCounter = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/Buckets$BucketerContext.class */
    public static final class BucketerContext implements BucketAssigner.Context {

        @Nullable
        private Long elementTimestamp;
        private long currentWatermark;
        private long currentProcessingTime;

        private BucketerContext() {
            this.elementTimestamp = null;
            this.currentWatermark = Long.MIN_VALUE;
            this.currentProcessingTime = Long.MIN_VALUE;
        }

        void update(@Nullable Long l, long j, long j2) {
            this.elementTimestamp = l;
            this.currentWatermark = j;
            this.currentProcessingTime = j2;
        }

        @Override // org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner.Context
        public long currentProcessingTime() {
            return this.currentProcessingTime;
        }

        @Override // org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner.Context
        public long currentWatermark() {
            return this.currentWatermark;
        }

        @Override // org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner.Context
        @Nullable
        public Long timestamp() {
            return this.elementTimestamp;
        }
    }

    public Buckets(Path path, BucketAssigner<IN, BucketID> bucketAssigner, BucketFactory<IN, BucketID> bucketFactory, BucketWriter<IN, BucketID> bucketWriter, RollingPolicy<IN, BucketID> rollingPolicy, int i, OutputFileConfig outputFileConfig) {
        this.basePath = (Path) Preconditions.checkNotNull(path);
        this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
        this.bucketFactory = (BucketFactory) Preconditions.checkNotNull(bucketFactory);
        this.bucketWriter = (BucketWriter) Preconditions.checkNotNull(bucketWriter);
        this.rollingPolicy = (RollingPolicy) Preconditions.checkNotNull(rollingPolicy);
        this.subtaskIndex = i;
        this.outputFileConfig = (OutputFileConfig) Preconditions.checkNotNull(outputFileConfig);
        this.bucketStateSerializer = new BucketStateSerializer<>(bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), bucketWriter.getProperties().getPendingFileRecoverableSerializer(), bucketAssigner.getSerializer());
    }

    public void setBucketLifeCycleListener(BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener) {
        this.bucketLifeCycleListener = (BucketLifeCycleListener) Preconditions.checkNotNull(bucketLifeCycleListener);
    }

    public void setFileLifeCycleListener(FileLifeCycleListener<BucketID> fileLifeCycleListener) {
        this.fileLifeCycleListener = (FileLifeCycleListener) Preconditions.checkNotNull(fileLifeCycleListener);
    }

    public void initializeState(ListState<byte[]> listState, ListState<Long> listState2) throws Exception {
        initializePartCounter(listState2);
        LOG.info("Subtask {} initializing its state (max part counter={}).", Integer.valueOf(this.subtaskIndex), Long.valueOf(this.maxPartCounter));
        initializeActiveBuckets(listState);
    }

    private void initializePartCounter(ListState<Long> listState) throws Exception {
        long j = 0;
        Iterator it = listState.get().iterator();
        while (it.hasNext()) {
            j = Math.max(((Long) it.next()).longValue(), j);
        }
        this.maxPartCounter = j;
    }

    private void initializeActiveBuckets(ListState<byte[]> listState) throws Exception {
        Iterator it = listState.get().iterator();
        while (it.hasNext()) {
            handleRestoredBucketState((BucketState) SimpleVersionedSerialization.readVersionAndDeSerialize(this.bucketStateSerializer, (byte[]) it.next()));
        }
    }

    private void handleRestoredBucketState(BucketState<BucketID> bucketState) throws Exception {
        BucketID bucketId = bucketState.getBucketId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} restoring: {}", Integer.valueOf(this.subtaskIndex), bucketState);
        }
        updateActiveBucketId(bucketId, this.bucketFactory.restoreBucket(this.subtaskIndex, this.maxPartCounter, this.bucketWriter, this.rollingPolicy, bucketState, this.fileLifeCycleListener, this.outputFileConfig));
    }

    private void updateActiveBucketId(BucketID bucketid, Bucket<IN, BucketID> bucket) throws IOException {
        if (!bucket.isActive()) {
            notifyBucketInactive(bucket);
            return;
        }
        Bucket<IN, BucketID> bucket2 = this.activeBuckets.get(bucketid);
        if (bucket2 != null) {
            bucket2.merge(bucket);
        } else {
            this.activeBuckets.put(bucketid, bucket);
        }
    }

    public void commitUpToCheckpoint(long j) throws IOException {
        Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> it = this.activeBuckets.entrySet().iterator();
        LOG.info("Subtask {} received completion notification for checkpoint with id={}.", Integer.valueOf(this.subtaskIndex), Long.valueOf(j));
        while (it.hasNext()) {
            Bucket<IN, BucketID> value = it.next().getValue();
            value.onSuccessfulCompletionOfCheckpoint(j);
            if (!value.isActive()) {
                it.remove();
                notifyBucketInactive(value);
            }
        }
    }

    public void snapshotState(long j, ListState<byte[]> listState, ListState<Long> listState2) throws Exception {
        Preconditions.checkState((this.bucketWriter == null || this.bucketStateSerializer == null) ? false : true, "sink has not been initialized");
        LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).", Integer.valueOf(this.subtaskIndex), Long.valueOf(j), Long.valueOf(this.maxPartCounter));
        listState.clear();
        listState2.clear();
        snapshotActiveBuckets(j, listState);
        listState2.add(Long.valueOf(this.maxPartCounter));
    }

    private void snapshotActiveBuckets(long j, ListState<byte[]> listState) throws Exception {
        Iterator<Bucket<IN, BucketID>> it = this.activeBuckets.values().iterator();
        while (it.hasNext()) {
            BucketState<BucketID> onReceptionOfCheckpoint = it.next().onReceptionOfCheckpoint(j);
            listState.add(SimpleVersionedSerialization.writeVersionAndSerialize(this.bucketStateSerializer, onReceptionOfCheckpoint));
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} checkpointing: {}", Integer.valueOf(this.subtaskIndex), onReceptionOfCheckpoint);
            }
        }
    }

    @VisibleForTesting
    public Bucket<IN, BucketID> onElement(IN in, SinkFunction.Context context) throws Exception {
        return onElement(in, context.currentProcessingTime(), context.timestamp(), context.currentWatermark());
    }

    public Bucket<IN, BucketID> onElement(IN in, long j, @Nullable Long l, long j2) throws Exception {
        this.bucketerContext.update(l, j2, j);
        Bucket<IN, BucketID> orCreateBucketForBucketId = getOrCreateBucketForBucketId(this.bucketAssigner.getBucketId(in, this.bucketerContext));
        orCreateBucketForBucketId.write(in, j);
        this.maxPartCounter = Math.max(this.maxPartCounter, orCreateBucketForBucketId.getPartCounter());
        return orCreateBucketForBucketId;
    }

    private Bucket<IN, BucketID> getOrCreateBucketForBucketId(BucketID bucketid) throws IOException {
        Bucket<IN, BucketID> bucket = this.activeBuckets.get(bucketid);
        if (bucket == null) {
            bucket = this.bucketFactory.getNewBucket(this.subtaskIndex, bucketid, assembleBucketPath(bucketid), this.maxPartCounter, this.bucketWriter, this.rollingPolicy, this.fileLifeCycleListener, this.outputFileConfig);
            this.activeBuckets.put(bucketid, bucket);
            notifyBucketCreate(bucket);
        }
        return bucket;
    }

    public void onProcessingTime(long j) throws Exception {
        Iterator<Bucket<IN, BucketID>> it = this.activeBuckets.values().iterator();
        while (it.hasNext()) {
            it.next().onProcessingTime(j);
        }
    }

    public void closePartFileForBucket(BucketID bucketid) throws Exception {
        Bucket<IN, BucketID> bucket = this.activeBuckets.get(bucketid);
        if (bucket != null) {
            bucket.closePartFile();
        }
    }

    public void close() {
        if (this.activeBuckets != null) {
            this.activeBuckets.values().forEach((v0) -> {
                v0.disposePartFile();
            });
        }
    }

    private Path assembleBucketPath(BucketID bucketid) {
        String obj = bucketid.toString();
        return "".equals(obj) ? this.basePath : new Path(this.basePath, obj);
    }

    private void notifyBucketCreate(Bucket<IN, BucketID> bucket) {
        if (this.bucketLifeCycleListener != null) {
            this.bucketLifeCycleListener.bucketCreated(bucket);
        }
    }

    private void notifyBucketInactive(Bucket<IN, BucketID> bucket) {
        if (this.bucketLifeCycleListener != null) {
            this.bucketLifeCycleListener.bucketInactive(bucket);
        }
    }

    @VisibleForTesting
    public long getMaxPartCounter() {
        return this.maxPartCounter;
    }

    @VisibleForTesting
    Map<BucketID, Bucket<IN, BucketID>> getActiveBuckets() {
        return this.activeBuckets;
    }
}
