package org.apache.flink.runtime.highavailability;

import java.io.IOException;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobResultSerializer;
import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/highavailability/FileSystemJobResultStore.class */
public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FileSystemJobResultStore.class);

    @VisibleForTesting
    static final String FILE_EXTENSION = ".json";

    @VisibleForTesting
    static final String DIRTY_FILE_EXTENSION = "_DIRTY.json";
    private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
    private final FileSystem fileSystem;
    private volatile boolean basePathCreated;
    private final Path basePath;
    private final boolean deleteOnCommit;

    @VisibleForTesting
    @JsonIgnoreProperties(value = {"version"}, allowGetters = true)
    /* loaded from: input_file:org/apache/flink/runtime/highavailability/FileSystemJobResultStore$JsonJobResultEntry.class */
    static class JsonJobResultEntry extends JobResultEntry {
        private static final String FIELD_NAME_RESULT = "result";
        static final String FIELD_NAME_VERSION = "version";

        private JsonJobResultEntry(JobResultEntry jobResultEntry) {
            this(jobResultEntry.getJobResult());
        }

        @JsonCreator
        private JsonJobResultEntry(@JsonProperty("result") JobResult jobResult) {
            super(jobResult);
        }

        @Override // org.apache.flink.runtime.highavailability.JobResultEntry
        @JsonSerialize(using = JobResultSerializer.class)
        @JsonDeserialize(using = JobResultDeserializer.class)
        @JsonProperty(FIELD_NAME_RESULT)
        public JobResult getJobResult() {
            return super.getJobResult();
        }

        @Override // org.apache.flink.runtime.highavailability.JobResultEntry
        @JsonIgnore
        public JobID getJobId() {
            return super.getJobId();
        }

        public int getVersion() {
            return 1;
        }
    }

    @VisibleForTesting
    public static boolean hasValidDirtyJobResultStoreEntryExtension(String str) {
        return str.endsWith(DIRTY_FILE_EXTENSION);
    }

    @VisibleForTesting
    public static boolean hasValidJobResultStoreEntryExtension(String str) {
        return str.endsWith(FILE_EXTENSION);
    }

    @VisibleForTesting
    FileSystemJobResultStore(FileSystem fileSystem, Path path, boolean z) {
        this.fileSystem = fileSystem;
        this.basePath = path;
        this.deleteOnCommit = z;
    }

    public static FileSystemJobResultStore fromConfiguration(Configuration configuration) throws IOException {
        Preconditions.checkNotNull(configuration);
        String str = (String) configuration.get(JobResultStoreOptions.STORAGE_PATH);
        Path path = StringUtils.isNullOrWhitespaceOnly(str) ? new Path(createDefaultJobResultStorePath((String) configuration.get(HighAvailabilityOptions.HA_STORAGE_PATH), (String) configuration.get(HighAvailabilityOptions.HA_CLUSTER_ID))) : new Path(str);
        return new FileSystemJobResultStore(path.getFileSystem(), path, ((Boolean) configuration.get(JobResultStoreOptions.DELETE_ON_COMMIT)).booleanValue());
    }

    private void createBasePathIfNeeded() throws IOException {
        if (this.basePathCreated) {
            return;
        }
        LOG.info("Creating highly available job result storage directory at {}", this.basePath);
        this.fileSystem.mkdirs(this.basePath);
        LOG.info("Created highly available job result storage directory at {}", this.basePath);
        this.basePathCreated = true;
    }

    public static String createDefaultJobResultStorePath(String str, String str2) {
        return str + "/job-result-store/" + str2;
    }

    private Path constructDirtyPath(JobID jobID) {
        return constructEntryPath(jobID.toString() + DIRTY_FILE_EXTENSION);
    }

    private Path constructCleanPath(JobID jobID) {
        return constructEntryPath(jobID.toString() + FILE_EXTENSION);
    }

    @VisibleForTesting
    Path constructEntryPath(String str) {
        return new Path(this.basePath, str);
    }

    @Override // org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore
    public void createDirtyResultInternal(JobResultEntry jobResultEntry) throws IOException {
        createBasePathIfNeeded();
        FSDataOutputStream create = this.fileSystem.create(constructDirtyPath(jobResultEntry.getJobId()), FileSystem.WriteMode.NO_OVERWRITE);
        Throwable th = null;
        try {
            this.mapper.writeValue(new NonClosingOutputStreamDecorator(create), new JsonJobResultEntry(jobResultEntry));
            if (create != null) {
                if (0 == 0) {
                    create.close();
                    return;
                }
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    @Override // org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore
    public void markResultAsCleanInternal(JobID jobID) throws IOException, NoSuchElementException {
        Path constructDirtyPath = constructDirtyPath(jobID);
        if (!this.fileSystem.exists(constructDirtyPath)) {
            throw new NoSuchElementException(String.format("Could not mark job %s as clean as it is not present in the job result store.", jobID));
        }
        if (this.deleteOnCommit) {
            this.fileSystem.delete(constructDirtyPath, false);
        } else {
            this.fileSystem.rename(constructDirtyPath, constructCleanPath(jobID));
        }
    }

    @Override // org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore
    public boolean hasDirtyJobResultEntryInternal(JobID jobID) throws IOException {
        return this.fileSystem.exists(constructDirtyPath(jobID));
    }

    @Override // org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore
    public boolean hasCleanJobResultEntryInternal(JobID jobID) throws IOException {
        return this.fileSystem.exists(constructCleanPath(jobID));
    }

    @Override // org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore
    public Set<JobResult> getDirtyResultsInternal() throws IOException {
        createBasePathIfNeeded();
        FileStatus[] listStatus = this.fileSystem.listStatus(this.basePath);
        Preconditions.checkState(listStatus != null, "The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.");
        HashSet hashSet = new HashSet();
        for (FileStatus fileStatus : listStatus) {
            if (!fileStatus.isDir() && hasValidDirtyJobResultStoreEntryExtension(fileStatus.getPath().getName())) {
                hashSet.add(((JsonJobResultEntry) this.mapper.readValue(this.fileSystem.open(fileStatus.getPath()), JsonJobResultEntry.class)).getJobResult());
            }
        }
        return hashSet;
    }
}
