Skip to content

Watch directory for changes

You can also write multiple files to the sandbox filesystem using the files.write() method.

The watch_dir() method returns a WatchHandle object that can be used to stop watching the directory.

from agentbox import Sandbox
from agentbox.sandbox.filesystem.watch_handle import FilesystemEvent,FilesystemEventType

sandbox = Sandbox(
    api_key="ab_xxxxxxxxxxxxxxxxxxxxxxxxx",
    template="wemmodr8mb2uk3kn7exw",
    timeout=120)

dirname = "/home/user"
contentBytes = "hello, world!".encode("utf-8")

sandbox.files.write(f"{dirname}/1.txt", "this is a test file")
sandbox.files.write(f"{dirname}/2.txt", "this is a test file")
sandbox.files.write(f"{dirname}/3.txt", "this is a test file")
sandbox.files.write(f"{dirname}/4.txt", "this is a test file")

# Watch directory for changes
handle = sandbox.files.watch_dir(dirname)

# write: write new content to an existed file
sandbox.files.write(f"{dirname}/1.txt", contentBytes)
# chmod: change permission of the file
sandbox.commands.run(f"chmod -R 0777 {dirname}/2.txt")
# rename: rename the file
sandbox.files.rename(f"{dirname}/3.txt", f"{dirname}/6.txt")
# create: write a new content to new file
sandbox.files.write(f"{dirname}/5.txt", contentBytes)
# remove: delete an existed file
sandbox.files.remove(f"{dirname}/4.txt")
# recursive==false: can not collect this file
sandbox.files.write(f"{dirname}/test/7.txt", contentBytes)

# Get new events
events = handle.get_new_events()
for event in events:
    print("event:", event)
    if event.type == FilesystemEventType.WRITE:
        print(f"wrote to file {event.name}")

handle.stop()
sandbox.kill()

You can enable recursive watching using the parameter recursive.

from agentbox import Sandbox
from agentbox.sandbox.filesystem.watch_handle import FilesystemEvent,FilesystemEventType

sandbox = Sandbox(
    api_key="ab_xxxxxxxxxxxxxxxxxxxxxxxxx",
    template="wemmodr8mb2uk3kn7exw",
    timeout=120)

dirname = "/home/user"
contentBytes = "Hello world!".encode("utf-8")

# Watch directory for changes
handle = sandbox.files.watch_dir(dirname, recursive=True)
# recursive==True: can collect this file
sandbox.files.write(f"{dirname}/test/7.txt", contentBytes)

# Get new events
events = handle.get_new_events()
for event in events:
    print("event:", event)
    if event.type == FilesystemEventType.WRITE:
        print(f"wrote to file {event.name}")

handle.stop()
sandbox.kill()