Diff.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.diff;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.api.functions.TemporalPredicate;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.diff.functions.DiffPerElement;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import java.util.Objects;
/**
* Calculates the difference between two snapshots of a graph by comparing the temporal attributes
* of the graph elements.
* <p>
* The snapshots are extracted through two given temporal predicates. The result is a temporal graph
* containing the union of both graph element sets. Each element gets a new property named
* {@link Diff#PROPERTY_KEY} whose value will be a number indicating that an element is either
* equal in both snapshots (0) or added (1) or removed (-1) in the second snapshot.
* Elements not present in either snapshots will be discarded.
* <p>
* The resulting graph will not be verified, i.e. dangling edges could occur. Use the
* {@link TemporalGraph#verify()} operator to validate the graph. The graph head is preserved.
*/
public class Diff implements UnaryBaseGraphToBaseGraphOperator<TemporalGraph> {
/**
* The property key used to store the diff result on each element.
*/
public static final String PROPERTY_KEY = "_diff";
/**
* The property value used to indicate that an element was added in the second snapshot.
*/
public static final PropertyValue VALUE_ADDED = PropertyValue.create(1);
/**
* The property value used to indicate that an element is equal in both snapshots.
*/
public static final PropertyValue VALUE_EQUAL = PropertyValue.create(0);
/**
* The property value used to indicate that an element was removed in the second snapshot.
*/
public static final PropertyValue VALUE_REMOVED = PropertyValue.create(-1);
/**
* The predicate used to determine the first snapshot.
*/
private final TemporalPredicate firstPredicate;
/**
* The predicate used to determine the second snapshot.
*/
private final TemporalPredicate secondPredicate;
/**
* Specifies the time dimension that will be considered by the operator.
*/
private TimeDimension dimension;
/**
* Create an instance of the TPGM diff operator, setting the two predicates used to determine the snapshots.
* By default, valid times will be used by the predicate. To use transaction times, use
* {@link Diff#Diff(TemporalPredicate, TemporalPredicate, TimeDimension)} instead.
*
* @param firstPredicate The predicate used for the first snapshot.
* @param secondPredicate The predicate used for the second snapshot.
*/
public Diff(TemporalPredicate firstPredicate, TemporalPredicate secondPredicate) {
this(firstPredicate, secondPredicate, TimeDimension.VALID_TIME);
}
/**
* Create an instance of the TPGM diff operator, setting the two predicates used to determine the snapshots.
*
* @param firstPredicate The predicate used for the first snapshot.
* @param secondPredicate The predicate used for the second snapshot.
* @param dimension The time dimension that will be used.
*/
public Diff(TemporalPredicate firstPredicate, TemporalPredicate secondPredicate, TimeDimension dimension) {
this.firstPredicate = Objects.requireNonNull(firstPredicate, "No first predicate given.");
this.secondPredicate = Objects.requireNonNull(secondPredicate, "No second predicate given.");
this.dimension = Objects.requireNonNull(dimension, "No time dimension given.");
}
@Override
public TemporalGraph execute(TemporalGraph graph) {
DataSet<TemporalVertex> transformedVertices = graph.getVertices()
.flatMap(new DiffPerElement<>(firstPredicate, secondPredicate, dimension));
DataSet<TemporalEdge> transformedEdges = graph.getEdges()
.flatMap(new DiffPerElement<>(firstPredicate, secondPredicate, dimension));
return graph.getFactory().fromDataSets(graph.getGraphHead(), transformedVertices, transformedEdges);
}
}