Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/object/srv_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ struct obj_tls {
d_sg_list_t ot_echo_sgl;
d_list_t ot_pool_list;

int ot_rpc_bulk_inflight;

/** Measure per-operation latency in us (type = gauge) */
struct d_tm_node_t *ot_op_lat[OBJ_PROTO_CLI_COUNT];
/** Count number of per-opcode active requests (type = gauge) */
Expand Down Expand Up @@ -227,6 +229,11 @@ enum latency_type {
} \
} while (0)

/*
 * Per-target in-flight object RPC bulk count threshold. Once the per-TLS
 * in-flight counter reaches this value, new object bulk transfers are refused
 * with -DER_INPROGRESS. A value of 0 disables both the accounting and the check.
 */
extern uint32_t obj_rpc_bulk_thd;

/*
 * Default value assigned to obj_rpc_bulk_thd at module init; it can be
 * overridden via the DAOS_OBJ_RPC_BULK_THD environment variable.
 */
#define OBJ_RPC_BULK_THD_DEF 512

static inline void
obj_update_latency(uint32_t opc, uint32_t type, uint64_t latency, uint64_t io_size)
{
Expand Down
6 changes: 5 additions & 1 deletion src/object/srv_mod.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -46,6 +46,10 @@ obj_mod_init(void)
goto out_ec;
}

obj_rpc_bulk_thd = OBJ_RPC_BULK_THD_DEF;
d_getenv_uint32_t("DAOS_OBJ_RPC_BULK_THD", &obj_rpc_bulk_thd);
D_INFO("Set per-target in-flight object RPC bulk threshold as %u\n", obj_rpc_bulk_thd);

return 0;

out_ec:
Expand Down
66 changes: 58 additions & 8 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ obj_bulk_comp_cb(const struct crt_bulk_cb_info *cb_info)
ABT_eventual_set(arg->eventual, &arg->result,
sizeof(arg->result));

if (obj_rpc_bulk_thd != 0) {
struct obj_tls *tls = obj_tls_get();

D_ASSERT(tls->ot_rpc_bulk_inflight > 0);
tls->ot_rpc_bulk_inflight--;
}

crt_req_decref(rpc);
return cb_info->bci_rc;
}
Expand Down Expand Up @@ -333,14 +340,15 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
off_t remote_off, crt_bulk_op_t bulk_op, bool bulk_bind,
d_sg_list_t *sgl, int sgl_idx, struct obj_bulk_args *p_arg)
{
struct crt_bulk_desc bulk_desc;
crt_bulk_perm_t bulk_perm;
crt_bulk_opid_t bulk_opid;
crt_bulk_t local_bulk;
unsigned int local_off;
unsigned int iov_idx = 0;
size_t remote_size;
int rc, bulk_iovs = 0;
struct obj_tls *tls = obj_tls_get();
struct crt_bulk_desc bulk_desc;
crt_bulk_perm_t bulk_perm;
crt_bulk_opid_t bulk_opid;
crt_bulk_t local_bulk;
unsigned int local_off;
unsigned int iov_idx = 0;
size_t remote_size;
int rc, bulk_iovs = 0;

if (remote_bulk == NULL) {
D_ERROR("Remote bulk is NULL\n");
Expand Down Expand Up @@ -477,6 +485,20 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
break;
}

/* Do not allow too many in-flight object RPC bulk transfers, to avoid server overload
* and network congestion. Return -DER_INPROGRESS so that the client retries.
*/
if (obj_rpc_bulk_thd != 0) {
if (unlikely(tls->ot_rpc_bulk_inflight >= obj_rpc_bulk_thd)) {
D_WARN("Too many inflight object RPC bulk transfer %d vs %u\n",
tls->ot_rpc_bulk_inflight, obj_rpc_bulk_thd);
rc = -DER_INPROGRESS;
break;
}

tls->ot_rpc_bulk_inflight++;
}

crt_req_addref(rpc);

bulk_desc.bd_rpc = rpc;
Expand All @@ -500,6 +522,10 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
if (rc < 0) {
D_ERROR("crt_bulk_transfer %d error " DF_RC "\n", sgl_idx, DP_RC(rc));
p_arg->bulks_inflight--;

if (obj_rpc_bulk_thd != 0)
tls->ot_rpc_bulk_inflight--;

if (!cached_bulk)
crt_bulk_free(local_bulk);
crt_req_decref(rpc);
Expand Down Expand Up @@ -3035,6 +3061,7 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
uint32_t max_ver = 0;
struct dtx_epoch epoch = {0};
int rc;
int retry = 0;
bool need_abort = false;

D_ASSERT(orw != NULL);
Expand Down Expand Up @@ -3252,6 +3279,17 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
break;
}

/* If we have already retried once but still failed with -DER_TX_RESTART, then
* it is quite possible that the -DER_TX_RESTART failure is related to server
* overload or to RPC delay caused by congestion. Ask the client to retry with
* some backoff delay. That avoids adding to the server workload/congestion and
* avoids client RPC timeouts while the server retries repeatedly.
*/
if (++retry > 1) {
rc = -DER_INPROGRESS;
break;
}

/* Only standalone updates use this RPC. Retry with newer epoch. */
orw->orw_epoch = d_hlc_get();
exec_arg.flags |= ORF_RESEND;
Expand Down Expand Up @@ -4023,6 +4061,7 @@ ds_obj_punch_handler(crt_rpc_t *rpc)
uint32_t max_ver = 0;
struct dtx_epoch epoch;
int rc;
int retry = 0;
bool need_abort = false;

opi = crt_req_get(rpc);
Expand Down Expand Up @@ -4163,6 +4202,17 @@ ds_obj_punch_handler(crt_rpc_t *rpc)
break;
}

/* If we have already retried once but still failed with -DER_TX_RESTART, then
* it is quite possible that the -DER_TX_RESTART failure is related to server
* overload or to RPC delay caused by congestion. Ask the client to retry with
* some backoff delay. That avoids adding to the server workload/congestion and
* avoids client RPC timeouts while the server retries repeatedly.
*/
if (++retry > 1) {
rc = -DER_INPROGRESS;
break;
}

/* Only standalone punches use this RPC. Retry with newer epoch. */
opi->opi_epoch = d_hlc_get();
exec_arg.flags |= ORF_RESEND;
Expand Down
Loading