📢 转载信息
原文作者:Hammad Ausaf and Rajat Jain
本文介绍了如何构建一个可扩展的多模态视频搜索系统,该系统能够使用 Amazon Nova 模型和 Amazon OpenSearch Service 对大型视频数据集进行自然语言搜索。您将学习如何超越手动标记和基于关键字的搜索,实现捕捉视频内容丰富性的语义搜索。
我们通过处理来自两个 AWS Open Data Registry 数据集的 792,270 个视频来大规模演示这一点:Multimedia Commons(787,479 个视频,平均时长 37 秒)和 MEVA(4,791 个视频,平均时长 5 分钟)。处理 8,480 小时的视频内容(3050 万秒)耗时 41 小时。第一年总成本:27,328 美元(使用 OpenSearch 按需付费)或 23,632 美元(使用 OpenSearch Service 预留实例)。成本包括一次性摄取成本(18,088 美元)和年度 Amazon OpenSearch Service 费用(按需 9,240 美元或预留 5,544 美元)。
摄取明细如下:
- Amazon Elastic Compute Cloud (Amazon EC2) 计算(4 × c7i.48xlarge spot,每小时 2.57 美元 × 41 小时):421 美元
- Amazon Bedrock Nova Multimodal Embeddings(3050 万秒 × 0.00056 美元/秒批量定价):17,096 美元
- Nova Pro 标记(792K 视频 × 600 tokens(平均)):571 美元
该解决方案使用 AUDIO_VIDEO_COMBINED 模式生成视听嵌入(请参阅 Nova Multimodal Embeddings API schema),将其存储在 OpenSearch Service 中,并支持文本到视频、视频到视频和混合搜索。
解决方案概述
该架构包含两个主要工作流程——摄取和搜索——它们协同工作,实现大规模多模态视频搜索:
视频摄取管道:
摄取管道使用四台 Amazon EC2 c7i.48xlarge 实例,配备 600 个并行工作进程,每小时处理 19,400 个视频。异步 API 每个账户的并发限制为 30 个作业(请参阅 Amazon Bedrock 配额),因此管道实现了一个带有轮询的作业队列。工作进程将作业提交到并发限制内,轮询完成情况,并在有可用槽位时提交新作业。 Amazon Nova Multimodal Embeddings 异步处理视频,将视频分割成 15 秒的块(优化以捕捉场景变化,同时保持嵌入数量可控),并生成 1024 维的嵌入。选择 1024 维嵌入而非 3072 维,可以节省 3 倍的存储成本,而准确性几乎没有影响。嵌入生成成本与嵌入维度无关。 Amazon Nova Pro 从预定义的分类法中为每个视频添加 10-15 个描述性标签。
注意:Amazon Nova 2 Lite 以更低的成本提高了标记任务的准确性。我们建议在新部署中考虑使用它。该系统将嵌入存储在 OpenSearch k-NN 索引 中,并将元数据标签存储在单独的文本索引中以进行关键字匹配。对于搜索,您可以通过三种方式查询视频:将自然语言转换为嵌入以进行文本到视频搜索,直接比较视频嵌入以进行视频到视频搜索,或将这两种方法结合起来进行混合搜索。
此解决方案支持的搜索类型:
- 文本到视频搜索 – 将自然语言查询转换为嵌入以进行语义相似性匹配
- 视频到视频搜索 – 通过直接比较视频嵌入来查找相似内容
- 混合搜索 – 结合向量相似性(70% 权重)和关键字匹配(30% 权重)以获得最高准确性
视频摄取管道
下图说明了视频摄取和处理管道:

图 1:视频摄取管道,显示从 S3 视频存储到 Nova Multimodal Embeddings 和 Nova Pro 再到双重 OpenSearch 索引的流程
视频处理工作流程如下:
- 将视频上传到 Amazon Simple Storage Service (Amazon S3)。
- 使用 Nova Multimodal Embeddings 异步 API 处理视频并生成嵌入。该 API 会自动分割视频并生成嵌入。一个协调器轮询作业完成情况(异步 API 每个账户有 30 个并发作业限制,请参阅 Amazon Bedrock 配额),并从 Amazon S3 检索结果。
- 使用 Nova Pro(或 Nova Lite 以获得更低的成本和更高的准确性)从预定义的分类法生成描述性标签,以增强搜索功能。
- 在 OpenSearch k-NN 索引中索引嵌入,并在文本索引中索引标签。
视频搜索架构
下图显示了完整的搜索架构:

图 2:视频搜索架构,演示了三种搜索模式——文本到视频、视频到视频以及结合 k-NN 和 BM25 的混合搜索
搜索架构支持三种模式:
- 文本到视频 – 自然语言查询
- 视频到视频 – 相似内容发现
- 混合 – 语义和关键字匹配的组合
先决条件
在开始之前,您需要:
- 一个 AWS 账户,该账户在
us-east-1区域具有访问 Amazon Bedrock 的权限(Nova 模型在具有适当 IAM 权限的情况下默认启用) - 安装了 Python 3.9 或更高版本
- 配置了适当凭证的 AWS Command Line Interface (AWS CLI)
- 一个 Amazon OpenSearch Service 域(建议使用 r6g.large 或更大)
- 一个用于视频存储和嵌入输出的 Amazon S3 存储桶
- 用于 Amazon Bedrock、OpenSearch Service 和 Amazon S3 的 AWS Identity and Access Management (IAM)
该解决方案使用了:
- Amazon Bedrock 与 Nova Multimodal Embeddings (amazon.nova-2-multimodal-embeddings-v1:0)
- Amazon Bedrock 与 Nova Pro (us.amazon.nova-pro-v1:0) 或 Nova Lite (us.amazon.nova-2-lite-v1:0) 用于标记
- Amazon OpenSearch Service 2.11 或更高版本,带有 k-NN 插件
- Amazon S3 用于视频和嵌入存储
演练
步骤 1:创建 IAM 角色和策略
创建一个 IAM 角色,并授予调用 Amazon Bedrock 模型、写入 OpenSearch 索引以及读取/写入 S3 对象的权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:StartAsyncInvoke",
"bedrock:GetAsyncInvoke",
"bedrock:ListAsyncInvoke"
],
"Resource": [
"arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-2-multimodal-embeddings-v1:0",
"arn:aws:bedrock:us-east-1::foundation-model/us.amazon.nova-pro-v1:0"
]
},
{
"Effect": "Allow",
"Action": [
"es:ESHttpPost",
"es:ESHttpPut",
"es:ESHttpGet"
],
"Resource": "arn:aws:es:us-east-1:ACCOUNT_ID:domain/DOMAIN_NAME/*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::amzn-s3-demo-video-bucket/*",
"arn:aws:s3:::amzn-s3-demo-embedding-bucket/*"
]
}
]
}
步骤 2:设置 OpenSearch Service 索引
创建两个 OpenSearch Service 索引:一个用于向量嵌入(k-NN),一个用于文本元数据。此架构支持语义搜索和混合查询。
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
session = boto3.Session()
credentials = session.get_credentials()
awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
session.region_name,
'es',
session_token=credentials.token
)
opensearch_client = OpenSearch(
hosts=[{'host': 'YOUR_OPENSEARCH_ENDPOINT', 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
# Create k-Nearest Neighbors (k-NN) index for embeddings
knn_index_body = {
"settings": {
"index.knn": True,
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"video_id": {"type": "keyword"},
"segment_index": {"type": "integer"},
"timestamp": {"type": "float"},
"embedding": {
"type": "knn_vector",
"dimension": 1024,
"method": {
"name": "hnsw",
"space_type": "cosinesimilarity",
"engine": "faiss"
}
},
"s3_uri": {"type": "keyword"}
}
}
}
opensearch_client.indices.create( index="video-embeddings-knn", body=knn_index_body
)
# Create text index for metadata
text_index_body = {
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"video_id": {"type": "keyword"},
"segment_index": {"type": "integer"},
"tags": {"type": "text", "analyzer": "standard"}
}
}
}
opensearch_client.indices.create(
index="video-embeddings-text",
body=text_index_body
)
步骤 3:使用 Nova Multimodal Embeddings 处理视频
Amazon Bedrock 异步 API 处理视频并生成嵌入。它将视频分割成 15 秒的块,并结合音频和视觉信息。
import boto3
import json
import time
bedrock = boto3.client('bedrock-runtime', region_name='us-east-1')
def generate_video_embeddings(video_s3_uri, output_s3_uri):
"""Generate embeddings for a video using Nova MME async API."""
# Start async job
response = bedrock.start_async_invoke(
modelId="amazon.nova-2-multimodal-embeddings-v1:0",
modelInput={
"taskType": "SEGMENTED_EMBEDDING",
"segmentedEmbeddingParams": {
"embeddingPurpose": "GENERIC_INDEX",
"embeddingDimension": 1024,
"video": {
"format": "mp4",
"embeddingMode": "AUDIO_VIDEO_COMBINED",
"source": {"s3Location": {"uri": video_s3_uri}},
"segmentationConfig": {"durationSeconds": 15}
}
}
},
outputDataConfig={"s3OutputDataConfig": {"s3Uri": output_s3_uri}}
)
# Poll for completion
invocation_arn = response["invocationArn"]
while True:
job = bedrock.get_async_invoke(invocationArn=invocation_arn)
if job["status"] == "Completed":
return read_embeddings_from_s3(job["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"])
elif job["status"] in ["Failed", "Expired"]:
raise RuntimeError(f"Job failed: {job.get('failureMessage')}")
time.sleep(10)
def manage_concurrent_jobs(bedrock_client, video_queue, max_concurrent=30):
"""Manage 30 concurrent async jobs within quota limits."""
active_jobs = {}
while video_queue or active_jobs:
# Submit new jobs up to limit (uses same start_async_invoke call as above)
while len(active_jobs) < max_concurrent and video_queue:
video_info = video_queue.pop(0)
response = bedrock_client.start_async_invoke(
modelId="amazon.nova-2-multimodal-embeddings-v1:0",
modelInput={...}, # Same model_input structure as generate_video_embeddings()
outputDataConfig={"s3OutputDataConfig": {"s3Uri": video_info['output_uri']}}
)
active_jobs[response["invocationArn"]] = video_info
# Poll all active jobs
for arn in list(active_jobs.keys()):
job = bedrock_client.get_async_invoke(invocationArn=arn)
if job["status"] == "Completed":
video_info = active_jobs.pop(arn)
embeddings = read_embeddings_from_s3(job["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"])
# Process embeddings...
elif job["status"] in ["Failed", "Expired"]:
active_jobs.pop(arn)
if active_jobs:
time.sleep(10)
def read_embeddings_from_s3(s3_uri):
"""Read JSONL embeddings from S3. Returns list of {startTime, endTime, embedding} dicts."""
# Download and parse JSONL from s3_uri (standard S3 GetObject + json.loads per line)
pass # Placeholder for actual S3 reading logic
步骤 4:使用 Nova Pro 或 Nova Lite 生成元数据标签
使用 Nova Pro(或 Nova Lite 以获得更好的成本效益和准确性)为视频生成描述性标签,以支持结合了语义和关键字匹配的混合搜索。
VALID_TAGS = [
"person", "vehicle", "animal", "building", "nature", "indoor", "outdoor",
"walking", "running", "sitting", "standing", "talking", "driving",
"day", "night", "sunny", "cloudy", "urban", "rural", "beach", "forest",
"sports", "music", "food", "technology", "crowd", "solo"
]
def generate_tags(video_s3_uri, sample_frame_count=3):
"""Generate descriptive tags using Nova Pro or Nova Lite."""
prompt = f"""Analyze this video and select 10-15 tags from this predefined list that best describe the content:
{', '.join(VALID_TAGS)}
Only return tags from this list as a comma-separated list. Do not invent new tags."""
response = bedrock.converse(
modelId="us.amazon.nova-pro-v1:0", # Or use us.amazon.nova-2-lite-v1:0
messages=[
{
"role": "user",
"content": [
{
"video": {
"format": "mp4",
"source": {"s3Location": {"uri": video_s3_uri}}
}
},
{
"text": prompt
}
]
}
]
)
# Parse tags from response and validate against taxonomy
tags_text = response['output']['message']['content'][0]['text']
tags = [tag.strip().lower() for tag in tags_text.split(',')]
# Filter to only valid tags from our taxonomy
valid_tags = [tag for tag in tags if tag in VALID_TAGS]
return valid_tags
步骤 5:在 OpenSearch Service 中索引嵌入和标签
使用 批量索引 将生成的嵌入和标签高效地存储在 OpenSearch Service 中。
from opensearchpy import helpers
def index_video_data(video_id, s3_uri, embeddings, tags):
"""Index embeddings and tags in OpenSearch."""
# Prepare bulk actions for k-NN index
knn_actions = []
for idx, emb in enumerate(embeddings):
doc_id = f"{video_id}_{idx}"
knn_actions.append({
"_index": "video-embeddings-knn",
"_id": doc_id,
"_source": {
"video_id": video_id,
"segment_index": idx,
"timestamp": emb['start_time'],
"embedding": emb['embedding'],
"s3_uri": s3_uri
}
})
helpers.bulk(opensearch_client, knn_actions)
# Prepare bulk actions for text index
text_actions = []
for idx in range(len(embeddings)):
doc_id = f"{video_id}_{idx}"
text_actions.append({
"_index": "video-embeddings-text",
"_id": doc_id,
"_source": {
"video_id": video_id,
"segment_index": idx,
"tags": " ".join(tags)
}
})
helpers.bulk(opensearch_client, text_actions)
print(f"Indexed {len(embeddings)} segments for video {video_id}")
步骤 6:实现搜索功能
数据摄取完成后,可以通过三种方式搜索已索引的视频。该实现的目标是低延迟查询。
初始化用于搜索的 OpenSearch Service 客户端
首先,创建用于搜索操作的 OpenSearch Service 客户端:
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
def create_opensearch_client():
"""Create OpenSearch client with AWS authentication."""
session = boto3.Session(region_name='us-east-1')
credentials = session.get_credentials()
awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
'us-east-1',
'es',
session_token=credentials.token
)
return OpenSearch(
hosts=[{'host': 'YOUR_OPENSEARCH_ENDPOINT', 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
timeout=30
)
# Create client
opensearch_client = create_opensearch_client()
文本到视频语义搜索
使用同步 API 将自然语言查询转换为嵌入,然后执行 k-NN 相似性搜索:
def search_text_to_video(query_text, opensearch_client, k=10):
"""Search videos using natural language query converted to embedding."""
bedrock_client = boto3.client('bedrock-runtime', region_name='us-east-1')
# Use SINGLE_EMBEDDING task type for text-to-embedding conversion
# VIDEO_RETRIEVAL purpose optimizes embeddings for searching video content
request_body = {
"taskType": "SINGLE_EMBEDDING",
"singleEmbeddingParams": {
"embeddingPurpose": "VIDEO_RETRIEVAL",
"embeddingDimension": 1024,
"text": {
"truncationMode": "END",
"value": query_text
}
}
}
response = bedrock_client.invoke_model(
modelId='amazon.nova-2-multimodal-embeddings-v1:0',
body=json.dumps(request_body),
accept='application/json',
contentType='application/json'
)
response_body = json.loads(response['body'].read())
# Response structure: {"embeddings": [{"embeddingType": "TEXT", "embedding": [...]}]}
query_embedding = response_body['embeddings'][0]['embedding']
# Perform k-NN search against video embeddings
search_body = {
"query": {
"knn": {
"embedding": {
"vector": query_embedding,
"k": k
}
}
},
"size": k,
"_source": ["video_id", "segment_index", "timestamp", "s3_uri"]
}
response = opensearch_client.search(
index="video-embeddings-knn",
body=search_body
)
# Extract results
return [
{
'score': hit['_score'],
'video_id': hit['_source']['video_id'],
'segment_index': hit['_source']['segment_index'],
'timestamp': hit['_source'].get('timestamp', 0)
}
for hit in response['hits']['hits']
]
使用 BM25 进行文本搜索(关键字匹配)
使用 OpenSearch BM25 评分对标签进行关键字匹配,无需生成嵌入:
def search_text_bm25(search_term, opensearch_client, k=10):
"""Search videos using BM25 keyword matching on tags field."""
# Search text index using match query on tags
search_body = {
"query": {
"match": {
"tags": search_term
}
},
"size": k,
"_source": ["video_id", "segment_index", "tags"]
}
response = opensearch_client.search(
index="video-embeddings-text",
body=search_body
)
return response['hits']['hits'] # Extract results (same pattern as above)
视频到视频搜索
从 OpenSearch Service 中检索现有视频的嵌入,并搜索相似内容——无需调用 Amazon Bedrock API:
def search_video_to_video(query_video_id, query_segment_index, opensearch_client, k=10):
"""Find similar videos using a reference video segment."""
# Get the embedding from the reference video segment
sample_query = {
"query": {
"bool": {
"must": [
{"term": {"video_id": query_video_id}},
{"term": {"segment_index": query_segment_index}}
]
}
},
"_source": ["video_id", "segment_index", "embedding"]
}
sample_response = opensearch_client.search(
index="video-embeddings-knn",
body=sample_query
)
if not sample_response['hits']['hits']:
return []
sample_doc = sample_response['hits']['hits'][0]['_source']
query_embedding = sample_doc.get('embedding')
# Perform k-NN search with the embedding
search_body = {
"query": {
"knn": {
"embedding": {
"vector": query_embedding,
"k": k
}
}
},
"size": k,
"_source": ["video_id", "segment_index", "timestamp"]
}
response = opensearch_client.search(
index="video-embeddings-knn",
body=search_body
)
return response['hits']['hits'] # Extract results as needed
混合搜索
通过从两个索引检索结果并进行加权评分,结合语义 k-NN 和 BM25 关键字匹配:
def search_hybrid(query_text, opensearch_client, k=10, vector_weight=0.7, text_weight=0.3):
"""Hybrid search combining k-NN semantic search and BM25 text matching."""
# Generate query embedding (use same code as search_text_to_video above)
query_embedding = generate_query_embedding(query_text) # See text-to-video example
# Get k-NN results (same query as search_text_to_video)
knn_response = opensearch_client.search(
index="video-embeddings-knn",
body={
"query": {"knn": {"embedding": {"vector": query_embedding, "k": 20}}}
},
"size": 20
)
# Get BM25 text results (same query as search_text_bm25)
text_response = opensearch_client.search(
index="video-embeddings-text",
body={
"query": {"match": {"tags": query_text}},
"size": 20
}
)
# Combine results with weighted scoring
knn_hits = knn_response['hits']['hits']
text_hits = text_response['hits']['hits']
combined = {}
for hit in knn_hits:
vid = hit['_source']['video_id']
seg = hit['_source']['segment_index']
key = f"{vid}_{seg}"
combined[key] = {
'video_id': vid,
'segment_index': seg,
'tags': hit['_source'].get('tags', ''),
'vector_score': hit['_score'],
'text_score': 0,
'combined_score': hit['_score'] * vector_weight
}
for hit in text_hits:
vid = hit['_source']['video_id']
seg = hit['_source']['segment_index']
key = f"{vid}_{seg}"
if key in combined:
combined[key]['text_score'] = hit['_score']
combined[key]['combined_score'] += hit['_score'] * text_weight
else:
combined[key] = {
'video_id': vid,
'segment_index': seg,
'tags': hit['_source'].get('tags', ''),
'vector_score': 0,
'text_score': hit['_score'],
'combined_score': hit['_score'] * text_weight
}
# Sort by combined score and return top k
sorted_results = sorted(combined.values(), key=lambda x: x['combined_score'], reverse=True)[:k]
return sorted_results
# Usage example - search with natural language query
query = "person walking on beach at sunset"
hybrid_results = search_hybrid(query, opensearch_client, k=10)
for r in hybrid_results:
print(f"Combined: {r['combined_score']:.4f} (Vector: {r['vector_score']:.4f}, Text: {r['text_score']:.4f})")
print(f" Video: {r['video_id']}, Segment: {r['segment_index']}")
print(f" Tags: {r['tags']}\n")
大规模搜索性能
在索引了所有 792,218 个视频后,我们测量了所有三种方法的搜索性能。
在 792,218 个视频时的测量查询延迟如下:
在索引和存储了所有 792,218 个视频并生成嵌入后,存储需求如下:
- k-NN 索引:28.8 GB(针对 792K 视频)
- 文本索引:1.0 GB(针对 792K 视频)
- 总计:29.8 GB(在现代 OpenSearch 集群上可管理)
k-NN 搜索使用的分层可导航小世界 (HNSW) 算法提供了对数时间复杂度,这意味着搜索时间随着数据集的增加而缓慢增长。即使在 792K 视频的规模下,所有三种搜索方法都能保持低于 200 毫秒的响应时间,满足交互式搜索应用的生产要求。
注意事项
性能和成本考量
视频处理时间取决于视频长度。在我们的测试中,一个 45 秒的视频使用异步 API 处理大约需要 70 秒。处理过程包括自动分割、为每个片段生成嵌入以及输出到 Amazon S3。搜索操作可高效扩展——我们的测试表明,即使在 792K 视频的情况下,语义搜索也 在 80 毫秒内完成,文本搜索 在 30 毫秒内完成,混合搜索 在 110 毫秒内完成。使用 1024 维嵌入而不是 3072 维,可以在保持准确性的同时降低存储成本。Nova Multimodal Embeddings 按输入视频秒数收费(批量价格为 0.00056 美元/秒),因此视频时长——而不是嵌入维度或分割——决定了处理成本。异步 API 比单独处理帧更具成本效益。对于 OpenSearch Service,使用 r6g 实例比早期实例类型提供更好的价格性能,您可以实施分层存储,将冷数据移至 Amazon S3 以获得额外节省。
扩展到生产环境
对于拥有大型视频库的生产部署,请考虑使用 AWS Batch 在多个计算实例之间并行处理视频。您可以对视频数据集进行分区,并将子集分配给不同的工作进程。监控 OpenSearch Service 集群健康状况,并根据索引增长情况扩展数据节点。双索引架构的可扩展性很好,因为 k-NN 和文本搜索可以独立优化。
搜索准确性调优
根据您的用例调整混合搜索权重。默认的 0.7/0.3 分配(向量/文本)在大多数情况下倾向于语义相似性。如果您拥有高质量的元数据标签,将文本权重增加到 0.5 可以提高结果。我们建议使用您的特定内容测试不同的配置,以找到最佳平衡点。
清理
为避免持续收费,请删除您创建的资源:
- 从 Amazon OpenSearch Service 控制台中删除 OpenSearch Service 域
- 清空并删除用于视频和嵌入的 S3 存储桶
- 删除为该解决方案专门创建的任何 IAM 角色
🚀 想要体验更好更全面的AI调用?
欢迎使用青云聚合API,约为官网价格的十分之一,支持300+全球最新模型,以及全球各种生图生视频模型,无需翻墙高速稳定,文档丰富,小白也可以简单操作。
评论区