ScaLearning 7 – Distributing Concurrent Tests



Like many developers who make the journey from Java to Scala, I often find myself amazed at how much easier it is to do some things, or how much easier it is to express myself in Scala.

“ScaLearning” will be a series of short blog-posts just documenting little tidbits I find interesting, confusing, amusing, or otherwise worthy of talking about.
 
 

Motivation

Recently, in order to gain confidence in our web application, our team decided it would be pragmatic to run a series of tests against a deployed version of the application, complete with a production-like database. We quickly put together a simple suite of non-destructive tests that we could run in any environment.
 
Unfortunately, one of our simplest tests quickly began causing us trouble. The test emulated a search spider, crawling every link it found on the entire site in an exhaustive graph traversal, complete with cycle detection. It ran for over 24 hours without completing.
 
While we’ve made other optimizations to improve performance of the test (such as excluding sufficiently similar pages), the topic of today’s post is the concurrency we introduced in order to help take the edge off test time.
 
 

Distributing Concurrent Tests

Our goal was relatively simple. We wished to run a very simple test across many thousands of URLs:

def testSingleUrl(nextUrl: String) = {
  client.open(nextUrl)
  client.statusCode should equal(200)
}

 
 
If either of the calls inside this test fails, an exception is thrown. This was an acceptable way of detecting test failure. However, the test needed to do a bit more – traverse the graph:

  def testUrl(nextUrl: String): Unit = {
      // Cycle detection: skip pages we have already tested
      if (wasVisited(nextUrl)) return

      val client = new HtmlUnitRunner
      client.open(nextUrl)
      client.statusCode should equal(200)

      registerVisitedUrl(nextUrl)

      // Queue every valid link found on this page for testing
      JListWrapper(client.currentPage.getAnchors())
              .map(_.getAttribute("href"))
              .filter(isLinkValid(_))
              .foreach((link: String) => markForTesting(link))
  }

Of course, this was called by a method which pushed and popped from a stack, and “markForTesting” pushed a new link onto that stack. This code worked great sequentially, but we wanted it to operate concurrently in order to minimize testing time. For this, we employed Akka’s actors:

class ConcurrentTest extends Actor {
  def receive = {
    case name: (() => Unit) =>
      try {
        name()
        self reply(true)
      } catch {
        case t: Throwable =>
          t.printStackTrace()
          self reply(false)
      }
    case _ =>
      println("Actor received an unrecognized message")
      self reply(false)
  }
}

ConcurrentTest, as you can see, is the driver behind an individual run of the test method:

val testRunner = actorOf[ConcurrentTest].start()
val result = testRunner !!! (() => testUrl("/index.html"))
// Other code can go here
result.get

 
 
Assuming the methods called within testUrl are thread-safe (which we also did using Actors), this will run a single test using a second Thread, and allow us to continue on with our business. However, since there’s only a single Actor here, we only have one Thread with which to process URLs. This means that we’re still effectively opening each link sequentially.
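To make the single-worker bottleneck concrete, here is a minimal sketch that uses plain JVM thread pools and scala.concurrent.Future instead of Akka actors, so it is an analogy rather than the code we ran. The names SingleWorkerSketch and threadsUsed, and the 20 ms sleep standing in for a page load, are made up for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SingleWorkerSketch {
  // Runs `tasks` pretend URL checks on a pool of `threads` threads and
  // returns the distinct names of the threads that did the work.
  def threadsUsed(threads: Int, tasks: Int): Set[String] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val futures = (1 to tasks).map { _ =>
        Future {
          Thread.sleep(20) // stand-in for opening a page
          Thread.currentThread().getName
        }
      }
      // Submitting returned immediately; only here do we wait for the answers.
      Await.result(Future.sequence(futures), 10.seconds).toSet
    } finally pool.shutdown()
  }
}
```

With threads = 1 every check reports the same thread name, which is the situation a single ConcurrentTest actor leaves us in: the caller is free to carry on, but the URLs are still opened one after another.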

We need a Thread pool, and Akka is glad to provide one:

class Master(nofWorkers: Int) extends Actor {
  val workers = List.fill(nofWorkers)(actorOf[ConcurrentTest].start())
  val router = Routing.loadBalancerActor(new CyclicIterator(workers)).start()
  val answers = ListBuffer[Future[Boolean]]()

  def receive = {
    case name: (() => Unit) => answers += router !!! name
    case "result" => {
      Futures.awaitAll(answers.toList)
      self reply(answers.foldLeft(true)(_ && _.result.get))
      workers.foreach(_.stop())
      router.stop()
      self stop()
    }
  }
}

 
 
So far everything I’ve presented as code came very naturally. In fact, minor modifications for the purpose of blogging notwithstanding, we used the code I’ve presented so far to test several links across our site very successfully, in a fashion very similar to the following:

val masterRunner = actorOf(new Master(concurrentActors)).start()
val productList = fetchAllProductsFromDatabase

productList.map(productToUrl(_))
        .foreach(url => masterRunner ! (() => testSingleUrl(url)))

((masterRunner !! "result").getOrElse(false).asInstanceOf[Boolean]) should equal(true)
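For readers without Akka at hand, the same fan-out-and-fold idea can be sketched with the standard library alone. This is only a rough analogue, assuming a fixed thread pool in place of the load-balanced workers; FanOutSketch and runAll are hypothetical names, and a thrown exception is mapped to false just as ConcurrentTest does.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FanOutSketch {
  // Runs every test on a pool of `workers` threads and folds the individual
  // answers into one Boolean: true only if no test threw an exception.
  def runAll(tests: Seq[() => Unit], workers: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(workers)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val answers = tests.map { test =>
        Future(test())
          .map(_ => true)                        // test returned normally
          .recover { case NonFatal(_) => false } // test threw: count as failure
      }
      Await.result(Future.sequence(answers), 30.seconds).forall(identity)
    } finally pool.shutdown()
  }
}
```

Note that this only works because the full list of tests is known before we start waiting, which is exactly the property the product-list test has.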

However, when we attempted to apply the same methodology to our crawl test, things didn’t work as well as we’d hoped:

    val masterRunner = actorOf(new Master(concurrentActors)).start()

    // def testUrl as seen above

    def markForTesting(nextUrl : String) {
            masterRunner ! (() => testUrl(nextUrl))
    }

   def fullTest() = {
            val baseUrl = "/index.html"
            masterRunner ! (() => testUrl(baseUrl))

             ((masterRunner !! "result").getOrElse(false).asInstanceOf[Boolean]) should equal(true)
    }

 
 
The theory was that “result” would wait for all of the answers to come back before returning. Unfortunately, that’s not quite the sequence of events the Actor sees. After some digging, we worked out the order in which Master actually received its messages:

  1. (() => testUrl("/index.html")) arrives and is quickly forwarded to a ConcurrentTest runner
  2. “result” arrives next, because the runner needs a second or two to open the page
  3. (() => testUrl(_)) arrives for several other URLs as links are scraped off the first page

“result” doesn’t actually wait for all the answers to come back, as it has no way of knowing how many answers are actually required. For that matter, we aren’t sure of that number either, as the test is meant to be dynamic. Instead, “result” simply compiles the answers it has so far, and then shuts down all of the actors. This means we get a “yes” or “no” about “/index.html”, but all of the other URLs are still sitting in the mailbox of Master when it’s shut down. Uh-oh!
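The ordering problem can be reproduced without Akka. The following is a minimal sketch under stated assumptions: a CountDownLatch stands in for the slow page load, a synchronized ListBuffer plays the role of Master’s “answers”, and the names EarlyResultSketch, demo and submit are invented for the example.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EarlyResultSketch {
  // Returns (answers visible when "result" is handled,
  //          answers that exist once the first page has finished).
  def demo(): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val answers = ListBuffer[Future[Boolean]]()
    val pageOpened = new CountDownLatch(1) // models the slow page load

    def submit(links: List[String]): Unit = {
      val answer = Future {
        pageOpened.await()              // the runner is still opening the page
        links.foreach(_ => submit(Nil)) // links are only discovered afterwards
        true
      }
      answers.synchronized { answers += answer }
    }

    try {
      submit(List("/a", "/b"))                                   // "/index.html"
      val seenByResult = answers.synchronized { answers.toList } // "result" arrives
      pageOpened.countDown()                                     // page finally opens
      Await.result(Future.sequence(seenByResult), 10.seconds)
      (seenByResult.size, answers.synchronized { answers.size })
    } finally pool.shutdown()
  }
}
```

The snapshot taken when “result” is handled contains a single answer, while two more tests are queued only after that snapshot was taken. Awaiting the snapshot therefore says nothing about them.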

So how do we know when we’re done? Mailbox sizes. We added a new match case to Master which would calculate if it believed the tests to be done yet:

class Master(nofWorkers: Int) extends Actor {
  ...
  def receive = {
    ...
    case "done" => { // Added this case, the rest of "Master" remained unchanged
        Futures.awaitAll(answers.toList)
        val overallSize = workers.map(worker => worker.mailboxSize).foldLeft(0)(_ + _) + router.mailboxSize
        self.reply(overallSize == 0 && self.mailboxSize == 0)
    }
  }
}

 
 
This code differs from “result” in that it actually attempts to detect whether the tests are done by:

  1. Waiting for all currently outstanding test methods to complete
  2. Counting any pending messages in the router and worker mailboxes
    (This should always be zero, as we’ve waited for all answers to return, but it’s safer to be sure)
  3. Counting any pending messages on the master
  4. Returning “true” if the total number of pending messages is 0, otherwise “false”, as more tests have to run

This algorithm works for us because Futures.awaitAll blocks until every outstanding test has run to completion. Any URLs found on the pages under test are checked against previously visited URLs and, if they are new, sent to Master. Since Master is still busy processing “done”, those messages stay in its mailbox and “mailboxSize” returns a positive number. If, however, no new links are encountered, nothing is waiting in Master’s mailbox, and our “done” operation detects 0 pending tests.
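The same “wait, then check whether more work appeared” loop can be sketched with standard-library futures. This is an analogue rather than our Akka code: the growing “answers” list stands in for Master’s mailbox, the in-memory site map is a hypothetical four-page site, and, unlike testUrl above, the visited check happens when a link is marked rather than when it is tested. All names are made up for the sketch.

```scala
import java.util.concurrent.Executors
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DoneLoopSketch {
  // Hypothetical site used only for this sketch: page -> links on that page.
  val site: Map[String, List[String]] = Map(
    "/index.html" -> List("/a", "/b"),
    "/a"          -> List("/b", "/c"),
    "/b"          -> List("/index.html"), // cycle back to the start
    "/c"          -> Nil
  )

  def crawl(start: String): Set[String] = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val visited = TrieMap.empty[String, Unit]
    val answers = ListBuffer[Future[Boolean]]()

    def markForTesting(url: String): Unit =
      if (visited.putIfAbsent(url, ()).isEmpty) { // first time we see this URL
        val answer = Future {
          site.getOrElse(url, Nil).foreach(markForTesting) // queue new links
          true
        }
        answers.synchronized { answers += answer }
      }

    try {
      markForTesting(start)
      var awaited = 0
      // "done" check: keep going while tests were queued that we have not awaited
      while (awaited < answers.synchronized(answers.size)) {
        val snapshot = answers.synchronized(answers.toList)
        Await.result(Future.sequence(snapshot), 10.seconds)
        awaited = snapshot.size
      }
      visited.keySet.toSet
    } finally pool.shutdown()
  }
}
```

The loop can stop safely because a test registers its follow-up tests before it completes, so once everything in a snapshot has finished and the list has not grown, no work can be hiding anywhere.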

In use:

val masterRunner = actorOf(new Master(concurrentActors)).start()
def markForTesting(nextUrl : String) = masterRunner ! (() => testUrl(nextUrl))
// def testUrl as seen above

def fullTest() = {
    val baseUrl = "/index.html"
    masterRunner ! (() => testUrl(baseUrl))
    // Poll once a second until Master reports that no work is pending
    while((masterRunner !! "done").getOrElse(false).asInstanceOf[Boolean] == false) {
        Thread.sleep(1000)
    }
    ((masterRunner !! "result").getOrElse(false).asInstanceOf[Boolean]) should equal(true)
}

Now we sleep our thread, asking the master if it’s completed its job once every cycle, until the master claims all of its workers have completed their work and no new work is pending for the master to distribute.

Feedback on other potential approaches is very welcome; I find the entire topic of concurrency and job distribution very interesting.

