/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.common.util.concurrent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.concurrent.ThreadContext;

/**
 * Base class that batches items submitted by concurrent callers so that only one caller at a
 * time performs the {@link #write(List)} for everything that is currently pending.
 *
 * <p>A single-permit {@link Semaphore} acts as the "promise" to process. A caller that obtains
 * the permit drains the queue and writes the batch; a caller that does not obtain it enqueues
 * its item (blocking while the bounded queue is full) and then retries the permit once so that
 * an item enqueued while another thread was finishing is not left behind.
 *
 * @param <Item> the type of item handed to {@link #write(List)}
 */
public abstract class AsyncIOProcessor<Item> {
    private final Logger logger;
    private final ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> queue;
    private final ThreadContext threadContext;
    /** Single permit: whoever holds it is the one thread allowed to drain and write. */
    private final Semaphore promiseSemaphore = new Semaphore(1);
    /**
     * {@link System#nanoTime()} taken at the start of the most recent drain; {@code 0} until the
     * first drain runs.
     * NOTE(review): written while holding the permit but read through an unsynchronized getter;
     * confirm readers also hold the permit or tolerate a stale value.
     */
    private long lastRunStartTimeInNs;

    /**
     * @param logger        logger used for write and listener failures
     * @param queueSize     capacity of the bounded queue of pending items
     * @param threadContext context from which a restorable context is captured for queued listeners
     */
    protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) {
        this.logger = logger;
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.threadContext = threadContext;
    }

    /**
     * Submits an item for writing.
     *
     * <p>The listener is invoked with {@code null} when the batch containing the item was
     * written without an exception, or with the exception thrown by {@link #write(List)}. If the
     * calling thread is interrupted while waiting for queue space, the listener is invoked with
     * the {@link InterruptedException} instead and the item is not enqueued.
     *
     * @param item     the item to write; must not be {@code null}
     * @param listener callback notified of the outcome; must not be {@code null}
     * @throws NullPointerException if {@code item} or {@code listener} is {@code null}
     */
    public void put(Item item, Consumer<Exception> listener) {
        Objects.requireNonNull(item, "item must not be null");
        Objects.requireNonNull(listener, "listener must not be null");
        // First attempt to become the processing thread.
        final boolean promised = this.promiseSemaphore.tryAcquire();
        if (!promised) {
            // Somebody else is processing: hand the item over via the queue (may block when full).
            this.addToQueue(item, listener);
        }
        // Retry the permit after enqueueing; otherwise an item added while the previous holder
        // was leaving its loop could stay in the queue with nobody responsible for it.
        if (promised || this.promiseSemaphore.tryAcquire()) {
            final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
            if (promised) {
                // Our own item skips the queue; its listener runs on this thread, so it is not
                // wrapped with preserveContext.
                candidates.add(new Tuple<>(item, listener));
            }
            // We hold the permit, so process at least once.
            this.drainAndProcessAndRelease(candidates);
            // Keep going while work remains and nobody else has taken over the permit.
            while (!this.queue.isEmpty() && this.promiseSemaphore.tryAcquire()) {
                this.drainAndProcessAndRelease(candidates);
            }
        }
    }

    /**
     * Enqueues the item with a context-preserving wrapper around its listener, blocking while
     * the queue is full. On interruption the interrupt flag is restored and the original
     * listener is notified with the {@link InterruptedException}.
     */
    void addToQueue(Item item, Consumer<Exception> listener) {
        try {
            this.queue.put(new Tuple<>(item, this.preserveContext(listener)));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            listener.accept(e);
        }
    }

    /**
     * Drains the queue into {@code candidates}, writes them, releases the permit and then
     * notifies every listener. Must be called while holding the permit of the promise semaphore.
     *
     * @param candidates items already owned by the caller; queued items are appended to it and
     *                   the list is cleared before returning so it can be reused
     */
    void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
        Exception exception;
        this.lastRunStartTimeInNs = System.nanoTime();
        try {
            this.queue.drainTo(candidates);
            exception = this.processList(candidates);
        }
        finally {
            // Release before notifying so listeners run without the permit being held.
            this.promiseSemaphore.release();
        }
        this.notifyList(candidates, exception);
        candidates.clear();
    }

    /**
     * Writes the batch if it is non-empty.
     *
     * @return the exception thrown by {@link #write(List)}, or {@code null} if none was thrown
     *         or the batch was empty
     */
    private Exception processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
        Exception exception = null;
        if (!candidates.isEmpty()) {
            try {
                this.write(candidates);
            }
            catch (Exception ex) {
                this.logger.debug("failed to write candidates", ex);
                exception = ex;
            }
        }
        return exception;
    }

    /**
     * Invokes every listener in {@code candidates} with {@code exception} (which may be
     * {@code null}). An exception thrown by a listener is logged and does not prevent the
     * remaining listeners from being notified.
     */
    void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Exception exception) {
        for (Tuple<Item, Consumer<Exception>> tuple : candidates) {
            Consumer<Exception> consumer = tuple.v2();
            try {
                consumer.accept(exception);
            }
            catch (Exception ex) {
                this.logger.warn("failed to notify callback", ex);
            }
        }
    }

    /**
     * Wraps {@code consumer} so that it runs inside a context captured now from the thread
     * context via {@code newRestorableContext(false)}; the stored context is closed after the
     * consumer returns or throws.
     */
    Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
        Supplier<ThreadContext.StoredContext> restorableContext = this.threadContext.newRestorableContext(false);
        return e -> {
            try (ThreadContext.StoredContext ignore = restorableContext.get()) {
                consumer.accept(e);
            }
        };
    }

    /**
     * Writes the given batch. Called by the thread that currently holds the processing permit,
     * and only with a non-empty list.
     *
     * @param candidates the items to write, each paired with its listener
     * @throws IOException if the write fails; the exception is passed to every listener of the batch
     */
    protected abstract void write(List<Tuple<Item, Consumer<Exception>>> candidates) throws IOException;

    /** Returns the logger supplied at construction. */
    Logger getLogger() {
        return this.logger;
    }

    /** Returns the single-permit semaphore guarding processing. */
    Semaphore getPromiseSemaphore() {
        return this.promiseSemaphore;
    }

    /** Returns the {@link System#nanoTime()} value recorded at the start of the most recent drain. */
    long getLastRunStartTimeInNs() {
        return this.lastRunStartTimeInNs;
    }

    /** Returns the bounded queue of pending items. */
    ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> getQueue() {
        return this.queue;
    }
}

