package egb;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;

/**
 * Multi-threaded log statistics over a single file read with NIO.
 * <p>
 * One MAIN thread reads the file in fixed-size blocks. It trims each block back to its last
 * line feed, carrying the partial trailing line into the next block, and deals the blocks
 * round-robin to WORKER threads. Each worker parses lines into its own private maps, so counting
 * needs no locking. When the input is exhausted, the main thread sends each worker an end marker,
 * joins them, sums their maps, and starts one COLLATER thread per report to format a top-N list.
 * <p>
 * Usage: {@code MTNIOStats <numberOfWorkers> <blockSizeInKB> <filename>}
 * <p>
 * NOTE(review): the field layout parsed in {@link #processLine} looks like Apache combined-format
 * access log lines; confirm against the actual input.
 */
public class MTNIOStats extends Thread
{
	static final int HASH_SIZE = 100000;
	// Number of blocks dealt to workers so far. Written only by the MAIN thread and read by the
	// progress loop in main(), hence volatile.
	static volatile int counted = 0;
	static final int MAIN = 0, WORKER = 1, COLLATER = 2, REPORTER = 3;
	static final int STRING_BUFFER_INITIAL_LENGTH = 512;
	// How many entries each report lists.
	static final int TOP_N = 10;
	static int NUMBER_OF_WORKERS;
	// Size of each NIO read, in KB.
	static int NIO_BLOCK_READ_SIZE;
	static MTNIOStats[] workers;
	static MTNIOStats[] collaters;

	// Queued (and recognized by identity) after a worker's last real block; the worker exits on it.
	static final DataToProcess END_OF_INPUT = new DataToProcess(0, ByteBuffer.allocate(0), "");
	// Article URIs; only these count towards hits, clients and referrers. Pattern is immutable and
	// thread-safe, so one shared instance serves every worker.
	static final Pattern ARTICLE_PATTERN = Pattern.compile("^/ongoing/When/\\d\\d\\dx/\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+$");

	// Counters: partial counts in WORKER threads, merged totals in the MAIN thread.
	Map<String, Inty> u_hits = new HashMap<String, Inty>(HASH_SIZE);
	Map<String, Inty> u_bytes = new HashMap<String, Inty>(HASH_SIZE);
	Map<String, Inty> s404s = new HashMap<String, Inty>(HASH_SIZE);
	Map<String, Inty> clients = new HashMap<String, Inty>(HASH_SIZE);
	Map<String, Inty> refs = new HashMap<String, Inty>(HASH_SIZE);
	// Blocks waiting to be parsed by this WORKER.
	BlockingQueue<DataToProcess> queue = new LinkedBlockingQueue<DataToProcess>();

	int threadType = -1;

	// MAIN stuff:
	static String filename;
	// WORKER stuff: the block currently being parsed, or null while idle.
	volatile DataToProcess dataToProcess;
	// COLLATER stuff:
	ByteArrayOutputStream baos = new ByteArrayOutputStream();
	PrintStream collatedReportPiece = new PrintStream(baos);
	String label;
	Map<String, Inty> hash;
	boolean shrink;

	/**
	 * @param threadType one of MAIN, WORKER or COLLATER; selects what {@link #run()} does
	 */
	public MTNIOStats(int threadType)
	{
		this.threadType = threadType;
	}

	/**
	 * Entry point: {@code MTNIOStats <numberOfWorkers> <blockSizeInKB> <filename>}.
	 * <p>
	 * Starts the workers and the MAIN thread. It then prints progress once a second until the
	 * MAIN thread ends the process with System.exit.
	 *
	 * @throws IllegalArgumentException if the worker count or block size is not positive
	 */
	public static void main(String[] args) throws Exception
	{
		if (args.length < 3)
		{
			System.err.println("Usage: MTNIOStats <numberOfWorkers> <blockSizeInKB> <filename>");
			System.exit(1);
		}
		NUMBER_OF_WORKERS = Integer.parseInt(args[0]);
		NIO_BLOCK_READ_SIZE = Integer.parseInt(args[1]);
		filename = args[2];
		if (NUMBER_OF_WORKERS < 1 || NIO_BLOCK_READ_SIZE < 1)
			throw new IllegalArgumentException("numberOfWorkers and blockSizeInKB must be positive, got "
					+ NUMBER_OF_WORKERS + " and " + NIO_BLOCK_READ_SIZE);

		workers = new MTNIOStats[NUMBER_OF_WORKERS];
		for (int i = 0; i < NUMBER_OF_WORKERS; i++)
		{
			MTNIOStats statsObject = new MTNIOStats(WORKER);
			statsObject.setName("Worker-" + i);
			statsObject.start();
			workers[i] = statsObject;
		}

		Thread mainThread = new MTNIOStats(MAIN);
		mainThread.setName("MainThread");
		mainThread.start();

		// Now watch some stats while we run. These unsynchronized reads are for display only.
		while (true)
		{
			System.out.print("Counted:" + counted + "   Queues:");
			for (int i = 0; i < NUMBER_OF_WORKERS; i++)
				System.out.print(((i == 0) ? "" : "/") + workers[i].queue.size());
			System.out.print("   Hashes:");
			for (int i = 0; i < NUMBER_OF_WORKERS; i++)
				System.out.print(((i == 0) ? "" : "/") + workers[i].u_hits.size());
			System.out.println();
			Thread.sleep(1000);
		}
	}

	/**
	 * MAIN thread body. It reads and deals the file, waits for the workers, merges their counts,
	 * runs the five reports in COLLATER threads, prints them and the elapsed time, then exits the
	 * process.
	 */
	private void doMain() throws Exception
	{
		long start = System.currentTimeMillis();

		// Main thread does all IO and hands blocks of whole lines to the worker threads.
		// Each worker has its own 5 maps, which are added together at the end before reporting.
		FileInputStream fis = new FileInputStream(new File(filename));
		try
		{
			dealFileToWorkers(fis.getChannel());
		}
		finally
		{
			fis.close(); // also closes the stream's channel
		}

		// Tell every worker that no more input is coming, then wait for each to drain its queue
		// and exit. join() also makes the workers' map updates visible to this thread.
		for (int i = 0; i < NUMBER_OF_WORKERS; i++)
			workers[i].queue.put(END_OF_INPUT);
		for (int i = 0; i < NUMBER_OF_WORKERS; i++)
			workers[i].join();

		// Collate all worker hashes back together
		for (int i = 0; i < NUMBER_OF_WORKERS; i++)
		{
			addWorkerStatsToMainStats(workers[i].u_hits, u_hits);
			addWorkerStatsToMainStats(workers[i].u_bytes, u_bytes);
			addWorkerStatsToMainStats(workers[i].s404s, s404s);
			addWorkerStatsToMainStats(workers[i].clients, clients);
			addWorkerStatsToMainStats(workers[i].refs, refs);
		}

		// Now run reports, one COLLATER thread each, and print them in a fixed order
		collaters = new MTNIOStats[5];
		report("URIs by hit", u_hits, false, 0);
		report("URIs by bytes", u_bytes, true, 1);
		report("404s", s404s, false, 2);
		report("client addresses", clients, false, 3);
		report("referrers", refs, false, 4);
		for (int i = 0; i < collaters.length; i++)
			collaters[i].join();
		for (int i = 0; i < collaters.length; i++)
			System.out.println(collaters[i].baos.toString());
		System.out.println("TIME TAKEN:" + (System.currentTimeMillis() - start) + "msec");
		System.exit(0);
	}

	/**
	 * Reads channel to EOF and deals blocks that end exactly on a line feed to the workers,
	 * round-robin.
	 * <p>
	 * Bytes after a block's last line feed are carried forward as the next block's
	 * leftoverFromPrevious. A block with no line feed at all is carried forward entirely. Whatever
	 * is still pending at EOF (a last line without a trailing newline) is dealt as a final
	 * bytes-free block so it is not lost.
	 */
	private static void dealFileToWorkers(FileChannel channel) throws Exception
	{
		int dealTo = 0;
		String leftover = "";
		while (true)
		{
			// A fresh buffer per block, because the worker keeps the one it is handed.
			ByteBuffer byteBuffer = ByteBuffer.allocate(NIO_BLOCK_READ_SIZE * 1024);
			int amountRead = channel.read(byteBuffer);
			if (amountRead <= 0)
				break;

			int lf = indexOfLastLinefeed(byteBuffer, amountRead);
			if (lf < 0)
			{
				// The whole block is the middle of one long line.
				leftover = leftover + bytesToString(byteBuffer, 0, amountRead);
				continue;
			}
			String leftoverForNextTime = bytesToString(byteBuffer, lf + 1, amountRead);
			// The block handed over includes its final '\n', so its last line gets parsed.
			workers[dealTo].queue.put(new DataToProcess(lf + 1, byteBuffer, leftover));
			dealTo = (dealTo + 1) % NUMBER_OF_WORKERS;
			counted++;
			leftover = leftoverForNextTime;
		}
		if (leftover.length() > 0)
		{
			workers[dealTo].queue.put(new DataToProcess(0, ByteBuffer.allocate(0), leftover));
			counted++;
		}
	}

	/**
	 * Returns the index of the last '\n' among the first {@code length} bytes of {@code buffer},
	 * or -1 if there is none. Uses absolute gets, so the buffer's position is not changed.
	 */
	static int indexOfLastLinefeed(ByteBuffer buffer, int length)
	{
		for (int i = length - 1; i >= 0; i--)
		{
			if (buffer.get(i) == '\n')
				return i;
		}
		return -1;
	}

	/**
	 * Converts bytes [from, to) of {@code buffer} to a String, one char per byte (ISO-8859-1
	 * mapping). This is the same byte-to-char mapping the workers use, so a line split across
	 * blocks decodes exactly as it would have in one block. Absolute gets leave the position alone.
	 */
	static String bytesToString(ByteBuffer buffer, int from, int to)
	{
		StringBuilder sb = new StringBuilder(Math.max(0, to - from));
		for (int i = from; i < to; i++)
			sb.append((char) (buffer.get(i) & 0xFF));
		return sb.toString();
	}

	/** Adds every count in workerMap into the corresponding entry of mainMap. */
	private void addWorkerStatsToMainStats(Map<String, Inty> workerMap, Map<String, Inty> mainMap)
	{
		for (Map.Entry<String, Inty> entry : workerMap.entrySet())
			increment(mainMap, entry.getKey(), entry.getValue().count);
	}

	/**
	 * WORKER thread body. It takes blocks from {@link #queue} until {@link #END_OF_INPUT} arrives.
	 * Each block's leftoverFromPrevious and then its first amountRead bytes are split into lines
	 * at CR or LF, and every line is passed to {@link #processLine}.
	 */
	private void doWorker() throws InterruptedException
	{
		StringBuilder line = new StringBuilder(STRING_BUFFER_INITIAL_LENGTH);
		while (true)
		{
			DataToProcess chunk = queue.take();
			if (chunk == END_OF_INPUT)
				return;
			dataToProcess = chunk;

			line.setLength(0);
			// Scan the carried-over text too, in case it holds a CR left behind by a CRLF that
			// straddled the block boundary.
			String leftover = chunk.leftoverFromPrevious;
			for (int k = 0; k < leftover.length(); k++)
				consume(line, leftover.charAt(k));

			ByteBuffer bytes = chunk.byteBuffer;
			bytes.rewind();
			while (bytes.hasRemaining() && bytes.position() < chunk.amountRead)
				consume(line, (char) (bytes.get() & 0xFF));

			// Only the file's last line can be left unterminated; parse it rather than drop it.
			if (line.length() > 0)
				processLine(line.toString());
			dataToProcess = null;
		}
	}

	/** Appends c to line; at a CR or LF, parses the accumulated line (if any) and clears it. */
	private void consume(StringBuilder line, char c)
	{
		if (c == '\n' || c == '\r')
		{
			if (line.length() > 0)
				processLine(line.toString());
			line.setLength(0);
		}
		else
		{
			line.append(c);
		}
	}

	/**
	 * Parses one log line and records it.
	 * <p>
	 * Only GET requests are counted. Status 200 and 304 go through {@link #record}; status 404
	 * counts the URI in s404s. Lines that do not split into 11 fields are ignored. Any other
	 * parse failure is reported as a BAD LINE and skipped.
	 */
	private void processLine(String line)
	{
		try
		{
			String[] f = split(line, ' ');
			if (f.length < 11 || f[6].length() < 11 || !f[5].startsWith("\"GET"))
				return;

			String client = f[0];
			// f[6] is "<uri> HTTP/x.y\"", so drop the 10-character protocol suffix.
			String u = f[6].substring(0, f[6].length() - 10);
			// These changed from 8/9/10 to 7/8/9 with updates to split to handle URLs with spaces
			String status = f[7];
			String bytes = f[8];
			String ref = f[9];

			if ("200".equals(status))
				record(client, u, ("-".equals(bytes)) ? 0 : Integer.parseInt(bytes), ref);
			else if ("304".equals(status))
				record(client, u, 0, ref);
			else if ("404".equals(status))
				increment(s404s, u, 1);
		}
		catch (Exception e)
		{
			System.out.println("BAD LINE:" + line);
			e.printStackTrace();
		}
	}

	/** COLLATER thread body: writes the labelled top TOP_N entries of hash into collatedReportPiece. */
	private void doCollater()
	{
		collatedReportPiece.println("Top " + label + ":");
		// shrink: show counts as megabytes with one decimal place.
		String fmt = (shrink) ? " %9.1fM: %s\n" : " %10d: %s\n";
		for (SortableKeyAndInty entry : top(hash, TOP_N))
		{
			// Keys longer than 60 characters are cut to 59 plus an ellipsis.
			String pkey = (entry.key.length() > 60) ? entry.key.substring(0, 59) + "..." : entry.key;
			if (shrink)
				collatedReportPiece.printf(fmt, (float) (entry.inty.count / (1024.0 * 1024)), pkey);
			else
				collatedReportPiece.printf(fmt, entry.inty.count, pkey);
		}
	}

	/** Dispatches to the body for this thread's type; any exception is fatal to the whole process. */
	@Override
	public void run()
	{
		try
		{
			switch (threadType)
			{
				case WORKER:
					doWorker();
					break;
				case MAIN:
					doMain();
					break;
				case COLLATER:
					doCollater();
					break;
				default:
					break;
			}
		}
		catch (Exception e)
		{
			System.out.println("Got exception:" + e);
			e.printStackTrace();
			System.exit(1);
		}
	}

	/**
	 * Returns up to {@code howmany} entries of hash with the highest counts. The result is
	 * ordered by descending count, then ascending key. An entry that only ties the current
	 * minimum is not considered once the list is full.
	 */
	private static List<SortableKeyAndInty> top(Map<String, Inty> hash, int howmany)
	{
		List<SortableKeyAndInty> list = new ArrayList<SortableKeyAndInty>();
		if (howmany <= 0)
			return list;
		long min = 0;
		for (Map.Entry<String, Inty> entry : hash.entrySet())
		{
			Inty inty = entry.getValue();
			if (list.size() < howmany || inty.count > min)
			{
				list.add(new SortableKeyAndInty(entry.getKey(), inty));
				Collections.sort(list);
				if (list.size() > howmany)
					list.remove(howmany);
				min = list.get(list.size() - 1).inty.count;
			}
		}
		return list;
	}

	/** Starts a COLLATER thread that formats one report, and stores it in collaters[i]. */
	private static void report(String label, Map<String, Inty> hash, boolean shrink, int i)
	{
		MTNIOStats statsObject = new MTNIOStats(COLLATER);
		statsObject.label = label;
		statsObject.hash = hash;
		statsObject.shrink = shrink;
		statsObject.setName("Collater-" + i);
		statsObject.start();
		collaters[i] = statsObject;
	}

	/**
	 * Records a served request.
	 * <p>
	 * Bytes are always added to u_bytes. For article URIs only, it also counts the hit, the
	 * client and the referrer. The referrer is skipped when it is "-" or starts with
	 * "http://www.tbray.org/ongoing/".
	 *
	 * @param ref the referrer field including its surrounding double quotes
	 */
	private void record(String client, String u, int bytes, String ref)
	{
		increment(u_bytes, u, bytes);
		if (ARTICLE_PATTERN.matcher(u).matches())
		{
			increment(u_hits, u, 1);
			increment(clients, client, 1);
			if (ref.length() > 3 && !"\"-\"".equals(ref) && !ref.startsWith("\"http://www.tbray.org/ongoing/"))
				increment(refs, ref.substring(1, ref.length() - 1), 1); // strip both quotes only
		}
	}

	/** Adds amount to map's counter for key u, creating the counter at 0 if absent. */
	private static void increment(Map<String, Inty> map, String u, long amount)
	{
		Inty inty = map.get(u);
		if (inty == null)
		{
			inty = new Inty();
			map.put(u, inty);
		}
		inty.count += amount;
	}

	/** Mutable counter stored in the maps; a long so that byte totals cannot overflow. */
	static class Inty
	{
		long count = 0;
	}

	/** A key with its counter, ordered by descending count and then ascending key. */
	static class SortableKeyAndInty implements Comparable<SortableKeyAndInty>
	{
		final Inty inty;
		final String key;

		public SortableKeyAndInty(String key, Inty inty)
		{
			this.key = key;
			this.inty = inty;
		}

		public int compareTo(SortableKeyAndInty other)
		{
			// Explicit comparison rather than subtraction, which can overflow.
			if (inty.count != other.inty.count)
				return (inty.count > other.inty.count) ? -1 : 1;
			return key.compareTo(other.key);
		}
	}

	/** One block of the file handed to a worker. */
	static class DataToProcess
	{
		// Number of valid bytes at the start of byteBuffer.
		final long amountRead;
		final ByteBuffer byteBuffer;
		// Beginning of a line started in an earlier block; it precedes this block's bytes.
		final String leftoverFromPrevious;

		public DataToProcess(long amountRead, ByteBuffer byteBuffer, String leftoverFromPrevious)
		{
			this.amountRead = amountRead;
			this.byteBuffer = byteBuffer;
			this.leftoverFromPrevious = leftoverFromPrevious;
		}
	}

	// Number of fields split() extracts; scanning stops once this many have been found.
	private static final int SPLIT_FIELDS = 11;
	// Index of the field that may contain separators: it only ends at a separator once exactly
	// two double quotes have been seen on the line.
	private static final int QUOTED_FIELD = 6;

	public static final String[] EMPTY_STRING_ARRAY = new String[0];

	/**
	 * Splits a log line on separatorChar into exactly 11 fields. Scanning stops after the 11th,
	 * so any later text is ignored.
	 * <p>
	 * Adapted from Apache Commons Lang {@code StringUtils.split(String, char)}: runs of separators
	 * act as one and yield no empty fields. For speed it fills a fixed 11-entry array instead of a
	 * list. Unlike the original, field 6 is not ended by a separator until the line's second
	 * double quote has been seen. A quoted request such as {@code "GET /a b HTTP/1.1"} therefore
	 * keeps its URI (even with spaces) and protocol together in field 6, with {@code "GET} in
	 * field 5.
	 *
	 * @return the 11 fields; null if str is null; an empty array if fewer than 11 fields are found
	 */
	public static String[] split(String str, char separatorChar)
	{
		if (str == null)
			return null;
		int len = str.length();
		if (len == 0)
			return EMPTY_STRING_ARRAY;

		String[] result = new String[SPLIT_FIELDS];
		int i = 0, start = 0, splits = 0, countOfQuotes = 0;
		boolean match = false;
		while (i < len && splits < SPLIT_FIELDS)
		{
			char ch = str.charAt(i);
			if (ch == '"')
				countOfQuotes++;
			if (ch == separatorChar && (splits != QUOTED_FIELD || countOfQuotes == 2))
			{
				if (match)
				{
					result[splits++] = str.substring(start, i);
					match = false;
				}
				start = ++i;
				continue;
			}
			match = true;
			i++;
		}
		if (match)
			result[splits++] = str.substring(start, i);
		if (splits < SPLIT_FIELDS)
			return EMPTY_STRING_ARRAY;
		return result;
	}
}