TemporalGraphLayoutFactory.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.layout;
import org.apache.flink.api.java.DataSet;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayout;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayoutFactory;
import org.gradoop.flink.model.impl.functions.graphcontainment.AddToGraph;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static java.util.Objects.requireNonNull;
/**
* Factory responsible for creating temporal GVE graph layouts.
*/
public class TemporalGraphLayoutFactory extends TemporalBaseLayoutFactory
implements LogicalGraphLayoutFactory<TemporalGraphHead, TemporalVertex, TemporalEdge> {
@Override
public LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> fromDataSets(
DataSet<TemporalVertex> vertices) {
return fromDataSets(vertices, createEdgeDataSet(Collections.emptyList()));
}
@Override
public LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> fromDataSets(
DataSet<TemporalVertex> vertices, DataSet<TemporalEdge> edges) {
requireNonNull(vertices, "Vertex DataSet is null");
requireNonNull(edges, "Edge DataSet is null");
TemporalGraphHead graphHead = getGraphHeadFactory().createGraphHead();
DataSet<TemporalGraphHead> graphHeadSet = getConfig().getExecutionEnvironment()
.fromElements(graphHead);
// update vertices and edges with new graph head id
vertices = vertices
.map(new AddToGraph<>(graphHead))
.withForwardedFields("id;label;properties;transactionTime;validTime");
edges = edges
.map(new AddToGraph<>(graphHead))
.withForwardedFields("id;sourceId;targetId;label;properties;transactionTime;validTime");
return new TemporalGVELayout(graphHeadSet, vertices, edges);
}
@Override
public LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> fromDataSets(
DataSet<TemporalGraphHead> graphHead,
DataSet<TemporalVertex> vertices,
DataSet<TemporalEdge> edges) {
requireNonNull(graphHead, "Temporal graphHead DataSet is null.");
requireNonNull(vertices, "Temporal vertex DataSet is null.");
requireNonNull(edges, "Temporal edge DataSet is null.");
return new TemporalGVELayout(graphHead, vertices, edges);
}
/**
* Creates a graph layout from the given datasets indexed by label.
*
* @param graphHeads 1-element Mapping from label to GraphHead DataSet
* @param vertices Mapping from label to vertex dataset
* @param edges Mapping from label to edge dataset
* @return a temporal indexed graph layout
*/
@Override
public LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> fromIndexedDataSets(
Map<String, DataSet<TemporalGraphHead>> graphHeads,
Map<String, DataSet<TemporalVertex>> vertices, Map<String, DataSet<TemporalEdge>> edges) {
return new TemporalIndexedLayout(graphHeads, vertices, edges);
}
@Override
public LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> fromCollections(
TemporalGraphHead graphHead, Collection<TemporalVertex> vertices,
Collection<TemporalEdge> edges) {
requireNonNull(vertices, "Temporal vertex collection is null.");
List<TemporalGraphHead> graphHeads = graphHead == null ? Collections.emptyList() :
Collections.singletonList(graphHead);
if (edges == null) {
edges = Collections.emptyList();
}
return fromDataSets(
createGraphHeadDataSet(graphHeads),
createVertexDataSet(vertices),
createEdgeDataSet(edges));
}
@Override
public LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> fromCollections(
Collection<TemporalVertex> vertices, Collection<TemporalEdge> edges) {
requireNonNull(vertices, "Temporal vertex collection is null.");
requireNonNull(edges, "Temporal edge collection is null.");
return fromDataSets(createVertexDataSet(vertices), createEdgeDataSet(edges));
}
@Override
public LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> createEmptyGraph() {
return fromCollections(null, Collections.emptyList(), Collections.emptyList());
}
}