/* Message.java */

package concurrency;

import java.util.Queue;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;

/** A message sent between threads.
 *
 *  Each recipient thread has its own queue of pending messages, kept in a
 *  class-wide map keyed by thread.  The map is guarded by its own monitor;
 *  the contents of each queue are guarded by that queue's monitor, which is
 *  also the monitor a recipient waits on in receive().  The monitor of a
 *  Message itself is used only for the sendAwaitReply() / reply() handshake.
 *
 *  Note: entries are never removed from the map, so a queue (and its key
 *  thread) stays reachable after the thread has terminated.
 *
 * @author Russell C. Bjork
 */
public class Message
{
    /** Constructor.  The constructing thread is recorded as the sender.
     * 
     *  @param content the content of the message
     */
    public Message(String content)
    {
        sender = Thread.currentThread();
        this.content = content;
    }
    
    /** Send this message to another thread.  The sending thread continues
     *  execution without waiting for the message to be handled by the
     *  recipient. 
     * 
     *  @param destination the thread to send this message to
     *  @exception IllegalArgumentException if a thread attempts to send
     *             a message to itself
     */
    public void send(Thread destination) throws IllegalArgumentException
    {
        if (destination == Thread.currentThread())
            throw new IllegalArgumentException("Destination same as sender");
        
        Queue<Message> recipientQueue = queueFor(destination);
        
        // The queue is mutated under its own monitor - the same one that
        // receive() holds while it inspects and polls the queue - so that
        // the underlying list is never touched by two threads at once.
        // While we hold it, wake the recipient if it is waiting.
        
        synchronized(recipientQueue)
        {
            recipientQueue.add(this);
            recipientQueue.notify();
        }
    }
    
    /** Send this message to another thread, and wait for a reply.  The sending
     *  thread blocks until the receiving thread replies to it with reply().
     *  If the sending thread is interrupted while waiting it keeps waiting
     *  for the reply, but its interrupt status is set again before this
     *  method returns.
     *
     *  @param destination the thread to send this message to
     *  @return the reply message (this same object, updated by reply())
     *  @exception IllegalArgumentException if a thread attempts to send
     *             a message to itself
     */
    public synchronized Message sendAwaitReply(Thread destination)
                throws IllegalArgumentException
    {
        repliedTo = false;
        send(destination);

        // This method holds the message's monitor until wait() releases it,
        // so reply() - which is also synchronized - cannot run before we
        // are ready to see its notification.
        
        boolean interrupted = false;
        while (! repliedTo)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e)
            {
                // Remember the interrupt rather than discarding it
                interrupted = true;
            }
        }
        
        if (interrupted)
            Thread.currentThread().interrupt();

        return this;
    }

    /** Receive a message.  If no message is available for the current thread,
     *  the thread will wait until one becomes available.  If the thread is
     *  interrupted while waiting it keeps waiting for a message, but its
     *  interrupt status is set again before this method returns.
     *
     *  @return the message received 
     */
    public static Message receive()
    {
        Queue<Message> recipientQueue = queueFor(Thread.currentThread());

        // If there is no message waiting in this queue, block this thread on
        // the queue.  When there is a message in the queue, return it.  

        boolean interrupted = false;
        Message received;
        synchronized(recipientQueue)
        {
            while (recipientQueue.isEmpty())
            {
                try
                {
                    recipientQueue.wait();
                }
                catch(InterruptedException e)
                {
                    // Remember the interrupt rather than discarding it
                    interrupted = true;
                }
            }
            received = recipientQueue.poll();
        }
        
        if (interrupted)
            Thread.currentThread().interrupt();
        
        return received;
    }

    /** Reply to the sender of this message.  If the sender is waiting for
     *  a reply, it will receive it; otherwise, the reply will be ignored.
     *  The reply reuses this object: the replying thread becomes its sender
     *  and the reply content replaces its content.
     *
     *  @param replyContent the content of the reply
     */
    public synchronized void reply(String replyContent)
    {
        sender = Thread.currentThread();
        content = replyContent;
        repliedTo = true;
        notify();
    }

    /** Accessor for the sender of this message
     * 
     *  @return the sender
     */
    public Thread getSender()
    {
        return sender;
    }
    
    /** Accessor for the content of this message
     * 
     *  @return the content
     */
    public String getContent()
    {
        return content;
    }

    /** Construct a string representation of this message
     *
     *  @return the string representation
     */
    @Override
    public String toString()
    {
        return content + " from " + sender;
    }
    
    /** Find the message queue for a thread, creating it if necessary.
     *
     *  @param owner the thread whose queue is wanted
     *  @return the queue holding messages destined for that thread
     */
    private static Queue<Message> queueFor(Thread owner)
    {
        synchronized(recipientsMap)
        {
            Queue<Message> queue = recipientsMap.get(owner);
            if (queue == null)
            {
                queue = new LinkedList<Message>();
                recipientsMap.put(owner, queue);
            }
            return queue;
        }
    }
    
    private Thread sender;
    private String content;
    private boolean repliedTo;
    
    private static final Map<Thread, Queue<Message>> recipientsMap =
       new HashMap<Thread, Queue<Message>>();
}
