Some time ago I did some interesting work with Akka. These days I don't code every day; most of what I do is in my own time, and I wanted to play with the latest Akka (1.1) and see if it was still as much fun as it used to be.
I wanted something simple: connect to Twitter, read the stream, chop it up message by message and pass each message to a JSON-parsing actor, which in turn would pass it on to further actors for more processing. The concept can be extended in many ways.
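To make the "chop it up message by message" idea concrete without Akka, here is a minimal sketch using only the Scala standard library. It is not the project's code: the names `PipelineSketch`, `messages` and `run` are hypothetical, plain functions stand in for the actors, and it assumes messages are newline-delimited with any blank lines to be skipped.

```scala
import java.io.{BufferedReader, StringReader}

object PipelineSketch {
  // Reads the stream line by line until readLine returns null (end of stream),
  // skipping blank lines.
  def messages(reader: BufferedReader): Iterator[String] =
    Iterator.continually(reader.readLine()).takeWhile(_ != null).filter(_.nonEmpty)

  // Passes each message through every stage in order. In the post each stage
  // is an actor; here each stage is just a function.
  def run(reader: BufferedReader, stages: List[String => String]): List[String] =
    messages(reader).map(msg => stages.foldLeft(msg)((acc, stage) => stage(acc))).toList
}
```

For example, `PipelineSketch.run(new BufferedReader(new StringReader("first\r\n\r\nsecond\r\n")), List[String => String](_.trim, s => "parsed(" + s + ")"))` returns `List("parsed(first)", "parsed(second)")`. Actors add what this sketch lacks: each stage runs concurrently and receives messages asynchronously.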
The codebase as it stands is quite small, which is really thanks to standing on the shoulders of giants:
Scala 2.9.0
Akka 1.1
SJSON
Signpost OAuth library
Apache HTTP Client 4.1
ScalaTest
Mockito
SBT
A few comments about the code...
Firstly, it's not production code; it is an exploration of various things. I will outline what I think are the most interesting parts of the code. See the bottom of this post for where to find the code.
Using Either and Option
One of the things that has always created confusion is the use of exceptions for things that are not really exceptional cases. Too often we see exceptions used for alternate path selection, often the less travelled path but still a valid path.
If you expect failure, which in a networked environment you should, then you should not be using exceptions: it's not an exceptional case, it's an expected one. But how do you represent this in code? Thankfully Scala gives you Either. A method can return Either to indicate that it will return either an error (by convention the Left) or a result (the Right).
import java.io.BufferedReader

trait Endpoint {
  def uri: String
  // Left(error) if the connection fails, Right(reader) over the open stream if it succeeds
  def connect: Either[Error, BufferedReader]
  def connectionTimeout = 60000
  def soTimeout = 1000
}
In the above we have defined a trait with a connect method that can return either an Error if something goes wrong, or a BufferedReader if all goes well.
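One benefit of this signature is that tests and experiments can swap in an endpoint that never touches the network. The sketch below is illustrative only: it re-declares the trait so it compiles on its own, assumes `Error` is `java.lang.Error` (the test later in this post constructs `new Error("Connection Failed")`), and the classes `CannedEndpoint`, `FailingEndpoint` and the helper `firstLine` are hypothetical.

```scala
import java.io.{BufferedReader, StringReader}

// Re-declared from the post so this sketch is self-contained.
trait Endpoint {
  def uri: String
  def connect: Either[Error, BufferedReader]
  def connectionTimeout = 60000
  def soTimeout = 1000
}

// Hypothetical endpoint that "connects" by wrapping a fixed string.
class CannedEndpoint(body: String) extends Endpoint {
  val uri = "memory://canned"
  def connect: Either[Error, BufferedReader] =
    Right(new BufferedReader(new StringReader(body)))
}

// Hypothetical endpoint whose connection always fails.
object FailingEndpoint extends Endpoint {
  val uri = "memory://failing"
  def connect: Either[Error, BufferedReader] = Left(new Error("Connection Failed"))
}

object EndpointDemo {
  // fold forces the caller to handle both outcomes: Left first, then Right.
  def firstLine(endpoint: Endpoint): String =
    endpoint.connect.fold(
      err => "failed: " + err.getMessage,
      reader => reader.readLine())
}
```

Because failure is part of the return type, `firstLine` cannot forget the error path the way an unchecked exception can be forgotten.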
Similarly, you can use Option to indicate that a method may or may not return a value. This is much more meaningful than returning nulls, as you cannot be sure what a null really means, whereas an Option explicitly states that the value is optional.
And it's possible to combine the two:
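A common place this matters is at the boundary with Java APIs that return null. As a small sketch (the object `OptionDemo`, its methods, and the property names in the usage are hypothetical), `Option(...)` turns a possible null from `System.getProperty` into `Some` or `None`, which suits the system properties such as `consumerKey` that the "How to use it" section sets:

```scala
object OptionDemo {
  // System.getProperty returns null when the property is unset;
  // Option(...) converts that null into None and anything else into Some.
  def setting(name: String): Option[String] = Option(System.getProperty(name))

  def describe(name: String): String = setting(name) match {
    case Some(_) => name + " is set"
    case None    => name + " is missing"
  }
}
```

The caller now has to decide what "missing" means instead of discovering a null later as a NullPointerException.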
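import java.net.SocketTimeoutException

private def readMessage: Either[ReadError, Option[String]] = {
  try {
    stream.get.readLine() match {
      case null => Left(new ReadError("End of stream"))  // readLine returns null at end of stream
      case line => Right(Some(line))                      // a message arrived
    }
  }
  catch {
    case e: SocketTimeoutException => Right(None)  // nothing arrived within soTimeout: expected, not an error
    case e: Exception => Left(new ReadError(e.getMessage))
    case _ => Left(new ReadError("Unknown"))
  }
}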
In other words, the caller gets either a ReadError if there is a problem, or 'optionally' a string: Some(message) when a message has been read, or None when the read simply timed out.
Using 'become' to manage connection state
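The caller's side of this contract can be sketched in plain Scala. The sketch below is a stand-in, not the post's actor code: `ReadError` is re-declared as a hypothetical case class, `readMessage` reads from any `() => String` instead of a socket so it can run without a network, it treats a null from the reader (end of stream) as an error, and `describe` plays the role of the `readFail`/`readSuccess` pair.

```scala
import java.net.SocketTimeoutException

// Hypothetical stand-in for the post's ReadError class.
case class ReadError(reason: String)

object ReadDemo {
  // Same return type as readMessage, but reading from any () => String.
  def readMessage(read: () => String): Either[ReadError, Option[String]] =
    try {
      read() match {
        case null => Left(ReadError("End of stream"))
        case line => Right(Some(line))
      }
    } catch {
      case e: SocketTimeoutException => Right(None)
      case e: Exception => Left(ReadError(e.getMessage))
    }

  // Left is a real failure; Right(None) just means "nothing yet, try again".
  def describe(result: Either[ReadError, Option[String]]): String =
    result.fold(
      err => "error: " + err.reason,
      maybeLine => maybeLine.map("message: " + _).getOrElse("timeout, try again"))
}
```

Keeping the timeout case on the Right side means the actor can simply ask for the next message again, while only genuine failures take the error path.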
Akka provides the ability to change the receive method of an Actor using 'become'. This is very useful for managing the state of an actor based on the messages it receives. In this case we have two "states", active and inActive, and the messages that are valid in each state are different. The actor moves between these states using 'become'.
protected def receive = inActive

def inActive: Receive = {
  case Connect => {
    // Left(error) is passed to connectFail, Right(reader) to connectSuccess
    endpoint.connect fold (connectFail _, connectSuccess _)
  }
}

def active: Receive = {
  case NextMessage => {
    // Left(ReadError) is passed to readFail, Right(Some/None) to readSuccess
    readMessage fold (readFail _, readSuccess _)
  }
  case CloseConnection => {
    EventHandler.warning(this, "CloseConnection")
    stream.get.close()
    stream = None
    become(inActive)
  }
}
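The idea behind 'become' can also be seen without Akka. The following is a plain Scala imitation, not Akka's implementation: the message objects mirror the names used above, while `ConnectionStateMachine`, its `log` and its `receive(msg)` method are hypothetical. The current behaviour is just a swappable `PartialFunction`, and a message the current state does not define is recorded as unhandled.

```scala
// Hypothetical messages mirroring those used in the post.
sealed trait Msg
case object Connect extends Msg
case object NextMessage extends Msg
case object CloseConnection extends Msg

// Plain-Scala imitation of become: the current behaviour is a swappable PartialFunction.
class ConnectionStateMachine {
  type Receive = PartialFunction[Msg, Unit]

  var log = Vector.empty[String]
  private var behaviour: Receive = inActive

  private def become(next: Receive): Unit = behaviour = next

  def inActive: Receive = {
    case Connect =>
      log :+= "connected"
      become(active)
  }

  def active: Receive = {
    case NextMessage =>
      log :+= "read"
    case CloseConnection =>
      log :+= "closed"
      become(inActive)
  }

  // Messages the current state does not define are recorded rather than processed.
  def receive(msg: Msg): Unit =
    if (behaviour.isDefinedAt(msg)) behaviour(msg) else log :+= ("unhandled " + msg)
}
```

Sending `NextMessage`, `Connect`, `NextMessage`, `CloseConnection`, `NextMessage` in that order leaves `log` as `Vector("unhandled NextMessage", "connected", "read", "closed", "unhandled NextMessage")`: the same message is valid in one state and ignored in the other, with no boolean flags to check.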
Testing
I have written the tests with ScalaTest in a Given-When-Then style. One of the challenges of testing an integration point is verifying the expected behaviour without actually connecting to a real service. Using the Akka TestKit and Mockito, I have attempted to test as close to the integration point as possible without building a mock service.
Testing logging events
Akka provides an eventing service, which is an alternative to traditional logging. Eventing decouples the act of logging from the source of the event, which is useful in many respects, including testing. A test can hook into the event service; in this case I used an actor that simply places each event it receives on a queue. The test can then verify that the expected events arrive on the queue within the expected timeframe.
Whilst it is possible to have the events sent to the TestKit testActor, I found this tended to make the test confusing: essentially two streams of messages were being merged, which in turn made the test logic quite complex. Using a separate queue made it clearer (in my view) to understand.
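The queue technique itself needs nothing from Akka. Below is a standard-library sketch of it; the event classes, `EventQueueDemo`, `listener`, `publishAsync` and `nextEvent` are all hypothetical stand-ins (for `EventHandler.Error`, the listener actor and the code under test). The key point is `poll` with a timeout, which returns null if nothing arrives in time, wrapped here in `Option`.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical event types standing in for Akka's EventHandler events.
sealed trait Event
case class Warning(msg: String) extends Event
case class ErrorEvent(msg: String) extends Event

object EventQueueDemo {
  val eventQueue = new LinkedBlockingQueue[Event]()

  // Stand-in for the listener actor: it just puts every event on the queue.
  def listener(event: Event): Unit = eventQueue.offer(event)

  // Simulates the code under test publishing an event from another thread.
  def publishAsync(event: Event): Unit =
    new Thread(new Runnable { def run(): Unit = listener(event) }).start()

  // Waits up to `millis` for the next event; None if nothing arrived in time.
  def nextEvent(millis: Long): Option[Event] =
    Option(eventQueue.poll(millis, TimeUnit.MILLISECONDS))
}
```

A test then asserts on `nextEvent(1000)` for the event it expects, and can use a short `nextEvent` that returns `None` to check that nothing else was published.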
Below you can see an example using Mockito as well as the event queue.
scenario("Failing to connect") {
  given("I have a mocked Twitter endpoint")
  val endpoint = mock[TwitterEndpoint]
  // Fully qualified because ScalaTest's given/when/then also defines when(...)
  Mockito.when(endpoint.connect).thenReturn(Left(new Error("Connection Failed")))

  when("I create a Sample connector")
  val connector = actorOf(new SampleIngest(endpoint, this.testActor)).start

  and("I send it a connect message")
  connector ! Connect

  then("it shall result in an error event")
  // poll returns null if no event arrives within the timeout; 'other' also catches that case
  eventQueue.poll(1000, TimeUnit.MILLISECONDS) match {
    case e: EventHandler.Error => {}
    case other => fail("Failed to get Error, got [" + other + "]")
  }
}
How to use it
Warning: whilst using this code for testing and experimentation does not (to my understanding) break any of Twitter's terms and conditions, you should satisfy yourself of this before using the code or basing any future code on it.
You will need to register an application with Twitter, which requires a Twitter account. Log in and then go to https://dev.twitter.com/apps
Register a new application there and make a note of the consumerKey and consumerSecret. You will also need to make a note of your Access Token (oauth_token) and Access Token Secret (oauth_token_secret).
Then clone the source code and build it using sbt (http://code.google.com/p/simple-build-tool/).
The following transcript shows how to use it.
$ sbt
[info] Extracting Jetty Startup files from [project/plugins/lib_managed/scala_2.7.7/sbt-jetty-embed-plugin-0.6.1.jar]
[info] Building project TwitterPipeline 0.1 against Scala 2.9.0
[info] using TwitterPipeline with sbt 0.7.7 and Scala 2.9.0
> update
[info]
[info] == update ==
[info] :: retrieving :: net.usersource#twitterpipeline_2.9.0 [sync]
[info] confs: [test, system, provided, jetty6Embed, compile, runtime, sources, javadoc, jetty7Embed, optional]
[info] 0 artifacts copied, 68 already retrieved (0kB/72ms)
[info] == update ==
[success] Successful.
[info]
[info] Total time: 5 s, completed Jun 11, 2011 8:16:09 PM
> console
[info]
[info] == compile ==
[info] Source analysis: 0 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[info] Nothing to compile.
[info] Post-analysis: 49 classes.
[info] == compile ==
[info]
[info] == copy-resources ==
[info] == copy-resources ==
[info]
[info] == copy-test-resources ==
[info] == copy-test-resources ==
[info]
[info] == test-compile ==
[info] Source analysis: 0 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling test sources...
[info] Nothing to compile.
[info] Post-analysis: 19 classes.
[info] == test-compile ==
[info]
[info] == console ==
[info] Starting scala interpreter...
[info]
Welcome to Scala version 2.9.0.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_24).
Type in expressions to have them evaluated.
Type :help for more information.
scala> System.setProperty("consumerKey","XXXXXXXXXXXX");
scala> System.setProperty("consumerSecret","XXXXXXXXXXXXX")
scala> System.setProperty("accessToken","XXXXXXXXXXXX");
scala> System.setProperty("accessSecret","XXXXXXXXXXXXX")
scala> import net.usersource.twitpipe._
import net.usersource.twitpipe._
scala> Pipeline.start
Can't load 'akka.conf'.
One of the three ways of locating the 'akka.conf' file needs to be defined:
1. Define the '-Dakka.config=...' system property option.
2. Put the 'akka.conf' file on the classpath.
3. Define 'AKKA_HOME' environment variable pointing to the root of the Akka distribution.
I have no way of finding the 'akka.conf' configuration file.
Using default values everywhere.
scala> [INFO] [6/11/11 8:17 PM] [akka:event-driven:dispatcher:72484de0-945f-11e0-bc9b-c8bcc8efbcc2-1] [SampleIngest] Connected
>>> tam_clementeT : RT @andreedimarco: @tam_clementeT si, te juro ver ese partido fue más armado que los programas de #SHOWMATCH haha
>>> ateruimashin : ??????
>>> humbertogarciaa : Que yo quiero lo que quiero por que me gusta lo que quiero
>>> allybobwa1 : @courtpaige1 thank you for breaking my wrist.
>>> DougBeni : Quiero, quiero, quiero, que se AnahiOnRockInRio pare el tiempo
>>> slickitten : I am thinking about Auto show http://bit.ly/jGDrFk @GetGlue #AutoShow
>>> CoolAssTiff : @MayorJETS wiz
>>> yukio5699 : to estudando historia
>>> NuneySoPhresh : Laying Down All Lonely And Isshh..I Told #oomf There Is Room(:
>>> BeY0Nd_tHiS : #imjustsayin if you have no hang time you shouldn't have braids
>>> NatZimmermann : @yankalbertin pra ser alguém na vida, tem que estudar, e não reclamar ! HAHAHAHAHAHHA
>>> zerobanks : Jlk tp bwt org kangen loh RT @ichaciichacoei: ih paklur, wooo dasar jelek :p RT @zerobanks: Gak ada ah. RT @ichaciichacoei: loh koq d gnti?
>>> ThaLegacysWife : @RuleBlindBreezy :/ I'd DIE if I had @ImBenJBro's digits. O_o
>>> cesarmzp : En el zoológico con @_aLiNo_ y @raulillito!
>>> itsgeebitch : Morrendo de sono.
Pipeline.stop
>>> SirDrupe : Never trust a girl by her prom pictures
>>> naahpuente : 16:16 AnahiOnRockInRio
>>> Mar_Smallville : Even The Sight Of The Sun Is Pissing Me Off Now I'm #Moody AF
>>> vii_oO : boa tarde... to na casa do meu primo @hyagoalexandre ... daqui a pouco DM
scala> [WARN] [6/11/11 8:17 PM] [akka:event-driven:dispatcher:72484de0-945f-11e0-bc9b-c8bcc8efbcc2-1] [SampleIngest] CloseConnection
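The transcript passes the four OAuth values as system properties. The following is a hypothetical sketch, not the project's code, of checking them all up front by combining Option and Either: each unset property becomes a Left naming it. `Credentials`, `CredentialsDemo`, `prop` and `load` are illustrative names. The sketch uses `.right` projections so it also compiles on the Scala 2.9 used here; from Scala 2.12 Either is right-biased and `.right` can be dropped (it is deprecated in 2.13).

```scala
// Hypothetical holder for the four OAuth values set in the transcript.
case class Credentials(consumerKey: String, consumerSecret: String,
                       accessToken: String, accessSecret: String)

object CredentialsDemo {
  // Option for "may be unset", turned into Either so the caller learns which one is missing.
  def prop(name: String): Either[String, String] =
    Option(System.getProperty(name)).toRight("missing system property: " + name)

  // Stops at the first missing property; yields Credentials only if all four are set.
  def load: Either[String, Credentials] =
    for {
      ck  <- prop("consumerKey").right
      cs  <- prop("consumerSecret").right
      at  <- prop("accessToken").right
      sec <- prop("accessSecret").right
    } yield Credentials(ck, cs, at, sec)
}
```

Failing fast with a named property is friendlier than an authentication error from the remote service once the stream is opened.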
If you want to play with the code in IntelliJ, just run 'sbt idea' on the command line. The code is tagged 'blog_post' as of the time of writing this post.