Raw File
stream-functional.hpp
#pragma once

#include <random>

#include <stxxl/sorter>
#include "defs.hpp"
#include "contraction.hpp"
#include "kruskal.hpp"
#include "stream-checks.hpp"
#include "stream-sibeyn.hpp"
#include "stream-utils.hpp"
#include "streaming/utils/StreamFilter.h"
#include "variants.hpp"



template <typename edge_stream_t>
void compute_ccs(edge_stream_t& input_stream, size_t node_estimate, size_t semiexternal_limit, edge_stream_t& output,
                 unsigned level, policy_t& policy) {
	foxxll::timer full_timer;
	foxxll::timer timer;
	std::cout << "call," << node_estimate << "," << input_stream.size() << "," << std::endl;
	full_timer.start();
	timer.start();
	size_t old_estimate = node_estimate;
	node_estimate = count_unique_nodes(input_stream);
	input_stream.rewind();
	timer.stop();
	std::cout << "count," << old_estimate << "," << node_estimate << "," << timer.useconds() << std::endl;
	timer.reset();

	// base case
	if (node_estimate <= semiexternal_limit) {
		// TODO: streamify instead of materializing vector
		std::cout << "base_case,,," << std::endl;
		timer.start();
		em_edge_vector edges(input_stream.size());
		stxxl::stream::materialize(input_stream, edges.begin(), edges.end());
		em_edge_vector stars;
		SEMKruskal kruskal_algo(edges, 0, edges.size(), stars);
		stxxl::sort(stars.begin(), stars.end(), edge_lt_ordering(), SORTER_MEM);
		for (const auto& e: stars) {
			output.push(e);
		}
		timer.stop();
		std::cout << "kruskal,,," << timer.useconds() << std::endl;
		full_timer.stop();
		std::cout << "done,,," << full_timer.useconds() << std::endl;
		return;
	}

	std::cout << "recursive_case,,," << std::endl;
	assert(is_sorted(input_stream, edge_lt_ordering()));

	bool perform_contraction = policy.perform_contraction(node_estimate, input_stream.size(), level);
	edge_stream_t contracted_edges;
	edge_stream_t contraction_stars;
	if (perform_contraction) {
		timer.start();
		// contraction
		size_t number_to_contract = policy.contract_number(node_estimate, input_stream.size(), level);
		old_estimate = node_estimate;
		contract_nodes(input_stream, number_to_contract, contracted_edges, contraction_stars);
		contracted_edges.consume();
		contraction_stars.consume();
		node_estimate -= number_to_contract;
		timer.stop();
		std::cout << "contraction," << old_estimate << "," << node_estimate << "," << timer.useconds() << std::endl;
		timer.reset();

		assert(disjoint_sources(contraction_stars, contracted_edges));
		assert(only_stars(contraction_stars));
		assert(is_sorted(contracted_edges, edge_lt_ordering()));
		assert(is_sorted(contraction_stars, edge_lt_ordering()));
	} else {
		// TODO: figure out something smarter
		flush(input_stream, contracted_edges);
		contracted_edges.consume();
	}


	// 1. sample edges
	timer.start();
	edge_stream_t E1;
	edge_stream_t E2;
	// TODO: parameter tweaking of 0.5
	double sample_prob = policy.sample_prob(node_estimate, contracted_edges.size(), level);
	sample_edges(contracted_edges, E1, E2, sample_prob);
	E1.consume();
	E2.consume();
	timer.stop();
	std::cout << "sample," << E1.size() << "," << E2.size() << "," << timer.useconds() << std::endl;
	timer.reset();
	assert(E1.size() + E2.size() == contracted_edges.size());

	// 2. compute CCs of sampled E1
	edge_stream_t CC_G1;
	// TODO: update node estimate more smartly
	compute_ccs(E1, node_estimate/2, semiexternal_limit, CC_G1, level+1, policy);
	CC_G1.consume();
	std::cout << "cc_g1,," << CC_G1.size() << "," << std::endl;
	assert(is_sorted(CC_G1, edge_lt_ordering()));
	assert(only_stars(CC_G1));

	// 3. contract to get G' = G/CC(G₁)
	timer.start();
	edge_stream_t G_;
	contract_stars(E2, CC_G1, G_);
	G_.consume();
	CC_G1.rewind();
	timer.stop();
	std::cout << "g_,," << G_.size() << "," << timer.useconds() << std::endl;
	assert(disjoint_sources(G_, CC_G1));
	assert(only_stars(CC_G1));

	// 4. recurse on G' to get CC(G')
	edge_stream_t CC_G_;
	// TODO: update node estimate smartly
	compute_ccs(G_, node_estimate/2, semiexternal_limit, CC_G_, level+1, policy);
	CC_G_.consume();
	std::cout << "cc_g_,," << CC_G_.size() << "," << std::endl;

	// 5. relabel CC(G₁) by CC(G'), merge and output
	timer.start();
	edge_stream_t relabeled_G1;
	relabel(CC_G1, CC_G_, relabeled_G1);
	relabeled_G1.consume();
	CC_G_.rewind();
	timer.stop();
	std::cout << "relabel,,," << timer.useconds() << std::endl;
	timer.reset();
	assert(only_stars(relabeled_G1));

	timer.start();
	StreamMerger functional_output(relabeled_G1, CC_G_, edge_lt_ordering());;
	assert(only_stars(functional_output));
	assert(relabeled_G1.size() + CC_G_.size() == functional_output.size());

	if (perform_contraction) {
		// update the targets of set aside contraction stars
		edge_stream_t updated_contraction_stars;
		relabel_targets(contraction_stars, functional_output, updated_contraction_stars);
		updated_contraction_stars.consume();
		functional_output.rewind();

		// merge in the contracted stars
		assert(disjoint_sources(contraction_stars, functional_output));
		StreamMerger merged_stars(functional_output, updated_contraction_stars, edge_lt_ordering());
		flush(merged_stars, output);
		assert(functional_output.size() + contraction_stars.size() == output.size());
	} else {
		// TODO: smarter
		flush(functional_output, output);
	}
	timer.stop();
	std::cout << "merge,,," << timer.useconds() << std::endl;
	full_timer.stop();
	std::cout << "done,,," << full_timer.useconds() << std::endl;
}

template <typename edge_stream_t>
size_t count_unique_nodes(edge_stream_t& edges) {
	// TODO: check if it's faster to split into two node lists and then merge
	using node_sorter = stxxl::sorter<node_t, node_lt_ordering>;
	node_sorter node_set(node_lt_ordering(), SORTER_MEM);
	node_t prev_source = MIN_NODE;
	for (; !edges.empty(); ++edges) {
		if ((*edges).u != prev_source) {
			node_set.push((*edges).u);
			prev_source = (*edges).u;
		}
		node_set.push((*edges).v);
	}
	node_set.sort();
	UniqueElementStreamFilter<node_sorter> unique_nodes(node_set);
	size_t count = 0;
	for (; !unique_nodes.empty(); ++unique_nodes) {
		++count;
	}
	return count;
}

// turn this function into a stream type instead?
template <typename edge_stream_t>
void contract_stars(edge_stream_t& edges, edge_stream_t& stars, edge_stream_t& output) {
	// assumes edges and stars are already sorted
	// and that we can rewind stars
	// TODO: think about this more (buffering between levels?)
	assert(is_sorted(edges, edge_lt_ordering()));
	assert(is_sorted(stars, edge_lt_ordering()));
	assert(only_stars(stars));
	using edge_target_sorter = stxxl::sorter<edge_t, edge_target_lt_ordering>;
	edge_target_sorter updated_sources(edge_target_lt_ordering(), SORTER_MEM);
	// note: needs to be strong cmp for unique filtering after
	for (; !edges.empty(); ++edges) {
		const edge_t e = *edges;
		while (!stars.empty() && (*stars).u < e.u) {
			++stars;
		}
		if (!stars.empty() && (*stars).u == e.u) {
			updated_sources.push(edge_t((*stars).v, e.v));
		} else {
			updated_sources.push(e);
		}
	}
	updated_sources.sort();

	// make unique
	UniqueElementStreamFilter<edge_target_sorter> updated_sources_unique(updated_sources);
	// update targets
	stars.rewind();
	using edge_source_sorter = stxxl::sorter<edge_t, edge_lt_ordering>;
	edge_source_sorter updated_targets(edge_lt_ordering(), SORTER_MEM);
	for (; !updated_sources_unique.empty(); ++updated_sources_unique) {
		const edge_t e = *updated_sources_unique;
		while (!stars.empty() && (*stars).u < e.v) {
			++stars;
		}
		// making sure that resulting edges are still smaller-to-larger
		edge_t new_edge = e;
		if (!stars.empty() && (*stars).u == e.v) {
			new_edge = edge_t(e.u, (*stars).v);
		}
		new_edge.orient_smaller_to_larger();
		if (!new_edge.self_loop()) {
			updated_targets.push(new_edge);
		}
	}
	updated_targets.sort();

	// make unique and output
	UniqueElementStreamFilter<edge_source_sorter> updated_targets_unique(updated_targets);
	flush(updated_targets_unique, output);
}

// turn this function into a stream type instead?
template <typename edge_stream_t>
void relabel(edge_stream_t& edges, edge_stream_t& stars, edge_stream_t& output) {
	// assumes edges and stars are already sorted
	// and that we can rewind stars
	// TODO: think about this more (buffering between levels?)
	assert(is_sorted(edges, edge_lt_ordering()));
	assert(is_sorted(stars, edge_lt_ordering()));
	assert(only_stars(stars));
	using edge_target_sorter = stxxl::sorter<edge_t, edge_target_lt_ordering>;
	edge_target_sorter updated_sources(edge_target_lt_ordering(), SORTER_MEM);
	// note: needs to be strong cmp for unique filtering after
	for (; !edges.empty(); ++edges) {
		const edge_t e = *edges;
		while (!stars.empty() && (*stars).u < e.u) {
			++stars;
		}
		if (!stars.empty() && (*stars).u == e.u) {
			updated_sources.push(edge_t((*stars).v, e.v));
		} else {
			updated_sources.push(e);
		}
	}
	updated_sources.sort();

	// make unique
	UniqueElementStreamFilter<edge_target_sorter> updated_sources_unique(updated_sources);
	// update targets
	stars.rewind();
	using edge_source_sorter = stxxl::sorter<edge_t, edge_lt_ordering>;
	edge_source_sorter updated_targets(edge_lt_ordering(), SORTER_MEM);
	for (; !updated_sources_unique.empty(); ++updated_sources_unique) {
		const edge_t e = *updated_sources_unique;
		while (!stars.empty() && (*stars).u < e.v) {
			++stars;
		}
		// making sure that resulting edges are still smaller-to-larger
		edge_t new_edge = e;
		if (!stars.empty() && (*stars).u == e.v) {
			new_edge = edge_t(e.u, (*stars).v);
		}
		// difference from contract: allow self-loop and do not reorient
		updated_targets.push(new_edge);
	}
	updated_targets.sort();

	// make unique and output
	UniqueElementStreamFilter<edge_source_sorter> updated_targets_unique(updated_targets);
	flush(updated_targets_unique, output);
}

template <typename edge_stream_1_t, typename edge_stream_2_t, typename edge_stream_3_t>
void relabel_targets(edge_stream_1_t& edges, edge_stream_2_t& stars, edge_stream_3_t& output) {
	// TODO: put the stars into a sorter earlier instead of flushing
	using edge_target_sorter = stxxl::sorter<edge_t, edge_target_lt_ordering>;
	edge_target_sorter sort_by_target(edge_target_lt_ordering(), SORTER_MEM);
	flush(edges, sort_by_target);
	sort_by_target.sort();

	using edge_source_sorter = stxxl::sorter<edge_t, edge_lt_ordering>;
	edge_source_sorter updated_targets(edge_lt_ordering(), SORTER_MEM);
	for (; !sort_by_target.empty(); ++sort_by_target) {
		const edge_t e = *sort_by_target;
		while (!stars.empty() && (*stars).u < e.v) {
			++stars;
		}
		if (!stars.empty() && (*stars).u == e.v) {
			updated_targets.push(edge_t(e.u, (*stars).v));
		} else {
			updated_targets.push(e);
		}
	}
	updated_targets.sort();
	flush(updated_targets, output);
}

template <typename edge_stream_t>
void sample_edges(edge_stream_t& in_stream, edge_stream_t& out_stream1, edge_stream_t& out_stream2, double sample_prob) {
	// TODO: maybe carry generator around
	std::default_random_engine gen;
	std::bernoulli_distribution coin(sample_prob);

	// push edges into respective sub-problem
	for (; !in_stream.empty(); ++in_stream) {
		if (coin(gen)) {
			out_stream1.push(*in_stream);
		} else {
			out_stream2.push(*in_stream);
		}
	}
}

template <typename edge_stream_t>
void contract_nodes(edge_stream_t& edges, size_t contract_num, edge_stream_t& output_edges, edge_stream_t& output_stars) {
	// TODO: make this streaming-based, too
	edge_stream_t tree;
	using edge_source_sorter = stxxl::sorter<edge_t, edge_lt_ordering>;
	edge_source_sorter sort_contracted_edges(edge_lt_ordering(), SORTER_MEM);
	run_sibeyn(edges, contract_num, tree, sort_contracted_edges);
	tree.consume();
	sort_contracted_edges.sort();
	flush(sort_contracted_edges, output_edges);
	StreamEdgesOrientReverse reversing_edges(tree);
	tfp(reversing_edges, output_stars);
}
back to top