package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/blob/AbstractBlobCache.class */
/**
 * Base class for local BLOB caches.
 *
 * <p>A BLOB is looked up in the local storage directory first. On a miss it is fetched into a
 * uniquely named staging file, first through the {@link BlobView} and, if that does not yield
 * the BLOB, from the BLOB server at {@link #serverAddress}. The staging file is then moved into
 * the local store.
 *
 * <p>Locking: the read lock of {@link #readWriteLock} guards the existence check of a stored
 * file, the write lock guards moving a staging file into the store. Downloads themselves run
 * without holding either lock.
 */
public abstract class AbstractBlobCache implements Closeable {
    /** Logger supplied by the concrete cache; used for all messages of this class. */
    protected final Logger log;

    /** Root directory of the local store; deleted on {@link #close()} only if the reference is owned. */
    protected final Reference<File> storageDir;

    /** First source that is asked for a BLOB that is not stored locally. */
    protected final BlobView blobView;

    /** Shutdown hook registered in the constructor and removed again in {@link #close()}. */
    protected final Thread shutdownHook;

    /** Retry count handed to {@code BlobClient.downloadFromBlobServer}; never negative. */
    protected final int numFetchRetries;

    /** Configuration handed to {@code BlobClient.downloadFromBlobServer}. */
    protected final Configuration blobClientConfig;

    /** Address of the BLOB server, or {@code null} while it is unknown. */
    @Nullable
    protected volatile InetSocketAddress serverAddress;

    /** Source of the numeric suffix of staging file names. */
    protected final AtomicLong tempFileCounter = new AtomicLong(0);

    /** Set by the first {@link #close()} call so that the shutdown logic runs only once. */
    protected final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Read lock: existence check of stored files. Write lock: moving staging files into the store. */
    protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * Creates the cache, registers a shutdown hook for it and checks already stored BLOBs for
     * corruption.
     *
     * @param configuration configuration used for the fetch-retry option and for BLOB client calls
     * @param reference reference to the storage directory
     * @param blobView first source consulted for BLOBs that are missing locally
     * @param logger logger to use
     * @param inetSocketAddress address of the BLOB server, or {@code null} if not yet known
     * @throws IOException if the check of the stored BLOBs fails
     */
    public AbstractBlobCache(Configuration configuration, Reference<File> reference, BlobView blobView, Logger logger, @Nullable InetSocketAddress inetSocketAddress) throws IOException {
        this.log = Preconditions.checkNotNull(logger);
        this.blobClientConfig = Preconditions.checkNotNull(configuration);
        this.blobView = Preconditions.checkNotNull(blobView);
        this.storageDir = reference;
        this.log.info("Created BLOB cache storage directory " + reference);

        final int configuredRetries = configuration.getInteger(BlobServerOptions.FETCH_RETRIES);
        if (configuredRetries >= 0) {
            this.numFetchRetries = configuredRetries;
        } else {
            // A negative retry count is meaningless; fall back to "no retries".
            this.log.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.", BlobServerOptions.FETCH_RETRIES.key());
            this.numFetchRetries = 0;
        }

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), this.log);
        this.serverAddress = inetSocketAddress;
        checkStoredBlobsForCorruption();
    }

    /**
     * Delegates to {@code BlobUtils.checkAndDeleteCorruptedBlobs} if the storage directory exists.
     *
     * @throws IOException if the delegate fails
     */
    private void checkStoredBlobsForCorruption() throws IOException {
        final File directory = this.storageDir.deref();
        if (directory.exists()) {
            BlobUtils.checkAndDeleteCorruptedBlobs(directory.toPath(), this.log);
        }
    }

    /**
     * Returns the storage directory of this cache.
     *
     * @return the dereferenced storage directory
     */
    public File getStorageDir() {
        return this.storageDir.deref();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the local file for the given BLOB, fetching it first if it is not stored locally.
     *
     * <p>Lookup order: local store, then {@link #blobView}, then the BLOB server. If the
     * {@link #blobView} attempt throws, the failure is logged and the BLOB server is tried. The
     * staging file used for the fetch is removed on every exit path.
     *
     * @param jobID job the BLOB belongs to, may be {@code null}
     * @param blobKey key of the BLOB, must not be {@code null}
     * @return the file in the local store
     * @throws IOException if the BLOB server address is unknown, or if downloading or storing
     *     the BLOB fails
     */
    public File getFileInternal(@Nullable JobID jobID, BlobKey blobKey) throws IOException {
        Preconditions.checkArgument(blobKey != null, "BLOB key cannot be null.");
        final File localFile = BlobUtils.getStorageLocation(this.storageDir.deref(), jobID, blobKey);

        // The read lock covers only the existence check and is released exactly once,
        // no matter whether the file was found.
        this.readWriteLock.readLock().lock();
        try {
            if (localFile.exists()) {
                return localFile;
            }
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        // Fetch into a uniquely named staging file; no lock is held while downloading.
        final File incomingFile = createTemporaryFilename();
        try {
            try {
                if (this.blobView.get(jobID, blobKey, incomingFile)) {
                    moveTempFileToStoreLocked(incomingFile, jobID, blobKey, localFile);
                    return localFile;
                }
            } catch (Exception e) {
                // Best effort only: fall through to the BLOB server download below.
                this.log.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
            }

            // Read the volatile field once so the null check and the use see the same value.
            final InetSocketAddress currentServerAddress = this.serverAddress;
            if (currentServerAddress == null) {
                throw new IOException("Cannot download from BlobServer, because the server address is unknown.");
            }

            BlobClient.downloadFromBlobServer(jobID, blobKey, incomingFile, currentServerAddress, this.blobClientConfig, this.numFetchRetries);
            moveTempFileToStoreLocked(incomingFile, jobID, blobKey, localFile);
            return localFile;
        } finally {
            // Remove the staging file if it is still present (e.g. after a failed download).
            if (!incomingFile.delete() && incomingFile.exists()) {
                this.log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobID);
            }
        }
    }

    /**
     * Moves a staging file into the local store while holding the write lock; the lock is
     * released even if the move fails.
     */
    private void moveTempFileToStoreLocked(File incomingFile, @Nullable JobID jobID, BlobKey blobKey, File localFile) throws IOException {
        this.readWriteLock.writeLock().lock();
        try {
            BlobUtils.moveTempFileToStore(incomingFile, jobID, blobKey, localFile, this.log, null);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Returns the port of the BLOB server this cache downloads from.
     *
     * @return the port of {@link #serverAddress}, or {@code -1} if the address is unknown
     */
    public int getPort() {
        final InetSocketAddress currentServerAddress = this.serverAddress;
        return currentServerAddress != null ? currentServerAddress.getPort() : -1;
    }

    /**
     * Sets the address of the BLOB server used for downloads.
     *
     * @param inetSocketAddress new server address, must not be {@code null}
     */
    public void setBlobServerAddress(InetSocketAddress inetSocketAddress) {
        this.serverAddress = Preconditions.checkNotNull(inetSocketAddress);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the path of a new staging file named {@code temp-<counter>} inside the directory
     * returned by {@code BlobUtils.getIncomingDirectory}. The file itself is not created here.
     *
     * @return path of the staging file
     * @throws IOException if the incoming directory cannot be obtained
     */
    public File createTemporaryFilename() throws IOException {
        final File incomingDirectory = BlobUtils.getIncomingDirectory(this.storageDir.deref());
        final String fileName = String.format("temp-%08d", this.tempFileCounter.getAndIncrement());
        return new File(incomingDirectory, fileName);
    }

    /**
     * Cancels the cleanup task and, on the first call only, deletes the storage directory if it
     * is owned by this cache and removes the shutdown hook.
     *
     * @throws IOException declared by {@link Closeable#close()}
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        cancelCleanupTask();
        if (this.shutdownRequested.compareAndSet(false, true)) {
            this.log.info("Shutting down BLOB cache");
            try {
                this.storageDir.owned().ifPresent(FunctionUtils.uncheckedConsumer(FileUtils::deleteDirectory));
            } finally {
                // Always unregister the hook, even if deleting the directory failed.
                ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), this.log);
            }
        }
    }

    /** Cancels the cleanup task of the concrete cache; invoked at the start of every {@link #close()} call. */
    protected abstract void cancelCleanupTask();
}
