There are a few issues around Mulesoft connectors, especially intercepting connection events: disconnects and connecting failures and, while Mule will log them in the default log, it is important that we can handle them too, to log them (say in an ELK) or alarm or take some other corrective action.
One of the issues is that connections may fail during startup (if the 3rd party is not up) and this will prevent the flows from being initialized properly. This is a particularly annoying issue because an obscure connection to a non-important system (like the ELK firehose itself) can hold up the entire application from starting up, which warrants the effort to alarm the connection failure, so you know your app is not even up.
Also, because of this, you cannot invoke any of the flows to do any processing when this happens, so if you had some alarming/logging flows, they're out of reach at this point, so we're left with having to do this in Java code. Here's how:
The configuration requires:
Here they are:
<spring:beans>
<spring:bean id="connectionElkNotifier" scope="singleton" init-method="init" class="com.razie.notify.ElkConnectionNotificationLogger">
<spring:property name="shouldFire" value="${audit.should.fire}" />
</spring:bean>
</spring:beans>
<!-- notification listeners from different connections -->
<notifications>
<notification-listener ref="connectionElkNotifier"/>
</notifications>
And finally, note the empty <reconnect-notifier/>
tag on the connector itself:
<wmq:connector name="WMQ.CoolService" hostName="${wmq.hostname}"
port="${wmq.port}" queueManager="${wmq.queueManager}" channel="${wmq.channel}"
username="${wmq.userid}" password="${wmq.password}"
transportType="CLIENT_MQ_TCPIP" specification="1.1"
disableTemporaryReplyToDestinations="true"
targetClient="JMS_COMPLIANT" validateConnections="true" doc:name="WMQ">
<reconnect-forever frequency="2000">
<!-- notifier needs to be here, to elk re-connections -->
<reconnect-notifier/>
</reconnect-forever>
</wmq:connector>
The only missing piece is now the notifier itself, here's a brief example:
public class ElkConnectionNotificationLogger implements
ConnectionNotificationListener<ConnectionNotification>,
MuleContextAware {
final static Logger logger = Logger.getLogger(ElkConnectionNotificationLogger.class);
/** injected by config */
String shouldFire = "true";
public void setShouldFire(String s) {
shouldFire = s;
}
/** injected by spring */
MuleContext muleContext;
@Override
public void setMuleContext(MuleContext context) {
muleContext = context;
}
public void init() {
}
/**
* (resource, alreadyFailed) - used to avoid logging each in a long series
* of failed reconnect attempts
*/
Map<String, Boolean> state = new HashMap<String, Boolean>();
private synchronized void stateRemove (String key) {
state.remove(key);
}
private synchronized void stateSet (String key, Boolean value) {
state.put(key, value);
}
@Override
public void onNotification(ConnectionNotification cn) {
try {
logger.debug("CONNECTION notification: " + cn.getResourceIdentifier() + " : " + cn.toString());
HashMap<String, String> props = new HashMap<String, String>();
props.put("message", cn.getActionName());
props.put("code", cn.getAction());
String key = cn.getResourceIdentifier();
if(cn.getAction() == ConnectionNotification.CONNECTION_CONNECTED) {
logElkEvent(props);
stateRemove (key);
} else if(cn.getAction() == ConnectionNotification.CONNECTION_FAILED) {
// elk only if it had not been failing already
if (!state.getOrDefault(key, false))
logElkEvent(props);
else
logger.info("NOT Sending to elk: " + cn.toString());
stateSet(key, true);
} else if(cn.getAction() == ConnectionNotification.CONNECTION_DISCONNECTED) {
logElkEvent(props);
} else {
logger.error("CONNECTION notification UNKNOWN action: " + cn.toString());
logElkEvent(props);
}
} catch (Exception e) {
logger.error("CONNECTION notification: Exception during notification: " + e);
e.printStackTrace();
}
}
/** replace props in json and send to elk */
private void logElkEvent(HashMap<String, String> props) {
// ...
}
}
Here are some documentation links: