// ElementsFromEmbedding.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.matching.common.functions;

import com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.EdgeFactory;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.GraphHeadFactory;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.api.entities.VertexFactory;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMElement;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
import org.gradoop.flink.model.impl.operators.matching.common.query.Step;
import org.gradoop.flink.model.impl.operators.matching.common.query.TraversalCode;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.Embedding;
import org.gradoop.flink.model.impl.operators.matching.single.PatternMatching;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Extracts {@link EPGMElement} instances from an {@link Embedding}.
 */
public class ElementsFromEmbedding extends RichFlatMapFunction<Tuple1<Embedding<GradoopId>>, Element> {

  /**
   * Maps edge candidates to the step in which they are traversed
   */
  private final Map<Integer, Step> edgeToStep;
  /**
   * Constructs graph heads
   */
  private final GraphHeadFactory<? extends GraphHead> graphHeadFactory;
  /**
   * Constructs vertices
   */
  private final VertexFactory<? extends Vertex> vertexFactory;
  /**
   * Constructs edges
   */
  private final EdgeFactory<? extends Edge> edgeFactory;
  /**
   * Maps query vertex ids to variables
   */
  private final Map<Long, String> queryVertexMapping;
  /**
   * Maps query edge ids to variables
   */
  private final Map<Long, String> queryEdgeMapping;
  /**
   * Reuse map for storing the variable mapping
   */
  private Map<PropertyValue, PropertyValue> reuseVariableMapping;
  /**
   * Constructor
   *
   * @param traversalCode     traversal code to retrieve sourceId/targetId
   * @param graphHeadFactory  graph head factory
   * @param vertexFactory     vertex factory
   * @param edgeFactory       edge factory
   * @param query             query handler
   */
  public ElementsFromEmbedding(TraversalCode traversalCode,
    GraphHeadFactory<? extends GraphHead> graphHeadFactory,
    VertexFactory<? extends Vertex> vertexFactory,
    EdgeFactory<? extends Edge> edgeFactory,
    QueryHandler query) {
    this.graphHeadFactory = graphHeadFactory;
    this.vertexFactory = vertexFactory;
    this.edgeFactory = edgeFactory;

    this.queryVertexMapping = query.getVertices()
      .stream()
      .collect(Collectors.toMap(v -> v.getId(), v -> v.getVariable()));

    this.queryEdgeMapping = query.getEdges()
      .stream()
      .collect(Collectors.toMap(e -> e.getId(), e -> e.getVariable()));

    List<Step> steps = traversalCode.getSteps();
    edgeToStep = Maps.newHashMapWithExpectedSize(steps.size());

    for (Step step : steps) {
      edgeToStep.put((int) step.getVia(), step);
    }
  }

  @Override
  public void open(Configuration conf) {
    this.reuseVariableMapping = new HashMap<>();
  }

  @Override
  public void flatMap(Tuple1<Embedding<GradoopId>> embedding, Collector<Element> out)
      throws Exception {

    GradoopId[] vertexMapping = embedding.f0.getVertexMapping();
    GradoopId[] edgeMapping = embedding.f0.getEdgeMapping();


    // create graph head for this embedding
    GraphHead graphHead = graphHeadFactory.createGraphHead();

    // collect vertices (and assign to graph head)
    for (int i = 0; i < vertexMapping.length; i++) {
      if (!isProcessed(vertexMapping, i)) {
        Vertex v = vertexFactory.initVertex(vertexMapping[i]);
        v.addGraphId(graphHead.getId());
        out.collect(v);
      }

      reuseVariableMapping.put(
        PropertyValue.create(queryVertexMapping.get((long) i)),
        PropertyValue.create(vertexMapping[i])
      );
    }

    // collect edges (and assign to graph head)
    for (int i = 0; i < edgeMapping.length; i++) {
      if (!isProcessed(edgeMapping, i)) {
        Step s = edgeToStep.get(i);
        // get sourceId/targetId according to traversal step
        GradoopId sourceId = s.isOutgoing() ?
          vertexMapping[(int) s.getFrom()] : vertexMapping[(int) s.getTo()];
        GradoopId targetId = s.isOutgoing() ?
          vertexMapping[(int) s.getTo()] : vertexMapping[(int) s.getFrom()];
        Edge e = edgeFactory.initEdge(edgeMapping[i], sourceId, targetId);
        e.addGraphId(graphHead.getId());
        out.collect(e);
      }

      reuseVariableMapping.put(
        PropertyValue.create(queryEdgeMapping.get((long) i)),
        PropertyValue.create(edgeMapping[i])
      );
    }

    graphHead.setProperty(PatternMatching.VARIABLE_MAPPING_KEY, reuseVariableMapping);
    out.collect(graphHead);
  }

  /**
   * Checks if the the id at the specified index has been processed before.
   *
   * @param mapping id mapping
   * @param i index
   * @return true, iff the element at position i is present at a position between 0 and i
   */
  private boolean isProcessed(GradoopId[] mapping, int i) {
    for (int j = 0; j < i; j++) {
      if (mapping[j].equals(mapping[i])) {
        return true;
      }
    }
    return false;
  }
}