SetPairTraverser.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser;

import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.common.functions.ElementHasCandidate;
import org.gradoop.flink.model.impl.operators.matching.common.query.Step;
import org.gradoop.flink.model.impl.operators.matching.common.query.TraversalCode;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.Embedding;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.IdWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.ExplorativePatternMatching;

import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.debug.PrintEdgeStep;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.debug.PrintEmbeddingWithTiePoint;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.debug.PrintVertexStep;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.BuildEdgeStep;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.BuildEmbeddingFromVertex;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.BuildVertexStep;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.EdgeHasCandidate;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.UpdateEdgeMapping;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.UpdateVertexMapping;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.VertexHasCandidate;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.tuples.EdgeStep;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.tuples.EmbeddingWithTiePoint;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.tuples.VertexStep;

import java.util.Objects;

import static org.gradoop.flink.model.impl.operators.matching.common.debug.Printer.log;

/**
 * Base class for traversers that walk a graph handed over as a pair of
 * DataSets: one containing the vertices, the other containing the edges.
 *
 * @param <K> key type
 */
public abstract class SetPairTraverser<K> extends DistributedTraverser<K> {
  /**
   * Join hint applied when embeddings are joined with edge steps
   */
  private final JoinOperatorBase.JoinHint edgeStepJoinStrategy;
  /**
   * Join hint applied when embeddings are joined with vertex steps
   */
  private final JoinOperatorBase.JoinHint vertexStepJoinStrategy;

  /**
   * Creates a new set-pair traverser.
   *
   * @param traversalCode          describes the graph traversal
   * @param matchStrategy          matching strategy for vertices and edges
   * @param vertexCount            number of query vertices
   * @param edgeCount              number of query edges
   * @param keyClazz               key type for embedding initialization
   * @param edgeStepJoinStrategy   join hint for the edge extension, must not be null
   * @param vertexStepJoinStrategy join hint for the vertex extension, must not be null
   * @param vertexMapping          used for debug
   * @param edgeMapping            used for debug
   */
  SetPairTraverser(TraversalCode traversalCode, MatchStrategy matchStrategy, int vertexCount,
    int edgeCount, Class<K> keyClazz, JoinOperatorBase.JoinHint edgeStepJoinStrategy,
    JoinOperatorBase.JoinHint vertexStepJoinStrategy,
    DataSet<Tuple2<K, PropertyValue>> vertexMapping,
    DataSet<Tuple2<K, PropertyValue>> edgeMapping) {
    super(traversalCode, matchStrategy, vertexCount, edgeCount, keyClazz,
      vertexMapping, edgeMapping);

    // Both hints are mandatory: a null argument aborts construction with a
    // NullPointerException.
    this.edgeStepJoinStrategy = Objects.requireNonNull(edgeStepJoinStrategy);
    this.vertexStepJoinStrategy = Objects.requireNonNull(vertexStepJoinStrategy);
  }

  /**
   * Runs the traversal over the given graph and yields the embeddings of the
   * pattern that were found.
   *
   * @param vertices  vertices including their query candidates
   * @param edges     edges including their query candidates
   * @return embeddings contained in the graph
   */
  public abstract DataSet<Tuple1<Embedding<K>>> traverse(
    DataSet<IdWithCandidates<K>> vertices,
    DataSet<TripleWithCandidates<K>> edges);

  /**
   * Creates the start embeddings: vertices are filtered by the 'from' value of
   * the first traversal step and each remaining vertex is mapped to an
   * embedding.
   *
   * @param vertices vertices and their query candidates
   * @return initial embeddings
   */
  DataSet<EmbeddingWithTiePoint<K>> buildInitialEmbeddings(DataSet<IdWithCandidates<K>> vertices) {
    Step firstStep = getTraversalCode().getStep(0);
    int startCandidate = (int) firstStep.getFrom();

    DataSet<IdWithCandidates<K>> startVertices = vertices
      .filter(new ElementHasCandidate<>(startCandidate));

    DataSet<EmbeddingWithTiePoint<K>> seeds = startVertices
      .map(new BuildEmbeddingFromVertex<>(getKeyClazz(), firstStep,
        getVertexCount(), getEdgeCount()));

    return log(seeds,
      new PrintEmbeddingWithTiePoint<>(), getVertexMapping(), getEdgeMapping());
  }

  /**
   * Joins the given embeddings with the edge steps derived from the edges and
   * returns the embeddings produced by that join.
   *
   * @param edges             edges including their candidates
   * @param embeddings        embeddings
   * @param superstep         current iteration
   * @param traverserStrategy iteration strategy
   * @param forwardedFields   forwarded fields for edge step creation
   * @return updated embeddings
   */
  DataSet<EmbeddingWithTiePoint<K>> traverseEdges(
    DataSet<TripleWithCandidates<K>> edges,
    DataSet<EmbeddingWithTiePoint<K>> embeddings,
    DataSet<Integer> superstep,
    TraverserStrategy traverserStrategy,
    String[] forwardedFields) {

    // filter and map both receive the superstep as a broadcast set
    DataSet<TripleWithCandidates<K>> candidateEdges = edges
      .filter(new EdgeHasCandidate<>(getTraversalCode()))
      .withBroadcastSet(superstep, ExplorativePatternMatching.BC_SUPERSTEP);

    DataSet<EdgeStep<K>> steps = candidateEdges
      .map(new BuildEdgeStep<>(getTraversalCode()))
      .withBroadcastSet(superstep, ExplorativePatternMatching.BC_SUPERSTEP)
      .withForwardedFields(forwardedFields);

    DataSet<EdgeStep<K>> loggedSteps = log(steps,
      new PrintEdgeStep<>(isIterative(), "post-filter-map-edge"),
      getVertexMapping(), getEdgeMapping());

    // tiePointId == sourceId/targetId tie point
    JoinOperator<EmbeddingWithTiePoint<K>, EdgeStep<K>, EmbeddingWithTiePoint<K>> edgeJoin =
      embeddings
        .join(loggedSteps, edgeStepJoinStrategy)
        .where(0).equalTo(1)
        .with(new UpdateEdgeMapping<>(getTraversalCode(), getMatchStrategy(), traverserStrategy));

    DataSet<EmbeddingWithTiePoint<K>> extended =
      attachSuperstepForLoopIteration(edgeJoin, superstep, traverserStrategy);

    return log(extended,
      new PrintEmbeddingWithTiePoint<>(isIterative(), "post-edge-update"),
      getVertexMapping(), getEdgeMapping());
  }

  /**
   * Joins the given embeddings with the vertex steps derived from the vertices
   * and returns the embeddings produced by that join.
   *
   * @param vertices          vertices including their candidates
   * @param embeddings        embeddings
   * @param superstep         current super step
   * @param traverserStrategy iteration strategy
   * @return updated embeddings
   */
  DataSet<EmbeddingWithTiePoint<K>> traverseVertices(
    DataSet<IdWithCandidates<K>> vertices,
    DataSet<EmbeddingWithTiePoint<K>> embeddings,
    DataSet<Integer> superstep,
    TraverserStrategy traverserStrategy) {

    // only the filter receives the superstep broadcast set here
    DataSet<IdWithCandidates<K>> candidateVertices = vertices
      .filter(new VertexHasCandidate<>(getTraversalCode()))
      .withBroadcastSet(superstep, ExplorativePatternMatching.BC_SUPERSTEP);

    DataSet<VertexStep<K>> steps = candidateVertices.map(new BuildVertexStep<>());

    DataSet<VertexStep<K>> loggedSteps = log(steps,
      new PrintVertexStep<>(isIterative(), "post-filter-project-vertex"),
      getVertexMapping(), getEdgeMapping());

    // tiePointId == vertexId
    JoinOperator<EmbeddingWithTiePoint<K>, VertexStep<K>, EmbeddingWithTiePoint<K>> vertexJoin =
      embeddings
        .join(loggedSteps, vertexStepJoinStrategy)
        .where(0).equalTo(0)
        .with(new UpdateVertexMapping<>(getTraversalCode(), getMatchStrategy(),
          traverserStrategy));

    DataSet<EmbeddingWithTiePoint<K>> extended =
      attachSuperstepForLoopIteration(vertexJoin, superstep, traverserStrategy);

    return log(extended,
      new PrintEmbeddingWithTiePoint<>(isIterative(), "post-vertex-update"),
      getVertexMapping(), getEdgeMapping());
  }

  /**
   * Registers the superstep as a broadcast set on the given join if, and only
   * if, the strategy is {@link TraverserStrategy#SET_PAIR_FOR_LOOP_ITERATION};
   * for every other strategy the join is handed back unchanged.
   *
   * @param join              join producing the updated embeddings
   * @param superstep         current super step
   * @param traverserStrategy iteration strategy
   * @param <S>               type of the step tuples on the right join side
   * @return the join result, with the broadcast set attached where required
   */
  private <S> DataSet<EmbeddingWithTiePoint<K>> attachSuperstepForLoopIteration(
    JoinOperator<EmbeddingWithTiePoint<K>, S, EmbeddingWithTiePoint<K>> join,
    DataSet<Integer> superstep,
    TraverserStrategy traverserStrategy) {
    return traverserStrategy == TraverserStrategy.SET_PAIR_FOR_LOOP_ITERATION ?
      join.withBroadcastSet(superstep, ExplorativePatternMatching.BC_SUPERSTEP) :
      join;
  }
}