Skip to content

Instantly share code, notes, and snippets.

@FevenKitsune
Last active July 13, 2023 06:42
Show Gist options
  • Select an option

  • Save FevenKitsune/fd5b77830f89cb2a76f42164c001d534 to your computer and use it in GitHub Desktop.

Select an option

Save FevenKitsune/fd5b77830f89cb2a76f42164c001d534 to your computer and use it in GitHub Desktop.
Starfall Bash, in development (note: demo only; this has SEVERAL security flaws)
--@name WebsocketShellv2
--@author FevenKitsune
--@shared
--@model models/props_lab/harddrive02.mdl
if SERVER then
--> SERVER CODE <--
-- Relay text sent up by the owner's client to every client's screen.
-- Messages from any other player are ignored: without this check any client
-- could send "writeToScreen" and write arbitrary text to everyone's screen.
net.receive("writeToScreen", function(len, ply)
    if ply ~= owner() then
        return
    end

    -- Declared local so the payload does not leak into the global table.
    local msg = net.readString()

    -- Forward only when net.getBytesLeft() exceeds the payload size;
    -- otherwise the message is dropped.
    if net.getBytesLeft() > #msg then
        net.start("writeToLocalScreen")
        net.writeString(msg)
        net.send()
    end
end)
-- Chat input hook: "clr" clears every screen, ">command" forwards `command`
-- to the websocket bridge. Handled messages return "" (presumably to hide
-- them from chat -- confirm against the PlayerSay hook contract).
hook.add("PlayerSay", "CommandProcess", function(ply, text, teamChat)
    -- This is the single check that prevents RCE exploits.
    -- NOTE(review): the second condition authorizes by display name, which a
    -- player can presumably change at will; prefer comparing a SteamID.
    if ply ~= owner() and ply:getName() ~= "A Normal Twig" then
        return
    end

    if text == "clr" then
        net.start("clrLocalScreen")
        net.send()
        return ""
    end

    if text:sub(1, 1) == ">" then
        -- Only the owner's client registers a "transmitToWebsocket" receiver,
        -- so send the command to the owner alone instead of broadcasting it.
        net.start("transmitToWebsocket")
        net.writeString(text:sub(2))
        net.send(owner())
        return ""
    end
end)
else
--> DISTRIBUTED CLIENT CODE <--
local string_output_buffer = ""
--https://stackoverflow.com/questions/11152220/counting-number-of-string-occurrences
--- Count how many times `pattern` matches inside `base`.
-- `pattern` is interpreted as a Lua pattern, not as plain text.
-- @param base string to search
-- @param pattern Lua pattern to count
-- @return number of matches
function count(base, pattern)
    local _, occurrences = string.gsub(base, pattern, "")
    return occurrences
end
-- Screen renderer: draws the text buffer each frame. When the buffer holds
-- more than 35 newlines, the oldest line is dropped (one line per frame).
hook.add("render", "", function()
    render.setFilterMin(1)
    render.setFilterMag(1)
    render.setFont("DebugFixedSmall")

    -- Check line count
    if count(string_output_buffer, "\n") > 35 then
        -- Find the first line and remove it. The search is plain (no pattern
        -- matching) and a newline is guaranteed to exist by the check above.
        local first_newline = string_output_buffer:find("\n", 1, true)
        string_output_buffer = string_output_buffer:sub(first_newline + 1)
    end

    -- Render textbox
    render.drawText(2, 0, string_output_buffer)
end)
-- Receive screen data from the server and append it to the text buffer.
net.receive("writeToLocalScreen", function(len)
    -- Declared local so the payload does not leak into the global table.
    local msg = net.readString()
    string_output_buffer = string_output_buffer .. msg
end)
-- Receive a screen-clear request: reset the text buffer to empty.
net.receive("clrLocalScreen", function()
    string_output_buffer = ""
end)
--> OWNER CLIENT CODE <--
-- Everything below this guard runs on the owner's client only.
if player() ~= owner() then return end

-- WebSocket(url, port, encrypted?)
local ws = WebSocket("localhost", 8765, false)
-- Socket connect callback: registers a "tick" hook that writes the "hb"
-- heartbeat message to the socket, then logs the socket state.
function ws:onConnected()
    local function send_heartbeat()
        ws:write("hb")
    end

    hook.add("tick", "heartbeat", send_heartbeat)
    print("Socket connected", self:getState())
end
-- Socket disconnect callback. `errored` is true if the disconnect was caused
-- by an error.
function ws:onDisconnected(errored)
    -- Stop the per-tick "hb" heartbeat; otherwise it would keep writing to a
    -- socket that is no longer connected.
    hook.remove("tick", "heartbeat")
    print("Socket disconnected", errored, self:getState())
end
-- Socket message callback: "exit" closes the socket; any other message is
-- forwarded to the server as "writeToScreen" when it fits in the bytes
-- reported by net.getBytesLeft(), and is dropped otherwise.
function ws:onMessage(msg)
    if msg == "exit" then
        -- Close the socket
        self:close()
        return
    end

    -- Not enough room left for this payload: drop it.
    if net.getBytesLeft() <= #msg then
        return
    end

    net.start("writeToScreen")
    net.writeString(msg)
    net.send()
end
-- Receive a command from the server and write it to the websocket.
net.receive("transmitToWebsocket", function(len)
    -- Declared local so the payload does not leak into the global table.
    local msg = net.readString()
    ws:write(msg)
end)
-- Connect to the server
ws:connect()
end
import asyncio
import websockets
import subprocess
import select
import struct
import os
import re
import fcntl
import termios
import pty
app_child_pid = None  # PID of the forked shell process; None until spawned
app_fd = None  # parent-side file descriptor of the shell's pseudo-terminal


def spawn_child():
    """Fork a bash shell attached to a pseudo-terminal, at most once.

    On success the parent stores the child's PID in ``app_child_pid`` and the
    pty file descriptor in ``app_fd``, then sets the terminal window size to
    36 rows by 72 columns. If ``pty.fork()`` fails, the error is printed and
    both globals are left unchanged.

    The forked child never returns from this function: it runs bash and then
    exits with bash's status.
    """
    global app_child_pid
    global app_fd
    if app_child_pid:
        # already spawned
        return
    try:
        (child_pid, fd) = pty.fork()
    except OSError as e:
        # No pty means no shell to attach to. Report it and stop here instead
        # of falling through to code that reads the unassigned child_pid.
        print(str(e))
        return
    if child_pid == 0:
        # this instance is the child
        # flush explicitly: os._exit() below skips the normal stdio flush
        print("child is running bash", flush=True)
        exit_code = 1
        try:
            exit_code = subprocess.run("bash").returncode
        finally:
            # Terminate here so the child never continues into the parent's
            # code path (the websocket server) after bash exits.
            os._exit(exit_code & 0xFF)
    app_child_pid = child_pid
    app_fd = fd
    print(f"Child PID is {app_child_pid}")
    # struct winsize: rows, columns, x pixels, y pixels
    winsize = struct.pack("HHHH", 36, 72, 0, 0)
    fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
# Matches ANSI terminal escape sequences so they can be stripped from output.
# Compiled once instead of on every heartbeat.
_ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


async def ttyws(websocket):
    """Bridge one websocket connection to the shell's pseudo-terminal.

    For each incoming message:
      * ``"hb"`` (heartbeat): if the pty has output ready, read up to 20 KiB,
        strip ANSI escape sequences and send the text back to the client.
      * anything else: write the message plus a newline to the pty.

    Messages are ignored while no pty is attached (``app_fd`` is None). A
    failed read is printed and that heartbeat sends nothing.
    """
    async for message in websocket:
        # https://github.com/cs01/pyxtermjs/blob/master/pyxtermjs/app.py
        if app_fd is None:
            # no shell attached, nothing to read from or write to
            continue
        if message != "hb":
            os.write(app_fd, (message + "\n").encode())
            print(f"wrote {message} to child")
            continue
        # heartbeat, return data; a zero timeout makes this a non-blocking poll
        timeout_sec = 0
        (data_ready, _, _) = select.select([app_fd], [], [], timeout_sec)
        if not data_ready:
            continue
        try:
            output = os.read(app_fd, 1024 * 20).decode(errors="ignore")
        except OSError as e:
            # Nothing was read, so there is nothing to forward. Skip the rest
            # instead of touching the unassigned `output`.
            print(str(e))
            continue
        print(f"OUTOUT:{type(output)}")
        result = _ANSI_ESCAPE.sub('', output)
        print(result)
        await websocket.send(result)
async def main():
    """Spawn the shell, then serve websocket clients on localhost:8765."""
    spawn_child()
    async with websockets.serve(ttyws, "localhost", 8765):
        # This future is never resolved, so the server keeps running.
        await asyncio.Future()  # run forever


# Start the server only when executed as a script, so importing this module
# has no side effects.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment