Showing posts with label thread communications. Show all posts

Saturday, November 22, 2014

Java Thread IV: BlockingQueue to Print Odd/Even Numbers

Today, we will see how to print odd and even numbers using a BlockingQueue.

We want to print odd and even numbers from two different threads, so the threads need some kind of synchronization: each one must wait while the other is printing its number. In effect, one thread (odd) becomes the producer and the other one (even) becomes the consumer. Java provides BlockingQueue for thread coordination in this kind of scenario.

BlockingQueue provides operations that wait for the queue to become non-empty when retrieving an element, and that wait for space to become available when inserting one.
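To make the waiting behaviour concrete, here is a small sketch. The class and method names (BlockingQueueBasics, handOff, offerToFullQueue) are made up for illustration, and it uses a bounded ArrayBlockingQueue of capacity 1 so that the "queue is full" case can actually occur.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueBasics {

    // Returns the value a consumer thread receives once the caller supplies it.
    static int handOff(int value) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        int[] received = new int[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put(value); // would wait if the queue were full
        consumer.join();  // after join() the consumer's write is visible here
        return received[0];
    }

    // offer() does not wait: on a full bounded queue it simply returns false.
    static boolean offerToFullQueue() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.offer(1);
        return queue.offer(2);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff(42));
        System.out.println(offerToFullQueue());
    }
}
```

The consumer may start before or after the value is inserted; in both cases take() only returns once an element is there.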

In our example, we will use LinkedBlockingQueue, which uses ReentrantLock (see my earlier post) internally for thread synchronization. To implement the solution for our odd and even threads, we need two classes: TakeAndOfferNext, a callable task that each thread runs, and OddAndEvenSignalling, which starts the threads.

TakeAndOfferNext:

Now let's see the implementation of TakeAndOfferNext. We will need two blocking queues here: one for odd numbers and another one for even numbers.

    BlockingQueue<Integer> takeFrom;
    BlockingQueue<Integer> offerTo;

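BlockingQueue offers two thread-safe operations that we use here: take() and offer(e). take() removes the head of the queue, waiting if the queue is empty. offer(e) inserts the element if the queue has room and returns false immediately otherwise (put(e) is the variant that waits for space). A LinkedBlockingQueue created without a capacity is effectively unbounded, so offer always succeeds in our case. You don't need to handle thread synchronization yourself, as the blocking queue takes care of it inside these methods. So let's print odd and even numbers using these blocking queues: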

    public void print()    {
        while (true)  {
            try   {
                int i = takeFrom.take(); //removes the value in the "from" queue

                System.out.println(Thread.currentThread().getName() + " --> " + i);

                offerTo.offer(i + 1);    //increments the value by 1 and puts it in the "to" queue.

                if (i >= (maxNumber - 1)) {
                    System.exit(0);
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException("Unexpected interrupt", e);
            }
        }
    }

As you can see, we keep looping until we reach the max number. In each iteration, we take a number from the first queue, print it, and pass the next number to the other queue.

Okay, half the work is done. Now, how do we take the even number from the other queue, where we put it at the end of the iteration? We need two threads for this job, each created with the odd and even queues in swapped positions, as in the class below.

OddAndEvenSignalling:

        BlockingQueue<Integer> odds = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> evens = new LinkedBlockingQueue<>();
        ExecutorService executorService = Executors.newFixedThreadPool(2);

Now our thread pool is ready to run the odd and even tasks. So, let's create two TakeAndOfferNext instances with the queues swapped, so that each one takes from the queue the other one offers to in the print() method.

        executorService.submit(new TakeAndOfferNext(odds, evens, MAX_NUMBER));
        executorService.submit(new TakeAndOfferNext(evens, odds, MAX_NUMBER));

And that's it, your job is done !!!
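The snippets above do not show how the first number gets into the odds queue; without a seed value, both threads would wait in take() forever. Here is a minimal self-contained sketch of the whole flow, assuming the chain is started by offering 1 to the odds queue. The names OddEvenSketch, run and relay are made up for this sketch and are not from the linked sample. Instead of System.exit(0), it passes a value above the maximum along as a stop signal so that both threads finish cleanly.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class OddEvenSketch {

    // Runs both workers and returns the lines they printed, in order.
    static List<String> run(int maxNumber) throws InterruptedException {
        BlockingQueue<Integer> odds = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> evens = new LinkedBlockingQueue<>();
        List<String> printed = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Same wiring as in the post: the two workers get the queues swapped.
        pool.submit(() -> relay("odd", odds, evens, maxNumber, printed));
        pool.submit(() -> relay("even", evens, odds, maxNumber, printed));

        odds.offer(1); // assumed seed: the first odd number starts the chain

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return printed;
    }

    static void relay(String name, BlockingQueue<Integer> takeFrom,
                      BlockingQueue<Integer> offerTo, int maxNumber,
                      List<String> printed) {
        try {
            while (true) {
                int i = takeFrom.take(); // waits until the other worker hands over a number
                if (i > maxNumber) {
                    offerTo.offer(i);    // forward the stop value so the other worker ends too
                    return;
                }
                String line = name + " --> " + i;
                System.out.println(line);
                printed.add(line);
                offerTo.offer(i + 1);    // hand the next number to the other worker
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        run(10);
    }
}
```

Because each worker can only proceed after the other one has handed it a number, the output alternates strictly between the two threads.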

Working code samples can be found here.


As an exercise, you can try the producer-consumer example using a blocking queue.

Friday, October 31, 2014

Java Thread III: Producer Consumer Using Synchronization vs Locks

Producer-consumer has always been a favorite question in Java multi-threading interviews.

We will implement it in two ways: using the usual synchronization and using Locks.

Synchronization:

Synchronization is the standard way of locking resources shared between threads. Here we will create two inner classes, Producer and Consumer, and use a Stack as the shared resource.

Producer:

    class Producer extends Thread {

        @Override
        public void run() {
            while (true) {
                synchronized (itemStack) {
                    // push only while there is room, so the stack never exceeds MAX
                    if (itemStack.size() < MAX) {
                        COUNTER++;
                        itemStack.push(String.valueOf(COUNTER));
                        System.out.println("producing item ->" + COUNTER
                                + " & size of stack is now :: "
                                + itemStack.size());
                        itemStack.notifyAll(); // wake up a waiting consumer
                    }

                    // wait in a loop so the condition is re-checked after every wakeup
                    while (itemStack.size() == MAX) {
                        try {
                            System.out.println("Stack is full & Producer is waiting.");
                            itemStack.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }

    }

Consumer:

    private class Consumer extends Thread {

        @Override
        public void run() {
            while (true) {
                synchronized (itemStack) {
                    if (itemStack.size() != EMPTY) {
                        System.out.println("Consuming item ->"
                                + itemStack.pop()
                                + " & size of stack is now :: "
                                + itemStack.size());
                        itemStack.notifyAll(); // wake up a waiting producer
                    }

                    // wait in a loop so the condition is re-checked after every wakeup
                    while (itemStack.size() == EMPTY) {
                        System.out.println("Stack is empty & Consumer is waiting.");
                        COUNTER = 0;
                        try {
                            itemStack.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }

    }

As you can see, both threads acquire the lock by synchronizing on the shared Stack object.

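NOTE: Make sure both threads share the same itemStack instance (for example, by declaring it as a static variable). Otherwise each thread synchronizes and waits on its own stack, nobody ever notifies it, and both threads end up blocked forever.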

Working code can be found here.

Lock:

Now let's see how we can rewrite the same example using a ReentrantLock object, which is provided in the java.util.concurrent.locks package since JDK 5.


Please see my earlier post on Reentrant Lock if you are new to Lock concepts.

To rewrite the producer-consumer example using Lock, we need the following declarations.

    private static Lock lock = new ReentrantLock();
    private static Condition hasSpace = lock.newCondition();
    private static Condition hasItems = lock.newCondition();

The first line creates the explicit lock object for our example, and the other two lines create the conditions used to coordinate access to the Stack.

Condition objects provide await(), signal() and signalAll() methods, similar to the wait(), notify() and notifyAll() methods provided by the Object class.

Now, let's see how we can leverage all of this in our example below.

    private class Producer extends Thread {

        @Override
        public void run() {
            while (true) {
                // acquire before the try block, so finally only unlocks a lock we hold
                lock.lock();
                try {
                    while (itemStack.size() >= MAX) {
                        System.out.println("Stack is full & Producer is waiting.");
                        try {
                            // wait for the stack to have space
                            hasSpace.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    COUNTER++;
                    itemStack.push(String.valueOf(COUNTER));
                    System.out.println("producing item number -> "
                            + itemStack.size());
                    hasItems.signalAll(); // tell waiting consumers there is an item

                } finally {
                    lock.unlock();
                }
            }
        }
    }

    private class Consumer extends Thread {

        @Override
        public void run() {
            while (true) {
                // acquire before the try block, so finally only unlocks a lock we hold
                lock.lock();
                try {
                    while (itemStack.size() == EMPTY) {
                        System.out.println("Stack is empty & Consumer is waiting.");
                        try {
                            COUNTER = 0;
                            // wait for the stack to have items
                            hasItems.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    itemStack.pop();
                    System.out.println("consuming item number -> " + itemStack.size());
                    hasSpace.signalAll(); // tell waiting producers there is space
                } finally {
                    lock.unlock();
                }
            }
        }

    }

As you can see, we now acquire the lock explicitly at the start of each iteration in run() and release it in the finally block. The lock also provides separate conditions for the Producer and the Consumer, which simplifies the code further because you know which condition to await and which one to signal. (Compare this against the wait and notify calls in the synchronized version.)

This makes the synchronization code more readable, since you now have a separate object for locking and know exactly what you are synchronizing on. In the synchronized version, remember, we relied on the Stack object for locking, which can sometimes lead to confusion and even to deadlocks. (Hint: try the synchronized version with the Stack object as a non-static reference.)

However, since locking is explicit here, you must remember to unlock the lock object at the end (ideally in a finally block), or it can create issues later in the code.

Working code can be found here.

I hope this helps you understand synchronization and locking concepts.

Thursday, October 30, 2014

Java Thread II: Reentrant Lock Vs Synchronized

Today we will see the difference between Lock and the synchronized block, but before we go into details, let's understand what a Lock is and how it works.

What is a Lock?

The Lock interface was introduced in JDK 1.5 along with other concurrency utilities such as CountDownLatch. ReentrantLock is the main concrete implementation of the Lock interface and behaves as a mutually exclusive lock, like a synchronized block.
So the next question that comes to mind is why, or more importantly when, one should prefer Lock when we already have synchronized. There are trade-offs between the two, and ReentrantLock is useful in a few specific situations.
For example, Lock provides explicit locking, which can make the code more readable than synchronized. You will see this in the examples below.

1. Fairness:

The ReentrantLock constructor offers a choice of two fairness options: create a non-fair lock or a fair lock. With fair locking, threads acquire the lock in the order in which they requested it, whereas an unfair lock allows a thread to acquire it out of turn. This is called barging (jumping the queue and acquiring the lock the moment it becomes available).

Fair locking has a significant performance cost because of the overhead of suspending and resuming threads. There could be cases where there is a significant delay between when a suspended thread is resumed and when it actually runs. Let's see a situation:

A -> holds a lock.
B -> has requested and is in a suspended state waiting for A to release the lock.
C -> requests the lock at the same time that A releases the lock, and has not yet gone to a suspended state.

As C has not yet gone to a suspended state, there is a chance that it can acquire the lock released by A, use it, and release it before B even finishes waking up. So, in this context, unfair lock has a significant performance advantage.
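As a quick illustration of the two constructor options, here is a small sketch; the FairnessDemo class and its method names are made up for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {

    // The no-argument constructor creates a non-fair lock, so barging is allowed.
    static ReentrantLock unfairLock() {
        return new ReentrantLock();
    }

    // Passing true asks for a fair lock: the longest-waiting thread is favored.
    static ReentrantLock fairLock() {
        return new ReentrantLock(true);
    }

    public static void main(String[] args) {
        System.out.println(unfairLock().isFair());
        System.out.println(fairLock().isFair());
    }
}
```

Note that, according to the ReentrantLock documentation, the untimed tryLock() does not honor the fairness setting: it can barge even on a fair lock.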

2. Polled and Timed Lock Acquisition: 

Let's see some example code:

public void transferMoneyWithSync(Account fromAccount, Account toAccount,
   float amount) throws InsufficientAmountException {
  synchronized (fromAccount) {
   // acquired lock on fromAccount Object
   synchronized (toAccount) {
    // acquired lock on toAccount Object
    if (amount > fromAccount.getCurrentAmount()) {
     throw new InsufficientAmountException(
       "Insufficient Balance");
    } else {
     fromAccount.debit(amount);
     toAccount.credit(amount);
    }
   }
  }
 }

In the transferMoneyWithSync() method above, there is a possibility of deadlock when two threads, A and B, try to transfer money in opposite directions at almost the same time:
A: transferMoneyWithSync(acc1, acc2, 20);
B: transferMoneyWithSync(acc2, acc1, 25);

It is possible that thread A has acquired a lock on the acc1 object and is waiting to acquire a lock on the acc2 object. Meanwhile, thread B has acquired a lock on the acc2 object and is waiting for a lock on acc1. This will lead to deadlock, and the system would have to be restarted! There is, however, a way to avoid this, which is called "lock ordering." Personally, I find this a bit complex. 
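For comparison, here is a rough sketch of what lock ordering could look like. The post does not show its Account class, so the Account type below is hypothetical, and it assumes every account has a unique numeric id that can be used to decide which lock to take first.

```java
class OrderedTransfer {

    // Hypothetical account type, only for this sketch.
    static final class Account {
        final long id;        // assumed to be unique per account
        private long balance;

        Account(long id, long balance) {
            this.id = id;
            this.balance = balance;
        }

        long getBalance() {
            synchronized (this) {
                return balance;
            }
        }
    }

    // Returns false when the balance is insufficient, true after a successful transfer.
    static boolean transfer(Account from, Account to, long amount) {
        // Always lock the account with the smaller id first, whatever the
        // transfer direction is, so two opposite transfers cannot deadlock.
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;

        synchronized (first) {
            synchronized (second) {
                if (amount > from.balance) {
                    return false;
                }
                from.balance -= amount;
                to.balance += amount;
                return true;
            }
        }
    }
}
```

Both threads A and B from the example would now try to lock acc1 first, so one of them simply waits for the other to finish.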

A cleaner approach is offered by ReentrantLock through the tryLock() method. This approach is called "timed and polled lock acquisition." It lets you regain control if you cannot acquire all the required locks, release the ones you have acquired, and retry.

So, using tryLock, we attempt to acquire both locks. If we cannot acquire both, we release whichever one we hold and then retry.

public boolean transferMoneyWithTryLock(Account fromAccount,
        Account toAccount, float amount)
        throws InsufficientAmountException, InterruptedException {
    // stopTime is our time budget; nanoTime() counts nanoseconds,
    // and the 5 second value here is an assumption
    long stopTime = System.nanoTime() + 5_000_000_000L;
    while (true) {
        if (fromAccount.lock.tryLock()) {
            try {
                if (toAccount.lock.tryLock()) {
                    try {
                        if (amount > fromAccount.getCurrentAmount()) {
                            throw new InsufficientAmountException("Insufficient Balance");
                        }
                        fromAccount.debit(amount);
                        toAccount.credit(amount);
                        return true; // both locks were held and the transfer is done
                    } finally {
                        toAccount.lock.unlock();
                    }
                }
            } finally {
                fromAccount.lock.unlock();
            }
        }
        // could not get both locks: give up once the time budget is used up
        if (System.nanoTime() >= stopTime) {
            return false;
        }
        Thread.sleep(100); // back off before retrying
    } // while
}

Here we implemented a timed lock acquisition: if both locks cannot be acquired within the specified time, the method returns false and exits gracefully. The same idea can be used to keep activities within a time budget.
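Besides polling in a loop, ReentrantLock also has a tryLock(long, TimeUnit) overload that waits for the lock up to a given timeout. Here is a small sketch of that variant; TimedTryLock and runWithin are made-up names for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TimedTryLock {

    // Runs the task only if the lock can be acquired within the timeout.
    static boolean runWithin(ReentrantLock lock, long timeout, TimeUnit unit, Runnable task)
            throws InterruptedException {
        if (!lock.tryLock(timeout, unit)) {
            return false; // time budget used up; the caller decides what to do next
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean done = runWithin(lock, 100, TimeUnit.MILLISECONDS,
                () -> System.out.println("got the lock"));
        System.out.println(done);
    }
}
```

Unlike the untimed tryLock(), this overload can also be interrupted while it waits, which is why it declares InterruptedException.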


3. Interruptible Lock Acquisition:

Interruptible lock acquisition allows locking to be used within cancellable activities.
The lockInterruptibly() method lets us try to acquire a lock while remaining responsive to interruption. So, basically, it allows a thread that is waiting for the lock to react immediately to an interrupt sent to it from another thread.

This can be helpful when we want to cancel all the threads waiting on a lock. Let's see one example: suppose we have a shared line for sending messages. We want to design it in such a way that if another thread interrupts the current thread, it stops waiting for the lock (or releases it, if it already holds it) and performs the exit or shutdown operations to cancel the current task.

public boolean sendOnSharedLine(String message) throws InterruptedException{
  lock.lockInterruptibly();
  try{
   return cancellableSendOnSharedLine(message);
  } finally {
   lock.unlock();
  }
 }
private boolean cancellableSendOnSharedLine(String message){
.......
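Since the listing above is cut short, here is a separate small sketch of the behaviour itself: a thread that is waiting in lockInterruptibly() gives up as soon as it is interrupted. The InterruptibleWait class and its method are made up for illustration and are not part of the shared-line example.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleWait {

    // Returns true if the waiting thread was cancelled by interrupt()
    // instead of staying blocked on the lock.
    static boolean waiterIsCancelled() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] cancelled = new boolean[1];

        lock.lock(); // the calling thread holds the lock, so the waiter cannot get it
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // only reached if the lock was actually acquired
                } catch (InterruptedException e) {
                    cancelled[0] = true; // stopped waiting because of the interrupt
                }
            });
            waiter.start();
            // lockInterruptibly() also throws if the interrupt flag is already set
            // on entry, so it does not matter whether the waiter is blocked yet.
            waiter.interrupt();
            waiter.join();
        } finally {
            lock.unlock();
        }
        return cancelled[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waiterIsCancelled());
    }
}
```

With a plain lock() call the waiter would stay blocked until the lock is released, no matter how often it is interrupted.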

4. Non-block Structured Locking:

In intrinsic locks, acquire-release pairs are block-structured. In other words, a lock is always released in the same basic block in which it was acquired, regardless of how control exits the block. Extrinsic (explicit) locks give you more control: a lock can be acquired in one method and released in another. Some techniques, like lock striping, can be achieved more easily using extrinsic locks. Some use cases are seen in hash-based collections and linked lists.

private ReentrantLock lock;
public void foo() {
  ...
  lock.lock();
  ...
}
public void bar() {
  ...
  lock.unlock();
  ...
}
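One linked-list use case is hand-over-hand locking, where the lock of the next node is acquired before the lock of the previous node is released, which a synchronized block cannot express. Below is a minimal sketch of a sorted list built this way; HandOverHandList and its methods are made-up names, and the sketch only supports adding and reading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class HandOverHandList {

    private static final class Node {
        final int value;
        Node next; // guarded by this node's lock
        final ReentrantLock lock = new ReentrantLock();

        Node(int value) {
            this.value = value;
        }
    }

    private final Node head = new Node(Integer.MIN_VALUE); // sentinel, never removed

    // Inserts the value while keeping the list in ascending order.
    void add(int value) {
        Node prev = head;
        prev.lock.lock();
        try {
            Node curr = prev.next;
            while (curr != null && curr.value < value) {
                curr.lock.lock();   // take the next node's lock first...
                prev.lock.unlock(); // ...and only then let go of the previous one
                prev = curr;
                curr = prev.next;
            }
            Node node = new Node(value);
            node.next = curr;
            prev.next = node;
        } finally {
            prev.lock.unlock(); // prev is always the node whose lock we still hold
        }
    }

    // Walks the list with the same hand-over-hand pattern and copies the values.
    List<Integer> snapshot() {
        List<Integer> out = new ArrayList<>();
        Node prev = head;
        prev.lock.lock();
        try {
            while (prev.next != null) {
                Node curr = prev.next;
                curr.lock.lock();
                prev.lock.unlock();
                prev = curr;
                out.add(prev.value);
            }
        } finally {
            prev.lock.unlock();
        }
        return out;
    }
}
```

Because only one or two nodes are locked at any time, threads working on different parts of the list do not block each other.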

Intrinsic locks and extrinsic locks have the same mechanism inside for locking, so any performance improvement is situational: it depends on the use cases we discussed above. Extrinsic locks give a more explicit control mechanism for better handling of deadlocks, starvation, and so on.

When should you use ReentrantLocks? 

The answer is pretty simple - use it when you actually need something it provides that synchronized doesn't, like timed lock waits, interruptible lock waits, non-block-structured locks, multiple condition variables, or lock polling. ReentrantLock also has scalability benefits, and you should use it if you actually have a situation that exhibits high contention, but remember that the vast majority of synchronized blocks hardly ever exhibit any contention, let alone high contention. I would advise developing with synchronization until synchronization has proven to be inadequate, rather than simply assuming "the performance will be better" if you use ReentrantLock.

Remember, these are advanced tools for advanced users. (And truly advanced users tend to prefer the simplest tools they can find until they're convinced the simple tools are inadequate.) As always, make it right first, and then worry about whether or not you have to make it faster.