TemporalGraphOperators.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.api;
import org.gradoop.flink.model.api.epgm.BaseGraphOperators;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.functions.KeyFunction;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.keyedgrouping.KeyedGrouping;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.temporal.model.api.functions.TemporalPredicate;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.TemporalGraphCollection;
import org.gradoop.temporal.model.impl.functions.predicates.AsOf;
import org.gradoop.temporal.model.impl.functions.predicates.Between;
import org.gradoop.temporal.model.impl.functions.predicates.ContainedIn;
import org.gradoop.temporal.model.impl.functions.predicates.CreatedIn;
import org.gradoop.temporal.model.impl.functions.predicates.DeletedIn;
import org.gradoop.temporal.model.impl.functions.predicates.FromTo;
import org.gradoop.temporal.model.impl.functions.predicates.ValidDuring;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MaxEdgeTime;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MaxVertexTime;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MinEdgeTime;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MinVertexTime;
import org.gradoop.temporal.model.impl.operators.diff.Diff;
import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.CNFPostProcessing;
import org.gradoop.temporal.model.impl.operators.matching.common.statistics.TemporalGraphStatistics;
import org.gradoop.temporal.model.impl.operators.matching.common.statistics.dummy.DummyTemporalGraphStatistics;
import org.gradoop.temporal.model.impl.operators.matching.single.cypher.CypherTemporalPatternMatching;
import org.gradoop.temporal.model.impl.operators.snapshot.Snapshot;
import org.gradoop.temporal.model.impl.operators.verify.VerifyAndUpdateEdgeValidity;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import java.util.ArrayList;
import java.util.List;
/**
* Defines the operators that are available on a {@link TemporalGraph}.
*/
public interface TemporalGraphOperators extends BaseGraphOperators<TemporalGraphHead, TemporalVertex,
TemporalEdge, TemporalGraph, TemporalGraphCollection> {
//----------------------------------------------------------------------------
// Unary Operators
//----------------------------------------------------------------------------
/**
* Extracts a snapshot of this temporal graph using a given temporal predicate. The predicate is applied
* on the valid time dimension by default. To use the transaction time dimension, use
* {@link TemporalGraphOperators#snapshot(TemporalPredicate, TimeDimension)} instead.
* This operator will calculate the subgraph induced by the predicate.
*
* @param predicate the temporal predicate to apply on the valid times
* @return the snapshot as a temporal graph
*/
default TemporalGraph snapshot(TemporalPredicate predicate) {
return snapshot(predicate, TimeDimension.VALID_TIME);
}
/**
* Extracts a snapshot of this temporal graph using a given temporal predicate. The predicate is applied
* on the given time dimension.
* This operator will calculate the subgraph induced by the predicate.
*
* @param predicate the temporal predicate to apply
* @param dimension the dimension that is used
* @return the snapshot as a temporal graph
*/
default TemporalGraph snapshot(TemporalPredicate predicate, TimeDimension dimension) {
return callForGraph(new Snapshot(predicate, dimension));
}
/**
* Extracts a snapshot of this temporal graph using the temporal predicate {@code AS OF timestamp}
* where {@code timestamp} is a timestamp in milliseconds.
*
* @param timestamp the timestamp in milliseconds to query
* @return the snapshot as a temporal graph
* @see Snapshot
* @see AsOf
*/
default TemporalGraph asOf(long timestamp) {
return snapshot(new AsOf(timestamp));
}
/**
* Extracts a snapshot of this temporal graph using the temporal predicate
* {@code FROM fromTimestamp TO toTimestamp} where both values are timestamps in milliseconds.
*
* @param fromTimestamp the from timestamp in milliseconds to query
* @param toTimestamp the to timestamp in milliseconds to query
* @return the snapshot as a temporal graph
* @see Snapshot
* @see FromTo
*/
default TemporalGraph fromTo(long fromTimestamp, long toTimestamp) {
return snapshot(new FromTo(fromTimestamp, toTimestamp));
}
/**
* Extracts a snapshot of this temporal graph using the temporal predicate
* {@code BETWEEN fromTimestamp AND toTimestamp} where both values are timestamps in milliseconds.
*
* @param fromTimestamp the from timestamp in milliseconds to query
* @param toTimestamp the to timestamp in milliseconds to query
* @return the snapshot as a temporal graph
* @see Snapshot
* @see Between
*/
default TemporalGraph between(long fromTimestamp, long toTimestamp) {
return snapshot(new Between(fromTimestamp, toTimestamp));
}
/**
* Extracts a snapshot of this temporal graph using the temporal predicate
* {@code CONTAINED IN (fromTimestamp, toTimestamp)} where both values are timestamps in milliseconds.
*
* @param fromTimestamp the from timestamp in milliseconds to query
* @param toTimestamp the to timestamp in milliseconds to query
* @return the snapshot as a temporal graph
* @see Snapshot
* @see ContainedIn
*/
default TemporalGraph containedIn(long fromTimestamp, long toTimestamp) {
return snapshot(new ContainedIn(fromTimestamp, toTimestamp));
}
/**
* Extracts a snapshot of this temporal graph using the temporal predicate
* {@code VALID DURING (fromTimestamp, toTimestamp)} where both values are timestamps in milliseconds.
*
* @param fromTimestamp the from timestamp in milliseconds to query
* @param toTimestamp the to timestamp in milliseconds to query
* @return the snapshot as a temporal graph
* @see Snapshot
* @see ValidDuring
*/
default TemporalGraph validDuring(long fromTimestamp, long toTimestamp) {
return snapshot(new ValidDuring(fromTimestamp, toTimestamp));
}
/**
* Extracts a snapshot of this temporal graph using the temporal predicate
* {@code CREATED IN (fromTimestamp, toTimestamp)} where both values are timestamps in milliseconds.
*
* @param fromTimestamp the from timestamp in milliseconds to query
* @param toTimestamp the to timestamp in milliseconds to query
* @return the snapshot as a temporal graph
* @see Snapshot
* @see CreatedIn
*/
default TemporalGraph createdIn(long fromTimestamp, long toTimestamp) {
return snapshot(new CreatedIn(fromTimestamp, toTimestamp));
}
/**
* Extracts a snapshot of this temporal graph using the temporal predicate
* {@code DELETED IN (fromTimestamp, toTimestamp)} where both values are timestamps in milliseconds.
*
* @param fromTimestamp the from timestamp in milliseconds to query
* @param toTimestamp the to timestamp in milliseconds to query
* @return the snapshot as a temporal graph
* @see Snapshot
* @see DeletedIn
*/
default TemporalGraph deletedIn(long fromTimestamp, long toTimestamp) {
return snapshot(new DeletedIn(fromTimestamp, toTimestamp));
}
/**
* Compares two snapshots of this graph. Given two temporal predicates, this operation
* will check if a graph element (vertex or edge) was added, removed or persists in the second
* snapshot compared to the first snapshot. The predicates are applied on the valid times. To use
* transaction time dimension, use
* {@link TemporalGraphOperators#diff(TemporalPredicate, TemporalPredicate, TimeDimension)}.
* <p>
* This operation returns the union of both snapshots with the following changes:
* A property with key {@value Diff#PROPERTY_KEY}
* will be set on each graph element. Its value will be set to
* <ul>
* <li>{@code 0}, if the element is present in both snapshots.</li>
* <li>{@code 1}, if the element is present in the second, but not the first snapshot
* (i.e. it was added since the first snapshot).</li>
* <li>{@code -1}, if the element is present in the first, but not the second snapshot
* (i.e. it was removed since the first snapshot).</li>
* </ul>
* Graph elements present in neither snapshot will be discarded.
* The resulting graph will not be verified, i.e. dangling edges could occur. Use the
* {@code verify()} operator to validate the graph. The graph head is preserved.
*
* @param firstSnapshot The predicate used to determine the first snapshot.
* @param secondSnapshot The predicate used to determine the second snapshot.
* @return A temporal graph containing the union of vertex and edge sets of both snapshots,
* defined by the given two predicate functions. A property with key
* {@link Diff#PROPERTY_KEY} is set on each graph element with a numerical value (-1, 0, 1) defined above.
*/
default TemporalGraph diff(TemporalPredicate firstSnapshot, TemporalPredicate secondSnapshot) {
return diff(firstSnapshot, secondSnapshot, TimeDimension.VALID_TIME);
}
/**
* Compares two snapshots of this graph. Given two temporal predicates, this operation will check if a
* graph element (vertex or edge) was added, removed or persists in the second snapshot compared to the
* first snapshot. The predicates are applied on the specified time dimension.
* <p>
* This operation returns the union of both snapshots with the following changes:
* A property with key {@value Diff#PROPERTY_KEY} will be set on each graph element.
* Its value will be set to
* <ul>
* <li>{@code 0}, if the element is present in both snapshots.</li>
* <li>{@code 1}, if the element is present in the second, but not the first snapshot
* (i.e. it was added since the first snapshot).</li>
* <li>{@code -1}, if the element is present in the first, but not the second snapshot
* (i.e. it was removed since the first snapshot).</li>
* </ul>
* Graph elements present in neither snapshot will be discarded.
* The resulting graph will not be verified, i.e. dangling edges could occur. Use the
* {@code verify()} operator to validate the graph. The graph head is preserved.
*
* @param firstSnapshot The predicate used to determine the first snapshot.
* @param secondSnapshot The predicate used to determine the second snapshot.
* @param dimension The time dimension that will be considered by the predicates.
* @return A temporal graph containing the union of vertex and edge sets of both snapshots, defined by the
* given two predicate functions. A property with key {@link Diff#PROPERTY_KEY} is set on each graph
* element with a numerical value (-1, 0, 1) defined above.
*/
default TemporalGraph diff(TemporalPredicate firstSnapshot, TemporalPredicate secondSnapshot,
TimeDimension dimension) {
return callForGraph(new Diff(firstSnapshot, secondSnapshot, dimension));
}
/**
* Evaluates the given query using the Temporal-GDL query engine. The engine uses default morphism
* strategies, which is vertex homomorphism and edge isomorphism. The vertex and edge data of the graph
* elements is attached to the resulting vertices.
* <p>
* Note, that this method used no statistics about the data graph which may result in bad runtime
* performance. Use {@link #temporalQuery(String, TemporalGraphStatistics)} to provide statistics for the
* query planner.
*
* @param temporalGdlQuery A Temporal-GDL query as {@link String}
* @return graph collection containing matching subgraphs
*/
default TemporalGraphCollection temporalQuery(String temporalGdlQuery) {
return temporalQuery(temporalGdlQuery, new DummyTemporalGraphStatistics());
}
/**
* Evaluates the given query using the Temporal-GDL query engine. The engine uses default morphism
* strategies, which is vertex homomorphism and edge isomorphism. The vertex and edge data of the data graph
* elements is attached to the resulting vertices.
* <p>
* Note, that this method used no statistics about the data graph which may result in bad runtime
* performance. Use {@link #temporalQuery(String, String, TemporalGraphStatistics)} to provide statistics
* for the query planner.
* <p>
* In addition, the operator can be supplied with a construction pattern allowing the creation of new graph
* elements based on variable bindings of the match pattern. Consider the following example:
* <br>
* {@code graph.query(
* "MATCH (a:Author)-[:WROTE]->(:Paper)<-[:WROTE]-(b:Author) WHERE a <> b",
* "(a)-[:CO_AUTHOR]->(b)")}
* <p>
* The query pattern is looking for pairs of authors that worked on the same paper.
* The construction pattern defines a new edge of type CO_AUTHOR between the two entities.
*
* @param temporalGdlQuery A Temporal-GDL query as {@link String}
* @param constructionPattern Construction pattern in Temporal-GDL format
* @return graph collection containing the output of the construct pattern
*/
default TemporalGraphCollection temporalQuery(String temporalGdlQuery, String constructionPattern) {
return temporalQuery(temporalGdlQuery, constructionPattern, new DummyTemporalGraphStatistics());
}
/**
* Evaluates the given query using the Temporal-GDL query engine. The engine uses default morphism
* strategies, which is vertex homomorphism and edge isomorphism. The vertex and edge data of the data graph
* elements is attached to the resulting vertices.
*
* @param temporalGdlQuery A Temporal-GDL query as {@link String}
* @param graphStatistics statistics about the data graph
* @return graph collection containing matching subgraphs
*/
default TemporalGraphCollection temporalQuery(String temporalGdlQuery,
TemporalGraphStatistics graphStatistics) {
return temporalQuery(temporalGdlQuery, null, graphStatistics);
}
/**
* Evaluates the given query using the Temporal-GDL query engine. The engine uses default morphism
* strategies, which is vertex homomorphism and edge isomorphism. The vertex and edge data of the data graph
* elements is attached to the resulting vertices.
* <p>
* In addition, the operator can be supplied with a construction pattern allowing the creation of new graph
* elements based on variable bindings of the match pattern. Consider the following example:
* <br>
* {@code graph.query(
* "MATCH (a:Author)-[:WROTE]->(:Paper)<-[:WROTE]-(b:Author) WHERE a <> b",
* "(a)-[:CO_AUTHOR]->(b)")}
* <p>
* The query pattern is looking for pairs of authors that worked on the same paper.
* The construction pattern defines a new edge of type CO_AUTHOR between the two entities.
*
* @param temporalGdlQuery A Temporal-GDL query as {@link String}
* @param constructionPattern Construction pattern in Temporal-GDL format
* @param graphStatistics Statistics about the data graph
* @return graph collection containing the output of the construct pattern
*/
default TemporalGraphCollection temporalQuery(String temporalGdlQuery, String constructionPattern,
TemporalGraphStatistics graphStatistics) {
return temporalQuery(temporalGdlQuery, constructionPattern, true, MatchStrategy.HOMOMORPHISM,
MatchStrategy.ISOMORPHISM, graphStatistics);
}
/**
* Evaluates the given query using the Temporal-GDL query engine.
*
* @param temporalGdlQuery A Temporal-GDL query as {@link String}
* @param constructionPattern Construction pattern in Temporal-GDL format
* @param attachData attach original vertex and edge data to the result
* @param vertexStrategy morphism setting for vertex mapping
* @param edgeStrategy morphism setting for edge mapping
* @param stats statistics about the data graph
* @return graph collection containing the output of the construct pattern or a graph collection containing
* matching subgraphs if the construction pattern is {@code null}.
*/
default TemporalGraphCollection temporalQuery(String temporalGdlQuery, String constructionPattern,
boolean attachData, MatchStrategy vertexStrategy, MatchStrategy edgeStrategy,
TemporalGraphStatistics stats) {
return callForCollection(
new CypherTemporalPatternMatching(temporalGdlQuery, constructionPattern, attachData, vertexStrategy,
edgeStrategy, stats, new CNFPostProcessing()));
}
/**
* Grouping operator that aggregates valid times per group and sets it as new valid time.
* The grouped validFrom value will be computed by min over all validFrom values.
* The grouped validTo value will be computed by max over all validTo values.
*
* @param vertexGroupingKeys property keys to group vertices
* @return summary graph
* @see KeyedGrouping
*/
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys) {
return temporalGroupBy(vertexGroupingKeys, null);
}
/**
* Grouping operator that aggregates valid times per group and sets it as new valid time.
* The grouped validFrom value will be computed by min over all validFrom values.
* The grouped validTo value will be computed by max over all validTo values.
*
* @param vertexGroupingKeys property keys to group vertices
* @param edgeGroupingKeys property keys to group edges
* @return summary graph
* @see KeyedGrouping
*/
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys,
List<KeyFunction<TemporalEdge, ?>> edgeGroupingKeys) {
return temporalGroupBy(vertexGroupingKeys, new ArrayList<>(), edgeGroupingKeys, new ArrayList<>());
}
/**
* Grouping operator that aggregates valid times per group and sets it as new valid time.
* The grouped validFrom value will be computed by min over all validFrom values.
* The grouped validTo value will be computed by max over all validTo values.
*
* @param vertexGroupingKeys property keys to group vertices
* @param vertexAggregateFunctions aggregate functions to apply on super vertices
* @param edgeGroupingKeys property keys to group edges
* @param edgeAggregateFunctions aggregate functions to apply on super edges
* @return summary graph
* @see KeyedGrouping
*/
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys,
List<AggregateFunction> vertexAggregateFunctions, List<KeyFunction<TemporalEdge, ?>> edgeGroupingKeys,
List<AggregateFunction> edgeAggregateFunctions) {
// Add min/max valid time aggregations that will result in the new valid times
List<AggregateFunction> tempVertexAgg = new ArrayList<>(vertexAggregateFunctions);
tempVertexAgg.add(new MinVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM)
.setAsValidTime(TimeDimension.Field.FROM));
tempVertexAgg.add(new MaxVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO)
.setAsValidTime(TimeDimension.Field.TO));
List<AggregateFunction> tempEdgeAgg = new ArrayList<>(edgeAggregateFunctions);
tempEdgeAgg.add(new MinEdgeTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM)
.setAsValidTime(TimeDimension.Field.FROM));
tempEdgeAgg.add(new MaxEdgeTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO)
.setAsValidTime(TimeDimension.Field.TO));
return callForGraph(new KeyedGrouping<>(vertexGroupingKeys, tempVertexAgg, edgeGroupingKeys,
tempEdgeAgg));
}
//----------------------------------------------------------------------------
// Utilities
//----------------------------------------------------------------------------
/**
* Converts the {@link TemporalGraph} to a {@link LogicalGraph} instance by discarding all
* temporal information from the graph elements. All Ids (graphs, vertices, edges) are kept
* during the transformation.
*
* @return the logical graph instance
*/
LogicalGraph toLogicalGraph();
/**
* Updates edges of this graph to set their validity such that an edge is only valid if both of
* its adjacent vertices are valid at that time. Edges that can never be valid since the
* validity of both vertices does not overlap, are discarded.<p>
* Note that this will also remove dangling edges, in the same way {@link #verify()} would.
*
* @return This graph with invalid or dangling edges removed.
*/
default TemporalGraph updateEdgeValidity() {
return callForGraph(new VerifyAndUpdateEdgeValidity());
}
}