Subgraph.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.subgraph;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.epgm.TargetId;
import org.gradoop.flink.model.impl.functions.utils.RightSide;
import org.gradoop.flink.model.impl.operators.subgraph.functions.EdgeToSourceAndTargetId;
import org.gradoop.flink.model.impl.operators.verify.Verify;

import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.BOTH;
import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.VERTEX_INDUCED;
import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.EDGE_INDUCED;
import static org.gradoop.flink.model.impl.operators.subgraph.Subgraph.Strategy.EDGE_INDUCED_PROJECT_FIRST;

/**
 * Extracts a subgraph from a base graph using the given filter functions.
 * The graph head stays unchanged for the subgraph.
 * <p>
 * The operator is able to:
 * <ol>
 *   <li>extract vertex-induced subgraph</li>
 *   <li>extract edge-induced subgraph via join + union strategy</li>
 *   <li>extract edge-induced subgraph via project + union + join strategy</li>
 *   <li>extract subgraph based on vertex and edge filter function without verification
 *   (no joins, use {@link Verify} to verify the subgraph)</li>
 * </ol>
 *
 * @param <G>  The graph head type.
 * @param <V>  The vertex type.
 * @param <E>  The edge type.
 * @param <LG> The type of the graph.
 * @param <GC> The type of the graph collection.
 */
public class Subgraph<
  G extends GraphHead,
  V extends Vertex,
  E extends Edge,
  LG extends BaseGraph<G, V, E, LG, GC>,
  GC extends BaseGraphCollection<G, V, E, LG, GC>>
  implements UnaryBaseGraphToBaseGraphOperator<LG> {

  /**
   * Used to filter vertices from the graph.
   */
  protected final FilterFunction<V> vertexFilterFunction;

  /**
   * Used to filter edges from the graph.
   */
  protected final FilterFunction<E> edgeFilterFunction;

  /**
   * Execution strategy for the operator.
   */
  protected final Strategy strategy;

  /**
   * Available execution strategies.
   */
  public enum Strategy {
    /**
     * Applies both filter functions on the input vertex and edge data set.
     */
    BOTH,
    /**
     * Only applies the vertex filter function and adds the incident edges connecting those
     * vertices via a join.
     */
    VERTEX_INDUCED,
    /**
     * Only applies the edge filter function and computes the resulting vertices via:<br>
     * {@code (E |><| V ON e.source = v.id) U (E |><| V on e.target = v.id)}
     */
    EDGE_INDUCED,
    /**
     * Only applies the edge filter function and computes the resulting vertices via:<br>
     * {@code DISTINCT((π_source(E) U π_target(E))) |><| V}
     */
    EDGE_INDUCED_PROJECT_FIRST
  }

  /**
   * Creates a new sub graph operator instance.
   * <p>
   * If both parameters are not {@code null}, the operator returns the subgraph
   * defined by filtered vertices and edges.
   * <p>
   * If the {@code edgeFilterFunction} is {@code null}, the operator returns the vertex-induced subgraph.
   * <p>
   * If the {@code vertexFilterFunction} is {@code null}, the operator returns the edge-induced subgraph.
   *
   * @param vertexFilterFunction vertex filter function
   * @param edgeFilterFunction   edge filter function
   * @param strategy             sets the execution strategy for the operator
   */
  public Subgraph(FilterFunction<V> vertexFilterFunction,
    FilterFunction<E> edgeFilterFunction, Strategy strategy) {

    if (strategy == BOTH &&
      (vertexFilterFunction == null || edgeFilterFunction == null)) {
      throw new IllegalArgumentException("No vertex or no edge filter function was given.");
    }

    if (strategy == VERTEX_INDUCED && vertexFilterFunction == null) {
      throw new IllegalArgumentException("No vertex filter functions was given.");
    }

    if ((strategy == EDGE_INDUCED || strategy == EDGE_INDUCED_PROJECT_FIRST) &&
      edgeFilterFunction == null) {
      throw new IllegalArgumentException("No vertex edge functions was given.");
    }

    this.strategy = strategy;

    this.vertexFilterFunction = vertexFilterFunction;
    this.edgeFilterFunction = edgeFilterFunction;
  }

  @Override
  public LG execute(LG superGraph) {

    LG result;
    switch (strategy) {
    case BOTH:
      result = subgraph(superGraph);
      break;
    case VERTEX_INDUCED:
      result = vertexInducedSubgraph(superGraph);
      break;
    case EDGE_INDUCED:
      result = edgeInducedSubgraph(superGraph);
      break;
    case EDGE_INDUCED_PROJECT_FIRST:
      result = edgeInducedSubgraphProjectFirst(superGraph);
      break;
    default:
      throw new IllegalArgumentException("Strategy " + strategy + " is not supported");
    }

    return result;
  }

  /**
   * Returns the subgraph of the given supergraph that is induced by the
   * vertices that fulfil the given filter function.
   *
   * @param superGraph supergraph
   * @return vertex-induced subgraph
   */
  private LG vertexInducedSubgraph(LG superGraph) {
    DataSet<V> filteredVertices = superGraph.getVertices().filter(vertexFilterFunction);

    return superGraph.getFactory()
      .fromDataSets(superGraph.getGraphHead(), filteredVertices, superGraph.getEdges())
      .verify();
  }

  /**
   * Returns the subgraph of the given supergraph that is induced by the
   * edges that fulfil the given filter function.
   *
   * @param superGraph supergraph
   * @return edge-induced subgraph
   */
  private LG edgeInducedSubgraph(LG superGraph) {
    DataSet<E> filteredEdges = superGraph.getEdges().filter(edgeFilterFunction);
    DataSet<V> inducedVertices = filteredEdges
      .join(superGraph.getVertices())
      .where(new SourceId<>()).equalTo(new Id<>())
      .with(new RightSide<>())
      .union(filteredEdges
        .join(superGraph.getVertices())
        .where(new TargetId<>()).equalTo(new Id<>())
        .with(new RightSide<>()))
      .distinct(new Id<>());

    return superGraph.getFactory()
      .fromDataSets(superGraph.getGraphHead(), inducedVertices, filteredEdges);
  }

  /**
   * Returns the subgraph of the given supergraph that is induced by the
   * edges that fulfil the given filter function.
   *
   * @param superGraph supergraph
   * @return edge-induced subgraph
   */
  private LG edgeInducedSubgraphProjectFirst(LG superGraph) {
    DataSet<E> filteredEdges = superGraph.getEdges().filter(edgeFilterFunction);
    DataSet<V> inducedVertices = filteredEdges
      .flatMap(new EdgeToSourceAndTargetId<>())
      .distinct()
      .join(superGraph.getVertices())
      .where("*").equalTo(new Id<>())
      .with(new RightSide<>());

    return superGraph.getFactory()
      .fromDataSets(superGraph.getGraphHead(), inducedVertices, filteredEdges);
  }

  /**
   * Returns the subgraph of the given supergraph that is defined by the
   * vertices that fulfil the vertex filter function and edges that fulfill the edge filter function.
   * <p>
   * <b>Note:</b> The operator does not verify the consistency of the resulting graph.
   *
   * @param superGraph supergraph
   * @return subgraph
   */
  private LG subgraph(LG superGraph) {
    return superGraph.getFactory().fromDataSets(
      superGraph.getGraphHead(),
      superGraph.getVertices().filter(vertexFilterFunction),
      superGraph.getEdges().filter(edgeFilterFunction));
  }
}