TemporalVertexDegree.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.CalculateDegreesDefaultTimesFlatMap;
import org.gradoop.temporal.model.impl.operators.metric.functions.CalculateDegreesFlatMap;
import org.gradoop.temporal.model.impl.operators.metric.functions.ExtendVertexDataWithInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.ExtractIdIntervalMap;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;

import java.util.Objects;
import java.util.TreeMap;

/**
 * A TPGM operator that determines how the degree of each vertex of a temporal graph evolves over time.
 * The operator yields a dataset of tuples {@code {id,t-from,t-to,degree}}: {@code id} identifies the vertex,
 * {@code t-from} and {@code t-to} are the lower and upper bound of an interval and {@code degree} is the
 * degree value that belongs to this interval.
 * <p>
 * Both the degree type (IN, OUT, BOTH) and the time dimension that is evaluated are selected through the
 * constructor arguments.
 * <p>
 * To obtain the degree evolution of one single vertex only, first apply the edge induced subgraph operator
 * {@link org.gradoop.flink.model.api.epgm.BaseGraphOperators#edgeInducedSubgraph(FilterFunction)} with a
 * {@link org.gradoop.flink.model.impl.functions.epgm.BySourceId} and/or
 * {@link org.gradoop.flink.model.impl.functions.epgm.ByTargetId} filter and run this operator on the result.
 */
public class TemporalVertexDegree
  implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple4<GradoopId, Long, Long, Integer>>> {

  /**
   * The time dimension whose intervals are evaluated.
   */
  private final TimeDimension dimension;

  /**
   * The kind of degree to calculate (IN, OUT, BOTH).
   */
  private final VertexDegree degreeType;

  /**
   * Decides whether the interval of the vertex bounds the first and the last degree period. When this flag
   * is false, the first degree period begins at
   * {@link org.gradoop.temporal.model.impl.pojo.TemporalElement#DEFAULT_TIME_FROM} and the last degree period
   * stops at {@link org.gradoop.temporal.model.impl.pojo.TemporalElement#DEFAULT_TIME_TO}.
   */
  private boolean includeVertexTime = false;

  /**
   * Creates a new temporal vertex degree operator.
   *
   * @param degreeType the degree type to consider, must not be {@code null}
   * @param dimension the time dimension to consider, must not be {@code null}
   */
  public TemporalVertexDegree(VertexDegree degreeType, TimeDimension dimension) {
    // Validate both arguments up front (degree type first), then store them.
    Objects.requireNonNull(degreeType);
    Objects.requireNonNull(dimension);
    this.degreeType = degreeType;
    this.dimension = dimension;
  }

  /**
   * Switches the consideration of the vertex time on or off when calculating the degree evolution.
   *
   * @param includeVertexTime true, iff the vertex time is used as lower and upper bounds for the first and
   *                          last degree period. Note that this requires a join.
   */
  public void setIncludeVertexTime(boolean includeVertexTime) {
    this.includeVertexTime = includeVertexTime;
  }

  @Override
  public DataSet<Tuple4<GradoopId, Long, Long, Integer>> execute(TemporalGraph graph) {
    // Steps 1-3: one degree tree per vertex id, derived from the edges only.
    final DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> degreeTrees = buildDegreeTrees(graph);

    if (!includeVertexTime) {
      // No vertex interval requested: calculate the degree evolution for each vertex directly from its
      // tree and output a tuple {v_id, t_from, t_to, degree}.
      return degreeTrees.flatMap(new CalculateDegreesDefaultTimesFlatMap());
    }

    // Steps 4-6: the vertex interval is requested, which needs the vertices of the graph.
    return calculateDegreesWithVertexIntervals(graph, degreeTrees);
  }

  /**
   * Derives, for each vertex id that occurs in the edges of the graph, the degree tree data structure.
   *
   * @param graph the temporal graph whose edges are evaluated
   * @return a dataset holding one pair of vertex id and degree tree per vertex id
   */
  private DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> buildDegreeTrees(TemporalGraph graph) {
    return graph.getEdges()
      // 1) Emit the vertex id(s) of each edge together with the time interval of the edge
      .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
      // 2) Collect all entries that belong to the same vertex id
      .groupBy(0)
      // 3) Turn each group into a degree tree
      .reduceGroup(new BuildTemporalDegreeTree());
  }

  /**
   * Calculates the degree evolution with the interval of each vertex taken into account. The time
   * information is stored at the vertices, which is why the degree trees get joined with them.
   *
   * @param graph the temporal graph that provides the vertices
   * @param degreeTrees the degree trees per vertex id
   * @return a dataset of tuples {@code {v_id, t_from, t_to, degree}}
   */
  private DataSet<Tuple4<GradoopId, Long, Long, Integer>> calculateDegreesWithVertexIntervals(
    TemporalGraph graph, DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> degreeTrees) {

    DataSet<Tuple3<GradoopId, Long, Long>> intervalPerVertex = graph.getVertices()
      .map(new ExtractIdIntervalMap(dimension));

    return degreeTrees
      // 4) Attach the interval of the matching vertex via a join on the vertex id
      .join(intervalPerVertex)
      .where(0).equalTo(0)
      .with(new ExtendVertexDataWithInterval())
      // 5) The join possibly divides groups, so the data is grouped once more
      .groupBy(0)
      .reduce(new BuildTemporalDegreeTree())
      // 6) Calculate the degree evolution per vertex and output a tuple {v_id, t_from, t_to, degree}
      .flatMap(new CalculateDegreesFlatMap());
  }
}