Big / Bug Data: Analyzing Apache Flink Source Code

image1.png


Big Data applications process huge amounts of information, often in real time. Naturally, such applications must be highly reliable so that no error in the code could interfere with data processing. To achieve high reliability, it is necessary to closely monitor the quality of the code of projects developed for this area. The PVS-Studio static analyzer deals with this problem. Today, the Apache Flink project developed by the Apache Software Foundation, one of the leaders in the Big Data software market, was chosen as a test subject for the analyzer.



What is Apache Flink? It is an open-source framework for distributed processing of large amounts of data. It was developed as an alternative to Hadoop MapReduce in 2010 at the Technical University of Berlin. The framework is based on a distributed execution engine for batch and stream data processing applications. This engine is written in Java and Scala languages. Today Apache Flink can be used in projects written using Java, Scala, Python and even SQL.



Project analysis



Having downloaded the source code of the project, I started building the project with the command 'mvn clean package -DskipTests' specified in the instructions on GitHub . While the assembly was in progress, using the CLOC utility, I found out that there are 10838 Java files in the project, which have about 1.3 million lines of code. Moreover, there were already 3833 test Java files, which is more than 1/3 of all Java files. I also noticed that the project uses the FindBugs static code analyzer and the Cobertura utility, which provides information on code coverage by tests. With all of this in mind, it becomes clear that the Apache Flink developers carefully monitored code quality and test coverage during development.



After a successful build, I opened the project in IntelliJ IDEA and launched the analysis using the PVS-Studio for IDEA and Android Studio plugin . The analyzer warnings were distributed as follows:



  • 183 High;
  • 759 Medium;
  • 545 Low.


About 2/3 of the PVS-Studio analyzer triggers were assigned to test files. Considering this fact and the size of the project's codebase, we can say that the Apache Flink developers managed to keep the code quality at their best.



Having studied the analyzer warnings in more detail, I chose the most interesting ones in my opinion. So let's see what PVS-Studio managed to find in this project!





Just a little carelessness



V6001 There are identical sub-expressions 'processedData' to the left and to the right of the '==' operator. CheckpointStatistics.java (229)



@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}
      
      





Against the background of other expressions in return, this error is not very striking. When overriding the equals method for the CheckpointStatistics class, the programmer made a mistake in the processedData == processedData expression , which is meaningless because it is always true. Similarly, the rest of the expression in return were to be compared field of the current object this and object That : processedData == that.processedData... This situation is one of the typical error patterns found in comparison functions, which are described in detail in the article " Evil lives in comparison functions ". So it turns out that just "a little inattention" broke the logic of checking the equivalence of objects of the CheckpointStatistics class .



Expression is always true



V6007 Expression 'input2.length> 0' is always true. Operator.java (283)



public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}
      
      





In this method, the analyzer turned out to be more attentive than a person, which it decided to report in its own peculiar manner, indicating that the expression input2.length> 0 will always be true. The reason is that if the length of the input2 array is 0, then the condition input2 == null || input2.length == 0 of the first if in the method will be true, and the execution of the method will be interrupted before reaching the line with the expression input2.length> 0 .



All-seeing analyzer



V6007 Expression 'slotSharingGroup == null' is always false. StreamGraphGenerator.java (510)



private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}
      
      





The analyzer reported that slotSharingGroup == null is always false. This suggests that the determineSlotSharingGroup method will never return null . Is the analyzer so smart that it was able to calculate all the values ​​that this method can return? Let's better check everything ourselves:



public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}
      
      





In the order we go through all return and see what can regain this method:



  • The first return will return the argument to the specifiedGroup method , but only if it is not null .
  • return for DEFAULT_SLOT_SHARING_GROUP, ;
  • return inputGroup, null. DEFAULT_SLOT_SHARING_GROUP.


It turns out that the analyzer was really able to calculate the impossibility of returning null from the determineSlotSharingGroup method and warned us about this, pointing out the meaninglessness of checking slotSharingGroup == null . And although this situation is not erroneous, such additional protection of the analyzer will be able to detect an error in some other case. For example, when you need a method to return null under certain conditions.



Collect them all



V6007 Expression 'currentCount <= lastEnd' is always true. CountSlidingWindowAssigner.java (75)



V6007 Expression 'lastStart <= currentCount' is always true. CountSlidingWindowAssigner.java (75)



@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}
      
      





The analyzer warns that the expressions currentCount <= lastEnd and lastStart <= currentCount are always true. And indeed, if you look at the condition of the while loop , then there are exactly the same expressions. This means that inside the loop these expressions will always be true, so all the objects of type CountWindow created in the loop will be added to the windows list . There are many options for the appearance of this meaningless check, and the first thing that comes to mind is either a refactoring artifact or a developer's safety net. But it can be a mistake, if you wanted to check something else ...



Incorrect argument order



V6029 Possible incorrect order of arguments passed to method: 'hasBufferForReleasedChannel', 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java (165), NettyMessageClientDecoderDelegateTest.java (166)



private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}
      
      





Java's lack of the ability to call a method with named parameters sometimes plays a cruel joke with developers. This is exactly what happened when the analyzer pointed to the createMessageList method . Looking at the definition of this method, it becomes clear that the hasBufferForRemovedChannel parameter must be passed to the method before the hasBufferForReleasedChannel parameter :



private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}
      
      





However, when calling the method, the developer has mixed up the order of these arguments, which is why the logic of the createMessageList method will be broken if the values ​​of the mixed arguments differ.



Oh, this copy-paste



V6032 It is odd that the body of method 'seekToFirst' is fully equivalent to the body of another method 'seekToLast'. RocksIteratorWrapper.java (53), RocksIteratorWrapper.java (59)



public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}
      
      





The bodies of the seekToFirst and seekToLast methods are the same. Moreover, both methods are used in the code.



Something is unclean here! Indeed, if you look at what methods the iterator object has , it will become clear what error the analyzer helped to find:



public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}
      
      





It turns out that the method seekToLast class RocksIteratorWrapper was created by copy-paste method seekToFirst the same class. However, for some reason, the developer forgot to replace the iterator 's seekToFirst method call with seekToLast .



Confusion with format strings



V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1. UnsignedTypeConversionITCase.java (102)



public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}
      
      





The format strings of the String.format method and Java loggers are different. Unlike the format string of the String.format method , where argument substitutions are specified using the '%' character, the logger format strings use the '{}' character combination instead. Because of this confusion, this error occurred. As a format string, a string is passed to the String.format method , which, most likely, was copied from another place where it was used in some logger. As a result, the value of the INITIALIZE_DB_MAX_RETRY field will not be substituted in the IllegalStateException message. instead of '{}', and the person who catches or logs this exception will never know how many attempts to connect to the database were made.



Abnormal distribution



V6048 This expression can be simplified. Operand 'index' in the operation equals 0. CollectionUtil.java (76)



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))

           .add(element); 
  }

  return buckets.values();
}
      
      





The partition method splits the elements from the elements collection into multiple segments, and then returns those segments. However, due to the error pointed out by the analyzer, no separation will occur. The expression used to determine the segment number index% numBuckets will always be 0, because index is always 0. I originally thought that the code for this method was refactored, as a result of which they forgot to add an increment of the index variable in the for loop . But looking at the commitwhere this method was added, it turned out that this error came along with this method. Corrected version of the code:



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}
      
      





Incompatible type



V6066 The type of object passed as argument is incompatible with the type of collection: String, ListStateDescriptor <NextTransactionalIdHint>. FlinkKafkaProducer.java (1083)



public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}
      
      





The expression pointed to by the analyzer will always be false, which means that the call to the migrateNextTransactionalIdHindState method will never happen. How did it happen that someone is looking for an element of a completely different type in a collection of type Set <String> - ListStateDescriptor <FlinkKafkaProducer.NextTransactionalIdHint> ? Without the help of the analyzer, such an error would most likely have lived in the code for a very long time, since it does not strike the eye and it is simply impossible to find it without a thorough check of this method.



Non-atomic variable change



V6074 Non-atomic modification of volatile variable. Inspect 'currentNumAcknowledgedSubtasks'. PendingCheckpointStats.java (131)



boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}
      
      





Plus 3 more analyzer warnings in the same method:



  • V6074 Non-atomic modification of volatile variable. Inspect 'currentStateSize'. PendingCheckpointStats.java (134)
  • V6074 Non-atomic modification of volatile variable. Inspect 'currentProcessedData'. PendingCheckpointStats.java (138)
  • V6074 Non-atomic modification of volatile variable. Inspect 'currentPersistedData'. PendingCheckpointStats.java (143)


The analyzer suggested that as many as 4 volatile fields in the method change non-atomic. And the analyzer, as always, turns out to be right, because the ++ and + = operations are , in fact, a sequence of several read-modify-write operations. As you know, the volatile value of a field is visible to all threads, which means that some of the field changes may be lost due to a race condition. You can read more detailed information about this in the description of the diagnostics.



Conclusion



In Big Data projects, reliability is one of the key requirements, therefore, the quality of the code in them must be closely monitored. Apache Flink developers were assisted in this by several tools, and they also wrote a significant number of tests. However, even under such conditions, the PVS-Studio analyzer was able to find errors. It is impossible to completely get rid of errors, but using various static code analysis tools on a regular basis will allow you to get closer to this ideal. Yes, exactly regularly. Only with regular use does static analysis show its effectiveness, which is described in more detail in this article .





If you want to share this article with an English-speaking audience, please use the translation link: Valery Komarov. Big / Bug Data: Analyzing the Apache Flink Source Code .



All Articles