MergeEmbeddings.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.join.functions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.join.JoinEmbeddings;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;
import java.util.stream.IntStream;
/**
* Given two input embeddings, the function merges them according to the given parameters and
* constraints.
*
* The constraints for merging are defined at {@link JoinEmbeddings}.
*/
public class MergeEmbeddings implements
FlatJoinFunction<Embedding, Embedding, Embedding>,
FlatMapFunction<Tuple2<Embedding, Embedding>, Embedding> {
/**
* Reduce object instantiations.
*/
protected final Embedding reuseEmbedding;
/**
* Non-Join columns from the right side.
*/
private final int[] nonJoinColumnsRight;
/**
* Number of join columns of the right side.
*/
private final int joinColumnsRightSize;
/**
* Vertex columns of the left embedding that need to have distinct id values.
*/
private final int[] distinctVertexColumnsLeft;
/**
* Vertex columns of the right embedding that need to have distinct id values.
*/
private final int[] distinctVertexColumnsRight;
/**
* Edge columns of the left embedding that need to have distinct id values.
*/
private final int[] distinctEdgeColumnsLeft;
/**
* Edge columns of the right embedding that need to have distinct id values.
*/
private final int[] distinctEdgeColumnsRight;
/**
* Flag, if vertex distinctiveness needs to be checked.
*/
private final boolean checkDistinctVertices;
/**
* Flag, if vertex distinctiveness needs to be checked.
*/
private final boolean checkDistinctEdges;
/**
* Creates a new UDF instance.
*
* @param rightColumns number of columns in the right embedding
* @param joinColumnsRight join columns of the right side
* @param distinctVertexColumnsLeft distinct vertex columns of the left embedding
* @param distinctVertexColumnsRight distinct vertex columns of the right embedding
* @param distinctEdgeColumnsLeft distinct edge columns of the left embedding
* @param distinctEdgeColumnsRight distinct edge columns of the right embedding
*/
public MergeEmbeddings(int rightColumns,
List<Integer> joinColumnsRight,
List<Integer> distinctVertexColumnsLeft,
List<Integer> distinctVertexColumnsRight,
List<Integer> distinctEdgeColumnsLeft,
List<Integer> distinctEdgeColumnsRight) {
this.nonJoinColumnsRight = IntStream.range(0, rightColumns)
.filter(col -> !joinColumnsRight.contains(col))
.toArray();
this.joinColumnsRightSize = joinColumnsRight.size();
ToIntFunction<Integer> f = i -> i;
this.distinctVertexColumnsLeft = distinctVertexColumnsLeft.stream().mapToInt(f).toArray();
this.distinctVertexColumnsRight = distinctVertexColumnsRight.stream().mapToInt(f).toArray();
this.distinctEdgeColumnsLeft = distinctEdgeColumnsLeft.stream().mapToInt(f).toArray();
this.distinctEdgeColumnsRight = distinctEdgeColumnsRight.stream().mapToInt(f).toArray();
this.checkDistinctVertices = distinctVertexColumnsLeft.size() > 0 ||
distinctVertexColumnsRight.size() > 0;
this.checkDistinctEdges = distinctEdgeColumnsLeft.size() > 0 ||
distinctEdgeColumnsRight.size() > 0;
this.reuseEmbedding = new Embedding();
}
@Override
public void join(Embedding left, Embedding right, Collector<Embedding> out) throws Exception {
if (isValid(left, right)) {
buildEmbedding(left, right);
out.collect(reuseEmbedding);
}
}
@Override
public void flatMap(Tuple2<Embedding, Embedding> value, Collector<Embedding> out)
throws Exception {
join(value.f0, value.f1, out);
}
/**
* Checks if the merged embedding would hold under morphism setting
* @param left left embedding
* @param right right embedding
* @return true if the morphism condition holds
*/
protected boolean isValid(Embedding left, Embedding right) {
boolean collect = false;
// Vertex-Homomorphism + Edge-Homomorphism
if (!checkDistinctVertices && !checkDistinctEdges) {
collect = true;
// Vertex-Homomorphism + Edge-Isomorphism
} else if (!checkDistinctVertices) {
if (isDistinct(distinctEdgeColumnsLeft, distinctEdgeColumnsRight, left, right)) {
collect = true;
}
// Vertex-Isomorphism + Edge-Isomorphism
} else {
if (isDistinct(distinctVertexColumnsLeft, distinctVertexColumnsRight, left, right) &&
isDistinct(distinctEdgeColumnsLeft, distinctEdgeColumnsRight, left, right)) {
collect = true;
}
}
return collect;
}
/**
* Merges left and right embeddings into {@link MergeEmbeddings#reuseEmbedding}
* @param left left embedding
* @param right right embedding
*/
protected void buildEmbedding(Embedding left, Embedding right) {
reuseEmbedding.setIdData(mergeIdData(left, right));
reuseEmbedding.setPropertyData(mergePropertyData(left, right));
reuseEmbedding.setIdListData(mergeIdListData(left, right));
}
/**
* Checks if both given embeddings contain distinct id values at all specified columns.
*
* @param columnsLeft left columns to include in check for uniqueness
* @param columnsRight right columns to include in check for uniqueness
* @param left left embedding
* @param right right embedding
* @return true, if both embeddings contain unique id values for all specified columns
*/
private boolean isDistinct(int[] columnsLeft, int[] columnsRight,
Embedding left, Embedding right) {
Set<GradoopId> ids = new HashSet<>(left.size() + right.size());
return isDistinct(ids, columnsLeft, left) && isDistinct(ids, columnsRight, right);
}
/**
* Checks if the specified embeddings contains distinct ids at the specified columns.
*
* @param ids used to track uniqueness
* @param columns columns to check for uniqueness
* @param embedding embedding to check
* @return true, if the embedding contains distinct Ids at the specified columns
*/
private boolean isDistinct(Set<GradoopId> ids, int[] columns, Embedding embedding) {
boolean isDistinct = true;
for (int column : columns) {
isDistinct = ids.addAll(embedding.getIdAsList(column));
if (!isDistinct) {
break;
}
}
return isDistinct;
}
/**
* Merges the idData columns of left and right
* All entries of left are kept as well as all right entries which aren't join columns
*
* @param left the left hand side embedding
* @param right the right hand side embedding
* @return the merged data represented as byte array
*/
private byte[] mergeIdData(Embedding left, Embedding right) {
byte[] newIdData = new byte[
left.getIdData().length +
right.getIdData().length -
(joinColumnsRightSize * (Embedding.ID_ENTRY_SIZE))
];
int offset = left.getIdData().length;
System.arraycopy(left.getIdData(), 0, newIdData, 0, offset);
for (int i : nonJoinColumnsRight) {
System.arraycopy(right.getRawIdEntry(i), 0, newIdData, offset, Embedding.ID_ENTRY_SIZE);
offset += Embedding.ID_ENTRY_SIZE;
}
return newIdData;
}
/**
* Merges the propertyData columns of the left and right embeddings
* All entries from both sides are kept.
*
* @param left the left hand side embedding
* @param right the right hand side embedding
* @return the merged data represented as byte array
*/
private byte[] mergePropertyData(Embedding left, Embedding right) {
return ArrayUtils.addAll(left.getPropertyData(), right.getPropertyData());
}
/**
* Merges the idListData columns of the left and right embeddings
* All entries from both sides are kept.
*
* @param left the left hand side embedding
* @param right the right hand side embedding
* @return the merged data represented as byte array
*/
private byte[] mergeIdListData(Embedding left, Embedding right) {
return ArrayUtils.addAll(left.getIdListData(), right.getIdListData());
}
}