VertexFusor.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.layouting.functions;

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.gradoop.flink.model.impl.operators.layouting.util.LEdge;
import org.gradoop.flink.model.impl.operators.layouting.util.LGraph;
import org.gradoop.flink.model.impl.operators.layouting.util.LVertex;
import org.gradoop.flink.model.impl.operators.layouting.util.Vector;

import java.util.Random;

/**
 * Simplifies the graph by combining similar vertices to super-vertices using a
 * Comparison-Function and a threshold.
 */
public class VertexFusor {
  /**
   * Field expression selecting the id of the vertex stored in field 0 of a tuple. For fusion
   * tuples this is the donor.
   */
  private static final String FIRST_VERTEX_ID = "0." + LVertex.ID_POSITION;
  /**
   * Field expression selecting the id of the vertex stored in field 1 of a tuple. For fusion
   * tuples this is the acceptor.
   */
  private static final String SECOND_VERTEX_ID = "1." + LVertex.ID_POSITION;

  /**
   * The VertexCompareFunction to use to find similar vertices
   */
  protected VertexCompareFunction compareFunction;
  /**
   * Only consider vertices as similar if their similarity is at least this value
   */
  protected double threshold;

  /**
   * Construct new VertexFusor
   *
   * @param compareFunction The VertexCompareFunction to use to find similar vertices
   * @param threshold       Only consider vertices as similar if their similarity is at least
   *                        this value
   */
  public VertexFusor(VertexCompareFunction compareFunction, double threshold) {
    this.compareFunction = compareFunction;
    this.threshold = threshold;
  }

  /**
   * Execute the operation. Should be called iteratively. A single call is usually not enough for
   * practical results.
   *
   * @param graph The graph to simplify
   * @return A new graph made of the simplified vertices and edges
   */
  public LGraph execute(LGraph graph) {
    DataSet<LVertex> vertices = graph.getVertices();
    DataSet<LEdge> edges = graph.getEdges();

    DataSet<Tuple2<LVertex, Boolean>> classifiedVertices = chooseDonorsAndAcceptors(vertices);

    DataSet<Tuple2<LVertex, LVertex>> fusions = generateFusionCandidates(classifiedVertices, edges);

    // One group per acceptor: every group collapses into a single super-vertex
    DataSet<LVertex> superVertices =
      fusions.groupBy(SECOND_VERTEX_ID).reduceGroup(new SuperVertexGenerator());

    DataSet<LVertex> remainingVertices = findRemainingVertices(fusions, vertices, superVertices);

    vertices = remainingVertices.union(superVertices);

    edges = fixEdgeReferences(edges, fusions);

    // Edges that now share source and target are merged; the surviving edge records the
    // id and the sub-edges of every edge that was folded into it
    edges = edges.groupBy(LEdge.SOURCE_ID_POSITION, LEdge.TARGET_ID_POSITION).reduce((a, b) -> {
      a.addSubEdge(b.getId());
      a.addSubEdges(b.getSubEdges());
      return a;
    });

    return new LGraph(vertices, edges);
  }

  /**
   * Splits the vertices randomly into donor and acceptor-vertices (~50/50).
   *
   * @param vertices vertices to classify
   * @return {@code Tuple2<LVertex, Boolean>}. If the boolean is true, the vertex is an acceptor
   */
  protected DataSet<Tuple2<LVertex, Boolean>> chooseDonorsAndAcceptors(DataSet<LVertex> vertices) {
    // NOTE(review): the Random instance is captured by the lambda. Presumably it is serialized
    // together with the function, so parallel tasks may start from the same seed - confirm
    // whether that correlation matters.
    final Random rng = new Random();
    return vertices.map(v -> new Tuple2<>(v, rng.nextBoolean()))
      .returns(new TypeHint<Tuple2<LVertex, Boolean>>() {
      });
  }

  /**
   * Finds connected vertices by joining with the edges. Finds out if the two vertices should be
   * merged together and if so outputs a tuple for each merge. Every donor takes part in at most
   * one merge: the candidate with the highest similarity.
   *
   * @param classifiedVertices The vertices (split into donors and acceptors)
   * @param edges              The edges
   * @return {@code Tuple2<LVertex, LVertex>} for each merge. f0 is the donor and f1 the acceptor
   * for the merge.
   */
  protected DataSet<Tuple2<LVertex, LVertex>> generateFusionCandidates(
    DataSet<Tuple2<LVertex, Boolean>> classifiedVertices, DataSet<LEdge> edges) {
    return edges
      // attach the classified source vertex to every edge
      .join(classifiedVertices)
      .where(LEdge.SOURCE_ID_POSITION).equalTo(FIRST_VERTEX_ID)
      // attach the classified target vertex and emit (donor, acceptor, similarity) candidates
      .join(classifiedVertices)
      .where("0." + LEdge.TARGET_ID_POSITION).equalTo(FIRST_VERTEX_ID)
      .with(new CandidateGenerator(compareFunction, threshold))
      // per donor keep only the candidate with the highest similarity
      .groupBy(FIRST_VERTEX_ID)
      .reduce((a, b) -> (a.f2 > b.f2) ? a : b)
      // the similarity is not needed any more
      .map(c -> new Tuple2<>(c.f0, c.f1))
      .returns(new TypeHint<Tuple2<LVertex, LVertex>>() { });
  }

  /**
   * There are some vertices that have neither become super-vertices nor have been merged with a
   * super vertex. Find them so they can be copied to the simplified graph.
   *
   * @param fusions       The merges that are to be performed in this iteration
   * @param vertices      The original vertices of the input graph
   * @param superVertices The newly created super-vertices
   * @return All vertices that have to be copied to the output-graph
   */
  protected DataSet<LVertex> findRemainingVertices(DataSet<Tuple2<LVertex, LVertex>> fusions,
    DataSet<LVertex> vertices, DataSet<LVertex> superVertices) {
    // Step 1: drop every vertex that has the same id as a super-vertex
    DataSet<LVertex> notSuperVertices =
      vertices.leftOuterJoin(superVertices).where(LVertex.ID_POSITION).equalTo(LVertex.ID_POSITION)
        .with(new FlatJoinFunction<LVertex, LVertex, LVertex>() {
          @Override
          public void join(LVertex vertex, LVertex superVertex, Collector<LVertex> collector) {
            // a missing join partner means the vertex did not become a super-vertex
            if (superVertex == null) {
              collector.collect(vertex);
            }
          }
        });

    // Step 2: drop every vertex that is the donor of a fusion
    return notSuperVertices.leftOuterJoin(fusions).where(LVertex.ID_POSITION)
      .equalTo(FIRST_VERTEX_ID)
      .with(new FlatJoinFunction<LVertex, Tuple2<LVertex, LVertex>, LVertex>() {
        @Override
        public void join(LVertex vertex, Tuple2<LVertex, LVertex> fusion,
          Collector<LVertex> collector) {
          // a missing join partner means the vertex is not donated to any acceptor
          if (fusion == null) {
            collector.collect(vertex);
          }
        }
      });
  }

  /**
   * When combining two vertices into one the edges of the old vertices have to be modified to
   * point to the new vertex. Every edge end that references a donor is redirected to the
   * acceptor of that donor.
   *
   * @param edges   The edges of the input-graph
   * @param fusions The merges performed in the current iteration
   * @return The "fixed" edges for the output-graph
   */
  protected DataSet<LEdge> fixEdgeReferences(DataSet<LEdge> edges,
    DataSet<Tuple2<LVertex, LVertex>> fusions) {
    // First pass: redirect sources that point to a donor
    DataSet<LEdge> sourceFixed =
      edges.leftOuterJoin(fusions).where(LEdge.SOURCE_ID_POSITION).equalTo(FIRST_VERTEX_ID)
        .with(new JoinFunction<LEdge, Tuple2<LVertex, LVertex>, LEdge>() {
          @Override
          public LEdge join(LEdge edge, Tuple2<LVertex, LVertex> fusion) {
            if (fusion != null) {
              edge.setSourceId(fusion.f1.getId());
            }
            return edge;
          }
        });

    // Second pass: redirect targets that point to a donor
    return sourceFixed.leftOuterJoin(fusions).where(LEdge.TARGET_ID_POSITION)
      .equalTo(FIRST_VERTEX_ID)
      .with(new JoinFunction<LEdge, Tuple2<LVertex, LVertex>, LEdge>() {
        @Override
        public LEdge join(LEdge edge, Tuple2<LVertex, LVertex> fusion) {
          if (fusion != null) {
            edge.setTargetId(fusion.f1.getId());
          }
          return edge;
        }
      });
  }

  /**
   * Finds out if two given vertices could be merged into one and how "good" this merge would be.
   * The strange signature of the join function is needed to be able to directly use it in the
   * join of generateFusionCandidates().
   */
  protected static class CandidateGenerator implements
    FlatJoinFunction<Tuple2<LEdge, Tuple2<LVertex, Boolean>>, Tuple2<LVertex, Boolean>,
      Tuple3<LVertex, LVertex, Double>> {

    /**
     * ComparisonFunction to use to compute similarity of vertices
     */
    protected VertexCompareFunction cf;
    /**
     * Minimum similarity to allow merge
     */
    protected Double threshold;

    /**
     * Construct new instance
     *
     * @param cf        ComparisonFunction to use to compute similarity of vertices
     * @param threshold Minimum similarity to allow merge
     */
    public CandidateGenerator(VertexCompareFunction cf, Double threshold) {
      this.cf = cf;
      this.threshold = threshold;
    }

    /**
     * Emits a merge candidate for the two end points of an edge if exactly one of them is an
     * acceptor and their similarity is not below the threshold.
     *
     * @param source    The edge together with its classified source vertex
     * @param target    The classified target vertex of the edge
     * @param collector Receives (donor, acceptor, similarity) for an accepted candidate
     * @throws Exception declared by the implemented interface
     */
    @Override
    public void join(Tuple2<LEdge, Tuple2<LVertex, Boolean>> source,
      Tuple2<LVertex, Boolean> target, Collector<Tuple3<LVertex, LVertex, Double>> collector) throws
      Exception {

      LVertex sourceVertex = source.f1.f0;
      boolean sourceIsAcceptor = source.f1.f1;

      LVertex targetVertex = target.f0;
      boolean targetIsAcceptor = target.f1;

      // Can not merge two vertices of the same type (donor/acceptor)
      if (sourceIsAcceptor == targetIsAcceptor) {
        return;
      }

      Double similarity = cf.compare(sourceVertex, targetVertex);

      // Can not merge vertices that are not similar enough
      if (similarity < threshold) {
        return;
      }

      // The acceptor-vertex MUST be the second element of the tuple
      if (targetIsAcceptor) {
        collector.collect(new Tuple3<>(sourceVertex, targetVertex, similarity));
      } else {
        collector.collect(new Tuple3<>(targetVertex, sourceVertex, similarity));
      }
    }
  }

  /**
   * Combines multiple vertices into a single super-vertex. The created super-vertex inherits the
   * id of the acceptor. The position of the super-vertex is a weighted average of all
   * participating vertices.
   */
  protected static class SuperVertexGenerator implements
    GroupReduceFunction<Tuple2<LVertex, LVertex>, LVertex> {

    /**
     * Combine vertices
     *
     * @param iterable  The vertices to combine. f0 contains donor-vertices and f1 contains
     *                  (always the same) acceptor vertex.
     * @param collector Collector for the results
     */
    @Override
    public void reduce(Iterable<Tuple2<LVertex, LVertex>> iterable,
      Collector<LVertex> collector) {
      int count = 0;
      Vector positionSum = new Vector();
      LVertex self = null;

      for (Tuple2<LVertex, LVertex> fusion : iterable) {
        LVertex donor = fusion.f0;

        // The acceptor of the first tuple becomes the super-vertex. Testing the reference
        // (instead of the running count) keeps it stable even if the counts sum up to zero.
        if (self == null) {
          self = fusion.f1;
          count = self.getCount();
          positionSum.mAdd(self.getPosition().mul(self.getCount()));
        }

        // Each vertex contributes its position weighted by its count
        count += donor.getCount();
        positionSum.mAdd(donor.getPosition().mul(donor.getCount()));
        self.addSubVertex(donor.getId());
        self.addSubVertices(donor.getSubVertices());
      }

      // Nothing to combine for an empty group
      if (self == null) {
        return;
      }

      self.setPosition(positionSum.div(count));

      collector.collect(self);
    }
  }
}