package io.vertx.ext.reactivestreams.impl;

import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.ext.reactivestreams.ReactiveReadStream;
import java.util.ArrayDeque;
import java.util.Queue;
import org.reactivestreams.Subscription;

/* loaded from: classes2.dex */
public class ReactiveReadStreamImpl<T> implements ReactiveReadStream<T> {
    private final long batchSize;
    private Handler<T> dataHandler;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;
    private boolean paused;
    private final Queue<T> pending = new ArrayDeque();
    private Subscription subscription;
    private long tokens;

    public ReactiveReadStreamImpl(long j) {
        this.batchSize = j;
    }

    private void checkRequestTokens() {
        Subscription subscription;
        if (this.paused || (subscription = this.subscription) == null || this.tokens != 0) {
            return;
        }
        long j = this.batchSize;
        this.tokens = j;
        subscription.request(j);
    }

    private synchronized void handleData(T t) {
        if (this.paused) {
            this.pending.add(t);
        } else if (this.dataHandler != null) {
            this.dataHandler.handle(t);
            this.tokens--;
            checkRequestTokens();
        }
    }

    protected void checkUnsolicitedTokens() {
        if (this.tokens == 0) {
            throw new IllegalStateException("Data received but wasn't requested");
        }
    }

    @Override // io.vertx.core.streams.ReadStream
    public /* bridge */ /* synthetic */ ReadStream endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream, io.vertx.core.streams.ReadStream
    public synchronized ReactiveReadStream<T> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    @Override // io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ ReadStream exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ StreamBase exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.ext.reactivestreams.ReactiveReadStream, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public synchronized ReactiveReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override // io.vertx.core.streams.ReadStream
    /* renamed from: handler */
    public synchronized ReactiveReadStream<T> handler2(Handler<T> handler) {
        this.dataHandler = handler;
        if (handler != null && !this.paused) {
            checkRequestTokens();
        }
        return this;
    }

    @Override // org.reactivestreams.Subscriber
    public synchronized void onComplete() {
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
    }

    @Override // org.reactivestreams.Subscriber
    public synchronized void onError(Throwable th) {
        if (th == null) {
            throw new NullPointerException("throwable");
        }
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(th);
        }
    }

    @Override // org.reactivestreams.Subscriber
    public synchronized void onNext(T t) {
        if (t == null) {
            throw new NullPointerException("data");
        }
        checkUnsolicitedTokens();
        handleData(t);
    }

    @Override // org.reactivestreams.Subscriber
    public synchronized void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("subscription");
        }
        if (this.subscription != null) {
            subscription.cancel();
        } else {
            this.subscription = subscription;
        }
    }

    @Override // io.vertx.core.streams.ReadStream
    /* renamed from: pause */
    public synchronized ReactiveReadStream<T> pause2() {
        this.paused = true;
        return this;
    }

    @Override // io.vertx.core.streams.ReadStream
    /* renamed from: resume */
    public synchronized ReactiveReadStream<T> resume2() {
        this.paused = false;
        while (true) {
            T poll = this.pending.poll();
            if (poll != null) {
                handleData(poll);
            } else {
                checkRequestTokens();
            }
        }
        return this;
    }
}
