Watch directory for changes
Watching
Section titled “Watching”
You can watch a directory in the sandbox filesystem for changes using the files.watch_dir() method; the examples below create and modify the watched files with the files.write() method.
The watch_dir()
method returns a WatchHandle
object that can be used to stop watching the directory.
from agentbox import Sandbox
from agentbox.sandbox.filesystem.watch_handle import FilesystemEvent,FilesystemEventType
# Start a sandbox. Replace the api_key placeholder with your own key.
sandbox = Sandbox(
    api_key="ab_xxxxxxxxxxxxxxxxxxxxxxxxx",
    template="wemmodr8mb2uk3kn7exw",
    timeout=120,
)

dirname = "/home/user"
content_bytes = "hello, world!".encode("utf-8")

# Create 1.txt-4.txt before watching starts, so the operations below act on
# files that already exist.
for index in range(1, 5):
    sandbox.files.write(f"{dirname}/{index}.txt", "this is a test file")

# Watch directory for changes
handle = sandbox.files.watch_dir(dirname)

# write: write new content to an existing file
sandbox.files.write(f"{dirname}/1.txt", content_bytes)
# chmod: change the permissions of a file
sandbox.commands.run(f"chmod -R 0777 {dirname}/2.txt")
# rename: rename a file
sandbox.files.rename(f"{dirname}/3.txt", f"{dirname}/6.txt")
# create: write content to a new file
sandbox.files.write(f"{dirname}/5.txt", content_bytes)
# remove: delete an existing file
sandbox.files.remove(f"{dirname}/4.txt")
# recursive is not enabled: this write into a subdirectory is not collected
sandbox.files.write(f"{dirname}/test/7.txt", content_bytes)

# Get new events
events = handle.get_new_events()
for event in events:
    print("event:", event)
    if event.type == FilesystemEventType.WRITE:
        print(f"wrote to file {event.name}")

# Stop watching the directory, then shut the sandbox down.
handle.stop()
sandbox.kill()
import asyncio

from agentbox import AsyncSandbox
from agentbox.sandbox.filesystem.watch_handle import FilesystemEvent, FilesystemEventType


def on_event_handler(event: FilesystemEvent) -> None:
    """Print a filesystem event passed to the watch callback."""
    print("Event:", event)


async def main() -> None:
    """Create a sandbox, watch a directory, and trigger sample changes."""
    # Replace the api_key placeholder with your own key.
    sandbox = await AsyncSandbox.create(
        api_key="ab_xxxxxxxxxxxxxxxxxxxxxxxxx",
        template="wemmodr8mb2uk3kn7exw",
        timeout=120,
    )

    dirname = "/home/user"
    content_bytes = "Hello world!".encode("utf-8")

    # Create 1.txt-4.txt before watching starts, so the operations below act
    # on files that already exist.
    for index in range(1, 5):
        await sandbox.files.write(f"{dirname}/{index}.txt", "this is a test file")

    # Watch directory for changes; events are passed to on_event_handler.
    handle = await sandbox.files.watch_dir(dirname, on_event=on_event_handler)

    # write: write new content to an existing file
    await sandbox.files.write(f"{dirname}/1.txt", content_bytes)
    # chmod: change the permissions of a file
    await sandbox.commands.run(f"chmod -R 0777 {dirname}/2.txt")
    # rename: rename a file
    await sandbox.files.rename(f"{dirname}/3.txt", f"{dirname}/6.txt")
    # create: write content to a new file
    await sandbox.files.write(f"{dirname}/5.txt", content_bytes)
    # remove: delete an existing file
    await sandbox.files.remove(f"{dirname}/4.txt")
    # recursive is not enabled: this write into a subdirectory is not collected
    await sandbox.files.write(f"{dirname}/test/7.txt", content_bytes)

    # Wait before stopping the watcher (assumed purpose: let pending events
    # reach on_event_handler).
    await asyncio.sleep(10)

    # Stop watching the directory, then shut the sandbox down.
    await handle.stop()
    await sandbox.kill()


# `await` is only valid inside a coroutine, so drive the example with
# asyncio.run(). Where an event loop is already running (e.g. a notebook),
# call `await main()` instead.
if __name__ == "__main__":
    asyncio.run(main())
Recursive Watching
Section titled “Recursive Watching”
You can enable recursive watching by passing recursive=True to watch_dir().
from agentbox import Sandbox
from agentbox.sandbox.filesystem.watch_handle import FilesystemEvent,FilesystemEventType
# Start a sandbox. Replace the api_key placeholder with your own key.
sandbox = Sandbox(
    api_key="ab_xxxxxxxxxxxxxxxxxxxxxxxxx",
    template="wemmodr8mb2uk3kn7exw",
    timeout=120,
)

dirname = "/home/user"
content_bytes = "Hello world!".encode("utf-8")

# Watch directory for changes, including its subdirectories
handle = sandbox.files.watch_dir(dirname, recursive=True)

# recursive is enabled: this write into a subdirectory is collected
sandbox.files.write(f"{dirname}/test/7.txt", content_bytes)

# Get new events
events = handle.get_new_events()
for event in events:
    print("event:", event)
    if event.type == FilesystemEventType.WRITE:
        print(f"wrote to file {event.name}")

# Stop watching the directory, then shut the sandbox down.
handle.stop()
sandbox.kill()
import asyncio

from agentbox import AsyncSandbox
from agentbox.sandbox.filesystem.watch_handle import FilesystemEvent, FilesystemEventType


def on_event_handler(event: FilesystemEvent) -> None:
    """Print a filesystem event passed to the watch callback."""
    print("Event:", event)


async def main() -> None:
    """Create a sandbox and watch a directory recursively."""
    # Replace the api_key placeholder with your own key.
    sandbox = await AsyncSandbox.create(
        api_key="ab_xxxxxxxxxxxxxxxxxxxxxxxxx",
        template="wemmodr8mb2uk3kn7exw",
        timeout=120,
    )

    dirname = "/home/user"
    content_bytes = "Hello world!".encode("utf-8")

    # Watch directory for changes, including its subdirectories; events are
    # passed to on_event_handler.
    handle = await sandbox.files.watch_dir(
        dirname,
        on_event=on_event_handler,
        recursive=True,
    )

    # recursive is enabled: this write into a subdirectory is collected
    await sandbox.files.write(f"{dirname}/test/7.txt", content_bytes)

    # Wait before stopping the watcher (assumed purpose: let pending events
    # reach on_event_handler).
    await asyncio.sleep(10)

    # Stop watching the directory, then shut the sandbox down.
    await handle.stop()
    await sandbox.kill()


# `await` is only valid inside a coroutine, so drive the example with
# asyncio.run(). Where an event loop is already running (e.g. a notebook),
# call `await main()` instead.
if __name__ == "__main__":
    asyncio.run(main())