Skip to content

Commit

Permalink
fix #407 修复结合RxJava发请求,监听文件上传进度,偶现进度没有递增问题
Browse files Browse the repository at this point in the history
(cherry picked from commit 66d0693)
  • Loading branch information
liujingxing committed Sep 19, 2022
1 parent 541edbd commit 45aeefd
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rxhttp.wrapper.param;

import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.rxjava3.core.Observable;
Expand All @@ -12,7 +13,6 @@
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.operators.SpscArrayQueue;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import okhttp3.Response;
import rxhttp.wrapper.annotations.NonNull;
Expand Down Expand Up @@ -163,7 +163,7 @@ private static final class AsyncParserObserver<T> extends AtomicInteger

private volatile boolean done;
private volatile boolean disposed;
private final SpscArrayQueue<Progress> queue;
private final LinkedBlockingQueue<Progress> queue;
private final Scheduler.Worker worker;

private final Consumer<Progress> progressConsumer;
Expand All @@ -173,7 +173,7 @@ private static final class AsyncParserObserver<T> extends AtomicInteger
this.parser = parser;
this.worker = worker;
this.progressConsumer = progressConsumer;
queue = new SpscArrayQueue<>(2);
queue = new LinkedBlockingQueue<>(2);

if (progressConsumer != null && parser instanceof StreamParser) {
((StreamParser) parser).setProgressCallback(this);
Expand Down Expand Up @@ -221,9 +221,8 @@ public void onNext(Progress progress) {
}

private void offer(Progress p) {
if (!queue.offer(p)) {
while (!queue.offer(p)) {
queue.poll();
queue.offer(p);
}
schedule();
}
Expand Down Expand Up @@ -260,7 +259,7 @@ void schedule() {
public void run() {
int missed = 1;

final SpscArrayQueue<Progress> q = queue;
final LinkedBlockingQueue<Progress> q = queue;
final Observer<? super T> a = downstream;
while (!checkTerminated(done, q.isEmpty(), a)) {
for (; ; ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ class ClassHelper(private val isAndroidPlatform: Boolean) {
package $rxHttpPackage;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import ${getClassPath("Observable")};
Expand All @@ -512,7 +513,6 @@ class ClassHelper(private val isAndroidPlatform: Boolean) {
import ${getClassPath("Exceptions")};
import ${getClassPath("Consumer")};
import ${getClassPath("DisposableHelper")};
import ${getClassPath("SpscArrayQueue")};
import ${getClassPath("RxJavaPlugins")};
import okhttp3.Response;
import rxhttp.wrapper.annotations.NonNull;
Expand Down Expand Up @@ -663,7 +663,7 @@ class ClassHelper(private val isAndroidPlatform: Boolean) {
private volatile boolean done;
private volatile boolean disposed;
private final SpscArrayQueue<Progress> queue;
private final LinkedBlockingQueue<Progress> queue;
private final Scheduler.Worker worker;
private final Consumer<Progress> progressConsumer;
Expand All @@ -673,7 +673,7 @@ class ClassHelper(private val isAndroidPlatform: Boolean) {
this.parser = parser;
this.worker = worker;
this.progressConsumer = progressConsumer;
queue = new SpscArrayQueue<>(2);
queue = new LinkedBlockingQueue<>(2);
if (progressConsumer != null && parser instanceof StreamParser) {
((StreamParser) parser).setProgressCallback(this);
Expand Down Expand Up @@ -721,9 +721,8 @@ class ClassHelper(private val isAndroidPlatform: Boolean) {
}
private void offer(Progress p) {
if (!queue.offer(p)) {
while (!queue.offer(p)) {
queue.poll();
queue.offer(p);
}
schedule();
}
Expand Down Expand Up @@ -760,7 +759,7 @@ class ClassHelper(private val isAndroidPlatform: Boolean) {
public void run() {
int missed = 1;
final SpscArrayQueue<Progress> q = queue;
final LinkedBlockingQueue<Progress> q = queue;
final Observer<? super T> a = downstream;
while (!checkTerminated(done, q.isEmpty(), a)) {
for (; ; ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ class ClassHelper(
package $rxHttpPackage;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import ${getClassPath("Observable")};
Expand All @@ -310,7 +311,6 @@ class ClassHelper(
import ${getClassPath("Exceptions")};
import ${getClassPath("Consumer")};
import ${getClassPath("DisposableHelper")};
import ${getClassPath("SpscArrayQueue")};
import ${getClassPath("RxJavaPlugins")};
import okhttp3.Response;
import rxhttp.wrapper.annotations.NonNull;
Expand Down Expand Up @@ -461,7 +461,7 @@ class ClassHelper(
private volatile boolean done;
private volatile boolean disposed;
private final SpscArrayQueue<Progress> queue;
private final LinkedBlockingQueue<Progress> queue;
private final Scheduler.Worker worker;
private final Consumer<Progress> progressConsumer;
Expand All @@ -471,7 +471,7 @@ class ClassHelper(
this.parser = parser;
this.worker = worker;
this.progressConsumer = progressConsumer;
queue = new SpscArrayQueue<>(2);
queue = new LinkedBlockingQueue<>(2);
if (progressConsumer != null && parser instanceof StreamParser) {
((StreamParser) parser).setProgressCallback(this);
Expand Down Expand Up @@ -519,9 +519,8 @@ class ClassHelper(
}
private void offer(Progress p) {
if (!queue.offer(p)) {
while (!queue.offer(p)) {
queue.poll();
queue.offer(p);
}
schedule();
}
Expand Down Expand Up @@ -558,7 +557,7 @@ class ClassHelper(
public void run() {
int missed = 1;
final SpscArrayQueue<Progress> q = queue;
final LinkedBlockingQueue<Progress> q = queue;
final Observer<? super T> a = downstream;
while (!checkTerminated(done, q.isEmpty(), a)) {
for (; ; ) {
Expand Down

0 comments on commit 45aeefd

Please sign in to comment.