Skip to content

Instantly share code, notes, and snippets.

@ebwood
Created April 20, 2024 08:52
Show Gist options
  • Select an option

  • Save ebwood/3c6571aac87585a27b5dcc03b0dcca68 to your computer and use it in GitHub Desktop.

Select an option

Save ebwood/3c6571aac87585a27b5dcc03b0dcca68 to your computer and use it in GitHub Desktop.
Simulate a download pool with a task-count limit and a speed limit.
import 'dart:async';
import 'dart:math';
import 'package:collection/collection.dart';
/// Size helpers that let integer literals read like `2.mb`.
extension on int {
  /// This number scaled by 1024 * 1024.
  int get mb {
    const scale = 1024 * 1024;
    return this * scale;
  }
}
/// Demo entry point: builds a pool limited to 5 concurrent tasks and a speed
/// of `2.mb`, then enqueues two tasks after 2 seconds and two more after
/// 3 seconds.
void main() {
  final pool = DownloadPool(maxTaskCount: 5, maxDownloadSpeed: 2.mb);

  // Each delay enqueues a pair of tasks, in the same order as listed here.
  for (final delayInSeconds in [2, 3]) {
    Future.delayed(Duration(seconds: delayInSeconds), () {
      pool
        ..addNewTask()
        ..addNewTask();
    });
  }
}
/// Simulates a download manager that limits both how many tasks run at once
/// and how much data is downloaded per second across all running tasks.
///
/// Tasks created by [addNewTask] enter a waiting queue and are promoted to the
/// running queue while fewer than [maxTaskCount] tasks are running. A
/// one-second periodic timer drives the running tasks. On each tick the pool
/// either asks every running task for a chunk, or skips the tick to pay back
/// bandwidth that earlier ticks used beyond the budget.
class DownloadPool {
  /// Maximum number of tasks allowed in the running queue at the same time.
  final int maxTaskCount;

  /// Speed budget per second, shared by all running tasks.
  ///
  /// It is also passed to each [DownloadTask] as that task's own speed bound.
  final int maxDownloadSpeed;

  final List<DownloadTask> _runningQueue = [];
  final List<DownloadTask> _waitingQueue = [];

  /// Amount downloaded beyond the budget that still has to be paid back.
  int _lastOverflow = 0;

  /// Index assigned to the most recently created task.
  int _taskIndex = 0;

  /// Number of throttle ticks processed so far, minus one (log label only).
  int _throttleIndex = -1;

  /// The periodic timer driving the pool, or `null` before the first task is
  /// scheduled.
  Timer? timer;

  /// Creates an idle pool.
  ///
  /// The timer is not started here. Starting it with an empty running queue
  /// made the first tick print a spurious "all tasks done" before any task
  /// had been added. It is started when the first task is scheduled.
  DownloadPool({
    this.maxTaskCount = 5,
    this.maxDownloadSpeed = 2,
  });

  /// Returns the download budget for a window of length [duration], i.e.
  /// [maxDownloadSpeed] scaled by the window length in seconds, truncated.
  int limitSpeedByTime(Duration duration) =>
      (duration.inMilliseconds / 1000 * maxDownloadSpeed).toInt();

  /// Starts the periodic throttle timer and stores it in [timer].
  void _createTimer() {
    const tick = Duration(milliseconds: 1000);
    final limitSpeed = limitSpeedByTime(tick);

    timer = Timer.periodic(tick, (activeTimer) async {
      if (_runningQueue.isEmpty) {
        // Nothing left to drive: stop ticking until a new task is scheduled.
        print('all tasks done');
        activeTimer.cancel();
        _lastOverflow = 0;
        return;
      }

      _throttleIndex++;
      var count = 0;
      if (_lastOverflow < limitSpeed) {
        // The outstanding overflow is below one tick's budget: download.
        final counts = await Future.wait(
          _runningQueue.map((task) => task.download()),
        );
        count = counts.fold<int>(0, (total, chunk) => total + chunk);
        _lastOverflow = count + _lastOverflow - limitSpeed;
      } else {
        // Too much was downloaded earlier: skip this tick and pay back one
        // tick's worth of budget.
        _lastOverflow -= limitSpeed;
      }

      // A negative overflow here means the budget was not fully used. It is
      // logged as-is, then clamped so unused budget does not accumulate.
      final tag =
          '\n[$_throttleIndex throttle] ${DateTime.now()} : $count, overflow: ${format(_lastOverflow)}';
      print(tag);
      _lastOverflow = max(_lastOverflow, 0);
    });
  }

  /// Creates a task with a random total length below `100.mb`, queues it and
  /// tries to schedule it.
  void addNewTask() {
    final task = DownloadTask(
      maxDownloadSpeed,
      downloadTaskIndex: ++_taskIndex,
      totalLength: Random().nextInt(100.mb),
      onDone: taskDone,
    );
    _waitingQueue.add(task);
    _scheduleTask();
  }

  /// Completion callback handed to every task: logs the task, removes it from
  /// the running queue and promotes a waiting task into the freed slot.
  void taskDone(DownloadTask task) {
    print('done ${DateTime.now()} $task');
    _runningQueue.remove(task);
    _scheduleTask();
  }

  /// Moves the oldest waiting task to the running queue if a slot is free,
  /// and makes sure the throttle timer is running.
  void _scheduleTask() {
    if (_waitingQueue.isEmpty || _runningQueue.length >= maxTaskCount) {
      return;
    }
    _runningQueue.add(_waitingQueue.removeAt(0));
    _startTimer();
  }

  /// Starts the throttle timer unless one is already active.
  void _startTimer() {
    if (timer?.isActive ?? false) {
      return;
    }
    _createTimer();
  }
}
/// A simulated download that advances by a random chunk each time
/// [download] is called, until [currentLength] reaches [totalLength].
///
/// Each chunk size is published on [controller]. The task's own subscription
/// logs it and, once the task is complete, disposes the task and calls
/// [onDone].
class DownloadTask {
  /// Exclusive upper bound for the size of a single chunk.
  ///
  /// Passed to [Random.nextInt], so it must be positive.
  final int maxDownloadSpeed;

  /// Broadcast controller on which every downloaded chunk size is emitted.
  late final StreamController<int> controller;

  /// The task's own subscription to [controller], handling progress events.
  late final StreamSubscription<int> downloadSubscription;

  /// Identifier used in log output.
  final int downloadTaskIndex;

  /// Total amount this task has to download.
  final int totalLength;

  /// Called once with this task after it has completed and been disposed.
  final Function(DownloadTask) onDone;

  /// Amount downloaded so far; never raised above [totalLength] by [download].
  int currentLength = 0;

  /// Shared generator so a new [Random] is not allocated for every chunk.
  final Random _random = Random();

  DownloadTask(this.maxDownloadSpeed,
      {required this.downloadTaskIndex,
      required this.totalLength,
      required this.onDone}) {
    controller = StreamController<int>.broadcast();
    downloadSubscription = controller.stream.listen(_handleDownload);
  }

  /// Logs a progress event and finishes the task once everything has been
  /// downloaded.
  void _handleDownload(int count) {
    print(
        '$downloadTaskIndex download ${DateTime.now()}: $count, currentLength: $currentLength, totalLength: $totalLength');
    if (currentLength >= totalLength) {
      dispose();
      onDone(this);
    }
  }

  /// Downloads one random chunk and returns its size.
  ///
  /// The chunk is clamped to the amount still missing, so the returned value
  /// is what was actually added to [currentLength] and callers summing the
  /// results do not over-count the final chunk. Returns 0 without emitting an
  /// event if the task has already been disposed.
  Future<int> download() async {
    if (controller.isClosed) {
      // Adding to a closed controller would throw; a disposed task has
      // nothing left to download.
      return 0;
    }
    final remaining = max(0, totalLength - currentLength);
    final count = min(_random.nextInt(maxDownloadSpeed), remaining);
    controller.sink.add(count);
    currentLength = min(currentLength + count, totalLength);
    return count;
  }

  /// Cancels the progress subscription and closes [controller].
  void dispose() {
    downloadSubscription.cancel();
    controller.close();
  }

  @override
  String toString() {
    return 'DownloadTask$downloadTaskIndex: totalLength: $totalLength, currentLength: $currentLength';
  }
}
/// Returns [i] in decimal with a comma between every group of three digits,
/// e.g. `1234567` becomes `1,234,567` and `-1000` becomes `-1,000`.
///
/// Works on the decimal string rather than on `i.abs()`. The absolute value
/// of the smallest 64-bit integer is not representable, so the arithmetic
/// version produced just `-` for that input.
String format(int i) {
  final text = i.toString();
  final negative = text.startsWith('-');
  final digits = negative ? text.substring(1) : text;

  final buffer = StringBuffer(negative ? '-' : '');
  for (var index = 0; index < digits.length; index++) {
    // Insert a separator whenever a multiple of three digits remains.
    if (index > 0 && (digits.length - index) % 3 == 0) {
      buffer.write(',');
    }
    buffer.write(digits[index]);
  }
  return buffer.toString();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment