The Computer Language
Benchmarks Game

chameneos-redux Rust #2 program

source code

// The Computer Language Benchmarks Game
// http://benchmarksgame.alioth.debian.org/
//
// contributed by the Rust Project Developers
// contributed by Joshua Landau

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;


// English names of the decimal digits, indexed by digit value.
const DIGITS: [&'static str; 10] = [
   "zero", "one", "two", "three", "four",
   "five", "six", "seven", "eight", "nine",
];

// Spells out `num` one decimal digit at a time. Every word is
// preceded by a single space, so 120 becomes " one two zero".
fn wordy_num(num: usize) -> String {
   num.to_string()
      .bytes()
      // The decimal rendering of a usize only contains ASCII digits.
      .map(|byte| DIGITS[(byte - b'0') as usize])
      .fold(String::new(), |mut spelled, word| {
         spelled.push(' ');
         spelled.push_str(word);
         spelled
      })
}


// The three colours a chameneos can take. The discriminants are
// fixed explicitly because colours are cast to integers elsewhere.
#[derive(Clone, Copy)]
#[repr(u8)]
enum Color {
   Red = 0,
   Yellow = 1,
   Blue = 2,
}

impl Color {
   // Lower-case name of the colour, as printed in the report.
   fn show(&self) -> &'static str {
      // Indexed by discriminant, so the order must follow the enum.
      const NAMES: [&'static str; 3] = ["red", "yellow", "blue"];
      NAMES[*self as usize]
   }
}

// Colour both chameneos take after meeting: two equal colours
// stay as they are, two different colours yield the third one.
fn complement_color(left: Color, right: Color) -> Color {
   use Color::*;
   match (left, right) {
      (Red, Red) => Red,
      (Red, Yellow) => Blue,
      (Red, Blue) => Yellow,
      (Yellow, Red) => Blue,
      (Yellow, Yellow) => Yellow,
      (Yellow, Blue) => Red,
      (Blue, Red) => Yellow,
      (Blue, Yellow) => Red,
      (Blue, Blue) => Blue,
   }
}


#[derive(Default)]
struct AtomicColor(AtomicUsize);

impl AtomicColor {
   fn load(&self, order: Ordering) -> Color {
      use Color::*;
      match self.0.load(order) {
         0 => Red,
         1 => Yellow,
         _ => Blue,
      }
   }

   fn store(&self, color: Color, order: Ordering) {
      self.0.store(color as usize, order)
   }
}


// Per-chameneos record. Everything that changes after start-up is
// held in an atomic so the record can be updated through a shared
// reference from whichever thread currently owns the chameneos.
#[derive(Default)]
struct ChameneosState {
   name: u8,
   color: AtomicColor,
   meet_count: AtomicUsize,
   meet_same_count: AtomicUsize,
}

impl ChameneosState {
   // Identifier assigned when the record is set up.
   fn name(&self) -> u8 {
      self.name
   }

   // Current colour (acquire load).
   fn color(&self) -> Color {
      self.color.load(Ordering::Acquire)
   }

   // Records one meeting: bumps the meeting counter, bumps the
   // "met itself" counter when `same` is set, and then adopts
   // `color` as the new colour.
   fn meet(&self, same: bool, color: Color) {
      Self::bump(&self.meet_count);
      if same {
         Self::bump(&self.meet_same_count);
      }
      self.color.store(color, Ordering::Release);
   }

   // Adds one to `counter` using a separate acquire load and
   // release store; this is deliberately not a single atomic
   // read-modify-write operation.
   fn bump(counter: &AtomicUsize) {
      let previous = counter.load(Ordering::Acquire);
      counter.store(previous + 1, Ordering::Release);
   }
}


// Lightweight handle to a chameneos: an index into
// `Shared::states`. Index 0 stands for "no chameneos".
#[derive(Copy, Clone)]
struct Chameneos {
   idx: u32
}

impl Chameneos {
   // True unless this is the null handle (index 0).
   fn is_valid(&self) -> bool {
      match self.idx {
         0 => false,
         _ => true,
      }
   }

   // Looks up the record this handle refers to. The index is
   // masked with `BLOCK` before it is used.
   fn get<'st>(&self, shared: &'st Shared) -> &'st ChameneosState {
      let slot = (self.idx & BLOCK) as usize;
      &shared.states[slot]
   }
}


struct Shared {
   // We can only store 12 due to overflow
   // when the maximum number of pairs are
   // created, so no need for a large buffer.
   // Using 16 avoids bounds checks.
   states: [ChameneosState; 16],
   // Bottom block is mall, rest are queue slots
   atomic_queue: AtomicUsize,
   meetings_had: AtomicUsize,
   meetings_limit: usize,
}

impl Shared {
   fn null_task(&self) -> Chameneos {
      self.task_at(0)
   }

   fn task_at(&self, idx: u32) -> Chameneos {
      Chameneos { idx: idx }
   }

   fn load(&self, order: Ordering) -> u32 {
      self.atomic_queue.load(order) as u32
   }

   fn store(&self, val: u32, order: Ordering) {
      self.atomic_queue.store(val as usize, order)
   }

   fn compare_and_swap(&self, current: u32, new: u32, order: Ordering) -> u32 {
      self.atomic_queue.compare_and_swap(current as usize, new as usize, order) as u32
   }
}


// Mask selecting one 4-bit block (a single chameneos index) of
// the packed queue word.
const BLOCK: u32 = 0b1111;
// Width of one block in bits; the shift used to push or pop a slot.
const BLOCK_LEN: u32 = 4;
// Number of BLOCK_LEN-bit blocks that fit into a 32-bit word.
const QUEUE_LEN: u32 = 32 / BLOCK_LEN;
// All-ones sentinel stored in the queue word once the meeting
// limit is reached; a worker that observes it stops.
const QUEUE_STOPPED: u32 = !0;

// Per-thread view of the shared queue: the most recently observed
// value of the packed queue word.
struct State {
   cache: u32,
}

impl State {
   // Starts out with a sequentially consistent read of the queue word.
   fn new(shared: &Shared) -> State {
      let snapshot = shared.load(Ordering::SeqCst);
      State { cache: snapshot }
   }

   // Opens a transaction on the cached queue word. Returns `None`
   // when the cache holds the stop marker; otherwise returns the
   // transaction together with the mall occupant and the counter
   // that were split off the snapshot.
   fn run(&mut self, shared: &Shared) -> Option<(TransactionalQueue, Chameneos, u8)> {
      if self.cache == QUEUE_STOPPED {
         return None;
      }

      let mut queue = TransactionalQueue {
         set_state: self.cache,
         cache: &mut self.cache
      };
      // The counter sits below the mall, so it has to come off first.
      let count = queue.take_count();
      let mall = queue.take(shared);
      Some((queue, mall, count))
   }

   // Called after a meeting has been claimed, with the counter
   // value that was published. Nothing happens unless that value
   // is zero; in that case the counter is topped up by 255 while
   // the limit allows, and otherwise the queue is stopped.
   // Returns false only when the queue was stopped.
   fn register_meeting(&mut self, shared: &Shared, count: u8) -> bool {
      if count != 0 {
         return true;
      }

      let refills_so_far = shared.meetings_had.fetch_add(1, Ordering::Acquire);
      if refills_so_far >= shared.meetings_limit {
         // Oops, we couldn't actually do that
         shared.store(QUEUE_STOPPED, Ordering::SeqCst);
         return false;
      }

      shared.atomic_queue.fetch_add(0b11111111, Ordering::AcqRel);
      true
   }
}


// A private working copy of the packed queue word. `set_state` is
// edited locally and only reaches other threads when `submit`
// succeeds. `cache` points at the owning `State`'s snapshot, which
// `submit` and `cancel` refresh.
struct TransactionalQueue<'a> {
   set_state: u32,
   cache: &'a mut u32,
}

impl<'a> TransactionalQueue<'a> {
   // Re-attaches `mall` and `count` below the queue slots and tries
   // to publish the result with a compare-and-swap against the
   // snapshot. On success the snapshot becomes the published word;
   // on failure it becomes the value that was actually found.
   // Returns whether the publish succeeded.
   fn submit(mut self, shared: &Shared, mall: Chameneos, count: u8) -> bool {
      self.set_state = (self.set_state << BLOCK_LEN) | mall.idx;
      self.set_state = (self.set_state << 8) | (count as u32);

      let expected = *self.cache;
      let observed = shared.compare_and_swap(expected, self.set_state, Ordering::Release);

      if observed == expected {
         *self.cache = self.set_state;
         true
      }
      else {
         *self.cache = observed;
         false
      }
   }

   // Abandons the local edits and reloads the snapshot.
   fn cancel(&mut self, shared: &Shared) {
      let latest = shared.load(Ordering::Relaxed);
      *self.cache = latest;
   }

   // Removes and returns the 8-bit counter at the bottom of the word.
   fn take_count(&mut self) -> u8 {
      let low_byte = self.set_state & 0b11111111;
      self.set_state = self.set_state >> 8;
      low_byte as u8
   }

   // Removes the lowest block and returns it as a handle. The
   // result is the null handle when that block is zero.
   fn take(&mut self, shared: &Shared) -> Chameneos {
      let front = self.set_state & BLOCK;
      self.set_state = self.set_state >> BLOCK_LEN;
      shared.task_at(front)
   }

   // Appends two handles above the highest occupied block;
   // `second` ends up in the lower of the two new blocks.
   fn put(&mut self, first: Chameneos, second: Chameneos) {
      let free_blocks = self.set_state.leading_zeros() / BLOCK_LEN;
      let shift = (QUEUE_LEN - free_blocks) * BLOCK_LEN;
      let pair = (first.idx << BLOCK_LEN) | second.idx;
      self.set_state |= pair << shift;
   }
}


// Runs threads from the shared thread pool.
// Uses optimistic concurrency to queue and
// deque threads, as well as to take from the
// mall.
fn thread_executor(mut task: Chameneos, shared: &Shared) {
   let mut state = State::new(shared);
   let mut mall = shared.null_task();

   loop {
      let mut task_tmp = task;
      let mut mall_tmp = mall;
      let mut count;

      {
         let (mut queue, new_mall, new_count) = match state.run(shared) {
            Some(x) => x,
            None => return,
         };
         count = new_count;

         if mall_tmp.is_valid() {
            queue.put(task_tmp, mall_tmp);
            task_tmp = queue.take(shared);
         }
         else if !task_tmp.is_valid() {
            task_tmp = queue.take(shared);
            if !task_tmp.is_valid() {
               std::thread::sleep_ms(1);
               queue.cancel(shared);
               continue;
            }
         }

         mall_tmp = new_mall;
         if !mall_tmp.is_valid() {
            let replacement_task = queue.take(shared);
            if queue.submit(shared, task_tmp, count) {
               task = replacement_task;
               mall = shared.null_task();
            }
            continue;
         }

         if count == 0 {
            queue.cancel(shared);
            continue;
         }

         count -= 1;

         if !queue.submit(shared, shared.null_task(), count) {
            continue;
         }
         task = task_tmp;
         mall = mall_tmp;
      }

      if !state.register_meeting(shared, count) { return; }

      let actor_ref = task_tmp.get(shared);
      let mall_ref = mall.get(shared);

      let same = actor_ref.name() == mall_ref.name();
      let new_color = complement_color(actor_ref.color(), mall_ref.color());

      actor_ref.meet(same, new_color);
      mall_ref.meet(same, new_color);
   }
}


fn run_for(meetings_limit: usize, colors: &[Color]) -> Vec<(usize, usize)> {
   let num_threads = colors.len();

   let x = || Default::default();
   let mut states: [ChameneosState; 16] = [
      x(), x(), x(), x(), x(), x(), x(), x(),
      x(), x(), x(), x(), x(), x(), x(), x(),
   ];

   for (i, &color) in colors.iter().enumerate() {
      let idx = i + 1;
      let chameneos_state = &mut states[idx];
      chameneos_state.name = idx as u8;
      chameneos_state.color.store(color, Ordering::Release);
   }

   let shared = Arc::new(Shared {
      atomic_queue: AtomicUsize::new((meetings_limit % 255) + 1),
      meetings_had: AtomicUsize::new(0),
      meetings_limit: meetings_limit / 255,
      states: states,
   });

   let threads: Vec<_> = (0..num_threads).map(|i| {
      let task = shared.task_at((i + 1) as u32);
      let shared = shared.clone();
      thread::spawn(move || thread_executor(task, &shared))
   }).collect();

   for thread in threads {
      thread.join().unwrap();
   }

   let output = &shared.states[..];

   output[1..colors.len() + 1].iter().map(|ch| (
      ch.meet_count.load(Ordering::SeqCst),
      ch.meet_same_count.load(Ordering::SeqCst),
   )).collect()
}


// Prints the colour complement table, then runs the simulation
// for a three-chameneos and a ten-chameneos population (both are
// started before either result is printed) and reports, for each
// population, the per-chameneos meeting counts followed by the
// spelled-out total. The number of meetings comes from the first
// command-line argument and falls back to 600.
fn main() {
   use Color::*;

   let meetings = std::env::args_os()
      .nth(1)
      .and_then(|arg| arg.into_string().ok())
      .and_then(|text| text.parse().ok())
      .unwrap_or(600);

   let palette = [Blue, Red, Yellow];
   for &lhs in palette.iter() {
      for &rhs in palette.iter() {
         let result = complement_color(lhs, rhs);
         println!("{} + {} -> {}", lhs.show(), rhs.show(), result.show());
      }
   }

   let small = [Blue, Red, Yellow];
   let large = [Blue, Red, Yellow, Red, Yellow, Blue, Red, Yellow, Red, Blue];

   let runs: Vec<(&[_], _)> = vec![
      (&small, thread::spawn(move || run_for(meetings, &small))),
      (&large, thread::spawn(move || run_for(meetings, &large))),
   ];

   for (population, handle) in runs {
      println!("");

      for color in population.iter() {
         print!(" {}", color.show());
      }
      println!("");

      let mut grand_total = 0;
      for (met, met_same) in handle.join().unwrap() {
         println!("{}{}", met, wordy_num(met_same));
         grand_total += met;
      }

      println!("{}", wordy_num(grand_total));
   }

   println!("");
}
    

notes, command-line, and program output

NOTES:
32-bit Ubuntu one core
rustc 1.13.0 (2c6933acc 2016-11-07)


Fri, 11 Nov 2016 20:13:15 GMT

MAKE:
/usr/local/src/rust-1.13.0/bin/rustc -C opt-level=3 -C target-cpu=core2  chameneosredux.rs -o chameneosredux.rust-2.rust_run
warning: use of deprecated item: replaced by `std::thread::sleep`, #[warn(deprecated)] on by default
   --> chameneosredux.rs:285:16
    |
285 |                std::thread::sleep_ms(1);
    |                ^^^^^^^^^^^^^^^^^^^^^

2.42s to complete and log all make actions

COMMAND LINE:
./chameneosredux.rust-2.rust_run 6000000

PROGRAM OUTPUT:
blue + blue -> blue
blue + red -> yellow
blue + yellow -> red
red + blue -> yellow
red + red -> red
red + yellow -> blue
yellow + blue -> red
yellow + red -> blue
yellow + yellow -> yellow

 blue red yellow
3744863 zero
3750105 zero
4505032 zero
 one two zero zero zero zero zero zero

 blue red yellow red yellow blue red yellow red blue
852973 zero
827278 zero
1211382 zero
1721388 zero
1134416 zero
1567538 zero
1406816 zero
975687 zero
1083539 zero
1218983 zero
 one two zero zero zero zero zero zero