Source code for langchain.vectorstores.milvus

"""Wrapper around the Milvus vector database."""
from __future__ import annotations

import uuid
from typing import Any, Iterable, List, Optional, Tuple

import numpy as np

from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.utils import maximal_marginal_relevance


[docs]class Milvus(VectorStore): """Wrapper around the Milvus vector database.""" def __init__( self, embedding_function: Embeddings, connection_args: dict, collection_name: str, text_field: str, ): """Initialize wrapper around the milvus vector database. In order to use this you need to have `pymilvus` installed and a running Milvus instance. See the following documentation for how to run a Milvus instance: https://milvus.io/docs/install_standalone-docker.md Args: embedding_function (Embeddings): Function used to embed the text connection_args (dict): Arguments for pymilvus connections.connect() collection_name (str): The name of the collection to search. text_field (str): The field in Milvus schema where the original text is stored. """ try: from pymilvus import Collection, DataType, connections except ImportError: raise ValueError( "Could not import pymilvus python package. " "Please install it with `pip install pymilvus`." ) # Connecting to Milvus instance if not connections.has_connection("default"): connections.connect(**connection_args) self.embedding_func = embedding_function self.collection_name = collection_name self.text_field = text_field self.auto_id = False self.primary_field = None self.vector_field = None self.fields = [] self.col = Collection(self.collection_name) schema = self.col.schema # Grabbing the fields for the existing collection. for x in schema.fields: self.fields.append(x.name) if x.auto_id: self.fields.remove(x.name) if x.is_primary: self.primary_field = x.name if x.dtype == DataType.FLOAT_VECTOR or x.dtype == DataType.BINARY_VECTOR: self.vector_field = x.name # Default search params when one is not provided. self.index_params = { "IVF_FLAT": {"params": {"nprobe": 10}}, "IVF_SQ8": {"params": {"nprobe": 10}}, "IVF_PQ": {"params": {"nprobe": 10}}, "HNSW": {"params": {"ef": 10}}, "RHNSW_FLAT": {"params": {"ef": 10}}, "RHNSW_SQ": {"params": {"ef": 10}}, "RHNSW_PQ": {"params": {"ef": 10}}, "IVF_HNSW": {"params": {"nprobe": 10, "ef": 10}}, "ANNOY": {"params": {"search_k": 10}}, }
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, partition_name: Optional[str] = None, timeout: Optional[int] = None, **kwargs: Any, ) -> List[str]: """Insert text data into Milvus. When using add_texts() it is assumed that a collecton has already been made and indexed. If metadata is included, it is assumed that it is ordered correctly to match the schema provided to the Collection and that the embedding vector is the first schema field. Args: texts (Iterable[str]): The text being embedded and inserted. metadatas (Optional[List[dict]], optional): The metadata that corresponds to each insert. Defaults to None. partition_name (str, optional): The partition of the collection to insert data into. Defaults to None. timeout: specified timeout. Returns: List[str]: The resulting keys for each inserted element. """ insert_dict: Any = {self.text_field: list(texts)} try: insert_dict[self.vector_field] = self.embedding_func.embed_documents( list(texts) ) except NotImplementedError: insert_dict[self.vector_field] = [ self.embedding_func.embed_query(x) for x in texts ] # Collect the metadata into the insert dict. if len(self.fields) > 2 and metadatas is not None: for d in metadatas: for key, value in d.items(): if key in self.fields: insert_dict.setdefault(key, []).append(value) # Convert dict to list of lists for insertion insert_list = [insert_dict[x] for x in self.fields] # Insert into the collection. res = self.col.insert( insert_list, partition_name=partition_name, timeout=timeout ) # Flush to make sure newly inserted is immediately searchable. self.col.flush() return res.primary_keys
def _worker_search( self, query: str, k: int = 4, param: Optional[dict] = None, expr: Optional[str] = None, partition_names: Optional[List[str]] = None, round_decimal: int = -1, timeout: Optional[int] = None, **kwargs: Any, ) -> Tuple[List[float], List[Tuple[Document, Any, Any]]]: # Load the collection into memory for searching. self.col.load() # Decide to use default params if not passed in. if param is None: index_type = self.col.indexes[0].params["index_type"] param = self.index_params[index_type] # Embed the query text. data = [self.embedding_func.embed_query(query)] # Determine result metadata fields. output_fields = self.fields[:] output_fields.remove(self.vector_field) # Perform the search. res = self.col.search( data, self.vector_field, param, k, expr=expr, output_fields=output_fields, partition_names=partition_names, round_decimal=round_decimal, timeout=timeout, **kwargs, ) # Organize results. ret = [] for result in res[0]: meta = {x: result.entity.get(x) for x in output_fields} ret.append( ( Document(page_content=meta.pop(self.text_field), metadata=meta), result.distance, result.id, ) ) return data[0], ret
[docs] def similarity_search_with_score( self, query: str, k: int = 4, param: Optional[dict] = None, expr: Optional[str] = None, partition_names: Optional[List[str]] = None, round_decimal: int = -1, timeout: Optional[int] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """Perform a search on a query string and return results. Args: query (str): The text being searched. k (int, optional): The amount of results ot return. Defaults to 4. param (dict, optional): The search params for the specified index. Defaults to None. expr (str, optional): Filtering expression. Defaults to None. partition_names (List[str], optional): Partitions to search through. Defaults to None. round_decimal (int, optional): Round the resulting distance. Defaults to -1. timeout (int, optional): Amount to wait before timeout error. Defaults to None. kwargs: Collection.search() keyword arguments. Returns: List[float], List[Tuple[Document, any, any]]: search_embedding, (Document, distance, primary_field) results. """ _, result = self._worker_search( query, k, param, expr, partition_names, round_decimal, timeout, **kwargs ) return [(x, y) for x, y, _ in result]
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> Milvus: """Create a Milvus collection, indexes it with HNSW, and insert data. Args: texts (List[str]): Text to insert. embedding (Embeddings): Embedding function to use. metadatas (Optional[List[dict]], optional): Dict metatadata. Defaults to None. Returns: VectorStore: The Milvus vector store. """ try: from pymilvus import ( Collection, CollectionSchema, DataType, FieldSchema, connections, ) from pymilvus.orm.types import infer_dtype_bydata except ImportError: raise ValueError( "Could not import pymilvus python package. " "Please install it with `pip install pymilvus`." ) # Connect to Milvus instance if not connections.has_connection("default"): connections.connect(**kwargs.get("connection_args", {"port": 19530})) # Determine embedding dim embeddings = embedding.embed_query(texts[0]) dim = len(embeddings) # Generate unique names primary_field = "c" + str(uuid.uuid4().hex) vector_field = "c" + str(uuid.uuid4().hex) text_field = "c" + str(uuid.uuid4().hex) collection_name = "c" + str(uuid.uuid4().hex) fields = [] # Determine metadata schema if metadatas: # Check if all metadata keys line up key = metadatas[0].keys() for x in metadatas: if key != x.keys(): raise ValueError( "Mismatched metadata. " "Make sure all metadata has the same keys and datatype." ) # Create FieldSchema for each entry in singular metadata. for key, value in metadatas[0].items(): # Infer the corresponding datatype of the metadata dtype = infer_dtype_bydata(value) if dtype == DataType.UNKNOWN: raise ValueError(f"Unrecognized datatype for {key}.") elif dtype == DataType.VARCHAR: # Find out max length text based metadata max_length = 0 for subvalues in metadatas: max_length = max(max_length, len(subvalues[key])) fields.append( FieldSchema(key, DataType.VARCHAR, max_length=max_length + 1) ) else: fields.append(FieldSchema(key, dtype)) # Find out max length of texts max_length = 0 for y in texts: max_length = max(max_length, len(y)) # Create the text field fields.append( FieldSchema(text_field, DataType.VARCHAR, max_length=max_length + 1) ) # Create the primary key field fields.append( FieldSchema(primary_field, DataType.INT64, is_primary=True, auto_id=True) ) # Create the vector field fields.append(FieldSchema(vector_field, DataType.FLOAT_VECTOR, dim=dim)) # Create the schema for the collection schema = CollectionSchema(fields) # Create the collection collection = Collection(collection_name, schema) # Index parameters for the collection index = { "index_type": "HNSW", "metric_type": "L2", "params": {"M": 8, "efConstruction": 64}, } # Create the index collection.create_index(vector_field, index) # Create the VectorStore milvus = cls( embedding, kwargs.get("connection_args", {"port": 19530}), collection_name, text_field, ) # Add the texts. milvus.add_texts(texts, metadatas) return milvus