https://github.com/Gozala/reducers
Raw File
Tip revision: fc0627cda4f3fafad3b8b212ce7912c53771e78e authored by Irakli Gozalishvili on 22 January 2013, 17:02:00 UTC
Add IE10 into test matrix.
Tip revision: fc0627c
hub.js
"use strict";

var reduce = require("reducible/reduce")
var reduced = require("reducible/reduced")
var isReduced = require("reducible/is-reduced")
var end = require("reducible/end")

var input = "input@" + module.id
var consumers = "consumers@" + module.id


var isArray = Array.isArray

function Hub(source) {
  this[input] = source
  this[consumers] = []
}

reduce.define(Hub, function reduceHub(hub, next, initial) {
  // Enqueue new consumer into consumers array so that new
  // values will be delegated to it.
  hub[consumers].push({ next: next, state: initial })
  // If source is not in the process of consumption than
  // start it up.
  if (!isOpen(hub)) open(hub)
})

function drain(consumers) {
  while (consumers.length) {
    var count = consumers.length
    var index = 0
    while (index < count) {
      var consumer = consumers[index]
      consumer.next(end, consumer.state)
      index = index + 1
    }
    consumers.splice(0, count)
  }
}

function dispatch(consumers, value) {
  var count = consumers.length
  var index = 0
  while (index < count) {
    var consumer = consumers[index]
    var state = consumer.next(value, consumer.state)
    // If consumer has finished accumulation remove it from the consumers
    // list. And dispatch end of stream on it (maybe that should not be
    // necessary).
    if (isReduced(state)) {
      consumers.splice(index, 1)
      consumer.next(end, state.value)
      // If consumer is removed than we decrease count as consumers array
      // will contain less elements (unless of course more elements were
      // added but we would like to ignore those).
      count = count - 1
    } else {
      consumer.state = state
      index = index + 1
    }
  }
}

function open(hub) {
  var source = hub[input]
  var reducers = hub[consumers]
  hub[input] = null         // mark hub as open
  reduce(source, function distribute(value) {
    // If it's end of the source we close all the reducers including
    // ones that subscribe as side effect.
    if (value === end) drain(reducers)
    // otherwise we dispatch value to all the registered reducers.
    else dispatch(reducers, value)

    // reducers will be empty if either source is drained or if all the
    // reducers finished reductions. Either way we reset input back to
    // source and return `reduced` marker to stop the reduction of
    // source.
    if (reducers.length === 0) {
      hub[input] = source
      return reduced()
    }
  })
}

function isOpen(hub) {
  return hub[input] === null
}

function hub(source) {
  /**
  Take a reducible `source`, such as a `signal` and return a reducible that can
  be consumed by many reducers.
  **/
  if (source === null) return null
  if (source === void(0)) return void(0)
  return new Hub(source)
}
hub.isOpen = isOpen
hub.type = Hub

module.exports = hub
back to top