Whenever I encounter a situation where I have to mix a blend of different services and endpoints and apply one or more of the traditional enterprise integration patterns then Apache Camel often is my weapon of choice.
I simply love how easy it is to set up some datasources, add some routing magic, data transformers, load balancers, content enrichers and enjoy the result.
Another thing that I’m beginning to love is Scala and so this is the perfect time to write an article about using Scala and Apache Camel together.
In the following tutorial we’re setting up our environment using SBT and Scala we’ll take a look at several interesting use cases for camel.
Dependencies / Project Setup
I’m using SBT to build the project here. You just need to add some dependencies here to your build.sbt.
name := "camel-scala-tutorial"
organization := "com.hascode.tutorial"
version := "1.0.0"
libraryDependencies ++= Seq(
"org.apache.camel" % "camel-scala" % "2.10.1",
"org.apache.activemq" % "activemq-core" % "5.6.0",
"org.apache.camel" % "camel-jms" % "2.10.1",
"ch.qos.logback" % "logback-core" % "1.0.9",
"ch.qos.logback" % "logback-classic" % "1.0.9",
"javax.mail" % "mail" % "1.4.5",
"com.icegreen" % "greenmail" % "1.3",
"org.apache.camel" % "camel-mail" % "2.10.3",
"org.apache.camel" % "camel-velocity" % "2.10.3"
)
A short explanation for the dependencies declared above:
-
camel-scala: Adds the Apache Camel API, this is the minimum requirement for us here..
-
activemq-core and camel-jsm: The first one allows us to start an embedded instance of an ActiveMQ server and the second one allows us to use JMS in our routes
-
logback-core/logback-classic: Just some logging API, you may use your favourite logging framework here ..
-
greenmail: Allows us to create an embedded mail server that may handle all kinds of e-mail transfer. Some more detailed information can be found in my article “https://www.hascode.com/2012/07/integration-testing-imap-smtp-and-pop3-with-greenmail/[Integration Testing IMAP, SMTP and POP3 with GreenMail]“.
-
camel-mail: Adds support for handling e-mail messages in a route.
-
camel-velocity: As you might have guessed is this the library to add velocity template support
Now that we’ve got every library we need let’s start to create some routes and play around with camel.
Camel Scala Enhancements
First of all, camel-scala offers a custom implementation of the route builder, org.apache.camel.scala.builder.RouteBuilder that offers us three features compared with the Java DSL:
-
there is no configure() method to override
-
a route starts directly with a URI instead of from(uri)
-
→ is just an alias for to
In addition there’s a trait that you may use, org.apache.camel.scala.dsl.builder.RouteBuilderSupport that adds an implicit conversion from the scala to the java builder:
implicit def scalaToJavaBuilder(scalaBuilder: RouteBuilder) = scalaBuilder.builder
The good thing is: You won’t encounter any problem using scala because the DSLs are 100% compatible. In addition there are open source projects that take this goal further and provide additional syntactic sugar when using scala and camel together.
One example is Camel Extras for Scala that adds some goodies like additional enhancements for the route builder or additional type converters for different scala types and collections.
If you’re interested, please feel free to take a look at the Github project at: https://github.com/osinka/camel-scala-extra
Now that we’ve covered some basics it is time to do some programming here…
Simple File Endpoint
The first example is an easy one .. we’ve got two directories: data/inbox and data/outbox. Everything we’re putting into data/inbox is fetched by camel and put into data/outbox.
package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
object SimpleFileEndpointExample extends App {
val context: CamelContext = new DefaultCamelContext
val routeBuilder = new RouteBuilder {
from("file:data/inbox") --> ("file:data/outbox")
}
context.addRoutes(routeBuilder)
context.start
while (true) {}
}
Running the example above yields the following output when we put a file into the directory data/inbox
$ date >> data/inbox/test.txt
16:41:15.156 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Took 0.000 seconds to poll: data/inbox
16:41:15.159 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Total 1 files to consume
16:41:15.160 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[test.txt] using exchange: Exchange[test.txt]
16:41:15.166 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[test.txt]
16:41:15.169 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileOperations - Using FileChannel to write file: data/outbox/test.txt
16:41:15.174 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/test.txt] to [Endpoint[file://data/outbox]]
16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.file.GenericFileOnCompletion - Done processing file: GenericFile[test.txt] using exchange: Exchange[test.txt]
16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.txt.camelLock
16:41:15.176 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.txt.camelLock with result: true
16:41:15.177 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.f.s.GenericFileRenameProcessStrategy - Renaming file: GenericFile[test.txt] to: GenericFile[.camel/test.txt]
16:41:15.178 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to rename file: /data/project/camel-scala-tutorial/data/inbox/test.txt to: data/inbox/.camel/test.txt with result: true
JMS / ActiveMQ Example
In the next example we’re going to do some JMS messaging using an embedded instance of ActiveMQ.
The following route scans the directory data/inbox for incoming files.
If there is a file it is put in a specific queue, filtered by the file extension. In addition we’re fetching the messages from alle JMS queues and print the processed file’s name.
package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.camel.Processor
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
import org.apache.camel.Exchange
object RoutingAfterCBRExample extends App with RouteBuilderSupport {
val context: CamelContext = new DefaultCamelContext
val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val routeBuilder = new RouteBuilder {
from("file:data/inbox")
.choice {
when(fileEndsWith(_, "xml")).to("jms:xmlOrders")
when(fileEndsWith(_, "csv")).to("jms:csvOrders")
otherwise().to("jms:unknownOrders")
}
from("jms:xmlOrders")
.process(exchange => println("XML type order received: " + exchange.getIn().getHeader("CamelFileName")))
from("jms:csvOrders")
.process(exchange => println("CSV type order received: " + exchange.getIn().getHeader("CamelFileName")))
from("jms:unknownOrders")
.process(exchange => println("Unknown type order received: " + exchange.getIn().getHeader("CamelFileName")))
}
context.addRoutes(routeBuilder)
context.start
while (true) {}
def fileEndsWith(ex: Exchange, fileExt: String): Boolean = {
ex.getIn().getHeader("CamelFileName") match {
case x: String => return x.endsWith(fileExt)
case _ => false
}
}
}
Running the application above and adding the following files to the data/inbox directory should give you a similar output (shortened)
$ date >> data/inbox/order1.csv && date >> data/inbox/someorder.xml && date >> data/inbox/strangeorder.foo && date >> data/inbox/bang
[..]
CSV type order received: order1.csv
[..]
Unknown type order received: strangeorder.foo
[..]
Unknown type order received: bang
[..]
XML type order received: someorder.xml
Resequencer Example
The resequencer allows us to bring our messages back in a predefined order .. e.g. you might want to resort incoming messages using the JMSPriority header or something like that.
In our example, we have two queues .. wip and wip-inorder. What we’re doing now is to put some messages with a priority header into the wip queue and the resequencer orders messages from this queue and puts the messages in the correct order into the wip-inorder queue.
Some more detailed information can be found at the Camel website: Resequencer.
package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.camel.Processor
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
object ResequencerExample extends App with RouteBuilderSupport {
val context: CamelContext = new DefaultCamelContext
val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val routeBuilder = new RouteBuilder {
from("jms:wip").resequence(header("custom-priority")).batch().timeout(10000).reverse().to("jms:wip-inorder")
from("jms:wip-inorder")
.process(exchange => println("Message received with priority: " + exchange.getIn().getHeader("custom-priority")))
}
context.addRoutes(routeBuilder)
context.start
Thread.sleep(2000)
println("sending some messages..")
val tpl = context.createProducerTemplate
tpl.setDefaultEndpointUri("jms:wip")
tpl.sendBodyAndHeader("foo1", "custom-priority", 1)
tpl.sendBodyAndHeader("foo2", "custom-priority", 5)
tpl.sendBodyAndHeader("foo3", "custom-priority", 6)
tpl.sendBodyAndHeader("foo4", "custom-priority", 3)
tpl.sendBodyAndHeader("foo5", "custom-priority", 4)
tpl.sendBodyAndHeader("foo6", "custom-priority", 2)
while (true) {}
}
Running the application above should give a similar output. Now the messages are ordered by their header “custom-priority”:
[..]
16:10:02.253 [main] DEBUG o.a.camel.impl.DefaultCamelContext - Adding routes from builder: com.hascode.tutorial.ResequencerExample$$anon$1@7776cad3
16:10:02.452 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[jms://wip], path=[wip], parameters=[{}]
16:10:02.457 [main] DEBUG o.a.camel.impl.DefaultCamelContext - jms://wip converted to endpoint: Endpoint[jms://wip] by component: org.apache.camel.component.jms.JmsComponent@6b552b76
16:10:02.530 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[jms://wip-inorder], path=[wip-inorder], parameters=[{}]
16:10:02.530 [main] DEBUG o.a.camel.impl.DefaultCamelContext - jms://wip-inorder converted to endpoint: Endpoint[jms://wip-inorder] by component: org.apache.camel.component.jms.JmsComponent@6b552b76
16:10:02.555 [main] DEBUG o.a.c.m.DefaultManagementAgent - Registered MBean with ObjectName: org.apache.camel:context=styx/camel-1,type=endpoints,name="jms://wip-inorder"
16:10:02.557 [main] DEBUG o.a.c.p.interceptor.DefaultChannel - Initialize channel for target: 'To[jms:wip-inorder]'
16:10:02.589 [main] DEBUG o.a.c.p.interceptor.DefaultChannel - Initialize channel for target: 'Resequencer[{org.apache.camel.scala.ScalaExpression@6da0d866} -> [To[jms:wip-inorder]]]'
[..]
16:10:02.628 [main] DEBUG o.a.camel.component.jms.JmsProducer - Starting producer: Producer[jms://wip-inorder]
[..]
16:10:05.394 [main] DEBUG o.a.c.component.jms.JmsConfiguration - Sending JMS message to: queue://wip with message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {custom_HYPHEN_priority=1, breadcrumbId=ID-styx-41187-1360509002058-0-1}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = foo1}
[..]
16:10:05.494 [Camel (camel-1) thread #1 - JmsConsumer[wip]] DEBUG o.a.camel.processor.BatchProcessor - Received exchange to be batched: Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:8:1:1:1]]
16:10:15.495 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.camel.processor.BatchProcessor - Sending aggregated exchange: Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:5:1:1:1]]
16:10:15.496 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[jms://wip-inorder] Exchange[JmsMessage[JmsMessageID: ID:styx-33805-1360509003088-3:5:1:1:1]]
[..]
16:10:15.499 [ActiveMQ Task-1] DEBUG o.a.a.broker.region.AbstractRegion - localhost adding consumer: ID:styx-33805-1360509003088-3:9:-1:1 for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
16:10:15.502 [Camel (camel-1) thread #0 - Batch Sender] DEBUG o.a.c.component.jms.JmsConfiguration - Sending JMS message to: queue://wip-inorder with message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {custom_HYPHEN_priority=6, breadcrumbId=ID-styx-41187-1360509002058-0-7, CamelJmsDeliveryMode=2}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = foo3}
Message received with priority: 6
[..]
Message received with priority: 5
[..]
Message received with priority: 4
[..]
Message received with priority: 3
[..]
Message received with priority: 2
[..]
Message received with priority: 1
Load Balancer
Camel offers you an easy way for load balancing using different strategies. We’re using an easy round-robin strategy here without any special configuration.
There’s a lot of useful information on possible configuration and load balancing strategies on the Camel documentation.
package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.camel.Processor
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
object LoadBalancerExample extends App with RouteBuilderSupport {
val context: CamelContext = new DefaultCamelContext
val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val routeBuilder = new RouteBuilder {
from("jms:incoming").loadBalance().
roundRobin().to("jms:worker-queue-1", "jms:worker-queue-2", "jms:worker-queue-3");
from("jms:worker-queue-1")
.process(exchange => println("Handler 1 recevied a message with number flag: " + exchange.getIn().getHeader("my-number")))
from("jms:worker-queue-2")
.process(exchange => println("Handler 2 recevied a message with number flag: " + exchange.getIn().getHeader("my-number")))
from("jms:worker-queue-3")
.process(exchange => println("Handler 3 recevied a message with number flag: " + exchange.getIn().getHeader("my-number")))
}
context.addRoutes(routeBuilder)
context.start
Thread.sleep(2000)
val tpl = context.createProducerTemplate
tpl.setDefaultEndpointUri("jms:incoming")
tpl.sendBodyAndHeader("foo1", "my-number", 1)
tpl.sendBodyAndHeader("foo2", "my-number", 2)
tpl.sendBodyAndHeader("foo3", "my-number", 3)
tpl.sendBodyAndHeader("foo4", "my-number", 4)
tpl.sendBodyAndHeader("foo5", "my-number", 5)
tpl.sendBodyAndHeader("foo6", "my-number", 6)
while (true) {}
}
Running the example above could produce a similar output – but it is not guaranteed..
Handler 1 recevied a message with number flag: 1
Handler 2 recevied a message with number flag: 2
Handler 3 recevied a message with number flag: 3
Handler 1 recevied a message with number flag: 4
Handler 2 recevied a message with number flag: 5
Handler 3 recevied a message with number flag: 6
IMAP Component
In the next example we’re setting up an email server using the greenmail library – if you’re interested, please feel free to take a look at my tutorial Integration Testing IMAP, SMTP and POP3 with GreenMail.
Afterwards we’re setting up a camel route that fetches e-mails from this IMAP server and stores them in the directory data/outbox.
package com.hascode.tutorial
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
import org.apache.camel.CamelContext
import com.icegreen.greenmail.user.GreenMailUser
import com.icegreen.greenmail.util.GreenMail
import com.icegreen.greenmail.util.ServerSetupTest
import javax.mail.internet.MimeMessage
import javax.mail.Session
import javax.mail.internet.InternetAddress
import javax.mail.Message
object ImapExample extends App {
val mailServer = new GreenMail(ServerSetupTest.IMAP)
mailServer.start
val user = mailServer.setUser("test@hascode.com", "joe", "XXXX")
val context: CamelContext = new DefaultCamelContext
val routeBuilder = new RouteBuilder {
from("imap://joe@0.0.0.0:3143?password=XXXX") --> ("file:data/outbox")
}
context.addRoutes(routeBuilder)
context.start
Thread.sleep(3000)
for (i <- 0 until 3) {
var message = new MimeMessage(null: Session)
message.setFrom(new InternetAddress("test@hascode.com"))
message.addRecipient(Message.RecipientType.TO, new InternetAddress(
"foo@hascode.com"))
message.setSubject("Test E-Mail #" + i)
message.setText("This is a fine test e-mail. It is the " + i + " message of 3.")
println("sending new email: #" + i)
user.deliver(message)
Thread.sleep(2000)
}
while (true) {}
}
Running the application above produces a similar output
22:01:42.794 [main] DEBUG o.apache.camel.impl.DefaultComponent - Creating endpoint uri=[imap://joe@0.0.0.0:3143?password=******], path=[joe@0.0.0.0:3143], parameters=[{password=XXXX}]
sending new email: #0
sending new email: #1
sending new email: #2
22:02:44.214 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Polling mailbox folder: imap://0.0.0.0:3143, folder=INBOX
22:02:44.298 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Fetching 3 messages. Total 3 messages.
22:02:44.480 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[1], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #0], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]
22:02:44.648 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[1], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #0], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]]
22:02:44.654 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-1
22:02:44.660 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-1] to [Endpoint[file://data/outbox]]
22:02:44.743 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[2], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #1], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]
22:02:44.904 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[2], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #1], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]]
22:02:44.904 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-3
22:02:44.908 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-3] to [Endpoint[file://data/outbox]]
22:02:44.987 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.mail.MailConsumer - Processing message: messageNumber=[3], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #2], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]
22:02:45.148 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[file://data/outbox] Exchange[MailMessage: messageNumber=[3], from=[test@hascode.com], to=[foo@hascode.com], subject=[Test E-Mail #2], sentDate=[Feb 12, 2013 10:02:44 PM], receivedDate=[Feb 12, 2013 10:02:44 AM]]
22:02:45.148 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.component.file.FileOperations - Using InputStream to write file: data/outbox/ID-styx-40497-1360702902385-0-5
22:02:45.152 [Camel (camel-1) thread #0 - imap://joe@0.0.0.0:3143] DEBUG o.a.c.c.file.GenericFileProducer - Wrote [data/outbox/ID-styx-40497-1360702902385-0-5] to [Endpoint[file://data/outbox]]
If you take a look now in the directory data/outbox you can see the messages .. each file contains the e-mail body:
$ tree data/outbox/
data/outbox/
├── ID-localhost-40497-1360702902385-0-1
├── ID-localhost-40497-1360702902385-0-3
└── ID-localhost-40497-1360702902385-0-5
Velocity Templates
In the following simple example we’re just scanning the directory data/inbox for incoming files. When a file is added to this directory, a velocity template is applied to create a new file.
First of all this is our velocity template in com/hascode/tutorial/ named transform.vm
This is the transformed file
Let's iterate over the available headers:
#foreach ($key in $headers.keySet())
- key: $key has value: $headers.get($key)
#end
The body:
${body}
This is the application and the route binding.
package com.hascode.tutorial
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
object VelocityExample extends App {
val context: CamelContext = new DefaultCamelContext
val routeBuilder = new RouteBuilder {
from("file:data/inbox") --> ("velocity:com/hascode/tutorial/transform.vm") --> ("file:data/outbox")
}
context.addRoutes(routeBuilder)
context.start
while (true) {}
}
Now add some new file to data/inbox and run the application you should be able to spot a new file in data/outbox that could like the following one:
echo "This is just a test" > data/inbox/foo.txt
data/outbox/foo.txt:
This is the transformed file
Let's iterate over the available headers:
- key: camelfilenameonly has value: foo.txt
- key: camelfileparent has value: data/inbox
- key: camelfilename has value: foo.txt
- key: breadcrumbid has value: ID-styx-59735-1360704546890-0-1
- key: camelfileabsolute has value: false
- key: camelfilelength has value: 20
- key: camelfilerelativepath has value: foo.txt
- key: camelfilepath has value: data/inbox/foo.txt
- key: camelfilelastmodified has value: Tue Feb 12 22:29:11 CET 2013
- key: camelfileabsolutepath has value: /data/project/camel-scala-tutorial/data/inbox/foo.txt
The body:
GenericFile[foo.txt]
Bean Binding / Annotated Beans
Camel allows us to use POJOs to specify and endpoint or handle further processing instructions.
This is our recipient bean .. depending on the file name extension it returns two possible JMS queues used for further processing.
package com.hascode.tutorial
import org.apache.camel.RecipientList
import org.apache.camel.Header
class RecipientsBean {
@RecipientList def getRecipients(@Header("CamelFileName") fileName: String): Array[String] = {
println("detecting recipients for given filename: " + fileName)
if (fileName.endsWith("csv")) {
return Array("jms:csvQueue")
}
return Array("jms:defaultQueue")
}
}
And this is our application that scans the directory data/inbox for files and delegates the flow to the recipient bean.
In addition we’re scanning both JMS queues for new messages and we’re printing the file names received from their messages.
package com.hascode.tutorial
import org.apache.camel.CamelContext
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.RouteBuilder
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.apache.activemq.ActiveMQConnectionFactory
object AnnotatedBeanExample extends App {
val context: CamelContext = new DefaultCamelContext
val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory("vm://localhost")
context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
val routeBuilder = new RouteBuilder {
from("file:data/inbox").bean(classOf[RecipientsBean])
from("jms:csvQueue").process(exchange => println("csv queue: file received: " + exchange.getIn().getHeader("CamelFileName")))
from("jms:defaultQueue").process(exchange => println("default queue: file received: " + exchange.getIn().getHeader("CamelFileName")))
}
context.addRoutes(routeBuilder)
context.start
while (true) {}
}
When we’re now adding some files to the directory ..
$ date > data/inbox/test.csv && date > /data/inbox/foo.txt
-
and run the application above we should get a similar output (shortened)
21:04:06.195 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Total 1 files to consume
21:04:06.196 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[test.csv] using exchange: Exchange[test.csv]
detecting recipients for given filename: test.csv
21:04:06.222 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.camel.component.jms.JmsProducer - Starting producer: Producer[jms://csvQueue]
[..]
csv queue: file received: test.csv
21:04:06.264 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.csv.camelLock
21:04:06.264 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: /data/project/camel-scala-tutorial/data/inbox/test.csv.camelLock with result: true
21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Retrying attempt 0 to delete file: data/inbox/.camel/test.csv
21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to delete file: data/inbox/.camel/test.csv with result: true
21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.c.f.s.GenericFileRenameProcessStrategy - Renaming file: GenericFile[test.csv] to: GenericFile[.camel/test.csv]
21:04:06.265 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG org.apache.camel.util.FileUtil - Tried 1 to rename file: /data/project/camel-scala-tutorial/data/inbox/test.csv to: data/inbox/.camel/test.csv with result: true
21:04:06.741 [ActiveMQ Journal Checkpoint Worker] DEBUG o.a.a.store.kahadb.MessageDatabase - Checkpoint started.
21:04:06.753 [ActiveMQ Journal Checkpoint Worker] DEBUG o.a.a.store.kahadb.MessageDatabase - Checkpoint done.
21:04:06.766 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - Took 0.000 seconds to poll: data/inbox
[..]
21:16:37.146 [Camel (camel-1) thread #0 - file://data/inbox] DEBUG o.a.c.component.file.FileConsumer - About to process file: GenericFile[foo.txt] using exchange: Exchange[foo.txt]
[..]
detecting recipients for given filename: foo.txt
[..]
default queue: file received: test.csv
[..]
Tutorial Sources
Please feel free to download the tutorial sources from my GitHub repository, fork it there or clone it using Git:
git clone https://github.com/hascode/camel-scala-dsl-tutorial.git