Skip to content

Instantly share code, notes, and snippets.

@bfreuden
Created June 1, 2023 14:04
Show Gist options
  • Select an option

  • Save bfreuden/54d6bf1f621c5b655042f22dc867a5ea to your computer and use it in GitHub Desktop.

Select an option

Save bfreuden/54d6bf1f621c5b655042f22dc867a5ea to your computer and use it in GitHub Desktop.
public class ParallelLimit {
private static class Parallelizer<T> {
private final LinkedList<Supplier<Future<T>>> pendingTasks;
private final int limit;
private final int totalTasks;
private int concurrency;
private final List<T> completedTasks = new ArrayList<>();
private final Promise<List<T>> promise = Promise.promise();
private boolean failed = false;
Parallelizer(List<Supplier<Future<T>>> tasks, int limit) {
if (limit < 0) {
throw new IllegalArgumentException("limit must be positive");
}
this.pendingTasks = new LinkedList<>(tasks);
this.totalTasks = tasks.size();
this.limit = limit;
this.concurrency = 0;
}
Future<List<T>> run() {
if (pendingTasks.isEmpty())
return Future.succeededFuture(Collections.emptyList());
runInternal();
return promise.future();
}
private void runInternal() {
while (!pendingTasks.isEmpty() && concurrency < limit && !failed) {
Supplier<Future<T>> futureSupplier = pendingTasks.removeFirst();
Future<T> future = futureSupplier.get();
concurrency++;
future.onSuccess(res -> {
completedTasks.add(res);
concurrency--;
if (completedTasks.size() == totalTasks)
promise.complete(completedTasks);
else if (concurrency < limit)
runInternal();
});
future.onFailure(t -> {
failed = true;
promise.fail(t);
});
}
}
}
public static <T> Future<List<T>> parallelLimit(
List<Supplier<Future<T>>> tasks, int limit) {
return new Parallelizer<T>(tasks, limit).run();
}
// ----------------- test code ------------------------
public static void main(String[] args) throws InterruptedException {
Vertx vertx = Vertx.vertx();
ArrayList<Supplier<Future<Integer>>> suppliers = new ArrayList<>();
for (int i=0; i<50 ; i++) {
int nb = i;
suppliers.add(() -> asyncMultiplier(vertx, nb));
}
CountDownLatch latch = new CountDownLatch(1);
parallelLimit(suppliers, 10).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("done!");
} else {
System.err.println("error");
ar.cause().printStackTrace();
}
latch.countDown();
});
latch.await();
vertx.close();
}
public static Future<Integer> asyncMultiplier(Vertx vertx, int n) {
System.out.println("starting task: " + n);
Promise<Integer> result = Promise.promise();
vertx.setTimer(1000, it -> {
System.out.println("task complete: " + n);
result.complete(n * 2);
});
return result.future();
}
}
@LangInteger
Copy link

test code can remove CountDownLatch:

  // ----------------- test code ------------------------

  public static void main(String[] args) throws InterruptedException {
    Vertx vertx = Vertx.vertx();
    ArrayList<Supplier<Future<Integer>>> suppliers = new ArrayList<>();
    for (int i=0; i<50 ; i++) {
      int nb = i;
      suppliers.add(() -> asyncMultiplier(vertx, nb));
    }
    parallelLimit(suppliers, 10).onComplete(ar -> {
      if (ar.succeeded()) {
        System.out.println("done!");
      } else {
        System.err.println("error");
        ar.cause().printStackTrace();
      }
      vertx.close();
    });
  }

  public static Future<Integer> asyncMultiplier(Vertx vertx, int n) {
    System.out.println("starting task: " + n);
    Promise<Integer> result = Promise.promise();
    vertx.setTimer(1000, it -> {
      System.out.println("task complete: " + n);
      result.complete(n * 2);
    });
    return result.future();
  }

@bfreuden
Copy link
Author

bfreuden commented Jun 5, 2023

@LangInteger ๐Ÿ‘

@FrancieKime
Copy link

FrancieKime commented Jun 12, 2023

Thank you for helping me out as well. Before this post, I found it challenging as well, but now the issue is resolved. If you are having trouble with your research paper as well, I recommend visiting this website https://www.grabmyessay.com/samples/category/geography You can attempt these works; they offer admissions essays, research papers, college papers, coursework, and dissertations. They are simply the best writers; they helped me out through a difficult moment.

@bfreuden
Copy link
Author

@FrancieKime ๐Ÿ‘

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment