API Reference

RAGTrainer

Main trainer to fine-tune/train ColBERT models with a few lines.

Source code in ragatouille/RAGTrainer.py
class RAGTrainer:
    """Main trainer to fine-tune/train ColBERT models with a few lines."""

    model: Union[LateInteractionModel, None] = None
    negative_miner: Union[HardNegativeMiner, None] = None
    collection: list[str] = []
    queries: Union[list[str], None] = None
    raw_data: Union[list[tuple], list[list], None] = None
    training_triplets: list[list[int]] = list()

    def __init__(
        self,
        model_name: str,
        pretrained_model_name: str,
        language_code: str = "en",
        n_usable_gpus: int = -1,
    ):
        """
        Initialise a RAGTrainer instance. This will load a base model: either an existing ColBERT model to fine-tune or a BERT/RoBERTa-like model to build a new ColBERT model from.

        Parameters:
            model_name: str - Name of the model to train. This will be used to name the checkpoints and the index.
            pretrained_model_name: str - Name of the pretrained model to use as a base. Can be a local path to a checkpoint or a huggingface model name.
            language_code: str - Language code of the model to train. This will be used to name the checkpoints and the index.
            n_usable_gpus: int - Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.

        Returns:
            self (RAGTrainer): The current instance of RAGTrainer, with the base model initialised.
        """

        self.model_name = model_name
        self.pretrained_model_name = pretrained_model_name
        self.language_code = language_code
        self.model = ColBERT(
            pretrained_model_name_or_path=pretrained_model_name,
            n_gpu=n_usable_gpus,
            training_mode=True,
        )

    def add_documents(self, documents: list[str]):
        self.collection += documents
        seeded_shuffle(self.collection)

    def export_training_data(self, path: Union[str, Path]):
        """
        Manually export the training data processed by prepare_training_data to a given path.

        Parameters:
            path: Union[str, Path] - Path to the directory where the data will be exported."""
        self.data_processor.export_training_data(path)

    def prepare_training_data(
        self,
        raw_data: Union[list[tuple], list[list]],
        all_documents: Optional[list[str]] = None,
        data_out_path: Union[str, Path] = "./data/",
        num_new_negatives: int = 10,
        hard_negative_minimum_rank: int = 10,
        mine_hard_negatives: bool = True,
        hard_negative_model_size: str = "small",
        pairs_with_labels: bool = False,
        positive_label: Union[int, str] = 1,
        negative_label: Union[int, str] = 0,
    ) -> str:
        """
        Fully pre-process input-data in various raw formats into ColBERT-ready files and triplets.
        Will accept a variety of formats, such as unannotated pairs, annotated pairs, triplets of strings and triplets of list of strings.
        Will process into a ColBERT-ready format and export to data_out_path.
        Will generate hard negatives if mine_hard_negatives is True.
        num_new_negatives decides how many negatives will be generated. If mine_hard_negatives is False and num_new_negatives is > 0, these negatives will be randomly sampled.

        Parameters:
            raw_data: Union[list[tuple], list[list]] - List of pairs, annotated pairs, or triplets of strings.
            all_documents: Optional[list[str]] - A corpus of documents to be used for sampling negatives.
            data_out_path: Union[str, Path] - Path to the directory where the data will be exported (can be a tmp directory).
            num_new_negatives: int - Number of new negatives to generate for each query.
            mine_hard_negatives: bool - Whether to use hard negatives mining or not.
            hard_negative_model_size: str - Size of the model to use for hard negatives mining.
            pairs_with_labels: bool - Whether the raw_data is a list of pairs with labels or not.
            positive_label: Union[int, str] - Label to use for positive pairs.
            negative_label: Union[int, str] - Label to use for negative pairs.

        Returns:
            data_out_path: Union[str, Path] - Path to the directory where the data has been exported.
        """
        if all_documents is not None:
            self.collection += all_documents

        self.data_dir = Path(data_out_path)
        if len(raw_data[0]) == 2:
            data_type = "pairs"
            if pairs_with_labels:
                data_type = "labeled_pairs"
        elif len(raw_data[0]) == 3:
            data_type = "triplets"
        else:
            raise ValueError("Raw data must be a list of pairs or triplets of strings.")

        if type(raw_data[0][1]) == str:
            self.collection += [x[1] for x in raw_data]
        else:
            for x in raw_data:
                for txt in x[1]:
                    self.collection.append(txt)
        if data_type == "triplets":
            if type(raw_data[0][2]) == str:
                self.collection += [x[2] for x in raw_data]
            else:
                for x in raw_data:
                    for txt in x[2]:
                        self.collection.append(txt)

        self.queries = set([x[0] for x in raw_data])
        self.collection = list(set(self.collection))
        seeded_shuffle(self.collection)

        if mine_hard_negatives:
            self.negative_miner = SimpleMiner(
                language_code=self.language_code,
                model_size=hard_negative_model_size,
            )
            self.negative_miner.build_index(self.collection)

        self.data_processor = TrainingDataProcessor(
            collection=self.collection,
            queries=self.queries,
            negative_miner=self.negative_miner if mine_hard_negatives else None,
        )

        self.data_processor.process_raw_data(
            data_type=data_type,
            raw_data=raw_data,
            export=True,
            data_dir=data_out_path,
            num_new_negatives=num_new_negatives,
            positive_label=positive_label,
            negative_label=negative_label,
            mine_hard_negatives=mine_hard_negatives,
            hard_negative_minimum_rank=hard_negative_minimum_rank,
        )

        self.training_triplets = self.data_processor.training_triplets

        return data_out_path

    def train(
        self,
        batch_size: int = 32,
        nbits: int = 2,
        maxsteps: int = 500_000,
        use_ib_negatives: bool = True,
        learning_rate: float = 5e-6,
        dim: int = 128,
        doc_maxlen: int = 256,
        use_relu: bool = False,
        warmup_steps: Union[int, Literal["auto"]] = "auto",
        accumsteps: int = 1,
    ) -> str:
        """
        Launch training or fine-tuning of a ColBERT model.

        Parameters:
            batch_size: int - Total batch size -- divide by n_usable_gpus for the per-GPU batch size.
            nbits: int - Number of bits used for vector compression by the trained model. 2 is usually ideal.
            maxsteps: int - End training early after maxsteps steps.
            use_ib_negatives: bool - Whether to use in-batch negatives to calculate loss or not.
            learning_rate: float - ColBERT literature usually has this performing best between 3e-6 and 2e-5, depending on data size.
            dim: int - Size of individual vector representations.
            doc_maxlen: int - The maximum length after which passages will be truncated.
            warmup_steps: Union[int, Literal["auto"]] - How many warmup steps to use for the learning rate.
                                                      "auto" will default to 10% of total steps.
            accumsteps: int - How many gradient accumulation steps to use to simulate higher batch sizes.

        Returns:
            model_path: str - Path to the trained model.
        """
        if not self.training_triplets:
            total_triplets = sum(
                1 for _ in open(str(self.data_dir / "triples.train.colbert.jsonl"), "r")
            )
        else:
            total_triplets = len(self.training_triplets)

        training_config = ColBERTConfig(
            bsize=batch_size,
            model_name=self.model_name,
            name=self.model_name,
            checkpoint=self.pretrained_model_name,
            use_ib_negatives=use_ib_negatives,
            maxsteps=maxsteps,
            nbits=nbits,
            lr=learning_rate,
            dim=dim,
            doc_maxlen=doc_maxlen,
            relu=use_relu,
            accumsteps=accumsteps,
            warmup=int(total_triplets // batch_size * 0.1)
            if warmup_steps == "auto"
            else warmup_steps,
            save_every=int(total_triplets // batch_size // 10),
        )

        return self.model.train(data_dir=self.data_dir, training_config=training_config)

__init__(model_name, pretrained_model_name, language_code='en', n_usable_gpus=-1)

Initialise a RAGTrainer instance. This will load a base model: either an existing ColBERT model to fine-tune or a BERT/RoBERTa-like model to build a new ColBERT model from.

Parameters:

model_name (str, required): Name of the model to train. This will be used to name the checkpoints and the index.
pretrained_model_name (str, required): Name of the pretrained model to use as a base. Can be a local path to a checkpoint or a huggingface model name.
language_code (str, default 'en'): Language code of the model to train. This will be used to name the checkpoints and the index.
n_usable_gpus (int, default -1): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.

Returns:

self (RAGTrainer): The current instance of RAGTrainer, with the base model initialised.

Source code in ragatouille/RAGTrainer.py
def __init__(
    self,
    model_name: str,
    pretrained_model_name: str,
    language_code: str = "en",
    n_usable_gpus: int = -1,
):
    """
    Initialise a RAGTrainer instance. This will load a base model: either an existing ColBERT model to fine-tune or a BERT/RoBERTa-like model to build a new ColBERT model from.

    Parameters:
        model_name: str - Name of the model to train. This will be used to name the checkpoints and the index.
        pretrained_model_name: str - Name of the pretrained model to use as a base. Can be a local path to a checkpoint or a huggingface model name.
        language_code: str - Language code of the model to train. This will be used to name the checkpoints and the index.
        n_usable_gpus: int - Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.

    Returns:
        self (RAGTrainer): The current instance of RAGTrainer, with the base model initialised.
    """

    self.model_name = model_name
    self.pretrained_model_name = pretrained_model_name
    self.language_code = language_code
    self.model = ColBERT(
        pretrained_model_name_or_path=pretrained_model_name,
        n_gpu=n_usable_gpus,
        training_mode=True,
    )
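
Example usage (a minimal sketch; the model name and checkpoint below are illustrative placeholders):

```python
from ragatouille import RAGTrainer

# Fine-tune from an existing ColBERT checkpoint. Passing a BERT/RoBERTa-like
# model name instead would build a brand-new ColBERT model from it.
trainer = RAGTrainer(
    model_name="MyFineTunedColBERT",             # used to name checkpoints and the index
    pretrained_model_name="colbert-ir/colbertv2.0",
    language_code="en",
    n_usable_gpus=-1,                            # all available GPUs, or CPU if none
)
```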

export_training_data(path)

Manually export the training data processed by prepare_training_data to a given path.

Parameters:

path (Union[str, Path], required): Path to the directory where the data will be exported.
Source code in ragatouille/RAGTrainer.py
def export_training_data(self, path: Union[str, Path]):
    """
    Manually export the training data processed by prepare_training_data to a given path.

    Parameters:
        path: Union[str, Path] - Path to the directory where the data will be exported."""
    self.data_processor.export_training_data(path)
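
For example, once prepare_training_data has run, the processed triplets can be re-exported to another location (the path below is a placeholder):

```python
# Re-export the already-processed training data to a different directory.
trainer.export_training_data("./exported_training_data/")
```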

prepare_training_data(raw_data, all_documents=None, data_out_path='./data/', num_new_negatives=10, hard_negative_minimum_rank=10, mine_hard_negatives=True, hard_negative_model_size='small', pairs_with_labels=False, positive_label=1, negative_label=0)

Fully pre-process input data in various raw formats into ColBERT-ready files and triplets. Accepts a variety of formats, such as unannotated pairs, annotated pairs, triplets of strings, and triplets of lists of strings. The data is processed into a ColBERT-ready format and exported to data_out_path. Hard negatives are generated if mine_hard_negatives is True; num_new_negatives decides how many negatives will be generated. If mine_hard_negatives is False and num_new_negatives is > 0, these negatives are randomly sampled.

Parameters:

raw_data (Union[list[tuple], list[list]], required): List of pairs, annotated pairs, or triplets of strings.
all_documents (Optional[list[str]], default None): A corpus of documents to be used for sampling negatives.
data_out_path (Union[str, Path], default './data/'): Path to the directory where the data will be exported (can be a tmp directory).
num_new_negatives (int, default 10): Number of new negatives to generate for each query.
mine_hard_negatives (bool, default True): Whether to use hard negatives mining or not.
hard_negative_model_size (str, default 'small'): Size of the model to use for hard negatives mining.
pairs_with_labels (bool, default False): Whether the raw_data is a list of pairs with labels or not.
positive_label (Union[int, str], default 1): Label to use for positive pairs.
negative_label (Union[int, str], default 0): Label to use for negative pairs.

Returns:

data_out_path (str): Path to the directory where the data has been exported.

Source code in ragatouille/RAGTrainer.py
def prepare_training_data(
    self,
    raw_data: Union[list[tuple], list[list]],
    all_documents: Optional[list[str]] = None,
    data_out_path: Union[str, Path] = "./data/",
    num_new_negatives: int = 10,
    hard_negative_minimum_rank: int = 10,
    mine_hard_negatives: bool = True,
    hard_negative_model_size: str = "small",
    pairs_with_labels: bool = False,
    positive_label: Union[int, str] = 1,
    negative_label: Union[int, str] = 0,
) -> str:
    """
    Fully pre-process input-data in various raw formats into ColBERT-ready files and triplets.
    Will accept a variety of formats, such as unannotated pairs, annotated pairs, triplets of strings and triplets of list of strings.
    Will process into a ColBERT-ready format and export to data_out_path.
    Will generate hard negatives if mine_hard_negatives is True.
    num_new_negatives decides how many negatives will be generated. If mine_hard_negatives is False and num_new_negatives is > 0, these negatives will be randomly sampled.

    Parameters:
        raw_data: Union[list[tuple], list[list]] - List of pairs, annotated pairs, or triplets of strings.
        all_documents: Optional[list[str]] - A corpus of documents to be used for sampling negatives.
        data_out_path: Union[str, Path] - Path to the directory where the data will be exported (can be a tmp directory).
        num_new_negatives: int - Number of new negatives to generate for each query.
        mine_hard_negatives: bool - Whether to use hard negatives mining or not.
        hard_negative_model_size: str - Size of the model to use for hard negatives mining.
        pairs_with_labels: bool - Whether the raw_data is a list of pairs with labels or not.
        positive_label: Union[int, str] - Label to use for positive pairs.
        negative_label: Union[int, str] - Label to use for negative pairs.

    Returns:
        data_out_path: Union[str, Path] - Path to the directory where the data has been exported.
    """
    if all_documents is not None:
        self.collection += all_documents

    self.data_dir = Path(data_out_path)
    if len(raw_data[0]) == 2:
        data_type = "pairs"
        if pairs_with_labels:
            data_type = "labeled_pairs"
    elif len(raw_data[0]) == 3:
        data_type = "triplets"
    else:
        raise ValueError("Raw data must be a list of pairs or triplets of strings.")

    if type(raw_data[0][1]) == str:
        self.collection += [x[1] for x in raw_data]
    else:
        for x in raw_data:
            for txt in x[1]:
                self.collection.append(txt)
    if data_type == "triplets":
        if type(raw_data[0][2]) == str:
            self.collection += [x[2] for x in raw_data]
        else:
            for x in raw_data:
                for txt in x[2]:
                    self.collection.append(txt)

    self.queries = set([x[0] for x in raw_data])
    self.collection = list(set(self.collection))
    seeded_shuffle(self.collection)

    if mine_hard_negatives:
        self.negative_miner = SimpleMiner(
            language_code=self.language_code,
            model_size=hard_negative_model_size,
        )
        self.negative_miner.build_index(self.collection)

    self.data_processor = TrainingDataProcessor(
        collection=self.collection,
        queries=self.queries,
        negative_miner=self.negative_miner if mine_hard_negatives else None,
    )

    self.data_processor.process_raw_data(
        data_type=data_type,
        raw_data=raw_data,
        export=True,
        data_dir=data_out_path,
        num_new_negatives=num_new_negatives,
        positive_label=positive_label,
        negative_label=negative_label,
        mine_hard_negatives=mine_hard_negatives,
        hard_negative_minimum_rank=hard_negative_minimum_rank,
    )

    self.training_triplets = self.data_processor.training_triplets

    return data_out_path
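
A minimal sketch using unannotated (query, relevant passage) pairs with hard-negative mining enabled; the queries, passages, and extra corpus below are placeholders, and the trainer instance comes from the sketch above:

```python
raw_data = [
    ("What is the capital of France?", "Paris is the capital and largest city of France."),
    ("Who wrote Hamlet?", "Hamlet is a tragedy written by William Shakespeare."),
]
extra_documents = [
    "Berlin is the capital of Germany.",
    "Macbeth is another tragedy by Shakespeare.",
]

# Builds the collection, mines hard negatives, and exports ColBERT-ready files.
data_path = trainer.prepare_training_data(
    raw_data=raw_data,
    all_documents=extra_documents,      # optional corpus to mine/sample negatives from
    data_out_path="./data/",
    num_new_negatives=10,
    mine_hard_negatives=True,
    hard_negative_model_size="small",
)
```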

train(batch_size=32, nbits=2, maxsteps=500000, use_ib_negatives=True, learning_rate=5e-06, dim=128, doc_maxlen=256, use_relu=False, warmup_steps='auto', accumsteps=1)

Launch training or fine-tuning of a ColBERT model.

Parameters:

batch_size (int, default 32): Total batch size -- divide by n_usable_gpus for the per-GPU batch size.
nbits (int, default 2): Number of bits used for vector compression by the trained model. 2 is usually ideal.
maxsteps (int, default 500000): End training early after maxsteps steps.
use_ib_negatives (bool, default True): Whether to use in-batch negatives to calculate loss or not.
learning_rate (float, default 5e-06): ColBERT literature usually has this performing best between 3e-6 and 2e-5, depending on data size.
dim (int, default 128): Size of individual vector representations.
doc_maxlen (int, default 256): The maximum length after which passages will be truncated.
warmup_steps (Union[int, Literal['auto']], default 'auto'): How many warmup steps to use for the learning rate. "auto" will default to 10% of total steps.
accumsteps (int, default 1): How many gradient accumulation steps to use to simulate higher batch sizes.

Returns:

model_path (str): Path to the trained model.

Source code in ragatouille/RAGTrainer.py
def train(
    self,
    batch_size: int = 32,
    nbits: int = 2,
    maxsteps: int = 500_000,
    use_ib_negatives: bool = True,
    learning_rate: float = 5e-6,
    dim: int = 128,
    doc_maxlen: int = 256,
    use_relu: bool = False,
    warmup_steps: Union[int, Literal["auto"]] = "auto",
    accumsteps: int = 1,
) -> str:
    """
    Launch training or fine-tuning of a ColBERT model.

    Parameters:
        batch_size: int - Total batch size -- divide by n_usable_gpus for the per-GPU batch size.
        nbits: int - Number of bits used for vector compression by the trained model. 2 is usually ideal.
        maxsteps: int - End training early after maxsteps steps.
        use_ib_negatives: bool - Whether to use in-batch negatives to calculate loss or not.
        learning_rate: float - ColBERT literature usually has this performing best between 3e-6 and 2e-5, depending on data size.
        dim: int - Size of individual vector representations.
        doc_maxlen: int - The maximum length after which passages will be truncated.
        warmup_steps: Union[int, Literal["auto"]] - How many warmup steps to use for the learning rate.
                                                  "auto" will default to 10% of total steps.
        accumsteps: int - How many gradient accumulation steps to use to simulate higher batch sizes.

    Returns:
        model_path: str - Path to the trained model.
    """
    if not self.training_triplets:
        total_triplets = sum(
            1 for _ in open(str(self.data_dir / "triples.train.colbert.jsonl"), "r")
        )
    else:
        total_triplets = len(self.training_triplets)

    training_config = ColBERTConfig(
        bsize=batch_size,
        model_name=self.model_name,
        name=self.model_name,
        checkpoint=self.pretrained_model_name,
        use_ib_negatives=use_ib_negatives,
        maxsteps=maxsteps,
        nbits=nbits,
        lr=learning_rate,
        dim=dim,
        doc_maxlen=doc_maxlen,
        relu=use_relu,
        accumsteps=accumsteps,
        warmup=int(total_triplets // batch_size * 0.1)
        if warmup_steps == "auto"
        else warmup_steps,
        save_every=int(total_triplets // batch_size // 10),
    )

    return self.model.train(data_dir=self.data_dir, training_config=training_config)
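
Continuing the sketch above, training can then be launched on the prepared triplets (the values shown are simply the defaults, not tuned recommendations):

```python
model_path = trainer.train(
    batch_size=32,
    nbits=2,                  # 2-bit residual compression is usually a good trade-off
    maxsteps=500_000,
    use_ib_negatives=True,    # also score in-batch negatives in the loss
    learning_rate=5e-6,
    dim=128,
    doc_maxlen=256,
    warmup_steps="auto",      # "auto" = roughly 10% of total steps
)
print(f"Trained checkpoint saved at: {model_path}")
```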

RAGPretrainedModel

Wrapper class for a pretrained RAG late-interaction model, and all the associated utilities. Allows you to load a pretrained model from disk or from the hub, build or query an index.

Usage

Load a pre-trained checkpoint:

from ragatouille import RAGPretrainedModel

RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")

Load checkpoint from an existing index:

from ragatouille import RAGPretrainedModel

RAG = RAGPretrainedModel.from_index("path/to/my/index")

Both methods will load a fully initialised instance of ColBERT, which you can use to build and query indexes.

RAG.search("How many people live in France?")
Source code in ragatouille/RAGPretrainedModel.py
class RAGPretrainedModel:
    """
    Wrapper class for a pretrained RAG late-interaction model, and all the associated utilities.
    Allows you to load a pretrained model from disk or from the hub, build or query an index.

    ## Usage

    Load a pre-trained checkpoint:

    ```python
    from ragatouille import RAGPretrainedModel

    RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
    ```

    Load checkpoint from an existing index:

    ```python
    from ragatouille import RAGPretrainedModel

    RAG = RAGPretrainedModel.from_index("path/to/my/index")
    ```

    Both methods will load a fully initialised instance of ColBERT, which you can use to build and query indexes.

    ```python
    RAG.search("How many people live in France?")
    ```
    """

    model_name: Union[str, None] = None
    model: Union[LateInteractionModel, None] = None
    corpus_processor: Optional[CorpusProcessor] = None

    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path: Union[str, Path],
        n_gpu: int = -1,
        verbose: int = 1,
        index_root: Optional[str] = None,
    ):
        """Load a ColBERT model from a pre-trained checkpoint.

        Parameters:
            pretrained_model_name_or_path (str): Local path or huggingface model name.
            n_gpu (int): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.
            verbose (int): The level of ColBERT verbosity requested. By default, 1, which will filter out most internal logs.
            index_root (Optional[str]): The root directory where indexes will be stored. If None, will use the default directory, '.ragatouille/'.

        Returns:
            cls (RAGPretrainedModel): The current instance of RAGPretrainedModel, with the model initialised.
        """
        instance = cls()
        instance.model = ColBERT(
            pretrained_model_name_or_path, n_gpu, index_root=index_root, verbose=verbose
        )
        return instance

    @classmethod
    def from_index(
        cls, index_path: Union[str, Path], n_gpu: int = -1, verbose: int = 1
    ):
        """Load an Index and the associated ColBERT encoder from an existing document index.

        Parameters:
            index_path (Union[str, Path]): Path to the index.
            n_gpu (int): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.
            verbose (int): The level of ColBERT verbosity requested. By default, 1, which will filter out most internal logs.

        Returns:
            cls (RAGPretrainedModel): The current instance of RAGPretrainedModel, with the model and index initialised.
        """
        instance = cls()
        index_path = Path(index_path)
        instance.model = ColBERT(
            index_path, n_gpu, verbose=verbose, load_from_index=True
        )

        return instance

    def _process_metadata(
        self,
        document_ids: Optional[Union[TypeVar("T"), List[TypeVar("T")]]],
        document_metadatas: Optional[list[dict[Any, Any]]],
        collection_len: int,
    ) -> tuple[list[str], Optional[dict[Any, Any]]]:
        if document_ids is None:
            document_ids = [str(uuid4()) for i in range(collection_len)]
        else:
            if len(document_ids) != collection_len:
                raise ValueError("document_ids must be the same length as collection")
            if len(document_ids) != len(set(document_ids)):
                raise ValueError("document_ids must be unique")
            if any(not id.strip() for id in document_ids):
                raise ValueError("document_ids must not contain empty strings")
            if not all(isinstance(id, type(document_ids[0])) for id in document_ids):
                raise ValueError("All document_ids must be of the same type")

        if document_metadatas is not None:
            if len(document_metadatas) != collection_len:
                raise ValueError(
                    "document_metadatas must be the same length as collection"
                )
            docid_metadata_map = {
                x: y for x, y in zip(document_ids, document_metadatas)
            }
        else:
            docid_metadata_map = None

        return document_ids, docid_metadata_map

    def index(
        self,
        collection: list[str],
        document_ids: Union[TypeVar("T"), List[TypeVar("T")]] = None,
        document_metadatas: Optional[list[dict]] = None,
        index_name: str = None,
        overwrite_index: Union[bool, str] = True,
        max_document_length: int = 256,
        split_documents: bool = True,
        document_splitter_fn: Optional[Callable] = llama_index_sentence_splitter,
        preprocessing_fn: Optional[Union[Callable, list[Callable]]] = None,
    ):
        """Build an index from a list of documents.

        Parameters:
            collection (list[str]): The collection of documents to index.
            document_ids (Optional[list[str]]): An optional list of document ids. Ids will be generated at index time if not supplied.
            index_name (str): The name of the index that will be built.
            overwrite_index (Union[bool, str]): Whether to overwrite an existing index with the same name.
            max_document_length (int): The maximum length of a document. Documents longer than this will be split into chunks.
            split_documents (bool): Whether to split documents into chunks.
            document_splitter_fn (Optional[Callable]): A function to split documents into chunks. If None and by default, will use the llama_index_sentence_splitter.
            preprocessing_fn (Optional[Union[Callable, list[Callable]]]): A function or list of functions to preprocess documents. If None and by default, will not preprocess documents.

        Returns:
            index (str): The path to the index that was built.
        """

        document_ids, docid_metadata_map = self._process_metadata(
            document_ids=document_ids,
            document_metadatas=document_metadatas,
            collection_len=len(collection),
        )

        if split_documents or preprocessing_fn is not None:
            self.corpus_processor = CorpusProcessor(
                document_splitter_fn=document_splitter_fn if split_documents else None,
                preprocessing_fn=preprocessing_fn,
            )
            collection_with_ids = self.corpus_processor.process_corpus(
                collection,
                document_ids,
                chunk_size=max_document_length,
            )
        else:
            collection_with_ids = [
                {"document_id": x, "content": y}
                for x, y in zip(document_ids, collection)
            ]

        pid_docid_map = {
            index: item["document_id"] for index, item in enumerate(collection_with_ids)
        }
        collection = [x["content"] for x in collection_with_ids]
        return self.model.index(
            collection,
            pid_docid_map=pid_docid_map,
            docid_metadata_map=docid_metadata_map,
            index_name=index_name,
            max_document_length=max_document_length,
            overwrite=overwrite_index,
        )

    def add_to_index(
        self,
        new_collection: list[str],
        new_document_ids: Optional[Union[TypeVar("T"), List[TypeVar("T")]]] = None,
        new_document_metadatas: Optional[list[dict]] = None,
        index_name: Optional[str] = None,
        split_documents: bool = True,
        document_splitter_fn: Optional[Callable] = llama_index_sentence_splitter,
        preprocessing_fn: Optional[Union[Callable, list[Callable]]] = None,
    ):
        """Add documents to an existing index.

        Parameters:
            new_collection (list[str]): The documents to add to the index.
            new_document_metadatas (Optional[list[dict]]): An optional list of metadata dicts
            index_name (Optional[str]): The name of the index to add documents to. If None and by default, will add documents to the already initialised one.
        """
        new_document_ids, new_docid_metadata_map = self._process_metadata(
            document_ids=new_document_ids,
            document_metadatas=new_document_metadatas,
            collection_len=len(new_collection),
        )

        if split_documents or preprocessing_fn is not None:
            self.corpus_processor = CorpusProcessor(
                document_splitter_fn=document_splitter_fn if split_documents else None,
                preprocessing_fn=preprocessing_fn,
            )
            new_collection_with_ids = self.corpus_processor.process_corpus(
                new_collection,
                new_document_ids,
                chunk_size=self.model.config.doc_maxlen,
            )
        else:
            new_collection_with_ids = [
                {"document_id": x, "content": y}
                for x, y in zip(new_document_ids, new_collection)
            ]

        new_collection = [x["content"] for x in new_collection_with_ids]

        new_pid_docid_map = {
            index: item["document_id"]
            for index, item in enumerate(new_collection_with_ids)
        }

        self.model.add_to_index(
            new_collection,
            new_pid_docid_map,
            new_docid_metadata_map=new_docid_metadata_map,
            index_name=index_name,
        )

    def delete_from_index(
        self,
        document_ids: Union[TypeVar("T"), List[TypeVar("T")]],
        index_name: Optional[str] = None,
    ):
        """Delete documents from an index by their IDs.

        Parameters:
            document_ids (Union[TypeVar("T"), List[TypeVar("T")]]): The IDs of the documents to delete.
            index_name (Optional[str]): The name of the index to delete documents from. If None and by default, will delete documents from the already initialised one.
        """
        self.model.delete_from_index(
            document_ids,
            index_name=index_name,
        )

    def search(
        self,
        query: Union[str, list[str]],
        index_name: Optional["str"] = None,
        k: int = 10,
        force_fast: bool = False,
        zero_index_ranks: bool = False,
        **kwargs,
    ):
        """Query an index.

        Parameters:
            query (Union[str, list[str]]): The query or list of queries to search for.
            index_name (Optional[str]): Provide the name of an index to query. If None and by default, will query an already initialised one.
            k (int): The number of results to return for each query.
            force_fast (bool): Whether to force the use of a faster but less accurate search method.
            zero_index_ranks (bool): Whether to zero the index ranks of the results. By default, result rank 1 is the highest ranked result

        Returns:
            results (Union[list[dict], list[list[dict]]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys `content`, `score`, `rank`, and 'document_id'. If metadata was indexed for the document, it will be returned under the "document_metadata" key.

        Individual results are always in the format:
        ```python3
        {"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x"}
        ```
        or
        ```python3
        {"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x", "document_metadata": {"metadata_key": "metadata_value", ...}}
        ```

        """
        return self.model.search(
            query=query,
            index_name=index_name,
            k=k,
            force_fast=force_fast,
            zero_index_ranks=zero_index_ranks,
            **kwargs,
        )

    def rerank(
        self,
        query: Union[str, list[str]],
        documents: list[str],
        k: int = 10,
        zero_index_ranks: bool = False,
        bsize: int = 64,
    ):
        """Encode documents and rerank them in-memory. Performance degrades rapidly with more documents.

        Parameters:
            query (Union[str, list[str]]): The query or list of queries to search for.
            documents (list[str]): The documents to rerank.
            k (int): The number of results to return for each query.
            zero_index_ranks (bool): Whether to zero the index ranks of the results. By default, result rank 1 is the highest ranked result
            bsize (int): The batch size to use for re-ranking.

        Returns:
            results (Union[list[dict], list[list[dict]]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys `content`, `score` and `rank`.

        Individual results are always in the format:
        ```python3
        {"content": "text of the relevant passage", "score": 0.123456, "rank": 1}
        ```
        """

        return self.model.rank(
            query=query,
            documents=documents,
            k=k,
            zero_index_ranks=zero_index_ranks,
            bsize=bsize,
        )

    def encode(
        self,
        documents: list[str],
        bsize: int = 32,
        document_metadatas: Optional[list[dict]] = None,
        verbose: bool = True,
        max_document_length: Union[Literal["auto"], int] = "auto",
    ):
        """Encode documents in memory to be searched through with no Index. Performance degrades rapidly with more documents.

        Parameters:
            documents (list[str]): The documents to encode.
            bsize (int): The batch size to use for encoding.
            document_metadatas (Optional[list[dict]]): An optional list of metadata dicts. Each entry must correspond to a document.
        """
        if verbose:
            print(f"Encoding {len(documents)} documents...")
        self.model.encode(
            documents=documents,
            bsize=bsize,
            document_metadatas=document_metadatas,
            verbose=verbose,
            max_tokens=max_document_length,
        )
        if verbose:
            print("Documents encoded!")

    def search_encoded_docs(
        self,
        query: Union[str, list[str]],
        k: int = 10,
        bsize: int = 32,
    ) -> list[dict[str, Any]]:
        """Search through documents encoded in-memory.

        Parameters:
            query (Union[str, list[str]]): The query or list of queries to search for.
            k (int): The number of results to return for each query.
            bsize (int): The batch size to use for searching.

        Returns:
            results (list[dict[str, Any]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts.
        """
        return self.model.search_encoded_docs(
            queries=query,
            k=k,
            bsize=bsize,
        )

    def clear_encoded_docs(self, force: bool = False):
        """Clear documents encoded in-memory.

        Parameters:
            force (bool): Whether to force the clearing of encoded documents without enforcing a 10s wait time.
        """
        self.model.clear_encoded_docs(force=force)

    def as_langchain_retriever(self, **kwargs: Any) -> BaseRetriever:
        return RAGatouilleLangChainRetriever(model=self, kwargs=kwargs)

    def as_langchain_document_compressor(
        self, k: int = 5, **kwargs: Any
    ) -> BaseDocumentCompressor:
        return RAGatouilleLangChainCompressor(model=self, k=k, kwargs=kwargs)

add_to_index(new_collection, new_document_ids=None, new_document_metadatas=None, index_name=None, split_documents=True, document_splitter_fn=llama_index_sentence_splitter, preprocessing_fn=None)

Add documents to an existing index.

Parameters:

new_collection (list[str], required): The documents to add to the index.
new_document_metadatas (Optional[list[dict]], default None): An optional list of metadata dicts.
index_name (Optional[str], default None): The name of the index to add documents to. If None and by default, will add documents to the already initialised one.
Source code in ragatouille/RAGPretrainedModel.py
def add_to_index(
    self,
    new_collection: list[str],
    new_document_ids: Optional[Union[TypeVar("T"), List[TypeVar("T")]]] = None,
    new_document_metadatas: Optional[list[dict]] = None,
    index_name: Optional[str] = None,
    split_documents: bool = True,
    document_splitter_fn: Optional[Callable] = llama_index_sentence_splitter,
    preprocessing_fn: Optional[Union[Callable, list[Callable]]] = None,
):
    """Add documents to an existing index.

    Parameters:
        new_collection (list[str]): The documents to add to the index.
        new_document_metadatas (Optional[list[dict]]): An optional list of metadata dicts
        index_name (Optional[str]): The name of the index to add documents to. If None and by default, will add documents to the already initialised one.
    """
    new_document_ids, new_docid_metadata_map = self._process_metadata(
        document_ids=new_document_ids,
        document_metadatas=new_document_metadatas,
        collection_len=len(new_collection),
    )

    if split_documents or preprocessing_fn is not None:
        self.corpus_processor = CorpusProcessor(
            document_splitter_fn=document_splitter_fn if split_documents else None,
            preprocessing_fn=preprocessing_fn,
        )
        new_collection_with_ids = self.corpus_processor.process_corpus(
            new_collection,
            new_document_ids,
            chunk_size=self.model.config.doc_maxlen,
        )
    else:
        new_collection_with_ids = [
            {"document_id": x, "content": y}
            for x, y in zip(new_document_ids, new_collection)
        ]

    new_collection = [x["content"] for x in new_collection_with_ids]

    new_pid_docid_map = {
        index: item["document_id"]
        for index, item in enumerate(new_collection_with_ids)
    }

    self.model.add_to_index(
        new_collection,
        new_pid_docid_map,
        new_docid_metadata_map=new_docid_metadata_map,
        index_name=index_name,
    )
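
A minimal sketch of appending documents to the index already loaded on this instance (document text, id, and metadata are placeholders):

```python
# RAG is the RAGPretrainedModel loaded in the Usage section above.
RAG.add_to_index(
    new_collection=["The Eiffel Tower is located in Paris."],
    new_document_ids=["doc-eiffel"],
    new_document_metadatas=[{"source": "placeholder"}],
)
```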

clear_encoded_docs(force=False)

Clear documents encoded in-memory.

Parameters:

force (bool, default False): Whether to force the clearing of encoded documents without enforcing a 10s wait time.
Source code in ragatouille/RAGPretrainedModel.py
def clear_encoded_docs(self, force: bool = False):
    """Clear documents encoded in-memory.

    Parameters:
        force (bool): Whether to force the clearing of encoded documents without enforcing a 10s wait time.
    """
    self.model.clear_encoded_docs(force=force)

delete_from_index(document_ids, index_name=None)

Delete documents from an index by their IDs.

Parameters:

document_ids (Union[TypeVar(T), List[TypeVar(T)]], required): The IDs of the documents to delete.
index_name (Optional[str], default None): The name of the index to delete documents from. If None and by default, will delete documents from the already initialised one.
Source code in ragatouille/RAGPretrainedModel.py
def delete_from_index(
    self,
    document_ids: Union[TypeVar("T"), List[TypeVar("T")]],
    index_name: Optional[str] = None,
):
    """Delete documents from an index by their IDs.

    Parameters:
        document_ids (Union[TypeVar("T"), List[TypeVar("T")]]): The IDs of the documents to delete.
        index_name (Optional[str]): The name of the index to delete documents from. If None and by default, will delete documents from the already initialised one.
    """
    self.model.delete_from_index(
        document_ids,
        index_name=index_name,
    )
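
For example, a previously indexed document (placeholder id shown) can be removed along with the chunks derived from it:

```python
# RAG is the RAGPretrainedModel loaded in the Usage section above.
RAG.delete_from_index(document_ids=["doc-eiffel"])
```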

encode(documents, bsize=32, document_metadatas=None, verbose=True, max_document_length='auto')

Encode documents in memory to be searched through with no Index. Performance degrades rapidly with more documents.

Parameters:

documents (list[str], required): The documents to encode.
bsize (int, default 32): The batch size to use for encoding.
document_metadatas (Optional[list[dict]], default None): An optional list of metadata dicts. Each entry must correspond to a document.
Source code in ragatouille/RAGPretrainedModel.py
def encode(
    self,
    documents: list[str],
    bsize: int = 32,
    document_metadatas: Optional[list[dict]] = None,
    verbose: bool = True,
    max_document_length: Union[Literal["auto"], int] = "auto",
):
    """Encode documents in memory to be searched through with no Index. Performance degrades rapidly with more documents.

    Parameters:
        documents (list[str]): The documents to encode.
        bsize (int): The batch size to use for encoding.
        document_metadatas (Optional[list[dict]]): An optional list of metadata dicts. Each entry must correspond to a document.
    """
    if verbose:
        print(f"Encoding {len(documents)} documents...")
    self.model.encode(
        documents=documents,
        bsize=bsize,
        document_metadatas=document_metadatas,
        verbose=verbose,
        max_tokens=max_document_length,
    )
    if verbose:
        print("Documents encoded!")

from_index(index_path, n_gpu=-1, verbose=1) classmethod

Load an Index and the associated ColBERT encoder from an existing document index.

Parameters:

index_path (Union[str, Path], required): Path to the index.
n_gpu (int, default -1): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.
verbose (int, default 1): The level of ColBERT verbosity requested. By default, 1, which will filter out most internal logs.

Returns:

cls (RAGPretrainedModel): The current instance of RAGPretrainedModel, with the model and index initialised.

Source code in ragatouille/RAGPretrainedModel.py
@classmethod
def from_index(
    cls, index_path: Union[str, Path], n_gpu: int = -1, verbose: int = 1
):
    """Load an Index and the associated ColBERT encoder from an existing document index.

    Parameters:
        index_path (Union[str, Path]): Path to the index.
        n_gpu (int): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.
        verbose (int): The level of ColBERT verbosity requested. By default, 1, which will filter out most internal logs.

    Returns:
        cls (RAGPretrainedModel): The current instance of RAGPretrainedModel, with the model and index initialised.
    """
    instance = cls()
    index_path = Path(index_path)
    instance.model = ColBERT(
        index_path, n_gpu, verbose=verbose, load_from_index=True
    )

    return instance

from_pretrained(pretrained_model_name_or_path, n_gpu=-1, verbose=1, index_root=None) classmethod

Load a ColBERT model from a pre-trained checkpoint.

Parameters:

pretrained_model_name_or_path (str, required): Local path or huggingface model name.
n_gpu (int, default -1): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.
verbose (int, default 1): The level of ColBERT verbosity requested. By default, 1, which will filter out most internal logs.
index_root (Optional[str], default None): The root directory where indexes will be stored. If None, will use the default directory, '.ragatouille/'.

Returns:

cls (RAGPretrainedModel): The current instance of RAGPretrainedModel, with the model initialised.

Source code in ragatouille/RAGPretrainedModel.py
@classmethod
def from_pretrained(
    cls,
    pretrained_model_name_or_path: Union[str, Path],
    n_gpu: int = -1,
    verbose: int = 1,
    index_root: Optional[str] = None,
):
    """Load a ColBERT model from a pre-trained checkpoint.

    Parameters:
        pretrained_model_name_or_path (str): Local path or huggingface model name.
        n_gpu (int): Number of GPUs to use. By default, value is -1, which means use all available GPUs or none if no GPU is available.
        verbose (int): The level of ColBERT verbosity requested. By default, 1, which will filter out most internal logs.
        index_root (Optional[str]): The root directory where indexes will be stored. If None, will use the default directory, '.ragatouille/'.

    Returns:
        cls (RAGPretrainedModel): The current instance of RAGPretrainedModel, with the model initialised.
    """
    instance = cls()
    instance.model = ColBERT(
        pretrained_model_name_or_path, n_gpu, index_root=index_root, verbose=verbose
    )
    return instance

index(collection, document_ids=None, document_metadatas=None, index_name=None, overwrite_index=True, max_document_length=256, split_documents=True, document_splitter_fn=llama_index_sentence_splitter, preprocessing_fn=None)

Build an index from a list of documents.

Parameters:

collection (list[str], required): The collection of documents to index.
document_ids (Optional[list[str]], default None): An optional list of document ids. Ids will be generated at index time if not supplied.
index_name (str, default None): The name of the index that will be built.
overwrite_index (Union[bool, str], default True): Whether to overwrite an existing index with the same name.
max_document_length (int, default 256): The maximum length of a document. Documents longer than this will be split into chunks.
split_documents (bool, default True): Whether to split documents into chunks.
document_splitter_fn (Optional[Callable], default llama_index_sentence_splitter): A function to split documents into chunks. If None and by default, will use the llama_index_sentence_splitter.
preprocessing_fn (Optional[Union[Callable, list[Callable]]], default None): A function or list of functions to preprocess documents. If None and by default, will not preprocess documents.

Returns:

index (str): The path to the index that was built.

Source code in ragatouille/RAGPretrainedModel.py
def index(
    self,
    collection: list[str],
    document_ids: Union[TypeVar("T"), List[TypeVar("T")]] = None,
    document_metadatas: Optional[list[dict]] = None,
    index_name: str = None,
    overwrite_index: Union[bool, str] = True,
    max_document_length: int = 256,
    split_documents: bool = True,
    document_splitter_fn: Optional[Callable] = llama_index_sentence_splitter,
    preprocessing_fn: Optional[Union[Callable, list[Callable]]] = None,
):
    """Build an index from a list of documents.

    Parameters:
        collection (list[str]): The collection of documents to index.
        document_ids (Optional[list[str]]): An optional list of document ids. Ids will be generated at index time if not supplied.
        index_name (str): The name of the index that will be built.
        overwrite_index (Union[bool, str]): Whether to overwrite an existing index with the same name.
        max_document_length (int): The maximum length of a document. Documents longer than this will be split into chunks.
        split_documents (bool): Whether to split documents into chunks.
        document_splitter_fn (Optional[Callable]): A function to split documents into chunks. If None and by default, will use the llama_index_sentence_splitter.
        preprocessing_fn (Optional[Union[Callable, list[Callable]]]): A function or list of functions to preprocess documents. If None and by default, will not preprocess documents.

    Returns:
        index (str): The path to the index that was built.
    """

    document_ids, docid_metadata_map = self._process_metadata(
        document_ids=document_ids,
        document_metadatas=document_metadatas,
        collection_len=len(collection),
    )

    if split_documents or preprocessing_fn is not None:
        self.corpus_processor = CorpusProcessor(
            document_splitter_fn=document_splitter_fn if split_documents else None,
            preprocessing_fn=preprocessing_fn,
        )
        collection_with_ids = self.corpus_processor.process_corpus(
            collection,
            document_ids,
            chunk_size=max_document_length,
        )
    else:
        collection_with_ids = [
            {"document_id": x, "content": y}
            for x, y in zip(document_ids, collection)
        ]

    pid_docid_map = {
        index: item["document_id"] for index, item in enumerate(collection_with_ids)
    }
    collection = [x["content"] for x in collection_with_ids]
    return self.model.index(
        collection,
        pid_docid_map=pid_docid_map,
        docid_metadata_map=docid_metadata_map,
        index_name=index_name,
        max_document_length=max_document_length,
        overwrite=overwrite_index,
    )
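
A minimal sketch of building an index from a small collection with explicit ids and metadata (all names and texts are placeholders):

```python
from ragatouille import RAGPretrainedModel

RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
index_path = RAG.index(
    collection=[
        "Hayao Miyazaki co-founded Studio Ghibli in 1985.",
        "Studio Ghibli is a Japanese animation studio based in Tokyo.",
    ],
    document_ids=["doc-miyazaki", "doc-ghibli"],
    document_metadatas=[{"topic": "people"}, {"topic": "studios"}],
    index_name="demo_index",
    max_document_length=256,
    split_documents=True,
)
print(f"Index written to: {index_path}")
```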

rerank(query, documents, k=10, zero_index_ranks=False, bsize=64)

Encode documents and rerank them in-memory. Performance degrades rapidly with more documents.

Parameters:

query (Union[str, list[str]], required): The query or list of queries to search for.
documents (list[str], required): The documents to rerank.
k (int, default 10): The number of results to return for each query.
zero_index_ranks (bool, default False): Whether to zero the index ranks of the results. By default, result rank 1 is the highest ranked result.
bsize (int, default 64): The batch size to use for re-ranking.

Returns:

results (Union[list[dict], list[list[dict]]]): A list of dicts containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys content, score and rank.

Individual results are always in the format:

{"content": "text of the relevant passage", "score": 0.123456, "rank": 1}
Source code in ragatouille/RAGPretrainedModel.py
def rerank(
    self,
    query: Union[str, list[str]],
    documents: list[str],
    k: int = 10,
    zero_index_ranks: bool = False,
    bsize: int = 64,
):
    """Encode documents and rerank them in-memory. Performance degrades rapidly with more documents.

    Parameters:
        query (Union[str, list[str]]): The query or list of queries to search for.
        documents (list[str]): The documents to rerank.
        k (int): The number of results to return for each query.
        zero_index_ranks (bool): Whether to zero the index ranks of the results. By default, result rank 1 is the highest ranked result
        bsize (int): The batch size to use for re-ranking.

    Returns:
        results (Union[list[dict], list[list[dict]]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys `content`, `score` and `rank`.

    Individual results are always in the format:
    ```python3
    {"content": "text of the relevant passage", "score": 0.123456, "rank": 1}
    ```
    """

    return self.model.rank(
        query=query,
        documents=documents,
        k=k,
        zero_index_ranks=zero_index_ranks,
        bsize=bsize,
    )
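
A minimal sketch of reranking a handful of candidate passages for a single query, without any index (query and documents are placeholders):

```python
# RAG is the RAGPretrainedModel loaded in the Usage section above.
candidates = [
    "Paris is the capital of France.",
    "France is famous for its wine and cheese.",
    "Berlin is the capital of Germany.",
]
reranked = RAG.rerank(query="What is the capital of France?", documents=candidates, k=3)
for result in reranked:
    print(result["rank"], round(result["score"], 3), result["content"])
```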

search(query, index_name=None, k=10, force_fast=False, zero_index_ranks=False, **kwargs)

Query an index.

Parameters:

query (Union[str, list[str]], required): The query or list of queries to search for.
index_name (Optional[str], default None): Provide the name of an index to query. If None and by default, will query an already initialised one.
k (int, default 10): The number of results to return for each query.
force_fast (bool, default False): Whether to force the use of a faster but less accurate search method.
zero_index_ranks (bool, default False): Whether to zero the index ranks of the results. By default, result rank 1 is the highest ranked result.

Returns:

results (Union[list[dict], list[list[dict]]]): A list of dicts containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys content, score, rank, and document_id. If metadata was indexed for the document, it will be returned under the "document_metadata" key.

Individual results are always in the format:

{"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x"}

or

{"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x", "document_metadata": {"metadata_key": "metadata_value", ...}}
Source code in ragatouille/RAGPretrainedModel.py
def search(
    self,
    query: Union[str, list[str]],
    index_name: Optional["str"] = None,
    k: int = 10,
    force_fast: bool = False,
    zero_index_ranks: bool = False,
    **kwargs,
):
    """Query an index.

    Parameters:
        query (Union[str, list[str]]): The query or list of queries to search for.
        index_name (Optional[str]): Provide the name of an index to query. If None and by default, will query an already initialised one.
        k (int): The number of results to return for each query.
        force_fast (bool): Whether to force the use of a faster but less accurate search method.
        zero_index_ranks (bool): Whether to zero the index ranks of the results. By default, result rank 1 is the highest ranked result

    Returns:
        results (Union[list[dict], list[list[dict]]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts. Each result is a dict with keys `content`, `score`, `rank`, and 'document_id'. If metadata was indexed for the document, it will be returned under the "document_metadata" key.

    Individual results are always in the format:
    ```python3
    {"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x"}
    ```
    or
    ```python3
    {"content": "text of the relevant passage", "score": 0.123456, "rank": 1, "document_id": "x", "document_metadata": {"metadata_key": "metadata_value", ...}}
    ```

    """
    return self.model.search(
        query=query,
        index_name=index_name,
        k=k,
        force_fast=force_fast,
        zero_index_ranks=zero_index_ranks,
        **kwargs,
    )
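
A minimal sketch of querying a previously built index (the index name is a placeholder; index_name can be omitted when the instance was loaded with from_index):

```python
# RAG is the RAGPretrainedModel loaded in the Usage section above.
results = RAG.search("Who founded Studio Ghibli?", index_name="demo_index", k=3)
for result in results:
    print(result["rank"], result["document_id"], result["content"][:80])
```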

search_encoded_docs(query, k=10, bsize=32)

Search through documents encoded in-memory.

Parameters:

query (Union[str, list[str]], required): The query or list of queries to search for.
k (int, default 10): The number of results to return for each query.
bsize (int, default 32): The batch size to use for searching.

Returns:

results (list[dict[str, Any]]): A list of dicts containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts.

Source code in ragatouille/RAGPretrainedModel.py
def search_encoded_docs(
    self,
    query: Union[str, list[str]],
    k: int = 10,
    bsize: int = 32,
) -> list[dict[str, Any]]:
    """Search through documents encoded in-memory.

    Parameters:
        query (Union[str, list[str]]): The query or list of queries to search for.
        k (int): The number of results to return for each query.
        bsize (int): The batch size to use for searching.

    Returns:
        results (list[dict[str, Any]]): A list of dict containing individual results for each query. If a list of queries is provided, returns a list of lists of dicts.
    """
    return self.model.search_encoded_docs(
        queries=query,
        k=k,
        bsize=bsize,
    )
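
A minimal sketch of batched querying over documents previously loaded with encode (queries are placeholders); each query gets its own list of result dicts:

```python
# RAG is the RAGPretrainedModel loaded in the Usage section above,
# with documents already loaded via RAG.encode(...).
queries = ["capital of France", "capital of Germany"]
all_results = RAG.search_encoded_docs(queries, k=2)
for query, results in zip(queries, all_results):
    print(query, "->", [r["content"] for r in results])
```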