ValidateNeighborhood.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.IdPair;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.Deletion;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.FatVertex;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.util.MessageType;
import org.gradoop.gdl.model.Edge;
import java.util.Collection;
import java.util.List;
import java.util.Set;
/**
 * Validates the neighborhood of a {@link FatVertex} according to the query.
 * <p>
 * Each query vertex candidate of the vertex is checked against the query edges
 * incident to that query vertex: every incoming query edge needs a non-zero
 * incoming candidate count at the vertex, and every outgoing query edge has to
 * be among the vertex' outgoing edge candidates. For each candidate that fails
 * this check, delete messages are emitted to the vertex itself, to its parents
 * and to the targets of its edge candidates.
 * <p>
 * {@code fatVertex -> [deletion]}
 * <p>
 * {@code f0->f1: vertexId -> senderId}
 */
@FunctionAnnotation.ForwardedFields("f0->f1")
public class ValidateNeighborhood extends RichFlatMapFunction<FatVertex, Deletion> {

  /**
   * serial version uid
   */
  private static final long serialVersionUID = 42L;

  /**
   * GDL query string; used to build the query handler in {@link #open(Configuration)}
   */
  private final String query;

  /**
   * Query handler, created in {@link #open(Configuration)} (transient, not serialized)
   */
  private transient QueryHandler queryHandler;

  /**
   * Single message instance that is refilled and re-emitted for every deletion
   */
  private final Deletion reuseDeletion;

  /**
   * Creates the function for the given query.
   *
   * @param query GDL query
   */
  public ValidateNeighborhood(String query) {
    this.query = query;
    this.reuseDeletion = new Deletion();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    queryHandler = new QueryHandler(query);
  }

  @Override
  public void flatMap(FatVertex fatVertex, Collector<Deletion> collector) throws Exception {
    List<Long> invalidCandidates =
      Lists.newArrayListWithCapacity(fatVertex.getCandidates().size());
    Set<Long> outgoing = getOutgoingEdgeCandidates(fatVertex);

    for (Long candidate : fatVertex.getCandidates()) {
      if (isValidCandidate(candidate, fatVertex, outgoing)) {
        continue;
      }
      invalidCandidates.add(candidate);
    }

    // nothing to report if every candidate survived the check
    if (invalidCandidates.isEmpty()) {
      return;
    }
    sendDeletions(invalidCandidates, fatVertex, collector);
  }

  /**
   * Checks a single query vertex candidate against the incoming and the outgoing
   * query edges. The outgoing side is only evaluated if the incoming side passed.
   *
   * @param vQ                     vertex candidate
   * @param fatVertex              vertex
   * @param outgoingEdgeCandidates outgoing edge candidates
   * @return true, if {@code fatVertex} is a valid embedding for {@code vQ}
   */
  private boolean isValidCandidate(Long vQ, FatVertex fatVertex,
    Set<Long> outgoingEdgeCandidates) {
    return isValidChild(vQ, fatVertex) && isValidParent(vQ, outgoingEdgeCandidates);
  }

  /**
   * Checks the incoming side: for every query edge that targets {@code vQ}, the
   * vertex' incoming candidate count at the index of that query edge id must
   * not be zero.
   *
   * @param vQ        vertex candidate
   * @param fatVertex vertex
   * @return true, if {@code fatVertex} is a valid child for {@code vQ}
   */
  private boolean isValidChild(Long vQ, FatVertex fatVertex) {
    Collection<Edge> incomingQueryEdges = queryHandler.getEdgesByTargetVertexId(vQ);
    if (incomingQueryEdges == null) {
      // no incoming query edges to satisfy
      return true;
    }
    for (Edge queryEdge : incomingQueryEdges) {
      int index = (int) queryEdge.getId();
      if (fatVertex.getIncomingCandidateCounts()[index] == 0) {
        return false;
      }
    }
    return true;
  }

  /**
   * Checks the outgoing side: every query edge that starts at {@code vQ} must
   * be contained in the given set of outgoing edge candidates.
   *
   * @param vQ                     vertex candidate
   * @param outgoingEdgeCandidates outgoing edge candidates of the current vertex
   * @return true, if the current vertex is a valid parent for {@code vQ}
   */
  private boolean isValidParent(Long vQ, Set<Long> outgoingEdgeCandidates) {
    Collection<Edge> outgoingQueryEdges = queryHandler.getEdgesBySourceVertexId(vQ);
    if (outgoingQueryEdges == null) {
      // no outgoing query edges to satisfy
      return true;
    }
    for (Edge queryEdge : outgoingQueryEdges) {
      if (!outgoingEdgeCandidates.contains(queryEdge.getId())) {
        return false;
      }
    }
    return true;
  }

  /**
   * Sends delete messages to the vertex' neighborhood including itself. For
   * each deleted candidate the order is: self, parents, children.
   *
   * @param deletions vertex candidates that need to be deleted
   * @param fatVertex current vertex
   * @param collector message collector
   */
  private void sendDeletions(List<Long> deletions, FatVertex fatVertex,
    Collector<Deletion> collector) {
    // the vertex loses all of its candidates if every one of them is deleted
    boolean toBeRemoved = fatVertex.getCandidates().size() == deletions.size();
    reuseDeletion.setSenderId(fatVertex.getVertexId());

    for (Long candidate : deletions) {
      reuseDeletion.setDeletion(candidate);
      sendToSelf(fatVertex, collector);
      sendToParents(fatVertex, collector, toBeRemoved);
      sendToChildren(fatVertex, collector, toBeRemoved);
    }
  }

  /**
   * Emits the current deletion with the vertex itself as recipient.
   *
   * @param fatVertex fat vertex
   * @param collector message collector
   */
  private void sendToSelf(FatVertex fatVertex, Collector<Deletion> collector) {
    reuseDeletion.setRecipientId(fatVertex.getVertexId());
    reuseDeletion.setMessageType(MessageType.FROM_SELF);
    collector.collect(reuseDeletion);
  }

  /**
   * Emits the current deletion once per parent id of the vertex.
   *
   * @param fatVertex   fat vertex
   * @param collector   message collector
   * @param toBeRemoved true, if {@code fatVertex} will be removed
   */
  private void sendToParents(FatVertex fatVertex, Collector<Deletion> collector,
    boolean toBeRemoved) {
    MessageType messageType;
    if (toBeRemoved) {
      messageType = MessageType.FROM_CHILD_REMOVE;
    } else {
      messageType = MessageType.FROM_CHILD;
    }
    reuseDeletion.setMessageType(messageType);

    for (GradoopId parentId : fatVertex.getParentIds()) {
      reuseDeletion.setRecipientId(parentId);
      collector.collect(reuseDeletion);
    }
  }

  /**
   * Emits the current deletion once per edge candidate key of the vertex,
   * addressed to the target id of that key.
   *
   * @param fatVertex   fat vertex
   * @param collector   message collector
   * @param toBeRemoved true, if {@code fatVertex} will be removed
   */
  private void sendToChildren(FatVertex fatVertex,
    Collector<Deletion> collector, boolean toBeRemoved) {
    MessageType messageType;
    if (toBeRemoved) {
      messageType = MessageType.FROM_PARENT_REMOVE;
    } else {
      messageType = MessageType.FROM_PARENT;
    }
    reuseDeletion.setMessageType(messageType);

    for (IdPair edgeKey : fatVertex.getEdgeCandidates().keySet()) {
      reuseDeletion.setRecipientId(edgeKey.getTargetId());
      collector.collect(reuseDeletion);
    }
  }

  /**
   * Collects the indices that are flagged {@code true} in any of the vertex'
   * edge candidate arrays.
   *
   * @param fatVertex fat vertex
   * @return all outgoing edge candidates of {@code fatVertex}
   */
  private Set<Long> getOutgoingEdgeCandidates(FatVertex fatVertex) {
    Set<Long> result = Sets.newHashSet();
    for (boolean[] flags : fatVertex.getEdgeCandidates().values()) {
      int index = 0;
      for (boolean isCandidate : flags) {
        if (isCandidate) {
          result.add((long) index);
        }
        index++;
      }
    }
    return result;
  }
}