@sigridjineth
Created May 24, 2025 00:40
Tuning Test
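"""Tune and deploy a Vertex AI Text Embedding Model end to end.

Flow: submit (or resume) a tuning pipeline, print its evaluation
metrics, optionally deploy the tuned model to an endpoint, and run a
sample embedding request. See main() for the available CLI flags.
"""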
import argparse
import os
import pandas as pd
import vertexai
from google.cloud.aiplatform import pipeline_jobs
from google.cloud.aiplatform.models import Model # For type hinting deployed_model
from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel
from google.oauth2 import service_account  # <--- [modified] import for building explicit key-file credentials
# --- Configuration (can be overridden by argparse) ---
# export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"
DEFAULT_PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "jh-project-1-460723")
DEFAULT_REGION = "us-central1"
DEFAULT_TUNING_JOB_ID = "hello_2"  # Optional: PipelineJob resource name of a tuning session to resume ("hello_2" is a placeholder)
# WARNING: hardcoding a service-account key path in source code is a security risk.
# Use it for testing only; never do this in production.
SERVICE_ACCOUNT_KEY_PATH = "/Users/sigridjineth/PycharmProjects/PythonProject3/resources/jh_service_account.json"  # <--- [modified] change to your actual path
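# NOTE: the two lines below disable TLS certificate verification globally.
# This is a local-testing workaround (e.g., behind an intercepting proxy);
# remove it in any environment where certificate validation matters.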
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
# ... (the remaining DEFAULT values are unchanged) ...
DEFAULT_QUERIES_PATH = "gs://cloud-samples-data-test/queries.jsonl"
DEFAULT_CORPUS_PATH = "gs://cloud-samples-data-test/corpus.jsonl"
DEFAULT_TRAIN_LABEL_PATH = "gs://cloud-samples-data-test/train.tsv"
DEFAULT_VALIDATION_LABEL_PATH = "gs://cloud-samples-data-test/validation.tsv"
DEFAULT_TEST_LABEL_PATH = "gs://cloud-samples-data-test/test.tsv"
DEFAULT_TRAIN_STEPS = 1000
DEFAULT_BATCH_SIZE = 128
DEFAULT_BASE_MODEL_VERSION_ID = "text-embedding-005"
DEFAULT_TASK_TYPE = "DEFAULT"
DEFAULT_LEARNING_RATE_MULTIPLIER = 1.0
DEFAULT_OUTPUT_DIMENSIONALITY = 768
# Deployment parameters
DEFAULT_MACHINE_TYPE = "n1-standard-2"
DEFAULT_ACCELERATOR = "NVIDIA_TESLA_T4"
DEFAULT_ACCELERATOR_COUNT = 1
def _validate_pipeline_parameters(
base_model_version_id: str,
task_type: str,
output_dimensionality: int,
train_steps: int,
):
"""Validates pipeline parameters based on model compatibility and constraints."""
if train_steps <= 30:
raise ValueError(
f"'train_steps' must be greater than 30. Received: {train_steps}"
)
if base_model_version_id not in ["text-embedding-005"]:
if task_type == "CODE_RETRIEVAL_QUERY":
raise ValueError(
f"task_type '{task_type}' is not valid for model '{base_model_version_id}'."
)
if base_model_version_id not in [
"text-embedding-005",
"text-embedding-004",
"text-multilingual-embedding-002",
]:
if task_type in ["QUESTION_ANSWERING", "FACT_VERIFICATION"]:
raise ValueError(
f"task_type '{task_type}' is not valid for model '{base_model_version_id}'."
)
if output_dimensionality not in [-1, 768]:
raise ValueError(
f"For model '{base_model_version_id}', 'output_dimensionality' must be -1 or 768. "
f"Received: {output_dimensionality}."
)
if base_model_version_id == "text-embedding-005" and output_dimensionality not in [
-1,
768,
]:
print(
f"Warning: For 'text-embedding-005', 'output_dimensionality' is typically 768 (or -1 for default/max). "
f"Received {output_dimensionality}. This specific value might be unsupported or ignored by the model."
)
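# Example (hypothetical values): the call below passes silently, while
# train_steps=10 or an unsupported task_type would raise ValueError:
#   _validate_pipeline_parameters("text-embedding-005", "RETRIEVAL_QUERY", 768, 1000)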
def get_or_create_tuning_job(
project_id: str,
region: str,
tuning_job_id_str: str,
base_model_version_id: str,
task_type: str,
corpus_path: str,
queries_path: str,
train_label_path: str,
validation_label_path: str,
test_label_path: str,
batch_size: int,
train_steps: int,
learning_rate_multiplier: float,
output_dimensionality: int,
    # [modified] added credentials argument
    credentials,
) -> tuple[pipeline_jobs.PipelineJob | None, TextEmbeddingModel | None]:
"""Gets an existing tuning job or creates a new one.
Returns a tuple of (PipelineJob, tuning_result_object from SDK if new job).
"""
print(f"Initializing Vertex AI for project {project_id} in {region}...")
    # [modified] pass credentials through to the SDK
vertexai.init(project=project_id, location=region, credentials=credentials)
_validate_pipeline_parameters(
base_model_version_id, task_type, output_dimensionality, train_steps
)
base_model = TextEmbeddingModel.from_pretrained(base_model_version_id)
tuning_job: pipeline_jobs.PipelineJob | None = None
tuning_result_sdk_obj = None # Stores the result from base_model.tune_model()
if tuning_job_id_str:
print(
f"Attempting to resume/fetch tuning job with resource name: {tuning_job_id_str}"
)
try:
tuning_job = pipeline_jobs.PipelineJob.get(resource_name=tuning_job_id_str)
print(
f"Found existing tuning job '{tuning_job.name}' (State: {tuning_job.state.name})."
)
except Exception as e:
print(
f"Could not find or get tuning job '{tuning_job_id_str}': {e}. Will attempt to start a new job if not skipping."
)
tuning_job_id_str = ""
    if not tuning_job and not tuning_job_id_str:
print(
f"Starting a new tuning session for base model '{base_model_version_id}'."
)
tuning_params = {
"task_type": task_type,
"corpus_data": corpus_path,
"queries_data": queries_path,
"training_data": train_label_path,
"batch_size": batch_size,
"train_steps": train_steps,
"tuned_model_location": region,
"learning_rate_multiplier": learning_rate_multiplier,
"output_dimensionality": output_dimensionality,
}
        if validation_label_path:
tuning_params["validation_data"] = validation_label_path
if test_label_path:
tuning_params["test_data"] = test_label_path
tuning_result_sdk_obj = base_model.tune_model(**tuning_params)
tuning_job = pipeline_jobs.PipelineJob.get(
tuning_result_sdk_obj.pipeline_job_name
)
print(
f"Submitted new tuning job '{tuning_job.name}' (State: {tuning_job.state.name})."
)
print(f"To resume this session later, use TUNING_JOB_ID='{tuning_job.name}'")
return tuning_job, tuning_result_sdk_obj
# ... (deploy_tuned_model_from_job, display_tuning_metrics, and get_embeddings_from_deployed_model are unchanged) ...
def deploy_tuned_model_from_job(
tuning_job: pipeline_jobs.PipelineJob,
tuning_result_sdk_obj,
machine_type: str,
accelerator: str,
accelerator_count: int,
) -> Model | None:
print(f"Ensuring tuning job '{tuning_job.name}' is complete before deployment...")
tuning_job.wait()
if tuning_job.state != pipeline_jobs.PipelineState.PIPELINE_STATE_SUCCEEDED:
print(
f"Tuning job {tuning_job.name} did not succeed (State: {tuning_job.state.name}). Cannot deploy."
)
if tuning_job.error:
print(
f"Job error: {tuning_job.error.message if hasattr(tuning_job.error, 'message') else 'No error message'}"
)
return None
print("Tuning job completed successfully.")
deployed_model_endpoint_obj: Model | None = None
if tuning_result_sdk_obj:
print("Attempting to deploy model using SDK's tuning_result object...")
try:
deployed_model_endpoint_obj = tuning_result_sdk_obj.deploy_tuned_model(
machine_type=machine_type,
accelerator=accelerator,
accelerator_count=accelerator_count,
)
print(
f"Model deployed via SDK result object. Endpoint: {deployed_model_endpoint_obj.endpoint_name}"
)
except Exception as e:
print(f"Error deploying model using SDK tuning_result object: {e}")
print(
"Will attempt deployment via tuned_model_name as a fallback if needed."
)
deployed_model_endpoint_obj = None
if not deployed_model_endpoint_obj:
print(
"Attempting to deploy model by fetching tuned_model_name from job tasks (resume or fallback)..."
)
tasks = tuning_job.task_details
upload_task = next((t for t in tasks if "uploader" in t.task_name), None)
if (
not upload_task
or not hasattr(upload_task, "execution")
or not upload_task.execution
):
print(
"Could not find uploader task or its execution details. Cannot deploy."
)
return None
upload_metadata = dict(upload_task.execution.metadata)
tuned_model_name = upload_metadata.get("output:model_resource_name")
if not tuned_model_name:
print(
"Could not find 'output:model_resource_name' in uploader task metadata. Cannot deploy."
)
return None
print(f"Found tuned model resource name: {tuned_model_name}")
print(f"Deploying model '{tuned_model_name}'...")
try:
deployed_model_endpoint_obj = TextEmbeddingModel.deploy_tuned_model(
tuned_model_name=tuned_model_name,
machine_type=machine_type,
accelerator=accelerator,
accelerator_count=accelerator_count,
)
print(
f"Model '{tuned_model_name}' deployed. Endpoint: {deployed_model_endpoint_obj.endpoint_name}"
)
except Exception as e:
print(f"Error deploying model '{tuned_model_name}': {e}")
return None
return deployed_model_endpoint_obj
def display_tuning_metrics(tuning_job: pipeline_jobs.PipelineJob):
if not tuning_job.done():
print(
f"Tuning job '{tuning_job.name}' is not yet complete. Metrics might not be final or available."
)
print(f"\n--- Quality Metrics for Job '{tuning_job.name}' ---")
try:
tuning_job.refresh()
tasks = tuning_job.task_details
eval_task = next((t for t in tasks if "evaluator" in t.task_name), None)
if (
eval_task
and hasattr(eval_task, "outputs")
and eval_task.outputs
and "metrics" in eval_task.outputs
and eval_task.outputs["metrics"].artifacts
):
metrics_artifact = eval_task.outputs["metrics"].artifacts[0]
metrics = dict(metrics_artifact.metadata)
metrics_df = pd.DataFrame.from_dict(
{"metric": metrics.keys(), "value": metrics.values()}
)
print(metrics_df.sort_values(by="metric", ignore_index=True))
else:
print(
"Evaluator task or metrics artifact not found or empty. Job state:",
tuning_job.state.name,
)
except Exception as e:
print(f"Could not retrieve or display metrics: {e}")
def get_embeddings_from_deployed_model(
deployed_model_obj: TextEmbeddingModel,
texts_to_embed: list[str],
task_type: str,
titles: list[str] | None = None,
):
if not deployed_model_obj:
print("No deployed model object available to get embeddings.")
return
print(
f"\n--- Getting Embeddings from Tuned Model on Endpoint: {deployed_model_obj.endpoint_name} ---"
)
actual_titles = titles if titles else [""] * len(texts_to_embed)
if len(texts_to_embed) != len(actual_titles):
raise ValueError("Length of texts and titles must match.")
embedding_inputs = [
TextEmbeddingInput(text=text, task_type=task_type, title=title)
for text, title in zip(texts_to_embed, actual_titles)
]
try:
embeddings_list_sdk = deployed_model_obj.get_embeddings(embedding_inputs)
print(f"Successfully retrieved {len(embeddings_list_sdk)} embeddings:")
for i, emb_sdk_obj in enumerate(embeddings_list_sdk):
series = pd.Series(emb_sdk_obj.values)
print(f'\n--- Embedding for: "{texts_to_embed[i]}" ---')
print(series.head())
if len(series) > 5:
print("...")
print(f"(Total {len(series)} values)")
except Exception as e:
print(f"Error getting embeddings: {e}")
def main():
parser = argparse.ArgumentParser(
description="Tune and deploy Vertex AI Text Embedding Models."
)
    # ... (the argparse setup is unchanged) ...
parser.add_argument(
"--project_id",
type=str,
default=DEFAULT_PROJECT_ID,
help="Google Cloud Project ID.",
)
parser.add_argument(
"--region", type=str, default=DEFAULT_REGION, help="Google Cloud Region."
)
parser.add_argument(
"--tuning_job_id",
type=str,
default=DEFAULT_TUNING_JOB_ID,
help="Optional. Vertex AI Pipeline job resource name (e.g., projects/.../pipelineJobs/...) to resume.",
)
parser.add_argument("--queries_path", type=str, default=DEFAULT_QUERIES_PATH)
parser.add_argument("--corpus_path", type=str, default=DEFAULT_CORPUS_PATH)
parser.add_argument(
"--train_label_path", type=str, default=DEFAULT_TRAIN_LABEL_PATH
)
parser.add_argument(
"--validation_label_path",
type=str,
default=DEFAULT_VALIDATION_LABEL_PATH,
help="Optional. GCS URI for validation labels. Empty string for auto-split.",
)
parser.add_argument(
"--test_label_path",
type=str,
default=DEFAULT_TEST_LABEL_PATH,
help="Optional. GCS URI for test labels. Empty string for auto-split.",
)
parser.add_argument("--train_steps", type=int, default=DEFAULT_TRAIN_STEPS)
parser.add_argument("--batch_size", type=int, default=DEFAULT_BATCH_SIZE)
parser.add_argument(
"--base_model_version_id",
type=str,
default=DEFAULT_BASE_MODEL_VERSION_ID,
choices=[
"text-embedding-005",
"text-embedding-004",
"text-multilingual-embedding-002",
],
)
parser.add_argument(
"--task_type",
type=str,
default=DEFAULT_TASK_TYPE,
choices=[
"DEFAULT",
"RETRIEVAL_QUERY",
"RETRIEVAL_DOCUMENT",
"SEMANTIC_SIMILARITY",
"CLASSIFICATION",
"CLUSTERING",
"QUESTION_ANSWERING",
"FACT_VERIFICATION",
"CODE_RETRIEVAL_QUERY",
],
)
parser.add_argument(
"--learning_rate_multiplier",
type=float,
default=DEFAULT_LEARNING_RATE_MULTIPLIER,
)
parser.add_argument(
"--output_dimensionality",
type=int,
default=DEFAULT_OUTPUT_DIMENSIONALITY,
help="Desired embedding dimension. -1 for model default/max. Check model compatibility.",
)
parser.add_argument("--machine_type", type=str, default=DEFAULT_MACHINE_TYPE)
parser.add_argument("--accelerator", type=str, default=DEFAULT_ACCELERATOR)
parser.add_argument(
"--accelerator_count", type=int, default=DEFAULT_ACCELERATOR_COUNT
)
parser.add_argument(
"--skip_tuning",
action="store_true",
help="Skip tuning, only use existing --tuning_job_id for deployment/metrics.",
)
parser.add_argument(
"--skip_deployment",
action="store_true",
help="Skip deployment. Runs tuning (if not skipped) and shows metrics.",
)
parser.add_argument(
"--skip_inference", action="store_true", help="Skip getting sample embeddings."
)
args = parser.parse_args()
if not args.project_id:
raise ValueError(
"PROJECT_ID must be set via --project_id or configured via gcloud/environment."
)
print("--- Vertex AI Text Embedding Tuning Script ---")
print(f"Project: {args.project_id}, Region: {args.region}")
    # [modified] build a credentials object from the service-account key file
try:
creds = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_KEY_PATH,
            scopes=['https://www.googleapis.com/auth/cloud-platform']  # specify scopes explicitly if needed
)
print(f"Using service account credentials from: {SERVICE_ACCOUNT_KEY_PATH}")
except Exception as e:
print(f"Error loading service account key from {SERVICE_ACCOUNT_KEY_PATH}: {e}")
print("Ensure the path is correct and the file is accessible.")
print("Falling back to Application Default Credentials if available.")
        # You can either fall back to ADC here or abort the script.
        # This version logs the error and lets ADC take over (vertexai.init is
        # then effectively called without explicit credentials).
        # To force the script to use only this key file, return or raise here instead.
        # return  # <--- uncomment to enforce the key file
        creds = None  # use ADC (or drop the credentials argument from the vertexai.init calls)
        # print("Ensure you are authenticated to Google Cloud (e.g., 'gcloud auth application-default login').")  # less relevant with a hardcoded key
print("Required packages: google-cloud-aiplatform, pandas. Install if missing.")
tuning_job: pipeline_jobs.PipelineJob | None = None
tuning_result_sdk_obj = None
if args.skip_tuning:
if not args.tuning_job_id:
print("Error: --skip_tuning requires --tuning_job_id to be provided.")
return
print(
f"Skipping tuning. Attempting to fetch existing job: {args.tuning_job_id}"
)
try:
            # [modified] pass credentials (ADC is used when creds is None)
vertexai.init(project=args.project_id, location=args.region, credentials=creds)
tuning_job = pipeline_jobs.PipelineJob.get(resource_name=args.tuning_job_id)
print(
f"Successfully fetched existing job '{tuning_job.name}' (State: {tuning_job.state.name})."
)
except Exception as e:
print(f"Error fetching existing job '{args.tuning_job_id}': {e}")
return
else:
job_tuple = get_or_create_tuning_job(
project_id=args.project_id,
region=args.region,
tuning_job_id_str=args.tuning_job_id,
base_model_version_id=args.base_model_version_id,
task_type=args.task_type,
corpus_path=args.corpus_path,
queries_path=args.queries_path,
train_label_path=args.train_label_path,
validation_label_path=args.validation_label_path,
test_label_path=args.test_label_path,
batch_size=args.batch_size,
train_steps=args.train_steps,
learning_rate_multiplier=args.learning_rate_multiplier,
output_dimensionality=args.output_dimensionality,
            credentials=creds,  # [modified] pass credentials through
)
tuning_job, tuning_result_sdk_obj = job_tuple
    # ... (the logic below is unchanged) ...
if not tuning_job:
print("Failed to get or create a tuning job. Exiting.")
return
if args.skip_deployment or tuning_job.done():
display_tuning_metrics(tuning_job)
else:
print(
f"Tuning job '{tuning_job.name}' is running. Waiting for completion to show metrics and deploy..."
)
tuning_job.wait()
display_tuning_metrics(tuning_job)
deployed_model_sdk_obj: TextEmbeddingModel | None = None
if not args.skip_deployment:
if tuning_job.state == pipeline_jobs.PipelineState.PIPELINE_STATE_SUCCEEDED:
deployed_model_sdk_obj = deploy_tuned_model_from_job(
tuning_job=tuning_job,
tuning_result_sdk_obj=tuning_result_sdk_obj,
machine_type=args.machine_type,
accelerator=args.accelerator,
accelerator_count=args.accelerator_count,
)
else:
print(
f"Tuning job {tuning_job.name} did not succeed (State: {tuning_job.state.name}). Skipping deployment."
)
else:
print("Skipping model deployment as per --skip_deployment flag.")
if not args.skip_inference:
if deployed_model_sdk_obj:
sample_texts = [
"What is the future of AI?",
"Tell me about financial performance of Alphabet.",
]
get_embeddings_from_deployed_model(
deployed_model_obj=deployed_model_sdk_obj,
texts_to_embed=sample_texts,
task_type=args.task_type,
)
else:
print(
"Skipping inference: Model was not deployed in this run, deployment failed, or was skipped."
)
if args.skip_deployment and args.tuning_job_id:
print(
"If a model was previously deployed from this job, you would need to fetch it separately."
)
else:
print("Skipping inference as per --skip_inference flag.")
print("\nScript finished.")
print(
"To clean up billable resources, manually delete the Vertex AI Endpoint, Model, and PipelineJob if they are no longer needed."
)
if __name__ == "__main__":
main()
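# Example invocations (hypothetical project/paths; assumes this file is saved
# as tuning_test.py):
#
#   # Start a new tuning job with the defaults above:
#   python tuning_test.py --project_id=my-project --tuning_job_id=""
#
#   # Resume an existing job, skipping tuning (metrics + deployment only):
#   python tuning_test.py --skip_tuning \
#       --tuning_job_id="projects/.../locations/us-central1/pipelineJobs/..."
#
#   # Tune and evaluate without deploying an endpoint:
#   python tuning_test.py --skip_deployment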