Created
May 24, 2025 00:40
-
-
Save sigridjineth/6c1ccbe082799f3440087b47515b66f8 to your computer and use it in GitHub Desktop.
Tuning Test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import os | |
| import pandas as pd | |
| import vertexai | |
| from google.cloud.aiplatform import pipeline_jobs | |
| from google.cloud.aiplatform.models import Model # For type hinting deployed_model | |
| from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel | |
| from google.oauth2 import service_account # <--- [수정됨] 모듈 임포트 | |
import ssl

# --- Configuration (defaults; most can be overridden via command-line flags) ---
# Application Default Credentials are used when the key file below cannot be
# loaded; they can be pointed at a key file with, e.g.:
#   export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"
DEFAULT_PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "jh-project-1-460723")
DEFAULT_REGION = "us-central1"
# Optional: existing tuning pipeline job to resume. Overridable with the
# TUNING_JOB_ID environment variable or the --tuning_job_id flag.
DEFAULT_TUNING_JOB_ID = os.environ.get("TUNING_JOB_ID", "hello_2")

# WARNING: loading a service-account key from a fixed local path is for local
# testing only; never rely on it in production. Override the path with the
# VERTEX_SERVICE_ACCOUNT_KEY_PATH environment variable.
SERVICE_ACCOUNT_KEY_PATH = os.environ.get(
    "VERTEX_SERVICE_ACCOUNT_KEY_PATH",
    "/Users/sigridjineth/PycharmProjects/PythonProject3/resources/jh_service_account.json",
)

# Replacing the stdlib default HTTPS context factory disables certificate
# verification for every connection that relies on it (e.g. http.client /
# urllib without an explicit context), exposing them to man-in-the-middle
# attacks. Keep it strictly opt-in, e.g. for a TLS-intercepting proxy during
# local testing.
if os.environ.get("VERTEX_TUNING_INSECURE_SSL") == "1":
    ssl._create_default_https_context = ssl._create_unverified_context

# Tuning data locations (GCS URIs).
DEFAULT_QUERIES_PATH = "gs://cloud-samples-data-test/queries.jsonl"
DEFAULT_CORPUS_PATH = "gs://cloud-samples-data-test/corpus.jsonl"
DEFAULT_TRAIN_LABEL_PATH = "gs://cloud-samples-data-test/train.tsv"
DEFAULT_VALIDATION_LABEL_PATH = "gs://cloud-samples-data-test/validation.tsv"
DEFAULT_TEST_LABEL_PATH = "gs://cloud-samples-data-test/test.tsv"

# Tuning hyperparameters and base model.
DEFAULT_TRAIN_STEPS = 1000
DEFAULT_BATCH_SIZE = 128
DEFAULT_BASE_MODEL_VERSION_ID = "text-embedding-005"
DEFAULT_TASK_TYPE = "DEFAULT"
DEFAULT_LEARNING_RATE_MULTIPLIER = 1.0
DEFAULT_OUTPUT_DIMENSIONALITY = 768

# Deployment parameters.
DEFAULT_MACHINE_TYPE = "n1-standard-2"
DEFAULT_ACCELERATOR = "NVIDIA_TESLA_T4"
DEFAULT_ACCELERATOR_COUNT = 1
| def _validate_pipeline_parameters( | |
| base_model_version_id: str, | |
| task_type: str, | |
| output_dimensionality: int, | |
| train_steps: int, | |
| ): | |
| """Validates pipeline parameters based on model compatibility and constraints.""" | |
| if train_steps <= 30: | |
| raise ValueError( | |
| f"'train_steps' must be greater than 30. Received: {train_steps}" | |
| ) | |
| if base_model_version_id not in ["text-embedding-005"]: | |
| if task_type == "CODE_RETRIEVAL_QUERY": | |
| raise ValueError( | |
| f"task_type '{task_type}' is not valid for model '{base_model_version_id}'." | |
| ) | |
| if base_model_version_id not in [ | |
| "text-embedding-005", | |
| "text-embedding-004", | |
| "text-multilingual-embedding-002", | |
| ]: | |
| if task_type in ["QUESTION_ANSWERING", "FACT_VERIFICATION"]: | |
| raise ValueError( | |
| f"task_type '{task_type}' is not valid for model '{base_model_version_id}'." | |
| ) | |
| if output_dimensionality not in [-1, 768]: | |
| raise ValueError( | |
| f"For model '{base_model_version_id}', 'output_dimensionality' must be -1 or 768. " | |
| f"Received: {output_dimensionality}." | |
| ) | |
| if base_model_version_id == "text-embedding-005" and output_dimensionality not in [ | |
| -1, | |
| 768, | |
| ]: | |
| print( | |
| f"Warning: For 'text-embedding-005', 'output_dimensionality' is typically 768 (or -1 for default/max). " | |
| f"Received {output_dimensionality}. This specific value might be unsupported or ignored by the model." | |
| ) | |
def get_or_create_tuning_job(
    project_id: str,
    region: str,
    tuning_job_id_str: str,
    base_model_version_id: str,
    task_type: str,
    corpus_path: str,
    queries_path: str,
    train_label_path: str,
    validation_label_path: str,
    test_label_path: str,
    batch_size: int,
    train_steps: int,
    learning_rate_multiplier: float,
    output_dimensionality: int,
    credentials,
) -> tuple[pipeline_jobs.PipelineJob | None, object | None]:
    """Fetch an existing embedding-tuning pipeline job, or submit a new one.

    Initializes Vertex AI for ``project_id``/``region`` with ``credentials`` and
    validates the tuning parameters first. If ``tuning_job_id_str`` is
    non-empty, that job is fetched with ``PipelineJob.get``. If the fetch
    fails, or no ID was given, a new job is submitted with
    ``TextEmbeddingModel.tune_model``.

    Args:
        project_id: Google Cloud project ID.
        region: Vertex AI region; also used as the tuned model location.
        tuning_job_id_str: Pipeline job resource name (or ID) to resume; "" to start fresh.
        base_model_version_id: Base embedding model to tune.
        task_type: Embedding task type for tuning.
        corpus_path, queries_path, train_label_path: Tuning data URIs.
        validation_label_path, test_label_path: Optional data URIs; omitted when empty.
        batch_size, train_steps, learning_rate_multiplier, output_dimensionality:
            Tuning hyperparameters forwarded to ``tune_model``.
        credentials: Passed to ``vertexai.init``; ``None`` lets it use
            Application Default Credentials.

    Returns:
        ``(pipeline_job, tuning_result)``. ``tuning_result`` is the object
        returned by ``tune_model`` for a newly submitted job, or ``None`` when
        an existing job was resumed.

    Raises:
        ValueError: If the tuning parameters fail validation.
    """
    print(f"Initializing Vertex AI for project {project_id} in {region}...")
    vertexai.init(project=project_id, location=region, credentials=credentials)
    _validate_pipeline_parameters(
        base_model_version_id, task_type, output_dimensionality, train_steps
    )

    tuning_job: pipeline_jobs.PipelineJob | None = None
    if tuning_job_id_str:
        print(
            f"Attempting to resume/fetch tuning job with resource name: {tuning_job_id_str}"
        )
        try:
            tuning_job = pipeline_jobs.PipelineJob.get(resource_name=tuning_job_id_str)
            print(
                f"Found existing tuning job '{tuning_job.name}' (State: {tuning_job.state.name})."
            )
        except Exception as e:
            # Best effort: an unknown or inaccessible job falls through to a new submission.
            print(
                f"Could not find or get tuning job '{tuning_job_id_str}': {e}. Will attempt to start a new job if not skipping."
            )
    if tuning_job is not None:
        return tuning_job, None

    print(
        f"Starting a new tuning session for base model '{base_model_version_id}'."
    )
    # Only needed when submitting a new job, so it is not loaded on the resume path.
    base_model = TextEmbeddingModel.from_pretrained(base_model_version_id)
    tuning_params = {
        "task_type": task_type,
        "corpus_data": corpus_path,
        "queries_data": queries_path,
        "training_data": train_label_path,
        "batch_size": batch_size,
        "train_steps": train_steps,
        "tuned_model_location": region,
        "learning_rate_multiplier": learning_rate_multiplier,
        "output_dimensionality": output_dimensionality,
    }
    if validation_label_path:
        tuning_params["validation_data"] = validation_label_path
    if test_label_path:
        tuning_params["test_data"] = test_label_path

    tuning_result = base_model.tune_model(**tuning_params)
    # NOTE(review): confirm the installed SDK's tuning-result object exposes
    # `pipeline_job_name`.
    tuning_job = pipeline_jobs.PipelineJob.get(tuning_result.pipeline_job_name)
    print(
        f"Submitted new tuning job '{tuning_job.name}' (State: {tuning_job.state.name})."
    )
    print(f"To resume this session later, use TUNING_JOB_ID='{tuning_job.name}'")
    return tuning_job, tuning_result
def _find_tuned_model_name(tuning_job: pipeline_jobs.PipelineJob) -> str | None:
    """Return the model resource name recorded by the job's "uploader" task, or None (with a message)."""
    upload_task = next(
        (t for t in tuning_job.task_details if "uploader" in t.task_name), None
    )
    execution = getattr(upload_task, "execution", None)
    if not execution:
        print(
            "Could not find uploader task or its execution details. Cannot deploy."
        )
        return None
    tuned_model_name = dict(execution.metadata).get("output:model_resource_name")
    if not tuned_model_name:
        print(
            "Could not find 'output:model_resource_name' in uploader task metadata. Cannot deploy."
        )
        return None
    return tuned_model_name


def deploy_tuned_model_from_job(
    tuning_job: pipeline_jobs.PipelineJob,
    tuning_result_sdk_obj,
    machine_type: str,
    accelerator: str,
    accelerator_count: int,
) -> TextEmbeddingModel | None:
    """Wait for a tuning job to finish and deploy the tuned model.

    Deployment is first attempted through ``tuning_result_sdk_obj`` (the object
    returned by ``tune_model`` for a job submitted in this run). If that is
    unavailable or fails, the tuned model's resource name is read from the
    job's uploader task and deployed with ``TextEmbeddingModel.deploy_tuned_model``.

    Args:
        tuning_job: The tuning pipeline job; blocks until it completes.
        tuning_result_sdk_obj: Result object from ``tune_model``, or None when resuming.
        machine_type, accelerator, accelerator_count: Serving hardware settings.

    Returns:
        The deployed model object, or None if the job did not succeed or
        deployment failed. Deployment errors are printed, not raised.
    """
    print(f"Ensuring tuning job '{tuning_job.name}' is complete before deployment...")
    tuning_job.wait()
    # Compare by enum member name, as the rest of this script does with
    # `state.name`. This avoids depending on `pipeline_jobs` re-exporting the
    # `PipelineState` enum (NOTE(review): it appears to import it only as
    # `gca_pipeline_state`).
    if tuning_job.state.name != "PIPELINE_STATE_SUCCEEDED":
        print(
            f"Tuning job {tuning_job.name} did not succeed (State: {tuning_job.state.name}). Cannot deploy."
        )
        if tuning_job.error:
            print(
                f"Job error: {tuning_job.error.message if hasattr(tuning_job.error, 'message') else 'No error message'}"
            )
        return None
    print("Tuning job completed successfully.")

    deployed_model: TextEmbeddingModel | None = None
    if tuning_result_sdk_obj:
        print("Attempting to deploy model using SDK's tuning_result object...")
        try:
            deployed_model = tuning_result_sdk_obj.deploy_tuned_model(
                machine_type=machine_type,
                accelerator=accelerator,
                accelerator_count=accelerator_count,
            )
            print(
                f"Model deployed via SDK result object. Endpoint: {deployed_model.endpoint_name}"
            )
        except Exception as e:
            print(f"Error deploying model using SDK tuning_result object: {e}")
            print(
                "Will attempt deployment via tuned_model_name as a fallback if needed."
            )
            deployed_model = None
    if deployed_model:
        return deployed_model

    # Resume path (no tuning result object) or fallback after an SDK-object failure.
    print(
        "Attempting to deploy model by fetching tuned_model_name from job tasks (resume or fallback)..."
    )
    tuned_model_name = _find_tuned_model_name(tuning_job)
    if not tuned_model_name:
        return None
    print(f"Found tuned model resource name: {tuned_model_name}")
    print(f"Deploying model '{tuned_model_name}'...")
    try:
        deployed_model = TextEmbeddingModel.deploy_tuned_model(
            tuned_model_name=tuned_model_name,
            machine_type=machine_type,
            accelerator=accelerator,
            accelerator_count=accelerator_count,
        )
        print(
            f"Model '{tuned_model_name}' deployed. Endpoint: {deployed_model.endpoint_name}"
        )
    except Exception as e:
        print(f"Error deploying model '{tuned_model_name}': {e}")
        return None
    return deployed_model
def display_tuning_metrics(tuning_job: pipeline_jobs.PipelineJob):
    """Print the evaluator's quality metrics for a tuning pipeline job.

    Metrics are read from the first artifact of the ``metrics`` output of the
    first task whose name contains "evaluator", and printed as a
    ``metric``/``value`` table sorted by metric name. If the job is not done,
    a warning is printed first. Any error while fetching or formatting the
    metrics is printed rather than raised.
    """
    if not tuning_job.done():
        print(
            f"Tuning job '{tuning_job.name}' is not yet complete. Metrics might not be final or available."
        )
    print(f"\n--- Quality Metrics for Job '{tuning_job.name}' ---")
    try:
        tuning_job.refresh()
        evaluator = next(
            (task for task in tuning_job.task_details if "evaluator" in task.task_name),
            None,
        )

        metric_artifacts = None
        if evaluator and hasattr(evaluator, "outputs"):
            outputs = evaluator.outputs
            if outputs and "metrics" in outputs:
                metric_artifacts = outputs["metrics"].artifacts

        if not metric_artifacts:
            print(
                "Evaluator task or metrics artifact not found or empty. Job state:",
                tuning_job.state.name,
            )
            return

        metadata = dict(metric_artifacts[0].metadata)
        table = pd.DataFrame(
            {"metric": list(metadata.keys()), "value": list(metadata.values())}
        )
        print(table.sort_values(by="metric", ignore_index=True))
    except Exception as e:
        print(f"Could not retrieve or display metrics: {e}")
def get_embeddings_from_deployed_model(
    deployed_model_obj: TextEmbeddingModel,
    texts_to_embed: list[str],
    task_type: str,
    titles: list[str] | None = None,
):
    """Embed texts with a deployed model and print a short preview of each vector.

    Args:
        deployed_model_obj: Deployed model; if falsy, a message is printed and
            the function returns without calling the service.
        texts_to_embed: Texts to embed.
        task_type: Task type attached to every ``TextEmbeddingInput``.
        titles: Optional per-text titles; when omitted or empty, "" is used for each text.

    Raises:
        ValueError: If ``titles`` is given and its length differs from
            ``texts_to_embed``. Errors from the embedding call itself are
            printed, not raised.
    """
    if not deployed_model_obj:
        print("No deployed model object available to get embeddings.")
        return

    print(
        f"\n--- Getting Embeddings from Tuned Model on Endpoint: {deployed_model_obj.endpoint_name} ---"
    )
    title_list = titles or [""] * len(texts_to_embed)
    if len(title_list) != len(texts_to_embed):
        raise ValueError("Length of texts and titles must match.")

    requests = [
        TextEmbeddingInput(text=body, task_type=task_type, title=heading)
        for body, heading in zip(texts_to_embed, title_list)
    ]
    try:
        results = deployed_model_obj.get_embeddings(requests)
        print(f"Successfully retrieved {len(results)} embeddings:")
        for idx, embedding in enumerate(results):
            vector = pd.Series(embedding.values)
            print(f'\n--- Embedding for: "{texts_to_embed[idx]}" ---')
            print(vector.head())
            if len(vector) > 5:
                print("...")
            print(f"(Total {len(vector)} values)")
    except Exception as e:
        print(f"Error getting embeddings: {e}")
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser; defaults come from the module-level DEFAULT_* constants."""
    parser = argparse.ArgumentParser(
        description="Tune and deploy Vertex AI Text Embedding Models."
    )
    parser.add_argument(
        "--project_id",
        type=str,
        default=DEFAULT_PROJECT_ID,
        help="Google Cloud Project ID.",
    )
    parser.add_argument(
        "--region", type=str, default=DEFAULT_REGION, help="Google Cloud Region."
    )
    parser.add_argument(
        "--tuning_job_id",
        type=str,
        default=DEFAULT_TUNING_JOB_ID,
        help="Optional. Vertex AI Pipeline job resource name (e.g., projects/.../pipelineJobs/...) to resume.",
    )
    parser.add_argument("--queries_path", type=str, default=DEFAULT_QUERIES_PATH)
    parser.add_argument("--corpus_path", type=str, default=DEFAULT_CORPUS_PATH)
    parser.add_argument(
        "--train_label_path", type=str, default=DEFAULT_TRAIN_LABEL_PATH
    )
    parser.add_argument(
        "--validation_label_path",
        type=str,
        default=DEFAULT_VALIDATION_LABEL_PATH,
        help="Optional. GCS URI for validation labels. Empty string for auto-split.",
    )
    parser.add_argument(
        "--test_label_path",
        type=str,
        default=DEFAULT_TEST_LABEL_PATH,
        help="Optional. GCS URI for test labels. Empty string for auto-split.",
    )
    parser.add_argument("--train_steps", type=int, default=DEFAULT_TRAIN_STEPS)
    parser.add_argument("--batch_size", type=int, default=DEFAULT_BATCH_SIZE)
    parser.add_argument(
        "--base_model_version_id",
        type=str,
        default=DEFAULT_BASE_MODEL_VERSION_ID,
        choices=[
            "text-embedding-005",
            "text-embedding-004",
            "text-multilingual-embedding-002",
        ],
    )
    parser.add_argument(
        "--task_type",
        type=str,
        default=DEFAULT_TASK_TYPE,
        choices=[
            "DEFAULT",
            "RETRIEVAL_QUERY",
            "RETRIEVAL_DOCUMENT",
            "SEMANTIC_SIMILARITY",
            "CLASSIFICATION",
            "CLUSTERING",
            "QUESTION_ANSWERING",
            "FACT_VERIFICATION",
            "CODE_RETRIEVAL_QUERY",
        ],
    )
    parser.add_argument(
        "--learning_rate_multiplier",
        type=float,
        default=DEFAULT_LEARNING_RATE_MULTIPLIER,
    )
    parser.add_argument(
        "--output_dimensionality",
        type=int,
        default=DEFAULT_OUTPUT_DIMENSIONALITY,
        help="Desired embedding dimension. -1 for model default/max. Check model compatibility.",
    )
    parser.add_argument("--machine_type", type=str, default=DEFAULT_MACHINE_TYPE)
    parser.add_argument("--accelerator", type=str, default=DEFAULT_ACCELERATOR)
    parser.add_argument(
        "--accelerator_count", type=int, default=DEFAULT_ACCELERATOR_COUNT
    )
    parser.add_argument(
        "--skip_tuning",
        action="store_true",
        help="Skip tuning, only use existing --tuning_job_id for deployment/metrics.",
    )
    parser.add_argument(
        "--skip_deployment",
        action="store_true",
        help="Skip deployment. Runs tuning (if not skipped) and shows metrics.",
    )
    parser.add_argument(
        "--skip_inference", action="store_true", help="Skip getting sample embeddings."
    )
    return parser


def _load_credentials():
    """Load service-account credentials from SERVICE_ACCOUNT_KEY_PATH.

    Returns None when the key file cannot be loaded. Passing None to
    ``vertexai.init`` lets it fall back to Application Default Credentials.
    """
    try:
        creds = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_KEY_PATH,
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
    except Exception as e:
        # Deliberate best effort: continue with ADC instead of aborting.
        print(f"Error loading service account key from {SERVICE_ACCOUNT_KEY_PATH}: {e}")
        print("Ensure the path is correct and the file is accessible.")
        print("Falling back to Application Default Credentials if available.")
        return None
    print(f"Using service account credentials from: {SERVICE_ACCOUNT_KEY_PATH}")
    return creds


def main(argv: list[str] | None = None) -> None:
    """CLI entry point: tune or resume a job, show metrics, deploy, and embed samples.

    Args:
        argv: Arguments to parse; ``None`` (the default) parses ``sys.argv[1:]``.

    Raises:
        ValueError: If no project ID is configured.
    """
    args = _build_arg_parser().parse_args(argv)
    if not args.project_id:
        raise ValueError(
            "PROJECT_ID must be set via --project_id or configured via gcloud/environment."
        )
    print("--- Vertex AI Text Embedding Tuning Script ---")
    print(f"Project: {args.project_id}, Region: {args.region}")
    creds = _load_credentials()
    print("Required packages: google-cloud-aiplatform, pandas. Install if missing.")

    tuning_result_sdk_obj = None
    if args.skip_tuning:
        if not args.tuning_job_id:
            print("Error: --skip_tuning requires --tuning_job_id to be provided.")
            return
        print(
            f"Skipping tuning. Attempting to fetch existing job: {args.tuning_job_id}"
        )
        try:
            vertexai.init(project=args.project_id, location=args.region, credentials=creds)
            tuning_job = pipeline_jobs.PipelineJob.get(resource_name=args.tuning_job_id)
            print(
                f"Successfully fetched existing job '{tuning_job.name}' (State: {tuning_job.state.name})."
            )
        except Exception as e:
            print(f"Error fetching existing job '{args.tuning_job_id}': {e}")
            return
    else:
        tuning_job, tuning_result_sdk_obj = get_or_create_tuning_job(
            project_id=args.project_id,
            region=args.region,
            tuning_job_id_str=args.tuning_job_id,
            base_model_version_id=args.base_model_version_id,
            task_type=args.task_type,
            corpus_path=args.corpus_path,
            queries_path=args.queries_path,
            train_label_path=args.train_label_path,
            validation_label_path=args.validation_label_path,
            test_label_path=args.test_label_path,
            batch_size=args.batch_size,
            train_steps=args.train_steps,
            learning_rate_multiplier=args.learning_rate_multiplier,
            output_dimensionality=args.output_dimensionality,
            credentials=creds,
        )

    if not tuning_job:
        print("Failed to get or create a tuning job. Exiting.")
        return

    # When deployment will follow, wait for a running job so that the metrics
    # shown are final.
    if not (args.skip_deployment or tuning_job.done()):
        print(
            f"Tuning job '{tuning_job.name}' is running. Waiting for completion to show metrics and deploy..."
        )
        tuning_job.wait()
    display_tuning_metrics(tuning_job)

    deployed_model_sdk_obj: TextEmbeddingModel | None = None
    if args.skip_deployment:
        print("Skipping model deployment as per --skip_deployment flag.")
    elif tuning_job.state.name == "PIPELINE_STATE_SUCCEEDED":
        # Compare by enum member name rather than relying on `pipeline_jobs`
        # re-exporting `PipelineState` (NOTE(review): it appears not to).
        deployed_model_sdk_obj = deploy_tuned_model_from_job(
            tuning_job=tuning_job,
            tuning_result_sdk_obj=tuning_result_sdk_obj,
            machine_type=args.machine_type,
            accelerator=args.accelerator,
            accelerator_count=args.accelerator_count,
        )
    else:
        print(
            f"Tuning job {tuning_job.name} did not succeed (State: {tuning_job.state.name}). Skipping deployment."
        )

    if args.skip_inference:
        print("Skipping inference as per --skip_inference flag.")
    elif deployed_model_sdk_obj:
        sample_texts = [
            "What is the future of AI?",
            "Tell me about financial performance of Alphabet.",
        ]
        get_embeddings_from_deployed_model(
            deployed_model_obj=deployed_model_sdk_obj,
            texts_to_embed=sample_texts,
            task_type=args.task_type,
        )
    else:
        print(
            "Skipping inference: Model was not deployed in this run, deployment failed, or was skipped."
        )
        if args.skip_deployment and args.tuning_job_id:
            print(
                "If a model was previously deployed from this job, you would need to fetch it separately."
            )

    print("\nScript finished.")
    print(
        "To clean up billable resources, manually delete the Vertex AI Endpoint, Model, and PipelineJob if they are no longer needed."
    )


if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/generative_ai/tuned_text-embeddings.ipynb
https://github.com/GoogleCloudPlatform/generative-ai/blob/main/embeddings/intro_embeddings_tuning.ipynb