package egb; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.PrintStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; public class MTNIOStats extends Thread { static final int HASH_SIZE = 100000; static int counted = 0; static final int MAIN = 0, WORKER = 1, COLLATER = 2, REPORTER = 3; static final int STRING_BUFFER_INITIAL_LENGTH = 512; static int NUMBER_OF_WORKERS; static int NIO_BLOCK_READ_SIZE; static MTNIOStats[] workers; static MTNIOStats[] collaters; Map u_hits = new HashMap(HASH_SIZE); Map u_bytes = new HashMap(HASH_SIZE); Map s404s = new HashMap(HASH_SIZE); Map clients = new HashMap(HASH_SIZE); Map refs = new HashMap(HASH_SIZE); BlockingQueue queue = new LinkedBlockingQueue(); int threadType = -1; // MAIN stuff: static String filename; // WORKER stuff: Pattern pattern = Pattern.compile("^/ongoing/When/\\d\\d\\dx/\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+$"); DataToProcess dataToProcess; // COLLATER stuff: ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream collatedReportPiece = new PrintStream(baos); String label; Map hash; boolean shrink; public MTNIOStats(int threadType) { this.threadType = threadType; } public static void main(String[] args) throws Exception { NUMBER_OF_WORKERS = Integer.parseInt(args[0]); NIO_BLOCK_READ_SIZE = Integer.parseInt(args[1]); filename = args[2]; workers = new MTNIOStats[NUMBER_OF_WORKERS]; for (int i=0;i 0 || workers[i].dataToProcess != null) Thread.sleep(100); } // Collate all worker hashes back together // TODO: Run this in 5 threads for (int i=0; i 60) ? keyAndInty.key.substring(0,59) + "..." 
: keyAndInty.key; Inty inty = keyAndInty.inty; if (shrink) collatedReportPiece.printf(fmt, new Object[]{ new Float(inty.count/(1024.0*1024)), pkey}); else collatedReportPiece.printf(fmt, new Object[]{ new Integer(inty.count), pkey}); } } public void run() { try { if (threadType == WORKER) doWorker(); else if (threadType == MAIN) doMain(); else if (threadType == COLLATER) doCollater(); } catch (Exception e) { System.out.println("Got exception:"+e); e.printStackTrace(); System.exit(1); } } private static List top(Map hash, int howmany) { ArrayList list = new ArrayList(); int min = 0; Iterator iterator = hash.keySet().iterator(); while (iterator.hasNext()) { String key = (String) iterator.next(); Inty inty = (Inty) hash.get(key); if (list.size() < howmany || inty.count > min) { list.add(new SortableKeyAndInty(key,inty)); Collections.sort(list); if (list.size() > howmany) { list.remove(howmany); min = ((SortableKeyAndInty)list.get(howmany-1)).inty.count; } else { min = ((SortableKeyAndInty)list.get(list.size()-1)).inty.count; } } } return list; } private static void report(String label, Map hash, boolean shrink, int i) { MTNIOStats statsObject = new MTNIOStats(COLLATER); statsObject.label = label; statsObject.hash = hash; statsObject.shrink = shrink; statsObject.setName("Collater-"+i); statsObject.start(); collaters[i] = statsObject; } private void record(String client, String u, int bytes, String ref) { increment(u_bytes, u, bytes); if (pattern.matcher(u).matches()) { increment(u_hits, u, 1); increment(clients, client, 1); if (ref.length() > 3 && !"\"-\"".equals(ref) && !ref.startsWith("\"http://www.tbray.org/ongoing/")) increment(refs, ref.substring(1, ref.length()-2), 1); } } private static void increment(Map map, String u, int amount) { Inty inty = (Inty) map.get(u); if (inty == null) { inty = new Inty(); map.put(u, inty); } inty.count += amount; } static class Inty { int count = 0; } static class SortableKeyAndInty implements Comparable { Inty inty; String key; 
/** Creates a pair holding {@code key} and its counter {@code inty} (no copies are made). */
        public SortableKeyAndInty(String key, Inty inty) {
            this.key = key;
            this.inty = inty;
        }

        /**
         * Orders by descending count, then by ascending key.
         *
         * <p>Counts are compared explicitly instead of subtracted. A counter that has
         * overflowed to a negative value (for example a large byte total) could otherwise
         * make the difference overflow and break the Comparable contract that
         * Collections.sort relies on.
         */
        public int compareTo(Object arg0) {
            SortableKeyAndInty other = (SortableKeyAndInty) arg0;
            int mine = inty.count;
            int theirs = other.inty.count;
            if (mine != theirs) {
                // The larger count sorts first.
                return (theirs < mine) ? -1 : 1;
            }
            return key.compareTo(other.key);
        }
    }

    /**
     * Unit of work for a worker thread, holding the three values passed to its constructor.
     * NOTE(review): presumably byteBuffer holds one block read from the log, amountRead
     * the number of bytes read into it, and leftoverFromPrevious the partial line carried
     * over from the previous block. Confirm against doMain/doWorker.
     */
    static class DataToProcess {
        long amountRead;
        ByteBuffer byteBuffer;
        String leftoverFromPrevious;

        public DataToProcess(long amountRead, ByteBuffer byteBuffer, String leftoverFromPrevious) {
            this.amountRead = amountRead;
            this.byteBuffer = byteBuffer;
            this.leftoverFromPrevious = leftoverFromPrevious;
        }
    }

    /** Shared zero-length result returned by split() for empty or short lines. */
    public static final String[] EMPTY_STRING_ARRAY = new String[0];

    /**
     * Splits one log line on {@code separatorChar} into exactly 11 fields.
     *
     * <p>Adapted from Apache Commons Lang's {@code StringUtils.split(String, char)}, with
     * these changes for speed and for this log format:
     * <ul>
     *   <li>a fixed 11-element array replaces the ArrayList, and scanning stops once 11
     *       fields have been collected (the rest of the line is ignored);</li>
     *   <li>while field index 6 is being collected, a separator ends the field only if
     *       exactly two double-quote characters have been seen in the line so far. That
     *       field can therefore contain separators (presumably the quoted request's URL
     *       and protocol; confirm against the log format).</li>
     * </ul>
     * As in Commons Lang, adjacent (and leading) separators are treated as one, so no
     * empty fields are produced.
     *
     * @param str           line to split; may be null
     * @param separatorChar field separator
     * @return null if {@code str} is null; {@link #EMPTY_STRING_ARRAY} if {@code str} is
     *         empty or yields fewer than 11 fields; otherwise the 11 fields
     */
    public static String[] split(String str, char separatorChar) {
        if (str == null) {
            return null;
        }
        int len = str.length();
        if (len == 0) {
            return EMPTY_STRING_ARRAY;
        }
        final int maxFields = 11;
        final int quotedFieldIndex = 6;
        String[] result = new String[maxFields];
        int i = 0, start = 0, splits = 0, countOfQuotes = 0;
        boolean match = false; // true while inside a non-empty field
        while (i < len && splits < maxFields) {
            char c = str.charAt(i);
            if (c == '"') {
                countOfQuotes++;
            }
            if (c == separatorChar && (splits != quotedFieldIndex || countOfQuotes == 2)) {
                if (match) {
                    result[splits++] = str.substring(start, i);
                    match = false;
                }
                start = ++i;
                continue;
            }
            match = true;
            i++;
        }
        if (match) {
            // The line ended inside a field; keep that final field.
            result[splits++] = str.substring(start, i);
        }
        if (splits < maxFields) {
            return EMPTY_STRING_ARRAY;
        }
        return result;
    }
}