mirror of
https://github.com/wiremod/advdupe2.git
synced 2025-03-04 03:03:05 -05:00
Better error detection and handling (#294)
This commit is contained in:
parent
bb4091e3b6
commit
6036cdfcdf
@ -6,34 +6,42 @@ net.Stream.SendSize = 20000 --This is the maximum size of each stream
|
||||
net.Stream.Timeout = 30 --How long the data should exist in the store without being used before being destroyed
|
||||
net.Stream.MaxServerReadStreams = 128 --The maximum number of keep-alives to have queued. This should prevent naughty players from flooding the network with keep-alive messages.
|
||||
net.Stream.MaxServerChunks = 3200 --Maximum number of pieces the stream can send to the server. 64 MB
|
||||
net.Stream.MaxTries = 3 --Maximum times the client may retry downloading the whole data
|
||||
net.Stream.MaxKeepalive = 15 --Maximum times the client may request data stay live
|
||||
|
||||
net.Stream.ReadStream = {}
|
||||
--Send the data sender a request for data
|
||||
function net.Stream.ReadStream:Request()
|
||||
|
||||
--print("Requesting",self.identifier,#self.data)
|
||||
if self.downloads == net.Stream.MaxTries * self.numchunks then self:Remove() return end
|
||||
self.downloads = self.downloads + 1
|
||||
-- print("Requesting",self.identifier,false,false,#self.chunks)
|
||||
|
||||
net.Start("NetStreamRequest")
|
||||
net.WriteUInt(self.identifier, 32)
|
||||
net.WriteBit(false)
|
||||
net.WriteBit(false)
|
||||
net.WriteUInt(#self.chunks, 32)
|
||||
if CLIENT then net.SendToServer() else net.Send(self.player) end
|
||||
|
||||
timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout, 1, function() self:Remove() end)
|
||||
timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout/2, 1, function() self:Request() end)
|
||||
|
||||
end
|
||||
|
||||
--Received data so process it
|
||||
function net.Stream.ReadStream:Read(len)
|
||||
|
||||
local size = math.floor(len / 8)
|
||||
--print("Got", size)
|
||||
function net.Stream.ReadStream:Read()
|
||||
|
||||
local size = net.ReadUInt(32)
|
||||
if size == 0 then self:Remove() return end
|
||||
|
||||
self.data[#self.data + 1] = net.ReadData(size)
|
||||
if #self.data == self.numchunks then
|
||||
self.returndata = table.concat(self.data)
|
||||
timer.Remove("NetStreamReadTimeout" .. self.identifier)
|
||||
|
||||
local crc = net.ReadString()
|
||||
local data = net.ReadData(size)
|
||||
if crc == util.CRC(data) then
|
||||
self.chunks[#self.chunks + 1] = data
|
||||
end
|
||||
if #self.chunks == self.numchunks then
|
||||
self.returndata = table.concat(self.chunks)
|
||||
if self.compressed then
|
||||
self.returndata = util.Decompress(self.returndata)
|
||||
end
|
||||
@ -46,7 +54,7 @@ end
|
||||
|
||||
--Gets the download progress
|
||||
function net.Stream.ReadStream:GetProgress()
|
||||
return #self.data/self.numchunks
|
||||
return #self.chunks/self.numchunks
|
||||
end
|
||||
|
||||
--Pop the queue and start the next task
|
||||
@ -80,28 +88,21 @@ net.Stream.WriteStream = {}
|
||||
|
||||
-- The player wants some data
|
||||
function net.Stream.WriteStream:Write(ply)
|
||||
local progress = self.progress[ply] or 0
|
||||
if progress < self.numchunks then
|
||||
self.progress[ply] = progress + 1
|
||||
|
||||
local progress = net.ReadUInt(32)
|
||||
local chunk = self.chunks[progress+1]
|
||||
if chunk then
|
||||
self.clients[ply].progress = progress
|
||||
net.Start("NetStreamDownload")
|
||||
|
||||
local start = math.min(progress * net.Stream.SendSize + 1, #self.data)
|
||||
local endpos = math.min(start + net.Stream.SendSize - 1, #self.data)
|
||||
local senddata = string.sub(self.data, start, endpos)
|
||||
|
||||
--print("Responding",#senddata,start,endpos)
|
||||
|
||||
net.WriteData(senddata, #senddata)
|
||||
|
||||
net.WriteUInt(#chunk.data, 32)
|
||||
net.WriteString(chunk.crc)
|
||||
net.WriteData(chunk.data, #chunk.data)
|
||||
if CLIENT then net.SendToServer() else net.Send(ply) end
|
||||
end
|
||||
end
|
||||
|
||||
-- The player notified us they finished downloading or cancelled
|
||||
function net.Stream.WriteStream:Finished(ply)
|
||||
self.progress[ply] = nil
|
||||
self.finished[ply] = true
|
||||
self.clients[ply].finished = true
|
||||
if self.callback then
|
||||
local ok, err = xpcall(self.callback, debug.traceback, ply)
|
||||
if not ok then ErrorNoHalt(err) end
|
||||
@ -110,28 +111,19 @@ end
|
||||
|
||||
-- Get player's download progress
|
||||
function net.Stream.WriteStream:GetProgress(ply)
|
||||
return (self.progress[ply] or 0) * net.Stream.SendSize / #self.data
|
||||
return self.clients[ply].progress / #self.chunks
|
||||
end
|
||||
|
||||
-- If the stream owner cancels it, notify everyone who is subscribed
|
||||
function net.Stream.WriteStream:Remove()
|
||||
if SERVER then
|
||||
local sendTo = {}
|
||||
for ply, _ in pairs(self.progress) do
|
||||
self.progress[ply] = nil
|
||||
self.finished[ply] = true
|
||||
if ply:IsValid() then sendTo[#sendTo+1] = ply end
|
||||
end
|
||||
net.Start("NetStreamDownload")
|
||||
net.Send(sendTo)
|
||||
else
|
||||
if self.progress[NULL] then
|
||||
self.progress[NULL] = nil
|
||||
self.finished[NULL] = true
|
||||
net.Start("NetStreamDownload")
|
||||
net.SendToServer()
|
||||
end
|
||||
local sendTo = {}
|
||||
for ply, client in pairs(self.clients) do
|
||||
client.finished = true
|
||||
if ply:IsValid() then sendTo[#sendTo+1] = ply end
|
||||
end
|
||||
net.Start("NetStreamDownload")
|
||||
net.WriteUInt(0, 32)
|
||||
if SERVER then net.Send(sendTo) else net.SendToServer() end
|
||||
net.Stream.WriteStreams[self.identifier] = nil
|
||||
end
|
||||
|
||||
@ -152,18 +144,26 @@ function net.WriteStream(data, callback, dontcompress)
|
||||
data = util.Compress(data) or ""
|
||||
end
|
||||
|
||||
local numchunks = math.ceil(#data / net.Stream.SendSize)
|
||||
if numchunks == 0 then
|
||||
if #data == 0 then
|
||||
net.WriteUInt(0, 32)
|
||||
return
|
||||
end
|
||||
|
||||
|
||||
local numchunks = math.ceil(#data / net.Stream.SendSize)
|
||||
if CLIENT and numchunks > net.Stream.MaxServerChunks then
|
||||
ErrorNoHalt("net.WriteStream request is too large! ", #data/1048576, "MiB")
|
||||
net.WriteUInt(0, 32)
|
||||
net.WriteUInt(0, 32)
|
||||
return
|
||||
end
|
||||
|
||||
local chunks = {}
|
||||
for i=1, numchunks do
|
||||
local datachunk = string.sub(data, (i - 1) * net.Stream.SendSize + 1, i * net.Stream.SendSize)
|
||||
chunks[i] = {
|
||||
data = datachunk,
|
||||
crc = util.CRC(datachunk),
|
||||
}
|
||||
end
|
||||
|
||||
local identifier = 1
|
||||
while net.Stream.WriteStreams[identifier] do
|
||||
@ -172,12 +172,18 @@ function net.WriteStream(data, callback, dontcompress)
|
||||
|
||||
local stream = {
|
||||
identifier = identifier,
|
||||
data = data,
|
||||
chunks = chunks,
|
||||
compressed = compressed,
|
||||
numchunks = numchunks,
|
||||
callback = callback,
|
||||
progress = {},
|
||||
finished = {}
|
||||
clients = setmetatable({},{__index = function(t,k)
|
||||
local r = {
|
||||
finished = false,
|
||||
downloads = 0,
|
||||
keepalives = 0,
|
||||
progress = 0,
|
||||
} t[k]=r return r
|
||||
end})
|
||||
}
|
||||
setmetatable(stream, net.Stream.WriteStream)
|
||||
|
||||
@ -209,6 +215,14 @@ function net.ReadStream(ply, callback)
|
||||
end
|
||||
|
||||
local queue = net.Stream.ReadStreamQueues[ply]
|
||||
if queue then
|
||||
if SERVER and #queue == net.Stream.MaxServerReadStreams then
|
||||
ErrorNoHalt("Receiving too many ReadStream requests from ", ply)
|
||||
return
|
||||
end
|
||||
else
|
||||
queue = {} net.Stream.ReadStreamQueues[ply] = queue
|
||||
end
|
||||
|
||||
local numchunks = net.ReadUInt(32)
|
||||
if numchunks == nil then
|
||||
@ -218,30 +232,31 @@ function net.ReadStream(ply, callback)
|
||||
if not ok then ErrorNoHalt(err) end
|
||||
return
|
||||
end
|
||||
local identifier = net.ReadUInt(32)
|
||||
local compressed = net.ReadBool()
|
||||
--print("Got info", numchunks, identifier)
|
||||
|
||||
if SERVER and queue and #queue == net.Stream.MaxServerReadStreams then
|
||||
ErrorNoHalt("Receiving too many ReadStream requests from ", ply)
|
||||
return
|
||||
end
|
||||
|
||||
if SERVER and numchunks > net.Stream.MaxServerChunks then
|
||||
ErrorNoHalt("ReadStream requests from ", ply, " is too large! ", numchunks * net.Stream.SendSize / 1048576, "MiB")
|
||||
return
|
||||
end
|
||||
|
||||
if not queue then queue = {} net.Stream.ReadStreamQueues[ply] = queue end
|
||||
local identifier = net.ReadUInt(32)
|
||||
local compressed = net.ReadBool()
|
||||
--print("Got info", numchunks, identifier, compressed)
|
||||
|
||||
for _, v in ipairs(queue) do
|
||||
if v.identifier == identifier then
|
||||
ErrorNoHalt("Tried to start a new ReadStream for an already existing stream!")
|
||||
return
|
||||
end
|
||||
end
|
||||
|
||||
local stream = {
|
||||
identifier = identifier,
|
||||
data = {},
|
||||
chunks = {},
|
||||
compressed = compressed,
|
||||
numchunks = numchunks,
|
||||
callback = callback,
|
||||
queue = queue,
|
||||
player = ply
|
||||
player = ply,
|
||||
downloads = 0
|
||||
}
|
||||
setmetatable(stream, net.Stream.ReadStream)
|
||||
|
||||
@ -251,7 +266,6 @@ function net.ReadStream(ply, callback)
|
||||
net.Start("NetStreamRequest")
|
||||
net.WriteUInt(identifier, 32)
|
||||
net.WriteBit(true)
|
||||
net.WriteBit(false)
|
||||
if CLIENT then net.SendToServer() else net.Send(ply) end
|
||||
end)
|
||||
else
|
||||
@ -272,23 +286,36 @@ end
|
||||
net.Receive("NetStreamRequest", function(len, ply)
|
||||
|
||||
local identifier = net.ReadUInt(32)
|
||||
local keepalive = net.ReadBit() == 1
|
||||
local completed = net.ReadBit() == 1
|
||||
local stream = net.Stream.WriteStreams[identifier]
|
||||
|
||||
ply = ply or NULL
|
||||
if stream and not stream.finished[ply] then
|
||||
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
|
||||
if stream then
|
||||
ply = ply or NULL
|
||||
local client = stream.clients[ply]
|
||||
|
||||
if not keepalive then
|
||||
if completed then
|
||||
stream:Finished(ply)
|
||||
if not client.finished then
|
||||
local keepalive = net.ReadBit() == 1
|
||||
if keepalive then
|
||||
if client.keepalives < net.Stream.MaxKeepalive then
|
||||
client.keepalives = client.keepalives + 1
|
||||
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
|
||||
end
|
||||
else
|
||||
stream:Write(ply)
|
||||
local completed = net.ReadBit() == 1
|
||||
if completed then
|
||||
stream:Finished(ply)
|
||||
else
|
||||
if client.downloads < net.Stream.MaxTries * #stream.chunks then
|
||||
client.downloads = client.downloads + 1
|
||||
stream:Write(ply)
|
||||
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
|
||||
else
|
||||
client.finished = true
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
end)
|
||||
|
||||
--Download the stream data
|
||||
@ -298,7 +325,7 @@ net.Receive("NetStreamDownload", function(len, ply)
|
||||
local queue = net.Stream.ReadStreamQueues[ply]
|
||||
if queue and queue[1] then
|
||||
|
||||
queue[1]:Read(len)
|
||||
queue[1]:Read()
|
||||
|
||||
end
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user