@kurokobo
Last active December 5, 2025 12:41
Dify: Weaviate 1.19 to 1.27+ Migration Guide (community-edited, simplified)
#!/usr/bin/env python3
"""
Migration script to fix Weaviate schema incompatibility between 1.19.0 and 1.27.0+

This script:
- Identifies collections with old schema (no vectorConfig)
- Creates new collections with proper vectorConfig including "default" named vector
- Migrates data using cursor-based pagination (efficient for large datasets)
- Uses batch operations for fast inserts
- Preserves all object properties and vectors

Note:
- This is a community-edited version of the draft of the script presented by the Dify Team.
- This script is not officially supported by the Dify Team.
- The original source for this script can be found at https://github.com/langgenius/dify/issues/27291#issuecomment-3501003678.
- The changes made in this script are:
  - Retrieve Weaviate connection info from environment variables to make this script run in the Worker container.
  - Switch to cursor-based pagination in "replace_old_collection", since the migration could fail with large collections.
  - Fix an issue where both the old and new collections remained without being deleted after migrating an empty collection.
"""
import os
import weaviate
from weaviate.classes.config import Configure, VectorDistances
import sys
import time
from typing import List, Dict, Any

# Configuration
WEAVIATE_ENDPOINT = os.getenv("WEAVIATE_ENDPOINT", "http://weaviate:8080")
WEAVIATE_GRPC_ENDPOINT = os.getenv("WEAVIATE_GRPC_ENDPOINT", "grpc://weaviate:50051")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY", "WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih")
BATCH_SIZE = 1000

# Host and ports are derived from the endpoint URLs (expects "scheme://host:port")
WEAVIATE_HOST = WEAVIATE_ENDPOINT.split("//")[-1].split(":")[0]
WEAVIATE_PORT = int(WEAVIATE_ENDPOINT.split(":")[-1])
WEAVIATE_GRPC_PORT = int(WEAVIATE_GRPC_ENDPOINT.split(":")[-1])


def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]:
    """Identify collections that need migration (those without vectorConfig)"""
    collections_to_migrate = []
    all_collections = client.collections.list_all()
    print(f"Found {len(all_collections)} total collections")
    for collection_name in all_collections.keys():
        # Only check Vector_index collections (Dify knowledge bases)
        if not collection_name.startswith("Vector_index_"):
            continue
        collection = client.collections.get(collection_name)
        config = collection.config.get()
        # Check if this collection has the old schema
        if config.vector_config is None:
            collections_to_migrate.append(collection_name)
            print(f" - {collection_name}: OLD SCHEMA (needs migration)")
        else:
            print(f" - {collection_name}: NEW SCHEMA (skip)")
    return collections_to_migrate


def get_collection_schema(client: weaviate.WeaviateClient, collection_name: str) -> Dict[str, Any]:
    """Get the full schema of a collection via REST API"""
    import requests
    response = requests.get(
        f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Failed to get schema: {response.text}")


def create_new_collection(client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any]) -> str:
    """Create a new collection with updated schema using REST API"""
    import requests
    # Generate new collection name
    new_name = f"{old_name}_migrated"
    print(f"Creating new collection: {new_name}")
    # Build new schema with proper vectorConfig
    # Note: When using vectorConfig (named vectors), we don't set class-level vectorizer
    new_schema = {
        "class": new_name,
        # This is the key: define vectorConfig with "default" named vector
        # Do NOT set class-level vectorizer when using vectorConfig
        "vectorConfig": {
            "default": {
                "vectorizer": {
                    "none": {}
                },
                "vectorIndexType": "hnsw",
                "vectorIndexConfig": {
                    "distance": "cosine",
                    "ef": -1,
                    "efConstruction": 128,
                    "maxConnections": 32
                }
            }
        },
        "properties": []
    }
    # Copy properties from old schema
    if "properties" in schema:
        new_schema["properties"] = schema["properties"]
    # Create collection via REST API
    response = requests.post(
        f"{WEAVIATE_ENDPOINT}/v1/schema",
        json=new_schema,
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if response.status_code not in [200, 201]:
        raise Exception(f"Failed to create collection: {response.text}")
    print(f" Created new collection: {new_name}")
    return new_name


def migrate_collection_data(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str
) -> int:
    """Migrate data from old collection to new collection using cursor-based pagination"""
    old_collection = client.collections.get(old_collection_name)
    new_collection = client.collections.get(new_collection_name)
    total_migrated = 0
    cursor = None
    print(f"Migrating data from {old_collection_name} to {new_collection_name}")
    while True:
        # Fetch batch of objects using cursor-based pagination
        if cursor is None:
            # First batch
            response = old_collection.query.fetch_objects(
                limit=BATCH_SIZE,
                include_vector=True
            )
        else:
            # Subsequent batches using cursor
            response = old_collection.query.fetch_objects(
                limit=BATCH_SIZE,
                include_vector=True,
                after=cursor
            )
        objects = response.objects
        if not objects:
            break
        # Use batch insert for efficiency
        with new_collection.batch.dynamic() as batch:
            for obj in objects:
                # Prepare properties
                properties = obj.properties
                # Add object with vector
                batch.add_object(
                    properties=properties,
                    vector=obj.vector["default"] if isinstance(obj.vector, dict) else obj.vector,
                    uuid=obj.uuid
                )
        total_migrated += len(objects)
        print(f" Migrated {total_migrated} objects...")
        # Update cursor for next iteration
        if len(objects) < BATCH_SIZE:
            # Last batch
            break
        else:
            # Get the last object's UUID for cursor
            cursor = objects[-1].uuid
    print(f" Total migrated: {total_migrated} objects")
    return total_migrated


def verify_migration(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str
):
    """Verify that the migration was successful by comparing object counts"""
    old_collection = client.collections.get(old_collection_name)
    new_collection = client.collections.get(new_collection_name)
    # Use aggregation to get accurate object counts for both collections
    old_agg = old_collection.aggregate.over_all(total_count=True)
    new_agg = new_collection.aggregate.over_all(total_count=True)
    old_count = old_agg.total_count
    new_count = new_agg.total_count
    print(f"\nVerification:")
    print(f" Old collection ({old_collection_name}): {old_count} objects")
    print(f" New collection ({new_collection_name}): {new_count} objects")
    if old_count == new_count:
        print(f" Status: SUCCESS - Counts match!")
        return True
    else:
        print(f" Status: WARNING - Counts don't match!")
        return False


def replace_old_collection(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str
):
    """Replace old collection with migrated one by recreating with original name"""
    import requests
    print(f"\nReplacing old collection with migrated data...")
    # Step 1: Delete old collection
    print(f" Step 1: Deleting old collection...")
    response = requests.delete(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{old_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if response.status_code != 200:
        print(f" Warning: Could not delete old collection: {response.text}")
    else:
        print(f" Deleted")
    # Step 2: Get schema from migrated collection
    print(f" Step 2: Getting schema from migrated collection...")
    schema_response = requests.get(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    schema = schema_response.json()
    schema["class"] = old_collection_name
    # Step 3: Create collection with original name and new schema
    print(f" Step 3: Creating collection with original name...")
    create_response = requests.post(
        f"{WEAVIATE_ENDPOINT}/v1/schema",
        json=schema,
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if create_response.status_code not in [200, 201]:
        raise Exception(f"Failed to create collection: {create_response.text}")
    print(f" Created")
    # Step 4: Copy data to collection with original name using cursor-based pagination
    print(f" Step 4: Copying data to original collection name...")
    migrated_collection = client.collections.get(new_collection_name)
    new_collection = client.collections.get(old_collection_name)
    total_copied = 0
    cursor = None
    while True:
        # Fetch batch of objects using cursor-based pagination
        if cursor is None:
            # First batch
            response = migrated_collection.query.fetch_objects(
                include_vector=True,
                limit=BATCH_SIZE
            )
        else:
            # Subsequent batches using cursor
            response = migrated_collection.query.fetch_objects(
                include_vector=True,
                limit=BATCH_SIZE,
                after=cursor
            )
        objects = response.objects
        if not objects:
            break
        # Use batch insert for efficiency
        with new_collection.batch.dynamic() as batch:
            for obj in objects:
                batch.add_object(
                    properties=obj.properties,
                    vector=obj.vector,
                    uuid=obj.uuid
                )
        total_copied += len(objects)
        print(f" Copied {total_copied} objects...")
        # Update cursor for next iteration
        if len(objects) < BATCH_SIZE:
            break
        else:
            cursor = objects[-1].uuid
    print(f" Total copied: {total_copied} objects")
    # Step 5: Delete the temporary migrated collection
    print(f" Step 5: Cleaning up temporary migrated collection...")
    response = requests.delete(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
    )
    if response.status_code == 200:
        print(f" Cleaned up")
    print(f"\n SUCCESS! {old_collection_name} now has the new schema with {total_copied} objects")
    return True


def migrate_all_collections():
    """Main migration function"""
    print("=" * 80)
    print("Weaviate Collection Migration Script")
    print("Migrating from Weaviate 1.19.0 schema to 1.27.0+ schema")
    print("=" * 80)
    print()
    client = weaviate.connect_to_local(
        host=WEAVIATE_HOST,
        port=WEAVIATE_PORT,
        grpc_port=WEAVIATE_GRPC_PORT,
        auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY)
    )
    try:
        # Step 1: Identify collections that need migration
        print("Step 1: Identifying collections that need migration...")
        collections_to_migrate = identify_old_collections(client)
        if not collections_to_migrate:
            print("\nNo collections need migration. All collections are up to date!")
            return
        print(f"\nFound {len(collections_to_migrate)} collections to migrate:")
        for col in collections_to_migrate:
            print(f" - {col}")
        # Confirm before proceeding
        print("\nThis script will:")
        print("1. Create new collections with updated schema")
        print("2. Copy all data using efficient batch operations")
        print("3. Verify the migration")
        print("4. Optionally rename collections to activate the new ones")
        print()
        # Step 2: Migrate each collection
        for collection_name in collections_to_migrate:
            print("\n" + "=" * 80)
            print(f"Migrating: {collection_name}")
            print("=" * 80)
            try:
                # Get old schema
                schema = get_collection_schema(client, collection_name)
                # Create new collection
                new_collection_name = create_new_collection(client, collection_name, schema)
                # Migrate data
                migrated_count = migrate_collection_data(client, collection_name, new_collection_name)
                # Verify migration
                success = verify_migration(client, collection_name, new_collection_name)
                if success:
                    print(f"\nMigration successful for {collection_name}!")
                    print(f"New collection: {new_collection_name}")
                    # Automatically replace old collection with migrated one
                    try:
                        replace_old_collection(client, collection_name, new_collection_name)
                    except Exception as e:
                        print(f"\nWarning: Could not automatically replace collection: {e}")
                        print(f"\nTo activate manually:")
                        print(f"1. Delete the old collection: {collection_name}")
                        print(f"2. Rename {new_collection_name} to {collection_name}")
            except Exception as e:
                print(f"\nError migrating {collection_name}: {e}")
                print(f"Skipping this collection and continuing...")
                continue
        print("\n" + "=" * 80)
        print("Migration Complete!")
        print("=" * 80)
        print("\nSummary:")
        print(f" Collections migrated: {len(collections_to_migrate)}")
    finally:
        client.close()


if __name__ == "__main__":
    try:
        migrate_all_collections()
    except KeyboardInterrupt:
        print("\n\nMigration interrupted by user.")
        sys.exit(1)
    except Exception as e:
        print(f"\n\nFatal error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

Weaviate 1.19 to 1.27+ Migration Guide for Dify

  • ⚠️ This guide is not officially supported by the Dify Team.
  • ⚠️ This is a community-edited, simplified version of the draft of the official migration guide presented by the Dify Team.
  • ⚠️ The original source for this guide and script can be found at #27291.

A complete guide to safely migrating Dify knowledge bases from Weaviate 1.19 to 1.27/1.33.


Outline

This guide covers the following two cases.
While Case A is recommended for a safer migration, this guide can also be applied to Case B:

  • Case A
    • You are currently running a version of Dify 1.9.1 or earlier with Weaviate 1.19 included.
    • All knowledge is functioning properly.
  • Case B
    • You have already upgraded to Weaviate 1.27+ and are running Dify 1.9.2 or later.
    • The knowledge created with the previous version is corrupted, and you have no backup to revert to the earlier version.

The procedure in this guide is as follows:

  1. Take a complete backup of your current Dify environment.
  2. If your Dify version is 1.9.1 or earlier, upgrade Dify.
  3. Operate the weaviate container and modify the directory structure of the LSM data.
  4. Operate the worker container and run the migration script.
  5. Perform cleanup.

Migration Procedure

Note:
This procedure cannot be rolled back by any means other than a restore. Attempting to roll back using anything other than a restore may make things worse.
We recommend that you follow the steps to take a full backup first, in preparation for a possible restore.


Step 1: Backup Your Environment

Stop your Dify services:

cd /path/to/dify/docker
docker compose down

Then make a full copy or archive of your entire docker directory (/path/to/dify/docker for example) as a safety measure.

If you encounter issues later, you can restore this backup to revert to the original state.
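
If you prefer to script the archive step, the following is a minimal sketch in Python using only the standard library. The function name `archive_directory` and the paths in the usage note are placeholders for illustration, not part of Dify. It assumes the services are already stopped, that the archive is written outside the directory being archived, and that the user running it can read every file under the docker directory (files in volumes are often owned by root).

```python
import tarfile
from pathlib import Path


def archive_directory(source_dir: str, archive_path: str) -> Path:
    """Write a gzip-compressed tar archive of source_dir to archive_path."""
    src = Path(source_dir).resolve()
    if not src.is_dir():
        raise NotADirectoryError(str(src))
    out = Path(archive_path).resolve()
    # Refuse to write the archive inside the directory being archived,
    # otherwise the archive would try to include itself.
    if src in out.parents:
        raise ValueError("archive_path must be outside source_dir")
    with tarfile.open(out, "w:gz") as tar:
        # arcname keeps only the top-level directory name inside the archive
        tar.add(src, arcname=src.name)
    return out
```

For example, `archive_directory("/path/to/dify/docker", "/path/to/backups/dify-docker.tar.gz")` would produce one archive that can be unpacked later to restore the original state.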


Step 2: Upgrade to Weaviate 1.27+ (Only for Case A)

This step is only for Case A - users currently on Dify 1.9.1 or earlier with Weaviate 1.19.
If you are already running Weaviate 1.27+ (Case B), you can skip this step.

Follow the upgrade guide to move to the latest (or a specific) Dify version that uses Weaviate 1.27+.


Step 3: Fix Orphaned LSM Data

If Dify is stopped, start it and wait until it has fully launched.

cd /path/to/dify/docker
docker compose up -d

Ensure that Weaviate is using image version 1.27.0 or higher.

cd /path/to/dify/docker
docker compose ps weaviate  # The "IMAGE" column should show "semitechnologies/weaviate:1.27.0" or higher

Enter the shell of your weaviate container:

cd /path/to/dify/docker
docker compose exec -it weaviate /bin/sh

Then run the following commands inside the container to fix LSM data:

cd /var/lib/weaviate
for dir in vector_index_*_node_*_lsm; do
  [ -d "$dir" ] || continue
  
  # Extract index ID and shard ID
  index_id=$(echo "$dir" | sed -n 's/vector_index_\([^_]*_[^_]*_[^_]*_[^_]*_[^_]*\)_node_.*/\1/p')
  shard_id=$(echo "$dir" | sed -n 's/.*_node_\([^_]*\)_lsm/\1/p')
  
  # Create target directory and copy
  mkdir -p "vector_index_${index_id}_node/$shard_id/lsm"
  cp -a "$dir/"* "vector_index_${index_id}_node/$shard_id/lsm/"
  
  echo "✓ Copied $dir"
done
exit

Then restart the weaviate container to ensure the changes are recognized:

cd /path/to/dify/docker
docker compose restart weaviate
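
To make the directory mapping performed by the shell loop in this step easier to follow, here is a small Python sketch of the same name transformation. It only computes the target path and does not touch the filesystem. The function name `target_lsm_path` and the example directory name are hypothetical. Unlike the `sed` expressions, this sketch anchors the pattern at both ends and returns `None` for names that do not match.

```python
import re
from typing import Optional

# Flat layout: vector_index_<five "_"-separated ID parts>_node_<shard>_lsm
_OLD_LSM_DIR = re.compile(
    r"^vector_index_((?:[^_]+_){4}[^_]+)_node_([^_]+)_lsm$"
)


def target_lsm_path(dir_name: str) -> Optional[str]:
    """Map a flat LSM directory name to the nested path used by the copy step."""
    match = _OLD_LSM_DIR.match(dir_name)
    if match is None:
        return None  # not a flat LSM directory; nothing to copy
    index_id, shard_id = match.groups()
    return f"vector_index_{index_id}_node/{shard_id}/lsm"


# Hypothetical directory name, for illustration only
example = "vector_index_0a1b2c3d_1111_2222_3333_444455556666_node_AbCdEfGh_lsm"
print(target_lsm_path(example))
```

The index ID is the five underscore-separated parts after `vector_index_`, and the shard ID is the single part between `_node_` and `_lsm`, which matches what the two `sed` expressions extract.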

Step 4: Migrate Schema

Place the migrate_weaviate_collections_ce.py script in your /path/to/dify/docker/volumes/app/storage/ directory, then enter the shell of your worker container:

cp /path/to/migrate_weaviate_collections_ce.py /path/to/dify/docker/volumes/app/storage/
cd /path/to/dify/docker
docker compose exec -it worker /bin/bash

Then run the following commands inside the container to execute the migration script:

uv run /app/api/storage/migrate_weaviate_collections_ce.py
exit
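
The script reads `WEAVIATE_ENDPOINT` and `WEAVIATE_GRPC_ENDPOINT` from the worker container's environment and derives the host and ports by splitting the strings on `:`. That works for values shaped like `http://weaviate:8080`, but it would fail on a value with a trailing slash or without an explicit port. If your environment uses such values, a more tolerant parse could look like the sketch below. The helper `parse_endpoint` and its `default_port` parameter are hypothetical and not part of the script.

```python
from typing import Tuple
from urllib.parse import urlsplit


def parse_endpoint(endpoint: str, default_port: int) -> Tuple[str, int]:
    """Return (host, port) from a URL such as "http://weaviate:8080"."""
    parts = urlsplit(endpoint)
    if not parts.hostname:
        # Raised, for example, when the scheme ("http://") is missing
        raise ValueError(f"Cannot find a host in endpoint: {endpoint!r}")
    # Fall back to default_port when the URL carries no explicit port
    return parts.hostname, parts.port or default_port


print(parse_endpoint("http://weaviate:8080/", 8080))
print(parse_endpoint("grpc://weaviate:50051", 50051))
```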

Restart Dify services:

docker compose down
docker compose up -d

Verify in Dify UI:

  1. Go to your Dify console
  2. Open your knowledge bases
  3. Try "Retrieval Testing"
  4. It should work without errors.
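
For readers who want to understand how the migration script walks through a large collection, the loop below is a self-contained sketch of the cursor-based pagination pattern it uses: fetch a page, process it, then pass the UUID of the last object as the cursor for the next fetch. The in-memory `make_fake_fetch` stands in for Weaviate here and is purely an assumption for illustration. The real script calls `fetch_objects(limit=..., after=...)` on a collection instead.

```python
from typing import Callable, Iterator, List, Optional

Page = List[dict]


def iterate_with_cursor(
    fetch_page: Callable[[int, Optional[str]], Page],
    page_size: int,
) -> Iterator[Page]:
    """Yield pages until a short or empty page signals the end."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(page_size, cursor)
        if not page:
            break
        yield page
        if len(page) < page_size:
            break  # last page reached
        cursor = page[-1]["uuid"]  # resume after the last object seen


def make_fake_fetch(objects: List[dict]) -> Callable[[int, Optional[str]], Page]:
    """Build an in-memory stand-in that returns objects ordered by UUID."""
    ordered = sorted(objects, key=lambda o: o["uuid"])

    def fetch_page(limit: int, after: Optional[str]) -> Page:
        start = 0
        if after is not None:
            start = next(i for i, o in enumerate(ordered) if o["uuid"] == after) + 1
        return ordered[start:start + limit]

    return fetch_page


objects = [{"uuid": f"{i:04d}"} for i in range(25)]
sizes = [len(p) for p in iterate_with_cursor(make_fake_fetch(objects), 10)]
print(sizes)  # [10, 10, 5]
```

Because each request resumes from the last UUID seen rather than from a numeric offset, the cost of a page does not grow with how far into the collection the loop already is.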

Step 5: Cleanup (Optional)

After successful migration, you can delete orphaned files to free up space.
Enter the shell of your weaviate container:

cd /path/to/dify/docker
docker compose exec -it weaviate /bin/sh

Then run the following commands inside the container to delete orphaned files:

cd /var/lib/weaviate
rm -rf vector_index_*_node_*
exit

Also, you can delete the migration script from your storage volume:

rm /path/to/dify/docker/volumes/app/storage/migrate_weaviate_collections_ce.py
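
Before running the `rm -rf` above, it can be reassuring to see which names the pattern `vector_index_*_node_*` selects. The sketch below approximates the shell glob with Python's `fnmatch` module. The function name `select_for_cleanup` and the directory names are hypothetical examples. The point it illustrates is that the pattern requires the literal `_node_`, so the nested directories created in Step 3, whose names end in `_node`, are not selected.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List

CLEANUP_PATTERN = "vector_index_*_node_*"


def select_for_cleanup(names: Iterable[str]) -> List[str]:
    """Return the names that the cleanup pattern would match."""
    return [name for name in names if fnmatchcase(name, CLEANUP_PATTERN)]


# Hypothetical directory listing, for illustration only
listing = [
    "vector_index_0a1b2c3d_1111_2222_3333_444455556666_node_AbCdEfGh_lsm",
    "vector_index_0a1b2c3d_1111_2222_3333_444455556666_node",
    "raft",
]
print(select_for_cleanup(listing))
```

Inside the container, running `ls -d vector_index_*_node_*` first gives the same kind of preview using the shell's own matching.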

Files Needed

Credits

  • Original migration approach: Dify team
  • LSM recovery method: Chinese Dify community user
  • Combined solution: Community effort
@tomy-kyu

tomy-kyu commented Dec 5, 2025

@kurokobo
Thank you for your hard work on these modifications.
This procedure contains more detailed steps than the original plan, so when actually implementing the work, I plan to refer to this guide as a reference. If this proves to be of any help to you, I would be truly grateful.

One question I have for confirmation: In Step 3, you're starting the DIFY-related containers updated to version 1.9.2. Is this done so that the Worker-side can access the updated Weaviate client when performing operations in Step 4?

I also wondered whether starting the API also triggers schema updates in PostgreSQL, so perhaps you launched all services at once to ensure synchronization of update timing. I'd appreciate it if you could confirm this for me.

@kurokobo
Author

kurokobo commented Dec 5, 2025

@tomy-kyu
Thank you for your positive comment.

One question I have for confirmation: In Step 3, you're starting the DIFY-related containers updated to version 1.9.2. Is this done so that the Worker-side can access the updated Weaviate client when performing operations in Step 4?

I also wondered whether starting the API also triggers schema updates in PostgreSQL, so perhaps you launched all services at once to ensure synchronization of update timing. I'd appreciate it if you could confirm this for me.

Your understanding is almost correct.

Technically, PostgreSQL migration is completely independent from Weaviate migration, and there is no dependency between them.
Therefore, when considering the migration steps for Weaviate, there is no need to take the PostgreSQL migration into account.

However, as you mentioned, Weaviate migration requires a new client, which is deployed in the Worker, API, and Beat containers. Of course, it's also possible to set up the Weaviate client on a user's terminal (as the official documentation suggests), but that's not necessarily a recommended approach for everyone.

Given this background, I thought that updating the API, Worker, Beat, and Weaviate all at once would be the most reasonable, simple, and least problematic migration process.

Hope this helps :)
