drbd: fix potential distributed deadlock

We limit ourselves to a configurable maximum number of pages used as
temporary bio pages.

If the configured "max_buffers" is not big enough to match the bandwidth
of the respective deployment, a distributed deadlock could be triggered
by e.g. fast online verify and heavy application IO.

TCP connections would block on congestion, because both receivers
would wait on pages to become available.

Fortunately the respective senders in this case would be able to give
back some pages already. So do that.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
This commit is contained in:
Lars Ellenberg 2011-03-08 17:11:40 +01:00 committed by Philipp Reisner
parent 600942e0fd
commit 53ea433145

View File

@ -297,42 +297,48 @@ void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *
crypto_hash_final(&desc, digest);
}
static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
/* TODO merge common code with w_e_end_ov_req */
int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
int digest_size;
void *digest;
int ok;
int ok = 1;
D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
if (unlikely(cancel)) {
if (unlikely(cancel))
goto out;
if (likely((e->flags & EE_WAS_ERROR) != 0))
goto out;
digest_size = crypto_hash_digestsize(mdev->csums_tfm);
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
sector_t sector = e->sector;
unsigned int size = e->size;
drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
/* Free e and pages before send.
* In case we block on congestion, we could otherwise run into
* some distributed deadlock, if the other side blocks on
* congestion as well, because our receiver blocks in
* drbd_pp_alloc due to pp_in_use > max_buffers. */
drbd_free_ee(mdev, e);
return 1;
e = NULL;
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev, sector, size,
digest, digest_size,
P_CSUM_RS_REQUEST);
kfree(digest);
} else {
dev_err(DEV, "kmalloc() of digest failed.\n");
ok = 0;
}
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
digest_size = crypto_hash_digestsize(mdev->csums_tfm);
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev,
e->sector,
e->size,
digest,
digest_size,
P_CSUM_RS_REQUEST);
kfree(digest);
} else {
dev_err(DEV, "kmalloc() of digest failed.\n");
ok = 0;
}
} else
ok = 1;
drbd_free_ee(mdev, e);
out:
if (e)
drbd_free_ee(mdev, e);
if (unlikely(!ok))
dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
@ -1071,9 +1077,12 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
return ok;
}
/* TODO merge common code with w_e_send_csum */
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
sector_t sector = e->sector;
unsigned int size = e->size;
int digest_size;
void *digest;
int ok = 1;
@ -1093,17 +1102,25 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
else
memset(digest, 0, digest_size);
/* Free e and pages before send.
* In case we block on congestion, we could otherwise run into
* some distributed deadlock, if the other side blocks on
* congestion as well, because our receiver blocks in
* drbd_pp_alloc due to pp_in_use > max_buffers. */
drbd_free_ee(mdev, e);
e = NULL;
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
digest, digest_size, P_OV_REPLY);
ok = drbd_send_drequest_csum(mdev, sector, size,
digest, digest_size,
P_OV_REPLY);
if (!ok)
dec_rs_pending(mdev);
kfree(digest);
out:
drbd_free_ee(mdev, e);
if (e)
drbd_free_ee(mdev, e);
dec_unacked(mdev);
return ok;
}
@ -1122,8 +1139,10 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
struct digest_info *di;
int digest_size;
void *digest;
sector_t sector = e->sector;
unsigned int size = e->size;
int digest_size;
int ok, eq = 0;
if (unlikely(cancel)) {
@ -1153,16 +1172,21 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
}
}
dec_unacked(mdev);
/* Free e and pages before send.
* In case we block on congestion, we could otherwise run into
* some distributed deadlock, if the other side blocks on
* congestion as well, because our receiver blocks in
* drbd_pp_alloc due to pp_in_use > max_buffers. */
drbd_free_ee(mdev, e);
if (!eq)
drbd_ov_oos_found(mdev, e->sector, e->size);
drbd_ov_oos_found(mdev, sector, size);
else
ov_oos_print(mdev);
ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
drbd_free_ee(mdev, e);
dec_unacked(mdev);
--mdev->ov_left;