diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h index 811911b54da..05010c17caa 100644 --- a/src/object/srv_internal.h +++ b/src/object/srv_internal.h @@ -178,6 +178,8 @@ struct obj_tls { d_sg_list_t ot_echo_sgl; d_list_t ot_pool_list; + int ot_rpc_bulk_inflight; + /** Measure per-operation latency in us (type = gauge) */ struct d_tm_node_t *ot_op_lat[OBJ_PROTO_CLI_COUNT]; /** Count number of per-opcode active requests (type = gauge) */ @@ -227,6 +229,11 @@ enum latency_type { } \ } while (0) +/* Per-target in-flight object RPC bulk count threshold; 0 disables the limit. */ +extern uint32_t obj_rpc_bulk_thd; + +#define OBJ_RPC_BULK_THD_DEF 512 + static inline void obj_update_latency(uint32_t opc, uint32_t type, uint64_t latency, uint64_t io_size) { diff --git a/src/object/srv_mod.c b/src/object/srv_mod.c index a668089a71d..8b8451a0d16 100644 --- a/src/object/srv_mod.c +++ b/src/object/srv_mod.c @@ -1,6 +1,6 @@ /** * (C) Copyright 2016-2024 Intel Corporation. - * (C) Copyright 2025 Hewlett Packard Enterprise Development LP + * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -46,6 +46,10 @@ obj_mod_init(void) goto out_ec; } + obj_rpc_bulk_thd = OBJ_RPC_BULK_THD_DEF; + d_getenv_uint32_t("DAOS_OBJ_RPC_BULK_THD", &obj_rpc_bulk_thd); + D_INFO("Set per-target in-flight object RPC bulk threshold as %u\n", obj_rpc_bulk_thd); + return 0; out_ec: diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 62044b08221..eaba292a5be 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -262,6 +262,13 @@ obj_bulk_comp_cb(const struct crt_bulk_cb_info *cb_info) ABT_eventual_set(arg->eventual, &arg->result, sizeof(arg->result)); + if (obj_rpc_bulk_thd != 0) { + struct obj_tls *tls = obj_tls_get(); + + D_ASSERT(tls->ot_rpc_bulk_inflight > 0); + tls->ot_rpc_bulk_inflight--; + } + crt_req_decref(rpc); return cb_info->bci_rc; } @@ -333,14 +340,15 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, 
crt_bulk_t remote_bulk, off_t remote_off, crt_bulk_op_t bulk_op, bool bulk_bind, d_sg_list_t *sgl, int sgl_idx, struct obj_bulk_args *p_arg) { - struct crt_bulk_desc bulk_desc; - crt_bulk_perm_t bulk_perm; - crt_bulk_opid_t bulk_opid; - crt_bulk_t local_bulk; - unsigned int local_off; - unsigned int iov_idx = 0; - size_t remote_size; - int rc, bulk_iovs = 0; + struct obj_tls *tls = obj_tls_get(); + struct crt_bulk_desc bulk_desc; + crt_bulk_perm_t bulk_perm; + crt_bulk_opid_t bulk_opid; + crt_bulk_t local_bulk; + unsigned int local_off; + unsigned int iov_idx = 0; + size_t remote_size; + int rc, bulk_iovs = 0; if (remote_bulk == NULL) { D_ERROR("Remote bulk is NULL\n"); @@ -477,6 +485,20 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk, break; } + /* Limit the number of inflight object RPC bulk transfers to avoid server overload + * and network congestion. Let the client retry by returning -DER_INPROGRESS. + */ + if (obj_rpc_bulk_thd != 0) { + if (unlikely(tls->ot_rpc_bulk_inflight >= obj_rpc_bulk_thd)) { + D_WARN("Too many inflight object RPC bulk transfer %d vs %u\n", + tls->ot_rpc_bulk_inflight, obj_rpc_bulk_thd); + rc = -DER_INPROGRESS; + break; + } + + tls->ot_rpc_bulk_inflight++; + } + crt_req_addref(rpc); bulk_desc.bd_rpc = rpc; @@ -500,6 +522,10 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk, if (rc < 0) { D_ERROR("crt_bulk_transfer %d error " DF_RC "\n", sgl_idx, DP_RC(rc)); p_arg->bulks_inflight--; + + if (obj_rpc_bulk_thd != 0) + tls->ot_rpc_bulk_inflight--; + if (!cached_bulk) crt_bulk_free(local_bulk); crt_req_decref(rpc); @@ -3035,6 +3061,7 @@ ds_obj_rw_handler(crt_rpc_t *rpc) uint32_t max_ver = 0; struct dtx_epoch epoch = {0}; int rc; + int retry = 0; bool need_abort = false; D_ASSERT(orw != NULL); @@ -3252,6 +3279,17 @@ ds_obj_rw_handler(crt_rpc_t *rpc) break; } + /* If we have already retried once but still failed with -DER_TX_RESTART, then + * it is quite possible that the -DER_TX_RESTART 
failure is related to server + * overload or congestion-induced RPC delay. Let's ask the client to retry with + * some backoff delay. That will avoid increasing server workload/congestion and + * avoid client RPC timeouts while the server retries repeatedly. + */ + if (++retry > 1) { + rc = -DER_INPROGRESS; + break; + } + /* Only standalone updates use this RPC. Retry with newer epoch. */ orw->orw_epoch = d_hlc_get(); exec_arg.flags |= ORF_RESEND; @@ -4023,6 +4061,7 @@ ds_obj_punch_handler(crt_rpc_t *rpc) uint32_t max_ver = 0; struct dtx_epoch epoch; int rc; + int retry = 0; bool need_abort = false; opi = crt_req_get(rpc); @@ -4163,6 +4202,17 @@ ds_obj_punch_handler(crt_rpc_t *rpc) break; } + /* If we have already retried once but still failed with -DER_TX_RESTART, then + * it is quite possible that the -DER_TX_RESTART failure is related to server + * overload or congestion-induced RPC delay. Let's ask the client to retry with + * some backoff delay. That will avoid increasing server workload/congestion and + * avoid client RPC timeouts while the server retries repeatedly. + */ + if (++retry > 1) { + rc = -DER_INPROGRESS; + break; + } + /* Only standalone punches use this RPC. Retry with newer epoch. */ opi->opi_epoch = d_hlc_get(); exec_arg.flags |= ORF_RESEND;