1. outputStream.writeBytes(rawMessage.toString());
2. log.debug("Flushing stream, size = " + s.getOutputStream().size());
s.getOutputStream().sync();
log.debug("Flushed stream, size = " + s.getOutputStream().size());
or
log.debug("Flushing stream, size = " + s.getOutputStream().size());
s.getOutputStream().flush();
log.debug("Flushed stream, size = " + s.getOutputStream().size());
In both cases, size() reports the same value before and after the call.
This is using hadoop-0.20.0.
2009-05-12 12:42:17,470 DEBUG [Thread-7] (FSStreamManager.java:28) hdfs.HdfsQueueConsumer: Thread 19 getting an output stream
2009-05-12 12:42:17,470 DEBUG [Thread-7] (FSStreamManager.java:49) hdfs.HdfsQueueConsumer: Re-using existing stream
2009-05-12 12:42:17,472 DEBUG [Thread-7] (FSStreamManager.java:63) hdfs.HdfsQueueConsumer: Flushing stream, size = 1986
2009-05-12 12:42:17,472 DEBUG [Thread-7] (DFSClient.java:3013) hdfs.DFSClient: DFSClient flush() : saveOffset 1613 bytesCurBlock 1986 lastFlushOffset 1731
2009-05-12 12:42:17,472 DEBUG [Thread-7] (FSStreamManager.java:66) hdfs.HdfsQueueConsumer: Flushed stream, size = 1986
2009-05-12 12:42:19,586 DEBUG [Thread-7] (HdfsQueueConsumer.java:39) hdfs.HdfsQueueConsumer: Consumer writing event
2009-05-12 12:42:19,587 DEBUG [Thread-7] (FSStreamManager.java:28) hdfs.HdfsQueueConsumer: Thread 19 getting an output stream
2009-05-12 12:42:19,588 DEBUG [Thread-7] (FSStreamManager.java:49) hdfs.HdfsQueueConsumer: Re-using existing stream
2009-05-12 12:42:19,589 DEBUG [Thread-7] (FSStreamManager.java:63) hdfs.HdfsQueueConsumer: Flushing stream, size = 2235
2009-05-12 12:42:19,589 DEBUG [Thread-7] (DFSClient.java:3013) hdfs.DFSClient: DFSClient flush() : saveOffset 2125 bytesCurBlock 2235 lastFlushOffset 1986
2009-05-12 12:42:19,590 DEBUG [Thread-7] (FSStreamManager.java:66) hdfs.HdfsQueueConsumer: Flushed stream, size = 2235
So although the offsets are advancing as expected, the output stream doesn't appear to be flushed or cleared out, and the data isn't being written to the file...
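One caveat on reading the size() values above: FSDataOutputStream extends java.io.DataOutputStream, and DataOutputStream.size() is documented as a running count of the bytes written so far, not the number of bytes still buffered. If that is what is being logged here, an unchanged size() after flush() or sync() would be expected and does not by itself show that the flush failed. Below is a minimal sketch using only java.io. The ByteArrayOutputStream is a stand-in for the HDFS stream, and the class and method names are made up for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SizeAfterFlush {

    // Returns {size() before flush, size() after flush, bytes that reached the sink}.
    // The in-memory sink is an assumption standing in for the real HDFS stream.
    static int[] sizesAroundFlush(String message) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(sink);

        out.writeBytes(message);   // same call as in the snippet above
        int before = out.size();   // bytes written so far
        out.flush();
        int after = out.size();    // still bytes written so far; flush() does not reset it

        return new int[] { before, after, sink.size() };
    }

    public static void main(String[] args) throws IOException {
        int[] r = sizesAroundFlush("hello hdfs");
        System.out.println("before=" + r[0] + " after=" + r[1] + " sink=" + r[2]);
    }
}
```

With plain java.io the counter is identical before and after flush(), even though every byte has already reached the sink. Whether the data is actually visible in HDFS would need to be checked some other way, for example by reading the file back from a second client.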
I will now investigate using HBase as the container for all of this information. It adds a little more overhead, but it still uses Hadoop/HDFS as the underlying storage engine while handling lots of concurrent writes (inserts, in HBase terms).