ExtractPropertyFromVertex.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.dataintegration.transformation.impl;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.dataintegration.transformation.impl.config.EdgeDirection;
import org.gradoop.dataintegration.transformation.impl.functions.CreateNewEdges;
import org.gradoop.dataintegration.transformation.impl.functions.CreateNewVertex;
import org.gradoop.dataintegration.transformation.impl.functions.CreateNewVertexWithEqualityCondense;
import org.gradoop.dataintegration.transformation.impl.functions.ExtractPropertyWithOriginId;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.ByLabel;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.graphcontainment.AddToGraphBroadcast;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of2;

import java.util.List;

/**
 * This operator is executed on the vertex dataset and extracts a property to a newly created vertex
 * which can be connected to the vertex it was created from. If the property is a list every
 * element of the list is a new vertex.
 */
public class ExtractPropertyFromVertex implements UnaryGraphToGraphOperator {

  /**
   * The vertices the extraction is executed for.
   */
  private String forVerticesOfLabel;

  /**
   * The property key at the original vertex.
   */
  private String originalPropertyName;

  /**
   * The label of the extracted vertex.
   */
  private String newVertexLabel;

  /**
   * The property name of the extracted property at the new vertex.
   */
  private String newPropertyName;

  /**
   * The direction of the created edge(s).
   */
  private EdgeDirection edgeDirection;

  /**
   * The label of the newly created edge(s).
   */
  private String edgeLabel;

  /**
   * If true, all property values got condensed based on their equality.
   * So that every newly created vertex has a unique property value.
   * The default value is 'true'.
   */
  private boolean condense = true;

  /**
   * Constructs a new {@link UnaryGraphToGraphOperator} which extracts properties from vertices into
   * newly created vertices.
   *
   * @param forVerticesOfLabel   The vertices the extraction is executed for.
   * @param originalPropertyName The property key at the original vertex.
   * @param newVertexLabel       The label of the extracted vertex.
   * @param newPropertyName      The property name of the extracted property at the new vertex.
   */
  public ExtractPropertyFromVertex(String forVerticesOfLabel, String originalPropertyName,
                                   String newVertexLabel, String newPropertyName) {
    this(forVerticesOfLabel, originalPropertyName, newVertexLabel, newPropertyName,
      EdgeDirection.NONE, null);
  }

  /**
   * Constructs a new {@link UnaryGraphToGraphOperator} which extracts properties from vertices into
   * newly created vertices and connects original and new vertex with each other.
   *
   * @param forVerticesOfLabel   The vertices the extraction is executed for.
   * @param originalPropertyName The property key at the original vertex.
   * @param newVertexLabel       The label of the extracted vertex.
   * @param newPropertyName      The property name of the extracted property at the new vertex.
   * @param edgeDirection        The direction of the created edge(s).
   * @param edgeLabel            The label of the newly created edge.
   */
  public ExtractPropertyFromVertex(String forVerticesOfLabel, String originalPropertyName,
                                   String newVertexLabel, String newPropertyName,
                                   EdgeDirection edgeDirection, String edgeLabel) {
    this.forVerticesOfLabel = Preconditions.checkNotNull(forVerticesOfLabel);
    this.originalPropertyName = Preconditions.checkNotNull(originalPropertyName);
    this.newVertexLabel = Preconditions.checkNotNull(newVertexLabel);
    this.newPropertyName = Preconditions.checkNotNull(newPropertyName);
    this.edgeDirection = Preconditions.checkNotNull(edgeDirection);
    if (!edgeDirection.equals(EdgeDirection.NONE)) {
      this.edgeLabel = Preconditions.checkNotNull(edgeLabel);
    }
  }

  /**
   * This method sets whether property value based condensation should be executed or not.
   * The default value is 'true'.
   *
   * @param condense If true, all property values got condensed based on their equality.
   *                 So that every newly created vertex has a unique property value.
   */
  public void setCondensation(boolean condense) {
    this.condense = condense;
  }

  @Override
  public LogicalGraph execute(LogicalGraph logicalGraph) {
    // filter the vertices by the given label
    DataSet<EPGMVertex> filteredVertices = logicalGraph
      .getVertices()
      .filter(new ByLabel<>(forVerticesOfLabel));

    // calculate new vertices and store the origin for linking
    DataSet<Tuple2<PropertyValue, GradoopId>> candidates = filteredVertices
      .flatMap(new ExtractPropertyWithOriginId(originalPropertyName));

    // extract the new vertices
    DataSet<Tuple2<EPGMVertex, List<GradoopId>>> newVerticesAndOriginIds;
    if (condense) {
      newVerticesAndOriginIds = candidates
        .groupBy(0)
        .reduceGroup(new CreateNewVertexWithEqualityCondense(
          logicalGraph.getFactory().getVertexFactory(), newVertexLabel, newPropertyName));
    } else {
      newVerticesAndOriginIds = candidates
        .map(new CreateNewVertex(logicalGraph.getFactory().getVertexFactory(), newVertexLabel,
          newPropertyName));
    }


    DataSet<EPGMVertex> vertices = newVerticesAndOriginIds
      .map(new Value0Of2<>())
      .map(new AddToGraphBroadcast<>())
      .withBroadcastSet(logicalGraph.getGraphHead().map(new Id<>()), AddToGraphBroadcast.GRAPH_ID)
      .union(logicalGraph.getVertices());

    // the newly created vertices should be linked to the original vertices
    DataSet<EPGMEdge> edges = logicalGraph.getEdges();
    if (!edgeDirection.equals(EdgeDirection.NONE)) {
      edges = newVerticesAndOriginIds
        .flatMap(new CreateNewEdges(logicalGraph.getFactory().getEdgeFactory(), edgeDirection,
          edgeLabel))
        .map(new AddToGraphBroadcast<>())
        .withBroadcastSet(logicalGraph.getGraphHead().map(new Id<>()), AddToGraphBroadcast.GRAPH_ID)
        .union(edges);
    }

    return logicalGraph.getFactory()
      .fromDataSets(logicalGraph.getGraphHead(), vertices, edges);
  }
}