Skip to content

Commit

Permalink
introduce offset handling
Browse files Browse the repository at this point in the history
  • Loading branch information
LoW0lf committed Apr 16, 2015
1 parent 1c2795f commit 884163f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 25 deletions.
14 changes: 1 addition & 13 deletions src/HMB/Internal/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,16 @@ sendProduceResponse socket responsemessage = do
-- FetchRequest
-----------------

requestLog :: Request -> IO [Log]
requestLog req = mapM readLog [
(BS.unpack(topicName x), fromIntegral(rqFtPartitionNumber y))
| x <- rqFtTopics req, y <- partitions x
]

packPartitionsToFtRsPayload :: TopicName -> Partition -> IO RsFtPayload
packPartitionsToFtRsPayload t p = do
log <- readLog (BS.unpack $ t, fromIntegral $ rqFtPartitionNumber p)
log <- readLog (BS.unpack $ t, fromIntegral $ rqFtPartitionNumber p, fromIntegral $ rqFtFetchOffset p)
return $ RsFtPayload
0
0
0
(fromIntegral $ BL.length $ foldl (\acc ms -> BL.append acc (buildMessageSet ms)) BL.empty log)
log

--packLogToFtRsPayload :: Log -> [RsFtPayload]
--packLogToFtRsPayload log = packMsToFtRsPayload log

packLogToFtRs :: Topic -> IO Response
packLogToFtRs t = do
rss <- (mapM (packPartitionsToFtRsPayload $ topicName t) $ partitions t)
Expand All @@ -116,11 +107,8 @@ packLogToFtRs t = do

handleFetchRequest :: Request -> Socket -> IO()
handleFetchRequest req sock = do
logs <- requestLog req
rs <- mapM packLogToFtRs (rqFtTopics req)
let rsms = ResponseMessage 0 1 rs
print rsms
let msg = buildFtRsMessage rsms
SBL.sendAll sock msg
print $ "send resp:" ++ show msg

49 changes: 37 additions & 12 deletions src/HMB/Internal/Log/Writer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import qualified Data.ByteString.Lazy as BL
import Data.Binary.Put
import System.Directory
import Control.Conditional

import Control.Monad
import Kafka.Protocol

import Data.Binary.Get
Expand All @@ -25,22 +25,45 @@ logFile o = (show $ fromIntegral o) ++ ".log"
getPath :: String -> String -> String
getPath folder file = folder ++ "/" ++ file

buildLog :: Log -> BL.ByteString
buildLog [] = BL.empty
buildLog (x:xs) =
BL.append (buildMessageSet x) (buildLog xs)
buildLog :: Offset -> Log -> BL.ByteString
buildLog o [] = BL.empty
buildLog o (x:xs) =
(BL.append (buildLogEntry x o) (buildLog (o + 1) xs))

writeLog :: MessageInput -> IO()
writeLog (topicName, partitionNumber, log) = do
createDirectoryIfMissing False $ logFolder topicName partitionNumber
let filePath = getPath (logFolder topicName partitionNumber) (logFile 0)
ifM (doesFileExist filePath)
(BL.appendFile filePath $ buildLog log)
(BL.writeFile filePath $ buildLog log)
(appendToLog filePath (topicName,partitionNumber, log))
(newLog filePath (topicName,partitionNumber, log))

appendToLog :: String -> MessageInput -> IO()
appendToLog filepath (t, p, log) = do
o <- getMaxOffsetOfLog (t, p, log)
print o --TODO: is needed for preventing file lock ...
let l = buildLog (o + 1) log
BL.appendFile filepath l
return ()

newLog :: String -> MessageInput -> IO()
newLog filepath (t, p, log) = do
let l = buildLog 0 log
BL.writeFile filepath l
return ()

maxOffset :: [Offset] -> Offset
maxOffset [x] = x
maxOffset (x:xs) = max x (maxOffset xs)

getMaxOffsetOfLog :: MessageInput -> IO Offset
getMaxOffsetOfLog (t, p, _) = do
log <- readLogFromBeginning (t,p) --TODO: optimieren, dass nich gesamter log gelesen werden muss
return (maxOffset $ [ offset x | x <- log ])

-- todo: move to reader


-- todo: move to reader
getLog :: Get Log
getLog = do
empty <- isEmpty
Expand All @@ -55,9 +78,11 @@ parseLog a = do
input <- BL.readFile a
return (runGet getLog input)


readLog :: (String, Int) -> IO Log
readLog (t, p) = parseLog $
readLogFromBeginning :: (String, Int) -> IO Log
readLogFromBeginning (t, p) = parseLog $
getPath (logFolder t p) (logFile 0)


readLog :: (String, Int, Int) -> IO Log
readLog (t, p, o) = do
log <- readLogFromBeginning (t,p)
return ([ x | x <- log, fromIntegral(offset x) >= o])

0 comments on commit 884163f

Please sign in to comment.