BaseGraphCollectionOperators.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.api.epgm;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.flink.model.api.functions.GraphHeadReduceFunction;
import org.gradoop.flink.model.api.operators.ApplicableUnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.api.operators.BinaryBaseGraphCollectionToBaseGraphCollectionOperator;
import org.gradoop.flink.model.api.operators.BinaryBaseGraphCollectionToValueOperator;
import org.gradoop.flink.model.api.operators.ReducibleBinaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphCollectionToBaseGraphCollectionOperator;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphCollectionToBaseGraphOperator;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphCollectionToValueOperator;
import org.gradoop.flink.model.impl.operators.combination.Combination;
import org.gradoop.flink.model.impl.operators.difference.Difference;
import org.gradoop.flink.model.impl.operators.difference.DifferenceBroadcast;
import org.gradoop.flink.model.impl.operators.distinction.DistinctById;
import org.gradoop.flink.model.impl.operators.distinction.DistinctByIsomorphism;
import org.gradoop.flink.model.impl.operators.distinction.GroupByIsomorphism;
import org.gradoop.flink.model.impl.operators.equality.CollectionEquality;
import org.gradoop.flink.model.impl.operators.equality.CollectionEqualityByGraphIds;
import org.gradoop.flink.model.impl.operators.exclusion.Exclusion;
import org.gradoop.flink.model.impl.operators.intersection.Intersection;
import org.gradoop.flink.model.impl.operators.intersection.IntersectionBroadcast;
import org.gradoop.flink.model.impl.operators.limit.Limit;
import org.gradoop.flink.model.impl.operators.matching.transactional.TransactionalPatternMatching;
import org.gradoop.flink.model.impl.operators.matching.transactional.algorithm.DepthSearchMatching;
import org.gradoop.flink.model.impl.operators.matching.transactional.algorithm.PatternMatchingAlgorithm;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.AddMatchesToProperties;
import org.gradoop.flink.model.impl.operators.overlap.Overlap;
import org.gradoop.flink.model.impl.operators.selection.Selection;
import org.gradoop.flink.model.impl.operators.tostring.functions.EdgeToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.EdgeToIdString;
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToEmptyString;
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToIdString;
import org.gradoop.flink.model.impl.operators.union.Union;
import org.gradoop.flink.model.impl.operators.verify.VerifyCollection;
import org.gradoop.flink.model.impl.operators.verify.VerifyGraphsContainment;

/**
 * Defines the operators that are available on a {@link BaseGraphCollection}.
 *
 * @param <G> type of the graph head
 * @param <V> the vertex type
 * @param <E> the edge type
 * @param <LG> the type of the base graph
 * @param <GC> the type of the graph collection
 */
public interface BaseGraphCollectionOperators<
  G extends GraphHead,
  V extends Vertex,
  E extends Edge,
  LG extends BaseGraph<G, V, E, LG, GC>,
  GC extends BaseGraphCollection<G, V, E, LG, GC>> {

  //----------------------------------------------------------------------------
  // Base Graph / Graph Head Getters
  //----------------------------------------------------------------------------

  /**
   * Returns base graph from collection using the given identifier. If the graph does not exist,
   * an empty base graph is returned.
   *
   * @param graphID graph identifier
   * @return base graph with given id or an empty base graph
   */
  LG getGraph(final GradoopId graphID);

  /**
   * Extracts base graphs from collection using their identifiers.
   *
   * @param identifiers graph identifiers
   * @return collection containing requested base graphs
   */
  default GC getGraphs(final GradoopId... identifiers) {
    return getGraphs(GradoopIdSet.fromExisting(identifiers));
  }
  /**
   * Extracts base graphs from collection using their identifiers.
   *
   * @param identifiers graph identifiers
   * @return collection containing requested base graphs
   */
  GC getGraphs(final GradoopIdSet identifiers);

  //----------------------------------------------------------------------------
  // Unary Operators
  //----------------------------------------------------------------------------

  /**
   * Filter containing graphs based on their associated graph head.
   *
   * @param predicateFunction predicate function for graph head
   * @return collection with base graphs that fulfil the predicate
   */
  default GC select(final FilterFunction<G> predicateFunction) {
    return callForCollection(new Selection<>(predicateFunction));
  }

  /**
   * Returns the first {@code n} arbitrary logical graphs contained in that collection.
   *
   * @param n number of graphs to return from collection
   * @return subset of the graph collection
   */
  default GC limit(int n) {
    return callForCollection(new Limit<>(n));
  }

  /**
   * Matches a given pattern on a graph collection.
   * The boolean flag {@code returnEmbeddings} specifies, if the return shall be the input graphs with
   * a new property {@link AddMatchesToProperties#DEFAULT_KEY}, or a new collection consisting of the
   * constructed embeddings.
   *
   * @param query the query pattern in GDL syntax
   * @param algorithm custom pattern matching algorithm, e.g., {@link DepthSearchMatching}
   * @param returnEmbeddings if true it returns the embeddings as a new graph collection
   *                         if false it returns the input collection with a new property with key
   *                         {@link AddMatchesToProperties#DEFAULT_KEY} and value true/false if the pattern
   *                         is contained in the respective graph
   * @return a graph collection containing either the embeddings or the input
   * graphs with a new property with name {@link AddMatchesToProperties#DEFAULT_KEY}
   */
  default GC query(String query, PatternMatchingAlgorithm algorithm, boolean returnEmbeddings) {
    return callForCollection(new TransactionalPatternMatching<>(query, algorithm, returnEmbeddings));
  }

  /**
   * Verifies each graph of this collection, removing dangling edges, i.e. edges pointing to or from
   * a vertex not contained in the graph.<br>
   * This operator can be applied after an operator that has not checked these graphs validity.
   * The graph heads of these base graphs remains unchanged.
   *
   * @return this graph collection with all dangling edges removed.
   */
  default GC verify() {
    return callForCollection(new VerifyCollection<>());
  }

  /**
   * Verifies this graph collection, removing dangling graph ids from its elements,
   * i.e. ids not contained in the collection.<br>
   * This operator can be applied after an operator that has not checked the graphs validity.
   * The graph head of this graph collection remains unchanged.
   *
   * @return this graph collection with all dangling graph ids removed.
   */
  default GC verifyGraphsContainment() {
    return callForCollection(new VerifyGraphsContainment<>());
  }

  //----------------------------------------------------------------------------
  // Binary Operators
  //----------------------------------------------------------------------------

  /**
   * Returns a collection with all base graphs from two input collections.
   * Graph equality is based on their identifiers.
   *
   * @param otherCollection collection to build union with
   * @return union of both collections
   */
  default GC union(GC otherCollection) {
    return callForCollection(new Union<>(), otherCollection);
  }

  /**
   * Returns a collection with all base graphs that exist in both input
   * collections. Graph equality is based on their identifiers.
   *
   * @param otherCollection collection to build intersect with
   * @return intersection of both collections
   */
  default GC intersect(GC otherCollection) {
    return callForCollection(new Intersection<>(), otherCollection);
  }

  /**
   * Returns a collection with all base graphs that exist in both input collections.
   * Graph equality is based on their identifiers.
   * <p>
   * Implementation that works faster if {@code otherCollection} is small
   * (e.g. fits in the workers main memory).
   *
   * @param otherCollection collection to build intersect with
   * @return intersection of both collections
   */
  default GC intersectWithSmallResult(GC otherCollection) {
    return callForCollection(new IntersectionBroadcast<>(), otherCollection);
  }

  /**
   * Returns a collection with all base graphs that are contained in that
   * collection but not in the other. Graph equality is based on their identifiers.
   *
   * @param otherCollection collection to subtract from that collection
   * @return difference between that and the other collection
   */
  default GC difference(GC otherCollection) {
    return callForCollection(new Difference<>(), otherCollection);
  }

  /**
   * Returns a collection with all base graphs that are contained in that
   * collection but not in the other. Graph equality is based on their identifiers.
   * <p>
   * Alternate implementation that works faster if the intermediate result
   * (list of graph identifiers) fits into the workers memory.
   *
   * @param otherCollection collection to subtract from that collection
   * @return difference between that and the other collection
   */
  default GC differenceWithSmallResult(GC otherCollection) {
    return callForCollection(new DifferenceBroadcast<>(), otherCollection);
  }

  /**
   * Checks, if another collection contains the same graphs as this graph (by id).
   *
   * @param otherCollection other graph collection
   * @return 1-element dataset containing true, if equal by graph ids
   */
  default DataSet<Boolean> equalsByGraphIds(GC otherCollection) {
    return callForValue(new CollectionEqualityByGraphIds<>(), otherCollection);
  }

  /**
   * Checks, if another collection contains the same graphs as this graph (by vertex and edge ids).
   *
   * @param otherCollection other graph
   * @return 1-element dataset containing true, if equal by element ids
   */
  default DataSet<Boolean> equalsByGraphElementIds(GC otherCollection) {
    return callForValue(new CollectionEquality<>(
      new GraphHeadToEmptyString<>(),
      new VertexToIdString<>(),
      new EdgeToIdString<>(), true), otherCollection);
  }

  /**
   * Returns a 1-element dataset containing a {@code boolean} value which
   * indicates if the graph collection is equal to the given graph collection.
   * <p>
   * Equality is defined on the element data contained inside the collection, i.e. vertices and edges.
   * This is limited to EPGM data (IDs, Label, Properties). All extensions of EPGM are ignored.
   *
   * @param otherCollection graph collection to compare with
   * @return 1-element dataset containing {@code true} if the two collections
   * are equal or {@code false} if not
   */
  default DataSet<Boolean> equalsByGraphElementData(GC otherCollection) {
    return callForValue(new CollectionEquality<>(
      new GraphHeadToEmptyString<>(),
      new VertexToDataString<>(),
      new EdgeToDataString<>(), true), otherCollection);
  }

  /**
   * Returns a 1-element dataset containing a {@code boolean} value which
   * indicates if the graph collection is equal to the given graph collection.
   * <p>
   * Equality is defined on the data contained inside the collection, i.e. graph heads, vertices and edges.
   * This is limited to EPGM data (IDs, Label, Properties). All extensions of EPGM are ignored.
   *
   * @param otherCollection graph collection to compare with
   * @return 1-element dataset containing {@code true} if the two collections
   * are equal or {@code false} if not
   */
  default DataSet<Boolean> equalsByGraphData(GC otherCollection) {
    return callForValue(new CollectionEquality<>(
      new GraphHeadToDataString<>(),
      new VertexToDataString<>(),
      new EdgeToDataString<>(), true), otherCollection);
  }

  //----------------------------------------------------------------------------
  // Utility methods
  //----------------------------------------------------------------------------

  /**
   * Returns a 1-element dataset containing a {@code boolean} value which indicates if the collection
   * is empty.
   *
   * A collection is considered empty, if it contains no logical graphs.
   *
   * @return  1-element dataset containing {@code true}, if the collection is empty or {@code false} if not
   */
  DataSet<Boolean> isEmpty();

  /**
   * Returns a distinct collection of base graphs. Graph equality is based on graph identifiers.
   *
   * @return distinct graph collection
   */
  default GC distinctById() {
    return callForCollection(new DistinctById<>());
  }

  /**
   * Groups a graph collection by isomorphism.
   * Graph equality is based on isomorphism including labels and properties.
   *
   * @return distinct graph collection
   */
  default GC distinctByIsomorphism() {
    return callForCollection(new DistinctByIsomorphism<>());
  }

  /**
   * Groups a graph collection by isomorphism including labels and values.
   *
   * @param reduceFunction function to reduce all graph heads of a group into a single representative one,
   *                       e.g., to count the number of group members
   *
   * @return grouped graph collection
   */
  default GC groupByIsomorphism(GraphHeadReduceFunction<G> reduceFunction) {
    return callForCollection(new GroupByIsomorphism<>(reduceFunction));
  }

  //----------------------------------------------------------------------------
  // Call for Operators
  //----------------------------------------------------------------------------

  /**
   * Creates a value using the given unary collection to value operator.
   *
   * @param operator unary graph collection to value operator
   * @param <T> return type
   * @return result of given operator
   */
  <T> T callForValue(UnaryBaseGraphCollectionToValueOperator<GC, T> operator);

  /**
   * Calls the given binary collection to value operator using this graph collection and the
   * input graph collection.
   *
   * @param operator        binary collection to value operator
   * @param otherCollection second input collection for operator
   * @param <T> return type
   * @return result of given operator
   */
  <T> T callForValue(BinaryBaseGraphCollectionToValueOperator<GC, T> operator, GC otherCollection);

  /**
   * Calls the given unary collection to graph operator using this graph collection to get a graph of type
   * {@link LG}.
   *
   * @param operator unary collection to graph operator
   * @return result of given operator
   */
  default LG callForGraph(UnaryBaseGraphCollectionToBaseGraphOperator<GC, LG> operator) {
    return callForValue(operator);
  }

  /**
   * Creates a graph collection using the given unary graph collection operator.
   *
   * @param operator unary graph collection to graph collection operator
   * @return result of given operator
   */
  default GC callForCollection(UnaryBaseGraphCollectionToBaseGraphCollectionOperator<GC> operator) {
    return callForValue(operator);
  }

  /**
   * Calls the given binary collection to collection operator using this graph collection and the
   * input graph collection.
   *
   * @param operator        binary collection to collection operator
   * @param otherCollection second input collection for operator
   * @return result of given operator
   */
  default GC callForCollection(BinaryBaseGraphCollectionToBaseGraphCollectionOperator<GC> operator,
                               GC otherCollection) {
    return callForValue(operator, otherCollection);
  }

  /**
   * Applies a given unary graph to graph operator (e.g., aggregate) on each
   * base graph in the graph collection.
   *
   * @param operator applicable unary graph to graph operator
   * @return collection with resulting logical graphs
   */
  default GC apply(ApplicableUnaryBaseGraphToBaseGraphOperator<GC> operator) {
    return callForValue(operator);
  }

  /**
   * Transforms a graph collection into a graph by applying a
   * {@link ReducibleBinaryBaseGraphToBaseGraphOperator} pairwise on the elements of the collection.
   *
   * @param operator reducible binary graph to graph operator
   * @return base graph returned by the operator
   *
   * @see Exclusion
   * @see Overlap
   * @see Combination
   */
  default LG reduce(ReducibleBinaryBaseGraphToBaseGraphOperator<GC, LG> operator) {
    return callForValue(operator);
  }
}