RepresentationConverters.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.representation.transactional;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.MapFunction;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.impl.properties.Properties;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
import org.gradoop.flink.representation.common.adjacencylist.AdjacencyListCell;
import org.gradoop.flink.representation.common.adjacencylist.AdjacencyListRow;
import java.util.Map;
import java.util.Set;
/**
* Util to convert among different graph representations.
*/
public class RepresentationConverters {
/**
* {@code transaction => adjacency list}
*
* @param transaction (g,V,E)
* @param edgeDataFactory edge data factory
* @param vertexDataFactory vertex data factory
*
* @param <ED> edge data
* @param <VD> vertex data
*
* @return adjacency list
* @throws Exception on failure
*/
public static <ED, VD> AdjacencyList<GradoopId, String, ED, VD> getAdjacencyList(
GraphTransaction transaction,
MapFunction<EPGMEdge, ED> edgeDataFactory,
MapFunction<EPGMVertex, VD> vertexDataFactory
) throws Exception {
Set<EPGMVertex> vertices = transaction.getVertices();
Set<EPGMEdge> edges = transaction.getEdges();
int vertexCount = vertices.size();
Map<GradoopId, String> labels =
Maps.newHashMapWithExpectedSize(1 + vertexCount + edges.size());
Map<GradoopId, Properties> properties =
Maps.newHashMap();
Map<GradoopId, AdjacencyListRow<ED, VD>> outgoingRows =
Maps.newHashMapWithExpectedSize(vertexCount);
Map<GradoopId, AdjacencyListRow<ED, VD>> incomingRows =
Maps.newHashMapWithExpectedSize(vertexCount);
Map<GradoopId, EPGMVertex> vertexIndex = Maps.newHashMapWithExpectedSize(vertexCount);
// VERTICES
for (EPGMVertex vertex : vertices) {
addLabelsAndProperties(vertex, labels, properties);
vertexIndex.put(vertex.getId(), vertex);
}
// EDGES
for (EPGMEdge edge : edges) {
addLabelsAndProperties(edge, labels, properties);
EPGMVertex source = vertexIndex.get(edge.getSourceId());
AdjacencyListRow<ED, VD> outgoingRow =
outgoingRows.computeIfAbsent(source.getId(), k -> new AdjacencyListRow<>());
VD sourceData = vertexDataFactory.map(source);
EPGMVertex target = vertexIndex.get(edge.getTargetId());
AdjacencyListRow<ED, VD> incomingRow =
incomingRows.computeIfAbsent(target.getId(), k -> new AdjacencyListRow<>());
VD targetData = vertexDataFactory.map(target);
ED edgeData = edgeDataFactory.map(edge);
outgoingRow.getCells().add(new AdjacencyListCell<>(edgeData, targetData));
incomingRow.getCells().add(new AdjacencyListCell<>(edgeData, sourceData));
}
return new AdjacencyList<>(
transaction.getGraphHead(), labels, properties, outgoingRows, incomingRows);
}
/**
* Adds label and properties of an EPGM element to id-label and id-properties maps.
*
* @param element EPGM element
* @param labels id-label map
* @param properties id-properties map
*/
private static void addLabelsAndProperties(
Element element,
Map<GradoopId, String> labels,
Map<GradoopId, Properties> properties
) {
labels.put(element.getId(), element.getLabel());
Properties propertyList = element.getProperties();
if (propertyList != null && !propertyList.isEmpty()) {
properties.put(element.getId(), propertyList);
}
}
/**
* {@code adjacency list => transaction}
*
* @param adjacencyList adjacency list
* @return transaction
*/
public static GraphTransaction getGraphTransaction(AdjacencyList<GradoopId, String, GradoopId,
GradoopId> adjacencyList) {
// GRAPH HEAD
EPGMGraphHead graphHead = adjacencyList.getGraphHead();
GradoopIdSet graphIds = GradoopIdSet.fromExisting(graphHead.getId());
Set<EPGMVertex> vertices = Sets.newHashSet();
Set<EPGMEdge> edges = Sets.newHashSet();
// VERTICES
for (Map.Entry<GradoopId, AdjacencyListRow<GradoopId, GradoopId>> entry :
adjacencyList.getOutgoingRows().entrySet()) {
GradoopId sourceId = entry.getKey();
Properties properties = adjacencyList.getProperties(sourceId);
String label = adjacencyList.getLabel(sourceId);
vertices.add(new EPGMVertex(sourceId, label, properties, graphIds));
// EDGES
for (AdjacencyListCell<GradoopId, GradoopId> cell : entry.getValue().getCells()) {
GradoopId edgeId = cell.getEdgeData();
label = adjacencyList.getLabel(edgeId);
properties = adjacencyList.getProperties(edgeId);
GradoopId targetId = cell.getVertexData();
edges.add(new EPGMEdge(edgeId, label, sourceId, targetId, properties, graphIds));
}
}
return new GraphTransaction(graphHead, vertices, edges);
}
}