package org.neo4j.bolt.runtime;

import io.netty.channel.Channel;
import java.net.SocketAddress;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.BoltServer;
import org.neo4j.bolt.messaging.BoltResponseMessageWriter;
import org.neo4j.bolt.runtime.scheduling.BoltConnectionLifetimeListener;
import org.neo4j.bolt.runtime.scheduling.BoltConnectionQueueMonitor;
import org.neo4j.bolt.runtime.statemachine.BoltStateMachine;
import org.neo4j.bolt.transport.pipeline.KeepAliveHandler;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.logging.Log;
import org.neo4j.logging.internal.LogService;
import org.neo4j.util.FeatureToggles;

/* loaded from: input_file:org/neo4j/bolt/runtime/DefaultBoltConnection.class */
/**
 * Default {@link BoltConnection} implementation: incoming {@link Job}s are put on an internal
 * queue and later executed in batches against this connection's {@link BoltStateMachine}.
 *
 * <p>Lifecycle flags are kept in atomics: {@code shouldClose} records that the connection has been
 * asked to close, {@code closed} records that {@link #close()} has actually run, and {@code idle}
 * is cleared for the duration of a batch.
 */
public class DefaultBoltConnection implements BoltConnection {
    /** Batch size read from the {@code max_batch_size} toggle on {@link BoltServer}; 100 is passed as the default. */
    static final int DEFAULT_MAX_BATCH_SIZE = FeatureToggles.getInteger(BoltServer.class, "max_batch_size", 100);

    private final String id;
    private final BoltChannel channel;
    private final BoltStateMachine machine;
    private final BoltConnectionLifetimeListener listener;
    private final BoltConnectionQueueMonitor queueMonitor;
    private final Log log;
    private final Log userLog;
    private final int maxBatchSize;
    /** Scratch list the queue is drained into; jobs are removed from it one by one as they run. */
    private final List<Job> batch;
    /** Pending jobs; created without an explicit capacity. */
    private final LinkedBlockingQueue<Job> queue = new LinkedBlockingQueue<>();
    /** Set once the connection has been asked to close (stop, fatal error, failed keep-alive). */
    private final AtomicBoolean shouldClose = new AtomicBoolean();
    /** Set once {@link #close()} has run; guards against closing twice. */
    private final AtomicBoolean closed = new AtomicBoolean();
    /** {@code false} while a batch is being processed. */
    private final AtomicBoolean idle = new AtomicBoolean(true);
    private final BoltConnectionMetricsMonitor metricsMonitor;
    private final Clock clock;
    private final BoltResponseMessageWriter messageWriter;
    /** May be {@code null}; every use is null-checked. */
    private final KeepAliveHandler keepAliveHandler;

    /**
     * Creates a connection bound to the given channel and state machine.
     *
     * <p>NOTE: the decompiler reports that this constructor was originally package-private and that
     * its access modifier was widened.
     *
     * @param boltChannel channel that supplies the connection id and the local/remote addresses
     * @param boltResponseMessageWriter writer that is flushed after batches and closed with the connection
     * @param boltStateMachine state machine every job is performed against
     * @param logService source of the internal and the user log
     * @param boltConnectionLifetimeListener notified on creation and close; may be {@code null}
     * @param boltConnectionQueueMonitor notified on enqueue and drain; may be {@code null}
     * @param i maximum number of jobs drained from the queue per batch
     * @param keepAliveHandler activated while a job runs; may be {@code null}
     * @param boltConnectionMetricsMonitor receives connection and message metrics
     * @param clock clock used to measure queue and processing times
     */
    public DefaultBoltConnection(BoltChannel boltChannel, BoltResponseMessageWriter boltResponseMessageWriter, BoltStateMachine boltStateMachine, LogService logService, BoltConnectionLifetimeListener boltConnectionLifetimeListener, BoltConnectionQueueMonitor boltConnectionQueueMonitor, int i, KeepAliveHandler keepAliveHandler, BoltConnectionMetricsMonitor boltConnectionMetricsMonitor, Clock clock) {
        this.id = boltChannel.id();
        this.channel = boltChannel;
        this.machine = boltStateMachine;
        this.listener = boltConnectionLifetimeListener;
        this.queueMonitor = boltConnectionQueueMonitor;
        this.log = logService.getInternalLog(getClass());
        this.userLog = logService.getUserLog(getClass());
        this.maxBatchSize = i;
        this.batch = new ArrayList<>(i);
        this.metricsMonitor = boltConnectionMetricsMonitor;
        this.clock = clock;
        this.messageWriter = boltResponseMessageWriter;
        this.keepAliveHandler = keepAliveHandler;
    }

    /** Returns the id taken from the underlying {@link BoltChannel} at construction time. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public String id() {
        return id;
    }

    /** Returns {@code true} when no batch is currently being processed and the job queue is empty. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public boolean idle() {
        return idle.get() && queue.isEmpty();
    }

    /** Returns the server-side address reported by the channel. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public SocketAddress localAddress() {
        return channel.serverAddress();
    }

    /** Returns the client-side address reported by the channel. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public SocketAddress remoteAddress() {
        return channel.clientAddress();
    }

    /** Returns the raw Netty channel behind this connection. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public Channel channel() {
        return channel.rawChannel();
    }

    /** Returns {@code true} when at least one job is waiting in the queue. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public boolean hasPendingJobs() {
        return !queue.isEmpty();
    }

    /** Notifies the lifetime listener (if any) of the new connection and records it as opened. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public void start() {
        notifyCreated();
        metricsMonitor.connectionOpened();
    }

    /**
     * Queues {@code job} for execution, wrapped so that queue time and processing time are reported
     * to the metrics monitor and the keep-alive handler (if present) is active while the job runs.
     *
     * @param job the job to perform against the state machine
     */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public void enqueue(Job job) {
        metricsMonitor.messageReceived();
        long queuedAt = clock.millis();
        enqueueInternal(stateMachine -> {
            if (keepAliveHandler != null) {
                keepAliveHandler.setActive(true);
            }
            long queueTime = clock.millis() - queuedAt;
            metricsMonitor.messageProcessingStarted(queueTime);
            try {
                job.perform(stateMachine);
                // Processing time = total elapsed time minus the time spent waiting in the queue.
                metricsMonitor.messageProcessingCompleted((clock.millis() - queuedAt) - queueTime);
            } catch (Throwable failure) {
                metricsMonitor.messageProcessingFailed();
                throw failure;
            } finally {
                // Deactivation lives in finally so that a failure while deactivating is not also
                // reported as a processing failure. Keep-alive stays active while a statement is open.
                if (keepAliveHandler != null && !stateMachine.hasOpenStatement()) {
                    keepAliveHandler.setActive(false);
                }
            }
        });
    }

    /**
     * Processes up to the configured maximum batch size of queued jobs.
     *
     * @return {@code true} if the connection is still open afterwards, {@code false} if it was closed
     */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public boolean processNextBatch() {
        return processNextBatch(maxBatchSize, false);
    }

    /**
     * Runs one batch while maintaining the idle flag and the activated/waiting/closed metrics.
     *
     * @param batchCount maximum number of jobs to drain per pass
     * @param exitIfNoJobsAvailable when {@code true}, do not block waiting for a job if none was drained
     * @return {@code true} if the connection is still open afterwards
     */
    private boolean processNextBatch(int batchCount, boolean exitIfNoJobsAvailable) {
        idle.set(false);
        metricsMonitor.connectionActivated();
        try {
            boolean stillOpen = processNextBatchInternal(batchCount, exitIfNoJobsAvailable);
            if (!stillOpen) {
                metricsMonitor.connectionClosed();
            }
            return stillOpen;
        } finally {
            idle.set(true);
            metricsMonitor.connectionWaiting();
        }
    }

    /**
     * Drains and executes queued jobs, repeating for as long as the state machine reports that the
     * connection should stick to the current thread. Any failure marks the connection for closing,
     * and a connection marked for closing is closed before this method returns or throws.
     *
     * @param batchCount maximum number of jobs to drain per pass
     * @param exitIfNoJobsAvailable when {@code true}, do not block waiting for a job if none was drained
     * @return {@code true} if the connection has not been closed
     */
    private boolean processNextBatchInternal(int batchCount, boolean exitIfNoJobsAvailable) {
        try {
            // Serves both as the loop condition and as "wait for a job even if the queue is empty".
            boolean stickToThread = false;
            do {
                // The handlers are nested rather than flat so that a failure raised inside one of the
                // specific handlers is still caught by the generic Throwable handler.
                try {
                    try {
                        try {
                            if (willClose()) {
                                break;
                            }
                            if (stickToThread || !queue.isEmpty()) {
                                queue.drainTo(batch, batchCount);
                                if (batch.isEmpty() && !exitIfNoJobsAvailable) {
                                    // Nothing drained: block for a job, validating the transaction
                                    // after every 10 second timeout, until one arrives or we must close.
                                    while (!willClose()) {
                                        Job next = queue.poll(10L, TimeUnit.SECONDS);
                                        if (next != null) {
                                            batch.add(next);
                                            break;
                                        }
                                        machine.validateTransaction();
                                    }
                                }
                                notifyDrained(batch);
                                // Each job is removed before it runs, so a job that throws is not
                                // left behind in the batch.
                                while (!batch.isEmpty()) {
                                    batch.remove(0).perform(machine);
                                }
                                stickToThread = machine.shouldStickOnThread();
                            }
                            if (queue.isEmpty()) {
                                messageWriter.flush();
                            }
                        } catch (BoltProtocolBreachFatality e) {
                            shouldClose.set(true);
                            log.error(String.format("Protocol breach detected in bolt session '%s'.", id()), e);
                            if (willClose()) {
                                close();
                            }
                        }
                    } catch (BoltConnectionAuthFatality e) {
                        shouldClose.set(true);
                        if (e.isLoggable()) {
                            userLog.warn(e.getMessage());
                        }
                        if (willClose()) {
                            close();
                        }
                    }
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt flag is not restored here; confirm that is intended.
                    shouldClose.set(true);
                    log.info("Bolt session '%s' is interrupted probably due to server shutdown.", new Object[] {id()});
                    if (willClose()) {
                        close();
                    }
                } catch (Throwable t) {
                    shouldClose.set(true);
                    userLog.error(String.format("Unexpected error detected in bolt session '%s'.", id()), t);
                    if (willClose()) {
                        close();
                    }
                }
            } while (stickToThread);

            // A connection that stays alive must not leave the loop with an open statement.
            assert willClose() || !machine.hasOpenStatement();
        } finally {
            if (willClose()) {
                close();
            }
        }
        return !closed.get();
    }

    /**
     * Handles a failure to schedule this connection for execution: unless the connection is already
     * closing, logs the problem and marks the state machine as failed; then processes at most one
     * already-queued job without blocking and closes the connection.
     *
     * @param th the scheduling failure
     */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public void handleSchedulingError(Throwable th) {
        if (!willClose()) {
            Neo4jError error;
            String message;
            if (ExceptionUtils.hasCause(th, RejectedExecutionException.class)) {
                error = Neo4jError.from(Status.Request.NoThreadsAvailable, Status.Request.NoThreadsAvailable.code().description());
                message = String.format("Unable to schedule bolt session '%s' for execution since there are no available threads to serve it at the moment. You can retry at a later time or consider increasing max thread pool size for bolt connector(s).", id());
            } else {
                error = Neo4jError.fatalFrom(th);
                message = String.format("Unexpected error during scheduling of bolt session '%s'.", id());
            }
            // The internal log gets the throwable; the user log gets the message only.
            log.error(message, th);
            userLog.error(message);
            machine.markFailed(error);
        }
        processNextBatch(1, true);
        close();
    }

    /** Forwards an interrupt request to the state machine. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public void interrupt() {
        machine.interrupt();
    }

    /**
     * Requests that the connection be closed. Only the first call has an effect: it marks the state
     * machine for termination and enqueues a no-op job (presumably so that a worker waiting on the
     * queue wakes up and observes the close request).
     */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public void stop() {
        if (shouldClose.compareAndSet(false, true)) {
            machine.markForTermination();
            enqueueInternal(stateMachine -> {
            });
        }
    }

    /**
     * Asks the message writer to perform a keep-alive; on any failure the error is logged and the
     * connection is marked for closing.
     *
     * @deprecated marked for removal.
     */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    @Deprecated(forRemoval = true)
    public void keepAlive() {
        try {
            messageWriter.keepAlive();
        } catch (Throwable t) {
            log.error("Failed to perform keep alive check.", t);
            shouldClose.set(true);
        }
    }

    /** Delegates keep-alive timer initialisation to the message writer. */
    @Override // org.neo4j.bolt.runtime.BoltConnection
    public void initKeepAliveTimer() {
        messageWriter.initKeepAliveTimer();
    }

    /** Returns {@code true} once the connection has been asked to close. */
    private boolean willClose() {
        return shouldClose.get();
    }

    /**
     * Closes the message writer and then the state machine, at most once per connection. Failures
     * of either step are logged and do not prevent the following steps; the lifetime listener is
     * notified after the state machine close attempt regardless of its outcome.
     */
    private void close() {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        try {
            messageWriter.close();
        } catch (Throwable t) {
            log.error(String.format("Unable to close pack output of bolt session '%s'.", id()), t);
        }
        try {
            machine.close();
        } catch (Throwable t) {
            log.error(String.format("Unable to close state machine of bolt session '%s'.", id()), t);
        } finally {
            notifyDestroyed();
        }
    }

    /** Adds {@code job} to the queue and reports it to the queue monitor. */
    private void enqueueInternal(Job job) {
        queue.offer(job);
        notifyEnqueued(job);
    }

    /** Tells the lifetime listener, if one is set, that this connection was created. */
    private void notifyCreated() {
        if (listener != null) {
            listener.created(this);
        }
    }

    /** Tells the lifetime listener, if one is set, that this connection was closed. */
    private void notifyDestroyed() {
        if (listener != null) {
            listener.closed(this);
        }
    }

    /** Tells the queue monitor, if one is set, that {@code job} was enqueued. */
    private void notifyEnqueued(Job job) {
        if (queueMonitor != null) {
            queueMonitor.enqueued(this, job);
        }
    }

    /** Tells the queue monitor, if one is set, about a non-empty list of drained jobs. */
    private void notifyDrained(List<Job> list) {
        if (queueMonitor != null && !list.isEmpty()) {
            queueMonitor.drained(this, list);
        }
    }
}
