mirror of
https://github.com/wiremod/advdupe2.git
synced 2025-03-04 03:03:05 -05:00
Rewrote netstream (#472)
* Rewrote netstream * Only add timer when queue empty * Only update activitytimeout if queue is empty too * Move activity timeout reset into successful write * Move net read before return
This commit is contained in:
parent
238d02d289
commit
b9984fcea1
@ -3,153 +3,301 @@
|
|||||||
AddCSLuaFile()
|
AddCSLuaFile()
|
||||||
|
|
||||||
net.Stream = {}
|
net.Stream = {}
|
||||||
net.Stream.ReadStreamQueues = {} --This holds a read stream for each player, or one read stream for the server if running on the CLIENT
|
net.Stream.SendSize = 20000 --This is the size of each packet to send
|
||||||
net.Stream.WriteStreams = {} --This holds the write streams
|
net.Stream.Timeout = 10 --How long to wait for client response before cleaning up
|
||||||
net.Stream.SendSize = 20000 --This is the maximum size of each stream to send
|
net.Stream.MaxWriteStreams = 1024 --The maximum number of write data items to store
|
||||||
net.Stream.Timeout = 30 --How long the data should exist in the store without being used before being destroyed
|
net.Stream.MaxReadStreams = 128 --The maximum number of queued read data items to store
|
||||||
net.Stream.MaxServerReadStreams = 128 --The maximum number of keep-alives to have queued. This should prevent naughty players from flooding the network with keep-alive messages.
|
net.Stream.MaxChunks = 3200 --Maximum number of pieces the stream can send to the server. 64 MB
|
||||||
net.Stream.MaxServerChunks = 3200 --Maximum number of pieces the stream can send to the server. 64 MB
|
net.Stream.MaxSize = net.Stream.SendSize*net.Stream.MaxChunks
|
||||||
net.Stream.MaxTries = 3 --Maximum times the client may retry downloading the whole data
|
net.Stream.MaxTries = 3 --Maximum times the client may retry downloading the whole data
|
||||||
net.Stream.MaxKeepalive = 15 --Maximum times the client may request data stay live
|
|
||||||
|
|
||||||
net.Stream.ReadStream = {}
|
local WriteStreamQueue = {
|
||||||
--Send the data sender a request for data
|
__index = {
|
||||||
function net.Stream.ReadStream:Request()
|
Add = function(self, stream)
|
||||||
if self.downloads == net.Stream.MaxTries * self.numchunks then self:Remove() return end
|
local identifier = self.curidentifier
|
||||||
self.downloads = self.downloads + 1
|
local startid = identifier
|
||||||
-- print("Requesting",self.identifier,false,false,#self.chunks)
|
while self.queue[identifier] do
|
||||||
|
identifier = identifier % net.Stream.MaxWriteStreams + 1
|
||||||
|
if identifier == startid then
|
||||||
|
ErrorNoHalt("Netstream is full of WriteStreams!")
|
||||||
|
net.WriteUInt(0, 32)
|
||||||
|
return
|
||||||
|
end
|
||||||
|
end
|
||||||
|
self.curidentifier = identifier % net.Stream.MaxWriteStreams + 1
|
||||||
|
|
||||||
net.Start("NetStreamRequest")
|
if next(self.queue)==nil then
|
||||||
net.WriteUInt(self.identifier, 32)
|
self.activitytimeout = CurTime()+net.Stream.Timeout
|
||||||
net.WriteBit(false)
|
timer.Create("netstream_queueclean", 5, 0, function() self:Clean() end)
|
||||||
net.WriteBit(false)
|
end
|
||||||
net.WriteUInt(#self.chunks, 32)
|
self.queue[identifier] = stream
|
||||||
if CLIENT then net.SendToServer() else net.Send(self.player) end
|
stream.identifier = identifier
|
||||||
|
return stream
|
||||||
|
end,
|
||||||
|
|
||||||
timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout/2, 1, function() self:Request() end)
|
Write = function(self, ply)
|
||||||
|
local identifier = net.ReadUInt(32)
|
||||||
|
local chunkidx = net.ReadUInt(32)
|
||||||
|
local stream = self.queue[identifier]
|
||||||
|
--print("Got request", identifier, chunkidx, stream)
|
||||||
|
if stream then
|
||||||
|
if stream:Write(ply, chunkidx) then
|
||||||
|
self.activitytimeout = CurTime()+net.Stream.Timeout
|
||||||
|
stream.timeout = CurTime()+net.Stream.Timeout
|
||||||
|
end
|
||||||
|
else
|
||||||
|
-- Tell them the stream doesn't exist
|
||||||
|
net.Start("NetStreamRead")
|
||||||
|
net.WriteUInt(identifier, 32)
|
||||||
|
net.WriteUInt(0, 32)
|
||||||
|
if SERVER then net.Send(ply) else net.SendToServer() end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
end
|
Clean = function(self)
|
||||||
|
local t = CurTime()
|
||||||
--Received data so process it
|
for k, stream in pairs(self.queue) do
|
||||||
function net.Stream.ReadStream:Read(size)
|
if (next(stream.clients)~=nil and t >= stream.timeout) or t >= self.activitytimeout then
|
||||||
timer.Remove("NetStreamReadTimeout" .. self.identifier)
|
stream:Remove()
|
||||||
|
self.queue[k] = nil
|
||||||
local progress = net.ReadUInt(32)
|
end
|
||||||
if self.chunks[progress] then return end
|
end
|
||||||
|
if next(self.queue)==nil then
|
||||||
local crc = net.ReadString()
|
timer.Remove("netstream_queueclean")
|
||||||
local data = net.ReadData(size)
|
end
|
||||||
|
end,
|
||||||
if crc == util.CRC(data) then
|
},
|
||||||
self.chunks[progress] = data
|
__call = function(t)
|
||||||
end
|
return setmetatable({
|
||||||
if #self.chunks == self.numchunks then
|
activitytimeout = CurTime()+net.Stream.Timeout,
|
||||||
self.returndata = table.concat(self.chunks)
|
curidentifier = 1,
|
||||||
if self.compressed then
|
queue = {}
|
||||||
self.returndata = util.Decompress(self.returndata)
|
}, t)
|
||||||
end
|
|
||||||
self:Remove()
|
|
||||||
else
|
|
||||||
self:Request()
|
|
||||||
end
|
end
|
||||||
|
}
|
||||||
|
setmetatable(WriteStreamQueue, WriteStreamQueue)
|
||||||
|
net.Stream.WriteStreams = WriteStreamQueue()
|
||||||
|
|
||||||
end
|
local ReadStreamQueue = {
|
||||||
|
__index = {
|
||||||
|
Add = function(self, stream)
|
||||||
|
local queue = self.queues[stream.player]
|
||||||
|
|
||||||
--Gets the download progress
|
if #queue == net.Stream.MaxReadStreams then
|
||||||
function net.Stream.ReadStream:GetProgress()
|
ErrorNoHalt("Receiving too many ReadStream requests!")
|
||||||
return #self.chunks/self.numchunks
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
|
for _, v in ipairs(queue) do
|
||||||
|
if v.identifier == stream.identifier then
|
||||||
|
ErrorNoHalt("Tried to start a new ReadStream for an already existing stream!")
|
||||||
|
return
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
--Pop the queue and start the next task
|
queue[#queue+1] = stream
|
||||||
function net.Stream.ReadStream:Remove()
|
if #queue == 1 then
|
||||||
|
stream:Request()
|
||||||
|
end
|
||||||
|
return stream
|
||||||
|
end,
|
||||||
|
|
||||||
local ok, err = xpcall(self.callback, debug.traceback, self.returndata)
|
Remove = function(self, stream)
|
||||||
if not ok then ErrorNoHalt(err) end
|
local queue = rawget(self.queues, stream.player)
|
||||||
|
if queue then
|
||||||
|
if stream == queue[1] then
|
||||||
|
table.remove(queue, 1)
|
||||||
|
local nextInQueue = queue[1]
|
||||||
|
if nextInQueue then
|
||||||
|
nextInQueue:Request()
|
||||||
|
else
|
||||||
|
self.queues[stream.player] = nil
|
||||||
|
end
|
||||||
|
else
|
||||||
|
for k, v in ipairs(queue) do
|
||||||
|
if v == stream then
|
||||||
|
table.remove(queue, k)
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
net.Start("NetStreamRequest")
|
Read = function(self, ply)
|
||||||
net.WriteUInt(self.identifier, 32)
|
local identifier = net.ReadUInt(32)
|
||||||
net.WriteBit(false)
|
local queue = rawget(self.queues, ply)
|
||||||
net.WriteBit(true)
|
if queue and queue[1] then
|
||||||
if CLIENT then net.SendToServer() else net.Send(self.player) end
|
queue[1]:Read(identifier)
|
||||||
|
|
||||||
timer.Remove("NetStreamReadTimeout" .. self.identifier)
|
|
||||||
timer.Remove("NetStreamKeepAlive" .. self.identifier)
|
|
||||||
|
|
||||||
if self == self.queue[1] then
|
|
||||||
table.remove(self.queue, 1)
|
|
||||||
local nextInQueue = self.queue[1]
|
|
||||||
if nextInQueue then
|
|
||||||
timer.Remove("NetStreamKeepAlive" .. nextInQueue.identifier)
|
|
||||||
nextInQueue:Request()
|
|
||||||
else
|
|
||||||
net.Stream.ReadStreamQueues[self.player] = nil
|
|
||||||
end
|
|
||||||
else
|
|
||||||
for k, v in ipairs(self.queue) do
|
|
||||||
if v == self then
|
|
||||||
table.remove(self.queue, k)
|
|
||||||
break
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
},
|
||||||
|
__call = function(t)
|
||||||
|
return setmetatable({
|
||||||
|
queues = setmetatable({}, {__index = function(t,k) local r={} t[k]=r return r end})
|
||||||
|
}, t)
|
||||||
end
|
end
|
||||||
end
|
}
|
||||||
|
setmetatable(ReadStreamQueue, ReadStreamQueue)
|
||||||
|
net.Stream.ReadStreams = ReadStreamQueue()
|
||||||
|
|
||||||
net.Stream.ReadStream.__index = net.Stream.ReadStream
|
|
||||||
|
|
||||||
net.Stream.WriteStream = {}
|
local WritingDataItem = {
|
||||||
|
__index = {
|
||||||
|
Write = function(self, ply, chunkidx)
|
||||||
|
local client = self.clients[ply]
|
||||||
|
if client.finished then return false end
|
||||||
|
if chunkidx == #self.chunks+1 then self:Finished(ply) return true end
|
||||||
|
|
||||||
-- The player wants some data
|
if client.downloads+#self.chunks-client.progress >= net.Stream.MaxTries * #self.chunks then self:Finished(ply) return false end
|
||||||
function net.Stream.WriteStream:Write(ply)
|
client.downloads = client.downloads + 1
|
||||||
local progress = net.ReadUInt(32)+1
|
|
||||||
local chunk = self.chunks[progress]
|
|
||||||
if chunk then
|
|
||||||
self.clients[ply].progress = progress
|
|
||||||
net.Start("NetStreamDownload")
|
|
||||||
net.WriteUInt(#chunk.data, 32)
|
|
||||||
net.WriteUInt(progress, 32)
|
|
||||||
net.WriteString(chunk.crc)
|
|
||||||
net.WriteData(chunk.data, #chunk.data)
|
|
||||||
if CLIENT then net.SendToServer() else net.Send(ply) end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
-- The player notified us they finished downloading or cancelled
|
local chunk = self.chunks[chunkidx]
|
||||||
function net.Stream.WriteStream:Finished(ply)
|
if not chunk then return false end
|
||||||
self.clients[ply].finished = true
|
|
||||||
if self.callback then
|
|
||||||
local ok, err = xpcall(self.callback, debug.traceback, ply)
|
|
||||||
if not ok then ErrorNoHalt(err) end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
-- Get player's download progress
|
client.progress = chunkidx
|
||||||
function net.Stream.WriteStream:GetProgress(ply)
|
|
||||||
return self.clients[ply].progress / #self.chunks
|
|
||||||
end
|
|
||||||
|
|
||||||
-- If the stream owner cancels it, notify everyone who is subscribed
|
--print("Sending", "NetStreamRead", self.identifier, #chunk.data, chunkidx, chunk.crc)
|
||||||
function net.Stream.WriteStream:Remove()
|
net.Start("NetStreamRead")
|
||||||
local sendTo = {}
|
net.WriteUInt(self.identifier, 32)
|
||||||
for ply, client in pairs(self.clients) do
|
net.WriteUInt(#chunk.data, 32)
|
||||||
if not client.finished then
|
net.WriteUInt(chunkidx, 32)
|
||||||
client.finished = true
|
net.WriteString(chunk.crc)
|
||||||
if ply:IsValid() then sendTo[#sendTo+1] = ply end
|
net.WriteData(chunk.data, #chunk.data)
|
||||||
|
if CLIENT then net.SendToServer() else net.Send(ply) end
|
||||||
|
return true
|
||||||
|
end,
|
||||||
|
|
||||||
|
Finished = function(self, ply)
|
||||||
|
self.clients[ply].finished = true
|
||||||
|
if self.callback then
|
||||||
|
local ok, err = xpcall(self.callback, debug.traceback, ply)
|
||||||
|
if not ok then ErrorNoHalt(err) end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
GetProgress = function(self, ply)
|
||||||
|
return self.clients[ply].progress / #self.chunks
|
||||||
|
end,
|
||||||
|
|
||||||
|
Remove = function(self)
|
||||||
|
local sendTo = {}
|
||||||
|
for ply, client in pairs(self.clients) do
|
||||||
|
if not client.finished then
|
||||||
|
client.finished = true
|
||||||
|
if CLIENT or ply:IsValid() then sendTo[#sendTo+1] = ply end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
if next(sendTo)~=nil then
|
||||||
|
--print("Sending", "NetStreamRead", self.identifier, 0)
|
||||||
|
net.Start("NetStreamRead")
|
||||||
|
net.WriteUInt(self.identifier, 32)
|
||||||
|
net.WriteUInt(0, 32)
|
||||||
|
if SERVER then net.Send(sendTo) else net.SendToServer() end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
},
|
||||||
|
__call = function(t, data, callback)
|
||||||
|
local chunks = {}
|
||||||
|
for i=1, math.ceil(#data / net.Stream.SendSize) do
|
||||||
|
local datachunk = string.sub(data, (i - 1) * net.Stream.SendSize + 1, i * net.Stream.SendSize)
|
||||||
|
chunks[i] = { data = datachunk, crc = util.CRC(datachunk) }
|
||||||
|
end
|
||||||
|
|
||||||
|
return setmetatable({
|
||||||
|
timeout = CurTime()+net.Stream.Timeout,
|
||||||
|
chunks = chunks,
|
||||||
|
callback = callback,
|
||||||
|
lasttouched = 0,
|
||||||
|
clients = setmetatable({},{__index = function(t,k)
|
||||||
|
local r = {
|
||||||
|
finished = false,
|
||||||
|
downloads = 0,
|
||||||
|
progress = 0,
|
||||||
|
} t[k]=r return r
|
||||||
|
end})
|
||||||
|
}, t)
|
||||||
end
|
end
|
||||||
|
}
|
||||||
|
setmetatable(WritingDataItem, WritingDataItem)
|
||||||
|
|
||||||
net.Start("NetStreamDownload")
|
local ReadingDataItem = {
|
||||||
net.WriteUInt(0, 32)
|
__index = {
|
||||||
net.WriteUInt(self.identifier, 32)
|
Request = function(self)
|
||||||
if SERVER then net.Send(sendTo) else net.SendToServer() end
|
if self.downloads+self.numchunks-#self.chunks >= net.Stream.MaxTries*self.numchunks then self:Remove() return end
|
||||||
net.Stream.WriteStreams[self.identifier] = nil
|
self.downloads = self.downloads + 1
|
||||||
end
|
timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout*0.5, 1, function() self:Request() end)
|
||||||
|
self:WriteRequest()
|
||||||
|
end,
|
||||||
|
|
||||||
|
WriteRequest = function(self)
|
||||||
|
--print("Requesting", self.identifier, #self.chunks)
|
||||||
|
net.Start("NetStreamWrite")
|
||||||
|
net.WriteUInt(self.identifier, 32)
|
||||||
|
net.WriteUInt(#self.chunks+1, 32)
|
||||||
|
if CLIENT then net.SendToServer() else net.Send(self.player) end
|
||||||
|
end,
|
||||||
|
|
||||||
|
Read = function(self, identifier)
|
||||||
|
if self.identifier ~= identifier then self:Request() return end
|
||||||
|
|
||||||
|
local size = net.ReadUInt(32)
|
||||||
|
if size == 0 then self:Remove() return end
|
||||||
|
|
||||||
|
local chunkidx = net.ReadUInt(32)
|
||||||
|
if chunkidx ~= #self.chunks+1 then self:Request() return end
|
||||||
|
|
||||||
|
local crc = net.ReadString()
|
||||||
|
local data = net.ReadData(size)
|
||||||
|
|
||||||
|
if crc ~= util.CRC(data) then self:Request() return end
|
||||||
|
|
||||||
|
self.chunks[chunkidx] = data
|
||||||
|
if #self.chunks == self.numchunks then self:Remove(true) return end
|
||||||
|
|
||||||
|
self:Request()
|
||||||
|
end,
|
||||||
|
|
||||||
|
GetProgress = function(self)
|
||||||
|
return #self.chunks/self.numchunks
|
||||||
|
end,
|
||||||
|
|
||||||
|
Remove = function(self, finished)
|
||||||
|
timer.Remove("NetStreamReadTimeout" .. self.identifier)
|
||||||
|
|
||||||
|
local data
|
||||||
|
if finished then
|
||||||
|
data = table.concat(self.chunks)
|
||||||
|
if self.compressed then
|
||||||
|
data = util.Decompress(data, net.Stream.MaxSize)
|
||||||
|
end
|
||||||
|
self:WriteRequest() -- Notify we finished
|
||||||
|
end
|
||||||
|
|
||||||
|
local ok, err = xpcall(self.callback, debug.traceback, data)
|
||||||
|
if not ok then ErrorNoHalt(err) end
|
||||||
|
|
||||||
|
net.Stream.ReadStreams:Remove(self)
|
||||||
|
end
|
||||||
|
},
|
||||||
|
__call = function(t, ply, callback, numchunks, identifier, compressed)
|
||||||
|
return setmetatable({
|
||||||
|
identifier = identifier,
|
||||||
|
chunks = {},
|
||||||
|
compressed = compressed,
|
||||||
|
numchunks = numchunks,
|
||||||
|
callback = callback,
|
||||||
|
player = ply,
|
||||||
|
downloads = 0
|
||||||
|
}, t)
|
||||||
|
end
|
||||||
|
}
|
||||||
|
setmetatable(ReadingDataItem, ReadingDataItem)
|
||||||
|
|
||||||
net.Stream.WriteStream.__index = net.Stream.WriteStream
|
|
||||||
|
|
||||||
--Store the data and write the file info so receivers can request it.
|
|
||||||
local identifier = 1
|
|
||||||
function net.WriteStream(data, callback, dontcompress)
|
function net.WriteStream(data, callback, dontcompress)
|
||||||
|
|
||||||
if not isstring(data) then
|
if not isstring(data) then
|
||||||
error("bad argument #1 to 'WriteStream' (string expected, got " .. type(data) .. ")", 2)
|
error("bad argument #1 to 'WriteStream' (string expected, got " .. type(data) .. ")", 2)
|
||||||
end
|
end
|
||||||
@ -167,54 +315,18 @@ function net.WriteStream(data, callback, dontcompress)
|
|||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
local numchunks = math.ceil(#data / net.Stream.SendSize)
|
if #data > net.Stream.MaxSize then
|
||||||
if CLIENT and numchunks > net.Stream.MaxServerChunks then
|
|
||||||
ErrorNoHalt("net.WriteStream request is too large! ", #data/1048576, "MiB")
|
ErrorNoHalt("net.WriteStream request is too large! ", #data/1048576, "MiB")
|
||||||
net.WriteUInt(0, 32)
|
net.WriteUInt(0, 32)
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
local chunks = {}
|
local stream = net.Stream.WriteStreams:Add(WritingDataItem(data, callback, compressed))
|
||||||
for i=1, numchunks do
|
if not stream then return end
|
||||||
local datachunk = string.sub(data, (i - 1) * net.Stream.SendSize + 1, i * net.Stream.SendSize)
|
|
||||||
chunks[i] = {
|
|
||||||
data = datachunk,
|
|
||||||
crc = util.CRC(datachunk),
|
|
||||||
}
|
|
||||||
end
|
|
||||||
|
|
||||||
local startid = identifier
|
--print("WriteStream", #stream.chunks, stream.identifier, compressed)
|
||||||
while net.Stream.WriteStreams[identifier] do
|
net.WriteUInt(#stream.chunks, 32)
|
||||||
identifier = identifier % 1024 + 1
|
net.WriteUInt(stream.identifier, 32)
|
||||||
if identifier == startid then
|
|
||||||
ErrorNoHalt("Netstream is full of WriteStreams!")
|
|
||||||
net.WriteUInt(0, 32)
|
|
||||||
return
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
local stream = {
|
|
||||||
identifier = identifier,
|
|
||||||
chunks = chunks,
|
|
||||||
compressed = compressed,
|
|
||||||
numchunks = numchunks,
|
|
||||||
callback = callback,
|
|
||||||
clients = setmetatable({},{__index = function(t,k)
|
|
||||||
local r = {
|
|
||||||
finished = false,
|
|
||||||
downloads = 0,
|
|
||||||
keepalives = 0,
|
|
||||||
progress = 0,
|
|
||||||
} t[k]=r return r
|
|
||||||
end})
|
|
||||||
}
|
|
||||||
setmetatable(stream, net.Stream.WriteStream)
|
|
||||||
|
|
||||||
net.Stream.WriteStreams[identifier] = stream
|
|
||||||
timer.Create("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1, function() stream:Remove() end)
|
|
||||||
|
|
||||||
net.WriteUInt(numchunks, 32)
|
|
||||||
net.WriteUInt(identifier, 32)
|
|
||||||
net.WriteBool(compressed)
|
net.WriteBool(compressed)
|
||||||
|
|
||||||
return stream
|
return stream
|
||||||
@ -223,7 +335,6 @@ end
|
|||||||
--If the receiver is a player then add it to a queue.
|
--If the receiver is a player then add it to a queue.
|
||||||
--If the receiver is the server then add it to a queue for each individual player
|
--If the receiver is the server then add it to a queue for each individual player
|
||||||
function net.ReadStream(ply, callback)
|
function net.ReadStream(ply, callback)
|
||||||
|
|
||||||
if CLIENT then
|
if CLIENT then
|
||||||
ply = NULL
|
ply = NULL
|
||||||
else
|
else
|
||||||
@ -236,17 +347,7 @@ function net.ReadStream(ply, callback)
|
|||||||
if not isfunction(callback) then
|
if not isfunction(callback) then
|
||||||
error("bad argument #2 to 'ReadStream' (function expected, got " .. type(callback) .. ")", 2)
|
error("bad argument #2 to 'ReadStream' (function expected, got " .. type(callback) .. ")", 2)
|
||||||
end
|
end
|
||||||
|
|
||||||
local queue = net.Stream.ReadStreamQueues[ply]
|
|
||||||
if queue then
|
|
||||||
if SERVER and #queue == net.Stream.MaxServerReadStreams then
|
|
||||||
ErrorNoHalt("Receiving too many ReadStream requests from ", ply)
|
|
||||||
return
|
|
||||||
end
|
|
||||||
else
|
|
||||||
queue = {} net.Stream.ReadStreamQueues[ply] = queue
|
|
||||||
end
|
|
||||||
|
|
||||||
local numchunks = net.ReadUInt(32)
|
local numchunks = net.ReadUInt(32)
|
||||||
if numchunks == nil then
|
if numchunks == nil then
|
||||||
return
|
return
|
||||||
@ -255,110 +356,31 @@ function net.ReadStream(ply, callback)
|
|||||||
if not ok then ErrorNoHalt(err) end
|
if not ok then ErrorNoHalt(err) end
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
if SERVER and numchunks > net.Stream.MaxServerChunks then
|
|
||||||
|
local identifier = net.ReadUInt(32)
|
||||||
|
local compressed = net.ReadBool()
|
||||||
|
|
||||||
|
if numchunks > net.Stream.MaxChunks then
|
||||||
ErrorNoHalt("ReadStream requests from ", ply, " is too large! ", numchunks * net.Stream.SendSize / 1048576, "MiB")
|
ErrorNoHalt("ReadStream requests from ", ply, " is too large! ", numchunks * net.Stream.SendSize / 1048576, "MiB")
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
local identifier = net.ReadUInt(32)
|
--print("ReadStream", numchunks, identifier, compressed)
|
||||||
local compressed = net.ReadBool()
|
|
||||||
--print("Got info", numchunks, identifier, compressed)
|
|
||||||
|
|
||||||
for _, v in ipairs(queue) do
|
return net.Stream.ReadStreams:Add(ReadingDataItem(ply, callback, numchunks, identifier, compressed))
|
||||||
if v.identifier == identifier then
|
|
||||||
ErrorNoHalt("Tried to start a new ReadStream for an already existing stream!")
|
|
||||||
return
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
local stream = {
|
|
||||||
identifier = identifier,
|
|
||||||
chunks = {},
|
|
||||||
compressed = compressed,
|
|
||||||
numchunks = numchunks,
|
|
||||||
callback = callback,
|
|
||||||
queue = queue,
|
|
||||||
player = ply,
|
|
||||||
downloads = 0
|
|
||||||
}
|
|
||||||
setmetatable(stream, net.Stream.ReadStream)
|
|
||||||
|
|
||||||
queue[#queue + 1] = stream
|
|
||||||
if #queue > 1 then
|
|
||||||
timer.Create("NetStreamKeepAlive" .. identifier, net.Stream.Timeout / 2, 0, function()
|
|
||||||
net.Start("NetStreamRequest")
|
|
||||||
net.WriteUInt(identifier, 32)
|
|
||||||
net.WriteBit(true)
|
|
||||||
if CLIENT then net.SendToServer() else net.Send(ply) end
|
|
||||||
end)
|
|
||||||
else
|
|
||||||
stream:Request()
|
|
||||||
end
|
|
||||||
|
|
||||||
return stream
|
|
||||||
end
|
end
|
||||||
|
|
||||||
if SERVER then
|
if SERVER then
|
||||||
|
util.AddNetworkString("NetStreamWrite")
|
||||||
util.AddNetworkString("NetStreamRequest")
|
util.AddNetworkString("NetStreamRead")
|
||||||
util.AddNetworkString("NetStreamDownload")
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
--Stream data is requested
|
--Send requested stream data
|
||||||
net.Receive("NetStreamRequest", function(len, ply)
|
net.Receive("NetStreamWrite", function(len, ply)
|
||||||
|
net.Stream.WriteStreams:Write(ply or NULL)
|
||||||
local identifier = net.ReadUInt(32)
|
|
||||||
local stream = net.Stream.WriteStreams[identifier]
|
|
||||||
|
|
||||||
if stream then
|
|
||||||
ply = ply or NULL
|
|
||||||
local client = stream.clients[ply]
|
|
||||||
|
|
||||||
if not client.finished then
|
|
||||||
local keepalive = net.ReadBit() == 1
|
|
||||||
if keepalive then
|
|
||||||
if client.keepalives < net.Stream.MaxKeepalive then
|
|
||||||
client.keepalives = client.keepalives + 1
|
|
||||||
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
local completed = net.ReadBit() == 1
|
|
||||||
if completed then
|
|
||||||
stream:Finished(ply)
|
|
||||||
else
|
|
||||||
if client.downloads < net.Stream.MaxTries * #stream.chunks then
|
|
||||||
client.downloads = client.downloads + 1
|
|
||||||
stream:Write(ply)
|
|
||||||
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
|
|
||||||
else
|
|
||||||
client.finished = true
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
end)
|
end)
|
||||||
|
|
||||||
--Download the stream data
|
--Download the sent stream data
|
||||||
net.Receive("NetStreamDownload", function(len, ply)
|
net.Receive("NetStreamRead", function(len, ply)
|
||||||
|
net.Stream.ReadStreams:Read(ply or NULL)
|
||||||
ply = ply or NULL
|
|
||||||
local queue = net.Stream.ReadStreamQueues[ply]
|
|
||||||
if queue then
|
|
||||||
local size = net.ReadUInt(32)
|
|
||||||
if size > 0 then
|
|
||||||
queue[1]:Read(size)
|
|
||||||
else
|
|
||||||
local id = net.ReadUInt(32)
|
|
||||||
for k, v in ipairs(queue) do
|
|
||||||
if v.identifier == id then
|
|
||||||
v:Remove()
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
end)
|
end)
|
||||||
|
Loading…
Reference in New Issue
Block a user