Skip to main content

Singlecam

Use the Python script below to process a single-camera video. Supply your API key and the path to your video file.

end_to_end_singlecam_processing.py
import subprocess, sys

# Install the move-ugc-python SDK only when it is not already importable.
# This avoids spawning pip (and needing network access) on every run; a plain
# `pip install` without --upgrade would not change an existing install anyway.
import importlib.util

if importlib.util.find_spec("move_ugc") is None:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "move-ugc-python"])

from move_ugc import MoveUgc
from move_ugc.schemas.sources import SourceIn
from datetime import datetime
import os
from pathlib import Path
import time
import requests

#API class

class MoveAI:
    """Convenience wrapper around the ``MoveUgc`` SDK client for single-camera jobs.

    Groups the steps the script needs: upload a video, create a take, start a
    job, retrieve takes/jobs and download the job outputs.

    Attributes:
        api_key: The API key passed to the constructor.
        endpoint_url: GraphQL endpoint in use.
        client: The underlying ``MoveUgc`` client instance.
    """

    # (connect, read) timeouts in seconds for presigned-URL transfers, so a
    # dead connection cannot hang the script forever. The read timeout applies
    # per socket read, not to the whole transfer.
    _HTTP_TIMEOUT = (10, 300)
    # Chunk size used when streaming downloads to disk.
    _DOWNLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self, api_key, endpoint_url=None):
        """Create the SDK client.

        Args:
            api_key: API key forwarded to ``MoveUgc``.
            endpoint_url: GraphQL endpoint; defaults to the public
                ``https://api.move.ai/ugc/graphql`` endpoint when ``None``.
        """
        self.api_key = api_key
        if endpoint_url is None:
            endpoint_url = 'https://api.move.ai/ugc/graphql'
        self.endpoint_url = endpoint_url
        self.client = MoveUgc(api_key=api_key, endpoint_url=endpoint_url)

    def create_files(self, video_path):
        """Register a new file with the API and upload the video to it.

        Args:
            video_path: Path (``str`` or ``Path``) of the local video file.

        Returns:
            The id of the created file.

        Raises:
            FileNotFoundError: If ``video_path`` is not an existing file.
            requests.HTTPError: If the upload is rejected by the server.
        """
        # Fail before touching the API so a bad path does not leave an
        # orphaned file record behind.
        if not os.path.isfile(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        # NOTE(review): the file type is always registered as "mp4", whatever
        # the extension of video_path is - confirm this is intended.
        video_file = self.client.files.create(file_type="mp4")
        print("File created:", video_file.id)
        with open(video_path, 'rb') as f:
            # Pass the file object so requests streams it instead of loading
            # the whole video into memory.
            response = requests.put(
                video_file.presigned_url, data=f, timeout=self._HTTP_TIMEOUT
            )
        # Only report success when the server actually accepted the upload.
        response.raise_for_status()
        print("File uploaded successfully")
        return video_file.id

    def create_take(self, video_file_id, device_label, format, metadata=None):
        """Create a single-camera take from an uploaded file.

        Args:
            video_file_id: Id returned by :meth:`create_files`.
            device_label: Label of the recording device.
            format: Video format passed to ``SourceIn``.
            metadata: Optional metadata dict; defaults to ``{"test": "test"}``.

        Returns:
            The take object returned by the SDK.
        """
        if metadata is None:
            metadata = {"test": "test"}
        take = self.client.takes.create_singlecam(
            sources=[SourceIn(file_id=video_file_id, device_label=device_label, format=format)],
            metadata=metadata,
        )
        print("Take created:", take.id)
        return take

    def get_take(self, take_id):
        """Return the take with the given id."""
        return self.client.takes.retrieve(id=take_id)

    def create_job(self, take_id):
        """Start a single-camera job for a take and return the job object."""
        job = self.client.jobs.create_singlecam(take_id=take_id, metadata={"test": "demo_job"})
        print("Job created:", job.id)
        return job

    def get_job(self, job_id, expand=False):
        """Return the job with the given id.

        Args:
            job_id: Id of the job to retrieve.
            expand: When ``False`` (the default) a plain retrieve is issued;
                otherwise the ``take``, ``outputs`` and ``client`` relations
                are requested as well.
        """
        if expand is False:
            return self.client.jobs.retrieve(id=job_id)
        return self.client.jobs.retrieve(
            id=job_id, expand=["take", "outputs", "client"]
        )

    def download_outputs(self, job_id, output_dir, output_name):
        """Download every output of a job into ``output_dir``.

        Args:
            job_id: Id of the job whose outputs are downloaded.
            output_dir: Destination directory; created if missing.
            output_name: Base name for the files; each output's file type is
                appended to it.

        Returns:
            List of the paths written, in the order of ``job.outputs``.

        Raises:
            requests.HTTPError: If a download request fails.
        """
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(output_dir, exist_ok=True)
        job = self.get_job(job_id, expand=True)
        print("Downloading outputs for job:", job.id)

        output_paths = []
        for output in job.outputs:
            # NOTE(review): the type is appended without a separator, so this
            # assumes output.file.type already carries a leading dot - confirm.
            output_file_name = f"{output_name}{output.file.type}"
            output_path = os.path.join(output_dir, output_file_name)
            # Issue the request and check its status before opening the
            # destination, so a failed download never leaves a bogus file.
            with requests.get(
                output.file.presigned_url, stream=True, timeout=self._HTTP_TIMEOUT
            ) as response:
                response.raise_for_status()
                with open(output_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=self._DOWNLOAD_CHUNK_SIZE):
                        f.write(chunk)
            output_paths.append(output_path)
        return output_paths


##Run


# Read the API key from the MOVE_API_KEY environment variable instead of
# hard-coding it: a key committed to source is a leaked credential.
api_key = os.environ.get("MOVE_API_KEY")
if not api_key:
    raise ValueError('Please set the MOVE_API_KEY')

# Initialize the MoveAI client only once a key is known to be present
client = MoveAI(api_key)

# Set the path to the video file and output directory
input_video_file = Path('data/input_videos/action/Demo.mp4')
output_dir = Path('data/output')

# Upload the video file to MoveAI
video_file_id = client.create_files(input_video_file)
device_label = "cam01"
# The format of the video file created above
# (named video_format so the builtin `format` is not shadowed)
video_format = "MP4"
# Create a take based on the id of the uploaded video and start the job
take = client.create_take(video_file_id, device_label, video_format)
job = client.create_job(take.id)

# Poll the job until it is finished, has failed, or the attempt budget runs out
max_attempts = 100
poll_interval_seconds = 30

for attempts in range(max_attempts):
    job = client.get_job(job.id)
    update_str = f"[{datetime.now().isoformat()} | {attempts}] Job {job.id} is {job.state}"
    print(update_str)
    if job.state == 'FINISHED':
        outputs = client.download_outputs(job.id, output_dir, input_video_file.stem)
        print(f"Outputs downloaded to {output_dir}")
        print(f"Output files: {outputs}")
        break
    # Stop immediately on failure instead of polling until the budget is spent.
    # NOTE(review): assumes the API reports failures with state 'FAILED' - confirm.
    if job.state == 'FAILED':
        raise RuntimeError(f"Job {job.id} failed")
    time.sleep(poll_interval_seconds)
else:
    # The loop ended without a break: the job never reached FINISHED.
    raise TimeoutError(
        f"Job {job.id} did not finish after {max_attempts} attempts "
        f"({max_attempts * poll_interval_seconds} seconds)"
    )