Business process monitoring in Camunda



Hi, Habr.



My name is Anton, and I am a technical lead at DomClick. I create and maintain microservices that allow the DomClick infrastructure to exchange data with the internal services of Sberbank.



This is a continuation of a series of articles about our experience with the Camunda business process engine. The previous article was devoted to the development of a plugin for Bitbucket that allows viewing changes in BPMN schemas. Today I will talk about monitoring projects that use Camunda, both with third-party tools (in our case, the Elastic stack: Kibana and Grafana) and with Camunda's "native" Cockpit. I will describe the difficulties that arose when using Cockpit, and our solutions to them.



When you have a lot of microservices, you want to know everything about their work and current status: the more monitoring, the more confident you feel, both in regular and emergency situations, during releases, and so on. As monitoring tools we use the Elastic stack: Kibana and Grafana. In Kibana we look at the logs, and in Grafana at the metrics. The database also contains historical data on Camunda processes. It would seem that this should be enough to understand whether the service is working normally, and if not, why. The catch is that the data has to be viewed in three different places, and it does not always have a clear connection between them. Parsing and analyzing an incident can take a lot of time, especially analyzing the data in the database: Camunda has a far from obvious data schema, and it stores some variables in serialized form. In theory, Cockpit, Camunda's tool for monitoring business processes, can make this task easier.





Cockpit interface.



The main problem is that Cockpit cannot work under a custom URL. There are many requests about this on the Camunda forum, but so far there is no such functionality out of the box. The only way out is to do it yourself. Cockpit is set up by the Spring Boot autoconfiguration CamundaBpmWebappAutoConfiguration, so we need to replace it with our own. We are interested in CamundaBpmWebappInitializer, the main bean that initializes the Cockpit web filters and servlets.
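
Wiring the replacement in can look like the following sketch, assuming the Camunda Spring Boot starter: CustomCamundaBpmWebappInitializer is our implementation (described piece by piece below), and the servicePath value is a placeholder.

// A sketch: switch off the stock webapp autoconfiguration and provide our own
// initializer instead. The package of the excluded autoconfiguration comes from
// the Camunda Spring Boot starter and may differ between versions.
import org.camunda.bpm.spring.boot.starter.webapp.CamundaBpmWebappAutoConfiguration
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@SpringBootApplication(exclude = [CamundaBpmWebappAutoConfiguration::class])
class MonitoringApplication

@Configuration
open class CustomCockpitConfiguration {
    // CustomCamundaBpmWebappInitializer is our copy of the original initializer,
    // extended to take the custom URL as a parameter
    @Bean
    open fun camundaBpmWebappInitializer() =
        CustomCamundaBpmWebappInitializer(servicePath = "/monitoring/cockpit")
}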



We need to pass to the main filter (LazyProcessEnginesFilter) the URL at which it will work, and to ResourceLoadingProcessEnginesFilter the URL at which it will serve JS and CSS resources.



To do this, in our implementation of CamundaBpmWebappInitializer, we change the line:



registerFilter("Engines Filter", LazyProcessEnginesFilter::class.java, "/api/*", "/app/*")






to:



registerFilter("Engines Filter", CustomLazyProcessEnginesFilter::class.java, singletonMap("servicePath", servicePath), *urlPatterns)






servicePath is our custom URL. In CustomLazyProcessEnginesFilter itself, we specify our own implementation of ResourceLoadingProcessEnginesFilter:



class CustomLazyProcessEnginesFilter :
       LazyDelegateFilter<ResourceLoaderDependingFilter>(CustomResourceLoadingProcessEnginesFilter::class.java)






In CustomResourceLoadingProcessEnginesFilter, we add servicePath to all the resource links that we plan to serve to the client side:



override fun replacePlaceholder(
       data: String,
       appName: String,
       engineName: String,
       contextPath: String,
       request: HttpServletRequest,
       response: HttpServletResponse
) = data.replace(APP_ROOT_PLACEHOLDER, "$contextPath$servicePath")
           .replace(BASE_PLACEHOLDER,
                   String.format("%s$servicePath/app/%s/%s/",
                           contextPath, appName, engineName))
           .replace(PLUGIN_PACKAGES_PLACEHOLDER,
                   createPluginPackagesString(appName, contextPath))
           .replace(PLUGIN_DEPENDENCIES_PLACEHOLDER,
                   createPluginDependenciesString(appName))






Now we can tell our Cockpit at which URL it should listen for requests and serve resources.



But it can't be that simple, can it? In our case, Cockpit is not able to work out of the box on multiple instances of the application (for example, in Kubernetes pods): instead of OAuth2 and JWT, it uses the good old jsessionid, which is stored in a local cache. This means that if you try to log into a Cockpit connected to a Camunda launched in several instances at once, each client request carrying the issued jsessionid can fail with a 401 error with probability x, where x = 1 - 1 / number_of_pods. What can you do about it? In the same CamundaBpmWebappInitializer, an Authentication Filter is declared in which all the work with tokens takes place; we need to replace it with our own as well. In our filter, we take the jsessionid from the session cache and save it to the database if it is an authorization request, or check its validity against the database in all other cases. Done: now we can investigate incidents in business processes through the convenient Cockpit graphical interface, where you can immediately see the error stack traces and the variables that the process had at the time of the incident.
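
A simplified sketch of such a filter is below. Assumptions: SessionRepository is a hypothetical DAO over the shared session table, DatabaseBackedAuthenticationFilter is our name for the class, and login requests are recognized by the path of Camunda's auth endpoint; the real matching logic is more involved.

// Sketch: keep session ids in a shared database so that any pod can validate
// a session issued by another one. SessionRepository is a hypothetical DAO.
import javax.servlet.Filter
import javax.servlet.FilterChain
import javax.servlet.ServletRequest
import javax.servlet.ServletResponse
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse

interface SessionRepository {
    fun save(sessionId: String)
    fun exists(sessionId: String): Boolean
}

class DatabaseBackedAuthenticationFilter(
    private val sessions: SessionRepository
) : Filter {

    override fun doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) {
        val req = request as HttpServletRequest
        val resp = response as HttpServletResponse
        val sessionId = req.session.id

        when {
            // login request: remember the issued session id for the other pods
            req.requestURI.contains("/auth/user/") -> {
                sessions.save(sessionId)
                chain.doFilter(request, response)
            }
            // any other request: accept only session ids known to the database
            sessions.exists(sessionId) -> chain.doFilter(request, response)
            else -> resp.sendError(HttpServletResponse.SC_UNAUTHORIZED)
        }
    }
}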



When the cause of an incident is clear from the exception stack trace, Cockpit reduces the analysis time to 3-5 minutes: you go in, look at the incidents of the process, look at the stack trace and the variables, and voila, the incident is sorted out, you file a bug in JIRA and move on. But what if the situation is a little more complicated: the stack trace is just a consequence of an earlier error, or the process finished without creating an incident at all (that is, technically everything went well, but from the point of view of business logic the wrong data was transferred, or the process went along the wrong branch of the diagram)? In this case, you have to go to Kibana again, look at the logs and try to connect them to the Camunda processes, which again takes a lot of time. Of course, you can add the UUID of the current process and the ID of the current BPMN schema element (activityId) to each log line, but this requires a lot of manual work, clutters up the codebase and complicates code review. This process can be automated.



The Spring Cloud Sleuth project allows tagging logs with a unique identifier (in our case, the process UUID). Setting up the Sleuth context is described in detail in its documentation; here I will only show how to start it in Camunda.
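
If Sleuth is not in the project yet, it is enough to add the starter dependency (a Gradle Kotlin DSL sketch; with Maven the coordinates are the same, and the version should come from your Spring Cloud BOM, 3.x or the matching 2020.x release train):

// build.gradle.kts
dependencies {
    implementation("org.springframework.cloud:spring-cloud-starter-sleuth")
}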



First, you need to register customPreBPMNParseListeners with the current Camunda processEngine configuration. In the listener, override the parseStartEvent method (adding an execution listener to the start event of the top-level process) and parseServiceTask (adding a listener to the start event of each ServiceTask).
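
In code, this might look like the following sketch. Assumptions: TracingParseListener is our name, the two ExecutionListener implementations (one creating the Sleuth context, one restoring it) are injected from outside, and Camunda's parse listener API lives in internal "impl" packages, so it may shift between versions.

import org.camunda.bpm.engine.delegate.ExecutionListener
import org.camunda.bpm.engine.impl.bpmn.parser.AbstractBpmnParseListener
import org.camunda.bpm.engine.impl.pvm.process.ActivityImpl
import org.camunda.bpm.engine.impl.pvm.process.ScopeImpl
import org.camunda.bpm.engine.impl.util.xml.Element
import org.camunda.bpm.engine.spring.SpringProcessEngineConfiguration

class TracingParseListener(
    private val startListener: ExecutionListener,   // creates the Sleuth context
    private val restoreListener: ExecutionListener  // restores it from a process variable
) : AbstractBpmnParseListener() {

    override fun parseStartEvent(element: Element, scope: ScopeImpl, startEventActivity: ActivityImpl) {
        // called for every start event; a real implementation would check that
        // the scope is the top-level process, not an embedded subprocess
        startEventActivity.addListener(ExecutionListener.EVENTNAME_START, startListener)
    }

    override fun parseServiceTask(element: Element, scope: ScopeImpl, activity: ActivityImpl) {
        activity.addListener(ExecutionListener.EVENTNAME_START, restoreListener)
    }
}

// registration on the engine configuration, e.g. in a Spring configuration class:
fun register(configuration: SpringProcessEngineConfiguration, listener: TracingParseListener) {
    if (configuration.customPreBPMNParseListeners == null) {
        configuration.customPreBPMNParseListeners = mutableListOf()
    }
    configuration.customPreBPMNParseListeners.add(listener)
}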



In the first case, we create a Sleuth context:



// use the process business key as the trace id, so that all logs of one
// process instance share a single identifier (B3 propagation headers)
customContext[X_B_3_TRACE_ID] = businessKey
customContext[X_B_3_SPAN_ID] = businessKeyHalf       // span ids are 64-bit, hence half of the key
customContext[X_B_3_PARENT_SPAN_ID] = businessKeyHalf
customContext[X_B_3_SAMPLED] = "0"                   // do not report spans to a collector
val contextFlags: TraceContextOrSamplingFlags = tracing.propagation()
       .extractor(OrcGetter())
       .extract(customContext)
val newSpan: Span = tracing.tracer().nextSpan(contextFlags)
tracing.currentTraceContext().newScope(newSpan.context())






... and save it to a business process variable:



execution.setVariable(TRACING_CONTEXT, sleuthService.tracingContextHeaders)






In the second case, we restore it from this variable:



val storedContext = execution
       .getVariableTyped<ObjectValue>(TRACING_CONTEXT)
       .getValue(HashMap::class.java) as HashMap<String?, String?>
val contextFlags: TraceContextOrSamplingFlags = tracing.propagation()
       .extractor(OrcGetter())
       .extract(storedContext)
val newSpan: Span = tracing.tracer().nextSpan(contextFlags)
tracing.currentTraceContext().newScope(newSpan.context())






We need to trace the logs along with additional parameters, such as activityId (the ID of the current BPMN element), activityName (its business name) and scenarioId (the ID of the business process diagram). This feature only appeared with the release of Sleuth 3.



For each parameter, you need to declare a BaggageField:



companion object {
   val HEADER_BUSINESS_KEY = BaggageField.create("HEADER_BUSINESS_KEY")
   val HEADER_SCENARIO_ID = BaggageField.create("HEADER_SCENARIO_ID")
   val HEADER_ACTIVITY_NAME = BaggageField.create("HEADER_ACTIVITY_NAME")
   val HEADER_ACTIVITY_ID = BaggageField.create("HEADER_ACTIVITY_ID")
}






Then declare three beans to handle these fields:



@Bean
open fun propagateBusinessProcessLocally(): BaggagePropagationCustomizer =
       BaggagePropagationCustomizer { fb ->
           fb.add(SingleBaggageField.local(HEADER_BUSINESS_KEY))
           fb.add(SingleBaggageField.local(HEADER_SCENARIO_ID))
           fb.add(SingleBaggageField.local(HEADER_ACTIVITY_NAME))
           fb.add(SingleBaggageField.local(HEADER_ACTIVITY_ID))
       }

/** [BaggageField.updateValue] now flushes to MDC  */
@Bean
open fun flushBusinessProcessToMDCOnUpdate(): CorrelationScopeCustomizer =
       CorrelationScopeCustomizer { builder ->
           builder.add(SingleCorrelationField.newBuilder(HEADER_BUSINESS_KEY).flushOnUpdate().build())
           builder.add(SingleCorrelationField.newBuilder(HEADER_SCENARIO_ID).flushOnUpdate().build())
           builder.add(SingleCorrelationField.newBuilder(HEADER_ACTIVITY_NAME).flushOnUpdate().build())
           builder.add(SingleCorrelationField.newBuilder(HEADER_ACTIVITY_ID).flushOnUpdate().build())
       }

/** The baggage fields are added as tags only on the first (local root) span. */
@Bean
open fun tagBusinessProcessOncePerProcess(): SpanHandler =
       object : SpanHandler() {
           override fun end(context: TraceContext, span: MutableSpan, cause: Cause): Boolean {
               if (context.isLocalRoot && cause == Cause.FINISHED) {
                   Tags.BAGGAGE_FIELD.tag(HEADER_BUSINESS_KEY, context, span)
                   Tags.BAGGAGE_FIELD.tag(HEADER_SCENARIO_ID, context, span)
                   Tags.BAGGAGE_FIELD.tag(HEADER_ACTIVITY_NAME, context, span)
                   Tags.BAGGAGE_FIELD.tag(HEADER_ACTIVITY_ID, context, span)
               }
               return true
           }
       }






Then we can save additional fields to the Sleuth context:



HEADER_BUSINESS_KEY.updateValue(businessKey)
HEADER_SCENARIO_ID.updateValue(scenarioId)
HEADER_ACTIVITY_NAME.updateValue(activityName)
HEADER_ACTIVITY_ID.updateValue(activityId)
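
These updateValue calls are made from the execution listeners attached during parsing, and the values can be taken straight from the DelegateExecution. A sketch, where BaggageUpdateListener is a hypothetical listener name, processDefinitionId stands in for our scenarioId, and HEADER_* are the BaggageFields declared above:

import org.camunda.bpm.engine.delegate.DelegateExecution
import org.camunda.bpm.engine.delegate.ExecutionListener

// Sketch: refresh the baggage fields on every activity this listener is
// attached to, so each log line carries the current position in the diagram.
class BaggageUpdateListener : ExecutionListener {
    override fun notify(execution: DelegateExecution) {
        HEADER_BUSINESS_KEY.updateValue(execution.processBusinessKey)
        HEADER_SCENARIO_ID.updateValue(execution.processDefinitionId) // our "scenario id"
        HEADER_ACTIVITY_NAME.updateValue(execution.currentActivityName)
        HEADER_ACTIVITY_ID.updateValue(execution.currentActivityId)
    }
}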






When we can see the logs of each business process separately, by its key, incident analysis goes much faster. True, you still have to switch between Kibana and Cockpit; it would be nice to combine them within one UI.



And there is such an opportunity: Cockpit supports custom extensions (plugins), and Elasticsearch has a REST API and two client libraries for working with it: elasticsearch-rest-low-level-client and elasticsearch-rest-high-level-client.



The plugin is a Maven project inheriting from the camunda-release-parent artifact, with a JAX-RS backend and an AngularJS frontend. Yes, AngularJS, not Angular.



Cockpit has detailed documentation on how to write plugins for it.
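
For orientation, the backend entry point of such a plugin might look like the sketch below; the class names and the plugin id are ours, and the plugin is discovered via the standard META-INF/services entry for the CockpitPlugin SPI.

import javax.ws.rs.Path
import org.camunda.bpm.cockpit.plugin.resource.AbstractCockpitPluginRootResource
import org.camunda.bpm.cockpit.plugin.spi.impl.AbstractCockpitPlugin

// Sketch of the plugin's entry point: Cockpit asks it for the plugin id and
// its JAX-RS resources; static assets are served from the plugin's asset directory.
class LogPlugin : AbstractCockpitPlugin() {

    override fun getId(): String = "log-plugin"

    override fun getResourceClasses(): Set<Class<*>> =
        setOf(LogPluginRootResource::class.java)
}

// the root JAX-RS resource of the plugin; concrete sub-resources with the
// actual log queries hang off it
@Path("plugin/log-plugin")
class LogPluginRootResource : AbstractCockpitPluginRootResource("log-plugin")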



I will only clarify that, to display the logs on the frontend, we are interested in the tab panels on the Process Definition page (cockpit.processDefinition.runtime.tab) and the Process Instance page (cockpit.processInstance.runtime.tab). We register our components for them:



ViewsProvider.registerDefaultView('cockpit.processDefinition.runtime.tab', {
   id: 'process-definition-runtime-tab-log',
   priority: 20,
   label: 'Logs',
   url: 'plugin://log-plugin/static/app/components/process-definition/processDefinitionTabView.html'
});

ViewsProvider.registerDefaultView('cockpit.processInstance.runtime.tab', {
   id: 'process-instance-runtime-tab-log',
   priority: 20,
   label: 'Logs',
   url: 'plugin://log-plugin/static/app/components/process-instance/processInstanceTabView.html'
});






Cockpit has a UI component for displaying information in tabular form; however, it is not mentioned anywhere in the documentation, and information about it and its usage can only be found by reading the Cockpit source code. In short, using the component looks like this:



<div cam-searchable-area (1)
    config="searchConfig" (2)
    on-search-change="onSearchChange(query, pages)" (3)
     loading-state="'Loading...'" (4)
     text-empty="Not found" (5)
    storage-group="'ANU'"
    blocked="blocked">
   <div class="col-lg-12 col-md-12 col-sm-12">
       <table class="table table-hover cam-table">
           <thead cam-sortable-table-header (6)
                  default-sort-by="time"
                  default-sort-order="asc" (7)
                  sorting-id="admin-sorting-logs"
                  on-sort-change="onSortChanged(sorting)"
                  on-sort-initialized="onSortInitialized(sorting)" (8)>
           <tr>
               <!-- headers -->
           </tr>
           </thead>
           <tbody>
           <!-- table content -->
           </tbody>
       </table>
   </div>
</div>






  1. Attribute to declare the search component.
  2. Component configuration. Here we have the following structure:



    tooltips = { // texts displayed by the search component,
                 // e.g. hints and validation messages
       'inputPlaceholder': 'Add criteria',
       'invalid': 'This search query is not valid',
       'deleteSearch': 'Remove search',
       'type': 'Type',
       'name': 'Property',
       'operator': 'Operator',
       'value': 'Value'
    },
    operators = { // operators available for each data type
         'string': [
           {'key': 'eq',  'value': '='},
           {'key': 'like','value': 'like'}
       ]
    },
    types = [ // fields by which the search is performed, in our case businessKey
       {
           'id': {
               'key': 'businessKey',
               'value': 'Business Key'
           },
           'operators': [
               {'key': 'eq', 'value': '='}
           ],
           enforceString: true
       }
    ]
    





  3. The data search function; it is called both when the search parameters change and on the initial load.
  4. The message to display while data is loading.
  5. The message to display if nothing was found.
  6. Attribute to declare the sortable table header.
  7. The default sort field and order.
  8. Sorting callbacks.


On the backend, you need to configure a client for the Elasticsearch API. To do this, it is enough to use the RestHighLevelClient from the elasticsearch-rest-high-level-client library. You specify the host, the authentication data (login and password) and, if TLS is used, the appropriate implementation of X509TrustManager.
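
A minimal client setup might look like the sketch below; the host, port, scheme and credentials are placeholders for your environment.

import org.apache.http.HttpHost
import org.apache.http.auth.AuthScope
import org.apache.http.auth.UsernamePasswordCredentials
import org.apache.http.impl.client.BasicCredentialsProvider
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient

// Sketch: high-level REST client with basic authentication.
fun buildClient(): RestHighLevelClient {
    val credentialsProvider = BasicCredentialsProvider().apply {
        setCredentials(AuthScope.ANY, UsernamePasswordCredentials("login", "password"))
    }
    val builder = RestClient.builder(HttpHost("elasticsearch.local", 9200, "https"))
        .setHttpClientConfigCallback { httpClientBuilder ->
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
            // for a self-signed certificate, an SSLContext with a custom
            // X509TrustManager would be configured here as well
        }
    return RestHighLevelClient(builder)
}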



To form the search query, we use QueryBuilders.boolQuery(), which allows composing complex queries like this:



val boolQueryBuilder = QueryBuilders.boolQuery()

KibanaConfiguration.ADDITIONAL_QUERY_PARAMS.forEach { (key, value) ->
    boolQueryBuilder.filter()
            .add(QueryBuilders.matchPhraseQuery(key, value))
}
if (!StringUtils.isEmpty(businessKey)) {
    boolQueryBuilder.filter()
            .add(QueryBuilders.matchPhraseQuery(KibanaConfiguration.BUSINESS_KEY, businessKey))
}
if (!StringUtils.isEmpty(procDefKey)) {
    boolQueryBuilder.filter()
            .add(QueryBuilders.matchPhraseQuery(KibanaConfiguration.SCENARIO_ID, procDefKey))
}
if (!StringUtils.isEmpty(activityId)) {
    boolQueryBuilder.filter()
            .add(QueryBuilders.matchPhraseQuery(KibanaConfiguration.ACTIVITY_ID, activityId))
}
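
The assembled query is then executed through the same client. A sketch, where the index pattern "app-logs-*" and the paging parameters are placeholders:

import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.index.query.BoolQueryBuilder
import org.elasticsearch.search.builder.SearchSourceBuilder

// Sketch: run the assembled bool query against the log index and return
// the raw documents for rendering in the plugin's table.
fun searchLogs(
    client: RestHighLevelClient,
    boolQueryBuilder: BoolQueryBuilder,
    offset: Int,      // paging parameters supplied by the frontend
    pageSize: Int
): List<Map<String, Any>> {
    val request = SearchRequest("app-logs-*")
        .source(
            SearchSourceBuilder()
                .query(boolQueryBuilder)
                .from(offset)
                .size(pageSize)
        )
    val response = client.search(request, RequestOptions.DEFAULT)
    return response.hits.hits.map { it.sourceAsMap }
}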






Now, right from Cockpit, we can view the logs separately for each process and for each activity. It looks like this:





Tab for viewing logs in the Cockpit interface.



But we cannot stop there; there are ideas for developing the project further. First, expanding the search capabilities: often at the start of incident analysis there is no process business key at hand, but there is information about other key parameters, and it would be nice to be able to configure searching by them as well. Also, the table displaying the logs is not interactive: there is no way to jump to the corresponding Process Instance by clicking a row. In short, there is room for development. (As soon as the weekend is over, I will post a link to the project's GitHub and invite everyone interested there.)


