The Computer Language
Benchmarks Game

k-nucleotide Dart #2 program

source code

/**
 * The Computer Language Benchmarks Game
 *
 * http://benchmarksgame.alioth.debian.org/
 *
 * Contributed by Dwayne Slater
 * Based on a Java implementation by James McIlree and Tagir Valeev
 */

library knucleotide;

import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';

// Lookup table indexed by the low three bits of an input character
// (`char & 0x7`, see toCodes). The letters A/a, C/c, G/g and T/t land on
// indices 1, 3, 7 and 4, which hold the codes 0, 1, 2 and 3 respectively.
// The remaining indices (0, 2, 5, 6) hold -1. Note that any other character
// sharing the same low bits as a nucleotide letter gets that letter's code.
const codes = const [-1, 0, -1, 1, 3, -1, -1, 2];
// Letter for each code: nucleotides[code] is the character that code stands
// for (used by keyToString to turn keys back into text).
const nucleotides = 'ACGT';

/**
 * Occurrence counts for fragment keys.
 *
 * [map] starts out empty and associates each integer key with the number of
 * times it has been counted.
 */
class Result {
  /** Key to occurrence count. */
  final Map<int, int> map = <int, int>{};

  /** Renders the underlying map, e.g. `{5: 2}`. */
  @override
  String toString() {
    return map.toString();
  }
}

/**
 * An immutable (key, count) entry: a textual key together with the number of
 * times it occurred.
 */
class KeyPair {
  /** Number of occurrences recorded for [key]. */
  final int count;

  /** The key in its string form. */
  final String key;

  KeyPair(String key, int count)
      : key = key,
        count = count;
}

/**
 * Packs [length] consecutive elements of [arr], starting at [offset], into a
 * single integer key.
 *
 * The running key is shifted left by two bits before each element is added,
 * so earlier elements end up in the more significant bits. A [length] of zero
 * yields 0.
 */
int getKey(List<int> arr, int offset, int length) {
  final end = offset + length;
  var packed = 0;
  var position = offset;
  while (position < end) {
    packed = (packed << 2) + arr[position];
    position++;
  }
  return packed;
}

/**
 * Counts the fragments of [fragmentLength] elements found in [sequence].
 *
 * Fragments are taken at positions [offset], [offset] + [fragmentLength],
 * [offset] + 2 * [fragmentLength], ... for as long as a whole fragment still
 * fits. Each fragment is turned into a key with [getKey] and its count in the
 * returned [Result] is incremented.
 */
Result createFragmentMap(List<int> sequence, int offset, int fragmentLength) {
  final counts = new Result();
  final table = counts.map;
  // First start position at which a complete fragment no longer fits.
  final limit = sequence.length - fragmentLength + 1;
  var start = offset;
  while (start < limit) {
    final fragment = getKey(sequence, start, fragmentLength);
    final seen = table[fragment];
    table[fragment] = seen == null ? 1 : seen + 1;
    start += fragmentLength;
  }
  return counts;
}

/**
 * Combines two [Result]s together.
 *
 * Every count in [map2] is added to the count for the same key in [map1];
 * keys that [map1] does not contain yet are treated as having a count of
 * zero. [map1] is modified in place and returned, [map2] is left untouched.
 */
Result sumTwoMaps(Result map1, Result map2) {
  final totals = map1.map;
  map2.map.forEach((key, value) {
    // A key seen only in map2 has no entry in map1; `+=` on the missing
    // (null) entry would throw, so fall back to 0 explicitly.
    totals[key] = (totals[key] ?? 0) + value;
  });
  return map1;
}

/**
 * Converts a key to a string.
 *
 * Reads [length] two-bit groups from [key], least significant group first,
 * and maps each one to its letter in [nucleotides]. The least significant
 * group becomes the last character of the result, so the string reads in the
 * same order in which the key was packed.
 */
String keyToString(int key, int length) {
  // List.filled instead of the unnamed List(length) constructor, which newer
  // Dart releases no longer provide.
  var letters = new List<String>.filled(length, '');
  for (int i = length - 1; i >= 0; i--) {
    letters[i] = nucleotides[key & 0x3];
    key >>= 2;
  }
  return letters.join();
}

/**
 * Write out the frequencies for each key that appears in the given [Result].
 *
 * Produces one line per key of the form `KEY PERCENT`, where KEY is the key
 * rendered with [keyToString] using [keyLength] letters and PERCENT is
 * `count * 100 / totalCount` with three decimals. Lines are ordered by
 * descending count; keys with equal counts are ordered by ascending key
 * text so the output is deterministic. The result ends with an empty line.
 */
String writeFrequencies(double totalCount, int keyLength, Result frequencies) {
  var freq = <KeyPair>[];
  frequencies.map.forEach((key, cnt) {
    freq.add(new KeyPair(keyToString(key, keyLength), cnt));
  });
  freq.sort((a, b) {
    var byCount = b.count.compareTo(a.count);
    // List.sort is not guaranteed to be stable, so ties need an explicit
    // secondary ordering to come out the same on every run.
    return byCount != 0 ? byCount : a.key.compareTo(b.key);
  });
  var sb = new StringBuffer();
  for (var entry in freq) {
    sb.write("${entry.key} ${(entry.count * 100.0 / totalCount).toStringAsFixed(3)}\n");
  }
  sb.write("\n");
  return sb.toString();
}

/**
 * Converts a character list into a list of codes.
 *
 * Each element of [sequence] is reduced to its low three bits and looked up
 * in [codes]. The result is a new fixed-length list with the same length as
 * [sequence]; [sequence] itself is not modified.
 */
List<int> toCodes(List<int> sequence) {
  var l = sequence.length;
  // List.filled instead of the unnamed List(length) constructor, which newer
  // Dart releases no longer provide. Every slot is overwritten below.
  var result = new List<int>.filled(l, 0);
  for (int i = 0; i < l; i++) {
    result[i] = codes[sequence[i] & 0x7];
  }
  return result;
}

/**
 * Builds the output line for the number of occurrences of
 * [nucleotideFragment].
 *
 * The fragment is converted to a key, all of [futureResults] are awaited, and
 * the counts stored under that key in each [Result] are added up (a result
 * without the key contributes zero). The returned line has the form
 * `COUNT<tab>FRAGMENT` followed by a newline.
 */
Future<String> writeCount(List<Future<Result>> futureResults, String nucleotideFragment) async {
  final wanted = toCodes(nucleotideFragment.codeUnits);
  final target = getKey(wanted, 0, wanted.length);
  final partials = await Future.wait(futureResults);
  var total = 0;
  for (final partial in partials) {
    total += partial.map[target] ?? 0;
  }
  return "$total\t$nucleotideFragment\n";
}

/**
 * Reads sequence data from stdin.
 *
 * Skips input up to the first line starting with `>THREE`, then collects every
 * following line until the next line starting with `>` or the end of input.
 * The collected characters are converted with [toCodes] and returned.
 *
 * When [sync] is false (the default) stdin is consumed as a stream; when it
 * is true the blocking `readLineSync` API is used instead. Input is decoded
 * as Latin-1 in both modes. If no `>THREE` line is present the result is
 * empty.
 */
FutureOr<List<int>> read({bool sync = false}) async {
  var builder = new BytesBuilder(copy: false);
  if (!sync) {
    // The old Dart k-nucleotide benchmark used stdin.readLineSync, which is
    // way slower than doing async stream transforms.
    var three = false;
    var lines = stdin
        .transform(const Latin1Decoder())
        .transform(const LineSplitter());
    // `await for` + `break` cancels the subscription and continues right
    // away. Cancelling a subscription and then awaiting its asFuture() would
    // wait for a done event that a cancelled subscription never delivers.
    await for (var line in lines) {
      if (three) {
        // startsWith is safe on empty lines, unlike indexing line[0].
        if (line.startsWith('>')) break;
        builder.add(line.codeUnits);
      } else if (line.startsWith('>THREE')) {
        three = true;
      }
    }
  } else {
    var encoding = const Latin1Codec();
    // Skip everything up to the >THREE header; stop quietly at end of input
    // (null) and tolerate lines shorter than the header itself.
    var header = stdin.readLineSync(encoding: encoding);
    while (header != null && !header.startsWith('>THREE')) {
      header = stdin.readLineSync(encoding: encoding);
    }

    var line = stdin.readLineSync(encoding: encoding);
    while (line != null && !line.startsWith('>')) {
      builder.add(line.codeUnits);
      line = stdin.readLineSync(encoding: encoding);
    }
  }
  return toCodes(builder.takeBytes());
}

/**
 * A unit of work that is executed by calling [run].
 *
 * Concrete tasks extend this class and put their work in [run].
 */
abstract class Task {
  /** Performs the task's work and returns its outcome, if any. */
  dynamic run();
}

/**
 * The sequence data for the current Isolate.
 *
 * Assigned by [InitIsolateSequenceTask.run] and read by
 * [CreateFragmentMapTask.run]. It has no value until an
 * [InitIsolateSequenceTask] has run.
 */
List<int> isolateSequence;

/**
 * Task that installs the sequence data used by later tasks.
 *
 * Running it stores the code units of [sequence] in [isolateSequence].
 */
class InitIsolateSequenceTask extends Task {
  /** The sequence data, carried as a string of code units. */
  final String sequence;

  InitIsolateSequenceTask(String sequence) : sequence = sequence;

  // Deliberately a block body: the method must not hand the assigned list
  // back as its result.
  @override
  void run() {
    isolateSequence = sequence.codeUnits;
  }
}

/**
 * Task that counts fragments in [isolateSequence].
 *
 * Running it calls [createFragmentMap] with the stored [index] as the start
 * offset and [fragmentLength] as the fragment size, and returns that result.
 */
class CreateFragmentMapTask extends Task {
  /** Offset of the first fragment to count. */
  final int index;

  /** Number of elements per fragment. */
  final int fragmentLength;

  CreateFragmentMapTask(this.index, this.fragmentLength);

  @override
  run() {
    return createFragmentMap(isolateSequence, index, fragmentLength);
  }
}

/**
 * Handles the submission and completion of tasks sent to an Isolate.
 *
 * Tasks are sent to the isolate through a [SendPort]; every message that
 * comes back completes the oldest task that is still pending. Results are
 * therefore matched to tasks purely by arrival order.
 */
class IsolateHandler {
  // The spawned isolate that runs the tasks.
  final Isolate _isolate;
  // Port used to send tasks to the isolate.
  final SendPort _taskport;
  // Port on which this side receives the isolate's replies.
  final RawReceivePort _port;
  // One completer per task that was sent and has not been answered yet,
  // oldest first.
  final Queue<Completer> _completers = new Queue<Completer>();
  IsolateHandler._(this._isolate, this._taskport, this._port);

  /**
   * Creates an IsolateHandler.
   *
   * Spawns an isolate running [_runner] and waits for it to send back the
   * port on which it accepts tasks.
   */
  static Future<IsolateHandler> create() async {
    // Assigned only in the return statement below; the port callback reads it
    // for every message after the first one.
    IsolateHandler handler;
    var completer = new Completer<SendPort>.sync();
    var port = new RawReceivePort((data) {
      if (!completer.isCompleted) {
        // First message: the isolate's task port (see _runner).
        completer.complete(data);
      } else {
        // Every later message is the result of a task.
        handler._finish(data);
      }
    });
    var isolate = await Isolate.spawn(_runner, port.sendPort);
    var taskport = await completer.future;
    return handler = new IsolateHandler._(isolate, taskport, port);
  }

  /**
   * Main "loop" of the task running Isolate.
   *
   * Opens a port for incoming tasks, announces that port on [inport], and
   * from then on answers every received [Task] by sending the value returned
   * from its `run()` back on [inport].
   */
  static void _runner(SendPort inport) {
    var port = new RawReceivePort((Task task) {
      inport.send(task.run());
    });
    inport.send(port.sendPort);
  }

  // Completes the oldest pending task with [data].
  void _finish(data) {
    _completers.removeFirst().complete(data);
  }

  /**
   * Schedules a task to the underlying Isolate.
   *
   * Sends [task] to the isolate and returns a future that completes with the
   * reply matched to it.
   */
  Future<T> schedule<T>(Task task) {
    var c = new Completer<T>.sync();
    _taskport.send(task);
    _completers.add(c);
    return c.future;
  }

  /**
   * Gets the current load on the Isolate.
   *
   * This is the number of scheduled tasks that have not been answered yet.
   */
  int get weight => _completers.length;

  /**
   * Closes the IsolateHandler and cleans up associated resources.
   *
   * Kills the isolate and closes the reply port. Completers of tasks that
   * are still pending are not touched here.
   */
  void close() {
    _isolate.kill();
    _port.close();
  }
}

/**
 * Manages multiple [IsolateHandler] objects.
 * Allows tasks to be scheduled on the least loaded Isolate.
 */
class IsolateExecutor {
  final List<IsolateHandler> _handlers;

  IsolateExecutor._(this._handlers);

  /**
   * Create an IsolateExecutor with [n] isolates. By default this creates an
   * isolate for each core of the current running system.
   *
   * All handlers are created concurrently; the returned future completes
   * once every one of them is ready.
   */
  static Future<IsolateExecutor> create({int n}) async {
    n ??= Platform.numberOfProcessors;
    var handlers = await Future.wait(new Iterable.generate(n, (_) => IsolateHandler.create()));
    return new IsolateExecutor._(handlers);
  }

  /**
   * Schedules a task on ALL of the IsolateHandlers.
   *
   * The returned future completes with one result per handler.
   */
  Future<List<T>> scheduleAll<T>(Task task) {
    // Forward T so the individual futures carry the requested result type.
    var futures = _handlers.map((handler) => handler.schedule<T>(task)).toList();
    return Future.wait(futures);
  }

  /**
   * Schedules a task on the least loaded IsolateHandler.
   *
   * The load of a handler is its [IsolateHandler.weight]. When several
   * handlers share the lowest load, the first of them is used.
   */
  Future<T> schedule<T>(Task task) {
    var leastLoad = _handlers.first;
    var leastLoadScore = leastLoad.weight;
    for (int i = 1; i < _handlers.length; i++) {
      var handler = _handlers[i];
      var score = handler.weight;
      if (score < leastLoadScore) {
        leastLoad = handler;
        // Track the best score seen so far; comparing only against the first
        // handler's score would pick the last handler that beats the first
        // one rather than the least loaded handler.
        leastLoadScore = score;
      }
    }
    return leastLoad.schedule<T>(task);
  }

  /**
   * Closes all IsolateHandlers.
   */
  void close() {
    for (var handler in _handlers) {
      handler.close();
    }
  }
}

/**
 * Schedules the fragment counting work for one fragment length.
 *
 * One [CreateFragmentMapTask] is scheduled on [executor] for every start
 * offset from 0 up to [fragmentLength] - 1. The returned list holds the
 * pending results in offset order.
 */
List<Future<Result>> dispatchFragmentTasks(IsolateExecutor executor, int fragmentLength) {
  final pending = <Future<Result>>[];
  var offset = 0;
  while (offset < fragmentLength) {
    final task = new CreateFragmentMapTask(offset, fragmentLength);
    pending.add(executor.schedule(task));
    offset++;
  }
  return pending;
}

/**
 * Program entry point.
 *
 * Reads the sequence from stdin, distributes it to the worker isolates,
 * computes the frequency tables for fragment lengths 1 and 2 and the
 * occurrence counts of five fixed fragments, and writes the combined report
 * to stdout. The executor is closed on every exit path.
 */
main() async {
  // Start creating the executor while we wait on IO to finish
  var executorFuture = IsolateExecutor.create();
  try {
    // Read the data from stdin
    var sequence = await read();
    // Wait on the executor to start if it hasn't finished yet
    var executor = await executorFuture;

    // We start by initializing all the executor isolates with the sequence we read
    // Unfortunately, Dart has to copy all the data when doing this
    // To optimize this a bit, we can send it as a String :P (Saves about 20 seconds from my testing)
    // The returned future is intentionally not awaited: this relies on each
    // isolate handling its messages in the order they were sent, so the
    // sequence is in place before any fragment task scheduled below runs.
    executor.scheduleAll(new InitIsolateSequenceTask(new String.fromCharCodes(sequence)));

    var futureBuffer = <Future<String>>[
      // Dispatch a task to get the frequencies with a fragment length of 1
      dispatchFragmentTasks(executor, 1)[0]
        .then((result) => writeFrequencies(sequence.length.toDouble(), 1, result)),
      // Dispatch a task to get the frequencies with a fragment length of 2
      Future.wait(dispatchFragmentTasks(executor, 2))
        .then((results) => writeFrequencies((sequence.length-1).toDouble(), 2, sumTwoMaps(results[0], results[1]))),
    ];
    // Dispatch tasks for each of the nucleotideFragments
    for (var nucleotideFragment in const ["GGT", "GGTA", "GGTATT", "GGTATTTTAATT", "GGTATTTTAATTTATAGT"]) {
      futureBuffer.add(writeCount(dispatchFragmentTasks(executor, nucleotideFragment.length), nucleotideFragment));
    }

    // Wait for everything to finish, then join the resulting output together
    stdout.write((await Future.wait(futureBuffer)).join());
  } finally {
    // Always shut the isolates down, also when reading or a task failed;
    // otherwise their open ports keep the process from exiting. On the
    // success path this future is already complete.
    (await executorFuture).close();
  }
}
    

notes, command-line, and program output

NOTES:
64-bit Ubuntu quad core
Dart VM version: 1.22.1 (Tue Feb 21 14:27:26 2017) on "linux_x64"


Sat, 15 Apr 2017 05:19:49 GMT

MAKE:
make: 'knucleotide.dart-2.dart_run' is up to date.
0.02s to complete and log all make actions

COMMAND LINE:
/usr/local/src/dart-sdk/bin/dart  knucleotide.dart-2.dart 0 < knucleotide-input25000000.txt

PROGRAM OUTPUT:
A 30.295
T 30.151
C 19.800
G 19.754

AA 9.177
TA 9.132
AT 9.131
TT 9.091
CA 6.002
AC 6.001
AG 5.987
GA 5.984
CT 5.971
TC 5.971
GT 5.957
TG 5.956
CC 3.917
GC 3.911
CG 3.909
GG 3.902

1471758	GGT
446535	GGTA
47336	GGTATT
893	GGTATTTTAATT
893	GGTATTTTAATTTATAGT