Skip to main content

Overview

Move API converts video footage into 3D motion capture data using AI, physics and biomechanics models. It can track human movement from video and output motion capture data in multiple formats including FBX, BVH, USDC, USDZ, GLB, Blend, C3D, JSON, and CSV for use in 3D software, game engines, sports, life sciences, industrial and other applications.

Models

Quickstart - singlecam

Process a single video using the s1 model (singlecam):

from move_ugc import MoveUgc
from move_ugc.schemas.sources import SourceIn
from datetime import datetime
import os
from pathlib import Path
import time
import requests

# API class
class MoveAI:
def __init__(self, api_key, endpoint_url=None):
self.api_key = api_key
if endpoint_url is None:
endpoint_url = 'https://api.move.ai/ugc/graphql'
self.endpoint_url = endpoint_url
self.client = MoveUgc(api_key=api_key, endpoint_url=endpoint_url)

def create_files(self, video_path):
video_file = self.client.files.create(file_type="mp4")
print("File created:", video_file.id)
with open(video_path, 'rb') as f:
requests.put(video_file.presigned_url, data=f.read())
print("File uploaded successfully")
return video_file.id

def create_take(self, video_file_id, device_label, format, metadata=None):
if metadata is None:
metadata = {"test": "test"}
take = self.client.takes.create_singlecam(
sources=[SourceIn(file_id=video_file_id, device_label=device_label, format=format)],
metadata=metadata,
)
print("Take created:", take.id)
return take

def get_take(self, take_id):
take = self.client.takes.retrieve(id=take_id)
return take

def create_job(self, take_id):
job = self.client.jobs.create_singlecam(take_id=take_id, metadata={"test": "demo_job"})
print("Job created:", job.id)
return job

def get_job(self, job_id, expand=False):
# Get a job using the Move One Public API
if expand is False:
job = self.client.jobs.retrieve(id=job_id)
else:
job = self.client.jobs.retrieve(
id=job_id, expand=["take", "outputs", "client"]
)
return job

def download_outputs(self, job_id, output_dir, output_name):
# make output dir if it doesn't exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# get job
job = self.get_job(job_id, expand=True)
print("Downloading outputs for job:", job.id)
# for each output download the file in the output_dir
output_paths = []
for output in job.outputs:
output_file_name = f"{output_name}{output.file.type}"
output_path = os.path.join(output_dir, output_file_name)
with open(output_path, 'wb') as f:
response = requests.get(output.file.presigned_url)
f.write(response.content)
output_paths.append(output_path)
return output_paths

# Initialize the MoveAI client
client = MoveAI("your_api_key_here")

if not client.api_key:
raise ValueError('Please set your API key')

# Set the path to the video file and output directory
input_video_file = Path('data/input_videos/action/Demo.mp4')
output_dir = Path('data/output')

# Upload the video file to MoveAI
video_file_id = client.create_files(input_video_file)
device_label = "cam01"
format = "MP4"

# Create a take based on the id of the uploaded video and start the job
take = client.create_take(video_file_id, device_label, format)
job = client.create_job(take.id)

# Poll the job until it is finished
attempts = 0
while attempts < 100:
job = client.get_job(job.id)
update_str = f"[{datetime.now().isoformat()} | {attempts}] Job {job.id} is {job.state}"
print(update_str)
if job.state == 'FINISHED':
outputs = client.download_outputs(job.id, output_dir, input_video_file.stem)
print(f"Outputs downloaded to {output_dir}")
print(f"Output files: {outputs}")
break
else:
time.sleep(30)
attempts += 1

Quickstart - multicam

Process videos from 2 cameras using the m1 model (multicam):

from move_ugc import MoveUgc
from move_ugc.schemas.sources import SourceIn
from move_ugc.schemas.sync_method import SyncMethodInput, ClapWindowInput
from move_ugc.schemas.volume import AreaType
from datetime import datetime
import os
from pathlib import Path
import time
import requests

class MoveAI:
"""MoveAI UGC utility class."""

def __init__(self, api_key, endpoint_url=None) -> None:
"""Initialize MoveAI UGC utility class."""
self.api_key = api_key
if endpoint_url is None:
endpoint_url = 'https://api-test.move.ai/ugc/graphql'
self.endpoint_url = endpoint_url
self.ugc_client = MoveUgc(api_key=api_key, endpoint_url=endpoint_url)

def get_client(self):
"""Get MoveUGC ugc_client."""
client = self.ugc_client.client.retrieve()
return client

def create_files(self, video_path: str) -> str:
"""Create a file in MoveUGC."""
video_file = self.ugc_client.files.create(file_type="mp4")
print("File created:", video_file.id)

with open(video_path, 'rb') as f:
print("Uploading...")
requests.put(video_file.presigned_url, data=f.read())

return video_file.id

def create_volume(self, sources, human_height: float, name=None, metadata=None):
"""Create a new volume."""
clap_window = ClapWindowInput(start_time=0.1, end_time=5.0)
sync_method = SyncMethodInput(clap_window=clap_window)
if metadata is None:
metadata = {"test": "Multicam Quickstart"}
return self.ugc_client.volumes.create_human_volume(
sources=sources,
name=name,
metadata=metadata,
sync_method=sync_method,
human_height=human_height,
area_type=AreaType.NORMAL,
)

def get_volume(self, volume_id: str):
"""Retrieve volume."""
return self.ugc_client.volumes.retrieve_human_volume(id=volume_id)

def create_take(self, sources, volume_id: str, sync_method, name=None, metadata=None) -> str:
"""Create a new take."""
if metadata is None:
metadata = {"test": "Multicam Quickstart"}
return self.ugc_client.takes.create_multicam(
volume_id=volume_id,
sources=sources,
metadata=metadata,
name=name,
sync_method=sync_method,
).id

def create_job(self, take_id: str, number_of_actors: str, name=None, metadata=None):
"""Create a new multicam job."""
return self.ugc_client.jobs.create_multicam(
take_id=take_id, number_of_actors=number_of_actors, metadata={"test": "Multicam Quickstart"},
)

def get_job(self, job_id, expand=False):
if expand is False:
job = self.ugc_client.jobs.retrieve(id=job_id)
else:
job = self.ugc_client.jobs.retrieve(
id=job_id, expand=["take", "outputs"]
)
return job

def download_outputs(self, job_id, output_dir, output_name):
if not os.path.exists(output_dir):
os.makedirs(output_dir)
job = self.get_job(job_id, expand=True)
output_paths = []
for output in job.outputs:
output_file_name = f"{output_name}{output.file.type}"
output_path = os.path.join(output_dir, output_file_name)
with open(output_path, 'wb') as f:
response = requests.get(output.file.presigned_url)
f.write(response.content)
output_paths.append(output_path)
return output_paths

# Initialize the MoveAI client
ugc_client = MoveAI("your_api_key_here")

if not ugc_client.api_key:
raise ValueError('Please set your API key')

# Create calibration videos
sources = []
for camera_number in range(1, 3):
input_video_file = Path(f'data/input_videos/calib/cam0{camera_number}_calib.mp4')
ugc_file = ugc_client.create_files(input_video_file)
print("File uploaded:", ugc_file, "appending to sources...")
sources.append(
SourceIn(
device_label=f"cam0{camera_number}",
file_id=ugc_file,
format="MP4",
camera_settings={
"lens": "goprohero10-fhd",
}
)
)

# Create and process volume
volume = ugc_client.create_volume(sources=sources, human_height=1.77, name="Multicam Quickstart")
print("Volume created:", volume.id)

# Poll the volume until it is finished processing
attempts = 0
print("Polling volume...")
while attempts < 300:
volume = ugc_client.get_volume(volume.id)
update_str = f"[{datetime.now().isoformat()} | {attempts}] Volume {volume.id} is {volume.state}"
print(update_str)
if volume.state == 'FINISHED':
print("Volume is processed successfully, please proceed to job creation")
break
else:
time.sleep(30)
attempts += 1

# Create action videos
sources = []
for camera_number in range(1, 3):
input_video_file = Path(f'data/input_videos/action/cam0{camera_number}_action.mp4')
ugc_file = ugc_client.create_files(input_video_file)
print("File uploaded:", ugc_file, "appending to sources...")
sources.append(
SourceIn(
device_label=f"cam0{camera_number}",
file_id=ugc_file,
format="MP4",
camera_settings={
"lens": "goprohero8-fhd",
}
)
)

# Create take and job
take_id = ugc_client.create_take(
sources,
volume_id=volume.id,
sync_method=SyncMethodInput(
clap_window={
"start_time": 0.1,
"end_time": 3.0,
},
),
name="Test take"
)

print("Take created:", take_id)

job = ugc_client.create_job(take_id=take_id, number_of_actors=1, name="Quickstart job")
print("Job Created:", job.id)

# Poll the job until it is finished
attempts = 0
output_dir = Path('data/output')
while attempts < 300:
job = ugc_client.get_job(job.id)
update_str = f"[{datetime.now().isoformat()} | {attempts}] Job {job.id} is {job.state}"
if job.state == 'FINISHED':
outputs = ugc_client.download_outputs(job.id, output_dir, "Multicam_demo.mp4")
print(f"Outputs downloaded to {output_dir}")
print(f"Output files: {outputs}")
break
else:
time.sleep(60)
attempts += 1