package org.neo4j.fabric.stream;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.fabric.executor.Exceptions;
import org.neo4j.kernel.api.exceptions.Status;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/neo4j/fabric/stream/Rx2SyncStream.class */
/**
 * Adapts a reactive {@link Flux} of {@link Record}s to a blocking, pull-style reader.
 *
 * <p>Records are requested from the publisher in batches of {@code batchSize} and parked in a
 * bounded queue until {@link #readRecord()} takes them. A new batch is requested only when
 * nothing is outstanding and nothing is buffered, so the queue holds at most one batch of
 * records plus one terminal signal (completion or error).
 */
public class Rx2SyncStream {
    /** Sentinel enqueued by {@code onComplete}; recognised by reference identity, not by content. */
    private static final RecordOrError END = new RecordOrError(null, null);
    private final RecordSubscriber recordSubscriber = new RecordSubscriber();
    /** Hand-off queue between the subscriber callbacks and {@link #readRecord()}. */
    private final BlockingQueue<RecordOrError> buffer;
    /** Number of records requested from the subscription per batch. */
    private final int batchSize;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/fabric/stream/Rx2SyncStream$RecordOrError.class */
    /**
     * Queue element carrying either a record or an error. The {@link #END} instance carries
     * neither and marks normal completion.
     */
    public static class RecordOrError {
        private final Record record;
        private final Throwable error;

        RecordOrError(Record record, Throwable th) {
            this.record = record;
            this.error = th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/fabric/stream/Rx2SyncStream$RecordSubscriber.class */
    /**
     * Subscriber that forwards every signal from the publisher into the enclosing stream's
     * buffer and tracks how many requested records have not been delivered yet.
     */
    public class RecordSubscriber implements Subscriber<Record> {
        private volatile Subscription subscription;
        /** Records requested via {@link #request(long)} minus records received in {@code onNext}. */
        private final AtomicLong pendingRequested = new AtomicLong(0);

        private RecordSubscriber() {
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            // No demand is signalled here; the first batch is requested lazily by readRecord().
            this.subscription = subscription;
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(Record record) {
            // NOTE(review): the counter is decremented before the record is enqueued. Reversing
            // the order looks like it could let the reader observe a non-zero pending count with
            // an empty buffer and then block without requesting more - keep this order.
            this.pendingRequested.decrementAndGet();
            Rx2SyncStream.this.buffer.add(new RecordOrError(record, null));
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            Rx2SyncStream.this.buffer.add(new RecordOrError(null, th));
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            Rx2SyncStream.this.buffer.add(Rx2SyncStream.END);
        }

        /** Records the additional demand and forwards it to the subscription. */
        void request(long j) {
            this.pendingRequested.addAndGet(j);
            this.subscription.request(j);
        }

        /** Cancels the subscription. */
        void close() {
            this.subscription.cancel();
        }
    }

    /**
     * Subscribes to the given record stream.
     *
     * @param records   the reactive source of records
     * @param batchSize how many records to request from the source at a time
     */
    public Rx2SyncStream(Flux<Record> records, int batchSize) {
        this.batchSize = batchSize;
        // +1 leaves room for the terminal signal (END or an error) on top of a full batch.
        this.buffer = new ArrayBlockingQueue<>(batchSize + 1);
        records.subscribeWith(this.recordSubscriber);
    }

    /**
     * Blocks until the next element is available and returns it.
     *
     * <p>Callers should stop reading once {@code null} has been returned or an error has been
     * thrown: a further call waits on the buffer again.
     *
     * @return the next record, or {@code null} when the source has completed
     * @throws IllegalStateException if the calling thread is interrupted while waiting; the
     *         subscription is cancelled and the thread's interrupt flag is restored before throwing
     * @throws RuntimeException the result of
     *         {@code Exceptions.transform(Status.Statement.ExecutionFailed, error)} when the
     *         source signalled an error
     */
    public Record readRecord() {
        maybeRequest();
        try {
            RecordOrError take = this.buffer.take();
            if (take == END) {
                return null;
            }
            if (take.error != null) {
                throw Exceptions.transform(Status.Statement.ExecutionFailed, take.error);
            }
            return take.record;
        } catch (InterruptedException e) {
            this.recordSubscriber.close();
            // Restore the flag cleared by take() so code further up the stack can still see
            // that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Reports whether the completion sentinel is currently at the head of the buffer, i.e. the
     * next {@link #readRecord()} would return {@code null}. Returns {@code false} once the
     * sentinel has been consumed.
     */
    public boolean completed() {
        return this.buffer.peek() == END;
    }

    /** Cancels the underlying subscription. */
    public void close() {
        this.recordSubscriber.close();
    }

    /** Requests a new batch only when no records are outstanding and none are buffered. */
    private void maybeRequest() {
        if (this.recordSubscriber.pendingRequested.get() + this.buffer.size() == 0) {
            this.recordSubscriber.request(this.batchSize);
        }
    }
}
