How do I read the contents of multiple files asynchronously?

This doesn't work:

List<String> texts = [];
new Directory(path).list().listen((FileSystemEntity f) {
  if (f is File) f.readAsString().then((content) => texts.add(content));
}, onDone: () => ...)

The Future returned by readAsString completes only after onDone has already fired. Future.wait looks like what I want, but I have a stream, not a list of futures. I tried to make use of a StreamTransformer but failed.
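A minimal sketch of that timing problem, assuming current (null-safe) Dart and replacing the file system with a hypothetical `fakeRead` function that completes after a short delay. `onDone` fires as soon as the stream has delivered its last element, which is before any of the delayed reads have finished.

```dart
import 'dart:async';

// Hypothetical stand-in for File.readAsString: completes after a short delay,
// the way a real disk read completes some time after it is started.
Future<String> fakeRead(String name) => Future<String>.delayed(
    const Duration(milliseconds: 10), () => 'contents of $name');

// Mirrors the listen/onDone pattern from the question and reports how many
// texts had been collected at the moment onDone fired.
Future<int> textsCollectedAtOnDone(List<String> names) {
  final texts = <String>[];
  final done = Completer<int>();
  Stream<String>.fromIterable(names).listen((name) {
    // Starts the read but does not wait for it.
    fakeRead(name).then(texts.add);
  }, onDone: () => done.complete(texts.length));
  return done.future;
}
```

With three names, `textsCollectedAtOnDone` completes with 0: every read is still pending when `onDone` runs.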

This is a workaround, but then I could have used listSync() in the first place, I figure...

new Directory(path).list().toList().then((List<FileSystemEntity> list) {
    return Future.wait(list.whereType<File>().map((File f) => f.readAsString()));
  }).then((List<String> texts) {
    ...
  });

What's the best way to transform a stream by mapping its values using some asynchronous operation that returns a future?
 
Try this:

List<Future> futures = [];
List<String> texts = [];
new Directory(path).list().listen((FileSystemEntity f) {
  if (f is! File) return;
  var c = new Completer();
  futures.add(c.future);
  f.readAsString().then((content) {
    texts.add(content);
    c.complete(null);
  });
});
Future.wait(futures).then((_) {
  // all texts read now
});
 
Thanks, +James Ots. Do you really need those Completer instances?  You lose all errors this way. Why not simply do this?

List<Future<String>> futures = [];
new Directory(path).list().listen((FileSystemEntity f) {
  if (f is File) futures.add(f.readAsString());
});
Future.wait(futures).then((List<String> texts) {
  // all texts read now
});

But I think both this code and yours have a race condition. Future.wait is called as soon as listen() returns, before the stream has delivered any files, so it sees an empty list and completes immediately; the read futures added later in listen() are never waited on. Therefore, I'm afraid, you have to use toList() as shown in my second example.
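A small sketch of that race and the toList() fix, again assuming current Dart and a hypothetical `fakeRead` in place of `File.readAsString`; `racy` and `fixed` are illustrative names, not library functions.

```dart
// Hypothetical stand-in for File.readAsString.
Future<String> fakeRead(String name) => Future<String>.delayed(
    const Duration(milliseconds: 10), () => 'text:$name');

// Racy: Future.wait runs synchronously right after listen(), before the
// stream has delivered any element, so it waits on an empty list.
Future<List<String>> racy(Stream<String> names) {
  final futures = <Future<String>>[];
  names.listen((name) => futures.add(fakeRead(name)));
  return Future.wait(futures);
}

// Fixed: toList() completes only after the stream is done, so every read
// future is in the list before Future.wait sees it.
Future<List<String>> fixed(Stream<String> names) =>
    names.map(fakeRead).toList().then((reads) => Future.wait(reads));
```

With a stream of two names, `racy` completes with an empty list, while `fixed` completes with both texts in stream order.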
 
You're right, those completers were unnecessary.

You could use an onDone callback when you call listen, but I can't see a problem with using listSync: it's the read operations that are slow.
 
new Directory(path).list().where((f) => f is File).map((f) => (f as File).readAsString()).toList().then((fs) => Future.wait(fs).then((ts) => print(ts)));
 
+James Ots is right that you need the stream's onDone callback; here's an implementation without error handling:

import "dart:async";
import "dart:io";

void main() {
  Future<List<Future<String>>> result = readDirAsString("/tmp");
  result.then((List<Future<String>> fs) {
    for (Future<String> f in fs) {
      f.then((String s) {
        print(s);
      });
    }
  });
}

Future<List<Future<String>>> readDirAsString(String path) {
  Directory d = new Directory(path);
  List<Future<String>> futures = [];
  Stream<FileSystemEntity> list = d.list();
  Completer<List<Future<String>>> c = new Completer();
  list.listen((FileSystemEntity e) {
    if (e is File) {
      futures.add(e.readAsString());
    }
  },
  onDone: () {
    c.complete(futures);
  });
  return c.future;
}
 
Unfortunately these solutions are not correct once you have errors.
+Lukas Renggli's code is very elegant and almost there, though. The only problem is that the futures returned by `readAsString` could report an error, and if `toList` hasn't finished by then, nobody is listening for it.
One alternative (untested) would be:
new Directory(path).list()
  .where((f) => f is File)
  .map((f) => (f as File).readAsString()..catchError((e) {}))
  .toList()
  .then((fs) => Future.wait(fs))
  .then((ts) => print(ts));

Note the `..catchError`: it gives each future an error handler, so an error won't be reported to the global error handler. `Future.wait` registers its own handler and still receives the errors (if there are any).
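A sketch of the same idea for current Dart, under a few assumptions: the function takes a `Directory` and has the hypothetical name `readAllFiles`. In recent null-safe Dart releases a `catchError` handler on a `Future<String>` is expected to return a `String`, and a handler that returns nothing can itself make the discarded future fail. So this version returns an empty string from the handler that exists only to mark the future as handled.

```dart
import 'dart:io';

// Reads every regular file directly inside [dir] in parallel.
// Each read gets an error handler attached immediately, so an error that
// happens before Future.wait is reached is not reported as uncaught.
// Future.wait still receives the error and completes with it.
Future<List<String>> readAllFiles(Directory dir) async {
  final List<Future<String>> reads = await dir
      .list()
      .where((entity) => entity is File)
      .map((entity) => (entity as File).readAsString()
        // The returned '' only satisfies the type; the cascade discards it.
        ..catchError((Object e) => ''))
      .toList();
  return Future.wait(reads);
}
```

Subdirectories are skipped by the `where` filter. The order of the results follows `Directory.list`, which is not guaranteed to be sorted.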

I'm currently working on something like `tryAsync` and if everything goes well, it would make these things much easier. Fingers crossed.

Small additional comment: the last two lines of the code could be written as:
  .then(Future.wait)
  .then(print)

I'm keeping the wrapping closures to make it easier to understand what's going on.

Edit: Changed the catchError handler from `(e) => e` to `(e) {}` to make it more obvious that we completely ignore what happens in that error-handler.

Edit2: added small additional comment.
 
I think many of these are wrong if you need to read the files in a specific order, because the reads can run and finish in any order. That doesn't matter with Directory.list, but in other cases the order does matter. The last couple of solutions keep the strings read from the files separate and report them all together as a list at the end. If instead you want to read the files one after another rather than all in parallel (for throttling, for example), and process the results in order, you need to chain the readAsString calls:

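Future readFilesSequentially(Stream<File> files, doWork(String text)) {
    return files.fold<Future>(new Future.value(null),
       (Future chain, File file) =>
       chain.then((_) => file.readAsString())
               .then((text) => doWork(text)))
       // fold completes with the last link of the chain; wait for that too.
       .then((Future last) => last);
}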

In this case, doWork can either be synchronous, or be asynchronous and return a future, and the calls are still guaranteed to run in sequential order.
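To see the ordering guarantee without touching the file system, here is a sketch of the same fold-based chaining over a stream of arbitrary items, assuming current Dart. `chainSequentially` and `orderedLog` are hypothetical names, and the delays are made up so that earlier items take longer.

```dart
// Same fold-based chaining as readFilesSequentially above, over a stream of
// arbitrary items: each step starts only after the previous one's future
// has completed.
Future<void> chainSequentially(
    Stream<String> items, Future<void> Function(String) step) async {
  final Future<void> last = await items.fold<Future<void>>(
      Future<void>.value(),
      (Future<void> chain, String item) => chain.then((_) => step(item)));
  // fold completes with the final link of the chain; wait for it too.
  await last;
}

// Hypothetical workload: earlier items take longer, so running them all in
// parallel would finish them in reverse order.
Future<List<String>> orderedLog() async {
  const delaysMs = {'first': 30, 'second': 20, 'third': 10};
  final log = <String>[];
  await chainSequentially(
    Stream<String>.fromIterable(delaysMs.keys),
    (item) async {
      await Future<void>.delayed(Duration(milliseconds: delaysMs[item]!));
      log.add(item);
    },
  );
  return log;
}
```

`orderedLog` completes with the items in stream order, even though the later items would finish first if they ran in parallel.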

Using Stream.fold rather than Stream.listen is the best way to get a future that completes when the stream is done, rather than needing an onDone handler on the stream.  The general pattern for asynchronously processing data from a stream, sequentially, with the output from the previous stage fed to the next, is

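Future<S> processSequentially<S, T>(
    Stream<T> stream,
    Future<S> doWork(S previous, T data),
    S initial) {
  return stream.fold<Future<S>>(new Future<S>.value(initial),
     (Future<S> chain, T data) =>
         chain.then((S previous) => doWork(previous, data)))
     // fold completes with the last link of the chain; wait for that too.
     .then((Future<S> last) => last);
}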

This is really just a "lifting" of Stream.fold to the case that the second argument acts asynchronously.  It will pause (but not block the event loop) both when a call to doWork has not completed its future, and also when the stream has not delivered a new stream element to process.  Perhaps Stream.fold should have provided this functionality automatically, whenever the folded function returns a future.
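In current Dart, async/await gives this "lifted" fold directly. A sketch under that assumption; `foldAsync` is a hypothetical name, and unlike `processSequentially` it also accepts a synchronous `doWork`, because the callback returns `FutureOr<S>`.

```dart
import 'dart:async';

// The same contract as processSequentially above, written with `await for`.
// The loop body awaits each doWork call, and the stream subscription is
// paused while it does, so elements are processed one at a time, in order.
Future<S> foldAsync<S, T>(Stream<T> stream,
    FutureOr<S> Function(S previous, T data) doWork, S initial) async {
  S value = initial;
  await for (final T data in stream) {
    value = await doWork(value, data);
  }
  return value;
}
```

When there is no accumulator and each element only needs to be mapped through an asynchronous function in order, `Stream.asyncMap` already does this: it waits for each returned future before handling the next event.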

Also posted to Stack Overflow.