@Source
public void subscribeTopic(String topic, final SourceCallback callback) {
    getBayeuxClient().subscribe(topic, new ClientSessionChannel.MessageListener() {
        @Override
        public void onMessage(ClientSessionChannel channel, Message message) {
            try {
                // Hand the data of each incoming message to the next message processor in the flow.
                callback.process(message.getData());
            } catch (Exception e) {
                // Log the failure so that the listener keeps receiving later messages.
                LOGGER.error(e);
            }
        }
    });
}
Creating Message Sources
In some cases it is necessary to create Message Sources instead of Message Processors. Basically, a Message Source receives or generates new messages to be processed by Mule.
One of the use cases for Message Sources is implementing Streaming APIs. The @Source annotation marks a method inside a @Module or @Connector annotated class as callable from a Mule flow and capable of generating Mule events. A Message Source is generated for each marked method. The method must receive a SourceCallback as one of its arguments; this callback represents the next message processor in the chain. The position of this parameter does not matter, as long as it is present in the method signature.
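The callback contract described above can be sketched in plain Java without Mule on the classpath. This is only an illustration under stated assumptions: Callback is a hypothetical stand-in for SourceCallback, and pollNumbers is a hypothetical method playing the role of a @Source method. Neither name comes from DevKit. The sketch shows a source pushing each new payload to the callback and carrying on when one payload fails to process.

```java
import java.util.ArrayList;
import java.util.List;

class SourceSketch {

    // Hypothetical stand-in for SourceCallback, declared here only so the
    // sketch compiles on its own. It models "the next message processor".
    interface Callback {
        Object process(Object payload) throws Exception;
    }

    // Plays the role of a @Source method: it generates messages and hands
    // each one to the callback. The callback could appear at any position
    // in the parameter list.
    static void pollNumbers(int count, Callback callback) {
        for (int i = 1; i <= count; i++) {
            try {
                callback.process(i);
            } catch (Exception e) {
                // A failure on one message should not stop the source.
                System.err.println("processing failed: " + e);
            }
        }
    }

    public static void main(String[] args) {
        List<Object> received = new ArrayList<>();
        // The lambda stands in for the rest of the flow (for example, a logger).
        pollNumbers(3, payload -> {
            received.add(payload);
            return payload;
        });
        System.out.println(received); // prints [1, 2, 3]
    }
}
```

In a real connector the loop would be replaced by whatever delivers events (a streaming client listener, a poller, and so on), and Mule would supply the callback.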
Example
The Salesforce Connector supports the Salesforce Streaming API, which lets users subscribe to topics and receive a notification whenever an event related to a topic occurs.
The subscribeTopic method shown above can be invoked from a Mule flow as follows:
<flow name="myFlow">
    <sfdc:subscribe-topic topic="/someTopic"/>
    <logger level="INFO" message="#[payload]"/>
    ...
</flow>
The flow subscribes to the topic named in the topic attribute. When an update is received, the source invokes the next message processor in the chain, which in this case is a logger.